diff --git a/context.go b/context.go index 4f89efb..3dad2c4 100644 --- a/context.go +++ b/context.go @@ -3,6 +3,7 @@ package graphvent import ( badger "github.com/dgraph-io/badger/v3" "fmt" + "sync" "errors" "runtime" "crypto/sha512" @@ -77,6 +78,7 @@ type Context struct { // Map between database type hashes and the registered info Types map[uint64]NodeInfo // Routing map to all the nodes local to this context + NodesLock sync.RWMutex Nodes map[NodeID]*Node } @@ -130,9 +132,23 @@ func (ctx *Context) RegisterExtension(ext_type ExtType, load_fn ExtensionLoadFun return nil } +func (ctx *Context) AddNode(id NodeID, node *Node) { + ctx.NodesLock.Lock() + ctx.Nodes[id] = node + ctx.NodesLock.Unlock() +} + +func (ctx *Context) Node(id NodeID) (*Node, bool) { + ctx.NodesLock.RLock() + node, exists := ctx.Nodes[id] + ctx.NodesLock.RUnlock() + return node, exists +} + // Get a node from the context, or load from the database if not loaded func (ctx *Context) GetNode(id NodeID) (*Node, error) { - target, exists := ctx.Nodes[id] + target, exists := ctx.Node(id) + if exists == false { var err error target, err = LoadNode(ctx, id) diff --git a/lockable.go b/lockable.go index bc50feb..5368b71 100644 --- a/lockable.go +++ b/lockable.go @@ -2,7 +2,6 @@ package graphvent import ( "encoding/json" - "fmt" ) // A Listener extension provides a channel that can receive signals on a different thread @@ -156,24 +155,8 @@ func LockLockable(ctx *Context, node *Node) error { } // Setup a node to send the initial requirement link signal, then send the signal -func LinkRequirement(ctx *Context, dependency *Node, requirement NodeID) error { - dep_ext, err := GetExt[*LockableExt](dependency) - if err != nil { - return err - } - - _, exists := dep_ext.Requirements[requirement] - if exists == true { - return fmt.Errorf("%s is already a requirement of %s", requirement, dependency.ID) - } - - _, exists = dep_ext.Dependencies[requirement] - if exists == true { - return fmt.Errorf("%s is a dependency of %s, cannot link as requirement", requirement, dependency.ID) - } - - dep_ext.Requirements[requirement] = ReqState{"linking", "unlocked"} - return ctx.Send(dependency.ID, requirement, NewLinkSignal("link_as_req")) +func LinkRequirement(ctx *Context, dependency NodeID, requirement NodeID) error { + return ctx.Send(dependency, dependency, NewLinkStartSignal("req", requirement)) } // Handle a LockSignal and update the extensions owner/requirement states @@ -332,6 +315,38 @@ func (ext *LockableExt) HandleLockSignal(ctx *Context, source NodeID, node *Node } } +func (ext *LockableExt) HandleLinkStartSignal(ctx *Context, source NodeID, node *Node, signal LinkStartSignal) { + ctx.Log.Logf("lockable", "LINK__START_SIGNAL: %s->%s %+v", source, node.ID, signal) + link_type := signal.LinkType + target := signal.ID + switch link_type { + case "req": + state, exists := ext.Requirements[target] + _, dep_exists := ext.Dependencies[target] + if ext.Owner != nil { + ctx.Send(node.ID, source, NewLinkStartSignal("locked", target)) + } else if ext.Owner != ext.PendingOwner { + if ext.PendingOwner == nil { + ctx.Send(node.ID, source, NewLinkStartSignal("unlocking", target)) + } else { + ctx.Send(node.ID, source, NewLinkStartSignal("locking", target)) + } + } else if exists == true { + if state.Link == "linking" { + ctx.Send(node.ID, source, NewLinkStartSignal("already_linking_req", target)) + } else if state.Link == "linked" { + ctx.Send(node.ID, source, NewLinkStartSignal("already_req", target)) + } + } else if dep_exists == true { + ctx.Send(node.ID, source, NewLinkStartSignal("already_dep", target)) + } else { + ext.Requirements[target] = ReqState{"linking", "unlocked"} + ctx.Send(node.ID, target, NewLinkSignal("link_as_req")) + ctx.Send(node.ID, source, NewLinkStartSignal("linking_req", target)) + } + } +} + // Handle LinkSignal, updating the extensions requirements and dependencies as necessary // TODO: Add unlink func (ext *LockableExt) HandleLinkSignal(ctx *Context, source NodeID, node *Node, signal StateSignal) { @@ -448,6 +463,8 @@ func (ext *LockableExt) Process(ctx *Context, source NodeID, node *Node, signal ext.HandleLinkSignal(ctx, source, node, signal.(StateSignal)) case LockSignalType: ext.HandleLockSignal(ctx, source, node, signal.(StateSignal)) + case LinkStartSignalType: + ext.HandleLinkStartSignal(ctx, source, node, signal.(LinkStartSignal)) default: } default: diff --git a/lockable_test.go b/lockable_test.go index c9a159f..1a2548c 100644 --- a/lockable_test.go +++ b/lockable_test.go @@ -3,6 +3,7 @@ package graphvent import ( "testing" "time" + "fmt" ) const TestLockableType = NodeType("TEST_LOCKABLE") @@ -37,7 +38,7 @@ func TestLink(t *testing.T) { ) // Link l2 as a requirement of l1 - err := LinkRequirement(ctx, l1, l2.ID) + err := LinkRequirement(ctx, l1.ID, l2.ID) fatalErr(t, err) (*GraphTester)(t).WaitForState(ctx, l1_listener, LinkSignalType, "linked_as_req", time.Millisecond*10, "No linked_as_req") @@ -50,11 +51,40 @@ func TestLink(t *testing.T) { (*GraphTester)(t).WaitForStatus(ctx, l2_listener, "TEST", time.Millisecond*10, "No TEST on l2") } +func TestLink10K(t *testing.T) { + ctx := lockableTestContext(t, []string{"test"}) + + NewLockable := func()(*Node, *ListenerExt) { + listener := NewListenerExt(100000) + l := NewNode(ctx, RandID(), TestLockableType, nil, + listener, + NewACLExt(lock_policy, link_policy), + NewLockableExt(), + ) + return l, listener + } + l0, l0_listener := NewLockable() + lockables := make([]*Node, 10000) + for i, _ := range(lockables) { + lockables[i], _ = NewLockable() + LinkRequirement(ctx, l0.ID, lockables[i].ID) + } + + ctx.Log.Logf("test", "CREATED_10K %d") + + + for i, _ := range(lockables) { + (*GraphTester)(t).WaitForState(ctx, l0_listener, LinkSignalType, "linked_as_req", time.Millisecond*1000, fmt.Sprintf("No linked_as_req for %d", i)) + } + + ctx.Log.Logf("test", "LINKED_10K: %d") +} + func TestLock(t *testing.T) { - ctx := lockableTestContext(t, []string{}) + ctx := lockableTestContext(t, []string{"lockable"}) NewLockable := func()(*Node, *ListenerExt) { - listener := NewListenerExt(10) + listener := NewListenerExt(100) l := NewNode(ctx, RandID(), TestLockableType, nil, listener, NewACLExt(lock_policy, link_policy), @@ -72,33 +102,33 @@ func TestLock(t *testing.T) { var err error - err = LinkRequirement(ctx, l1, l2.ID) + err = LinkRequirement(ctx, l1.ID, l2.ID) fatalErr(t, err) - err = LinkRequirement(ctx, l1, l3.ID) + err = LinkRequirement(ctx, l1.ID, l3.ID) fatalErr(t, err) - err = LinkRequirement(ctx, l1, l4.ID) + err = LinkRequirement(ctx, l1.ID, l4.ID) fatalErr(t, err) - err = LinkRequirement(ctx, l1, l5.ID) + err = LinkRequirement(ctx, l1.ID, l5.ID) fatalErr(t, err) - err = LinkRequirement(ctx, l0, l2.ID) + err = LinkRequirement(ctx, l0.ID, l2.ID) fatalErr(t, err) - err = LinkRequirement(ctx, l0, l3.ID) + err = LinkRequirement(ctx, l0.ID, l3.ID) fatalErr(t, err) - err = LinkRequirement(ctx, l0, l4.ID) + err = LinkRequirement(ctx, l0.ID, l4.ID) fatalErr(t, err) - err = LinkRequirement(ctx, l0, l5.ID) + err = LinkRequirement(ctx, l0.ID, l5.ID) fatalErr(t, err) - (*GraphTester)(t).WaitForState(ctx, l1_listener, LinkSignalType, "linked_as_req", time.Millisecond*10, "No linked_as_req") - (*GraphTester)(t).WaitForState(ctx, l1_listener, LinkSignalType, "linked_as_req", time.Millisecond*10, "No linked_as_req") - (*GraphTester)(t).WaitForState(ctx, l1_listener, LinkSignalType, "linked_as_req", time.Millisecond*10, "No linked_as_req") - (*GraphTester)(t).WaitForState(ctx, l1_listener, LinkSignalType, "linked_as_req", time.Millisecond*10, "No linked_as_req") + (*GraphTester)(t).WaitForState(ctx, l1_listener, LinkSignalType, "linked_as_req", time.Millisecond*100, "No linked_as_req") + (*GraphTester)(t).WaitForState(ctx, l1_listener, LinkSignalType, "linked_as_req", time.Millisecond*100, "No linked_as_req") + (*GraphTester)(t).WaitForState(ctx, l1_listener, LinkSignalType, "linked_as_req", time.Millisecond*100, "No linked_as_req") + (*GraphTester)(t).WaitForState(ctx, l1_listener, LinkSignalType, "linked_as_req", time.Millisecond*100, "No linked_as_req") - (*GraphTester)(t).WaitForState(ctx, l0_listener, LinkSignalType, "linked_as_req", time.Millisecond*10, "No linked_as_req") - (*GraphTester)(t).WaitForState(ctx, l0_listener, LinkSignalType, "linked_as_req", time.Millisecond*10, "No linked_as_req") - (*GraphTester)(t).WaitForState(ctx, l0_listener, LinkSignalType, "linked_as_req", time.Millisecond*10, "No linked_as_req") - (*GraphTester)(t).WaitForState(ctx, l0_listener, LinkSignalType, "linked_as_req", time.Millisecond*10, "No linked_as_req") + (*GraphTester)(t).WaitForState(ctx, l0_listener, LinkSignalType, "linked_as_req", time.Millisecond*100, "No linked_as_req") + (*GraphTester)(t).WaitForState(ctx, l0_listener, LinkSignalType, "linked_as_req", time.Millisecond*100, "No linked_as_req") + (*GraphTester)(t).WaitForState(ctx, l0_listener, LinkSignalType, "linked_as_req", time.Millisecond*100, "No linked_as_req") + (*GraphTester)(t).WaitForState(ctx, l0_listener, LinkSignalType, "linked_as_req", time.Millisecond*100, "No linked_as_req") err = LockLockable(ctx, l1) fatalErr(t, err) diff --git a/node.go b/node.go index cdc9128..17360cb 100644 --- a/node.go +++ b/node.go @@ -289,7 +289,7 @@ func (node *Node) Serialize() ([]byte, error) { // Create a new node in memory and start it's event loop func NewNode(ctx *Context, id NodeID, node_type NodeType, queued_signals []QueuedSignal, extensions ...Extension) *Node { - _, exists := ctx.Nodes[id] + _, exists := ctx.Node(id) if exists == true { panic("Attempted to create an existing node") } @@ -330,7 +330,7 @@ func NewNode(ctx *Context, id NodeID, node_type NodeType, queued_signals []Queue SignalQueue: queued_signals, NextSignal: next_signal, } - ctx.Nodes[id] = node + ctx.AddNode(id, node) err := WriteNode(ctx, node) if err != nil { panic(err) @@ -544,7 +544,7 @@ func LoadNode(ctx * Context, id NodeID) (*Node, error) { SignalQueue: node_db.QueuedSignals, NextSignal: next_signal, } - ctx.Nodes[id] = node + ctx.AddNode(id, node) found_extensions := []ExtType{} // Parse each of the extensions from the db diff --git a/signal.go b/signal.go index 971d1e0..3353651 100644 --- a/signal.go +++ b/signal.go @@ -11,6 +11,7 @@ const ( LockSignalType = SignalType("LOCK") ReadSignalType = SignalType("READ") ReadResultSignalType = SignalType("READ_RESULT") + LinkStartSignalType = SignalType("LINK_START") ) type SignalDirection int @@ -135,6 +136,18 @@ func NewLinkSignal(state string) StateSignal { } } +type LinkStartSignal struct { + IDSignal + LinkType string `json:"link_type"` +} + +func NewLinkStartSignal(link_type string, target NodeID) LinkStartSignal { + return LinkStartSignal{ + IDSignal: NewIDSignal(LinkStartSignalType, Direct, target), + LinkType: link_type, + } +} + func NewLockSignal(state string) StateSignal { return StateSignal{ BaseSignal: NewDirectSignal(LockSignalType), @@ -169,3 +182,4 @@ func NewReadResultSignal(exts map[ExtType]map[string]interface{}) ReadResultSign Extensions: exts, } } +