graphvent/lockable.go

578 lines
18 KiB
Go

package graphvent
import (
"fmt"
"reflect"
"encoding/json"
)
// Listener is a Lockable that also carries a channel of signals. Its Process
// method offers every signal it receives on Chan before handing the signal to
// the embedded Lockable.
type Listener struct {
Lockable
// Chan is the channel Process sends incoming signals on (non-blocking send).
Chan chan GraphSignal
}
// Type reports the node type identifier of a Listener, "listener".
func (node *Listener) Type() NodeType {
	const type_name = "listener"
	return NodeType(type_name)
}
// Process offers signal on the listener's channel without blocking and, when
// the send succeeds, passes the signal on to the embedded Lockable.
//
// If the channel cannot accept the signal immediately, a LISTENER_OVERFLOW
// error is returned and the embedded Lockable does not see the signal.
func (node *Listener) Process(context *StateContext, signal GraphSignal) error {
	context.Graph.Log.Logf("signal", "LISTENER_PROCESS: %s", node.ID())

	select {
	case node.Chan <- signal:
		// Delivered to the channel; continue with the regular Lockable handling.
		return node.Lockable.Process(context, signal)
	default:
		// The channel would block, so report the overflow instead of waiting.
		return fmt.Errorf("LISTENER_OVERFLOW: %s - %s", node.ID(), signal)
	}
}
// LISTENER_CHANNEL_BUFFER is the capacity of the signal channel that
// NewListener allocates for each Listener.
const LISTENER_CHANNEL_BUFFER = 1024
// NewListener builds a Listener with a fresh Lockable for id and name, and a
// signal channel buffered to LISTENER_CHANNEL_BUFFER entries.
func NewListener(id NodeID, name string) Listener {
	var listener Listener
	listener.Lockable = NewLockable(id, name)
	listener.Chan = make(chan GraphSignal, LISTENER_CHANNEL_BUFFER)
	return listener
}
// LoadListener decodes data as a LockableJSON, creates a Listener for id and
// restores its links through RestoreLockable.
//
// The new listener is stored in nodes before its links are restored
// (presumably so that nodes loaded recursively can refer back to it; confirm
// against LoadNodeRecurse). It stays in nodes even when restoring fails.
func LoadListener(ctx *Context, id NodeID, data []byte, nodes NodeMap) (Node, error) {
	var stored LockableJSON
	if err := json.Unmarshal(data, &stored); err != nil {
		return nil, err
	}

	listener := NewListener(id, stored.Name)
	nodes[id] = &listener

	if err := RestoreLockable(ctx, &listener.Lockable, stored, nodes); err != nil {
		return nil, err
	}
	return &listener, nil
}
// LockableNode is a Node that can hand out the Lockable state it carries.
type LockableNode interface {
Node
// LockableHandle returns the Lockable holding this node's lock state.
LockableHandle() *Lockable
}
// Lockable is a simple Lockable implementation that can be embedded into more complex structures
type Lockable struct {
SimpleNode
Name string
// Owner is the node currently holding the lock on this lockable, nil when unlocked.
Owner LockableNode
// Requirements are the lockables that get locked on behalf of this one, keyed by their ID.
Requirements map[NodeID]LockableNode
// Dependencies are the lockables that list this one as a requirement, keyed by their ID.
Dependencies map[NodeID]LockableNode
// LocksHeld maps the ID of every lockable this node has locked to the owner
// it had before (nil if it was unowned), so the lock can be handed back on unlock.
LocksHeld map[NodeID]LockableNode
}
// LockableHandle returns the receiver itself. Types that embed Lockable pick
// this method up through promotion, which gives them the LockableHandle
// method required by LockableNode.
func (lockable *Lockable) LockableHandle() *Lockable {
return lockable
}
// Type reports the node type identifier of a plain Lockable, "lockable".
func (lockable *Lockable) Type() NodeType {
	const type_name = "lockable"
	return NodeType(type_name)
}
// LockableJSON is the serialized form of a Lockable. Linked nodes are stored
// by their string IDs instead of by reference.
type LockableJSON struct {
SimpleNodeJSON
Name string `json:"name"`
// Owner is the ID of the current owner, or "" when the lockable has none.
Owner string `json:"owner"`
// Dependencies holds the IDs of the keys of Lockable.Dependencies.
Dependencies []string `json:"dependencies"`
// Requirements holds the IDs of the keys of Lockable.Requirements.
Requirements []string `json:"requirements"`
// LocksHeld maps the ID of each held lockable to the ID of the owner it was
// taken from, or "" when it had no previous owner.
LocksHeld map[string]string `json:"locks_held"`
}
// Serialize converts the lockable to its LockableJSON form and returns it as
// indented JSON.
func (lockable *Lockable) Serialize() ([]byte, error) {
	serializable := NewLockableJSON(lockable)
	return json.MarshalIndent(&serializable, "", " ")
}
// NewLockableJSON builds the serializable view of lockable, replacing every
// node reference with that node's string ID.
//
// The ID slices follow map iteration order, so their ordering is not stable
// between calls. A missing owner, or a held lock with no previous owner, is
// written as the empty string.
func NewLockableJSON(lockable *Lockable) LockableJSON {
	requirement_ids := make([]string, 0, len(lockable.Requirements))
	for id := range lockable.Requirements {
		requirement_ids = append(requirement_ids, id.String())
	}

	dependency_ids := make([]string, 0, len(lockable.Dependencies))
	for id := range lockable.Dependencies {
		dependency_ids = append(dependency_ids, id.String())
	}

	var owner_id string
	if lockable.Owner != nil {
		owner_id = lockable.Owner.ID().String()
	}

	locks_held := make(map[string]string, len(lockable.LocksHeld))
	for held_id, previous_owner := range lockable.LocksHeld {
		previous_id := ""
		if previous_owner != nil {
			previous_id = previous_owner.ID().String()
		}
		locks_held[held_id.String()] = previous_id
	}

	return LockableJSON{
		SimpleNodeJSON: NewSimpleNodeJSON(&lockable.SimpleNode),
		Name:           lockable.Name,
		Owner:          owner_id,
		Dependencies:   dependency_ids,
		Requirements:   requirement_ids,
		LocksHeld:      locks_held,
	}
}
// RecordUnlock removes l from the locks held by lockable and returns the owner
// l had before lockable took it (nil if it had none).
//
// It panics when lockable has no record of holding l.
func (lockable *Lockable) RecordUnlock(l LockableNode) LockableNode {
	held_id := l.ID()

	previous_owner, held := lockable.LocksHeld[held_id]
	if !held {
		panic("Attempted to take a get the original lock holder of a lockable we don't own")
	}

	delete(lockable.LocksHeld, held_id)
	return previous_owner
}
// RecordLock notes that lockable now holds l, remembering last_owner (which
// may be nil) so the lock can be handed back later.
//
// It panics when lockable already has a record for l.
func (lockable *Lockable) RecordLock(l LockableNode, last_owner LockableNode) {
	held_id := l.ID()

	if _, already_held := lockable.LocksHeld[held_id]; already_held {
		panic("Attempted to lock a lockable we're already holding(lock cycle)")
	}

	lockable.LocksHeld[held_id] = last_owner
}
// Process forwards signal through the lock graph according to its direction
// and then hands it to the embedded SimpleNode.
//
//   - Up: the signal is sent to every dependency, then to the owner unless the
//     owner is this node itself or was already reached as a dependency.
//   - Down: the signal is sent to every requirement; the first error aborts.
//   - Direct: the signal is not forwarded.
//
// Any other direction returns an error.
//
// Assumed that lockable is already locked for signal
func (lockable *Lockable) Process(context *StateContext, signal GraphSignal) error {
	context.Graph.Log.Logf("signal", "LOCKABLE_PROCESS: %s", lockable.ID())

	var err error
	switch signal.Direction() {
	case Up:
		err = UseStates(context, lockable,
			NewLockInfo(lockable, []string{"dependencies", "owner"}), func(context *StateContext) error {
				owner_sent := false
				for _, dependency := range lockable.Dependencies {
					context.Graph.Log.Logf("signal", "SENDING_TO_DEPENDENCY: %s -> %s", lockable.ID(), dependency.ID())
					// Upward delivery is best-effort: one failing dependency does not
					// stop the others, but the failure is logged rather than dropped.
					if err := Signal(context, dependency, lockable, signal); err != nil {
						context.Graph.Log.Logf("signal", "SENDING_TO_DEPENDENCY_ERR: %s -> %s: %s", lockable.ID(), dependency.ID(), err)
					}
					if lockable.Owner != nil && dependency.ID() == lockable.Owner.ID() {
						owner_sent = true
					}
				}

				// Send to the owner only if it did not already get the signal as a
				// dependency, and never send the signal back to ourselves.
				if lockable.Owner != nil && owner_sent == false {
					if lockable.Owner.ID() != lockable.ID() {
						context.Graph.Log.Logf("signal", "SENDING_TO_OWNER: %s -> %s", lockable.ID(), lockable.Owner.ID())
						return Signal(context, lockable.Owner, lockable, signal)
					}
				}
				return nil
			})
	case Down:
		err = UseStates(context, lockable, NewLockInfo(lockable, []string{"requirements"}), func(context *StateContext) error {
			for _, requirement := range lockable.Requirements {
				err := Signal(context, requirement, lockable, signal)
				if err != nil {
					return err
				}
			}
			return nil
		})
	case Direct:
		err = nil
	default:
		return fmt.Errorf("invalid signal direction %d", signal.Direction())
	}
	if err != nil {
		return err
	}
	return lockable.SimpleNode.Process(context, signal)
}
// UnlinkLockables removes requirement as a requirement of lockable, and
// lockable as a dependency of requirement.
//
// Continues the write context with princ, getting requirements for lockable and dependencies for requirement
// Assumes that an active write context exists with princ locked so that princ's state can be used in checks
//
// Returns an error if either node is nil or if requirement is not currently a
// requirement of lockable; in that case no state is modified.
func UnlinkLockables(context *StateContext, princ Node, lockable LockableNode, requirement LockableNode) error {
	// Reject nil up front (as LinkLockables does) instead of panicking on ID().
	if lockable == nil {
		return fmt.Errorf("UNLINK_LOCKABLES_ERR: cannot unlink a requirement from nil")
	}
	if requirement == nil {
		return fmt.Errorf("UNLINK_LOCKABLES_ERR: cannot unlink nil as a requirement")
	}

	return UpdateStates(context, princ, LockMap{
		lockable.ID():    LockInfo{Node: lockable, Resources: []string{"requirements"}},
		requirement.ID(): LockInfo{Node: requirement, Resources: []string{"dependencies"}},
	}, func(context *StateContext) error {
		// Compare by node ID over the stored values rather than trusting map keys.
		found := false
		for _, req := range lockable.LockableHandle().Requirements {
			if requirement.ID() == req.ID() {
				found = true
				break
			}
		}
		if !found {
			return fmt.Errorf("UNLINK_LOCKABLES_ERR: %s is not a requirement of %s", requirement.ID(), lockable.ID())
		}

		delete(requirement.LockableHandle().Dependencies, lockable.ID())
		delete(lockable.LockableHandle().Requirements, requirement.ID())
		return nil
	})
}
// LinkLockables links every node in requirements as a requirement of
// lockable_node, and lockable_node as a dependency of each of them.
//
// Continues the write context with princ, getting requirements for lockable and dependencies for requirements
//
// Nothing is modified unless every requirement passes validation. The call is
// rejected when lockable_node or any requirement is nil, when a requirement is
// the lockable itself or is listed twice, when checkIfRequirement reports an
// existing relationship between the nodes involved, or when the lockable is
// locked and a requirement is not locked by the same owner. An empty
// requirements slice is a no-op.
func LinkLockables(context *StateContext, princ Node, lockable_node LockableNode, requirements []LockableNode) error {
	if lockable_node == nil {
		return fmt.Errorf("LOCKABLE_LINK_ERR: Will not link Lockables to nil as requirements")
	}
	lockable := lockable_node.LockableHandle()

	if len(requirements) == 0 {
		return nil
	}

	// Validation that needs no state access: nil, self-links and duplicates.
	found := map[NodeID]bool{}
	for _, requirement := range requirements {
		if requirement == nil {
			return fmt.Errorf("LOCKABLE_LINK_ERR: Will not link nil to a Lockable as a requirement")
		}
		if lockable.ID() == requirement.ID() {
			return fmt.Errorf("LOCKABLE_LINK_ERR: cannot link %s to itself", lockable.ID())
		}
		_, exists := found[requirement.ID()]
		if exists == true {
			return fmt.Errorf("LOCKABLE_LINK_ERR: cannot link %s twice", requirement.ID())
		}
		found[requirement.ID()] = true
	}

	return UpdateStates(context, princ, NewLockMap(
		NewLockInfo(lockable_node, []string{"requirements"}),
		LockList(requirements, []string{"dependencies"}),
	), func(context *StateContext) error {
		// Check that all the requirements can be added
		// If the lockable is already locked, need to lock this resource as well before we can add it
		for _, requirement_node := range requirements {
			requirement := requirement_node.LockableHandle()
			for _, req_node := range requirements {
				req := req_node.LockableHandle()
				if req.ID() == requirement.ID() {
					continue
				}
				if checkIfRequirement(context, req, requirement) == true {
					return fmt.Errorf("LOCKABLE_LINK_ERR: %s is a dependenyc of %s so cannot add the same dependency", req.ID(), requirement.ID())
				}
			}
			if checkIfRequirement(context, lockable, requirement) == true {
				return fmt.Errorf("LOCKABLE_LINK_ERR: %s is a dependency of %s so cannot link as requirement", requirement.ID(), lockable.ID())
			}
			if checkIfRequirement(context, requirement, lockable) == true {
				return fmt.Errorf("LOCKABLE_LINK_ERR: %s is a dependency of %s so cannot link as dependency again", lockable.ID(), requirement.ID())
			}
			if lockable.Owner == nil {
				// If the new owner isn't locked, we can add the requirement
			} else if requirement.Owner == nil {
				// if the new requirement isn't already locked but the owner is, the requirement needs to be locked first
				return fmt.Errorf("LOCKABLE_LINK_ERR: %s is locked, %s must be locked to add", lockable.ID(), requirement.ID())
			} else {
				// If the new requirement is already locked and the owner is already locked, their owners need to match
				if requirement.Owner.ID() != lockable.Owner.ID() {
					return fmt.Errorf("LOCKABLE_LINK_ERR: %s is not locked by the same owner as %s, can't link as requirement", requirement.ID(), lockable.ID())
				}
			}
		}

		// Update the states of the requirements
		for _, requirement_node := range requirements {
			requirement := requirement_node.LockableHandle()
			requirement.Dependencies[lockable.ID()] = lockable_node
			// Key by the requirement's own ID. Keying by lockable.ID() would make
			// every requirement overwrite the same entry and would not match the
			// keys used when unlinking or restoring from JSON.
			lockable.Requirements[requirement.ID()] = requirement_node
			context.Graph.Log.Logf("lockable", "LOCKABLE_LINK: linked %s to %s as a requirement", requirement.ID(), lockable.ID())
		}

		// Return no error
		return nil
	})
}
// checkIfRequirement reports whether r is a direct or transitive requirement
// of cur. Each requirement's own requirements are read inside a nested
// UpdateStates call on that requirement.
//
// Must be called within an update context
//
// The function can only return a bool, so an error from the nested
// UpdateStates is logged and that branch is treated as "not found".
func checkIfRequirement(context *StateContext, r LockableNode, cur LockableNode) bool {
	for _, c := range cur.LockableHandle().Requirements {
		if c.ID() == r.ID() {
			return true
		}

		is_requirement := false
		err := UpdateStates(context, cur, NewLockMap(NewLockInfo(c, []string{"requirements"})), func(context *StateContext) error {
			// Keep searching for r below c. Passing cur here instead would look
			// for the wrong node and miss every indirect requirement.
			is_requirement = checkIfRequirement(context, r, c)
			return nil
		})
		if err != nil {
			context.Graph.Log.Logf("lockable", "LOCKABLE_CHECK_REQUIREMENT_ERR: %s - %s", c.ID(), err)
		}

		if is_requirement {
			return true
		}
	}
	return false
}
// LockLockables locks every node in the to_lock map on behalf of
// new_owner_node.
//
// Assumes that new_owner will be written to after returning, even though it doesn't get locked during the call
//
// For each node:
//   - if new_owner already owns it, nothing changes;
//   - if it is unowned, its requirements are locked recursively on its behalf
//     and it is then locked by new_owner;
//   - if another node owns it, "take_lock" is acquired on that owner, the
//     requirements are locked recursively, and the previous owner is recorded
//     so UnlockLockables can hand the lock back.
//
// Returns an error for a nil map, a nil entry or a nil owner; an empty map is a
// no-op. The first pass is meant to do all validation before owners are
// changed. NOTE(review): the recursive calls in that pass already update the
// requirements of earlier entries, so a later failure may leave those locked.
// Confirm the expected rollback behaviour.
func LockLockables(context *StateContext, to_lock map[NodeID]LockableNode, new_owner_node LockableNode) error {
	if to_lock == nil {
		return fmt.Errorf("LOCKABLE_LOCK_ERR: no list provided")
	}
	for _, l := range to_lock {
		if l == nil {
			return fmt.Errorf("LOCKABLE_LOCK_ERR: Can not lock nil")
		}
	}
	if new_owner_node == nil {
		return fmt.Errorf("LOCKABLE_LOCK_ERR: nil cannot hold locks")
	}
	new_owner := new_owner_node.LockableHandle()

	// Called with no requirements to lock, success
	if len(to_lock) == 0 {
		return nil
	}

	return UpdateStates(context, new_owner, NewLockMap(
		LockListM(to_lock, []string{"lock"}),
		NewLockInfo(new_owner, nil),
	), func(context *StateContext) error {
		// First loop is to check that the states can be locked, and locks all requirements
		for _, req_node := range to_lock {
			req := req_node.LockableHandle()
			context.Graph.Log.Logf("lockable", "LOCKABLE_LOCKING: %s from %s", req.ID(), new_owner.ID())

			// If req is already locked, check that we can pass the lock
			if req.Owner != nil {
				owner := req.Owner
				if owner.ID() == new_owner.ID() {
					continue
				}
				err := UpdateStates(context, new_owner, NewLockInfo(owner, []string{"take_lock"}), func(context *StateContext) error {
					// Pass the full node, not its embedded Lockable, so the
					// requirements record the real node as their owner.
					return LockLockables(context, req.Requirements, req_node)
				})
				if err != nil {
					return err
				}
			} else {
				err := LockLockables(context, req.Requirements, req_node)
				if err != nil {
					return err
				}
			}
		}

		// At this point state modification will be started, so no errors can be returned
		for _, req_node := range to_lock {
			req := req_node.LockableHandle()
			old_owner := req.Owner
			if old_owner == nil {
				// If the lockable was previously unowned, update the state
				context.Graph.Log.Logf("lockable", "LOCKABLE_LOCK: %s locked %s", new_owner.ID(), req.ID())
				req.Owner = new_owner_node
				new_owner.RecordLock(req, old_owner)
			} else if old_owner.ID() == new_owner.ID() {
				// Otherwise if the new owner already owns it, no need to update state
				context.Graph.Log.Logf("lockable", "LOCKABLE_LOCK: %s already owns %s", new_owner.ID(), req.ID())
			} else {
				// Otherwise take the lock over. Store the owning node itself, as in
				// the unowned case, rather than its embedded Lockable handle.
				req.Owner = new_owner_node
				new_owner.RecordLock(req, old_owner)
				context.Graph.Log.Logf("lockable", "LOCKABLE_LOCK: %s took lock of %s from %s", new_owner.ID(), req.ID(), old_owner.ID())
			}
		}
		return nil
	})
}
// UnlockLockables releases every node in the to_unlock map that is currently
// locked by old_owner_node.
//
// Each node must be locked, and locked by old_owner; otherwise an error is
// returned. A node's requirements are unlocked recursively before the node
// itself. When a node is released, its owner is reset to whatever RecordUnlock
// returns for it: the owner it had before old_owner locked it, or nil.
//
// Returns an error for a nil map, a nil entry or a nil owner; an empty map is a
// no-op.
func UnlockLockables(context *StateContext, to_unlock map[NodeID]LockableNode, old_owner_node LockableNode) error {
	if to_unlock == nil {
		return fmt.Errorf("LOCKABLE_UNLOCK_ERR: no list provided")
	}
	for _, candidate := range to_unlock {
		if candidate == nil {
			return fmt.Errorf("LOCKABLE_UNLOCK_ERR: Can not unlock nil")
		}
	}
	if old_owner_node == nil {
		return fmt.Errorf("LOCKABLE_UNLOCK_ERR: nil cannot hold locks")
	}
	old_owner := old_owner_node.LockableHandle()

	// Nothing to unlock counts as success.
	if len(to_unlock) == 0 {
		return nil
	}

	lock_map := NewLockMap(
		LockListM(to_unlock, []string{"lock"}),
		NewLockInfo(old_owner, nil),
	)
	return UpdateStates(context, old_owner, lock_map, func(context *StateContext) error {
		// First pass: verify each node can be unlocked and unlock its requirements.
		for _, held_node := range to_unlock {
			held := held_node.LockableHandle()
			context.Graph.Log.Logf("lockable", "LOCKABLE_UNLOCKING: %s from %s", held.ID(), old_owner.ID())

			if held.Owner == nil {
				return fmt.Errorf("LOCKABLE_UNLOCK_ERR: %s is not locked", held.ID())
			}
			if held.Owner.ID() != old_owner.ID() {
				return fmt.Errorf("LOCKABLE_UNLOCK_ERR: %s is not locked by %s", held.ID(), old_owner.ID())
			}

			if err := UnlockLockables(context, held.Requirements, held); err != nil {
				return err
			}
		}

		// Second pass: change owners. No errors are returned from here on.
		for _, held_node := range to_unlock {
			held := held_node.LockableHandle()
			restored_owner := old_owner.RecordUnlock(held)
			held.Owner = restored_owner
			if restored_owner != nil {
				context.Graph.Log.Logf("lockable", "LOCKABLE_UNLOCK: %s passed lock of %s back to %s", old_owner.ID(), held.ID(), restored_owner.ID())
			} else {
				context.Graph.Log.Logf("lockable", "LOCKABLE_UNLOCK: %s unlocked %s", old_owner.ID(), held.ID())
			}
		}
		return nil
	})
}
// LoadLockable is the load function for Lockable: it decodes data as a
// LockableJSON, creates a Lockable for id and restores its links through
// RestoreLockable.
//
// The new lockable is stored in nodes before its links are restored
// (presumably so that nodes loaded recursively can refer back to it; confirm
// against LoadNodeRecurse). It stays in nodes even when restoring fails.
func LoadLockable(ctx *Context, id NodeID, data []byte, nodes NodeMap) (Node, error) {
	var stored LockableJSON
	if err := json.Unmarshal(data, &stored); err != nil {
		return nil, err
	}

	lockable := NewLockable(id, stored.Name)
	nodes[id] = &lockable

	if err := RestoreLockable(ctx, &lockable, stored, nodes); err != nil {
		return nil, err
	}
	return &lockable, nil
}
// NewLockable returns an unowned Lockable for id and name, with empty (non-nil)
// requirement, dependency and held-lock maps.
func NewLockable(id NodeID, name string) Lockable {
	return Lockable{
		SimpleNode: NewSimpleNode(id),
		Name:       name,
		// Owner is left at its zero value, nil: a new lockable starts unlocked.
		Requirements: make(map[NodeID]LockableNode),
		Dependencies: make(map[NodeID]LockableNode),
		LocksHeld:    make(map[NodeID]LockableNode),
	}
}
// RestoreLockable is a helper to load links when loading a struct that embeds
// Lockable.
//
// It resolves the owner, dependencies, requirements and held locks listed in j
// through LoadNodeRecurse, stores them on lockable, and finishes by restoring
// the embedded SimpleNode. Every referenced node must implement LockableNode.
//
// Returns the first ID parse error, load error or type mismatch encountered.
// Links restored before the failure stay set on lockable.
func RestoreLockable(ctx *Context, lockable *Lockable, j LockableJSON, nodes NodeMap) error {
	if j.Owner != "" {
		owner_id, err := ParseID(j.Owner)
		if err != nil {
			return err
		}
		owner_node, err := LoadNodeRecurse(ctx, owner_id, nodes)
		if err != nil {
			return err
		}
		owner, ok := owner_node.(LockableNode)
		if ok == false {
			return fmt.Errorf("%s is not a Lockable", j.Owner)
		}
		lockable.Owner = owner
	}

	for _, dep_str := range j.Dependencies {
		dep_id, err := ParseID(dep_str)
		if err != nil {
			return err
		}
		dep_node, err := LoadNodeRecurse(ctx, dep_id, nodes)
		if err != nil {
			return err
		}
		dep, ok := dep_node.(LockableNode)
		if ok == false {
			return fmt.Errorf("%+v is not a Lockable as expected", dep_node)
		}
		ctx.Log.Logf("db", "LOCKABLE_LOAD_DEPENDENCY: %s - %s - %+v", lockable.ID(), dep_id, reflect.TypeOf(dep))
		lockable.Dependencies[dep_id] = dep
	}

	for _, req_str := range j.Requirements {
		req_id, err := ParseID(req_str)
		if err != nil {
			return err
		}
		req_node, err := LoadNodeRecurse(ctx, req_id, nodes)
		if err != nil {
			return err
		}
		req, ok := req_node.(LockableNode)
		if ok == false {
			return fmt.Errorf("%+v is not a Lockable as expected", req_node)
		}
		lockable.Requirements[req_id] = req
	}

	for l_id_str, h_str := range j.LocksHeld {
		// Check the parse error before it is overwritten by the load below.
		l_id, err := ParseID(l_id_str)
		if err != nil {
			return err
		}
		l, err := LoadNodeRecurse(ctx, l_id, nodes)
		if err != nil {
			return err
		}
		l_l, ok := l.(LockableNode)
		if ok == false {
			return fmt.Errorf("%s is not a Lockable", l.ID())
		}

		// An empty holder string means the lock had no previous owner.
		var h_l LockableNode
		if h_str != "" {
			h_id, err := ParseID(h_str)
			if err != nil {
				return err
			}
			h_node, err := LoadNodeRecurse(ctx, h_id, nodes)
			if err != nil {
				return err
			}
			h, ok := h_node.(LockableNode)
			if ok == false {
				// err is nil at this point, so returning it would report success
				// for a partially restored lockable.
				return fmt.Errorf("%s is not a Lockable", h_str)
			}
			h_l = h
		}
		lockable.RecordLock(l_l, h_l)
	}

	return RestoreSimpleNode(ctx, &lockable.SimpleNode, j.SimpleNodeJSON, nodes)
}