Updated GQL, fixed manager, and added queries/mutations

graph-rework
noah metz 2023-06-13 14:10:59 -06:00
parent 230f4a88eb
commit d5954d2860
5 changed files with 233 additions and 202 deletions

153
gql.go

@ -116,7 +116,6 @@ func GQLHandler(schema graphql.Schema, ctx context.Context) func(http.ResponseWr
params.OperationName = res.OperationName params.OperationName = res.OperationName
} }
if len(res.Variables) > 0 { if len(res.Variables) > 0 {
log.Logf("gql", "VARIABLES: %+v", res.Variables)
params.VariableValues = res.Variables params.VariableValues = res.Variables
} }
result := graphql.Do(params) result := graphql.Do(params)
@ -495,6 +494,61 @@ func GQLTypeSignalInput() *graphql.InputObject {
return gql_type_signal_input return gql_type_signal_input
} }
var gql_mutation_update_event *graphql.Field = nil
func GQLMutationUpdateEvent() *graphql.Field {
if gql_mutation_update_event == nil {
gql_mutation_update_event = &graphql.Field{
Type: GQLTypeSignal(),
Args: graphql.FieldConfigArgument{
"id": &graphql.ArgumentConfig{
Type: graphql.String,
},
"signal": &graphql.ArgumentConfig{
Type: GQLTypeSignalInput(),
},
},
Resolve: func(p graphql.ResolveParams) (interface{}, error) {
server, ok := p.Context.Value("gql_server").(*GQLServer)
if ok == false {
return nil, errors.New(fmt.Sprintf("Failed to cast context gql_server to GQLServer: %+v", p.Context.Value("gql_server")))
}
signal_map, ok := p.Args["signal"].(map[string]interface{})
if ok == false {
return nil, errors.New(fmt.Sprintf("Failed to cast arg signal to GraphSignal: %+v", p.Args["signal"]))
}
signal := NewSignal(server, signal_map["Type"].(string))
signal.description = signal_map["Description"].(string)
signal.time = signal_map["Time"].(time.Time)
id , ok := p.Args["id"].(string)
if ok == false {
return nil, errors.New("Failed to cast arg id to string")
}
owner := server.Owner()
if owner == nil {
return nil, errors.New("Cannot send update without owner")
}
root_event, ok := owner.(Event)
if ok == false {
return nil, errors.New("Cannot send update to Event unless owned by an Event")
}
node := FindChild(root_event, id)
if node == nil {
return nil, errors.New("Failed to find id in event tree from server")
}
SendUpdate(node, signal)
return signal, nil
},
}
}
return gql_mutation_update_event
}
type GQLServer struct { type GQLServer struct {
BaseResource BaseResource
@ -502,15 +556,19 @@ type GQLServer struct {
listen string listen string
gql_channel chan error gql_channel chan error
extended_types map[reflect.Type]*graphql.Object extended_types map[reflect.Type]*graphql.Object
extended_queries map[string]*graphql.Field
extended_mutations map[string]*graphql.Field
} }
func NewGQLServer(listen string, extended_types map[reflect.Type]*graphql.Object) * GQLServer { func NewGQLServer(listen string, extended_types map[reflect.Type]*graphql.Object, extended_queries map[string]*graphql.Field, extended_mutations map[string]*graphql.Field) * GQLServer {
server := &GQLServer{ server := &GQLServer{
BaseResource: NewBaseResource("GQL Server", "graphql server for event signals", []Resource{}), BaseResource: NewBaseResource("GQL Server", "graphql server for event signals", []Resource{}),
listen: listen, listen: listen,
abort: make(chan error, 1), abort: make(chan error, 1),
gql_channel: make(chan error, 1), gql_channel: make(chan error, 1),
extended_types: extended_types, extended_types: extended_types,
extended_queries: extended_queries,
extended_mutations: extended_mutations,
} }
return server return server
@ -535,71 +593,31 @@ func (server * GQLServer) Handler() func(http.ResponseWriter, *http.Request) {
gql_types = append(gql_types, gql_t) gql_types = append(gql_types, gql_t)
} }
schemaConfig := graphql.SchemaConfig{ gql_queries := graphql.Fields{
Types: gql_types, "Owner": GQLQueryOwner(),
Mutation: graphql.NewObject(graphql.ObjectConfig{
Name: "Mutation",
Fields: graphql.Fields{
"updateEvent": &graphql.Field{
Type: GQLTypeSignal(),
Args: graphql.FieldConfigArgument{
"id": &graphql.ArgumentConfig{
Type: graphql.String,
},
"signal": &graphql.ArgumentConfig{
Type: GQLTypeSignalInput(),
},
},
Resolve: func(p graphql.ResolveParams) (interface{}, error) {
signal_map, ok := p.Args["signal"].(map[string]interface{})
if ok == false {
return nil, errors.New(fmt.Sprintf("Failed to cast arg signal to GraphSignal: %+v", p.Args["signal"]))
}
signal := NewSignal(server, signal_map["Type"].(string))
signal.description = signal_map["Description"].(string)
signal.time = signal_map["Time"].(time.Time)
id , ok := p.Args["id"].(string)
if ok == false {
return nil, errors.New("Failed to cast arg id to string")
} }
owner := server.Owner() for key, value := range(server.extended_queries) {
if owner == nil { gql_queries[key] = value
return nil, errors.New("Cannot send update without owner")
} }
root_event, ok := owner.(Event) gql_mutations := graphql.Fields{
if ok == false { "updateEvent": GQLMutationUpdateEvent(),
return nil, errors.New("Cannot send update to Event unless owned by an Event")
} }
node := FindChild(root_event, id) for key, value := range(server.extended_mutations) {
if node == nil { gql_mutations[key] = value
return nil, errors.New("Failed to find id in event tree from server")
} }
SendUpdate(node, signal) schemaConfig := graphql.SchemaConfig{
return signal, nil Types: gql_types,
},
},
},
}),
Query: graphql.NewObject(graphql.ObjectConfig{ Query: graphql.NewObject(graphql.ObjectConfig{
Name: "Query", Name: "Query",
Fields: graphql.Fields{ Fields: gql_queries,
"Owner": &graphql.Field{ }),
Type: GQLInterfaceEvent(), Mutation: graphql.NewObject(graphql.ObjectConfig{
Resolve: func(p graphql.ResolveParams) (interface{}, error) { Name: "Mutation",
server.lock_holder_lock.Lock() Fields: gql_mutations,
defer server.lock_holder_lock.Unlock()
owner := server.Owner()
return owner, nil
},
},
},
}), }),
} }
@ -610,9 +628,30 @@ func (server * GQLServer) Handler() func(http.ResponseWriter, *http.Request) {
ctx := context.Background() ctx := context.Background()
ctx = context.WithValue(ctx, "valid_events", valid_events) ctx = context.WithValue(ctx, "valid_events", valid_events)
ctx = context.WithValue(ctx, "valid_resources", valid_resources) ctx = context.WithValue(ctx, "valid_resources", valid_resources)
ctx = context.WithValue(ctx, "gql_server", server)
return GQLHandler(schema, ctx) return GQLHandler(schema, ctx)
} }
var gql_query_owner *graphql.Field = nil
func GQLQueryOwner() *graphql.Field {
if gql_query_owner == nil {
gql_query_owner = &graphql.Field{
Type: GQLInterfaceEvent(),
Resolve: func(p graphql.ResolveParams) (interface{}, error) {
server, ok := p.Context.Value("gql_server").(*GQLServer)
if ok == false {
panic("Failed to get/cast gql_server from context")
}
return server.Owner(), nil
},
}
}
return gql_query_owner
}
func (server * GQLServer) Init(abort chan error) bool { func (server * GQLServer) Init(abort chan error) bool {
go func(abort chan error) { go func(abort chan error) {
log.Logf("gql", "GOROUTINE_START for %s", server.ID()) log.Logf("gql", "GOROUTINE_START for %s", server.ID())

@ -3,6 +3,7 @@ package main
import ( import (
"github.com/graphql-go/graphql" "github.com/graphql-go/graphql"
"reflect" "reflect"
"fmt"
) )
func GQLVexTypes() map[reflect.Type]*graphql.Object { func GQLVexTypes() map[reflect.Type]*graphql.Object {
@ -12,6 +13,56 @@ func GQLVexTypes() map[reflect.Type]*graphql.Object {
return types return types
} }
func GQLVexMutations() map[string]*graphql.Field {
mutations := map[string]*graphql.Field{}
return mutations
}
func GQLVexQueries() map[string]*graphql.Field {
queries := map[string]*graphql.Field{}
queries["Arenas"] = GQLVexQueryArenas()
return queries
}
func FindResources(event Event, resource_type reflect.Type) []Resource {
resources := event.RequiredResources()
for _, child := range(event.Children()) {
resources = append(resources, FindResources(child, resource_type)...)
}
return resources
}
var gql_vex_query_arenas *graphql.Field = nil
func GQLVexQueryArenas() *graphql.Field {
if gql_vex_query_arenas == nil {
gql_vex_query_arenas = &graphql.Field{
Type: GQLVexListArena(),
Resolve: func(p graphql.ResolveParams) (interface{}, error) {
server, ok := p.Context.Value("gql_server").(*GQLServer)
if ok == false {
panic("Failed to get/cast gql_server from context")
}
owner, is_event := server.Owner().(Event)
if is_event == false {
return nil, fmt.Errorf("Can't enumerate arenas when server is attached to resource")
}
return FindResources(owner, reflect.TypeOf((*Arena)(nil))), nil
},
}
}
return gql_vex_query_arenas
}
var gql_vex_list_arena * graphql.List = nil
func GQLVexListArena() * graphql.List {
if gql_vex_list_arena == nil {
gql_vex_list_arena = graphql.NewList(GQLVexTypeArena())
}
return gql_vex_list_arena
}
var gql_vex_list_team * graphql.List = nil var gql_vex_list_team * graphql.List = nil
func GQLVexListTeam() * graphql.List { func GQLVexListTeam() * graphql.List {
if gql_vex_list_team == nil { if gql_vex_list_team == nil {

@ -59,7 +59,7 @@ func (logger * DefaultLogger) Init(components []string) error {
} }
func (logger * DefaultLogger) Logm(component string, fields map[string]interface{}, format string, items ... interface{}) { func (logger * DefaultLogger) Logm(component string, fields map[string]interface{}, format string, items ... interface{}) {
logger.Init([]string{"gql"}) logger.Init([]string{"gql", "manager"})
l, exists := logger.loggers[component] l, exists := logger.loggers[component]
if exists == true { if exists == true {
log := l.Log() log := l.Log()
@ -71,7 +71,7 @@ func (logger * DefaultLogger) Logm(component string, fields map[string]interface
} }
func (logger * DefaultLogger) Logf(component string, format string, items ... interface{}) { func (logger * DefaultLogger) Logf(component string, format string, items ... interface{}) {
logger.Init([]string{"gql"}) logger.Init([]string{"gql", "manager"})
l, exists := logger.loggers[component] l, exists := logger.loggers[component]
if exists == true { if exists == true {
l.Log().Msg(fmt.Sprintf(format, items...)) l.Log().Msg(fmt.Sprintf(format, items...))

@ -6,6 +6,7 @@ import (
"os" "os"
"os/signal" "os/signal"
"syscall" "syscall"
"fmt"
) )
func fake_team(org string, id string, names []string) (*Team, []*Member) { func fake_team(org string, id string, names []string) (*Team, []*Member) {
@ -17,7 +18,7 @@ func fake_team(org string, id string, names []string) (*Team, []*Member) {
return team, members return team, members
} }
func fake_data() (* EventManager, []Arena, []Arena) { func fake_data() (* EventManager) {
resources := []Resource{} resources := []Resource{}
teams_div1 := []*Team{} teams_div1 := []*Team{}
@ -116,21 +117,25 @@ func fake_data() (* EventManager, []Arena, []Arena) {
} }
gql_server := NewGQLServer(":8080", GQLVexTypes()) gql_server := NewGQLServer(":8080", GQLVexTypes(), GQLVexQueries(), GQLVexMutations())
resources = append(resources, gql_server) resources = append(resources, gql_server)
root_event := NewEventQueue("root_event", "", []Resource{gql_server}) root_event := NewEventQueue("root_event", "", []Resource{gql_server})
event_manager := NewEventManager(root_event, resources)
div_1 := NewEventQueue("Division 1", "", []Resource{}) div_1 := NewEventQueue("Division 1", "", []Resource{})
div_2 := NewEventQueue("Division 2", "", []Resource{}) err := AddChild(root_event, div_1, NewEventQueueInfo(1))
err := event_manager.AddEvent(root_event, div_1, NewEventQueueInfo(1))
if err != nil { if err != nil {
panic("Failed to add div_1") panic(fmt.Sprintf("Failed to add div_1: %s", err))
} }
err = event_manager.AddEvent(root_event, div_2, NewEventQueueInfo(1))
div_2 := NewEventQueue("Division 2", "", []Resource{})
err = AddChild(root_event, div_2, NewEventQueueInfo(1))
if err != nil { if err != nil {
panic("Failed to add div_2") panic(fmt.Sprintf("Failed to add div_2: %s", err))
} }
event_manager := NewEventManager(root_event, resources)
for i, alliance := range(alliances_div1) { for i, alliance := range(alliances_div1) {
for j, alliance2 := range(alliances_div1) { for j, alliance2 := range(alliances_div1) {
if j != i { if j != i {
@ -167,81 +172,11 @@ func fake_data() (* EventManager, []Arena, []Arena) {
} }
} }
return event_manager, arenas_div1, arenas_div2 return event_manager
}
type FakeClient struct {
state string
start time.Time
arena Arena
update chan GraphSignal
games_done int
}
func NewFakeClient(arena Arena) * FakeClient {
client := &FakeClient{
state: "init",
start: time.Now(),
arena: arena,
update: arena.UpdateChannel(),
games_done: 0,
}
return client
}
func (client * FakeClient) process_update(update GraphSignal) {
arena := client.arena
if update.Source() != nil {
log.Logf("test", "FAKE_CLIENT_UPDATE: %s -> %+v", update.Source().Name(), update)
} else {
log.Logf("test", "FAKE_CLIENT_UPDATE: nil -> %+v", update)
}
if update.Type() == "event_start" {
if client.state != "init" {
log.Logf("test", "BAD_CLIENT_STATE: event_start when match not in init: %s %s", arena.Name(), client.state)
}
client.state = "autonomous_queued"
log.Logf("test", "FAKE_CLIENT_ACTION: Match started on %s, queuing autonomous automatically", arena.Name())
SendUpdate(arena, NewSignal(nil, "queue_autonomous"))
} else if update.Type() == "autonomous_queued" {
if client.state != "autonomous_queued" {
log.Logf("test", "BAD_CLIENT_STATE: autonomous_queued when match not in autonomous_queued: %s %s", arena.Name(), client.state)
}
client.state = "autonomous_started"
log.Logf("test", "FAKE_CLIENT_ACTION: Autonomous queued on %s for %s, starting automatically at requested time", arena.Name(), update.Time())
signal := NewSignal(nil, "start_autonomous")
signal.time = update.Time()
SendUpdate(arena, signal)
} else if update.Type() == "autonomous_done" {
if client.state != "autonomous_started" {
log.Logf("test", "BAD_CLIENT_STATE: autonomous_done when match not in autonomous_started: %s %s", arena.Name(), client.state)
}
client.state = "driver_queued"
log.Logf("test", "FAKE_CLIENT_ACTION: Autonomous done on %s for %s, queueing driver automatically", arena.Name(), update.Time())
signal := NewSignal(nil, "queue_driver")
SendUpdate(arena, signal)
} else if update.Type() == "driver_queued" {
if client.state != "driver_queued" {
log.Logf("test", "BAD_CLIENT_STATE: driver_queued when match not in autonomous_done: %s %s", arena.Name(), client.state)
}
client.state = "driver_started"
log.Logf("test", "FAKE_CLIENT_ACTION: Driver queueud on %s for %s, starting driver automatically at requested time", arena.Name(), update.Time())
signal := NewSignal(nil, "start_driver")
signal.time = update.Time()
SendUpdate(arena, signal)
} else if update.Type() == "driver_done" {
if client.state != "driver_started" {
log.Logf("test", "BAD_CLIENT_STATE: driver_done when match not in driver_started: %s %s", arena.Name(), client.state)
}
client.state = "init"
log.Logf("test", "FAKE_CLIENT_ACTION: Driver done on %s for %s", arena.Name(), update.Time())
client.games_done += 1
}
} }
func main() { func main() {
event_manager, _, _ := fake_data() event_manager := fake_data()
sigs := make(chan os.Signal, 1) sigs := make(chan os.Signal, 1)
signal.Notify(sigs, syscall.SIGINT, syscall.SIGTERM) signal.Notify(sigs, syscall.SIGINT, syscall.SIGTERM)
@ -261,12 +196,6 @@ func main() {
pprof.WriteHeapProfile(memfile) pprof.WriteHeapProfile(memfile)
}() }()
/*// Fake arena clients
arena_1_client := NewFakeClient(arenas_div1[0])
arena_2_client := NewFakeClient(arenas_div1[1])
arena_3_client := NewFakeClient(arenas_div2[0])
arena_4_client := NewFakeClient(arenas_div2[1])
*/
go func() { go func() {
for true { for true {
select { select {
@ -274,25 +203,7 @@ func main() {
signal := NewSignal(nil, "abort") signal := NewSignal(nil, "abort")
signal.description = event_manager.root_event.ID() signal.description = event_manager.root_event.ID()
SendUpdate(event_manager.root_event, signal) SendUpdate(event_manager.root_event, signal)
time.Sleep(time.Second * 5)
pprof.Lookup("goroutine").WriteTo(os.Stdout, 1)
break break
/*case update := <- arena_1_client.update:
arena_1_client.process_update(update)
case update := <- arena_2_client.update:
arena_2_client.process_update(update)
case update := <- arena_3_client.update:
arena_3_client.process_update(update)
case update := <- arena_4_client.update:
arena_4_client.process_update(update)
}
if arena_1_client.games_done == 12 &&
arena_2_client.games_done == 12 &&
arena_3_client.games_done == 12 &&
arena_4_client.games_done == 12 {
//signal := NewSignal(nil, "cancel")
//signal.description = event_manager.root_event.ID()
//SendUpdate(event_manager.root_event, signal)*/
} }
} }
}() }()

@ -121,50 +121,80 @@ func (manager * EventManager) AddResource(resource Resource) error {
// Check that created resources don't exist in the DAG // Check that created resources don't exist in the DAG
// Add resources created by the event to the DAG // Add resources created by the event to the DAG
// Add child to parent // Add child to parent
func (manager * EventManager) AddEvent(parent Event, child Event, info EventInfo) error { func (manager * EventManager) CheckResources(event Event) error {
if child == nil { if event == nil {
return errors.New("Cannot add nil Event to EventManager") return errors.New("Cannot check nil event for resources")
} else if len(child.Children()) != 0 {
return errors.New("Adding events recursively not implemented")
} }
for _, resource := range child.RequiredResources() { for _, r := range(event.RequiredResources()) {
_, exists := manager.dag_nodes[resource.ID()] res_found := false
for _, res := range(manager.dag_nodes) {
if res.ID() == r.ID() {
res_found = true
}
}
if res_found == false {
return errors.New(fmt.Sprintf("Failed to find %s in the resource forest for %s", r.Name(), event.Name()))
}
}
for _, c := range(event.Children()) {
err := manager.CheckResources(c)
if err != nil {
return err
}
}
return nil
}
func (manager * EventManager) AddDoneResources(event Event) {
if event == nil {
return
}
done_resource := event.DoneResource()
_, exists := manager.dag_nodes[done_resource.ID()]
if exists == false { if exists == false {
error_str := fmt.Sprintf("Required resource %s not in DAG, cannot add event %s", resource.ID(), child.ID()) manager.AddResource(done_resource)
return errors.New(error_str) }
for _, child := range(event.Children()) {
manager.AddDoneResources(child)
} }
}
func (manager * EventManager) AddEvent(parent Event, child Event, info EventInfo) error {
if child == nil {
return errors.New("Cannot add nil Event to EventManager")
} }
resource := child.DoneResource() err := manager.CheckResources(child)
_, exists := manager.dag_nodes[resource.ID()] if err != nil {
if exists == true { return fmt.Errorf("Failed to add event to event manager: %w", err)
error_str := fmt.Sprintf("Created resource %s already exists in DAG, cannot add event %s", resource.ID(), child.ID())
return errors.New(error_str)
} }
manager.AddResource(resource)
if manager.root_event == nil && parent != nil { manager.AddDoneResources(child)
error_str := fmt.Sprintf("EventManager has no root, so can't add event to parent")
return errors.New(error_str) if manager.root_event == nil {
} else if manager.root_event != nil && parent == nil { if parent != nil {
// TODO return fmt.Errorf("EventManager has no root, so can't add event to parent")
return errors.New("Replacing root event not implemented")
} else if manager.root_event == nil && parent == nil {
manager.root_event = child
return nil;
} else { } else {
if FindChild(manager.root_event, parent.ID()) == nil { manager.root_event = child
error_str := fmt.Sprintf("Event %s is not present in the event tree, cannot add %s as child", parent.ID(), child.ID()) return nil
return errors.New(error_str)
} }
} else {
if FindChild(manager.root_event, child.ID()) != nil { if parent == nil {
error_str := fmt.Sprintf("Event %s already exists in the event tree, can not add again", child.ID()) return fmt.Errorf("Replacing root event not implemented")
return errors.New(error_str) } else if FindChild(manager.root_event, parent.ID()) == nil {
return fmt.Errorf("Parent does not exists in event tree")
} else if FindChild(manager.root_event, child.ID()) != nil {
return fmt.Errorf("Child already exists in event tree")
} else {
AddChild(parent, child, info)
} }
return AddChild(parent, child, info)
} }
return nil
} }