graphvent/graph.go

293 lines
6.9 KiB
Go

2023-06-18 18:33:17 -06:00
package graphvent
import (
"sync"
"github.com/google/uuid"
2023-06-03 01:38:35 -06:00
"time"
"os"
"github.com/rs/zerolog"
"fmt"
2023-06-22 15:50:42 -06:00
"encoding/json"
)
// Logger is the component-tagged logging interface declared by this package.
//
// NOTE(review): DefaultLogger.Init takes a component list, so *DefaultLogger
// does not satisfy this interface as declared; confirm which signature is
// intended.
type Logger interface {
	// Init takes no arguments and reports an error.
	Init() error
	// Logf logs a message built from format and items, tagged with component.
	Logf(component, format string, items ...interface{})
	// Logm is Logf with an additional map of fields attached to the entry.
	Logm(component string, fields map[string]interface{}, format string, items ...interface{})
}
type DefaultLogger struct {
init_lock sync.Mutex
2023-06-18 18:54:06 -06:00
Loggers map[string]zerolog.Logger
Components []string
}
2023-06-20 16:35:16 -06:00
// Log is the package-level logger. It starts with an empty logger map, so
// Logf and Logm drop every message until Init adds component loggers.
var Log = DefaultLogger{
	Loggers:    make(map[string]zerolog.Logger),
	Components: make([]string, 0),
}
func (logger * DefaultLogger) Init(components []string) error {
logger.init_lock.Lock()
defer logger.init_lock.Unlock()
component_enabled := func (component string) bool {
for _, c := range(components) {
if c == component {
return true
}
}
return false
}
2023-06-20 16:35:16 -06:00
for c, _ := range(logger.Loggers) {
if component_enabled(c) == false {
delete(logger.Loggers, c)
}
}
for _, c := range(components) {
_, exists := logger.Loggers[c]
if component_enabled(c) == true && exists == false {
logger.Loggers[c] = zerolog.New(os.Stdout).With().Timestamp().Str("component", c).Logger()
}
}
return nil
}
func (logger * DefaultLogger) Logm(component string, fields map[string]interface{}, format string, items ... interface{}) {
2023-06-18 18:54:06 -06:00
l, exists := logger.Loggers[component]
if exists == true {
log := l.Log()
for key, value := range(fields) {
log = log.Str(key, fmt.Sprintf("%+v", value))
}
log.Msg(fmt.Sprintf(format, items...))
}
}
func (logger * DefaultLogger) Logf(component string, format string, items ... interface{}) {
2023-06-18 18:54:06 -06:00
l, exists := logger.Loggers[component]
if exists == true {
l.Log().Msg(fmt.Sprintf(format, items...))
}
}
// Generate a random graphql id
2023-05-31 00:37:51 -06:00
func randid() string{
uuid_str := uuid.New().String()
2023-05-31 00:37:51 -06:00
return uuid_str
}
// GraphSignal is the interface for signals sent to graph nodes and their
// listener channels.
type GraphSignal interface {
	// Downwards reports the signal's direction flag.
	Downwards() bool
	// Source returns the ID of the node the signal came from; NewBaseSignal
	// leaves it empty when no source node is given.
	Source() string
	// Type returns the signal's type name.
	Type() string
	// String returns a printable description of the signal.
	String() string
}
// BaseSignal is the basic GraphSignal implementation: a direction flag, the ID
// of the source node, and a type name.
type BaseSignal struct {
	downwards bool
	source    string
	_type     string
}

// Downwards returns the signal's direction flag.
func (s BaseSignal) Downwards() bool {
	return s.downwards
}

// Source returns the ID of the node that created the signal.
func (s BaseSignal) Source() string {
	return s.source
}

// Type returns the signal's type name.
func (s BaseSignal) Type() string {
	return s._type
}

