Basic vex Match

graph-rework
noah metz 2023-06-03 01:38:35 -06:00
parent 0085a71142
commit f838f53dda
7 changed files with 173 additions and 35 deletions

@ -11,3 +11,6 @@ clean:
run: build run: build
${BINARY_PATH} ${BINARY_PATH}
test:
go test

@ -14,10 +14,10 @@ func (event * BaseEvent) update(signal GraphSignal) {
event.signal <- signal event.signal <- signal
if event.parent != nil && signal.Type() != "abort"{ if event.parent != nil && signal.Type() != "abort"{
event.parent.update(signal) SendUpdate(event.parent, signal)
} else if signal.Type() == "abort" { } else if signal.Type() == "abort" {
for _, child := range(event.Children()) { for _, child := range(event.Children()) {
child.update(signal) SendUpdate(child, signal)
} }
} }
} }
@ -56,7 +56,7 @@ type Event interface {
LockParent() LockParent()
UnlockParent() UnlockParent()
Action(action string) (func()(string, error), bool) Action(action string) (func()(string, error), bool)
Handler(signal_type string) (func() (string, error), bool) Handler(signal_type string) (func(GraphSignal) (string, error), bool)
RequiredResources() []Resource RequiredResources() []Resource
DoneResource() Resource DoneResource() Resource
@ -66,7 +66,7 @@ type Event interface {
setParent(parent Event) setParent(parent Event)
} }
func (event * BaseEvent) Handler(signal_type string) (func()(string, error), bool) { func (event * BaseEvent) Handler(signal_type string) (func(GraphSignal)(string, error), bool) {
handler, exists := event.handlers[signal_type] handler, exists := event.handlers[signal_type]
return handler, exists return handler, exists
} }
@ -126,12 +126,15 @@ func AddChild(event Event, child Event, info EventInfo) error {
event.UnlockChildren() event.UnlockChildren()
event.UnlockParent() event.UnlockParent()
update := NewSignal(event, "child_added")
update.description = child.Name()
SendUpdate(event, NewSignal(event, "child_added")) SendUpdate(event, NewSignal(event, "child_added"))
return nil return nil
} }
func RunEvent(event Event) error { func RunEvent(event Event) error {
log.Printf("EVENT_RUN: %s", event.Name()) log.Printf("EVENT_RUN: %s", event.Name())
go SendUpdate(event, NewSignal(event, "event_start"))
next_action := "start" next_action := "start"
var err error = nil var err error = nil
for next_action != "" { for next_action != "" {
@ -226,7 +229,7 @@ type BaseEvent struct {
child_info map[string]EventInfo child_info map[string]EventInfo
child_lock sync.Mutex child_lock sync.Mutex
actions map[string]func() (string, error) actions map[string]func() (string, error)
handlers map[string]func() (string, error) handlers map[string]func(GraphSignal) (string, error)
parent Event parent Event
parent_lock sync.Mutex parent_lock sync.Mutex
abort chan string abort chan string
@ -253,7 +256,7 @@ func NewBaseEvent(name string, description string, required_resources []Resource
done_resource: done_resource, done_resource: done_resource,
required_resources: required_resources, required_resources: required_resources,
actions: map[string]func()(string, error){}, actions: map[string]func()(string, error){},
handlers: map[string]func()(string, error){}, handlers: map[string]func(GraphSignal)(string, error){},
abort: make(chan string, 1), abort: make(chan string, 1),
} }
@ -268,7 +271,7 @@ func NewBaseEvent(name string, description string, required_resources []Resource
} else { } else {
signal_fn, exists := event.Handler(signal.Type()) signal_fn, exists := event.Handler(signal.Type())
if exists == true { if exists == true {
return signal_fn() return signal_fn(signal)
} }
} }
// ignore signals other than "abort" and "do_action" // ignore signals other than "abort" and "do_action"
@ -353,7 +356,7 @@ func NewEventQueue(name string, description string, required_resources []Resourc
err := LockResources(event) err := LockResources(event)
// start in new goroutine // start in new goroutine
if err != nil { if err != nil {
//log.Printf("Failed to lock %s: %s", event.Name(), err) log.Printf("Failed to lock %s: %s", event.Name(), err)
} else { } else {
info.state = "running" info.state = "running"
log.Printf("EVENT_START: %s", event.Name()) log.Printf("EVENT_START: %s", event.Name())
@ -374,9 +377,13 @@ func NewEventQueue(name string, description string, required_resources []Resourc
for _, resource := range(needed_resources) { for _, resource := range(needed_resources) {
_, exists := queue.listened_resources[resource.ID()]
if exists == false {
log.Printf("REGISTER_RESOURCE: %s - %s", queue.Name(), resource.Name())
queue.listened_resources[resource.ID()] = resource queue.listened_resources[resource.ID()] = resource
resource.RegisterChannel(queue.signal) resource.RegisterChannel(queue.signal)
} }
}
queue.UnlockChildren() queue.UnlockChildren()
@ -395,15 +402,19 @@ func NewEventQueue(name string, description string, required_resources []Resourc
return "queue_event", nil return "queue_event", nil
} }
queue.handlers["child_added"] = func() (string, error) { queue.handlers["arena_connected"] = func(signal GraphSignal) (string, error) {
return "queue_event", nil
}
queue.handlers["child_added"] = func(signal GraphSignal) (string, error) {
return "queue_event", nil return "queue_event", nil
} }
queue.handlers["lock_change"] = func() (string, error) { queue.handlers["lock_changed"] = func(signal GraphSignal) (string, error) {
return "queue_event", nil return "queue_event", nil
} }
queue.handlers["event_done"] = func() (string, error) { queue.handlers["event_done"] = func(signal GraphSignal) (string, error) {
return "queue_event", nil return "queue_event", nil
} }
@ -435,7 +446,6 @@ func (event * BaseEvent) ChildInfo(idx Event) EventInfo {
} }
func (event * BaseEvent) LockChildren() { func (event * BaseEvent) LockChildren() {
log.Printf("LOCKING CHILDREN OF %s", event.Name())
event.child_lock.Lock() event.child_lock.Lock()
} }

@ -4,6 +4,7 @@ import (
"log" "log"
"sync" "sync"
"github.com/google/uuid" "github.com/google/uuid"
"time"
) )
// Generate a random graphql id // Generate a random graphql id
@ -16,12 +17,18 @@ type GraphSignal interface {
Source() GraphNode Source() GraphNode
Type() string Type() string
Description() string Description() string
Time() time.Time
} }
type BaseSignal struct { type BaseSignal struct {
source GraphNode source GraphNode
signal_type string signal_type string
description string description string
time time.Time
}
func (signal BaseSignal) Time() time.Time {
return signal.time
} }
func (signal BaseSignal) Source() GraphNode { func (signal BaseSignal) Source() GraphNode {
@ -134,7 +141,12 @@ func (node * BaseNode) update(signal GraphSignal) {
} }
func SendUpdate(node GraphNode, signal GraphSignal) { func SendUpdate(node GraphNode, signal GraphSignal) {
if signal.Source() != nil {
log.Printf("UPDATE %s -> %s: %+v", signal.Source().Name(), node.Name(), signal)
} else {
log.Printf("UPDATE %s: %+v", node.Name(), signal) log.Printf("UPDATE %s: %+v", node.Name(), signal)
}
node.UpdateListeners(signal) node.UpdateListeners(signal)
node.update(signal) node.update(signal)
} }

@ -22,8 +22,8 @@ func (t * graph_tester) CheckForValue(listener chan GraphSignal, str string) {
func (t * graph_tester) CheckForNone(listener chan GraphSignal, str string) { func (t * graph_tester) CheckForNone(listener chan GraphSignal, str string) {
timeout := time.After(listner_timeout) timeout := time.After(listner_timeout)
select { select {
case <- listener: case sig := <- listener:
t.Fatal(str) t.Fatal(fmt.Printf("%s : %+v", str, sig))
case <-timeout: case <-timeout:
} }
} }
@ -182,7 +182,7 @@ func TestLockResource(t * testing.T) {
NotifyResourceLocked(r3) NotifyResourceLocked(r3)
(*graph_tester)(t).CheckForValue(r1_l, "No value on r1 update channel") (*graph_tester)(t).CheckForValue(r1_l, "No value on r1 update channel")
(*graph_tester)(t).CheckForValue(rel, "No value on root_event update channel") (*graph_tester)(t).CheckForNone(rel, "Value on root_event update channel")
err = LockResource(r3, root_event) err = LockResource(r3, root_event)
if err == nil { if err == nil {
@ -211,7 +211,7 @@ func TestLockResource(t * testing.T) {
NotifyResourceUnlocked(r3) NotifyResourceUnlocked(r3)
(*graph_tester)(t).CheckForValue(r1_l, "No update on r1 after unlocking r3") (*graph_tester)(t).CheckForValue(r1_l, "No update on r1 after unlocking r3")
(*graph_tester)(t).CheckForValue(rel, "No update on rel after unlocking r3") (*graph_tester)(t).CheckForNone(rel, "Update on rel after unlocking r3")
err = LockResource(r4, root_event) err = LockResource(r4, root_event)
if err != nil { if err != nil {
@ -220,7 +220,7 @@ func TestLockResource(t * testing.T) {
NotifyResourceLocked(r4) NotifyResourceLocked(r4)
(*graph_tester)(t).CheckForValue(r1_l, "No update on r1 after locking r4") (*graph_tester)(t).CheckForValue(r1_l, "No update on r1 after locking r4")
(*graph_tester)(t).CheckForValue(rel, "No update on rel after locking r4") (*graph_tester)(t).CheckForNone(rel, "Update on rel after locking r4")
err = UnlockResource(r4, root_event) err = UnlockResource(r4, root_event)
if err != nil { if err != nil {

@ -17,13 +17,13 @@ func (resource * BaseResource) update(signal GraphSignal) {
for _, parent := range resource.Parents() { for _, parent := range resource.Parents() {
SendUpdate(parent, signal) SendUpdate(parent, signal)
} }
}
if resource.lock_holder != nil { if resource.lock_holder != nil {
SendUpdate(resource.lock_holder, signal) SendUpdate(resource.lock_holder, signal)
} }
} }
}
// Resource is the interface that DAG nodes are made from // Resource is the interface that DAG nodes are made from
// A resource needs to be able to represent logical entities and connections to physical entities. // A resource needs to be able to represent logical entities and connections to physical entities.
// A resource lock could be aborted at any time if this connection is broken, if that happens the event locking it must be aborted // A resource lock could be aborted at any time if this connection is broken, if that happens the event locking it must be aborted
@ -115,13 +115,22 @@ func LockResource(resource Resource, event Event) error {
return errors.New(err_str) return errors.New(err_str)
} }
err := resource.lock(event)
if err != nil {
resource.UnlockState()
err_str := fmt.Sprintf("Failed to lock resource: %s", err)
return errors.New(err_str)
}
var lock_err error = nil var lock_err error = nil
locked_resources := []Resource{}
for _, child := range resource.Children() { for _, child := range resource.Children() {
err := LockResource(child, event) err := LockResource(child, event)
if err != nil{ if err != nil{
lock_err = err lock_err = err
break break
} }
locked_resources = append(locked_resources, child)
} }
if lock_err != nil { if lock_err != nil {
@ -132,11 +141,6 @@ func LockResource(resource Resource, event Event) error {
resource.SetOwner(event) resource.SetOwner(event)
err := resource.lock(event)
if err != nil {
resource.UnlockState()
return errors.New("Failed to lock resource")
}
resource.UnlockState() resource.UnlockState()
return nil return nil

@ -107,7 +107,7 @@ func (arena * Arena) Connect(abort chan error) bool {
signal := NewSignal(arena, "arena_connected") signal := NewSignal(arena, "arena_connected")
signal.description = update_str signal.description = update_str
arena.connected = true arena.connected = true
go arena.update(signal) go SendUpdate(arena, signal)
log.Printf("VIRTUAL_ARENA goroutine starting: %s", arena.Name()) log.Printf("VIRTUAL_ARENA goroutine starting: %s", arena.Name())
for true { for true {
select { select {
@ -115,7 +115,7 @@ func (arena * Arena) Connect(abort chan error) bool {
log.Printf("Virtual arena %s aborting", arena.Name()) log.Printf("Virtual arena %s aborting", arena.Name())
break break
case update := <- arena.signal: case update := <- arena.signal:
log.Printf("%s update: %s", arena.Name(), update) log.Printf("%s update: %+v", arena.Name(), update)
new_owner := arena.Owner() new_owner := arena.Owner()
if new_owner != owner { if new_owner != owner {
log.Printf("NEW_OWNER for %s", arena.Name()) log.Printf("NEW_OWNER for %s", arena.Name())
@ -143,6 +143,8 @@ func (arena * Arena) Connect(abort chan error) bool {
} }
const start_slack = 3000 * time.Millisecond const start_slack = 3000 * time.Millisecond
const TEMP_AUTON_TIME = time.Second * 3
const TEMP_DRIVE_TIME = time.Second * 5
func NewMatch(alliance0 * Alliance, alliance1 * Alliance, arena * Arena) * Match { func NewMatch(alliance0 * Alliance, alliance1 * Alliance, arena * Arena) * Match {
name := fmt.Sprintf("Match: %s vs. %s on %s", alliance0.Name(), alliance1.Name(), arena.Name()) name := fmt.Sprintf("Match: %s vs. %s on %s", alliance0.Name(), alliance1.Name(), arena.Name())
@ -162,18 +164,87 @@ func NewMatch(alliance0 * Alliance, alliance1 * Alliance, arena * Arena) * Match
return "wait", nil return "wait", nil
} }
match.actions["queue_autonomous"] = func() (string, error) { match.handlers["queue_autonomous"] = func(signal GraphSignal) (string, error) {
if match.state != "scheduled" {
log.Printf("BAD_STATE: %s: %s", signal.Type(), match.state)
return "wait", nil
}
match.control = "none" match.control = "none"
match.state = "autonomous_queued" match.state = "autonomous_queued"
match.control_start = time.Now().Add(start_slack) match.control_start = time.Now().Add(start_slack)
go SendUpdate(match, NewSignal(match, "autonomous_queued"))
return "wait", nil return "wait", nil
} }
match.actions["start_autonomous"] = func() (string, error) { match.handlers["start_autonomous"] = func(signal GraphSignal) (string, error) {
match.control = "autonomous" if match.state != "autonomous_queued" {
log.Printf("BAD_STATE: %s: %s", signal.Type(), match.state)
return "wait", nil
}
match.control = "program"
match.state = "autonomous_running" match.state = "autonomous_running"
// TODO replace with typed protobuf
match.control_start = signal.Time()
go SendUpdate(match, NewSignal(match, "autonomous_running"))
go func(match * Match) {
control_wait := time.Until(match.control_start.Add(TEMP_AUTON_TIME))
time.Sleep(control_wait)
SendUpdate(match, NewSignal(match, "autonomous_done"))
}(match)
return "wait", nil
}
match.handlers["autonomous_done"] = func(signal GraphSignal) (string, error) {
if match.state != "autonomous_running" {
log.Printf("BAD_STATE: %s: %s", signal.Type(), match.state)
return "wait", nil
}
match.control = "none"
match.state = "autonomous_done"
return "wait", nil
}
match.handlers["queue_driver"] = func(signal GraphSignal) (string, error) {
if match.state != "autonomous_done"{
log.Printf("BAD_STATE: %s: %s", signal.Type(), match.state)
return "wait", nil
}
match.control = "none"
match.state = "driver_queued"
match.control_start = time.Now().Add(start_slack)
go SendUpdate(match, NewSignal(match, "driver_queued"))
return "wait", nil
}
match.handlers["start_driver"] = func(signal GraphSignal) (string, error) {
if match.state != "driver_queued" {
log.Printf("BAD_STATE: %s: %s", signal.Type(), match.state)
return "wait", nil return "wait", nil
} }
match.control = "driver"
match.state = "driver_running"
match.control_start = signal.Time()
go SendUpdate(match, NewSignal(match, "driver_running"))
go func(match * Match) {
control_wait := time.Until(match.control_start.Add(TEMP_DRIVE_TIME))
time.Sleep(control_wait)
SendUpdate(match, NewSignal(match, "driver_done"))
}(match)
return "wait", nil
}
match.handlers["driver_done"] = func(signal GraphSignal) (string, error) {
if match.state != "driver_running" {
log.Printf("BAD_STATE: %s: %s", signal.Type(), match.state)
return "wait", nil
}
match.control = "none"
match.state = "driver_done"
return "", nil
}
return match return match
} }

@ -3,6 +3,8 @@ package main
import ( import (
"testing" "testing"
"fmt" "fmt"
"runtime/pprof"
"os"
"time" "time"
) )
@ -131,6 +133,7 @@ func TestNewMatch(t *testing.T) {
arena := NewVirtualArena(arena_name) arena := NewVirtualArena(arena_name)
match := NewMatch(alliance_1, alliance_2, arena) match := NewMatch(alliance_1, alliance_2, arena)
match_c := match.UpdateChannel()
root_event := NewEventQueue("root_event", "", []Resource{}) root_event := NewEventQueue("root_event", "", []Resource{})
r := root_event.DoneResource() r := root_event.DoneResource()
@ -138,12 +141,47 @@ func TestNewMatch(t *testing.T) {
event_manager.AddEvent(root_event, match, NewEventQueueInfo(1)) event_manager.AddEvent(root_event, match, NewEventQueueInfo(1))
go func() { go func() {
time.Sleep(time.Second * 5) time.Sleep(time.Second * 20)
if r.Owner() != nil { if r.Owner() != nil {
pprof.Lookup("goroutine").WriteTo(os.Stdout, 1)
AbortEvent(root_event) AbortEvent(root_event)
} }
}() }()
go func(match_c chan GraphSignal) {
(*graph_tester)(t).CheckForValue(match_c, "no update to match after starting 1")
(*graph_tester)(t).CheckForNone(match_c, "update to match after starting 2")
SendUpdate(match, NewSignal(nil, "queue_autonomous"))
(*graph_tester)(t).CheckForValue(match_c, "no update to match after queueing autonomous 1")
(*graph_tester)(t).CheckForValue(match_c, "no update to match after queueing autonomous 2")
(*graph_tester)(t).CheckForNone(match_c, "update to match after queueing autonomous 3")
auton_signal := NewSignal(nil, "start_autonomous")
auton_signal.time = time.Now()
SendUpdate(match, auton_signal)
(*graph_tester)(t).CheckForValue(match_c, "no update to match after starting autonomous 1")
(*graph_tester)(t).CheckForValue(match_c, "no update to match after starting autonomous 2")
(*graph_tester)(t).CheckForNone(match_c, "update to match after starting autonomous 3")
time.Sleep(TEMP_AUTON_TIME)
time.Sleep(time.Millisecond * 100)
(*graph_tester)(t).CheckForValue(match_c, "no update to match after ending autonomous 1")
(*graph_tester)(t).CheckForNone(match_c, "update to match after ending autonomous 2")
SendUpdate(match, NewSignal(nil, "queue_driver"))
(*graph_tester)(t).CheckForValue(match_c, "no update to match after queueing driver 1")
(*graph_tester)(t).CheckForValue(match_c, "no update to match after queueing driver 2")
(*graph_tester)(t).CheckForNone(match_c, "update to match after queueing driver 3")
driver_signal := NewSignal(nil, "start_driver")
driver_signal.time = time.Now()
SendUpdate(match, driver_signal)
(*graph_tester)(t).CheckForValue(match_c, "no update to match after starting driver 1")
(*graph_tester)(t).CheckForValue(match_c, "no update to match after starting driver 2")
(*graph_tester)(t).CheckForNone(match_c, "update to match after starting driver 3")
time.Sleep(TEMP_DRIVE_TIME)
time.Sleep(time.Millisecond * 100)
(*graph_tester)(t).CheckForValue(match_c, "no update to match after game done 1")
(*graph_tester)(t).CheckForValue(match_c, "no update to match after game done 2")
(*graph_tester)(t).CheckForNone(match_c, "update to match after game done 3")
}(match_c)
err := event_manager.Run() err := event_manager.Run()
if err != nil { if err != nil {
t.Fatal(err) t.Fatal(err)