From 90ce062b2e4ca4e219111edc7e29910a81149310 Mon Sep 17 00:00:00 2001 From: Noah Metz Date: Mon, 8 Apr 2024 17:23:55 -0600 Subject: [PATCH] Attempt at opus --- channel.go | 2 ++ cmd/client/main.go | 81 +++++++++++++++++++++++++++++++--------------- cmd/server/main.go | 5 +++ go.mod | 7 +++- go.sum | 13 ++++++++ server.go | 58 +++++++++++++++++++++++++++------ 6 files changed, 130 insertions(+), 36 deletions(-) diff --git a/channel.go b/channel.go index a0b806b..3afbb48 100644 --- a/channel.go +++ b/channel.go @@ -59,11 +59,13 @@ func(mode *RawMode) Process(session *Session, packet *Packet) []SendPacket { if slices.Contains(mode.Sessions, session.ID) == false { mode.Sessions = append(mode.Sessions, session.ID) } + case MODE_COMMAND_LEAVE: idx := slices.Index(mode.Sessions, session.ID) if idx != -1 { mode.Sessions = slices.Delete(mode.Sessions, idx, idx+1) } + case MODE_COMMAND_DATA: if slices.Contains(mode.Sessions, session.ID) { new_packet := &Packet{ diff --git a/cmd/client/main.go b/cmd/client/main.go index d15709c..e800ded 100644 --- a/cmd/client/main.go +++ b/cmd/client/main.go @@ -1,14 +1,22 @@ package main import ( + "encoding/binary" "fmt" "os" "git.metznet.ca/MetzNet/pnyx" - "github.com/gen2brain/malgo" + "github.com/gen2brain/malgo" + "github.com/hraban/opus" ) func main() { + decoders := map[pnyx.PeerID]*opus.Decoder{} + encoder, err := opus.NewEncoder(44100, 1, opus.AppVoIP) + if err != nil { + panic(err) + } + ctx, err := malgo.InitContext(nil, malgo.ContextConfig{}, nil) if err != nil { panic(err) @@ -17,7 +25,6 @@ func main() { defer ctx.Free() defer ctx.Uninit() - // Playback devices. infos, err := ctx.Devices(malgo.Playback) if err != nil { panic(err) @@ -59,20 +66,20 @@ func main() { } inDeviceConfig := malgo.DefaultDeviceConfig(malgo.Capture) - inDeviceConfig.Capture.Format = malgo.FormatF32 + inDeviceConfig.Capture.Format = malgo.FormatS16 inDeviceConfig.Capture.Channels = 1 inDeviceConfig.Capture.DeviceID = capture_device.ID.Pointer() - inDeviceConfig.SampleRate = capture_device.Formats[0].SampleRate - inDeviceConfig.PeriodSizeInFrames = 100 + inDeviceConfig.SampleRate = 44100 + inDeviceConfig.PeriodSizeInFrames = 882 inDeviceConfig.Alsa.NoMMap = 1 inDeviceConfig.Capture.ShareMode = malgo.Shared outDeviceConfig := malgo.DefaultDeviceConfig(malgo.Playback) - outDeviceConfig.Playback.Format = malgo.FormatF32 + outDeviceConfig.Playback.Format = malgo.FormatS16 outDeviceConfig.Playback.Channels = 1 outDeviceConfig.Playback.DeviceID = playback_device.ID.Pointer() - outDeviceConfig.SampleRate = playback_device.Formats[0].SampleRate - outDeviceConfig.PeriodSizeInFrames = 100 + outDeviceConfig.SampleRate = 44100 + outDeviceConfig.PeriodSizeInFrames = 882 outDeviceConfig.Alsa.NoMMap = 1 outDeviceConfig.Playback.ShareMode = malgo.Shared @@ -107,11 +114,19 @@ func main() { defer outDevice.Stop() onRecvFrames := func(output_samples []byte, input_samples []byte, framecount uint32) { + pcm := make([]int16, len(input_samples)/2) + for i := 0; i < len(input_samples)/2; i++ { + pcm[i] = int16(binary.BigEndian.Uint16(input_samples[2*i:])) + } + data := make([]byte, len(input_samples)) - copy(data, input_samples) + written, err := encoder.Encode(pcm, data) + if err != nil { + panic(err) + } select { - case mic <- data: + case mic <- data[:written]: default: } } @@ -162,8 +177,26 @@ func main() { fmt.Printf("ParsePacket Error %s - %x\n", err, data) continue } - _ = pnyx.PeerID(packet.Data[0:16]) - speaker <- packet.Data[16:] + peer := pnyx.PeerID(packet.Data[0:16]) + if packet.Channel == pnyx.ChannelID(1) { + decoder, exists := decoders[peer] + if exists == false { + decoder, err = opus.NewDecoder(44100, 1) + if err != nil { + panic(err) + } + } + pcm := make([]int16, 1000) + written, err := decoder.Decode(packet.Data[16:], pcm) + if err != nil { + panic(err) + } + data := make([]byte, written*2) + for i, p := range(pcm) { + binary.BigEndian.PutUint16(data[i*2:], uint16(p)) + } + speaker <- data + } } }() @@ -177,6 +210,16 @@ func main() { panic(err) } + err = client.Send(pnyx.Packet{ + Channel: pnyx.ChannelID(2), + Mode: pnyx.MODE_RAW, + Command: pnyx.MODE_COMMAND_JOIN, + Data: nil, + }) + if err != nil { + panic(err) + } + for true { data := <- mic err = client.Send(pnyx.Packet{ @@ -189,18 +232,4 @@ func main() { panic(err) } } - - err = client.Send(pnyx.Packet{ - Channel: pnyx.ChannelID(1), - Mode: pnyx.MODE_RAW, - Command: pnyx.MODE_COMMAND_LEAVE, - Data: nil, - }) - if err != nil { - panic(err) - } - err = client.Close() - if err != nil { - panic(err) - } } diff --git a/cmd/server/main.go b/cmd/server/main.go index b106551..1800aa4 100644 --- a/cmd/server/main.go +++ b/cmd/server/main.go @@ -28,6 +28,11 @@ func main() { panic(err) } + err = server.AddChannel(pnyx.ChannelID(2), &pnyx.RawMode{}) + if err != nil { + panic(err) + } + <-os_sigs err = server.Stop() if err != nil { diff --git a/go.mod b/go.mod index 00dd52f..8f3b062 100644 --- a/go.mod +++ b/go.mod @@ -6,8 +6,13 @@ require ( filippo.io/edwards25519 v1.1.0 // indirect github.com/ebitengine/purego v0.7.1 // indirect github.com/gen2brain/malgo v0.11.21 // indirect + github.com/gen2brain/shm v0.0.0-20230802011745-f2460f5984f7 // indirect github.com/google/uuid v1.6.0 // indirect github.com/gordonklaus/portaudio v0.0.0-20230709114228-aafa478834f5 // indirect + github.com/hraban/opus v0.0.0-20230925203106-0188a62cb302 // indirect + github.com/jezek/xgb v1.1.0 // indirect + github.com/kbinani/screenshot v0.0.0-20230812210009-b87d31814237 // indirect + github.com/lxn/win v0.0.0-20210218163916-a377121e959e // indirect github.com/pion/opus v0.0.0-20240403022900-1c7b6eddc7c9 // indirect - golang.org/x/sys v0.7.0 // indirect + golang.org/x/sys v0.11.0 // indirect ) diff --git a/go.sum b/go.sum index c518d29..6b025d7 100644 --- a/go.sum +++ b/go.sum @@ -4,11 +4,24 @@ github.com/ebitengine/purego v0.7.1 h1:6/55d26lG3o9VCZX8lping+bZcmShseiqlh2bnUDi github.com/ebitengine/purego v0.7.1/go.mod h1:ah1In8AOtksoNK6yk5z1HTJeUkC1Ez4Wk2idgGslMwQ= github.com/gen2brain/malgo v0.11.21 h1:qsS4Dh6zhZgmvAW5CtKRxDjQzHbc2NJlBG9eE0tgS8w= github.com/gen2brain/malgo v0.11.21/go.mod h1:f9TtuN7DVrXMiV/yIceMeWpvanyVzJQMlBecJFVMxww= +github.com/gen2brain/shm v0.0.0-20230802011745-f2460f5984f7 h1:VLEKvjGJYAMCXw0/32r9io61tEXnMWDRxMk+peyRVFc= +github.com/gen2brain/shm v0.0.0-20230802011745-f2460f5984f7/go.mod h1:uF6rMu/1nvu+5DpiRLwusA6xB8zlkNoGzKn8lmYONUo= github.com/google/uuid v1.6.0 h1:NIvaJDMOsjHA8n1jAhLSgzrAzy1Hgr+hNrb57e+94F0= github.com/google/uuid v1.6.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo= github.com/gordonklaus/portaudio v0.0.0-20230709114228-aafa478834f5 h1:5AlozfqaVjGYGhms2OsdUyfdJME76E6rx5MdGpjzZpc= github.com/gordonklaus/portaudio v0.0.0-20230709114228-aafa478834f5/go.mod h1:WY8R6YKlI2ZI3UyzFk7P6yGSuS+hFwNtEzrexRyD7Es= +github.com/hraban/opus v0.0.0-20230925203106-0188a62cb302 h1:K7bmEmIesLcvCW0Ic2rCk6LtP5++nTnPmrO8mg5umlA= +github.com/hraban/opus v0.0.0-20230925203106-0188a62cb302/go.mod h1:YQQXrWHN3JEvCtw5ImyTCcPeU/ZLo/YMA+TpB64XdrU= +github.com/jezek/xgb v1.1.0 h1:wnpxJzP1+rkbGclEkmwpVFQWpuE2PUGNUzP8SbfFobk= +github.com/jezek/xgb v1.1.0/go.mod h1:nrhwO0FX/enq75I7Y7G8iN1ubpSGZEiA3v9e9GyRFlk= +github.com/kbinani/screenshot v0.0.0-20230812210009-b87d31814237 h1:YOp8St+CM/AQ9Vp4XYm4272E77MptJDHkwypQHIRl9Q= +github.com/kbinani/screenshot v0.0.0-20230812210009-b87d31814237/go.mod h1:e7qQlOY68wOz4b82D7n+DdaptZAi+SHW0+yKiWZzEYE= +github.com/lxn/win v0.0.0-20210218163916-a377121e959e h1:H+t6A/QJMbhCSEH5rAuRxh+CtW96g0Or0Fxa9IKr4uc= +github.com/lxn/win v0.0.0-20210218163916-a377121e959e/go.mod h1:KxxjdtRkfNoYDCUP5ryK7XJJNTnpC8atvtmTheChOtk= github.com/pion/opus v0.0.0-20240403022900-1c7b6eddc7c9 h1:/aqYkFcwlpZVXSt1cLDXppeDQlABu9zZq/mBVX3v/5w= github.com/pion/opus v0.0.0-20240403022900-1c7b6eddc7c9/go.mod h1:APGXJHkH8qlbefy7R7/i6a2w/nvXC85hnHm8FjaGgMo= +golang.org/x/sys v0.0.0-20201018230417-eeed37f84f13/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.7.0 h1:3jlCCIQZPdOYu1h8BkNvLz8Kgwtae2cagcG/VamtZRU= golang.org/x/sys v0.7.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= +golang.org/x/sys v0.11.0 h1:eG7RXZHdqOJ1i+0lgLgCpSXAp6M3LYlAo6osgSi0xOM= +golang.org/x/sys v0.11.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= diff --git a/server.go b/server.go index abab5cc..ba25d6a 100644 --- a/server.go +++ b/server.go @@ -10,6 +10,7 @@ import ( "reflect" "sync" "sync/atomic" + "time" ) const ( @@ -19,6 +20,7 @@ const ( type ServerSession struct { Session + LastSeen time.Time IncomingPackets chan[]byte OutgoingPackets chan *Packet } @@ -160,6 +162,7 @@ func(server *Server) listen_udp() { server.sessions_lock.Lock() server.sessions[session.ID] = &ServerSession{ Session: session, + LastSeen: time.Now(), IncomingPackets: make(chan[]byte, SESSION_BUFFER_SIZE), OutgoingPackets: make(chan *Packet, SESSION_BUFFER_SIZE), } @@ -264,6 +267,7 @@ func(server *Server) listen_udp() { } session.remote = client_addr + session.LastSeen = time.Now() // TODO: Make a better server hello server_hello, err := NewSessionData(&session.Session, []byte("hello")) @@ -293,11 +297,8 @@ func(server *Server) listen_udp() { continue } - close(session.IncomingPackets) - close(session.OutgoingPackets) - server.sessions_lock.Lock() - delete(server.sessions, session_id) + server.close_session(session) server.sessions_lock.Unlock() server.Log("Session %s closed", session_id) @@ -310,6 +311,8 @@ func(server *Server) listen_udp() { continue } + session.LastSeen = time.Now() + buf_copy := make([]byte, read - COMMAND_LENGTH - ID_LENGTH) copy(buf_copy, buf[COMMAND_LENGTH+ID_LENGTH:read]) @@ -330,10 +333,12 @@ func(server *Server) listen_udp() { } server.sessions_lock.Lock() - for session_id, session := range(server.sessions) { - close(session.IncomingPackets) - close(session.OutgoingPackets) - delete(server.sessions, session_id) + sessions := make([]*ServerSession, 0, len(server.sessions)) + for _, session := range(server.sessions) { + sessions = append(sessions, session) + } + for _, session := range(sessions) { + server.close_session(session) } server.sessions_lock.Unlock() @@ -342,7 +347,7 @@ func(server *Server) listen_udp() { } func(server *Server) send_sessions() { - for true { + for server.active.Load() { packets := <- server.send_packets if packets == nil { break @@ -359,6 +364,40 @@ func(server *Server) send_sessions() { } } +func(server *Server) close_session(session *ServerSession) { + close(session.IncomingPackets) + close(session.OutgoingPackets) + delete(server.sessions, session.ID) +} + +const SESSION_TIMEOUT = time.Minute * 5 +const SESSION_TIMEOUT_CHECK = time.Minute + +func(server *Server) cleanup_sessions() { + for server.active.Load() { + select { + case <-time.After(SESSION_TIMEOUT_CHECK): + server.Log("Running stale session check") + server.sessions_lock.Lock() + now := time.Now() + stale_sessions := make([]*ServerSession, 0, len(server.sessions)) + for _, session := range(server.sessions) { + if now.Sub(session.LastSeen) >= SESSION_TIMEOUT { + server.Log("Closing stale session %s for %s", session.ID, session.Peer) + stale_sessions = append(stale_sessions, session) + } + } + + for _, session := range(stale_sessions) { + server.close_session(session) + } + + server.sessions_lock.Unlock() + // TODO: add a way for this to be shutdown instantly on server shutdown + } + } +} + func(server *Server) Start(listen string) error { was_inactive := server.active.CompareAndSwap(false, true) if was_inactive == false { @@ -379,6 +418,7 @@ func(server *Server) Start(listen string) error { go server.listen_udp() go server.send_sessions() + go server.cleanup_sessions() return nil }