|
|
@ -146,8 +146,9 @@ func SoonestSignal(signals []QueuedSignal) (*QueuedSignal, <-chan time.Time) {
|
|
|
|
}
|
|
|
|
}
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
node_timeout := time.Until(soonest_time)
|
|
|
|
if soonest_signal != nil {
|
|
|
|
if soonest_signal != nil {
|
|
|
|
return soonest_signal, time.After(time.Until(soonest_time))
|
|
|
|
return soonest_signal, time.After(node_timeout)
|
|
|
|
} else {
|
|
|
|
} else {
|
|
|
|
return nil, nil
|
|
|
|
return nil, nil
|
|
|
|
}
|
|
|
|
}
|
|
|
@ -209,6 +210,7 @@ func nodeLoop(ctx *Context, node *Node) error {
|
|
|
|
}
|
|
|
|
}
|
|
|
|
case <-node.TimeoutChan:
|
|
|
|
case <-node.TimeoutChan:
|
|
|
|
signal = node.NextSignal.Signal
|
|
|
|
signal = node.NextSignal.Signal
|
|
|
|
|
|
|
|
t := node.NextSignal.Time
|
|
|
|
source = node.ID
|
|
|
|
source = node.ID
|
|
|
|
i := -1
|
|
|
|
i := -1
|
|
|
|
for j, queued := range(node.SignalQueue) {
|
|
|
|
for j, queued := range(node.SignalQueue) {
|
|
|
@ -226,9 +228,9 @@ func nodeLoop(ctx *Context, node *Node) error {
|
|
|
|
|
|
|
|
|
|
|
|
node.NextSignal, node.TimeoutChan = SoonestSignal(node.SignalQueue)
|
|
|
|
node.NextSignal, node.TimeoutChan = SoonestSignal(node.SignalQueue)
|
|
|
|
if node.NextSignal == nil {
|
|
|
|
if node.NextSignal == nil {
|
|
|
|
ctx.Log.Logf("node", "NODE_TIMEOUT(%s) - PROCESSING %+v - NEXT_SIGNAL nil", node.ID, signal)
|
|
|
|
ctx.Log.Logf("node", "NODE_TIMEOUT(%s) - PROCESSING %+v@%s - NEXT_SIGNAL nil", node.ID, t, signal)
|
|
|
|
} else {
|
|
|
|
} else {
|
|
|
|
ctx.Log.Logf("node", "NODE_TIMEOUT(%s) - PROCESSING %+v - NEXT_SIGNAL: %s@%s", node.ID, signal, node.NextSignal, node.NextSignal.Time)
|
|
|
|
ctx.Log.Logf("node", "NODE_TIMEOUT(%s) - PROCESSING %+v@%s - NEXT_SIGNAL: %s@%s", node.ID, t, signal, node.NextSignal, node.NextSignal.Time)
|
|
|
|
}
|
|
|
|
}
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|