diff --git a/Makefile b/Makefile index 2e29734..2546b93 100644 --- a/Makefile +++ b/Makefile @@ -11,3 +11,6 @@ clean: run: build ${BINARY_PATH} + +test: + go test diff --git a/event.go b/event.go index 22766a7..fb2f350 100644 --- a/event.go +++ b/event.go @@ -14,10 +14,10 @@ func (event * BaseEvent) update(signal GraphSignal) { event.signal <- signal if event.parent != nil && signal.Type() != "abort"{ - event.parent.update(signal) + SendUpdate(event.parent, signal) } else if signal.Type() == "abort" { for _, child := range(event.Children()) { - child.update(signal) + SendUpdate(child, signal) } } } @@ -56,7 +56,7 @@ type Event interface { LockParent() UnlockParent() Action(action string) (func()(string, error), bool) - Handler(signal_type string) (func() (string, error), bool) + Handler(signal_type string) (func(GraphSignal) (string, error), bool) RequiredResources() []Resource DoneResource() Resource @@ -66,7 +66,7 @@ type Event interface { setParent(parent Event) } -func (event * BaseEvent) Handler(signal_type string) (func()(string, error), bool) { +func (event * BaseEvent) Handler(signal_type string) (func(GraphSignal)(string, error), bool) { handler, exists := event.handlers[signal_type] return handler, exists } @@ -126,12 +126,15 @@ func AddChild(event Event, child Event, info EventInfo) error { event.UnlockChildren() event.UnlockParent() + update := NewSignal(event, "child_added") + update.description = child.Name() SendUpdate(event, NewSignal(event, "child_added")) return nil } func RunEvent(event Event) error { log.Printf("EVENT_RUN: %s", event.Name()) + go SendUpdate(event, NewSignal(event, "event_start")) next_action := "start" var err error = nil for next_action != "" { @@ -226,7 +229,7 @@ type BaseEvent struct { child_info map[string]EventInfo child_lock sync.Mutex actions map[string]func() (string, error) - handlers map[string]func() (string, error) + handlers map[string]func(GraphSignal) (string, error) parent Event parent_lock sync.Mutex abort chan string @@ -253,7 +256,7 @@ func NewBaseEvent(name string, description string, required_resources []Resource done_resource: done_resource, required_resources: required_resources, actions: map[string]func()(string, error){}, - handlers: map[string]func()(string, error){}, + handlers: map[string]func(GraphSignal)(string, error){}, abort: make(chan string, 1), } @@ -268,7 +271,7 @@ func NewBaseEvent(name string, description string, required_resources []Resource } else { signal_fn, exists := event.Handler(signal.Type()) if exists == true { - return signal_fn() + return signal_fn(signal) } } // ignore signals other than "abort" and "do_action" @@ -353,7 +356,7 @@ func NewEventQueue(name string, description string, required_resources []Resourc err := LockResources(event) // start in new goroutine if err != nil { - //log.Printf("Failed to lock %s: %s", event.Name(), err) + log.Printf("Failed to lock %s: %s", event.Name(), err) } else { info.state = "running" log.Printf("EVENT_START: %s", event.Name()) @@ -374,8 +377,12 @@ func NewEventQueue(name string, description string, required_resources []Resourc for _, resource := range(needed_resources) { - queue.listened_resources[resource.ID()] = resource - resource.RegisterChannel(queue.signal) + _, exists := queue.listened_resources[resource.ID()] + if exists == false { + log.Printf("REGISTER_RESOURCE: %s - %s", queue.Name(), resource.Name()) + queue.listened_resources[resource.ID()] = resource + resource.RegisterChannel(queue.signal) + } } queue.UnlockChildren() @@ -395,15 +402,19 @@ func NewEventQueue(name string, description string, required_resources []Resourc return "queue_event", nil } - queue.handlers["child_added"] = func() (string, error) { + queue.handlers["arena_connected"] = func(signal GraphSignal) (string, error) { + return "queue_event", nil + } + + queue.handlers["child_added"] = func(signal GraphSignal) (string, error) { return "queue_event", nil } - queue.handlers["lock_change"] = func() (string, error) { + queue.handlers["lock_changed"] = func(signal GraphSignal) (string, error) { return "queue_event", nil } - queue.handlers["event_done"] = func() (string, error) { + queue.handlers["event_done"] = func(signal GraphSignal) (string, error) { return "queue_event", nil } @@ -435,7 +446,6 @@ func (event * BaseEvent) ChildInfo(idx Event) EventInfo { } func (event * BaseEvent) LockChildren() { - log.Printf("LOCKING CHILDREN OF %s", event.Name()) event.child_lock.Lock() } diff --git a/graph.go b/graph.go index ce26899..15ce817 100644 --- a/graph.go +++ b/graph.go @@ -4,6 +4,7 @@ import ( "log" "sync" "github.com/google/uuid" + "time" ) // Generate a random graphql id @@ -16,12 +17,18 @@ type GraphSignal interface { Source() GraphNode Type() string Description() string + Time() time.Time } type BaseSignal struct { source GraphNode signal_type string description string + time time.Time +} + +func (signal BaseSignal) Time() time.Time { + return signal.time } func (signal BaseSignal) Source() GraphNode { @@ -134,7 +141,12 @@ func (node * BaseNode) update(signal GraphSignal) { } func SendUpdate(node GraphNode, signal GraphSignal) { - log.Printf("UPDATE %s: %+v", node.Name(), signal) + if signal.Source() != nil { + log.Printf("UPDATE %s -> %s: %+v", signal.Source().Name(), node.Name(), signal) + } else { + log.Printf("UPDATE %s: %+v", node.Name(), signal) + + } node.UpdateListeners(signal) node.update(signal) } diff --git a/manager_test.go b/manager_test.go index dd5d374..03c28fc 100644 --- a/manager_test.go +++ b/manager_test.go @@ -22,9 +22,9 @@ func (t * graph_tester) CheckForValue(listener chan GraphSignal, str string) { func (t * graph_tester) CheckForNone(listener chan GraphSignal, str string) { timeout := time.After(listner_timeout) select { - case <- listener: - t.Fatal(str) - case <-timeout: + case sig := <- listener: + t.Fatal(fmt.Printf("%s : %+v", str, sig)) + case <-timeout: } } @@ -182,7 +182,7 @@ func TestLockResource(t * testing.T) { NotifyResourceLocked(r3) (*graph_tester)(t).CheckForValue(r1_l, "No value on r1 update channel") - (*graph_tester)(t).CheckForValue(rel, "No value on root_event update channel") + (*graph_tester)(t).CheckForNone(rel, "Value on root_event update channel") err = LockResource(r3, root_event) if err == nil { @@ -211,7 +211,7 @@ func TestLockResource(t * testing.T) { NotifyResourceUnlocked(r3) (*graph_tester)(t).CheckForValue(r1_l, "No update on r1 after unlocking r3") - (*graph_tester)(t).CheckForValue(rel, "No update on rel after unlocking r3") + (*graph_tester)(t).CheckForNone(rel, "Update on rel after unlocking r3") err = LockResource(r4, root_event) if err != nil { @@ -220,7 +220,7 @@ func TestLockResource(t * testing.T) { NotifyResourceLocked(r4) (*graph_tester)(t).CheckForValue(r1_l, "No update on r1 after locking r4") - (*graph_tester)(t).CheckForValue(rel, "No update on rel after locking r4") + (*graph_tester)(t).CheckForNone(rel, "Update on rel after locking r4") err = UnlockResource(r4, root_event) if err != nil { diff --git a/resource.go b/resource.go index 0d8003d..653673c 100644 --- a/resource.go +++ b/resource.go @@ -17,11 +17,11 @@ func (resource * BaseResource) update(signal GraphSignal) { for _, parent := range resource.Parents() { SendUpdate(parent, signal) } + if resource.lock_holder != nil { + SendUpdate(resource.lock_holder, signal) + } } - if resource.lock_holder != nil { - SendUpdate(resource.lock_holder, signal) - } } // Resource is the interface that DAG nodes are made from @@ -115,13 +115,22 @@ func LockResource(resource Resource, event Event) error { return errors.New(err_str) } + err := resource.lock(event) + if err != nil { + resource.UnlockState() + err_str := fmt.Sprintf("Failed to lock resource: %s", err) + return errors.New(err_str) + } + var lock_err error = nil + locked_resources := []Resource{} for _, child := range resource.Children() { err := LockResource(child, event) if err != nil{ lock_err = err break } + locked_resources = append(locked_resources, child) } if lock_err != nil { @@ -132,11 +141,6 @@ func LockResource(resource Resource, event Event) error { resource.SetOwner(event) - err := resource.lock(event) - if err != nil { - resource.UnlockState() - return errors.New("Failed to lock resource") - } resource.UnlockState() return nil diff --git a/vex.go b/vex.go index 32ca3fd..141d0f6 100644 --- a/vex.go +++ b/vex.go @@ -107,7 +107,7 @@ func (arena * Arena) Connect(abort chan error) bool { signal := NewSignal(arena, "arena_connected") signal.description = update_str arena.connected = true - go arena.update(signal) + go SendUpdate(arena, signal) log.Printf("VIRTUAL_ARENA goroutine starting: %s", arena.Name()) for true { select { @@ -115,7 +115,7 @@ func (arena * Arena) Connect(abort chan error) bool { log.Printf("Virtual arena %s aborting", arena.Name()) break case update := <- arena.signal: - log.Printf("%s update: %s", arena.Name(), update) + log.Printf("%s update: %+v", arena.Name(), update) new_owner := arena.Owner() if new_owner != owner { log.Printf("NEW_OWNER for %s", arena.Name()) @@ -143,6 +143,8 @@ func (arena * Arena) Connect(abort chan error) bool { } const start_slack = 3000 * time.Millisecond +const TEMP_AUTON_TIME = time.Second * 3 +const TEMP_DRIVE_TIME = time.Second * 5 func NewMatch(alliance0 * Alliance, alliance1 * Alliance, arena * Arena) * Match { name := fmt.Sprintf("Match: %s vs. %s on %s", alliance0.Name(), alliance1.Name(), arena.Name()) @@ -162,18 +164,87 @@ func NewMatch(alliance0 * Alliance, alliance1 * Alliance, arena * Arena) * Match return "wait", nil } - match.actions["queue_autonomous"] = func() (string, error) { + match.handlers["queue_autonomous"] = func(signal GraphSignal) (string, error) { + if match.state != "scheduled" { + log.Printf("BAD_STATE: %s: %s", signal.Type(), match.state) + return "wait", nil + } match.control = "none" match.state = "autonomous_queued" match.control_start = time.Now().Add(start_slack) + go SendUpdate(match, NewSignal(match, "autonomous_queued")) return "wait", nil } - match.actions["start_autonomous"] = func() (string, error) { - match.control = "autonomous" + match.handlers["start_autonomous"] = func(signal GraphSignal) (string, error) { + if match.state != "autonomous_queued" { + log.Printf("BAD_STATE: %s: %s", signal.Type(), match.state) + return "wait", nil + } + match.control = "program" match.state = "autonomous_running" + // TODO replace with typed protobuf + match.control_start = signal.Time() + go SendUpdate(match, NewSignal(match, "autonomous_running")) + go func(match * Match) { + control_wait := time.Until(match.control_start.Add(TEMP_AUTON_TIME)) + time.Sleep(control_wait) + SendUpdate(match, NewSignal(match, "autonomous_done")) + }(match) + return "wait", nil + } + + match.handlers["autonomous_done"] = func(signal GraphSignal) (string, error) { + if match.state != "autonomous_running" { + log.Printf("BAD_STATE: %s: %s", signal.Type(), match.state) + return "wait", nil + } + match.control = "none" + match.state = "autonomous_done" + return "wait", nil } + match.handlers["queue_driver"] = func(signal GraphSignal) (string, error) { + if match.state != "autonomous_done"{ + log.Printf("BAD_STATE: %s: %s", signal.Type(), match.state) + return "wait", nil + } + match.control = "none" + match.state = "driver_queued" + match.control_start = time.Now().Add(start_slack) + go SendUpdate(match, NewSignal(match, "driver_queued")) + return "wait", nil + } + + match.handlers["start_driver"] = func(signal GraphSignal) (string, error) { + if match.state != "driver_queued" { + log.Printf("BAD_STATE: %s: %s", signal.Type(), match.state) + return "wait", nil + } + match.control = "driver" + match.state = "driver_running" + match.control_start = signal.Time() + + go SendUpdate(match, NewSignal(match, "driver_running")) + go func(match * Match) { + control_wait := time.Until(match.control_start.Add(TEMP_DRIVE_TIME)) + time.Sleep(control_wait) + SendUpdate(match, NewSignal(match, "driver_done")) + }(match) + return "wait", nil + } + + match.handlers["driver_done"] = func(signal GraphSignal) (string, error) { + if match.state != "driver_running" { + log.Printf("BAD_STATE: %s: %s", signal.Type(), match.state) + return "wait", nil + } + match.control = "none" + match.state = "driver_done" + + return "", nil + } + return match } diff --git a/vex_test.go b/vex_test.go index 56fbccd..45b8634 100644 --- a/vex_test.go +++ b/vex_test.go @@ -3,6 +3,8 @@ package main import ( "testing" "fmt" + "runtime/pprof" + "os" "time" ) @@ -131,6 +133,7 @@ func TestNewMatch(t *testing.T) { arena := NewVirtualArena(arena_name) match := NewMatch(alliance_1, alliance_2, arena) + match_c := match.UpdateChannel() root_event := NewEventQueue("root_event", "", []Resource{}) r := root_event.DoneResource() @@ -138,12 +141,47 @@ func TestNewMatch(t *testing.T) { event_manager.AddEvent(root_event, match, NewEventQueueInfo(1)) go func() { - time.Sleep(time.Second * 5) + time.Sleep(time.Second * 20) if r.Owner() != nil { + pprof.Lookup("goroutine").WriteTo(os.Stdout, 1) AbortEvent(root_event) } }() + go func(match_c chan GraphSignal) { + (*graph_tester)(t).CheckForValue(match_c, "no update to match after starting 1") + (*graph_tester)(t).CheckForNone(match_c, "update to match after starting 2") + SendUpdate(match, NewSignal(nil, "queue_autonomous")) + (*graph_tester)(t).CheckForValue(match_c, "no update to match after queueing autonomous 1") + (*graph_tester)(t).CheckForValue(match_c, "no update to match after queueing autonomous 2") + (*graph_tester)(t).CheckForNone(match_c, "update to match after queueing autonomous 3") + auton_signal := NewSignal(nil, "start_autonomous") + auton_signal.time = time.Now() + SendUpdate(match, auton_signal) + (*graph_tester)(t).CheckForValue(match_c, "no update to match after starting autonomous 1") + (*graph_tester)(t).CheckForValue(match_c, "no update to match after starting autonomous 2") + (*graph_tester)(t).CheckForNone(match_c, "update to match after starting autonomous 3") + time.Sleep(TEMP_AUTON_TIME) + time.Sleep(time.Millisecond * 100) + (*graph_tester)(t).CheckForValue(match_c, "no update to match after ending autonomous 1") + (*graph_tester)(t).CheckForNone(match_c, "update to match after ending autonomous 2") + SendUpdate(match, NewSignal(nil, "queue_driver")) + (*graph_tester)(t).CheckForValue(match_c, "no update to match after queueing driver 1") + (*graph_tester)(t).CheckForValue(match_c, "no update to match after queueing driver 2") + (*graph_tester)(t).CheckForNone(match_c, "update to match after queueing driver 3") + driver_signal := NewSignal(nil, "start_driver") + driver_signal.time = time.Now() + SendUpdate(match, driver_signal) + (*graph_tester)(t).CheckForValue(match_c, "no update to match after starting driver 1") + (*graph_tester)(t).CheckForValue(match_c, "no update to match after starting driver 2") + (*graph_tester)(t).CheckForNone(match_c, "update to match after starting driver 3") + time.Sleep(TEMP_DRIVE_TIME) + time.Sleep(time.Millisecond * 100) + (*graph_tester)(t).CheckForValue(match_c, "no update to match after game done 1") + (*graph_tester)(t).CheckForValue(match_c, "no update to match after game done 2") + (*graph_tester)(t).CheckForNone(match_c, "update to match after game done 3") + }(match_c) + err := event_manager.Run() if err != nil { t.Fatal(err)