diff --git a/context.go b/context.go index da53138..ba0a4c1 100644 --- a/context.go +++ b/context.go @@ -7,15 +7,10 @@ import ( "fmt" ) -// For persistance, each node needs the following functions(* is a placeholder for the node/state type): -// Load*State - StateLoadFunc that returns the NodeState interface to attach to the node -// Load* - NodeLoadFunc that returns the GraphNode restored from it's loaded state - -// For convenience, the following functions are a good idea to define for composability: -// Restore*State - takes in the nodes serialized data to allow for easier nesting of inherited Load*State functions -// Save*State - serialize the node into it's json counterpart to be included as part of a larger json - +// NodeLoadFunc is the footprint of the function used to create a new node in memory from persisted bytes type NodeLoadFunc func(*Context, NodeID, []byte, NodeMap)(Node, error) + +// A NodeDef is a description of a node that can be added to a Context type NodeDef struct { Load NodeLoadFunc Type NodeType @@ -23,6 +18,7 @@ type NodeDef struct { Reflect reflect.Type } +// Create a new Node def, extracting the Type and Reflect from example func NewNodeDef(example Node, load_func NodeLoadFunc, gql_type *graphql.Object) NodeDef { return NodeDef{ Type: example.Type(), @@ -32,13 +28,19 @@ func NewNodeDef(example Node, load_func NodeLoadFunc, gql_type *graphql.Object) } } +// A Context is all the data needed to run a graphvent type Context struct { + // DB is the database connection used to load and write nodes DB * badger.DB + // Log is an interface used to record events happening Log Logger + // A mapping between type hashes and their corresponding node definitions Types map[uint64]NodeDef + // GQL substructure GQL GQLContext } +// Recreate the GQL schema after making changes func (ctx * Context) RebuildSchema() error { schemaConfig := graphql.SchemaConfig{ Types: ctx.GQL.TypeList, @@ -56,10 +58,12 @@ func (ctx * Context) RebuildSchema() error { return nil } +// Add a non-node type to the gql context func (ctx * Context) AddGQLType(gql_type graphql.Type) { ctx.GQL.TypeList = append(ctx.GQL.TypeList, gql_type) } +// Add a node to a context, returns an error if the def is invalid or already exists in the context func (ctx * Context) RegisterNodeType(def NodeDef) error { if def.Load == nil { return fmt.Errorf("Cannot register a node without a load function: %s", def.Type) @@ -95,19 +99,23 @@ func (ctx * Context) RegisterNodeType(def NodeDef) error { return nil } -type TypeList []graphql.Type +// Map of go types to graphql types type ObjTypeMap map[reflect.Type]*graphql.Object -type FieldMap map[string]*graphql.Field +// GQL Specific Context information type GQLContext struct { + // Generated GQL schema Schema graphql.Schema + // Interface types to compare against NodeType reflect.Type LockableType reflect.Type ThreadType reflect.Type - TypeList TypeList + // List of GQL types + TypeList []graphql.Type + // Interface type maps to map go types of specific interfaces to gql types ValidNodes ObjTypeMap ValidLockables ObjTypeMap ValidThreads ObjTypeMap @@ -117,6 +125,7 @@ type GQLContext struct { Subscription *graphql.Object } +// Create a new GQL context without any content func NewGQLContext() GQLContext { query := graphql.NewObject(graphql.ObjectConfig{ Name: "Query", @@ -135,7 +144,7 @@ func NewGQLContext() GQLContext { ctx := GQLContext{ Schema: graphql.Schema{}, - TypeList: TypeList{}, + TypeList: []graphql.Type{}, ValidNodes: ObjTypeMap{}, NodeType: reflect.TypeOf((*Node)(nil)).Elem(), ValidThreads: ObjTypeMap{}, @@ -150,6 +159,7 @@ func NewGQLContext() GQLContext { return ctx } +// Create a new Context with all the library content added func NewContext(db * badger.DB, log Logger) * Context { ctx := &Context{ GQL: NewGQLContext(), diff --git a/gql_test.go b/gql_test.go index 8560195..1dd7323 100644 --- a/gql_test.go +++ b/gql_test.go @@ -54,7 +54,7 @@ func TestGQLDBLoad(t * testing.T) { gql := &gql_r info := NewGQLThreadInfo(true, "start", "restore") - err := UpdateStates(ctx, []Node{gql, t1}, func(nodes NodeMap) error { + err := UpdateStates(ctx, []Node{gql, t1, l1}, func(nodes NodeMap) error { err := LinkLockables(ctx, gql, []Lockable{l1}, nodes) if err != nil { return err diff --git a/lockable.go b/lockable.go index 5a79dd3..069ab9a 100644 --- a/lockable.go +++ b/lockable.go @@ -213,7 +213,7 @@ func (lockable * SimpleLockable) CanUnlock(new_owner Lockable) error { return nil } -// lockable must already be locked for read +// Lockable.Signal sends the update to the owner, requirements, and dependencies before updating listeners func (lockable * SimpleLockable) Signal(ctx *Context, signal GraphSignal, nodes NodeMap) error { err := lockable.GraphNode.Signal(ctx, signal, nodes) if err != nil { @@ -261,7 +261,8 @@ func (lockable * SimpleLockable) Signal(ctx *Context, signal GraphSignal, nodes return nil } -// Requires lockable and requirement's states to be locked for write +// Removes requirement as a requirement from lockable +// Requires lockable and requirement be locked for write func UnlinkLockables(ctx * Context, lockable Lockable, requirement Lockable) error { var found Node = nil for _, req := range(lockable.Requirements()) { @@ -281,6 +282,7 @@ func UnlinkLockables(ctx * Context, lockable Lockable, requirement Lockable) err return nil } +// Link requirements as requirements to lockable // Requires lockable and requirements to be locked for write, nodes passed because requirement check recursively locks func LinkLockables(ctx * Context, lockable Lockable, requirements []Lockable, nodes NodeMap) error { if lockable == nil { @@ -369,6 +371,8 @@ func checkIfRequirement(ctx * Context, r Lockable, cur Lockable, nodes NodeMap) return false } +// Lock nodes in the to_lock slice with new_owner, does not modify any states if returning an error +// Requires that all the nodes in to_lock and new_owner are locked for write func LockLockables(ctx * Context, to_lock []Lockable, new_owner Lockable, nodes NodeMap) error { if to_lock == nil { return fmt.Errorf("LOCKABLE_LOCK_ERR: no list provided") @@ -390,66 +394,65 @@ func LockLockables(ctx * Context, to_lock []Lockable, new_owner Lockable, nodes return nil } - err := UpdateMoreStates(ctx, node_list, nodes, func(nodes NodeMap) error { - // First loop is to check that the states can be locked, and locks all requirements - for _, req := range(to_lock) { - ctx.Log.Logf("lockable", "LOCKABLE_LOCKING: %s from %s", req.ID(), new_owner.ID()) + // First loop is to check that the states can be locked, and locks all requirements + for _, req := range(to_lock) { + ctx.Log.Logf("lockable", "LOCKABLE_LOCKING: %s from %s", req.ID(), new_owner.ID()) - // Check custom lock conditions - err := req.CanLock(new_owner) - if err != nil { - return err - } + // Check custom lock conditions + err := req.CanLock(new_owner) + if err != nil { + return err + } - // If req is alreay locked, check that we can pass the lock - if req.Owner() != nil { - owner := req.Owner() - if owner.ID() == new_owner.ID() { - return fmt.Errorf("LOCKABLE_LOCK_ERR: %s already owns %s, cannot lock again", new_owner.ID(), req.ID()) - } else if owner.ID() == req.ID() { - if req.AllowedToTakeLock(new_owner, req) == false { + // If req is alreay locked, check that we can pass the lock + if req.Owner() != nil { + owner := req.Owner() + if owner.ID() == new_owner.ID() { + return fmt.Errorf("LOCKABLE_LOCK_ERR: %s already owns %s, cannot lock again", new_owner.ID(), req.ID()) + } else if owner.ID() == req.ID() { + if req.AllowedToTakeLock(new_owner, req) == false { + return fmt.Errorf("LOCKABLE_LOCK_ERR: %s is not allowed to take %s's lock from %s", new_owner.ID(), req.ID(), owner.ID()) + } + err := LockLockables(ctx, req.Requirements(), req, nodes) + if err != nil { + return err + } + } else { + err := UpdateMoreStates(ctx, []Node{owner}, nodes, func(nodes NodeMap)(error){ + if owner.AllowedToTakeLock(new_owner, req) == false { return fmt.Errorf("LOCKABLE_LOCK_ERR: %s is not allowed to take %s's lock from %s", new_owner.ID(), req.ID(), owner.ID()) } err := LockLockables(ctx, req.Requirements(), req, nodes) - if err != nil { - return err - } - } else { - err := UpdateMoreStates(ctx, []Node{owner}, nodes, func(nodes NodeMap)(error){ - if owner.AllowedToTakeLock(new_owner, req) == false { - return fmt.Errorf("LOCKABLE_LOCK_ERR: %s is not allowed to take %s's lock from %s", new_owner.ID(), req.ID(), owner.ID()) - } - err := LockLockables(ctx, req.Requirements(), req, nodes) - return err - }) - if err != nil { - return err - } - } - } else { - err := LockLockables(ctx, req.Requirements(), req, nodes) + return err + }) if err != nil { return err } } + } else { + err := LockLockables(ctx, req.Requirements(), req, nodes) + if err != nil { + return err + } } + } - // At this point state modification will be started, so no errors can be returned - for _, req := range(to_lock) { - old_owner := req.Owner() - req.SetOwner(new_owner) - new_owner.RecordLock(req, old_owner) - if old_owner == nil { - ctx.Log.Logf("lockable", "LOCKABLE_LOCK: %s locked %s", new_owner.ID(), req.ID()) - } else { - ctx.Log.Logf("lockable", "LOCKABLE_LOCK: %s took lock of %s from %s", new_owner.ID(), req.ID(), old_owner.ID()) - } + // At this point state modification will be started, so no errors can be returned + for _, req := range(to_lock) { + old_owner := req.Owner() + req.SetOwner(new_owner) + new_owner.RecordLock(req, old_owner) + if old_owner == nil { + ctx.Log.Logf("lockable", "LOCKABLE_LOCK: %s locked %s", new_owner.ID(), req.ID()) + } else { + ctx.Log.Logf("lockable", "LOCKABLE_LOCK: %s took lock of %s from %s", new_owner.ID(), req.ID(), old_owner.ID()) } - return nil - }) - return err + } + return nil } +// Unlock nodes in the to_unlock slice with old_owner, does not modify any states if returning an error +// Requires that all the nodes in to_unlock and old_owner are locked for write func UnlockLockables(ctx * Context, to_unlock []Lockable, old_owner Lockable, nodes NodeMap) error { if to_unlock == nil { return fmt.Errorf("LOCKABLE_UNLOCK_ERR: no list provided") @@ -473,48 +476,45 @@ func UnlockLockables(ctx * Context, to_unlock []Lockable, old_owner Lockable, no node_list[i] = l } - err := UpdateMoreStates(ctx, node_list, nodes, func(nodes NodeMap) error { - // First loop is to check that the states can be locked, and locks all requirements - for _, req := range(to_unlock) { - ctx.Log.Logf("lockable", "LOCKABLE_UNLOCKING: %s from %s", req.ID(), old_owner.ID()) + // First loop is to check that the states can be locked, and locks all requirements + for _, req := range(to_unlock) { + ctx.Log.Logf("lockable", "LOCKABLE_UNLOCKING: %s from %s", req.ID(), old_owner.ID()) - // Check if the owner is correct - if req.Owner() != nil { - if req.Owner().ID() != old_owner.ID() { - return fmt.Errorf("LOCKABLE_UNLOCK_ERR: %s is not locked by %s", req.ID(), old_owner.ID()) - } - } else { - return fmt.Errorf("LOCKABLE_UNLOCK_ERR: %s is not locked", req.ID()) + // Check if the owner is correct + if req.Owner() != nil { + if req.Owner().ID() != old_owner.ID() { + return fmt.Errorf("LOCKABLE_UNLOCK_ERR: %s is not locked by %s", req.ID(), old_owner.ID()) } + } else { + return fmt.Errorf("LOCKABLE_UNLOCK_ERR: %s is not locked", req.ID()) + } - // Check custom unlock conditions - err := req.CanUnlock(old_owner) - if err != nil { - return err - } + // Check custom unlock conditions + err := req.CanUnlock(old_owner) + if err != nil { + return err + } - err = UnlockLockables(ctx, req.Requirements(), req, nodes) - if err != nil { - return err - } + err = UnlockLockables(ctx, req.Requirements(), req, nodes) + if err != nil { + return err } + } - // At this point state modification will be started, so no errors can be returned - for _, req := range(to_unlock) { - new_owner := old_owner.RecordUnlock(req) - req.SetOwner(new_owner) - if new_owner == nil { - ctx.Log.Logf("lockable", "LOCKABLE_UNLOCK: %s unlocked %s", old_owner.ID(), req.ID()) - } else { - ctx.Log.Logf("lockable", "LOCKABLE_UNLOCK: %s passed lock of %s back to %s", old_owner.ID(), req.ID(), new_owner.ID()) - } + // At this point state modification will be started, so no errors can be returned + for _, req := range(to_unlock) { + new_owner := old_owner.RecordUnlock(req) + req.SetOwner(new_owner) + if new_owner == nil { + ctx.Log.Logf("lockable", "LOCKABLE_UNLOCK: %s unlocked %s", old_owner.ID(), req.ID()) + } else { + ctx.Log.Logf("lockable", "LOCKABLE_UNLOCK: %s passed lock of %s back to %s", old_owner.ID(), req.ID(), new_owner.ID()) } - return nil - }) - return err + } + return nil } - +// Load function for SimpleLockable func LoadSimpleLockable(ctx *Context, id NodeID, data []byte, nodes NodeMap) (Node, error) { var j SimpleLockableJSON err := json.Unmarshal(data, &j) @@ -533,7 +533,6 @@ func LoadSimpleLockable(ctx *Context, id NodeID, data []byte, nodes NodeMap) (No return &lockable, nil } - func NewSimpleLockable(id NodeID, name string) SimpleLockable { return SimpleLockable{ GraphNode: NewGraphNode(id), @@ -545,6 +544,7 @@ func NewSimpleLockable(id NodeID, name string) SimpleLockable { } } +// Helper function to load links when loading a struct that embeds SimpleLockable func RestoreSimpleLockable(ctx * Context, lockable Lockable, j SimpleLockableJSON, nodes NodeMap) error { if j.Owner != nil { o, err := LoadNodeRecurse(ctx, *j.Owner, nodes) diff --git a/node.go b/node.go index 3539ced..023acb8 100644 --- a/node.go +++ b/node.go @@ -11,7 +11,6 @@ import ( // IDs are how nodes are uniquely identified, and can be serialized for the database type NodeID string - func (id NodeID) Serialize() []byte { return []byte(id) } @@ -26,7 +25,7 @@ func (node_type NodeType) Hash() uint64 { return binary.BigEndian.Uint64(bytes[(len(bytes)-9):(len(bytes)-1)]) } -// Generate a random id +// Generate a random NodeID func RandID() NodeID { uuid_str := uuid.New().String() return NodeID(uuid_str) @@ -38,11 +37,15 @@ type Node interface { sync.Locker RLock() RUnlock() + // Serialize the Node for the database Serialize() ([]byte, error) ID() NodeID Type() NodeType + // Send a GraphSignal to the node, requires that the node is locked for read so that it can propagate Signal(ctx *Context, signal GraphSignal, nodes NodeMap) error + // Register a channel to receive updates sent to the node RegisterChannel(id NodeID, listener chan GraphSignal) + // Unregister a channel from receiving updates sent to the node UnregisterChannel(id NodeID) } @@ -76,6 +79,8 @@ func (node * GraphNode) Type() NodeType { return NodeType("graph_node") } +// Propagate the signal to registered listeners, if a listener isn't ready to receive the update +// send it a notification that it was closed and then close it func (node * GraphNode) Signal(ctx *Context, signal GraphSignal, nodes NodeMap) error { ctx.Log.Logf("signal", "SIGNAL: %s - %s", node.ID(), signal.String()) node.listeners_lock.Lock() @@ -129,8 +134,11 @@ func NewGraphNode(id NodeID) GraphNode { } } +// Magic first four bytes of serialized DB content, stored big endian const NODE_DB_MAGIC = 0x2491df14 +// Total length of the node database header, has magic to verify and type_hash to map to load function const NODE_DB_HEADER_LEN = 12 +// A DBHeader is parsed from the first NODE_DB_HEADER_LEN bytes of a serialized DB node type DBHeader struct { Magic uint32 TypeHash uint64 @@ -154,7 +162,8 @@ func NewDBHeader(node_type NodeType) DBHeader { } } -func getNodeBytes(ctx * Context, node Node) ([]byte, error) { +// Internal function to serialize a node and wrap it with the DB Header +func getNodeBytes(node Node) ([]byte, error) { if node == nil { return nil, fmt.Errorf("DB_SERIALIZE_ERROR: cannot serialize nil node") } @@ -170,25 +179,6 @@ func getNodeBytes(ctx * Context, node Node) ([]byte, error) { return db_data, nil } -// Write a node to the database -func WriteNode(ctx * Context, node Node) error { - ctx.Log.Logf("db", "DB_WRITE: %+v", node) - - node_bytes, err := getNodeBytes(ctx, node) - if err != nil { - return err - } - - id_ser := node.ID().Serialize() - - err = ctx.DB.Update(func(txn *badger.Txn) error { - err := txn.Set(id_ser, node_bytes) - return err - }) - - return err -} - // Write multiple nodes to the database in a single transaction func WriteNodes(ctx * Context, nodes NodeMap) error { ctx.Log.Logf("db", "DB_WRITES: %d", len(nodes)) @@ -200,7 +190,7 @@ func WriteNodes(ctx * Context, nodes NodeMap) error { serialized_ids := make([][]byte, len(nodes)) i := 0 for _, node := range(nodes) { - node_bytes, err := getNodeBytes(ctx, node) + node_bytes, err := getNodeBytes(node) ctx.Log.Logf("db", "DB_WRITE: %+v", node) if err != nil { return err @@ -227,7 +217,7 @@ func WriteNodes(ctx * Context, nodes NodeMap) error { return err } -// Get the bytes associates with `id` in the database, or error +// Get the bytes associates with `id` from the database after unwrapping the header, or error func readNodeBytes(ctx * Context, id NodeID) (uint64, []byte, error) { var bytes []byte err := ctx.DB.View(func(txn *badger.Txn) error { @@ -267,11 +257,15 @@ func readNodeBytes(ctx * Context, id NodeID) (uint64, []byte, error) { return header.TypeHash, node_bytes, nil } +// Load a Node from the database by ID func LoadNode(ctx * Context, id NodeID) (Node, error) { nodes := NodeMap{} return LoadNodeRecurse(ctx, id, nodes) } + +// Recursively load a node from the database. +// It's expected that node_type.Load adds the newly loaded node to nodes before calling LoadNodeRecurse again. func LoadNodeRecurse(ctx * Context, id NodeID, nodes NodeMap) (Node, error) { node, exists := nodes[id] if exists == false { @@ -299,6 +293,7 @@ func LoadNodeRecurse(ctx * Context, id NodeID, nodes NodeMap) (Node, error) { return node, nil } +// Internal function to check for a duplicate node in a slice by ID func checkForDuplicate(nodes []Node) error { found := map[NodeID]bool{} for _, node := range(nodes) { @@ -315,6 +310,7 @@ func checkForDuplicate(nodes []Node) error { return nil } +// Convert any slice of types that implement Node to a []Node func NodeList[K Node](list []K) []Node { nodes := make([]Node, len(list)) for i, node := range(list) { @@ -325,10 +321,14 @@ func NodeList[K Node](list []K) []Node { type NodeMap map[NodeID]Node type NodesFn func(nodes NodeMap) error +// Initiate a read context for nodes and call nodes_fn with init_nodes locked for read func UseStates(ctx * Context, init_nodes []Node, nodes_fn NodesFn) error { nodes := NodeMap{} return UseMoreStates(ctx, init_nodes, nodes, nodes_fn) } + + +// Add nodes to an existing read context and call nodes_fn with new_nodes locked for read func UseMoreStates(ctx * Context, new_nodes []Node, nodes NodeMap, nodes_fn NodesFn) error { err := checkForDuplicate(new_nodes) if err != nil { @@ -355,6 +355,7 @@ func UseMoreStates(ctx * Context, new_nodes []Node, nodes NodeMap, nodes_fn Node return err } +// Initiate a write context for nodes and call nodes_fn with nodes locked for read func UpdateStates(ctx * Context, nodes []Node, nodes_fn NodesFn) error { locked_nodes := NodeMap{} err := UpdateMoreStates(ctx, nodes, locked_nodes, nodes_fn) @@ -367,6 +368,8 @@ func UpdateStates(ctx * Context, nodes []Node, nodes_fn NodesFn) error { } return err } + +// Add nodes to an existing write context and call nodes_fn with nodes locked for read func UpdateMoreStates(ctx * Context, nodes []Node, locked_nodes NodeMap, nodes_fn NodesFn) error { for _, node := range(nodes) { _, locked := locked_nodes[node.ID()] @@ -379,6 +382,7 @@ func UpdateMoreStates(ctx * Context, nodes []Node, locked_nodes NodeMap, nodes_f return nodes_fn(locked_nodes) } +// Create a new channel with a buffer the size of buffer, and register it to node with the id func UpdateChannel(node Node, buffer int, id NodeID) chan GraphSignal { if node == nil { panic("Cannot get an update channel to nil") diff --git a/thread.go b/thread.go index 056518e..ec84a29 100644 --- a/thread.go +++ b/thread.go @@ -9,7 +9,7 @@ import ( "encoding/json" ) -// Update the threads listeners, and notify the parent to do the same +// SimpleThread.Signal updates the parent and children, and sends the signal to an internal channel func (thread * SimpleThread) Signal(ctx * Context, signal GraphSignal, nodes NodeMap) error { err := thread.SimpleLockable.Signal(ctx, signal, nodes) if err != nil {