Protocol changes to support more packet types

live
noah metz 2024-04-09 17:08:46 -06:00
parent 13b7a99860
commit 17990e35d9
8 changed files with 355 additions and 129 deletions

@ -2,25 +2,70 @@ package pnyx
import ( import (
"slices" "slices"
"fmt"
) )
type ChannelID uint32 type ChannelID uint32
const ( const (
RootChannelID ChannelID = 0 MODE_CHANNEL ModeID = iota
MODE_RAW
MODE_RAW ModeID = iota CHANNEL_JOIN byte = iota
CHANNEL_LEAVE
CHANNEL_MEMBERS
MODE_COMMAND_DATA byte = 0x00 RAW_DATA = iota
MODE_COMMAND_JOIN = 0x01
MODE_COMMAND_LEAVE = 0x02
) )
type ModeID uint8 type ModeID uint8
type CommandID uint8 type CommandID uint8
type Permission string
type Channel struct { type Channel struct {
id ChannelID
modes map[ModeID]Mode modes map[ModeID]Mode
sessions []SessionID
}
func(channel *Channel) Data(session *Session, mode ModeID, data []byte) []SendPacket {
m, has_mode := channel.modes[mode]
if has_mode == false {
return nil
} else {
return m.Data(session, channel, data)
}
}
func(channel *Channel) Command(session *Session, mode ModeID, command byte, data []byte) ([]SendPacket, error) {
if mode == MODE_CHANNEL {
switch command {
case CHANNEL_JOIN:
if slices.Contains(channel.sessions, session.ID) {
return nil, fmt.Errorf("Session %s already in channel %d, can't join", session.ID, channel.id)
} else {
channel.sessions = append(channel.sessions, session.ID)
return nil, nil
}
case CHANNEL_LEAVE:
idx := slices.Index(channel.sessions, session.ID)
if idx == -1 {
return nil, fmt.Errorf("Session %s not in channel %d, can't leave", session.ID, channel.id)
} else {
channel.sessions = slices.Delete(channel.sessions, idx, idx+1)
return nil, nil
}
default:
return nil, fmt.Errorf("Unknown MODE_CHANNEL command: 0x%02x", command)
}
} else {
mode, has_mode := channel.modes[mode]
if has_mode == false {
return nil, fmt.Errorf("Channel has no mode 0x%02x", mode)
} else {
return mode.Command(session, channel, command, data)
}
}
} }
type SendPacket struct { type SendPacket struct {
@ -30,7 +75,8 @@ type SendPacket struct {
type Mode interface { type Mode interface {
// Process takes incoming packets from a session and returns a list of packets to send // Process takes incoming packets from a session and returns a list of packets to send
Process(*Session, *Packet) []SendPacket Command(session *Session, channel *Channel, command byte, data []byte) ([]SendPacket, error)
Data(session *Session, channel *Channel, data []byte) []SendPacket
} }
func multiplex(session *Session, packet *Packet, sessions []SessionID) []SendPacket { func multiplex(session *Session, packet *Packet, sessions []SessionID) []SendPacket {
@ -50,32 +96,16 @@ func multiplex(session *Session, packet *Packet, sessions []SessionID) []SendPac
} }
type RawMode struct { type RawMode struct {
Sessions []SessionID
} }
func(mode *RawMode) Process(session *Session, packet *Packet) []SendPacket { func(mode *RawMode) Command(session *Session, channel *Channel, command byte, data []byte) ([]SendPacket, error) {
switch packet.Command { return nil, fmt.Errorf("unknown raw mode command 0x%02x", command)
case MODE_COMMAND_JOIN: }
if slices.Contains(mode.Sessions, session.ID) == false {
mode.Sessions = append(mode.Sessions, session.ID)
}
case MODE_COMMAND_LEAVE:
idx := slices.Index(mode.Sessions, session.ID)
if idx != -1 {
mode.Sessions = slices.Delete(mode.Sessions, idx, idx+1)
}
case MODE_COMMAND_DATA: func(mode *RawMode) Data(session *Session, channel *Channel, data []byte) []SendPacket {
if slices.Contains(mode.Sessions, session.ID) { if slices.Contains(channel.sessions, session.ID) {
new_packet := &Packet{ new_packet := NewChannelPeerPacket(session.Peer, channel.id, MODE_RAW, data)
Channel: packet.Channel, return multiplex(session, new_packet, channel.sessions)
Mode: packet.Mode,
Command: MODE_COMMAND_DATA,
Data: append(session.Peer[:], packet.Data...),
}
return multiplex(session, new_packet, mode.Sessions)
}
} }
return nil return nil
} }

@ -107,7 +107,7 @@ func NewClient(key ed25519.PrivateKey, remote string) (*Client, error) {
}, nil }, nil
} }
func(client *Client) Send(packet Packet) error { func(client *Client) Send(packet *Packet) error {
client.ConnectionLock.Lock() client.ConnectionLock.Lock()
defer client.ConnectionLock.Unlock() defer client.ConnectionLock.Unlock()

@ -180,83 +180,69 @@ func main() {
fmt.Printf("ParsePacket Error %s - %x\n", err, data) fmt.Printf("ParsePacket Error %s - %x\n", err, data)
continue continue
} }
peer := pnyx.PeerID(packet.Data[0:16])
if packet.Channel == pnyx.ChannelID(1) {
decode_chan, exists := decoders[peer]
if exists == false {
decode_chan = make(chan[]byte, 1000)
decoders[peer] = decode_chan
go func(decode_chan chan[]byte){
decoder, err := opus.NewDecoder(48000, 1)
if err != nil {
panic(err)
}
for true {
select {
case <-time.After(20*time.Millisecond):
pcm := make([]int16, 960)
err := decoder.DecodePLC(pcm)
if err != nil {
panic(err)
}
pcm_bytes := make([]byte, 960*2) switch packet := packet.(type) {
for i := 0; i < 960; i++ { case pnyx.ChannelPeerPacket:
binary.LittleEndian.PutUint16(pcm_bytes[i*2:], uint16(pcm[i])) if packet.Channel == pnyx.ChannelID(0) {
} decode_chan, exists := decoders[packet.Peer]
speaker <- pcm_bytes if exists == false {
case data := <-decode_chan: decode_chan = make(chan[]byte, 1000)
pcm := make([]int16, 960) decoders[packet.Peer] = decode_chan
written, err := decoder.Decode(data, pcm)
if err != nil { go func(decode_chan chan[]byte){
panic(err) decoder, err := opus.NewDecoder(48000, 1)
} if err != nil {
panic(err)
}
pcm_bytes := make([]byte, written*2) for true {
for i := 0; i < written; i++ { select {
binary.LittleEndian.PutUint16(pcm_bytes[i*2:], uint16(pcm[i])) case <-time.After(20*time.Millisecond):
pcm := make([]int16, 960)
err := decoder.DecodePLC(pcm)
if err != nil {
panic(err)
}
pcm_bytes := make([]byte, 960*2)
for i := 0; i < 960; i++ {
binary.LittleEndian.PutUint16(pcm_bytes[i*2:], uint16(pcm[i]))
}
speaker <- pcm_bytes
case data := <-decode_chan:
pcm := make([]int16, 960)
written, err := decoder.Decode(data, pcm)
if err != nil {
panic(err)
}
pcm_bytes := make([]byte, written*2)
for i := 0; i < written; i++ {
binary.LittleEndian.PutUint16(pcm_bytes[i*2:], uint16(pcm[i]))
}
speaker <- pcm_bytes
} }
speaker <- pcm_bytes
} }
}
}(decoders[packet.Peer])
}(decoders[peer]) }
decode_chan <- packet.Data
} }
decode_chan <- packet.Data[16:] default:
fmt.Printf("Unhandled packet type: %s\n", packet)
} }
} }
}() }()
err = client.Send(pnyx.Packet{ join_packet, _ := pnyx.NewChannelCommandPacket(pnyx.ChannelID(0), pnyx.MODE_CHANNEL, pnyx.CHANNEL_JOIN, nil)
Channel: pnyx.ChannelID(1), err = client.Send(join_packet)
Mode: pnyx.MODE_RAW,
Command: pnyx.MODE_COMMAND_JOIN,
Data: nil,
})
if err != nil {
panic(err)
}
err = client.Send(pnyx.Packet{
Channel: pnyx.ChannelID(2),
Mode: pnyx.MODE_RAW,
Command: pnyx.MODE_COMMAND_JOIN,
Data: nil,
})
if err != nil { if err != nil {
panic(err) panic(err)
} }
for true { for true {
data := <- mic data := <- mic
err = client.Send(pnyx.Packet{ err = client.Send(pnyx.NewChannelDataPacket(pnyx.ChannelID(0), pnyx.MODE_RAW, data))
Channel: pnyx.ChannelID(1),
Mode: pnyx.MODE_RAW,
Command: pnyx.MODE_COMMAND_DATA,
Data: data,
})
if err != nil { if err != nil {
panic(err) panic(err)
} }

@ -23,12 +23,7 @@ func main() {
panic(err) panic(err)
} }
err = server.AddChannel(pnyx.ChannelID(1), &pnyx.RawMode{}) err = server.AddChannel(pnyx.ChannelID(0), &pnyx.RawMode{})
if err != nil {
panic(err)
}
err = server.AddChannel(pnyx.ChannelID(2), &pnyx.RawMode{})
if err != nil { if err != nil {
panic(err) panic(err)
} }

@ -173,3 +173,9 @@ Also this allows for the permission map to look like:
} }
} }
} }
-------------------------------------------------------------------------------
What would the experience be like if channels were nested?

