// Source: graphvent/node.go (770 lines, 22 KiB, Go)

package graphvent
import (
	"crypto/ed25519"
	"crypto/rand"
	"crypto/sha512"
	"errors"
	"fmt"
	"reflect"
	"sync/atomic"
	"time"

	badger "github.com/dgraph-io/badger/v3"
	"github.com/google/uuid"
)
// ZeroUUID is the all-zero UUID; it backs the base NodeID used as a special value.
var ZeroUUID = uuid.UUID{}

// ZeroID is ZeroUUID expressed as a NodeID.
var ZeroID = NodeID(ZeroUUID)
// NodeID uniquely identifies a Node. It is a uuid.UUID under a distinct type.
type NodeID uuid.UUID

// MarshalBinary returns the binary encoding produced by the underlying uuid.UUID.
func (id NodeID) MarshalBinary() ([]byte, error) {
	raw := uuid.UUID(id)
	return raw.MarshalBinary()
}

// String returns the textual form produced by the underlying uuid.UUID.
func (id NodeID) String() string {
	raw := uuid.UUID(id)
	return raw.String()
}
2023-08-11 16:00:36 -06:00
func IDFromBytes(bytes []byte) (NodeID, error) {
id, err := uuid.FromBytes(bytes)
return NodeID(id), err
2023-07-25 21:43:15 -06:00
}
// Parse an ID from a string
func ParseID(str string) (NodeID, error) {
id_uuid, err := uuid.Parse(str)
if err != nil {
return NodeID{}, err
}
return NodeID(id_uuid), nil
2023-07-09 14:30:30 -06:00
}
2023-07-10 22:31:43 -06:00
// Generate a random NodeID
2023-07-09 14:30:30 -06:00
func RandID() NodeID {
return NodeID(uuid.New())
2023-07-09 14:30:30 -06:00
}
// Changes is a list of change descriptions, as returned by Extension.Process
// alongside the messages an extension wants sent.
type Changes []string

// Add appends detail to the list and returns the resulting list.
// Like the built-in append, the result must be used; the receiver itself is
// not updated in place.
func (changes Changes) Add(detail string) Changes {
	return append(changes, detail)
}
// Extensions are data attached to nodes that process signals
2023-07-25 21:43:15 -06:00
type Extension interface {
2023-10-07 23:00:07 -06:00
Process(*Context, *Node, NodeID, Signal) (Messages, Changes)
2023-07-20 23:19:10 -06:00
}
// A QueuedSignal is a Signal that has been Queued to trigger at a set time
type QueuedSignal struct {
2023-09-12 19:40:06 -06:00
Signal `gv:"signal"`
time.Time `gv:"time"`
}
// String formats the queued signal as "<dynamic signal type>@<trigger time>".
func (q QueuedSignal) String() string {
	sig_type := reflect.TypeOf(q.Signal)
	return fmt.Sprintf("%+v@%s", sig_type, q.Time)
}
2023-08-10 23:43:10 -06:00
type PendingACL struct {
Counter int
2023-10-13 00:32:24 -06:00
Responses []ResponseSignal
2023-08-10 23:43:10 -06:00
TimeoutID uuid.UUID
Action Tree
Principal NodeID
2023-10-13 00:32:24 -06:00
2023-08-10 23:43:10 -06:00
Signal Signal
Source NodeID
}
2023-11-04 23:21:43 -06:00
type PendingACLSignal struct {
Policy uuid.UUID
2023-10-13 00:32:24 -06:00
Timeout uuid.UUID
2023-08-10 23:43:10 -06:00
ID uuid.UUID
}
// Default message channel size for nodes
// Nodes represent a group of extensions that can be collectively addressed
2023-07-25 21:43:15 -06:00
type Node struct {
Key ed25519.PrivateKey `gv:"key"`
2023-07-25 21:43:15 -06:00
ID NodeID
Type NodeType `gv:"type"`
2023-11-04 23:21:43 -06:00
// TODO: move each extension to it's own db key, and extend changes to notify which extension was changed
Extensions map[ExtType]Extension `gv:"extensions"`
Policies []Policy `gv:"policies"`
PendingACLs map[uuid.UUID]PendingACL `gv:"pending_acls"`
2023-11-04 23:21:43 -06:00
PendingACLSignals map[uuid.UUID]PendingACLSignal `gv:"pending_signal"`
2023-08-10 23:43:10 -06:00
// Channel for this node to receive messages from the Context
MsgChan chan *Message
// Size of MsgChan
BufferSize uint32 `gv:"buffer_size"`
// Channel for this node to process delayed signals
TimeoutChan <-chan time.Time
Active atomic.Bool
2023-11-04 23:21:43 -06:00
// TODO: enhance WriteNode to write SignalQueue to a different key, and use writeSignalQueue to decide whether or not to update it
writeSignalQueue bool
SignalQueue []QueuedSignal `gv:"signal_queue"`
NextSignal *QueuedSignal
}
func (node *Node) PostDeserialize(ctx *Context) error {
public := node.Key.Public().(ed25519.PublicKey)
node.ID = KeyID(public)
node.MsgChan = make(chan *Message, node.BufferSize)
node.NextSignal, node.TimeoutChan = SoonestSignal(node.SignalQueue)
2023-09-12 19:40:06 -06:00
ctx.Log.Logf("node", "signal_queue: %+v", node.SignalQueue)
ctx.Log.Logf("node", "next_signal: %+v - %+v", node.NextSignal, node.TimeoutChan)
return nil
}
// RuleResult is the outcome of an ACL check.
type RuleResult int

