From 508f42da34e09b50e939639565c58009360fd561 Mon Sep 17 00:00:00 2001 From: Noah Metz Date: Sat, 30 Dec 2023 16:30:23 -0700 Subject: [PATCH] Reorganized --- cmd/server/main.go | 46 +++++++++++++++++++++++ go.mod | 2 +- main.go => handler.go | 85 +++++++++++-------------------------------- 3 files changed, 69 insertions(+), 64 deletions(-) create mode 100644 cmd/server/main.go rename main.go => handler.go (59%) diff --git a/cmd/server/main.go b/cmd/server/main.go new file mode 100644 index 0000000..1ad0032 --- /dev/null +++ b/cmd/server/main.go @@ -0,0 +1,46 @@ +package main + +import ( + "git.metznet.ca/MetzNet/htmxmqtt" + "errors" + "os/signal" + "syscall" +) + + +func main() { + handler_client, err := NewMQTTHandlerClient("tcp://localhost:1883", "", "", "htmx") + if err != nil { + panic(err) + } + + handler_1, err := handler_client.NewHandler("test", PayloadFormatFunc(`

%s

`)) + if err != nil { + panic(err) + } + + mux := http.NewServeMux() + mux.Handle("/", http.FileServer(http.Dir("./site"))) + mux.Handle("/ws", handler_1) + + server := &http.Server{ + Handler: mux, + Addr: ":8080", + } + + sigs := make(chan os.Signal, 1) + signal.Notify(sigs, syscall.SIGINT, syscall.SIGTERM, syscall.SIGTERM, syscall.SIGKILL) + go func(sigs chan os.Signal, server *http.Server) { + <- sigs + server.Close() + }(sigs, server) + + err = server.ListenAndServe() + if errors.Is(err, http.ErrServerClosed) == true { + os.Stderr.WriteString("Server closed on signal\n") + } else if err != nil { + os.Stderr.WriteString(fmt.Sprintf("Server error: %s\n", err)) + } else { + os.Stderr.WriteString("Server closed\n") + } +} diff --git a/go.mod b/go.mod index 722c0a4..ff362b7 100644 --- a/go.mod +++ b/go.mod @@ -1,4 +1,4 @@ -module git.metznet.ca/MetzNet/htmx_mqtt +module git.metznet.ca/MetzNet/htmxmqtt go 1.21.5 diff --git a/main.go b/handler.go similarity index 59% rename from main.go rename to handler.go index 4353160..ac83a36 100644 --- a/main.go +++ b/handler.go @@ -1,12 +1,9 @@ -package main +package htmxmqtt import ( - "errors" "time" "net/http" "os" - "os/signal" - "syscall" "fmt" "context" "sync" @@ -19,17 +16,17 @@ import ( type MQTTFormatFunc func(mqtt.Message) []byte type MQTTHandler struct { sync.Mutex - Format MQTTFormatFunc - Channels []chan mqtt.Message + format MQTTFormatFunc + channels []chan mqtt.Message } -func (handler *MQTTHandler) ProcessMessage(client mqtt.Client, message mqtt.Message) { +func (handler *MQTTHandler) processMessage(client mqtt.Client, message mqtt.Message) { message.Ack() handler.Lock() defer handler.Unlock() - remaining := make([]chan mqtt.Message, 0, len(handler.Channels)) - for _, channel := range(handler.Channels) { + remaining := make([]chan mqtt.Message, 0, len(handler.channels)) + for _, channel := range(handler.channels) { select { case channel <- message: remaining = append(remaining, channel) @@ -38,33 +35,32 @@ func (handler *MQTTHandler) ProcessMessage(client mqtt.Client, message mqtt.Mess } } - handler.Channels = remaining + handler.channels = remaining } -func (handler *MQTTHandler) AddChannel(channel chan mqtt.Message) func() { +func (handler *MQTTHandler) addChannel(channel chan mqtt.Message) func() { handler.Lock() defer handler.Unlock() - handler.Channels = append(handler.Channels, channel) + handler.channels = append(handler.channels, channel) return func() { handler.Lock() defer handler.Unlock() - idx := slices.Index(handler.Channels, channel) + idx := slices.Index(handler.channels, channel) if idx < 0 { return } - handler.Channels[idx] = handler.Channels[len(handler.Channels)-1] - handler.Channels = handler.Channels[:len(handler.Channels)-1] + handler.channels[idx] = handler.channels[len(handler.channels)-1] + handler.channels = handler.channels[:len(handler.channels)-1] } } type MQTTHandlerClient struct { mqtt.Client - Subscriptions map[*MQTTHandler]string - - SubscribeTimeout time.Duration + subscriptions map[*MQTTHandler]string + subscribeTimeout time.Duration } func NewMQTTHandlerClient(broker string, username string, password string, id string) (*MQTTHandlerClient, error) { @@ -83,30 +79,30 @@ func NewMQTTHandlerClient(broker string, username string, password string, id st return &MQTTHandlerClient{ Client: client, - Subscriptions: map[*MQTTHandler]string{}, - SubscribeTimeout: 1*time.Second, + subscriptions: map[*MQTTHandler]string{}, + subscribeTimeout: 1*time.Second, }, nil } func (client *MQTTHandlerClient) NewHandler(subscription string, format MQTTFormatFunc) (*MQTTHandler, error) { handler := &MQTTHandler{ - Format: format, + format: format, } - sub_token := client.Subscribe(subscription, 0x00, handler.ProcessMessage) + sub_token := client.Subscribe(subscription, 0x00, handler.processMessage) - timeout := sub_token.WaitTimeout(client.SubscribeTimeout) + timeout := sub_token.WaitTimeout(client.subscribeTimeout) if timeout == false || sub_token.Error() != nil { return nil, fmt.Errorf("Failed to subscribe to %s - %e", subscription, sub_token.Error()) } - client.Subscriptions[handler] = subscription + client.subscriptions[handler] = subscription return handler, nil } func (handler *MQTTHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) { channel := make(chan mqtt.Message, 1) - remove_channel := handler.AddChannel(channel) + remove_channel := handler.addChannel(channel) defer remove_channel() conn, err := websocket.Accept(w, r, nil) @@ -138,7 +134,7 @@ func (handler *MQTTHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) { os.Stderr.WriteString("websocket context done") running = false case message := <- channel: - text := handler.Format(message) + text := handler.format(message) os.Stderr.WriteString(fmt.Sprintf("websocket write: %s\n", text)) err := conn.Write(ctx, websocket.MessageText, text) if err != nil { @@ -154,40 +150,3 @@ func PayloadFormatFunc(template string) MQTTFormatFunc { return []byte(fmt.Sprintf(template, message.Payload())) } } - -func main() { - handler_client, err := NewMQTTHandlerClient("tcp://localhost:1883", "", "", "htmx") - if err != nil { - panic(err) - } - - handler_1, err := handler_client.NewHandler("test", PayloadFormatFunc(`

%s

`)) - if err != nil { - panic(err) - } - - mux := http.NewServeMux() - mux.Handle("/", http.FileServer(http.Dir("./site"))) - mux.Handle("/ws", handler_1) - - server := &http.Server{ - Handler: mux, - Addr: ":8080", - } - - sigs := make(chan os.Signal, 1) - signal.Notify(sigs, syscall.SIGINT, syscall.SIGTERM, syscall.SIGTERM, syscall.SIGKILL) - go func(sigs chan os.Signal, server *http.Server) { - <- sigs - server.Close() - }(sigs, server) - - err = server.ListenAndServe() - if errors.Is(err, http.ErrServerClosed) == true { - os.Stderr.WriteString("Server closed on signal\n") - } else if err != nil { - os.Stderr.WriteString(fmt.Sprintf("Server error: %s\n", err)) - } else { - os.Stderr.WriteString("Server closed\n") - } -}