diff --git a/gql.go b/gql.go index a38cf15..38ae8d4 100644 --- a/gql.go +++ b/gql.go @@ -204,12 +204,12 @@ func AuthHandler(ctx *Context, server *GQLThread) func(http.ResponseWriter, *htt ctx.Log.Logf("gql", "AUTHORIZING NEW USER %s - %s", key_id, shared) new_user := NewUser(fmt.Sprintf("GQL_USER %s", key_id.String()), time.Now(), remote_id, shared, []string{"gql"}) - err := UpdateStates(ctx, server, NewLockMap(LockList{ - LockInfo{ + err := UpdateStates(ctx, server, NewLockMap(LockMap{ + server.ID(): LockInfo{ Node: server, Resources: []string{"users"}, }, - LockInfo{ + new_user.ID(): LockInfo{ Node: &new_user, Resources: []string{""}, }, diff --git a/gql_mutation.go b/gql_mutation.go index b85f261..d5d2354 100644 --- a/gql_mutation.go +++ b/gql_mutation.go @@ -32,12 +32,12 @@ var GQLMutationAbort = NewField(func()*graphql.Field { err = UseStates(ctx.Context, ctx.User, NewLockMap( NewLockInfo(ctx.Server, []string{"children"}), ), func(context *ReadContext) (error){ - node = FindChild(ctx.Context, ctx.User, ctx.Server, id, locked) + node = FindChild(context, ctx.User, ctx.Server, id) if node == nil { return fmt.Errorf("Failed to find ID: %s as child of server thread", id) } - return UseMoreStates(ctx.Context, locked, ctx.User, NewLockInfo(node, []string{"signal"}), func(locked NodeLockMap) error { - return node.Signal(ctx.Context, AbortSignal, locked) + return UseMoreStates(context, ctx.User, NewLockMap(NewLockInfo(node, []string{"signal"})), func(context *ReadContext) error { + return node.Signal(context, AbortSignal) }) }) if err != nil { @@ -91,7 +91,7 @@ var GQLMutationStartChild = NewField(func()*graphql.Field{ err = UseStates(ctx.Context, ctx.User, NewLockMap( NewLockInfo(ctx.Server, []string{"children"}), ), func(context *ReadContext) error { - node := FindChild(ctx.Context, ctx.User, ctx.Server, parent_id, locked) + node := FindChild(context, ctx.User, ctx.Server, parent_id) if node == nil { return fmt.Errorf("Failed to find ID: %s as child of server thread", parent_id) } @@ -101,9 +101,9 @@ var GQLMutationStartChild = NewField(func()*graphql.Field{ return err } - return UseMoreStates(ctx.Context, locked, ctx.User, NewLockInfo(node, []string{"start_child", "signal"}), func(locked NodeLockMap) error { + return UseMoreStates(context, ctx.User, NewLockMap(NewLockInfo(node, []string{"start_child", "signal"})), func(context *ReadContext) error { signal = NewStartChildSignal(child_id, action) - return node.Signal(ctx.Context, signal, locked) + return node.Signal(context, signal) }) }) if err != nil { diff --git a/gql_resolvers.go b/gql_resolvers.go index a8d897f..dcc0f62 100644 --- a/gql_resolvers.go +++ b/gql_resolvers.go @@ -53,7 +53,7 @@ func GQLNodeID(p graphql.ResolveParams) (interface{}, error) { return nil, fmt.Errorf("Failed to cast source to Node") } - err = UseStates(ctx.Context, ctx.User, NewLockRequest(node, []string{"id"}), func(locked NodeLockMap) error { + err = UseStates(ctx.Context, ctx.User, NewLockMap(NewLockInfo(node, []string{"id"})), func(context *ReadContext) error { return nil }) if err != nil { @@ -76,7 +76,7 @@ func GQLThreadListen(p graphql.ResolveParams) (interface{}, error) { listen := "" - err = UseStates(ctx.Context, ctx.User, NewLockRequest(node, []string{"listen"}), func(locked NodeLockMap) error { + err = UseStates(ctx.Context, ctx.User, NewLockMap(NewLockInfo(node, []string{"listen"})), func(context *ReadContext) error { listen = node.Listen return nil }) @@ -100,7 +100,7 @@ func GQLThreadParent(p graphql.ResolveParams) (interface{}, error) { } var parent Thread = nil - err = UseStates(ctx.Context, ctx.User, NewLockRequest(node, []string{"parent"}), func(locked NodeLockMap) error { + err = UseStates(ctx.Context, ctx.User, NewLockMap(NewLockInfo(node, []string{"parent"})), func(context *ReadContext) error { parent = node.Parent() return nil }) @@ -124,7 +124,7 @@ func GQLThreadState(p graphql.ResolveParams) (interface{}, error) { } var state string - err = UseStates(ctx.Context, ctx.User, NewLockRequest(node, []string{"state"}), func(locked NodeLockMap) error { + err = UseStates(ctx.Context, ctx.User, NewLockMap(NewLockInfo(node, []string{"state"})), func(context *ReadContext) error { state = node.State() return nil }) @@ -148,7 +148,7 @@ func GQLThreadChildren(p graphql.ResolveParams) (interface{}, error) { } var children []Thread = nil - err = UseStates(ctx.Context, ctx.User, NewLockRequest(node, []string{"children"}), func(locked NodeLockMap) error { + err = UseStates(ctx.Context, ctx.User, NewLockMap(NewLockInfo(node, []string{"children"})), func(context *ReadContext) error { children = node.Children() return nil }) @@ -172,7 +172,7 @@ func GQLLockableName(p graphql.ResolveParams) (interface{}, error) { } name := "" - err = UseStates(ctx.Context, ctx.User, NewLockRequest(node, []string{"name"}), func(locked NodeLockMap) error { + err = UseStates(ctx.Context, ctx.User, NewLockMap(NewLockInfo(node, []string{"name"})), func(context *ReadContext) error { name = node.Name() return nil }) @@ -196,7 +196,7 @@ func GQLLockableRequirements(p graphql.ResolveParams) (interface{}, error) { } var requirements []Lockable = nil - err = UseStates(ctx.Context, ctx.User, NewLockRequest(node, []string{"requirements"}), func(locked NodeLockMap) error { + err = UseStates(ctx.Context, ctx.User, NewLockMap(NewLockInfo(node, []string{"requirements"})), func(context *ReadContext) error { requirements = node.Requirements() return nil }) @@ -220,7 +220,7 @@ func GQLLockableDependencies(p graphql.ResolveParams) (interface{}, error) { } var dependencies []Lockable = nil - err = UseStates(ctx.Context, ctx.User, NewLockRequest(node, []string{"dependencies"}), func(locked NodeLockMap) error { + err = UseStates(ctx.Context, ctx.User, NewLockMap(NewLockInfo(node, []string{"dependencies"})), func(context *ReadContext) error { dependencies = node.Dependencies() return nil }) @@ -244,7 +244,7 @@ func GQLLockableOwner(p graphql.ResolveParams) (interface{}, error) { } var owner Node = nil - err = UseStates(ctx.Context, ctx.User, NewLockRequest(node, []string{"owner"}), func(locked NodeLockMap) error { + err = UseStates(ctx.Context, ctx.User, NewLockMap(NewLockInfo(node, []string{"owner"})), func(context *ReadContext) error { owner = node.Owner() return nil }) @@ -268,7 +268,7 @@ func GQLThreadUsers(p graphql.ResolveParams) (interface{}, error) { } var users []*User - err = UseStates(ctx.Context, ctx.User, NewLockRequest(node, []string{"users"}), func(locked NodeLockMap) error { + err = UseStates(ctx.Context, ctx.User, NewLockMap(NewLockInfo(node, []string{"users"})), func(context *ReadContext) error { users = make([]*User, len(node.Users)) i := 0 for _, user := range(node.Users) { diff --git a/gql_test.go b/gql_test.go index 385b07b..c6d4d23 100644 --- a/gql_test.go +++ b/gql_test.go @@ -31,24 +31,25 @@ func TestGQLThread(t * testing.T) { t2_r := NewSimpleThread(RandID(), "Test thread 2", "init", nil, BaseThreadActions, BaseThreadHandlers) t2 := &t2_r - err = UpdateStates(ctx, gql_t, RequestList([]Node{t1, t2}, []string{"parent"}), func(locked NodeLockMap) error { - return UpdateMoreStates(ctx, locked, gql_t, NewLockRequest(gql_t, []string{"children"}), func(locked NodeLockMap) error { - i1 := NewParentThreadInfo(true, "start", "restore") - err := LinkThreads(ctx, gql_t, t1, &i1, locked) - if err != nil { - return err - } - - i2 := NewParentThreadInfo(false, "start", "restore") - return LinkThreads(ctx, gql_t, t2, &i2, locked) - }) + err = UpdateStates(ctx, gql_t, NewLockMap( + LockList([]Node{t1, t2}, []string{"parent"}), + NewLockInfo(gql_t, []string{"children"}), + ), func(context *WriteContext) error { + i1 := NewParentThreadInfo(true, "start", "restore") + err := LinkThreads(context, gql_t, gql_t, t1, &i1) + if err != nil { + return err + } + + i2 := NewParentThreadInfo(false, "start", "restore") + return LinkThreads(context, gql_t, gql_t, t2, &i2) }) fatalErr(t, err) go func(thread Thread){ time.Sleep(10*time.Millisecond) - err := UseStates(ctx, thread, NewLockRequest(thread, []string{"signal"}), func(locked NodeLockMap) error { - return thread.Signal(ctx, CancelSignal, locked) + err := UseStates(ctx, thread, NewLockInfo(thread, []string{"signal"}), func(context *ReadContext) error { + return thread.Signal(context, CancelSignal) }) fatalErr(t, err) }(gql_t) @@ -83,34 +84,30 @@ func TestGQLDBLoad(t * testing.T) { gql := &gql_r info := NewParentThreadInfo(true, "start", "restore") - err = UpdateStates(ctx, gql, NewLockRequest(gql, []string{"policies", "users", "requirements", "children"}), func(locked NodeLockMap) error { - return UpdateMoreStates(ctx, locked, gql, RequestList([]Node{u1, p1}, []string{}), func(locked NodeLockMap) error { - err := gql.AddPolicy(p1) - if err != nil { - return err - } - - gql.Users[KeyID(&u1_key.PublicKey)] = u1 - - return UpdateMoreStates(ctx, locked, gql, NewLockRequest(t1, []string{"parent"}), func(locked NodeLockMap) error { - err := LinkThreads(ctx, gql, t1, &info, locked) - if err != nil { - return err - } - return UpdateMoreStates(ctx, locked, gql, NewLockRequest(l1, []string{"dependencies"}), func(locked NodeLockMap) error { - return LinkLockables(ctx, gql, gql, []Lockable{l1}, locked) - }) - }) - }) + err = UpdateStates(ctx, gql, NewLockMap( + NewLockInfo(gql, []string{"policies", "users"}), + ), func(context *WriteContext) error { + err := gql.AddPolicy(p1) + if err != nil { + return err + } + + gql.Users[KeyID(&u1_key.PublicKey)] = u1 + + err = LinkThreads(context, gql, gql, t1, &info) + if err != nil { + return err + } + return LinkLockables(context, gql, gql, []Lockable{l1}) }) fatalErr(t, err) - err = UseStates(ctx, gql, NewLockRequest(gql, []string{"signal"}), func(locked NodeLockMap) error { - err := gql.Signal(ctx, NewStatusSignal("child_linked", t1.ID()), locked) + err = UseStates(ctx, gql, NewLockInfo(gql, []string{"signal"}), func(context *ReadContext) error { + err := gql.Signal(context, NewStatusSignal("child_linked", t1.ID())) if err != nil { return nil } - return gql.Signal(ctx, CancelSignal, locked) + return gql.Signal(context, CancelSignal) }) fatalErr(t, err) @@ -125,7 +122,7 @@ func TestGQLDBLoad(t * testing.T) { (*GraphTester)(t).WaitForValue(ctx, update_channel, "thread_aborted", 100*time.Millisecond, "Didn't receive thread_abort from t1 on t1") - err = UseStates(ctx, gql, RequestList([]Node{gql, u1}, nil), func(locked NodeLockMap) error { + err = UseStates(ctx, gql, LockList([]Node{gql, u1}, nil), func(context *ReadContext) error { ser1, err := gql.Serialize() ser2, err := u1.Serialize() ctx.Log.Logf("test", "\n%s\n\n", ser1) @@ -138,19 +135,19 @@ func TestGQLDBLoad(t * testing.T) { var t1_loaded *SimpleThread = nil var update_channel_2 chan GraphSignal - err = UseStates(ctx, gql, NewLockRequest(gql_loaded, []string{"users", "children"}), func(locked NodeLockMap) error { + err = UseStates(ctx, gql, NewLockInfo(gql_loaded, []string{"users", "children"}), func(context *ReadContext) error { ser, err := gql_loaded.Serialize() ctx.Log.Logf("test", "\n%s\n\n", ser) u_loaded := gql_loaded.(*GQLThread).Users[u1.ID()] child := gql_loaded.(Thread).Children()[0].(*SimpleThread) t1_loaded = child update_channel_2 = UpdateChannel(t1_loaded, 10, NodeID{}) - err = UseMoreStates(ctx, locked, gql, NewLockRequest(u_loaded, nil), func(locked NodeLockMap) error { + err = UseMoreStates(context, gql, NewLockInfo(u_loaded, nil), func(context *ReadContext) error { ser, err := u_loaded.Serialize() ctx.Log.Logf("test", "\n%s\n\n", ser) return err }) - gql_loaded.Signal(ctx, AbortSignal, locked) + gql_loaded.Signal(context, AbortSignal) return err }) @@ -176,14 +173,14 @@ func TestGQLAuth(t * testing.T) { gql_t := &gql_t_r // p1 not written to DB, TODO: update write to follow links maybe - err = UpdateStates(ctx, gql_t, NewLockRequest(gql_t, []string{"policies"}), func(locked NodeLockMap) error { + err = UpdateStates(ctx, gql_t, NewLockInfo(gql_t, []string{"policies"}), func(context *WriteContext) error { return gql_t.AddPolicy(p1) }) done := make(chan error, 1) var update_channel chan GraphSignal - err = UseStates(ctx, gql_t, NewLockRequest(gql_t, nil), func(locked NodeLockMap) error { + err = UseStates(ctx, gql_t, NewLockInfo(gql_t, nil), func(context *ReadContext) error { update_channel = UpdateChannel(gql_t, 10, NodeID{}) return nil }) @@ -197,8 +194,8 @@ func TestGQLAuth(t * testing.T) { case <-done: ctx.Log.Logf("test", "DONE") } - err := UseStates(ctx, gql_t, NewLockRequest(gql_t, []string{"signal}"}), func(locked NodeLockMap) error { - return thread.Signal(ctx, CancelSignal, locked) + err := UseStates(ctx, gql_t, NewLockInfo(gql_t, []string{"signal}"}), func(context *ReadContext) error { + return thread.Signal(context, CancelSignal) }) fatalErr(t, err) }(done, gql_t) diff --git a/lockable.go b/lockable.go index 9bdee4c..2df2cc5 100644 --- a/lockable.go +++ b/lockable.go @@ -217,21 +217,21 @@ func (lockable * SimpleLockable) CanUnlock(new_owner Lockable) error { // Assumed that lockable is already locked for signal func (lockable * SimpleLockable) Signal(context *ReadContext, signal GraphSignal) error { - err := lockable.GraphNode.Signal(ctx, signal, locked) + err := lockable.GraphNode.Signal(context, signal) if err != nil { return err } switch signal.Direction() { case Up: - err = UseMoreStates(ctx, locked, lockable, NewLockMap( + err = UseMoreStates(context, lockable, NewLockMap( NewLockInfo(lockable, []string{"dependencies", "owner"}), - RequestList(lockable.requirements, []string{"signal"}), + LockList(lockable.requirements, []string{"signal"}), ), func(context *ReadContext) error { owner_sent := false for _, dependency := range(lockable.dependencies) { - ctx.Log.Logf("signal", "SENDING_TO_DEPENDENCY: %s -> %s", lockable.ID(), dependency.ID()) - dependency.Signal(ctx, signal, locked) + context.Graph.Log.Logf("signal", "SENDING_TO_DEPENDENCY: %s -> %s", lockable.ID(), dependency.ID()) + dependency.Signal(context, signal) if lockable.owner != nil { if dependency.ID() == lockable.owner.ID() { owner_sent = true @@ -240,9 +240,9 @@ func (lockable * SimpleLockable) Signal(context *ReadContext, signal GraphSignal } if lockable.owner != nil && owner_sent == false { if lockable.owner.ID() != lockable.ID() { - ctx.Log.Logf("signal", "SENDING_TO_OWNER: %s -> %s", lockable.ID(), lockable.owner.ID()) - return UseMoreStates(context, lockable, NewLockRequest(lockable.owner, []string{"signal"}), func(context *ReadContext) error { - return lockable.owner.Signal(ctx, signal, locked) + context.Graph.Log.Logf("signal", "SENDING_TO_OWNER: %s -> %s", lockable.ID(), lockable.owner.ID()) + return UseMoreStates(context, lockable, NewLockMap(NewLockInfo(lockable.owner, []string{"signal"})), func(context *ReadContext) error { + return lockable.owner.Signal(context, signal) }) } } @@ -250,11 +250,11 @@ func (lockable * SimpleLockable) Signal(context *ReadContext, signal GraphSignal }) case Down: err = UseMoreStates(context, lockable, NewLockMap( - NewLockInfo(lockable, []string{"requirements"}, - RequestList(lockable.requirements, []string{"signal"})), + NewLockInfo(lockable, []string{"requirements"}), + LockList(lockable.requirements, []string{"signal"}), ), func(context *ReadContext) error { for _, requirement := range(lockable.requirements) { - err := requirement.Signal(ctx, signal, locked) + err := requirement.Signal(context, signal) if err != nil { return err } @@ -318,9 +318,9 @@ func LinkLockables(context *WriteContext, princ Node, lockable Lockable, require found[requirement.ID()] = true } - return UpdateMoreStates(context, princ, NewInfoMap( + return UpdateMoreStates(context, princ, NewLockMap( NewLockInfo(lockable, []string{"requirements"}), - RequestList(requirements, []string{"dependencies"}), + LockList(requirements, []string{"dependencies"}), ), func(context *WriteContext) error { // Check that all the requirements can be added // If the lockable is already locked, need to lock this resource as well before we can add it @@ -356,7 +356,7 @@ func LinkLockables(context *WriteContext, princ Node, lockable Lockable, require for _, requirement := range(requirements) { requirement.AddDependency(lockable) lockable.AddRequirement(requirement) - ctx.Log.Logf("lockable", "LOCKABLE_LINK: linked %s to %s as a requirement", requirement.ID(), lockable.ID()) + context.Graph.Log.Logf("lockable", "LOCKABLE_LINK: linked %s to %s as a requirement", requirement.ID(), lockable.ID()) } // Return no error @@ -371,7 +371,7 @@ func checkIfRequirement(context *WriteContext, r Lockable, cur Lockable) bool { return true } is_requirement := false - UpdateMoreStates(context, cur, NewLockMap(NewLockRequest(c, []string{"requirements"})), func(context *WriteContext) error { + UpdateMoreStates(context, cur, NewLockMap(NewLockInfo(c, []string{"requirements"})), func(context *WriteContext) error { is_requirement = checkIfRequirement(context, cur, c) return nil }) @@ -406,10 +406,13 @@ func LockLockables(context *WriteContext, to_lock []Lockable, new_owner Lockable return nil } - return UpdateMoreStates(ctx, locked, new_owner, RequestList(to_lock, []string{"lock"}), func(locked NodeLockMap) error { + return UpdateMoreStates(context, new_owner, NewLockMap( + LockList(to_lock, []string{"lock"}), + NewLockInfo(new_owner, []string{}), + ), func(context *WriteContext) error { // First loop is to check that the states can be locked, and locks all requirements for _, req := range(to_lock) { - context.Context.Log.Logf("lockable", "LOCKABLE_LOCKING: %s from %s", req.ID(), new_owner.ID()) + context.Graph.Log.Logf("lockable", "LOCKABLE_LOCKING: %s from %s", req.ID(), new_owner.ID()) // Check custom lock conditions err := req.CanLock(new_owner) @@ -423,15 +426,15 @@ func LockLockables(context *WriteContext, to_lock []Lockable, new_owner Lockable if owner.ID() == new_owner.ID() { continue } else { - err := UpdateMoreStates(ctx, locked, new_owner, NewLockRequest(owner, []string{"take_lock"}), func(locked NodeLockMap)(error){ - return LockLockables(ctx, req.Requirements(), req, locked) + err := UpdateMoreStates(context, new_owner, NewLockMap(NewLockInfo(owner, []string{"take_lock"})), func(context *WriteContext)(error){ + return LockLockables(context, req.Requirements(), req) }) if err != nil { return err } } } else { - err := LockLockables(ctx, req.Requirements(), req, locked) + err := LockLockables(context, req.Requirements(), req) if err != nil { return err } @@ -443,17 +446,17 @@ func LockLockables(context *WriteContext, to_lock []Lockable, new_owner Lockable old_owner := req.Owner() // If the lockable was previously unowned, update the state if old_owner == nil { - context.Context.Log.Logf("lockable", "LOCKABLE_LOCK: %s locked %s", new_owner.ID(), req.ID()) + context.Graph.Log.Logf("lockable", "LOCKABLE_LOCK: %s locked %s", new_owner.ID(), req.ID()) req.SetOwner(new_owner) new_owner.RecordLock(req, old_owner) // Otherwise if the new owner already owns it, no need to update state } else if old_owner.ID() == new_owner.ID() { - context.Context.Log.Logf("lockable", "LOCKABLE_LOCK: %s already owns %s", new_owner.ID(), req.ID()) + context.Graph.Log.Logf("lockable", "LOCKABLE_LOCK: %s already owns %s", new_owner.ID(), req.ID()) // Otherwise update the state } else { req.SetOwner(new_owner) new_owner.RecordLock(req, old_owner) - context.Context.Log.Logf("lockable", "LOCKABLE_LOCK: %s took lock of %s from %s", new_owner.ID(), req.ID(), old_owner.ID()) + context.Graph.Log.Logf("lockable", "LOCKABLE_LOCK: %s took lock of %s from %s", new_owner.ID(), req.ID(), old_owner.ID()) } } return nil @@ -481,10 +484,13 @@ func UnlockLockables(context *WriteContext, to_unlock []Lockable, old_owner Lock return nil } - return UpdateMoreStates(ctx, locked, old_owner, RequestList(to_unlock, []string{"lock"}), func(locked NodeLockMap) error { + return UpdateMoreStates(context, old_owner, NewLockMap( + LockList(to_unlock, []string{"lock"}), + NewLockInfo(old_owner, []string{}), + ), func(context *WriteContext) error { // First loop is to check that the states can be locked, and locks all requirements for _, req := range(to_unlock) { - context.Context.Log.Logf("lockable", "LOCKABLE_UNLOCKING: %s from %s", req.ID(), old_owner.ID()) + context.Graph.Log.Logf("lockable", "LOCKABLE_UNLOCKING: %s from %s", req.ID(), old_owner.ID()) // Check if the owner is correct if req.Owner() != nil { @@ -501,7 +507,7 @@ func UnlockLockables(context *WriteContext, to_unlock []Lockable, old_owner Lock return err } - err = UnlockLockables(ctx, req.Requirements(), req, locked) + err = UnlockLockables(context, req.Requirements(), req) if err != nil { return err } @@ -512,9 +518,9 @@ func UnlockLockables(context *WriteContext, to_unlock []Lockable, old_owner Lock new_owner := old_owner.RecordUnlock(req) req.SetOwner(new_owner) if new_owner == nil { - context.Context.Log.Logf("lockable", "LOCKABLE_UNLOCK: %s unlocked %s", old_owner.ID(), req.ID()) + context.Graph.Log.Logf("lockable", "LOCKABLE_UNLOCK: %s unlocked %s", old_owner.ID(), req.ID()) } else { - context.Context.Log.Logf("lockable", "LOCKABLE_UNLOCK: %s passed lock of %s back to %s", old_owner.ID(), req.ID(), new_owner.ID()) + context.Graph.Log.Logf("lockable", "LOCKABLE_UNLOCK: %s passed lock of %s back to %s", old_owner.ID(), req.ID(), new_owner.ID()) } } diff --git a/node.go b/node.go index 8d93487..8625c22 100644 --- a/node.go +++ b/node.go @@ -54,24 +54,6 @@ func RandID() NodeID { return NodeID(uuid.New()) } -// A Node represents a data that can be locked and held by other Nodes -type Node interface { - ID() NodeID - Type() NodeType - Serialize() ([]byte, error) - - Allows(resouce, action string, principal Node) error - AddPolicy(Policy) error - RemovePolicy(Policy) error - - Signal(context *ReadContext, signal GraphSignal) error - - - Requirements() []Node - Dependencies() []Node - Owner() Node -} - // A Node represents data that can be read by multiple goroutines and written to by one, with a unique ID attached, and a method to process updates(including propagating them to connected nodes) // RegisterChannel and UnregisterChannel are used to connect arbitrary listeners to the node type Node interface { @@ -212,17 +194,17 @@ func (node * GraphNode) Type() NodeType { // Propagate the signal to registered listeners, if a listener isn't ready to receive the update // send it a notification that it was closed and then close it func (node * GraphNode) Signal(context *ReadContext, signal GraphSignal) error { - context.Context.Log.Logf("signal", "SIGNAL: %s - %s", node.ID(), signal.String()) + context.Graph.Log.Logf("signal", "SIGNAL: %s - %s", node.ID(), signal.String()) node.listeners_lock.Lock() defer node.listeners_lock.Unlock() closed := []NodeID{} for id, listener := range node.listeners { - context.Context.Log.Logf("signal", "UPDATE_LISTENER %s: %p", node.ID(), listener) + context.Graph.Log.Logf("signal", "UPDATE_LISTENER %s: %p", node.ID(), listener) select { case listener <- signal: default: - context.Context.Log.Logf("signal", "CLOSED_LISTENER %s: %p", node.ID(), listener) + context.Graph.Log.Logf("signal", "CLOSED_LISTENER %s: %p", node.ID(), listener) go func(node Node, listener chan GraphSignal) { listener <- NewDirectSignal("listener_closed") close(listener) @@ -312,10 +294,13 @@ func getNodeBytes(node Node) ([]byte, error) { // Write multiple nodes to the database in a single transaction func WriteNodes(context *WriteContext) error { - if locked == nil { + if context == nil { + return fmt.Errorf("Cannot write nil to DB") + } + if context.Locked == nil { return fmt.Errorf("Cannot write nil map to DB") } - context.Context.Log.Logf("db", "DB_WRITES: %d", len(context.Locked)) + context.Graph.Log.Logf("db", "DB_WRITES: %d", len(context.Locked)) serialized_bytes := make([][]byte, len(context.Locked)) serialized_ids := make([][]byte, len(context.Locked)) @@ -323,7 +308,7 @@ func WriteNodes(context *WriteContext) error { for _, lock := range(context.Locked) { node := lock.Node node_bytes, err := getNodeBytes(node) - context.Context.Log.Logf("db", "DB_WRITE: %+v", node) + context.Graph.Log.Logf("db", "DB_WRITE: %+v", node) if err != nil { return err } @@ -336,7 +321,7 @@ func WriteNodes(context *WriteContext) error { i++ } - err := context.Context.DB.Update(func(txn *badger.Txn) error { + err := context.Graph.DB.Update(func(txn *badger.Txn) error { for i, id := range(serialized_ids) { err := txn.Set(id, serialized_bytes[i]) if err != nil { @@ -425,36 +410,34 @@ func LoadNodeRecurse(ctx * Context, id NodeID, nodes NodeMap) (Node, error) { return node, nil } -func NewLockInfo(node Node, resources []string) LockInfo { - return LockInfo{ - Node: node, - Resources: resources, +func NewLockInfo(node Node, resources []string) LockMap { + return LockMap{ + node.ID(): LockInfo{ + Node: node, + Resources: resources, + }, } } -type LockInfoList interface { - List() []LockInfo -} - -func NewLockMap(requests ...LockInfoList) LockMap { +func NewLockMap(requests ...LockMap) LockMap { reqs := LockMap{} for _, req := range(requests) { - for _, info := range(req) { - reqs[req.Node.ID()] = info + for id, info := range(req) { + reqs[id] = info } } return reqs } -func RequestList[K Node](list []K, resources []string) LockList { - requests := make(LockList{}, len(list)) - for i, node := range(list) { - requests[i] = LockInfo{ +func LockList[K Node](list []K, resources []string) LockMap { + reqs := LockMap{} + for _, node := range(list) { + reqs[node.ID()] = LockInfo{ Node: node, Resources: resources, } } - return requests + return reqs } @@ -464,26 +447,8 @@ type LockInfo struct { Node Node Resources []string } -func (info LockInfo) List() []LockInfo { - return []LockInfo{info} -} type LockMap map[NodeID]LockInfo -func (m LockMap) List() []LockInfo { - infos := make([]LockInfo, len(m)) - i := 0 - for _, info := range(m) { - infos[i] = info - i += 1 - } - - return infos -} - -type LockList []LockInfo -func (li LockList) List() []LockInfo { - return li -} type ReadContext struct { Graph *Context @@ -516,7 +481,7 @@ func del[K comparable](list []K, val K) []K { // Start a read context for node under ctx for the resources specified in init_nodes, then run nodes_fn func UseStates(ctx *Context, node Node, nodes LockMap, read_fn ReadFn) error { context := &ReadContext{ - Context: ctx, + Graph: ctx, Locked: LockMap{}, } return UseMoreStates(context, node, nodes, read_fn) @@ -579,7 +544,7 @@ func UseMoreStates(context *ReadContext, node Node, new_nodes LockMap, read_fn R new_perms = del(new_perms, resource) } cur_perms.Resources = new_perms - context.Locked[request.Node.ID()].Resources = new_perms + context.Locked[request.Node.ID()] = cur_perms } for _, node := range(locked_nodes) { @@ -593,10 +558,10 @@ func UseMoreStates(context *ReadContext, node Node, new_nodes LockMap, read_fn R // Initiate a write context for nodes and call nodes_fn with nodes locked for read func UpdateStates(ctx *Context, node Node, nodes LockMap, write_fn WriteFn) error { context := &WriteContext{ - Context: ctx, + Graph: ctx, Locked: LockMap{}, } - err := UpdateMoreStates(context, node, nodes, nodes_fn) + err := UpdateMoreStates(context, node, nodes, write_fn) if err == nil { err = WriteNodes(context) } @@ -609,13 +574,13 @@ func UpdateStates(ctx *Context, node Node, nodes LockMap, write_fn WriteFn) erro } // Add nodes to an existing write context and call nodes_fn with nodes locked for read -func UpdateMoreStates(ctx *Context, locked LockMap, node Node, new_nodes LockMap, write_fn WriteFn) error { +func UpdateMoreStates(context *WriteContext, node Node, new_nodes LockMap, write_fn WriteFn) error { new_permissions := LockMap{} for _, request := range(new_nodes) { id := request.Node.ID() new_permissions[id] = LockInfo{Node: request.Node, Resources: []string{}} for _, resource := range(request.Resources) { - current_permissions, exists := locked[id] + current_permissions, exists := context.Locked[id] if exists == true { already_granted := false for _, r := range(current_permissions.Resources) { @@ -641,18 +606,18 @@ func UpdateMoreStates(ctx *Context, locked LockMap, node Node, new_nodes LockMap req_perms, exists := new_permissions[id] if exists == true { - cur_perms, already_locked := locked[id] + cur_perms, already_locked := context.Locked[id] if already_locked == false { request.Node.Lock() - locked[id] = req_perms + context.Locked[id] = req_perms } else { cur_perms.Resources = append(cur_perms.Resources, req_perms.Resources...) - locked[id] = cur_perms + context.Locked[id] = cur_perms } } } - return write_fn(locked) + return write_fn(context) } // Create a new channel with a buffer the size of buffer, and register it to node with the id diff --git a/thread.go b/thread.go index 0de353e..4959dd2 100644 --- a/thread.go +++ b/thread.go @@ -7,7 +7,6 @@ import ( "errors" "reflect" "encoding/json" - "github.com/google/uuid" ) // Assumed that thread is already locked for signal @@ -19,11 +18,9 @@ func (thread *SimpleThread) Signal(context *ReadContext, signal GraphSignal) err switch signal.Direction() { case Up: - err = UseMoreStates(ctx, locked, thread, NewLockMap( - NewLockRequest(thread, []string{"parent"}), - ), func(context *ReadContext) error { + err = UseMoreStates(context, thread, NewLockInfo(thread, []string{"parent"}), func(context *ReadContext) error { if thread.parent != nil { - return UseMoreStates(ctx, locked, thread, NewLockRequest(thread.parent, []string{"signal"}), func(context *ReadContext) error { + return UseMoreStates(context, thread, NewLockInfo(thread.parent, []string{"signal"}), func(context *ReadContext) error { return thread.parent.Signal(context, signal) }) } else { @@ -31,9 +28,9 @@ func (thread *SimpleThread) Signal(context *ReadContext, signal GraphSignal) err } }) case Down: - err = UseMoreStates(ctx, locked, thread, NewLockMap( - NewLockRequest(thread, []string{"children"}), - RequestList(thread.childre, []string{"signal"}), + err = UseMoreStates(context, thread, NewLockMap( + NewLockInfo(thread, []string{"children"}), + LockList(thread.children, []string{"signal"}), ), func(context *ReadContext) error { for _, child := range(thread.children) { err := child.Signal(context, signal) @@ -178,9 +175,9 @@ func checkIfChild(context *WriteContext, target Thread, cur Thread) bool { return true } is_child := false - UpdateMoreStates(ctx, locked, cur, NewLockMap( - NewLockRequest(child, []string{"children"}), - ), func(locked NodeLockMap) error { + UpdateMoreStates(context, cur, NewLockMap( + NewLockInfo(child, []string{"children"}), + ), func(context *WriteContext) error { is_child = checkIfChild(context, target, child) return nil }) @@ -193,7 +190,7 @@ func checkIfChild(context *WriteContext, target Thread, cur Thread) bool { } func LinkThreads(context *WriteContext, princ Node, thread Thread, child Thread, info ThreadInfo) error { - if ctx == nil || thread == nil || child == nil { + if context == nil || thread == nil || child == nil { return fmt.Errorf("invalid input") } @@ -201,10 +198,7 @@ func LinkThreads(context *WriteContext, princ Node, thread Thread, child Thread, return fmt.Errorf("Will not link %s as a child of itself", thread.ID()) } - return UpdateMoreStates(context, princ, NewNodeMap( - NewLockInfo(child, []string{"parent", "children"}), - NewLockInfo(thread, []string{"parent", "children"}), - ), func(context *WriteContext) { + return UpdateMoreStates(context, princ, LockList([]Node{child, thread}, []string{"parent", "children"}), func(context *WriteContext) error { if child.Parent() != nil { return fmt.Errorf("EVENT_LINK_ERR: %s already has a parent, cannot link as child", child.ID()) } @@ -465,8 +459,8 @@ func FindChild(context *ReadContext, princ Node, thread Thread, id NodeID) Threa for _, child := range thread.Children() { var result Thread = nil - UseMoreStates(context, princ, NewLockRequest(child, []string{"children"}), func(locked NodeLockMap) error { - result = FindChild(ctx, princ, child, id, locked) + UseMoreStates(context, princ, NewLockInfo(child, []string{"children"}), func(context *ReadContext) error { + result = FindChild(context, princ, child, id) return nil }) if result != nil { @@ -521,12 +515,12 @@ func ThreadLoop(ctx * Context, thread Thread, first_action string) error { return err } - err = UpdateStates(ctx, thread, NewLockRequest(thread, []string{"state"}), func(locked NodeLockMap) error { + err = UpdateStates(ctx, thread, NewLockInfo(thread, []string{"state"}), func(context *WriteContext) error { err := thread.SetState("finished") if err != nil { return err } - return UnlockLockables(ctx, []Lockable{thread}, thread, locked) + return UnlockLockables(context, []Lockable{thread}, thread) }) if err != nil { @@ -585,12 +579,12 @@ func (thread * SimpleThread) AllowedToTakeLock(new_owner Lockable, lockable Lock } func ThreadStartChild(ctx *Context, thread Thread, signal StartChildSignal) error { - return UpdateStates(ctx, thread, NewLockRequest(thread, []string{"children"}), func(locked NodeLockMap) error { + return UpdateStates(ctx, thread, NewLockInfo(thread, []string{"children"}), func(context *WriteContext) error { child := thread.Child(signal.ID) if child == nil { return fmt.Errorf("%s is not a child of %s", signal.ID, thread.ID()) } - return UpdateMoreStates(ctx, locked, thread, NewLockRequest(child, []string{"start"}), func(locked NodeLockMap) error { + return UpdateMoreStates(context, thread, NewLockInfo(child, []string{"start"}), func(context *WriteContext) error { info := thread.ChildInfo(signal.ID).(*ParentThreadInfo) info.Start = true @@ -602,29 +596,30 @@ func ThreadStartChild(ctx *Context, thread Thread, signal StartChildSignal) erro } func ThreadRestore(ctx * Context, thread Thread) { - UpdateStates(ctx, thread, NewLockRequest(thread, []string{"children"}), func(locked NodeLockMap)(error) { - return UpdateMoreStates(ctx, locked, thread, RequestList(thread.Children(), []string{"start"}), func(locked NodeLockMap) error { - for _, child := range(thread.Children()) { - should_run := (thread.ChildInfo(child.ID())).(ParentInfo).Parent() - ctx.Log.Logf("thread", "THREAD_RESTORE: %s -> %s: %+v", thread.ID(), child.ID(), should_run) - if should_run.Start == true && child.State() != "finished" { - ctx.Log.Logf("thread", "THREAD_RESTORED: %s -> %s", thread.ID(), child.ID()) - ChildGo(ctx, thread, child, should_run.RestoreAction) - } + UpdateStates(ctx, thread, NewLockMap( + NewLockInfo(thread, []string{"children"}), + LockList(thread.Children(), []string{"start"}), + ), func(context *WriteContext)(error) { + for _, child := range(thread.Children()) { + should_run := (thread.ChildInfo(child.ID())).(ParentInfo).Parent() + ctx.Log.Logf("thread", "THREAD_RESTORE: %s -> %s: %+v", thread.ID(), child.ID(), should_run) + if should_run.Start == true && child.State() != "finished" { + ctx.Log.Logf("thread", "THREAD_RESTORED: %s -> %s", thread.ID(), child.ID()) + ChildGo(ctx, thread, child, should_run.RestoreAction) } - return nil - }) + } + return nil }) } func ThreadStart(ctx * Context, thread Thread) error { - return UpdateStates(ctx, thread, NewLockRequest(thread, []string{"start", "lock"}), func(locked NodeLockMap) error { + return UpdateStates(ctx, thread, NewLockInfo(thread, []string{"start", "lock"}), func(context *WriteContext) error { owner_id := NodeID{} if thread.Owner() != nil { owner_id = thread.Owner().ID() } if owner_id != thread.ID() { - err := LockLockables(ctx, []Lockable{thread}, thread, locked) + err := LockLockables(context, []Lockable{thread}, thread) if err != nil { return err } @@ -680,8 +675,8 @@ var ThreadAbortedError = errors.New("Thread aborted by signal") // Default thread abort is to return a ThreadAbortedError func ThreadAbort(ctx * Context, thread Thread, signal GraphSignal) (string, error) { - err := UseStates(ctx, thread, NewLockRequest(thread, []string{"signal"}), func(locked NodeLockMap) error { - return thread.Signal(ctx, NewStatusSignal("aborted", thread.ID()), locked) + err := UseStates(ctx, thread, NewLockInfo(thread, []string{"signal"}), func(context *ReadContext) error { + return thread.Signal(context, NewStatusSignal("aborted", thread.ID())) }) if err != nil { return "", err @@ -691,8 +686,8 @@ func ThreadAbort(ctx * Context, thread Thread, signal GraphSignal) (string, erro // Default thread cancel is to finish the thread func ThreadCancel(ctx * Context, thread Thread, signal GraphSignal) (string, error) { - err := UseStates(ctx, thread, NewLockRequest(thread, []string{"signal"}), func(locked NodeLockMap) error { - return thread.Signal(ctx, NewSignal("cancelled"), locked) + err := UseStates(ctx, thread, NewLockInfo(thread, []string{"signal"}), func(context *ReadContext) error { + return thread.Signal(context, NewSignal("cancelled")) }) return "", err }