Moved wait action to a closure generator instead of using a non-pointer closure that didn't pickup any event changes

graph-rework
noah metz 2023-06-03 23:49:25 -06:00
parent debeed7091
commit 0f92de1619
3 changed files with 128 additions and 45 deletions

@ -2,6 +2,7 @@ package main
import ( import (
"fmt" "fmt"
"time"
"log" "log"
"errors" "errors"
"reflect" "reflect"
@ -66,6 +67,10 @@ type Event interface {
Handler(signal_type string) (func(GraphSignal) (string, error), bool) Handler(signal_type string) (func(GraphSignal) (string, error), bool)
RequiredResources() []Resource RequiredResources() []Resource
DoneResource() Resource DoneResource() Resource
SetTimeout(end_time time.Time, action string)
Timeout() <-chan time.Time
TimeoutAction() string
Signal() chan GraphSignal
finish() error finish() error
@ -73,6 +78,23 @@ type Event interface {
setParent(parent Event) setParent(parent Event)
} }
func (event * BaseEvent) Signal() chan GraphSignal {
return event.signal
}
func (event * BaseEvent) TimeoutAction() string {
return event.timeout_action
}
func (event * BaseEvent) Timeout() <-chan time.Time {
return event.timeout
}
func (event * BaseEvent) SetTimeout(end_time time.Time, action string) {
event.timeout_action = action
event.timeout = time.After(time.Until(end_time))
}
func (event * BaseEvent) Handler(signal_type string) (func(GraphSignal)(string, error), bool) { func (event * BaseEvent) Handler(signal_type string) (func(GraphSignal)(string, error), bool) {
handler, exists := event.handlers[signal_type] handler, exists := event.handlers[signal_type]
return handler, exists return handler, exists
@ -245,6 +267,8 @@ type BaseEvent struct {
parent Event parent Event
parent_lock sync.Mutex parent_lock sync.Mutex
abort chan string abort chan string
timeout <-chan time.Time
timeout_action string
} }
func (event * BaseEvent) Action(action string) (func() (string, error), bool) { func (event * BaseEvent) Action(action string) (func() (string, error), bool) {
@ -252,6 +276,33 @@ func (event * BaseEvent) Action(action string) (func() (string, error), bool) {
return action_fn, exists return action_fn, exists
} }
func EventWait(event Event) (func() (string, error)) {
return func() (string, error) {
log.Printf("EVENT_WAIT: %s TIMEOUT: %+v", event.Name(), event.Timeout())
select {
case signal := <- event.Signal():
if signal.Source() != nil {
log.Printf("EVENT_SIGNAL: %s %s %s -> %+v", event.Name(), signal.Last(), signal.Source().Name(), signal)
} else {
log.Printf("EVENT_SIGNAL: %s %s nil -> %+v", event.Name(), signal.Last(), signal)
}
if signal.Type() == "abort" {
return "", errors.New("State machine aborted by signal")
} else {
signal_fn, exists := event.Handler(signal.Type())
if exists == true {
log.Printf("EVENT_HANDLER: %s - %s", event.Name(), signal.Type())
return signal_fn(signal)
}
}
return "wait", nil
case <- event.Timeout():
log.Printf("EVENT_TIMEOUT %s - NEXT_STATE: %s", event.Name(), event.TimeoutAction())
return event.TimeoutAction(), nil
}
}
}
func NewBaseEvent(name string, description string, required_resources []Resource) (BaseEvent) { func NewBaseEvent(name string, description string, required_resources []Resource) (BaseEvent) {
done_resource := NewResource("event_done", "signal that event is done", []Resource{}) done_resource := NewResource("event_done", "signal that event is done", []Resource{})
event := BaseEvent{ event := BaseEvent{
@ -264,31 +315,12 @@ func NewBaseEvent(name string, description string, required_resources []Resource
actions: map[string]func()(string, error){}, actions: map[string]func()(string, error){},
handlers: map[string]func(GraphSignal)(string, error){}, handlers: map[string]func(GraphSignal)(string, error){},
abort: make(chan string, 1), abort: make(chan string, 1),
timeout: nil,
timeout_action: "",
} }
LockResource(event.done_resource, &event) LockResource(event.done_resource, &event)
event.actions["wait"] = func() (string, error) {
signal := <- event.signal
if signal.Type() == "abort" {
return "", errors.New("State machine aborted by signal")
} else if signal.Type() == "do_action" {
return signal.Description(), nil
} else {
signal_fn, exists := event.Handler(signal.Type())
if exists == true {
log.Printf("EVENT_HANDLER: %s - %s", event.name, signal.Type())
if signal.Source() != nil {
log.Printf("SIGNAL: %s %s -> %+v", signal.Last(), signal.Source().Name(), signal)
} else {
log.Printf("SIGNAL: %s nil -> %+v", signal.Last(), signal)
}
return signal_fn(signal)
}
}
return "wait", nil
}
return event return event
} }
@ -296,6 +328,8 @@ func NewEvent(name string, description string, required_resources []Resource) (*
event := NewBaseEvent(name, description, required_resources) event := NewBaseEvent(name, description, required_resources)
event_ptr := &event event_ptr := &event
event_ptr.actions["wait"] = EventWait(event_ptr)
event_ptr.actions["start"] = func() (string, error) { event_ptr.actions["start"] = func() (string, error) {
return "", nil return "", nil
} }
@ -336,6 +370,8 @@ func NewEventQueue(name string, description string, required_resources []Resourc
listened_resources: map[string]Resource{}, listened_resources: map[string]Resource{},
} }
queue.actions["wait"] = EventWait(queue)
queue.actions["start"] = func() (string, error) { queue.actions["start"] = func() (string, error) {
return "queue_event", nil return "queue_event", nil
} }

@ -125,30 +125,69 @@ func fake_data() (* EventManager, *Arena, *Arena) {
return event_manager, arenas[0], arenas[1] return event_manager, arenas[0], arenas[1]
} }
func process_fake_arena(update GraphSignal, arena * Arena) { type FakeClient struct {
state string
start time.Time
arena * Arena
update chan GraphSignal
}
func NewFakeClient(arena *Arena) * FakeClient {
client := &FakeClient{
state: "init",
start: time.Now(),
arena: arena,
update: arena.UpdateChannel(),
}
return client
}
func (client * FakeClient) process_update(update GraphSignal) {
arena := client.arena
if update.Source() != nil { if update.Source() != nil {
log.Printf("FAKE_CLIENT_UPDATE: %s -> %+v", update.Source().ID(), update) log.Printf("FAKE_CLIENT_UPDATE: %s -> %+v", update.Source().ID(), update)
} else { } else {
log.Printf("FAKE_CLIENT_UPDATE: %s -> %+v", update.Source(), update) log.Printf("FAKE_CLIENT_UPDATE: nil -> %+v", update)
} }
if update.Type() == "event_start" { if update.Type() == "event_start" {
if client.state != "init" {
log.Printf("BAD_CLIENT_STATE: event_start when match not in init: %s %s", arena.Name(), client.state)
}
client.state = "autonomous_queued"
log.Printf("FAKE_CLIENT_ACTION: Match started on %s, queuing autonomous automatically", arena.Name()) log.Printf("FAKE_CLIENT_ACTION: Match started on %s, queuing autonomous automatically", arena.Name())
SendUpdate(arena, NewSignal(nil, "queue_autonomous")) SendUpdate(arena, NewSignal(nil, "queue_autonomous"))
} else if update.Type() == "autonomous_queued" { } else if update.Type() == "autonomous_queued" {
if client.state != "autonomous_queued" {
log.Printf("BAD_CLIENT_STATE: autonomous_queued when match not in autonomous_queued: %s %s", arena.Name(), client.state)
}
client.state = "autonomous_started"
log.Printf("FAKE_CLIENT_ACTION: Autonomous queued on %s for %s, starting automatically at requested time", arena.Name(), update.Time()) log.Printf("FAKE_CLIENT_ACTION: Autonomous queued on %s for %s, starting automatically at requested time", arena.Name(), update.Time())
signal := NewSignal(nil, "start_autonomous") signal := NewSignal(nil, "start_autonomous")
signal.time = update.Time() signal.time = update.Time()
SendUpdate(arena, signal) SendUpdate(arena, signal)
} else if update.Type() == "autonomous_done" { } else if update.Type() == "autonomous_done" {
if client.state != "autonomous_started" {
log.Printf("BAD_CLIENT_STATE: autonomous_done when match not in autonomous_started: %s %s", arena.Name(), client.state)
}
client.state = "driver_queued"
log.Printf("FAKE_CLIENT_ACTION: Autonomous done on %s for %s, queueing driver automatically", arena.Name(), update.Time()) log.Printf("FAKE_CLIENT_ACTION: Autonomous done on %s for %s, queueing driver automatically", arena.Name(), update.Time())
signal := NewSignal(nil, "queue_driver") signal := NewSignal(nil, "queue_driver")
SendUpdate(arena, signal) SendUpdate(arena, signal)
} else if update.Type() == "driver_queued" { } else if update.Type() == "driver_queued" {
if client.state != "driver_queued" {
log.Printf("BAD_CLIENT_STATE: driver_queued when match not in autonomous_done: %s %s", arena.Name(), client.state)
}
client.state = "driver_started"
log.Printf("FAKE_CLIENT_ACTION: Driver queueud on %s for %s, starting driver automatically at requested time", arena.Name(), update.Time()) log.Printf("FAKE_CLIENT_ACTION: Driver queueud on %s for %s, starting driver automatically at requested time", arena.Name(), update.Time())
signal := NewSignal(nil, "start_driver") signal := NewSignal(nil, "start_driver")
signal.time = update.Time() signal.time = update.Time()
SendUpdate(arena, signal) SendUpdate(arena, signal)
} else if update.Type() == "driver_done" { } else if update.Type() == "driver_done" {
if client.state != "driver_started" {
log.Printf("BAD_CLIENT_STATE: driver_done when match not in driver_started: %s %s", arena.Name(), client.state)
}
client.state = "init"
log.Printf("FAKE_CLIENT_ACTION: Driver done on %s for %s", arena.Name(), update.Time()) log.Printf("FAKE_CLIENT_ACTION: Driver done on %s for %s", arena.Name(), update.Time())
} }
} }
@ -156,7 +195,7 @@ func process_fake_arena(update GraphSignal, arena * Arena) {
func main() { func main() {
go func() { go func() {
time.Sleep(5 * time.Second) time.Sleep(5 * time.Second)
if false { if true {
pprof.Lookup("goroutine").WriteTo(os.Stdout, 1) pprof.Lookup("goroutine").WriteTo(os.Stdout, 1)
} }
}() }()
@ -164,14 +203,14 @@ func main() {
event_manager, arena_1, arena_2 := fake_data() event_manager, arena_1, arena_2 := fake_data()
// Fake arena clients // Fake arena clients
go func() { go func() {
arena_1_updates := arena_1.UpdateChannel() arena_1_client := NewFakeClient(arena_1)
arena_2_updates := arena_2.UpdateChannel() arena_2_client := NewFakeClient(arena_2)
for true { for true {
select { select {
case update := <- arena_1_updates: case update := <- arena_1_client.update:
process_fake_arena(update, arena_1) arena_1_client.process_update(update)
case update := <- arena_2_updates: case update := <- arena_2_client.update:
process_fake_arena(update, arena_2) arena_2_client.process_update(update)
} }
} }
}() }()

@ -144,6 +144,8 @@ func NewMatch(alliance0 * Alliance, alliance1 * Alliance, arena * Arena) * Match
arena: arena, arena: arena,
} }
match.actions["wait"] = EventWait(match)
match.actions["start"] = func() (string, error) { match.actions["start"] = func() (string, error) {
log.Printf("STARTING_MATCH %s", match.Name()) log.Printf("STARTING_MATCH %s", match.Name())
match.control = "none" match.control = "none"
@ -174,20 +176,18 @@ func NewMatch(alliance0 * Alliance, alliance1 * Alliance, arena * Arena) * Match
log.Printf("AUTONOMOUS_RUNNING: %s", match.Name()) log.Printf("AUTONOMOUS_RUNNING: %s", match.Name())
match.control = "program" match.control = "program"
match.state = "autonomous_running" match.state = "autonomous_running"
// TODO replace with typed protobuf
match.control_start = signal.Time() match.control_start = signal.Time()
go SendUpdate(match, NewSignal(match, "autonomous_running")) go SendUpdate(match, NewSignal(match, "autonomous_running"))
go func(match * Match) {
control_wait := time.Until(match.control_start.Add(TEMP_AUTON_TIME)) end_time := match.control_start.Add(TEMP_AUTON_TIME)
time.Sleep(control_wait) match.SetTimeout(end_time, "autonomous_done")
SendUpdate(match, NewSignal(match, "autonomous_done")) log.Printf("AUTONOMOUS_END_TIME: %s %+v", end_time, match.timeout)
}(match)
return "wait", nil return "wait", nil
} }
match.handlers["autonomous_done"] = func(signal GraphSignal) (string, error) { match.handlers["autonomous_done"] = func(signal GraphSignal) (string, error) {
if match.state != "autonomous_running" { if match.state != "autonomous_running" {
log.Printf("BAD_STATE: %s: %s", signal.Type(), match.state) log.Printf("BAD_STATE: %s: %s - %s", signal.Type(), match.state, match.Name())
return "wait", nil return "wait", nil
} }
log.Printf("AUTONOMOUS_DONE: %s", match.Name()) log.Printf("AUTONOMOUS_DONE: %s", match.Name())
@ -199,7 +199,7 @@ func NewMatch(alliance0 * Alliance, alliance1 * Alliance, arena * Arena) * Match
match.handlers["queue_driver"] = func(signal GraphSignal) (string, error) { match.handlers["queue_driver"] = func(signal GraphSignal) (string, error) {
if match.state != "autonomous_done"{ if match.state != "autonomous_done"{
log.Printf("BAD_STATE: %s: %s", signal.Type(), match.state) log.Printf("BAD_STATE: %s: %s - %s", signal.Type(), match.state, match.Name())
return "wait", nil return "wait", nil
} }
match.control = "none" match.control = "none"
@ -213,7 +213,7 @@ func NewMatch(alliance0 * Alliance, alliance1 * Alliance, arena * Arena) * Match
match.handlers["start_driver"] = func(signal GraphSignal) (string, error) { match.handlers["start_driver"] = func(signal GraphSignal) (string, error) {
if match.state != "driver_queued" { if match.state != "driver_queued" {
log.Printf("BAD_STATE: %s: %s", signal.Type(), match.state) log.Printf("BAD_STATE: %s: %s - %s", signal.Type(), match.state, match.Name())
return "wait", nil return "wait", nil
} }
match.control = "driver" match.control = "driver"
@ -221,17 +221,16 @@ func NewMatch(alliance0 * Alliance, alliance1 * Alliance, arena * Arena) * Match
match.control_start = signal.Time() match.control_start = signal.Time()
go SendUpdate(match, NewSignal(match, "driver_running")) go SendUpdate(match, NewSignal(match, "driver_running"))
go func(match * Match) {
control_wait := time.Until(match.control_start.Add(TEMP_DRIVE_TIME)) end_time := match.control_start.Add(TEMP_DRIVE_TIME)
time.Sleep(control_wait) match.SetTimeout(end_time, "driver_done")
SendUpdate(match, NewSignal(match, "driver_done"))
}(match)
return "wait", nil return "wait", nil
} }
match.handlers["driver_done"] = func(signal GraphSignal) (string, error) { match.handlers["driver_done"] = func(signal GraphSignal) (string, error) {
if match.state != "driver_running" { if match.state != "driver_running" {
log.Printf("BAD_STATE: %s: %s", signal.Type(), match.state) log.Printf("BAD_STATE: %s: %s - %s", signal.Type(), match.state, match.Name())
return "wait", nil return "wait", nil
} }
match.control = "none" match.control = "none"
@ -240,5 +239,14 @@ func NewMatch(alliance0 * Alliance, alliance1 * Alliance, arena * Arena) * Match
return "", nil return "", nil
} }
match.actions["driver_done"] = func() (string, error) {
SendUpdate(match, NewSignal(match, "driver_done"))
return "wait", nil
}
match.actions["autonomous_done"] = func() (string, error) {
SendUpdate(match, NewSignal(match, "autonomous_done"))
return "wait", nil
}
return match return match
} }