diff --git a/gql_test.go b/gql_test.go index 5629b7c..5c64ad7 100644 --- a/gql_test.go +++ b/gql_test.go @@ -10,10 +10,11 @@ import ( "net" "crypto/tls" "bytes" + "github.com/google/uuid" ) func TestGQL(t *testing.T) { - ctx := logTestContext(t, []string{"test", "gql", "policy"}) + ctx := logTestContext(t, []string{"node", "test", "gql", "policy"}) TestNodeType := NodeType("TEST") err := ctx.RegisterNodeType(TestNodeType, []ExtType{LockableExtType, ACLExtType}) @@ -23,18 +24,14 @@ func TestGQL(t *testing.T) { fatalErr(t, err) listener_ext := NewListenerExt(10) policy := NewAllNodesPolicy(Actions{MakeAction("+")}) - gql := NewNode(ctx, nil, GQLNodeType, 10, nil, NewLockableExt(), NewACLExt(policy), gql_ext, NewGroupExt(nil)) - n1 := NewNode(ctx, nil, TestNodeType, 10, nil, NewLockableExt(), NewACLExt(policy), listener_ext) + gql := NewNode(ctx, nil, GQLNodeType, 10, []QueuedSignal{ + QueuedSignal{uuid.New(), StateSignal{NewDirectSignal(GQLStateSignalType), "start_server"}, time.Now()}, + }, NewLockableExt(), NewACLExt(policy), gql_ext, NewGroupExt(nil), listener_ext) + n1 := NewNode(ctx, nil, TestNodeType, 10, nil, NewLockableExt(), NewACLExt(policy)) err = LinkRequirement(ctx, gql.ID, n1.ID) fatalErr(t, err) - _, err = WaitForSignal(ctx, listener_ext, time.Millisecond*10, LinkSignalType, func(sig StateSignal) bool { - return sig.State == "linked_as_dep" - }) - fatalErr(t, err) - - ctx.Send(n1.ID, gql.ID, StateSignal{NewDirectSignal(GQLStateSignalType), "start_server"}) _, err = WaitForSignal(ctx, listener_ext, 100*time.Millisecond, GQLStateSignalType, func(sig StateSignal) bool { return sig.State == "server_started" }) diff --git a/node.go b/node.go index 1b1b02b..1c046e6 100644 --- a/node.go +++ b/node.go @@ -94,8 +94,9 @@ type Extension interface { // A QueuedSignal is a Signal that has been Queued to trigger at a set time type QueuedSignal struct { - Signal Signal - Time time.Time + uuid.UUID + Signal + time.Time } // Default message channel size for nodes @@ -119,9 +120,11 @@ type Node struct { NextSignal *QueuedSignal } -func (node *Node) QueueSignal(time time.Time, signal Signal) { - node.SignalQueue = append(node.SignalQueue, QueuedSignal{signal, time}) +func (node *Node) QueueSignal(time time.Time, signal Signal) uuid.UUID { + id := uuid.New() + node.SignalQueue = append(node.SignalQueue, QueuedSignal{id, signal, time}) node.NextSignal, node.TimeoutChan = SoonestSignal(node.SignalQueue) + return id } func (node *Node) ClearSignalQueue() { @@ -204,6 +207,20 @@ func nodeLoop(ctx *Context, node *Node) error { case <-node.TimeoutChan: signal = node.NextSignal.Signal source = node.ID + i := -1 + for j, queued := range(node.SignalQueue) { + if queued.UUID == node.NextSignal.UUID { + i = j + break + } + } + if i == -1 { + panic("node.NextSignal not in node.SignalQueue") + } + l := len(node.SignalQueue) + node.SignalQueue[i] = node.SignalQueue[l-1] + node.SignalQueue = node.SignalQueue[:(l-1)] + node.NextSignal, node.TimeoutChan = SoonestSignal(node.SignalQueue) ctx.Log.Logf("node", "NODE_TIMEOUT %s - NEXT_SIGNAL: %s", node.ID, signal) }