Added base GQLThread

graph-rework
noah metz 2023-06-25 20:20:59 -06:00
parent 1598e2939a
commit a185cc3dfc
6 changed files with 1081 additions and 4 deletions

506
gql.go

@ -0,0 +1,506 @@
package graphvent
import (
"net/http"
"github.com/graphql-go/graphql"
"github.com/graphql-go/graphql/language/parser"
"github.com/graphql-go/graphql/language/source"
"github.com/graphql-go/graphql/language/ast"
"context"
"encoding/json"
"io"
"reflect"
"fmt"
"sync"
"github.com/gobwas/ws"
"github.com/gobwas/ws/wsutil"
)
func GraphiQLHandler() func(http.ResponseWriter, *http.Request) {
return func(w http.ResponseWriter, r * http.Request) {
graphiql_string := fmt.Sprintf(`
<!--
* Copyright (c) 2021 GraphQL Contributors
* All rights reserved.
*
* This source code is licensed under the license found in the
* LICENSE file in the root directory of this source tree.
-->
<!DOCTYPE html>
<html lang="en">
<head>
<title>GraphiQL</title>
<style>
body {
height: 100%%;
margin: 0;
width: 100%%;
overflow: hidden;
}
#graphiql {
height: 100vh;
}
</style>
<!--
This GraphiQL example depends on Promise and fetch, which are available in
modern browsers, but can be "polyfilled" for older browsers.
GraphiQL itself depends on React DOM.
If you do not want to rely on a CDN, you can host these files locally or
include them directly in your favored resource bundler.
-->
<script
crossorigin
src="https://unpkg.com/react@18/umd/react.development.js"
></script>
<script
crossorigin
src="https://unpkg.com/react-dom@18/umd/react-dom.development.js"
></script>
<!--
These two files can be found in the npm module, however you may wish to
copy them directly into your environment, or perhaps include them in your
favored resource bundler.
-->
<link rel="stylesheet" href="https://unpkg.com/graphiql/graphiql.min.css" />
</head>
<body>
<div id="graphiql">Loading...</div>
<script
src="https://unpkg.com/graphiql/graphiql.min.js"
type="application/javascript"
></script>
<script>
const root = ReactDOM.createRoot(document.getElementById('graphiql'));
root.render(
React.createElement(GraphiQL, {
fetcher: GraphiQL.createFetcher({
url: '/gql',
}),
defaultEditorToolsVisibility: true,
}),
);
</script>
</body>
</html>
`)
w.Header().Set("Content-Type", "text/html; charset=utf-8")
w.WriteHeader(http.StatusOK)
io.WriteString(w, graphiql_string)
}
}
type GQLWSPayload struct {
OperationName string `json:"operationName,omitempty"`
Query string `json:"query,omitempty"`
Variables map[string]interface{} `json:"variables,omitempty"`
Extensions map[string]interface{} `json:"extensions,omitempty"`
Data string `json:"data,omitempty"`
}
type GQLWSMsg struct {
ID string `json:"id,omitempty"`
Type string `json:"type"`
Payload GQLWSPayload `json:"payload,omitempty"`
}
func enableCORS(w *http.ResponseWriter) {
(*w).Header().Set("Access-Control-Allow-Origin", "*")
(*w).Header().Set("Access-Control-Allow-Credentials", "true")
(*w).Header().Set("Access-Control-Allow-Headers", "*")
(*w).Header().Set("Access-Control-Allow-Methods", "*")
}
func GQLHandler(ctx * GraphContext, schema graphql.Schema, gql_ctx context.Context) func(http.ResponseWriter, *http.Request) {
return func(w http.ResponseWriter, r * http.Request) {
ctx.Log.Logf("gql", "GQL REQUEST: %s", r.RemoteAddr)
enableCORS(&w)
header_map := map[string]interface{}{}
for header, value := range(r.Header) {
header_map[header] = value
}
ctx.Log.Logm("gql", header_map, "REQUEST_HEADERS")
str, err := io.ReadAll(r.Body)
if err != nil {
ctx.Log.Logf("gql", "failed to read request body: %s", err)
return
}
query := GQLWSPayload{}
json.Unmarshal(str, &query)
params := graphql.Params{
Schema: schema,
Context: gql_ctx,
RequestString: query.Query,
}
if query.OperationName != "" {
params.OperationName = query.OperationName
}
if len(query.Variables) > 0 {
params.VariableValues = query.Variables
}
result := graphql.Do(params)
if len(result.Errors) > 0 {
extra_fields := map[string]interface{}{}
extra_fields["body"] = string(str)
extra_fields["headers"] = r.Header
ctx.Log.Logm("gql", extra_fields, "wrong result, unexpected errors: %v", result.Errors)
}
json.NewEncoder(w).Encode(result)
}
}
func sendOneResultAndClose(res *graphql.Result) chan *graphql.Result {
resultChannel := make(chan *graphql.Result)
go func() {
resultChannel <- res
close(resultChannel)
}()
return resultChannel
}
func getOperationTypeOfReq(p graphql.Params) string{
source := source.NewSource(&source.Source{
Body: []byte(p.RequestString),
Name: "GraphQL request",
})
AST, err := parser.Parse(parser.ParseParams{Source: source})
if err != nil {
return ""
}
for _, node := range AST.Definitions {
if operationDef, ok := node.(*ast.OperationDefinition); ok {
name := ""
if operationDef.Name != nil {
name = operationDef.Name.Value
}
if name == p.OperationName || p.OperationName == "" {
return operationDef.Operation
}
}
}
return ""
}
func GQLWSDo(ctx * GraphContext, p graphql.Params) chan *graphql.Result {
operation := getOperationTypeOfReq(p)
ctx.Log.Logf("gqlws", "GQLWSDO_OPERATION: %s %+v", operation, p.RequestString)
if operation == ast.OperationTypeSubscription {
return graphql.Subscribe(p)
}
res := graphql.Do(p)
return sendOneResultAndClose(res)
}
func GQLWSHandler(ctx * GraphContext, schema graphql.Schema, gql_ctx context.Context) func(http.ResponseWriter, *http.Request) {
return func(w http.ResponseWriter, r * http.Request) {
ctx.Log.Logf("gqlws_new", "HANDLING %s",r.RemoteAddr)
header_map := map[string]interface{}{}
for header, value := range(r.Header) {
header_map[header] = value
}
ctx.Log.Logm("gql", header_map, "REQUEST_HEADERS")
u := ws.HTTPUpgrader{
Protocol: func(protocol string) bool {
ctx.Log.Logf("gqlws", "UPGRADE_PROTOCOL: %s", string(protocol))
return string(protocol) == "graphql-transport-ws"
},
}
conn, _, _, err := u.Upgrade(r, w)
if err == nil {
defer conn.Close()
conn_state := "init"
for {
// TODO: Make this a select between reading client data and getting updates from the event to push to clients"
msg_raw, op, err := wsutil.ReadClientData(conn)
ctx.Log.Logf("gqlws_hb", "MSG: %s\nOP: 0x%02x\nERR: %+v\n", string(msg_raw), op, err)
msg := GQLWSMsg{}
json.Unmarshal(msg_raw, &msg)
if err != nil {
ctx.Log.Logf("gqlws", "WS_CLIENT_ERROR")
break
}
if msg.Type == "connection_init" {
if conn_state != "init" {
ctx.Log.Logf("gqlws", "WS_CLIENT_ERROR: INIT WHILE IN %s", conn_state)
break
}
conn_state = "ready"
err = wsutil.WriteServerMessage(conn, 1, []byte("{\"type\": \"connection_ack\"}"))
if err != nil {
ctx.Log.Logf("gqlws", "WS_SERVER_ERROR: FAILED TO SEND connection_ack")
break
}
} else if msg.Type == "ping" {
ctx.Log.Logf("gqlws_hb", "PING FROM %s", r.RemoteAddr)
err = wsutil.WriteServerMessage(conn, 1, []byte("{\"type\": \"pong\"}"))
if err != nil {
ctx.Log.Logf("gqlws", "WS_SERVER_ERROR: FAILED TO SEND PONG")
}
} else if msg.Type == "subscribe" {
ctx.Log.Logf("gqlws", "SUBSCRIBE: %+v", msg.Payload)
params := graphql.Params{
Schema: schema,
Context: gql_ctx,
RequestString: msg.Payload.Query,
}
if msg.Payload.OperationName != "" {
params.OperationName = msg.Payload.OperationName
}
if len(msg.Payload.Variables) > 0 {
params.VariableValues = msg.Payload.Variables
}
res_chan := GQLWSDo(ctx, params)
go func(res_chan chan *graphql.Result) {
for {
next, ok := <-res_chan
if ok == false {
ctx.Log.Logf("gqlws", "response channel was closed")
return
}
if next == nil {
ctx.Log.Logf("gqlws", "NIL_ON_CHANNEL")
return
}
if len(next.Errors) > 0 {
extra_fields := map[string]interface{}{}
extra_fields["query"] = string(msg.Payload.Query)
ctx.Log.Logm("gqlws", extra_fields, "ERROR: wrong result, unexpected errors: %+v", next.Errors)
continue
}
ctx.Log.Logf("gqlws", "DATA: %+v", next.Data)
data, err := json.Marshal(next.Data)
if err != nil {
ctx.Log.Logf("gqlws", "ERROR: %+v", err)
continue
}
msg, err := json.Marshal(GQLWSMsg{
ID: msg.ID,
Type: "next",
Payload: GQLWSPayload{
Data: string(data),
},
})
if err != nil {
ctx.Log.Logf("gqlws", "ERROR: %+v", err)
continue
}
err = wsutil.WriteServerMessage(conn, 1, msg)
if err != nil {
ctx.Log.Logf("gqlws", "ERROR: %+v", err)
continue
}
}
}(res_chan)
} else {
}
}
return
} else {
panic("Failed to upgrade websocket")
}
}
}
type ObjTypeMap map[reflect.Type]*graphql.Object
type FieldMap map[string]*graphql.Field
type GQLThread struct {
BaseThread
extended_types ObjTypeMap
extended_queries FieldMap
extended_subscriptions FieldMap
extended_mutations FieldMap
}
type GQLThreadState struct {
BaseThreadState
Listen string
}
func NewGQLThreadState(listen string) GQLThreadState {
return GQLThreadState{
BaseThreadState: NewBaseThreadState("GQL Server"),
Listen: listen,
}
}
var gql_actions ThreadActions = ThreadActions{
"start": func(ctx * GraphContext, thread Thread) (string, error) {
ctx.Log.Logf("gql", "SERVER_STARTED")
server := thread.(*GQLThread)
go func() {
ctx.Log.Logf("gql", "GOROUTINE_START for %s", server.ID())
mux := http.NewServeMux()
http_handler, ws_handler := MakeGQLHandlers(ctx, server)
mux.HandleFunc("/gql", http_handler)
mux.HandleFunc("/gqlws", ws_handler)
mux.HandleFunc("/graphiql", GraphiQLHandler())
fs := http.FileServer(http.Dir("./site"))
mux.Handle("/site/", http.StripPrefix("/site", fs))
srv_if, _ := UseStates(ctx, []GraphNode{server}, func(states []NodeState)(interface{}, error){
server_state := states[0].(*GQLThreadState)
return &http.Server{
Addr: server_state.Listen,
Handler: mux,
}, nil
})
srv := srv_if.(*http.Server)
http_done := &sync.WaitGroup{}
http_done.Add(1)
go func(srv *http.Server, http_done *sync.WaitGroup) {
defer http_done.Done()
err := srv.ListenAndServe()
if err != http.ErrServerClosed {
panic(fmt.Sprintf("Failed to start gql server: %s", err))
}
}(srv, http_done)
for true {
select {
case signal:=<-server.signal:
if signal.Type() == "abort" || signal.Type() == "cancel" {
err := srv.Shutdown(context.Background())
if err != nil{
panic(fmt.Sprintf("Failed to shutdown gql server: %s", err))
}
http_done.Wait()
break
}
ctx.Log.Logf("gql", "GOROUTINE_SIGNAL for %s: %+v", server.ID(), signal)
// Take signals to resource and send to GQL subscriptions
}
}
}()
return "wait", nil
},
}
var gql_handlers ThreadHandlers = ThreadHandlers{
}
func NewGQLThread(ctx * GraphContext, listen string, requirements []Lockable, extended_types ObjTypeMap, extended_queries FieldMap, extended_mutations FieldMap, extended_subscriptions FieldMap) (*GQLThread, error) {
state := NewGQLThreadState(listen)
base_thread, err := NewBaseThread(ctx, gql_actions, gql_handlers, &state)
if err != nil {
return nil, err
}
thread := &GQLThread {
BaseThread: base_thread,
extended_types: extended_types,
extended_queries: extended_queries,
extended_mutations: extended_mutations,
extended_subscriptions: extended_subscriptions,
}
err = LinkLockables(ctx, thread, requirements)
if err != nil {
return nil, err
}
return thread, nil
}
func MakeGQLHandlers(ctx * GraphContext, server * GQLThread) (func(http.ResponseWriter, *http.Request), func(http.ResponseWriter, *http.Request)) {
valid_nodes := map[reflect.Type]*graphql.Object{}
valid_lockables := map[reflect.Type]*graphql.Object{}
valid_threads := map[reflect.Type]*graphql.Object{}
valid_lockables[reflect.TypeOf((*BaseLockable)(nil))] = GQLTypeBaseNode()
for t, v := range(valid_lockables) {
valid_nodes[t] = v
}
valid_threads[reflect.TypeOf((*BaseThread)(nil))] = GQLTypeBaseThread()
valid_threads[reflect.TypeOf((*GQLThread)(nil))] = GQLTypeGQLThread()
for t, v := range(valid_threads) {
valid_lockables[t] = v
valid_nodes[t] = v
}
gql_types := []graphql.Type{GQLTypeSignal(), GQLTypeSignalInput()}
for _, v := range(valid_nodes) {
gql_types = append(gql_types, v)
}
node_type := reflect.TypeOf((GraphNode)(nil))
lockable_type := reflect.TypeOf((Lockable)(nil))
thread_type := reflect.TypeOf((Thread)(nil))
for go_t, gql_t := range(server.extended_types) {
if go_t.Implements(node_type) {
valid_nodes[go_t] = gql_t
}
if go_t.Implements(lockable_type) {
valid_lockables[go_t] = gql_t
}
if go_t.Implements(thread_type) {
valid_threads[go_t] = gql_t
}
gql_types = append(gql_types, gql_t)
}
gql_queries := graphql.Fields{
"Self": GQLQuerySelf(),
}
for key, value := range(server.extended_queries) {
gql_queries[key] = value
}
gql_subscriptions := graphql.Fields{
"Update": GQLSubscriptionUpdate(),
}
for key, value := range(server.extended_subscriptions) {
gql_subscriptions[key] = value
}
gql_mutations := graphql.Fields{
"SendUpdate": GQLMutationSendUpdate(),
}
for key, value := range(server.extended_mutations) {
gql_mutations[key] = value
}
schemaConfig := graphql.SchemaConfig{
Types: gql_types,
Query: graphql.NewObject(graphql.ObjectConfig{
Name: "Query",
Fields: gql_queries,
}),
Mutation: graphql.NewObject(graphql.ObjectConfig{
Name: "Mutation",
Fields: gql_mutations,
}),
Subscription: graphql.NewObject(graphql.ObjectConfig{
Name: "Subscription",
Fields: gql_subscriptions,
}),
}
schema, err := graphql.NewSchema(schemaConfig)
if err != nil{
panic(err)
}
gql_ctx := context.Background()
gql_ctx = context.WithValue(gql_ctx, "valid_nodes", valid_nodes)
gql_ctx = context.WithValue(gql_ctx, "valid_lockables", valid_lockables)
gql_ctx = context.WithValue(gql_ctx, "valid_threads", valid_threads)
gql_ctx = context.WithValue(gql_ctx, "gql_server", server)
gql_ctx = context.WithValue(gql_ctx, "graph_context", ctx)
return GQLHandler(ctx, schema, gql_ctx), GQLWSHandler(ctx, schema, gql_ctx)
}

