Added basic subscription(never updates data, only pings. Having issue with test-site since it continuously opens websockets without reporting an issue.

graph-rework
noah metz 2023-06-16 01:46:15 -06:00
parent 4d26ed73dd
commit 9eb4d7f699
6 changed files with 211 additions and 35 deletions

@ -3,11 +3,14 @@ module git.metznet.ca/MetzNet/graphvent
go 1.20
require (
github.com/gobwas/httphead v0.1.0 // indirect
github.com/gobwas/pool v0.2.1 // indirect
github.com/gobwas/ws v1.2.1 // indirect
github.com/google/uuid v1.3.0 // indirect
github.com/graphql-go/graphql v0.8.1 // indirect
github.com/graphql-go/handler v0.2.3 // indirect
github.com/mattn/go-colorable v0.1.12 // indirect
github.com/mattn/go-isatty v0.0.14 // indirect
github.com/rs/zerolog v1.29.1 // indirect
golang.org/x/sys v0.0.0-20210927094055-39ccf1dd6fa6 // indirect
golang.org/x/sys v0.6.0 // indirect
)

@ -3,6 +3,12 @@ github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSs
github.com/go-logr/logr v1.2.2/go.mod h1:jdQByPbusPIv2/zmleS9BjJVeZ6kBagPoEUsqbVz/1A=
github.com/go-logr/logr v1.2.3/go.mod h1:jdQByPbusPIv2/zmleS9BjJVeZ6kBagPoEUsqbVz/1A=
github.com/go-logr/stdr v1.2.2/go.mod h1:mMo/vtBO5dYbehREoey6XUKy/eSumjCCveDpRre4VKE=
github.com/gobwas/httphead v0.1.0 h1:exrUm0f4YX0L7EBwZHuCF4GDp8aJfVeBrlLQrs6NqWU=
github.com/gobwas/httphead v0.1.0/go.mod h1:O/RXo79gxV8G+RqlR/otEwx4Q36zl9rqC5u12GKvMCM=
github.com/gobwas/pool v0.2.1 h1:xfeeEhW7pwmX8nuLVlqbzVc7udMDrwetjEv+TZIz1og=
github.com/gobwas/pool v0.2.1/go.mod h1:q8bcK0KcYlCgd9e7WYLm9LpyS+YeLd8JVDW6WezmKEw=
github.com/gobwas/ws v1.2.1 h1:F2aeBZrm2NDsc7vbovKrWSogd4wvfAxg0FQ89/iqOTk=
github.com/gobwas/ws v1.2.1/go.mod h1:hRKAFb8wOxFROYNsT1bqfWnhX+b5MFeJM9r2ZSwg/KY=
github.com/godbus/dbus/v5 v5.0.4/go.mod h1:xhWf0FNVPg57R7Z0UbKHbJfkEywrmjJnf7w5xrFpKfA=
github.com/google/go-cmp v0.5.7/go.mod h1:n+brtR0CgQNWTVd5ZUFpTBC8YFBDLK/h/bpaJ8/DtOE=
github.com/google/uuid v1.3.0 h1:t6JiXgmwXMjEs8VusXIJk2BXHsn+wx8BZdTaoZ5fu7I=
@ -33,6 +39,8 @@ go.opentelemetry.io/otel/trace v1.6.3/go.mod h1:GNJQusJlUgZl9/TQBPKU/Y/ty+0iVB5f
golang.org/x/sys v0.0.0-20210630005230-0f9fa26af87c/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.0.0-20210927094055-39ccf1dd6fa6 h1:foEbQz/B0Oz6YIqu/69kfXPYeFQAuuMYFkjaqXzl5Wo=
golang.org/x/sys v0.0.0-20210927094055-39ccf1dd6fa6/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.6.0 h1:MVltZSvRTcU2ljQOhs94SXPftV6DCNnZViHeQps87pQ=
golang.org/x/sys v0.6.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=

152
gql.go

@ -11,6 +11,8 @@ import (
"fmt"
"sync"
"time"
"github.com/gobwas/ws"
"github.com/gobwas/ws/wsutil"
)
func GraphiQLHandler() func(http.ResponseWriter, *http.Request) {
@ -176,40 +178,136 @@ func GQLInterfaceNode() *graphql.Interface {
return gql_interface_node
}
type GQLQuery struct {
Query string `json:"query"`
OperationName string `json:"operationName"`
Variables map[string]interface{} `json:"variables"`
type GQLWSPayload struct {
OperationName string `json:"operationName,omitempty"`
Query string `json:"query,omitempty"`
Variables map[string]interface{} `json:"variables,omitempty"`
Extensions map[string]interface{} `json:"extensions,omitempty"`
Data string `json:"data,omitempty"`
}
type GQLWSMsg struct {
ID string `json:"id,omitempty"`
Type string `json:"type"`
Payload GQLWSPayload `json:"payload,omitempty"`
}
func GQLHandler(schema graphql.Schema, ctx context.Context) func(http.ResponseWriter, *http.Request) {
return func(w http.ResponseWriter, r * http.Request) {
str, err := io.ReadAll(r.Body)
if err != nil {
log.Logf("gql", "failed to read request body: %s", err)
return
}
res := GQLQuery{}
json.Unmarshal(str, &res)
params := graphql.Params{
Schema: schema,
Context: ctx,
RequestString: res.Query,
header_map := map[string]interface{}{}
for header, value := range(r.Header) {
header_map[header] = value
}
if res.OperationName != "" {
params.OperationName = res.OperationName
}
if len(res.Variables) > 0 {
params.VariableValues = res.Variables
log.Logm("gql", header_map, "REQUEST_HEADERS")
u := ws.HTTPUpgrader{
Protocol: func(protocol string) bool {
log.Logf("gqlws", "UPGRADE_PROTOCOL: %s", string(protocol))
return string(protocol) == "graphql-transport-ws"
},
}
result := graphql.Do(params)
if len(result.Errors) > 0 {
extra_fields := map[string]interface{}{}
extra_fields["body"] = string(str)
extra_fields["headers"] = r.Header
log.Logm("gql", extra_fields, "wrong result, unexpected errors: %v", result.Errors)
conn, _, _, err := u.Upgrade(r, w)
if err == nil {
defer conn.Close()
conn_state := "init"
for {
msg_raw, op, err := wsutil.ReadClientData(conn)
log.Logf("gqlws", "MSG: %s\nOP: 0x%02x\nERR: %+v\n", string(msg_raw), op, err)
msg := GQLWSMsg{}
json.Unmarshal(msg_raw, &msg)
if err != nil {
log.Logf("gqlws", "WS_CLIENT_ERROR")
break
}
if msg.Type == "connection_init" {
if conn_state != "init" {
log.Logf("gqlws", "WS_CLIENT_ERROR: INIT WHILE IN %s", conn_state)
break
}
conn_state = "ready"
err = wsutil.WriteServerMessage(conn, 1, []byte("{\"type\": \"connection_ack\"}"))
if err != nil {
log.Logf("gqlws", "WS_SERVER_ERROR: FAILED TO SEND connection_ack")
break
}
} else if msg.Type == "ping" {
err = wsutil.WriteServerMessage(conn, 1, []byte("{\"type\": \"pong\"}"))
if err != nil {
log.Logf("gqlws", "WS_SERVER_ERROR: FAILED TO SEND PONG")
}
} else if msg.Type == "subscribe" {
log.Logf("gqlws", "SUBSCRIBE: %+v", msg.Payload)
params := graphql.Params{
Schema: schema,
Context: ctx,
RequestString: msg.Payload.Query,
}
if msg.Payload.OperationName != "" {
params.OperationName = msg.Payload.OperationName
}
if len(msg.Payload.Variables) > 0 {
params.VariableValues = msg.Payload.Variables
}
result := graphql.Do(params)
if len(result.Errors) > 0 {
extra_fields := map[string]interface{}{}
extra_fields["query"] = string(msg.Payload.Query)
log.Logm("gql", extra_fields, "ERROR: wrong result, unexpected errors: %v", result.Errors)
break
}
log.Logf("gqlws", "DATA: %+v", result.Data)
data, err := json.Marshal(result.Data)
msg, err := json.Marshal(GQLWSMsg{
ID: msg.ID,
Type: "next",
Payload: GQLWSPayload{
Data: string(data),
},
})
if err != nil {
log.Logf("gqlws", "ERROR: %+v", err)
break
}
log.Logf("gqlws", "WRITING_GQLWS: %s", msg)
err = wsutil.WriteServerMessage(conn, 1, msg)
if err != nil {
log.Logf("gqlws", "ERROR: %+v", err)
break
}
}
}
return
} else {
str, err := io.ReadAll(r.Body)
if err != nil {
log.Logf("gql", "failed to read request body: %s", err)
return
}
query := GQLWSPayload{}
json.Unmarshal(str, &query)
params := graphql.Params{
Schema: schema,
Context: ctx,
RequestString: query.Query,
}
if query.OperationName != "" {
params.OperationName = query.OperationName
}
if len(query.Variables) > 0 {
params.VariableValues = query.Variables
}
result := graphql.Do(params)
if len(result.Errors) > 0 {
extra_fields := map[string]interface{}{}
extra_fields["body"] = string(str)
extra_fields["headers"] = r.Header
log.Logm("gql", extra_fields, "wrong result, unexpected errors: %v", result.Errors)
}
json.NewEncoder(w).Encode(result)
}
json.NewEncoder(w).Encode(result)
}
}

@ -4,6 +4,8 @@ import (
"github.com/graphql-go/graphql"
"reflect"
"fmt"
"errors"
"time"
)
func GQLVexTypes() map[reflect.Type]*graphql.Object {
@ -15,6 +17,7 @@ func GQLVexTypes() map[reflect.Type]*graphql.Object {
func GQLVexMutations() map[string]*graphql.Field {
mutations := map[string]*graphql.Field{}
mutations["setMatchState"] = GQLVexMutationSetMatchState()
return mutations
}
@ -48,6 +51,71 @@ func FindResources(event Event, resource_type reflect.Type) []Resource {
return ret
}
var gql_vex_mutation_set_match_state *graphql.Field= nil
func GQLVexMutationSetMatchState() *graphql.Field {
if gql_vex_mutation_set_match_state == nil {
gql_vex_mutation_set_match_state = &graphql.Field{
Type: GQLTypeSignal(),
Args: graphql.FieldConfigArgument{
"id": &graphql.ArgumentConfig{
Type: graphql.String,
},
"state": &graphql.ArgumentConfig{
Type: graphql.String,
},
"time": &graphql.ArgumentConfig{
Type: graphql.DateTime,
},
},
Resolve: func(p graphql.ResolveParams) (interface{}, error) {
server, ok := p.Context.Value("gql_server").(*GQLServer)
if ok == false {
return nil, fmt.Errorf("Failed to cast context gql_server to GQLServer: %+v", p.Context.Value("gql_server"))
}
id, ok := p.Args["id"].(string)
if ok == false {
return nil, errors.New("Failed to cast arg id to string")
}
state, ok := p.Args["state"].(string)
if ok == false {
return nil, errors.New("Failed to cast arg state to string")
}
start, ok := p.Args["time"].(time.Time)
if ok == false {
start = time.Now()
}
signal := NewSignal(server, state)
signal.description = id
signal.time = start
owner := server.Owner()
if owner == nil {
return nil, errors.New("Cannot send update without owner")
}
root_event, ok := owner.(Event)
if ok == false {
return nil, errors.New("Cannot send update to Event unless owned by an Event")
}
node := FindChild(root_event, id)
if node == nil {
return nil, errors.New("Failed to find id in event tree from server")
}
SendUpdate(node, signal)
return signal, nil
},
}
}
return gql_vex_mutation_set_match_state
}
var gql_vex_query_arenas *graphql.Field = nil
func GQLVexQueryArenas() *graphql.Field {
if gql_vex_query_arenas == nil {

@ -23,7 +23,7 @@ type DefaultLogger struct {
}
var log DefaultLogger = DefaultLogger{loggers: map[string]zerolog.Logger{}}
var all_components = []string{"update", "graph", "event", "resource", "manager", "test", "gql"}
var all_components = []string{"update", "graph", "event", "resource", "manager", "test", "gql", "vex", "gqlws"}
func (logger * DefaultLogger) Init(components []string) error {
logger.init_lock.Lock()
@ -59,7 +59,7 @@ func (logger * DefaultLogger) Init(components []string) error {
}
func (logger * DefaultLogger) Logm(component string, fields map[string]interface{}, format string, items ... interface{}) {
logger.Init([]string{"gql", "manager"})
logger.Init([]string{"gqlws"})
l, exists := logger.loggers[component]
if exists == true {
log := l.Log()
@ -71,7 +71,7 @@ func (logger * DefaultLogger) Logm(component string, fields map[string]interface
}
func (logger * DefaultLogger) Logf(component string, format string, items ... interface{}) {
logger.Init([]string{"gql", "manager"})
logger.Init([]string{"gqlws"})
l, exists := logger.loggers[component]
if exists == true {
l.Log().Msg(fmt.Sprintf(format, items...))

@ -93,7 +93,6 @@ func (arena * VirtualArena) lock(node GraphNode) error {
}
func (arena * VirtualArena) update(signal GraphSignal) {
log.Logf("vex", "ARENA_UPDATE: %s", arena.Name())
arena.signal <- signal
arena.BaseResource.update(signal)
}
@ -140,8 +139,8 @@ func NewVexEvent(name string, description string) * VexEvent {
}
const start_slack = 250 * time.Millisecond
const TEMP_AUTON_TIME = 250 * time.Millisecond
const TEMP_DRIVE_TIME = 250 * time.Millisecond
const TEMP_AUTON_TIME = 1 * time.Second
const TEMP_DRIVE_TIME = 1 * time.Second
type Match struct {
BaseEvent
@ -208,7 +207,7 @@ func NewMatch(alliance0 * Alliance, alliance1 * Alliance, arena Arena) * Match {
end_time := match.control_start.Add(TEMP_AUTON_TIME)
match.SetTimeout(end_time, "autonomous_done")
log.Logf("vex", "AUTONOMOUS_END_TIME: %s %+v", end_time, match.timeout)
log.Logf("vex", "AUTONOMOUS@%s: %s UNTIL %s", time.Now(), match.control_start, end_time)
return "wait", nil
}