diff --git a/gql.go b/gql.go index ec75039..fb63df4 100644 --- a/gql.go +++ b/gql.go @@ -335,7 +335,7 @@ type GQLThread struct { } type GQLThreadInfo struct { - ThreadInfo `json:ignore` + ThreadInfo `json:"-"` Start bool `json:"start"` Started bool `json:"started"` FirstAction string `json:"first_action"` @@ -390,6 +390,16 @@ func LoadGQLThreadState(ctx * GraphContext, data []byte, loaded_nodes NodeMap) ( return state, nil } +func LoadGQLThreadInfo(ctx * GraphContext, raw map[string]interface{}) (ThreadInfo, error) { + info := GQLThreadInfo{ + Start: raw["start"].(bool), + Started: raw["started"].(bool), + FirstAction: raw["first_action"].(string), + RestoreAction: raw["restore_action"].(string), + } + return &info, nil +} + func LoadGQLThread(ctx * GraphContext, id NodeID) (GraphNode, error) { thread := RestoreBaseThread(ctx, id, gql_actions, gql_handlers) gql_thread := GQLThread{ @@ -415,10 +425,44 @@ var gql_actions ThreadActions = ThreadActions{ "restore": func(ctx * GraphContext, thread Thread) (string, error) { // Start all the threads that should be "started" ctx.Log.Logf("gql", "GQL_THREAD_RESTORE: %s", thread.ID()) + server, ok := thread.(*GQLThread) + if ok == false { + panic("thread is not *GQLThread") + } + + // Serve the GQL http and ws handlers + mux := http.NewServeMux() + mux.HandleFunc("/gql", GQLHandler(ctx, server)) + mux.HandleFunc("/gqlws", GQLWSHandler(ctx, server)) + + // Server a graphiql interface(TODO make configurable whether to start this) + mux.HandleFunc("/graphiql", GraphiQLHandler()) + + // Server the ./site directory to /site (TODO make configurable with better defaults) + fs := http.FileServer(http.Dir("./site")) + mux.Handle("/site/", http.StripPrefix("/site", fs)) + + UseStates(ctx, []GraphNode{server}, func(states NodeStateMap)(error){ + server_state := states[server.ID()].(*GQLThreadState) + server.http_server = &http.Server{ + Addr: server_state.Listen, + Handler: mux, + } + return nil + }) + + server.http_done.Add(1) + go func(server *GQLThread) { + defer server.http_done.Done() + err := server.http_server.ListenAndServe() + if err != http.ErrServerClosed { + panic(fmt.Sprintf("Failed to start gql server: %s", err)) + } + }(server) + UpdateStates(ctx, []GraphNode{thread}, func(nodes NodeMap)(error) { server_state := thread.State().(*GQLThreadState) for _, child := range(server_state.Children()) { - fmt.Printf("\n%+v\n\n", server_state.child_info[child.ID()]) should_run := (server_state.child_info[child.ID()]).(*GQLThreadInfo) if should_run.Started == true { ChildGo(ctx, server_state, thread, child.ID(), should_run.RestoreAction) diff --git a/gql_test.go b/gql_test.go index 213e137..a01606a 100644 --- a/gql_test.go +++ b/gql_test.go @@ -8,7 +8,7 @@ import ( ) func TestGQLThread(t * testing.T) { - ctx := logTestContext(t, []string{"thread"}) + ctx := logTestContext(t, []string{}) gql_thread, err := NewGQLThread(ctx, ":8080", []Lockable{}) fatalErr(t, err) @@ -36,29 +36,56 @@ func TestGQLThread(t * testing.T) { } func TestGQLDBLoad(t * testing.T) { - ctx := logTestContext(t, []string{}) + ctx := logTestContext(t, []string{"thread"}) l1, err := NewSimpleLockable(ctx, "Test Lockable 1", []Lockable{}) fatalErr(t, err) - t1, err := NewGQLThread(ctx, ":8080", []Lockable{l1}) + t1, err := NewSimpleThread(ctx, "Test Thread 1", []Lockable{}, BaseThreadActions, BaseThreadHandlers) fatalErr(t, err) + update_channel := t1.UpdateChannel(10) - SendUpdate(ctx, t1, CancelSignal(nil)) - err = RunThread(ctx, t1, "start") + gql, err := NewGQLThread(ctx, ":8080", []Lockable{l1}) fatalErr(t, err) - err = UseStates(ctx, []GraphNode{t1}, func(states NodeStateMap) error { - ser, err := json.MarshalIndent(states[t1.ID()], "", " ") - fmt.Printf("\n%s\n\n", ser) + info := NewGQLThreadInfo(true, "start", "restore") + err = LinkThreads(ctx, gql, t1, &info) + fatalErr(t, err) + + SendUpdate(ctx, gql, CancelSignal(nil)) + err = RunThread(ctx, gql, "start") + fatalErr(t, err) + + (*GraphTester)(t).WaitForValue(ctx, update_channel, "thread_done", t1, 100*time.Millisecond, "Dicn't received update_done on t1 from t1") + + err = UseStates(ctx, []GraphNode{gql, t1}, func(states NodeStateMap) error { + ser1, err := json.MarshalIndent(states[gql.ID()], "", " ") + ser2, err := json.MarshalIndent(states[t1.ID()], "", " ") + fmt.Printf("\n%s\n\n", ser1) + fmt.Printf("\n%s\n\n", ser2) return err }) - t1_loaded, err := LoadNode(ctx, t1.ID()) + gql_loaded, err := LoadNode(ctx, gql.ID()) fatalErr(t, err) + var t1_loaded *BaseThread = nil - err = UseStates(ctx, []GraphNode{t1_loaded}, func(states NodeStateMap) error { - ser, err := json.MarshalIndent(states[t1_loaded.ID()], "", " ") + err = UseStates(ctx, []GraphNode{gql_loaded}, func(states NodeStateMap) error { + ser, err := json.MarshalIndent(states[gql_loaded.ID()], "", " ") fmt.Printf("\n%s\n\n", ser) + child := states[gql_loaded.ID()].(ThreadState).Children()[0] + t1_loaded = child.(*BaseThread) + update_channel = t1_loaded.UpdateChannel(10) + err = UseMoreStates(ctx, []GraphNode{child}, states, func(states NodeStateMap) error { + ser, err := json.MarshalIndent(states[child.ID()], "", " ") + fmt.Printf("\n%s\n\n", ser) + return err + }) return err }) + + SendUpdate(ctx, gql_loaded, CancelSignal(nil)) + err = RunThread(ctx, gql_loaded.(Thread), "restore") + fatalErr(t, err) + (*GraphTester)(t).WaitForValue(ctx, update_channel, "thread_done", t1_loaded, 100*time.Millisecond, "Dicn't received update_done on t1_loaded from t1_loaded") + } diff --git a/graph.go b/graph.go index 3389133..25a7f66 100644 --- a/graph.go +++ b/graph.go @@ -24,11 +24,14 @@ type StateLoadFunc func(*GraphContext, []byte, NodeMap)(NodeState, error) type StateLoadMap map[string]StateLoadFunc type NodeLoadFunc func(*GraphContext, NodeID)(GraphNode, error) type NodeLoadMap map[string]NodeLoadFunc +type InfoLoadFunc func(*GraphContext, map[string]interface{})(ThreadInfo, error) +type InfoLoadMap map[string]InfoLoadFunc type GraphContext struct { DB * badger.DB Log Logger NodeLoadFuncs NodeLoadMap StateLoadFuncs StateLoadMap + InfoLoadFuncs InfoLoadMap GQL * GQLContext } @@ -195,7 +198,7 @@ func LoadNodeRecurse(ctx * GraphContext, id NodeID, loaded_nodes map[NodeID]Grap return node, nil } -func NewGraphContext(db * badger.DB, log Logger, state_loads StateLoadMap, node_loads NodeLoadMap, types TypeList, type_map ObjTypeMap, queries FieldMap, subscriptions FieldMap, mutations FieldMap) * GraphContext { +func NewGraphContext(db * badger.DB, log Logger, state_loads StateLoadMap, node_loads NodeLoadMap, info_loads InfoLoadMap, types TypeList, type_map ObjTypeMap, queries FieldMap, subscriptions FieldMap, mutations FieldMap) * GraphContext { gql, err := NewGQLContext(types, type_map, queries, subscriptions, mutations) if err != nil { panic(err) @@ -215,6 +218,9 @@ func NewGraphContext(db * badger.DB, log Logger, state_loads StateLoadMap, node_ "simple_thread": LoadSimpleThreadState, "gql_thread": LoadGQLThreadState, }, + InfoLoadFuncs: InfoLoadMap{ + "gql_thread": LoadGQLThreadInfo, + }, } for name, fn := range(state_loads) { @@ -225,6 +231,10 @@ func NewGraphContext(db * badger.DB, log Logger, state_loads StateLoadMap, node_ ctx.NodeLoadFuncs[name] = fn } + for name, fn := range(info_loads) { + ctx.InfoLoadFuncs[name] = fn + } + return &ctx } @@ -446,6 +456,28 @@ func RestoreNode(ctx * GraphContext, id NodeID) BaseNode { return node } +func WriteDBState(ctx * GraphContext, id NodeID, state NodeState) error { + ctx.Log.Logf("db", "DB_WRITE: %s - %+v", id, state) + + var serialized_state []byte = nil + if state != nil { + ser, err := json.Marshal(state) + if err != nil { + return fmt.Errorf("DB_MARSHAL_ERROR: %e", err) + } + serialized_state = ser + } else { + serialized_state = []byte{} + } + + err := ctx.DB.Update(func(txn *badger.Txn) error { + err := txn.Set([]byte(id), serialized_state) + return err + }) + + return err +} + // Create a new base node with a new ID func NewNode(ctx * GraphContext, state NodeState) (BaseNode, error) { node := BaseNode{ @@ -540,28 +572,6 @@ func WriteDBStates(ctx * GraphContext, nodes NodeMap) error{ return err } -func WriteDBState(ctx * GraphContext, id NodeID, state NodeState) error { - ctx.Log.Logf("db", "DB_WRITE: %s - %+v", id, state) - - var serialized_state []byte = nil - if state != nil { - ser, err := json.Marshal(state) - if err != nil { - return fmt.Errorf("DB_MARSHAL_ERROR: %e", err) - } - serialized_state = ser - } else { - serialized_state = []byte{} - } - - err := ctx.DB.Update(func(txn *badger.Txn) error { - err := txn.Set([]byte(id), serialized_state) - return err - }) - - return err -} - func (node * BaseNode) SetState(new_state NodeState) { node.state = new_state } diff --git a/graph_test.go b/graph_test.go index a723a12..4be2b3c 100644 --- a/graph_test.go +++ b/graph_test.go @@ -58,7 +58,7 @@ func logTestContext(t * testing.T, components []string) * GraphContext { t.Fatal(err) } - return NewGraphContext(db, NewConsoleLogger(components), StateLoadMap{}, NodeLoadMap{}, TypeList{}, ObjTypeMap{}, FieldMap{}, FieldMap{}, FieldMap{}) + return NewGraphContext(db, NewConsoleLogger(components), StateLoadMap{}, NodeLoadMap{}, InfoLoadMap{}, TypeList{}, ObjTypeMap{}, FieldMap{}, FieldMap{}, FieldMap{}) } func testContext(t * testing.T) * GraphContext { @@ -67,7 +67,7 @@ func testContext(t * testing.T) * GraphContext { t.Fatal(err) } - return NewGraphContext(db, NewConsoleLogger([]string{}), StateLoadMap{}, NodeLoadMap{}, TypeList{}, ObjTypeMap{}, FieldMap{}, FieldMap{}, FieldMap{}) + return NewGraphContext(db, NewConsoleLogger([]string{}), StateLoadMap{}, NodeLoadMap{}, InfoLoadMap{}, TypeList{}, ObjTypeMap{}, FieldMap{}, FieldMap{}, FieldMap{}) } func fatalErr(t * testing.T, err error) { diff --git a/lockable.go b/lockable.go index 7e2f219..c1c8239 100644 --- a/lockable.go +++ b/lockable.go @@ -109,10 +109,10 @@ func (state * BaseLockableState) ReturnLock(lockable_id NodeID) Lockable { // Nothing can take a lock from a base lockable either func (state * BaseLockableState) AllowedToTakeLock(node_id NodeID, lockable_id NodeID) bool { - _, exists := state.locks_held[lockable_id] - if exists == false { - panic ("Trying to give away lock we don't own") - } +// _, exists := state.locks_held[lockable_id] +// if exists == false { +// panic (fmt.Sprintf("%s tried to give away lock to %s but doesn't own it: %+v", node_id, lockable_id, state)) +// } return false } diff --git a/thread.go b/thread.go index 7af9814..4e1554f 100644 --- a/thread.go +++ b/thread.go @@ -96,13 +96,15 @@ func SaveBaseThreadState(state * BaseThreadState) BaseThreadStateJSON { lockable_state := SaveBaseLockableState(&state.BaseLockableState) - return BaseThreadStateJSON{ + ret := BaseThreadStateJSON{ Parent: parent_id, Children: children, Timeout: state.timeout, TimeoutAction: state.timeout_action, BaseLockableStateJSON: lockable_state, } + + return ret } func RestoreBaseThread(ctx * GraphContext, id NodeID, actions ThreadActions, handlers ThreadHandlers) BaseThread { @@ -147,11 +149,13 @@ func RestoreBaseThreadState(ctx * GraphContext, j BaseThreadStateJSON, loaded_no if ok == false { return nil, err } - state.owner = p_t + state.parent = p_t } + // TODO: Call different loading functions(to return different ThreadInfo types, based on the j.Type, + // Will probably have to add another set of callbacks to the context for this, and since there's now 3 sets that need to be matching it could be useful to move them to a struct so it's easier to keep in sync i := 0 - for id, info := range(j.Children) { + for id, info_raw := range(j.Children) { child_node, err := LoadNodeRecurse(ctx, id, loaded_nodes) if err != nil { return nil, err @@ -161,7 +165,23 @@ func RestoreBaseThreadState(ctx * GraphContext, j BaseThreadStateJSON, loaded_no return nil, fmt.Errorf("%+v is not a Thread as expected", child_node) } state.children[i] = child_t - state.child_info[id] = info + + info_map, ok := info_raw.(map[string]interface{}) + if ok == false { + return nil, fmt.Errorf("Parsed map wrong type: %+v", info_raw) + } + info_fn, exists := ctx.InfoLoadFuncs[j.Type] + var parsed_info ThreadInfo + if exists == false { + parsed_info = nil + } else { + parsed_info, err = info_fn(ctx, info_map) + if err != nil { + return nil, err + } + } + + state.child_info[id] = parsed_info i++ } @@ -372,13 +392,13 @@ func ChildGo(ctx * GraphContext, thread_state ThreadState, thread Thread, child_ } thread.ChildWaits().Add(1) go func(child Thread) { - ctx.Log.Logf("gql", "THREAD_START_CHILD: %s", child.ID()) + ctx.Log.Logf("thread", "THREAD_START_CHILD: %s from %s", thread.ID(), child.ID()) defer thread.ChildWaits().Done() err := RunThread(ctx, child, first_action) if err != nil { - ctx.Log.Logf("gql", "THREAD_CHILD_RUN_ERR: %s %e", child.ID(), err) + ctx.Log.Logf("thread", "THREAD_CHILD_RUN_ERR: %s %e", child.ID(), err) } else { - ctx.Log.Logf("gql", "THREAD_CHILD_RUN_DONE: %s", child.ID()) + ctx.Log.Logf("thread", "THREAD_CHILD_RUN_DONE: %s", child.ID()) } }(child) }