diff --git a/context.go b/context.go index 5b56732..80dcd24 100644 --- a/context.go +++ b/context.go @@ -2,6 +2,7 @@ package graphvent import ( "crypto/ecdh" + "time" "encoding/binary" "errors" "fmt" @@ -10,6 +11,7 @@ import ( "runtime" "sync" "github.com/google/uuid" + "encoding" badger "github.com/dgraph-io/badger/v3" ) @@ -1021,16 +1023,56 @@ func NewContext(db * badger.DB, log Logger) (*Context, error) { return nil, err } - err = ctx.RegisterType(reflect.TypeOf(PendingACL{}), PendingACLType, SerializeStruct[PendingACL](ctx), DeserializeStruct[PendingACL](ctx)) + err = ctx.RegisterType(reflect.TypeOf(Up), SignalDirectionType, SerializeUintN(1), DeserializeUintN[SignalDirection](1)) if err != nil { return nil, err } - err = ctx.RegisterType(reflect.TypeOf(PendingSignal{}), PendingSignalType, SerializeStruct[PendingSignal](ctx), DeserializeStruct[PendingSignal](ctx)) + err = ctx.RegisterType(reflect.TypeOf(ReqState(0)), ReqStateType, SerializeUintN(1), DeserializeUintN[ReqState](1)) if err != nil { return nil, err } + err = ctx.RegisterType(reflect.TypeOf(time.Time{}), TimeType, + func(ctx *Context, ctx_type SerializedType, reflect_type reflect.Type, value *reflect.Value)(SerializedValue,error) { + var data []byte + type_stack := []SerializedType{ctx_type} + if value == nil { + data = nil + } else { + data = make([]byte, 8) + time_ser, err := value.Interface().(encoding.BinaryMarshaler).MarshalBinary() + if err != nil { + return SerializedValue{}, err + } + data = append(data, time_ser...) + binary.BigEndian.PutUint64(data[0:8], uint64(len(time_ser))) + } + return SerializedValue{ + type_stack, + data, + }, nil + },func(ctx *Context, value SerializedValue)(reflect.Type,*reflect.Value,SerializedValue,error){ + if value.Data == nil { + return reflect.TypeOf(time.Time{}), nil, value, nil + } else { + var ser_size_bytes []byte + ser_size_bytes, value, err = value.PopData(8) + if err != nil { + return nil, nil, value, err + } + ser_size := int(binary.BigEndian.Uint64(ser_size_bytes)) + if ser_size > len(value.Data) { + return nil, nil, value, fmt.Errorf("ser_size %d is larger than remaining data %d", ser_size, len(value.Data)) + } + data := value.Data[0:ser_size] + value.Data = value.Data[ser_size:] + time_value := reflect.New(reflect.TypeOf(time.Time{})).Elem() + time_value.Addr().Interface().(encoding.BinaryUnmarshaler).UnmarshalBinary(data) + return time_value.Type(), &time_value, value, nil + } + }) + // TODO: Make registering interfaces cleaner var extension Extension = nil err = ctx.RegisterType(reflect.ValueOf(&extension).Type().Elem(), ExtSerialized, SerializeInterface, DeserializeInterface[Extension]()) @@ -1044,6 +1086,22 @@ func NewContext(db * badger.DB, log Logger) (*Context, error) { return nil, err } + var signal Signal = nil + err = ctx.RegisterType(reflect.ValueOf(&signal).Type().Elem(), SignalSerialized, SerializeInterface, DeserializeInterface[Signal]()) + if err != nil { + return nil, err + } + + err = ctx.RegisterType(reflect.TypeOf(PendingACL{}), PendingACLType, SerializeStruct[PendingACL](ctx), DeserializeStruct[PendingACL](ctx)) + if err != nil { + return nil, err + } + + err = ctx.RegisterType(reflect.TypeOf(PendingSignal{}), PendingSignalType, SerializeStruct[PendingSignal](ctx), DeserializeStruct[PendingSignal](ctx)) + if err != nil { + return nil, err + } + err = ctx.RegisterType(reflect.TypeOf(ListenerExt{}), SerializedType(ListenerExtType), SerializeStruct[ListenerExt](ctx), DeserializeStruct[ListenerExt](ctx)) if err != nil { return nil, err @@ -1069,43 +1127,12 @@ func NewContext(db * badger.DB, log Logger) (*Context, error) { return nil, err } - err = ctx.RegisterType(reflect.TypeOf(Node{}), NodeStructType, SerializeStruct[Node](ctx), DeserializeStruct[Node](ctx)) + err = ctx.RegisterType(reflect.TypeOf(StatusSignal{}), SerializedType(StatusSignalType), SerializeStruct[StatusSignal](ctx), DeserializeStruct[StatusSignal](ctx)) if err != nil { return nil, err } - err = ctx.RegisterType(reflect.TypeOf(Up), SignalDirectionType, - func(ctx *Context, ctx_type SerializedType, t reflect.Type, value *reflect.Value) (SerializedValue, error) { - var data []byte = nil - if value != nil { - val := value.Interface().(SignalDirection) - data = []byte{byte(val)} - } - return SerializedValue{ - []SerializedType{ctx_type}, - data, - }, nil - }, func(ctx *Context, value SerializedValue)(reflect.Type, *reflect.Value, SerializedValue, error){ - return reflect.TypeOf(Up), nil, SerializedValue{}, fmt.Errorf("unimplemented") - }) - if err != nil { - return nil, err - } - - err = ctx.RegisterType(reflect.TypeOf(ReqState(0)), ReqStateType, - func(ctx *Context, ctx_type SerializedType, t reflect.Type, value *reflect.Value) (SerializedValue, error) { - var data []byte = nil - if value != nil { - val := value.Interface().(ReqState) - data = []byte{byte(val)} - } - return SerializedValue{ - []SerializedType{ctx_type}, - data, - }, nil - }, func(ctx *Context, value SerializedValue)(reflect.Type, *reflect.Value, SerializedValue, error){ - return reflect.TypeOf(ReqState(0)), nil, SerializedValue{}, fmt.Errorf("unimplemented") - }) + err = ctx.RegisterType(reflect.TypeOf(Node{}), NodeStructType, SerializeStruct[Node](ctx), DeserializeStruct[Node](ctx)) if err != nil { return nil, err } diff --git a/node.go b/node.go index 44381a0..e06fa5b 100644 --- a/node.go +++ b/node.go @@ -64,8 +64,12 @@ type Extension interface { // A QueuedSignal is a Signal that has been Queued to trigger at a set time type QueuedSignal struct { - Signal - time.Time + Signal `gv:"signal"` + time.Time `gv:"time"` +} + +func (q QueuedSignal) String() string { + return fmt.Sprintf("%+v@%s", reflect.TypeOf(q.Signal), q.Time) } type PendingACL struct { @@ -117,6 +121,8 @@ func (node *Node) PostDeserialize(ctx *Context) error { node.MsgChan = make(chan *Message, node.BufferSize) node.NextSignal, node.TimeoutChan = SoonestSignal(node.SignalQueue) + ctx.Log.Logf("node", "signal_queue: %+v", node.SignalQueue) + ctx.Log.Logf("node", "next_signal: %+v - %+v", node.NextSignal, node.TimeoutChan) return nil } diff --git a/serialize.go b/serialize.go index 123b670..b60887d 100644 --- a/serialize.go +++ b/serialize.go @@ -118,10 +118,12 @@ var ( PolicyTypeSerialized = NewSerializedType("POLICY_TYPE") ExtSerialized = NewSerializedType("EXTENSION") PolicySerialized = NewSerializedType("POLICY") + SignalSerialized = NewSerializedType("SIGNAL") NodeIDType = NewSerializedType("NODE_ID") UUIDType = NewSerializedType("UUID") PendingACLType = NewSerializedType("PENDING_ACL") PendingSignalType = NewSerializedType("PENDING_SIGNAL") + TimeType = NewSerializedType("TIME") ) func SerializeArray(ctx *Context, ctx_type SerializedType, reflect_type reflect.Type, value *reflect.Value)(SerializedValue,error){ diff --git a/signal.go b/signal.go index b858d85..dc58f24 100644 --- a/signal.go +++ b/signal.go @@ -9,7 +9,7 @@ import ( schema "github.com/mekkanized/graphvent/signal" ) -type SignalDirection int +type SignalDirection uint8 const ( Up SignalDirection = iota Down @@ -17,9 +17,9 @@ const ( ) type SignalHeader struct { - Direction SignalDirection `gv:"0"` - ID uuid.UUID `gv:"1"` - ReqID uuid.UUID `gv:"2"` + Direction SignalDirection `gv:"direction"` + ID uuid.UUID `gv:"id"` + ReqID uuid.UUID `gv:"req_id"` } type Signal interface { @@ -280,8 +280,8 @@ func NewACLTimeoutSignal(req_id uuid.UUID) *ACLTimeoutSignal { type StatusSignal struct { SignalHeader - Source NodeID - Status string + Source NodeID `gv:"source"` + Status string `gv:"status"` } func (signal *StatusSignal) Header() *SignalHeader { return &signal.SignalHeader