Moved SendMsg and RecvMsg to one object

master
noah metz 2024-03-30 23:36:50 -07:00
parent 7e157068d7
commit 3eee736f97
11 changed files with 69 additions and 75 deletions

@ -935,22 +935,22 @@ func (ctx *Context) getNode(id NodeID) (*Node, error) {
}
// Route Messages to dest. Currently only local context routing is supported
func (ctx *Context) Send(node *Node, messages []SendMsg) error {
func (ctx *Context) Send(node *Node, messages []Message) error {
for _, msg := range(messages) {
ctx.Log.Logf("signal", "Sending %s to %s", msg.Signal, msg.Dest)
if msg.Dest == ZeroID {
ctx.Log.Logf("signal", "Sending %s to %s", msg.Signal, msg.Node)
if msg.Node == ZeroID {
panic("Can't send to null ID")
}
target, err := ctx.getNode(msg.Dest)
target, err := ctx.getNode(msg.Node)
if err == nil {
select {
case target.MsgChan <- RecvMsg{node.ID, msg.Signal}:
ctx.Log.Logf("signal_sent", "Sent %s to %s", msg.Signal, msg.Dest)
case target.MsgChan <- Message{node.ID, msg.Signal}:
ctx.Log.Logf("signal_sent", "Sent %s to %s", msg.Signal, msg.Node)
default:
buf := make([]byte, 4096)
n := runtime.Stack(buf, false)
stack_str := string(buf[:n])
return fmt.Errorf("SIGNAL_OVERFLOW: %s - %s", msg.Dest, stack_str)
return fmt.Errorf("SIGNAL_OVERFLOW: %s - %s", msg.Node, stack_str)
}
} else if errors.Is(err, NodeNotFoundError) {
// TODO: Handle finding nodes in other contexts

@ -6,7 +6,7 @@ type Changes []Tag
// Extensions are data attached to nodes that process signals
type Extension interface {
// Called to process incoming signals, returning changes and messages to send
Process(*Context, *Node, NodeID, Signal) ([]SendMsg, Changes)
Process(*Context, *Node, NodeID, Signal) ([]Message, Changes)
// Called when the node is loaded into a context(creation or move), so extension data can be initialized
Load(*Context, *Node) error

@ -624,10 +624,10 @@ func (ext *GQLExt) FreeResponseChannel(req_id uuid.UUID) chan Signal {
return response_chan
}
func (ext *GQLExt) Process(ctx *Context, node *Node, source NodeID, signal Signal) ([]SendMsg, Changes) {
func (ext *GQLExt) Process(ctx *Context, node *Node, source NodeID, signal Signal) ([]Message, Changes) {
// Process ReadResultSignalType by forwarding it to the waiting resolver
var changes Changes = nil
var messages []SendMsg = nil
var messages []Message = nil
switch sig := signal.(type) {
case *SuccessSignal:

@ -109,8 +109,8 @@ func ResolveNode(id NodeID, p graphql.ResolveParams) (NodeResult, error) {
signal := NewReadSignal(not_cached)
response_chan := ctx.Ext.GetResponseChannel(signal.ID())
// TODO: TIMEOUT DURATION
err = ctx.Context.Send(ctx.Server, []SendMsg{{
Dest: id,
err = ctx.Context.Send(ctx.Server, []Message{{
Node: id,
Signal: signal,
}})
if err != nil {

@ -49,7 +49,7 @@ func testSend(t *testing.T, ctx *Context, signal Signal, source, destination *No
source_listener, err := GetExt[ListenerExt](source)
fatalErr(t, err)
messages := []SendMsg{{destination.ID, signal}}
messages := []Message{{destination.ID, signal}}
fatalErr(t, ctx.Send(source, messages))
response, signals, err := WaitForResponse(source_listener.Chan, time.Millisecond*10, signal.ID())

@ -50,7 +50,7 @@ func NewListenerExt(buffer int) *ListenerExt {
}
// Send the signal to the channel, logging an overflow if it occurs
func (ext *ListenerExt) Process(ctx *Context, node *Node, source NodeID, signal Signal) ([]SendMsg, Changes) {
func (ext *ListenerExt) Process(ctx *Context, node *Node, source NodeID, signal Signal) ([]Message, Changes) {
ctx.Log.Logf("listener", "%s - %+v", node.ID, reflect.TypeOf(signal))
ctx.Log.Logf("listener_debug", "%s->%s - %+v", source, node.ID, signal)
select {

@ -84,13 +84,13 @@ func NewLockableExt(requirements []NodeID) *LockableExt {
func UnlockLockable(ctx *Context, node *Node) (uuid.UUID, error) {
signal := NewUnlockSignal()
messages := []SendMsg{{node.ID, signal}}
messages := []Message{{node.ID, signal}}
return signal.ID(), ctx.Send(node, messages)
}
func LockLockable(ctx *Context, node *Node) (uuid.UUID, error) {
signal := NewLockSignal()
messages := []SendMsg{{node.ID, signal}}
messages := []Message{{node.ID, signal}}
return signal.ID(), ctx.Send(node, messages)
}
@ -104,8 +104,8 @@ func (ext *LockableExt) Unload(ctx *Context, node *Node) {
// Handle link signal by adding/removing the requested NodeID
// returns an error if the node is not unlocked
func (ext *LockableExt) HandleLinkSignal(ctx *Context, node *Node, source NodeID, signal *LinkSignal) ([]SendMsg, Changes) {
var messages []SendMsg = nil
func (ext *LockableExt) HandleLinkSignal(ctx *Context, node *Node, source NodeID, signal *LinkSignal) ([]Message, Changes) {
var messages []Message = nil
var changes Changes = nil
switch ext.State {
@ -114,29 +114,29 @@ func (ext *LockableExt) HandleLinkSignal(ctx *Context, node *Node, source NodeID
case "add":
_, exists := ext.Requirements[signal.NodeID]
if exists == true {
messages = append(messages, SendMsg{source, NewErrorSignal(signal.ID(), "already_requirement")})
messages = append(messages, Message{source, NewErrorSignal(signal.ID(), "already_requirement")})
} else {
if ext.Requirements == nil {
ext.Requirements = map[NodeID]ReqState{}
}
ext.Requirements[signal.NodeID] = Unlocked
changes = append(changes, "requirements")
messages = append(messages, SendMsg{source, NewSuccessSignal(signal.ID())})
messages = append(messages, Message{source, NewSuccessSignal(signal.ID())})
}
case "remove":
_, exists := ext.Requirements[signal.NodeID]
if exists == false {
messages = append(messages, SendMsg{source, NewErrorSignal(signal.ID(), "not_requirement")})
messages = append(messages, Message{source, NewErrorSignal(signal.ID(), "not_requirement")})
} else {
delete(ext.Requirements, signal.NodeID)
changes = append(changes, "requirements")
messages = append(messages, SendMsg{source, NewSuccessSignal(signal.ID())})
messages = append(messages, Message{source, NewSuccessSignal(signal.ID())})
}
default:
messages = append(messages, SendMsg{source, NewErrorSignal(signal.ID(), "unknown_action")})
messages = append(messages, Message{source, NewErrorSignal(signal.ID(), "unknown_action")})
}
default:
messages = append(messages, SendMsg{source, NewErrorSignal(signal.ID(), "not_unlocked: %s", ext.State)})
messages = append(messages, Message{source, NewErrorSignal(signal.ID(), "not_unlocked: %s", ext.State)})
}
return messages, changes
@ -144,14 +144,14 @@ func (ext *LockableExt) HandleLinkSignal(ctx *Context, node *Node, source NodeID
// Handle an UnlockSignal by either transitioning to Unlocked state,
// sending unlock signals to requirements, or returning an error signal
func (ext *LockableExt) HandleUnlockSignal(ctx *Context, node *Node, source NodeID, signal *UnlockSignal) ([]SendMsg, Changes) {
var messages []SendMsg = nil
func (ext *LockableExt) HandleUnlockSignal(ctx *Context, node *Node, source NodeID, signal *UnlockSignal) ([]Message, Changes) {
var messages []Message = nil
var changes Changes = nil
switch ext.State {
case Locked:
if source != *ext.Owner {
messages = append(messages, SendMsg{source, NewErrorSignal(signal.Id, "not_owner")})
messages = append(messages, Message{source, NewErrorSignal(signal.Id, "not_owner")})
} else {
if len(ext.Requirements) == 0 {
changes = append(changes, "state", "owner", "pending_owner")
@ -162,7 +162,7 @@ func (ext *LockableExt) HandleUnlockSignal(ctx *Context, node *Node, source Node
ext.State = Unlocked
messages = append(messages, SendMsg{source, NewSuccessSignal(signal.Id)})
messages = append(messages, Message{source, NewSuccessSignal(signal.Id)})
} else {
changes = append(changes, "state", "waiting", "requirements", "pending_owner")
@ -177,12 +177,12 @@ func (ext *LockableExt) HandleUnlockSignal(ctx *Context, node *Node, source Node
ext.Waiting[unlock_signal.Id] = id
ext.Requirements[id] = Unlocking
messages = append(messages, SendMsg{id, unlock_signal})
messages = append(messages, Message{id, unlock_signal})
}
}
}
default:
messages = append(messages, SendMsg{source, NewErrorSignal(signal.Id, "not_locked")})
messages = append(messages, Message{source, NewErrorSignal(signal.Id, "not_locked")})
}
return messages, changes
@ -190,8 +190,8 @@ func (ext *LockableExt) HandleUnlockSignal(ctx *Context, node *Node, source Node
// Handle a LockSignal by either transitioning to a locked state,
// sending lock signals to requirements, or returning an error signal
func (ext *LockableExt) HandleLockSignal(ctx *Context, node *Node, source NodeID, signal *LockSignal) ([]SendMsg, Changes) {
var messages []SendMsg = nil
func (ext *LockableExt) HandleLockSignal(ctx *Context, node *Node, source NodeID, signal *LockSignal) ([]Message, Changes) {
var messages []Message = nil
var changes Changes = nil
switch ext.State {
@ -204,7 +204,7 @@ func (ext *LockableExt) HandleLockSignal(ctx *Context, node *Node, source NodeID
ext.PendingOwner = &source
ext.State = Locked
messages = append(messages, SendMsg{source, NewSuccessSignal(signal.Id)})
messages = append(messages, Message{source, NewSuccessSignal(signal.Id)})
} else {
changes = append(changes, "state", "requirements", "waiting", "pending_owner")
@ -219,19 +219,19 @@ func (ext *LockableExt) HandleLockSignal(ctx *Context, node *Node, source NodeID
ext.Waiting[lock_signal.Id] = id
ext.Requirements[id] = Locking
messages = append(messages, SendMsg{id, lock_signal})
messages = append(messages, Message{id, lock_signal})
}
}
default:
messages = append(messages, SendMsg{source, NewErrorSignal(signal.Id, "not_unlocked: %s", ext.State)})
messages = append(messages, Message{source, NewErrorSignal(signal.Id, "not_unlocked: %s", ext.State)})
}
return messages, changes
}
// Handle an error signal by aborting the lock, or retrying the unlock
func (ext *LockableExt) HandleErrorSignal(ctx *Context, node *Node, source NodeID, signal *ErrorSignal) ([]SendMsg, Changes) {
var messages []SendMsg = nil
func (ext *LockableExt) HandleErrorSignal(ctx *Context, node *Node, source NodeID, signal *ErrorSignal) ([]Message, Changes) {
var messages []Message = nil
var changes Changes = nil
id, waiting := ext.Waiting[signal.ReqID]
@ -255,7 +255,7 @@ func (ext *LockableExt) HandleErrorSignal(ctx *Context, node *Node, source NodeI
ext.Waiting[unlock_signal.Id] = req_id
ext.Requirements[req_id] = Unlocking
messages = append(messages, SendMsg{req_id, unlock_signal})
messages = append(messages, Message{req_id, unlock_signal})
case Unlocked:
unlocked += 1
}
@ -273,7 +273,7 @@ func (ext *LockableExt) HandleErrorSignal(ctx *Context, node *Node, source NodeI
case Unlocking:
unlock_signal := NewUnlockSignal()
ext.Waiting[unlock_signal.Id] = id
messages = append(messages, SendMsg{id, unlock_signal})
messages = append(messages, Message{id, unlock_signal})
case AbortingLock:
req_state := ext.Requirements[id]
@ -299,7 +299,7 @@ func (ext *LockableExt) HandleErrorSignal(ctx *Context, node *Node, source NodeI
// Handle error for unlocking requirement while unlocking by retrying unlock
unlock_signal := NewUnlockSignal()
ext.Waiting[unlock_signal.Id] = id
messages = append(messages, SendMsg{id, unlock_signal})
messages = append(messages, Message{id, unlock_signal})
}
}
}
@ -308,8 +308,8 @@ func (ext *LockableExt) HandleErrorSignal(ctx *Context, node *Node, source NodeI
}
// Handle a success signal by checking if all requirements have been locked/unlocked
func (ext *LockableExt) HandleSuccessSignal(ctx *Context, node *Node, source NodeID, signal *SuccessSignal) ([]SendMsg, Changes) {
var messages []SendMsg = nil
func (ext *LockableExt) HandleSuccessSignal(ctx *Context, node *Node, source NodeID, signal *SuccessSignal) ([]Message, Changes) {
var messages []Message = nil
var changes Changes = nil
id, waiting := ext.Waiting[signal.ReqID]
@ -330,7 +330,7 @@ func (ext *LockableExt) HandleSuccessSignal(ctx *Context, node *Node, source Nod
ext.Owner = ext.PendingOwner
messages = append(messages, SendMsg{*ext.Owner, NewSuccessSignal(*ext.ReqID)})
messages = append(messages, Message{*ext.Owner, NewSuccessSignal(*ext.ReqID)})
ext.ReqID = nil
} else {
ctx.Log.Logf("lockable", "%s PARTIAL_LOCK: %d/%d", node.ID, len(ext.Locked), len(ext.Requirements))
@ -342,7 +342,7 @@ func (ext *LockableExt) HandleSuccessSignal(ctx *Context, node *Node, source Nod
ext.Requirements[id] = Unlocking
unlock_signal := NewUnlockSignal()
ext.Waiting[unlock_signal.Id] = id
messages = append(messages, SendMsg{id, unlock_signal})
messages = append(messages, Message{id, unlock_signal})
case Unlocking:
ext.Requirements[id] = Unlocked
ext.Unlocked[id] = nil
@ -359,7 +359,7 @@ func (ext *LockableExt) HandleSuccessSignal(ctx *Context, node *Node, source Nod
if unlocked == len(ext.Requirements) {
changes = append(changes, "state", "pending_owner", "req_id")
messages = append(messages, SendMsg{*ext.PendingOwner, NewErrorSignal(*ext.ReqID, "not_unlocked: %s", ext.State)})
messages = append(messages, Message{*ext.PendingOwner, NewErrorSignal(*ext.ReqID, "not_unlocked: %s", ext.State)})
ext.State = Unlocked
ext.ReqID = nil
ext.PendingOwner = nil
@ -375,7 +375,7 @@ func (ext *LockableExt) HandleSuccessSignal(ctx *Context, node *Node, source Nod
if len(ext.Unlocked) == len(ext.Requirements) {
changes = append(changes, "state", "owner", "req_id")
messages = append(messages, SendMsg{*ext.Owner, NewSuccessSignal(*ext.ReqID)})
messages = append(messages, Message{*ext.Owner, NewSuccessSignal(*ext.ReqID)})
ext.State = Unlocked
ext.ReqID = nil
ext.Owner = nil
@ -386,8 +386,8 @@ func (ext *LockableExt) HandleSuccessSignal(ctx *Context, node *Node, source Nod
return messages, changes
}
func (ext *LockableExt) Process(ctx *Context, node *Node, source NodeID, signal Signal) ([]SendMsg, Changes) {
var messages []SendMsg = nil
func (ext *LockableExt) Process(ctx *Context, node *Node, source NodeID, signal Signal) ([]Message, Changes) {
var messages []Message = nil
var changes Changes = nil
switch sig := signal.(type) {
@ -395,7 +395,7 @@ func (ext *LockableExt) Process(ctx *Context, node *Node, source NodeID, signal
// Forward StatusSignals up to the owner(unless that would be a cycle)
if ext.Owner != nil {
if *ext.Owner != node.ID {
messages = append(messages, SendMsg{*ext.Owner, signal})
messages = append(messages, Message{*ext.Owner, signal})
}
}
case *LinkSignal:

@ -19,7 +19,7 @@ func TestLink(t *testing.T) {
fatalErr(t, err)
link_signal := NewLinkSignal("add", l2.ID)
msgs := []SendMsg{{l1.ID, link_signal}}
msgs := []Message{{l1.ID, link_signal}}
err = ctx.Send(l1, msgs)
fatalErr(t, err)
@ -34,7 +34,7 @@ func TestLink(t *testing.T) {
}
unlink_signal := NewLinkSignal("remove", l2.ID)
msgs = []SendMsg{{l1.ID, unlink_signal}}
msgs = []Message{{l1.ID, unlink_signal}}
err = ctx.Send(l1, msgs)
fatalErr(t, err)

@ -1,11 +1,6 @@
package graphvent
type SendMsg struct {
Dest NodeID
Signal Signal
}
type RecvMsg struct {
Source NodeID
type Message struct {
Node NodeID
Signal Signal
}

@ -88,9 +88,15 @@ func NewQueue[T any](initial int) *Queue[T] {
}
go func(queue *Queue[T]) {
}(&queue)
if len(queue.buffer) == 0 {
select {
go func(queue *Queue[T]) {
}
} else {
select {
}
}
}(&queue)
return &queue
@ -113,7 +119,7 @@ type Node struct {
Extensions map[ExtType]Extension
// Channel for this node to receive messages from the Context
MsgChan chan RecvMsg
MsgChan chan Message
// Size of MsgChan
BufferSize uint32 `gv:"buffer_size"`
// Channel for this node to process delayed signals
@ -132,7 +138,7 @@ func (node *Node) PostDeserialize(ctx *Context) error {
public := node.Key.Public().(ed25519.PublicKey)
node.ID = KeyID(public)
node.MsgChan = make(chan RecvMsg, node.BufferSize)
node.MsgChan = make(chan Message, node.BufferSize)
return nil
}
@ -257,7 +263,6 @@ func nodeLoop(ctx *Context, node *Node, status chan string, control chan string)
var signal Signal
var source NodeID
select {
case command := <-control:
switch command {
@ -305,17 +310,15 @@ func nodeLoop(ctx *Context, node *Node, status chan string, control chan string)
}
case msg := <- node.MsgChan:
signal = msg.Signal
source = msg.Source
source = msg.Node
}
ctx.Log.Logf("node", "NODE_SIGNAL_QUEUE[%s]: %+v", node.ID, node.SignalQueue)
switch sig := signal.(type) {
case *ReadSignal:
result := node.ReadFields(ctx, sig.Fields)
msgs := []SendMsg{}
msgs = append(msgs, SendMsg{source, NewReadResultSignal(sig.ID(), node.ID, node.Type, result)})
msgs := []Message{}
msgs = append(msgs, Message{source, NewReadResultSignal(sig.ID(), node.ID, node.Type, result)})
ctx.Send(node, msgs)
default:
@ -367,21 +370,17 @@ func (node *Node) QueueChanges(ctx *Context, changes map[ExtType]Changes) error
}
func (node *Node) Process(ctx *Context, source NodeID, signal Signal) error {
ctx.Log.Logf("node_process", "PROCESSING MESSAGE: %s - %+v", node.ID, signal)
messages := []SendMsg{}
messages := []Message{}
changes := map[ExtType]Changes{}
for ext_type, ext := range(node.Extensions) {
ctx.Log.Logf("node_process", "PROCESSING_EXTENSION: %s/%s", node.ID, ext_type)
ext_messages, ext_changes := ext.Process(ctx, node, source, signal)
if len(ext_messages) != 0 {
messages = append(messages, ext_messages...)
}
if len(ext_changes) != 0 {
changes[ext_type] = ext_changes
ctx.Log.Logf("changes", "Changes for %s ext[%+v] - %+v", node.ID, ext_type, ext_changes)
}
}
ctx.Log.Logf("changes", "Changes for %s after %+v - %+v", node.ID, reflect.TypeOf(signal), changes)
if len(messages) != 0 {
send_err := ctx.Send(node, messages)
@ -490,7 +489,7 @@ func NewNode(ctx *Context, key ed25519.PrivateKey, type_name string, buffer_size
ID: id,
Type: node_type,
Extensions: ext_map,
MsgChan: make(chan RecvMsg, buffer_size),
MsgChan: make(chan Message, buffer_size),
BufferSize: buffer_size,
SignalQueue: []QueuedSignal{},
writeSignalQueue: false,

@ -48,7 +48,7 @@ func TestNodeRead(t *testing.T) {
fatalErr(t, err)
read_sig := NewReadSignal([]string{"buffer"})
msgs := []SendMsg{{n1.ID, read_sig}}
msgs := []Message{{n1.ID, read_sig}}
err = ctx.Send(n2, msgs)
fatalErr(t, err)