From c29981da20d71759ab3d62cb9f9c4131ef1ad819 Mon Sep 17 00:00:00 2001 From: Noah Metz Date: Sun, 31 Mar 2024 17:02:30 -0700 Subject: [PATCH] Updated MessageQueue --- context.go | 18 +++++++++++------- message.go | 13 ++++++++----- node.go | 7 ++++++- 3 files changed, 25 insertions(+), 13 deletions(-) diff --git a/context.go b/context.go index 41e5b0d..daf4517 100644 --- a/context.go +++ b/context.go @@ -61,7 +61,7 @@ type ExtensionFieldInfo struct { type ExtensionInfo struct { ExtType Type reflect.Type - Fields map[string]ExtensionFieldInfo + Fields map[Tag]ExtensionFieldInfo Data interface{} } @@ -328,13 +328,13 @@ func RegisterExtension[E any, T interface { *E; Extension}](ctx *Context, data i return fmt.Errorf("Cannot register extension %+v of type %+v, type already exists in context", reflect_type, ext_type) } - fields := map[string]ExtensionFieldInfo{} + fields := map[Tag]ExtensionFieldInfo{} for _, field := range(reflect.VisibleFields(reflect_type)) { gv_tag, tagged_gv := field.Tag.Lookup("gv") node_tag := field.Tag.Get("node") if tagged_gv { - fields[gv_tag] = ExtensionFieldInfo{ + fields[Tag(gv_tag)] = ExtensionFieldInfo{ Index: field.Index, Type: field.Type, NodeTag: node_tag, @@ -354,7 +354,7 @@ func RegisterExtension[E any, T interface { *E; Extension}](ctx *Context, data i type FieldMapping struct { Extension ExtType - Tag string + Tag Tag } func RegisterNodeInterface(ctx *Context, name string, fields map[string]graphql.Type) error { @@ -437,7 +437,6 @@ func RegisterNodeType(ctx *Context, name string, mappings map[string]FieldMappin }, } - ext_map := map[ExtType]bool{} for field_name, mapping := range(mappings) { _, duplicate := fields[field_name] if duplicate { @@ -449,7 +448,12 @@ func RegisterNodeType(ctx *Context, name string, mappings map[string]FieldMappin return fmt.Errorf("Cannot register node type %s, unknown extension %s", name, mapping.Extension) } - ext_map[mapping.Extension] = true + _, exists = reverse_fields[mapping.Extension] + if exists == false { + reverse_fields[mapping.Extension] = map[Tag]string{} + } + reverse_fields[mapping.Extension][mapping.Tag] = field_name + ext_field, exists := ext_info.Fields[mapping.Tag] if exists == false { @@ -496,7 +500,7 @@ func RegisterNodeType(ctx *Context, name string, mappings map[string]FieldMappin }) extensions := []ExtType{} - for ext_type := range(ext_map) { + for ext_type := range(reverse_fields) { extensions = append(extensions, ext_type) } diff --git a/message.go b/message.go index 9767c7d..dbf5557 100644 --- a/message.go +++ b/message.go @@ -6,8 +6,8 @@ type Message struct { } type MessageQueue struct { - out chan Message - in chan Message + out chan<- Message + in <-chan Message buffer []Message write_cursor int read_cursor int @@ -34,9 +34,12 @@ func (queue *MessageQueue) ProcessIncoming(message Message) { } func NewMessageQueue(initial int) (chan<- Message, <-chan Message) { + in := make(chan Message, 0) + out := make(chan Message, 0) + queue := MessageQueue{ - out: make(chan Message, 0), - in: make(chan Message, 0), + out: out, + in: in, buffer: make([]Message, initial), write_cursor: 0, read_cursor: 0, @@ -61,5 +64,5 @@ func NewMessageQueue(initial int) (chan<- Message, <-chan Message) { } }(&queue) - return queue.in, queue.out + return in, out } diff --git a/node.go b/node.go index 1a6b888..9cfd30f 100644 --- a/node.go +++ b/node.go @@ -85,6 +85,7 @@ type Node struct { // Channel for this node to receive messages from the Context SendChan chan<- Message RecvChan <-chan Message + // Channel for this node to process delayed signals TimeoutChan <-chan time.Time @@ -327,7 +328,10 @@ func (node *Node) QueueChanges(ctx *Context, changes map[ExtType]Changes) error } } } - node.QueueSignal(time.Time{}, NewStatusSignal(node.ID, fields)) + ctx.Log.Logf("changes", "Changes to queue from %+v: %+v", node_info.ReverseFields, fields) + if len(fields) > 0 { + node.QueueSignal(time.Time{}, NewStatusSignal(node.ID, fields)) + } return nil } } @@ -353,6 +357,7 @@ func (node *Node) Process(ctx *Context, source NodeID, signal Signal) error { } if len(changes) != 0 { + ctx.Log.Logf("changes", "Changes to %s from %+v: %+v", node.ID, signal, changes) status_err := node.QueueChanges(ctx, changes) if status_err != nil { return status_err