From 17990e35d90d5a77d01b19911f03ae857313bc4b Mon Sep 17 00:00:00 2001 From: Noah Metz Date: Tue, 9 Apr 2024 17:08:46 -0600 Subject: [PATCH] Protocol changes to support more packet types --- channel.go | 88 ++++++++++++------ client.go | 2 +- cmd/client/main.go | 110 ++++++++++------------- cmd/server/main.go | 7 +- notes.txt | 6 ++ packet.go | 216 +++++++++++++++++++++++++++++++++++++++++---- server.go | 51 ++++++++--- session.go | 4 +- 8 files changed, 355 insertions(+), 129 deletions(-) diff --git a/channel.go b/channel.go index 3afbb48..7f756dc 100644 --- a/channel.go +++ b/channel.go @@ -2,25 +2,70 @@ package pnyx import ( "slices" + "fmt" ) type ChannelID uint32 const ( - RootChannelID ChannelID = 0 + MODE_CHANNEL ModeID = iota + MODE_RAW - MODE_RAW ModeID = iota + CHANNEL_JOIN byte = iota + CHANNEL_LEAVE + CHANNEL_MEMBERS - MODE_COMMAND_DATA byte = 0x00 - MODE_COMMAND_JOIN = 0x01 - MODE_COMMAND_LEAVE = 0x02 + RAW_DATA = iota ) type ModeID uint8 type CommandID uint8 +type Permission string type Channel struct { + id ChannelID modes map[ModeID]Mode + sessions []SessionID +} + +func(channel *Channel) Data(session *Session, mode ModeID, data []byte) []SendPacket { + m, has_mode := channel.modes[mode] + if has_mode == false { + return nil + } else { + return m.Data(session, channel, data) + } +} + +func(channel *Channel) Command(session *Session, mode ModeID, command byte, data []byte) ([]SendPacket, error) { + if mode == MODE_CHANNEL { + switch command { + case CHANNEL_JOIN: + if slices.Contains(channel.sessions, session.ID) { + return nil, fmt.Errorf("Session %s already in channel %d, can't join", session.ID, channel.id) + } else { + channel.sessions = append(channel.sessions, session.ID) + return nil, nil + } + case CHANNEL_LEAVE: + idx := slices.Index(channel.sessions, session.ID) + if idx == -1 { + return nil, fmt.Errorf("Session %s not in channel %d, can't leave", session.ID, channel.id) + } else { + channel.sessions = slices.Delete(channel.sessions, idx, idx+1) + return nil, nil + } + default: + return nil, fmt.Errorf("Unknown MODE_CHANNEL command: 0x%02x", command) + } + } else { + mode, has_mode := channel.modes[mode] + if has_mode == false { + return nil, fmt.Errorf("Channel has no mode 0x%02x", mode) + } else { + return mode.Command(session, channel, command, data) + } + } } type SendPacket struct { @@ -30,7 +75,8 @@ type SendPacket struct { type Mode interface { // Process takes incoming packets from a session and returns a list of packets to send - Process(*Session, *Packet) []SendPacket + Command(session *Session, channel *Channel, command byte, data []byte) ([]SendPacket, error) + Data(session *Session, channel *Channel, data []byte) []SendPacket } func multiplex(session *Session, packet *Packet, sessions []SessionID) []SendPacket { @@ -50,32 +96,16 @@ func multiplex(session *Session, packet *Packet, sessions []SessionID) []SendPac } type RawMode struct { - Sessions []SessionID } -func(mode *RawMode) Process(session *Session, packet *Packet) []SendPacket { - switch packet.Command { - case MODE_COMMAND_JOIN: - if slices.Contains(mode.Sessions, session.ID) == false { - mode.Sessions = append(mode.Sessions, session.ID) - } - - case MODE_COMMAND_LEAVE: - idx := slices.Index(mode.Sessions, session.ID) - if idx != -1 { - mode.Sessions = slices.Delete(mode.Sessions, idx, idx+1) - } +func(mode *RawMode) Command(session *Session, channel *Channel, command byte, data []byte) ([]SendPacket, error) { + return nil, fmt.Errorf("unknown raw mode command 0x%02x", command) +} - case MODE_COMMAND_DATA: - if slices.Contains(mode.Sessions, session.ID) { - new_packet := &Packet{ - Channel: packet.Channel, - Mode: packet.Mode, - Command: MODE_COMMAND_DATA, - Data: append(session.Peer[:], packet.Data...), - } - return multiplex(session, new_packet, mode.Sessions) - } +func(mode *RawMode) Data(session *Session, channel *Channel, data []byte) []SendPacket { + if slices.Contains(channel.sessions, session.ID) { + new_packet := NewChannelPeerPacket(session.Peer, channel.id, MODE_RAW, data) + return multiplex(session, new_packet, channel.sessions) } return nil } diff --git a/client.go b/client.go index 77ac7ce..5a18a28 100644 --- a/client.go +++ b/client.go @@ -107,7 +107,7 @@ func NewClient(key ed25519.PrivateKey, remote string) (*Client, error) { }, nil } -func(client *Client) Send(packet Packet) error { +func(client *Client) Send(packet *Packet) error { client.ConnectionLock.Lock() defer client.ConnectionLock.Unlock() diff --git a/cmd/client/main.go b/cmd/client/main.go index 245ece9..66f462a 100644 --- a/cmd/client/main.go +++ b/cmd/client/main.go @@ -180,83 +180,69 @@ func main() { fmt.Printf("ParsePacket Error %s - %x\n", err, data) continue } - peer := pnyx.PeerID(packet.Data[0:16]) - if packet.Channel == pnyx.ChannelID(1) { - decode_chan, exists := decoders[peer] - if exists == false { - decode_chan = make(chan[]byte, 1000) - decoders[peer] = decode_chan - - go func(decode_chan chan[]byte){ - decoder, err := opus.NewDecoder(48000, 1) - if err != nil { - panic(err) - } - - for true { - select { - case <-time.After(20*time.Millisecond): - pcm := make([]int16, 960) - err := decoder.DecodePLC(pcm) - if err != nil { - panic(err) - } - pcm_bytes := make([]byte, 960*2) - for i := 0; i < 960; i++ { - binary.LittleEndian.PutUint16(pcm_bytes[i*2:], uint16(pcm[i])) - } - speaker <- pcm_bytes - case data := <-decode_chan: - pcm := make([]int16, 960) - written, err := decoder.Decode(data, pcm) - if err != nil { - panic(err) - } + switch packet := packet.(type) { + case pnyx.ChannelPeerPacket: + if packet.Channel == pnyx.ChannelID(0) { + decode_chan, exists := decoders[packet.Peer] + if exists == false { + decode_chan = make(chan[]byte, 1000) + decoders[packet.Peer] = decode_chan + + go func(decode_chan chan[]byte){ + decoder, err := opus.NewDecoder(48000, 1) + if err != nil { + panic(err) + } - pcm_bytes := make([]byte, written*2) - for i := 0; i < written; i++ { - binary.LittleEndian.PutUint16(pcm_bytes[i*2:], uint16(pcm[i])) + for true { + select { + case <-time.After(20*time.Millisecond): + pcm := make([]int16, 960) + err := decoder.DecodePLC(pcm) + if err != nil { + panic(err) + } + + pcm_bytes := make([]byte, 960*2) + for i := 0; i < 960; i++ { + binary.LittleEndian.PutUint16(pcm_bytes[i*2:], uint16(pcm[i])) + } + speaker <- pcm_bytes + case data := <-decode_chan: + pcm := make([]int16, 960) + written, err := decoder.Decode(data, pcm) + if err != nil { + panic(err) + } + + pcm_bytes := make([]byte, written*2) + for i := 0; i < written; i++ { + binary.LittleEndian.PutUint16(pcm_bytes[i*2:], uint16(pcm[i])) + } + speaker <- pcm_bytes } - speaker <- pcm_bytes } - } - - }(decoders[peer]) + + }(decoders[packet.Peer]) + } + decode_chan <- packet.Data } - decode_chan <- packet.Data[16:] + default: + fmt.Printf("Unhandled packet type: %s\n", packet) } } }() - err = client.Send(pnyx.Packet{ - Channel: pnyx.ChannelID(1), - Mode: pnyx.MODE_RAW, - Command: pnyx.MODE_COMMAND_JOIN, - Data: nil, - }) - if err != nil { - panic(err) - } - - err = client.Send(pnyx.Packet{ - Channel: pnyx.ChannelID(2), - Mode: pnyx.MODE_RAW, - Command: pnyx.MODE_COMMAND_JOIN, - Data: nil, - }) + join_packet, _ := pnyx.NewChannelCommandPacket(pnyx.ChannelID(0), pnyx.MODE_CHANNEL, pnyx.CHANNEL_JOIN, nil) + err = client.Send(join_packet) if err != nil { panic(err) } for true { data := <- mic - err = client.Send(pnyx.Packet{ - Channel: pnyx.ChannelID(1), - Mode: pnyx.MODE_RAW, - Command: pnyx.MODE_COMMAND_DATA, - Data: data, - }) + err = client.Send(pnyx.NewChannelDataPacket(pnyx.ChannelID(0), pnyx.MODE_RAW, data)) if err != nil { panic(err) } diff --git a/cmd/server/main.go b/cmd/server/main.go index 1800aa4..42b3882 100644 --- a/cmd/server/main.go +++ b/cmd/server/main.go @@ -23,12 +23,7 @@ func main() { panic(err) } - err = server.AddChannel(pnyx.ChannelID(1), &pnyx.RawMode{}) - if err != nil { - panic(err) - } - - err = server.AddChannel(pnyx.ChannelID(2), &pnyx.RawMode{}) + err = server.AddChannel(pnyx.ChannelID(0), &pnyx.RawMode{}) if err != nil { panic(err) } diff --git a/notes.txt b/notes.txt index 11c74e4..ec287af 100644 --- a/notes.txt +++ b/notes.txt @@ -173,3 +173,9 @@ Also this allows for the permission map to look like: } } } + +------------------------------------------------------------------------------- + +What would the experience be like if channels were nested? + + diff --git a/packet.go b/packet.go index 193541e..bb9ecf5 100644 --- a/packet.go +++ b/packet.go @@ -1,37 +1,221 @@ package pnyx import ( - "encoding/binary" - "fmt" + "encoding" + "encoding/binary" + "fmt" + + "github.com/google/uuid" ) +type PacketType uint8 +const ( + PACKET_CHANNEL_COMMAND PacketType = iota + PACKET_CHANNEL_DATA + PACKET_CHANNEL_PEER + + CHANNEL_HEADER_LEN int = 5 + CHANNEL_COMMAND_LEN = CHANNEL_HEADER_LEN + COMMAND_LENGTH + ID_LENGTH + CHANNEL_PEER_LEN = CHANNEL_HEADER_LEN + ID_LENGTH +) + +type Payload interface { + encoding.BinaryMarshaler +} + type Packet struct { + Type PacketType + Payload Payload +} + +func(packet Packet) MarshalBinary() ([]byte, error) { + payload, err := packet.Payload.MarshalBinary() + if err != nil { + return nil, err + } + + return append([]byte{byte(packet.Type)}, payload...), nil +} + +func ParsePacket(data []byte) (Payload, error) { + if len(data) < 1 { + return nil, fmt.Errorf("Packet too short to parse - %d/1", len(data)) + } + + switch PacketType(data[0]) { + case PACKET_CHANNEL_DATA: + return ParseChannelDataPacket(data[1:]) + case PACKET_CHANNEL_COMMAND: + return ParseChannelCommandPacket(data[1:]) + case PACKET_CHANNEL_PEER: + return ParseChannelPeerPacket(data[1:]) + default: + return nil, fmt.Errorf("Don't know how to parse packet type 0x%02x", data[0]) + } +} + +type ChannelHeader struct { Channel ChannelID Mode ModeID +} + +func(packet ChannelHeader) MarshalBinary() ([]byte, error) { + p := binary.BigEndian.AppendUint32(nil, uint32(packet.Channel)) + return append(p, byte(packet.Mode)), nil +} + +func ParseChannelHeader(data []byte) (ChannelHeader, error) { + if len(data) < 5 { + return ChannelHeader{}, fmt.Errorf("Not enough bytes to parse ChannelPacket(%d/%d)", len(data), 6) + } + + return ChannelHeader{ + Channel: ChannelID(binary.BigEndian.Uint32(data)), + Mode: ModeID(data[4]), + }, nil +} + +type ChannelCommandPacket struct { + ChannelHeader Command byte + ReqID uuid.UUID Data []byte } -func(packet Packet) String() string { - return fmt.Sprintf("{Channel: %x, Mode: %x, Data: %x}", packet.Channel, packet.Mode, packet.Data) +func NewChannelCommandPacket(channel ChannelID, mode ModeID, command byte, data []byte) (*Packet, uuid.UUID) { + request_id := uuid.New() + return &Packet{ + Type: PACKET_CHANNEL_COMMAND, + Payload: ChannelCommandPacket{ + ChannelHeader: ChannelHeader{ + Channel: channel, + Mode: mode, + }, + Command: command, + ReqID: request_id, + Data: data, + }, + }, request_id } -func(packet Packet) MarshalBinary() ([]byte, error) { - p := binary.BigEndian.AppendUint32(nil, uint32(packet.Channel)) - p = append(p, byte(packet.Mode)) - p = append(p, byte(packet.Command)) - return append(p, packet.Data...), nil +func(packet ChannelCommandPacket) MarshalBinary() ([]byte, error) { + header, err := packet.ChannelHeader.MarshalBinary() + if err != nil { + return nil, err + } + + data := append(header, packet.Command) + data = append(data, packet.ReqID[:]...) + return append(data, packet.Data...), nil } -func ParsePacket(data []byte) (*Packet, error) { - if len(data) < 6 { - return nil, fmt.Errorf("Not enough bytes to parse Packet(%d/%d)", len(data), 6) +func ParseChannelCommandPacket(data []byte) (ChannelCommandPacket, error) { + if len(data) < CHANNEL_COMMAND_LEN { + return ChannelCommandPacket{}, fmt.Errorf("Not enough data to decode channel command packet %d/%d", len(data), CHANNEL_COMMAND_LEN) + } + + header, err := ParseChannelHeader(data[:CHANNEL_HEADER_LEN]) + if err != nil { + return ChannelCommandPacket{}, err } + command := data[CHANNEL_HEADER_LEN] + request_id := uuid.UUID(data[CHANNEL_HEADER_LEN+COMMAND_LENGTH:]) + return ChannelCommandPacket{ + ChannelHeader: header, + Command: command, + ReqID: request_id, + Data: data[CHANNEL_COMMAND_LEN:], + }, nil +} + +type ChannelPeerPacket struct { + ChannelHeader + Peer PeerID + Data []byte +} + +func NewChannelPeerPacket(peer PeerID, channel ChannelID, mode ModeID, data []byte) *Packet { return &Packet{ - Channel: ChannelID(binary.BigEndian.Uint32(data)), - Mode: ModeID(data[4]), - Command: data[5], - Data: data[6:], + Type: PACKET_CHANNEL_PEER, + Payload: ChannelPeerPacket{ + ChannelHeader: ChannelHeader{ + Channel: channel, + Mode: mode, + }, + Peer: peer, + Data: data, + }, + } +} + +func(packet ChannelPeerPacket) MarshalBinary() ([]byte, error) { + header, err := packet.ChannelHeader.MarshalBinary() + if err != nil { + return nil, err + } + + data := append(header, packet.Peer[:]...) + return append(data, packet.Data...), nil +} + +func ParseChannelPeerPacket(data []byte) (ChannelPeerPacket, error) { + if len(data) < CHANNEL_PEER_LEN { + return ChannelPeerPacket{}, fmt.Errorf("Not enough bytes to parse ServerChannelPacket: %d/%d", len(data), ID_LENGTH) + } + + header, err := ParseChannelHeader(data) + if err != nil { + return ChannelPeerPacket{}, err + } + + return ChannelPeerPacket{ + ChannelHeader: header, + Peer: PeerID(data[CHANNEL_HEADER_LEN:]), + Data: data[CHANNEL_PEER_LEN:], + }, nil +} + + +type ChannelDataPacket struct { + ChannelHeader + Data []byte +} + +func NewChannelDataPacket(channel ChannelID, mode ModeID, data []byte) *Packet { + return &Packet{ + Type: PACKET_CHANNEL_DATA, + Payload: ChannelDataPacket{ + ChannelHeader: ChannelHeader{ + Channel: channel, + Mode: mode, + }, + Data: data, + }, + } +} + +func(packet ChannelDataPacket) MarshalBinary() ([]byte, error) { + header, err := packet.ChannelHeader.MarshalBinary() + if err != nil { + return nil, err + } + + return append(header, packet.Data...), nil +} + +func ParseChannelDataPacket(data []byte) (ChannelDataPacket, error) { + if len(data) < CHANNEL_HEADER_LEN { + return ChannelDataPacket{}, fmt.Errorf("Not enough data to parse ChannelDataPacket") + } + + header, err := ParseChannelHeader(data) + if err != nil { + return ChannelDataPacket{}, nil + } + + return ChannelDataPacket{ + ChannelHeader: header, + Data: data[CHANNEL_HEADER_LEN:], }, nil } diff --git a/server.go b/server.go index 5a6ad5a..9e1f7e0 100644 --- a/server.go +++ b/server.go @@ -18,6 +18,8 @@ const ( SERVER_SEND_BUFFER_SIZE = 2048 ) +type RoleID uint32 + type ServerSession struct { Session LastSeen time.Time @@ -40,6 +42,8 @@ type Server struct { channels_lock sync.RWMutex channels map[ChannelID]*Channel + + peers map[PeerID][]RoleID } func NewServer(key ed25519.PrivateKey) (*Server, error) { @@ -64,6 +68,8 @@ func NewServer(key ed25519.PrivateKey) (*Server, error) { sessions: map[SessionID]*ServerSession{}, channels: map[ChannelID]*Channel{}, + + peers: map[PeerID][]RoleID{}, } server.active.Store(false) return server, nil @@ -83,10 +89,6 @@ func(server *Server) RemoveChannel(id ChannelID) error { } func(server *Server) AddChannel(id ChannelID, modes ...Mode) error { - if id == RootChannelID { - return fmt.Errorf("Cannot use root channel ID as real channel") - } - server.channels_lock.Lock() defer server.channels_lock.Unlock() @@ -111,7 +113,9 @@ func(server *Server) AddChannel(id ChannelID, modes ...Mode) error { } server.channels[id] = &Channel{ + id: id, modes: mode_map, + sessions: []SessionID{}, } return nil @@ -144,7 +148,7 @@ func(server *Server) listen_udp() { for true { read, from, err := server.connection.ReadFromUDP(buf[:]) if err == nil { - var packet_type PacketType = PacketType(buf[0]) + var packet_type SessionPacketType = SessionPacketType(buf[0]) switch packet_type { case SESSION_OPEN: session_open, ecdh_private, err := NewSessionOpen(server.key) @@ -221,24 +225,45 @@ func(server *Server) listen_udp() { server.Log("SESSION_DATA_IN(%s) parse error - %s", session.ID, err) } - if packet.Channel == RootChannelID { - // TODO process commands on the root channel - } else { + switch packet := packet.(type) { + case ChannelDataPacket: var result []SendPacket = nil + server.channels_lock.RLock() channel, exists := server.channels[packet.Channel] if exists == true { - mode, exists := channel.modes[packet.Mode] - if exists == true { - result = mode.Process(&session.Session, packet) - } + result = channel.Data(&session.Session, packet.Mode, packet.Data) } server.channels_lock.RUnlock() - if result != nil { + if exists == false { + server.Log("Packet for unknown channel %d", packet.Channel) + } else if len(result) > 0 { //TODO: handle overflow server.send_packets<-result } + + case ChannelCommandPacket: + var result []SendPacket = nil + + server.channels_lock.RLock() + channel, exists := server.channels[packet.Channel] + if exists == true { + result, err = channel.Command(&session.Session, packet.Mode, packet.Command, packet.Data) + } + server.channels_lock.RUnlock() + + if exists == false { + server.Log("Packet for unknown channel %d", packet.Channel) + } else if err != nil { + server.Log("Error processing %+v - %s", packet, err) + } else if len(result) > 0 { + //TODO: handle overflow + server.send_packets<-result + } + + default: + server.Log("Unhandled packet type from session %s - 0x%02x", session.ID, err) } } diff --git a/session.go b/session.go index ead97a3..2c6a438 100644 --- a/session.go +++ b/session.go @@ -34,7 +34,7 @@ type Session struct { iv_generator mrand.Source64 } -type PacketType uint8 +type SessionPacketType uint8 const ( ID_LENGTH = 16 IV_LENGTH = aes.BlockSize @@ -51,7 +51,7 @@ const ( /* pnyx session packets */ - SESSION_OPEN PacketType = iota + SESSION_OPEN SessionPacketType = iota SESSION_CONNECT SESSION_CLOSE SESSION_DATA