Moved GQL restore to ThreadRestore to be reused by other classes. And GQLThreadInfo to ParentThreadInfo

graph-rework-2
noah metz 2023-07-11 16:54:09 -06:00
parent 6c38089aee
commit f64455f8ed
3 changed files with 38 additions and 40 deletions

@ -345,7 +345,7 @@ func (thread * GQLThread) Serialize() ([]byte, error) {
} }
func (thread * GQLThread) DeserializeInfo(ctx *Context, data []byte) (ThreadInfo, error) { func (thread * GQLThread) DeserializeInfo(ctx *Context, data []byte) (ThreadInfo, error) {
var info GQLThreadInfo var info ParentThreadInfo
err := json.Unmarshal(data, &info) err := json.Unmarshal(data, &info)
if err != nil { if err != nil {
return nil, err return nil, err
@ -367,20 +367,6 @@ func NewGQLThreadJSON(thread *GQLThread) GQLThreadJSON {
} }
} }
type GQLThreadInfo struct {
Start bool `json:"start"`
StartAction string `json:"start_action"`
RestoreAction string `json:"restore_action"`
}
func NewGQLThreadInfo(start bool, start_action string, restore_action string) GQLThreadInfo {
return GQLThreadInfo{
Start: start,
StartAction: start_action,
RestoreAction: restore_action,
}
}
func LoadGQLThread(ctx *Context, id NodeID, data []byte, nodes NodeMap) (Node, error) { func LoadGQLThread(ctx *Context, id NodeID, data []byte, nodes NodeMap) (Node, error) {
var j GQLThreadJSON var j GQLThreadJSON
err := json.Unmarshal(data, &j) err := json.Unmarshal(data, &j)
@ -401,7 +387,7 @@ func LoadGQLThread(ctx *Context, id NodeID, data []byte, nodes NodeMap) (Node, e
func NewGQLThread(id NodeID, name string, state_name string, listen string) GQLThread { func NewGQLThread(id NodeID, name string, state_name string, listen string) GQLThread {
return GQLThread{ return GQLThread{
SimpleThread: NewSimpleThread(id, name, state_name, reflect.TypeOf((*GQLThreadInfo)(nil)), gql_actions, gql_handlers), SimpleThread: NewSimpleThread(id, name, state_name, reflect.TypeOf((*ParentThreadInfo)(nil)), gql_actions, gql_handlers),
Listen: listen, Listen: listen,
http_done: &sync.WaitGroup{}, http_done: &sync.WaitGroup{},
} }
@ -412,19 +398,7 @@ var gql_actions ThreadActions = ThreadActions{
"restore": func(ctx * Context, thread Thread) (string, error) { "restore": func(ctx * Context, thread Thread) (string, error) {
// Start all the threads that should be "started" // Start all the threads that should be "started"
ctx.Log.Logf("gql", "GQL_THREAD_RESTORE: %s", thread.ID()) ctx.Log.Logf("gql", "GQL_THREAD_RESTORE: %s", thread.ID())
ThreadRestore(ctx, thread)
UpdateStates(ctx, []Node{thread}, func(nodes NodeMap)(error) {
return UpdateMoreStates(ctx, NodeList(thread.Children()), nodes, func(nodes NodeMap) error {
for _, child := range(thread.Children()) {
should_run := (thread.ChildInfo(child.ID())).(*GQLThreadInfo)
if should_run.Start == true && child.State() != "finished" {
ChildGo(ctx, thread, child, should_run.RestoreAction)
}
}
return nil
})
})
return "start_server", nil return "start_server", nil
}, },
"start": func(ctx * Context, thread Thread) (string, error) { "start": func(ctx * Context, thread Thread) (string, error) {
@ -480,7 +454,7 @@ var gql_handlers ThreadHandlers = ThreadHandlers{
"child_added": func(ctx * Context, thread Thread, signal GraphSignal) (string, error) { "child_added": func(ctx * Context, thread Thread, signal GraphSignal) (string, error) {
ctx.Log.Logf("gql", "GQL_THREAD_CHILD_ADDED: %+v", signal) ctx.Log.Logf("gql", "GQL_THREAD_CHILD_ADDED: %+v", signal)
UpdateStates(ctx, []Node{thread}, func(nodes NodeMap)(error) { UpdateStates(ctx, []Node{thread}, func(nodes NodeMap)(error) {
should_run, exists := thread.ChildInfo(signal.Source()).(*GQLThreadInfo) should_run, exists := thread.ChildInfo(signal.Source()).(*ParentThreadInfo)
if exists == false { if exists == false {
ctx.Log.Logf("gql", "GQL_THREAD_CHILD_ADDED: tried to start %s whis is not a child") ctx.Log.Logf("gql", "GQL_THREAD_CHILD_ADDED: tried to start %s whis is not a child")
return nil return nil

@ -18,13 +18,13 @@ func TestGQLThread(t * testing.T) {
t2 := &t2_r t2 := &t2_r
err := UpdateStates(ctx, []Node{gql_t, t1, t2}, func(nodes NodeMap) error { err := UpdateStates(ctx, []Node{gql_t, t1, t2}, func(nodes NodeMap) error {
i1 := NewGQLThreadInfo(true, "start", "restore") i1 := NewParentThreadInfo(true, "start", "restore")
err := LinkThreads(ctx, gql_t, t1, &i1, nodes) err := LinkThreads(ctx, gql_t, t1, &i1, nodes)
if err != nil { if err != nil {
return err return err
} }
i2 := NewGQLThreadInfo(false, "start", "restore") i2 := NewParentThreadInfo(false, "start", "restore")
return LinkThreads(ctx, gql_t, t2, &i2, nodes) return LinkThreads(ctx, gql_t, t2, &i2, nodes)
}) })
fatalErr(t, err) fatalErr(t, err)
@ -53,7 +53,7 @@ func TestGQLDBLoad(t * testing.T) {
gql_r := NewGQLThread(RandID(), "GQL Thread", "init", ":8080") gql_r := NewGQLThread(RandID(), "GQL Thread", "init", ":8080")
gql := &gql_r gql := &gql_r
info := NewGQLThreadInfo(true, "start", "restore") info := NewParentThreadInfo(true, "start", "restore")
err := UpdateStates(ctx, []Node{gql, t1, l1}, func(nodes NodeMap) error { err := UpdateStates(ctx, []Node{gql, t1, l1}, func(nodes NodeMap) error {
err := LinkLockables(ctx, gql, []Lockable{l1}, nodes) err := LinkLockables(ctx, gql, []Lockable{l1}, nodes)
if err != nil { if err != nil {

@ -246,6 +246,22 @@ type Thread interface {
ChildWaits() *sync.WaitGroup ChildWaits() *sync.WaitGroup
} }
// Data required by a parent thread to restore it's children
type ParentThreadInfo struct {
Start bool `json:"start"`
StartAction string `json:"start_action"`
RestoreAction string `json:"restore_action"`
}
func NewParentThreadInfo(start bool, start_action string, restore_action string) ParentThreadInfo {
return ParentThreadInfo{
Start: start,
StartAction: start_action,
RestoreAction: restore_action,
}
}
type SimpleThread struct { type SimpleThread struct {
SimpleLockable SimpleLockable
@ -530,8 +546,22 @@ func (thread * SimpleThread) AllowedToTakeLock(new_owner Lockable, lockable Lock
return false return false
} }
var ThreadRestore = func(ctx * Context, thread Thread) {
UpdateStates(ctx, []Node{thread}, func(nodes NodeMap)(error) {
return UpdateMoreStates(ctx, NodeList(thread.Children()), nodes, func(nodes NodeMap) error {
for _, child := range(thread.Children()) {
should_run := (thread.ChildInfo(child.ID())).(*ParentThreadInfo)
if should_run.Start == true && child.State() != "finished" {
ChildGo(ctx, thread, child, should_run.RestoreAction)
}
}
return nil
})
})
}
var ThreadStart = func(ctx * Context, thread Thread) error { var ThreadStart = func(ctx * Context, thread Thread) error {
err := UpdateStates(ctx, []Node{thread}, func(nodes NodeMap) error { return UpdateStates(ctx, []Node{thread}, func(nodes NodeMap) error {
owner_id := NodeID("") owner_id := NodeID("")
if thread.Owner() != nil { if thread.Owner() != nil {
owner_id = thread.Owner().ID() owner_id = thread.Owner().ID()
@ -544,12 +574,6 @@ var ThreadStart = func(ctx * Context, thread Thread) error {
} }
return thread.SetState("started") return thread.SetState("started")
}) })
if err != nil {
return err
}
return nil
} }
var ThreadDefaultStart = func(ctx * Context, thread Thread) (string, error) { var ThreadDefaultStart = func(ctx * Context, thread Thread) (string, error) {