Added serialization of StatusSignal

gql_cataclysm
noah metz 2023-09-12 19:40:06 -06:00
parent dac0f1f273
commit de54c87e43
4 changed files with 78 additions and 43 deletions

@ -2,6 +2,7 @@ package graphvent
import ( import (
"crypto/ecdh" "crypto/ecdh"
"time"
"encoding/binary" "encoding/binary"
"errors" "errors"
"fmt" "fmt"
@ -10,6 +11,7 @@ import (
"runtime" "runtime"
"sync" "sync"
"github.com/google/uuid" "github.com/google/uuid"
"encoding"
badger "github.com/dgraph-io/badger/v3" badger "github.com/dgraph-io/badger/v3"
) )
@ -1021,16 +1023,56 @@ func NewContext(db * badger.DB, log Logger) (*Context, error) {
return nil, err return nil, err
} }
err = ctx.RegisterType(reflect.TypeOf(PendingACL{}), PendingACLType, SerializeStruct[PendingACL](ctx), DeserializeStruct[PendingACL](ctx)) err = ctx.RegisterType(reflect.TypeOf(Up), SignalDirectionType, SerializeUintN(1), DeserializeUintN[SignalDirection](1))
if err != nil { if err != nil {
return nil, err return nil, err
} }
err = ctx.RegisterType(reflect.TypeOf(PendingSignal{}), PendingSignalType, SerializeStruct[PendingSignal](ctx), DeserializeStruct[PendingSignal](ctx)) err = ctx.RegisterType(reflect.TypeOf(ReqState(0)), ReqStateType, SerializeUintN(1), DeserializeUintN[ReqState](1))
if err != nil { if err != nil {
return nil, err return nil, err
} }
err = ctx.RegisterType(reflect.TypeOf(time.Time{}), TimeType,
func(ctx *Context, ctx_type SerializedType, reflect_type reflect.Type, value *reflect.Value)(SerializedValue,error) {
var data []byte
type_stack := []SerializedType{ctx_type}
if value == nil {
data = nil
} else {
data = make([]byte, 8)
time_ser, err := value.Interface().(encoding.BinaryMarshaler).MarshalBinary()
if err != nil {
return SerializedValue{}, err
}
data = append(data, time_ser...)
binary.BigEndian.PutUint64(data[0:8], uint64(len(time_ser)))
}
return SerializedValue{
type_stack,
data,
}, nil
},func(ctx *Context, value SerializedValue)(reflect.Type,*reflect.Value,SerializedValue,error){
if value.Data == nil {
return reflect.TypeOf(time.Time{}), nil, value, nil
} else {
var ser_size_bytes []byte
ser_size_bytes, value, err = value.PopData(8)
if err != nil {
return nil, nil, value, err
}
ser_size := int(binary.BigEndian.Uint64(ser_size_bytes))
if ser_size > len(value.Data) {
return nil, nil, value, fmt.Errorf("ser_size %d is larger than remaining data %d", ser_size, len(value.Data))
}
data := value.Data[0:ser_size]
value.Data = value.Data[ser_size:]
time_value := reflect.New(reflect.TypeOf(time.Time{})).Elem()
time_value.Addr().Interface().(encoding.BinaryUnmarshaler).UnmarshalBinary(data)
return time_value.Type(), &time_value, value, nil
}
})
// TODO: Make registering interfaces cleaner // TODO: Make registering interfaces cleaner
var extension Extension = nil var extension Extension = nil
err = ctx.RegisterType(reflect.ValueOf(&extension).Type().Elem(), ExtSerialized, SerializeInterface, DeserializeInterface[Extension]()) err = ctx.RegisterType(reflect.ValueOf(&extension).Type().Elem(), ExtSerialized, SerializeInterface, DeserializeInterface[Extension]())
@ -1044,68 +1086,53 @@ func NewContext(db * badger.DB, log Logger) (*Context, error) {
return nil, err return nil, err
} }
err = ctx.RegisterType(reflect.TypeOf(ListenerExt{}), SerializedType(ListenerExtType), SerializeStruct[ListenerExt](ctx), DeserializeStruct[ListenerExt](ctx)) var signal Signal = nil
err = ctx.RegisterType(reflect.ValueOf(&signal).Type().Elem(), SignalSerialized, SerializeInterface, DeserializeInterface[Signal]())
if err != nil { if err != nil {
return nil, err return nil, err
} }
err = ctx.RegisterType(reflect.TypeOf(GroupExt{}), SerializedType(GroupExtType), SerializeStruct[GroupExt](ctx), DeserializeStruct[GroupExt](ctx)) err = ctx.RegisterType(reflect.TypeOf(PendingACL{}), PendingACLType, SerializeStruct[PendingACL](ctx), DeserializeStruct[PendingACL](ctx))
if err != nil { if err != nil {
return nil, err return nil, err
} }
err = ctx.RegisterType(reflect.TypeOf(GQLExt{}), SerializedType(GQLExtType), SerializeStruct[GQLExt](ctx), DeserializeStruct[GQLExt](ctx)) err = ctx.RegisterType(reflect.TypeOf(PendingSignal{}), PendingSignalType, SerializeStruct[PendingSignal](ctx), DeserializeStruct[PendingSignal](ctx))
if err != nil { if err != nil {
return nil, err return nil, err
} }
err = ctx.RegisterType(reflect.TypeOf(QueuedSignal{}), QueuedSignalType, SerializeStruct[QueuedSignal](ctx), DeserializeStruct[QueuedSignal](ctx)) err = ctx.RegisterType(reflect.TypeOf(ListenerExt{}), SerializedType(ListenerExtType), SerializeStruct[ListenerExt](ctx), DeserializeStruct[ListenerExt](ctx))
if err != nil { if err != nil {
return nil, err return nil, err
} }
err = ctx.RegisterType(reflect.TypeOf(AllNodesPolicy{}), SerializedType(AllNodesPolicyType), SerializeStruct[AllNodesPolicy](ctx), DeserializeStruct[AllNodesPolicy](ctx)) err = ctx.RegisterType(reflect.TypeOf(GroupExt{}), SerializedType(GroupExtType), SerializeStruct[GroupExt](ctx), DeserializeStruct[GroupExt](ctx))
if err != nil { if err != nil {
return nil, err return nil, err
} }
err = ctx.RegisterType(reflect.TypeOf(Node{}), NodeStructType, SerializeStruct[Node](ctx), DeserializeStruct[Node](ctx)) err = ctx.RegisterType(reflect.TypeOf(GQLExt{}), SerializedType(GQLExtType), SerializeStruct[GQLExt](ctx), DeserializeStruct[GQLExt](ctx))
if err != nil { if err != nil {
return nil, err return nil, err
} }
err = ctx.RegisterType(reflect.TypeOf(Up), SignalDirectionType, err = ctx.RegisterType(reflect.TypeOf(QueuedSignal{}), QueuedSignalType, SerializeStruct[QueuedSignal](ctx), DeserializeStruct[QueuedSignal](ctx))
func(ctx *Context, ctx_type SerializedType, t reflect.Type, value *reflect.Value) (SerializedValue, error) { if err != nil {
var data []byte = nil return nil, err
if value != nil {
val := value.Interface().(SignalDirection)
data = []byte{byte(val)}
} }
return SerializedValue{
[]SerializedType{ctx_type}, err = ctx.RegisterType(reflect.TypeOf(AllNodesPolicy{}), SerializedType(AllNodesPolicyType), SerializeStruct[AllNodesPolicy](ctx), DeserializeStruct[AllNodesPolicy](ctx))
data,
}, nil
}, func(ctx *Context, value SerializedValue)(reflect.Type, *reflect.Value, SerializedValue, error){
return reflect.TypeOf(Up), nil, SerializedValue{}, fmt.Errorf("unimplemented")
})
if err != nil { if err != nil {
return nil, err return nil, err
} }
err = ctx.RegisterType(reflect.TypeOf(ReqState(0)), ReqStateType, err = ctx.RegisterType(reflect.TypeOf(StatusSignal{}), SerializedType(StatusSignalType), SerializeStruct[StatusSignal](ctx), DeserializeStruct[StatusSignal](ctx))
func(ctx *Context, ctx_type SerializedType, t reflect.Type, value *reflect.Value) (SerializedValue, error) { if err != nil {
var data []byte = nil return nil, err
if value != nil {
val := value.Interface().(ReqState)
data = []byte{byte(val)}
} }
return SerializedValue{
[]SerializedType{ctx_type}, err = ctx.RegisterType(reflect.TypeOf(Node{}), NodeStructType, SerializeStruct[Node](ctx), DeserializeStruct[Node](ctx))
data,
}, nil
}, func(ctx *Context, value SerializedValue)(reflect.Type, *reflect.Value, SerializedValue, error){
return reflect.TypeOf(ReqState(0)), nil, SerializedValue{}, fmt.Errorf("unimplemented")
})
if err != nil { if err != nil {
return nil, err return nil, err
} }

