Compare commits

...

10 Commits

Author SHA1 Message Date
noah metz 6bcc750e0a Add time offset publish 2024-01-27 16:32:43 -07:00
Noah Metz b7c09dcbd5
Merge pull request #2 from mekkanized/liconway/fix_oversized_messages
Fix oversized messages
2024-01-25 23:43:11 -07:00
Liam Conway 2496c2a22a
Fix bug with missing bytes 2024-01-25 22:36:17 -07:00
Liam Conway 1e435c419e
Fix env_logger anstream feature on Darwin 2024-01-25 22:22:54 -07:00
Liam Conway e08037619b
Fix messages of size >16384 breaking 2024-01-25 22:22:03 -07:00
noah metz 5089684c70 Lowered read timeout on ssl to 10ms 2024-01-22 21:26:41 -07:00
Noah Metz 43f270ab28
Merge pull request #1 from mekkanized/liconway/dotenv_config
Add basic config for TM/MQTT host
2024-01-22 20:12:15 -07:00
Liam Conway 5ada41f454
Add basic config for TM/MQTT host 2024-01-22 19:36:47 -07:00
noah metz e707420e3c Got match restart/pause working gracefully 2024-01-22 14:13:09 -07:00
noah metz 9c7a6aaa01 Moved offset calulation to happen every incoming packet 2024-01-22 13:50:37 -07:00
5 changed files with 292 additions and 175 deletions

@ -0,0 +1,3 @@
TM_HOST=127.0.0.1
TM_PASSWORD=
MQTT_HOST=localhost

1
.gitignore vendored

@ -1 +1,2 @@
/target /target
.env

168
Cargo.lock generated

