graphvent/lockable.go

423 lines
14 KiB
Go

package graphvent
import (
2023-08-15 18:23:06 -06:00
"github.com/google/uuid"
2023-11-03 22:51:54 -06:00
"time"
)
2023-08-15 18:23:06 -06:00
// ReqState is the lock state of a lockable node, and also the state a node
// records for each of its requirements.
type ReqState byte

// The numeric values are part of the type's representation; do not reorder
// these constants.
const (
	Unlocked     ReqState = iota // 0: not held by anyone
	Unlocking                    // 1: unlock requested, waiting on requirements
	Locked                       // 2: held by an owner
	Locking                      // 3: lock requested, waiting on requirements
	AbortingLock                 // 4: a lock attempt failed and is being rolled back
)
2023-11-03 22:51:54 -06:00
// ReqStateStrings gives the printable name of every ReqState; the handlers
// in this file use it when logging states.
var ReqStateStrings = map[ReqState]string{
	Unlocked:     "Unlocked",
	Unlocking:    "Unlocking",
	Locked:       "Locked",
	Locking:      "Locking",
	AbortingLock: "AbortingLock",
}
2023-08-11 16:00:36 -06:00
type LockableExt struct{
2024-03-17 14:25:34 -06:00
State ReqState `gv:"lockable_state"`
ReqID *uuid.UUID `gv:"req_id"`
Owner *NodeID `gv:"owner" node:"Base"`
PendingOwner *NodeID `gv:"pending_owner" node:"Base"`
PendingID uuid.UUID `gv:"pending_id"`
Requirements map[NodeID]ReqState `gv:"requirements" node:"Lockable:"`
WaitInfos WaitMap `gv:"wait_infos" node:":Base"`
}
func NewLockableExt(requirements []NodeID) *LockableExt {
2023-11-03 22:51:54 -06:00
var reqs map[NodeID]ReqState = nil
2023-08-11 16:00:36 -06:00
if requirements != nil {
2023-11-03 22:51:54 -06:00
reqs = map[NodeID]ReqState{}
2023-08-11 16:00:36 -06:00
for _, id := range(requirements) {
2023-11-03 22:51:54 -06:00
reqs[id] = Unlocked
2023-08-11 16:00:36 -06:00
}
}
2023-07-26 11:56:10 -06:00
return &LockableExt{
2023-08-15 18:23:06 -06:00
State: Unlocked,
Owner: nil,
PendingOwner: nil,
Requirements: reqs,
2023-11-03 22:51:54 -06:00
WaitInfos: WaitMap{},
2023-07-26 11:56:10 -06:00
}
}
func UnlockLockable(ctx *Context, node *Node) (uuid.UUID, error) {
2023-08-15 18:23:06 -06:00
signal := NewLockSignal("unlock")
2024-03-04 17:30:42 -07:00
messages := []SendMsg{{node.ID, signal}}
return signal.ID(), ctx.Send(node, messages)
}
func LockLockable(ctx *Context, node *Node) (uuid.UUID, error) {
2023-08-15 18:23:06 -06:00
signal := NewLockSignal("lock")
2024-03-04 17:30:42 -07:00
messages := []SendMsg{{node.ID, signal}}
return signal.ID(), ctx.Send(node, messages)
}
// Load is a no-op for LockableExt: nothing is initialised and no error is
// ever returned.
func (ext *LockableExt) Load(ctx *Context, node *Node) error {
	return nil
}
// Unload is a no-op for LockableExt: there is nothing to release.
func (ext *LockableExt) Unload(ctx *Context, node *Node) {}
2024-03-04 17:30:42 -07:00
func (ext *LockableExt) HandleErrorSignal(ctx *Context, node *Node, source NodeID, signal *ErrorSignal) ([]SendMsg, Changes) {
var messages []SendMsg = nil
var changes Changes = nil
2023-11-03 22:51:54 -06:00
info, info_found := node.ProcessResponse(ext.WaitInfos, signal)
if info_found {
state, found := ext.Requirements[info.Destination]
2023-11-03 22:51:54 -06:00
if found == true {
2024-03-21 14:13:54 -06:00
changes = append(changes, "wait_infos")
2023-11-11 13:53:41 -07:00
ctx.Log.Logf("lockable", "got mapped response %+v for %+v in state %s while in %s", signal, info, ReqStateStrings[state], ReqStateStrings[ext.State])
switch ext.State {
case AbortingLock:
ext.Requirements[info.Destination] = Unlocked
all_unlocked := true
for _, state := range(ext.Requirements) {
if state != Unlocked {
all_unlocked = false
break
}
}
if all_unlocked == true {
2024-03-21 14:13:54 -06:00
changes = append(changes, "state")
2023-11-11 13:53:41 -07:00
ext.State = Unlocked
}
2023-11-03 22:51:54 -06:00
case Locking:
2024-03-21 14:13:54 -06:00
changes = append(changes, "state")
ext.Requirements[info.Destination] = Unlocked
unlocked := 0
for _, state := range(ext.Requirements) {
if state == Unlocked {
unlocked += 1
}
}
if unlocked == len(ext.Requirements) {
ctx.Log.Logf("lockable", "%s unlocked from error %s from %s", node.ID, signal.Error, source)
ext.State = Unlocked
} else {
ext.State = AbortingLock
for id, state := range(ext.Requirements) {
if state == Locked {
ext.Requirements[id] = Unlocking
lock_signal := NewLockSignal("unlock")
ext.WaitInfos[lock_signal.Id] = node.QueueTimeout("unlock", id, lock_signal, 100*time.Millisecond)
2024-03-04 17:30:42 -07:00
messages = append(messages, SendMsg{id, lock_signal})
ctx.Log.Logf("lockable", "sent abort unlock to %s from %s", id, node.ID)
}
2023-11-03 22:51:54 -06:00
}
}
2023-11-03 22:51:54 -06:00
case Unlocking:
2023-11-13 13:23:58 -07:00
ext.Requirements[info.Destination] = Locked
all_returned := true
for _, state := range(ext.Requirements) {
if state == Unlocking {
all_returned = false
break
}
}
if all_returned == true {
ext.State = Locked
}
2023-08-11 16:00:36 -06:00
}
2023-11-03 22:51:54 -06:00
} else {
ctx.Log.Logf("lockable", "Got mapped error %s, but %s isn't a requirement", signal, info.Destination)
}
2023-08-15 18:23:06 -06:00
}
2023-10-07 23:00:07 -06:00
return messages, changes
2023-08-15 18:23:06 -06:00
}
2024-03-04 17:30:42 -07:00
func (ext *LockableExt) HandleLinkSignal(ctx *Context, node *Node, source NodeID, signal *LinkSignal) ([]SendMsg, Changes) {
var messages []SendMsg = nil
2023-11-11 14:52:08 -07:00
var changes = Changes{}
if ext.State == Unlocked {
switch signal.Action {
case "add":
_, exists := ext.Requirements[signal.NodeID]
if exists == true {
2024-03-04 17:30:42 -07:00
messages = append(messages, SendMsg{source, NewErrorSignal(signal.ID(), "already_requirement")})
} else {
if ext.Requirements == nil {
2023-11-03 22:51:54 -06:00
ext.Requirements = map[NodeID]ReqState{}
}
2023-11-03 22:51:54 -06:00
ext.Requirements[signal.NodeID] = Unlocked
2024-03-21 14:13:54 -06:00
changes = append(changes, "requirements")
2024-03-04 17:30:42 -07:00
messages = append(messages, SendMsg{source, NewSuccessSignal(signal.ID())})
}
case "remove":
_, exists := ext.Requirements[signal.NodeID]
if exists == false {
2024-03-04 17:30:42 -07:00
messages = append(messages, SendMsg{source, NewErrorSignal(signal.ID(), "can't link: not_requirement")})
} else {
delete(ext.Requirements, signal.NodeID)
2024-03-21 14:13:54 -06:00
changes = append(changes, "requirements")
2024-03-04 17:30:42 -07:00
messages = append(messages, SendMsg{source, NewSuccessSignal(signal.ID())})
}
default:
2024-03-04 17:30:42 -07:00
messages = append(messages, SendMsg{source, NewErrorSignal(signal.ID(), "unknown_action")})
}
} else {
2024-03-04 17:30:42 -07:00
messages = append(messages, SendMsg{source, NewErrorSignal(signal.ID(), "not_unlocked")})
}
2023-10-07 23:00:07 -06:00
return messages, changes
}
2024-03-04 17:30:42 -07:00
func (ext *LockableExt) HandleSuccessSignal(ctx *Context, node *Node, source NodeID, signal *SuccessSignal) ([]SendMsg, Changes) {
var messages []SendMsg = nil
2023-11-11 14:52:08 -07:00
var changes = Changes{}
if source == node.ID {
2023-10-07 23:00:07 -06:00
return messages, changes
}
2023-11-03 22:51:54 -06:00
info, info_found := node.ProcessResponse(ext.WaitInfos, signal)
if info_found == true {
state, found := ext.Requirements[info.Destination]
2023-11-03 22:51:54 -06:00
if found == false {
ctx.Log.Logf("lockable", "Got success signal for requirement that is no longer in the map(%s), ignoring...", info.Destination)
2023-11-03 22:51:54 -06:00
} else {
ctx.Log.Logf("lockable", "got mapped response %+v for %+v in state %s", signal, info, ReqStateStrings[state])
switch state {
case Locking:
switch ext.State {
case Locking:
ext.Requirements[info.Destination] = Locked
2023-11-03 22:51:54 -06:00
locked := 0
for _, s := range(ext.Requirements) {
if s == Locked {
locked += 1
}
}
2023-11-03 22:51:54 -06:00
if locked == len(ext.Requirements) {
ctx.Log.Logf("lockable", "WHOLE LOCK: %s - %s - %+v", node.ID, ext.PendingID, ext.PendingOwner)
ext.State = Locked
ext.Owner = ext.PendingOwner
2024-03-21 14:13:54 -06:00
changes = append(changes, "state", "owner", "requirements")
2024-03-04 17:30:42 -07:00
messages = append(messages, SendMsg{*ext.Owner, NewSuccessSignal(ext.PendingID)})
2023-11-03 22:51:54 -06:00
} else {
2024-03-21 14:13:54 -06:00
changes = append(changes, "requirements")
2023-11-03 22:51:54 -06:00
ctx.Log.Logf("lockable", "PARTIAL LOCK: %s - %d/%d", node.ID, locked, len(ext.Requirements))
}
case AbortingLock:
ext.Requirements[info.Destination] = Unlocking
2023-11-03 22:51:54 -06:00
lock_signal := NewLockSignal("unlock")
ext.WaitInfos[lock_signal.Id] = node.QueueTimeout("unlock", info.Destination, lock_signal, 100*time.Millisecond)
2024-03-04 17:30:42 -07:00
messages = append(messages, SendMsg{info.Destination, lock_signal})
2023-11-03 22:51:54 -06:00
ctx.Log.Logf("lockable", "sending abort_lock to %s for %s", info.Destination, node.ID)
}
2023-11-03 22:51:54 -06:00
case AbortingLock:
ctx.Log.Logf("lockable", "Got success signal in AbortingLock %s", node.ID)
fallthrough
case Unlocking:
ext.Requirements[source] = Unlocked
unlocked := 0
for _, s := range(ext.Requirements) {
if s == Unlocked {
unlocked += 1
}
2023-08-15 18:23:06 -06:00
}
2023-11-03 22:51:54 -06:00
if unlocked == len(ext.Requirements) {
old_state := ext.State
ext.State = Unlocked
ctx.Log.Logf("lockable", "WHOLE UNLOCK: %s - %s - %+v", node.ID, ext.PendingID, ext.PendingOwner)
if old_state == Unlocking {
previous_owner := *ext.Owner
ext.Owner = ext.PendingOwner
ext.ReqID = nil
2024-03-21 14:13:54 -06:00
changes = append(changes, "state", "owner", "req_id")
2024-03-04 17:30:42 -07:00
messages = append(messages, SendMsg{previous_owner, NewSuccessSignal(ext.PendingID)})
2023-11-03 22:51:54 -06:00
} else if old_state == AbortingLock {
2024-03-21 14:13:54 -06:00
changes = append(changes, "state", "pending_owner")
2024-03-04 17:30:42 -07:00
messages = append(messages, SendMsg{*ext.PendingOwner, NewErrorSignal(*ext.ReqID, "not_unlocked")})
2023-11-03 22:51:54 -06:00
ext.PendingOwner = ext.Owner
}
} else {
2024-03-21 14:13:54 -06:00
changes = append(changes, "state")
2023-11-03 22:51:54 -06:00
ctx.Log.Logf("lockable", "PARTIAL UNLOCK: %s - %d/%d", node.ID, unlocked, len(ext.Requirements))
2023-08-15 18:23:06 -06:00
}
}
}
}
2023-10-07 23:00:07 -06:00
return messages, changes
}
// Handle a LockSignal and update the extensions owner/requirement states
2024-03-04 17:30:42 -07:00
func (ext *LockableExt) HandleLockSignal(ctx *Context, node *Node, source NodeID, signal *LockSignal) ([]SendMsg, Changes) {
var messages []SendMsg = nil
2023-11-11 14:52:08 -07:00
var changes = Changes{}
2023-10-07 23:00:07 -06:00
switch signal.State {
case "lock":
2023-11-03 22:51:54 -06:00
switch ext.State {
case Unlocked:
if len(ext.Requirements) == 0 {
2023-08-15 18:23:06 -06:00
ext.State = Locked
new_owner := source
ext.PendingOwner = &new_owner
ext.Owner = &new_owner
2024-03-21 14:13:54 -06:00
changes = append(changes, "state", "pending_owner", "owner")
2024-03-04 17:30:42 -07:00
messages = append(messages, SendMsg{new_owner, NewSuccessSignal(signal.ID())})
} else {
2023-08-15 18:23:06 -06:00
ext.State = Locking
id := signal.ID()
ext.ReqID = &id
2023-08-15 18:23:06 -06:00
new_owner := source
ext.PendingOwner = &new_owner
ext.PendingID = signal.ID()
2024-03-21 14:13:54 -06:00
changes = append(changes, "state", "req_id", "pending_owner", "pending_id")
2023-11-03 22:51:54 -06:00
for id, state := range(ext.Requirements) {
if state != Unlocked {
ctx.Log.Logf("lockable", "REQ_NOT_UNLOCKED_WHEN_LOCKING")
}
2023-11-03 22:51:54 -06:00
2023-08-15 18:23:06 -06:00
lock_signal := NewLockSignal("lock")
2023-11-11 13:53:41 -07:00
ext.WaitInfos[lock_signal.Id] = node.QueueTimeout("lock", id, lock_signal, 500*time.Millisecond)
2023-11-03 22:51:54 -06:00
ext.Requirements[id] = Locking
2024-03-04 17:30:42 -07:00
messages = append(messages, SendMsg{id, lock_signal})
}
2023-08-15 18:23:06 -06:00
}
2023-11-03 22:51:54 -06:00
default:
2024-03-04 17:30:42 -07:00
messages = append(messages, SendMsg{source, NewErrorSignal(signal.ID(), "not_unlocked")})
2023-11-11 13:53:41 -07:00
ctx.Log.Logf("lockable", "Tried to lock %s while %s", node.ID, ext.State)
2023-08-15 18:23:06 -06:00
}
case "unlock":
if ext.State == Locked {
if len(ext.Requirements) == 0 {
ext.State = Unlocked
new_owner := source
ext.PendingOwner = nil
ext.Owner = nil
2024-03-21 14:13:54 -06:00
changes = append(changes, "state", "pending_owner", "owner")
2024-03-04 17:30:42 -07:00
messages = append(messages, SendMsg{new_owner, NewSuccessSignal(signal.ID())})
2023-08-15 18:23:06 -06:00
} else if source == *ext.Owner {
ext.State = Unlocking
id := signal.ID()
ext.ReqID = &id
2023-08-15 18:23:06 -06:00
ext.PendingOwner = nil
ext.PendingID = signal.ID()
2024-03-21 14:13:54 -06:00
changes = append(changes, "state", "pending_owner", "pending_id", "req_id")
2023-11-03 22:51:54 -06:00
for id, state := range(ext.Requirements) {
if state != Locked {
ctx.Log.Logf("lockable", "REQ_NOT_LOCKED_WHEN_UNLOCKING")
2023-08-15 18:23:06 -06:00
}
2023-11-03 22:51:54 -06:00
2023-08-15 18:23:06 -06:00
lock_signal := NewLockSignal("unlock")
ext.WaitInfos[lock_signal.Id] = node.QueueTimeout("unlock", id, lock_signal, 100*time.Millisecond)
2023-11-03 22:51:54 -06:00
ext.Requirements[id] = Unlocking
2024-03-04 17:30:42 -07:00
messages = append(messages, SendMsg{id, lock_signal})
}
}
2023-08-15 18:23:06 -06:00
} else {
2024-03-04 17:30:42 -07:00
messages = append(messages, SendMsg{source, NewErrorSignal(signal.ID(), "not_locked")})
}
default:
ctx.Log.Logf("lockable", "LOCK_ERR: unkown state %s", signal.State)
}
2023-10-07 23:00:07 -06:00
return messages, changes
}
2024-03-04 17:30:42 -07:00
func (ext *LockableExt) HandleTimeoutSignal(ctx *Context, node *Node, source NodeID, signal *TimeoutSignal) ([]SendMsg, Changes) {
var messages []SendMsg = nil
2023-11-11 14:52:08 -07:00
var changes = Changes{}
2023-11-03 22:51:54 -06:00
wait_info, found := node.ProcessResponse(ext.WaitInfos, signal)
if found == true {
2024-03-21 14:13:54 -06:00
changes = append(changes, "wait_infos")
state, found := ext.Requirements[wait_info.Destination]
2023-11-03 22:51:54 -06:00
if found == true {
ctx.Log.Logf("lockable", "%s timed out %s while %s was %s", wait_info.Destination, ReqStateStrings[state], node.ID, ReqStateStrings[state])
switch ext.State {
case AbortingLock:
ext.Requirements[wait_info.Destination] = Unlocked
all_unlocked := true
for _, state := range(ext.Requirements) {
if state != Unlocked {
all_unlocked = false
break
}
}
if all_unlocked == true {
2024-03-21 14:13:54 -06:00
changes = append(changes, "state")
ext.State = Unlocked
}
2023-11-03 22:54:28 -06:00
case Locking:
ext.State = AbortingLock
ext.Requirements[wait_info.Destination] = Unlocked
2023-11-03 22:54:28 -06:00
for id, state := range(ext.Requirements) {
if state == Locked {
ext.Requirements[id] = Unlocking
lock_signal := NewLockSignal("unlock")
ext.WaitInfos[lock_signal.Id] = node.QueueTimeout("unlock", id, lock_signal, 100*time.Millisecond)
2024-03-04 17:30:42 -07:00
messages = append(messages, SendMsg{id, lock_signal})
2023-11-03 22:54:28 -06:00
ctx.Log.Logf("lockable", "sent abort unlock to %s from %s", id, node.ID)
}
}
case Unlocking:
2023-11-13 13:23:58 -07:00
ext.Requirements[wait_info.Destination] = Locked
all_returned := true
for _, state := range(ext.Requirements) {
if state == Unlocking {
all_returned = false
break
}
}
if all_returned == true {
ext.State = Locked
}
2023-11-03 22:54:28 -06:00
}
2023-11-03 22:51:54 -06:00
} else {
ctx.Log.Logf("lockable", "%s timed out", wait_info.Destination)
2023-11-03 22:51:54 -06:00
}
}
return messages, changes
}
2024-03-04 17:30:42 -07:00
// LockableExts process status signals by forwarding them to it's owner
// LockSignal and LinkSignal Direct signals are processed to update the requirement/dependency/lock state
2024-03-04 17:30:42 -07:00
func (ext *LockableExt) Process(ctx *Context, node *Node, source NodeID, signal Signal) ([]SendMsg, Changes) {
var messages []SendMsg = nil
2023-11-11 14:52:08 -07:00
var changes = Changes{}
2023-10-07 23:00:07 -06:00
2024-03-04 17:30:42 -07:00
switch sig := signal.(type) {
case *StatusSignal:
if ext.Owner != nil {
if *ext.Owner != node.ID {
2024-03-04 17:30:42 -07:00
messages = append(messages, SendMsg{*ext.Owner, signal})
2023-07-09 14:30:30 -06:00
}
2023-07-27 12:20:49 -06:00
}
2024-03-04 17:30:42 -07:00
case *LinkSignal:
messages, changes = ext.HandleLinkSignal(ctx, node, source, sig)
case *LockSignal:
messages, changes = ext.HandleLockSignal(ctx, node, source, sig)
case *ErrorSignal:
messages, changes = ext.HandleErrorSignal(ctx, node, source, sig)
case *SuccessSignal:
messages, changes = ext.HandleSuccessSignal(ctx, node, source, sig)
case *TimeoutSignal:
messages, changes = ext.HandleTimeoutSignal(ctx, node, source, sig)
default:
2023-07-24 17:07:27 -06:00
}
2023-11-11 13:53:41 -07:00
2024-03-04 17:30:42 -07:00
return messages, changes
}