From 3ce9c08dffd02e2eda1e61237789ddbf66d9c0fa Mon Sep 17 00:00:00 2001 From: Noah Metz Date: Sat, 13 Apr 2024 14:00:56 -0600 Subject: [PATCH] Added ncurses, moved channel join to a server command, and removed send_sessions goroutine --- channel.go | 158 +++++++++++++++++---------------------------- cmd/client/main.go | 58 +++++++++-------- go.mod | 19 ++---- go.sum | 24 ++----- packet.go | 11 ++-- server.go | 90 +++++++++++++------------- 6 files changed, 154 insertions(+), 206 deletions(-) diff --git a/channel.go b/channel.go index 73e893d..3c82e0b 100644 --- a/channel.go +++ b/channel.go @@ -1,25 +1,19 @@ package pnyx import ( - "slices" - "fmt" + "fmt" + + "github.com/google/uuid" ) type ChannelID byte const ( - MODE_CHANNEL ModeID = iota - MODE_RAW + MODE_RAW ModeID = iota MODE_AUDIO - CHANNEL_JOIN byte = iota - CHANNEL_LEAVE - CHANNEL_MEMBERS - AUDIO_SET_SAMPLE_RATE = iota AUDIO_GET_SAMPLE_RATE - - RAW_DATA = iota ) type ModeID uint8 @@ -30,103 +24,70 @@ type Channel struct { id ChannelID name string modes map[ModeID]Mode - sessions []SessionID + members []*ServerSession } -func(channel *Channel) Data(session *Session, mode ModeID, data []byte) []SendPacket { +func(channel *Channel) Data(session *ServerSession, mode ModeID, data []byte) { m, has_mode := channel.modes[mode] - if has_mode == false { - return nil - } else { - return m.Data(session, channel, data) + if has_mode { + m.Data(session, channel.id, channel.members, data) } } -func(channel *Channel) Command(session *Session, packet ChannelCommandPacket) ([]SendPacket, error) { - if packet.Mode == MODE_CHANNEL { - switch packet.Command { - case CHANNEL_JOIN: - if slices.Contains(channel.sessions, session.ID) { - return nil, fmt.Errorf("Session %s already in channel %d, can't join", session.ID, channel.id) - } else { - channel.sessions = append(channel.sessions, session.ID) - return nil, nil - } - case CHANNEL_LEAVE: - idx := slices.Index(channel.sessions, session.ID) - if idx == -1 { - return nil, fmt.Errorf("Session %s not in channel %d, can't leave", session.ID, channel.id) - } else { - channel.sessions = slices.Delete(channel.sessions, idx, idx+1) - return nil, nil - } - default: - return nil, fmt.Errorf("Unknown MODE_CHANNEL command: 0x%02x", packet.Command) - } +func(channel *Channel) Command(session *ServerSession, command byte, request_id uuid.UUID, mode_id ModeID, data []byte) error { + mode, has_mode := channel.modes[mode_id] + if has_mode == false { + return fmt.Errorf("Channel has no mode 0x%02x", mode) } else { - mode, has_mode := channel.modes[packet.Mode] - if has_mode == false { - return nil, fmt.Errorf("Channel has no mode 0x%02x", mode) - } else { - return mode.Command(session, channel, packet) - } + return mode.Command(session, command, request_id, channel.id, channel.members, data) } } -type SendPacket struct { - Packet *Packet - Session SessionID +func(channel *Channel) Join(client PeerID, session SessionID) { +} + +func(channel *Channel) Leave(client PeerID, session SessionID) { } type Mode interface { - // Process takes incoming packets from a session and returns a list of packets to send - Command(session *Session, channel *Channel, packet ChannelCommandPacket) ([]SendPacket, error) - Data(session *Session, channel *Channel, data []byte) []SendPacket + Command(session *ServerSession, command byte, request_id uuid.UUID, channel_id ChannelID, members []*ServerSession, data []byte) error + Data(session *ServerSession, channel_id ChannelID, members []*ServerSession, data []byte) + + Join(client PeerID, session SessionID) + Leave(client PeerID, session SessionID) } -func multiplex_without_sender(origin SessionID, packet *Packet, sessions []SessionID) []SendPacket { - send_packets := make([]SendPacket, len(sessions) - 1) - i := 0 - for _, session_id := range(sessions) { - if session_id == origin { +func multiplex_without_sender(origin SessionID, packet *Packet, sessions []*ServerSession) { + for _, session := range(sessions) { + if session.ID == origin { continue } - send_packets[i] = SendPacket{ - Packet: packet, - Session: session_id, - } - i += 1 + session.OutgoingPackets <- packet } - - return send_packets } -func multiplex(packet *Packet, sessions []SessionID) []SendPacket { - send_packets := make([]SendPacket, len(sessions)) - for i, session_id := range(sessions) { - send_packets[i] = SendPacket{ - Packet: packet, - Session: session_id, - } +func multiplex(packet *Packet, sessions []*ServerSession) { + for _, session := range(sessions) { + session.OutgoingPackets <- packet } - - return send_packets } type RawMode struct { } -func(mode *RawMode) Command(session *Session, channel *Channel, packet ChannelCommandPacket) ([]SendPacket, error) { - return nil, fmt.Errorf("unknown raw mode command 0x%02x", packet.Command) +func(mode *RawMode) Command(session *ServerSession, command byte, request_id uuid.UUID, channel_id ChannelID, members []*ServerSession, data []byte) error { + return fmt.Errorf("unknown raw mode command 0x%02x", command) } -func(mode *RawMode) Data(session *Session, channel *Channel, data []byte) []SendPacket { - if slices.Contains(channel.sessions, session.ID) { - new_packet := NewChannelPeerPacket(session.Peer, channel.id, MODE_RAW, data) - return multiplex_without_sender(session.ID, new_packet, channel.sessions) - } - return nil +func(mode *RawMode) Data(session *ServerSession, channel_id ChannelID, members []*ServerSession, data []byte) { + new_packet := NewChannelPeerPacket(session.Peer, channel_id, MODE_RAW, data) + multiplex_without_sender(session.ID, new_packet, members) +} + +func(mode *RawMode) Join(client PeerID, session SessionID) { +} +func(mode *RawMode) Leave(client PeerID, session SessionID) { } type SampleRate byte @@ -140,37 +101,38 @@ type AudioMode struct { SampleRate SampleRate } -func(mode *AudioMode) Command(session *Session, channel *Channel, packet ChannelCommandPacket) ([]SendPacket, error) { - switch packet.Command { +func(mode *AudioMode) Command(session *ServerSession, command byte, request_id uuid.UUID, channel_id ChannelID, members []*ServerSession, data []byte) error { + switch command { case AUDIO_SET_SAMPLE_RATE: - if len(packet.Data) == 1 { - switch SampleRate(packet.Data[0]) { + if len(data) == 1 { + switch SampleRate(data[0]) { case SAMPLE_RATE_24KHZ: fallthrough case SAMPLE_RATE_48KHZ: - mode.SampleRate = SampleRate(packet.Data[0]) - update_packet := NewChannelCommandPacket(packet.ReqID, channel.id, MODE_AUDIO, AUDIO_SET_SAMPLE_RATE, packet.Data) - return multiplex(update_packet, channel.sessions), nil + mode.SampleRate = SampleRate(data[0]) + update_packet := NewChannelCommandPacket(request_id, channel_id, MODE_AUDIO, AUDIO_SET_SAMPLE_RATE, data) + multiplex(update_packet, members) + return nil default: - return nil, fmt.Errorf("Invalid sample rate: %x", packet.Data[0]) + return fmt.Errorf("Invalid sample rate: %x", data[0]) } } else { - return nil, fmt.Errorf("Invalid AUDIO_SET_SAMPLE_RATE payload: %x", packet.Data) + return fmt.Errorf("Invalid AUDIO_SET_SAMPLE_RATE payload: %x", data) } case AUDIO_GET_SAMPLE_RATE: - return []SendPacket{{ - Packet: NewChannelCommandPacket(packet.ReqID, channel.id, MODE_AUDIO, AUDIO_SET_SAMPLE_RATE, []byte{byte(mode.SampleRate)}), - Session: session.ID, - }}, nil + session.OutgoingPackets <- NewChannelCommandPacket(request_id, channel_id, MODE_AUDIO, AUDIO_SET_SAMPLE_RATE, []byte{byte(mode.SampleRate)}) + return nil default: - return nil, fmt.Errorf("unknown audio mode command 0x%02x", packet.Command) + return fmt.Errorf("unknown audio mode command 0x%02x", command) } } -func(mode *AudioMode) Data(session *Session, channel *Channel, data []byte) []SendPacket { - if slices.Contains(channel.sessions, session.ID) { - new_packet := NewChannelPeerPacket(session.Peer, channel.id, MODE_AUDIO, data) - return multiplex_without_sender(session.ID, new_packet, channel.sessions) - } - return nil +func(mode *AudioMode) Data(session *ServerSession, channel_id ChannelID, members []*ServerSession, data []byte) { + new_packet := NewChannelPeerPacket(session.Peer, channel_id, MODE_AUDIO, data) + multiplex_without_sender(session.ID, new_packet, members) +} + +func(mode *AudioMode) Join(client PeerID, session SessionID) { +} +func(mode *AudioMode) Leave(client PeerID, session SessionID) { } diff --git a/cmd/client/main.go b/cmd/client/main.go index 301616b..f94a19e 100644 --- a/cmd/client/main.go +++ b/cmd/client/main.go @@ -4,12 +4,14 @@ import ( "encoding/binary" "fmt" "os" + "slices" "time" "git.metznet.ca/MetzNet/pnyx" "github.com/gen2brain/malgo" "github.com/google/uuid" "github.com/hraban/opus" + "seehuhn.de/go/ncurses" ) var decoders = map[pnyx.PeerID]chan[]byte{} @@ -23,7 +25,6 @@ func set_sample_rate(new_sample_rate int) error { sample_rate = new_sample_rate var err error - fmt.Printf("Creating encoder with sample_rate %d\n", new_sample_rate) encoder, err = opus.NewEncoder(new_sample_rate, 1, opus.AppVoIP) if err != nil { return err @@ -41,7 +42,6 @@ func set_sample_rate(new_sample_rate int) error { } func handle_peer_decode(peer_id pnyx.PeerID, decode_chan chan[]byte, sample_rate int){ - fmt.Printf("Starting decoder routine for %x with sample_rate %d\n", peer_id, sample_rate) decoder, err := opus.NewDecoder(sample_rate, 1) if err != nil { panic(err) @@ -55,7 +55,6 @@ func handle_peer_decode(peer_id pnyx.PeerID, decode_chan chan[]byte, sample_rate missed += 1 if missed > 100 { - fmt.Printf("Missed 100 packets from %x, stopping stream until data received\n", peer_id) decode_chan <- <- decode_chan } @@ -80,7 +79,6 @@ func handle_peer_decode(peer_id pnyx.PeerID, decode_chan chan[]byte, sample_rate } } } - fmt.Printf("Stopping decoder routine for %x with sample_rate %d\n", peer_id, sample_rate) } func mixer(data_chan chan []int16, speaker_chan chan []int16) { @@ -185,13 +183,11 @@ func main() { Data: onSendFrames, } - fmt.Printf("Creating playback device %+v with format %+v\n", playback_device, playback_device.Formats[0]) outDevice, err := malgo.InitDevice(ctx.Context, outDeviceConfig, playbackCallbacks) if err != nil { panic(err) } - fmt.Printf("Starting playback\n") err = outDevice.Start() if err != nil { panic(err) @@ -223,13 +219,11 @@ func main() { Data: onRecvFrames, } - fmt.Printf("Creating capture device %+v with format %+v\n", capture_device, capture_device.Formats[0]) inDevice, err := malgo.InitDevice(ctx.Context, inDeviceConfig, captureCallbacks) if err != nil { panic(err) } - fmt.Printf("Starting capture device\n") err = inDevice.Start() if err != nil { panic(err) @@ -238,33 +232,28 @@ func main() { defer inDevice.Uninit() defer inDevice.Stop() - fmt.Printf("Starting pnyx client\n") client, err := pnyx.NewClient(nil, os.Args[1]) if err != nil { panic(err) } - fmt.Printf("Started pnyx client\n") go func() { var buf [1024]byte for true { read, _, err := client.Connection.ReadFromUDP(buf[:]) if err != nil { - fmt.Printf("Read Error %s\n", err) break } data, err := pnyx.ParseSessionData(&client.Session, buf[pnyx.COMMAND_LENGTH + pnyx.SESSION_ID_LENGTH:read]) if err != nil { - fmt.Printf("ParseSessionData Error %s\n", err) continue } packet, err := pnyx.ParsePacket(data) if err != nil { - fmt.Printf("ParsePacket Error %s - %x\n", err, data) continue } @@ -273,7 +262,6 @@ func main() { if packet.Channel == pnyx.ChannelID(0) { if packet.Mode == pnyx.MODE_AUDIO { if packet.Command == pnyx.AUDIO_SET_SAMPLE_RATE { - fmt.Printf("GOT NEW SAMPLE RATE 0x%02x\n", packet.Data) var new_sample_rate int switch packet.Data[0] { case byte(pnyx.SAMPLE_RATE_24KHZ): @@ -307,18 +295,11 @@ func main() { } } default: - fmt.Printf("Unhandled packet type: %s\n", packet) } } }() - add_packet, _ := pnyx.NewServerCommandPacket(pnyx.SERVER_COMMAND_ADD_CHANNEL, []byte{0xFF}) - err = client.Send(add_packet) - if err != nil { - panic(err) - } - - join_packet := pnyx.NewChannelCommandPacket(uuid.New(), pnyx.ChannelID(0), pnyx.MODE_CHANNEL, pnyx.CHANNEL_JOIN, nil) + join_packet := pnyx.NewServerCommandPacket(uuid.New(), pnyx.SERVER_COMMAND_JOIN_CHANNEL, []byte{0x00}) err = client.Send(join_packet) if err != nil { panic(err) @@ -330,11 +311,36 @@ func main() { panic(err) } + go func () { + for true { + data := <- mic + err = client.Send(pnyx.NewChannelDataPacket(pnyx.ChannelID(0), pnyx.MODE_AUDIO, data)) + if err != nil { + panic(err) + } + } + }() + + window := ncurses.Init() + defer ncurses.EndWin() + + ncurses.ColorPair(1).Init(ncurses.ColorBlue, ncurses.ColorRed) + window.AddStr("pnyx client") + for true { - data := <- mic - err = client.Send(pnyx.NewChannelDataPacket(pnyx.ChannelID(0), pnyx.MODE_AUDIO, data)) - if err != nil { - panic(err) + window.Refresh() + time.Sleep(200*time.Millisecond) + peers := make([]pnyx.PeerID, 0, len(decoders)) + for peer_id := range(decoders) { + peers = append(peers, peer_id) + } + + slices.SortFunc(peers, func(a, b pnyx.PeerID) int { + return slices.Compare(a[:], b[:]) + }) + + for i, peer_id := range(peers) { + window.MvAddStr(i+1, 0, fmt.Sprintf("%x", peer_id)) } } } diff --git a/go.mod b/go.mod index 8f3b062..1db1024 100644 --- a/go.mod +++ b/go.mod @@ -3,16 +3,11 @@ module git.metznet.ca/MetzNet/pnyx go 1.22.0 require ( - filippo.io/edwards25519 v1.1.0 // indirect - github.com/ebitengine/purego v0.7.1 // indirect - github.com/gen2brain/malgo v0.11.21 // indirect - github.com/gen2brain/shm v0.0.0-20230802011745-f2460f5984f7 // indirect - github.com/google/uuid v1.6.0 // indirect - github.com/gordonklaus/portaudio v0.0.0-20230709114228-aafa478834f5 // indirect - github.com/hraban/opus v0.0.0-20230925203106-0188a62cb302 // indirect - github.com/jezek/xgb v1.1.0 // indirect - github.com/kbinani/screenshot v0.0.0-20230812210009-b87d31814237 // indirect - github.com/lxn/win v0.0.0-20210218163916-a377121e959e // indirect - github.com/pion/opus v0.0.0-20240403022900-1c7b6eddc7c9 // indirect - golang.org/x/sys v0.11.0 // indirect + filippo.io/edwards25519 v1.1.0 + github.com/gen2brain/malgo v0.11.21 + github.com/google/uuid v1.6.0 + github.com/hraban/opus v0.0.0-20230925203106-0188a62cb302 + seehuhn.de/go/ncurses v0.2.0 ) + +require golang.org/x/sys v0.14.0 // indirect diff --git a/go.sum b/go.sum index 6b025d7..967883e 100644 --- a/go.sum +++ b/go.sum @@ -1,27 +1,13 @@ filippo.io/edwards25519 v1.1.0 h1:FNf4tywRC1HmFuKW5xopWpigGjJKiJSV0Cqo0cJWDaA= filippo.io/edwards25519 v1.1.0/go.mod h1:BxyFTGdWcka3PhytdK4V28tE5sGfRvvvRV7EaN4VDT4= -github.com/ebitengine/purego v0.7.1 h1:6/55d26lG3o9VCZX8lping+bZcmShseiqlh2bnUDiPA= -github.com/ebitengine/purego v0.7.1/go.mod h1:ah1In8AOtksoNK6yk5z1HTJeUkC1Ez4Wk2idgGslMwQ= github.com/gen2brain/malgo v0.11.21 h1:qsS4Dh6zhZgmvAW5CtKRxDjQzHbc2NJlBG9eE0tgS8w= github.com/gen2brain/malgo v0.11.21/go.mod h1:f9TtuN7DVrXMiV/yIceMeWpvanyVzJQMlBecJFVMxww= -github.com/gen2brain/shm v0.0.0-20230802011745-f2460f5984f7 h1:VLEKvjGJYAMCXw0/32r9io61tEXnMWDRxMk+peyRVFc= -github.com/gen2brain/shm v0.0.0-20230802011745-f2460f5984f7/go.mod h1:uF6rMu/1nvu+5DpiRLwusA6xB8zlkNoGzKn8lmYONUo= github.com/google/uuid v1.6.0 h1:NIvaJDMOsjHA8n1jAhLSgzrAzy1Hgr+hNrb57e+94F0= github.com/google/uuid v1.6.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo= -github.com/gordonklaus/portaudio v0.0.0-20230709114228-aafa478834f5 h1:5AlozfqaVjGYGhms2OsdUyfdJME76E6rx5MdGpjzZpc= -github.com/gordonklaus/portaudio v0.0.0-20230709114228-aafa478834f5/go.mod h1:WY8R6YKlI2ZI3UyzFk7P6yGSuS+hFwNtEzrexRyD7Es= github.com/hraban/opus v0.0.0-20230925203106-0188a62cb302 h1:K7bmEmIesLcvCW0Ic2rCk6LtP5++nTnPmrO8mg5umlA= github.com/hraban/opus v0.0.0-20230925203106-0188a62cb302/go.mod h1:YQQXrWHN3JEvCtw5ImyTCcPeU/ZLo/YMA+TpB64XdrU= -github.com/jezek/xgb v1.1.0 h1:wnpxJzP1+rkbGclEkmwpVFQWpuE2PUGNUzP8SbfFobk= -github.com/jezek/xgb v1.1.0/go.mod h1:nrhwO0FX/enq75I7Y7G8iN1ubpSGZEiA3v9e9GyRFlk= -github.com/kbinani/screenshot v0.0.0-20230812210009-b87d31814237 h1:YOp8St+CM/AQ9Vp4XYm4272E77MptJDHkwypQHIRl9Q= -github.com/kbinani/screenshot v0.0.0-20230812210009-b87d31814237/go.mod h1:e7qQlOY68wOz4b82D7n+DdaptZAi+SHW0+yKiWZzEYE= -github.com/lxn/win v0.0.0-20210218163916-a377121e959e h1:H+t6A/QJMbhCSEH5rAuRxh+CtW96g0Or0Fxa9IKr4uc= -github.com/lxn/win v0.0.0-20210218163916-a377121e959e/go.mod h1:KxxjdtRkfNoYDCUP5ryK7XJJNTnpC8atvtmTheChOtk= -github.com/pion/opus v0.0.0-20240403022900-1c7b6eddc7c9 h1:/aqYkFcwlpZVXSt1cLDXppeDQlABu9zZq/mBVX3v/5w= -github.com/pion/opus v0.0.0-20240403022900-1c7b6eddc7c9/go.mod h1:APGXJHkH8qlbefy7R7/i6a2w/nvXC85hnHm8FjaGgMo= -golang.org/x/sys v0.0.0-20201018230417-eeed37f84f13/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= -golang.org/x/sys v0.7.0 h1:3jlCCIQZPdOYu1h8BkNvLz8Kgwtae2cagcG/VamtZRU= -golang.org/x/sys v0.7.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= -golang.org/x/sys v0.11.0 h1:eG7RXZHdqOJ1i+0lgLgCpSXAp6M3LYlAo6osgSi0xOM= -golang.org/x/sys v0.11.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= +golang.org/x/sys v0.0.0-20210317225723-c4fcb01b228e/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= +golang.org/x/sys v0.14.0 h1:Vz7Qs629MkJkGyHxUlRHizWJRG2j8fbQKjELVSNhy7Q= +golang.org/x/sys v0.14.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA= +seehuhn.de/go/ncurses v0.2.0 h1:ZV256n0GIMVEHJnECliGMffzdFsEoT7krJqdfGoYD1E= +seehuhn.de/go/ncurses v0.2.0/go.mod h1:oAc9Y+UN0tflNV0iME++z0ij9uNmIjdxQFkpGoRMd2E= diff --git a/packet.go b/packet.go index 0850460..5679a92 100644 --- a/packet.go +++ b/packet.go @@ -18,7 +18,9 @@ const ( CHANNEL_COMMAND_LEN = CHANNEL_HEADER_LEN + COMMAND_LENGTH + REQ_ID_LENGTH CHANNEL_PEER_LEN = CHANNEL_HEADER_LEN + PEER_ID_LENGTH - SERVER_COMMAND_ADD_CHANNEL byte = iota + SERVER_COMMAND_JOIN_CHANNEL byte = iota + SERVER_COMMAND_LEAVE_CHANNEL + SERVER_COMMAND_ADD_CHANNEL SERVER_COMMAND_DEL_CHANNEL ) @@ -74,16 +76,15 @@ func (packet ServerCommandPacket) MarshalBinary() ([]byte, error) { return p, nil } -func NewServerCommandPacket(command byte, data []byte) (*Packet, uuid.UUID) { - req_id := uuid.New() +func NewServerCommandPacket(request_id uuid.UUID, command byte, data []byte) *Packet { return &Packet{ Type: PACKET_SERVER_COMMAND, Payload: ServerCommandPacket{ - ReqID: req_id, + ReqID: request_id, Command: command, Data: data, }, - }, req_id + } } func ParseServerCommandPacket(data []byte) (ServerCommandPacket, error) { diff --git a/server.go b/server.go index 9c5f62b..bc29fde 100644 --- a/server.go +++ b/server.go @@ -8,9 +8,10 @@ import ( "net" "os" "reflect" + "slices" "sync" "sync/atomic" - "time" + "time" ) const ( @@ -25,6 +26,7 @@ type ServerSession struct { LastSeen time.Time IncomingPackets chan[]byte OutgoingPackets chan *Packet + Channels []ChannelID } type Server struct { @@ -35,8 +37,6 @@ type Server struct { modes map[reflect.Type]ModeID - send_packets chan[]SendPacket - sessions_lock sync.Mutex sessions map[SessionID]*ServerSession @@ -60,8 +60,6 @@ func NewServer(key ed25519.PrivateKey) (*Server, error) { active: atomic.Bool{}, stopped: make(chan error, 0), - send_packets: make(chan []SendPacket, SERVER_SEND_BUFFER_SIZE), - modes: map[reflect.Type]ModeID{ reflect.TypeFor[*RawMode](): MODE_RAW, reflect.TypeFor[*AudioMode](): MODE_AUDIO, @@ -116,7 +114,7 @@ func(server *Server) AddChannel(id ChannelID, modes ...Mode) error { server.channels[id] = &Channel{ id: id, modes: mode_map, - sessions: []SessionID{}, + members: []*ServerSession{}, } return nil @@ -129,7 +127,6 @@ func(server *Server) Log(format string, fields ...interface{}) { func(server *Server) Stop() error { was_active := server.active.CompareAndSwap(true, false) if was_active { - close(server.send_packets) err := server.connection.Close() if err != nil { return err @@ -198,6 +195,35 @@ func handle_session_incoming(session *ServerSession, server *Server) { switch packet := packet.(type) { case ServerCommandPacket: switch packet.Command { + case SERVER_COMMAND_JOIN_CHANNEL: + server.Log("Got join_channel for %x with %x", session.ID, packet.Data) + if len(packet.Data) == 1 { + server.channels_lock.Lock() + channel, exists := server.channels[ChannelID(packet.Data[0])] + if exists == true { + if slices.Contains(channel.members, session) == false { + channel.members = append(channel.members, session) + channel.Join(session.Peer, session.ID) + // TODO: Send message to clients to notify of join + } + } + server.channels_lock.Unlock() + } + case SERVER_COMMAND_LEAVE_CHANNEL: + server.Log("Got leave_channel for %x with %x", session.ID, packet.Data) + if len(packet.Data) == 1 { + server.channels_lock.Lock() + channel, exists := server.channels[ChannelID(packet.Data[0])] + if exists == true { + idx := slices.Index(channel.members, session) + if idx != -1 { + channel.members = slices.Delete(channel.members, idx, idx+1) + channel.Leave(session.Peer, session.ID) + // TODO: Send message to clients to notify of join + } + } + server.channels_lock.Unlock() + } case SERVER_COMMAND_ADD_CHANNEL: server.Log("Got add_channel with %x", packet.Data) case SERVER_COMMAND_DEL_CHANNEL: @@ -206,40 +232,31 @@ func handle_session_incoming(session *ServerSession, server *Server) { server.Log("Unknown server command %x", packet.Command) } case ChannelDataPacket: - var result []SendPacket = nil - server.channels_lock.RLock() channel, exists := server.channels[packet.Channel] if exists == true { - result = channel.Data(&session.Session, packet.Mode, packet.Data) + if slices.Contains(channel.members, session) { + channel.Data(session, packet.Mode, packet.Data) + } + } else { + server.Log("Packet for unknown channel %d", packet.Channel) } server.channels_lock.RUnlock() - if exists == false { - server.Log("Packet for unknown channel %d", packet.Channel) - } else if len(result) > 0 { - //TODO: handle overflow - server.send_packets<-result - } case ChannelCommandPacket: - var result []SendPacket = nil - server.channels_lock.RLock() channel, exists := server.channels[packet.Channel] if exists == true { - result, err = channel.Command(&session.Session, packet) + err = channel.Command(session, packet.Command, packet.ReqID, packet.Mode, packet.Data) + if err != nil { + server.Log("Error processing %+v - %s", packet, err) + } + } else { + server.Log("Packet for unknown channel %d", packet.Channel) } server.channels_lock.RUnlock() - if exists == false { - server.Log("Packet for unknown channel %d", packet.Channel) - } else if err != nil { - server.Log("Error processing %+v - %s", packet, err) - } else if len(result) > 0 { - //TODO: handle overflow - server.send_packets<-result - } default: server.Log("Unhandled packet type from session %s - 0x%02x", session.ID, err) @@ -403,24 +420,6 @@ func(server *Server) listen_udp() { server.stopped <- nil } -func(server *Server) send_sessions() { - for server.active.Load() { - packets := <- server.send_packets - if packets == nil { - break - } - - server.sessions_lock.Lock() - for _, packet := range(packets) { - session, exists := server.sessions[packet.Session] - if exists { - session.OutgoingPackets <- packet.Packet - } - } - server.sessions_lock.Unlock() - } -} - func(server *Server) close_session(session *ServerSession) { close(session.IncomingPackets) close(session.OutgoingPackets) @@ -473,7 +472,6 @@ func(server *Server) Start(listen string) error { } go server.listen_udp() - go server.send_sessions() go server.cleanup_sessions() return nil