Moved gql thread actions/handlers to module functions

graph-rework-2
noah metz 2023-07-23 23:01:45 -06:00
parent 32ac1f618d
commit fc6c198ae3
4 changed files with 77 additions and 74 deletions

@ -807,9 +807,9 @@ func NewGQLThread(id NodeID, name string, state_name string, listen string, ecdh
var gql_actions ThreadActions = ThreadActions{ var gql_actions ThreadActions = ThreadActions{
"wait": ThreadWait, "wait": ThreadWait,
"restore": func(ctx * Context, thread Thread) (string, error) { "restore": func(ctx * Context, thread Thread) (string, error) {
// Start all the threads that should be "started"
ctx.Log.Logf("gql", "GQL_THREAD_RESTORE: %s", thread.ID()) ctx.Log.Logf("gql", "GQL_THREAD_RESTORE: %s", thread.ID())
err := ThreadRestore(ctx, thread) // Restore all the threads that have "Start" as true and arent in the "finished" state
err := ThreadRestore(ctx, thread, false)
if err != nil { if err != nil {
return "", err return "", err
} }
@ -821,22 +821,11 @@ var gql_actions ThreadActions = ThreadActions{
if err != nil { if err != nil {
return "", err return "", err
} }
// Start all the threads that have "Start" as true and arent in the "finished" state
// Start all the threads that have "Start" as true err = ThreadRestore(ctx, thread, true)
context := NewWriteContext(ctx) if err != nil {
err = UpdateStates(context, thread, NewLockInfo(thread, []string{"children"}), func(context *StateContext) error { return "", err
return UpdateStates(context, thread, LockList(thread.Children(), []string{"start"}), func(context *StateContext) error {
for _, child := range(thread.Children()) {
info := thread.ChildInfo(child.ID()).(ParentInfo).Parent()
if info.Start == true {
ctx.Log.Logf("thread", "THREAD_START_CHILD: %s -> %s", thread.ID(), child.ID())
ChildGo(ctx, thread, child, info.StartAction)
}
} }
return nil
})
})
return "start_server", nil return "start_server", nil
}, },
"start_server": func(ctx * Context, thread Thread) (string, error) { "start_server": func(ctx * Context, thread Thread) (string, error) {
@ -922,52 +911,8 @@ var gql_actions ThreadActions = ThreadActions{
} }
var gql_handlers ThreadHandlers = ThreadHandlers{ var gql_handlers ThreadHandlers = ThreadHandlers{
"child_linked": func(ctx * Context, thread Thread, signal GraphSignal) (string, error) { "child_linked": ThreadParentChildLinked,
ctx.Log.Logf("gql", "GQL_THREAD_CHILD_ADDED: %+v", signal) "start_child": ThreadParentStartChild,
context := NewWriteContext(ctx)
err := UpdateStates(context, thread, NewLockMap(
NewLockInfo(thread, []string{"children"}),
), func(context *StateContext) error {
sig, ok := signal.(IDSignal)
if ok == false {
ctx.Log.Logf("gql", "GQL_THREAD_NODE_LINKED_BAD_CAST")
return nil
}
should_run, exists := thread.ChildInfo(sig.ID).(*ParentThreadInfo)
if exists == false {
ctx.Log.Logf("gql", "GQL_THREAD_NODE_LINKED: %s is not a child of %s", sig.ID)
return nil
}
if should_run.Start == true {
ChildGo(ctx, thread, thread.Child(sig.ID), should_run.StartAction)
}
return nil
})
if err != nil {
} else {
}
return "wait", nil
},
"start_child": func(ctx *Context, thread Thread, signal GraphSignal) (string, error) {
ctx.Log.Logf("gql", "GQL_START_CHILD")
sig, ok := signal.(StartChildSignal)
if ok == false {
ctx.Log.Logf("gql", "GQL_START_CHILD_BAD_SIGNAL: %+v", signal)
return "wait", nil
}
err := ThreadStartChild(ctx, thread, sig)
if err != nil {
ctx.Log.Logf("gql", "GQL_START_CHILD_ERR: %s", err)
} else {
ctx.Log.Logf("gql", "GQL_START_CHILD: %s", sig.ID.String())
}
return "wait", nil
},
"abort": ThreadAbort, "abort": ThreadAbort,
"stop": ThreadStop, "stop": ThreadStop,
} }

@ -5,6 +5,7 @@ import (
"time" "time"
"net" "net"
"net/http" "net/http"
"errors"
"io" "io"
"fmt" "fmt"
"encoding/json" "encoding/json"
@ -94,13 +95,15 @@ func TestGQLDBLoad(t * testing.T) {
err = gql.Signal(context, gql, NewStatusSignal("child_linked", t1.ID())) err = gql.Signal(context, gql, NewStatusSignal("child_linked", t1.ID()))
fatalErr(t, err) fatalErr(t, err)
context = NewReadContext(ctx) context = NewReadContext(ctx)
err = gql.Signal(context, gql, StopSignal) err = gql.Signal(context, gql, AbortSignal)
fatalErr(t, err) fatalErr(t, err)
err = ThreadLoop(ctx, gql, "start") err = ThreadLoop(ctx, gql, "start")
if errors.Is(err, ThreadAbortedError) == false {
fatalErr(t, err) fatalErr(t, err)
}
(*GraphTester)(t).WaitForStatus(ctx, update_channel, "stopped", 100*time.Millisecond, "Didn't receive stopped on update_channel") (*GraphTester)(t).WaitForStatus(ctx, update_channel, "aborted", 100*time.Millisecond, "Didn't receive aborted on update_channel")
context = NewReadContext(ctx) context = NewReadContext(ctx)
err = UseStates(context, gql, LockList([]Node{gql, u1}, nil), func(context *StateContext) error { err = UseStates(context, gql, LockList([]Node{gql, u1}, nil), func(context *StateContext) error {

@ -291,7 +291,6 @@ func UnlinkLockables(context *StateContext, princ Node, lockable Lockable, requi
// Link requirements as requirements to lockable // Link requirements as requirements to lockable
// Continues the wrtie context with princ, getting requirements for lockable and dependencies for requirements // Continues the wrtie context with princ, getting requirements for lockable and dependencies for requirements
// Assumes that an active write context exists with princ locked so that princ's state can be used in checks
func LinkLockables(context *StateContext, princ Node, lockable Lockable, requirements []Lockable) error { func LinkLockables(context *StateContext, princ Node, lockable Lockable, requirements []Lockable) error {
if lockable == nil { if lockable == nil {
return fmt.Errorf("LOCKABLE_LINK_ERR: Will not link Lockables to nil as requirements") return fmt.Errorf("LOCKABLE_LINK_ERR: Will not link Lockables to nil as requirements")
@ -408,7 +407,7 @@ func LockLockables(context *StateContext, to_lock []Lockable, new_owner Lockable
return UpdateStates(context, new_owner, NewLockMap( return UpdateStates(context, new_owner, NewLockMap(
LockList(to_lock, []string{"lock"}), LockList(to_lock, []string{"lock"}),
NewLockInfo(new_owner, []string{}), NewLockInfo(new_owner, nil),
), func(context *StateContext) error { ), func(context *StateContext) error {
// First loop is to check that the states can be locked, and locks all requirements // First loop is to check that the states can be locked, and locks all requirements
for _, req := range(to_lock) { for _, req := range(to_lock) {

@ -568,6 +568,59 @@ func (thread * SimpleThread) AllowedToTakeLock(new_owner Lockable, lockable Lock
return false return false
} }
func ThreadParentChildLinked(ctx *Context, thread Thread, signal GraphSignal) (string, error) {
ctx.Log.Logf("thread", "THREAD_CHILD_LINKED: %+v", signal)
context := NewWriteContext(ctx)
err := UpdateStates(context, thread, NewLockMap(
NewLockInfo(thread, []string{"children"}),
), func(context *StateContext) error {
sig, ok := signal.(IDSignal)
if ok == false {
ctx.Log.Logf("thread", "THREAD_NODE_LINKED_BAD_CAST")
return nil
}
info_if := thread.ChildInfo(sig.ID)
if info_if == nil {
ctx.Log.Logf("thread", "THREAD_NODE_LINKED: %s is not a child of %s", sig.ID)
return nil
}
info_r, correct := info_if.(ParentInfo)
if correct == false {
ctx.Log.Logf("thread", "THREAD_NODE_LINKED_BAD_INFO_CAST")
}
info := info_r.Parent()
if info.Start == true {
ChildGo(ctx, thread, thread.Child(sig.ID), info.StartAction)
}
return nil
})
if err != nil {
} else {
}
return "wait", nil
}
func ThreadParentStartChild(ctx *Context, thread Thread, signal GraphSignal) (string, error) {
ctx.Log.Logf("thread", "THREAD_START_CHILD")
sig, ok := signal.(StartChildSignal)
if ok == false {
ctx.Log.Logf("thread", "THREAD_START_CHILD_BAD_SIGNAL: %+v", signal)
return "wait", nil
}
err := ThreadStartChild(ctx, thread, sig)
if err != nil {
ctx.Log.Logf("thread", "THREAD_START_CHILD_ERR: %s", err)
} else {
ctx.Log.Logf("thread", "THREAD_START_CHILD: %s", sig.ID.String())
}
return "wait", nil
}
// Helper function to start a child from a thread during a signal handler // Helper function to start a child from a thread during a signal handler
// Starts a write context, so cannot be called from either a write or read context // Starts a write context, so cannot be called from either a write or read context
func ThreadStartChild(ctx *Context, thread Thread, signal StartChildSignal) error { func ThreadStartChild(ctx *Context, thread Thread, signal StartChildSignal) error {
@ -590,16 +643,19 @@ func ThreadStartChild(ctx *Context, thread Thread, signal StartChildSignal) erro
// Helper function to restore threads that should be running from a parents restore action // Helper function to restore threads that should be running from a parents restore action
// Starts a write context, so cannot be called from either a write or read context // Starts a write context, so cannot be called from either a write or read context
func ThreadRestore(ctx * Context, thread Thread) error { func ThreadRestore(ctx * Context, thread Thread, start bool) error {
context := NewWriteContext(ctx) context := NewWriteContext(ctx)
return UpdateStates(context, thread, NewLockInfo(thread, []string{"children"}), func(context *StateContext) error { return UpdateStates(context, thread, NewLockInfo(thread, []string{"children"}), func(context *StateContext) error {
return UpdateStates(context, thread, LockList(thread.Children(), []string{"start"}), func(context *StateContext) error { return UpdateStates(context, thread, LockList(thread.Children(), []string{"start"}), func(context *StateContext) error {
for _, child := range(thread.Children()) { for _, child := range(thread.Children()) {
should_run := (thread.ChildInfo(child.ID())).(ParentInfo).Parent() info := (thread.ChildInfo(child.ID())).(ParentInfo).Parent()
ctx.Log.Logf("thread", "THREAD_RESTORE: %s -> %s: %+v", thread.ID(), child.ID(), should_run) if info.Start == true && child.State() != "finished" {
if should_run.Start == true && child.State() != "finished" {
ctx.Log.Logf("thread", "THREAD_RESTORED: %s -> %s", thread.ID(), child.ID()) ctx.Log.Logf("thread", "THREAD_RESTORED: %s -> %s", thread.ID(), child.ID())
ChildGo(ctx, thread, child, should_run.RestoreAction) if start == true {
ChildGo(ctx, thread, child, info.StartAction)
} else {
ChildGo(ctx, thread, child, info.RestoreAction)
}
} }
} }
return nil return nil