Fixed typo in thread loading that resulted in the parent being loaded as an owner instead

graph-rework-2
noah metz 2023-07-03 13:14:48 -06:00
parent a05d847863
commit 34f8a9f009
6 changed files with 150 additions and 49 deletions

@ -335,7 +335,7 @@ type GQLThread struct {
} }
type GQLThreadInfo struct { type GQLThreadInfo struct {
ThreadInfo `json:ignore` ThreadInfo `json:"-"`
Start bool `json:"start"` Start bool `json:"start"`
Started bool `json:"started"` Started bool `json:"started"`
FirstAction string `json:"first_action"` FirstAction string `json:"first_action"`
@ -390,6 +390,16 @@ func LoadGQLThreadState(ctx * GraphContext, data []byte, loaded_nodes NodeMap) (
return state, nil return state, nil
} }
func LoadGQLThreadInfo(ctx * GraphContext, raw map[string]interface{}) (ThreadInfo, error) {
info := GQLThreadInfo{
Start: raw["start"].(bool),
Started: raw["started"].(bool),
FirstAction: raw["first_action"].(string),
RestoreAction: raw["restore_action"].(string),
}
return &info, nil
}
func LoadGQLThread(ctx * GraphContext, id NodeID) (GraphNode, error) { func LoadGQLThread(ctx * GraphContext, id NodeID) (GraphNode, error) {
thread := RestoreBaseThread(ctx, id, gql_actions, gql_handlers) thread := RestoreBaseThread(ctx, id, gql_actions, gql_handlers)
gql_thread := GQLThread{ gql_thread := GQLThread{
@ -415,10 +425,44 @@ var gql_actions ThreadActions = ThreadActions{
"restore": func(ctx * GraphContext, thread Thread) (string, error) { "restore": func(ctx * GraphContext, thread Thread) (string, error) {
// Start all the threads that should be "started" // Start all the threads that should be "started"
ctx.Log.Logf("gql", "GQL_THREAD_RESTORE: %s", thread.ID()) ctx.Log.Logf("gql", "GQL_THREAD_RESTORE: %s", thread.ID())
server, ok := thread.(*GQLThread)
if ok == false {
panic("thread is not *GQLThread")
}
// Serve the GQL http and ws handlers
mux := http.NewServeMux()
mux.HandleFunc("/gql", GQLHandler(ctx, server))
mux.HandleFunc("/gqlws", GQLWSHandler(ctx, server))
// Server a graphiql interface(TODO make configurable whether to start this)
mux.HandleFunc("/graphiql", GraphiQLHandler())
// Server the ./site directory to /site (TODO make configurable with better defaults)
fs := http.FileServer(http.Dir("./site"))
mux.Handle("/site/", http.StripPrefix("/site", fs))
UseStates(ctx, []GraphNode{server}, func(states NodeStateMap)(error){
server_state := states[server.ID()].(*GQLThreadState)
server.http_server = &http.Server{
Addr: server_state.Listen,
Handler: mux,
}
return nil
})
server.http_done.Add(1)
go func(server *GQLThread) {
defer server.http_done.Done()
err := server.http_server.ListenAndServe()
if err != http.ErrServerClosed {
panic(fmt.Sprintf("Failed to start gql server: %s", err))
}
}(server)
UpdateStates(ctx, []GraphNode{thread}, func(nodes NodeMap)(error) { UpdateStates(ctx, []GraphNode{thread}, func(nodes NodeMap)(error) {
server_state := thread.State().(*GQLThreadState) server_state := thread.State().(*GQLThreadState)
for _, child := range(server_state.Children()) { for _, child := range(server_state.Children()) {
fmt.Printf("\n%+v\n\n", server_state.child_info[child.ID()])
should_run := (server_state.child_info[child.ID()]).(*GQLThreadInfo) should_run := (server_state.child_info[child.ID()]).(*GQLThreadInfo)
if should_run.Started == true { if should_run.Started == true {
ChildGo(ctx, server_state, thread, child.ID(), should_run.RestoreAction) ChildGo(ctx, server_state, thread, child.ID(), should_run.RestoreAction)

@ -8,7 +8,7 @@ import (
) )
func TestGQLThread(t * testing.T) { func TestGQLThread(t * testing.T) {
ctx := logTestContext(t, []string{"thread"}) ctx := logTestContext(t, []string{})
gql_thread, err := NewGQLThread(ctx, ":8080", []Lockable{}) gql_thread, err := NewGQLThread(ctx, ":8080", []Lockable{})
fatalErr(t, err) fatalErr(t, err)
@ -36,29 +36,56 @@ func TestGQLThread(t * testing.T) {
} }
func TestGQLDBLoad(t * testing.T) { func TestGQLDBLoad(t * testing.T) {
ctx := logTestContext(t, []string{}) ctx := logTestContext(t, []string{"thread"})
l1, err := NewSimpleLockable(ctx, "Test Lockable 1", []Lockable{}) l1, err := NewSimpleLockable(ctx, "Test Lockable 1", []Lockable{})
fatalErr(t, err) fatalErr(t, err)
t1, err := NewGQLThread(ctx, ":8080", []Lockable{l1}) t1, err := NewSimpleThread(ctx, "Test Thread 1", []Lockable{}, BaseThreadActions, BaseThreadHandlers)
fatalErr(t, err) fatalErr(t, err)
update_channel := t1.UpdateChannel(10)
SendUpdate(ctx, t1, CancelSignal(nil)) gql, err := NewGQLThread(ctx, ":8080", []Lockable{l1})
err = RunThread(ctx, t1, "start")
fatalErr(t, err) fatalErr(t, err)
err = UseStates(ctx, []GraphNode{t1}, func(states NodeStateMap) error { info := NewGQLThreadInfo(true, "start", "restore")
ser, err := json.MarshalIndent(states[t1.ID()], "", " ") err = LinkThreads(ctx, gql, t1, &info)
fmt.Printf("\n%s\n\n", ser) fatalErr(t, err)
SendUpdate(ctx, gql, CancelSignal(nil))
err = RunThread(ctx, gql, "start")
fatalErr(t, err)
(*GraphTester)(t).WaitForValue(ctx, update_channel, "thread_done", t1, 100*time.Millisecond, "Dicn't received update_done on t1 from t1")
err = UseStates(ctx, []GraphNode{gql, t1}, func(states NodeStateMap) error {
ser1, err := json.MarshalIndent(states[gql.ID()], "", " ")
ser2, err := json.MarshalIndent(states[t1.ID()], "", " ")
fmt.Printf("\n%s\n\n", ser1)
fmt.Printf("\n%s\n\n", ser2)
return err return err
}) })
t1_loaded, err := LoadNode(ctx, t1.ID()) gql_loaded, err := LoadNode(ctx, gql.ID())
fatalErr(t, err) fatalErr(t, err)
var t1_loaded *BaseThread = nil
err = UseStates(ctx, []GraphNode{t1_loaded}, func(states NodeStateMap) error { err = UseStates(ctx, []GraphNode{gql_loaded}, func(states NodeStateMap) error {
ser, err := json.MarshalIndent(states[t1_loaded.ID()], "", " ") ser, err := json.MarshalIndent(states[gql_loaded.ID()], "", " ")
fmt.Printf("\n%s\n\n", ser) fmt.Printf("\n%s\n\n", ser)
child := states[gql_loaded.ID()].(ThreadState).Children()[0]
t1_loaded = child.(*BaseThread)
update_channel = t1_loaded.UpdateChannel(10)
err = UseMoreStates(ctx, []GraphNode{child}, states, func(states NodeStateMap) error {
ser, err := json.MarshalIndent(states[child.ID()], "", " ")
fmt.Printf("\n%s\n\n", ser)
return err
})
return err return err
}) })
SendUpdate(ctx, gql_loaded, CancelSignal(nil))
err = RunThread(ctx, gql_loaded.(Thread), "restore")
fatalErr(t, err)
(*GraphTester)(t).WaitForValue(ctx, update_channel, "thread_done", t1_loaded, 100*time.Millisecond, "Dicn't received update_done on t1_loaded from t1_loaded")
} }

@ -24,11 +24,14 @@ type StateLoadFunc func(*GraphContext, []byte, NodeMap)(NodeState, error)
type StateLoadMap map[string]StateLoadFunc type StateLoadMap map[string]StateLoadFunc
type NodeLoadFunc func(*GraphContext, NodeID)(GraphNode, error) type NodeLoadFunc func(*GraphContext, NodeID)(GraphNode, error)
type NodeLoadMap map[string]NodeLoadFunc type NodeLoadMap map[string]NodeLoadFunc
type InfoLoadFunc func(*GraphContext, map[string]interface{})(ThreadInfo, error)
type InfoLoadMap map[string]InfoLoadFunc
type GraphContext struct { type GraphContext struct {
DB * badger.DB DB * badger.DB
Log Logger Log Logger
NodeLoadFuncs NodeLoadMap NodeLoadFuncs NodeLoadMap
StateLoadFuncs StateLoadMap StateLoadFuncs StateLoadMap
InfoLoadFuncs InfoLoadMap
GQL * GQLContext GQL * GQLContext
} }
@ -195,7 +198,7 @@ func LoadNodeRecurse(ctx * GraphContext, id NodeID, loaded_nodes map[NodeID]Grap
return node, nil return node, nil
} }
func NewGraphContext(db * badger.DB, log Logger, state_loads StateLoadMap, node_loads NodeLoadMap, types TypeList, type_map ObjTypeMap, queries FieldMap, subscriptions FieldMap, mutations FieldMap) * GraphContext { func NewGraphContext(db * badger.DB, log Logger, state_loads StateLoadMap, node_loads NodeLoadMap, info_loads InfoLoadMap, types TypeList, type_map ObjTypeMap, queries FieldMap, subscriptions FieldMap, mutations FieldMap) * GraphContext {
gql, err := NewGQLContext(types, type_map, queries, subscriptions, mutations) gql, err := NewGQLContext(types, type_map, queries, subscriptions, mutations)
if err != nil { if err != nil {
panic(err) panic(err)
@ -215,6 +218,9 @@ func NewGraphContext(db * badger.DB, log Logger, state_loads StateLoadMap, node_
"simple_thread": LoadSimpleThreadState, "simple_thread": LoadSimpleThreadState,
"gql_thread": LoadGQLThreadState, "gql_thread": LoadGQLThreadState,
}, },
InfoLoadFuncs: InfoLoadMap{
"gql_thread": LoadGQLThreadInfo,
},
} }
for name, fn := range(state_loads) { for name, fn := range(state_loads) {
@ -225,6 +231,10 @@ func NewGraphContext(db * badger.DB, log Logger, state_loads StateLoadMap, node_
ctx.NodeLoadFuncs[name] = fn ctx.NodeLoadFuncs[name] = fn
} }
for name, fn := range(info_loads) {
ctx.InfoLoadFuncs[name] = fn
}
return &ctx return &ctx
} }
@ -446,6 +456,28 @@ func RestoreNode(ctx * GraphContext, id NodeID) BaseNode {
return node return node
} }
func WriteDBState(ctx * GraphContext, id NodeID, state NodeState) error {
ctx.Log.Logf("db", "DB_WRITE: %s - %+v", id, state)
var serialized_state []byte = nil
if state != nil {
ser, err := json.Marshal(state)
if err != nil {
return fmt.Errorf("DB_MARSHAL_ERROR: %e", err)
}
serialized_state = ser
} else {
serialized_state = []byte{}
}
err := ctx.DB.Update(func(txn *badger.Txn) error {
err := txn.Set([]byte(id), serialized_state)
return err
})
return err
}
// Create a new base node with a new ID // Create a new base node with a new ID
func NewNode(ctx * GraphContext, state NodeState) (BaseNode, error) { func NewNode(ctx * GraphContext, state NodeState) (BaseNode, error) {
node := BaseNode{ node := BaseNode{
@ -540,28 +572,6 @@ func WriteDBStates(ctx * GraphContext, nodes NodeMap) error{
return err return err
} }
func WriteDBState(ctx * GraphContext, id NodeID, state NodeState) error {
ctx.Log.Logf("db", "DB_WRITE: %s - %+v", id, state)
var serialized_state []byte = nil
if state != nil {
ser, err := json.Marshal(state)
if err != nil {
return fmt.Errorf("DB_MARSHAL_ERROR: %e", err)
}
serialized_state = ser
} else {
serialized_state = []byte{}
}
err := ctx.DB.Update(func(txn *badger.Txn) error {
err := txn.Set([]byte(id), serialized_state)
return err
})
return err
}
func (node * BaseNode) SetState(new_state NodeState) { func (node * BaseNode) SetState(new_state NodeState) {
node.state = new_state node.state = new_state
} }

@ -58,7 +58,7 @@ func logTestContext(t * testing.T, components []string) * GraphContext {
t.Fatal(err) t.Fatal(err)
} }
return NewGraphContext(db, NewConsoleLogger(components), StateLoadMap{}, NodeLoadMap{}, TypeList{}, ObjTypeMap{}, FieldMap{}, FieldMap{}, FieldMap{}) return NewGraphContext(db, NewConsoleLogger(components), StateLoadMap{}, NodeLoadMap{}, InfoLoadMap{}, TypeList{}, ObjTypeMap{}, FieldMap{}, FieldMap{}, FieldMap{})
} }
func testContext(t * testing.T) * GraphContext { func testContext(t * testing.T) * GraphContext {
@ -67,7 +67,7 @@ func testContext(t * testing.T) * GraphContext {
t.Fatal(err) t.Fatal(err)
} }
return NewGraphContext(db, NewConsoleLogger([]string{}), StateLoadMap{}, NodeLoadMap{}, TypeList{}, ObjTypeMap{}, FieldMap{}, FieldMap{}, FieldMap{}) return NewGraphContext(db, NewConsoleLogger([]string{}), StateLoadMap{}, NodeLoadMap{}, InfoLoadMap{}, TypeList{}, ObjTypeMap{}, FieldMap{}, FieldMap{}, FieldMap{})
} }
func fatalErr(t * testing.T, err error) { func fatalErr(t * testing.T, err error) {

@ -109,10 +109,10 @@ func (state * BaseLockableState) ReturnLock(lockable_id NodeID) Lockable {
// Nothing can take a lock from a base lockable either // Nothing can take a lock from a base lockable either
func (state * BaseLockableState) AllowedToTakeLock(node_id NodeID, lockable_id NodeID) bool { func (state * BaseLockableState) AllowedToTakeLock(node_id NodeID, lockable_id NodeID) bool {
_, exists := state.locks_held[lockable_id] // _, exists := state.locks_held[lockable_id]
if exists == false { // if exists == false {
panic ("Trying to give away lock we don't own") // panic (fmt.Sprintf("%s tried to give away lock to %s but doesn't own it: %+v", node_id, lockable_id, state))
} // }
return false return false
} }

@ -96,13 +96,15 @@ func SaveBaseThreadState(state * BaseThreadState) BaseThreadStateJSON {
lockable_state := SaveBaseLockableState(&state.BaseLockableState) lockable_state := SaveBaseLockableState(&state.BaseLockableState)
return BaseThreadStateJSON{ ret := BaseThreadStateJSON{
Parent: parent_id, Parent: parent_id,
Children: children, Children: children,
Timeout: state.timeout, Timeout: state.timeout,
TimeoutAction: state.timeout_action, TimeoutAction: state.timeout_action,
BaseLockableStateJSON: lockable_state, BaseLockableStateJSON: lockable_state,
} }
return ret
} }
func RestoreBaseThread(ctx * GraphContext, id NodeID, actions ThreadActions, handlers ThreadHandlers) BaseThread { func RestoreBaseThread(ctx * GraphContext, id NodeID, actions ThreadActions, handlers ThreadHandlers) BaseThread {
@ -147,11 +149,13 @@ func RestoreBaseThreadState(ctx * GraphContext, j BaseThreadStateJSON, loaded_no
if ok == false { if ok == false {
return nil, err return nil, err
} }
state.owner = p_t state.parent = p_t
} }
// TODO: Call different loading functions(to return different ThreadInfo types, based on the j.Type,
// Will probably have to add another set of callbacks to the context for this, and since there's now 3 sets that need to be matching it could be useful to move them to a struct so it's easier to keep in sync
i := 0 i := 0
for id, info := range(j.Children) { for id, info_raw := range(j.Children) {
child_node, err := LoadNodeRecurse(ctx, id, loaded_nodes) child_node, err := LoadNodeRecurse(ctx, id, loaded_nodes)
if err != nil { if err != nil {
return nil, err return nil, err
@ -161,7 +165,23 @@ func RestoreBaseThreadState(ctx * GraphContext, j BaseThreadStateJSON, loaded_no
return nil, fmt.Errorf("%+v is not a Thread as expected", child_node) return nil, fmt.Errorf("%+v is not a Thread as expected", child_node)
} }
state.children[i] = child_t state.children[i] = child_t
state.child_info[id] = info
info_map, ok := info_raw.(map[string]interface{})
if ok == false {
return nil, fmt.Errorf("Parsed map wrong type: %+v", info_raw)
}
info_fn, exists := ctx.InfoLoadFuncs[j.Type]
var parsed_info ThreadInfo
if exists == false {
parsed_info = nil
} else {
parsed_info, err = info_fn(ctx, info_map)
if err != nil {
return nil, err
}
}
state.child_info[id] = parsed_info
i++ i++
} }
@ -372,13 +392,13 @@ func ChildGo(ctx * GraphContext, thread_state ThreadState, thread Thread, child_
} }
thread.ChildWaits().Add(1) thread.ChildWaits().Add(1)
go func(child Thread) { go func(child Thread) {
ctx.Log.Logf("gql", "THREAD_START_CHILD: %s", child.ID()) ctx.Log.Logf("thread", "THREAD_START_CHILD: %s from %s", thread.ID(), child.ID())
defer thread.ChildWaits().Done() defer thread.ChildWaits().Done()
err := RunThread(ctx, child, first_action) err := RunThread(ctx, child, first_action)
if err != nil { if err != nil {
ctx.Log.Logf("gql", "THREAD_CHILD_RUN_ERR: %s %e", child.ID(), err) ctx.Log.Logf("thread", "THREAD_CHILD_RUN_ERR: %s %e", child.ID(), err)
} else { } else {
ctx.Log.Logf("gql", "THREAD_CHILD_RUN_DONE: %s", child.ID()) ctx.Log.Logf("thread", "THREAD_CHILD_RUN_DONE: %s", child.ID())
} }
}(child) }(child)
} }