Fix messages of size >16384 breaking

master
Liam Conway 2024-01-25 22:22:03 -07:00
parent 5089684c70
commit e08037619b
No known key found for this signature in database
1 changed files with 104 additions and 71 deletions

@ -353,7 +353,7 @@ struct BackendMessage {
} }
impl BackendMessage { impl BackendMessage {
fn from_bytes(bytes: Vec<u8>) -> Option<BackendMessage> { fn from_bytes(bytes: &Vec<u8>) -> Option<BackendMessage> {
if bytes.len() < 5 { if bytes.len() < 5 {
return None; return None;
} }
@ -413,7 +413,7 @@ impl BackendPacket {
}; };
} }
fn from_bytes(bytes: Vec<u8>) -> Option<BackendPacket> { fn from_bytes(bytes: &Vec<u8>) -> Option<BackendPacket> {
if bytes.len() < BACKEND_PACKET_HEADER_SIZE { if bytes.len() < BACKEND_PACKET_HEADER_SIZE {
return None; return None;
} }
@ -476,7 +476,7 @@ impl ConnectMsg {
}; };
} }
fn from_bytes(bytes: Vec<u8>) -> Option<ConnectMsg> { fn from_bytes(bytes: &Vec<u8>) -> Option<ConnectMsg> {
if bytes.len() < CONNECT_MSG_LEN { if bytes.len() < CONNECT_MSG_LEN {
return None; return None;
} }
@ -518,14 +518,14 @@ struct NoticeMsg {
} }
impl NoticeMsg { impl NoticeMsg {
fn from_bytes(bytes: Vec<u8>) -> Option<NoticeMsg> { fn from_bytes(bytes: &Vec<u8>) -> Option<NoticeMsg> {
if bytes.len() < 8 { if bytes.len() < 8 {
return None; return None;
} }
let notice_id = u64::from_le_bytes(bytes[0..8].try_into().unwrap()); let notice_id = u64::from_le_bytes(bytes[0..8].try_into().unwrap());
match BackendMessage::from_bytes(bytes[8..].to_vec()) { match BackendMessage::from_bytes(&(bytes[8..].to_vec())) {
Some(message) => { Some(message) => {
match message.data.notice { match message.data.notice {
Some(notice) => Some(NoticeMsg{ Some(notice) => Some(NoticeMsg{
@ -554,7 +554,6 @@ struct TMClient {
time_offset: f64, time_offset: f64,
} }
const TCP_BUFFER_SIZE: usize = 10000;
impl TMClient { impl TMClient {
fn new(uuid: [u8; 16], client_name: [u8; 32], host: String, password: String, username: [u8; 16]) -> (TMClient, TMConnection) { fn new(uuid: [u8; 16], client_name: [u8; 32], host: String, password: String, username: [u8; 16]) -> (TMClient, TMConnection) {
let (work_tx, work_rx) = mpsc::channel(); let (work_tx, work_rx) = mpsc::channel();
@ -616,82 +615,116 @@ impl TMClient {
} }
} }
let mut incoming = [0; TCP_BUFFER_SIZE]; let mut header = [0; BACKEND_PACKET_HEADER_SIZE];
match self.stream.read(&mut incoming) { let mut read_bytes = 0;
Ok(read) => { let data_size: usize;
let data = incoming[0..read].to_vec(); match self.stream.read_exact(&mut header) {
match BackendPacket::from_bytes(data) { Ok(()) => {
match BackendPacket::from_bytes(&header.to_vec()) {
Some(packet) => { Some(packet) => {
let offset = packet.timestamp - get_float_time(); data_size = packet.size as usize;
self.time_offset = offset.clone(); log::debug!("Received {} bytes ({}/{})", read_bytes, read_bytes, BACKEND_PACKET_HEADER_SIZE + data_size);
self.last_seq_num = packet.seq_num; },
match packet.msg_type { None => {
// Notice Message log::error!("Failed to parse BackendPacket header: {:?}", header);
4 => { return;
match NoticeMsg::from_bytes(packet.data.clone()) { },
Some(notice) => { }
log::debug!("Received notice: {:#?}", notice); },
let ack = BackendPacket::new(packet.header, packet.timestamp, 5, self.last_seq_num+1, notice.notice_id.to_le_bytes().to_vec()); Err(ref error) if error.kind() == std::io::ErrorKind::WouldBlock => {
self.last_seq_num += 1; log::debug!("Resource temporarily unavailable, retrying later.");
match self.stream.write(&ack.as_bytes()) { return;
Ok(_) => log::debug!("Sent ACK for notice {}", notice.notice_id), },
Err(error) => log::error!("ACK error: {:?}", error), Err(error) => {
} log::error!("Error reading header: {:?}", error);
match self.work_queue.send(Work::Notice(Box::new(notice.notice))) { return;
Ok(_) => log::debug!("Forwarded notice to callback engine"), },
Err(error) => log::error!("Notice forward error {:?}", error), }
}
}, let mut data = vec![0; BACKEND_PACKET_HEADER_SIZE + data_size];
None => log::error!("Notice parse error: {:?}", packet), data[..header.len()].copy_from_slice(&header);
while read_bytes < BACKEND_PACKET_HEADER_SIZE + data_size {
match self.stream.read(&mut data[read_bytes..]) {
Ok(read) => {
read_bytes += read;
log::debug!("Received {} bytes ({}/{})", read, read_bytes, BACKEND_PACKET_HEADER_SIZE + data_size);
},
Err(error) => {
log::error!("Error reading body data: {:?}", error);
},
}
}
match BackendPacket::from_bytes(&data) {
Some(packet) => {
let offset = packet.timestamp - get_float_time();
self.time_offset = offset.clone();
self.last_seq_num = packet.seq_num;
match packet.msg_type {
// Notice Message
4 => {
match NoticeMsg::from_bytes(&packet.data) {
Some(notice) => {
log::debug!("Received notice: {:#?}", notice);
let ack = BackendPacket::new(packet.header, packet.timestamp, 5, self.last_seq_num+1, notice.notice_id.to_le_bytes().to_vec());
self.last_seq_num += 1;
match self.stream.write(&ack.as_bytes()) {
Ok(_) => log::debug!("Sent ACK for notice {}", notice.notice_id),
Err(error) => log::error!("ACK error: {:?}", error),
}
match self.work_queue.send(Work::Notice(Box::new(notice.notice))) {
Ok(_) => log::debug!("Forwarded notice to callback engine"),
Err(error) => log::error!("Notice forward error {:?}", error),
} }
}, },
// Response message None => log::error!("Notice parse error: {:?}", packet),
3 => { }
match BackendMessage::from_bytes(packet.data.clone()) { },
Some(message) => { // Response message
log::debug!("Received response: {:#?}", message); 3 => {
match BackendMessage::from_bytes(&packet.data) {
match self.responses.send((Box::new(message), offset)) { Some(message) => {
Ok(_) => log::debug!("Forwarded response to callback engine"), log::debug!("Received response: {:#?}", message);
Err(error) => log::error!("Response forward error {:?}", error),
} match self.responses.send((Box::new(message), offset)) {
}, Ok(_) => log::debug!("Forwarded response to callback engine"),
None => log::error!("BackendMessage parse error: {:?}", packet), Err(error) => log::error!("Response forward error {:?}", error),
} }
}, },
// Server Message None => log::error!("BackendMessage parse error: {:?}", packet),
2 => { }
match ConnectMsg::from_bytes(packet.data) { },
Some(welcome_msg) => { // Server Message
log::debug!("Received connect message: {:#?}", welcome_msg); 2 => {
if welcome_msg.pw_valid == 0 { match ConnectMsg::from_bytes(&packet.data) {
let connect_response = ConnectMsg::from_welcome(welcome_msg, &self.password, self.uuid, self.client_name, self.username); Some(welcome_msg) => {
let response = BackendPacket::new(packet.header, packet.timestamp, packet.msg_type, self.last_seq_num+1, connect_response.as_bytes()); log::debug!("Received connect message: {:#?}", welcome_msg);
match self.stream.write(&response.as_bytes()) { if welcome_msg.pw_valid == 0 {
Err(error) => log::error!("Send error: {:?}", error), let connect_response = ConnectMsg::from_welcome(welcome_msg, &self.password, self.uuid, self.client_name, self.username);
Ok(_) => self.last_seq_num += 1, let response = BackendPacket::new(packet.header, packet.timestamp, packet.msg_type, self.last_seq_num+1, connect_response.as_bytes());
} match self.stream.write(&response.as_bytes()) {
} else if welcome_msg.state_valid == 0 { Err(error) => log::error!("Send error: {:?}", error),
log::error!("pw_valid but not state_valid"); Ok(_) => self.last_seq_num += 1,
} else { }
self.connected = true; } else if welcome_msg.state_valid == 0 {
log::info!("Connected to TM backend!"); log::error!("pw_valid but not state_valid");
} } else {
}, self.connected = true;
None => log::error!("Failed to parse welcome msg"), log::info!("Connected to TM backend!");
} }
}, },
_ => log::warn!("Unhandled message type: {}", packet.msg_type), None => log::error!("Failed to parse welcome msg"),
} }
}, },
None => { _ => log::warn!("Unhandled message type: {}", packet.msg_type),
log::error!("Failed to parse BackendPacket({}): {}", read, String::from_utf8_lossy(&incoming));
// Sleep to prevent busy loop when TM is spamming 0 length packets
thread::sleep(Duration::from_millis(100));
}
} }
}, },
Err(_) => {}, None => {
log::error!("Failed to parse BackendPacket({}): {}", read_bytes, String::from_utf8_lossy(&data));
// Sleep to prevent busy loop when TM is spamming 0 length packets
thread::sleep(Duration::from_millis(100));
}
} }
} }