diff --git a/thread.go b/thread.go deleted file mode 100644 index f3a6b6b..0000000 --- a/thread.go +++ /dev/null @@ -1,723 +0,0 @@ -package graphvent - -import ( - "fmt" - "time" - "sync" - "errors" - "encoding/json" - "crypto/sha512" - "encoding/binary" -) - -type ThreadAction func(*Context, *Node, *ThreadExt)(string, error) -type ThreadActions map[string]ThreadAction -type ThreadHandler func(*Context, *Node, *ThreadExt, Signal)(string, error) -type ThreadHandlers map[SignalType]ThreadHandler - -type InfoType string -func (t InfoType) String() string { - return string(t) -} - -type Info interface { - Serializable[InfoType] -} - -// Data required by a parent thread to restore it's children -type ParentInfo struct { - Start bool `json:"start"` - StartAction string `json:"start_action"` - RestoreAction string `json:"restore_action"` -} - -const ParentInfoType = InfoType("PARENT") -func (info *ParentInfo) Type() InfoType { - return ParentInfoType -} - -func (info *ParentInfo) Serialize() ([]byte, error) { - return json.MarshalIndent(info, "", " ") -} - -type QueuedAction struct { - Timeout time.Time `json:"time"` - Action string `json:"action"` -} - -type ThreadType string -func (thread ThreadType) Hash() uint64 { - hash := sha512.Sum512([]byte(fmt.Sprintf("THREAD: %s", string(thread)))) - return binary.BigEndian.Uint64(hash[(len(hash)-9):(len(hash)-1)]) -} - -type ThreadInfo struct { - Actions ThreadActions - Handlers ThreadHandlers -} - -type InfoLoadFunc func([]byte)(Info, error) -type ThreadExtContext struct { - Types map[ThreadType]ThreadInfo - Loads map[InfoType]InfoLoadFunc -} - -const BaseThreadType = ThreadType("BASE") -func NewThreadExtContext() *ThreadExtContext { - return &ThreadExtContext{ - Types: map[ThreadType]ThreadInfo{ - BaseThreadType: ThreadInfo{ - Actions: BaseThreadActions, - Handlers: BaseThreadHandlers, - }, - }, - Loads: map[InfoType]InfoLoadFunc{ - ParentInfoType: func(data []byte) (Info, error) { - var info ParentInfo - err := json.Unmarshal(data, &info) - if err != nil { - return nil, err - } - - return &info, nil - }, - }, - } -} - -func (ctx *ThreadExtContext) RegisterThreadType(thread_type ThreadType, actions ThreadActions, handlers ThreadHandlers) error { - if actions == nil || handlers == nil { - return fmt.Errorf("Cannot register ThreadType %s with nil actions or handlers", thread_type) - } - - _, exists := ctx.Types[thread_type] - if exists == true { - return fmt.Errorf("ThreadType %s already registered in ThreadExtContext, cannot register again", thread_type) - } - ctx.Types[thread_type] = ThreadInfo{ - Actions: actions, - Handlers: handlers, - } - - return nil -} - -func (ctx *ThreadExtContext) RegisterInfoType(info_type InfoType, load_fn InfoLoadFunc) error { - if load_fn == nil { - return fmt.Errorf("Cannot register %s with nil load_fn", info_type) - } - - _, exists := ctx.Loads[info_type] - if exists == true { - return fmt.Errorf("InfoType %s is already registered in ThreadExtContext, cannot register again", info_type) - } - - ctx.Loads[info_type] = load_fn - return nil -} - -type ThreadExt struct { - Actions ThreadActions - Handlers ThreadHandlers - - ThreadType ThreadType - - SignalChan chan Signal - TimeoutChan <-chan time.Time - - ChildWaits sync.WaitGroup - - ActiveLock sync.Mutex - Active bool - State string - - Parent *Node - Children map[NodeID]ChildInfo - - ActionQueue []QueuedAction - NextAction *QueuedAction -} - -type ThreadExtJSON struct { - State string `json:"state"` - Type string `json:"type"` - Parent string `json:"parent"` - Children map[string]map[string][]byte `json:"children"` - ActionQueue []QueuedAction -} - -func (ext *ThreadExt) Serialize() ([]byte, error) { - children := map[string]map[string][]byte{} - for id, child := range(ext.Children) { - id_str := id.String() - children[id_str] = map[string][]byte{} - for info_type, info := range(child.Infos) { - var err error - children[id_str][string(info_type)], err = info.Serialize() - if err != nil { - return nil, err - } - } - } - - return json.MarshalIndent(&ThreadExtJSON{ - State: ext.State, - Type: string(ext.ThreadType), - Parent: SaveNode(ext.Parent), - Children: children, - ActionQueue: ext.ActionQueue, - }, "", " ") -} - -func NewThreadExt(ctx*Context, thread_type ThreadType, parent *Node, children map[NodeID]ChildInfo, state string, action_queue []QueuedAction) (*ThreadExt, error) { - if children == nil { - children = map[NodeID]ChildInfo{} - } - - if action_queue == nil { - action_queue = []QueuedAction{} - } - - thread_ctx, err := GetCtx[*ThreadExt, *ThreadExtContext](ctx) - if err != nil { - return nil, err - } - type_info, exists := thread_ctx.Types[thread_type] - if exists == false { - return nil, fmt.Errorf("Tried to load thread type %s which is not in context", thread_type) - } - next_action, timeout_chan := SoonestAction(action_queue) - - return &ThreadExt{ - ThreadType: thread_type, - Actions: type_info.Actions, - Handlers: type_info.Handlers, - SignalChan: make(chan Signal, THREAD_BUFFER_SIZE), - TimeoutChan: timeout_chan, - Active: false, - State: state, - Parent: parent, - Children: children, - ActionQueue: action_queue, - NextAction: next_action, - }, nil -} - -const THREAD_BUFFER_SIZE int = 1024 -func LoadThreadExt(ctx *Context, data []byte) (Extension, error) { - var j ThreadExtJSON - err := json.Unmarshal(data, &j) - if err != nil { - return nil, err - } - ctx.Log.Logf("db", "DB_LOAD_THREAD_EXT_JSON: %+v", j) - - parent, err := RestoreNode(ctx, j.Parent) - if err != nil { - return nil, err - } - - - thread_ctx, err := GetCtx[*ThreadExt, *ThreadExtContext](ctx) - if err != nil { - return nil, err - } - - children := map[NodeID]ChildInfo{} - for id_str, infos := range(j.Children) { - child_node, err := RestoreNode(ctx, id_str) - if err != nil { - return nil, err - } - child_infos := map[InfoType]Info{} - for info_type_str, info_data := range(infos) { - info_type := InfoType(info_type_str) - info_load, exists := thread_ctx.Loads[info_type] - if exists == false { - return nil, fmt.Errorf("%s is not a known InfoType in ThreacExrContxt", info_type) - } - - info, err := info_load(info_data) - if err != nil { - return nil, err - } - child_infos[info_type] = info - } - - children[child_node.ID] = ChildInfo{ - Child: child_node, - Infos: child_infos, - } - } - - return NewThreadExt(ctx, ThreadType(j.Type), parent, children, j.State, j.ActionQueue) -} - -const ThreadExtType = ExtType("THREAD") -func (ext *ThreadExt) Type() ExtType { - return ThreadExtType -} - -func (ext *ThreadExt) QueueAction(end time.Time, action string) { - ext.ActionQueue = append(ext.ActionQueue, QueuedAction{end, action}) - ext.NextAction, ext.TimeoutChan = SoonestAction(ext.ActionQueue) -} - -func (ext *ThreadExt) ClearActionQueue() { - ext.ActionQueue = []QueuedAction{} - ext.NextAction = nil - ext.TimeoutChan = nil -} - -func SoonestAction(actions []QueuedAction) (*QueuedAction, <-chan time.Time) { - var soonest_action *QueuedAction - var soonest_time time.Time - for _, action := range(actions) { - if action.Timeout.Compare(soonest_time) == -1 || soonest_action == nil { - soonest_action = &action - soonest_time = action.Timeout - } - } - if soonest_action != nil { - return soonest_action, time.After(time.Until(soonest_action.Timeout)) - } else { - return nil, nil - } -} - -func (ext *ThreadExt) ChildList() []*Node { - ret := make([]*Node, len(ext.Children)) - i := 0 - for _, info := range(ext.Children) { - ret[i] = info.Child - i += 1 - } - - return ret -} - -// Assumed that thread is already locked for signal -func (ext *ThreadExt) Process(context *StateContext, node *Node, signal Signal) error { - context.Graph.Log.Logf("signal", "THREAD_PROCESS: %s - %+v", node.ID, signal) - - switch signal.Direction() { - case Up: - if ext.Parent != nil { - if ext.Parent.ID != node.ID { - err := ext.Parent.Process(context, node.ID, signal) - if err != nil { - return err - } - } - } - case Down: - for _, info := range(ext.Children) { - err := info.Child.Process(context, node.ID, signal) - if err != nil { - return err - } - } - case Direct: - default: - return fmt.Errorf("Invalid signal direction %d", signal.Direction()) - } - ext.SignalChan <- signal - return nil -} - -func UnlinkThreads(context *StateContext, principal *Node, thread *Node, child *Node) error { - thread_ext, err := GetExt[*ThreadExt](thread) - if err != nil { - return err - } - - child_ext, err := GetExt[*ThreadExt](child) - if err != nil { - return err - } - - return UpdateStates(context, principal, ACLMap{ - thread.ID: ACLInfo{thread, []string{"children"}}, - child.ID: ACLInfo{child, []string{"parent"}}, - }, func(context *StateContext) error { - _, is_child := thread_ext.Children[child.ID] - if is_child == false { - return fmt.Errorf("UNLINK_THREADS_ERR: %s is not a child of %s", child.ID, thread.ID) - } - - delete(thread_ext.Children, child.ID) - child_ext.Parent = nil - return nil - }) -} - -func checkIfChild(context *StateContext, id NodeID, cur *ThreadExt) (bool, error) { - for _, info := range(cur.Children) { - child := info.Child - if child.ID == id { - return true, nil - } - - child_ext, err := GetExt[*ThreadExt](child) - if err != nil { - return false, err - } - - var is_child bool - err = UpdateStates(context, child, NewACLInfo(child, []string{"children"}), func(context *StateContext) error { - is_child, err = checkIfChild(context, id, child_ext) - return err - }) - if err != nil { - return false, err - } - if is_child { - return true, nil - } - } - - return false, nil -} - -// Links child to parent with info as the associated info -// Continues the write context with princ, getting children for thread and parent for child -func LinkThreads(context *StateContext, principal *Node, thread *Node, info ChildInfo) error { - if context == nil || principal == nil || thread == nil || info.Child == nil { - return fmt.Errorf("invalid input") - } - - child := info.Child - if thread.ID == child.ID { - return fmt.Errorf("Will not link %s as a child of itself", thread.ID) - } - - thread_ext, err := GetExt[*ThreadExt](thread) - if err != nil { - return err - } - - child_ext, err := GetExt[*ThreadExt](child) - if err != nil { - return err - } - - return UpdateStates(context, principal, ACLMap{ - child.ID: ACLInfo{Node: child, Resources: []string{"parent"}}, - thread.ID: ACLInfo{Node: thread, Resources: []string{"children"}}, - }, func(context *StateContext) error { - if child_ext.Parent != nil { - return fmt.Errorf("EVENT_LINK_ERR: %s already has a parent, cannot link as child", child.ID) - } - - is_child, err := checkIfChild(context, thread.ID, child_ext) - if err != nil { - return err - } else if is_child == true { - return fmt.Errorf("EVENT_LINK_ERR: %s is a child of %s so cannot add as parent", thread.ID, child.ID) - } - - is_child, err = checkIfChild(context, child.ID, thread_ext) - if err != nil { - - return err - } else if is_child == true { - return fmt.Errorf("EVENT_LINK_ERR: %s is already a parent of %s so will not add again", thread.ID, child.ID) - } - - // TODO check for info types - - thread_ext.Children[child.ID] = info - child_ext.Parent = thread - - return nil - }) -} - -type ChildInfo struct { - Child *Node - Infos map[InfoType]Info -} - -func NewChildInfo(child *Node, infos map[InfoType]Info) ChildInfo { - if infos == nil { - infos = map[InfoType]Info{} - } - - return ChildInfo{ - Child: child, - Infos: infos, - } -} - -func (ext *ThreadExt) SetActive(active bool) error { - ext.ActiveLock.Lock() - defer ext.ActiveLock.Unlock() - if ext.Active == true && active == true { - return fmt.Errorf("alreday active, cannot set active") - } else if ext.Active == false && active == false { - return fmt.Errorf("already inactive, canot set inactive") - } - ext.Active = active - return nil -} - -func (ext *ThreadExt) SetState(state string) error { - ext.State = state - return nil -} - -// Requires the read permission of threads children -func FindChild(context *StateContext, principal *Node, thread *Node, id NodeID) (*Node, error) { - if thread == nil { - panic("cannot recurse through nil") - } - - if id == thread.ID { - return thread, nil - } - - thread_ext, err := GetExt[*ThreadExt](thread) - if err != nil { - return nil, err - } - - var found *Node = nil - err = UseStates(context, principal, NewACLInfo(thread, []string{"children"}), func(context *StateContext) error { - for _, info := range(thread_ext.Children) { - found, err = FindChild(context, principal, info.Child, id) - if err != nil { - return err - } - if found != nil { - return nil - } - } - return nil - }) - - return found, err -} - -func ChildGo(ctx * Context, thread_ext *ThreadExt, child *Node, first_action string) { - thread_ext.ChildWaits.Add(1) - go func(child *Node) { - defer thread_ext.ChildWaits.Done() - err := ThreadLoop(ctx, child, first_action) - if err != nil { - ctx.Log.Logf("thread", "THREAD_CHILD_RUN_ERR: %s %s", child.ID, err) - } else { - ctx.Log.Logf("thread", "THREAD_CHILD_RUN_DONE: %s", child.ID) - } - }(child) -} - -// Main Loop for Threads, starts a write context, so cannot be called from a write or read context -func ThreadLoop(ctx * Context, thread *Node, first_action string) error { - thread_ext, err := GetExt[*ThreadExt](thread) - if err != nil { - return err - } - - ctx.Log.Logf("thread", "THREAD_LOOP_START: %s - %s", thread.ID, first_action) - - err = thread_ext.SetActive(true) - if err != nil { - ctx.Log.Logf("thread", "THREAD_LOOP_START_ERR: %e", err) - return err - } - next_action := first_action - for next_action != "" { - action, exists := thread_ext.Actions[next_action] - if exists == false { - error_str := fmt.Sprintf("%s is not a valid action", next_action) - return errors.New(error_str) - } - - ctx.Log.Logf("thread", "THREAD_ACTION: %s - %s", thread.ID, next_action) - next_action, err = action(ctx, thread, thread_ext) - if err != nil { - return err - } - } - - err = thread_ext.SetActive(false) - if err != nil { - ctx.Log.Logf("thread", "THREAD_LOOP_STOP_ERR: %e", err) - return err - } - - ctx.Log.Logf("thread", "THREAD_LOOP_DONE: %s", thread.ID) - - return nil -} - -func ThreadChildLinked(ctx *Context, thread *Node, thread_ext *ThreadExt, signal Signal) (string, error) { - ctx.Log.Logf("thread", "THREAD_CHILD_LINKED: %s - %+v", thread.ID, signal) - context := NewWriteContext(ctx) - err := UpdateStates(context, thread, NewACLInfo(thread, []string{"children"}), func(context *StateContext) error { - sig, ok := signal.(IDSignal) - if ok == false { - ctx.Log.Logf("thread", "THREAD_NODE_LINKED_BAD_CAST") - return nil - } - info, exists := thread_ext.Children[sig.ID] - if exists == false { - ctx.Log.Logf("thread", "THREAD_NODE_LINKED: %s is not a child of %s", sig.ID) - return nil - } - parent_info, exists := info.Infos["parent"].(*ParentInfo) - if exists == false { - panic("ran ThreadChildLinked from a thread that doesn't require 'parent' child info. library party foul") - } - - if parent_info.Start == true { - ChildGo(ctx, thread_ext, info.Child, parent_info.StartAction) - } - return nil - }) - - if err != nil { - - } else { - - } - return "wait", nil -} - -// Helper function to start a child from a thread during a signal handler -// Starts a write context, so cannot be called from either a write or read context -func ThreadStartChild(ctx *Context, thread *Node, thread_ext *ThreadExt, signal Signal) (string, error) { - sig, ok := signal.(StartChildSignal) - if ok == false { - return "wait", nil - } - - context := NewWriteContext(ctx) - return "wait", UpdateStates(context, thread, NewACLInfo(thread, []string{"children"}), func(context *StateContext) error { - info, exists:= thread_ext.Children[sig.ID] - if exists == false { - return fmt.Errorf("%s is not a child of %s", sig.ID, thread.ID) - } - - parent_info, exists := info.Infos["parent"].(*ParentInfo) - if exists == false { - return fmt.Errorf("Called ThreadStartChild from a thread that doesn't require parent child info") - } - parent_info.Start = true - ChildGo(ctx, thread_ext, info.Child, sig.Action) - - return nil - }) -} - -// Helper function to restore threads that should be running from a parents restore action -// Starts a write context, so cannot be called from either a write or read context -func ThreadRestore(ctx * Context, thread *Node, thread_ext *ThreadExt, start bool) error { - context := NewWriteContext(ctx) - return UpdateStates(context, thread, NewACLInfo(thread, []string{"children"}), func(context *StateContext) error { - return UpdateStates(context, thread, ACLList(thread_ext.ChildList(), []string{"state"}), func(context *StateContext) error { - for _, info := range(thread_ext.Children) { - child_ext, err := GetExt[*ThreadExt](info.Child) - if err != nil { - return err - } - - parent_info := info.Infos[ParentInfoType].(*ParentInfo) - if parent_info.Start == true && child_ext.State != "finished" { - ctx.Log.Logf("thread", "THREAD_RESTORED: %s -> %s", thread.ID, info.Child.ID) - if start == true { - ChildGo(ctx, thread_ext, info.Child, parent_info.StartAction) - } else { - ChildGo(ctx, thread_ext, info.Child, parent_info.RestoreAction) - } - } - } - return nil - }) - }) -} - -// Helper function to be called during a threads start action, sets the thread state to started -// Starts a write context, so cannot be called from either a write or read context -// Returns "wait", nil on success, so the first return value can be ignored safely -func ThreadStart(ctx * Context, thread *Node, thread_ext *ThreadExt) (string, error) { - context := NewWriteContext(ctx) - err := UpdateStates(context, thread, NewACLInfo(thread, []string{"state"}), func(context *StateContext) error { - return thread_ext.SetState("started") - }) - if err != nil { - return "", err - } - - context = NewReadContext(ctx) - return "wait", thread.Process(context, thread.ID, NewStatusSignal("started", thread.ID)) -} - -func ThreadWait(ctx * Context, thread *Node, thread_ext *ThreadExt) (string, error) { - ctx.Log.Logf("thread", "THREAD_WAIT: %s - %+v", thread.ID, thread_ext.ActionQueue) - for { - select { - case signal := <- thread_ext.SignalChan: - ctx.Log.Logf("thread", "THREAD_SIGNAL: %s %+v", thread.ID, signal) - signal_fn, exists := thread_ext.Handlers[signal.Type()] - if exists == true { - ctx.Log.Logf("thread", "THREAD_HANDLER: %s - %s", thread.ID, signal.Type()) - return signal_fn(ctx, thread, thread_ext, signal) - } else { - ctx.Log.Logf("thread", "THREAD_NOHANDLER: %s - %s", thread.ID, signal.Type()) - } - case <- thread_ext.TimeoutChan: - timeout_action := "" - context := NewWriteContext(ctx) - err := UpdateStates(context, thread, NewACLMap(NewACLInfo(thread, []string{"timeout"})), func(context *StateContext) error { - timeout_action = thread_ext.NextAction.Action - thread_ext.NextAction, thread_ext.TimeoutChan = SoonestAction(thread_ext.ActionQueue) - return nil - }) - if err != nil { - ctx.Log.Logf("thread", "THREAD_TIMEOUT_ERR: %s - %e", thread.ID, err) - } - ctx.Log.Logf("thread", "THREAD_TIMEOUT %s - NEXT_STATE: %s", thread.ID, timeout_action) - return timeout_action, nil - } - } -} - -func ThreadFinish(ctx *Context, thread *Node, thread_ext *ThreadExt) (string, error) { - context := NewWriteContext(ctx) - return "", UpdateStates(context, thread, NewACLInfo(thread, []string{"state"}), func(context *StateContext) error { - return thread_ext.SetState("finished") - }) -} - -var ThreadAbortedError = errors.New("Thread aborted by signal") - -// Default thread action function for "abort", sends a signal and returns a ThreadAbortedError -func ThreadAbort(ctx * Context, thread *Node, thread_ext *ThreadExt, signal Signal) (string, error) { - context := NewReadContext(ctx) - err := thread.Process(context, thread.ID, NewStatusSignal("aborted", thread.ID)) - if err != nil { - return "", err - } - return "", ThreadAbortedError -} - -// Default thread action for "stop", sends a signal and returns no error -func ThreadStop(ctx * Context, thread *Node, thread_ext *ThreadExt, signal Signal) (string, error) { - context := NewReadContext(ctx) - err := thread.Process(context, thread.ID, NewStatusSignal("stopped", thread.ID)) - return "finish", err -} - -// Default thread actions -var BaseThreadActions = ThreadActions{ - "wait": ThreadWait, - "start": ThreadStart, - "finish": ThreadFinish, -} - -// Default thread signal handlers -var BaseThreadHandlers = ThreadHandlers{ - "abort": ThreadAbort, - "stop": ThreadStop, -}