pnyx/channel.go

213 lines
6.1 KiB
Go

package pnyx
import (
"fmt"
"slices"
"sync/atomic"
"github.com/google/uuid"
)
// ChannelID is a one-byte channel identifier. In this file it is only passed
// through to Mode.Command and Mode.Data as the channel_id argument.
type ChannelID byte
// Mode identifiers, audio-mode command identifiers and channel buffer sizes.
//
// NOTE(review): every constant here lives in ONE const block, so iota does not
// restart at AUDIO_SET_SAMPLE_RATE. iota is the index of the constant within
// the block, which makes AUDIO_SET_SAMPLE_RATE == 3 and
// AUDIO_GET_SAMPLE_RATE == 4. If 0 and 1 were intended they need their own
// const block; confirm against the wire protocol before changing anything.
const (
// MODE_CHANNEL (0) addresses the channel itself: update_state handles its
// commands directly and NewChannel refuses to register a Mode under it.
MODE_CHANNEL ModeID = iota
// MODE_RAW (1) is the mode ID RawMode stamps on the packets it relays.
MODE_RAW
// MODE_AUDIO (2) is the mode ID AudioMode stamps on the packets it relays.
MODE_AUDIO
// AUDIO_SET_SAMPLE_RATE is an untyped integer constant with value 3 (see the
// note above); AudioMode.Command matches it against the command byte.
AUDIO_SET_SAMPLE_RATE = iota
// AUDIO_GET_SAMPLE_RATE is an untyped integer constant with value 4.
AUDIO_GET_SAMPLE_RATE
// CHANNEL_COMMAND_BUFFER_SIZE is the capacity NewChannel gives Channel.Commands.
CHANNEL_COMMAND_BUFFER_SIZE int = 2048
// CHANNEL_CLOSE_BUFFER_SIZE is the capacity NewChannel gives Channel.ClosedSessions.
CHANNEL_CLOSE_BUFFER_SIZE int = 100
)
// ModeID selects a Mode within a channel; it is the key type of Channel.Modes
// and the type of the MODE_* constants.
type ModeID uint8
// CommandID is an 8-bit command identifier.
// NOTE(review): nothing in the code shown here uses it (Mode.Command takes a
// plain byte) — confirm where it is consumed.
type CommandID uint8
// Permission is a string-backed permission value. It is not referenced in the
// code shown here, so its valid values cannot be stated from this file.
type Permission string
// Channel holds the membership and per-mode state of one channel. After
// NewChannel has built it, the update_state goroutine is the only code in this
// file that stores into Members or into the values held in Modes.
type Channel struct {
// Commands carries incoming session commands to update_state.
Commands chan SessionChannelCommand
// ClosedSessions carries sessions that update_state should drop from Members.
ClosedSessions chan *ServerSession
// Modes maps a mode ID to an atomic.Value that always holds a Mode. The map
// itself is filled in by NewChannel and not modified afterwards in this file.
Modes map[ModeID]*atomic.Value
// Members always holds a []*ServerSession. update_state replaces it with a
// freshly allocated slice on every change instead of editing it in place.
Members atomic.Value
}
// update_state is the channel's event loop. NewChannel starts it in its own
// goroutine; it never returns.
//
// Each iteration handles exactly one event:
//   - a session received on ClosedSessions is removed from the member list;
//   - a command received on Commands is dispatched by handle_command, unless
//     it carries no packet or its session is no longer active.
func (channel *Channel) update_state() {
	for {
		select {
		case session := <-channel.ClosedSessions:
			channel.remove_member(session)
		case incoming := <-channel.Commands:
			// A command without a packet is skipped. An unlabeled `break`
			// here would only leave the select, not the loop, so `continue`
			// states the actual behaviour.
			// NOTE(review): if a nil packet was meant to stop this goroutine,
			// this needs a `return` instead — confirm intent.
			if incoming.Packet == nil {
				continue
			}
			// Kept as an explicit comparison because the type of `active` is
			// declared elsewhere; this form compiles whether Load returns a
			// bool or an interface value.
			if incoming.Session.active.Load() == false {
				continue
			}
			channel.handle_command(incoming)
		}
	}
}

// handle_command applies one command from an active session. MODE_CHANNEL
// commands change membership (join/leave; any other command ID is ignored).
// Commands for any other mode are passed to the Mode registered under that ID,
// and the Mode it returns replaces the stored one. Commands for an
// unregistered mode are dropped.
func (channel *Channel) handle_command(incoming SessionChannelCommand) {
	command := incoming.Packet
	if command.Mode == MODE_CHANNEL {
		switch command.Command {
		case CHANNEL_COMMAND_JOIN:
			channel.add_member(incoming.Session)
		case CHANNEL_COMMAND_LEAVE:
			channel.remove_member(incoming.Session)
		}
		return
	}

	mode, has_mode := channel.Modes[command.Mode]
	if !has_mode {
		return
	}
	members := channel.Members.Load().([]*ServerSession)
	mode_val := mode.Load().(Mode)
	new_mode := mode_val.Command(incoming.Session, command.Command, command.ReqID, command.Channel, members, command.Data)
	mode.Store(new_mode)
}

