Added StoppedSignal as stopped status(no db write happens after processing) and added helper functions to stop nodes.

gql_cataclysm
noah metz 2023-10-06 20:04:53 -06:00
parent 9eadb00397
commit 7451e8e960
3 changed files with 62 additions and 7 deletions

@ -230,6 +230,28 @@ func (ctx *Context) Node(id NodeID) (*Node, bool) {
return node, exists return node, exists
} }
func (ctx *Context) Stop(id NodeID) error {
ctx.nodeMapLock.Lock()
defer ctx.nodeMapLock.Unlock()
node, exists := ctx.nodeMap[id]
if exists == false {
return fmt.Errorf("%s is not a node in ctx", id)
}
err := node.Stop(ctx)
delete(ctx.nodeMap, id)
return err
}
func (ctx *Context) StopAll() {
ctx.nodeMapLock.Lock()
for id, node := range(ctx.nodeMap) {
node.Stop(ctx)
delete(ctx.nodeMap, id)
}
ctx.nodeMapLock.Unlock()
}
// Get a node from the context, or load from the database if not loaded // Get a node from the context, or load from the database if not loaded
func (ctx *Context) getNode(id NodeID) (*Node, error) { func (ctx *Context) getNode(id NodeID) (*Node, error) {
target, exists := ctx.Node(id) target, exists := ctx.Node(id)

@ -415,11 +415,13 @@ func nodeLoop(ctx *Context, node *Node) error {
if err != nil { if err != nil {
panic(err) panic(err)
} }
if source == node.ID {
node.Process(ctx, source, NewStoppedSignal(sig, node.ID))
} else {
msgs := Messages{} msgs := Messages{}
msgs = msgs.Add(ctx, node.ID, node.Key, NewStatusSignal(node.ID, "stopped"), source) msgs = msgs.Add(ctx, node.ID, node.Key, NewStoppedSignal(sig, node.ID), source)
ctx.Send(msgs) ctx.Send(msgs)
node.Process(ctx, node.ID, NewStatusSignal(node.ID, "stopped")) }
run = false run = false
case *ReadSignal: case *ReadSignal:
@ -499,19 +501,34 @@ func NewMessage(ctx *Context, dest NodeID, source NodeID, principal ed25519.Priv
}, nil }, nil
} }
func (node *Node) Stop(ctx *Context) error {
if node.Active.Load() {
msg, err := NewMessage(ctx, node.ID, node.ID, node.Key, NewStopSignal())
if err != nil {
return err
}
node.MsgChan <- msg
return nil
} else {
return fmt.Errorf("Node not active")
}
}
func (node *Node) Process(ctx *Context, source NodeID, signal Signal) error { func (node *Node) Process(ctx *Context, source NodeID, signal Signal) error {
ctx.Log.Logf("node_process", "PROCESSING MESSAGE: %s - %+v", node.ID, signal) ctx.Log.Logf("node_process", "PROCESSING MESSAGE: %s - %+v", node.ID, signal)
messages := Messages{} messages := Messages{}
for ext_type, ext := range(node.Extensions) { for ext_type, ext := range(node.Extensions) {
ctx.Log.Logf("node_process", "PROCESSING_EXTENSION: %s/%s", node.ID, ext_type) ctx.Log.Logf("node_process", "PROCESSING_EXTENSION: %s/%s", node.ID, ext_type)
//TODO: add extension and node info to log
resp := ext.Process(ctx, node, source, signal) resp := ext.Process(ctx, node, source, signal)
if resp != nil { if resp != nil {
messages = append(messages, resp...) messages = append(messages, resp...)
} }
} }
if len(messages) != 0 {
return ctx.Send(messages) return ctx.Send(messages)
}
return nil
} }
func GetCtx[C any](ctx *Context, ext_type ExtType) (C, error) { func GetCtx[C any](ctx *Context, ext_type ExtType) (C, error) {

@ -167,6 +167,22 @@ func NewStartSignal() *StartSignal {
} }
} }
type StoppedSignal struct {
ResponseHeader
Source NodeID
}
func (signal StoppedSignal) Permission() Tree {
return Tree{
ResponseType: nil,
}
}
func NewStoppedSignal(sig *StopSignal, source NodeID) *StoppedSignal {
return &StoppedSignal{
NewResponseHeader(sig.ID(), Up),
source,
}
}
type StopSignal struct { type StopSignal struct {
SignalHeader SignalHeader
} }