package graphvent

import (
	"fmt"
	"os"
	"sync"
	"time"

	"github.com/google/uuid"
	"github.com/rs/zerolog"
)

// Logger is the logging interface used by the package.
//
// NOTE(review): DefaultLogger.Init takes a component list, so as declared
// DefaultLogger does not satisfy this interface. The signature is left
// untouched here because implementers/callers may live in other files.
type Logger interface {
	Init() error
	// Logf logs a formatted message for component.
	Logf(component string, format string, items ...interface{})
	// Logm logs a formatted message for component with additional fields.
	Logm(component string, fields map[string]interface{}, format string, items ...interface{})
}

// DefaultLogger writes structured log lines to stdout through zerolog, with
// one zerolog.Logger per enabled component. Messages for components that have
// no entry in Loggers are dropped.
type DefaultLogger struct {
	// init_lock guards Loggers.
	init_lock sync.Mutex
	// Loggers maps a component name to the logger used for it.
	Loggers map[string]zerolog.Logger
	// Components is not read or written by any method in this file.
	Components []string
}

// Log is the package-wide logger. It starts with no components enabled.
var Log DefaultLogger = DefaultLogger{Loggers: map[string]zerolog.Logger{}, Components: []string{}}

// Init makes the set of enabled components equal to components: loggers for
// components not in the list are removed, and a logger is created for every
// listed component that does not have one yet. Existing loggers for components
// that stay enabled are kept. It always returns nil.
func (logger *DefaultLogger) Init(components []string) error {
	logger.init_lock.Lock()
	defer logger.init_lock.Unlock()

	// Allow Init on a zero-value DefaultLogger; writing to a nil map panics.
	if logger.Loggers == nil {
		logger.Loggers = make(map[string]zerolog.Logger, len(components))
	}

	enabled := make(map[string]struct{}, len(components))
	for _, c := range components {
		enabled[c] = struct{}{}
	}

	// Drop loggers for components that are no longer enabled.
	for c := range logger.Loggers {
		if _, ok := enabled[c]; !ok {
			delete(logger.Loggers, c)
		}
	}

	// Create loggers for newly enabled components.
	for c := range enabled {
		if _, exists := logger.Loggers[c]; !exists {
			logger.Loggers[c] = zerolog.New(os.Stdout).With().Timestamp().Str("component", c).Logger()
		}
	}

	return nil
}

// loggerFor returns a copy of the logger registered for component, and whether
// one exists. The lookup is done under init_lock so that it cannot race with
// Init modifying the map.
func (logger *DefaultLogger) loggerFor(component string) (zerolog.Logger, bool) {
	logger.init_lock.Lock()
	defer logger.init_lock.Unlock()

	l, exists := logger.Loggers[component]
	return l, exists
}

// Logm logs the formatted message for component, attaching every entry of
// fields as a string field rendered with %+v. It does nothing if component is
// not enabled.
func (logger *DefaultLogger) Logm(component string, fields map[string]interface{}, format string, items ...interface{}) {
	l, exists := logger.loggerFor(component)
	if !exists {
		return
	}

	event := l.Log()
	for key, value := range fields {
		event = event.Str(key, fmt.Sprintf("%+v", value))
	}
	event.Msg(fmt.Sprintf(format, items...))
}

// Logf logs the formatted message for component. It does nothing if component
// is not enabled.
func (logger *DefaultLogger) Logf(component string, format string, items ...interface{}) {
	l, exists := logger.loggerFor(component)
	if !exists {
		return
	}

	l.Log().Msg(fmt.Sprintf(format, items...))
}

// Generate a random graphql id
func randid() string {
	return uuid.New().String()
}

// GraphSignal is a message sent to a GraphNode and forwarded to its listeners.
type GraphSignal interface {
	Downwards() bool
	Source() string
	Type() string
	String() string
}

// BaseSignal is the basic GraphSignal implementation.
type BaseSignal struct {
	downwards bool
	// source is the ID of the node the signal was created for, or "" if the
	// signal was created with a nil source.
	source string
	_type  string
}

// Downwards returns the downwards flag the signal was created with.
func (signal BaseSignal) Downwards() bool {
	return signal.downwards
}

// Source returns the ID of the source node, or "" if there was none.
func (signal BaseSignal) Source() string {
	return signal.source
}

// Type returns the type string the signal was created with.
func (signal BaseSignal) Type() string {
	return signal._type
}

// String returns a human-readable rendering of the signal's fields.
func (signal BaseSignal) String() string {
	return fmt.Sprintf("{downwards: %t, source: %s, type: %s}", signal.downwards, signal.source, signal._type)
}

// TimeSignal is a BaseSignal that additionally carries a time.
type TimeSignal struct {
	BaseSignal
	time time.Time
}

// NewBaseSignal creates a BaseSignal of the given type and direction. The
// signal's source is source.ID(), or "" when source is nil.
func NewBaseSignal(source GraphNode, _type string, downwards bool) BaseSignal {
	source_id := ""
	if source != nil {
		source_id = source.ID()
	}

	return BaseSignal{
		downwards: downwards,
		source:    source_id,
		_type:     _type,
	}
}

// NewDownSignal creates a BaseSignal with the downwards flag set.
func NewDownSignal(source GraphNode, _type string) BaseSignal {
	return NewBaseSignal(source, _type, true)
}

// NewSignal creates a BaseSignal with the downwards flag cleared.
func NewSignal(source GraphNode, _type string) BaseSignal {
	return NewBaseSignal(source, _type, false)
}

