Implemented locking over signals, TODO: implement unlock

gql_cataclysm
noah metz 2023-07-27 19:53:43 -06:00
parent a44b00bc97
commit a1ce4238cc
2 changed files with 119 additions and 58 deletions

@ -70,33 +70,22 @@ func (ext *LockableExt) Serialize() ([]byte, error) {
func NewLockableExt() *LockableExt { func NewLockableExt() *LockableExt {
return &LockableExt{ return &LockableExt{
Owner: nil, Owner: nil,
PendingOwner: nil,
Requirements: map[NodeID]string{}, Requirements: map[NodeID]string{},
Dependencies: map[NodeID]string{}, Dependencies: map[NodeID]string{},
LocksHeld: map[NodeID]*NodeID{},
LockStates: map[NodeID]string{}, LockStates: map[NodeID]string{},
} }
} }
type LockableExt struct { type LockableExt struct {
Owner *NodeID `json:"owner"` Owner *NodeID `json:"owner"`
PendingOwner *NodeID `json:"pending_owner"`
Requirements map[NodeID]string `json:"requirements"` Requirements map[NodeID]string `json:"requirements"`
Dependencies map[NodeID]string `json:"dependencies"` Dependencies map[NodeID]string `json:"dependencies"`
LockStates map[NodeID]string `json:"lock_states"` LockStates map[NodeID]string `json:"lock_states"`
LocksHeld map[NodeID]*NodeID `json:"locks_held"`
} }
func LockLockable(ctx *Context, node *Node) error { func LockLockable(ctx *Context, node *Node) error {
ext, err := GetExt[*LockableExt](node)
if err != nil {
return err
}
_, exists := ext.LockStates[node.ID]
if exists == true {
return fmt.Errorf("%s is already being locked, cannot lock again", node.ID)
}
ext.LockStates[node.ID] = "start"
return ctx.Send(node.ID, node.ID, NewLockSignal("lock")) return ctx.Send(node.ID, node.ID, NewLockSignal("lock"))
} }
@ -124,11 +113,64 @@ func (ext *LockableExt) HandleLockSignal(ctx *Context, source NodeID, node *Node
ctx.Log.Logf("lockable", "LOCK_SIGNAL: %s->%s %+v", source, node.ID, signal) ctx.Log.Logf("lockable", "LOCK_SIGNAL: %s->%s %+v", source, node.ID, signal)
state := signal.State state := signal.State
switch state { switch state {
case "locked":
if source == node.ID {
return
}
_, exists := ext.LockStates[source]
if exists == true {
ext.LockStates[source] = "locked"
locked_reqs := 0
for _, state := range(ext.LockStates) {
if state == "locked" {
locked_reqs += 1
}
}
if len(ext.Requirements) == locked_reqs {
ext.Owner = ext.PendingOwner
ext.PendingOwner = nil
ctx.Send(node.ID, *ext.Owner, NewLockSignal("locked"))
}
} else {
ctx.Send(node.ID, source, NewLockSignal("reset"))
}
case "pending":
state, exists := ext.LockStates[source]
if exists == true && state != "pending" {
delete(ext.LockStates, source)
ctx.Send(node.ID, source, NewLockSignal("reset"))
} else if exists == false {
ctx.Send(node.ID, source, NewLockSignal("reset"))
}
case "lock":
if ext.Owner != nil {
ctx.Send(node.ID, source, NewLockSignal("already_locked"))
} else if ext.PendingOwner == nil {
if len(ext.Requirements) == 0 {
owner := source
ext.Owner = &owner
ctx.Send(node.ID, source, NewLockSignal("locked"))
} else {
pending_owner := source
ext.PendingOwner = &pending_owner
for id, state := range(ext.Requirements) {
if state == "linked" {
ext.LockStates[id] = "pending"
ctx.Send(node.ID, id, NewLockSignal("lock"))
}
}
if source != node.ID {
ctx.Send(node.ID, source, NewLockSignal("pending"))
}
}
}
default: default:
ctx.Log.Logf("lockable", "LOCK_ERR: unkown state %s", state) ctx.Log.Logf("lockable", "LOCK_ERR: unkown state %s", state)
} }
} }
// TODO: don't allow changes to requirements or dependencies while being locked or locked
func (ext *LockableExt) HandleLinkSignal(ctx *Context, source NodeID, node *Node, signal StateSignal) { func (ext *LockableExt) HandleLinkSignal(ctx *Context, source NodeID, node *Node, signal StateSignal) {
ctx.Log.Logf("lockable", "LINK_SIGNAL: %s->%s %+v", source, node.ID, signal) ctx.Log.Logf("lockable", "LINK_SIGNAL: %s->%s %+v", source, node.ID, signal)
state := signal.State state := signal.State
@ -195,15 +237,17 @@ func (ext *LockableExt) Process(ctx *Context, source NodeID, node *Node, signal
switch signal.Direction() { switch signal.Direction() {
case Up: case Up:
owner_sent := false owner_sent := false
for dependency, _ := range(ext.Dependencies) { for dependency, state := range(ext.Dependencies) {
err := ctx.Send(node.ID, dependency, signal) if state == "linked" {
if err != nil { err := ctx.Send(node.ID, dependency, signal)
ctx.Log.Logf("signal", "LOCKABLE_SIGNAL_ERR: %s->%s - %e", node.ID, dependency, err) if err != nil {
} ctx.Log.Logf("signal", "LOCKABLE_SIGNAL_ERR: %s->%s - %e", node.ID, dependency, err)
}
if ext.Owner != nil { if ext.Owner != nil {
if dependency == *ext.Owner { if dependency == *ext.Owner {
owner_sent = true owner_sent = true
}
} }
} }
} }
@ -217,10 +261,12 @@ func (ext *LockableExt) Process(ctx *Context, source NodeID, node *Node, signal
} }
} }
case Down: case Down:
for requirement, _ := range(ext.Requirements) { for requirement, state := range(ext.Requirements) {
err := ctx.Send(node.ID, requirement, signal) if state == "linked" {
if err != nil { err := ctx.Send(node.ID, requirement, signal)
ctx.Log.Logf("signal", "LOCKABLE_SIGNAL_ERR: %s->%s - %e", node.ID, requirement, err) if err != nil {
ctx.Log.Logf("signal", "LOCKABLE_SIGNAL_ERR: %s->%s - %e", node.ID, requirement, err)
}
} }
} }
case Direct: case Direct:
@ -235,23 +281,6 @@ func (ext *LockableExt) Process(ctx *Context, source NodeID, node *Node, signal
} }
} }
func (ext *LockableExt) RecordUnlock(node NodeID) *NodeID {
last_owner, exists := ext.LocksHeld[node]
if exists == false {
panic("Attempted to take a get the original lock holder of a lockable we don't own")
}
delete(ext.LocksHeld, node)
return last_owner
}
func (ext *LockableExt) RecordLock(node NodeID, last_owner *NodeID) {
_, exists := ext.LocksHeld[node]
if exists == true {
panic("Attempted to lock a lockable we're already holding(lock cycle)")
}
ext.LocksHeld[node] = last_owner
}
func SaveNode(node *Node) string { func SaveNode(node *Node) string {
str := "" str := ""
if node != nil { if node != nil {

@ -50,27 +50,59 @@ func TestLink(t *testing.T) {
} }
func TestLock(t *testing.T) { func TestLock(t *testing.T) {
ctx := lockableTestContext(t, []string{"test", "lockable"}) ctx := lockableTestContext(t, []string{"lockable"})
l1_listener := NewListenerExt(10) NewLockable := func()(*Node, *ListenerExt) {
l1 := NewNode(ctx, RandID(), TestLockableType, nil, listener := NewListenerExt(10)
l1_listener, l := NewNode(ctx, RandID(), TestLockableType, nil,
NewACLExt(&link_policy), listener,
NewLockableExt(), NewACLExt(&lock_policy),
) NewLockableExt(),
l2_listener := NewListenerExt(10) )
l2 := NewNode(ctx, RandID(), TestLockableType, nil, return l, listener
l2_listener, }
NewACLExt(&link_policy),
NewLockableExt(), l0, l0_listener := NewLockable()
) l1, l1_listener := NewLockable()
l2, _ := NewLockable()
l3, _ := NewLockable()
l4, _ := NewLockable()
l5, _ := NewLockable()
var err error
err = LinkRequirement(ctx, l1, l2.ID)
fatalErr(t, err)
err = LinkRequirement(ctx, l1, l3.ID)
fatalErr(t, err)
err = LinkRequirement(ctx, l1, l4.ID)
fatalErr(t, err)
err = LinkRequirement(ctx, l1, l5.ID)
fatalErr(t, err)
err := LinkRequirement(ctx, l1, l2.ID) err = LinkRequirement(ctx, l0, l2.ID)
fatalErr(t, err)
err = LinkRequirement(ctx, l0, l3.ID)
fatalErr(t, err) fatalErr(t, err)
err = LinkRequirement(ctx, l0, l4.ID)
fatalErr(t, err)
err = LinkRequirement(ctx, l0, l5.ID)
fatalErr(t, err)
(*GraphTester)(t).WaitForState(ctx, l1_listener, LinkSignalType, "dep_linked", time.Millisecond*10, "No dep_link")
(*GraphTester)(t).WaitForState(ctx, l1_listener, LinkSignalType, "dep_linked", time.Millisecond*10, "No dep_link")
(*GraphTester)(t).WaitForState(ctx, l1_listener, LinkSignalType, "dep_linked", time.Millisecond*10, "No dep_link")
(*GraphTester)(t).WaitForState(ctx, l1_listener, LinkSignalType, "dep_linked", time.Millisecond*10, "No dep_link") (*GraphTester)(t).WaitForState(ctx, l1_listener, LinkSignalType, "dep_linked", time.Millisecond*10, "No dep_link")
(*GraphTester)(t).WaitForState(ctx, l2_listener, LinkSignalType, "req_linked", time.Millisecond*10, "No req_linked")
(*GraphTester)(t).WaitForState(ctx, l0_listener, LinkSignalType, "dep_linked", time.Millisecond*10, "No dep_link")
(*GraphTester)(t).WaitForState(ctx, l0_listener, LinkSignalType, "dep_linked", time.Millisecond*10, "No dep_link")
(*GraphTester)(t).WaitForState(ctx, l0_listener, LinkSignalType, "dep_linked", time.Millisecond*10, "No dep_link")
(*GraphTester)(t).WaitForState(ctx, l0_listener, LinkSignalType, "dep_linked", time.Millisecond*10, "No dep_link")
err = LockLockable(ctx, l1) err = LockLockable(ctx, l1)
fatalErr(t, err) fatalErr(t, err)
(*GraphTester)(t).WaitForState(ctx, l1_listener, LockSignalType, "locked", time.Millisecond*10, "No locked") (*GraphTester)(t).WaitForState(ctx, l1_listener, LockSignalType, "locked", time.Millisecond*10, "No locked")
err = LockLockable(ctx, l0)
fatalErr(t, err)
(*GraphTester)(t).WaitForState(ctx, l1_listener, LockSignalType, "locked", time.Millisecond*10, "No locked")
} }