Merge pull request #2 from mekkanized/liconway/fix_oversized_messages

Fix oversized messages
master
Noah Metz 2024-01-25 23:43:11 -07:00 committed by GitHub
commit b7c09dcbd5
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
3 changed files with 180 additions and 159 deletions

161
Cargo.lock generated

@ -26,54 +26,6 @@ dependencies = [
"memchr", "memchr",
] ]
[[package]]
name = "anstream"
version = "0.6.11"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "6e2e1ebcb11de5c03c67de28a7df593d32191b44939c482e97702baaaa6ab6a5"
dependencies = [
"anstyle",
"anstyle-parse",
"anstyle-query",
"anstyle-wincon",
"colorchoice",
"utf8parse",
]
[[package]]
name = "anstyle"
version = "1.0.4"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "7079075b41f533b8c61d2a4d073c4676e1f8b249ff94a393b0595db304e0dd87"
[[package]]
name = "anstyle-parse"
version = "0.2.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "c75ac65da39e5fe5ab759307499ddad880d724eed2f6ce5b5e8a26f4f387928c"
dependencies = [
"utf8parse",
]
[[package]]
name = "anstyle-query"
version = "1.0.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "e28923312444cdd728e4738b3f9c9cac739500909bb3d3c94b43551b16517648"
dependencies = [
"windows-sys 0.52.0",
]
[[package]]
name = "anstyle-wincon"
version = "3.0.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "1cd54b81ec8d6180e24654d0b371ad22fc3dd083b6ff8ba325b72e00c87660a7"
dependencies = [
"anstyle",
"windows-sys 0.52.0",
]
[[package]] [[package]]
name = "anyhow" name = "anyhow"
version = "1.0.79" version = "1.0.79"
@ -126,9 +78,9 @@ checksum = "bef38d45163c2f1dde094a7dfd33ccf595c92905c8f8f4fdc18d06fb1037718a"
[[package]] [[package]]
name = "bitflags" name = "bitflags"
version = "2.4.1" version = "2.4.2"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "327762f6e5a765692301e5bb513e0d9fef63be86bbc14528052b1cd3e6f03e07" checksum = "ed570934406eb16438a4e976b1b4500774099c13b8cb96eec99f620f05090ddf"
[[package]] [[package]]
name = "block-buffer" name = "block-buffer"
@ -160,12 +112,6 @@ version = "1.0.0"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "baf1de4339761588bc0619e3cbc0120ee582ebb74b53b4efbf79117bd2da40fd" checksum = "baf1de4339761588bc0619e3cbc0120ee582ebb74b53b4efbf79117bd2da40fd"
[[package]]
name = "colorchoice"
version = "1.0.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "acbf1af155f9b9ef647e42cdc158db4b64a1b61f743629225fde6f3e0be2a7c7"
[[package]] [[package]]
name = "core-foundation" name = "core-foundation"
version = "0.9.4" version = "0.9.4"
@ -223,27 +169,17 @@ version = "1.9.0"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "a26ae43d7bcc3b814de94796a5e736d4029efb0ee900c12e2d54c993ad1a1e07" checksum = "a26ae43d7bcc3b814de94796a5e736d4029efb0ee900c12e2d54c993ad1a1e07"
[[package]]
name = "env_filter"
version = "0.1.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "a009aa4810eb158359dda09d0c87378e4bbb89b5a801f016885a4707ba24f7ea"
dependencies = [
"log",
"regex",
]
[[package]] [[package]]
name = "env_logger" name = "env_logger"
version = "0.11.0" version = "0.10.2"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "9eeb342678d785662fd2514be38c459bb925f02b68dd2a3e0f21d7ef82d979dd" checksum = "4cd405aab171cb85d6735e5c8d9db038c17d3ca007a4d2c25f337935c3d90580"
dependencies = [ dependencies = [
"anstream",
"anstyle",
"env_filter",
"humantime", "humantime",
"is-terminal",
"log", "log",
"regex",
"termcolor",
] ]
[[package]] [[package]]
@ -370,6 +306,12 @@ version = "0.4.1"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "95505c38b4572b2d910cecb0281560f54b440a19336cbbcb27bf6ce6adc6f5a8" checksum = "95505c38b4572b2d910cecb0281560f54b440a19336cbbcb27bf6ce6adc6f5a8"
[[package]]
name = "hermit-abi"
version = "0.3.4"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "5d3d0e0f38255e7fa3cf31335b3a56f05febd18025f4db5ef7a0cfb4f8da651f"
[[package]] [[package]]
name = "home" name = "home"
version = "0.5.9" version = "0.5.9"
@ -395,6 +337,17 @@ dependencies = [
"hashbrown", "hashbrown",
] ]
[[package]]
name = "is-terminal"
version = "0.4.10"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "0bad00257d07be169d870ab665980b06cdb366d792ad690bf2e76876dc503455"
dependencies = [
"hermit-abi",
"rustix",
"windows-sys 0.52.0",
]
[[package]] [[package]]
name = "itertools" name = "itertools"
version = "0.11.0" version = "0.11.0"
@ -418,9 +371,9 @@ checksum = "13e3bf6590cbc649f4d1a3eefc9d5d6eb746f5200ffb04e5e142700b8faa56e7"
[[package]] [[package]]
name = "linux-raw-sys" name = "linux-raw-sys"
version = "0.4.12" version = "0.4.13"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "c4cd1a83af159aa67994778be9070f0ae1bd732942279cabb14f86f986a21456" checksum = "01cda141df6706de531b6c46c3a33ecca755538219bd484262fa09410c13539c"
[[package]] [[package]]
name = "lock_api" name = "lock_api"
@ -513,7 +466,7 @@ version = "0.10.63"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "15c9d69dd87a29568d4d017cfe8ec518706046a05184e5aea92d0af890b803c8" checksum = "15c9d69dd87a29568d4d017cfe8ec518706046a05184e5aea92d0af890b803c8"
dependencies = [ dependencies = [
"bitflags 2.4.1", "bitflags 2.4.2",
"cfg-if", "cfg-if",
"foreign-types", "foreign-types",
"libc", "libc",
@ -597,9 +550,9 @@ dependencies = [
[[package]] [[package]]
name = "proc-macro2" name = "proc-macro2"
version = "1.0.76" version = "1.0.78"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "95fc56cda0b5c3325f5fbbd7ff9fda9e02bb00bb3dac51252d2f1bfa1cb8cc8c" checksum = "e2422ad645d89c99f8f3e6b88a9fdeca7fabeac836b1002371c4367c8f984aae"
dependencies = [ dependencies = [
"unicode-ident", "unicode-ident",
] ]
@ -718,9 +671,9 @@ dependencies = [
[[package]] [[package]]
name = "regex" name = "regex"
version = "1.10.2" version = "1.10.3"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "380b951a9c5e80ddfd6136919eef32310721aa4aacd4889a8d39124b026ab343" checksum = "b62dbe01f0b06f9d8dc7d49e05a0785f153b00b2c227856282f671e0318c9b15"
dependencies = [ dependencies = [
"aho-corasick", "aho-corasick",
"memchr", "memchr",
@ -730,9 +683,9 @@ dependencies = [
[[package]] [[package]]
name = "regex-automata" name = "regex-automata"
version = "0.4.3" version = "0.4.5"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "5f804c7828047e88b2d32e2d7fe5a105da8ee3264f01902f796c8e067dc2483f" checksum = "5bb987efffd3c6d0d8f5f89510bb458559eab11e4f869acb20bf845e016259cd"
dependencies = [ dependencies = [
"aho-corasick", "aho-corasick",
"memchr", "memchr",
@ -789,7 +742,7 @@ version = "0.38.30"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "322394588aaf33c24007e8bb3238ee3e4c5c09c084ab32bc73890b99ff326bca" checksum = "322394588aaf33c24007e8bb3238ee3e4c5c09c084ab32bc73890b99ff326bca"
dependencies = [ dependencies = [
"bitflags 2.4.1", "bitflags 2.4.2",
"errno", "errno",
"libc", "libc",
"linux-raw-sys", "linux-raw-sys",
@ -987,6 +940,15 @@ dependencies = [
"windows-sys 0.52.0", "windows-sys 0.52.0",
] ]
[[package]]
name = "termcolor"
version = "1.4.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "06794f8f6c5c898b3275aebefa6b8a1cb24cd2c6c79397ab15774837a0bc5755"
dependencies = [
"winapi-util",
]
[[package]] [[package]]
name = "thiserror" name = "thiserror"
version = "1.0.56" version = "1.0.56"
@ -1062,12 +1024,6 @@ version = "0.9.0"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "8ecb6da28b8a351d773b68d5825ac39017e680750f980f3a1a85cd8dd28a47c1" checksum = "8ecb6da28b8a351d773b68d5825ac39017e680750f980f3a1a85cd8dd28a47c1"
[[package]]
name = "utf8parse"
version = "0.2.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "711b9620af191e0cdc7468a8d14e709c3dcdb115b36f838e601583af800a370a"
[[package]] [[package]]
name = "vcpkg" name = "vcpkg"
version = "0.2.15" version = "0.2.15"
@ -1120,6 +1076,37 @@ dependencies = [
"rustix", "rustix",
] ]
[[package]]
name = "winapi"
version = "0.3.9"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "5c839a674fcd7a98952e593242ea400abe93992746761e38641405d28b00f419"
dependencies = [
"winapi-i686-pc-windows-gnu",
"winapi-x86_64-pc-windows-gnu",
]
[[package]]
name = "winapi-i686-pc-windows-gnu"
version = "0.4.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "ac3b87c63620426dd9b991e5ce0329eff545bccbbb34f3be09ff6fb6ab51b7b6"
[[package]]
name = "winapi-util"
version = "0.1.6"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "f29e6f9198ba0d26b4c9f07dbe6f9ed633e1f3d5b8b414090084349e46a52596"
dependencies = [
"winapi",
]
[[package]]
name = "winapi-x86_64-pc-windows-gnu"
version = "0.4.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "712e227841d057c1ee1cd2fb22fa7e5a5461ae8e48fa2ca79ec42cfc1931183f"
[[package]] [[package]]
name = "windows-sys" name = "windows-sys"
version = "0.48.0" version = "0.48.0"

@ -13,7 +13,7 @@ prost-build = "0.12.3"
backtrace-on-stack-overflow = "0.3.0" backtrace-on-stack-overflow = "0.3.0"
bytes = "1.5.0" bytes = "1.5.0"
dotenv = "0.15.0" dotenv = "0.15.0"
env_logger = "0.11.0" env_logger = "0.10.2"
log = "0.4.20" log = "0.4.20"
openssl = "0.10.63" openssl = "0.10.63"
prost = "0.12.3" prost = "0.12.3"

@ -353,7 +353,7 @@ struct BackendMessage {
} }
impl BackendMessage { impl BackendMessage {
fn from_bytes(bytes: Vec<u8>) -> Option<BackendMessage> { fn from_bytes(bytes: &Vec<u8>) -> Option<BackendMessage> {
if bytes.len() < 5 { if bytes.len() < 5 {
return None; return None;
} }
@ -413,7 +413,7 @@ impl BackendPacket {
}; };
} }
fn from_bytes(bytes: Vec<u8>) -> Option<BackendPacket> { fn from_bytes(bytes: &Vec<u8>) -> Option<BackendPacket> {
if bytes.len() < BACKEND_PACKET_HEADER_SIZE { if bytes.len() < BACKEND_PACKET_HEADER_SIZE {
return None; return None;
} }
@ -476,7 +476,7 @@ impl ConnectMsg {
}; };
} }
fn from_bytes(bytes: Vec<u8>) -> Option<ConnectMsg> { fn from_bytes(bytes: &Vec<u8>) -> Option<ConnectMsg> {
if bytes.len() < CONNECT_MSG_LEN { if bytes.len() < CONNECT_MSG_LEN {
return None; return None;
} }
@ -518,14 +518,14 @@ struct NoticeMsg {
} }
impl NoticeMsg { impl NoticeMsg {
fn from_bytes(bytes: Vec<u8>) -> Option<NoticeMsg> { fn from_bytes(bytes: &Vec<u8>) -> Option<NoticeMsg> {
if bytes.len() < 8 { if bytes.len() < 8 {
return None; return None;
} }
let notice_id = u64::from_le_bytes(bytes[0..8].try_into().unwrap()); let notice_id = u64::from_le_bytes(bytes[0..8].try_into().unwrap());
match BackendMessage::from_bytes(bytes[8..].to_vec()) { match BackendMessage::from_bytes(&(bytes[8..].to_vec())) {
Some(message) => { Some(message) => {
match message.data.notice { match message.data.notice {
Some(notice) => Some(NoticeMsg{ Some(notice) => Some(NoticeMsg{
@ -554,7 +554,6 @@ struct TMClient {
time_offset: f64, time_offset: f64,
} }
const TCP_BUFFER_SIZE: usize = 10000;
impl TMClient { impl TMClient {
fn new(uuid: [u8; 16], client_name: [u8; 32], host: String, password: String, username: [u8; 16]) -> (TMClient, TMConnection) { fn new(uuid: [u8; 16], client_name: [u8; 32], host: String, password: String, username: [u8; 16]) -> (TMClient, TMConnection) {
let (work_tx, work_rx) = mpsc::channel(); let (work_tx, work_rx) = mpsc::channel();
@ -616,82 +615,117 @@ impl TMClient {
} }
} }
let mut incoming = [0; TCP_BUFFER_SIZE]; let mut header = [0; BACKEND_PACKET_HEADER_SIZE];
match self.stream.read(&mut incoming) { let mut read_bytes = 0;
Ok(read) => { let data_size: usize;
let data = incoming[0..read].to_vec(); match self.stream.read_exact(&mut header) {
match BackendPacket::from_bytes(data) { Ok(()) => {
match BackendPacket::from_bytes(&header.to_vec()) {
Some(packet) => { Some(packet) => {
let offset = packet.timestamp - get_float_time(); data_size = packet.size as usize;
self.time_offset = offset.clone(); read_bytes += BACKEND_PACKET_HEADER_SIZE;
self.last_seq_num = packet.seq_num; log::debug!("Received {} bytes ({}/{})", read_bytes, read_bytes, BACKEND_PACKET_HEADER_SIZE + data_size);
match packet.msg_type { },
// Notice Message None => {
4 => { log::error!("Failed to parse BackendPacket header: {:?}", header);
match NoticeMsg::from_bytes(packet.data.clone()) { return;
Some(notice) => { },
log::debug!("Received notice: {:#?}", notice); }
let ack = BackendPacket::new(packet.header, packet.timestamp, 5, self.last_seq_num+1, notice.notice_id.to_le_bytes().to_vec()); },
self.last_seq_num += 1; Err(ref error) if error.kind() == std::io::ErrorKind::WouldBlock => {
match self.stream.write(&ack.as_bytes()) { log::debug!("Resource temporarily unavailable, retrying later.");
Ok(_) => log::debug!("Sent ACK for notice {}", notice.notice_id), return;
Err(error) => log::error!("ACK error: {:?}", error), },
} Err(error) => {
match self.work_queue.send(Work::Notice(Box::new(notice.notice))) { log::error!("Error reading header: {:?}", error);
Ok(_) => log::debug!("Forwarded notice to callback engine"), return;
Err(error) => log::error!("Notice forward error {:?}", error), },
} }
},
None => log::error!("Notice parse error: {:?}", packet), let mut data = vec![0; BACKEND_PACKET_HEADER_SIZE + data_size];
data[..header.len()].copy_from_slice(&header);
while read_bytes < BACKEND_PACKET_HEADER_SIZE + data_size {
match self.stream.read(&mut data[read_bytes..]) {
Ok(read) => {
read_bytes += read;
log::debug!("Received {} bytes ({}/{})", read, read_bytes, BACKEND_PACKET_HEADER_SIZE + data_size);
},
Err(error) => {
log::error!("Error reading body data: {:?}", error);
},
}
}
match BackendPacket::from_bytes(&data) {
Some(packet) => {
let offset = packet.timestamp - get_float_time();
self.time_offset = offset.clone();
self.last_seq_num = packet.seq_num;
match packet.msg_type {
// Notice Message
4 => {
match NoticeMsg::from_bytes(&packet.data) {
Some(notice) => {
log::debug!("Received notice: {:#?}", notice);
let ack = BackendPacket::new(packet.header, packet.timestamp, 5, self.last_seq_num+1, notice.notice_id.to_le_bytes().to_vec());
self.last_seq_num += 1;
match self.stream.write(&ack.as_bytes()) {
Ok(_) => log::debug!("Sent ACK for notice {}", notice.notice_id),
Err(error) => log::error!("ACK error: {:?}", error),
}
match self.work_queue.send(Work::Notice(Box::new(notice.notice))) {
Ok(_) => log::debug!("Forwarded notice to callback engine"),
Err(error) => log::error!("Notice forward error {:?}", error),
} }
}, },
// Response message None => log::error!("Notice parse error: {:?}", packet),
3 => { }
match BackendMessage::from_bytes(packet.data.clone()) { },
Some(message) => { // Response message
log::debug!("Received response: {:#?}", message); 3 => {
match BackendMessage::from_bytes(&packet.data) {
match self.responses.send((Box::new(message), offset)) { Some(message) => {
Ok(_) => log::debug!("Forwarded response to callback engine"), log::debug!("Received response: {:#?}", message);
Err(error) => log::error!("Response forward error {:?}", error),
} match self.responses.send((Box::new(message), offset)) {
}, Ok(_) => log::debug!("Forwarded response to callback engine"),
None => log::error!("BackendMessage parse error: {:?}", packet), Err(error) => log::error!("Response forward error {:?}", error),
} }
}, },
// Server Message None => log::error!("BackendMessage parse error: {:?}", packet),
2 => { }
match ConnectMsg::from_bytes(packet.data) { },
Some(welcome_msg) => { // Server Message
log::debug!("Received connect message: {:#?}", welcome_msg); 2 => {
if welcome_msg.pw_valid == 0 { match ConnectMsg::from_bytes(&packet.data) {
let connect_response = ConnectMsg::from_welcome(welcome_msg, &self.password, self.uuid, self.client_name, self.username); Some(welcome_msg) => {
let response = BackendPacket::new(packet.header, packet.timestamp, packet.msg_type, self.last_seq_num+1, connect_response.as_bytes()); log::debug!("Received connect message: {:#?}", welcome_msg);
match self.stream.write(&response.as_bytes()) { if welcome_msg.pw_valid == 0 {
Err(error) => log::error!("Send error: {:?}", error), let connect_response = ConnectMsg::from_welcome(welcome_msg, &self.password, self.uuid, self.client_name, self.username);
Ok(_) => self.last_seq_num += 1, let response = BackendPacket::new(packet.header, packet.timestamp, packet.msg_type, self.last_seq_num+1, connect_response.as_bytes());
} match self.stream.write(&response.as_bytes()) {
} else if welcome_msg.state_valid == 0 { Err(error) => log::error!("Send error: {:?}", error),
log::error!("pw_valid but not state_valid"); Ok(_) => self.last_seq_num += 1,
} else { }
self.connected = true; } else if welcome_msg.state_valid == 0 {
log::info!("Connected to TM backend!"); log::error!("pw_valid but not state_valid");
} } else {
}, self.connected = true;
None => log::error!("Failed to parse welcome msg"), log::info!("Connected to TM backend!");
} }
}, },
_ => log::warn!("Unhandled message type: {}", packet.msg_type), None => log::error!("Failed to parse welcome msg"),
} }
}, },
None => { _ => log::warn!("Unhandled message type: {}", packet.msg_type),
log::error!("Failed to parse BackendPacket({}): {}", read, String::from_utf8_lossy(&incoming));
// Sleep to prevent busy loop when TM is spamming 0 length packets
thread::sleep(Duration::from_millis(100));
}
} }
}, },
Err(_) => {}, None => {
log::error!("Failed to parse BackendPacket({}): {}", read_bytes, String::from_utf8_lossy(&data));
// Sleep to prevent busy loop when TM is spamming 0 length packets
thread::sleep(Duration::from_millis(100));
}
} }
} }