const (
	// Allow means the action is permitted.
	Allow RuleResult = iota
	// Deny means the action is not permitted.
	Deny
	// Pending means no decision could be made yet.
	Pending
)
func (node *Node) Allows(ctx *Context, principal_id NodeID, action Tree)(map[uuid.UUID]Messages, RuleResult) {
pends := map[uuid.UUID]Messages{}
for _, policy := range(node.Policies) {
msgs, resp := policy.Allows(ctx, principal_id, action, node)
2023-08-10 23:43:10 -06:00
if resp == Allow {
return nil, Allow
} else if resp == Pending {
pends[policy.ID()] = msgs
}
}
2023-08-10 23:43:10 -06:00
if len(pends) != 0 {
return pends, Pending
}
return nil, Deny
}
type (
	// WaitInfo describes an outstanding request: the node it was sent to and
	// the ID of the timeout signal queued for it.
	WaitInfo struct {
		NodeID  NodeID    `gv:"node"`
		Timeout uuid.UUID `gv:"timeout"`
	}

	// WaitMap holds outstanding requests, keyed by the ID that a response
	// reports via ResponseID.
	WaitMap map[uuid.UUID]WaitInfo
)
// ProcessResponse removes the entry that response answers from wait_map and
// dequeues the timeout signal queued for it, unless response is that timeout
// signal itself.
// It returns the removed entry and true, or a zero WaitInfo and false when
// wait_map has no entry for the response.
func (node *Node) ProcessResponse(wait_map WaitMap, response ResponseSignal) (WaitInfo, bool) {
	info, found := wait_map[response.ResponseID()]
	if !found {
		return WaitInfo{}, false
	}

	delete(wait_map, response.ResponseID())
	if response.ID() != info.Timeout {
		node.DequeueSignal(info.Timeout)
	}
	return info, true
}
// QueueTimeout creates a timeout signal for signal, queues it on the node to
// fire after the timeout duration, and returns a WaitInfo pairing dest with
// the timeout signal's ID.
func (node *Node) QueueTimeout(dest NodeID, signal Signal, timeout time.Duration) WaitInfo {
	ts := NewTimeoutSignal(signal.ID())
	deadline := time.Now().Add(timeout)
	node.QueueSignal(deadline, ts)

	return WaitInfo{NodeID: dest, Timeout: ts.Id}
}
2023-08-10 23:43:10 -06:00
func (node *Node) QueueSignal(time time.Time, signal Signal) {
node.SignalQueue = append(node.SignalQueue, QueuedSignal{signal, time})
node.NextSignal, node.TimeoutChan = SoonestSignal(node.SignalQueue)
2023-11-04 23:21:43 -06:00
node.writeSignalQueue = true
}
2023-08-10 23:43:10 -06:00
func (node *Node) DequeueSignal(id uuid.UUID) error {
idx := -1
for i, q := range(node.SignalQueue) {
if q.Signal.ID() == id {
2023-08-10 23:43:10 -06:00
idx = i
break
}
}
if idx == -1 {
return fmt.Errorf("%s is not in SignalQueue", id)
}
node.SignalQueue[idx] = node.SignalQueue[len(node.SignalQueue)-1]
node.SignalQueue = node.SignalQueue[:len(node.SignalQueue)-1]
node.NextSignal, node.TimeoutChan = SoonestSignal(node.SignalQueue)
2023-11-04 23:21:43 -06:00
node.writeSignalQueue = true
2023-08-10 23:43:10 -06:00
return nil
}
func SoonestSignal(signals []QueuedSignal) (*QueuedSignal, <-chan time.Time) {
var soonest_signal *QueuedSignal
var soonest_time time.Time
2023-07-30 11:29:58 -06:00
for i, signal := range(signals) {
2023-07-30 11:25:03 -06:00
if signal.Time.Compare(soonest_time) == -1 || soonest_signal == nil {
2023-07-30 11:29:58 -06:00
soonest_signal = &signals[i]
soonest_time = signal.Time
}
}
if soonest_signal != nil {
2023-07-30 11:12:47 -06:00
return soonest_signal, time.After(time.Until(soonest_signal.Time))
} else {
return nil, nil
}
}
// runNode runs nodeLoop for node, logging when it starts and stops.
// It panics if nodeLoop returns an error.
func runNode(ctx *Context, node *Node) {
	ctx.Log.Logf("node", "RUN_START: %s", node.ID)
	if err := nodeLoop(ctx, node); err != nil {
		panic(err)
	}
	ctx.Log.Logf("node", "RUN_STOP: %s", node.ID)
}
// StringError is an error whose message is the string value itself.
type StringError string

