Added ncurses, moved channel join to a server command, and removed send_sessions goroutine

live
noah metz 2024-04-13 14:00:56 -06:00
parent 305c394b62
commit 3ce9c08dff
6 changed files with 154 additions and 206 deletions

@ -1,25 +1,19 @@
package pnyx
import (
"slices"
"fmt"
"fmt"
"github.com/google/uuid"
)
type ChannelID byte
const (
MODE_CHANNEL ModeID = iota
MODE_RAW
MODE_RAW ModeID = iota
MODE_AUDIO
CHANNEL_JOIN byte = iota
CHANNEL_LEAVE
CHANNEL_MEMBERS
AUDIO_SET_SAMPLE_RATE = iota
AUDIO_GET_SAMPLE_RATE
RAW_DATA = iota
)
type ModeID uint8
@ -30,103 +24,70 @@ type Channel struct {
id ChannelID
name string
modes map[ModeID]Mode
sessions []SessionID
members []*ServerSession
}
func(channel *Channel) Data(session *Session, mode ModeID, data []byte) []SendPacket {
func(channel *Channel) Data(session *ServerSession, mode ModeID, data []byte) {
m, has_mode := channel.modes[mode]
if has_mode == false {
return nil
} else {
return m.Data(session, channel, data)
if has_mode {
m.Data(session, channel.id, channel.members, data)
}
}
func(channel *Channel) Command(session *Session, packet ChannelCommandPacket) ([]SendPacket, error) {
if packet.Mode == MODE_CHANNEL {
switch packet.Command {
case CHANNEL_JOIN:
if slices.Contains(channel.sessions, session.ID) {
return nil, fmt.Errorf("Session %s already in channel %d, can't join", session.ID, channel.id)
} else {
channel.sessions = append(channel.sessions, session.ID)
return nil, nil
}
case CHANNEL_LEAVE:
idx := slices.Index(channel.sessions, session.ID)
if idx == -1 {
return nil, fmt.Errorf("Session %s not in channel %d, can't leave", session.ID, channel.id)
} else {
channel.sessions = slices.Delete(channel.sessions, idx, idx+1)
return nil, nil
}
default:
return nil, fmt.Errorf("Unknown MODE_CHANNEL command: 0x%02x", packet.Command)
}
func(channel *Channel) Command(session *ServerSession, command byte, request_id uuid.UUID, mode_id ModeID, data []byte) error {
mode, has_mode := channel.modes[mode_id]
if has_mode == false {
return fmt.Errorf("Channel has no mode 0x%02x", mode)
} else {
mode, has_mode := channel.modes[packet.Mode]
if has_mode == false {
return nil, fmt.Errorf("Channel has no mode 0x%02x", mode)
} else {
return mode.Command(session, channel, packet)
}
return mode.Command(session, command, request_id, channel.id, channel.members, data)
}
}
type SendPacket struct {
Packet *Packet
Session SessionID
func(channel *Channel) Join(client PeerID, session SessionID) {
}
func(channel *Channel) Leave(client PeerID, session SessionID) {
}
type Mode interface {
// Process takes incoming packets from a session and returns a list of packets to send
Command(session *Session, channel *Channel, packet ChannelCommandPacket) ([]SendPacket, error)
Data(session *Session, channel *Channel, data []byte) []SendPacket
Command(session *ServerSession, command byte, request_id uuid.UUID, channel_id ChannelID, members []*ServerSession, data []byte) error
Data(session *ServerSession, channel_id ChannelID, members []*ServerSession, data []byte)
Join(client PeerID, session SessionID)
Leave(client PeerID, session SessionID)
}
func multiplex_without_sender(origin SessionID, packet *Packet, sessions []SessionID) []SendPacket {
send_packets := make([]SendPacket, len(sessions) - 1)
i := 0
for _, session_id := range(sessions) {
if session_id == origin {
func multiplex_without_sender(origin SessionID, packet *Packet, sessions []*ServerSession) {
for _, session := range(sessions) {
if session.ID == origin {
continue
}
send_packets[i] = SendPacket{
Packet: packet,
Session: session_id,
}
i += 1
session.OutgoingPackets <- packet
}
return send_packets
}
func multiplex(packet *Packet, sessions []SessionID) []SendPacket {
send_packets := make([]SendPacket, len(sessions))
for i, session_id := range(sessions) {
send_packets[i] = SendPacket{
Packet: packet,
Session: session_id,
}
func multiplex(packet *Packet, sessions []*ServerSession) {
for _, session := range(sessions) {
session.OutgoingPackets <- packet
}
return send_packets
}
type RawMode struct {
}
func(mode *RawMode) Command(session *Session, channel *Channel, packet ChannelCommandPacket) ([]SendPacket, error) {
return nil, fmt.Errorf("unknown raw mode command 0x%02x", packet.Command)
func(mode *RawMode) Command(session *ServerSession, command byte, request_id uuid.UUID, channel_id ChannelID, members []*ServerSession, data []byte) error {
return fmt.Errorf("unknown raw mode command 0x%02x", command)
}
func(mode *RawMode) Data(session *Session, channel *Channel, data []byte) []SendPacket {
if slices.Contains(channel.sessions, session.ID) {
new_packet := NewChannelPeerPacket(session.Peer, channel.id, MODE_RAW, data)
return multiplex_without_sender(session.ID, new_packet, channel.sessions)
}
return nil
func(mode *RawMode) Data(session *ServerSession, channel_id ChannelID, members []*ServerSession, data []byte) {
new_packet := NewChannelPeerPacket(session.Peer, channel_id, MODE_RAW, data)
multiplex_without_sender(session.ID, new_packet, members)
}
func(mode *RawMode) Join(client PeerID, session SessionID) {
}
func(mode *RawMode) Leave(client PeerID, session SessionID) {
}
type SampleRate byte
@ -140,37 +101,38 @@ type AudioMode struct {
SampleRate SampleRate
}
func(mode *AudioMode) Command(session *Session, channel *Channel, packet ChannelCommandPacket) ([]SendPacket, error) {
switch packet.Command {
func(mode *AudioMode) Command(session *ServerSession, command byte, request_id uuid.UUID, channel_id ChannelID, members []*ServerSession, data []byte) error {
switch command {
case AUDIO_SET_SAMPLE_RATE:
if len(packet.Data) == 1 {
switch SampleRate(packet.Data[0]) {
if len(data) == 1 {
switch SampleRate(data[0]) {
case SAMPLE_RATE_24KHZ:
fallthrough
case SAMPLE_RATE_48KHZ:
mode.SampleRate = SampleRate(packet.Data[0])
update_packet := NewChannelCommandPacket(packet.ReqID, channel.id, MODE_AUDIO, AUDIO_SET_SAMPLE_RATE, packet.Data)
return multiplex(update_packet, channel.sessions), nil
mode.SampleRate = SampleRate(data[0])
update_packet := NewChannelCommandPacket(request_id, channel_id, MODE_AUDIO, AUDIO_SET_SAMPLE_RATE, data)
multiplex(update_packet, members)
return nil
default:
return nil, fmt.Errorf("Invalid sample rate: %x", packet.Data[0])
return fmt.Errorf("Invalid sample rate: %x", data[0])
}
} else {
return nil, fmt.Errorf("Invalid AUDIO_SET_SAMPLE_RATE payload: %x", packet.Data)
return fmt.Errorf("Invalid AUDIO_SET_SAMPLE_RATE payload: %x", data)
}
case AUDIO_GET_SAMPLE_RATE:
return []SendPacket{{
Packet: NewChannelCommandPacket(packet.ReqID, channel.id, MODE_AUDIO, AUDIO_SET_SAMPLE_RATE, []byte{byte(mode.SampleRate)}),
Session: session.ID,
}}, nil
session.OutgoingPackets <- NewChannelCommandPacket(request_id, channel_id, MODE_AUDIO, AUDIO_SET_SAMPLE_RATE, []byte{byte(mode.SampleRate)})
return nil
default:
return nil, fmt.Errorf("unknown audio mode command 0x%02x", packet.Command)
return fmt.Errorf("unknown audio mode command 0x%02x", command)
}
}
func(mode *AudioMode) Data(session *Session, channel *Channel, data []byte) []SendPacket {
if slices.Contains(channel.sessions, session.ID) {
new_packet := NewChannelPeerPacket(session.Peer, channel.id, MODE_AUDIO, data)
return multiplex_without_sender(session.ID, new_packet, channel.sessions)
}
return nil
func(mode *AudioMode) Data(session *ServerSession, channel_id ChannelID, members []*ServerSession, data []byte) {
new_packet := NewChannelPeerPacket(session.Peer, channel_id, MODE_AUDIO, data)
multiplex_without_sender(session.ID, new_packet, members)
}
func(mode *AudioMode) Join(client PeerID, session SessionID) {
}
func(mode *AudioMode) Leave(client PeerID, session SessionID) {
}

@ -4,12 +4,14 @@ import (
"encoding/binary"
"fmt"
"os"
"slices"
"time"
"git.metznet.ca/MetzNet/pnyx"
"github.com/gen2brain/malgo"
"github.com/google/uuid"
"github.com/hraban/opus"
"seehuhn.de/go/ncurses"
)
var decoders = map[pnyx.PeerID]chan[]byte{}
@ -23,7 +25,6 @@ func set_sample_rate(new_sample_rate int) error {
sample_rate = new_sample_rate
var err error
fmt.Printf("Creating encoder with sample_rate %d\n", new_sample_rate)
encoder, err = opus.NewEncoder(new_sample_rate, 1, opus.AppVoIP)
if err != nil {
return err
@ -41,7 +42,6 @@ func set_sample_rate(new_sample_rate int) error {
}
func handle_peer_decode(peer_id pnyx.PeerID, decode_chan chan[]byte, sample_rate int){
fmt.Printf("Starting decoder routine for %x with sample_rate %d\n", peer_id, sample_rate)
decoder, err := opus.NewDecoder(sample_rate, 1)
if err != nil {
panic(err)
@ -55,7 +55,6 @@ func handle_peer_decode(peer_id pnyx.PeerID, decode_chan chan[]byte, sample_rate
missed += 1
if missed > 100 {
fmt.Printf("Missed 100 packets from %x, stopping stream until data received\n", peer_id)
decode_chan <- <- decode_chan
}
@ -80,7 +79,6 @@ func handle_peer_decode(peer_id pnyx.PeerID, decode_chan chan[]byte, sample_rate
}
}
}
fmt.Printf("Stopping decoder routine for %x with sample_rate %d\n", peer_id, sample_rate)
}
func mixer(data_chan chan []int16, speaker_chan chan []int16) {
@ -185,13 +183,11 @@ func main() {
Data: onSendFrames,
}
fmt.Printf("Creating playback device %+v with format %+v\n", playback_device, playback_device.Formats[0])
outDevice, err := malgo.InitDevice(ctx.Context, outDeviceConfig, playbackCallbacks)
if err != nil {
panic(err)
}
fmt.Printf("Starting playback\n")
err = outDevice.Start()
if err != nil {
panic(err)
@ -223,13 +219,11 @@ func main() {
Data: onRecvFrames,
}
fmt.Printf("Creating capture device %+v with format %+v\n", capture_device, capture_device.Formats[0])
inDevice, err := malgo.InitDevice(ctx.Context, inDeviceConfig, captureCallbacks)
if err != nil {
panic(err)
}
fmt.Printf("Starting capture device\n")
err = inDevice.Start()
if err != nil {
panic(err)
@ -238,33 +232,28 @@ func main() {
defer inDevice.Uninit()
defer inDevice.Stop()
fmt.Printf("Starting pnyx client\n")
client, err := pnyx.NewClient(nil, os.Args[1])
if err != nil {
panic(err)
}
fmt.Printf("Started pnyx client\n")
go func() {
var buf [1024]byte
for true {
read, _, err := client.Connection.ReadFromUDP(buf[:])
if err != nil {
fmt.Printf("Read Error %s\n", err)
break
}
data, err := pnyx.ParseSessionData(&client.Session, buf[pnyx.COMMAND_LENGTH + pnyx.SESSION_ID_LENGTH:read])
if err != nil {
fmt.Printf("ParseSessionData Error %s\n", err)
continue
}
packet, err := pnyx.ParsePacket(data)
if err != nil {
fmt.Printf("ParsePacket Error %s - %x\n", err, data)
continue
}
@ -273,7 +262,6 @@ func main() {
if packet.Channel == pnyx.ChannelID(0) {
if packet.Mode == pnyx.MODE_AUDIO {
if packet.Command == pnyx.AUDIO_SET_SAMPLE_RATE {
fmt.Printf("GOT NEW SAMPLE RATE 0x%02x\n", packet.Data)
var new_sample_rate int
switch packet.Data[0] {
case byte(pnyx.SAMPLE_RATE_24KHZ):
@ -307,18 +295,11 @@ func main() {
}
}
default:
fmt.Printf("Unhandled packet type: %s\n", packet)
}
}
}()
add_packet, _ := pnyx.NewServerCommandPacket(pnyx.SERVER_COMMAND_ADD_CHANNEL, []byte{0xFF})
err = client.Send(add_packet)
if err != nil {
panic(err)
}
join_packet := pnyx.NewChannelCommandPacket(uuid.New(), pnyx.ChannelID(0), pnyx.MODE_CHANNEL, pnyx.CHANNEL_JOIN, nil)
join_packet := pnyx.NewServerCommandPacket(uuid.New(), pnyx.SERVER_COMMAND_JOIN_CHANNEL, []byte{0x00})
err = client.Send(join_packet)
if err != nil {
panic(err)
@ -330,11 +311,36 @@ func main() {
panic(err)
}
go func () {
for true {
data := <- mic
err = client.Send(pnyx.NewChannelDataPacket(pnyx.ChannelID(0), pnyx.MODE_AUDIO, data))
if err != nil {
panic(err)
}
}
}()
window := ncurses.Init()
defer ncurses.EndWin()
ncurses.ColorPair(1).Init(ncurses.ColorBlue, ncurses.ColorRed)
window.AddStr("pnyx client")
for true {
data := <- mic
err = client.Send(pnyx.NewChannelDataPacket(pnyx.ChannelID(0), pnyx.MODE_AUDIO, data))
if err != nil {
panic(err)
window.Refresh()
time.Sleep(200*time.Millisecond)
peers := make([]pnyx.PeerID, 0, len(decoders))
for peer_id := range(decoders) {
peers = append(peers, peer_id)
}
slices.SortFunc(peers, func(a, b pnyx.PeerID) int {
return slices.Compare(a[:], b[:])
})
for i, peer_id := range(peers) {
window.MvAddStr(i+1, 0, fmt.Sprintf("%x", peer_id))
}
}
}

@ -3,16 +3,11 @@ module git.metznet.ca/MetzNet/pnyx
go 1.22.0
require (
filippo.io/edwards25519 v1.1.0 // indirect
github.com/ebitengine/purego v0.7.1 // indirect
github.com/gen2brain/malgo v0.11.21 // indirect
github.com/gen2brain/shm v0.0.0-20230802011745-f2460f5984f7 // indirect
github.com/google/uuid v1.6.0 // indirect
github.com/gordonklaus/portaudio v0.0.0-20230709114228-aafa478834f5 // indirect
github.com/hraban/opus v0.0.0-20230925203106-0188a62cb302 // indirect
github.com/jezek/xgb v1.1.0 // indirect
github.com/kbinani/screenshot v0.0.0-20230812210009-b87d31814237 // indirect
github.com/lxn/win v0.0.0-20210218163916-a377121e959e // indirect
github.com/pion/opus v0.0.0-20240403022900-1c7b6eddc7c9 // indirect
golang.org/x/sys v0.11.0 // indirect
filippo.io/edwards25519 v1.1.0
github.com/gen2brain/malgo v0.11.21
github.com/google/uuid v1.6.0
github.com/hraban/opus v0.0.0-20230925203106-0188a62cb302
seehuhn.de/go/ncurses v0.2.0
)
require golang.org/x/sys v0.14.0 // indirect

@ -1,27 +1,13 @@
filippo.io/edwards25519 v1.1.0 h1:FNf4tywRC1HmFuKW5xopWpigGjJKiJSV0Cqo0cJWDaA=
filippo.io/edwards25519 v1.1.0/go.mod h1:BxyFTGdWcka3PhytdK4V28tE5sGfRvvvRV7EaN4VDT4=
github.com/ebitengine/purego v0.7.1 h1:6/55d26lG3o9VCZX8lping+bZcmShseiqlh2bnUDiPA=
github.com/ebitengine/purego v0.7.1/go.mod h1:ah1In8AOtksoNK6yk5z1HTJeUkC1Ez4Wk2idgGslMwQ=
github.com/gen2brain/malgo v0.11.21 h1:qsS4Dh6zhZgmvAW5CtKRxDjQzHbc2NJlBG9eE0tgS8w=
github.com/gen2brain/malgo v0.11.21/go.mod h1:f9TtuN7DVrXMiV/yIceMeWpvanyVzJQMlBecJFVMxww=
github.com/gen2brain/shm v0.0.0-20230802011745-f2460f5984f7 h1:VLEKvjGJYAMCXw0/32r9io61tEXnMWDRxMk+peyRVFc=
github.com/gen2brain/shm v0.0.0-20230802011745-f2460f5984f7/go.mod h1:uF6rMu/1nvu+5DpiRLwusA6xB8zlkNoGzKn8lmYONUo=
github.com/google/uuid v1.6.0 h1:NIvaJDMOsjHA8n1jAhLSgzrAzy1Hgr+hNrb57e+94F0=
github.com/google/uuid v1.6.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo=
github.com/gordonklaus/portaudio v0.0.0-20230709114228-aafa478834f5 h1:5AlozfqaVjGYGhms2OsdUyfdJME76E6rx5MdGpjzZpc=
github.com/gordonklaus/portaudio v0.0.0-20230709114228-aafa478834f5/go.mod h1:WY8R6YKlI2ZI3UyzFk7P6yGSuS+hFwNtEzrexRyD7Es=
github.com/hraban/opus v0.0.0-20230925203106-0188a62cb302 h1:K7bmEmIesLcvCW0Ic2rCk6LtP5++nTnPmrO8mg5umlA=
github.com/hraban/opus v0.0.0-20230925203106-0188a62cb302/go.mod h1:YQQXrWHN3JEvCtw5ImyTCcPeU/ZLo/YMA+TpB64XdrU=
github.com/jezek/xgb v1.1.0 h1:wnpxJzP1+rkbGclEkmwpVFQWpuE2PUGNUzP8SbfFobk=
github.com/jezek/xgb v1.1.0/go.mod h1:nrhwO0FX/enq75I7Y7G8iN1ubpSGZEiA3v9e9GyRFlk=
github.com/kbinani/screenshot v0.0.0-20230812210009-b87d31814237 h1:YOp8St+CM/AQ9Vp4XYm4272E77MptJDHkwypQHIRl9Q=
github.com/kbinani/screenshot v0.0.0-20230812210009-b87d31814237/go.mod h1:e7qQlOY68wOz4b82D7n+DdaptZAi+SHW0+yKiWZzEYE=
github.com/lxn/win v0.0.0-20210218163916-a377121e959e h1:H+t6A/QJMbhCSEH5rAuRxh+CtW96g0Or0Fxa9IKr4uc=
github.com/lxn/win v0.0.0-20210218163916-a377121e959e/go.mod h1:KxxjdtRkfNoYDCUP5ryK7XJJNTnpC8atvtmTheChOtk=
github.com/pion/opus v0.0.0-20240403022900-1c7b6eddc7c9 h1:/aqYkFcwlpZVXSt1cLDXppeDQlABu9zZq/mBVX3v/5w=
github.com/pion/opus v0.0.0-20240403022900-1c7b6eddc7c9/go.mod h1:APGXJHkH8qlbefy7R7/i6a2w/nvXC85hnHm8FjaGgMo=
golang.org/x/sys v0.0.0-20201018230417-eeed37f84f13/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.7.0 h1:3jlCCIQZPdOYu1h8BkNvLz8Kgwtae2cagcG/VamtZRU=
golang.org/x/sys v0.7.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.11.0 h1:eG7RXZHdqOJ1i+0lgLgCpSXAp6M3LYlAo6osgSi0xOM=
golang.org/x/sys v0.11.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.0.0-20210317225723-c4fcb01b228e/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.14.0 h1:Vz7Qs629MkJkGyHxUlRHizWJRG2j8fbQKjELVSNhy7Q=
golang.org/x/sys v0.14.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA=
seehuhn.de/go/ncurses v0.2.0 h1:ZV256n0GIMVEHJnECliGMffzdFsEoT7krJqdfGoYD1E=
seehuhn.de/go/ncurses v0.2.0/go.mod h1:oAc9Y+UN0tflNV0iME++z0ij9uNmIjdxQFkpGoRMd2E=

@ -18,7 +18,9 @@ const (
CHANNEL_COMMAND_LEN = CHANNEL_HEADER_LEN + COMMAND_LENGTH + REQ_ID_LENGTH
CHANNEL_PEER_LEN = CHANNEL_HEADER_LEN + PEER_ID_LENGTH
SERVER_COMMAND_ADD_CHANNEL byte = iota
SERVER_COMMAND_JOIN_CHANNEL byte = iota
SERVER_COMMAND_LEAVE_CHANNEL
SERVER_COMMAND_ADD_CHANNEL
SERVER_COMMAND_DEL_CHANNEL
)
@ -74,16 +76,15 @@ func (packet ServerCommandPacket) MarshalBinary() ([]byte, error) {
return p, nil
}
func NewServerCommandPacket(command byte, data []byte) (*Packet, uuid.UUID) {
req_id := uuid.New()
func NewServerCommandPacket(request_id uuid.UUID, command byte, data []byte) *Packet {
return &Packet{
Type: PACKET_SERVER_COMMAND,
Payload: ServerCommandPacket{
ReqID: req_id,
ReqID: request_id,
Command: command,
Data: data,
},
}, req_id
}
}
func ParseServerCommandPacket(data []byte) (ServerCommandPacket, error) {

@ -8,9 +8,10 @@ import (
"net"
"os"
"reflect"
"slices"
"sync"
"sync/atomic"
"time"
"time"
)
const (
@ -25,6 +26,7 @@ type ServerSession struct {
LastSeen time.Time
IncomingPackets chan[]byte
OutgoingPackets chan *Packet
Channels []ChannelID
}
type Server struct {
@ -35,8 +37,6 @@ type Server struct {
modes map[reflect.Type]ModeID
send_packets chan[]SendPacket
sessions_lock sync.Mutex
sessions map[SessionID]*ServerSession
@ -60,8 +60,6 @@ func NewServer(key ed25519.PrivateKey) (*Server, error) {
active: atomic.Bool{},
stopped: make(chan error, 0),
send_packets: make(chan []SendPacket, SERVER_SEND_BUFFER_SIZE),
modes: map[reflect.Type]ModeID{
reflect.TypeFor[*RawMode](): MODE_RAW,
reflect.TypeFor[*AudioMode](): MODE_AUDIO,
@ -116,7 +114,7 @@ func(server *Server) AddChannel(id ChannelID, modes ...Mode) error {
server.channels[id] = &Channel{
id: id,
modes: mode_map,
sessions: []SessionID{},
members: []*ServerSession{},
}
return nil
@ -129,7 +127,6 @@ func(server *Server) Log(format string, fields ...interface{}) {
func(server *Server) Stop() error {
was_active := server.active.CompareAndSwap(true, false)
if was_active {
close(server.send_packets)
err := server.connection.Close()
if err != nil {
return err
@ -198,6 +195,35 @@ func handle_session_incoming(session *ServerSession, server *Server) {
switch packet := packet.(type) {
case ServerCommandPacket:
switch packet.Command {
case SERVER_COMMAND_JOIN_CHANNEL:
server.Log("Got join_channel for %x with %x", session.ID, packet.Data)
if len(packet.Data) == 1 {
server.channels_lock.Lock()
channel, exists := server.channels[ChannelID(packet.Data[0])]
if exists == true {
if slices.Contains(channel.members, session) == false {
channel.members = append(channel.members, session)
channel.Join(session.Peer, session.ID)
// TODO: Send message to clients to notify of join
}
}
server.channels_lock.Unlock()
}
case SERVER_COMMAND_LEAVE_CHANNEL:
server.Log("Got leave_channel for %x with %x", session.ID, packet.Data)
if len(packet.Data) == 1 {
server.channels_lock.Lock()
channel, exists := server.channels[ChannelID(packet.Data[0])]
if exists == true {
idx := slices.Index(channel.members, session)
if idx != -1 {
channel.members = slices.Delete(channel.members, idx, idx+1)
channel.Leave(session.Peer, session.ID)
// TODO: Send message to clients to notify of join
}
}
server.channels_lock.Unlock()
}
case SERVER_COMMAND_ADD_CHANNEL:
server.Log("Got add_channel with %x", packet.Data)
case SERVER_COMMAND_DEL_CHANNEL:
@ -206,40 +232,31 @@ func handle_session_incoming(session *ServerSession, server *Server) {
server.Log("Unknown server command %x", packet.Command)
}
case ChannelDataPacket:
var result []SendPacket = nil
server.channels_lock.RLock()
channel, exists := server.channels[packet.Channel]
if exists == true {
result = channel.Data(&session.Session, packet.Mode, packet.Data)
if slices.Contains(channel.members, session) {
channel.Data(session, packet.Mode, packet.Data)
}
} else {
server.Log("Packet for unknown channel %d", packet.Channel)
}
server.channels_lock.RUnlock()
if exists == false {
server.Log("Packet for unknown channel %d", packet.Channel)
} else if len(result) > 0 {
//TODO: handle overflow
server.send_packets<-result
}
case ChannelCommandPacket:
var result []SendPacket = nil
server.channels_lock.RLock()
channel, exists := server.channels[packet.Channel]
if exists == true {
result, err = channel.Command(&session.Session, packet)
err = channel.Command(session, packet.Command, packet.ReqID, packet.Mode, packet.Data)
if err != nil {
server.Log("Error processing %+v - %s", packet, err)
}
} else {
server.Log("Packet for unknown channel %d", packet.Channel)
}
server.channels_lock.RUnlock()
if exists == false {
server.Log("Packet for unknown channel %d", packet.Channel)
} else if err != nil {
server.Log("Error processing %+v - %s", packet, err)
} else if len(result) > 0 {
//TODO: handle overflow
server.send_packets<-result
}
default:
server.Log("Unhandled packet type from session %s - 0x%02x", session.ID, err)
@ -403,24 +420,6 @@ func(server *Server) listen_udp() {
server.stopped <- nil
}
func(server *Server) send_sessions() {
for server.active.Load() {
packets := <- server.send_packets
if packets == nil {
break
}
server.sessions_lock.Lock()
for _, packet := range(packets) {
session, exists := server.sessions[packet.Session]
if exists {
session.OutgoingPackets <- packet.Packet
}
}
server.sessions_lock.Unlock()
}
}
func(server *Server) close_session(session *ServerSession) {
close(session.IncomingPackets)
close(session.OutgoingPackets)
@ -473,7 +472,6 @@ func(server *Server) Start(listen string) error {
}
go server.listen_udp()
go server.send_sessions()
go server.cleanup_sessions()
return nil