Created GQL resource which will act as a graphql server for the attached node and all nodes under it

graph-rework
noah metz 2023-06-07 00:36:40 -06:00
parent 2d31daa916
commit 75d0eb0396
5 changed files with 54 additions and 48 deletions

@ -227,6 +227,7 @@ func AbortChildren(event Event) {
} }
func LockResources(event Event) error { func LockResources(event Event) error {
log.Logf("event", "RESOURCE_LOCKING for %s - %+v", event.Name(), event.RequiredResources())
locked_resources := []Resource{} locked_resources := []Resource{}
var lock_err error = nil var lock_err error = nil
for _, resource := range(event.RequiredResources()) { for _, resource := range(event.RequiredResources()) {
@ -394,7 +395,7 @@ func (queue * EventQueue) InfoType() reflect.Type {
func NewEventQueue(name string, description string, required_resources []Resource) (* EventQueue) { func NewEventQueue(name string, description string, required_resources []Resource) (* EventQueue) {
queue := &EventQueue{ queue := &EventQueue{
BaseEvent: NewBaseEvent(name, description, []Resource{}), BaseEvent: NewBaseEvent(name, description, required_resources),
listened_resources: map[string]Resource{}, listened_resources: map[string]Resource{},
} }
@ -535,47 +536,3 @@ type GQLEvent struct {
BaseEvent BaseEvent
abort chan error abort chan error
} }
func NewGQLEvent(listen string, child Event) * GQLEvent {
event := &GQLEvent{
BaseEvent: NewBaseEvent("GQL Handler", "", []Resource{}),
abort: make(chan error, 1),
}
event.actions["wait"] = EventWait(event)
event.handlers["abort"] = func (signal GraphSignal) (string, error) {
if signal.Description() == event.ID() {
event.abort <- nil
AbortChildren(event)
return "", errors.New(fmt.Sprintf("%s aborted by signal", event.ID()))
}
return "wait", nil
}
event.handlers["cancel"] = func (signal GraphSignal) (string, error) {
if signal.Description() == event.ID() {
event.abort <- nil
CancelChildren(event)
return "", nil
}
return "wait", nil
}
event.actions["start"] = func() (string, error) {
// start the gql handler goroutine
log.Logf("gql", "Starting GQL thread for %s", event.ID())
go func(event * GQLEvent) {
for true {
select {
case <- event.abort:
log.Logf("gql", "Stopping GQL thread for %s", event.ID())
break
}
}
}(event)
return "wait", nil
}
return event
}

@ -40,7 +40,7 @@ func (logger * DefaultLogger) Init(components []string) error {
} }
writer := io.MultiWriter(file, os.Stdout) writer := io.MultiWriter(file, os.Stdout)
for _, c := range(all_components) { for _, c := range([]string{"gql"}) {
if component_enabled(c) == true { if component_enabled(c) == true {
logger.loggers[c] = zerolog.New(writer).With().Timestamp().Str("component", c).Logger() logger.loggers[c] = zerolog.New(writer).With().Timestamp().Str("component", c).Logger()
} else { } else {

@ -114,7 +114,9 @@ func fake_data() (* EventManager, []Arena, []Arena) {
} }
root_event := NewEventQueue("root_event", "", []Resource{}) gql_server := NewGQLServer(":8080")
resources = append(resources, gql_server)
root_event := NewEventQueue("root_event", "", []Resource{gql_server})
event_manager := NewEventManager(root_event, resources) event_manager := NewEventManager(root_event, resources)
div_1 := NewEventQueue("Division 1", "", []Resource{}) div_1 := NewEventQueue("Division 1", "", []Resource{})
div_2 := NewEventQueue("Division 2", "", []Resource{}) div_2 := NewEventQueue("Division 2", "", []Resource{})

@ -355,6 +355,7 @@ func TestStartEventQueue(t * testing.T) {
e1:= NewEvent("e1", "", []Resource{res_1, res_2}) e1:= NewEvent("e1", "", []Resource{res_1, res_2})
e1_l := e1.UpdateChannel()
e1_r := e1.DoneResource() e1_r := e1.DoneResource()
e1_info := NewEventQueueInfo(1) e1_info := NewEventQueueInfo(1)
err := manager.AddEvent(root_event, e1, e1_info) err := manager.AddEvent(root_event, e1, e1_info)
@ -364,6 +365,7 @@ func TestStartEventQueue(t * testing.T) {
(*graph_tester)(t).WaitForValue(rel, "child_added", root_event, time.Second, "No update on root_event after adding e1") (*graph_tester)(t).WaitForValue(rel, "child_added", root_event, time.Second, "No update on root_event after adding e1")
e2 := NewEvent("e2", "", []Resource{res_1}) e2 := NewEvent("e2", "", []Resource{res_1})
e2_l := e2.UpdateChannel()
e2_r := e2.DoneResource() e2_r := e2.DoneResource()
e2_info := NewEventQueueInfo(2) e2_info := NewEventQueueInfo(2)
err = manager.AddEvent(root_event, e2, e2_info) err = manager.AddEvent(root_event, e2, e2_info)
@ -373,6 +375,7 @@ func TestStartEventQueue(t * testing.T) {
(*graph_tester)(t).WaitForValue(rel, "child_added", root_event, time.Second, "No update on root_event after adding e2") (*graph_tester)(t).WaitForValue(rel, "child_added", root_event, time.Second, "No update on root_event after adding e2")
e3 := NewEvent("e3", "", []Resource{res_2}) e3 := NewEvent("e3", "", []Resource{res_2})
e3_l := e3.UpdateChannel()
e3_r := e3.DoneResource() e3_r := e3.DoneResource()
e3_info := NewEventQueueInfo(3) e3_info := NewEventQueueInfo(3)
err = manager.AddEvent(root_event, e3, e3_info) err = manager.AddEvent(root_event, e3, e3_info)
@ -394,7 +397,9 @@ func TestStartEventQueue(t * testing.T) {
// Now that an event manager is constructed with a queue and 3 basic events // Now that an event manager is constructed with a queue and 3 basic events
// start the queue and check that all the events are executed // start the queue and check that all the events are executed
go func() { go func() {
(*graph_tester)(t).WaitForValue(rel, "event_done", e3, time.Second, "No event_done for e3") (*graph_tester)(t).WaitForValue(e1_l, "event_done", e1, time.Second, "No event_done for e3")
(*graph_tester)(t).WaitForValue(e2_l, "event_done", e2, time.Second, "No event_done for e3")
(*graph_tester)(t).WaitForValue(e3_l, "event_done", e3, time.Second, "No event_done for e3")
signal := NewSignal(nil, "cancel") signal := NewSignal(nil, "cancel")
signal.description = root_event.ID() signal.description = root_event.ID()
SendUpdate(root_event, signal) SendUpdate(root_event, signal)

@ -144,6 +144,7 @@ func LockResource(resource Resource, node GraphNode) error {
return errors.New(err_str) return errors.New(err_str)
} }
log.Logf("resource", "Locked %s", resource.Name())
resource.SetOwner(node) resource.SetOwner(node)
@ -252,3 +253,44 @@ func NewResource(name string, description string, children []Resource) * BaseRes
resource := NewBaseResource(name, description, children) resource := NewBaseResource(name, description, children)
return &resource return &resource
} }
type GQLServer struct {
BaseResource
abort chan error
listen string
gql_channel chan error
}
func NewGQLServer(listen string) * GQLServer {
server := &GQLServer{
BaseResource: NewBaseResource("GQL Connection", "Connection to a GQL server", []Resource{}),
listen: listen,
abort: make(chan error, 1),
gql_channel: make(chan error, 1),
}
return server
}
func (server * GQLServer) update(signal GraphSignal) {
server.signal <- signal
server.BaseResource.update(signal)
}
func (server * GQLServer) Init(abort chan error) bool {
go func(abort chan error) {
log.Logf("gql", "GOROUTINE_START for %s", server.ID())
for true {
select {
case <-abort:
log.Logf("gql", "GOROUTINE_ABORT for %s", server.ID())
break
case <-server.signal:
// Take signals to resource and send to GQL subscriptions
case <-server.gql_channel:
// Parse GQL query from channel and reply with resolved query
}
}
}(abort)
return true
}