// String renders the signal's three fields for logging.
func (s BaseSignal) String() string {
	return fmt.Sprintf("{downwards: %t, source: %s, type: %s}", s.downwards, s.source, s._type)
}
// TimeSignal embeds BaseSignal and adds a time.Time value.
type TimeSignal struct {
	BaseSignal

	time time.Time
}
2023-06-19 15:03:17 -06:00
func NewBaseSignal(source GraphNode, _type string, downwards bool) BaseSignal {
source_id := ""
if source != nil {
source_id = source.ID()
}
signal := BaseSignal{
2023-06-19 15:03:17 -06:00
downwards: downwards,
source: source_id,
_type: _type,
}
return signal
}
2023-06-19 15:03:17 -06:00
// NewDownSignal builds a BaseSignal from source and _type with the downwards
// flag set.
func NewDownSignal(source GraphNode, _type string) BaseSignal {
	const downwards = true
	return NewBaseSignal(source, _type, downwards)
}
2023-06-19 15:03:17 -06:00
// NewSignal builds a BaseSignal from source and _type with the downwards flag
// cleared.
func NewSignal(source GraphNode, _type string) BaseSignal {
	const downwards = false
	return NewBaseSignal(source, _type, downwards)
}
2023-06-22 15:50:42 -06:00
type NodeState interface {
Name() string
Description() string
2023-06-22 15:50:42 -06:00
DelegationMap() map[string]GraphNode
}
// BaseNodeState stores the three values that its Name, Description and
// DelegationMap methods return.
type BaseNodeState struct {
	name           string
	description    string
	delegation_map map[string]GraphNode
}
// Name returns the stored name.
func (s *BaseNodeState) Name() string {
	name := s.name
	return name
}
// Description returns the stored description.
func (s *BaseNodeState) Description() string {
	description := s.description
	return description
}
// DelegationMap returns the stored map itself, not a copy, so changes made by
// the caller are visible through the state.
func (s *BaseNodeState) DelegationMap() map[string]GraphNode {
	delegations := s.delegation_map
	return delegations
}
// GraphNode is the interface common to both DAG nodes and Event tree nodes
type GraphNode interface {
State() NodeState
2023-05-31 00:37:51 -06:00
ID() string
UpdateListeners(update GraphSignal)
2023-06-18 19:14:07 -06:00
PropagateUpdate(update GraphSignal)
RegisterChannel(listener chan GraphSignal)
UnregisterChannel(listener chan GraphSignal)
2023-06-18 19:18:11 -06:00
SignalChannel() chan GraphSignal
}
2023-06-22 15:50:42 -06:00
// StateLock returns a pointer to the lock stored in the node's state_lock
// field.
//
// The field is a sync.RWMutex, so the pointer is returned with that type; the
// previous *sync.Mutex return type did not match the field and could not
// compile. Lock and Unlock remain available to callers.
func (node *BaseNode) StateLock() *sync.RWMutex {
	return &node.state_lock
}
func NewBaseNode(id string) BaseNode {
2023-06-03 18:56:14 -06:00
node := BaseNode{
id: id,
2023-06-04 17:23:49 -06:00
signal: make(chan GraphSignal, 512),
2023-06-03 18:56:14 -06:00
listeners: map[chan GraphSignal]chan GraphSignal{},
}
2023-06-22 15:50:42 -06:00
Log.Logf("graph", "NEW_NODE: %s", node.ID())
2023-06-03 18:56:14 -06:00
return node
}
// BaseNode is the most basic implementation of the GraphNode interface
// It is used to implement functions common to Events and Resources
type BaseNode struct {
2023-05-31 00:37:51 -06:00
id string
2023-06-22 15:50:42 -06:00
state NodeState
state_lock sync.RWMutex
signal chan GraphSignal
listeners_lock sync.Mutex
listeners map[chan GraphSignal]chan GraphSignal
}
2023-06-18 19:18:11 -06:00
func (node * BaseNode) SignalChannel() chan GraphSignal {
return node.signal
2023-06-18 19:16:11 -06:00
}
2023-06-22 15:50:42 -06:00
// State returns the node's stored state without taking state_lock.
func (n *BaseNode) State() NodeState {
	current := n.state
	return current
}
2023-05-31 00:37:51 -06:00
// ID returns the identifier the node was created with.
func (n *BaseNode) ID() string {
	id := n.id
	return id
}
2023-06-22 15:50:42 -06:00
// listener_buffer is the buffer size of channels made by GetUpdateChannel.
const listener_buffer = 100

