From 990b93757f08bcfdcdfade5aec08c6668a0dc989 Mon Sep 17 00:00:00 2001 From: Noah Metz Date: Thu, 1 Jun 2023 13:11:32 -0600 Subject: [PATCH] Added virtual arena that responds to signals on a channel, cleaned up log messages, and fixed update() hierarchy --- event.go | 100 ++++++++++++++++++++++++++++++++++-------- graph.go | 23 +++++----- main.go | 47 ++++++++++++-------- manager.go | 25 ++++++++++- resource.go | 62 +++++++++++++++++--------- vex.go | 124 +++++++++++++++++++++++++++++++++++++++++----------- 6 files changed, 287 insertions(+), 94 deletions(-) diff --git a/event.go b/event.go index 787aea6..da676cf 100644 --- a/event.go +++ b/event.go @@ -6,17 +6,19 @@ import ( "errors" "reflect" "sort" + "sync" ) // Update the events listeners, and notify the parent to do the same -func (event * BaseEvent) Update() error { - err := event.UpdateListeners() +func (event * BaseEvent) Update(reason string) error { + log.Printf("UPDATE BaseEvent %s: %s", event.Name(), reason) + err := event.UpdateListeners(reason) if err != nil { return err } if event.parent != nil{ - return event.parent.Update() + return event.parent.Update("update parent") } return nil } @@ -79,8 +81,19 @@ type BaseEvent struct { abort chan string } +func (queue * EventQueue) Abort() error { + for _, event := range(queue.children) { + event.Abort() + } + for _, c := range(queue.resource_aborts) { + c <- "event abort" + } + queue.signal <- "abort" + return nil +} + func (event * BaseEvent) Abort() error { - for _, event := range(event.Children()) { + for _, event := range(event.children) { event.Abort() } event.signal <- "abort" @@ -94,21 +107,28 @@ func (event * BaseEvent) Signal(action string) error { func (event * BaseEvent) LockResources() error { locked_resources := []Resource{} - lock_err := false + var lock_err error = nil for _, resource := range(event.RequiredResources()) { err := resource.Lock(event) if err != nil { - lock_err = true + lock_err = err + break } locked_resources = append(locked_resources, resource) } - if lock_err == true { + if lock_err != nil { for _, resource := range(locked_resources) { resource.Unlock(event) } - return errors.New("failed to lock required resources") + return lock_err + } else { + for _, resource := range(locked_resources) { + log.Printf("NOTIFYING %s that it's locked", resource.Name()) + resource.NotifyLocked() + } } + return nil } @@ -118,6 +138,7 @@ func (event * BaseEvent) Finish() error { if err != nil { panic(err) } + resource.Update("unlocking after event finish") } return event.DoneResource().Unlock(event) } @@ -131,6 +152,7 @@ func (event * BaseEvent) Run() error { var err error = nil for next_action != "" { // Check if the edge exists + cur_action := next_action action, exists := event.actions[next_action] if exists == false { error_str := fmt.Sprintf("%s is not a valid action", next_action) @@ -154,7 +176,8 @@ func (event * BaseEvent) Run() error { } // Update the event after running the edge - event.Update() + update_str := fmt.Sprintf("ACTION %s: NEXT %s", cur_action, next_action) + event.Update(update_str) } err = event.DoneResource().Unlock(event) @@ -168,6 +191,8 @@ func (event * BaseEvent) Run() error { // On start, it attempts to start it's children from the highest 'priority' type EventQueue struct { BaseEvent + resource_aborts map[string]chan string + resource_lock sync.Mutex } func NewBaseEvent(name string, description string, required_resources []Resource) (BaseEvent) { @@ -177,7 +202,7 @@ func NewBaseEvent(name string, description string, required_resources []Resource name: name, description: description, id: randid(), - listeners: []chan error{}, + listeners: []chan string{}, }, parent: nil, children: []Event{}, @@ -209,18 +234,17 @@ func NewEvent(name string, description string, required_resources []Resource) (* func NewEventQueue(name string, description string, required_resources []Resource) (* EventQueue) { queue := &EventQueue{ BaseEvent: NewBaseEvent(name, description, []Resource{}), + resource_aborts: map[string]chan string{}, } // Need to lock it with th BaseEvent since Unlock is implemented on the BaseEvent queue.LockDone() queue.actions["start"] = func() (string, error) { - log.Printf("Starting Event Queue") return "queue_event", nil } queue.actions["queue_event"] = func() (string, error) { - log.Printf("Queueing events") // Copy the events to sort the list copied_events := make([]Event, len(queue.Children())) copy(copied_events, queue.Children()) @@ -233,6 +257,43 @@ func NewEventQueue(name string, description string, required_resources []Resourc wait := false for _, event := range(copied_events) { + // Update the resource_chans + for _, resource := range(event.RequiredResources()) { + queue.resource_lock.Lock() + _, exists := queue.resource_aborts[resource.ID()] + if exists == false { + log.Printf("RESOURCE_LISTENER_START: %s", resource.Name()) + abort := make(chan string, 1) + queue.resource_aborts[resource.ID()] = abort + go func(queue *EventQueue, resource Resource, abort chan string) { + log.Printf("RESOURCE_LISTENER_GOROUTINE: %s", resource.Name()) + resource_chan := resource.UpdateChannel() + for true { + select { + case <- abort: + queue.resource_lock.Lock() + delete(queue.resource_aborts, resource.ID()) + queue.resource_lock.Unlock() + log.Printf("RESORCE_LISTENER_ABORT: %s", resource.Name()) + break + case msg, ok := <- resource_chan: + if ok == false { + queue.resource_lock.Lock() + delete(queue.resource_aborts, resource.ID()) + queue.resource_lock.Unlock() + log.Printf("RESOURCE_LISTENER_CLOSED: %s : %s", resource.Name(), msg) + break + } + log.Printf("RESOURCE_LISTENER_UPDATED: %s : %s", resource.Name(), msg) + queue.signal <- "resource_update" + } + } + log.Printf("RESOURCE_LISTENER_DYING: %s", resource.Name()) + }(queue, resource, abort) + } + queue.resource_lock.Unlock() + } + info := queue.ChildInfo(event).(*EventQueueInfo) if info.state == "queued" { wait = true @@ -240,11 +301,15 @@ func NewEventQueue(name string, description string, required_resources []Resourc err := event.LockResources() // start in new goroutine if err != nil { - } else { info.state = "running" + log.Printf("EVENT_START: %s", event.Name()) go func(event Event, info * EventQueueInfo, queue Event) { - event.Run() + log.Printf("EVENT_GOROUTINE: %s", event.Name()) + err := event.Run() + if err != nil { + log.Printf("EVENT_ERROR: %s", err) + } info.state = "done" event.Finish() queue.Signal("event_done") @@ -263,17 +328,14 @@ func NewEventQueue(name string, description string, required_resources []Resourc } queue.actions["event_done"] = func() (string, error) { - log.Printf("event_done") return "queue_event", nil } - queue.actions["resource_available"] = func() (string, error) { - log.Printf("resources_available") + queue.actions["resource_update"] = func() (string, error) { return "queue_event", nil } queue.actions["event_added"] = func() (string, error) { - log.Printf("event_added") return "queue_event", nil } @@ -347,7 +409,7 @@ func (event * BaseEvent) addChild(child Event, info EventInfo) error { event.children = append(event.children, child) event.child_info[child] = info - event.Update() + event.Update("child added") return nil } diff --git a/graph.go b/graph.go index 3f5b30b..66eca90 100644 --- a/graph.go +++ b/graph.go @@ -2,6 +2,7 @@ package main import ( "errors" + "log" "sync" "github.com/google/uuid" ) @@ -17,9 +18,9 @@ type GraphNode interface { Name() string Description() string ID() string - UpdateListeners() error - UpdateChannel() chan error - Update() error + UpdateListeners(info string) error + UpdateChannel() chan string + Update(reason string) error } // BaseNode is the most basic implementation of the GraphNode interface @@ -28,7 +29,7 @@ type BaseNode struct { name string description string id string - listeners []chan error + listeners []chan string listeners_lock sync.Mutex } @@ -45,8 +46,9 @@ func (node * BaseNode) ID() string { } // Create a new listener channel for the node, add it to the nodes listener list, and return the new channel -func (node * BaseNode) UpdateChannel() chan error { - new_listener := make(chan error, 1) +const listener_buffer = 10 +func (node * BaseNode) UpdateChannel() chan string{ + new_listener := make(chan string, listener_buffer) node.listeners_lock.Lock() node.listeners = append(node.listeners, new_listener) node.listeners_lock.Unlock() @@ -54,7 +56,7 @@ func (node * BaseNode) UpdateChannel() chan error { } // Send the update to listener channels -func (node * BaseNode) UpdateListeners() error { +func (node * BaseNode) UpdateListeners(info string) error { closed_listeners := []int{} listeners_closed := false @@ -63,7 +65,7 @@ func (node * BaseNode) UpdateListeners() error { node.listeners_lock.Lock() for i, listener := range node.listeners { select { - case listener <- nil: + case listener <- info: default: close(listener) closed_listeners = append(closed_listeners, i) @@ -74,7 +76,7 @@ func (node * BaseNode) UpdateListeners() error { // If any listeners have been closed, loop over the listeners // Add listeners to the "remaining" list if i insn't in closed_listeners if listeners_closed == true { - remaining_listeners := []chan error{} + remaining_listeners := []chan string{} for i, listener := range node.listeners { listener_closed := false for _, index := range closed_listeners { @@ -96,6 +98,7 @@ func (node * BaseNode) UpdateListeners() error { } // Basic implementation must be overwritten to do anything useful -func (node * BaseNode) Update() error { +func (node * BaseNode) Update(reason string) error { + log.Printf("UPDATE: BaseNode %s: %s", node.Name(), reason) return errors.New("Cannot Update a BaseNode") } diff --git a/main.go b/main.go index 4ac1b40..7d65336 100644 --- a/main.go +++ b/main.go @@ -2,6 +2,7 @@ package main import ( "log" + "math/rand" ) func fake_team(org string, id string, names []string) (*Team, []*Member) { @@ -57,10 +58,14 @@ func fake_data() * EventManager { resources = append(resources, m12[0]) alliances := []*Alliance{} - for i, team := range teams[:len(teams)-1] { - for _, team2 := range teams[i+1:] { - alliance := NewAlliance(team, team2) - alliances = append(alliances, alliance) + for i, team := range(teams){ + for true { + idx := rand.Intn(len(teams)) + if idx != i { + alliance := NewAlliance(team, teams[idx]) + alliances = append(alliances, alliance) + break + } } } @@ -81,27 +86,33 @@ func fake_data() * EventManager { resources = append(resources, alliance) } - - root_event := NewEventQueue("root_event", "", []Resource{}) event_manager := NewEventManager(root_event, resources) arena_idx := 0 - for i, alliance := range alliances[:len(alliances)-1] { - for _, alliance2 := range alliances[i+1:] { - match := NewMatch(alliance, alliance2, arenas[arena_idx]) - err := event_manager.AddEvent(root_event, match, NewEventQueueInfo(i)) - if err != nil { - log.Printf("Error adding %s: %s", match.Name(), err) - } - arena_idx += 1 - if arena_idx >= len(arenas) { - arena_idx = 0 + for i := 0; i < len(alliances)*5; i++ { + alliance := alliances[i % len(alliances)] + for true { + idx := rand.Intn(len(alliances)) + if idx != i { + alliance2 := alliances[idx] + if alliance.Children()[0] == alliance2.Children()[0] || alliance.Children()[0] == alliance2.Children()[1] || alliance.Children()[1] == alliance2.Children()[0] || alliance.Children()[1] == alliance2.Children()[1] { + } else { + match := NewMatch(alliance, alliance2, arenas[arena_idx]) + log.Printf("Adding %s", match.Name()) + err := event_manager.AddEvent(root_event, match, NewEventQueueInfo(i)) + if err != nil { + log.Printf("Error adding %s: %s", match.Name(), err) + } + arena_idx += 1 + if arena_idx >= len(arenas) { + arena_idx = 0 + } + } + break } } } - - return event_manager } diff --git a/manager.go b/manager.go index 31f1527..636feb9 100644 --- a/manager.go +++ b/manager.go @@ -36,8 +36,31 @@ func NewEventManager(root_event Event, dag_nodes []Resource) * EventManager { return manager; } +// Connect to all resources(in a thread to handle reconnections), and start the first event func (manager * EventManager) Run() error { - return manager.root_event.Run() + aborts := []chan error{} + for _, resource := range(manager.dag_nodes) { + abort := make(chan error, 1) + abort_used := resource.Connect(abort) + if abort_used == true { + aborts = append(aborts, abort) + } + } + + abort := make(chan error, 1) + go func(abort chan error, aborts []chan error) { + <- abort + for _, c := range(aborts) { + c <- nil + } + }(abort, aborts) + err := manager.root_event.Run() + abort <- nil + if err != nil { + return err + } + + return nil } func (manager * EventManager) FindResource(id string) Resource { diff --git a/resource.go b/resource.go index 8f21ce3..48743e8 100644 --- a/resource.go +++ b/resource.go @@ -4,39 +4,42 @@ import ( "fmt" "errors" "sync" + "log" ) // Resources propagate update up to multiple parents, and not downwards // (subscriber to team won't get update to alliance, but subscriber to alliance will get update to team) -func (resource * BaseResource) Update() error { - err := resource.UpdateListeners() +func (resource * BaseResource) Update(reason string) error { + log.Printf("UPDATE BaseResource %s: %s", resource.Name(), reason) + err := resource.UpdateListeners(reason) if err != nil { return err } for _, parent := range resource.Parents() { - err := parent.Update() + err := parent.Update("update parents") if err != nil { return err } } - if resource.lock_holder != nil { - resource.lock_holder.Update() - } - return nil } // Resource is the interface that DAG nodes are made from +// A resource needs to be able to represent logical entities and connections to physical entities. +// A resource lock could be aborted at any time if this connection is broken, if that happens the event locking it must be aborted +// The device connection should be maintained as much as possible(requiring some reconnection behaviour in the background) type Resource interface { GraphNode AddParent(parent Resource) error Children() []Resource Parents() []Resource Lock(event Event) error + NotifyLocked() error Unlock(event Event) error Owner() Event + Connect(abort chan error) bool } // BaseResource is the most basic resource that can exist in the DAG @@ -49,19 +52,43 @@ type BaseResource struct { state_lock sync.Mutex } +func (resource * BaseResource) Connect(abort chan error) bool { + return false +} + func (resource * BaseResource) Owner() Event { return resource.lock_holder } +func (resource * BaseResource) NotifyLocked() error { + err := resource.Update("finalize_lock") + if err != nil { + return err + } + + for _, child := range(resource.children) { + err = child.NotifyLocked() + if err != nil { + return err + } + } + + return nil +} + // Grab the state mutex and check the state, if unlocked continue to hold the mutex while doing the same for children // When the bottom of a tree is reached(no more children) go back up and set the lock state func (resource * BaseResource) Lock(event Event) error { + return resource.lock(event) +} + +func (resource * BaseResource) lock(event Event) error { var err error = nil - locked := false resource.state_lock.Lock() if resource.lock_holder != nil { - err = errors.New("Resource already locked") + err_str := fmt.Sprintf("Resource already locked: %s", resource.Name()) + err = errors.New(err_str) } else { all_children_locked := true for _, child := range resource.Children() { @@ -73,15 +100,10 @@ func (resource * BaseResource) Lock(event Event) error { } if all_children_locked == true { resource.lock_holder = event - locked = true } } resource.state_lock.Unlock() - if locked == true { - resource.Update() - } - return err } @@ -89,7 +111,7 @@ func (resource * BaseResource) Lock(event Event) error { // If the child isn't locked by the unlocker func (resource * BaseResource) Unlock(event Event) error { var err error = nil - unlocked := false + //unlocked := false resource.state_lock.Lock() if resource.lock_holder == nil { @@ -107,14 +129,14 @@ func (resource * BaseResource) Unlock(event Event) error { } if all_children_unlocked == true{ resource.lock_holder = nil - unlocked = true + //unlocked = true } } resource.state_lock.Unlock() - if unlocked == true { - resource.Update() - } + /*if unlocked == true { + resource.Update("unlocking resource") + }*/ return err } @@ -154,7 +176,7 @@ func NewResource(name string, description string, children []Resource) * BaseRes name: name, description: description, id: randid(), - listeners: []chan error{}, + listeners: []chan string{}, }, parents: []Resource{}, children: children, diff --git a/vex.go b/vex.go index bbde665..6617494 100644 --- a/vex.go +++ b/vex.go @@ -4,29 +4,9 @@ import ( "fmt" "log" "time" + "errors" ) -type Arena struct { - BaseResource -} - -func NewVirtualArena(name string) * Arena { - arena := &Arena{ - BaseResource: BaseResource{ - BaseNode: BaseNode{ - name: name, - description: "A virtual vex arena", - id: randid(), - listeners: []chan error{}, - }, - parents: []Resource{}, - children: []Resource{}, - }, - } - - return arena -} - type Member struct { BaseResource } @@ -38,7 +18,7 @@ func NewMember(name string) * Member { name: name, description: "A Team Member", id: randid(), - listeners: []chan error{}, + listeners: []chan string{}, }, parents: []Resource{}, children: []Resource{}, @@ -72,7 +52,7 @@ func NewTeam(org string, team string, members []*Member) * Team { name: name, description: description, id: randid(), - listeners: []chan error{}, + listeners: []chan string{}, }, parents: []Resource{}, children: make([]Resource, len(members)), @@ -100,7 +80,7 @@ func NewAlliance(team0 * Team, team1 * Team) * Alliance { name: name, description: description, id: randid(), - listeners: []chan error{}, + listeners: []chan string{}, }, parents: []Resource{}, children: []Resource{team0, team1}, @@ -116,10 +96,99 @@ type Match struct { control_start time.Time } +type Arena struct { + BaseResource + connected bool +} + +func NewVirtualArena(name string) * Arena { + arena := &Arena{ + BaseResource: BaseResource{ + BaseNode: BaseNode{ + name: name, + description: "A virtual vex arena", + id: randid(), + listeners: []chan string{}, + }, + parents: []Resource{}, + children: []Resource{}, + }, + connected: false, + } + + return arena +} + +func (arena * Arena) Lock(event Event) error { + if arena.connected == false { + log.Printf("ARENA NOT CONNECTED: %s", arena.Name()) + error_str := fmt.Sprintf("%s is not connected, cannot lock", arena.Name()) + return errors.New(error_str) + } + return arena.lock(event) +} + +func (arena * Arena) Connect(abort chan error) bool { + log.Printf("Connecting %s", arena.Name()) + go func(arena * Arena, abort chan error) { + update_channel := arena.UpdateChannel() + owner := arena.Owner() + var owner_channel chan string = nil + if owner != nil { + owner_channel = owner.UpdateChannel() + } + arena.connected = true + update_str := fmt.Sprintf("VIRTUAL_ARENA connected: %s", arena.Name()) + arena.Update(update_str) + log.Printf("VIRTUAL_ARENA goroutine starting: %s", arena.Name()) + for true { + select { + case <- abort: + log.Printf("Virtual arena %s aborting", arena.Name()) + break + case update, ok := <- update_channel: + if !ok { + panic("own update_channel closed") + } + log.Printf("%s update: %s", arena.Name(), update) + new_owner := arena.Owner() + if new_owner != owner { + log.Printf("NEW_OWNER for %s", arena.Name()) + if new_owner != nil { + log.Printf("new: %s", new_owner.Name()) + } else { + log.Printf("new: nil") + } + + if owner != nil { + log.Printf("old: %s", owner.Name()) + } else { + log.Printf("old: nil") + } + + owner = new_owner + if owner != nil { + owner_channel = owner.UpdateChannel() + } else { + owner_channel = nil + } + } + case update, ok := <- owner_channel: + if !ok { + panic("owner update channel closed") + } + log.Printf("%s owner update: %s", arena.Name(), update) + log.Printf("owner: %s", owner.Name()) + } + } + }(arena, abort) + return true +} + const start_slack = 3000 * time.Millisecond func NewMatch(alliance0 * Alliance, alliance1 * Alliance, arena * Arena) * Match { - name := fmt.Sprintf("Match: %s vs. %s", alliance0.Name(), alliance1.Name() ) + name := fmt.Sprintf("Match: %s vs. %s on %s", alliance0.Name(), alliance1.Name(), arena.Name()) description := "A vex match" match := &Match{ @@ -131,13 +200,15 @@ func NewMatch(alliance0 * Alliance, alliance1 * Alliance, arena * Arena) * Match match.LockDone() match.actions["start"] = func() (string, error) { - log.Printf("Starting match") + log.Printf("Starting match %s", match.Name()) + log.Printf("%s", match.RequiredResources()[2].Owner().Name()) match.control = "none" match.state = "scheduled" return "wait", nil } match.actions["queue_autonomous"] = func() (string, error) { + log.Printf("queue_autonomous") match.control = "none" match.state = "autonomous_queued" match.control_start = time.Now().Add(start_slack) @@ -145,6 +216,7 @@ func NewMatch(alliance0 * Alliance, alliance1 * Alliance, arena * Arena) * Match } match.actions["start_autonomous"] = func() (string, error) { + log.Printf("start_autonomous") match.control = "autonomous" match.state = "autonomous_running" return "wait", nil