diff --git a/event.go b/event.go new file mode 100644 index 0000000..f93293a --- /dev/null +++ b/event.go @@ -0,0 +1,217 @@ +package main + +import ( + "fmt" + "errors" + graphql "github.com/graph-gophers/graphql-go" + "reflect" +) + +// Update the events listeners, and notify the parent to do the same +func (event * BaseEvent) Update() error { + err := event.UpdateListeners() + if err != nil { + return err + } + + if event.parent != nil{ + return event.parent.Update() + } + return nil +} + +type EventInfo interface { +} + +type BaseEventInfo interface { + EventInfo +} + +type EventQueueInfo struct { + EventInfo + priority int + state string +} + +func NewEventQueueInfo(priority int) * EventQueueInfo { + info := &EventQueueInfo{ + priority: priority, + state: "queued", + } + + return info +} + +// Event is the interface that event tree nodes must implement +type Event interface { + GraphNode + Children() []Event + ChildInfo(event Event) EventInfo + Parent() Event + RegisterParent(parent Event) error + RequiredResources() []Resource + CreatedResources() []Resource + AddChild(child Event, info EventInfo) error + FindChild(id graphql.ID) Event +} + +// BaseEvent is the most basic event that can exist in the event tree. +// On start it automatically transitions to completion. +// It can optionally require events, which will all need to be locked to start it +// It can optionally create resources, which will be locked by default and unlocked on completion +// This node by itself doesn't implement any special behaviours for children, so they will be ignored. +// When starter, this event automatically transitions to completion and unlocks all it's resources(including created) +type BaseEvent struct { + BaseNode + created_resources []Resource + required_resources []Resource + children []Event + child_info map[Event]EventInfo + parent Event +} + +// EventQueue is a basic event that can have children. +// On start, it attempts to start it's children from the highest 'priority' +type EventQueue struct { + BaseEvent +} + +func NewEvent(name string, description string, required_resources []Resource) (* BaseEvent, Resource) { + done_resource := NewResource("event_done", "signal that event is done", []Resource{}) + event := &BaseEvent{ + BaseNode: BaseNode{ + name: name, + description: description, + id: gql_randid(), + listeners: []chan error{}, + }, + parent: nil, + children: []Event{}, + child_info: map[Event]EventInfo{}, + created_resources: []Resource{done_resource}, + required_resources: required_resources, + } + + // Lock the done_resource by default + done_resource.Lock(event) + + return event, done_resource +} + +func NewEventQueue(name string, description string, required_resources []Resource) (* EventQueue, Resource) { + done_resource := NewResource("event_done", "signal that event is done", []Resource{}) + queue := &EventQueue{ + BaseEvent: BaseEvent{ + BaseNode: BaseNode{ + name: name, + description: description, + id: gql_randid(), + listeners: []chan error{}, + }, + parent: nil, + children: []Event{}, + child_info: map[Event]EventInfo{}, + created_resources: []Resource{done_resource}, + required_resources: required_resources, + }, + } + + done_resource.Lock(queue) + + return queue, done_resource +} + +// Store the nodes parent for upwards propagation of changes +func (event * BaseEvent) RegisterParent(parent Event) error{ + if event.parent != nil { + return errors.New("Parent already registered") + } + + event.parent = parent + return nil +} + +func (event * BaseEvent) Parent() Event { + return event.parent +} + +func (event * BaseEvent) RequiredResources() []Resource { + return event.required_resources +} + +func (event * BaseEvent) CreatedResources() []Resource { + return event.created_resources +} + +func (event * BaseEvent) Children() []Event { + return event.children +} + +func (event * BaseEvent) ChildInfo(idx Event) EventInfo { + val, ok := event.child_info[idx] + if ok == false { + return nil + } + return val +} + +func (event * BaseEvent) FindChild(id graphql.ID) Event { + if id == event.ID() { + return event + } + + for _, child := range event.Children() { + result := child.FindChild(id) + if result != nil { + return result + } + } + + return nil +} + +// Checks that the type of info is equal to EventQueueInfo +func (event * EventQueue) AddChild(child Event, info EventInfo) error { + if checkType(info, (*EventQueueInfo)(nil)) == false { + return errors.New("EventQueue.AddChild passed invalid type for info") + } + + return event.addChild(child, info) +} + +func (event * BaseEvent) addChild(child Event, info EventInfo) error { + err := child.RegisterParent(event) + if err != nil { + error_str := fmt.Sprintf("Failed to register %s as a parent of %s, cancelling AddChild", event.ID(), child.ID()) + return errors.New(error_str) + } + + event.children = append(event.children, child) + event.child_info[child] = info + event.Update() + return nil +} + +// Overloaded function AddChild checks the info passed and calls the BaseEvent.addChild +func (event * BaseEvent) AddChild(child Event, info EventInfo) error { + if info != nil { + return errors.New("info must be nil for BaseEvent children") + } + + return event.addChild(child, info) +} + +func checkType(first interface{}, second interface{}) bool { + if first == nil || second == nil { + if first == nil && second == nil { + return true + } else { + return false + } + } + + first_type := reflect.TypeOf(first) + second_type := reflect.TypeOf(second) + + return first_type == second_type +} diff --git a/go.mod b/go.mod index 00808cc..b347aa7 100644 --- a/go.mod +++ b/go.mod @@ -5,4 +5,5 @@ go 1.20 require ( github.com/google/uuid v1.3.0 // indirect github.com/graph-gophers/graphql-go v1.5.0 // indirect + github.com/looplab/fsm v1.0.1 // indirect ) diff --git a/go.sum b/go.sum index a03f06c..c189241 100644 --- a/go.sum +++ b/go.sum @@ -7,6 +7,8 @@ github.com/google/uuid v1.3.0 h1:t6JiXgmwXMjEs8VusXIJk2BXHsn+wx8BZdTaoZ5fu7I= github.com/google/uuid v1.3.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo= github.com/graph-gophers/graphql-go v1.5.0 h1:fDqblo50TEpD0LY7RXk/LFVYEVqo3+tXMNMPSVXA1yc= github.com/graph-gophers/graphql-go v1.5.0/go.mod h1:YtmJZDLbF1YYNrlNAuiO5zAStUWc3XZT07iGsVqe1Os= +github.com/looplab/fsm v1.0.1 h1:OEW0ORrIx095N/6lgoGkFkotqH6s7vaFPsgjLAaF5QU= +github.com/looplab/fsm v1.0.1/go.mod h1:PmD3fFvQEIsjMEfvZdrCDZ6y8VwKTwWNjlpEr6IKPO4= github.com/opentracing/opentracing-go v1.2.0/go.mod h1:GxEUsuufX4nBwe+T+Wl9TAgYrxe9dPLANfrWvHYVTgc= github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= diff --git a/graph.go b/graph.go index d39ac38..a727a53 100644 --- a/graph.go +++ b/graph.go @@ -1,13 +1,10 @@ package main import ( - "fmt" - "log" "errors" "sync" graphql "github.com/graph-gophers/graphql-go" "github.com/google/uuid" - "reflect" ) // Generate a random graphql id @@ -103,449 +100,3 @@ func (node * BaseNode) UpdateListeners() error { func (node * BaseNode) Update() error { return errors.New("Cannot Update a BaseNode") } - -// Resources propagate update up to multiple parents, and not downwards -// (subscriber to team won't get update to alliance, but subscriber to alliance will get update to team) -func (resource * BaseResource) Update() error { - err := resource.UpdateListeners() - if err != nil { - return err - } - - for _, parent := range resource.Parents() { - err := parent.Update() - if err != nil { - return err - } - } - - if resource.lock_holder != nil { - resource.lock_holder.Update() - } - - return nil -} - -// Update the events listeners, and notify the parent to do the same -func (event * BaseEvent) Update() error { - err := event.UpdateListeners() - if err != nil { - return err - } - - if event.parent != nil{ - return event.parent.Update() - } - return nil -} - -// Resource is the interface that DAG nodes are made from -type Resource interface { - GraphNode - AddParent(parent Resource) error - Children() []Resource - Parents() []Resource - Lock(event Event) error - Unlock() error -} - -// BaseResource is the most basic resource that can exist in the DAG -// It holds a single state variable, which contains a pointer to the event that is locking it -type BaseResource struct { - BaseNode - parents []Resource - children []Resource - lock_holder Event - state_lock sync.Mutex -} - -// Grab the state mutex and check the state, if unlocked continue to hold the mutex while doing the same for children -// When the bottom of a tree is reached(no more children) go back up and set the lock state -func (resource * BaseResource) Lock(event Event) error { - var err error = nil - locked := false - - resource.state_lock.Lock() - if resource.lock_holder != nil { - err = errors.New("Resource already locked") - } else { - all_children_locked := true - for _, child := range resource.Children() { - err = child.Lock(event) - if err != nil { - all_children_locked = false - break - } - } - if all_children_locked == true { - resource.lock_holder = event - locked = true - } - } - resource.state_lock.Unlock() - - if locked == true { - resource.Update() - } - - return err -} - -// Recurse through children, unlocking until no more children -func (resource * BaseResource) Unlock() error { - var err error = nil - unlocked := false - - resource.state_lock.Lock() - if resource.lock_holder == nil { - err = errors.New("Resource already unlocked") - } else { - all_children_unlocked := true - for _, child := range resource.Children() { - err = child.Unlock() - if err != nil { - all_children_unlocked = false - break - } - } - if all_children_unlocked == true{ - resource.lock_holder = nil - unlocked = true - } - } - resource.state_lock.Unlock() - - if unlocked == true { - resource.Update() - } - - return err -} - -func (resource * BaseResource) Children() []Resource { - return resource.children -} - -func (resource * BaseResource) Parents() []Resource { - return resource.parents -} - -// Add a parent to a DAG node -func (resource * BaseResource) AddParent(parent Resource) error { - // Don't add self as parent - if parent.ID() == resource.ID() { - error_str := fmt.Sprintf("Will not add %s as parent of itself", parent.ID()) - return errors.New(error_str) - } - - // Don't add parent if it's already a parent - for _, p := range resource.parents { - if p.ID() == parent.ID() { - error_str := fmt.Sprintf("%s is already a parent of %s, will not double-bond", p.ID(), resource.ID()) - return errors.New(error_str) - } - } - - // Add the parent - resource.parents = append(resource.parents, parent) - return nil -} - -type EventInfo interface {} - -type BaseEventInfo struct { - EventInfo -} - -type EventQueueInfo struct { - EventInfo - priority int -} - -func (info * EventQueueInfo) Priority() int { - return info.priority -} - -// Event is the interface that event tree nodes must implement -type Event interface { - GraphNode - Children() []Event - ChildInfo() []EventInfo - ChildInfoType() reflect.Type - Parent() Event - RegisterParent(parent Event) error - RequiredResources() []Resource - CreatedResources() []Resource - AddChild(child Event, info EventInfo) error - FindChild(id graphql.ID) Event -} - -// BaseEvent is the most basic event that can exist in the event tree. -// On start it automatically transitions to completion. -// It can optionally require events, which will all need to be locked to start it -// It can optionally create resources, which will be locked by default and unlocked on completion -// This node by itself doesn't implement any special behaviours for children, so they will be ignored. -// When starter, this event automatically transitions to completion and unlocks all it's resources(including created) -type BaseEvent struct { - BaseNode - locked_resources []Resource - created_resources []Resource - required_resources []Resource - children []Event - child_info []EventInfo - child_info_type reflect.Type - parent Event -} - -// EventQueue is a basic event that can have children. -// On start, it attempts to start it's children from the highest 'priority' -type EventQueue struct { - BaseEvent -} - -func NewResource(name string, description string, children []Resource) * BaseResource { - resource := &BaseResource{ - BaseNode: BaseNode{ - name: name, - description: description, - id: gql_randid(), - listeners: []chan error{}, - }, - parents: []Resource{}, - children: children, - } - - return resource -} - -func NewEvent(name string, description string, required_resources []Resource) * BaseEvent { - event := &BaseEvent{ - BaseNode: BaseNode{ - name: name, - description: description, - id: gql_randid(), - listeners: []chan error{}, - }, - parent: nil, - children: []Event{}, - child_info: []EventInfo{}, - child_info_type: reflect.TypeOf((*BaseEventInfo)(nil)).Elem(), - locked_resources: []Resource{}, - created_resources: []Resource{}, - required_resources: required_resources, - } - - return event -} - -func NewEventQueue(name string, description string, required_resources []Resource) * EventQueue { - queue := &EventQueue{ - BaseEvent: BaseEvent{ - BaseNode: BaseNode{ - name: name, - description: description, - id: gql_randid(), - listeners: []chan error{}, - }, - parent: nil, - children: []Event{}, - child_info: []EventInfo{}, - child_info_type: reflect.TypeOf((*EventQueueInfo)(nil)).Elem(), - locked_resources: []Resource{}, - created_resources: []Resource{}, - required_resources: required_resources, - }, - } - - return queue -} - -// Store the nodes parent for upwards propagation of changes -func (event * BaseEvent) RegisterParent(parent Event) error{ - if event.parent != nil { - return errors.New("Parent already registered") - } - - event.parent = parent - return nil -} - -func (event * BaseEvent) Parent() Event { - return event.parent -} - -func (event * BaseEvent) RequiredResources() []Resource { - return event.required_resources -} - -func (event * BaseEvent) CreatedResources() []Resource { - return event.created_resources -} - -func (event * BaseEvent) Children() []Event { - return event.children -} - -func (event * BaseEvent) ChildInfo() []EventInfo { - return event.child_info -} - -func (event * BaseEvent) ChildInfoType() reflect.Type { - return event.child_info_type -} - -func (event * BaseEvent) FindChild(id graphql.ID) Event { - if id == event.ID() { - return event - } - - for _, child := range event.Children() { - result := child.FindChild(id) - if result != nil { - return result - } - } - - return nil -} - -func (event * BaseEvent) AddChild(child Event, info EventInfo) error { - if info == nil { - return errors.New("info cannot be nil in AddChild") - } - - child_info_type := reflect.TypeOf(info).Elem() - event_info_type := event.ChildInfoType() - if child_info_type != event_info_type { - error_str := fmt.Sprintf("BaseEvent only supports child_info of type %s, not %s", child_info_type.Name(), event_info_type.Name()) - return errors.New(error_str) - } - - err := child.RegisterParent(event) - if err != nil { - error_str := fmt.Sprintf("Failed to register %s as a parent of %s, cancelling AddChild", event.ID(), child.ID()) - return errors.New(error_str) - } - - event.children = append(event.children, child) - event.child_info = append(event.child_info, info) - return nil -} - -type EventManager struct { - dag_nodes map[graphql.ID]Resource - root_event Event -} - -// root_event's requirements must be in dag_nodes, and dag_nodes must be ordered by dependency(no children first) -func NewEventManager(root_event Event, dag_nodes []Resource) * EventManager { - - manager := &EventManager{ - dag_nodes: map[graphql.ID]Resource{}, - root_event: nil, - } - - // Construct the DAG - for _, resource := range dag_nodes { - err := manager.AddResource(resource) - if err != nil { - log.Printf("Failed to add %s to EventManager: %s", resource.ID(), err) - return nil - } - } - - manager.AddEvent(nil, root_event, nil) - - return manager; -} - -func (manager * EventManager) FindResource(id graphql.ID) Resource { - resource, exists := manager.dag_nodes[id] - if exists == false { - return nil - } - - return resource -} - -func (manager * EventManager) FindEvent(id graphql.ID) Event { - event := manager.root_event.FindChild(id) - - return event -} - -func (manager * EventManager) AddResource(resource Resource) error { - _, exists := manager.dag_nodes[resource.ID()] - if exists == true { - error_str := fmt.Sprintf("%s is already in the resource DAG, cannot add again", resource.ID()) - return errors.New(error_str) - } - - for _, child := range resource.Children() { - _, exists := manager.dag_nodes[child.ID()] - if exists == false { - error_str := fmt.Sprintf("%s is not in the resource DAG, cannot add %s to DAG", child.ID(), resource.ID()) - return errors.New(error_str) - } - } - manager.dag_nodes[resource.ID()] = resource - for _, child := range resource.Children() { - child.AddParent(resource) - } - return nil -} - -// Check that the node doesn't already exist in the tree -// Check the the selected parent exists in the tree -// Check that required resources exist in the DAG -// Check that created resources don't exist in the DAG -// Add resources created by the event to the DAG -// Add child to parent -func (manager * EventManager) AddEvent(parent Event, child Event, info EventInfo) error { - if child == nil { - return errors.New("Cannot add nil Event to EventManager") - } else if len(child.Children()) != 0 { - return errors.New("Adding events recursively not implemented") - } - - for _, resource := range child.RequiredResources() { - _, exists := manager.dag_nodes[resource.ID()] - if exists == false { - error_str := fmt.Sprintf("Required resource %s not in DAG, cannot add event %s", resource.ID(), child.ID()) - return errors.New(error_str) - } - } - - for _, resource := range child.CreatedResources() { - _, exists := manager.dag_nodes[resource.ID()] - if exists == true { - error_str := fmt.Sprintf("Created resource %s already exists in DAG, cannot add event %s", resource.ID(), child.ID()) - return errors.New(error_str) - } - } - - if manager.root_event == nil && parent != nil { - error_str := fmt.Sprintf("EventManager has no root, so can't add event to parent") - return errors.New(error_str) - } else if manager.root_event != nil && parent == nil { - // TODO - return errors.New("Replacing root event not implemented") - } else if manager.root_event == nil && parent == nil { - manager.root_event = child - return nil; - } else { - if manager.root_event.FindChild(parent.ID()) == nil { - error_str := fmt.Sprintf("Event %s is not present in the event tree, cannot add %s as child", parent.ID(), child.ID()) - return errors.New(error_str) - } - - if manager.root_event.FindChild(child.ID()) != nil { - error_str := fmt.Sprintf("Event %s already exists in the event tree, can not add again", child.ID()) - return errors.New(error_str) - } - - return parent.AddChild(child, info) - } -} - - diff --git a/graph_test.go b/graph_test.go index 68d9c7f..de59d4b 100644 --- a/graph_test.go +++ b/graph_test.go @@ -3,6 +3,7 @@ package main import ( "testing" "time" + "context" ) type graph_tester testing.T @@ -50,7 +51,7 @@ func TestNewResourceAdd(t *testing.T) { description := "A resource for testing" children := []Resource{} - root_event := NewEvent("", "", []Resource{}) + root_event, _ := NewEvent("", "", []Resource{}) test_resource := NewResource(name, description, children) event_manager := NewEventManager(root_event, []Resource{test_resource}) res := event_manager.FindResource(test_resource.ID()) @@ -65,7 +66,7 @@ func TestNewResourceAdd(t *testing.T) { } func TestDoubleResourceAdd(t * testing.T) { - root_event := NewEvent("", "", []Resource{}) + root_event, _ := NewEvent("", "", []Resource{}) test_resource := NewResource("", "", []Resource{}) event_manager := NewEventManager(root_event, []Resource{test_resource}) err := event_manager.AddResource(test_resource) @@ -76,7 +77,7 @@ func TestDoubleResourceAdd(t * testing.T) { } func TestMissingResourceAdd(t * testing.T) { - root_event := NewEvent("", "", []Resource{}) + root_event, _ := NewEvent("", "", []Resource{}) r1 := NewResource("r1", "", []Resource{}) r2 := NewResource("r2", "", []Resource{r1}) @@ -88,7 +89,7 @@ func TestMissingResourceAdd(t * testing.T) { } func TestTieredResource(t * testing.T) { - root_event := NewEvent("", "", []Resource{}) + root_event, _ := NewEvent("", "", []Resource{}) r1 := NewResource("r1", "", []Resource{}) r2 := NewResource("r2", "", []Resource{r1}) @@ -99,7 +100,7 @@ func TestTieredResource(t * testing.T) { } func TestResourceUpdate(t * testing.T) { - root_event := NewEvent("", "", []Resource{}) + root_event, _ := NewEvent("", "", []Resource{}) r1 := NewResource("r1", "", []Resource{}) r2 := NewResource("r2", "", []Resource{}) r3 := NewResource("r3", "", []Resource{r1, r2}) @@ -138,14 +139,14 @@ func TestResourceUpdate(t * testing.T) { } func TestAddEvent(t * testing.T) { - root_event := NewEvent("", "", []Resource{}) + root_event, _ := NewEvent("", "", []Resource{}) r1 := NewResource("r1", "", []Resource{}) r2 := NewResource("r2", "", []Resource{r1}) name := "Test Event" description := "A test event" resources := []Resource{r2} - new_event := NewEvent(name, description, resources) + new_event, _ := NewEvent(name, description, resources) event_manager := NewEventManager(root_event, []Resource{r1}) err := event_manager.AddResource(r2) @@ -153,7 +154,7 @@ func TestAddEvent(t * testing.T) { t.Fatal("Failed to add r2 to event_manager") } - err = event_manager.AddEvent(root_event, new_event, (*BaseEventInfo)(nil)) + err = event_manager.AddEvent(root_event, new_event, nil) if err != nil { t.Fatalf("Failed to add new_event to root_event: %s", err) } @@ -176,7 +177,8 @@ func TestAddEvent(t * testing.T) { } func TestLockResource(t * testing.T) { - root_event := NewEvent("", "", []Resource{}) + root_event, _ := NewEvent("", "", []Resource{}) + test_event, _ := NewEvent("", "", []Resource{}) r1 := NewResource("r1", "", []Resource{}) r2 := NewResource("r2", "", []Resource{}) r3 := NewResource("r3", "", []Resource{r1, r2}) @@ -213,7 +215,12 @@ func TestLockResource(t * testing.T) { t.Fatal("Locked r1 after locking r3") } - err = r3.Unlock() + err = r3.Unlock(test_event) + if err == nil { + t.Fatal("Unlocked r3 with event that didn't lock it") + } + + err = r3.Unlock(root_event) if err != nil { t.Fatal("Failed to unlock r3") } @@ -227,7 +234,7 @@ func TestLockResource(t * testing.T) { (*graph_tester)(t).CheckForNil(r1_l) (*graph_tester)(t).CheckForNil(rel) - err = r4.Unlock() + err = r4.Unlock(root_event) if err != nil { t.Fatal("Failed to unlock r4") } @@ -236,21 +243,110 @@ func TestLockResource(t * testing.T) { } func TestAddToEventQueue(t * testing.T) { - queue := NewEventQueue("q", "", []Resource{}) - new_event := NewEvent("1", "", []Resource{}) + queue, _ := NewEventQueue("q", "", []Resource{}) + event_1, _ := NewEvent("1", "", []Resource{}) + event_2, _ := NewEvent("2", "", []Resource{}) - err := queue.AddChild(new_event, (*BaseEventInfo)(nil)) + err := queue.AddChild(event_1, nil) if err == nil { - t.Fatal("suceeded in added BaseEventInfo to queue") + t.Fatal("suceeded in added nil info to queue") } - err = queue.AddChild(new_event, nil) - if err == nil { - t.Fatal("suceeded in added nil info to queue") + err = queue.AddChild(event_1, &EventQueueInfo{priority: 0}) + if err != nil { + t.Fatal("failed to add valid event + info to queue") } - err = queue.AddChild(new_event, &EventQueueInfo{priority: 0}) + err = queue.AddChild(event_2, &EventQueueInfo{priority: 1}) if err != nil { t.Fatal("failed to add valid event + info to queue") } } + +func TestStartBaseEvent(t * testing.T) { + event_1, r := NewEvent("1", "", []Resource{}) + manager := NewEventManager(event_1, []Resource{}) + + e_l := event_1.UpdateChannel() + r_l := r.UpdateChannel() + (*graph_tester)(t).CheckForNone(e_l) + (*graph_tester)(t).CheckForNone(r_l) + + if r.Owner() != event_1 { + t.Fatal("r is not owned by event_1") + } + + err := manager.Run(context.Background()) + if err != nil { + t.Fatal(err) + } + // Check that the update channels for the event and resource have updates + (*graph_tester)(t).CheckForNil(e_l) + (*graph_tester)(t).CheckForNil(r_l) + + if r.Owner() != nil { + t.Fatal("r still owned after event completed") + } +} + +func TestStartEventQueue(t * testing.T) { + root_event, r := NewEventQueue("", "", []Resource{}) + rel := root_event.UpdateChannel(); + manager := NewEventManager(root_event, []Resource{}) + + e1, e1_r := NewEvent("1", "", []Resource{}) + e1_info := NewEventQueueInfo(1) + err := manager.AddEvent(root_event, e1, e1_info) + if err != nil { + t.Fatal("Failed to add e1 to manager") + } + (*graph_tester)(t).CheckForNil(rel) + + e2, e2_r := NewEvent("1", "", []Resource{}) + e2_info := NewEventQueueInfo(2) + err = manager.AddEvent(root_event, e2, e2_info) + if err != nil { + t.Fatal("Failed to add e2 to manager") + } + (*graph_tester)(t).CheckForNil(rel) + + e3, e3_r := NewEvent("1", "", []Resource{}) + e3_info := NewEventQueueInfo(3) + err = manager.AddEvent(root_event, e3, e3_info) + if err != nil { + t.Fatal("Failed to add e3 to manager") + } + (*graph_tester)(t).CheckForNil(rel) + + e1_l := e1.UpdateChannel(); + e2_l := e2.UpdateChannel(); + e3_l := e3.UpdateChannel(); + + // Now that an event manager is constructed with a queue and 3 basic events + // start the queue and check that all the events are executed + err = manager.Run(context.Background()) + if err != nil { + t.Fatal(err) + } + + time.Sleep( 5 * time.Second) + + if r.Owner() != nil { + t.Fatal("root event was not finished after starting") + } + + if e1_r.Owner() != nil { + t.Fatal("e1 was not completed") + } + (*graph_tester)(t).CheckForNil(e1_l) + + if e2_r.Owner() != nil { + t.Fatal("e2 was not completed") + } + (*graph_tester)(t).CheckForNil(e2_l) + + if e3_r.Owner() != nil { + t.Fatal("e3 was not completed") + } + (*graph_tester)(t).CheckForNil(e3_l) +} diff --git a/main.go b/main.go index 6f85c7c..97a2aba 100644 --- a/main.go +++ b/main.go @@ -32,7 +32,7 @@ func fake_data() * EventManager { } } - root_event := NewEvent("root_event", "", []Resource{}) + root_event, _ := NewEventQueue("root_event", "", []Resource{}) event_manager := NewEventManager(root_event, resources) diff --git a/manager.go b/manager.go new file mode 100644 index 0000000..b4a763f --- /dev/null +++ b/manager.go @@ -0,0 +1,132 @@ +package main + +import ( + "fmt" + "log" + "errors" + graphql "github.com/graph-gophers/graphql-go" + "context" +) + +type EventManager struct { + dag_nodes map[graphql.ID]Resource + root_event Event +} + +// root_event's requirements must be in dag_nodes, and dag_nodes must be ordered by dependency(no children first) +func NewEventManager(root_event Event, dag_nodes []Resource) * EventManager { + + manager := &EventManager{ + dag_nodes: map[graphql.ID]Resource{}, + root_event: nil, + } + + // Construct the DAG + for _, resource := range dag_nodes { + err := manager.AddResource(resource) + if err != nil { + log.Printf("Failed to add %s to EventManager: %s", resource.ID(), err) + return nil + } + } + + manager.AddEvent(nil, root_event, nil) + + return manager; +} + +func (manager * EventManager) Run(ctx context.Context) error { + //return manager.root_event.Run(ctx) + return nil +} + +func (manager * EventManager) FindResource(id graphql.ID) Resource { + resource, exists := manager.dag_nodes[id] + if exists == false { + return nil + } + + return resource +} + +func (manager * EventManager) FindEvent(id graphql.ID) Event { + event := manager.root_event.FindChild(id) + + return event +} + +func (manager * EventManager) AddResource(resource Resource) error { + _, exists := manager.dag_nodes[resource.ID()] + if exists == true { + error_str := fmt.Sprintf("%s is already in the resource DAG, cannot add again", resource.ID()) + return errors.New(error_str) + } + + for _, child := range resource.Children() { + _, exists := manager.dag_nodes[child.ID()] + if exists == false { + error_str := fmt.Sprintf("%s is not in the resource DAG, cannot add %s to DAG", child.ID(), resource.ID()) + return errors.New(error_str) + } + } + manager.dag_nodes[resource.ID()] = resource + for _, child := range resource.Children() { + child.AddParent(resource) + } + return nil +} + +// Check that the node doesn't already exist in the tree +// Check the the selected parent exists in the tree +// Check that required resources exist in the DAG +// Check that created resources don't exist in the DAG +// Add resources created by the event to the DAG +// Add child to parent +func (manager * EventManager) AddEvent(parent Event, child Event, info EventInfo) error { + if child == nil { + return errors.New("Cannot add nil Event to EventManager") + } else if len(child.Children()) != 0 { + return errors.New("Adding events recursively not implemented") + } + + for _, resource := range child.RequiredResources() { + _, exists := manager.dag_nodes[resource.ID()] + if exists == false { + error_str := fmt.Sprintf("Required resource %s not in DAG, cannot add event %s", resource.ID(), child.ID()) + return errors.New(error_str) + } + } + + for _, resource := range child.CreatedResources() { + _, exists := manager.dag_nodes[resource.ID()] + if exists == true { + error_str := fmt.Sprintf("Created resource %s already exists in DAG, cannot add event %s", resource.ID(), child.ID()) + return errors.New(error_str) + } + manager.AddResource(resource) + } + + if manager.root_event == nil && parent != nil { + error_str := fmt.Sprintf("EventManager has no root, so can't add event to parent") + return errors.New(error_str) + } else if manager.root_event != nil && parent == nil { + // TODO + return errors.New("Replacing root event not implemented") + } else if manager.root_event == nil && parent == nil { + manager.root_event = child + return nil; + } else { + if manager.root_event.FindChild(parent.ID()) == nil { + error_str := fmt.Sprintf("Event %s is not present in the event tree, cannot add %s as child", parent.ID(), child.ID()) + return errors.New(error_str) + } + + if manager.root_event.FindChild(child.ID()) != nil { + error_str := fmt.Sprintf("Event %s already exists in the event tree, can not add again", child.ID()) + return errors.New(error_str) + } + return parent.AddChild(child, info) + } +} + + diff --git a/resource.go b/resource.go new file mode 100644 index 0000000..ef6c636 --- /dev/null +++ b/resource.go @@ -0,0 +1,164 @@ +package main + +import ( + "fmt" + "errors" + "sync" +) + +// Resources propagate update up to multiple parents, and not downwards +// (subscriber to team won't get update to alliance, but subscriber to alliance will get update to team) +func (resource * BaseResource) Update() error { + err := resource.UpdateListeners() + if err != nil { + return err + } + + for _, parent := range resource.Parents() { + err := parent.Update() + if err != nil { + return err + } + } + + if resource.lock_holder != nil { + resource.lock_holder.Update() + } + + return nil +} + +// Resource is the interface that DAG nodes are made from +type Resource interface { + GraphNode + AddParent(parent Resource) error + Children() []Resource + Parents() []Resource + Lock(event Event) error + Unlock(event Event) error + Owner() Event +} + +// BaseResource is the most basic resource that can exist in the DAG +// It holds a single state variable, which contains a pointer to the event that is locking it +type BaseResource struct { + BaseNode + parents []Resource + children []Resource + lock_holder Event + state_lock sync.Mutex +} + +func (resource * BaseResource) Owner() Event { + return resource.lock_holder +} + +// Grab the state mutex and check the state, if unlocked continue to hold the mutex while doing the same for children +// When the bottom of a tree is reached(no more children) go back up and set the lock state +func (resource * BaseResource) Lock(event Event) error { + var err error = nil + locked := false + + resource.state_lock.Lock() + if resource.lock_holder != nil { + err = errors.New("Resource already locked") + } else { + all_children_locked := true + for _, child := range resource.Children() { + err = child.Lock(event) + if err != nil { + all_children_locked = false + break + } + } + if all_children_locked == true { + resource.lock_holder = event + locked = true + } + } + resource.state_lock.Unlock() + + if locked == true { + resource.Update() + } + + return err +} + +// Recurse through children, unlocking until no more children +// If the child isn't locked by the unlocker +func (resource * BaseResource) Unlock(event Event) error { + var err error = nil + unlocked := false + + resource.state_lock.Lock() + if resource.lock_holder == nil { + err = errors.New("Resource already unlocked") + } else if resource.lock_holder != event { + err = errors.New("Resource not locked by parent, can't unlock") + } else { + all_children_unlocked := true + for _, child := range resource.Children() { + err = child.Unlock(event) + if err != nil { + all_children_unlocked = false + break + } + } + if all_children_unlocked == true{ + resource.lock_holder = nil + unlocked = true + } + } + resource.state_lock.Unlock() + + if unlocked == true { + resource.Update() + } + + return err +} + +func (resource * BaseResource) Children() []Resource { + return resource.children +} + +func (resource * BaseResource) Parents() []Resource { + return resource.parents +} + +// Add a parent to a DAG node +func (resource * BaseResource) AddParent(parent Resource) error { + // Don't add self as parent + if parent.ID() == resource.ID() { + error_str := fmt.Sprintf("Will not add %s as parent of itself", parent.ID()) + return errors.New(error_str) + } + + // Don't add parent if it's already a parent + for _, p := range resource.parents { + if p.ID() == parent.ID() { + error_str := fmt.Sprintf("%s is already a parent of %s, will not double-bond", p.ID(), resource.ID()) + return errors.New(error_str) + } + } + + // Add the parent + resource.parents = append(resource.parents, parent) + return nil +} + +func NewResource(name string, description string, children []Resource) * BaseResource { + resource := &BaseResource{ + BaseNode: BaseNode{ + name: name, + description: description, + id: gql_randid(), + listeners: []chan error{}, + }, + parents: []Resource{}, + children: children, + } + + return resource +}