diff --git a/event.go b/event.go index b3479cd..2f19d66 100644 --- a/event.go +++ b/event.go @@ -227,6 +227,7 @@ func AbortChildren(event Event) { } func LockResources(event Event) error { + log.Logf("event", "RESOURCE_LOCKING for %s - %+v", event.Name(), event.RequiredResources()) locked_resources := []Resource{} var lock_err error = nil for _, resource := range(event.RequiredResources()) { @@ -394,7 +395,7 @@ func (queue * EventQueue) InfoType() reflect.Type { func NewEventQueue(name string, description string, required_resources []Resource) (* EventQueue) { queue := &EventQueue{ - BaseEvent: NewBaseEvent(name, description, []Resource{}), + BaseEvent: NewBaseEvent(name, description, required_resources), listened_resources: map[string]Resource{}, } @@ -535,47 +536,3 @@ type GQLEvent struct { BaseEvent abort chan error } - -func NewGQLEvent(listen string, child Event) * GQLEvent { - event := &GQLEvent{ - BaseEvent: NewBaseEvent("GQL Handler", "", []Resource{}), - abort: make(chan error, 1), - } - - event.actions["wait"] = EventWait(event) - - event.handlers["abort"] = func (signal GraphSignal) (string, error) { - if signal.Description() == event.ID() { - event.abort <- nil - AbortChildren(event) - return "", errors.New(fmt.Sprintf("%s aborted by signal", event.ID())) - } - return "wait", nil - } - - event.handlers["cancel"] = func (signal GraphSignal) (string, error) { - if signal.Description() == event.ID() { - event.abort <- nil - CancelChildren(event) - return "", nil - } - return "wait", nil - } - - event.actions["start"] = func() (string, error) { - // start the gql handler goroutine - log.Logf("gql", "Starting GQL thread for %s", event.ID()) - go func(event * GQLEvent) { - for true { - select { - case <- event.abort: - log.Logf("gql", "Stopping GQL thread for %s", event.ID()) - break - } - } - }(event) - return "wait", nil - } - - return event -} diff --git a/graph.go b/graph.go index bac33bd..919648e 100644 --- a/graph.go +++ b/graph.go @@ -40,7 +40,7 @@ func (logger * DefaultLogger) Init(components []string) error { } writer := io.MultiWriter(file, os.Stdout) - for _, c := range(all_components) { + for _, c := range([]string{"gql"}) { if component_enabled(c) == true { logger.loggers[c] = zerolog.New(writer).With().Timestamp().Str("component", c).Logger() } else { diff --git a/main.go b/main.go index 18747ed..f27889f 100644 --- a/main.go +++ b/main.go @@ -114,7 +114,9 @@ func fake_data() (* EventManager, []Arena, []Arena) { } - root_event := NewEventQueue("root_event", "", []Resource{}) + gql_server := NewGQLServer(":8080") + resources = append(resources, gql_server) + root_event := NewEventQueue("root_event", "", []Resource{gql_server}) event_manager := NewEventManager(root_event, resources) div_1 := NewEventQueue("Division 1", "", []Resource{}) div_2 := NewEventQueue("Division 2", "", []Resource{}) diff --git a/manager_test.go b/manager_test.go index e5afd30..3c2a6f3 100644 --- a/manager_test.go +++ b/manager_test.go @@ -355,6 +355,7 @@ func TestStartEventQueue(t * testing.T) { e1:= NewEvent("e1", "", []Resource{res_1, res_2}) + e1_l := e1.UpdateChannel() e1_r := e1.DoneResource() e1_info := NewEventQueueInfo(1) err := manager.AddEvent(root_event, e1, e1_info) @@ -364,6 +365,7 @@ func TestStartEventQueue(t * testing.T) { (*graph_tester)(t).WaitForValue(rel, "child_added", root_event, time.Second, "No update on root_event after adding e1") e2 := NewEvent("e2", "", []Resource{res_1}) + e2_l := e2.UpdateChannel() e2_r := e2.DoneResource() e2_info := NewEventQueueInfo(2) err = manager.AddEvent(root_event, e2, e2_info) @@ -373,6 +375,7 @@ func TestStartEventQueue(t * testing.T) { (*graph_tester)(t).WaitForValue(rel, "child_added", root_event, time.Second, "No update on root_event after adding e2") e3 := NewEvent("e3", "", []Resource{res_2}) + e3_l := e3.UpdateChannel() e3_r := e3.DoneResource() e3_info := NewEventQueueInfo(3) err = manager.AddEvent(root_event, e3, e3_info) @@ -394,7 +397,9 @@ func TestStartEventQueue(t * testing.T) { // Now that an event manager is constructed with a queue and 3 basic events // start the queue and check that all the events are executed go func() { - (*graph_tester)(t).WaitForValue(rel, "event_done", e3, time.Second, "No event_done for e3") + (*graph_tester)(t).WaitForValue(e1_l, "event_done", e1, time.Second, "No event_done for e3") + (*graph_tester)(t).WaitForValue(e2_l, "event_done", e2, time.Second, "No event_done for e3") + (*graph_tester)(t).WaitForValue(e3_l, "event_done", e3, time.Second, "No event_done for e3") signal := NewSignal(nil, "cancel") signal.description = root_event.ID() SendUpdate(root_event, signal) diff --git a/resource.go b/resource.go index 1e33608..27b501c 100644 --- a/resource.go +++ b/resource.go @@ -144,6 +144,7 @@ func LockResource(resource Resource, node GraphNode) error { return errors.New(err_str) } + log.Logf("resource", "Locked %s", resource.Name()) resource.SetOwner(node) @@ -252,3 +253,44 @@ func NewResource(name string, description string, children []Resource) * BaseRes resource := NewBaseResource(name, description, children) return &resource } + +type GQLServer struct { + BaseResource + abort chan error + listen string + gql_channel chan error +} + +func NewGQLServer(listen string) * GQLServer { + server := &GQLServer{ + BaseResource: NewBaseResource("GQL Connection", "Connection to a GQL server", []Resource{}), + listen: listen, + abort: make(chan error, 1), + gql_channel: make(chan error, 1), + } + + return server +} + +func (server * GQLServer) update(signal GraphSignal) { + server.signal <- signal + server.BaseResource.update(signal) +} + +func (server * GQLServer) Init(abort chan error) bool { + go func(abort chan error) { + log.Logf("gql", "GOROUTINE_START for %s", server.ID()) + for true { + select { + case <-abort: + log.Logf("gql", "GOROUTINE_ABORT for %s", server.ID()) + break + case <-server.signal: + // Take signals to resource and send to GQL subscriptions + case <-server.gql_channel: + // Parse GQL query from channel and reply with resolved query + } + } + }(abort) + return true +}