Removed extension type from Changes, and made serializable

gql_cataclysm
noah metz 2023-10-10 11:23:44 -06:00
parent 542c5c18af
commit f82bbabc66
8 changed files with 125 additions and 31 deletions

@ -852,9 +852,9 @@ func NewContext(db * badger.DB, log Logger) (*Context, error) {
} }
err = ctx.RegisterKind(reflect.Slice, SliceType, err = ctx.RegisterKind(reflect.Slice, SliceType,
func(ctx *Context, ctx_type SerializedType, reflect_type reflect.Type, value *reflect.Value)(SerializedValue, error){ func(ctx *Context, ctx_type SerializedType, reflect_type reflect.Type, value *reflect.Value) (SerializedValue, error) {
var data []byte
type_stack := []SerializedType{ctx_type} type_stack := []SerializedType{ctx_type}
var data []byte
if value == nil { if value == nil {
data = nil data = nil
} else if value.IsZero() { } else if value.IsZero() {
@ -864,6 +864,7 @@ func NewContext(db * badger.DB, log Logger) (*Context, error) {
} else { } else {
data := make([]byte, 8) data := make([]byte, 8)
binary.BigEndian.PutUint64(data, uint64(value.Len())) binary.BigEndian.PutUint64(data, uint64(value.Len()))
var element SerializedValue var element SerializedValue
var err error var err error
for i := 0; i < value.Len(); i += 1 { for i := 0; i < value.Len(); i += 1 {
@ -874,20 +875,24 @@ func NewContext(db * badger.DB, log Logger) (*Context, error) {
} }
data = append(data, element.Data...) data = append(data, element.Data...)
} }
return SerializedValue{ return SerializedValue{
append(type_stack, element.TypeStack...), append(type_stack, element.TypeStack...),
data, data,
}, nil }, nil
} }
element, err := SerializeValue(ctx, reflect_type.Elem(), nil) element, err := SerializeValue(ctx, reflect_type.Elem(), nil)
if err != nil { if err != nil {
return SerializedValue{}, err return SerializedValue{}, err
} }
return SerializedValue{ return SerializedValue{
append(type_stack, element.TypeStack...), append(type_stack, element.TypeStack...),
data, data,
}, nil }, nil
}, func(ctx *Context, value SerializedValue)(reflect.Type, *reflect.Value, SerializedValue, error){ },
func(ctx *Context, value SerializedValue)(reflect.Type, *reflect.Value, SerializedValue, error){
if value.Data == nil { if value.Data == nil {
elem_type, _, _, err := DeserializeValue(ctx, value) elem_type, _, _, err := DeserializeValue(ctx, value)
if err != nil { if err != nil {
@ -956,14 +961,6 @@ func NewContext(db * badger.DB, log Logger) (*Context, error) {
return nil, err return nil, err
} }
// TODO: move functions for string serialize/deserialize out of RegisterKind
/*
err = ctx.RegisterType(reflect.TypeOf(StringError("")), ErrorType, nil, nil)
if err != nil {
return nil, err
}
*/
err = ctx.RegisterType(reflect.TypeOf(Tree{}), TreeType, func(ctx *Context, ctx_type SerializedType, reflect_type reflect.Type, value *reflect.Value)(SerializedValue,error){ err = ctx.RegisterType(reflect.TypeOf(Tree{}), TreeType, func(ctx *Context, ctx_type SerializedType, reflect_type reflect.Type, value *reflect.Value)(SerializedValue,error){
var data []byte var data []byte
type_stack := []SerializedType{ctx_type} type_stack := []SerializedType{ctx_type}
@ -1058,6 +1055,11 @@ func NewContext(db * badger.DB, log Logger) (*Context, error) {
return nil, err return nil, err
} }
err = ctx.RegisterType(reflect.TypeOf(Changes{}), ChangesSerialized, SerializeSlice, DeserializeSlice[Changes](ctx))
if err != nil {
return nil, err
}
err = ctx.RegisterType(reflect.TypeOf(ExtType(0)), ExtTypeSerialized, SerializeUintN(8), DeserializeUintN[ExtType](8)) err = ctx.RegisterType(reflect.TypeOf(ExtType(0)), ExtTypeSerialized, SerializeUintN(8), DeserializeUintN[ExtType](8))
if err != nil { if err != nil {
return nil, err return nil, err

@ -24,7 +24,7 @@ import (
"crypto/ed25519" "crypto/ed25519"
"crypto/rand" "crypto/rand"
"crypto/x509" "crypto/x509"
"crypto/tls" //"crypto/tls"
"crypto/x509/pkix" "crypto/x509/pkix"
"math/big" "math/big"
"encoding/pem" "encoding/pem"
@ -1452,30 +1452,30 @@ func (ext *GQLExt) StartGQLServer(ctx *Context, node *Node) error {
return fmt.Errorf("Failed to start listener for server on %s", http_server.Addr) return fmt.Errorf("Failed to start listener for server on %s", http_server.Addr)
} }
cert, err := tls.X509KeyPair(ext.TLSCert, ext.TLSKey) //cert, err := tls.X509KeyPair(ext.TLSCert, ext.TLSKey)
if err != nil { //if err != nil {
return err // return err
} //}
config := tls.Config{ //config := tls.Config{
Certificates: []tls.Certificate{cert}, // Certificates: []tls.Certificate{cert},
NextProtos: []string{"http/1.1"}, // NextProtos: []string{"http/1.1"},
} //}
listener := tls.NewListener(l, &config) //listener := tls.NewListener(l, &config)
ext.http_done.Add(1) ext.http_done.Add(1)
go func(qql_ext *GQLExt) { go func(qql_ext *GQLExt) {
defer ext.http_done.Done() defer ext.http_done.Done()
err := http_server.Serve(listener) err := http_server.Serve(l)
if err != http.ErrServerClosed { if err != http.ErrServerClosed {
panic(fmt.Sprintf("Failed to start gql server: %s", err)) panic(fmt.Sprintf("Failed to start gql server: %s", err))
} }
}(ext) }(ext)
ext.tcp_listener = listener ext.tcp_listener = l
ext.http_server = http_server ext.http_server = http_server
return nil return nil
} }

@ -85,8 +85,8 @@ func TestGQLServer(t *testing.T) {
} }
client := &http.Client{Transport: skipVerifyTransport} client := &http.Client{Transport: skipVerifyTransport}
port := gql_ext.tcp_listener.Addr().(*net.TCPAddr).Port port := gql_ext.tcp_listener.Addr().(*net.TCPAddr).Port
url := fmt.Sprintf("https://localhost:%d/gql", port) url := fmt.Sprintf("http://localhost:%d/gql", port)
ws_url := fmt.Sprintf("wss://127.0.0.1:%d/gqlws", port) ws_url := fmt.Sprintf("ws://127.0.0.1:%d/gqlws", port)
req_1 := GQLPayload{ req_1 := GQLPayload{
Query: "query Node($id:String) { Node(id:$id) { ID, TypeHash } }", Query: "query Node($id:String) { Node(id:$id) { ID, TypeHash } }",
@ -187,7 +187,7 @@ func TestGQLServer(t *testing.T) {
ctx.Log.Logf("test", "SUB: %s", resp[:n]) ctx.Log.Logf("test", "SUB: %s", resp[:n])
msgs := Messages{} msgs := Messages{}
msgs = msgs.Add(ctx, gql.ID, gql.Key, NewStatusSignal(gql.ID, "test_status"), gql.ID) msgs = msgs.Add(ctx, gql.ID, gql.Key, NewStatusSignal(gql.ID, Changes{"test_status"}), gql.ID)
err = ctx.Send(msgs) err = ctx.Send(msgs)
fatalErr(t, err) fatalErr(t, err)

@ -520,7 +520,7 @@ func (node *Node) Stop(ctx *Context) error {
} }
func (node *Node) QueueChanges(ctx *Context, changes Changes) error { func (node *Node) QueueChanges(ctx *Context, changes Changes) error {
node.QueueSignal(time.Now(), NewStatusSignal(node.ID, fmt.Sprintf("%+v", changes))) node.QueueSignal(time.Now(), NewStatusSignal(node.ID, changes))
return nil return nil
} }

@ -5,6 +5,7 @@ import (
"time" "time"
"crypto/rand" "crypto/rand"
"crypto/ed25519" "crypto/ed25519"
"slices"
) )
func TestNodeDB(t *testing.T) { func TestNodeDB(t *testing.T) {
@ -18,7 +19,7 @@ func TestNodeDB(t *testing.T) {
fatalErr(t, err) fatalErr(t, err)
_, err = WaitForSignal(node_listener.Chan, 10*time.Millisecond, func(sig *StatusSignal) bool { _, err = WaitForSignal(node_listener.Chan, 10*time.Millisecond, func(sig *StatusSignal) bool {
return sig.Status == "started" && sig.Source == node.ID return slices.Contains(sig.Changes, "started") && sig.Source == node.ID
}) })
msgs := Messages{} msgs := Messages{}

@ -134,6 +134,7 @@ var (
NodeStructType = NewSerializedType("NODE_STRUCT") NodeStructType = NewSerializedType("NODE_STRUCT")
QueuedSignalType = NewSerializedType("QUEUED_SIGNAL") QueuedSignalType = NewSerializedType("QUEUED_SIGNAL")
NodeTypeSerialized = NewSerializedType("NODE_TYPE") NodeTypeSerialized = NewSerializedType("NODE_TYPE")
ChangesSerialized = NewSerializedType("CHANGES")
ExtTypeSerialized = NewSerializedType("EXT_TYPE") ExtTypeSerialized = NewSerializedType("EXT_TYPE")
PolicyTypeSerialized = NewSerializedType("POLICY_TYPE") PolicyTypeSerialized = NewSerializedType("POLICY_TYPE")
ExtSerialized = NewSerializedType("EXTENSION") ExtSerialized = NewSerializedType("EXTENSION")
@ -150,6 +151,87 @@ var (
SerializedTypeSerialized = NewSerializedType("SERIALIZED_TYPE") SerializedTypeSerialized = NewSerializedType("SERIALIZED_TYPE")
) )
func SerializeSlice(ctx *Context, ctx_type SerializedType, reflect_type reflect.Type, value *reflect.Value) (SerializedValue, error) {
type_stack := []SerializedType{ctx_type}
var data []byte
if value == nil {
data = nil
} else if value.IsZero() {
data = []byte{0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF}
} else if value.Len() == 0 {
data = []byte{0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00}
} else {
data := make([]byte, 8)
binary.BigEndian.PutUint64(data, uint64(value.Len()))
for i := 0; i < value.Len(); i += 1 {
val := value.Index(i)
element, err := SerializeValue(ctx, reflect_type.Elem(), &val)
if err != nil {
return SerializedValue{}, err
}
data = append(data, element.Data...)
}
return SerializedValue{
type_stack,
data,
}, nil
}
return SerializedValue{
type_stack,
data,
}, nil
}
func DeserializeSlice[T any](ctx *Context) func(ctx *Context, value SerializedValue) (reflect.Type, *reflect.Value, SerializedValue, error) {
var zero T
slice_type := reflect.TypeOf(zero)
zero_value, err := SerializeValue(ctx, slice_type.Elem(), nil)
if err != nil {
panic(err)
}
saved_type_stack := zero_value.TypeStack
return func(ctx *Context, value SerializedValue) (reflect.Type, *reflect.Value, SerializedValue, error) {
if value.Data == nil {
return slice_type, nil, value, nil
} else {
var err error
var slice_size_bytes []byte
slice_size_bytes, value, err = value.PopData(8)
if err != nil {
return nil, nil, value, err
}
slice_size := binary.BigEndian.Uint64(slice_size_bytes)
slice_value := reflect.New(slice_type).Elem()
if slice_size != 0xFFFFFFFFFFFFFFFF {
slice_unaddr := reflect.MakeSlice(slice_type, int(slice_size), int(slice_size))
slice_value.Set(slice_unaddr)
for i := uint64(0); i < slice_size; i += 1 {
var element_value *reflect.Value
var err error
tmp_value := SerializedValue{
saved_type_stack,
value.Data,
}
_, element_value, tmp_value, err = DeserializeValue(ctx, tmp_value)
if err != nil {
return nil, nil, value, err
}
value.Data = tmp_value.Data
slice_elem := slice_value.Index(int(i))
slice_elem.Set(*element_value)
}
}
return slice_type, &slice_value, value, nil
}
}
}
func SerializeArray(ctx *Context, ctx_type SerializedType, reflect_type reflect.Type, value *reflect.Value) (SerializedValue, error) { func SerializeArray(ctx *Context, ctx_type SerializedType, reflect_type reflect.Type, value *reflect.Value) (SerializedValue, error) {
type_stack := []SerializedType{ctx_type} type_stack := []SerializedType{ctx_type}
if value == nil { if value == nil {

@ -71,6 +71,15 @@ func TestSerializeBasic(t *testing.T) {
}, },
StringType: nil, StringType: nil,
}) })
type test_slice []string
test_slice_type := reflect.TypeOf(test_slice{})
err = ctx.RegisterType(test_slice_type, NewSerializedType("TEST_SLICE"), SerializeSlice, DeserializeSlice[test_slice](ctx))
fatalErr(t, err)
testSerialize[test_slice](t, ctx, test_slice{"test_1", "test_2", "test_3"})
testSerialize[Changes](t, ctx, Changes{"change_1", "change_2", "change_3"})
} }
type test struct { type test struct {

@ -252,18 +252,18 @@ func NewACLTimeoutSignal(req_id uuid.UUID) *ACLTimeoutSignal {
type StatusSignal struct { type StatusSignal struct {
SignalHeader SignalHeader
Source NodeID `gv:"source"` Source NodeID `gv:"source"`
Status string `gv:"status"` Changes Changes `gv:"changes"`
} }
func (signal StatusSignal) Permission() Tree { func (signal StatusSignal) Permission() Tree {
return Tree{ return Tree{
StatusType: nil, StatusType: nil,
} }
} }
func NewStatusSignal(source NodeID, status string) *StatusSignal { func NewStatusSignal(source NodeID, changes Changes) *StatusSignal {
return &StatusSignal{ return &StatusSignal{
NewSignalHeader(Up), NewSignalHeader(Up),
source, source,
status, changes,
} }
} }