Added 'changed' field to 'Process'

gql_cataclysm
noah metz 2023-10-07 23:00:07 -06:00
parent 7451e8e960
commit 302f0f42fe
8 changed files with 191 additions and 84 deletions

@ -1282,8 +1282,10 @@ func (ext *GQLExt) FreeResponseChannel(req_id uuid.UUID) chan Signal {
return response_chan
}
func (ext *GQLExt) Process(ctx *Context, node *Node, source NodeID, signal Signal) Messages {
func (ext *GQLExt) Process(ctx *Context, node *Node, source NodeID, signal Signal) (Messages, Changes) {
// Process ReadResultSignalType by forwarding it to the waiting resolver
var changes Changes = nil
switch sig := signal.(type) {
case *SuccessSignal:
response_chan := ext.FreeResponseChannel(sig.ReqID)
@ -1297,6 +1299,7 @@ func (ext *GQLExt) Process(ctx *Context, node *Node, source NodeID, signal Signa
} else {
ctx.Log.Logf("gql", "received success signal response %+v with no mapped resolver", sig)
}
case *ErrorSignal:
// TODO: Forward to resolver if waiting for it
response_chan := ext.FreeResponseChannel(sig.ReqID)
@ -1311,6 +1314,7 @@ func (ext *GQLExt) Process(ctx *Context, node *Node, source NodeID, signal Signa
} else {
ctx.Log.Logf("gql", "received error signal response %+v with no mapped resolver", sig)
}
case *ReadResultSignal:
response_chan := ext.FindResponseChannel(sig.ReqID)
if response_chan != nil {
@ -1323,14 +1327,17 @@ func (ext *GQLExt) Process(ctx *Context, node *Node, source NodeID, signal Signa
} else {
ctx.Log.Logf("gql", "Received read result that wasn't expected - %+v", sig)
}
case *StartSignal:
ctx.Log.Logf("gql", "starting gql server %s", node.ID)
err := ext.StartGQLServer(ctx, node)
changes = changes.AddDetail(GQLExtType, "", "server_started")
if err == nil {
node.QueueSignal(time.Now(), NewStatusSignal(node.ID, "server_started"))
} else {
ctx.Log.Logf("gql", "GQL_RESTART_ERROR: %s", err)
}
case *StatusSignal:
ext.subscriptions_lock.RLock()
ctx.Log.Logf("gql", "forwarding status signal from %+v to resolvers %+v", sig.Source, ext.subscriptions)
@ -1344,7 +1351,8 @@ func (ext *GQLExt) Process(ctx *Context, node *Node, source NodeID, signal Signa
}
ext.subscriptions_lock.RUnlock()
}
return nil
return nil, changes
}
var ecdsa_curves = map[uint8]elliptic.Curve{

@ -69,17 +69,14 @@ func TestGQLServer(t *testing.T) {
fatalErr(t, err)
gql, err := NewNode(ctx, gql_key, GQLNodeType, 10, []Policy{group_policy_2, group_policy_1},
NewLockableExt([]NodeID{n1.ID}), gql_ext, NewGroupExt(map[NodeID]string{
n1.ID: "user",
gql_id: "self",
}), listener_ext)
NewLockableExt([]NodeID{n1.ID}), gql_ext, NewGroupExt([]NodeID{n1.ID, gql_id}), listener_ext)
fatalErr(t, err)
ctx.Log.Logf("test", "GQL: %s", gql.ID)
ctx.Log.Logf("test", "NODE: %s", n1.ID)
_, err = WaitForSignal(listener_ext.Chan, 100*time.Millisecond, func(sig *StatusSignal) bool {
return sig.Status == "server_started"
return sig.Source == gql_id
})
fatalErr(t, err)
@ -205,8 +202,8 @@ func TestGQLServer(t *testing.T) {
msgs = msgs.Add(ctx, gql.ID, gql.Key, NewStopSignal(), gql.ID)
err = ctx.Send(msgs)
fatalErr(t, err)
_, err = WaitForSignal(listener_ext.Chan, 100*time.Millisecond, func(sig *StatusSignal) bool {
return sig.Status == "stopped"
_, err = WaitForSignal(listener_ext.Chan, 100*time.Millisecond, func(sig *StoppedSignal) bool {
return sig.Source == gql_id
})
fatalErr(t, err)
}
@ -236,8 +233,8 @@ func TestGQLDB(t *testing.T) {
msgs = msgs.Add(ctx, gql.ID, gql.Key, NewStopSignal(), gql.ID)
err = ctx.Send(msgs)
fatalErr(t, err)
_, err = WaitForSignal(listener_ext.Chan, 100*time.Millisecond, func(sig *StatusSignal) bool {
return sig.Status == "stopped" && sig.Source == gql.ID
_, err = WaitForSignal(listener_ext.Chan, 100*time.Millisecond, func(sig *StoppedSignal) bool {
return sig.Source == gql.ID
})
fatalErr(t, err)
@ -251,8 +248,8 @@ func TestGQLDB(t *testing.T) {
msgs = msgs.Add(ctx, gql_loaded.ID, gql_loaded.Key, NewStopSignal(), gql_loaded.ID)
err = ctx.Send(msgs)
fatalErr(t, err)
_, err = WaitForSignal(listener_ext.Chan, 100*time.Millisecond, func(sig *StatusSignal) bool {
return sig.Status == "stopped" && sig.Source == gql_loaded.ID
_, err = WaitForSignal(listener_ext.Chan, 100*time.Millisecond, func(sig *StoppedSignal) bool {
return sig.Source == gql_loaded.ID
})
fatalErr(t, err)
}

@ -21,7 +21,7 @@ func NewGroupExt(members []NodeID) *GroupExt {
}
}
func (ext *GroupExt) Process(ctx *Context, node *Node, source NodeID, signal Signal) Messages {
return nil
func (ext *GroupExt) Process(ctx *Context, node *Node, source NodeID, signal Signal) (Messages, Changes) {
return nil, nil
}

@ -28,7 +28,7 @@ func (listener *ListenerExt) Type() ExtType {
}
// Send the signal to the channel, logging an overflow if it occurs
func (ext *ListenerExt) Process(ctx *Context, node *Node, source NodeID, signal Signal) Messages {
func (ext *ListenerExt) Process(ctx *Context, node *Node, source NodeID, signal Signal) (Messages, Changes) {
ctx.Log.Logf("listener", "LISTENER_PROCESS: %s - %+v", node.ID, reflect.TypeOf(signal))
ctx.Log.Logf("listener_debug", "LISTENER_DETAIL %+v", signal)
select {
@ -36,5 +36,5 @@ func (ext *ListenerExt) Process(ctx *Context, node *Node, source NodeID, signal
default:
ctx.Log.Logf("listener", "LISTENER_OVERFLOW: %s", node.ID)
}
return nil
return nil, nil
}

@ -51,26 +51,28 @@ func NewLockableExt(requirements []NodeID) *LockableExt {
}
func UnlockLockable(ctx *Context, node *Node) (uuid.UUID, error) {
msgs := Messages{}
messages := Messages{}
signal := NewLockSignal("unlock")
msgs = msgs.Add(ctx, node.ID, node.Key, signal, node.ID)
return signal.ID(), ctx.Send(msgs)
messages = messages.Add(ctx, node.ID, node.Key, signal, node.ID)
return signal.ID(), ctx.Send(messages)
}
func LockLockable(ctx *Context, node *Node) (uuid.UUID, error) {
msgs := Messages{}
messages := Messages{}
signal := NewLockSignal("lock")
msgs = msgs.Add(ctx, node.ID, node.Key, signal, node.ID)
return signal.ID(), ctx.Send(msgs)
messages = messages.Add(ctx, node.ID, node.Key, signal, node.ID)
return signal.ID(), ctx.Send(messages)
}
func (ext *LockableExt) HandleErrorSignal(ctx *Context, node *Node, source NodeID, signal *ErrorSignal) Messages {
func (ext *LockableExt) HandleErrorSignal(ctx *Context, node *Node, source NodeID, signal *ErrorSignal) (Messages, Changes) {
str := signal.Error
ctx.Log.Logf("lockable", "ERROR_SIGNAL: %s->%s %+v", source, node.ID, str)
msgs := Messages {}
var messages Messages = nil
var changes Changes = nil
switch str {
case "not_unlocked":
changes = changes.Add(LockableExtType, "requirements")
if ext.State == Locking {
ext.State = AbortingLock
req_info := ext.Requirements[source]
@ -86,7 +88,7 @@ func (ext *LockableExt) HandleErrorSignal(ctx *Context, node *Node, source NodeI
ext.Requirements[id] = req_info
ctx.Log.Logf("lockable", "SENT_ABORT_UNLOCK: %s to %s", lock_signal.ID(), id)
msgs = msgs.Add(ctx, node.ID, node.Key, lock_signal, id)
messages = messages.Add(ctx, node.ID, node.Key, lock_signal, id)
}
}
}
@ -95,17 +97,18 @@ func (ext *LockableExt) HandleErrorSignal(ctx *Context, node *Node, source NodeI
case "not_requirement":
}
return msgs
return messages, changes
}
func (ext *LockableExt) HandleLinkSignal(ctx *Context, node *Node, source NodeID, signal *LinkSignal) Messages {
msgs := Messages {}
func (ext *LockableExt) HandleLinkSignal(ctx *Context, node *Node, source NodeID, signal *LinkSignal) (Messages, Changes) {
var messages Messages = nil
var changes Changes = nil
if ext.State == Unlocked {
switch signal.Action {
case "add":
_, exists := ext.Requirements[signal.NodeID]
if exists == true {
msgs = msgs.Add(ctx, node.ID, node.Key, NewErrorSignal(signal.ID(), "already_requirement"), source)
messages = messages.Add(ctx, node.ID, node.Key, NewErrorSignal(signal.ID(), "already_requirement"), source)
} else {
if ext.Requirements == nil {
ext.Requirements = map[NodeID]ReqInfo{}
@ -114,31 +117,34 @@ func (ext *LockableExt) HandleLinkSignal(ctx *Context, node *Node, source NodeID
Unlocked,
uuid.UUID{},
}
msgs = msgs.Add(ctx, node.ID, node.Key, NewErrorSignal(signal.ID(), "req_added"), source)
changes = changes.Add(LockableExtType, "requirements")
messages = messages.Add(ctx, node.ID, node.Key, NewSuccessSignal(signal.ID()), source)
}
case "remove":
_, exists := ext.Requirements[signal.NodeID]
if exists == false {
msgs = msgs.Add(ctx, node.ID, node.Key, NewErrorSignal(signal.ID(), "can't link: not_requirement"), source)
messages = messages.Add(ctx, node.ID, node.Key, NewErrorSignal(signal.ID(), "can't link: not_requirement"), source)
} else {
delete(ext.Requirements, signal.NodeID)
msgs = msgs.Add(ctx, node.ID, node.Key, NewErrorSignal(signal.ID(), "req_removed"), source)
changes = changes.Add(LockableExtType, "requirements")
messages = messages.Add(ctx, node.ID, node.Key, NewSuccessSignal(signal.ID()), source)
}
default:
msgs = msgs.Add(ctx, node.ID, node.Key, NewErrorSignal(signal.ID(), "unknown_action"), source)
messages = messages.Add(ctx, node.ID, node.Key, NewErrorSignal(signal.ID(), "unknown_action"), source)
}
} else {
msgs = msgs.Add(ctx, node.ID, node.Key, NewErrorSignal(signal.ID(), "not_unlocked"), source)
messages = messages.Add(ctx, node.ID, node.Key, NewErrorSignal(signal.ID(), "not_unlocked"), source)
}
return msgs
return messages, changes
}
func (ext *LockableExt) HandleSuccessSignal(ctx *Context, node *Node, source NodeID, signal *SuccessSignal) Messages {
func (ext *LockableExt) HandleSuccessSignal(ctx *Context, node *Node, source NodeID, signal *SuccessSignal) (Messages, Changes) {
ctx.Log.Logf("lockable", "SUCCESS_SIGNAL: %+v", signal)
msgs := Messages{}
var messages Messages = nil
var changes Changes = nil
if source == node.ID {
return msgs
return messages, changes
}
info, found := ext.Requirements[source]
@ -148,6 +154,7 @@ func (ext *LockableExt) HandleSuccessSignal(ctx *Context, node *Node, source Nod
} else if info.MsgID != signal.ReqID {
ctx.Log.Logf("lockable", "Got success for wrong signal for %s: %s, expecting %s", source, signal.ReqID, info.MsgID)
} else {
changes = changes.Add(LockableExtType, "requirements")
if info.State == Locking {
if ext.State == Locking {
info.State = Locked
@ -166,7 +173,10 @@ func (ext *LockableExt) HandleSuccessSignal(ctx *Context, node *Node, source Nod
ctx.Log.Logf("lockable", "WHOLE LOCK: %s - %s - %+v", node.ID, ext.PendingID, ext.PendingOwner)
ext.State = Locked
ext.Owner = ext.PendingOwner
msgs = msgs.Add(ctx, node.ID, node.Key, NewSuccessSignal(ext.PendingID), *ext.Owner)
changes = changes.Add(LockableExtType, "state")
changes = changes.Add(LockableExtType, "owner")
changes = changes.Add(LockableExtType, "pending_owner")
messages = messages.Add(ctx, node.ID, node.Key, NewSuccessSignal(ext.PendingID), *ext.Owner)
} else {
ctx.Log.Logf("lockable", "PARTIAL LOCK: %s - %d/%d", node.ID, locked, reqs)
}
@ -175,7 +185,7 @@ func (ext *LockableExt) HandleSuccessSignal(ctx *Context, node *Node, source Nod
info.State = Unlocking
info.MsgID = lock_signal.ID()
ext.Requirements[source] = info
msgs = msgs.Add(ctx, node.ID, node.Key, lock_signal, source)
messages = messages.Add(ctx, node.ID, node.Key, lock_signal, source)
}
} else if info.State == Unlocking {
info.State = Unlocked
@ -193,14 +203,17 @@ func (ext *LockableExt) HandleSuccessSignal(ctx *Context, node *Node, source Nod
if unlocked == reqs {
old_state := ext.State
ext.State = Unlocked
changes = changes.Add(LockableExtType, "state")
ctx.Log.Logf("lockable", "WHOLE UNLOCK: %s - %s - %+v", node.ID, ext.PendingID, ext.PendingOwner)
if old_state == Unlocking {
previous_owner := *ext.Owner
ext.Owner = ext.PendingOwner
ext.ReqID = nil
msgs = msgs.Add(ctx, node.ID, node.Key, NewSuccessSignal(ext.PendingID), previous_owner)
changes = changes.Add(LockableExtType, "owner")
messages = messages.Add(ctx, node.ID, node.Key, NewSuccessSignal(ext.PendingID), previous_owner)
} else if old_state == AbortingLock {
msgs = msgs.Add(ctx ,node.ID, node.Key, NewErrorSignal(*ext.ReqID, "not_unlocked"), *ext.PendingOwner)
messages = messages.Add(ctx ,node.ID, node.Key, NewErrorSignal(*ext.ReqID, "not_unlocked"), *ext.PendingOwner)
changes = changes.Add(LockableExtType, "pending_owner")
ext.PendingOwner = ext.Owner
}
} else {
@ -209,14 +222,16 @@ func (ext *LockableExt) HandleSuccessSignal(ctx *Context, node *Node, source Nod
}
}
return msgs
return messages, changes
}
// Handle a LockSignal and update the extensions owner/requirement states
func (ext *LockableExt) HandleLockSignal(ctx *Context, node *Node, source NodeID, signal *LockSignal) Messages {
func (ext *LockableExt) HandleLockSignal(ctx *Context, node *Node, source NodeID, signal *LockSignal) (Messages, Changes) {
ctx.Log.Logf("lockable", "LOCK_SIGNAL: %s->%s %+v", source, node.ID, signal.State)
msgs := Messages{}
var messages Messages = nil
var changes Changes = nil
switch signal.State {
case "lock":
if ext.State == Unlocked {
@ -225,7 +240,10 @@ func (ext *LockableExt) HandleLockSignal(ctx *Context, node *Node, source NodeID
new_owner := source
ext.PendingOwner = &new_owner
ext.Owner = &new_owner
msgs = msgs.Add(ctx, node.ID, node.Key, NewSuccessSignal(signal.ID()), new_owner)
changes = changes.Add(LockableExtType, "state")
changes = changes.Add(LockableExtType, "pending_owner")
changes = changes.Add(LockableExtType, "owner")
messages = messages.Add(ctx, node.ID, node.Key, NewSuccessSignal(signal.ID()), new_owner)
} else {
ext.State = Locking
id := signal.ID()
@ -233,6 +251,9 @@ func (ext *LockableExt) HandleLockSignal(ctx *Context, node *Node, source NodeID
new_owner := source
ext.PendingOwner = &new_owner
ext.PendingID = signal.ID()
changes = changes.Add(LockableExtType, "state")
changes = changes.Add(LockableExtType, "pending_owner")
changes = changes.Add(LockableExtType, "requirements")
for id, info := range(ext.Requirements) {
if info.State != Unlocked {
ctx.Log.Logf("lockable", "REQ_NOT_UNLOCKED_WHEN_LOCKING")
@ -241,11 +262,11 @@ func (ext *LockableExt) HandleLockSignal(ctx *Context, node *Node, source NodeID
info.State = Locking
info.MsgID = lock_signal.ID()
ext.Requirements[id] = info
msgs = msgs.Add(ctx, node.ID, node.Key, lock_signal, id)
messages = messages.Add(ctx, node.ID, node.Key, lock_signal, id)
}
}
} else {
msgs = msgs.Add(ctx, node.ID, node.Key, NewErrorSignal(signal.ID(), "not_unlocked"), source)
messages = messages.Add(ctx, node.ID, node.Key, NewErrorSignal(signal.ID(), "not_unlocked"), source)
}
case "unlock":
if ext.State == Locked {
@ -254,13 +275,19 @@ func (ext *LockableExt) HandleLockSignal(ctx *Context, node *Node, source NodeID
new_owner := source
ext.PendingOwner = nil
ext.Owner = nil
msgs = msgs.Add(ctx, node.ID, node.Key, NewSuccessSignal(signal.ID()), new_owner)
changes = changes.Add(LockableExtType, "state")
changes = changes.Add(LockableExtType, "owner")
changes = changes.Add(LockableExtType, "pending_owner")
messages = messages.Add(ctx, node.ID, node.Key, NewSuccessSignal(signal.ID()), new_owner)
} else if source == *ext.Owner {
ext.State = Unlocking
id := signal.ID()
ext.ReqID = &id
ext.PendingOwner = nil
ext.PendingID = signal.ID()
changes = changes.Add(LockableExtType, "requirements")
changes = changes.Add(LockableExtType, "pending_owner")
changes = changes.Add(LockableExtType, "state")
for id, info := range(ext.Requirements) {
if info.State != Locked {
ctx.Log.Logf("lockable", "REQ_NOT_LOCKED_WHEN_UNLOCKING")
@ -269,22 +296,24 @@ func (ext *LockableExt) HandleLockSignal(ctx *Context, node *Node, source NodeID
info.State = Unlocking
info.MsgID = lock_signal.ID()
ext.Requirements[id] = info
msgs = msgs.Add(ctx, node.ID, node.Key, lock_signal, id)
messages = messages.Add(ctx, node.ID, node.Key, lock_signal, id)
}
}
} else {
msgs = msgs.Add(ctx, node.ID, node.Key, NewErrorSignal(signal.ID(), "not_locked"), source)
messages = messages.Add(ctx, node.ID, node.Key, NewErrorSignal(signal.ID(), "not_locked"), source)
}
default:
ctx.Log.Logf("lockable", "LOCK_ERR: unkown state %s", signal.State)
}
return msgs
return messages, changes
}
// LockableExts process Up/Down signals by forwarding them to owner, dependency, and requirement nodes
// LockSignal and LinkSignal Direct signals are processed to update the requirement/dependency/lock state
func (ext *LockableExt) Process(ctx *Context, node *Node, source NodeID, signal Signal) Messages {
messages := Messages{}
func (ext *LockableExt) Process(ctx *Context, node *Node, source NodeID, signal Signal) (Messages, Changes) {
var messages Messages = nil
var changes Changes = nil
switch signal.Direction() {
case Up:
if ext.Owner != nil {
@ -292,24 +321,26 @@ func (ext *LockableExt) Process(ctx *Context, node *Node, source NodeID, signal
messages = messages.Add(ctx, node.ID, node.Key, signal, *ext.Owner)
}
}
case Down:
for requirement, _ := range(ext.Requirements) {
for requirement := range(ext.Requirements) {
messages = messages.Add(ctx, node.ID, node.Key, signal, requirement)
}
case Direct:
switch sig := signal.(type) {
case *LinkSignal:
messages = ext.HandleLinkSignal(ctx, node, source, sig)
messages, changes = ext.HandleLinkSignal(ctx, node, source, sig)
case *LockSignal:
messages = ext.HandleLockSignal(ctx, node, source, sig)
messages, changes = ext.HandleLockSignal(ctx, node, source, sig)
case *ErrorSignal:
messages = ext.HandleErrorSignal(ctx, node, source, sig)
messages, changes = ext.HandleErrorSignal(ctx, node, source, sig)
case *SuccessSignal:
messages = ext.HandleSuccessSignal(ctx, node, source, sig)
messages, changes = ext.HandleSuccessSignal(ctx, node, source, sig)
default:
}
default:
}
return messages
return messages, changes
}

@ -18,7 +18,7 @@ func lockableTestContext(t *testing.T, logs []string) *Context {
}
func TestLink(t *testing.T) {
ctx := lockableTestContext(t, []string{"lockable"})
ctx := lockableTestContext(t, []string{"lockable", "listener"})
l1_pub, l1_key, err := ed25519.GenerateKey(rand.Reader)
fatalErr(t, err)
@ -34,31 +34,37 @@ func TestLink(t *testing.T) {
)
fatalErr(t, err)
l1_lockable := NewLockableExt(nil)
l1_listener := NewListenerExt(10)
l1, err := NewNode(ctx, l1_key, TestLockableType, 10, nil,
l1_listener,
NewLockableExt(nil),
l1_lockable,
)
fatalErr(t, err)
msgs := Messages{}
msgs = msgs.Add(ctx, l1.ID, l1.Key, NewLinkSignal("add", l2.ID), l1.ID)
link_signal := NewLinkSignal("add", l2.ID)
msgs = msgs.Add(ctx, l1.ID, l1.Key, link_signal, l1.ID)
err = ctx.Send(msgs)
fatalErr(t, err)
_, err = WaitForSignal(l1_listener.Chan, time.Millisecond*10, func(sig *ErrorSignal) bool {
return sig.Error == "req_added"
})
_, err = WaitForResponse(l1_listener.Chan, time.Millisecond*10, link_signal.ID())
fatalErr(t, err)
info, exists := l1_lockable.Requirements[l2.ID]
if exists == false {
t.Fatal("l2 not in l1 requirements")
} else if info.State != Unlocked {
t.Fatalf("l2 in bad requirement state in l1: %+v", info.State)
}
msgs = Messages{}
msgs = msgs.Add(ctx, l1.ID, l1.Key, NewLinkSignal("remove", l2.ID), l1.ID)
unlink_signal := NewLinkSignal("remove", l2.ID)
msgs = msgs.Add(ctx, l1.ID, l1.Key, unlink_signal, l1.ID)
err = ctx.Send(msgs)
fatalErr(t, err)
_, err = WaitForSignal(l1_listener.Chan, time.Millisecond*10, func(sig *ErrorSignal) bool {
return sig.Error == "req_removed"
})
_, err = WaitForResponse(l1_listener.Chan, time.Millisecond*10, unlink_signal.ID())
fatalErr(t, err)
}

@ -47,9 +47,33 @@ func RandID() NodeID {
return NodeID(uuid.New())
}
type Change struct {
Extension ExtType
Field string
Detail string
}
type Changes []Change
func (changes Changes) Add(ext ExtType, field string) Changes {
return append(changes, Change{
Extension: ext,
Field: field,
Detail: "",
})
}
func (changes Changes) AddDetail(ext ExtType, field string, detail string) Changes {
return append(changes, Change{
Extension: ext,
Field: field,
Detail: detail,
})
}
// Extensions are data attached to nodes that process signals
type Extension interface {
Process(*Context, *Node, NodeID, Signal) Messages
Process(*Context, *Node, NodeID, Signal) (Messages, Changes)
}
// A QueuedSignal is a Signal that has been Queued to trigger at a set time
@ -249,6 +273,10 @@ func nodeLoop(ctx *Context, node *Node) error {
// Perform startup actions
node.Process(ctx, ZeroID, NewStartSignal())
err := WriteNode(ctx, node)
if err != nil {
panic(err)
}
run := true
for run == true {
var signal Signal
@ -411,10 +439,6 @@ func nodeLoop(ctx *Context, node *Node) error {
switch sig := signal.(type) {
case *StopSignal:
node.Process(ctx, source, signal)
err := WriteNode(ctx, node)
if err != nil {
panic(err)
}
if source == node.ID {
node.Process(ctx, source, NewStoppedSignal(sig, node.ID))
} else {
@ -431,8 +455,7 @@ func nodeLoop(ctx *Context, node *Node) error {
ctx.Send(msgs)
default:
node.Process(ctx, source, signal)
err := WriteNode(ctx, node)
err := node.Process(ctx, source, signal)
if err != nil {
panic(err)
}
@ -514,20 +537,57 @@ func (node *Node) Stop(ctx *Context) error {
}
}
func (node *Node) QueueChanges(ctx *Context, changes Changes) error {
change_map := map[ExtType][]string{}
for _, change := range(changes) {
_, exists := change_map[change.Extension]
if exists == false {
change_map[change.Extension] = []string{}
}
change_map[change.Extension] = append(change_map[change.Extension], change.Field)
}
node.QueueSignal(time.Now(), NewStatusSignal(node.ID, fmt.Sprintf("%+v", change_map)))
return nil
}
func (node *Node) Process(ctx *Context, source NodeID, signal Signal) error {
ctx.Log.Logf("node_process", "PROCESSING MESSAGE: %s - %+v", node.ID, signal)
messages := Messages{}
changes := Changes{}
for ext_type, ext := range(node.Extensions) {
ctx.Log.Logf("node_process", "PROCESSING_EXTENSION: %s/%s", node.ID, ext_type)
resp := ext.Process(ctx, node, source, signal)
if resp != nil {
messages = append(messages, resp...)
ext_messages, ext_changes := ext.Process(ctx, node, source, signal)
if len(ext_messages) != 0 {
messages = append(messages, ext_messages...)
}
if len(ext_changes) != 0 {
changes = append(changes, ext_changes...)
}
}
if len(messages) != 0 {
return ctx.Send(messages)
send_err := ctx.Send(messages)
if send_err != nil {
return send_err
}
}
if len(changes) != 0 {
_, ok := signal.(*StoppedSignal)
if (ok == false) || (source != node.ID) {
write_err := WriteNodeChanges(ctx, node, changes)
if write_err != nil {
return write_err
}
status_err := node.QueueChanges(ctx, changes)
if status_err != nil {
return status_err
}
}
}
return nil
}
@ -637,6 +697,11 @@ func NewNode(ctx *Context, key ed25519.PrivateKey, node_type NodeType, buffer_si
return node, nil
}
func WriteNodeChanges(ctx *Context, node *Node, changes Changes) error {
// TODO: optimize to not re-serialize unchanged extensions/fields(might need to cache the serialized values)
return WriteNode(ctx, node)
}
// Write a node to the database
func WriteNode(ctx *Context, node *Node) error {
ctx.Log.Logf("db", "DB_WRITE: %s", node.ID)

@ -26,8 +26,8 @@ func TestNodeDB(t *testing.T) {
err = ctx.Send(msgs)
fatalErr(t, err)
_, err = WaitForSignal(node_listener.Chan, 10*time.Millisecond, func(sig *StatusSignal) bool {
return sig.Status == "stopped" && sig.Source == node.ID
_, err = WaitForSignal(node_listener.Chan, 10*time.Millisecond, func(sig *StoppedSignal) bool {
return sig.Source == node.ID
})
fatalErr(t, err)