598 lines
16 KiB
Go
598 lines
16 KiB
Go
package graphvent
|
|
|
|
import (
|
|
"fmt"
|
|
"encoding/json"
|
|
)
|
|
|
|
type ListenerExt struct {
|
|
Buffer int
|
|
Chan chan Signal
|
|
}
|
|
|
|
func NewListenerExt(buffer int) *ListenerExt {
|
|
return &ListenerExt{
|
|
Buffer: buffer,
|
|
Chan: make(chan Signal, buffer),
|
|
}
|
|
}
|
|
|
|
func LoadListenerExt(ctx *Context, data []byte) (Extension, error) {
|
|
var j int
|
|
err := json.Unmarshal(data, &j)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
return NewListenerExt(j), nil
|
|
}
|
|
|
|
const ListenerExtType = ExtType("LISTENER")
|
|
func (listener *ListenerExt) Type() ExtType {
|
|
return ListenerExtType
|
|
}
|
|
|
|
func (ext *ListenerExt) Process(context *StateContext, node *Node, signal Signal) error {
|
|
context.Graph.Log.Logf("signal", "LISTENER_PROCESS: %s - %+v", node.ID, signal)
|
|
select {
|
|
case ext.Chan <- signal:
|
|
default:
|
|
return fmt.Errorf("LISTENER_OVERFLOW - %+v", signal)
|
|
}
|
|
return nil
|
|
}
|
|
|
|
func (ext *ListenerExt) Serialize() ([]byte, error) {
|
|
return json.MarshalIndent(ext.Buffer, "", " ")
|
|
}
|
|
|
|
type LockableExt struct {
|
|
Owner *Node
|
|
Requirements map[NodeID]*Node
|
|
Dependencies map[NodeID]*Node
|
|
LocksHeld map[NodeID]*Node
|
|
}
|
|
|
|
const LockableExtType = ExtType("LOCKABLE")
|
|
func (ext *LockableExt) Type() ExtType {
|
|
return LockableExtType
|
|
}
|
|
|
|
type LockableExtJSON struct {
|
|
Owner string `json:"owner"`
|
|
Requirements []string `json:"requirements"`
|
|
Dependencies []string `json:"dependencies"`
|
|
LocksHeld map[string]string `json:"locks_held"`
|
|
}
|
|
|
|
func (ext *LockableExt) Serialize() ([]byte, error) {
|
|
return json.MarshalIndent(&LockableExtJSON{
|
|
Owner: SaveNode(ext.Owner),
|
|
Requirements: SaveNodeList(ext.Requirements),
|
|
Dependencies: SaveNodeList(ext.Dependencies),
|
|
LocksHeld: SaveNodeMap(ext.LocksHeld),
|
|
}, "", " ")
|
|
}
|
|
|
|
func NewLockableExt(owner *Node, requirements NodeMap, dependencies NodeMap, locks_held NodeMap) *LockableExt {
|
|
if requirements == nil {
|
|
requirements = NodeMap{}
|
|
}
|
|
|
|
if dependencies == nil {
|
|
dependencies = NodeMap{}
|
|
}
|
|
|
|
if locks_held == nil {
|
|
locks_held = NodeMap{}
|
|
}
|
|
|
|
return &LockableExt{
|
|
Owner: owner,
|
|
Requirements: requirements,
|
|
Dependencies: dependencies,
|
|
LocksHeld: locks_held,
|
|
}
|
|
}
|
|
|
|
func LoadLockableExt(ctx *Context, data []byte) (Extension, error) {
|
|
var j LockableExtJSON
|
|
err := json.Unmarshal(data, &j)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
ctx.Log.Logf("db", "DB_LOADING_LOCKABLE_EXT_JSON: %+v", j)
|
|
|
|
owner, err := RestoreNode(ctx, j.Owner)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
requirements, err := RestoreNodeList(ctx, j.Requirements)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
dependencies, err := RestoreNodeList(ctx, j.Dependencies)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
locks_held, err := RestoreNodeMap(ctx, j.LocksHeld)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
|
|
return NewLockableExt(owner, requirements, dependencies, locks_held), nil
|
|
}
|
|
|
|
func (ext *LockableExt) Process(context *StateContext, node *Node, signal Signal) error {
|
|
context.Graph.Log.Logf("signal", "LOCKABLE_PROCESS: %s", node.ID)
|
|
|
|
var err error
|
|
switch signal.Direction() {
|
|
case Up:
|
|
err = UseStates(context, node,
|
|
NewACLInfo(node, []string{"dependencies", "owner"}), func(context *StateContext) error {
|
|
owner_sent := false
|
|
for _, dependency := range(ext.Dependencies) {
|
|
context.Graph.Log.Logf("signal", "SENDING_TO_DEPENDENCY: %s -> %s", node.ID, dependency.ID)
|
|
dependency.Process(context, node.ID, signal)
|
|
if ext.Owner != nil {
|
|
if dependency.ID == ext.Owner.ID {
|
|
owner_sent = true
|
|
}
|
|
}
|
|
}
|
|
if ext.Owner != nil && owner_sent == false {
|
|
if ext.Owner.ID != node.ID {
|
|
context.Graph.Log.Logf("signal", "SENDING_TO_OWNER: %s -> %s", node.ID, ext.Owner.ID)
|
|
return ext.Owner.Process(context, node.ID, signal)
|
|
}
|
|
}
|
|
return nil
|
|
})
|
|
case Down:
|
|
err = UseStates(context, node, NewACLInfo(node, []string{"requirements"}), func(context *StateContext) error {
|
|
for _, requirement := range(ext.Requirements) {
|
|
err := requirement.Process(context, node.ID, signal)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
}
|
|
return nil
|
|
})
|
|
case Direct:
|
|
err = nil
|
|
default:
|
|
err = fmt.Errorf("invalid signal direction %d", signal.Direction())
|
|
}
|
|
if err != nil {
|
|
return err
|
|
}
|
|
return nil
|
|
}
|
|
|
|
func (ext *LockableExt) RecordUnlock(node *Node) *Node {
|
|
last_owner, exists := ext.LocksHeld[node.ID]
|
|
if exists == false {
|
|
panic("Attempted to take a get the original lock holder of a lockable we don't own")
|
|
}
|
|
delete(ext.LocksHeld, node.ID)
|
|
return last_owner
|
|
}
|
|
|
|
func (ext *LockableExt) RecordLock(node *Node, last_owner *Node) {
|
|
_, exists := ext.LocksHeld[node.ID]
|
|
if exists == true {
|
|
panic("Attempted to lock a lockable we're already holding(lock cycle)")
|
|
}
|
|
ext.LocksHeld[node.ID] = last_owner
|
|
}
|
|
|
|
// Removes requirement as a requirement from lockable
|
|
func UnlinkLockables(context *StateContext, princ *Node, lockable *Node, requirement *Node) error {
|
|
lockable_ext, err := GetExt[*LockableExt](lockable)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
requirement_ext, err := GetExt[*LockableExt](requirement)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
return UpdateStates(context, princ, ACLMap{
|
|
lockable.ID: ACLInfo{Node: lockable, Resources: []string{"requirements"}},
|
|
requirement.ID: ACLInfo{Node: requirement, Resources: []string{"dependencies"}},
|
|
}, func(context *StateContext) error {
|
|
var found *Node = nil
|
|
for _, req := range(lockable_ext.Requirements) {
|
|
if requirement.ID == req.ID {
|
|
found = req
|
|
break
|
|
}
|
|
}
|
|
|
|
if found == nil {
|
|
return fmt.Errorf("UNLINK_LOCKABLES_ERR: %s is not a requirement of %s", requirement.ID, lockable.ID)
|
|
}
|
|
|
|
delete(requirement_ext.Dependencies, lockable.ID)
|
|
delete(lockable_ext.Requirements, requirement.ID)
|
|
|
|
return nil
|
|
})
|
|
}
|
|
|
|
// Link requirements as requirements to lockable
|
|
func LinkLockables(context *StateContext, princ *Node, lockable *Node, requirements []*Node) error {
|
|
if lockable == nil {
|
|
return fmt.Errorf("LOCKABLE_LINK_ERR: Will not link Lockables to nil as requirements")
|
|
}
|
|
|
|
if len(requirements) == 0 {
|
|
return fmt.Errorf("LOCKABLE_LINK_ERR: Will not link no lockables in call")
|
|
}
|
|
|
|
lockable_ext, err := GetExt[*LockableExt](lockable)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
req_exts := map[NodeID]*LockableExt{}
|
|
for _, requirement := range(requirements) {
|
|
if requirement == nil {
|
|
return fmt.Errorf("LOCKABLE_LINK_ERR: Will not link nil to a Lockable as a requirement")
|
|
}
|
|
|
|
if lockable.ID == requirement.ID {
|
|
return fmt.Errorf("LOCKABLE_LINK_ERR: cannot link %s to itself", lockable.ID)
|
|
}
|
|
|
|
_, exists := req_exts[requirement.ID]
|
|
if exists == true {
|
|
return fmt.Errorf("LOCKABLE_LINK_ERR: cannot link %s twice", requirement.ID)
|
|
}
|
|
ext, err := GetExt[*LockableExt](requirement)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
req_exts[requirement.ID] = ext
|
|
}
|
|
|
|
return UpdateStates(context, princ, NewACLMap(
|
|
NewACLInfo(lockable, []string{"requirements"}),
|
|
ACLList(requirements, []string{"dependencies"}),
|
|
), func(context *StateContext) error {
|
|
// Check that all the requirements can be added
|
|
// If the lockable is already locked, need to lock this resource as well before we can add it
|
|
for _, requirement := range(requirements) {
|
|
requirement_ext := req_exts[requirement.ID]
|
|
for _, req := range(requirements) {
|
|
if req.ID == requirement.ID {
|
|
continue
|
|
}
|
|
|
|
is_req, err := checkIfRequirement(context, req.ID, requirement_ext)
|
|
if err != nil {
|
|
return err
|
|
} else if is_req {
|
|
return fmt.Errorf("LOCKABLE_LINK_ERR: %s is a dependency of %s so cannot add the same dependency", req.ID, requirement.ID)
|
|
|
|
}
|
|
}
|
|
|
|
is_req, err := checkIfRequirement(context, lockable.ID, requirement_ext)
|
|
if err != nil {
|
|
return err
|
|
} else if is_req {
|
|
return fmt.Errorf("LOCKABLE_LINK_ERR: %s is a dependency of %s so cannot link as requirement", requirement.ID, lockable.ID)
|
|
}
|
|
|
|
is_req, err = checkIfRequirement(context, requirement.ID, lockable_ext)
|
|
if err != nil {
|
|
return err
|
|
} else if is_req {
|
|
return fmt.Errorf("LOCKABLE_LINK_ERR: %s is a dependency of %s so cannot link as dependency again", lockable.ID, requirement.ID)
|
|
}
|
|
|
|
if lockable_ext.Owner == nil {
|
|
// If the new owner isn't locked, we can add the requirement
|
|
} else if requirement_ext.Owner == nil {
|
|
// if the new requirement isn't already locked but the owner is, the requirement needs to be locked first
|
|
return fmt.Errorf("LOCKABLE_LINK_ERR: %s is locked, %s must be locked to add", lockable.ID, requirement.ID)
|
|
} else {
|
|
// If the new requirement is already locked and the owner is already locked, their owners need to match
|
|
if requirement_ext.Owner.ID != lockable_ext.Owner.ID {
|
|
return fmt.Errorf("LOCKABLE_LINK_ERR: %s is not locked by the same owner as %s, can't link as requirement", requirement.ID, lockable.ID)
|
|
}
|
|
}
|
|
}
|
|
// Update the states of the requirements
|
|
for _, requirement := range(requirements) {
|
|
requirement_ext := req_exts[requirement.ID]
|
|
requirement_ext.Dependencies[lockable.ID] = lockable
|
|
lockable_ext.Requirements[lockable.ID] = requirement
|
|
context.Graph.Log.Logf("lockable", "LOCKABLE_LINK: linked %s to %s as a requirement", requirement.ID, lockable.ID)
|
|
}
|
|
|
|
// Return no error
|
|
return nil
|
|
})
|
|
}
|
|
|
|
func checkIfRequirement(context *StateContext, id NodeID, cur *LockableExt) (bool, error) {
|
|
for _, req := range(cur.Requirements) {
|
|
if req.ID == id {
|
|
return true, nil
|
|
}
|
|
|
|
req_ext, err := GetExt[*LockableExt](req)
|
|
if err != nil {
|
|
return false, err
|
|
}
|
|
|
|
var is_req bool
|
|
err = UpdateStates(context, req, NewACLInfo(req, []string{"requirements"}), func(context *StateContext) error {
|
|
is_req, err = checkIfRequirement(context, id, req_ext)
|
|
return err
|
|
})
|
|
if err != nil {
|
|
return false, err
|
|
}
|
|
if is_req == true {
|
|
return true, nil
|
|
}
|
|
}
|
|
|
|
return false, nil
|
|
}
|
|
|
|
// Lock nodes in the to_lock slice with new_owner, does not modify any states if returning an error
|
|
// Assumes that new_owner will be written to after returning, even though it doesn't get locked during the call
|
|
func LockLockables(context *StateContext, to_lock NodeMap, new_owner *Node) error {
|
|
if to_lock == nil {
|
|
return fmt.Errorf("LOCKABLE_LOCK_ERR: no map provided")
|
|
}
|
|
|
|
req_exts := map[NodeID]*LockableExt{}
|
|
for _, l := range(to_lock) {
|
|
var err error
|
|
if l == nil {
|
|
return fmt.Errorf("LOCKABLE_LOCK_ERR: Can not lock nil")
|
|
}
|
|
|
|
req_exts[l.ID], err = GetExt[*LockableExt](l)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
}
|
|
|
|
if new_owner == nil {
|
|
return fmt.Errorf("LOCKABLE_LOCK_ERR: nil cannot hold locks")
|
|
}
|
|
|
|
new_owner_ext, err := GetExt[*LockableExt](new_owner)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
// Called with no requirements to lock, success
|
|
if len(to_lock) == 0 {
|
|
return nil
|
|
}
|
|
|
|
return UpdateStates(context, new_owner, NewACLMap(
|
|
ACLListM(to_lock, []string{"lock"}),
|
|
NewACLInfo(new_owner, nil),
|
|
), func(context *StateContext) error {
|
|
// First loop is to check that the states can be locked, and locks all requirements
|
|
for _, req := range(to_lock) {
|
|
req_ext := req_exts[req.ID]
|
|
context.Graph.Log.Logf("lockable", "LOCKABLE_LOCKING: %s from %s", req.ID, new_owner.ID)
|
|
|
|
// If req is alreay locked, check that we can pass the lock
|
|
if req_ext.Owner != nil {
|
|
owner := req_ext.Owner
|
|
if owner.ID == new_owner.ID {
|
|
continue
|
|
} else {
|
|
err := UpdateStates(context, new_owner, NewACLInfo(owner, []string{"take_lock"}), func(context *StateContext)(error){
|
|
return LockLockables(context, req_ext.Requirements, req)
|
|
})
|
|
if err != nil {
|
|
return err
|
|
}
|
|
}
|
|
} else {
|
|
err := LockLockables(context, req_ext.Requirements, req)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
}
|
|
}
|
|
|
|
// At this point state modification will be started, so no errors can be returned
|
|
for _, req := range(to_lock) {
|
|
req_ext := req_exts[req.ID]
|
|
old_owner := req_ext.Owner
|
|
// If the lockable was previously unowned, update the state
|
|
if old_owner == nil {
|
|
context.Graph.Log.Logf("lockable", "LOCKABLE_LOCK: %s locked %s", new_owner.ID, req.ID)
|
|
req_ext.Owner = new_owner
|
|
new_owner_ext.RecordLock(req, old_owner)
|
|
// Otherwise if the new owner already owns it, no need to update state
|
|
} else if old_owner.ID == new_owner.ID {
|
|
context.Graph.Log.Logf("lockable", "LOCKABLE_LOCK: %s already owns %s", new_owner.ID, req.ID)
|
|
// Otherwise update the state
|
|
} else {
|
|
req_ext.Owner = new_owner
|
|
new_owner_ext.RecordLock(req, old_owner)
|
|
context.Graph.Log.Logf("lockable", "LOCKABLE_LOCK: %s took lock of %s from %s", new_owner.ID, req.ID, old_owner.ID)
|
|
}
|
|
}
|
|
return nil
|
|
})
|
|
|
|
}
|
|
|
|
func UnlockLockables(context *StateContext, to_unlock NodeMap, old_owner *Node) error {
|
|
if to_unlock == nil {
|
|
return fmt.Errorf("LOCKABLE_UNLOCK_ERR: no list provided")
|
|
}
|
|
|
|
req_exts := map[NodeID]*LockableExt{}
|
|
for _, l := range(to_unlock) {
|
|
if l == nil {
|
|
return fmt.Errorf("LOCKABLE_UNLOCK_ERR: Can not unlock nil")
|
|
}
|
|
|
|
var err error
|
|
req_exts[l.ID], err = GetExt[*LockableExt](l)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
}
|
|
|
|
if old_owner == nil {
|
|
return fmt.Errorf("LOCKABLE_UNLOCK_ERR: nil cannot hold locks")
|
|
}
|
|
|
|
old_owner_ext, err := GetExt[*LockableExt](old_owner)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
|
|
// Called with no requirements to unlock, success
|
|
if len(to_unlock) == 0 {
|
|
return nil
|
|
}
|
|
|
|
return UpdateStates(context, old_owner, NewACLMap(
|
|
ACLListM(to_unlock, []string{"lock"}),
|
|
NewACLInfo(old_owner, nil),
|
|
), func(context *StateContext) error {
|
|
// First loop is to check that the states can be locked, and locks all requirements
|
|
for _, req := range(to_unlock) {
|
|
req_ext := req_exts[req.ID]
|
|
context.Graph.Log.Logf("lockable", "LOCKABLE_UNLOCKING: %s from %s", req.ID, old_owner.ID)
|
|
|
|
// Check if the owner is correct
|
|
if req_ext.Owner != nil {
|
|
if req_ext.Owner.ID != old_owner.ID {
|
|
return fmt.Errorf("LOCKABLE_UNLOCK_ERR: %s is not locked by %s", req.ID, old_owner.ID)
|
|
}
|
|
} else {
|
|
return fmt.Errorf("LOCKABLE_UNLOCK_ERR: %s is not locked", req.ID)
|
|
}
|
|
|
|
err := UnlockLockables(context, req_ext.Requirements, req)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
}
|
|
|
|
// At this point state modification will be started, so no errors can be returned
|
|
for _, req := range(to_unlock) {
|
|
req_ext := req_exts[req.ID]
|
|
new_owner := old_owner_ext.RecordUnlock(req)
|
|
req_ext.Owner = new_owner
|
|
if new_owner == nil {
|
|
context.Graph.Log.Logf("lockable", "LOCKABLE_UNLOCK: %s unlocked %s", old_owner.ID, req.ID)
|
|
} else {
|
|
context.Graph.Log.Logf("lockable", "LOCKABLE_UNLOCK: %s passed lock of %s back to %s", old_owner.ID, req.ID, new_owner.ID)
|
|
}
|
|
}
|
|
|
|
return nil
|
|
})
|
|
}
|
|
|
|
func SaveNode(node *Node) string {
|
|
str := ""
|
|
if node != nil {
|
|
str = node.ID.String()
|
|
}
|
|
return str
|
|
}
|
|
|
|
func RestoreNode(ctx *Context, id_str string) (*Node, error) {
|
|
if id_str == "" {
|
|
return nil, nil
|
|
}
|
|
id, err := ParseID(id_str)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
return LoadNode(ctx, id)
|
|
}
|
|
|
|
func SaveNodeMap(nodes NodeMap) map[string]string {
|
|
m := map[string]string{}
|
|
for id, node := range(nodes) {
|
|
m[id.String()] = SaveNode(node)
|
|
}
|
|
return m
|
|
}
|
|
|
|
func RestoreNodeMap(ctx *Context, ids map[string]string) (NodeMap, error) {
|
|
nodes := NodeMap{}
|
|
for id_str_1, id_str_2 := range(ids) {
|
|
id_1, err := ParseID(id_str_1)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
node_1, err := LoadNode(ctx, id_1)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
|
|
var node_2 *Node = nil
|
|
if id_str_2 != "" {
|
|
id_2, err := ParseID(id_str_2)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
node_2, err = LoadNode(ctx, id_2)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
}
|
|
|
|
nodes[node_1.ID] = node_2
|
|
}
|
|
|
|
return nodes, nil
|
|
}
|
|
|
|
func SaveNodeList(nodes NodeMap) []string {
|
|
ids := make([]string, len(nodes))
|
|
i := 0
|
|
for id, _ := range(nodes) {
|
|
ids[i] = id.String()
|
|
i += 1
|
|
}
|
|
|
|
return ids
|
|
}
|
|
|
|
func RestoreNodeList(ctx *Context, ids []string) (NodeMap, error) {
|
|
nodes := NodeMap{}
|
|
|
|
for _, id_str := range(ids) {
|
|
node, err := RestoreNode(ctx, id_str)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
nodes[node.ID] = node
|
|
}
|
|
|
|
return nodes, nil
|
|
}
|
|
|