Made cancel and abort handlers instead of abort being a special signal type

graph-rework
noah metz 2023-06-06 23:04:49 -06:00
parent 3605e3ee37
commit 2d31daa916
9 changed files with 153 additions and 56 deletions

@ -186,15 +186,44 @@ func RunEvent(event Event) error {
return nil
}
func AbortEvent(event Event) error {
signal := NewSignal(event, "abort")
SendUpdate(event, signal)
func EventAbort(event Event) func(signal GraphSignal) (string, error) {
return func(signal GraphSignal) (string, error) {
if signal.Description() == event.ID() {
AbortChildren(event)
return "", errors.New(fmt.Sprintf("%s aborted by signal", event.ID()))
}
return "wait", nil
}
}
func EventCancel(event Event) func(signal GraphSignal) (string, error) {
return func(signal GraphSignal) (string, error) {
if signal.Description() == event.ID() {
CancelChildren(event)
return "", nil
}
return "wait", nil
}
}
func CancelChildren(event Event) {
event.LockChildren()
for _, child := range(event.Children()) {
AbortEvent(child)
signal := NewSignal(event, "cancel")
signal.description = child.ID()
SendUpdate(child, signal)
}
event.UnlockChildren()
}
func AbortChildren(event Event) {
event.LockChildren()
for _, child := range(event.Children()) {
signal := NewSignal(event, "abort")
signal.description = child.ID()
SendUpdate(child, signal)
}
event.UnlockChildren()
return nil
}
func LockResources(event Event) error {
@ -287,15 +316,11 @@ func EventWait(event Event) (func() (string, error)) {
} else {
log.Logf("event", "EVENT_SIGNAL: %s %s nil -> %+v", event.Name(), signal.Last(), signal)
}
if signal.Type() == "abort" {
return "", errors.New("State machine aborted by signal")
} else {
signal_fn, exists := event.Handler(signal.Type())
if exists == true {
log.Logf("event", "EVENT_HANDLER: %s - %s", event.Name(), signal.Type())
return signal_fn(signal)
}
}
return "wait", nil
case <- event.Timeout():
log.Logf("event", "EVENT_TIMEOUT %s - NEXT_STATE: %s", event.Name(), event.TimeoutAction())
@ -330,6 +355,8 @@ func NewEvent(name string, description string, required_resources []Resource) (*
event_ptr := &event
event_ptr.actions["wait"] = EventWait(event_ptr)
event_ptr.handlers["abort"] = EventAbort(event_ptr)
event_ptr.handlers["cancel"] = EventCancel(event_ptr)
event_ptr.actions["start"] = func() (string, error) {
return "", nil
@ -372,6 +399,8 @@ func NewEventQueue(name string, description string, required_resources []Resourc
}
queue.actions["wait"] = EventWait(queue)
queue.handlers["abort"] = EventAbort(queue)
queue.handlers["cancel"] = EventCancel(queue)
queue.actions["start"] = func() (string, error) {
return "queue_event", nil
@ -389,7 +418,6 @@ func NewEventQueue(name string, description string, required_resources []Resourc
}
sort.SliceStable(copied_events, less)
wait := false
needed_resources := map[string]Resource{}
for _, event := range(copied_events) {
// make sure all the required resources are registered to update the event
@ -399,7 +427,6 @@ func NewEventQueue(name string, description string, required_resources []Resourc
info := queue.ChildInfo(event).(*EventQueueInfo)
if info.state == "queued" {
wait = true
// Try to lock it
err := LockResources(event)
// start in new goroutine
@ -418,8 +445,6 @@ func NewEventQueue(name string, description string, required_resources []Resourc
FinishEvent(event)
}(event, info, queue)
}
} else if info.state == "running" {
wait = true
}
}
@ -435,11 +460,7 @@ func NewEventQueue(name string, description string, required_resources []Resourc
queue.UnlockChildren()
if wait == true {
return "wait", nil
} else {
return "", nil
}
}
queue.handlers["resource_connected"] = func(signal GraphSignal) (string, error) {
@ -509,3 +530,52 @@ func (event * BaseEvent) addChild(child Event, info EventInfo) {
event.children = append(event.children, child)
event.child_info[child.ID()] = info
}
type GQLEvent struct {
BaseEvent
abort chan error
}
func NewGQLEvent(listen string, child Event) * GQLEvent {
event := &GQLEvent{
BaseEvent: NewBaseEvent("GQL Handler", "", []Resource{}),
abort: make(chan error, 1),
}
event.actions["wait"] = EventWait(event)
event.handlers["abort"] = func (signal GraphSignal) (string, error) {
if signal.Description() == event.ID() {
event.abort <- nil
AbortChildren(event)
return "", errors.New(fmt.Sprintf("%s aborted by signal", event.ID()))
}
return "wait", nil
}
event.handlers["cancel"] = func (signal GraphSignal) (string, error) {
if signal.Description() == event.ID() {
event.abort <- nil
CancelChildren(event)
return "", nil
}
return "wait", nil
}
event.actions["start"] = func() (string, error) {
// start the gql handler goroutine
log.Logf("gql", "Starting GQL thread for %s", event.ID())
go func(event * GQLEvent) {
for true {
select {
case <- event.abort:
log.Logf("gql", "Stopping GQL thread for %s", event.ID())
break
}
}
}(event)
return "wait", nil
}
return event
}

@ -5,6 +5,7 @@ go 1.20
require (
github.com/google/uuid v1.3.0 // indirect
github.com/graphql-go/graphql v0.8.1 // indirect
github.com/graphql-go/handler v0.2.3 // indirect
github.com/mattn/go-colorable v0.1.12 // indirect
github.com/mattn/go-isatty v0.0.14 // indirect
github.com/rs/zerolog v1.29.1 // indirect

@ -11,6 +11,8 @@ github.com/graph-gophers/graphql-go v1.5.0 h1:fDqblo50TEpD0LY7RXk/LFVYEVqo3+tXMN
github.com/graph-gophers/graphql-go v1.5.0/go.mod h1:YtmJZDLbF1YYNrlNAuiO5zAStUWc3XZT07iGsVqe1Os=
github.com/graphql-go/graphql v0.8.1 h1:p7/Ou/WpmulocJeEx7wjQy611rtXGQaAcXGqanuMMgc=
github.com/graphql-go/graphql v0.8.1/go.mod h1:nKiHzRM0qopJEwCITUuIsxk9PlVlwIiiI8pnJEhordQ=
github.com/graphql-go/handler v0.2.3 h1:CANh8WPnl5M9uA25c2GBhPqJhE53Fg0Iue/fRNla71E=
github.com/graphql-go/handler v0.2.3/go.mod h1:leLF6RpV5uZMN1CdImAxuiayrYYhOk33bZciaUGaXeU=
github.com/looplab/fsm v1.0.1 h1:OEW0ORrIx095N/6lgoGkFkotqH6s7vaFPsgjLAaF5QU=
github.com/looplab/fsm v1.0.1/go.mod h1:PmD3fFvQEIsjMEfvZdrCDZ6y8VwKTwWNjlpEr6IKPO4=
github.com/mattn/go-colorable v0.1.12 h1:jF+Du6AlPIjs2BiUiQlKOX0rt3SujHxPnksPKZbaA40=

@ -22,7 +22,7 @@ type DefaultLogger struct {
}
var log DefaultLogger = DefaultLogger{loggers: map[string]zerolog.Logger{}}
var all_components = []string{"update", "graph", "event", "resource", "manager", "test"}
var all_components = []string{"update", "graph", "event", "resource", "manager", "test", "gql"}
func (logger * DefaultLogger) Init(components []string) error {
file, err := os.OpenFile("test.log", os.O_TRUNC|os.O_CREATE|os.O_WRONLY, 0666)
@ -40,9 +40,11 @@ func (logger * DefaultLogger) Init(components []string) error {
}
writer := io.MultiWriter(file, os.Stdout)
for _, c := range([]string{"event"}) {
for _, c := range(all_components) {
if component_enabled(c) == true {
logger.loggers[c] = zerolog.New(writer).With().Timestamp().Str("component", c).Logger()
} else {
panic(fmt.Sprintf("%s is not a component in DefaultLogger", c))
}
}
return nil

@ -271,11 +271,15 @@ func main() {
case update := <- arena_4_client.update:
arena_4_client.process_update(update)
}
if arena_1_client.games_done == 12 &&
arena_2_client.games_done == 12 &&
arena_3_client.games_done == 12 &&
arena_4_client.games_done == 12 {
signal := NewSignal(nil, "cancel")
signal.description = event_manager.root_event.ID()
SendUpdate(event_manager.root_event, signal)
}
}
}()
go func() {
event_manager.GQL()
}()
log.Logf("test", "Starting event_manager")

@ -16,7 +16,7 @@ type EventManager struct {
aborts []chan error
}
graphiql_string := `
const graphiql_string string = `
<!--
* Copyright (c) 2021 GraphQL Contributors
* All rights reserved.
@ -76,7 +76,7 @@ graphiql_string := `
root.render(
React.createElement(GraphiQL, {
fetcher: GraphiQL.createFetcher({
url: 'https://swapi-graphql.netlify.app/.netlify/functions/index',
url: 'http://localhost:8080/gql',
}),
defaultEditorToolsVisibility: true,
}),
@ -87,22 +87,32 @@ graphiql_string := `
`
func (manager * EventManager) GQL() error {
rootQuery := graphql.ObjectConfig{Name: "RootQuery", Field: fields}
fields := graphql.Fields{
"hello": &graphql.Field{
Type: graphql.String,
Resolve: func(p graphql.ResolveParams)(interface{}, error) {
return "world", nil
},
},
}
rootQuery := graphql.ObjectConfig{Name: "RootQuery", Fields: fields}
schemaConfig := graphql.SchemaConfig{Query: graphql.NewObject(rootQuery)}
schema, err := graphql.NewSchema(schemaConfig)
if err != nil {
return err
}
h := handler.New(&handler.Config{Schema: &schema, })
server := http.NewServeMux()
server.Handle("/graphql", h)
server.HandleFunc("/", func(w http.ResponseWriter, req *http.Request) {
w.Header.Set("Content-Type", "text/html; charset=utf-8")
h := handler.New(&handler.Config{Schema: &schema, Pretty: true})
mux := http.NewServeMux()
mux.Handle("/gql", h)
mux.HandleFunc("/", func(w http.ResponseWriter, req *http.Request) {
w.Header().Set("Content-Type", "text/html; charset=utf-8")
w.WriteHeader(http.StatusOK)
io.WriteString(w, graphiql_string)
})
http.ListenAndServe(":8080", mux)
return nil
}

@ -18,12 +18,12 @@ func (t * graph_tester) WaitForValue(listener chan GraphSignal, signal_type stri
case signal := <- listener:
if signal.Type() == signal_type {
if signal.Source() == nil || source == nil {
fmt.Printf("SIGNAL_TYPE_FOUND: %s - %s", signal.Type(), signal.Source())
fmt.Printf("SIGNAL_TYPE_FOUND: %s - %s\n", signal.Type(), signal.Source())
if source == nil && signal.Source() == nil{
return signal
}
} else {
fmt.Printf("SIGNAL_TYPE_FOUND: %s - %s", signal.Type(), signal.Source().Name())
fmt.Printf("SIGNAL_TYPE_FOUND: %s - %s\n", signal.Type(), signal.Source().Name())
if signal.Source().ID() == source.ID() {
return signal
}
@ -308,7 +308,7 @@ func TestStartBaseEvent(t * testing.T) {
}
func TestAbortEventQueue(t * testing.T) {
root_event := NewEventQueue("", "", []Resource{})
root_event := NewEventQueue("root_event", "", []Resource{})
r := root_event.DoneResource()
manager := NewEventManager(root_event, []Resource{})
@ -318,7 +318,7 @@ func TestAbortEventQueue(t * testing.T) {
t.Fatal(err)
}
LockResource(r1, root_event)
e1 := NewEvent("1", "", []Resource{r1})
e1 := NewEvent("event_1", "", []Resource{r1})
e1_info := NewEventQueueInfo(1)
// Add an event so that the queue doesn't auto complete
err = manager.AddEvent(root_event, e1, e1_info)
@ -330,7 +330,9 @@ func TestAbortEventQueue(t * testing.T) {
// start the queue and check that all the events are executed
go func() {
time.Sleep(100 * time.Millisecond)
AbortEvent(root_event)
abort_signal := NewSignal(nil, "abort")
abort_signal.description = root_event.ID()
SendUpdate(root_event, abort_signal)
}()
err = manager.Run()
@ -359,7 +361,7 @@ func TestStartEventQueue(t * testing.T) {
if err != nil {
t.Fatal("Failed to add e1 to manager")
}
(*graph_tester)(t).CheckForValue(rel, "No update on root_event after adding e1")
(*graph_tester)(t).WaitForValue(rel, "child_added", root_event, time.Second, "No update on root_event after adding e1")
e2 := NewEvent("e2", "", []Resource{res_1})
e2_r := e2.DoneResource()
@ -368,7 +370,7 @@ func TestStartEventQueue(t * testing.T) {
if err != nil {
t.Fatal("Failed to add e2 to manager")
}
(*graph_tester)(t).CheckForValue(rel, "No update on root_event after adding e2")
(*graph_tester)(t).WaitForValue(rel, "child_added", root_event, time.Second, "No update on root_event after adding e2")
e3 := NewEvent("e3", "", []Resource{res_2})
e3_r := e3.DoneResource()
@ -377,22 +379,27 @@ func TestStartEventQueue(t * testing.T) {
if err != nil {
t.Fatal("Failed to add e3 to manager")
}
(*graph_tester)(t).CheckForValue(rel, "No update on root_event after adding e3")
e1_l := e1.UpdateChannel();
e2_l := e2.UpdateChannel();
e3_l := e3.UpdateChannel();
(*graph_tester)(t).WaitForValue(rel, "child_added", root_event, time.Second, "No update on root_event after adding e3")
// Abort the event after 5 seconds just in case
go func() {
time.Sleep(5 * time.Second)
if r.Owner() != nil {
AbortEvent(root_event)
abort_signal := NewSignal(nil, "abort")
abort_signal.description = root_event.ID()
SendUpdate(root_event, abort_signal)
}
}()
// Now that an event manager is constructed with a queue and 3 basic events
// start the queue and check that all the events are executed
go func() {
(*graph_tester)(t).WaitForValue(rel, "event_done", e3, time.Second, "No event_done for e3")
signal := NewSignal(nil, "cancel")
signal.description = root_event.ID()
SendUpdate(root_event, signal)
}()
err = manager.Run()
if err != nil {
t.Fatal(err)
@ -403,17 +410,14 @@ func TestStartEventQueue(t * testing.T) {
t.Fatal("root event was not finished after starting")
}
(*graph_tester)(t).WaitForValue(e1_l, "event_done", e1, time.Second, "no e1 event_done")
if e1_r.Owner() != nil {
t.Fatal("e1 was not completed")
}
(*graph_tester)(t).WaitForValue(e2_l, "event_done", e2, time.Second, "no e2 event_done")
if e2_r.Owner() != nil {
t.Fatal(fmt.Sprintf("e2 was not completed"))
}
(*graph_tester)(t).WaitForValue(e3_l, "event_done", e3, time.Second, "no e3 event_done")
if e3_r.Owner() != nil {
t.Fatal("e3 was not completed")
}

@ -128,11 +128,9 @@ func NewVexEvent(name string, description string) * VexEvent {
}
event.actions["wait"] = EventWait(event)
event.handlers["abort"] = EventAbort(event)
event.actions["start"] = func() (string, error) {
log.Logf("vex", "STARTING_VEX_TOURNAMENT %s", event.Name())
go func() {
}()
return "wait", nil
}
@ -169,6 +167,7 @@ func NewMatch(alliance0 * Alliance, alliance1 * Alliance, arena Arena) * Match {
}
match.actions["wait"] = EventWait(match)
match.handlers["abort"] = EventAbort(match)
match.actions["start"] = func() (string, error) {
log.Logf("vex", "STARTING_MATCH %s", match.Name())

@ -144,7 +144,9 @@ func TestNewMatch(t *testing.T) {
time.Sleep(time.Second * 20)
if r.Owner() != nil {
pprof.Lookup("goroutine").WriteTo(os.Stdout, 1)
AbortEvent(root_event)
abort_signal := NewSignal(root_event, "abort")
abort_signal.description = root_event.ID()
SendUpdate(root_event, abort_signal)
}
}()
@ -164,6 +166,9 @@ func TestNewMatch(t *testing.T) {
SendUpdate(arena, driver_signal)
(*graph_tester)(t).WaitForValue(arena_c, "driver_running", match, 1*time.Second, "no driver_running")
(*graph_tester)(t).WaitForValue(arena_c, "driver_done", match, 6*time.Second, "no driver_done")
cancel_signal := NewSignal(nil, "cancel")
cancel_signal.description = root_event.ID()
SendUpdate(root_event, cancel_signal)
}(arena_c)
err := event_manager.Run()