Updated MessageQueue

master
noah metz 2024-03-31 17:02:30 -07:00
parent 810e17990c
commit c29981da20
3 changed files with 25 additions and 13 deletions

@ -61,7 +61,7 @@ type ExtensionFieldInfo struct {
type ExtensionInfo struct { type ExtensionInfo struct {
ExtType ExtType
Type reflect.Type Type reflect.Type
Fields map[string]ExtensionFieldInfo Fields map[Tag]ExtensionFieldInfo
Data interface{} Data interface{}
} }
@ -328,13 +328,13 @@ func RegisterExtension[E any, T interface { *E; Extension}](ctx *Context, data i
return fmt.Errorf("Cannot register extension %+v of type %+v, type already exists in context", reflect_type, ext_type) return fmt.Errorf("Cannot register extension %+v of type %+v, type already exists in context", reflect_type, ext_type)
} }
fields := map[string]ExtensionFieldInfo{} fields := map[Tag]ExtensionFieldInfo{}
for _, field := range(reflect.VisibleFields(reflect_type)) { for _, field := range(reflect.VisibleFields(reflect_type)) {
gv_tag, tagged_gv := field.Tag.Lookup("gv") gv_tag, tagged_gv := field.Tag.Lookup("gv")
node_tag := field.Tag.Get("node") node_tag := field.Tag.Get("node")
if tagged_gv { if tagged_gv {
fields[gv_tag] = ExtensionFieldInfo{ fields[Tag(gv_tag)] = ExtensionFieldInfo{
Index: field.Index, Index: field.Index,
Type: field.Type, Type: field.Type,
NodeTag: node_tag, NodeTag: node_tag,
@ -354,7 +354,7 @@ func RegisterExtension[E any, T interface { *E; Extension}](ctx *Context, data i
type FieldMapping struct { type FieldMapping struct {
Extension ExtType Extension ExtType
Tag string Tag Tag
} }
func RegisterNodeInterface(ctx *Context, name string, fields map[string]graphql.Type) error { func RegisterNodeInterface(ctx *Context, name string, fields map[string]graphql.Type) error {
@ -437,7 +437,6 @@ func RegisterNodeType(ctx *Context, name string, mappings map[string]FieldMappin
}, },
} }
ext_map := map[ExtType]bool{}
for field_name, mapping := range(mappings) { for field_name, mapping := range(mappings) {
_, duplicate := fields[field_name] _, duplicate := fields[field_name]
if duplicate { if duplicate {
@ -449,7 +448,12 @@ func RegisterNodeType(ctx *Context, name string, mappings map[string]FieldMappin
return fmt.Errorf("Cannot register node type %s, unknown extension %s", name, mapping.Extension) return fmt.Errorf("Cannot register node type %s, unknown extension %s", name, mapping.Extension)
} }
ext_map[mapping.Extension] = true _, exists = reverse_fields[mapping.Extension]
if exists == false {
reverse_fields[mapping.Extension] = map[Tag]string{}
}
reverse_fields[mapping.Extension][mapping.Tag] = field_name
ext_field, exists := ext_info.Fields[mapping.Tag] ext_field, exists := ext_info.Fields[mapping.Tag]
if exists == false { if exists == false {
@ -496,7 +500,7 @@ func RegisterNodeType(ctx *Context, name string, mappings map[string]FieldMappin
}) })
extensions := []ExtType{} extensions := []ExtType{}
for ext_type := range(ext_map) { for ext_type := range(reverse_fields) {
extensions = append(extensions, ext_type) extensions = append(extensions, ext_type)
} }

@ -6,8 +6,8 @@ type Message struct {
} }
type MessageQueue struct { type MessageQueue struct {
out chan Message out chan<- Message
in chan Message in <-chan Message
buffer []Message buffer []Message
write_cursor int write_cursor int
read_cursor int read_cursor int
@ -34,9 +34,12 @@ func (queue *MessageQueue) ProcessIncoming(message Message) {
} }
func NewMessageQueue(initial int) (chan<- Message, <-chan Message) { func NewMessageQueue(initial int) (chan<- Message, <-chan Message) {
in := make(chan Message, 0)
out := make(chan Message, 0)
queue := MessageQueue{ queue := MessageQueue{
out: make(chan Message, 0), out: out,
in: make(chan Message, 0), in: in,
buffer: make([]Message, initial), buffer: make([]Message, initial),
write_cursor: 0, write_cursor: 0,
read_cursor: 0, read_cursor: 0,
@ -61,5 +64,5 @@ func NewMessageQueue(initial int) (chan<- Message, <-chan Message) {
} }
}(&queue) }(&queue)
return queue.in, queue.out return in, out
} }

@ -85,6 +85,7 @@ type Node struct {
// Channel for this node to receive messages from the Context // Channel for this node to receive messages from the Context
SendChan chan<- Message SendChan chan<- Message
RecvChan <-chan Message RecvChan <-chan Message
// Channel for this node to process delayed signals // Channel for this node to process delayed signals
TimeoutChan <-chan time.Time TimeoutChan <-chan time.Time
@ -327,7 +328,10 @@ func (node *Node) QueueChanges(ctx *Context, changes map[ExtType]Changes) error
} }
} }
} }
node.QueueSignal(time.Time{}, NewStatusSignal(node.ID, fields)) ctx.Log.Logf("changes", "Changes to queue from %+v: %+v", node_info.ReverseFields, fields)
if len(fields) > 0 {
node.QueueSignal(time.Time{}, NewStatusSignal(node.ID, fields))
}
return nil return nil
} }
} }
@ -353,6 +357,7 @@ func (node *Node) Process(ctx *Context, source NodeID, signal Signal) error {
} }
if len(changes) != 0 { if len(changes) != 0 {
ctx.Log.Logf("changes", "Changes to %s from %+v: %+v", node.ID, signal, changes)
status_err := node.QueueChanges(ctx, changes) status_err := node.QueueChanges(ctx, changes)
if status_err != nil { if status_err != nil {
return status_err return status_err