Fixed PeerPacket serialization issue

live
noah metz 2024-04-16 15:34:53 -06:00
parent c3f38ef089
commit af8ea403cd
4 changed files with 29 additions and 36 deletions

@ -106,13 +106,13 @@ func multiplex_without_sender(origin SessionID, packet *Packet, sessions []*Serv
continue continue
} }
session.OutgoingPackets <- packet session.Send(packet)
} }
} }
func multiplex(packet *Packet, sessions []*ServerSession) { func multiplex(packet *Packet, sessions []*ServerSession) {
for _, session := range(sessions) { for _, session := range(sessions) {
session.OutgoingPackets <- packet session.Send(packet)
} }
} }

@ -16,8 +16,7 @@ var decoders = map[pnyx.PeerID]chan[]byte{}
var encoder *opus.Encoder var encoder *opus.Encoder
var sample_rate int = 0 var sample_rate int = 0
var mic = make(chan []byte, 0) var mic = make(chan []byte, 0)
var speaker = make(chan []int16, 0) var speaker = make(chan []int16, 1)
var audio_data = make(chan []int16, 0)
func set_sample_rate(new_sample_rate int) error { func set_sample_rate(new_sample_rate int) error {
fmt.Printf("Setting sample rate to %d\n", new_sample_rate) fmt.Printf("Setting sample rate to %d\n", new_sample_rate)
@ -62,7 +61,10 @@ func handle_peer_decode(peer_id pnyx.PeerID, decode_chan chan[]byte, sample_rate
if err != nil { if err != nil {
panic(err) panic(err)
} }
audio_data <- pcm select {
case speaker <- pcm:
default:
}
case data := <-decode_chan: case data := <-decode_chan:
missed = 0 missed = 0
@ -74,39 +76,21 @@ func handle_peer_decode(peer_id pnyx.PeerID, decode_chan chan[]byte, sample_rate
if err != nil { if err != nil {
panic(err) panic(err)
} }
audio_data <- pcm[:written] select {
} case speaker <- pcm[:written]:
} default:
}
}
func mixer(data_chan chan []int16, speaker_chan chan []int16) {
var samples []int16 = nil
for true {
if samples == nil {
samples = <- data_chan
} else {
select {
case new_samples := <- data_chan:
for i, sample := range(new_samples) {
samples[i] += sample
} }
case speaker_chan <- samples:
samples = nil
} }
} }
} }
} }
func main() { func main() {
ctx, err := malgo.InitContext(nil, malgo.ContextConfig{}, nil) ctx, err := malgo.InitContext(nil, malgo.ContextConfig{}, nil)
if err != nil { if err != nil {
panic(err) panic(err)
} }
go mixer(audio_data, speaker)
defer ctx.Free() defer ctx.Free()
defer ctx.Uninit() defer ctx.Uninit()

@ -136,7 +136,7 @@ func(packet ChannelHeader) MarshalBinary() ([]byte, error) {
func ParseChannelHeader(data []byte) (ChannelHeader, error) { func ParseChannelHeader(data []byte) (ChannelHeader, error) {
if len(data) < 2 { if len(data) < 2 {
return ChannelHeader{}, fmt.Errorf("Not enough bytes to parse ChannelPacket(%d/%d)", len(data), 6) return ChannelHeader{}, fmt.Errorf("Not enough bytes to parse ChannelPacket(%d/%d)", len(data), 2)
} }
return ChannelHeader{ return ChannelHeader{
@ -227,8 +227,11 @@ func(packet PeerPacket) MarshalBinary() ([]byte, error) {
return nil, err return nil, err
} }
data := append(header, packet.Peer[:]...) data := make([]byte, CHANNEL_PEER_LEN + len(packet.Data))
return append(data, packet.Data...), nil copy(data, header)
copy(data[CHANNEL_HEADER_LEN:], packet.Peer[:])
copy(data[CHANNEL_PEER_LEN:], packet.Data)
return data, nil
} }
func ParsePeerPacket(data []byte) (PeerPacket, error) { func ParsePeerPacket(data []byte) (PeerPacket, error) {
@ -244,7 +247,7 @@ func ParsePeerPacket(data []byte) (PeerPacket, error) {
return PeerPacket{ return PeerPacket{
ChannelHeader: header, ChannelHeader: header,
Peer: PeerID(data[CHANNEL_HEADER_LEN:]), Peer: PeerID(data[CHANNEL_HEADER_LEN:]),
Data: data[CHANNEL_PEER_LEN:], Data: data[CHANNEL_HEADER_LEN + PEER_ID_LENGTH:],
}, nil }, nil
} }

@ -23,12 +23,19 @@ type RoleID uint32
type ServerSession struct { type ServerSession struct {
Session Session
active atomic.Bool
LastSeen time.Time LastSeen time.Time
IncomingPackets chan[]byte IncomingPackets chan[]byte
OutgoingPackets chan Payload OutgoingPackets chan Payload
Channels []ChannelID Channels []ChannelID
} }
func(session *ServerSession) Send(payload Payload) {
if session.active.Load() {
session.OutgoingPackets <- payload
}
}
type Server struct { type Server struct {
key ed25519.PrivateKey key ed25519.PrivateKey
active atomic.Bool active atomic.Bool
@ -90,7 +97,7 @@ const SESSION_BUFFER_SIZE = 256
func handle_session_outgoing(session *ServerSession, server *Server) { func handle_session_outgoing(session *ServerSession, server *Server) {
server.Log("Starting session outgoing goroutine %s", session.ID) server.Log("Starting session outgoing goroutine %s", session.ID)
for true { for session.active.Load() {
packet := <- session.OutgoingPackets packet := <- session.OutgoingPackets
if packet == nil { if packet == nil {
break break
@ -133,22 +140,19 @@ type SessionChannelCommand struct {
func handle_session_incoming(session *ServerSession, server *Server) { func handle_session_incoming(session *ServerSession, server *Server) {
server.Log("Starting session incoming goroutine %s", session.ID) server.Log("Starting session incoming goroutine %s", session.ID)
ping_timer := time.After(SESSION_PING_TIME) ping_timer := time.After(SESSION_PING_TIME)
running := true for session.active.Load() {
for running {
select { select {
case <- ping_timer: case <- ping_timer:
if time.Now().Add(-1*SESSION_TIMEOUT).Compare(session.LastSeen) != 1 { if time.Now().Add(-1*SESSION_TIMEOUT).Compare(session.LastSeen) != 1 {
server.sessions_lock.Lock() server.sessions_lock.Lock()
server.close_session(session) server.close_session(session)
server.sessions_lock.Unlock() server.sessions_lock.Unlock()
running = false
} else { } else {
session.OutgoingPackets <- NewPingPacket() session.OutgoingPackets <- NewPingPacket()
ping_timer = time.After(SESSION_PING_TIME) ping_timer = time.After(SESSION_PING_TIME)
} }
case encrypted := <- session.IncomingPackets: case encrypted := <- session.IncomingPackets:
if encrypted == nil { if encrypted == nil {
running = false
continue continue
} }
@ -185,7 +189,7 @@ func handle_session_incoming(session *ServerSession, server *Server) {
if slices.Contains(members, session) { if slices.Contains(members, session) {
mode, has_mode := channel.Modes[packet.Mode] mode, has_mode := channel.Modes[packet.Mode]
if has_mode { if has_mode {
mode.Load().(Mode).Data(session, packet.Channel, members, data) mode.Load().(Mode).Data(session, packet.Channel, members, packet.Data)
} }
} }
} else { } else {
@ -214,6 +218,7 @@ func(server *Server) handle_session_open(client_session_open []byte, from *net.U
IncomingPackets: make(chan[]byte, SESSION_BUFFER_SIZE), IncomingPackets: make(chan[]byte, SESSION_BUFFER_SIZE),
OutgoingPackets: make(chan Payload, SESSION_BUFFER_SIZE), OutgoingPackets: make(chan Payload, SESSION_BUFFER_SIZE),
} }
server.sessions[session.ID].active.Store(true)
server.sessions_lock.Unlock() server.sessions_lock.Unlock()
go handle_session_outgoing(server.sessions[session.ID], server) go handle_session_outgoing(server.sessions[session.ID], server)
@ -362,6 +367,7 @@ func(server *Server) listen_udp() {
} }
func(server *Server) close_session(session *ServerSession) { func(server *Server) close_session(session *ServerSession) {
session.active.Store(false)
close(session.IncomingPackets) close(session.IncomingPackets)
close(session.OutgoingPackets) close(session.OutgoingPackets)
delete(server.sessions, session.ID) delete(server.sessions, session.ID)