commit 737edd36bba39b690977a19134730e316d42ed22 Author: Noah Metz Date: Sat Dec 30 14:33:52 2023 -0700 initial commit diff --git a/go.mod b/go.mod new file mode 100644 index 0000000..bb074e8 --- /dev/null +++ b/go.mod @@ -0,0 +1,11 @@ +module git.metznet.ca/MetzNet/htmx-mqtt + +go 1.21.5 + +require ( + github.com/eclipse/paho.mqtt.golang v1.4.3 // indirect + github.com/gorilla/websocket v1.5.0 // indirect + golang.org/x/net v0.8.0 // indirect + golang.org/x/sync v0.1.0 // indirect + nhooyr.io/websocket v1.8.10 // indirect +) diff --git a/go.sum b/go.sum new file mode 100644 index 0000000..65d0b37 --- /dev/null +++ b/go.sum @@ -0,0 +1,10 @@ +github.com/eclipse/paho.mqtt.golang v1.4.3 h1:2kwcUGn8seMUfWndX0hGbvH8r7crgcJguQNCyp70xik= +github.com/eclipse/paho.mqtt.golang v1.4.3/go.mod h1:CSYvoAlsMkhYOXh/oKyxa8EcBci6dVkLCbo5tTC1RIE= +github.com/gorilla/websocket v1.5.0 h1:PPwGk2jz7EePpoHN/+ClbZu8SPxiqlu12wZP/3sWmnc= +github.com/gorilla/websocket v1.5.0/go.mod h1:YR8l580nyteQvAITg2hZ9XVh4b55+EU/adAjf1fMHhE= +golang.org/x/net v0.8.0 h1:Zrh2ngAOFYneWTAIAPethzeaQLuHwhuBkuV6ZiRnUaQ= +golang.org/x/net v0.8.0/go.mod h1:QVkue5JL9kW//ek3r6jTKnTFis1tRmNAW2P1shuFdJc= +golang.org/x/sync v0.1.0 h1:wsuoTGHzEhffawBOhz5CYhcrV4IdKZbEyZjBMuTp12o= +golang.org/x/sync v0.1.0/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= +nhooyr.io/websocket v1.8.10 h1:mv4p+MnGrLDcPlBoWsvPP7XCzTYMXP9F9eIGoKbgx7Q= +nhooyr.io/websocket v1.8.10/go.mod h1:rN9OFWIUwuxg4fR5tELlYC04bXYowCP9GX47ivo2l+c= diff --git a/main.go b/main.go new file mode 100644 index 0000000..fda972d --- /dev/null +++ b/main.go @@ -0,0 +1,189 @@ +package main + +import ( + "errors" + "time" + "net/http" + "os" + "os/signal" + "syscall" + "fmt" + "context" + "sync" + "slices" + + "nhooyr.io/websocket" + mqtt "github.com/eclipse/paho.mqtt.golang" +) + +type MQTTHandler struct { + sync.Mutex + Template string + Channels []chan mqtt.Message +} + +func (handler *MQTTHandler) ProcessMessage(client mqtt.Client, message mqtt.Message) { + message.Ack() + handler.Lock() + defer handler.Unlock() + + remaining := make([]chan mqtt.Message, 0, len(handler.Channels)) + for _, channel := range(handler.Channels) { + select { + case channel <- message: + remaining = append(remaining, channel) + default: + os.Stderr.WriteString("Channel overflow\n") + } + } + + handler.Channels = remaining +} + +func (handler *MQTTHandler) AddChannel(channel chan mqtt.Message) func() { + handler.Lock() + defer handler.Unlock() + + handler.Channels = append(handler.Channels, channel) + return func() { + handler.Lock() + defer handler.Unlock() + + idx := slices.Index(handler.Channels, channel) + if idx < 0 { + return + } + + handler.Channels[idx] = handler.Channels[len(handler.Channels)-1] + handler.Channels = handler.Channels[:len(handler.Channels)-1] + } +} + +type MQTTHandlerClient struct { + mqtt.Client + Subscriptions map[*MQTTHandler]string + + SubscribeTimeout time.Duration +} + +func NewMQTTHandlerClient(broker string, username string, password string, id string) (*MQTTHandlerClient, error) { + opts := mqtt.NewClientOptions() + opts.AddBroker(broker) + opts.SetClientID(id) + opts.SetUsername(username) + opts.SetPassword(password) + opts.SetKeepAlive(2 * time.Second) + opts.SetPingTimeout(1 * time.Second) + client := mqtt.NewClient(opts) + + if token := client.Connect(); token.Wait() && token.Error() != nil { + return nil, token.Error() + } + + return &MQTTHandlerClient{ + Client: client, + Subscriptions: map[*MQTTHandler]string{}, + SubscribeTimeout: 1*time.Second, + }, nil +} + +func (client *MQTTHandlerClient) NewHandler(subscription string, template string) (*MQTTHandler, error) { + handler := &MQTTHandler{ + Template: template, + } + + sub_token := client.Subscribe(subscription, 0x00, handler.ProcessMessage) + + timeout := sub_token.WaitTimeout(client.SubscribeTimeout) + if timeout == false || sub_token.Error() != nil { + return nil, fmt.Errorf("Failed to subscribe to %s - %e", subscription, sub_token.Error()) + } + + client.Subscriptions[handler] = subscription + return handler, nil +} + +func (handler *MQTTHandler) Format(message mqtt.Message) []byte { + return []byte(fmt.Sprintf(handler.Template, message.Payload())) +} + +func (handler *MQTTHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) { + channel := make(chan mqtt.Message, 1) + remove_channel := handler.AddChannel(channel) + defer remove_channel() + + conn, err := websocket.Accept(w, r, nil) + if err != nil { + os.Stderr.WriteString(fmt.Sprintf("websocket accept error: %s\n", err)) + } + + defer conn.CloseNow() + ctx, cancel_func := context.WithCancel(context.Background()) + + go func(conn *websocket.Conn, cancel_func context.CancelFunc) { + for true { + msg_type, data, err := conn.Read(ctx) + if err != nil { + os.Stderr.WriteString(fmt.Sprintf("websocket error: %s\n", err)) + cancel_func() + break + } else { + os.Stderr.WriteString(fmt.Sprintf("websocket data(%s): %s\n", msg_type, string(data))) + } + } + }(conn, cancel_func) + + running := true + done := ctx.Done() + for running == true { + select { + case <- done: + os.Stderr.WriteString("websocket context done") + running = false + case message := <- channel: + os.Stderr.WriteString(fmt.Sprintf("websocket write: %s\n", message.Payload())) + err := conn.Write(ctx, websocket.MessageText, message.Payload()) + if err != nil { + os.Stderr.WriteString(fmt.Sprintf("websocket write error: %s\n", err)) + running = false + } + } + } +} + +func main() { + handler_client, err := NewMQTTHandlerClient("tcp://localhost:1883", "", "", "htmx") + if err != nil { + panic(err) + } + + handler_1, err := handler_client.NewHandler("test", "%s") + if err != nil { + panic(err) + } + + mux := http.NewServeMux() + mux.Handle("/", http.FileServer(http.Dir("./site"))) + mux.Handle("/ws", handler_1) + + server := &http.Server{ + Handler: mux, + Addr: ":8080", + } + + sigs := make(chan os.Signal, 1) + signal.Notify(sigs, syscall.SIGINT, syscall.SIGTERM, syscall.SIGTERM, syscall.SIGKILL) + go func(sigs chan os.Signal, server *http.Server) { + <- sigs + server.Close() + }(sigs, server) + + err = server.ListenAndServe() + if errors.Is(err, http.ErrServerClosed) == true { + os.Stderr.WriteString("Server closed on signal\n") + } else if err != nil { + os.Stderr.WriteString(fmt.Sprintf("Server error: %s\n", err)) + } else { + os.Stderr.WriteString("Server closed\n") + } +} diff --git a/site/index.html b/site/index.html new file mode 100644 index 0000000..59abdeb --- /dev/null +++ b/site/index.html @@ -0,0 +1,19 @@ + + + + HTMX MQTT Test Page + + + + + + + + +

Test Page

+
+
+
+
+ + diff --git a/site/style.css b/site/style.css new file mode 100644 index 0000000..e69de29