diff --git a/gql_test.go b/gql_test.go index 563c715..9e9ce78 100644 --- a/gql_test.go +++ b/gql_test.go @@ -26,7 +26,6 @@ func TestGQLThread(t * testing.T) { go func(thread Thread){ time.Sleep(10*time.Millisecond) - // Check that test_thread_1 is running and test_thread_2 is not SendUpdate(ctx, thread, CancelSignal(nil)) }(gql_thread) diff --git a/graph.go b/graph.go index 59e689a..477f4f2 100644 --- a/graph.go +++ b/graph.go @@ -347,6 +347,33 @@ func ReadDBState(ctx * GraphContext, id NodeID) ([]byte, error) { return bytes, nil } +func WriteDBStates(ctx * GraphContext, nodes NodeMap) error{ + ctx.Log.Logf("db", "DB_WRITES: %d", len(nodes)) + var serialized_states [][]byte = make([][]byte, len(nodes)) + i := 0 + for _, node := range(nodes) { + ser, err := json.Marshal(node.State()) + if err != nil { + return fmt.Errorf("DB_MARSHAL_ERROR: %e", err) + } + serialized_states[i] = ser + i++ + } + + err := ctx.DB.Update(func(txn *badger.Txn) error { + i := 0 + for id, _ := range(nodes) { + err := txn.Set([]byte(id), serialized_states[i]) + if err != nil { + return fmt.Errorf("DB_MARSHAL_ERROR: %e", err) + } + i++ + } + return nil + }) + return err +} + func WriteDBState(ctx * GraphContext, id NodeID, state NodeState) error { ctx.Log.Logf("db", "DB_WRITE: %s - %+v", id, state) @@ -390,7 +417,9 @@ func checkForDuplicate(nodes []GraphNode) error { } type NodeStateMap map[NodeID]NodeState -type StatesFn func(states NodeStateMap)(error) +type NodeMap map[NodeID]GraphNode +type StatesFn func(states NodeStateMap) error +type NodesFn func(nodes NodeMap) error func UseStates(ctx * GraphContext, nodes []GraphNode, states_fn StatesFn) error { states := NodeStateMap{} return UseMoreStates(ctx, nodes, states, states_fn) @@ -421,42 +450,28 @@ func UseMoreStates(ctx * GraphContext, nodes []GraphNode, states NodeStateMap, s return err } -func UpdateStates(ctx * GraphContext, nodes []GraphNode, states_fn StatesFn) error { - states := NodeStateMap{} - return UpdateMoreStates(ctx, nodes, states, states_fn) -} -func UpdateMoreStates(ctx * GraphContext, nodes []GraphNode, states NodeStateMap, states_fn StatesFn) error { - err := checkForDuplicate(nodes) - if err != nil { - return err +func UpdateStates(ctx * GraphContext, nodes []GraphNode, nodes_fn NodesFn) error { + locked_nodes := NodeMap{} + err := UpdateMoreStates(ctx, nodes, locked_nodes, nodes_fn) + if err == nil { + err = WriteDBStates(ctx, locked_nodes) } - locked_nodes := []GraphNode{} + for _, node := range(locked_nodes) { + node.StateLock().Unlock() + } + return err +} +func UpdateMoreStates(ctx * GraphContext, nodes []GraphNode, locked_nodes NodeMap, nodes_fn NodesFn) error { for _, node := range(nodes) { - _, locked := states[node.ID()] + _, locked := locked_nodes[node.ID()] if locked == false { node.StateLock().Lock() - states[node.ID()] = node.State() - locked_nodes = append(locked_nodes, node) + locked_nodes[node.ID()] = node } } - err = states_fn(states) - if err == nil { - for _, node := range(nodes) { - err := WriteDBState(ctx, node.ID(), node.State()) - if err != nil { - panic(fmt.Sprintf("DB_WRITE_ERROR: %s", err)) - } - } - } - - for _, node := range(locked_nodes) { - delete(states, node.ID()) - node.StateLock().Unlock() - } - - return err + return nodes_fn(locked_nodes) } func (node * BaseNode) UpdateListeners(ctx * GraphContext, update GraphSignal) { diff --git a/lockable.go b/lockable.go index 24b02af..81c4128 100644 --- a/lockable.go +++ b/lockable.go @@ -161,6 +161,7 @@ func LinkLockables(ctx * GraphContext, lockable Lockable, requirements []Lockabl return nil } + found := map[NodeID]bool{} for _, requirement := range(requirements) { if requirement == nil { return fmt.Errorf("LOCKABLE_LINK_ERR: Will not link nil to a Lockable as a requirement") @@ -169,32 +170,39 @@ func LinkLockables(ctx * GraphContext, lockable Lockable, requirements []Lockabl if lockable.ID() == requirement.ID() { return fmt.Errorf("LOCKABLE_LINK_ERR: cannot link %s to itself", lockable.ID()) } + + _, exists := found[requirement.ID()] + if exists == true { + return fmt.Errorf("LOCKABLE_LINK_ERR: cannot link %s twice", requirement.ID()) + } + found[requirement.ID()] = true } - nodes := make([]GraphNode, len(requirements) + 1) - nodes[0] = lockable + gnodes := make([]GraphNode, len(requirements) + 1) + gnodes[0] = lockable for i, node := range(requirements) { - nodes[i+1] = node + gnodes[i+1] = node } - err := UpdateStates(ctx, nodes, func(states NodeStateMap) error { + + err := UpdateStates(ctx, gnodes, func(nodes NodeMap) error { // Check that all the requirements can be added - lockable_state := states[lockable.ID()].(LockableState) + lockable_state := lockable.State().(LockableState) // If the lockable is already locked, need to lock this resource as well before we can add it for _, requirement := range(requirements) { - requirement_state := states[requirement.ID()].(LockableState) + requirement_state := requirement.State().(LockableState) for _, req := range(requirements) { if req.ID() == requirement.ID() { continue } - if checkIfRequirement(ctx, req.ID(), requirement_state, requirement.ID(), states) == true { + if checkIfRequirement(ctx, req.ID(), requirement_state, requirement.ID(), nodes) == true { return fmt.Errorf("LOCKABLE_LINK_ERR: %s is a dependenyc of %s so cannot add the same dependency", req.ID(), requirement.ID()) } } - if checkIfRequirement(ctx, lockable.ID(), requirement_state, requirement.ID(), states) == true { + if checkIfRequirement(ctx, lockable.ID(), requirement_state, requirement.ID(), nodes) == true { return fmt.Errorf("LOCKABLE_LINK_ERR: %s is a dependency of %s so cannot link as requirement", requirement.ID(), lockable.ID()) } - if checkIfRequirement(ctx, requirement.ID(), lockable_state, lockable.ID(), states) == true { + if checkIfRequirement(ctx, requirement.ID(), lockable_state, lockable.ID(), nodes) == true { return fmt.Errorf("LOCKABLE_LINK_ERR: %s is a dependency of %s so cannot link as dependency again", lockable.ID(), requirement.ID()) } if lockable_state.Owner() == nil { @@ -211,7 +219,7 @@ func LinkLockables(ctx * GraphContext, lockable Lockable, requirements []Lockabl } // Update the states of the requirements for _, requirement := range(requirements) { - requirement_state := states[requirement.ID()].(LockableState) + requirement_state := requirement.State().(LockableState) requirement_state.AddDependency(lockable) lockable_state.AddRequirement(requirement) ctx.Log.Logf("lockable", "LOCKABLE_LINK: linked %s to %s as a requirement", requirement.ID(), lockable.ID()) @@ -281,15 +289,15 @@ func (lockable * BaseLockable) PropagateUpdate(ctx * GraphContext, signal GraphS }) } -func checkIfRequirement(ctx * GraphContext, r_id NodeID, cur LockableState, cur_id NodeID, states NodeStateMap) bool { +func checkIfRequirement(ctx * GraphContext, r_id NodeID, cur LockableState, cur_id NodeID, nodes NodeMap) bool { for _, c := range(cur.Requirements()) { if c.ID() == r_id { return true } is_requirement := false - UpdateMoreStates(ctx, []GraphNode{c}, states, func(states NodeStateMap) (error) { - requirement_state := states[c.ID()].(LockableState) - is_requirement = checkIfRequirement(ctx, cur_id, requirement_state, c.ID(), states) + UpdateMoreStates(ctx, []GraphNode{c}, nodes, func(nodes NodeMap) (error) { + requirement_state := c.State().(LockableState) + is_requirement = checkIfRequirement(ctx, cur_id, requirement_state, c.ID(), nodes) return nil }) @@ -301,7 +309,7 @@ func checkIfRequirement(ctx * GraphContext, r_id NodeID, cur LockableState, cur_ return false } -func LockLockables(ctx * GraphContext, to_lock []Lockable, holder Lockable, holder_state LockableState, states NodeStateMap) error { +func LockLockables(ctx * GraphContext, to_lock []Lockable, holder Lockable, holder_state LockableState, nodes NodeMap) error { if to_lock == nil { return fmt.Errorf("LOCKABLE_LOCK_ERR: no list provided") } @@ -333,15 +341,11 @@ func LockLockables(ctx * GraphContext, to_lock []Lockable, holder Lockable, hold node_list[i] = l } - err := UpdateMoreStates(ctx, node_list, states, func(states NodeStateMap) error { + err := UpdateMoreStates(ctx, node_list, nodes, func(nodes NodeMap) error { // First loop is to check that the states can be locked, and locks all requirements for _, req := range(to_lock) { - state := states[req.ID()] - req_state, ok := state.(LockableState) + req_state := req.State().(LockableState) ctx.Log.Logf("lockable", "LOCKABLE_LOCKING: %s from %s", req.ID(), holder.ID()) - if ok == false { - return fmt.Errorf("LOCKABLE_LOCK_ERR: %s(requirement of %s) does not have a LockableState", req.ID(), holder.ID()) - } // Check custom lock conditions err := req.CanLock(holder, req_state) @@ -367,21 +371,17 @@ func LockLockables(ctx * GraphContext, to_lock []Lockable, holder Lockable, hold // a) in this case, we're holding every state mutex up to the resource being locked // and all the owners passing a lock, so we can start to change state // 2) req has children, and we will recurse(checking that locking is allowed) until we reach a leaf and can release the locks as we change state. The call will either return nil if state has changed, on an error if no state has changed - err := LockLockables(ctx, req_state.Requirements(), req, req_state, states) + err := LockLockables(ctx, req_state.Requirements(), req, req_state, nodes) if err != nil { return err } } else { - err := UpdateMoreStates(ctx, []GraphNode{owner}, states, func(states NodeStateMap)(error){ - owner_state, ok := states[owner.ID()].(LockableState) - if ok == false { - return fmt.Errorf("LOCKABLE_LOCK_ERR: %s does not have a LockableState", owner.ID()) - } - + err := UpdateMoreStates(ctx, []GraphNode{owner}, nodes, func(nodes NodeMap)(error){ + owner_state := owner.State().(LockableState) if owner_state.AllowedToTakeLock(holder.ID(), req.ID()) == false { return fmt.Errorf("LOCKABLE_LOCK_ERR: %s is not allowed to take %s's lock from %s", holder.ID(), req.ID(), owner.ID()) } - err := LockLockables(ctx, req_state.Requirements(), req, req_state, states) + err := LockLockables(ctx, req_state.Requirements(), req, req_state, nodes) return err }) if err != nil { @@ -389,7 +389,7 @@ func LockLockables(ctx * GraphContext, to_lock []Lockable, holder Lockable, hold } } } else { - err := LockLockables(ctx, req_state.Requirements(), req, req_state, states) + err := LockLockables(ctx, req_state.Requirements(), req, req_state, nodes) if err != nil { return err } @@ -398,7 +398,7 @@ func LockLockables(ctx * GraphContext, to_lock []Lockable, holder Lockable, hold // At this point state modification will be started, so no errors can be returned for _, req := range(to_lock) { - req_state := states[req.ID()].(LockableState) + req_state := req.State().(LockableState) old_owner := req_state.Owner() req_state.SetOwner(holder) if req.ID() == holder.ID() { @@ -418,7 +418,7 @@ func LockLockables(ctx * GraphContext, to_lock []Lockable, holder Lockable, hold return err } -func UnlockLockables(ctx * GraphContext, to_unlock []Lockable, holder Lockable, holder_state LockableState, states NodeStateMap) error { +func UnlockLockables(ctx * GraphContext, to_unlock []Lockable, holder Lockable, holder_state LockableState, nodes NodeMap) error { if to_unlock == nil { return fmt.Errorf("LOCKABLE_UNLOCK_ERR: no list provided") } @@ -449,14 +449,11 @@ func UnlockLockables(ctx * GraphContext, to_unlock []Lockable, holder Lockable, node_list[i] = l } - err := UpdateMoreStates(ctx, node_list, states, func(states NodeStateMap) error { + err := UpdateMoreStates(ctx, node_list, nodes, func(nodes NodeMap) error { // First loop is to check that the states can be locked, and locks all requirements for _, req := range(to_unlock) { - req_state, ok := states[req.ID()].(LockableState) + req_state := req.State().(LockableState) ctx.Log.Logf("lockable", "LOCKABLE_UNLOCKING: %s from %s", req.ID(), holder.ID()) - if ok == false { - return fmt.Errorf("LOCKABLE_UNLOCK_ERR: %s(requirement of %s) does not have a LockableState", req.ID(), holder.ID()) - } // Check if the owner is correct if req_state.Owner() != nil { @@ -473,7 +470,7 @@ func UnlockLockables(ctx * GraphContext, to_unlock []Lockable, holder Lockable, return err } - err = UnlockLockables(ctx, req_state.Requirements(), req, req_state, states) + err = UnlockLockables(ctx, req_state.Requirements(), req, req_state, nodes) if err != nil { return err } @@ -481,7 +478,7 @@ func UnlockLockables(ctx * GraphContext, to_unlock []Lockable, holder Lockable, // At this point state modification will be started, so no errors can be returned for _, req := range(to_unlock) { - req_state := states[req.ID()].(LockableState) + req_state := req.State().(LockableState) var new_owner Lockable = nil if holder_state == nil { new_owner = req_state.ReturnLock(req.ID()) diff --git a/lockable_test.go b/lockable_test.go index ad86d0d..14b1a8d 100644 --- a/lockable_test.go +++ b/lockable_test.go @@ -24,7 +24,7 @@ func TestRepeatedChildLockable(t * testing.T) { _, err = NewSimpleBaseLockable(ctx, "Test lockable 2", []Lockable{r1, r1}) if err == nil { - t.Fatal("Added the same lockable as a child twice to the same lockable") + t.Fatal("Added the same lockable as a requirement twice to the same lockable") } } @@ -34,7 +34,9 @@ func TestLockableSelfLock(t * testing.T) { r1, err := NewSimpleBaseLockable(ctx, "Test lockable 1", []Lockable{}) fatalErr(t, err) - err = LockLockables(ctx, []Lockable{r1}, r1, nil, NodeStateMap{}) + err = UpdateStates(ctx, []GraphNode{r1}, func(nodes NodeMap) error { + return LockLockables(ctx, []Lockable{r1}, r1, nil, nodes) + }) fatalErr(t, err) err = UseStates(ctx, []GraphNode{r1}, func(states NodeStateMap) (error) { @@ -46,7 +48,9 @@ func TestLockableSelfLock(t * testing.T) { }) fatalErr(t, err) - err = UnlockLockables(ctx, []Lockable{r1}, r1, nil, NodeStateMap{}) + err = UpdateStates(ctx, []GraphNode{r1}, func(nodes NodeMap) error { + return UnlockLockables(ctx, []Lockable{r1}, r1, nil, nodes) + }) fatalErr(t, err) err = UseStates(ctx, []GraphNode{r1}, func(states NodeStateMap) (error) { @@ -72,7 +76,9 @@ func TestLockableSelfLockTiered(t * testing.T) { r3, err := NewSimpleBaseLockable(ctx, "Test lockable 3", []Lockable{r1, r2}) fatalErr(t, err) - err = LockLockables(ctx, []Lockable{r3}, r3, nil, NodeStateMap{}) + err = UpdateStates(ctx, []GraphNode{r3}, func(nodes NodeMap) error { + return LockLockables(ctx, []Lockable{r3}, r3, nil, nodes) + }) fatalErr(t, err) err = UseStates(ctx, []GraphNode{r1, r2, r3}, func(states NodeStateMap) (error) { @@ -89,7 +95,9 @@ func TestLockableSelfLockTiered(t * testing.T) { }) fatalErr(t, err) - err = UnlockLockables(ctx, []Lockable{r3}, r3, nil, NodeStateMap{}) + err = UpdateStates(ctx, []GraphNode{r3}, func(nodes NodeMap) error { + return UnlockLockables(ctx, []Lockable{r3}, r3, nil, nodes) + }) fatalErr(t, err) err = UseStates(ctx, []GraphNode{r1, r2, r3}, func(states NodeStateMap) (error) { @@ -122,9 +130,9 @@ func TestLockableLockOther(t * testing.T) { r2, err := NewSimpleBaseLockable(ctx, "Test lockable 2", []Lockable{}) fatalErr(t, err) - err = UpdateStates(ctx, []GraphNode{r2}, func(states NodeStateMap) (error) { - node_state := states[r2.ID()].(LockableState) - err := LockLockables(ctx, []Lockable{r1}, r2, node_state, NodeStateMap{}) + err = UpdateStates(ctx, []GraphNode{r1, r2}, func(nodes NodeMap) (error) { + node_state := r2.State().(LockableState) + err := LockLockables(ctx, []Lockable{r1}, r2, node_state, nodes) fatalErr(t, err) return nil }) @@ -140,9 +148,9 @@ func TestLockableLockOther(t * testing.T) { }) fatalErr(t, err) - err = UpdateStates(ctx, []GraphNode{r2}, func(states NodeStateMap) (error) { - node_state := states[r2.ID()].(LockableState) - err := UnlockLockables(ctx, []Lockable{r1}, r2, node_state, NodeStateMap{}) + err = UpdateStates(ctx, []GraphNode{r2}, func(nodes NodeMap) (error) { + node_state := r2.State().(LockableState) + err := UnlockLockables(ctx, []Lockable{r1}, r2, node_state, nodes) fatalErr(t, err) return nil }) @@ -169,12 +177,14 @@ func TestLockableLockSimpleConflict(t * testing.T) { r2, err := NewSimpleBaseLockable(ctx, "Test lockable 2", []Lockable{}) fatalErr(t, err) - err = LockLockables(ctx, []Lockable{r1}, r1, nil, NodeStateMap{}) + err = UpdateStates(ctx, []GraphNode{r1}, func(nodes NodeMap) error { + return LockLockables(ctx, []Lockable{r1}, r1, nil, nodes) + }) fatalErr(t, err) - err = UpdateStates(ctx, []GraphNode{r2}, func(states NodeStateMap) (error) { - node_state := states[r2.ID()].(LockableState) - err := LockLockables(ctx, []Lockable{r1}, r2, node_state, NodeStateMap{}) + err = UpdateStates(ctx, []GraphNode{r2}, func(nodes NodeMap) (error) { + node_state := r2.State().(LockableState) + err := LockLockables(ctx, []Lockable{r1}, r2, node_state, nodes) if err == nil { t.Fatal("r2 took r1's lock from itself") } @@ -193,7 +203,9 @@ func TestLockableLockSimpleConflict(t * testing.T) { }) fatalErr(t, err) - err = UnlockLockables(ctx, []Lockable{r1}, r1, nil, NodeStateMap{}) + err = UpdateStates(ctx, []GraphNode{r1}, func(nodes NodeMap) error { + return UnlockLockables(ctx, []Lockable{r1}, r1, nil, nodes) + }) fatalErr(t, err) err = UseStates(ctx, []GraphNode{r1}, func(states NodeStateMap) (error) { @@ -220,10 +232,14 @@ func TestLockableLockTieredConflict(t * testing.T) { r3, err := NewSimpleBaseLockable(ctx, "Test lockable 3", []Lockable{r1}) fatalErr(t, err) - err = LockLockables(ctx, []Lockable{r2}, r2, nil, NodeStateMap{}) + err = UpdateStates(ctx, []GraphNode{r2}, func(nodes NodeMap) error { + return LockLockables(ctx, []Lockable{r2}, r2, nil, nodes) + }) fatalErr(t, err) - err = LockLockables(ctx, []Lockable{r3}, r3, nil, NodeStateMap{}) + err = UpdateStates(ctx, []GraphNode{r3}, func(nodes NodeMap) error { + return LockLockables(ctx, []Lockable{r3}, r3, nil, nodes) + }) if err == nil { t.Fatal("Locked r3 which depends on r1 while r2 which depends on r1 is already locked") } @@ -329,6 +345,13 @@ func TestLockableDBLoad(t * testing.T){ fatalErr(t, err) _, err = NewSimpleBaseLockable(ctx, "Test Lockable 5", []Lockable{l4}) fatalErr(t, err) + l6, err := NewSimpleBaseLockable(ctx, "Test Lockable 6", []Lockable{}) + err = UpdateStates(ctx, []GraphNode{l6, l3}, func(nodes NodeMap) error { + l6_state := l6.State().(LockableState) + err := LockLockables(ctx, []Lockable{l3}, l6, l6_state, nodes) + return err + }) + fatalErr(t, err) _, err = LoadNode(ctx, l3.ID()) fatalErr(t, err) diff --git a/thread.go b/thread.go index d3de178..36d25e9 100644 --- a/thread.go +++ b/thread.go @@ -190,9 +190,9 @@ func LinkThreads(ctx * GraphContext, thread Thread, child Thread, info ThreadInf } - err := UpdateStates(ctx, []GraphNode{thread, child}, func(states NodeStateMap) error { - thread_state := states[thread.ID()].(ThreadState) - child_state := states[child.ID()].(ThreadState) + err := UpdateStates(ctx, []GraphNode{thread, child}, func(nodes NodeMap) error { + thread_state := thread.State().(ThreadState) + child_state := child.State().(ThreadState) if child_state.Parent() != nil { return fmt.Errorf("EVENT_LINK_ERR: %s already has a parent, cannot link as child", child.ID()) @@ -284,7 +284,9 @@ func ChildGo(ctx * GraphContext, thread_state ThreadState, thread Thread, child_ func RunThread(ctx * GraphContext, thread Thread) error { ctx.Log.Logf("thread", "THREAD_RUN: %s", thread.ID()) - err := LockLockables(ctx, []Lockable{thread}, thread, nil, NodeStateMap{}) + err := UpdateStates(ctx, []GraphNode{thread}, func(nodes NodeMap) (error) { + return LockLockables(ctx, []Lockable{thread}, thread, nil, nodes) + }) if err != nil { return err } @@ -332,7 +334,9 @@ func RunThread(ctx * GraphContext, thread Thread) error { return err } - err = UnlockLockables(ctx, []Lockable{thread}, thread, nil, NodeStateMap{}) + err = UpdateStates(ctx, []GraphNode{thread}, func(nodes NodeMap) (error) { + return UnlockLockables(ctx, []Lockable{thread}, thread, nil, nodes) + }) if err != nil { ctx.Log.Logf("thread", "THREAD_RUN_UNLOCK_ERR: %e", err) return err