From adcd079156810ea221a79efff6d7e2c1b48cbc06 Mon Sep 17 00:00:00 2001 From: Noah Metz Date: Tue, 20 Jun 2023 22:36:18 -0600 Subject: [PATCH] Added first pass of hierarchical locking --- event.go | 8 ++++++++ graph.go | 28 ++++++++++++++++++++++++++++ manager_test.go | 42 ++++++++++++++++++++++++++++++++---------- resource.go | 26 ++++++++++++++++++++------ 4 files changed, 88 insertions(+), 16 deletions(-) diff --git a/event.go b/event.go index bed4ce8..095077b 100644 --- a/event.go +++ b/event.go @@ -553,6 +553,14 @@ func NewEventQueue(name string, description string, resources []Resource) (* Eve return queue, nil } +func (event * BaseEvent) Allowed() []GraphNode { + ret := make([]GraphNode, len(event.children)) + for i, v := range(event.children) { + ret[i] = v + } + return ret +} + func (event * BaseEvent) Parent() Event { return event.parent } diff --git a/graph.go b/graph.go index 6c387bf..5bef206 100644 --- a/graph.go +++ b/graph.go @@ -136,6 +136,9 @@ type GraphNode interface { Name() string Description() string ID() string + Allowed() []GraphNode + Delegator(id string) GraphNode + TakeLock(resource Resource) UpdateListeners(update GraphSignal) PropagateUpdate(update GraphSignal) RegisterChannel(listener chan GraphSignal) @@ -151,6 +154,7 @@ func NewBaseNode(name string, description string, id string) BaseNode { id: id, signal: make(chan GraphSignal, 512), listeners: map[chan GraphSignal]chan GraphSignal{}, + delegation_map: map[string]GraphNode{}, } Log.Logf("graph", "NEW_NODE: %s - %s", node.ID(), node.Name()) return node @@ -165,6 +169,30 @@ type BaseNode struct { signal chan GraphSignal listeners_lock sync.Mutex listeners map[chan GraphSignal]chan GraphSignal + delegation_map map[string]GraphNode +} + +func (node * BaseNode) TakeLock(resource Resource) { + _, exists := node.delegation_map[resource.ID()] + if exists == true { + panic("Trying to take a lock we already have") + } + + node.delegation_map[resource.ID()] = resource.Owner() +} + +func (node * BaseNode) Allowed() []GraphNode { + return []GraphNode{} +} + +func (node * BaseNode) Delegator(id string) GraphNode { + last_owner, exists := node.delegation_map[id] + if exists == false { + panic("Trying to delegate a lock we don't own") + } + + delete(node.delegation_map, id) + return last_owner } func (node * BaseNode) SignalChannel() chan GraphSignal { diff --git a/manager_test.go b/manager_test.go index 393b9a8..3387691 100644 --- a/manager_test.go +++ b/manager_test.go @@ -319,14 +319,6 @@ func TestAbortEventQueue(t * testing.T) { LockResource(r1, root_event) - e1, _ := NewEvent("event_1", "", []Resource{r1}) - e1_info := NewEventQueueInfo(1) - // Add an event so that the queue doesn't auto complete - err := LinkEvent(root_event, e1, e1_info) - if err != nil { - t.Fatal(err) - } - // Now that the event is constructed with a queue and 3 basic events // start the queue and check that all the events are executed go func() { @@ -335,7 +327,7 @@ func TestAbortEventQueue(t * testing.T) { SendUpdate(root_event, abort_signal) }() - err = LockResources(root_event) + err := LockResources(root_event) if err != nil { t.Fatal(err) } @@ -349,6 +341,35 @@ func TestAbortEventQueue(t * testing.T) { } } +func TestDelegateLock(t * testing.T) { + Log.Init([]string{}) + test_resource, _ := NewResource("test_resource", "", []Resource{}) + root_event, _ := NewEventQueue("root_event", "", []Resource{test_resource}) + test_event, _ := NewEvent("test_event", "", []Resource{test_resource}) + err := LinkEvent(root_event, test_event, NewEventQueueInfo(1)) + if err != nil { + t.Fatal(err) + } + + err = LockResources(root_event) + if err != nil { + t.Fatal(err) + } + + test_listener := test_event.UpdateChannel() + + go func() { + (*GraphTester)(t).WaitForValue(test_listener, "event_done", test_event, 250 * time.Millisecond, "No event_done for test_event") + abort_signal := NewDownSignal(nil, "cancel") + SendUpdate(root_event, abort_signal) + }() + + err = RunEvent(root_event) + if err != nil { + t.Fatal(err) + } +} + func TestStartWithoutLocking(t * testing.T) { test_resource, _ := NewResource("test_resource", "", []Resource{}) root_event, _ := NewEvent("root_event", "", []Resource{test_resource}) @@ -360,6 +381,7 @@ func TestStartWithoutLocking(t * testing.T) { } func TestStartEventQueue(t * testing.T) { + Log.Init([]string{"event"}) root_event, _ := NewEventQueue("root_event", "", []Resource{}) r := root_event.DoneResource() rel := root_event.UpdateChannel(); @@ -432,7 +454,7 @@ func TestStartEventQueue(t * testing.T) { } if e1_r.Owner() != nil { - t.Fatal("e1 was not completed") + t.Fatal(fmt.Sprintf("e1 was not completed: %s", e1_r.Owner())) } if e2_r.Owner() != nil { diff --git a/resource.go b/resource.go index 1bbd3c9..85263c4 100644 --- a/resource.go +++ b/resource.go @@ -104,7 +104,7 @@ func checkIfChild(r Resource, cur Resource) bool { return false } -func UnlockResource(resource Resource, event Event) error { +func UnlockResource(resource Resource, node GraphNode) error { var err error = nil resource.LockState() defer resource.UnlockState() @@ -112,13 +112,13 @@ func UnlockResource(resource Resource, event Event) error { return errors.New("Resource already unlocked") } - if resource.Owner().ID() != event.ID() { + if resource.Owner().ID() != node.ID() { return errors.New("Resource not locked by parent, unlock failed") } var lock_err error = nil for _, child := range resource.Children() { - err := UnlockResource(child, event) + err := UnlockResource(child, node) if err != nil { lock_err = err break @@ -129,9 +129,9 @@ func UnlockResource(resource Resource, event Event) error { return fmt.Errorf("Resource failed to unlock: %s", lock_err) } - resource.SetOwner(nil) + resource.SetOwner(node.Delegator(resource.ID())) - err = resource.unlock(event) + err = resource.unlock(node) if err != nil { return errors.New("Failed to unlock resource") } @@ -139,11 +139,24 @@ func UnlockResource(resource Resource, event Event) error { return nil } +func isAllowedToTakeLock(node GraphNode, current_owner GraphNode) bool { + for _, allowed := range(current_owner.Allowed()) { + if allowed.ID() == node.ID() { + return true + } + } + return false +} + func LockResource(resource Resource, node GraphNode) error { resource.LockState() defer resource.UnlockState() + if resource.Owner() != nil { - return fmt.Errorf("Resource already locked: %s", resource.Name()) + // Check if node is allowed to take a lock from resource.Owner() + if isAllowedToTakeLock(node, resource.Owner()) == false { + return fmt.Errorf("%s is not allowed to take a lock from %s, allowed: %+v", node.Name(), resource.Owner().Name(), resource.Owner().Allowed()) + } } err := resource.lock(node) @@ -167,6 +180,7 @@ func LockResource(resource Resource, node GraphNode) error { } Log.Logf("resource", "Locked %s", resource.Name()) + node.TakeLock(resource) resource.SetOwner(node) return nil