From 76cea43d9b612c141fa40d7e4780c8490a971814 Mon Sep 17 00:00:00 2001 From: Noah Metz Date: Fri, 16 Jun 2023 17:20:16 -0600 Subject: [PATCH] Made subscription a bit more robust --- gql.go | 84 ++++++++++++++++++------------- graph.go | 6 +-- test-site/src/routes/+page.svelte | 2 +- 3 files changed, 52 insertions(+), 40 deletions(-) diff --git a/gql.go b/gql.go index 1572b45..654a300 100644 --- a/gql.go +++ b/gql.go @@ -280,7 +280,7 @@ func GQLWSDo(p graphql.Params) chan *graphql.Result { func GQLWSHandler(schema graphql.Schema, ctx context.Context) func(http.ResponseWriter, *http.Request) { return func(w http.ResponseWriter, r * http.Request) { - log.Logf("gqlws", "HANDLING %s",r.RemoteAddr) + log.Logf("gqlws_new", "HANDLING %s",r.RemoteAddr) header_map := map[string]interface{}{} for header, value := range(r.Header) { header_map[header] = value @@ -863,6 +863,52 @@ func (server * GQLServer) update(signal GraphSignal) { server.BaseResource.update(signal) } +func GQLSubscribeSignal(p graphql.ResolveParams) (interface{}, error) { + return GQLSubscribeFn(p, func(signal GraphSignal, p graphql.ResolveParams)(interface{}, error) { + return signal, nil + }) +} + +func GQLSubscribeFn(p graphql.ResolveParams, fn func(GraphSignal, graphql.ResolveParams)(interface{}, error))(interface{}, error) { + server, ok := p.Context.Value("gql_server").(*GQLServer) + if ok == false { + return nil, fmt.Errorf("Failed to get gql_Server from context and cast to GQLServer") + } + + c := make(chan interface{}) + go func(c chan interface{}, server *GQLServer) { + sig_c := server.UpdateChannel() + for { + val, ok := <- sig_c + if ok == false { + return + } + ret, err := fn(val, p) + if err != nil { + log.Logf("gqlws", "type convertor error %s", err) + return + } + c <- ret + } + }(c, server) + return c, nil +} + +var gql_subscription_update * graphql.Field = nil +func GQLSubscriptionUpdate() * graphql.Field { + if gql_subscription_update == nil { + gql_subscription_update = &graphql.Field{ + Type: GQLTypeSignal(), + Resolve: func(p graphql.ResolveParams) (interface{}, error) { + return p.Source, nil + }, + Subscribe: GQLSubscribeSignal, + } + } + + return gql_subscription_update +} + func MakeGQLHandlers(server * GQLServer) (func(http.ResponseWriter, *http.Request), func(http.ResponseWriter, *http.Request)) { valid_events := map[reflect.Type]*graphql.Object{} valid_events[reflect.TypeOf((*BaseEvent)(nil))] = GQLTypeBaseEvent() @@ -886,41 +932,7 @@ func MakeGQLHandlers(server * GQLServer) (func(http.ResponseWriter, *http.Reques } gql_subscriptions := graphql.Fields{ - "Test": &graphql.Field{ - Type: GQLTypeSignal(), - Resolve: func(p graphql.ResolveParams) (interface{}, error) { - return p.Source, nil - }, - Subscribe: func(p graphql.ResolveParams) (interface{}, error) { - /*c := make(chan interface{}) - go func() { - elements := []string{"a", "b", "c"} - for _, r := range elements { - select { - case <-p.Context.Done(): - close(c) - return - case c <- r: - } - } - close(c) - }() - return c, nil*/ - server, ok := p.Context.Value("gql_server").(*GQLServer) - if ok == false { - return nil, fmt.Errorf("Failed to get gql_server from context and cast") - } - c := make(chan interface{}) - go func(c chan interface{}) { - sig_c := server.UpdateChannel() - for { - val, _ := <- sig_c - c <- val - } - }(c) - return c, nil - }, - }, + "Update": GQLSubscriptionUpdate(), } for key, value := range(server.extended_subscriptions) { diff --git a/graph.go b/graph.go index 67cdc98..903a776 100644 --- a/graph.go +++ b/graph.go @@ -23,7 +23,7 @@ type DefaultLogger struct { } var log DefaultLogger = DefaultLogger{loggers: map[string]zerolog.Logger{}} -var all_components = []string{"update", "graph", "event", "resource", "manager", "test", "gql", "vex", "gqlws", "listeners"} +var all_components = []string{"update", "graph", "event", "resource", "manager", "test", "gql", "vex", "gqlws", "gqlws_new", "listeners"} func (logger * DefaultLogger) Init(components []string) error { logger.init_lock.Lock() @@ -59,7 +59,7 @@ func (logger * DefaultLogger) Init(components []string) error { } func (logger * DefaultLogger) Logm(component string, fields map[string]interface{}, format string, items ... interface{}) { - logger.Init([]string{"gqlws"}) + logger.Init([]string{"gqlws", "gqlws_new"}) l, exists := logger.loggers[component] if exists == true { log := l.Log() @@ -71,7 +71,7 @@ func (logger * DefaultLogger) Logm(component string, fields map[string]interface } func (logger * DefaultLogger) Logf(component string, format string, items ... interface{}) { - logger.Init([]string{"gqlws"}) + logger.Init([]string{"gqlws", "gqlws_new"}) l, exists := logger.loggers[component] if exists == true { l.Log().Msg(fmt.Sprintf(format, items...)) diff --git a/test-site/src/routes/+page.svelte b/test-site/src/routes/+page.svelte index 6e1f420..1dfc8cc 100644 --- a/test-site/src/routes/+page.svelte +++ b/test-site/src/routes/+page.svelte @@ -16,7 +16,7 @@ var game_id = null console.log("STARTING_CLIENT") client.subscribe({ operationName: "Sub", - query: "query GetArenas { Arenas { Name Owner { ... on Match { Name, ID } } } } subscription Sub { Test { String } }", + query: "query GetArenas { Arenas { Name Owner { ... on Match { Name, ID } } } } subscription Sub { Update { String } }", }, { next: (data) => {