Made logging configurable

graph-rework
noah metz 2023-06-20 16:35:16 -06:00
parent b630c015df
commit 41f28a2017
4 changed files with 63 additions and 72 deletions

@ -231,33 +231,33 @@ func LinkEvent(event Event, child Event, info EventInfo) error {
} }
func StartRootEvent(event Event) error { func StartRootEvent(event Event) error {
log.Logf("event", "ROOT_EVEN_START") Log.Logf("event", "ROOT_EVEN_START")
err := LockResources(event) err := LockResources(event)
if err != nil { if err != nil {
log.Logf("event", "ROOT_EVENT_LOCK_ERR: %s", err) Log.Logf("event", "ROOT_EVENT_LOCK_ERR: %s", err)
return err return err
} }
err = RunEvent(event) err = RunEvent(event)
if err != nil { if err != nil {
log.Logf("event", "ROOT_EVENT_RUNE_ERR: %s", err) Log.Logf("event", "ROOT_EVENT_RUNE_ERR: %s", err)
return err return err
} }
err = FinishEvent(event) err = FinishEvent(event)
if err != nil { if err != nil {
log.Logf("event", "ROOT_EVENT_FINISH_ERR: %s", err) Log.Logf("event", "ROOT_EVENT_FINISH_ERR: %s", err)
return err return err
} }
log.Logf("event", "ROOT_EVENT_DONE") Log.Logf("event", "ROOT_EVENT_DONE")
return nil return nil
} }
func RunEvent(event Event) error { func RunEvent(event Event) error {
log.Logf("event", "EVENT_RUN: %s", event.Name()) Log.Logf("event", "EVENT_RUN: %s", event.Name())
SendUpdate(event, NewSignal(event, "event_start")) SendUpdate(event, NewSignal(event, "event_start"))
next_action := "start" next_action := "start"
var err error = nil var err error = nil
@ -268,14 +268,14 @@ func RunEvent(event Event) error {
return errors.New(error_str) return errors.New(error_str)
} }
log.Logf("event", "EVENT_ACTION: %s - %s", event.Name(), next_action) Log.Logf("event", "EVENT_ACTION: %s - %s", event.Name(), next_action)
next_action, err = action() next_action, err = action()
if err != nil { if err != nil {
return err return err
} }
} }
log.Logf("event", "EVENT_RUN_DONE: %s", event.Name()) Log.Logf("event", "EVENT_RUN_DONE: %s", event.Name())
return nil return nil
} }
@ -293,7 +293,7 @@ func EventCancel(event Event) func(signal GraphSignal) (string, error) {
} }
func LockResources(event Event) error { func LockResources(event Event) error {
log.Logf("event", "RESOURCE_LOCKING for %s - %+v", event.Name(), event.Resources()) Log.Logf("event", "RESOURCE_LOCKING for %s - %+v", event.Name(), event.Resources())
locked_resources := []Resource{} locked_resources := []Resource{}
var lock_err error = nil var lock_err error = nil
for _, resource := range(event.Resources()) { for _, resource := range(event.Resources()) {
@ -309,11 +309,11 @@ func LockResources(event Event) error {
for _, resource := range(locked_resources) { for _, resource := range(locked_resources) {
UnlockResource(resource, event) UnlockResource(resource, event)
} }
log.Logf("event", "RESOURCE_LOCK_FAIL for %s: %s", event.Name(), lock_err) Log.Logf("event", "RESOURCE_LOCK_FAIL for %s: %s", event.Name(), lock_err)
return lock_err return lock_err
} }
log.Logf("event", "RESOURCE_LOCK_SUCCESS for %s", event.Name()) Log.Logf("event", "RESOURCE_LOCK_SUCCESS for %s", event.Name())
signal := NewDownSignal(event, "locked") signal := NewDownSignal(event, "locked")
SendUpdate(event, signal) SendUpdate(event, signal)
@ -321,7 +321,7 @@ func LockResources(event Event) error {
} }
func FinishEvent(event Event) error { func FinishEvent(event Event) error {
log.Logf("event", "EVENT_FINISH: %s", event.Name()) Log.Logf("event", "EVENT_FINISH: %s", event.Name())
for _, resource := range(event.Resources()) { for _, resource := range(event.Resources()) {
err := UnlockResource(resource, event) err := UnlockResource(resource, event)
if err != nil { if err != nil {
@ -378,18 +378,18 @@ func (event * BaseEvent) Action(action string) (func() (string, error), bool) {
func EventWait(event Event) (func() (string, error)) { func EventWait(event Event) (func() (string, error)) {
return func() (string, error) { return func() (string, error) {
log.Logf("event", "EVENT_WAIT: %s TIMEOUT: %+v", event.Name(), event.Timeout()) Log.Logf("event", "EVENT_WAIT: %s TIMEOUT: %+v", event.Name(), event.Timeout())
select { select {
case signal := <- event.Signal(): case signal := <- event.Signal():
log.Logf("event", "EVENT_SIGNAL: %s %+v", event.Name(), signal) Log.Logf("event", "EVENT_SIGNAL: %s %+v", event.Name(), signal)
signal_fn, exists := event.Handler(signal.Type()) signal_fn, exists := event.Handler(signal.Type())
if exists == true { if exists == true {
log.Logf("event", "EVENT_HANDLER: %s - %s", event.Name(), signal.Type()) Log.Logf("event", "EVENT_HANDLER: %s - %s", event.Name(), signal.Type())
return signal_fn(signal) return signal_fn(signal)
} }
return "wait", nil return "wait", nil
case <- event.Timeout(): case <- event.Timeout():
log.Logf("event", "EVENT_TIMEOUT %s - NEXT_STATE: %s", event.Name(), event.TimeoutAction()) Log.Logf("event", "EVENT_TIMEOUT %s - NEXT_STATE: %s", event.Name(), event.TimeoutAction())
return event.TimeoutAction(), nil return event.TimeoutAction(), nil
} }
} }
@ -514,15 +514,15 @@ func NewEventQueue(name string, description string, resources []Resource) (* Eve
err := LockResources(event) err := LockResources(event)
// start in new goroutine // start in new goroutine
if err != nil { if err != nil {
//log.Logf("event", "Failed to lock %s: %s", event.Name(), err) //Log.Logf("event", "Failed to lock %s: %s", event.Name(), err)
} else { } else {
info.state = "running" info.state = "running"
log.Logf("event", "EVENT_START: %s", event.Name()) Log.Logf("event", "EVENT_START: %s", event.Name())
go func(event Event, info * EventQueueInfo, queue Event) { go func(event Event, info * EventQueueInfo, queue Event) {
log.Logf("event", "EVENT_GOROUTINE: %s", event.Name()) Log.Logf("event", "EVENT_GOROUTINE: %s", event.Name())
err := RunEvent(event) err := RunEvent(event)
if err != nil { if err != nil {
log.Logf("event", "EVENT_ERROR: %s", err) Log.Logf("event", "EVENT_ERROR: %s", err)
} }
info.state = "done" info.state = "done"
FinishEvent(event) FinishEvent(event)
@ -535,7 +535,7 @@ func NewEventQueue(name string, description string, resources []Resource) (* Eve
for _, resource := range(needed_resources) { for _, resource := range(needed_resources) {
_, exists := queue.listened_resources[resource.ID()] _, exists := queue.listened_resources[resource.ID()]
if exists == false { if exists == false {
log.Logf("event", "REGISTER_RESOURCE: %s - %s", queue.Name(), resource.Name()) Log.Logf("event", "REGISTER_RESOURCE: %s - %s", queue.Name(), resource.Name())
queue.listened_resources[resource.ID()] = resource queue.listened_resources[resource.ID()] = resource
resource.RegisterChannel(queue.signal) resource.RegisterChannel(queue.signal)
} }

@ -204,17 +204,17 @@ func enableCORS(w *http.ResponseWriter) {
func GQLHandler(schema graphql.Schema, ctx context.Context) func(http.ResponseWriter, *http.Request) { func GQLHandler(schema graphql.Schema, ctx context.Context) func(http.ResponseWriter, *http.Request) {
return func(w http.ResponseWriter, r * http.Request) { return func(w http.ResponseWriter, r * http.Request) {
log.Logf("gql", "GQL REQUEST: %s", r.RemoteAddr) Log.Logf("gql", "GQL REQUEST: %s", r.RemoteAddr)
enableCORS(&w) enableCORS(&w)
header_map := map[string]interface{}{} header_map := map[string]interface{}{}
for header, value := range(r.Header) { for header, value := range(r.Header) {
header_map[header] = value header_map[header] = value
} }
log.Logm("gql", header_map, "REQUEST_HEADERS") Log.Logm("gql", header_map, "REQUEST_HEADERS")
str, err := io.ReadAll(r.Body) str, err := io.ReadAll(r.Body)
if err != nil { if err != nil {
log.Logf("gql", "failed to read request body: %s", err) Log.Logf("gql", "failed to read request body: %s", err)
return return
} }
query := GQLWSPayload{} query := GQLWSPayload{}
@ -236,7 +236,7 @@ func GQLHandler(schema graphql.Schema, ctx context.Context) func(http.ResponseWr
extra_fields := map[string]interface{}{} extra_fields := map[string]interface{}{}
extra_fields["body"] = string(str) extra_fields["body"] = string(str)
extra_fields["headers"] = r.Header extra_fields["headers"] = r.Header
log.Logm("gql", extra_fields, "wrong result, unexpected errors: %v", result.Errors) Log.Logm("gql", extra_fields, "wrong result, unexpected errors: %v", result.Errors)
} }
json.NewEncoder(w).Encode(result) json.NewEncoder(w).Encode(result)
} }
@ -279,7 +279,7 @@ func getOperationTypeOfReq(p graphql.Params) string{
func GQLWSDo(p graphql.Params) chan *graphql.Result { func GQLWSDo(p graphql.Params) chan *graphql.Result {
operation := getOperationTypeOfReq(p) operation := getOperationTypeOfReq(p)
log.Logf("gqlws", "GQLWSDO_OPERATION: %s %+v", operation, p.RequestString) Log.Logf("gqlws", "GQLWSDO_OPERATION: %s %+v", operation, p.RequestString)
if operation == ast.OperationTypeSubscription { if operation == ast.OperationTypeSubscription {
return graphql.Subscribe(p) return graphql.Subscribe(p)
@ -291,15 +291,15 @@ func GQLWSDo(p graphql.Params) chan *graphql.Result {
func GQLWSHandler(schema graphql.Schema, ctx context.Context) func(http.ResponseWriter, *http.Request) { func GQLWSHandler(schema graphql.Schema, ctx context.Context) func(http.ResponseWriter, *http.Request) {
return func(w http.ResponseWriter, r * http.Request) { return func(w http.ResponseWriter, r * http.Request) {
log.Logf("gqlws_new", "HANDLING %s",r.RemoteAddr) Log.Logf("gqlws_new", "HANDLING %s",r.RemoteAddr)
header_map := map[string]interface{}{} header_map := map[string]interface{}{}
for header, value := range(r.Header) { for header, value := range(r.Header) {
header_map[header] = value header_map[header] = value
} }
log.Logm("gql", header_map, "REQUEST_HEADERS") Log.Logm("gql", header_map, "REQUEST_HEADERS")
u := ws.HTTPUpgrader{ u := ws.HTTPUpgrader{
Protocol: func(protocol string) bool { Protocol: func(protocol string) bool {
log.Logf("gqlws", "UPGRADE_PROTOCOL: %s", string(protocol)) Log.Logf("gqlws", "UPGRADE_PROTOCOL: %s", string(protocol))
return string(protocol) == "graphql-transport-ws" return string(protocol) == "graphql-transport-ws"
}, },
} }
@ -310,32 +310,32 @@ func GQLWSHandler(schema graphql.Schema, ctx context.Context) func(http.Response
for { for {
// TODO: Make this a select between reading client data and getting updates from the event to push to clients" // TODO: Make this a select between reading client data and getting updates from the event to push to clients"
msg_raw, op, err := wsutil.ReadClientData(conn) msg_raw, op, err := wsutil.ReadClientData(conn)
log.Logf("gqlws_hb", "MSG: %s\nOP: 0x%02x\nERR: %+v\n", string(msg_raw), op, err) Log.Logf("gqlws_hb", "MSG: %s\nOP: 0x%02x\nERR: %+v\n", string(msg_raw), op, err)
msg := GQLWSMsg{} msg := GQLWSMsg{}
json.Unmarshal(msg_raw, &msg) json.Unmarshal(msg_raw, &msg)
if err != nil { if err != nil {
log.Logf("gqlws", "WS_CLIENT_ERROR") Log.Logf("gqlws", "WS_CLIENT_ERROR")
break break
} }
if msg.Type == "connection_init" { if msg.Type == "connection_init" {
if conn_state != "init" { if conn_state != "init" {
log.Logf("gqlws", "WS_CLIENT_ERROR: INIT WHILE IN %s", conn_state) Log.Logf("gqlws", "WS_CLIENT_ERROR: INIT WHILE IN %s", conn_state)
break break
} }
conn_state = "ready" conn_state = "ready"
err = wsutil.WriteServerMessage(conn, 1, []byte("{\"type\": \"connection_ack\"}")) err = wsutil.WriteServerMessage(conn, 1, []byte("{\"type\": \"connection_ack\"}"))
if err != nil { if err != nil {
log.Logf("gqlws", "WS_SERVER_ERROR: FAILED TO SEND connection_ack") Log.Logf("gqlws", "WS_SERVER_ERROR: FAILED TO SEND connection_ack")
break break
} }
} else if msg.Type == "ping" { } else if msg.Type == "ping" {
log.Logf("gqlws_hb", "PING FROM %s", r.RemoteAddr) Log.Logf("gqlws_hb", "PING FROM %s", r.RemoteAddr)
err = wsutil.WriteServerMessage(conn, 1, []byte("{\"type\": \"pong\"}")) err = wsutil.WriteServerMessage(conn, 1, []byte("{\"type\": \"pong\"}"))
if err != nil { if err != nil {
log.Logf("gqlws", "WS_SERVER_ERROR: FAILED TO SEND PONG") Log.Logf("gqlws", "WS_SERVER_ERROR: FAILED TO SEND PONG")
} }
} else if msg.Type == "subscribe" { } else if msg.Type == "subscribe" {
log.Logf("gqlws", "SUBSCRIBE: %+v", msg.Payload) Log.Logf("gqlws", "SUBSCRIBE: %+v", msg.Payload)
params := graphql.Params{ params := graphql.Params{
Schema: schema, Schema: schema,
Context: ctx, Context: ctx,
@ -354,23 +354,23 @@ func GQLWSHandler(schema graphql.Schema, ctx context.Context) func(http.Response
for { for {
next, ok := <-res_chan next, ok := <-res_chan
if ok == false { if ok == false {
log.Logf("gqlws", "response channel was closed") Log.Logf("gqlws", "response channel was closed")
return return
} }
if next == nil { if next == nil {
log.Logf("gqlws", "NIL_ON_CHANNEL") Log.Logf("gqlws", "NIL_ON_CHANNEL")
return return
} }
if len(next.Errors) > 0 { if len(next.Errors) > 0 {
extra_fields := map[string]interface{}{} extra_fields := map[string]interface{}{}
extra_fields["query"] = string(msg.Payload.Query) extra_fields["query"] = string(msg.Payload.Query)
log.Logm("gqlws", extra_fields, "ERROR: wrong result, unexpected errors: %+v", next.Errors) Log.Logm("gqlws", extra_fields, "ERROR: wrong result, unexpected errors: %+v", next.Errors)
continue continue
} }
log.Logf("gqlws", "DATA: %+v", next.Data) Log.Logf("gqlws", "DATA: %+v", next.Data)
data, err := json.Marshal(next.Data) data, err := json.Marshal(next.Data)
if err != nil { if err != nil {
log.Logf("gqlws", "ERROR: %+v", err) Log.Logf("gqlws", "ERROR: %+v", err)
continue continue
} }
msg, err := json.Marshal(GQLWSMsg{ msg, err := json.Marshal(GQLWSMsg{
@ -381,13 +381,13 @@ func GQLWSHandler(schema graphql.Schema, ctx context.Context) func(http.Response
}, },
}) })
if err != nil { if err != nil {
log.Logf("gqlws", "ERROR: %+v", err) Log.Logf("gqlws", "ERROR: %+v", err)
continue continue
} }
err = wsutil.WriteServerMessage(conn, 1, msg) err = wsutil.WriteServerMessage(conn, 1, msg)
if err != nil { if err != nil {
log.Logf("gqlws", "ERROR: %+v", err) Log.Logf("gqlws", "ERROR: %+v", err)
continue continue
} }
} }
@ -871,7 +871,7 @@ func NewGQLServer(listen string, extended_types map[reflect.Type]*graphql.Object
} }
go func() { go func() {
log.Logf("gql", "GOROUTINE_START for %s", server.ID()) Log.Logf("gql", "GOROUTINE_START for %s", server.ID())
mux := http.NewServeMux() mux := http.NewServeMux()
http_handler, ws_handler := MakeGQLHandlers(server) http_handler, ws_handler := MakeGQLHandlers(server)
@ -907,7 +907,7 @@ func NewGQLServer(listen string, extended_types map[reflect.Type]*graphql.Object
http_done.Wait() http_done.Wait()
break break
} }
log.Logf("gql", "GOROUTINE_SIGNAL for %s: %+v", server.ID(), signal) Log.Logf("gql", "GOROUTINE_SIGNAL for %s: %+v", server.ID(), signal)
// Take signals to resource and send to GQL subscriptions // Take signals to resource and send to GQL subscriptions
} }
} }
@ -943,7 +943,7 @@ func GQLSubscribeFn(p graphql.ResolveParams, fn func(GraphSignal, graphql.Resolv
} }
ret, err := fn(val, p) ret, err := fn(val, p)
if err != nil { if err != nil {
log.Logf("gqlws", "type convertor error %s", err) Log.Logf("gqlws", "type convertor error %s", err)
return return
} }
c <- ret c <- ret

@ -7,7 +7,6 @@ import (
"os" "os"
"github.com/rs/zerolog" "github.com/rs/zerolog"
"fmt" "fmt"
"io"
) )
type Logger interface { type Logger interface {
@ -17,29 +16,17 @@ type Logger interface {
} }
type DefaultLogger struct { type DefaultLogger struct {
init bool
init_lock sync.Mutex init_lock sync.Mutex
Loggers map[string]zerolog.Logger Loggers map[string]zerolog.Logger
Components []string Components []string
} }
var log DefaultLogger = DefaultLogger{Loggers: map[string]zerolog.Logger{}, Components: []string{"event"}} var Log DefaultLogger = DefaultLogger{Loggers: map[string]zerolog.Logger{}, Components: []string{}}
func (logger * DefaultLogger) Init(components []string) error { func (logger * DefaultLogger) Init(components []string) error {
logger.init_lock.Lock() logger.init_lock.Lock()
defer logger.init_lock.Unlock() defer logger.init_lock.Unlock()
if logger.init == true {
return nil
}
logger.init = true
file, err := os.OpenFile("test.log", os.O_TRUNC|os.O_CREATE|os.O_WRONLY, 0666)
if err != nil {
return err
}
component_enabled := func (component string) bool { component_enabled := func (component string) bool {
for _, c := range(components) { for _, c := range(components) {
if c == component { if c == component {
@ -49,17 +36,22 @@ func (logger * DefaultLogger) Init(components []string) error {
return false return false
} }
writer := io.MultiWriter(file, os.Stdout) for c, _ := range(logger.Loggers) {
for _, c := range(logger.Components) { if component_enabled(c) == false {
if component_enabled(c) == true { delete(logger.Loggers, c)
logger.Loggers[c] = zerolog.New(writer).With().Timestamp().Str("component", c).Logger() }
}
for _, c := range(components) {
_, exists := logger.Loggers[c]
if component_enabled(c) == true && exists == false {
logger.Loggers[c] = zerolog.New(os.Stdout).With().Timestamp().Str("component", c).Logger()
} }
} }
return nil return nil
} }
func (logger * DefaultLogger) Logm(component string, fields map[string]interface{}, format string, items ... interface{}) { func (logger * DefaultLogger) Logm(component string, fields map[string]interface{}, format string, items ... interface{}) {
logger.Init(logger.Components)
l, exists := logger.Loggers[component] l, exists := logger.Loggers[component]
if exists == true { if exists == true {
log := l.Log() log := l.Log()
@ -71,7 +63,6 @@ func (logger * DefaultLogger) Logm(component string, fields map[string]interface
} }
func (logger * DefaultLogger) Logf(component string, format string, items ... interface{}) { func (logger * DefaultLogger) Logf(component string, format string, items ... interface{}) {
logger.Init(logger.Components)
l, exists := logger.Loggers[component] l, exists := logger.Loggers[component]
if exists == true { if exists == true {
l.Log().Msg(fmt.Sprintf(format, items...)) l.Log().Msg(fmt.Sprintf(format, items...))
@ -161,7 +152,7 @@ func NewBaseNode(name string, description string, id string) BaseNode {
signal: make(chan GraphSignal, 512), signal: make(chan GraphSignal, 512),
listeners: map[chan GraphSignal]chan GraphSignal{}, listeners: map[chan GraphSignal]chan GraphSignal{},
} }
log.Logf("graph", "NEW_NODE: %s - %s", node.ID(), node.Name()) Log.Logf("graph", "NEW_NODE: %s - %s", node.ID(), node.Name())
return node return node
} }
@ -227,11 +218,11 @@ func (node * BaseNode) UpdateListeners(update GraphSignal) {
closed := []chan GraphSignal{} closed := []chan GraphSignal{}
for _, listener := range node.listeners { for _, listener := range node.listeners {
log.Logf("listeners", "UPDATE_LISTENER %s: %p", node.Name(), listener) Log.Logf("listeners", "UPDATE_LISTENER %s: %p", node.Name(), listener)
select { select {
case listener <- update: case listener <- update:
default: default:
log.Logf("listeners", "CLOSED_LISTENER: %s: %p", node.Name(), listener) Log.Logf("listeners", "CLOSED_LISTENER: %s: %p", node.Name(), listener)
go func(node GraphNode, listener chan GraphSignal) { go func(node GraphNode, listener chan GraphSignal) {
listener <- NewSignal(node, "listener_closed") listener <- NewSignal(node, "listener_closed")
close(listener) close(listener)
@ -255,7 +246,7 @@ func SendUpdate(node GraphNode, signal GraphSignal) {
if node != nil { if node != nil {
node_name = node.Name() node_name = node.Name()
} }
log.Logf("update", "UPDATE %s <- %s: %+v", node_name, signal.Source(), signal) Log.Logf("update", "UPDATE %s <- %s: %+v", node_name, signal.Source(), signal)
node.UpdateListeners(signal) node.UpdateListeners(signal)
node.PropagateUpdate(signal) node.PropagateUpdate(signal)
} }

@ -34,7 +34,7 @@ func (resource * BaseResource) PropagateUpdate(signal GraphSignal) {
} }
// Resource is the interface that DAG nodes are made from // Resource is the interface that DAG nodes are made from
// A resource needs to be able to represent logical entities and connections to physical entities. // A resource needs to be able to represent Logical entities and connections to physical entities.
// A resource lock could be aborted at any time if this connection is broken, if that happens the event locking it must be aborted // A resource lock could be aborted at any time if this connection is broken, if that happens the event locking it must be aborted
// The device connection should be maintained as much as possible(requiring some reconnection behaviour in the background) // The device connection should be maintained as much as possible(requiring some reconnection behaviour in the background)
type Resource interface { type Resource interface {
@ -166,7 +166,7 @@ func LockResource(resource Resource, node GraphNode) error {
return fmt.Errorf("Resource failed to lock: %s", lock_err) return fmt.Errorf("Resource failed to lock: %s", lock_err)
} }
log.Logf("resource", "Locked %s", resource.Name()) Log.Logf("resource", "Locked %s", resource.Name())
resource.SetOwner(node) resource.SetOwner(node)
return nil return nil