From 5da2fb29c6204292b7460132e84ce6ba9cb83912 Mon Sep 17 00:00:00 2001 From: Noah Metz Date: Sun, 21 Jan 2024 00:19:44 -0700 Subject: [PATCH] Fixed request sending --- Cargo.lock | 35 ++++++++++++++++++++++++++ Cargo.toml | 2 ++ src/main.rs | 71 +++++++++++++++++++++++++++++++++++++++-------------- 3 files changed, 89 insertions(+), 19 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index fcee756..ffa5833 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -101,6 +101,17 @@ dependencies = [ "rustc-demangle", ] +[[package]] +name = "backtrace-on-stack-overflow" +version = "0.3.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7fd2d70527f3737a1ad17355e260706c1badebabd1fa06a7a053407380df841b" +dependencies = [ + "backtrace", + "libc", + "nix", +] + [[package]] name = "base64" version = "0.21.7" @@ -427,6 +438,15 @@ version = "2.7.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "523dc4f511e55ab87b694dc30d0f820d60906ef06413f93d4d7a1385599cc149" +[[package]] +name = "memoffset" +version = "0.6.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5aa361d4faea93603064a027415f07bd8e1d5c88c9fbf68bf56a285428fd79ce" +dependencies = [ + "autocfg", +] + [[package]] name = "miniz_oxide" version = "0.7.1" @@ -453,6 +473,19 @@ version = "0.8.3" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "e5ce46fe64a9d73be07dcbe690a38ce1b293be448fd8ce1e6c1b8062c9f72c6a" +[[package]] +name = "nix" +version = "0.23.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8f3790c00a0150112de0f4cd161e3d7fc4b2d8a5542ffc35f099a2562aecb35c" +dependencies = [ + "bitflags 1.3.2", + "cc", + "cfg-if", + "libc", + "memoffset", +] + [[package]] name = "object" version = "0.32.2" @@ -1045,8 +1078,10 @@ checksum = "49874b5167b65d7193b8aba1567f5c7d93d001cafc34600cee003eda787e483f" name = "vex_mqtt_rust" version = "0.1.0" dependencies = [ + "backtrace-on-stack-overflow", "bytes", "env_logger", + "log", "openssl", "prost", "prost-build", diff --git a/Cargo.toml b/Cargo.toml index f69cebf..0990de7 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -10,8 +10,10 @@ protoc = "2.28.0" prost-build = "0.12.3" [dependencies] +backtrace-on-stack-overflow = "0.3.0" bytes = "1.5.0" env_logger = "0.11.0" +log = "0.4.20" openssl = "0.10.63" prost = "0.12.3" prost-types = "0.12.3" diff --git a/src/main.rs b/src/main.rs index 305b597..92ba1e0 100644 --- a/src/main.rs +++ b/src/main.rs @@ -7,6 +7,7 @@ use prost::Message; use std::io::Cursor; use serde::{Serialize, Deserialize}; use rand_core::RngCore; +use std::time::{SystemTime, UNIX_EPOCH}; use std::io::prelude::*; @@ -187,7 +188,7 @@ impl BackendMessage { } - fn as_bytes(self: BackendMessage) -> Vec { + fn as_bytes(self: &BackendMessage) -> Vec { let mut bytes = Vec::new(); bytes.push(self.status); @@ -358,20 +359,21 @@ impl NoticeMsg { struct TMClient { stream: openssl::ssl::SslStream, - notices: mpsc::Sender, - responses: mpsc::Sender, - requests: mpsc::Receiver, + notices: mpsc::Sender>, + responses: mpsc::Sender>, + requests: mpsc::Receiver>, uuid: [u8; 16], client_name: [u8; 32], password: String, last_seq_num: u64, username: [u8; 16], + connected: bool, } struct TMConnection { - notices: mpsc::Receiver, - responses: mpsc::Receiver, - requests: mpsc::Sender, + notices: mpsc::Receiver>, + responses: mpsc::Receiver>, + requests: mpsc::Sender>, } impl TMClient { @@ -407,6 +409,7 @@ impl TMClient { password, last_seq_num: 0xFFFFFFFFFFFFFFFF, username, + connected: false, }, TMConnection{ requests: request_tx, @@ -416,33 +419,42 @@ impl TMClient { } fn process(self: &mut TMClient) { - for request in self.requests.try_iter() { - let packet = BackendPacket::new(TM_HEADER, 0.00, 3, self.last_seq_num + 1, request.as_bytes()); - match self.stream.write(&packet.as_bytes()) { - Ok(_) => self.last_seq_num += 1, - Err(error) => println!("Request send error: {:?}", error), + if self.connected == true { + for request in self.requests.try_iter() { + let time = SystemTime::now(); + let millis = time.duration_since(UNIX_EPOCH).unwrap(); + let packet = BackendPacket::new(TM_HEADER, (millis.as_millis() as f64)/1000.0, 2, self.last_seq_num + 1, request.as_bytes()); + match self.stream.write(&packet.as_bytes()) { + Ok(_) => { + log::debug!("Sent: {:?}", packet); + self.last_seq_num += 1; + }, + Err(error) => println!("Request send error: {:?}", error), + } } } + let mut incoming = [0; 2048]; match self.stream.read(&mut incoming) { Ok(read) => { let data = incoming[0..read].to_vec(); match BackendPacket::from_bytes(data) { Some(packet) => { + log::debug!("Recevied: {:?}", packet); self.last_seq_num = packet.seq_num; match packet.msg_type { // Notice Message 4 => { match NoticeMsg::from_bytes(packet.data.clone()) { Some(notice) => { - println!("Received notice: {:?}", notice); + log::debug!("Received notice: {:?}", notice); let ack = BackendPacket::new(packet.header, packet.timestamp, 5, self.last_seq_num+1, notice.notice_id.to_le_bytes().to_vec()); self.last_seq_num += 1; match self.stream.write(&ack.as_bytes()) { Ok(_) => println!("Sent ACK for notice {}", notice.notice_id), Err(error) => println!("ACK error: {:?}", error), } - match self.notices.send(notice.notice) { + match self.notices.send(Box::new(notice.notice)) { Ok(_) => println!("Forwarded notice to callback engine"), Err(error) => println!("Notice forward error {:?}", error), } @@ -454,9 +466,14 @@ impl TMClient { 3 => { match BackendMessage::from_bytes(packet.data.clone()) { Some(message) => { - match self.responses.send(message) { - Ok(_) => println!("Forwarded response to callback engine"), - Err(error) => println!("Response forward error {:?}", error), + println!("About to send"); + match self.responses.send(Box::new(message)) { + Ok(_) => { + println!("Forwarded response to callback engine"); + } + Err(error) => { + println!("Response forward error {:?}", error) + } } }, None => println!("BackendMessage parse error: {:?}", packet), @@ -476,6 +493,7 @@ impl TMClient { } else if welcome_msg.state_valid == 0 { println!("pw_valid but not state_valid"); } else { + self.connected = true; println!("Connected to TM backend!"); } }, @@ -487,6 +505,7 @@ impl TMClient { }, None => { println!("Failed to parse BackendPacket({}): {}", read, String::from_utf8_lossy(&incoming)); + // Sleep to prevent busy loop when TM is spamming 0 length packets thread::sleep(Duration::from_millis(100)); } } @@ -654,7 +673,21 @@ fn main() { } ); - // TODO: send "get_schedule" message and wait for it's response to know we're connected + let mut get_match_list_tuple = tm::MatchTuple::default(); + get_match_list_tuple.division = Some(1); + get_match_list_tuple.round = None; + get_match_list_tuple.instance = Some(0); + get_match_list_tuple.r#match = Some(0); + get_match_list_tuple.session = Some(0); + + let mut get_match_list_data = tm::BackendMessageData::default(); + get_match_list_data.match_tuple = Some(get_match_list_tuple); + + let get_match_list_req = BackendMessage::new(1002, get_match_list_data.clone()); + tm_connection.requests.send(Box::new(get_match_list_req)).unwrap(); + + let get_match_list_resp = tm_connection.responses.recv().unwrap(); + println!("Get Match List Response: {:?}", get_match_list_resp); while running { thread::sleep(Duration::from_millis(1000)); @@ -669,7 +702,7 @@ fn main() { } }, Some(callback) => { - let (messages, next_event) = callback(notice, event); + let (messages, next_event) = callback(*notice, event); event = next_event; for message in messages { let result = client.publish(message.topic, QoS::AtMostOnce, true, message.payload);