diff --git a/event.go b/event.go index 26e8c6e..e232af6 100644 --- a/event.go +++ b/event.go @@ -243,13 +243,7 @@ func (event * BaseEvent) Action(action string) (func() (string, error), bool) { func NewBaseEvent(name string, description string, required_resources []Resource) (BaseEvent) { done_resource := NewResource("event_done", "signal that event is done", []Resource{}) event := BaseEvent{ - BaseNode: BaseNode{ - name: name, - description: description, - id: randid(), - signal: make(chan GraphSignal, 100), - listeners: map[chan GraphSignal] chan GraphSignal{}, - }, + BaseNode: NewBaseNode(name, description, randid()), parent: nil, children: []Event{}, child_info: map[string]EventInfo{}, @@ -271,6 +265,12 @@ func NewBaseEvent(name string, description string, required_resources []Resource } else { signal_fn, exists := event.Handler(signal.Type()) if exists == true { + log.Printf("EVENT_HANDLER: %s - %s", event.name, signal.Type()) + if signal.Source() != nil { + log.Printf("SIGNAL: %s %s -> %+v", signal.Last(), signal.Source().Name(), signal) + } else { + log.Printf("SIGNAL: %s nil -> %+v", signal.Last(), signal) + } return signal_fn(signal) } } diff --git a/graph.go b/graph.go index 92febc1..c509f75 100644 --- a/graph.go +++ b/graph.go @@ -76,6 +76,18 @@ type GraphNode interface { UpdateChannel() chan GraphSignal } +func NewBaseNode(name string, description string, id string) BaseNode { + node := BaseNode{ + name: name, + description: description, + id: id, + signal: make(chan GraphSignal, 100), + listeners: map[chan GraphSignal]chan GraphSignal{}, + } + log.Printf("NEW_NODE: %s - %s", node.ID(), node.Name()) + return node +} + // BaseNode is the most basic implementation of the GraphNode interface // It is used to implement functions common to Events and Resources type BaseNode struct { @@ -138,6 +150,7 @@ func (node * BaseNode) UpdateListeners(update GraphSignal) { select { case listener <- update: default: + log.Printf("CLOSED_LISTENER: %s: %p", node.Name(), listener) close(listener) closed = append(closed, listener) } diff --git a/main.go b/main.go index 7babb87..6322b75 100644 --- a/main.go +++ b/main.go @@ -16,7 +16,7 @@ func fake_team(org string, id string, names []string) (*Team, []*Member) { return team, members } -func fake_data() * EventManager { +func fake_data() (* EventManager, *Arena, *Arena) { resources := []Resource{} teams := []*Team{} @@ -122,16 +122,43 @@ func fake_data() * EventManager { } }(alliances, arenas, event_manager) - return event_manager + return event_manager, arenas[0], arenas[1] +} + +func process_fake_arena(update GraphSignal, arena * Arena) { + if update.Type() == "event_start" { + log.Printf("FAKE_ARENA_ACTION: Match started on %s, queuing autonomous automatically", arena.Name()) + SendUpdate(arena, NewSignal(nil, "queue_autonomous")) + } else if update.Type() == "autonomous_queued" { + log.Printf("FAKE_ARENA_ACTION: Autonomous queued on %s for %s, starting automatically at requested time", arena.Name(), update.Time()) + signal := NewSignal(nil, "start_autonomous") + signal.time = update.Time() + SendUpdate(arena, signal) + } } func main() { go func() { time.Sleep(5 * time.Second) - pprof.Lookup("goroutine").WriteTo(os.Stdout, 1) + if false { + pprof.Lookup("goroutine").WriteTo(os.Stdout, 1) + } }() - event_manager := fake_data() + event_manager, arena_1, arena_2 := fake_data() + // Fake arena clients + go func() { + arena_1_updates := arena_1.UpdateChannel() + arena_2_updates := arena_2.UpdateChannel() + for true { + select { + case update := <- arena_1_updates: + process_fake_arena(update, arena_1) + case update := <- arena_2_updates: + process_fake_arena(update, arena_2) + } + } + }() log.Printf("Starting event_manager") err := event_manager.Run() if err != nil { diff --git a/manager_test.go b/manager_test.go index dc64bd6..7fdb5bb 100644 --- a/manager_test.go +++ b/manager_test.go @@ -4,20 +4,33 @@ import ( "testing" "time" "fmt" + "os" + "runtime/pprof" ) type graph_tester testing.T const listner_timeout = 50 * time.Millisecond -func (t * graph_tester) WaitForValue(listener chan GraphSignal, signal_type string, timeout time.Duration, str string) GraphSignal { +func (t * graph_tester) WaitForValue(listener chan GraphSignal, signal_type string, source GraphNode, timeout time.Duration, str string) GraphSignal { timeout_channel := time.After(timeout) for true { select { case signal := <- listener: if signal.Type() == signal_type { - return signal + if signal.Source() == nil || source == nil { + fmt.Printf("SIGNAL_TYPE_FOUND: %s - %s", signal.Type(), signal.Source()) + if source == nil && signal.Source() == nil{ + return signal + } + } else { + fmt.Printf("SIGNAL_TYPE_FOUND: %s - %s", signal.Type(), signal.Source().Name()) + if signal.Source().ID() == source.ID() { + return signal + } + } } case <-timeout_channel: + pprof.Lookup("goroutine").WriteTo(os.Stdout, 1) t.Fatal(str) return nil } @@ -31,6 +44,7 @@ func (t * graph_tester) CheckForValue(listener chan GraphSignal, str string) Gra case signal := <- listener: return signal case <-timeout: + pprof.Lookup("goroutine").WriteTo(os.Stdout, 1) t.Fatal(str) return nil } @@ -40,7 +54,8 @@ func (t * graph_tester) CheckForNone(listener chan GraphSignal, str string) { timeout := time.After(listner_timeout) select { case sig := <- listener: - t.Fatal(fmt.Printf("%s : %+v", str, sig)) + pprof.Lookup("goroutine").WriteTo(os.Stdout, 1) + t.Fatal(fmt.Sprintf("%s : %+v", str, sig)) case <-timeout: } } @@ -198,8 +213,8 @@ func TestLockResource(t * testing.T) { } NotifyResourceLocked(r3) - (*graph_tester)(t).CheckForValue(r1_l, "No value on r1 update channel") - (*graph_tester)(t).CheckForNone(rel, "Value on root_event update channel") + (*graph_tester)(t).WaitForValue(r1_l, "lock_changed", r1, time.Second, "Wasn't notified of r1 lock on r1 after r3 lock") + (*graph_tester)(t).WaitForValue(rel, "lock_changed", r1, time.Second, "Wasn't notified of r1 lock on rel after r3 lock") err = LockResource(r3, root_event) if err == nil { @@ -226,26 +241,22 @@ func TestLockResource(t * testing.T) { t.Fatal("Failed to unlock r3") } NotifyResourceUnlocked(r3) - - (*graph_tester)(t).CheckForValue(r1_l, "No update on r1 after unlocking r3") - (*graph_tester)(t).CheckForNone(rel, "Update on rel after unlocking r3") + (*graph_tester)(t).WaitForValue(r1_l, "lock_changed", r1, time.Second * 2, "Wasn't notified of r1 unlock on r1 after r3 unlock") err = LockResource(r4, root_event) if err != nil { t.Fatal("Failed to lock r4 after unlocking r3") } NotifyResourceLocked(r4) - - (*graph_tester)(t).CheckForValue(r1_l, "No update on r1 after locking r4") - (*graph_tester)(t).CheckForNone(rel, "Update on rel after locking r4") + (*graph_tester)(t).WaitForValue(r1_l, "lock_changed", r1, time.Second * 2, "Wasn't notified of r1 lock on r1 after r4 lock") + (*graph_tester)(t).WaitForValue(rel, "lock_changed", r1, time.Second * 2, "Wasn't notified of r1 lock on r1 after r4 lock") err = UnlockResource(r4, root_event) if err != nil { t.Fatal("Failed to unlock r4") } NotifyResourceUnlocked(r4) - - (*graph_tester)(t).CheckForValue(r1_l, "No update on r1 after unlocking r4") + (*graph_tester)(t).WaitForValue(r1_l, "lock_changed", r1, time.Second * 2, "Wasn't notified of r1 unlock on r1 after r4 lock") } func TestAddToEventQueue(t * testing.T) { @@ -392,18 +403,18 @@ func TestStartEventQueue(t * testing.T) { t.Fatal("root event was not finished after starting") } + (*graph_tester)(t).WaitForValue(e1_l, "event_done", e1, time.Second, "no e1 event_done") if e1_r.Owner() != nil { t.Fatal("e1 was not completed") } - (*graph_tester)(t).CheckForValue(e1_l, "No update on e1 after running") + (*graph_tester)(t).WaitForValue(e2_l, "event_done", e2, time.Second, "no e2 event_done") if e2_r.Owner() != nil { - t.Fatal("e2 was not completed") + t.Fatal(fmt.Sprintf("e2 was not completed")) } - (*graph_tester)(t).CheckForValue(e2_l, "No update on e2 after running") + (*graph_tester)(t).WaitForValue(e3_l, "event_done", e3, time.Second, "no e3 event_done") if e3_r.Owner() != nil { t.Fatal("e3 was not completed") } - (*graph_tester)(t).CheckForValue(e3_l, "No update on e3 after running") } diff --git a/resource.go b/resource.go index 350b8e2..e94780e 100644 --- a/resource.go +++ b/resource.go @@ -10,18 +10,14 @@ import ( // (subscriber to team won't get update to alliance, but subscriber to alliance will get update to team) func (resource * BaseResource) update(signal GraphSignal) { new_signal := signal.Trace(resource.ID()) - if signal.Type() == "lock_changed" { - for _, child := range resource.Children() { - SendUpdate(child, new_signal) - } - } else { - for _, parent := range resource.Parents() { - SendUpdate(parent, new_signal) - } - if resource.lock_holder != nil { - if resource.lock_holder.ID() != signal.Last() { - SendUpdate(resource.lock_holder, new_signal) - } + + for _, parent := range resource.Parents() { + SendUpdate(parent, new_signal) + } + + if resource.lock_holder != nil { + if resource.lock_holder.ID() != signal.Last() { + SendUpdate(resource.lock_holder, new_signal) } } } @@ -152,6 +148,10 @@ func NotifyResourceLocked(resource Resource) { signal := NewSignal(resource, "lock_changed") signal.description = "lock" + for _, child := range resource.Children() { + NotifyResourceLocked(child) + } + go SendUpdate(resource, signal) } @@ -159,6 +159,10 @@ func NotifyResourceUnlocked(resource Resource) { signal := NewSignal(resource, "lock_changed") signal.description = "unlock" + for _, child := range(resource.Children()) { + NotifyResourceUnlocked(child) + } + go SendUpdate(resource, signal) } @@ -226,13 +230,7 @@ func (resource * BaseResource) AddParent(parent Resource) error { func NewBaseResource(name string, description string, children []Resource) BaseResource { resource := BaseResource{ - BaseNode: BaseNode{ - name: name, - description: description, - id: randid(), - listeners: map[chan GraphSignal]chan GraphSignal{}, - signal: make(chan GraphSignal, 100), - }, + BaseNode: NewBaseNode(name, description, randid()), parents: []Resource{}, children: children, } diff --git a/vex.go b/vex.go index d24a3c2..705994b 100644 --- a/vex.go +++ b/vex.go @@ -90,14 +90,12 @@ func (arena * Arena) lock(event Event) error { func (arena * Arena) update(signal GraphSignal) { log.Printf("ARENA_UPDATE: %s", arena.Name()) arena.signal <- signal - new_signal := signal.Trace(arena.ID()) - arena.BaseResource.update(new_signal) + arena.BaseResource.update(signal) } func (arena * Arena) Connect(abort chan error) bool { log.Printf("Connecting %s", arena.Name()) go func(arena * Arena, abort chan error) { - owner := arena.Owner() update_str := fmt.Sprintf("VIRTUAL_ARENA connected: %s", arena.Name()) signal := NewSignal(arena, "resource_connected") signal.description = update_str @@ -110,27 +108,7 @@ func (arena * Arena) Connect(abort chan error) bool { log.Printf("Virtual arena %s aborting", arena.Name()) break case update := <- arena.signal: - log.Printf("%s update: %+v", arena.Name(), update) - new_owner := arena.Owner() - if new_owner != owner { - log.Printf("NEW_OWNER for %s", arena.Name()) - if new_owner != nil { - log.Printf("new: %s", new_owner.Name()) - } else { - log.Printf("new: nil") - } - - if owner != nil { - log.Printf("old: %s", owner.Name()) - } else { - log.Printf("old: nil") - } - - owner = new_owner - if owner != nil { - } else { - } - } + log.Printf("FAKE_ARENA_ACTION: %s : %+v", arena.Name(), update) } } }(arena, abort) @@ -184,7 +162,9 @@ func NewMatch(alliance0 * Alliance, alliance1 * Alliance, arena * Arena) * Match match.control = "none" match.state = "autonomous_queued" match.control_start = time.Now().Add(start_slack) - go SendUpdate(match, NewSignal(match, "autonomous_queued")) + new_signal := NewSignal(match, "autonomous_queued") + new_signal.time = match.control_start + go SendUpdate(match, new_signal) return "wait", nil } @@ -225,7 +205,9 @@ func NewMatch(alliance0 * Alliance, alliance1 * Alliance, arena * Arena) * Match match.control = "none" match.state = "driver_queued" match.control_start = time.Now().Add(start_slack) - go SendUpdate(match, NewSignal(match, "driver_queued")) + new_signal := NewSignal(match, "driver_queued") + new_signal.time = match.control_start + go SendUpdate(match, new_signal) return "wait", nil } diff --git a/vex_test.go b/vex_test.go index 2b99666..31a7a7a 100644 --- a/vex_test.go +++ b/vex_test.go @@ -149,32 +149,21 @@ func TestNewMatch(t *testing.T) { }() go func(arena_c chan GraphSignal) { - (*graph_tester)(t).WaitForValue(arena_c, "event_start", 1*time.Second, "no event_start") - (*graph_tester)(t).CheckForNone(arena_c, "update to match after starting") + (*graph_tester)(t).WaitForValue(arena_c, "event_start", match, 1*time.Second, "no event_start") SendUpdate(arena, NewSignal(nil, "queue_autonomous")) - (*graph_tester)(t).WaitForValue(arena_c, "autonomous_queued", 1*time.Second, "no autonomous_queued") - (*graph_tester)(t).CheckForNone(arena_c, "update to match after queueing autonomous") + (*graph_tester)(t).WaitForValue(arena_c, "autonomous_queued", match, 1*time.Second, "no autonomous_queued") auton_signal := NewSignal(nil, "start_autonomous") auton_signal.time = time.Now() SendUpdate(arena, auton_signal) - (*graph_tester)(t).WaitForValue(arena_c, "autonomous_running", 1*time.Second, "no autonomous_running") - (*graph_tester)(t).CheckForNone(arena_c, "update to match after starting autonomous") - time.Sleep(TEMP_AUTON_TIME) - time.Sleep(time.Millisecond * 100) - (*graph_tester)(t).WaitForValue(arena_c, "autonomous_done", 6*time.Second, "no autonomous_done") - (*graph_tester)(t).CheckForNone(arena_c, "update to match after ending autonomous") + (*graph_tester)(t).WaitForValue(arena_c, "autonomous_running", match, 1*time.Second, "no autonomous_running") + (*graph_tester)(t).WaitForValue(arena_c, "autonomous_done", match, 6*time.Second, "no autonomous_done") SendUpdate(arena, NewSignal(nil, "queue_driver")) - (*graph_tester)(t).WaitForValue(arena_c, "driver_queued", 1*time.Second, "no driver_queued") - (*graph_tester)(t).CheckForNone(arena_c, "update to match after queueing driver") + (*graph_tester)(t).WaitForValue(arena_c, "driver_queued", match, 1*time.Second, "no driver_queued") driver_signal := NewSignal(nil, "start_driver") driver_signal.time = time.Now() SendUpdate(arena, driver_signal) - (*graph_tester)(t).WaitForValue(arena_c, "driver_running", 1*time.Second, "no driver_running") - (*graph_tester)(t).CheckForNone(arena_c, "update to match after starting driver") - time.Sleep(TEMP_DRIVE_TIME) - time.Sleep(time.Millisecond * 100) - (*graph_tester)(t).WaitForValue(arena_c, "driver_done", 1*time.Second, "no driver_done") - (*graph_tester)(t).CheckForNone(arena_c, "update to match after game done 3") + (*graph_tester)(t).WaitForValue(arena_c, "driver_running", match, 1*time.Second, "no driver_running") + (*graph_tester)(t).WaitForValue(arena_c, "driver_done", match, 6*time.Second, "no driver_done") }(arena_c) err := event_manager.Run()