@ -1,37 +1,221 @@
package pnyx package pnyx
import ( import (
"encoding/binary" "encoding"
"fmt" "encoding/binary"
"fmt"
"github.com/google/uuid"
) )
type PacketType uint8
const (
PACKET_CHANNEL_COMMAND PacketType = iota
PACKET_CHANNEL_DATA
PACKET_CHANNEL_PEER
CHANNEL_HEADER_LEN int = 5
CHANNEL_COMMAND_LEN = CHANNEL_HEADER_LEN + COMMAND_LENGTH + ID_LENGTH
CHANNEL_PEER_LEN = CHANNEL_HEADER_LEN + ID_LENGTH
)
type Payload interface {
encoding.BinaryMarshaler
}
type Packet struct { type Packet struct {
Type PacketType
Payload Payload
}
func(packet Packet) MarshalBinary() ([]byte, error) {
payload, err := packet.Payload.MarshalBinary()
if err != nil {
return nil, err
}
return append([]byte{byte(packet.Type)}, payload...), nil
}
func ParsePacket(data []byte) (Payload, error) {
if len(data) < 1 {
return nil, fmt.Errorf("Packet too short to parse - %d/1", len(data))
}
switch PacketType(data[0]) {
case PACKET_CHANNEL_DATA:
return ParseChannelDataPacket(data[1:])
case PACKET_CHANNEL_COMMAND:
return ParseChannelCommandPacket(data[1:])
case PACKET_CHANNEL_PEER:
return ParseChannelPeerPacket(data[1:])
default:
return nil, fmt.Errorf("Don't know how to parse packet type 0x%02x", data[0])
}
}
type ChannelHeader struct {
Channel ChannelID Channel ChannelID
Mode ModeID Mode ModeID
}
func(packet ChannelHeader) MarshalBinary() ([]byte, error) {
p := binary.BigEndian.AppendUint32(nil, uint32(packet.Channel))
return append(p, byte(packet.Mode)), nil
}
func ParseChannelHeader(data []byte) (ChannelHeader, error) {
if len(data) < 5 {
return ChannelHeader{}, fmt.Errorf("Not enough bytes to parse ChannelPacket(%d/%d)", len(data), 6)
}
return ChannelHeader{
Channel: ChannelID(binary.BigEndian.Uint32(data)),
Mode: ModeID(data[4]),
}, nil
}
type ChannelCommandPacket struct {
ChannelHeader
Command byte Command byte
ReqID uuid.UUID
Data []byte Data []byte
} }
func(packet Packet) String() string { func NewChannelCommandPacket(channel ChannelID, mode ModeID, command byte, data []byte) (*Packet, uuid.UUID) {
return fmt.Sprintf("{Channel: %x, Mode: %x, Data: %x}", packet.Channel, packet.Mode, packet.Data) request_id := uuid.New()
return &Packet{
Type: PACKET_CHANNEL_COMMAND,
Payload: ChannelCommandPacket{
ChannelHeader: ChannelHeader{
Channel: channel,
Mode: mode,
},
Command: command,
ReqID: request_id,
Data: data,
},
}, request_id
} }
func(packet Packet) MarshalBinary() ([]byte, error) { func(packet ChannelCommandPacket) MarshalBinary() ([]byte, error) {
p := binary.BigEndian.AppendUint32(nil, uint32(packet.Channel)) header, err := packet.ChannelHeader.MarshalBinary()
p = append(p, byte(packet.Mode)) if err != nil {
p = append(p, byte(packet.Command)) return nil, err
return append(p, packet.Data...), nil }
data := append(header, packet.Command)
data = append(data, packet.ReqID[:]...)
return append(data, packet.Data...), nil
} }
func ParsePacket(data []byte) (*Packet, error) { func ParseChannelCommandPacket(data []byte) (ChannelCommandPacket, error) {
if len(data) < 6 { if len(data) < CHANNEL_COMMAND_LEN {
return nil, fmt.Errorf("Not enough bytes to parse Packet(%d/%d)", len(data), 6) return ChannelCommandPacket{}, fmt.Errorf("Not enough data to decode channel command packet %d/%d", len(data), CHANNEL_COMMAND_LEN)
}
header, err := ParseChannelHeader(data[:CHANNEL_HEADER_LEN])
if err != nil {
return ChannelCommandPacket{}, err
} }
command := data[CHANNEL_HEADER_LEN]
request_id := uuid.UUID(data[CHANNEL_HEADER_LEN+COMMAND_LENGTH:])
return ChannelCommandPacket{
ChannelHeader: header,
Command: command,
ReqID: request_id,
Data: data[CHANNEL_COMMAND_LEN:],
}, nil
}
type ChannelPeerPacket struct {
ChannelHeader
Peer PeerID
Data []byte
}
func NewChannelPeerPacket(peer PeerID, channel ChannelID, mode ModeID, data []byte) *Packet {
return &Packet{ return &Packet{
Channel: ChannelID(binary.BigEndian.Uint32(data)), Type: PACKET_CHANNEL_PEER,
Mode: ModeID(data[4]), Payload: ChannelPeerPacket{
Command: data[5], ChannelHeader: ChannelHeader{
Data: data[6:], Channel: channel,
Mode: mode,
},
Peer: peer,
Data: data,
},
}
}
func(packet ChannelPeerPacket) MarshalBinary() ([]byte, error) {
header, err := packet.ChannelHeader.MarshalBinary()
if err != nil {
return nil, err
}
data := append(header, packet.Peer[:]...)
return append(data, packet.Data...), nil
}
func ParseChannelPeerPacket(data []byte) (ChannelPeerPacket, error) {
if len(data) < CHANNEL_PEER_LEN {
return ChannelPeerPacket{}, fmt.Errorf("Not enough bytes to parse ServerChannelPacket: %d/%d", len(data), ID_LENGTH)
}
header, err := ParseChannelHeader(data)
if err != nil {
return ChannelPeerPacket{}, err
}
return ChannelPeerPacket{
ChannelHeader: header,
Peer: PeerID(data[CHANNEL_HEADER_LEN:]),
Data: data[CHANNEL_PEER_LEN:],
}, nil
}
type ChannelDataPacket struct {
ChannelHeader
Data []byte
}
func NewChannelDataPacket(channel ChannelID, mode ModeID, data []byte) *Packet {
return &Packet{
Type: PACKET_CHANNEL_DATA,
Payload: ChannelDataPacket{
ChannelHeader: ChannelHeader{
Channel: channel,
Mode: mode,
},
Data: data,
},
}
}
func(packet ChannelDataPacket) MarshalBinary() ([]byte, error) {
header, err := packet.ChannelHeader.MarshalBinary()
if err != nil {
return nil, err
}
return append(header, packet.Data...), nil
}
func ParseChannelDataPacket(data []byte) (ChannelDataPacket, error) {
if len(data) < CHANNEL_HEADER_LEN {
return ChannelDataPacket{}, fmt.Errorf("Not enough data to parse ChannelDataPacket")
}
header, err := ParseChannelHeader(data)
if err != nil {
return ChannelDataPacket{}, nil
}
return ChannelDataPacket{
ChannelHeader: header,
Data: data[CHANNEL_HEADER_LEN:],
}, nil }, nil
} }

