Fixed compile errors

graph-rework-2
noah metz 2023-07-22 21:24:54 -06:00
parent e347a3f232
commit 575912d56f
7 changed files with 161 additions and 198 deletions

@ -204,12 +204,12 @@ func AuthHandler(ctx *Context, server *GQLThread) func(http.ResponseWriter, *htt
ctx.Log.Logf("gql", "AUTHORIZING NEW USER %s - %s", key_id, shared)
new_user := NewUser(fmt.Sprintf("GQL_USER %s", key_id.String()), time.Now(), remote_id, shared, []string{"gql"})
err := UpdateStates(ctx, server, NewLockMap(LockList{
LockInfo{
err := UpdateStates(ctx, server, NewLockMap(LockMap{
server.ID(): LockInfo{
Node: server,
Resources: []string{"users"},
},
LockInfo{
new_user.ID(): LockInfo{
Node: &new_user,
Resources: []string{""},
},

@ -32,12 +32,12 @@ var GQLMutationAbort = NewField(func()*graphql.Field {
err = UseStates(ctx.Context, ctx.User, NewLockMap(
NewLockInfo(ctx.Server, []string{"children"}),
), func(context *ReadContext) (error){
node = FindChild(ctx.Context, ctx.User, ctx.Server, id, locked)
node = FindChild(context, ctx.User, ctx.Server, id)
if node == nil {
return fmt.Errorf("Failed to find ID: %s as child of server thread", id)
}
return UseMoreStates(ctx.Context, locked, ctx.User, NewLockInfo(node, []string{"signal"}), func(locked NodeLockMap) error {
return node.Signal(ctx.Context, AbortSignal, locked)
return UseMoreStates(context, ctx.User, NewLockMap(NewLockInfo(node, []string{"signal"})), func(context *ReadContext) error {
return node.Signal(context, AbortSignal)
})
})
if err != nil {
@ -91,7 +91,7 @@ var GQLMutationStartChild = NewField(func()*graphql.Field{
err = UseStates(ctx.Context, ctx.User, NewLockMap(
NewLockInfo(ctx.Server, []string{"children"}),
), func(context *ReadContext) error {
node := FindChild(ctx.Context, ctx.User, ctx.Server, parent_id, locked)
node := FindChild(context, ctx.User, ctx.Server, parent_id)
if node == nil {
return fmt.Errorf("Failed to find ID: %s as child of server thread", parent_id)
}
@ -101,9 +101,9 @@ var GQLMutationStartChild = NewField(func()*graphql.Field{
return err
}
return UseMoreStates(ctx.Context, locked, ctx.User, NewLockInfo(node, []string{"start_child", "signal"}), func(locked NodeLockMap) error {
return UseMoreStates(context, ctx.User, NewLockMap(NewLockInfo(node, []string{"start_child", "signal"})), func(context *ReadContext) error {
signal = NewStartChildSignal(child_id, action)
return node.Signal(ctx.Context, signal, locked)
return node.Signal(context, signal)
})
})
if err != nil {

@ -53,7 +53,7 @@ func GQLNodeID(p graphql.ResolveParams) (interface{}, error) {
return nil, fmt.Errorf("Failed to cast source to Node")
}
err = UseStates(ctx.Context, ctx.User, NewLockRequest(node, []string{"id"}), func(locked NodeLockMap) error {
err = UseStates(ctx.Context, ctx.User, NewLockMap(NewLockInfo(node, []string{"id"})), func(context *ReadContext) error {
return nil
})
if err != nil {
@ -76,7 +76,7 @@ func GQLThreadListen(p graphql.ResolveParams) (interface{}, error) {
listen := ""
err = UseStates(ctx.Context, ctx.User, NewLockRequest(node, []string{"listen"}), func(locked NodeLockMap) error {
err = UseStates(ctx.Context, ctx.User, NewLockMap(NewLockInfo(node, []string{"listen"})), func(context *ReadContext) error {
listen = node.Listen
return nil
})
@ -100,7 +100,7 @@ func GQLThreadParent(p graphql.ResolveParams) (interface{}, error) {
}
var parent Thread = nil
err = UseStates(ctx.Context, ctx.User, NewLockRequest(node, []string{"parent"}), func(locked NodeLockMap) error {
err = UseStates(ctx.Context, ctx.User, NewLockMap(NewLockInfo(node, []string{"parent"})), func(context *ReadContext) error {
parent = node.Parent()
return nil
})
@ -124,7 +124,7 @@ func GQLThreadState(p graphql.ResolveParams) (interface{}, error) {
}
var state string
err = UseStates(ctx.Context, ctx.User, NewLockRequest(node, []string{"state"}), func(locked NodeLockMap) error {
err = UseStates(ctx.Context, ctx.User, NewLockMap(NewLockInfo(node, []string{"state"})), func(context *ReadContext) error {
state = node.State()
return nil
})
@ -148,7 +148,7 @@ func GQLThreadChildren(p graphql.ResolveParams) (interface{}, error) {
}
var children []Thread = nil
err = UseStates(ctx.Context, ctx.User, NewLockRequest(node, []string{"children"}), func(locked NodeLockMap) error {
err = UseStates(ctx.Context, ctx.User, NewLockMap(NewLockInfo(node, []string{"children"})), func(context *ReadContext) error {
children = node.Children()
return nil
})
@ -172,7 +172,7 @@ func GQLLockableName(p graphql.ResolveParams) (interface{}, error) {
}
name := ""
err = UseStates(ctx.Context, ctx.User, NewLockRequest(node, []string{"name"}), func(locked NodeLockMap) error {
err = UseStates(ctx.Context, ctx.User, NewLockMap(NewLockInfo(node, []string{"name"})), func(context *ReadContext) error {
name = node.Name()
return nil
})
@ -196,7 +196,7 @@ func GQLLockableRequirements(p graphql.ResolveParams) (interface{}, error) {
}
var requirements []Lockable = nil
err = UseStates(ctx.Context, ctx.User, NewLockRequest(node, []string{"requirements"}), func(locked NodeLockMap) error {
err = UseStates(ctx.Context, ctx.User, NewLockMap(NewLockInfo(node, []string{"requirements"})), func(context *ReadContext) error {
requirements = node.Requirements()
return nil
})
@ -220,7 +220,7 @@ func GQLLockableDependencies(p graphql.ResolveParams) (interface{}, error) {
}
var dependencies []Lockable = nil
err = UseStates(ctx.Context, ctx.User, NewLockRequest(node, []string{"dependencies"}), func(locked NodeLockMap) error {
err = UseStates(ctx.Context, ctx.User, NewLockMap(NewLockInfo(node, []string{"dependencies"})), func(context *ReadContext) error {
dependencies = node.Dependencies()
return nil
})
@ -244,7 +244,7 @@ func GQLLockableOwner(p graphql.ResolveParams) (interface{}, error) {
}
var owner Node = nil
err = UseStates(ctx.Context, ctx.User, NewLockRequest(node, []string{"owner"}), func(locked NodeLockMap) error {
err = UseStates(ctx.Context, ctx.User, NewLockMap(NewLockInfo(node, []string{"owner"})), func(context *ReadContext) error {
owner = node.Owner()
return nil
})
@ -268,7 +268,7 @@ func GQLThreadUsers(p graphql.ResolveParams) (interface{}, error) {
}
var users []*User
err = UseStates(ctx.Context, ctx.User, NewLockRequest(node, []string{"users"}), func(locked NodeLockMap) error {
err = UseStates(ctx.Context, ctx.User, NewLockMap(NewLockInfo(node, []string{"users"})), func(context *ReadContext) error {
users = make([]*User, len(node.Users))
i := 0
for _, user := range(node.Users) {

@ -31,24 +31,25 @@ func TestGQLThread(t * testing.T) {
t2_r := NewSimpleThread(RandID(), "Test thread 2", "init", nil, BaseThreadActions, BaseThreadHandlers)
t2 := &t2_r
err = UpdateStates(ctx, gql_t, RequestList([]Node{t1, t2}, []string{"parent"}), func(locked NodeLockMap) error {
return UpdateMoreStates(ctx, locked, gql_t, NewLockRequest(gql_t, []string{"children"}), func(locked NodeLockMap) error {
err = UpdateStates(ctx, gql_t, NewLockMap(
LockList([]Node{t1, t2}, []string{"parent"}),
NewLockInfo(gql_t, []string{"children"}),
), func(context *WriteContext) error {
i1 := NewParentThreadInfo(true, "start", "restore")
err := LinkThreads(ctx, gql_t, t1, &i1, locked)
err := LinkThreads(context, gql_t, gql_t, t1, &i1)
if err != nil {
return err
}
i2 := NewParentThreadInfo(false, "start", "restore")
return LinkThreads(ctx, gql_t, t2, &i2, locked)
})
return LinkThreads(context, gql_t, gql_t, t2, &i2)
})
fatalErr(t, err)
go func(thread Thread){
time.Sleep(10*time.Millisecond)
err := UseStates(ctx, thread, NewLockRequest(thread, []string{"signal"}), func(locked NodeLockMap) error {
return thread.Signal(ctx, CancelSignal, locked)
err := UseStates(ctx, thread, NewLockInfo(thread, []string{"signal"}), func(context *ReadContext) error {
return thread.Signal(context, CancelSignal)
})
fatalErr(t, err)
}(gql_t)
@ -83,8 +84,9 @@ func TestGQLDBLoad(t * testing.T) {
gql := &gql_r
info := NewParentThreadInfo(true, "start", "restore")
err = UpdateStates(ctx, gql, NewLockRequest(gql, []string{"policies", "users", "requirements", "children"}), func(locked NodeLockMap) error {
return UpdateMoreStates(ctx, locked, gql, RequestList([]Node{u1, p1}, []string{}), func(locked NodeLockMap) error {
err = UpdateStates(ctx, gql, NewLockMap(
NewLockInfo(gql, []string{"policies", "users"}),
), func(context *WriteContext) error {
err := gql.AddPolicy(p1)
if err != nil {
return err
@ -92,25 +94,20 @@ func TestGQLDBLoad(t * testing.T) {
gql.Users[KeyID(&u1_key.PublicKey)] = u1
return UpdateMoreStates(ctx, locked, gql, NewLockRequest(t1, []string{"parent"}), func(locked NodeLockMap) error {
err := LinkThreads(ctx, gql, t1, &info, locked)
err = LinkThreads(context, gql, gql, t1, &info)
if err != nil {
return err
}
return UpdateMoreStates(ctx, locked, gql, NewLockRequest(l1, []string{"dependencies"}), func(locked NodeLockMap) error {
return LinkLockables(ctx, gql, gql, []Lockable{l1}, locked)
})
})
})
return LinkLockables(context, gql, gql, []Lockable{l1})
})
fatalErr(t, err)
err = UseStates(ctx, gql, NewLockRequest(gql, []string{"signal"}), func(locked NodeLockMap) error {
err := gql.Signal(ctx, NewStatusSignal("child_linked", t1.ID()), locked)
err = UseStates(ctx, gql, NewLockInfo(gql, []string{"signal"}), func(context *ReadContext) error {
err := gql.Signal(context, NewStatusSignal("child_linked", t1.ID()))
if err != nil {
return nil
}
return gql.Signal(ctx, CancelSignal, locked)
return gql.Signal(context, CancelSignal)
})
fatalErr(t, err)
@ -125,7 +122,7 @@ func TestGQLDBLoad(t * testing.T) {
(*GraphTester)(t).WaitForValue(ctx, update_channel, "thread_aborted", 100*time.Millisecond, "Didn't receive thread_abort from t1 on t1")
err = UseStates(ctx, gql, RequestList([]Node{gql, u1}, nil), func(locked NodeLockMap) error {
err = UseStates(ctx, gql, LockList([]Node{gql, u1}, nil), func(context *ReadContext) error {
ser1, err := gql.Serialize()
ser2, err := u1.Serialize()
ctx.Log.Logf("test", "\n%s\n\n", ser1)
@ -138,19 +135,19 @@ func TestGQLDBLoad(t * testing.T) {
var t1_loaded *SimpleThread = nil
var update_channel_2 chan GraphSignal
err = UseStates(ctx, gql, NewLockRequest(gql_loaded, []string{"users", "children"}), func(locked NodeLockMap) error {
err = UseStates(ctx, gql, NewLockInfo(gql_loaded, []string{"users", "children"}), func(context *ReadContext) error {
ser, err := gql_loaded.Serialize()
ctx.Log.Logf("test", "\n%s\n\n", ser)
u_loaded := gql_loaded.(*GQLThread).Users[u1.ID()]
child := gql_loaded.(Thread).Children()[0].(*SimpleThread)
t1_loaded = child
update_channel_2 = UpdateChannel(t1_loaded, 10, NodeID{})
err = UseMoreStates(ctx, locked, gql, NewLockRequest(u_loaded, nil), func(locked NodeLockMap) error {
err = UseMoreStates(context, gql, NewLockInfo(u_loaded, nil), func(context *ReadContext) error {
ser, err := u_loaded.Serialize()
ctx.Log.Logf("test", "\n%s\n\n", ser)
return err
})
gql_loaded.Signal(ctx, AbortSignal, locked)
gql_loaded.Signal(context, AbortSignal)
return err
})
@ -176,14 +173,14 @@ func TestGQLAuth(t * testing.T) {
gql_t := &gql_t_r
// p1 not written to DB, TODO: update write to follow links maybe
err = UpdateStates(ctx, gql_t, NewLockRequest(gql_t, []string{"policies"}), func(locked NodeLockMap) error {
err = UpdateStates(ctx, gql_t, NewLockInfo(gql_t, []string{"policies"}), func(context *WriteContext) error {
return gql_t.AddPolicy(p1)
})
done := make(chan error, 1)
var update_channel chan GraphSignal
err = UseStates(ctx, gql_t, NewLockRequest(gql_t, nil), func(locked NodeLockMap) error {
err = UseStates(ctx, gql_t, NewLockInfo(gql_t, nil), func(context *ReadContext) error {
update_channel = UpdateChannel(gql_t, 10, NodeID{})
return nil
})
@ -197,8 +194,8 @@ func TestGQLAuth(t * testing.T) {
case <-done:
ctx.Log.Logf("test", "DONE")
}
err := UseStates(ctx, gql_t, NewLockRequest(gql_t, []string{"signal}"}), func(locked NodeLockMap) error {
return thread.Signal(ctx, CancelSignal, locked)
err := UseStates(ctx, gql_t, NewLockInfo(gql_t, []string{"signal}"}), func(context *ReadContext) error {
return thread.Signal(context, CancelSignal)
})
fatalErr(t, err)
}(done, gql_t)

@ -217,21 +217,21 @@ func (lockable * SimpleLockable) CanUnlock(new_owner Lockable) error {
// Assumed that lockable is already locked for signal
func (lockable * SimpleLockable) Signal(context *ReadContext, signal GraphSignal) error {
err := lockable.GraphNode.Signal(ctx, signal, locked)
err := lockable.GraphNode.Signal(context, signal)
if err != nil {
return err
}
switch signal.Direction() {
case Up:
err = UseMoreStates(ctx, locked, lockable, NewLockMap(
err = UseMoreStates(context, lockable, NewLockMap(
NewLockInfo(lockable, []string{"dependencies", "owner"}),
RequestList(lockable.requirements, []string{"signal"}),
LockList(lockable.requirements, []string{"signal"}),
), func(context *ReadContext) error {
owner_sent := false
for _, dependency := range(lockable.dependencies) {
ctx.Log.Logf("signal", "SENDING_TO_DEPENDENCY: %s -> %s", lockable.ID(), dependency.ID())
dependency.Signal(ctx, signal, locked)
context.Graph.Log.Logf("signal", "SENDING_TO_DEPENDENCY: %s -> %s", lockable.ID(), dependency.ID())
dependency.Signal(context, signal)
if lockable.owner != nil {
if dependency.ID() == lockable.owner.ID() {
owner_sent = true
@ -240,9 +240,9 @@ func (lockable * SimpleLockable) Signal(context *ReadContext, signal GraphSignal
}
if lockable.owner != nil && owner_sent == false {
if lockable.owner.ID() != lockable.ID() {
ctx.Log.Logf("signal", "SENDING_TO_OWNER: %s -> %s", lockable.ID(), lockable.owner.ID())
return UseMoreStates(context, lockable, NewLockRequest(lockable.owner, []string{"signal"}), func(context *ReadContext) error {
return lockable.owner.Signal(ctx, signal, locked)
context.Graph.Log.Logf("signal", "SENDING_TO_OWNER: %s -> %s", lockable.ID(), lockable.owner.ID())
return UseMoreStates(context, lockable, NewLockMap(NewLockInfo(lockable.owner, []string{"signal"})), func(context *ReadContext) error {
return lockable.owner.Signal(context, signal)
})
}
}
@ -250,11 +250,11 @@ func (lockable * SimpleLockable) Signal(context *ReadContext, signal GraphSignal
})
case Down:
err = UseMoreStates(context, lockable, NewLockMap(
NewLockInfo(lockable, []string{"requirements"},
RequestList(lockable.requirements, []string{"signal"})),
NewLockInfo(lockable, []string{"requirements"}),
LockList(lockable.requirements, []string{"signal"}),
), func(context *ReadContext) error {
for _, requirement := range(lockable.requirements) {
err := requirement.Signal(ctx, signal, locked)
err := requirement.Signal(context, signal)
if err != nil {
return err
}
@ -318,9 +318,9 @@ func LinkLockables(context *WriteContext, princ Node, lockable Lockable, require
found[requirement.ID()] = true
}
return UpdateMoreStates(context, princ, NewInfoMap(
return UpdateMoreStates(context, princ, NewLockMap(
NewLockInfo(lockable, []string{"requirements"}),
RequestList(requirements, []string{"dependencies"}),
LockList(requirements, []string{"dependencies"}),
), func(context *WriteContext) error {
// Check that all the requirements can be added
// If the lockable is already locked, need to lock this resource as well before we can add it
@ -356,7 +356,7 @@ func LinkLockables(context *WriteContext, princ Node, lockable Lockable, require
for _, requirement := range(requirements) {
requirement.AddDependency(lockable)
lockable.AddRequirement(requirement)
ctx.Log.Logf("lockable", "LOCKABLE_LINK: linked %s to %s as a requirement", requirement.ID(), lockable.ID())
context.Graph.Log.Logf("lockable", "LOCKABLE_LINK: linked %s to %s as a requirement", requirement.ID(), lockable.ID())
}
// Return no error
@ -371,7 +371,7 @@ func checkIfRequirement(context *WriteContext, r Lockable, cur Lockable) bool {
return true
}
is_requirement := false
UpdateMoreStates(context, cur, NewLockMap(NewLockRequest(c, []string{"requirements"})), func(context *WriteContext) error {
UpdateMoreStates(context, cur, NewLockMap(NewLockInfo(c, []string{"requirements"})), func(context *WriteContext) error {
is_requirement = checkIfRequirement(context, cur, c)
return nil
})
@ -406,10 +406,13 @@ func LockLockables(context *WriteContext, to_lock []Lockable, new_owner Lockable
return nil
}
return UpdateMoreStates(ctx, locked, new_owner, RequestList(to_lock, []string{"lock"}), func(locked NodeLockMap) error {
return UpdateMoreStates(context, new_owner, NewLockMap(
LockList(to_lock, []string{"lock"}),
NewLockInfo(new_owner, []string{}),
), func(context *WriteContext) error {
// First loop is to check that the states can be locked, and locks all requirements
for _, req := range(to_lock) {
context.Context.Log.Logf("lockable", "LOCKABLE_LOCKING: %s from %s", req.ID(), new_owner.ID())
context.Graph.Log.Logf("lockable", "LOCKABLE_LOCKING: %s from %s", req.ID(), new_owner.ID())
// Check custom lock conditions
err := req.CanLock(new_owner)
@ -423,15 +426,15 @@ func LockLockables(context *WriteContext, to_lock []Lockable, new_owner Lockable
if owner.ID() == new_owner.ID() {
continue
} else {
err := UpdateMoreStates(ctx, locked, new_owner, NewLockRequest(owner, []string{"take_lock"}), func(locked NodeLockMap)(error){
return LockLockables(ctx, req.Requirements(), req, locked)
err := UpdateMoreStates(context, new_owner, NewLockMap(NewLockInfo(owner, []string{"take_lock"})), func(context *WriteContext)(error){
return LockLockables(context, req.Requirements(), req)
})
if err != nil {
return err
}
}
} else {
err := LockLockables(ctx, req.Requirements(), req, locked)
err := LockLockables(context, req.Requirements(), req)
if err != nil {
return err
}
@ -443,17 +446,17 @@ func LockLockables(context *WriteContext, to_lock []Lockable, new_owner Lockable
old_owner := req.Owner()
// If the lockable was previously unowned, update the state
if old_owner == nil {
context.Context.Log.Logf("lockable", "LOCKABLE_LOCK: %s locked %s", new_owner.ID(), req.ID())
context.Graph.Log.Logf("lockable", "LOCKABLE_LOCK: %s locked %s", new_owner.ID(), req.ID())
req.SetOwner(new_owner)
new_owner.RecordLock(req, old_owner)
// Otherwise if the new owner already owns it, no need to update state
} else if old_owner.ID() == new_owner.ID() {
context.Context.Log.Logf("lockable", "LOCKABLE_LOCK: %s already owns %s", new_owner.ID(), req.ID())
context.Graph.Log.Logf("lockable", "LOCKABLE_LOCK: %s already owns %s", new_owner.ID(), req.ID())
// Otherwise update the state
} else {
req.SetOwner(new_owner)
new_owner.RecordLock(req, old_owner)
context.Context.Log.Logf("lockable", "LOCKABLE_LOCK: %s took lock of %s from %s", new_owner.ID(), req.ID(), old_owner.ID())
context.Graph.Log.Logf("lockable", "LOCKABLE_LOCK: %s took lock of %s from %s", new_owner.ID(), req.ID(), old_owner.ID())
}
}
return nil
@ -481,10 +484,13 @@ func UnlockLockables(context *WriteContext, to_unlock []Lockable, old_owner Lock
return nil
}
return UpdateMoreStates(ctx, locked, old_owner, RequestList(to_unlock, []string{"lock"}), func(locked NodeLockMap) error {
return UpdateMoreStates(context, old_owner, NewLockMap(
LockList(to_unlock, []string{"lock"}),
NewLockInfo(old_owner, []string{}),
), func(context *WriteContext) error {
// First loop is to check that the states can be locked, and locks all requirements
for _, req := range(to_unlock) {
context.Context.Log.Logf("lockable", "LOCKABLE_UNLOCKING: %s from %s", req.ID(), old_owner.ID())
context.Graph.Log.Logf("lockable", "LOCKABLE_UNLOCKING: %s from %s", req.ID(), old_owner.ID())
// Check if the owner is correct
if req.Owner() != nil {
@ -501,7 +507,7 @@ func UnlockLockables(context *WriteContext, to_unlock []Lockable, old_owner Lock
return err
}
err = UnlockLockables(ctx, req.Requirements(), req, locked)
err = UnlockLockables(context, req.Requirements(), req)
if err != nil {
return err
}
@ -512,9 +518,9 @@ func UnlockLockables(context *WriteContext, to_unlock []Lockable, old_owner Lock
new_owner := old_owner.RecordUnlock(req)
req.SetOwner(new_owner)
if new_owner == nil {
context.Context.Log.Logf("lockable", "LOCKABLE_UNLOCK: %s unlocked %s", old_owner.ID(), req.ID())
context.Graph.Log.Logf("lockable", "LOCKABLE_UNLOCK: %s unlocked %s", old_owner.ID(), req.ID())
} else {
context.Context.Log.Logf("lockable", "LOCKABLE_UNLOCK: %s passed lock of %s back to %s", old_owner.ID(), req.ID(), new_owner.ID())
context.Graph.Log.Logf("lockable", "LOCKABLE_UNLOCK: %s passed lock of %s back to %s", old_owner.ID(), req.ID(), new_owner.ID())
}
}

@ -54,24 +54,6 @@ func RandID() NodeID {
return NodeID(uuid.New())
}
// A Node represents a data that can be locked and held by other Nodes
type Node interface {
ID() NodeID
Type() NodeType
Serialize() ([]byte, error)
Allows(resouce, action string, principal Node) error
AddPolicy(Policy) error
RemovePolicy(Policy) error
Signal(context *ReadContext, signal GraphSignal) error
Requirements() []Node
Dependencies() []Node
Owner() Node
}
// A Node represents data that can be read by multiple goroutines and written to by one, with a unique ID attached, and a method to process updates(including propagating them to connected nodes)
// RegisterChannel and UnregisterChannel are used to connect arbitrary listeners to the node
type Node interface {
@ -212,17 +194,17 @@ func (node * GraphNode) Type() NodeType {
// Propagate the signal to registered listeners, if a listener isn't ready to receive the update
// send it a notification that it was closed and then close it
func (node * GraphNode) Signal(context *ReadContext, signal GraphSignal) error {
context.Context.Log.Logf("signal", "SIGNAL: %s - %s", node.ID(), signal.String())
context.Graph.Log.Logf("signal", "SIGNAL: %s - %s", node.ID(), signal.String())
node.listeners_lock.Lock()
defer node.listeners_lock.Unlock()
closed := []NodeID{}
for id, listener := range node.listeners {
context.Context.Log.Logf("signal", "UPDATE_LISTENER %s: %p", node.ID(), listener)
context.Graph.Log.Logf("signal", "UPDATE_LISTENER %s: %p", node.ID(), listener)
select {
case listener <- signal:
default:
context.Context.Log.Logf("signal", "CLOSED_LISTENER %s: %p", node.ID(), listener)
context.Graph.Log.Logf("signal", "CLOSED_LISTENER %s: %p", node.ID(), listener)
go func(node Node, listener chan GraphSignal) {
listener <- NewDirectSignal("listener_closed")
close(listener)
@ -312,10 +294,13 @@ func getNodeBytes(node Node) ([]byte, error) {
// Write multiple nodes to the database in a single transaction
func WriteNodes(context *WriteContext) error {
if locked == nil {
if context == nil {
return fmt.Errorf("Cannot write nil to DB")
}
if context.Locked == nil {
return fmt.Errorf("Cannot write nil map to DB")
}
context.Context.Log.Logf("db", "DB_WRITES: %d", len(context.Locked))
context.Graph.Log.Logf("db", "DB_WRITES: %d", len(context.Locked))
serialized_bytes := make([][]byte, len(context.Locked))
serialized_ids := make([][]byte, len(context.Locked))
@ -323,7 +308,7 @@ func WriteNodes(context *WriteContext) error {
for _, lock := range(context.Locked) {
node := lock.Node
node_bytes, err := getNodeBytes(node)
context.Context.Log.Logf("db", "DB_WRITE: %+v", node)
context.Graph.Log.Logf("db", "DB_WRITE: %+v", node)
if err != nil {
return err
}
@ -336,7 +321,7 @@ func WriteNodes(context *WriteContext) error {
i++
}
err := context.Context.DB.Update(func(txn *badger.Txn) error {
err := context.Graph.DB.Update(func(txn *badger.Txn) error {
for i, id := range(serialized_ids) {
err := txn.Set(id, serialized_bytes[i])
if err != nil {
@ -425,36 +410,34 @@ func LoadNodeRecurse(ctx * Context, id NodeID, nodes NodeMap) (Node, error) {
return node, nil
}
func NewLockInfo(node Node, resources []string) LockInfo {
return LockInfo{
func NewLockInfo(node Node, resources []string) LockMap {
return LockMap{
node.ID(): LockInfo{
Node: node,
Resources: resources,
},
}
}
type LockInfoList interface {
List() []LockInfo
}
func NewLockMap(requests ...LockInfoList) LockMap {
func NewLockMap(requests ...LockMap) LockMap {
reqs := LockMap{}
for _, req := range(requests) {
for _, info := range(req) {
reqs[req.Node.ID()] = info
for id, info := range(req) {
reqs[id] = info
}
}
return reqs
}
func RequestList[K Node](list []K, resources []string) LockList {
requests := make(LockList{}, len(list))
for i, node := range(list) {
requests[i] = LockInfo{
func LockList[K Node](list []K, resources []string) LockMap {
reqs := LockMap{}
for _, node := range(list) {
reqs[node.ID()] = LockInfo{
Node: node,
Resources: resources,
}
}
return requests
return reqs
}
@ -464,26 +447,8 @@ type LockInfo struct {
Node Node
Resources []string
}
func (info LockInfo) List() []LockInfo {
return []LockInfo{info}
}
type LockMap map[NodeID]LockInfo
func (m LockMap) List() []LockInfo {
infos := make([]LockInfo, len(m))
i := 0
for _, info := range(m) {
infos[i] = info
i += 1
}
return infos
}
type LockList []LockInfo
func (li LockList) List() []LockInfo {
return li
}
type ReadContext struct {
Graph *Context
@ -516,7 +481,7 @@ func del[K comparable](list []K, val K) []K {
// Start a read context for node under ctx for the resources specified in init_nodes, then run nodes_fn
func UseStates(ctx *Context, node Node, nodes LockMap, read_fn ReadFn) error {
context := &ReadContext{
Context: ctx,
Graph: ctx,
Locked: LockMap{},
}
return UseMoreStates(context, node, nodes, read_fn)
@ -579,7 +544,7 @@ func UseMoreStates(context *ReadContext, node Node, new_nodes LockMap, read_fn R
new_perms = del(new_perms, resource)
}
cur_perms.Resources = new_perms
context.Locked[request.Node.ID()].Resources = new_perms
context.Locked[request.Node.ID()] = cur_perms
}
for _, node := range(locked_nodes) {
@ -593,10 +558,10 @@ func UseMoreStates(context *ReadContext, node Node, new_nodes LockMap, read_fn R
// Initiate a write context for nodes and call nodes_fn with nodes locked for read
func UpdateStates(ctx *Context, node Node, nodes LockMap, write_fn WriteFn) error {
context := &WriteContext{
Context: ctx,
Graph: ctx,
Locked: LockMap{},
}
err := UpdateMoreStates(context, node, nodes, nodes_fn)
err := UpdateMoreStates(context, node, nodes, write_fn)
if err == nil {
err = WriteNodes(context)
}
@ -609,13 +574,13 @@ func UpdateStates(ctx *Context, node Node, nodes LockMap, write_fn WriteFn) erro
}
// Add nodes to an existing write context and call nodes_fn with nodes locked for read
func UpdateMoreStates(ctx *Context, locked LockMap, node Node, new_nodes LockMap, write_fn WriteFn) error {
func UpdateMoreStates(context *WriteContext, node Node, new_nodes LockMap, write_fn WriteFn) error {
new_permissions := LockMap{}
for _, request := range(new_nodes) {
id := request.Node.ID()
new_permissions[id] = LockInfo{Node: request.Node, Resources: []string{}}
for _, resource := range(request.Resources) {
current_permissions, exists := locked[id]
current_permissions, exists := context.Locked[id]
if exists == true {
already_granted := false
for _, r := range(current_permissions.Resources) {
@ -641,18 +606,18 @@ func UpdateMoreStates(ctx *Context, locked LockMap, node Node, new_nodes LockMap
req_perms, exists := new_permissions[id]
if exists == true {
cur_perms, already_locked := locked[id]
cur_perms, already_locked := context.Locked[id]
if already_locked == false {
request.Node.Lock()
locked[id] = req_perms
context.Locked[id] = req_perms
} else {
cur_perms.Resources = append(cur_perms.Resources, req_perms.Resources...)
locked[id] = cur_perms
context.Locked[id] = cur_perms
}
}
}
return write_fn(locked)
return write_fn(context)
}
// Create a new channel with a buffer the size of buffer, and register it to node with the id

@ -7,7 +7,6 @@ import (
"errors"
"reflect"
"encoding/json"
"github.com/google/uuid"
)
// Assumed that thread is already locked for signal
@ -19,11 +18,9 @@ func (thread *SimpleThread) Signal(context *ReadContext, signal GraphSignal) err
switch signal.Direction() {
case Up:
err = UseMoreStates(ctx, locked, thread, NewLockMap(
NewLockRequest(thread, []string{"parent"}),
), func(context *ReadContext) error {
err = UseMoreStates(context, thread, NewLockInfo(thread, []string{"parent"}), func(context *ReadContext) error {
if thread.parent != nil {
return UseMoreStates(ctx, locked, thread, NewLockRequest(thread.parent, []string{"signal"}), func(context *ReadContext) error {
return UseMoreStates(context, thread, NewLockInfo(thread.parent, []string{"signal"}), func(context *ReadContext) error {
return thread.parent.Signal(context, signal)
})
} else {
@ -31,9 +28,9 @@ func (thread *SimpleThread) Signal(context *ReadContext, signal GraphSignal) err
}
})
case Down:
err = UseMoreStates(ctx, locked, thread, NewLockMap(
NewLockRequest(thread, []string{"children"}),
RequestList(thread.childre, []string{"signal"}),
err = UseMoreStates(context, thread, NewLockMap(
NewLockInfo(thread, []string{"children"}),
LockList(thread.children, []string{"signal"}),
), func(context *ReadContext) error {
for _, child := range(thread.children) {
err := child.Signal(context, signal)
@ -178,9 +175,9 @@ func checkIfChild(context *WriteContext, target Thread, cur Thread) bool {
return true
}
is_child := false
UpdateMoreStates(ctx, locked, cur, NewLockMap(
NewLockRequest(child, []string{"children"}),
), func(locked NodeLockMap) error {
UpdateMoreStates(context, cur, NewLockMap(
NewLockInfo(child, []string{"children"}),
), func(context *WriteContext) error {
is_child = checkIfChild(context, target, child)
return nil
})
@ -193,7 +190,7 @@ func checkIfChild(context *WriteContext, target Thread, cur Thread) bool {
}
func LinkThreads(context *WriteContext, princ Node, thread Thread, child Thread, info ThreadInfo) error {
if ctx == nil || thread == nil || child == nil {
if context == nil || thread == nil || child == nil {
return fmt.Errorf("invalid input")
}
@ -201,10 +198,7 @@ func LinkThreads(context *WriteContext, princ Node, thread Thread, child Thread,
return fmt.Errorf("Will not link %s as a child of itself", thread.ID())
}
return UpdateMoreStates(context, princ, NewNodeMap(
NewLockInfo(child, []string{"parent", "children"}),
NewLockInfo(thread, []string{"parent", "children"}),
), func(context *WriteContext) {
return UpdateMoreStates(context, princ, LockList([]Node{child, thread}, []string{"parent", "children"}), func(context *WriteContext) error {
if child.Parent() != nil {
return fmt.Errorf("EVENT_LINK_ERR: %s already has a parent, cannot link as child", child.ID())
}
@ -465,8 +459,8 @@ func FindChild(context *ReadContext, princ Node, thread Thread, id NodeID) Threa
for _, child := range thread.Children() {
var result Thread = nil
UseMoreStates(context, princ, NewLockRequest(child, []string{"children"}), func(locked NodeLockMap) error {
result = FindChild(ctx, princ, child, id, locked)
UseMoreStates(context, princ, NewLockInfo(child, []string{"children"}), func(context *ReadContext) error {
result = FindChild(context, princ, child, id)
return nil
})
if result != nil {
@ -521,12 +515,12 @@ func ThreadLoop(ctx * Context, thread Thread, first_action string) error {
return err
}
err = UpdateStates(ctx, thread, NewLockRequest(thread, []string{"state"}), func(locked NodeLockMap) error {
err = UpdateStates(ctx, thread, NewLockInfo(thread, []string{"state"}), func(context *WriteContext) error {
err := thread.SetState("finished")
if err != nil {
return err
}
return UnlockLockables(ctx, []Lockable{thread}, thread, locked)
return UnlockLockables(context, []Lockable{thread}, thread)
})
if err != nil {
@ -585,12 +579,12 @@ func (thread * SimpleThread) AllowedToTakeLock(new_owner Lockable, lockable Lock
}
func ThreadStartChild(ctx *Context, thread Thread, signal StartChildSignal) error {
return UpdateStates(ctx, thread, NewLockRequest(thread, []string{"children"}), func(locked NodeLockMap) error {
return UpdateStates(ctx, thread, NewLockInfo(thread, []string{"children"}), func(context *WriteContext) error {
child := thread.Child(signal.ID)
if child == nil {
return fmt.Errorf("%s is not a child of %s", signal.ID, thread.ID())
}
return UpdateMoreStates(ctx, locked, thread, NewLockRequest(child, []string{"start"}), func(locked NodeLockMap) error {
return UpdateMoreStates(context, thread, NewLockInfo(child, []string{"start"}), func(context *WriteContext) error {
info := thread.ChildInfo(signal.ID).(*ParentThreadInfo)
info.Start = true
@ -602,8 +596,10 @@ func ThreadStartChild(ctx *Context, thread Thread, signal StartChildSignal) erro
}
func ThreadRestore(ctx * Context, thread Thread) {
UpdateStates(ctx, thread, NewLockRequest(thread, []string{"children"}), func(locked NodeLockMap)(error) {
return UpdateMoreStates(ctx, locked, thread, RequestList(thread.Children(), []string{"start"}), func(locked NodeLockMap) error {
UpdateStates(ctx, thread, NewLockMap(
NewLockInfo(thread, []string{"children"}),
LockList(thread.Children(), []string{"start"}),
), func(context *WriteContext)(error) {
for _, child := range(thread.Children()) {
should_run := (thread.ChildInfo(child.ID())).(ParentInfo).Parent()
ctx.Log.Logf("thread", "THREAD_RESTORE: %s -> %s: %+v", thread.ID(), child.ID(), should_run)
@ -614,17 +610,16 @@ func ThreadRestore(ctx * Context, thread Thread) {
}
return nil
})
})
}
func ThreadStart(ctx * Context, thread Thread) error {
return UpdateStates(ctx, thread, NewLockRequest(thread, []string{"start", "lock"}), func(locked NodeLockMap) error {
return UpdateStates(ctx, thread, NewLockInfo(thread, []string{"start", "lock"}), func(context *WriteContext) error {
owner_id := NodeID{}
if thread.Owner() != nil {
owner_id = thread.Owner().ID()
}
if owner_id != thread.ID() {
err := LockLockables(ctx, []Lockable{thread}, thread, locked)
err := LockLockables(context, []Lockable{thread}, thread)
if err != nil {
return err
}
@ -680,8 +675,8 @@ var ThreadAbortedError = errors.New("Thread aborted by signal")
// Default thread abort is to return a ThreadAbortedError
func ThreadAbort(ctx * Context, thread Thread, signal GraphSignal) (string, error) {
err := UseStates(ctx, thread, NewLockRequest(thread, []string{"signal"}), func(locked NodeLockMap) error {
return thread.Signal(ctx, NewStatusSignal("aborted", thread.ID()), locked)
err := UseStates(ctx, thread, NewLockInfo(thread, []string{"signal"}), func(context *ReadContext) error {
return thread.Signal(context, NewStatusSignal("aborted", thread.ID()))
})
if err != nil {
return "", err
@ -691,8 +686,8 @@ func ThreadAbort(ctx * Context, thread Thread, signal GraphSignal) (string, erro
// Default thread cancel is to finish the thread
func ThreadCancel(ctx * Context, thread Thread, signal GraphSignal) (string, error) {
err := UseStates(ctx, thread, NewLockRequest(thread, []string{"signal"}), func(locked NodeLockMap) error {
return thread.Signal(ctx, NewSignal("cancelled"), locked)
err := UseStates(ctx, thread, NewLockInfo(thread, []string{"signal"}), func(context *ReadContext) error {
return thread.Signal(context, NewSignal("cancelled"))
})
return "", err
}