// GraphNode is the interface common to both DAG nodes and Event tree nodes
type GraphNode interface {
	Name() string
	Description() string
	ID() string
	Allowed() []GraphNode
	Delegator(id string) GraphNode
	TakeLock(resource Resource)
	UpdateListeners(update GraphSignal)
	PropagateUpdate(update GraphSignal)
	RegisterChannel(listener chan GraphSignal)
	UnregisterChannel(listener chan GraphSignal)
	UpdateChannel() chan GraphSignal
	SignalChannel() chan GraphSignal
}

// NewBaseNode creates a BaseNode with the given name, description and id, a
// signal channel buffered to 512 entries, and empty listener and delegation
// maps. The creation is logged to the "graph" component.
//
// The returned value contains a sync.Mutex, so it should not be copied once it
// is in use.
func NewBaseNode(name string, description string, id string) BaseNode {
	node := BaseNode{
		name:           name,
		description:    description,
		id:             id,
		signal:         make(chan GraphSignal, 512),
		listeners:      map[chan GraphSignal]chan GraphSignal{},
		delegation_map: map[string]GraphNode{},
	}
	Log.Logf("graph", "NEW_NODE: %s - %s", node.ID(), node.Name())
	return node
}

// BaseNode is the most basic implementation of the GraphNode interface
// It is used to implement functions common to Events and Resources
type BaseNode struct {
	name        string
	description string
	id          string
	signal      chan GraphSignal
	// listeners_lock guards listeners.
	listeners_lock sync.Mutex
	// listeners holds the registered listener channels, keyed by themselves.
	listeners map[chan GraphSignal]chan GraphSignal
	// delegation_map maps a resource ID to the owner recorded by TakeLock.
	delegation_map map[string]GraphNode
}

// TakeLock records resource.Owner() under resource.ID() in the delegation map.
// It panics if an entry for that resource ID already exists.
func (node *BaseNode) TakeLock(resource Resource) {
	if _, exists := node.delegation_map[resource.ID()]; exists {
		panic("Trying to take a lock we already have")
	}

	node.delegation_map[resource.ID()] = resource.Owner()
}

// Allowed returns an empty list for a BaseNode.
func (node *BaseNode) Allowed() []GraphNode {
	return []GraphNode{}
}

// Delegator removes the delegation map entry for id and returns the owner that
// TakeLock recorded for it. It panics if there is no entry for id.
func (node *BaseNode) Delegator(id string) GraphNode {
	last_owner, exists := node.delegation_map[id]
	if !exists {
		panic("Trying to delegate a lock we don't own")
	}

	delete(node.delegation_map, id)
	return last_owner
}

// SignalChannel returns the node's signal channel.
func (node *BaseNode) SignalChannel() chan GraphSignal {
	return node.signal
}

// Name returns the node's name.
func (node *BaseNode) Name() string {
	return node.name
}

// Description returns the node's description.
func (node *BaseNode) Description() string {
	return node.description
}

// ID returns the node's id.
func (node *BaseNode) ID() string {
	return node.id
}

// listener_buffer is the buffer size of channels created by UpdateChannel.
const listener_buffer = 1000

// Create a new listener channel for the node, add it to the nodes listener list, and return the new channel
func (node *BaseNode) UpdateChannel() chan GraphSignal {
	new_listener := make(chan GraphSignal, listener_buffer)
	node.RegisterChannel(new_listener)
	return new_listener
}

// RegisterChannel adds listener to the node's listeners. Registering a channel
// that is already registered is a no-op.
func (node *BaseNode) RegisterChannel(listener chan GraphSignal) {
	node.listeners_lock.Lock()
	defer node.listeners_lock.Unlock()

	if _, exists := node.listeners[listener]; !exists {
		node.listeners[listener] = listener
	}
}

// UnregisterChannel removes listener from the node's listeners. It panics if
// listener is not registered.
func (node *BaseNode) UnregisterChannel(listener chan GraphSignal) {
	node.listeners_lock.Lock()
	// Deferred so the lock is released even when the panic below fires;
	// otherwise a recovered panic would leave the mutex held forever.
	defer node.listeners_lock.Unlock()

	if _, exists := node.listeners[listener]; !exists {
		panic("Attempting to unregister non-registered listener")
	}
	delete(node.listeners, listener)
}

// Send the update to listener channels
//
// Each send is non-blocking. A listener that cannot accept the update
// immediately is removed from the node; a goroutine is started that sends it a
// "listener_closed" signal and then closes the channel. That send is blocking,
// so the goroutine lives until the listener makes room for it.
func (node *BaseNode) UpdateListeners(update GraphSignal) {
	node.listeners_lock.Lock()
	defer node.listeners_lock.Unlock()

	closed := []chan GraphSignal{}

	for _, listener := range node.listeners {
		Log.Logf("listeners", "UPDATE_LISTENER %s: %p", node.Name(), listener)
		select {
		case listener <- update:
		default:
			Log.Logf("listeners", "CLOSED_LISTENER: %s: %p", node.Name(), listener)
			go func(node GraphNode, listener chan GraphSignal) {
				listener <- NewSignal(node, "listener_closed")
				close(listener)
			}(node, listener)
			closed = append(closed, listener)
		}
	}

	for _, listener := range closed {
		delete(node.listeners, listener)
	}
}

// PropagateUpdate does nothing for a BaseNode.
func (node *BaseNode) PropagateUpdate(signal GraphSignal) {
}

// SendUpdate logs the update to the "update" component, then delivers signal
// to node's listeners and lets node propagate it. If node is nil the update is
// only logged. signal must be non-nil.
func SendUpdate(node GraphNode, signal GraphSignal) {
	node_name := "nil"
	if node != nil {
		node_name = node.Name()
	}
	Log.Logf("update", "UPDATE %s <- %s: %+v", node_name, signal.Source(), signal)

	// Nothing to deliver to; calling methods on a nil interface would panic.
	if node == nil {
		return
	}

	node.UpdateListeners(signal)
	node.PropagateUpdate(signal)
}