From 9eb4d7f69949ea40a52ed37848e297ecb44753df Mon Sep 17 00:00:00 2001 From: Noah Metz Date: Fri, 16 Jun 2023 01:46:15 -0600 Subject: [PATCH] Added basic subscription(never updates data, only pings. Having issue with test-site since it continuously opens websockets without reporting an issue. --- go.mod | 5 +- go.sum | 8 +++ gql.go | 152 +++++++++++++++++++++++++++++++++++++++++++---------- gql_vex.go | 68 ++++++++++++++++++++++++ graph.go | 6 +-- vex.go | 7 ++- 6 files changed, 211 insertions(+), 35 deletions(-) diff --git a/go.mod b/go.mod index b1f2c43..37574a4 100644 --- a/go.mod +++ b/go.mod @@ -3,11 +3,14 @@ module git.metznet.ca/MetzNet/graphvent go 1.20 require ( + github.com/gobwas/httphead v0.1.0 // indirect + github.com/gobwas/pool v0.2.1 // indirect + github.com/gobwas/ws v1.2.1 // indirect github.com/google/uuid v1.3.0 // indirect github.com/graphql-go/graphql v0.8.1 // indirect github.com/graphql-go/handler v0.2.3 // indirect github.com/mattn/go-colorable v0.1.12 // indirect github.com/mattn/go-isatty v0.0.14 // indirect github.com/rs/zerolog v1.29.1 // indirect - golang.org/x/sys v0.0.0-20210927094055-39ccf1dd6fa6 // indirect + golang.org/x/sys v0.6.0 // indirect ) diff --git a/go.sum b/go.sum index c5fb224..a6aa4b7 100644 --- a/go.sum +++ b/go.sum @@ -3,6 +3,12 @@ github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSs github.com/go-logr/logr v1.2.2/go.mod h1:jdQByPbusPIv2/zmleS9BjJVeZ6kBagPoEUsqbVz/1A= github.com/go-logr/logr v1.2.3/go.mod h1:jdQByPbusPIv2/zmleS9BjJVeZ6kBagPoEUsqbVz/1A= github.com/go-logr/stdr v1.2.2/go.mod h1:mMo/vtBO5dYbehREoey6XUKy/eSumjCCveDpRre4VKE= +github.com/gobwas/httphead v0.1.0 h1:exrUm0f4YX0L7EBwZHuCF4GDp8aJfVeBrlLQrs6NqWU= +github.com/gobwas/httphead v0.1.0/go.mod h1:O/RXo79gxV8G+RqlR/otEwx4Q36zl9rqC5u12GKvMCM= +github.com/gobwas/pool v0.2.1 h1:xfeeEhW7pwmX8nuLVlqbzVc7udMDrwetjEv+TZIz1og= +github.com/gobwas/pool v0.2.1/go.mod h1:q8bcK0KcYlCgd9e7WYLm9LpyS+YeLd8JVDW6WezmKEw= +github.com/gobwas/ws v1.2.1 h1:F2aeBZrm2NDsc7vbovKrWSogd4wvfAxg0FQ89/iqOTk= +github.com/gobwas/ws v1.2.1/go.mod h1:hRKAFb8wOxFROYNsT1bqfWnhX+b5MFeJM9r2ZSwg/KY= github.com/godbus/dbus/v5 v5.0.4/go.mod h1:xhWf0FNVPg57R7Z0UbKHbJfkEywrmjJnf7w5xrFpKfA= github.com/google/go-cmp v0.5.7/go.mod h1:n+brtR0CgQNWTVd5ZUFpTBC8YFBDLK/h/bpaJ8/DtOE= github.com/google/uuid v1.3.0 h1:t6JiXgmwXMjEs8VusXIJk2BXHsn+wx8BZdTaoZ5fu7I= @@ -33,6 +39,8 @@ go.opentelemetry.io/otel/trace v1.6.3/go.mod h1:GNJQusJlUgZl9/TQBPKU/Y/ty+0iVB5f golang.org/x/sys v0.0.0-20210630005230-0f9fa26af87c/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.0.0-20210927094055-39ccf1dd6fa6 h1:foEbQz/B0Oz6YIqu/69kfXPYeFQAuuMYFkjaqXzl5Wo= golang.org/x/sys v0.0.0-20210927094055-39ccf1dd6fa6/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= +golang.org/x/sys v0.6.0 h1:MVltZSvRTcU2ljQOhs94SXPftV6DCNnZViHeQps87pQ= +golang.org/x/sys v0.6.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= diff --git a/gql.go b/gql.go index 38f281e..d519333 100644 --- a/gql.go +++ b/gql.go @@ -11,6 +11,8 @@ import ( "fmt" "sync" "time" + "github.com/gobwas/ws" + "github.com/gobwas/ws/wsutil" ) func GraphiQLHandler() func(http.ResponseWriter, *http.Request) { @@ -176,40 +178,136 @@ func GQLInterfaceNode() *graphql.Interface { return gql_interface_node } -type GQLQuery struct { - Query string `json:"query"` - OperationName string `json:"operationName"` - Variables map[string]interface{} `json:"variables"` +type GQLWSPayload struct { + OperationName string `json:"operationName,omitempty"` + Query string `json:"query,omitempty"` + Variables map[string]interface{} `json:"variables,omitempty"` + Extensions map[string]interface{} `json:"extensions,omitempty"` + Data string `json:"data,omitempty"` +} + +type GQLWSMsg struct { + ID string `json:"id,omitempty"` + Type string `json:"type"` + Payload GQLWSPayload `json:"payload,omitempty"` } func GQLHandler(schema graphql.Schema, ctx context.Context) func(http.ResponseWriter, *http.Request) { return func(w http.ResponseWriter, r * http.Request) { - str, err := io.ReadAll(r.Body) - if err != nil { - log.Logf("gql", "failed to read request body: %s", err) - return - } - res := GQLQuery{} - json.Unmarshal(str, &res) - params := graphql.Params{ - Schema: schema, - Context: ctx, - RequestString: res.Query, + header_map := map[string]interface{}{} + for header, value := range(r.Header) { + header_map[header] = value } - if res.OperationName != "" { - params.OperationName = res.OperationName - } - if len(res.Variables) > 0 { - params.VariableValues = res.Variables + log.Logm("gql", header_map, "REQUEST_HEADERS") + u := ws.HTTPUpgrader{ + Protocol: func(protocol string) bool { + log.Logf("gqlws", "UPGRADE_PROTOCOL: %s", string(protocol)) + return string(protocol) == "graphql-transport-ws" + }, } - result := graphql.Do(params) - if len(result.Errors) > 0 { - extra_fields := map[string]interface{}{} - extra_fields["body"] = string(str) - extra_fields["headers"] = r.Header - log.Logm("gql", extra_fields, "wrong result, unexpected errors: %v", result.Errors) + conn, _, _, err := u.Upgrade(r, w) + if err == nil { + defer conn.Close() + conn_state := "init" + for { + msg_raw, op, err := wsutil.ReadClientData(conn) + log.Logf("gqlws", "MSG: %s\nOP: 0x%02x\nERR: %+v\n", string(msg_raw), op, err) + msg := GQLWSMsg{} + json.Unmarshal(msg_raw, &msg) + if err != nil { + log.Logf("gqlws", "WS_CLIENT_ERROR") + break + } + if msg.Type == "connection_init" { + if conn_state != "init" { + log.Logf("gqlws", "WS_CLIENT_ERROR: INIT WHILE IN %s", conn_state) + break + } + conn_state = "ready" + err = wsutil.WriteServerMessage(conn, 1, []byte("{\"type\": \"connection_ack\"}")) + if err != nil { + log.Logf("gqlws", "WS_SERVER_ERROR: FAILED TO SEND connection_ack") + break + } + } else if msg.Type == "ping" { + err = wsutil.WriteServerMessage(conn, 1, []byte("{\"type\": \"pong\"}")) + if err != nil { + log.Logf("gqlws", "WS_SERVER_ERROR: FAILED TO SEND PONG") + } + } else if msg.Type == "subscribe" { + log.Logf("gqlws", "SUBSCRIBE: %+v", msg.Payload) + params := graphql.Params{ + Schema: schema, + Context: ctx, + RequestString: msg.Payload.Query, + } + if msg.Payload.OperationName != "" { + params.OperationName = msg.Payload.OperationName + } + if len(msg.Payload.Variables) > 0 { + params.VariableValues = msg.Payload.Variables + } + result := graphql.Do(params) + + if len(result.Errors) > 0 { + extra_fields := map[string]interface{}{} + extra_fields["query"] = string(msg.Payload.Query) + log.Logm("gql", extra_fields, "ERROR: wrong result, unexpected errors: %v", result.Errors) + break + } + + + log.Logf("gqlws", "DATA: %+v", result.Data) + data, err := json.Marshal(result.Data) + msg, err := json.Marshal(GQLWSMsg{ + ID: msg.ID, + Type: "next", + Payload: GQLWSPayload{ + Data: string(data), + }, + }) + if err != nil { + log.Logf("gqlws", "ERROR: %+v", err) + break + } + log.Logf("gqlws", "WRITING_GQLWS: %s", msg) + err = wsutil.WriteServerMessage(conn, 1, msg) + if err != nil { + log.Logf("gqlws", "ERROR: %+v", err) + break + } + } + } + return + } else { + str, err := io.ReadAll(r.Body) + if err != nil { + log.Logf("gql", "failed to read request body: %s", err) + return + } + query := GQLWSPayload{} + json.Unmarshal(str, &query) + + params := graphql.Params{ + Schema: schema, + Context: ctx, + RequestString: query.Query, + } + if query.OperationName != "" { + params.OperationName = query.OperationName + } + if len(query.Variables) > 0 { + params.VariableValues = query.Variables + } + result := graphql.Do(params) + if len(result.Errors) > 0 { + extra_fields := map[string]interface{}{} + extra_fields["body"] = string(str) + extra_fields["headers"] = r.Header + log.Logm("gql", extra_fields, "wrong result, unexpected errors: %v", result.Errors) + } + json.NewEncoder(w).Encode(result) } - json.NewEncoder(w).Encode(result) } } diff --git a/gql_vex.go b/gql_vex.go index f316f59..828c308 100644 --- a/gql_vex.go +++ b/gql_vex.go @@ -4,6 +4,8 @@ import ( "github.com/graphql-go/graphql" "reflect" "fmt" + "errors" + "time" ) func GQLVexTypes() map[reflect.Type]*graphql.Object { @@ -15,6 +17,7 @@ func GQLVexTypes() map[reflect.Type]*graphql.Object { func GQLVexMutations() map[string]*graphql.Field { mutations := map[string]*graphql.Field{} + mutations["setMatchState"] = GQLVexMutationSetMatchState() return mutations } @@ -48,6 +51,71 @@ func FindResources(event Event, resource_type reflect.Type) []Resource { return ret } +var gql_vex_mutation_set_match_state *graphql.Field= nil +func GQLVexMutationSetMatchState() *graphql.Field { + if gql_vex_mutation_set_match_state == nil { + gql_vex_mutation_set_match_state = &graphql.Field{ + Type: GQLTypeSignal(), + Args: graphql.FieldConfigArgument{ + "id": &graphql.ArgumentConfig{ + Type: graphql.String, + }, + "state": &graphql.ArgumentConfig{ + Type: graphql.String, + }, + "time": &graphql.ArgumentConfig{ + Type: graphql.DateTime, + }, + }, + Resolve: func(p graphql.ResolveParams) (interface{}, error) { + server, ok := p.Context.Value("gql_server").(*GQLServer) + if ok == false { + return nil, fmt.Errorf("Failed to cast context gql_server to GQLServer: %+v", p.Context.Value("gql_server")) + } + + id, ok := p.Args["id"].(string) + if ok == false { + return nil, errors.New("Failed to cast arg id to string") + } + + state, ok := p.Args["state"].(string) + if ok == false { + return nil, errors.New("Failed to cast arg state to string") + } + + start, ok := p.Args["time"].(time.Time) + if ok == false { + start = time.Now() + } + + signal := NewSignal(server, state) + signal.description = id + signal.time = start + + owner := server.Owner() + if owner == nil { + return nil, errors.New("Cannot send update without owner") + } + + root_event, ok := owner.(Event) + if ok == false { + return nil, errors.New("Cannot send update to Event unless owned by an Event") + } + + node := FindChild(root_event, id) + if node == nil { + return nil, errors.New("Failed to find id in event tree from server") + } + + SendUpdate(node, signal) + + return signal, nil + }, + } + } + return gql_vex_mutation_set_match_state +} + var gql_vex_query_arenas *graphql.Field = nil func GQLVexQueryArenas() *graphql.Field { if gql_vex_query_arenas == nil { diff --git a/graph.go b/graph.go index f544084..e023610 100644 --- a/graph.go +++ b/graph.go @@ -23,7 +23,7 @@ type DefaultLogger struct { } var log DefaultLogger = DefaultLogger{loggers: map[string]zerolog.Logger{}} -var all_components = []string{"update", "graph", "event", "resource", "manager", "test", "gql"} +var all_components = []string{"update", "graph", "event", "resource", "manager", "test", "gql", "vex", "gqlws"} func (logger * DefaultLogger) Init(components []string) error { logger.init_lock.Lock() @@ -59,7 +59,7 @@ func (logger * DefaultLogger) Init(components []string) error { } func (logger * DefaultLogger) Logm(component string, fields map[string]interface{}, format string, items ... interface{}) { - logger.Init([]string{"gql", "manager"}) + logger.Init([]string{"gqlws"}) l, exists := logger.loggers[component] if exists == true { log := l.Log() @@ -71,7 +71,7 @@ func (logger * DefaultLogger) Logm(component string, fields map[string]interface } func (logger * DefaultLogger) Logf(component string, format string, items ... interface{}) { - logger.Init([]string{"gql", "manager"}) + logger.Init([]string{"gqlws"}) l, exists := logger.loggers[component] if exists == true { l.Log().Msg(fmt.Sprintf(format, items...)) diff --git a/vex.go b/vex.go index 2ad1f88..e0df44c 100644 --- a/vex.go +++ b/vex.go @@ -93,7 +93,6 @@ func (arena * VirtualArena) lock(node GraphNode) error { } func (arena * VirtualArena) update(signal GraphSignal) { - log.Logf("vex", "ARENA_UPDATE: %s", arena.Name()) arena.signal <- signal arena.BaseResource.update(signal) } @@ -140,8 +139,8 @@ func NewVexEvent(name string, description string) * VexEvent { } const start_slack = 250 * time.Millisecond -const TEMP_AUTON_TIME = 250 * time.Millisecond -const TEMP_DRIVE_TIME = 250 * time.Millisecond +const TEMP_AUTON_TIME = 1 * time.Second +const TEMP_DRIVE_TIME = 1 * time.Second type Match struct { BaseEvent @@ -208,7 +207,7 @@ func NewMatch(alliance0 * Alliance, alliance1 * Alliance, arena Arena) * Match { end_time := match.control_start.Add(TEMP_AUTON_TIME) match.SetTimeout(end_time, "autonomous_done") - log.Logf("vex", "AUTONOMOUS_END_TIME: %s %+v", end_time, match.timeout) + log.Logf("vex", "AUTONOMOUS@%s: %s UNTIL %s", time.Now(), match.control_start, end_time) return "wait", nil }