From 302f0f42fe0394b2c553fecd5d9db6a40a164502 Mon Sep 17 00:00:00 2001 From: Noah Metz Date: Sat, 7 Oct 2023 23:00:07 -0600 Subject: [PATCH] Added 'changed' field to 'Process' --- gql.go | 12 ++++- gql_test.go | 19 ++++---- group.go | 4 +- listener.go | 4 +- lockable.go | 119 +++++++++++++++++++++++++++++------------------ lockable_test.go | 26 +++++++---- node.go | 87 +++++++++++++++++++++++++++++----- node_test.go | 4 +- 8 files changed, 191 insertions(+), 84 deletions(-) diff --git a/gql.go b/gql.go index b2bcef3..1c3b3fd 100644 --- a/gql.go +++ b/gql.go @@ -1282,8 +1282,10 @@ func (ext *GQLExt) FreeResponseChannel(req_id uuid.UUID) chan Signal { return response_chan } -func (ext *GQLExt) Process(ctx *Context, node *Node, source NodeID, signal Signal) Messages { +func (ext *GQLExt) Process(ctx *Context, node *Node, source NodeID, signal Signal) (Messages, Changes) { // Process ReadResultSignalType by forwarding it to the waiting resolver + var changes Changes = nil + switch sig := signal.(type) { case *SuccessSignal: response_chan := ext.FreeResponseChannel(sig.ReqID) @@ -1297,6 +1299,7 @@ func (ext *GQLExt) Process(ctx *Context, node *Node, source NodeID, signal Signa } else { ctx.Log.Logf("gql", "received success signal response %+v with no mapped resolver", sig) } + case *ErrorSignal: // TODO: Forward to resolver if waiting for it response_chan := ext.FreeResponseChannel(sig.ReqID) @@ -1311,6 +1314,7 @@ func (ext *GQLExt) Process(ctx *Context, node *Node, source NodeID, signal Signa } else { ctx.Log.Logf("gql", "received error signal response %+v with no mapped resolver", sig) } + case *ReadResultSignal: response_chan := ext.FindResponseChannel(sig.ReqID) if response_chan != nil { @@ -1323,14 +1327,17 @@ func (ext *GQLExt) Process(ctx *Context, node *Node, source NodeID, signal Signa } else { ctx.Log.Logf("gql", "Received read result that wasn't expected - %+v", sig) } + case *StartSignal: ctx.Log.Logf("gql", "starting gql server %s", node.ID) err := ext.StartGQLServer(ctx, node) + changes = changes.AddDetail(GQLExtType, "", "server_started") if err == nil { node.QueueSignal(time.Now(), NewStatusSignal(node.ID, "server_started")) } else { ctx.Log.Logf("gql", "GQL_RESTART_ERROR: %s", err) } + case *StatusSignal: ext.subscriptions_lock.RLock() ctx.Log.Logf("gql", "forwarding status signal from %+v to resolvers %+v", sig.Source, ext.subscriptions) @@ -1344,7 +1351,8 @@ func (ext *GQLExt) Process(ctx *Context, node *Node, source NodeID, signal Signa } ext.subscriptions_lock.RUnlock() } - return nil + + return nil, changes } var ecdsa_curves = map[uint8]elliptic.Curve{ diff --git a/gql_test.go b/gql_test.go index 1d7b10d..f3e984e 100644 --- a/gql_test.go +++ b/gql_test.go @@ -69,17 +69,14 @@ func TestGQLServer(t *testing.T) { fatalErr(t, err) gql, err := NewNode(ctx, gql_key, GQLNodeType, 10, []Policy{group_policy_2, group_policy_1}, - NewLockableExt([]NodeID{n1.ID}), gql_ext, NewGroupExt(map[NodeID]string{ - n1.ID: "user", - gql_id: "self", - }), listener_ext) + NewLockableExt([]NodeID{n1.ID}), gql_ext, NewGroupExt([]NodeID{n1.ID, gql_id}), listener_ext) fatalErr(t, err) ctx.Log.Logf("test", "GQL: %s", gql.ID) ctx.Log.Logf("test", "NODE: %s", n1.ID) _, err = WaitForSignal(listener_ext.Chan, 100*time.Millisecond, func(sig *StatusSignal) bool { - return sig.Status == "server_started" + return sig.Source == gql_id }) fatalErr(t, err) @@ -205,8 +202,8 @@ func TestGQLServer(t *testing.T) { msgs = msgs.Add(ctx, gql.ID, gql.Key, NewStopSignal(), gql.ID) err = ctx.Send(msgs) fatalErr(t, err) - _, err = WaitForSignal(listener_ext.Chan, 100*time.Millisecond, func(sig *StatusSignal) bool { - return sig.Status == "stopped" + _, err = WaitForSignal(listener_ext.Chan, 100*time.Millisecond, func(sig *StoppedSignal) bool { + return sig.Source == gql_id }) fatalErr(t, err) } @@ -236,8 +233,8 @@ func TestGQLDB(t *testing.T) { msgs = msgs.Add(ctx, gql.ID, gql.Key, NewStopSignal(), gql.ID) err = ctx.Send(msgs) fatalErr(t, err) - _, err = WaitForSignal(listener_ext.Chan, 100*time.Millisecond, func(sig *StatusSignal) bool { - return sig.Status == "stopped" && sig.Source == gql.ID + _, err = WaitForSignal(listener_ext.Chan, 100*time.Millisecond, func(sig *StoppedSignal) bool { + return sig.Source == gql.ID }) fatalErr(t, err) @@ -251,8 +248,8 @@ func TestGQLDB(t *testing.T) { msgs = msgs.Add(ctx, gql_loaded.ID, gql_loaded.Key, NewStopSignal(), gql_loaded.ID) err = ctx.Send(msgs) fatalErr(t, err) - _, err = WaitForSignal(listener_ext.Chan, 100*time.Millisecond, func(sig *StatusSignal) bool { - return sig.Status == "stopped" && sig.Source == gql_loaded.ID + _, err = WaitForSignal(listener_ext.Chan, 100*time.Millisecond, func(sig *StoppedSignal) bool { + return sig.Source == gql_loaded.ID }) fatalErr(t, err) } diff --git a/group.go b/group.go index b6747d6..e958158 100644 --- a/group.go +++ b/group.go @@ -21,7 +21,7 @@ func NewGroupExt(members []NodeID) *GroupExt { } } -func (ext *GroupExt) Process(ctx *Context, node *Node, source NodeID, signal Signal) Messages { - return nil +func (ext *GroupExt) Process(ctx *Context, node *Node, source NodeID, signal Signal) (Messages, Changes) { + return nil, nil } diff --git a/listener.go b/listener.go index 65cacdf..6ab4523 100644 --- a/listener.go +++ b/listener.go @@ -28,7 +28,7 @@ func (listener *ListenerExt) Type() ExtType { } // Send the signal to the channel, logging an overflow if it occurs -func (ext *ListenerExt) Process(ctx *Context, node *Node, source NodeID, signal Signal) Messages { +func (ext *ListenerExt) Process(ctx *Context, node *Node, source NodeID, signal Signal) (Messages, Changes) { ctx.Log.Logf("listener", "LISTENER_PROCESS: %s - %+v", node.ID, reflect.TypeOf(signal)) ctx.Log.Logf("listener_debug", "LISTENER_DETAIL %+v", signal) select { @@ -36,5 +36,5 @@ func (ext *ListenerExt) Process(ctx *Context, node *Node, source NodeID, signal default: ctx.Log.Logf("listener", "LISTENER_OVERFLOW: %s", node.ID) } - return nil + return nil, nil } diff --git a/lockable.go b/lockable.go index 54fca8f..dbe2c4a 100644 --- a/lockable.go +++ b/lockable.go @@ -51,26 +51,28 @@ func NewLockableExt(requirements []NodeID) *LockableExt { } func UnlockLockable(ctx *Context, node *Node) (uuid.UUID, error) { - msgs := Messages{} + messages := Messages{} signal := NewLockSignal("unlock") - msgs = msgs.Add(ctx, node.ID, node.Key, signal, node.ID) - return signal.ID(), ctx.Send(msgs) + messages = messages.Add(ctx, node.ID, node.Key, signal, node.ID) + return signal.ID(), ctx.Send(messages) } func LockLockable(ctx *Context, node *Node) (uuid.UUID, error) { - msgs := Messages{} + messages := Messages{} signal := NewLockSignal("lock") - msgs = msgs.Add(ctx, node.ID, node.Key, signal, node.ID) - return signal.ID(), ctx.Send(msgs) + messages = messages.Add(ctx, node.ID, node.Key, signal, node.ID) + return signal.ID(), ctx.Send(messages) } -func (ext *LockableExt) HandleErrorSignal(ctx *Context, node *Node, source NodeID, signal *ErrorSignal) Messages { +func (ext *LockableExt) HandleErrorSignal(ctx *Context, node *Node, source NodeID, signal *ErrorSignal) (Messages, Changes) { str := signal.Error ctx.Log.Logf("lockable", "ERROR_SIGNAL: %s->%s %+v", source, node.ID, str) - msgs := Messages {} + var messages Messages = nil + var changes Changes = nil switch str { case "not_unlocked": + changes = changes.Add(LockableExtType, "requirements") if ext.State == Locking { ext.State = AbortingLock req_info := ext.Requirements[source] @@ -86,7 +88,7 @@ func (ext *LockableExt) HandleErrorSignal(ctx *Context, node *Node, source NodeI ext.Requirements[id] = req_info ctx.Log.Logf("lockable", "SENT_ABORT_UNLOCK: %s to %s", lock_signal.ID(), id) - msgs = msgs.Add(ctx, node.ID, node.Key, lock_signal, id) + messages = messages.Add(ctx, node.ID, node.Key, lock_signal, id) } } } @@ -95,17 +97,18 @@ func (ext *LockableExt) HandleErrorSignal(ctx *Context, node *Node, source NodeI case "not_requirement": } - return msgs + return messages, changes } -func (ext *LockableExt) HandleLinkSignal(ctx *Context, node *Node, source NodeID, signal *LinkSignal) Messages { - msgs := Messages {} +func (ext *LockableExt) HandleLinkSignal(ctx *Context, node *Node, source NodeID, signal *LinkSignal) (Messages, Changes) { + var messages Messages = nil + var changes Changes = nil if ext.State == Unlocked { switch signal.Action { case "add": _, exists := ext.Requirements[signal.NodeID] if exists == true { - msgs = msgs.Add(ctx, node.ID, node.Key, NewErrorSignal(signal.ID(), "already_requirement"), source) + messages = messages.Add(ctx, node.ID, node.Key, NewErrorSignal(signal.ID(), "already_requirement"), source) } else { if ext.Requirements == nil { ext.Requirements = map[NodeID]ReqInfo{} @@ -114,31 +117,34 @@ func (ext *LockableExt) HandleLinkSignal(ctx *Context, node *Node, source NodeID Unlocked, uuid.UUID{}, } - msgs = msgs.Add(ctx, node.ID, node.Key, NewErrorSignal(signal.ID(), "req_added"), source) + changes = changes.Add(LockableExtType, "requirements") + messages = messages.Add(ctx, node.ID, node.Key, NewSuccessSignal(signal.ID()), source) } case "remove": _, exists := ext.Requirements[signal.NodeID] if exists == false { - msgs = msgs.Add(ctx, node.ID, node.Key, NewErrorSignal(signal.ID(), "can't link: not_requirement"), source) + messages = messages.Add(ctx, node.ID, node.Key, NewErrorSignal(signal.ID(), "can't link: not_requirement"), source) } else { delete(ext.Requirements, signal.NodeID) - msgs = msgs.Add(ctx, node.ID, node.Key, NewErrorSignal(signal.ID(), "req_removed"), source) + changes = changes.Add(LockableExtType, "requirements") + messages = messages.Add(ctx, node.ID, node.Key, NewSuccessSignal(signal.ID()), source) } default: - msgs = msgs.Add(ctx, node.ID, node.Key, NewErrorSignal(signal.ID(), "unknown_action"), source) + messages = messages.Add(ctx, node.ID, node.Key, NewErrorSignal(signal.ID(), "unknown_action"), source) } } else { - msgs = msgs.Add(ctx, node.ID, node.Key, NewErrorSignal(signal.ID(), "not_unlocked"), source) + messages = messages.Add(ctx, node.ID, node.Key, NewErrorSignal(signal.ID(), "not_unlocked"), source) } - return msgs + return messages, changes } -func (ext *LockableExt) HandleSuccessSignal(ctx *Context, node *Node, source NodeID, signal *SuccessSignal) Messages { +func (ext *LockableExt) HandleSuccessSignal(ctx *Context, node *Node, source NodeID, signal *SuccessSignal) (Messages, Changes) { ctx.Log.Logf("lockable", "SUCCESS_SIGNAL: %+v", signal) - msgs := Messages{} + var messages Messages = nil + var changes Changes = nil if source == node.ID { - return msgs + return messages, changes } info, found := ext.Requirements[source] @@ -148,6 +154,7 @@ func (ext *LockableExt) HandleSuccessSignal(ctx *Context, node *Node, source Nod } else if info.MsgID != signal.ReqID { ctx.Log.Logf("lockable", "Got success for wrong signal for %s: %s, expecting %s", source, signal.ReqID, info.MsgID) } else { + changes = changes.Add(LockableExtType, "requirements") if info.State == Locking { if ext.State == Locking { info.State = Locked @@ -166,7 +173,10 @@ func (ext *LockableExt) HandleSuccessSignal(ctx *Context, node *Node, source Nod ctx.Log.Logf("lockable", "WHOLE LOCK: %s - %s - %+v", node.ID, ext.PendingID, ext.PendingOwner) ext.State = Locked ext.Owner = ext.PendingOwner - msgs = msgs.Add(ctx, node.ID, node.Key, NewSuccessSignal(ext.PendingID), *ext.Owner) + changes = changes.Add(LockableExtType, "state") + changes = changes.Add(LockableExtType, "owner") + changes = changes.Add(LockableExtType, "pending_owner") + messages = messages.Add(ctx, node.ID, node.Key, NewSuccessSignal(ext.PendingID), *ext.Owner) } else { ctx.Log.Logf("lockable", "PARTIAL LOCK: %s - %d/%d", node.ID, locked, reqs) } @@ -175,7 +185,7 @@ func (ext *LockableExt) HandleSuccessSignal(ctx *Context, node *Node, source Nod info.State = Unlocking info.MsgID = lock_signal.ID() ext.Requirements[source] = info - msgs = msgs.Add(ctx, node.ID, node.Key, lock_signal, source) + messages = messages.Add(ctx, node.ID, node.Key, lock_signal, source) } } else if info.State == Unlocking { info.State = Unlocked @@ -193,14 +203,17 @@ func (ext *LockableExt) HandleSuccessSignal(ctx *Context, node *Node, source Nod if unlocked == reqs { old_state := ext.State ext.State = Unlocked + changes = changes.Add(LockableExtType, "state") ctx.Log.Logf("lockable", "WHOLE UNLOCK: %s - %s - %+v", node.ID, ext.PendingID, ext.PendingOwner) if old_state == Unlocking { previous_owner := *ext.Owner ext.Owner = ext.PendingOwner ext.ReqID = nil - msgs = msgs.Add(ctx, node.ID, node.Key, NewSuccessSignal(ext.PendingID), previous_owner) + changes = changes.Add(LockableExtType, "owner") + messages = messages.Add(ctx, node.ID, node.Key, NewSuccessSignal(ext.PendingID), previous_owner) } else if old_state == AbortingLock { - msgs = msgs.Add(ctx ,node.ID, node.Key, NewErrorSignal(*ext.ReqID, "not_unlocked"), *ext.PendingOwner) + messages = messages.Add(ctx ,node.ID, node.Key, NewErrorSignal(*ext.ReqID, "not_unlocked"), *ext.PendingOwner) + changes = changes.Add(LockableExtType, "pending_owner") ext.PendingOwner = ext.Owner } } else { @@ -209,14 +222,16 @@ func (ext *LockableExt) HandleSuccessSignal(ctx *Context, node *Node, source Nod } } - return msgs + return messages, changes } // Handle a LockSignal and update the extensions owner/requirement states -func (ext *LockableExt) HandleLockSignal(ctx *Context, node *Node, source NodeID, signal *LockSignal) Messages { +func (ext *LockableExt) HandleLockSignal(ctx *Context, node *Node, source NodeID, signal *LockSignal) (Messages, Changes) { ctx.Log.Logf("lockable", "LOCK_SIGNAL: %s->%s %+v", source, node.ID, signal.State) - msgs := Messages{} + var messages Messages = nil + var changes Changes = nil + switch signal.State { case "lock": if ext.State == Unlocked { @@ -225,7 +240,10 @@ func (ext *LockableExt) HandleLockSignal(ctx *Context, node *Node, source NodeID new_owner := source ext.PendingOwner = &new_owner ext.Owner = &new_owner - msgs = msgs.Add(ctx, node.ID, node.Key, NewSuccessSignal(signal.ID()), new_owner) + changes = changes.Add(LockableExtType, "state") + changes = changes.Add(LockableExtType, "pending_owner") + changes = changes.Add(LockableExtType, "owner") + messages = messages.Add(ctx, node.ID, node.Key, NewSuccessSignal(signal.ID()), new_owner) } else { ext.State = Locking id := signal.ID() @@ -233,6 +251,9 @@ func (ext *LockableExt) HandleLockSignal(ctx *Context, node *Node, source NodeID new_owner := source ext.PendingOwner = &new_owner ext.PendingID = signal.ID() + changes = changes.Add(LockableExtType, "state") + changes = changes.Add(LockableExtType, "pending_owner") + changes = changes.Add(LockableExtType, "requirements") for id, info := range(ext.Requirements) { if info.State != Unlocked { ctx.Log.Logf("lockable", "REQ_NOT_UNLOCKED_WHEN_LOCKING") @@ -241,11 +262,11 @@ func (ext *LockableExt) HandleLockSignal(ctx *Context, node *Node, source NodeID info.State = Locking info.MsgID = lock_signal.ID() ext.Requirements[id] = info - msgs = msgs.Add(ctx, node.ID, node.Key, lock_signal, id) + messages = messages.Add(ctx, node.ID, node.Key, lock_signal, id) } } } else { - msgs = msgs.Add(ctx, node.ID, node.Key, NewErrorSignal(signal.ID(), "not_unlocked"), source) + messages = messages.Add(ctx, node.ID, node.Key, NewErrorSignal(signal.ID(), "not_unlocked"), source) } case "unlock": if ext.State == Locked { @@ -254,13 +275,19 @@ func (ext *LockableExt) HandleLockSignal(ctx *Context, node *Node, source NodeID new_owner := source ext.PendingOwner = nil ext.Owner = nil - msgs = msgs.Add(ctx, node.ID, node.Key, NewSuccessSignal(signal.ID()), new_owner) + changes = changes.Add(LockableExtType, "state") + changes = changes.Add(LockableExtType, "owner") + changes = changes.Add(LockableExtType, "pending_owner") + messages = messages.Add(ctx, node.ID, node.Key, NewSuccessSignal(signal.ID()), new_owner) } else if source == *ext.Owner { ext.State = Unlocking id := signal.ID() ext.ReqID = &id ext.PendingOwner = nil ext.PendingID = signal.ID() + changes = changes.Add(LockableExtType, "requirements") + changes = changes.Add(LockableExtType, "pending_owner") + changes = changes.Add(LockableExtType, "state") for id, info := range(ext.Requirements) { if info.State != Locked { ctx.Log.Logf("lockable", "REQ_NOT_LOCKED_WHEN_UNLOCKING") @@ -269,22 +296,24 @@ func (ext *LockableExt) HandleLockSignal(ctx *Context, node *Node, source NodeID info.State = Unlocking info.MsgID = lock_signal.ID() ext.Requirements[id] = info - msgs = msgs.Add(ctx, node.ID, node.Key, lock_signal, id) + messages = messages.Add(ctx, node.ID, node.Key, lock_signal, id) } } } else { - msgs = msgs.Add(ctx, node.ID, node.Key, NewErrorSignal(signal.ID(), "not_locked"), source) + messages = messages.Add(ctx, node.ID, node.Key, NewErrorSignal(signal.ID(), "not_locked"), source) } default: ctx.Log.Logf("lockable", "LOCK_ERR: unkown state %s", signal.State) } - return msgs + return messages, changes } // LockableExts process Up/Down signals by forwarding them to owner, dependency, and requirement nodes // LockSignal and LinkSignal Direct signals are processed to update the requirement/dependency/lock state -func (ext *LockableExt) Process(ctx *Context, node *Node, source NodeID, signal Signal) Messages { - messages := Messages{} +func (ext *LockableExt) Process(ctx *Context, node *Node, source NodeID, signal Signal) (Messages, Changes) { + var messages Messages = nil + var changes Changes = nil + switch signal.Direction() { case Up: if ext.Owner != nil { @@ -292,24 +321,26 @@ func (ext *LockableExt) Process(ctx *Context, node *Node, source NodeID, signal messages = messages.Add(ctx, node.ID, node.Key, signal, *ext.Owner) } } + case Down: - for requirement, _ := range(ext.Requirements) { + for requirement := range(ext.Requirements) { messages = messages.Add(ctx, node.ID, node.Key, signal, requirement) } + case Direct: switch sig := signal.(type) { case *LinkSignal: - messages = ext.HandleLinkSignal(ctx, node, source, sig) + messages, changes = ext.HandleLinkSignal(ctx, node, source, sig) case *LockSignal: - messages = ext.HandleLockSignal(ctx, node, source, sig) + messages, changes = ext.HandleLockSignal(ctx, node, source, sig) case *ErrorSignal: - messages = ext.HandleErrorSignal(ctx, node, source, sig) + messages, changes = ext.HandleErrorSignal(ctx, node, source, sig) case *SuccessSignal: - messages = ext.HandleSuccessSignal(ctx, node, source, sig) + messages, changes = ext.HandleSuccessSignal(ctx, node, source, sig) default: } default: } - return messages + return messages, changes } diff --git a/lockable_test.go b/lockable_test.go index 9974d69..fc10e16 100644 --- a/lockable_test.go +++ b/lockable_test.go @@ -18,7 +18,7 @@ func lockableTestContext(t *testing.T, logs []string) *Context { } func TestLink(t *testing.T) { - ctx := lockableTestContext(t, []string{"lockable"}) + ctx := lockableTestContext(t, []string{"lockable", "listener"}) l1_pub, l1_key, err := ed25519.GenerateKey(rand.Reader) fatalErr(t, err) @@ -34,31 +34,37 @@ func TestLink(t *testing.T) { ) fatalErr(t, err) + l1_lockable := NewLockableExt(nil) l1_listener := NewListenerExt(10) l1, err := NewNode(ctx, l1_key, TestLockableType, 10, nil, l1_listener, - NewLockableExt(nil), + l1_lockable, ) fatalErr(t, err) msgs := Messages{} - msgs = msgs.Add(ctx, l1.ID, l1.Key, NewLinkSignal("add", l2.ID), l1.ID) + link_signal := NewLinkSignal("add", l2.ID) + msgs = msgs.Add(ctx, l1.ID, l1.Key, link_signal, l1.ID) err = ctx.Send(msgs) fatalErr(t, err) - _, err = WaitForSignal(l1_listener.Chan, time.Millisecond*10, func(sig *ErrorSignal) bool { - return sig.Error == "req_added" - }) + _, err = WaitForResponse(l1_listener.Chan, time.Millisecond*10, link_signal.ID()) fatalErr(t, err) + info, exists := l1_lockable.Requirements[l2.ID] + if exists == false { + t.Fatal("l2 not in l1 requirements") + } else if info.State != Unlocked { + t.Fatalf("l2 in bad requirement state in l1: %+v", info.State) + } + msgs = Messages{} - msgs = msgs.Add(ctx, l1.ID, l1.Key, NewLinkSignal("remove", l2.ID), l1.ID) + unlink_signal := NewLinkSignal("remove", l2.ID) + msgs = msgs.Add(ctx, l1.ID, l1.Key, unlink_signal, l1.ID) err = ctx.Send(msgs) fatalErr(t, err) - _, err = WaitForSignal(l1_listener.Chan, time.Millisecond*10, func(sig *ErrorSignal) bool { - return sig.Error == "req_removed" - }) + _, err = WaitForResponse(l1_listener.Chan, time.Millisecond*10, unlink_signal.ID()) fatalErr(t, err) } diff --git a/node.go b/node.go index 9350b94..46a4a29 100644 --- a/node.go +++ b/node.go @@ -47,9 +47,33 @@ func RandID() NodeID { return NodeID(uuid.New()) } +type Change struct { + Extension ExtType + Field string + Detail string +} + +type Changes []Change + +func (changes Changes) Add(ext ExtType, field string) Changes { + return append(changes, Change{ + Extension: ext, + Field: field, + Detail: "", + }) +} + +func (changes Changes) AddDetail(ext ExtType, field string, detail string) Changes { + return append(changes, Change{ + Extension: ext, + Field: field, + Detail: detail, + }) +} + // Extensions are data attached to nodes that process signals type Extension interface { - Process(*Context, *Node, NodeID, Signal) Messages + Process(*Context, *Node, NodeID, Signal) (Messages, Changes) } // A QueuedSignal is a Signal that has been Queued to trigger at a set time @@ -249,6 +273,10 @@ func nodeLoop(ctx *Context, node *Node) error { // Perform startup actions node.Process(ctx, ZeroID, NewStartSignal()) + err := WriteNode(ctx, node) + if err != nil { + panic(err) + } run := true for run == true { var signal Signal @@ -411,10 +439,6 @@ func nodeLoop(ctx *Context, node *Node) error { switch sig := signal.(type) { case *StopSignal: node.Process(ctx, source, signal) - err := WriteNode(ctx, node) - if err != nil { - panic(err) - } if source == node.ID { node.Process(ctx, source, NewStoppedSignal(sig, node.ID)) } else { @@ -431,8 +455,7 @@ func nodeLoop(ctx *Context, node *Node) error { ctx.Send(msgs) default: - node.Process(ctx, source, signal) - err := WriteNode(ctx, node) + err := node.Process(ctx, source, signal) if err != nil { panic(err) } @@ -514,20 +537,57 @@ func (node *Node) Stop(ctx *Context) error { } } +func (node *Node) QueueChanges(ctx *Context, changes Changes) error { + change_map := map[ExtType][]string{} + for _, change := range(changes) { + _, exists := change_map[change.Extension] + if exists == false { + change_map[change.Extension] = []string{} + } + change_map[change.Extension] = append(change_map[change.Extension], change.Field) + } + node.QueueSignal(time.Now(), NewStatusSignal(node.ID, fmt.Sprintf("%+v", change_map))) + return nil +} + func (node *Node) Process(ctx *Context, source NodeID, signal Signal) error { ctx.Log.Logf("node_process", "PROCESSING MESSAGE: %s - %+v", node.ID, signal) messages := Messages{} + changes := Changes{} for ext_type, ext := range(node.Extensions) { ctx.Log.Logf("node_process", "PROCESSING_EXTENSION: %s/%s", node.ID, ext_type) - resp := ext.Process(ctx, node, source, signal) - if resp != nil { - messages = append(messages, resp...) + ext_messages, ext_changes := ext.Process(ctx, node, source, signal) + if len(ext_messages) != 0 { + messages = append(messages, ext_messages...) + } + + if len(ext_changes) != 0 { + changes = append(changes, ext_changes...) } } if len(messages) != 0 { - return ctx.Send(messages) + send_err := ctx.Send(messages) + if send_err != nil { + return send_err + } } + + if len(changes) != 0 { + _, ok := signal.(*StoppedSignal) + if (ok == false) || (source != node.ID) { + write_err := WriteNodeChanges(ctx, node, changes) + if write_err != nil { + return write_err + } + + status_err := node.QueueChanges(ctx, changes) + if status_err != nil { + return status_err + } + } + } + return nil } @@ -637,6 +697,11 @@ func NewNode(ctx *Context, key ed25519.PrivateKey, node_type NodeType, buffer_si return node, nil } +func WriteNodeChanges(ctx *Context, node *Node, changes Changes) error { + // TODO: optimize to not re-serialize unchanged extensions/fields(might need to cache the serialized values) + return WriteNode(ctx, node) +} + // Write a node to the database func WriteNode(ctx *Context, node *Node) error { ctx.Log.Logf("db", "DB_WRITE: %s", node.ID) diff --git a/node_test.go b/node_test.go index 5c0e0e5..7b4cf80 100644 --- a/node_test.go +++ b/node_test.go @@ -26,8 +26,8 @@ func TestNodeDB(t *testing.T) { err = ctx.Send(msgs) fatalErr(t, err) - _, err = WaitForSignal(node_listener.Chan, 10*time.Millisecond, func(sig *StatusSignal) bool { - return sig.Status == "stopped" && sig.Source == node.ID + _, err = WaitForSignal(node_listener.Chan, 10*time.Millisecond, func(sig *StoppedSignal) bool { + return sig.Source == node.ID }) fatalErr(t, err)