From e08037619b7da2a48a35f38a87189e66ea6bbe84 Mon Sep 17 00:00:00 2001 From: Liam Conway <11491642+liconway@users.noreply.github.com> Date: Thu, 25 Jan 2024 22:22:03 -0700 Subject: [PATCH] Fix messages of size >16384 breaking --- src/main.rs | 175 +++++++++++++++++++++++++++++++--------------------- 1 file changed, 104 insertions(+), 71 deletions(-) diff --git a/src/main.rs b/src/main.rs index 9c2b3a6..e82230b 100644 --- a/src/main.rs +++ b/src/main.rs @@ -353,7 +353,7 @@ struct BackendMessage { } impl BackendMessage { - fn from_bytes(bytes: Vec) -> Option { + fn from_bytes(bytes: &Vec) -> Option { if bytes.len() < 5 { return None; } @@ -413,7 +413,7 @@ impl BackendPacket { }; } - fn from_bytes(bytes: Vec) -> Option { + fn from_bytes(bytes: &Vec) -> Option { if bytes.len() < BACKEND_PACKET_HEADER_SIZE { return None; } @@ -476,7 +476,7 @@ impl ConnectMsg { }; } - fn from_bytes(bytes: Vec) -> Option { + fn from_bytes(bytes: &Vec) -> Option { if bytes.len() < CONNECT_MSG_LEN { return None; } @@ -518,14 +518,14 @@ struct NoticeMsg { } impl NoticeMsg { - fn from_bytes(bytes: Vec) -> Option { + fn from_bytes(bytes: &Vec) -> Option { if bytes.len() < 8 { return None; } let notice_id = u64::from_le_bytes(bytes[0..8].try_into().unwrap()); - match BackendMessage::from_bytes(bytes[8..].to_vec()) { + match BackendMessage::from_bytes(&(bytes[8..].to_vec())) { Some(message) => { match message.data.notice { Some(notice) => Some(NoticeMsg{ @@ -554,7 +554,6 @@ struct TMClient { time_offset: f64, } -const TCP_BUFFER_SIZE: usize = 10000; impl TMClient { fn new(uuid: [u8; 16], client_name: [u8; 32], host: String, password: String, username: [u8; 16]) -> (TMClient, TMConnection) { let (work_tx, work_rx) = mpsc::channel(); @@ -616,82 +615,116 @@ impl TMClient { } } - let mut incoming = [0; TCP_BUFFER_SIZE]; - match self.stream.read(&mut incoming) { - Ok(read) => { - let data = incoming[0..read].to_vec(); - match BackendPacket::from_bytes(data) { + let mut header = [0; BACKEND_PACKET_HEADER_SIZE]; + let mut read_bytes = 0; + let data_size: usize; + match self.stream.read_exact(&mut header) { + Ok(()) => { + match BackendPacket::from_bytes(&header.to_vec()) { Some(packet) => { - let offset = packet.timestamp - get_float_time(); - self.time_offset = offset.clone(); - self.last_seq_num = packet.seq_num; - match packet.msg_type { - // Notice Message - 4 => { - match NoticeMsg::from_bytes(packet.data.clone()) { - Some(notice) => { - log::debug!("Received notice: {:#?}", notice); - let ack = BackendPacket::new(packet.header, packet.timestamp, 5, self.last_seq_num+1, notice.notice_id.to_le_bytes().to_vec()); - self.last_seq_num += 1; - match self.stream.write(&ack.as_bytes()) { - Ok(_) => log::debug!("Sent ACK for notice {}", notice.notice_id), - Err(error) => log::error!("ACK error: {:?}", error), - } - match self.work_queue.send(Work::Notice(Box::new(notice.notice))) { - Ok(_) => log::debug!("Forwarded notice to callback engine"), - Err(error) => log::error!("Notice forward error {:?}", error), - } - }, - None => log::error!("Notice parse error: {:?}", packet), + data_size = packet.size as usize; + log::debug!("Received {} bytes ({}/{})", read_bytes, read_bytes, BACKEND_PACKET_HEADER_SIZE + data_size); + }, + None => { + log::error!("Failed to parse BackendPacket header: {:?}", header); + return; + }, + } + }, + Err(ref error) if error.kind() == std::io::ErrorKind::WouldBlock => { + log::debug!("Resource temporarily unavailable, retrying later."); + return; + }, + Err(error) => { + log::error!("Error reading header: {:?}", error); + return; + }, + } + + let mut data = vec![0; BACKEND_PACKET_HEADER_SIZE + data_size]; + data[..header.len()].copy_from_slice(&header); + + while read_bytes < BACKEND_PACKET_HEADER_SIZE + data_size { + match self.stream.read(&mut data[read_bytes..]) { + Ok(read) => { + read_bytes += read; + log::debug!("Received {} bytes ({}/{})", read, read_bytes, BACKEND_PACKET_HEADER_SIZE + data_size); + }, + Err(error) => { + log::error!("Error reading body data: {:?}", error); + }, + } + } + + match BackendPacket::from_bytes(&data) { + Some(packet) => { + let offset = packet.timestamp - get_float_time(); + self.time_offset = offset.clone(); + self.last_seq_num = packet.seq_num; + match packet.msg_type { + // Notice Message + 4 => { + match NoticeMsg::from_bytes(&packet.data) { + Some(notice) => { + log::debug!("Received notice: {:#?}", notice); + let ack = BackendPacket::new(packet.header, packet.timestamp, 5, self.last_seq_num+1, notice.notice_id.to_le_bytes().to_vec()); + self.last_seq_num += 1; + match self.stream.write(&ack.as_bytes()) { + Ok(_) => log::debug!("Sent ACK for notice {}", notice.notice_id), + Err(error) => log::error!("ACK error: {:?}", error), + } + match self.work_queue.send(Work::Notice(Box::new(notice.notice))) { + Ok(_) => log::debug!("Forwarded notice to callback engine"), + Err(error) => log::error!("Notice forward error {:?}", error), } }, - // Response message - 3 => { - match BackendMessage::from_bytes(packet.data.clone()) { - Some(message) => { - log::debug!("Received response: {:#?}", message); - - match self.responses.send((Box::new(message), offset)) { - Ok(_) => log::debug!("Forwarded response to callback engine"), - Err(error) => log::error!("Response forward error {:?}", error), - } - }, - None => log::error!("BackendMessage parse error: {:?}", packet), + None => log::error!("Notice parse error: {:?}", packet), + } + }, + // Response message + 3 => { + match BackendMessage::from_bytes(&packet.data) { + Some(message) => { + log::debug!("Received response: {:#?}", message); + + match self.responses.send((Box::new(message), offset)) { + Ok(_) => log::debug!("Forwarded response to callback engine"), + Err(error) => log::error!("Response forward error {:?}", error), } }, - // Server Message - 2 => { - match ConnectMsg::from_bytes(packet.data) { - Some(welcome_msg) => { - log::debug!("Received connect message: {:#?}", welcome_msg); - if welcome_msg.pw_valid == 0 { - let connect_response = ConnectMsg::from_welcome(welcome_msg, &self.password, self.uuid, self.client_name, self.username); - let response = BackendPacket::new(packet.header, packet.timestamp, packet.msg_type, self.last_seq_num+1, connect_response.as_bytes()); - match self.stream.write(&response.as_bytes()) { - Err(error) => log::error!("Send error: {:?}", error), - Ok(_) => self.last_seq_num += 1, - } - } else if welcome_msg.state_valid == 0 { - log::error!("pw_valid but not state_valid"); - } else { - self.connected = true; - log::info!("Connected to TM backend!"); - } - }, - None => log::error!("Failed to parse welcome msg"), + None => log::error!("BackendMessage parse error: {:?}", packet), + } + }, + // Server Message + 2 => { + match ConnectMsg::from_bytes(&packet.data) { + Some(welcome_msg) => { + log::debug!("Received connect message: {:#?}", welcome_msg); + if welcome_msg.pw_valid == 0 { + let connect_response = ConnectMsg::from_welcome(welcome_msg, &self.password, self.uuid, self.client_name, self.username); + let response = BackendPacket::new(packet.header, packet.timestamp, packet.msg_type, self.last_seq_num+1, connect_response.as_bytes()); + match self.stream.write(&response.as_bytes()) { + Err(error) => log::error!("Send error: {:?}", error), + Ok(_) => self.last_seq_num += 1, + } + } else if welcome_msg.state_valid == 0 { + log::error!("pw_valid but not state_valid"); + } else { + self.connected = true; + log::info!("Connected to TM backend!"); } }, - _ => log::warn!("Unhandled message type: {}", packet.msg_type), + None => log::error!("Failed to parse welcome msg"), } }, - None => { - log::error!("Failed to parse BackendPacket({}): {}", read, String::from_utf8_lossy(&incoming)); - // Sleep to prevent busy loop when TM is spamming 0 length packets - thread::sleep(Duration::from_millis(100)); - } + _ => log::warn!("Unhandled message type: {}", packet.msg_type), } }, - Err(_) => {}, + None => { + log::error!("Failed to parse BackendPacket({}): {}", read_bytes, String::from_utf8_lossy(&data)); + // Sleep to prevent busy loop when TM is spamming 0 length packets + thread::sleep(Duration::from_millis(100)); + } } }