Changed context to dynamically load nodes as they're signalled

gql_cataclysm
noah metz 2023-07-27 16:48:39 -06:00
parent 7ebb519cd0
commit 78c29d2f74
5 changed files with 71 additions and 30 deletions

@ -3,6 +3,7 @@ package graphvent
import ( import (
badger "github.com/dgraph-io/badger/v3" badger "github.com/dgraph-io/badger/v3"
"fmt" "fmt"
"errors"
"runtime" "runtime"
) )
@ -86,12 +87,24 @@ func (ctx *Context) RegisterExtension(ext_type ExtType, load_fn ExtensionLoadFun
return nil return nil
} }
// Route a Signal to dest. Currently only local context routing is supported var NodeNotFoundError = errors.New("Node not found in DB")
func (ctx *Context) Send(source NodeID, dest NodeID, signal Signal) error {
target, exists := ctx.Nodes[dest] func (ctx *Context) GetNode(id NodeID) (*Node, error) {
target, exists := ctx.Nodes[id]
if exists == false { if exists == false {
return fmt.Errorf("%s does not exist, cannot signal it", dest) var err error
target, err = LoadNode(ctx, id)
if err != nil {
return nil, err
} }
}
return target, nil
}
// Route a Signal to dest. Currently only local context routing is supported
func (ctx *Context) Send(source NodeID, dest NodeID, signal Signal) error {
target, err := ctx.GetNode(dest)
if err == nil {
select { select {
case target.MsgChan <- Msg{source, signal}: case target.MsgChan <- Msg{source, signal}:
default: default:
@ -101,6 +114,11 @@ func (ctx *Context) Send(source NodeID, dest NodeID, signal Signal) error {
return fmt.Errorf("SIGNAL_OVERFLOW: %s - %s", dest, stack_str) return fmt.Errorf("SIGNAL_OVERFLOW: %s - %s", dest, stack_str)
} }
return nil return nil
} else if errors.Is(err, NodeNotFoundError) {
// TODO: Handle finding nodes in other contexts
return err
}
return err
} }
// Create a new Context with the base library content added // Create a new Context with the base library content added

@ -34,7 +34,7 @@ func TestGQLDB(t * testing.T) {
err = ctx.Send(gql.ID, gql.ID, StopSignal) err = ctx.Send(gql.ID, gql.ID, StopSignal)
fatalErr(t, err) fatalErr(t, err)
(*GraphTester)(t).WaitForStatus(ctx, listener_ext.Chan, "stopped", 100*time.Millisecond, "Didn't receive stopped on listener") (*GraphTester)(t).WaitForStatus(ctx, listener_ext, "stopped", 100*time.Millisecond, "Didn't receive stopped on listener")
ser1, err := gql.Serialize() ser1, err := gql.Serialize()
ser2, err := u1.Serialize() ser2, err := u1.Serialize()
@ -49,7 +49,7 @@ func TestGQLDB(t * testing.T) {
fatalErr(t, err) fatalErr(t, err)
err = ctx.Send(gql_loaded.ID, gql_loaded.ID, StopSignal) err = ctx.Send(gql_loaded.ID, gql_loaded.ID, StopSignal)
fatalErr(t, err) fatalErr(t, err)
(*GraphTester)(t).WaitForStatus(ctx, listener_ext.Chan, "stopped", 100*time.Millisecond, "Didn't receive stopped on update_channel_2") (*GraphTester)(t).WaitForStatus(ctx, listener_ext, "stopped", 100*time.Millisecond, "Didn't receive stopped on update_channel_2")
} }

@ -13,11 +13,11 @@ import (
type GraphTester testing.T type GraphTester testing.T
const listner_timeout = 50 * time.Millisecond const listner_timeout = 50 * time.Millisecond
func (t * GraphTester) WaitForStatus(ctx * Context, listener chan Signal, status string, timeout time.Duration, str string) Signal { func (t * GraphTester) WaitForStatus(ctx * Context, listener *ListenerExt, status string, timeout time.Duration, str string) Signal {
timeout_channel := time.After(timeout) timeout_channel := time.After(timeout)
for true { for true {
select { select {
case signal := <- listener: case signal := <- listener.Chan:
if signal == nil { if signal == nil {
ctx.Log.Logf("test", "SIGNAL_CHANNEL_CLOSED: %s", listener) ctx.Log.Logf("test", "SIGNAL_CHANNEL_CLOSED: %s", listener)
t.Fatal(str) t.Fatal(str)
@ -25,6 +25,8 @@ func (t * GraphTester) WaitForStatus(ctx * Context, listener chan Signal, status
if signal.Type() == "status" { if signal.Type() == "status" {
sig, ok := signal.(StatusSignal) sig, ok := signal.(StatusSignal)
if ok == true { if ok == true {
ctx.Log.Logf("test", "Status received: %s", sig.Status) ctx.Log.Logf("test", "Status received: %s", sig.Status)
if sig.Status == status { if sig.Status == status {
return signal return signal
@ -42,10 +44,10 @@ func (t * GraphTester) WaitForStatus(ctx * Context, listener chan Signal, status
return nil return nil
} }
func (t * GraphTester) CheckForNone(listener chan Signal, str string) { func (t * GraphTester) CheckForNone(listener *ListenerExt, str string) {
timeout := time.After(listner_timeout) timeout := time.After(listner_timeout)
select { select {
case sig := <- listener: case sig := <- listener.Chan:
pprof.Lookup("goroutine").WriteTo(os.Stdout, 1) pprof.Lookup("goroutine").WriteTo(os.Stdout, 1)
t.Fatal(fmt.Sprintf("%s : %+v", str, sig)) t.Fatal(fmt.Sprintf("%s : %+v", str, sig))
case <-timeout: case <-timeout:

@ -4,10 +4,35 @@ import (
"testing" "testing"
) )
func TestLockableLink(t *testing.T) { const TestLockableType = NodeType("TEST_LOCKABLE")
func lockableTestContext(t *testing.T) *Context {
ctx := logTestContext(t, []string{"lockable", "signal"}) ctx := logTestContext(t, []string{"lockable", "signal"})
LockableType := NodeType("TEST_LOCKABLE")
err := ctx.RegisterNodeType(LockableType, []ExtType{LockableExtType}) err := ctx.RegisterNodeType(TestLockableType, []ExtType{ACLExtType, LockableExtType, ListenerExtType})
fatalErr(t, err) fatalErr(t, err)
return ctx
}
var link_policy = NewAllNodesPolicy([]string{"link", "status"})
func Test(t *testing.T) {
ctx := lockableTestContext(t)
l1_listener := NewListenerExt(10)
l1 := NewNode(ctx, RandID(), TestLockableType, nil,
l1_listener,
NewACLExt(&link_policy),
NewLockableExt(nil, nil, nil, nil),
)
l2_listener := NewListenerExt(10)
l2 := NewNode(ctx, RandID(), TestLockableType, nil,
l2_listener,
NewACLExt(&link_policy),
NewLockableExt(nil, nil, nil, nil),
)
ctx.Send(l1.ID, l2.ID, NewLinkSignal("start", l1.ID))
} }

@ -2,6 +2,7 @@ package graphvent
import ( import (
"time" "time"
"errors"
"reflect" "reflect"
"github.com/google/uuid" "github.com/google/uuid"
badger "github.com/dgraph-io/badger/v3" badger "github.com/dgraph-io/badger/v3"
@ -458,14 +459,7 @@ func WriteNode(ctx *Context, node *Node) error {
} }
func LoadNode(ctx * Context, id NodeID) (*Node, error) { func LoadNode(ctx * Context, id NodeID) (*Node, error) {
ctx.Log.Logf("db", "LOOKING_FOR_NODE: %s", id)
node, exists := ctx.Nodes[id]
if exists == true {
ctx.Log.Logf("db", "NODE_ALREADY_LOADED: %s", id)
return node,nil
}
ctx.Log.Logf("db", "LOADING_NODE: %s", id) ctx.Log.Logf("db", "LOADING_NODE: %s", id)
var bytes []byte var bytes []byte
err := ctx.DB.View(func(txn *badger.Txn) error { err := ctx.DB.View(func(txn *badger.Txn) error {
item, err := txn.Get(id.Serialize()) item, err := txn.Get(id.Serialize())
@ -478,7 +472,9 @@ func LoadNode(ctx * Context, id NodeID) (*Node, error) {
return nil return nil
}) })
}) })
if err != nil { if errors.Is(err, badger.ErrKeyNotFound) {
return nil, NodeNotFoundError
}else if err != nil {
return nil, err return nil, err
} }
@ -494,7 +490,7 @@ func LoadNode(ctx * Context, id NodeID) (*Node, error) {
} }
next_signal, timeout_chan := SoonestSignal(node_db.QueuedSignals) next_signal, timeout_chan := SoonestSignal(node_db.QueuedSignals)
node = &Node{ node := &Node{
ID: id, ID: id,
Type: node_type.Type, Type: node_type.Type,
Extensions: map[ExtType]Extension{}, Extensions: map[ExtType]Extension{},