Reworked serialization to split type/value serilization/deserialization

gql_cataclysm
noah metz 2023-10-29 18:26:14 -06:00
parent 8a973c38b5
commit 84aee24a21
15 changed files with 1199 additions and 1475 deletions

@ -56,7 +56,7 @@ func testSend(t *testing.T, ctx *Context, signal Signal, source, destination *No
}
func TestACLBasic(t *testing.T) {
ctx := logTestContext(t, []string{"test", "acl", "policy"})
ctx := logTestContext(t, []string{"serialize_types", "deserialize_types", "test", "listener_debug", "group", "acl", "policy"})
listener, err := NewNode(ctx, nil, BaseNodeType, 100, nil, NewListenerExt(100))
fatalErr(t, err)

File diff suppressed because it is too large Load Diff

@ -0,0 +1,111 @@
package graphvent
import (
"time"
"fmt"
)
type EventExt struct {
Name string `"name"`
State string `"state"`
Parent *NodeID `"parent"`
}
func NewEventExt(parent *NodeID, name string) *EventExt {
return &EventExt{
Name: name,
State: "init",
Parent: parent,
}
}
type EventStateSignal struct {
SignalHeader
Source NodeID
State string
Time time.Time
}
func (signal EventStateSignal) Permission() Tree {
return Tree{
SerializedType(StatusType): nil,
}
}
func (signal EventStateSignal) String() string {
return fmt.Sprintf("EventStateSignal(%s, %s, %s, %+v)", signal.SignalHeader, signal.Source, signal.State, signal.Time)
}
func NewEventStateSignal(source NodeID, state string, t time.Time) *EventStateSignal {
return &EventStateSignal{
SignalHeader: NewSignalHeader(Up),
Source: source,
State: state,
Time: t,
}
}
type EventControlSignal struct {
SignalHeader
Command string
}
func NewEventControlSignal(command string) *EventControlSignal {
return &EventControlSignal{
NewSignalHeader(Direct),
command,
}
}
func (signal EventControlSignal) Permission() Tree {
return Tree{
SerializedType(EventControlSignalType): {
Hash("command", signal.Command): nil,
},
}
}
var transitions = map[string]struct{
from_state string
to_state string
}{
"start": {
"init",
"running",
},
"stop": {
"running",
"init",
},
"finish": {
"running",
"done",
},
}
func (ext *EventExt) Process(ctx *Context, node *Node, source NodeID, signal Signal) (Messages, Changes) {
var messages Messages = nil
var changes Changes = nil
if signal.Direction() == Up && ext.Parent != nil {
messages = messages.Add(ctx, *ext.Parent, node, nil, signal)
}
switch sig := signal.(type) {
case *EventControlSignal:
info, exists := transitions[sig.Command]
if exists == true {
if ext.State == info.from_state {
ext.State = info.to_state
messages = messages.Add(ctx, source, node, nil, NewSuccessSignal(sig.Id))
node.QueueSignal(time.Now(), NewEventStateSignal(node.ID, ext.State, time.Now()))
} else {
messages = messages.Add(ctx, source, node, nil, NewErrorSignal(sig.Id, "bad_state"))
}
} else {
messages = messages.Add(ctx, source, node, nil, NewErrorSignal(sig.Id, "bad_command"))
}
}
return messages, changes
}

@ -0,0 +1,13 @@
package graphvent
import (
"testing"
)
func TestEvent(t *testing.T) {
ctx := logTestContext(t, []string{"event", "listener"})
event_listener := NewListenerExt(100)
_, err := NewNode(ctx, nil, BaseNodeType, 100, nil, NewEventExt(nil, "Test Event"), event_listener)
fatalErr(t, err)
}

@ -31,5 +31,6 @@ require (
github.com/pkg/errors v0.9.1 // indirect
github.com/stretchr/testify v1.8.2 // indirect
go.opencensus.io v0.22.5 // indirect
golang.org/x/sys v0.6.0 // indirect
golang.org/x/exp v0.0.0-20231006140011-7918f672742d // indirect
golang.org/x/sys v0.13.0 // indirect
)

@ -109,6 +109,8 @@ golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACk
golang.org/x/crypto v0.0.0-20191011191535-87dc89f01550/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI=
golang.org/x/crypto v0.0.0-20200622213623-75b288015ac9/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto=
golang.org/x/exp v0.0.0-20190121172915-509febef88a4/go.mod h1:CJ0aWSM057203Lf6IL+f9T1iT9GByDxfZKAQTCR3kQA=
golang.org/x/exp v0.0.0-20231006140011-7918f672742d h1:jtJma62tbqLibJ5sFQz8bKtEM8rJBtfilJ2qTU199MI=
golang.org/x/exp v0.0.0-20231006140011-7918f672742d/go.mod h1:ldy0pHrwJyGW56pPQzzkH36rKxoZW1tw7ZJpeKx+hdo=
golang.org/x/lint v0.0.0-20181026193005-c67002cb31c3/go.mod h1:UVdnD1Gm6xHRNCYTkRU2/jEulfH38KcIWyp/GAMgvoE=
golang.org/x/lint v0.0.0-20190227174305-5b3e6a55c961/go.mod h1:wehouNa3lNwaWXcvxsM5YxQ5yQlVC4a0KAMCusXpPoU=
golang.org/x/lint v0.0.0-20190313153728-d0100b6bd8b3/go.mod h1:6SW0HCj/g11FgYtHlgUYUwCkIfeOF89ocIRzGO/8vkc=
@ -142,6 +144,8 @@ golang.org/x/sys v0.0.0-20210927094055-39ccf1dd6fa6/go.mod h1:oPkhp1MJrh7nUepCBc
golang.org/x/sys v0.0.0-20221010170243-090e33056c14/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.6.0 h1:MVltZSvRTcU2ljQOhs94SXPftV6DCNnZViHeQps87pQ=
golang.org/x/sys v0.6.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.13.0 h1:Af8nKPmuFypiUBjVoU9V20FiaFXOcuZI21p0ycVYYGE=
golang.org/x/sys v0.13.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ=
golang.org/x/text v0.3.3/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ=
golang.org/x/tools v0.0.0-20180917221912-90fa682c2a6e/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ=

@ -924,18 +924,19 @@ func (ctx *GQLExtContext) RegisterField(gql_type graphql.Type, gql_name string,
return nil, fmt.Errorf(string(val_ser.Data))
}
field_type, field_value, _, err := DeserializeValue(ctx.Context, val_ser)
field_type, _, err := DeserializeType(ctx.Context, val_ser.TypeStack)
if err != nil {
return nil, err
}
if field_value == nil {
return nil, fmt.Errorf("%s returned a nil value of %+v type", gv_tag, field_type)
field_value, _, err := DeserializeValue(ctx.Context, field_type, val_ser.Data)
if err != nil {
return nil, err
}
ctx.Context.Log.Logf("gql", "Resolving %+v", field_value)
return resolve_fn(p, ctx, *field_value)
return resolve_fn(p, ctx, field_value)
}
ctx.Fields[gql_name] = Field{ext_type, gv_tag, &graphql.Field{
@ -1286,6 +1287,21 @@ func NewGQLExtContext() *GQLExtContext {
panic(err)
}
err = context.RegisterField(graphql.String, "EventName", EventExtType, "name", func(p graphql.ResolveParams, ctx *ResolveContext, val reflect.Value)(interface{}, error) {
name := val.String()
return name, nil
})
err = context.RegisterField(graphql.String, "State", EventExtType, "state", func(p graphql.ResolveParams, ctx *ResolveContext, val reflect.Value)(interface{}, error) {
state := val.String()
return state, nil
})
err = context.RegisterInterface("Event", "EventNode", []string{"Node"}, []string{"EventName", "State"}, map[string]SelfField{}, map[string]ListField{})
if err != nil {
panic(err)
}
sub_group_type := graphql.NewObject(graphql.ObjectConfig{
Name: "SubGroup",
Interfaces: nil,

@ -43,7 +43,7 @@ func TestGQLAuth(t *testing.T) {
}
func TestGQLServer(t *testing.T) {
ctx := logTestContext(t, []string{"test", "gqlws"})
ctx := logTestContext(t, []string{"test", "deserialize_types", "serialize_types", "gqlws"})
TestNodeType := NewNodeType("TEST")
err := ctx.RegisterNodeType(TestNodeType, []ExtType{LockableExtType})

@ -135,8 +135,15 @@ func (policy MemberOfPolicy) ContinueAllows(ctx *Context, current PendingACL, si
return Deny
}
_, sub_groups_if, _, err := DeserializeValue(ctx, sub_groups_ser)
sub_groups_type, _, err := DeserializeType(ctx, sub_groups_ser.TypeStack)
if err != nil {
ctx.Log.Logf("group", "Type deserialize error: %s", err)
return Deny
}
sub_groups_if, _, err := DeserializeValue(ctx, sub_groups_type, sub_groups_ser.Data)
if err != nil {
ctx.Log.Logf("group", "Value deserialize error: %s", err)
return Deny
}

@ -55,10 +55,16 @@ func TestGroupAdd(t *testing.T) {
sub_groups_serialized := read_response.Extensions[GroupExtType]["sub_groups"]
_, sub_groups_value, remaining, err := DeserializeValue(ctx, sub_groups_serialized)
sub_groups_type, remaining_types, err := DeserializeType(ctx, sub_groups_serialized.TypeStack)
fatalErr(t, err)
if len(remaining_types) > 0 {
t.Fatalf("Types remaining after deserializing subgroups: %d", len(remaining_types))
}
if len(remaining.Data) > 0 {
t.Fatalf("Data remaining after deserializing subgroups: %d", len(remaining.Data))
sub_groups_value, remaining, err := DeserializeValue(ctx, sub_groups_type, sub_groups_serialized.Data)
fatalErr(t, err)
if len(remaining) > 0 {
t.Fatalf("Data remaining after deserializing subgroups: %d", len(remaining_types))
}
sub_groups, ok := sub_groups_value.Interface().(map[string][]NodeID)

@ -353,7 +353,12 @@ func (policy RequirementOfPolicy) ContinueAllows(ctx *Context, current PendingAC
return Deny
}
_, reqs_if, _, err := DeserializeValue(ctx, reqs_ser)
reqs_type, _, err := DeserializeType(ctx, reqs_ser.TypeStack)
if err != nil {
return Deny
}
reqs_if, _, err := DeserializeValue(ctx, reqs_type, reqs_ser.Data)
if err != nil {
return Deny
}

@ -0,0 +1,114 @@
package graphvent
import (
"time"
"crypto/ed25519"
"crypto/rand"
"crypto"
)
type AuthInfo struct {
// The Node that issued the authorization
Identity ed25519.PublicKey
// Time the authorization was generated
Start time.Time
// Signature of Start + Principal with Identity private key
Signature []byte
}
type AuthorizationToken struct {
AuthInfo
// The private key generated by the client, encrypted with the servers public key
KeyEncrypted []byte
}
type ClientAuthorization struct {
AuthInfo
// The private key generated by the client
Key ed25519.PrivateKey
}
// Authorization structs can be passed in a message that originated from a different node than the sender
type Authorization struct {
AuthInfo
// The public key generated for this authorization
Key ed25519.PublicKey
}
type Message struct {
Dest NodeID
Source ed25519.PublicKey
Authorization *Authorization
Signal Signal
Signature []byte
}
type Messages []*Message
func (msgs Messages) Add(ctx *Context, dest NodeID, source *Node, authorization *ClientAuthorization, signal Signal) Messages {
msg, err := NewMessage(ctx, dest, source, authorization, signal)
if err != nil {
panic(err)
} else {
msgs = append(msgs, msg)
}
return msgs
}
func NewMessages(ctx *Context, dest NodeID, source *Node, authorization *ClientAuthorization, signals... Signal) Messages {
messages := Messages{}
for _, signal := range(signals) {
messages = messages.Add(ctx, dest, source, authorization, signal)
}
return messages
}
func NewMessage(ctx *Context, dest NodeID, source *Node, authorization *ClientAuthorization, signal Signal) (*Message, error) {
signal_ser, err := SerializeAny(ctx, signal)
if err != nil {
return nil, err
}
ser, err := signal_ser.MarshalBinary()
if err != nil {
return nil, err
}
dest_ser, err := dest.MarshalBinary()
if err != nil {
return nil, err
}
source_ser, err := source.ID.MarshalBinary()
if err != nil {
return nil, err
}
sig_data := append(dest_ser, source_ser...)
sig_data = append(sig_data, ser...)
var message_auth *Authorization = nil
if authorization != nil {
sig_data = append(sig_data, authorization.Signature...)
message_auth = &Authorization{
authorization.AuthInfo,
authorization.Key.Public().(ed25519.PublicKey),
}
}
sig, err := source.Key.Sign(rand.Reader, sig_data, crypto.Hash(0))
if err != nil {
return nil, err
}
return &Message{
Dest: dest,
Source: source.Key.Public().(ed25519.PublicKey),
Authorization: message_auth,
Signal: signal,
Signature: sig,
}, nil
}

@ -8,7 +8,6 @@ import (
badger "github.com/dgraph-io/badger/v3"
"fmt"
"sync/atomic"
"crypto"
"crypto/ed25519"
"crypto/sha512"
"crypto/rand"
@ -476,105 +475,6 @@ func nodeLoop(ctx *Context, node *Node) error {
return nil
}
type AuthInfo struct {
// The Node that issued the authorization
Identity ed25519.PublicKey
// Time the authorization was generated
Start time.Time
// Signature of Start + Principal with Identity private key
Signature []byte
}
type AuthorizationToken struct {
AuthInfo
// The private key generated by the client, encrypted with the servers public key
KeyEncrypted []byte
}
type ClientAuthorization struct {
AuthInfo
// The private key generated by the client
Key ed25519.PrivateKey
}
// Authorization structs can be passed in a message that originated from a different node than the sender
type Authorization struct {
AuthInfo
// The public key generated for this authorization
Key ed25519.PublicKey
}
type Message struct {
Dest NodeID
Source ed25519.PublicKey
Authorization *Authorization
Signal Signal
Signature []byte
}
type Messages []*Message
func (msgs Messages) Add(ctx *Context, dest NodeID, source *Node, authorization *ClientAuthorization, signal Signal) Messages {
msg, err := NewMessage(ctx, dest, source, authorization, signal)
if err != nil {
panic(err)
} else {
msgs = append(msgs, msg)
}
return msgs
}
func NewMessage(ctx *Context, dest NodeID, source *Node, authorization *ClientAuthorization, signal Signal) (*Message, error) {
signal_ser, err := SerializeAny(ctx, signal)
if err != nil {
return nil, err
}
ser, err := signal_ser.MarshalBinary()
if err != nil {
return nil, err
}
dest_ser, err := dest.MarshalBinary()
if err != nil {
return nil, err
}
source_ser, err := source.ID.MarshalBinary()
if err != nil {
return nil, err
}
sig_data := append(dest_ser, source_ser...)
sig_data = append(sig_data, ser...)
var message_auth *Authorization = nil
if authorization != nil {
sig_data = append(sig_data, authorization.Signature...)
message_auth = &Authorization{
authorization.AuthInfo,
authorization.Key.Public().(ed25519.PublicKey),
}
}
sig, err := source.Key.Sign(rand.Reader, sig_data, crypto.Hash(0))
if err != nil {
return nil, err
}
return &Message{
Dest: dest,
Source: source.Key.Public().(ed25519.PublicKey),
Authorization: message_auth,
Signal: signal,
Signature: sig,
}, nil
}
func (node *Node) Stop(ctx *Context) error {
if node.Active.Load() {
msg, err := NewMessage(ctx, node.ID, node, nil, NewStopSignal())
@ -805,16 +705,18 @@ func LoadNode(ctx * Context, id NodeID) (*Node, error) {
} else if len(remaining) != 0 {
return nil, fmt.Errorf("%d bytes left after parsing node from DB", len(remaining))
}
_, node_val, remaining_data, err := DeserializeValue(ctx, value)
node_type, remaining_types, err := DeserializeType(ctx, value.TypeStack)
if err != nil {
return nil, err
} else if len(remaining_types) != 0 {
return nil, fmt.Errorf("%d entries left in typestack after deserializing *Node", len(remaining_types))
}
if len(remaining_data.TypeStack) != 0 {
return nil, fmt.Errorf("%d entries left in typestack after deserializing *Node", len(remaining_data.TypeStack))
}
if len(remaining_data.Data) != 0 {
return nil, fmt.Errorf("%d bytes left after desrializing *Node", len(remaining_data.Data))
node_val, remaining_data, err := DeserializeValue(ctx, node_type, value.Data)
if err != nil {
return nil, err
} else if len(remaining_data) != 0 {
return nil, fmt.Errorf("%d bytes left after desrializing *Node", len(remaining_data))
}
node, ok := node_val.Interface().(*Node)

File diff suppressed because it is too large Load Diff

@ -8,14 +8,26 @@ import (
)
func TestSerializeTest(t *testing.T) {
ctx := logTestContext(t, []string{"test", "serialize"})
ctx := logTestContext(t, []string{"test", "serialize", "deserialize_types"})
testSerialize(t, ctx, map[string][]NodeID{"test_group": {RandID(), RandID(), RandID()}})
testSerialize(t, ctx, map[NodeID]ReqInfo{
RandID(): {},
RandID(): {},
RandID(): {},
})
}
func TestSerializeBasic(t *testing.T) {
ctx := logTestContext(t, []string{"test"})
testSerializeComparable[string](t, ctx, "test")
ctx := logTestContext(t, []string{"test", "serialize"})
testSerializeComparable[bool](t, ctx, true)
type bool_wrapped bool
err := ctx.RegisterType(reflect.TypeOf(bool_wrapped(true)), NewSerializedType("BOOL_WRAPPED"), nil, nil, nil, DeserializeBool[bool_wrapped])
fatalErr(t, err)
testSerializeComparable[bool_wrapped](t, ctx, true)
testSerializeSlice[[]bool](t, ctx, []bool{false, false, true, false})
testSerializeComparable[string](t, ctx, "test")
testSerializeComparable[float32](t, ctx, 0.05)
testSerializeComparable[float64](t, ctx, 0.05)
testSerializeComparable[uint](t, ctx, uint(1234))
@ -36,7 +48,19 @@ func TestSerializeBasic(t *testing.T) {
testSerializeSliceSlice[[][]string](t, ctx, [][]string{{"123", "456", "789", "101112"}, {"3253", "2341", "735", "212"}, {"123", "51"}, nil})
testSerialize(t, ctx, map[int8]map[*int8]string{})
testSerialize(t, ctx, map[int8]time.Time{
1: time.Now(),
3: time.Now().Add(time.Second),
0: time.Now().Add(time.Second*2),
4: time.Now().Add(time.Second*3),
})
testSerialize(t, ctx, Tree{
NodeTypeSerialized: nil,
SerializedTypeSerialized: Tree{
NodeTypeSerialized: Tree{},
},
})
var i interface{} = nil
testSerialize(t, ctx, i)
@ -61,7 +85,10 @@ func TestSerializeBasic(t *testing.T) {
}
test_struct_type := reflect.TypeOf(test_struct{})
err := ctx.RegisterType(test_struct_type, NewSerializedType("TEST_STRUCT"), SerializeStruct(ctx, test_struct_type), DeserializeStruct(ctx, test_struct_type))
test_struct_info, err := GetStructInfo(ctx, test_struct_type)
fatalErr(t, err)
err = ctx.RegisterType(test_struct_type, NewSerializedType("TEST_STRUCT"), nil, SerializeStruct(test_struct_info), nil, DeserializeStruct(test_struct_info))
fatalErr(t, err)
testSerialize(t, ctx, test_struct{
@ -70,7 +97,6 @@ func TestSerializeBasic(t *testing.T) {
})
testSerialize(t, ctx, Tree{
ErrorType: nil,
MapType: nil,
StringType: nil,
})
@ -89,9 +115,10 @@ func TestSerializeBasic(t *testing.T) {
type test_slice []string
test_slice_type := reflect.TypeOf(test_slice{})
err = ctx.RegisterType(test_slice_type, NewSerializedType("TEST_SLICE"), SerializeSlice, DeserializeSlice[test_slice](ctx))
err = ctx.RegisterType(test_slice_type, NewSerializedType("TEST_SLICE"), SerializeTypeStub, SerializeSlice, DeserializeTypeStub[test_slice], DeserializeSlice)
fatalErr(t, err)
testSerialize[[]string](t, ctx, []string{"test_1", "test_2", "test_3"})
testSerialize[test_slice](t, ctx, test_slice{"test_1", "test_2", "test_3"})
testSerialize[Changes](t, ctx, Changes{"change_1", "change_2", "change_3"})
@ -112,7 +139,9 @@ func TestSerializeStructTags(t *testing.T) {
test_type := NewSerializedType("TEST_STRUCT")
test_struct_type := reflect.TypeOf(test{})
ctx.Log.Logf("test", "TEST_TYPE: %+v", test_type)
ctx.RegisterType(test_struct_type, test_type, SerializeStruct(ctx, test_struct_type), DeserializeStruct(ctx, test_struct_type))
test_struct_info, err := GetStructInfo(ctx, test_struct_type)
fatalErr(t, err)
ctx.RegisterType(test_struct_type, test_type, nil, SerializeStruct(test_struct_info), nil, DeserializeStruct(test_struct_info))
test_int := 10
test_string := "test"
@ -190,7 +219,9 @@ func testSerializeComparable[T comparable](t *testing.T, ctx *Context, val T) {
func testSerialize[T any](t *testing.T, ctx *Context, val T) T {
value := reflect.ValueOf(&val).Elem()
value_serialized, err := SerializeValue(ctx, value.Type(), &value)
type_stack, err := SerializeType(ctx, value.Type())
data, err := SerializeValue(ctx, value)
value_serialized := SerializedValue{type_stack, data}
fatalErr(t, err)
ctx.Log.Logf("test", "Serialized %+v to %+v", val, value_serialized)
@ -206,19 +237,16 @@ func testSerialize[T any](t *testing.T, ctx *Context, val T) T {
t.Fatal("Data remaining after deserializing value")
}
val_type, deserialized_value, remaining_deserialize, err := DeserializeValue(ctx, val_parsed)
val_type, remaining_types, err := DeserializeType(ctx, val_parsed.TypeStack)
deserialized_value, remaining_deserialize, err := DeserializeValue(ctx, val_type, val_parsed.Data)
fatalErr(t, err)
if len(remaining_deserialize.Data) != 0 {
if len(remaining_deserialize) != 0 {
t.Fatal("Data remaining after deserializing value")
} else if len(remaining_deserialize.TypeStack) != 0 {
} else if len(remaining_types) != 0 {
t.Fatal("TypeStack remaining after deserializing value")
} else if val_type != value.Type() {
t.Fatal(fmt.Sprintf("DeserializeValue returned wrong reflect.Type %+v - %+v", val_type, reflect.TypeOf(val)))
} else if deserialized_value == nil {
t.Fatal("DeserializeValue returned no []reflect.Value")
} else if deserialized_value == nil {
t.Fatal("DeserializeValue returned nil *reflect.Value")
} else if deserialized_value.CanConvert(val_type) == false {
t.Fatal("DeserializeValue returned value that can't convert to original value")
}