|
|
|
@ -11,7 +11,6 @@ import (
|
|
|
|
|
|
|
|
|
|
// IDs are how nodes are uniquely identified, and can be serialized for the database
|
|
|
|
|
type NodeID string
|
|
|
|
|
|
|
|
|
|
func (id NodeID) Serialize() []byte {
|
|
|
|
|
return []byte(id)
|
|
|
|
|
}
|
|
|
|
@ -26,7 +25,7 @@ func (node_type NodeType) Hash() uint64 {
|
|
|
|
|
return binary.BigEndian.Uint64(bytes[(len(bytes)-9):(len(bytes)-1)])
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// Generate a random id
|
|
|
|
|
// Generate a random NodeID
|
|
|
|
|
func RandID() NodeID {
|
|
|
|
|
uuid_str := uuid.New().String()
|
|
|
|
|
return NodeID(uuid_str)
|
|
|
|
@ -38,11 +37,15 @@ type Node interface {
|
|
|
|
|
sync.Locker
|
|
|
|
|
RLock()
|
|
|
|
|
RUnlock()
|
|
|
|
|
// Serialize the Node for the database
|
|
|
|
|
Serialize() ([]byte, error)
|
|
|
|
|
ID() NodeID
|
|
|
|
|
Type() NodeType
|
|
|
|
|
// Send a GraphSignal to the node, requires that the node is locked for read so that it can propagate
|
|
|
|
|
Signal(ctx *Context, signal GraphSignal, nodes NodeMap) error
|
|
|
|
|
// Register a channel to receive updates sent to the node
|
|
|
|
|
RegisterChannel(id NodeID, listener chan GraphSignal)
|
|
|
|
|
// Unregister a channel from receiving updates sent to the node
|
|
|
|
|
UnregisterChannel(id NodeID)
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
@ -76,6 +79,8 @@ func (node * GraphNode) Type() NodeType {
|
|
|
|
|
return NodeType("graph_node")
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// Propagate the signal to registered listeners, if a listener isn't ready to receive the update
|
|
|
|
|
// send it a notification that it was closed and then close it
|
|
|
|
|
func (node * GraphNode) Signal(ctx *Context, signal GraphSignal, nodes NodeMap) error {
|
|
|
|
|
ctx.Log.Logf("signal", "SIGNAL: %s - %s", node.ID(), signal.String())
|
|
|
|
|
node.listeners_lock.Lock()
|
|
|
|
@ -129,8 +134,11 @@ func NewGraphNode(id NodeID) GraphNode {
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// Magic first four bytes of serialized DB content, stored big endian
|
|
|
|
|
const NODE_DB_MAGIC = 0x2491df14
|
|
|
|
|
// Total length of the node database header, has magic to verify and type_hash to map to load function
|
|
|
|
|
const NODE_DB_HEADER_LEN = 12
|
|
|
|
|
// A DBHeader is parsed from the first NODE_DB_HEADER_LEN bytes of a serialized DB node
|
|
|
|
|
type DBHeader struct {
|
|
|
|
|
Magic uint32
|
|
|
|
|
TypeHash uint64
|
|
|
|
@ -154,7 +162,8 @@ func NewDBHeader(node_type NodeType) DBHeader {
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
func getNodeBytes(ctx * Context, node Node) ([]byte, error) {
|
|
|
|
|
// Internal function to serialize a node and wrap it with the DB Header
|
|
|
|
|
func getNodeBytes(node Node) ([]byte, error) {
|
|
|
|
|
if node == nil {
|
|
|
|
|
return nil, fmt.Errorf("DB_SERIALIZE_ERROR: cannot serialize nil node")
|
|
|
|
|
}
|
|
|
|
@ -170,25 +179,6 @@ func getNodeBytes(ctx * Context, node Node) ([]byte, error) {
|
|
|
|
|
return db_data, nil
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// Write a node to the database
|
|
|
|
|
func WriteNode(ctx * Context, node Node) error {
|
|
|
|
|
ctx.Log.Logf("db", "DB_WRITE: %+v", node)
|
|
|
|
|
|
|
|
|
|
node_bytes, err := getNodeBytes(ctx, node)
|
|
|
|
|
if err != nil {
|
|
|
|
|
return err
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
id_ser := node.ID().Serialize()
|
|
|
|
|
|
|
|
|
|
err = ctx.DB.Update(func(txn *badger.Txn) error {
|
|
|
|
|
err := txn.Set(id_ser, node_bytes)
|
|
|
|
|
return err
|
|
|
|
|
})
|
|
|
|
|
|
|
|
|
|
return err
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// Write multiple nodes to the database in a single transaction
|
|
|
|
|
func WriteNodes(ctx * Context, nodes NodeMap) error {
|
|
|
|
|
ctx.Log.Logf("db", "DB_WRITES: %d", len(nodes))
|
|
|
|
@ -200,7 +190,7 @@ func WriteNodes(ctx * Context, nodes NodeMap) error {
|
|
|
|
|
serialized_ids := make([][]byte, len(nodes))
|
|
|
|
|
i := 0
|
|
|
|
|
for _, node := range(nodes) {
|
|
|
|
|
node_bytes, err := getNodeBytes(ctx, node)
|
|
|
|
|
node_bytes, err := getNodeBytes(node)
|
|
|
|
|
ctx.Log.Logf("db", "DB_WRITE: %+v", node)
|
|
|
|
|
if err != nil {
|
|
|
|
|
return err
|
|
|
|
@ -227,7 +217,7 @@ func WriteNodes(ctx * Context, nodes NodeMap) error {
|
|
|
|
|
return err
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// Get the bytes associates with `id` in the database, or error
|
|
|
|
|
// Get the bytes associates with `id` from the database after unwrapping the header, or error
|
|
|
|
|
func readNodeBytes(ctx * Context, id NodeID) (uint64, []byte, error) {
|
|
|
|
|
var bytes []byte
|
|
|
|
|
err := ctx.DB.View(func(txn *badger.Txn) error {
|
|
|
|
@ -267,11 +257,15 @@ func readNodeBytes(ctx * Context, id NodeID) (uint64, []byte, error) {
|
|
|
|
|
return header.TypeHash, node_bytes, nil
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// Load a Node from the database by ID
|
|
|
|
|
func LoadNode(ctx * Context, id NodeID) (Node, error) {
|
|
|
|
|
nodes := NodeMap{}
|
|
|
|
|
return LoadNodeRecurse(ctx, id, nodes)
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
// Recursively load a node from the database.
|
|
|
|
|
// It's expected that node_type.Load adds the newly loaded node to nodes before calling LoadNodeRecurse again.
|
|
|
|
|
func LoadNodeRecurse(ctx * Context, id NodeID, nodes NodeMap) (Node, error) {
|
|
|
|
|
node, exists := nodes[id]
|
|
|
|
|
if exists == false {
|
|
|
|
@ -299,6 +293,7 @@ func LoadNodeRecurse(ctx * Context, id NodeID, nodes NodeMap) (Node, error) {
|
|
|
|
|
return node, nil
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// Internal function to check for a duplicate node in a slice by ID
|
|
|
|
|
func checkForDuplicate(nodes []Node) error {
|
|
|
|
|
found := map[NodeID]bool{}
|
|
|
|
|
for _, node := range(nodes) {
|
|
|
|
@ -315,6 +310,7 @@ func checkForDuplicate(nodes []Node) error {
|
|
|
|
|
return nil
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// Convert any slice of types that implement Node to a []Node
|
|
|
|
|
func NodeList[K Node](list []K) []Node {
|
|
|
|
|
nodes := make([]Node, len(list))
|
|
|
|
|
for i, node := range(list) {
|
|
|
|
@ -325,10 +321,14 @@ func NodeList[K Node](list []K) []Node {
|
|
|
|
|
|
|
|
|
|
type NodeMap map[NodeID]Node
|
|
|
|
|
type NodesFn func(nodes NodeMap) error
|
|
|
|
|
// Initiate a read context for nodes and call nodes_fn with init_nodes locked for read
|
|
|
|
|
func UseStates(ctx * Context, init_nodes []Node, nodes_fn NodesFn) error {
|
|
|
|
|
nodes := NodeMap{}
|
|
|
|
|
return UseMoreStates(ctx, init_nodes, nodes, nodes_fn)
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
// Add nodes to an existing read context and call nodes_fn with new_nodes locked for read
|
|
|
|
|
func UseMoreStates(ctx * Context, new_nodes []Node, nodes NodeMap, nodes_fn NodesFn) error {
|
|
|
|
|
err := checkForDuplicate(new_nodes)
|
|
|
|
|
if err != nil {
|
|
|
|
@ -355,6 +355,7 @@ func UseMoreStates(ctx * Context, new_nodes []Node, nodes NodeMap, nodes_fn Node
|
|
|
|
|
return err
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// Initiate a write context for nodes and call nodes_fn with nodes locked for read
|
|
|
|
|
func UpdateStates(ctx * Context, nodes []Node, nodes_fn NodesFn) error {
|
|
|
|
|
locked_nodes := NodeMap{}
|
|
|
|
|
err := UpdateMoreStates(ctx, nodes, locked_nodes, nodes_fn)
|
|
|
|
@ -367,6 +368,8 @@ func UpdateStates(ctx * Context, nodes []Node, nodes_fn NodesFn) error {
|
|
|
|
|
}
|
|
|
|
|
return err
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// Add nodes to an existing write context and call nodes_fn with nodes locked for read
|
|
|
|
|
func UpdateMoreStates(ctx * Context, nodes []Node, locked_nodes NodeMap, nodes_fn NodesFn) error {
|
|
|
|
|
for _, node := range(nodes) {
|
|
|
|
|
_, locked := locked_nodes[node.ID()]
|
|
|
|
@ -379,6 +382,7 @@ func UpdateMoreStates(ctx * Context, nodes []Node, locked_nodes NodeMap, nodes_f
|
|
|
|
|
return nodes_fn(locked_nodes)
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// Create a new channel with a buffer the size of buffer, and register it to node with the id
|
|
|
|
|
func UpdateChannel(node Node, buffer int, id NodeID) chan GraphSignal {
|
|
|
|
|
if node == nil {
|
|
|
|
|
panic("Cannot get an update channel to nil")
|
|
|
|
|