From ea21d4728a1e8381a8a7ca124dd7f5384699316e Mon Sep 17 00:00:00 2001 From: Noah Metz Date: Sun, 21 Jan 2024 20:10:23 -0700 Subject: [PATCH] Changed notice queue to work_queue so that multiple threads can pass the main thread work to do on the event/connection --- src/main.rs | 222 +++++++++++++++++++++++++++++++--------------------- 1 file changed, 131 insertions(+), 91 deletions(-) diff --git a/src/main.rs b/src/main.rs index 3e831cb..48a53f0 100644 --- a/src/main.rs +++ b/src/main.rs @@ -59,13 +59,13 @@ enum GameState { Scheduled, Timeout, Driver, - Driverdone, + DriverDone, Autonomous, AutonomousDone, Abandoned, } -#[derive(Serialize, Deserialize, Debug, Clone, Copy)] +#[derive(Serialize, Deserialize, Debug, Clone, Copy, Eq, PartialEq)] enum Round { None = 0, Practice = 1, @@ -102,7 +102,7 @@ fn int_to_round(round: i32) -> Round { } } -#[derive(Serialize, Deserialize, Debug, Clone, Copy)] +#[derive(Serialize, Deserialize, Debug, Clone, Copy, Eq, PartialEq)] struct MatchTuple { division: i32, round: Round, @@ -140,6 +140,20 @@ impl Event { } } + fn get_match(self: &mut Event, tuple: MatchTuple) -> Option<&mut Match> { + match self.divisions.get_mut(&tuple.division) { + None => {}, + Some(division) => { + for m in &mut division.matches { + if m.tuple == tuple { + return Some(m); + } + } + }, + }; + None + } + // TODO: remove extra entries instead of just adding new ones fn parse_field_sets(self: &mut Event, sets: tm::FieldSetList) { for set in sets.field_sets { @@ -170,19 +184,19 @@ impl Event { fn parse_match_list(self: &mut Event, match_list: tm::MatchList) { let mut matches: HashMap> = HashMap::new(); for m in match_list.matches.iter() { - let match_tuple = MatchTuple{ + let tuple = MatchTuple{ division: m.division.unwrap(), round: int_to_round(m.round.unwrap()), instance: m.instance.unwrap(), match_num: m.r#match.unwrap(), session: m.session.unwrap(), }; - match matches.get_mut(&match_tuple.division) { + match matches.get_mut(&tuple.division) { Some(match_list) => { match_list.push(Match{ state: None, info: None, - match_tuple: match_tuple.clone(), + tuple: tuple.clone(), }) }, None => { @@ -190,9 +204,9 @@ impl Event { new_match_list.push(Match{ state: None, info: None, - match_tuple: match_tuple.clone(), + tuple: tuple.clone(), }); - matches.insert(match_tuple.division, new_match_list); + matches.insert(tuple.division, new_match_list); }, } } @@ -247,7 +261,7 @@ struct MatchInfo { struct Match { state: Option, info: Option, - match_tuple: MatchTuple, + tuple: MatchTuple, } #[derive(Debug)] @@ -447,7 +461,7 @@ impl NoticeMsg { struct TMClient { stream: openssl::ssl::SslStream, - notices: mpsc::Sender>, + work_queue: mpsc::Sender, responses: mpsc::Sender>, requests: mpsc::Receiver>, uuid: [u8; 16], @@ -461,7 +475,7 @@ struct TMClient { const TCP_BUFFER_SIZE: usize = 10000; impl TMClient { fn new(uuid: [u8; 16], client_name: [u8; 32], password: String, username: [u8; 16]) -> (TMClient, TMConnection) { - let (notice_tx, notice_rx) = mpsc::channel(); + let (work_tx, work_rx) = mpsc::channel(); let (response_tx, response_rx) = mpsc::channel(); let (request_tx, request_rx) = mpsc::channel(); @@ -482,9 +496,10 @@ impl TMClient { let stream = stream_config.connect("127.0.0.1", stream).unwrap(); stream.get_ref().set_read_timeout(Some(Duration::from_millis(100))).expect("Failed to set read timeout on socket"); + return (TMClient{ stream, - notices: notice_tx, + work_queue: work_tx.clone(), responses: response_tx, requests: request_rx, uuid, @@ -495,8 +510,10 @@ impl TMClient { connected: false, }, TMConnection{ + work_queuer: work_tx, + state_cancels: HashMap::new(), requests: request_tx, - notices: notice_rx, + work_queue: work_rx, responses: response_rx, },); } @@ -533,14 +550,14 @@ impl TMClient { 4 => { match NoticeMsg::from_bytes(packet.data.clone()) { Some(notice) => { - log::debug!("Received notice: {:?}", notice); + log::debug!("Received notice: {:#?}", notice); let ack = BackendPacket::new(packet.header, packet.timestamp, 5, self.last_seq_num+1, notice.notice_id.to_le_bytes().to_vec()); self.last_seq_num += 1; match self.stream.write(&ack.as_bytes()) { Ok(_) => log::debug!("Sent ACK for notice {}", notice.notice_id), Err(error) => log::error!("ACK error: {:?}", error), } - match self.notices.send(Box::new(notice.notice)) { + match self.work_queue.send(Work::Notice(Box::new(notice.notice))) { Ok(_) => log::debug!("Forwarded notice to callback engine"), Err(error) => log::error!("Notice forward error {:?}", error), } @@ -597,10 +614,23 @@ impl TMClient { } } +struct StateChange { + next_state: MatchState, + tuple: MatchTuple, + field: FieldTuple, +} + +struct FieldTuple { + set: i32, + id: i32, +} + struct TMConnection { - notices: mpsc::Receiver>, + work_queuer: mpsc::Sender, + work_queue: mpsc::Receiver, responses: mpsc::Receiver>, requests: mpsc::Sender>, + state_cancels: HashMap>, } impl TMConnection { @@ -608,9 +638,24 @@ impl TMConnection { self.requests.send(Box::new(BackendMessage::new(request_id, data))).unwrap(); return *self.responses.recv().unwrap(); } + + fn queue_state_change(self: &mut TMConnection, wait: Duration, state_change: StateChange) -> mpsc::Sender<()> { + let work_queuer = self.work_queuer.clone(); + let (cancel_tx, cancel_rx) = mpsc::channel(); + thread::spawn(move || { + match cancel_rx.recv_timeout(wait) { + Ok(_) => match work_queuer.send(Work::State(state_change)) { + Ok(_) => {}, + Err(error) => log::error!("State change send error: {:?}", error), + }, + Err(error) => log::error!("state change queue error: {:?}", error), + } + }); + return cancel_tx; + } } -type NoticeCallback = fn(tm::Notice, &mut Event, &TMConnection) -> Vec; +type NoticeCallback = fn(tm::Notice, &mut Event, &mut TMConnection) -> Vec; fn get_affected_match(notice: &tm::Notice) -> Option { match ¬ice.affected_match { @@ -751,12 +796,10 @@ fn get_game_score(notice: &tm::Notice) -> Option { } } -fn on_score_change(notice: tm::Notice, event: &mut Event, _connection: &TMConnection) -> Vec { +fn on_score_change(notice: tm::Notice, event: &mut Event, _connection: &mut TMConnection) -> Vec { match get_affected_match(¬ice) { None => Vec::new(), Some(tuple) => { - // Use `event` to figure out which arena topic to publish to - // Also add the match score topic based on the tuple match get_game_score(¬ice) { None => Vec::new(), Some(score) => { @@ -765,8 +808,24 @@ fn on_score_change(notice: tm::Notice, event: &mut Event, _connection: &TMConnec let mut out = Vec::new(); out.push(MQTTMessage{ topic: game_topic, - payload: serialized, + payload: serialized.clone(), }); + for (_, field_set) in &event.field_sets { + for field in &field_set.fields { + match field.last_known_match { + None => {}, + Some(last_known_match) => { + if last_known_match == tuple { + let field_topic = format!("field/{}/score", field.id); + out.push(MQTTMessage{ + topic: field_topic, + payload: serialized.clone(), + }); + } + }, + } + } + } return out; }, } @@ -774,43 +833,7 @@ fn on_score_change(notice: tm::Notice, event: &mut Event, _connection: &TMConnec } } -fn on_match_start(_notice: tm::Notice, event: &mut Event, _connection: &TMConnection) -> Vec { - return Vec::new(); -} - -fn on_match_cancel(_notice: tm::Notice, event: &mut Event, _connection: &TMConnection) -> Vec { - return Vec::new(); -} - -fn on_match_reset(_notice: tm::Notice, event: &mut Event, _connection: &TMConnection) -> Vec { - return Vec::new(); -} - -fn on_match_assigned(_notice: tm::Notice, event: &mut Event, _connection: &TMConnection) -> Vec { - return Vec::new(); -} - -fn on_active_field_changed(_notice: tm::Notice, event: &mut Event, _connection: &TMConnection) -> Vec { - return Vec::new(); -} - -fn on_rankings_updated(_notice: tm::Notice, event: &mut Event, _connection: &TMConnection) -> Vec { - return Vec::new(); -} - -fn on_event_status_updated(_notice: tm::Notice, event: &mut Event, _connection: &TMConnection) -> Vec { - return Vec::new(); -} - -fn on_elim_alliance_update(_notice: tm::Notice, event: &mut Event, _connection: &TMConnection) -> Vec { - return Vec::new(); -} - -fn on_elim_unavail_teams_update(_notice: tm::Notice, event: &mut Event, _connection: &TMConnection) -> Vec { - return Vec::new(); -} - -fn on_match_list_update(_notice: tm::Notice, event: &mut Event, connection: &TMConnection) -> Vec { +fn on_match_list_update(_notice: tm::Notice, event: &mut Event, connection: &mut TMConnection) -> Vec { let mut messages = Vec::new(); let match_list_resp = connection.request(1002, tm::BackendMessageData::default()); match match_list_resp.data.match_list { @@ -828,22 +851,30 @@ fn on_match_list_update(_notice: tm::Notice, event: &mut Event, connection: &TMC return messages; } +fn on_timer_start(notice: tm::Notice, _event: &mut Event, _connection: &mut TMConnection) -> Vec { + println!("State: {:#?}", notice.field_time.unwrap()); + // 1) Find the state associated with the current block(driver or auton) + // 2) get the match tuple from the arena + // 3) add the mqtt messages for match & arena states + // 4) queue the state change to ${state}_done + // 5) add the cancel_send to the tmconnections map of timeout_sends + return Vec::new(); +} + +enum Work { + Exit, + Notice(Box), + State(StateChange), +} + fn main() { env_logger::init(); let mut callbacks: HashMap = HashMap::new(); callbacks.insert(tm::NoticeId::NoticeRealtimeScoreChanged, on_score_change); callbacks.insert(tm::NoticeId::NoticeMatchScoreUpdated, on_score_change); - callbacks.insert(tm::NoticeId::NoticeFieldTimerStarted, on_match_start); - callbacks.insert(tm::NoticeId::NoticeFieldTimerStopped, on_match_cancel); - callbacks.insert(tm::NoticeId::NoticeFieldResetTimer, on_match_reset); - callbacks.insert(tm::NoticeId::NoticeFieldMatchAssigned, on_match_assigned); - callbacks.insert(tm::NoticeId::NoticeActiveFieldChanged, on_active_field_changed); - callbacks.insert(tm::NoticeId::NoticeRankingsUpdated, on_rankings_updated); - callbacks.insert(tm::NoticeId::NoticeEventStatusUpdated, on_event_status_updated); - callbacks.insert(tm::NoticeId::NoticeElimAllianceUpdated, on_elim_alliance_update); - callbacks.insert(tm::NoticeId::NoticeElimUnavailTeamsUpdated, on_elim_unavail_teams_update); callbacks.insert(tm::NoticeId::NoticeMatchListUpdated, on_match_list_update); + callbacks.insert(tm::NoticeId::NoticeFieldTimerStarted, on_timer_start); let mut mqttoptions = MqttOptions::new("vex-bridge", "localhost", 1883); mqttoptions.set_keep_alive(Duration::from_secs(5)); @@ -863,13 +894,13 @@ fn main() { } ); - let running = true; + let mut running = true; let mut uuid = [0u8; 16]; rand::thread_rng().fill_bytes(&mut uuid); let mut client_name = [0u8;32]; rand::thread_rng().fill_bytes(&mut client_name); let username: [u8;16] = [0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0]; - let (mut tm_client, tm_connection) = TMClient::new(uuid, client_name, String::from(""), username); + let (mut tm_client, mut tm_connection) = TMClient::new(uuid, client_name, String::from(""), username); let tm_thread = thread::spawn(move || while running { tm_client.process(); @@ -897,33 +928,42 @@ fn main() { field_req.on_field_match = Some(field_data); let field_resp = tm_connection.request(309, field_req); - println!("Field {}/{}: {:#?}", field_set_id, field.id, field_resp); } } while running { - thread::sleep(Duration::from_millis(1000)); - match tm_connection.notices.recv() { - Ok(notice) => { - let callback = callbacks.get(¬ice.id()); - match callback { - None => { - match notice.id { - None => log::error!("Notice without NoticeId received"), - Some(notice_id) => log::warn!("Unhandled NoticeId: {}", notice_id), - } - }, - Some(callback) => { - let messages = callback(*notice, &mut event, &tm_connection); - for message in messages { - let result = client.publish(message.topic, QoS::AtMostOnce, true, message.payload); - match result { - Ok(_) => {}, - Err(error) => log::error!("Publish error: {}", error), + match tm_connection.work_queue.recv() { + Ok(work) => match work { + Work::Exit => running = false, + Work::Notice(notice) => { + let callback = callbacks.get(¬ice.id()); + match callback { + None => { + match notice.id { + None => log::error!("Notice without NoticeId received"), + Some(notice_id) => log::warn!("Unhandled NoticeId: {}", notice_id), } - } - }, - } + }, + Some(callback) => { + let messages = callback(*notice, &mut event, &mut tm_connection); + for message in messages { + let result = client.publish(message.topic, QoS::AtMostOnce, true, message.payload); + match result { + Ok(_) => {}, + Err(error) => log::error!("Publish error: {}", error), + } + } + }, + } + }, + Work::State(state_change) => { + match event.get_match(state_change.tuple) { + None => log::warn!("Received state change for unknown match {:#?}", state_change.tuple), + Some(m) => { + m.state = Some(state_change.next_state); + }, + } + }, }, Err(error) => log::error!("Notice recv error: {}", error), }