2023-07-09 14:30:30 -06:00
|
|
|
package graphvent
|
|
|
|
|
|
|
|
import (
|
2023-07-27 15:27:14 -06:00
|
|
|
"time"
|
2023-07-27 16:48:39 -06:00
|
|
|
"errors"
|
2023-07-25 21:43:15 -06:00
|
|
|
"reflect"
|
2023-07-09 14:30:30 -06:00
|
|
|
"github.com/google/uuid"
|
|
|
|
badger "github.com/dgraph-io/badger/v3"
|
|
|
|
"fmt"
|
2023-07-27 16:06:56 -06:00
|
|
|
"sync/atomic"
|
2023-08-08 14:00:17 -06:00
|
|
|
"crypto"
|
2023-08-06 12:47:47 -06:00
|
|
|
"crypto/ed25519"
|
2023-07-28 15:07:38 -06:00
|
|
|
"crypto/sha512"
|
|
|
|
"crypto/rand"
|
2023-07-09 14:30:30 -06:00
|
|
|
)
|
|
|
|
|
2023-07-28 13:12:17 -06:00
|
|
|
const (
  // Magic first four bytes of serialized DB content, stored big endian
  NODE_DB_MAGIC = 0x2491df14

  // Total length of the node database header, has magic to verify and type_hash to map to load function
  NODE_DB_HEADER_LEN = 32

  // Header lengths (in bytes) for the other serialized record kinds.
  // NOTE(review): presumably these are the fixed header sizes for serialized
  // Extension, QueuedSignal and Policy entries — the serialization code is
  // outside this file; confirm there before relying on these values.
  EXTENSION_DB_HEADER_LEN = 16
  QSIGNAL_DB_HEADER_LEN = 24
  POLICY_DB_HEADER_LEN = 16
)
|
|
|
|
|
|
|
|
var (
  // Base NodeID, used as a special value
  ZeroUUID = uuid.UUID{}
  // ZeroID is the NodeID form of ZeroUUID; used e.g. as the "source" for
  // internally generated signals (see nodeLoop / NewNode).
  ZeroID = NodeID(ZeroUUID)
)
|
|
|
|
|
2023-07-27 16:06:56 -06:00
|
|
|
// A NodeID uniquely identifies a Node.
// It is a uuid.UUID under the hood; for key-derived nodes it is computed
// from the node's public key (see KeyID).
type NodeID uuid.UUID
|
|
|
func (id NodeID) MarshalBinary() ([]byte, error) {
|
|
|
|
return (uuid.UUID)(id).MarshalBinary()
|
2023-07-27 15:27:14 -06:00
|
|
|
}
|
2023-08-31 19:50:32 -06:00
|
|
|
func (id NodeID) String() string {
|
|
|
|
return (uuid.UUID)(id).String()
|
2023-07-19 20:03:13 -06:00
|
|
|
}
|
2023-08-11 16:00:36 -06:00
|
|
|
func IDFromBytes(bytes []byte) (NodeID, error) {
|
2023-08-31 19:50:32 -06:00
|
|
|
id, err := uuid.FromBytes(bytes)
|
|
|
|
return NodeID(id), err
|
2023-07-25 21:43:15 -06:00
|
|
|
}
|
|
|
|
|
2023-07-27 16:06:56 -06:00
|
|
|
// Parse an ID from a string
|
2023-07-19 20:03:13 -06:00
|
|
|
func ParseID(str string) (NodeID, error) {
|
|
|
|
id_uuid, err := uuid.Parse(str)
|
|
|
|
if err != nil {
|
|
|
|
return NodeID{}, err
|
|
|
|
}
|
2023-08-31 19:50:32 -06:00
|
|
|
return NodeID(id_uuid), nil
|
2023-07-09 14:30:30 -06:00
|
|
|
}
|
|
|
|
|
2023-07-10 22:31:43 -06:00
|
|
|
// Generate a random NodeID
|
2023-07-09 14:30:30 -06:00
|
|
|
func RandID() NodeID {
|
2023-08-31 19:50:32 -06:00
|
|
|
return NodeID(uuid.New())
|
2023-07-09 14:30:30 -06:00
|
|
|
}
|
|
|
|
|
2023-07-27 16:06:56 -06:00
|
|
|
// Extensions are data attached to nodes that process signals.
type Extension interface {
  // Process handles one signal delivered to the owning node and returns
  // any outgoing messages to send (nil for none).
  Process(*Context, *Node, NodeID, Signal) Messages
}
|
|
|
|
|
2023-07-27 16:06:56 -06:00
|
|
|
// A QueuedSignal is a Signal that has been Queued to trigger at a set time.
type QueuedSignal struct {
  Signal    // the signal to deliver when the time arrives
  time.Time // when the signal should fire (see SoonestSignal)
}
|
|
|
|
|
2023-08-10 23:43:10 -06:00
|
|
|
// PendingACL tracks an in-flight ACL decision for a signal whose policy
// evaluation returned Pending (see nodeLoop). It is keyed in
// Node.PendingACLs by the original signal's header ID.
type PendingACL struct {
  // Number of outstanding policy-query messages still awaited
  Counter int
  // ID of the queued ACL-timeout signal, so it can be dequeued on resolution
  TimeoutID uuid.UUID
  // The permission tree the original signal requested
  Action Tree
  // The requesting principal's node ID
  Principal NodeID
  // The policy-query messages that were sent out
  Messages Messages
  // Responses collected so far
  Responses []Signal
  // The original signal, replayed if the ACL resolves to Allow
  Signal Signal
  // Sender of the original signal (target for the eventual reply)
  Source NodeID
}
|
|
|
|
|
|
|
|
// PendingSignal maps one outgoing policy-query message back to the
// PendingACL it belongs to (see nodeLoop). Keyed in Node.PendingSignals
// by the query message's signal ID.
type PendingSignal struct {
  // Which policy issued the query
  Policy PolicyType
  // Whether a response for this query has already been seen
  Found bool
  // Header ID of the original signal, i.e. the key into Node.PendingACLs
  ID uuid.UUID
}
|
|
|
|
|
2023-07-27 16:06:56 -06:00
|
|
|
// Nodes represent a group of extensions that can be collectively addressed.
// Fields tagged `gv:"N"` are serialized to the database (see WriteNode);
// untagged fields are runtime-only state.
type Node struct {
  // Ed25519 private key; the public half determines ID (see KeyID)
  Key ed25519.PrivateKey `gv:"0"`
  // Derived from Key, so not persisted itself
  ID NodeID
  Type NodeType `gv:"1"`
  Extensions map[ExtType]Extension `gv:"3"`
  Policies map[PolicyType]Policy `gv:"4"`

  // In-flight ACL decisions keyed by the original signal's header ID
  PendingACLs map[uuid.UUID]PendingACL `gv:"6"`
  // Outstanding policy-query messages keyed by their signal ID
  PendingSignals map[uuid.UUID]PendingSignal `gv:"7"`

  // Channel for this node to receive messages from the Context
  MsgChan chan *Message
  // Size of MsgChan
  BufferSize uint32 `gv:"2"`
  // Channel for this node to process delayed signals
  TimeoutChan <-chan time.Time

  // True while nodeLoop is running (set/cleared with CompareAndSwap)
  Active atomic.Bool

  SignalQueue []QueuedSignal `gv:"5"`
  // Cached pointer to the soonest entry of SignalQueue (see SoonestSignal)
  NextSignal *QueuedSignal
}
|
|
|
|
|
2023-08-10 23:43:10 -06:00
|
|
|
// RuleResult is the outcome of evaluating a policy against a requested action.
type RuleResult int

