Refactored GraphSignal to remove pointers and updated update path

graph-rework
noah metz 2023-06-18 18:11:59 -06:00
parent 23fbdc74a0
commit 66e6c4263c
10 changed files with 143 additions and 229 deletions

@ -11,20 +11,28 @@ import (
// Update the events listeners, and notify the parent to do the same // Update the events listeners, and notify the parent to do the same
func (event * BaseEvent) update(signal GraphSignal) { func (event * BaseEvent) update(signal GraphSignal) {
event.signal <- signal if signal.Downwards() == false {
new_signal := signal.Trace(event.ID()) // Child->Parent
event.parent_lock.Lock()
defer event.parent_lock.Unlock()
if event.parent != nil { if event.parent != nil {
SendUpdate(event.parent, new_signal) SendUpdate(event.parent, signal)
} }
source_id := signal.Last() event.rr_lock.Lock()
defer event.rr_lock.Unlock()
for _, resource := range(event.RequiredResources()) { for _, resource := range(event.required_resources) {
if source_id != resource.ID() { SendUpdate(resource, signal)
SendUpdate(resource, new_signal) }
} else {
// Parent->Child
event.child_lock.Lock()
defer event.child_lock.Unlock()
for _, child := range(event.children) {
SendUpdate(child, signal)
} }
} }
event.signal <- signal
} }
type EventInfo interface { type EventInfo interface {
@ -197,15 +205,13 @@ func AddChild(event Event, child Event, info EventInfo) error {
child.UnlockParent() child.UnlockParent()
event.UnlockParent() event.UnlockParent()
update := NewSignal(event, "child_added")
update.description = child.Name()
SendUpdate(event, NewSignal(event, "child_added")) SendUpdate(event, NewSignal(event, "child_added"))
return nil return nil
} }
func RunEvent(event Event) error { func RunEvent(event Event) error {
log.Logf("event", "EVENT_RUN: %s", event.Name()) log.Logf("event", "EVENT_RUN: %s", event.Name())
go SendUpdate(event, NewSignal(event, "event_start")) SendUpdate(event, NewSignal(event, "event_start"))
next_action := "start" next_action := "start"
var err error = nil var err error = nil
for next_action != "" { for next_action != "" {
@ -229,42 +235,14 @@ func RunEvent(event Event) error {
func EventAbort(event Event) func(signal GraphSignal) (string, error) { func EventAbort(event Event) func(signal GraphSignal) (string, error) {
return func(signal GraphSignal) (string, error) { return func(signal GraphSignal) (string, error) {
if signal.Description() == event.ID() {
AbortChildren(event)
return "", errors.New(fmt.Sprintf("%s aborted by signal", event.ID())) return "", errors.New(fmt.Sprintf("%s aborted by signal", event.ID()))
} }
return "wait", nil
}
} }
func EventCancel(event Event) func(signal GraphSignal) (string, error) { func EventCancel(event Event) func(signal GraphSignal) (string, error) {
return func(signal GraphSignal) (string, error) { return func(signal GraphSignal) (string, error) {
if signal.Description() == event.ID() {
CancelChildren(event)
return "", nil return "", nil
} }
return "wait", nil
}
}
func CancelChildren(event Event) {
event.LockChildren()
for _, child := range(event.Children()) {
signal := NewSignal(event, "cancel")
signal.description = child.ID()
SendUpdate(child, signal)
}
event.UnlockChildren()
}
func AbortChildren(event Event) {
event.LockChildren()
for _, child := range(event.Children()) {
signal := NewSignal(event, "abort")
signal.description = child.ID()
SendUpdate(child, signal)
}
event.UnlockChildren()
} }
func LockResources(event Event) error { func LockResources(event Event) error {
@ -287,22 +265,19 @@ func LockResources(event Event) error {
return lock_err return lock_err
} }
for _, resource := range(locked_resources) { signal := NewDownSignal(event, "locked")
NotifyResourceLocked(resource) SendUpdate(event, signal)
}
return nil return nil
} }
func FinishEvent(event Event) error { func FinishEvent(event Event) error {
// TODO make more 'safe' like LockResources, or make UnlockResource not return errors
log.Logf("event", "EVENT_FINISH: %s", event.Name()) log.Logf("event", "EVENT_FINISH: %s", event.Name())
for _, resource := range(event.RequiredResources()) { for _, resource := range(event.RequiredResources()) {
err := UnlockResource(resource, event) err := UnlockResource(resource, event)
if err != nil { if err != nil {
panic(err) panic(err)
} }
NotifyResourceUnlocked(resource)
} }
err := UnlockResource(event.DoneResource(), event) err := UnlockResource(event.DoneResource(), event)
@ -310,7 +285,8 @@ func FinishEvent(event Event) error {
return err return err
} }
NotifyResourceUnlocked(event.DoneResource()) SendUpdate(event, NewDownSignal(event, "unlocked"))
SendUpdate(event.DoneResource(), NewDownSignal(event, "unlocked"))
err = event.finish() err = event.finish()
if err != nil { if err != nil {
@ -330,6 +306,7 @@ func FinishEvent(event Event) error {
type BaseEvent struct { type BaseEvent struct {
BaseNode BaseNode
done_resource Resource done_resource Resource
rr_lock sync.Mutex
required_resources []Resource required_resources []Resource
children []Event children []Event
child_info map[string]EventInfo child_info map[string]EventInfo
@ -353,11 +330,7 @@ func EventWait(event Event) (func() (string, error)) {
log.Logf("event", "EVENT_WAIT: %s TIMEOUT: %+v", event.Name(), event.Timeout()) log.Logf("event", "EVENT_WAIT: %s TIMEOUT: %+v", event.Name(), event.Timeout())
select { select {
case signal := <- event.Signal(): case signal := <- event.Signal():
if signal.Source() != nil { log.Logf("event", "EVENT_SIGNAL: %s %+v", event.Name(), signal)
log.Logf("event", "EVENT_SIGNAL: %s %s %s -> %+v", event.Name(), signal.Last(), signal.Source().Name(), signal)
} else {
log.Logf("event", "EVENT_SIGNAL: %s %s nil -> %+v", event.Name(), signal.Last(), signal)
}
signal_fn, exists := event.Handler(signal.Type()) signal_fn, exists := event.Handler(signal.Type())
if exists == true { if exists == true {
log.Logf("event", "EVENT_HANDLER: %s - %s", event.Name(), signal.Type()) log.Logf("event", "EVENT_HANDLER: %s - %s", event.Name(), signal.Type())

@ -715,15 +715,15 @@ func GQLSignalType(p graphql.ResolveParams) (interface{}, error) {
}) })
} }
func GQLSignalDesc(p graphql.ResolveParams) (interface{}, error) { func GQLSignalSource(p graphql.ResolveParams) (interface{}, error) {
return GQLSignalFn(p, func(signal GraphSignal, p graphql.ResolveParams)(interface{}, error){ return GQLSignalFn(p, func(signal GraphSignal, p graphql.ResolveParams)(interface{}, error){
return signal.Description(), nil return signal.Source(), nil
}) })
} }
func GQLSignalTime(p graphql.ResolveParams) (interface{}, error) { func GQLSignalDownwards(p graphql.ResolveParams) (interface{}, error) {
return GQLSignalFn(p, func(signal GraphSignal, p graphql.ResolveParams)(interface{}, error){ return GQLSignalFn(p, func(signal GraphSignal, p graphql.ResolveParams)(interface{}, error){
return signal.Time(), nil return signal.Downwards(), nil
}) })
} }
@ -750,13 +750,13 @@ func GQLTypeSignal() *graphql.Object {
Type: graphql.String, Type: graphql.String,
Resolve: GQLSignalType, Resolve: GQLSignalType,
}) })
gql_type_signal.AddFieldConfig("Description", &graphql.Field{ gql_type_signal.AddFieldConfig("Source", &graphql.Field{
Type: graphql.String, Type: graphql.String,
Resolve: GQLSignalDesc, Resolve: GQLSignalSource,
}) })
gql_type_signal.AddFieldConfig("Time", &graphql.Field{ gql_type_signal.AddFieldConfig("Downwards", &graphql.Field{
Type: graphql.DateTime, Type: graphql.Boolean,
Resolve: GQLSignalTime, Resolve: GQLSignalDownwards,
}) })
gql_type_signal.AddFieldConfig("String", &graphql.Field{ gql_type_signal.AddFieldConfig("String", &graphql.Field{
Type: graphql.String, Type: graphql.String,
@ -811,9 +811,12 @@ func GQLMutationUpdateEvent() *graphql.Field {
if ok == false { if ok == false {
return nil, errors.New(fmt.Sprintf("Failed to cast arg signal to GraphSignal: %+v", p.Args["signal"])) return nil, errors.New(fmt.Sprintf("Failed to cast arg signal to GraphSignal: %+v", p.Args["signal"]))
} }
signal := NewSignal(server, signal_map["Type"].(string)) var signal GraphSignal = nil
signal.description = signal_map["Description"].(string) if signal_map["Downwards"] == false {
signal.time = signal_map["Time"].(time.Time) signal = NewSignal(server, signal_map["Type"].(string))
} else {
signal = NewDownSignal(server, signal_map["Type"].(string))
}
id , ok := p.Args["id"].(string) id , ok := p.Args["id"].(string)
if ok == false { if ok == false {

@ -5,7 +5,6 @@ import (
"reflect" "reflect"
"fmt" "fmt"
"errors" "errors"
"time"
) )
func GQLVexTypes() map[reflect.Type]*graphql.Object { func GQLVexTypes() map[reflect.Type]*graphql.Object {
@ -122,14 +121,7 @@ func GQLVexMutationSetMatchState() *graphql.Field {
return nil, errors.New("Failed to cast arg state to string") return nil, errors.New("Failed to cast arg state to string")
} }
start, ok := p.Args["time"].(time.Time)
if ok == false {
start = time.Now()
}
signal := NewSignal(server, state) signal := NewSignal(server, state)
signal.description = id
signal.time = start
owner := server.Owner() owner := server.Owner()
if owner == nil { if owner == nil {

@ -59,7 +59,7 @@ func (logger * DefaultLogger) Init(components []string) error {
} }
func (logger * DefaultLogger) Logm(component string, fields map[string]interface{}, format string, items ... interface{}) { func (logger * DefaultLogger) Logm(component string, fields map[string]interface{}, format string, items ... interface{}) {
logger.Init([]string{"gqlws", "gqlws_new"}) logger.Init([]string{"update"})
l, exists := logger.loggers[component] l, exists := logger.loggers[component]
if exists == true { if exists == true {
log := l.Log() log := l.Log()
@ -71,7 +71,7 @@ func (logger * DefaultLogger) Logm(component string, fields map[string]interface
} }
func (logger * DefaultLogger) Logf(component string, format string, items ... interface{}) { func (logger * DefaultLogger) Logf(component string, format string, items ... interface{}) {
logger.Init([]string{"gqlws", "gqlws_new"}) logger.Init([]string{"update"})
l, exists := logger.loggers[component] l, exists := logger.loggers[component]
if exists == true { if exists == true {
l.Log().Msg(fmt.Sprintf(format, items...)) l.Log().Msg(fmt.Sprintf(format, items...))
@ -85,63 +85,66 @@ func randid() string{
} }
type GraphSignal interface { type GraphSignal interface {
Source() GraphNode Downwards() bool
Source() string
Type() string Type() string
Description() string
Time() time.Time
Last() string
Trace(id string) GraphSignal
String() string String() string
} }
type BaseSignal struct { type BaseSignal struct {
source GraphNode downwards bool
signal_type string source string
description string _type string
time time.Time
last_id string
}
func (signal BaseSignal) String() string {
source_name := "nil"
source := signal.source
if source != nil {
source_name = source.Name()
}
return fmt.Sprintf("{type: %s, description: %s, source: %s, last: %s}", signal.signal_type, signal.description, source_name, signal.last_id)
} }
func (signal BaseSignal) Time() time.Time { func (signal BaseSignal) Downwards() bool {
return signal.time return signal.downwards
} }
func (signal BaseSignal) Source() GraphNode { func (signal BaseSignal) Source() string {
return signal.source return signal.source
} }
func (signal BaseSignal) Type() string { func (signal BaseSignal) Type() string {
return signal.signal_type return signal._type
} }
func (signal BaseSignal) Description() string { func (signal BaseSignal) String() string {
return signal.description return fmt.Sprintf("{downwards: %t, source: %s, type: %s}", signal.downwards, signal.source, signal._type)
} }
func (signal BaseSignal) Trace(id string) GraphSignal { type TimeSignal struct {
new_signal := signal BaseSignal
new_signal.last_id = id time time.Time
return new_signal
} }
func (signal BaseSignal) Last() string { func NewDownSignal(source GraphNode, _type string) (BaseSignal) {
return signal.last_id source_id := ""
if source != nil {
source_id = source.ID()
}
signal := BaseSignal{
downwards: true,
source: source_id,
_type: _type,
}
return signal
} }
func NewSignal(source GraphNode, signal_type string) (BaseSignal) { func NewSignal(source GraphNode, _type string) (BaseSignal) {
source_id := ""
if source != nil {
source_id = source.ID()
}
signal := BaseSignal{ signal := BaseSignal{
source: source, downwards: false,
signal_type: signal_type, source: source_id,
_type: _type,
} }
return signal return signal
} }
@ -251,17 +254,11 @@ func (node * BaseNode) update(signal GraphSignal) {
} }
func SendUpdate(node GraphNode, signal GraphSignal) { func SendUpdate(node GraphNode, signal GraphSignal) {
source := signal.Source()
source_name := "nil"
if source != nil {
source_name = source.Name()
}
node_name := "nil" node_name := "nil"
if node != nil { if node != nil {
node_name = node.Name() node_name = node.Name()
} }
log.Logf("update", "UPDATE %s -> %s: %+v", source_name, node_name, signal) log.Logf("update", "UPDATE %s <- %s: %+v", node_name, signal.Source(), signal)
node.UpdateListeners(signal) node.UpdateListeners(signal)
node.update(signal) node.update(signal)
} }

@ -201,8 +201,7 @@ func main() {
select { select {
case <-sigs: case <-sigs:
pprof.Lookup("goroutine").WriteTo(os.Stdout, 1) pprof.Lookup("goroutine").WriteTo(os.Stdout, 1)
signal := NewSignal(nil, "abort") signal := NewDownSignal(nil, "abort")
signal.description = event_manager.root_event.ID()
SendUpdate(event_manager.root_event, signal) SendUpdate(event_manager.root_event, signal)
break break
} }

@ -17,17 +17,10 @@ func (t * graph_tester) WaitForValue(listener chan GraphSignal, signal_type stri
select { select {
case signal := <- listener: case signal := <- listener:
if signal.Type() == signal_type { if signal.Type() == signal_type {
if signal.Source() == nil || source == nil { fmt.Printf("SIGNAL_TYPE_FOUND: %s - %s %+v\n", signal.Type(), signal.Source(), listener)
fmt.Printf("SIGNAL_TYPE_FOUND: %s - %s\n", signal.Type(), signal.Source()) if signal.Source() == source.ID() {
if source == nil && signal.Source() == nil{
return signal return signal
} }
} else {
fmt.Printf("SIGNAL_TYPE_FOUND: %s - %s\n", signal.Type(), signal.Source().Name())
if signal.Source().ID() == source.ID() {
return signal
}
}
} }
case <-timeout_channel: case <-timeout_channel:
pprof.Lookup("goroutine").WriteTo(os.Stdout, 1) pprof.Lookup("goroutine").WriteTo(os.Stdout, 1)
@ -211,10 +204,10 @@ func TestLockResource(t * testing.T) {
if err != nil { if err != nil {
t.Fatal("Failed to lock r3") t.Fatal("Failed to lock r3")
} }
NotifyResourceLocked(r3) SendUpdate(r3, NewDownSignal(r3, "locked"))
(*graph_tester)(t).WaitForValue(r1_l, "lock_changed", r1, time.Second, "Wasn't notified of r1 lock on r1 after r3 lock") (*graph_tester)(t).WaitForValue(r1_l, "locked", r3, time.Second, "Wasn't notified of r1 lock on r1 after r3 lock")
(*graph_tester)(t).WaitForValue(rel, "lock_changed", r1, time.Second, "Wasn't notified of r1 lock on rel after r3 lock") (*graph_tester)(t).WaitForValue(rel, "locked", r3, time.Second, "Wasn't notified of r1 lock on rel after r3 lock")
err = LockResource(r3, root_event) err = LockResource(r3, root_event)
if err == nil { if err == nil {
@ -240,23 +233,23 @@ func TestLockResource(t * testing.T) {
if err != nil { if err != nil {
t.Fatal("Failed to unlock r3") t.Fatal("Failed to unlock r3")
} }
NotifyResourceUnlocked(r3) SendUpdate(r3, NewDownSignal(r3, "unlocked"))
(*graph_tester)(t).WaitForValue(r1_l, "lock_changed", r1, time.Second * 2, "Wasn't notified of r1 unlock on r1 after r3 unlock") (*graph_tester)(t).WaitForValue(r1_l, "unlocked", r3, time.Second * 2, "Wasn't notified of r1 unlock on r1 after r3 unlock")
err = LockResource(r4, root_event) err = LockResource(r4, root_event)
if err != nil { if err != nil {
t.Fatal("Failed to lock r4 after unlocking r3") t.Fatal("Failed to lock r4 after unlocking r3")
} }
NotifyResourceLocked(r4) SendUpdate(r4, NewDownSignal(r4, "locked"))
(*graph_tester)(t).WaitForValue(r1_l, "lock_changed", r1, time.Second * 2, "Wasn't notified of r1 lock on r1 after r4 lock") (*graph_tester)(t).WaitForValue(r1_l, "locked", r4, time.Second * 2, "Wasn't notified of r1 lock on r1 after r4 lock")
(*graph_tester)(t).WaitForValue(rel, "lock_changed", r1, time.Second * 2, "Wasn't notified of r1 lock on r1 after r4 lock") (*graph_tester)(t).WaitForValue(rel, "locked", r4, time.Second * 2, "Wasn't notified of r1 lock on r1 after r4 lock")
err = UnlockResource(r4, root_event) err = UnlockResource(r4, root_event)
if err != nil { if err != nil {
t.Fatal("Failed to unlock r4") t.Fatal("Failed to unlock r4")
} }
NotifyResourceUnlocked(r4) SendUpdate(r4, NewDownSignal(r4, "unlocked"))
(*graph_tester)(t).WaitForValue(r1_l, "lock_changed", r1, time.Second * 2, "Wasn't notified of r1 unlock on r1 after r4 lock") (*graph_tester)(t).WaitForValue(r1_l, "unlocked", r4, time.Second * 2, "Wasn't notified of r1 unlock on r1 after r4 lock")
} }
func TestAddToEventQueue(t * testing.T) { func TestAddToEventQueue(t * testing.T) {
@ -299,8 +292,9 @@ func TestStartBaseEvent(t * testing.T) {
t.Fatal(err) t.Fatal(err)
} }
// Check that the update channels for the event and resource have updates // Check that the update channels for the event and resource have updates
(*graph_tester)(t).CheckForValue(e_l, "No update on event_1 after starting") (*graph_tester)(t).WaitForValue(e_l, "event_start", event_1, 1*time.Second, "No event_start on e_l")
(*graph_tester)(t).CheckForValue(r_l, "No update on r_l after starting") (*graph_tester)(t).WaitForValue(e_l, "event_done", event_1, 1*time.Second, "No event_start on e_l")
(*graph_tester)(t).WaitForValue(r_l, "unlocked", event_1, 1*time.Second, "No unlocked on r_l")
if r.Owner() != nil { if r.Owner() != nil {
t.Fatal("r still owned after event completed") t.Fatal("r still owned after event completed")
@ -330,8 +324,7 @@ func TestAbortEventQueue(t * testing.T) {
// start the queue and check that all the events are executed // start the queue and check that all the events are executed
go func() { go func() {
time.Sleep(100 * time.Millisecond) time.Sleep(100 * time.Millisecond)
abort_signal := NewSignal(nil, "abort") abort_signal := NewDownSignal(nil, "abort")
abort_signal.description = root_event.ID()
SendUpdate(root_event, abort_signal) SendUpdate(root_event, abort_signal)
}() }()
@ -388,8 +381,7 @@ func TestStartEventQueue(t * testing.T) {
go func() { go func() {
time.Sleep(5 * time.Second) time.Sleep(5 * time.Second)
if r.Owner() != nil { if r.Owner() != nil {
abort_signal := NewSignal(nil, "abort") abort_signal := NewDownSignal(nil, "abort")
abort_signal.description = root_event.ID()
SendUpdate(root_event, abort_signal) SendUpdate(root_event, abort_signal)
} }
}() }()
@ -400,8 +392,7 @@ func TestStartEventQueue(t * testing.T) {
(*graph_tester)(t).WaitForValue(e1_l, "event_done", e1, time.Second, "No event_done for e3") (*graph_tester)(t).WaitForValue(e1_l, "event_done", e1, time.Second, "No event_done for e3")
(*graph_tester)(t).WaitForValue(e2_l, "event_done", e2, time.Second, "No event_done for e3") (*graph_tester)(t).WaitForValue(e2_l, "event_done", e2, time.Second, "No event_done for e3")
(*graph_tester)(t).WaitForValue(e3_l, "event_done", e3, time.Second, "No event_done for e3") (*graph_tester)(t).WaitForValue(e3_l, "event_done", e3, time.Second, "No event_done for e3")
signal := NewSignal(nil, "cancel") signal := NewDownSignal(nil, "cancel")
signal.description = root_event.ID()
SendUpdate(root_event, signal) SendUpdate(root_event, signal)
}() }()

@ -9,24 +9,28 @@ import (
// Resources propagate update up to multiple parents, and not downwards // Resources propagate update up to multiple parents, and not downwards
// (subscriber to team won't get update to alliance, but subscriber to alliance will get update to team) // (subscriber to team won't get update to alliance, but subscriber to alliance will get update to team)
func (resource * BaseResource) update(signal GraphSignal) { func (resource * BaseResource) update(signal GraphSignal) {
new_signal := signal.Trace(resource.ID())
if signal.Downwards() == false {
// Child->Parent, resource updates parent resources
resource.parents_lock.Lock()
defer resource.parents_lock.Unlock()
for _, parent := range resource.Parents() { for _, parent := range resource.Parents() {
SendUpdate(parent, new_signal) SendUpdate(parent, signal)
} }
} else {
// Parent->Child, resource updates lock holder
resource.lock_holder_lock.Lock() resource.lock_holder_lock.Lock()
defer resource.lock_holder_lock.Unlock()
if resource.lock_holder != nil { if resource.lock_holder != nil {
if resource.lock_holder.ID() != signal.Last() { SendUpdate(resource.lock_holder, signal)
lock_holder := resource.lock_holder
resource.lock_holder_lock.Unlock()
SendUpdate(lock_holder, new_signal)
} else {
resource.lock_holder_lock.Unlock()
}
} else {
resource.lock_holder_lock.Unlock()
} }
resource.children_lock.Lock()
defer resource.children_lock.Unlock()
for _, child := range(resource.children) {
SendUpdate(child, signal)
}
}
} }
// Resource is the interface that DAG nodes are made from // Resource is the interface that DAG nodes are made from
@ -75,13 +79,12 @@ func AddParent(resource Resource, parent Resource) error {
func UnlockResource(resource Resource, event Event) error { func UnlockResource(resource Resource, event Event) error {
var err error = nil var err error = nil
resource.LockState() resource.LockState()
defer resource.UnlockState()
if resource.Owner() == nil { if resource.Owner() == nil {
resource.UnlockState()
return errors.New("Resource already unlocked") return errors.New("Resource already unlocked")
} }
if resource.Owner().ID() != event.ID() { if resource.Owner().ID() != event.ID() {
resource.UnlockState()
return errors.New("Resource not locked by parent, unlock failed") return errors.New("Resource not locked by parent, unlock failed")
} }
@ -95,36 +98,29 @@ func UnlockResource(resource Resource, event Event) error {
} }
if lock_err != nil { if lock_err != nil {
resource.UnlockState() return fmt.Errorf("Resource failed to unlock: %s", lock_err)
err_str := fmt.Sprintf("Resource failed to unlock: %s", lock_err)
return errors.New(err_str)
} }
resource.SetOwner(nil) resource.SetOwner(nil)
err = resource.unlock(event) err = resource.unlock(event)
if err != nil { if err != nil {
resource.UnlockState()
return errors.New("Failed to unlock resource") return errors.New("Failed to unlock resource")
} }
resource.UnlockState()
return nil return nil
} }
func LockResource(resource Resource, node GraphNode) error { func LockResource(resource Resource, node GraphNode) error {
resource.LockState() resource.LockState()
defer resource.UnlockState()
if resource.Owner() != nil { if resource.Owner() != nil {
resource.UnlockState() return fmt.Errorf("Resource already locked: %s", resource.Name())
err_str := fmt.Sprintf("Resource already locked: %s", resource.Name())
return errors.New(err_str)
} }
err := resource.lock(node) err := resource.lock(node)
if err != nil { if err != nil {
resource.UnlockState() return fmt.Errorf("Failed to lock resource: %s", err)
err_str := fmt.Sprintf("Failed to lock resource: %s", err)
return errors.New(err_str)
} }
var lock_err error = nil var lock_err error = nil
@ -139,41 +135,15 @@ func LockResource(resource Resource, node GraphNode) error {
} }
if lock_err != nil { if lock_err != nil {
resource.UnlockState() return fmt.Errorf("Resource failed to lock: %s", lock_err)
err_str := fmt.Sprintf("Resource failed to lock: %s", lock_err)
return errors.New(err_str)
} }
log.Logf("resource", "Locked %s", resource.Name()) log.Logf("resource", "Locked %s", resource.Name())
resource.SetOwner(node) resource.SetOwner(node)
resource.UnlockState()
return nil return nil
} }
func NotifyResourceLocked(resource Resource) {
signal := NewSignal(resource, "lock_changed")
signal.description = "lock"
for _, child := range resource.Children() {
NotifyResourceLocked(child)
}
go SendUpdate(resource, signal)
}
func NotifyResourceUnlocked(resource Resource) {
signal := NewSignal(resource, "lock_changed")
signal.description = "unlock"
for _, child := range(resource.Children()) {
NotifyResourceUnlocked(child)
}
go SendUpdate(resource, signal)
}
// BaseResource is the most basic resource that can exist in the DAG // BaseResource is the most basic resource that can exist in the DAG
// It holds a single state variable, which contains a pointer to the event that is locking it // It holds a single state variable, which contains a pointer to the event that is locking it
type BaseResource struct { type BaseResource struct {

@ -35,8 +35,9 @@ client.subscribe({
next: (data) => { next: (data) => {
console.log("ARENA_SUB") console.log("ARENA_SUB")
let obj = JSON.parse(data.data) let obj = JSON.parse(data.data)
console.log(obj.Arena) if(obj.Arena.Owner != null) {
game_id = obj.Arena.Owner.ID game_id = obj.Arena.Owner.ID
}
}, },
error: (err) => { error: (err) => {
console.log("ARENA_SUB") console.log("ARENA_SUB")

@ -100,11 +100,9 @@ func (arena * VirtualArena) update(signal GraphSignal) {
func (arena * VirtualArena) Init(abort chan error) bool { func (arena * VirtualArena) Init(abort chan error) bool {
log.Logf("vex", "Initializing %s", arena.Name()) log.Logf("vex", "Initializing %s", arena.Name())
go func(arena * VirtualArena, abort chan error) { go func(arena * VirtualArena, abort chan error) {
update_str := fmt.Sprintf("VIRTUAL_ARENA connected: %s", arena.Name())
signal := NewSignal(arena, "resource_connected") signal := NewSignal(arena, "resource_connected")
signal.description = update_str
arena.connected = true arena.connected = true
go SendUpdate(arena, signal) SendUpdate(arena, signal)
log.Logf("vex", "VIRTUAL_ARENA goroutine starting: %s", arena.Name()) log.Logf("vex", "VIRTUAL_ARENA goroutine starting: %s", arena.Name())
for true { for true {
select { select {
@ -148,15 +146,10 @@ type Match struct {
state string state string
control string control string
control_start time.Time control_start time.Time
control_duration int control_duration time.Duration
alliances []*Alliance alliances []*Alliance
} }
func (match * Match) update(signal GraphSignal) {
new_signal := signal.Trace(match.ID())
match.BaseEvent.update(new_signal)
}
func NewMatch(alliance0 * Alliance, alliance1 * Alliance, arena Arena) * Match { func NewMatch(alliance0 * Alliance, alliance1 * Alliance, arena Arena) * Match {
name := fmt.Sprintf("Match: %s vs. %s on %s", alliance0.Name(), alliance1.Name(), arena.Name()) name := fmt.Sprintf("Match: %s vs. %s on %s", alliance0.Name(), alliance1.Name(), arena.Name())
description := "A vex match" description := "A vex match"
@ -192,7 +185,6 @@ func NewMatch(alliance0 * Alliance, alliance1 * Alliance, arena Arena) * Match {
match.state = "autonomous_queued" match.state = "autonomous_queued"
match.control_start = time.Now().Add(start_slack) match.control_start = time.Now().Add(start_slack)
new_signal := NewSignal(match, "autonomous_queued") new_signal := NewSignal(match, "autonomous_queued")
new_signal.time = match.control_start
go SendUpdate(match, new_signal) go SendUpdate(match, new_signal)
return "wait", nil return "wait", nil
} }
@ -205,7 +197,9 @@ func NewMatch(alliance0 * Alliance, alliance1 * Alliance, arena Arena) * Match {
log.Logf("vex", "AUTONOMOUS_RUNNING: %s", match.Name()) log.Logf("vex", "AUTONOMOUS_RUNNING: %s", match.Name())
match.control = "program" match.control = "program"
match.state = "autonomous_running" match.state = "autonomous_running"
match.control_start = signal.Time() match.control_start = time.Now() // TODO
match.control_duration = TEMP_AUTON_TIME
go SendUpdate(match, NewSignal(match, "autonomous_running")) go SendUpdate(match, NewSignal(match, "autonomous_running"))
end_time := match.control_start.Add(TEMP_AUTON_TIME) end_time := match.control_start.Add(TEMP_AUTON_TIME)
@ -235,7 +229,6 @@ func NewMatch(alliance0 * Alliance, alliance1 * Alliance, arena Arena) * Match {
match.state = "driver_queued" match.state = "driver_queued"
match.control_start = time.Now().Add(start_slack) match.control_start = time.Now().Add(start_slack)
new_signal := NewSignal(match, "driver_queued") new_signal := NewSignal(match, "driver_queued")
new_signal.time = match.control_start
go SendUpdate(match, new_signal) go SendUpdate(match, new_signal)
return "wait", nil return "wait", nil
} }
@ -247,7 +240,8 @@ func NewMatch(alliance0 * Alliance, alliance1 * Alliance, arena Arena) * Match {
} }
match.control = "driver" match.control = "driver"
match.state = "driver_running" match.state = "driver_running"
match.control_start = signal.Time() match.control_start = time.Now() // TODO
match.control_duration = TEMP_DRIVE_TIME
go SendUpdate(match, NewSignal(match, "driver_running")) go SendUpdate(match, NewSignal(match, "driver_running"))
@ -269,8 +263,6 @@ func NewMatch(alliance0 * Alliance, alliance1 * Alliance, arena Arena) * Match {
} }
match.actions["driver_done"] = func() (string, error) { match.actions["driver_done"] = func() (string, error) {
new_signal := NewSignal(match, "driver_done")
new_signal.time = time.Now()
SendUpdate(match, NewSignal(match, "driver_done")) SendUpdate(match, NewSignal(match, "driver_done"))
return "wait", nil return "wait", nil
} }

@ -144,30 +144,26 @@ func TestNewMatch(t *testing.T) {
time.Sleep(time.Second * 20) time.Sleep(time.Second * 20)
if r.Owner() != nil { if r.Owner() != nil {
pprof.Lookup("goroutine").WriteTo(os.Stdout, 1) pprof.Lookup("goroutine").WriteTo(os.Stdout, 1)
abort_signal := NewSignal(root_event, "abort") abort_signal := NewDownSignal(root_event, "abort")
abort_signal.description = root_event.ID()
SendUpdate(root_event, abort_signal) SendUpdate(root_event, abort_signal)
} }
}() }()
go func(arena_c chan GraphSignal) { go func(arena_c chan GraphSignal) {
(*graph_tester)(t).WaitForValue(arena_c, "event_start", match, 1*time.Second, "no event_start") (*graph_tester)(t).WaitForValue(arena_c, "event_start", match, 1*time.Second, "no event_start")
SendUpdate(arena, NewSignal(nil, "queue_autonomous")) SendUpdate(arena, NewDownSignal(nil, "queue_autonomous"))
(*graph_tester)(t).WaitForValue(arena_c, "autonomous_queued", match, 1*time.Second, "no autonomous_queued") (*graph_tester)(t).WaitForValue(arena_c, "autonomous_queued", match, 1*time.Second, "no autonomous_queued")
auton_signal := NewSignal(nil, "start_autonomous") auton_signal := NewDownSignal(nil, "start_autonomous")
auton_signal.time = time.Now()
SendUpdate(arena, auton_signal) SendUpdate(arena, auton_signal)
(*graph_tester)(t).WaitForValue(arena_c, "autonomous_running", match, 1*time.Second, "no autonomous_running") (*graph_tester)(t).WaitForValue(arena_c, "autonomous_running", match, 1*time.Second, "no autonomous_running")
(*graph_tester)(t).WaitForValue(arena_c, "autonomous_done", match, 6*time.Second, "no autonomous_done") (*graph_tester)(t).WaitForValue(arena_c, "autonomous_done", match, 6*time.Second, "no autonomous_done")
SendUpdate(arena, NewSignal(nil, "queue_driver")) SendUpdate(arena, NewDownSignal(nil, "queue_driver"))
(*graph_tester)(t).WaitForValue(arena_c, "driver_queued", match, 1*time.Second, "no driver_queued") (*graph_tester)(t).WaitForValue(arena_c, "driver_queued", match, 1*time.Second, "no driver_queued")
driver_signal := NewSignal(nil, "start_driver") driver_signal := NewDownSignal(nil, "start_driver")
driver_signal.time = time.Now()
SendUpdate(arena, driver_signal) SendUpdate(arena, driver_signal)
(*graph_tester)(t).WaitForValue(arena_c, "driver_running", match, 1*time.Second, "no driver_running") (*graph_tester)(t).WaitForValue(arena_c, "driver_running", match, 1*time.Second, "no driver_running")
(*graph_tester)(t).WaitForValue(arena_c, "driver_done", match, 6*time.Second, "no driver_done") (*graph_tester)(t).WaitForValue(arena_c, "driver_done", match, 6*time.Second, "no driver_done")
cancel_signal := NewSignal(nil, "cancel") cancel_signal := NewDownSignal(nil, "cancel")
cancel_signal.description = root_event.ID()
SendUpdate(root_event, cancel_signal) SendUpdate(root_event, cancel_signal)
}(arena_c) }(arena_c)