// Error implements the error interface by returning the message text.
func (err StringError) Error() string {
	return string(err)
}

// String returns the message text.
func (err StringError) String() string {
	return err.Error()
}

// MarshalBinary returns the message as bytes; the error is always nil.
func (err StringError) MarshalBinary() ([]byte, error) {
	data := []byte(err)
	return data, nil
}
// NewErrorField formats fstring with args and wraps the resulting message in a
// SerializedValue whose type stack is ErrorType and whose data is the message
// bytes. It panics if marshalling the message fails.
func NewErrorField(fstring string, args ...interface{}) SerializedValue {
	msg := StringError(fmt.Sprintf(fstring, args...))
	data, err := msg.MarshalBinary()
	if err != nil {
		panic(err)
	}

	return SerializedValue{TypeStack: []SerializedType{ErrorType}, Data: data}
}
// ReadFields serializes the requested fields of the node's extensions.
//
// reqs maps an extension type to the field names requested from it. The
// result has one entry per requested extension type, mapping each requested
// field name to its serialized value. A field that cannot be read (the node
// lacks the extension, or SerializeField fails) maps to an error field built
// with NewErrorField instead.
func (node *Node) ReadFields(ctx *Context, reqs map[ExtType][]string) map[ExtType]map[string]SerializedValue {
	exts := map[ExtType]map[string]SerializedValue{}
	for ext_type, field_reqs := range reqs {
		// The lookup does not depend on the field, so do it once per extension.
		ext, exists := node.Extensions[ext_type]

		fields := make(map[string]SerializedValue, len(field_reqs))
		for _, req := range field_reqs {
			if !exists {
				fields[req] = NewErrorField("%+v does not have %+v extension", node.ID, ext_type)
				continue
			}

			f, err := SerializeField(ctx, ext, req)
			if err != nil {
				// Pass the error as an argument, never as the format string:
				// a '%' in the message would otherwise be interpreted as a verb.
				fields[req] = NewErrorField("%s", err)
				continue
			}
			fields[req] = f
		}
		exts[ext_type] = fields
	}
	return exts
}
2023-08-11 16:00:36 -06:00
// Main Loop for nodes
func nodeLoop(ctx *Context, node *Node) error {
started := node.Active.CompareAndSwap(false, true)
if started == false {
return fmt.Errorf("%s is already started, will not start again", node.ID)
}
2023-08-06 12:47:47 -06:00
// Perform startup actions
node.Process(ctx, ZeroID, NewStartSignal())
2023-10-07 23:00:07 -06:00
err := WriteNode(ctx, node)
if err != nil {
panic(err)
}
run := true
for run == true {
var signal Signal
var source NodeID
select {
case msg := <- node.MsgChan:
ctx.Log.Logf("node_msg", "NODE_MSG: %s - %+v", node.ID, msg.Signal)
signal_ser, err := SerializeAny(ctx, msg.Signal)
if err != nil {
ctx.Log.Logf("signal", "SIGNAL_SERIALIZE_ERR: %s - %+v", err, msg.Signal)
}
chunks, err := signal_ser.Chunks()
if err != nil {
ctx.Log.Logf("signal", "SIGNAL_SERIALIZE_ERR: %s - %+v", err, signal_ser)
continue
}
dst_id_ser, err := msg.Dest.MarshalBinary()
if err != nil {
ctx.Log.Logf("signal", "SIGNAL_DEST_ID_SER_ERR: %e", err)
continue
}
src_id_ser, err := KeyID(msg.Source).MarshalBinary()
if err != nil {
ctx.Log.Logf("signal", "SIGNAL_SRC_ID_SER_ERR: %e", err)
continue
}
sig_data := append(dst_id_ser, src_id_ser...)
sig_data = append(sig_data, chunks.Slice()...)
if msg.Authorization != nil {
sig_data = append(sig_data, msg.Authorization.Signature...)
}
validated := ed25519.Verify(msg.Source, sig_data, msg.Signature)
if validated == false {
ctx.Log.Logf("signal_verify", "SIGNAL_VERIFY_ERR: %s - %s", node.ID, reflect.TypeOf(msg.Signal))
continue
}
var princ_id NodeID
if msg.Authorization == nil {
princ_id = KeyID(msg.Source)
} else {
err := ValidateAuthorization(*msg.Authorization, time.Hour)
if err != nil {
ctx.Log.Logf("node", "Authorization validation failed: %s", err)
continue
}
princ_id = KeyID(msg.Authorization.Identity)
}
2023-08-10 23:43:10 -06:00
if princ_id != node.ID {
pends, resp := node.Allows(ctx, princ_id, msg.Signal.Permission())
2023-08-10 23:43:10 -06:00
if resp == Deny {
ctx.Log.Logf("policy", "SIGNAL_POLICY_DENY: %s->%s - %+v(%+s)", princ_id, node.ID, reflect.TypeOf(msg.Signal), msg.Signal)
2023-08-15 18:23:06 -06:00
ctx.Log.Logf("policy", "SIGNAL_POLICY_SOURCE: %s", msg.Source)
2023-08-10 23:43:10 -06:00
msgs := Messages{}
msgs = msgs.Add(ctx, KeyID(msg.Source), node, nil, NewErrorSignal(msg.Signal.ID(), "acl denied"))
2023-08-10 23:43:10 -06:00
ctx.Send(msgs)
continue
} else if resp == Pending {
ctx.Log.Logf("policy", "SIGNAL_POLICY_PENDING: %s->%s - %s - %+v", princ_id, node.ID, msg.Signal.Permission(), pends)
timeout_signal := NewACLTimeoutSignal(msg.Signal.ID())
2023-08-10 23:43:10 -06:00
node.QueueSignal(time.Now().Add(100*time.Millisecond), timeout_signal)
msgs := Messages{}
for policy_type, sigs := range(pends) {
for _, m := range(sigs) {
msgs = append(msgs, m)
2023-10-13 00:32:24 -06:00
timeout_signal := NewTimeoutSignal(m.Signal.ID())
node.QueueSignal(time.Now().Add(time.Second), timeout_signal)
2023-11-04 23:21:43 -06:00
node.PendingACLSignals[m.Signal.ID()] = PendingACLSignal{policy_type, timeout_signal.Id, msg.Signal.ID()}
2023-08-10 23:43:10 -06:00
}
}
2023-10-13 00:32:24 -06:00
node.PendingACLs[msg.Signal.ID()] = PendingACL{
Counter: len(msgs),
TimeoutID: timeout_signal.ID(),
Action: msg.Signal.Permission(),
Principal: princ_id,
Responses: []ResponseSignal{},
Signal: msg.Signal,
Source: KeyID(msg.Source),
2023-10-13 00:32:24 -06:00
}
2023-10-03 20:14:26 -06:00
ctx.Log.Logf("policy", "Sending signals for pending ACL: %+v", msgs)
2023-08-10 23:43:10 -06:00
ctx.Send(msgs)
continue
} else if resp == Allow {
2023-09-27 18:28:56 -06:00
ctx.Log.Logf("policy", "SIGNAL_POLICY_ALLOW: %s->%s - %s", princ_id, node.ID, reflect.TypeOf(msg.Signal))
2023-08-10 23:43:10 -06:00
}
} else {
2023-09-27 18:28:56 -06:00
ctx.Log.Logf("policy", "SIGNAL_POLICY_SELF: %s - %s", node.ID, reflect.TypeOf(msg.Signal))
}
signal = msg.Signal
source = KeyID(msg.Source)
case <-node.TimeoutChan:
signal = node.NextSignal.Signal
source = node.ID
2023-07-30 11:07:41 -06:00
t := node.NextSignal.Time
2023-07-30 01:29:15 -06:00
i := -1
for j, queued := range(node.SignalQueue) {
if queued.Signal.ID() == node.NextSignal.Signal.ID() {
2023-07-30 01:29:15 -06:00
i = j
break
}
}
if i == -1 {
panic("node.NextSignal not in node.SignalQueue")
}
l := len(node.SignalQueue)
node.SignalQueue[i] = node.SignalQueue[l-1]
node.SignalQueue = node.SignalQueue[:(l-1)]
node.NextSignal, node.TimeoutChan = SoonestSignal(node.SignalQueue)
2023-11-04 23:21:43 -06:00
node.writeSignalQueue = true
2023-07-30 11:02:22 -06:00
if node.NextSignal == nil {
ctx.Log.Logf("node", "NODE_TIMEOUT(%s) - PROCESSING %+v@%s - NEXT_SIGNAL nil@%+v", node.ID, signal, t, node.TimeoutChan)
2023-07-30 11:02:22 -06:00
} else {
ctx.Log.Logf("node", "NODE_TIMEOUT(%s) - PROCESSING %+v@%s - NEXT_SIGNAL: %s@%s", node.ID, signal, t, node.NextSignal, node.NextSignal.Time)
2023-07-30 11:02:22 -06:00
}
err = WriteNode(ctx, node)
if err != nil {
ctx.Log.Logf("node", "Node Write Error: %s", err)
}
}
2023-08-10 23:43:10 -06:00
ctx.Log.Logf("node", "NODE_SIGNAL_QUEUE[%s]: %+v", node.ID, node.SignalQueue)
response, ok := signal.(ResponseSignal)
if ok == true {
2023-11-04 23:21:43 -06:00
info, waiting := node.PendingACLSignals[response.ResponseID()]
if waiting == true {
2023-11-04 23:21:43 -06:00
delete(node.PendingACLSignals, response.ResponseID())
2023-10-13 00:32:24 -06:00
ctx.Log.Logf("pending", "FOUND_PENDING_SIGNAL: %s - %s", node.ID, signal)
req_info, exists := node.PendingACLs[info.ID]
if exists == true {
req_info.Counter -= 1
req_info.Responses = append(req_info.Responses, response)
idx := -1
for i, p := range(node.Policies) {
if p.ID() == info.Policy {
idx = i
break
}
2023-10-13 00:32:24 -06:00
}
if idx == -1 {
ctx.Log.Logf("policy", "PENDING_FOR_NONEXISTENT_POLICY: %s - %s", node.ID, info.Policy)
delete(node.PendingACLs, info.ID)
} else {
allowed := node.Policies[idx].ContinueAllows(ctx, req_info, signal)
if allowed == Allow {
ctx.Log.Logf("policy", "DELAYED_POLICY_ALLOW: %s - %s", node.ID, req_info.Signal)
signal = req_info.Signal
source = req_info.Source
err := node.DequeueSignal(req_info.TimeoutID)
if err != nil {
ctx.Log.Logf("node", "dequeue error: %s", err)
}
delete(node.PendingACLs, info.ID)
2023-10-13 00:32:24 -06:00
} else if req_info.Counter == 0 {
ctx.Log.Logf("policy", "DELAYED_POLICY_DENY: %s - %s", node.ID, req_info.Signal)
// Send the denied response
msgs := Messages{}
msgs = msgs.Add(ctx, req_info.Source, node, nil, NewErrorSignal(req_info.Signal.ID(), "acl_denied"))
2023-10-13 00:32:24 -06:00
err := ctx.Send(msgs)
if err != nil {
ctx.Log.Logf("signal", "SEND_ERR: %s", err)
}
err = node.DequeueSignal(req_info.TimeoutID)
if err != nil {
ctx.Log.Logf("node", "ACL_DEQUEUE_ERROR: timeout signal not in queue when trying to clear after counter hit 0 %s, %s - %s", err, signal.ID(), req_info.TimeoutID)
}
2023-10-13 00:32:24 -06:00
delete(node.PendingACLs, info.ID)
} else {
node.PendingACLs[info.ID] = req_info
continue
}
2023-08-10 23:43:10 -06:00
}
}
}
}
2023-08-06 12:47:47 -06:00
switch sig := signal.(type) {
case *StopSignal:
node.Process(ctx, source, signal)
if source == node.ID {
node.Process(ctx, source, NewStoppedSignal(sig, node.ID))
} else {
msgs := Messages{}
msgs = msgs.Add(ctx, node.ID, node, nil, NewStoppedSignal(sig, node.ID))
ctx.Send(msgs)
}
run = false
case *ReadSignal:
result := node.ReadFields(ctx, sig.Extensions)
msgs := Messages{}
msgs = msgs.Add(ctx, source, node, nil, NewReadResultSignal(sig.ID(), node.ID, node.Type, result))
ctx.Send(msgs)
default:
2023-10-07 23:00:07 -06:00
err := node.Process(ctx, source, signal)
if err != nil {
panic(err)
}
2023-07-31 16:37:32 -06:00
}
}
stopped := node.Active.CompareAndSwap(true, false)
if stopped == false {
panic("BAD_STATE: stopping already stopped node")
}
return nil
}
// Stop asks a running node to stop by placing a stop-signal message, addressed
// to the node itself, on its MsgChan. The send blocks while MsgChan is full.
// It returns an error if the node is not active or the message cannot be built.
func (node *Node) Stop(ctx *Context) error {
	if !node.Active.Load() {
		return fmt.Errorf("Node not active")
	}

	msg, err := NewMessage(ctx, node.ID, node, nil, NewStopSignal())
	if err != nil {
		return err
	}

	node.MsgChan <- msg
	return nil
}
2023-10-07 23:00:07 -06:00
func (node *Node) QueueChanges(ctx *Context, changes Changes) error {
node.QueueSignal(time.Now(), NewStatusSignal(node.ID, changes))
2023-10-07 23:00:07 -06:00
return nil
}
func (node *Node) Process(ctx *Context, source NodeID, signal Signal) error {
ctx.Log.Logf("node_process", "PROCESSING MESSAGE: %s - %+v", node.ID, signal)
messages := Messages{}
2023-10-07 23:00:07 -06:00
changes := Changes{}
for ext_type, ext := range(node.Extensions) {
ctx.Log.Logf("node_process", "PROCESSING_EXTENSION: %s/%s", node.ID, ext_type)
2023-10-07 23:00:07 -06:00
ext_messages, ext_changes := ext.Process(ctx, node, source, signal)
if len(ext_messages) != 0 {
messages = append(messages, ext_messages...)
}
if len(ext_changes) != 0 {
changes = append(changes, ext_changes...)
}
}
if len(messages) != 0 {
2023-10-07 23:00:07 -06:00
send_err := ctx.Send(messages)
if send_err != nil {
return send_err
}
}
if len(changes) != 0 {
_, ok := signal.(*StoppedSignal)
if (ok == false) || (source != node.ID) {
write_err := WriteNodeChanges(ctx, node, changes)
if write_err != nil {
return write_err
}
status_err := node.QueueChanges(ctx, changes)
if status_err != nil {
return status_err
}
}
}
2023-10-07 23:00:07 -06:00
return nil
}
func GetCtx[C any](ctx *Context, ext_type ExtType) (C, error) {
2023-07-26 11:56:10 -06:00
var zero_ctx C
ext_info, ok := ctx.Extensions[ext_type]
if ok == false {
return zero_ctx, fmt.Errorf("%+v is not an extension in ctx", ext_type)
2023-07-26 11:56:10 -06:00
}
ext_ctx, ok := ext_info.Data.(C)
if ok == false {
return zero_ctx, fmt.Errorf("context for %+v is %+v, not %+v", ext_type, reflect.TypeOf(ext_info.Data), reflect.TypeOf(zero_ctx))
2023-07-26 11:56:10 -06:00
}
return ext_ctx, nil
}
func GetExt[T Extension](node *Node, ext_type ExtType) (T, error) {
2023-07-25 21:43:15 -06:00
var zero T
ext, exists := node.Extensions[ext_type]
2023-07-25 21:43:15 -06:00
if exists == false {
return zero, fmt.Errorf("%+v does not have %+v extension - %+v", node.ID, ext_type, node.Extensions)
2023-07-24 16:04:56 -06:00
}
2023-07-24 01:41:47 -06:00
2023-07-25 21:43:15 -06:00
ret, ok := ext.(T)
if ok == false {
return zero, fmt.Errorf("%+v in %+v is wrong type(%+v), expecting %+v", ext_type, node.ID, reflect.TypeOf(ext), reflect.TypeOf(zero))
2023-07-25 21:43:15 -06:00
}
2023-07-20 23:19:10 -06:00
2023-07-25 21:43:15 -06:00
return ret, nil
2023-07-20 23:19:10 -06:00
}
2023-08-06 12:47:47 -06:00
func KeyID(pub ed25519.PublicKey) NodeID {
id := uuid.NewHash(sha512.New(), ZeroUUID, pub, 3)
return NodeID(id)
}
// Create a new node in memory and start it's event loop
func NewNode(ctx *Context, key ed25519.PrivateKey, node_type NodeType, buffer_size uint32, policies []Policy, extensions ...Extension) (*Node, error) {
var err error
2023-08-06 12:47:47 -06:00
var public ed25519.PublicKey
if key == nil {
2023-08-06 12:47:47 -06:00
public, key, err = ed25519.GenerateKey(rand.Reader)
if err != nil {
2023-08-31 22:31:29 -06:00
return nil, err
}
2023-08-06 12:47:47 -06:00
} else {
public = key.Public().(ed25519.PublicKey)
}
2023-08-06 12:47:47 -06:00
id := KeyID(public)
_, exists := ctx.Node(id)
if exists == true {
2023-08-31 22:31:29 -06:00
return nil, fmt.Errorf("Attempted to create an existing node")
}
def, exists := ctx.Nodes[node_type]
2023-07-27 11:33:11 -06:00
if exists == false {
2023-08-31 22:31:29 -06:00
return nil, fmt.Errorf("Node type %+v not registered in Context", node_type)
2023-07-27 11:33:11 -06:00
}
ext_map := map[ExtType]Extension{}
for _, ext := range(extensions) {
ext_type, exists := ctx.ExtensionTypes[reflect.TypeOf(ext)]
if exists == false {
2023-08-31 22:31:29 -06:00
return nil, fmt.Errorf(fmt.Sprintf("%+v is not a known Extension", reflect.TypeOf(ext)))
}
_, exists = ext_map[ext_type]
2023-07-27 11:33:11 -06:00
if exists == true {
2023-08-31 22:31:29 -06:00
return nil, fmt.Errorf("Cannot add the same extension to a node twice")
2023-07-27 11:33:11 -06:00
}
ext_map[ext_type] = ext
2023-07-27 11:33:11 -06:00
}
for _, required_ext := range(def.Extensions) {
_, exists := ext_map[required_ext]
if exists == false {
2023-08-31 22:31:29 -06:00
return nil, fmt.Errorf(fmt.Sprintf("%+v requires %+v", node_type, required_ext))
2023-07-27 11:33:11 -06:00
}
}
policies = append(policies, DefaultPolicy)
node := &Node{
Key: key,
2023-07-25 21:43:15 -06:00
ID: id,
2023-07-26 00:18:11 -06:00
Type: node_type,
2023-07-27 11:33:11 -06:00
Extensions: ext_map,
Policies: policies,
2023-08-10 23:43:10 -06:00
PendingACLs: map[uuid.UUID]PendingACL{},
2023-11-04 23:21:43 -06:00
PendingACLSignals: map[uuid.UUID]PendingACLSignal{},
MsgChan: make(chan *Message, buffer_size),
BufferSize: buffer_size,
SignalQueue: []QueuedSignal{},
2023-07-25 21:43:15 -06:00
}
ctx.AddNode(id, node)
2023-08-11 16:00:36 -06:00
err = node.Process(ctx, ZeroID, NewCreateSignal())
if err != nil {
2023-08-31 22:31:29 -06:00
return nil, err
}
err = WriteNode(ctx, node)
if err != nil {
return nil, err
}
2023-08-06 12:47:47 -06:00
go runNode(ctx, node)
2023-07-27 11:33:11 -06:00
2023-08-31 22:31:29 -06:00
return node, nil
2023-07-09 14:30:30 -06:00
}
2023-10-07 23:00:07 -06:00
func WriteNodeChanges(ctx *Context, node *Node, changes Changes) error {
// TODO: optimize to not re-serialize unchanged extensions/fields(might need to cache the serialized values)
return WriteNode(ctx, node)
}
2023-07-27 15:49:21 -06:00
// Write a node to the database
func WriteNode(ctx *Context, node *Node) error {
ctx.Log.Logf("db", "DB_WRITE: %s", node.ID)
node_serialized, err := SerializeAny(ctx, node)
if err != nil {
return err
}
chunks, err := node_serialized.Chunks()
if err != nil {
return err
}
ctx.Log.Logf("db_data", "DB_DATA: %+v", chunks.Slice())
id_bytes, err := node.ID.MarshalBinary()
if err != nil {
return err
}
ctx.Log.Logf("db", "DB_WRITE_ID: %+v", id_bytes)
return ctx.DB.Update(func(txn *badger.Txn) error {
return txn.Set(id_bytes, chunks.Slice())
})
}
2023-07-09 14:30:30 -06:00
2023-07-25 21:43:15 -06:00
func LoadNode(ctx * Context, id NodeID) (*Node, error) {
ctx.Log.Logf("db", "LOADING_NODE: %s", id)
2023-07-09 14:30:30 -06:00
var bytes []byte
err := ctx.DB.View(func(txn *badger.Txn) error {
id_bytes, err := id.MarshalBinary()
if err != nil {
return err
}
ctx.Log.Logf("db", "DB_READ_ID: %+v", id_bytes)
item, err := txn.Get(id_bytes)
2023-07-09 14:30:30 -06:00
if err != nil {
return err
}
return item.Value(func(val []byte) error {
bytes = append([]byte{}, val...)
return nil
})
})
if errors.Is(err, badger.ErrKeyNotFound) {
return nil, NodeNotFoundError
}else if err != nil {
2023-07-25 21:43:15 -06:00
return nil, err
2023-07-09 14:30:30 -06:00
}
value, remaining, err := ParseSerializedValue(bytes)
if err != nil {
return nil, err
} else if len(remaining) != 0 {
return nil, fmt.Errorf("%d bytes left after parsing node from DB", len(remaining))
}
node_type, remaining_types, err := DeserializeType(ctx, value.TypeStack)
if err != nil {
return nil, err
} else if len(remaining_types) != 0 {
return nil, fmt.Errorf("%d entries left in typestack after deserializing *Node", len(remaining_types))
}
node_val, remaining_data, err := DeserializeValue(ctx, node_type, value.Data)
if err != nil {
return nil, err
} else if len(remaining_data) != 0 {
return nil, fmt.Errorf("%d bytes left after desrializing *Node", len(remaining_data))
}
node, ok := node_val.Interface().(*Node)
if ok == false {
return nil, fmt.Errorf("Deserialized %+v when expecting *Node", node_val.Type())
}
for ext_type, ext := range(node.Extensions){
ctx.Log.Logf("serialize", "Deserialized extension: %+v - %+v", ext_type, ext)
}
ctx.AddNode(id, node)
2023-07-25 21:43:15 -06:00
ctx.Log.Logf("db", "DB_NODE_LOADED: %s", id)
go runNode(ctx, node)
return node, nil
2023-07-09 14:30:30 -06:00
}