From 3eee736f97c79f7623c3dbd12a10f4d6749ad39c Mon Sep 17 00:00:00 2001 From: Noah Metz Date: Sat, 30 Mar 2024 23:36:50 -0700 Subject: [PATCH] Moved SendMsg and RecvMsg to one object --- context.go | 14 +++++----- extension.go | 2 +- gql.go | 4 +-- gql_node.go | 4 +-- graph_test.go | 2 +- listener.go | 2 +- lockable.go | 70 ++++++++++++++++++++++++------------------------ lockable_test.go | 4 +-- message.go | 9 ++----- node.go | 31 +++++++++++---------- node_test.go | 2 +- 11 files changed, 69 insertions(+), 75 deletions(-) diff --git a/context.go b/context.go index 901276c..52a575f 100644 --- a/context.go +++ b/context.go @@ -935,22 +935,22 @@ func (ctx *Context) getNode(id NodeID) (*Node, error) { } // Route Messages to dest. Currently only local context routing is supported -func (ctx *Context) Send(node *Node, messages []SendMsg) error { +func (ctx *Context) Send(node *Node, messages []Message) error { for _, msg := range(messages) { - ctx.Log.Logf("signal", "Sending %s to %s", msg.Signal, msg.Dest) - if msg.Dest == ZeroID { + ctx.Log.Logf("signal", "Sending %s to %s", msg.Signal, msg.Node) + if msg.Node == ZeroID { panic("Can't send to null ID") } - target, err := ctx.getNode(msg.Dest) + target, err := ctx.getNode(msg.Node) if err == nil { select { - case target.MsgChan <- RecvMsg{node.ID, msg.Signal}: - ctx.Log.Logf("signal_sent", "Sent %s to %s", msg.Signal, msg.Dest) + case target.MsgChan <- Message{node.ID, msg.Signal}: + ctx.Log.Logf("signal_sent", "Sent %s to %s", msg.Signal, msg.Node) default: buf := make([]byte, 4096) n := runtime.Stack(buf, false) stack_str := string(buf[:n]) - return fmt.Errorf("SIGNAL_OVERFLOW: %s - %s", msg.Dest, stack_str) + return fmt.Errorf("SIGNAL_OVERFLOW: %s - %s", msg.Node, stack_str) } } else if errors.Is(err, NodeNotFoundError) { // TODO: Handle finding nodes in other contexts diff --git a/extension.go b/extension.go index cc700a2..5d243a9 100644 --- a/extension.go +++ b/extension.go @@ -6,7 +6,7 @@ type Changes []Tag // Extensions are data attached to nodes that process signals type Extension interface { // Called to process incoming signals, returning changes and messages to send - Process(*Context, *Node, NodeID, Signal) ([]SendMsg, Changes) + Process(*Context, *Node, NodeID, Signal) ([]Message, Changes) // Called when the node is loaded into a context(creation or move), so extension data can be initialized Load(*Context, *Node) error diff --git a/gql.go b/gql.go index 00f8aeb..372376a 100644 --- a/gql.go +++ b/gql.go @@ -624,10 +624,10 @@ func (ext *GQLExt) FreeResponseChannel(req_id uuid.UUID) chan Signal { return response_chan } -func (ext *GQLExt) Process(ctx *Context, node *Node, source NodeID, signal Signal) ([]SendMsg, Changes) { +func (ext *GQLExt) Process(ctx *Context, node *Node, source NodeID, signal Signal) ([]Message, Changes) { // Process ReadResultSignalType by forwarding it to the waiting resolver var changes Changes = nil - var messages []SendMsg = nil + var messages []Message = nil switch sig := signal.(type) { case *SuccessSignal: diff --git a/gql_node.go b/gql_node.go index fe4a884..b9af05a 100644 --- a/gql_node.go +++ b/gql_node.go @@ -109,8 +109,8 @@ func ResolveNode(id NodeID, p graphql.ResolveParams) (NodeResult, error) { signal := NewReadSignal(not_cached) response_chan := ctx.Ext.GetResponseChannel(signal.ID()) // TODO: TIMEOUT DURATION - err = ctx.Context.Send(ctx.Server, []SendMsg{{ - Dest: id, + err = ctx.Context.Send(ctx.Server, []Message{{ + Node: id, Signal: signal, }}) if err != nil { diff --git a/graph_test.go b/graph_test.go index f0e70e8..c823ff6 100644 --- a/graph_test.go +++ b/graph_test.go @@ -49,7 +49,7 @@ func testSend(t *testing.T, ctx *Context, signal Signal, source, destination *No source_listener, err := GetExt[ListenerExt](source) fatalErr(t, err) - messages := []SendMsg{{destination.ID, signal}} + messages := []Message{{destination.ID, signal}} fatalErr(t, ctx.Send(source, messages)) response, signals, err := WaitForResponse(source_listener.Chan, time.Millisecond*10, signal.ID()) diff --git a/listener.go b/listener.go index 719b6db..9b0267e 100644 --- a/listener.go +++ b/listener.go @@ -50,7 +50,7 @@ func NewListenerExt(buffer int) *ListenerExt { } // Send the signal to the channel, logging an overflow if it occurs -func (ext *ListenerExt) Process(ctx *Context, node *Node, source NodeID, signal Signal) ([]SendMsg, Changes) { +func (ext *ListenerExt) Process(ctx *Context, node *Node, source NodeID, signal Signal) ([]Message, Changes) { ctx.Log.Logf("listener", "%s - %+v", node.ID, reflect.TypeOf(signal)) ctx.Log.Logf("listener_debug", "%s->%s - %+v", source, node.ID, signal) select { diff --git a/lockable.go b/lockable.go index 67836a3..3546935 100644 --- a/lockable.go +++ b/lockable.go @@ -84,13 +84,13 @@ func NewLockableExt(requirements []NodeID) *LockableExt { func UnlockLockable(ctx *Context, node *Node) (uuid.UUID, error) { signal := NewUnlockSignal() - messages := []SendMsg{{node.ID, signal}} + messages := []Message{{node.ID, signal}} return signal.ID(), ctx.Send(node, messages) } func LockLockable(ctx *Context, node *Node) (uuid.UUID, error) { signal := NewLockSignal() - messages := []SendMsg{{node.ID, signal}} + messages := []Message{{node.ID, signal}} return signal.ID(), ctx.Send(node, messages) } @@ -104,8 +104,8 @@ func (ext *LockableExt) Unload(ctx *Context, node *Node) { // Handle link signal by adding/removing the requested NodeID // returns an error if the node is not unlocked -func (ext *LockableExt) HandleLinkSignal(ctx *Context, node *Node, source NodeID, signal *LinkSignal) ([]SendMsg, Changes) { - var messages []SendMsg = nil +func (ext *LockableExt) HandleLinkSignal(ctx *Context, node *Node, source NodeID, signal *LinkSignal) ([]Message, Changes) { + var messages []Message = nil var changes Changes = nil switch ext.State { @@ -114,29 +114,29 @@ func (ext *LockableExt) HandleLinkSignal(ctx *Context, node *Node, source NodeID case "add": _, exists := ext.Requirements[signal.NodeID] if exists == true { - messages = append(messages, SendMsg{source, NewErrorSignal(signal.ID(), "already_requirement")}) + messages = append(messages, Message{source, NewErrorSignal(signal.ID(), "already_requirement")}) } else { if ext.Requirements == nil { ext.Requirements = map[NodeID]ReqState{} } ext.Requirements[signal.NodeID] = Unlocked changes = append(changes, "requirements") - messages = append(messages, SendMsg{source, NewSuccessSignal(signal.ID())}) + messages = append(messages, Message{source, NewSuccessSignal(signal.ID())}) } case "remove": _, exists := ext.Requirements[signal.NodeID] if exists == false { - messages = append(messages, SendMsg{source, NewErrorSignal(signal.ID(), "not_requirement")}) + messages = append(messages, Message{source, NewErrorSignal(signal.ID(), "not_requirement")}) } else { delete(ext.Requirements, signal.NodeID) changes = append(changes, "requirements") - messages = append(messages, SendMsg{source, NewSuccessSignal(signal.ID())}) + messages = append(messages, Message{source, NewSuccessSignal(signal.ID())}) } default: - messages = append(messages, SendMsg{source, NewErrorSignal(signal.ID(), "unknown_action")}) + messages = append(messages, Message{source, NewErrorSignal(signal.ID(), "unknown_action")}) } default: - messages = append(messages, SendMsg{source, NewErrorSignal(signal.ID(), "not_unlocked: %s", ext.State)}) + messages = append(messages, Message{source, NewErrorSignal(signal.ID(), "not_unlocked: %s", ext.State)}) } return messages, changes @@ -144,14 +144,14 @@ func (ext *LockableExt) HandleLinkSignal(ctx *Context, node *Node, source NodeID // Handle an UnlockSignal by either transitioning to Unlocked state, // sending unlock signals to requirements, or returning an error signal -func (ext *LockableExt) HandleUnlockSignal(ctx *Context, node *Node, source NodeID, signal *UnlockSignal) ([]SendMsg, Changes) { - var messages []SendMsg = nil +func (ext *LockableExt) HandleUnlockSignal(ctx *Context, node *Node, source NodeID, signal *UnlockSignal) ([]Message, Changes) { + var messages []Message = nil var changes Changes = nil switch ext.State { case Locked: if source != *ext.Owner { - messages = append(messages, SendMsg{source, NewErrorSignal(signal.Id, "not_owner")}) + messages = append(messages, Message{source, NewErrorSignal(signal.Id, "not_owner")}) } else { if len(ext.Requirements) == 0 { changes = append(changes, "state", "owner", "pending_owner") @@ -162,7 +162,7 @@ func (ext *LockableExt) HandleUnlockSignal(ctx *Context, node *Node, source Node ext.State = Unlocked - messages = append(messages, SendMsg{source, NewSuccessSignal(signal.Id)}) + messages = append(messages, Message{source, NewSuccessSignal(signal.Id)}) } else { changes = append(changes, "state", "waiting", "requirements", "pending_owner") @@ -177,12 +177,12 @@ func (ext *LockableExt) HandleUnlockSignal(ctx *Context, node *Node, source Node ext.Waiting[unlock_signal.Id] = id ext.Requirements[id] = Unlocking - messages = append(messages, SendMsg{id, unlock_signal}) + messages = append(messages, Message{id, unlock_signal}) } } } default: - messages = append(messages, SendMsg{source, NewErrorSignal(signal.Id, "not_locked")}) + messages = append(messages, Message{source, NewErrorSignal(signal.Id, "not_locked")}) } return messages, changes @@ -190,8 +190,8 @@ func (ext *LockableExt) HandleUnlockSignal(ctx *Context, node *Node, source Node // Handle a LockSignal by either transitioning to a locked state, // sending lock signals to requirements, or returning an error signal -func (ext *LockableExt) HandleLockSignal(ctx *Context, node *Node, source NodeID, signal *LockSignal) ([]SendMsg, Changes) { - var messages []SendMsg = nil +func (ext *LockableExt) HandleLockSignal(ctx *Context, node *Node, source NodeID, signal *LockSignal) ([]Message, Changes) { + var messages []Message = nil var changes Changes = nil switch ext.State { @@ -204,7 +204,7 @@ func (ext *LockableExt) HandleLockSignal(ctx *Context, node *Node, source NodeID ext.PendingOwner = &source ext.State = Locked - messages = append(messages, SendMsg{source, NewSuccessSignal(signal.Id)}) + messages = append(messages, Message{source, NewSuccessSignal(signal.Id)}) } else { changes = append(changes, "state", "requirements", "waiting", "pending_owner") @@ -219,19 +219,19 @@ func (ext *LockableExt) HandleLockSignal(ctx *Context, node *Node, source NodeID ext.Waiting[lock_signal.Id] = id ext.Requirements[id] = Locking - messages = append(messages, SendMsg{id, lock_signal}) + messages = append(messages, Message{id, lock_signal}) } } default: - messages = append(messages, SendMsg{source, NewErrorSignal(signal.Id, "not_unlocked: %s", ext.State)}) + messages = append(messages, Message{source, NewErrorSignal(signal.Id, "not_unlocked: %s", ext.State)}) } return messages, changes } // Handle an error signal by aborting the lock, or retrying the unlock -func (ext *LockableExt) HandleErrorSignal(ctx *Context, node *Node, source NodeID, signal *ErrorSignal) ([]SendMsg, Changes) { - var messages []SendMsg = nil +func (ext *LockableExt) HandleErrorSignal(ctx *Context, node *Node, source NodeID, signal *ErrorSignal) ([]Message, Changes) { + var messages []Message = nil var changes Changes = nil id, waiting := ext.Waiting[signal.ReqID] @@ -255,7 +255,7 @@ func (ext *LockableExt) HandleErrorSignal(ctx *Context, node *Node, source NodeI ext.Waiting[unlock_signal.Id] = req_id ext.Requirements[req_id] = Unlocking - messages = append(messages, SendMsg{req_id, unlock_signal}) + messages = append(messages, Message{req_id, unlock_signal}) case Unlocked: unlocked += 1 } @@ -273,7 +273,7 @@ func (ext *LockableExt) HandleErrorSignal(ctx *Context, node *Node, source NodeI case Unlocking: unlock_signal := NewUnlockSignal() ext.Waiting[unlock_signal.Id] = id - messages = append(messages, SendMsg{id, unlock_signal}) + messages = append(messages, Message{id, unlock_signal}) case AbortingLock: req_state := ext.Requirements[id] @@ -299,7 +299,7 @@ func (ext *LockableExt) HandleErrorSignal(ctx *Context, node *Node, source NodeI // Handle error for unlocking requirement while unlocking by retrying unlock unlock_signal := NewUnlockSignal() ext.Waiting[unlock_signal.Id] = id - messages = append(messages, SendMsg{id, unlock_signal}) + messages = append(messages, Message{id, unlock_signal}) } } } @@ -308,8 +308,8 @@ func (ext *LockableExt) HandleErrorSignal(ctx *Context, node *Node, source NodeI } // Handle a success signal by checking if all requirements have been locked/unlocked -func (ext *LockableExt) HandleSuccessSignal(ctx *Context, node *Node, source NodeID, signal *SuccessSignal) ([]SendMsg, Changes) { - var messages []SendMsg = nil +func (ext *LockableExt) HandleSuccessSignal(ctx *Context, node *Node, source NodeID, signal *SuccessSignal) ([]Message, Changes) { + var messages []Message = nil var changes Changes = nil id, waiting := ext.Waiting[signal.ReqID] @@ -330,7 +330,7 @@ func (ext *LockableExt) HandleSuccessSignal(ctx *Context, node *Node, source Nod ext.Owner = ext.PendingOwner - messages = append(messages, SendMsg{*ext.Owner, NewSuccessSignal(*ext.ReqID)}) + messages = append(messages, Message{*ext.Owner, NewSuccessSignal(*ext.ReqID)}) ext.ReqID = nil } else { ctx.Log.Logf("lockable", "%s PARTIAL_LOCK: %d/%d", node.ID, len(ext.Locked), len(ext.Requirements)) @@ -342,7 +342,7 @@ func (ext *LockableExt) HandleSuccessSignal(ctx *Context, node *Node, source Nod ext.Requirements[id] = Unlocking unlock_signal := NewUnlockSignal() ext.Waiting[unlock_signal.Id] = id - messages = append(messages, SendMsg{id, unlock_signal}) + messages = append(messages, Message{id, unlock_signal}) case Unlocking: ext.Requirements[id] = Unlocked ext.Unlocked[id] = nil @@ -359,7 +359,7 @@ func (ext *LockableExt) HandleSuccessSignal(ctx *Context, node *Node, source Nod if unlocked == len(ext.Requirements) { changes = append(changes, "state", "pending_owner", "req_id") - messages = append(messages, SendMsg{*ext.PendingOwner, NewErrorSignal(*ext.ReqID, "not_unlocked: %s", ext.State)}) + messages = append(messages, Message{*ext.PendingOwner, NewErrorSignal(*ext.ReqID, "not_unlocked: %s", ext.State)}) ext.State = Unlocked ext.ReqID = nil ext.PendingOwner = nil @@ -375,7 +375,7 @@ func (ext *LockableExt) HandleSuccessSignal(ctx *Context, node *Node, source Nod if len(ext.Unlocked) == len(ext.Requirements) { changes = append(changes, "state", "owner", "req_id") - messages = append(messages, SendMsg{*ext.Owner, NewSuccessSignal(*ext.ReqID)}) + messages = append(messages, Message{*ext.Owner, NewSuccessSignal(*ext.ReqID)}) ext.State = Unlocked ext.ReqID = nil ext.Owner = nil @@ -386,8 +386,8 @@ func (ext *LockableExt) HandleSuccessSignal(ctx *Context, node *Node, source Nod return messages, changes } -func (ext *LockableExt) Process(ctx *Context, node *Node, source NodeID, signal Signal) ([]SendMsg, Changes) { - var messages []SendMsg = nil +func (ext *LockableExt) Process(ctx *Context, node *Node, source NodeID, signal Signal) ([]Message, Changes) { + var messages []Message = nil var changes Changes = nil switch sig := signal.(type) { @@ -395,7 +395,7 @@ func (ext *LockableExt) Process(ctx *Context, node *Node, source NodeID, signal // Forward StatusSignals up to the owner(unless that would be a cycle) if ext.Owner != nil { if *ext.Owner != node.ID { - messages = append(messages, SendMsg{*ext.Owner, signal}) + messages = append(messages, Message{*ext.Owner, signal}) } } case *LinkSignal: diff --git a/lockable_test.go b/lockable_test.go index 16ef676..4a46053 100644 --- a/lockable_test.go +++ b/lockable_test.go @@ -19,7 +19,7 @@ func TestLink(t *testing.T) { fatalErr(t, err) link_signal := NewLinkSignal("add", l2.ID) - msgs := []SendMsg{{l1.ID, link_signal}} + msgs := []Message{{l1.ID, link_signal}} err = ctx.Send(l1, msgs) fatalErr(t, err) @@ -34,7 +34,7 @@ func TestLink(t *testing.T) { } unlink_signal := NewLinkSignal("remove", l2.ID) - msgs = []SendMsg{{l1.ID, unlink_signal}} + msgs = []Message{{l1.ID, unlink_signal}} err = ctx.Send(l1, msgs) fatalErr(t, err) diff --git a/message.go b/message.go index bf5fc1e..df7c5f6 100644 --- a/message.go +++ b/message.go @@ -1,11 +1,6 @@ package graphvent -type SendMsg struct { - Dest NodeID - Signal Signal -} - -type RecvMsg struct { - Source NodeID +type Message struct { + Node NodeID Signal Signal } diff --git a/node.go b/node.go index df82df3..9c141a1 100644 --- a/node.go +++ b/node.go @@ -88,9 +88,15 @@ func NewQueue[T any](initial int) *Queue[T] { } go func(queue *Queue[T]) { - }(&queue) + if len(queue.buffer) == 0 { + select { - go func(queue *Queue[T]) { + } + } else { + select { + + } + } }(&queue) return &queue @@ -113,7 +119,7 @@ type Node struct { Extensions map[ExtType]Extension // Channel for this node to receive messages from the Context - MsgChan chan RecvMsg + MsgChan chan Message // Size of MsgChan BufferSize uint32 `gv:"buffer_size"` // Channel for this node to process delayed signals @@ -132,7 +138,7 @@ func (node *Node) PostDeserialize(ctx *Context) error { public := node.Key.Public().(ed25519.PublicKey) node.ID = KeyID(public) - node.MsgChan = make(chan RecvMsg, node.BufferSize) + node.MsgChan = make(chan Message, node.BufferSize) return nil } @@ -257,7 +263,6 @@ func nodeLoop(ctx *Context, node *Node, status chan string, control chan string) var signal Signal var source NodeID - select { case command := <-control: switch command { @@ -305,17 +310,15 @@ func nodeLoop(ctx *Context, node *Node, status chan string, control chan string) } case msg := <- node.MsgChan: signal = msg.Signal - source = msg.Source + source = msg.Node } - ctx.Log.Logf("node", "NODE_SIGNAL_QUEUE[%s]: %+v", node.ID, node.SignalQueue) - switch sig := signal.(type) { case *ReadSignal: result := node.ReadFields(ctx, sig.Fields) - msgs := []SendMsg{} - msgs = append(msgs, SendMsg{source, NewReadResultSignal(sig.ID(), node.ID, node.Type, result)}) + msgs := []Message{} + msgs = append(msgs, Message{source, NewReadResultSignal(sig.ID(), node.ID, node.Type, result)}) ctx.Send(node, msgs) default: @@ -367,21 +370,17 @@ func (node *Node) QueueChanges(ctx *Context, changes map[ExtType]Changes) error } func (node *Node) Process(ctx *Context, source NodeID, signal Signal) error { - ctx.Log.Logf("node_process", "PROCESSING MESSAGE: %s - %+v", node.ID, signal) - messages := []SendMsg{} + messages := []Message{} changes := map[ExtType]Changes{} for ext_type, ext := range(node.Extensions) { - ctx.Log.Logf("node_process", "PROCESSING_EXTENSION: %s/%s", node.ID, ext_type) ext_messages, ext_changes := ext.Process(ctx, node, source, signal) if len(ext_messages) != 0 { messages = append(messages, ext_messages...) } if len(ext_changes) != 0 { changes[ext_type] = ext_changes - ctx.Log.Logf("changes", "Changes for %s ext[%+v] - %+v", node.ID, ext_type, ext_changes) } } - ctx.Log.Logf("changes", "Changes for %s after %+v - %+v", node.ID, reflect.TypeOf(signal), changes) if len(messages) != 0 { send_err := ctx.Send(node, messages) @@ -490,7 +489,7 @@ func NewNode(ctx *Context, key ed25519.PrivateKey, type_name string, buffer_size ID: id, Type: node_type, Extensions: ext_map, - MsgChan: make(chan RecvMsg, buffer_size), + MsgChan: make(chan Message, buffer_size), BufferSize: buffer_size, SignalQueue: []QueuedSignal{}, writeSignalQueue: false, diff --git a/node_test.go b/node_test.go index cf63b13..a7445b3 100644 --- a/node_test.go +++ b/node_test.go @@ -48,7 +48,7 @@ func TestNodeRead(t *testing.T) { fatalErr(t, err) read_sig := NewReadSignal([]string{"buffer"}) - msgs := []SendMsg{{n1.ID, read_sig}} + msgs := []Message{{n1.ID, read_sig}} err = ctx.Send(n2, msgs) fatalErr(t, err)