Added restore actions, and gql should run them

graph-rework-2
noah metz 2023-07-02 12:47:45 -06:00
parent 8d041fa896
commit 300e735065
4 changed files with 38 additions and 16 deletions

@ -338,12 +338,16 @@ type GQLThreadInfo struct {
ThreadInfo `json:ignore`
Start bool `json:"start"`
Started bool `json:"started"`
FirstAction string `json:"first_action"`
RestoreAction string `json:"restore_action"`
}
func NewGQLThreadInfo(start bool) GQLThreadInfo {
func NewGQLThreadInfo(start bool, first_action string, restore_action string) GQLThreadInfo {
info := GQLThreadInfo{
Start: start,
Started: false,
FirstAction: first_action,
RestoreAction: restore_action,
}
return info
}
@ -408,6 +412,21 @@ func NewGQLThreadState(listen string) GQLThreadState {
var gql_actions ThreadActions = ThreadActions{
"wait": ThreadWait,
"restore": func(ctx * GraphContext, thread Thread) (string, error) {
// Start all the threads that should be "started"
ctx.Log.Logf("gql", "GQL_THREAD_RESTORE: %s", thread.ID())
UpdateStates(ctx, []GraphNode{thread}, func(nodes NodeMap)(error) {
server_state := thread.State().(*GQLThreadState)
for _, child := range(server_state.Children()) {
should_run := server_state.child_info[child.ID()].(*GQLThreadInfo)
if should_run.Started == true {
ChildGo(ctx, server_state, thread, child.ID(), should_run.RestoreAction)
}
}
return nil
})
return "wait", nil
},
"start": func(ctx * GraphContext, thread Thread) (string, error) {
ctx.Log.Logf("gql", "SERVER_STARTED")
server, ok := thread.(*GQLThread)
@ -460,7 +479,7 @@ var gql_handlers ThreadHandlers = ThreadHandlers{
return nil
}
if should_run.Start == true && should_run.Started == false {
ChildGo(ctx, server_state, thread, signal.Source())
ChildGo(ctx, server_state, thread, signal.Source(), should_run.FirstAction)
should_run.Started = true
}
return nil

@ -18,11 +18,11 @@ func TestGQLThread(t * testing.T) {
test_thread_2, err := NewSimpleThread(ctx, "Test thread 2", []Lockable{}, BaseThreadActions, BaseThreadHandlers)
fatalErr(t, err)
i1 := NewGQLThreadInfo(true)
i1 := NewGQLThreadInfo(true, "start", "restore")
err = LinkThreads(ctx, gql_thread, test_thread_1, &i1)
fatalErr(t, err)
i2 := NewGQLThreadInfo(false)
i2 := NewGQLThreadInfo(false, "start", "restore")
err = LinkThreads(ctx, gql_thread, test_thread_2, &i2)
fatalErr(t, err)
@ -31,7 +31,7 @@ func TestGQLThread(t * testing.T) {
SendUpdate(ctx, thread, CancelSignal(nil))
}(gql_thread)
err = RunThread(ctx, gql_thread)
err = RunThread(ctx, gql_thread, "start")
fatalErr(t, err)
}
@ -44,7 +44,7 @@ func TestGQLDBLoad(t * testing.T) {
fatalErr(t, err)
SendUpdate(ctx, t1, CancelSignal(nil))
err = RunThread(ctx, t1)
err = RunThread(ctx, t1, "start")
fatalErr(t, err)
err = UseStates(ctx, []GraphNode{t1}, func(states NodeStateMap) error {

@ -365,7 +365,7 @@ func FindChild(ctx * GraphContext, thread Thread, thread_state ThreadState, id N
return nil
}
func ChildGo(ctx * GraphContext, thread_state ThreadState, thread Thread, child_id NodeID) {
func ChildGo(ctx * GraphContext, thread_state ThreadState, thread Thread, child_id NodeID, first_action string) {
child := thread_state.Child(child_id)
if child == nil {
panic(fmt.Errorf("Child not in thread, can't start %s", child_id))
@ -374,7 +374,7 @@ func ChildGo(ctx * GraphContext, thread_state ThreadState, thread Thread, child_
go func(child Thread) {
ctx.Log.Logf("gql", "THREAD_START_CHILD: %s", child.ID())
defer thread.ChildWaits().Done()
err := RunThread(ctx, child)
err := RunThread(ctx, child, first_action)
if err != nil {
ctx.Log.Logf("gql", "THREAD_CHILD_RUN_ERR: %s %e", child.ID(), err)
} else {
@ -383,7 +383,7 @@ func ChildGo(ctx * GraphContext, thread_state ThreadState, thread Thread, child_
}(child)
}
func RunThread(ctx * GraphContext, thread Thread) error {
func RunThread(ctx * GraphContext, thread Thread, first_action string) error {
ctx.Log.Logf("thread", "THREAD_RUN: %s", thread.ID())
err := UpdateStates(ctx, []GraphNode{thread}, func(nodes NodeMap) (error) {
@ -392,7 +392,6 @@ func RunThread(ctx * GraphContext, thread Thread) error {
if thread_state.Owner() != nil {
owner_id = thread_state.Owner().ID()
}
// Don't lock the thread if it's already locked itself
if owner_id != thread.ID() {
return LockLockables(ctx, []Lockable{thread}, thread, nil, nodes)
}
@ -417,9 +416,7 @@ func RunThread(ctx * GraphContext, thread Thread) error {
return err
}
SendUpdate(ctx, thread, NewSignal(thread, "thread_start"))
next_action := "start"
next_action := first_action
for next_action != "" {
action, exists := thread.Action(next_action)
if exists == false {
@ -526,6 +523,11 @@ var ThreadDefaultStart = func(ctx * GraphContext, thread Thread) (string, error)
return "wait", nil
}
var ThreadDefaultRestore = func(ctx * GraphContext, thread Thread) (string, error) {
ctx.Log.Logf("thread", "THREAD_DEFAULT_RESTORE: %s", thread.ID())
return "wait", nil
}
var ThreadWait = func(ctx * GraphContext, thread Thread) (string, error) {
ctx.Log.Logf("thread", "THREAD_WAIT: %s TIMEOUT: %+v", thread.ID(), thread.Timeout())
for {
@ -601,6 +603,7 @@ func NewThreadHandlers() ThreadHandlers{
var BaseThreadActions = ThreadActions{
"wait": ThreadWait,
"start": ThreadDefaultStart,
"restore": ThreadDefaultRestore,
}
var BaseThreadHandlers = ThreadHandlers{

@ -18,7 +18,7 @@ func TestNewThread(t * testing.T) {
SendUpdate(ctx, t1, CancelSignal(nil))
}(t1)
err = RunThread(ctx, t1)
err = RunThread(ctx, t1, "start")
fatalErr(t, err)
err = UseStates(ctx, []GraphNode{t1}, func(states NodeStateMap) (error) {
@ -45,7 +45,7 @@ func TestThreadWithRequirement(t * testing.T) {
}(t1)
fatalErr(t, err)
err = RunThread(ctx, t1)
err = RunThread(ctx, t1, "start")
fatalErr(t, err)
err = UseStates(ctx, []GraphNode{l1}, func(states NodeStateMap) (error) {
@ -68,7 +68,7 @@ func TestThreadDBLoad(t * testing.T) {
SendUpdate(ctx, t1, CancelSignal(nil))
err = RunThread(ctx, t1)
err = RunThread(ctx, t1, "start")
fatalErr(t, err)
err = UseStates(ctx, []GraphNode{t1}, func(states NodeStateMap) error {