2023-07-09 14:30:30 -06:00
package graphvent
import (
2023-08-06 12:47:47 -06:00
"crypto/ed25519"
2023-07-28 15:07:38 -06:00
"crypto/rand"
2023-11-06 00:50:29 -07:00
"crypto/sha512"
"encoding/binary"
"fmt"
"reflect"
"sync/atomic"
"time"
badger "github.com/dgraph-io/badger/v3"
"github.com/google/uuid"
2023-07-09 14:30:30 -06:00
)
2023-07-28 13:12:17 -06:00
var (
// Base NodeID, used as a special value
ZeroUUID = uuid . UUID { }
2023-08-31 19:50:32 -06:00
ZeroID = NodeID ( ZeroUUID )
2023-07-28 13:12:17 -06:00
)
2023-07-27 16:06:56 -06:00
// A NodeID uniquely identifies a Node
2023-08-31 19:50:32 -06:00
type NodeID uuid . UUID
func ( id NodeID ) MarshalBinary ( ) ( [ ] byte , error ) {
return ( uuid . UUID ) ( id ) . MarshalBinary ( )
2023-07-27 15:27:14 -06:00
}
2023-08-31 19:50:32 -06:00
func ( id NodeID ) String ( ) string {
return ( uuid . UUID ) ( id ) . String ( )
2023-07-19 20:03:13 -06:00
}
2023-08-11 16:00:36 -06:00
func IDFromBytes ( bytes [ ] byte ) ( NodeID , error ) {
2023-08-31 19:50:32 -06:00
id , err := uuid . FromBytes ( bytes )
return NodeID ( id ) , err
2023-07-25 21:43:15 -06:00
}
2023-07-27 16:06:56 -06:00
// Parse an ID from a string
2023-07-19 20:03:13 -06:00
func ParseID ( str string ) ( NodeID , error ) {
id_uuid , err := uuid . Parse ( str )
if err != nil {
return NodeID { } , err
}
2023-08-31 19:50:32 -06:00
return NodeID ( id_uuid ) , nil
2023-07-09 14:30:30 -06:00
}
2023-07-10 22:31:43 -06:00
// Generate a random NodeID
2023-07-09 14:30:30 -06:00
func RandID ( ) NodeID {
2023-08-31 19:50:32 -06:00
return NodeID ( uuid . New ( ) )
2023-07-09 14:30:30 -06:00
}
2023-07-27 16:06:56 -06:00
// A QueuedSignal is a Signal that has been Queued to trigger at a set time
2023-07-27 15:27:14 -06:00
type QueuedSignal struct {
2023-09-12 19:40:06 -06:00
Signal ` gv:"signal" `
time . Time ` gv:"time" `
}
func ( q QueuedSignal ) String ( ) string {
return fmt . Sprintf ( "%+v@%s" , reflect . TypeOf ( q . Signal ) , q . Time )
2023-07-27 15:27:14 -06:00
}
2023-08-10 23:43:10 -06:00
type PendingACL struct {
Counter int
2023-10-13 00:32:24 -06:00
Responses [ ] ResponseSignal
2023-08-10 23:43:10 -06:00
TimeoutID uuid . UUID
Action Tree
Principal NodeID
2023-10-13 00:32:24 -06:00
2023-08-10 23:43:10 -06:00
Signal Signal
Source NodeID
}
2023-11-04 23:21:43 -06:00
type PendingACLSignal struct {
2023-09-20 19:14:28 -06:00
Policy uuid . UUID
2023-10-13 00:32:24 -06:00
Timeout uuid . UUID
2023-08-10 23:43:10 -06:00
ID uuid . UUID
}
2023-07-27 16:06:56 -06:00
// Default message channel size for nodes
// Nodes represent a group of extensions that can be collectively addressed
2023-07-25 21:43:15 -06:00
type Node struct {
2023-09-11 21:47:53 -06:00
Key ed25519 . PrivateKey ` gv:"key" `
2023-07-25 21:43:15 -06:00
ID NodeID
2023-09-11 21:47:53 -06:00
Type NodeType ` gv:"type" `
2023-11-04 23:21:43 -06:00
// TODO: move each extension to it's own db key, and extend changes to notify which extension was changed
2023-11-06 00:50:29 -07:00
Extensions map [ ExtType ] Extension
2023-11-06 01:28:17 -07:00
2023-09-20 19:14:28 -06:00
Policies [ ] Policy ` gv:"policies" `
2023-07-27 15:27:14 -06:00
2023-09-11 21:47:53 -06:00
PendingACLs map [ uuid . UUID ] PendingACL ` gv:"pending_acls" `
2023-11-04 23:21:43 -06:00
PendingACLSignals map [ uuid . UUID ] PendingACLSignal ` gv:"pending_signal" `
2023-08-10 23:43:10 -06:00
2023-07-27 16:06:56 -06:00
// Channel for this node to receive messages from the Context
2023-08-08 14:00:17 -06:00
MsgChan chan * Message
2023-07-28 13:45:14 -06:00
// Size of MsgChan
2023-09-11 21:47:53 -06:00
BufferSize uint32 ` gv:"buffer_size" `
2023-07-27 16:06:56 -06:00
// Channel for this node to process delayed signals
2023-07-27 15:27:14 -06:00
TimeoutChan <- chan time . Time
2023-07-27 16:06:56 -06:00
Active atomic . Bool
2023-07-27 15:27:14 -06:00
2023-11-04 23:21:43 -06:00
// TODO: enhance WriteNode to write SignalQueue to a different key, and use writeSignalQueue to decide whether or not to update it
writeSignalQueue bool
2023-11-06 00:50:29 -07:00
SignalQueue [ ] QueuedSignal
2023-07-27 15:27:14 -06:00
NextSignal * QueuedSignal
}
2023-09-12 19:00:48 -06:00
func ( node * Node ) PostDeserialize ( ctx * Context ) error {
2023-11-06 00:50:29 -07:00
node . Extensions = map [ ExtType ] Extension { }
2023-09-12 19:00:48 -06:00
public := node . Key . Public ( ) . ( ed25519 . PublicKey )
node . ID = KeyID ( public )
node . MsgChan = make ( chan * Message , node . BufferSize )
return nil
}
2023-08-10 23:43:10 -06:00
// RuleResult is the outcome of a policy check.
type RuleResult int

