From 7451e8e9608beb8d3bad4f84d2c9b1a2b1177f35 Mon Sep 17 00:00:00 2001 From: Noah Metz Date: Fri, 6 Oct 2023 20:04:53 -0600 Subject: [PATCH] Added StoppedSignal as stopped status(no db write happens after processing) and added helper functions to stop nodes. --- context.go | 22 ++++++++++++++++++++++ node.go | 31 ++++++++++++++++++++++++------- signal.go | 16 ++++++++++++++++ 3 files changed, 62 insertions(+), 7 deletions(-) diff --git a/context.go b/context.go index 90344b4..2588761 100644 --- a/context.go +++ b/context.go @@ -230,6 +230,28 @@ func (ctx *Context) Node(id NodeID) (*Node, bool) { return node, exists } +func (ctx *Context) Stop(id NodeID) error { + ctx.nodeMapLock.Lock() + defer ctx.nodeMapLock.Unlock() + node, exists := ctx.nodeMap[id] + if exists == false { + return fmt.Errorf("%s is not a node in ctx", id) + } + + err := node.Stop(ctx) + delete(ctx.nodeMap, id) + return err +} + +func (ctx *Context) StopAll() { + ctx.nodeMapLock.Lock() + for id, node := range(ctx.nodeMap) { + node.Stop(ctx) + delete(ctx.nodeMap, id) + } + ctx.nodeMapLock.Unlock() +} + // Get a node from the context, or load from the database if not loaded func (ctx *Context) getNode(id NodeID) (*Node, error) { target, exists := ctx.Node(id) diff --git a/node.go b/node.go index 8eda196..9350b94 100644 --- a/node.go +++ b/node.go @@ -415,11 +415,13 @@ func nodeLoop(ctx *Context, node *Node) error { if err != nil { panic(err) } - - msgs := Messages{} - msgs = msgs.Add(ctx, node.ID, node.Key, NewStatusSignal(node.ID, "stopped"), source) - ctx.Send(msgs) - node.Process(ctx, node.ID, NewStatusSignal(node.ID, "stopped")) + if source == node.ID { + node.Process(ctx, source, NewStoppedSignal(sig, node.ID)) + } else { + msgs := Messages{} + msgs = msgs.Add(ctx, node.ID, node.Key, NewStoppedSignal(sig, node.ID), source) + ctx.Send(msgs) + } run = false case *ReadSignal: @@ -499,19 +501,34 @@ func NewMessage(ctx *Context, dest NodeID, source NodeID, principal ed25519.Priv }, nil } +func (node *Node) Stop(ctx *Context) error { + if node.Active.Load() { + msg, err := NewMessage(ctx, node.ID, node.ID, node.Key, NewStopSignal()) + if err != nil { + return err + } + node.MsgChan <- msg + return nil + } else { + return fmt.Errorf("Node not active") + } +} + func (node *Node) Process(ctx *Context, source NodeID, signal Signal) error { ctx.Log.Logf("node_process", "PROCESSING MESSAGE: %s - %+v", node.ID, signal) messages := Messages{} for ext_type, ext := range(node.Extensions) { ctx.Log.Logf("node_process", "PROCESSING_EXTENSION: %s/%s", node.ID, ext_type) - //TODO: add extension and node info to log resp := ext.Process(ctx, node, source, signal) if resp != nil { messages = append(messages, resp...) } } - return ctx.Send(messages) + if len(messages) != 0 { + return ctx.Send(messages) + } + return nil } func GetCtx[C any](ctx *Context, ext_type ExtType) (C, error) { diff --git a/signal.go b/signal.go index f848457..82d65ee 100644 --- a/signal.go +++ b/signal.go @@ -167,6 +167,22 @@ func NewStartSignal() *StartSignal { } } +type StoppedSignal struct { + ResponseHeader + Source NodeID +} +func (signal StoppedSignal) Permission() Tree { + return Tree{ + ResponseType: nil, + } +} +func NewStoppedSignal(sig *StopSignal, source NodeID) *StoppedSignal { + return &StoppedSignal{ + NewResponseHeader(sig.ID(), Up), + source, + } +} + type StopSignal struct { SignalHeader }