Added first pass of hierarchical locking

graph-rework
noah metz 2023-06-20 22:36:18 -06:00
parent 687355f355
commit adcd079156
4 changed files with 88 additions and 16 deletions

@ -553,6 +553,14 @@ func NewEventQueue(name string, description string, resources []Resource) (* Eve
return queue, nil
}
func (event * BaseEvent) Allowed() []GraphNode {
ret := make([]GraphNode, len(event.children))
for i, v := range(event.children) {
ret[i] = v
}
return ret
}
func (event * BaseEvent) Parent() Event {
return event.parent
}

@ -136,6 +136,9 @@ type GraphNode interface {
Name() string
Description() string
ID() string
Allowed() []GraphNode
Delegator(id string) GraphNode
TakeLock(resource Resource)
UpdateListeners(update GraphSignal)
PropagateUpdate(update GraphSignal)
RegisterChannel(listener chan GraphSignal)
@ -151,6 +154,7 @@ func NewBaseNode(name string, description string, id string) BaseNode {
id: id,
signal: make(chan GraphSignal, 512),
listeners: map[chan GraphSignal]chan GraphSignal{},
delegation_map: map[string]GraphNode{},
}
Log.Logf("graph", "NEW_NODE: %s - %s", node.ID(), node.Name())
return node
@ -165,6 +169,30 @@ type BaseNode struct {
signal chan GraphSignal
listeners_lock sync.Mutex
listeners map[chan GraphSignal]chan GraphSignal
delegation_map map[string]GraphNode
}
func (node * BaseNode) TakeLock(resource Resource) {
_, exists := node.delegation_map[resource.ID()]
if exists == true {
panic("Trying to take a lock we already have")
}
node.delegation_map[resource.ID()] = resource.Owner()
}
func (node * BaseNode) Allowed() []GraphNode {
return []GraphNode{}
}
func (node * BaseNode) Delegator(id string) GraphNode {
last_owner, exists := node.delegation_map[id]
if exists == false {
panic("Trying to delegate a lock we don't own")
}
delete(node.delegation_map, id)
return last_owner
}
func (node * BaseNode) SignalChannel() chan GraphSignal {

@ -319,14 +319,6 @@ func TestAbortEventQueue(t * testing.T) {
LockResource(r1, root_event)
e1, _ := NewEvent("event_1", "", []Resource{r1})
e1_info := NewEventQueueInfo(1)
// Add an event so that the queue doesn't auto complete
err := LinkEvent(root_event, e1, e1_info)
if err != nil {
t.Fatal(err)
}
// Now that the event is constructed with a queue and 3 basic events
// start the queue and check that all the events are executed
go func() {
@ -335,7 +327,7 @@ func TestAbortEventQueue(t * testing.T) {
SendUpdate(root_event, abort_signal)
}()
err = LockResources(root_event)
err := LockResources(root_event)
if err != nil {
t.Fatal(err)
}
@ -349,6 +341,35 @@ func TestAbortEventQueue(t * testing.T) {
}
}
func TestDelegateLock(t * testing.T) {
Log.Init([]string{})
test_resource, _ := NewResource("test_resource", "", []Resource{})
root_event, _ := NewEventQueue("root_event", "", []Resource{test_resource})
test_event, _ := NewEvent("test_event", "", []Resource{test_resource})
err := LinkEvent(root_event, test_event, NewEventQueueInfo(1))
if err != nil {
t.Fatal(err)
}
err = LockResources(root_event)
if err != nil {
t.Fatal(err)
}
test_listener := test_event.UpdateChannel()
go func() {
(*GraphTester)(t).WaitForValue(test_listener, "event_done", test_event, 250 * time.Millisecond, "No event_done for test_event")
abort_signal := NewDownSignal(nil, "cancel")
SendUpdate(root_event, abort_signal)
}()
err = RunEvent(root_event)
if err != nil {
t.Fatal(err)
}
}
func TestStartWithoutLocking(t * testing.T) {
test_resource, _ := NewResource("test_resource", "", []Resource{})
root_event, _ := NewEvent("root_event", "", []Resource{test_resource})
@ -360,6 +381,7 @@ func TestStartWithoutLocking(t * testing.T) {
}
func TestStartEventQueue(t * testing.T) {
Log.Init([]string{"event"})
root_event, _ := NewEventQueue("root_event", "", []Resource{})
r := root_event.DoneResource()
rel := root_event.UpdateChannel();
@ -432,7 +454,7 @@ func TestStartEventQueue(t * testing.T) {
}
if e1_r.Owner() != nil {
t.Fatal("e1 was not completed")
t.Fatal(fmt.Sprintf("e1 was not completed: %s", e1_r.Owner()))
}
if e2_r.Owner() != nil {

@ -104,7 +104,7 @@ func checkIfChild(r Resource, cur Resource) bool {
return false
}
func UnlockResource(resource Resource, event Event) error {
func UnlockResource(resource Resource, node GraphNode) error {
var err error = nil
resource.LockState()
defer resource.UnlockState()
@ -112,13 +112,13 @@ func UnlockResource(resource Resource, event Event) error {
return errors.New("Resource already unlocked")
}
if resource.Owner().ID() != event.ID() {
if resource.Owner().ID() != node.ID() {
return errors.New("Resource not locked by parent, unlock failed")
}
var lock_err error = nil
for _, child := range resource.Children() {
err := UnlockResource(child, event)
err := UnlockResource(child, node)
if err != nil {
lock_err = err
break
@ -129,9 +129,9 @@ func UnlockResource(resource Resource, event Event) error {
return fmt.Errorf("Resource failed to unlock: %s", lock_err)
}
resource.SetOwner(nil)
resource.SetOwner(node.Delegator(resource.ID()))
err = resource.unlock(event)
err = resource.unlock(node)
if err != nil {
return errors.New("Failed to unlock resource")
}
@ -139,11 +139,24 @@ func UnlockResource(resource Resource, event Event) error {
return nil
}
func isAllowedToTakeLock(node GraphNode, current_owner GraphNode) bool {
for _, allowed := range(current_owner.Allowed()) {
if allowed.ID() == node.ID() {
return true
}
}
return false
}
func LockResource(resource Resource, node GraphNode) error {
resource.LockState()
defer resource.UnlockState()
if resource.Owner() != nil {
return fmt.Errorf("Resource already locked: %s", resource.Name())
// Check if node is allowed to take a lock from resource.Owner()
if isAllowedToTakeLock(node, resource.Owner()) == false {
return fmt.Errorf("%s is not allowed to take a lock from %s, allowed: %+v", node.Name(), resource.Owner().Name(), resource.Owner().Allowed())
}
}
err := resource.lock(node)
@ -167,6 +180,7 @@ func LockResource(resource Resource, node GraphNode) error {
}
Log.Logf("resource", "Locked %s", resource.Name())
node.TakeLock(resource)
resource.SetOwner(node)
return nil