From a1ce4238cc2367c65643701bc4f9cf911a320f23 Mon Sep 17 00:00:00 2001 From: Noah Metz Date: Thu, 27 Jul 2023 19:53:43 -0600 Subject: [PATCH] Implemented locking over signals, TODO: implement unlock --- lockable.go | 113 +++++++++++++++++++++++++++++------------------ lockable_test.go | 64 ++++++++++++++++++++------- 2 files changed, 119 insertions(+), 58 deletions(-) diff --git a/lockable.go b/lockable.go index ab7b267..7d30c13 100644 --- a/lockable.go +++ b/lockable.go @@ -70,33 +70,22 @@ func (ext *LockableExt) Serialize() ([]byte, error) { func NewLockableExt() *LockableExt { return &LockableExt{ Owner: nil, + PendingOwner: nil, Requirements: map[NodeID]string{}, Dependencies: map[NodeID]string{}, - LocksHeld: map[NodeID]*NodeID{}, LockStates: map[NodeID]string{}, } } type LockableExt struct { Owner *NodeID `json:"owner"` + PendingOwner *NodeID `json:"pending_owner"` Requirements map[NodeID]string `json:"requirements"` Dependencies map[NodeID]string `json:"dependencies"` LockStates map[NodeID]string `json:"lock_states"` - LocksHeld map[NodeID]*NodeID `json:"locks_held"` } func LockLockable(ctx *Context, node *Node) error { - ext, err := GetExt[*LockableExt](node) - if err != nil { - return err - } - - _, exists := ext.LockStates[node.ID] - if exists == true { - return fmt.Errorf("%s is already being locked, cannot lock again", node.ID) - } - - ext.LockStates[node.ID] = "start" return ctx.Send(node.ID, node.ID, NewLockSignal("lock")) } @@ -124,11 +113,64 @@ func (ext *LockableExt) HandleLockSignal(ctx *Context, source NodeID, node *Node ctx.Log.Logf("lockable", "LOCK_SIGNAL: %s->%s %+v", source, node.ID, signal) state := signal.State switch state { + case "locked": + if source == node.ID { + return + } + + _, exists := ext.LockStates[source] + if exists == true { + ext.LockStates[source] = "locked" + locked_reqs := 0 + for _, state := range(ext.LockStates) { + if state == "locked" { + locked_reqs += 1 + } + } + if len(ext.Requirements) == locked_reqs { + ext.Owner = ext.PendingOwner + ext.PendingOwner = nil + ctx.Send(node.ID, *ext.Owner, NewLockSignal("locked")) + } + } else { + ctx.Send(node.ID, source, NewLockSignal("reset")) + } + case "pending": + state, exists := ext.LockStates[source] + if exists == true && state != "pending" { + delete(ext.LockStates, source) + ctx.Send(node.ID, source, NewLockSignal("reset")) + } else if exists == false { + ctx.Send(node.ID, source, NewLockSignal("reset")) + } + case "lock": + if ext.Owner != nil { + ctx.Send(node.ID, source, NewLockSignal("already_locked")) + } else if ext.PendingOwner == nil { + if len(ext.Requirements) == 0 { + owner := source + ext.Owner = &owner + ctx.Send(node.ID, source, NewLockSignal("locked")) + } else { + pending_owner := source + ext.PendingOwner = &pending_owner + for id, state := range(ext.Requirements) { + if state == "linked" { + ext.LockStates[id] = "pending" + ctx.Send(node.ID, id, NewLockSignal("lock")) + } + } + if source != node.ID { + ctx.Send(node.ID, source, NewLockSignal("pending")) + } + } + } default: ctx.Log.Logf("lockable", "LOCK_ERR: unkown state %s", state) } } +// TODO: don't allow changes to requirements or dependencies while being locked or locked func (ext *LockableExt) HandleLinkSignal(ctx *Context, source NodeID, node *Node, signal StateSignal) { ctx.Log.Logf("lockable", "LINK_SIGNAL: %s->%s %+v", source, node.ID, signal) state := signal.State @@ -195,15 +237,17 @@ func (ext *LockableExt) Process(ctx *Context, source NodeID, node *Node, signal switch signal.Direction() { case Up: owner_sent := false - for dependency, _ := range(ext.Dependencies) { - err := ctx.Send(node.ID, dependency, signal) - if err != nil { - ctx.Log.Logf("signal", "LOCKABLE_SIGNAL_ERR: %s->%s - %e", node.ID, dependency, err) - } + for dependency, state := range(ext.Dependencies) { + if state == "linked" { + err := ctx.Send(node.ID, dependency, signal) + if err != nil { + ctx.Log.Logf("signal", "LOCKABLE_SIGNAL_ERR: %s->%s - %e", node.ID, dependency, err) + } - if ext.Owner != nil { - if dependency == *ext.Owner { - owner_sent = true + if ext.Owner != nil { + if dependency == *ext.Owner { + owner_sent = true + } } } } @@ -217,10 +261,12 @@ func (ext *LockableExt) Process(ctx *Context, source NodeID, node *Node, signal } } case Down: - for requirement, _ := range(ext.Requirements) { - err := ctx.Send(node.ID, requirement, signal) - if err != nil { - ctx.Log.Logf("signal", "LOCKABLE_SIGNAL_ERR: %s->%s - %e", node.ID, requirement, err) + for requirement, state := range(ext.Requirements) { + if state == "linked" { + err := ctx.Send(node.ID, requirement, signal) + if err != nil { + ctx.Log.Logf("signal", "LOCKABLE_SIGNAL_ERR: %s->%s - %e", node.ID, requirement, err) + } } } case Direct: @@ -235,23 +281,6 @@ func (ext *LockableExt) Process(ctx *Context, source NodeID, node *Node, signal } } -func (ext *LockableExt) RecordUnlock(node NodeID) *NodeID { - last_owner, exists := ext.LocksHeld[node] - if exists == false { - panic("Attempted to take a get the original lock holder of a lockable we don't own") - } - delete(ext.LocksHeld, node) - return last_owner -} - -func (ext *LockableExt) RecordLock(node NodeID, last_owner *NodeID) { - _, exists := ext.LocksHeld[node] - if exists == true { - panic("Attempted to lock a lockable we're already holding(lock cycle)") - } - ext.LocksHeld[node] = last_owner -} - func SaveNode(node *Node) string { str := "" if node != nil { diff --git a/lockable_test.go b/lockable_test.go index 616f2ec..26c1d04 100644 --- a/lockable_test.go +++ b/lockable_test.go @@ -50,27 +50,59 @@ func TestLink(t *testing.T) { } func TestLock(t *testing.T) { - ctx := lockableTestContext(t, []string{"test", "lockable"}) - - l1_listener := NewListenerExt(10) - l1 := NewNode(ctx, RandID(), TestLockableType, nil, - l1_listener, - NewACLExt(&link_policy), - NewLockableExt(), - ) - l2_listener := NewListenerExt(10) - l2 := NewNode(ctx, RandID(), TestLockableType, nil, - l2_listener, - NewACLExt(&link_policy), - NewLockableExt(), - ) + ctx := lockableTestContext(t, []string{"lockable"}) + + NewLockable := func()(*Node, *ListenerExt) { + listener := NewListenerExt(10) + l := NewNode(ctx, RandID(), TestLockableType, nil, + listener, + NewACLExt(&lock_policy), + NewLockableExt(), + ) + return l, listener + } + + l0, l0_listener := NewLockable() + l1, l1_listener := NewLockable() + l2, _ := NewLockable() + l3, _ := NewLockable() + l4, _ := NewLockable() + l5, _ := NewLockable() + + + var err error + err = LinkRequirement(ctx, l1, l2.ID) + fatalErr(t, err) + err = LinkRequirement(ctx, l1, l3.ID) + fatalErr(t, err) + err = LinkRequirement(ctx, l1, l4.ID) + fatalErr(t, err) + err = LinkRequirement(ctx, l1, l5.ID) + fatalErr(t, err) - err := LinkRequirement(ctx, l1, l2.ID) + err = LinkRequirement(ctx, l0, l2.ID) + fatalErr(t, err) + err = LinkRequirement(ctx, l0, l3.ID) fatalErr(t, err) + err = LinkRequirement(ctx, l0, l4.ID) + fatalErr(t, err) + err = LinkRequirement(ctx, l0, l5.ID) + fatalErr(t, err) + + (*GraphTester)(t).WaitForState(ctx, l1_listener, LinkSignalType, "dep_linked", time.Millisecond*10, "No dep_link") + (*GraphTester)(t).WaitForState(ctx, l1_listener, LinkSignalType, "dep_linked", time.Millisecond*10, "No dep_link") + (*GraphTester)(t).WaitForState(ctx, l1_listener, LinkSignalType, "dep_linked", time.Millisecond*10, "No dep_link") (*GraphTester)(t).WaitForState(ctx, l1_listener, LinkSignalType, "dep_linked", time.Millisecond*10, "No dep_link") - (*GraphTester)(t).WaitForState(ctx, l2_listener, LinkSignalType, "req_linked", time.Millisecond*10, "No req_linked") + + (*GraphTester)(t).WaitForState(ctx, l0_listener, LinkSignalType, "dep_linked", time.Millisecond*10, "No dep_link") + (*GraphTester)(t).WaitForState(ctx, l0_listener, LinkSignalType, "dep_linked", time.Millisecond*10, "No dep_link") + (*GraphTester)(t).WaitForState(ctx, l0_listener, LinkSignalType, "dep_linked", time.Millisecond*10, "No dep_link") + (*GraphTester)(t).WaitForState(ctx, l0_listener, LinkSignalType, "dep_linked", time.Millisecond*10, "No dep_link") err = LockLockable(ctx, l1) fatalErr(t, err) (*GraphTester)(t).WaitForState(ctx, l1_listener, LockSignalType, "locked", time.Millisecond*10, "No locked") + err = LockLockable(ctx, l0) + fatalErr(t, err) + (*GraphTester)(t).WaitForState(ctx, l1_listener, LockSignalType, "locked", time.Millisecond*10, "No locked") }