Renamed resource to lockable and event to thread
parent
2de5276ecc
commit
30971f00bd
@ -0,0 +1,363 @@
|
||||
package graphvent
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"encoding/json"
|
||||
)
|
||||
|
||||
// Any struct that wants to hold a lock must implement this interface
|
||||
type LockHolderState interface {
|
||||
OriginalLockHolder(id NodeID) GraphNode
|
||||
AllowedToTakeLock(id NodeID) bool
|
||||
RecordLockHolder(id NodeID, lock_holder GraphNode)
|
||||
}
|
||||
|
||||
// Any node that wants to be connected to the lockable DAG must implement this interface
|
||||
type LockableState interface {
|
||||
LockHolderState
|
||||
Name() string
|
||||
Requirements() []Lockable
|
||||
AddRequirement(requirement Lockable)
|
||||
Dependencies() []Lockable
|
||||
AddDependency(dependency Lockable)
|
||||
Owner() GraphNode
|
||||
SetOwner(owner GraphNode)
|
||||
}
|
||||
|
||||
type BaseLockableState struct {
|
||||
name string
|
||||
owner GraphNode
|
||||
requirements []Lockable
|
||||
dependencies []Lockable
|
||||
}
|
||||
|
||||
func (state * BaseLockableState) MarshalJSON() ([]byte, error) {
|
||||
requirement_ids := make([]NodeID, len(state.requirements))
|
||||
for i, requirement := range(state.requirements) {
|
||||
requirement_ids[i] = requirement.ID()
|
||||
}
|
||||
|
||||
dependency_ids := make([]NodeID, len(state.dependencies))
|
||||
for i, dependency := range(state.dependencies) {
|
||||
dependency_ids[i] = dependency.ID()
|
||||
}
|
||||
|
||||
var owner_id *NodeID = nil
|
||||
if state.owner != nil {
|
||||
new_str := state.owner.ID()
|
||||
owner_id = &new_str
|
||||
}
|
||||
|
||||
return json.Marshal(&struct{
|
||||
Name string `json:"name"`
|
||||
Owner *NodeID `json:"owner"`
|
||||
Dependencies []NodeID `json:"dependencies"`
|
||||
Requirements []NodeID `json:"requirements"`
|
||||
}{
|
||||
Name: state.name,
|
||||
Owner: owner_id,
|
||||
Dependencies: dependency_ids,
|
||||
Requirements: requirement_ids,
|
||||
})
|
||||
}
|
||||
|
||||
func (state * BaseLockableState) Name() string {
|
||||
return state.name
|
||||
}
|
||||
|
||||
// Locks cannot be passed between base lockables, so the answer to
|
||||
// "who used to own this lock held by a base lockable" is always "nobody"
|
||||
func (state * BaseLockableState) OriginalLockHolder(id NodeID) GraphNode {
|
||||
return nil
|
||||
}
|
||||
|
||||
// Nothing can take a lock from a base lockable either
|
||||
func (state * BaseLockableState) AllowedToTakeLock(id NodeID) bool {
|
||||
return false
|
||||
}
|
||||
|
||||
func (state * BaseLockableState) RecordLockHolder(id NodeID, lock_holder GraphNode) {
|
||||
if lock_holder != nil {
|
||||
panic("Attempted to delegate a lock to a lockable")
|
||||
}
|
||||
}
|
||||
|
||||
func (state * BaseLockableState) Owner() GraphNode {
|
||||
return state.owner
|
||||
}
|
||||
|
||||
func (state * BaseLockableState) SetOwner(owner GraphNode) {
|
||||
state.owner = owner
|
||||
}
|
||||
|
||||
func (state * BaseLockableState) Requirements() []Lockable {
|
||||
return state.requirements
|
||||
}
|
||||
|
||||
func (state * BaseLockableState) AddRequirement(requirement Lockable) {
|
||||
if requirement == nil {
|
||||
panic("Will not connect nil to the DAG")
|
||||
}
|
||||
state.requirements = append(state.requirements, requirement)
|
||||
}
|
||||
|
||||
func (state * BaseLockableState) Dependencies() []Lockable {
|
||||
return state.dependencies
|
||||
}
|
||||
|
||||
func (state * BaseLockableState) AddDependency(dependency Lockable) {
|
||||
if dependency == nil {
|
||||
panic("Will not connect nil to the DAG")
|
||||
}
|
||||
|
||||
state.dependencies = append(state.dependencies, dependency)
|
||||
}
|
||||
|
||||
func NewLockableState(name string) BaseLockableState {
|
||||
return BaseLockableState{
|
||||
name: name,
|
||||
owner: nil,
|
||||
requirements: []Lockable{},
|
||||
dependencies: []Lockable{},
|
||||
}
|
||||
}
|
||||
|
||||
// Link a lockable with a requirement
|
||||
func LinkLockables(ctx * GraphContext, lockable Lockable, requirement Lockable) error {
|
||||
if lockable == nil || requirement == nil {
|
||||
return fmt.Errorf("Will not connect nil to DAG")
|
||||
}
|
||||
|
||||
if lockable.ID() == requirement.ID() {
|
||||
return fmt.Errorf("Will not link %s as requirement of itself", lockable.ID())
|
||||
}
|
||||
|
||||
_, err := UpdateStates(ctx, []GraphNode{lockable, requirement}, func(states []NodeState) ([]NodeState, interface{}, error) {
|
||||
lockable_state := states[0].(LockableState)
|
||||
requirement_state := states[1].(LockableState)
|
||||
|
||||
if checkIfRequirement(ctx, lockable_state, lockable.ID(), requirement_state, requirement.ID()) == true {
|
||||
return nil, nil, fmt.Errorf("LOCKABLE_LINK_ERR: %s is a dependency of %s so cannot link as requirement", requirement.ID(), lockable.ID())
|
||||
}
|
||||
|
||||
if checkIfRequirement(ctx, requirement_state, requirement.ID(), lockable_state, lockable.ID()) == true {
|
||||
return nil, nil, fmt.Errorf("LOCKABLE_LINK_ERR: %s is a dependency of %s so cannot link as dependency again", lockable.ID(), requirement.ID())
|
||||
}
|
||||
|
||||
lockable_state.AddRequirement(requirement)
|
||||
requirement_state.AddDependency(lockable)
|
||||
return []NodeState{lockable_state, requirement_state}, nil, nil
|
||||
})
|
||||
return err
|
||||
}
|
||||
|
||||
type Lockable interface {
|
||||
GraphNode
|
||||
// Called when locking the node to allow for custom lock behaviour
|
||||
Lock(node GraphNode, state LockableState) error
|
||||
// Called when unlocking the node to allow for custom lock behaviour
|
||||
Unlock(node GraphNode, state LockableState) error
|
||||
}
|
||||
|
||||
// Lockables propagate update up to multiple dependencies, and not downwards
|
||||
// (subscriber to team won't get update to alliance, but subscriber to alliance will get update to team)
|
||||
func (lockable * BaseLockable) PropagateUpdate(ctx * GraphContext, signal GraphSignal) {
|
||||
UseStates(ctx, []GraphNode{lockable}, func(states []NodeState) (interface{}, error){
|
||||
lockable_state := states[0].(LockableState)
|
||||
if signal.Direction() == Up {
|
||||
// Child->Parent, lockable updates dependency lockables
|
||||
for _, dependency := range lockable_state.Dependencies() {
|
||||
SendUpdate(ctx, dependency, signal)
|
||||
}
|
||||
} else if signal.Direction() == Down {
|
||||
// Parent->Child, lockable updates lock holder
|
||||
if lockable_state.Owner() != nil {
|
||||
SendUpdate(ctx, lockable_state.Owner(), signal)
|
||||
}
|
||||
|
||||
for _, requirement := range(lockable_state.Requirements()) {
|
||||
SendUpdate(ctx, requirement, signal)
|
||||
}
|
||||
} else if signal.Direction() == Direct {
|
||||
} else {
|
||||
panic(fmt.Sprintf("Invalid signal direction: %d", signal.Direction()))
|
||||
}
|
||||
return nil, nil
|
||||
})
|
||||
}
|
||||
|
||||
func checkIfRequirement(ctx * GraphContext, r LockableState, r_id NodeID, cur LockableState, cur_id NodeID) bool {
|
||||
for _, c := range(cur.Requirements()) {
|
||||
if c.ID() == r_id {
|
||||
return true
|
||||
}
|
||||
val, _ := UseStates(ctx, []GraphNode{c}, func(states []NodeState) (interface{}, error) {
|
||||
requirement_state := states[0].(LockableState)
|
||||
return checkIfRequirement(ctx, cur, cur_id, requirement_state, c.ID()), nil
|
||||
})
|
||||
|
||||
is_requirement := val.(bool)
|
||||
if is_requirement {
|
||||
return true
|
||||
}
|
||||
}
|
||||
|
||||
return false
|
||||
}
|
||||
|
||||
func UnlockLockable(ctx * GraphContext, lockable Lockable, node GraphNode, node_state LockHolderState) error {
|
||||
if node == nil || lockable == nil{
|
||||
panic("Cannot unlock without a specified node and lockable")
|
||||
}
|
||||
_, err := UpdateStates(ctx, []GraphNode{lockable}, func(states []NodeState) ([]NodeState, interface{}, error) {
|
||||
if lockable.ID() == node.ID() {
|
||||
if node_state != nil {
|
||||
panic("node_state must be nil if unlocking lockable from itself")
|
||||
}
|
||||
node_state = states[0].(LockHolderState)
|
||||
}
|
||||
lockable_state := states[0].(LockableState)
|
||||
|
||||
if lockable_state.Owner() == nil {
|
||||
return nil, nil, fmt.Errorf("Lockable already unlocked")
|
||||
}
|
||||
|
||||
if lockable_state.Owner().ID() != node.ID() {
|
||||
return nil, nil, fmt.Errorf("Lockable %s not locked by %s", lockable.ID(), node.ID())
|
||||
}
|
||||
|
||||
var lock_err error = nil
|
||||
for _, requirement := range(lockable_state.Requirements()) {
|
||||
var err error = nil
|
||||
err = UnlockLockable(ctx, requirement, node, node_state)
|
||||
if err != nil {
|
||||
lock_err = err
|
||||
break
|
||||
}
|
||||
}
|
||||
|
||||
if lock_err != nil {
|
||||
return nil, nil, fmt.Errorf("Lockable %s failed to unlock: %e", lockable.ID(), lock_err)
|
||||
}
|
||||
|
||||
new_owner := node_state.OriginalLockHolder(lockable.ID())
|
||||
lockable_state.SetOwner(new_owner)
|
||||
err := lockable.Unlock(node, lockable_state)
|
||||
if err != nil {
|
||||
return nil, nil, fmt.Errorf("Lockable %s failed custom Unlock: %e", lockable.ID(), err)
|
||||
}
|
||||
|
||||
if lockable_state.Owner() == nil {
|
||||
ctx.Log.Logf("lockable", "LOCKABLE_UNLOCK: %s unlocked %s", node.ID(), lockable.ID())
|
||||
} else {
|
||||
ctx.Log.Logf("lockable", "LOCKABLE_UNLOCK: %s passed lock of %s back to %s", node.ID(), lockable.ID(), lockable_state.Owner().ID())
|
||||
}
|
||||
|
||||
return []NodeState{lockable_state}, nil, nil
|
||||
})
|
||||
|
||||
return err
|
||||
}
|
||||
|
||||
func LockLockable(ctx * GraphContext, lockable Lockable, node GraphNode, node_state LockHolderState) error {
|
||||
if node == nil || lockable == nil {
|
||||
panic("Cannot lock without a specified node and lockable")
|
||||
}
|
||||
|
||||
_, err := UpdateStates(ctx, []GraphNode{lockable}, func(states []NodeState) ([]NodeState, interface{}, error) {
|
||||
if lockable.ID() == node.ID() {
|
||||
if node_state != nil {
|
||||
panic("node_state must be nil if locking lockable from itself")
|
||||
}
|
||||
node_state = states[0].(LockHolderState)
|
||||
}
|
||||
lockable_state := states[0].(LockableState)
|
||||
if lockable_state.Owner() != nil {
|
||||
var lock_pass_allowed bool = false
|
||||
|
||||
if lockable_state.Owner().ID() == lockable.ID() {
|
||||
lock_pass_allowed = lockable_state.AllowedToTakeLock(node.ID())
|
||||
} else {
|
||||
tmp, _ := UseStates(ctx, []GraphNode{lockable_state.Owner()}, func(states []NodeState)(interface{}, error){
|
||||
return states[0].(LockHolderState).AllowedToTakeLock(node.ID()), nil
|
||||
})
|
||||
lock_pass_allowed = tmp.(bool)
|
||||
}
|
||||
|
||||
|
||||
if lock_pass_allowed == false {
|
||||
return nil, nil, fmt.Errorf("%s is not allowed to take a lock from %s", node.ID(), lockable_state.Owner().ID())
|
||||
}
|
||||
}
|
||||
|
||||
err := lockable.Lock(node, lockable_state)
|
||||
if err != nil {
|
||||
return nil, nil, fmt.Errorf("Failed to lock lockable: %e", err)
|
||||
}
|
||||
|
||||
var lock_err error = nil
|
||||
locked_requirements := []Lockable{}
|
||||
for _, requirement := range(lockable_state.Requirements()) {
|
||||
err = LockLockable(ctx, requirement, node, node_state)
|
||||
if err != nil {
|
||||
lock_err = err
|
||||
break
|
||||
}
|
||||
locked_requirements = append(locked_requirements, requirement)
|
||||
}
|
||||
|
||||
if lock_err != nil {
|
||||
for _, locked_lockable := range(locked_requirements) {
|
||||
err = UnlockLockable(ctx, locked_lockable, node, node_state)
|
||||
if err != nil {
|
||||
panic(err)
|
||||
}
|
||||
}
|
||||
return nil, nil, fmt.Errorf("Lockable failed to lock: %e", lock_err)
|
||||
}
|
||||
|
||||
old_owner := lockable_state.Owner()
|
||||
lockable_state.SetOwner(node)
|
||||
node_state.RecordLockHolder(node.ID(), old_owner)
|
||||
|
||||
if old_owner == nil {
|
||||
ctx.Log.Logf("lockable", "LOCKABLE_LOCK: %s locked %s", node.ID(), lockable.ID())
|
||||
} else {
|
||||
ctx.Log.Logf("lockable", "LOCKABLE_LOCK: %s took lock of %s from %s", node.ID(), lockable.ID(), old_owner.ID())
|
||||
}
|
||||
|
||||
return []NodeState{lockable_state}, nil, nil
|
||||
})
|
||||
|
||||
return err
|
||||
}
|
||||
|
||||
// BaseLockables represent simple lockables in the DAG that can be used to create a hierarchy of locks that store names
|
||||
type BaseLockable struct {
|
||||
BaseNode
|
||||
}
|
||||
|
||||
//BaseLockables don't check anything special when locking/unlocking
|
||||
func (lockable * BaseLockable) Lock(node GraphNode, state LockableState) error {
|
||||
return nil
|
||||
}
|
||||
|
||||
func (lockable * BaseLockable) Unlock(node GraphNode, state LockableState) error {
|
||||
return nil
|
||||
}
|
||||
|
||||
func NewLockable(ctx * GraphContext, name string, requirements []Lockable) (* BaseLockable, error) {
|
||||
state := NewLockableState(name)
|
||||
lockable := &BaseLockable{
|
||||
BaseNode: NewNode(ctx, RandID(), &state),
|
||||
}
|
||||
|
||||
for _, requirement := range(requirements) {
|
||||
err := LinkLockables(ctx, lockable, requirement)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
}
|
||||
|
||||
return lockable, nil
|
||||
}
|
@ -1,355 +0,0 @@
|
||||
package graphvent
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
)
|
||||
|
||||
// Link a resource with a child
|
||||
func LinkResource(ctx * GraphContext, resource Resource, child Resource) error {
|
||||
if resource == nil || child == nil {
|
||||
return fmt.Errorf("Will not connect nil to DAG")
|
||||
}
|
||||
_, err := UpdateStates(ctx, []GraphNode{resource, child}, func(states []NodeState) ([]NodeState, interface{}, error) {
|
||||
resource_state := states[0].(ResourceState)
|
||||
child_state := states[1].(ResourceState)
|
||||
|
||||
if checkIfChild(ctx, resource_state, resource.ID(), child_state, child.ID()) == true {
|
||||
return nil, nil, fmt.Errorf("RESOURCE_LINK_ERR: %s is a parent of %s so cannot link as child", child.ID(), resource.ID())
|
||||
}
|
||||
|
||||
resource_state.children = append(resource_state.children, child)
|
||||
child_state.parents = append(child_state.parents, resource)
|
||||
return []NodeState{resource_state, child_state}, nil, nil
|
||||
})
|
||||
return err
|
||||
}
|
||||
|
||||
// Link multiple children to a resource
|
||||
func LinkResources(ctx * GraphContext, resource Resource, children []Resource) error {
|
||||
if resource == nil || children == nil {
|
||||
return fmt.Errorf("Invalid input")
|
||||
}
|
||||
|
||||
found := map[NodeID]bool{}
|
||||
child_nodes := make([]GraphNode, len(children))
|
||||
for i, child := range(children) {
|
||||
if child == nil {
|
||||
return fmt.Errorf("Will not connect nil to DAG")
|
||||
}
|
||||
_, exists := found[child.ID()]
|
||||
if exists == true {
|
||||
return fmt.Errorf("Will not connect the same child twice")
|
||||
}
|
||||
found[child.ID()] = true
|
||||
child_nodes[i] = child
|
||||
}
|
||||
|
||||
_, err := UpdateStates(ctx, append([]GraphNode{resource}, child_nodes...), func(states []NodeState) ([]NodeState, interface{}, error) {
|
||||
resource_state := states[0].(ResourceState)
|
||||
|
||||
new_states := make([]ResourceState, len(states))
|
||||
for i, state := range(states) {
|
||||
new_states[i] = state.(ResourceState)
|
||||
}
|
||||
|
||||
for i, state := range(states[1:]) {
|
||||
child_state := state.(ResourceState)
|
||||
|
||||
if checkIfChild(ctx, resource_state, resource.ID(), child_state, children[i].ID()) == true {
|
||||
return nil, nil, fmt.Errorf("RESOURCES_LINK_ERR: %s is a parent of %s so cannot link as child", children[i].ID() , resource.ID())
|
||||
}
|
||||
|
||||
new_states[0].children = append(new_states[0].children, children[i])
|
||||
new_states[i+1].parents = append(new_states[i+1].parents, resource)
|
||||
}
|
||||
ret_states := make([]NodeState, len(states))
|
||||
for i, state := range(new_states) {
|
||||
ret_states[i] = state
|
||||
}
|
||||
return ret_states, nil, nil
|
||||
})
|
||||
|
||||
return err
|
||||
}
|
||||
|
||||
type ResourceState struct {
|
||||
name string
|
||||
owner GraphNode
|
||||
children []Resource
|
||||
parents []Resource
|
||||
}
|
||||
|
||||
func (state ResourceState) Serialize() []byte {
|
||||
return []byte(state.name)
|
||||
}
|
||||
|
||||
// Locks cannot be passed between resources, so the answer to
|
||||
// "who used to own this lock held by a resource" is always "nobody"
|
||||
func (state ResourceState) OriginalLockHolder(id NodeID) GraphNode {
|
||||
return nil
|
||||
}
|
||||
|
||||
// Nothing can take a lock from a resource
|
||||
func (state ResourceState) AllowedToTakeLock(id NodeID) bool {
|
||||
return false
|
||||
}
|
||||
|
||||
func (state ResourceState) RecordLockHolder(id NodeID, lock_holder GraphNode) NodeState {
|
||||
if lock_holder != nil {
|
||||
panic("Attempted to delegate a lock to a resource")
|
||||
}
|
||||
|
||||
return state
|
||||
}
|
||||
|
||||
func NewResourceState(name string) ResourceState {
|
||||
return ResourceState{
|
||||
name: name,
|
||||
owner: nil,
|
||||
children: []Resource{},
|
||||
parents: []Resource{},
|
||||
}
|
||||
}
|
||||
|
||||
// Resource represents a Node which can be locked by another node,
|
||||
// and needs to own all it's childrens locks before being locked.
|
||||
// Resource connections form a directed acyclic graph
|
||||
// Resources do not allow any other nodes to take locks from them
|
||||
type Resource interface {
|
||||
GraphNode
|
||||
|
||||
// Called when locking the node to allow for custom lock behaviour
|
||||
Lock(node GraphNode, state NodeState) (NodeState, error)
|
||||
// Called when unlocking the node to allow for custom lock behaviour
|
||||
Unlock(node GraphNode, state NodeState) (NodeState, error)
|
||||
}
|
||||
|
||||
// Resources propagate update up to multiple parents, and not downwards
|
||||
// (subscriber to team won't get update to alliance, but subscriber to alliance will get update to team)
|
||||
func (resource * BaseResource) PropagateUpdate(ctx * GraphContext, signal GraphSignal) {
|
||||
UseStates(ctx, []GraphNode{resource}, func(states []NodeState) (interface{}, error){
|
||||
resource_state := states[0].(ResourceState)
|
||||
if signal.Direction() == Up {
|
||||
// Child->Parent, resource updates parent resources
|
||||
for _, parent := range resource_state.parents {
|
||||
SendUpdate(ctx, parent, signal)
|
||||
}
|
||||
} else if signal.Direction() == Down {
|
||||
// Parent->Child, resource updates lock holder
|
||||
if resource_state.owner != nil {
|
||||
SendUpdate(ctx, resource_state.owner, signal)
|
||||
}
|
||||
|
||||
for _, child := range(resource_state.children) {
|
||||
SendUpdate(ctx, child, signal)
|
||||
}
|
||||
} else if signal.Direction() == Direct {
|
||||
} else {
|
||||
panic(fmt.Sprintf("Invalid signal direction: %d", signal.Direction()))
|
||||
}
|
||||
return nil, nil
|
||||
})
|
||||
}
|
||||
|
||||
func checkIfChild(ctx * GraphContext, r ResourceState, r_id NodeID, cur ResourceState, cur_id NodeID) bool {
|
||||
if r_id == cur_id {
|
||||
return true
|
||||
}
|
||||
|
||||
for _, c := range(cur.children) {
|
||||
val, _ := UseStates(ctx, []GraphNode{c}, func(states []NodeState) (interface{}, error) {
|
||||
child_state := states[0].(ResourceState)
|
||||
return checkIfChild(ctx, cur, cur_id, child_state, c.ID()), nil
|
||||
})
|
||||
|
||||
is_child := val.(bool)
|
||||
if is_child {
|
||||
return true
|
||||
}
|
||||
}
|
||||
|
||||
return false
|
||||
}
|
||||
|
||||
func UnlockResource(ctx * GraphContext, resource Resource, node GraphNode, node_state NodeState) (NodeState, error) {
|
||||
if node == nil || resource == nil{
|
||||
panic("Cannot unlock without a specified node and resource")
|
||||
}
|
||||
_, err := UpdateStates(ctx, []GraphNode{resource}, func(states []NodeState) ([]NodeState, interface{}, error) {
|
||||
if resource.ID() == node.ID() {
|
||||
if node_state != nil {
|
||||
panic("node_state must be nil if unlocking resource from itself")
|
||||
}
|
||||
node_state = states[0]
|
||||
}
|
||||
resource_state := states[0].(ResourceState)
|
||||
|
||||
if resource_state.owner == nil {
|
||||
return nil, nil, fmt.Errorf("Resource already unlocked")
|
||||
}
|
||||
|
||||
if resource_state.owner.ID() != node.ID() {
|
||||
return nil, nil, fmt.Errorf("Resource %s not locked by %s", resource.ID(), node.ID())
|
||||
}
|
||||
|
||||
var lock_err error = nil
|
||||
for _, child := range(resource_state.children) {
|
||||
var err error = nil
|
||||
node_state, err = UnlockResource(ctx, child, node, node_state)
|
||||
if err != nil {
|
||||
lock_err = err
|
||||
break
|
||||
}
|
||||
}
|
||||
|
||||
if lock_err != nil {
|
||||
return nil, nil, fmt.Errorf("Resource %s failed to unlock: %e", resource.ID(), lock_err)
|
||||
}
|
||||
|
||||
resource_state.owner = node_state.OriginalLockHolder(resource.ID())
|
||||
unlock_state, err := resource.Unlock(node, resource_state)
|
||||
resource_state = unlock_state.(ResourceState)
|
||||
if err != nil {
|
||||
return nil, nil, fmt.Errorf("Resource %s failed custom Unlock: %e", resource.ID(), err)
|
||||
}
|
||||
|
||||
if resource_state.owner == nil {
|
||||
ctx.Log.Logf("resource", "RESOURCE_UNLOCK: %s unlocked %s", node.ID(), resource.ID())
|
||||
} else {
|
||||
ctx.Log.Logf("resource", "RESOURCE_UNLOCK: %s passed lock of %s back to %s", node.ID(), resource.ID(), resource_state.owner.ID())
|
||||
}
|
||||
|
||||
return []NodeState{resource_state}, nil, nil
|
||||
})
|
||||
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
return node_state, nil
|
||||
}
|
||||
|
||||
// TODO: State
|
||||
func LockResource(ctx * GraphContext, resource Resource, node GraphNode, node_state NodeState) (NodeState, error) {
|
||||
if node == nil || resource == nil {
|
||||
panic("Cannot lock without a specified node and resource")
|
||||
}
|
||||
|
||||
_, err := UpdateStates(ctx, []GraphNode{resource}, func(states []NodeState) ([]NodeState, interface{}, error) {
|
||||
if resource.ID() == node.ID() {
|
||||
if node_state != nil {
|
||||
panic("node_state must be nil if locking resource from itself")
|
||||
}
|
||||
node_state = states[0]
|
||||
}
|
||||
resource_state := states[0].(ResourceState)
|
||||
if resource_state.owner != nil {
|
||||
var lock_pass_allowed bool = false
|
||||
|
||||
if resource_state.owner.ID() == resource.ID() {
|
||||
lock_pass_allowed = resource_state.AllowedToTakeLock(node.ID())
|
||||
} else {
|
||||
tmp, _ := UseStates(ctx, []GraphNode{resource_state.owner}, func(states []NodeState)(interface{}, error){
|
||||
return states[0].AllowedToTakeLock(node.ID()), nil
|
||||
})
|
||||
lock_pass_allowed = tmp.(bool)
|
||||
}
|
||||
|
||||
|
||||
if lock_pass_allowed == false {
|
||||
return nil, nil, fmt.Errorf("%s is not allowed to take a lock from %s", node.ID(), resource_state.owner.ID())
|
||||
}
|
||||
}
|
||||
|
||||
lock_state, err := resource.Lock(node, resource_state)
|
||||
if err != nil {
|
||||
return nil, nil, fmt.Errorf("Failed to lock resource: %e", err)
|
||||
}
|
||||
|
||||
resource_state = lock_state.(ResourceState)
|
||||
|
||||
var lock_err error = nil
|
||||
locked_resources := []Resource{}
|
||||
for _, child := range(resource_state.children) {
|
||||
node_state, err = LockResource(ctx, child, node, node_state)
|
||||
if err != nil {
|
||||
lock_err = err
|
||||
break
|
||||
}
|
||||
locked_resources = append(locked_resources, child)
|
||||
}
|
||||
|
||||
if lock_err != nil {
|
||||
for _, locked_resource := range(locked_resources) {
|
||||
node_state, err = UnlockResource(ctx, locked_resource, node, node_state)
|
||||
if err != nil {
|
||||
panic(err)
|
||||
}
|
||||
}
|
||||
return nil, nil, fmt.Errorf("Resource failed to lock: %e", lock_err)
|
||||
}
|
||||
|
||||
old_owner := resource_state.owner
|
||||
resource_state.owner = node
|
||||
node_state = node_state.RecordLockHolder(node.ID(), old_owner)
|
||||
|
||||
if old_owner == nil {
|
||||
ctx.Log.Logf("resource", "RESOURCE_LOCK: %s locked %s", node.ID(), resource.ID())
|
||||
} else {
|
||||
ctx.Log.Logf("resource", "RESOURCE_LOCK: %s took lock of %s from %s", node.ID(), resource.ID(), old_owner.ID())
|
||||
}
|
||||
|
||||
return []NodeState{resource_state}, nil, nil
|
||||
})
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
return node_state, nil
|
||||
}
|
||||
|
||||
// BaseResources represent simple resources in the DAG that can be used to create a hierarchy of locks that store names
|
||||
type BaseResource struct {
|
||||
BaseNode
|
||||
}
|
||||
|
||||
//BaseResources don't check anything special when locking/unlocking
|
||||
func (resource * BaseResource) Lock(node GraphNode, state NodeState) (NodeState, error) {
|
||||
return state, nil
|
||||
}
|
||||
|
||||
func (resource * BaseResource) Unlock(node GraphNode, state NodeState) (NodeState, error) {
|
||||
return state, nil
|
||||
}
|
||||
|
||||
/*func FindResource(root Event, id string) Resource {
|
||||
if root == nil || id == ""{
|
||||
panic("invalid input")
|
||||
}
|
||||
|
||||
for _, resource := range(root.Resources()) {
|
||||
if resource.ID() == id {
|
||||
return resource
|
||||
}
|
||||
}
|
||||
for _, child := range(root.Children()) {
|
||||
resource := FindResource(child, id)
|
||||
if resource != nil {
|
||||
return resource
|
||||
}
|
||||
}
|
||||
return nil
|
||||
}*/
|
||||
|
||||
func NewResource(ctx * GraphContext, name string, children []Resource) (* BaseResource, error) {
|
||||
resource := &BaseResource{
|
||||
BaseNode: NewNode(ctx, RandID(), NewResourceState(name)),
|
||||
}
|
||||
|
||||
err := LinkResources(ctx, resource, children)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
return resource, nil
|
||||
}
|
@ -0,0 +1,595 @@
|
||||
package graphvent
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"time"
|
||||
"errors"
|
||||
"reflect"
|
||||
"sync"
|
||||
"encoding/json"
|
||||
)
|
||||
|
||||
// Update the threads listeners, and notify the parent to do the same
|
||||
func (thread * BaseThread) PropagateUpdate(ctx * GraphContext, signal GraphSignal) {
|
||||
UseStates(ctx, []GraphNode{thread}, func(states []NodeState) (interface{}, error) {
|
||||
thread_state := states[0].(ThreadState)
|
||||
if signal.Direction() == Up {
|
||||
// Child->Parent, thread updates parent and connected resources
|
||||
if thread_state.Parent() != nil {
|
||||
SendUpdate(ctx, thread_state.Parent(), signal)
|
||||
}
|
||||
|
||||
for _, resource := range(thread_state.Lockables()) {
|
||||
SendUpdate(ctx, resource, signal)
|
||||
}
|
||||
} else if signal.Direction() == Down {
|
||||
// Parent->Child, thread updated children
|
||||
for _, child := range(thread_state.Children()) {
|
||||
SendUpdate(ctx, child, signal)
|
||||
}
|
||||
} else if signal.Direction() == Direct {
|
||||
|
||||
} else {
|
||||
panic(fmt.Sprintf("Invalid signal direction: %d", signal.Direction()))
|
||||
}
|
||||
|
||||
return nil, nil
|
||||
})
|
||||
thread.signal <- signal
|
||||
}
|
||||
|
||||
/*func FindLockable(root Thread, id string) Lockable {
|
||||
if root == nil || id == ""{
|
||||
panic("invalid input")
|
||||
}
|
||||
|
||||
for _, resource := range(root.Lockables()) {
|
||||
if resource.ID() == id {
|
||||
return resource
|
||||
}
|
||||
}
|
||||
for _, child := range(root.Children()) {
|
||||
resource := FindLockable(child, id)
|
||||
if resource != nil {
|
||||
return resource
|
||||
}
|
||||
}
|
||||
return nil
|
||||
}*/
|
||||
|
||||
type ThreadInfo interface {
|
||||
}
|
||||
|
||||
// An Thread is a lockable that has an additional parent->child relationship with other Threads
|
||||
// This relationship allows the thread tree to be modified independent of the lockable state
|
||||
type ThreadState interface {
|
||||
LockHolderState
|
||||
LockableState
|
||||
|
||||
Parent() Thread
|
||||
SetParent(parent Thread)
|
||||
Children() []Thread
|
||||
ChildInfo(child NodeID) ThreadInfo
|
||||
AddChild(child Thread, info ThreadInfo) error
|
||||
Lockables() []Lockable
|
||||
AddLockable(resource Lockable) error
|
||||
}
|
||||
|
||||
type BaseThreadState struct {
|
||||
BaseLockableState
|
||||
parent Thread
|
||||
children []Thread
|
||||
child_info map[NodeID] ThreadInfo
|
||||
resources map[NodeID]Lockable
|
||||
delegation_map map[NodeID]GraphNode
|
||||
info_type reflect.Type
|
||||
}
|
||||
|
||||
func (state * BaseThreadState) MarshalJSON() ([]byte, error) {
|
||||
children := map[NodeID]interface{}{}
|
||||
for _, child := range(state.children) {
|
||||
children[child.ID()] = state.child_info[child.ID()]
|
||||
}
|
||||
|
||||
var parent_id *NodeID = nil
|
||||
if state.parent != nil {
|
||||
new_str := state.parent.ID()
|
||||
parent_id = &new_str
|
||||
}
|
||||
|
||||
resources := map[NodeID]*NodeID{}
|
||||
for _, resource := range(state.resources) {
|
||||
original_owner := state.delegation_map[resource.ID()]
|
||||
if original_owner != nil {
|
||||
owner := original_owner.ID()
|
||||
resources[resource.ID()] = &owner
|
||||
} else {
|
||||
resources[resource.ID()] = nil
|
||||
}
|
||||
}
|
||||
|
||||
return json.Marshal(&struct{
|
||||
Name string `json:"name"`
|
||||
Parent *NodeID `json:"parent"`
|
||||
Children map[NodeID]interface{} `json:"children"`
|
||||
Lockables map[NodeID]*NodeID `json:"resources"`
|
||||
}{
|
||||
Name: state.Name(),
|
||||
Parent: parent_id,
|
||||
Children: children,
|
||||
Lockables: resources,
|
||||
})
|
||||
}
|
||||
|
||||
func (state * BaseThreadState) Parent() Thread {
|
||||
return state.parent
|
||||
}
|
||||
|
||||
func (state * BaseThreadState) SetParent(parent Thread) {
|
||||
state.parent = parent
|
||||
}
|
||||
|
||||
func (state * BaseThreadState) Children() []Thread {
|
||||
return state.children
|
||||
}
|
||||
|
||||
func (state * BaseThreadState) ChildInfo(child NodeID) ThreadInfo {
|
||||
return state.child_info[child]
|
||||
}
|
||||
|
||||
func (state * BaseThreadState) AddChild(child Thread, info ThreadInfo) error {
|
||||
if child == nil {
|
||||
return fmt.Errorf("Will not connect nil to the thread tree")
|
||||
}
|
||||
|
||||
_, exists := state.child_info[child.ID()]
|
||||
if exists == true {
|
||||
return fmt.Errorf("Will not connect the same child twice")
|
||||
}
|
||||
|
||||
if info == nil && state.info_type != nil {
|
||||
return fmt.Errorf("nil info passed when expecting info")
|
||||
} else if info != nil {
|
||||
if reflect.TypeOf(info) != state.info_type {
|
||||
return fmt.Errorf("info type mismatch, expecting %+v", state.info_type)
|
||||
}
|
||||
}
|
||||
|
||||
state.children = append(state.children, child)
|
||||
state.child_info[child.ID()] = info
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func checkIfChild(ctx * GraphContext, thread_state ThreadState, thread_id NodeID, cur_state ThreadState, cur_id NodeID) bool {
|
||||
for _, child := range(cur_state.Children()) {
|
||||
if child.ID() == thread_id {
|
||||
return true
|
||||
}
|
||||
val, _ := UseStates(ctx, []GraphNode{child}, func(states []NodeState) (interface{}, error) {
|
||||
child_state := states[0].(ThreadState)
|
||||
return checkIfRequirement(ctx, cur_state, cur_id, child_state, child.ID()), nil
|
||||
})
|
||||
|
||||
is_child := val.(bool)
|
||||
if is_child {
|
||||
return true
|
||||
}
|
||||
}
|
||||
|
||||
return false
|
||||
}
|
||||
|
||||
func LinkThreads(ctx * GraphContext, thread Thread, child Thread, info ThreadInfo) error {
|
||||
if ctx == nil || thread == nil || child == nil {
|
||||
return fmt.Errorf("invalid input")
|
||||
}
|
||||
|
||||
if thread.ID() == child.ID() {
|
||||
return fmt.Errorf("Will not link %s as a child of itself", thread.ID())
|
||||
}
|
||||
|
||||
|
||||
_, err := UpdateStates(ctx, []GraphNode{thread, child}, func(states []NodeState) ([]NodeState, interface{}, error) {
|
||||
thread_state := states[0].(ThreadState)
|
||||
child_state := states[1].(ThreadState)
|
||||
|
||||
if child_state.Parent() != nil {
|
||||
return nil, nil, fmt.Errorf("EVENT_LINK_ERR: %s already has a parent, cannot link as child", child.ID())
|
||||
}
|
||||
|
||||
if checkIfChild(ctx, thread_state, thread.ID(), child_state, child.ID()) == true {
|
||||
return nil, nil, fmt.Errorf("EVENT_LINK_ERR: %s is a child of %s so cannot add as parent", thread.ID(), child.ID())
|
||||
}
|
||||
|
||||
if checkIfChild(ctx, child_state, child.ID(), thread_state, thread.ID()) == true {
|
||||
return nil, nil, fmt.Errorf("EVENT_LINK_ERR: %s is already a parent of %s so will not add again", thread.ID(), child.ID())
|
||||
}
|
||||
|
||||
err := thread_state.AddChild(child, info)
|
||||
if err != nil {
|
||||
return nil, nil, fmt.Errorf("EVENT_LINK_ERR: error adding %s as child to %s: %e", child.ID(), thread.ID(), err)
|
||||
}
|
||||
child_state.SetParent(thread)
|
||||
|
||||
return states, nil, nil
|
||||
})
|
||||
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
// Threads allow locks to pass to their requirements, but they won't allow cycles
|
||||
func (state * BaseThreadState) OriginalLockHolder(id NodeID) GraphNode {
|
||||
node, exists := state.delegation_map[id]
|
||||
if exists == false {
|
||||
panic("Attempted to take a get the original lock holder of a resource we don't own")
|
||||
}
|
||||
delete(state.delegation_map, id)
|
||||
return node
|
||||
}
|
||||
|
||||
func (state * BaseThreadState) AllowedToTakeLock(id NodeID) bool {
|
||||
return false
|
||||
}
|
||||
|
||||
func (state * BaseThreadState) RecordLockHolder(id NodeID, lock_holder GraphNode) {
|
||||
_, exists := state.delegation_map[id]
|
||||
if exists == true {
|
||||
panic("Attempted to lock a resource we're already holding(lock cycle)")
|
||||
}
|
||||
|
||||
state.delegation_map[id] = lock_holder
|
||||
}
|
||||
|
||||
// Thread is the interface that thread tree nodes must implement
|
||||
type Thread interface {
|
||||
GraphNode
|
||||
|
||||
Action(action string) (func(* GraphContext)(string, error), bool)
|
||||
Handler(signal_type string) (func(* GraphContext, GraphSignal) (string, error), bool)
|
||||
|
||||
SetTimeout(end_time time.Time, action string)
|
||||
ClearTimeout()
|
||||
Timeout() <-chan time.Time
|
||||
TimeoutAction() string
|
||||
}
|
||||
|
||||
func (thread * BaseThread) TimeoutAction() string {
|
||||
return thread.timeout_action
|
||||
}
|
||||
|
||||
func (thread * BaseThread) Timeout() <-chan time.Time {
|
||||
return thread.timeout
|
||||
}
|
||||
|
||||
func (thread * BaseThread) ClearTimeout() {
|
||||
thread.timeout_action = ""
|
||||
thread.timeout = nil
|
||||
}
|
||||
|
||||
func (thread * BaseThread) SetTimeout(end_time time.Time, action string) {
|
||||
thread.timeout_action = action
|
||||
thread.timeout = time.After(time.Until(end_time))
|
||||
}
|
||||
|
||||
func (thread * BaseThread) Handler(signal_type string) (func(* GraphContext, GraphSignal)(string, error), bool) {
|
||||
handler, exists := thread.Handlers[signal_type]
|
||||
return handler, exists
|
||||
}
|
||||
|
||||
func FindChild(ctx * GraphContext, thread Thread, thread_state ThreadState, id NodeID) Thread {
|
||||
if thread == nil {
|
||||
panic("cannot recurse through nil")
|
||||
}
|
||||
if id == thread.ID() {
|
||||
return thread
|
||||
}
|
||||
|
||||
|
||||
for _, child := range thread_state.Children() {
|
||||
res, _ := UseStates(ctx, []GraphNode{child}, func(states []NodeState) (interface{}, error) {
|
||||
child_state := states[0].(ThreadState)
|
||||
result := FindChild(ctx, child, child_state, id)
|
||||
return result, nil
|
||||
})
|
||||
result := res.(Thread)
|
||||
if result != nil {
|
||||
return result
|
||||
}
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func RunThread(ctx * GraphContext, thread Thread) error {
|
||||
ctx.Log.Logf("thread", "EVENT_RUN: %s", thread.ID())
|
||||
|
||||
_, err := UseStates(ctx, []GraphNode{thread}, func(states []NodeState) (interface{}, error) {
|
||||
thread_state := states[0].(ThreadState)
|
||||
if thread_state.Owner() == nil {
|
||||
return nil, fmt.Errorf("EVENT_RUN_NOT_LOCKED: %s", thread_state.Name())
|
||||
} else if thread_state.Owner().ID() != thread.ID() {
|
||||
return nil, fmt.Errorf("EVENT_RUN_RESOURCE_ALREADY_LOCKED: %s, %s", thread_state.Name(), thread_state.Owner().ID())
|
||||
}
|
||||
return nil, nil
|
||||
})
|
||||
|
||||
SendUpdate(ctx, thread, NewSignal(thread, "thread_start"))
|
||||
|
||||
next_action := "start"
|
||||
for next_action != "" {
|
||||
action, exists := thread.Action(next_action)
|
||||
if exists == false {
|
||||
error_str := fmt.Sprintf("%s is not a valid action", next_action)
|
||||
return errors.New(error_str)
|
||||
}
|
||||
|
||||
ctx.Log.Logf("thread", "EVENT_ACTION: %s - %s", thread.ID(), next_action)
|
||||
next_action, err = action(ctx)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
|
||||
SendUpdate(ctx, thread, NewSignal(thread, "thread_done"))
|
||||
|
||||
ctx.Log.Logf("thread", "EVENT_RUN_DONE: %s", thread.ID())
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func ThreadAbort(thread Thread) func(*GraphContext, GraphSignal) (string, error) {
|
||||
return func(ctx * GraphContext, signal GraphSignal) (string, error) {
|
||||
return "", errors.New(fmt.Sprintf("%s aborted by signal", thread.ID()))
|
||||
}
|
||||
}
|
||||
|
||||
func ThreadCancel(thread Thread) func(*GraphContext, GraphSignal) (string, error) {
|
||||
return func(ctx * GraphContext, signal GraphSignal) (string, error) {
|
||||
return "", nil
|
||||
}
|
||||
}
|
||||
|
||||
// Thread is the most basic thread that can exist in the thread tree.
|
||||
// On start it automatically transitions to completion.
|
||||
// This node by itself doesn't implement any special behaviours for children, so they will be ignored.
|
||||
// When started, this thread automatically transitions to completion
|
||||
type BaseThread struct {
|
||||
BaseNode
|
||||
|
||||
resources_lock sync.Mutex
|
||||
children_lock sync.Mutex
|
||||
info_lock sync.Mutex
|
||||
parent_lock sync.Mutex
|
||||
|
||||
Actions map[string]func(* GraphContext) (string, error)
|
||||
Handlers map[string]func(* GraphContext, GraphSignal) (string, error)
|
||||
|
||||
timeout <-chan time.Time
|
||||
timeout_action string
|
||||
}
|
||||
|
||||
func (thread * BaseThread) Lock(node GraphNode, state LockableState) error {
|
||||
return nil
|
||||
}
|
||||
|
||||
func (thread * BaseThread) Unlock(node GraphNode, state LockableState) error {
|
||||
return nil
|
||||
}
|
||||
|
||||
func (thread * BaseThread) Action(action string) (func(ctx * GraphContext) (string, error), bool) {
|
||||
action_fn, exists := thread.Actions[action]
|
||||
return action_fn, exists
|
||||
}
|
||||
|
||||
func ThreadWait(thread Thread) (func(*GraphContext) (string, error)) {
|
||||
return func(ctx * GraphContext) (string, error) {
|
||||
ctx.Log.Logf("thread", "EVENT_WAIT: %s TIMEOUT: %+v", thread.ID(), thread.Timeout())
|
||||
select {
|
||||
case signal := <- thread.SignalChannel():
|
||||
ctx.Log.Logf("thread", "EVENT_SIGNAL: %s %+v", thread.ID(), signal)
|
||||
signal_fn, exists := thread.Handler(signal.Type())
|
||||
if exists == true {
|
||||
ctx.Log.Logf("thread", "EVENT_HANDLER: %s - %s", thread.ID(), signal.Type())
|
||||
return signal_fn(ctx, signal)
|
||||
}
|
||||
return "wait", nil
|
||||
case <- thread.Timeout():
|
||||
ctx.Log.Logf("thread", "EVENT_TIMEOUT %s - NEXT_STATE: %s", thread.ID(), thread.TimeoutAction())
|
||||
return thread.TimeoutAction(), nil
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func NewBaseThread(ctx * GraphContext, name string) (BaseThread, error) {
|
||||
state := NewBaseThreadState(name)
|
||||
thread := BaseThread{
|
||||
BaseNode: NewNode(ctx, RandID(), &state),
|
||||
Actions: map[string]func(*GraphContext)(string, error){},
|
||||
Handlers: map[string]func(*GraphContext,GraphSignal)(string, error){},
|
||||
timeout: nil,
|
||||
timeout_action: "",
|
||||
}
|
||||
return thread, nil
|
||||
}
|
||||
|
||||
func NewBaseThreadState(name string) BaseThreadState {
|
||||
return BaseThreadState{
|
||||
BaseLockableState: NewLockableState(name),
|
||||
delegation_map: map[NodeID]GraphNode{},
|
||||
children: []Thread{},
|
||||
child_info: map[NodeID]ThreadInfo{},
|
||||
resources: map[NodeID]Lockable{},
|
||||
parent: nil,
|
||||
}
|
||||
}
|
||||
|
||||
func NewThread(ctx * GraphContext, name string, requirements []Lockable) (* BaseThread, error) {
|
||||
thread, err := NewBaseThread(ctx, name)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
thread_ptr := &thread
|
||||
|
||||
for _, requirement := range(requirements) {
|
||||
err := LinkLockables(ctx, thread_ptr, requirement)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
}
|
||||
|
||||
thread_ptr.Actions["wait"] = ThreadWait(thread_ptr)
|
||||
thread_ptr.Handlers["abort"] = ThreadAbort(thread_ptr)
|
||||
thread_ptr.Handlers["cancel"] = ThreadCancel(thread_ptr)
|
||||
|
||||
thread_ptr.Actions["start"] = func(ctx * GraphContext) (string, error) {
|
||||
return "", nil
|
||||
}
|
||||
|
||||
return thread_ptr, nil
|
||||
}
|
||||
|
||||
/*
|
||||
// ThreadQueue is a basic thread that can have children.
|
||||
// On start, it attempts to start it's children from the highest 'priority'
|
||||
type ThreadQueueInfo struct {
|
||||
priority int
|
||||
state string
|
||||
}
|
||||
|
||||
func (info * BaseThreadQueueInfo) MarshalJSON() ([]byte, error) {
|
||||
return json.Marshal(&struct{
|
||||
Priority int `json:"priority"`
|
||||
State string `json:"state"`
|
||||
}{
|
||||
Priority: info.priority,
|
||||
State: info.state,
|
||||
})
|
||||
}
|
||||
|
||||
func NewThreadQueueInfo(priority int) * BaseThreadQueueInfo {
|
||||
info := &ThreadQueueInfo{
|
||||
priority: priority,
|
||||
state: "queued",
|
||||
}
|
||||
|
||||
return info
|
||||
}
|
||||
|
||||
type ThreadQueue struct {
|
||||
Thread
|
||||
listened_resources map[string]Lockable
|
||||
queue_lock sync.Mutex
|
||||
}
|
||||
|
||||
func (queue * BaseThreadQueue) Unlock() error {
|
||||
for _, resource := range(queue.listened_resources) {
|
||||
resource.UnregisterChannel(queue.signal)
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func (queue * BaseThreadQueue) InfoType() reflect.Type {
|
||||
return reflect.TypeOf((*ThreadQueueInfo)(nil))
|
||||
}
|
||||
|
||||
func NewThreadQueue(name string, description string, resources []Lockable) (* BaseThreadQueue, error) {
|
||||
queue := &ThreadQueue{
|
||||
Thread: NewThread(name, description),
|
||||
listened_resources: map[string]Lockable{},
|
||||
}
|
||||
|
||||
queue.state = NewBaseThreadState(name, description)
|
||||
|
||||
AddLockables(queue, resources)
|
||||
|
||||
queue.Actions["wait"] = ThreadWait(queue)
|
||||
queue.Handlers["abort"] = ThreadAbort(queue)
|
||||
queue.Handlers["cancel"] = ThreadCancel(queue)
|
||||
|
||||
queue.Actions["start"] = func() (string, error) {
|
||||
return "queue_thread", nil
|
||||
}
|
||||
|
||||
queue.Actions["queue_thread"] = func() (string, error) {
|
||||
// Copy the threads to sort the list
|
||||
queue.LockChildren()
|
||||
copied_threads := make([]Thread, len(queue.Children()))
|
||||
copy(copied_threads, queue.Children())
|
||||
less := func(i int, j int) bool {
|
||||
info_i := queue.ChildInfo(copied_threads[i]).(*ThreadQueueInfo)
|
||||
info_j := queue.ChildInfo(copied_threads[j]).(*ThreadQueueInfo)
|
||||
return info_i.priority < info_j.priority
|
||||
}
|
||||
sort.SliceStable(copied_threads, less)
|
||||
|
||||
needed_resources := map[string]Lockable{}
|
||||
for _, thread := range(copied_threads) {
|
||||
// make sure all the required resources are registered to update the thread
|
||||
for _, resource := range(thread.Lockables()) {
|
||||
needed_resources[resource.ID()] = resource
|
||||
}
|
||||
|
||||
info := queue.ChildInfo(thread).(*ThreadQueueInfo)
|
||||
thread.LockInfo()
|
||||
defer thread.UnlockInfo()
|
||||
if info.state == "queued" {
|
||||
err := LockLockable(thread)
|
||||
// start in new goroutine
|
||||
if err != nil {
|
||||
} else {
|
||||
info.state = "running"
|
||||
Log.Logf("thread", "EVENT_START: %s", thread.Name())
|
||||
go func(thread Thread, info * BaseThreadQueueInfo, queue Thread) {
|
||||
Log.Logf("thread", "EVENT_GOROUTINE: %s", thread.Name())
|
||||
err := RunThread(thread)
|
||||
if err != nil {
|
||||
Log.Logf("thread", "EVENT_ERROR: %s", err)
|
||||
}
|
||||
thread.LockInfo()
|
||||
defer thread.UnlockInfo()
|
||||
info.state = "done"
|
||||
}(thread, info, queue)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
for _, resource := range(needed_resources) {
|
||||
_, exists := queue.listened_resources[resource.ID()]
|
||||
if exists == false {
|
||||
Log.Logf("thread", "REGISTER_RESOURCE: %s - %s", queue.Name(), resource.Name())
|
||||
queue.listened_resources[resource.ID()] = resource
|
||||
resource.RegisterChannel(queue.signal)
|
||||
}
|
||||
}
|
||||
|
||||
queue.UnlockChildren()
|
||||
|
||||
return "wait", nil
|
||||
}
|
||||
|
||||
queue.Handlers["resource_connected"] = func(signal GraphSignal) (string, error) {
|
||||
return "queue_thread", nil
|
||||
}
|
||||
|
||||
queue.Handlers["child_added"] = func(signal GraphSignal) (string, error) {
|
||||
return "queue_thread", nil
|
||||
}
|
||||
|
||||
queue.Handlers["lock_changed"] = func(signal GraphSignal) (string, error) {
|
||||
return "queue_thread", nil
|
||||
}
|
||||
|
||||
queue.Handlers["thread_done"] = func(signal GraphSignal) (string, error) {
|
||||
return "queue_thread", nil
|
||||
}
|
||||
|
||||
return queue, nil
|
||||
}
|
||||
*/
|
Loading…
Reference in New Issue