Renamed signals and added ErrorSignal

gql_cataclysm
noah metz 2023-07-30 23:42:47 -06:00
parent 1af94520a8
commit fde2f3ddd4
7 changed files with 89 additions and 76 deletions

@ -106,7 +106,7 @@ func (ext *ECDHExt) Field(name string) interface{} {
func (ext *ECDHExt) HandleECDHSignal(ctx *Context, source NodeID, node *Node, signal ECDHSignal) {
ser, _ := signal.Serialize()
ctx.Log.Logf("ecdh", "ECDH_SIGNAL: %s->%s - %s", source, node.ID, ser)
switch signal.State {
switch signal.Str {
case "req":
state, exists := ext.ECDHStates[source]
if exists == false {
@ -125,7 +125,7 @@ func (ext *ECDHExt) HandleECDHSignal(ctx *Context, source NodeID, node *Node, si
case "resp":
state, exists := ext.ECDHStates[source]
if exists == false || state.ECKey == nil {
ctx.Send(node.ID, source, StateSignal{NewDirectSignal(ECDHStateSignalType), "no_req"})
ctx.Send(node.ID, source, StringSignal{NewDirectSignal(ECDHStateSignalType), "no_req"})
} else {
err := VerifyECDHSignal(time.Now(), signal, DEFAULT_ECDH_WINDOW)
if err == nil {
@ -139,11 +139,11 @@ func (ext *ECDHExt) HandleECDHSignal(ctx *Context, source NodeID, node *Node, si
}
}
default:
ctx.Log.Logf("ecdh", "unknown echd state %s", signal.State)
ctx.Log.Logf("ecdh", "unknown echd state %s", signal.Str)
}
}
func (ext *ECDHExt) HandleStateSignal(ctx *Context, source NodeID, node *Node, signal StateSignal) {
func (ext *ECDHExt) HandleStateSignal(ctx *Context, source NodeID, node *Node, signal StringSignal) {
ser, _ := signal.Serialize()
ctx.Log.Logf("ecdh", "ECHD_STATE: %s->%s - %s", source, node.ID, ser)
}
@ -151,13 +151,13 @@ func (ext *ECDHExt) HandleStateSignal(ctx *Context, source NodeID, node *Node, s
func (ext *ECDHExt) HandleECDHProxySignal(ctx *Context, source NodeID, node *Node, signal ECDHProxySignal) {
state, exists := ext.ECDHStates[source]
if exists == false {
ctx.Send(node.ID, source, StateSignal{NewDirectSignal(ECDHStateSignalType), "no_req"})
ctx.Send(node.ID, source, StringSignal{NewDirectSignal(ECDHStateSignalType), "no_req"})
} else if state.SharedSecret == nil {
ctx.Send(node.ID, source, StateSignal{NewDirectSignal(ECDHStateSignalType), "no_shared"})
ctx.Send(node.ID, source, StringSignal{NewDirectSignal(ECDHStateSignalType), "no_shared"})
} else {
unwrapped_signal, err := ParseECDHProxySignal(ctx, &signal, state.SharedSecret)
if err != nil {
ctx.Send(node.ID, source, StateSignal{NewDirectSignal(ECDHStateSignalType), err.Error()})
ctx.Send(node.ID, source, StringSignal{NewDirectSignal(ECDHStateSignalType), err.Error()})
} else {
//TODO: Figure out what I was trying to do here and fix it
ctx.Send(signal.Source, signal.Dest, unwrapped_signal)
@ -173,7 +173,7 @@ func (ext *ECDHExt) Process(ctx *Context, source NodeID, node *Node, signal Sign
ecdh_signal := signal.(ECDHProxySignal)
ext.HandleECDHProxySignal(ctx, source, node, ecdh_signal)
case ECDHStateSignalType:
ecdh_signal := signal.(StateSignal)
ecdh_signal := signal.(StringSignal)
ext.HandleStateSignal(ctx, source, node, ecdh_signal)
case ECDHSignalType:
ecdh_signal := signal.(ECDHSignal)

@ -996,20 +996,20 @@ func (ext *GQLExt) Process(ctx *Context, source NodeID, node *Node, signal Signa
ctx.Log.Logf("gql", "Received read result that wasn't expected - %+v", sig)
}
} else if signal.Type() == GQLStateSignalType {
sig := signal.(StateSignal)
switch sig.State {
sig := signal.(StringSignal)
switch sig.Str {
case "start_server":
err := ext.StartGQLServer(ctx, node)
if err == nil {
ctx.Send(node.ID, source, StateSignal{NewDirectSignal(GQLStateSignalType), "server_started"})
ctx.Send(node.ID, source, StringSignal{NewDirectSignal(GQLStateSignalType), "server_started"})
}
case "stop_server":
err := ext.StopGQLServer()
if err == nil {
ctx.Send(node.ID, source, StateSignal{NewDirectSignal(GQLStateSignalType), "server_stopped"})
ctx.Send(node.ID, source, StringSignal{NewDirectSignal(GQLStateSignalType), "server_stopped"})
}
default:
ctx.Log.Logf("gql", "unknown gql state %s", sig.State)
ctx.Log.Logf("gql", "unknown gql state %s", sig.Str)
}
}
}

@ -25,15 +25,15 @@ func TestGQL(t *testing.T) {
listener_ext := NewListenerExt(10)
policy := NewAllNodesPolicy(Actions{MakeAction("+")})
gql := NewNode(ctx, nil, GQLNodeType, 10, []QueuedSignal{
QueuedSignal{uuid.New(), StateSignal{NewDirectSignal(GQLStateSignalType), "start_server"}, time.Now()},
QueuedSignal{uuid.New(), StringSignal{NewDirectSignal(GQLStateSignalType), "start_server"}, time.Now()},
}, NewLockableExt(), NewACLExt(policy), gql_ext, NewGroupExt(nil), listener_ext)
n1 := NewNode(ctx, nil, TestNodeType, 10, nil, NewLockableExt(), NewACLExt(policy))
err = LinkRequirement(ctx, gql.ID, n1.ID)
fatalErr(t, err)
_, err = WaitForSignal(ctx, listener_ext, 100*time.Millisecond, GQLStateSignalType, func(sig StateSignal) bool {
return sig.State == "server_started"
_, err = WaitForSignal(ctx, listener_ext, 100*time.Millisecond, GQLStateSignalType, func(sig StringSignal) bool {
return sig.Str == "server_started"
})
fatalErr(t, err)
@ -82,9 +82,9 @@ func TestGQL(t *testing.T) {
resp_2 := SendGQL(req_2)
ctx.Log.Logf("test", "RESP_2: %s", resp_2)
ctx.Send(n1.ID, gql.ID, StateSignal{NewDirectSignal(GQLStateSignalType), "stop_server"})
_, err = WaitForSignal(ctx, listener_ext, 100*time.Millisecond, GQLStateSignalType, func(sig StateSignal) bool {
return sig.State == "server_stopped"
ctx.Send(n1.ID, gql.ID, StringSignal{NewDirectSignal(GQLStateSignalType), "stop_server"})
_, err = WaitForSignal(ctx, listener_ext, 100*time.Millisecond, GQLStateSignalType, func(sig StringSignal) bool {
return sig.Str == "server_stopped"
})
}
@ -111,8 +111,8 @@ func TestGQLDB(t *testing.T) {
err = ctx.Send(gql.ID, gql.ID, StopSignal)
fatalErr(t, err)
_, err = WaitForSignal(ctx, listener_ext, 100*time.Millisecond, StatusSignalType, func(sig IDStateSignal) bool {
return sig.State == "stopped" && sig.ID == gql.ID
_, err = WaitForSignal(ctx, listener_ext, 100*time.Millisecond, StatusSignalType, func(sig IDStringSignal) bool {
return sig.Str == "stopped" && sig.ID == gql.ID
})
fatalErr(t, err)
@ -129,8 +129,8 @@ func TestGQLDB(t *testing.T) {
fatalErr(t, err)
err = ctx.Send(gql_loaded.ID, gql_loaded.ID, StopSignal)
fatalErr(t, err)
_, err = WaitForSignal(ctx, listener_ext, 100*time.Millisecond, StatusSignalType, func(sig IDStateSignal) bool {
return sig.State == "stopped" && sig.ID == gql_loaded.ID
_, err = WaitForSignal(ctx, listener_ext, 100*time.Millisecond, StatusSignalType, func(sig IDStringSignal) bool {
return sig.Str == "stopped" && sig.ID == gql_loaded.ID
})
fatalErr(t, err)

@ -162,9 +162,9 @@ func LinkRequirement(ctx *Context, dependency NodeID, requirement NodeID) error
}
// Handle a LockSignal and update the extensions owner/requirement states
func (ext *LockableExt) HandleLockSignal(ctx *Context, source NodeID, node *Node, signal StateSignal) {
func (ext *LockableExt) HandleLockSignal(ctx *Context, source NodeID, node *Node, signal StringSignal) {
ctx.Log.Logf("lockable", "LOCK_SIGNAL: %s->%s %+v", source, node.ID, signal)
state := signal.State
state := signal.Str
switch state {
case "unlock":
if ext.Owner == nil {
@ -317,9 +317,9 @@ func (ext *LockableExt) HandleLockSignal(ctx *Context, source NodeID, node *Node
}
}
func (ext *LockableExt) HandleLinkStartSignal(ctx *Context, source NodeID, node *Node, signal IDStateSignal) {
func (ext *LockableExt) HandleLinkStartSignal(ctx *Context, source NodeID, node *Node, signal IDStringSignal) {
ctx.Log.Logf("lockable", "LINK__START_SIGNAL: %s->%s %+v", source, node.ID, signal)
link_type := signal.State
link_type := signal.Str
target := signal.ID
switch link_type {
case "req":
@ -351,9 +351,9 @@ func (ext *LockableExt) HandleLinkStartSignal(ctx *Context, source NodeID, node
// Handle LinkSignal, updating the extensions requirements and dependencies as necessary
// TODO: Add unlink
func (ext *LockableExt) HandleLinkSignal(ctx *Context, source NodeID, node *Node, signal StateSignal) {
func (ext *LockableExt) HandleLinkSignal(ctx *Context, source NodeID, node *Node, signal StringSignal) {
ctx.Log.Logf("lockable", "LINK_SIGNAL: %s->%s %+v", source, node.ID, signal)
state := signal.State
state := signal.Str
switch state {
case "linked_as_dep":
state, exists := ext.Requirements[source]
@ -441,11 +441,11 @@ func (ext *LockableExt) Process(ctx *Context, source NodeID, node *Node, signal
case Direct:
switch signal.Type() {
case LinkSignalType:
ext.HandleLinkSignal(ctx, source, node, signal.(StateSignal))
ext.HandleLinkSignal(ctx, source, node, signal.(StringSignal))
case LockSignalType:
ext.HandleLockSignal(ctx, source, node, signal.(StateSignal))
ext.HandleLockSignal(ctx, source, node, signal.(StringSignal))
case LinkStartSignalType:
ext.HandleLinkStartSignal(ctx, source, node, signal.(IDStateSignal))
ext.HandleLinkStartSignal(ctx, source, node, signal.(IDStringSignal))
default:
}
default:

@ -40,21 +40,21 @@ func TestLink(t *testing.T) {
err := LinkRequirement(ctx, l1.ID, l2.ID)
fatalErr(t, err)
_, err = WaitForSignal(ctx, l1_listener, time.Millisecond*10, LinkStartSignalType, func(sig IDStateSignal) bool {
return sig.State == "linked_as_req"
_, err = WaitForSignal(ctx, l1_listener, time.Millisecond*10, LinkStartSignalType, func(sig IDStringSignal) bool {
return sig.Str == "linked_as_req"
})
fatalErr(t, err)
err = ctx.Send(l2.ID, l2.ID, NewStatusSignal("TEST", l2.ID))
fatalErr(t, err)
_, err = WaitForSignal(ctx, l1_listener, time.Millisecond*10, StatusSignalType, func(sig IDStateSignal) bool {
return sig.State == "TEST"
_, err = WaitForSignal(ctx, l1_listener, time.Millisecond*10, StatusSignalType, func(sig IDStringSignal) bool {
return sig.Str == "TEST"
})
fatalErr(t, err)
_, err = WaitForSignal(ctx, l2_listener, time.Millisecond*10, StatusSignalType, func(sig IDStateSignal) bool {
return sig.State == "TEST"
_, err = WaitForSignal(ctx, l2_listener, time.Millisecond*10, StatusSignalType, func(sig IDStringSignal) bool {
return sig.Str == "TEST"
})
fatalErr(t, err)
}
@ -91,8 +91,8 @@ func TestLink10K(t *testing.T) {
for range(lockables) {
_, err := WaitForSignal(ctx, l0_listener, time.Millisecond*10, LinkStartSignalType, func(sig IDStateSignal) bool {
return sig.State == "linked_as_req"
_, err := WaitForSignal(ctx, l0_listener, time.Millisecond*10, LinkStartSignalType, func(sig IDStringSignal) bool {
return sig.Str == "linked_as_req"
})
fatalErr(t, err)
}
@ -140,15 +140,15 @@ func TestLock(t *testing.T) {
err = LinkRequirement(ctx, l0.ID, l5.ID)
fatalErr(t, err)
linked_as_req := func(sig IDStateSignal) bool {
return sig.State == "linked_as_req"
linked_as_req := func(sig IDStringSignal) bool {
return sig.Str == "linked_as_req"
}
locked := func(sig StateSignal) bool {
return sig.State == "locked"
locked := func(sig StringSignal) bool {
return sig.Str == "locked"
}
_, err = WaitForSignal(ctx, l1_listener, time.Millisecond*10, LinkStartSignalType, linked_as_req)
_, err = WaitForSignal(ctx, l1_listener, time.Millisecond*10, LinkStartSignalType, linked_as_req)
fatalErr(t, err)
_, err = WaitForSignal(ctx, l1_listener, time.Millisecond*10, LinkStartSignalType, linked_as_req)
fatalErr(t, err)

@ -91,7 +91,7 @@ func TestECDH(t *testing.T) {
fatalErr(t, err)
_, err = WaitForSignal(ctx, n1_listener, 100*time.Millisecond, ECDHSignalType, func(sig ECDHSignal) bool {
return sig.State == "resp"
return sig.Str == "resp"
})
fatalErr(t, err)
time.Sleep(10*time.Millisecond)

@ -17,6 +17,7 @@ import (
type SignalDirection int
const (
StopSignalType SignalType = "STOP"
ErrorSignalType = "ERROR"
StatusSignalType = "STATUS"
LinkSignalType = "LINK"
LockSignalType = "LOCK"
@ -126,6 +127,18 @@ func NewDirectSignal(signal_type SignalType) BaseSignal {
var StopSignal = NewDownSignal(StopSignalType)
type ErrorSignal struct {
BaseSignal
Error error `json:"error"`
}
func NewErrorSignal(err error) ErrorSignal {
return ErrorSignal{
BaseSignal: NewDirectSignal(ErrorSignalType),
Error: err,
}
}
type IDSignal struct {
BaseSignal
ID NodeID `json:"id"`
@ -150,26 +163,26 @@ func NewIDSignal(signal_type SignalType, direction SignalDirection, id NodeID) I
}
}
type StateSignal struct {
type StringSignal struct {
BaseSignal
State string `json:"state"`
Str string `json:"state"`
}
func (signal StateSignal) Serialize() ([]byte, error) {
func (signal StringSignal) Serialize() ([]byte, error) {
return json.Marshal(&signal)
}
type IDStateSignal struct {
type IDStringSignal struct {
BaseSignal
ID NodeID `json:"id"`
State string `json:"state"`
Str string `json:"state"`
}
func (signal IDStateSignal) Serialize() ([]byte, error) {
func (signal IDStringSignal) Serialize() ([]byte, error) {
return json.Marshal(&signal)
}
func (signal IDStateSignal) String() string {
func (signal IDStringSignal) String() string {
ser, err := json.Marshal(signal)
if err != nil {
return "STATE_SER_ERR"
@ -177,42 +190,42 @@ func (signal IDStateSignal) String() string {
return string(ser)
}
func NewStatusSignal(status string, source NodeID) IDStateSignal {
return IDStateSignal{
func NewStatusSignal(status string, source NodeID) IDStringSignal {
return IDStringSignal{
BaseSignal: NewUpSignal(StatusSignalType),
ID: source,
State: status,
Str: status,
}
}
func NewLinkSignal(state string) StateSignal {
return StateSignal{
func NewLinkSignal(state string) StringSignal {
return StringSignal{
BaseSignal: NewDirectSignal(LinkSignalType),
State: state,
Str: state,
}
}
func NewIDStateSignal(signal_type SignalType, direction SignalDirection, state string, id NodeID) IDStateSignal {
return IDStateSignal{
func NewIDStringSignal(signal_type SignalType, direction SignalDirection, state string, id NodeID) IDStringSignal {
return IDStringSignal{
BaseSignal: NewBaseSignal(signal_type, direction),
ID: id,
State: state,
Str: state,
}
}
func NewLinkStartSignal(link_type string, target NodeID) IDStateSignal {
return NewIDStateSignal(LinkStartSignalType, Direct, link_type, target)
func NewLinkStartSignal(link_type string, target NodeID) IDStringSignal {
return NewIDStringSignal(LinkStartSignalType, Direct, link_type, target)
}
func NewLockSignal(state string) StateSignal {
return StateSignal{
func NewLockSignal(state string) StringSignal {
return StringSignal{
BaseSignal: NewDirectSignal(LockSignalType),
State: state,
Str: state,
}
}
func (signal StateSignal) Permission() Action {
return MakeAction(signal.Type(), signal.State)
func (signal StringSignal) Permission() Action {
return MakeAction(signal.Type(), signal.Str)
}
type ReadSignal struct {
@ -250,7 +263,7 @@ func NewReadResultSignal(req_id uuid.UUID, node_type NodeType, exts map[ExtType]
}
type ECDHSignal struct {
StateSignal
StringSignal
Time time.Time
ECDSA *ecdsa.PublicKey
ECDH *ecdh.PublicKey
@ -258,7 +271,7 @@ type ECDHSignal struct {
}
type ECDHSignalJSON struct {
StateSignal
StringSignal
Time time.Time `json:"time"`
ECDSA []byte `json:"ecdsa_pubkey"`
ECDH []byte `json:"ecdh_pubkey"`
@ -267,7 +280,7 @@ type ECDHSignalJSON struct {
func (signal *ECDHSignal) MarshalJSON() ([]byte, error) {
return json.Marshal(&ECDHSignalJSON{
StateSignal: signal.StateSignal,
StringSignal: signal.StringSignal,
Time: signal.Time,
ECDH: signal.ECDH.Bytes(),
ECDSA: signal.ECDH.Bytes(),
@ -310,9 +323,9 @@ func NewECDHReqSignal(ctx *Context, node *Node) (ECDHSignal, *ecdh.PrivateKey, e
}
return ECDHSignal{
StateSignal: StateSignal{
StringSignal: StringSignal{
BaseSignal: NewDirectSignal(ECDHSignalType),
State: "req",
Str: "req",
},
Time: now,
ECDSA: &node.Key.PublicKey,
@ -352,9 +365,9 @@ func NewECDHRespSignal(ctx *Context, node *Node, req ECDHSignal) (ECDHSignal, []
}
return ECDHSignal{
StateSignal: StateSignal{
StringSignal: StringSignal{
BaseSignal: NewDirectSignal(ECDHSignalType),
State: "resp",
Str: "resp",
},
Time: now,
ECDSA: &node.Key.PublicKey,