Split to seperate files and removed FSM libary to prepare for embedded callback mechanism

graph-rework
noah metz 2023-05-29 19:17:52 -06:00
parent c06ff16fc9
commit 11bf1e5344
8 changed files with 632 additions and 469 deletions

@ -0,0 +1,217 @@
package main
import (
"fmt"
"errors"
graphql "github.com/graph-gophers/graphql-go"
"reflect"
)
// Update the events listeners, and notify the parent to do the same
func (event * BaseEvent) Update() error {
err := event.UpdateListeners()
if err != nil {
return err
}
if event.parent != nil{
return event.parent.Update()
}
return nil
}
type EventInfo interface {
}
type BaseEventInfo interface {
EventInfo
}
type EventQueueInfo struct {
EventInfo
priority int
state string
}
func NewEventQueueInfo(priority int) * EventQueueInfo {
info := &EventQueueInfo{
priority: priority,
state: "queued",
}
return info
}
// Event is the interface that event tree nodes must implement
type Event interface {
GraphNode
Children() []Event
ChildInfo(event Event) EventInfo
Parent() Event
RegisterParent(parent Event) error
RequiredResources() []Resource
CreatedResources() []Resource
AddChild(child Event, info EventInfo) error
FindChild(id graphql.ID) Event
}
// BaseEvent is the most basic event that can exist in the event tree.
// On start it automatically transitions to completion.
// It can optionally require events, which will all need to be locked to start it
// It can optionally create resources, which will be locked by default and unlocked on completion
// This node by itself doesn't implement any special behaviours for children, so they will be ignored.
// When starter, this event automatically transitions to completion and unlocks all it's resources(including created)
type BaseEvent struct {
BaseNode
created_resources []Resource
required_resources []Resource
children []Event
child_info map[Event]EventInfo
parent Event
}
// EventQueue is a basic event that can have children.
// On start, it attempts to start it's children from the highest 'priority'
type EventQueue struct {
BaseEvent
}
func NewEvent(name string, description string, required_resources []Resource) (* BaseEvent, Resource) {
done_resource := NewResource("event_done", "signal that event is done", []Resource{})
event := &BaseEvent{
BaseNode: BaseNode{
name: name,
description: description,
id: gql_randid(),
listeners: []chan error{},
},
parent: nil,
children: []Event{},
child_info: map[Event]EventInfo{},
created_resources: []Resource{done_resource},
required_resources: required_resources,
}
// Lock the done_resource by default
done_resource.Lock(event)
return event, done_resource
}
func NewEventQueue(name string, description string, required_resources []Resource) (* EventQueue, Resource) {
done_resource := NewResource("event_done", "signal that event is done", []Resource{})
queue := &EventQueue{
BaseEvent: BaseEvent{
BaseNode: BaseNode{
name: name,
description: description,
id: gql_randid(),
listeners: []chan error{},
},
parent: nil,
children: []Event{},
child_info: map[Event]EventInfo{},
created_resources: []Resource{done_resource},
required_resources: required_resources,
},
}
done_resource.Lock(queue)
return queue, done_resource
}
// Store the nodes parent for upwards propagation of changes
func (event * BaseEvent) RegisterParent(parent Event) error{
if event.parent != nil {
return errors.New("Parent already registered")
}
event.parent = parent
return nil
}
func (event * BaseEvent) Parent() Event {
return event.parent
}
func (event * BaseEvent) RequiredResources() []Resource {
return event.required_resources
}
func (event * BaseEvent) CreatedResources() []Resource {
return event.created_resources
}
func (event * BaseEvent) Children() []Event {
return event.children
}
func (event * BaseEvent) ChildInfo(idx Event) EventInfo {
val, ok := event.child_info[idx]
if ok == false {
return nil
}
return val
}
func (event * BaseEvent) FindChild(id graphql.ID) Event {
if id == event.ID() {
return event
}
for _, child := range event.Children() {
result := child.FindChild(id)
if result != nil {
return result
}
}
return nil
}
// Checks that the type of info is equal to EventQueueInfo
func (event * EventQueue) AddChild(child Event, info EventInfo) error {
if checkType(info, (*EventQueueInfo)(nil)) == false {
return errors.New("EventQueue.AddChild passed invalid type for info")
}
return event.addChild(child, info)
}
func (event * BaseEvent) addChild(child Event, info EventInfo) error {
err := child.RegisterParent(event)
if err != nil {
error_str := fmt.Sprintf("Failed to register %s as a parent of %s, cancelling AddChild", event.ID(), child.ID())
return errors.New(error_str)
}
event.children = append(event.children, child)
event.child_info[child] = info
event.Update()
return nil
}
// Overloaded function AddChild checks the info passed and calls the BaseEvent.addChild
func (event * BaseEvent) AddChild(child Event, info EventInfo) error {
if info != nil {
return errors.New("info must be nil for BaseEvent children")
}
return event.addChild(child, info)
}
func checkType(first interface{}, second interface{}) bool {
if first == nil || second == nil {
if first == nil && second == nil {
return true
} else {
return false
}
}
first_type := reflect.TypeOf(first)
second_type := reflect.TypeOf(second)
return first_type == second_type
}

