From 3ebd80fedbd94aa210a276df3f56dcc690cb310a Mon Sep 17 00:00:00 2001 From: Noah Metz Date: Sat, 20 Jan 2024 17:12:16 -0700 Subject: [PATCH] Added structures and methods for connect messages --- src/main.rs | 137 +++++++++++++++++++++++++++++++++++++++++++++++----- 1 file changed, 126 insertions(+), 11 deletions(-) diff --git a/src/main.rs b/src/main.rs index 592e265..4fdc7b5 100644 --- a/src/main.rs +++ b/src/main.rs @@ -187,11 +187,112 @@ impl BackendMessage { return None; } - None + return Some(BackendMessage{ + status: bytes[0], + request_id: u32::from_le_bytes(bytes[0..4].try_into().unwrap()), + data: bytes[5..].to_vec(), + }); } fn as_bytes(self: BackendMessage) -> Vec { - return Vec::new(); + let mut bytes = Vec::new(); + + bytes.push(self.status); + bytes.extend(self.request_id.to_le_bytes()); + bytes.extend(self.data); + + return bytes; + } +} + +const BACKEND_PACKET_HEADER_SIZE: usize = 28; +#[derive(Debug)] +struct BackendPacket { + header: u32, + timestamp: f64, + msg_type: u32, + seq_num: u64, + size: u32, + data: Vec, +} + +impl BackendPacket { + fn from_bytes(bytes: Vec) -> Option { + if bytes.len() < BACKEND_PACKET_HEADER_SIZE { + return None; + } + + return Some(BackendPacket{ + header: u32::from_le_bytes(bytes[0..4].try_into().unwrap()), + timestamp: f64::from_le_bytes(bytes[4..12].try_into().unwrap()), + msg_type: u32::from_le_bytes(bytes[12..16].try_into().unwrap()), + seq_num: u64::from_le_bytes(bytes[16..24].try_into().unwrap()), + size: u32::from_le_bytes(bytes[24..28].try_into().unwrap()), + data: bytes[28..].to_vec(), + }); + } + + fn to_bytes(self: BackendPacket) -> Vec { + let mut bytes = Vec::new(); + + bytes.extend(self.header.to_le_bytes()); + bytes.extend(self.timestamp.to_le_bytes()); + bytes.extend(self.msg_type.to_le_bytes()); + bytes.extend(self.seq_num.to_le_bytes()); + bytes.extend(self.size.to_le_bytes()); + bytes.extend(self.data); + + return bytes; + } +} + +const CONNECT_MSG_LEN: usize = 114; +#[derive(Debug)] +struct ConnectMsg { + version: u32, + uuid: [u8; 16], + last_notice_id: u64, + username: [u8; 16], + pass_hash: [u8; 32], + pw_valid: u8, + state_valid: u8, + client_name: [u8; 32], + server_time_zone: i32, +} + +impl ConnectMsg { + fn from_bytes(bytes: Vec) -> Option { + if bytes.len() < CONNECT_MSG_LEN { + return None; + } + + return Some(ConnectMsg{ + version: u32::from_le_bytes(bytes[0..4].try_into().unwrap()), + uuid: bytes[4..20].to_owned().try_into().unwrap(), + last_notice_id: u64::from_le_bytes(bytes[20..28].try_into().unwrap()), + username: bytes[28..44].to_owned().try_into().unwrap(), + pass_hash: bytes[44..76].to_owned().try_into().unwrap(), + pw_valid: bytes[77].to_owned(), + state_valid: bytes[78].to_owned(), + client_name: bytes[78..110].to_owned().try_into().unwrap(), + server_time_zone: i32::from_le_bytes(bytes[110..114].try_into().unwrap()), + }); + } + + fn to_bytes(self: ConnectMsg) -> Vec { + let mut bytes = Vec::new(); + + bytes.extend(self.version.to_le_bytes()); + bytes.extend(self.uuid); + bytes.extend(self.last_notice_id.to_le_bytes()); + bytes.extend(self.username); + bytes.extend(self.pass_hash); + bytes.extend(self.pw_valid.to_le_bytes()); + bytes.extend(self.state_valid.to_le_bytes()); + bytes.extend(self.client_name); + bytes.extend(self.server_time_zone.to_le_bytes()); + + return bytes; } } @@ -215,28 +316,42 @@ impl TMClient { stream_config.set_use_server_name_indication(false); let stream = stream_config.connect("127.0.0.1", stream).unwrap(); - let stream = Arc::new(Mutex::new(stream)); + let stream_arc = Arc::new(Mutex::new(stream)); return (TMClient{ - stream: stream.clone(), + stream: stream_arc.clone(), notices: notice_tx, responses: response_tx }, TMConnection{ - stream: stream, + stream: stream_arc, notices: notice_rx, responses: response_rx },); } fn process(self: &TMClient) { + let mut incoming = [0; 2048]; let mut stream = self.stream.lock().unwrap(); match stream.read(&mut incoming) { Ok(read) => { - println!("Data({}): {}", read, String::from_utf8_lossy(&incoming)); - // If the data is a notice, parse it and send it to the notices mpsc - // If the data is a response, parse it and send it to the responses mpsc + let data = incoming[0..read].to_vec(); + match BackendPacket::from_bytes(data) { + Some(packet) => { + println!("Packet: {:?}", packet); + match packet.msg_type { + 2 => { + match ConnectMsg::from_bytes(packet.data) { + Some(welcome_msg) => println!("Welcome msg: {:?}", welcome_msg), + None => println!("Failed to parse wlecome msg"), + } + }, + _ => println!("Unhandled message type: {}", packet.msg_type), + } + }, + None => println!("Failed to parse BackendPacket"), + } }, Err(error) => println!("Error: {}", error), } @@ -391,13 +506,12 @@ fn main() { client.publish("bridge/status", QoS::AtLeastOnce, true, "{\"online\": true}").unwrap(); let mqtt_recv_thread = thread::spawn(move || - for message in connection.iter() { - println!("Message = {:?}", message); + for _ in connection.iter() { } ); let running = true; - let (tm_client, tm_connection) = TMClient::new(); + let (tm_client, tm_connection) = TMClient::new(); let tm_thread = thread::spawn(move || while running { tm_client.process(); @@ -405,6 +519,7 @@ fn main() { ); while running { + thread::sleep(Duration::from_millis(1000)); match tm_connection.notices.recv() { Ok(notice) => { let callback = callbacks.get(¬ice.id());