diff --git a/context.go b/context.go index 61158be..83b27f4 100644 --- a/context.go +++ b/context.go @@ -312,7 +312,7 @@ func (ctx *Context) getNode(id NodeID) (*Node, error) { return target, nil } -// Route a Signal to dest. Currently only local context routing is supported +// Route Messages to dest. Currently only local context routing is supported func (ctx *Context) Send(messages Messages) error { for _, msg := range(messages) { if msg.Dest == ZeroID { @@ -434,6 +434,16 @@ func NewContext(db * badger.DB, log Logger) (*Context, error) { return nil, err } + wait_info_type := reflect.TypeOf(WaitInfo{}) + wait_info_info, err := GetStructInfo(ctx, wait_info_type) + if err != nil { + return nil, err + } + err = ctx.RegisterType(wait_info_type, WaitInfoType, nil, SerializeStruct(wait_info_info), nil, DeserializeStruct(wait_info_info)) + if err != nil { + return nil, err + } + err = ctx.RegisterType(reflect.TypeOf(time.Duration(0)), DurationType, nil, nil, nil, DeserializeUint64[time.Duration]) if err != nil { return nil, err diff --git a/event.go b/event.go index a516aa2..a7fd1ff 100644 --- a/event.go +++ b/event.go @@ -69,6 +69,13 @@ func (signal EventControlSignal) Permission() Tree { } } +func (ext *EventExt) UpdateState(node *Node, state string) { + if ext.State != state { + ext.State = state + node.QueueSignal(time.Now(), NewEventStateSignal(node.ID, ext.State, time.Now())) + } +} + func (ext *EventExt) Process(ctx *Context, node *Node, source NodeID, signal Signal) (Messages, Changes) { var messages Messages = nil var changes Changes = nil @@ -125,9 +132,8 @@ func (ext *TestEventExt) Process(ctx *Context, node *Node, source NodeID, signal if exists == true { if event_ext.State == info.from_state { ctx.Log.Logf("event", "%s %s->%s", node.ID, info.from_state, info.to_state) - event_ext.State = info.to_state messages = messages.Add(ctx, source, node, nil, NewSuccessSignal(sig.Id)) - node.QueueSignal(time.Now(), NewEventStateSignal(node.ID, event_ext.State, time.Now())) + event_ext.UpdateState(node, info.to_state) if event_ext.State == "running" { node.QueueSignal(time.Now().Add(ext.Length), NewEventControlSignal("finish")) } diff --git a/node.go b/node.go index e80e3bc..e009fd4 100644 --- a/node.go +++ b/node.go @@ -146,6 +146,38 @@ func (node *Node) Allows(ctx *Context, principal_id NodeID, action Tree)(map[uui return nil, Deny } +type WaitInfo struct { + NodeID NodeID `gv:"node"` + Timeout uuid.UUID `gv:"timeout"` +} + +type WaitMap map[uuid.UUID]WaitInfo + +// Removes a signal from the wait_map and dequeue the associated timeout signal +// Returns the data, and whether or not the ID was found in the wait_map +func (node *Node) ProcessResponse(wait_map WaitMap, response ResponseSignal) (WaitInfo, bool) { + wait_info, is_processed := wait_map[response.ResponseID()] + if is_processed == true { + delete(wait_map, response.ResponseID()) + if response.ID() != wait_info.Timeout { + node.DequeueSignal(wait_info.Timeout) + } + return wait_info, true + } + return WaitInfo{}, false +} + +// Creates a timeout signal for signal, queues it for the node at the timeout, and adds the info to the wait map +func (node *Node) QueueTimeout(dest NodeID, signal Signal, timeout time.Duration, wait_map WaitMap) { + timeout_signal := NewTimeoutSignal(signal.ID()) + node.QueueSignal(time.Now().Add(timeout), timeout_signal) + + wait_map[signal.ID()] = WaitInfo{ + NodeID: dest, + Timeout: timeout_signal.Id, + } +} + func (node *Node) QueueSignal(time time.Time, signal Signal) { node.SignalQueue = append(node.SignalQueue, QueuedSignal{signal, time}) node.NextSignal, node.TimeoutChan = SoonestSignal(node.SignalQueue) diff --git a/serialize.go b/serialize.go index 8bd598e..14751ee 100644 --- a/serialize.go +++ b/serialize.go @@ -257,6 +257,7 @@ var ( ReqStateType = NewSerializedType("REQ_STATE") ReqInfoType = NewSerializedType("REQ_INFO") + WaitInfoType = NewSerializedType("WAIT_INFO") SignalDirectionType = NewSerializedType("SIGNAL_DIRECTION") NodeStructType = NewSerializedType("NODE_STRUCT") QueuedSignalType = NewSerializedType("QUEUED_SIGNAL")