const (
  // Allow grants the action immediately.
  Allow RuleResult = iota
  // Deny rejects the action.
  Deny
  // Pending defers the decision until further policy responses arrive.
  Pending
)
|
|
|
|
|
2023-08-31 19:50:32 -06:00
|
|
|
func (node *Node) Allows(ctx *Context, principal_id NodeID, action Tree)(map[PolicyType]Messages, RuleResult) {
|
2023-08-10 23:43:10 -06:00
|
|
|
pends := map[PolicyType]Messages{}
|
|
|
|
for policy_type, policy := range(node.Policies) {
|
2023-08-31 19:50:32 -06:00
|
|
|
msgs, resp := policy.Allows(ctx, principal_id, action, node)
|
2023-08-10 23:43:10 -06:00
|
|
|
if resp == Allow {
|
|
|
|
return nil, Allow
|
|
|
|
} else if resp == Pending {
|
|
|
|
pends[policy_type] = msgs
|
2023-08-08 14:00:17 -06:00
|
|
|
}
|
2023-08-07 20:26:02 -06:00
|
|
|
}
|
2023-08-10 23:43:10 -06:00
|
|
|
if len(pends) != 0 {
|
|
|
|
return pends, Pending
|
|
|
|
}
|
|
|
|
return nil, Deny
|
|
|
|
}
|
|
|
|
|
|
|
|
func (node *Node) QueueSignal(time time.Time, signal Signal) {
|
|
|
|
node.SignalQueue = append(node.SignalQueue, QueuedSignal{signal, time})
|
|
|
|
node.NextSignal, node.TimeoutChan = SoonestSignal(node.SignalQueue)
|
2023-08-07 20:26:02 -06:00
|
|
|
}
|
|
|
|
|
2023-08-10 23:43:10 -06:00
|
|
|
func (node *Node) DequeueSignal(id uuid.UUID) error {
|
|
|
|
idx := -1
|
|
|
|
for i, q := range(node.SignalQueue) {
|
2023-08-31 19:50:32 -06:00
|
|
|
if q.Signal.Header().ID == id {
|
2023-08-10 23:43:10 -06:00
|
|
|
idx = i
|
|
|
|
break
|
|
|
|
}
|
|
|
|
}
|
|
|
|
if idx == -1 {
|
|
|
|
return fmt.Errorf("%s is not in SignalQueue", id)
|
|
|
|
}
|
|
|
|
|
|
|
|
node.SignalQueue[idx] = node.SignalQueue[len(node.SignalQueue)-1]
|
|
|
|
node.SignalQueue = node.SignalQueue[:len(node.SignalQueue)-1]
|
2023-07-27 15:27:14 -06:00
|
|
|
node.NextSignal, node.TimeoutChan = SoonestSignal(node.SignalQueue)
|
2023-08-10 23:43:10 -06:00
|
|
|
|
|
|
|
return nil
|
2023-07-27 15:27:14 -06:00
|
|
|
}
|
|
|
|
|
|
|
|
func (node *Node) ClearSignalQueue() {
|
|
|
|
node.SignalQueue = []QueuedSignal{}
|
|
|
|
node.NextSignal = nil
|
|
|
|
node.TimeoutChan = nil
|
|
|
|
}
|
|
|
|
|
|
|
|
func SoonestSignal(signals []QueuedSignal) (*QueuedSignal, <-chan time.Time) {
|
|
|
|
var soonest_signal *QueuedSignal
|
|
|
|
var soonest_time time.Time
|
2023-07-30 11:29:58 -06:00
|
|
|
for i, signal := range(signals) {
|
2023-07-30 11:25:03 -06:00
|
|
|
if signal.Time.Compare(soonest_time) == -1 || soonest_signal == nil {
|
2023-07-30 11:29:58 -06:00
|
|
|
soonest_signal = &signals[i]
|
2023-07-27 15:27:14 -06:00
|
|
|
soonest_time = signal.Time
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
if soonest_signal != nil {
|
2023-07-30 11:12:47 -06:00
|
|
|
return soonest_signal, time.After(time.Until(soonest_signal.Time))
|
2023-07-27 15:27:14 -06:00
|
|
|
} else {
|
|
|
|
return nil, nil
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
2023-07-27 23:15:58 -06:00
|
|
|
func runNode(ctx *Context, node *Node) {
|
2023-07-27 15:27:14 -06:00
|
|
|
ctx.Log.Logf("node", "RUN_START: %s", node.ID)
|
2023-07-27 23:15:58 -06:00
|
|
|
err := nodeLoop(ctx, node)
|
2023-07-27 15:27:14 -06:00
|
|
|
if err != nil {
|
|
|
|
panic(err)
|
|
|
|
}
|
|
|
|
ctx.Log.Logf("node", "RUN_STOP: %s", node.ID)
|
|
|
|
}
|
|
|
|
|
2023-08-31 19:50:32 -06:00
|
|
|
// StringError is a plain string that satisfies error, fmt.Stringer and
// encoding.BinaryMarshaler.
type StringError string

// String returns the raw string content.
func (err StringError) String() string {
  return string(err)
}

// Error satisfies the error interface with the same text as String.
func (err StringError) Error() string {
  return err.String()
}

// MarshalBinary encodes the error text as its UTF-8 bytes; never fails.
func (err StringError) MarshalBinary() ([]byte, error) {
  return []byte(err), nil
}
|
|
|
|
func NewErrorField(fstring string, args ...interface{}) SerializedValue {
|
|
|
|
str := StringError(fmt.Sprintf(fstring, args...))
|
|
|
|
str_ser, err := str.MarshalBinary()
|
|
|
|
if err != nil {
|
|
|
|
panic(err)
|
|
|
|
}
|
|
|
|
return SerializedValue{
|
|
|
|
TypeStack: []uint64{uint64(ErrorType)},
|
|
|
|
Data: str_ser,
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
func (node *Node) ReadFields(ctx *Context, reqs map[ExtType][]string)map[ExtType]map[string]SerializedValue {
|
|
|
|
exts := map[ExtType]map[string]SerializedValue{}
|
2023-07-28 11:21:18 -06:00
|
|
|
for ext_type, field_reqs := range(reqs) {
|
2023-08-31 19:50:32 -06:00
|
|
|
fields := map[string]SerializedValue{}
|
2023-07-28 11:21:18 -06:00
|
|
|
for _, req := range(field_reqs) {
|
2023-08-08 14:00:17 -06:00
|
|
|
ext, exists := node.Extensions[ext_type]
|
|
|
|
if exists == false {
|
2023-08-31 19:50:32 -06:00
|
|
|
fields[req] = NewErrorField("%+v does not have %+v extension", node.ID, ext_type)
|
2023-07-28 11:21:18 -06:00
|
|
|
} else {
|
2023-08-31 19:50:32 -06:00
|
|
|
f, err := SerializeField(ctx, ext, req)
|
|
|
|
if err != nil {
|
|
|
|
fields[req] = NewErrorField(err.Error())
|
|
|
|
} else {
|
|
|
|
fields[req] = f
|
|
|
|
}
|
2023-07-28 11:21:18 -06:00
|
|
|
}
|
|
|
|
}
|
2023-07-28 11:59:01 -06:00
|
|
|
exts[ext_type] = fields
|
2023-07-28 11:21:18 -06:00
|
|
|
}
|
|
|
|
return exts
|
|
|
|
}
|
|
|
|
|
2023-08-11 16:00:36 -06:00
|
|
|
// Main Loop for nodes
|
2023-07-27 23:15:58 -06:00
|
|
|
func nodeLoop(ctx *Context, node *Node) error {
|
2023-07-27 16:06:56 -06:00
|
|
|
started := node.Active.CompareAndSwap(false, true)
|
|
|
|
if started == false {
|
|
|
|
return fmt.Errorf("%s is already started, will not start again", node.ID)
|
|
|
|
}
|
2023-07-31 16:25:18 -06:00
|
|
|
|
2023-08-06 12:47:47 -06:00
|
|
|
// Perform startup actions
|
2023-08-31 19:50:32 -06:00
|
|
|
node.Process(ctx, ZeroID, NewStartSignal())
|
|
|
|
run := true
|
|
|
|
for run == true {
|
2023-08-08 14:00:17 -06:00
|
|
|
var signal Signal
|
|
|
|
var source NodeID
|
2023-07-27 15:27:14 -06:00
|
|
|
select {
|
2023-08-08 14:00:17 -06:00
|
|
|
case msg := <- node.MsgChan:
|
|
|
|
ctx.Log.Logf("node_msg", "NODE_MSG: %s - %+v", node.ID, msg.Signal)
|
2023-09-02 17:30:52 -06:00
|
|
|
signal_ser, err := SerializeValue(ctx, reflect.ValueOf(msg.Signal))
|
2023-08-08 14:00:17 -06:00
|
|
|
if err != nil {
|
2023-08-31 19:50:32 -06:00
|
|
|
ctx.Log.Logf("signal", "SIGNAL_SERIALIZE_ERR: %s - %+v", err, msg.Signal)
|
|
|
|
}
|
|
|
|
ser, err := signal_ser.MarshalBinary()
|
|
|
|
if err != nil {
|
|
|
|
ctx.Log.Logf("signal", "SIGNAL_SERIALIZE_ERR: %s - %+v", err, signal_ser)
|
2023-08-08 14:00:17 -06:00
|
|
|
continue
|
|
|
|
}
|
|
|
|
|
2023-08-31 19:50:32 -06:00
|
|
|
dst_id_ser, err := msg.Dest.MarshalBinary()
|
|
|
|
if err != nil {
|
|
|
|
ctx.Log.Logf("signal", "SIGNAL_DEST_ID_SER_ERR: %e", err)
|
|
|
|
continue
|
|
|
|
}
|
|
|
|
src_id_ser, err := msg.Source.MarshalBinary()
|
|
|
|
if err != nil {
|
|
|
|
ctx.Log.Logf("signal", "SIGNAL_SRC_ID_SER_ERR: %e", err)
|
|
|
|
continue
|
|
|
|
}
|
|
|
|
sig_data := append(dst_id_ser, src_id_ser...)
|
2023-08-08 14:00:17 -06:00
|
|
|
sig_data = append(sig_data, ser...)
|
|
|
|
validated := ed25519.Verify(msg.Principal, sig_data, msg.Signature)
|
|
|
|
if validated == false {
|
2023-09-02 17:30:52 -06:00
|
|
|
println(fmt.Sprintf("SIGNAL: %s", msg.Signal))
|
|
|
|
println(fmt.Sprintf("VERIFY_DIGEST: %+v", sig_data))
|
2023-08-08 14:00:17 -06:00
|
|
|
ctx.Log.Logf("signal", "SIGNAL_VERIFY_ERR: %s - %+v", node.ID, msg)
|
|
|
|
continue
|
|
|
|
}
|
|
|
|
|
2023-08-10 23:43:10 -06:00
|
|
|
princ_id := KeyID(msg.Principal)
|
|
|
|
if princ_id != node.ID {
|
2023-08-31 19:50:32 -06:00
|
|
|
pends, resp := node.Allows(ctx, princ_id, msg.Signal.Permission())
|
2023-08-10 23:43:10 -06:00
|
|
|
if resp == Deny {
|
|
|
|
ctx.Log.Logf("policy", "SIGNAL_POLICY_DENY: %s->%s - %s", princ_id, node.ID, msg.Signal.Permission())
|
2023-08-15 18:23:06 -06:00
|
|
|
ctx.Log.Logf("policy", "SIGNAL_POLICY_SOURCE: %s", msg.Source)
|
2023-08-10 23:43:10 -06:00
|
|
|
msgs := Messages{}
|
2023-08-31 19:50:32 -06:00
|
|
|
msgs = msgs.Add(ctx, node.ID, node.Key, NewErrorSignal(msg.Signal.Header().ID, "acl denied"), msg.Source)
|
2023-08-10 23:43:10 -06:00
|
|
|
ctx.Send(msgs)
|
|
|
|
continue
|
|
|
|
} else if resp == Pending {
|
|
|
|
ctx.Log.Logf("policy", "SIGNAL_POLICY_PENDING: %s->%s - %s - %+v", princ_id, node.ID, msg.Signal.Permission(), pends)
|
2023-08-31 19:50:32 -06:00
|
|
|
timeout_signal := NewACLTimeoutSignal(msg.Signal.Header().ID)
|
2023-08-10 23:43:10 -06:00
|
|
|
node.QueueSignal(time.Now().Add(100*time.Millisecond), timeout_signal)
|
|
|
|
msgs := Messages{}
|
|
|
|
for policy_type, sigs := range(pends) {
|
|
|
|
for _, m := range(sigs) {
|
|
|
|
msgs = append(msgs, m)
|
2023-08-31 19:50:32 -06:00
|
|
|
node.PendingSignals[m.Signal.Header().ID] = PendingSignal{policy_type, false, msg.Signal.Header().ID}
|
2023-08-10 23:43:10 -06:00
|
|
|
}
|
|
|
|
}
|
2023-08-31 19:50:32 -06:00
|
|
|
node.PendingACLs[msg.Signal.Header().ID] = PendingACL{len(msgs), timeout_signal.ID, msg.Signal.Permission(), princ_id, msgs, []Signal{}, msg.Signal, msg.Source}
|
2023-08-10 23:43:10 -06:00
|
|
|
ctx.Send(msgs)
|
|
|
|
continue
|
|
|
|
} else if resp == Allow {
|
|
|
|
ctx.Log.Logf("policy", "SIGNAL_POLICY_ALLOW: %s->%s - %s", princ_id, node.ID, msg.Signal.Permission())
|
|
|
|
}
|
|
|
|
} else {
|
|
|
|
ctx.Log.Logf("policy", "SIGNAL_POLICY_SELF: %s - %s", node.ID, msg.Signal.Permission())
|
2023-08-08 14:00:17 -06:00
|
|
|
}
|
|
|
|
|
|
|
|
signal = msg.Signal
|
|
|
|
source = msg.Source
|
|
|
|
|
2023-07-27 15:27:14 -06:00
|
|
|
case <-node.TimeoutChan:
|
2023-08-08 14:00:17 -06:00
|
|
|
signal = node.NextSignal.Signal
|
|
|
|
source = node.ID
|
2023-08-07 20:26:02 -06:00
|
|
|
|
2023-07-30 11:07:41 -06:00
|
|
|
t := node.NextSignal.Time
|
2023-07-30 01:29:15 -06:00
|
|
|
i := -1
|
|
|
|
for j, queued := range(node.SignalQueue) {
|
2023-08-31 19:50:32 -06:00
|
|
|
if queued.Signal.Header().ID == node.NextSignal.Signal.Header().ID {
|
2023-07-30 01:29:15 -06:00
|
|
|
i = j
|
|
|
|
break
|
|
|
|
}
|
|
|
|
}
|
|
|
|
if i == -1 {
|
|
|
|
panic("node.NextSignal not in node.SignalQueue")
|
|
|
|
}
|
|
|
|
l := len(node.SignalQueue)
|
|
|
|
node.SignalQueue[i] = node.SignalQueue[l-1]
|
|
|
|
node.SignalQueue = node.SignalQueue[:(l-1)]
|
|
|
|
|
2023-07-27 15:27:14 -06:00
|
|
|
node.NextSignal, node.TimeoutChan = SoonestSignal(node.SignalQueue)
|
2023-07-30 11:02:22 -06:00
|
|
|
if node.NextSignal == nil {
|
2023-08-31 19:50:32 -06:00
|
|
|
ctx.Log.Logf("node", "NODE_TIMEOUT(%s) - PROCESSING %+v@%s - NEXT_SIGNAL nil@%+v", node.ID, signal, t, node.TimeoutChan)
|
2023-07-30 11:02:22 -06:00
|
|
|
} else {
|
2023-08-31 19:50:32 -06:00
|
|
|
ctx.Log.Logf("node", "NODE_TIMEOUT(%s) - PROCESSING %+v@%s - NEXT_SIGNAL: %s@%s", node.ID, signal, t, node.NextSignal, node.NextSignal.Time)
|
2023-07-30 11:02:22 -06:00
|
|
|
}
|
2023-07-27 15:27:14 -06:00
|
|
|
}
|
|
|
|
|
2023-08-10 23:43:10 -06:00
|
|
|
ctx.Log.Logf("node", "NODE_SIGNAL_QUEUE[%s]: %+v", node.ID, node.SignalQueue)
|
|
|
|
|
2023-08-31 19:50:32 -06:00
|
|
|
info, waiting := node.PendingSignals[signal.Header().ReqID]
|
2023-08-10 23:43:10 -06:00
|
|
|
if waiting == true {
|
|
|
|
if info.Found == false {
|
|
|
|
info.Found = true
|
2023-08-31 19:50:32 -06:00
|
|
|
node.PendingSignals[signal.Header().ReqID] = info
|
2023-08-10 23:43:10 -06:00
|
|
|
ctx.Log.Logf("pending", "FOUND_PENDING_SIGNAL: %s - %s", node.ID, signal)
|
|
|
|
req_info, exists := node.PendingACLs[info.ID]
|
|
|
|
if exists == true {
|
|
|
|
req_info.Counter -= 1
|
|
|
|
req_info.Responses = append(req_info.Responses, signal)
|
|
|
|
|
2023-08-31 19:50:32 -06:00
|
|
|
allowed := node.Policies[info.Policy].ContinueAllows(ctx, req_info, signal)
|
2023-08-10 23:43:10 -06:00
|
|
|
if allowed == Allow {
|
|
|
|
ctx.Log.Logf("policy", "DELAYED_POLICY_ALLOW: %s - %s", node.ID, req_info.Signal)
|
|
|
|
signal = req_info.Signal
|
|
|
|
source = req_info.Source
|
|
|
|
err := node.DequeueSignal(req_info.TimeoutID)
|
|
|
|
if err != nil {
|
|
|
|
panic("dequeued a passed signal")
|
|
|
|
}
|
|
|
|
delete(node.PendingACLs, info.ID)
|
|
|
|
} else if req_info.Counter == 0 {
|
|
|
|
ctx.Log.Logf("policy", "DELAYED_POLICY_DENY: %s - %s", node.ID, req_info.Signal)
|
|
|
|
// Send the denied response
|
|
|
|
msgs := Messages{}
|
2023-08-31 19:50:32 -06:00
|
|
|
msgs = msgs.Add(ctx, node.ID, node.Key, NewErrorSignal(req_info.Signal.Header().ID, "ACL_DENIED"), req_info.Source)
|
2023-08-10 23:43:10 -06:00
|
|
|
err := ctx.Send(msgs)
|
|
|
|
if err != nil {
|
|
|
|
ctx.Log.Logf("signal", "SEND_ERR: %s", err)
|
|
|
|
}
|
|
|
|
err = node.DequeueSignal(req_info.TimeoutID)
|
|
|
|
if err != nil {
|
|
|
|
panic("dequeued a passed signal")
|
|
|
|
}
|
|
|
|
delete(node.PendingACLs, info.ID)
|
|
|
|
} else {
|
|
|
|
node.PendingACLs[info.ID] = req_info
|
|
|
|
continue
|
|
|
|
}
|
|
|
|
}
|
|
|
|
}
|
|
|
|
}
|
2023-08-06 12:47:47 -06:00
|
|
|
|
2023-08-31 19:50:32 -06:00
|
|
|
switch sig := signal.(type) {
|
|
|
|
case *StopSignal:
|
2023-08-08 14:00:17 -06:00
|
|
|
msgs := Messages{}
|
2023-09-02 17:30:52 -06:00
|
|
|
msgs = msgs.Add(ctx, node.ID, node.Key, NewStatusSignal(node.ID, "stopped"), source)
|
2023-08-31 19:50:32 -06:00
|
|
|
ctx.Send(msgs)
|
|
|
|
node.Process(ctx, node.ID, NewStatusSignal(node.ID, "stopped"))
|
|
|
|
run = false
|
|
|
|
case *ReadSignal:
|
|
|
|
result := node.ReadFields(ctx, sig.Extensions)
|
|
|
|
msgs := Messages{}
|
|
|
|
msgs = msgs.Add(ctx, node.ID, node.Key, NewReadResultSignal(sig.ID, node.ID, node.Type, result), source)
|
|
|
|
msgs = msgs.Add(ctx, node.ID, node.Key, NewErrorSignal(sig.ID, "read_done"), source)
|
2023-08-08 14:00:17 -06:00
|
|
|
ctx.Send(msgs)
|
2023-07-27 15:27:14 -06:00
|
|
|
}
|
2023-07-28 11:21:18 -06:00
|
|
|
|
2023-08-08 14:00:17 -06:00
|
|
|
node.Process(ctx, source, signal)
|
2023-07-31 16:37:32 -06:00
|
|
|
// assume that processing a signal means that this nodes state changed
|
|
|
|
// TODO: remove a lot of database writes by only writing when things change,
|
|
|
|
// so need to have Process return whether or not state changed
|
|
|
|
err := WriteNode(ctx, node)
|
|
|
|
if err != nil {
|
|
|
|
panic(err)
|
|
|
|
}
|
2023-07-27 15:27:14 -06:00
|
|
|
}
|
2023-07-27 16:06:56 -06:00
|
|
|
|
|
|
|
stopped := node.Active.CompareAndSwap(true, false)
|
|
|
|
if stopped == false {
|
|
|
|
panic("BAD_STATE: stopping already stopped node")
|
|
|
|
}
|
2023-07-27 15:27:14 -06:00
|
|
|
return nil
|
|
|
|
}
|
|
|
|
|
2023-08-07 20:26:02 -06:00
|
|
|
type Message struct {
|
2023-08-08 14:00:17 -06:00
|
|
|
Source NodeID
|
|
|
|
Dest NodeID
|
|
|
|
Principal ed25519.PublicKey
|
|
|
|
Signal Signal
|
|
|
|
Signature []byte
|
|
|
|
}
|
|
|
|
|
|
|
|
type Messages []*Message
|
2023-08-31 19:50:32 -06:00
|
|
|
func (msgs Messages) Add(ctx *Context, source NodeID, principal ed25519.PrivateKey, signal Signal, dest NodeID) Messages {
|
|
|
|
msg, err := NewMessage(ctx, dest, source, principal, signal)
|
2023-08-08 14:00:17 -06:00
|
|
|
if err != nil {
|
2023-08-10 23:43:10 -06:00
|
|
|
panic(err)
|
2023-08-08 14:00:17 -06:00
|
|
|
} else {
|
|
|
|
msgs = append(msgs, msg)
|
|
|
|
}
|
|
|
|
return msgs
|
2023-08-07 20:26:02 -06:00
|
|
|
}
|
|
|
|
|
2023-08-31 19:50:32 -06:00
|
|
|
func NewMessage(ctx *Context, dest NodeID, source NodeID, principal ed25519.PrivateKey, signal Signal) (*Message, error) {
|
2023-09-02 17:30:52 -06:00
|
|
|
signal_ser, err := SerializeValue(ctx, reflect.ValueOf(signal))
|
2023-08-31 19:50:32 -06:00
|
|
|
if err != nil {
|
|
|
|
return nil, err
|
|
|
|
}
|
|
|
|
|
|
|
|
ser, err := signal_ser.MarshalBinary()
|
2023-08-08 14:00:17 -06:00
|
|
|
if err != nil {
|
|
|
|
return nil, err
|
|
|
|
}
|
|
|
|
|
2023-08-31 19:50:32 -06:00
|
|
|
dest_ser, err := dest.MarshalBinary()
|
|
|
|
if err != nil {
|
|
|
|
return nil, err
|
|
|
|
}
|
|
|
|
source_ser, err := source.MarshalBinary()
|
|
|
|
if err != nil {
|
|
|
|
return nil, err
|
|
|
|
}
|
|
|
|
sig_data := append(dest_ser, source_ser...)
|
2023-08-08 14:00:17 -06:00
|
|
|
sig_data = append(sig_data, ser...)
|
|
|
|
|
|
|
|
sig, err := principal.Sign(rand.Reader, sig_data, crypto.Hash(0))
|
|
|
|
if err != nil {
|
|
|
|
return nil, err
|
|
|
|
}
|
|
|
|
|
|
|
|
return &Message{
|
|
|
|
Dest: dest,
|
|
|
|
Source: source,
|
|
|
|
Principal: principal.Public().(ed25519.PublicKey),
|
|
|
|
Signal: signal,
|
|
|
|
Signature: sig,
|
|
|
|
}, nil
|
|
|
|
}
|
|
|
|
|
|
|
|
func (node *Node) Process(ctx *Context, source NodeID, signal Signal) error {
|
2023-08-31 19:50:32 -06:00
|
|
|
ctx.Log.Logf("node_process", "PROCESSING MESSAGE: %s - %+v", node.ID, signal)
|
2023-08-08 14:00:17 -06:00
|
|
|
messages := Messages{}
|
2023-07-27 15:27:14 -06:00
|
|
|
for ext_type, ext := range(node.Extensions) {
|
2023-08-07 20:26:02 -06:00
|
|
|
ctx.Log.Logf("node_process", "PROCESSING_EXTENSION: %s/%s", node.ID, ext_type)
|
|
|
|
//TODO: add extension and node info to log
|
2023-08-08 14:00:17 -06:00
|
|
|
resp := ext.Process(ctx, node, source, signal)
|
2023-08-07 20:26:02 -06:00
|
|
|
if resp != nil {
|
|
|
|
messages = append(messages, resp...)
|
|
|
|
}
|
2023-07-27 15:27:14 -06:00
|
|
|
}
|
2023-08-07 20:26:02 -06:00
|
|
|
|
2023-08-08 14:00:17 -06:00
|
|
|
return ctx.Send(messages)
|
2023-07-27 15:27:14 -06:00
|
|
|
}
|
|
|
|
|
2023-08-31 19:50:32 -06:00
|
|
|
func GetCtx[T Extension, C any](ctx *Context, ext_type ExtType) (C, error) {
|
2023-07-26 11:56:10 -06:00
|
|
|
var zero_ctx C
|
2023-08-31 19:50:32 -06:00
|
|
|
ext_info, ok := ctx.Extensions[ext_type]
|
2023-07-27 16:06:56 -06:00
|
|
|
if ok == false {
|
2023-08-31 19:50:32 -06:00
|
|
|
return zero_ctx, fmt.Errorf("%+v is not an extension in ctx", ext_type)
|
2023-07-26 11:56:10 -06:00
|
|
|
}
|
|
|
|
|
|
|
|
ext_ctx, ok := ext_info.Data.(C)
|
|
|
|
if ok == false {
|
2023-08-31 19:50:32 -06:00
|
|
|
return zero_ctx, fmt.Errorf("context for %+v is %+v, not %+v", ext_type, reflect.TypeOf(ext_info.Data), reflect.TypeOf(zero_ctx))
|
2023-07-26 11:56:10 -06:00
|
|
|
}
|
|
|
|
|
|
|
|
return ext_ctx, nil
|
|
|
|
}
|
|
|
|
|
2023-08-31 19:50:32 -06:00
|
|
|
func GetExt[T Extension](node *Node, ext_type ExtType) (T, error) {
|
2023-07-25 21:43:15 -06:00
|
|
|
var zero T
|
2023-07-26 00:42:12 -06:00
|
|
|
ext, exists := node.Extensions[ext_type]
|
2023-07-25 21:43:15 -06:00
|
|
|
if exists == false {
|
2023-08-31 19:50:32 -06:00
|
|
|
return zero, fmt.Errorf("%+v does not have %+v extension - %+v", node.ID, ext_type, node.Extensions)
|
2023-07-24 16:04:56 -06:00
|
|
|
}
|
2023-07-24 01:41:47 -06:00
|
|
|
|
2023-07-25 21:43:15 -06:00
|
|
|
ret, ok := ext.(T)
|
|
|
|
if ok == false {
|
2023-08-31 19:50:32 -06:00
|
|
|
return zero, fmt.Errorf("%+v in %+v is wrong type(%+v), expecting %+v", ext_type, node.ID, reflect.TypeOf(ext), reflect.TypeOf(zero))
|
2023-07-25 21:43:15 -06:00
|
|
|
}
|
2023-07-20 23:19:10 -06:00
|
|
|
|
2023-07-25 21:43:15 -06:00
|
|
|
return ret, nil
|
2023-07-20 23:19:10 -06:00
|
|
|
}
|
|
|
|
|
2023-08-06 12:47:47 -06:00
|
|
|
func KeyID(pub ed25519.PublicKey) NodeID {
|
2023-08-31 19:50:32 -06:00
|
|
|
id := uuid.NewHash(sha512.New(), ZeroUUID, pub, 3)
|
|
|
|
return NodeID(id)
|
2023-07-28 15:07:38 -06:00
|
|
|
}
|
|
|
|
|
2023-07-27 15:27:14 -06:00
|
|
|
// Create a new node in memory and start it's event loop
|
2023-08-31 22:31:29 -06:00
|
|
|
func NewNode(ctx *Context, key ed25519.PrivateKey, node_type NodeType, buffer_size uint32, policies map[PolicyType]Policy, extensions ...Extension) (*Node, error) {
|
2023-07-28 15:07:38 -06:00
|
|
|
var err error
|
2023-08-06 12:47:47 -06:00
|
|
|
var public ed25519.PublicKey
|
2023-07-28 15:07:38 -06:00
|
|
|
if key == nil {
|
2023-08-06 12:47:47 -06:00
|
|
|
public, key, err = ed25519.GenerateKey(rand.Reader)
|
2023-07-28 15:07:38 -06:00
|
|
|
if err != nil {
|
2023-08-31 22:31:29 -06:00
|
|
|
return nil, err
|
2023-07-28 15:07:38 -06:00
|
|
|
}
|
2023-08-06 12:47:47 -06:00
|
|
|
} else {
|
|
|
|
public = key.Public().(ed25519.PublicKey)
|
2023-07-28 15:07:38 -06:00
|
|
|
}
|
2023-08-06 12:47:47 -06:00
|
|
|
id := KeyID(public)
|
2023-07-28 12:46:06 -06:00
|
|
|
_, exists := ctx.Node(id)
|
2023-07-26 15:08:14 -06:00
|
|
|
if exists == true {
|
2023-08-31 22:31:29 -06:00
|
|
|
return nil, fmt.Errorf("Attempted to create an existing node")
|
2023-07-26 15:08:14 -06:00
|
|
|
}
|
|
|
|
|
2023-08-31 19:50:32 -06:00
|
|
|
def, exists := ctx.Nodes[node_type]
|
2023-07-27 11:33:11 -06:00
|
|
|
if exists == false {
|
2023-08-31 22:31:29 -06:00
|
|
|
return nil, fmt.Errorf("Node type %+v not registered in Context", node_type)
|
2023-07-27 11:33:11 -06:00
|
|
|
}
|
|
|
|
|
|
|
|
ext_map := map[ExtType]Extension{}
|
|
|
|
for _, ext := range(extensions) {
|
2023-08-31 19:50:32 -06:00
|
|
|
ext_type, exists := ctx.ExtensionTypes[reflect.TypeOf(ext)]
|
|
|
|
if exists == false {
|
2023-08-31 22:31:29 -06:00
|
|
|
return nil, fmt.Errorf(fmt.Sprintf("%+v is not a known Extension", reflect.TypeOf(ext)))
|
2023-08-31 19:50:32 -06:00
|
|
|
}
|
|
|
|
_, exists = ext_map[ext_type]
|
2023-07-27 11:33:11 -06:00
|
|
|
if exists == true {
|
2023-08-31 22:31:29 -06:00
|
|
|
return nil, fmt.Errorf("Cannot add the same extension to a node twice")
|
2023-07-27 11:33:11 -06:00
|
|
|
}
|
2023-08-31 19:50:32 -06:00
|
|
|
ext_map[ext_type] = ext
|
2023-07-27 11:33:11 -06:00
|
|
|
}
|
|
|
|
|
|
|
|
for _, required_ext := range(def.Extensions) {
|
|
|
|
_, exists := ext_map[required_ext]
|
|
|
|
if exists == false {
|
2023-08-31 22:31:29 -06:00
|
|
|
return nil, fmt.Errorf(fmt.Sprintf("%+v requires %+v", node_type, required_ext))
|
2023-07-27 11:33:11 -06:00
|
|
|
}
|
|
|
|
}
|
|
|
|
|
2023-08-07 20:26:02 -06:00
|
|
|
if policies == nil {
|
|
|
|
policies = map[PolicyType]Policy{}
|
2023-07-27 15:27:14 -06:00
|
|
|
}
|
|
|
|
|
2023-08-28 14:52:28 -06:00
|
|
|
default_policy := NewAllNodesPolicy(Tree{
|
2023-08-31 19:50:32 -06:00
|
|
|
uint64(ErrorSignalType): nil,
|
|
|
|
uint64(ReadResultSignalType): nil,
|
|
|
|
uint64(StatusSignalType): nil,
|
2023-08-28 14:52:28 -06:00
|
|
|
})
|
|
|
|
|
|
|
|
all_nodes_policy, exists := policies[AllNodesPolicyType]
|
|
|
|
if exists == true {
|
|
|
|
policies[AllNodesPolicyType] = all_nodes_policy.Merge(&default_policy)
|
|
|
|
} else {
|
|
|
|
policies[AllNodesPolicyType] = &default_policy
|
|
|
|
}
|
|
|
|
|
2023-07-26 15:08:14 -06:00
|
|
|
node := &Node{
|
2023-07-28 15:07:38 -06:00
|
|
|
Key: key,
|
2023-07-25 21:43:15 -06:00
|
|
|
ID: id,
|
2023-07-26 00:18:11 -06:00
|
|
|
Type: node_type,
|
2023-07-27 11:33:11 -06:00
|
|
|
Extensions: ext_map,
|
2023-08-07 20:26:02 -06:00
|
|
|
Policies: policies,
|
2023-08-10 23:43:10 -06:00
|
|
|
PendingACLs: map[uuid.UUID]PendingACL{},
|
|
|
|
PendingSignals: map[uuid.UUID]PendingSignal{},
|
2023-08-08 14:00:17 -06:00
|
|
|
MsgChan: make(chan *Message, buffer_size),
|
2023-07-28 13:45:14 -06:00
|
|
|
BufferSize: buffer_size,
|
2023-08-07 20:26:02 -06:00
|
|
|
SignalQueue: []QueuedSignal{},
|
2023-07-25 21:43:15 -06:00
|
|
|
}
|
2023-07-28 12:46:06 -06:00
|
|
|
ctx.AddNode(id, node)
|
2023-08-11 16:00:36 -06:00
|
|
|
|
2023-07-28 15:07:38 -06:00
|
|
|
err = WriteNode(ctx, node)
|
2023-07-28 00:04:18 -06:00
|
|
|
if err != nil {
|
2023-08-31 22:31:29 -06:00
|
|
|
return nil, err
|
2023-07-28 00:04:18 -06:00
|
|
|
}
|
2023-07-27 15:27:14 -06:00
|
|
|
|
2023-08-31 19:50:32 -06:00
|
|
|
node.Process(ctx, ZeroID, NewCreateSignal())
|
2023-08-06 12:47:47 -06:00
|
|
|
|
2023-07-27 23:15:58 -06:00
|
|
|
go runNode(ctx, node)
|
2023-07-27 11:33:11 -06:00
|
|
|
|
2023-08-31 22:31:29 -06:00
|
|
|
return node, nil
|
2023-07-09 14:30:30 -06:00
|
|
|
}
|
|
|
|
|
2023-07-27 15:49:21 -06:00
|
|
|
// Write a node to the database
|
2023-07-27 15:27:14 -06:00
|
|
|
func WriteNode(ctx *Context, node *Node) error {
|
|
|
|
ctx.Log.Logf("db", "DB_WRITE: %s", node.ID)
|
|
|
|
|
2023-09-02 17:30:52 -06:00
|
|
|
node_serialized, err := SerializeValue(ctx, reflect.ValueOf(node))
|
2023-08-31 19:50:32 -06:00
|
|
|
if err != nil {
|
|
|
|
return err
|
|
|
|
}
|
|
|
|
bytes, err := node_serialized.MarshalBinary()
|
2023-07-27 15:27:14 -06:00
|
|
|
if err != nil {
|
|
|
|
return err
|
|
|
|
}
|
|
|
|
|
2023-08-28 14:52:28 -06:00
|
|
|
ctx.Log.Logf("db_data", "DB_DATA: %+v", bytes)
|
|
|
|
|
2023-08-31 19:50:32 -06:00
|
|
|
id_bytes, err := node.ID.MarshalBinary()
|
|
|
|
if err != nil {
|
|
|
|
return err
|
|
|
|
}
|
2023-07-28 00:04:18 -06:00
|
|
|
ctx.Log.Logf("db", "DB_WRITE_ID: %+v", id_bytes)
|
2023-07-27 15:27:14 -06:00
|
|
|
|
|
|
|
return ctx.DB.Update(func(txn *badger.Txn) error {
|
|
|
|
return txn.Set(id_bytes, bytes)
|
|
|
|
})
|
|
|
|
}
|
2023-07-09 14:30:30 -06:00
|
|
|
|
2023-07-25 21:43:15 -06:00
|
|
|
func LoadNode(ctx * Context, id NodeID) (*Node, error) {
|
2023-07-27 15:27:14 -06:00
|
|
|
ctx.Log.Logf("db", "LOADING_NODE: %s", id)
|
2023-07-09 14:30:30 -06:00
|
|
|
var bytes []byte
|
|
|
|
err := ctx.DB.View(func(txn *badger.Txn) error {
|
2023-08-31 19:50:32 -06:00
|
|
|
id_bytes, err := id.MarshalBinary()
|
|
|
|
if err != nil {
|
|
|
|
return err
|
|
|
|
}
|
2023-07-28 00:04:18 -06:00
|
|
|
ctx.Log.Logf("db", "DB_READ_ID: %+v", id_bytes)
|
|
|
|
item, err := txn.Get(id_bytes)
|
2023-07-09 14:30:30 -06:00
|
|
|
if err != nil {
|
|
|
|
return err
|
|
|
|
}
|
|
|
|
|
|
|
|
return item.Value(func(val []byte) error {
|
|
|
|
bytes = append([]byte{}, val...)
|
|
|
|
return nil
|
|
|
|
})
|
|
|
|
})
|
2023-07-27 16:48:39 -06:00
|
|
|
if errors.Is(err, badger.ErrKeyNotFound) {
|
|
|
|
return nil, NodeNotFoundError
|
|
|
|
}else if err != nil {
|
2023-07-25 21:43:15 -06:00
|
|
|
return nil, err
|
2023-07-09 14:30:30 -06:00
|
|
|
}
|
|
|
|
|
2023-09-03 17:50:12 -06:00
|
|
|
value, remaining, err := ParseSerializedValue(bytes)
|
2023-09-02 17:30:52 -06:00
|
|
|
if err != nil {
|
|
|
|
return nil, err
|
2023-09-03 17:50:12 -06:00
|
|
|
} else if len(remaining) != 0 {
|
|
|
|
return nil, fmt.Errorf("%d bytes left after parsing node from DB", len(remaining))
|
2023-09-02 17:30:52 -06:00
|
|
|
}
|
2023-09-03 17:50:12 -06:00
|
|
|
_, node_val, remaining_data, err := DeserializeValue(ctx, value, 1)
|
2023-09-02 17:30:52 -06:00
|
|
|
if err != nil {
|
|
|
|
return nil, err
|
|
|
|
}
|
|
|
|
|
2023-09-03 17:50:12 -06:00
|
|
|
if len(remaining_data.TypeStack) != 0 {
|
|
|
|
return nil, fmt.Errorf("%d entries left in typestack after deserializing *Node", len(remaining_data.TypeStack))
|
|
|
|
}
|
|
|
|
if len(remaining_data.Data) != 0 {
|
|
|
|
return nil, fmt.Errorf("%d bytes left after desrializing *Node", len(remaining_data.Data))
|
2023-09-02 17:30:52 -06:00
|
|
|
}
|
2023-08-07 20:26:02 -06:00
|
|
|
|
2023-09-02 18:49:37 -06:00
|
|
|
node, ok := node_val[0].Interface().(*Node)
|
2023-09-02 17:30:52 -06:00
|
|
|
if ok == false {
|
2023-09-02 18:49:37 -06:00
|
|
|
return nil, fmt.Errorf("Deserialized %+v when expecting *Node", reflect.TypeOf(node_val).Elem())
|
2023-09-02 17:30:52 -06:00
|
|
|
}
|
|
|
|
|
|
|
|
ctx.AddNode(id, node)
|
2023-07-25 21:43:15 -06:00
|
|
|
ctx.Log.Logf("db", "DB_NODE_LOADED: %s", id)
|
2023-07-27 23:15:58 -06:00
|
|
|
go runNode(ctx, node)
|
2023-07-27 15:27:14 -06:00
|
|
|
|
2023-08-31 19:50:32 -06:00
|
|
|
return nil, nil
|
2023-07-09 14:30:30 -06:00
|
|
|
}
|