diff --git a/acl.go b/acl.go index 147183a..f5b81ff 100644 --- a/acl.go +++ b/acl.go @@ -44,6 +44,13 @@ func NewACLExt(policies []Policy) *ACLExt { } } +func (ext *ACLExt) Load(ctx *Context, node *Node) error { + return nil +} + +func (ext *ACLExt) Unload(ctx *Context, node *Node) { +} + func (ext *ACLExt) Process(ctx *Context, node *Node, source NodeID, signal Signal) (Messages, Changes) { response, is_response := signal.(ResponseSignal) if is_response == true { @@ -51,7 +58,7 @@ func (ext *ACLExt) Process(ctx *Context, node *Node, source NodeID, signal Signa var changes = Changes{} info, waiting := ext.Pending[response.ResponseID()] if waiting == true { - AddChange[ACLExt](changes, "pending") + changes.Add("pending") delete(ext.Pending, response.ResponseID()) if response.ID() != info.Timeout { err := node.DequeueSignal(info.Timeout) @@ -78,7 +85,7 @@ func (ext *ACLExt) Process(ctx *Context, node *Node, source NodeID, signal Signa } } else { if ext.Policies[policy_index].ContinueAllows(ctx, acl_info, response) == Allow { - AddChange[ACLExt](changes, "pending_acls") + changes.Add("pending_acls") delete(ext.PendingACLs, info.ID) ctx.Log.Logf("acl", "Request delayed allow") messages = messages.Add(ctx, acl_info.Source, node, nil, NewSuccessSignal(info.ID)) @@ -87,7 +94,7 @@ func (ext *ACLExt) Process(ctx *Context, node *Node, source NodeID, signal Signa ctx.Log.Logf("acl", "acl proxy timeout dequeue error: %s", err) } } else if acl_info.Counter == 0 { - AddChange[ACLExt](changes, "pending_acls") + changes.Add("pending_acls") delete(ext.PendingACLs, info.ID) ctx.Log.Logf("acl", "Request delayed deny") messages = messages.Add(ctx, acl_info.Source, node, nil, NewErrorSignal(info.ID, "acl_denied")) @@ -97,7 +104,7 @@ func (ext *ACLExt) Process(ctx *Context, node *Node, source NodeID, signal Signa } } else { node.PendingACLs[info.ID] = acl_info - AddChange[ACLExt](changes, "pending_acls") + changes.Add("pending_acls") } } } @@ -136,7 +143,7 @@ func (ext *ACLExt) Process(ctx *Context, node *Node, source NodeID, signal Signa messages = messages.Add(ctx, source, node, nil, NewErrorSignal(sig.Id, "acl_denied")) } else if acl_messages != nil { ctx.Log.Logf("acl", "Request pending") - AddChange[ACLExt](changes, "pending") + changes.Add("pending") total_messages := 0 // TODO: reasonable timeout/configurable timeout_time := time.Now().Add(time.Second) @@ -175,7 +182,7 @@ func (ext *ACLExt) Process(ctx *Context, node *Node, source NodeID, signal Signa acl_info, exists := ext.PendingACLs[sig.ReqID] if exists == true { delete(ext.PendingACLs, sig.ReqID) - AddChange[ACLExt](changes, "pending_acls") + changes.Add("pending_acls") ctx.Log.Logf("acl", "Request timeout deny") messages = messages.Add(ctx, acl_info.Source, node, nil, NewErrorSignal(sig.ReqID, "acl_timeout")) err := node.DequeueSignal(acl_info.TimeoutID) diff --git a/context.go b/context.go index e99cac2..3ce389a 100644 --- a/context.go +++ b/context.go @@ -318,7 +318,16 @@ func (ctx *Context) Node(id NodeID) (*Node, bool) { return node, exists } -func (ctx *Context) Stop(id NodeID) error { +func (ctx *Context) Delete(id NodeID) error { + err := ctx.Unload(id) + if err != nil { + return err + } + // TODO: also delete any associated data + return nil +} + +func (ctx *Context) Unload(id NodeID) error { ctx.nodeMapLock.Lock() defer ctx.nodeMapLock.Unlock() node, exists := ctx.nodeMap[id] @@ -326,15 +335,15 @@ func (ctx *Context) Stop(id NodeID) error { return fmt.Errorf("%s is not a node in ctx", id) } - err := node.Stop(ctx) + err := node.Unload(ctx) delete(ctx.nodeMap, id) return err } -func (ctx *Context) StopAll() { +func (ctx *Context) Stop() { ctx.nodeMapLock.Lock() for id, node := range(ctx.nodeMap) { - node.Stop(ctx) + node.Unload(ctx) delete(ctx.nodeMap, id) } ctx.nodeMapLock.Unlock() @@ -675,11 +684,6 @@ func NewContext(db * badger.DB, log Logger) (*Context, error) { return nil, err } - err = RegisterSignal[StoppedSignal](ctx) - if err != nil { - return nil, err - } - err = RegisterSignal[AddSubGroupSignal](ctx) if err != nil { return nil, err @@ -710,21 +714,6 @@ func NewContext(db * badger.DB, log Logger) (*Context, error) { return nil, err } - err = RegisterSignal[StopSignal](ctx) - if err != nil { - return nil, err - } - - err = RegisterSignal[CreateSignal](ctx) - if err != nil { - return nil, err - } - - err = RegisterSignal[StartSignal](ctx) - if err != nil { - return nil, err - } - err = RegisterSignal[StatusSignal](ctx) if err != nil { return nil, err diff --git a/event.go b/event.go index d101cf7..c8e7c12 100644 --- a/event.go +++ b/event.go @@ -49,6 +49,13 @@ type EventExt struct { Parent NodeID `gv:"parent"` } +func (ext *EventExt) Load(ctx *Context, node *Node) error { + return nil +} + +func (ext *EventExt) Unload(ctx *Context, node *Node) { +} + func NewEventExt(parent NodeID, name string) *EventExt { return &EventExt{ Name: name, @@ -110,7 +117,7 @@ func (signal EventControlSignal) Permission() Tree { func (ext *EventExt) UpdateState(node *Node, changes Changes, state EventState, state_start time.Time) { if ext.State != state { ext.StateStart = state_start - AddChange[EventExt](changes, "state") + changes.Add("state") ext.State = state node.QueueSignal(time.Now(), NewEventStateSignal(node.ID, ext.State, time.Now())) } @@ -131,6 +138,13 @@ type TestEventExt struct { Length time.Duration } +func (ext *TestEventExt) Load(ctx *Context, node *Node) error { + return nil +} + +func (ext *TestEventExt) Unload(ctx *Context, node *Node) { +} + type EventCommandMap map[EventCommand]map[EventState]EventState var test_event_commands = EventCommandMap{ "ready?": { diff --git a/extension.go b/extension.go new file mode 100644 index 0000000..3cef47c --- /dev/null +++ b/extension.go @@ -0,0 +1,24 @@ +package graphvent + +import ( + +) + +// Extensions are data attached to nodes that process signals +type Extension interface { + // Called to process incoming signals, returning changes and messages to send + Process(*Context, *Node, NodeID, Signal) (Messages, Changes) + + // Called when the node is loaded into a context(creation or move), so extension data can be initialized + Load(*Context, *Node) error + + // Called when the node is unloaded from a context(deletion or move), so extension data can be cleaned up + Unload(*Context, *Node) +} + +// Changes are lists of modifications made to extensions to be communicated +type Changes []string +func (changes *Changes) Add(fields ...string) { + new_changes := append(*changes, fields...) + changes = &new_changes +} diff --git a/gql.go b/gql.go index 9414d77..db5328b 100644 --- a/gql.go +++ b/gql.go @@ -1212,6 +1212,21 @@ type GQLExt struct { Listen string `gv:"listen"` } +func (ext *GQLExt) Load(ctx *Context, node *Node) error { + ctx.Log.Logf("gql", "Loading GQL server extension on %s", node.ID) + return ext.StartGQLServer(ctx, node) +} + +func (ext *GQLExt) Unload(ctx *Context, node *Node) { + ctx.Log.Logf("gql", "Unloading GQL server extension on %s", node.ID) + err := ext.StopGQLServer() + if err != nil { + ctx.Log.Logf("gql", "Error unloading GQL server extension on %s: %s", node.ID, err) + } else { + ctx.Log.Logf("gql", "Unloaded GQL server extension on %s", node.ID) + } +} + func (ext *GQLExt) PostDeserialize(*Context) error { ext.resolver_response = map[uuid.UUID]chan Signal{} ext.subscriptions = []SubscriptionInfo{} @@ -1326,23 +1341,6 @@ func (ext *GQLExt) Process(ctx *Context, node *Node, source NodeID, signal Signa ctx.Log.Logf("gql", "Received read result that wasn't expected - %+v", sig) } - case *StopSignal: - ctx.Log.Logf("gql", "stopping gql server %s", node.ID) - err := ext.StopGQLServer() - if err != nil { - ctx.Log.Logf("gql", "GQL_STOP_ERROR: %s", err) - } - - case *StartSignal: - ctx.Log.Logf("gql", "starting gql server %s", node.ID) - err := ext.StartGQLServer(ctx, node) - if err == nil { - ctx.Log.Logf("gql", "started gql server on %s", ext.Listen) - AddChange[GQLExt](changes, "state") - } else { - ctx.Log.Logf("gql", "GQL_RESTART_ERROR: %s", err) - } - case *StatusSignal: ext.subscriptions_lock.RLock() ctx.Log.Logf("gql", "forwarding status signal from %+v to resolvers %+v", sig.Source, ext.subscriptions) diff --git a/group.go b/group.go index dcfa14f..84121d8 100644 --- a/group.go +++ b/group.go @@ -223,6 +223,14 @@ func NewGroupExt(sub_groups map[string][]NodeID) *GroupExt { } } +func (ext *GroupExt) Load(ctx *Context, node *Node) error { + return nil +} + +func (ext *GroupExt) Unload(ctx *Context, node *Node) { + +} + func (ext *GroupExt) Process(ctx *Context, node *Node, source NodeID, signal Signal) (Messages, Changes) { var messages Messages = nil var changes = Changes{} @@ -240,7 +248,7 @@ func (ext *GroupExt) Process(ctx *Context, node *Node, source NodeID, signal Sig ext.SubGroups[sig.SubGroup] = sub_group messages = messages.Add(ctx, source, node, nil, NewSuccessSignal(sig.Id)) - AddChange[GroupExt](changes, "sub_groups") + changes.Add("sub_groups") } } @@ -257,7 +265,7 @@ func (ext *GroupExt) Process(ctx *Context, node *Node, source NodeID, signal Sig ext.SubGroups[sig.SubGroup] = sub_group messages = messages.Add(ctx, source, node, nil, NewSuccessSignal(sig.Id)) - AddChange[GroupExt](changes, "sub_groups") + changes.Add("sub_groups") } } @@ -268,7 +276,7 @@ func (ext *GroupExt) Process(ctx *Context, node *Node, source NodeID, signal Sig } else { ext.SubGroups[sig.Name] = []NodeID{} - AddChange[GroupExt](changes, "sub_groups") + changes.Add("sub_groups") messages = messages.Add(ctx, source, node, nil, NewSuccessSignal(sig.Id)) } case *RemoveSubGroupSignal: @@ -278,7 +286,7 @@ func (ext *GroupExt) Process(ctx *Context, node *Node, source NodeID, signal Sig } else { delete(ext.SubGroups, sig.Name) - AddChange[GroupExt](changes, "sub_groups") + changes.Add("sub_groups") messages = messages.Add(ctx, source, node, nil, NewSuccessSignal(sig.Id)) } } diff --git a/listener.go b/listener.go index 2d88c5e..ebeab5f 100644 --- a/listener.go +++ b/listener.go @@ -10,6 +10,13 @@ type ListenerExt struct { Chan chan Signal } +func (ext *ListenerExt) Load(ctx *Context, node *Node) error { + return nil +} + +func (ext *ListenerExt) Unload(ctx *Context, node *Node) { +} + func (ext *ListenerExt) PostDeserialize(ctx *Context) error { ext.Chan = make(chan Signal, ext.Buffer) return nil diff --git a/lockable.go b/lockable.go index 86daa4a..30e10a4 100644 --- a/lockable.go +++ b/lockable.go @@ -75,15 +75,22 @@ func LockLockable(ctx *Context, node *Node) (uuid.UUID, error) { return signal.ID(), ctx.Send(messages) } +func (ext *LockableExt) Load(ctx *Context, node *Node) error { + return nil +} + +func (ext *LockableExt) Unload(ctx *Context, node *Node) { +} + func (ext *LockableExt) HandleErrorSignal(ctx *Context, node *Node, source NodeID, signal *ErrorSignal) (Messages, Changes) { var messages Messages = nil - var changes = Changes{} + var changes Changes = nil info, info_found := node.ProcessResponse(ext.WaitInfos, signal) if info_found { state, found := ext.Requirements[info.Destination] if found == true { - AddChange[LockableExt](changes, "wait_infos") + changes.Add("wait_infos") ctx.Log.Logf("lockable", "got mapped response %+v for %+v in state %s while in %s", signal, info, ReqStateStrings[state], ReqStateStrings[ext.State]) switch ext.State { case AbortingLock: @@ -96,11 +103,11 @@ func (ext *LockableExt) HandleErrorSignal(ctx *Context, node *Node, source NodeI } } if all_unlocked == true { - AddChange[LockableExt](changes, "state") + changes.Add("state") ext.State = Unlocked } case Locking: - AddChange[LockableExt](changes, "state") + changes.Add("state") ext.Requirements[info.Destination] = Unlocked unlocked := 0 for _, state := range(ext.Requirements) { @@ -160,7 +167,7 @@ func (ext *LockableExt) HandleLinkSignal(ctx *Context, node *Node, source NodeID ext.Requirements = map[NodeID]ReqState{} } ext.Requirements[signal.NodeID] = Unlocked - AddChange[LockableExt](changes, "requirements") + changes.Add("requirements") messages = messages.Add(ctx, source, node, nil, NewSuccessSignal(signal.ID())) } case "remove": @@ -169,7 +176,7 @@ func (ext *LockableExt) HandleLinkSignal(ctx *Context, node *Node, source NodeID messages = messages.Add(ctx, source, node, nil, NewErrorSignal(signal.ID(), "can't link: not_requirement")) } else { delete(ext.Requirements, signal.NodeID) - AddChange[LockableExt](changes, "requirements") + changes.Add("requirements") messages = messages.Add(ctx, source, node, nil, NewSuccessSignal(signal.ID())) } default: @@ -210,10 +217,10 @@ func (ext *LockableExt) HandleSuccessSignal(ctx *Context, node *Node, source Nod ctx.Log.Logf("lockable", "WHOLE LOCK: %s - %s - %+v", node.ID, ext.PendingID, ext.PendingOwner) ext.State = Locked ext.Owner = ext.PendingOwner - AddChange[LockableExt](changes, "state", "owner", "requirements") + changes.Add("state", "owner", "requirements") messages = messages.Add(ctx, *ext.Owner, node, nil, NewSuccessSignal(ext.PendingID)) } else { - AddChange[LockableExt](changes, "requirements") + changes.Add("requirements") ctx.Log.Logf("lockable", "PARTIAL LOCK: %s - %d/%d", node.ID, locked, len(ext.Requirements)) } case AbortingLock: @@ -246,15 +253,15 @@ func (ext *LockableExt) HandleSuccessSignal(ctx *Context, node *Node, source Nod previous_owner := *ext.Owner ext.Owner = ext.PendingOwner ext.ReqID = nil - AddChange[LockableExt](changes, "state", "owner", "req_id") + changes.Add("state", "owner", "req_id") messages = messages.Add(ctx, previous_owner, node, nil, NewSuccessSignal(ext.PendingID)) } else if old_state == AbortingLock { - AddChange[LockableExt](changes, "state", "pending_owner") + changes.Add("state", "pending_owner") messages = messages.Add(ctx, *ext.PendingOwner, node, nil, NewErrorSignal(*ext.ReqID, "not_unlocked")) ext.PendingOwner = ext.Owner } } else { - AddChange[LockableExt](changes, "state") + changes.Add("state") ctx.Log.Logf("lockable", "PARTIAL UNLOCK: %s - %d/%d", node.ID, unlocked, len(ext.Requirements)) } } @@ -278,7 +285,7 @@ func (ext *LockableExt) HandleLockSignal(ctx *Context, node *Node, source NodeID new_owner := source ext.PendingOwner = &new_owner ext.Owner = &new_owner - AddChange[LockableExt](changes, "state", "pending_owner", "owner") + changes.Add("state", "pending_owner", "owner") messages = messages.Add(ctx, new_owner, node, nil, NewSuccessSignal(signal.ID())) } else { ext.State = Locking @@ -287,7 +294,7 @@ func (ext *LockableExt) HandleLockSignal(ctx *Context, node *Node, source NodeID new_owner := source ext.PendingOwner = &new_owner ext.PendingID = signal.ID() - AddChange[LockableExt](changes, "state", "req_id", "pending_owner", "pending_id") + changes.Add("state", "req_id", "pending_owner", "pending_id") for id, state := range(ext.Requirements) { if state != Unlocked { ctx.Log.Logf("lockable", "REQ_NOT_UNLOCKED_WHEN_LOCKING") @@ -311,7 +318,7 @@ func (ext *LockableExt) HandleLockSignal(ctx *Context, node *Node, source NodeID new_owner := source ext.PendingOwner = nil ext.Owner = nil - AddChange[LockableExt](changes, "state", "pending_owner", "owner") + changes.Add("state", "pending_owner", "owner") messages = messages.Add(ctx, new_owner, node, nil, NewSuccessSignal(signal.ID())) } else if source == *ext.Owner { ext.State = Unlocking @@ -319,7 +326,7 @@ func (ext *LockableExt) HandleLockSignal(ctx *Context, node *Node, source NodeID ext.ReqID = &id ext.PendingOwner = nil ext.PendingID = signal.ID() - AddChange[LockableExt](changes, "state", "pending_owner", "pending_id", "req_id") + changes.Add("state", "pending_owner", "pending_id", "req_id") for id, state := range(ext.Requirements) { if state != Locked { ctx.Log.Logf("lockable", "REQ_NOT_LOCKED_WHEN_UNLOCKING") @@ -347,7 +354,7 @@ func (ext *LockableExt) HandleTimeoutSignal(ctx *Context, node *Node, source Nod wait_info, found := node.ProcessResponse(ext.WaitInfos, signal) if found == true { - AddChange[LockableExt](changes, "wait_infos") + changes.Add("wait_infos") state, found := ext.Requirements[wait_info.Destination] if found == true { ctx.Log.Logf("lockable", "%s timed out %s while %s was %s", wait_info.Destination, ReqStateStrings[state], node.ID, ReqStateStrings[state]) @@ -362,7 +369,7 @@ func (ext *LockableExt) HandleTimeoutSignal(ctx *Context, node *Node, source Nod } } if all_unlocked == true { - AddChange[LockableExt](changes, "state") + changes.Add("state") ext.State = Unlocked } case Locking: diff --git a/node.go b/node.go index 57ec34a..c6ba523 100644 --- a/node.go +++ b/node.go @@ -47,27 +47,6 @@ func RandID() NodeID { return NodeID(uuid.New()) } -type Changes map[ExtType][]string - -func AddChange[E any, T interface { *E; Extension}](changes Changes, fields ...string) { - ext_type := ExtType(SerializedTypeFor[E]()) - changes.Add(ext_type, fields...) -} - -func (changes Changes) Add(ext ExtType, fields ...string) { - current, exists := changes[ext] - if exists == false { - current = []string{} - } - current = append(current, fields...) - changes[ext] = current -} - -// Extensions are data attached to nodes that process signals -type Extension interface { - Process(*Context, *Node, NodeID, Signal) (Messages, Changes) -} - // A QueuedSignal is a Signal that has been Queued to trigger at a set time type QueuedSignal struct { Signal `gv:"signal"` @@ -313,8 +292,14 @@ func nodeLoop(ctx *Context, node *Node) error { return fmt.Errorf("%s is already started, will not start again", node.ID) } - // Perform startup actions - node.Process(ctx, ZeroID, NewStartSignal()) + // Load each extension before starting the main loop + for _, extension := range(node.Extensions) { + err := extension.Load(ctx, node) + if err != nil { + return err + } + } + run := true for run == true { var signal Signal @@ -497,17 +482,6 @@ func nodeLoop(ctx *Context, node *Node) error { } switch sig := signal.(type) { - case *StopSignal: - node.Process(ctx, source, signal) - if source == node.ID { - node.Process(ctx, source, NewStoppedSignal(sig, node.ID)) - } else { - msgs := Messages{} - msgs = msgs.Add(ctx, node.ID, node, nil, NewStoppedSignal(sig, node.ID)) - ctx.Send(msgs) - } - run = false - case *ReadSignal: result := node.ReadFields(ctx, sig.Extensions) msgs := Messages{} @@ -530,20 +504,18 @@ func nodeLoop(ctx *Context, node *Node) error { return nil } -func (node *Node) Stop(ctx *Context) error { +func (node *Node) Unload(ctx *Context) error { if node.Active.Load() { - msg, err := NewMessage(ctx, node.ID, node, nil, NewStopSignal()) - if err != nil { - return err + for _, extension := range(node.Extensions) { + extension.Unload(ctx, node) } - node.MsgChan <- msg return nil } else { return fmt.Errorf("Node not active") } } -func (node *Node) QueueChanges(ctx *Context, changes Changes) error { +func (node *Node) QueueChanges(ctx *Context, changes map[ExtType]Changes) error { node.QueueSignal(time.Now(), NewStatusSignal(node.ID, changes)) return nil } @@ -551,18 +523,15 @@ func (node *Node) QueueChanges(ctx *Context, changes Changes) error { func (node *Node) Process(ctx *Context, source NodeID, signal Signal) error { ctx.Log.Logf("node_process", "PROCESSING MESSAGE: %s - %+v", node.ID, signal) messages := Messages{} - changes := Changes{} + changes := map[ExtType]Changes{} for ext_type, ext := range(node.Extensions) { ctx.Log.Logf("node_process", "PROCESSING_EXTENSION: %s/%s", node.ID, ext_type) ext_messages, ext_changes := ext.Process(ctx, node, source, signal) if len(ext_messages) != 0 { messages = append(messages, ext_messages...) } - if len(ext_changes) != 0 { - for ext, change_list := range(ext_changes) { - changes[ext] = append(changes[ext], change_list...) - } + changes[ext_type] = ext_changes } } ctx.Log.Logf("changes", "Changes for %s after %+v - %+v", node.ID, reflect.TypeOf(signal), changes) @@ -575,17 +544,14 @@ func (node *Node) Process(ctx *Context, source NodeID, signal Signal) error { } if len(changes) != 0 { - _, ok := signal.(*StoppedSignal) - if (ok == false) || (source != node.ID) { - write_err := WriteNodeChanges(ctx, node, changes) - if write_err != nil { - return write_err - } + write_err := WriteNodeChanges(ctx, node, changes) + if write_err != nil { + return write_err + } - status_err := node.QueueChanges(ctx, changes) - if status_err != nil { - return status_err - } + status_err := node.QueueChanges(ctx, changes) + if status_err != nil { + return status_err } } @@ -657,7 +623,6 @@ func NewNode(ctx *Context, key ed25519.PrivateKey, type_name string, buffer_size return nil, fmt.Errorf("Node type %+v not registered in Context", node_type) } - changes := Changes{} ext_map := map[ExtType]Extension{} for _, ext := range(extensions) { ext_type, exists := ctx.ExtensionTypes[reflect.TypeOf(ext)] @@ -669,7 +634,6 @@ func NewNode(ctx *Context, key ed25519.PrivateKey, type_name string, buffer_size return nil, fmt.Errorf("Cannot add the same extension to a node twice") } ext_map[ext_type] = ext - changes.Add(ext_type, "init") } for _, required_ext := range(def.Extensions) { @@ -700,18 +664,12 @@ func NewNode(ctx *Context, key ed25519.PrivateKey, type_name string, buffer_size } node.writeSignalQueue = true - err = WriteNodeChanges(ctx, node, changes) + err = WriteNodeInit(ctx, node) if err != nil { return nil, err } ctx.AddNode(id, node) - - err = node.Process(ctx, ZeroID, NewCreateSignal()) - if err != nil { - return nil, err - } - go runNode(ctx, node) return node, nil @@ -734,6 +692,8 @@ func WriteNodeExtList(ctx *Context, node *Node) error { i += 1 } + ctx.Log.Logf("db", "Writing ext_list for %s - %+v", node.ID, ext_list) + id_bytes, err := node.ID.MarshalBinary() if err != nil { return err @@ -749,7 +709,54 @@ func WriteNodeExtList(ctx *Context, node *Node) error { }) } -func WriteNodeChanges(ctx *Context, node *Node, changes Changes) error { +func WriteNodeInit(ctx *Context, node *Node) error { + ctx.Log.Logf("db", "Writing initial entry for %s - %+v", node.ID, node) + + ext_serialized := map[ExtType]SerializedValue{} + for ext_type, ext := range(node.Extensions) { + serialized_ext, err := SerializeAny(ctx, ext) + if err != nil { + return err + } + ext_serialized[ext_type] = serialized_ext + } + + sq_serialized, err := SerializeAny(ctx, node.SignalQueue) + if err != nil { + return err + } + + node_serialized, err := SerializeAny(ctx, node) + if err != nil { + return err + } + + id_bytes, err := node.ID.MarshalBinary() + + return ctx.DB.Update(func(txn *badger.Txn) error { + err := txn.Set(id_bytes, node_serialized.Data) + if err != nil { + return nil + } + + err = txn.Set(append(id_bytes, signal_queue_suffix...), sq_serialized.Data) + if err != nil { + return err + } + + for ext_type, data := range(ext_serialized) { + err := txn.Set(append(id_bytes, ExtTypeSuffix(ext_type)...), data.Data) + if err != nil { + return err + } + } + + return nil + }) + +} + +func WriteNodeChanges(ctx *Context, node *Node, changes map[ExtType]Changes) error { ctx.Log.Logf("db", "Writing changes for %s - %+v", node.ID, changes) ext_serialized := map[ExtType]SerializedValue{} diff --git a/node_test.go b/node_test.go index 05e393f..f78f67b 100644 --- a/node_test.go +++ b/node_test.go @@ -9,7 +9,7 @@ import ( ) func TestNodeDB(t *testing.T) { - ctx := logTestContext(t, []string{"signal", "serialize", "node", "db", "listener"}) + ctx := logTestContext(t, []string{"node", "db"}) node_listener := NewListenerExt(10) node, err := NewNode(ctx, nil, "Base", 10, nil, NewGroupExt(nil), NewLockableExt(nil), node_listener) @@ -23,14 +23,7 @@ func TestNodeDB(t *testing.T) { return false }) - msgs := Messages{} - msgs = msgs.Add(ctx, node.ID, node, nil, NewStopSignal()) - err = ctx.Send(msgs) - fatalErr(t, err) - - _, err = WaitForSignal(node_listener.Chan, 10*time.Millisecond, func(sig *StoppedSignal) bool { - return sig.Source == node.ID - }) + err = ctx.Unload(node.ID) fatalErr(t, err) ctx.nodeMap = map[NodeID]*Node{} diff --git a/signal.go b/signal.go index ba9bd1d..46acec9 100644 --- a/signal.go +++ b/signal.go @@ -143,66 +143,6 @@ func NewResponseHeader(req_id uuid.UUID, direction SignalDirection) ResponseHead } } -type CreateSignal struct { - SignalHeader -} - -func (signal CreateSignal) Permission() Tree { - return Tree{ - SerializedType(SignalTypeFor[CreateSignal]()): nil, - } -} - -func NewCreateSignal() *CreateSignal { - return &CreateSignal{ - NewSignalHeader(Direct), - } -} - -type StartSignal struct { - SignalHeader -} -func (signal StartSignal) Permission() Tree { - return Tree{ - SerializedType(SignalTypeFor[StartSignal]()): nil, - } -} -func NewStartSignal() *StartSignal { - return &StartSignal{ - NewSignalHeader(Direct), - } -} - -type StoppedSignal struct { - ResponseHeader - Source NodeID -} -func (signal StoppedSignal) Permission() Tree { - return Tree{ - SerializedType(SignalTypeFor[ResponseSignal]()): nil, - } -} -func NewStoppedSignal(sig *StopSignal, source NodeID) *StoppedSignal { - return &StoppedSignal{ - NewResponseHeader(sig.ID(), Up), - source, - } -} - -type StopSignal struct { - SignalHeader -} -func (signal StopSignal) Permission() Tree { - return Tree{ - SerializedType(SignalTypeFor[StopSignal]()): nil, - } -} -func NewStopSignal() *StopSignal { - return &StopSignal{ - NewSignalHeader(Direct), - } -} - type SuccessSignal struct { ResponseHeader } @@ -263,7 +203,7 @@ func NewACLTimeoutSignal(req_id uuid.UUID) *ACLTimeoutSignal { type StatusSignal struct { SignalHeader Source NodeID `gv:"source"` - Changes Changes `gv:"changes"` + Changes map[ExtType]Changes `gv:"changes"` } func (signal StatusSignal) Permission() Tree { return Tree{ @@ -273,7 +213,7 @@ func (signal StatusSignal) Permission() Tree { func (signal StatusSignal) String() string { return fmt.Sprintf("StatusSignal(%s, %+v)", signal.SignalHeader, signal.Changes) } -func NewStatusSignal(source NodeID, changes Changes) *StatusSignal { +func NewStatusSignal(source NodeID, changes map[ExtType]Changes) *StatusSignal { return &StatusSignal{ NewSignalHeader(Up), source,