From 0bced58fd1b158ae98dd15128d21adcaf6c84deb Mon Sep 17 00:00:00 2001 From: Noah Metz Date: Thu, 21 Mar 2024 14:13:54 -0600 Subject: [PATCH] Fixed GQL issues, started docs --- context.go | 65 +++++++++++++++++++++++++++++++++++++++++++++++++++- event.go | 2 +- extension.go | 12 ++-------- gql_test.go | 21 ++++++++++------- lockable.go | 32 +++++++++++++------------- node.go | 1 + 6 files changed, 97 insertions(+), 36 deletions(-) diff --git a/context.go b/context.go index 19211f9..f18535f 100644 --- a/context.go +++ b/context.go @@ -583,6 +583,49 @@ func RegisterObject[T any](ctx *Context) error { return nil } +func RegisterObjectNoGQL[T any](ctx *Context) error { + reflect_type := reflect.TypeFor[T]() + serialized_type := SerializedTypeFor[T]() + + _, exists := ctx.TypeTypes[reflect_type] + if exists { + return fmt.Errorf("%+v already registered in TypeMap", reflect_type) + } + + field_infos := map[FieldTag]FieldInfo{} + + post_deserialize, post_deserialize_exists := reflect.PointerTo(reflect_type).MethodByName("PostDeserialize") + post_deserialize_index := -1 + if post_deserialize_exists { + post_deserialize_index = post_deserialize.Index + } + + for _, field := range(reflect.VisibleFields(reflect_type)) { + gv_tag, tagged_gv := field.Tag.Lookup("gv") + if tagged_gv { + node_tag := field.Tag.Get("node") + field_infos[GetFieldTag(gv_tag)] = FieldInfo{ + Type: field.Type, + Index: field.Index, + NodeTag: node_tag, + Tag: gv_tag, + } + } + } + + ctx.TypeMap[serialized_type] = &TypeInfo{ + PostDeserializeIndex: post_deserialize_index, + Serialized: serialized_type, + Reflect: reflect_type, + Fields: field_infos, + Type: nil, + Resolve: nil, + } + ctx.TypeTypes[reflect_type] = ctx.TypeMap[serialized_type] + + return nil +} + func identity(value interface{}) interface{} { return value } @@ -938,12 +981,17 @@ func NewContext(db * badger.DB, log Logger) (*Context, error) { if err != nil { return nil, err } - + err = RegisterInterface[Extension](ctx) if err != nil { return nil, err } + err = RegisterInterface[Signal](ctx) + if err != nil { + return nil, err + } + err = RegisterScalar[NodeType](ctx, identity, coerce[NodeType], astInt[NodeType], nil) if err != nil { return nil, err @@ -1004,6 +1052,21 @@ func NewContext(db * badger.DB, log Logger) (*Context, error) { return nil, err } + err = RegisterObjectNoGQL[QueuedSignal](ctx) + if err != nil { + return nil, err + } + + err = RegisterObjectNoGQL[TimeoutSignal](ctx) + if err != nil { + return nil, err + } + + err = RegisterObjectNoGQL[StatusSignal](ctx) + if err != nil { + return nil, err + } + err = RegisterObject[Node](ctx) if err != nil { return nil, err diff --git a/event.go b/event.go index 5f05903..273ebb1 100644 --- a/event.go +++ b/event.go @@ -69,7 +69,7 @@ func NewEventControlSignal(command EventCommand) *EventControlSignal { func (ext *EventExt) UpdateState(node *Node, changes Changes, state EventState, state_start time.Time) { if ext.State != state { ext.StateStart = state_start - changes.Add("state") + changes = append(changes, "state") ext.State = state node.QueueSignal(time.Now(), NewEventStateSignal(node.ID, ext.State, time.Now())) } diff --git a/extension.go b/extension.go index 5747e67..0370a0f 100644 --- a/extension.go +++ b/extension.go @@ -1,8 +1,7 @@ package graphvent -import ( - -) +type Change string +type Changes []Change // Extensions are data attached to nodes that process signals type Extension interface { @@ -15,10 +14,3 @@ type Extension interface { // Called when the node is unloaded from a context(deletion or move), so extension data can be cleaned up Unload(*Context, *Node) } - -// Changes are lists of modifications made to extensions to be communicated -type Changes []string -func (changes *Changes) Add(fields ...string) { - new_changes := append(*changes, fields...) - changes = &new_changes -} diff --git a/gql_test.go b/gql_test.go index de4b31f..8697468 100644 --- a/gql_test.go +++ b/gql_test.go @@ -18,7 +18,7 @@ import ( func TestGQLSubscribe(t *testing.T) { - ctx := logTestContext(t, []string{"test"}) + ctx := logTestContext(t, []string{"test", "listener", "changes"}) n1, err := NewNode(ctx, nil, "Lockable", 10, NewLockableExt(nil)) fatalErr(t, err) @@ -35,7 +35,7 @@ func TestGQLSubscribe(t *testing.T) { ctx.Log.Logf("test", "NODE: %s", n1.ID) sub_1 := GQLPayload{ - Query: "subscription Self { Self { ID, Type } }", + Query: "subscription { Self { ID, Type ... on Lockable { lockable_state } } }", } port := gql_ext.tcp_listener.Addr().(*net.TCPAddr).Port @@ -96,14 +96,19 @@ func TestGQLSubscribe(t *testing.T) { fatalErr(t, err) ctx.Log.Logf("test", "SUB: %s", resp[:n]) - err = ctx.Send(gql, []SendMsg{{ - Dest: gql.ID, - Signal: NewStatusSignal(gql.ID, map[ExtType]Changes{ - ExtTypeFor[GQLExt](): {"state"}, - }), - }}) + lock_id, err := LockLockable(ctx, gql) fatalErr(t, err) + response, _, err := WaitForResponse(listener_ext.Chan, 100*time.Millisecond, lock_id) + fatalErr(t, err) + + switch response.(type) { + case *SuccessSignal: + ctx.Log.Logf("test", "Locked %s", gql.ID) + default: + t.Errorf("Unexpected lock response: %s", response) + } + n, err = ws.Read(resp) fatalErr(t, err) ctx.Log.Logf("test", "SUB: %s", resp[:n]) diff --git a/lockable.go b/lockable.go index 05ab54c..0906a34 100644 --- a/lockable.go +++ b/lockable.go @@ -76,7 +76,7 @@ func (ext *LockableExt) HandleErrorSignal(ctx *Context, node *Node, source NodeI if info_found { state, found := ext.Requirements[info.Destination] if found == true { - changes.Add("wait_infos") + changes = append(changes, "wait_infos") ctx.Log.Logf("lockable", "got mapped response %+v for %+v in state %s while in %s", signal, info, ReqStateStrings[state], ReqStateStrings[ext.State]) switch ext.State { case AbortingLock: @@ -89,11 +89,11 @@ func (ext *LockableExt) HandleErrorSignal(ctx *Context, node *Node, source NodeI } } if all_unlocked == true { - changes.Add("state") + changes = append(changes, "state") ext.State = Unlocked } case Locking: - changes.Add("state") + changes = append(changes, "state") ext.Requirements[info.Destination] = Unlocked unlocked := 0 for _, state := range(ext.Requirements) { @@ -153,7 +153,7 @@ func (ext *LockableExt) HandleLinkSignal(ctx *Context, node *Node, source NodeID ext.Requirements = map[NodeID]ReqState{} } ext.Requirements[signal.NodeID] = Unlocked - changes.Add("requirements") + changes = append(changes, "requirements") messages = append(messages, SendMsg{source, NewSuccessSignal(signal.ID())}) } case "remove": @@ -162,7 +162,7 @@ func (ext *LockableExt) HandleLinkSignal(ctx *Context, node *Node, source NodeID messages = append(messages, SendMsg{source, NewErrorSignal(signal.ID(), "can't link: not_requirement")}) } else { delete(ext.Requirements, signal.NodeID) - changes.Add("requirements") + changes = append(changes, "requirements") messages = append(messages, SendMsg{source, NewSuccessSignal(signal.ID())}) } default: @@ -203,10 +203,10 @@ func (ext *LockableExt) HandleSuccessSignal(ctx *Context, node *Node, source Nod ctx.Log.Logf("lockable", "WHOLE LOCK: %s - %s - %+v", node.ID, ext.PendingID, ext.PendingOwner) ext.State = Locked ext.Owner = ext.PendingOwner - changes.Add("state", "owner", "requirements") + changes = append(changes, "state", "owner", "requirements") messages = append(messages, SendMsg{*ext.Owner, NewSuccessSignal(ext.PendingID)}) } else { - changes.Add("requirements") + changes = append(changes, "requirements") ctx.Log.Logf("lockable", "PARTIAL LOCK: %s - %d/%d", node.ID, locked, len(ext.Requirements)) } case AbortingLock: @@ -239,15 +239,15 @@ func (ext *LockableExt) HandleSuccessSignal(ctx *Context, node *Node, source Nod previous_owner := *ext.Owner ext.Owner = ext.PendingOwner ext.ReqID = nil - changes.Add("state", "owner", "req_id") + changes = append(changes, "state", "owner", "req_id") messages = append(messages, SendMsg{previous_owner, NewSuccessSignal(ext.PendingID)}) } else if old_state == AbortingLock { - changes.Add("state", "pending_owner") + changes = append(changes, "state", "pending_owner") messages = append(messages, SendMsg{*ext.PendingOwner, NewErrorSignal(*ext.ReqID, "not_unlocked")}) ext.PendingOwner = ext.Owner } } else { - changes.Add("state") + changes = append(changes, "state") ctx.Log.Logf("lockable", "PARTIAL UNLOCK: %s - %d/%d", node.ID, unlocked, len(ext.Requirements)) } } @@ -271,7 +271,7 @@ func (ext *LockableExt) HandleLockSignal(ctx *Context, node *Node, source NodeID new_owner := source ext.PendingOwner = &new_owner ext.Owner = &new_owner - changes.Add("state", "pending_owner", "owner") + changes = append(changes, "state", "pending_owner", "owner") messages = append(messages, SendMsg{new_owner, NewSuccessSignal(signal.ID())}) } else { ext.State = Locking @@ -280,7 +280,7 @@ func (ext *LockableExt) HandleLockSignal(ctx *Context, node *Node, source NodeID new_owner := source ext.PendingOwner = &new_owner ext.PendingID = signal.ID() - changes.Add("state", "req_id", "pending_owner", "pending_id") + changes = append(changes, "state", "req_id", "pending_owner", "pending_id") for id, state := range(ext.Requirements) { if state != Unlocked { ctx.Log.Logf("lockable", "REQ_NOT_UNLOCKED_WHEN_LOCKING") @@ -304,7 +304,7 @@ func (ext *LockableExt) HandleLockSignal(ctx *Context, node *Node, source NodeID new_owner := source ext.PendingOwner = nil ext.Owner = nil - changes.Add("state", "pending_owner", "owner") + changes = append(changes, "state", "pending_owner", "owner") messages = append(messages, SendMsg{new_owner, NewSuccessSignal(signal.ID())}) } else if source == *ext.Owner { ext.State = Unlocking @@ -312,7 +312,7 @@ func (ext *LockableExt) HandleLockSignal(ctx *Context, node *Node, source NodeID ext.ReqID = &id ext.PendingOwner = nil ext.PendingID = signal.ID() - changes.Add("state", "pending_owner", "pending_id", "req_id") + changes = append(changes, "state", "pending_owner", "pending_id", "req_id") for id, state := range(ext.Requirements) { if state != Locked { ctx.Log.Logf("lockable", "REQ_NOT_LOCKED_WHEN_UNLOCKING") @@ -340,7 +340,7 @@ func (ext *LockableExt) HandleTimeoutSignal(ctx *Context, node *Node, source Nod wait_info, found := node.ProcessResponse(ext.WaitInfos, signal) if found == true { - changes.Add("wait_infos") + changes = append(changes, "wait_infos") state, found := ext.Requirements[wait_info.Destination] if found == true { ctx.Log.Logf("lockable", "%s timed out %s while %s was %s", wait_info.Destination, ReqStateStrings[state], node.ID, ReqStateStrings[state]) @@ -355,7 +355,7 @@ func (ext *LockableExt) HandleTimeoutSignal(ctx *Context, node *Node, source Nod } } if all_unlocked == true { - changes.Add("state") + changes = append(changes, "state") ext.State = Unlocked } case Locking: diff --git a/node.go b/node.go index 99da216..547b894 100644 --- a/node.go +++ b/node.go @@ -345,6 +345,7 @@ func (node *Node) Process(ctx *Context, source NodeID, signal Signal) error { } if len(ext_changes) != 0 { changes[ext_type] = ext_changes + ctx.Log.Logf("changes", "Changes for %s ext[%+v] - %+v", node.ID, ext_type, ext_changes) } } ctx.Log.Logf("changes", "Changes for %s after %+v - %+v", node.ID, reflect.TypeOf(signal), changes)