graphvent/lockable.go

441 lines
15 KiB
Go

package graphvent
import (
"encoding/json"
)
// A Listener extension provides a channel that can receive signals on a different thread
2023-07-25 21:43:15 -06:00
type ListenerExt struct {
2023-07-26 11:56:10 -06:00
Buffer int
Chan chan Signal
2023-07-24 16:04:56 -06:00
}
// Create a new listener extension with a given buffer size
2023-07-26 11:56:10 -06:00
func NewListenerExt(buffer int) *ListenerExt {
return &ListenerExt{
Buffer: buffer,
Chan: make(chan Signal, buffer),
2023-07-24 16:04:56 -06:00
}
2023-06-23 22:19:43 -06:00
}
func (ext *ListenerExt) Field(name string) interface{} {
return ResolveFields(ext, name, map[string]func(*ListenerExt)interface{}{
"buffer": func(ext *ListenerExt) interface{} {
return ext.Buffer
},
"chan": func(ext *ListenerExt) interface{} {
return ext.Chan
},
})
}
// Simple load function, unmarshal the buffer int from json
2023-08-01 20:55:15 -06:00
func (ext *ListenerExt) Deserialize(ctx *Context, data []byte) error {
err := json.Unmarshal(data, &ext.Buffer)
ext.Chan = make(chan Signal, ext.Buffer)
2023-08-01 20:55:15 -06:00
return err
2023-07-26 11:56:10 -06:00
}
func (listener *ListenerExt) Type() ExtType {
2023-07-25 21:43:15 -06:00
return ListenerExtType
2023-06-23 22:19:43 -06:00
}
// Send the signal to the channel, logging an overflow if it occurs
func (ext *ListenerExt) Process(ctx *Context, node *Node, source NodeID, signal Signal) Messages {
ctx.Log.Logf("listener", "LISTENER_PROCESS: %s - %+v", node.ID, signal)
2023-07-25 21:43:15 -06:00
select {
case ext.Chan <- signal:
2023-07-25 21:43:15 -06:00
default:
ctx.Log.Logf("listener", "LISTENER_OVERFLOW: %s", node.ID)
2023-07-25 21:43:15 -06:00
}
return nil
2023-07-09 14:30:30 -06:00
}
// ReqState holds the multiple states of a requirement
2023-07-30 10:09:04 -06:00
type LinkState struct {
Link string `json:"link"`
Lock string `json:"lock"`
2023-07-30 10:09:04 -06:00
Initiator NodeID `json:"initiator"`
}
// A LockableExt allows a node to be linked to other nodes(via LinkSignal) and locked/unlocked(via LockSignal)
2023-07-30 10:09:04 -06:00
type LinkMap map[NodeID]LinkState
func (m LinkMap) MarshalJSON() ([]byte, error) {
tmp := map[string]LinkState{}
for id, state := range(m) {
tmp[id.String()] = state
}
return json.Marshal(tmp)
}
func (m LinkMap) UnmarshalJSON(data []byte) error {
tmp := map[string]LinkState{}
err := json.Unmarshal(data, &tmp)
if err != nil {
return err
}
for id_str, state := range(tmp) {
id, err := ParseID(id_str)
if err != nil {
return err
}
m[id] = state
}
return nil
}
2023-07-30 10:09:04 -06:00
type LockableExt struct {
Owner *NodeID `json:"owner"`
PendingOwner *NodeID `json:"pending_owner"`
2023-07-30 10:09:04 -06:00
Requirements LinkMap `json:"requirements"`
Dependencies LinkMap `json:"dependencies"`
}
func (ext *LockableExt) Field(name string) interface{} {
return ResolveFields(ext, name, map[string]func(*LockableExt)interface{}{
"owner": func(ext *LockableExt) interface{} {
return ext.Owner
},
"pending_owner": func(ext *LockableExt) interface{} {
return ext.PendingOwner
},
"requirements": func(ext *LockableExt) interface{} {
return ext.Requirements
},
"dependencies": func(ext *LockableExt) interface{} {
return ext.Dependencies
},
})
}
func (ext *ListenerExt) Serialize() ([]byte, error) {
2023-08-01 20:55:15 -06:00
return json.Marshal(ext.Buffer)
}
2023-07-25 21:43:15 -06:00
func (ext *LockableExt) Type() ExtType {
return LockableExtType
}
2023-07-25 21:43:15 -06:00
func (ext *LockableExt) Serialize() ([]byte, error) {
2023-08-01 20:55:15 -06:00
return json.Marshal(ext)
}
func (ext *LockableExt) Deserialize(ctx *Context, data []byte) error {
return json.Unmarshal(data, ext)
2023-07-26 00:18:11 -06:00
}
func NewLockableExt() *LockableExt {
2023-07-26 11:56:10 -06:00
return &LockableExt{
Owner: nil,
PendingOwner: nil,
2023-07-30 10:09:04 -06:00
Requirements: map[NodeID]LinkState{},
Dependencies: map[NodeID]LinkState{},
2023-07-26 11:56:10 -06:00
}
}
// Send the signal to unlock a node from itself
func UnlockLockable(ctx *Context, node *Node) error {
msgs := Messages{}
2023-08-10 23:43:10 -06:00
msgs = msgs.Add(node.ID, node.Key, NewLockSignal("unlock"), node.ID)
return ctx.Send(msgs)
}
// Send the signal to lock a node from itself
func LockLockable(ctx *Context, node *Node) error {
msgs := Messages{}
2023-08-10 23:43:10 -06:00
msgs = msgs.Add(node.ID, node.Key, NewLockSignal("lock"), node.ID)
return ctx.Send(msgs)
}
// Setup a node to send the initial requirement link signal, then send the signal
func LinkRequirement(ctx *Context, dependency *Node, requirement NodeID) error {
msgs := Messages{}
2023-08-10 23:43:10 -06:00
msgs = msgs.Add(dependency.ID, dependency.Key, NewLinkStartSignal("req", requirement), dependency.ID)
return ctx.Send(msgs)
}
2023-07-27 16:21:27 -06:00
// Handle a LockSignal and update the extensions owner/requirement states
func (ext *LockableExt) HandleLockSignal(log Logger, node *Node, source NodeID, signal *StringSignal) Messages {
2023-07-30 23:42:47 -06:00
state := signal.Str
log.Logf("lockable", "LOCK_SIGNAL: %s->%s %+v", source, node.ID, signal)
messages := Messages{}
switch state {
case "unlock":
if ext.Owner == nil {
2023-08-10 23:43:10 -06:00
messages = messages.Add(node.ID, node.Key, NewErrorSignal(signal.ID(), "already_unlocked"), source)
} else if source != *ext.Owner {
2023-08-10 23:43:10 -06:00
messages = messages.Add(node.ID, node.Key, NewErrorSignal(signal.ID(), "not_owner"), source)
} else if ext.PendingOwner == nil {
2023-08-10 23:43:10 -06:00
messages = messages.Add(node.ID, node.Key, NewErrorSignal(signal.ID(), "already_unlocking"), source)
} else {
if len(ext.Requirements) == 0 {
ext.Owner = nil
ext.PendingOwner = nil
2023-08-10 23:43:10 -06:00
messages = messages.Add(node.ID, node.Key, NewLockSignal("unlocked"), source)
} else {
ext.PendingOwner = nil
for id, state := range(ext.Requirements) {
if state.Link == "linked" {
if state.Lock != "locked" {
panic("NOT_LOCKED")
}
state.Lock = "unlocking"
ext.Requirements[id] = state
2023-08-10 23:43:10 -06:00
messages = messages.Add(node.ID, node.Key, NewLockSignal("unlock"), id)
}
}
if source != node.ID {
2023-08-10 23:43:10 -06:00
messages = messages.Add(node.ID, node.Key, NewLockSignal("unlocking"), source)
}
}
}
case "unlocking":
state, exists := ext.Requirements[source]
if exists == false {
2023-08-10 23:43:10 -06:00
messages = messages.Add(node.ID, node.Key, NewErrorSignal(signal.ID(), "not_requirement"), source)
} else if state.Link != "linked" {
2023-08-10 23:43:10 -06:00
messages = messages.Add(node.ID, node.Key, NewErrorSignal(signal.ID(), "not_linked"), source)
} else if state.Lock != "unlocking" {
2023-08-10 23:43:10 -06:00
messages = messages.Add(node.ID, node.Key, NewErrorSignal(signal.ID(), "not_unlocking"), source)
}
case "unlocked":
if source == node.ID {
return nil
}
state, exists := ext.Requirements[source]
if exists == false {
2023-08-10 23:43:10 -06:00
messages = messages.Add(node.ID, node.Key, NewErrorSignal(signal.ID(), "not_requirement"), source)
} else if state.Link != "linked" {
2023-08-10 23:43:10 -06:00
messages = messages.Add(node.ID, node.Key, NewErrorSignal(signal.ID(), "not_linked"), source)
} else if state.Lock != "unlocking" {
2023-08-10 23:43:10 -06:00
messages = messages.Add(node.ID, node.Key, NewErrorSignal(signal.ID(), "not_unlocking"), source)
} else {
state.Lock = "unlocked"
ext.Requirements[source] = state
if ext.PendingOwner == nil {
linked := 0
unlocked := 0
for _, s := range(ext.Requirements) {
if s.Link == "linked" {
linked += 1
}
if s.Lock == "unlocked" {
unlocked += 1
}
}
if linked == unlocked {
previous_owner := *ext.Owner
ext.Owner = nil
2023-08-10 23:43:10 -06:00
messages = messages.Add(node.ID, node.Key, NewLockSignal("unlocked"), previous_owner)
}
}
}
case "locked":
if source == node.ID {
return nil
}
state, exists := ext.Requirements[source]
if exists == false {
2023-08-10 23:43:10 -06:00
messages = messages.Add(node.ID, node.Key, NewErrorSignal(signal.ID(), "not_requirement"), source)
} else if state.Link != "linked" {
2023-08-10 23:43:10 -06:00
messages = messages.Add(node.ID, node.Key, NewErrorSignal(signal.ID(), "not_linked"), source)
} else if state.Lock != "locking" {
2023-08-10 23:43:10 -06:00
messages = messages.Add(node.ID, node.Key, NewErrorSignal(signal.ID(), "not_locking"), source)
} else {
state.Lock = "locked"
ext.Requirements[source] = state
if ext.PendingOwner != nil {
linked := 0
locked := 0
for _, s := range(ext.Requirements) {
if s.Link == "linked" {
linked += 1
}
if s.Lock == "locked" {
locked += 1
}
}
if linked == locked {
ext.Owner = ext.PendingOwner
2023-08-10 23:43:10 -06:00
messages = messages.Add(node.ID, node.Key, NewLockSignal("locked"), *ext.Owner)
}
}
}
case "locking":
state, exists := ext.Requirements[source]
if exists == false {
2023-08-10 23:43:10 -06:00
messages = messages.Add(node.ID, node.Key, NewErrorSignal(signal.ID(), "not_requirement"), source)
} else if state.Link != "linked" {
2023-08-10 23:43:10 -06:00
messages = messages.Add(node.ID, node.Key, NewErrorSignal(signal.ID(), "not_linked"), source)
} else if state.Lock != "locking" {
2023-08-10 23:43:10 -06:00
messages = messages.Add(node.ID, node.Key, NewErrorSignal(signal.ID(), "not_locking"), source)
}
case "lock":
if ext.Owner != nil {
2023-08-10 23:43:10 -06:00
messages = messages.Add(node.ID, node.Key, NewErrorSignal(signal.ID(), "already_locked"), source)
} else if ext.PendingOwner != nil {
2023-08-10 23:43:10 -06:00
messages = messages.Add(node.ID, node.Key, NewErrorSignal(signal.ID(), "already_locking"), source)
} else {
owner := source
if len(ext.Requirements) == 0 {
ext.Owner = &owner
ext.PendingOwner = ext.Owner
2023-08-10 23:43:10 -06:00
messages = messages.Add(node.ID, node.Key, NewLockSignal("locked"), source)
} else {
ext.PendingOwner = &owner
for id, state := range(ext.Requirements) {
if state.Link == "linked" {
log.Logf("lockable", "LOCK_REQ: %s sending 'lock' to %s", node.ID, id)
if state.Lock != "unlocked" {
panic("NOT_UNLOCKED")
}
state.Lock = "locking"
ext.Requirements[id] = state
2023-08-10 23:43:10 -06:00
messages = messages.Add(node.ID, node.Key, NewLockSignal("lock"), id)
}
}
if source != node.ID {
2023-08-10 23:43:10 -06:00
messages = messages.Add(node.ID, node.Key, NewLockSignal("locking"), source)
}
}
}
default:
log.Logf("lockable", "LOCK_ERR: unkown state %s", state)
}
log.Logf("lockable", "LOCK_MESSAGES: %+v", messages)
return messages
}
func (ext *LockableExt) HandleLinkStartSignal(log Logger, node *Node, source NodeID, signal *IDStringSignal) Messages {
2023-07-30 23:42:47 -06:00
link_type := signal.Str
target := signal.NodeID
log.Logf("lockable", "LINK_START_SIGNAL: %s->%s %s %s", source, node.ID, link_type, target)
messages := Messages{}
switch link_type {
case "req":
state, exists := ext.Requirements[target]
_, dep_exists := ext.Dependencies[target]
if ext.Owner != nil {
2023-08-10 23:43:10 -06:00
messages = messages.Add(node.ID, node.Key, NewErrorSignal(signal.ID(), "already_locked"), source)
} else if ext.Owner != ext.PendingOwner {
if ext.PendingOwner == nil {
2023-08-10 23:43:10 -06:00
messages = messages.Add(node.ID, node.Key, NewErrorSignal(signal.ID(), "unlocking"), source)
} else {
2023-08-10 23:43:10 -06:00
messages = messages.Add(node.ID, node.Key, NewErrorSignal(signal.ID(), "locking"), source)
}
} else if exists == true {
if state.Link == "linking" {
2023-08-10 23:43:10 -06:00
messages = messages.Add(node.ID, node.Key, NewErrorSignal(signal.ID(), "already_linking_req"), source)
} else if state.Link == "linked" {
2023-08-10 23:43:10 -06:00
messages = messages.Add(node.ID, node.Key, NewErrorSignal(signal.ID(), "already_req"), source)
}
} else if dep_exists == true {
2023-08-10 23:43:10 -06:00
messages = messages.Add(node.ID, node.Key, NewErrorSignal(signal.ID(), "already_dep"), source)
} else {
2023-07-30 10:09:04 -06:00
ext.Requirements[target] = LinkState{"linking", "unlocked", source}
2023-08-10 23:43:10 -06:00
messages = messages.Add(node.ID, node.Key, NewLinkSignal("linked_as_req"), target)
messages = messages.Add(node.ID, node.Key, NewLinkStartSignal("linking_req", target), source)
}
}
return messages
}
// Handle LinkSignal, updating the extensions requirements and dependencies as necessary
// TODO: Add unlink
func (ext *LockableExt) HandleLinkSignal(log Logger, node *Node, source NodeID, signal *StringSignal) Messages {
log.Logf("lockable", "LINK_SIGNAL: %s->%s %+v", source, node.ID, signal)
2023-07-30 23:42:47 -06:00
state := signal.Str
messages := Messages{}
switch state {
case "dep_done":
2023-07-27 23:05:19 -06:00
state, exists := ext.Requirements[source]
if exists == false {
2023-08-10 23:43:10 -06:00
messages = messages.Add(node.ID, node.Key, NewErrorSignal(signal.ID(), "not_linking"), source)
2023-07-27 23:05:19 -06:00
} else if state.Link == "linking" {
state.Link = "linked"
ext.Requirements[source] = state
log.Logf("lockable", "FINISHED_LINKING_REQ: %s->%s", node.ID, source)
}
2023-07-30 10:09:04 -06:00
case "linked_as_req":
2023-07-27 23:05:19 -06:00
state, exists := ext.Dependencies[source]
if exists == false {
ext.Dependencies[source] = LinkState{"linked", "unlocked", source}
2023-08-10 23:43:10 -06:00
messages = messages.Add(node.ID, node.Key, NewLinkSignal("dep_done"), source)
2023-07-30 10:09:04 -06:00
} else if state.Link == "linking" {
2023-08-10 23:43:10 -06:00
messages = messages.Add(node.ID, node.Key, NewErrorSignal(signal.ID(), "already_linking"), source)
} else if state.Link == "linked" {
2023-08-10 23:43:10 -06:00
messages = messages.Add(node.ID, node.Key, NewErrorSignal(signal.ID(), "already_linked"), source)
2023-07-27 23:05:19 -06:00
} else if ext.PendingOwner != ext.Owner {
if ext.Owner == nil {
2023-08-10 23:43:10 -06:00
messages = messages.Add(node.ID, node.Key, NewErrorSignal(signal.ID(), "locking"), source)
2023-07-27 23:05:19 -06:00
} else {
2023-08-10 23:43:10 -06:00
messages = messages.Add(node.ID, node.Key, NewErrorSignal(signal.ID(), "unlocking"), source)
}
2023-07-27 18:16:37 -06:00
}
default:
log.Logf("lockable", "LINK_ERROR: unknown state %s", state)
}
return messages
2023-07-27 16:21:27 -06:00
}
// LockableExts process Up/Down signals by forwarding them to owner, dependency, and requirement nodes
// LockSignal and LinkSignal Direct signals are processed to update the requirement/dependency/lock state
func (ext *LockableExt) Process(ctx *Context, node *Node, source NodeID, signal Signal) Messages {
messages := Messages{}
switch signal.Direction() {
case Up:
ctx.Log.Logf("lockable", "LOCKABLE_DEPENDENCIES: %+v", ext.Dependencies)
2023-07-27 12:20:49 -06:00
owner_sent := false
for dependency, state := range(ext.Dependencies) {
2023-07-30 10:09:04 -06:00
if state.Link == "linked" {
2023-08-10 23:43:10 -06:00
messages = messages.Add(node.ID, node.Key, signal, dependency)
if ext.Owner != nil {
if dependency == *ext.Owner {
owner_sent = true
}
}
}
2023-07-27 12:20:49 -06:00
}
2023-07-27 12:20:49 -06:00
if ext.Owner != nil && owner_sent == false {
if *ext.Owner != node.ID {
2023-08-10 23:43:10 -06:00
messages = messages.Add(node.ID, node.Key, signal, *ext.Owner)
2023-07-09 14:30:30 -06:00
}
2023-07-27 12:20:49 -06:00
}
case Down:
for requirement, state := range(ext.Requirements) {
if state.Link == "linked" {
2023-08-10 23:43:10 -06:00
messages = messages.Add(node.ID, node.Key, signal, requirement)
2023-07-27 12:20:49 -06:00
}
}
case Direct:
switch signal.Type() {
case LinkSignalType:
messages = ext.HandleLinkSignal(ctx.Log, node, source, signal.(*StringSignal))
case LockSignalType:
messages = ext.HandleLockSignal(ctx.Log, node, source, signal.(*StringSignal))
case LinkStartSignalType:
messages = ext.HandleLinkStartSignal(ctx.Log, node, source, signal.(*IDStringSignal))
2023-07-27 16:21:27 -06:00
default:
}
default:
2023-07-24 17:07:27 -06:00
}
return messages
2023-07-25 21:43:15 -06:00
}