// add_member appends session to the member list and calls Join on every mode,
// storing each returned Mode. It does nothing if session is already a member.
func (channel *Channel) add_member(session *ServerSession) {
	members := channel.Members.Load().([]*ServerSession)
	if slices.Contains(members, session) {
		return
	}

	// Build a new slice rather than appending in place, so a slice previously
	// obtained from Members.Load() is never modified.
	new_members := make([]*ServerSession, len(members)+1)
	copy(new_members, members)
	new_members[len(members)] = session
	channel.Members.Store(new_members)

	for _, mode_val := range channel.Modes {
		mode := mode_val.Load().(Mode)
		mode_val.Store(mode.Join(session))
	}
}

// remove_member drops session from the member list and calls Leave on every
// mode, storing each returned Mode. It does nothing if session is not a member.
func (channel *Channel) remove_member(session *ServerSession) {
	members := channel.Members.Load().([]*ServerSession)
	idx := slices.Index(members, session)
	if idx == -1 {
		return
	}

	// Copy the elements around idx into a new slice; the previously stored
	// slice is left untouched.
	new_members := make([]*ServerSession, len(members)-1)
	copy(new_members, members[:idx])
	copy(new_members[idx:], members[idx+1:])
	channel.Members.Store(new_members)

	for _, mode_val := range channel.Modes {
		mode := mode_val.Load().(Mode)
		mode_val.Store(mode.Leave(session))
	}
}
// NewChannel builds a Channel serving the given modes, with an empty member
// list, and starts its update_state goroutine.
//
// modes maps each mode ID to the initial Mode value for that ID; every entry
// is wrapped in its own atomic.Value. A nil or empty map yields a channel that
// only understands MODE_CHANNEL commands.
//
// It returns a nil channel and an error if modes contains the reserved
// MODE_CHANNEL key or a nil Mode. No goroutine is started in that case.
//
// NOTE(review): nothing in this file stops the update_state goroutine, so it
// lives as long as the process does.
func NewChannel(modes map[ModeID]Mode) (*Channel, error) {
	initial_modes := make(map[ModeID]*atomic.Value, len(modes))
	for mode_id, mode := range modes {
		if mode_id == MODE_CHANNEL {
			return nil, fmt.Errorf("Cannot create a channel with MODE_CHANNEL(0x%02x) mode", MODE_CHANNEL)
		}
		// atomic.Value.Store panics when handed nil, so reject a nil Mode
		// with an error instead of crashing the caller.
		if mode == nil {
			return nil, fmt.Errorf("Cannot create a channel with a nil mode(0x%02x)", mode_id)
		}
		mode_val := new(atomic.Value)
		mode_val.Store(mode)
		initial_modes[mode_id] = mode_val
	}

	channel := &Channel{
		Commands:       make(chan SessionChannelCommand, CHANNEL_COMMAND_BUFFER_SIZE),
		ClosedSessions: make(chan *ServerSession, CHANNEL_CLOSE_BUFFER_SIZE),
		Modes:          initial_modes,
	}
	// Members must hold a []*ServerSession before update_state first loads it.
	channel.Members.Store([]*ServerSession{})

	go channel.update_state()

	return channel, nil
}
// Mode is the behaviour a channel runs for one mode ID. Join, Leave and
// Command each return a Mode, and update_state stores that returned value in
// place of the one it called, so an implementation can return an updated copy
// of itself to change its state.
type Mode interface {
// Join is called by update_state after session has been added to the members.
Join(session *ServerSession) Mode
// Leave is called by update_state after session has been removed from the members.
Leave(session *ServerSession) Mode
// Command handles a command addressed to this mode. members is the member
// list update_state loaded just before the call.
Command(session *ServerSession, command byte, request_id uuid.UUID, channel_id ChannelID, members []*ServerSession, data []byte) Mode
// Data handles a data payload from session. It returns nothing, so it cannot
// replace the stored Mode. It is not called anywhere in the code shown here.
Data(session *ServerSession, channel_id ChannelID, members []*ServerSession, data []byte)
}
// multiplex_without_sender sends packet, in slice order, to every session in
// sessions whose ID differs from origin. Sessions whose ID equals origin are
// skipped.
func multiplex_without_sender(origin SessionID, packet *Packet, sessions []*ServerSession) {
	for i := range sessions {
		if target := sessions[i]; target.ID != origin {
			target.Send(packet)
		}
	}
}
// multiplex sends packet to every session in sessions, in slice order,
// including the session the packet originated from if it is in the list.
func multiplex(packet *Packet, sessions []*ServerSession) {
	for i := range sessions {
		sessions[i].Send(packet)
	}
}
// RawMode is a Mode with no state: it ignores joins, leaves and commands, and
// relays data payloads to the other members unchanged.
type RawMode struct{}

