From 30971f00bd56e462fc0dd3f27b2d70a351b57592 Mon Sep 17 00:00:00 2001 From: Noah Metz Date: Fri, 23 Jun 2023 20:56:09 -0600 Subject: [PATCH] Renamed resource to lockable and event to thread --- graph.go | 18 +- lockable.go | 363 ++++++++++++++++ resource_test.go => lockable_test.go | 105 ++--- resource.go | 355 ---------------- thread.go | 595 +++++++++++++++++++++++++++ 5 files changed, 1022 insertions(+), 414 deletions(-) create mode 100644 lockable.go rename resource_test.go => lockable_test.go (55%) delete mode 100644 resource.go create mode 100644 thread.go diff --git a/graph.go b/graph.go index fd64154..7b1eed1 100644 --- a/graph.go +++ b/graph.go @@ -8,6 +8,7 @@ import ( "github.com/rs/zerolog" "fmt" badger "github.com/dgraph-io/badger/v3" + "encoding/json" ) type GraphContext struct { @@ -173,10 +174,7 @@ func NewCancelSignal(source GraphNode) BaseSignal { } type NodeState interface { - Serialize() []byte - OriginalLockHolder(id NodeID) GraphNode - AllowedToTakeLock(id NodeID) bool - RecordLockHolder(id NodeID, lock_holder GraphNode) NodeState + } // GraphNode is the interface common to both DAG nodes and Event tree nodes @@ -188,7 +186,6 @@ type GraphNode interface { StateLock() *sync.RWMutex SetState(new_state NodeState) - DeserializeState([]byte) NodeState // Signal propagation function for listener channels UpdateListeners(ctx * GraphContext, update GraphSignal) @@ -246,16 +243,16 @@ func (node * BaseNode) StateLock() * sync.RWMutex { return &node.state_lock } -func (node * BaseNode) DeserializeState([]byte) NodeState { - return nil -} - func WriteDBState(ctx * GraphContext, id NodeID, state NodeState) error { ctx.Log.Logf("db", "DB_WRITE: %s - %+v", id, state) var serialized_state []byte = nil if state != nil { - serialized_state = state.Serialize() + ser, err := json.Marshal(state) + if err != nil { + return fmt.Errorf("DB_MARSHAL_ERROR: %e", err) + } + serialized_state = ser } else { serialized_state = []byte{} } @@ -272,6 +269,7 @@ func (node * BaseNode) SetState(new_state NodeState) { node.state = new_state } +// How to prevent the states from being modified if they're pointer receivers? func UseStates(ctx * GraphContext, nodes []GraphNode, states_fn func(states []NodeState)(interface{}, error)) (interface{}, error) { for _, node := range(nodes) { node.StateLock().RLock() diff --git a/lockable.go b/lockable.go new file mode 100644 index 0000000..0ec2a3d --- /dev/null +++ b/lockable.go @@ -0,0 +1,363 @@ +package graphvent + +import ( + "fmt" + "encoding/json" +) + +// Any struct that wants to hold a lock must implement this interface +type LockHolderState interface { + OriginalLockHolder(id NodeID) GraphNode + AllowedToTakeLock(id NodeID) bool + RecordLockHolder(id NodeID, lock_holder GraphNode) +} + +// Any node that wants to be connected to the lockable DAG must implement this interface +type LockableState interface { + LockHolderState + Name() string + Requirements() []Lockable + AddRequirement(requirement Lockable) + Dependencies() []Lockable + AddDependency(dependency Lockable) + Owner() GraphNode + SetOwner(owner GraphNode) +} + +type BaseLockableState struct { + name string + owner GraphNode + requirements []Lockable + dependencies []Lockable +} + +func (state * BaseLockableState) MarshalJSON() ([]byte, error) { + requirement_ids := make([]NodeID, len(state.requirements)) + for i, requirement := range(state.requirements) { + requirement_ids[i] = requirement.ID() + } + + dependency_ids := make([]NodeID, len(state.dependencies)) + for i, dependency := range(state.dependencies) { + dependency_ids[i] = dependency.ID() + } + + var owner_id *NodeID = nil + if state.owner != nil { + new_str := state.owner.ID() + owner_id = &new_str + } + + return json.Marshal(&struct{ + Name string `json:"name"` + Owner *NodeID `json:"owner"` + Dependencies []NodeID `json:"dependencies"` + Requirements []NodeID `json:"requirements"` + }{ + Name: state.name, + Owner: owner_id, + Dependencies: dependency_ids, + Requirements: requirement_ids, + }) +} + +func (state * BaseLockableState) Name() string { + return state.name +} + +// Locks cannot be passed between base lockables, so the answer to +// "who used to own this lock held by a base lockable" is always "nobody" +func (state * BaseLockableState) OriginalLockHolder(id NodeID) GraphNode { + return nil +} + +// Nothing can take a lock from a base lockable either +func (state * BaseLockableState) AllowedToTakeLock(id NodeID) bool { + return false +} + +func (state * BaseLockableState) RecordLockHolder(id NodeID, lock_holder GraphNode) { + if lock_holder != nil { + panic("Attempted to delegate a lock to a lockable") + } +} + +func (state * BaseLockableState) Owner() GraphNode { + return state.owner +} + +func (state * BaseLockableState) SetOwner(owner GraphNode) { + state.owner = owner +} + +func (state * BaseLockableState) Requirements() []Lockable { + return state.requirements +} + +func (state * BaseLockableState) AddRequirement(requirement Lockable) { + if requirement == nil { + panic("Will not connect nil to the DAG") + } + state.requirements = append(state.requirements, requirement) +} + +func (state * BaseLockableState) Dependencies() []Lockable { + return state.dependencies +} + +func (state * BaseLockableState) AddDependency(dependency Lockable) { + if dependency == nil { + panic("Will not connect nil to the DAG") + } + + state.dependencies = append(state.dependencies, dependency) +} + +func NewLockableState(name string) BaseLockableState { + return BaseLockableState{ + name: name, + owner: nil, + requirements: []Lockable{}, + dependencies: []Lockable{}, + } +} + +// Link a lockable with a requirement +func LinkLockables(ctx * GraphContext, lockable Lockable, requirement Lockable) error { + if lockable == nil || requirement == nil { + return fmt.Errorf("Will not connect nil to DAG") + } + + if lockable.ID() == requirement.ID() { + return fmt.Errorf("Will not link %s as requirement of itself", lockable.ID()) + } + + _, err := UpdateStates(ctx, []GraphNode{lockable, requirement}, func(states []NodeState) ([]NodeState, interface{}, error) { + lockable_state := states[0].(LockableState) + requirement_state := states[1].(LockableState) + + if checkIfRequirement(ctx, lockable_state, lockable.ID(), requirement_state, requirement.ID()) == true { + return nil, nil, fmt.Errorf("LOCKABLE_LINK_ERR: %s is a dependency of %s so cannot link as requirement", requirement.ID(), lockable.ID()) + } + + if checkIfRequirement(ctx, requirement_state, requirement.ID(), lockable_state, lockable.ID()) == true { + return nil, nil, fmt.Errorf("LOCKABLE_LINK_ERR: %s is a dependency of %s so cannot link as dependency again", lockable.ID(), requirement.ID()) + } + + lockable_state.AddRequirement(requirement) + requirement_state.AddDependency(lockable) + return []NodeState{lockable_state, requirement_state}, nil, nil + }) + return err +} + +type Lockable interface { + GraphNode + // Called when locking the node to allow for custom lock behaviour + Lock(node GraphNode, state LockableState) error + // Called when unlocking the node to allow for custom lock behaviour + Unlock(node GraphNode, state LockableState) error +} + +// Lockables propagate update up to multiple dependencies, and not downwards +// (subscriber to team won't get update to alliance, but subscriber to alliance will get update to team) +func (lockable * BaseLockable) PropagateUpdate(ctx * GraphContext, signal GraphSignal) { + UseStates(ctx, []GraphNode{lockable}, func(states []NodeState) (interface{}, error){ + lockable_state := states[0].(LockableState) + if signal.Direction() == Up { + // Child->Parent, lockable updates dependency lockables + for _, dependency := range lockable_state.Dependencies() { + SendUpdate(ctx, dependency, signal) + } + } else if signal.Direction() == Down { + // Parent->Child, lockable updates lock holder + if lockable_state.Owner() != nil { + SendUpdate(ctx, lockable_state.Owner(), signal) + } + + for _, requirement := range(lockable_state.Requirements()) { + SendUpdate(ctx, requirement, signal) + } + } else if signal.Direction() == Direct { + } else { + panic(fmt.Sprintf("Invalid signal direction: %d", signal.Direction())) + } + return nil, nil + }) +} + +func checkIfRequirement(ctx * GraphContext, r LockableState, r_id NodeID, cur LockableState, cur_id NodeID) bool { + for _, c := range(cur.Requirements()) { + if c.ID() == r_id { + return true + } + val, _ := UseStates(ctx, []GraphNode{c}, func(states []NodeState) (interface{}, error) { + requirement_state := states[0].(LockableState) + return checkIfRequirement(ctx, cur, cur_id, requirement_state, c.ID()), nil + }) + + is_requirement := val.(bool) + if is_requirement { + return true + } + } + + return false +} + +func UnlockLockable(ctx * GraphContext, lockable Lockable, node GraphNode, node_state LockHolderState) error { + if node == nil || lockable == nil{ + panic("Cannot unlock without a specified node and lockable") + } + _, err := UpdateStates(ctx, []GraphNode{lockable}, func(states []NodeState) ([]NodeState, interface{}, error) { + if lockable.ID() == node.ID() { + if node_state != nil { + panic("node_state must be nil if unlocking lockable from itself") + } + node_state = states[0].(LockHolderState) + } + lockable_state := states[0].(LockableState) + + if lockable_state.Owner() == nil { + return nil, nil, fmt.Errorf("Lockable already unlocked") + } + + if lockable_state.Owner().ID() != node.ID() { + return nil, nil, fmt.Errorf("Lockable %s not locked by %s", lockable.ID(), node.ID()) + } + + var lock_err error = nil + for _, requirement := range(lockable_state.Requirements()) { + var err error = nil + err = UnlockLockable(ctx, requirement, node, node_state) + if err != nil { + lock_err = err + break + } + } + + if lock_err != nil { + return nil, nil, fmt.Errorf("Lockable %s failed to unlock: %e", lockable.ID(), lock_err) + } + + new_owner := node_state.OriginalLockHolder(lockable.ID()) + lockable_state.SetOwner(new_owner) + err := lockable.Unlock(node, lockable_state) + if err != nil { + return nil, nil, fmt.Errorf("Lockable %s failed custom Unlock: %e", lockable.ID(), err) + } + + if lockable_state.Owner() == nil { + ctx.Log.Logf("lockable", "LOCKABLE_UNLOCK: %s unlocked %s", node.ID(), lockable.ID()) + } else { + ctx.Log.Logf("lockable", "LOCKABLE_UNLOCK: %s passed lock of %s back to %s", node.ID(), lockable.ID(), lockable_state.Owner().ID()) + } + + return []NodeState{lockable_state}, nil, nil + }) + + return err +} + +func LockLockable(ctx * GraphContext, lockable Lockable, node GraphNode, node_state LockHolderState) error { + if node == nil || lockable == nil { + panic("Cannot lock without a specified node and lockable") + } + + _, err := UpdateStates(ctx, []GraphNode{lockable}, func(states []NodeState) ([]NodeState, interface{}, error) { + if lockable.ID() == node.ID() { + if node_state != nil { + panic("node_state must be nil if locking lockable from itself") + } + node_state = states[0].(LockHolderState) + } + lockable_state := states[0].(LockableState) + if lockable_state.Owner() != nil { + var lock_pass_allowed bool = false + + if lockable_state.Owner().ID() == lockable.ID() { + lock_pass_allowed = lockable_state.AllowedToTakeLock(node.ID()) + } else { + tmp, _ := UseStates(ctx, []GraphNode{lockable_state.Owner()}, func(states []NodeState)(interface{}, error){ + return states[0].(LockHolderState).AllowedToTakeLock(node.ID()), nil + }) + lock_pass_allowed = tmp.(bool) + } + + + if lock_pass_allowed == false { + return nil, nil, fmt.Errorf("%s is not allowed to take a lock from %s", node.ID(), lockable_state.Owner().ID()) + } + } + + err := lockable.Lock(node, lockable_state) + if err != nil { + return nil, nil, fmt.Errorf("Failed to lock lockable: %e", err) + } + + var lock_err error = nil + locked_requirements := []Lockable{} + for _, requirement := range(lockable_state.Requirements()) { + err = LockLockable(ctx, requirement, node, node_state) + if err != nil { + lock_err = err + break + } + locked_requirements = append(locked_requirements, requirement) + } + + if lock_err != nil { + for _, locked_lockable := range(locked_requirements) { + err = UnlockLockable(ctx, locked_lockable, node, node_state) + if err != nil { + panic(err) + } + } + return nil, nil, fmt.Errorf("Lockable failed to lock: %e", lock_err) + } + + old_owner := lockable_state.Owner() + lockable_state.SetOwner(node) + node_state.RecordLockHolder(node.ID(), old_owner) + + if old_owner == nil { + ctx.Log.Logf("lockable", "LOCKABLE_LOCK: %s locked %s", node.ID(), lockable.ID()) + } else { + ctx.Log.Logf("lockable", "LOCKABLE_LOCK: %s took lock of %s from %s", node.ID(), lockable.ID(), old_owner.ID()) + } + + return []NodeState{lockable_state}, nil, nil + }) + + return err +} + +// BaseLockables represent simple lockables in the DAG that can be used to create a hierarchy of locks that store names +type BaseLockable struct { + BaseNode +} + +//BaseLockables don't check anything special when locking/unlocking +func (lockable * BaseLockable) Lock(node GraphNode, state LockableState) error { + return nil +} + +func (lockable * BaseLockable) Unlock(node GraphNode, state LockableState) error { + return nil +} + +func NewLockable(ctx * GraphContext, name string, requirements []Lockable) (* BaseLockable, error) { + state := NewLockableState(name) + lockable := &BaseLockable{ + BaseNode: NewNode(ctx, RandID(), &state), + } + + for _, requirement := range(requirements) { + err := LinkLockables(ctx, lockable, requirement) + if err != nil { + return nil, err + } + } + + return lockable, nil +} diff --git a/resource_test.go b/lockable_test.go similarity index 55% rename from resource_test.go rename to lockable_test.go index 98923cc..9b8022b 100644 --- a/resource_test.go +++ b/lockable_test.go @@ -3,41 +3,42 @@ package graphvent import ( "testing" "fmt" + "encoding/json" ) -func TestNewResource(t * testing.T) { +func TestNewLockable(t * testing.T) { ctx := testContext(t) - r1, err := NewResource(ctx, "Test resource 1", []Resource{}) + r1, err := NewLockable(ctx, "Test lockable 1", []Lockable{}) fatalErr(t, err) - _, err = NewResource(ctx, "Test resource 2", []Resource{r1}) + _, err = NewLockable(ctx, "Test lockable 2", []Lockable{r1}) fatalErr(t, err) } -func TestRepeatedChildResource(t * testing.T) { +func TestRepeatedChildLockable(t * testing.T) { ctx := testContext(t) - r1, err := NewResource(ctx, "Test resource 1", []Resource{}) + r1, err := NewLockable(ctx, "Test lockable 1", []Lockable{}) fatalErr(t, err) - _, err = NewResource(ctx, "Test resource 2", []Resource{r1, r1}) + _, err = NewLockable(ctx, "Test lockable 2", []Lockable{r1, r1}) if err == nil { - t.Fatal("Added the same resource as a child twice to the same resource") + t.Fatal("Added the same lockable as a child twice to the same lockable") } } -func TestResourceSelfLock(t * testing.T) { +func TestLockableSelfLock(t * testing.T) { ctx := testContext(t) - r1, err := NewResource(ctx, "Test resource 1", []Resource{}) + r1, err := NewLockable(ctx, "Test lockable 1", []Lockable{}) fatalErr(t, err) - _, err = LockResource(ctx, r1, r1, nil) + err = LockLockable(ctx, r1, r1, nil) fatalErr(t, err) _, err = UseStates(ctx, []GraphNode{r1}, func(states []NodeState) (interface{}, error) { - owner_id := states[0].(ResourceState).owner.ID() + owner_id := states[0].(LockableState).Owner().ID() if owner_id != r1.ID() { return nil, fmt.Errorf("r1 is owned by %s instead of self", owner_id) } @@ -45,11 +46,11 @@ func TestResourceSelfLock(t * testing.T) { }) fatalErr(t, err) - _, err = UnlockResource(ctx, r1, r1, nil) + err = UnlockLockable(ctx, r1, r1, nil) fatalErr(t, err) _, err = UseStates(ctx, []GraphNode{r1}, func(states []NodeState) (interface{}, error) { - owner := states[0].(ResourceState).owner + owner := states[0].(LockableState).Owner() if owner != nil { return nil, fmt.Errorf("r1 is not unowned after unlock: %s", owner.ID()) } @@ -59,50 +60,53 @@ func TestResourceSelfLock(t * testing.T) { fatalErr(t, err) } -func TestResourceSelfLockTiered(t * testing.T) { +func TestLockableSelfLockTiered(t * testing.T) { ctx := testContext(t) - r1, err := NewResource(ctx, "Test resource 1", []Resource{}) + r1, err := NewLockable(ctx, "Test lockable 1", []Lockable{}) fatalErr(t, err) - r2, err := NewResource(ctx, "Test resource 1", []Resource{}) + r2, err := NewLockable(ctx, "Test lockable 2", []Lockable{}) fatalErr(t, err) - r3, err := NewResource(ctx, "Test resource 3", []Resource{r1, r2}) + r3, err := NewLockable(ctx, "Test lockable 3", []Lockable{r1, r2}) fatalErr(t, err) - _, err = LockResource(ctx, r3, r3, nil) + err = LockLockable(ctx, r3, r3, nil) fatalErr(t, err) - _, err = UseStates(ctx, []GraphNode{r1, r2}, func(states []NodeState) (interface{}, error) { - owner_1_id := states[0].(ResourceState).owner.ID() + _, err = UseStates(ctx, []GraphNode{r1, r2, r3}, func(states []NodeState) (interface{}, error) { + owner_1_id := states[0].(LockableState).Owner().ID() if owner_1_id != r3.ID() { return nil, fmt.Errorf("r1 is owned by %s instead of r3", owner_1_id) } - owner_2_id := states[1].(ResourceState).owner.ID() + owner_2_id := states[1].(LockableState).Owner().ID() if owner_2_id != r3.ID() { return nil, fmt.Errorf("r2 is owned by %s instead of r3", owner_2_id) } + ser, _ := json.MarshalIndent(states, "", " ") + fmt.Printf("\n\n%s\n\n", ser) + return nil, nil }) fatalErr(t, err) - _, err = UnlockResource(ctx, r3, r3, nil) + err = UnlockLockable(ctx, r3, r3, nil) fatalErr(t, err) _, err = UseStates(ctx, []GraphNode{r1, r2, r3}, func(states []NodeState) (interface{}, error) { - owner_1 := states[0].(ResourceState).owner + owner_1 := states[0].(LockableState).Owner() if owner_1 != nil { return nil, fmt.Errorf("r1 is not unowned after unlocking: %s", owner_1.ID()) } - owner_2 := states[1].(ResourceState).owner + owner_2 := states[1].(LockableState).Owner() if owner_2 != nil { return nil, fmt.Errorf("r2 is not unowned after unlocking: %s", owner_2.ID()) } - owner_3 := states[2].(ResourceState).owner + owner_3 := states[2].(LockableState).Owner() if owner_3 != nil { return nil, fmt.Errorf("r3 is not unowned after unlocking: %s", owner_3.ID()) } @@ -112,24 +116,25 @@ func TestResourceSelfLockTiered(t * testing.T) { fatalErr(t, err) } -func TestResourceLockOther(t * testing.T) { +func TestLockableLockOther(t * testing.T) { ctx := testContext(t) - r1, err := NewResource(ctx, "Test resource 1", []Resource{}) + r1, err := NewLockable(ctx, "Test lockable 1", []Lockable{}) fatalErr(t, err) - r2, err := NewResource(ctx, "Test resource 2", []Resource{}) + r2, err := NewLockable(ctx, "Test lockable 2", []Lockable{}) fatalErr(t, err) _, err = UpdateStates(ctx, []GraphNode{r2}, func(states []NodeState) ([]NodeState, interface{}, error) { - new_state, err := LockResource(ctx, r1, r2, states[0]) + node_state := states[0].(LockHolderState) + err := LockLockable(ctx, r1, r2, node_state) fatalErr(t, err) - return []NodeState{new_state}, nil, nil + return []NodeState{node_state}, nil, nil }) fatalErr(t, err) _, err = UseStates(ctx, []GraphNode{r1}, func(states []NodeState) (interface{}, error) { - owner_id := states[0].(ResourceState).owner.ID() + owner_id := states[0].(LockableState).Owner().ID() if owner_id != r2.ID() { return nil, fmt.Errorf("r1 is owned by %s instead of r2", owner_id) } @@ -139,14 +144,15 @@ func TestResourceLockOther(t * testing.T) { fatalErr(t, err) _, err = UpdateStates(ctx, []GraphNode{r2}, func(states []NodeState) ([]NodeState, interface{}, error) { - new_state, err := UnlockResource(ctx, r1, r2, states[0]) + node_state := states[0].(LockHolderState) + err := UnlockLockable(ctx, r1, r2, node_state) fatalErr(t, err) - return []NodeState{new_state}, nil, nil + return []NodeState{node_state}, nil, nil }) fatalErr(t, err) _, err = UseStates(ctx, []GraphNode{r1}, func(states []NodeState) (interface{}, error) { - owner := states[0].(ResourceState).owner + owner := states[0].(LockableState).Owner() if owner != nil { return nil, fmt.Errorf("r1 is owned by %s instead of r2", owner.ID()) } @@ -157,30 +163,31 @@ func TestResourceLockOther(t * testing.T) { } -func TestResourceLockSimpleConflict(t * testing.T) { +func TestLockableLockSimpleConflict(t * testing.T) { ctx := testContext(t) - r1, err := NewResource(ctx, "Test resource 1", []Resource{}) + r1, err := NewLockable(ctx, "Test lockable 1", []Lockable{}) fatalErr(t, err) - r2, err := NewResource(ctx, "Test resource 2", []Resource{}) + r2, err := NewLockable(ctx, "Test lockable 2", []Lockable{}) fatalErr(t, err) - _, err = LockResource(ctx, r1, r1, nil) + err = LockLockable(ctx, r1, r1, nil) fatalErr(t, err) _, err = UpdateStates(ctx, []GraphNode{r2}, func(states []NodeState) ([]NodeState, interface{}, error) { - new_state, err := LockResource(ctx, r1, r2, states[0]) + node_state := states[0].(LockHolderState) + err := LockLockable(ctx, r1, r2, node_state) if err == nil { t.Fatal("r2 took r1's lock from itself") } - return []NodeState{new_state}, nil, nil + return []NodeState{node_state}, nil, nil }) fatalErr(t, err) _, err = UseStates(ctx, []GraphNode{r1}, func(states []NodeState) (interface{}, error) { - owner_id := states[0].(ResourceState).owner.ID() + owner_id := states[0].(LockableState).Owner().ID() if owner_id != r1.ID() { return nil, fmt.Errorf("r1 is owned by %s instead of r1", owner_id) } @@ -189,11 +196,11 @@ func TestResourceLockSimpleConflict(t * testing.T) { }) fatalErr(t, err) - _, err = UnlockResource(ctx, r1, r1, nil) + err = UnlockLockable(ctx, r1, r1, nil) fatalErr(t, err) _, err = UseStates(ctx, []GraphNode{r1}, func(states []NodeState) (interface{}, error) { - owner := states[0].(ResourceState).owner + owner := states[0].(LockableState).Owner() if owner != nil { return nil, fmt.Errorf("r1 is owned by %s instead of r1", owner.ID()) } @@ -204,22 +211,22 @@ func TestResourceLockSimpleConflict(t * testing.T) { } -func TestResourceLockTieredConflict(t * testing.T) { +func TestLockableLockTieredConflict(t * testing.T) { ctx := testContext(t) - r1, err := NewResource(ctx, "Test resource 1", []Resource{}) + r1, err := NewLockable(ctx, "Test lockable 1", []Lockable{}) fatalErr(t, err) - r2, err := NewResource(ctx, "Test resource 2", []Resource{r1}) + r2, err := NewLockable(ctx, "Test lockable 2", []Lockable{r1}) fatalErr(t, err) - r3, err := NewResource(ctx, "Test resource 3", []Resource{r1}) + r3, err := NewLockable(ctx, "Test lockable 3", []Lockable{r1}) fatalErr(t, err) - _, err = LockResource(ctx, r2, r2, nil) + err = LockLockable(ctx, r2, r2, nil) fatalErr(t, err) - _, err = LockResource(ctx, r3, r3, nil) + err = LockLockable(ctx, r3, r3, nil) if err == nil { t.Fatal("Locked r3 which depends on r1 while r2 which depends on r1 is already locked") } diff --git a/resource.go b/resource.go deleted file mode 100644 index e4f7585..0000000 --- a/resource.go +++ /dev/null @@ -1,355 +0,0 @@ -package graphvent - -import ( - "fmt" -) - -// Link a resource with a child -func LinkResource(ctx * GraphContext, resource Resource, child Resource) error { - if resource == nil || child == nil { - return fmt.Errorf("Will not connect nil to DAG") - } - _, err := UpdateStates(ctx, []GraphNode{resource, child}, func(states []NodeState) ([]NodeState, interface{}, error) { - resource_state := states[0].(ResourceState) - child_state := states[1].(ResourceState) - - if checkIfChild(ctx, resource_state, resource.ID(), child_state, child.ID()) == true { - return nil, nil, fmt.Errorf("RESOURCE_LINK_ERR: %s is a parent of %s so cannot link as child", child.ID(), resource.ID()) - } - - resource_state.children = append(resource_state.children, child) - child_state.parents = append(child_state.parents, resource) - return []NodeState{resource_state, child_state}, nil, nil - }) - return err -} - -// Link multiple children to a resource -func LinkResources(ctx * GraphContext, resource Resource, children []Resource) error { - if resource == nil || children == nil { - return fmt.Errorf("Invalid input") - } - - found := map[NodeID]bool{} - child_nodes := make([]GraphNode, len(children)) - for i, child := range(children) { - if child == nil { - return fmt.Errorf("Will not connect nil to DAG") - } - _, exists := found[child.ID()] - if exists == true { - return fmt.Errorf("Will not connect the same child twice") - } - found[child.ID()] = true - child_nodes[i] = child - } - - _, err := UpdateStates(ctx, append([]GraphNode{resource}, child_nodes...), func(states []NodeState) ([]NodeState, interface{}, error) { - resource_state := states[0].(ResourceState) - - new_states := make([]ResourceState, len(states)) - for i, state := range(states) { - new_states[i] = state.(ResourceState) - } - - for i, state := range(states[1:]) { - child_state := state.(ResourceState) - - if checkIfChild(ctx, resource_state, resource.ID(), child_state, children[i].ID()) == true { - return nil, nil, fmt.Errorf("RESOURCES_LINK_ERR: %s is a parent of %s so cannot link as child", children[i].ID() , resource.ID()) - } - - new_states[0].children = append(new_states[0].children, children[i]) - new_states[i+1].parents = append(new_states[i+1].parents, resource) - } - ret_states := make([]NodeState, len(states)) - for i, state := range(new_states) { - ret_states[i] = state - } - return ret_states, nil, nil - }) - - return err -} - -type ResourceState struct { - name string - owner GraphNode - children []Resource - parents []Resource -} - -func (state ResourceState) Serialize() []byte { - return []byte(state.name) -} - -// Locks cannot be passed between resources, so the answer to -// "who used to own this lock held by a resource" is always "nobody" -func (state ResourceState) OriginalLockHolder(id NodeID) GraphNode { - return nil -} - -// Nothing can take a lock from a resource -func (state ResourceState) AllowedToTakeLock(id NodeID) bool { - return false -} - -func (state ResourceState) RecordLockHolder(id NodeID, lock_holder GraphNode) NodeState { - if lock_holder != nil { - panic("Attempted to delegate a lock to a resource") - } - - return state -} - -func NewResourceState(name string) ResourceState { - return ResourceState{ - name: name, - owner: nil, - children: []Resource{}, - parents: []Resource{}, - } -} - -// Resource represents a Node which can be locked by another node, -// and needs to own all it's childrens locks before being locked. -// Resource connections form a directed acyclic graph -// Resources do not allow any other nodes to take locks from them -type Resource interface { - GraphNode - - // Called when locking the node to allow for custom lock behaviour - Lock(node GraphNode, state NodeState) (NodeState, error) - // Called when unlocking the node to allow for custom lock behaviour - Unlock(node GraphNode, state NodeState) (NodeState, error) -} - -// Resources propagate update up to multiple parents, and not downwards -// (subscriber to team won't get update to alliance, but subscriber to alliance will get update to team) -func (resource * BaseResource) PropagateUpdate(ctx * GraphContext, signal GraphSignal) { - UseStates(ctx, []GraphNode{resource}, func(states []NodeState) (interface{}, error){ - resource_state := states[0].(ResourceState) - if signal.Direction() == Up { - // Child->Parent, resource updates parent resources - for _, parent := range resource_state.parents { - SendUpdate(ctx, parent, signal) - } - } else if signal.Direction() == Down { - // Parent->Child, resource updates lock holder - if resource_state.owner != nil { - SendUpdate(ctx, resource_state.owner, signal) - } - - for _, child := range(resource_state.children) { - SendUpdate(ctx, child, signal) - } - } else if signal.Direction() == Direct { - } else { - panic(fmt.Sprintf("Invalid signal direction: %d", signal.Direction())) - } - return nil, nil - }) -} - -func checkIfChild(ctx * GraphContext, r ResourceState, r_id NodeID, cur ResourceState, cur_id NodeID) bool { - if r_id == cur_id { - return true - } - - for _, c := range(cur.children) { - val, _ := UseStates(ctx, []GraphNode{c}, func(states []NodeState) (interface{}, error) { - child_state := states[0].(ResourceState) - return checkIfChild(ctx, cur, cur_id, child_state, c.ID()), nil - }) - - is_child := val.(bool) - if is_child { - return true - } - } - - return false -} - -func UnlockResource(ctx * GraphContext, resource Resource, node GraphNode, node_state NodeState) (NodeState, error) { - if node == nil || resource == nil{ - panic("Cannot unlock without a specified node and resource") - } - _, err := UpdateStates(ctx, []GraphNode{resource}, func(states []NodeState) ([]NodeState, interface{}, error) { - if resource.ID() == node.ID() { - if node_state != nil { - panic("node_state must be nil if unlocking resource from itself") - } - node_state = states[0] - } - resource_state := states[0].(ResourceState) - - if resource_state.owner == nil { - return nil, nil, fmt.Errorf("Resource already unlocked") - } - - if resource_state.owner.ID() != node.ID() { - return nil, nil, fmt.Errorf("Resource %s not locked by %s", resource.ID(), node.ID()) - } - - var lock_err error = nil - for _, child := range(resource_state.children) { - var err error = nil - node_state, err = UnlockResource(ctx, child, node, node_state) - if err != nil { - lock_err = err - break - } - } - - if lock_err != nil { - return nil, nil, fmt.Errorf("Resource %s failed to unlock: %e", resource.ID(), lock_err) - } - - resource_state.owner = node_state.OriginalLockHolder(resource.ID()) - unlock_state, err := resource.Unlock(node, resource_state) - resource_state = unlock_state.(ResourceState) - if err != nil { - return nil, nil, fmt.Errorf("Resource %s failed custom Unlock: %e", resource.ID(), err) - } - - if resource_state.owner == nil { - ctx.Log.Logf("resource", "RESOURCE_UNLOCK: %s unlocked %s", node.ID(), resource.ID()) - } else { - ctx.Log.Logf("resource", "RESOURCE_UNLOCK: %s passed lock of %s back to %s", node.ID(), resource.ID(), resource_state.owner.ID()) - } - - return []NodeState{resource_state}, nil, nil - }) - - if err != nil { - return nil, err - } - - return node_state, nil -} - -// TODO: State -func LockResource(ctx * GraphContext, resource Resource, node GraphNode, node_state NodeState) (NodeState, error) { - if node == nil || resource == nil { - panic("Cannot lock without a specified node and resource") - } - - _, err := UpdateStates(ctx, []GraphNode{resource}, func(states []NodeState) ([]NodeState, interface{}, error) { - if resource.ID() == node.ID() { - if node_state != nil { - panic("node_state must be nil if locking resource from itself") - } - node_state = states[0] - } - resource_state := states[0].(ResourceState) - if resource_state.owner != nil { - var lock_pass_allowed bool = false - - if resource_state.owner.ID() == resource.ID() { - lock_pass_allowed = resource_state.AllowedToTakeLock(node.ID()) - } else { - tmp, _ := UseStates(ctx, []GraphNode{resource_state.owner}, func(states []NodeState)(interface{}, error){ - return states[0].AllowedToTakeLock(node.ID()), nil - }) - lock_pass_allowed = tmp.(bool) - } - - - if lock_pass_allowed == false { - return nil, nil, fmt.Errorf("%s is not allowed to take a lock from %s", node.ID(), resource_state.owner.ID()) - } - } - - lock_state, err := resource.Lock(node, resource_state) - if err != nil { - return nil, nil, fmt.Errorf("Failed to lock resource: %e", err) - } - - resource_state = lock_state.(ResourceState) - - var lock_err error = nil - locked_resources := []Resource{} - for _, child := range(resource_state.children) { - node_state, err = LockResource(ctx, child, node, node_state) - if err != nil { - lock_err = err - break - } - locked_resources = append(locked_resources, child) - } - - if lock_err != nil { - for _, locked_resource := range(locked_resources) { - node_state, err = UnlockResource(ctx, locked_resource, node, node_state) - if err != nil { - panic(err) - } - } - return nil, nil, fmt.Errorf("Resource failed to lock: %e", lock_err) - } - - old_owner := resource_state.owner - resource_state.owner = node - node_state = node_state.RecordLockHolder(node.ID(), old_owner) - - if old_owner == nil { - ctx.Log.Logf("resource", "RESOURCE_LOCK: %s locked %s", node.ID(), resource.ID()) - } else { - ctx.Log.Logf("resource", "RESOURCE_LOCK: %s took lock of %s from %s", node.ID(), resource.ID(), old_owner.ID()) - } - - return []NodeState{resource_state}, nil, nil - }) - if err != nil { - return nil, err - } - - return node_state, nil -} - -// BaseResources represent simple resources in the DAG that can be used to create a hierarchy of locks that store names -type BaseResource struct { - BaseNode -} - -//BaseResources don't check anything special when locking/unlocking -func (resource * BaseResource) Lock(node GraphNode, state NodeState) (NodeState, error) { - return state, nil -} - -func (resource * BaseResource) Unlock(node GraphNode, state NodeState) (NodeState, error) { - return state, nil -} - -/*func FindResource(root Event, id string) Resource { - if root == nil || id == ""{ - panic("invalid input") - } - - for _, resource := range(root.Resources()) { - if resource.ID() == id { - return resource - } - } - for _, child := range(root.Children()) { - resource := FindResource(child, id) - if resource != nil { - return resource - } - } - return nil -}*/ - -func NewResource(ctx * GraphContext, name string, children []Resource) (* BaseResource, error) { - resource := &BaseResource{ - BaseNode: NewNode(ctx, RandID(), NewResourceState(name)), - } - - err := LinkResources(ctx, resource, children) - if err != nil { - return nil, err - } - - return resource, nil -} diff --git a/thread.go b/thread.go new file mode 100644 index 0000000..602d530 --- /dev/null +++ b/thread.go @@ -0,0 +1,595 @@ +package graphvent + +import ( + "fmt" + "time" + "errors" + "reflect" + "sync" + "encoding/json" +) + +// Update the threads listeners, and notify the parent to do the same +func (thread * BaseThread) PropagateUpdate(ctx * GraphContext, signal GraphSignal) { + UseStates(ctx, []GraphNode{thread}, func(states []NodeState) (interface{}, error) { + thread_state := states[0].(ThreadState) + if signal.Direction() == Up { + // Child->Parent, thread updates parent and connected resources + if thread_state.Parent() != nil { + SendUpdate(ctx, thread_state.Parent(), signal) + } + + for _, resource := range(thread_state.Lockables()) { + SendUpdate(ctx, resource, signal) + } + } else if signal.Direction() == Down { + // Parent->Child, thread updated children + for _, child := range(thread_state.Children()) { + SendUpdate(ctx, child, signal) + } + } else if signal.Direction() == Direct { + + } else { + panic(fmt.Sprintf("Invalid signal direction: %d", signal.Direction())) + } + + return nil, nil + }) + thread.signal <- signal +} + +/*func FindLockable(root Thread, id string) Lockable { + if root == nil || id == ""{ + panic("invalid input") + } + + for _, resource := range(root.Lockables()) { + if resource.ID() == id { + return resource + } + } + for _, child := range(root.Children()) { + resource := FindLockable(child, id) + if resource != nil { + return resource + } + } + return nil +}*/ + +type ThreadInfo interface { +} + +// An Thread is a lockable that has an additional parent->child relationship with other Threads +// This relationship allows the thread tree to be modified independent of the lockable state +type ThreadState interface { + LockHolderState + LockableState + + Parent() Thread + SetParent(parent Thread) + Children() []Thread + ChildInfo(child NodeID) ThreadInfo + AddChild(child Thread, info ThreadInfo) error + Lockables() []Lockable + AddLockable(resource Lockable) error +} + +type BaseThreadState struct { + BaseLockableState + parent Thread + children []Thread + child_info map[NodeID] ThreadInfo + resources map[NodeID]Lockable + delegation_map map[NodeID]GraphNode + info_type reflect.Type +} + +func (state * BaseThreadState) MarshalJSON() ([]byte, error) { + children := map[NodeID]interface{}{} + for _, child := range(state.children) { + children[child.ID()] = state.child_info[child.ID()] + } + + var parent_id *NodeID = nil + if state.parent != nil { + new_str := state.parent.ID() + parent_id = &new_str + } + + resources := map[NodeID]*NodeID{} + for _, resource := range(state.resources) { + original_owner := state.delegation_map[resource.ID()] + if original_owner != nil { + owner := original_owner.ID() + resources[resource.ID()] = &owner + } else { + resources[resource.ID()] = nil + } + } + + return json.Marshal(&struct{ + Name string `json:"name"` + Parent *NodeID `json:"parent"` + Children map[NodeID]interface{} `json:"children"` + Lockables map[NodeID]*NodeID `json:"resources"` + }{ + Name: state.Name(), + Parent: parent_id, + Children: children, + Lockables: resources, + }) +} + +func (state * BaseThreadState) Parent() Thread { + return state.parent +} + +func (state * BaseThreadState) SetParent(parent Thread) { + state.parent = parent +} + +func (state * BaseThreadState) Children() []Thread { + return state.children +} + +func (state * BaseThreadState) ChildInfo(child NodeID) ThreadInfo { + return state.child_info[child] +} + +func (state * BaseThreadState) AddChild(child Thread, info ThreadInfo) error { + if child == nil { + return fmt.Errorf("Will not connect nil to the thread tree") + } + + _, exists := state.child_info[child.ID()] + if exists == true { + return fmt.Errorf("Will not connect the same child twice") + } + + if info == nil && state.info_type != nil { + return fmt.Errorf("nil info passed when expecting info") + } else if info != nil { + if reflect.TypeOf(info) != state.info_type { + return fmt.Errorf("info type mismatch, expecting %+v", state.info_type) + } + } + + state.children = append(state.children, child) + state.child_info[child.ID()] = info + + return nil +} + +func checkIfChild(ctx * GraphContext, thread_state ThreadState, thread_id NodeID, cur_state ThreadState, cur_id NodeID) bool { + for _, child := range(cur_state.Children()) { + if child.ID() == thread_id { + return true + } + val, _ := UseStates(ctx, []GraphNode{child}, func(states []NodeState) (interface{}, error) { + child_state := states[0].(ThreadState) + return checkIfRequirement(ctx, cur_state, cur_id, child_state, child.ID()), nil + }) + + is_child := val.(bool) + if is_child { + return true + } + } + + return false +} + +func LinkThreads(ctx * GraphContext, thread Thread, child Thread, info ThreadInfo) error { + if ctx == nil || thread == nil || child == nil { + return fmt.Errorf("invalid input") + } + + if thread.ID() == child.ID() { + return fmt.Errorf("Will not link %s as a child of itself", thread.ID()) + } + + + _, err := UpdateStates(ctx, []GraphNode{thread, child}, func(states []NodeState) ([]NodeState, interface{}, error) { + thread_state := states[0].(ThreadState) + child_state := states[1].(ThreadState) + + if child_state.Parent() != nil { + return nil, nil, fmt.Errorf("EVENT_LINK_ERR: %s already has a parent, cannot link as child", child.ID()) + } + + if checkIfChild(ctx, thread_state, thread.ID(), child_state, child.ID()) == true { + return nil, nil, fmt.Errorf("EVENT_LINK_ERR: %s is a child of %s so cannot add as parent", thread.ID(), child.ID()) + } + + if checkIfChild(ctx, child_state, child.ID(), thread_state, thread.ID()) == true { + return nil, nil, fmt.Errorf("EVENT_LINK_ERR: %s is already a parent of %s so will not add again", thread.ID(), child.ID()) + } + + err := thread_state.AddChild(child, info) + if err != nil { + return nil, nil, fmt.Errorf("EVENT_LINK_ERR: error adding %s as child to %s: %e", child.ID(), thread.ID(), err) + } + child_state.SetParent(thread) + + return states, nil, nil + }) + + if err != nil { + return err + } + + return nil +} + +// Threads allow locks to pass to their requirements, but they won't allow cycles +func (state * BaseThreadState) OriginalLockHolder(id NodeID) GraphNode { + node, exists := state.delegation_map[id] + if exists == false { + panic("Attempted to take a get the original lock holder of a resource we don't own") + } + delete(state.delegation_map, id) + return node +} + +func (state * BaseThreadState) AllowedToTakeLock(id NodeID) bool { + return false +} + +func (state * BaseThreadState) RecordLockHolder(id NodeID, lock_holder GraphNode) { + _, exists := state.delegation_map[id] + if exists == true { + panic("Attempted to lock a resource we're already holding(lock cycle)") + } + + state.delegation_map[id] = lock_holder +} + +// Thread is the interface that thread tree nodes must implement +type Thread interface { + GraphNode + + Action(action string) (func(* GraphContext)(string, error), bool) + Handler(signal_type string) (func(* GraphContext, GraphSignal) (string, error), bool) + + SetTimeout(end_time time.Time, action string) + ClearTimeout() + Timeout() <-chan time.Time + TimeoutAction() string +} + +func (thread * BaseThread) TimeoutAction() string { + return thread.timeout_action +} + +func (thread * BaseThread) Timeout() <-chan time.Time { + return thread.timeout +} + +func (thread * BaseThread) ClearTimeout() { + thread.timeout_action = "" + thread.timeout = nil +} + +func (thread * BaseThread) SetTimeout(end_time time.Time, action string) { + thread.timeout_action = action + thread.timeout = time.After(time.Until(end_time)) +} + +func (thread * BaseThread) Handler(signal_type string) (func(* GraphContext, GraphSignal)(string, error), bool) { + handler, exists := thread.Handlers[signal_type] + return handler, exists +} + +func FindChild(ctx * GraphContext, thread Thread, thread_state ThreadState, id NodeID) Thread { + if thread == nil { + panic("cannot recurse through nil") + } + if id == thread.ID() { + return thread + } + + + for _, child := range thread_state.Children() { + res, _ := UseStates(ctx, []GraphNode{child}, func(states []NodeState) (interface{}, error) { + child_state := states[0].(ThreadState) + result := FindChild(ctx, child, child_state, id) + return result, nil + }) + result := res.(Thread) + if result != nil { + return result + } + } + + return nil +} + +func RunThread(ctx * GraphContext, thread Thread) error { + ctx.Log.Logf("thread", "EVENT_RUN: %s", thread.ID()) + + _, err := UseStates(ctx, []GraphNode{thread}, func(states []NodeState) (interface{}, error) { + thread_state := states[0].(ThreadState) + if thread_state.Owner() == nil { + return nil, fmt.Errorf("EVENT_RUN_NOT_LOCKED: %s", thread_state.Name()) + } else if thread_state.Owner().ID() != thread.ID() { + return nil, fmt.Errorf("EVENT_RUN_RESOURCE_ALREADY_LOCKED: %s, %s", thread_state.Name(), thread_state.Owner().ID()) + } + return nil, nil + }) + + SendUpdate(ctx, thread, NewSignal(thread, "thread_start")) + + next_action := "start" + for next_action != "" { + action, exists := thread.Action(next_action) + if exists == false { + error_str := fmt.Sprintf("%s is not a valid action", next_action) + return errors.New(error_str) + } + + ctx.Log.Logf("thread", "EVENT_ACTION: %s - %s", thread.ID(), next_action) + next_action, err = action(ctx) + if err != nil { + return err + } + } + + SendUpdate(ctx, thread, NewSignal(thread, "thread_done")) + + ctx.Log.Logf("thread", "EVENT_RUN_DONE: %s", thread.ID()) + + return nil +} + +func ThreadAbort(thread Thread) func(*GraphContext, GraphSignal) (string, error) { + return func(ctx * GraphContext, signal GraphSignal) (string, error) { + return "", errors.New(fmt.Sprintf("%s aborted by signal", thread.ID())) + } +} + +func ThreadCancel(thread Thread) func(*GraphContext, GraphSignal) (string, error) { + return func(ctx * GraphContext, signal GraphSignal) (string, error) { + return "", nil + } +} + +// Thread is the most basic thread that can exist in the thread tree. +// On start it automatically transitions to completion. +// This node by itself doesn't implement any special behaviours for children, so they will be ignored. +// When started, this thread automatically transitions to completion +type BaseThread struct { + BaseNode + + resources_lock sync.Mutex + children_lock sync.Mutex + info_lock sync.Mutex + parent_lock sync.Mutex + + Actions map[string]func(* GraphContext) (string, error) + Handlers map[string]func(* GraphContext, GraphSignal) (string, error) + + timeout <-chan time.Time + timeout_action string +} + +func (thread * BaseThread) Lock(node GraphNode, state LockableState) error { + return nil +} + +func (thread * BaseThread) Unlock(node GraphNode, state LockableState) error { + return nil +} + +func (thread * BaseThread) Action(action string) (func(ctx * GraphContext) (string, error), bool) { + action_fn, exists := thread.Actions[action] + return action_fn, exists +} + +func ThreadWait(thread Thread) (func(*GraphContext) (string, error)) { + return func(ctx * GraphContext) (string, error) { + ctx.Log.Logf("thread", "EVENT_WAIT: %s TIMEOUT: %+v", thread.ID(), thread.Timeout()) + select { + case signal := <- thread.SignalChannel(): + ctx.Log.Logf("thread", "EVENT_SIGNAL: %s %+v", thread.ID(), signal) + signal_fn, exists := thread.Handler(signal.Type()) + if exists == true { + ctx.Log.Logf("thread", "EVENT_HANDLER: %s - %s", thread.ID(), signal.Type()) + return signal_fn(ctx, signal) + } + return "wait", nil + case <- thread.Timeout(): + ctx.Log.Logf("thread", "EVENT_TIMEOUT %s - NEXT_STATE: %s", thread.ID(), thread.TimeoutAction()) + return thread.TimeoutAction(), nil + } + } +} + +func NewBaseThread(ctx * GraphContext, name string) (BaseThread, error) { + state := NewBaseThreadState(name) + thread := BaseThread{ + BaseNode: NewNode(ctx, RandID(), &state), + Actions: map[string]func(*GraphContext)(string, error){}, + Handlers: map[string]func(*GraphContext,GraphSignal)(string, error){}, + timeout: nil, + timeout_action: "", + } + return thread, nil +} + +func NewBaseThreadState(name string) BaseThreadState { + return BaseThreadState{ + BaseLockableState: NewLockableState(name), + delegation_map: map[NodeID]GraphNode{}, + children: []Thread{}, + child_info: map[NodeID]ThreadInfo{}, + resources: map[NodeID]Lockable{}, + parent: nil, + } +} + +func NewThread(ctx * GraphContext, name string, requirements []Lockable) (* BaseThread, error) { + thread, err := NewBaseThread(ctx, name) + if err != nil { + return nil, err + } + + thread_ptr := &thread + + for _, requirement := range(requirements) { + err := LinkLockables(ctx, thread_ptr, requirement) + if err != nil { + return nil, err + } + } + + thread_ptr.Actions["wait"] = ThreadWait(thread_ptr) + thread_ptr.Handlers["abort"] = ThreadAbort(thread_ptr) + thread_ptr.Handlers["cancel"] = ThreadCancel(thread_ptr) + + thread_ptr.Actions["start"] = func(ctx * GraphContext) (string, error) { + return "", nil + } + + return thread_ptr, nil +} + +/* +// ThreadQueue is a basic thread that can have children. +// On start, it attempts to start it's children from the highest 'priority' +type ThreadQueueInfo struct { + priority int + state string +} + +func (info * BaseThreadQueueInfo) MarshalJSON() ([]byte, error) { + return json.Marshal(&struct{ + Priority int `json:"priority"` + State string `json:"state"` + }{ + Priority: info.priority, + State: info.state, + }) +} + +func NewThreadQueueInfo(priority int) * BaseThreadQueueInfo { + info := &ThreadQueueInfo{ + priority: priority, + state: "queued", + } + + return info +} + +type ThreadQueue struct { + Thread + listened_resources map[string]Lockable + queue_lock sync.Mutex +} + +func (queue * BaseThreadQueue) Unlock() error { + for _, resource := range(queue.listened_resources) { + resource.UnregisterChannel(queue.signal) + } + return nil +} + +func (queue * BaseThreadQueue) InfoType() reflect.Type { + return reflect.TypeOf((*ThreadQueueInfo)(nil)) +} + +func NewThreadQueue(name string, description string, resources []Lockable) (* BaseThreadQueue, error) { + queue := &ThreadQueue{ + Thread: NewThread(name, description), + listened_resources: map[string]Lockable{}, + } + + queue.state = NewBaseThreadState(name, description) + + AddLockables(queue, resources) + + queue.Actions["wait"] = ThreadWait(queue) + queue.Handlers["abort"] = ThreadAbort(queue) + queue.Handlers["cancel"] = ThreadCancel(queue) + + queue.Actions["start"] = func() (string, error) { + return "queue_thread", nil + } + + queue.Actions["queue_thread"] = func() (string, error) { + // Copy the threads to sort the list + queue.LockChildren() + copied_threads := make([]Thread, len(queue.Children())) + copy(copied_threads, queue.Children()) + less := func(i int, j int) bool { + info_i := queue.ChildInfo(copied_threads[i]).(*ThreadQueueInfo) + info_j := queue.ChildInfo(copied_threads[j]).(*ThreadQueueInfo) + return info_i.priority < info_j.priority + } + sort.SliceStable(copied_threads, less) + + needed_resources := map[string]Lockable{} + for _, thread := range(copied_threads) { + // make sure all the required resources are registered to update the thread + for _, resource := range(thread.Lockables()) { + needed_resources[resource.ID()] = resource + } + + info := queue.ChildInfo(thread).(*ThreadQueueInfo) + thread.LockInfo() + defer thread.UnlockInfo() + if info.state == "queued" { + err := LockLockable(thread) + // start in new goroutine + if err != nil { + } else { + info.state = "running" + Log.Logf("thread", "EVENT_START: %s", thread.Name()) + go func(thread Thread, info * BaseThreadQueueInfo, queue Thread) { + Log.Logf("thread", "EVENT_GOROUTINE: %s", thread.Name()) + err := RunThread(thread) + if err != nil { + Log.Logf("thread", "EVENT_ERROR: %s", err) + } + thread.LockInfo() + defer thread.UnlockInfo() + info.state = "done" + }(thread, info, queue) + } + } + } + + + for _, resource := range(needed_resources) { + _, exists := queue.listened_resources[resource.ID()] + if exists == false { + Log.Logf("thread", "REGISTER_RESOURCE: %s - %s", queue.Name(), resource.Name()) + queue.listened_resources[resource.ID()] = resource + resource.RegisterChannel(queue.signal) + } + } + + queue.UnlockChildren() + + return "wait", nil + } + + queue.Handlers["resource_connected"] = func(signal GraphSignal) (string, error) { + return "queue_thread", nil + } + + queue.Handlers["child_added"] = func(signal GraphSignal) (string, error) { + return "queue_thread", nil + } + + queue.Handlers["lock_changed"] = func(signal GraphSignal) (string, error) { + return "queue_thread", nil + } + + queue.Handlers["thread_done"] = func(signal GraphSignal) (string, error) { + return "queue_thread", nil + } + + return queue, nil +} +*/