diff --git a/event.go b/event.go index f93293a..e4a2fdb 100644 --- a/event.go +++ b/event.go @@ -5,6 +5,7 @@ import ( "errors" graphql "github.com/graph-gophers/graphql-go" "reflect" + "sort" ) // Update the events listeners, and notify the parent to do the same @@ -50,9 +51,13 @@ type Event interface { Parent() Event RegisterParent(parent Event) error RequiredResources() []Resource - CreatedResources() []Resource + DoneResource() Resource AddChild(child Event, info EventInfo) error FindChild(id graphql.ID) Event + Run() error + Abort() error + Lock() error + Unlock() error } // BaseEvent is the most basic event that can exist in the event tree. @@ -63,11 +68,86 @@ type Event interface { // When starter, this event automatically transitions to completion and unlocks all it's resources(including created) type BaseEvent struct { BaseNode - created_resources []Resource + done_resource Resource required_resources []Resource children []Event child_info map[Event]EventInfo + actions map[string]func() (string, error) parent Event + signal chan string + abort chan string +} + +func (event * BaseEvent) Abort() error { + event.signal <- "abort" + return nil +} + +func (queue * EventQueue) Abort() error { + for _, event := range(queue.Children()) { + event.Abort() + } + queue.signal <- "abort" + return nil +} + +func (event * BaseEvent) Lock() error { + locked_resources := []Resource{} + lock_err := false + for _, resource := range(event.RequiredResources()) { + err := resource.Lock(event) + if err != nil { + lock_err = true + } + } + + if lock_err == true { + for _, resource := range(locked_resources) { + resource.Unlock(event) + } + return errors.New("failed to lock required resources") + } + return nil +} + +func (event * BaseEvent) Unlock() error { + return event.DoneResource().Unlock(event) +} + +func (event * BaseEvent) Run() error { + next_action := "start" + var err error = nil + for next_action != "" { + // Check if the edge exists + action, exists := event.actions[next_action] + if exists == false { + error_str := fmt.Sprintf("%s is not a valid action", next_action) + return errors.New(error_str) + } + + // Run the edge function + next_action, err = action() + if err != nil { + return err + } + + // Check signals + select { + case reason := <-event.abort: + error_str := fmt.Sprintf("State Machine aborted: %s", reason) + return errors.New(error_str) + default: + } + + // Update the event after running the edge + event.Update() + } + + err = event.DoneResource().Unlock(event) + if err != nil { + return err + } + return nil } // EventQueue is a basic event that can have children. @@ -76,7 +156,7 @@ type EventQueue struct { BaseEvent } -func NewEvent(name string, description string, required_resources []Resource) (* BaseEvent, Resource) { +func NewEvent(name string, description string, required_resources []Resource) (* BaseEvent) { done_resource := NewResource("event_done", "signal that event is done", []Resource{}) event := &BaseEvent{ BaseNode: BaseNode{ @@ -88,17 +168,23 @@ func NewEvent(name string, description string, required_resources []Resource) (* parent: nil, children: []Event{}, child_info: map[Event]EventInfo{}, - created_resources: []Resource{done_resource}, + done_resource: done_resource, required_resources: required_resources, + actions: map[string]func()(string, error){}, + signal: make(chan string, 10), } // Lock the done_resource by default done_resource.Lock(event) - return event, done_resource + event.actions["start"] = func() (string, error) { + return "", nil + } + + return event } -func NewEventQueue(name string, description string, required_resources []Resource) (* EventQueue, Resource) { +func NewEventQueue(name string, description string, required_resources []Resource) (* EventQueue) { done_resource := NewResource("event_done", "signal that event is done", []Resource{}) queue := &EventQueue{ BaseEvent: BaseEvent{ @@ -111,14 +197,83 @@ func NewEventQueue(name string, description string, required_resources []Resourc parent: nil, children: []Event{}, child_info: map[Event]EventInfo{}, - created_resources: []Resource{done_resource}, + done_resource: done_resource, required_resources: required_resources, + actions: map[string]func()(string, error){}, + signal: make(chan string, 10), + abort: make(chan string, 1), }, } - done_resource.Lock(queue) + // Need to lock it with th BaseEvent since Unlock is implemented on the BaseEvent + done_resource.Lock(&queue.BaseEvent) + + queue.actions["start"] = func() (string, error) { + return "queue_event", nil + } + + queue.actions["queue_event"] = func() (string, error) { + // Sort the list of events by priority + // Keep trying to lock the highest priority event until the end of the list is reached, or an event is locked + // If an event is locked, transition it to "started" and start event in a new goroutine + // If the end of the queue is reached and there are no uncompleted events, transition to "done" + // If the end of the queue is reached and there are uncompleted events, transition to "wait" + copied_events := make([]Event, len(queue.Children())) + copy(copied_events, queue.Children()) + less := func(i int, j int) bool { + info_i := queue.ChildInfo(copied_events[i]).(*EventQueueInfo) + info_j := queue.ChildInfo(copied_events[j]).(*EventQueueInfo) + return info_i.priority < info_j.priority + } + sort.SliceStable(copied_events, less) + + wait := false + for _, event := range(copied_events) { + info := queue.ChildInfo(event).(*EventQueueInfo) + if info.state == "queued" { + wait = true + // Try to lock it + err := event.Lock() + // start in new goroutine + if err != nil { + + } else { + info.state = "running" + go func(event Event, info * EventQueueInfo, queue * EventQueue) { + event.Run() + info.state = "done" + queue.signal <- "event_done" + }(event, info, queue) + } + } else if info.state == "running" { + wait = true + } + } + + if wait == true { + return "wait", nil + } else { + return "", nil + } + } + + queue.actions["wait"] = func() (string, error) { + // Wait until signaled by a thread + /* + What signals to take action for: + - abort : sent by any other thread : abort any child events and set the next event to none + - resource_available : sent by the aggregator goroutine when the lock on a resource changes : see if any events can be locked + - event_done : sent by child event threads : see if all events are completed + */ + signal := <- queue.signal + if signal == "abort" { + queue.abort <- "aborted by signal" + return "", nil + } + return "queue_event", nil + } - return queue, done_resource + return queue } // Store the nodes parent for upwards propagation of changes @@ -139,8 +294,8 @@ func (event * BaseEvent) RequiredResources() []Resource { return event.required_resources } -func (event * BaseEvent) CreatedResources() []Resource { - return event.created_resources +func (event * BaseEvent) DoneResource() Resource { + return event.done_resource } func (event * BaseEvent) Children() []Event { diff --git a/main.go b/main.go index 97a2aba..e4995ee 100644 --- a/main.go +++ b/main.go @@ -32,7 +32,7 @@ func fake_data() * EventManager { } } - root_event, _ := NewEventQueue("root_event", "", []Resource{}) + root_event := NewEventQueue("root_event", "", []Resource{}) event_manager := NewEventManager(root_event, resources) diff --git a/manager.go b/manager.go index b4a763f..7a93895 100644 --- a/manager.go +++ b/manager.go @@ -5,7 +5,6 @@ import ( "log" "errors" graphql "github.com/graph-gophers/graphql-go" - "context" ) type EventManager struct { @@ -35,9 +34,8 @@ func NewEventManager(root_event Event, dag_nodes []Resource) * EventManager { return manager; } -func (manager * EventManager) Run(ctx context.Context) error { - //return manager.root_event.Run(ctx) - return nil +func (manager * EventManager) Run() error { + return manager.root_event.Run() } func (manager * EventManager) FindResource(id graphql.ID) Resource { @@ -97,14 +95,13 @@ func (manager * EventManager) AddEvent(parent Event, child Event, info EventInfo } } - for _, resource := range child.CreatedResources() { - _, exists := manager.dag_nodes[resource.ID()] - if exists == true { - error_str := fmt.Sprintf("Created resource %s already exists in DAG, cannot add event %s", resource.ID(), child.ID()) - return errors.New(error_str) - } - manager.AddResource(resource) + resource := child.DoneResource() + _, exists := manager.dag_nodes[resource.ID()] + if exists == true { + error_str := fmt.Sprintf("Created resource %s already exists in DAG, cannot add event %s", resource.ID(), child.ID()) + return errors.New(error_str) } + manager.AddResource(resource) if manager.root_event == nil && parent != nil { error_str := fmt.Sprintf("EventManager has no root, so can't add event to parent") diff --git a/manager_test.go b/manager_test.go index de59d4b..9c73d4d 100644 --- a/manager_test.go +++ b/manager_test.go @@ -3,7 +3,7 @@ package main import ( "testing" "time" - "context" + "fmt" ) type graph_tester testing.T @@ -51,7 +51,7 @@ func TestNewResourceAdd(t *testing.T) { description := "A resource for testing" children := []Resource{} - root_event, _ := NewEvent("", "", []Resource{}) + root_event := NewEvent("", "", []Resource{}) test_resource := NewResource(name, description, children) event_manager := NewEventManager(root_event, []Resource{test_resource}) res := event_manager.FindResource(test_resource.ID()) @@ -66,7 +66,7 @@ func TestNewResourceAdd(t *testing.T) { } func TestDoubleResourceAdd(t * testing.T) { - root_event, _ := NewEvent("", "", []Resource{}) + root_event := NewEvent("", "", []Resource{}) test_resource := NewResource("", "", []Resource{}) event_manager := NewEventManager(root_event, []Resource{test_resource}) err := event_manager.AddResource(test_resource) @@ -77,7 +77,7 @@ func TestDoubleResourceAdd(t * testing.T) { } func TestMissingResourceAdd(t * testing.T) { - root_event, _ := NewEvent("", "", []Resource{}) + root_event := NewEvent("", "", []Resource{}) r1 := NewResource("r1", "", []Resource{}) r2 := NewResource("r2", "", []Resource{r1}) @@ -89,7 +89,7 @@ func TestMissingResourceAdd(t * testing.T) { } func TestTieredResource(t * testing.T) { - root_event, _ := NewEvent("", "", []Resource{}) + root_event := NewEvent("", "", []Resource{}) r1 := NewResource("r1", "", []Resource{}) r2 := NewResource("r2", "", []Resource{r1}) @@ -100,7 +100,7 @@ func TestTieredResource(t * testing.T) { } func TestResourceUpdate(t * testing.T) { - root_event, _ := NewEvent("", "", []Resource{}) + root_event := NewEvent("", "", []Resource{}) r1 := NewResource("r1", "", []Resource{}) r2 := NewResource("r2", "", []Resource{}) r3 := NewResource("r3", "", []Resource{r1, r2}) @@ -139,14 +139,14 @@ func TestResourceUpdate(t * testing.T) { } func TestAddEvent(t * testing.T) { - root_event, _ := NewEvent("", "", []Resource{}) + root_event := NewEvent("", "", []Resource{}) r1 := NewResource("r1", "", []Resource{}) r2 := NewResource("r2", "", []Resource{r1}) name := "Test Event" description := "A test event" resources := []Resource{r2} - new_event, _ := NewEvent(name, description, resources) + new_event := NewEvent(name, description, resources) event_manager := NewEventManager(root_event, []Resource{r1}) err := event_manager.AddResource(r2) @@ -177,8 +177,8 @@ func TestAddEvent(t * testing.T) { } func TestLockResource(t * testing.T) { - root_event, _ := NewEvent("", "", []Resource{}) - test_event, _ := NewEvent("", "", []Resource{}) + root_event := NewEvent("", "", []Resource{}) + test_event := NewEvent("", "", []Resource{}) r1 := NewResource("r1", "", []Resource{}) r2 := NewResource("r2", "", []Resource{}) r3 := NewResource("r3", "", []Resource{r1, r2}) @@ -243,9 +243,9 @@ func TestLockResource(t * testing.T) { } func TestAddToEventQueue(t * testing.T) { - queue, _ := NewEventQueue("q", "", []Resource{}) - event_1, _ := NewEvent("1", "", []Resource{}) - event_2, _ := NewEvent("2", "", []Resource{}) + queue := NewEventQueue("q", "", []Resource{}) + event_1 := NewEvent("1", "", []Resource{}) + event_2 := NewEvent("2", "", []Resource{}) err := queue.AddChild(event_1, nil) if err == nil { @@ -264,7 +264,8 @@ func TestAddToEventQueue(t * testing.T) { } func TestStartBaseEvent(t * testing.T) { - event_1, r := NewEvent("1", "", []Resource{}) + event_1 := NewEvent("1", "", []Resource{}) + r := event_1.DoneResource() manager := NewEventManager(event_1, []Resource{}) e_l := event_1.UpdateChannel() @@ -276,7 +277,7 @@ func TestStartBaseEvent(t * testing.T) { t.Fatal("r is not owned by event_1") } - err := manager.Run(context.Background()) + err := manager.Run() if err != nil { t.Fatal(err) } @@ -289,12 +290,50 @@ func TestStartBaseEvent(t * testing.T) { } } +func TestAbortEventQueue(t * testing.T) { + root_event := NewEventQueue("", "", []Resource{}) + r := root_event.DoneResource() + manager := NewEventManager(root_event, []Resource{}) + + r1 := NewResource("r1", "", []Resource{}) + err := manager.AddResource(r1) + if err != nil { + t.Fatal(err) + } + r1.Lock(root_event) + e1 := NewEvent("1", "", []Resource{r1}) + e1_info := NewEventQueueInfo(1) + // Add an event so that the queue doesn't auto complete + err = manager.AddEvent(root_event, e1, e1_info) + if err != nil { + t.Fatal(err) + } + + // Now that an event manager is constructed with a queue and 3 basic events + // start the queue and check that all the events are executed + go func() { + time.Sleep(time.Second) + root_event.Abort() + }() + + err = manager.Run() + if err == nil { + t.Fatal("event manager completed without error") + } + + if r.Owner() == nil { + t.Fatal("root event was finished after starting") + } +} + func TestStartEventQueue(t * testing.T) { - root_event, r := NewEventQueue("", "", []Resource{}) + root_event := NewEventQueue("", "", []Resource{}) + r := root_event.DoneResource() rel := root_event.UpdateChannel(); manager := NewEventManager(root_event, []Resource{}) - e1, e1_r := NewEvent("1", "", []Resource{}) + e1:= NewEvent("1", "", []Resource{}) + e1_r := e1.DoneResource() e1_info := NewEventQueueInfo(1) err := manager.AddEvent(root_event, e1, e1_info) if err != nil { @@ -302,7 +341,8 @@ func TestStartEventQueue(t * testing.T) { } (*graph_tester)(t).CheckForNil(rel) - e2, e2_r := NewEvent("1", "", []Resource{}) + e2 := NewEvent("1", "", []Resource{}) + e2_r := e2.DoneResource() e2_info := NewEventQueueInfo(2) err = manager.AddEvent(root_event, e2, e2_info) if err != nil { @@ -310,7 +350,8 @@ func TestStartEventQueue(t * testing.T) { } (*graph_tester)(t).CheckForNil(rel) - e3, e3_r := NewEvent("1", "", []Resource{}) + e3 := NewEvent("1", "", []Resource{}) + e3_r := e3.DoneResource() e3_info := NewEventQueueInfo(3) err = manager.AddEvent(root_event, e3, e3_info) if err != nil { @@ -322,16 +363,21 @@ func TestStartEventQueue(t * testing.T) { e2_l := e2.UpdateChannel(); e3_l := e3.UpdateChannel(); + // Abort the event after 5 seconds just in case + go func() { + time.Sleep(5 * time.Second) + root_event.Abort() + }() + // Now that an event manager is constructed with a queue and 3 basic events // start the queue and check that all the events are executed - err = manager.Run(context.Background()) + err = manager.Run() if err != nil { t.Fatal(err) } - time.Sleep( 5 * time.Second) - if r.Owner() != nil { + fmt.Printf("root_event.DoneResource(): %p", root_event.DoneResource()) t.Fatal("root event was not finished after starting") }