Fixed internal signalling for threads, and update signal type in gql

graph-rework-2
noah metz 2023-06-25 21:00:00 -06:00
parent 7b84c8bc54
commit bb3c80dbc7
4 changed files with 28 additions and 18 deletions

@ -202,6 +202,7 @@ func GQLWSDo(ctx * GraphContext, p graphql.Params) chan *graphql.Result {
func GQLWSHandler(ctx * GraphContext, schema graphql.Schema, gql_ctx context.Context) func(http.ResponseWriter, *http.Request) { func GQLWSHandler(ctx * GraphContext, schema graphql.Schema, gql_ctx context.Context) func(http.ResponseWriter, *http.Request) {
return func(w http.ResponseWriter, r * http.Request) { return func(w http.ResponseWriter, r * http.Request) {
ctx.Log.Logf("gqlws_new", "HANDLING %s",r.RemoteAddr) ctx.Log.Logf("gqlws_new", "HANDLING %s",r.RemoteAddr)
enableCORS(&w)
header_map := map[string]interface{}{} header_map := map[string]interface{}{}
for header, value := range(r.Header) { for header, value := range(r.Header) {
header_map[header] = value header_map[header] = value
@ -210,7 +211,10 @@ func GQLWSHandler(ctx * GraphContext, schema graphql.Schema, gql_ctx context.Con
u := ws.HTTPUpgrader{ u := ws.HTTPUpgrader{
Protocol: func(protocol string) bool { Protocol: func(protocol string) bool {
ctx.Log.Logf("gqlws", "UPGRADE_PROTOCOL: %s", string(protocol)) ctx.Log.Logf("gqlws", "UPGRADE_PROTOCOL: %s", string(protocol))
return string(protocol) == "graphql-transport-ws" if string(protocol) == "graphql-transport-ws" || string(protocol) == "graphql-ws" {
return true
}
return false
}, },
} }
conn, _, _, err := u.Upgrade(r, w) conn, _, _, err := u.Upgrade(r, w)

@ -4,7 +4,6 @@ import (
"github.com/graphql-go/graphql" "github.com/graphql-go/graphql"
"reflect" "reflect"
"fmt" "fmt"
"time"
) )
var gql_interface_graph_node *graphql.Interface = nil var gql_interface_graph_node *graphql.Interface = nil
@ -606,7 +605,15 @@ func GQLSignalSource(p graphql.ResolveParams) (interface{}, error) {
func GQLSignalDirection(p graphql.ResolveParams) (interface{}, error) { func GQLSignalDirection(p graphql.ResolveParams) (interface{}, error) {
return GQLSignalFn(p, func(signal GraphSignal, p graphql.ResolveParams)(interface{}, error){ return GQLSignalFn(p, func(signal GraphSignal, p graphql.ResolveParams)(interface{}, error){
return signal.Direction(), nil direction := signal.Direction()
if direction == Up {
return "up", nil
} else if direction == Down {
return "down", nil
} else if direction == Direct {
return "direct", nil
}
return nil, fmt.Errorf("Invalid direction: %+v", direction)
}) })
} }
@ -638,7 +645,7 @@ func GQLTypeSignal() *graphql.Object {
Resolve: GQLSignalSource, Resolve: GQLSignalSource,
}) })
gql_type_signal.AddFieldConfig("Direction", &graphql.Field{ gql_type_signal.AddFieldConfig("Direction", &graphql.Field{
Type: graphql.Boolean, Type: graphql.String,
Resolve: GQLSignalDirection, Resolve: GQLSignalDirection,
}) })
gql_type_signal.AddFieldConfig("String", &graphql.Field{ gql_type_signal.AddFieldConfig("String", &graphql.Field{
@ -658,14 +665,11 @@ func GQLTypeSignalInput() *graphql.InputObject {
}) })
gql_type_signal_input.AddFieldConfig("Type", &graphql.InputObjectFieldConfig{ gql_type_signal_input.AddFieldConfig("Type", &graphql.InputObjectFieldConfig{
Type: graphql.String, Type: graphql.String,
DefaultValue: "cancel",
}) })
gql_type_signal_input.AddFieldConfig("Description", &graphql.InputObjectFieldConfig{ gql_type_signal_input.AddFieldConfig("Direction", &graphql.InputObjectFieldConfig{
Type: graphql.String, Type: graphql.String,
DefaultValue: "", DefaultValue: "down",
})
gql_type_signal_input.AddFieldConfig("Time", &graphql.InputObjectFieldConfig{
Type: graphql.DateTime,
DefaultValue: time.Now(),
}) })
} }
return gql_type_signal_input return gql_type_signal_input
@ -751,24 +755,24 @@ func GQLMutationSendUpdate() *graphql.Field {
return nil, fmt.Errorf("Failed to cast arg signal to GraphSignal: %+v", p.Args["signal"]) return nil, fmt.Errorf("Failed to cast arg signal to GraphSignal: %+v", p.Args["signal"])
} }
var signal GraphSignal = nil var signal GraphSignal = nil
if signal_map["Direction"] == Up { if signal_map["Direction"] == "up" {
signal = NewSignal(server, signal_map["Type"].(string)) signal = NewSignal(server, signal_map["Type"].(string))
} else if signal_map["Direction"] == Down { } else if signal_map["Direction"] == "down" {
signal = NewDownSignal(server, signal_map["Type"].(string)) signal = NewDownSignal(server, signal_map["Type"].(string))
} else if signal_map["Direction"] == Direct { } else if signal_map["Direction"] == "direct" {
signal = NewDirectSignal(server, signal_map["Type"].(string)) signal = NewDirectSignal(server, signal_map["Type"].(string))
} else { } else {
return nil, fmt.Errorf("Bad direction: %d", signal_map["Direction"]) return nil, fmt.Errorf("Bad direction: %d", signal_map["Direction"])
} }
id , ok := p.Args["id"].(NodeID) id , ok := p.Args["id"].(string)
if ok == false { if ok == false {
return nil, fmt.Errorf("Failed to cast arg id to string") return nil, fmt.Errorf("Failed to cast arg id to string")
} }
node_if, err := UseStates(ctx, []GraphNode{server}, func(states []NodeState) (interface{}, error){ node_if, err := UseStates(ctx, []GraphNode{server}, func(states []NodeState) (interface{}, error){
server_state := states[0].(*GQLThreadState) server_state := states[0].(*GQLThreadState)
node := FindChild(ctx, server, server_state, id) node := FindChild(ctx, server, server_state, NodeID(id))
if node == nil { if node == nil {
return nil, fmt.Errorf("Failed to find ID: %s as child of server thread", id) return nil, fmt.Errorf("Failed to find ID: %s as child of server thread", id)
} }

@ -5,7 +5,7 @@ import (
) )
func TestGQLThread(t * testing.T) { func TestGQLThread(t * testing.T) {
ctx := testContext(t) ctx := logTestContext(t, []string{"gqlws", "gql", "thread", "update"})
gql_thread, err := NewGQLThread(ctx, ":8080", []Lockable{}, ObjTypeMap{}, FieldMap{}, FieldMap{}, FieldMap{}) gql_thread, err := NewGQLThread(ctx, ":8080", []Lockable{}, ObjTypeMap{}, FieldMap{}, FieldMap{}, FieldMap{})
fatalErr(t, err) fatalErr(t, err)

@ -335,15 +335,17 @@ var ThreadWait = func(ctx * GraphContext, thread Thread) (string, error) {
for { for {
select { select {
case signal := <- thread.SignalChannel(): case signal := <- thread.SignalChannel():
ctx.Log.Logf("thread", "THREAD_SIGNAL: %s %+v", thread.ID(), signal)
if signal.Source() == thread.ID() { if signal.Source() == thread.ID() {
ctx.Log.Logf("thread", "THREAD_SIGNAL_INTERNAL") ctx.Log.Logf("thread", "THREAD_SIGNAL_INTERNAL")
continue } else {
ctx.Log.Logf("thread", "THREAD_SIGNAL: %s %+v", thread.ID(), signal)
} }
signal_fn, exists := thread.Handler(signal.Type()) signal_fn, exists := thread.Handler(signal.Type())
if exists == true { if exists == true {
ctx.Log.Logf("thread", "THREAD_HANDLER: %s - %s", thread.ID(), signal.Type()) ctx.Log.Logf("thread", "THREAD_HANDLER: %s - %s", thread.ID(), signal.Type())
return signal_fn(ctx, thread, signal) return signal_fn(ctx, thread, signal)
} else {
ctx.Log.Logf("thread", "THREAD_NOHANDLER: %s - %s", thread.ID(), signal.Type())
} }
case <- thread.Timeout(): case <- thread.Timeout():
ctx.Log.Logf("thread", "THREAD_TIMEOUT %s - NEXT_STATE: %s", thread.ID(), thread.TimeoutAction()) ctx.Log.Logf("thread", "THREAD_TIMEOUT %s - NEXT_STATE: %s", thread.ID(), thread.TimeoutAction())