From a061d6850c7e6a2b9b2e7b0a765cc21972e3f738 Mon Sep 17 00:00:00 2001 From: Noah Metz Date: Mon, 30 Oct 2023 19:40:30 -0600 Subject: [PATCH] Added TestEventExt and TestEvent, added return of queued signals to WaitForResponse --- acl_test.go | 12 ++++----- event.go | 25 +++++++++--------- event_test.go | 68 ++++++++++++++++++++++++++++++++++++++++++++++-- gql_node.go | 2 +- gql_signal.go | 2 +- group_test.go | 6 ++--- lockable_test.go | 14 +++++----- signal.go | 20 +++++++------- 8 files changed, 108 insertions(+), 41 deletions(-) diff --git a/acl_test.go b/acl_test.go index 39001c4..5497306 100644 --- a/acl_test.go +++ b/acl_test.go @@ -26,7 +26,7 @@ func testSendACL[S Signal](t *testing.T, ctx *Context, listener *Node, action Tr fatalErr(t, err) acl_signal := NewACLSignal(listener.ID, action) - response := testSend(t, ctx, acl_signal, listener, acl_node) + response, _ := testSend(t, ctx, acl_signal, listener, acl_node) checkSignal(t, response, check) } @@ -41,7 +41,7 @@ func testErrorSignal(t *testing.T, error_string string) func(*ErrorSignal){ func testSuccess(*SuccessSignal){} -func testSend(t *testing.T, ctx *Context, signal Signal, source, destination *Node) ResponseSignal { +func testSend(t *testing.T, ctx *Context, signal Signal, source, destination *Node) (ResponseSignal, []Signal) { source_listener, err := GetExt[*ListenerExt](source, ListenerExtType) fatalErr(t, err) @@ -49,10 +49,10 @@ func testSend(t *testing.T, ctx *Context, signal Signal, source, destination *No messages = messages.Add(ctx, destination.ID, source, nil, signal) fatalErr(t, ctx.Send(messages)) - response, err := WaitForResponse(source_listener.Chan, time.Millisecond*10, signal.ID()) + response, signals, err := WaitForResponse(source_listener.Chan, time.Millisecond*10, signal.ID()) fatalErr(t, err) - return response + return response, signals } func TestACLBasic(t *testing.T) { @@ -85,11 +85,11 @@ func TestACLBasic(t *testing.T) { }, testErrorSignal(t, "acl_denied")) add_subgroup_signal := NewAddSubGroupSignal("test_group") - add_subgroup_response := testSend(t, ctx, add_subgroup_signal, listener, group) + add_subgroup_response, _ := testSend(t, ctx, add_subgroup_signal, listener, group) checkSignal(t, add_subgroup_response, testSuccess) add_member_signal := NewAddMemberSignal("test_group", listener.ID) - add_member_response := testSend(t, ctx, add_member_signal, listener, group) + add_member_response, _ := testSend(t, ctx, add_member_signal, listener, group) checkSignal(t, add_member_response, testSuccess) testSendACL(t, ctx, listener, nil, []Policy{ diff --git a/event.go b/event.go index 152bd54..31fc1f4 100644 --- a/event.go +++ b/event.go @@ -65,6 +65,19 @@ func (signal EventControlSignal) Permission() Tree { } } +func (ext *EventExt) Process(ctx *Context, node *Node, source NodeID, signal Signal) (Messages, Changes) { + var messages Messages = nil + var changes Changes = nil + + if signal.Direction() == Up && ext.Parent != nil { + messages = messages.Add(ctx, *ext.Parent, node, nil, signal) + } + + return messages, changes +} + +type TestEventExt struct {} + var transitions = map[string]struct{ from_state string to_state string @@ -83,18 +96,6 @@ var transitions = map[string]struct{ }, } -func (ext *EventExt) Process(ctx *Context, node *Node, source NodeID, signal Signal) (Messages, Changes) { - var messages Messages = nil - var changes Changes = nil - - if signal.Direction() == Up && ext.Parent != nil { - messages = messages.Add(ctx, *ext.Parent, node, nil, signal) - } - - return messages, changes -} - -type TestEventExt struct {} func (ext *TestEventExt) Process(ctx *Context, node *Node, source NodeID, signal Signal) (Messages, Changes) { var messages Messages = nil diff --git a/event_test.go b/event_test.go index f9ad1ad..e0256f4 100644 --- a/event_test.go +++ b/event_test.go @@ -1,13 +1,77 @@ package graphvent import ( - "testing" + "reflect" + "testing" + "time" ) func TestEvent(t *testing.T) { ctx := logTestContext(t, []string{"event", "listener"}) + err := ctx.RegisterExtension(reflect.TypeOf(&TestEventExt{}), NewExtType("TEST_EVENT"), nil) + fatalErr(t, err) + + event_listener := NewListenerExt(100) - _, err := NewNode(ctx, nil, BaseNodeType, 100, nil, NewEventExt(nil, "Test Event"), event_listener) + event, err := NewNode(ctx, nil, BaseNodeType, 100, nil, NewEventExt(nil, "Test Event"), &TestEventExt{}, event_listener) fatalErr(t, err) + response, signals := testSend(t, ctx, NewEventControlSignal("start"), event, event) + switch resp := response.(type) { + case *SuccessSignal: + case *ErrorSignal: + t.Fatalf("Error response %+v", resp.Error) + default: + t.Fatalf("Unexpected response %+v", resp) + } + + var state_signal *EventStateSignal = nil + for _, signal := range(signals) { + event_state, is_event_state := signal.(*EventStateSignal) + if is_event_state == true && event_state.Source == event.ID && event_state.State == "running" { + state_signal = event_state + break + } + } + + if state_signal == nil { + state_signal, err = WaitForSignal(event_listener.Chan, 10*time.Millisecond, func(sig *EventStateSignal) bool { + return sig.Source == event.ID && sig.State == "running" + }) + fatalErr(t, err) + } + + response, signals = testSend(t, ctx, NewEventControlSignal("finish"), event, event) + switch resp := response.(type) { + case *SuccessSignal: + case *ErrorSignal: + t.Fatalf("Error response %+v", resp.Error) + default: + t.Fatalf("Unexpected response %+v", resp) + } + + state_signal = nil + for _, signal := range(signals) { + event_state, is_event_state := signal.(*EventStateSignal) + if is_event_state == true && event_state.Source == event.ID && event_state.State == "done" { + state_signal = event_state + break + } + } + + if state_signal == nil { + state_signal, err = WaitForSignal(event_listener.Chan, 10*time.Millisecond, func(sig *EventStateSignal) bool { + return sig.Source == event.ID && sig.State == "done" + }) + fatalErr(t, err) + } + + response, signals = testSend(t, ctx, NewEventControlSignal("start"), event, event) + switch resp := response.(type) { + case *SuccessSignal: + t.Fatalf("Success response starting finished TestEventExt") + case *ErrorSignal: + default: + t.Fatalf("Unexpected response %+v", resp) + } } diff --git a/gql_node.go b/gql_node.go index 78b3b2f..5598434 100644 --- a/gql_node.go +++ b/gql_node.go @@ -129,7 +129,7 @@ func ResolveNodes(ctx *ResolveContext, p graphql.ResolveParams, ids []NodeID) ([ errors := "" for sig_id, response_chan := range(resp_channels) { // Wait for the response, returning an error on timeout - response, err := WaitForResponse(response_chan, time.Millisecond*100, sig_id) + response, _, err := WaitForResponse(response_chan, time.Millisecond*100, sig_id) if err != nil { return nil, err } diff --git a/gql_signal.go b/gql_signal.go index 8efa43b..4a9fda2 100644 --- a/gql_signal.go +++ b/gql_signal.go @@ -143,7 +143,7 @@ func (ext *GQLExtContext) AddSignalMutation(name string, send_id_key string, sig return nil, err } - response, err := WaitForResponse(response_chan, 100*time.Millisecond, signal.ID()) + response, _, err := WaitForResponse(response_chan, 100*time.Millisecond, signal.ID()) if err != nil { ctx.Ext.FreeResponseChannel(signal.ID()) return nil, err diff --git a/group_test.go b/group_test.go index 4f87b85..115674a 100644 --- a/group_test.go +++ b/group_test.go @@ -17,7 +17,7 @@ func TestGroupAdd(t *testing.T) { messages = messages.Add(ctx, group.ID, group, nil, add_subgroup_signal) fatalErr(t, ctx.Send(messages)) - resp_1, err := WaitForResponse(group_listener.Chan, 10*time.Millisecond, add_subgroup_signal.Id) + resp_1, _, err := WaitForResponse(group_listener.Chan, 10*time.Millisecond, add_subgroup_signal.Id) fatalErr(t, err) error_1, is_error := resp_1.(*ErrorSignal) @@ -32,7 +32,7 @@ func TestGroupAdd(t *testing.T) { messages = messages.Add(ctx, group.ID, group, nil, add_member_signal) fatalErr(t, ctx.Send(messages)) - resp_2, err := WaitForResponse(group_listener.Chan, 10*time.Millisecond, add_member_signal.Id) + resp_2, _, err := WaitForResponse(group_listener.Chan, 10*time.Millisecond, add_member_signal.Id) fatalErr(t, err) error_2, is_error := resp_2.(*ErrorSignal) @@ -48,7 +48,7 @@ func TestGroupAdd(t *testing.T) { messages = messages.Add(ctx, group.ID, group, nil, read_signal) fatalErr(t, ctx.Send(messages)) - response, err := WaitForResponse(group_listener.Chan, 10*time.Millisecond, read_signal.Id) + response, _, err := WaitForResponse(group_listener.Chan, 10*time.Millisecond, read_signal.Id) fatalErr(t, err) read_response := response.(*ReadResultSignal) diff --git a/lockable_test.go b/lockable_test.go index e925da8..8db916b 100644 --- a/lockable_test.go +++ b/lockable_test.go @@ -48,7 +48,7 @@ func TestLink(t *testing.T) { err = ctx.Send(msgs) fatalErr(t, err) - _, err = WaitForResponse(l1_listener.Chan, time.Millisecond*10, link_signal.ID()) + _, _, err = WaitForResponse(l1_listener.Chan, time.Millisecond*10, link_signal.ID()) fatalErr(t, err) info, exists := l1_lockable.Requirements[l2.ID] @@ -64,7 +64,7 @@ func TestLink(t *testing.T) { err = ctx.Send(msgs) fatalErr(t, err) - _, err = WaitForResponse(l1_listener.Chan, time.Millisecond*10, unlink_signal.ID()) + _, _, err = WaitForResponse(l1_listener.Chan, time.Millisecond*10, unlink_signal.ID()) fatalErr(t, err) } @@ -109,7 +109,7 @@ func Test10KLink(t *testing.T) { lock_id, err := LockLockable(ctx, node) fatalErr(t, err) - _, err = WaitForResponse(listener.Chan, time.Second*60, lock_id) + _, _, err = WaitForResponse(listener.Chan, time.Second*60, lock_id) fatalErr(t, err) ctx.Log.Logf("test", "LOCKED_1K") @@ -147,22 +147,22 @@ func TestLock(t *testing.T) { id_1, err := LockLockable(ctx, l0) ctx.Log.Logf("test", "ID_1: %s", id_1) fatalErr(t, err) - _, err = WaitForResponse(l0_listener.Chan, time.Millisecond*10, id_1) + _, _, err = WaitForResponse(l0_listener.Chan, time.Millisecond*10, id_1) fatalErr(t, err) id_2, err := LockLockable(ctx, l1) fatalErr(t, err) - _, err = WaitForResponse(l1_listener.Chan, time.Millisecond*10, id_2) + _, _, err = WaitForResponse(l1_listener.Chan, time.Millisecond*10, id_2) fatalErr(t, err) id_3, err := UnlockLockable(ctx, l0) fatalErr(t, err) - _, err = WaitForResponse(l0_listener.Chan, time.Millisecond*10, id_3) + _, _, err = WaitForResponse(l0_listener.Chan, time.Millisecond*10, id_3) fatalErr(t, err) id_4, err := LockLockable(ctx, l1) fatalErr(t, err) - _, err = WaitForResponse(l1_listener.Chan, time.Millisecond*10, id_4) + _, _, err = WaitForResponse(l1_listener.Chan, time.Millisecond*10, id_4) fatalErr(t, err) } diff --git a/signal.go b/signal.go index e6d1c51..f35a3e2 100644 --- a/signal.go +++ b/signal.go @@ -71,7 +71,8 @@ type Signal interface { Permission() Tree } -func WaitForResponse(listener chan Signal, timeout time.Duration, req_id uuid.UUID) (ResponseSignal, error) { +func WaitForResponse(listener chan Signal, timeout time.Duration, req_id uuid.UUID) (ResponseSignal, []Signal, error) { + signals := []Signal{} var timeout_channel <- chan time.Time if timeout > 0 { timeout_channel = time.After(timeout) @@ -81,23 +82,24 @@ func WaitForResponse(listener chan Signal, timeout time.Duration, req_id uuid.UU select { case signal := <- listener: if signal == nil { - return nil, fmt.Errorf("LISTENER_CLOSED") + return nil, signals, fmt.Errorf("LISTENER_CLOSED") } + resp_signal, ok := signal.(ResponseSignal) - if ok == false { - continue + if ok == true && resp_signal.ResponseID() == req_id { + return resp_signal, signals, nil + } else { + signals = append(signals, signal) } - if resp_signal.ResponseID() == req_id { - return resp_signal, nil - } case <-timeout_channel: - return nil, fmt.Errorf("LISTENER_TIMEOUT") + return nil, signals, fmt.Errorf("LISTENER_TIMEOUT") } } - return nil, fmt.Errorf("UNREACHABLE") + return nil, signals, fmt.Errorf("UNREACHABLE") } +//TODO: Add []Signal return as well for other signals func WaitForSignal[S Signal](listener chan Signal, timeout time.Duration, check func(S)bool) (S, error) { var zero S var timeout_channel <- chan time.Time