Added test to create 10K lockables and link them to a single lockable

gql_cataclysm
noah metz 2023-07-28 12:46:06 -06:00
parent 08288f88af
commit 61de2669e2
5 changed files with 119 additions and 42 deletions

@ -3,6 +3,7 @@ package graphvent
import ( import (
badger "github.com/dgraph-io/badger/v3" badger "github.com/dgraph-io/badger/v3"
"fmt" "fmt"
"sync"
"errors" "errors"
"runtime" "runtime"
"crypto/sha512" "crypto/sha512"
@ -77,6 +78,7 @@ type Context struct {
// Map between database type hashes and the registered info // Map between database type hashes and the registered info
Types map[uint64]NodeInfo Types map[uint64]NodeInfo
// Routing map to all the nodes local to this context // Routing map to all the nodes local to this context
NodesLock sync.RWMutex
Nodes map[NodeID]*Node Nodes map[NodeID]*Node
} }
@ -130,9 +132,23 @@ func (ctx *Context) RegisterExtension(ext_type ExtType, load_fn ExtensionLoadFun
return nil return nil
} }
func (ctx *Context) AddNode(id NodeID, node *Node) {
ctx.NodesLock.Lock()
ctx.Nodes[id] = node
ctx.NodesLock.Unlock()
}
func (ctx *Context) Node(id NodeID) (*Node, bool) {
ctx.NodesLock.RLock()
node, exists := ctx.Nodes[id]
ctx.NodesLock.RUnlock()
return node, exists
}
// Get a node from the context, or load from the database if not loaded // Get a node from the context, or load from the database if not loaded
func (ctx *Context) GetNode(id NodeID) (*Node, error) { func (ctx *Context) GetNode(id NodeID) (*Node, error) {
target, exists := ctx.Nodes[id] target, exists := ctx.Node(id)
if exists == false { if exists == false {
var err error var err error
target, err = LoadNode(ctx, id) target, err = LoadNode(ctx, id)

@ -2,7 +2,6 @@ package graphvent
import ( import (
"encoding/json" "encoding/json"
"fmt"
) )
// A Listener extension provides a channel that can receive signals on a different thread // A Listener extension provides a channel that can receive signals on a different thread
@ -156,24 +155,8 @@ func LockLockable(ctx *Context, node *Node) error {
} }
// Setup a node to send the initial requirement link signal, then send the signal // Setup a node to send the initial requirement link signal, then send the signal
func LinkRequirement(ctx *Context, dependency *Node, requirement NodeID) error { func LinkRequirement(ctx *Context, dependency NodeID, requirement NodeID) error {
dep_ext, err := GetExt[*LockableExt](dependency) return ctx.Send(dependency, dependency, NewLinkStartSignal("req", requirement))
if err != nil {
return err
}
_, exists := dep_ext.Requirements[requirement]
if exists == true {
return fmt.Errorf("%s is already a requirement of %s", requirement, dependency.ID)
}
_, exists = dep_ext.Dependencies[requirement]
if exists == true {
return fmt.Errorf("%s is a dependency of %s, cannot link as requirement", requirement, dependency.ID)
}
dep_ext.Requirements[requirement] = ReqState{"linking", "unlocked"}
return ctx.Send(dependency.ID, requirement, NewLinkSignal("link_as_req"))
} }
// Handle a LockSignal and update the extensions owner/requirement states // Handle a LockSignal and update the extensions owner/requirement states
@ -332,6 +315,38 @@ func (ext *LockableExt) HandleLockSignal(ctx *Context, source NodeID, node *Node
} }
} }
func (ext *LockableExt) HandleLinkStartSignal(ctx *Context, source NodeID, node *Node, signal LinkStartSignal) {
ctx.Log.Logf("lockable", "LINK__START_SIGNAL: %s->%s %+v", source, node.ID, signal)
link_type := signal.LinkType
target := signal.ID
switch link_type {
case "req":
state, exists := ext.Requirements[target]
_, dep_exists := ext.Dependencies[target]
if ext.Owner != nil {
ctx.Send(node.ID, source, NewLinkStartSignal("locked", target))
} else if ext.Owner != ext.PendingOwner {
if ext.PendingOwner == nil {
ctx.Send(node.ID, source, NewLinkStartSignal("unlocking", target))
} else {
ctx.Send(node.ID, source, NewLinkStartSignal("locking", target))
}
} else if exists == true {
if state.Link == "linking" {
ctx.Send(node.ID, source, NewLinkStartSignal("already_linking_req", target))
} else if state.Link == "linked" {
ctx.Send(node.ID, source, NewLinkStartSignal("already_req", target))
}
} else if dep_exists == true {
ctx.Send(node.ID, source, NewLinkStartSignal("already_dep", target))
} else {
ext.Requirements[target] = ReqState{"linking", "unlocked"}
ctx.Send(node.ID, target, NewLinkSignal("link_as_req"))
ctx.Send(node.ID, source, NewLinkStartSignal("linking_req", target))
}
}
}
// Handle LinkSignal, updating the extensions requirements and dependencies as necessary // Handle LinkSignal, updating the extensions requirements and dependencies as necessary
// TODO: Add unlink // TODO: Add unlink
func (ext *LockableExt) HandleLinkSignal(ctx *Context, source NodeID, node *Node, signal StateSignal) { func (ext *LockableExt) HandleLinkSignal(ctx *Context, source NodeID, node *Node, signal StateSignal) {
@ -448,6 +463,8 @@ func (ext *LockableExt) Process(ctx *Context, source NodeID, node *Node, signal
ext.HandleLinkSignal(ctx, source, node, signal.(StateSignal)) ext.HandleLinkSignal(ctx, source, node, signal.(StateSignal))
case LockSignalType: case LockSignalType:
ext.HandleLockSignal(ctx, source, node, signal.(StateSignal)) ext.HandleLockSignal(ctx, source, node, signal.(StateSignal))
case LinkStartSignalType:
ext.HandleLinkStartSignal(ctx, source, node, signal.(LinkStartSignal))
default: default:
} }
default: default:

@ -3,6 +3,7 @@ package graphvent
import ( import (
"testing" "testing"
"time" "time"
"fmt"
) )
const TestLockableType = NodeType("TEST_LOCKABLE") const TestLockableType = NodeType("TEST_LOCKABLE")
@ -37,7 +38,7 @@ func TestLink(t *testing.T) {
) )
// Link l2 as a requirement of l1 // Link l2 as a requirement of l1
err := LinkRequirement(ctx, l1, l2.ID) err := LinkRequirement(ctx, l1.ID, l2.ID)
fatalErr(t, err) fatalErr(t, err)
(*GraphTester)(t).WaitForState(ctx, l1_listener, LinkSignalType, "linked_as_req", time.Millisecond*10, "No linked_as_req") (*GraphTester)(t).WaitForState(ctx, l1_listener, LinkSignalType, "linked_as_req", time.Millisecond*10, "No linked_as_req")
@ -50,11 +51,40 @@ func TestLink(t *testing.T) {
(*GraphTester)(t).WaitForStatus(ctx, l2_listener, "TEST", time.Millisecond*10, "No TEST on l2") (*GraphTester)(t).WaitForStatus(ctx, l2_listener, "TEST", time.Millisecond*10, "No TEST on l2")
} }
func TestLink10K(t *testing.T) {
ctx := lockableTestContext(t, []string{"test"})
NewLockable := func()(*Node, *ListenerExt) {
listener := NewListenerExt(100000)
l := NewNode(ctx, RandID(), TestLockableType, nil,
listener,
NewACLExt(lock_policy, link_policy),
NewLockableExt(),
)
return l, listener
}
l0, l0_listener := NewLockable()
lockables := make([]*Node, 10000)
for i, _ := range(lockables) {
lockables[i], _ = NewLockable()
LinkRequirement(ctx, l0.ID, lockables[i].ID)
}
ctx.Log.Logf("test", "CREATED_10K %d")
for i, _ := range(lockables) {
(*GraphTester)(t).WaitForState(ctx, l0_listener, LinkSignalType, "linked_as_req", time.Millisecond*1000, fmt.Sprintf("No linked_as_req for %d", i))
}
ctx.Log.Logf("test", "LINKED_10K: %d")
}
func TestLock(t *testing.T) { func TestLock(t *testing.T) {
ctx := lockableTestContext(t, []string{}) ctx := lockableTestContext(t, []string{"lockable"})
NewLockable := func()(*Node, *ListenerExt) { NewLockable := func()(*Node, *ListenerExt) {
listener := NewListenerExt(10) listener := NewListenerExt(100)
l := NewNode(ctx, RandID(), TestLockableType, nil, l := NewNode(ctx, RandID(), TestLockableType, nil,
listener, listener,
NewACLExt(lock_policy, link_policy), NewACLExt(lock_policy, link_policy),
@ -72,33 +102,33 @@ func TestLock(t *testing.T) {
var err error var err error
err = LinkRequirement(ctx, l1, l2.ID) err = LinkRequirement(ctx, l1.ID, l2.ID)
fatalErr(t, err) fatalErr(t, err)
err = LinkRequirement(ctx, l1, l3.ID) err = LinkRequirement(ctx, l1.ID, l3.ID)
fatalErr(t, err) fatalErr(t, err)
err = LinkRequirement(ctx, l1, l4.ID) err = LinkRequirement(ctx, l1.ID, l4.ID)
fatalErr(t, err) fatalErr(t, err)
err = LinkRequirement(ctx, l1, l5.ID) err = LinkRequirement(ctx, l1.ID, l5.ID)
fatalErr(t, err) fatalErr(t, err)
err = LinkRequirement(ctx, l0, l2.ID) err = LinkRequirement(ctx, l0.ID, l2.ID)
fatalErr(t, err) fatalErr(t, err)
err = LinkRequirement(ctx, l0, l3.ID) err = LinkRequirement(ctx, l0.ID, l3.ID)
fatalErr(t, err) fatalErr(t, err)
err = LinkRequirement(ctx, l0, l4.ID) err = LinkRequirement(ctx, l0.ID, l4.ID)
fatalErr(t, err) fatalErr(t, err)
err = LinkRequirement(ctx, l0, l5.ID) err = LinkRequirement(ctx, l0.ID, l5.ID)
fatalErr(t, err) fatalErr(t, err)
(*GraphTester)(t).WaitForState(ctx, l1_listener, LinkSignalType, "linked_as_req", time.Millisecond*10, "No linked_as_req") (*GraphTester)(t).WaitForState(ctx, l1_listener, LinkSignalType, "linked_as_req", time.Millisecond*100, "No linked_as_req")
(*GraphTester)(t).WaitForState(ctx, l1_listener, LinkSignalType, "linked_as_req", time.Millisecond*10, "No linked_as_req") (*GraphTester)(t).WaitForState(ctx, l1_listener, LinkSignalType, "linked_as_req", time.Millisecond*100, "No linked_as_req")
(*GraphTester)(t).WaitForState(ctx, l1_listener, LinkSignalType, "linked_as_req", time.Millisecond*10, "No linked_as_req") (*GraphTester)(t).WaitForState(ctx, l1_listener, LinkSignalType, "linked_as_req", time.Millisecond*100, "No linked_as_req")
(*GraphTester)(t).WaitForState(ctx, l1_listener, LinkSignalType, "linked_as_req", time.Millisecond*10, "No linked_as_req") (*GraphTester)(t).WaitForState(ctx, l1_listener, LinkSignalType, "linked_as_req", time.Millisecond*100, "No linked_as_req")
(*GraphTester)(t).WaitForState(ctx, l0_listener, LinkSignalType, "linked_as_req", time.Millisecond*10, "No linked_as_req") (*GraphTester)(t).WaitForState(ctx, l0_listener, LinkSignalType, "linked_as_req", time.Millisecond*100, "No linked_as_req")
(*GraphTester)(t).WaitForState(ctx, l0_listener, LinkSignalType, "linked_as_req", time.Millisecond*10, "No linked_as_req") (*GraphTester)(t).WaitForState(ctx, l0_listener, LinkSignalType, "linked_as_req", time.Millisecond*100, "No linked_as_req")
(*GraphTester)(t).WaitForState(ctx, l0_listener, LinkSignalType, "linked_as_req", time.Millisecond*10, "No linked_as_req") (*GraphTester)(t).WaitForState(ctx, l0_listener, LinkSignalType, "linked_as_req", time.Millisecond*100, "No linked_as_req")
(*GraphTester)(t).WaitForState(ctx, l0_listener, LinkSignalType, "linked_as_req", time.Millisecond*10, "No linked_as_req") (*GraphTester)(t).WaitForState(ctx, l0_listener, LinkSignalType, "linked_as_req", time.Millisecond*100, "No linked_as_req")
err = LockLockable(ctx, l1) err = LockLockable(ctx, l1)
fatalErr(t, err) fatalErr(t, err)

@ -289,7 +289,7 @@ func (node *Node) Serialize() ([]byte, error) {
// Create a new node in memory and start it's event loop // Create a new node in memory and start it's event loop
func NewNode(ctx *Context, id NodeID, node_type NodeType, queued_signals []QueuedSignal, extensions ...Extension) *Node { func NewNode(ctx *Context, id NodeID, node_type NodeType, queued_signals []QueuedSignal, extensions ...Extension) *Node {
_, exists := ctx.Nodes[id] _, exists := ctx.Node(id)
if exists == true { if exists == true {
panic("Attempted to create an existing node") panic("Attempted to create an existing node")
} }
@ -330,7 +330,7 @@ func NewNode(ctx *Context, id NodeID, node_type NodeType, queued_signals []Queue
SignalQueue: queued_signals, SignalQueue: queued_signals,
NextSignal: next_signal, NextSignal: next_signal,
} }
ctx.Nodes[id] = node ctx.AddNode(id, node)
err := WriteNode(ctx, node) err := WriteNode(ctx, node)
if err != nil { if err != nil {
panic(err) panic(err)
@ -544,7 +544,7 @@ func LoadNode(ctx * Context, id NodeID) (*Node, error) {
SignalQueue: node_db.QueuedSignals, SignalQueue: node_db.QueuedSignals,
NextSignal: next_signal, NextSignal: next_signal,
} }
ctx.Nodes[id] = node ctx.AddNode(id, node)
found_extensions := []ExtType{} found_extensions := []ExtType{}
// Parse each of the extensions from the db // Parse each of the extensions from the db

@ -11,6 +11,7 @@ const (
LockSignalType = SignalType("LOCK") LockSignalType = SignalType("LOCK")
ReadSignalType = SignalType("READ") ReadSignalType = SignalType("READ")
ReadResultSignalType = SignalType("READ_RESULT") ReadResultSignalType = SignalType("READ_RESULT")
LinkStartSignalType = SignalType("LINK_START")
) )
type SignalDirection int type SignalDirection int
@ -135,6 +136,18 @@ func NewLinkSignal(state string) StateSignal {
} }
} }
type LinkStartSignal struct {
IDSignal
LinkType string `json:"link_type"`
}
func NewLinkStartSignal(link_type string, target NodeID) LinkStartSignal {
return LinkStartSignal{
IDSignal: NewIDSignal(LinkStartSignalType, Direct, target),
LinkType: link_type,
}
}
func NewLockSignal(state string) StateSignal { func NewLockSignal(state string) StateSignal {
return StateSignal{ return StateSignal{
BaseSignal: NewDirectSignal(LockSignalType), BaseSignal: NewDirectSignal(LockSignalType),
@ -169,3 +182,4 @@ func NewReadResultSignal(exts map[ExtType]map[string]interface{}) ReadResultSign
Extensions: exts, Extensions: exts,
} }
} }