Added start signal, and used it to restore gql server if was previously running

gql_cataclysm
noah metz 2023-07-31 16:25:18 -06:00
parent 064dc72820
commit 0313d6a33f
4 changed files with 30 additions and 7 deletions

@ -1000,18 +1000,35 @@ func (ext *GQLExt) Process(ctx *Context, source NodeID, node *Node, signal Signa
sig := signal.(StringSignal) sig := signal.(StringSignal)
switch sig.Str { switch sig.Str {
case "start_server": case "start_server":
err := ext.StartGQLServer(ctx, node) if ext.State == "stopped" {
if err == nil { err := ext.StartGQLServer(ctx, node)
ctx.Send(node.ID, source, StringSignal{NewDirectSignal(GQLStateSignalType), "server_started"}) if err == nil {
ext.State = "running"
ctx.Send(node.ID, source, StringSignal{NewDirectSignal(GQLStateSignalType), "server_started"})
}
} }
case "stop_server": case "stop_server":
err := ext.StopGQLServer() if ext.State == "running" {
if err == nil { err := ext.StopGQLServer()
ctx.Send(node.ID, source, StringSignal{NewDirectSignal(GQLStateSignalType), "server_stopped"}) if err == nil {
ext.State = "stopped"
ctx.Send(node.ID, source, StringSignal{NewDirectSignal(GQLStateSignalType), "server_stopped"})
}
} }
default: default:
ctx.Log.Logf("gql", "unknown gql state %s", sig.Str) ctx.Log.Logf("gql", "unknown gql state %s", sig.Str)
} }
} else if signal.Type() == StartSignalType {
switch ext.State {
case "running":
err := ext.StartGQLServer(ctx, node)
if err == nil {
ctx.Send(node.ID, source, StringSignal{NewDirectSignal(GQLStateSignalType), "server_started"})
}
case "stopped":
default:
ctx.Log.Logf("gql", "unknown state to restore from: %s", ext.State)
}
} }
} }
@ -1090,6 +1107,7 @@ func NewGQLExt(ctx *Context, listen string, tls_cert []byte, tls_key []byte, sta
tls_key = ssl_key_pem tls_key = ssl_key_pem
} }
return &GQLExt{ return &GQLExt{
State: state,
Listen: listen, Listen: listen,
resolver_reads: map[uuid.UUID]uuid.UUID{}, resolver_reads: map[uuid.UUID]uuid.UUID{},
resolver_chans: map[uuid.UUID]chan *ReadResultSignal{}, resolver_chans: map[uuid.UUID]chan *ReadResultSignal{},

@ -20,7 +20,7 @@ func TestGQL(t *testing.T) {
err := ctx.RegisterNodeType(TestNodeType, []ExtType{LockableExtType, ACLExtType}) err := ctx.RegisterNodeType(TestNodeType, []ExtType{LockableExtType, ACLExtType})
fatalErr(t, err) fatalErr(t, err)
gql_ext, err := NewGQLExt(ctx, ":0", nil, nil, "start") gql_ext, err := NewGQLExt(ctx, ":0", nil, nil, "stopped")
fatalErr(t, err) fatalErr(t, err)
listener_ext := NewListenerExt(10) listener_ext := NewListenerExt(10)
policy := NewAllNodesPolicy(Actions{MakeAction("+")}) policy := NewAllNodesPolicy(Actions{MakeAction("+")})

@ -195,6 +195,10 @@ func nodeLoop(ctx *Context, node *Node) error {
if started == false { if started == false {
return fmt.Errorf("%s is already started, will not start again", node.ID) return fmt.Errorf("%s is already started, will not start again", node.ID)
} }
// Queue the signal for extensions to perform startup actions
node.QueueSignal(time.Now(), NewDirectSignal(StartSignalType))
for true { for true {
var signal Signal var signal Signal
var source NodeID var source NodeID

@ -17,6 +17,7 @@ import (
type SignalDirection int type SignalDirection int
const ( const (
StopSignalType SignalType = "STOP" StopSignalType SignalType = "STOP"
StartSignalType = "START"
ErrorSignalType = "ERROR" ErrorSignalType = "ERROR"
StatusSignalType = "STATUS" StatusSignalType = "STATUS"
LinkSignalType = "LINK" LinkSignalType = "LINK"