// GetUpdateChannel creates a buffered channel, registers it on node as a
// listener, and returns it.
func GetUpdateChannel(node *BaseNode) chan GraphSignal {
	updates := make(chan GraphSignal, listener_buffer)
	node.RegisterChannel(updates)
	return updates
}
// RegisterChannel adds listener to the node's listener set. Registering a
// channel that is already present leaves the set unchanged.
func (node *BaseNode) RegisterChannel(listener chan GraphSignal) {
	node.listeners_lock.Lock()
	// Deferred so the lock is released even if the body panics.
	defer node.listeners_lock.Unlock()

	// A BaseNode not built by NewBaseNode has a nil map; writing to it would
	// panic.
	if node.listeners == nil {
		node.listeners = map[chan GraphSignal]chan GraphSignal{}
	}
	// Each channel is stored under itself, so re-adding is a no-op.
	node.listeners[listener] = listener
}
// UnregisterChannel removes listener from the node's listener set. It panics
// if listener is not currently registered.
func (node *BaseNode) UnregisterChannel(listener chan GraphSignal) {
	node.listeners_lock.Lock()
	// Deferred so the panic below cannot leave the lock held, which would
	// block every later listener operation if a caller recovers.
	defer node.listeners_lock.Unlock()

	if _, exists := node.listeners[listener]; !exists {
		panic("Attempting to unregister non-registered listener")
	}
	delete(node.listeners, listener)
}
2023-06-22 15:50:42 -06:00
// PropagateUpdate does nothing for a BaseNode; the empty body lets *BaseNode
// provide this GraphNode method.
func (n *BaseNode) PropagateUpdate(update GraphSignal) {
	// Intentionally empty.
}
2023-06-22 15:50:42 -06:00
func (node * BaseNode) UpdateListeners(update GraphSignal) {
node.ListenersLock.Lock()
defer node.ListenersLock.Unlock()
closed := []chan GraphSignal
2023-06-22 15:50:42 -06:00
for _, listener := range node.Listeners() {
Log.Logf("listeners", "UPDATE_LISTENER %s: %p", node.ID(), listener)
select {
2023-06-22 15:50:42 -06:00
case listener <- signal:
default:
2023-06-22 15:50:42 -06:00
Log.Logf("listeners", "CLOSED_LISTENER %s: %p", node.ID(), listener)
go func(node GraphNode, listener chan GraphSignal) {
listener <- NewSignal(node, "listener_closed")
close(listener)
}(node, listener)
closed = append(closed, listener)
}
}
for _, listener := range(closed) {
delete(node.listeners, listener)
}
}
func SendUpdate(node GraphNode, signal GraphSignal) {
2023-06-04 17:23:49 -06:00
node_name := "nil"
if node != nil {
node_name = node.Name()
2023-06-03 01:38:35 -06:00
}
2023-06-20 16:35:16 -06:00
Log.Logf("update", "UPDATE %s <- %s: %+v", node_name, signal.Source(), signal)
2023-06-22 15:50:42 -06:00
node.ListenersLock.Lock()
defer node.ListenersLock.Unlock()
closed := []chan GraphSignal
for _, listener := range node.Listeners() {
Log.Logf("listeners", "UPDATE_LISTENER %s: %p", node.ID(), listener)
select {
case listener <- signal:
default:
Log.Logf("listeners", "CLOSED_LISTENER %s: %p", node.ID(), listener)
go func(node GraphNode, listener chan GraphSignal) {
listener <- NewSignal(node, "listener_closed")
close(listener)
}(node, listener)
closed = append(closed, listener)
}
}
for _, listener := range(closed) {
delete(node.listeners, listener)
}
2023-06-18 19:14:07 -06:00
node.PropagateUpdate(signal)
}