diff --git a/thread.go b/thread.go index 2209109..06015bd 100644 --- a/thread.go +++ b/thread.go @@ -160,8 +160,8 @@ func NewChildInfo(child ThreadNode, infos map[InfoType]interface{}) ChildInfo { } type QueuedAction struct { - Timeout time.Time - Action string + Timeout time.Time `json:"time"` + Action string `json:"action"` } type ThreadNode interface { @@ -185,9 +185,13 @@ type Thread struct { Parent ThreadNode Children map[NodeID]ChildInfo InfoTypes []InfoType - TimeoutAction string - Timeout time.Time + ActionQueue []QueuedAction + NextAction *QueuedAction +} +func (thread *Thread) QueueAction(end time.Time, action string) { + thread.ActionQueue = append(thread.ActionQueue, QueuedAction{end, action}) + thread.NextAction, thread.TimeoutChan = thread.SoonestAction() } func (thread *Thread) ThreadHandle() *Thread { @@ -216,8 +220,7 @@ func (thread *Thread) ChildList() []ThreadNode { type ThreadJSON struct { Parent string `json:"parent"` Children map[string]map[string]interface{} `json:"children"` - Timeout time.Time `json:"timeout"` - TimeoutAction string `json:"timeout_action"` + ActionQueue []QueuedAction `json:"action_queue"` StateName string `json:"state_name"` LockableJSON } @@ -242,8 +245,7 @@ func NewThreadJSON(thread *Thread) ThreadJSON { return ThreadJSON{ Parent: parent_id, Children: children, - Timeout: thread.Timeout, - TimeoutAction: thread.TimeoutAction, + ActionQueue: thread.ActionQueue, StateName: thread.StateName, LockableJSON: lockable_json, } @@ -267,11 +269,24 @@ func LoadThread(ctx *Context, id NodeID, data []byte, nodes NodeMap) (Node, erro return &thread, nil } -func RestoreThread(ctx *Context, thread *Thread, j ThreadJSON, nodes NodeMap) error { - if j.TimeoutAction != "" { - thread.Timeout = j.Timeout - thread.TimeoutAction = j.TimeoutAction +func (thread *Thread) SoonestAction() (*QueuedAction, <-chan time.Time) { + var soonest_action *QueuedAction + var soonest_time time.Time + for _, action := range(thread.ActionQueue) { + if action.Timeout.Compare(soonest_time) == -1 { + soonest_action = &action + } + } + if soonest_action != nil { + return soonest_action, time.After(time.Until(soonest_action.Timeout)) + } else { + return nil, nil } +} + +func RestoreThread(ctx *Context, thread *Thread, j ThreadJSON, nodes NodeMap) error { + thread.ActionQueue = j.ActionQueue + thread.NextAction, thread.TimeoutChan = thread.SoonestAction() if j.Parent != "" { parent_id, err := ParseID(j.Parent) @@ -574,7 +589,7 @@ func ThreadStart(ctx * Context, node ThreadNode) (string, error) { func ThreadWait(ctx * Context, node ThreadNode) (string, error) { thread := node.ThreadHandle() - ctx.Log.Logf("thread", "THREAD_WAIT: %s TIMEOUT: %+v", thread.ID(), thread.Timeout) + ctx.Log.Logf("thread", "THREAD_WAIT: %s", thread.ID()) for { select { case signal := <- thread.Chan: @@ -590,10 +605,8 @@ func ThreadWait(ctx * Context, node ThreadNode) (string, error) { timeout_action := "" context := NewWriteContext(ctx) err := UpdateStates(context, thread, NewLockMap(NewLockInfo(thread, []string{"timeout"})), func(context *StateContext) error { - timeout_action = thread.TimeoutAction - thread.TimeoutChan = nil - thread.TimeoutAction = "" - thread.Timeout = time.Time{} + timeout_action = thread.NextAction.Action + thread.NextAction, thread.TimeoutChan = thread.SoonestAction() return nil }) if err != nil {