Added basic event queue machine

graph-rework
noah metz 2023-05-29 23:54:52 -06:00
parent 0c3193162c
commit f6834c9201
4 changed files with 243 additions and 45 deletions

@ -5,6 +5,7 @@ import (
"errors"
graphql "github.com/graph-gophers/graphql-go"
"reflect"
"sort"
)
// Update the events listeners, and notify the parent to do the same
@ -50,9 +51,13 @@ type Event interface {
Parent() Event
RegisterParent(parent Event) error
RequiredResources() []Resource
CreatedResources() []Resource
DoneResource() Resource
AddChild(child Event, info EventInfo) error
FindChild(id graphql.ID) Event
Run() error
Abort() error
Lock() error
Unlock() error
}
// BaseEvent is the most basic event that can exist in the event tree.
@ -63,11 +68,86 @@ type Event interface {
// When starter, this event automatically transitions to completion and unlocks all it's resources(including created)
type BaseEvent struct {
BaseNode
created_resources []Resource
done_resource Resource
required_resources []Resource
children []Event
child_info map[Event]EventInfo
actions map[string]func() (string, error)
parent Event
signal chan string
abort chan string
}
func (event * BaseEvent) Abort() error {
event.signal <- "abort"
return nil
}
func (queue * EventQueue) Abort() error {
for _, event := range(queue.Children()) {
event.Abort()
}
queue.signal <- "abort"
return nil
}
func (event * BaseEvent) Lock() error {
locked_resources := []Resource{}
lock_err := false
for _, resource := range(event.RequiredResources()) {
err := resource.Lock(event)
if err != nil {
lock_err = true
}
}
if lock_err == true {
for _, resource := range(locked_resources) {
resource.Unlock(event)
}
return errors.New("failed to lock required resources")
}
return nil
}
func (event * BaseEvent) Unlock() error {
return event.DoneResource().Unlock(event)
}
func (event * BaseEvent) Run() error {
next_action := "start"
var err error = nil
for next_action != "" {
// Check if the edge exists
action, exists := event.actions[next_action]
if exists == false {
error_str := fmt.Sprintf("%s is not a valid action", next_action)
return errors.New(error_str)
}
// Run the edge function
next_action, err = action()
if err != nil {
return err
}
// Check signals
select {
case reason := <-event.abort:
error_str := fmt.Sprintf("State Machine aborted: %s", reason)
return errors.New(error_str)
default:
}
// Update the event after running the edge
event.Update()
}
err = event.DoneResource().Unlock(event)
if err != nil {
return err
}
return nil
}
// EventQueue is a basic event that can have children.
@ -76,7 +156,7 @@ type EventQueue struct {
BaseEvent
}
func NewEvent(name string, description string, required_resources []Resource) (* BaseEvent, Resource) {
func NewEvent(name string, description string, required_resources []Resource) (* BaseEvent) {
done_resource := NewResource("event_done", "signal that event is done", []Resource{})
event := &BaseEvent{
BaseNode: BaseNode{
@ -88,17 +168,23 @@ func NewEvent(name string, description string, required_resources []Resource) (*
parent: nil,
children: []Event{},
child_info: map[Event]EventInfo{},
created_resources: []Resource{done_resource},
done_resource: done_resource,
required_resources: required_resources,
actions: map[string]func()(string, error){},
signal: make(chan string, 10),
}
// Lock the done_resource by default
done_resource.Lock(event)
return event, done_resource
event.actions["start"] = func() (string, error) {
return "", nil
}
return event
}
func NewEventQueue(name string, description string, required_resources []Resource) (* EventQueue, Resource) {
func NewEventQueue(name string, description string, required_resources []Resource) (* EventQueue) {
done_resource := NewResource("event_done", "signal that event is done", []Resource{})
queue := &EventQueue{
BaseEvent: BaseEvent{
@ -111,14 +197,83 @@ func NewEventQueue(name string, description string, required_resources []Resourc
parent: nil,
children: []Event{},
child_info: map[Event]EventInfo{},
created_resources: []Resource{done_resource},
done_resource: done_resource,
required_resources: required_resources,
actions: map[string]func()(string, error){},
signal: make(chan string, 10),
abort: make(chan string, 1),
},
}
done_resource.Lock(queue)
// Need to lock it with th BaseEvent since Unlock is implemented on the BaseEvent
done_resource.Lock(&queue.BaseEvent)
queue.actions["start"] = func() (string, error) {
return "queue_event", nil
}
queue.actions["queue_event"] = func() (string, error) {
// Sort the list of events by priority
// Keep trying to lock the highest priority event until the end of the list is reached, or an event is locked
// If an event is locked, transition it to "started" and start event in a new goroutine
// If the end of the queue is reached and there are no uncompleted events, transition to "done"
// If the end of the queue is reached and there are uncompleted events, transition to "wait"
copied_events := make([]Event, len(queue.Children()))
copy(copied_events, queue.Children())
less := func(i int, j int) bool {
info_i := queue.ChildInfo(copied_events[i]).(*EventQueueInfo)
info_j := queue.ChildInfo(copied_events[j]).(*EventQueueInfo)
return info_i.priority < info_j.priority
}
sort.SliceStable(copied_events, less)
wait := false
for _, event := range(copied_events) {
info := queue.ChildInfo(event).(*EventQueueInfo)
if info.state == "queued" {
wait = true
// Try to lock it
err := event.Lock()
// start in new goroutine
if err != nil {
} else {
info.state = "running"
go func(event Event, info * EventQueueInfo, queue * EventQueue) {
event.Run()
info.state = "done"
queue.signal <- "event_done"
}(event, info, queue)
}
} else if info.state == "running" {
wait = true
}
}
if wait == true {
return "wait", nil
} else {
return "", nil
}
}
queue.actions["wait"] = func() (string, error) {
// Wait until signaled by a thread
/*
What signals to take action for:
- abort : sent by any other thread : abort any child events and set the next event to none
- resource_available : sent by the aggregator goroutine when the lock on a resource changes : see if any events can be locked
- event_done : sent by child event threads : see if all events are completed
*/
signal := <- queue.signal
if signal == "abort" {
queue.abort <- "aborted by signal"
return "", nil
}
return "queue_event", nil
}
return queue, done_resource
return queue
}
// Store the nodes parent for upwards propagation of changes
@ -139,8 +294,8 @@ func (event * BaseEvent) RequiredResources() []Resource {
return event.required_resources
}
func (event * BaseEvent) CreatedResources() []Resource {
return event.created_resources
func (event * BaseEvent) DoneResource() Resource {
return event.done_resource
}
func (event * BaseEvent) Children() []Event {

@ -32,7 +32,7 @@ func fake_data() * EventManager {
}
}
root_event, _ := NewEventQueue("root_event", "", []Resource{})
root_event := NewEventQueue("root_event", "", []Resource{})
event_manager := NewEventManager(root_event, resources)

@ -5,7 +5,6 @@ import (
"log"
"errors"
graphql "github.com/graph-gophers/graphql-go"
"context"
)
type EventManager struct {
@ -35,9 +34,8 @@ func NewEventManager(root_event Event, dag_nodes []Resource) * EventManager {
return manager;
}
func (manager * EventManager) Run(ctx context.Context) error {
//return manager.root_event.Run(ctx)
return nil
func (manager * EventManager) Run() error {
return manager.root_event.Run()
}
func (manager * EventManager) FindResource(id graphql.ID) Resource {
@ -97,14 +95,13 @@ func (manager * EventManager) AddEvent(parent Event, child Event, info EventInfo
}
}
for _, resource := range child.CreatedResources() {
_, exists := manager.dag_nodes[resource.ID()]
if exists == true {
error_str := fmt.Sprintf("Created resource %s already exists in DAG, cannot add event %s", resource.ID(), child.ID())
return errors.New(error_str)
}
manager.AddResource(resource)
resource := child.DoneResource()
_, exists := manager.dag_nodes[resource.ID()]
if exists == true {
error_str := fmt.Sprintf("Created resource %s already exists in DAG, cannot add event %s", resource.ID(), child.ID())
return errors.New(error_str)
}
manager.AddResource(resource)
if manager.root_event == nil && parent != nil {
error_str := fmt.Sprintf("EventManager has no root, so can't add event to parent")

@ -3,7 +3,7 @@ package main
import (
"testing"
"time"
"context"
"fmt"
)
type graph_tester testing.T
@ -51,7 +51,7 @@ func TestNewResourceAdd(t *testing.T) {
description := "A resource for testing"
children := []Resource{}
root_event, _ := NewEvent("", "", []Resource{})
root_event := NewEvent("", "", []Resource{})
test_resource := NewResource(name, description, children)
event_manager := NewEventManager(root_event, []Resource{test_resource})
res := event_manager.FindResource(test_resource.ID())
@ -66,7 +66,7 @@ func TestNewResourceAdd(t *testing.T) {
}
func TestDoubleResourceAdd(t * testing.T) {
root_event, _ := NewEvent("", "", []Resource{})
root_event := NewEvent("", "", []Resource{})
test_resource := NewResource("", "", []Resource{})
event_manager := NewEventManager(root_event, []Resource{test_resource})
err := event_manager.AddResource(test_resource)
@ -77,7 +77,7 @@ func TestDoubleResourceAdd(t * testing.T) {
}
func TestMissingResourceAdd(t * testing.T) {
root_event, _ := NewEvent("", "", []Resource{})
root_event := NewEvent("", "", []Resource{})
r1 := NewResource("r1", "", []Resource{})
r2 := NewResource("r2", "", []Resource{r1})
@ -89,7 +89,7 @@ func TestMissingResourceAdd(t * testing.T) {
}
func TestTieredResource(t * testing.T) {
root_event, _ := NewEvent("", "", []Resource{})
root_event := NewEvent("", "", []Resource{})
r1 := NewResource("r1", "", []Resource{})
r2 := NewResource("r2", "", []Resource{r1})
@ -100,7 +100,7 @@ func TestTieredResource(t * testing.T) {
}
func TestResourceUpdate(t * testing.T) {
root_event, _ := NewEvent("", "", []Resource{})
root_event := NewEvent("", "", []Resource{})
r1 := NewResource("r1", "", []Resource{})
r2 := NewResource("r2", "", []Resource{})
r3 := NewResource("r3", "", []Resource{r1, r2})
@ -139,14 +139,14 @@ func TestResourceUpdate(t * testing.T) {
}
func TestAddEvent(t * testing.T) {
root_event, _ := NewEvent("", "", []Resource{})
root_event := NewEvent("", "", []Resource{})
r1 := NewResource("r1", "", []Resource{})
r2 := NewResource("r2", "", []Resource{r1})
name := "Test Event"
description := "A test event"
resources := []Resource{r2}
new_event, _ := NewEvent(name, description, resources)
new_event := NewEvent(name, description, resources)
event_manager := NewEventManager(root_event, []Resource{r1})
err := event_manager.AddResource(r2)
@ -177,8 +177,8 @@ func TestAddEvent(t * testing.T) {
}
func TestLockResource(t * testing.T) {
root_event, _ := NewEvent("", "", []Resource{})
test_event, _ := NewEvent("", "", []Resource{})
root_event := NewEvent("", "", []Resource{})
test_event := NewEvent("", "", []Resource{})
r1 := NewResource("r1", "", []Resource{})
r2 := NewResource("r2", "", []Resource{})
r3 := NewResource("r3", "", []Resource{r1, r2})
@ -243,9 +243,9 @@ func TestLockResource(t * testing.T) {
}
func TestAddToEventQueue(t * testing.T) {
queue, _ := NewEventQueue("q", "", []Resource{})
event_1, _ := NewEvent("1", "", []Resource{})
event_2, _ := NewEvent("2", "", []Resource{})
queue := NewEventQueue("q", "", []Resource{})
event_1 := NewEvent("1", "", []Resource{})
event_2 := NewEvent("2", "", []Resource{})
err := queue.AddChild(event_1, nil)
if err == nil {
@ -264,7 +264,8 @@ func TestAddToEventQueue(t * testing.T) {
}
func TestStartBaseEvent(t * testing.T) {
event_1, r := NewEvent("1", "", []Resource{})
event_1 := NewEvent("1", "", []Resource{})
r := event_1.DoneResource()
manager := NewEventManager(event_1, []Resource{})
e_l := event_1.UpdateChannel()
@ -276,7 +277,7 @@ func TestStartBaseEvent(t * testing.T) {
t.Fatal("r is not owned by event_1")
}
err := manager.Run(context.Background())
err := manager.Run()
if err != nil {
t.Fatal(err)
}
@ -289,12 +290,50 @@ func TestStartBaseEvent(t * testing.T) {
}
}
func TestAbortEventQueue(t * testing.T) {
root_event := NewEventQueue("", "", []Resource{})
r := root_event.DoneResource()
manager := NewEventManager(root_event, []Resource{})
r1 := NewResource("r1", "", []Resource{})
err := manager.AddResource(r1)
if err != nil {
t.Fatal(err)
}
r1.Lock(root_event)
e1 := NewEvent("1", "", []Resource{r1})
e1_info := NewEventQueueInfo(1)
// Add an event so that the queue doesn't auto complete
err = manager.AddEvent(root_event, e1, e1_info)
if err != nil {
t.Fatal(err)
}
// Now that an event manager is constructed with a queue and 3 basic events
// start the queue and check that all the events are executed
go func() {
time.Sleep(time.Second)
root_event.Abort()
}()
err = manager.Run()
if err == nil {
t.Fatal("event manager completed without error")
}
if r.Owner() == nil {
t.Fatal("root event was finished after starting")
}
}
func TestStartEventQueue(t * testing.T) {
root_event, r := NewEventQueue("", "", []Resource{})
root_event := NewEventQueue("", "", []Resource{})
r := root_event.DoneResource()
rel := root_event.UpdateChannel();
manager := NewEventManager(root_event, []Resource{})
e1, e1_r := NewEvent("1", "", []Resource{})
e1:= NewEvent("1", "", []Resource{})
e1_r := e1.DoneResource()
e1_info := NewEventQueueInfo(1)
err := manager.AddEvent(root_event, e1, e1_info)
if err != nil {
@ -302,7 +341,8 @@ func TestStartEventQueue(t * testing.T) {
}
(*graph_tester)(t).CheckForNil(rel)
e2, e2_r := NewEvent("1", "", []Resource{})
e2 := NewEvent("1", "", []Resource{})
e2_r := e2.DoneResource()
e2_info := NewEventQueueInfo(2)
err = manager.AddEvent(root_event, e2, e2_info)
if err != nil {
@ -310,7 +350,8 @@ func TestStartEventQueue(t * testing.T) {
}
(*graph_tester)(t).CheckForNil(rel)
e3, e3_r := NewEvent("1", "", []Resource{})
e3 := NewEvent("1", "", []Resource{})
e3_r := e3.DoneResource()
e3_info := NewEventQueueInfo(3)
err = manager.AddEvent(root_event, e3, e3_info)
if err != nil {
@ -322,16 +363,21 @@ func TestStartEventQueue(t * testing.T) {
e2_l := e2.UpdateChannel();
e3_l := e3.UpdateChannel();
// Abort the event after 5 seconds just in case
go func() {
time.Sleep(5 * time.Second)
root_event.Abort()
}()
// Now that an event manager is constructed with a queue and 3 basic events
// start the queue and check that all the events are executed
err = manager.Run(context.Background())
err = manager.Run()
if err != nil {
t.Fatal(err)
}
time.Sleep( 5 * time.Second)
if r.Owner() != nil {
fmt.Printf("root_event.DoneResource(): %p", root_event.DoneResource())
t.Fatal("root event was not finished after starting")
}