@ -5,4 +5,5 @@ go 1.20
require (
github.com/google/uuid v1.3.0 // indirect
github.com/graph-gophers/graphql-go v1.5.0 // indirect
github.com/looplab/fsm v1.0.1 // indirect
)

@ -7,6 +7,8 @@ github.com/google/uuid v1.3.0 h1:t6JiXgmwXMjEs8VusXIJk2BXHsn+wx8BZdTaoZ5fu7I=
github.com/google/uuid v1.3.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo=
github.com/graph-gophers/graphql-go v1.5.0 h1:fDqblo50TEpD0LY7RXk/LFVYEVqo3+tXMNMPSVXA1yc=
github.com/graph-gophers/graphql-go v1.5.0/go.mod h1:YtmJZDLbF1YYNrlNAuiO5zAStUWc3XZT07iGsVqe1Os=
github.com/looplab/fsm v1.0.1 h1:OEW0ORrIx095N/6lgoGkFkotqH6s7vaFPsgjLAaF5QU=
github.com/looplab/fsm v1.0.1/go.mod h1:PmD3fFvQEIsjMEfvZdrCDZ6y8VwKTwWNjlpEr6IKPO4=
github.com/opentracing/opentracing-go v1.2.0/go.mod h1:GxEUsuufX4nBwe+T+Wl9TAgYrxe9dPLANfrWvHYVTgc=
github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=

