2023-06-23 20:56:09 -06:00
|
|
|
package graphvent
|
|
|
|
|
|
|
|
import (
|
|
|
|
"fmt"
|
|
|
|
"time"
|
2023-06-26 21:20:04 -06:00
|
|
|
"sync"
|
2023-06-23 20:56:09 -06:00
|
|
|
"errors"
|
|
|
|
"reflect"
|
|
|
|
"encoding/json"
|
|
|
|
)
|
|
|
|
|
|
|
|
// Update the threads listeners, and notify the parent to do the same
|
|
|
|
func (thread * BaseThread) PropagateUpdate(ctx * GraphContext, signal GraphSignal) {
|
2023-06-28 21:49:23 -06:00
|
|
|
UseStates(ctx, []GraphNode{thread}, func(states NodeStateMap) (error) {
|
|
|
|
thread_state := states[thread.ID()].(ThreadState)
|
2023-06-23 20:56:09 -06:00
|
|
|
if signal.Direction() == Up {
|
2023-06-24 19:48:59 -06:00
|
|
|
// Child->Parent, thread updates parent and connected requirement
|
2023-06-23 20:56:09 -06:00
|
|
|
if thread_state.Parent() != nil {
|
|
|
|
SendUpdate(ctx, thread_state.Parent(), signal)
|
|
|
|
}
|
|
|
|
|
2023-06-24 19:48:59 -06:00
|
|
|
for _, dep := range(thread_state.Dependencies()) {
|
|
|
|
SendUpdate(ctx, dep, signal)
|
2023-06-23 20:56:09 -06:00
|
|
|
}
|
|
|
|
} else if signal.Direction() == Down {
|
2023-06-24 19:48:59 -06:00
|
|
|
// Parent->Child, updates children and dependencies
|
2023-06-23 20:56:09 -06:00
|
|
|
for _, child := range(thread_state.Children()) {
|
|
|
|
SendUpdate(ctx, child, signal)
|
|
|
|
}
|
2023-06-23 21:21:14 -06:00
|
|
|
|
2023-06-24 19:48:59 -06:00
|
|
|
for _, requirement := range(thread_state.Requirements()) {
|
|
|
|
SendUpdate(ctx, requirement, signal)
|
2023-06-23 21:21:14 -06:00
|
|
|
}
|
2023-06-23 20:56:09 -06:00
|
|
|
} else if signal.Direction() == Direct {
|
|
|
|
|
|
|
|
} else {
|
|
|
|
panic(fmt.Sprintf("Invalid signal direction: %d", signal.Direction()))
|
|
|
|
}
|
|
|
|
|
2023-06-28 00:48:49 -06:00
|
|
|
return nil
|
2023-06-23 20:56:09 -06:00
|
|
|
})
|
|
|
|
thread.signal <- signal
|
|
|
|
}
|
|
|
|
|
|
|
|
type ThreadInfo interface {
|
|
|
|
}
|
|
|
|
|
|
|
|
// An Thread is a lockable that has an additional parent->child relationship with other Threads
|
|
|
|
// This relationship allows the thread tree to be modified independent of the lockable state
|
|
|
|
type ThreadState interface {
|
|
|
|
LockableState
|
|
|
|
|
|
|
|
Parent() Thread
|
|
|
|
SetParent(parent Thread)
|
|
|
|
Children() []Thread
|
2023-06-26 21:20:04 -06:00
|
|
|
Child(id NodeID) Thread
|
2023-06-23 20:56:09 -06:00
|
|
|
ChildInfo(child NodeID) ThreadInfo
|
|
|
|
AddChild(child Thread, info ThreadInfo) error
|
2023-06-26 21:20:04 -06:00
|
|
|
Start() error
|
|
|
|
Stop() error
|
2023-07-02 12:14:04 -06:00
|
|
|
|
|
|
|
TimeoutAction() string
|
|
|
|
SetTimeout(end_time time.Time, action string)
|
2023-06-23 20:56:09 -06:00
|
|
|
}
|
|
|
|
|
|
|
|
type BaseThreadState struct {
|
|
|
|
BaseLockableState
|
|
|
|
parent Thread
|
|
|
|
children []Thread
|
|
|
|
child_info map[NodeID] ThreadInfo
|
2023-06-26 17:03:09 -06:00
|
|
|
InfoType reflect.Type
|
2023-06-26 21:20:04 -06:00
|
|
|
running bool
|
2023-07-02 12:14:04 -06:00
|
|
|
timeout time.Time
|
|
|
|
timeout_action string
|
2023-06-23 20:56:09 -06:00
|
|
|
}
|
|
|
|
|
2023-06-23 22:19:43 -06:00
|
|
|
type BaseThreadStateJSON struct {
|
|
|
|
Parent *NodeID `json:"parent"`
|
|
|
|
Children map[NodeID]interface{} `json:"children"`
|
2023-07-02 12:17:53 -06:00
|
|
|
Timeout time.Time `json:"timeout"`
|
|
|
|
TimeoutAction string `json:"timeout_action"`
|
2023-07-01 13:03:28 -06:00
|
|
|
BaseLockableStateJSON
|
2023-06-23 22:19:43 -06:00
|
|
|
}
|
|
|
|
|
2023-06-30 13:25:35 -06:00
|
|
|
func SaveBaseThreadState(state * BaseThreadState) BaseThreadStateJSON {
|
2023-06-23 20:56:09 -06:00
|
|
|
children := map[NodeID]interface{}{}
|
|
|
|
for _, child := range(state.children) {
|
|
|
|
children[child.ID()] = state.child_info[child.ID()]
|
|
|
|
}
|
|
|
|
|
|
|
|
var parent_id *NodeID = nil
|
|
|
|
if state.parent != nil {
|
|
|
|
new_str := state.parent.ID()
|
|
|
|
parent_id = &new_str
|
|
|
|
}
|
|
|
|
|
2023-06-30 13:25:35 -06:00
|
|
|
lockable_state := SaveBaseLockableState(&state.BaseLockableState)
|
|
|
|
|
|
|
|
return BaseThreadStateJSON{
|
2023-06-23 20:56:09 -06:00
|
|
|
Parent: parent_id,
|
|
|
|
Children: children,
|
2023-07-02 12:17:53 -06:00
|
|
|
Timeout: state.timeout,
|
|
|
|
TimeoutAction: state.timeout_action,
|
2023-07-01 13:03:28 -06:00
|
|
|
BaseLockableStateJSON: lockable_state,
|
2023-06-30 13:25:35 -06:00
|
|
|
}
|
|
|
|
}
|
|
|
|
|
2023-07-02 12:14:04 -06:00
|
|
|
func RestoreBaseThread(ctx * GraphContext, id NodeID, actions ThreadActions, handlers ThreadHandlers) BaseThread {
|
2023-07-01 13:03:28 -06:00
|
|
|
base_lockable := RestoreBaseLockable(ctx, id)
|
|
|
|
thread := BaseThread{
|
|
|
|
BaseLockable: base_lockable,
|
2023-07-02 12:14:04 -06:00
|
|
|
Actions: actions,
|
|
|
|
Handlers: handlers,
|
2023-07-01 13:03:28 -06:00
|
|
|
}
|
|
|
|
|
|
|
|
return thread
|
|
|
|
}
|
|
|
|
|
2023-07-02 12:14:04 -06:00
|
|
|
func LoadSimpleThread(ctx * GraphContext, id NodeID) (GraphNode, error) {
|
|
|
|
thread := RestoreBaseThread(ctx, id, BaseThreadActions, BaseThreadHandlers)
|
2023-07-01 13:03:28 -06:00
|
|
|
return &thread, nil
|
|
|
|
}
|
|
|
|
|
|
|
|
func RestoreBaseThreadState(ctx * GraphContext, j BaseThreadStateJSON, loaded_nodes NodeMap) (*BaseThreadState, error) {
|
|
|
|
lockable_state, err := RestoreBaseLockableState(ctx, j.BaseLockableStateJSON, loaded_nodes)
|
|
|
|
if err != nil {
|
|
|
|
return nil, err
|
|
|
|
}
|
|
|
|
|
|
|
|
state := BaseThreadState{
|
|
|
|
BaseLockableState: *lockable_state,
|
|
|
|
parent: nil,
|
|
|
|
children: make([]Thread, len(j.Children)),
|
|
|
|
child_info: map[NodeID]ThreadInfo{},
|
|
|
|
InfoType: nil,
|
|
|
|
running: false,
|
2023-07-02 12:17:53 -06:00
|
|
|
timeout: j.Timeout,
|
|
|
|
timeout_action: j.TimeoutAction,
|
2023-07-01 13:03:28 -06:00
|
|
|
}
|
|
|
|
|
|
|
|
if j.Parent != nil {
|
|
|
|
p, err := LoadNodeRecurse(ctx, *j.Parent, loaded_nodes)
|
|
|
|
if err != nil {
|
|
|
|
return nil, err
|
|
|
|
}
|
|
|
|
p_t, ok := p.(Thread)
|
|
|
|
if ok == false {
|
|
|
|
return nil, err
|
|
|
|
}
|
|
|
|
state.owner = p_t
|
|
|
|
}
|
|
|
|
|
|
|
|
i := 0
|
|
|
|
for id, info := range(j.Children) {
|
|
|
|
child_node, err := LoadNodeRecurse(ctx, id, loaded_nodes)
|
|
|
|
if err != nil {
|
|
|
|
return nil, err
|
|
|
|
}
|
|
|
|
child_t, ok := child_node.(Thread)
|
|
|
|
if ok == false {
|
|
|
|
return nil, fmt.Errorf("%+v is not a Thread as expected", child_node)
|
|
|
|
}
|
|
|
|
state.children[i] = child_t
|
|
|
|
state.child_info[id] = info
|
|
|
|
i++
|
|
|
|
}
|
|
|
|
|
|
|
|
return &state, nil
|
|
|
|
}
|
|
|
|
|
2023-07-02 12:14:04 -06:00
|
|
|
func LoadSimpleThreadState(ctx * GraphContext, data []byte, loaded_nodes NodeMap)(NodeState, error){
|
2023-07-01 13:03:28 -06:00
|
|
|
var j BaseThreadStateJSON
|
|
|
|
err := json.Unmarshal(data, &j)
|
|
|
|
if err != nil {
|
|
|
|
return nil, err
|
|
|
|
}
|
|
|
|
|
|
|
|
state, err := RestoreBaseThreadState(ctx, j, loaded_nodes)
|
|
|
|
if err != nil {
|
|
|
|
return nil, err
|
|
|
|
}
|
|
|
|
|
|
|
|
return state, nil
|
|
|
|
}
|
|
|
|
|
2023-07-02 12:14:04 -06:00
|
|
|
func (state * BaseThreadState) SetTimeout(timeout time.Time, action string) {
|
|
|
|
state.timeout = timeout
|
|
|
|
state.timeout_action = action
|
|
|
|
}
|
|
|
|
|
|
|
|
func (state * BaseThreadState) TimeoutAction() string {
|
|
|
|
return state.timeout_action
|
|
|
|
}
|
|
|
|
|
2023-06-30 13:25:35 -06:00
|
|
|
func (state * BaseThreadState) MarshalJSON() ([]byte, error) {
|
|
|
|
thread_state := SaveBaseThreadState(state)
|
|
|
|
return json.Marshal(&thread_state)
|
2023-06-23 20:56:09 -06:00
|
|
|
}
|
|
|
|
|
2023-06-26 21:20:04 -06:00
|
|
|
func (state * BaseThreadState) Start() error {
|
|
|
|
if state.running == true {
|
|
|
|
return fmt.Errorf("Cannot start a running thread")
|
|
|
|
}
|
|
|
|
state.running = true
|
|
|
|
return nil
|
|
|
|
}
|
|
|
|
|
|
|
|
func (state * BaseThreadState) Stop() error {
|
|
|
|
if state.running == false {
|
|
|
|
return fmt.Errorf("Cannot stop a thread that's not running")
|
|
|
|
}
|
|
|
|
state.running = false
|
|
|
|
return nil
|
|
|
|
}
|
|
|
|
|
2023-06-23 20:56:09 -06:00
|
|
|
func (state * BaseThreadState) Parent() Thread {
|
|
|
|
return state.parent
|
|
|
|
}
|
|
|
|
|
|
|
|
func (state * BaseThreadState) SetParent(parent Thread) {
|
|
|
|
state.parent = parent
|
|
|
|
}
|
|
|
|
|
|
|
|
func (state * BaseThreadState) Children() []Thread {
|
|
|
|
return state.children
|
|
|
|
}
|
|
|
|
|
2023-06-26 21:20:04 -06:00
|
|
|
func (state * BaseThreadState) Child(id NodeID) Thread {
|
|
|
|
for _, child := range(state.children) {
|
|
|
|
if child.ID() == id {
|
|
|
|
return child
|
|
|
|
}
|
|
|
|
}
|
|
|
|
return nil
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
|
2023-06-23 20:56:09 -06:00
|
|
|
func (state * BaseThreadState) ChildInfo(child NodeID) ThreadInfo {
|
|
|
|
return state.child_info[child]
|
|
|
|
}
|
|
|
|
|
|
|
|
func (state * BaseThreadState) AddChild(child Thread, info ThreadInfo) error {
|
|
|
|
if child == nil {
|
|
|
|
return fmt.Errorf("Will not connect nil to the thread tree")
|
|
|
|
}
|
|
|
|
|
|
|
|
_, exists := state.child_info[child.ID()]
|
|
|
|
if exists == true {
|
|
|
|
return fmt.Errorf("Will not connect the same child twice")
|
|
|
|
}
|
|
|
|
|
2023-06-26 17:03:09 -06:00
|
|
|
if info == nil && state.InfoType != nil {
|
2023-06-23 20:56:09 -06:00
|
|
|
return fmt.Errorf("nil info passed when expecting info")
|
|
|
|
} else if info != nil {
|
2023-06-26 17:03:09 -06:00
|
|
|
if reflect.TypeOf(info) != state.InfoType {
|
|
|
|
return fmt.Errorf("info type mismatch, expecting %+v", state.InfoType)
|
2023-06-23 20:56:09 -06:00
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
state.children = append(state.children, child)
|
|
|
|
state.child_info[child.ID()] = info
|
|
|
|
|
|
|
|
return nil
|
|
|
|
}
|
|
|
|
|
2023-06-25 13:39:00 -06:00
|
|
|
func checkIfChild(ctx * GraphContext, thread_id NodeID, cur_state ThreadState, cur_id NodeID) bool {
|
2023-06-23 20:56:09 -06:00
|
|
|
for _, child := range(cur_state.Children()) {
|
|
|
|
if child.ID() == thread_id {
|
|
|
|
return true
|
|
|
|
}
|
2023-06-28 00:48:49 -06:00
|
|
|
is_child := false
|
2023-06-28 21:49:23 -06:00
|
|
|
UseStates(ctx, []GraphNode{child}, func(states NodeStateMap) (error) {
|
|
|
|
child_state := states[child.ID()].(ThreadState)
|
2023-06-28 00:48:49 -06:00
|
|
|
is_child = checkIfChild(ctx, cur_id, child_state, child.ID())
|
|
|
|
return nil
|
2023-06-23 20:56:09 -06:00
|
|
|
})
|
|
|
|
if is_child {
|
|
|
|
return true
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
return false
|
|
|
|
}
|
|
|
|
|
|
|
|
func LinkThreads(ctx * GraphContext, thread Thread, child Thread, info ThreadInfo) error {
|
|
|
|
if ctx == nil || thread == nil || child == nil {
|
|
|
|
return fmt.Errorf("invalid input")
|
|
|
|
}
|
|
|
|
|
|
|
|
if thread.ID() == child.ID() {
|
|
|
|
return fmt.Errorf("Will not link %s as a child of itself", thread.ID())
|
|
|
|
}
|
|
|
|
|
|
|
|
|
2023-06-28 23:51:44 -06:00
|
|
|
err := UpdateStates(ctx, []GraphNode{thread, child}, func(nodes NodeMap) error {
|
|
|
|
thread_state := thread.State().(ThreadState)
|
|
|
|
child_state := child.State().(ThreadState)
|
2023-06-23 20:56:09 -06:00
|
|
|
|
|
|
|
if child_state.Parent() != nil {
|
2023-06-28 21:49:23 -06:00
|
|
|
return fmt.Errorf("EVENT_LINK_ERR: %s already has a parent, cannot link as child", child.ID())
|
2023-06-23 20:56:09 -06:00
|
|
|
}
|
|
|
|
|
2023-06-25 13:39:00 -06:00
|
|
|
if checkIfChild(ctx, thread.ID(), child_state, child.ID()) == true {
|
2023-06-28 21:49:23 -06:00
|
|
|
return fmt.Errorf("EVENT_LINK_ERR: %s is a child of %s so cannot add as parent", thread.ID(), child.ID())
|
2023-06-23 20:56:09 -06:00
|
|
|
}
|
|
|
|
|
2023-06-25 13:39:00 -06:00
|
|
|
if checkIfChild(ctx, child.ID(), thread_state, thread.ID()) == true {
|
2023-06-28 21:49:23 -06:00
|
|
|
return fmt.Errorf("EVENT_LINK_ERR: %s is already a parent of %s so will not add again", thread.ID(), child.ID())
|
2023-06-23 20:56:09 -06:00
|
|
|
}
|
|
|
|
|
|
|
|
err := thread_state.AddChild(child, info)
|
|
|
|
if err != nil {
|
2023-06-28 21:49:23 -06:00
|
|
|
return fmt.Errorf("EVENT_LINK_ERR: error adding %s as child to %s: %e", child.ID(), thread.ID(), err)
|
2023-06-23 20:56:09 -06:00
|
|
|
}
|
|
|
|
child_state.SetParent(thread)
|
|
|
|
|
2023-06-28 21:49:23 -06:00
|
|
|
return nil
|
2023-06-23 20:56:09 -06:00
|
|
|
})
|
|
|
|
|
|
|
|
if err != nil {
|
|
|
|
return err
|
|
|
|
}
|
|
|
|
|
2023-06-26 21:20:04 -06:00
|
|
|
SendUpdate(ctx, thread, NewSignal(child, "child_added"))
|
|
|
|
|
2023-06-23 20:56:09 -06:00
|
|
|
return nil
|
|
|
|
}
|
|
|
|
|
|
|
|
// Thread is the interface that thread tree nodes must implement
|
|
|
|
type Thread interface {
|
2023-06-24 19:48:59 -06:00
|
|
|
Lockable
|
2023-06-23 20:56:09 -06:00
|
|
|
|
2023-06-23 21:21:14 -06:00
|
|
|
Action(action string) (ThreadAction, bool)
|
|
|
|
Handler(signal_type string) (ThreadHandler, bool)
|
2023-06-23 20:56:09 -06:00
|
|
|
|
2023-07-02 12:23:35 -06:00
|
|
|
SetTimeout(end time.Time)
|
2023-06-23 20:56:09 -06:00
|
|
|
Timeout() <-chan time.Time
|
2023-07-02 12:14:04 -06:00
|
|
|
ClearTimeout()
|
2023-06-26 21:20:04 -06:00
|
|
|
|
|
|
|
ChildWaits() *sync.WaitGroup
|
2023-06-23 20:56:09 -06:00
|
|
|
}
|
|
|
|
|
2023-07-02 09:05:34 -06:00
|
|
|
func FindChild(ctx * GraphContext, thread Thread, thread_state ThreadState, id NodeID, states NodeStateMap) Thread {
|
2023-06-23 20:56:09 -06:00
|
|
|
if thread == nil {
|
|
|
|
panic("cannot recurse through nil")
|
|
|
|
}
|
|
|
|
if id == thread.ID() {
|
|
|
|
return thread
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
for _, child := range thread_state.Children() {
|
2023-06-28 00:48:49 -06:00
|
|
|
var result Thread = nil
|
2023-07-02 09:05:34 -06:00
|
|
|
UseMoreStates(ctx, []GraphNode{child}, states, func(states NodeStateMap) (error) {
|
2023-06-28 21:49:23 -06:00
|
|
|
child_state := states[child.ID()].(ThreadState)
|
2023-07-02 09:05:34 -06:00
|
|
|
result = FindChild(ctx, child, child_state, id, states)
|
2023-06-28 00:48:49 -06:00
|
|
|
return nil
|
2023-06-23 20:56:09 -06:00
|
|
|
})
|
|
|
|
if result != nil {
|
|
|
|
return result
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
return nil
|
|
|
|
}
|
|
|
|
|
2023-07-02 12:47:45 -06:00
|
|
|
func ChildGo(ctx * GraphContext, thread_state ThreadState, thread Thread, child_id NodeID, first_action string) {
|
2023-06-26 21:20:04 -06:00
|
|
|
child := thread_state.Child(child_id)
|
|
|
|
if child == nil {
|
|
|
|
panic(fmt.Errorf("Child not in thread, can't start %s", child_id))
|
|
|
|
}
|
|
|
|
thread.ChildWaits().Add(1)
|
|
|
|
go func(child Thread) {
|
|
|
|
ctx.Log.Logf("gql", "THREAD_START_CHILD: %s", child.ID())
|
|
|
|
defer thread.ChildWaits().Done()
|
2023-07-02 12:47:45 -06:00
|
|
|
err := RunThread(ctx, child, first_action)
|
2023-06-26 21:20:04 -06:00
|
|
|
if err != nil {
|
|
|
|
ctx.Log.Logf("gql", "THREAD_CHILD_RUN_ERR: %s %e", child.ID(), err)
|
|
|
|
} else {
|
|
|
|
ctx.Log.Logf("gql", "THREAD_CHILD_RUN_DONE: %s", child.ID())
|
|
|
|
}
|
|
|
|
}(child)
|
|
|
|
}
|
|
|
|
|
2023-07-02 12:47:45 -06:00
|
|
|
func RunThread(ctx * GraphContext, thread Thread, first_action string) error {
|
2023-06-26 21:20:04 -06:00
|
|
|
ctx.Log.Logf("thread", "THREAD_RUN: %s", thread.ID())
|
2023-06-23 20:56:09 -06:00
|
|
|
|
2023-06-28 23:51:44 -06:00
|
|
|
err := UpdateStates(ctx, []GraphNode{thread}, func(nodes NodeMap) (error) {
|
2023-07-02 11:26:58 -06:00
|
|
|
thread_state := thread.State().(ThreadState)
|
|
|
|
owner_id := NodeID("")
|
|
|
|
if thread_state.Owner() != nil {
|
|
|
|
owner_id = thread_state.Owner().ID()
|
|
|
|
}
|
|
|
|
if owner_id != thread.ID() {
|
|
|
|
return LockLockables(ctx, []Lockable{thread}, thread, nil, nodes)
|
|
|
|
}
|
|
|
|
return nil
|
2023-06-28 23:51:44 -06:00
|
|
|
})
|
2023-06-24 19:48:59 -06:00
|
|
|
if err != nil {
|
|
|
|
return err
|
|
|
|
}
|
|
|
|
|
2023-06-28 21:49:23 -06:00
|
|
|
err = UseStates(ctx, []GraphNode{thread}, func(states NodeStateMap) (error) {
|
|
|
|
thread_state := states[thread.ID()].(ThreadState)
|
2023-06-23 20:56:09 -06:00
|
|
|
if thread_state.Owner() == nil {
|
2023-06-28 00:48:49 -06:00
|
|
|
return fmt.Errorf("THREAD_RUN_NOT_LOCKED: %s", thread_state.Name())
|
2023-06-23 20:56:09 -06:00
|
|
|
} else if thread_state.Owner().ID() != thread.ID() {
|
2023-06-28 00:48:49 -06:00
|
|
|
return fmt.Errorf("THREAD_RUN_RESOURCE_ALREADY_LOCKED: %s, %s", thread_state.Name(), thread_state.Owner().ID())
|
2023-06-26 21:20:04 -06:00
|
|
|
} else if err := thread_state.Start(); err != nil {
|
2023-06-28 00:48:49 -06:00
|
|
|
return fmt.Errorf("THREAD_START_ERR: %e", err)
|
2023-06-23 20:56:09 -06:00
|
|
|
}
|
2023-06-28 00:48:49 -06:00
|
|
|
return nil
|
2023-06-23 20:56:09 -06:00
|
|
|
})
|
2023-06-28 00:48:49 -06:00
|
|
|
if err != nil {
|
|
|
|
return err
|
|
|
|
}
|
2023-06-23 20:56:09 -06:00
|
|
|
|
2023-07-02 12:47:45 -06:00
|
|
|
next_action := first_action
|
2023-06-23 20:56:09 -06:00
|
|
|
for next_action != "" {
|
|
|
|
action, exists := thread.Action(next_action)
|
|
|
|
if exists == false {
|
|
|
|
error_str := fmt.Sprintf("%s is not a valid action", next_action)
|
|
|
|
return errors.New(error_str)
|
|
|
|
}
|
|
|
|
|
2023-06-26 21:20:04 -06:00
|
|
|
ctx.Log.Logf("thread", "THREAD_ACTION: %s - %s", thread.ID(), next_action)
|
2023-06-23 21:21:14 -06:00
|
|
|
next_action, err = action(ctx, thread)
|
2023-06-23 20:56:09 -06:00
|
|
|
if err != nil {
|
|
|
|
return err
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
2023-06-28 21:49:23 -06:00
|
|
|
err = UseStates(ctx, []GraphNode{thread}, func(states NodeStateMap) (error) {
|
|
|
|
thread_state := states[thread.ID()].(ThreadState)
|
2023-06-26 21:20:04 -06:00
|
|
|
err := thread_state.Stop()
|
2023-06-28 00:48:49 -06:00
|
|
|
return err
|
2023-06-26 23:15:40 -06:00
|
|
|
|
2023-06-26 21:20:04 -06:00
|
|
|
})
|
|
|
|
if err != nil {
|
|
|
|
ctx.Log.Logf("thread", "THREAD_RUN_STOP_ERR: %e", err)
|
|
|
|
return err
|
|
|
|
}
|
|
|
|
|
2023-06-28 23:51:44 -06:00
|
|
|
err = UpdateStates(ctx, []GraphNode{thread}, func(nodes NodeMap) (error) {
|
|
|
|
return UnlockLockables(ctx, []Lockable{thread}, thread, nil, nodes)
|
|
|
|
})
|
2023-06-26 23:15:40 -06:00
|
|
|
if err != nil {
|
|
|
|
ctx.Log.Logf("thread", "THREAD_RUN_UNLOCK_ERR: %e", err)
|
|
|
|
return err
|
|
|
|
}
|
|
|
|
|
2023-06-23 20:56:09 -06:00
|
|
|
SendUpdate(ctx, thread, NewSignal(thread, "thread_done"))
|
|
|
|
|
2023-06-26 21:20:04 -06:00
|
|
|
ctx.Log.Logf("thread", "THREAD_RUN_DONE: %s", thread.ID())
|
2023-06-23 20:56:09 -06:00
|
|
|
|
|
|
|
return nil
|
|
|
|
}
|
|
|
|
|
2023-06-23 21:21:14 -06:00
|
|
|
type ThreadAction func(* GraphContext, Thread)(string, error)
|
|
|
|
type ThreadActions map[string]ThreadAction
|
|
|
|
type ThreadHandler func(* GraphContext, Thread, GraphSignal)(string, error)
|
|
|
|
type ThreadHandlers map[string]ThreadHandler
|
2023-06-23 20:56:09 -06:00
|
|
|
|
|
|
|
// Thread is the most basic thread that can exist in the thread tree.
|
|
|
|
// On start it automatically transitions to completion.
|
|
|
|
// This node by itself doesn't implement any special behaviours for children, so they will be ignored.
|
|
|
|
// When started, this thread automatically transitions to completion
|
|
|
|
type BaseThread struct {
|
2023-06-24 19:48:59 -06:00
|
|
|
BaseLockable
|
2023-06-23 20:56:09 -06:00
|
|
|
|
2023-06-23 21:21:14 -06:00
|
|
|
Actions ThreadActions
|
|
|
|
Handlers ThreadHandlers
|
2023-06-23 20:56:09 -06:00
|
|
|
|
2023-07-02 12:14:04 -06:00
|
|
|
timeout_chan <-chan time.Time
|
2023-06-26 21:20:04 -06:00
|
|
|
child_waits sync.WaitGroup
|
|
|
|
}
|
|
|
|
|
|
|
|
func (thread * BaseThread) ChildWaits() *sync.WaitGroup {
|
|
|
|
return &thread.child_waits
|
2023-06-23 20:56:09 -06:00
|
|
|
}
|
|
|
|
|
2023-06-28 00:48:49 -06:00
|
|
|
func (thread * BaseThread) CanLock(node GraphNode, state LockableState) error {
|
2023-06-23 20:56:09 -06:00
|
|
|
return nil
|
|
|
|
}
|
|
|
|
|
2023-06-28 00:48:49 -06:00
|
|
|
func (thread * BaseThread) CanUnlock(node GraphNode, state LockableState) error {
|
2023-06-23 20:56:09 -06:00
|
|
|
return nil
|
|
|
|
}
|
|
|
|
|
2023-06-28 00:48:49 -06:00
|
|
|
func (thread * BaseThread) Lock(node GraphNode, state LockableState) {
|
|
|
|
return
|
|
|
|
}
|
|
|
|
|
|
|
|
func (thread * BaseThread) Unlock(node GraphNode, state LockableState) {
|
|
|
|
return
|
|
|
|
}
|
|
|
|
|
2023-06-23 21:21:14 -06:00
|
|
|
func (thread * BaseThread) Action(action string) (ThreadAction, bool) {
|
2023-06-23 20:56:09 -06:00
|
|
|
action_fn, exists := thread.Actions[action]
|
|
|
|
return action_fn, exists
|
|
|
|
}
|
|
|
|
|
2023-06-23 21:21:14 -06:00
|
|
|
func (thread * BaseThread) Handler(signal_type string) (ThreadHandler, bool) {
|
|
|
|
handler, exists := thread.Handlers[signal_type]
|
|
|
|
return handler, exists
|
|
|
|
}
|
|
|
|
|
|
|
|
func (thread * BaseThread) Timeout() <-chan time.Time {
|
2023-07-02 12:14:04 -06:00
|
|
|
return thread.timeout_chan
|
2023-06-23 21:21:14 -06:00
|
|
|
}
|
|
|
|
|
|
|
|
func (thread * BaseThread) ClearTimeout() {
|
2023-07-02 12:14:04 -06:00
|
|
|
thread.timeout_chan = nil
|
2023-06-23 21:21:14 -06:00
|
|
|
}
|
|
|
|
|
2023-07-02 12:23:35 -06:00
|
|
|
func (thread * BaseThread) SetTimeout(end time.Time) {
|
|
|
|
thread.timeout_chan = time.After(time.Until(end))
|
2023-06-23 21:21:14 -06:00
|
|
|
}
|
|
|
|
|
|
|
|
var ThreadDefaultStart = func(ctx * GraphContext, thread Thread) (string, error) {
|
|
|
|
ctx.Log.Logf("thread", "THREAD_DEFAUL_START: %s", thread.ID())
|
|
|
|
return "wait", nil
|
|
|
|
}
|
|
|
|
|
2023-07-02 12:47:45 -06:00
|
|
|
var ThreadDefaultRestore = func(ctx * GraphContext, thread Thread) (string, error) {
|
|
|
|
ctx.Log.Logf("thread", "THREAD_DEFAULT_RESTORE: %s", thread.ID())
|
|
|
|
return "wait", nil
|
|
|
|
}
|
|
|
|
|
2023-06-23 21:21:14 -06:00
|
|
|
var ThreadWait = func(ctx * GraphContext, thread Thread) (string, error) {
|
|
|
|
ctx.Log.Logf("thread", "THREAD_WAIT: %s TIMEOUT: %+v", thread.ID(), thread.Timeout())
|
2023-06-24 19:48:59 -06:00
|
|
|
for {
|
|
|
|
select {
|
|
|
|
case signal := <- thread.SignalChannel():
|
|
|
|
if signal.Source() == thread.ID() {
|
|
|
|
ctx.Log.Logf("thread", "THREAD_SIGNAL_INTERNAL")
|
2023-06-25 21:00:00 -06:00
|
|
|
} else {
|
|
|
|
ctx.Log.Logf("thread", "THREAD_SIGNAL: %s %+v", thread.ID(), signal)
|
2023-06-24 19:48:59 -06:00
|
|
|
}
|
|
|
|
signal_fn, exists := thread.Handler(signal.Type())
|
|
|
|
if exists == true {
|
|
|
|
ctx.Log.Logf("thread", "THREAD_HANDLER: %s - %s", thread.ID(), signal.Type())
|
|
|
|
return signal_fn(ctx, thread, signal)
|
2023-06-25 21:00:00 -06:00
|
|
|
} else {
|
|
|
|
ctx.Log.Logf("thread", "THREAD_NOHANDLER: %s - %s", thread.ID(), signal.Type())
|
2023-06-24 19:48:59 -06:00
|
|
|
}
|
|
|
|
case <- thread.Timeout():
|
2023-07-02 12:14:04 -06:00
|
|
|
timeout_action := ""
|
|
|
|
err := UpdateStates(ctx, []GraphNode{thread}, func(nodes NodeMap) error {
|
|
|
|
thread_state := thread.State().(ThreadState)
|
|
|
|
timeout_action = thread_state.TimeoutAction()
|
|
|
|
thread.ClearTimeout()
|
|
|
|
return nil
|
|
|
|
})
|
|
|
|
if err != nil {
|
|
|
|
ctx.Log.Logf("thread", "THREAD_TIMEOUT_ERR: %s - %e", thread.ID(), err)
|
|
|
|
}
|
|
|
|
ctx.Log.Logf("thread", "THREAD_TIMEOUT %s - NEXT_STATE: %s", thread.ID(), timeout_action)
|
|
|
|
return timeout_action, nil
|
2023-06-24 19:48:59 -06:00
|
|
|
}
|
2023-06-23 20:56:09 -06:00
|
|
|
}
|
2023-06-23 21:21:14 -06:00
|
|
|
return "wait", nil
|
2023-06-23 20:56:09 -06:00
|
|
|
}
|
|
|
|
|
2023-06-23 21:21:14 -06:00
|
|
|
var ThreadAbort = func(ctx * GraphContext, thread Thread, signal GraphSignal) (string, error) {
|
|
|
|
return "", fmt.Errorf("%s aborted by signal from %s", thread.ID(), signal.Source())
|
|
|
|
}
|
|
|
|
|
|
|
|
var ThreadCancel = func(ctx * GraphContext, thread Thread, signal GraphSignal) (string, error) {
|
|
|
|
return "", nil
|
2023-06-23 20:56:09 -06:00
|
|
|
}
|
|
|
|
|
2023-06-28 21:49:23 -06:00
|
|
|
func NewBaseThreadState(name string, _type string) BaseThreadState {
|
2023-06-23 20:56:09 -06:00
|
|
|
return BaseThreadState{
|
2023-06-28 21:49:23 -06:00
|
|
|
BaseLockableState: NewBaseLockableState(name, _type),
|
2023-06-23 20:56:09 -06:00
|
|
|
children: []Thread{},
|
|
|
|
child_info: map[NodeID]ThreadInfo{},
|
|
|
|
parent: nil,
|
2023-07-02 12:14:04 -06:00
|
|
|
timeout: time.Time{},
|
|
|
|
timeout_action: "wait",
|
2023-06-23 20:56:09 -06:00
|
|
|
}
|
|
|
|
}
|
|
|
|
|
2023-07-02 12:14:04 -06:00
|
|
|
func NewThreadActions() ThreadActions{
|
|
|
|
actions := ThreadActions{}
|
|
|
|
for k, v := range(BaseThreadActions) {
|
|
|
|
actions[k] = v
|
|
|
|
}
|
|
|
|
|
|
|
|
return actions
|
|
|
|
}
|
|
|
|
|
|
|
|
func NewThreadHandlers() ThreadHandlers{
|
|
|
|
handlers := ThreadHandlers{}
|
|
|
|
for k, v := range(BaseThreadHandlers) {
|
|
|
|
handlers[k] = v
|
|
|
|
}
|
|
|
|
|
|
|
|
return handlers
|
|
|
|
}
|
|
|
|
|
|
|
|
var BaseThreadActions = ThreadActions{
|
|
|
|
"wait": ThreadWait,
|
|
|
|
"start": ThreadDefaultStart,
|
2023-07-02 12:47:45 -06:00
|
|
|
"restore": ThreadDefaultRestore,
|
2023-07-02 12:14:04 -06:00
|
|
|
}
|
|
|
|
|
|
|
|
var BaseThreadHandlers = ThreadHandlers{
|
|
|
|
"abort": ThreadAbort,
|
|
|
|
"cancel": ThreadCancel,
|
|
|
|
}
|
|
|
|
|
2023-06-25 20:20:59 -06:00
|
|
|
func NewBaseThread(ctx * GraphContext, actions ThreadActions, handlers ThreadHandlers, state ThreadState) (BaseThread, error) {
|
2023-06-26 01:16:44 -06:00
|
|
|
lockable, err := NewBaseLockable(ctx, state)
|
|
|
|
if err != nil {
|
|
|
|
return BaseThread{}, err
|
|
|
|
}
|
|
|
|
|
2023-06-23 21:21:14 -06:00
|
|
|
thread := BaseThread{
|
2023-06-26 01:16:44 -06:00
|
|
|
BaseLockable: lockable,
|
2023-07-02 12:14:04 -06:00
|
|
|
Actions: actions,
|
|
|
|
Handlers: handlers,
|
2023-06-25 13:39:00 -06:00
|
|
|
}
|
|
|
|
|
2023-06-23 21:21:14 -06:00
|
|
|
return thread, nil
|
|
|
|
}
|
|
|
|
|
2023-07-02 12:14:04 -06:00
|
|
|
func NewSimpleThread(ctx * GraphContext, name string, requirements []Lockable, actions ThreadActions, handlers ThreadHandlers) (* BaseThread, error) {
|
|
|
|
state := NewBaseThreadState(name, "simple_thread")
|
|
|
|
|
2023-06-25 20:20:59 -06:00
|
|
|
thread, err := NewBaseThread(ctx, actions, handlers, &state)
|
2023-06-23 20:56:09 -06:00
|
|
|
if err != nil {
|
|
|
|
return nil, err
|
|
|
|
}
|
|
|
|
|
|
|
|
thread_ptr := &thread
|
|
|
|
|
2023-06-25 13:39:00 -06:00
|
|
|
err = LinkLockables(ctx, thread_ptr, requirements)
|
|
|
|
if err != nil {
|
|
|
|
return nil, err
|
2023-06-23 20:56:09 -06:00
|
|
|
}
|
|
|
|
return thread_ptr, nil
|
|
|
|
}
|