Fix signal queue

gql_cataclysm
noah metz 2023-07-30 01:29:15 -06:00
parent f56f92a58b
commit 79e40bf3f3
2 changed files with 27 additions and 13 deletions

@ -10,10 +10,11 @@ import (
"net" "net"
"crypto/tls" "crypto/tls"
"bytes" "bytes"
"github.com/google/uuid"
) )
func TestGQL(t *testing.T) { func TestGQL(t *testing.T) {
ctx := logTestContext(t, []string{"test", "gql", "policy"}) ctx := logTestContext(t, []string{"node", "test", "gql", "policy"})
TestNodeType := NodeType("TEST") TestNodeType := NodeType("TEST")
err := ctx.RegisterNodeType(TestNodeType, []ExtType{LockableExtType, ACLExtType}) err := ctx.RegisterNodeType(TestNodeType, []ExtType{LockableExtType, ACLExtType})
@ -23,18 +24,14 @@ func TestGQL(t *testing.T) {
fatalErr(t, err) fatalErr(t, err)
listener_ext := NewListenerExt(10) listener_ext := NewListenerExt(10)
policy := NewAllNodesPolicy(Actions{MakeAction("+")}) policy := NewAllNodesPolicy(Actions{MakeAction("+")})
gql := NewNode(ctx, nil, GQLNodeType, 10, nil, NewLockableExt(), NewACLExt(policy), gql_ext, NewGroupExt(nil)) gql := NewNode(ctx, nil, GQLNodeType, 10, []QueuedSignal{
n1 := NewNode(ctx, nil, TestNodeType, 10, nil, NewLockableExt(), NewACLExt(policy), listener_ext) QueuedSignal{uuid.New(), StateSignal{NewDirectSignal(GQLStateSignalType), "start_server"}, time.Now()},
}, NewLockableExt(), NewACLExt(policy), gql_ext, NewGroupExt(nil), listener_ext)
n1 := NewNode(ctx, nil, TestNodeType, 10, nil, NewLockableExt(), NewACLExt(policy))
err = LinkRequirement(ctx, gql.ID, n1.ID) err = LinkRequirement(ctx, gql.ID, n1.ID)
fatalErr(t, err) fatalErr(t, err)
_, err = WaitForSignal(ctx, listener_ext, time.Millisecond*10, LinkSignalType, func(sig StateSignal) bool {
return sig.State == "linked_as_dep"
})
fatalErr(t, err)
ctx.Send(n1.ID, gql.ID, StateSignal{NewDirectSignal(GQLStateSignalType), "start_server"})
_, err = WaitForSignal(ctx, listener_ext, 100*time.Millisecond, GQLStateSignalType, func(sig StateSignal) bool { _, err = WaitForSignal(ctx, listener_ext, 100*time.Millisecond, GQLStateSignalType, func(sig StateSignal) bool {
return sig.State == "server_started" return sig.State == "server_started"
}) })

@ -94,8 +94,9 @@ type Extension interface {
// A QueuedSignal is a Signal that has been Queued to trigger at a set time // A QueuedSignal is a Signal that has been Queued to trigger at a set time
type QueuedSignal struct { type QueuedSignal struct {
Signal Signal uuid.UUID
Time time.Time Signal
time.Time
} }
// Default message channel size for nodes // Default message channel size for nodes
@ -119,9 +120,11 @@ type Node struct {
NextSignal *QueuedSignal NextSignal *QueuedSignal
} }
func (node *Node) QueueSignal(time time.Time, signal Signal) { func (node *Node) QueueSignal(time time.Time, signal Signal) uuid.UUID {
node.SignalQueue = append(node.SignalQueue, QueuedSignal{signal, time}) id := uuid.New()
node.SignalQueue = append(node.SignalQueue, QueuedSignal{id, signal, time})
node.NextSignal, node.TimeoutChan = SoonestSignal(node.SignalQueue) node.NextSignal, node.TimeoutChan = SoonestSignal(node.SignalQueue)
return id
} }
func (node *Node) ClearSignalQueue() { func (node *Node) ClearSignalQueue() {
@ -204,6 +207,20 @@ func nodeLoop(ctx *Context, node *Node) error {
case <-node.TimeoutChan: case <-node.TimeoutChan:
signal = node.NextSignal.Signal signal = node.NextSignal.Signal
source = node.ID source = node.ID
i := -1
for j, queued := range(node.SignalQueue) {
if queued.UUID == node.NextSignal.UUID {
i = j
break
}
}
if i == -1 {
panic("node.NextSignal not in node.SignalQueue")
}
l := len(node.SignalQueue)
node.SignalQueue[i] = node.SignalQueue[l-1]
node.SignalQueue = node.SignalQueue[:(l-1)]
node.NextSignal, node.TimeoutChan = SoonestSignal(node.SignalQueue) node.NextSignal, node.TimeoutChan = SoonestSignal(node.SignalQueue)
ctx.Log.Logf("node", "NODE_TIMEOUT %s - NEXT_SIGNAL: %s", node.ID, signal) ctx.Log.Logf("node", "NODE_TIMEOUT %s - NEXT_SIGNAL: %s", node.ID, signal)
} }