@ -26,54 +26,6 @@ dependencies = [
"memchr", "memchr",
] ]
[[package]]
name = "anstream"
version = "0.6.11"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "6e2e1ebcb11de5c03c67de28a7df593d32191b44939c482e97702baaaa6ab6a5"
dependencies = [
"anstyle",
"anstyle-parse",
"anstyle-query",
"anstyle-wincon",
"colorchoice",
"utf8parse",
]
[[package]]
name = "anstyle"
version = "1.0.4"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "7079075b41f533b8c61d2a4d073c4676e1f8b249ff94a393b0595db304e0dd87"
[[package]]
name = "anstyle-parse"
version = "0.2.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "c75ac65da39e5fe5ab759307499ddad880d724eed2f6ce5b5e8a26f4f387928c"
dependencies = [
"utf8parse",
]
[[package]]
name = "anstyle-query"
version = "1.0.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "e28923312444cdd728e4738b3f9c9cac739500909bb3d3c94b43551b16517648"
dependencies = [
"windows-sys 0.52.0",
]
[[package]]
name = "anstyle-wincon"
version = "3.0.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "1cd54b81ec8d6180e24654d0b371ad22fc3dd083b6ff8ba325b72e00c87660a7"
dependencies = [
"anstyle",
"windows-sys 0.52.0",
]
[[package]] [[package]]
name = "anyhow" name = "anyhow"
version = "1.0.79" version = "1.0.79"
@ -126,9 +78,9 @@ checksum = "bef38d45163c2f1dde094a7dfd33ccf595c92905c8f8f4fdc18d06fb1037718a"
[[package]] [[package]]
name = "bitflags" name = "bitflags"
version = "2.4.1" version = "2.4.2"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "327762f6e5a765692301e5bb513e0d9fef63be86bbc14528052b1cd3e6f03e07" checksum = "ed570934406eb16438a4e976b1b4500774099c13b8cb96eec99f620f05090ddf"
[[package]] [[package]]
name = "block-buffer" name = "block-buffer"
@ -160,12 +112,6 @@ version = "1.0.0"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "baf1de4339761588bc0619e3cbc0120ee582ebb74b53b4efbf79117bd2da40fd" checksum = "baf1de4339761588bc0619e3cbc0120ee582ebb74b53b4efbf79117bd2da40fd"
[[package]]
name = "colorchoice"
version = "1.0.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "acbf1af155f9b9ef647e42cdc158db4b64a1b61f743629225fde6f3e0be2a7c7"
[[package]] [[package]]
name = "core-foundation" name = "core-foundation"
version = "0.9.4" version = "0.9.4"
@ -212,32 +158,28 @@ dependencies = [
] ]
[[package]] [[package]]
name = "either" name = "dotenv"
version = "1.9.0" version = "0.15.0"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "a26ae43d7bcc3b814de94796a5e736d4029efb0ee900c12e2d54c993ad1a1e07" checksum = "77c90badedccf4105eca100756a0b1289e191f6fcbdadd3cee1d2f614f97da8f"
[[package]] [[package]]
name = "env_filter" name = "either"
version = "0.1.0" version = "1.9.0"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "a009aa4810eb158359dda09d0c87378e4bbb89b5a801f016885a4707ba24f7ea" checksum = "a26ae43d7bcc3b814de94796a5e736d4029efb0ee900c12e2d54c993ad1a1e07"
dependencies = [
"log",
"regex",
]
[[package]] [[package]]
name = "env_logger" name = "env_logger"
version = "0.11.0" version = "0.10.2"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "9eeb342678d785662fd2514be38c459bb925f02b68dd2a3e0f21d7ef82d979dd" checksum = "4cd405aab171cb85d6735e5c8d9db038c17d3ca007a4d2c25f337935c3d90580"
dependencies = [ dependencies = [
"anstream",
"anstyle",
"env_filter",
"humantime", "humantime",
"is-terminal",
"log", "log",
"regex",
"termcolor",
] ]
[[package]] [[package]]
@ -364,6 +306,12 @@ version = "0.4.1"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "95505c38b4572b2d910cecb0281560f54b440a19336cbbcb27bf6ce6adc6f5a8" checksum = "95505c38b4572b2d910cecb0281560f54b440a19336cbbcb27bf6ce6adc6f5a8"
[[package]]
name = "hermit-abi"
version = "0.3.4"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "5d3d0e0f38255e7fa3cf31335b3a56f05febd18025f4db5ef7a0cfb4f8da651f"
[[package]] [[package]]
name = "home" name = "home"
version = "0.5.9" version = "0.5.9"
@ -389,6 +337,17 @@ dependencies = [
"hashbrown", "hashbrown",
] ]
[[package]]
name = "is-terminal"
version = "0.4.10"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "0bad00257d07be169d870ab665980b06cdb366d792ad690bf2e76876dc503455"
dependencies = [
"hermit-abi",
"rustix",
"windows-sys 0.52.0",
]
[[package]] [[package]]
name = "itertools" name = "itertools"
version = "0.11.0" version = "0.11.0"
@ -412,9 +371,9 @@ checksum = "13e3bf6590cbc649f4d1a3eefc9d5d6eb746f5200ffb04e5e142700b8faa56e7"
[[package]] [[package]]
name = "linux-raw-sys" name = "linux-raw-sys"
version = "0.4.12" version = "0.4.13"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "c4cd1a83af159aa67994778be9070f0ae1bd732942279cabb14f86f986a21456" checksum = "01cda141df6706de531b6c46c3a33ecca755538219bd484262fa09410c13539c"
[[package]] [[package]]
name = "lock_api" name = "lock_api"
@ -507,7 +466,7 @@ version = "0.10.63"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "15c9d69dd87a29568d4d017cfe8ec518706046a05184e5aea92d0af890b803c8" checksum = "15c9d69dd87a29568d4d017cfe8ec518706046a05184e5aea92d0af890b803c8"
dependencies = [ dependencies = [
"bitflags 2.4.1", "bitflags 2.4.2",
"cfg-if", "cfg-if",
"foreign-types", "foreign-types",
"libc", "libc",
@ -591,9 +550,9 @@ dependencies = [
[[package]] [[package]]
name = "proc-macro2" name = "proc-macro2"
version = "1.0.76" version = "1.0.78"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "95fc56cda0b5c3325f5fbbd7ff9fda9e02bb00bb3dac51252d2f1bfa1cb8cc8c" checksum = "e2422ad645d89c99f8f3e6b88a9fdeca7fabeac836b1002371c4367c8f984aae"
dependencies = [ dependencies = [
"unicode-ident", "unicode-ident",
] ]
@ -712,9 +671,9 @@ dependencies = [
[[package]] [[package]]
name = "regex" name = "regex"
version = "1.10.2" version = "1.10.3"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "380b951a9c5e80ddfd6136919eef32310721aa4aacd4889a8d39124b026ab343" checksum = "b62dbe01f0b06f9d8dc7d49e05a0785f153b00b2c227856282f671e0318c9b15"
dependencies = [ dependencies = [
"aho-corasick", "aho-corasick",
"memchr", "memchr",
@ -724,9 +683,9 @@ dependencies = [
[[package]] [[package]]
name = "regex-automata" name = "regex-automata"
version = "0.4.3" version = "0.4.5"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "5f804c7828047e88b2d32e2d7fe5a105da8ee3264f01902f796c8e067dc2483f" checksum = "5bb987efffd3c6d0d8f5f89510bb458559eab11e4f869acb20bf845e016259cd"
dependencies = [ dependencies = [
"aho-corasick", "aho-corasick",
"memchr", "memchr",
@ -783,7 +742,7 @@ version = "0.38.30"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "322394588aaf33c24007e8bb3238ee3e4c5c09c084ab32bc73890b99ff326bca" checksum = "322394588aaf33c24007e8bb3238ee3e4c5c09c084ab32bc73890b99ff326bca"
dependencies = [ dependencies = [
"bitflags 2.4.1", "bitflags 2.4.2",
"errno", "errno",
"libc", "libc",
"linux-raw-sys", "linux-raw-sys",
@ -981,6 +940,15 @@ dependencies = [
"windows-sys 0.52.0", "windows-sys 0.52.0",
] ]
[[package]]
name = "termcolor"
version = "1.4.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "06794f8f6c5c898b3275aebefa6b8a1cb24cd2c6c79397ab15774837a0bc5755"
dependencies = [
"winapi-util",
]
[[package]] [[package]]
name = "thiserror" name = "thiserror"
version = "1.0.56" version = "1.0.56"
@ -1056,12 +1024,6 @@ version = "0.9.0"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "8ecb6da28b8a351d773b68d5825ac39017e680750f980f3a1a85cd8dd28a47c1" checksum = "8ecb6da28b8a351d773b68d5825ac39017e680750f980f3a1a85cd8dd28a47c1"
[[package]]
name = "utf8parse"
version = "0.2.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "711b9620af191e0cdc7468a8d14e709c3dcdb115b36f838e601583af800a370a"
[[package]] [[package]]
name = "vcpkg" name = "vcpkg"
version = "0.2.15" version = "0.2.15"
@ -1080,6 +1042,7 @@ version = "0.1.0"
dependencies = [ dependencies = [
"backtrace-on-stack-overflow", "backtrace-on-stack-overflow",
"bytes", "bytes",
"dotenv",
"env_logger", "env_logger",
"log", "log",
"openssl", "openssl",
@ -1113,6 +1076,37 @@ dependencies = [
"rustix", "rustix",
] ]
[[package]]
name = "winapi"
version = "0.3.9"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "5c839a674fcd7a98952e593242ea400abe93992746761e38641405d28b00f419"
dependencies = [
"winapi-i686-pc-windows-gnu",
"winapi-x86_64-pc-windows-gnu",
]
[[package]]
name = "winapi-i686-pc-windows-gnu"
version = "0.4.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "ac3b87c63620426dd9b991e5ce0329eff545bccbbb34f3be09ff6fb6ab51b7b6"
[[package]]
name = "winapi-util"
version = "0.1.6"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "f29e6f9198ba0d26b4c9f07dbe6f9ed633e1f3d5b8b414090084349e46a52596"
dependencies = [
"winapi",
]
[[package]]
name = "winapi-x86_64-pc-windows-gnu"
version = "0.4.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "712e227841d057c1ee1cd2fb22fa7e5a5461ae8e48fa2ca79ec42cfc1931183f"
[[package]] [[package]]
name = "windows-sys" name = "windows-sys"
version = "0.48.0" version = "0.48.0"

@ -12,7 +12,8 @@ prost-build = "0.12.3"
[dependencies] [dependencies]
backtrace-on-stack-overflow = "0.3.0" backtrace-on-stack-overflow = "0.3.0"
bytes = "1.5.0" bytes = "1.5.0"
env_logger = "0.11.0" dotenv = "0.15.0"
env_logger = "0.10.2"
log = "0.4.20" log = "0.4.20"
openssl = "0.10.63" openssl = "0.10.63"
prost = "0.12.3" prost = "0.12.3"

@ -1,3 +1,5 @@
extern crate dotenv;
use rumqttc::{MqttOptions, Client, QoS, LastWill}; use rumqttc::{MqttOptions, Client, QoS, LastWill};
use bytes::Bytes; use bytes::Bytes;
use std::time::Duration; use std::time::Duration;
@ -16,6 +18,9 @@ use sha2::{Sha256, Digest};
use std::net::TcpStream; use std::net::TcpStream;
use std::sync::mpsc; use std::sync::mpsc;
use dotenv::dotenv;
use std::env;
// MQTT Topics: // MQTT Topics:
// - division/{division_id}/{round}/{match}/score // - division/{division_id}/{round}/{match}/score
// - division/{division_id}/ranking // - division/{division_id}/ranking
@ -348,7 +353,7 @@ struct BackendMessage {
} }
impl BackendMessage { impl BackendMessage {
fn from_bytes(bytes: Vec<u8>) -> Option<BackendMessage> { fn from_bytes(bytes: &Vec<u8>) -> Option<BackendMessage> {
if bytes.len() < 5 { if bytes.len() < 5 {
return None; return None;
} }
@ -408,7 +413,7 @@ impl BackendPacket {
}; };
} }
fn from_bytes(bytes: Vec<u8>) -> Option<BackendPacket> { fn from_bytes(bytes: &Vec<u8>) -> Option<BackendPacket> {
if bytes.len() < BACKEND_PACKET_HEADER_SIZE { if bytes.len() < BACKEND_PACKET_HEADER_SIZE {
return None; return None;
} }
@ -471,7 +476,7 @@ impl ConnectMsg {
}; };
} }
fn from_bytes(bytes: Vec<u8>) -> Option<ConnectMsg> { fn from_bytes(bytes: &Vec<u8>) -> Option<ConnectMsg> {
if bytes.len() < CONNECT_MSG_LEN { if bytes.len() < CONNECT_MSG_LEN {
return None; return None;
} }
@ -513,14 +518,14 @@ struct NoticeMsg {
} }
impl NoticeMsg { impl NoticeMsg {
fn from_bytes(bytes: Vec<u8>) -> Option<NoticeMsg> { fn from_bytes(bytes: &Vec<u8>) -> Option<NoticeMsg> {
if bytes.len() < 8 { if bytes.len() < 8 {
return None; return None;
} }
let notice_id = u64::from_le_bytes(bytes[0..8].try_into().unwrap()); let notice_id = u64::from_le_bytes(bytes[0..8].try_into().unwrap());
match BackendMessage::from_bytes(bytes[8..].to_vec()) { match BackendMessage::from_bytes(&(bytes[8..].to_vec())) {
Some(message) => { Some(message) => {
match message.data.notice { match message.data.notice {
Some(notice) => Some(NoticeMsg{ Some(notice) => Some(NoticeMsg{
@ -547,11 +552,11 @@ struct TMClient {
username: [u8; 16], username: [u8; 16],
connected: bool, connected: bool,
time_offset: f64, time_offset: f64,
offset_tick: u16,
} }
const TCP_BUFFER_SIZE: usize = 10000;
impl TMClient { impl TMClient {
fn new(uuid: [u8; 16], client_name: [u8; 32], password: String, username: [u8; 16]) -> (TMClient, TMConnection) { fn new(uuid: [u8; 16], client_name: [u8; 32], host: String, password: String, username: [u8; 16]) -> (TMClient, TMConnection) {
let (work_tx, work_rx) = mpsc::channel(); let (work_tx, work_rx) = mpsc::channel();
let (response_tx, response_rx) = mpsc::channel(); let (response_tx, response_rx) = mpsc::channel();
let (request_tx, request_rx) = mpsc::channel(); let (request_tx, request_rx) = mpsc::channel();
@ -562,7 +567,8 @@ impl TMClient {
let connector = builder.build(); let connector = builder.build();
let stream = TcpStream::connect("127.0.0.1:5000").unwrap(); log::debug!("Connecting to TM using address {host}:5000");
let stream = TcpStream::connect(format!("{host}:5000")).unwrap();
let mut stream_config = connector.configure().unwrap(); let mut stream_config = connector.configure().unwrap();
stream_config.set_verify_hostname(false); stream_config.set_verify_hostname(false);
@ -570,9 +576,8 @@ impl TMClient {
stream_config.set_private_key_file("tm.crt", openssl::ssl::SslFiletype::PEM).unwrap(); stream_config.set_private_key_file("tm.crt", openssl::ssl::SslFiletype::PEM).unwrap();
stream_config.set_use_server_name_indication(false); stream_config.set_use_server_name_indication(false);
let stream = stream_config.connect("127.0.0.1", stream).unwrap(); let stream = stream_config.connect(&host, stream).unwrap();
stream.get_ref().set_read_timeout(Some(Duration::from_millis(100))).expect("Failed to set read timeout on socket"); stream.get_ref().set_read_timeout(Some(Duration::from_millis(10))).expect("Failed to set read timeout on socket");
return (TMClient{ return (TMClient{
stream, stream,
@ -586,6 +591,7 @@ impl TMClient {
username, username,
connected: false, connected: false,
time_offset: 0.0, time_offset: 0.0,
offset_tick: 0,
}, },
TMConnection{ TMConnection{
time_offset: 0.0, time_offset: 0.0,
@ -611,85 +617,126 @@ impl TMClient {
} }
} }
let mut incoming = [0; TCP_BUFFER_SIZE]; let mut header = [0; BACKEND_PACKET_HEADER_SIZE];
match self.stream.read(&mut incoming) { let mut read_bytes = 0;
Ok(read) => { let data_size: usize;
let data = incoming[0..read].to_vec(); match self.stream.read_exact(&mut header) {
match BackendPacket::from_bytes(data) { Ok(()) => {
match BackendPacket::from_bytes(&header.to_vec()) {
Some(packet) => { Some(packet) => {
self.last_seq_num = packet.seq_num; data_size = packet.size as usize;
match packet.msg_type { read_bytes += BACKEND_PACKET_HEADER_SIZE;
// Notice Message log::debug!("Received {} bytes ({}/{})", read_bytes, read_bytes, BACKEND_PACKET_HEADER_SIZE + data_size);
4 => { },
match NoticeMsg::from_bytes(packet.data.clone()) { None => {
Some(notice) => { log::error!("Failed to parse BackendPacket header: {:?}", header);
log::debug!("Received notice: {:#?}", notice); return;
let ack = BackendPacket::new(packet.header, packet.timestamp, 5, self.last_seq_num+1, notice.notice_id.to_le_bytes().to_vec()); },
self.last_seq_num += 1; }
match self.stream.write(&ack.as_bytes()) { },
Ok(_) => log::debug!("Sent ACK for notice {}", notice.notice_id), Err(ref error) if error.kind() == std::io::ErrorKind::WouldBlock => {
Err(error) => log::error!("ACK error: {:?}", error), if self.offset_tick == 0 {
} match self.work_queue.send(Work::Offset(self.time_offset)) {
match self.work_queue.send(Work::Notice(Box::new(notice.notice))) { Ok(_) => log::debug!("Sent time offset"),
Ok(_) => log::debug!("Forwarded notice to callback engine"), Err(error) => log::error!("Offset send error: {:#?}", error),
Err(error) => log::error!("Notice forward error {:?}", error), }
} self.offset_tick = 100;
}, } else {
None => log::error!("Notice parse error: {:?}", packet), self.offset_tick -= 1;
}
log::debug!("Resource temporarily unavailable, retrying later.");
return;
},
Err(error) => {
log::error!("Error reading header: {:?}", error);
return;
},
}
let mut data = vec![0; BACKEND_PACKET_HEADER_SIZE + data_size];
data[..header.len()].copy_from_slice(&header);
while read_bytes < BACKEND_PACKET_HEADER_SIZE + data_size {
match self.stream.read(&mut data[read_bytes..]) {
Ok(read) => {
read_bytes += read;
log::debug!("Received {} bytes ({}/{})", read, read_bytes, BACKEND_PACKET_HEADER_SIZE + data_size);
},
Err(error) => {
log::error!("Error reading body data: {:?}", error);
},
}
}
match BackendPacket::from_bytes(&data) {
Some(packet) => {
let offset = packet.timestamp - get_float_time();
self.time_offset = offset.clone();
self.last_seq_num = packet.seq_num;
match packet.msg_type {
// Notice Message
4 => {
match NoticeMsg::from_bytes(&packet.data) {
Some(notice) => {
log::debug!("Received notice: {:#?}", notice);
let ack = BackendPacket::new(packet.header, packet.timestamp, 5, self.last_seq_num+1, notice.notice_id.to_le_bytes().to_vec());
self.last_seq_num += 1;
match self.stream.write(&ack.as_bytes()) {
Ok(_) => log::debug!("Sent ACK for notice {}", notice.notice_id),
Err(error) => log::error!("ACK error: {:?}", error),
}
match self.work_queue.send(Work::Notice(Box::new(NoticeWithOffset{notice: notice.notice, offset: self.time_offset}))) {
Ok(_) => log::debug!("Forwarded notice to callback engine"),
Err(error) => log::error!("Notice forward error {:?}", error),
} }
}, },
// Response message None => log::error!("Notice parse error: {:?}", packet),
3 => { }
match BackendMessage::from_bytes(packet.data.clone()) { },
Some(message) => { // Response message
log::debug!("Received response: {:#?}", message); 3 => {
let offset = packet.timestamp - get_float_time(); match BackendMessage::from_bytes(&packet.data) {
self.time_offset = offset.clone(); Some(message) => {
log::info!("New offset: {}", offset); log::debug!("Received response: {:#?}", message);
log::info!("Server Timetamp: {}", packet.timestamp);
log::info!("Local Timetamp: {}", get_float_time()); match self.responses.send((Box::new(message), offset)) {
Ok(_) => log::debug!("Forwarded response to callback engine"),
match self.responses.send((Box::new(message), offset)) { Err(error) => log::error!("Response forward error {:?}", error),
Ok(_) => log::debug!("Forwarded response to callback engine"),
Err(error) => log::error!("Response forward error {:?}", error),
}
},
None => log::error!("BackendMessage parse error: {:?}", packet),
} }
}, },
// Server Message None => log::error!("BackendMessage parse error: {:?}", packet),
2 => { }
match ConnectMsg::from_bytes(packet.data) { },
Some(welcome_msg) => { // Server Message
log::debug!("Received connect message: {:#?}", welcome_msg); 2 => {
if welcome_msg.pw_valid == 0 { match ConnectMsg::from_bytes(&packet.data) {
let connect_response = ConnectMsg::from_welcome(welcome_msg, &self.password, self.uuid, self.client_name, self.username); Some(welcome_msg) => {
let response = BackendPacket::new(packet.header, packet.timestamp, packet.msg_type, self.last_seq_num+1, connect_response.as_bytes()); log::debug!("Received connect message: {:#?}", welcome_msg);
match self.stream.write(&response.as_bytes()) { if welcome_msg.pw_valid == 0 {
Err(error) => log::error!("Send error: {:?}", error), let connect_response = ConnectMsg::from_welcome(welcome_msg, &self.password, self.uuid, self.client_name, self.username);
Ok(_) => self.last_seq_num += 1, let response = BackendPacket::new(packet.header, packet.timestamp, packet.msg_type, self.last_seq_num+1, connect_response.as_bytes());
} match self.stream.write(&response.as_bytes()) {
} else if welcome_msg.state_valid == 0 { Err(error) => log::error!("Send error: {:?}", error),
log::error!("pw_valid but not state_valid"); Ok(_) => self.last_seq_num += 1,
} else { }
self.connected = true; } else if welcome_msg.state_valid == 0 {
log::info!("Connected to TM backend!"); log::error!("pw_valid but not state_valid");
} } else {
}, self.connected = true;
None => log::error!("Failed to parse welcome msg"), log::info!("Connected to TM backend!");
} }
}, },
_ => log::warn!("Unhandled message type: {}", packet.msg_type), None => log::error!("Failed to parse welcome msg"),
} }
}, },
None => { _ => log::warn!("Unhandled message type: {}", packet.msg_type),
log::error!("Failed to parse BackendPacket({}): {}", read, String::from_utf8_lossy(&incoming));
// Sleep to prevent busy loop when TM is spamming 0 length packets
thread::sleep(Duration::from_millis(100));
}
} }
}, },
Err(_) => {}, None => {
log::error!("Failed to parse BackendPacket({}): {}", read_bytes, String::from_utf8_lossy(&data));
// Sleep to prevent busy loop when TM is spamming 0 length packets
thread::sleep(Duration::from_millis(100));
}
} }
} }
@ -1106,6 +1153,47 @@ fn on_field_assigned(notice: tm::Notice, event: &mut Event, connection: &mut TMC
return messages; return messages;
} }
fn on_timer_reset(notice: tm::Notice, event: &mut Event, connection: &mut TMConnection) -> Vec<MQTTMessage> {
log::info!("TIMER_RESTART: {:#?}", &notice);
let mut messages = Vec::new();
let Some(field_time) = &notice.field_time else { return Vec::new() };
let Some(field_info) = get_field_tuple(&field_time.field) else { return Vec::new() };
let Some(field) = get_field(&mut event.field_sets, field_info) else { return Vec::new() };
let Some(current_match) = field.last_known_match else { return Vec::new() };
let Some(m) = get_match(&mut event.divisions, current_match) else { return Vec::new() };
m.state = MatchState{
state: GameState::Scheduled,
start: get_float_time() - connection.time_offset,
};
match connection.state_cancels.remove(&field_info) {
None => {},
Some(cancel_tx) => {
match cancel_tx.send(()) {
Ok(_) => {},
Err(_) => {},
}
},
}
let match_state_topic = current_match.topic("/state");
let field_state_topic = field_info.topic("/state");
let serialized = serde_json::to_string_pretty(&m.state).unwrap();
messages.push(MQTTMessage{
topic: match_state_topic,
payload: serialized.clone(),
});
messages.push(MQTTMessage{
topic: field_state_topic,
payload: serialized,
});
return messages;
}
fn on_timer_stop(notice: tm::Notice, event: &mut Event, connection: &mut TMConnection) -> Vec<MQTTMessage> { fn on_timer_stop(notice: tm::Notice, event: &mut Event, connection: &mut TMConnection) -> Vec<MQTTMessage> {
let mut messages = Vec::new(); let mut messages = Vec::new();
let Some(field_time) = &notice.field_time else { return Vec::new() }; let Some(field_time) = &notice.field_time else { return Vec::new() };
@ -1119,6 +1207,16 @@ fn on_timer_stop(notice: tm::Notice, event: &mut Event, connection: &mut TMConne
start: get_float_time() - connection.time_offset, start: get_float_time() - connection.time_offset,
}; };
match connection.state_cancels.remove(&field_info) {
None => {},
Some(cancel_tx) => {
match cancel_tx.send(()) {
Ok(_) => {},
Err(_) => {},
}
},
}
let match_state_topic = current_match.topic("/state"); let match_state_topic = current_match.topic("/state");
let field_state_topic = field_info.topic("/state"); let field_state_topic = field_info.topic("/state");
let serialized = serde_json::to_string_pretty(&m.state).unwrap(); let serialized = serde_json::to_string_pretty(&m.state).unwrap();
@ -1136,6 +1234,7 @@ fn on_timer_stop(notice: tm::Notice, event: &mut Event, connection: &mut TMConne
} }
fn on_timer_start(notice: tm::Notice, event: &mut Event, connection: &mut TMConnection) -> Vec<MQTTMessage> { fn on_timer_start(notice: tm::Notice, event: &mut Event, connection: &mut TMConnection) -> Vec<MQTTMessage> {
log::info!("TIMER_START: {:#?}", &notice);
let Some(field_time) = &notice.field_time else { return Vec::new() }; let Some(field_time) = &notice.field_time else { return Vec::new() };
let Some(field_info) = get_field_tuple(&field_time.field) else { return Vec::new() }; let Some(field_info) = get_field_tuple(&field_time.field) else { return Vec::new() };
let Some(field) = get_field(&mut event.field_sets, field_info) else { return Vec::new() }; let Some(field) = get_field(&mut event.field_sets, field_info) else { return Vec::new() };
@ -1143,7 +1242,6 @@ fn on_timer_start(notice: tm::Notice, event: &mut Event, connection: &mut TMConn
let Some(m) = get_match(&mut event.divisions, tuple) else { return Vec::new() }; let Some(m) = get_match(&mut event.divisions, tuple) else { return Vec::new() };
let Some(block_list) = &field_time.block_list else { return Vec::new() }; let Some(block_list) = &field_time.block_list else { return Vec::new() };
let Some(current_block_idx) = &field_time.current_block else { return Vec::new() }; let Some(current_block_idx) = &field_time.current_block else { return Vec::new() };
let Some(current_block_start) = &field_time.current_block_start else { return Vec::new() };
let Some(current_block_end) = &field_time.current_block_end else { return Vec::new() }; let Some(current_block_end) = &field_time.current_block_end else { return Vec::new() };
let current_block = &block_list.entries[*current_block_idx as usize]; let current_block = &block_list.entries[*current_block_idx as usize];
@ -1153,7 +1251,7 @@ fn on_timer_start(notice: tm::Notice, event: &mut Event, connection: &mut TMConn
if current_block.r#type == Some(2) { //Auto if current_block.r#type == Some(2) { //Auto
m.state = MatchState{ m.state = MatchState{
state: GameState::Autonomous, state: GameState::Autonomous,
start: *current_block_start - connection.time_offset, start: *current_block_end - (current_block.seconds() as f64) - connection.time_offset,
}; };
let field_state_topic = field_info.topic("/state"); let field_state_topic = field_info.topic("/state");
let match_state_topic = tuple.topic("/state"); let match_state_topic = tuple.topic("/state");
@ -1183,7 +1281,7 @@ fn on_timer_start(notice: tm::Notice, event: &mut Event, connection: &mut TMConn
} else if current_block.r#type == Some(3) { //Driver } else if current_block.r#type == Some(3) { //Driver
m.state = MatchState{ m.state = MatchState{
state: GameState::Driver, state: GameState::Driver,
start: *current_block_start - connection.time_offset, start: *current_block_end - (current_block.seconds() as f64) - connection.time_offset,
}; };
let field_state_topic = field_info.topic("/state"); let field_state_topic = field_info.topic("/state");
let match_state_topic = tuple.topic("/state"); let match_state_topic = tuple.topic("/state");
@ -1215,12 +1313,20 @@ fn on_timer_start(notice: tm::Notice, event: &mut Event, connection: &mut TMConn
return messages; return messages;
} }
struct NoticeWithOffset {
notice: tm::Notice,
offset: f64,
}
enum Work { enum Work {
Notice(Box<tm::Notice>), Notice(Box<NoticeWithOffset>),
Offset(f64),
State(StateChange), State(StateChange),
} }
fn main() { fn main() {
dotenv().ok();
env_logger::init(); env_logger::init();
let mut callbacks: HashMap<tm::NoticeId, NoticeCallback> = HashMap::new(); let mut callbacks: HashMap<tm::NoticeId, NoticeCallback> = HashMap::new();
@ -1230,8 +1336,11 @@ fn main() {
callbacks.insert(tm::NoticeId::NoticeFieldTimerStarted, on_timer_start); callbacks.insert(tm::NoticeId::NoticeFieldTimerStarted, on_timer_start);
callbacks.insert(tm::NoticeId::NoticeFieldMatchAssigned, on_field_assigned); callbacks.insert(tm::NoticeId::NoticeFieldMatchAssigned, on_field_assigned);
callbacks.insert(tm::NoticeId::NoticeFieldTimerStopped, on_timer_stop); callbacks.insert(tm::NoticeId::NoticeFieldTimerStopped, on_timer_stop);
callbacks.insert(tm::NoticeId::NoticeFieldResetTimer, on_timer_reset);
let mut mqttoptions = MqttOptions::new("vex-bridge", "localhost", 1883); let mqtt_host = env::var("MQTT_HOST").unwrap_or(String::from("localhost"));
log::debug!("Connecting to MQTT using address {mqtt_host}:1883");
let mut mqttoptions = MqttOptions::new("vex-bridge", mqtt_host, 1883);
mqttoptions.set_keep_alive(Duration::from_secs(5)); mqttoptions.set_keep_alive(Duration::from_secs(5));
mqttoptions.set_last_will(LastWill{ mqttoptions.set_last_will(LastWill{
topic: String::from("bridge/status"), topic: String::from("bridge/status"),
@ -1254,8 +1363,10 @@ fn main() {
rand::thread_rng().fill_bytes(&mut uuid); rand::thread_rng().fill_bytes(&mut uuid);
let mut client_name = [0u8;32]; let mut client_name = [0u8;32];
rand::thread_rng().fill_bytes(&mut client_name); rand::thread_rng().fill_bytes(&mut client_name);
let tm_host = env::var("TM_HOST").unwrap_or(String::from("127.0.0.1"));
let username: [u8;16] = [0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0]; let username: [u8;16] = [0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0];
let (mut tm_client, mut tm_connection) = TMClient::new(uuid, client_name, String::from(""), username); let password = env::var("TM_PASSWORD").unwrap_or(String::from(""));
let (mut tm_client, mut tm_connection) = TMClient::new(uuid, client_name, tm_host, password, username);
let tm_thread = thread::spawn(move || let tm_thread = thread::spawn(move ||
while running { while running {
tm_client.process(); tm_client.process();
@ -1334,8 +1445,15 @@ fn main() {
while running { while running {
match tm_connection.work_queue.recv() { match tm_connection.work_queue.recv() {
Ok(work) => match work { Ok(work) => match work {
Work::Notice(notice) => { Work::Offset(offset) => {
let current = get_float_time() - offset;
client.publish("time", QoS::AtMostOnce, false, serde_json::to_vec(&current).unwrap()).unwrap();
},
Work::Notice(package) => {
let current = get_float_time() - package.offset;
let notice = package.notice;
let callback = callbacks.get(&notice.id()); let callback = callbacks.get(&notice.id());
client.publish("time", QoS::AtMostOnce, false, serde_json::to_vec(&current).unwrap()).unwrap();
match callback { match callback {
None => { None => {
match notice.id { match notice.id {
@ -1344,7 +1462,7 @@ fn main() {
} }
}, },
Some(callback) => { Some(callback) => {
let messages = callback(*notice, &mut event, &mut tm_connection); let messages = callback(notice, &mut event, &mut tm_connection);
for message in messages { for message in messages {
let result = client.publish(message.topic, QoS::AtLeastOnce, true, message.payload); let result = client.publish(message.topic, QoS::AtLeastOnce, true, message.payload);
match result { match result {