More cleanup, moved initialization to interface instead of signals

gql_cataclysm
noah metz 2024-03-03 16:37:03 -07:00
parent faab7eb52c
commit 8927077167
11 changed files with 199 additions and 205 deletions

@ -44,6 +44,13 @@ func NewACLExt(policies []Policy) *ACLExt {
} }
} }
func (ext *ACLExt) Load(ctx *Context, node *Node) error {
return nil
}
func (ext *ACLExt) Unload(ctx *Context, node *Node) {
}
func (ext *ACLExt) Process(ctx *Context, node *Node, source NodeID, signal Signal) (Messages, Changes) { func (ext *ACLExt) Process(ctx *Context, node *Node, source NodeID, signal Signal) (Messages, Changes) {
response, is_response := signal.(ResponseSignal) response, is_response := signal.(ResponseSignal)
if is_response == true { if is_response == true {
@ -51,7 +58,7 @@ func (ext *ACLExt) Process(ctx *Context, node *Node, source NodeID, signal Signa
var changes = Changes{} var changes = Changes{}
info, waiting := ext.Pending[response.ResponseID()] info, waiting := ext.Pending[response.ResponseID()]
if waiting == true { if waiting == true {
AddChange[ACLExt](changes, "pending") changes.Add("pending")
delete(ext.Pending, response.ResponseID()) delete(ext.Pending, response.ResponseID())
if response.ID() != info.Timeout { if response.ID() != info.Timeout {
err := node.DequeueSignal(info.Timeout) err := node.DequeueSignal(info.Timeout)
@ -78,7 +85,7 @@ func (ext *ACLExt) Process(ctx *Context, node *Node, source NodeID, signal Signa
} }
} else { } else {
if ext.Policies[policy_index].ContinueAllows(ctx, acl_info, response) == Allow { if ext.Policies[policy_index].ContinueAllows(ctx, acl_info, response) == Allow {
AddChange[ACLExt](changes, "pending_acls") changes.Add("pending_acls")
delete(ext.PendingACLs, info.ID) delete(ext.PendingACLs, info.ID)
ctx.Log.Logf("acl", "Request delayed allow") ctx.Log.Logf("acl", "Request delayed allow")
messages = messages.Add(ctx, acl_info.Source, node, nil, NewSuccessSignal(info.ID)) messages = messages.Add(ctx, acl_info.Source, node, nil, NewSuccessSignal(info.ID))
@ -87,7 +94,7 @@ func (ext *ACLExt) Process(ctx *Context, node *Node, source NodeID, signal Signa
ctx.Log.Logf("acl", "acl proxy timeout dequeue error: %s", err) ctx.Log.Logf("acl", "acl proxy timeout dequeue error: %s", err)
} }
} else if acl_info.Counter == 0 { } else if acl_info.Counter == 0 {
AddChange[ACLExt](changes, "pending_acls") changes.Add("pending_acls")
delete(ext.PendingACLs, info.ID) delete(ext.PendingACLs, info.ID)
ctx.Log.Logf("acl", "Request delayed deny") ctx.Log.Logf("acl", "Request delayed deny")
messages = messages.Add(ctx, acl_info.Source, node, nil, NewErrorSignal(info.ID, "acl_denied")) messages = messages.Add(ctx, acl_info.Source, node, nil, NewErrorSignal(info.ID, "acl_denied"))
@ -97,7 +104,7 @@ func (ext *ACLExt) Process(ctx *Context, node *Node, source NodeID, signal Signa
} }
} else { } else {
node.PendingACLs[info.ID] = acl_info node.PendingACLs[info.ID] = acl_info
AddChange[ACLExt](changes, "pending_acls") changes.Add("pending_acls")
} }
} }
} }
@ -136,7 +143,7 @@ func (ext *ACLExt) Process(ctx *Context, node *Node, source NodeID, signal Signa
messages = messages.Add(ctx, source, node, nil, NewErrorSignal(sig.Id, "acl_denied")) messages = messages.Add(ctx, source, node, nil, NewErrorSignal(sig.Id, "acl_denied"))
} else if acl_messages != nil { } else if acl_messages != nil {
ctx.Log.Logf("acl", "Request pending") ctx.Log.Logf("acl", "Request pending")
AddChange[ACLExt](changes, "pending") changes.Add("pending")
total_messages := 0 total_messages := 0
// TODO: reasonable timeout/configurable // TODO: reasonable timeout/configurable
timeout_time := time.Now().Add(time.Second) timeout_time := time.Now().Add(time.Second)
@ -175,7 +182,7 @@ func (ext *ACLExt) Process(ctx *Context, node *Node, source NodeID, signal Signa
acl_info, exists := ext.PendingACLs[sig.ReqID] acl_info, exists := ext.PendingACLs[sig.ReqID]
if exists == true { if exists == true {
delete(ext.PendingACLs, sig.ReqID) delete(ext.PendingACLs, sig.ReqID)
AddChange[ACLExt](changes, "pending_acls") changes.Add("pending_acls")
ctx.Log.Logf("acl", "Request timeout deny") ctx.Log.Logf("acl", "Request timeout deny")
messages = messages.Add(ctx, acl_info.Source, node, nil, NewErrorSignal(sig.ReqID, "acl_timeout")) messages = messages.Add(ctx, acl_info.Source, node, nil, NewErrorSignal(sig.ReqID, "acl_timeout"))
err := node.DequeueSignal(acl_info.TimeoutID) err := node.DequeueSignal(acl_info.TimeoutID)

@ -318,7 +318,16 @@ func (ctx *Context) Node(id NodeID) (*Node, bool) {
return node, exists return node, exists
} }
func (ctx *Context) Stop(id NodeID) error { func (ctx *Context) Delete(id NodeID) error {
err := ctx.Unload(id)
if err != nil {
return err
}
// TODO: also delete any associated data
return nil
}
func (ctx *Context) Unload(id NodeID) error {
ctx.nodeMapLock.Lock() ctx.nodeMapLock.Lock()
defer ctx.nodeMapLock.Unlock() defer ctx.nodeMapLock.Unlock()
node, exists := ctx.nodeMap[id] node, exists := ctx.nodeMap[id]
@ -326,15 +335,15 @@ func (ctx *Context) Stop(id NodeID) error {
return fmt.Errorf("%s is not a node in ctx", id) return fmt.Errorf("%s is not a node in ctx", id)
} }
err := node.Stop(ctx) err := node.Unload(ctx)
delete(ctx.nodeMap, id) delete(ctx.nodeMap, id)
return err return err
} }
func (ctx *Context) StopAll() { func (ctx *Context) Stop() {
ctx.nodeMapLock.Lock() ctx.nodeMapLock.Lock()
for id, node := range(ctx.nodeMap) { for id, node := range(ctx.nodeMap) {
node.Stop(ctx) node.Unload(ctx)
delete(ctx.nodeMap, id) delete(ctx.nodeMap, id)
} }
ctx.nodeMapLock.Unlock() ctx.nodeMapLock.Unlock()
@ -675,11 +684,6 @@ func NewContext(db * badger.DB, log Logger) (*Context, error) {
return nil, err return nil, err
} }
err = RegisterSignal[StoppedSignal](ctx)
if err != nil {
return nil, err
}
err = RegisterSignal[AddSubGroupSignal](ctx) err = RegisterSignal[AddSubGroupSignal](ctx)
if err != nil { if err != nil {
return nil, err return nil, err
@ -710,21 +714,6 @@ func NewContext(db * badger.DB, log Logger) (*Context, error) {
return nil, err return nil, err
} }
err = RegisterSignal[StopSignal](ctx)
if err != nil {
return nil, err
}
err = RegisterSignal[CreateSignal](ctx)
if err != nil {
return nil, err
}
err = RegisterSignal[StartSignal](ctx)
if err != nil {
return nil, err
}
err = RegisterSignal[StatusSignal](ctx) err = RegisterSignal[StatusSignal](ctx)
if err != nil { if err != nil {
return nil, err return nil, err

@ -49,6 +49,13 @@ type EventExt struct {
Parent NodeID `gv:"parent"` Parent NodeID `gv:"parent"`
} }
func (ext *EventExt) Load(ctx *Context, node *Node) error {
return nil
}
func (ext *EventExt) Unload(ctx *Context, node *Node) {
}
func NewEventExt(parent NodeID, name string) *EventExt { func NewEventExt(parent NodeID, name string) *EventExt {
return &EventExt{ return &EventExt{
Name: name, Name: name,
@ -110,7 +117,7 @@ func (signal EventControlSignal) Permission() Tree {
func (ext *EventExt) UpdateState(node *Node, changes Changes, state EventState, state_start time.Time) { func (ext *EventExt) UpdateState(node *Node, changes Changes, state EventState, state_start time.Time) {
if ext.State != state { if ext.State != state {
ext.StateStart = state_start ext.StateStart = state_start
AddChange[EventExt](changes, "state") changes.Add("state")
ext.State = state ext.State = state
node.QueueSignal(time.Now(), NewEventStateSignal(node.ID, ext.State, time.Now())) node.QueueSignal(time.Now(), NewEventStateSignal(node.ID, ext.State, time.Now()))
} }
@ -131,6 +138,13 @@ type TestEventExt struct {
Length time.Duration Length time.Duration
} }
func (ext *TestEventExt) Load(ctx *Context, node *Node) error {
return nil
}
func (ext *TestEventExt) Unload(ctx *Context, node *Node) {
}
type EventCommandMap map[EventCommand]map[EventState]EventState type EventCommandMap map[EventCommand]map[EventState]EventState
var test_event_commands = EventCommandMap{ var test_event_commands = EventCommandMap{
"ready?": { "ready?": {

@ -0,0 +1,24 @@
package graphvent
import (
)
// Extensions are data attached to nodes that process signals
type Extension interface {
// Called to process incoming signals, returning changes and messages to send
Process(*Context, *Node, NodeID, Signal) (Messages, Changes)
// Called when the node is loaded into a context(creation or move), so extension data can be initialized
Load(*Context, *Node) error
// Called when the node is unloaded from a context(deletion or move), so extension data can be cleaned up
Unload(*Context, *Node)
}
// Changes are lists of modifications made to extensions to be communicated
type Changes []string
func (changes *Changes) Add(fields ...string) {
new_changes := append(*changes, fields...)
changes = &new_changes
}

@ -1212,6 +1212,21 @@ type GQLExt struct {
Listen string `gv:"listen"` Listen string `gv:"listen"`
} }
func (ext *GQLExt) Load(ctx *Context, node *Node) error {
ctx.Log.Logf("gql", "Loading GQL server extension on %s", node.ID)
return ext.StartGQLServer(ctx, node)
}
func (ext *GQLExt) Unload(ctx *Context, node *Node) {
ctx.Log.Logf("gql", "Unloading GQL server extension on %s", node.ID)
err := ext.StopGQLServer()
if err != nil {
ctx.Log.Logf("gql", "Error unloading GQL server extension on %s: %s", node.ID, err)
} else {
ctx.Log.Logf("gql", "Unloaded GQL server extension on %s", node.ID)
}
}
func (ext *GQLExt) PostDeserialize(*Context) error { func (ext *GQLExt) PostDeserialize(*Context) error {
ext.resolver_response = map[uuid.UUID]chan Signal{} ext.resolver_response = map[uuid.UUID]chan Signal{}
ext.subscriptions = []SubscriptionInfo{} ext.subscriptions = []SubscriptionInfo{}
@ -1326,23 +1341,6 @@ func (ext *GQLExt) Process(ctx *Context, node *Node, source NodeID, signal Signa
ctx.Log.Logf("gql", "Received read result that wasn't expected - %+v", sig) ctx.Log.Logf("gql", "Received read result that wasn't expected - %+v", sig)
} }
case *StopSignal:
ctx.Log.Logf("gql", "stopping gql server %s", node.ID)
err := ext.StopGQLServer()
if err != nil {
ctx.Log.Logf("gql", "GQL_STOP_ERROR: %s", err)
}
case *StartSignal:
ctx.Log.Logf("gql", "starting gql server %s", node.ID)
err := ext.StartGQLServer(ctx, node)
if err == nil {
ctx.Log.Logf("gql", "started gql server on %s", ext.Listen)
AddChange[GQLExt](changes, "state")
} else {
ctx.Log.Logf("gql", "GQL_RESTART_ERROR: %s", err)
}
case *StatusSignal: case *StatusSignal:
ext.subscriptions_lock.RLock() ext.subscriptions_lock.RLock()
ctx.Log.Logf("gql", "forwarding status signal from %+v to resolvers %+v", sig.Source, ext.subscriptions) ctx.Log.Logf("gql", "forwarding status signal from %+v to resolvers %+v", sig.Source, ext.subscriptions)

@ -223,6 +223,14 @@ func NewGroupExt(sub_groups map[string][]NodeID) *GroupExt {
} }
} }
func (ext *GroupExt) Load(ctx *Context, node *Node) error {
return nil
}
func (ext *GroupExt) Unload(ctx *Context, node *Node) {
}
func (ext *GroupExt) Process(ctx *Context, node *Node, source NodeID, signal Signal) (Messages, Changes) { func (ext *GroupExt) Process(ctx *Context, node *Node, source NodeID, signal Signal) (Messages, Changes) {
var messages Messages = nil var messages Messages = nil
var changes = Changes{} var changes = Changes{}
@ -240,7 +248,7 @@ func (ext *GroupExt) Process(ctx *Context, node *Node, source NodeID, signal Sig
ext.SubGroups[sig.SubGroup] = sub_group ext.SubGroups[sig.SubGroup] = sub_group
messages = messages.Add(ctx, source, node, nil, NewSuccessSignal(sig.Id)) messages = messages.Add(ctx, source, node, nil, NewSuccessSignal(sig.Id))
AddChange[GroupExt](changes, "sub_groups") changes.Add("sub_groups")
} }
} }
@ -257,7 +265,7 @@ func (ext *GroupExt) Process(ctx *Context, node *Node, source NodeID, signal Sig
ext.SubGroups[sig.SubGroup] = sub_group ext.SubGroups[sig.SubGroup] = sub_group
messages = messages.Add(ctx, source, node, nil, NewSuccessSignal(sig.Id)) messages = messages.Add(ctx, source, node, nil, NewSuccessSignal(sig.Id))
AddChange[GroupExt](changes, "sub_groups") changes.Add("sub_groups")
} }
} }
@ -268,7 +276,7 @@ func (ext *GroupExt) Process(ctx *Context, node *Node, source NodeID, signal Sig
} else { } else {
ext.SubGroups[sig.Name] = []NodeID{} ext.SubGroups[sig.Name] = []NodeID{}
AddChange[GroupExt](changes, "sub_groups") changes.Add("sub_groups")
messages = messages.Add(ctx, source, node, nil, NewSuccessSignal(sig.Id)) messages = messages.Add(ctx, source, node, nil, NewSuccessSignal(sig.Id))
} }
case *RemoveSubGroupSignal: case *RemoveSubGroupSignal:
@ -278,7 +286,7 @@ func (ext *GroupExt) Process(ctx *Context, node *Node, source NodeID, signal Sig
} else { } else {
delete(ext.SubGroups, sig.Name) delete(ext.SubGroups, sig.Name)
AddChange[GroupExt](changes, "sub_groups") changes.Add("sub_groups")
messages = messages.Add(ctx, source, node, nil, NewSuccessSignal(sig.Id)) messages = messages.Add(ctx, source, node, nil, NewSuccessSignal(sig.Id))
} }
} }

@ -10,6 +10,13 @@ type ListenerExt struct {
Chan chan Signal Chan chan Signal
} }
func (ext *ListenerExt) Load(ctx *Context, node *Node) error {
return nil
}
func (ext *ListenerExt) Unload(ctx *Context, node *Node) {
}
func (ext *ListenerExt) PostDeserialize(ctx *Context) error { func (ext *ListenerExt) PostDeserialize(ctx *Context) error {
ext.Chan = make(chan Signal, ext.Buffer) ext.Chan = make(chan Signal, ext.Buffer)
return nil return nil

@ -75,15 +75,22 @@ func LockLockable(ctx *Context, node *Node) (uuid.UUID, error) {
return signal.ID(), ctx.Send(messages) return signal.ID(), ctx.Send(messages)
} }
func (ext *LockableExt) Load(ctx *Context, node *Node) error {
return nil
}
func (ext *LockableExt) Unload(ctx *Context, node *Node) {
}
func (ext *LockableExt) HandleErrorSignal(ctx *Context, node *Node, source NodeID, signal *ErrorSignal) (Messages, Changes) { func (ext *LockableExt) HandleErrorSignal(ctx *Context, node *Node, source NodeID, signal *ErrorSignal) (Messages, Changes) {
var messages Messages = nil var messages Messages = nil
var changes = Changes{} var changes Changes = nil
info, info_found := node.ProcessResponse(ext.WaitInfos, signal) info, info_found := node.ProcessResponse(ext.WaitInfos, signal)
if info_found { if info_found {
state, found := ext.Requirements[info.Destination] state, found := ext.Requirements[info.Destination]
if found == true { if found == true {
AddChange[LockableExt](changes, "wait_infos") changes.Add("wait_infos")
ctx.Log.Logf("lockable", "got mapped response %+v for %+v in state %s while in %s", signal, info, ReqStateStrings[state], ReqStateStrings[ext.State]) ctx.Log.Logf("lockable", "got mapped response %+v for %+v in state %s while in %s", signal, info, ReqStateStrings[state], ReqStateStrings[ext.State])
switch ext.State { switch ext.State {
case AbortingLock: case AbortingLock:
@ -96,11 +103,11 @@ func (ext *LockableExt) HandleErrorSignal(ctx *Context, node *Node, source NodeI
} }
} }
if all_unlocked == true { if all_unlocked == true {
AddChange[LockableExt](changes, "state") changes.Add("state")
ext.State = Unlocked ext.State = Unlocked
} }
case Locking: case Locking:
AddChange[LockableExt](changes, "state") changes.Add("state")
ext.Requirements[info.Destination] = Unlocked ext.Requirements[info.Destination] = Unlocked
unlocked := 0 unlocked := 0
for _, state := range(ext.Requirements) { for _, state := range(ext.Requirements) {
@ -160,7 +167,7 @@ func (ext *LockableExt) HandleLinkSignal(ctx *Context, node *Node, source NodeID
ext.Requirements = map[NodeID]ReqState{} ext.Requirements = map[NodeID]ReqState{}
} }
ext.Requirements[signal.NodeID] = Unlocked ext.Requirements[signal.NodeID] = Unlocked
AddChange[LockableExt](changes, "requirements") changes.Add("requirements")
messages = messages.Add(ctx, source, node, nil, NewSuccessSignal(signal.ID())) messages = messages.Add(ctx, source, node, nil, NewSuccessSignal(signal.ID()))
} }
case "remove": case "remove":
@ -169,7 +176,7 @@ func (ext *LockableExt) HandleLinkSignal(ctx *Context, node *Node, source NodeID
messages = messages.Add(ctx, source, node, nil, NewErrorSignal(signal.ID(), "can't link: not_requirement")) messages = messages.Add(ctx, source, node, nil, NewErrorSignal(signal.ID(), "can't link: not_requirement"))
} else { } else {
delete(ext.Requirements, signal.NodeID) delete(ext.Requirements, signal.NodeID)
AddChange[LockableExt](changes, "requirements") changes.Add("requirements")
messages = messages.Add(ctx, source, node, nil, NewSuccessSignal(signal.ID())) messages = messages.Add(ctx, source, node, nil, NewSuccessSignal(signal.ID()))
} }
default: default:
@ -210,10 +217,10 @@ func (ext *LockableExt) HandleSuccessSignal(ctx *Context, node *Node, source Nod
ctx.Log.Logf("lockable", "WHOLE LOCK: %s - %s - %+v", node.ID, ext.PendingID, ext.PendingOwner) ctx.Log.Logf("lockable", "WHOLE LOCK: %s - %s - %+v", node.ID, ext.PendingID, ext.PendingOwner)
ext.State = Locked ext.State = Locked
ext.Owner = ext.PendingOwner ext.Owner = ext.PendingOwner
AddChange[LockableExt](changes, "state", "owner", "requirements") changes.Add("state", "owner", "requirements")
messages = messages.Add(ctx, *ext.Owner, node, nil, NewSuccessSignal(ext.PendingID)) messages = messages.Add(ctx, *ext.Owner, node, nil, NewSuccessSignal(ext.PendingID))
} else { } else {
AddChange[LockableExt](changes, "requirements") changes.Add("requirements")
ctx.Log.Logf("lockable", "PARTIAL LOCK: %s - %d/%d", node.ID, locked, len(ext.Requirements)) ctx.Log.Logf("lockable", "PARTIAL LOCK: %s - %d/%d", node.ID, locked, len(ext.Requirements))
} }
case AbortingLock: case AbortingLock:
@ -246,15 +253,15 @@ func (ext *LockableExt) HandleSuccessSignal(ctx *Context, node *Node, source Nod
previous_owner := *ext.Owner previous_owner := *ext.Owner
ext.Owner = ext.PendingOwner ext.Owner = ext.PendingOwner
ext.ReqID = nil ext.ReqID = nil
AddChange[LockableExt](changes, "state", "owner", "req_id") changes.Add("state", "owner", "req_id")
messages = messages.Add(ctx, previous_owner, node, nil, NewSuccessSignal(ext.PendingID)) messages = messages.Add(ctx, previous_owner, node, nil, NewSuccessSignal(ext.PendingID))
} else if old_state == AbortingLock { } else if old_state == AbortingLock {
AddChange[LockableExt](changes, "state", "pending_owner") changes.Add("state", "pending_owner")
messages = messages.Add(ctx, *ext.PendingOwner, node, nil, NewErrorSignal(*ext.ReqID, "not_unlocked")) messages = messages.Add(ctx, *ext.PendingOwner, node, nil, NewErrorSignal(*ext.ReqID, "not_unlocked"))
ext.PendingOwner = ext.Owner ext.PendingOwner = ext.Owner
} }
} else { } else {
AddChange[LockableExt](changes, "state") changes.Add("state")
ctx.Log.Logf("lockable", "PARTIAL UNLOCK: %s - %d/%d", node.ID, unlocked, len(ext.Requirements)) ctx.Log.Logf("lockable", "PARTIAL UNLOCK: %s - %d/%d", node.ID, unlocked, len(ext.Requirements))
} }
} }
@ -278,7 +285,7 @@ func (ext *LockableExt) HandleLockSignal(ctx *Context, node *Node, source NodeID
new_owner := source new_owner := source
ext.PendingOwner = &new_owner ext.PendingOwner = &new_owner
ext.Owner = &new_owner ext.Owner = &new_owner
AddChange[LockableExt](changes, "state", "pending_owner", "owner") changes.Add("state", "pending_owner", "owner")
messages = messages.Add(ctx, new_owner, node, nil, NewSuccessSignal(signal.ID())) messages = messages.Add(ctx, new_owner, node, nil, NewSuccessSignal(signal.ID()))
} else { } else {
ext.State = Locking ext.State = Locking
@ -287,7 +294,7 @@ func (ext *LockableExt) HandleLockSignal(ctx *Context, node *Node, source NodeID
new_owner := source new_owner := source
ext.PendingOwner = &new_owner ext.PendingOwner = &new_owner
ext.PendingID = signal.ID() ext.PendingID = signal.ID()
AddChange[LockableExt](changes, "state", "req_id", "pending_owner", "pending_id") changes.Add("state", "req_id", "pending_owner", "pending_id")
for id, state := range(ext.Requirements) { for id, state := range(ext.Requirements) {
if state != Unlocked { if state != Unlocked {
ctx.Log.Logf("lockable", "REQ_NOT_UNLOCKED_WHEN_LOCKING") ctx.Log.Logf("lockable", "REQ_NOT_UNLOCKED_WHEN_LOCKING")
@ -311,7 +318,7 @@ func (ext *LockableExt) HandleLockSignal(ctx *Context, node *Node, source NodeID
new_owner := source new_owner := source
ext.PendingOwner = nil ext.PendingOwner = nil
ext.Owner = nil ext.Owner = nil
AddChange[LockableExt](changes, "state", "pending_owner", "owner") changes.Add("state", "pending_owner", "owner")
messages = messages.Add(ctx, new_owner, node, nil, NewSuccessSignal(signal.ID())) messages = messages.Add(ctx, new_owner, node, nil, NewSuccessSignal(signal.ID()))
} else if source == *ext.Owner { } else if source == *ext.Owner {
ext.State = Unlocking ext.State = Unlocking
@ -319,7 +326,7 @@ func (ext *LockableExt) HandleLockSignal(ctx *Context, node *Node, source NodeID
ext.ReqID = &id ext.ReqID = &id
ext.PendingOwner = nil ext.PendingOwner = nil
ext.PendingID = signal.ID() ext.PendingID = signal.ID()
AddChange[LockableExt](changes, "state", "pending_owner", "pending_id", "req_id") changes.Add("state", "pending_owner", "pending_id", "req_id")
for id, state := range(ext.Requirements) { for id, state := range(ext.Requirements) {
if state != Locked { if state != Locked {
ctx.Log.Logf("lockable", "REQ_NOT_LOCKED_WHEN_UNLOCKING") ctx.Log.Logf("lockable", "REQ_NOT_LOCKED_WHEN_UNLOCKING")
@ -347,7 +354,7 @@ func (ext *LockableExt) HandleTimeoutSignal(ctx *Context, node *Node, source Nod
wait_info, found := node.ProcessResponse(ext.WaitInfos, signal) wait_info, found := node.ProcessResponse(ext.WaitInfos, signal)
if found == true { if found == true {
AddChange[LockableExt](changes, "wait_infos") changes.Add("wait_infos")
state, found := ext.Requirements[wait_info.Destination] state, found := ext.Requirements[wait_info.Destination]
if found == true { if found == true {
ctx.Log.Logf("lockable", "%s timed out %s while %s was %s", wait_info.Destination, ReqStateStrings[state], node.ID, ReqStateStrings[state]) ctx.Log.Logf("lockable", "%s timed out %s while %s was %s", wait_info.Destination, ReqStateStrings[state], node.ID, ReqStateStrings[state])
@ -362,7 +369,7 @@ func (ext *LockableExt) HandleTimeoutSignal(ctx *Context, node *Node, source Nod
} }
} }
if all_unlocked == true { if all_unlocked == true {
AddChange[LockableExt](changes, "state") changes.Add("state")
ext.State = Unlocked ext.State = Unlocked
} }
case Locking: case Locking:

@ -47,27 +47,6 @@ func RandID() NodeID {
return NodeID(uuid.New()) return NodeID(uuid.New())
} }
type Changes map[ExtType][]string
func AddChange[E any, T interface { *E; Extension}](changes Changes, fields ...string) {
ext_type := ExtType(SerializedTypeFor[E]())
changes.Add(ext_type, fields...)
}
func (changes Changes) Add(ext ExtType, fields ...string) {
current, exists := changes[ext]
if exists == false {
current = []string{}
}
current = append(current, fields...)
changes[ext] = current
}
// Extensions are data attached to nodes that process signals
type Extension interface {
Process(*Context, *Node, NodeID, Signal) (Messages, Changes)
}
// A QueuedSignal is a Signal that has been Queued to trigger at a set time // A QueuedSignal is a Signal that has been Queued to trigger at a set time
type QueuedSignal struct { type QueuedSignal struct {
Signal `gv:"signal"` Signal `gv:"signal"`
@ -313,8 +292,14 @@ func nodeLoop(ctx *Context, node *Node) error {
return fmt.Errorf("%s is already started, will not start again", node.ID) return fmt.Errorf("%s is already started, will not start again", node.ID)
} }
// Perform startup actions // Load each extension before starting the main loop
node.Process(ctx, ZeroID, NewStartSignal()) for _, extension := range(node.Extensions) {
err := extension.Load(ctx, node)
if err != nil {
return err
}
}
run := true run := true
for run == true { for run == true {
var signal Signal var signal Signal
@ -497,17 +482,6 @@ func nodeLoop(ctx *Context, node *Node) error {
} }
switch sig := signal.(type) { switch sig := signal.(type) {
case *StopSignal:
node.Process(ctx, source, signal)
if source == node.ID {
node.Process(ctx, source, NewStoppedSignal(sig, node.ID))
} else {
msgs := Messages{}
msgs = msgs.Add(ctx, node.ID, node, nil, NewStoppedSignal(sig, node.ID))
ctx.Send(msgs)
}
run = false
case *ReadSignal: case *ReadSignal:
result := node.ReadFields(ctx, sig.Extensions) result := node.ReadFields(ctx, sig.Extensions)
msgs := Messages{} msgs := Messages{}
@ -530,20 +504,18 @@ func nodeLoop(ctx *Context, node *Node) error {
return nil return nil
} }
func (node *Node) Stop(ctx *Context) error { func (node *Node) Unload(ctx *Context) error {
if node.Active.Load() { if node.Active.Load() {
msg, err := NewMessage(ctx, node.ID, node, nil, NewStopSignal()) for _, extension := range(node.Extensions) {
if err != nil { extension.Unload(ctx, node)
return err
} }
node.MsgChan <- msg
return nil return nil
} else { } else {
return fmt.Errorf("Node not active") return fmt.Errorf("Node not active")
} }
} }
func (node *Node) QueueChanges(ctx *Context, changes Changes) error { func (node *Node) QueueChanges(ctx *Context, changes map[ExtType]Changes) error {
node.QueueSignal(time.Now(), NewStatusSignal(node.ID, changes)) node.QueueSignal(time.Now(), NewStatusSignal(node.ID, changes))
return nil return nil
} }
@ -551,18 +523,15 @@ func (node *Node) QueueChanges(ctx *Context, changes Changes) error {
func (node *Node) Process(ctx *Context, source NodeID, signal Signal) error { func (node *Node) Process(ctx *Context, source NodeID, signal Signal) error {
ctx.Log.Logf("node_process", "PROCESSING MESSAGE: %s - %+v", node.ID, signal) ctx.Log.Logf("node_process", "PROCESSING MESSAGE: %s - %+v", node.ID, signal)
messages := Messages{} messages := Messages{}
changes := Changes{} changes := map[ExtType]Changes{}
for ext_type, ext := range(node.Extensions) { for ext_type, ext := range(node.Extensions) {
ctx.Log.Logf("node_process", "PROCESSING_EXTENSION: %s/%s", node.ID, ext_type) ctx.Log.Logf("node_process", "PROCESSING_EXTENSION: %s/%s", node.ID, ext_type)
ext_messages, ext_changes := ext.Process(ctx, node, source, signal) ext_messages, ext_changes := ext.Process(ctx, node, source, signal)
if len(ext_messages) != 0 { if len(ext_messages) != 0 {
messages = append(messages, ext_messages...) messages = append(messages, ext_messages...)
} }
if len(ext_changes) != 0 { if len(ext_changes) != 0 {
for ext, change_list := range(ext_changes) { changes[ext_type] = ext_changes
changes[ext] = append(changes[ext], change_list...)
}
} }
} }
ctx.Log.Logf("changes", "Changes for %s after %+v - %+v", node.ID, reflect.TypeOf(signal), changes) ctx.Log.Logf("changes", "Changes for %s after %+v - %+v", node.ID, reflect.TypeOf(signal), changes)
@ -575,8 +544,6 @@ func (node *Node) Process(ctx *Context, source NodeID, signal Signal) error {
} }
if len(changes) != 0 { if len(changes) != 0 {
_, ok := signal.(*StoppedSignal)
if (ok == false) || (source != node.ID) {
write_err := WriteNodeChanges(ctx, node, changes) write_err := WriteNodeChanges(ctx, node, changes)
if write_err != nil { if write_err != nil {
return write_err return write_err
@ -587,7 +554,6 @@ func (node *Node) Process(ctx *Context, source NodeID, signal Signal) error {
return status_err return status_err
} }
} }
}
return nil return nil
} }
@ -657,7 +623,6 @@ func NewNode(ctx *Context, key ed25519.PrivateKey, type_name string, buffer_size
return nil, fmt.Errorf("Node type %+v not registered in Context", node_type) return nil, fmt.Errorf("Node type %+v not registered in Context", node_type)
} }
changes := Changes{}
ext_map := map[ExtType]Extension{} ext_map := map[ExtType]Extension{}
for _, ext := range(extensions) { for _, ext := range(extensions) {
ext_type, exists := ctx.ExtensionTypes[reflect.TypeOf(ext)] ext_type, exists := ctx.ExtensionTypes[reflect.TypeOf(ext)]
@ -669,7 +634,6 @@ func NewNode(ctx *Context, key ed25519.PrivateKey, type_name string, buffer_size
return nil, fmt.Errorf("Cannot add the same extension to a node twice") return nil, fmt.Errorf("Cannot add the same extension to a node twice")
} }
ext_map[ext_type] = ext ext_map[ext_type] = ext
changes.Add(ext_type, "init")
} }
for _, required_ext := range(def.Extensions) { for _, required_ext := range(def.Extensions) {
@ -700,18 +664,12 @@ func NewNode(ctx *Context, key ed25519.PrivateKey, type_name string, buffer_size
} }
node.writeSignalQueue = true node.writeSignalQueue = true
err = WriteNodeChanges(ctx, node, changes) err = WriteNodeInit(ctx, node)
if err != nil { if err != nil {
return nil, err return nil, err
} }
ctx.AddNode(id, node) ctx.AddNode(id, node)
err = node.Process(ctx, ZeroID, NewCreateSignal())
if err != nil {
return nil, err
}
go runNode(ctx, node) go runNode(ctx, node)
return node, nil return node, nil
@ -734,6 +692,8 @@ func WriteNodeExtList(ctx *Context, node *Node) error {
i += 1 i += 1
} }
ctx.Log.Logf("db", "Writing ext_list for %s - %+v", node.ID, ext_list)
id_bytes, err := node.ID.MarshalBinary() id_bytes, err := node.ID.MarshalBinary()
if err != nil { if err != nil {
return err return err
@ -749,7 +709,54 @@ func WriteNodeExtList(ctx *Context, node *Node) error {
}) })
} }
func WriteNodeChanges(ctx *Context, node *Node, changes Changes) error { func WriteNodeInit(ctx *Context, node *Node) error {
ctx.Log.Logf("db", "Writing initial entry for %s - %+v", node.ID, node)
ext_serialized := map[ExtType]SerializedValue{}
for ext_type, ext := range(node.Extensions) {
serialized_ext, err := SerializeAny(ctx, ext)
if err != nil {
return err
}
ext_serialized[ext_type] = serialized_ext
}
sq_serialized, err := SerializeAny(ctx, node.SignalQueue)
if err != nil {
return err
}
node_serialized, err := SerializeAny(ctx, node)
if err != nil {
return err
}
id_bytes, err := node.ID.MarshalBinary()
return ctx.DB.Update(func(txn *badger.Txn) error {
err := txn.Set(id_bytes, node_serialized.Data)
if err != nil {
return nil
}
err = txn.Set(append(id_bytes, signal_queue_suffix...), sq_serialized.Data)
if err != nil {
return err
}
for ext_type, data := range(ext_serialized) {
err := txn.Set(append(id_bytes, ExtTypeSuffix(ext_type)...), data.Data)
if err != nil {
return err
}
}
return nil
})
}
func WriteNodeChanges(ctx *Context, node *Node, changes map[ExtType]Changes) error {
ctx.Log.Logf("db", "Writing changes for %s - %+v", node.ID, changes) ctx.Log.Logf("db", "Writing changes for %s - %+v", node.ID, changes)
ext_serialized := map[ExtType]SerializedValue{} ext_serialized := map[ExtType]SerializedValue{}

@ -9,7 +9,7 @@ import (
) )
func TestNodeDB(t *testing.T) { func TestNodeDB(t *testing.T) {
ctx := logTestContext(t, []string{"signal", "serialize", "node", "db", "listener"}) ctx := logTestContext(t, []string{"node", "db"})
node_listener := NewListenerExt(10) node_listener := NewListenerExt(10)
node, err := NewNode(ctx, nil, "Base", 10, nil, NewGroupExt(nil), NewLockableExt(nil), node_listener) node, err := NewNode(ctx, nil, "Base", 10, nil, NewGroupExt(nil), NewLockableExt(nil), node_listener)
@ -23,14 +23,7 @@ func TestNodeDB(t *testing.T) {
return false return false
}) })
msgs := Messages{} err = ctx.Unload(node.ID)
msgs = msgs.Add(ctx, node.ID, node, nil, NewStopSignal())
err = ctx.Send(msgs)
fatalErr(t, err)
_, err = WaitForSignal(node_listener.Chan, 10*time.Millisecond, func(sig *StoppedSignal) bool {
return sig.Source == node.ID
})
fatalErr(t, err) fatalErr(t, err)
ctx.nodeMap = map[NodeID]*Node{} ctx.nodeMap = map[NodeID]*Node{}

@ -143,66 +143,6 @@ func NewResponseHeader(req_id uuid.UUID, direction SignalDirection) ResponseHead
} }
} }
type CreateSignal struct {
SignalHeader
}
func (signal CreateSignal) Permission() Tree {
return Tree{
SerializedType(SignalTypeFor[CreateSignal]()): nil,
}
}
func NewCreateSignal() *CreateSignal {
return &CreateSignal{
NewSignalHeader(Direct),
}
}
type StartSignal struct {
SignalHeader
}
func (signal StartSignal) Permission() Tree {
return Tree{
SerializedType(SignalTypeFor[StartSignal]()): nil,
}
}
func NewStartSignal() *StartSignal {
return &StartSignal{
NewSignalHeader(Direct),
}
}
type StoppedSignal struct {
ResponseHeader
Source NodeID
}
func (signal StoppedSignal) Permission() Tree {
return Tree{
SerializedType(SignalTypeFor[ResponseSignal]()): nil,
}
}
func NewStoppedSignal(sig *StopSignal, source NodeID) *StoppedSignal {
return &StoppedSignal{
NewResponseHeader(sig.ID(), Up),
source,
}
}
type StopSignal struct {
SignalHeader
}
func (signal StopSignal) Permission() Tree {
return Tree{
SerializedType(SignalTypeFor[StopSignal]()): nil,
}
}
func NewStopSignal() *StopSignal {
return &StopSignal{
NewSignalHeader(Direct),
}
}
type SuccessSignal struct { type SuccessSignal struct {
ResponseHeader ResponseHeader
} }
@ -263,7 +203,7 @@ func NewACLTimeoutSignal(req_id uuid.UUID) *ACLTimeoutSignal {
type StatusSignal struct { type StatusSignal struct {
SignalHeader SignalHeader
Source NodeID `gv:"source"` Source NodeID `gv:"source"`
Changes Changes `gv:"changes"` Changes map[ExtType]Changes `gv:"changes"`
} }
func (signal StatusSignal) Permission() Tree { func (signal StatusSignal) Permission() Tree {
return Tree{ return Tree{
@ -273,7 +213,7 @@ func (signal StatusSignal) Permission() Tree {
func (signal StatusSignal) String() string { func (signal StatusSignal) String() string {
return fmt.Sprintf("StatusSignal(%s, %+v)", signal.SignalHeader, signal.Changes) return fmt.Sprintf("StatusSignal(%s, %+v)", signal.SignalHeader, signal.Changes)
} }
func NewStatusSignal(source NodeID, changes Changes) *StatusSignal { func NewStatusSignal(source NodeID, changes map[ExtType]Changes) *StatusSignal {
return &StatusSignal{ return &StatusSignal{
NewSignalHeader(Up), NewSignalHeader(Up),
source, source,