Updated listener to have explicit unregister. Cleanup of initializers. More stuff I don't remember.

graph-rework
noah metz 2023-06-01 22:42:47 -06:00
parent d7b13de82f
commit 55d977e9b8
8 changed files with 234 additions and 250 deletions

@ -6,19 +6,18 @@ import (
"errors" "errors"
"reflect" "reflect"
"sort" "sort"
"sync"
) )
// Update the events listeners, and notify the parent to do the same // Update the events listeners, and notify the parent to do the same
func (event * BaseEvent) Update(reason string) error { func (event * BaseEvent) Update(signal GraphSignal) error {
log.Printf("UPDATE BaseEvent %s: %s", event.Name(), reason) log.Printf("UPDATE BaseEvent %s: %+v", event.Name(), signal)
err := event.UpdateListeners(reason)
if err != nil { event.signal <- signal
return err
} event.BaseNode.Update(signal)
if event.parent != nil{ if event.parent != nil{
return event.parent.Update("update parent") return event.parent.Update(signal)
} }
return nil return nil
} }
@ -58,7 +57,6 @@ type Event interface {
FindChild(id string) Event FindChild(id string) Event
Run() error Run() error
Abort() error Abort() error
Signal(action string) error
LockResources() error LockResources() error
Finish() error Finish() error
} }
@ -76,32 +74,16 @@ type BaseEvent struct {
children []Event children []Event
child_info map[Event]EventInfo child_info map[Event]EventInfo
actions map[string]func() (string, error) actions map[string]func() (string, error)
handlers map[string]func() (string, error)
parent Event parent Event
signal chan string
abort chan string abort chan string
} }
func (queue * EventQueue) Abort() error {
for _, event := range(queue.children) {
event.Abort()
}
for _, c := range(queue.resource_aborts) {
c <- "event abort"
}
queue.signal <- "abort"
return nil
}
func (event * BaseEvent) Abort() error { func (event * BaseEvent) Abort() error {
for _, event := range(event.children) { for _, event := range(event.children) {
event.Abort() event.Abort()
} }
event.signal <- "abort" event.signal <- NewSignal(event, "abort")
return nil
}
func (event * BaseEvent) Signal(action string) error {
event.signal <- action
return nil return nil
} }
@ -141,11 +123,17 @@ func (event * BaseEvent) Finish() error {
} }
resource.NotifyUnlocked() resource.NotifyUnlocked()
} }
err := event.DoneResource().Unlock(event) err := event.DoneResource().Unlock(event)
if err != nil { if err != nil {
return err return err
} }
return event.DoneResource().NotifyUnlocked()
err = event.DoneResource().NotifyUnlocked()
event.Update(NewSignal(event, "event_done"))
return err
} }
func (event * BaseEvent) LockDone() { func (event * BaseEvent) LockDone() {
@ -153,11 +141,11 @@ func (event * BaseEvent) LockDone() {
} }
func (event * BaseEvent) Run() error { func (event * BaseEvent) Run() error {
log.Printf("EVENT_RUN: %s", event.Name())
next_action := "start" next_action := "start"
var err error = nil var err error = nil
for next_action != "" { for next_action != "" {
// Check if the edge exists // Check if the edge exists
cur_action := next_action
action, exists := event.actions[next_action] action, exists := event.actions[next_action]
if exists == false { if exists == false {
error_str := fmt.Sprintf("%s is not a valid action", next_action) error_str := fmt.Sprintf("%s is not a valid action", next_action)
@ -165,37 +153,17 @@ func (event * BaseEvent) Run() error {
} }
// Run the edge function // Run the edge function
update_str := fmt.Sprintf("EVENT_ACTION: %s", next_action)
log.Printf(update_str)
next_action, err = action() next_action, err = action()
if err != nil { if err != nil {
return err return err
} else if next_action == "wait" {
// Wait for an external signal to set the next_action
signal := <- event.signal
if signal == "abort" {
return errors.New("State Machine aborted by signal")
} else {
next_action = signal
}
} else {
// next_action is already set correctly
} }
// Update the event after running the edge
update_str := fmt.Sprintf("ACTION %s: NEXT %s", cur_action, next_action)
event.Update(update_str)
} }
return nil return nil
} }
// EventQueue is a basic event that can have children.
// On start, it attempts to start it's children from the highest 'priority'
type EventQueue struct {
BaseEvent
resource_aborts map[string]chan string
resource_lock sync.Mutex
}
func NewBaseEvent(name string, description string, required_resources []Resource) (BaseEvent) { func NewBaseEvent(name string, description string, required_resources []Resource) (BaseEvent) {
done_resource := NewResource("event_done", "signal that event is done", []Resource{}) done_resource := NewResource("event_done", "signal that event is done", []Resource{})
event := BaseEvent{ event := BaseEvent{
@ -203,7 +171,8 @@ func NewBaseEvent(name string, description string, required_resources []Resource
name: name, name: name,
description: description, description: description,
id: randid(), id: randid(),
listeners: []chan string{}, signal: make(chan GraphSignal, 10),
listeners: map[chan GraphSignal] chan GraphSignal{},
}, },
parent: nil, parent: nil,
children: []Event{}, children: []Event{},
@ -211,10 +180,26 @@ func NewBaseEvent(name string, description string, required_resources []Resource
done_resource: done_resource, done_resource: done_resource,
required_resources: required_resources, required_resources: required_resources,
actions: map[string]func()(string, error){}, actions: map[string]func()(string, error){},
signal: make(chan string, 10), handlers: map[string]func()(string, error){},
abort: make(chan string, 1), abort: make(chan string, 1),
} }
event.actions["wait"] = func() (string, error) {
signal := <- event.signal
if signal.Type() == "abort" {
return "", errors.New("State machine aborted by signal")
} else if signal.Type() == "do_action" {
return signal.Description(), nil
} else {
signal_fn, exists := event.handlers[signal.Type()]
if exists == true {
return signal_fn()
}
}
// ignore signals other than "abort" and "do_action"
return "wait", nil
}
return event return event
} }
@ -232,10 +217,26 @@ func NewEvent(name string, description string, required_resources []Resource) (*
return event_ptr return event_ptr
} }
// EventQueue is a basic event that can have children.
// On start, it attempts to start it's children from the highest 'priority'
type EventQueue struct {
BaseEvent
listened_resources map[string]Resource
}
func (queue * EventQueue) Finish() error {
for _, resource := range(queue.listened_resources) {
resource.UnregisterChannel(queue.signal)
}
return queue.BaseEvent.Finish()
}
func NewEventQueue(name string, description string, required_resources []Resource) (* EventQueue) { func NewEventQueue(name string, description string, required_resources []Resource) (* EventQueue) {
queue := &EventQueue{ queue := &EventQueue{
BaseEvent: NewBaseEvent(name, description, []Resource{}), BaseEvent: NewBaseEvent(name, description, []Resource{}),
resource_aborts: map[string]chan string{}, listened_resources: map[string]Resource{},
} }
// Need to lock it with th BaseEvent since Unlock is implemented on the BaseEvent // Need to lock it with th BaseEvent since Unlock is implemented on the BaseEvent
@ -257,42 +258,11 @@ func NewEventQueue(name string, description string, required_resources []Resourc
sort.SliceStable(copied_events, less) sort.SliceStable(copied_events, less)
wait := false wait := false
needed_resources := map[string]Resource{}
for _, event := range(copied_events) { for _, event := range(copied_events) {
// Update the resource_chans // make sure all the required resources are registered to update the event
for _, resource := range(event.RequiredResources()) { for _, resource := range(event.RequiredResources()) {
queue.resource_lock.Lock() needed_resources[resource.ID()] = resource
_, exists := queue.resource_aborts[resource.ID()]
if exists == false {
log.Printf("RESOURCE_LISTENER_START: %s", resource.Name())
abort := make(chan string, 1)
queue.resource_aborts[resource.ID()] = abort
go func(queue *EventQueue, resource Resource, abort chan string) {
log.Printf("RESOURCE_LISTENER_GOROUTINE: %s", resource.Name())
resource_chan := resource.UpdateChannel()
for true {
select {
case <- abort:
queue.resource_lock.Lock()
delete(queue.resource_aborts, resource.ID())
queue.resource_lock.Unlock()
log.Printf("RESORCE_LISTENER_ABORT: %s", resource.Name())
break
case msg, ok := <- resource_chan:
if ok == false {
queue.resource_lock.Lock()
delete(queue.resource_aborts, resource.ID())
queue.resource_lock.Unlock()
log.Printf("RESOURCE_LISTENER_CLOSED: %s : %s", resource.Name(), msg)
break
}
log.Printf("RESOURCE_LISTENER_UPDATED: %s : %s", resource.Name(), msg)
queue.signal <- "resource_update"
}
}
log.Printf("RESOURCE_LISTENER_DYING: %s", resource.Name())
}(queue, resource, abort)
}
queue.resource_lock.Unlock()
} }
info := queue.ChildInfo(event).(*EventQueueInfo) info := queue.ChildInfo(event).(*EventQueueInfo)
@ -313,7 +283,6 @@ func NewEventQueue(name string, description string, required_resources []Resourc
} }
info.state = "done" info.state = "done"
event.Finish() event.Finish()
queue.Signal("event_done")
}(event, info, queue) }(event, info, queue)
} }
} else if info.state == "running" { } else if info.state == "running" {
@ -321,6 +290,11 @@ func NewEventQueue(name string, description string, required_resources []Resourc
} }
} }
for _, resource := range(needed_resources) {
queue.listened_resources[resource.ID()] = resource
resource.RegisterChannel(queue.signal)
}
if wait == true { if wait == true {
return "wait", nil return "wait", nil
} else { } else {
@ -340,6 +314,14 @@ func NewEventQueue(name string, description string, required_resources []Resourc
return "queue_event", nil return "queue_event", nil
} }
queue.handlers["lock_change"] = func() (string, error) {
return "queue_event", nil
}
queue.handlers["event_done"] = func() (string, error) {
return "queue_event", nil
}
return queue return queue
} }
@ -410,7 +392,7 @@ func (event * BaseEvent) addChild(child Event, info EventInfo) error {
event.children = append(event.children, child) event.children = append(event.children, child)
event.child_info[child] = info event.child_info[child] = info
event.Update("child added") event.Update(NewSignal(event, "child_added"))
return nil return nil
} }

