diff --git a/context.go b/context.go index 4af24c3..0e3579e 100644 --- a/context.go +++ b/context.go @@ -903,7 +903,7 @@ func (ctx *Context) Send(node *Node, messages []SendMsg) error { if err == nil { select { case target.MsgChan <- RecvMsg{node.ID, msg.Signal}: - ctx.Log.Logf("signal", "Sent %s to %s", msg.Signal, msg.Dest) + ctx.Log.Logf("signal_sent", "Sent %s to %s", msg.Signal, msg.Dest) default: buf := make([]byte, 4096) n := runtime.Stack(buf, false) @@ -1064,11 +1064,6 @@ func NewContext(db * badger.DB, log Logger) (*Context, error) { return nil, err } - err = RegisterScalar[WaitReason](ctx, identity, coerce[WaitReason], astString[WaitReason], nil) - if err != nil { - return nil, err - } - err = RegisterScalar[Change](ctx, identity, coerce[Change], astString[Change], nil) if err != nil { return nil, err @@ -1095,11 +1090,6 @@ func NewContext(db * badger.DB, log Logger) (*Context, error) { return nil, err } - err = RegisterObject[WaitInfo](ctx) - if err != nil { - return nil, err - } - err = RegisterExtension[LockableExt](ctx, nil) if err != nil { return nil, err diff --git a/lockable.go b/lockable.go index 0906a34..8b3e63a 100644 --- a/lockable.go +++ b/lockable.go @@ -1,8 +1,7 @@ package graphvent import ( - "github.com/google/uuid" - "time" + "github.com/google/uuid" ) type ReqState byte @@ -22,41 +21,51 @@ var ReqStateStrings = map[ReqState]string { AbortingLock: "AbortingLock", } +func (state ReqState) String() string { + str, mapped := ReqStateStrings[state] + if mapped == false { + return "UNKNOWN_REQSTATE" + } else { + return str + } +} + type LockableExt struct{ State ReqState `gv:"lockable_state"` ReqID *uuid.UUID `gv:"req_id"` - Owner *NodeID `gv:"owner" node:"Base"` - PendingOwner *NodeID `gv:"pending_owner" node:"Base"` + Owner *NodeID `gv:"owner"` + PendingOwner *NodeID `gv:"pending_owner"` PendingID uuid.UUID `gv:"pending_id"` Requirements map[NodeID]ReqState `gv:"requirements" node:"Lockable:"` - WaitInfos WaitMap `gv:"wait_infos" node:":Base"` + Waiting WaitMap `gv:"waiting_locks" node:":Lockable"` } func NewLockableExt(requirements []NodeID) *LockableExt { var reqs map[NodeID]ReqState = nil - if requirements != nil { + if len(requirements) != 0 { reqs = map[NodeID]ReqState{} - for _, id := range(requirements) { - reqs[id] = Unlocked + for _, req := range(requirements) { + reqs[req] = Unlocked } } + return &LockableExt{ State: Unlocked, Owner: nil, PendingOwner: nil, Requirements: reqs, - WaitInfos: WaitMap{}, + Waiting: WaitMap{}, } } func UnlockLockable(ctx *Context, node *Node) (uuid.UUID, error) { - signal := NewLockSignal("unlock") + signal := NewUnlockSignal() messages := []SendMsg{{node.ID, signal}} return signal.ID(), ctx.Send(node, messages) } func LockLockable(ctx *Context, node *Node) (uuid.UUID, error) { - signal := NewLockSignal("lock") + signal := NewLockSignal() messages := []SendMsg{{node.ID, signal}} return signal.ID(), ctx.Send(node, messages) } @@ -66,83 +75,17 @@ func (ext *LockableExt) Load(ctx *Context, node *Node) error { } func (ext *LockableExt) Unload(ctx *Context, node *Node) { + return } -func (ext *LockableExt) HandleErrorSignal(ctx *Context, node *Node, source NodeID, signal *ErrorSignal) ([]SendMsg, Changes) { +// Handle link signal by adding/removing the requested NodeID +// returns an error if the node is not unlocked +func (ext *LockableExt) HandleLinkSignal(ctx *Context, node *Node, source NodeID, signal *LinkSignal) ([]SendMsg, Changes) { var messages []SendMsg = nil var changes Changes = nil - info, info_found := node.ProcessResponse(ext.WaitInfos, signal) - if info_found { - state, found := ext.Requirements[info.Destination] - if found == true { - changes = append(changes, "wait_infos") - ctx.Log.Logf("lockable", "got mapped response %+v for %+v in state %s while in %s", signal, info, ReqStateStrings[state], ReqStateStrings[ext.State]) - switch ext.State { - case AbortingLock: - ext.Requirements[info.Destination] = Unlocked - all_unlocked := true - for _, state := range(ext.Requirements) { - if state != Unlocked { - all_unlocked = false - break - } - } - if all_unlocked == true { - changes = append(changes, "state") - ext.State = Unlocked - } - case Locking: - changes = append(changes, "state") - ext.Requirements[info.Destination] = Unlocked - unlocked := 0 - for _, state := range(ext.Requirements) { - if state == Unlocked { - unlocked += 1 - } - } - - if unlocked == len(ext.Requirements) { - ctx.Log.Logf("lockable", "%s unlocked from error %s from %s", node.ID, signal.Error, source) - ext.State = Unlocked - } else { - ext.State = AbortingLock - for id, state := range(ext.Requirements) { - if state == Locked { - ext.Requirements[id] = Unlocking - lock_signal := NewLockSignal("unlock") - ext.WaitInfos[lock_signal.Id] = node.QueueTimeout("unlock", id, lock_signal, 100*time.Millisecond) - messages = append(messages, SendMsg{id, lock_signal}) - ctx.Log.Logf("lockable", "sent abort unlock to %s from %s", id, node.ID) - } - } - } - - case Unlocking: - ext.Requirements[info.Destination] = Locked - all_returned := true - for _, state := range(ext.Requirements) { - if state == Unlocking { - all_returned = false - break - } - } - if all_returned == true { - ext.State = Locked - } - } - } else { - ctx.Log.Logf("lockable", "Got mapped error %s, but %s isn't a requirement", signal, info.Destination) - } - } - - return messages, changes -} - -func (ext *LockableExt) HandleLinkSignal(ctx *Context, node *Node, source NodeID, signal *LinkSignal) ([]SendMsg, Changes) { - var messages []SendMsg = nil - var changes = Changes{} - if ext.State == Unlocked { + switch ext.State { + case Unlocked: switch signal.Action { case "add": _, exists := ext.Requirements[signal.NodeID] @@ -159,7 +102,7 @@ func (ext *LockableExt) HandleLinkSignal(ctx *Context, node *Node, source NodeID case "remove": _, exists := ext.Requirements[signal.NodeID] if exists == false { - messages = append(messages, SendMsg{source, NewErrorSignal(signal.ID(), "can't link: not_requirement")}) + messages = append(messages, SendMsg{source, NewErrorSignal(signal.ID(), "not_requirement")}) } else { delete(ext.Requirements, signal.NodeID) changes = append(changes, "requirements") @@ -168,237 +111,255 @@ func (ext *LockableExt) HandleLinkSignal(ctx *Context, node *Node, source NodeID default: messages = append(messages, SendMsg{source, NewErrorSignal(signal.ID(), "unknown_action")}) } - } else { + default: messages = append(messages, SendMsg{source, NewErrorSignal(signal.ID(), "not_unlocked")}) } + return messages, changes } -func (ext *LockableExt) HandleSuccessSignal(ctx *Context, node *Node, source NodeID, signal *SuccessSignal) ([]SendMsg, Changes) { +// Handle an UnlockSignal by either transitioning to Unlocked state, +// sending unlock signals to requirements, or returning an error signal +func (ext *LockableExt) HandleUnlockSignal(ctx *Context, node *Node, source NodeID, signal *UnlockSignal) ([]SendMsg, Changes) { var messages []SendMsg = nil - var changes = Changes{} - if source == node.ID { - return messages, changes - } + var changes Changes = nil - info, info_found := node.ProcessResponse(ext.WaitInfos, signal) - if info_found == true { - state, found := ext.Requirements[info.Destination] - if found == false { - ctx.Log.Logf("lockable", "Got success signal for requirement that is no longer in the map(%s), ignoring...", info.Destination) + switch ext.State { + case Locked: + if source != *ext.Owner { + messages = append(messages, SendMsg{source, NewErrorSignal(signal.Id, "not_owner")}) } else { - ctx.Log.Logf("lockable", "got mapped response %+v for %+v in state %s", signal, info, ReqStateStrings[state]) - switch state { - case Locking: - switch ext.State { - case Locking: - ext.Requirements[info.Destination] = Locked - locked := 0 - for _, s := range(ext.Requirements) { - if s == Locked { - locked += 1 - } - } - if locked == len(ext.Requirements) { - ctx.Log.Logf("lockable", "WHOLE LOCK: %s - %s - %+v", node.ID, ext.PendingID, ext.PendingOwner) - ext.State = Locked - ext.Owner = ext.PendingOwner - changes = append(changes, "state", "owner", "requirements") - messages = append(messages, SendMsg{*ext.Owner, NewSuccessSignal(ext.PendingID)}) - } else { - changes = append(changes, "requirements") - ctx.Log.Logf("lockable", "PARTIAL LOCK: %s - %d/%d", node.ID, locked, len(ext.Requirements)) - } - case AbortingLock: - ext.Requirements[info.Destination] = Unlocking + if len(ext.Requirements) == 0 { + changes = append(changes, "state", "owner", "pending_owner") - lock_signal := NewLockSignal("unlock") - ext.WaitInfos[lock_signal.Id] = node.QueueTimeout("unlock", info.Destination, lock_signal, 100*time.Millisecond) - messages = append(messages, SendMsg{info.Destination, lock_signal}) + ext.Owner = nil - ctx.Log.Logf("lockable", "sending abort_lock to %s for %s", info.Destination, node.ID) - } - case AbortingLock: - ctx.Log.Logf("lockable", "Got success signal in AbortingLock %s", node.ID) - fallthrough - case Unlocking: - ext.Requirements[source] = Unlocked + ext.PendingOwner = nil - unlocked := 0 - for _, s := range(ext.Requirements) { - if s == Unlocked { - unlocked += 1 - } - } + ctx.Log.Logf("lockable", "%s transition to Unlocked", node.ID) + ext.State = Unlocked - if unlocked == len(ext.Requirements) { - old_state := ext.State - ext.State = Unlocked - ctx.Log.Logf("lockable", "WHOLE UNLOCK: %s - %s - %+v", node.ID, ext.PendingID, ext.PendingOwner) - if old_state == Unlocking { - previous_owner := *ext.Owner - ext.Owner = ext.PendingOwner - ext.ReqID = nil - changes = append(changes, "state", "owner", "req_id") - messages = append(messages, SendMsg{previous_owner, NewSuccessSignal(ext.PendingID)}) - } else if old_state == AbortingLock { - changes = append(changes, "state", "pending_owner") - messages = append(messages, SendMsg{*ext.PendingOwner, NewErrorSignal(*ext.ReqID, "not_unlocked")}) - ext.PendingOwner = ext.Owner - } - } else { - changes = append(changes, "state") - ctx.Log.Logf("lockable", "PARTIAL UNLOCK: %s - %d/%d", node.ID, unlocked, len(ext.Requirements)) + messages = append(messages, SendMsg{source, NewSuccessSignal(signal.Id)}) + } else { + changes = append(changes, "state", "waiting", "requirements", "pending_owner") + + ext.PendingOwner = nil + + ext.ReqID = new(uuid.UUID) + *ext.ReqID = signal.Id + + ctx.Log.Logf("lockable", "%s transition to Unlocking", node.ID) + ext.State = Unlocking + for id := range(ext.Requirements) { + unlock_signal := NewUnlockSignal() + + ext.Waiting[unlock_signal.Id] = id + ext.Requirements[id] = Unlocking + + messages = append(messages, SendMsg{id, unlock_signal}) } } } + default: + messages = append(messages, SendMsg{source, NewErrorSignal(signal.Id, "not_locked")}) } return messages, changes } -// Handle a LockSignal and update the extensions owner/requirement states +// Handle a LockSignal by either transitioning to a locked state, +// sending lock signals to requirements, or returning an error signal func (ext *LockableExt) HandleLockSignal(ctx *Context, node *Node, source NodeID, signal *LockSignal) ([]SendMsg, Changes) { var messages []SendMsg = nil - var changes = Changes{} + var changes Changes = nil - switch signal.State { - case "lock": - switch ext.State { - case Unlocked: - if len(ext.Requirements) == 0 { - ext.State = Locked - new_owner := source - ext.PendingOwner = &new_owner - ext.Owner = &new_owner - changes = append(changes, "state", "pending_owner", "owner") - messages = append(messages, SendMsg{new_owner, NewSuccessSignal(signal.ID())}) - } else { - ext.State = Locking - id := signal.ID() - ext.ReqID = &id - new_owner := source - ext.PendingOwner = &new_owner - ext.PendingID = signal.ID() - changes = append(changes, "state", "req_id", "pending_owner", "pending_id") - for id, state := range(ext.Requirements) { - if state != Unlocked { - ctx.Log.Logf("lockable", "REQ_NOT_UNLOCKED_WHEN_LOCKING") - } + switch ext.State { + case Unlocked: + if len(ext.Requirements) == 0 { + changes = append(changes, "state", "owner", "pending_owner") - lock_signal := NewLockSignal("lock") - ext.WaitInfos[lock_signal.Id] = node.QueueTimeout("lock", id, lock_signal, 500*time.Millisecond) - ext.Requirements[id] = Locking + ext.Owner = new(NodeID) + *ext.Owner = source - messages = append(messages, SendMsg{id, lock_signal}) - } - } - default: - messages = append(messages, SendMsg{source, NewErrorSignal(signal.ID(), "not_unlocked")}) - ctx.Log.Logf("lockable", "Tried to lock %s while %s", node.ID, ext.State) - } - case "unlock": - if ext.State == Locked { - if len(ext.Requirements) == 0 { - ext.State = Unlocked - new_owner := source - ext.PendingOwner = nil - ext.Owner = nil - changes = append(changes, "state", "pending_owner", "owner") - messages = append(messages, SendMsg{new_owner, NewSuccessSignal(signal.ID())}) - } else if source == *ext.Owner { - ext.State = Unlocking - id := signal.ID() - ext.ReqID = &id - ext.PendingOwner = nil - ext.PendingID = signal.ID() - changes = append(changes, "state", "pending_owner", "pending_id", "req_id") - for id, state := range(ext.Requirements) { - if state != Locked { - ctx.Log.Logf("lockable", "REQ_NOT_LOCKED_WHEN_UNLOCKING") - } + ext.PendingOwner = new(NodeID) + *ext.PendingOwner = source - lock_signal := NewLockSignal("unlock") - ext.WaitInfos[lock_signal.Id] = node.QueueTimeout("unlock", id, lock_signal, 100*time.Millisecond) - ext.Requirements[id] = Unlocking + ctx.Log.Logf("lockable", "%s transition to Locked", node.ID) + ext.State = Locked + messages = append(messages, SendMsg{source, NewSuccessSignal(signal.Id)}) + } else { + changes = append(changes, "state", "requirements", "waiting", "pending_owner") - messages = append(messages, SendMsg{id, lock_signal}) - } + ext.PendingOwner = new(NodeID) + *ext.PendingOwner = source + + ext.ReqID = new(uuid.UUID) + *ext.ReqID = signal.Id + + ctx.Log.Logf("lockable", "%s transition to Locking", node.ID) + ext.State = Locking + for id := range(ext.Requirements) { + lock_signal := NewLockSignal() + + ext.Waiting[lock_signal.Id] = id + ext.Requirements[id] = Locking + + messages = append(messages, SendMsg{id, lock_signal}) } - } else { - messages = append(messages, SendMsg{source, NewErrorSignal(signal.ID(), "not_locked")}) } default: - ctx.Log.Logf("lockable", "LOCK_ERR: unkown state %s", signal.State) + messages = append(messages, SendMsg{source, NewErrorSignal(signal.Id, "not_unlocked")}) } + return messages, changes } -func (ext *LockableExt) HandleTimeoutSignal(ctx *Context, node *Node, source NodeID, signal *TimeoutSignal) ([]SendMsg, Changes) { +// Handle an error signal by aborting the lock, or retrying the unlock +func (ext *LockableExt) HandleErrorSignal(ctx *Context, node *Node, source NodeID, signal *ErrorSignal) ([]SendMsg, Changes) { var messages []SendMsg = nil - var changes = Changes{} - - wait_info, found := node.ProcessResponse(ext.WaitInfos, signal) - if found == true { - changes = append(changes, "wait_infos") - state, found := ext.Requirements[wait_info.Destination] - if found == true { - ctx.Log.Logf("lockable", "%s timed out %s while %s was %s", wait_info.Destination, ReqStateStrings[state], node.ID, ReqStateStrings[state]) - switch ext.State { - case AbortingLock: - ext.Requirements[wait_info.Destination] = Unlocked - all_unlocked := true - for _, state := range(ext.Requirements) { - if state != Unlocked { - all_unlocked = false - break - } - } - if all_unlocked == true { - changes = append(changes, "state") - ext.State = Unlocked + var changes Changes = nil + + id, waiting := ext.Waiting[signal.ReqID] + if waiting == true { + delete(ext.Waiting, signal.ReqID) + changes = append(changes, "waiting") + + switch ext.State { + case Locking: + changes = append(changes, "state", "pending_owner", "requirements", "req_id") + messages = append(messages, SendMsg{*ext.PendingOwner, NewErrorSignal(*ext.ReqID, signal.Error)}) + + ext.ReqID = nil + ext.PendingOwner = nil + ext.Requirements[id] = Unlocked + + unlocked := 0 + for req_id, req_state := range(ext.Requirements) { + // Unlock locked requirements, and count unlocked requirements + switch req_state { + case Locked: + unlock_signal := NewUnlockSignal() + + ext.Waiting[unlock_signal.Id] = req_id + ext.Requirements[req_id] = Unlocking + + messages = append(messages, SendMsg{req_id, unlock_signal}) + case Unlocked: + unlocked += 1 } + } + + if unlocked == len(ext.Requirements) { + changes = append(changes, "owner", "state") + ctx.Log.Logf("lockable", "%s transition to Unlocked", node.ID) + ext.State = Unlocked + ext.Owner = nil + } else { + ctx.Log.Logf("lockable", "%s transition to Unlocking", node.ID) + ext.State = Unlocking + } + + case Unlocking: + req_state := ext.Requirements[id] + // Mark failed lock as Unlocked, or retry unlock + switch req_state { case Locking: - ext.State = AbortingLock - ext.Requirements[wait_info.Destination] = Unlocked - for id, state := range(ext.Requirements) { - if state == Locked { - ext.Requirements[id] = Unlocking - lock_signal := NewLockSignal("unlock") - ext.WaitInfos[lock_signal.Id] = node.QueueTimeout("unlock", id, lock_signal, 100*time.Millisecond) - messages = append(messages, SendMsg{id, lock_signal}) - ctx.Log.Logf("lockable", "sent abort unlock to %s from %s", id, node.ID) + ext.Requirements[id] = Unlocked + + // Check if all requirements unlocked now + unlocked := 0 + for _, req_state := range(ext.Requirements) { + if req_state == Unlocked { + unlocked += 1 } } - case Unlocking: - ext.Requirements[wait_info.Destination] = Locked - all_returned := true - for _, state := range(ext.Requirements) { - if state == Unlocking { - all_returned = false - break - } + + if unlocked == len(ext.Requirements) { + changes = append(changes, "owner", "state") + ctx.Log.Logf("lockable", "%s transition to Unlocked", node.ID) + ext.State = Unlocked + ext.Owner = nil } - if all_returned == true { - ext.State = Locked + case Unlocking: + // Handle error for unlocking requirement while unlocking by retrying unlock + unlock_signal := NewUnlockSignal() + ext.Waiting[unlock_signal.Id] = id + messages = append(messages, SendMsg{id, unlock_signal}) + } + } + } + + return messages, changes +} + +// Handle a success signal by checking if all requirements have been locked/unlocked +func (ext *LockableExt) HandleSuccessSignal(ctx *Context, node *Node, source NodeID, signal *SuccessSignal) ([]SendMsg, Changes) { + var messages []SendMsg = nil + var changes Changes = nil + + id, waiting := ext.Waiting[signal.ReqID] + if waiting == true { + delete(ext.Waiting, signal.ReqID) + changes = append(changes, "waiting") + + req_state := ext.Requirements[id] + switch req_state { + case Locking: + ext.Requirements[id] = Locked + case Unlocking: + ext.Requirements[id] = Unlocked + } + + locked := 0 + unlocked := 0 + for _, req_state := range(ext.Requirements) { + switch req_state { + case Locked: + locked += 1 + case Unlocked: + unlocked += 1 + } + } + + switch ext.State { + case Locking: + if locked == len(ext.Requirements) { + changes = append(changes, "state", "owner", "req_id") + ctx.Log.Logf("lockable", "%s transition to Locked", node.ID) + ext.State = Locked + + ext.Owner = new(NodeID) + *ext.Owner = *ext.PendingOwner + + messages = append(messages, SendMsg{*ext.Owner, NewSuccessSignal(*ext.ReqID)}) + ext.ReqID = nil + } + case Unlocking: + if unlocked == len(ext.Requirements) { + changes = append(changes, "state", "owner", "req_id") + ctx.Log.Logf("lockable", "%s transition to Unlocked", node.ID) + ext.State = Unlocked + + if ext.Owner != nil { + messages = append(messages, SendMsg{*ext.Owner, NewSuccessSignal(*ext.ReqID)}) + + ext.ReqID = nil + ext.Owner = nil } } - } else { - ctx.Log.Logf("lockable", "%s timed out", wait_info.Destination) } } return messages, changes } -// LockableExts process status signals by forwarding them to it's owner -// LockSignal and LinkSignal Direct signals are processed to update the requirement/dependency/lock state func (ext *LockableExt) Process(ctx *Context, node *Node, source NodeID, signal Signal) ([]SendMsg, Changes) { var messages []SendMsg = nil - var changes = Changes{} + var changes Changes = nil switch sig := signal.(type) { case *StatusSignal: + // Forward StatusSignals up to the owner(unless that would be a cycle) if ext.Owner != nil { if *ext.Owner != node.ID { messages = append(messages, SendMsg{*ext.Owner, signal}) @@ -408,13 +369,12 @@ func (ext *LockableExt) Process(ctx *Context, node *Node, source NodeID, signal messages, changes = ext.HandleLinkSignal(ctx, node, source, sig) case *LockSignal: messages, changes = ext.HandleLockSignal(ctx, node, source, sig) + case *UnlockSignal: + messages, changes = ext.HandleUnlockSignal(ctx, node, source, sig) case *ErrorSignal: messages, changes = ext.HandleErrorSignal(ctx, node, source, sig) case *SuccessSignal: messages, changes = ext.HandleSuccessSignal(ctx, node, source, sig) - case *TimeoutSignal: - messages, changes = ext.HandleTimeoutSignal(ctx, node, source, sig) - default: } return messages, changes diff --git a/lockable_test.go b/lockable_test.go index 986a9f3..ae4d789 100644 --- a/lockable_test.go +++ b/lockable_test.go @@ -79,7 +79,7 @@ func Test1000Lock(t *testing.T) { } func TestLock(t *testing.T) { - ctx := logTestContext(t, []string{"test", "lockable"}) + ctx := logTestContext(t, []string{"test", "lockable", "signal"}) NewLockable := func(reqs []NodeID)(*Node, *ListenerExt) { listener := NewListenerExt(1000) @@ -103,24 +103,26 @@ func TestLock(t *testing.T) { ctx.Log.Logf("test", "l5: %s", l5.ID) id_1, err := LockLockable(ctx, l0) - ctx.Log.Logf("test", "ID_1: %s", id_1) fatalErr(t, err) - _, _, err = WaitForResponse(l0_listener.Chan, time.Millisecond*10, id_1) + response, _, err := WaitForResponse(l0_listener.Chan, time.Millisecond*10, id_1) fatalErr(t, err) + ctx.Log.Logf("test", "l0 lock: %+v", response) id_2, err := LockLockable(ctx, l1) fatalErr(t, err) - _, _, err = WaitForResponse(l1_listener.Chan, time.Millisecond*100, id_2) + response, _, err = WaitForResponse(l1_listener.Chan, time.Millisecond*10, id_2) fatalErr(t, err) + ctx.Log.Logf("test", "l1 lock: %+v", response) id_3, err := UnlockLockable(ctx, l0) fatalErr(t, err) - _, _, err = WaitForResponse(l0_listener.Chan, time.Millisecond*10, id_3) + response, _, err = WaitForResponse(l0_listener.Chan, time.Millisecond*10, id_3) fatalErr(t, err) + ctx.Log.Logf("test", "l0 unlock: %+v", response) id_4, err := LockLockable(ctx, l1) fatalErr(t, err) - - _, _, err = WaitForResponse(l1_listener.Chan, time.Millisecond*10, id_4) + response, _, err = WaitForResponse(l1_listener.Chan, time.Millisecond*10, id_4) fatalErr(t, err) + ctx.Log.Logf("test", "l1 lock: %+v", response) } diff --git a/node.go b/node.go index 547b894..2b5d881 100644 --- a/node.go +++ b/node.go @@ -66,6 +66,8 @@ func (q QueuedSignal) String() string { return fmt.Sprintf("%+v@%s", reflect.TypeOf(q.Signal), q.Time) } +type WaitMap map[uuid.UUID]NodeID + // Default message channel size for nodes // Nodes represent a group of extensions that can be collectively addressed type Node struct { @@ -101,54 +103,6 @@ func (node *Node) PostDeserialize(ctx *Context) error { return nil } -type WaitReason string -type WaitInfo struct { - Destination NodeID `gv:"destination" node:"Base"` - Timeout uuid.UUID `gv:"timeout"` - Reason WaitReason `gv:"reason"` -} - -type WaitMap map[uuid.UUID]WaitInfo - -// Removes a signal from the wait_map and dequeue the associated timeout signal -// Returns the data, and whether or not the ID was found in the wait_map -func (node *Node) ProcessResponse(wait_map WaitMap, response ResponseSignal) (WaitInfo, bool) { - wait_info, is_processed := wait_map[response.ResponseID()] - if is_processed == true { - delete(wait_map, response.ResponseID()) - if response.ID() != wait_info.Timeout { - node.DequeueSignal(wait_info.Timeout) - } - return wait_info, true - } - return WaitInfo{}, false -} - -func (node *Node) NewTimeout(reason WaitReason, dest NodeID, timeout time.Duration) (WaitInfo, uuid.UUID) { - id := uuid.New() - - timeout_signal := NewTimeoutSignal(id) - node.QueueSignal(time.Now().Add(timeout), timeout_signal) - - return WaitInfo{ - Destination: dest, - Timeout: timeout_signal.Id, - Reason: reason, - }, id -} - -// Creates a timeout signal for signal, queues it for the node at the timeout, and returns the WaitInfo -func (node *Node) QueueTimeout(reason WaitReason, dest NodeID, signal Signal, timeout time.Duration) WaitInfo { - timeout_signal := NewTimeoutSignal(signal.ID()) - node.QueueSignal(time.Now().Add(timeout), timeout_signal) - - return WaitInfo{ - Destination: dest, - Timeout: timeout_signal.Id, - Reason: reason, - } -} - func (node *Node) QueueSignal(time time.Time, signal Signal) { node.SignalQueue = append(node.SignalQueue, QueuedSignal{signal, time}) node.NextSignal, node.TimeoutChan = SoonestSignal(node.SignalQueue) diff --git a/signal.go b/signal.go index 9f8aafb..86d3cf7 100644 --- a/signal.go +++ b/signal.go @@ -21,13 +21,6 @@ func (signal TimeoutSignal) String() string { return fmt.Sprintf("TimeoutSignal(%s)", &signal.ResponseHeader) } -type SignalDirection int -const ( - Up SignalDirection = iota - Down - Direct -) - type SignalHeader struct { Id uuid.UUID `gv:"id"` } @@ -205,19 +198,31 @@ func NewLinkSignal(action string, id NodeID) Signal { type LockSignal struct { SignalHeader - State string } func (signal LockSignal) String() string { - return fmt.Sprintf("LockSignal(%s, %s)", signal.SignalHeader, signal.State) + return fmt.Sprintf("LockSignal(%s)", signal.SignalHeader) } -func NewLockSignal(state string) *LockSignal { +func NewLockSignal() *LockSignal { return &LockSignal{ NewSignalHeader(), - state, } } +type UnlockSignal struct { + SignalHeader +} +func (signal UnlockSignal) String() string { + return fmt.Sprintf("UnlockSignal(%s)", signal.SignalHeader) +} + +func NewUnlockSignal() *UnlockSignal { + return &UnlockSignal{ + NewSignalHeader(), + } +} + + type ReadSignal struct { SignalHeader Extensions map[ExtType][]string `json:"extensions"`