Changed from a static channel queue to a dynamic queue for nodes

master
noah metz 2024-03-31 15:18:47 -07:00
parent 3eee736f97
commit 11e7df2bde
4 changed files with 107 additions and 55 deletions

@ -6,7 +6,6 @@ import (
"errors"
"fmt"
"reflect"
"runtime"
"strconv"
"strings"
"sync"
@ -943,15 +942,7 @@ func (ctx *Context) Send(node *Node, messages []Message) error {
}
target, err := ctx.getNode(msg.Node)
if err == nil {
select {
case target.MsgChan <- Message{node.ID, msg.Signal}:
ctx.Log.Logf("signal_sent", "Sent %s to %s", msg.Signal, msg.Node)
default:
buf := make([]byte, 4096)
n := runtime.Stack(buf, false)
stack_str := string(buf[:n])
return fmt.Errorf("SIGNAL_OVERFLOW: %s - %s", msg.Node, stack_str)
}
target.SendChan <- Message{node.ID, msg.Signal}
} else if errors.Is(err, NodeNotFoundError) {
// TODO: Handle finding nodes in other contexts
return err

@ -1,6 +1,71 @@
package graphvent
import (
"fmt"
)
type Message struct {
Node NodeID
Signal Signal
}
type MessageQueue struct {
out chan Message
in chan Message
buffer []Message
write_cursor int
read_cursor int
}
func (queue *MessageQueue) ProcessIncoming(message Message) {
if (queue.write_cursor + 1) == queue.read_cursor || ((queue.write_cursor + 1) == len(queue.buffer) && queue.read_cursor == 0) {
fmt.Printf("Growing queue from %d to %d\n", len(queue.buffer), len(queue.buffer)*2)
new_buffer := make([]Message, len(queue.buffer) * 2)
copy(new_buffer, queue.buffer[queue.read_cursor:])
first_chunk := len(queue.buffer) - queue.read_cursor
copy(new_buffer[first_chunk:], queue.buffer[0:queue.write_cursor])
queue.write_cursor = len(queue.buffer) - 1
queue.read_cursor = 0
queue.buffer = new_buffer
}
queue.buffer[queue.write_cursor] = message
queue.write_cursor += 1
if queue.write_cursor >= len(queue.buffer) {
queue.write_cursor = 0
}
}
func NewMessageQueue(initial int) (chan<- Message, <-chan Message) {
queue := MessageQueue{
out: make(chan Message, 0),
in: make(chan Message, 0),
buffer: make([]Message, initial),
write_cursor: 0,
read_cursor: 0,
}
go func(queue *MessageQueue) {
for true {
if queue.write_cursor != queue.read_cursor {
select {
case incoming := <-queue.in:
queue.ProcessIncoming(incoming)
case queue.out <- queue.buffer[queue.read_cursor]:
queue.read_cursor += 1
if queue.read_cursor >= len(queue.buffer) {
queue.read_cursor = 0
}
}
} else {
message := <-queue.in
queue.ProcessIncoming(message)
}
}
}(&queue)
return queue.in, queue.out
}

@ -0,0 +1,35 @@
package graphvent
import (
"encoding/binary"
"testing"
)
func sendBatch(start, end uint64, in chan<- Message) {
for i := start; i <= end; i++ {
var id NodeID
binary.BigEndian.PutUint64(id[:], i)
in <- Message{id, nil}
}
}
func TestMessageQueue(t *testing.T) {
in, out := NewMessageQueue(10)
for i := uint64(0); i < 1000; i++ {
go sendBatch(1000*i, (1000*(i+1))-1, in)
}
seen := map[NodeID]any{}
for i := uint64(0); i < 1000*1000; i++ {
read := <-out
_, already_seen := seen[read.Node]
if already_seen {
t.Fatalf("Signal %d had duplicate NodeID %s", i, read.Node)
} else {
seen[read.Node] = nil
}
}
t.Logf("Processed 1M signals through queue")
}

@ -8,7 +8,6 @@ import (
"reflect"
"sync/atomic"
"time"
"sync"
_ "github.com/dgraph-io/badger/v3"
"github.com/google/uuid"
@ -73,43 +72,6 @@ func (q QueuedSignal) String() string {
type WaitMap map[uuid.UUID]NodeID
type Queue[T any] struct {
out chan T
in chan T
buffer []T
resize sync.Mutex
}
func NewQueue[T any](initial int) *Queue[T] {
queue := Queue[T]{
out: make(chan T, 0),
in: make(chan T, 0),
buffer: make([]T, 0, initial),
}
go func(queue *Queue[T]) {
if len(queue.buffer) == 0 {
select {
}
} else {
select {
}
}
}(&queue)
return &queue
}
func (queue *Queue[T]) Put(value T) error {
return nil
}
func (queue *Queue[T]) Get(value T) error {
return nil
}
// Nodes represent a group of extensions that can be collectively addressed
type Node struct {
Key ed25519.PrivateKey `gv:"key"`
@ -119,9 +81,8 @@ type Node struct {
Extensions map[ExtType]Extension
// Channel for this node to receive messages from the Context
MsgChan chan Message
// Size of MsgChan
BufferSize uint32 `gv:"buffer_size"`
SendChan chan<- Message
RecvChan <-chan Message
// Channel for this node to process delayed signals
TimeoutChan <-chan time.Time
@ -138,7 +99,7 @@ func (node *Node) PostDeserialize(ctx *Context) error {
public := node.Key.Public().(ed25519.PublicKey)
node.ID = KeyID(public)
node.MsgChan = make(chan Message, node.BufferSize)
node.SendChan, node.RecvChan = NewMessageQueue(1000)
return nil
}
@ -308,7 +269,7 @@ func nodeLoop(ctx *Context, node *Node, status chan string, control chan string)
} else {
ctx.Log.Logf("node", "NODE_TIMEOUT(%s) - PROCESSING %+v@%s - NEXT_SIGNAL: %s@%s", node.ID, signal, t, node.NextSignal, node.NextSignal.Time)
}
case msg := <- node.MsgChan:
case msg := <- node.RecvChan:
signal = msg.Signal
source = msg.Node
@ -489,12 +450,12 @@ func NewNode(ctx *Context, key ed25519.PrivateKey, type_name string, buffer_size
ID: id,
Type: node_type,
Extensions: ext_map,
MsgChan: make(chan Message, buffer_size),
BufferSize: buffer_size,
SignalQueue: []QueuedSignal{},
writeSignalQueue: false,
}
node.SendChan, node.RecvChan = NewMessageQueue(1000)
err = ctx.DB.WriteNodeInit(ctx, node)
if err != nil {
return nil, err