Fixed GQL issues, started docs

master
noah metz 2024-03-21 14:13:54 -06:00
parent 8f9a759b26
commit 0bced58fd1
6 changed files with 97 additions and 36 deletions

@ -583,6 +583,49 @@ func RegisterObject[T any](ctx *Context) error {
return nil return nil
} }
func RegisterObjectNoGQL[T any](ctx *Context) error {
reflect_type := reflect.TypeFor[T]()
serialized_type := SerializedTypeFor[T]()
_, exists := ctx.TypeTypes[reflect_type]
if exists {
return fmt.Errorf("%+v already registered in TypeMap", reflect_type)
}
field_infos := map[FieldTag]FieldInfo{}
post_deserialize, post_deserialize_exists := reflect.PointerTo(reflect_type).MethodByName("PostDeserialize")
post_deserialize_index := -1
if post_deserialize_exists {
post_deserialize_index = post_deserialize.Index
}
for _, field := range(reflect.VisibleFields(reflect_type)) {
gv_tag, tagged_gv := field.Tag.Lookup("gv")
if tagged_gv {
node_tag := field.Tag.Get("node")
field_infos[GetFieldTag(gv_tag)] = FieldInfo{
Type: field.Type,
Index: field.Index,
NodeTag: node_tag,
Tag: gv_tag,
}
}
}
ctx.TypeMap[serialized_type] = &TypeInfo{
PostDeserializeIndex: post_deserialize_index,
Serialized: serialized_type,
Reflect: reflect_type,
Fields: field_infos,
Type: nil,
Resolve: nil,
}
ctx.TypeTypes[reflect_type] = ctx.TypeMap[serialized_type]
return nil
}
func identity(value interface{}) interface{} { func identity(value interface{}) interface{} {
return value return value
} }
@ -944,6 +987,11 @@ func NewContext(db * badger.DB, log Logger) (*Context, error) {
return nil, err return nil, err
} }
err = RegisterInterface[Signal](ctx)
if err != nil {
return nil, err
}
err = RegisterScalar[NodeType](ctx, identity, coerce[NodeType], astInt[NodeType], nil) err = RegisterScalar[NodeType](ctx, identity, coerce[NodeType], astInt[NodeType], nil)
if err != nil { if err != nil {
return nil, err return nil, err
@ -1004,6 +1052,21 @@ func NewContext(db * badger.DB, log Logger) (*Context, error) {
return nil, err return nil, err
} }
err = RegisterObjectNoGQL[QueuedSignal](ctx)
if err != nil {
return nil, err
}
err = RegisterObjectNoGQL[TimeoutSignal](ctx)
if err != nil {
return nil, err
}
err = RegisterObjectNoGQL[StatusSignal](ctx)
if err != nil {
return nil, err
}
err = RegisterObject[Node](ctx) err = RegisterObject[Node](ctx)
if err != nil { if err != nil {
return nil, err return nil, err

@ -69,7 +69,7 @@ func NewEventControlSignal(command EventCommand) *EventControlSignal {
func (ext *EventExt) UpdateState(node *Node, changes Changes, state EventState, state_start time.Time) { func (ext *EventExt) UpdateState(node *Node, changes Changes, state EventState, state_start time.Time) {
if ext.State != state { if ext.State != state {
ext.StateStart = state_start ext.StateStart = state_start
changes.Add("state") changes = append(changes, "state")
ext.State = state ext.State = state
node.QueueSignal(time.Now(), NewEventStateSignal(node.ID, ext.State, time.Now())) node.QueueSignal(time.Now(), NewEventStateSignal(node.ID, ext.State, time.Now()))
} }

@ -1,8 +1,7 @@
package graphvent package graphvent
import ( type Change string
type Changes []Change
)
// Extensions are data attached to nodes that process signals // Extensions are data attached to nodes that process signals
type Extension interface { type Extension interface {
@ -15,10 +14,3 @@ type Extension interface {
// Called when the node is unloaded from a context(deletion or move), so extension data can be cleaned up // Called when the node is unloaded from a context(deletion or move), so extension data can be cleaned up
Unload(*Context, *Node) Unload(*Context, *Node)
} }
// Changes are lists of modifications made to extensions to be communicated
type Changes []string
func (changes *Changes) Add(fields ...string) {
new_changes := append(*changes, fields...)
changes = &new_changes
}

@ -18,7 +18,7 @@ import (
func TestGQLSubscribe(t *testing.T) { func TestGQLSubscribe(t *testing.T) {
ctx := logTestContext(t, []string{"test"}) ctx := logTestContext(t, []string{"test", "listener", "changes"})
n1, err := NewNode(ctx, nil, "Lockable", 10, NewLockableExt(nil)) n1, err := NewNode(ctx, nil, "Lockable", 10, NewLockableExt(nil))
fatalErr(t, err) fatalErr(t, err)
@ -35,7 +35,7 @@ func TestGQLSubscribe(t *testing.T) {
ctx.Log.Logf("test", "NODE: %s", n1.ID) ctx.Log.Logf("test", "NODE: %s", n1.ID)
sub_1 := GQLPayload{ sub_1 := GQLPayload{
Query: "subscription Self { Self { ID, Type } }", Query: "subscription { Self { ID, Type ... on Lockable { lockable_state } } }",
} }
port := gql_ext.tcp_listener.Addr().(*net.TCPAddr).Port port := gql_ext.tcp_listener.Addr().(*net.TCPAddr).Port
@ -96,14 +96,19 @@ func TestGQLSubscribe(t *testing.T) {
fatalErr(t, err) fatalErr(t, err)
ctx.Log.Logf("test", "SUB: %s", resp[:n]) ctx.Log.Logf("test", "SUB: %s", resp[:n])
err = ctx.Send(gql, []SendMsg{{ lock_id, err := LockLockable(ctx, gql)
Dest: gql.ID,
Signal: NewStatusSignal(gql.ID, map[ExtType]Changes{
ExtTypeFor[GQLExt](): {"state"},
}),
}})
fatalErr(t, err) fatalErr(t, err)
response, _, err := WaitForResponse(listener_ext.Chan, 100*time.Millisecond, lock_id)
fatalErr(t, err)
switch response.(type) {
case *SuccessSignal:
ctx.Log.Logf("test", "Locked %s", gql.ID)
default:
t.Errorf("Unexpected lock response: %s", response)
}
n, err = ws.Read(resp) n, err = ws.Read(resp)
fatalErr(t, err) fatalErr(t, err)
ctx.Log.Logf("test", "SUB: %s", resp[:n]) ctx.Log.Logf("test", "SUB: %s", resp[:n])

@ -76,7 +76,7 @@ func (ext *LockableExt) HandleErrorSignal(ctx *Context, node *Node, source NodeI
if info_found { if info_found {
state, found := ext.Requirements[info.Destination] state, found := ext.Requirements[info.Destination]
if found == true { if found == true {
changes.Add("wait_infos") changes = append(changes, "wait_infos")
ctx.Log.Logf("lockable", "got mapped response %+v for %+v in state %s while in %s", signal, info, ReqStateStrings[state], ReqStateStrings[ext.State]) ctx.Log.Logf("lockable", "got mapped response %+v for %+v in state %s while in %s", signal, info, ReqStateStrings[state], ReqStateStrings[ext.State])
switch ext.State { switch ext.State {
case AbortingLock: case AbortingLock:
@ -89,11 +89,11 @@ func (ext *LockableExt) HandleErrorSignal(ctx *Context, node *Node, source NodeI
} }
} }
if all_unlocked == true { if all_unlocked == true {
changes.Add("state") changes = append(changes, "state")
ext.State = Unlocked ext.State = Unlocked
} }
case Locking: case Locking:
changes.Add("state") changes = append(changes, "state")
ext.Requirements[info.Destination] = Unlocked ext.Requirements[info.Destination] = Unlocked
unlocked := 0 unlocked := 0
for _, state := range(ext.Requirements) { for _, state := range(ext.Requirements) {
@ -153,7 +153,7 @@ func (ext *LockableExt) HandleLinkSignal(ctx *Context, node *Node, source NodeID
ext.Requirements = map[NodeID]ReqState{} ext.Requirements = map[NodeID]ReqState{}
} }
ext.Requirements[signal.NodeID] = Unlocked ext.Requirements[signal.NodeID] = Unlocked
changes.Add("requirements") changes = append(changes, "requirements")
messages = append(messages, SendMsg{source, NewSuccessSignal(signal.ID())}) messages = append(messages, SendMsg{source, NewSuccessSignal(signal.ID())})
} }
case "remove": case "remove":
@ -162,7 +162,7 @@ func (ext *LockableExt) HandleLinkSignal(ctx *Context, node *Node, source NodeID
messages = append(messages, SendMsg{source, NewErrorSignal(signal.ID(), "can't link: not_requirement")}) messages = append(messages, SendMsg{source, NewErrorSignal(signal.ID(), "can't link: not_requirement")})
} else { } else {
delete(ext.Requirements, signal.NodeID) delete(ext.Requirements, signal.NodeID)
changes.Add("requirements") changes = append(changes, "requirements")
messages = append(messages, SendMsg{source, NewSuccessSignal(signal.ID())}) messages = append(messages, SendMsg{source, NewSuccessSignal(signal.ID())})
} }
default: default:
@ -203,10 +203,10 @@ func (ext *LockableExt) HandleSuccessSignal(ctx *Context, node *Node, source Nod
ctx.Log.Logf("lockable", "WHOLE LOCK: %s - %s - %+v", node.ID, ext.PendingID, ext.PendingOwner) ctx.Log.Logf("lockable", "WHOLE LOCK: %s - %s - %+v", node.ID, ext.PendingID, ext.PendingOwner)
ext.State = Locked ext.State = Locked
ext.Owner = ext.PendingOwner ext.Owner = ext.PendingOwner
changes.Add("state", "owner", "requirements") changes = append(changes, "state", "owner", "requirements")
messages = append(messages, SendMsg{*ext.Owner, NewSuccessSignal(ext.PendingID)}) messages = append(messages, SendMsg{*ext.Owner, NewSuccessSignal(ext.PendingID)})
} else { } else {
changes.Add("requirements") changes = append(changes, "requirements")
ctx.Log.Logf("lockable", "PARTIAL LOCK: %s - %d/%d", node.ID, locked, len(ext.Requirements)) ctx.Log.Logf("lockable", "PARTIAL LOCK: %s - %d/%d", node.ID, locked, len(ext.Requirements))
} }
case AbortingLock: case AbortingLock:
@ -239,15 +239,15 @@ func (ext *LockableExt) HandleSuccessSignal(ctx *Context, node *Node, source Nod
previous_owner := *ext.Owner previous_owner := *ext.Owner
ext.Owner = ext.PendingOwner ext.Owner = ext.PendingOwner
ext.ReqID = nil ext.ReqID = nil
changes.Add("state", "owner", "req_id") changes = append(changes, "state", "owner", "req_id")
messages = append(messages, SendMsg{previous_owner, NewSuccessSignal(ext.PendingID)}) messages = append(messages, SendMsg{previous_owner, NewSuccessSignal(ext.PendingID)})
} else if old_state == AbortingLock { } else if old_state == AbortingLock {
changes.Add("state", "pending_owner") changes = append(changes, "state", "pending_owner")
messages = append(messages, SendMsg{*ext.PendingOwner, NewErrorSignal(*ext.ReqID, "not_unlocked")}) messages = append(messages, SendMsg{*ext.PendingOwner, NewErrorSignal(*ext.ReqID, "not_unlocked")})
ext.PendingOwner = ext.Owner ext.PendingOwner = ext.Owner
} }
} else { } else {
changes.Add("state") changes = append(changes, "state")
ctx.Log.Logf("lockable", "PARTIAL UNLOCK: %s - %d/%d", node.ID, unlocked, len(ext.Requirements)) ctx.Log.Logf("lockable", "PARTIAL UNLOCK: %s - %d/%d", node.ID, unlocked, len(ext.Requirements))
} }
} }
@ -271,7 +271,7 @@ func (ext *LockableExt) HandleLockSignal(ctx *Context, node *Node, source NodeID
new_owner := source new_owner := source
ext.PendingOwner = &new_owner ext.PendingOwner = &new_owner
ext.Owner = &new_owner ext.Owner = &new_owner
changes.Add("state", "pending_owner", "owner") changes = append(changes, "state", "pending_owner", "owner")
messages = append(messages, SendMsg{new_owner, NewSuccessSignal(signal.ID())}) messages = append(messages, SendMsg{new_owner, NewSuccessSignal(signal.ID())})
} else { } else {
ext.State = Locking ext.State = Locking
@ -280,7 +280,7 @@ func (ext *LockableExt) HandleLockSignal(ctx *Context, node *Node, source NodeID
new_owner := source new_owner := source
ext.PendingOwner = &new_owner ext.PendingOwner = &new_owner
ext.PendingID = signal.ID() ext.PendingID = signal.ID()
changes.Add("state", "req_id", "pending_owner", "pending_id") changes = append(changes, "state", "req_id", "pending_owner", "pending_id")
for id, state := range(ext.Requirements) { for id, state := range(ext.Requirements) {
if state != Unlocked { if state != Unlocked {
ctx.Log.Logf("lockable", "REQ_NOT_UNLOCKED_WHEN_LOCKING") ctx.Log.Logf("lockable", "REQ_NOT_UNLOCKED_WHEN_LOCKING")
@ -304,7 +304,7 @@ func (ext *LockableExt) HandleLockSignal(ctx *Context, node *Node, source NodeID
new_owner := source new_owner := source
ext.PendingOwner = nil ext.PendingOwner = nil
ext.Owner = nil ext.Owner = nil
changes.Add("state", "pending_owner", "owner") changes = append(changes, "state", "pending_owner", "owner")
messages = append(messages, SendMsg{new_owner, NewSuccessSignal(signal.ID())}) messages = append(messages, SendMsg{new_owner, NewSuccessSignal(signal.ID())})
} else if source == *ext.Owner { } else if source == *ext.Owner {
ext.State = Unlocking ext.State = Unlocking
@ -312,7 +312,7 @@ func (ext *LockableExt) HandleLockSignal(ctx *Context, node *Node, source NodeID
ext.ReqID = &id ext.ReqID = &id
ext.PendingOwner = nil ext.PendingOwner = nil
ext.PendingID = signal.ID() ext.PendingID = signal.ID()
changes.Add("state", "pending_owner", "pending_id", "req_id") changes = append(changes, "state", "pending_owner", "pending_id", "req_id")
for id, state := range(ext.Requirements) { for id, state := range(ext.Requirements) {
if state != Locked { if state != Locked {
ctx.Log.Logf("lockable", "REQ_NOT_LOCKED_WHEN_UNLOCKING") ctx.Log.Logf("lockable", "REQ_NOT_LOCKED_WHEN_UNLOCKING")
@ -340,7 +340,7 @@ func (ext *LockableExt) HandleTimeoutSignal(ctx *Context, node *Node, source Nod
wait_info, found := node.ProcessResponse(ext.WaitInfos, signal) wait_info, found := node.ProcessResponse(ext.WaitInfos, signal)
if found == true { if found == true {
changes.Add("wait_infos") changes = append(changes, "wait_infos")
state, found := ext.Requirements[wait_info.Destination] state, found := ext.Requirements[wait_info.Destination]
if found == true { if found == true {
ctx.Log.Logf("lockable", "%s timed out %s while %s was %s", wait_info.Destination, ReqStateStrings[state], node.ID, ReqStateStrings[state]) ctx.Log.Logf("lockable", "%s timed out %s while %s was %s", wait_info.Destination, ReqStateStrings[state], node.ID, ReqStateStrings[state])
@ -355,7 +355,7 @@ func (ext *LockableExt) HandleTimeoutSignal(ctx *Context, node *Node, source Nod
} }
} }
if all_unlocked == true { if all_unlocked == true {
changes.Add("state") changes = append(changes, "state")
ext.State = Unlocked ext.State = Unlocked
} }
case Locking: case Locking:

@ -345,6 +345,7 @@ func (node *Node) Process(ctx *Context, source NodeID, signal Signal) error {
} }
if len(ext_changes) != 0 { if len(ext_changes) != 0 {
changes[ext_type] = ext_changes changes[ext_type] = ext_changes
ctx.Log.Logf("changes", "Changes for %s ext[%+v] - %+v", node.ID, ext_type, ext_changes)
} }
} }
ctx.Log.Logf("changes", "Changes for %s after %+v - %+v", node.ID, reflect.TypeOf(signal), changes) ctx.Log.Logf("changes", "Changes for %s after %+v - %+v", node.ID, reflect.TypeOf(signal), changes)