Updated gql.go

graph-rework-2
noah metz 2023-07-09 19:33:18 -06:00
parent 269e7a57e2
commit 7aedf553ee
1 changed files with 50 additions and 113 deletions

163
gql.go

@ -112,7 +112,7 @@ func enableCORS(w *http.ResponseWriter) {
(*w).Header().Set("Access-Control-Allow-Methods", "*") (*w).Header().Set("Access-Control-Allow-Methods", "*")
} }
func GQLHandler(ctx * GraphContext, server * GQLThread) func(http.ResponseWriter, *http.Request) { func GQLHandler(ctx * Context, server * GQLThread) func(http.ResponseWriter, *http.Request) {
gql_ctx := context.Background() gql_ctx := context.Background()
gql_ctx = context.WithValue(gql_ctx, "graph_context", ctx) gql_ctx = context.WithValue(gql_ctx, "graph_context", ctx)
gql_ctx = context.WithValue(gql_ctx, "gql_server", server) gql_ctx = context.WithValue(gql_ctx, "gql_server", server)
@ -191,7 +191,7 @@ func getOperationTypeOfReq(p graphql.Params) string{
return "" return ""
} }
func GQLWSDo(ctx * GraphContext, p graphql.Params) chan *graphql.Result { func GQLWSDo(ctx * Context, p graphql.Params) chan *graphql.Result {
operation := getOperationTypeOfReq(p) operation := getOperationTypeOfReq(p)
ctx.Log.Logf("gqlws", "GQLWSDO_OPERATION: %s %+v", operation, p.RequestString) ctx.Log.Logf("gqlws", "GQLWSDO_OPERATION: %s %+v", operation, p.RequestString)
@ -203,7 +203,7 @@ func GQLWSDo(ctx * GraphContext, p graphql.Params) chan *graphql.Result {
return sendOneResultAndClose(res) return sendOneResultAndClose(res)
} }
func GQLWSHandler(ctx * GraphContext, server * GQLThread) func(http.ResponseWriter, *http.Request) { func GQLWSHandler(ctx * Context, server * GQLThread) func(http.ResponseWriter, *http.Request) {
gql_ctx := context.Background() gql_ctx := context.Background()
gql_ctx = context.WithValue(gql_ctx, "graph_context", ctx) gql_ctx = context.WithValue(gql_ctx, "graph_context", ctx)
gql_ctx = context.WithValue(gql_ctx, "gql_server", server) gql_ctx = context.WithValue(gql_ctx, "gql_server", server)
@ -230,7 +230,6 @@ func GQLWSHandler(ctx * GraphContext, server * GQLThread) func(http.ResponseWrit
defer conn.Close() defer conn.Close()
conn_state := "init" conn_state := "init"
for { for {
// TODO: Make this a select between reading client data and getting updates from the event to push to clients"
msg_raw, op, err := wsutil.ReadClientData(conn) msg_raw, op, err := wsutil.ReadClientData(conn)
ctx.Log.Logf("gqlws_hb", "MSG: %s\nOP: 0x%02x\nERR: %+v\n", string(msg_raw), op, err) ctx.Log.Logf("gqlws_hb", "MSG: %s\nOP: 0x%02x\nERR: %+v\n", string(msg_raw), op, err)
msg := GQLWSMsg{} msg := GQLWSMsg{}
@ -329,112 +328,79 @@ func GQLWSHandler(ctx * GraphContext, server * GQLThread) func(http.ResponseWrit
} }
} }
type TypeList []graphql.Type
type ObjTypeMap map[reflect.Type]*graphql.Object
type FieldMap map[string]*graphql.Field
type GQLThread struct { type GQLThread struct {
BaseThread SimpleThread
http_server *http.Server http_server *http.Server
http_done *sync.WaitGroup http_done *sync.WaitGroup
Listen string
} }
type GQLThreadInfo struct { func (thread * GQLThread) Type() NodeType {
ThreadInfo `json:"-"` return NodeType("gql_thread")
Start bool `json:"start"`
StartState string `json:"start_state"`
RestoreState string `json:"restore_state"`
} }
func NewGQLThreadInfo(start bool, start_state string, restore_state string) GQLThreadInfo { func (thread * GQLThread) Serialize() ([]byte, error) {
info := GQLThreadInfo{ thread_json := NewGQLThreadJSON(thread)
Start: start, return json.MarshalIndent(&thread_json, "", " ")
StartState: start_state,
RestoreState: restore_state,
}
return info
} }
type GQLThreadStateJSON struct { type GQLThreadJSON struct {
BaseThreadStateJSON SimpleThreadJSON
Listen string `json:"listen"` Listen string `json:"listen"`
} }
type GQLThreadState struct { type GQLThreadInfo struct {
BaseThreadState Start bool `json:"start"`
Listen string StartAction string `json:"start_action"`
RestoreAction string `json:"restore_action"`
} }
func (state * GQLThreadState) MarshalJSON() ([]byte, error) { func NewGQLThreadJSON(thread *GQLThread) GQLThreadJSON {
thread_state := SaveBaseThreadState(&state.BaseThreadState) thread_json := NewSimpleThreadJSON(&thread.SimpleThread)
return json.Marshal(&GQLThreadStateJSON{
BaseThreadStateJSON: thread_state, return GQLThreadJSON{
Listen: state.Listen, SimpleThreadJSON: thread_json,
}) Listen: thread.Listen,
}
} }
func LoadGQLThreadState(ctx * GraphContext, data []byte, loaded_nodes NodeMap) (NodeState, error){ func LoadGQLThread(ctx *Context, id NodeID, data []byte, nodes NodeMap) (Node, error) {
var j GQLThreadStateJSON var j GQLThreadJSON
err := json.Unmarshal(data, &j) err := json.Unmarshal(data, &j)
if err != nil { if err != nil {
return nil, err return nil, err
} }
thread_state, err := RestoreBaseThreadState(ctx, j.BaseThreadStateJSON, loaded_nodes) thread := NewGQLThread(id, j.Name, j.StateName, j.Listen)
nodes[id] = &thread
err = RestoreSimpleThread(ctx, &thread, j.SimpleThreadJSON, nodes)
if err != nil { if err != nil {
return nil, err return nil, err
} }
state := &GQLThreadState{ return &thread, nil
BaseThreadState: *thread_state,
Listen: j.Listen,
}
return state, nil
}
func LoadGQLThreadInfo(ctx * GraphContext, raw map[string]interface{}) (ThreadInfo, error) {
info := GQLThreadInfo{
Start: raw["start"].(bool),
StartState: raw["start_state"].(string),
RestoreState: raw["restore_state"].(string),
}
return &info, nil
}
func LoadGQLThread(ctx * GraphContext, id NodeID) (GraphNode, error) {
gql_thread := GQLThread{
BaseThread: RestoreBaseThread(ctx, id, gql_actions, gql_handlers),
http_server: nil,
http_done: &sync.WaitGroup{},
}
return &gql_thread, nil
} }
func NewGQLThreadState(listen string) GQLThreadState { func NewGQLThread(id NodeID, name string, state_name string, listen string) GQLThread {
state := GQLThreadState{ return GQLThread{
BaseThreadState: NewBaseThreadState("GQL Server", "gql_thread"), SimpleThread: NewSimpleThread(id, name, state_name, reflect.TypeOf((*GQLThreadInfo)(nil)), gql_actions, gql_handlers),
Listen: listen, Listen: listen,
} }
state.InfoType = reflect.TypeOf((*GQLThreadInfo)(nil))
return state
} }
var gql_actions ThreadActions = ThreadActions{ var gql_actions ThreadActions = ThreadActions{
"wait": ThreadWait, "wait": ThreadWait,
"restore": func(ctx * GraphContext, thread Thread) (string, error) { "restore": func(ctx * Context, thread Thread) (string, error) {
// Start all the threads that should be "started" // Start all the threads that should be "started"
ctx.Log.Logf("gql", "GQL_THREAD_RESTORE: %s", thread.ID()) ctx.Log.Logf("gql", "GQL_THREAD_RESTORE: %s", thread.ID())
UpdateStates(ctx, []GraphNode{thread}, func(nodes NodeMap)(error) { UpdateStates(ctx, []Node{thread}, func(nodes NodeMap)(error) {
server_state := thread.State().(*GQLThreadState) return UpdateMoreStates(ctx, NodeList(thread.Children()), nodes, func(nodes NodeMap) error {
return UpdateMoreStates(ctx, NodeList(server_state.Children()), nodes, func(nodes NodeMap) error { for _, child := range(thread.Children()) {
for _, child := range(server_state.Children()) { should_run := (thread.ChildInfo(child.ID())).(*GQLThreadInfo)
child_state := child.State().(ThreadState) if should_run.Start == true && child.State() != "finished" {
should_run := (server_state.child_info[child.ID()]).(*GQLThreadInfo) ChildGo(ctx, thread, child, should_run.RestoreAction)
if should_run.Start == true && child_state.State() != "finished" {
ChildGo(ctx, thread, child, should_run.RestoreState)
} }
} }
return nil return nil
@ -443,7 +409,7 @@ var gql_actions ThreadActions = ThreadActions{
return "start_server", nil return "start_server", nil
}, },
"start": func(ctx * GraphContext, thread Thread) (string, error) { "start": func(ctx * Context, thread Thread) (string, error) {
ctx.Log.Logf("gql", "GQL_START") ctx.Log.Logf("gql", "GQL_START")
err := ThreadStart(ctx, thread) err := ThreadStart(ctx, thread)
if err != nil { if err != nil {
@ -452,7 +418,7 @@ var gql_actions ThreadActions = ThreadActions{
return "start_server", nil return "start_server", nil
}, },
"start_server": func(ctx * GraphContext, thread Thread) (string, error) { "start_server": func(ctx * Context, thread Thread) (string, error) {
server, ok := thread.(*GQLThread) server, ok := thread.(*GQLThread)
if ok == false { if ok == false {
return "", fmt.Errorf("GQL_THREAD_START: %s is not GQLThread, %+v", thread.ID(), thread.State()) return "", fmt.Errorf("GQL_THREAD_START: %s is not GQLThread, %+v", thread.ID(), thread.State())
@ -471,10 +437,9 @@ var gql_actions ThreadActions = ThreadActions{
fs := http.FileServer(http.Dir("./site")) fs := http.FileServer(http.Dir("./site"))
mux.Handle("/site/", http.StripPrefix("/site", fs)) mux.Handle("/site/", http.StripPrefix("/site", fs))
UseStates(ctx, []GraphNode{server}, func(states NodeStateMap)(error){ UseStates(ctx, []Node{server}, func(nodes NodeMap)(error){
server_state := states[server.ID()].(*GQLThreadState)
server.http_server = &http.Server{ server.http_server = &http.Server{
Addr: server_state.Listen, Addr: server.Listen,
Handler: mux, Handler: mux,
} }
return nil return nil
@ -494,30 +459,29 @@ var gql_actions ThreadActions = ThreadActions{
} }
var gql_handlers ThreadHandlers = ThreadHandlers{ var gql_handlers ThreadHandlers = ThreadHandlers{
"child_added": func(ctx * GraphContext, thread Thread, signal GraphSignal) (string, error) { "child_added": func(ctx * Context, thread Thread, signal GraphSignal) (string, error) {
ctx.Log.Logf("gql", "GQL_THREAD_CHILD_ADDED: %+v", signal) ctx.Log.Logf("gql", "GQL_THREAD_CHILD_ADDED: %+v", signal)
UpdateStates(ctx, []GraphNode{thread}, func(nodes NodeMap)(error) { UpdateStates(ctx, []Node{thread}, func(nodes NodeMap)(error) {
server_state := thread.State().(*GQLThreadState) should_run, exists := thread.ChildInfo(signal.Source()).(*GQLThreadInfo)
should_run, exists := server_state.child_info[signal.Source()].(*GQLThreadInfo)
if exists == false { if exists == false {
ctx.Log.Logf("gql", "GQL_THREAD_CHILD_ADDED: tried to start %s whis is not a child") ctx.Log.Logf("gql", "GQL_THREAD_CHILD_ADDED: tried to start %s whis is not a child")
return nil return nil
} }
if should_run.Start == true { if should_run.Start == true {
ChildGo(ctx, thread, server_state.Child(signal.Source()), should_run.StartState) ChildGo(ctx, thread, thread.Child(signal.Source()), should_run.StartAction)
} }
return nil return nil
}) })
return "wait", nil return "wait", nil
}, },
"abort": func(ctx * GraphContext, thread Thread, signal GraphSignal) (string, error) { "abort": func(ctx * Context, thread Thread, signal GraphSignal) (string, error) {
ctx.Log.Logf("gql", "GQL_ABORT") ctx.Log.Logf("gql", "GQL_ABORT")
server := thread.(*GQLThread) server := thread.(*GQLThread)
server.http_server.Shutdown(context.TODO()) server.http_server.Shutdown(context.TODO())
server.http_done.Wait() server.http_done.Wait()
return "", NewThreadAbortedError(signal.Source()) return "", NewThreadAbortedError(signal.Source())
}, },
"cancel": func(ctx * GraphContext, thread Thread, signal GraphSignal) (string, error) { "cancel": func(ctx * Context, thread Thread, signal GraphSignal) (string, error) {
ctx.Log.Logf("gql", "GQL_CANCEL") ctx.Log.Logf("gql", "GQL_CANCEL")
server := thread.(*GQLThread) server := thread.(*GQLThread)
server.http_server.Shutdown(context.TODO()) server.http_server.Shutdown(context.TODO())
@ -526,30 +490,3 @@ var gql_handlers ThreadHandlers = ThreadHandlers{
}, },
} }
func NewGQLThread(ctx * GraphContext, listen string, requirements []Lockable) (*GQLThread, error) {
state := NewGQLThreadState(listen)
base_thread, err := NewBaseThread(ctx, gql_actions, gql_handlers, &state)
if err != nil {
return nil, err
}
thread := &GQLThread {
BaseThread: base_thread,
http_server: nil,
http_done: &sync.WaitGroup{},
}
if len(requirements) > 0 {
req_nodes := make([]GraphNode, len(requirements))
for i, req := range(requirements) {
req_nodes[i] = req
}
err = UpdateStates(ctx, req_nodes, func(nodes NodeMap) error {
return LinkLockables(ctx, thread, requirements, nodes)
})
if err != nil {
return nil, err
}
}
return thread, nil
}