// Possible outcomes of a policy check.
const (
	// Allow means the action is permitted.
	Allow RuleResult = 0
	// Deny means the action is not permitted.
	Deny RuleResult = 1
	// Pending means the decision needs further responses.
	Pending RuleResult = 2
)
2023-09-20 19:14:28 -06:00
func ( node * Node ) Allows ( ctx * Context , principal_id NodeID , action Tree ) ( map [ uuid . UUID ] Messages , RuleResult ) {
pends := map [ uuid . UUID ] Messages { }
for _ , policy := range ( node . Policies ) {
2023-08-31 19:50:32 -06:00
msgs , resp := policy . Allows ( ctx , principal_id , action , node )
2023-08-10 23:43:10 -06:00
if resp == Allow {
return nil , Allow
} else if resp == Pending {
2023-09-20 19:14:28 -06:00
pends [ policy . ID ( ) ] = msgs
2023-08-08 14:00:17 -06:00
}
2023-08-07 20:26:02 -06:00
}
2023-08-10 23:43:10 -06:00
if len ( pends ) != 0 {
return pends , Pending
}
return nil , Deny
}
2023-11-13 13:23:58 -07:00
type WaitReason string
2023-11-03 21:41:06 -06:00
type WaitInfo struct {
2023-11-07 20:51:34 -07:00
Destination NodeID ` gv:"destination" `
2023-11-03 21:41:06 -06:00
Timeout uuid . UUID ` gv:"timeout" `
2023-11-13 13:23:58 -07:00
Reason WaitReason ` gv:"reason" `
2023-11-03 21:41:06 -06:00
}
type WaitMap map [ uuid . UUID ] WaitInfo
// ProcessResponse removes the entry for response from wait_map and dequeues
// the associated timeout signal, unless the response itself is that timeout.
// It returns the stored WaitInfo and whether the response's ID was found in
// wait_map; when it was not found the zero WaitInfo and false are returned.
func (node *Node) ProcessResponse(wait_map WaitMap, response ResponseSignal) (WaitInfo, bool) {
	wait_info, found := wait_map[response.ResponseID()]
	if !found {
		return WaitInfo{}, false
	}

	delete(wait_map, response.ResponseID())
	// If the response is the timeout signal itself there is nothing left to dequeue.
	if response.ID() != wait_info.Timeout {
		node.DequeueSignal(wait_info.Timeout)
	}
	return wait_info, true
}
2023-11-13 13:23:58 -07:00
func ( node * Node ) NewTimeout ( reason WaitReason , dest NodeID , timeout time . Duration ) ( WaitInfo , uuid . UUID ) {
2023-11-07 20:51:34 -07:00
id := uuid . New ( )
timeout_signal := NewTimeoutSignal ( id )
node . QueueSignal ( time . Now ( ) . Add ( timeout ) , timeout_signal )
return WaitInfo {
Destination : dest ,
Timeout : timeout_signal . Id ,
Reason : reason ,
} , id
}
2023-11-03 21:53:49 -06:00
// Creates a timeout signal for signal, queues it for the node at the timeout, and returns the WaitInfo
2023-11-13 13:23:58 -07:00
func ( node * Node ) QueueTimeout ( reason WaitReason , dest NodeID , signal Signal , timeout time . Duration ) WaitInfo {
2023-11-03 21:41:06 -06:00
timeout_signal := NewTimeoutSignal ( signal . ID ( ) )
node . QueueSignal ( time . Now ( ) . Add ( timeout ) , timeout_signal )
2023-11-03 21:53:49 -06:00
return WaitInfo {
2023-11-07 20:51:34 -07:00
Destination : dest ,
2023-11-03 21:41:06 -06:00
Timeout : timeout_signal . Id ,
2023-11-07 20:51:34 -07:00
Reason : reason ,
2023-11-03 21:41:06 -06:00
}
}
2023-08-10 23:43:10 -06:00
func ( node * Node ) QueueSignal ( time time . Time , signal Signal ) {
node . SignalQueue = append ( node . SignalQueue , QueuedSignal { signal , time } )
node . NextSignal , node . TimeoutChan = SoonestSignal ( node . SignalQueue )
2023-11-04 23:21:43 -06:00
node . writeSignalQueue = true
2023-08-07 20:26:02 -06:00
}
2023-08-10 23:43:10 -06:00
func ( node * Node ) DequeueSignal ( id uuid . UUID ) error {
idx := - 1
for i , q := range ( node . SignalQueue ) {
2023-10-01 16:45:03 -06:00
if q . Signal . ID ( ) == id {
2023-08-10 23:43:10 -06:00
idx = i
break
}
}
if idx == - 1 {
return fmt . Errorf ( "%s is not in SignalQueue" , id )
}
node . SignalQueue [ idx ] = node . SignalQueue [ len ( node . SignalQueue ) - 1 ]
node . SignalQueue = node . SignalQueue [ : len ( node . SignalQueue ) - 1 ]
2023-07-27 15:27:14 -06:00
node . NextSignal , node . TimeoutChan = SoonestSignal ( node . SignalQueue )
2023-11-04 23:21:43 -06:00
node . writeSignalQueue = true
2023-08-10 23:43:10 -06:00
return nil
2023-07-27 15:27:14 -06:00
}
func SoonestSignal ( signals [ ] QueuedSignal ) ( * QueuedSignal , <- chan time . Time ) {
var soonest_signal * QueuedSignal
var soonest_time time . Time
2023-07-30 11:29:58 -06:00
for i , signal := range ( signals ) {
2023-07-30 11:25:03 -06:00
if signal . Time . Compare ( soonest_time ) == - 1 || soonest_signal == nil {
2023-07-30 11:29:58 -06:00
soonest_signal = & signals [ i ]
2023-07-27 15:27:14 -06:00
soonest_time = signal . Time
}
}
if soonest_signal != nil {
2023-07-30 11:12:47 -06:00
return soonest_signal , time . After ( time . Until ( soonest_signal . Time ) )
2023-07-27 15:27:14 -06:00
} else {
return nil , nil
}
}
2023-07-27 23:15:58 -06:00
func runNode ( ctx * Context , node * Node ) {
2023-07-27 15:27:14 -06:00
ctx . Log . Logf ( "node" , "RUN_START: %s" , node . ID )
2023-07-27 23:15:58 -06:00
err := nodeLoop ( ctx , node )
2023-07-27 15:27:14 -06:00
if err != nil {
2023-11-06 01:28:17 -07:00
ctx . Log . Logf ( "node" , "%s runNode err %s" , node . ID , err )
2023-07-27 15:27:14 -06:00
panic ( err )
}
ctx . Log . Logf ( "node" , "RUN_STOP: %s" , node . ID )
}
2023-08-31 19:50:32 -06:00
// StringError is an error whose message is the string itself.
type StringError string

