diff --git a/gql.go b/gql.go index b66b1a9..7397037 100644 --- a/gql.go +++ b/gql.go @@ -112,7 +112,7 @@ func enableCORS(w *http.ResponseWriter) { (*w).Header().Set("Access-Control-Allow-Methods", "*") } -func GQLHandler(ctx * GraphContext, server * GQLThread) func(http.ResponseWriter, *http.Request) { +func GQLHandler(ctx * Context, server * GQLThread) func(http.ResponseWriter, *http.Request) { gql_ctx := context.Background() gql_ctx = context.WithValue(gql_ctx, "graph_context", ctx) gql_ctx = context.WithValue(gql_ctx, "gql_server", server) @@ -191,7 +191,7 @@ func getOperationTypeOfReq(p graphql.Params) string{ return "" } -func GQLWSDo(ctx * GraphContext, p graphql.Params) chan *graphql.Result { +func GQLWSDo(ctx * Context, p graphql.Params) chan *graphql.Result { operation := getOperationTypeOfReq(p) ctx.Log.Logf("gqlws", "GQLWSDO_OPERATION: %s %+v", operation, p.RequestString) @@ -203,7 +203,7 @@ func GQLWSDo(ctx * GraphContext, p graphql.Params) chan *graphql.Result { return sendOneResultAndClose(res) } -func GQLWSHandler(ctx * GraphContext, server * GQLThread) func(http.ResponseWriter, *http.Request) { +func GQLWSHandler(ctx * Context, server * GQLThread) func(http.ResponseWriter, *http.Request) { gql_ctx := context.Background() gql_ctx = context.WithValue(gql_ctx, "graph_context", ctx) gql_ctx = context.WithValue(gql_ctx, "gql_server", server) @@ -230,7 +230,6 @@ func GQLWSHandler(ctx * GraphContext, server * GQLThread) func(http.ResponseWrit defer conn.Close() conn_state := "init" for { - // TODO: Make this a select between reading client data and getting updates from the event to push to clients" msg_raw, op, err := wsutil.ReadClientData(conn) ctx.Log.Logf("gqlws_hb", "MSG: %s\nOP: 0x%02x\nERR: %+v\n", string(msg_raw), op, err) msg := GQLWSMsg{} @@ -329,112 +328,79 @@ func GQLWSHandler(ctx * GraphContext, server * GQLThread) func(http.ResponseWrit } } -type TypeList []graphql.Type -type ObjTypeMap map[reflect.Type]*graphql.Object -type FieldMap map[string]*graphql.Field - type GQLThread struct { - BaseThread + SimpleThread http_server *http.Server http_done *sync.WaitGroup + Listen string } -type GQLThreadInfo struct { - ThreadInfo `json:"-"` - Start bool `json:"start"` - StartState string `json:"start_state"` - RestoreState string `json:"restore_state"` +func (thread * GQLThread) Type() NodeType { + return NodeType("gql_thread") } -func NewGQLThreadInfo(start bool, start_state string, restore_state string) GQLThreadInfo { - info := GQLThreadInfo{ - Start: start, - StartState: start_state, - RestoreState: restore_state, - } - return info +func (thread * GQLThread) Serialize() ([]byte, error) { + thread_json := NewGQLThreadJSON(thread) + return json.MarshalIndent(&thread_json, "", " ") } -type GQLThreadStateJSON struct { - BaseThreadStateJSON +type GQLThreadJSON struct { + SimpleThreadJSON Listen string `json:"listen"` } -type GQLThreadState struct { - BaseThreadState - Listen string +type GQLThreadInfo struct { + Start bool `json:"start"` + StartAction string `json:"start_action"` + RestoreAction string `json:"restore_action"` } -func (state * GQLThreadState) MarshalJSON() ([]byte, error) { - thread_state := SaveBaseThreadState(&state.BaseThreadState) - return json.Marshal(&GQLThreadStateJSON{ - BaseThreadStateJSON: thread_state, - Listen: state.Listen, - }) +func NewGQLThreadJSON(thread *GQLThread) GQLThreadJSON { + thread_json := NewSimpleThreadJSON(&thread.SimpleThread) + + return GQLThreadJSON{ + SimpleThreadJSON: thread_json, + Listen: thread.Listen, + } } -func LoadGQLThreadState(ctx * GraphContext, data []byte, loaded_nodes NodeMap) (NodeState, error){ - var j GQLThreadStateJSON +func LoadGQLThread(ctx *Context, id NodeID, data []byte, nodes NodeMap) (Node, error) { + var j GQLThreadJSON err := json.Unmarshal(data, &j) if err != nil { return nil, err } - thread_state, err := RestoreBaseThreadState(ctx, j.BaseThreadStateJSON, loaded_nodes) + thread := NewGQLThread(id, j.Name, j.StateName, j.Listen) + nodes[id] = &thread + + err = RestoreSimpleThread(ctx, &thread, j.SimpleThreadJSON, nodes) if err != nil { return nil, err } - state := &GQLThreadState{ - BaseThreadState: *thread_state, - Listen: j.Listen, - } - - return state, nil -} - -func LoadGQLThreadInfo(ctx * GraphContext, raw map[string]interface{}) (ThreadInfo, error) { - info := GQLThreadInfo{ - Start: raw["start"].(bool), - StartState: raw["start_state"].(string), - RestoreState: raw["restore_state"].(string), - } - return &info, nil + return &thread, nil } -func LoadGQLThread(ctx * GraphContext, id NodeID) (GraphNode, error) { - gql_thread := GQLThread{ - BaseThread: RestoreBaseThread(ctx, id, gql_actions, gql_handlers), - http_server: nil, - http_done: &sync.WaitGroup{}, - } - - return &gql_thread, nil -} - -func NewGQLThreadState(listen string) GQLThreadState { - state := GQLThreadState{ - BaseThreadState: NewBaseThreadState("GQL Server", "gql_thread"), +func NewGQLThread(id NodeID, name string, state_name string, listen string) GQLThread { + return GQLThread{ + SimpleThread: NewSimpleThread(id, name, state_name, reflect.TypeOf((*GQLThreadInfo)(nil)), gql_actions, gql_handlers), Listen: listen, } - state.InfoType = reflect.TypeOf((*GQLThreadInfo)(nil)) - return state } var gql_actions ThreadActions = ThreadActions{ "wait": ThreadWait, - "restore": func(ctx * GraphContext, thread Thread) (string, error) { + "restore": func(ctx * Context, thread Thread) (string, error) { // Start all the threads that should be "started" ctx.Log.Logf("gql", "GQL_THREAD_RESTORE: %s", thread.ID()) - UpdateStates(ctx, []GraphNode{thread}, func(nodes NodeMap)(error) { - server_state := thread.State().(*GQLThreadState) - return UpdateMoreStates(ctx, NodeList(server_state.Children()), nodes, func(nodes NodeMap) error { - for _, child := range(server_state.Children()) { - child_state := child.State().(ThreadState) - should_run := (server_state.child_info[child.ID()]).(*GQLThreadInfo) - if should_run.Start == true && child_state.State() != "finished" { - ChildGo(ctx, thread, child, should_run.RestoreState) + UpdateStates(ctx, []Node{thread}, func(nodes NodeMap)(error) { + return UpdateMoreStates(ctx, NodeList(thread.Children()), nodes, func(nodes NodeMap) error { + for _, child := range(thread.Children()) { + should_run := (thread.ChildInfo(child.ID())).(*GQLThreadInfo) + if should_run.Start == true && child.State() != "finished" { + ChildGo(ctx, thread, child, should_run.RestoreAction) } } return nil @@ -443,7 +409,7 @@ var gql_actions ThreadActions = ThreadActions{ return "start_server", nil }, - "start": func(ctx * GraphContext, thread Thread) (string, error) { + "start": func(ctx * Context, thread Thread) (string, error) { ctx.Log.Logf("gql", "GQL_START") err := ThreadStart(ctx, thread) if err != nil { @@ -452,7 +418,7 @@ var gql_actions ThreadActions = ThreadActions{ return "start_server", nil }, - "start_server": func(ctx * GraphContext, thread Thread) (string, error) { + "start_server": func(ctx * Context, thread Thread) (string, error) { server, ok := thread.(*GQLThread) if ok == false { return "", fmt.Errorf("GQL_THREAD_START: %s is not GQLThread, %+v", thread.ID(), thread.State()) @@ -471,10 +437,9 @@ var gql_actions ThreadActions = ThreadActions{ fs := http.FileServer(http.Dir("./site")) mux.Handle("/site/", http.StripPrefix("/site", fs)) - UseStates(ctx, []GraphNode{server}, func(states NodeStateMap)(error){ - server_state := states[server.ID()].(*GQLThreadState) + UseStates(ctx, []Node{server}, func(nodes NodeMap)(error){ server.http_server = &http.Server{ - Addr: server_state.Listen, + Addr: server.Listen, Handler: mux, } return nil @@ -494,30 +459,29 @@ var gql_actions ThreadActions = ThreadActions{ } var gql_handlers ThreadHandlers = ThreadHandlers{ - "child_added": func(ctx * GraphContext, thread Thread, signal GraphSignal) (string, error) { + "child_added": func(ctx * Context, thread Thread, signal GraphSignal) (string, error) { ctx.Log.Logf("gql", "GQL_THREAD_CHILD_ADDED: %+v", signal) - UpdateStates(ctx, []GraphNode{thread}, func(nodes NodeMap)(error) { - server_state := thread.State().(*GQLThreadState) - should_run, exists := server_state.child_info[signal.Source()].(*GQLThreadInfo) + UpdateStates(ctx, []Node{thread}, func(nodes NodeMap)(error) { + should_run, exists := thread.ChildInfo(signal.Source()).(*GQLThreadInfo) if exists == false { ctx.Log.Logf("gql", "GQL_THREAD_CHILD_ADDED: tried to start %s whis is not a child") return nil } if should_run.Start == true { - ChildGo(ctx, thread, server_state.Child(signal.Source()), should_run.StartState) + ChildGo(ctx, thread, thread.Child(signal.Source()), should_run.StartAction) } return nil }) return "wait", nil }, - "abort": func(ctx * GraphContext, thread Thread, signal GraphSignal) (string, error) { + "abort": func(ctx * Context, thread Thread, signal GraphSignal) (string, error) { ctx.Log.Logf("gql", "GQL_ABORT") server := thread.(*GQLThread) server.http_server.Shutdown(context.TODO()) server.http_done.Wait() return "", NewThreadAbortedError(signal.Source()) }, - "cancel": func(ctx * GraphContext, thread Thread, signal GraphSignal) (string, error) { + "cancel": func(ctx * Context, thread Thread, signal GraphSignal) (string, error) { ctx.Log.Logf("gql", "GQL_CANCEL") server := thread.(*GQLThread) server.http_server.Shutdown(context.TODO()) @@ -526,30 +490,3 @@ var gql_handlers ThreadHandlers = ThreadHandlers{ }, } -func NewGQLThread(ctx * GraphContext, listen string, requirements []Lockable) (*GQLThread, error) { - state := NewGQLThreadState(listen) - base_thread, err := NewBaseThread(ctx, gql_actions, gql_handlers, &state) - if err != nil { - return nil, err - } - - thread := &GQLThread { - BaseThread: base_thread, - http_server: nil, - http_done: &sync.WaitGroup{}, - } - - if len(requirements) > 0 { - req_nodes := make([]GraphNode, len(requirements)) - for i, req := range(requirements) { - req_nodes[i] = req - } - err = UpdateStates(ctx, req_nodes, func(nodes NodeMap) error { - return LinkLockables(ctx, thread, requirements, nodes) - }) - if err != nil { - return nil, err - } - } - return thread, nil -}