Removed EventMangar

graph-rework
noah metz 2023-06-20 15:48:17 -06:00
parent 32434e7dfe
commit 9980be7c86
5 changed files with 276 additions and 352 deletions

@ -21,7 +21,7 @@ func (event * BaseEvent) PropagateUpdate(signal GraphSignal) {
event.rr_lock.Lock()
defer event.rr_lock.Unlock()
for _, resource := range(event.required_resources) {
for _, resource := range(event.resources) {
SendUpdate(resource, signal)
}
} else {
@ -70,7 +70,8 @@ type Event interface {
UnlockParent()
Action(action string) (func()(string, error), bool)
Handler(signal_type string) (func(GraphSignal) (string, error), bool)
RequiredResources() []Resource
Resources() []Resource
AddResource(Resource) error
DoneResource() Resource
SetTimeout(end_time time.Time, action string)
ClearTimeout()
@ -84,6 +85,20 @@ type Event interface {
setParent(parent Event)
}
func (event * BaseEvent) AddResource(resource Resource) error {
event.resources_lock.Lock()
defer event.resources_lock.Unlock()
for _, r := range(event.resources) {
if r.ID() == resource.ID() {
return fmt.Errorf("%s is already required for %s, cannot add again", resource.Name(), event.Name())
}
}
event.resources = append(event.resources, resource)
return nil
}
func (event * BaseEvent) Signal() chan GraphSignal {
return event.signal
}
@ -112,7 +127,7 @@ func (event * BaseEvent) Handler(signal_type string) (func(GraphSignal)(string,
}
func FindResources(event Event, resource_type reflect.Type) []Resource {
resources := event.RequiredResources()
resources := event.Resources()
found := []Resource{}
for _, resource := range(resources) {
if reflect.TypeOf(resource) == resource_type {
@ -136,7 +151,7 @@ func FindResources(event Event, resource_type reflect.Type) []Resource {
}
func FindRequiredResource(event Event, id string) Resource {
for _, resource := range(event.RequiredResources()) {
for _, resource := range(event.Resources()) {
if resource.ID() == id {
return resource
}
@ -179,9 +194,9 @@ func CheckInfoType(event Event, info EventInfo) bool {
return event.InfoType() == reflect.TypeOf(info)
}
func AddChild(event Event, child Event, info EventInfo) error {
func LinkEvent(event Event, child Event, info EventInfo) error {
if CheckInfoType(event, info) == false {
return errors.New("AddChild got wrong type")
return errors.New("LinkEvents got wrong type")
}
event.LockParent()
@ -215,6 +230,32 @@ func AddChild(event Event, child Event, info EventInfo) error {
return nil
}
func StartRootEvent(event Event) error {
log.Logf("event", "ROOT_EVEN_START")
err := LockResources(event)
if err != nil {
log.Logf("event", "ROOT_EVENT_LOCK_ERR: %s", err)
return err
}
err = RunEvent(event)
if err != nil {
log.Logf("event", "ROOT_EVENT_RUNE_ERR: %s", err)
return err
}
err = FinishEvent(event)
if err != nil {
log.Logf("event", "ROOT_EVENT_FINISH_ERR: %s", err)
return err
}
log.Logf("event", "ROOT_EVENT_DONE")
return nil
}
func RunEvent(event Event) error {
log.Logf("event", "EVENT_RUN: %s", event.Name())
SendUpdate(event, NewSignal(event, "event_start"))
@ -252,10 +293,10 @@ func EventCancel(event Event) func(signal GraphSignal) (string, error) {
}
func LockResources(event Event) error {
log.Logf("event", "RESOURCE_LOCKING for %s - %+v", event.Name(), event.RequiredResources())
log.Logf("event", "RESOURCE_LOCKING for %s - %+v", event.Name(), event.Resources())
locked_resources := []Resource{}
var lock_err error = nil
for _, resource := range(event.RequiredResources()) {
for _, resource := range(event.Resources()) {
err := LockResource(resource, event)
if err != nil {
lock_err = err
@ -279,7 +320,7 @@ func LockResources(event Event) error {
func FinishEvent(event Event) error {
log.Logf("event", "EVENT_FINISH: %s", event.Name())
for _, resource := range(event.RequiredResources()) {
for _, resource := range(event.Resources()) {
err := UnlockResource(resource, event)
if err != nil {
panic(err)
@ -313,7 +354,8 @@ type BaseEvent struct {
BaseNode
done_resource Resource
rr_lock sync.Mutex
required_resources []Resource
resources []Resource
resources_lock sync.Mutex
children []Event
children_lock sync.Mutex
child_info map[string]EventInfo
@ -351,15 +393,15 @@ func EventWait(event Event) (func() (string, error)) {
}
}
func NewBaseEvent(name string, description string, required_resources []Resource) (BaseEvent) {
done_resource := NewResource("event_done", "signal that event is done", []Resource{})
func NewBaseEvent(name string, description string) (BaseEvent) {
done_resource, _ := NewResource("event_done", "signal that event is done", []Resource{})
event := BaseEvent{
BaseNode: NewBaseNode(name, description, randid()),
parent: nil,
children: []Event{},
child_info: map[string]EventInfo{},
done_resource: done_resource,
required_resources: required_resources,
resources: []Resource{},
Actions: map[string]func()(string, error){},
Handlers: map[string]func(GraphSignal)(string, error){},
abort: make(chan string, 1),
@ -372,10 +414,25 @@ func NewBaseEvent(name string, description string, required_resources []Resource
return event
}
func NewEvent(name string, description string, required_resources []Resource) (* BaseEvent) {
event := NewBaseEvent(name, description, required_resources)
func AddResources(event Event, resources []Resource) error {
for _, r := range(resources) {
err := event.AddResource(r)
if err != nil {
return err
}
}
return nil
}
func NewEvent(name string, description string, resources []Resource) (* BaseEvent, error) {
event := NewBaseEvent(name, description)
event_ptr := &event
err := AddResources(event_ptr, resources)
if err != nil {
return nil, err
}
event_ptr.Actions["wait"] = EventWait(event_ptr)
event_ptr.Handlers["abort"] = EventAbort(event_ptr)
event_ptr.Handlers["cancel"] = EventCancel(event_ptr)
@ -384,7 +441,7 @@ func NewEvent(name string, description string, required_resources []Resource) (*
return "", nil
}
return event_ptr
return event_ptr, nil
}
func (event * BaseEvent) finish() error {
@ -414,12 +471,14 @@ func (queue * EventQueue) InfoType() reflect.Type {
return reflect.TypeOf((*EventQueueInfo)(nil))
}
func NewEventQueue(name string, description string, required_resources []Resource) (* EventQueue) {
func NewEventQueue(name string, description string, resources []Resource) (* EventQueue, error) {
queue := &EventQueue{
BaseEvent: NewBaseEvent(name, description, required_resources),
BaseEvent: NewBaseEvent(name, description),
listened_resources: map[string]Resource{},
}
AddResources(queue, resources)
queue.Actions["wait"] = EventWait(queue)
queue.Handlers["abort"] = EventAbort(queue)
queue.Handlers["cancel"] = EventCancel(queue)
@ -443,7 +502,7 @@ func NewEventQueue(name string, description string, required_resources []Resourc
needed_resources := map[string]Resource{}
for _, event := range(copied_events) {
// make sure all the required resources are registered to update the event
for _, resource := range(event.RequiredResources()) {
for _, resource := range(event.Resources()) {
needed_resources[resource.ID()] = resource
}
@ -501,15 +560,15 @@ func NewEventQueue(name string, description string, required_resources []Resourc
return "queue_event", nil
}
return queue
return queue, nil
}
func (event * BaseEvent) Parent() Event {
return event.parent
}
func (event * BaseEvent) RequiredResources() []Resource {
return event.required_resources
func (event * BaseEvent) Resources() []Resource {
return event.resources
}
func (event * BaseEvent) DoneResource() Resource {

@ -860,7 +860,7 @@ type GQLServer struct {
func NewGQLServer(listen string, extended_types map[reflect.Type]*graphql.Object, extended_queries map[string]*graphql.Field, extended_mutations map[string]*graphql.Field, extended_subscriptions map[string]*graphql.Field) * GQLServer {
server := &GQLServer{
BaseResource: NewBaseResource("GQL Server", "graphql server for event signals", []Resource{}),
BaseResource: NewBaseResource("GQL Server", "graphql server for event signals"),
listen: listen,
abort: make(chan error, 1),
gql_channel: make(chan error, 1),

@ -1,200 +0,0 @@
package graphvent
import (
"fmt"
"errors"
)
type EventManager struct {
dag_nodes map[string]Resource
Root Event
aborts []chan error
}
// root_event's requirements must be in dag_nodes, and dag_nodes must be ordered by dependency(children first)
func NewEventManager(root_event Event, dag_nodes []Resource) * EventManager {
manager := &EventManager{
dag_nodes: map[string]Resource{},
Root: nil,
aborts: []chan error{},
}
// Construct the DAG
for _, resource := range dag_nodes {
err := manager.AddResource(resource)
if err != nil {
log.Logf("manager", "Failed to add %s to EventManager: %s", resource.Name(), err)
return nil
}
}
err := manager.AddEvent(nil, root_event, nil)
if err != nil {
log.Logf("manager", "Failed to add %s to EventManager as root_event: %s", root_event.Name(), err)
}
return manager;
}
// Init to all resources(in a thread to handle reconnections), and start the first event
func (manager * EventManager) Run() error {
log.Logf("manager", "MANAGER_START")
abort := make(chan error, 1)
go func(abort chan error, manager * EventManager) {
<- abort
for _, c := range(manager.aborts) {
c <- nil
}
}(abort, manager)
err := LockResources(manager.Root)
if err != nil {
log.Logf("manager", "MANAGER_LOCK_ERR: %s", err)
abort <- nil
return err
}
err = RunEvent(manager.Root)
abort <- nil
if err != nil {
log.Logf("manager", "MANAGER_RUN_ERR: %s", err)
return err
}
err = FinishEvent(manager.Root)
if err != nil {
log.Logf("manager", "MANAGER_FINISH_ERR: %s", err)
return err
}
log.Logf("manager", "MANAGER_DONE")
return nil
}
func (manager * EventManager) FindResource(id string) Resource {
resource, exists := manager.dag_nodes[id]
if exists == false {
return nil
}
return resource
}
func (manager * EventManager) FindEvent(id string) Event {
event := FindChild(manager.Root, id)
return event
}
func (manager * EventManager) AddResource(resource Resource) error {
log.Logf("manager", "Adding resource %s", resource.Name())
_, exists := manager.dag_nodes[resource.ID()]
if exists == true {
error_str := fmt.Sprintf("%s is already in the resource DAG, cannot add again", resource.Name())
return errors.New(error_str)
}
for _, child := range resource.Children() {
_, exists := manager.dag_nodes[child.ID()]
if exists == false {
error_str := fmt.Sprintf("%s is not in the resource DAG, cannot add %s to DAG", child.Name(), resource.Name())
return errors.New(error_str)
}
}
manager.dag_nodes[resource.ID()] = resource
abort := make(chan error, 1)
abort_used := resource.Init(abort)
if abort_used == true {
manager.aborts = append(manager.aborts, abort)
}
for _, child := range resource.Children() {
AddParent(child, resource)
}
return nil
}
// Check that the node doesn't already exist in the tree
// Check the the selected parent exists in the tree
// Check that required resources exist in the DAG
// Check that created resources don't exist in the DAG
// Add resources created by the event to the DAG
// Add child to parent
func (manager * EventManager) CheckResources(event Event) error {
if event == nil {
return errors.New("Cannot check nil event for resources")
}
for _, r := range(event.RequiredResources()) {
res_found := false
for _, res := range(manager.dag_nodes) {
if res.ID() == r.ID() {
res_found = true
}
}
if res_found == false {
return errors.New(fmt.Sprintf("Failed to find %s in the resource forest for %s", r.Name(), event.Name()))
}
}
for _, c := range(event.Children()) {
err := manager.CheckResources(c)
if err != nil {
return err
}
}
return nil
}
func (manager * EventManager) AddDoneResources(event Event) {
if event == nil {
return
}
done_resource := event.DoneResource()
_, exists := manager.dag_nodes[done_resource.ID()]
if exists == false {
manager.AddResource(done_resource)
}
for _, child := range(event.Children()) {
manager.AddDoneResources(child)
}
}
func (manager * EventManager) AddEvent(parent Event, child Event, info EventInfo) error {
if child == nil {
return errors.New("Cannot add nil Event to EventManager")
}
err := manager.CheckResources(child)
if err != nil {
return fmt.Errorf("Failed to add event to event manager: %w", err)
}
manager.AddDoneResources(child)
if manager.Root == nil {
if parent != nil {
return fmt.Errorf("EventManager has no root, so can't add event to parent")
} else {
manager.Root = child
return nil
}
} else {
if parent == nil {
return fmt.Errorf("Replacing root event not implemented")
} else if FindChild(manager.Root, parent.ID()) == nil {
return fmt.Errorf("Parent does not exists in event tree")
} else if FindChild(manager.Root, child.ID()) != nil {
return fmt.Errorf("Child already exists in event tree")
} else {
AddChild(parent, child, info)
}
}
return nil
}

@ -53,16 +53,18 @@ func (t * GraphTester) CheckForNone(listener chan GraphSignal, str string) {
}
}
func TestNewResourceAdd(t *testing.T) {
func TestNewEventWithResource(t *testing.T) {
name := "Test Resource"
description := "A resource for testing"
children := []Resource{}
root_event := NewEvent("", "", []Resource{})
test_resource := NewResource(name, description, children)
event_manager := NewEventManager(root_event, []Resource{test_resource})
res := event_manager.FindResource(test_resource.ID())
test_resource, _ := NewResource(name, description, children)
root_event, err := NewEvent("root_event", "", []Resource{test_resource})
if err != nil {
t.Fatal(err)
}
res := FindRequiredResource(root_event, test_resource.ID())
if res == nil {
t.Fatal("Failed to find Resource in EventManager after adding")
}
@ -73,48 +75,47 @@ func TestNewResourceAdd(t *testing.T) {
}
func TestDoubleResourceAdd(t * testing.T) {
root_event := NewEvent("", "", []Resource{})
test_resource := NewResource("", "", []Resource{})
event_manager := NewEventManager(root_event, []Resource{test_resource})
err := event_manager.AddResource(test_resource)
if err == nil {
t.Fatal("Second AddResource returned nil")
}
}
test_resource, _ := NewResource("", "", []Resource{})
_, err := NewEvent("", "", []Resource{test_resource, test_resource})
func TestMissingResourceAdd(t * testing.T) {
root_event := NewEvent("", "", []Resource{})
r1 := NewResource("r1", "", []Resource{})
r2 := NewResource("r2", "", []Resource{r1})
event_manager := NewEventManager(root_event, []Resource{})
err := event_manager.AddResource(r2)
if err == nil {
t.Fatal("AddResource with missing child returned nil")
t.Fatal("NewEvent didn't return an error")
}
}
func TestTieredResource(t * testing.T) {
root_event := NewEvent("", "", []Resource{})
r1 := NewResource("r1", "", []Resource{})
r2 := NewResource("r2", "", []Resource{r1})
r1, _ := NewResource("r1", "", []Resource{})
r2, err := NewResource("r2", "", []Resource{r1})
if err != nil {
t.Fatal(err)
}
_, err = NewEvent("", "", []Resource{r2})
event_manager := NewEventManager(root_event, []Resource{r1, r2})
if event_manager == nil {
t.Fatal("Failed to create event manager with tiered resources")
if err != nil {
t.Fatal("Failed to create event with tiered resources")
}
}
func TestResourceUpdate(t * testing.T) {
root_event := NewEvent("", "", []Resource{})
r1 := NewResource("r1", "", []Resource{})
r2 := NewResource("r2", "", []Resource{})
r3 := NewResource("r3", "", []Resource{r1, r2})
r4 := NewResource("r4", "", []Resource{r3})
event_manager := NewEventManager(root_event, []Resource{r1, r2, r3, r4})
if event_manager == nil {
r1, err := NewResource("r1", "", []Resource{})
if err != nil {
t.Fatal(err)
}
r2, err := NewResource("r2", "", []Resource{})
if err != nil {
t.Fatal(err)
}
r3, err := NewResource("r3", "", []Resource{r1, r2})
if err != nil {
t.Fatal(err)
}
r4, err := NewResource("r4", "", []Resource{r3})
if err != nil {
t.Fatal(err)
}
_, err = NewEvent("", "", []Resource{r3, r4})
if err != nil {
t.Fatal("Failed to add initial tiered resources for test")
}
@ -146,27 +147,21 @@ func TestResourceUpdate(t * testing.T) {
}
func TestAddEvent(t * testing.T) {
root_event := NewEvent("", "", []Resource{})
r1 := NewResource("r1", "", []Resource{})
r2 := NewResource("r2", "", []Resource{r1})
r1, _ := NewResource("r1", "", []Resource{})
r2, _ := NewResource("r2", "", []Resource{r1})
root_event, _ := NewEvent("", "", []Resource{r2})
name := "Test Event"
description := "A test event"
resources := []Resource{r2}
new_event := NewEvent(name, description, resources)
event_manager := NewEventManager(root_event, []Resource{r1})
err := event_manager.AddResource(r2)
if err != nil {
t.Fatal("Failed to add r2 to event_manager")
}
new_event, _ := NewEvent(name, description, resources)
err = event_manager.AddEvent(root_event, new_event, nil)
err := LinkEvent(root_event, new_event, nil)
if err != nil {
t.Fatalf("Failed to add new_event to root_event: %s", err)
}
res := event_manager.FindEvent(new_event.ID())
res := FindChild(root_event, new_event.ID())
if res == nil {
t.Fatalf("Failed to find new_event in event_manager: %s", err)
}
@ -175,7 +170,7 @@ func TestAddEvent(t * testing.T) {
t.Fatal("Event found in event_manager didn't match added")
}
res_required := res.RequiredResources()
res_required := res.Resources()
if len(res_required) < 1 {
t.Fatal("Event found in event_manager didn't match added")
} else if res_required[0].ID() != r2.ID() {
@ -184,23 +179,35 @@ func TestAddEvent(t * testing.T) {
}
func TestLockResource(t * testing.T) {
root_event := NewEvent("", "", []Resource{})
test_event := NewEvent("", "", []Resource{})
r1 := NewResource("r1", "", []Resource{})
r2 := NewResource("r2", "", []Resource{})
r3 := NewResource("r3", "", []Resource{r1, r2})
r4 := NewResource("r4", "", []Resource{r1, r2})
event_manager := NewEventManager(root_event, []Resource{r1, r2, r3, r4})
if event_manager == nil {
t.Fatal("Failed to add initial tiered resources for test")
r1, err := NewResource("r1", "", []Resource{})
if err != nil {
t.Fatal(err)
}
r2, err := NewResource("r2", "", []Resource{})
if err != nil {
t.Fatal(err)
}
r3, err := NewResource("r3", "", []Resource{r1, r2})
if err != nil {
t.Fatal(err)
}
r4, err := NewResource("r4", "", []Resource{r1, r2})
if err != nil {
t.Fatal(err)
}
root_event, err := NewEvent("", "", []Resource{})
if err != nil {
t.Fatal(err)
}
test_event, err := NewEvent("", "", []Resource{})
if err != nil {
t.Fatal(err)
}
r1_l := r1.UpdateChannel()
rel := root_event.UpdateChannel()
err := LockResource(r3, root_event)
err = LockResource(r3, root_event)
if err != nil {
t.Fatal("Failed to lock r3")
}
@ -253,30 +260,29 @@ func TestLockResource(t * testing.T) {
}
func TestAddToEventQueue(t * testing.T) {
queue := NewEventQueue("q", "", []Resource{})
event_1 := NewEvent("1", "", []Resource{})
event_2 := NewEvent("2", "", []Resource{})
queue, _ := NewEventQueue("q", "", []Resource{})
event_1, _ := NewEvent("1", "", []Resource{})
event_2, _ := NewEvent("2", "", []Resource{})
err := AddChild(queue, event_1, nil)
err := LinkEvent(queue, event_1, nil)
if err == nil {
t.Fatal("suceeded in added nil info to queue")
}
err = AddChild(queue, event_1, &EventQueueInfo{priority: 0})
err = LinkEvent(queue, event_1, &EventQueueInfo{priority: 0})
if err != nil {
t.Fatal("failed to add valid event + info to queue")
}
err = AddChild(queue, event_2, &EventQueueInfo{priority: 1})
err = LinkEvent(queue, event_2, &EventQueueInfo{priority: 1})
if err != nil {
t.Fatal("failed to add valid event + info to queue")
}
}
func TestStartBaseEvent(t * testing.T) {
event_1 := NewEvent("TestStartBaseEvent event_1", "", []Resource{})
event_1, _ := NewEvent("TestStartBaseEvent event_1", "", []Resource{})
r := event_1.DoneResource()
manager := NewEventManager(event_1, []Resource{})
e_l := event_1.UpdateChannel()
r_l := r.UpdateChannel()
@ -287,7 +293,7 @@ func TestStartBaseEvent(t * testing.T) {
t.Fatal("r is not owned by event_1")
}
err := manager.Run()
err := StartRootEvent(event_1)
if err != nil {
t.Fatal(err)
}
@ -302,25 +308,21 @@ func TestStartBaseEvent(t * testing.T) {
}
func TestAbortEventQueue(t * testing.T) {
root_event := NewEventQueue("root_event", "", []Resource{})
r1, _ := NewResource("r1", "", []Resource{})
root_event, _ := NewEventQueue("root_event", "", []Resource{})
r := root_event.DoneResource()
manager := NewEventManager(root_event, []Resource{})
r1 := NewResource("r1", "", []Resource{})
err := manager.AddResource(r1)
if err != nil {
t.Fatal(err)
}
LockResource(r1, root_event)
e1 := NewEvent("event_1", "", []Resource{r1})
e1, _ := NewEvent("event_1", "", []Resource{r1})
e1_info := NewEventQueueInfo(1)
// Add an event so that the queue doesn't auto complete
err = manager.AddEvent(root_event, e1, e1_info)
err := LinkEvent(root_event, e1, e1_info)
if err != nil {
t.Fatal(err)
}
// Now that an event manager is constructed with a queue and 3 basic events
// Now that the event is constructed with a queue and 3 basic events
// start the queue and check that all the events are executed
go func() {
time.Sleep(100 * time.Millisecond)
@ -328,9 +330,9 @@ func TestAbortEventQueue(t * testing.T) {
SendUpdate(root_event, abort_signal)
}()
err = manager.Run()
err = StartRootEvent(root_event)
if err == nil {
t.Fatal("event manager completed without error")
t.Fatal("root_event completed without error")
}
if r.Owner() == nil {
@ -339,41 +341,40 @@ func TestAbortEventQueue(t * testing.T) {
}
func TestStartEventQueue(t * testing.T) {
root_event := NewEventQueue("root_event", "", []Resource{})
root_event, _ := NewEventQueue("root_event", "", []Resource{})
r := root_event.DoneResource()
rel := root_event.UpdateChannel();
res_1 := NewResource("test_resource_1", "", []Resource{})
res_2 := NewResource("test_resource_2", "", []Resource{})
manager := NewEventManager(root_event, []Resource{res_1, res_2})
res_1, _ := NewResource("test_resource_1", "", []Resource{})
res_2, _ := NewResource("test_resource_2", "", []Resource{})
e1:= NewEvent("e1", "", []Resource{res_1, res_2})
e1, _ := NewEvent("e1", "", []Resource{res_1, res_2})
e1_l := e1.UpdateChannel()
e1_r := e1.DoneResource()
e1_info := NewEventQueueInfo(1)
err := manager.AddEvent(root_event, e1, e1_info)
err := LinkEvent(root_event, e1, e1_info)
if err != nil {
t.Fatal("Failed to add e1 to manager")
t.Fatal("Failed to add e1 to root_event")
}
(*GraphTester)(t).WaitForValue(rel, "child_added", root_event, time.Second, "No update on root_event after adding e1")
e2 := NewEvent("e2", "", []Resource{res_1})
e2, _ := NewEvent("e2", "", []Resource{res_1})
e2_l := e2.UpdateChannel()
e2_r := e2.DoneResource()
e2_info := NewEventQueueInfo(2)
err = manager.AddEvent(root_event, e2, e2_info)
err = LinkEvent(root_event, e2, e2_info)
if err != nil {
t.Fatal("Failed to add e2 to manager")
t.Fatal("Failed to add e2 to root_event")
}
(*GraphTester)(t).WaitForValue(rel, "child_added", root_event, time.Second, "No update on root_event after adding e2")
e3 := NewEvent("e3", "", []Resource{res_2})
e3, _ := NewEvent("e3", "", []Resource{res_2})
e3_l := e3.UpdateChannel()
e3_r := e3.DoneResource()
e3_info := NewEventQueueInfo(3)
err = manager.AddEvent(root_event, e3, e3_info)
err = LinkEvent(root_event, e3, e3_info)
if err != nil {
t.Fatal("Failed to add e3 to manager")
t.Fatal("Failed to add e3 to root_event")
}
(*GraphTester)(t).WaitForValue(rel, "child_added", root_event, time.Second, "No update on root_event after adding e3")
@ -386,7 +387,7 @@ func TestStartEventQueue(t * testing.T) {
}
}()
// Now that an event manager is constructed with a queue and 3 basic events
// Now that a root_event is constructed with a queue and 3 basic events
// start the queue and check that all the events are executed
go func() {
(*GraphTester)(t).WaitForValue(e1_l, "event_done", e1, time.Second, "No event_done for e3")
@ -396,7 +397,7 @@ func TestStartEventQueue(t * testing.T) {
SendUpdate(root_event, signal)
}()
err = manager.Run()
err = StartRootEvent(root_event)
if err != nil {
t.Fatal(err)
}

@ -12,8 +12,8 @@ func (resource * BaseResource) PropagateUpdate(signal GraphSignal) {
if signal.Downwards() == false {
// Child->Parent, resource updates parent resources
resource.parents_lock.Lock()
defer resource.parents_lock.Unlock()
resource.connection_lock.Lock()
defer resource.connection_lock.Unlock()
for _, parent := range resource.Parents() {
SendUpdate(parent, signal)
}
@ -25,8 +25,8 @@ func (resource * BaseResource) PropagateUpdate(signal GraphSignal) {
SendUpdate(resource.lock_holder, signal)
}
resource.children_lock.Lock()
defer resource.children_lock.Unlock()
resource.connection_lock.Lock()
defer resource.connection_lock.Unlock()
for _, child := range(resource.children) {
SendUpdate(child, signal)
}
@ -43,9 +43,10 @@ type Resource interface {
Children() []Resource
Parents() []Resource
AddParent(parent Resource) error
LockParents()
UnlockParents()
AddParent(parent Resource)
AddChild(child Resource)
LockConnections()
UnlockConnections()
SetOwner(owner GraphNode)
LockState()
@ -56,24 +57,46 @@ type Resource interface {
unlock(node GraphNode) error
}
func AddParent(resource Resource, parent Resource) error {
if parent.ID() == resource.ID() {
error_str := fmt.Sprintf("Will not add %s as parent of itself", parent.Name())
return errors.New(error_str)
// Recurse up cur's parents to ensure r is not present
func checkIfParent(r Resource, cur Resource) bool {
if r == nil || cur == nil {
panic("Cannot recurse DAG with nil")
}
resource.LockParents()
for _, p := range resource.Parents() {
if p.ID() == parent.ID() {
error_str := fmt.Sprintf("%s is already a parent of %s, will not double-bond", p.Name(), resource.Name())
return errors.New(error_str)
if r.ID() == cur.ID() {
return true
}
cur.LockConnections()
defer cur.UnlockConnections()
for _, p := range(cur.Parents()) {
if checkIfParent(r, p) == true {
return true
}
}
err := resource.AddParent(parent)
resource.UnlockParents()
return false
}
return err
// Recurse doen cur's children to ensure r is not present
func checkIfChild(r Resource, cur Resource) bool {
if r == nil || cur == nil {
panic("Cannot recurse DAG with nil")
}
if r.ID() == cur.ID() {
return true
}
cur.LockConnections()
defer cur.UnlockConnections()
for _, c := range(cur.Children()) {
if checkIfChild(r, c) == true {
return true
}
}
return false
}
func UnlockResource(resource Resource, event Event) error {
@ -149,9 +172,8 @@ func LockResource(resource Resource, node GraphNode) error {
type BaseResource struct {
BaseNode
parents []Resource
parents_lock sync.Mutex
children []Resource
children_lock sync.Mutex
connection_lock sync.Mutex
lock_holder GraphNode
lock_holder_lock sync.Mutex
state_lock sync.Mutex
@ -196,30 +218,72 @@ func (resource * BaseResource) Parents() []Resource {
return resource.parents
}
func (resource * BaseResource) LockParents() {
resource.parents_lock.Lock()
func (resource * BaseResource) LockConnections() {
resource.connection_lock.Lock()
}
func (resource * BaseResource) UnlockParents() {
resource.parents_lock.Unlock()
func (resource * BaseResource) UnlockConnections() {
resource.connection_lock.Unlock()
}
func (resource * BaseResource) AddParent(parent Resource) error {
func (resource * BaseResource) AddParent(parent Resource) {
resource.parents = append(resource.parents, parent)
return nil
}
func NewBaseResource(name string, description string, children []Resource) BaseResource {
func (resource * BaseResource) AddChild(child Resource) {
resource.children = append(resource.children, child)
}
func NewBaseResource(name string, description string) BaseResource {
resource := BaseResource{
BaseNode: NewBaseNode(name, description, randid()),
parents: []Resource{},
children: children,
children: []Resource{},
}
return resource
}
func NewResource(name string, description string, children []Resource) * BaseResource {
resource := NewBaseResource(name, description, children)
return &resource
func LinkResource(resource Resource, child Resource) error {
if child == nil || resource == nil {
return fmt.Errorf("Will not connect nil to resource DAG")
} else if child.ID() == resource.ID() {
return fmt.Errorf("Will not connect resource to itself")
}
if checkIfChild(resource, child) {
return fmt.Errorf("%s is a child of %s, cannot add as parent", resource.Name(), child.Name())
}
for _, p := range(resource.Parents()) {
if checkIfParent(child, p) {
return fmt.Errorf("Will not add %s as a parent of itself", child.Name())
}
}
child.AddParent(resource)
resource.AddChild(child)
return nil
}
func LinkResources(resource Resource, children []Resource) error {
for _, c := range(children) {
err := LinkResource(resource, c)
if err != nil {
return err
}
}
return nil
}
func NewResource(name string, description string, children []Resource) (* BaseResource, error) {
resource := NewBaseResource(name, description)
resource_ptr := &resource
err := LinkResources(resource_ptr, children)
if err != nil {
return nil, err
}
return resource_ptr, nil
}