@ -1,13 +1,10 @@
package main
import (
"fmt"
"log"
"errors"
"sync"
graphql "github.com/graph-gophers/graphql-go"
"github.com/google/uuid"
"reflect"
)
// Generate a random graphql id
@ -103,449 +100,3 @@ func (node * BaseNode) UpdateListeners() error {
func (node * BaseNode) Update() error {
return errors.New("Cannot Update a BaseNode")
}
// Resources propagate update up to multiple parents, and not downwards
// (subscriber to team won't get update to alliance, but subscriber to alliance will get update to team)
func (resource * BaseResource) Update() error {
err := resource.UpdateListeners()
if err != nil {
return err
}
for _, parent := range resource.Parents() {
err := parent.Update()
if err != nil {
return err
}
}
if resource.lock_holder != nil {
resource.lock_holder.Update()
}
return nil
}
// Update the events listeners, and notify the parent to do the same
func (event * BaseEvent) Update() error {
err := event.UpdateListeners()
if err != nil {
return err
}
if event.parent != nil{
return event.parent.Update()
}
return nil
}
// Resource is the interface that DAG nodes are made from
type Resource interface {
GraphNode
AddParent(parent Resource) error
Children() []Resource
Parents() []Resource
Lock(event Event) error
Unlock() error
}
// BaseResource is the most basic resource that can exist in the DAG
// It holds a single state variable, which contains a pointer to the event that is locking it
type BaseResource struct {
BaseNode
parents []Resource
children []Resource
lock_holder Event
state_lock sync.Mutex
}
// Grab the state mutex and check the state, if unlocked continue to hold the mutex while doing the same for children
// When the bottom of a tree is reached(no more children) go back up and set the lock state
func (resource * BaseResource) Lock(event Event) error {
var err error = nil
locked := false
resource.state_lock.Lock()
if resource.lock_holder != nil {
err = errors.New("Resource already locked")
} else {
all_children_locked := true
for _, child := range resource.Children() {
err = child.Lock(event)
if err != nil {
all_children_locked = false
break
}
}
if all_children_locked == true {
resource.lock_holder = event
locked = true
}
}
resource.state_lock.Unlock()
if locked == true {
resource.Update()
}
return err
}
// Recurse through children, unlocking until no more children
func (resource * BaseResource) Unlock() error {
var err error = nil
unlocked := false
resource.state_lock.Lock()
if resource.lock_holder == nil {
err = errors.New("Resource already unlocked")
} else {
all_children_unlocked := true
for _, child := range resource.Children() {
err = child.Unlock()
if err != nil {
all_children_unlocked = false
break
}
}
if all_children_unlocked == true{
resource.lock_holder = nil
unlocked = true
}
}
resource.state_lock.Unlock()
if unlocked == true {
resource.Update()
}
return err
}
func (resource * BaseResource) Children() []Resource {
return resource.children
}
func (resource * BaseResource) Parents() []Resource {
return resource.parents
}
// Add a parent to a DAG node
func (resource * BaseResource) AddParent(parent Resource) error {
// Don't add self as parent
if parent.ID() == resource.ID() {
error_str := fmt.Sprintf("Will not add %s as parent of itself", parent.ID())
return errors.New(error_str)
}
// Don't add parent if it's already a parent
for _, p := range resource.parents {
if p.ID() == parent.ID() {
error_str := fmt.Sprintf("%s is already a parent of %s, will not double-bond", p.ID(), resource.ID())
return errors.New(error_str)
}
}
// Add the parent
resource.parents = append(resource.parents, parent)
return nil
}
type EventInfo interface {}
type BaseEventInfo struct {
EventInfo
}
type EventQueueInfo struct {
EventInfo
priority int
}
func (info * EventQueueInfo) Priority() int {
return info.priority
}
// Event is the interface that event tree nodes must implement
type Event interface {
GraphNode
Children() []Event
ChildInfo() []EventInfo
ChildInfoType() reflect.Type
Parent() Event
RegisterParent(parent Event) error
RequiredResources() []Resource
CreatedResources() []Resource
AddChild(child Event, info EventInfo) error
FindChild(id graphql.ID) Event
}
// BaseEvent is the most basic event that can exist in the event tree.
// On start it automatically transitions to completion.
// It can optionally require events, which will all need to be locked to start it
// It can optionally create resources, which will be locked by default and unlocked on completion
// This node by itself doesn't implement any special behaviours for children, so they will be ignored.
// When starter, this event automatically transitions to completion and unlocks all it's resources(including created)
type BaseEvent struct {
BaseNode
locked_resources []Resource
created_resources []Resource
required_resources []Resource
children []Event
child_info []EventInfo
child_info_type reflect.Type
parent Event
}
// EventQueue is a basic event that can have children.
// On start, it attempts to start it's children from the highest 'priority'
type EventQueue struct {
BaseEvent
}
func NewResource(name string, description string, children []Resource) * BaseResource {
resource := &BaseResource{
BaseNode: BaseNode{
name: name,
description: description,
id: gql_randid(),
listeners: []chan error{},
},
parents: []Resource{},
children: children,
}
return resource
}
func NewEvent(name string, description string, required_resources []Resource) * BaseEvent {
event := &BaseEvent{
BaseNode: BaseNode{
name: name,
description: description,
id: gql_randid(),
listeners: []chan error{},
},
parent: nil,
children: []Event{},
child_info: []EventInfo{},
child_info_type: reflect.TypeOf((*BaseEventInfo)(nil)).Elem(),
locked_resources: []Resource{},
created_resources: []Resource{},
required_resources: required_resources,
}
return event
}
func NewEventQueue(name string, description string, required_resources []Resource) * EventQueue {
queue := &EventQueue{
BaseEvent: BaseEvent{
BaseNode: BaseNode{
name: name,
description: description,
id: gql_randid(),
listeners: []chan error{},
},
parent: nil,
children: []Event{},
child_info: []EventInfo{},
child_info_type: reflect.TypeOf((*EventQueueInfo)(nil)).Elem(),
locked_resources: []Resource{},
created_resources: []Resource{},
required_resources: required_resources,
},
}
return queue
}
// Store the nodes parent for upwards propagation of changes
func (event * BaseEvent) RegisterParent(parent Event) error{
if event.parent != nil {
return errors.New("Parent already registered")
}
event.parent = parent
return nil
}
func (event * BaseEvent) Parent() Event {
return event.parent
}
func (event * BaseEvent) RequiredResources() []Resource {
return event.required_resources
}
func (event * BaseEvent) CreatedResources() []Resource {
return event.created_resources
}
func (event * BaseEvent) Children() []Event {
return event.children
}
func (event * BaseEvent) ChildInfo() []EventInfo {
return event.child_info
}
func (event * BaseEvent) ChildInfoType() reflect.Type {
return event.child_info_type
}
func (event * BaseEvent) FindChild(id graphql.ID) Event {
if id == event.ID() {
return event
}
for _, child := range event.Children() {
result := child.FindChild(id)
if result != nil {
return result
}
}
return nil
}
func (event * BaseEvent) AddChild(child Event, info EventInfo) error {
if info == nil {
return errors.New("info cannot be nil in AddChild")
}
child_info_type := reflect.TypeOf(info).Elem()
event_info_type := event.ChildInfoType()
if child_info_type != event_info_type {
error_str := fmt.Sprintf("BaseEvent only supports child_info of type %s, not %s", child_info_type.Name(), event_info_type.Name())
return errors.New(error_str)
}
err := child.RegisterParent(event)
if err != nil {
error_str := fmt.Sprintf("Failed to register %s as a parent of %s, cancelling AddChild", event.ID(), child.ID())
return errors.New(error_str)
}
event.children = append(event.children, child)
event.child_info = append(event.child_info, info)
return nil
}
type EventManager struct {
dag_nodes map[graphql.ID]Resource
root_event Event
}
// root_event's requirements must be in dag_nodes, and dag_nodes must be ordered by dependency(no children first)
func NewEventManager(root_event Event, dag_nodes []Resource) * EventManager {
manager := &EventManager{
dag_nodes: map[graphql.ID]Resource{},
root_event: nil,
}
// Construct the DAG
for _, resource := range dag_nodes {
err := manager.AddResource(resource)
if err != nil {
log.Printf("Failed to add %s to EventManager: %s", resource.ID(), err)
return nil
}
}
manager.AddEvent(nil, root_event, nil)
return manager;
}
func (manager * EventManager) FindResource(id graphql.ID) Resource {
resource, exists := manager.dag_nodes[id]
if exists == false {
return nil
}
return resource
}
func (manager * EventManager) FindEvent(id graphql.ID) Event {
event := manager.root_event.FindChild(id)
return event
}
func (manager * EventManager) AddResource(resource Resource) error {
_, exists := manager.dag_nodes[resource.ID()]
if exists == true {
error_str := fmt.Sprintf("%s is already in the resource DAG, cannot add again", resource.ID())
return errors.New(error_str)
}
for _, child := range resource.Children() {
_, exists := manager.dag_nodes[child.ID()]
if exists == false {
error_str := fmt.Sprintf("%s is not in the resource DAG, cannot add %s to DAG", child.ID(), resource.ID())
return errors.New(error_str)
}
}
manager.dag_nodes[resource.ID()] = resource
for _, child := range resource.Children() {
child.AddParent(resource)
}
return nil
}
// Check that the node doesn't already exist in the tree
// Check the the selected parent exists in the tree
// Check that required resources exist in the DAG
// Check that created resources don't exist in the DAG
// Add resources created by the event to the DAG
// Add child to parent
func (manager * EventManager) AddEvent(parent Event, child Event, info EventInfo) error {
if child == nil {
return errors.New("Cannot add nil Event to EventManager")
} else if len(child.Children()) != 0 {
return errors.New("Adding events recursively not implemented")
}
for _, resource := range child.RequiredResources() {
_, exists := manager.dag_nodes[resource.ID()]
if exists == false {
error_str := fmt.Sprintf("Required resource %s not in DAG, cannot add event %s", resource.ID(), child.ID())
return errors.New(error_str)
}
}
for _, resource := range child.CreatedResources() {
_, exists := manager.dag_nodes[resource.ID()]
if exists == true {
error_str := fmt.Sprintf("Created resource %s already exists in DAG, cannot add event %s", resource.ID(), child.ID())
return errors.New(error_str)
}
}
if manager.root_event == nil && parent != nil {
error_str := fmt.Sprintf("EventManager has no root, so can't add event to parent")
return errors.New(error_str)
} else if manager.root_event != nil && parent == nil {
// TODO
return errors.New("Replacing root event not implemented")
} else if manager.root_event == nil && parent == nil {
manager.root_event = child
return nil;
} else {
if manager.root_event.FindChild(parent.ID()) == nil {
error_str := fmt.Sprintf("Event %s is not present in the event tree, cannot add %s as child", parent.ID(), child.ID())
return errors.New(error_str)
}
if manager.root_event.FindChild(child.ID()) != nil {
error_str := fmt.Sprintf("Event %s already exists in the event tree, can not add again", child.ID())
return errors.New(error_str)
}
return parent.AddChild(child, info)
}
}

@ -3,6 +3,7 @@ package main
import (
"testing"
"time"
"context"
)
type graph_tester testing.T
@ -50,7 +51,7 @@ func TestNewResourceAdd(t *testing.T) {
description := "A resource for testing"
children := []Resource{}
root_event := NewEvent("", "", []Resource{})
root_event, _ := NewEvent("", "", []Resource{})
test_resource := NewResource(name, description, children)
event_manager := NewEventManager(root_event, []Resource{test_resource})
res := event_manager.FindResource(test_resource.ID())
@ -65,7 +66,7 @@ func TestNewResourceAdd(t *testing.T) {
}
func TestDoubleResourceAdd(t * testing.T) {
root_event := NewEvent("", "", []Resource{})
root_event, _ := NewEvent("", "", []Resource{})
test_resource := NewResource("", "", []Resource{})
event_manager := NewEventManager(root_event, []Resource{test_resource})
err := event_manager.AddResource(test_resource)
@ -76,7 +77,7 @@ func TestDoubleResourceAdd(t * testing.T) {
}
func TestMissingResourceAdd(t * testing.T) {
root_event := NewEvent("", "", []Resource{})
root_event, _ := NewEvent("", "", []Resource{})
r1 := NewResource("r1", "", []Resource{})
r2 := NewResource("r2", "", []Resource{r1})
@ -88,7 +89,7 @@ func TestMissingResourceAdd(t * testing.T) {
}
func TestTieredResource(t * testing.T) {
root_event := NewEvent("", "", []Resource{})
root_event, _ := NewEvent("", "", []Resource{})
r1 := NewResource("r1", "", []Resource{})
r2 := NewResource("r2", "", []Resource{r1})
@ -99,7 +100,7 @@ func TestTieredResource(t * testing.T) {
}
func TestResourceUpdate(t * testing.T) {
root_event := NewEvent("", "", []Resource{})
root_event, _ := NewEvent("", "", []Resource{})
r1 := NewResource("r1", "", []Resource{})
r2 := NewResource("r2", "", []Resource{})
r3 := NewResource("r3", "", []Resource{r1, r2})
@ -138,14 +139,14 @@ func TestResourceUpdate(t * testing.T) {
}
func TestAddEvent(t * testing.T) {
root_event := NewEvent("", "", []Resource{})
root_event, _ := NewEvent("", "", []Resource{})
r1 := NewResource("r1", "", []Resource{})
r2 := NewResource("r2", "", []Resource{r1})
name := "Test Event"
description := "A test event"
resources := []Resource{r2}
new_event := NewEvent(name, description, resources)
new_event, _ := NewEvent(name, description, resources)
event_manager := NewEventManager(root_event, []Resource{r1})
err := event_manager.AddResource(r2)
@ -153,7 +154,7 @@ func TestAddEvent(t * testing.T) {
t.Fatal("Failed to add r2 to event_manager")
}
err = event_manager.AddEvent(root_event, new_event, (*BaseEventInfo)(nil))
err = event_manager.AddEvent(root_event, new_event, nil)
if err != nil {
t.Fatalf("Failed to add new_event to root_event: %s", err)
}
@ -176,7 +177,8 @@ func TestAddEvent(t * testing.T) {
}
func TestLockResource(t * testing.T) {
root_event := NewEvent("", "", []Resource{})
root_event, _ := NewEvent("", "", []Resource{})
test_event, _ := NewEvent("", "", []Resource{})
r1 := NewResource("r1", "", []Resource{})
r2 := NewResource("r2", "", []Resource{})
r3 := NewResource("r3", "", []Resource{r1, r2})
@ -213,7 +215,12 @@ func TestLockResource(t * testing.T) {
t.Fatal("Locked r1 after locking r3")
}
err = r3.Unlock()
err = r3.Unlock(test_event)
if err == nil {
t.Fatal("Unlocked r3 with event that didn't lock it")
}
err = r3.Unlock(root_event)
if err != nil {
t.Fatal("Failed to unlock r3")
}
@ -227,7 +234,7 @@ func TestLockResource(t * testing.T) {
(*graph_tester)(t).CheckForNil(r1_l)
(*graph_tester)(t).CheckForNil(rel)
err = r4.Unlock()
err = r4.Unlock(root_event)
if err != nil {
t.Fatal("Failed to unlock r4")
}
@ -236,21 +243,110 @@ func TestLockResource(t * testing.T) {
}
func TestAddToEventQueue(t * testing.T) {
queue := NewEventQueue("q", "", []Resource{})
new_event := NewEvent("1", "", []Resource{})
queue, _ := NewEventQueue("q", "", []Resource{})
event_1, _ := NewEvent("1", "", []Resource{})
event_2, _ := NewEvent("2", "", []Resource{})
err := queue.AddChild(new_event, (*BaseEventInfo)(nil))
err := queue.AddChild(event_1, nil)
if err == nil {
t.Fatal("suceeded in added BaseEventInfo to queue")
t.Fatal("suceeded in added nil info to queue")
}
err = queue.AddChild(new_event, nil)
if err == nil {
t.Fatal("suceeded in added nil info to queue")
err = queue.AddChild(event_1, &EventQueueInfo{priority: 0})
if err != nil {
t.Fatal("failed to add valid event + info to queue")
}
err = queue.AddChild(new_event, &EventQueueInfo{priority: 0})
err = queue.AddChild(event_2, &EventQueueInfo{priority: 1})
if err != nil {
t.Fatal("failed to add valid event + info to queue")
}
}
func TestStartBaseEvent(t * testing.T) {
event_1, r := NewEvent("1", "", []Resource{})
manager := NewEventManager(event_1, []Resource{})
e_l := event_1.UpdateChannel()
r_l := r.UpdateChannel()
(*graph_tester)(t).CheckForNone(e_l)
(*graph_tester)(t).CheckForNone(r_l)
if r.Owner() != event_1 {
t.Fatal("r is not owned by event_1")
}
err := manager.Run(context.Background())
if err != nil {
t.Fatal(err)
}
// Check that the update channels for the event and resource have updates
(*graph_tester)(t).CheckForNil(e_l)
(*graph_tester)(t).CheckForNil(r_l)
if r.Owner() != nil {
t.Fatal("r still owned after event completed")
}
}
func TestStartEventQueue(t * testing.T) {
root_event, r := NewEventQueue("", "", []Resource{})
rel := root_event.UpdateChannel();
manager := NewEventManager(root_event, []Resource{})
e1, e1_r := NewEvent("1", "", []Resource{})
e1_info := NewEventQueueInfo(1)
err := manager.AddEvent(root_event, e1, e1_info)
if err != nil {
t.Fatal("Failed to add e1 to manager")
}
(*graph_tester)(t).CheckForNil(rel)
e2, e2_r := NewEvent("1", "", []Resource{})
e2_info := NewEventQueueInfo(2)
err = manager.AddEvent(root_event, e2, e2_info)
if err != nil {
t.Fatal("Failed to add e2 to manager")
}
(*graph_tester)(t).CheckForNil(rel)
e3, e3_r := NewEvent("1", "", []Resource{})
e3_info := NewEventQueueInfo(3)
err = manager.AddEvent(root_event, e3, e3_info)
if err != nil {
t.Fatal("Failed to add e3 to manager")
}
(*graph_tester)(t).CheckForNil(rel)
e1_l := e1.UpdateChannel();
e2_l := e2.UpdateChannel();
e3_l := e3.UpdateChannel();
// Now that an event manager is constructed with a queue and 3 basic events
// start the queue and check that all the events are executed
err = manager.Run(context.Background())
if err != nil {
t.Fatal(err)
}
time.Sleep( 5 * time.Second)
if r.Owner() != nil {
t.Fatal("root event was not finished after starting")
}
if e1_r.Owner() != nil {
t.Fatal("e1 was not completed")
}
(*graph_tester)(t).CheckForNil(e1_l)
if e2_r.Owner() != nil {
t.Fatal("e2 was not completed")
}
(*graph_tester)(t).CheckForNil(e2_l)
if e3_r.Owner() != nil {
t.Fatal("e3 was not completed")
}
(*graph_tester)(t).CheckForNil(e3_l)
}

@ -32,7 +32,7 @@ func fake_data() * EventManager {
}
}
root_event := NewEvent("root_event", "", []Resource{})
root_event, _ := NewEventQueue("root_event", "", []Resource{})
event_manager := NewEventManager(root_event, resources)

@ -0,0 +1,132 @@
package main
import (
"fmt"
"log"
"errors"
graphql "github.com/graph-gophers/graphql-go"
"context"
)
type EventManager struct {
dag_nodes map[graphql.ID]Resource
root_event Event
}
// root_event's requirements must be in dag_nodes, and dag_nodes must be ordered by dependency(no children first)
func NewEventManager(root_event Event, dag_nodes []Resource) * EventManager {
manager := &EventManager{
dag_nodes: map[graphql.ID]Resource{},
root_event: nil,
}
// Construct the DAG
for _, resource := range dag_nodes {
err := manager.AddResource(resource)
if err != nil {
log.Printf("Failed to add %s to EventManager: %s", resource.ID(), err)
return nil
}
}
manager.AddEvent(nil, root_event, nil)
return manager;
}
func (manager * EventManager) Run(ctx context.Context) error {
//return manager.root_event.Run(ctx)
return nil
}
func (manager * EventManager) FindResource(id graphql.ID) Resource {
resource, exists := manager.dag_nodes[id]
if exists == false {
return nil
}
return resource
}
func (manager * EventManager) FindEvent(id graphql.ID) Event {
event := manager.root_event.FindChild(id)
return event
}
func (manager * EventManager) AddResource(resource Resource) error {
_, exists := manager.dag_nodes[resource.ID()]
if exists == true {
error_str := fmt.Sprintf("%s is already in the resource DAG, cannot add again", resource.ID())
return errors.New(error_str)
}
for _, child := range resource.Children() {
_, exists := manager.dag_nodes[child.ID()]
if exists == false {
error_str := fmt.Sprintf("%s is not in the resource DAG, cannot add %s to DAG", child.ID(), resource.ID())
return errors.New(error_str)
}
}
manager.dag_nodes[resource.ID()] = resource
for _, child := range resource.Children() {
child.AddParent(resource)
}
return nil
}
// Check that the node doesn't already exist in the tree
// Check the the selected parent exists in the tree
// Check that required resources exist in the DAG
// Check that created resources don't exist in the DAG
// Add resources created by the event to the DAG
// Add child to parent
func (manager * EventManager) AddEvent(parent Event, child Event, info EventInfo) error {
if child == nil {
return errors.New("Cannot add nil Event to EventManager")
} else if len(child.Children()) != 0 {
return errors.New("Adding events recursively not implemented")
}
for _, resource := range child.RequiredResources() {
_, exists := manager.dag_nodes[resource.ID()]
if exists == false {
error_str := fmt.Sprintf("Required resource %s not in DAG, cannot add event %s", resource.ID(), child.ID())
return errors.New(error_str)
}
}
for _, resource := range child.CreatedResources() {
_, exists := manager.dag_nodes[resource.ID()]
if exists == true {
error_str := fmt.Sprintf("Created resource %s already exists in DAG, cannot add event %s", resource.ID(), child.ID())
return errors.New(error_str)
}
manager.AddResource(resource)
}
if manager.root_event == nil && parent != nil {
error_str := fmt.Sprintf("EventManager has no root, so can't add event to parent")
return errors.New(error_str)
} else if manager.root_event != nil && parent == nil {
// TODO
return errors.New("Replacing root event not implemented")
} else if manager.root_event == nil && parent == nil {
manager.root_event = child
return nil;
} else {
if manager.root_event.FindChild(parent.ID()) == nil {
error_str := fmt.Sprintf("Event %s is not present in the event tree, cannot add %s as child", parent.ID(), child.ID())
return errors.New(error_str)
}
if manager.root_event.FindChild(child.ID()) != nil {
error_str := fmt.Sprintf("Event %s already exists in the event tree, can not add again", child.ID())
return errors.New(error_str)
}
return parent.AddChild(child, info)
}
}

@ -0,0 +1,164 @@
package main
import (
"fmt"
"errors"
"sync"
)
// Resources propagate update up to multiple parents, and not downwards
// (subscriber to team won't get update to alliance, but subscriber to alliance will get update to team)
func (resource * BaseResource) Update() error {
err := resource.UpdateListeners()
if err != nil {
return err
}
for _, parent := range resource.Parents() {
err := parent.Update()
if err != nil {
return err
}
}
if resource.lock_holder != nil {
resource.lock_holder.Update()
}
return nil
}
// Resource is the interface that DAG nodes are made from
type Resource interface {
GraphNode
AddParent(parent Resource) error
Children() []Resource
Parents() []Resource
Lock(event Event) error
Unlock(event Event) error
Owner() Event
}
// BaseResource is the most basic resource that can exist in the DAG
// It holds a single state variable, which contains a pointer to the event that is locking it
type BaseResource struct {
BaseNode
parents []Resource
children []Resource
lock_holder Event
state_lock sync.Mutex
}
func (resource * BaseResource) Owner() Event {
return resource.lock_holder
}
// Grab the state mutex and check the state, if unlocked continue to hold the mutex while doing the same for children
// When the bottom of a tree is reached(no more children) go back up and set the lock state
func (resource * BaseResource) Lock(event Event) error {
var err error = nil
locked := false
resource.state_lock.Lock()
if resource.lock_holder != nil {
err = errors.New("Resource already locked")
} else {
all_children_locked := true
for _, child := range resource.Children() {
err = child.Lock(event)
if err != nil {
all_children_locked = false
break
}
}
if all_children_locked == true {
resource.lock_holder = event
locked = true
}
}
resource.state_lock.Unlock()
if locked == true {
resource.Update()
}
return err
}
// Recurse through children, unlocking until no more children
// If the child isn't locked by the unlocker
func (resource * BaseResource) Unlock(event Event) error {
var err error = nil
unlocked := false
resource.state_lock.Lock()
if resource.lock_holder == nil {
err = errors.New("Resource already unlocked")
} else if resource.lock_holder != event {
err = errors.New("Resource not locked by parent, can't unlock")
} else {
all_children_unlocked := true
for _, child := range resource.Children() {
err = child.Unlock(event)
if err != nil {
all_children_unlocked = false
break
}
}
if all_children_unlocked == true{
resource.lock_holder = nil
unlocked = true
}
}
resource.state_lock.Unlock()
if unlocked == true {
resource.Update()
}
return err
}
func (resource * BaseResource) Children() []Resource {
return resource.children
}
func (resource * BaseResource) Parents() []Resource {
return resource.parents
}
// Add a parent to a DAG node
func (resource * BaseResource) AddParent(parent Resource) error {
// Don't add self as parent
if parent.ID() == resource.ID() {
error_str := fmt.Sprintf("Will not add %s as parent of itself", parent.ID())
return errors.New(error_str)
}
// Don't add parent if it's already a parent
for _, p := range resource.parents {
if p.ID() == parent.ID() {
error_str := fmt.Sprintf("%s is already a parent of %s, will not double-bond", p.ID(), resource.ID())
return errors.New(error_str)
}
}
// Add the parent
resource.parents = append(resource.parents, parent)
return nil
}
func NewResource(name string, description string, children []Resource) * BaseResource {
resource := &BaseResource{
BaseNode: BaseNode{
name: name,
description: description,
id: gql_randid(),
listeners: []chan error{},
},
parents: []Resource{},
children: children,
}
return resource
}