Moved class methods to package methods. Updated graph once again

graph-rework
noah metz 2023-06-02 17:31:29 -06:00
parent 55d977e9b8
commit 100a5f8f2e
8 changed files with 474 additions and 387 deletions

@ -6,20 +6,20 @@ import (
"errors"
"reflect"
"sort"
"sync"
)
// Update the events listeners, and notify the parent to do the same
func (event * BaseEvent) Update(signal GraphSignal) error {
log.Printf("UPDATE BaseEvent %s: %+v", event.Name(), signal)
func (event * BaseEvent) update(signal GraphSignal) {
event.signal <- signal
event.BaseNode.Update(signal)
if event.parent != nil{
return event.parent.Update(signal)
if event.parent != nil && signal.Type() != "abort"{
event.parent.update(signal)
} else if signal.Type() == "abort" {
for _, child := range(event.Children()) {
child.update(signal)
}
}
return nil
}
type EventInfo interface {
@ -48,50 +48,122 @@ func NewEventQueueInfo(priority int) * EventQueueInfo {
type Event interface {
GraphNode
Children() []Event
LockChildren()
UnlockChildren()
InfoType() reflect.Type
ChildInfo(event Event) EventInfo
Parent() Event
RegisterParent(parent Event) error
LockParent()
UnlockParent()
Action(action string) (func()(string, error), bool)
Handler(signal_type string) (func() (string, error), bool)
RequiredResources() []Resource
DoneResource() Resource
AddChild(child Event, info EventInfo) error
FindChild(id string) Event
Run() error
Abort() error
LockResources() error
Finish() error
finish() error
addChild(child Event, info EventInfo)
setParent(parent Event)
}
// BaseEvent is the most basic event that can exist in the event tree.
// On start it automatically transitions to completion.
// It can optionally require events, which will all need to be locked to start it
// It can optionally create resources, which will be locked by default and unlocked on completion
// This node by itself doesn't implement any special behaviours for children, so they will be ignored.
// When starter, this event automatically transitions to completion and unlocks all it's resources(including created)
type BaseEvent struct {
BaseNode
done_resource Resource
required_resources []Resource
children []Event
child_info map[Event]EventInfo
actions map[string]func() (string, error)
handlers map[string]func() (string, error)
parent Event
abort chan string
func (event * BaseEvent) Handler(signal_type string) (func()(string, error), bool) {
handler, exists := event.handlers[signal_type]
return handler, exists
}
func FindChild(event Event, id string) Event {
if id == event.ID() {
return event
}
func (event * BaseEvent) Abort() error {
for _, event := range(event.children) {
event.Abort()
for _, child := range event.Children() {
result := FindChild(child, id)
if result != nil {
return result
}
event.signal <- NewSignal(event, "abort")
}
return nil
}
func (event * BaseEvent) LockResources() error {
func CheckInfoType(event Event, info EventInfo) bool {
if event.InfoType() == nil || info == nil {
if event.InfoType() == nil && info == nil {
return true
} else {
return false
}
}
return event.InfoType() == reflect.TypeOf(info)
}
func AddChild(event Event, child Event, info EventInfo) error {
if CheckInfoType(event, info) == false {
return errors.New("AddChild got wrong type")
}
event.LockParent()
if event.Parent() != nil {
event.UnlockParent()
return errors.New("Parent already registered")
}
event.LockChildren()
for _, c := range(event.Children()) {
if c.ID() == child.ID() {
event.UnlockChildren()
event.UnlockParent()
return errors.New("Child already in event")
}
}
// After all the checks are done, update the state of child + parent, then unlock and update
child.setParent(event)
event.addChild(child, info)
event.UnlockChildren()
event.UnlockParent()
SendUpdate(event, NewSignal(event, "child_added"))
return nil
}
func RunEvent(event Event) error {
log.Printf("EVENT_RUN: %s", event.Name())
next_action := "start"
var err error = nil
for next_action != "" {
action, exists := event.Action(next_action)
if exists == false {
error_str := fmt.Sprintf("%s is not a valid action", next_action)
return errors.New(error_str)
}
log.Printf("EVENT_ACTION: %s - %s", event.Name(), next_action)
next_action, err = action()
if err != nil {
return err
}
}
log.Printf("EVENT_RUN_DONE: %s", event.Name())
return nil
}
func AbortEvent(event Event) error {
signal := NewSignal(event, "abort")
SendUpdate(event, signal)
return nil
}
func LockResources(event Event) error {
locked_resources := []Resource{}
var lock_err error = nil
for _, resource := range(event.RequiredResources()) {
err := resource.Lock(event)
err := LockResource(resource, event)
if err != nil {
lock_err = err
break
@ -101,67 +173,68 @@ func (event * BaseEvent) LockResources() error {
if lock_err != nil {
for _, resource := range(locked_resources) {
resource.Unlock(event)
UnlockResource(resource, event)
}
return lock_err
} else {
for _, resource := range(locked_resources) {
log.Printf("NOTIFYING %s that it's locked", resource.Name())
resource.NotifyLocked()
}
for _, resource := range(locked_resources) {
NotifyResourceLocked(resource)
}
return nil
}
func (event * BaseEvent) Finish() error {
func FinishEvent(event Event) error {
// TODO make more 'safe' like LockResources, or make UnlockResource not return errors
log.Printf("EVENT_FINISH: %s", event.Name())
for _, resource := range(event.RequiredResources()) {
err := resource.Unlock(event)
err := UnlockResource(resource, event)
if err != nil {
panic(err)
}
resource.NotifyUnlocked()
NotifyResourceUnlocked(resource)
}
err := event.DoneResource().Unlock(event)
err := UnlockResource(event.DoneResource(), event)
if err != nil {
return err
}
err = event.DoneResource().NotifyUnlocked()
event.Update(NewSignal(event, "event_done"))
NotifyResourceUnlocked(event.DoneResource())
err = event.finish()
if err != nil {
return err
}
func (event * BaseEvent) LockDone() {
event.DoneResource().Lock(event)
}
func (event * BaseEvent) Run() error {
log.Printf("EVENT_RUN: %s", event.Name())
next_action := "start"
var err error = nil
for next_action != "" {
// Check if the edge exists
action, exists := event.actions[next_action]
if exists == false {
error_str := fmt.Sprintf("%s is not a valid action", next_action)
return errors.New(error_str)
SendUpdate(event, NewSignal(event, "event_done"))
return nil
}
// Run the edge function
update_str := fmt.Sprintf("EVENT_ACTION: %s", next_action)
log.Printf(update_str)
next_action, err = action()
if err != nil {
return err
}
// BaseEvent is the most basic event that can exist in the event tree.
// On start it automatically transitions to completion.
// It can optionally require events, which will all need to be locked to start it
// It can optionally create resources, which will be locked by default and unlocked on completion
// This node by itself doesn't implement any special behaviours for children, so they will be ignored.
// When starter, this event automatically transitions to completion and unlocks all it's resources(including created)
type BaseEvent struct {
BaseNode
done_resource Resource
required_resources []Resource
children []Event
child_info map[string]EventInfo
child_lock sync.Mutex
actions map[string]func() (string, error)
handlers map[string]func() (string, error)
parent Event
parent_lock sync.Mutex
abort chan string
}
return nil
func (event * BaseEvent) Action(action string) (func() (string, error), bool) {
action_fn, exists := event.actions[action]
return action_fn, exists
}
func NewBaseEvent(name string, description string, required_resources []Resource) (BaseEvent) {
@ -171,12 +244,12 @@ func NewBaseEvent(name string, description string, required_resources []Resource
name: name,
description: description,
id: randid(),
signal: make(chan GraphSignal, 10),
signal: make(chan GraphSignal, 100),
listeners: map[chan GraphSignal] chan GraphSignal{},
},
parent: nil,
children: []Event{},
child_info: map[Event]EventInfo{},
child_info: map[string]EventInfo{},
done_resource: done_resource,
required_resources: required_resources,
actions: map[string]func()(string, error){},
@ -184,6 +257,8 @@ func NewBaseEvent(name string, description string, required_resources []Resource
abort: make(chan string, 1),
}
LockResource(event.done_resource, &event)
event.actions["wait"] = func() (string, error) {
signal := <- event.signal
if signal.Type() == "abort" {
@ -191,7 +266,7 @@ func NewBaseEvent(name string, description string, required_resources []Resource
} else if signal.Type() == "do_action" {
return signal.Description(), nil
} else {
signal_fn, exists := event.handlers[signal.Type()]
signal_fn, exists := event.Handler(signal.Type())
if exists == true {
return signal_fn()
}
@ -207,9 +282,6 @@ func NewEvent(name string, description string, required_resources []Resource) (*
event := NewBaseEvent(name, description, required_resources)
event_ptr := &event
// Lock the done_resource by default
event.LockDone()
event_ptr.actions["start"] = func() (string, error) {
return "", nil
}
@ -217,21 +289,32 @@ func NewEvent(name string, description string, required_resources []Resource) (*
return event_ptr
}
func (event * BaseEvent) finish() error {
return nil
}
func (event * BaseEvent) InfoType() reflect.Type {
return nil
}
// EventQueue is a basic event that can have children.
// On start, it attempts to start it's children from the highest 'priority'
type EventQueue struct {
BaseEvent
listened_resources map[string]Resource
queue_lock sync.Mutex
}
func (queue * EventQueue) Finish() error {
func (queue * EventQueue) finish() error {
for _, resource := range(queue.listened_resources) {
resource.UnregisterChannel(queue.signal)
}
return queue.BaseEvent.Finish()
return nil
}
func (queue * EventQueue) InfoType() reflect.Type {
return reflect.TypeOf((*EventQueueInfo)(nil))
}
func NewEventQueue(name string, description string, required_resources []Resource) (* EventQueue) {
queue := &EventQueue{
@ -239,15 +322,13 @@ func NewEventQueue(name string, description string, required_resources []Resourc
listened_resources: map[string]Resource{},
}
// Need to lock it with th BaseEvent since Unlock is implemented on the BaseEvent
queue.LockDone()
queue.actions["start"] = func() (string, error) {
return "queue_event", nil
}
queue.actions["queue_event"] = func() (string, error) {
// Copy the events to sort the list
queue.LockChildren()
copied_events := make([]Event, len(queue.Children()))
copy(copied_events, queue.Children())
less := func(i int, j int) bool {
@ -269,20 +350,21 @@ func NewEventQueue(name string, description string, required_resources []Resourc
if info.state == "queued" {
wait = true
// Try to lock it
err := event.LockResources()
err := LockResources(event)
// start in new goroutine
if err != nil {
//log.Printf("Failed to lock %s: %s", event.Name(), err)
} else {
info.state = "running"
log.Printf("EVENT_START: %s", event.Name())
go func(event Event, info * EventQueueInfo, queue Event) {
log.Printf("EVENT_GOROUTINE: %s", event.Name())
err := event.Run()
err := RunEvent(event)
if err != nil {
log.Printf("EVENT_ERROR: %s", err)
}
info.state = "done"
event.Finish()
FinishEvent(event)
}(event, info, queue)
}
} else if info.state == "running" {
@ -290,11 +372,14 @@ func NewEventQueue(name string, description string, required_resources []Resourc
}
}
for _, resource := range(needed_resources) {
queue.listened_resources[resource.ID()] = resource
resource.RegisterChannel(queue.signal)
}
queue.UnlockChildren()
if wait == true {
return "wait", nil
} else {
@ -310,7 +395,7 @@ func NewEventQueue(name string, description string, required_resources []Resourc
return "queue_event", nil
}
queue.actions["event_added"] = func() (string, error) {
queue.handlers["child_added"] = func() (string, error) {
return "queue_event", nil
}
@ -325,16 +410,6 @@ func NewEventQueue(name string, description string, required_resources []Resourc
return queue
}
// Store the nodes parent for upwards propagation of changes
func (event * BaseEvent) RegisterParent(parent Event) error{
if event.parent != nil {
return errors.New("Parent already registered")
}
event.parent = parent
return nil
}
func (event * BaseEvent) Parent() Event {
return event.parent
}
@ -352,70 +427,35 @@ func (event * BaseEvent) Children() []Event {
}
func (event * BaseEvent) ChildInfo(idx Event) EventInfo {
val, ok := event.child_info[idx]
val, ok := event.child_info[idx.ID()]
if ok == false {
return nil
}
return val
}
func (event * BaseEvent) FindChild(id string) Event {
if id == event.ID() {
return event
func (event * BaseEvent) LockChildren() {
log.Printf("LOCKING CHILDREN OF %s", event.Name())
event.child_lock.Lock()
}
for _, child := range event.Children() {
result := child.FindChild(id)
if result != nil {
return result
}
func (event * BaseEvent) UnlockChildren() {
event.child_lock.Unlock()
}
return nil
func (event * BaseEvent) LockParent() {
event.parent_lock.Lock()
}
// Checks that the type of info is equal to EventQueueInfo
func (event * EventQueue) AddChild(child Event, info EventInfo) error {
if checkType(info, (*EventQueueInfo)(nil)) == false {
return errors.New("EventQueue.AddChild passed invalid type for info")
func (event * BaseEvent) UnlockParent() {
event.parent_lock.Unlock()
}
return event.addChild(child, info)
}
func (event * BaseEvent) addChild(child Event, info EventInfo) error {
err := child.RegisterParent(event)
if err != nil {
error_str := fmt.Sprintf("Failed to register %s as a parent of %s, cancelling AddChild", event.ID(), child.ID())
return errors.New(error_str)
func (event * BaseEvent) setParent(parent Event) {
event.parent = parent
}
func (event * BaseEvent) addChild(child Event, info EventInfo) {
event.children = append(event.children, child)
event.child_info[child] = info
event.Update(NewSignal(event, "child_added"))
return nil
}
// Overloaded function AddChild checks the info passed and calls the BaseEvent.addChild
func (event * BaseEvent) AddChild(child Event, info EventInfo) error {
if info != nil {
return errors.New("info must be nil for BaseEvent children")
}
return event.addChild(child, info)
}
func checkType(first interface{}, second interface{}) bool {
if first == nil || second == nil {
if first == nil && second == nil {
return true
} else {
return false
}
}
first_type := reflect.TypeOf(first)
second_type := reflect.TypeOf(second)
return first_type == second_type
event.child_info[child.ID()] = info
}

@ -49,7 +49,8 @@ type GraphNode interface {
Name() string
Description() string
ID() string
Update(update GraphSignal) error
UpdateListeners(update GraphSignal)
update(update GraphSignal)
RegisterChannel(listener chan GraphSignal)
UnregisterChannel(listener chan GraphSignal)
UpdateChannel() chan GraphSignal
@ -79,7 +80,7 @@ func (node * BaseNode) ID() string {
}
// Create a new listener channel for the node, add it to the nodes listener list, and return the new channel
const listener_buffer = 10
const listener_buffer = 100
func (node * BaseNode) UpdateChannel() chan GraphSignal {
new_listener := make(chan GraphSignal, listener_buffer)
node.RegisterChannel(new_listener)
@ -110,17 +111,31 @@ func (node * BaseNode) UnregisterChannel(listener chan GraphSignal) {
func (node * BaseNode) UpdateListeners(update GraphSignal) {
node.listeners_lock.Lock()
closed := []chan GraphSignal{}
for _, listener := range node.listeners {
log.Printf("UPDATE_LISTENER %s: %p", node.Name(), listener)
listener <- update
select {
case listener <- update:
default:
close(listener)
closed = append(closed, listener)
}
}
for _, listener := range(closed) {
delete(node.listeners, listener)
}
node.listeners_lock.Unlock()
}
// Basic implementation that sends the signal to the nodes channel
func (node * BaseNode) Update(signal GraphSignal) error {
log.Printf("UPDATE: BaseNode %s: %+v", node.Name(), signal)
func (node * BaseNode) update(signal GraphSignal) {
}
func SendUpdate(node GraphNode, signal GraphSignal) {
log.Printf("UPDATE %s: %+v", node.Name(), signal)
node.UpdateListeners(signal)
return nil
node.update(signal)
}

@ -2,6 +2,9 @@ package main
import (
"log"
"runtime/pprof"
"time"
"os"
)
func fake_team(org string, id string, names []string) (*Team, []*Member) {
@ -25,8 +28,8 @@ func fake_data() * EventManager {
t6, m6 := fake_team("210", "X", []string{"toby"})
t7, m7 := fake_team("210", "Y", []string{"jennifer"})
t8, m8 := fake_team("210", "Z", []string{"emily"})
t9, m9 := fake_team("666", "A", []string{"jimmy"})
t10, m10 := fake_team("666", "B", []string{"timmy"})
//t9, m9 := fake_team("666", "A", []string{"jimmy"})
//t10, m10 := fake_team("666", "B", []string{"timmy"})
//t11, m11 := fake_team("666", "C", []string{"grace"})
//t12, m12 := fake_team("666", "D", []string{"jeremy"})
//t13, m13 := fake_team("315", "W", []string{"bobby"})
@ -42,8 +45,8 @@ func fake_data() * EventManager {
teams = append(teams, t6)
teams = append(teams, t7)
teams = append(teams, t8)
teams = append(teams, t9)
teams = append(teams, t10)
//teams = append(teams, t9)
//teams = append(teams, t10)
//teams = append(teams, t11)
//teams = append(teams, t12)
//teams = append(teams, t13)
@ -59,8 +62,8 @@ func fake_data() * EventManager {
resources = append(resources, m6[0])
resources = append(resources, m7[0])
resources = append(resources, m8[0])
resources = append(resources, m9[0])
resources = append(resources, m10[0])
//resources = append(resources, m9[0])
//resources = append(resources, m10[0])
//resources = append(resources, m11[0])
//resources = append(resources, m12[0])
//resources = append(resources, m13[0])
@ -71,12 +74,6 @@ func fake_data() * EventManager {
arenas := []*Arena{}
arenas = append(arenas, NewVirtualArena("Arena 1"))
arenas = append(arenas, NewVirtualArena("Arena 2"))
arenas = append(arenas, NewVirtualArena("Arena 3"))
arenas = append(arenas, NewVirtualArena("Arena 4"))
arenas = append(arenas, NewVirtualArena("Arena 5"))
arenas = append(arenas, NewVirtualArena("Arena 6"))
arenas = append(arenas, NewVirtualArena("Arena 7"))
arenas = append(arenas, NewVirtualArena("Arena 8"))
for _, arena := range arenas {
resources = append(resources, arena)
@ -87,47 +84,55 @@ func fake_data() * EventManager {
}
alliances := []*Alliance{}
for i, team := range(teams) {
for j, team2 := range(teams) {
if i != j {
alliance := NewAlliance(team, team2)
alliances = append(alliances, alliance)
}
}
}
alliances = append(alliances, NewAlliance(t1, t2))
alliances = append(alliances, NewAlliance(t3, t4))
alliances = append(alliances, NewAlliance(t5, t6))
alliances = append(alliances, NewAlliance(t7, t8))
for _, alliance := range alliances {
resources = append(resources, alliance)
}
root_event := NewEventQueue("root_event", "", []Resource{})
stay_resource := NewResource("stay_resource", "", []Resource{})
resources = append(resources, stay_resource)
stay_event := NewEvent("stay_event", "", []Resource{stay_resource})
LockResource(stay_resource, stay_event)
event_manager := NewEventManager(root_event, resources)
arena_idx := 0
// Generate 3 games for each team by picking 3 random teams
event_manager.AddEvent(root_event, stay_event, NewEventQueueInfo(1))
go func(alliances []*Alliance, arenas []*Arena, event_manager * EventManager) {
for i, alliance := range(alliances) {
for j, alliance2 := range(alliances) {
if j != i {
if alliance.Children()[0] == alliance2.Children()[0] || alliance.Children()[0] == alliance2.Children()[1] || alliance.Children()[1] == alliance2.Children()[0] || alliance.Children()[1] == alliance2.Children()[1] {
} else {
for arena_idx := 0; arena_idx < len(arenas); arena_idx++ {
match := NewMatch(alliance, alliance2, arenas[arena_idx])
log.Printf("Adding %s", match.Name())
err := event_manager.AddEvent(root_event, match, NewEventQueueInfo(i))
if err != nil {
log.Printf("Error adding %s: %s", match.Name(), err)
}
arena_idx += 1
if arena_idx >= len(arenas) {
arena_idx = 0
}
}
}
}
}
}(alliances, arenas, event_manager)
return event_manager
}
func main() {
go func() {
time.Sleep(5 * time.Second)
if false {
pprof.Lookup("goroutine").WriteTo(os.Stdout, 1)
}
}()
event_manager := fake_data()
log.Printf("Starting event_manager")
err := event_manager.Run()

@ -9,6 +9,7 @@ import (
type EventManager struct {
dag_nodes map[string]Resource
root_event Event
aborts []chan error
}
// root_event's requirements must be in dag_nodes, and dag_nodes must be ordered by dependency(children first)
@ -17,6 +18,7 @@ func NewEventManager(root_event Event, dag_nodes []Resource) * EventManager {
manager := &EventManager{
dag_nodes: map[string]Resource{},
root_event: nil,
aborts: []chan error{},
}
// Construct the DAG
@ -39,37 +41,32 @@ func NewEventManager(root_event Event, dag_nodes []Resource) * EventManager {
// Connect to all resources(in a thread to handle reconnections), and start the first event
func (manager * EventManager) Run() error {
log.Printf("MANAGER_START")
aborts := []chan error{}
for _, resource := range(manager.dag_nodes) {
abort := make(chan error, 1)
abort_used := resource.Connect(abort)
if abort_used == true {
aborts = append(aborts, abort)
}
}
abort := make(chan error, 1)
go func(abort chan error, aborts []chan error) {
go func(abort chan error, manager * EventManager) {
<- abort
for _, c := range(aborts) {
for _, c := range(manager.aborts) {
c <- nil
}
}(abort, aborts)
}(abort, manager)
err := manager.root_event.LockResources()
err := LockResources(manager.root_event)
if err != nil {
log.Printf("MANAGER_LOCK_ERR: %s", err)
abort <- nil
return err
}
err = manager.root_event.Run()
err = RunEvent(manager.root_event)
abort <- nil
if err != nil {
log.Printf("MANAGER_RUN_ERR: %s", err)
return err
}
err = manager.root_event.Finish()
err = FinishEvent(manager.root_event)
if err != nil {
log.Printf("MANAGER_FINISH_ERR: %s", err)
return err
}
log.Printf("MANAGER_DONE")
@ -87,12 +84,13 @@ func (manager * EventManager) FindResource(id string) Resource {
}
func (manager * EventManager) FindEvent(id string) Event {
event := manager.root_event.FindChild(id)
event := FindChild(manager.root_event, id)
return event
}
func (manager * EventManager) AddResource(resource Resource) error {
log.Printf("Adding resource %s", resource.Name())
_, exists := manager.dag_nodes[resource.ID()]
if exists == true {
error_str := fmt.Sprintf("%s is already in the resource DAG, cannot add again", resource.Name())
@ -107,8 +105,13 @@ func (manager * EventManager) AddResource(resource Resource) error {
}
}
manager.dag_nodes[resource.ID()] = resource
abort := make(chan error, 1)
abort_used := resource.Connect(abort)
if abort_used == true {
manager.aborts = append(manager.aborts, abort)
}
for _, child := range resource.Children() {
child.AddParent(resource)
AddParent(child, resource)
}
return nil
}
@ -152,16 +155,16 @@ func (manager * EventManager) AddEvent(parent Event, child Event, info EventInfo
manager.root_event = child
return nil;
} else {
if manager.root_event.FindChild(parent.ID()) == nil {
if FindChild(manager.root_event, parent.ID()) == nil {
error_str := fmt.Sprintf("Event %s is not present in the event tree, cannot add %s as child", parent.ID(), child.ID())
return errors.New(error_str)
}
if manager.root_event.FindChild(child.ID()) != nil {
if FindChild(manager.root_event, child.ID()) != nil {
error_str := fmt.Sprintf("Event %s already exists in the event tree, can not add again", child.ID())
return errors.New(error_str)
}
return parent.AddChild(child, info)
return AddChild(parent, child, info)
}
}

@ -7,7 +7,7 @@ import (
)
type graph_tester testing.T
const listner_timeout = 100 * time.Millisecond
const listner_timeout = 50 * time.Millisecond
func (t * graph_tester) CheckForValue(listener chan GraphSignal, str string) {
timeout := time.After(listner_timeout)
@ -99,23 +99,21 @@ func TestResourceUpdate(t * testing.T) {
r4_l := r4.UpdateChannel()
// Calling Update() on the parent with no other parents should only notify node listeners
println("UPDATE_START")
r3.Update(NewSignal(nil, "test"))
println("UPDATE_DONE")
SendUpdate(r3, NewSignal(nil, "test"))
(*graph_tester)(t).CheckForNone(r1_l, "Update on r1 after updating r3")
(*graph_tester)(t).CheckForNone(r2_l, "Update on r2 after updating r3")
(*graph_tester)(t).CheckForValue(r3_l, "No update on r3 after updating r3")
(*graph_tester)(t).CheckForValue(r4_l, "No update on r4 after updating r3")
// Calling Update() on a child should notify listeners of the parent and child, but not siblings
r2.Update(NewSignal(nil, "test"))
SendUpdate(r2, NewSignal(nil, "test"))
(*graph_tester)(t).CheckForNone(r1_l, "Update on r1 after updating r2")
(*graph_tester)(t).CheckForValue(r2_l, "No update on r2 after updating r2")
(*graph_tester)(t).CheckForValue(r3_l, "No update on r3 after updating r2")
(*graph_tester)(t).CheckForValue(r4_l, "No update on r4 after updating r2")
// Calling Update() on a child should notify listeners of the parent and child, but not siblings
r1.Update(NewSignal(nil, "test"))
SendUpdate(r1, NewSignal(nil, "test"))
(*graph_tester)(t).CheckForValue(r1_l, "No update on r1 after updating r1")
(*graph_tester)(t).CheckForNone(r2_l, "Update on r2 after updating r1")
(*graph_tester)(t).CheckForValue(r3_l, "No update on r3 after updating r1")
@ -166,7 +164,7 @@ func TestLockResource(t * testing.T) {
r1 := NewResource("r1", "", []Resource{})
r2 := NewResource("r2", "", []Resource{})
r3 := NewResource("r3", "", []Resource{r1, r2})
r4 := NewResource("r3", "", []Resource{r1, r2})
r4 := NewResource("r4", "", []Resource{r1, r2})
event_manager := NewEventManager(root_event, []Resource{r1, r2, r3, r4})
@ -177,75 +175,60 @@ func TestLockResource(t * testing.T) {
r1_l := r1.UpdateChannel()
rel := root_event.UpdateChannel()
err := r3.Lock(root_event)
err := LockResource(r3, root_event)
if err != nil {
t.Fatal("Failed to lock r3")
}
err = r3.NotifyLocked()
if err != nil {
t.Fatal("Failed to notify r3 of lock")
}
NotifyResourceLocked(r3)
(*graph_tester)(t).CheckForValue(r1_l, "No value on r1 update channel")
(*graph_tester)(t).CheckForValue(rel, "No value on root_event update channel")
err = r3.Lock(root_event)
err = LockResource(r3, root_event)
if err == nil {
t.Fatal("Locked r3 after locking r3")
}
err = r4.Lock(root_event)
err = LockResource(r4, root_event)
if err == nil {
t.Fatal("Locked r4 after locking r3")
}
err = r1.Lock(root_event)
err = LockResource(r1, root_event)
if err == nil {
t.Fatal("Locked r1 after locking r3")
}
err = r3.Unlock(test_event)
err = UnlockResource(r3, test_event)
if err == nil {
t.Fatal("Unlocked r3 with event that didn't lock it")
}
err = r3.Unlock(root_event)
err = UnlockResource(r3, root_event)
if err != nil {
t.Fatal("Failed to unlock r3")
}
err = r3.NotifyUnlocked()
if err != nil {
t.Fatal("Failed to notify r3 it was unlocked")
}
NotifyResourceUnlocked(r3)
(*graph_tester)(t).CheckForValue(r1_l, "No update on r1 after unlocking r3")
(*graph_tester)(t).CheckForValue(rel, "No update on rel after unlocking r3")
err = r4.Lock(root_event)
err = LockResource(r4, root_event)
if err != nil {
t.Fatal("Failed to lock r4 after unlocking r3")
}
NotifyResourceLocked(r4)
err = r4.NotifyLocked()
if err != nil {
t.Fatal("Failed to notify r4 it was locked")
}
(*graph_tester)(t).CheckForValue(r1_l, "No update on r1 after locking r4")
(*graph_tester)(t).CheckForValue(rel, "No update on rel after locking r4")
err = r4.Unlock(root_event)
err = UnlockResource(r4, root_event)
if err != nil {
t.Fatal("Failed to unlock r4")
}
NotifyResourceUnlocked(r4)
err = r4.NotifyUnlocked()
if err != nil {
t.Fatal("Failed to notify r4 it was unlocked")
}
(*graph_tester)(t).CheckForValue(r1_l, "No update on r1 after unlocking r4")
(*graph_tester)(t).CheckForValue(rel, "No update on rel after unlocking r4")
}
func TestAddToEventQueue(t * testing.T) {
@ -253,17 +236,17 @@ func TestAddToEventQueue(t * testing.T) {
event_1 := NewEvent("1", "", []Resource{})
event_2 := NewEvent("2", "", []Resource{})
err := queue.AddChild(event_1, nil)
err := AddChild(queue, event_1, nil)
if err == nil {
t.Fatal("suceeded in added nil info to queue")
}
err = queue.AddChild(event_1, &EventQueueInfo{priority: 0})
err = AddChild(queue, event_1, &EventQueueInfo{priority: 0})
if err != nil {
t.Fatal("failed to add valid event + info to queue")
}
err = queue.AddChild(event_2, &EventQueueInfo{priority: 1})
err = AddChild(queue, event_2, &EventQueueInfo{priority: 1})
if err != nil {
t.Fatal("failed to add valid event + info to queue")
}
@ -279,7 +262,7 @@ func TestStartBaseEvent(t * testing.T) {
(*graph_tester)(t).CheckForNone(e_l, "Update on event_1 before starting")
(*graph_tester)(t).CheckForNone(r_l, "Update on r_1 before starting")
if r.Owner() != event_1 {
if r.Owner().ID() != event_1.ID() {
t.Fatal("r is not owned by event_1")
}
@ -306,7 +289,7 @@ func TestAbortEventQueue(t * testing.T) {
if err != nil {
t.Fatal(err)
}
r1.Lock(root_event)
LockResource(r1, root_event)
e1 := NewEvent("1", "", []Resource{r1})
e1_info := NewEventQueueInfo(1)
// Add an event so that the queue doesn't auto complete
@ -319,7 +302,7 @@ func TestAbortEventQueue(t * testing.T) {
// start the queue and check that all the events are executed
go func() {
time.Sleep(time.Second)
root_event.Abort()
AbortEvent(root_event)
}()
err = manager.Run()
@ -336,12 +319,12 @@ func TestStartEventQueue(t * testing.T) {
root_event := NewEventQueue("root_event", "", []Resource{})
r := root_event.DoneResource()
rel := root_event.UpdateChannel();
res_1 := NewResource("test_resource", "", []Resource{})
res_2 := NewResource("test_resource", "", []Resource{})
res_1 := NewResource("test_resource_1", "", []Resource{})
res_2 := NewResource("test_resource_2", "", []Resource{})
manager := NewEventManager(root_event, []Resource{res_1, res_2})
e1:= NewEvent("1", "", []Resource{res_1, res_2})
e1:= NewEvent("e1", "", []Resource{res_1, res_2})
e1_r := e1.DoneResource()
e1_info := NewEventQueueInfo(1)
err := manager.AddEvent(root_event, e1, e1_info)
@ -350,7 +333,7 @@ func TestStartEventQueue(t * testing.T) {
}
(*graph_tester)(t).CheckForValue(rel, "No update on root_event after adding e1")
e2 := NewEvent("1", "", []Resource{res_1})
e2 := NewEvent("e2", "", []Resource{res_1})
e2_r := e2.DoneResource()
e2_info := NewEventQueueInfo(2)
err = manager.AddEvent(root_event, e2, e2_info)
@ -359,7 +342,7 @@ func TestStartEventQueue(t * testing.T) {
}
(*graph_tester)(t).CheckForValue(rel, "No update on root_event after adding e2")
e3 := NewEvent("1", "", []Resource{res_2})
e3 := NewEvent("e3", "", []Resource{res_2})
e3_r := e3.DoneResource()
e3_info := NewEventQueueInfo(3)
err = manager.AddEvent(root_event, e3, e3_info)
@ -375,7 +358,9 @@ func TestStartEventQueue(t * testing.T) {
// Abort the event after 5 seconds just in case
go func() {
time.Sleep(5 * time.Second)
root_event.Abort()
if r.Owner() != nil {
AbortEvent(root_event)
}
}()
// Now that an event manager is constructed with a queue and 3 basic events

@ -9,19 +9,20 @@ import (
// Resources propagate update up to multiple parents, and not downwards
// (subscriber to team won't get update to alliance, but subscriber to alliance will get update to team)
func (resource * BaseResource) Update(signal GraphSignal) error {
log.Printf("UPDATE BaseResource %s: %+v", resource.Name(), signal)
resource.BaseNode.Update(signal)
func (resource * BaseResource) update(signal GraphSignal) {
if signal.Type() == "lock_changed" {
for _, child := range resource.Children() {
SendUpdate(child, signal)
}
} else {
for _, parent := range resource.Parents() {
err := parent.Update(signal)
if err != nil {
return err
SendUpdate(parent, signal)
}
}
return nil
if resource.lock_holder != nil {
SendUpdate(resource.lock_holder, signal)
}
}
// Resource is the interface that DAG nodes are made from
@ -30,128 +31,178 @@ func (resource * BaseResource) Update(signal GraphSignal) error {
// The device connection should be maintained as much as possible(requiring some reconnection behaviour in the background)
type Resource interface {
GraphNode
AddParent(parent Resource) error
Owner() Event
Children() []Resource
Parents() []Resource
Lock(event Event) error
NotifyLocked() error
NotifyUnlocked() error
Unlock(event Event) error
Owner() Event
AddParent(parent Resource) error
LockParents()
UnlockParents()
SetOwner(owner Event)
LockState()
UnlockState()
lock(event Event) error
unlock(event Event) error
Connect(abort chan error) bool
}
// BaseResource is the most basic resource that can exist in the DAG
// It holds a single state variable, which contains a pointer to the event that is locking it
type BaseResource struct {
BaseNode
parents []Resource
children []Resource
lock_holder Event
state_lock sync.Mutex
func AddParent(resource Resource, parent Resource) error {
if parent.ID() == resource.ID() {
error_str := fmt.Sprintf("Will not add %s as parent of itself", parent.Name())
return errors.New(error_str)
}
func (resource * BaseResource) Connect(abort chan error) bool {
return false
resource.LockParents()
for _, p := range resource.Parents() {
if p.ID() == parent.ID() {
error_str := fmt.Sprintf("%s is already a parent of %s, will not double-bond", p.Name(), resource.Name())
return errors.New(error_str)
}
func (resource * BaseResource) Owner() Event {
return resource.lock_holder
}
func (resource * BaseResource) NotifyUnlocked() error {
err := resource.Update(NewSignal(resource, "lock_change"))
if err != nil {
return err
}
err := resource.AddParent(parent)
resource.UnlockParents()
for _, child := range(resource.children) {
err = child.NotifyUnlocked()
if err != nil {
return err
}
func UnlockResource(resource Resource, event Event) error {
log.Printf("RESOURCE_UNLOCK: %s", resource.Name())
var err error = nil
resource.LockState()
if resource.Owner() == nil {
resource.UnlockState()
return errors.New("Resource already unlocked")
}
return nil
if resource.Owner().ID() != event.ID() {
resource.UnlockState()
return errors.New("Resource not locked by parent, unlock failed")
}
func (resource * BaseResource) NotifyLocked() error {
err := resource.Update(NewSignal(resource, "lock_change"))
var lock_err error = nil
for _, child := range resource.Children() {
err := UnlockResource(child, event)
if err != nil {
return err
lock_err = err
break
}
}
for _, child := range(resource.children) {
err = child.NotifyLocked()
if err != nil {
return err
if lock_err != nil {
resource.UnlockState()
err_str := fmt.Sprintf("Resource failed to unlock: %s", lock_err)
return errors.New(err_str)
}
resource.SetOwner(nil)
err = resource.unlock(event)
if err != nil {
resource.UnlockState()
return errors.New("Failed to unlock resource")
}
resource.lock_holder.Update(NewSignal(resource, "lock_change"))
resource.UnlockState()
signal := NewSignal(event, "lock_changed")
signal.description = "unlock"
SendUpdate(resource, signal)
return nil
}
// Grab the state mutex and check the state, if unlocked continue to hold the mutex while doing the same for children
// When the bottom of a tree is reached(no more children) go back up and set the lock state
func (resource * BaseResource) Lock(event Event) error {
return resource.lock(event)
func LockResource(resource Resource, event Event) error {
log.Printf("RESOURCE_LOCK: %s", resource.Name())
resource.LockState()
if resource.Owner() != nil {
resource.UnlockState()
err_str := fmt.Sprintf("Resource already locked: %s", resource.Name())
return errors.New(err_str)
}
func (resource * BaseResource) lock(event Event) error {
var err error = nil
resource.state_lock.Lock()
if resource.lock_holder != nil {
err_str := fmt.Sprintf("Resource already locked: %s", resource.Name())
err = errors.New(err_str)
} else {
all_children_locked := true
var lock_err error = nil
for _, child := range resource.Children() {
err = child.Lock(event)
err := LockResource(child, event)
if err != nil{
all_children_locked = false
lock_err = err
break
}
}
if all_children_locked == true {
resource.lock_holder = event
if lock_err != nil {
resource.UnlockState()
err_str := fmt.Sprintf("Resource failed to lock: %s", lock_err)
return errors.New(err_str)
}
resource.SetOwner(event)
err := resource.lock(event)
if err != nil {
resource.UnlockState()
return errors.New("Failed to lock resource")
}
resource.state_lock.Unlock()
return err
resource.UnlockState()
return nil
}
// Recurse through children, unlocking until no more children
// If the child isn't locked by the unlocker
func (resource * BaseResource) Unlock(event Event) error {
var err error = nil
//unlocked := false
func NotifyResourceLocked(resource Resource) {
signal := NewSignal(resource, "lock_changed")
signal.description = "lock"
resource.state_lock.Lock()
if resource.lock_holder == nil {
err = errors.New("Resource already unlocked")
} else if resource.lock_holder != event {
err = errors.New("Resource not locked by parent, can't unlock")
} else {
all_children_unlocked := true
for _, child := range resource.Children() {
err = child.Unlock(event)
if err != nil {
all_children_unlocked = false
break
go SendUpdate(resource, signal)
}
func NotifyResourceUnlocked(resource Resource) {
signal := NewSignal(resource, "lock_changed")
signal.description = "unlock"
go SendUpdate(resource, signal)
}
if all_children_unlocked == true{
resource.lock_holder = nil
//unlocked = true
// BaseResource is the most basic resource that can exist in the DAG
// It holds a single state variable, which contains a pointer to the event that is locking it
type BaseResource struct {
BaseNode
parents []Resource
parents_lock sync.Mutex
children []Resource
children_lock sync.Mutex
lock_holder Event
state_lock sync.Mutex
}
func (resource * BaseResource) SetOwner(owner Event) {
resource.lock_holder = owner
}
func (resource * BaseResource) LockState() {
resource.state_lock.Lock()
}
func (resource * BaseResource) UnlockState() {
resource.state_lock.Unlock()
}
return err
func (resource * BaseResource) Connect(abort chan error) bool {
return false
}
func (resource * BaseResource) Owner() Event {
return resource.lock_holder
}
//BaseResources don't check anything special when locking/unlocking
func (resource * BaseResource) lock(event Event) error {
return nil
}
func (resource * BaseResource) unlock(event Event) error {
return nil
}
func (resource * BaseResource) Children() []Resource {
@ -162,23 +213,15 @@ func (resource * BaseResource) Parents() []Resource {
return resource.parents
}
// Add a parent to a DAG node
func (resource * BaseResource) AddParent(parent Resource) error {
// Don't add self as parent
if parent.ID() == resource.ID() {
error_str := fmt.Sprintf("Will not add %s as parent of itself", parent.ID())
return errors.New(error_str)
func (resource * BaseResource) LockParents() {
resource.parents_lock.Lock()
}
// Don't add parent if it's already a parent
for _, p := range resource.parents {
if p.ID() == parent.ID() {
error_str := fmt.Sprintf("%s is already a parent of %s, will not double-bond", p.ID(), resource.ID())
return errors.New(error_str)
}
func (resource * BaseResource) UnlockParents() {
resource.parents_lock.Unlock()
}
// Add the parent
func (resource * BaseResource) AddParent(parent Resource) error {
resource.parents = append(resource.parents, parent)
return nil
}
@ -190,6 +233,7 @@ func NewBaseResource(name string, description string, children []Resource) BaseR
description: description,
id: randid(),
listeners: map[chan GraphSignal]chan GraphSignal{},
signal: make(chan GraphSignal, 100),
},
parents: []Resource{},
children: children,

@ -85,36 +85,29 @@ func NewVirtualArena(name string) * Arena {
return arena
}
func (arena * Arena) Lock(event Event) error {
func (arena * Arena) lock(event Event) error {
if arena.connected == false {
log.Printf("ARENA NOT CONNECTED: %s", arena.Name())
error_str := fmt.Sprintf("%s is not connected, cannot lock", arena.Name())
return errors.New(error_str)
}
return arena.lock(event)
return nil
}
func (arena * Arena) Update(signal GraphSignal) error {
log.Printf("UPDATE Arena %s: %+v", arena.Name(), signal)
arena.BaseResource.Update(signal)
if arena.connected == true {
func (arena * Arena) update(signal GraphSignal) {
log.Printf("ARENA_UPDATE: %s", arena.Name())
arena.signal <- signal
}
return nil
}
func (arena * Arena) Connect(abort chan error) bool {
log.Printf("Connecting %s", arena.Name())
go func(arena * Arena, abort chan error) {
owner := arena.Owner()
arena.connected = true
update_str := fmt.Sprintf("VIRTUAL_ARENA connected: %s", arena.Name())
signal := NewSignal(arena, "arena_connected")
signal.description = update_str
arena.Update(signal)
arena.connected = true
go arena.update(signal)
log.Printf("VIRTUAL_ARENA goroutine starting: %s", arena.Name())
for true {
select {
@ -161,7 +154,6 @@ func NewMatch(alliance0 * Alliance, alliance1 * Alliance, arena * Arena) * Match
control: "init",
control_start: time.UnixMilli(0),
}
match.LockDone()
match.actions["start"] = func() (string, error) {
log.Printf("STARTING_MATCH %s", match.Name())

@ -132,13 +132,16 @@ func TestNewMatch(t *testing.T) {
match := NewMatch(alliance_1, alliance_2, arena)
root_event := NewEventQueue("root_event", "", []Resource{})
r := root_event.DoneResource()
event_manager := NewEventManager(root_event, []Resource{member_1, member_2, member_3, member_4, team_1, team_2, team_3, team_4, alliance_1, alliance_2, arena})
event_manager.AddEvent(root_event, match, NewEventQueueInfo(1))
go func() {
time.Sleep(time.Second * 2)
root_event.Abort()
time.Sleep(time.Second * 5)
if r.Owner() != nil {
AbortEvent(root_event)
}
}()
err := event_manager.Run()