Fixed request sending

master
noah metz 2024-01-21 00:19:44 -07:00
parent 9ff15e3864
commit 5da2fb29c6
3 changed files with 89 additions and 19 deletions

35
Cargo.lock generated

@ -101,6 +101,17 @@ dependencies = [
"rustc-demangle", "rustc-demangle",
] ]
[[package]]
name = "backtrace-on-stack-overflow"
version = "0.3.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "7fd2d70527f3737a1ad17355e260706c1badebabd1fa06a7a053407380df841b"
dependencies = [
"backtrace",
"libc",
"nix",
]
[[package]] [[package]]
name = "base64" name = "base64"
version = "0.21.7" version = "0.21.7"
@ -427,6 +438,15 @@ version = "2.7.1"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "523dc4f511e55ab87b694dc30d0f820d60906ef06413f93d4d7a1385599cc149" checksum = "523dc4f511e55ab87b694dc30d0f820d60906ef06413f93d4d7a1385599cc149"
[[package]]
name = "memoffset"
version = "0.6.5"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "5aa361d4faea93603064a027415f07bd8e1d5c88c9fbf68bf56a285428fd79ce"
dependencies = [
"autocfg",
]
[[package]] [[package]]
name = "miniz_oxide" name = "miniz_oxide"
version = "0.7.1" version = "0.7.1"
@ -453,6 +473,19 @@ version = "0.8.3"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "e5ce46fe64a9d73be07dcbe690a38ce1b293be448fd8ce1e6c1b8062c9f72c6a" checksum = "e5ce46fe64a9d73be07dcbe690a38ce1b293be448fd8ce1e6c1b8062c9f72c6a"
[[package]]
name = "nix"
version = "0.23.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "8f3790c00a0150112de0f4cd161e3d7fc4b2d8a5542ffc35f099a2562aecb35c"
dependencies = [
"bitflags 1.3.2",
"cc",
"cfg-if",
"libc",
"memoffset",
]
[[package]] [[package]]
name = "object" name = "object"
version = "0.32.2" version = "0.32.2"
@ -1045,8 +1078,10 @@ checksum = "49874b5167b65d7193b8aba1567f5c7d93d001cafc34600cee003eda787e483f"
name = "vex_mqtt_rust" name = "vex_mqtt_rust"
version = "0.1.0" version = "0.1.0"
dependencies = [ dependencies = [
"backtrace-on-stack-overflow",
"bytes", "bytes",
"env_logger", "env_logger",
"log",
"openssl", "openssl",
"prost", "prost",
"prost-build", "prost-build",

@ -10,8 +10,10 @@ protoc = "2.28.0"
prost-build = "0.12.3" prost-build = "0.12.3"
[dependencies] [dependencies]
backtrace-on-stack-overflow = "0.3.0"
bytes = "1.5.0" bytes = "1.5.0"
env_logger = "0.11.0" env_logger = "0.11.0"
log = "0.4.20"
openssl = "0.10.63" openssl = "0.10.63"
prost = "0.12.3" prost = "0.12.3"
prost-types = "0.12.3" prost-types = "0.12.3"

@ -7,6 +7,7 @@ use prost::Message;
use std::io::Cursor; use std::io::Cursor;
use serde::{Serialize, Deserialize}; use serde::{Serialize, Deserialize};
use rand_core::RngCore; use rand_core::RngCore;
use std::time::{SystemTime, UNIX_EPOCH};
use std::io::prelude::*; use std::io::prelude::*;
@ -187,7 +188,7 @@ impl BackendMessage {
} }
fn as_bytes(self: BackendMessage) -> Vec<u8> { fn as_bytes(self: &BackendMessage) -> Vec<u8> {
let mut bytes = Vec::new(); let mut bytes = Vec::new();
bytes.push(self.status); bytes.push(self.status);
@ -358,20 +359,21 @@ impl NoticeMsg {
struct TMClient { struct TMClient {
stream: openssl::ssl::SslStream<TcpStream>, stream: openssl::ssl::SslStream<TcpStream>,
notices: mpsc::Sender<tm::Notice>, notices: mpsc::Sender<Box<tm::Notice>>,
responses: mpsc::Sender<BackendMessage>, responses: mpsc::Sender<Box<BackendMessage>>,
requests: mpsc::Receiver<BackendMessage>, requests: mpsc::Receiver<Box<BackendMessage>>,
uuid: [u8; 16], uuid: [u8; 16],
client_name: [u8; 32], client_name: [u8; 32],
password: String, password: String,
last_seq_num: u64, last_seq_num: u64,
username: [u8; 16], username: [u8; 16],
connected: bool,
} }
struct TMConnection { struct TMConnection {
notices: mpsc::Receiver<tm::Notice>, notices: mpsc::Receiver<Box<tm::Notice>>,
responses: mpsc::Receiver<BackendMessage>, responses: mpsc::Receiver<Box<BackendMessage>>,
requests: mpsc::Sender<BackendMessage>, requests: mpsc::Sender<Box<BackendMessage>>,
} }
impl TMClient { impl TMClient {
@ -407,6 +409,7 @@ impl TMClient {
password, password,
last_seq_num: 0xFFFFFFFFFFFFFFFF, last_seq_num: 0xFFFFFFFFFFFFFFFF,
username, username,
connected: false,
}, },
TMConnection{ TMConnection{
requests: request_tx, requests: request_tx,
@ -416,33 +419,42 @@ impl TMClient {
} }
fn process(self: &mut TMClient) { fn process(self: &mut TMClient) {
if self.connected == true {
for request in self.requests.try_iter() { for request in self.requests.try_iter() {
let packet = BackendPacket::new(TM_HEADER, 0.00, 3, self.last_seq_num + 1, request.as_bytes()); let time = SystemTime::now();
let millis = time.duration_since(UNIX_EPOCH).unwrap();
let packet = BackendPacket::new(TM_HEADER, (millis.as_millis() as f64)/1000.0, 2, self.last_seq_num + 1, request.as_bytes());
match self.stream.write(&packet.as_bytes()) { match self.stream.write(&packet.as_bytes()) {
Ok(_) => self.last_seq_num += 1, Ok(_) => {
log::debug!("Sent: {:?}", packet);
self.last_seq_num += 1;
},
Err(error) => println!("Request send error: {:?}", error), Err(error) => println!("Request send error: {:?}", error),
} }
} }
}
let mut incoming = [0; 2048]; let mut incoming = [0; 2048];
match self.stream.read(&mut incoming) { match self.stream.read(&mut incoming) {
Ok(read) => { Ok(read) => {
let data = incoming[0..read].to_vec(); let data = incoming[0..read].to_vec();
match BackendPacket::from_bytes(data) { match BackendPacket::from_bytes(data) {
Some(packet) => { Some(packet) => {
log::debug!("Recevied: {:?}", packet);
self.last_seq_num = packet.seq_num; self.last_seq_num = packet.seq_num;
match packet.msg_type { match packet.msg_type {
// Notice Message // Notice Message
4 => { 4 => {
match NoticeMsg::from_bytes(packet.data.clone()) { match NoticeMsg::from_bytes(packet.data.clone()) {
Some(notice) => { Some(notice) => {
println!("Received notice: {:?}", notice); log::debug!("Received notice: {:?}", notice);
let ack = BackendPacket::new(packet.header, packet.timestamp, 5, self.last_seq_num+1, notice.notice_id.to_le_bytes().to_vec()); let ack = BackendPacket::new(packet.header, packet.timestamp, 5, self.last_seq_num+1, notice.notice_id.to_le_bytes().to_vec());
self.last_seq_num += 1; self.last_seq_num += 1;
match self.stream.write(&ack.as_bytes()) { match self.stream.write(&ack.as_bytes()) {
Ok(_) => println!("Sent ACK for notice {}", notice.notice_id), Ok(_) => println!("Sent ACK for notice {}", notice.notice_id),
Err(error) => println!("ACK error: {:?}", error), Err(error) => println!("ACK error: {:?}", error),
} }
match self.notices.send(notice.notice) { match self.notices.send(Box::new(notice.notice)) {
Ok(_) => println!("Forwarded notice to callback engine"), Ok(_) => println!("Forwarded notice to callback engine"),
Err(error) => println!("Notice forward error {:?}", error), Err(error) => println!("Notice forward error {:?}", error),
} }
@ -454,9 +466,14 @@ impl TMClient {
3 => { 3 => {
match BackendMessage::from_bytes(packet.data.clone()) { match BackendMessage::from_bytes(packet.data.clone()) {
Some(message) => { Some(message) => {
match self.responses.send(message) { println!("About to send");
Ok(_) => println!("Forwarded response to callback engine"), match self.responses.send(Box::new(message)) {
Err(error) => println!("Response forward error {:?}", error), Ok(_) => {
println!("Forwarded response to callback engine");
}
Err(error) => {
println!("Response forward error {:?}", error)
}
} }
}, },
None => println!("BackendMessage parse error: {:?}", packet), None => println!("BackendMessage parse error: {:?}", packet),
@ -476,6 +493,7 @@ impl TMClient {
} else if welcome_msg.state_valid == 0 { } else if welcome_msg.state_valid == 0 {
println!("pw_valid but not state_valid"); println!("pw_valid but not state_valid");
} else { } else {
self.connected = true;
println!("Connected to TM backend!"); println!("Connected to TM backend!");
} }
}, },
@ -487,6 +505,7 @@ impl TMClient {
}, },
None => { None => {
println!("Failed to parse BackendPacket({}): {}", read, String::from_utf8_lossy(&incoming)); println!("Failed to parse BackendPacket({}): {}", read, String::from_utf8_lossy(&incoming));
// Sleep to prevent busy loop when TM is spamming 0 length packets
thread::sleep(Duration::from_millis(100)); thread::sleep(Duration::from_millis(100));
} }
} }
@ -654,7 +673,21 @@ fn main() {
} }
); );
// TODO: send "get_schedule" message and wait for it's response to know we're connected let mut get_match_list_tuple = tm::MatchTuple::default();
get_match_list_tuple.division = Some(1);
get_match_list_tuple.round = None;
get_match_list_tuple.instance = Some(0);
get_match_list_tuple.r#match = Some(0);
get_match_list_tuple.session = Some(0);
let mut get_match_list_data = tm::BackendMessageData::default();
get_match_list_data.match_tuple = Some(get_match_list_tuple);
let get_match_list_req = BackendMessage::new(1002, get_match_list_data.clone());
tm_connection.requests.send(Box::new(get_match_list_req)).unwrap();
let get_match_list_resp = tm_connection.responses.recv().unwrap();
println!("Get Match List Response: {:?}", get_match_list_resp);
while running { while running {
thread::sleep(Duration::from_millis(1000)); thread::sleep(Duration::from_millis(1000));
@ -669,7 +702,7 @@ fn main() {
} }
}, },
Some(callback) => { Some(callback) => {
let (messages, next_event) = callback(notice, event); let (messages, next_event) = callback(*notice, event);
event = next_event; event = next_event;
for message in messages { for message in messages {
let result = client.publish(message.topic, QoS::AtMostOnce, true, message.payload); let result = client.publish(message.topic, QoS::AtMostOnce, true, message.payload);