@ -0,0 +1,551 @@
package graphvent
import (
"github.com/graphql-go/graphql"
"reflect"
"fmt"
"time"
)
var gql_interface_graph_node *graphql.Interface = nil
func GQLInterfaceGraphNode() *graphql.Interface {
if gql_interface_graph_node == nil {
gql_interface_graph_node = graphql.NewInterface(graphql.InterfaceConfig{
Name: "GraphNode",
ResolveType: func(p graphql.ResolveTypeParams) *graphql.Object {
valid_nodes, ok := p.Context.Value("valid_nodes").(map[reflect.Type]*graphql.Object)
if ok == false {
return nil
}
for key, value := range(valid_nodes) {
if reflect.TypeOf(p.Value) == key {
return value
}
}
return nil
},
Fields: graphql.Fields{},
})
gql_interface_graph_node.AddFieldConfig("ID", &graphql.Field{
Type: graphql.String,
})
gql_interface_graph_node.AddFieldConfig("Name", &graphql.Field{
Type: graphql.String,
})
}
return gql_interface_graph_node
}
var gql_list_thread *graphql.List = nil
func GQLListThread() *graphql.List {
if gql_list_thread == nil {
gql_list_thread = graphql.NewList(GQLInterfaceThread())
}
return gql_list_thread
}
var gql_interface_thread *graphql.Interface = nil
func GQLInterfaceThread() *graphql.Interface {
if gql_interface_thread == nil {
gql_interface_thread = graphql.NewInterface(graphql.InterfaceConfig{
Name: "Thread",
ResolveType: func(p graphql.ResolveTypeParams) *graphql.Object {
valid_nodes, ok := p.Context.Value("valid_threads").(map[reflect.Type]*graphql.Object)
if ok == false {
return nil
}
for key, value := range(valid_nodes) {
if reflect.TypeOf(p.Value) == key {
return value
}
}
return nil
},
Fields: graphql.Fields{},
})
gql_interface_thread.AddFieldConfig("ID", &graphql.Field{
Type: graphql.String,
})
}
return gql_interface_thread
}
var gql_interface_lockable *graphql.Interface = nil
func GQLInterfaceLockable() *graphql.Interface {
if gql_interface_lockable == nil {
gql_interface_lockable = graphql.NewInterface(graphql.InterfaceConfig{
Name: "Lockable",
ResolveType: func(p graphql.ResolveTypeParams) *graphql.Object {
valid_nodes, ok := p.Context.Value("valid_lockables").(map[reflect.Type]*graphql.Object)
if ok == false {
return nil
}
for key, value := range(valid_nodes) {
if reflect.TypeOf(p.Value) == key {
return value
}
}
return nil
},
Fields: graphql.Fields{},
})
gql_interface_lockable.AddFieldConfig("ID", &graphql.Field{
Type: graphql.String,
})
gql_interface_lockable.AddFieldConfig("Name", &graphql.Field{
Type: graphql.String,
})
}
return gql_interface_lockable
}
func GQLNodeID(p graphql.ResolveParams) (interface{}, error) {
node, ok := p.Source.(GraphNode)
if ok == false || node == nil {
return nil, fmt.Errorf("Failed to cast source to GraphNode")
}
return node.ID(), nil
}
func GQLNodeName(p graphql.ResolveParams) (interface{}, error) {
node, ok := p.Source.(GraphNode)
if ok == false || node == nil {
return nil, fmt.Errorf("Failed to cast source to GraphNode")
}
ctx, ok := p.Context.Value("graph_context").(*GraphContext)
if ok == false {
return nil, fmt.Errorf("Failed to cast context graph_context to GraphContext")
}
name, err := UseStates(ctx, []GraphNode{node}, func(states []NodeState) (interface{}, error) {
return states[0].Name(), nil
})
if err != nil {
return nil, err
}
name_str, ok := name.(string)
if ok == false {
return nil, fmt.Errorf("Failed to cast name to string %+v", name)
}
return name_str, nil
}
func GQLThreadListen(p graphql.ResolveParams) (interface{}, error) {
node, ok := p.Source.(*GQLThread)
if ok == false || node == nil {
return nil, fmt.Errorf("Failed to cast source to GQLThread")
}
ctx, ok := p.Context.Value("graph_context").(*GraphContext)
if ok == false {
return nil, fmt.Errorf("Failed to cast context graph_context to GraphContext")
}
listen, err := UseStates(ctx, []GraphNode{node}, func(states []NodeState) (interface{}, error) {
gql_thread, ok := states[0].(*GQLThreadState)
if ok == false {
return nil, fmt.Errorf("Failed to cast state to GQLThreadState")
}
return gql_thread.Listen, nil
})
if err != nil {
return nil, err
}
listen_str, ok := listen.(string)
if ok == false {
return nil, fmt.Errorf("Failed to cast listen to string %+v", listen)
}
return listen_str, nil
}
var gql_type_gql_thread *graphql.Object = nil
func GQLTypeGQLThread() * graphql.Object {
if gql_type_gql_thread == nil {
gql_type_gql_thread = graphql.NewObject(graphql.ObjectConfig{
Name: "GQLThread",
Interfaces: []*graphql.Interface{
GQLInterfaceGraphNode(),
GQLInterfaceThread(),
},
IsTypeOf: func(p graphql.IsTypeOfParams) bool {
_, ok := p.Value.(*GQLThread)
return ok
},
Fields: graphql.Fields{},
})
gql_type_gql_thread.AddFieldConfig("ID", &graphql.Field{
Type: graphql.String,
Resolve: GQLNodeID,
})
gql_type_gql_thread.AddFieldConfig("Name", &graphql.Field{
Type: graphql.String,
Resolve: GQLNodeName,
})
gql_type_gql_thread.AddFieldConfig("Listen", &graphql.Field{
Type: graphql.String,
Resolve: GQLThreadListen,
})
}
return gql_type_gql_thread
}
var gql_type_base_thread *graphql.Object = nil
func GQLTypeBaseThread() * graphql.Object {
if gql_type_base_thread == nil {
gql_type_base_thread = graphql.NewObject(graphql.ObjectConfig{
Name: "BaseThread",
Interfaces: []*graphql.Interface{
GQLInterfaceGraphNode(),
GQLInterfaceThread(),
},
IsTypeOf: func(p graphql.IsTypeOfParams) bool {
valid_threads, ok := p.Context.Value("valid_threads").(map[reflect.Type]*graphql.Object)
if ok == false {
return false
}
value_type := reflect.TypeOf(p.Value)
for go_type, _ := range(valid_threads) {
if value_type == go_type {
return true
}
}
return false
},
Fields: graphql.Fields{},
})
gql_type_base_thread.AddFieldConfig("ID", &graphql.Field{
Type: graphql.String,
Resolve: GQLNodeID,
})
gql_type_base_thread.AddFieldConfig("Name", &graphql.Field{
Type: graphql.String,
Resolve: GQLNodeName,
})
}
return gql_type_base_thread
}
var gql_type_base_lockable *graphql.Object = nil
func GQLTypeBaseLockable() * graphql.Object {
if gql_type_base_lockable == nil {
gql_type_base_lockable = graphql.NewObject(graphql.ObjectConfig{
Name: "BaseLockable",
Interfaces: []*graphql.Interface{
GQLInterfaceGraphNode(),
GQLInterfaceLockable(),
},
IsTypeOf: func(p graphql.IsTypeOfParams) bool {
valid_lockables, ok := p.Context.Value("valid_lockables").(map[reflect.Type]*graphql.Object)
if ok == false {
return false
}
value_type := reflect.TypeOf(p.Value)
for go_type, _ := range(valid_lockables) {
if value_type == go_type {
return true
}
}
return false
},
Fields: graphql.Fields{},
})
gql_type_base_lockable.AddFieldConfig("ID", &graphql.Field{
Type: graphql.String,
Resolve: GQLNodeID,
})
gql_type_base_lockable.AddFieldConfig("Name", &graphql.Field{
Type: graphql.String,
Resolve: GQLNodeName,
})
}
return gql_type_base_lockable
}
var gql_type_base_node *graphql.Object = nil
func GQLTypeBaseNode() * graphql.Object {
if gql_type_base_node == nil {
gql_type_base_node = graphql.NewObject(graphql.ObjectConfig{
Name: "BaseNode",
Interfaces: []*graphql.Interface{
GQLInterfaceGraphNode(),
},
IsTypeOf: func(p graphql.IsTypeOfParams) bool {
valid_nodes, ok := p.Context.Value("valid_nodes").(map[reflect.Type]*graphql.Object)
if ok == false {
return false
}
value_type := reflect.TypeOf(p.Value)
for go_type, _ := range(valid_nodes) {
if value_type == go_type {
return true
}
}
return false
},
Fields: graphql.Fields{},
})
gql_type_base_node.AddFieldConfig("ID", &graphql.Field{
Type: graphql.String,
Resolve: GQLNodeID,
})
gql_type_base_node.AddFieldConfig("Name", &graphql.Field{
Type: graphql.String,
Resolve: GQLNodeName,
})
}
return gql_type_base_node
}
func GQLSignalFn(p graphql.ResolveParams, fn func(GraphSignal, graphql.ResolveParams)(interface{}, error))(interface{}, error) {
if signal, ok := p.Source.(GraphSignal); ok {
return fn(signal, p)
}
return nil, fmt.Errorf("Failed to cast source to event")
}
func GQLSignalType(p graphql.ResolveParams) (interface{}, error) {
return GQLSignalFn(p, func(signal GraphSignal, p graphql.ResolveParams)(interface{}, error){
return signal.Type(), nil
})
}
func GQLSignalSource(p graphql.ResolveParams) (interface{}, error) {
return GQLSignalFn(p, func(signal GraphSignal, p graphql.ResolveParams)(interface{}, error){
return signal.Source(), nil
})
}
func GQLSignalDirection(p graphql.ResolveParams) (interface{}, error) {
return GQLSignalFn(p, func(signal GraphSignal, p graphql.ResolveParams)(interface{}, error){
return signal.Direction(), nil
})
}
func GQLSignalString(p graphql.ResolveParams) (interface{}, error) {
return GQLSignalFn(p, func(signal GraphSignal, p graphql.ResolveParams)(interface{}, error){
return signal.String(), nil
})
}
var gql_type_signal *graphql.Object = nil
func GQLTypeSignal() *graphql.Object {
if gql_type_signal == nil {
gql_type_signal = graphql.NewObject(graphql.ObjectConfig{
Name: "SignalOut",
IsTypeOf: func(p graphql.IsTypeOfParams) bool {
_, ok := p.Value.(GraphSignal)
return ok
},
Fields: graphql.Fields{},
})
gql_type_signal.AddFieldConfig("Type", &graphql.Field{
Type: graphql.String,
Resolve: GQLSignalType,
})
gql_type_signal.AddFieldConfig("Source", &graphql.Field{
Type: graphql.String,
Resolve: GQLSignalSource,
})
gql_type_signal.AddFieldConfig("Direction", &graphql.Field{
Type: graphql.Boolean,
Resolve: GQLSignalDirection,
})
gql_type_signal.AddFieldConfig("String", &graphql.Field{
Type: graphql.String,
Resolve: GQLSignalString,
})
}
return gql_type_signal
}
var gql_type_signal_input *graphql.InputObject = nil
func GQLTypeSignalInput() *graphql.InputObject {
if gql_type_signal_input == nil {
gql_type_signal_input = graphql.NewInputObject(graphql.InputObjectConfig{
Name: "SignalIn",
Fields: graphql.InputObjectConfigFieldMap{},
})
gql_type_signal_input.AddFieldConfig("Type", &graphql.InputObjectFieldConfig{
Type: graphql.String,
})
gql_type_signal_input.AddFieldConfig("Description", &graphql.InputObjectFieldConfig{
Type: graphql.String,
DefaultValue: "",
})
gql_type_signal_input.AddFieldConfig("Time", &graphql.InputObjectFieldConfig{
Type: graphql.DateTime,
DefaultValue: time.Now(),
})
}
return gql_type_signal_input
}
func GQLSubscribeSignal(p graphql.ResolveParams) (interface{}, error) {
return GQLSubscribeFn(p, func(signal GraphSignal, p graphql.ResolveParams)(interface{}, error) {
return signal, nil
})
}
func GQLSubscribeFn(p graphql.ResolveParams, fn func(GraphSignal, graphql.ResolveParams)(interface{}, error))(interface{}, error) {
server, ok := p.Context.Value("gql_server").(*GQLThread)
if ok == false {
return nil, fmt.Errorf("Failed to get gql_server from context and cast to GQLServer")
}
ctx, ok := p.Context.Value("graph_context").(*GraphContext)
if ok == false {
return nil, fmt.Errorf("Failed to get graph_context from context and cast to GraphContext")
}
c := make(chan interface{})
go func(c chan interface{}, server *GQLThread) {
sig_c := server.UpdateChannel(0)
for {
val, ok := <- sig_c
if ok == false {
return
}
ret, err := fn(val, p)
if err != nil {
ctx.Log.Logf("gqlws", "type convertor error %s", err)
return
}
c <- ret
}
}(c, server)
return c, nil
}
var gql_subscription_update * graphql.Field = nil
func GQLSubscriptionUpdate() * graphql.Field {
if gql_subscription_update == nil {
gql_subscription_update = &graphql.Field{
Type: GQLTypeSignal(),
Resolve: func(p graphql.ResolveParams) (interface{}, error) {
return p.Source, nil
},
Subscribe: GQLSubscribeSignal,
}
}
return gql_subscription_update
}
var gql_mutation_send_update *graphql.Field = nil
func GQLMutationSendUpdate() *graphql.Field {
if gql_mutation_send_update == nil {
gql_mutation_send_update = &graphql.Field{
Type: GQLTypeSignal(),
Args: graphql.FieldConfigArgument{
"id": &graphql.ArgumentConfig{
Type: graphql.String,
},
"signal": &graphql.ArgumentConfig{
Type: GQLTypeSignalInput(),
},
},
Resolve: func(p graphql.ResolveParams) (interface{}, error) {
server, ok := p.Context.Value("gql_server").(*GQLThread)
if ok == false {
return nil, fmt.Errorf("Failed to cast context gql_server to GQLServer: %+v", p.Context.Value("gql_server"))
}
ctx, ok := p.Context.Value("graph_context").(*GraphContext)
if ok == false {
return nil, fmt.Errorf("Failed to cast context graph_context to GraphContext: %+v", p.Context.Value("graph_context"))
}
signal_map, ok := p.Args["signal"].(map[string]interface{})
if ok == false {
return nil, fmt.Errorf("Failed to cast arg signal to GraphSignal: %+v", p.Args["signal"])
}
var signal GraphSignal = nil
if signal_map["Direction"] == Up {
signal = NewSignal(server, signal_map["Type"].(string))
} else if signal_map["Direction"] == Down {
signal = NewDownSignal(server, signal_map["Type"].(string))
} else if signal_map["Direction"] == Direct {
signal = NewDirectSignal(server, signal_map["Type"].(string))
} else {
return nil, fmt.Errorf("Bad direction: %d", signal_map["Direction"])
}
id , ok := p.Args["id"].(NodeID)
if ok == false {
return nil, fmt.Errorf("Failed to cast arg id to string")
}
node_if, err := UseStates(ctx, []GraphNode{server}, func(states []NodeState) (interface{}, error){
server_state := states[0].(*GQLThreadState)
node := FindChild(ctx, server, server_state, id)
if node == nil {
return nil, fmt.Errorf("Failed to find ID: %s as child of server thread", id)
}
return node, nil
})
if err != nil {
return nil, err
}
node, ok := node_if.(GraphNode)
if ok == false {
return nil, fmt.Errorf("Failed to cast found node to GraphNode")
}
SendUpdate(ctx, node, signal)
return signal, nil
},
}
}
return gql_mutation_send_update
}
var gql_query_self *graphql.Field = nil
func GQLQuerySelf() *graphql.Field {
if gql_query_self == nil {
gql_query_self = &graphql.Field{
Type: GQLTypeGQLThread(),
Resolve: func(p graphql.ResolveParams) (interface{}, error) {
server, ok := p.Context.Value("gql_server").(*GQLThread)
if ok == false {
return nil, fmt.Errorf("failed to cast gql_server to GQLThread")
}
return server, nil
},
}
}
return gql_query_self
}

