Attempt at opus

live
noah metz 2024-04-08 17:23:55 -06:00
parent 477741dae2
commit 90ce062b2e
6 changed files with 130 additions and 36 deletions

@ -59,11 +59,13 @@ func(mode *RawMode) Process(session *Session, packet *Packet) []SendPacket {
if slices.Contains(mode.Sessions, session.ID) == false { if slices.Contains(mode.Sessions, session.ID) == false {
mode.Sessions = append(mode.Sessions, session.ID) mode.Sessions = append(mode.Sessions, session.ID)
} }
case MODE_COMMAND_LEAVE: case MODE_COMMAND_LEAVE:
idx := slices.Index(mode.Sessions, session.ID) idx := slices.Index(mode.Sessions, session.ID)
if idx != -1 { if idx != -1 {
mode.Sessions = slices.Delete(mode.Sessions, idx, idx+1) mode.Sessions = slices.Delete(mode.Sessions, idx, idx+1)
} }
case MODE_COMMAND_DATA: case MODE_COMMAND_DATA:
if slices.Contains(mode.Sessions, session.ID) { if slices.Contains(mode.Sessions, session.ID) {
new_packet := &Packet{ new_packet := &Packet{

@ -1,14 +1,22 @@
package main package main
import ( import (
"encoding/binary"
"fmt" "fmt"
"os" "os"
"git.metznet.ca/MetzNet/pnyx" "git.metznet.ca/MetzNet/pnyx"
"github.com/gen2brain/malgo" "github.com/gen2brain/malgo"
"github.com/hraban/opus"
) )
func main() { func main() {
decoders := map[pnyx.PeerID]*opus.Decoder{}
encoder, err := opus.NewEncoder(44100, 1, opus.AppVoIP)
if err != nil {
panic(err)
}
ctx, err := malgo.InitContext(nil, malgo.ContextConfig{}, nil) ctx, err := malgo.InitContext(nil, malgo.ContextConfig{}, nil)
if err != nil { if err != nil {
panic(err) panic(err)
@ -17,7 +25,6 @@ func main() {
defer ctx.Free() defer ctx.Free()
defer ctx.Uninit() defer ctx.Uninit()
// Playback devices.
infos, err := ctx.Devices(malgo.Playback) infos, err := ctx.Devices(malgo.Playback)
if err != nil { if err != nil {
panic(err) panic(err)
@ -59,20 +66,20 @@ func main() {
} }
inDeviceConfig := malgo.DefaultDeviceConfig(malgo.Capture) inDeviceConfig := malgo.DefaultDeviceConfig(malgo.Capture)
inDeviceConfig.Capture.Format = malgo.FormatF32 inDeviceConfig.Capture.Format = malgo.FormatS16
inDeviceConfig.Capture.Channels = 1 inDeviceConfig.Capture.Channels = 1
inDeviceConfig.Capture.DeviceID = capture_device.ID.Pointer() inDeviceConfig.Capture.DeviceID = capture_device.ID.Pointer()
inDeviceConfig.SampleRate = capture_device.Formats[0].SampleRate inDeviceConfig.SampleRate = 44100
inDeviceConfig.PeriodSizeInFrames = 100 inDeviceConfig.PeriodSizeInFrames = 882
inDeviceConfig.Alsa.NoMMap = 1 inDeviceConfig.Alsa.NoMMap = 1
inDeviceConfig.Capture.ShareMode = malgo.Shared inDeviceConfig.Capture.ShareMode = malgo.Shared
outDeviceConfig := malgo.DefaultDeviceConfig(malgo.Playback) outDeviceConfig := malgo.DefaultDeviceConfig(malgo.Playback)
outDeviceConfig.Playback.Format = malgo.FormatF32 outDeviceConfig.Playback.Format = malgo.FormatS16
outDeviceConfig.Playback.Channels = 1 outDeviceConfig.Playback.Channels = 1
outDeviceConfig.Playback.DeviceID = playback_device.ID.Pointer() outDeviceConfig.Playback.DeviceID = playback_device.ID.Pointer()
outDeviceConfig.SampleRate = playback_device.Formats[0].SampleRate outDeviceConfig.SampleRate = 44100
outDeviceConfig.PeriodSizeInFrames = 100 outDeviceConfig.PeriodSizeInFrames = 882
outDeviceConfig.Alsa.NoMMap = 1 outDeviceConfig.Alsa.NoMMap = 1
outDeviceConfig.Playback.ShareMode = malgo.Shared outDeviceConfig.Playback.ShareMode = malgo.Shared
@ -107,11 +114,19 @@ func main() {
defer outDevice.Stop() defer outDevice.Stop()
onRecvFrames := func(output_samples []byte, input_samples []byte, framecount uint32) { onRecvFrames := func(output_samples []byte, input_samples []byte, framecount uint32) {
pcm := make([]int16, len(input_samples)/2)
for i := 0; i < len(input_samples)/2; i++ {
pcm[i] = int16(binary.BigEndian.Uint16(input_samples[2*i:]))
}
data := make([]byte, len(input_samples)) data := make([]byte, len(input_samples))
copy(data, input_samples) written, err := encoder.Encode(pcm, data)
if err != nil {
panic(err)
}
select { select {
case mic <- data: case mic <- data[:written]:
default: default:
} }
} }
@ -162,8 +177,26 @@ func main() {
fmt.Printf("ParsePacket Error %s - %x\n", err, data) fmt.Printf("ParsePacket Error %s - %x\n", err, data)
continue continue
} }
_ = pnyx.PeerID(packet.Data[0:16]) peer := pnyx.PeerID(packet.Data[0:16])
speaker <- packet.Data[16:] if packet.Channel == pnyx.ChannelID(1) {
decoder, exists := decoders[peer]
if exists == false {
decoder, err = opus.NewDecoder(44100, 1)
if err != nil {
panic(err)
}
}
pcm := make([]int16, 1000)
written, err := decoder.Decode(packet.Data[16:], pcm)
if err != nil {
panic(err)
}
data := make([]byte, written*2)
for i, p := range(pcm) {
binary.BigEndian.PutUint16(data[i*2:], uint16(p))
}
speaker <- data
}
} }
}() }()
@ -177,30 +210,26 @@ func main() {
panic(err) panic(err)
} }
for true {
data := <- mic
err = client.Send(pnyx.Packet{ err = client.Send(pnyx.Packet{
Channel: pnyx.ChannelID(1), Channel: pnyx.ChannelID(2),
Mode: pnyx.MODE_RAW, Mode: pnyx.MODE_RAW,
Command: pnyx.MODE_COMMAND_DATA, Command: pnyx.MODE_COMMAND_JOIN,
Data: data, Data: nil,
}) })
if err != nil { if err != nil {
panic(err) panic(err)
} }
}
for true {
data := <- mic
err = client.Send(pnyx.Packet{ err = client.Send(pnyx.Packet{
Channel: pnyx.ChannelID(1), Channel: pnyx.ChannelID(1),
Mode: pnyx.MODE_RAW, Mode: pnyx.MODE_RAW,
Command: pnyx.MODE_COMMAND_LEAVE, Command: pnyx.MODE_COMMAND_DATA,
Data: nil, Data: data,
}) })
if err != nil { if err != nil {
panic(err) panic(err)
} }
err = client.Close()
if err != nil {
panic(err)
} }
} }

@ -28,6 +28,11 @@ func main() {
panic(err) panic(err)
} }
err = server.AddChannel(pnyx.ChannelID(2), &pnyx.RawMode{})
if err != nil {
panic(err)
}
<-os_sigs <-os_sigs
err = server.Stop() err = server.Stop()
if err != nil { if err != nil {

@ -6,8 +6,13 @@ require (
filippo.io/edwards25519 v1.1.0 // indirect filippo.io/edwards25519 v1.1.0 // indirect
github.com/ebitengine/purego v0.7.1 // indirect github.com/ebitengine/purego v0.7.1 // indirect
github.com/gen2brain/malgo v0.11.21 // indirect github.com/gen2brain/malgo v0.11.21 // indirect
github.com/gen2brain/shm v0.0.0-20230802011745-f2460f5984f7 // indirect
github.com/google/uuid v1.6.0 // indirect github.com/google/uuid v1.6.0 // indirect
github.com/gordonklaus/portaudio v0.0.0-20230709114228-aafa478834f5 // indirect github.com/gordonklaus/portaudio v0.0.0-20230709114228-aafa478834f5 // indirect
github.com/hraban/opus v0.0.0-20230925203106-0188a62cb302 // indirect
github.com/jezek/xgb v1.1.0 // indirect
github.com/kbinani/screenshot v0.0.0-20230812210009-b87d31814237 // indirect
github.com/lxn/win v0.0.0-20210218163916-a377121e959e // indirect
github.com/pion/opus v0.0.0-20240403022900-1c7b6eddc7c9 // indirect github.com/pion/opus v0.0.0-20240403022900-1c7b6eddc7c9 // indirect
golang.org/x/sys v0.7.0 // indirect golang.org/x/sys v0.11.0 // indirect
) )

@ -4,11 +4,24 @@ github.com/ebitengine/purego v0.7.1 h1:6/55d26lG3o9VCZX8lping+bZcmShseiqlh2bnUDi
github.com/ebitengine/purego v0.7.1/go.mod h1:ah1In8AOtksoNK6yk5z1HTJeUkC1Ez4Wk2idgGslMwQ= github.com/ebitengine/purego v0.7.1/go.mod h1:ah1In8AOtksoNK6yk5z1HTJeUkC1Ez4Wk2idgGslMwQ=
github.com/gen2brain/malgo v0.11.21 h1:qsS4Dh6zhZgmvAW5CtKRxDjQzHbc2NJlBG9eE0tgS8w= github.com/gen2brain/malgo v0.11.21 h1:qsS4Dh6zhZgmvAW5CtKRxDjQzHbc2NJlBG9eE0tgS8w=
github.com/gen2brain/malgo v0.11.21/go.mod h1:f9TtuN7DVrXMiV/yIceMeWpvanyVzJQMlBecJFVMxww= github.com/gen2brain/malgo v0.11.21/go.mod h1:f9TtuN7DVrXMiV/yIceMeWpvanyVzJQMlBecJFVMxww=
github.com/gen2brain/shm v0.0.0-20230802011745-f2460f5984f7 h1:VLEKvjGJYAMCXw0/32r9io61tEXnMWDRxMk+peyRVFc=
github.com/gen2brain/shm v0.0.0-20230802011745-f2460f5984f7/go.mod h1:uF6rMu/1nvu+5DpiRLwusA6xB8zlkNoGzKn8lmYONUo=
github.com/google/uuid v1.6.0 h1:NIvaJDMOsjHA8n1jAhLSgzrAzy1Hgr+hNrb57e+94F0= github.com/google/uuid v1.6.0 h1:NIvaJDMOsjHA8n1jAhLSgzrAzy1Hgr+hNrb57e+94F0=
github.com/google/uuid v1.6.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo= github.com/google/uuid v1.6.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo=
github.com/gordonklaus/portaudio v0.0.0-20230709114228-aafa478834f5 h1:5AlozfqaVjGYGhms2OsdUyfdJME76E6rx5MdGpjzZpc= github.com/gordonklaus/portaudio v0.0.0-20230709114228-aafa478834f5 h1:5AlozfqaVjGYGhms2OsdUyfdJME76E6rx5MdGpjzZpc=
github.com/gordonklaus/portaudio v0.0.0-20230709114228-aafa478834f5/go.mod h1:WY8R6YKlI2ZI3UyzFk7P6yGSuS+hFwNtEzrexRyD7Es= github.com/gordonklaus/portaudio v0.0.0-20230709114228-aafa478834f5/go.mod h1:WY8R6YKlI2ZI3UyzFk7P6yGSuS+hFwNtEzrexRyD7Es=
github.com/hraban/opus v0.0.0-20230925203106-0188a62cb302 h1:K7bmEmIesLcvCW0Ic2rCk6LtP5++nTnPmrO8mg5umlA=
github.com/hraban/opus v0.0.0-20230925203106-0188a62cb302/go.mod h1:YQQXrWHN3JEvCtw5ImyTCcPeU/ZLo/YMA+TpB64XdrU=
github.com/jezek/xgb v1.1.0 h1:wnpxJzP1+rkbGclEkmwpVFQWpuE2PUGNUzP8SbfFobk=
github.com/jezek/xgb v1.1.0/go.mod h1:nrhwO0FX/enq75I7Y7G8iN1ubpSGZEiA3v9e9GyRFlk=
github.com/kbinani/screenshot v0.0.0-20230812210009-b87d31814237 h1:YOp8St+CM/AQ9Vp4XYm4272E77MptJDHkwypQHIRl9Q=
github.com/kbinani/screenshot v0.0.0-20230812210009-b87d31814237/go.mod h1:e7qQlOY68wOz4b82D7n+DdaptZAi+SHW0+yKiWZzEYE=
github.com/lxn/win v0.0.0-20210218163916-a377121e959e h1:H+t6A/QJMbhCSEH5rAuRxh+CtW96g0Or0Fxa9IKr4uc=
github.com/lxn/win v0.0.0-20210218163916-a377121e959e/go.mod h1:KxxjdtRkfNoYDCUP5ryK7XJJNTnpC8atvtmTheChOtk=
github.com/pion/opus v0.0.0-20240403022900-1c7b6eddc7c9 h1:/aqYkFcwlpZVXSt1cLDXppeDQlABu9zZq/mBVX3v/5w= github.com/pion/opus v0.0.0-20240403022900-1c7b6eddc7c9 h1:/aqYkFcwlpZVXSt1cLDXppeDQlABu9zZq/mBVX3v/5w=
github.com/pion/opus v0.0.0-20240403022900-1c7b6eddc7c9/go.mod h1:APGXJHkH8qlbefy7R7/i6a2w/nvXC85hnHm8FjaGgMo= github.com/pion/opus v0.0.0-20240403022900-1c7b6eddc7c9/go.mod h1:APGXJHkH8qlbefy7R7/i6a2w/nvXC85hnHm8FjaGgMo=
golang.org/x/sys v0.0.0-20201018230417-eeed37f84f13/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.7.0 h1:3jlCCIQZPdOYu1h8BkNvLz8Kgwtae2cagcG/VamtZRU= golang.org/x/sys v0.7.0 h1:3jlCCIQZPdOYu1h8BkNvLz8Kgwtae2cagcG/VamtZRU=
golang.org/x/sys v0.7.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.7.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.11.0 h1:eG7RXZHdqOJ1i+0lgLgCpSXAp6M3LYlAo6osgSi0xOM=
golang.org/x/sys v0.11.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=

@ -10,6 +10,7 @@ import (
"reflect" "reflect"
"sync" "sync"
"sync/atomic" "sync/atomic"
"time"
) )
const ( const (
@ -19,6 +20,7 @@ const (
type ServerSession struct { type ServerSession struct {
Session Session
LastSeen time.Time
IncomingPackets chan[]byte IncomingPackets chan[]byte
OutgoingPackets chan *Packet OutgoingPackets chan *Packet
} }
@ -160,6 +162,7 @@ func(server *Server) listen_udp() {
server.sessions_lock.Lock() server.sessions_lock.Lock()
server.sessions[session.ID] = &ServerSession{ server.sessions[session.ID] = &ServerSession{
Session: session, Session: session,
LastSeen: time.Now(),
IncomingPackets: make(chan[]byte, SESSION_BUFFER_SIZE), IncomingPackets: make(chan[]byte, SESSION_BUFFER_SIZE),
OutgoingPackets: make(chan *Packet, SESSION_BUFFER_SIZE), OutgoingPackets: make(chan *Packet, SESSION_BUFFER_SIZE),
} }
@ -264,6 +267,7 @@ func(server *Server) listen_udp() {
} }
session.remote = client_addr session.remote = client_addr
session.LastSeen = time.Now()
// TODO: Make a better server hello // TODO: Make a better server hello
server_hello, err := NewSessionData(&session.Session, []byte("hello")) server_hello, err := NewSessionData(&session.Session, []byte("hello"))
@ -293,11 +297,8 @@ func(server *Server) listen_udp() {
continue continue
} }
close(session.IncomingPackets)
close(session.OutgoingPackets)
server.sessions_lock.Lock() server.sessions_lock.Lock()
delete(server.sessions, session_id) server.close_session(session)
server.sessions_lock.Unlock() server.sessions_lock.Unlock()
server.Log("Session %s closed", session_id) server.Log("Session %s closed", session_id)
@ -310,6 +311,8 @@ func(server *Server) listen_udp() {
continue continue
} }
session.LastSeen = time.Now()
buf_copy := make([]byte, read - COMMAND_LENGTH - ID_LENGTH) buf_copy := make([]byte, read - COMMAND_LENGTH - ID_LENGTH)
copy(buf_copy, buf[COMMAND_LENGTH+ID_LENGTH:read]) copy(buf_copy, buf[COMMAND_LENGTH+ID_LENGTH:read])
@ -330,10 +333,12 @@ func(server *Server) listen_udp() {
} }
server.sessions_lock.Lock() server.sessions_lock.Lock()
for session_id, session := range(server.sessions) { sessions := make([]*ServerSession, 0, len(server.sessions))
close(session.IncomingPackets) for _, session := range(server.sessions) {
close(session.OutgoingPackets) sessions = append(sessions, session)
delete(server.sessions, session_id) }
for _, session := range(sessions) {
server.close_session(session)
} }
server.sessions_lock.Unlock() server.sessions_lock.Unlock()
@ -342,7 +347,7 @@ func(server *Server) listen_udp() {
} }
func(server *Server) send_sessions() { func(server *Server) send_sessions() {
for true { for server.active.Load() {
packets := <- server.send_packets packets := <- server.send_packets
if packets == nil { if packets == nil {
break break
@ -359,6 +364,40 @@ func(server *Server) send_sessions() {
} }
} }
func(server *Server) close_session(session *ServerSession) {
close(session.IncomingPackets)
close(session.OutgoingPackets)
delete(server.sessions, session.ID)
}
const SESSION_TIMEOUT = time.Minute * 5
const SESSION_TIMEOUT_CHECK = time.Minute
func(server *Server) cleanup_sessions() {
for server.active.Load() {
select {
case <-time.After(SESSION_TIMEOUT_CHECK):
server.Log("Running stale session check")
server.sessions_lock.Lock()
now := time.Now()
stale_sessions := make([]*ServerSession, 0, len(server.sessions))
for _, session := range(server.sessions) {
if now.Sub(session.LastSeen) >= SESSION_TIMEOUT {
server.Log("Closing stale session %s for %s", session.ID, session.Peer)
stale_sessions = append(stale_sessions, session)
}
}
for _, session := range(stale_sessions) {
server.close_session(session)
}
server.sessions_lock.Unlock()
// TODO: add a way for this to be shutdown instantly on server shutdown
}
}
}
func(server *Server) Start(listen string) error { func(server *Server) Start(listen string) error {
was_inactive := server.active.CompareAndSwap(false, true) was_inactive := server.active.CompareAndSwap(false, true)
if was_inactive == false { if was_inactive == false {
@ -379,6 +418,7 @@ func(server *Server) Start(listen string) error {
go server.listen_udp() go server.listen_udp()
go server.send_sessions() go server.send_sessions()
go server.cleanup_sessions()
return nil return nil
} }