diff --git a/event.go b/event.go index 75ce4ee..b3479cd 100644 --- a/event.go +++ b/event.go @@ -186,15 +186,44 @@ func RunEvent(event Event) error { return nil } -func AbortEvent(event Event) error { - signal := NewSignal(event, "abort") - SendUpdate(event, signal) +func EventAbort(event Event) func(signal GraphSignal) (string, error) { + return func(signal GraphSignal) (string, error) { + if signal.Description() == event.ID() { + AbortChildren(event) + return "", errors.New(fmt.Sprintf("%s aborted by signal", event.ID())) + } + return "wait", nil + } +} + +func EventCancel(event Event) func(signal GraphSignal) (string, error) { + return func(signal GraphSignal) (string, error) { + if signal.Description() == event.ID() { + CancelChildren(event) + return "", nil + } + return "wait", nil + } +} + +func CancelChildren(event Event) { event.LockChildren() for _, child := range(event.Children()) { - AbortEvent(child) + signal := NewSignal(event, "cancel") + signal.description = child.ID() + SendUpdate(child, signal) + } + event.UnlockChildren() +} + +func AbortChildren(event Event) { + event.LockChildren() + for _, child := range(event.Children()) { + signal := NewSignal(event, "abort") + signal.description = child.ID() + SendUpdate(child, signal) } event.UnlockChildren() - return nil } func LockResources(event Event) error { @@ -287,14 +316,10 @@ func EventWait(event Event) (func() (string, error)) { } else { log.Logf("event", "EVENT_SIGNAL: %s %s nil -> %+v", event.Name(), signal.Last(), signal) } - if signal.Type() == "abort" { - return "", errors.New("State machine aborted by signal") - } else { - signal_fn, exists := event.Handler(signal.Type()) - if exists == true { - log.Logf("event", "EVENT_HANDLER: %s - %s", event.Name(), signal.Type()) - return signal_fn(signal) - } + signal_fn, exists := event.Handler(signal.Type()) + if exists == true { + log.Logf("event", "EVENT_HANDLER: %s - %s", event.Name(), signal.Type()) + return signal_fn(signal) } return "wait", nil case <- event.Timeout(): @@ -330,6 +355,8 @@ func NewEvent(name string, description string, required_resources []Resource) (* event_ptr := &event event_ptr.actions["wait"] = EventWait(event_ptr) + event_ptr.handlers["abort"] = EventAbort(event_ptr) + event_ptr.handlers["cancel"] = EventCancel(event_ptr) event_ptr.actions["start"] = func() (string, error) { return "", nil @@ -372,6 +399,8 @@ func NewEventQueue(name string, description string, required_resources []Resourc } queue.actions["wait"] = EventWait(queue) + queue.handlers["abort"] = EventAbort(queue) + queue.handlers["cancel"] = EventCancel(queue) queue.actions["start"] = func() (string, error) { return "queue_event", nil @@ -389,7 +418,6 @@ func NewEventQueue(name string, description string, required_resources []Resourc } sort.SliceStable(copied_events, less) - wait := false needed_resources := map[string]Resource{} for _, event := range(copied_events) { // make sure all the required resources are registered to update the event @@ -399,7 +427,6 @@ func NewEventQueue(name string, description string, required_resources []Resourc info := queue.ChildInfo(event).(*EventQueueInfo) if info.state == "queued" { - wait = true // Try to lock it err := LockResources(event) // start in new goroutine @@ -418,8 +445,6 @@ func NewEventQueue(name string, description string, required_resources []Resourc FinishEvent(event) }(event, info, queue) } - } else if info.state == "running" { - wait = true } } @@ -435,11 +460,7 @@ func NewEventQueue(name string, description string, required_resources []Resourc queue.UnlockChildren() - if wait == true { - return "wait", nil - } else { - return "", nil - } + return "wait", nil } queue.handlers["resource_connected"] = func(signal GraphSignal) (string, error) { @@ -509,3 +530,52 @@ func (event * BaseEvent) addChild(child Event, info EventInfo) { event.children = append(event.children, child) event.child_info[child.ID()] = info } + +type GQLEvent struct { + BaseEvent + abort chan error +} + +func NewGQLEvent(listen string, child Event) * GQLEvent { + event := &GQLEvent{ + BaseEvent: NewBaseEvent("GQL Handler", "", []Resource{}), + abort: make(chan error, 1), + } + + event.actions["wait"] = EventWait(event) + + event.handlers["abort"] = func (signal GraphSignal) (string, error) { + if signal.Description() == event.ID() { + event.abort <- nil + AbortChildren(event) + return "", errors.New(fmt.Sprintf("%s aborted by signal", event.ID())) + } + return "wait", nil + } + + event.handlers["cancel"] = func (signal GraphSignal) (string, error) { + if signal.Description() == event.ID() { + event.abort <- nil + CancelChildren(event) + return "", nil + } + return "wait", nil + } + + event.actions["start"] = func() (string, error) { + // start the gql handler goroutine + log.Logf("gql", "Starting GQL thread for %s", event.ID()) + go func(event * GQLEvent) { + for true { + select { + case <- event.abort: + log.Logf("gql", "Stopping GQL thread for %s", event.ID()) + break + } + } + }(event) + return "wait", nil + } + + return event +} diff --git a/go.mod b/go.mod index e547c4c..b1f2c43 100644 --- a/go.mod +++ b/go.mod @@ -5,6 +5,7 @@ go 1.20 require ( github.com/google/uuid v1.3.0 // indirect github.com/graphql-go/graphql v0.8.1 // indirect + github.com/graphql-go/handler v0.2.3 // indirect github.com/mattn/go-colorable v0.1.12 // indirect github.com/mattn/go-isatty v0.0.14 // indirect github.com/rs/zerolog v1.29.1 // indirect diff --git a/go.sum b/go.sum index 6620914..c5fb224 100644 --- a/go.sum +++ b/go.sum @@ -11,6 +11,8 @@ github.com/graph-gophers/graphql-go v1.5.0 h1:fDqblo50TEpD0LY7RXk/LFVYEVqo3+tXMN github.com/graph-gophers/graphql-go v1.5.0/go.mod h1:YtmJZDLbF1YYNrlNAuiO5zAStUWc3XZT07iGsVqe1Os= github.com/graphql-go/graphql v0.8.1 h1:p7/Ou/WpmulocJeEx7wjQy611rtXGQaAcXGqanuMMgc= github.com/graphql-go/graphql v0.8.1/go.mod h1:nKiHzRM0qopJEwCITUuIsxk9PlVlwIiiI8pnJEhordQ= +github.com/graphql-go/handler v0.2.3 h1:CANh8WPnl5M9uA25c2GBhPqJhE53Fg0Iue/fRNla71E= +github.com/graphql-go/handler v0.2.3/go.mod h1:leLF6RpV5uZMN1CdImAxuiayrYYhOk33bZciaUGaXeU= github.com/looplab/fsm v1.0.1 h1:OEW0ORrIx095N/6lgoGkFkotqH6s7vaFPsgjLAaF5QU= github.com/looplab/fsm v1.0.1/go.mod h1:PmD3fFvQEIsjMEfvZdrCDZ6y8VwKTwWNjlpEr6IKPO4= github.com/mattn/go-colorable v0.1.12 h1:jF+Du6AlPIjs2BiUiQlKOX0rt3SujHxPnksPKZbaA40= diff --git a/graph.go b/graph.go index 44af18c..bac33bd 100644 --- a/graph.go +++ b/graph.go @@ -22,7 +22,7 @@ type DefaultLogger struct { } var log DefaultLogger = DefaultLogger{loggers: map[string]zerolog.Logger{}} -var all_components = []string{"update", "graph", "event", "resource", "manager", "test"} +var all_components = []string{"update", "graph", "event", "resource", "manager", "test", "gql"} func (logger * DefaultLogger) Init(components []string) error { file, err := os.OpenFile("test.log", os.O_TRUNC|os.O_CREATE|os.O_WRONLY, 0666) @@ -40,9 +40,11 @@ func (logger * DefaultLogger) Init(components []string) error { } writer := io.MultiWriter(file, os.Stdout) - for _, c := range([]string{"event"}) { + for _, c := range(all_components) { if component_enabled(c) == true { logger.loggers[c] = zerolog.New(writer).With().Timestamp().Str("component", c).Logger() + } else { + panic(fmt.Sprintf("%s is not a component in DefaultLogger", c)) } } return nil diff --git a/main.go b/main.go index 91b0902..18747ed 100644 --- a/main.go +++ b/main.go @@ -271,13 +271,17 @@ func main() { case update := <- arena_4_client.update: arena_4_client.process_update(update) } + if arena_1_client.games_done == 12 && + arena_2_client.games_done == 12 && + arena_3_client.games_done == 12 && + arena_4_client.games_done == 12 { + signal := NewSignal(nil, "cancel") + signal.description = event_manager.root_event.ID() + SendUpdate(event_manager.root_event, signal) + } } }() - go func() { - event_manager.GQL() - }() - log.Logf("test", "Starting event_manager") err := event_manager.Run() if err != nil { diff --git a/manager.go b/manager.go index 79d5f80..4c7c82c 100644 --- a/manager.go +++ b/manager.go @@ -16,7 +16,7 @@ type EventManager struct { aborts []chan error } -graphiql_string := ` +const graphiql_string string = `