graphvent/node.go

517 lines
14 KiB
Go

2023-07-09 14:30:30 -06:00
package graphvent
import (
2023-08-06 12:47:47 -06:00
"crypto/ed25519"
"crypto/rand"
2023-11-06 00:50:29 -07:00
"crypto/sha512"
"fmt"
"reflect"
"sync/atomic"
"time"
"sync"
2023-11-06 00:50:29 -07:00
2024-03-04 17:30:42 -07:00
_ "github.com/dgraph-io/badger/v3"
2023-11-06 00:50:29 -07:00
"github.com/google/uuid"
2023-07-09 14:30:30 -06:00
)
var (
// Base NodeID, used as a special value
ZeroUUID = uuid.UUID{}
ZeroID = NodeID(ZeroUUID)
)
// A NodeID uniquely identifies a Node
type NodeID uuid.UUID
func (id NodeID) MarshalBinary() ([]byte, error) {
return (uuid.UUID)(id).MarshalBinary()
}
2024-03-28 20:28:07 -06:00
func (id *NodeID) UnmarshalBinary(data []byte) error {
return (*uuid.UUID)(id).UnmarshalBinary(data)
}
func (id NodeID) String() string {
return (uuid.UUID)(id).String()
}
2023-08-11 16:00:36 -06:00
func IDFromBytes(bytes []byte) (NodeID, error) {
id, err := uuid.FromBytes(bytes)
return NodeID(id), err
2023-07-25 21:43:15 -06:00
}
func (id NodeID) MarshalText() ([]byte, error) {
return []byte(id.String()), nil
}
func (id *NodeID) UnmarshalText(text []byte) error {
parsed, err := ParseID(string(text))
*id = parsed
return err
}
// Parse an ID from a string
func ParseID(str string) (NodeID, error) {
id_uuid, err := uuid.Parse(str)
if err != nil {
return NodeID{}, err
}
return NodeID(id_uuid), nil
2023-07-09 14:30:30 -06:00
}
2023-07-10 22:31:43 -06:00
// Generate a random NodeID
2023-07-09 14:30:30 -06:00
func RandID() NodeID {
return NodeID(uuid.New())
2023-07-09 14:30:30 -06:00
}
// A QueuedSignal is a Signal that has been Queued to trigger at a set time
type QueuedSignal struct {
2023-09-12 19:40:06 -06:00
Signal `gv:"signal"`
time.Time `gv:"time"`
}
func (q QueuedSignal) String() string {
return fmt.Sprintf("%+v@%s", reflect.TypeOf(q.Signal), q.Time)
}
2024-03-23 02:21:27 -06:00
type WaitMap map[uuid.UUID]NodeID
type Queue[T any] struct {
out chan T
in chan T
buffer []T
resize sync.Mutex
}
func NewQueue[T any](initial int) *Queue[T] {
queue := Queue[T]{
out: make(chan T, 0),
in: make(chan T, 0),
buffer: make([]T, 0, initial),
}
go func(queue *Queue[T]) {
}(&queue)
go func(queue *Queue[T]) {
}(&queue)
return &queue
}
func (queue *Queue[T]) Put(value T) error {
return nil
}
func (queue *Queue[T]) Get(value T) error {
return nil
}
// Nodes represent a group of extensions that can be collectively addressed
2023-07-25 21:43:15 -06:00
type Node struct {
Key ed25519.PrivateKey `gv:"key"`
2023-07-25 21:43:15 -06:00
ID NodeID
Type NodeType `gv:"type"`
2024-03-30 15:42:06 -06:00
2023-11-06 00:50:29 -07:00
Extensions map[ExtType]Extension
2023-11-06 01:28:17 -07:00
// Channel for this node to receive messages from the Context
2024-03-04 17:30:42 -07:00
MsgChan chan RecvMsg
// Size of MsgChan
BufferSize uint32 `gv:"buffer_size"`
// Channel for this node to process delayed signals
TimeoutChan <-chan time.Time
Active atomic.Bool
2023-11-04 23:21:43 -06:00
writeSignalQueue bool
2023-11-06 00:50:29 -07:00
SignalQueue []QueuedSignal
NextSignal *QueuedSignal
}
func (node *Node) PostDeserialize(ctx *Context) error {
2023-11-06 00:50:29 -07:00
node.Extensions = map[ExtType]Extension{}
public := node.Key.Public().(ed25519.PublicKey)
node.ID = KeyID(public)
2024-03-04 17:30:42 -07:00
node.MsgChan = make(chan RecvMsg, node.BufferSize)
return nil
}
2023-08-10 23:43:10 -06:00
func (node *Node) QueueSignal(time time.Time, signal Signal) {
node.SignalQueue = append(node.SignalQueue, QueuedSignal{signal, time})
node.NextSignal, node.TimeoutChan = SoonestSignal(node.SignalQueue)
2023-11-04 23:21:43 -06:00
node.writeSignalQueue = true
}
2023-08-10 23:43:10 -06:00
func (node *Node) DequeueSignal(id uuid.UUID) error {
idx := -1
for i, q := range(node.SignalQueue) {
if q.Signal.ID() == id {
2023-08-10 23:43:10 -06:00
idx = i
break
}
}
if idx == -1 {
return fmt.Errorf("%s is not in SignalQueue", id)
}
node.SignalQueue[idx] = node.SignalQueue[len(node.SignalQueue)-1]
node.SignalQueue = node.SignalQueue[:len(node.SignalQueue)-1]
node.NextSignal, node.TimeoutChan = SoonestSignal(node.SignalQueue)
2023-11-04 23:21:43 -06:00
node.writeSignalQueue = true
2023-08-10 23:43:10 -06:00
return nil
}
func SoonestSignal(signals []QueuedSignal) (*QueuedSignal, <-chan time.Time) {
var soonest_signal *QueuedSignal
var soonest_time time.Time
2023-07-30 11:29:58 -06:00
for i, signal := range(signals) {
2023-07-30 11:25:03 -06:00
if signal.Time.Compare(soonest_time) == -1 || soonest_signal == nil {
2023-07-30 11:29:58 -06:00
soonest_signal = &signals[i]
soonest_time = signal.Time
}
}
if soonest_signal != nil {
2024-03-28 20:28:07 -06:00
if time.Now().Compare(soonest_time) == -1 {
return soonest_signal, time.After(time.Until(soonest_signal.Time))
} else {
c := make(chan time.Time, 1)
c <- soonest_time
return soonest_signal, c
}
} else {
return nil, nil
}
}
func runNode(ctx *Context, node *Node, status chan string, control chan string) {
ctx.Log.Logf("node", "RUN_START: %s", node.ID)
err := nodeLoop(ctx, node, status, control)
if err != nil {
2023-11-06 01:28:17 -07:00
ctx.Log.Logf("node", "%s runNode err %s", node.ID, err)
}
ctx.Log.Logf("node", "RUN_STOP: %s", node.ID)
}
type StringError string
func (err StringError) String() string {
return string(err)
}
func (err StringError) Error() string {
return err.String()
}
func (err StringError) MarshalBinary() ([]byte, error) {
return []byte(string(err)), nil
}
2024-03-03 15:45:45 -07:00
2024-03-28 20:28:07 -06:00
func (node *Node) ReadFields(ctx *Context, fields []string)map[string]any {
ctx.Log.Logf("read_field", "Reading %+v on %+v", fields, node.ID)
values := map[string]any{}
node_info := ctx.NodeTypes[node.Type]
for _, field_name := range(fields) {
field_info, mapped := node_info.Fields[field_name]
if mapped {
ext := node.Extensions[field_info.Extension]
values[field_name] = reflect.ValueOf(ext).Elem().FieldByIndex(field_info.Index).Interface()
} else {
values[field_name] = fmt.Errorf("NodeType %s has no field %s", node.Type, field_name)
}
}
2024-03-28 20:28:07 -06:00
return values
}
2023-08-11 16:00:36 -06:00
// Main Loop for nodes
func nodeLoop(ctx *Context, node *Node, status chan string, control chan string) error {
is_started := node.Active.CompareAndSwap(false, true)
if is_started == false {
return fmt.Errorf("%s is already started, will not start again", node.ID)
} else {
ctx.Log.Logf("node", "Set %s active", node.ID)
}
2024-03-28 20:28:07 -06:00
ctx.Log.Logf("node_ext", "Loading extensions for %s", node.ID)
for _, extension := range(node.Extensions) {
2024-03-28 20:28:07 -06:00
ctx.Log.Logf("node_ext", "Loading extension %s for %s", reflect.TypeOf(extension), node.ID)
err := extension.Load(ctx, node)
if err != nil {
2024-03-28 20:28:07 -06:00
ctx.Log.Logf("node_ext", "Failed to load extension %s on node %s", reflect.TypeOf(extension), node.ID)
node.Active.Store(false)
return err
2024-03-28 20:28:07 -06:00
} else {
ctx.Log.Logf("node_ext", "Loaded extension %s on node %s", reflect.TypeOf(extension), node.ID)
}
}
2024-03-28 20:28:07 -06:00
ctx.Log.Logf("node_ext", "Loaded extensions for %s", node.ID)
status <- "active"
running := true
for running {
var signal Signal
var source NodeID
select {
case command := <-control:
switch command {
case "stop":
running = false
case "pause":
status <- "paused"
command := <- control
switch command {
case "resume":
status <- "resumed"
case "stop":
running = false
}
default:
ctx.Log.Logf("node", "Unknown control command %s", command)
}
case <-node.TimeoutChan:
signal = node.NextSignal.Signal
source = node.ID
2023-07-30 11:07:41 -06:00
t := node.NextSignal.Time
2023-07-30 01:29:15 -06:00
i := -1
for j, queued := range(node.SignalQueue) {
if queued.Signal.ID() == node.NextSignal.Signal.ID() {
2023-07-30 01:29:15 -06:00
i = j
break
}
}
if i == -1 {
2023-11-06 01:28:17 -07:00
ctx.Log.Logf("node", "node.NextSignal not in node.SignalQueue, paniccing")
2023-07-30 01:29:15 -06:00
panic("node.NextSignal not in node.SignalQueue")
}
l := len(node.SignalQueue)
node.SignalQueue[i] = node.SignalQueue[l-1]
node.SignalQueue = node.SignalQueue[:(l-1)]
node.NextSignal, node.TimeoutChan = SoonestSignal(node.SignalQueue)
2023-11-04 23:21:43 -06:00
node.writeSignalQueue = true
2023-07-30 11:02:22 -06:00
if node.NextSignal == nil {
ctx.Log.Logf("node", "NODE_TIMEOUT(%s) - PROCESSING %+v@%s - NEXT_SIGNAL nil@%+v", node.ID, signal, t, node.TimeoutChan)
2023-07-30 11:02:22 -06:00
} else {
ctx.Log.Logf("node", "NODE_TIMEOUT(%s) - PROCESSING %+v@%s - NEXT_SIGNAL: %s@%s", node.ID, signal, t, node.NextSignal, node.NextSignal.Time)
2023-07-30 11:02:22 -06:00
}
2024-03-28 20:28:07 -06:00
case msg := <- node.MsgChan:
signal = msg.Signal
source = msg.Source
}
2023-08-10 23:43:10 -06:00
ctx.Log.Logf("node", "NODE_SIGNAL_QUEUE[%s]: %+v", node.ID, node.SignalQueue)
switch sig := signal.(type) {
case *ReadSignal:
2024-03-28 20:28:07 -06:00
result := node.ReadFields(ctx, sig.Fields)
2024-03-04 17:30:42 -07:00
msgs := []SendMsg{}
msgs = append(msgs, SendMsg{source, NewReadResultSignal(sig.ID(), node.ID, node.Type, result)})
ctx.Send(node, msgs)
default:
2023-10-07 23:00:07 -06:00
err := node.Process(ctx, source, signal)
if err != nil {
2023-11-06 01:28:17 -07:00
ctx.Log.Logf("node", "%s process error %s", node.ID, err)
panic(err)
}
2023-07-31 16:37:32 -06:00
}
}
stopped := node.Active.CompareAndSwap(true, false)
if stopped == false {
panic("BAD_STATE: stopping already stopped node")
}
status <- "stopped"
return nil
}
func (node *Node) Unload(ctx *Context) error {
for _, extension := range(node.Extensions) {
extension.Unload(ctx, node)
}
return nil
}
func (node *Node) QueueChanges(ctx *Context, changes map[ExtType]Changes) error {
2024-03-28 20:28:07 -06:00
node_info, exists := ctx.NodeTypes[node.Type]
if exists == false {
return fmt.Errorf("Node type not in context, can't map changes to field names")
} else {
fields := []string{}
for ext_type, ext_changes := range(changes) {
ext_map, ext_mapped := node_info.ReverseFields[ext_type]
if ext_mapped {
for _, ext_tag := range(ext_changes) {
field_name, tag_mapped := ext_map[ext_tag]
if tag_mapped {
fields = append(fields, field_name)
}
}
}
}
node.QueueSignal(time.Time{}, NewStatusSignal(node.ID, fields))
return nil
}
2023-10-07 23:00:07 -06:00
}
func (node *Node) Process(ctx *Context, source NodeID, signal Signal) error {
ctx.Log.Logf("node_process", "PROCESSING MESSAGE: %s - %+v", node.ID, signal)
2024-03-04 17:30:42 -07:00
messages := []SendMsg{}
changes := map[ExtType]Changes{}
for ext_type, ext := range(node.Extensions) {
ctx.Log.Logf("node_process", "PROCESSING_EXTENSION: %s/%s", node.ID, ext_type)
2023-10-07 23:00:07 -06:00
ext_messages, ext_changes := ext.Process(ctx, node, source, signal)
if len(ext_messages) != 0 {
messages = append(messages, ext_messages...)
}
if len(ext_changes) != 0 {
changes[ext_type] = ext_changes
2024-03-21 14:13:54 -06:00
ctx.Log.Logf("changes", "Changes for %s ext[%+v] - %+v", node.ID, ext_type, ext_changes)
}
}
2023-11-11 14:52:08 -07:00
ctx.Log.Logf("changes", "Changes for %s after %+v - %+v", node.ID, reflect.TypeOf(signal), changes)
if len(messages) != 0 {
2024-03-04 17:30:42 -07:00
send_err := ctx.Send(node, messages)
2023-10-07 23:00:07 -06:00
if send_err != nil {
return send_err
}
}
2024-03-28 20:28:07 -06:00
if len(changes) != 0 {
status_err := node.QueueChanges(ctx, changes)
if status_err != nil {
return status_err
2023-10-07 23:00:07 -06:00
}
}
2023-10-07 23:00:07 -06:00
return nil
}
2024-03-03 15:45:45 -07:00
func GetCtx[C any, E any, T interface { *E; Extension}](ctx *Context) (C, error) {
2023-07-26 11:56:10 -06:00
var zero_ctx C
2024-03-03 15:45:45 -07:00
ext_type := ExtType(SerializedTypeFor[E]())
ext_info, ok := ctx.Extensions[ext_type]
if ok == false {
return zero_ctx, fmt.Errorf("%+v is not an extension in ctx", ext_type)
2023-07-26 11:56:10 -06:00
}
ext_ctx, ok := ext_info.Data.(C)
if ok == false {
return zero_ctx, fmt.Errorf("context for %+v is %+v, not %+v", ext_type, reflect.TypeOf(ext_info.Data), reflect.TypeOf(zero_ctx))
2023-07-26 11:56:10 -06:00
}
return ext_ctx, nil
}
2024-03-03 15:45:45 -07:00
func GetExt[E any, T interface { *E; Extension}](node *Node) (T, error) {
2023-07-25 21:43:15 -06:00
var zero T
2024-03-03 15:45:45 -07:00
ext_type := ExtType(SerializedTypeFor[E]())
ext, exists := node.Extensions[ext_type]
2023-07-25 21:43:15 -06:00
if exists == false {
return zero, fmt.Errorf("%+v does not have %+v extension - %+v", node.ID, ext_type, node.Extensions)
2023-07-24 16:04:56 -06:00
}
2023-07-24 01:41:47 -06:00
2023-07-25 21:43:15 -06:00
ret, ok := ext.(T)
if ok == false {
return zero, fmt.Errorf("%+v in %+v is wrong type(%+v), expecting %+v", ext_type, node.ID, reflect.TypeOf(ext), reflect.TypeOf(zero))
2023-07-25 21:43:15 -06:00
}
2023-07-20 23:19:10 -06:00
2023-07-25 21:43:15 -06:00
return ret, nil
2023-07-20 23:19:10 -06:00
}
2023-08-06 12:47:47 -06:00
func KeyID(pub ed25519.PublicKey) NodeID {
id := uuid.NewHash(sha512.New(), ZeroUUID, pub, 3)
return NodeID(id)
}
// Create a new node in memory and start it's event loop
2024-03-04 17:30:42 -07:00
func NewNode(ctx *Context, key ed25519.PrivateKey, type_name string, buffer_size uint32, extensions ...Extension) (*Node, error) {
2024-03-28 20:28:07 -06:00
node_type := NodeTypeFor(type_name)
node_info, known_type := ctx.NodeTypes[node_type]
2024-03-03 15:45:45 -07:00
if known_type == false {
return nil, fmt.Errorf("%s is not a known node type", type_name)
}
var err error
2023-08-06 12:47:47 -06:00
var public ed25519.PublicKey
if key == nil {
2023-08-06 12:47:47 -06:00
public, key, err = ed25519.GenerateKey(rand.Reader)
if err != nil {
2023-08-31 22:31:29 -06:00
return nil, err
}
2023-08-06 12:47:47 -06:00
} else {
public = key.Public().(ed25519.PublicKey)
}
2023-08-06 12:47:47 -06:00
id := KeyID(public)
_, exists := ctx.Node(id)
if exists == true {
2023-08-31 22:31:29 -06:00
return nil, fmt.Errorf("Attempted to create an existing node")
}
2023-07-27 11:33:11 -06:00
ext_map := map[ExtType]Extension{}
for _, ext := range(extensions) {
if ext == nil {
return nil, fmt.Errorf("Cannot create node with nil extension")
}
2024-03-28 20:28:07 -06:00
ext_type, exists := ctx.Extensions[ExtTypeOf(reflect.TypeOf(ext))]
if exists == false {
2024-03-28 20:28:07 -06:00
return nil, fmt.Errorf("%+v(%+v) is not a known Extension", reflect.TypeOf(ext), ExtTypeOf(reflect.TypeOf(ext)))
}
2024-03-08 00:22:51 -07:00
_, exists = ext_map[ext_type.ExtType]
2023-07-27 11:33:11 -06:00
if exists == true {
2023-08-31 22:31:29 -06:00
return nil, fmt.Errorf("Cannot add the same extension to a node twice")
2023-07-27 11:33:11 -06:00
}
2024-03-08 00:22:51 -07:00
ext_map[ext_type.ExtType] = ext
2023-07-27 11:33:11 -06:00
}
2024-03-28 20:28:07 -06:00
for _, required_ext := range(node_info.RequiredExtensions) {
2023-07-27 11:33:11 -06:00
_, exists := ext_map[required_ext]
if exists == false {
2023-08-31 22:31:29 -06:00
return nil, fmt.Errorf(fmt.Sprintf("%+v requires %+v", node_type, required_ext))
2023-07-27 11:33:11 -06:00
}
}
node := &Node{
Key: key,
2023-07-25 21:43:15 -06:00
ID: id,
2024-03-28 20:28:07 -06:00
Type: node_type,
2023-07-27 11:33:11 -06:00
Extensions: ext_map,
2024-03-04 17:30:42 -07:00
MsgChan: make(chan RecvMsg, buffer_size),
BufferSize: buffer_size,
SignalQueue: []QueuedSignal{},
2024-03-08 00:22:51 -07:00
writeSignalQueue: false,
2023-07-25 21:43:15 -06:00
}
2023-08-11 16:00:36 -06:00
2024-03-30 15:42:06 -06:00
err = ctx.DB.WriteNodeInit(ctx, node)
if err != nil {
2023-08-31 22:31:29 -06:00
return nil, err
}
status := make(chan string, 0)
command := make(chan string, 0)
go runNode(ctx, node, status, command)
returned := <- status
if returned != "active" {
return nil, fmt.Errorf(returned)
}
2023-07-09 14:30:30 -06:00
ctx.AddNode(id, node, status, command)
return node, nil
2023-10-07 23:00:07 -06:00
}