Made subscription a bit more robust

graph-rework
noah metz 2023-06-16 17:20:16 -06:00
parent bfcf06b190
commit 76cea43d9b
3 changed files with 52 additions and 40 deletions

@ -280,7 +280,7 @@ func GQLWSDo(p graphql.Params) chan *graphql.Result {
func GQLWSHandler(schema graphql.Schema, ctx context.Context) func(http.ResponseWriter, *http.Request) { func GQLWSHandler(schema graphql.Schema, ctx context.Context) func(http.ResponseWriter, *http.Request) {
return func(w http.ResponseWriter, r * http.Request) { return func(w http.ResponseWriter, r * http.Request) {
log.Logf("gqlws", "HANDLING %s",r.RemoteAddr) log.Logf("gqlws_new", "HANDLING %s",r.RemoteAddr)
header_map := map[string]interface{}{} header_map := map[string]interface{}{}
for header, value := range(r.Header) { for header, value := range(r.Header) {
header_map[header] = value header_map[header] = value
@ -863,6 +863,52 @@ func (server * GQLServer) update(signal GraphSignal) {
server.BaseResource.update(signal) server.BaseResource.update(signal)
} }
func GQLSubscribeSignal(p graphql.ResolveParams) (interface{}, error) {
return GQLSubscribeFn(p, func(signal GraphSignal, p graphql.ResolveParams)(interface{}, error) {
return signal, nil
})
}
func GQLSubscribeFn(p graphql.ResolveParams, fn func(GraphSignal, graphql.ResolveParams)(interface{}, error))(interface{}, error) {
server, ok := p.Context.Value("gql_server").(*GQLServer)
if ok == false {
return nil, fmt.Errorf("Failed to get gql_Server from context and cast to GQLServer")
}
c := make(chan interface{})
go func(c chan interface{}, server *GQLServer) {
sig_c := server.UpdateChannel()
for {
val, ok := <- sig_c
if ok == false {
return
}
ret, err := fn(val, p)
if err != nil {
log.Logf("gqlws", "type convertor error %s", err)
return
}
c <- ret
}
}(c, server)
return c, nil
}
var gql_subscription_update * graphql.Field = nil
func GQLSubscriptionUpdate() * graphql.Field {
if gql_subscription_update == nil {
gql_subscription_update = &graphql.Field{
Type: GQLTypeSignal(),
Resolve: func(p graphql.ResolveParams) (interface{}, error) {
return p.Source, nil
},
Subscribe: GQLSubscribeSignal,
}
}
return gql_subscription_update
}
func MakeGQLHandlers(server * GQLServer) (func(http.ResponseWriter, *http.Request), func(http.ResponseWriter, *http.Request)) { func MakeGQLHandlers(server * GQLServer) (func(http.ResponseWriter, *http.Request), func(http.ResponseWriter, *http.Request)) {
valid_events := map[reflect.Type]*graphql.Object{} valid_events := map[reflect.Type]*graphql.Object{}
valid_events[reflect.TypeOf((*BaseEvent)(nil))] = GQLTypeBaseEvent() valid_events[reflect.TypeOf((*BaseEvent)(nil))] = GQLTypeBaseEvent()
@ -886,41 +932,7 @@ func MakeGQLHandlers(server * GQLServer) (func(http.ResponseWriter, *http.Reques
} }
gql_subscriptions := graphql.Fields{ gql_subscriptions := graphql.Fields{
"Test": &graphql.Field{ "Update": GQLSubscriptionUpdate(),
Type: GQLTypeSignal(),
Resolve: func(p graphql.ResolveParams) (interface{}, error) {
return p.Source, nil
},
Subscribe: func(p graphql.ResolveParams) (interface{}, error) {
/*c := make(chan interface{})
go func() {
elements := []string{"a", "b", "c"}
for _, r := range elements {
select {
case <-p.Context.Done():
close(c)
return
case c <- r:
}
}
close(c)
}()
return c, nil*/
server, ok := p.Context.Value("gql_server").(*GQLServer)
if ok == false {
return nil, fmt.Errorf("Failed to get gql_server from context and cast")
}
c := make(chan interface{})
go func(c chan interface{}) {
sig_c := server.UpdateChannel()
for {
val, _ := <- sig_c
c <- val
}
}(c)
return c, nil
},
},
} }
for key, value := range(server.extended_subscriptions) { for key, value := range(server.extended_subscriptions) {

@ -23,7 +23,7 @@ type DefaultLogger struct {
} }
var log DefaultLogger = DefaultLogger{loggers: map[string]zerolog.Logger{}} var log DefaultLogger = DefaultLogger{loggers: map[string]zerolog.Logger{}}
var all_components = []string{"update", "graph", "event", "resource", "manager", "test", "gql", "vex", "gqlws", "listeners"} var all_components = []string{"update", "graph", "event", "resource", "manager", "test", "gql", "vex", "gqlws", "gqlws_new", "listeners"}
func (logger * DefaultLogger) Init(components []string) error { func (logger * DefaultLogger) Init(components []string) error {
logger.init_lock.Lock() logger.init_lock.Lock()
@ -59,7 +59,7 @@ func (logger * DefaultLogger) Init(components []string) error {
} }
func (logger * DefaultLogger) Logm(component string, fields map[string]interface{}, format string, items ... interface{}) { func (logger * DefaultLogger) Logm(component string, fields map[string]interface{}, format string, items ... interface{}) {
logger.Init([]string{"gqlws"}) logger.Init([]string{"gqlws", "gqlws_new"})
l, exists := logger.loggers[component] l, exists := logger.loggers[component]
if exists == true { if exists == true {
log := l.Log() log := l.Log()
@ -71,7 +71,7 @@ func (logger * DefaultLogger) Logm(component string, fields map[string]interface
} }
func (logger * DefaultLogger) Logf(component string, format string, items ... interface{}) { func (logger * DefaultLogger) Logf(component string, format string, items ... interface{}) {
logger.Init([]string{"gqlws"}) logger.Init([]string{"gqlws", "gqlws_new"})
l, exists := logger.loggers[component] l, exists := logger.loggers[component]
if exists == true { if exists == true {
l.Log().Msg(fmt.Sprintf(format, items...)) l.Log().Msg(fmt.Sprintf(format, items...))

@ -16,7 +16,7 @@ var game_id = null
console.log("STARTING_CLIENT") console.log("STARTING_CLIENT")
client.subscribe({ client.subscribe({
operationName: "Sub", operationName: "Sub",
query: "query GetArenas { Arenas { Name Owner { ... on Match { Name, ID } } } } subscription Sub { Test { String } }", query: "query GetArenas { Arenas { Name Owner { ... on Match { Name, ID } } } } subscription Sub { Update { String } }",
}, },
{ {
next: (data) => { next: (data) => {