diff --git a/context.go b/context.go index 52a575f..41e5b0d 100644 --- a/context.go +++ b/context.go @@ -6,7 +6,6 @@ import ( "errors" "fmt" "reflect" - "runtime" "strconv" "strings" "sync" @@ -943,15 +942,7 @@ func (ctx *Context) Send(node *Node, messages []Message) error { } target, err := ctx.getNode(msg.Node) if err == nil { - select { - case target.MsgChan <- Message{node.ID, msg.Signal}: - ctx.Log.Logf("signal_sent", "Sent %s to %s", msg.Signal, msg.Node) - default: - buf := make([]byte, 4096) - n := runtime.Stack(buf, false) - stack_str := string(buf[:n]) - return fmt.Errorf("SIGNAL_OVERFLOW: %s - %s", msg.Node, stack_str) - } + target.SendChan <- Message{node.ID, msg.Signal} } else if errors.Is(err, NodeNotFoundError) { // TODO: Handle finding nodes in other contexts return err diff --git a/message.go b/message.go index df7c5f6..9968567 100644 --- a/message.go +++ b/message.go @@ -1,6 +1,71 @@ package graphvent +import ( + "fmt" +) + type Message struct { Node NodeID Signal Signal } + +type MessageQueue struct { + out chan Message + in chan Message + buffer []Message + write_cursor int + read_cursor int +} + +func (queue *MessageQueue) ProcessIncoming(message Message) { + if (queue.write_cursor + 1) == queue.read_cursor || ((queue.write_cursor + 1) == len(queue.buffer) && queue.read_cursor == 0) { + fmt.Printf("Growing queue from %d to %d\n", len(queue.buffer), len(queue.buffer)*2) + + new_buffer := make([]Message, len(queue.buffer) * 2) + + copy(new_buffer, queue.buffer[queue.read_cursor:]) + first_chunk := len(queue.buffer) - queue.read_cursor + copy(new_buffer[first_chunk:], queue.buffer[0:queue.write_cursor]) + + queue.write_cursor = len(queue.buffer) - 1 + queue.read_cursor = 0 + queue.buffer = new_buffer + } + + queue.buffer[queue.write_cursor] = message + queue.write_cursor += 1 + if queue.write_cursor >= len(queue.buffer) { + queue.write_cursor = 0 + } +} + +func NewMessageQueue(initial int) (chan<- Message, <-chan Message) { + queue := MessageQueue{ + out: make(chan Message, 0), + in: make(chan Message, 0), + buffer: make([]Message, initial), + write_cursor: 0, + read_cursor: 0, + } + + go func(queue *MessageQueue) { + for true { + if queue.write_cursor != queue.read_cursor { + select { + case incoming := <-queue.in: + queue.ProcessIncoming(incoming) + case queue.out <- queue.buffer[queue.read_cursor]: + queue.read_cursor += 1 + if queue.read_cursor >= len(queue.buffer) { + queue.read_cursor = 0 + } + } + } else { + message := <-queue.in + queue.ProcessIncoming(message) + } + } + }(&queue) + + return queue.in, queue.out +} diff --git a/message_test.go b/message_test.go new file mode 100644 index 0000000..2b29bf9 --- /dev/null +++ b/message_test.go @@ -0,0 +1,35 @@ +package graphvent + +import ( + "encoding/binary" + "testing" +) + +func sendBatch(start, end uint64, in chan<- Message) { + for i := start; i <= end; i++ { + var id NodeID + binary.BigEndian.PutUint64(id[:], i) + in <- Message{id, nil} + } +} + +func TestMessageQueue(t *testing.T) { + in, out := NewMessageQueue(10) + + for i := uint64(0); i < 1000; i++ { + go sendBatch(1000*i, (1000*(i+1))-1, in) + } + + seen := map[NodeID]any{} + for i := uint64(0); i < 1000*1000; i++ { + read := <-out + _, already_seen := seen[read.Node] + if already_seen { + t.Fatalf("Signal %d had duplicate NodeID %s", i, read.Node) + } else { + seen[read.Node] = nil + } + } + + t.Logf("Processed 1M signals through queue") +} diff --git a/node.go b/node.go index 9c141a1..4f0a404 100644 --- a/node.go +++ b/node.go @@ -8,7 +8,6 @@ import ( "reflect" "sync/atomic" "time" - "sync" _ "github.com/dgraph-io/badger/v3" "github.com/google/uuid" @@ -73,43 +72,6 @@ func (q QueuedSignal) String() string { type WaitMap map[uuid.UUID]NodeID -type Queue[T any] struct { - out chan T - in chan T - buffer []T - resize sync.Mutex -} - -func NewQueue[T any](initial int) *Queue[T] { - queue := Queue[T]{ - out: make(chan T, 0), - in: make(chan T, 0), - buffer: make([]T, 0, initial), - } - - go func(queue *Queue[T]) { - if len(queue.buffer) == 0 { - select { - - } - } else { - select { - - } - } - }(&queue) - - return &queue -} - -func (queue *Queue[T]) Put(value T) error { - return nil -} - -func (queue *Queue[T]) Get(value T) error { - return nil -} - // Nodes represent a group of extensions that can be collectively addressed type Node struct { Key ed25519.PrivateKey `gv:"key"` @@ -119,9 +81,8 @@ type Node struct { Extensions map[ExtType]Extension // Channel for this node to receive messages from the Context - MsgChan chan Message - // Size of MsgChan - BufferSize uint32 `gv:"buffer_size"` + SendChan chan<- Message + RecvChan <-chan Message // Channel for this node to process delayed signals TimeoutChan <-chan time.Time @@ -138,7 +99,7 @@ func (node *Node) PostDeserialize(ctx *Context) error { public := node.Key.Public().(ed25519.PublicKey) node.ID = KeyID(public) - node.MsgChan = make(chan Message, node.BufferSize) + node.SendChan, node.RecvChan = NewMessageQueue(1000) return nil } @@ -308,7 +269,7 @@ func nodeLoop(ctx *Context, node *Node, status chan string, control chan string) } else { ctx.Log.Logf("node", "NODE_TIMEOUT(%s) - PROCESSING %+v@%s - NEXT_SIGNAL: %s@%s", node.ID, signal, t, node.NextSignal, node.NextSignal.Time) } - case msg := <- node.MsgChan: + case msg := <- node.RecvChan: signal = msg.Signal source = msg.Node @@ -489,12 +450,12 @@ func NewNode(ctx *Context, key ed25519.PrivateKey, type_name string, buffer_size ID: id, Type: node_type, Extensions: ext_map, - MsgChan: make(chan Message, buffer_size), - BufferSize: buffer_size, SignalQueue: []QueuedSignal{}, writeSignalQueue: false, } + node.SendChan, node.RecvChan = NewMessageQueue(1000) + err = ctx.DB.WriteNodeInit(ctx, node) if err != nil { return nil, err