From a185cc3dfc1f134543fcebdafad849ced9686924 Mon Sep 17 00:00:00 2001 From: Noah Metz Date: Sun, 25 Jun 2023 20:20:59 -0600 Subject: [PATCH] Added base GQLThread --- gql.go | 506 ++++++++++++++++++++++++++++++++++++++++++++++ gql_graph.go | 551 +++++++++++++++++++++++++++++++++++++++++++++++++++ gql_test.go | 20 ++ graph.go | 2 +- lockable.go | 2 +- thread.go | 4 +- 6 files changed, 1081 insertions(+), 4 deletions(-) create mode 100644 gql.go create mode 100644 gql_graph.go create mode 100644 gql_test.go diff --git a/gql.go b/gql.go new file mode 100644 index 0000000..396f96c --- /dev/null +++ b/gql.go @@ -0,0 +1,506 @@ +package graphvent + +import ( + "net/http" + "github.com/graphql-go/graphql" + "github.com/graphql-go/graphql/language/parser" + "github.com/graphql-go/graphql/language/source" + "github.com/graphql-go/graphql/language/ast" + "context" + "encoding/json" + "io" + "reflect" + "fmt" + "sync" + "github.com/gobwas/ws" + "github.com/gobwas/ws/wsutil" +) + +func GraphiQLHandler() func(http.ResponseWriter, *http.Request) { + return func(w http.ResponseWriter, r * http.Request) { + graphiql_string := fmt.Sprintf(` + + + + + GraphiQL + + + + + + + + +
Loading...
+ + + + + `) + + w.Header().Set("Content-Type", "text/html; charset=utf-8") + w.WriteHeader(http.StatusOK) + io.WriteString(w, graphiql_string) + } + +} + +type GQLWSPayload struct { + OperationName string `json:"operationName,omitempty"` + Query string `json:"query,omitempty"` + Variables map[string]interface{} `json:"variables,omitempty"` + Extensions map[string]interface{} `json:"extensions,omitempty"` + Data string `json:"data,omitempty"` +} + +type GQLWSMsg struct { + ID string `json:"id,omitempty"` + Type string `json:"type"` + Payload GQLWSPayload `json:"payload,omitempty"` +} + +func enableCORS(w *http.ResponseWriter) { + (*w).Header().Set("Access-Control-Allow-Origin", "*") + (*w).Header().Set("Access-Control-Allow-Credentials", "true") + (*w).Header().Set("Access-Control-Allow-Headers", "*") + (*w).Header().Set("Access-Control-Allow-Methods", "*") +} + +func GQLHandler(ctx * GraphContext, schema graphql.Schema, gql_ctx context.Context) func(http.ResponseWriter, *http.Request) { + return func(w http.ResponseWriter, r * http.Request) { + ctx.Log.Logf("gql", "GQL REQUEST: %s", r.RemoteAddr) + enableCORS(&w) + header_map := map[string]interface{}{} + for header, value := range(r.Header) { + header_map[header] = value + } + ctx.Log.Logm("gql", header_map, "REQUEST_HEADERS") + + str, err := io.ReadAll(r.Body) + if err != nil { + ctx.Log.Logf("gql", "failed to read request body: %s", err) + return + } + query := GQLWSPayload{} + json.Unmarshal(str, &query) + + params := graphql.Params{ + Schema: schema, + Context: gql_ctx, + RequestString: query.Query, + } + if query.OperationName != "" { + params.OperationName = query.OperationName + } + if len(query.Variables) > 0 { + params.VariableValues = query.Variables + } + result := graphql.Do(params) + if len(result.Errors) > 0 { + extra_fields := map[string]interface{}{} + extra_fields["body"] = string(str) + extra_fields["headers"] = r.Header + ctx.Log.Logm("gql", extra_fields, "wrong result, unexpected errors: %v", result.Errors) + } + json.NewEncoder(w).Encode(result) + } +} + +func sendOneResultAndClose(res *graphql.Result) chan *graphql.Result { + resultChannel := make(chan *graphql.Result) + go func() { + resultChannel <- res + close(resultChannel) + }() + return resultChannel +} + + +func getOperationTypeOfReq(p graphql.Params) string{ + source := source.NewSource(&source.Source{ + Body: []byte(p.RequestString), + Name: "GraphQL request", + }) + + AST, err := parser.Parse(parser.ParseParams{Source: source}) + if err != nil { + return "" + } + + for _, node := range AST.Definitions { + if operationDef, ok := node.(*ast.OperationDefinition); ok { + name := "" + if operationDef.Name != nil { + name = operationDef.Name.Value + } + if name == p.OperationName || p.OperationName == "" { + return operationDef.Operation + } + } + } + return "" +} + +func GQLWSDo(ctx * GraphContext, p graphql.Params) chan *graphql.Result { + operation := getOperationTypeOfReq(p) + ctx.Log.Logf("gqlws", "GQLWSDO_OPERATION: %s %+v", operation, p.RequestString) + + if operation == ast.OperationTypeSubscription { + return graphql.Subscribe(p) + } + + res := graphql.Do(p) + return sendOneResultAndClose(res) +} + +func GQLWSHandler(ctx * GraphContext, schema graphql.Schema, gql_ctx context.Context) func(http.ResponseWriter, *http.Request) { + return func(w http.ResponseWriter, r * http.Request) { + ctx.Log.Logf("gqlws_new", "HANDLING %s",r.RemoteAddr) + header_map := map[string]interface{}{} + for header, value := range(r.Header) { + header_map[header] = value + } + ctx.Log.Logm("gql", header_map, "REQUEST_HEADERS") + u := ws.HTTPUpgrader{ + Protocol: func(protocol string) bool { + ctx.Log.Logf("gqlws", "UPGRADE_PROTOCOL: %s", string(protocol)) + return string(protocol) == "graphql-transport-ws" + }, + } + conn, _, _, err := u.Upgrade(r, w) + if err == nil { + defer conn.Close() + conn_state := "init" + for { + // TODO: Make this a select between reading client data and getting updates from the event to push to clients" + msg_raw, op, err := wsutil.ReadClientData(conn) + ctx.Log.Logf("gqlws_hb", "MSG: %s\nOP: 0x%02x\nERR: %+v\n", string(msg_raw), op, err) + msg := GQLWSMsg{} + json.Unmarshal(msg_raw, &msg) + if err != nil { + ctx.Log.Logf("gqlws", "WS_CLIENT_ERROR") + break + } + if msg.Type == "connection_init" { + if conn_state != "init" { + ctx.Log.Logf("gqlws", "WS_CLIENT_ERROR: INIT WHILE IN %s", conn_state) + break + } + conn_state = "ready" + err = wsutil.WriteServerMessage(conn, 1, []byte("{\"type\": \"connection_ack\"}")) + if err != nil { + ctx.Log.Logf("gqlws", "WS_SERVER_ERROR: FAILED TO SEND connection_ack") + break + } + } else if msg.Type == "ping" { + ctx.Log.Logf("gqlws_hb", "PING FROM %s", r.RemoteAddr) + err = wsutil.WriteServerMessage(conn, 1, []byte("{\"type\": \"pong\"}")) + if err != nil { + ctx.Log.Logf("gqlws", "WS_SERVER_ERROR: FAILED TO SEND PONG") + } + } else if msg.Type == "subscribe" { + ctx.Log.Logf("gqlws", "SUBSCRIBE: %+v", msg.Payload) + params := graphql.Params{ + Schema: schema, + Context: gql_ctx, + RequestString: msg.Payload.Query, + } + if msg.Payload.OperationName != "" { + params.OperationName = msg.Payload.OperationName + } + if len(msg.Payload.Variables) > 0 { + params.VariableValues = msg.Payload.Variables + } + + res_chan := GQLWSDo(ctx, params) + + go func(res_chan chan *graphql.Result) { + for { + next, ok := <-res_chan + if ok == false { + ctx.Log.Logf("gqlws", "response channel was closed") + return + } + if next == nil { + ctx.Log.Logf("gqlws", "NIL_ON_CHANNEL") + return + } + if len(next.Errors) > 0 { + extra_fields := map[string]interface{}{} + extra_fields["query"] = string(msg.Payload.Query) + ctx.Log.Logm("gqlws", extra_fields, "ERROR: wrong result, unexpected errors: %+v", next.Errors) + continue + } + ctx.Log.Logf("gqlws", "DATA: %+v", next.Data) + data, err := json.Marshal(next.Data) + if err != nil { + ctx.Log.Logf("gqlws", "ERROR: %+v", err) + continue + } + msg, err := json.Marshal(GQLWSMsg{ + ID: msg.ID, + Type: "next", + Payload: GQLWSPayload{ + Data: string(data), + }, + }) + if err != nil { + ctx.Log.Logf("gqlws", "ERROR: %+v", err) + continue + } + + err = wsutil.WriteServerMessage(conn, 1, msg) + if err != nil { + ctx.Log.Logf("gqlws", "ERROR: %+v", err) + continue + } + } + }(res_chan) + } else { + } + } + return + } else { + panic("Failed to upgrade websocket") + } + } +} + +type ObjTypeMap map[reflect.Type]*graphql.Object +type FieldMap map[string]*graphql.Field + +type GQLThread struct { + BaseThread + extended_types ObjTypeMap + extended_queries FieldMap + extended_subscriptions FieldMap + extended_mutations FieldMap +} + +type GQLThreadState struct { + BaseThreadState + Listen string +} + +func NewGQLThreadState(listen string) GQLThreadState { + return GQLThreadState{ + BaseThreadState: NewBaseThreadState("GQL Server"), + Listen: listen, + } +} + +var gql_actions ThreadActions = ThreadActions{ + "start": func(ctx * GraphContext, thread Thread) (string, error) { + ctx.Log.Logf("gql", "SERVER_STARTED") + server := thread.(*GQLThread) + go func() { + ctx.Log.Logf("gql", "GOROUTINE_START for %s", server.ID()) + + mux := http.NewServeMux() + http_handler, ws_handler := MakeGQLHandlers(ctx, server) + mux.HandleFunc("/gql", http_handler) + mux.HandleFunc("/gqlws", ws_handler) + mux.HandleFunc("/graphiql", GraphiQLHandler()) + fs := http.FileServer(http.Dir("./site")) + mux.Handle("/site/", http.StripPrefix("/site", fs)) + + srv_if, _ := UseStates(ctx, []GraphNode{server}, func(states []NodeState)(interface{}, error){ + server_state := states[0].(*GQLThreadState) + return &http.Server{ + Addr: server_state.Listen, + Handler: mux, + }, nil + }) + srv := srv_if.(*http.Server) + + http_done := &sync.WaitGroup{} + http_done.Add(1) + go func(srv *http.Server, http_done *sync.WaitGroup) { + defer http_done.Done() + err := srv.ListenAndServe() + if err != http.ErrServerClosed { + panic(fmt.Sprintf("Failed to start gql server: %s", err)) + } + }(srv, http_done) + + for true { + select { + case signal:=<-server.signal: + if signal.Type() == "abort" || signal.Type() == "cancel" { + err := srv.Shutdown(context.Background()) + if err != nil{ + panic(fmt.Sprintf("Failed to shutdown gql server: %s", err)) + } + http_done.Wait() + break + } + ctx.Log.Logf("gql", "GOROUTINE_SIGNAL for %s: %+v", server.ID(), signal) + // Take signals to resource and send to GQL subscriptions + } + } + }() + return "wait", nil + }, +} + +var gql_handlers ThreadHandlers = ThreadHandlers{ +} + +func NewGQLThread(ctx * GraphContext, listen string, requirements []Lockable, extended_types ObjTypeMap, extended_queries FieldMap, extended_mutations FieldMap, extended_subscriptions FieldMap) (*GQLThread, error) { + state := NewGQLThreadState(listen) + base_thread, err := NewBaseThread(ctx, gql_actions, gql_handlers, &state) + if err != nil { + return nil, err + } + + thread := &GQLThread { + BaseThread: base_thread, + extended_types: extended_types, + extended_queries: extended_queries, + extended_mutations: extended_mutations, + extended_subscriptions: extended_subscriptions, + } + + err = LinkLockables(ctx, thread, requirements) + if err != nil { + return nil, err + } + return thread, nil +} + +func MakeGQLHandlers(ctx * GraphContext, server * GQLThread) (func(http.ResponseWriter, *http.Request), func(http.ResponseWriter, *http.Request)) { + valid_nodes := map[reflect.Type]*graphql.Object{} + valid_lockables := map[reflect.Type]*graphql.Object{} + valid_threads := map[reflect.Type]*graphql.Object{} + valid_lockables[reflect.TypeOf((*BaseLockable)(nil))] = GQLTypeBaseNode() + for t, v := range(valid_lockables) { + valid_nodes[t] = v + } + valid_threads[reflect.TypeOf((*BaseThread)(nil))] = GQLTypeBaseThread() + valid_threads[reflect.TypeOf((*GQLThread)(nil))] = GQLTypeGQLThread() + for t, v := range(valid_threads) { + valid_lockables[t] = v + valid_nodes[t] = v + } + + + gql_types := []graphql.Type{GQLTypeSignal(), GQLTypeSignalInput()} + for _, v := range(valid_nodes) { + gql_types = append(gql_types, v) + } + + node_type := reflect.TypeOf((GraphNode)(nil)) + lockable_type := reflect.TypeOf((Lockable)(nil)) + thread_type := reflect.TypeOf((Thread)(nil)) + + for go_t, gql_t := range(server.extended_types) { + if go_t.Implements(node_type) { + valid_nodes[go_t] = gql_t + } + if go_t.Implements(lockable_type) { + valid_lockables[go_t] = gql_t + } + if go_t.Implements(thread_type) { + valid_threads[go_t] = gql_t + } + gql_types = append(gql_types, gql_t) + } + + gql_queries := graphql.Fields{ + "Self": GQLQuerySelf(), + } + + for key, value := range(server.extended_queries) { + gql_queries[key] = value + } + + gql_subscriptions := graphql.Fields{ + "Update": GQLSubscriptionUpdate(), + } + + for key, value := range(server.extended_subscriptions) { + gql_subscriptions[key] = value + } + + gql_mutations := graphql.Fields{ + "SendUpdate": GQLMutationSendUpdate(), + } + + for key, value := range(server.extended_mutations) { + gql_mutations[key] = value + } + + schemaConfig := graphql.SchemaConfig{ + Types: gql_types, + Query: graphql.NewObject(graphql.ObjectConfig{ + Name: "Query", + Fields: gql_queries, + }), + Mutation: graphql.NewObject(graphql.ObjectConfig{ + Name: "Mutation", + Fields: gql_mutations, + }), + Subscription: graphql.NewObject(graphql.ObjectConfig{ + Name: "Subscription", + Fields: gql_subscriptions, + }), + } + + schema, err := graphql.NewSchema(schemaConfig) + if err != nil{ + panic(err) + } + gql_ctx := context.Background() + gql_ctx = context.WithValue(gql_ctx, "valid_nodes", valid_nodes) + gql_ctx = context.WithValue(gql_ctx, "valid_lockables", valid_lockables) + gql_ctx = context.WithValue(gql_ctx, "valid_threads", valid_threads) + gql_ctx = context.WithValue(gql_ctx, "gql_server", server) + gql_ctx = context.WithValue(gql_ctx, "graph_context", ctx) + return GQLHandler(ctx, schema, gql_ctx), GQLWSHandler(ctx, schema, gql_ctx) +} diff --git a/gql_graph.go b/gql_graph.go new file mode 100644 index 0000000..3fa04b9 --- /dev/null +++ b/gql_graph.go @@ -0,0 +1,551 @@ +package graphvent + +import ( + "github.com/graphql-go/graphql" + "reflect" + "fmt" + "time" +) + +var gql_interface_graph_node *graphql.Interface = nil +func GQLInterfaceGraphNode() *graphql.Interface { + if gql_interface_graph_node == nil { + gql_interface_graph_node = graphql.NewInterface(graphql.InterfaceConfig{ + Name: "GraphNode", + ResolveType: func(p graphql.ResolveTypeParams) *graphql.Object { + valid_nodes, ok := p.Context.Value("valid_nodes").(map[reflect.Type]*graphql.Object) + if ok == false { + return nil + } + + for key, value := range(valid_nodes) { + if reflect.TypeOf(p.Value) == key { + return value + } + } + return nil + }, + Fields: graphql.Fields{}, + }) + + gql_interface_graph_node.AddFieldConfig("ID", &graphql.Field{ + Type: graphql.String, + }) + + gql_interface_graph_node.AddFieldConfig("Name", &graphql.Field{ + Type: graphql.String, + }) + } + + return gql_interface_graph_node +} + +var gql_list_thread *graphql.List = nil +func GQLListThread() *graphql.List { + if gql_list_thread == nil { + gql_list_thread = graphql.NewList(GQLInterfaceThread()) + } + return gql_list_thread +} + +var gql_interface_thread *graphql.Interface = nil +func GQLInterfaceThread() *graphql.Interface { + if gql_interface_thread == nil { + gql_interface_thread = graphql.NewInterface(graphql.InterfaceConfig{ + Name: "Thread", + ResolveType: func(p graphql.ResolveTypeParams) *graphql.Object { + valid_nodes, ok := p.Context.Value("valid_threads").(map[reflect.Type]*graphql.Object) + if ok == false { + return nil + } + + for key, value := range(valid_nodes) { + if reflect.TypeOf(p.Value) == key { + return value + } + } + return nil + }, + Fields: graphql.Fields{}, + }) + + gql_interface_thread.AddFieldConfig("ID", &graphql.Field{ + Type: graphql.String, + }) + } + + return gql_interface_thread +} + +var gql_interface_lockable *graphql.Interface = nil +func GQLInterfaceLockable() *graphql.Interface { + if gql_interface_lockable == nil { + gql_interface_lockable = graphql.NewInterface(graphql.InterfaceConfig{ + Name: "Lockable", + ResolveType: func(p graphql.ResolveTypeParams) *graphql.Object { + valid_nodes, ok := p.Context.Value("valid_lockables").(map[reflect.Type]*graphql.Object) + if ok == false { + return nil + } + + for key, value := range(valid_nodes) { + if reflect.TypeOf(p.Value) == key { + return value + } + } + return nil + }, + Fields: graphql.Fields{}, + }) + + gql_interface_lockable.AddFieldConfig("ID", &graphql.Field{ + Type: graphql.String, + }) + + gql_interface_lockable.AddFieldConfig("Name", &graphql.Field{ + Type: graphql.String, + }) + } + + return gql_interface_lockable +} + +func GQLNodeID(p graphql.ResolveParams) (interface{}, error) { + node, ok := p.Source.(GraphNode) + if ok == false || node == nil { + return nil, fmt.Errorf("Failed to cast source to GraphNode") + } + return node.ID(), nil +} + +func GQLNodeName(p graphql.ResolveParams) (interface{}, error) { + node, ok := p.Source.(GraphNode) + if ok == false || node == nil { + return nil, fmt.Errorf("Failed to cast source to GraphNode") + } + + ctx, ok := p.Context.Value("graph_context").(*GraphContext) + if ok == false { + return nil, fmt.Errorf("Failed to cast context graph_context to GraphContext") + } + + name, err := UseStates(ctx, []GraphNode{node}, func(states []NodeState) (interface{}, error) { + return states[0].Name(), nil + }) + + if err != nil { + return nil, err + } + + name_str, ok := name.(string) + if ok == false { + return nil, fmt.Errorf("Failed to cast name to string %+v", name) + } + + return name_str, nil +} + +func GQLThreadListen(p graphql.ResolveParams) (interface{}, error) { + node, ok := p.Source.(*GQLThread) + if ok == false || node == nil { + return nil, fmt.Errorf("Failed to cast source to GQLThread") + } + + ctx, ok := p.Context.Value("graph_context").(*GraphContext) + if ok == false { + return nil, fmt.Errorf("Failed to cast context graph_context to GraphContext") + } + + listen, err := UseStates(ctx, []GraphNode{node}, func(states []NodeState) (interface{}, error) { + gql_thread, ok := states[0].(*GQLThreadState) + if ok == false { + return nil, fmt.Errorf("Failed to cast state to GQLThreadState") + } + return gql_thread.Listen, nil + }) + + if err != nil { + return nil, err + } + + listen_str, ok := listen.(string) + if ok == false { + return nil, fmt.Errorf("Failed to cast listen to string %+v", listen) + } + + return listen_str, nil +} + +var gql_type_gql_thread *graphql.Object = nil +func GQLTypeGQLThread() * graphql.Object { + if gql_type_gql_thread == nil { + gql_type_gql_thread = graphql.NewObject(graphql.ObjectConfig{ + Name: "GQLThread", + Interfaces: []*graphql.Interface{ + GQLInterfaceGraphNode(), + GQLInterfaceThread(), + }, + IsTypeOf: func(p graphql.IsTypeOfParams) bool { + _, ok := p.Value.(*GQLThread) + return ok + }, + Fields: graphql.Fields{}, + }) + + gql_type_gql_thread.AddFieldConfig("ID", &graphql.Field{ + Type: graphql.String, + Resolve: GQLNodeID, + }) + + gql_type_gql_thread.AddFieldConfig("Name", &graphql.Field{ + Type: graphql.String, + Resolve: GQLNodeName, + }) + + gql_type_gql_thread.AddFieldConfig("Listen", &graphql.Field{ + Type: graphql.String, + Resolve: GQLThreadListen, + }) + } + return gql_type_gql_thread +} + +var gql_type_base_thread *graphql.Object = nil +func GQLTypeBaseThread() * graphql.Object { + if gql_type_base_thread == nil { + gql_type_base_thread = graphql.NewObject(graphql.ObjectConfig{ + Name: "BaseThread", + Interfaces: []*graphql.Interface{ + GQLInterfaceGraphNode(), + GQLInterfaceThread(), + }, + IsTypeOf: func(p graphql.IsTypeOfParams) bool { + valid_threads, ok := p.Context.Value("valid_threads").(map[reflect.Type]*graphql.Object) + if ok == false { + return false + } + value_type := reflect.TypeOf(p.Value) + for go_type, _ := range(valid_threads) { + if value_type == go_type { + return true + } + } + return false + }, + Fields: graphql.Fields{}, + }) + gql_type_base_thread.AddFieldConfig("ID", &graphql.Field{ + Type: graphql.String, + Resolve: GQLNodeID, + }) + + gql_type_base_thread.AddFieldConfig("Name", &graphql.Field{ + Type: graphql.String, + Resolve: GQLNodeName, + }) + } + return gql_type_base_thread +} + +var gql_type_base_lockable *graphql.Object = nil +func GQLTypeBaseLockable() * graphql.Object { + if gql_type_base_lockable == nil { + gql_type_base_lockable = graphql.NewObject(graphql.ObjectConfig{ + Name: "BaseLockable", + Interfaces: []*graphql.Interface{ + GQLInterfaceGraphNode(), + GQLInterfaceLockable(), + }, + IsTypeOf: func(p graphql.IsTypeOfParams) bool { + valid_lockables, ok := p.Context.Value("valid_lockables").(map[reflect.Type]*graphql.Object) + if ok == false { + return false + } + value_type := reflect.TypeOf(p.Value) + for go_type, _ := range(valid_lockables) { + if value_type == go_type { + return true + } + } + return false + }, + Fields: graphql.Fields{}, + }) + + gql_type_base_lockable.AddFieldConfig("ID", &graphql.Field{ + Type: graphql.String, + Resolve: GQLNodeID, + }) + + gql_type_base_lockable.AddFieldConfig("Name", &graphql.Field{ + Type: graphql.String, + Resolve: GQLNodeName, + }) + } + + return gql_type_base_lockable +} + +var gql_type_base_node *graphql.Object = nil +func GQLTypeBaseNode() * graphql.Object { + if gql_type_base_node == nil { + gql_type_base_node = graphql.NewObject(graphql.ObjectConfig{ + Name: "BaseNode", + Interfaces: []*graphql.Interface{ + GQLInterfaceGraphNode(), + }, + IsTypeOf: func(p graphql.IsTypeOfParams) bool { + valid_nodes, ok := p.Context.Value("valid_nodes").(map[reflect.Type]*graphql.Object) + if ok == false { + return false + } + value_type := reflect.TypeOf(p.Value) + for go_type, _ := range(valid_nodes) { + if value_type == go_type { + return true + } + } + return false + }, + Fields: graphql.Fields{}, + }) + + gql_type_base_node.AddFieldConfig("ID", &graphql.Field{ + Type: graphql.String, + Resolve: GQLNodeID, + }) + + gql_type_base_node.AddFieldConfig("Name", &graphql.Field{ + Type: graphql.String, + Resolve: GQLNodeName, + }) + } + + return gql_type_base_node +} + +func GQLSignalFn(p graphql.ResolveParams, fn func(GraphSignal, graphql.ResolveParams)(interface{}, error))(interface{}, error) { + if signal, ok := p.Source.(GraphSignal); ok { + return fn(signal, p) + } + return nil, fmt.Errorf("Failed to cast source to event") +} + +func GQLSignalType(p graphql.ResolveParams) (interface{}, error) { + return GQLSignalFn(p, func(signal GraphSignal, p graphql.ResolveParams)(interface{}, error){ + return signal.Type(), nil + }) +} + +func GQLSignalSource(p graphql.ResolveParams) (interface{}, error) { + return GQLSignalFn(p, func(signal GraphSignal, p graphql.ResolveParams)(interface{}, error){ + return signal.Source(), nil + }) +} + +func GQLSignalDirection(p graphql.ResolveParams) (interface{}, error) { + return GQLSignalFn(p, func(signal GraphSignal, p graphql.ResolveParams)(interface{}, error){ + return signal.Direction(), nil + }) +} + +func GQLSignalString(p graphql.ResolveParams) (interface{}, error) { + return GQLSignalFn(p, func(signal GraphSignal, p graphql.ResolveParams)(interface{}, error){ + return signal.String(), nil + }) +} + + +var gql_type_signal *graphql.Object = nil +func GQLTypeSignal() *graphql.Object { + if gql_type_signal == nil { + gql_type_signal = graphql.NewObject(graphql.ObjectConfig{ + Name: "SignalOut", + IsTypeOf: func(p graphql.IsTypeOfParams) bool { + _, ok := p.Value.(GraphSignal) + return ok + }, + Fields: graphql.Fields{}, + }) + + gql_type_signal.AddFieldConfig("Type", &graphql.Field{ + Type: graphql.String, + Resolve: GQLSignalType, + }) + gql_type_signal.AddFieldConfig("Source", &graphql.Field{ + Type: graphql.String, + Resolve: GQLSignalSource, + }) + gql_type_signal.AddFieldConfig("Direction", &graphql.Field{ + Type: graphql.Boolean, + Resolve: GQLSignalDirection, + }) + gql_type_signal.AddFieldConfig("String", &graphql.Field{ + Type: graphql.String, + Resolve: GQLSignalString, + }) + } + return gql_type_signal +} + +var gql_type_signal_input *graphql.InputObject = nil +func GQLTypeSignalInput() *graphql.InputObject { + if gql_type_signal_input == nil { + gql_type_signal_input = graphql.NewInputObject(graphql.InputObjectConfig{ + Name: "SignalIn", + Fields: graphql.InputObjectConfigFieldMap{}, + }) + gql_type_signal_input.AddFieldConfig("Type", &graphql.InputObjectFieldConfig{ + Type: graphql.String, + }) + gql_type_signal_input.AddFieldConfig("Description", &graphql.InputObjectFieldConfig{ + Type: graphql.String, + DefaultValue: "", + }) + gql_type_signal_input.AddFieldConfig("Time", &graphql.InputObjectFieldConfig{ + Type: graphql.DateTime, + DefaultValue: time.Now(), + }) + } + return gql_type_signal_input +} + +func GQLSubscribeSignal(p graphql.ResolveParams) (interface{}, error) { + return GQLSubscribeFn(p, func(signal GraphSignal, p graphql.ResolveParams)(interface{}, error) { + return signal, nil + }) +} + +func GQLSubscribeFn(p graphql.ResolveParams, fn func(GraphSignal, graphql.ResolveParams)(interface{}, error))(interface{}, error) { + server, ok := p.Context.Value("gql_server").(*GQLThread) + if ok == false { + return nil, fmt.Errorf("Failed to get gql_server from context and cast to GQLServer") + } + + ctx, ok := p.Context.Value("graph_context").(*GraphContext) + if ok == false { + return nil, fmt.Errorf("Failed to get graph_context from context and cast to GraphContext") + } + + c := make(chan interface{}) + go func(c chan interface{}, server *GQLThread) { + sig_c := server.UpdateChannel(0) + for { + val, ok := <- sig_c + if ok == false { + return + } + ret, err := fn(val, p) + if err != nil { + ctx.Log.Logf("gqlws", "type convertor error %s", err) + return + } + c <- ret + } + }(c, server) + return c, nil +} + +var gql_subscription_update * graphql.Field = nil +func GQLSubscriptionUpdate() * graphql.Field { + if gql_subscription_update == nil { + gql_subscription_update = &graphql.Field{ + Type: GQLTypeSignal(), + Resolve: func(p graphql.ResolveParams) (interface{}, error) { + return p.Source, nil + }, + Subscribe: GQLSubscribeSignal, + } + } + + return gql_subscription_update +} + +var gql_mutation_send_update *graphql.Field = nil +func GQLMutationSendUpdate() *graphql.Field { + if gql_mutation_send_update == nil { + gql_mutation_send_update = &graphql.Field{ + Type: GQLTypeSignal(), + Args: graphql.FieldConfigArgument{ + "id": &graphql.ArgumentConfig{ + Type: graphql.String, + }, + "signal": &graphql.ArgumentConfig{ + Type: GQLTypeSignalInput(), + }, + }, + Resolve: func(p graphql.ResolveParams) (interface{}, error) { + server, ok := p.Context.Value("gql_server").(*GQLThread) + if ok == false { + return nil, fmt.Errorf("Failed to cast context gql_server to GQLServer: %+v", p.Context.Value("gql_server")) + } + + ctx, ok := p.Context.Value("graph_context").(*GraphContext) + if ok == false { + return nil, fmt.Errorf("Failed to cast context graph_context to GraphContext: %+v", p.Context.Value("graph_context")) + } + + signal_map, ok := p.Args["signal"].(map[string]interface{}) + if ok == false { + return nil, fmt.Errorf("Failed to cast arg signal to GraphSignal: %+v", p.Args["signal"]) + } + var signal GraphSignal = nil + if signal_map["Direction"] == Up { + signal = NewSignal(server, signal_map["Type"].(string)) + } else if signal_map["Direction"] == Down { + signal = NewDownSignal(server, signal_map["Type"].(string)) + } else if signal_map["Direction"] == Direct { + signal = NewDirectSignal(server, signal_map["Type"].(string)) + } else { + return nil, fmt.Errorf("Bad direction: %d", signal_map["Direction"]) + } + + id , ok := p.Args["id"].(NodeID) + if ok == false { + return nil, fmt.Errorf("Failed to cast arg id to string") + } + + node_if, err := UseStates(ctx, []GraphNode{server}, func(states []NodeState) (interface{}, error){ + server_state := states[0].(*GQLThreadState) + node := FindChild(ctx, server, server_state, id) + if node == nil { + return nil, fmt.Errorf("Failed to find ID: %s as child of server thread", id) + } + return node, nil + }) + if err != nil { + return nil, err + } + + node, ok := node_if.(GraphNode) + if ok == false { + return nil, fmt.Errorf("Failed to cast found node to GraphNode") + } + + SendUpdate(ctx, node, signal) + return signal, nil + }, + } + } + + return gql_mutation_send_update +} + +var gql_query_self *graphql.Field = nil +func GQLQuerySelf() *graphql.Field { + if gql_query_self == nil { + gql_query_self = &graphql.Field{ + Type: GQLTypeGQLThread(), + Resolve: func(p graphql.ResolveParams) (interface{}, error) { + server, ok := p.Context.Value("gql_server").(*GQLThread) + if ok == false { + return nil, fmt.Errorf("failed to cast gql_server to GQLThread") + } + + return server, nil + }, + } + } + + return gql_query_self +} diff --git a/gql_test.go b/gql_test.go new file mode 100644 index 0000000..a45df3f --- /dev/null +++ b/gql_test.go @@ -0,0 +1,20 @@ +package graphvent + +import ( + "testing" +) + +func TestGQLThread(t * testing.T) { + ctx := testContext(t) + gql_thread, err := NewGQLThread(ctx, ":8080", []Lockable{}, ObjTypeMap{}, FieldMap{}, FieldMap{}, FieldMap{}) + fatalErr(t, err) + + test_thread, err := NewSimpleBaseThread(ctx, "Test thread 1", []Lockable{}, ThreadActions{}, ThreadHandlers{}) + fatalErr(t, err) + + err = LinkThreads(ctx, gql_thread, test_thread, nil) + fatalErr(t, err) + + err = RunThread(ctx, gql_thread) + fatalErr(t, err) +} diff --git a/graph.go b/graph.go index 491453b..c332b7a 100644 --- a/graph.go +++ b/graph.go @@ -174,7 +174,7 @@ func CancelSignal(source GraphNode) BaseSignal { } type NodeState interface { - + Name() string } // GraphNode is the interface common to both DAG nodes and Event tree nodes diff --git a/lockable.go b/lockable.go index 1eee207..f5244b6 100644 --- a/lockable.go +++ b/lockable.go @@ -14,6 +14,7 @@ import ( // // RecordLockHolder records that lockable_id needs to be passed back to lock_holder type LockHolderState interface { + NodeState ReturnLock(lockable_id NodeID) GraphNode AllowedToTakeLock(node_id NodeID, lockable_id NodeID) bool RecordLockHolder(lockable_id NodeID, lock_holder GraphNode) @@ -22,7 +23,6 @@ type LockHolderState interface { // LockableState is the interface that a lockables state must have to allow it to connect to the DAG type LockableState interface { LockHolderState - Name() string Requirements() []Lockable AddRequirement(requirement Lockable) Dependencies() []Lockable diff --git a/thread.go b/thread.go index 3aaa37b..e3cbc5c 100644 --- a/thread.go +++ b/thread.go @@ -370,7 +370,7 @@ func NewBaseThreadState(name string) BaseThreadState { } } -func NewBaseThread(ctx * GraphContext, name string, actions ThreadActions, handlers ThreadHandlers, state ThreadState) (BaseThread, error) { +func NewBaseThread(ctx * GraphContext, actions ThreadActions, handlers ThreadHandlers, state ThreadState) (BaseThread, error) { thread := BaseThread{ BaseLockable: BaseLockable{BaseNode: NewNode(ctx, RandID(), state)}, Actions: ThreadActions{ @@ -398,7 +398,7 @@ func NewBaseThread(ctx * GraphContext, name string, actions ThreadActions, handl func NewSimpleBaseThread(ctx * GraphContext, name string, requirements []Lockable, actions ThreadActions, handlers ThreadHandlers) (* BaseThread, error) { state := NewBaseThreadState(name) - thread, err := NewBaseThread(ctx, name, actions, handlers, &state) + thread, err := NewBaseThread(ctx, actions, handlers, &state) if err != nil { return nil, err }