From e08037619b7da2a48a35f38a87189e66ea6bbe84 Mon Sep 17 00:00:00 2001 From: Liam Conway <11491642+liconway@users.noreply.github.com> Date: Thu, 25 Jan 2024 22:22:03 -0700 Subject: [PATCH 1/3] Fix messages of size >16384 breaking --- src/main.rs | 175 +++++++++++++++++++++++++++++++--------------------- 1 file changed, 104 insertions(+), 71 deletions(-) diff --git a/src/main.rs b/src/main.rs index 9c2b3a6..e82230b 100644 --- a/src/main.rs +++ b/src/main.rs @@ -353,7 +353,7 @@ struct BackendMessage { } impl BackendMessage { - fn from_bytes(bytes: Vec) -> Option { + fn from_bytes(bytes: &Vec) -> Option { if bytes.len() < 5 { return None; } @@ -413,7 +413,7 @@ impl BackendPacket { }; } - fn from_bytes(bytes: Vec) -> Option { + fn from_bytes(bytes: &Vec) -> Option { if bytes.len() < BACKEND_PACKET_HEADER_SIZE { return None; } @@ -476,7 +476,7 @@ impl ConnectMsg { }; } - fn from_bytes(bytes: Vec) -> Option { + fn from_bytes(bytes: &Vec) -> Option { if bytes.len() < CONNECT_MSG_LEN { return None; } @@ -518,14 +518,14 @@ struct NoticeMsg { } impl NoticeMsg { - fn from_bytes(bytes: Vec) -> Option { + fn from_bytes(bytes: &Vec) -> Option { if bytes.len() < 8 { return None; } let notice_id = u64::from_le_bytes(bytes[0..8].try_into().unwrap()); - match BackendMessage::from_bytes(bytes[8..].to_vec()) { + match BackendMessage::from_bytes(&(bytes[8..].to_vec())) { Some(message) => { match message.data.notice { Some(notice) => Some(NoticeMsg{ @@ -554,7 +554,6 @@ struct TMClient { time_offset: f64, } -const TCP_BUFFER_SIZE: usize = 10000; impl TMClient { fn new(uuid: [u8; 16], client_name: [u8; 32], host: String, password: String, username: [u8; 16]) -> (TMClient, TMConnection) { let (work_tx, work_rx) = mpsc::channel(); @@ -616,82 +615,116 @@ impl TMClient { } } - let mut incoming = [0; TCP_BUFFER_SIZE]; - match self.stream.read(&mut incoming) { - Ok(read) => { - let data = incoming[0..read].to_vec(); - match BackendPacket::from_bytes(data) { + let mut header = [0; BACKEND_PACKET_HEADER_SIZE]; + let mut read_bytes = 0; + let data_size: usize; + match self.stream.read_exact(&mut header) { + Ok(()) => { + match BackendPacket::from_bytes(&header.to_vec()) { Some(packet) => { - let offset = packet.timestamp - get_float_time(); - self.time_offset = offset.clone(); - self.last_seq_num = packet.seq_num; - match packet.msg_type { - // Notice Message - 4 => { - match NoticeMsg::from_bytes(packet.data.clone()) { - Some(notice) => { - log::debug!("Received notice: {:#?}", notice); - let ack = BackendPacket::new(packet.header, packet.timestamp, 5, self.last_seq_num+1, notice.notice_id.to_le_bytes().to_vec()); - self.last_seq_num += 1; - match self.stream.write(&ack.as_bytes()) { - Ok(_) => log::debug!("Sent ACK for notice {}", notice.notice_id), - Err(error) => log::error!("ACK error: {:?}", error), - } - match self.work_queue.send(Work::Notice(Box::new(notice.notice))) { - Ok(_) => log::debug!("Forwarded notice to callback engine"), - Err(error) => log::error!("Notice forward error {:?}", error), - } - }, - None => log::error!("Notice parse error: {:?}", packet), + data_size = packet.size as usize; + log::debug!("Received {} bytes ({}/{})", read_bytes, read_bytes, BACKEND_PACKET_HEADER_SIZE + data_size); + }, + None => { + log::error!("Failed to parse BackendPacket header: {:?}", header); + return; + }, + } + }, + Err(ref error) if error.kind() == std::io::ErrorKind::WouldBlock => { + log::debug!("Resource temporarily unavailable, retrying later."); + return; + }, + Err(error) => { + log::error!("Error reading header: {:?}", error); + return; + }, + } + + let mut data = vec![0; BACKEND_PACKET_HEADER_SIZE + data_size]; + data[..header.len()].copy_from_slice(&header); + + while read_bytes < BACKEND_PACKET_HEADER_SIZE + data_size { + match self.stream.read(&mut data[read_bytes..]) { + Ok(read) => { + read_bytes += read; + log::debug!("Received {} bytes ({}/{})", read, read_bytes, BACKEND_PACKET_HEADER_SIZE + data_size); + }, + Err(error) => { + log::error!("Error reading body data: {:?}", error); + }, + } + } + + match BackendPacket::from_bytes(&data) { + Some(packet) => { + let offset = packet.timestamp - get_float_time(); + self.time_offset = offset.clone(); + self.last_seq_num = packet.seq_num; + match packet.msg_type { + // Notice Message + 4 => { + match NoticeMsg::from_bytes(&packet.data) { + Some(notice) => { + log::debug!("Received notice: {:#?}", notice); + let ack = BackendPacket::new(packet.header, packet.timestamp, 5, self.last_seq_num+1, notice.notice_id.to_le_bytes().to_vec()); + self.last_seq_num += 1; + match self.stream.write(&ack.as_bytes()) { + Ok(_) => log::debug!("Sent ACK for notice {}", notice.notice_id), + Err(error) => log::error!("ACK error: {:?}", error), + } + match self.work_queue.send(Work::Notice(Box::new(notice.notice))) { + Ok(_) => log::debug!("Forwarded notice to callback engine"), + Err(error) => log::error!("Notice forward error {:?}", error), } }, - // Response message - 3 => { - match BackendMessage::from_bytes(packet.data.clone()) { - Some(message) => { - log::debug!("Received response: {:#?}", message); - - match self.responses.send((Box::new(message), offset)) { - Ok(_) => log::debug!("Forwarded response to callback engine"), - Err(error) => log::error!("Response forward error {:?}", error), - } - }, - None => log::error!("BackendMessage parse error: {:?}", packet), + None => log::error!("Notice parse error: {:?}", packet), + } + }, + // Response message + 3 => { + match BackendMessage::from_bytes(&packet.data) { + Some(message) => { + log::debug!("Received response: {:#?}", message); + + match self.responses.send((Box::new(message), offset)) { + Ok(_) => log::debug!("Forwarded response to callback engine"), + Err(error) => log::error!("Response forward error {:?}", error), } }, - // Server Message - 2 => { - match ConnectMsg::from_bytes(packet.data) { - Some(welcome_msg) => { - log::debug!("Received connect message: {:#?}", welcome_msg); - if welcome_msg.pw_valid == 0 { - let connect_response = ConnectMsg::from_welcome(welcome_msg, &self.password, self.uuid, self.client_name, self.username); - let response = BackendPacket::new(packet.header, packet.timestamp, packet.msg_type, self.last_seq_num+1, connect_response.as_bytes()); - match self.stream.write(&response.as_bytes()) { - Err(error) => log::error!("Send error: {:?}", error), - Ok(_) => self.last_seq_num += 1, - } - } else if welcome_msg.state_valid == 0 { - log::error!("pw_valid but not state_valid"); - } else { - self.connected = true; - log::info!("Connected to TM backend!"); - } - }, - None => log::error!("Failed to parse welcome msg"), + None => log::error!("BackendMessage parse error: {:?}", packet), + } + }, + // Server Message + 2 => { + match ConnectMsg::from_bytes(&packet.data) { + Some(welcome_msg) => { + log::debug!("Received connect message: {:#?}", welcome_msg); + if welcome_msg.pw_valid == 0 { + let connect_response = ConnectMsg::from_welcome(welcome_msg, &self.password, self.uuid, self.client_name, self.username); + let response = BackendPacket::new(packet.header, packet.timestamp, packet.msg_type, self.last_seq_num+1, connect_response.as_bytes()); + match self.stream.write(&response.as_bytes()) { + Err(error) => log::error!("Send error: {:?}", error), + Ok(_) => self.last_seq_num += 1, + } + } else if welcome_msg.state_valid == 0 { + log::error!("pw_valid but not state_valid"); + } else { + self.connected = true; + log::info!("Connected to TM backend!"); } }, - _ => log::warn!("Unhandled message type: {}", packet.msg_type), + None => log::error!("Failed to parse welcome msg"), } }, - None => { - log::error!("Failed to parse BackendPacket({}): {}", read, String::from_utf8_lossy(&incoming)); - // Sleep to prevent busy loop when TM is spamming 0 length packets - thread::sleep(Duration::from_millis(100)); - } + _ => log::warn!("Unhandled message type: {}", packet.msg_type), } }, - Err(_) => {}, + None => { + log::error!("Failed to parse BackendPacket({}): {}", read_bytes, String::from_utf8_lossy(&data)); + // Sleep to prevent busy loop when TM is spamming 0 length packets + thread::sleep(Duration::from_millis(100)); + } } } From 1e435c419e892116be191de3304d4b4b9d691c65 Mon Sep 17 00:00:00 2001 From: Liam Conway <11491642+liconway@users.noreply.github.com> Date: Thu, 25 Jan 2024 22:22:54 -0700 Subject: [PATCH 2/3] Fix env_logger anstream feature on Darwin --- Cargo.lock | 161 ++++++++++++++++++++++++----------------------------- Cargo.toml | 2 +- 2 files changed, 75 insertions(+), 88 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 2ace3ee..0328043 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -26,54 +26,6 @@ dependencies = [ "memchr", ] -[[package]] -name = "anstream" -version = "0.6.11" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "6e2e1ebcb11de5c03c67de28a7df593d32191b44939c482e97702baaaa6ab6a5" -dependencies = [ - "anstyle", - "anstyle-parse", - "anstyle-query", - "anstyle-wincon", - "colorchoice", - "utf8parse", -] - -[[package]] -name = "anstyle" -version = "1.0.4" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "7079075b41f533b8c61d2a4d073c4676e1f8b249ff94a393b0595db304e0dd87" - -[[package]] -name = "anstyle-parse" -version = "0.2.3" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "c75ac65da39e5fe5ab759307499ddad880d724eed2f6ce5b5e8a26f4f387928c" -dependencies = [ - "utf8parse", -] - -[[package]] -name = "anstyle-query" -version = "1.0.2" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "e28923312444cdd728e4738b3f9c9cac739500909bb3d3c94b43551b16517648" -dependencies = [ - "windows-sys 0.52.0", -] - -[[package]] -name = "anstyle-wincon" -version = "3.0.2" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "1cd54b81ec8d6180e24654d0b371ad22fc3dd083b6ff8ba325b72e00c87660a7" -dependencies = [ - "anstyle", - "windows-sys 0.52.0", -] - [[package]] name = "anyhow" version = "1.0.79" @@ -126,9 +78,9 @@ checksum = "bef38d45163c2f1dde094a7dfd33ccf595c92905c8f8f4fdc18d06fb1037718a" [[package]] name = "bitflags" -version = "2.4.1" +version = "2.4.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "327762f6e5a765692301e5bb513e0d9fef63be86bbc14528052b1cd3e6f03e07" +checksum = "ed570934406eb16438a4e976b1b4500774099c13b8cb96eec99f620f05090ddf" [[package]] name = "block-buffer" @@ -160,12 +112,6 @@ version = "1.0.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "baf1de4339761588bc0619e3cbc0120ee582ebb74b53b4efbf79117bd2da40fd" -[[package]] -name = "colorchoice" -version = "1.0.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "acbf1af155f9b9ef647e42cdc158db4b64a1b61f743629225fde6f3e0be2a7c7" - [[package]] name = "core-foundation" version = "0.9.4" @@ -223,27 +169,17 @@ version = "1.9.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "a26ae43d7bcc3b814de94796a5e736d4029efb0ee900c12e2d54c993ad1a1e07" -[[package]] -name = "env_filter" -version = "0.1.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "a009aa4810eb158359dda09d0c87378e4bbb89b5a801f016885a4707ba24f7ea" -dependencies = [ - "log", - "regex", -] - [[package]] name = "env_logger" -version = "0.11.0" +version = "0.10.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "9eeb342678d785662fd2514be38c459bb925f02b68dd2a3e0f21d7ef82d979dd" +checksum = "4cd405aab171cb85d6735e5c8d9db038c17d3ca007a4d2c25f337935c3d90580" dependencies = [ - "anstream", - "anstyle", - "env_filter", "humantime", + "is-terminal", "log", + "regex", + "termcolor", ] [[package]] @@ -370,6 +306,12 @@ version = "0.4.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "95505c38b4572b2d910cecb0281560f54b440a19336cbbcb27bf6ce6adc6f5a8" +[[package]] +name = "hermit-abi" +version = "0.3.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5d3d0e0f38255e7fa3cf31335b3a56f05febd18025f4db5ef7a0cfb4f8da651f" + [[package]] name = "home" version = "0.5.9" @@ -395,6 +337,17 @@ dependencies = [ "hashbrown", ] +[[package]] +name = "is-terminal" +version = "0.4.10" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0bad00257d07be169d870ab665980b06cdb366d792ad690bf2e76876dc503455" +dependencies = [ + "hermit-abi", + "rustix", + "windows-sys 0.52.0", +] + [[package]] name = "itertools" version = "0.11.0" @@ -418,9 +371,9 @@ checksum = "13e3bf6590cbc649f4d1a3eefc9d5d6eb746f5200ffb04e5e142700b8faa56e7" [[package]] name = "linux-raw-sys" -version = "0.4.12" +version = "0.4.13" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "c4cd1a83af159aa67994778be9070f0ae1bd732942279cabb14f86f986a21456" +checksum = "01cda141df6706de531b6c46c3a33ecca755538219bd484262fa09410c13539c" [[package]] name = "lock_api" @@ -513,7 +466,7 @@ version = "0.10.63" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "15c9d69dd87a29568d4d017cfe8ec518706046a05184e5aea92d0af890b803c8" dependencies = [ - "bitflags 2.4.1", + "bitflags 2.4.2", "cfg-if", "foreign-types", "libc", @@ -597,9 +550,9 @@ dependencies = [ [[package]] name = "proc-macro2" -version = "1.0.76" +version = "1.0.78" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "95fc56cda0b5c3325f5fbbd7ff9fda9e02bb00bb3dac51252d2f1bfa1cb8cc8c" +checksum = "e2422ad645d89c99f8f3e6b88a9fdeca7fabeac836b1002371c4367c8f984aae" dependencies = [ "unicode-ident", ] @@ -718,9 +671,9 @@ dependencies = [ [[package]] name = "regex" -version = "1.10.2" +version = "1.10.3" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "380b951a9c5e80ddfd6136919eef32310721aa4aacd4889a8d39124b026ab343" +checksum = "b62dbe01f0b06f9d8dc7d49e05a0785f153b00b2c227856282f671e0318c9b15" dependencies = [ "aho-corasick", "memchr", @@ -730,9 +683,9 @@ dependencies = [ [[package]] name = "regex-automata" -version = "0.4.3" +version = "0.4.5" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "5f804c7828047e88b2d32e2d7fe5a105da8ee3264f01902f796c8e067dc2483f" +checksum = "5bb987efffd3c6d0d8f5f89510bb458559eab11e4f869acb20bf845e016259cd" dependencies = [ "aho-corasick", "memchr", @@ -789,7 +742,7 @@ version = "0.38.30" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "322394588aaf33c24007e8bb3238ee3e4c5c09c084ab32bc73890b99ff326bca" dependencies = [ - "bitflags 2.4.1", + "bitflags 2.4.2", "errno", "libc", "linux-raw-sys", @@ -987,6 +940,15 @@ dependencies = [ "windows-sys 0.52.0", ] +[[package]] +name = "termcolor" +version = "1.4.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "06794f8f6c5c898b3275aebefa6b8a1cb24cd2c6c79397ab15774837a0bc5755" +dependencies = [ + "winapi-util", +] + [[package]] name = "thiserror" version = "1.0.56" @@ -1062,12 +1024,6 @@ version = "0.9.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "8ecb6da28b8a351d773b68d5825ac39017e680750f980f3a1a85cd8dd28a47c1" -[[package]] -name = "utf8parse" -version = "0.2.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "711b9620af191e0cdc7468a8d14e709c3dcdb115b36f838e601583af800a370a" - [[package]] name = "vcpkg" version = "0.2.15" @@ -1120,6 +1076,37 @@ dependencies = [ "rustix", ] +[[package]] +name = "winapi" +version = "0.3.9" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5c839a674fcd7a98952e593242ea400abe93992746761e38641405d28b00f419" +dependencies = [ + "winapi-i686-pc-windows-gnu", + "winapi-x86_64-pc-windows-gnu", +] + +[[package]] +name = "winapi-i686-pc-windows-gnu" +version = "0.4.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ac3b87c63620426dd9b991e5ce0329eff545bccbbb34f3be09ff6fb6ab51b7b6" + +[[package]] +name = "winapi-util" +version = "0.1.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f29e6f9198ba0d26b4c9f07dbe6f9ed633e1f3d5b8b414090084349e46a52596" +dependencies = [ + "winapi", +] + +[[package]] +name = "winapi-x86_64-pc-windows-gnu" +version = "0.4.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "712e227841d057c1ee1cd2fb22fa7e5a5461ae8e48fa2ca79ec42cfc1931183f" + [[package]] name = "windows-sys" version = "0.48.0" diff --git a/Cargo.toml b/Cargo.toml index 0c49dca..00a5873 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -13,7 +13,7 @@ prost-build = "0.12.3" backtrace-on-stack-overflow = "0.3.0" bytes = "1.5.0" dotenv = "0.15.0" -env_logger = "0.11.0" +env_logger = "0.10.2" log = "0.4.20" openssl = "0.10.63" prost = "0.12.3" From 2496c2a22a498e9b648e95dfd2adf72b81c87c9a Mon Sep 17 00:00:00 2001 From: Liam Conway <11491642+liconway@users.noreply.github.com> Date: Thu, 25 Jan 2024 22:36:17 -0700 Subject: [PATCH 3/3] Fix bug with missing bytes --- src/main.rs | 1 + 1 file changed, 1 insertion(+) diff --git a/src/main.rs b/src/main.rs index e82230b..45f8bca 100644 --- a/src/main.rs +++ b/src/main.rs @@ -623,6 +623,7 @@ impl TMClient { match BackendPacket::from_bytes(&header.to_vec()) { Some(packet) => { data_size = packet.size as usize; + read_bytes += BACKEND_PACKET_HEADER_SIZE; log::debug!("Received {} bytes ({}/{})", read_bytes, read_bytes, BACKEND_PACKET_HEADER_SIZE + data_size); }, None => {