@ -18,6 +18,8 @@ const (
SERVER_SEND_BUFFER_SIZE = 2048 SERVER_SEND_BUFFER_SIZE = 2048
) )
type RoleID uint32
type ServerSession struct { type ServerSession struct {
Session Session
LastSeen time.Time LastSeen time.Time
@ -40,6 +42,8 @@ type Server struct {
channels_lock sync.RWMutex channels_lock sync.RWMutex
channels map[ChannelID]*Channel channels map[ChannelID]*Channel
peers map[PeerID][]RoleID
} }
func NewServer(key ed25519.PrivateKey) (*Server, error) { func NewServer(key ed25519.PrivateKey) (*Server, error) {
@ -64,6 +68,8 @@ func NewServer(key ed25519.PrivateKey) (*Server, error) {
sessions: map[SessionID]*ServerSession{}, sessions: map[SessionID]*ServerSession{},
channels: map[ChannelID]*Channel{}, channels: map[ChannelID]*Channel{},
peers: map[PeerID][]RoleID{},
} }
server.active.Store(false) server.active.Store(false)
return server, nil return server, nil
@ -83,10 +89,6 @@ func(server *Server) RemoveChannel(id ChannelID) error {
} }
func(server *Server) AddChannel(id ChannelID, modes ...Mode) error { func(server *Server) AddChannel(id ChannelID, modes ...Mode) error {
if id == RootChannelID {
return fmt.Errorf("Cannot use root channel ID as real channel")
}
server.channels_lock.Lock() server.channels_lock.Lock()
defer server.channels_lock.Unlock() defer server.channels_lock.Unlock()
@ -111,7 +113,9 @@ func(server *Server) AddChannel(id ChannelID, modes ...Mode) error {
} }
server.channels[id] = &Channel{ server.channels[id] = &Channel{
id: id,
modes: mode_map, modes: mode_map,
sessions: []SessionID{},
} }
return nil return nil
@ -144,7 +148,7 @@ func(server *Server) listen_udp() {
for true { for true {
read, from, err := server.connection.ReadFromUDP(buf[:]) read, from, err := server.connection.ReadFromUDP(buf[:])
if err == nil { if err == nil {
var packet_type PacketType = PacketType(buf[0]) var packet_type SessionPacketType = SessionPacketType(buf[0])
switch packet_type { switch packet_type {
case SESSION_OPEN: case SESSION_OPEN:
session_open, ecdh_private, err := NewSessionOpen(server.key) session_open, ecdh_private, err := NewSessionOpen(server.key)
@ -221,24 +225,45 @@ func(server *Server) listen_udp() {
server.Log("SESSION_DATA_IN(%s) parse error - %s", session.ID, err) server.Log("SESSION_DATA_IN(%s) parse error - %s", session.ID, err)
} }
if packet.Channel == RootChannelID { switch packet := packet.(type) {
// TODO process commands on the root channel case ChannelDataPacket:
} else {
var result []SendPacket = nil var result []SendPacket = nil
server.channels_lock.RLock() server.channels_lock.RLock()
channel, exists := server.channels[packet.Channel] channel, exists := server.channels[packet.Channel]
if exists == true { if exists == true {
mode, exists := channel.modes[packet.Mode] result = channel.Data(&session.Session, packet.Mode, packet.Data)
if exists == true {
result = mode.Process(&session.Session, packet)
}
} }
server.channels_lock.RUnlock() server.channels_lock.RUnlock()
if result != nil { if exists == false {
server.Log("Packet for unknown channel %d", packet.Channel)
} else if len(result) > 0 {
//TODO: handle overflow //TODO: handle overflow
server.send_packets<-result server.send_packets<-result
} }
case ChannelCommandPacket:
var result []SendPacket = nil
server.channels_lock.RLock()
channel, exists := server.channels[packet.Channel]
if exists == true {
result, err = channel.Command(&session.Session, packet.Mode, packet.Command, packet.Data)
}
server.channels_lock.RUnlock()
if exists == false {
server.Log("Packet for unknown channel %d", packet.Channel)
} else if err != nil {
server.Log("Error processing %+v - %s", packet, err)
} else if len(result) > 0 {
//TODO: handle overflow
server.send_packets<-result
}
default:
server.Log("Unhandled packet type from session %s - 0x%02x", session.ID, err)
} }
} }

@ -34,7 +34,7 @@ type Session struct {
iv_generator mrand.Source64 iv_generator mrand.Source64
} }
type PacketType uint8 type SessionPacketType uint8
const ( const (
ID_LENGTH = 16 ID_LENGTH = 16
IV_LENGTH = aes.BlockSize IV_LENGTH = aes.BlockSize
@ -51,7 +51,7 @@ const (
/* /*
pnyx session packets pnyx session packets
*/ */
SESSION_OPEN PacketType = iota SESSION_OPEN SessionPacketType = iota
SESSION_CONNECT SESSION_CONNECT
SESSION_CLOSE SESSION_CLOSE
SESSION_DATA SESSION_DATA