@ -1,7 +1,6 @@
package main package main
import ( import (
"errors"
"log" "log"
"sync" "sync"
"github.com/google/uuid" "github.com/google/uuid"
@ -13,14 +12,47 @@ func randid() string{
return uuid_str return uuid_str
} }
type GraphSignal interface {
Source() GraphNode
Type() string
Description() string
}
type BaseSignal struct {
source GraphNode
signal_type string
description string
}
func (signal BaseSignal) Source() GraphNode {
return signal.source
}
func (signal BaseSignal) Type() string {
return signal.signal_type
}
func (signal BaseSignal) Description() string {
return signal.description
}
func NewSignal(source GraphNode, signal_type string) (BaseSignal) {
signal := BaseSignal{
source: source,
signal_type: signal_type,
}
return signal
}
// GraphNode is the interface common to both DAG nodes and Event tree nodes // GraphNode is the interface common to both DAG nodes and Event tree nodes
type GraphNode interface { type GraphNode interface {
Name() string Name() string
Description() string Description() string
ID() string ID() string
UpdateListeners(info string) error Update(update GraphSignal) error
UpdateChannel() chan string RegisterChannel(listener chan GraphSignal)
Update(reason string) error UnregisterChannel(listener chan GraphSignal)
UpdateChannel() chan GraphSignal
} }
// BaseNode is the most basic implementation of the GraphNode interface // BaseNode is the most basic implementation of the GraphNode interface
@ -29,8 +61,9 @@ type BaseNode struct {
name string name string
description string description string
id string id string
listeners []chan string signal chan GraphSignal
listeners_lock sync.Mutex listeners_lock sync.Mutex
listeners map[chan GraphSignal]chan GraphSignal
} }
func (node * BaseNode) Name() string { func (node * BaseNode) Name() string {
@ -47,58 +80,47 @@ func (node * BaseNode) ID() string {
// Create a new listener channel for the node, add it to the nodes listener list, and return the new channel // Create a new listener channel for the node, add it to the nodes listener list, and return the new channel
const listener_buffer = 10 const listener_buffer = 10
func (node * BaseNode) UpdateChannel() chan string{ func (node * BaseNode) UpdateChannel() chan GraphSignal {
new_listener := make(chan string, listener_buffer) new_listener := make(chan GraphSignal, listener_buffer)
node.listeners_lock.Lock() node.RegisterChannel(new_listener)
node.listeners = append(node.listeners, new_listener)
node.listeners_lock.Unlock()
return new_listener return new_listener
} }
// Send the update to listener channels func (node * BaseNode) RegisterChannel(listener chan GraphSignal) {
func (node * BaseNode) UpdateListeners(info string) error {
closed_listeners := []int{}
listeners_closed := false
// Send each listener nil to signal it to check for new content
// if the first attempt to send it fails close the listener
node.listeners_lock.Lock() node.listeners_lock.Lock()
for i, listener := range node.listeners { _, exists := node.listeners[listener]
select { if exists == false {
case listener <- info: node.listeners[listener] = listener
default:
close(listener)
closed_listeners = append(closed_listeners, i)
listeners_closed = true
}
} }
node.listeners_lock.Unlock()
}
// If any listeners have been closed, loop over the listeners func (node * BaseNode) UnregisterChannel(listener chan GraphSignal) {
// Add listeners to the "remaining" list if i insn't in closed_listeners node.listeners_lock.Lock()
if listeners_closed == true { _, exists := node.listeners[listener]
remaining_listeners := []chan string{} if exists == false {
for i, listener := range node.listeners { panic("Attempting to unregister non-registered listener")
listener_closed := false } else {
for _, index := range closed_listeners { delete(node.listeners, listener)
if index == i {
listener_closed = true
break
}
}
if listener_closed == false {
remaining_listeners = append(remaining_listeners, listener)
}
} }
node.listeners_lock.Unlock()
}
// Send the update to listener channels
func (node * BaseNode) UpdateListeners(update GraphSignal) {
node.listeners_lock.Lock()
node.listeners = remaining_listeners for _, listener := range node.listeners {
log.Printf("UPDATE_LISTENER %s: %p", node.Name(), listener)
listener <- update
} }
node.listeners_lock.Unlock()
return nil node.listeners_lock.Unlock()
} }
// Basic implementation must be overwritten to do anything useful // Basic implementation that sends the signal to the nodes channel
func (node * BaseNode) Update(reason string) error { func (node * BaseNode) Update(signal GraphSignal) error {
log.Printf("UPDATE: BaseNode %s: %s", node.Name(), reason) log.Printf("UPDATE: BaseNode %s: %+v", node.Name(), signal)
return errors.New("Cannot Update a BaseNode") node.UpdateListeners(signal)
return nil
} }

@ -2,7 +2,6 @@ package main
import ( import (
"log" "log"
"math/rand"
) )
func fake_team(org string, id string, names []string) (*Team, []*Member) { func fake_team(org string, id string, names []string) (*Team, []*Member) {
@ -26,10 +25,14 @@ func fake_data() * EventManager {
t6, m6 := fake_team("210", "X", []string{"toby"}) t6, m6 := fake_team("210", "X", []string{"toby"})
t7, m7 := fake_team("210", "Y", []string{"jennifer"}) t7, m7 := fake_team("210", "Y", []string{"jennifer"})
t8, m8 := fake_team("210", "Z", []string{"emily"}) t8, m8 := fake_team("210", "Z", []string{"emily"})
t9, m9 := fake_team("315", "W", []string{"bobby"}) t9, m9 := fake_team("666", "A", []string{"jimmy"})
t10, m10 := fake_team("315", "X", []string{"toby"}) t10, m10 := fake_team("666", "B", []string{"timmy"})
t11, m11 := fake_team("315", "Y", []string{"jennifer"}) //t11, m11 := fake_team("666", "C", []string{"grace"})
t12, m12 := fake_team("315", "Z", []string{"emily"}) //t12, m12 := fake_team("666", "D", []string{"jeremy"})
//t13, m13 := fake_team("315", "W", []string{"bobby"})
//t14, m14 := fake_team("315", "X", []string{"toby"})
//t15, m15 := fake_team("315", "Y", []string{"jennifer"})
//t16, m16 := fake_team("315", "Z", []string{"emily"})
teams = append(teams, t1) teams = append(teams, t1)
teams = append(teams, t2) teams = append(teams, t2)
@ -41,8 +44,12 @@ func fake_data() * EventManager {
teams = append(teams, t8) teams = append(teams, t8)
teams = append(teams, t9) teams = append(teams, t9)
teams = append(teams, t10) teams = append(teams, t10)
teams = append(teams, t11) //teams = append(teams, t11)
teams = append(teams, t12) //teams = append(teams, t12)
//teams = append(teams, t13)
//teams = append(teams, t14)
//teams = append(teams, t15)
//teams = append(teams, t16)
resources = append(resources, m1[0]) resources = append(resources, m1[0])
resources = append(resources, m2[0]) resources = append(resources, m2[0])
@ -54,25 +61,22 @@ func fake_data() * EventManager {
resources = append(resources, m8[0]) resources = append(resources, m8[0])
resources = append(resources, m9[0]) resources = append(resources, m9[0])
resources = append(resources, m10[0]) resources = append(resources, m10[0])
resources = append(resources, m11[0]) //resources = append(resources, m11[0])
resources = append(resources, m12[0]) //resources = append(resources, m12[0])
//resources = append(resources, m13[0])
alliances := []*Alliance{} //resources = append(resources, m14[0])
for i, team := range(teams){ //resources = append(resources, m15[0])
for true { //resources = append(resources, m16[0])
idx := rand.Intn(len(teams))
if idx != i {
alliance := NewAlliance(team, teams[idx])
alliances = append(alliances, alliance)
break
}
}
}
arenas := []*Arena{} arenas := []*Arena{}
arenas = append(arenas, NewVirtualArena("Arena 1")) arenas = append(arenas, NewVirtualArena("Arena 1"))
arenas = append(arenas, NewVirtualArena("Arena 2")) arenas = append(arenas, NewVirtualArena("Arena 2"))
arenas = append(arenas, NewVirtualArena("Arena 3")) arenas = append(arenas, NewVirtualArena("Arena 3"))
arenas = append(arenas, NewVirtualArena("Arena 4"))
arenas = append(arenas, NewVirtualArena("Arena 5"))
arenas = append(arenas, NewVirtualArena("Arena 6"))
arenas = append(arenas, NewVirtualArena("Arena 7"))
arenas = append(arenas, NewVirtualArena("Arena 8"))
for _, arena := range arenas { for _, arena := range arenas {
resources = append(resources, arena) resources = append(resources, arena)
@ -82,19 +86,27 @@ func fake_data() * EventManager {
resources = append(resources, team) resources = append(resources, team)
} }
alliances := []*Alliance{}
for i, team := range(teams) {
for j, team2 := range(teams) {
if i != j {
alliance := NewAlliance(team, team2)
alliances = append(alliances, alliance)
}
}
}
for _, alliance := range alliances { for _, alliance := range alliances {
resources = append(resources, alliance) resources = append(resources, alliance)
} }
root_event := NewEventQueue("root_event", "", []Resource{}) root_event := NewEventQueue("root_event", "", []Resource{})
event_manager := NewEventManager(root_event, resources) event_manager := NewEventManager(root_event, resources)
arena_idx := 0 arena_idx := 0
for i := 0; i < len(alliances)*5; i++ { // Generate 3 games for each team by picking 3 random teams
alliance := alliances[i % len(alliances)] for i, alliance := range(alliances) {
for true { for j, alliance2 := range(alliances) {
idx := rand.Intn(len(alliances)) if j != i {
if idx != i {
alliance2 := alliances[idx]
if alliance.Children()[0] == alliance2.Children()[0] || alliance.Children()[0] == alliance2.Children()[1] || alliance.Children()[1] == alliance2.Children()[0] || alliance.Children()[1] == alliance2.Children()[1] { if alliance.Children()[0] == alliance2.Children()[0] || alliance.Children()[0] == alliance2.Children()[1] || alliance.Children()[1] == alliance2.Children()[0] || alliance.Children()[1] == alliance2.Children()[1] {
} else { } else {
match := NewMatch(alliance, alliance2, arenas[arena_idx]) match := NewMatch(alliance, alliance2, arenas[arena_idx])
@ -108,7 +120,6 @@ func fake_data() * EventManager {
arena_idx = 0 arena_idx = 0
} }
} }
break
} }
} }
} }

@ -55,8 +55,10 @@ func (manager * EventManager) Run() error {
c <- nil c <- nil
} }
}(abort, aborts) }(abort, aborts)
err := manager.root_event.LockResources() err := manager.root_event.LockResources()
if err != nil { if err != nil {
abort <- nil
return err return err
} }

@ -9,7 +9,7 @@ import (
type graph_tester testing.T type graph_tester testing.T
const listner_timeout = 100 * time.Millisecond const listner_timeout = 100 * time.Millisecond
func (t * graph_tester) CheckForValue(listener chan string, str string) { func (t * graph_tester) CheckForValue(listener chan GraphSignal, str string) {
timeout := time.After(listner_timeout) timeout := time.After(listner_timeout)
select { select {
case <- listener: case <- listener:
@ -19,7 +19,7 @@ func (t * graph_tester) CheckForValue(listener chan string, str string) {
} }
} }
func (t * graph_tester) CheckForNone(listener chan string, str string) { func (t * graph_tester) CheckForNone(listener chan GraphSignal, str string) {
timeout := time.After(listner_timeout) timeout := time.After(listner_timeout)
select { select {
case <- listener: case <- listener:
@ -99,21 +99,23 @@ func TestResourceUpdate(t * testing.T) {
r4_l := r4.UpdateChannel() r4_l := r4.UpdateChannel()
// Calling Update() on the parent with no other parents should only notify node listeners // Calling Update() on the parent with no other parents should only notify node listeners
r3.Update("test") println("UPDATE_START")
r3.Update(NewSignal(nil, "test"))
println("UPDATE_DONE")
(*graph_tester)(t).CheckForNone(r1_l, "Update on r1 after updating r3") (*graph_tester)(t).CheckForNone(r1_l, "Update on r1 after updating r3")
(*graph_tester)(t).CheckForNone(r2_l, "Update on r2 after updating r3") (*graph_tester)(t).CheckForNone(r2_l, "Update on r2 after updating r3")
(*graph_tester)(t).CheckForValue(r3_l, "No update on r3 after updating r3") (*graph_tester)(t).CheckForValue(r3_l, "No update on r3 after updating r3")
(*graph_tester)(t).CheckForValue(r4_l, "No update on r4 after updating r3") (*graph_tester)(t).CheckForValue(r4_l, "No update on r4 after updating r3")
// Calling Update() on a child should notify listeners of the parent and child, but not siblings // Calling Update() on a child should notify listeners of the parent and child, but not siblings
r2.Update("test") r2.Update(NewSignal(nil, "test"))
(*graph_tester)(t).CheckForNone(r1_l, "Update on r1 after updating r2") (*graph_tester)(t).CheckForNone(r1_l, "Update on r1 after updating r2")
(*graph_tester)(t).CheckForValue(r2_l, "No update on r2 after updating r2") (*graph_tester)(t).CheckForValue(r2_l, "No update on r2 after updating r2")
(*graph_tester)(t).CheckForValue(r3_l, "No update on r3 after updating r2") (*graph_tester)(t).CheckForValue(r3_l, "No update on r3 after updating r2")
(*graph_tester)(t).CheckForValue(r4_l, "No update on r4 after updating r2") (*graph_tester)(t).CheckForValue(r4_l, "No update on r4 after updating r2")
// Calling Update() on a child should notify listeners of the parent and child, but not siblings // Calling Update() on a child should notify listeners of the parent and child, but not siblings
r1.Update("test") r1.Update(NewSignal(nil, "test"))
(*graph_tester)(t).CheckForValue(r1_l, "No update on r1 after updating r1") (*graph_tester)(t).CheckForValue(r1_l, "No update on r1 after updating r1")
(*graph_tester)(t).CheckForNone(r2_l, "Update on r2 after updating r1") (*graph_tester)(t).CheckForNone(r2_l, "Update on r2 after updating r1")
(*graph_tester)(t).CheckForValue(r3_l, "No update on r3 after updating r1") (*graph_tester)(t).CheckForValue(r3_l, "No update on r3 after updating r1")
@ -268,7 +270,7 @@ func TestAddToEventQueue(t * testing.T) {
} }
func TestStartBaseEvent(t * testing.T) { func TestStartBaseEvent(t * testing.T) {
event_1 := NewEvent("1", "", []Resource{}) event_1 := NewEvent("TestStartBaseEvent event_1", "", []Resource{})
r := event_1.DoneResource() r := event_1.DoneResource()
manager := NewEventManager(event_1, []Resource{}) manager := NewEventManager(event_1, []Resource{})
@ -331,7 +333,7 @@ func TestAbortEventQueue(t * testing.T) {
} }
func TestStartEventQueue(t * testing.T) { func TestStartEventQueue(t * testing.T) {
root_event := NewEventQueue("", "", []Resource{}) root_event := NewEventQueue("root_event", "", []Resource{})
r := root_event.DoneResource() r := root_event.DoneResource()
rel := root_event.UpdateChannel(); rel := root_event.UpdateChannel();
res_1 := NewResource("test_resource", "", []Resource{}) res_1 := NewResource("test_resource", "", []Resource{})

@ -9,15 +9,13 @@ import (
// Resources propagate update up to multiple parents, and not downwards // Resources propagate update up to multiple parents, and not downwards
// (subscriber to team won't get update to alliance, but subscriber to alliance will get update to team) // (subscriber to team won't get update to alliance, but subscriber to alliance will get update to team)
func (resource * BaseResource) Update(reason string) error { func (resource * BaseResource) Update(signal GraphSignal) error {
log.Printf("UPDATE BaseResource %s: %s", resource.Name(), reason) log.Printf("UPDATE BaseResource %s: %+v", resource.Name(), signal)
err := resource.UpdateListeners(reason)
if err != nil { resource.BaseNode.Update(signal)
return err
}
for _, parent := range resource.Parents() { for _, parent := range resource.Parents() {
err := parent.Update("update parents") err := parent.Update(signal)
if err != nil { if err != nil {
return err return err
} }
@ -62,7 +60,7 @@ func (resource * BaseResource) Owner() Event {
} }
func (resource * BaseResource) NotifyUnlocked() error { func (resource * BaseResource) NotifyUnlocked() error {
err := resource.Update("finalize_unlock") err := resource.Update(NewSignal(resource, "lock_change"))
if err != nil { if err != nil {
return err return err
} }
@ -78,7 +76,7 @@ func (resource * BaseResource) NotifyUnlocked() error {
} }
func (resource * BaseResource) NotifyLocked() error { func (resource * BaseResource) NotifyLocked() error {
err := resource.Update("finalize_lock") err := resource.Update(NewSignal(resource, "lock_change"))
if err != nil { if err != nil {
return err return err
} }
@ -90,7 +88,7 @@ func (resource * BaseResource) NotifyLocked() error {
} }
} }
resource.lock_holder.Update("finalize_lock") resource.lock_holder.Update(NewSignal(resource, "lock_change"))
return nil return nil
} }
@ -153,10 +151,6 @@ func (resource * BaseResource) Unlock(event Event) error {
} }
resource.state_lock.Unlock() resource.state_lock.Unlock()
/*if unlocked == true {
resource.Update("unlocking resource")
}*/
return err return err
} }
@ -189,13 +183,13 @@ func (resource * BaseResource) AddParent(parent Resource) error {
return nil return nil
} }
func NewResource(name string, description string, children []Resource) * BaseResource { func NewBaseResource(name string, description string, children []Resource) BaseResource {
resource := &BaseResource{ resource := BaseResource{
BaseNode: BaseNode{ BaseNode: BaseNode{
name: name, name: name,
description: description, description: description,
id: randid(), id: randid(),
listeners: []chan string{}, listeners: map[chan GraphSignal]chan GraphSignal{},
}, },
parents: []Resource{}, parents: []Resource{},
children: children, children: children,
@ -203,3 +197,8 @@ func NewResource(name string, description string, children []Resource) * BaseRes
return resource return resource
} }
func NewResource(name string, description string, children []Resource) * BaseResource {
resource := NewBaseResource(name, description, children)
return &resource
}

@ -13,16 +13,7 @@ type Member struct {
func NewMember(name string) * Member { func NewMember(name string) * Member {
member := &Member{ member := &Member{
BaseResource: BaseResource{ BaseResource: NewBaseResource(name, "A Team Member", []Resource{}),
BaseNode: BaseNode{
name: name,
description: "A Team Member",
id: randid(),
listeners: []chan string{},
},
parents: []Resource{},
children: []Resource{},
},
} }
return member return member
@ -47,22 +38,15 @@ func NewTeam(org string, team string, members []*Member) * Team {
name := fmt.Sprintf("%s%s", org, team) name := fmt.Sprintf("%s%s", org, team)
description := fmt.Sprintf("Team %s", name) description := fmt.Sprintf("Team %s", name)
resource := &Team{ resource := &Team{
BaseResource: BaseResource{ BaseResource: NewBaseResource(name, description, make([]Resource, len(members))),
BaseNode: BaseNode{
name: name,
description: description,
id: randid(),
listeners: []chan string{},
},
parents: []Resource{},
children: make([]Resource, len(members)),
},
Org: org, Org: org,
Team: team, Team: team,
} }
for idx, member := range(members) { for idx, member := range(members) {
resource.children[idx] = member resource.children[idx] = member
} }
return resource return resource
} }
@ -75,16 +59,7 @@ func NewAlliance(team0 * Team, team1 * Team) * Alliance {
description := "" description := ""
resource := &Alliance{ resource := &Alliance{
BaseResource: BaseResource{ BaseResource: NewBaseResource(name, description, []Resource{team0, team1}),
BaseNode: BaseNode{
name: name,
description: description,
id: randid(),
listeners: []chan string{},
},
parents: []Resource{},
children: []Resource{team0, team1},
},
} }
return resource return resource
} }
@ -103,16 +78,7 @@ type Arena struct {
func NewVirtualArena(name string) * Arena { func NewVirtualArena(name string) * Arena {
arena := &Arena{ arena := &Arena{
BaseResource: BaseResource{ BaseResource: NewBaseResource(name, "A virtual vex arena", []Resource{}),
BaseNode: BaseNode{
name: name,
description: "A virtual vex arena",
id: randid(),
listeners: []chan string{},
},
parents: []Resource{},
children: []Resource{},
},
connected: false, connected: false,
} }
@ -128,28 +94,34 @@ func (arena * Arena) Lock(event Event) error {
return arena.lock(event) return arena.lock(event)
} }
func (arena * Arena) Update(signal GraphSignal) error {
log.Printf("UPDATE Arena %s: %+v", arena.Name(), signal)
arena.BaseResource.Update(signal)
if arena.connected == true {
arena.signal <- signal
}
return nil
}
func (arena * Arena) Connect(abort chan error) bool { func (arena * Arena) Connect(abort chan error) bool {
log.Printf("Connecting %s", arena.Name()) log.Printf("Connecting %s", arena.Name())
go func(arena * Arena, abort chan error) { go func(arena * Arena, abort chan error) {
update_channel := arena.UpdateChannel()
owner := arena.Owner() owner := arena.Owner()
var owner_channel chan string = nil
if owner != nil {
owner_channel = owner.UpdateChannel()
}
arena.connected = true arena.connected = true
update_str := fmt.Sprintf("VIRTUAL_ARENA connected: %s", arena.Name()) update_str := fmt.Sprintf("VIRTUAL_ARENA connected: %s", arena.Name())
arena.Update(update_str) signal := NewSignal(arena, "arena_connected")
signal.description = update_str
arena.Update(signal)
log.Printf("VIRTUAL_ARENA goroutine starting: %s", arena.Name()) log.Printf("VIRTUAL_ARENA goroutine starting: %s", arena.Name())
for true { for true {
select { select {
case <- abort: case <- abort:
log.Printf("Virtual arena %s aborting", arena.Name()) log.Printf("Virtual arena %s aborting", arena.Name())
break break
case update, ok := <- update_channel: case update := <- arena.signal:
if !ok {
panic("own update_channel closed")
}
log.Printf("%s update: %s", arena.Name(), update) log.Printf("%s update: %s", arena.Name(), update)
new_owner := arena.Owner() new_owner := arena.Owner()
if new_owner != owner { if new_owner != owner {
@ -168,17 +140,9 @@ func (arena * Arena) Connect(abort chan error) bool {
owner = new_owner owner = new_owner
if owner != nil { if owner != nil {
owner_channel = owner.UpdateChannel()
} else { } else {
owner_channel = nil
}
} }
case update, ok := <- owner_channel:
if !ok {
panic("owner update channel closed")
} }
log.Printf("%s owner update: %s", arena.Name(), update)
log.Printf("owner: %s", owner.Name())
} }
} }
}(arena, abort) }(arena, abort)

@ -130,9 +130,11 @@ func TestNewMatch(t *testing.T) {
arena := NewVirtualArena(arena_name) arena := NewVirtualArena(arena_name)
root_event := NewMatch(alliance_1, alliance_2, arena) match := NewMatch(alliance_1, alliance_2, arena)
root_event := NewEventQueue("root_event", "", []Resource{})
event_manager := NewEventManager(root_event, []Resource{member_1, member_2, member_3, member_4, team_1, team_2, team_3, team_4, alliance_1, alliance_2, arena}) event_manager := NewEventManager(root_event, []Resource{member_1, member_2, member_3, member_4, team_1, team_2, team_3, team_4, alliance_1, alliance_2, arena})
event_manager.AddEvent(root_event, match, NewEventQueueInfo(1))
go func() { go func() {
time.Sleep(time.Second * 2) time.Sleep(time.Second * 2)
@ -140,7 +142,7 @@ func TestNewMatch(t *testing.T) {
}() }()
err := event_manager.Run() err := event_manager.Run()
if err == nil { if err != nil {
t.Fatal(err) t.Fatal(err)
} }
} }