diff --git a/gql.go b/gql.go
new file mode 100644
index 0000000..396f96c
--- /dev/null
+++ b/gql.go
@@ -0,0 +1,506 @@
+package graphvent
+
+import (
+ "net/http"
+ "github.com/graphql-go/graphql"
+ "github.com/graphql-go/graphql/language/parser"
+ "github.com/graphql-go/graphql/language/source"
+ "github.com/graphql-go/graphql/language/ast"
+ "context"
+ "encoding/json"
+ "io"
+ "reflect"
+ "fmt"
+ "sync"
+ "github.com/gobwas/ws"
+ "github.com/gobwas/ws/wsutil"
+)
+
+func GraphiQLHandler() func(http.ResponseWriter, *http.Request) {
+ return func(w http.ResponseWriter, r * http.Request) {
+ graphiql_string := fmt.Sprintf(`
+
+
+
+
+ GraphiQL
+
+
+
+
+
+
+
+
+ Loading...
+
+
+
+
+ `)
+
+ w.Header().Set("Content-Type", "text/html; charset=utf-8")
+ w.WriteHeader(http.StatusOK)
+ io.WriteString(w, graphiql_string)
+ }
+
+}
+
+type GQLWSPayload struct {
+ OperationName string `json:"operationName,omitempty"`
+ Query string `json:"query,omitempty"`
+ Variables map[string]interface{} `json:"variables,omitempty"`
+ Extensions map[string]interface{} `json:"extensions,omitempty"`
+ Data string `json:"data,omitempty"`
+}
+
+type GQLWSMsg struct {
+ ID string `json:"id,omitempty"`
+ Type string `json:"type"`
+ Payload GQLWSPayload `json:"payload,omitempty"`
+}
+
+func enableCORS(w *http.ResponseWriter) {
+ (*w).Header().Set("Access-Control-Allow-Origin", "*")
+ (*w).Header().Set("Access-Control-Allow-Credentials", "true")
+ (*w).Header().Set("Access-Control-Allow-Headers", "*")
+ (*w).Header().Set("Access-Control-Allow-Methods", "*")
+}
+
+func GQLHandler(ctx * GraphContext, schema graphql.Schema, gql_ctx context.Context) func(http.ResponseWriter, *http.Request) {
+ return func(w http.ResponseWriter, r * http.Request) {
+ ctx.Log.Logf("gql", "GQL REQUEST: %s", r.RemoteAddr)
+ enableCORS(&w)
+ header_map := map[string]interface{}{}
+ for header, value := range(r.Header) {
+ header_map[header] = value
+ }
+ ctx.Log.Logm("gql", header_map, "REQUEST_HEADERS")
+
+ str, err := io.ReadAll(r.Body)
+ if err != nil {
+ ctx.Log.Logf("gql", "failed to read request body: %s", err)
+ return
+ }
+ query := GQLWSPayload{}
+ json.Unmarshal(str, &query)
+
+ params := graphql.Params{
+ Schema: schema,
+ Context: gql_ctx,
+ RequestString: query.Query,
+ }
+ if query.OperationName != "" {
+ params.OperationName = query.OperationName
+ }
+ if len(query.Variables) > 0 {
+ params.VariableValues = query.Variables
+ }
+ result := graphql.Do(params)
+ if len(result.Errors) > 0 {
+ extra_fields := map[string]interface{}{}
+ extra_fields["body"] = string(str)
+ extra_fields["headers"] = r.Header
+ ctx.Log.Logm("gql", extra_fields, "wrong result, unexpected errors: %v", result.Errors)
+ }
+ json.NewEncoder(w).Encode(result)
+ }
+}
+
+func sendOneResultAndClose(res *graphql.Result) chan *graphql.Result {
+ resultChannel := make(chan *graphql.Result)
+ go func() {
+ resultChannel <- res
+ close(resultChannel)
+ }()
+ return resultChannel
+}
+
+
+func getOperationTypeOfReq(p graphql.Params) string{
+ source := source.NewSource(&source.Source{
+ Body: []byte(p.RequestString),
+ Name: "GraphQL request",
+ })
+
+ AST, err := parser.Parse(parser.ParseParams{Source: source})
+ if err != nil {
+ return ""
+ }
+
+ for _, node := range AST.Definitions {
+ if operationDef, ok := node.(*ast.OperationDefinition); ok {
+ name := ""
+ if operationDef.Name != nil {
+ name = operationDef.Name.Value
+ }
+ if name == p.OperationName || p.OperationName == "" {
+ return operationDef.Operation
+ }
+ }
+ }
+ return ""
+}
+
+func GQLWSDo(ctx * GraphContext, p graphql.Params) chan *graphql.Result {
+ operation := getOperationTypeOfReq(p)
+ ctx.Log.Logf("gqlws", "GQLWSDO_OPERATION: %s %+v", operation, p.RequestString)
+
+ if operation == ast.OperationTypeSubscription {
+ return graphql.Subscribe(p)
+ }
+
+ res := graphql.Do(p)
+ return sendOneResultAndClose(res)
+}
+
+func GQLWSHandler(ctx * GraphContext, schema graphql.Schema, gql_ctx context.Context) func(http.ResponseWriter, *http.Request) {
+ return func(w http.ResponseWriter, r * http.Request) {
+ ctx.Log.Logf("gqlws_new", "HANDLING %s",r.RemoteAddr)
+ header_map := map[string]interface{}{}
+ for header, value := range(r.Header) {
+ header_map[header] = value
+ }
+ ctx.Log.Logm("gql", header_map, "REQUEST_HEADERS")
+ u := ws.HTTPUpgrader{
+ Protocol: func(protocol string) bool {
+ ctx.Log.Logf("gqlws", "UPGRADE_PROTOCOL: %s", string(protocol))
+ return string(protocol) == "graphql-transport-ws"
+ },
+ }
+ conn, _, _, err := u.Upgrade(r, w)
+ if err == nil {
+ defer conn.Close()
+ conn_state := "init"
+ for {
+ // TODO: Make this a select between reading client data and getting updates from the event to push to clients"
+ msg_raw, op, err := wsutil.ReadClientData(conn)
+ ctx.Log.Logf("gqlws_hb", "MSG: %s\nOP: 0x%02x\nERR: %+v\n", string(msg_raw), op, err)
+ msg := GQLWSMsg{}
+ json.Unmarshal(msg_raw, &msg)
+ if err != nil {
+ ctx.Log.Logf("gqlws", "WS_CLIENT_ERROR")
+ break
+ }
+ if msg.Type == "connection_init" {
+ if conn_state != "init" {
+ ctx.Log.Logf("gqlws", "WS_CLIENT_ERROR: INIT WHILE IN %s", conn_state)
+ break
+ }
+ conn_state = "ready"
+ err = wsutil.WriteServerMessage(conn, 1, []byte("{\"type\": \"connection_ack\"}"))
+ if err != nil {
+ ctx.Log.Logf("gqlws", "WS_SERVER_ERROR: FAILED TO SEND connection_ack")
+ break
+ }
+ } else if msg.Type == "ping" {
+ ctx.Log.Logf("gqlws_hb", "PING FROM %s", r.RemoteAddr)
+ err = wsutil.WriteServerMessage(conn, 1, []byte("{\"type\": \"pong\"}"))
+ if err != nil {
+ ctx.Log.Logf("gqlws", "WS_SERVER_ERROR: FAILED TO SEND PONG")
+ }
+ } else if msg.Type == "subscribe" {
+ ctx.Log.Logf("gqlws", "SUBSCRIBE: %+v", msg.Payload)
+ params := graphql.Params{
+ Schema: schema,
+ Context: gql_ctx,
+ RequestString: msg.Payload.Query,
+ }
+ if msg.Payload.OperationName != "" {
+ params.OperationName = msg.Payload.OperationName
+ }
+ if len(msg.Payload.Variables) > 0 {
+ params.VariableValues = msg.Payload.Variables
+ }
+
+ res_chan := GQLWSDo(ctx, params)
+
+ go func(res_chan chan *graphql.Result) {
+ for {
+ next, ok := <-res_chan
+ if ok == false {
+ ctx.Log.Logf("gqlws", "response channel was closed")
+ return
+ }
+ if next == nil {
+ ctx.Log.Logf("gqlws", "NIL_ON_CHANNEL")
+ return
+ }
+ if len(next.Errors) > 0 {
+ extra_fields := map[string]interface{}{}
+ extra_fields["query"] = string(msg.Payload.Query)
+ ctx.Log.Logm("gqlws", extra_fields, "ERROR: wrong result, unexpected errors: %+v", next.Errors)
+ continue
+ }
+ ctx.Log.Logf("gqlws", "DATA: %+v", next.Data)
+ data, err := json.Marshal(next.Data)
+ if err != nil {
+ ctx.Log.Logf("gqlws", "ERROR: %+v", err)
+ continue
+ }
+ msg, err := json.Marshal(GQLWSMsg{
+ ID: msg.ID,
+ Type: "next",
+ Payload: GQLWSPayload{
+ Data: string(data),
+ },
+ })
+ if err != nil {
+ ctx.Log.Logf("gqlws", "ERROR: %+v", err)
+ continue
+ }
+
+ err = wsutil.WriteServerMessage(conn, 1, msg)
+ if err != nil {
+ ctx.Log.Logf("gqlws", "ERROR: %+v", err)
+ continue
+ }
+ }
+ }(res_chan)
+ } else {
+ }
+ }
+ return
+ } else {
+ panic("Failed to upgrade websocket")
+ }
+ }
+}
+
+type ObjTypeMap map[reflect.Type]*graphql.Object
+type FieldMap map[string]*graphql.Field
+
+type GQLThread struct {
+ BaseThread
+ extended_types ObjTypeMap
+ extended_queries FieldMap
+ extended_subscriptions FieldMap
+ extended_mutations FieldMap
+}
+
+type GQLThreadState struct {
+ BaseThreadState
+ Listen string
+}
+
+func NewGQLThreadState(listen string) GQLThreadState {
+ return GQLThreadState{
+ BaseThreadState: NewBaseThreadState("GQL Server"),
+ Listen: listen,
+ }
+}
+
+var gql_actions ThreadActions = ThreadActions{
+ "start": func(ctx * GraphContext, thread Thread) (string, error) {
+ ctx.Log.Logf("gql", "SERVER_STARTED")
+ server := thread.(*GQLThread)
+ go func() {
+ ctx.Log.Logf("gql", "GOROUTINE_START for %s", server.ID())
+
+ mux := http.NewServeMux()
+ http_handler, ws_handler := MakeGQLHandlers(ctx, server)
+ mux.HandleFunc("/gql", http_handler)
+ mux.HandleFunc("/gqlws", ws_handler)
+ mux.HandleFunc("/graphiql", GraphiQLHandler())
+ fs := http.FileServer(http.Dir("./site"))
+ mux.Handle("/site/", http.StripPrefix("/site", fs))
+
+ srv_if, _ := UseStates(ctx, []GraphNode{server}, func(states []NodeState)(interface{}, error){
+ server_state := states[0].(*GQLThreadState)
+ return &http.Server{
+ Addr: server_state.Listen,
+ Handler: mux,
+ }, nil
+ })
+ srv := srv_if.(*http.Server)
+
+ http_done := &sync.WaitGroup{}
+ http_done.Add(1)
+ go func(srv *http.Server, http_done *sync.WaitGroup) {
+ defer http_done.Done()
+ err := srv.ListenAndServe()
+ if err != http.ErrServerClosed {
+ panic(fmt.Sprintf("Failed to start gql server: %s", err))
+ }
+ }(srv, http_done)
+
+ for true {
+ select {
+ case signal:=<-server.signal:
+ if signal.Type() == "abort" || signal.Type() == "cancel" {
+ err := srv.Shutdown(context.Background())
+ if err != nil{
+ panic(fmt.Sprintf("Failed to shutdown gql server: %s", err))
+ }
+ http_done.Wait()
+ break
+ }
+ ctx.Log.Logf("gql", "GOROUTINE_SIGNAL for %s: %+v", server.ID(), signal)
+ // Take signals to resource and send to GQL subscriptions
+ }
+ }
+ }()
+ return "wait", nil
+ },
+}
+
+var gql_handlers ThreadHandlers = ThreadHandlers{
+}
+
+func NewGQLThread(ctx * GraphContext, listen string, requirements []Lockable, extended_types ObjTypeMap, extended_queries FieldMap, extended_mutations FieldMap, extended_subscriptions FieldMap) (*GQLThread, error) {
+ state := NewGQLThreadState(listen)
+ base_thread, err := NewBaseThread(ctx, gql_actions, gql_handlers, &state)
+ if err != nil {
+ return nil, err
+ }
+
+ thread := &GQLThread {
+ BaseThread: base_thread,
+ extended_types: extended_types,
+ extended_queries: extended_queries,
+ extended_mutations: extended_mutations,
+ extended_subscriptions: extended_subscriptions,
+ }
+
+ err = LinkLockables(ctx, thread, requirements)
+ if err != nil {
+ return nil, err
+ }
+ return thread, nil
+}
+
+func MakeGQLHandlers(ctx * GraphContext, server * GQLThread) (func(http.ResponseWriter, *http.Request), func(http.ResponseWriter, *http.Request)) {
+ valid_nodes := map[reflect.Type]*graphql.Object{}
+ valid_lockables := map[reflect.Type]*graphql.Object{}
+ valid_threads := map[reflect.Type]*graphql.Object{}
+ valid_lockables[reflect.TypeOf((*BaseLockable)(nil))] = GQLTypeBaseNode()
+ for t, v := range(valid_lockables) {
+ valid_nodes[t] = v
+ }
+ valid_threads[reflect.TypeOf((*BaseThread)(nil))] = GQLTypeBaseThread()
+ valid_threads[reflect.TypeOf((*GQLThread)(nil))] = GQLTypeGQLThread()
+ for t, v := range(valid_threads) {
+ valid_lockables[t] = v
+ valid_nodes[t] = v
+ }
+
+
+ gql_types := []graphql.Type{GQLTypeSignal(), GQLTypeSignalInput()}
+ for _, v := range(valid_nodes) {
+ gql_types = append(gql_types, v)
+ }
+
+ node_type := reflect.TypeOf((GraphNode)(nil))
+ lockable_type := reflect.TypeOf((Lockable)(nil))
+ thread_type := reflect.TypeOf((Thread)(nil))
+
+ for go_t, gql_t := range(server.extended_types) {
+ if go_t.Implements(node_type) {
+ valid_nodes[go_t] = gql_t
+ }
+ if go_t.Implements(lockable_type) {
+ valid_lockables[go_t] = gql_t
+ }
+ if go_t.Implements(thread_type) {
+ valid_threads[go_t] = gql_t
+ }
+ gql_types = append(gql_types, gql_t)
+ }
+
+ gql_queries := graphql.Fields{
+ "Self": GQLQuerySelf(),
+ }
+
+ for key, value := range(server.extended_queries) {
+ gql_queries[key] = value
+ }
+
+ gql_subscriptions := graphql.Fields{
+ "Update": GQLSubscriptionUpdate(),
+ }
+
+ for key, value := range(server.extended_subscriptions) {
+ gql_subscriptions[key] = value
+ }
+
+ gql_mutations := graphql.Fields{
+ "SendUpdate": GQLMutationSendUpdate(),
+ }
+
+ for key, value := range(server.extended_mutations) {
+ gql_mutations[key] = value
+ }
+
+ schemaConfig := graphql.SchemaConfig{
+ Types: gql_types,
+ Query: graphql.NewObject(graphql.ObjectConfig{
+ Name: "Query",
+ Fields: gql_queries,
+ }),
+ Mutation: graphql.NewObject(graphql.ObjectConfig{
+ Name: "Mutation",
+ Fields: gql_mutations,
+ }),
+ Subscription: graphql.NewObject(graphql.ObjectConfig{
+ Name: "Subscription",
+ Fields: gql_subscriptions,
+ }),
+ }
+
+ schema, err := graphql.NewSchema(schemaConfig)
+ if err != nil{
+ panic(err)
+ }
+ gql_ctx := context.Background()
+ gql_ctx = context.WithValue(gql_ctx, "valid_nodes", valid_nodes)
+ gql_ctx = context.WithValue(gql_ctx, "valid_lockables", valid_lockables)
+ gql_ctx = context.WithValue(gql_ctx, "valid_threads", valid_threads)
+ gql_ctx = context.WithValue(gql_ctx, "gql_server", server)
+ gql_ctx = context.WithValue(gql_ctx, "graph_context", ctx)
+ return GQLHandler(ctx, schema, gql_ctx), GQLWSHandler(ctx, schema, gql_ctx)
+}
diff --git a/gql_graph.go b/gql_graph.go
new file mode 100644
index 0000000..3fa04b9
--- /dev/null
+++ b/gql_graph.go
@@ -0,0 +1,551 @@
+package graphvent
+
+import (
+ "github.com/graphql-go/graphql"
+ "reflect"
+ "fmt"
+ "time"
+)
+
+var gql_interface_graph_node *graphql.Interface = nil
+func GQLInterfaceGraphNode() *graphql.Interface {
+ if gql_interface_graph_node == nil {
+ gql_interface_graph_node = graphql.NewInterface(graphql.InterfaceConfig{
+ Name: "GraphNode",
+ ResolveType: func(p graphql.ResolveTypeParams) *graphql.Object {
+ valid_nodes, ok := p.Context.Value("valid_nodes").(map[reflect.Type]*graphql.Object)
+ if ok == false {
+ return nil
+ }
+
+ for key, value := range(valid_nodes) {
+ if reflect.TypeOf(p.Value) == key {
+ return value
+ }
+ }
+ return nil
+ },
+ Fields: graphql.Fields{},
+ })
+
+ gql_interface_graph_node.AddFieldConfig("ID", &graphql.Field{
+ Type: graphql.String,
+ })
+
+ gql_interface_graph_node.AddFieldConfig("Name", &graphql.Field{
+ Type: graphql.String,
+ })
+ }
+
+ return gql_interface_graph_node
+}
+
+var gql_list_thread *graphql.List = nil
+func GQLListThread() *graphql.List {
+ if gql_list_thread == nil {
+ gql_list_thread = graphql.NewList(GQLInterfaceThread())
+ }
+ return gql_list_thread
+}
+
+var gql_interface_thread *graphql.Interface = nil
+func GQLInterfaceThread() *graphql.Interface {
+ if gql_interface_thread == nil {
+ gql_interface_thread = graphql.NewInterface(graphql.InterfaceConfig{
+ Name: "Thread",
+ ResolveType: func(p graphql.ResolveTypeParams) *graphql.Object {
+ valid_nodes, ok := p.Context.Value("valid_threads").(map[reflect.Type]*graphql.Object)
+ if ok == false {
+ return nil
+ }
+
+ for key, value := range(valid_nodes) {
+ if reflect.TypeOf(p.Value) == key {
+ return value
+ }
+ }
+ return nil
+ },
+ Fields: graphql.Fields{},
+ })
+
+ gql_interface_thread.AddFieldConfig("ID", &graphql.Field{
+ Type: graphql.String,
+ })
+ }
+
+ return gql_interface_thread
+}
+
+var gql_interface_lockable *graphql.Interface = nil
+func GQLInterfaceLockable() *graphql.Interface {
+ if gql_interface_lockable == nil {
+ gql_interface_lockable = graphql.NewInterface(graphql.InterfaceConfig{
+ Name: "Lockable",
+ ResolveType: func(p graphql.ResolveTypeParams) *graphql.Object {
+ valid_nodes, ok := p.Context.Value("valid_lockables").(map[reflect.Type]*graphql.Object)
+ if ok == false {
+ return nil
+ }
+
+ for key, value := range(valid_nodes) {
+ if reflect.TypeOf(p.Value) == key {
+ return value
+ }
+ }
+ return nil
+ },
+ Fields: graphql.Fields{},
+ })
+
+ gql_interface_lockable.AddFieldConfig("ID", &graphql.Field{
+ Type: graphql.String,
+ })
+
+ gql_interface_lockable.AddFieldConfig("Name", &graphql.Field{
+ Type: graphql.String,
+ })
+ }
+
+ return gql_interface_lockable
+}
+
+func GQLNodeID(p graphql.ResolveParams) (interface{}, error) {
+ node, ok := p.Source.(GraphNode)
+ if ok == false || node == nil {
+ return nil, fmt.Errorf("Failed to cast source to GraphNode")
+ }
+ return node.ID(), nil
+}
+
+func GQLNodeName(p graphql.ResolveParams) (interface{}, error) {
+ node, ok := p.Source.(GraphNode)
+ if ok == false || node == nil {
+ return nil, fmt.Errorf("Failed to cast source to GraphNode")
+ }
+
+ ctx, ok := p.Context.Value("graph_context").(*GraphContext)
+ if ok == false {
+ return nil, fmt.Errorf("Failed to cast context graph_context to GraphContext")
+ }
+
+ name, err := UseStates(ctx, []GraphNode{node}, func(states []NodeState) (interface{}, error) {
+ return states[0].Name(), nil
+ })
+
+ if err != nil {
+ return nil, err
+ }
+
+ name_str, ok := name.(string)
+ if ok == false {
+ return nil, fmt.Errorf("Failed to cast name to string %+v", name)
+ }
+
+ return name_str, nil
+}
+
+func GQLThreadListen(p graphql.ResolveParams) (interface{}, error) {
+ node, ok := p.Source.(*GQLThread)
+ if ok == false || node == nil {
+ return nil, fmt.Errorf("Failed to cast source to GQLThread")
+ }
+
+ ctx, ok := p.Context.Value("graph_context").(*GraphContext)
+ if ok == false {
+ return nil, fmt.Errorf("Failed to cast context graph_context to GraphContext")
+ }
+
+ listen, err := UseStates(ctx, []GraphNode{node}, func(states []NodeState) (interface{}, error) {
+ gql_thread, ok := states[0].(*GQLThreadState)
+ if ok == false {
+ return nil, fmt.Errorf("Failed to cast state to GQLThreadState")
+ }
+ return gql_thread.Listen, nil
+ })
+
+ if err != nil {
+ return nil, err
+ }
+
+ listen_str, ok := listen.(string)
+ if ok == false {
+ return nil, fmt.Errorf("Failed to cast listen to string %+v", listen)
+ }
+
+ return listen_str, nil
+}
+
+var gql_type_gql_thread *graphql.Object = nil
+func GQLTypeGQLThread() * graphql.Object {
+ if gql_type_gql_thread == nil {
+ gql_type_gql_thread = graphql.NewObject(graphql.ObjectConfig{
+ Name: "GQLThread",
+ Interfaces: []*graphql.Interface{
+ GQLInterfaceGraphNode(),
+ GQLInterfaceThread(),
+ },
+ IsTypeOf: func(p graphql.IsTypeOfParams) bool {
+ _, ok := p.Value.(*GQLThread)
+ return ok
+ },
+ Fields: graphql.Fields{},
+ })
+
+ gql_type_gql_thread.AddFieldConfig("ID", &graphql.Field{
+ Type: graphql.String,
+ Resolve: GQLNodeID,
+ })
+
+ gql_type_gql_thread.AddFieldConfig("Name", &graphql.Field{
+ Type: graphql.String,
+ Resolve: GQLNodeName,
+ })
+
+ gql_type_gql_thread.AddFieldConfig("Listen", &graphql.Field{
+ Type: graphql.String,
+ Resolve: GQLThreadListen,
+ })
+ }
+ return gql_type_gql_thread
+}
+
+var gql_type_base_thread *graphql.Object = nil
+func GQLTypeBaseThread() * graphql.Object {
+ if gql_type_base_thread == nil {
+ gql_type_base_thread = graphql.NewObject(graphql.ObjectConfig{
+ Name: "BaseThread",
+ Interfaces: []*graphql.Interface{
+ GQLInterfaceGraphNode(),
+ GQLInterfaceThread(),
+ },
+ IsTypeOf: func(p graphql.IsTypeOfParams) bool {
+ valid_threads, ok := p.Context.Value("valid_threads").(map[reflect.Type]*graphql.Object)
+ if ok == false {
+ return false
+ }
+ value_type := reflect.TypeOf(p.Value)
+ for go_type, _ := range(valid_threads) {
+ if value_type == go_type {
+ return true
+ }
+ }
+ return false
+ },
+ Fields: graphql.Fields{},
+ })
+ gql_type_base_thread.AddFieldConfig("ID", &graphql.Field{
+ Type: graphql.String,
+ Resolve: GQLNodeID,
+ })
+
+ gql_type_base_thread.AddFieldConfig("Name", &graphql.Field{
+ Type: graphql.String,
+ Resolve: GQLNodeName,
+ })
+ }
+ return gql_type_base_thread
+}
+
+var gql_type_base_lockable *graphql.Object = nil
+func GQLTypeBaseLockable() * graphql.Object {
+ if gql_type_base_lockable == nil {
+ gql_type_base_lockable = graphql.NewObject(graphql.ObjectConfig{
+ Name: "BaseLockable",
+ Interfaces: []*graphql.Interface{
+ GQLInterfaceGraphNode(),
+ GQLInterfaceLockable(),
+ },
+ IsTypeOf: func(p graphql.IsTypeOfParams) bool {
+ valid_lockables, ok := p.Context.Value("valid_lockables").(map[reflect.Type]*graphql.Object)
+ if ok == false {
+ return false
+ }
+ value_type := reflect.TypeOf(p.Value)
+ for go_type, _ := range(valid_lockables) {
+ if value_type == go_type {
+ return true
+ }
+ }
+ return false
+ },
+ Fields: graphql.Fields{},
+ })
+
+ gql_type_base_lockable.AddFieldConfig("ID", &graphql.Field{
+ Type: graphql.String,
+ Resolve: GQLNodeID,
+ })
+
+ gql_type_base_lockable.AddFieldConfig("Name", &graphql.Field{
+ Type: graphql.String,
+ Resolve: GQLNodeName,
+ })
+ }
+
+ return gql_type_base_lockable
+}
+
+var gql_type_base_node *graphql.Object = nil
+func GQLTypeBaseNode() * graphql.Object {
+ if gql_type_base_node == nil {
+ gql_type_base_node = graphql.NewObject(graphql.ObjectConfig{
+ Name: "BaseNode",
+ Interfaces: []*graphql.Interface{
+ GQLInterfaceGraphNode(),
+ },
+ IsTypeOf: func(p graphql.IsTypeOfParams) bool {
+ valid_nodes, ok := p.Context.Value("valid_nodes").(map[reflect.Type]*graphql.Object)
+ if ok == false {
+ return false
+ }
+ value_type := reflect.TypeOf(p.Value)
+ for go_type, _ := range(valid_nodes) {
+ if value_type == go_type {
+ return true
+ }
+ }
+ return false
+ },
+ Fields: graphql.Fields{},
+ })
+
+ gql_type_base_node.AddFieldConfig("ID", &graphql.Field{
+ Type: graphql.String,
+ Resolve: GQLNodeID,
+ })
+
+ gql_type_base_node.AddFieldConfig("Name", &graphql.Field{
+ Type: graphql.String,
+ Resolve: GQLNodeName,
+ })
+ }
+
+ return gql_type_base_node
+}
+
+func GQLSignalFn(p graphql.ResolveParams, fn func(GraphSignal, graphql.ResolveParams)(interface{}, error))(interface{}, error) {
+ if signal, ok := p.Source.(GraphSignal); ok {
+ return fn(signal, p)
+ }
+ return nil, fmt.Errorf("Failed to cast source to event")
+}
+
+func GQLSignalType(p graphql.ResolveParams) (interface{}, error) {
+ return GQLSignalFn(p, func(signal GraphSignal, p graphql.ResolveParams)(interface{}, error){
+ return signal.Type(), nil
+ })
+}
+
+func GQLSignalSource(p graphql.ResolveParams) (interface{}, error) {
+ return GQLSignalFn(p, func(signal GraphSignal, p graphql.ResolveParams)(interface{}, error){
+ return signal.Source(), nil
+ })
+}
+
+func GQLSignalDirection(p graphql.ResolveParams) (interface{}, error) {
+ return GQLSignalFn(p, func(signal GraphSignal, p graphql.ResolveParams)(interface{}, error){
+ return signal.Direction(), nil
+ })
+}
+
+func GQLSignalString(p graphql.ResolveParams) (interface{}, error) {
+ return GQLSignalFn(p, func(signal GraphSignal, p graphql.ResolveParams)(interface{}, error){
+ return signal.String(), nil
+ })
+}
+
+
+var gql_type_signal *graphql.Object = nil
+func GQLTypeSignal() *graphql.Object {
+ if gql_type_signal == nil {
+ gql_type_signal = graphql.NewObject(graphql.ObjectConfig{
+ Name: "SignalOut",
+ IsTypeOf: func(p graphql.IsTypeOfParams) bool {
+ _, ok := p.Value.(GraphSignal)
+ return ok
+ },
+ Fields: graphql.Fields{},
+ })
+
+ gql_type_signal.AddFieldConfig("Type", &graphql.Field{
+ Type: graphql.String,
+ Resolve: GQLSignalType,
+ })
+ gql_type_signal.AddFieldConfig("Source", &graphql.Field{
+ Type: graphql.String,
+ Resolve: GQLSignalSource,
+ })
+ gql_type_signal.AddFieldConfig("Direction", &graphql.Field{
+ Type: graphql.Boolean,
+ Resolve: GQLSignalDirection,
+ })
+ gql_type_signal.AddFieldConfig("String", &graphql.Field{
+ Type: graphql.String,
+ Resolve: GQLSignalString,
+ })
+ }
+ return gql_type_signal
+}
+
+var gql_type_signal_input *graphql.InputObject = nil
+func GQLTypeSignalInput() *graphql.InputObject {
+ if gql_type_signal_input == nil {
+ gql_type_signal_input = graphql.NewInputObject(graphql.InputObjectConfig{
+ Name: "SignalIn",
+ Fields: graphql.InputObjectConfigFieldMap{},
+ })
+ gql_type_signal_input.AddFieldConfig("Type", &graphql.InputObjectFieldConfig{
+ Type: graphql.String,
+ })
+ gql_type_signal_input.AddFieldConfig("Description", &graphql.InputObjectFieldConfig{
+ Type: graphql.String,
+ DefaultValue: "",
+ })
+ gql_type_signal_input.AddFieldConfig("Time", &graphql.InputObjectFieldConfig{
+ Type: graphql.DateTime,
+ DefaultValue: time.Now(),
+ })
+ }
+ return gql_type_signal_input
+}
+
+func GQLSubscribeSignal(p graphql.ResolveParams) (interface{}, error) {
+ return GQLSubscribeFn(p, func(signal GraphSignal, p graphql.ResolveParams)(interface{}, error) {
+ return signal, nil
+ })
+}
+
+func GQLSubscribeFn(p graphql.ResolveParams, fn func(GraphSignal, graphql.ResolveParams)(interface{}, error))(interface{}, error) {
+ server, ok := p.Context.Value("gql_server").(*GQLThread)
+ if ok == false {
+ return nil, fmt.Errorf("Failed to get gql_server from context and cast to GQLServer")
+ }
+
+ ctx, ok := p.Context.Value("graph_context").(*GraphContext)
+ if ok == false {
+ return nil, fmt.Errorf("Failed to get graph_context from context and cast to GraphContext")
+ }
+
+ c := make(chan interface{})
+ go func(c chan interface{}, server *GQLThread) {
+ sig_c := server.UpdateChannel(0)
+ for {
+ val, ok := <- sig_c
+ if ok == false {
+ return
+ }
+ ret, err := fn(val, p)
+ if err != nil {
+ ctx.Log.Logf("gqlws", "type convertor error %s", err)
+ return
+ }
+ c <- ret
+ }
+ }(c, server)
+ return c, nil
+}
+
+var gql_subscription_update * graphql.Field = nil
+func GQLSubscriptionUpdate() * graphql.Field {
+ if gql_subscription_update == nil {
+ gql_subscription_update = &graphql.Field{
+ Type: GQLTypeSignal(),
+ Resolve: func(p graphql.ResolveParams) (interface{}, error) {
+ return p.Source, nil
+ },
+ Subscribe: GQLSubscribeSignal,
+ }
+ }
+
+ return gql_subscription_update
+}
+
+var gql_mutation_send_update *graphql.Field = nil
+func GQLMutationSendUpdate() *graphql.Field {
+ if gql_mutation_send_update == nil {
+ gql_mutation_send_update = &graphql.Field{
+ Type: GQLTypeSignal(),
+ Args: graphql.FieldConfigArgument{
+ "id": &graphql.ArgumentConfig{
+ Type: graphql.String,
+ },
+ "signal": &graphql.ArgumentConfig{
+ Type: GQLTypeSignalInput(),
+ },
+ },
+ Resolve: func(p graphql.ResolveParams) (interface{}, error) {
+ server, ok := p.Context.Value("gql_server").(*GQLThread)
+ if ok == false {
+ return nil, fmt.Errorf("Failed to cast context gql_server to GQLServer: %+v", p.Context.Value("gql_server"))
+ }
+
+ ctx, ok := p.Context.Value("graph_context").(*GraphContext)
+ if ok == false {
+ return nil, fmt.Errorf("Failed to cast context graph_context to GraphContext: %+v", p.Context.Value("graph_context"))
+ }
+
+ signal_map, ok := p.Args["signal"].(map[string]interface{})
+ if ok == false {
+ return nil, fmt.Errorf("Failed to cast arg signal to GraphSignal: %+v", p.Args["signal"])
+ }
+ var signal GraphSignal = nil
+ if signal_map["Direction"] == Up {
+ signal = NewSignal(server, signal_map["Type"].(string))
+ } else if signal_map["Direction"] == Down {
+ signal = NewDownSignal(server, signal_map["Type"].(string))
+ } else if signal_map["Direction"] == Direct {
+ signal = NewDirectSignal(server, signal_map["Type"].(string))
+ } else {
+ return nil, fmt.Errorf("Bad direction: %d", signal_map["Direction"])
+ }
+
+ id , ok := p.Args["id"].(NodeID)
+ if ok == false {
+ return nil, fmt.Errorf("Failed to cast arg id to string")
+ }
+
+ node_if, err := UseStates(ctx, []GraphNode{server}, func(states []NodeState) (interface{}, error){
+ server_state := states[0].(*GQLThreadState)
+ node := FindChild(ctx, server, server_state, id)
+ if node == nil {
+ return nil, fmt.Errorf("Failed to find ID: %s as child of server thread", id)
+ }
+ return node, nil
+ })
+ if err != nil {
+ return nil, err
+ }
+
+ node, ok := node_if.(GraphNode)
+ if ok == false {
+ return nil, fmt.Errorf("Failed to cast found node to GraphNode")
+ }
+
+ SendUpdate(ctx, node, signal)
+ return signal, nil
+ },
+ }
+ }
+
+ return gql_mutation_send_update
+}
+
+var gql_query_self *graphql.Field = nil
+func GQLQuerySelf() *graphql.Field {
+ if gql_query_self == nil {
+ gql_query_self = &graphql.Field{
+ Type: GQLTypeGQLThread(),
+ Resolve: func(p graphql.ResolveParams) (interface{}, error) {
+ server, ok := p.Context.Value("gql_server").(*GQLThread)
+ if ok == false {
+ return nil, fmt.Errorf("failed to cast gql_server to GQLThread")
+ }
+
+ return server, nil
+ },
+ }
+ }
+
+ return gql_query_self
+}
diff --git a/gql_test.go b/gql_test.go
new file mode 100644
index 0000000..a45df3f
--- /dev/null
+++ b/gql_test.go
@@ -0,0 +1,20 @@
+package graphvent
+
+import (
+ "testing"
+)
+
+func TestGQLThread(t * testing.T) {
+ ctx := testContext(t)
+ gql_thread, err := NewGQLThread(ctx, ":8080", []Lockable{}, ObjTypeMap{}, FieldMap{}, FieldMap{}, FieldMap{})
+ fatalErr(t, err)
+
+ test_thread, err := NewSimpleBaseThread(ctx, "Test thread 1", []Lockable{}, ThreadActions{}, ThreadHandlers{})
+ fatalErr(t, err)
+
+ err = LinkThreads(ctx, gql_thread, test_thread, nil)
+ fatalErr(t, err)
+
+ err = RunThread(ctx, gql_thread)
+ fatalErr(t, err)
+}
diff --git a/graph.go b/graph.go
index 491453b..c332b7a 100644
--- a/graph.go
+++ b/graph.go
@@ -174,7 +174,7 @@ func CancelSignal(source GraphNode) BaseSignal {
}
type NodeState interface {
-
+ Name() string
}
// GraphNode is the interface common to both DAG nodes and Event tree nodes
diff --git a/lockable.go b/lockable.go
index 1eee207..f5244b6 100644
--- a/lockable.go
+++ b/lockable.go
@@ -14,6 +14,7 @@ import (
//
// RecordLockHolder records that lockable_id needs to be passed back to lock_holder
type LockHolderState interface {
+ NodeState
ReturnLock(lockable_id NodeID) GraphNode
AllowedToTakeLock(node_id NodeID, lockable_id NodeID) bool
RecordLockHolder(lockable_id NodeID, lock_holder GraphNode)
@@ -22,7 +23,6 @@ type LockHolderState interface {
// LockableState is the interface that a lockables state must have to allow it to connect to the DAG
type LockableState interface {
LockHolderState
- Name() string
Requirements() []Lockable
AddRequirement(requirement Lockable)
Dependencies() []Lockable
diff --git a/thread.go b/thread.go
index 3aaa37b..e3cbc5c 100644
--- a/thread.go
+++ b/thread.go
@@ -370,7 +370,7 @@ func NewBaseThreadState(name string) BaseThreadState {
}
}
-func NewBaseThread(ctx * GraphContext, name string, actions ThreadActions, handlers ThreadHandlers, state ThreadState) (BaseThread, error) {
+func NewBaseThread(ctx * GraphContext, actions ThreadActions, handlers ThreadHandlers, state ThreadState) (BaseThread, error) {
thread := BaseThread{
BaseLockable: BaseLockable{BaseNode: NewNode(ctx, RandID(), state)},
Actions: ThreadActions{
@@ -398,7 +398,7 @@ func NewBaseThread(ctx * GraphContext, name string, actions ThreadActions, handl
func NewSimpleBaseThread(ctx * GraphContext, name string, requirements []Lockable, actions ThreadActions, handlers ThreadHandlers) (* BaseThread, error) {
state := NewBaseThreadState(name)
- thread, err := NewBaseThread(ctx, name, actions, handlers, &state)
+ thread, err := NewBaseThread(ctx, actions, handlers, &state)
if err != nil {
return nil, err
}