Updated intances of state updating to use new API

graph-rework-2
noah metz 2023-06-28 23:51:44 -06:00
parent e862f9e49c
commit 84a700909d
5 changed files with 130 additions and 92 deletions

@ -26,7 +26,6 @@ func TestGQLThread(t * testing.T) {
go func(thread Thread){
time.Sleep(10*time.Millisecond)
// Check that test_thread_1 is running and test_thread_2 is not
SendUpdate(ctx, thread, CancelSignal(nil))
}(gql_thread)

@ -347,6 +347,33 @@ func ReadDBState(ctx * GraphContext, id NodeID) ([]byte, error) {
return bytes, nil
}
func WriteDBStates(ctx * GraphContext, nodes NodeMap) error{
ctx.Log.Logf("db", "DB_WRITES: %d", len(nodes))
var serialized_states [][]byte = make([][]byte, len(nodes))
i := 0
for _, node := range(nodes) {
ser, err := json.Marshal(node.State())
if err != nil {
return fmt.Errorf("DB_MARSHAL_ERROR: %e", err)
}
serialized_states[i] = ser
i++
}
err := ctx.DB.Update(func(txn *badger.Txn) error {
i := 0
for id, _ := range(nodes) {
err := txn.Set([]byte(id), serialized_states[i])
if err != nil {
return fmt.Errorf("DB_MARSHAL_ERROR: %e", err)
}
i++
}
return nil
})
return err
}
func WriteDBState(ctx * GraphContext, id NodeID, state NodeState) error {
ctx.Log.Logf("db", "DB_WRITE: %s - %+v", id, state)
@ -390,7 +417,9 @@ func checkForDuplicate(nodes []GraphNode) error {
}
type NodeStateMap map[NodeID]NodeState
type StatesFn func(states NodeStateMap)(error)
type NodeMap map[NodeID]GraphNode
type StatesFn func(states NodeStateMap) error
type NodesFn func(nodes NodeMap) error
func UseStates(ctx * GraphContext, nodes []GraphNode, states_fn StatesFn) error {
states := NodeStateMap{}
return UseMoreStates(ctx, nodes, states, states_fn)
@ -421,42 +450,28 @@ func UseMoreStates(ctx * GraphContext, nodes []GraphNode, states NodeStateMap, s
return err
}
func UpdateStates(ctx * GraphContext, nodes []GraphNode, states_fn StatesFn) error {
states := NodeStateMap{}
return UpdateMoreStates(ctx, nodes, states, states_fn)
}
func UpdateMoreStates(ctx * GraphContext, nodes []GraphNode, states NodeStateMap, states_fn StatesFn) error {
err := checkForDuplicate(nodes)
if err != nil {
return err
func UpdateStates(ctx * GraphContext, nodes []GraphNode, nodes_fn NodesFn) error {
locked_nodes := NodeMap{}
err := UpdateMoreStates(ctx, nodes, locked_nodes, nodes_fn)
if err == nil {
err = WriteDBStates(ctx, locked_nodes)
}
locked_nodes := []GraphNode{}
for _, node := range(locked_nodes) {
node.StateLock().Unlock()
}
return err
}
func UpdateMoreStates(ctx * GraphContext, nodes []GraphNode, locked_nodes NodeMap, nodes_fn NodesFn) error {
for _, node := range(nodes) {
_, locked := states[node.ID()]
_, locked := locked_nodes[node.ID()]
if locked == false {
node.StateLock().Lock()
states[node.ID()] = node.State()
locked_nodes = append(locked_nodes, node)
}
}
err = states_fn(states)
if err == nil {
for _, node := range(nodes) {
err := WriteDBState(ctx, node.ID(), node.State())
if err != nil {
panic(fmt.Sprintf("DB_WRITE_ERROR: %s", err))
}
locked_nodes[node.ID()] = node
}
}
for _, node := range(locked_nodes) {
delete(states, node.ID())
node.StateLock().Unlock()
}
return err
return nodes_fn(locked_nodes)
}
func (node * BaseNode) UpdateListeners(ctx * GraphContext, update GraphSignal) {

@ -161,6 +161,7 @@ func LinkLockables(ctx * GraphContext, lockable Lockable, requirements []Lockabl
return nil
}
found := map[NodeID]bool{}
for _, requirement := range(requirements) {
if requirement == nil {
return fmt.Errorf("LOCKABLE_LINK_ERR: Will not link nil to a Lockable as a requirement")
@ -169,32 +170,39 @@ func LinkLockables(ctx * GraphContext, lockable Lockable, requirements []Lockabl
if lockable.ID() == requirement.ID() {
return fmt.Errorf("LOCKABLE_LINK_ERR: cannot link %s to itself", lockable.ID())
}
_, exists := found[requirement.ID()]
if exists == true {
return fmt.Errorf("LOCKABLE_LINK_ERR: cannot link %s twice", requirement.ID())
}
found[requirement.ID()] = true
}
nodes := make([]GraphNode, len(requirements) + 1)
nodes[0] = lockable
gnodes := make([]GraphNode, len(requirements) + 1)
gnodes[0] = lockable
for i, node := range(requirements) {
nodes[i+1] = node
gnodes[i+1] = node
}
err := UpdateStates(ctx, nodes, func(states NodeStateMap) error {
err := UpdateStates(ctx, gnodes, func(nodes NodeMap) error {
// Check that all the requirements can be added
lockable_state := states[lockable.ID()].(LockableState)
lockable_state := lockable.State().(LockableState)
// If the lockable is already locked, need to lock this resource as well before we can add it
for _, requirement := range(requirements) {
requirement_state := states[requirement.ID()].(LockableState)
requirement_state := requirement.State().(LockableState)
for _, req := range(requirements) {
if req.ID() == requirement.ID() {
continue
}
if checkIfRequirement(ctx, req.ID(), requirement_state, requirement.ID(), states) == true {
if checkIfRequirement(ctx, req.ID(), requirement_state, requirement.ID(), nodes) == true {
return fmt.Errorf("LOCKABLE_LINK_ERR: %s is a dependenyc of %s so cannot add the same dependency", req.ID(), requirement.ID())
}
}
if checkIfRequirement(ctx, lockable.ID(), requirement_state, requirement.ID(), states) == true {
if checkIfRequirement(ctx, lockable.ID(), requirement_state, requirement.ID(), nodes) == true {
return fmt.Errorf("LOCKABLE_LINK_ERR: %s is a dependency of %s so cannot link as requirement", requirement.ID(), lockable.ID())
}
if checkIfRequirement(ctx, requirement.ID(), lockable_state, lockable.ID(), states) == true {
if checkIfRequirement(ctx, requirement.ID(), lockable_state, lockable.ID(), nodes) == true {
return fmt.Errorf("LOCKABLE_LINK_ERR: %s is a dependency of %s so cannot link as dependency again", lockable.ID(), requirement.ID())
}
if lockable_state.Owner() == nil {
@ -211,7 +219,7 @@ func LinkLockables(ctx * GraphContext, lockable Lockable, requirements []Lockabl
}
// Update the states of the requirements
for _, requirement := range(requirements) {
requirement_state := states[requirement.ID()].(LockableState)
requirement_state := requirement.State().(LockableState)
requirement_state.AddDependency(lockable)
lockable_state.AddRequirement(requirement)
ctx.Log.Logf("lockable", "LOCKABLE_LINK: linked %s to %s as a requirement", requirement.ID(), lockable.ID())
@ -281,15 +289,15 @@ func (lockable * BaseLockable) PropagateUpdate(ctx * GraphContext, signal GraphS
})
}
func checkIfRequirement(ctx * GraphContext, r_id NodeID, cur LockableState, cur_id NodeID, states NodeStateMap) bool {
func checkIfRequirement(ctx * GraphContext, r_id NodeID, cur LockableState, cur_id NodeID, nodes NodeMap) bool {
for _, c := range(cur.Requirements()) {
if c.ID() == r_id {
return true
}
is_requirement := false
UpdateMoreStates(ctx, []GraphNode{c}, states, func(states NodeStateMap) (error) {
requirement_state := states[c.ID()].(LockableState)
is_requirement = checkIfRequirement(ctx, cur_id, requirement_state, c.ID(), states)
UpdateMoreStates(ctx, []GraphNode{c}, nodes, func(nodes NodeMap) (error) {
requirement_state := c.State().(LockableState)
is_requirement = checkIfRequirement(ctx, cur_id, requirement_state, c.ID(), nodes)
return nil
})
@ -301,7 +309,7 @@ func checkIfRequirement(ctx * GraphContext, r_id NodeID, cur LockableState, cur_
return false
}
func LockLockables(ctx * GraphContext, to_lock []Lockable, holder Lockable, holder_state LockableState, states NodeStateMap) error {
func LockLockables(ctx * GraphContext, to_lock []Lockable, holder Lockable, holder_state LockableState, nodes NodeMap) error {
if to_lock == nil {
return fmt.Errorf("LOCKABLE_LOCK_ERR: no list provided")
}
@ -333,15 +341,11 @@ func LockLockables(ctx * GraphContext, to_lock []Lockable, holder Lockable, hold
node_list[i] = l
}
err := UpdateMoreStates(ctx, node_list, states, func(states NodeStateMap) error {
err := UpdateMoreStates(ctx, node_list, nodes, func(nodes NodeMap) error {
// First loop is to check that the states can be locked, and locks all requirements
for _, req := range(to_lock) {
state := states[req.ID()]
req_state, ok := state.(LockableState)
req_state := req.State().(LockableState)
ctx.Log.Logf("lockable", "LOCKABLE_LOCKING: %s from %s", req.ID(), holder.ID())
if ok == false {
return fmt.Errorf("LOCKABLE_LOCK_ERR: %s(requirement of %s) does not have a LockableState", req.ID(), holder.ID())
}
// Check custom lock conditions
err := req.CanLock(holder, req_state)
@ -367,21 +371,17 @@ func LockLockables(ctx * GraphContext, to_lock []Lockable, holder Lockable, hold
// a) in this case, we're holding every state mutex up to the resource being locked
// and all the owners passing a lock, so we can start to change state
// 2) req has children, and we will recurse(checking that locking is allowed) until we reach a leaf and can release the locks as we change state. The call will either return nil if state has changed, on an error if no state has changed
err := LockLockables(ctx, req_state.Requirements(), req, req_state, states)
err := LockLockables(ctx, req_state.Requirements(), req, req_state, nodes)
if err != nil {
return err
}
} else {
err := UpdateMoreStates(ctx, []GraphNode{owner}, states, func(states NodeStateMap)(error){
owner_state, ok := states[owner.ID()].(LockableState)
if ok == false {
return fmt.Errorf("LOCKABLE_LOCK_ERR: %s does not have a LockableState", owner.ID())
}
err := UpdateMoreStates(ctx, []GraphNode{owner}, nodes, func(nodes NodeMap)(error){
owner_state := owner.State().(LockableState)
if owner_state.AllowedToTakeLock(holder.ID(), req.ID()) == false {
return fmt.Errorf("LOCKABLE_LOCK_ERR: %s is not allowed to take %s's lock from %s", holder.ID(), req.ID(), owner.ID())
}
err := LockLockables(ctx, req_state.Requirements(), req, req_state, states)
err := LockLockables(ctx, req_state.Requirements(), req, req_state, nodes)
return err
})
if err != nil {
@ -389,7 +389,7 @@ func LockLockables(ctx * GraphContext, to_lock []Lockable, holder Lockable, hold
}
}
} else {
err := LockLockables(ctx, req_state.Requirements(), req, req_state, states)
err := LockLockables(ctx, req_state.Requirements(), req, req_state, nodes)
if err != nil {
return err
}
@ -398,7 +398,7 @@ func LockLockables(ctx * GraphContext, to_lock []Lockable, holder Lockable, hold
// At this point state modification will be started, so no errors can be returned
for _, req := range(to_lock) {
req_state := states[req.ID()].(LockableState)
req_state := req.State().(LockableState)
old_owner := req_state.Owner()
req_state.SetOwner(holder)
if req.ID() == holder.ID() {
@ -418,7 +418,7 @@ func LockLockables(ctx * GraphContext, to_lock []Lockable, holder Lockable, hold
return err
}
func UnlockLockables(ctx * GraphContext, to_unlock []Lockable, holder Lockable, holder_state LockableState, states NodeStateMap) error {
func UnlockLockables(ctx * GraphContext, to_unlock []Lockable, holder Lockable, holder_state LockableState, nodes NodeMap) error {
if to_unlock == nil {
return fmt.Errorf("LOCKABLE_UNLOCK_ERR: no list provided")
}
@ -449,14 +449,11 @@ func UnlockLockables(ctx * GraphContext, to_unlock []Lockable, holder Lockable,
node_list[i] = l
}
err := UpdateMoreStates(ctx, node_list, states, func(states NodeStateMap) error {
err := UpdateMoreStates(ctx, node_list, nodes, func(nodes NodeMap) error {
// First loop is to check that the states can be locked, and locks all requirements
for _, req := range(to_unlock) {
req_state, ok := states[req.ID()].(LockableState)
req_state := req.State().(LockableState)
ctx.Log.Logf("lockable", "LOCKABLE_UNLOCKING: %s from %s", req.ID(), holder.ID())
if ok == false {
return fmt.Errorf("LOCKABLE_UNLOCK_ERR: %s(requirement of %s) does not have a LockableState", req.ID(), holder.ID())
}
// Check if the owner is correct
if req_state.Owner() != nil {
@ -473,7 +470,7 @@ func UnlockLockables(ctx * GraphContext, to_unlock []Lockable, holder Lockable,
return err
}
err = UnlockLockables(ctx, req_state.Requirements(), req, req_state, states)
err = UnlockLockables(ctx, req_state.Requirements(), req, req_state, nodes)
if err != nil {
return err
}
@ -481,7 +478,7 @@ func UnlockLockables(ctx * GraphContext, to_unlock []Lockable, holder Lockable,
// At this point state modification will be started, so no errors can be returned
for _, req := range(to_unlock) {
req_state := states[req.ID()].(LockableState)
req_state := req.State().(LockableState)
var new_owner Lockable = nil
if holder_state == nil {
new_owner = req_state.ReturnLock(req.ID())

@ -24,7 +24,7 @@ func TestRepeatedChildLockable(t * testing.T) {
_, err = NewSimpleBaseLockable(ctx, "Test lockable 2", []Lockable{r1, r1})
if err == nil {
t.Fatal("Added the same lockable as a child twice to the same lockable")
t.Fatal("Added the same lockable as a requirement twice to the same lockable")
}
}
@ -34,7 +34,9 @@ func TestLockableSelfLock(t * testing.T) {
r1, err := NewSimpleBaseLockable(ctx, "Test lockable 1", []Lockable{})
fatalErr(t, err)
err = LockLockables(ctx, []Lockable{r1}, r1, nil, NodeStateMap{})
err = UpdateStates(ctx, []GraphNode{r1}, func(nodes NodeMap) error {
return LockLockables(ctx, []Lockable{r1}, r1, nil, nodes)
})
fatalErr(t, err)
err = UseStates(ctx, []GraphNode{r1}, func(states NodeStateMap) (error) {
@ -46,7 +48,9 @@ func TestLockableSelfLock(t * testing.T) {
})
fatalErr(t, err)
err = UnlockLockables(ctx, []Lockable{r1}, r1, nil, NodeStateMap{})
err = UpdateStates(ctx, []GraphNode{r1}, func(nodes NodeMap) error {
return UnlockLockables(ctx, []Lockable{r1}, r1, nil, nodes)
})
fatalErr(t, err)
err = UseStates(ctx, []GraphNode{r1}, func(states NodeStateMap) (error) {
@ -72,7 +76,9 @@ func TestLockableSelfLockTiered(t * testing.T) {
r3, err := NewSimpleBaseLockable(ctx, "Test lockable 3", []Lockable{r1, r2})
fatalErr(t, err)
err = LockLockables(ctx, []Lockable{r3}, r3, nil, NodeStateMap{})
err = UpdateStates(ctx, []GraphNode{r3}, func(nodes NodeMap) error {
return LockLockables(ctx, []Lockable{r3}, r3, nil, nodes)
})
fatalErr(t, err)
err = UseStates(ctx, []GraphNode{r1, r2, r3}, func(states NodeStateMap) (error) {
@ -89,7 +95,9 @@ func TestLockableSelfLockTiered(t * testing.T) {
})
fatalErr(t, err)
err = UnlockLockables(ctx, []Lockable{r3}, r3, nil, NodeStateMap{})
err = UpdateStates(ctx, []GraphNode{r3}, func(nodes NodeMap) error {
return UnlockLockables(ctx, []Lockable{r3}, r3, nil, nodes)
})
fatalErr(t, err)
err = UseStates(ctx, []GraphNode{r1, r2, r3}, func(states NodeStateMap) (error) {
@ -122,9 +130,9 @@ func TestLockableLockOther(t * testing.T) {
r2, err := NewSimpleBaseLockable(ctx, "Test lockable 2", []Lockable{})
fatalErr(t, err)
err = UpdateStates(ctx, []GraphNode{r2}, func(states NodeStateMap) (error) {
node_state := states[r2.ID()].(LockableState)
err := LockLockables(ctx, []Lockable{r1}, r2, node_state, NodeStateMap{})
err = UpdateStates(ctx, []GraphNode{r1, r2}, func(nodes NodeMap) (error) {
node_state := r2.State().(LockableState)
err := LockLockables(ctx, []Lockable{r1}, r2, node_state, nodes)
fatalErr(t, err)
return nil
})
@ -140,9 +148,9 @@ func TestLockableLockOther(t * testing.T) {
})
fatalErr(t, err)
err = UpdateStates(ctx, []GraphNode{r2}, func(states NodeStateMap) (error) {
node_state := states[r2.ID()].(LockableState)
err := UnlockLockables(ctx, []Lockable{r1}, r2, node_state, NodeStateMap{})
err = UpdateStates(ctx, []GraphNode{r2}, func(nodes NodeMap) (error) {
node_state := r2.State().(LockableState)
err := UnlockLockables(ctx, []Lockable{r1}, r2, node_state, nodes)
fatalErr(t, err)
return nil
})
@ -169,12 +177,14 @@ func TestLockableLockSimpleConflict(t * testing.T) {
r2, err := NewSimpleBaseLockable(ctx, "Test lockable 2", []Lockable{})
fatalErr(t, err)
err = LockLockables(ctx, []Lockable{r1}, r1, nil, NodeStateMap{})
err = UpdateStates(ctx, []GraphNode{r1}, func(nodes NodeMap) error {
return LockLockables(ctx, []Lockable{r1}, r1, nil, nodes)
})
fatalErr(t, err)
err = UpdateStates(ctx, []GraphNode{r2}, func(states NodeStateMap) (error) {
node_state := states[r2.ID()].(LockableState)
err := LockLockables(ctx, []Lockable{r1}, r2, node_state, NodeStateMap{})
err = UpdateStates(ctx, []GraphNode{r2}, func(nodes NodeMap) (error) {
node_state := r2.State().(LockableState)
err := LockLockables(ctx, []Lockable{r1}, r2, node_state, nodes)
if err == nil {
t.Fatal("r2 took r1's lock from itself")
}
@ -193,7 +203,9 @@ func TestLockableLockSimpleConflict(t * testing.T) {
})
fatalErr(t, err)
err = UnlockLockables(ctx, []Lockable{r1}, r1, nil, NodeStateMap{})
err = UpdateStates(ctx, []GraphNode{r1}, func(nodes NodeMap) error {
return UnlockLockables(ctx, []Lockable{r1}, r1, nil, nodes)
})
fatalErr(t, err)
err = UseStates(ctx, []GraphNode{r1}, func(states NodeStateMap) (error) {
@ -220,10 +232,14 @@ func TestLockableLockTieredConflict(t * testing.T) {
r3, err := NewSimpleBaseLockable(ctx, "Test lockable 3", []Lockable{r1})
fatalErr(t, err)
err = LockLockables(ctx, []Lockable{r2}, r2, nil, NodeStateMap{})
err = UpdateStates(ctx, []GraphNode{r2}, func(nodes NodeMap) error {
return LockLockables(ctx, []Lockable{r2}, r2, nil, nodes)
})
fatalErr(t, err)
err = LockLockables(ctx, []Lockable{r3}, r3, nil, NodeStateMap{})
err = UpdateStates(ctx, []GraphNode{r3}, func(nodes NodeMap) error {
return LockLockables(ctx, []Lockable{r3}, r3, nil, nodes)
})
if err == nil {
t.Fatal("Locked r3 which depends on r1 while r2 which depends on r1 is already locked")
}
@ -329,6 +345,13 @@ func TestLockableDBLoad(t * testing.T){
fatalErr(t, err)
_, err = NewSimpleBaseLockable(ctx, "Test Lockable 5", []Lockable{l4})
fatalErr(t, err)
l6, err := NewSimpleBaseLockable(ctx, "Test Lockable 6", []Lockable{})
err = UpdateStates(ctx, []GraphNode{l6, l3}, func(nodes NodeMap) error {
l6_state := l6.State().(LockableState)
err := LockLockables(ctx, []Lockable{l3}, l6, l6_state, nodes)
return err
})
fatalErr(t, err)
_, err = LoadNode(ctx, l3.ID())
fatalErr(t, err)

@ -190,9 +190,9 @@ func LinkThreads(ctx * GraphContext, thread Thread, child Thread, info ThreadInf
}
err := UpdateStates(ctx, []GraphNode{thread, child}, func(states NodeStateMap) error {
thread_state := states[thread.ID()].(ThreadState)
child_state := states[child.ID()].(ThreadState)
err := UpdateStates(ctx, []GraphNode{thread, child}, func(nodes NodeMap) error {
thread_state := thread.State().(ThreadState)
child_state := child.State().(ThreadState)
if child_state.Parent() != nil {
return fmt.Errorf("EVENT_LINK_ERR: %s already has a parent, cannot link as child", child.ID())
@ -284,7 +284,9 @@ func ChildGo(ctx * GraphContext, thread_state ThreadState, thread Thread, child_
func RunThread(ctx * GraphContext, thread Thread) error {
ctx.Log.Logf("thread", "THREAD_RUN: %s", thread.ID())
err := LockLockables(ctx, []Lockable{thread}, thread, nil, NodeStateMap{})
err := UpdateStates(ctx, []GraphNode{thread}, func(nodes NodeMap) (error) {
return LockLockables(ctx, []Lockable{thread}, thread, nil, nodes)
})
if err != nil {
return err
}
@ -332,7 +334,9 @@ func RunThread(ctx * GraphContext, thread Thread) error {
return err
}
err = UnlockLockables(ctx, []Lockable{thread}, thread, nil, NodeStateMap{})
err = UpdateStates(ctx, []GraphNode{thread}, func(nodes NodeMap) (error) {
return UnlockLockables(ctx, []Lockable{thread}, thread, nil, nodes)
})
if err != nil {
ctx.Log.Logf("thread", "THREAD_RUN_UNLOCK_ERR: %e", err)
return err