graphvent/listener.go

67 lines
1.5 KiB
Go

package graphvent
import (
"reflect"
)
// A Listener extension provides a channel that can receive signals on a different thread
type ListenerExt struct {
Buffer int `gv:"buffer"`
Chan chan Signal
}
type LoadedSignal struct {
SignalHeader
}
func NewLoadedSignal() *LoadedSignal {
return &LoadedSignal{
SignalHeader: NewSignalHeader(),
}
}
type UnloadedSignal struct {
SignalHeader
}
func NewUnloadedSignal() *UnloadedSignal {
return &UnloadedSignal{
SignalHeader: NewSignalHeader(),
}
}
func (ext *ListenerExt) Load(ctx *Context, node *Node) error {
ext.Chan = make(chan Signal, ext.Buffer)
ext.Chan <- NewLoadedSignal()
return nil
}
func (ext *ListenerExt) Unload(ctx *Context, node *Node) {
ext.Chan <- NewUnloadedSignal()
close(ext.Chan)
}
// Create a new listener extension with a given buffer size
func NewListenerExt(buffer int) *ListenerExt {
return &ListenerExt{
Buffer: buffer,
Chan: make(chan Signal, buffer),
}
}
// Send the signal to the channel, logging an overflow if it occurs
func (ext *ListenerExt) Process(ctx *Context, node *Node, source NodeID, signal Signal) ([]Message, Changes) {
ctx.Log.Logf("listener", "%s - %+v", node.ID, reflect.TypeOf(signal))
ctx.Log.Logf("listener_debug", "%s->%s - %+v", source, node.ID, signal)
select {
case ext.Chan <- signal:
default:
ctx.Log.Logf("listener", "LISTENER_OVERFLOW: %s", node.ID)
}
switch sig := signal.(type) {
case *StatusSignal:
ctx.Log.Logf("listener_status", "%s - %+v", sig.Source, sig.Fields)
}
return nil, nil
}