graphvent/lockable.go

592 lines
16 KiB
Go

package graphvent
import (
"fmt"
"encoding/json"
)
type ListenerExt struct {
Buffer int
Chan chan Signal
}
func NewListenerExt(buffer int) *ListenerExt {
return &ListenerExt{
Buffer: buffer,
Chan: make(chan Signal, buffer),
}
}
func LoadListenerExt(ctx *Context, data []byte) (Extension, error) {
var j int
err := json.Unmarshal(data, &j)
if err != nil {
return nil, err
}
return NewListenerExt(j), nil
}
const ListenerExtType = ExtType("LISTENER")
func (listener *ListenerExt) Type() ExtType {
return ListenerExtType
}
func (ext *ListenerExt) Process(context *StateContext, node *Node, signal Signal) error {
context.Graph.Log.Logf("signal", "LISTENER_PROCESS: %s - %+v", node.ID, signal)
select {
case ext.Chan <- signal:
default:
return fmt.Errorf("LISTENER_OVERFLOW - %+v", signal)
}
return nil
}
func (ext *ListenerExt) Serialize() ([]byte, error) {
return json.MarshalIndent(ext.Buffer, "", " ")
}
type LockableExt struct {
Owner *Node
Requirements map[NodeID]*Node
Dependencies map[NodeID]*Node
LocksHeld map[NodeID]*Node
}
const LockableExtType = ExtType("LOCKABLE")
func (ext *LockableExt) Type() ExtType {
return LockableExtType
}
type LockableExtJSON struct {
Owner string `json:"owner"`
Requirements []string `json:"requirements"`
Dependencies []string `json:"dependencies"`
LocksHeld map[string]string `json:"locks_held"`
}
func (ext *LockableExt) Serialize() ([]byte, error) {
return json.MarshalIndent(&LockableExtJSON{
Owner: SaveNode(ext.Owner),
Requirements: SaveNodeList(ext.Requirements),
Dependencies: SaveNodeList(ext.Dependencies),
LocksHeld: SaveNodeMap(ext.LocksHeld),
}, "", " ")
}
func NewLockableExt(owner *Node, requirements NodeMap, dependencies NodeMap, locks_held NodeMap) *LockableExt {
if requirements == nil {
requirements = NodeMap{}
}
if dependencies == nil {
dependencies = NodeMap{}
}
if locks_held == nil {
locks_held = NodeMap{}
}
return &LockableExt{
Owner: owner,
Requirements: requirements,
Dependencies: dependencies,
LocksHeld: locks_held,
}
}
func LoadLockableExt(ctx *Context, data []byte) (Extension, error) {
var j LockableExtJSON
err := json.Unmarshal(data, &j)
if err != nil {
return nil, err
}
ctx.Log.Logf("db", "DB_LOADING_LOCKABLE_EXT_JSON: %+v", j)
owner, err := RestoreNode(ctx, j.Owner)
if err != nil {
return nil, err
}
requirements, err := RestoreNodeList(ctx, j.Requirements)
if err != nil {
return nil, err
}
dependencies, err := RestoreNodeList(ctx, j.Dependencies)
if err != nil {
return nil, err
}
locks_held, err := RestoreNodeMap(ctx, j.LocksHeld)
if err != nil {
return nil, err
}
return NewLockableExt(owner, requirements, dependencies, locks_held), nil
}
func (ext *LockableExt) Process(context *StateContext, node *Node, signal Signal) error {
context.Graph.Log.Logf("signal", "LOCKABLE_PROCESS: %s", node.ID)
switch signal.Direction() {
case Up:
owner_sent := false
for _, dependency := range(ext.Dependencies) {
context.Graph.Log.Logf("signal", "SENDING_TO_DEPENDENCY: %s -> %s", node.ID, dependency.ID)
err := dependency.Process(context, node.ID, signal)
if err != nil {
return err
}
if ext.Owner != nil {
if dependency.ID == ext.Owner.ID {
owner_sent = true
}
}
}
if ext.Owner != nil && owner_sent == false {
if ext.Owner.ID != node.ID {
context.Graph.Log.Logf("signal", "SENDING_TO_OWNER: %s -> %s", node.ID, ext.Owner.ID)
err := ext.Owner.Process(context, node.ID, signal)
if err != nil {
return err
}
}
}
case Down:
for _, requirement := range(ext.Requirements) {
err := requirement.Process(context, node.ID, signal)
if err != nil {
return err
}
}
case Direct:
default:
return fmt.Errorf("invalid signal direction %d", signal.Direction())
}
return nil
}
func (ext *LockableExt) RecordUnlock(node *Node) *Node {
last_owner, exists := ext.LocksHeld[node.ID]
if exists == false {
panic("Attempted to take a get the original lock holder of a lockable we don't own")
}
delete(ext.LocksHeld, node.ID)
return last_owner
}
func (ext *LockableExt) RecordLock(node *Node, last_owner *Node) {
_, exists := ext.LocksHeld[node.ID]
if exists == true {
panic("Attempted to lock a lockable we're already holding(lock cycle)")
}
ext.LocksHeld[node.ID] = last_owner
}
// Removes requirement as a requirement from lockable
func UnlinkLockables(context *StateContext, princ *Node, lockable *Node, requirement *Node) error {
lockable_ext, err := GetExt[*LockableExt](lockable)
if err != nil {
return err
}
requirement_ext, err := GetExt[*LockableExt](requirement)
if err != nil {
return err
}
return UpdateStates(context, princ, ACLMap{
lockable.ID: ACLInfo{Node: lockable, Resources: []string{"requirements"}},
requirement.ID: ACLInfo{Node: requirement, Resources: []string{"dependencies"}},
}, func(context *StateContext) error {
var found *Node = nil
for _, req := range(lockable_ext.Requirements) {
if requirement.ID == req.ID {
found = req
break
}
}
if found == nil {
return fmt.Errorf("UNLINK_LOCKABLES_ERR: %s is not a requirement of %s", requirement.ID, lockable.ID)
}
delete(requirement_ext.Dependencies, lockable.ID)
delete(lockable_ext.Requirements, requirement.ID)
return nil
})
}
// Link requirements as requirements to lockable
func LinkLockables(context *StateContext, princ *Node, lockable *Node, requirements []*Node) error {
if lockable == nil {
return fmt.Errorf("LOCKABLE_LINK_ERR: Will not link Lockables to nil as requirements")
}
if len(requirements) == 0 {
return fmt.Errorf("LOCKABLE_LINK_ERR: Will not link no lockables in call")
}
lockable_ext, err := GetExt[*LockableExt](lockable)
if err != nil {
return err
}
req_exts := map[NodeID]*LockableExt{}
for _, requirement := range(requirements) {
if requirement == nil {
return fmt.Errorf("LOCKABLE_LINK_ERR: Will not link nil to a Lockable as a requirement")
}
if lockable.ID == requirement.ID {
return fmt.Errorf("LOCKABLE_LINK_ERR: cannot link %s to itself", lockable.ID)
}
_, exists := req_exts[requirement.ID]
if exists == true {
return fmt.Errorf("LOCKABLE_LINK_ERR: cannot link %s twice", requirement.ID)
}
ext, err := GetExt[*LockableExt](requirement)
if err != nil {
return err
}
req_exts[requirement.ID] = ext
}
return UpdateStates(context, princ, NewACLMap(
NewACLInfo(lockable, []string{"requirements"}),
ACLList(requirements, []string{"dependencies"}),
), func(context *StateContext) error {
// Check that all the requirements can be added
// If the lockable is already locked, need to lock this resource as well before we can add it
for _, requirement := range(requirements) {
requirement_ext := req_exts[requirement.ID]
for _, req := range(requirements) {
if req.ID == requirement.ID {
continue
}
is_req, err := checkIfRequirement(context, req.ID, requirement_ext)
if err != nil {
return err
} else if is_req {
return fmt.Errorf("LOCKABLE_LINK_ERR: %s is a dependency of %s so cannot add the same dependency", req.ID, requirement.ID)
}
}
is_req, err := checkIfRequirement(context, lockable.ID, requirement_ext)
if err != nil {
return err
} else if is_req {
return fmt.Errorf("LOCKABLE_LINK_ERR: %s is a dependency of %s so cannot link as requirement", requirement.ID, lockable.ID)
}
is_req, err = checkIfRequirement(context, requirement.ID, lockable_ext)
if err != nil {
return err
} else if is_req {
return fmt.Errorf("LOCKABLE_LINK_ERR: %s is a dependency of %s so cannot link as dependency again", lockable.ID, requirement.ID)
}
if lockable_ext.Owner == nil {
// If the new owner isn't locked, we can add the requirement
} else if requirement_ext.Owner == nil {
// if the new requirement isn't already locked but the owner is, the requirement needs to be locked first
return fmt.Errorf("LOCKABLE_LINK_ERR: %s is locked, %s must be locked to add", lockable.ID, requirement.ID)
} else {
// If the new requirement is already locked and the owner is already locked, their owners need to match
if requirement_ext.Owner.ID != lockable_ext.Owner.ID {
return fmt.Errorf("LOCKABLE_LINK_ERR: %s is not locked by the same owner as %s, can't link as requirement", requirement.ID, lockable.ID)
}
}
}
// Update the states of the requirements
for _, requirement := range(requirements) {
requirement_ext := req_exts[requirement.ID]
requirement_ext.Dependencies[lockable.ID] = lockable
lockable_ext.Requirements[lockable.ID] = requirement
context.Graph.Log.Logf("lockable", "LOCKABLE_LINK: linked %s to %s as a requirement", requirement.ID, lockable.ID)
}
// Return no error
return nil
})
}
func checkIfRequirement(context *StateContext, id NodeID, cur *LockableExt) (bool, error) {
for _, req := range(cur.Requirements) {
if req.ID == id {
return true, nil
}
req_ext, err := GetExt[*LockableExt](req)
if err != nil {
return false, err
}
var is_req bool
err = UpdateStates(context, req, NewACLInfo(req, []string{"requirements"}), func(context *StateContext) error {
is_req, err = checkIfRequirement(context, id, req_ext)
return err
})
if err != nil {
return false, err
}
if is_req == true {
return true, nil
}
}
return false, nil
}
// Lock nodes in the to_lock slice with new_owner, does not modify any states if returning an error
// Assumes that new_owner will be written to after returning, even though it doesn't get locked during the call
func LockLockables(context *StateContext, to_lock NodeMap, new_owner *Node) error {
if to_lock == nil {
return fmt.Errorf("LOCKABLE_LOCK_ERR: no map provided")
}
req_exts := map[NodeID]*LockableExt{}
for _, l := range(to_lock) {
var err error
if l == nil {
return fmt.Errorf("LOCKABLE_LOCK_ERR: Can not lock nil")
}
req_exts[l.ID], err = GetExt[*LockableExt](l)
if err != nil {
return err
}
}
if new_owner == nil {
return fmt.Errorf("LOCKABLE_LOCK_ERR: nil cannot hold locks")
}
new_owner_ext, err := GetExt[*LockableExt](new_owner)
if err != nil {
return err
}
// Called with no requirements to lock, success
if len(to_lock) == 0 {
return nil
}
return UpdateStates(context, new_owner, NewACLMap(
ACLListM(to_lock, []string{"lock"}),
NewACLInfo(new_owner, nil),
), func(context *StateContext) error {
// First loop is to check that the states can be locked, and locks all requirements
for _, req := range(to_lock) {
req_ext := req_exts[req.ID]
context.Graph.Log.Logf("lockable", "LOCKABLE_LOCKING: %s from %s", req.ID, new_owner.ID)
// If req is alreay locked, check that we can pass the lock
if req_ext.Owner != nil {
owner := req_ext.Owner
if owner.ID == new_owner.ID {
continue
} else {
err := UpdateStates(context, new_owner, NewACLInfo(owner, []string{"take_lock"}), func(context *StateContext)(error){
return LockLockables(context, req_ext.Requirements, req)
})
if err != nil {
return err
}
}
} else {
err := LockLockables(context, req_ext.Requirements, req)
if err != nil {
return err
}
}
}
// At this point state modification will be started, so no errors can be returned
for _, req := range(to_lock) {
req_ext := req_exts[req.ID]
old_owner := req_ext.Owner
// If the lockable was previously unowned, update the state
if old_owner == nil {
context.Graph.Log.Logf("lockable", "LOCKABLE_LOCK: %s locked %s", new_owner.ID, req.ID)
req_ext.Owner = new_owner
new_owner_ext.RecordLock(req, old_owner)
// Otherwise if the new owner already owns it, no need to update state
} else if old_owner.ID == new_owner.ID {
context.Graph.Log.Logf("lockable", "LOCKABLE_LOCK: %s already owns %s", new_owner.ID, req.ID)
// Otherwise update the state
} else {
req_ext.Owner = new_owner
new_owner_ext.RecordLock(req, old_owner)
context.Graph.Log.Logf("lockable", "LOCKABLE_LOCK: %s took lock of %s from %s", new_owner.ID, req.ID, old_owner.ID)
}
}
return nil
})
}
func UnlockLockables(context *StateContext, to_unlock NodeMap, old_owner *Node) error {
if to_unlock == nil {
return fmt.Errorf("LOCKABLE_UNLOCK_ERR: no list provided")
}
req_exts := map[NodeID]*LockableExt{}
for _, l := range(to_unlock) {
if l == nil {
return fmt.Errorf("LOCKABLE_UNLOCK_ERR: Can not unlock nil")
}
var err error
req_exts[l.ID], err = GetExt[*LockableExt](l)
if err != nil {
return err
}
}
if old_owner == nil {
return fmt.Errorf("LOCKABLE_UNLOCK_ERR: nil cannot hold locks")
}
old_owner_ext, err := GetExt[*LockableExt](old_owner)
if err != nil {
return err
}
// Called with no requirements to unlock, success
if len(to_unlock) == 0 {
return nil
}
return UpdateStates(context, old_owner, NewACLMap(
ACLListM(to_unlock, []string{"lock"}),
NewACLInfo(old_owner, nil),
), func(context *StateContext) error {
// First loop is to check that the states can be locked, and locks all requirements
for _, req := range(to_unlock) {
req_ext := req_exts[req.ID]
context.Graph.Log.Logf("lockable", "LOCKABLE_UNLOCKING: %s from %s", req.ID, old_owner.ID)
// Check if the owner is correct
if req_ext.Owner != nil {
if req_ext.Owner.ID != old_owner.ID {
return fmt.Errorf("LOCKABLE_UNLOCK_ERR: %s is not locked by %s", req.ID, old_owner.ID)
}
} else {
return fmt.Errorf("LOCKABLE_UNLOCK_ERR: %s is not locked", req.ID)
}
err := UnlockLockables(context, req_ext.Requirements, req)
if err != nil {
return err
}
}
// At this point state modification will be started, so no errors can be returned
for _, req := range(to_unlock) {
req_ext := req_exts[req.ID]
new_owner := old_owner_ext.RecordUnlock(req)
req_ext.Owner = new_owner
if new_owner == nil {
context.Graph.Log.Logf("lockable", "LOCKABLE_UNLOCK: %s unlocked %s", old_owner.ID, req.ID)
} else {
context.Graph.Log.Logf("lockable", "LOCKABLE_UNLOCK: %s passed lock of %s back to %s", old_owner.ID, req.ID, new_owner.ID)
}
}
return nil
})
}
func SaveNode(node *Node) string {
str := ""
if node != nil {
str = node.ID.String()
}
return str
}
func RestoreNode(ctx *Context, id_str string) (*Node, error) {
if id_str == "" {
return nil, nil
}
id, err := ParseID(id_str)
if err != nil {
return nil, err
}
return LoadNode(ctx, id)
}
func SaveNodeMap(nodes NodeMap) map[string]string {
m := map[string]string{}
for id, node := range(nodes) {
m[id.String()] = SaveNode(node)
}
return m
}
func RestoreNodeMap(ctx *Context, ids map[string]string) (NodeMap, error) {
nodes := NodeMap{}
for id_str_1, id_str_2 := range(ids) {
id_1, err := ParseID(id_str_1)
if err != nil {
return nil, err
}
node_1, err := LoadNode(ctx, id_1)
if err != nil {
return nil, err
}
var node_2 *Node = nil
if id_str_2 != "" {
id_2, err := ParseID(id_str_2)
if err != nil {
return nil, err
}
node_2, err = LoadNode(ctx, id_2)
if err != nil {
return nil, err
}
}
nodes[node_1.ID] = node_2
}
return nodes, nil
}
func SaveNodeList(nodes NodeMap) []string {
ids := make([]string, len(nodes))
i := 0
for id, _ := range(nodes) {
ids[i] = id.String()
i += 1
}
return ids
}
func RestoreNodeList(ctx *Context, ids []string) (NodeMap, error) {
nodes := NodeMap{}
for _, id_str := range(ids) {
node, err := RestoreNode(ctx, id_str)
if err != nil {
return nil, err
}
nodes[node.ID] = node
}
return nodes, nil
}