@ -64,8 +64,12 @@ type Extension interface {
// A QueuedSignal is a Signal that has been Queued to trigger at a set time // A QueuedSignal is a Signal that has been Queued to trigger at a set time
type QueuedSignal struct { type QueuedSignal struct {
Signal Signal `gv:"signal"`
time.Time time.Time `gv:"time"`
}
func (q QueuedSignal) String() string {
return fmt.Sprintf("%+v@%s", reflect.TypeOf(q.Signal), q.Time)
} }
type PendingACL struct { type PendingACL struct {
@ -117,6 +121,8 @@ func (node *Node) PostDeserialize(ctx *Context) error {
node.MsgChan = make(chan *Message, node.BufferSize) node.MsgChan = make(chan *Message, node.BufferSize)
node.NextSignal, node.TimeoutChan = SoonestSignal(node.SignalQueue) node.NextSignal, node.TimeoutChan = SoonestSignal(node.SignalQueue)
ctx.Log.Logf("node", "signal_queue: %+v", node.SignalQueue)
ctx.Log.Logf("node", "next_signal: %+v - %+v", node.NextSignal, node.TimeoutChan)
return nil return nil
} }

@ -118,10 +118,12 @@ var (
PolicyTypeSerialized = NewSerializedType("POLICY_TYPE") PolicyTypeSerialized = NewSerializedType("POLICY_TYPE")
ExtSerialized = NewSerializedType("EXTENSION") ExtSerialized = NewSerializedType("EXTENSION")
PolicySerialized = NewSerializedType("POLICY") PolicySerialized = NewSerializedType("POLICY")
SignalSerialized = NewSerializedType("SIGNAL")
NodeIDType = NewSerializedType("NODE_ID") NodeIDType = NewSerializedType("NODE_ID")
UUIDType = NewSerializedType("UUID") UUIDType = NewSerializedType("UUID")
PendingACLType = NewSerializedType("PENDING_ACL") PendingACLType = NewSerializedType("PENDING_ACL")
PendingSignalType = NewSerializedType("PENDING_SIGNAL") PendingSignalType = NewSerializedType("PENDING_SIGNAL")
TimeType = NewSerializedType("TIME")
) )
func SerializeArray(ctx *Context, ctx_type SerializedType, reflect_type reflect.Type, value *reflect.Value)(SerializedValue,error){ func SerializeArray(ctx *Context, ctx_type SerializedType, reflect_type reflect.Type, value *reflect.Value)(SerializedValue,error){

@ -9,7 +9,7 @@ import (
schema "github.com/mekkanized/graphvent/signal" schema "github.com/mekkanized/graphvent/signal"
) )
type SignalDirection int type SignalDirection uint8
const ( const (
Up SignalDirection = iota Up SignalDirection = iota
Down Down
@ -17,9 +17,9 @@ const (
) )
type SignalHeader struct { type SignalHeader struct {
Direction SignalDirection `gv:"0"` Direction SignalDirection `gv:"direction"`
ID uuid.UUID `gv:"1"` ID uuid.UUID `gv:"id"`
ReqID uuid.UUID `gv:"2"` ReqID uuid.UUID `gv:"req_id"`
} }
type Signal interface { type Signal interface {
@ -280,8 +280,8 @@ func NewACLTimeoutSignal(req_id uuid.UUID) *ACLTimeoutSignal {
type StatusSignal struct { type StatusSignal struct {
SignalHeader SignalHeader
Source NodeID Source NodeID `gv:"source"`
Status string Status string `gv:"status"`
} }
func (signal *StatusSignal) Header() *SignalHeader { func (signal *StatusSignal) Header() *SignalHeader {
return &signal.SignalHeader return &signal.SignalHeader