Added message buffer size to node header and NewNode

gql_cataclysm
noah metz 2023-07-28 13:45:14 -06:00
parent 5678c79798
commit b92cebbe74
6 changed files with 29 additions and 21 deletions

@ -76,7 +76,7 @@ type Context struct {
// Map between database extension hashes and the registered info // Map between database extension hashes and the registered info
Extensions map[uint64]ExtensionInfo Extensions map[uint64]ExtensionInfo
// Map between database type hashes and the registered info // Map between database type hashes and the registered info
Types map[uint64]NodeInfo Types map[uint64]*NodeInfo
// Routing map to all the nodes local to this context // Routing map to all the nodes local to this context
NodesLock sync.RWMutex NodesLock sync.RWMutex
Nodes map[NodeID]*Node Nodes map[NodeID]*Node
@ -105,7 +105,7 @@ func (ctx *Context) RegisterNodeType(node_type NodeType, extensions []ExtType) e
ext_found[extension] = true ext_found[extension] = true
} }
ctx.Types[type_hash] = NodeInfo{ ctx.Types[type_hash] = &NodeInfo{
Type: node_type, Type: node_type,
Extensions: extensions, Extensions: extensions,
} }
@ -192,7 +192,7 @@ func NewContext(db * badger.DB, log Logger) (*Context, error) {
DB: db, DB: db,
Log: log, Log: log,
Extensions: map[uint64]ExtensionInfo{}, Extensions: map[uint64]ExtensionInfo{},
Types: map[uint64]NodeInfo{}, Types: map[uint64]*NodeInfo{},
Nodes: map[NodeID]*Node{}, Nodes: map[NodeID]*Node{},
} }

