From 41f28a20179a44372b640ccd67907c27f47e665e Mon Sep 17 00:00:00 2001 From: Noah Metz Date: Tue, 20 Jun 2023 16:35:16 -0600 Subject: [PATCH] Made logging configurable --- event.go | 42 +++++++++++++++++++++--------------------- gql.go | 50 +++++++++++++++++++++++++------------------------- graph.go | 39 +++++++++++++++------------------------ resource.go | 4 ++-- 4 files changed, 63 insertions(+), 72 deletions(-) diff --git a/event.go b/event.go index 8115ba0..ab5bc0e 100644 --- a/event.go +++ b/event.go @@ -231,33 +231,33 @@ func LinkEvent(event Event, child Event, info EventInfo) error { } func StartRootEvent(event Event) error { - log.Logf("event", "ROOT_EVEN_START") + Log.Logf("event", "ROOT_EVEN_START") err := LockResources(event) if err != nil { - log.Logf("event", "ROOT_EVENT_LOCK_ERR: %s", err) + Log.Logf("event", "ROOT_EVENT_LOCK_ERR: %s", err) return err } err = RunEvent(event) if err != nil { - log.Logf("event", "ROOT_EVENT_RUNE_ERR: %s", err) + Log.Logf("event", "ROOT_EVENT_RUNE_ERR: %s", err) return err } err = FinishEvent(event) if err != nil { - log.Logf("event", "ROOT_EVENT_FINISH_ERR: %s", err) + Log.Logf("event", "ROOT_EVENT_FINISH_ERR: %s", err) return err } - log.Logf("event", "ROOT_EVENT_DONE") + Log.Logf("event", "ROOT_EVENT_DONE") return nil } func RunEvent(event Event) error { - log.Logf("event", "EVENT_RUN: %s", event.Name()) + Log.Logf("event", "EVENT_RUN: %s", event.Name()) SendUpdate(event, NewSignal(event, "event_start")) next_action := "start" var err error = nil @@ -268,14 +268,14 @@ func RunEvent(event Event) error { return errors.New(error_str) } - log.Logf("event", "EVENT_ACTION: %s - %s", event.Name(), next_action) + Log.Logf("event", "EVENT_ACTION: %s - %s", event.Name(), next_action) next_action, err = action() if err != nil { return err } } - log.Logf("event", "EVENT_RUN_DONE: %s", event.Name()) + Log.Logf("event", "EVENT_RUN_DONE: %s", event.Name()) return nil } @@ -293,7 +293,7 @@ func EventCancel(event Event) func(signal GraphSignal) (string, error) { } func LockResources(event Event) error { - log.Logf("event", "RESOURCE_LOCKING for %s - %+v", event.Name(), event.Resources()) + Log.Logf("event", "RESOURCE_LOCKING for %s - %+v", event.Name(), event.Resources()) locked_resources := []Resource{} var lock_err error = nil for _, resource := range(event.Resources()) { @@ -309,11 +309,11 @@ func LockResources(event Event) error { for _, resource := range(locked_resources) { UnlockResource(resource, event) } - log.Logf("event", "RESOURCE_LOCK_FAIL for %s: %s", event.Name(), lock_err) + Log.Logf("event", "RESOURCE_LOCK_FAIL for %s: %s", event.Name(), lock_err) return lock_err } - log.Logf("event", "RESOURCE_LOCK_SUCCESS for %s", event.Name()) + Log.Logf("event", "RESOURCE_LOCK_SUCCESS for %s", event.Name()) signal := NewDownSignal(event, "locked") SendUpdate(event, signal) @@ -321,7 +321,7 @@ func LockResources(event Event) error { } func FinishEvent(event Event) error { - log.Logf("event", "EVENT_FINISH: %s", event.Name()) + Log.Logf("event", "EVENT_FINISH: %s", event.Name()) for _, resource := range(event.Resources()) { err := UnlockResource(resource, event) if err != nil { @@ -378,18 +378,18 @@ func (event * BaseEvent) Action(action string) (func() (string, error), bool) { func EventWait(event Event) (func() (string, error)) { return func() (string, error) { - log.Logf("event", "EVENT_WAIT: %s TIMEOUT: %+v", event.Name(), event.Timeout()) + Log.Logf("event", "EVENT_WAIT: %s TIMEOUT: %+v", event.Name(), event.Timeout()) select { case signal := <- event.Signal(): - log.Logf("event", "EVENT_SIGNAL: %s %+v", event.Name(), signal) + Log.Logf("event", "EVENT_SIGNAL: %s %+v", event.Name(), signal) signal_fn, exists := event.Handler(signal.Type()) if exists == true { - log.Logf("event", "EVENT_HANDLER: %s - %s", event.Name(), signal.Type()) + Log.Logf("event", "EVENT_HANDLER: %s - %s", event.Name(), signal.Type()) return signal_fn(signal) } return "wait", nil case <- event.Timeout(): - log.Logf("event", "EVENT_TIMEOUT %s - NEXT_STATE: %s", event.Name(), event.TimeoutAction()) + Log.Logf("event", "EVENT_TIMEOUT %s - NEXT_STATE: %s", event.Name(), event.TimeoutAction()) return event.TimeoutAction(), nil } } @@ -514,15 +514,15 @@ func NewEventQueue(name string, description string, resources []Resource) (* Eve err := LockResources(event) // start in new goroutine if err != nil { - //log.Logf("event", "Failed to lock %s: %s", event.Name(), err) + //Log.Logf("event", "Failed to lock %s: %s", event.Name(), err) } else { info.state = "running" - log.Logf("event", "EVENT_START: %s", event.Name()) + Log.Logf("event", "EVENT_START: %s", event.Name()) go func(event Event, info * EventQueueInfo, queue Event) { - log.Logf("event", "EVENT_GOROUTINE: %s", event.Name()) + Log.Logf("event", "EVENT_GOROUTINE: %s", event.Name()) err := RunEvent(event) if err != nil { - log.Logf("event", "EVENT_ERROR: %s", err) + Log.Logf("event", "EVENT_ERROR: %s", err) } info.state = "done" FinishEvent(event) @@ -535,7 +535,7 @@ func NewEventQueue(name string, description string, resources []Resource) (* Eve for _, resource := range(needed_resources) { _, exists := queue.listened_resources[resource.ID()] if exists == false { - log.Logf("event", "REGISTER_RESOURCE: %s - %s", queue.Name(), resource.Name()) + Log.Logf("event", "REGISTER_RESOURCE: %s - %s", queue.Name(), resource.Name()) queue.listened_resources[resource.ID()] = resource resource.RegisterChannel(queue.signal) } diff --git a/gql.go b/gql.go index fe1e79e..15801a5 100644 --- a/gql.go +++ b/gql.go @@ -204,17 +204,17 @@ func enableCORS(w *http.ResponseWriter) { func GQLHandler(schema graphql.Schema, ctx context.Context) func(http.ResponseWriter, *http.Request) { return func(w http.ResponseWriter, r * http.Request) { - log.Logf("gql", "GQL REQUEST: %s", r.RemoteAddr) + Log.Logf("gql", "GQL REQUEST: %s", r.RemoteAddr) enableCORS(&w) header_map := map[string]interface{}{} for header, value := range(r.Header) { header_map[header] = value } - log.Logm("gql", header_map, "REQUEST_HEADERS") + Log.Logm("gql", header_map, "REQUEST_HEADERS") str, err := io.ReadAll(r.Body) if err != nil { - log.Logf("gql", "failed to read request body: %s", err) + Log.Logf("gql", "failed to read request body: %s", err) return } query := GQLWSPayload{} @@ -236,7 +236,7 @@ func GQLHandler(schema graphql.Schema, ctx context.Context) func(http.ResponseWr extra_fields := map[string]interface{}{} extra_fields["body"] = string(str) extra_fields["headers"] = r.Header - log.Logm("gql", extra_fields, "wrong result, unexpected errors: %v", result.Errors) + Log.Logm("gql", extra_fields, "wrong result, unexpected errors: %v", result.Errors) } json.NewEncoder(w).Encode(result) } @@ -279,7 +279,7 @@ func getOperationTypeOfReq(p graphql.Params) string{ func GQLWSDo(p graphql.Params) chan *graphql.Result { operation := getOperationTypeOfReq(p) - log.Logf("gqlws", "GQLWSDO_OPERATION: %s %+v", operation, p.RequestString) + Log.Logf("gqlws", "GQLWSDO_OPERATION: %s %+v", operation, p.RequestString) if operation == ast.OperationTypeSubscription { return graphql.Subscribe(p) @@ -291,15 +291,15 @@ func GQLWSDo(p graphql.Params) chan *graphql.Result { func GQLWSHandler(schema graphql.Schema, ctx context.Context) func(http.ResponseWriter, *http.Request) { return func(w http.ResponseWriter, r * http.Request) { - log.Logf("gqlws_new", "HANDLING %s",r.RemoteAddr) + Log.Logf("gqlws_new", "HANDLING %s",r.RemoteAddr) header_map := map[string]interface{}{} for header, value := range(r.Header) { header_map[header] = value } - log.Logm("gql", header_map, "REQUEST_HEADERS") + Log.Logm("gql", header_map, "REQUEST_HEADERS") u := ws.HTTPUpgrader{ Protocol: func(protocol string) bool { - log.Logf("gqlws", "UPGRADE_PROTOCOL: %s", string(protocol)) + Log.Logf("gqlws", "UPGRADE_PROTOCOL: %s", string(protocol)) return string(protocol) == "graphql-transport-ws" }, } @@ -310,32 +310,32 @@ func GQLWSHandler(schema graphql.Schema, ctx context.Context) func(http.Response for { // TODO: Make this a select between reading client data and getting updates from the event to push to clients" msg_raw, op, err := wsutil.ReadClientData(conn) - log.Logf("gqlws_hb", "MSG: %s\nOP: 0x%02x\nERR: %+v\n", string(msg_raw), op, err) + Log.Logf("gqlws_hb", "MSG: %s\nOP: 0x%02x\nERR: %+v\n", string(msg_raw), op, err) msg := GQLWSMsg{} json.Unmarshal(msg_raw, &msg) if err != nil { - log.Logf("gqlws", "WS_CLIENT_ERROR") + Log.Logf("gqlws", "WS_CLIENT_ERROR") break } if msg.Type == "connection_init" { if conn_state != "init" { - log.Logf("gqlws", "WS_CLIENT_ERROR: INIT WHILE IN %s", conn_state) + Log.Logf("gqlws", "WS_CLIENT_ERROR: INIT WHILE IN %s", conn_state) break } conn_state = "ready" err = wsutil.WriteServerMessage(conn, 1, []byte("{\"type\": \"connection_ack\"}")) if err != nil { - log.Logf("gqlws", "WS_SERVER_ERROR: FAILED TO SEND connection_ack") + Log.Logf("gqlws", "WS_SERVER_ERROR: FAILED TO SEND connection_ack") break } } else if msg.Type == "ping" { - log.Logf("gqlws_hb", "PING FROM %s", r.RemoteAddr) + Log.Logf("gqlws_hb", "PING FROM %s", r.RemoteAddr) err = wsutil.WriteServerMessage(conn, 1, []byte("{\"type\": \"pong\"}")) if err != nil { - log.Logf("gqlws", "WS_SERVER_ERROR: FAILED TO SEND PONG") + Log.Logf("gqlws", "WS_SERVER_ERROR: FAILED TO SEND PONG") } } else if msg.Type == "subscribe" { - log.Logf("gqlws", "SUBSCRIBE: %+v", msg.Payload) + Log.Logf("gqlws", "SUBSCRIBE: %+v", msg.Payload) params := graphql.Params{ Schema: schema, Context: ctx, @@ -354,23 +354,23 @@ func GQLWSHandler(schema graphql.Schema, ctx context.Context) func(http.Response for { next, ok := <-res_chan if ok == false { - log.Logf("gqlws", "response channel was closed") + Log.Logf("gqlws", "response channel was closed") return } if next == nil { - log.Logf("gqlws", "NIL_ON_CHANNEL") + Log.Logf("gqlws", "NIL_ON_CHANNEL") return } if len(next.Errors) > 0 { extra_fields := map[string]interface{}{} extra_fields["query"] = string(msg.Payload.Query) - log.Logm("gqlws", extra_fields, "ERROR: wrong result, unexpected errors: %+v", next.Errors) + Log.Logm("gqlws", extra_fields, "ERROR: wrong result, unexpected errors: %+v", next.Errors) continue } - log.Logf("gqlws", "DATA: %+v", next.Data) + Log.Logf("gqlws", "DATA: %+v", next.Data) data, err := json.Marshal(next.Data) if err != nil { - log.Logf("gqlws", "ERROR: %+v", err) + Log.Logf("gqlws", "ERROR: %+v", err) continue } msg, err := json.Marshal(GQLWSMsg{ @@ -381,13 +381,13 @@ func GQLWSHandler(schema graphql.Schema, ctx context.Context) func(http.Response }, }) if err != nil { - log.Logf("gqlws", "ERROR: %+v", err) + Log.Logf("gqlws", "ERROR: %+v", err) continue } err = wsutil.WriteServerMessage(conn, 1, msg) if err != nil { - log.Logf("gqlws", "ERROR: %+v", err) + Log.Logf("gqlws", "ERROR: %+v", err) continue } } @@ -871,7 +871,7 @@ func NewGQLServer(listen string, extended_types map[reflect.Type]*graphql.Object } go func() { - log.Logf("gql", "GOROUTINE_START for %s", server.ID()) + Log.Logf("gql", "GOROUTINE_START for %s", server.ID()) mux := http.NewServeMux() http_handler, ws_handler := MakeGQLHandlers(server) @@ -907,7 +907,7 @@ func NewGQLServer(listen string, extended_types map[reflect.Type]*graphql.Object http_done.Wait() break } - log.Logf("gql", "GOROUTINE_SIGNAL for %s: %+v", server.ID(), signal) + Log.Logf("gql", "GOROUTINE_SIGNAL for %s: %+v", server.ID(), signal) // Take signals to resource and send to GQL subscriptions } } @@ -943,7 +943,7 @@ func GQLSubscribeFn(p graphql.ResolveParams, fn func(GraphSignal, graphql.Resolv } ret, err := fn(val, p) if err != nil { - log.Logf("gqlws", "type convertor error %s", err) + Log.Logf("gqlws", "type convertor error %s", err) return } c <- ret diff --git a/graph.go b/graph.go index 6012b41..6c387bf 100644 --- a/graph.go +++ b/graph.go @@ -7,7 +7,6 @@ import ( "os" "github.com/rs/zerolog" "fmt" - "io" ) type Logger interface { @@ -17,29 +16,17 @@ type Logger interface { } type DefaultLogger struct { - init bool init_lock sync.Mutex Loggers map[string]zerolog.Logger Components []string } -var log DefaultLogger = DefaultLogger{Loggers: map[string]zerolog.Logger{}, Components: []string{"event"}} +var Log DefaultLogger = DefaultLogger{Loggers: map[string]zerolog.Logger{}, Components: []string{}} func (logger * DefaultLogger) Init(components []string) error { logger.init_lock.Lock() defer logger.init_lock.Unlock() - if logger.init == true { - return nil - } - - logger.init = true - - file, err := os.OpenFile("test.log", os.O_TRUNC|os.O_CREATE|os.O_WRONLY, 0666) - if err != nil { - return err - } - component_enabled := func (component string) bool { for _, c := range(components) { if c == component { @@ -49,17 +36,22 @@ func (logger * DefaultLogger) Init(components []string) error { return false } - writer := io.MultiWriter(file, os.Stdout) - for _, c := range(logger.Components) { - if component_enabled(c) == true { - logger.Loggers[c] = zerolog.New(writer).With().Timestamp().Str("component", c).Logger() + for c, _ := range(logger.Loggers) { + if component_enabled(c) == false { + delete(logger.Loggers, c) + } + } + + for _, c := range(components) { + _, exists := logger.Loggers[c] + if component_enabled(c) == true && exists == false { + logger.Loggers[c] = zerolog.New(os.Stdout).With().Timestamp().Str("component", c).Logger() } } return nil } func (logger * DefaultLogger) Logm(component string, fields map[string]interface{}, format string, items ... interface{}) { - logger.Init(logger.Components) l, exists := logger.Loggers[component] if exists == true { log := l.Log() @@ -71,7 +63,6 @@ func (logger * DefaultLogger) Logm(component string, fields map[string]interface } func (logger * DefaultLogger) Logf(component string, format string, items ... interface{}) { - logger.Init(logger.Components) l, exists := logger.Loggers[component] if exists == true { l.Log().Msg(fmt.Sprintf(format, items...)) @@ -161,7 +152,7 @@ func NewBaseNode(name string, description string, id string) BaseNode { signal: make(chan GraphSignal, 512), listeners: map[chan GraphSignal]chan GraphSignal{}, } - log.Logf("graph", "NEW_NODE: %s - %s", node.ID(), node.Name()) + Log.Logf("graph", "NEW_NODE: %s - %s", node.ID(), node.Name()) return node } @@ -227,11 +218,11 @@ func (node * BaseNode) UpdateListeners(update GraphSignal) { closed := []chan GraphSignal{} for _, listener := range node.listeners { - log.Logf("listeners", "UPDATE_LISTENER %s: %p", node.Name(), listener) + Log.Logf("listeners", "UPDATE_LISTENER %s: %p", node.Name(), listener) select { case listener <- update: default: - log.Logf("listeners", "CLOSED_LISTENER: %s: %p", node.Name(), listener) + Log.Logf("listeners", "CLOSED_LISTENER: %s: %p", node.Name(), listener) go func(node GraphNode, listener chan GraphSignal) { listener <- NewSignal(node, "listener_closed") close(listener) @@ -255,7 +246,7 @@ func SendUpdate(node GraphNode, signal GraphSignal) { if node != nil { node_name = node.Name() } - log.Logf("update", "UPDATE %s <- %s: %+v", node_name, signal.Source(), signal) + Log.Logf("update", "UPDATE %s <- %s: %+v", node_name, signal.Source(), signal) node.UpdateListeners(signal) node.PropagateUpdate(signal) } diff --git a/resource.go b/resource.go index 22b1f8c..1bbd3c9 100644 --- a/resource.go +++ b/resource.go @@ -34,7 +34,7 @@ func (resource * BaseResource) PropagateUpdate(signal GraphSignal) { } // Resource is the interface that DAG nodes are made from -// A resource needs to be able to represent logical entities and connections to physical entities. +// A resource needs to be able to represent Logical entities and connections to physical entities. // A resource lock could be aborted at any time if this connection is broken, if that happens the event locking it must be aborted // The device connection should be maintained as much as possible(requiring some reconnection behaviour in the background) type Resource interface { @@ -166,7 +166,7 @@ func LockResource(resource Resource, node GraphNode) error { return fmt.Errorf("Resource failed to lock: %s", lock_err) } - log.Logf("resource", "Locked %s", resource.Name()) + Log.Logf("resource", "Locked %s", resource.Name()) resource.SetOwner(node) return nil