@ -0,0 +1,20 @@
package graphvent
import (
"testing"
)
func TestGQLThread(t * testing.T) {
ctx := testContext(t)
gql_thread, err := NewGQLThread(ctx, ":8080", []Lockable{}, ObjTypeMap{}, FieldMap{}, FieldMap{}, FieldMap{})
fatalErr(t, err)
test_thread, err := NewSimpleBaseThread(ctx, "Test thread 1", []Lockable{}, ThreadActions{}, ThreadHandlers{})
fatalErr(t, err)
err = LinkThreads(ctx, gql_thread, test_thread, nil)
fatalErr(t, err)
err = RunThread(ctx, gql_thread)
fatalErr(t, err)
}

@ -174,7 +174,7 @@ func CancelSignal(source GraphNode) BaseSignal {
}
type NodeState interface {
Name() string
}
// GraphNode is the interface common to both DAG nodes and Event tree nodes

@ -14,6 +14,7 @@ import (
//
// RecordLockHolder records that lockable_id needs to be passed back to lock_holder
type LockHolderState interface {
NodeState
ReturnLock(lockable_id NodeID) GraphNode
AllowedToTakeLock(node_id NodeID, lockable_id NodeID) bool
RecordLockHolder(lockable_id NodeID, lock_holder GraphNode)
@ -22,7 +23,6 @@ type LockHolderState interface {
// LockableState is the interface that a lockables state must have to allow it to connect to the DAG
type LockableState interface {
LockHolderState
Name() string
Requirements() []Lockable
AddRequirement(requirement Lockable)
Dependencies() []Lockable

@ -370,7 +370,7 @@ func NewBaseThreadState(name string) BaseThreadState {
}
}
func NewBaseThread(ctx * GraphContext, name string, actions ThreadActions, handlers ThreadHandlers, state ThreadState) (BaseThread, error) {
func NewBaseThread(ctx * GraphContext, actions ThreadActions, handlers ThreadHandlers, state ThreadState) (BaseThread, error) {
thread := BaseThread{
BaseLockable: BaseLockable{BaseNode: NewNode(ctx, RandID(), state)},
Actions: ThreadActions{
@ -398,7 +398,7 @@ func NewBaseThread(ctx * GraphContext, name string, actions ThreadActions, handl
func NewSimpleBaseThread(ctx * GraphContext, name string, requirements []Lockable, actions ThreadActions, handlers ThreadHandlers) (* BaseThread, error) {
state := NewBaseThreadState(name)
thread, err := NewBaseThread(ctx, name, actions, handlers, &state)
thread, err := NewBaseThread(ctx, actions, handlers, &state)
if err != nil {
return nil, err
}