diff --git a/event.go b/event.go index 13f49fc..22766a7 100644 --- a/event.go +++ b/event.go @@ -6,20 +6,20 @@ import ( "errors" "reflect" "sort" + "sync" ) // Update the events listeners, and notify the parent to do the same -func (event * BaseEvent) Update(signal GraphSignal) error { - log.Printf("UPDATE BaseEvent %s: %+v", event.Name(), signal) - +func (event * BaseEvent) update(signal GraphSignal) { event.signal <- signal - event.BaseNode.Update(signal) - - if event.parent != nil{ - return event.parent.Update(signal) + if event.parent != nil && signal.Type() != "abort"{ + event.parent.update(signal) + } else if signal.Type() == "abort" { + for _, child := range(event.Children()) { + child.update(signal) + } } - return nil } type EventInfo interface { @@ -48,50 +48,122 @@ func NewEventQueueInfo(priority int) * EventQueueInfo { type Event interface { GraphNode Children() []Event + LockChildren() + UnlockChildren() + InfoType() reflect.Type ChildInfo(event Event) EventInfo Parent() Event - RegisterParent(parent Event) error + LockParent() + UnlockParent() + Action(action string) (func()(string, error), bool) + Handler(signal_type string) (func() (string, error), bool) RequiredResources() []Resource DoneResource() Resource - AddChild(child Event, info EventInfo) error - FindChild(id string) Event - Run() error - Abort() error - LockResources() error - Finish() error + + finish() error + + addChild(child Event, info EventInfo) + setParent(parent Event) } -// BaseEvent is the most basic event that can exist in the event tree. -// On start it automatically transitions to completion. -// It can optionally require events, which will all need to be locked to start it -// It can optionally create resources, which will be locked by default and unlocked on completion -// This node by itself doesn't implement any special behaviours for children, so they will be ignored. -// When starter, this event automatically transitions to completion and unlocks all it's resources(including created) -type BaseEvent struct { - BaseNode - done_resource Resource - required_resources []Resource - children []Event - child_info map[Event]EventInfo - actions map[string]func() (string, error) - handlers map[string]func() (string, error) - parent Event - abort chan string +func (event * BaseEvent) Handler(signal_type string) (func()(string, error), bool) { + handler, exists := event.handlers[signal_type] + return handler, exists +} + +func FindChild(event Event, id string) Event { + if id == event.ID() { + return event + } + + for _, child := range event.Children() { + result := FindChild(child, id) + if result != nil { + return result + } + } + + return nil +} + +func CheckInfoType(event Event, info EventInfo) bool { + if event.InfoType() == nil || info == nil { + if event.InfoType() == nil && info == nil { + return true + } else { + return false + } + } + + return event.InfoType() == reflect.TypeOf(info) } -func (event * BaseEvent) Abort() error { - for _, event := range(event.children) { - event.Abort() +func AddChild(event Event, child Event, info EventInfo) error { + if CheckInfoType(event, info) == false { + return errors.New("AddChild got wrong type") + } + + event.LockParent() + if event.Parent() != nil { + event.UnlockParent() + return errors.New("Parent already registered") } - event.signal <- NewSignal(event, "abort") + + event.LockChildren() + + for _, c := range(event.Children()) { + if c.ID() == child.ID() { + event.UnlockChildren() + event.UnlockParent() + return errors.New("Child already in event") + } + } + + // After all the checks are done, update the state of child + parent, then unlock and update + child.setParent(event) + event.addChild(child, info) + + event.UnlockChildren() + event.UnlockParent() + + SendUpdate(event, NewSignal(event, "child_added")) return nil } -func (event * BaseEvent) LockResources() error { +func RunEvent(event Event) error { + log.Printf("EVENT_RUN: %s", event.Name()) + next_action := "start" + var err error = nil + for next_action != "" { + action, exists := event.Action(next_action) + if exists == false { + error_str := fmt.Sprintf("%s is not a valid action", next_action) + return errors.New(error_str) + } + + log.Printf("EVENT_ACTION: %s - %s", event.Name(), next_action) + next_action, err = action() + if err != nil { + return err + } + } + + log.Printf("EVENT_RUN_DONE: %s", event.Name()) + + return nil +} + +func AbortEvent(event Event) error { + signal := NewSignal(event, "abort") + SendUpdate(event, signal) + return nil +} + +func LockResources(event Event) error { locked_resources := []Resource{} var lock_err error = nil for _, resource := range(event.RequiredResources()) { - err := resource.Lock(event) + err := LockResource(resource, event) if err != nil { lock_err = err break @@ -101,67 +173,68 @@ func (event * BaseEvent) LockResources() error { if lock_err != nil { for _, resource := range(locked_resources) { - resource.Unlock(event) + UnlockResource(resource, event) } return lock_err - } else { - for _, resource := range(locked_resources) { - log.Printf("NOTIFYING %s that it's locked", resource.Name()) - resource.NotifyLocked() - } + } + + for _, resource := range(locked_resources) { + NotifyResourceLocked(resource) } return nil } -func (event * BaseEvent) Finish() error { +func FinishEvent(event Event) error { + // TODO make more 'safe' like LockResources, or make UnlockResource not return errors log.Printf("EVENT_FINISH: %s", event.Name()) for _, resource := range(event.RequiredResources()) { - err := resource.Unlock(event) + err := UnlockResource(resource, event) if err != nil { panic(err) } - resource.NotifyUnlocked() + NotifyResourceUnlocked(resource) } - err := event.DoneResource().Unlock(event) + err := UnlockResource(event.DoneResource(), event) if err != nil { return err } - err = event.DoneResource().NotifyUnlocked() + NotifyResourceUnlocked(event.DoneResource()) - event.Update(NewSignal(event, "event_done")) + err = event.finish() + if err != nil { + return err + } - return err + SendUpdate(event, NewSignal(event, "event_done")) + return nil } -func (event * BaseEvent) LockDone() { - event.DoneResource().Lock(event) +// BaseEvent is the most basic event that can exist in the event tree. +// On start it automatically transitions to completion. +// It can optionally require events, which will all need to be locked to start it +// It can optionally create resources, which will be locked by default and unlocked on completion +// This node by itself doesn't implement any special behaviours for children, so they will be ignored. +// When starter, this event automatically transitions to completion and unlocks all it's resources(including created) +type BaseEvent struct { + BaseNode + done_resource Resource + required_resources []Resource + children []Event + child_info map[string]EventInfo + child_lock sync.Mutex + actions map[string]func() (string, error) + handlers map[string]func() (string, error) + parent Event + parent_lock sync.Mutex + abort chan string } -func (event * BaseEvent) Run() error { - log.Printf("EVENT_RUN: %s", event.Name()) - next_action := "start" - var err error = nil - for next_action != "" { - // Check if the edge exists - action, exists := event.actions[next_action] - if exists == false { - error_str := fmt.Sprintf("%s is not a valid action", next_action) - return errors.New(error_str) - } - - // Run the edge function - update_str := fmt.Sprintf("EVENT_ACTION: %s", next_action) - log.Printf(update_str) - next_action, err = action() - if err != nil { - return err - } - } - - return nil +func (event * BaseEvent) Action(action string) (func() (string, error), bool) { + action_fn, exists := event.actions[action] + return action_fn, exists } func NewBaseEvent(name string, description string, required_resources []Resource) (BaseEvent) { @@ -171,12 +244,12 @@ func NewBaseEvent(name string, description string, required_resources []Resource name: name, description: description, id: randid(), - signal: make(chan GraphSignal, 10), + signal: make(chan GraphSignal, 100), listeners: map[chan GraphSignal] chan GraphSignal{}, }, parent: nil, children: []Event{}, - child_info: map[Event]EventInfo{}, + child_info: map[string]EventInfo{}, done_resource: done_resource, required_resources: required_resources, actions: map[string]func()(string, error){}, @@ -184,6 +257,8 @@ func NewBaseEvent(name string, description string, required_resources []Resource abort: make(chan string, 1), } + LockResource(event.done_resource, &event) + event.actions["wait"] = func() (string, error) { signal := <- event.signal if signal.Type() == "abort" { @@ -191,7 +266,7 @@ func NewBaseEvent(name string, description string, required_resources []Resource } else if signal.Type() == "do_action" { return signal.Description(), nil } else { - signal_fn, exists := event.handlers[signal.Type()] + signal_fn, exists := event.Handler(signal.Type()) if exists == true { return signal_fn() } @@ -207,9 +282,6 @@ func NewEvent(name string, description string, required_resources []Resource) (* event := NewBaseEvent(name, description, required_resources) event_ptr := &event - // Lock the done_resource by default - event.LockDone() - event_ptr.actions["start"] = func() (string, error) { return "", nil } @@ -217,21 +289,32 @@ func NewEvent(name string, description string, required_resources []Resource) (* return event_ptr } +func (event * BaseEvent) finish() error { + return nil +} + +func (event * BaseEvent) InfoType() reflect.Type { + return nil +} + // EventQueue is a basic event that can have children. // On start, it attempts to start it's children from the highest 'priority' type EventQueue struct { BaseEvent listened_resources map[string]Resource + queue_lock sync.Mutex } -func (queue * EventQueue) Finish() error { +func (queue * EventQueue) finish() error { for _, resource := range(queue.listened_resources) { resource.UnregisterChannel(queue.signal) } - return queue.BaseEvent.Finish() + return nil } - +func (queue * EventQueue) InfoType() reflect.Type { + return reflect.TypeOf((*EventQueueInfo)(nil)) +} func NewEventQueue(name string, description string, required_resources []Resource) (* EventQueue) { queue := &EventQueue{ @@ -239,15 +322,13 @@ func NewEventQueue(name string, description string, required_resources []Resourc listened_resources: map[string]Resource{}, } - // Need to lock it with th BaseEvent since Unlock is implemented on the BaseEvent - queue.LockDone() - queue.actions["start"] = func() (string, error) { return "queue_event", nil } queue.actions["queue_event"] = func() (string, error) { // Copy the events to sort the list + queue.LockChildren() copied_events := make([]Event, len(queue.Children())) copy(copied_events, queue.Children()) less := func(i int, j int) bool { @@ -269,20 +350,21 @@ func NewEventQueue(name string, description string, required_resources []Resourc if info.state == "queued" { wait = true // Try to lock it - err := event.LockResources() + err := LockResources(event) // start in new goroutine if err != nil { + //log.Printf("Failed to lock %s: %s", event.Name(), err) } else { info.state = "running" log.Printf("EVENT_START: %s", event.Name()) go func(event Event, info * EventQueueInfo, queue Event) { log.Printf("EVENT_GOROUTINE: %s", event.Name()) - err := event.Run() + err := RunEvent(event) if err != nil { log.Printf("EVENT_ERROR: %s", err) } info.state = "done" - event.Finish() + FinishEvent(event) }(event, info, queue) } } else if info.state == "running" { @@ -290,11 +372,14 @@ func NewEventQueue(name string, description string, required_resources []Resourc } } + for _, resource := range(needed_resources) { queue.listened_resources[resource.ID()] = resource resource.RegisterChannel(queue.signal) } + queue.UnlockChildren() + if wait == true { return "wait", nil } else { @@ -310,7 +395,7 @@ func NewEventQueue(name string, description string, required_resources []Resourc return "queue_event", nil } - queue.actions["event_added"] = func() (string, error) { + queue.handlers["child_added"] = func() (string, error) { return "queue_event", nil } @@ -325,16 +410,6 @@ func NewEventQueue(name string, description string, required_resources []Resourc return queue } -// Store the nodes parent for upwards propagation of changes -func (event * BaseEvent) RegisterParent(parent Event) error{ - if event.parent != nil { - return errors.New("Parent already registered") - } - - event.parent = parent - return nil -} - func (event * BaseEvent) Parent() Event { return event.parent } @@ -352,70 +427,35 @@ func (event * BaseEvent) Children() []Event { } func (event * BaseEvent) ChildInfo(idx Event) EventInfo { - val, ok := event.child_info[idx] + val, ok := event.child_info[idx.ID()] if ok == false { return nil } return val } -func (event * BaseEvent) FindChild(id string) Event { - if id == event.ID() { - return event - } - - for _, child := range event.Children() { - result := child.FindChild(id) - if result != nil { - return result - } - } - - return nil +func (event * BaseEvent) LockChildren() { + log.Printf("LOCKING CHILDREN OF %s", event.Name()) + event.child_lock.Lock() } -// Checks that the type of info is equal to EventQueueInfo -func (event * EventQueue) AddChild(child Event, info EventInfo) error { - if checkType(info, (*EventQueueInfo)(nil)) == false { - return errors.New("EventQueue.AddChild passed invalid type for info") - } - - return event.addChild(child, info) +func (event * BaseEvent) UnlockChildren() { + event.child_lock.Unlock() } -func (event * BaseEvent) addChild(child Event, info EventInfo) error { - err := child.RegisterParent(event) - if err != nil { - error_str := fmt.Sprintf("Failed to register %s as a parent of %s, cancelling AddChild", event.ID(), child.ID()) - return errors.New(error_str) - } - - event.children = append(event.children, child) - event.child_info[child] = info - event.Update(NewSignal(event, "child_added")) - return nil +func (event * BaseEvent) LockParent() { + event.parent_lock.Lock() } -// Overloaded function AddChild checks the info passed and calls the BaseEvent.addChild -func (event * BaseEvent) AddChild(child Event, info EventInfo) error { - if info != nil { - return errors.New("info must be nil for BaseEvent children") - } - - return event.addChild(child, info) +func (event * BaseEvent) UnlockParent() { + event.parent_lock.Unlock() } -func checkType(first interface{}, second interface{}) bool { - if first == nil || second == nil { - if first == nil && second == nil { - return true - } else { - return false - } - } - - first_type := reflect.TypeOf(first) - second_type := reflect.TypeOf(second) +func (event * BaseEvent) setParent(parent Event) { + event.parent = parent +} - return first_type == second_type +func (event * BaseEvent) addChild(child Event, info EventInfo) { + event.children = append(event.children, child) + event.child_info[child.ID()] = info } diff --git a/graph.go b/graph.go index ff0c136..ce26899 100644 --- a/graph.go +++ b/graph.go @@ -49,7 +49,8 @@ type GraphNode interface { Name() string Description() string ID() string - Update(update GraphSignal) error + UpdateListeners(update GraphSignal) + update(update GraphSignal) RegisterChannel(listener chan GraphSignal) UnregisterChannel(listener chan GraphSignal) UpdateChannel() chan GraphSignal @@ -79,7 +80,7 @@ func (node * BaseNode) ID() string { } // Create a new listener channel for the node, add it to the nodes listener list, and return the new channel -const listener_buffer = 10 +const listener_buffer = 100 func (node * BaseNode) UpdateChannel() chan GraphSignal { new_listener := make(chan GraphSignal, listener_buffer) node.RegisterChannel(new_listener) @@ -110,17 +111,31 @@ func (node * BaseNode) UnregisterChannel(listener chan GraphSignal) { func (node * BaseNode) UpdateListeners(update GraphSignal) { node.listeners_lock.Lock() + closed := []chan GraphSignal{} + for _, listener := range node.listeners { log.Printf("UPDATE_LISTENER %s: %p", node.Name(), listener) - listener <- update + select { + case listener <- update: + default: + close(listener) + closed = append(closed, listener) + } + } + + for _, listener := range(closed) { + delete(node.listeners, listener) } node.listeners_lock.Unlock() } -// Basic implementation that sends the signal to the nodes channel -func (node * BaseNode) Update(signal GraphSignal) error { - log.Printf("UPDATE: BaseNode %s: %+v", node.Name(), signal) +func (node * BaseNode) update(signal GraphSignal) { +} + +func SendUpdate(node GraphNode, signal GraphSignal) { + log.Printf("UPDATE %s: %+v", node.Name(), signal) node.UpdateListeners(signal) - return nil + node.update(signal) } + diff --git a/main.go b/main.go index 1b616b4..350e4d1 100644 --- a/main.go +++ b/main.go @@ -2,6 +2,9 @@ package main import ( "log" + "runtime/pprof" + "time" + "os" ) func fake_team(org string, id string, names []string) (*Team, []*Member) { @@ -25,8 +28,8 @@ func fake_data() * EventManager { t6, m6 := fake_team("210", "X", []string{"toby"}) t7, m7 := fake_team("210", "Y", []string{"jennifer"}) t8, m8 := fake_team("210", "Z", []string{"emily"}) - t9, m9 := fake_team("666", "A", []string{"jimmy"}) - t10, m10 := fake_team("666", "B", []string{"timmy"}) + //t9, m9 := fake_team("666", "A", []string{"jimmy"}) + //t10, m10 := fake_team("666", "B", []string{"timmy"}) //t11, m11 := fake_team("666", "C", []string{"grace"}) //t12, m12 := fake_team("666", "D", []string{"jeremy"}) //t13, m13 := fake_team("315", "W", []string{"bobby"}) @@ -42,8 +45,8 @@ func fake_data() * EventManager { teams = append(teams, t6) teams = append(teams, t7) teams = append(teams, t8) - teams = append(teams, t9) - teams = append(teams, t10) + //teams = append(teams, t9) + //teams = append(teams, t10) //teams = append(teams, t11) //teams = append(teams, t12) //teams = append(teams, t13) @@ -59,8 +62,8 @@ func fake_data() * EventManager { resources = append(resources, m6[0]) resources = append(resources, m7[0]) resources = append(resources, m8[0]) - resources = append(resources, m9[0]) - resources = append(resources, m10[0]) + //resources = append(resources, m9[0]) + //resources = append(resources, m10[0]) //resources = append(resources, m11[0]) //resources = append(resources, m12[0]) //resources = append(resources, m13[0]) @@ -71,12 +74,6 @@ func fake_data() * EventManager { arenas := []*Arena{} arenas = append(arenas, NewVirtualArena("Arena 1")) arenas = append(arenas, NewVirtualArena("Arena 2")) - arenas = append(arenas, NewVirtualArena("Arena 3")) - arenas = append(arenas, NewVirtualArena("Arena 4")) - arenas = append(arenas, NewVirtualArena("Arena 5")) - arenas = append(arenas, NewVirtualArena("Arena 6")) - arenas = append(arenas, NewVirtualArena("Arena 7")) - arenas = append(arenas, NewVirtualArena("Arena 8")) for _, arena := range arenas { resources = append(resources, arena) @@ -87,47 +84,55 @@ func fake_data() * EventManager { } alliances := []*Alliance{} - for i, team := range(teams) { - for j, team2 := range(teams) { - if i != j { - alliance := NewAlliance(team, team2) - alliances = append(alliances, alliance) - } - } - } + alliances = append(alliances, NewAlliance(t1, t2)) + alliances = append(alliances, NewAlliance(t3, t4)) + alliances = append(alliances, NewAlliance(t5, t6)) + alliances = append(alliances, NewAlliance(t7, t8)) + for _, alliance := range alliances { resources = append(resources, alliance) } root_event := NewEventQueue("root_event", "", []Resource{}) + stay_resource := NewResource("stay_resource", "", []Resource{}) + resources = append(resources, stay_resource) + stay_event := NewEvent("stay_event", "", []Resource{stay_resource}) + LockResource(stay_resource, stay_event) event_manager := NewEventManager(root_event, resources) - arena_idx := 0 - // Generate 3 games for each team by picking 3 random teams - for i, alliance := range(alliances) { - for j, alliance2 := range(alliances) { - if j != i { - if alliance.Children()[0] == alliance2.Children()[0] || alliance.Children()[0] == alliance2.Children()[1] || alliance.Children()[1] == alliance2.Children()[0] || alliance.Children()[1] == alliance2.Children()[1] { - } else { - match := NewMatch(alliance, alliance2, arenas[arena_idx]) - log.Printf("Adding %s", match.Name()) - err := event_manager.AddEvent(root_event, match, NewEventQueueInfo(i)) - if err != nil { - log.Printf("Error adding %s: %s", match.Name(), err) - } - arena_idx += 1 - if arena_idx >= len(arenas) { - arena_idx = 0 + event_manager.AddEvent(root_event, stay_event, NewEventQueueInfo(1)) + + go func(alliances []*Alliance, arenas []*Arena, event_manager * EventManager) { + for i, alliance := range(alliances) { + for j, alliance2 := range(alliances) { + if j != i { + if alliance.Children()[0] == alliance2.Children()[0] || alliance.Children()[0] == alliance2.Children()[1] || alliance.Children()[1] == alliance2.Children()[0] || alliance.Children()[1] == alliance2.Children()[1] { + } else { + for arena_idx := 0; arena_idx < len(arenas); arena_idx++ { + match := NewMatch(alliance, alliance2, arenas[arena_idx]) + log.Printf("Adding %s", match.Name()) + err := event_manager.AddEvent(root_event, match, NewEventQueueInfo(i)) + if err != nil { + log.Printf("Error adding %s: %s", match.Name(), err) + } + } } } } } - } + }(alliances, arenas, event_manager) return event_manager } func main() { + go func() { + time.Sleep(5 * time.Second) + if false { + pprof.Lookup("goroutine").WriteTo(os.Stdout, 1) + } + }() + event_manager := fake_data() log.Printf("Starting event_manager") err := event_manager.Run() diff --git a/manager.go b/manager.go index 3f0a3b8..cd982f5 100644 --- a/manager.go +++ b/manager.go @@ -9,6 +9,7 @@ import ( type EventManager struct { dag_nodes map[string]Resource root_event Event + aborts []chan error } // root_event's requirements must be in dag_nodes, and dag_nodes must be ordered by dependency(children first) @@ -17,6 +18,7 @@ func NewEventManager(root_event Event, dag_nodes []Resource) * EventManager { manager := &EventManager{ dag_nodes: map[string]Resource{}, root_event: nil, + aborts: []chan error{}, } // Construct the DAG @@ -39,37 +41,32 @@ func NewEventManager(root_event Event, dag_nodes []Resource) * EventManager { // Connect to all resources(in a thread to handle reconnections), and start the first event func (manager * EventManager) Run() error { log.Printf("MANAGER_START") - aborts := []chan error{} - for _, resource := range(manager.dag_nodes) { - abort := make(chan error, 1) - abort_used := resource.Connect(abort) - if abort_used == true { - aborts = append(aborts, abort) - } - } abort := make(chan error, 1) - go func(abort chan error, aborts []chan error) { + go func(abort chan error, manager * EventManager) { <- abort - for _, c := range(aborts) { + for _, c := range(manager.aborts) { c <- nil } - }(abort, aborts) + }(abort, manager) - err := manager.root_event.LockResources() + err := LockResources(manager.root_event) if err != nil { + log.Printf("MANAGER_LOCK_ERR: %s", err) abort <- nil return err } - err = manager.root_event.Run() + err = RunEvent(manager.root_event) abort <- nil if err != nil { + log.Printf("MANAGER_RUN_ERR: %s", err) return err } - err = manager.root_event.Finish() + err = FinishEvent(manager.root_event) if err != nil { + log.Printf("MANAGER_FINISH_ERR: %s", err) return err } log.Printf("MANAGER_DONE") @@ -87,12 +84,13 @@ func (manager * EventManager) FindResource(id string) Resource { } func (manager * EventManager) FindEvent(id string) Event { - event := manager.root_event.FindChild(id) + event := FindChild(manager.root_event, id) return event } func (manager * EventManager) AddResource(resource Resource) error { + log.Printf("Adding resource %s", resource.Name()) _, exists := manager.dag_nodes[resource.ID()] if exists == true { error_str := fmt.Sprintf("%s is already in the resource DAG, cannot add again", resource.Name()) @@ -107,8 +105,13 @@ func (manager * EventManager) AddResource(resource Resource) error { } } manager.dag_nodes[resource.ID()] = resource + abort := make(chan error, 1) + abort_used := resource.Connect(abort) + if abort_used == true { + manager.aborts = append(manager.aborts, abort) + } for _, child := range resource.Children() { - child.AddParent(resource) + AddParent(child, resource) } return nil } @@ -152,16 +155,16 @@ func (manager * EventManager) AddEvent(parent Event, child Event, info EventInfo manager.root_event = child return nil; } else { - if manager.root_event.FindChild(parent.ID()) == nil { + if FindChild(manager.root_event, parent.ID()) == nil { error_str := fmt.Sprintf("Event %s is not present in the event tree, cannot add %s as child", parent.ID(), child.ID()) return errors.New(error_str) } - if manager.root_event.FindChild(child.ID()) != nil { + if FindChild(manager.root_event, child.ID()) != nil { error_str := fmt.Sprintf("Event %s already exists in the event tree, can not add again", child.ID()) return errors.New(error_str) } - return parent.AddChild(child, info) + return AddChild(parent, child, info) } } diff --git a/manager_test.go b/manager_test.go index 36619f1..72fe90f 100644 --- a/manager_test.go +++ b/manager_test.go @@ -7,7 +7,7 @@ import ( ) type graph_tester testing.T -const listner_timeout = 100 * time.Millisecond +const listner_timeout = 50 * time.Millisecond func (t * graph_tester) CheckForValue(listener chan GraphSignal, str string) { timeout := time.After(listner_timeout) @@ -99,23 +99,21 @@ func TestResourceUpdate(t * testing.T) { r4_l := r4.UpdateChannel() // Calling Update() on the parent with no other parents should only notify node listeners - println("UPDATE_START") - r3.Update(NewSignal(nil, "test")) - println("UPDATE_DONE") + SendUpdate(r3, NewSignal(nil, "test")) (*graph_tester)(t).CheckForNone(r1_l, "Update on r1 after updating r3") (*graph_tester)(t).CheckForNone(r2_l, "Update on r2 after updating r3") (*graph_tester)(t).CheckForValue(r3_l, "No update on r3 after updating r3") (*graph_tester)(t).CheckForValue(r4_l, "No update on r4 after updating r3") // Calling Update() on a child should notify listeners of the parent and child, but not siblings - r2.Update(NewSignal(nil, "test")) + SendUpdate(r2, NewSignal(nil, "test")) (*graph_tester)(t).CheckForNone(r1_l, "Update on r1 after updating r2") (*graph_tester)(t).CheckForValue(r2_l, "No update on r2 after updating r2") (*graph_tester)(t).CheckForValue(r3_l, "No update on r3 after updating r2") (*graph_tester)(t).CheckForValue(r4_l, "No update on r4 after updating r2") // Calling Update() on a child should notify listeners of the parent and child, but not siblings - r1.Update(NewSignal(nil, "test")) + SendUpdate(r1, NewSignal(nil, "test")) (*graph_tester)(t).CheckForValue(r1_l, "No update on r1 after updating r1") (*graph_tester)(t).CheckForNone(r2_l, "Update on r2 after updating r1") (*graph_tester)(t).CheckForValue(r3_l, "No update on r3 after updating r1") @@ -166,7 +164,7 @@ func TestLockResource(t * testing.T) { r1 := NewResource("r1", "", []Resource{}) r2 := NewResource("r2", "", []Resource{}) r3 := NewResource("r3", "", []Resource{r1, r2}) - r4 := NewResource("r3", "", []Resource{r1, r2}) + r4 := NewResource("r4", "", []Resource{r1, r2}) event_manager := NewEventManager(root_event, []Resource{r1, r2, r3, r4}) @@ -177,75 +175,60 @@ func TestLockResource(t * testing.T) { r1_l := r1.UpdateChannel() rel := root_event.UpdateChannel() - err := r3.Lock(root_event) + err := LockResource(r3, root_event) if err != nil { t.Fatal("Failed to lock r3") } - - err = r3.NotifyLocked() - if err != nil { - t.Fatal("Failed to notify r3 of lock") - } + NotifyResourceLocked(r3) (*graph_tester)(t).CheckForValue(r1_l, "No value on r1 update channel") (*graph_tester)(t).CheckForValue(rel, "No value on root_event update channel") - err = r3.Lock(root_event) + err = LockResource(r3, root_event) if err == nil { t.Fatal("Locked r3 after locking r3") } - err = r4.Lock(root_event) + err = LockResource(r4, root_event) if err == nil { t.Fatal("Locked r4 after locking r3") } - err = r1.Lock(root_event) + err = LockResource(r1, root_event) if err == nil { t.Fatal("Locked r1 after locking r3") } - err = r3.Unlock(test_event) + err = UnlockResource(r3, test_event) if err == nil { t.Fatal("Unlocked r3 with event that didn't lock it") } - err = r3.Unlock(root_event) + err = UnlockResource(r3, root_event) if err != nil { t.Fatal("Failed to unlock r3") } - - err = r3.NotifyUnlocked() - if err != nil { - t.Fatal("Failed to notify r3 it was unlocked") - } + NotifyResourceUnlocked(r3) (*graph_tester)(t).CheckForValue(r1_l, "No update on r1 after unlocking r3") (*graph_tester)(t).CheckForValue(rel, "No update on rel after unlocking r3") - err = r4.Lock(root_event) + err = LockResource(r4, root_event) if err != nil { t.Fatal("Failed to lock r4 after unlocking r3") } + NotifyResourceLocked(r4) - err = r4.NotifyLocked() - if err != nil { - t.Fatal("Failed to notify r4 it was locked") - } (*graph_tester)(t).CheckForValue(r1_l, "No update on r1 after locking r4") (*graph_tester)(t).CheckForValue(rel, "No update on rel after locking r4") - err = r4.Unlock(root_event) + err = UnlockResource(r4, root_event) if err != nil { t.Fatal("Failed to unlock r4") } + NotifyResourceUnlocked(r4) - err = r4.NotifyUnlocked() - if err != nil { - t.Fatal("Failed to notify r4 it was unlocked") - } (*graph_tester)(t).CheckForValue(r1_l, "No update on r1 after unlocking r4") - (*graph_tester)(t).CheckForValue(rel, "No update on rel after unlocking r4") } func TestAddToEventQueue(t * testing.T) { @@ -253,17 +236,17 @@ func TestAddToEventQueue(t * testing.T) { event_1 := NewEvent("1", "", []Resource{}) event_2 := NewEvent("2", "", []Resource{}) - err := queue.AddChild(event_1, nil) + err := AddChild(queue, event_1, nil) if err == nil { t.Fatal("suceeded in added nil info to queue") } - err = queue.AddChild(event_1, &EventQueueInfo{priority: 0}) + err = AddChild(queue, event_1, &EventQueueInfo{priority: 0}) if err != nil { t.Fatal("failed to add valid event + info to queue") } - err = queue.AddChild(event_2, &EventQueueInfo{priority: 1}) + err = AddChild(queue, event_2, &EventQueueInfo{priority: 1}) if err != nil { t.Fatal("failed to add valid event + info to queue") } @@ -279,7 +262,7 @@ func TestStartBaseEvent(t * testing.T) { (*graph_tester)(t).CheckForNone(e_l, "Update on event_1 before starting") (*graph_tester)(t).CheckForNone(r_l, "Update on r_1 before starting") - if r.Owner() != event_1 { + if r.Owner().ID() != event_1.ID() { t.Fatal("r is not owned by event_1") } @@ -306,7 +289,7 @@ func TestAbortEventQueue(t * testing.T) { if err != nil { t.Fatal(err) } - r1.Lock(root_event) + LockResource(r1, root_event) e1 := NewEvent("1", "", []Resource{r1}) e1_info := NewEventQueueInfo(1) // Add an event so that the queue doesn't auto complete @@ -319,7 +302,7 @@ func TestAbortEventQueue(t * testing.T) { // start the queue and check that all the events are executed go func() { time.Sleep(time.Second) - root_event.Abort() + AbortEvent(root_event) }() err = manager.Run() @@ -336,12 +319,12 @@ func TestStartEventQueue(t * testing.T) { root_event := NewEventQueue("root_event", "", []Resource{}) r := root_event.DoneResource() rel := root_event.UpdateChannel(); - res_1 := NewResource("test_resource", "", []Resource{}) - res_2 := NewResource("test_resource", "", []Resource{}) + res_1 := NewResource("test_resource_1", "", []Resource{}) + res_2 := NewResource("test_resource_2", "", []Resource{}) manager := NewEventManager(root_event, []Resource{res_1, res_2}) - e1:= NewEvent("1", "", []Resource{res_1, res_2}) + e1:= NewEvent("e1", "", []Resource{res_1, res_2}) e1_r := e1.DoneResource() e1_info := NewEventQueueInfo(1) err := manager.AddEvent(root_event, e1, e1_info) @@ -350,7 +333,7 @@ func TestStartEventQueue(t * testing.T) { } (*graph_tester)(t).CheckForValue(rel, "No update on root_event after adding e1") - e2 := NewEvent("1", "", []Resource{res_1}) + e2 := NewEvent("e2", "", []Resource{res_1}) e2_r := e2.DoneResource() e2_info := NewEventQueueInfo(2) err = manager.AddEvent(root_event, e2, e2_info) @@ -359,7 +342,7 @@ func TestStartEventQueue(t * testing.T) { } (*graph_tester)(t).CheckForValue(rel, "No update on root_event after adding e2") - e3 := NewEvent("1", "", []Resource{res_2}) + e3 := NewEvent("e3", "", []Resource{res_2}) e3_r := e3.DoneResource() e3_info := NewEventQueueInfo(3) err = manager.AddEvent(root_event, e3, e3_info) @@ -375,7 +358,9 @@ func TestStartEventQueue(t * testing.T) { // Abort the event after 5 seconds just in case go func() { time.Sleep(5 * time.Second) - root_event.Abort() + if r.Owner() != nil { + AbortEvent(root_event) + } }() // Now that an event manager is constructed with a queue and 3 basic events diff --git a/resource.go b/resource.go index 67014b2..488fbd5 100644 --- a/resource.go +++ b/resource.go @@ -9,19 +9,20 @@ import ( // Resources propagate update up to multiple parents, and not downwards // (subscriber to team won't get update to alliance, but subscriber to alliance will get update to team) -func (resource * BaseResource) Update(signal GraphSignal) error { - log.Printf("UPDATE BaseResource %s: %+v", resource.Name(), signal) - - resource.BaseNode.Update(signal) - - for _, parent := range resource.Parents() { - err := parent.Update(signal) - if err != nil { - return err +func (resource * BaseResource) update(signal GraphSignal) { + if signal.Type() == "lock_changed" { + for _, child := range resource.Children() { + SendUpdate(child, signal) + } + } else { + for _, parent := range resource.Parents() { + SendUpdate(parent, signal) } } - return nil + if resource.lock_holder != nil { + SendUpdate(resource.lock_holder, signal) + } } // Resource is the interface that DAG nodes are made from @@ -30,128 +31,178 @@ func (resource * BaseResource) Update(signal GraphSignal) error { // The device connection should be maintained as much as possible(requiring some reconnection behaviour in the background) type Resource interface { GraphNode - AddParent(parent Resource) error + Owner() Event Children() []Resource Parents() []Resource - Lock(event Event) error - NotifyLocked() error - NotifyUnlocked() error - Unlock(event Event) error - Owner() Event + + AddParent(parent Resource) error + LockParents() + UnlockParents() + + SetOwner(owner Event) + LockState() + UnlockState() + + lock(event Event) error + unlock(event Event) error Connect(abort chan error) bool } -// BaseResource is the most basic resource that can exist in the DAG -// It holds a single state variable, which contains a pointer to the event that is locking it -type BaseResource struct { - BaseNode - parents []Resource - children []Resource - lock_holder Event - state_lock sync.Mutex -} +func AddParent(resource Resource, parent Resource) error { + if parent.ID() == resource.ID() { + error_str := fmt.Sprintf("Will not add %s as parent of itself", parent.Name()) + return errors.New(error_str) + } -func (resource * BaseResource) Connect(abort chan error) bool { - return false -} + resource.LockParents() + for _, p := range resource.Parents() { + if p.ID() == parent.ID() { + error_str := fmt.Sprintf("%s is already a parent of %s, will not double-bond", p.Name(), resource.Name()) + return errors.New(error_str) + } + } -func (resource * BaseResource) Owner() Event { - return resource.lock_holder + err := resource.AddParent(parent) + resource.UnlockParents() + + return err } -func (resource * BaseResource) NotifyUnlocked() error { - err := resource.Update(NewSignal(resource, "lock_change")) - if err != nil { - return err +func UnlockResource(resource Resource, event Event) error { + log.Printf("RESOURCE_UNLOCK: %s", resource.Name()) + var err error = nil + resource.LockState() + if resource.Owner() == nil { + resource.UnlockState() + return errors.New("Resource already unlocked") } - for _, child := range(resource.children) { - err = child.NotifyUnlocked() + if resource.Owner().ID() != event.ID() { + resource.UnlockState() + return errors.New("Resource not locked by parent, unlock failed") + } + + var lock_err error = nil + for _, child := range resource.Children() { + err := UnlockResource(child, event) if err != nil { - return err + lock_err = err + break } } + if lock_err != nil { + resource.UnlockState() + err_str := fmt.Sprintf("Resource failed to unlock: %s", lock_err) + return errors.New(err_str) + } + + resource.SetOwner(nil) + + err = resource.unlock(event) + if err != nil { + resource.UnlockState() + return errors.New("Failed to unlock resource") + } + + resource.UnlockState() + + signal := NewSignal(event, "lock_changed") + signal.description = "unlock" + + SendUpdate(resource, signal) return nil } -func (resource * BaseResource) NotifyLocked() error { - err := resource.Update(NewSignal(resource, "lock_change")) - if err != nil { - return err +func LockResource(resource Resource, event Event) error { + log.Printf("RESOURCE_LOCK: %s", resource.Name()) + resource.LockState() + if resource.Owner() != nil { + resource.UnlockState() + err_str := fmt.Sprintf("Resource already locked: %s", resource.Name()) + return errors.New(err_str) } - for _, child := range(resource.children) { - err = child.NotifyLocked() - if err != nil { - return err + var lock_err error = nil + for _, child := range resource.Children() { + err := LockResource(child, event) + if err != nil{ + lock_err = err + break } } - resource.lock_holder.Update(NewSignal(resource, "lock_change")) + if lock_err != nil { + resource.UnlockState() + err_str := fmt.Sprintf("Resource failed to lock: %s", lock_err) + return errors.New(err_str) + } + + resource.SetOwner(event) + + err := resource.lock(event) + if err != nil { + resource.UnlockState() + return errors.New("Failed to lock resource") + } + resource.UnlockState() return nil } -// Grab the state mutex and check the state, if unlocked continue to hold the mutex while doing the same for children -// When the bottom of a tree is reached(no more children) go back up and set the lock state -func (resource * BaseResource) Lock(event Event) error { - return resource.lock(event) +func NotifyResourceLocked(resource Resource) { + signal := NewSignal(resource, "lock_changed") + signal.description = "lock" + + go SendUpdate(resource, signal) } -func (resource * BaseResource) lock(event Event) error { - var err error = nil +func NotifyResourceUnlocked(resource Resource) { + signal := NewSignal(resource, "lock_changed") + signal.description = "unlock" - resource.state_lock.Lock() - if resource.lock_holder != nil { - err_str := fmt.Sprintf("Resource already locked: %s", resource.Name()) - err = errors.New(err_str) - } else { - all_children_locked := true - for _, child := range resource.Children() { - err = child.Lock(event) - if err != nil { - all_children_locked = false - break - } - } - if all_children_locked == true { - resource.lock_holder = event - } - } - resource.state_lock.Unlock() + go SendUpdate(resource, signal) +} - return err +// BaseResource is the most basic resource that can exist in the DAG +// It holds a single state variable, which contains a pointer to the event that is locking it +type BaseResource struct { + BaseNode + parents []Resource + parents_lock sync.Mutex + children []Resource + children_lock sync.Mutex + lock_holder Event + state_lock sync.Mutex } -// Recurse through children, unlocking until no more children -// If the child isn't locked by the unlocker -func (resource * BaseResource) Unlock(event Event) error { - var err error = nil - //unlocked := false +func (resource * BaseResource) SetOwner(owner Event) { + resource.lock_holder = owner +} +func (resource * BaseResource) LockState() { resource.state_lock.Lock() - if resource.lock_holder == nil { - err = errors.New("Resource already unlocked") - } else if resource.lock_holder != event { - err = errors.New("Resource not locked by parent, can't unlock") - } else { - all_children_unlocked := true - for _, child := range resource.Children() { - err = child.Unlock(event) - if err != nil { - all_children_unlocked = false - break - } - } - if all_children_unlocked == true{ - resource.lock_holder = nil - //unlocked = true - } - } +} + +func (resource * BaseResource) UnlockState() { resource.state_lock.Unlock() +} - return err +func (resource * BaseResource) Connect(abort chan error) bool { + return false +} + +func (resource * BaseResource) Owner() Event { + return resource.lock_holder +} + +//BaseResources don't check anything special when locking/unlocking +func (resource * BaseResource) lock(event Event) error { + return nil +} + +func (resource * BaseResource) unlock(event Event) error { + return nil } func (resource * BaseResource) Children() []Resource { @@ -162,23 +213,15 @@ func (resource * BaseResource) Parents() []Resource { return resource.parents } -// Add a parent to a DAG node -func (resource * BaseResource) AddParent(parent Resource) error { - // Don't add self as parent - if parent.ID() == resource.ID() { - error_str := fmt.Sprintf("Will not add %s as parent of itself", parent.ID()) - return errors.New(error_str) - } +func (resource * BaseResource) LockParents() { + resource.parents_lock.Lock() +} - // Don't add parent if it's already a parent - for _, p := range resource.parents { - if p.ID() == parent.ID() { - error_str := fmt.Sprintf("%s is already a parent of %s, will not double-bond", p.ID(), resource.ID()) - return errors.New(error_str) - } - } +func (resource * BaseResource) UnlockParents() { + resource.parents_lock.Unlock() +} - // Add the parent +func (resource * BaseResource) AddParent(parent Resource) error { resource.parents = append(resource.parents, parent) return nil } @@ -190,6 +233,7 @@ func NewBaseResource(name string, description string, children []Resource) BaseR description: description, id: randid(), listeners: map[chan GraphSignal]chan GraphSignal{}, + signal: make(chan GraphSignal, 100), }, parents: []Resource{}, children: children, diff --git a/vex.go b/vex.go index 08f2a92..32ca3fd 100644 --- a/vex.go +++ b/vex.go @@ -85,36 +85,29 @@ func NewVirtualArena(name string) * Arena { return arena } -func (arena * Arena) Lock(event Event) error { +func (arena * Arena) lock(event Event) error { if arena.connected == false { log.Printf("ARENA NOT CONNECTED: %s", arena.Name()) error_str := fmt.Sprintf("%s is not connected, cannot lock", arena.Name()) return errors.New(error_str) } - return arena.lock(event) + return nil } -func (arena * Arena) Update(signal GraphSignal) error { - log.Printf("UPDATE Arena %s: %+v", arena.Name(), signal) - - arena.BaseResource.Update(signal) - - if arena.connected == true { - arena.signal <- signal - } - - return nil +func (arena * Arena) update(signal GraphSignal) { + log.Printf("ARENA_UPDATE: %s", arena.Name()) + arena.signal <- signal } func (arena * Arena) Connect(abort chan error) bool { log.Printf("Connecting %s", arena.Name()) go func(arena * Arena, abort chan error) { owner := arena.Owner() - arena.connected = true update_str := fmt.Sprintf("VIRTUAL_ARENA connected: %s", arena.Name()) signal := NewSignal(arena, "arena_connected") signal.description = update_str - arena.Update(signal) + arena.connected = true + go arena.update(signal) log.Printf("VIRTUAL_ARENA goroutine starting: %s", arena.Name()) for true { select { @@ -161,7 +154,6 @@ func NewMatch(alliance0 * Alliance, alliance1 * Alliance, arena * Arena) * Match control: "init", control_start: time.UnixMilli(0), } - match.LockDone() match.actions["start"] = func() (string, error) { log.Printf("STARTING_MATCH %s", match.Name()) diff --git a/vex_test.go b/vex_test.go index fdf3acf..56fbccd 100644 --- a/vex_test.go +++ b/vex_test.go @@ -132,13 +132,16 @@ func TestNewMatch(t *testing.T) { match := NewMatch(alliance_1, alliance_2, arena) root_event := NewEventQueue("root_event", "", []Resource{}) + r := root_event.DoneResource() event_manager := NewEventManager(root_event, []Resource{member_1, member_2, member_3, member_4, team_1, team_2, team_3, team_4, alliance_1, alliance_2, arena}) event_manager.AddEvent(root_event, match, NewEventQueueInfo(1)) go func() { - time.Sleep(time.Second * 2) - root_event.Abort() + time.Sleep(time.Second * 5) + if r.Owner() != nil { + AbortEvent(root_event) + } }() err := event_manager.Run()