// Join returns the mode unchanged.
func (m RawMode) Join(session *ServerSession) Mode {
	return m
}

// Leave returns the mode unchanged.
func (m RawMode) Leave(session *ServerSession) Mode {
	return m
}

// Command ignores every command and returns the mode unchanged.
func (m RawMode) Command(session *ServerSession, command byte, request_id uuid.UUID, channel_id ChannelID, members []*ServerSession, data []byte) Mode {
	return m
}

// Data wraps data in a MODE_RAW peer packet built from session.Peer and
// channel_id, then sends it to every member except the one whose ID matches
// session.ID.
func (m RawMode) Data(session *ServerSession, channel_id ChannelID, members []*ServerSession, data []byte) {
	relay := NewPeerPacket(session.Peer, channel_id, MODE_RAW, data)
	multiplex_without_sender(session.ID, relay, members)
}
// SampleRate is the one-byte code for an audio sample rate, as stored in
// AudioMode and carried as the single data byte of AUDIO_SET_SAMPLE_RATE.
type SampleRate byte
// Sample rate codes.
//
// NOTE(review): only SAMPLE_RATE_UNSET is typed as SampleRate. The other two
// have explicit values without a type, so they are untyped integer constants.
// Retyping them could break callers that use them as plain bytes — check
// usages first.
const (
// SAMPLE_RATE_UNSET (0xFF) is not accepted by AudioMode.Command as a rate to set.
SAMPLE_RATE_UNSET SampleRate = 0xFF
// SAMPLE_RATE_24KHZ is the code 0x01.
SAMPLE_RATE_24KHZ = 0x01
// SAMPLE_RATE_48KHZ is the code 0x02.
SAMPLE_RATE_48KHZ = 0x02
)
// AudioMode is the Mode used for audio. Its only state is the sample rate.
// All methods take the receiver by value, so a change made in Command only
// takes effect through the Mode value that Command returns.
type AudioMode struct {
	SampleRate SampleRate
}

// Join returns the mode unchanged.
func (m AudioMode) Join(session *ServerSession) Mode {
	return m
}

// Leave returns the mode unchanged.
func (m AudioMode) Leave(session *ServerSession) Mode {
	return m
}

// Command handles the two audio commands and returns the resulting mode.
//
//   - AUDIO_SET_SAMPLE_RATE: when data is exactly one byte equal to
//     SAMPLE_RATE_24KHZ or SAMPLE_RATE_48KHZ, the returned mode carries that
//     rate and an AUDIO_SET_SAMPLE_RATE packet with the same data is sent to
//     every member, the sender included. Any other payload is ignored.
//   - AUDIO_GET_SAMPLE_RATE: the current rate is written to the requesting
//     session's OutgoingPackets as an AUDIO_SET_SAMPLE_RATE packet.
//
// Every other command is ignored and the mode is returned unchanged.
func (m AudioMode) Command(session *ServerSession, command byte, request_id uuid.UUID, channel_id ChannelID, members []*ServerSession, data []byte) Mode {
	switch command {
	case AUDIO_SET_SAMPLE_RATE:
		if len(data) != 1 {
			break
		}
		switch requested := SampleRate(data[0]); requested {
		case SAMPLE_RATE_24KHZ, SAMPLE_RATE_48KHZ:
			m.SampleRate = requested
			announce := NewChannelCommandPacket(request_id, channel_id, MODE_AUDIO, AUDIO_SET_SAMPLE_RATE, data)
			multiplex(announce, members)
		}
	case AUDIO_GET_SAMPLE_RATE:
		// NOTE(review): this is a direct channel send, unlike the
		// session.Send calls used for broadcasts. Whether it can block
		// depends on how OutgoingPackets is created, which is not visible
		// here.
		session.OutgoingPackets <- NewChannelCommandPacket(request_id, channel_id, MODE_AUDIO, AUDIO_SET_SAMPLE_RATE, []byte{byte(m.SampleRate)})
	}
	return m
}

// Data wraps data in a MODE_AUDIO peer packet built from session.Peer and
// channel_id, then sends it to every member except the one whose ID matches
// session.ID.
func (m AudioMode) Data(session *ServerSession, channel_id ChannelID, members []*ServerSession, data []byte) {
	relay := NewPeerPacket(session.Peer, channel_id, MODE_AUDIO, data)
	multiplex_without_sender(session.ID, relay, members)
}