diff --git a/cmd/client/main.go b/cmd/client/main.go index c49b650..301616b 100644 --- a/cmd/client/main.go +++ b/cmd/client/main.go @@ -16,7 +16,8 @@ var decoders = map[pnyx.PeerID]chan[]byte{} var encoder *opus.Encoder var sample_rate int = 0 var mic = make(chan []byte, 0) -var speaker = make(chan []byte, 0) +var speaker = make(chan []int16, 0) +var audio_data = make(chan []int16, 0) func set_sample_rate(new_sample_rate int) error { sample_rate = new_sample_rate @@ -47,22 +48,26 @@ func handle_peer_decode(peer_id pnyx.PeerID, decode_chan chan[]byte, sample_rate } running := true + missed := 0 for running { select { case <-time.After(20*time.Millisecond): + missed += 1 + + if missed > 100 { + fmt.Printf("Missed 100 packets from %x, stopping stream until data received\n", peer_id) + decode_chan <- <- decode_chan + } + pcm := make([]int16, sample_rate/50) err := decoder.DecodePLC(pcm) if err != nil { panic(err) } - - pcm_bytes := make([]byte, sample_rate/50*2) - for i := 0; i < sample_rate/50; i++ { - binary.LittleEndian.PutUint16(pcm_bytes[i*2:], uint16(pcm[i])) - } - speaker <- pcm_bytes + audio_data <- pcm case data := <-decode_chan: + missed = 0 if data == nil { running = false } else { @@ -71,25 +76,40 @@ func handle_peer_decode(peer_id pnyx.PeerID, decode_chan chan[]byte, sample_rate if err != nil { panic(err) } + audio_data <- pcm[:written] + } + } + } + fmt.Printf("Stopping decoder routine for %x with sample_rate %d\n", peer_id, sample_rate) +} - pcm_bytes := make([]byte, written*2) - for i := 0; i < written; i++ { - binary.LittleEndian.PutUint16(pcm_bytes[i*2:], uint16(pcm[i])) +func mixer(data_chan chan []int16, speaker_chan chan []int16) { + var samples []int16 = nil + for true { + if samples == nil { + samples = <- data_chan + } else { + select { + case new_samples := <- data_chan: + for i, sample := range(new_samples) { + samples[i] += sample } - - speaker <- pcm_bytes + case speaker_chan <- samples: + samples = nil } } } - fmt.Printf("Stopping decoder routine for %x with sample_rate %d\n", peer_id, sample_rate) } + func main() { ctx, err := malgo.InitContext(nil, malgo.ContextConfig{}, nil) if err != nil { panic(err) } + go mixer(audio_data, speaker) + defer ctx.Free() defer ctx.Uninit() @@ -153,8 +173,10 @@ func main() { onSendFrames := func(output_samples []byte, input_samples []byte, framecount uint32) { select { - case data := <- speaker: - copy(output_samples, data) + case pcm := <- speaker: + for i := 0; i < sample_rate/50; i++ { + binary.LittleEndian.PutUint16(output_samples[i*2:], uint16(pcm[i])) + } default: } }