// String returns the error message.
func (err StringError) String() string {
	msg := string(err)
	return msg
}

// Error implements the error interface; it returns the same text as String.
func (err StringError) Error() string {
	return string(err)
}

// MarshalBinary returns the bytes of the error message. It never fails.
func (err StringError) MarshalBinary() ([]byte, error) {
	data := []byte(err)
	return data, nil
}
2024-03-03 15:45:45 -07:00
2023-08-31 19:50:32 -06:00
func NewErrorField ( fstring string , args ... interface { } ) SerializedValue {
str := StringError ( fmt . Sprintf ( fstring , args ... ) )
str_ser , err := str . MarshalBinary ( )
if err != nil {
panic ( err )
}
return SerializedValue {
2024-03-03 15:45:45 -07:00
TypeStack : [ ] SerializedType { SerializedTypeFor [ error ] ( ) } ,
2023-08-31 19:50:32 -06:00
Data : str_ser ,
}
}
func ( node * Node ) ReadFields ( ctx * Context , reqs map [ ExtType ] [ ] string ) map [ ExtType ] map [ string ] SerializedValue {
2024-03-03 15:45:45 -07:00
ctx . Log . Logf ( "read_field" , "Reading %+v on %+v" , reqs , node . ID )
2023-08-31 19:50:32 -06:00
exts := map [ ExtType ] map [ string ] SerializedValue { }
2023-07-28 11:21:18 -06:00
for ext_type , field_reqs := range ( reqs ) {
2023-08-31 19:50:32 -06:00
fields := map [ string ] SerializedValue { }
2023-07-28 11:21:18 -06:00
for _ , req := range ( field_reqs ) {
2023-08-08 14:00:17 -06:00
ext , exists := node . Extensions [ ext_type ]
if exists == false {
2023-08-31 19:50:32 -06:00
fields [ req ] = NewErrorField ( "%+v does not have %+v extension" , node . ID , ext_type )
2023-07-28 11:21:18 -06:00
} else {
2023-08-31 19:50:32 -06:00
f , err := SerializeField ( ctx , ext , req )
if err != nil {
fields [ req ] = NewErrorField ( err . Error ( ) )
} else {
fields [ req ] = f
}
2023-07-28 11:21:18 -06:00
}
}
2023-07-28 11:59:01 -06:00
exts [ ext_type ] = fields
2023-07-28 11:21:18 -06:00
}
return exts
}
2023-08-11 16:00:36 -06:00
// Main Loop for nodes
2023-07-27 23:15:58 -06:00
func nodeLoop ( ctx * Context , node * Node ) error {
2023-07-27 16:06:56 -06:00
started := node . Active . CompareAndSwap ( false , true )
if started == false {
return fmt . Errorf ( "%s is already started, will not start again" , node . ID )
}
2023-07-31 16:25:18 -06:00
2024-03-03 16:37:03 -07:00
// Load each extension before starting the main loop
for _ , extension := range ( node . Extensions ) {
err := extension . Load ( ctx , node )
if err != nil {
return err
}
}
2023-08-31 19:50:32 -06:00
run := true
for run == true {
2023-08-08 14:00:17 -06:00
var signal Signal
var source NodeID
2023-07-27 15:27:14 -06:00
select {
2023-08-08 14:00:17 -06:00
case msg := <- node . MsgChan :
ctx . Log . Logf ( "node_msg" , "NODE_MSG: %s - %+v" , node . ID , msg . Signal )
2023-09-05 00:46:49 -06:00
signal_ser , err := SerializeAny ( ctx , msg . Signal )
2023-08-08 14:00:17 -06:00
if err != nil {
2023-08-31 19:50:32 -06:00
ctx . Log . Logf ( "signal" , "SIGNAL_SERIALIZE_ERR: %s - %+v" , err , msg . Signal )
}
2023-10-30 01:25:18 -06:00
chunks , err := signal_ser . Chunks ( )
2023-08-31 19:50:32 -06:00
if err != nil {
ctx . Log . Logf ( "signal" , "SIGNAL_SERIALIZE_ERR: %s - %+v" , err , signal_ser )
2023-08-08 14:00:17 -06:00
continue
}
2023-08-31 19:50:32 -06:00
dst_id_ser , err := msg . Dest . MarshalBinary ( )
if err != nil {
ctx . Log . Logf ( "signal" , "SIGNAL_DEST_ID_SER_ERR: %e" , err )
continue
}
2023-10-14 15:05:23 -06:00
src_id_ser , err := KeyID ( msg . Source ) . MarshalBinary ( )
2023-08-31 19:50:32 -06:00
if err != nil {
ctx . Log . Logf ( "signal" , "SIGNAL_SRC_ID_SER_ERR: %e" , err )
continue
}
sig_data := append ( dst_id_ser , src_id_ser ... )
2023-10-30 01:25:18 -06:00
sig_data = append ( sig_data , chunks . Slice ( ) ... )
2023-10-14 15:53:20 -06:00
if msg . Authorization != nil {
sig_data = append ( sig_data , msg . Authorization . Signature ... )
}
2023-10-14 15:05:23 -06:00
validated := ed25519 . Verify ( msg . Source , sig_data , msg . Signature )
2023-08-08 14:00:17 -06:00
if validated == false {
2023-10-30 13:23:08 -06:00
ctx . Log . Logf ( "signal_verify" , "SIGNAL_VERIFY_ERR: %s - %s" , node . ID , reflect . TypeOf ( msg . Signal ) )
2023-08-08 14:00:17 -06:00
continue
}
2023-10-14 15:16:56 -06:00
var princ_id NodeID
if msg . Authorization == nil {
princ_id = KeyID ( msg . Source )
} else {
err := ValidateAuthorization ( * msg . Authorization , time . Hour )
if err != nil {
ctx . Log . Logf ( "node" , "Authorization validation failed: %s" , err )
continue
}
princ_id = KeyID ( msg . Authorization . Identity )
}
2023-08-10 23:43:10 -06:00
if princ_id != node . ID {
2023-08-31 19:50:32 -06:00
pends , resp := node . Allows ( ctx , princ_id , msg . Signal . Permission ( ) )
2023-08-10 23:43:10 -06:00
if resp == Deny {
2023-09-20 11:05:47 -06:00
ctx . Log . Logf ( "policy" , "SIGNAL_POLICY_DENY: %s->%s - %+v(%+s)" , princ_id , node . ID , reflect . TypeOf ( msg . Signal ) , msg . Signal )
2023-08-15 18:23:06 -06:00
ctx . Log . Logf ( "policy" , "SIGNAL_POLICY_SOURCE: %s" , msg . Source )
2023-08-10 23:43:10 -06:00
msgs := Messages { }
2023-10-14 15:05:23 -06:00
msgs = msgs . Add ( ctx , KeyID ( msg . Source ) , node , nil , NewErrorSignal ( msg . Signal . ID ( ) , "acl denied" ) )
2023-08-10 23:43:10 -06:00
ctx . Send ( msgs )
continue
} else if resp == Pending {
ctx . Log . Logf ( "policy" , "SIGNAL_POLICY_PENDING: %s->%s - %s - %+v" , princ_id , node . ID , msg . Signal . Permission ( ) , pends )
2023-10-01 16:45:03 -06:00
timeout_signal := NewACLTimeoutSignal ( msg . Signal . ID ( ) )
2023-08-10 23:43:10 -06:00
node . QueueSignal ( time . Now ( ) . Add ( 100 * time . Millisecond ) , timeout_signal )
msgs := Messages { }
for policy_type , sigs := range ( pends ) {
for _ , m := range ( sigs ) {
msgs = append ( msgs , m )
2023-10-13 00:32:24 -06:00
timeout_signal := NewTimeoutSignal ( m . Signal . ID ( ) )
node . QueueSignal ( time . Now ( ) . Add ( time . Second ) , timeout_signal )
2023-11-04 23:21:43 -06:00
node . PendingACLSignals [ m . Signal . ID ( ) ] = PendingACLSignal { policy_type , timeout_signal . Id , msg . Signal . ID ( ) }
2023-08-10 23:43:10 -06:00
}
}
2023-10-13 00:32:24 -06:00
node . PendingACLs [ msg . Signal . ID ( ) ] = PendingACL {
Counter : len ( msgs ) ,
TimeoutID : timeout_signal . ID ( ) ,
Action : msg . Signal . Permission ( ) ,
Principal : princ_id ,
Responses : [ ] ResponseSignal { } ,
Signal : msg . Signal ,
2023-10-14 15:05:23 -06:00
Source : KeyID ( msg . Source ) ,
2023-10-13 00:32:24 -06:00
}
2023-10-03 20:14:26 -06:00
ctx . Log . Logf ( "policy" , "Sending signals for pending ACL: %+v" , msgs )
2023-08-10 23:43:10 -06:00
ctx . Send ( msgs )
continue
} else if resp == Allow {
2023-09-27 18:28:56 -06:00
ctx . Log . Logf ( "policy" , "SIGNAL_POLICY_ALLOW: %s->%s - %s" , princ_id , node . ID , reflect . TypeOf ( msg . Signal ) )
2023-08-10 23:43:10 -06:00
}
} else {
2023-09-27 18:28:56 -06:00
ctx . Log . Logf ( "policy" , "SIGNAL_POLICY_SELF: %s - %s" , node . ID , reflect . TypeOf ( msg . Signal ) )
2023-08-08 14:00:17 -06:00
}
signal = msg . Signal
2023-10-14 15:05:23 -06:00
source = KeyID ( msg . Source )
2023-08-08 14:00:17 -06:00
2023-07-27 15:27:14 -06:00
case <- node . TimeoutChan :
2023-08-08 14:00:17 -06:00
signal = node . NextSignal . Signal
source = node . ID
2023-08-07 20:26:02 -06:00
2023-07-30 11:07:41 -06:00
t := node . NextSignal . Time
2023-07-30 01:29:15 -06:00
i := - 1
for j , queued := range ( node . SignalQueue ) {
2023-10-01 16:45:03 -06:00
if queued . Signal . ID ( ) == node . NextSignal . Signal . ID ( ) {
2023-07-30 01:29:15 -06:00
i = j
break
}
}
if i == - 1 {
2023-11-06 01:28:17 -07:00
ctx . Log . Logf ( "node" , "node.NextSignal not in node.SignalQueue, paniccing" )
2023-07-30 01:29:15 -06:00
panic ( "node.NextSignal not in node.SignalQueue" )
}
l := len ( node . SignalQueue )
node . SignalQueue [ i ] = node . SignalQueue [ l - 1 ]
node . SignalQueue = node . SignalQueue [ : ( l - 1 ) ]
2023-07-27 15:27:14 -06:00
node . NextSignal , node . TimeoutChan = SoonestSignal ( node . SignalQueue )
2023-11-04 23:21:43 -06:00
node . writeSignalQueue = true
2023-07-30 11:02:22 -06:00
if node . NextSignal == nil {
2023-08-31 19:50:32 -06:00
ctx . Log . Logf ( "node" , "NODE_TIMEOUT(%s) - PROCESSING %+v@%s - NEXT_SIGNAL nil@%+v" , node . ID , signal , t , node . TimeoutChan )
2023-07-30 11:02:22 -06:00
} else {
2023-08-31 19:50:32 -06:00
ctx . Log . Logf ( "node" , "NODE_TIMEOUT(%s) - PROCESSING %+v@%s - NEXT_SIGNAL: %s@%s" , node . ID , signal , t , node . NextSignal , node . NextSignal . Time )
2023-07-30 11:02:22 -06:00
}
2023-07-27 15:27:14 -06:00
}
2023-08-10 23:43:10 -06:00
ctx . Log . Logf ( "node" , "NODE_SIGNAL_QUEUE[%s]: %+v" , node . ID , node . SignalQueue )
2023-10-01 16:45:03 -06:00
response , ok := signal . ( ResponseSignal )
if ok == true {
2023-11-04 23:21:43 -06:00
info , waiting := node . PendingACLSignals [ response . ResponseID ( ) ]
2023-10-01 16:45:03 -06:00
if waiting == true {
2023-11-04 23:21:43 -06:00
delete ( node . PendingACLSignals , response . ResponseID ( ) )
2023-10-13 00:32:24 -06:00
ctx . Log . Logf ( "pending" , "FOUND_PENDING_SIGNAL: %s - %s" , node . ID , signal )
req_info , exists := node . PendingACLs [ info . ID ]
if exists == true {
req_info . Counter -= 1
req_info . Responses = append ( req_info . Responses , response )
idx := - 1
for i , p := range ( node . Policies ) {
if p . ID ( ) == info . Policy {
idx = i
break
2023-10-01 16:45:03 -06:00
}
2023-10-13 00:32:24 -06:00
}
if idx == - 1 {
ctx . Log . Logf ( "policy" , "PENDING_FOR_NONEXISTENT_POLICY: %s - %s" , node . ID , info . Policy )
delete ( node . PendingACLs , info . ID )
} else {
allowed := node . Policies [ idx ] . ContinueAllows ( ctx , req_info , signal )
if allowed == Allow {
ctx . Log . Logf ( "policy" , "DELAYED_POLICY_ALLOW: %s - %s" , node . ID , req_info . Signal )
signal = req_info . Signal
source = req_info . Source
err := node . DequeueSignal ( req_info . TimeoutID )
if err != nil {
ctx . Log . Logf ( "node" , "dequeue error: %s" , err )
}
2023-09-20 19:14:28 -06:00
delete ( node . PendingACLs , info . ID )
2023-10-13 00:32:24 -06:00
} else if req_info . Counter == 0 {
ctx . Log . Logf ( "policy" , "DELAYED_POLICY_DENY: %s - %s" , node . ID , req_info . Signal )
// Send the denied response
msgs := Messages { }
2023-10-14 15:05:23 -06:00
msgs = msgs . Add ( ctx , req_info . Source , node , nil , NewErrorSignal ( req_info . Signal . ID ( ) , "acl_denied" ) )
2023-10-13 00:32:24 -06:00
err := ctx . Send ( msgs )
if err != nil {
ctx . Log . Logf ( "signal" , "SEND_ERR: %s" , err )
}
err = node . DequeueSignal ( req_info . TimeoutID )
if err != nil {
2023-10-15 18:34:34 -06:00
ctx . Log . Logf ( "node" , "ACL_DEQUEUE_ERROR: timeout signal not in queue when trying to clear after counter hit 0 %s, %s - %s" , err , signal . ID ( ) , req_info . TimeoutID )
2023-10-01 16:45:03 -06:00
}
2023-10-13 00:32:24 -06:00
delete ( node . PendingACLs , info . ID )
} else {
node . PendingACLs [ info . ID ] = req_info
continue
2023-09-20 19:14:28 -06:00
}
2023-08-10 23:43:10 -06:00
}
}
}
}
2023-08-06 12:47:47 -06:00
2023-08-31 19:50:32 -06:00
switch sig := signal . ( type ) {
case * ReadSignal :
result := node . ReadFields ( ctx , sig . Extensions )
msgs := Messages { }
2023-10-14 15:05:23 -06:00
msgs = msgs . Add ( ctx , source , node , nil , NewReadResultSignal ( sig . ID ( ) , node . ID , node . Type , result ) )
2023-08-08 14:00:17 -06:00
ctx . Send ( msgs )
2023-07-28 11:21:18 -06:00
2023-09-06 18:29:35 -06:00
default :
2023-10-07 23:00:07 -06:00
err := node . Process ( ctx , source , signal )
2023-09-06 18:29:35 -06:00
if err != nil {
2023-11-06 01:28:17 -07:00
ctx . Log . Logf ( "node" , "%s process error %s" , node . ID , err )
2023-09-06 18:29:35 -06:00
panic ( err )
}
2023-07-31 16:37:32 -06:00
}
2023-07-27 15:27:14 -06:00
}
2023-07-27 16:06:56 -06:00
stopped := node . Active . CompareAndSwap ( true , false )
if stopped == false {
panic ( "BAD_STATE: stopping already stopped node" )
}
2023-07-27 15:27:14 -06:00
return nil
}
2024-03-03 16:37:03 -07:00
func ( node * Node ) Unload ( ctx * Context ) error {
2023-10-06 20:04:53 -06:00
if node . Active . Load ( ) {
2024-03-03 16:37:03 -07:00
for _ , extension := range ( node . Extensions ) {
extension . Unload ( ctx , node )
2023-10-06 20:04:53 -06:00
}
return nil
} else {
return fmt . Errorf ( "Node not active" )
}
}
2024-03-03 16:37:03 -07:00
func ( node * Node ) QueueChanges ( ctx * Context , changes map [ ExtType ] Changes ) error {
2023-10-10 11:23:44 -06:00
node . QueueSignal ( time . Now ( ) , NewStatusSignal ( node . ID , changes ) )
2023-10-07 23:00:07 -06:00
return nil
}
2023-08-08 14:00:17 -06:00
func ( node * Node ) Process ( ctx * Context , source NodeID , signal Signal ) error {
2023-08-31 19:50:32 -06:00
ctx . Log . Logf ( "node_process" , "PROCESSING MESSAGE: %s - %+v" , node . ID , signal )
2023-08-08 14:00:17 -06:00
messages := Messages { }
2024-03-03 16:37:03 -07:00
changes := map [ ExtType ] Changes { }
2023-07-27 15:27:14 -06:00
for ext_type , ext := range ( node . Extensions ) {
2023-08-07 20:26:02 -06:00
ctx . Log . Logf ( "node_process" , "PROCESSING_EXTENSION: %s/%s" , node . ID , ext_type )
2023-10-07 23:00:07 -06:00
ext_messages , ext_changes := ext . Process ( ctx , node , source , signal )
if len ( ext_messages ) != 0 {
messages = append ( messages , ext_messages ... )
}
if len ( ext_changes ) != 0 {
2024-03-03 16:37:03 -07:00
changes [ ext_type ] = ext_changes
2023-08-07 20:26:02 -06:00
}
2023-07-27 15:27:14 -06:00
}
2023-11-11 14:52:08 -07:00
ctx . Log . Logf ( "changes" , "Changes for %s after %+v - %+v" , node . ID , reflect . TypeOf ( signal ) , changes )
2023-08-07 20:26:02 -06:00
2023-10-06 20:04:53 -06:00
if len ( messages ) != 0 {
2023-10-07 23:00:07 -06:00
send_err := ctx . Send ( messages )
if send_err != nil {
return send_err
}
}
if len ( changes ) != 0 {
2024-03-03 16:37:03 -07:00
write_err := WriteNodeChanges ( ctx , node , changes )
if write_err != nil {
return write_err
}
2023-10-07 23:00:07 -06:00
2024-03-03 16:37:03 -07:00
status_err := node . QueueChanges ( ctx , changes )
if status_err != nil {
return status_err
2023-10-07 23:00:07 -06:00
}
2023-10-06 20:04:53 -06:00
}
2023-10-07 23:00:07 -06:00
2023-10-06 20:04:53 -06:00
return nil
2023-07-27 15:27:14 -06:00
}
2024-03-03 15:45:45 -07:00
func GetCtx [ C any , E any , T interface { * E ; Extension } ] ( ctx * Context ) ( C , error ) {
2023-07-26 11:56:10 -06:00
var zero_ctx C
2024-03-03 15:45:45 -07:00
ext_type := ExtType ( SerializedTypeFor [ E ] ( ) )
2023-08-31 19:50:32 -06:00
ext_info , ok := ctx . Extensions [ ext_type ]
2023-07-27 16:06:56 -06:00
if ok == false {
2023-08-31 19:50:32 -06:00
return zero_ctx , fmt . Errorf ( "%+v is not an extension in ctx" , ext_type )
2023-07-26 11:56:10 -06:00
}
ext_ctx , ok := ext_info . Data . ( C )
if ok == false {
2023-08-31 19:50:32 -06:00
return zero_ctx , fmt . Errorf ( "context for %+v is %+v, not %+v" , ext_type , reflect . TypeOf ( ext_info . Data ) , reflect . TypeOf ( zero_ctx ) )
2023-07-26 11:56:10 -06:00
}
return ext_ctx , nil
}
2024-03-03 15:45:45 -07:00
func GetExt [ E any , T interface { * E ; Extension } ] ( node * Node ) ( T , error ) {
2023-07-25 21:43:15 -06:00
var zero T
2024-03-03 15:45:45 -07:00
ext_type := ExtType ( SerializedTypeFor [ E ] ( ) )
2023-07-26 00:42:12 -06:00
ext , exists := node . Extensions [ ext_type ]
2023-07-25 21:43:15 -06:00
if exists == false {
2023-08-31 19:50:32 -06:00
return zero , fmt . Errorf ( "%+v does not have %+v extension - %+v" , node . ID , ext_type , node . Extensions )
2023-07-24 16:04:56 -06:00
}
2023-07-24 01:41:47 -06:00
2023-07-25 21:43:15 -06:00
ret , ok := ext . ( T )
if ok == false {
2023-08-31 19:50:32 -06:00
return zero , fmt . Errorf ( "%+v in %+v is wrong type(%+v), expecting %+v" , ext_type , node . ID , reflect . TypeOf ( ext ) , reflect . TypeOf ( zero ) )
2023-07-25 21:43:15 -06:00
}
2023-07-20 23:19:10 -06:00
2023-07-25 21:43:15 -06:00
return ret , nil
2023-07-20 23:19:10 -06:00
}
2023-08-06 12:47:47 -06:00
func KeyID ( pub ed25519 . PublicKey ) NodeID {
2023-08-31 19:50:32 -06:00
id := uuid . NewHash ( sha512 . New ( ) , ZeroUUID , pub , 3 )
return NodeID ( id )
2023-07-28 15:07:38 -06:00
}
2023-07-27 15:27:14 -06:00
// Create a new node in memory and start it's event loop
2024-03-03 15:45:45 -07:00
func NewNode ( ctx * Context , key ed25519 . PrivateKey , type_name string , buffer_size uint32 , policies [ ] Policy , extensions ... Extension ) ( * Node , error ) {
node_type , known_type := ctx . NodeTypes [ type_name ]
if known_type == false {
return nil , fmt . Errorf ( "%s is not a known node type" , type_name )
}
2023-07-28 15:07:38 -06:00
var err error
2023-08-06 12:47:47 -06:00
var public ed25519 . PublicKey
2023-07-28 15:07:38 -06:00
if key == nil {
2023-08-06 12:47:47 -06:00
public , key , err = ed25519 . GenerateKey ( rand . Reader )
2023-07-28 15:07:38 -06:00
if err != nil {
2023-08-31 22:31:29 -06:00
return nil , err
2023-07-28 15:07:38 -06:00
}
2023-08-06 12:47:47 -06:00
} else {
public = key . Public ( ) . ( ed25519 . PublicKey )
2023-07-28 15:07:38 -06:00
}
2023-08-06 12:47:47 -06:00
id := KeyID ( public )
2023-07-28 12:46:06 -06:00
_ , exists := ctx . Node ( id )
2023-07-26 15:08:14 -06:00
if exists == true {
2023-08-31 22:31:29 -06:00
return nil , fmt . Errorf ( "Attempted to create an existing node" )
2023-07-26 15:08:14 -06:00
}
2023-08-31 19:50:32 -06:00
def , exists := ctx . Nodes [ node_type ]
2023-07-27 11:33:11 -06:00
if exists == false {
2023-08-31 22:31:29 -06:00
return nil , fmt . Errorf ( "Node type %+v not registered in Context" , node_type )
2023-07-27 11:33:11 -06:00
}
ext_map := map [ ExtType ] Extension { }
for _ , ext := range ( extensions ) {
2023-08-31 19:50:32 -06:00
ext_type , exists := ctx . ExtensionTypes [ reflect . TypeOf ( ext ) ]
if exists == false {
2023-08-31 22:31:29 -06:00
return nil , fmt . Errorf ( fmt . Sprintf ( "%+v is not a known Extension" , reflect . TypeOf ( ext ) ) )
2023-08-31 19:50:32 -06:00
}
_ , exists = ext_map [ ext_type ]
2023-07-27 11:33:11 -06:00
if exists == true {
2023-08-31 22:31:29 -06:00
return nil , fmt . Errorf ( "Cannot add the same extension to a node twice" )
2023-07-27 11:33:11 -06:00
}
2023-08-31 19:50:32 -06:00
ext_map [ ext_type ] = ext
2023-07-27 11:33:11 -06:00
}
for _ , required_ext := range ( def . Extensions ) {
_ , exists := ext_map [ required_ext ]
if exists == false {
2023-08-31 22:31:29 -06:00
return nil , fmt . Errorf ( fmt . Sprintf ( "%+v requires %+v" , node_type , required_ext ) )
2023-07-27 11:33:11 -06:00
}
}
2023-09-20 19:14:28 -06:00
policies = append ( policies , DefaultPolicy )
2023-08-28 14:52:28 -06:00
2023-07-26 15:08:14 -06:00
node := & Node {
2023-07-28 15:07:38 -06:00
Key : key ,
2023-07-25 21:43:15 -06:00
ID : id ,
2023-07-26 00:18:11 -06:00
Type : node_type ,
2023-07-27 11:33:11 -06:00
Extensions : ext_map ,
2023-08-07 20:26:02 -06:00
Policies : policies ,
2023-08-10 23:43:10 -06:00
PendingACLs : map [ uuid . UUID ] PendingACL { } ,
2023-11-04 23:21:43 -06:00
PendingACLSignals : map [ uuid . UUID ] PendingACLSignal { } ,
2023-08-08 14:00:17 -06:00
MsgChan : make ( chan * Message , buffer_size ) ,
2023-07-28 13:45:14 -06:00
BufferSize : buffer_size ,
2023-08-07 20:26:02 -06:00
SignalQueue : [ ] QueuedSignal { } ,
2023-07-25 21:43:15 -06:00
}
2023-08-11 16:00:36 -06:00
2023-11-06 00:50:29 -07:00
err = WriteNodeExtList ( ctx , node )
if err != nil {
return nil , err
}
node . writeSignalQueue = true
2024-03-03 16:37:03 -07:00
err = WriteNodeInit ( ctx , node )
2023-07-28 00:04:18 -06:00
if err != nil {
2023-08-31 22:31:29 -06:00
return nil , err
2023-07-28 00:04:18 -06:00
}
2023-07-27 15:27:14 -06:00
2023-11-06 00:50:29 -07:00
ctx . AddNode ( id , node )
2023-07-27 23:15:58 -06:00
go runNode ( ctx , node )
2023-07-27 11:33:11 -06:00
2023-08-31 22:31:29 -06:00
return node , nil
2023-07-09 14:30:30 -06:00
}
2023-11-06 00:50:29 -07:00
// Byte sequences appended to a node's ID bytes to form database keys.
var (
	// extension_suffix marks extension-related keys.
	extension_suffix = []byte{0xEE, 0xFF, 0xEE, 0xFF}
	// signal_queue_suffix marks the key holding the node's signal queue.
	signal_queue_suffix = []byte{0xAB, 0xBA, 0xAB, 0xBA}
)
func ExtTypeSuffix ( ext_type ExtType ) [ ] byte {
ret := make ( [ ] byte , 12 )
copy ( ret [ 0 : 4 ] , extension_suffix )
binary . BigEndian . PutUint64 ( ret [ 4 : ] , uint64 ( ext_type ) )
return ret
2023-10-07 23:00:07 -06:00
}
2023-11-06 00:50:29 -07:00
func WriteNodeExtList ( ctx * Context , node * Node ) error {
ext_list := make ( [ ] ExtType , len ( node . Extensions ) )
i := 0
for ext_type := range ( node . Extensions ) {
ext_list [ i ] = ext_type
i += 1
}
2023-07-27 15:27:14 -06:00
2024-03-03 16:37:03 -07:00
ctx . Log . Logf ( "db" , "Writing ext_list for %s - %+v" , node . ID , ext_list )
2023-11-06 00:50:29 -07:00
id_bytes , err := node . ID . MarshalBinary ( )
2023-08-31 19:50:32 -06:00
if err != nil {
return err
}
2023-11-06 00:50:29 -07:00
ext_list_serialized , err := SerializeAny ( ctx , ext_list )
2023-07-27 15:27:14 -06:00
if err != nil {
return err
}
2023-11-06 00:50:29 -07:00
return ctx . DB . Update ( func ( txn * badger . Txn ) error {
return txn . Set ( append ( id_bytes , extension_suffix ... ) , ext_list_serialized . Data )
} )
}
2023-08-28 14:52:28 -06:00
2024-03-03 16:37:03 -07:00
// WriteNodeInit writes the initial database entries for a node in a single
// transaction: the node itself under its ID bytes, its signal queue under the
// ID bytes plus signal_queue_suffix, and each extension under the ID bytes
// plus ExtTypeSuffix of its type.
//
// Everything is serialized before the transaction starts. Any serialization
// or write error is returned.
func WriteNodeInit(ctx *Context, node *Node) error {
	ctx.Log.Logf("db", "Writing initial entry for %s - %+v", node.ID, node)

	ext_serialized := map[ExtType]SerializedValue{}
	for ext_type, ext := range node.Extensions {
		serialized_ext, err := SerializeAny(ctx, ext)
		if err != nil {
			return err
		}
		ext_serialized[ext_type] = serialized_ext
	}

	sq_serialized, err := SerializeAny(ctx, node.SignalQueue)
	if err != nil {
		return err
	}

	node_serialized, err := SerializeAny(ctx, node)
	if err != nil {
		return err
	}

	id_bytes, err := node.ID.MarshalBinary()
	if err != nil {
		return err
	}

	return ctx.DB.Update(func(txn *badger.Txn) error {
		err := txn.Set(id_bytes, node_serialized.Data)
		if err != nil {
			// Propagate the failure instead of reporting success.
			return err
		}

		err = txn.Set(append(id_bytes, signal_queue_suffix...), sq_serialized.Data)
		if err != nil {
			return err
		}

		for ext_type, data := range ext_serialized {
			err := txn.Set(append(id_bytes, ExtTypeSuffix(ext_type)...), data.Data)
			if err != nil {
				return err
			}
		}
		return nil
	})
}
func WriteNodeChanges ( ctx * Context , node * Node , changes map [ ExtType ] Changes ) error {
2023-11-06 00:50:29 -07:00
ctx . Log . Logf ( "db" , "Writing changes for %s - %+v" , node . ID , changes )
ext_serialized := map [ ExtType ] SerializedValue { }
for ext_type := range ( changes ) {
ext , ext_exists := node . Extensions [ ext_type ]
if ext_exists == false {
ctx . Log . Logf ( "db" , "extension 0x%x does not exist for %s" , ext_type , node . ID )
} else {
serialized_ext , err := SerializeAny ( ctx , ext )
if err != nil {
return err
}
ext_serialized [ ext_type ] = serialized_ext
}
}
var sq_serialized * SerializedValue = nil
if node . writeSignalQueue == true {
node . writeSignalQueue = false
ser , err := SerializeAny ( ctx , node . SignalQueue )
if err != nil {
return err
}
sq_serialized = & ser
}
node_serialized , err := SerializeAny ( ctx , node )
2023-08-31 19:50:32 -06:00
if err != nil {
return err
}
2023-07-27 15:27:14 -06:00
2023-11-06 00:50:29 -07:00
id_bytes , err := node . ID . MarshalBinary ( )
2023-07-27 15:27:14 -06:00
return ctx . DB . Update ( func ( txn * badger . Txn ) error {
2023-11-06 00:50:29 -07:00
err := txn . Set ( id_bytes , node_serialized . Data )
if err != nil {
return err
}
if sq_serialized != nil {
err := txn . Set ( append ( id_bytes , signal_queue_suffix ... ) , sq_serialized . Data )
if err != nil {
return err
}
}
for ext_type , data := range ( ext_serialized ) {
err := txn . Set ( append ( id_bytes , ExtTypeSuffix ( ext_type ) ... ) , data . Data )
if err != nil {
return err
}
}
return nil
2023-07-27 15:27:14 -06:00
} )
}
2023-07-09 14:30:30 -06:00
2023-11-06 00:50:29 -07:00
func LoadNode ( ctx * Context , id NodeID ) ( * Node , error ) {
2023-07-27 15:27:14 -06:00
ctx . Log . Logf ( "db" , "LOADING_NODE: %s" , id )
2023-11-06 00:50:29 -07:00
var node_bytes [ ] byte = nil
var sq_bytes [ ] byte = nil
var ext_bytes = map [ ExtType ] [ ] byte { }
2023-07-09 14:30:30 -06:00
err := ctx . DB . View ( func ( txn * badger . Txn ) error {
2023-08-31 19:50:32 -06:00
id_bytes , err := id . MarshalBinary ( )
if err != nil {
return err
}
2023-11-06 00:50:29 -07:00
node_item , err := txn . Get ( id_bytes )
if err != nil {
ctx . Log . Logf ( "db" , "node key not found" )
return err
}
node_bytes , err = node_item . ValueCopy ( nil )
2023-07-09 14:30:30 -06:00
if err != nil {
return err
}
2023-11-06 00:50:29 -07:00
sq_item , err := txn . Get ( append ( id_bytes , signal_queue_suffix ... ) )
if err != nil {
ctx . Log . Logf ( "db" , "sq key not found" )
return err
}
sq_bytes , err = sq_item . ValueCopy ( nil )
if err != nil {
return err
}
ext_list_item , err := txn . Get ( append ( id_bytes , extension_suffix ... ) )
if err != nil {
ctx . Log . Logf ( "db" , "ext_list key not found" )
return err
}
ext_list_bytes , err := ext_list_item . ValueCopy ( nil )
if err != nil {
return err
}
ext_list_value , remaining , err := DeserializeValue ( ctx , reflect . TypeOf ( [ ] ExtType { } ) , ext_list_bytes )
if err != nil {
return err
} else if len ( remaining ) > 0 {
return fmt . Errorf ( "Data remaining after ext_list deserialize %d" , len ( remaining ) )
}
ext_list , ok := ext_list_value . Interface ( ) . ( [ ] ExtType )
if ok == false {
return fmt . Errorf ( "deserialize returned wrong type %s" , ext_list_value . Type ( ) )
}
for _ , ext_type := range ( ext_list ) {
ext_item , err := txn . Get ( append ( id_bytes , ExtTypeSuffix ( ext_type ) ... ) )
if err != nil {
ctx . Log . Logf ( "db" , "ext %s key not found" , ext_type )
return err
}
ext_bytes [ ext_type ] , err = ext_item . ValueCopy ( nil )
if err != nil {
return err
}
}
return nil
2023-07-09 14:30:30 -06:00
} )
2023-11-06 00:50:29 -07:00
if err != nil {
2023-07-25 21:43:15 -06:00
return nil , err
2023-07-09 14:30:30 -06:00
}
2023-11-06 00:50:29 -07:00
node_value , remaining , err := DeserializeValue ( ctx , reflect . TypeOf ( ( * Node ) ( nil ) ) , node_bytes )
2023-09-02 17:30:52 -06:00
if err != nil {
return nil , err
2023-09-03 17:50:12 -06:00
} else if len ( remaining ) != 0 {
2023-11-06 00:50:29 -07:00
return nil , fmt . Errorf ( "data left after deserializing node %d" , len ( remaining ) )
2023-09-02 17:30:52 -06:00
}
2023-11-06 00:50:29 -07:00
node , node_ok := node_value . Interface ( ) . ( * Node )
if node_ok == false {
return nil , fmt . Errorf ( "node wrong type %s" , node_value . Type ( ) )
2023-09-02 17:30:52 -06:00
}
2023-11-06 01:28:17 -07:00
ctx . Log . Logf ( "db" , "Deserialized node bytes %+v" , node )
2023-11-06 00:50:29 -07:00
signal_queue_value , remaining , err := DeserializeValue ( ctx , reflect . TypeOf ( [ ] QueuedSignal { } ) , sq_bytes )
2023-10-29 18:26:14 -06:00
if err != nil {
return nil , err
2023-11-06 00:50:29 -07:00
} else if len ( remaining ) != 0 {
return nil , fmt . Errorf ( "data left after deserializing signal_queue %d" , len ( remaining ) )
2023-09-02 17:30:52 -06:00
}
2023-08-07 20:26:02 -06:00
2023-11-06 00:50:29 -07:00
signal_queue , sq_ok := signal_queue_value . Interface ( ) . ( [ ] QueuedSignal )
if sq_ok == false {
return nil , fmt . Errorf ( "signal queue wrong type %s" , signal_queue_value . Type ( ) )
2023-09-02 17:30:52 -06:00
}
2023-11-06 00:50:29 -07:00
for ext_type , data := range ( ext_bytes ) {
ext_info , exists := ctx . Extensions [ ext_type ]
if exists == false {
return nil , fmt . Errorf ( "0x%0x is not a known extension type" , ext_type )
}
2024-03-03 15:45:45 -07:00
ext_value , remaining , err := DeserializeValue ( ctx , ext_info . Reflect , data )
2023-11-06 00:50:29 -07:00
if err != nil {
return nil , err
} else if len ( remaining ) > 0 {
return nil , fmt . Errorf ( "data left after deserializing ext(0x%x) %d" , ext_type , len ( remaining ) )
}
ext , ext_ok := ext_value . Interface ( ) . ( Extension )
if ext_ok == false {
return nil , fmt . Errorf ( "extension wrong type %s" , ext_value . Type ( ) )
}
node . Extensions [ ext_type ] = ext
2023-09-12 19:00:48 -06:00
}
2023-11-06 00:50:29 -07:00
node . SignalQueue = signal_queue
node . NextSignal , node . TimeoutChan = SoonestSignal ( signal_queue )
2023-09-02 17:30:52 -06:00
ctx . AddNode ( id , node )
2023-11-06 01:28:17 -07:00
ctx . Log . Logf ( "db" , "loaded %+v" , node )
2023-07-27 23:15:58 -06:00
go runNode ( ctx , node )
2023-07-27 15:27:14 -06:00
2023-09-12 19:00:48 -06:00
return node , nil
2023-07-09 14:30:30 -06:00
}