package graphvent import ( "net/http" "github.com/graphql-go/graphql" "github.com/graphql-go/graphql/language/parser" "github.com/graphql-go/graphql/language/source" "github.com/graphql-go/graphql/language/ast" "context" "encoding/json" "io" "reflect" "fmt" "sync" "github.com/gobwas/ws" "github.com/gobwas/ws/wsutil" ) func GraphiQLHandler() func(http.ResponseWriter, *http.Request) { return func(w http.ResponseWriter, r * http.Request) { graphiql_string := fmt.Sprintf(` GraphiQL
Loading...
`) w.Header().Set("Content-Type", "text/html; charset=utf-8") w.WriteHeader(http.StatusOK) io.WriteString(w, graphiql_string) } } type GQLWSPayload struct { OperationName string `json:"operationName,omitempty"` Query string `json:"query,omitempty"` Variables map[string]interface{} `json:"variables,omitempty"` Extensions map[string]interface{} `json:"extensions,omitempty"` Data string `json:"data,omitempty"` } type GQLWSMsg struct { ID string `json:"id,omitempty"` Type string `json:"type"` Payload GQLWSPayload `json:"payload,omitempty"` } func enableCORS(w *http.ResponseWriter) { (*w).Header().Set("Access-Control-Allow-Origin", "*") (*w).Header().Set("Access-Control-Allow-Credentials", "true") (*w).Header().Set("Access-Control-Allow-Headers", "*") (*w).Header().Set("Access-Control-Allow-Methods", "*") } func GQLHandler(ctx * GraphContext, server * GQLThread) func(http.ResponseWriter, *http.Request) { gql_ctx := context.Background() gql_ctx = context.WithValue(gql_ctx, "graph_context", ctx) gql_ctx = context.WithValue(gql_ctx, "gql_server", server) return func(w http.ResponseWriter, r * http.Request) { ctx.Log.Logf("gql", "GQL REQUEST: %s", r.RemoteAddr) enableCORS(&w) header_map := map[string]interface{}{} for header, value := range(r.Header) { header_map[header] = value } ctx.Log.Logm("gql", header_map, "REQUEST_HEADERS") str, err := io.ReadAll(r.Body) if err != nil { ctx.Log.Logf("gql", "failed to read request body: %s", err) return } query := GQLWSPayload{} json.Unmarshal(str, &query) params := graphql.Params{ Schema: ctx.GQL.Schema, Context: gql_ctx, RequestString: query.Query, } if query.OperationName != "" { params.OperationName = query.OperationName } if len(query.Variables) > 0 { params.VariableValues = query.Variables } result := graphql.Do(params) if len(result.Errors) > 0 { extra_fields := map[string]interface{}{} extra_fields["body"] = string(str) extra_fields["headers"] = r.Header ctx.Log.Logm("gql", extra_fields, "wrong result, unexpected errors: %v", result.Errors) } json.NewEncoder(w).Encode(result) } } func sendOneResultAndClose(res *graphql.Result) chan *graphql.Result { resultChannel := make(chan *graphql.Result) go func() { resultChannel <- res close(resultChannel) }() return resultChannel } func getOperationTypeOfReq(p graphql.Params) string{ source := source.NewSource(&source.Source{ Body: []byte(p.RequestString), Name: "GraphQL request", }) AST, err := parser.Parse(parser.ParseParams{Source: source}) if err != nil { return "" } for _, node := range AST.Definitions { if operationDef, ok := node.(*ast.OperationDefinition); ok { name := "" if operationDef.Name != nil { name = operationDef.Name.Value } if name == p.OperationName || p.OperationName == "" { return operationDef.Operation } } } return "" } func GQLWSDo(ctx * GraphContext, p graphql.Params) chan *graphql.Result { operation := getOperationTypeOfReq(p) ctx.Log.Logf("gqlws", "GQLWSDO_OPERATION: %s %+v", operation, p.RequestString) if operation == ast.OperationTypeSubscription { return graphql.Subscribe(p) } res := graphql.Do(p) return sendOneResultAndClose(res) } func GQLWSHandler(ctx * GraphContext, server * GQLThread) func(http.ResponseWriter, *http.Request) { gql_ctx := context.Background() gql_ctx = context.WithValue(gql_ctx, "graph_context", ctx) gql_ctx = context.WithValue(gql_ctx, "gql_server", server) return func(w http.ResponseWriter, r * http.Request) { ctx.Log.Logf("gqlws_new", "HANDLING %s",r.RemoteAddr) enableCORS(&w) header_map := map[string]interface{}{} for header, value := range(r.Header) { header_map[header] = value } ctx.Log.Logm("gql", header_map, "REQUEST_HEADERS") u := ws.HTTPUpgrader{ Protocol: func(protocol string) bool { ctx.Log.Logf("gqlws", "UPGRADE_PROTOCOL: %s", string(protocol)) if string(protocol) == "graphql-transport-ws" || string(protocol) == "graphql-ws" { return true } return false }, } conn, _, _, err := u.Upgrade(r, w) if err == nil { defer conn.Close() conn_state := "init" for { // TODO: Make this a select between reading client data and getting updates from the event to push to clients" msg_raw, op, err := wsutil.ReadClientData(conn) ctx.Log.Logf("gqlws_hb", "MSG: %s\nOP: 0x%02x\nERR: %+v\n", string(msg_raw), op, err) msg := GQLWSMsg{} json.Unmarshal(msg_raw, &msg) if err != nil { ctx.Log.Logf("gqlws", "WS_CLIENT_ERROR") break } if msg.Type == "connection_init" { if conn_state != "init" { ctx.Log.Logf("gqlws", "WS_CLIENT_ERROR: INIT WHILE IN %s", conn_state) break } conn_state = "ready" err = wsutil.WriteServerMessage(conn, 1, []byte("{\"type\": \"connection_ack\"}")) if err != nil { ctx.Log.Logf("gqlws", "WS_SERVER_ERROR: FAILED TO SEND connection_ack") break } } else if msg.Type == "ping" { ctx.Log.Logf("gqlws_hb", "PING FROM %s", r.RemoteAddr) err = wsutil.WriteServerMessage(conn, 1, []byte("{\"type\": \"pong\"}")) if err != nil { ctx.Log.Logf("gqlws", "WS_SERVER_ERROR: FAILED TO SEND PONG") } } else if msg.Type == "subscribe" { ctx.Log.Logf("gqlws", "SUBSCRIBE: %+v", msg.Payload) params := graphql.Params{ Schema: ctx.GQL.Schema, Context: gql_ctx, RequestString: msg.Payload.Query, } if msg.Payload.OperationName != "" { params.OperationName = msg.Payload.OperationName } if len(msg.Payload.Variables) > 0 { params.VariableValues = msg.Payload.Variables } res_chan := GQLWSDo(ctx, params) if res_chan == nil { ctx.Log.Logf("gqlws", "res_chan is nil") } else { ctx.Log.Logf("gqlws", "res_chan: %+v", res_chan) } go func(res_chan chan *graphql.Result) { for { next, ok := <-res_chan if ok == false { ctx.Log.Logf("gqlws", "response channel was closed") return } if next == nil { ctx.Log.Logf("gqlws", "NIL_ON_CHANNEL") return } if len(next.Errors) > 0 { extra_fields := map[string]interface{}{} extra_fields["query"] = string(msg.Payload.Query) ctx.Log.Logm("gqlws", extra_fields, "ERROR: wrong result, unexpected errors: %+v", next.Errors) continue } ctx.Log.Logf("gqlws", "DATA: %+v", next.Data) data, err := json.Marshal(next.Data) if err != nil { ctx.Log.Logf("gqlws", "ERROR: %+v", err) continue } msg, err := json.Marshal(GQLWSMsg{ ID: msg.ID, Type: "next", Payload: GQLWSPayload{ Data: string(data), }, }) if err != nil { ctx.Log.Logf("gqlws", "ERROR: %+v", err) continue } err = wsutil.WriteServerMessage(conn, 1, msg) if err != nil { ctx.Log.Logf("gqlws", "ERROR: %+v", err) continue } } }(res_chan) } else { } } return } else { panic("Failed to upgrade websocket") } } } type TypeList []graphql.Type type ObjTypeMap map[reflect.Type]*graphql.Object type FieldMap map[string]*graphql.Field type GQLThread struct { BaseThread http_server *http.Server http_done *sync.WaitGroup } type GQLThreadInfo struct { ThreadInfo `json:"-"` Start bool `json:"start"` Started bool `json:"started"` FirstAction string `json:"first_action"` RestoreAction string `json:"restore_action"` } func NewGQLThreadInfo(start bool, first_action string, restore_action string) GQLThreadInfo { info := GQLThreadInfo{ Start: start, Started: false, FirstAction: first_action, RestoreAction: restore_action, } return info } type GQLThreadStateJSON struct { BaseThreadStateJSON Listen string `json:"listen"` } type GQLThreadState struct { BaseThreadState Listen string } func (state * GQLThreadState) MarshalJSON() ([]byte, error) { thread_state := SaveBaseThreadState(&state.BaseThreadState) return json.Marshal(&GQLThreadStateJSON{ BaseThreadStateJSON: thread_state, Listen: state.Listen, }) } func LoadGQLThreadState(ctx * GraphContext, data []byte, loaded_nodes NodeMap) (NodeState, error){ var j GQLThreadStateJSON err := json.Unmarshal(data, &j) if err != nil { return nil, err } thread_state, err := RestoreBaseThreadState(ctx, j.BaseThreadStateJSON, loaded_nodes) if err != nil { return nil, err } state := &GQLThreadState{ BaseThreadState: *thread_state, Listen: j.Listen, } return state, nil } func LoadGQLThreadInfo(ctx * GraphContext, raw map[string]interface{}) (ThreadInfo, error) { info := GQLThreadInfo{ Start: raw["start"].(bool), Started: raw["started"].(bool), FirstAction: raw["first_action"].(string), RestoreAction: raw["restore_action"].(string), } return &info, nil } func LoadGQLThread(ctx * GraphContext, id NodeID) (GraphNode, error) { gql_thread := GQLThread{ BaseThread: RestoreBaseThread(ctx, id, gql_actions, gql_handlers), http_server: nil, http_done: &sync.WaitGroup{}, } return &gql_thread, nil } func NewGQLThreadState(listen string) GQLThreadState { state := GQLThreadState{ BaseThreadState: NewBaseThreadState("GQL Server", "gql_thread"), Listen: listen, } state.InfoType = reflect.TypeOf((*GQLThreadInfo)(nil)) return state } var gql_actions ThreadActions = ThreadActions{ "wait": ThreadWait, "restore": func(ctx * GraphContext, thread Thread) (string, error) { // Start all the threads that should be "started" ctx.Log.Logf("gql", "GQL_THREAD_RESTORE: %s", thread.ID()) server, ok := thread.(*GQLThread) if ok == false { panic("thread is not *GQLThread") } // Serve the GQL http and ws handlers mux := http.NewServeMux() mux.HandleFunc("/gql", GQLHandler(ctx, server)) mux.HandleFunc("/gqlws", GQLWSHandler(ctx, server)) // Server a graphiql interface(TODO make configurable whether to start this) mux.HandleFunc("/graphiql", GraphiQLHandler()) // Server the ./site directory to /site (TODO make configurable with better defaults) fs := http.FileServer(http.Dir("./site")) mux.Handle("/site/", http.StripPrefix("/site", fs)) UseStates(ctx, []GraphNode{server}, func(states NodeStateMap)(error){ server_state := states[server.ID()].(*GQLThreadState) server.http_server = &http.Server{ Addr: server_state.Listen, Handler: mux, } return nil }) server.http_done.Add(1) go func(server *GQLThread) { defer server.http_done.Done() err := server.http_server.ListenAndServe() if err != http.ErrServerClosed { panic(fmt.Sprintf("Failed to start gql server: %s", err)) } }(server) UpdateStates(ctx, []GraphNode{thread}, func(nodes NodeMap)(error) { server_state := thread.State().(*GQLThreadState) for _, child := range(server_state.Children()) { should_run := (server_state.child_info[child.ID()]).(*GQLThreadInfo) if should_run.Started == true { ChildGo(ctx, server_state, thread, child.ID(), should_run.RestoreAction) } } return nil }) return "wait", nil }, "start": func(ctx * GraphContext, thread Thread) (string, error) { ctx.Log.Logf("gql", "SERVER_STARTED") server, ok := thread.(*GQLThread) if ok == false { panic(fmt.Sprintf("GQL_THREAD_START: %s is not GQLThread, %+v", thread.ID(), thread.State())) } // Serve the GQL http and ws handlers mux := http.NewServeMux() mux.HandleFunc("/gql", GQLHandler(ctx, server)) mux.HandleFunc("/gqlws", GQLWSHandler(ctx, server)) // Server a graphiql interface(TODO make configurable whether to start this) mux.HandleFunc("/graphiql", GraphiQLHandler()) // Server the ./site directory to /site (TODO make configurable with better defaults) fs := http.FileServer(http.Dir("./site")) mux.Handle("/site/", http.StripPrefix("/site", fs)) UseStates(ctx, []GraphNode{server}, func(states NodeStateMap)(error){ server_state := states[server.ID()].(*GQLThreadState) server.http_server = &http.Server{ Addr: server_state.Listen, Handler: mux, } return nil }) server.http_done.Add(1) go func(server *GQLThread) { defer server.http_done.Done() err := server.http_server.ListenAndServe() if err != http.ErrServerClosed { panic(fmt.Sprintf("Failed to start gql server: %s", err)) } }(server) return "wait", nil }, } var gql_handlers ThreadHandlers = ThreadHandlers{ "child_added": func(ctx * GraphContext, thread Thread, signal GraphSignal) (string, error) { ctx.Log.Logf("gql", "GQL_THREAD_CHILD_ADDED: %+v", signal) UpdateStates(ctx, []GraphNode{thread}, func(nodes NodeMap)(error) { server_state := thread.State().(*GQLThreadState) should_run, exists := server_state.child_info[signal.Source()].(*GQLThreadInfo) if exists == false { ctx.Log.Logf("gql", "GQL_THREAD_CHILD_ADDED: tried to start %s whis is not a child") return nil } if should_run.Start == true && should_run.Started == false { ChildGo(ctx, server_state, thread, signal.Source(), should_run.FirstAction) should_run.Started = true } return nil }) return "wait", nil }, "abort": func(ctx * GraphContext, thread Thread, signal GraphSignal) (string, error) { ctx.Log.Logf("gql", "GQL_ABORT") server := thread.(*GQLThread) server.http_server.Shutdown(context.TODO()) server.http_done.Wait() return "", fmt.Errorf("GQLThread aborted by signal") }, "cancel": func(ctx * GraphContext, thread Thread, signal GraphSignal) (string, error) { ctx.Log.Logf("gql", "GQL_CANCEL") server := thread.(*GQLThread) server.http_server.Shutdown(context.TODO()) server.http_done.Wait() return "", nil }, } func NewGQLThread(ctx * GraphContext, listen string, requirements []Lockable) (*GQLThread, error) { state := NewGQLThreadState(listen) base_thread, err := NewBaseThread(ctx, gql_actions, gql_handlers, &state) if err != nil { return nil, err } thread := &GQLThread { BaseThread: base_thread, http_server: nil, http_done: &sync.WaitGroup{}, } if len(requirements) > 0 { req_nodes := make([]GraphNode, len(requirements)) for i, req := range(requirements) { req_nodes[i] = req } err = UpdateStates(ctx, req_nodes, func(nodes NodeMap) error { return LinkLockables(ctx, thread, requirements, nodes) }) if err != nil { return nil, err } } return thread, nil }