diff --git a/event.go b/event.go index c0d0c2b..f6ce5c7 100644 --- a/event.go +++ b/event.go @@ -2,6 +2,7 @@ package main import ( "fmt" + "time" "log" "errors" "reflect" @@ -66,6 +67,10 @@ type Event interface { Handler(signal_type string) (func(GraphSignal) (string, error), bool) RequiredResources() []Resource DoneResource() Resource + SetTimeout(end_time time.Time, action string) + Timeout() <-chan time.Time + TimeoutAction() string + Signal() chan GraphSignal finish() error @@ -73,6 +78,23 @@ type Event interface { setParent(parent Event) } +func (event * BaseEvent) Signal() chan GraphSignal { + return event.signal +} + +func (event * BaseEvent) TimeoutAction() string { + return event.timeout_action +} + +func (event * BaseEvent) Timeout() <-chan time.Time { + return event.timeout +} + +func (event * BaseEvent) SetTimeout(end_time time.Time, action string) { + event.timeout_action = action + event.timeout = time.After(time.Until(end_time)) +} + func (event * BaseEvent) Handler(signal_type string) (func(GraphSignal)(string, error), bool) { handler, exists := event.handlers[signal_type] return handler, exists @@ -245,6 +267,8 @@ type BaseEvent struct { parent Event parent_lock sync.Mutex abort chan string + timeout <-chan time.Time + timeout_action string } func (event * BaseEvent) Action(action string) (func() (string, error), bool) { @@ -252,6 +276,33 @@ func (event * BaseEvent) Action(action string) (func() (string, error), bool) { return action_fn, exists } +func EventWait(event Event) (func() (string, error)) { + return func() (string, error) { + log.Printf("EVENT_WAIT: %s TIMEOUT: %+v", event.Name(), event.Timeout()) + select { + case signal := <- event.Signal(): + if signal.Source() != nil { + log.Printf("EVENT_SIGNAL: %s %s %s -> %+v", event.Name(), signal.Last(), signal.Source().Name(), signal) + } else { + log.Printf("EVENT_SIGNAL: %s %s nil -> %+v", event.Name(), signal.Last(), signal) + } + if signal.Type() == "abort" { + return "", errors.New("State machine aborted by signal") + } else { + signal_fn, exists := event.Handler(signal.Type()) + if exists == true { + log.Printf("EVENT_HANDLER: %s - %s", event.Name(), signal.Type()) + return signal_fn(signal) + } + } + return "wait", nil + case <- event.Timeout(): + log.Printf("EVENT_TIMEOUT %s - NEXT_STATE: %s", event.Name(), event.TimeoutAction()) + return event.TimeoutAction(), nil + } + } +} + func NewBaseEvent(name string, description string, required_resources []Resource) (BaseEvent) { done_resource := NewResource("event_done", "signal that event is done", []Resource{}) event := BaseEvent{ @@ -264,31 +315,12 @@ func NewBaseEvent(name string, description string, required_resources []Resource actions: map[string]func()(string, error){}, handlers: map[string]func(GraphSignal)(string, error){}, abort: make(chan string, 1), + timeout: nil, + timeout_action: "", } LockResource(event.done_resource, &event) - event.actions["wait"] = func() (string, error) { - signal := <- event.signal - if signal.Type() == "abort" { - return "", errors.New("State machine aborted by signal") - } else if signal.Type() == "do_action" { - return signal.Description(), nil - } else { - signal_fn, exists := event.Handler(signal.Type()) - if exists == true { - log.Printf("EVENT_HANDLER: %s - %s", event.name, signal.Type()) - if signal.Source() != nil { - log.Printf("SIGNAL: %s %s -> %+v", signal.Last(), signal.Source().Name(), signal) - } else { - log.Printf("SIGNAL: %s nil -> %+v", signal.Last(), signal) - } - return signal_fn(signal) - } - } - return "wait", nil - } - return event } @@ -296,6 +328,8 @@ func NewEvent(name string, description string, required_resources []Resource) (* event := NewBaseEvent(name, description, required_resources) event_ptr := &event + event_ptr.actions["wait"] = EventWait(event_ptr) + event_ptr.actions["start"] = func() (string, error) { return "", nil } @@ -336,6 +370,8 @@ func NewEventQueue(name string, description string, required_resources []Resourc listened_resources: map[string]Resource{}, } + queue.actions["wait"] = EventWait(queue) + queue.actions["start"] = func() (string, error) { return "queue_event", nil } diff --git a/main.go b/main.go index 024ff08..8cf8fa5 100644 --- a/main.go +++ b/main.go @@ -125,30 +125,69 @@ func fake_data() (* EventManager, *Arena, *Arena) { return event_manager, arenas[0], arenas[1] } -func process_fake_arena(update GraphSignal, arena * Arena) { +type FakeClient struct { + state string + start time.Time + arena * Arena + update chan GraphSignal +} + +func NewFakeClient(arena *Arena) * FakeClient { + client := &FakeClient{ + state: "init", + start: time.Now(), + arena: arena, + update: arena.UpdateChannel(), + } + + return client +} + +func (client * FakeClient) process_update(update GraphSignal) { + arena := client.arena if update.Source() != nil { log.Printf("FAKE_CLIENT_UPDATE: %s -> %+v", update.Source().ID(), update) } else { - log.Printf("FAKE_CLIENT_UPDATE: %s -> %+v", update.Source(), update) + log.Printf("FAKE_CLIENT_UPDATE: nil -> %+v", update) } if update.Type() == "event_start" { + if client.state != "init" { + log.Printf("BAD_CLIENT_STATE: event_start when match not in init: %s %s", arena.Name(), client.state) + } + client.state = "autonomous_queued" log.Printf("FAKE_CLIENT_ACTION: Match started on %s, queuing autonomous automatically", arena.Name()) SendUpdate(arena, NewSignal(nil, "queue_autonomous")) } else if update.Type() == "autonomous_queued" { + if client.state != "autonomous_queued" { + log.Printf("BAD_CLIENT_STATE: autonomous_queued when match not in autonomous_queued: %s %s", arena.Name(), client.state) + } + client.state = "autonomous_started" log.Printf("FAKE_CLIENT_ACTION: Autonomous queued on %s for %s, starting automatically at requested time", arena.Name(), update.Time()) signal := NewSignal(nil, "start_autonomous") signal.time = update.Time() SendUpdate(arena, signal) } else if update.Type() == "autonomous_done" { + if client.state != "autonomous_started" { + log.Printf("BAD_CLIENT_STATE: autonomous_done when match not in autonomous_started: %s %s", arena.Name(), client.state) + } + client.state = "driver_queued" log.Printf("FAKE_CLIENT_ACTION: Autonomous done on %s for %s, queueing driver automatically", arena.Name(), update.Time()) signal := NewSignal(nil, "queue_driver") SendUpdate(arena, signal) } else if update.Type() == "driver_queued" { + if client.state != "driver_queued" { + log.Printf("BAD_CLIENT_STATE: driver_queued when match not in autonomous_done: %s %s", arena.Name(), client.state) + } + client.state = "driver_started" log.Printf("FAKE_CLIENT_ACTION: Driver queueud on %s for %s, starting driver automatically at requested time", arena.Name(), update.Time()) signal := NewSignal(nil, "start_driver") signal.time = update.Time() SendUpdate(arena, signal) } else if update.Type() == "driver_done" { + if client.state != "driver_started" { + log.Printf("BAD_CLIENT_STATE: driver_done when match not in driver_started: %s %s", arena.Name(), client.state) + } + client.state = "init" log.Printf("FAKE_CLIENT_ACTION: Driver done on %s for %s", arena.Name(), update.Time()) } } @@ -156,7 +195,7 @@ func process_fake_arena(update GraphSignal, arena * Arena) { func main() { go func() { time.Sleep(5 * time.Second) - if false { + if true { pprof.Lookup("goroutine").WriteTo(os.Stdout, 1) } }() @@ -164,14 +203,14 @@ func main() { event_manager, arena_1, arena_2 := fake_data() // Fake arena clients go func() { - arena_1_updates := arena_1.UpdateChannel() - arena_2_updates := arena_2.UpdateChannel() + arena_1_client := NewFakeClient(arena_1) + arena_2_client := NewFakeClient(arena_2) for true { select { - case update := <- arena_1_updates: - process_fake_arena(update, arena_1) - case update := <- arena_2_updates: - process_fake_arena(update, arena_2) + case update := <- arena_1_client.update: + arena_1_client.process_update(update) + case update := <- arena_2_client.update: + arena_2_client.process_update(update) } } }() diff --git a/vex.go b/vex.go index 283192a..26c6b05 100644 --- a/vex.go +++ b/vex.go @@ -144,6 +144,8 @@ func NewMatch(alliance0 * Alliance, alliance1 * Alliance, arena * Arena) * Match arena: arena, } + match.actions["wait"] = EventWait(match) + match.actions["start"] = func() (string, error) { log.Printf("STARTING_MATCH %s", match.Name()) match.control = "none" @@ -174,20 +176,18 @@ func NewMatch(alliance0 * Alliance, alliance1 * Alliance, arena * Arena) * Match log.Printf("AUTONOMOUS_RUNNING: %s", match.Name()) match.control = "program" match.state = "autonomous_running" - // TODO replace with typed protobuf match.control_start = signal.Time() go SendUpdate(match, NewSignal(match, "autonomous_running")) - go func(match * Match) { - control_wait := time.Until(match.control_start.Add(TEMP_AUTON_TIME)) - time.Sleep(control_wait) - SendUpdate(match, NewSignal(match, "autonomous_done")) - }(match) + + end_time := match.control_start.Add(TEMP_AUTON_TIME) + match.SetTimeout(end_time, "autonomous_done") + log.Printf("AUTONOMOUS_END_TIME: %s %+v", end_time, match.timeout) return "wait", nil } match.handlers["autonomous_done"] = func(signal GraphSignal) (string, error) { if match.state != "autonomous_running" { - log.Printf("BAD_STATE: %s: %s", signal.Type(), match.state) + log.Printf("BAD_STATE: %s: %s - %s", signal.Type(), match.state, match.Name()) return "wait", nil } log.Printf("AUTONOMOUS_DONE: %s", match.Name()) @@ -199,7 +199,7 @@ func NewMatch(alliance0 * Alliance, alliance1 * Alliance, arena * Arena) * Match match.handlers["queue_driver"] = func(signal GraphSignal) (string, error) { if match.state != "autonomous_done"{ - log.Printf("BAD_STATE: %s: %s", signal.Type(), match.state) + log.Printf("BAD_STATE: %s: %s - %s", signal.Type(), match.state, match.Name()) return "wait", nil } match.control = "none" @@ -213,7 +213,7 @@ func NewMatch(alliance0 * Alliance, alliance1 * Alliance, arena * Arena) * Match match.handlers["start_driver"] = func(signal GraphSignal) (string, error) { if match.state != "driver_queued" { - log.Printf("BAD_STATE: %s: %s", signal.Type(), match.state) + log.Printf("BAD_STATE: %s: %s - %s", signal.Type(), match.state, match.Name()) return "wait", nil } match.control = "driver" @@ -221,17 +221,16 @@ func NewMatch(alliance0 * Alliance, alliance1 * Alliance, arena * Arena) * Match match.control_start = signal.Time() go SendUpdate(match, NewSignal(match, "driver_running")) - go func(match * Match) { - control_wait := time.Until(match.control_start.Add(TEMP_DRIVE_TIME)) - time.Sleep(control_wait) - SendUpdate(match, NewSignal(match, "driver_done")) - }(match) + + end_time := match.control_start.Add(TEMP_DRIVE_TIME) + match.SetTimeout(end_time, "driver_done") + return "wait", nil } match.handlers["driver_done"] = func(signal GraphSignal) (string, error) { if match.state != "driver_running" { - log.Printf("BAD_STATE: %s: %s", signal.Type(), match.state) + log.Printf("BAD_STATE: %s: %s - %s", signal.Type(), match.state, match.Name()) return "wait", nil } match.control = "none" @@ -240,5 +239,14 @@ func NewMatch(alliance0 * Alliance, alliance1 * Alliance, arena * Arena) * Match return "", nil } + match.actions["driver_done"] = func() (string, error) { + SendUpdate(match, NewSignal(match, "driver_done")) + return "wait", nil + } + match.actions["autonomous_done"] = func() (string, error) { + SendUpdate(match, NewSignal(match, "autonomous_done")) + return "wait", nil + } + return match }