Rewrote lockable.go

master
noah metz 2024-03-23 02:21:27 -06:00
parent d7b07df798
commit 2db4655670
5 changed files with 253 additions and 342 deletions

@ -903,7 +903,7 @@ func (ctx *Context) Send(node *Node, messages []SendMsg) error {
if err == nil { if err == nil {
select { select {
case target.MsgChan <- RecvMsg{node.ID, msg.Signal}: case target.MsgChan <- RecvMsg{node.ID, msg.Signal}:
ctx.Log.Logf("signal", "Sent %s to %s", msg.Signal, msg.Dest) ctx.Log.Logf("signal_sent", "Sent %s to %s", msg.Signal, msg.Dest)
default: default:
buf := make([]byte, 4096) buf := make([]byte, 4096)
n := runtime.Stack(buf, false) n := runtime.Stack(buf, false)
@ -1064,11 +1064,6 @@ func NewContext(db * badger.DB, log Logger) (*Context, error) {
return nil, err return nil, err
} }
err = RegisterScalar[WaitReason](ctx, identity, coerce[WaitReason], astString[WaitReason], nil)
if err != nil {
return nil, err
}
err = RegisterScalar[Change](ctx, identity, coerce[Change], astString[Change], nil) err = RegisterScalar[Change](ctx, identity, coerce[Change], astString[Change], nil)
if err != nil { if err != nil {
return nil, err return nil, err
@ -1095,11 +1090,6 @@ func NewContext(db * badger.DB, log Logger) (*Context, error) {
return nil, err return nil, err
} }
err = RegisterObject[WaitInfo](ctx)
if err != nil {
return nil, err
}
err = RegisterExtension[LockableExt](ctx, nil) err = RegisterExtension[LockableExt](ctx, nil)
if err != nil { if err != nil {
return nil, err return nil, err

@ -2,7 +2,6 @@ package graphvent
import ( import (
"github.com/google/uuid" "github.com/google/uuid"
"time"
) )
type ReqState byte type ReqState byte
@ -22,41 +21,51 @@ var ReqStateStrings = map[ReqState]string {
AbortingLock: "AbortingLock", AbortingLock: "AbortingLock",
} }
func (state ReqState) String() string {
str, mapped := ReqStateStrings[state]
if mapped == false {
return "UNKNOWN_REQSTATE"
} else {
return str
}
}
type LockableExt struct{ type LockableExt struct{
State ReqState `gv:"lockable_state"` State ReqState `gv:"lockable_state"`
ReqID *uuid.UUID `gv:"req_id"` ReqID *uuid.UUID `gv:"req_id"`
Owner *NodeID `gv:"owner" node:"Base"` Owner *NodeID `gv:"owner"`
PendingOwner *NodeID `gv:"pending_owner" node:"Base"` PendingOwner *NodeID `gv:"pending_owner"`
PendingID uuid.UUID `gv:"pending_id"` PendingID uuid.UUID `gv:"pending_id"`
Requirements map[NodeID]ReqState `gv:"requirements" node:"Lockable:"` Requirements map[NodeID]ReqState `gv:"requirements" node:"Lockable:"`
WaitInfos WaitMap `gv:"wait_infos" node:":Base"` Waiting WaitMap `gv:"waiting_locks" node:":Lockable"`
} }
func NewLockableExt(requirements []NodeID) *LockableExt { func NewLockableExt(requirements []NodeID) *LockableExt {
var reqs map[NodeID]ReqState = nil var reqs map[NodeID]ReqState = nil
if requirements != nil { if len(requirements) != 0 {
reqs = map[NodeID]ReqState{} reqs = map[NodeID]ReqState{}
for _, id := range(requirements) { for _, req := range(requirements) {
reqs[id] = Unlocked reqs[req] = Unlocked
} }
} }
return &LockableExt{ return &LockableExt{
State: Unlocked, State: Unlocked,
Owner: nil, Owner: nil,
PendingOwner: nil, PendingOwner: nil,
Requirements: reqs, Requirements: reqs,
WaitInfos: WaitMap{}, Waiting: WaitMap{},
} }
} }
func UnlockLockable(ctx *Context, node *Node) (uuid.UUID, error) { func UnlockLockable(ctx *Context, node *Node) (uuid.UUID, error) {
signal := NewLockSignal("unlock") signal := NewUnlockSignal()
messages := []SendMsg{{node.ID, signal}} messages := []SendMsg{{node.ID, signal}}
return signal.ID(), ctx.Send(node, messages) return signal.ID(), ctx.Send(node, messages)
} }
func LockLockable(ctx *Context, node *Node) (uuid.UUID, error) { func LockLockable(ctx *Context, node *Node) (uuid.UUID, error) {
signal := NewLockSignal("lock") signal := NewLockSignal()
messages := []SendMsg{{node.ID, signal}} messages := []SendMsg{{node.ID, signal}}
return signal.ID(), ctx.Send(node, messages) return signal.ID(), ctx.Send(node, messages)
} }
@ -66,83 +75,17 @@ func (ext *LockableExt) Load(ctx *Context, node *Node) error {
} }
func (ext *LockableExt) Unload(ctx *Context, node *Node) { func (ext *LockableExt) Unload(ctx *Context, node *Node) {
return
} }
func (ext *LockableExt) HandleErrorSignal(ctx *Context, node *Node, source NodeID, signal *ErrorSignal) ([]SendMsg, Changes) { // Handle link signal by adding/removing the requested NodeID
// returns an error if the node is not unlocked
func (ext *LockableExt) HandleLinkSignal(ctx *Context, node *Node, source NodeID, signal *LinkSignal) ([]SendMsg, Changes) {
var messages []SendMsg = nil var messages []SendMsg = nil
var changes Changes = nil var changes Changes = nil
info, info_found := node.ProcessResponse(ext.WaitInfos, signal)
if info_found {
state, found := ext.Requirements[info.Destination]
if found == true {
changes = append(changes, "wait_infos")
ctx.Log.Logf("lockable", "got mapped response %+v for %+v in state %s while in %s", signal, info, ReqStateStrings[state], ReqStateStrings[ext.State])
switch ext.State { switch ext.State {
case AbortingLock: case Unlocked:
ext.Requirements[info.Destination] = Unlocked
all_unlocked := true
for _, state := range(ext.Requirements) {
if state != Unlocked {
all_unlocked = false
break
}
}
if all_unlocked == true {
changes = append(changes, "state")
ext.State = Unlocked
}
case Locking:
changes = append(changes, "state")
ext.Requirements[info.Destination] = Unlocked
unlocked := 0
for _, state := range(ext.Requirements) {
if state == Unlocked {
unlocked += 1
}
}
if unlocked == len(ext.Requirements) {
ctx.Log.Logf("lockable", "%s unlocked from error %s from %s", node.ID, signal.Error, source)
ext.State = Unlocked
} else {
ext.State = AbortingLock
for id, state := range(ext.Requirements) {
if state == Locked {
ext.Requirements[id] = Unlocking
lock_signal := NewLockSignal("unlock")
ext.WaitInfos[lock_signal.Id] = node.QueueTimeout("unlock", id, lock_signal, 100*time.Millisecond)
messages = append(messages, SendMsg{id, lock_signal})
ctx.Log.Logf("lockable", "sent abort unlock to %s from %s", id, node.ID)
}
}
}
case Unlocking:
ext.Requirements[info.Destination] = Locked
all_returned := true
for _, state := range(ext.Requirements) {
if state == Unlocking {
all_returned = false
break
}
}
if all_returned == true {
ext.State = Locked
}
}
} else {
ctx.Log.Logf("lockable", "Got mapped error %s, but %s isn't a requirement", signal, info.Destination)
}
}
return messages, changes
}
func (ext *LockableExt) HandleLinkSignal(ctx *Context, node *Node, source NodeID, signal *LinkSignal) ([]SendMsg, Changes) {
var messages []SendMsg = nil
var changes = Changes{}
if ext.State == Unlocked {
switch signal.Action { switch signal.Action {
case "add": case "add":
_, exists := ext.Requirements[signal.NodeID] _, exists := ext.Requirements[signal.NodeID]
@ -159,7 +102,7 @@ func (ext *LockableExt) HandleLinkSignal(ctx *Context, node *Node, source NodeID
case "remove": case "remove":
_, exists := ext.Requirements[signal.NodeID] _, exists := ext.Requirements[signal.NodeID]
if exists == false { if exists == false {
messages = append(messages, SendMsg{source, NewErrorSignal(signal.ID(), "can't link: not_requirement")}) messages = append(messages, SendMsg{source, NewErrorSignal(signal.ID(), "not_requirement")})
} else { } else {
delete(ext.Requirements, signal.NodeID) delete(ext.Requirements, signal.NodeID)
changes = append(changes, "requirements") changes = append(changes, "requirements")
@ -168,237 +111,255 @@ func (ext *LockableExt) HandleLinkSignal(ctx *Context, node *Node, source NodeID
default: default:
messages = append(messages, SendMsg{source, NewErrorSignal(signal.ID(), "unknown_action")}) messages = append(messages, SendMsg{source, NewErrorSignal(signal.ID(), "unknown_action")})
} }
} else { default:
messages = append(messages, SendMsg{source, NewErrorSignal(signal.ID(), "not_unlocked")}) messages = append(messages, SendMsg{source, NewErrorSignal(signal.ID(), "not_unlocked")})
} }
return messages, changes return messages, changes
} }
func (ext *LockableExt) HandleSuccessSignal(ctx *Context, node *Node, source NodeID, signal *SuccessSignal) ([]SendMsg, Changes) { // Handle an UnlockSignal by either transitioning to Unlocked state,
// sending unlock signals to requirements, or returning an error signal
func (ext *LockableExt) HandleUnlockSignal(ctx *Context, node *Node, source NodeID, signal *UnlockSignal) ([]SendMsg, Changes) {
var messages []SendMsg = nil var messages []SendMsg = nil
var changes = Changes{} var changes Changes = nil
if source == node.ID {
return messages, changes
}
info, info_found := node.ProcessResponse(ext.WaitInfos, signal)
if info_found == true {
state, found := ext.Requirements[info.Destination]
if found == false {
ctx.Log.Logf("lockable", "Got success signal for requirement that is no longer in the map(%s), ignoring...", info.Destination)
} else {
ctx.Log.Logf("lockable", "got mapped response %+v for %+v in state %s", signal, info, ReqStateStrings[state])
switch state {
case Locking:
switch ext.State { switch ext.State {
case Locking: case Locked:
ext.Requirements[info.Destination] = Locked if source != *ext.Owner {
locked := 0 messages = append(messages, SendMsg{source, NewErrorSignal(signal.Id, "not_owner")})
for _, s := range(ext.Requirements) {
if s == Locked {
locked += 1
}
}
if locked == len(ext.Requirements) {
ctx.Log.Logf("lockable", "WHOLE LOCK: %s - %s - %+v", node.ID, ext.PendingID, ext.PendingOwner)
ext.State = Locked
ext.Owner = ext.PendingOwner
changes = append(changes, "state", "owner", "requirements")
messages = append(messages, SendMsg{*ext.Owner, NewSuccessSignal(ext.PendingID)})
} else { } else {
changes = append(changes, "requirements") if len(ext.Requirements) == 0 {
ctx.Log.Logf("lockable", "PARTIAL LOCK: %s - %d/%d", node.ID, locked, len(ext.Requirements)) changes = append(changes, "state", "owner", "pending_owner")
}
case AbortingLock:
ext.Requirements[info.Destination] = Unlocking
lock_signal := NewLockSignal("unlock")
ext.WaitInfos[lock_signal.Id] = node.QueueTimeout("unlock", info.Destination, lock_signal, 100*time.Millisecond)
messages = append(messages, SendMsg{info.Destination, lock_signal})
ctx.Log.Logf("lockable", "sending abort_lock to %s for %s", info.Destination, node.ID) ext.Owner = nil
}
case AbortingLock:
ctx.Log.Logf("lockable", "Got success signal in AbortingLock %s", node.ID)
fallthrough
case Unlocking:
ext.Requirements[source] = Unlocked
unlocked := 0 ext.PendingOwner = nil
for _, s := range(ext.Requirements) {
if s == Unlocked {
unlocked += 1
}
}
if unlocked == len(ext.Requirements) { ctx.Log.Logf("lockable", "%s transition to Unlocked", node.ID)
old_state := ext.State
ext.State = Unlocked ext.State = Unlocked
ctx.Log.Logf("lockable", "WHOLE UNLOCK: %s - %s - %+v", node.ID, ext.PendingID, ext.PendingOwner)
if old_state == Unlocking { messages = append(messages, SendMsg{source, NewSuccessSignal(signal.Id)})
previous_owner := *ext.Owner
ext.Owner = ext.PendingOwner
ext.ReqID = nil
changes = append(changes, "state", "owner", "req_id")
messages = append(messages, SendMsg{previous_owner, NewSuccessSignal(ext.PendingID)})
} else if old_state == AbortingLock {
changes = append(changes, "state", "pending_owner")
messages = append(messages, SendMsg{*ext.PendingOwner, NewErrorSignal(*ext.ReqID, "not_unlocked")})
ext.PendingOwner = ext.Owner
}
} else { } else {
changes = append(changes, "state") changes = append(changes, "state", "waiting", "requirements", "pending_owner")
ctx.Log.Logf("lockable", "PARTIAL UNLOCK: %s - %d/%d", node.ID, unlocked, len(ext.Requirements))
ext.PendingOwner = nil
ext.ReqID = new(uuid.UUID)
*ext.ReqID = signal.Id
ctx.Log.Logf("lockable", "%s transition to Unlocking", node.ID)
ext.State = Unlocking
for id := range(ext.Requirements) {
unlock_signal := NewUnlockSignal()
ext.Waiting[unlock_signal.Id] = id
ext.Requirements[id] = Unlocking
messages = append(messages, SendMsg{id, unlock_signal})
} }
} }
} }
default:
messages = append(messages, SendMsg{source, NewErrorSignal(signal.Id, "not_locked")})
} }
return messages, changes return messages, changes
} }
// Handle a LockSignal and update the extensions owner/requirement states // Handle a LockSignal by either transitioning to a locked state,
// sending lock signals to requirements, or returning an error signal
func (ext *LockableExt) HandleLockSignal(ctx *Context, node *Node, source NodeID, signal *LockSignal) ([]SendMsg, Changes) { func (ext *LockableExt) HandleLockSignal(ctx *Context, node *Node, source NodeID, signal *LockSignal) ([]SendMsg, Changes) {
var messages []SendMsg = nil var messages []SendMsg = nil
var changes = Changes{} var changes Changes = nil
switch signal.State {
case "lock":
switch ext.State { switch ext.State {
case Unlocked: case Unlocked:
if len(ext.Requirements) == 0 { if len(ext.Requirements) == 0 {
changes = append(changes, "state", "owner", "pending_owner")
ext.Owner = new(NodeID)
*ext.Owner = source
ext.PendingOwner = new(NodeID)
*ext.PendingOwner = source
ctx.Log.Logf("lockable", "%s transition to Locked", node.ID)
ext.State = Locked ext.State = Locked
new_owner := source messages = append(messages, SendMsg{source, NewSuccessSignal(signal.Id)})
ext.PendingOwner = &new_owner
ext.Owner = &new_owner
changes = append(changes, "state", "pending_owner", "owner")
messages = append(messages, SendMsg{new_owner, NewSuccessSignal(signal.ID())})
} else { } else {
changes = append(changes, "state", "requirements", "waiting", "pending_owner")
ext.PendingOwner = new(NodeID)
*ext.PendingOwner = source
ext.ReqID = new(uuid.UUID)
*ext.ReqID = signal.Id
ctx.Log.Logf("lockable", "%s transition to Locking", node.ID)
ext.State = Locking ext.State = Locking
id := signal.ID() for id := range(ext.Requirements) {
ext.ReqID = &id lock_signal := NewLockSignal()
new_owner := source
ext.PendingOwner = &new_owner ext.Waiting[lock_signal.Id] = id
ext.PendingID = signal.ID()
changes = append(changes, "state", "req_id", "pending_owner", "pending_id")
for id, state := range(ext.Requirements) {
if state != Unlocked {
ctx.Log.Logf("lockable", "REQ_NOT_UNLOCKED_WHEN_LOCKING")
}
lock_signal := NewLockSignal("lock")
ext.WaitInfos[lock_signal.Id] = node.QueueTimeout("lock", id, lock_signal, 500*time.Millisecond)
ext.Requirements[id] = Locking ext.Requirements[id] = Locking
messages = append(messages, SendMsg{id, lock_signal}) messages = append(messages, SendMsg{id, lock_signal})
} }
} }
default: default:
messages = append(messages, SendMsg{source, NewErrorSignal(signal.ID(), "not_unlocked")}) messages = append(messages, SendMsg{source, NewErrorSignal(signal.Id, "not_unlocked")})
ctx.Log.Logf("lockable", "Tried to lock %s while %s", node.ID, ext.State)
} }
case "unlock":
if ext.State == Locked { return messages, changes
if len(ext.Requirements) == 0 { }
ext.State = Unlocked
new_owner := source // Handle an error signal by aborting the lock, or retrying the unlock
func (ext *LockableExt) HandleErrorSignal(ctx *Context, node *Node, source NodeID, signal *ErrorSignal) ([]SendMsg, Changes) {
var messages []SendMsg = nil
var changes Changes = nil
id, waiting := ext.Waiting[signal.ReqID]
if waiting == true {
delete(ext.Waiting, signal.ReqID)
changes = append(changes, "waiting")
switch ext.State {
case Locking:
changes = append(changes, "state", "pending_owner", "requirements", "req_id")
messages = append(messages, SendMsg{*ext.PendingOwner, NewErrorSignal(*ext.ReqID, signal.Error)})
ext.ReqID = nil
ext.PendingOwner = nil ext.PendingOwner = nil
ext.Requirements[id] = Unlocked
unlocked := 0
for req_id, req_state := range(ext.Requirements) {
// Unlock locked requirements, and count unlocked requirements
switch req_state {
case Locked:
unlock_signal := NewUnlockSignal()
ext.Waiting[unlock_signal.Id] = req_id
ext.Requirements[req_id] = Unlocking
messages = append(messages, SendMsg{req_id, unlock_signal})
case Unlocked:
unlocked += 1
}
}
if unlocked == len(ext.Requirements) {
changes = append(changes, "owner", "state")
ctx.Log.Logf("lockable", "%s transition to Unlocked", node.ID)
ext.State = Unlocked
ext.Owner = nil ext.Owner = nil
changes = append(changes, "state", "pending_owner", "owner") } else {
messages = append(messages, SendMsg{new_owner, NewSuccessSignal(signal.ID())}) ctx.Log.Logf("lockable", "%s transition to Unlocking", node.ID)
} else if source == *ext.Owner {
ext.State = Unlocking ext.State = Unlocking
id := signal.ID()
ext.ReqID = &id
ext.PendingOwner = nil
ext.PendingID = signal.ID()
changes = append(changes, "state", "pending_owner", "pending_id", "req_id")
for id, state := range(ext.Requirements) {
if state != Locked {
ctx.Log.Logf("lockable", "REQ_NOT_LOCKED_WHEN_UNLOCKING")
} }
lock_signal := NewLockSignal("unlock") case Unlocking:
ext.WaitInfos[lock_signal.Id] = node.QueueTimeout("unlock", id, lock_signal, 100*time.Millisecond) req_state := ext.Requirements[id]
ext.Requirements[id] = Unlocking // Mark failed lock as Unlocked, or retry unlock
switch req_state {
case Locking:
ext.Requirements[id] = Unlocked
messages = append(messages, SendMsg{id, lock_signal}) // Check if all requirements unlocked now
unlocked := 0
for _, req_state := range(ext.Requirements) {
if req_state == Unlocked {
unlocked += 1
} }
} }
} else {
messages = append(messages, SendMsg{source, NewErrorSignal(signal.ID(), "not_locked")}) if unlocked == len(ext.Requirements) {
changes = append(changes, "owner", "state")
ctx.Log.Logf("lockable", "%s transition to Unlocked", node.ID)
ext.State = Unlocked
ext.Owner = nil
}
case Unlocking:
// Handle error for unlocking requirement while unlocking by retrying unlock
unlock_signal := NewUnlockSignal()
ext.Waiting[unlock_signal.Id] = id
messages = append(messages, SendMsg{id, unlock_signal})
} }
default:
ctx.Log.Logf("lockable", "LOCK_ERR: unkown state %s", signal.State)
} }
}
return messages, changes return messages, changes
} }
func (ext *LockableExt) HandleTimeoutSignal(ctx *Context, node *Node, source NodeID, signal *TimeoutSignal) ([]SendMsg, Changes) { // Handle a success signal by checking if all requirements have been locked/unlocked
func (ext *LockableExt) HandleSuccessSignal(ctx *Context, node *Node, source NodeID, signal *SuccessSignal) ([]SendMsg, Changes) {
var messages []SendMsg = nil var messages []SendMsg = nil
var changes = Changes{} var changes Changes = nil
wait_info, found := node.ProcessResponse(ext.WaitInfos, signal) id, waiting := ext.Waiting[signal.ReqID]
if found == true { if waiting == true {
changes = append(changes, "wait_infos") delete(ext.Waiting, signal.ReqID)
state, found := ext.Requirements[wait_info.Destination] changes = append(changes, "waiting")
if found == true {
ctx.Log.Logf("lockable", "%s timed out %s while %s was %s", wait_info.Destination, ReqStateStrings[state], node.ID, ReqStateStrings[state]) req_state := ext.Requirements[id]
switch ext.State { switch req_state {
case AbortingLock: case Locking:
ext.Requirements[wait_info.Destination] = Unlocked ext.Requirements[id] = Locked
all_unlocked := true case Unlocking:
for _, state := range(ext.Requirements) { ext.Requirements[id] = Unlocked
if state != Unlocked {
all_unlocked = false
break
} }
locked := 0
unlocked := 0
for _, req_state := range(ext.Requirements) {
switch req_state {
case Locked:
locked += 1
case Unlocked:
unlocked += 1
} }
if all_unlocked == true {
changes = append(changes, "state")
ext.State = Unlocked
} }
switch ext.State {
case Locking: case Locking:
ext.State = AbortingLock if locked == len(ext.Requirements) {
ext.Requirements[wait_info.Destination] = Unlocked changes = append(changes, "state", "owner", "req_id")
for id, state := range(ext.Requirements) { ctx.Log.Logf("lockable", "%s transition to Locked", node.ID)
if state == Locked { ext.State = Locked
ext.Requirements[id] = Unlocking
lock_signal := NewLockSignal("unlock") ext.Owner = new(NodeID)
ext.WaitInfos[lock_signal.Id] = node.QueueTimeout("unlock", id, lock_signal, 100*time.Millisecond) *ext.Owner = *ext.PendingOwner
messages = append(messages, SendMsg{id, lock_signal})
ctx.Log.Logf("lockable", "sent abort unlock to %s from %s", id, node.ID) messages = append(messages, SendMsg{*ext.Owner, NewSuccessSignal(*ext.ReqID)})
} ext.ReqID = nil
} }
case Unlocking: case Unlocking:
ext.Requirements[wait_info.Destination] = Locked if unlocked == len(ext.Requirements) {
all_returned := true changes = append(changes, "state", "owner", "req_id")
for _, state := range(ext.Requirements) { ctx.Log.Logf("lockable", "%s transition to Unlocked", node.ID)
if state == Unlocking { ext.State = Unlocked
all_returned = false
break if ext.Owner != nil {
} messages = append(messages, SendMsg{*ext.Owner, NewSuccessSignal(*ext.ReqID)})
}
if all_returned == true { ext.ReqID = nil
ext.State = Locked ext.Owner = nil
} }
} }
} else {
ctx.Log.Logf("lockable", "%s timed out", wait_info.Destination)
} }
} }
return messages, changes return messages, changes
} }
// LockableExts process status signals by forwarding them to it's owner
// LockSignal and LinkSignal Direct signals are processed to update the requirement/dependency/lock state
func (ext *LockableExt) Process(ctx *Context, node *Node, source NodeID, signal Signal) ([]SendMsg, Changes) { func (ext *LockableExt) Process(ctx *Context, node *Node, source NodeID, signal Signal) ([]SendMsg, Changes) {
var messages []SendMsg = nil var messages []SendMsg = nil
var changes = Changes{} var changes Changes = nil
switch sig := signal.(type) { switch sig := signal.(type) {
case *StatusSignal: case *StatusSignal:
// Forward StatusSignals up to the owner(unless that would be a cycle)
if ext.Owner != nil { if ext.Owner != nil {
if *ext.Owner != node.ID { if *ext.Owner != node.ID {
messages = append(messages, SendMsg{*ext.Owner, signal}) messages = append(messages, SendMsg{*ext.Owner, signal})
@ -408,13 +369,12 @@ func (ext *LockableExt) Process(ctx *Context, node *Node, source NodeID, signal
messages, changes = ext.HandleLinkSignal(ctx, node, source, sig) messages, changes = ext.HandleLinkSignal(ctx, node, source, sig)
case *LockSignal: case *LockSignal:
messages, changes = ext.HandleLockSignal(ctx, node, source, sig) messages, changes = ext.HandleLockSignal(ctx, node, source, sig)
case *UnlockSignal:
messages, changes = ext.HandleUnlockSignal(ctx, node, source, sig)
case *ErrorSignal: case *ErrorSignal:
messages, changes = ext.HandleErrorSignal(ctx, node, source, sig) messages, changes = ext.HandleErrorSignal(ctx, node, source, sig)
case *SuccessSignal: case *SuccessSignal:
messages, changes = ext.HandleSuccessSignal(ctx, node, source, sig) messages, changes = ext.HandleSuccessSignal(ctx, node, source, sig)
case *TimeoutSignal:
messages, changes = ext.HandleTimeoutSignal(ctx, node, source, sig)
default:
} }
return messages, changes return messages, changes

@ -79,7 +79,7 @@ func Test1000Lock(t *testing.T) {
} }
func TestLock(t *testing.T) { func TestLock(t *testing.T) {
ctx := logTestContext(t, []string{"test", "lockable"}) ctx := logTestContext(t, []string{"test", "lockable", "signal"})
NewLockable := func(reqs []NodeID)(*Node, *ListenerExt) { NewLockable := func(reqs []NodeID)(*Node, *ListenerExt) {
listener := NewListenerExt(1000) listener := NewListenerExt(1000)
@ -103,24 +103,26 @@ func TestLock(t *testing.T) {
ctx.Log.Logf("test", "l5: %s", l5.ID) ctx.Log.Logf("test", "l5: %s", l5.ID)
id_1, err := LockLockable(ctx, l0) id_1, err := LockLockable(ctx, l0)
ctx.Log.Logf("test", "ID_1: %s", id_1)
fatalErr(t, err) fatalErr(t, err)
_, _, err = WaitForResponse(l0_listener.Chan, time.Millisecond*10, id_1) response, _, err := WaitForResponse(l0_listener.Chan, time.Millisecond*10, id_1)
fatalErr(t, err) fatalErr(t, err)
ctx.Log.Logf("test", "l0 lock: %+v", response)
id_2, err := LockLockable(ctx, l1) id_2, err := LockLockable(ctx, l1)
fatalErr(t, err) fatalErr(t, err)
_, _, err = WaitForResponse(l1_listener.Chan, time.Millisecond*100, id_2) response, _, err = WaitForResponse(l1_listener.Chan, time.Millisecond*10, id_2)
fatalErr(t, err) fatalErr(t, err)
ctx.Log.Logf("test", "l1 lock: %+v", response)
id_3, err := UnlockLockable(ctx, l0) id_3, err := UnlockLockable(ctx, l0)
fatalErr(t, err) fatalErr(t, err)
_, _, err = WaitForResponse(l0_listener.Chan, time.Millisecond*10, id_3) response, _, err = WaitForResponse(l0_listener.Chan, time.Millisecond*10, id_3)
fatalErr(t, err) fatalErr(t, err)
ctx.Log.Logf("test", "l0 unlock: %+v", response)
id_4, err := LockLockable(ctx, l1) id_4, err := LockLockable(ctx, l1)
fatalErr(t, err) fatalErr(t, err)
response, _, err = WaitForResponse(l1_listener.Chan, time.Millisecond*10, id_4)
_, _, err = WaitForResponse(l1_listener.Chan, time.Millisecond*10, id_4)
fatalErr(t, err) fatalErr(t, err)
ctx.Log.Logf("test", "l1 lock: %+v", response)
} }

@ -66,6 +66,8 @@ func (q QueuedSignal) String() string {
return fmt.Sprintf("%+v@%s", reflect.TypeOf(q.Signal), q.Time) return fmt.Sprintf("%+v@%s", reflect.TypeOf(q.Signal), q.Time)
} }
type WaitMap map[uuid.UUID]NodeID
// Default message channel size for nodes // Default message channel size for nodes
// Nodes represent a group of extensions that can be collectively addressed // Nodes represent a group of extensions that can be collectively addressed
type Node struct { type Node struct {
@ -101,54 +103,6 @@ func (node *Node) PostDeserialize(ctx *Context) error {
return nil return nil
} }
type WaitReason string
type WaitInfo struct {
Destination NodeID `gv:"destination" node:"Base"`
Timeout uuid.UUID `gv:"timeout"`
Reason WaitReason `gv:"reason"`
}
type WaitMap map[uuid.UUID]WaitInfo
// Removes a signal from the wait_map and dequeue the associated timeout signal
// Returns the data, and whether or not the ID was found in the wait_map
func (node *Node) ProcessResponse(wait_map WaitMap, response ResponseSignal) (WaitInfo, bool) {
wait_info, is_processed := wait_map[response.ResponseID()]
if is_processed == true {
delete(wait_map, response.ResponseID())
if response.ID() != wait_info.Timeout {
node.DequeueSignal(wait_info.Timeout)
}
return wait_info, true
}
return WaitInfo{}, false
}
func (node *Node) NewTimeout(reason WaitReason, dest NodeID, timeout time.Duration) (WaitInfo, uuid.UUID) {
id := uuid.New()
timeout_signal := NewTimeoutSignal(id)
node.QueueSignal(time.Now().Add(timeout), timeout_signal)
return WaitInfo{
Destination: dest,
Timeout: timeout_signal.Id,
Reason: reason,
}, id
}
// Creates a timeout signal for signal, queues it for the node at the timeout, and returns the WaitInfo
func (node *Node) QueueTimeout(reason WaitReason, dest NodeID, signal Signal, timeout time.Duration) WaitInfo {
timeout_signal := NewTimeoutSignal(signal.ID())
node.QueueSignal(time.Now().Add(timeout), timeout_signal)
return WaitInfo{
Destination: dest,
Timeout: timeout_signal.Id,
Reason: reason,
}
}
func (node *Node) QueueSignal(time time.Time, signal Signal) { func (node *Node) QueueSignal(time time.Time, signal Signal) {
node.SignalQueue = append(node.SignalQueue, QueuedSignal{signal, time}) node.SignalQueue = append(node.SignalQueue, QueuedSignal{signal, time})
node.NextSignal, node.TimeoutChan = SoonestSignal(node.SignalQueue) node.NextSignal, node.TimeoutChan = SoonestSignal(node.SignalQueue)

@ -21,13 +21,6 @@ func (signal TimeoutSignal) String() string {
return fmt.Sprintf("TimeoutSignal(%s)", &signal.ResponseHeader) return fmt.Sprintf("TimeoutSignal(%s)", &signal.ResponseHeader)
} }
type SignalDirection int
const (
Up SignalDirection = iota
Down
Direct
)
type SignalHeader struct { type SignalHeader struct {
Id uuid.UUID `gv:"id"` Id uuid.UUID `gv:"id"`
} }
@ -205,19 +198,31 @@ func NewLinkSignal(action string, id NodeID) Signal {
type LockSignal struct { type LockSignal struct {
SignalHeader SignalHeader
State string
} }
func (signal LockSignal) String() string { func (signal LockSignal) String() string {
return fmt.Sprintf("LockSignal(%s, %s)", signal.SignalHeader, signal.State) return fmt.Sprintf("LockSignal(%s)", signal.SignalHeader)
} }
func NewLockSignal(state string) *LockSignal { func NewLockSignal() *LockSignal {
return &LockSignal{ return &LockSignal{
NewSignalHeader(), NewSignalHeader(),
state,
} }
} }
type UnlockSignal struct {
SignalHeader
}
func (signal UnlockSignal) String() string {
return fmt.Sprintf("UnlockSignal(%s)", signal.SignalHeader)
}
func NewUnlockSignal() *UnlockSignal {
return &UnlockSignal{
NewSignalHeader(),
}
}
type ReadSignal struct { type ReadSignal struct {
SignalHeader SignalHeader
Extensions map[ExtType][]string `json:"extensions"` Extensions map[ExtType][]string `json:"extensions"`