use rumqttc::{MqttOptions, Client, QoS, LastWill}; use bytes::Bytes; use std::time::Duration; use std::thread; use std::collections::hash_map::HashMap; use prost::Message; use std::io::Cursor; use serde::{Serialize, Deserialize}; use rand_core::RngCore; use std::time::{SystemTime, UNIX_EPOCH}; use std::io::prelude::*; use sha2::{Sha256, Digest}; use std::net::TcpStream; use std::sync::mpsc; // MQTT Topics: // - division/{division_id}/{round}/{match}/score // - division/{division_id}/ranking // - field/{fieldset_id}/{field_id}/score // - field/{fieldset_id}/{field_id}/state // - field/{fieldset_id}/{field_id} // - team/{team_string} pub mod tm { include!(concat!(env!("OUT_DIR"), "/tm.rs")); } #[derive(Serialize, Deserialize, Debug, Clone)] enum GameSide { Red, Blue, Tie, } #[derive(Serialize, Deserialize, Debug, Clone)] struct AllianceScore { auton_wp: bool, team_goal: i32, team_zone: i32, green_goal: i32, green_zone: i32, elevation_tiers: [i32; 2], } #[derive(Serialize, Deserialize, Debug, Clone)] struct MatchScore { autonomous_winner: Option, red_score: AllianceScore, red_total: i32, blue_score: AllianceScore, blue_total: i32, } #[derive(Serialize, Deserialize, Debug, Clone)] enum GameState { Scheduled, Stopped, Timeout, Driver, DriverDone, Autonomous, AutonomousDone, Abandoned, Scored, } #[derive(Serialize, Deserialize, Debug, Clone, Copy, Eq, PartialEq)] enum Round { None = 0, Practice = 1, Qualification = 2, QuarterFinals = 3, SemiFinals = 4, Finals = 5, RoundOf16 = 6, RoundOf32 = 7, RoundOf64 = 8, RoundOf128 = 9, TopN = 15, RoundRobin = 16, PreEliminations = 20, Eliminations = 21, } fn int_to_round(round: i32) -> Round { match round { 1 => Round::Practice, 2 => Round::Qualification, 3 => Round::QuarterFinals, 4 => Round::SemiFinals, 5 => Round::Finals, 6 => Round::RoundOf16, 7 => Round::RoundOf32, 8 => Round::RoundOf64, 9 => Round::RoundOf128, 15 => Round::TopN, 16 => Round::RoundRobin, 20 => Round::PreEliminations, 21 => Round::Eliminations, _ => Round::None, } } #[derive(Serialize, Deserialize, Debug, Clone, Copy, Eq, PartialEq)] struct MatchTuple { division: i32, round: Round, instance: i32, match_num: i32, session: i32, } impl MatchTuple { fn topic(self: &MatchTuple, suffix: &str) -> String { format!("division/{}/{:?}/{}{}", &self.division, &self.round, &self.match_num, suffix) } } #[derive(Debug)] struct MQTTMessage { topic: String, payload: String, } #[derive(Serialize, Deserialize, Debug, Clone)] struct Rank { team: String, rank: i32, } #[derive(Serialize, Deserialize, Debug, Clone)] struct Event { divisions: HashMap, field_sets: HashMap, rankings: Vec, } fn get_field(sets: &mut HashMap, tuple: FieldTuple) -> Option<&mut Field> { match sets.get_mut(&tuple.set) { None => {}, Some(set) => { for (_, field) in &mut set.fields { if field.tuple == tuple { return Some(field); } } }, }; None } fn get_match(divisions: &mut HashMap, tuple: MatchTuple) -> Option<&mut Match> { match divisions.get_mut(&tuple.division) { None => {}, Some(division) => { for m in &mut division.matches { if m.info.tuple == tuple { return Some(m); } } }, }; None } impl Event { fn new() -> Event { Event{ rankings: Vec::new(), divisions: HashMap::new(), field_sets: HashMap::new(), } } // TODO: remove extra entries instead of just adding new ones fn parse_field_sets(self: &mut Event, sets: tm::FieldSetList) { for set in sets.field_sets { let mut fields = HashMap::new(); for field in &set.fields { fields.insert(field.id(), Field{ tuple: FieldTuple{ set: set.id(), id: field.id(), }, name: String::from(field.name()), last_known_match: None, }); } self.field_sets.insert(set.id(), FieldSet{ fields, }); } } fn parse_division_list(self: &mut Event, division_list: tm::DivisionList) { for division in division_list.divisions { self.divisions.insert(division.id() as i32, Division{ name: String::from(division.name()), state: None, matches: Vec::new(), }); } } fn parse_match_schedule(self: &mut Event, schedule: tm::MatchSchedule) { let mut matches: HashMap> = HashMap::new(); for m in schedule.matches.iter() { let tuple = tm_tuple_to_struct(m.match_tuple.clone().unwrap()); let Some(field) = get_field_tuple(&m.assigned_field) else {continue;}; let Some(state) = &m.state else {continue;}; let Some(scheduled) = m.time_scheduled else {continue;}; let Some(started) = m.time_started else {continue;}; let Some(resumed) = m.time_resumed else {continue;}; let red_1 = m.alliances[0].teams[0].number(); let red_2 = m.alliances[0].teams[1].number(); let blue_1 = m.alliances[1].teams[0].number(); let blue_2 = m.alliances[1].teams[1].number(); let match_state: MatchState; match state { 0 => match_state = MatchState{ state: GameState::Scheduled, start: scheduled as f64 / 1000.0, }, 3 => { // If the match is active, use it's current state // since I can't find a way to request the auton/driver state match self.divisions.get(&tuple.division) { None => match_state = MatchState{ state: GameState::Scheduled, start: started as f64 / 1000.0, }, Some(division) => { match division.matches.iter().find(|a| a.info.tuple == tuple) { None => match_state = MatchState{ state: GameState::Scheduled, start: started as f64 / 1000.0, }, Some(event_m) => match_state = event_m.state.clone(), } }, } }, 4 => match_state = MatchState{ state: GameState::Scored, start: resumed as f64 / 1000.0, }, 5 => match_state = MatchState{ state: GameState::Scored, start: resumed as f64 / 1000.0, }, _ => match_state = MatchState{ state: GameState::Scheduled, start: started as f64 / 1000.0, }, } match matches.get_mut(&tuple.division) { Some(match_list) => { match_list.push(Match{ state: match_state, info: MatchInfo{ tuple: tuple.clone(), red_teams: [String::from(red_1), String::from(red_2)], blue_teams: [String::from(blue_1), String::from(blue_2)], field, }, score: None, }) }, None => { let mut new_match_list = Vec::new(); new_match_list.push(Match{ state: match_state, info: MatchInfo{ tuple: tuple.clone(), red_teams: [String::from(red_1), String::from(red_2)], blue_teams: [String::from(blue_1), String::from(blue_2)], field, }, score: None, }); matches.insert(tuple.division, new_match_list); }, } } for (id, match_list) in matches { match self.divisions.get_mut(&id) { None => log::warn!("parsed match list with nonexistant division {}", id), Some(division) => division.matches = match_list, } } } } #[derive(Serialize, Deserialize, Debug, Clone)] struct DivisionState { current_field: FieldTuple, current_match: MatchTuple, } #[derive(Serialize, Deserialize, Debug, Clone)] struct Division { name: String, state: Option, matches: Vec, } #[derive(Serialize, Deserialize, Debug, Clone)] struct FieldSet { fields: HashMap, } #[derive(Serialize, Deserialize, Debug, Clone)] struct Field { name: String, tuple: FieldTuple, last_known_match: Option, } #[derive(Serialize, Deserialize, Debug, Clone)] struct MatchState { state: GameState, start: f64, } #[derive(Serialize, Deserialize, Debug, Clone)] struct MatchInfo { tuple: MatchTuple, red_teams: [String; 2], blue_teams: [String; 2], field: FieldTuple, } #[derive(Serialize, Deserialize, Debug, Clone)] struct Match { state: MatchState, info: MatchInfo, score: Option, } #[derive(Debug)] struct BackendMessage { status: u8, request_id: u32, data: tm::BackendMessageData, } impl BackendMessage { fn from_bytes(bytes: Vec) -> Option { if bytes.len() < 5 { return None; } let mut pb_data = Cursor::new(bytes[5..].to_vec()); match tm::BackendMessageData::decode(&mut pb_data) { Ok(data) => Some(BackendMessage{ status: bytes[0], request_id: u32::from_le_bytes(bytes[0..4].try_into().unwrap()), data, }), Err(_) => None, } } fn as_bytes(self: &BackendMessage) -> Vec { let mut bytes = Vec::new(); bytes.push(self.status); bytes.extend(self.request_id.to_le_bytes()); bytes.extend(self.data.encode_to_vec()); return bytes; } fn new(request_id: u32, data: tm::BackendMessageData) -> BackendMessage { BackendMessage{ status: 0, request_id, data, } } } const BACKEND_PACKET_HEADER_SIZE: usize = 28; #[derive(Debug)] struct BackendPacket { header: u32, timestamp: f64, msg_type: u32, seq_num: u64, size: u32, data: Vec, } const TM_HEADER: u32 = 0x55D33DAA; impl BackendPacket { fn new(header: u32, timestamp: f64, msg_type: u32, seq_num: u64, data: Vec) -> BackendPacket { return BackendPacket{ header, timestamp, msg_type, seq_num, size: data.len().try_into().unwrap(), data, }; } fn from_bytes(bytes: Vec) -> Option { if bytes.len() < BACKEND_PACKET_HEADER_SIZE { return None; } return Some(BackendPacket{ header: u32::from_le_bytes(bytes[0..4].try_into().unwrap()), timestamp: f64::from_le_bytes(bytes[4..12].try_into().unwrap()), msg_type: u32::from_le_bytes(bytes[12..16].try_into().unwrap()), seq_num: u64::from_le_bytes(bytes[16..24].try_into().unwrap()), size: u32::from_le_bytes(bytes[24..28].try_into().unwrap()), data: bytes[28..].to_vec(), }); } fn as_bytes(self: &BackendPacket) -> Vec { let mut bytes = Vec::new(); bytes.extend(self.header.to_le_bytes()); bytes.extend(self.timestamp.to_le_bytes()); bytes.extend(self.msg_type.to_le_bytes()); bytes.extend(self.seq_num.to_le_bytes()); bytes.extend(self.size.to_le_bytes()); bytes.extend(self.data.clone()); return bytes; } } const CONNECT_MSG_LEN: usize = 114; #[derive(Debug)] struct ConnectMsg { version: u32, uuid: [u8; 16], last_notice_id: u64, username: [u8; 16], pass_hash: [u8; 32], pw_valid: u8, state_valid: u8, client_name: [u8; 32], server_time_zone: i32, } impl ConnectMsg { fn from_welcome(welcome: ConnectMsg, password: &str, uuid: [u8; 16], client_name: [u8; 32], username: [u8; 16]) -> ConnectMsg { let mut hasher = Sha256::new(); hasher.update(welcome.pass_hash); hasher.update(password); let result = hasher.finalize(); return ConnectMsg{ version: welcome.version, uuid, last_notice_id: 0, username, pass_hash: result.try_into().unwrap(), pw_valid: welcome.pw_valid, state_valid: welcome.state_valid, client_name, server_time_zone: welcome.server_time_zone, }; } fn from_bytes(bytes: Vec) -> Option { if bytes.len() < CONNECT_MSG_LEN { return None; } return Some(ConnectMsg{ version: u32::from_le_bytes(bytes[0..4].try_into().unwrap()), uuid: bytes[4..20].to_owned().try_into().unwrap(), last_notice_id: u64::from_le_bytes(bytes[20..28].try_into().unwrap()), username: bytes[28..44].to_owned().try_into().unwrap(), pass_hash: bytes[44..76].to_owned().try_into().unwrap(), pw_valid: bytes[76].to_owned(), state_valid: bytes[77].to_owned(), client_name: bytes[78..110].to_owned().try_into().unwrap(), server_time_zone: i32::from_le_bytes(bytes[110..114].try_into().unwrap()), }); } fn as_bytes(self: &ConnectMsg) -> Vec { let mut bytes = Vec::new(); bytes.extend(self.version.to_le_bytes()); bytes.extend(self.uuid); bytes.extend(self.last_notice_id.to_le_bytes()); bytes.extend(self.username); bytes.extend(self.pass_hash); bytes.extend(self.pw_valid.to_le_bytes()); bytes.extend(self.state_valid.to_le_bytes()); bytes.extend(self.client_name); bytes.extend(self.server_time_zone.to_le_bytes()); return bytes; } } #[derive(Debug)] struct NoticeMsg { notice_id: u64, notice: tm::Notice, } impl NoticeMsg { fn from_bytes(bytes: Vec) -> Option { if bytes.len() < 8 { return None; } let notice_id = u64::from_le_bytes(bytes[0..8].try_into().unwrap()); match BackendMessage::from_bytes(bytes[8..].to_vec()) { Some(message) => { match message.data.notice { Some(notice) => Some(NoticeMsg{ notice_id, notice, }), None => None, } }, None => None, } } } struct TMClient { stream: openssl::ssl::SslStream, work_queue: mpsc::Sender, responses: mpsc::Sender>, requests: mpsc::Receiver>, uuid: [u8; 16], client_name: [u8; 32], password: String, last_seq_num: u64, username: [u8; 16], connected: bool, } const TCP_BUFFER_SIZE: usize = 10000; impl TMClient { fn new(uuid: [u8; 16], client_name: [u8; 32], password: String, username: [u8; 16]) -> (TMClient, TMConnection) { let (work_tx, work_rx) = mpsc::channel(); let (response_tx, response_rx) = mpsc::channel(); let (request_tx, request_rx) = mpsc::channel(); let mut builder = openssl::ssl::SslConnector::builder(openssl::ssl::SslMethod::tls()).unwrap(); builder.set_ca_file("tm.crt").unwrap(); builder.set_verify(openssl::ssl::SslVerifyMode::PEER); let connector = builder.build(); let stream = TcpStream::connect("127.0.0.1:5000").unwrap(); let mut stream_config = connector.configure().unwrap(); stream_config.set_verify_hostname(false); stream_config.set_certificate_chain_file("tm.crt").unwrap(); stream_config.set_private_key_file("tm.crt", openssl::ssl::SslFiletype::PEM).unwrap(); stream_config.set_use_server_name_indication(false); let stream = stream_config.connect("127.0.0.1", stream).unwrap(); stream.get_ref().set_read_timeout(Some(Duration::from_millis(100))).expect("Failed to set read timeout on socket"); return (TMClient{ stream, work_queue: work_tx.clone(), responses: response_tx, requests: request_rx, uuid, client_name, password, last_seq_num: 0xFFFFFFFFFFFFFFFF, username, connected: false, }, TMConnection{ work_queuer: work_tx, state_cancels: HashMap::new(), requests: request_tx, work_queue: work_rx, responses: response_rx, },); } fn process(self: &mut TMClient) { if self.connected == true { // TODO: right now it's halfway to processing multiple requests at once, but currently // it only processes a single requests/response at a time. This is fine since there's // only a single callback thread though. for request in self.requests.try_iter() { let packet = BackendPacket::new(TM_HEADER, get_float_time(), 2, self.last_seq_num + 1, request.as_bytes()); match self.stream.write(&packet.as_bytes()) { Ok(_) => { log::debug!("Sent: {:?}", packet); self.last_seq_num += 1; }, Err(error) => log::error!("Request send error: {:?}", error), } } } let mut incoming = [0; TCP_BUFFER_SIZE]; match self.stream.read(&mut incoming) { Ok(read) => { let data = incoming[0..read].to_vec(); match BackendPacket::from_bytes(data) { Some(packet) => { self.last_seq_num = packet.seq_num; match packet.msg_type { // Notice Message 4 => { match NoticeMsg::from_bytes(packet.data.clone()) { Some(notice) => { log::debug!("Received notice: {:#?}", notice); let ack = BackendPacket::new(packet.header, packet.timestamp, 5, self.last_seq_num+1, notice.notice_id.to_le_bytes().to_vec()); self.last_seq_num += 1; match self.stream.write(&ack.as_bytes()) { Ok(_) => log::debug!("Sent ACK for notice {}", notice.notice_id), Err(error) => log::error!("ACK error: {:?}", error), } match self.work_queue.send(Work::Notice(Box::new(notice.notice))) { Ok(_) => log::debug!("Forwarded notice to callback engine"), Err(error) => log::error!("Notice forward error {:?}", error), } }, None => log::error!("Notice parse error: {:?}", packet), } }, // Response message 3 => { match BackendMessage::from_bytes(packet.data.clone()) { Some(message) => { log::debug!("Received response: {:#?}", message); match self.responses.send(Box::new(message)) { Ok(_) => log::debug!("Forwarded response to callback engine"), Err(error) => log::error!("Response forward error {:?}", error), } }, None => log::error!("BackendMessage parse error: {:?}", packet), } }, // Server Message 2 => { match ConnectMsg::from_bytes(packet.data) { Some(welcome_msg) => { log::debug!("Received connect message: {:#?}", welcome_msg); if welcome_msg.pw_valid == 0 { let connect_response = ConnectMsg::from_welcome(welcome_msg, &self.password, self.uuid, self.client_name, self.username); let response = BackendPacket::new(packet.header, packet.timestamp, packet.msg_type, self.last_seq_num+1, connect_response.as_bytes()); match self.stream.write(&response.as_bytes()) { Err(error) => log::error!("Send error: {:?}", error), Ok(_) => self.last_seq_num += 1, } } else if welcome_msg.state_valid == 0 { log::error!("pw_valid but not state_valid"); } else { self.connected = true; log::info!("Connected to TM backend!"); } }, None => log::error!("Failed to parse welcome msg"), } }, _ => log::warn!("Unhandled message type: {}", packet.msg_type), } }, None => { log::error!("Failed to parse BackendPacket({}): {}", read, String::from_utf8_lossy(&incoming)); // Sleep to prevent busy loop when TM is spamming 0 length packets thread::sleep(Duration::from_millis(100)); } } }, Err(_) => {}, } } } struct StateChange { next_state: MatchState, tuple: MatchTuple, field: FieldTuple, } #[derive(Serialize, Deserialize, Debug, Clone, Copy, Eq, PartialEq, Hash)] struct FieldTuple { set: i32, id: i32, } impl FieldTuple { fn topic(self: &FieldTuple, suffix: &str) -> String { format!("field/{}/{}{}", &self.set, &self.id, suffix) } } struct TMConnection { work_queuer: mpsc::Sender, work_queue: mpsc::Receiver, responses: mpsc::Receiver>, requests: mpsc::Sender>, state_cancels: HashMap>, } impl TMConnection { fn request(self: &TMConnection, request_id: u32, data: tm::BackendMessageData) -> BackendMessage { self.requests.send(Box::new(BackendMessage::new(request_id, data))).unwrap(); return *self.responses.recv().unwrap(); } fn queue_state_change(self: &mut TMConnection, wait: Duration, state_change: StateChange) -> mpsc::Sender<()> { let work_queuer = self.work_queuer.clone(); let (cancel_tx, cancel_rx) = mpsc::channel(); thread::spawn(move || { match cancel_rx.recv_timeout(wait) { Ok(_) => log::debug!("state change cancelled"), Err(_) => match work_queuer.send(Work::State(state_change)) { Ok(_) => {}, Err(error) => log::error!("State change send error: {:?}", error), }, } }); return cancel_tx; } } type NoticeCallback = fn(tm::Notice, &mut Event, &mut TMConnection) -> Vec; fn get_affected_match(notice: &tm::Notice) -> Option { match ¬ice.affected_match { None => None, Some(affected_match) => { Some(MatchTuple{ division: affected_match.division(), round: int_to_round(affected_match.round() as i32), instance: affected_match.instance(), match_num: affected_match.r#match(), session: affected_match.session(), }) }, } } fn get_field_tuple(field: &Option) -> Option { match field { None => None, Some(field) => { Some(FieldTuple{ set: field.field_set_id(), id: field.id(), }) }, } } fn tm_tuple_to_struct(tuple: tm::MatchTuple) -> MatchTuple { return MatchTuple{ division: tuple.division(), round: int_to_round(tuple.round() as i32), instance: tuple.instance(), match_num: tuple.r#match(), session: tuple.session(), }; } fn struct_tuple_to_tm(tuple: MatchTuple) -> tm::MatchTuple { let mut out = tm::MatchTuple::default(); out.division = Some(tuple.division); out.session = Some(tuple.session); out.round = Some(tuple.round as i32); out.r#match = Some(tuple.match_num); out.instance = Some(tuple.instance); return out; } fn get_match_score(connection: &TMConnection, filter: tm::MatchTuple) -> Option { let mut req = tm::BackendMessageData::default(); req.match_tuple = Some(filter); let resp = connection.request(1000, req); match resp.data.match_score { None => None, Some(scores) => get_game_score(&scores), } } fn get_game_score(scores: &tm::MatchScore) -> Option { if scores.alliances.len() != 2 { return None; } let ref red_score = scores.alliances[0]; let ref blue_score = scores.alliances[1]; let mut out = MatchScore{ autonomous_winner: None, red_total: 0, blue_total: 0, blue_score: AllianceScore{ auton_wp: false, team_goal: 0, team_zone: 0, green_goal: 0, green_zone: 0, elevation_tiers: [0, 0], }, red_score : AllianceScore{ auton_wp: false, team_goal: 0, team_zone: 0, green_goal: 0, green_zone: 0, elevation_tiers: [0, 0], }, }; for symbol in red_score.score_types.iter() { match symbol.name.as_str() { "auto" => if symbol.val != 0 {out.autonomous_winner = Some(GameSide::Red)}, "auto_tie" => if symbol.val != 0 {out.autonomous_winner = Some(GameSide::Tie)}, "auto_wp" => out.red_score.auton_wp = symbol.val != 0, "zone_alliance_triballs" => out.red_score.team_zone = symbol.val, "goal_alliance_triballs" => out.red_score.team_goal = symbol.val, "goal_triballs" => out.red_score.green_goal = symbol.val, "zone_triballs" => out.red_score.green_zone = symbol.val, "elevation_tier_1" => out.red_score.elevation_tiers[0] = symbol.val, "elevation_tier_2" => out.red_score.elevation_tiers[1] = symbol.val, _ => {}, } } for symbol in blue_score.score_types.iter() { match symbol.name.as_str() { "auto" => if symbol.val != 0 {out.autonomous_winner = Some(GameSide::Blue)}, "auto_tie" => if symbol.val != 0 {out.autonomous_winner = Some(GameSide::Tie)}, "auto_wp" => out.blue_score.auton_wp = symbol.val != 0, "zone_alliance_triballs" => out.blue_score.team_zone = symbol.val, "goal_alliance_triballs" => out.blue_score.team_goal = symbol.val, "goal_triballs" => out.blue_score.green_goal = symbol.val, "zone_triballs" => out.blue_score.green_zone = symbol.val, "elevation_tier_1" => out.blue_score.elevation_tiers[0] = symbol.val, "elevation_tier_2" => out.blue_score.elevation_tiers[1] = symbol.val, _ => {}, } } match &out.autonomous_winner { None => {}, Some(winner) => match winner { GameSide::Red => {out.red_total = 8; out.blue_total = 0;}, GameSide::Blue => {out.red_total = 0; out.blue_total = 8;}, GameSide::Tie => {out.red_total = 4; out.blue_total = 4;}, }, } out.red_total += 5 * (out.red_score.green_goal + out.red_score.team_goal); out.red_total += 2 * (out.red_score.green_zone + out.red_score.team_zone); out.blue_total += 5 * (out.blue_score.green_goal + out.blue_score.team_goal); out.blue_total += 2 * (out.blue_score.green_zone + out.blue_score.team_zone); let mut elevations = [(0, out.red_score.elevation_tiers[0]), (1, out.red_score.elevation_tiers[1]), (2, out.blue_score.elevation_tiers[0]), (3, out.blue_score.elevation_tiers[1])]; elevations.sort_by(|a, b| b.1.cmp(&a.1)); let mut elev_list = Vec::new(); let mut elev_map: HashMap> = HashMap::new(); for elevation in elevations { match elev_list.last() { None => elev_list.push(elevation.1), Some(last) => if *last != elevation.1 {elev_list.push(elevation.1)}, } match elev_map.get_mut(&elevation.1) { None => { let mut holders = Vec::new(); holders.push(elevation.0); elev_map.insert(elevation.1, holders); }, Some(cur) => cur.push(elevation.0), } } for (idx, elevation) in elev_list.iter().enumerate() { if *elevation == 0i32 { break; } match elev_map.get(&elevation) { None => {}, Some(holders) => { for holder in holders { match holder { 0 => out.red_total += 20 - (5*idx as i32), 1 => out.red_total += 20 - (5*idx as i32), 2 => out.blue_total += 20 - (5*idx as i32), 3 => out.blue_total += 20 - (5*idx as i32), _ => {}, } } } } } return Some(out); } fn on_score_set(notice: tm::Notice, event: &mut Event, _connection: &mut TMConnection) -> Vec { let Some(tuple) = get_affected_match(¬ice) else { return Vec::new() }; let Some(scores) = notice.match_score else { return Vec::new() }; let Some(score) = get_game_score(&scores) else { return Vec::new() }; let Some(division) = &mut event.divisions.get_mut(&tuple.division) else { return Vec::new() }; let Some(m) = &mut division.matches.iter_mut().find(|a| a.info.tuple == tuple) else { return Vec::new() }; m.score = Some(score.clone()); m.state = MatchState{ state: GameState::Scored, start: get_float_time(), }; let score_serialized = serde_json::to_string_pretty(&m.score).unwrap(); let state_serialized = serde_json::to_string_pretty(&m.state).unwrap(); let mut out = Vec::new(); out.push(MQTTMessage{ topic: tuple.topic("/score"), payload: score_serialized.clone(), }); out.push(MQTTMessage{ topic: tuple.topic("/state"), payload: state_serialized.clone(), }); for (_, field_set) in &event.field_sets { for (_, field) in &field_set.fields { match field.last_known_match { None => {}, Some(last_known_match) => { if last_known_match == tuple { out.push(MQTTMessage{ topic: field.tuple.topic("/score"), payload: score_serialized.clone(), }); out.push(MQTTMessage{ topic: field.tuple.topic("/state"), payload: state_serialized.clone(), }); } }, } } } return out; } fn on_score_change(notice: tm::Notice, event: &mut Event, _connection: &mut TMConnection) -> Vec { let Some(tuple) = get_affected_match(¬ice) else { return Vec::new() }; let Some(scores) = notice.match_score else { return Vec::new() }; let Some(score) = get_game_score(&scores) else { return Vec::new() }; let Some(division) = &mut event.divisions.get_mut(&tuple.division) else { return Vec::new() }; let Some(m) = &mut division.matches.iter_mut().find(|a| a.info.tuple == tuple) else { return Vec::new() }; m.score = Some(score.clone()); let serialized = serde_json::to_string_pretty(&score).unwrap(); let mut out = Vec::new(); out.push(MQTTMessage{ topic: tuple.topic("/score"), payload: serialized.clone(), }); for (_, field_set) in &event.field_sets { for (_, field) in &field_set.fields { match field.last_known_match { None => {}, Some(last_known_match) => { if last_known_match == tuple { out.push(MQTTMessage{ topic: field.tuple.topic("/score"), payload: serialized.clone(), }); } }, } } } return out; } fn on_match_list_update(_notice: tm::Notice, event: &mut Event, connection: &mut TMConnection) -> Vec { let mut messages = Vec::new(); let resp = connection.request(1004, tm::BackendMessageData::default()); match resp.data.match_schedule { None => {}, Some(schedule) => { event.parse_match_schedule(schedule); for (division_id, division) in &event.divisions { messages.push(MQTTMessage{ topic: format!("division/{}/schedule", division_id), payload: serde_json::to_string_pretty(&division.matches).unwrap(), }); for m in &division.matches { let serialized = serde_json::to_string_pretty(&m.info).unwrap(); messages.push(MQTTMessage{ topic: m.info.tuple.topic(""), payload: serialized }); let serialized_state = serde_json::to_string_pretty(&m.state).unwrap(); messages.push(MQTTMessage{ topic: m.info.tuple.topic("/state"), payload: serialized_state, }); } } }, } for (_, field_set) in &event.field_sets { for (_, field) in &field_set.fields { match field.last_known_match { None => {}, Some(tuple) => { let Some(m) = get_match(&mut event.divisions, tuple) else {continue;}; let serialized_state = serde_json::to_string_pretty(&m.state).unwrap(); let serialized = serde_json::to_string_pretty(&m.info).unwrap(); messages.push(MQTTMessage{ topic: field.tuple.topic(""), payload: serialized, }); messages.push(MQTTMessage{ topic: field.tuple.topic("/state"), payload: serialized_state, }); }, } } } return messages; } fn get_float_time() -> f64 { let time = SystemTime::now(); let millis = time.duration_since(UNIX_EPOCH).unwrap(); return (millis.as_millis() as f64)/1000.0; } fn on_field_assigned(notice: tm::Notice, event: &mut Event, _connection: &mut TMConnection) -> Vec { let Some(field_info) = get_field_tuple(¬ice.field) else { return Vec::new() }; let Some(tuple) = get_affected_match(¬ice) else { return Vec::new() }; let Some(field) = get_field(&mut event.field_sets, field_info) else { return Vec::new() }; let Some(m) = get_match(&mut event.divisions, tuple) else { return Vec::new() }; m.state = MatchState{ state: GameState::Scheduled, start: get_float_time(), }; field.last_known_match = Some(tuple); let mut messages = Vec::new(); let serialized = serde_json::to_string_pretty(&m.state).unwrap(); let serialized_info = serde_json::to_string_pretty(&m.info).unwrap(); messages.push(MQTTMessage{ topic: field_info.topic("/state"), payload: serialized.clone(), }); messages.push(MQTTMessage{ topic: field_info.topic(""), payload: serialized_info, }); messages.push(MQTTMessage{ topic: tuple.topic("/state"), payload: serialized, }); if let Some(score) = &m.score { let serialized = serde_json::to_string_pretty(&score).unwrap(); messages.push(MQTTMessage{ topic: tuple.topic("/score"), payload: serialized, }); } return messages; } fn on_timer_stop(notice: tm::Notice, event: &mut Event, _connection: &mut TMConnection) -> Vec { let mut messages = Vec::new(); let Some(field_time) = ¬ice.field_time else { return Vec::new() }; let Some(field_info) = get_field_tuple(&field_time.field) else { return Vec::new() }; let Some(field) = get_field(&mut event.field_sets, field_info) else { return Vec::new() }; let Some(current_match) = field.last_known_match else { return Vec::new() }; let Some(m) = get_match(&mut event.divisions, current_match) else { return Vec::new() }; m.state = MatchState{ state: GameState::Stopped, start: get_float_time(), }; let match_state_topic = current_match.topic("/state"); let field_state_topic = field_info.topic("/state"); let serialized = serde_json::to_string_pretty(&m.state).unwrap(); messages.push(MQTTMessage{ topic: match_state_topic, payload: serialized.clone(), }); messages.push(MQTTMessage{ topic: field_state_topic, payload: serialized, }); return messages; } fn on_timer_start(notice: tm::Notice, event: &mut Event, connection: &mut TMConnection) -> Vec { let Some(field_time) = ¬ice.field_time else { return Vec::new() }; let Some(field_info) = get_field_tuple(&field_time.field) else { return Vec::new() }; let Some(field) = get_field(&mut event.field_sets, field_info) else { return Vec::new() }; let Some(tuple) = field.last_known_match else { return Vec::new() }; let Some(m) = get_match(&mut event.divisions, tuple) else { return Vec::new() }; let Some(block_list) = &field_time.block_list else { return Vec::new() }; let Some(current_block_idx) = &field_time.current_block else { return Vec::new() }; let Some(current_block_start) = &field_time.current_block_start else { return Vec::new() }; let Some(current_block_end) = &field_time.current_block_end else { return Vec::new() }; let current_block = &block_list.entries[*current_block_idx as usize]; let mut messages = Vec::new(); if current_block.r#type == Some(2) { //Auto m.state = MatchState{ state: GameState::Autonomous, start: *current_block_start, }; let field_state_topic = field_info.topic("/state"); let match_state_topic = tuple.topic("/state"); let payload = serde_json::to_string_pretty(&m.state).unwrap(); messages.push(MQTTMessage{ topic: field_state_topic, payload: payload.clone(), }); messages.push(MQTTMessage{ topic: match_state_topic, payload, }); let field_tuple = FieldTuple{ set: field_info.set, id: field_info.id, }; let cancel_state = connection.queue_state_change(Duration::from_secs(current_block.seconds() as u64), StateChange{ next_state: MatchState{ state: GameState::AutonomousDone, start: *current_block_end, }, tuple, field: field_tuple.clone(), }); connection.state_cancels.insert(field_tuple, cancel_state); } else if current_block.r#type == Some(3) { //Driver m.state = MatchState{ state: GameState::Driver, start: *current_block_start, }; let field_state_topic = field_info.topic("/state"); let match_state_topic = tuple.topic("/state"); let payload = serde_json::to_string_pretty(&m.state).unwrap(); messages.push(MQTTMessage{ topic: field_state_topic, payload: payload.clone(), }); messages.push(MQTTMessage{ topic: match_state_topic, payload, }); let field_tuple = FieldTuple{ set: field_info.set, id: field_info.id, }; let cancel_state = connection.queue_state_change(Duration::from_secs(current_block.seconds() as u64), StateChange{ next_state: MatchState{ state: GameState::DriverDone, start: *current_block_end, }, tuple, field: field_tuple.clone(), }); connection.state_cancels.insert(field_tuple, cancel_state); } return messages; } enum Work { Notice(Box), State(StateChange), } fn main() { env_logger::init(); let mut callbacks: HashMap = HashMap::new(); callbacks.insert(tm::NoticeId::NoticeRealtimeScoreChanged, on_score_change); callbacks.insert(tm::NoticeId::NoticeMatchScoreUpdated, on_score_set); callbacks.insert(tm::NoticeId::NoticeMatchListUpdated, on_match_list_update); callbacks.insert(tm::NoticeId::NoticeFieldTimerStarted, on_timer_start); callbacks.insert(tm::NoticeId::NoticeFieldMatchAssigned, on_field_assigned); callbacks.insert(tm::NoticeId::NoticeFieldTimerStopped, on_timer_stop); let mut mqttoptions = MqttOptions::new("vex-bridge", "localhost", 1883); mqttoptions.set_keep_alive(Duration::from_secs(5)); mqttoptions.set_last_will(LastWill{ topic: String::from("bridge/status"), message: Bytes::from("{\"online\": false}"), qos: QoS::AtLeastOnce, retain: true, }); let (mut client, mut connection) = Client::new(mqttoptions, 10); client.subscribe("bridge", QoS::AtLeastOnce).unwrap(); client.publish("bridge/status", QoS::AtLeastOnce, true, "{\"online\": true}").unwrap(); let mqtt_recv_thread = thread::spawn(move || for _ in connection.iter() { } ); let running = true; let mut uuid = [0u8; 16]; rand::thread_rng().fill_bytes(&mut uuid); let mut client_name = [0u8;32]; rand::thread_rng().fill_bytes(&mut client_name); let username: [u8;16] = [0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0]; let (mut tm_client, mut tm_connection) = TMClient::new(uuid, client_name, String::from(""), username); let tm_thread = thread::spawn(move || while running { tm_client.process(); } ); let mut event = Event::new(); // Get the division list let division_list_resp = tm_connection.request(200, tm::BackendMessageData::default()); event.parse_division_list(division_list_resp.data.division_list.unwrap()); // Get the field list let field_set_resp = tm_connection.request(300, tm::BackendMessageData::default()); event.parse_field_sets(field_set_resp.data.field_set_list.unwrap()); // Get the match list let match_schedule_resp = tm_connection.request(1004, tm::BackendMessageData::default()); event.parse_match_schedule(match_schedule_resp.data.match_schedule.unwrap()); // For each match, get the score and make the initial publish for (_, division) in &mut event.divisions { for m in &mut division.matches { let serialized = serde_json::to_string_pretty(&m.info).unwrap(); client.publish(m.info.tuple.topic(""), QoS::AtLeastOnce, true, serialized).unwrap(); m.score = get_match_score(&tm_connection, struct_tuple_to_tm(m.info.tuple)); let state_serialized = serde_json::to_string_pretty(&m.state).unwrap(); client.publish(m.info.tuple.topic("/state"), QoS::AtLeastOnce, true, state_serialized).unwrap(); if let Some(score) = &m.score { let serialized_score = serde_json::to_string_pretty(score).unwrap(); client.publish(m.info.tuple.topic("/score"), QoS::AtLeastOnce, true, serialized_score).unwrap(); } } } // For each field set, get the active match and assign it to the scheduled field for (field_set_id, field_set) in &mut event.field_sets { let mut field_req = tm::BackendMessageData::default(); let mut field_data = tm::OnFieldMatch::default(); let mut f = tm::Field::default(); f.field_set_id = Some(*field_set_id); field_data.field = Some(f); field_req.on_field_match = Some(field_data); let field_resp = tm_connection.request(309, field_req); match field_resp.data.on_field_match { None => {}, Some(ofm) => match ofm.match_tuple { None => {}, Some(match_tuple) => { let tuple = tm_tuple_to_struct(match_tuple); match get_match(&mut event.divisions, tuple) { None => {}, Some(m) => match field_set.fields.get_mut(&m.info.field.id) { None => {}, Some(field) => { field.last_known_match = Some(tuple); let serialized = serde_json::to_string_pretty(&m.info).unwrap(); client.publish(field.tuple.topic(""), QoS::AtLeastOnce, true, serialized).unwrap(); let serialized_score = serde_json::to_string_pretty(&m.score).unwrap(); client.publish(field.tuple.topic("/score"), QoS::AtLeastOnce, true, serialized_score).unwrap(); let serialized_state = serde_json::to_string_pretty(&m.state).unwrap(); client.publish(field.tuple.topic("/state"), QoS::AtLeastOnce, true, serialized_state).unwrap(); } }, } }, }, } } log::info!("EVENT: {:#?}", &event); // Callback loop while running { match tm_connection.work_queue.recv() { Ok(work) => match work { Work::Notice(notice) => { let callback = callbacks.get(¬ice.id()); match callback { None => { match notice.id { None => log::error!("Notice without NoticeId received"), Some(notice_id) => log::warn!("Unhandled NoticeId: {}", notice_id), } }, Some(callback) => { let messages = callback(*notice, &mut event, &mut tm_connection); for message in messages { let result = client.publish(message.topic, QoS::AtLeastOnce, true, message.payload); match result { Ok(_) => {}, Err(error) => log::error!("Publish error: {}", error), } } }, } }, Work::State(state_change) => { match get_match(&mut event.divisions, state_change.tuple) { None => log::warn!("Received state change for unknown match {:#?}", state_change.tuple), Some(m) => { m.state = state_change.next_state.clone(); let state_serialized = serde_json::to_vec_pretty(&state_change.next_state).unwrap(); client.publish(state_change.field.topic("/state"), QoS::AtLeastOnce, true, state_serialized.clone()).expect("Failed MQTT publish"); client.publish(m.info.tuple.topic("/state"), QoS::AtLeastOnce, true, state_serialized).expect("Failed MQTT publish"); }, } }, }, Err(error) => log::error!("Notice recv error: {}", error), } } mqtt_recv_thread.join().expect("Failed to join mqtt thread"); tm_thread.join().expect("Failed to join tm connection thread"); }