@ -15,7 +15,7 @@ func TestGQLDB(t * testing.T) {
TestUserNodeType := NodeType("TEST_USER") TestUserNodeType := NodeType("TEST_USER")
err := ctx.RegisterNodeType(TestUserNodeType, []ExtType{}) err := ctx.RegisterNodeType(TestUserNodeType, []ExtType{})
fatalErr(t, err) fatalErr(t, err)
u1 := NewNode(ctx, RandID(), TestUserNodeType, nil) u1 := NewNode(ctx, RandID(), TestUserNodeType, 10, nil)
ctx.Log.Logf("test", "U1_ID: %s", u1.ID) ctx.Log.Logf("test", "U1_ID: %s", u1.ID)
@ -24,7 +24,7 @@ func TestGQLDB(t * testing.T) {
gql_ext := NewGQLExt(":0", ecdh.P256(), key, nil, nil) gql_ext := NewGQLExt(":0", ecdh.P256(), key, nil, nil)
listener_ext := NewListenerExt(10) listener_ext := NewListenerExt(10)
gql := NewNode(ctx, RandID(), GQLNodeType, nil, gql := NewNode(ctx, RandID(), GQLNodeType, 10, nil,
gql_ext, gql_ext,
listener_ext, listener_ext,
NewACLExt(), NewACLExt(),

@ -115,6 +115,7 @@ func NewSimpleListener(ctx *Context, buffer int) (*Node, *ListenerExt) {
listener := NewNode(ctx, listener := NewNode(ctx,
RandID(), RandID(),
SimpleListenerNodeType, SimpleListenerNodeType,
10,
nil, nil,
listener_extension, listener_extension,
NewACLExt(policy), NewACLExt(policy),

@ -25,13 +25,13 @@ func TestLink(t *testing.T) {
ctx := lockableTestContext(t, []string{}) ctx := lockableTestContext(t, []string{})
l1_listener := NewListenerExt(10) l1_listener := NewListenerExt(10)
l1 := NewNode(ctx, RandID(), TestLockableType, nil, l1 := NewNode(ctx, RandID(), TestLockableType, 10, nil,
l1_listener, l1_listener,
NewACLExt(link_policy), NewACLExt(link_policy),
NewLockableExt(), NewLockableExt(),
) )
l2_listener := NewListenerExt(10) l2_listener := NewListenerExt(10)
l2 := NewNode(ctx, RandID(), TestLockableType, nil, l2 := NewNode(ctx, RandID(), TestLockableType, 10, nil,
l2_listener, l2_listener,
NewACLExt(link_policy), NewACLExt(link_policy),
NewLockableExt(), NewLockableExt(),
@ -55,7 +55,7 @@ func TestLink10K(t *testing.T) {
ctx := lockableTestContext(t, []string{"test"}) ctx := lockableTestContext(t, []string{"test"})
NewLockable := func()(*Node) { NewLockable := func()(*Node) {
l := NewNode(ctx, RandID(), TestLockableType, nil, l := NewNode(ctx, RandID(), TestLockableType, 10, nil,
NewACLExt(lock_policy, link_policy), NewACLExt(lock_policy, link_policy),
NewLockableExt(), NewLockableExt(),
) )
@ -64,7 +64,7 @@ func TestLink10K(t *testing.T) {
NewListener := func()(*Node, *ListenerExt) { NewListener := func()(*Node, *ListenerExt) {
listener := NewListenerExt(100000) listener := NewListenerExt(100000)
l := NewNode(ctx, RandID(), TestLockableType, nil, l := NewNode(ctx, RandID(), TestLockableType, 256, nil,
listener, listener,
NewACLExt(lock_policy, link_policy), NewACLExt(lock_policy, link_policy),
NewLockableExt(), NewLockableExt(),
@ -94,7 +94,7 @@ func TestLock(t *testing.T) {
NewLockable := func()(*Node, *ListenerExt) { NewLockable := func()(*Node, *ListenerExt) {
listener := NewListenerExt(100) listener := NewListenerExt(100)
l := NewNode(ctx, RandID(), TestLockableType, nil, l := NewNode(ctx, RandID(), TestLockableType, 10, nil,
listener, listener,
NewACLExt(lock_policy, link_policy), NewACLExt(lock_policy, link_policy),
NewLockableExt(), NewLockableExt(),

@ -13,12 +13,10 @@ import (
) )
const ( const (
// Size of node message channels
NODE_MSG_CHAN_DEFAULT = 1024
// Magic first four bytes of serialized DB content, stored big endian // Magic first four bytes of serialized DB content, stored big endian
NODE_DB_MAGIC = 0x2491df14 NODE_DB_MAGIC = 0x2491df14
// Total length of the node database header, has magic to verify and type_hash to map to load function // Total length of the node database header, has magic to verify and type_hash to map to load function
NODE_DB_HEADER_LEN = 20 NODE_DB_HEADER_LEN = 24
EXTENSION_DB_HEADER_LEN = 16 EXTENSION_DB_HEADER_LEN = 16
) )
@ -103,6 +101,8 @@ type Node struct {
// Channel for this node to receive messages from the Context // Channel for this node to receive messages from the Context
MsgChan chan Msg MsgChan chan Msg
// Size of MsgChan
BufferSize uint32
// Channel for this node to process delayed signals // Channel for this node to process delayed signals
TimeoutChan <-chan time.Time TimeoutChan <-chan time.Time
@ -272,6 +272,7 @@ func (node *Node) Serialize() ([]byte, error) {
Header: NodeDBHeader{ Header: NodeDBHeader{
Magic: NODE_DB_MAGIC, Magic: NODE_DB_MAGIC,
TypeHash: Hash(node.Type), TypeHash: Hash(node.Type),
BufferSize: node.BufferSize,
NumExtensions: uint32(len(extensions)), NumExtensions: uint32(len(extensions)),
NumQueuedSignals: uint32(len(node.SignalQueue)), NumQueuedSignals: uint32(len(node.SignalQueue)),
}, },
@ -299,7 +300,7 @@ func (node *Node) Serialize() ([]byte, error) {
} }
// Create a new node in memory and start it's event loop // Create a new node in memory and start it's event loop
func NewNode(ctx *Context, id NodeID, node_type NodeType, queued_signals []QueuedSignal, extensions ...Extension) *Node { func NewNode(ctx *Context, id NodeID, node_type NodeType, buffer_size uint32, queued_signals []QueuedSignal, extensions ...Extension) *Node {
_, exists := ctx.Node(id) _, exists := ctx.Node(id)
if exists == true { if exists == true {
panic("Attempted to create an existing node") panic("Attempted to create an existing node")
@ -336,7 +337,8 @@ func NewNode(ctx *Context, id NodeID, node_type NodeType, queued_signals []Queue
ID: id, ID: id,
Type: node_type, Type: node_type,
Extensions: ext_map, Extensions: ext_map,
MsgChan: make(chan Msg, NODE_MSG_CHAN_DEFAULT), MsgChan: make(chan Msg, buffer_size),
BufferSize: buffer_size,
TimeoutChan: timeout_chan, TimeoutChan: timeout_chan,
SignalQueue: queued_signals, SignalQueue: queued_signals,
NextSignal: next_signal, NextSignal: next_signal,
@ -381,6 +383,7 @@ type NodeDBHeader struct {
Magic uint32 Magic uint32
NumExtensions uint32 NumExtensions uint32
NumQueuedSignals uint32 NumQueuedSignals uint32
BufferSize uint32
TypeHash uint64 TypeHash uint64
} }
@ -399,7 +402,8 @@ func NewNodeDB(data []byte) (NodeDB, error) {
magic := binary.BigEndian.Uint32(data[0:4]) magic := binary.BigEndian.Uint32(data[0:4])
num_extensions := binary.BigEndian.Uint32(data[4:8]) num_extensions := binary.BigEndian.Uint32(data[4:8])
num_queued_signals := binary.BigEndian.Uint32(data[8:12]) num_queued_signals := binary.BigEndian.Uint32(data[8:12])
node_type_hash := binary.BigEndian.Uint64(data[12:20]) buffer_size := binary.BigEndian.Uint32(data[12:16])
node_type_hash := binary.BigEndian.Uint64(data[16:24])
ptr += NODE_DB_HEADER_LEN ptr += NODE_DB_HEADER_LEN
@ -438,6 +442,7 @@ func NewNodeDB(data []byte) (NodeDB, error) {
Header: NodeDBHeader{ Header: NodeDBHeader{
Magic: magic, Magic: magic,
TypeHash: node_type_hash, TypeHash: node_type_hash,
BufferSize: buffer_size,
NumExtensions: num_extensions, NumExtensions: num_extensions,
NumQueuedSignals: num_queued_signals, NumQueuedSignals: num_queued_signals,
}, },
@ -455,7 +460,8 @@ func (header NodeDBHeader) Serialize() []byte {
binary.BigEndian.PutUint32(ret[0:4], header.Magic) binary.BigEndian.PutUint32(ret[0:4], header.Magic)
binary.BigEndian.PutUint32(ret[4:8], header.NumExtensions) binary.BigEndian.PutUint32(ret[4:8], header.NumExtensions)
binary.BigEndian.PutUint32(ret[8:12], header.NumQueuedSignals) binary.BigEndian.PutUint32(ret[8:12], header.NumQueuedSignals)
binary.BigEndian.PutUint64(ret[12:20], header.TypeHash) binary.BigEndian.PutUint32(ret[12:16], header.BufferSize)
binary.BigEndian.PutUint64(ret[16:24], header.TypeHash)
return ret return ret
} }
@ -545,7 +551,8 @@ func LoadNode(ctx * Context, id NodeID) (*Node, error) {
ID: id, ID: id,
Type: node_type.Type, Type: node_type.Type,
Extensions: map[ExtType]Extension{}, Extensions: map[ExtType]Extension{},
MsgChan: make(chan Msg, NODE_MSG_CHAN_DEFAULT), MsgChan: make(chan Msg, node_db.Header.BufferSize),
BufferSize: node_db.Header.BufferSize,
TimeoutChan: timeout_chan, TimeoutChan: timeout_chan,
SignalQueue: node_db.QueuedSignals, SignalQueue: node_db.QueuedSignals,
NextSignal: next_signal, NextSignal: next_signal,

@ -11,7 +11,7 @@ func TestNodeDB(t *testing.T) {
err := ctx.RegisterNodeType(node_type, []ExtType{GroupExtType}) err := ctx.RegisterNodeType(node_type, []ExtType{GroupExtType})
fatalErr(t, err) fatalErr(t, err)
node := NewNode(ctx, RandID(), node_type, nil, NewGroupExt(nil)) node := NewNode(ctx, RandID(), node_type, 10, nil, NewGroupExt(nil))
ctx.Nodes = NodeMap{} ctx.Nodes = NodeMap{}
_, err = ctx.GetNode(node.ID) _, err = ctx.GetNode(node.ID)
@ -34,12 +34,12 @@ func TestNodeRead(t *testing.T) {
n1_id: Actions{MakeAction(ReadResultSignalType, "+")}, n1_id: Actions{MakeAction(ReadResultSignalType, "+")},
}) })
n2_listener := NewListenerExt(10) n2_listener := NewListenerExt(10)
n2 := NewNode(ctx, n2_id, node_type, nil, NewACLExt(n2_policy), NewGroupExt(nil), n2_listener) n2 := NewNode(ctx, n2_id, node_type, 10, nil, NewACLExt(n2_policy), NewGroupExt(nil), n2_listener)
n1_policy := NewPerNodePolicy(map[NodeID]Actions{ n1_policy := NewPerNodePolicy(map[NodeID]Actions{
n2_id: Actions{MakeAction(ReadSignalType, "+")}, n2_id: Actions{MakeAction(ReadSignalType, "+")},
}) })
n1 := NewNode(ctx, n1_id, node_type, nil, NewACLExt(n1_policy), NewGroupExt(nil)) n1 := NewNode(ctx, n1_id, node_type, 10, nil, NewACLExt(n1_policy), NewGroupExt(nil))
ctx.Send(n2.ID, n1.ID, NewReadSignal(map[ExtType][]string{ ctx.Send(n2.ID, n1.ID, NewReadSignal(map[ExtType][]string{
GroupExtType: []string{"members"}, GroupExtType: []string{"members"},