Added TestEventExt and TestEvent, added return of queued signals to WaitForResponse

gql_cataclysm
noah metz 2023-10-30 19:40:30 -06:00
parent 2081771135
commit a061d6850c
8 changed files with 108 additions and 41 deletions

@ -26,7 +26,7 @@ func testSendACL[S Signal](t *testing.T, ctx *Context, listener *Node, action Tr
fatalErr(t, err) fatalErr(t, err)
acl_signal := NewACLSignal(listener.ID, action) acl_signal := NewACLSignal(listener.ID, action)
response := testSend(t, ctx, acl_signal, listener, acl_node) response, _ := testSend(t, ctx, acl_signal, listener, acl_node)
checkSignal(t, response, check) checkSignal(t, response, check)
} }
@ -41,7 +41,7 @@ func testErrorSignal(t *testing.T, error_string string) func(*ErrorSignal){
func testSuccess(*SuccessSignal){} func testSuccess(*SuccessSignal){}
func testSend(t *testing.T, ctx *Context, signal Signal, source, destination *Node) ResponseSignal { func testSend(t *testing.T, ctx *Context, signal Signal, source, destination *Node) (ResponseSignal, []Signal) {
source_listener, err := GetExt[*ListenerExt](source, ListenerExtType) source_listener, err := GetExt[*ListenerExt](source, ListenerExtType)
fatalErr(t, err) fatalErr(t, err)
@ -49,10 +49,10 @@ func testSend(t *testing.T, ctx *Context, signal Signal, source, destination *No
messages = messages.Add(ctx, destination.ID, source, nil, signal) messages = messages.Add(ctx, destination.ID, source, nil, signal)
fatalErr(t, ctx.Send(messages)) fatalErr(t, ctx.Send(messages))
response, err := WaitForResponse(source_listener.Chan, time.Millisecond*10, signal.ID()) response, signals, err := WaitForResponse(source_listener.Chan, time.Millisecond*10, signal.ID())
fatalErr(t, err) fatalErr(t, err)
return response return response, signals
} }
func TestACLBasic(t *testing.T) { func TestACLBasic(t *testing.T) {
@ -85,11 +85,11 @@ func TestACLBasic(t *testing.T) {
}, testErrorSignal(t, "acl_denied")) }, testErrorSignal(t, "acl_denied"))
add_subgroup_signal := NewAddSubGroupSignal("test_group") add_subgroup_signal := NewAddSubGroupSignal("test_group")
add_subgroup_response := testSend(t, ctx, add_subgroup_signal, listener, group) add_subgroup_response, _ := testSend(t, ctx, add_subgroup_signal, listener, group)
checkSignal(t, add_subgroup_response, testSuccess) checkSignal(t, add_subgroup_response, testSuccess)
add_member_signal := NewAddMemberSignal("test_group", listener.ID) add_member_signal := NewAddMemberSignal("test_group", listener.ID)
add_member_response := testSend(t, ctx, add_member_signal, listener, group) add_member_response, _ := testSend(t, ctx, add_member_signal, listener, group)
checkSignal(t, add_member_response, testSuccess) checkSignal(t, add_member_response, testSuccess)
testSendACL(t, ctx, listener, nil, []Policy{ testSendACL(t, ctx, listener, nil, []Policy{

@ -65,6 +65,19 @@ func (signal EventControlSignal) Permission() Tree {
} }
} }
func (ext *EventExt) Process(ctx *Context, node *Node, source NodeID, signal Signal) (Messages, Changes) {
var messages Messages = nil
var changes Changes = nil
if signal.Direction() == Up && ext.Parent != nil {
messages = messages.Add(ctx, *ext.Parent, node, nil, signal)
}
return messages, changes
}
type TestEventExt struct {}
var transitions = map[string]struct{ var transitions = map[string]struct{
from_state string from_state string
to_state string to_state string
@ -83,18 +96,6 @@ var transitions = map[string]struct{
}, },
} }
func (ext *EventExt) Process(ctx *Context, node *Node, source NodeID, signal Signal) (Messages, Changes) {
var messages Messages = nil
var changes Changes = nil
if signal.Direction() == Up && ext.Parent != nil {
messages = messages.Add(ctx, *ext.Parent, node, nil, signal)
}
return messages, changes
}
type TestEventExt struct {}
func (ext *TestEventExt) Process(ctx *Context, node *Node, source NodeID, signal Signal) (Messages, Changes) { func (ext *TestEventExt) Process(ctx *Context, node *Node, source NodeID, signal Signal) (Messages, Changes) {
var messages Messages = nil var messages Messages = nil

@ -1,13 +1,77 @@
package graphvent package graphvent
import ( import (
"testing" "reflect"
"testing"
"time"
) )
func TestEvent(t *testing.T) { func TestEvent(t *testing.T) {
ctx := logTestContext(t, []string{"event", "listener"}) ctx := logTestContext(t, []string{"event", "listener"})
err := ctx.RegisterExtension(reflect.TypeOf(&TestEventExt{}), NewExtType("TEST_EVENT"), nil)
fatalErr(t, err)
event_listener := NewListenerExt(100) event_listener := NewListenerExt(100)
_, err := NewNode(ctx, nil, BaseNodeType, 100, nil, NewEventExt(nil, "Test Event"), event_listener) event, err := NewNode(ctx, nil, BaseNodeType, 100, nil, NewEventExt(nil, "Test Event"), &TestEventExt{}, event_listener)
fatalErr(t, err) fatalErr(t, err)
response, signals := testSend(t, ctx, NewEventControlSignal("start"), event, event)
switch resp := response.(type) {
case *SuccessSignal:
case *ErrorSignal:
t.Fatalf("Error response %+v", resp.Error)
default:
t.Fatalf("Unexpected response %+v", resp)
}
var state_signal *EventStateSignal = nil
for _, signal := range(signals) {
event_state, is_event_state := signal.(*EventStateSignal)
if is_event_state == true && event_state.Source == event.ID && event_state.State == "running" {
state_signal = event_state
break
}
}
if state_signal == nil {
state_signal, err = WaitForSignal(event_listener.Chan, 10*time.Millisecond, func(sig *EventStateSignal) bool {
return sig.Source == event.ID && sig.State == "running"
})
fatalErr(t, err)
}
response, signals = testSend(t, ctx, NewEventControlSignal("finish"), event, event)
switch resp := response.(type) {
case *SuccessSignal:
case *ErrorSignal:
t.Fatalf("Error response %+v", resp.Error)
default:
t.Fatalf("Unexpected response %+v", resp)
}
state_signal = nil
for _, signal := range(signals) {
event_state, is_event_state := signal.(*EventStateSignal)
if is_event_state == true && event_state.Source == event.ID && event_state.State == "done" {
state_signal = event_state
break
}
}
if state_signal == nil {
state_signal, err = WaitForSignal(event_listener.Chan, 10*time.Millisecond, func(sig *EventStateSignal) bool {
return sig.Source == event.ID && sig.State == "done"
})
fatalErr(t, err)
}
response, signals = testSend(t, ctx, NewEventControlSignal("start"), event, event)
switch resp := response.(type) {
case *SuccessSignal:
t.Fatalf("Success response starting finished TestEventExt")
case *ErrorSignal:
default:
t.Fatalf("Unexpected response %+v", resp)
}
} }

@ -129,7 +129,7 @@ func ResolveNodes(ctx *ResolveContext, p graphql.ResolveParams, ids []NodeID) ([
errors := "" errors := ""
for sig_id, response_chan := range(resp_channels) { for sig_id, response_chan := range(resp_channels) {
// Wait for the response, returning an error on timeout // Wait for the response, returning an error on timeout
response, err := WaitForResponse(response_chan, time.Millisecond*100, sig_id) response, _, err := WaitForResponse(response_chan, time.Millisecond*100, sig_id)
if err != nil { if err != nil {
return nil, err return nil, err
} }

@ -143,7 +143,7 @@ func (ext *GQLExtContext) AddSignalMutation(name string, send_id_key string, sig
return nil, err return nil, err
} }
response, err := WaitForResponse(response_chan, 100*time.Millisecond, signal.ID()) response, _, err := WaitForResponse(response_chan, 100*time.Millisecond, signal.ID())
if err != nil { if err != nil {
ctx.Ext.FreeResponseChannel(signal.ID()) ctx.Ext.FreeResponseChannel(signal.ID())
return nil, err return nil, err

@ -17,7 +17,7 @@ func TestGroupAdd(t *testing.T) {
messages = messages.Add(ctx, group.ID, group, nil, add_subgroup_signal) messages = messages.Add(ctx, group.ID, group, nil, add_subgroup_signal)
fatalErr(t, ctx.Send(messages)) fatalErr(t, ctx.Send(messages))
resp_1, err := WaitForResponse(group_listener.Chan, 10*time.Millisecond, add_subgroup_signal.Id) resp_1, _, err := WaitForResponse(group_listener.Chan, 10*time.Millisecond, add_subgroup_signal.Id)
fatalErr(t, err) fatalErr(t, err)
error_1, is_error := resp_1.(*ErrorSignal) error_1, is_error := resp_1.(*ErrorSignal)
@ -32,7 +32,7 @@ func TestGroupAdd(t *testing.T) {
messages = messages.Add(ctx, group.ID, group, nil, add_member_signal) messages = messages.Add(ctx, group.ID, group, nil, add_member_signal)
fatalErr(t, ctx.Send(messages)) fatalErr(t, ctx.Send(messages))
resp_2, err := WaitForResponse(group_listener.Chan, 10*time.Millisecond, add_member_signal.Id) resp_2, _, err := WaitForResponse(group_listener.Chan, 10*time.Millisecond, add_member_signal.Id)
fatalErr(t, err) fatalErr(t, err)
error_2, is_error := resp_2.(*ErrorSignal) error_2, is_error := resp_2.(*ErrorSignal)
@ -48,7 +48,7 @@ func TestGroupAdd(t *testing.T) {
messages = messages.Add(ctx, group.ID, group, nil, read_signal) messages = messages.Add(ctx, group.ID, group, nil, read_signal)
fatalErr(t, ctx.Send(messages)) fatalErr(t, ctx.Send(messages))
response, err := WaitForResponse(group_listener.Chan, 10*time.Millisecond, read_signal.Id) response, _, err := WaitForResponse(group_listener.Chan, 10*time.Millisecond, read_signal.Id)
fatalErr(t, err) fatalErr(t, err)
read_response := response.(*ReadResultSignal) read_response := response.(*ReadResultSignal)

@ -48,7 +48,7 @@ func TestLink(t *testing.T) {
err = ctx.Send(msgs) err = ctx.Send(msgs)
fatalErr(t, err) fatalErr(t, err)
_, err = WaitForResponse(l1_listener.Chan, time.Millisecond*10, link_signal.ID()) _, _, err = WaitForResponse(l1_listener.Chan, time.Millisecond*10, link_signal.ID())
fatalErr(t, err) fatalErr(t, err)
info, exists := l1_lockable.Requirements[l2.ID] info, exists := l1_lockable.Requirements[l2.ID]
@ -64,7 +64,7 @@ func TestLink(t *testing.T) {
err = ctx.Send(msgs) err = ctx.Send(msgs)
fatalErr(t, err) fatalErr(t, err)
_, err = WaitForResponse(l1_listener.Chan, time.Millisecond*10, unlink_signal.ID()) _, _, err = WaitForResponse(l1_listener.Chan, time.Millisecond*10, unlink_signal.ID())
fatalErr(t, err) fatalErr(t, err)
} }
@ -109,7 +109,7 @@ func Test10KLink(t *testing.T) {
lock_id, err := LockLockable(ctx, node) lock_id, err := LockLockable(ctx, node)
fatalErr(t, err) fatalErr(t, err)
_, err = WaitForResponse(listener.Chan, time.Second*60, lock_id) _, _, err = WaitForResponse(listener.Chan, time.Second*60, lock_id)
fatalErr(t, err) fatalErr(t, err)
ctx.Log.Logf("test", "LOCKED_1K") ctx.Log.Logf("test", "LOCKED_1K")
@ -147,22 +147,22 @@ func TestLock(t *testing.T) {
id_1, err := LockLockable(ctx, l0) id_1, err := LockLockable(ctx, l0)
ctx.Log.Logf("test", "ID_1: %s", id_1) ctx.Log.Logf("test", "ID_1: %s", id_1)
fatalErr(t, err) fatalErr(t, err)
_, err = WaitForResponse(l0_listener.Chan, time.Millisecond*10, id_1) _, _, err = WaitForResponse(l0_listener.Chan, time.Millisecond*10, id_1)
fatalErr(t, err) fatalErr(t, err)
id_2, err := LockLockable(ctx, l1) id_2, err := LockLockable(ctx, l1)
fatalErr(t, err) fatalErr(t, err)
_, err = WaitForResponse(l1_listener.Chan, time.Millisecond*10, id_2) _, _, err = WaitForResponse(l1_listener.Chan, time.Millisecond*10, id_2)
fatalErr(t, err) fatalErr(t, err)
id_3, err := UnlockLockable(ctx, l0) id_3, err := UnlockLockable(ctx, l0)
fatalErr(t, err) fatalErr(t, err)
_, err = WaitForResponse(l0_listener.Chan, time.Millisecond*10, id_3) _, _, err = WaitForResponse(l0_listener.Chan, time.Millisecond*10, id_3)
fatalErr(t, err) fatalErr(t, err)
id_4, err := LockLockable(ctx, l1) id_4, err := LockLockable(ctx, l1)
fatalErr(t, err) fatalErr(t, err)
_, err = WaitForResponse(l1_listener.Chan, time.Millisecond*10, id_4) _, _, err = WaitForResponse(l1_listener.Chan, time.Millisecond*10, id_4)
fatalErr(t, err) fatalErr(t, err)
} }

@ -71,7 +71,8 @@ type Signal interface {
Permission() Tree Permission() Tree
} }
func WaitForResponse(listener chan Signal, timeout time.Duration, req_id uuid.UUID) (ResponseSignal, error) { func WaitForResponse(listener chan Signal, timeout time.Duration, req_id uuid.UUID) (ResponseSignal, []Signal, error) {
signals := []Signal{}
var timeout_channel <- chan time.Time var timeout_channel <- chan time.Time
if timeout > 0 { if timeout > 0 {
timeout_channel = time.After(timeout) timeout_channel = time.After(timeout)
@ -81,23 +82,24 @@ func WaitForResponse(listener chan Signal, timeout time.Duration, req_id uuid.UU
select { select {
case signal := <- listener: case signal := <- listener:
if signal == nil { if signal == nil {
return nil, fmt.Errorf("LISTENER_CLOSED") return nil, signals, fmt.Errorf("LISTENER_CLOSED")
} }
resp_signal, ok := signal.(ResponseSignal) resp_signal, ok := signal.(ResponseSignal)
if ok == false { if ok == true && resp_signal.ResponseID() == req_id {
continue return resp_signal, signals, nil
} else {
signals = append(signals, signal)
} }
if resp_signal.ResponseID() == req_id {
return resp_signal, nil
}
case <-timeout_channel: case <-timeout_channel:
return nil, fmt.Errorf("LISTENER_TIMEOUT") return nil, signals, fmt.Errorf("LISTENER_TIMEOUT")
} }
} }
return nil, fmt.Errorf("UNREACHABLE") return nil, signals, fmt.Errorf("UNREACHABLE")
} }
//TODO: Add []Signal return as well for other signals
func WaitForSignal[S Signal](listener chan Signal, timeout time.Duration, check func(S)bool) (S, error) { func WaitForSignal[S Signal](listener chan Signal, timeout time.Duration, check func(S)bool) (S, error) {
var zero S var zero S
var timeout_channel <- chan time.Time var timeout_channel <- chan time.Time