diff --git a/context.go b/context.go index 3dad2c4..9b3eeee 100644 --- a/context.go +++ b/context.go @@ -76,7 +76,7 @@ type Context struct { // Map between database extension hashes and the registered info Extensions map[uint64]ExtensionInfo // Map between database type hashes and the registered info - Types map[uint64]NodeInfo + Types map[uint64]*NodeInfo // Routing map to all the nodes local to this context NodesLock sync.RWMutex Nodes map[NodeID]*Node @@ -105,7 +105,7 @@ func (ctx *Context) RegisterNodeType(node_type NodeType, extensions []ExtType) e ext_found[extension] = true } - ctx.Types[type_hash] = NodeInfo{ + ctx.Types[type_hash] = &NodeInfo{ Type: node_type, Extensions: extensions, } @@ -192,7 +192,7 @@ func NewContext(db * badger.DB, log Logger) (*Context, error) { DB: db, Log: log, Extensions: map[uint64]ExtensionInfo{}, - Types: map[uint64]NodeInfo{}, + Types: map[uint64]*NodeInfo{}, Nodes: map[NodeID]*Node{}, } diff --git a/gql_test.go b/gql_test.go index bb19016..6eec99e 100644 --- a/gql_test.go +++ b/gql_test.go @@ -15,7 +15,7 @@ func TestGQLDB(t * testing.T) { TestUserNodeType := NodeType("TEST_USER") err := ctx.RegisterNodeType(TestUserNodeType, []ExtType{}) fatalErr(t, err) - u1 := NewNode(ctx, RandID(), TestUserNodeType, nil) + u1 := NewNode(ctx, RandID(), TestUserNodeType, 10, nil) ctx.Log.Logf("test", "U1_ID: %s", u1.ID) @@ -24,7 +24,7 @@ func TestGQLDB(t * testing.T) { gql_ext := NewGQLExt(":0", ecdh.P256(), key, nil, nil) listener_ext := NewListenerExt(10) - gql := NewNode(ctx, RandID(), GQLNodeType, nil, + gql := NewNode(ctx, RandID(), GQLNodeType, 10, nil, gql_ext, listener_ext, NewACLExt(), diff --git a/graph_test.go b/graph_test.go index 6e58216..29d5d35 100644 --- a/graph_test.go +++ b/graph_test.go @@ -115,6 +115,7 @@ func NewSimpleListener(ctx *Context, buffer int) (*Node, *ListenerExt) { listener := NewNode(ctx, RandID(), SimpleListenerNodeType, + 10, nil, listener_extension, NewACLExt(policy), diff --git a/lockable_test.go b/lockable_test.go index 787448f..b683409 100644 --- a/lockable_test.go +++ b/lockable_test.go @@ -25,13 +25,13 @@ func TestLink(t *testing.T) { ctx := lockableTestContext(t, []string{}) l1_listener := NewListenerExt(10) - l1 := NewNode(ctx, RandID(), TestLockableType, nil, + l1 := NewNode(ctx, RandID(), TestLockableType, 10, nil, l1_listener, NewACLExt(link_policy), NewLockableExt(), ) l2_listener := NewListenerExt(10) - l2 := NewNode(ctx, RandID(), TestLockableType, nil, + l2 := NewNode(ctx, RandID(), TestLockableType, 10, nil, l2_listener, NewACLExt(link_policy), NewLockableExt(), @@ -55,7 +55,7 @@ func TestLink10K(t *testing.T) { ctx := lockableTestContext(t, []string{"test"}) NewLockable := func()(*Node) { - l := NewNode(ctx, RandID(), TestLockableType, nil, + l := NewNode(ctx, RandID(), TestLockableType, 10, nil, NewACLExt(lock_policy, link_policy), NewLockableExt(), ) @@ -64,7 +64,7 @@ func TestLink10K(t *testing.T) { NewListener := func()(*Node, *ListenerExt) { listener := NewListenerExt(100000) - l := NewNode(ctx, RandID(), TestLockableType, nil, + l := NewNode(ctx, RandID(), TestLockableType, 256, nil, listener, NewACLExt(lock_policy, link_policy), NewLockableExt(), @@ -94,7 +94,7 @@ func TestLock(t *testing.T) { NewLockable := func()(*Node, *ListenerExt) { listener := NewListenerExt(100) - l := NewNode(ctx, RandID(), TestLockableType, nil, + l := NewNode(ctx, RandID(), TestLockableType, 10, nil, listener, NewACLExt(lock_policy, link_policy), NewLockableExt(), diff --git a/node.go b/node.go index 5c2ea3c..319b255 100644 --- a/node.go +++ b/node.go @@ -13,12 +13,10 @@ import ( ) const ( - // Size of node message channels - NODE_MSG_CHAN_DEFAULT = 1024 // Magic first four bytes of serialized DB content, stored big endian NODE_DB_MAGIC = 0x2491df14 // Total length of the node database header, has magic to verify and type_hash to map to load function - NODE_DB_HEADER_LEN = 20 + NODE_DB_HEADER_LEN = 24 EXTENSION_DB_HEADER_LEN = 16 ) @@ -103,6 +101,8 @@ type Node struct { // Channel for this node to receive messages from the Context MsgChan chan Msg + // Size of MsgChan + BufferSize uint32 // Channel for this node to process delayed signals TimeoutChan <-chan time.Time @@ -272,6 +272,7 @@ func (node *Node) Serialize() ([]byte, error) { Header: NodeDBHeader{ Magic: NODE_DB_MAGIC, TypeHash: Hash(node.Type), + BufferSize: node.BufferSize, NumExtensions: uint32(len(extensions)), NumQueuedSignals: uint32(len(node.SignalQueue)), }, @@ -299,7 +300,7 @@ func (node *Node) Serialize() ([]byte, error) { } // Create a new node in memory and start it's event loop -func NewNode(ctx *Context, id NodeID, node_type NodeType, queued_signals []QueuedSignal, extensions ...Extension) *Node { +func NewNode(ctx *Context, id NodeID, node_type NodeType, buffer_size uint32, queued_signals []QueuedSignal, extensions ...Extension) *Node { _, exists := ctx.Node(id) if exists == true { panic("Attempted to create an existing node") @@ -336,7 +337,8 @@ func NewNode(ctx *Context, id NodeID, node_type NodeType, queued_signals []Queue ID: id, Type: node_type, Extensions: ext_map, - MsgChan: make(chan Msg, NODE_MSG_CHAN_DEFAULT), + MsgChan: make(chan Msg, buffer_size), + BufferSize: buffer_size, TimeoutChan: timeout_chan, SignalQueue: queued_signals, NextSignal: next_signal, @@ -381,6 +383,7 @@ type NodeDBHeader struct { Magic uint32 NumExtensions uint32 NumQueuedSignals uint32 + BufferSize uint32 TypeHash uint64 } @@ -399,7 +402,8 @@ func NewNodeDB(data []byte) (NodeDB, error) { magic := binary.BigEndian.Uint32(data[0:4]) num_extensions := binary.BigEndian.Uint32(data[4:8]) num_queued_signals := binary.BigEndian.Uint32(data[8:12]) - node_type_hash := binary.BigEndian.Uint64(data[12:20]) + buffer_size := binary.BigEndian.Uint32(data[12:16]) + node_type_hash := binary.BigEndian.Uint64(data[16:24]) ptr += NODE_DB_HEADER_LEN @@ -438,6 +442,7 @@ func NewNodeDB(data []byte) (NodeDB, error) { Header: NodeDBHeader{ Magic: magic, TypeHash: node_type_hash, + BufferSize: buffer_size, NumExtensions: num_extensions, NumQueuedSignals: num_queued_signals, }, @@ -455,7 +460,8 @@ func (header NodeDBHeader) Serialize() []byte { binary.BigEndian.PutUint32(ret[0:4], header.Magic) binary.BigEndian.PutUint32(ret[4:8], header.NumExtensions) binary.BigEndian.PutUint32(ret[8:12], header.NumQueuedSignals) - binary.BigEndian.PutUint64(ret[12:20], header.TypeHash) + binary.BigEndian.PutUint32(ret[12:16], header.BufferSize) + binary.BigEndian.PutUint64(ret[16:24], header.TypeHash) return ret } @@ -545,7 +551,8 @@ func LoadNode(ctx * Context, id NodeID) (*Node, error) { ID: id, Type: node_type.Type, Extensions: map[ExtType]Extension{}, - MsgChan: make(chan Msg, NODE_MSG_CHAN_DEFAULT), + MsgChan: make(chan Msg, node_db.Header.BufferSize), + BufferSize: node_db.Header.BufferSize, TimeoutChan: timeout_chan, SignalQueue: node_db.QueuedSignals, NextSignal: next_signal, diff --git a/node_test.go b/node_test.go index f776fa2..3a5019f 100644 --- a/node_test.go +++ b/node_test.go @@ -11,7 +11,7 @@ func TestNodeDB(t *testing.T) { err := ctx.RegisterNodeType(node_type, []ExtType{GroupExtType}) fatalErr(t, err) - node := NewNode(ctx, RandID(), node_type, nil, NewGroupExt(nil)) + node := NewNode(ctx, RandID(), node_type, 10, nil, NewGroupExt(nil)) ctx.Nodes = NodeMap{} _, err = ctx.GetNode(node.ID) @@ -34,12 +34,12 @@ func TestNodeRead(t *testing.T) { n1_id: Actions{MakeAction(ReadResultSignalType, "+")}, }) n2_listener := NewListenerExt(10) - n2 := NewNode(ctx, n2_id, node_type, nil, NewACLExt(n2_policy), NewGroupExt(nil), n2_listener) + n2 := NewNode(ctx, n2_id, node_type, 10, nil, NewACLExt(n2_policy), NewGroupExt(nil), n2_listener) n1_policy := NewPerNodePolicy(map[NodeID]Actions{ n2_id: Actions{MakeAction(ReadSignalType, "+")}, }) - n1 := NewNode(ctx, n1_id, node_type, nil, NewACLExt(n1_policy), NewGroupExt(nil)) + n1 := NewNode(ctx, n1_id, node_type, 10, nil, NewACLExt(n1_policy), NewGroupExt(nil)) ctx.Send(n2.ID, n1.ID, NewReadSignal(map[ExtType][]string{ GroupExtType: []string{"members"},