diff --git a/gql.go b/gql.go index 8afd80d..4b6d999 100644 --- a/gql.go +++ b/gql.go @@ -406,9 +406,8 @@ func LoadGQLThreadInfo(ctx * GraphContext, raw map[string]interface{}) (ThreadIn } func LoadGQLThread(ctx * GraphContext, id NodeID) (GraphNode, error) { - thread := RestoreBaseThread(ctx, id, gql_actions, gql_handlers) gql_thread := GQLThread{ - BaseThread: thread, + BaseThread: RestoreBaseThread(ctx, id, gql_actions, gql_handlers), http_server: nil, http_done: &sync.WaitGroup{}, } diff --git a/graph.go b/graph.go index 35b39e2..c7bb6d3 100644 --- a/graph.go +++ b/graph.go @@ -450,7 +450,9 @@ func RestoreNode(ctx * GraphContext, id NodeID) BaseNode { id: id, signal: make(chan GraphSignal, NODE_SIGNAL_BUFFER), listeners: map[chan GraphSignal]chan GraphSignal{}, + listeners_lock: &sync.Mutex{}, state: nil, + state_lock: &sync.RWMutex{}, } ctx.Log.Logf("graph", "RESTORE_NODE: %s", node.id) @@ -485,7 +487,9 @@ func NewNode(ctx * GraphContext, state NodeState) (BaseNode, error) { id: RandID(), signal: make(chan GraphSignal, NODE_SIGNAL_BUFFER), listeners: map[chan GraphSignal]chan GraphSignal{}, + listeners_lock: &sync.Mutex{}, state: state, + state_lock: &sync.RWMutex{}, } err := WriteDBState(ctx, node.id, state) @@ -503,11 +507,11 @@ type BaseNode struct { id NodeID state NodeState - state_lock sync.RWMutex + state_lock *sync.RWMutex signal chan GraphSignal - listeners_lock sync.Mutex + listeners_lock *sync.Mutex listeners map[chan GraphSignal]chan GraphSignal } @@ -520,7 +524,7 @@ func (node * BaseNode) State() NodeState { } func (node * BaseNode) StateLock() * sync.RWMutex { - return &node.state_lock + return node.state_lock } func ReadDBState(ctx * GraphContext, id NodeID) ([]byte, error) { diff --git a/thread.go b/thread.go index 57619da..a616842 100644 --- a/thread.go +++ b/thread.go @@ -123,6 +123,7 @@ func RestoreBaseThread(ctx * GraphContext, id NodeID, actions ThreadActions, han BaseLockable: base_lockable, Actions: actions, Handlers: handlers, + child_waits: &sync.WaitGroup{}, } return thread @@ -540,11 +541,11 @@ type BaseThread struct { Handlers ThreadHandlers timeout_chan <-chan time.Time - child_waits sync.WaitGroup + child_waits *sync.WaitGroup } func (thread * BaseThread) ChildWaits() *sync.WaitGroup { - return &thread.child_waits + return thread.child_waits } func (thread * BaseThread) CanLock(node GraphNode, state LockableState) error { @@ -627,7 +628,6 @@ var ThreadWait = func(ctx * GraphContext, thread Thread) (string, error) { return timeout_action, nil } } - return "wait", nil } var ThreadAbort = func(ctx * GraphContext, thread Thread, signal GraphSignal) (string, error) { @@ -688,6 +688,7 @@ func NewBaseThread(ctx * GraphContext, actions ThreadActions, handlers ThreadHan BaseLockable: lockable, Actions: actions, Handlers: handlers, + child_waits: &sync.WaitGroup{}, } return thread, nil