diff --git a/src/main.rs b/src/main.rs index 158d18f..94fe727 100644 --- a/src/main.rs +++ b/src/main.rs @@ -206,7 +206,7 @@ impl Event { } } - fn parse_match_schedule(self: &mut Event, schedule: tm::MatchSchedule) { + fn parse_match_schedule(self: &mut Event, schedule: tm::MatchSchedule, offset: f64) { let mut matches: HashMap> = HashMap::new(); for m in schedule.matches.iter() { let tuple = tm_tuple_to_struct(m.match_tuple.clone().unwrap()); @@ -223,20 +223,20 @@ impl Event { match state { 0 => match_state = MatchState{ state: GameState::Scheduled, - start: scheduled as f64 / 1000.0, + start: (scheduled as f64 / 1000.0) - offset, }, 3 => { // If the match is active, use it's current state // since I can't find a way to request the auton/driver state match self.divisions.get(&tuple.division) { None => match_state = MatchState{ state: GameState::Scheduled, - start: started as f64 / 1000.0, + start: (started as f64 / 1000.0) - offset, }, Some(division) => { match division.matches.iter().find(|a| a.info.tuple == tuple) { None => match_state = MatchState{ state: GameState::Scheduled, - start: started as f64 / 1000.0, + start: (started as f64 / 1000.0) - offset, }, Some(event_m) => match_state = event_m.state.clone(), } @@ -245,15 +245,15 @@ impl Event { }, 4 => match_state = MatchState{ state: GameState::Scored, - start: resumed as f64 / 1000.0, + start: (resumed as f64 / 1000.0) - offset, }, 5 => match_state = MatchState{ state: GameState::Scored, - start: resumed as f64 / 1000.0, + start: (resumed as f64 / 1000.0) - offset, }, _ => match_state = MatchState{ state: GameState::Scheduled, - start: started as f64 / 1000.0, + start: (started as f64 / 1000.0) - offset, }, } match matches.get_mut(&tuple.division) { @@ -538,7 +538,7 @@ impl NoticeMsg { struct TMClient { stream: openssl::ssl::SslStream, work_queue: mpsc::Sender, - responses: mpsc::Sender>, + responses: mpsc::Sender<(Box, f64)>, requests: mpsc::Receiver>, uuid: [u8; 16], client_name: [u8; 32], @@ -546,6 +546,7 @@ struct TMClient { last_seq_num: u64, username: [u8; 16], connected: bool, + time_offset: f64, } const TCP_BUFFER_SIZE: usize = 10000; @@ -584,8 +585,10 @@ impl TMClient { last_seq_num: 0xFFFFFFFFFFFFFFFF, username, connected: false, + time_offset: 0.0, }, TMConnection{ + time_offset: 0.0, work_queuer: work_tx, state_cancels: HashMap::new(), requests: request_tx, @@ -596,15 +599,12 @@ impl TMClient { fn process(self: &mut TMClient) { if self.connected == true { - // TODO: right now it's halfway to processing multiple requests at once, but currently - // it only processes a single requests/response at a time. This is fine since there's - // only a single callback thread though. for request in self.requests.try_iter() { - let packet = BackendPacket::new(TM_HEADER, get_float_time(), 2, self.last_seq_num + 1, request.as_bytes()); + let packet = BackendPacket::new(TM_HEADER, get_float_time() - self.time_offset, 2, self.last_seq_num + 1, request.as_bytes()); match self.stream.write(&packet.as_bytes()) { Ok(_) => { - log::debug!("Sent: {:?}", packet); self.last_seq_num += 1; + log::debug!("Sent: {:?}", packet); }, Err(error) => log::error!("Request send error: {:?}", error), } @@ -643,7 +643,13 @@ impl TMClient { match BackendMessage::from_bytes(packet.data.clone()) { Some(message) => { log::debug!("Received response: {:#?}", message); - match self.responses.send(Box::new(message)) { + let offset = packet.timestamp - get_float_time(); + self.time_offset = offset.clone(); + log::info!("New offset: {}", offset); + log::info!("Server Timetamp: {}", packet.timestamp); + log::info!("Local Timetamp: {}", get_float_time()); + + match self.responses.send((Box::new(message), offset)) { Ok(_) => log::debug!("Forwarded response to callback engine"), Err(error) => log::error!("Response forward error {:?}", error), } @@ -710,15 +716,18 @@ impl FieldTuple { struct TMConnection { work_queuer: mpsc::Sender, work_queue: mpsc::Receiver, - responses: mpsc::Receiver>, + responses: mpsc::Receiver<(Box, f64)>, requests: mpsc::Sender>, state_cancels: HashMap>, + time_offset: f64, } impl TMConnection { - fn request(self: &TMConnection, request_id: u32, data: tm::BackendMessageData) -> BackendMessage { + fn request(self: &mut TMConnection, request_id: u32, data: tm::BackendMessageData) -> BackendMessage { self.requests.send(Box::new(BackendMessage::new(request_id, data))).unwrap(); - return *self.responses.recv().unwrap(); + let (message, offset) = self.responses.recv().unwrap(); + self.time_offset = offset; + return *message; } fn queue_state_change(self: &mut TMConnection, wait: Duration, state_change: StateChange) -> mpsc::Sender<()> { @@ -786,7 +795,7 @@ fn struct_tuple_to_tm(tuple: MatchTuple) -> tm::MatchTuple { return out; } -fn get_match_score(connection: &TMConnection, filter: tm::MatchTuple) -> Option { +fn get_match_score(connection: &mut TMConnection, filter: tm::MatchTuple) -> Option { let mut req = tm::BackendMessageData::default(); req.match_tuple = Some(filter); @@ -915,7 +924,7 @@ fn get_game_score(scores: &tm::MatchScore) -> Option { return Some(out); } -fn on_score_set(notice: tm::Notice, event: &mut Event, _connection: &mut TMConnection) -> Vec { +fn on_score_set(notice: tm::Notice, event: &mut Event, connection: &mut TMConnection) -> Vec { let Some(tuple) = get_affected_match(¬ice) else { return Vec::new() }; let Some(scores) = notice.match_score else { return Vec::new() }; let Some(score) = get_game_score(&scores) else { return Vec::new() }; @@ -925,7 +934,7 @@ fn on_score_set(notice: tm::Notice, event: &mut Event, _connection: &mut TMConne m.score = Some(score.clone()); m.state = MatchState{ state: GameState::Scored, - start: get_float_time(), + start: get_float_time() - connection.time_offset, }; let score_serialized = serde_json::to_string_pretty(&m.score).unwrap(); @@ -1002,7 +1011,7 @@ fn on_match_list_update(_notice: tm::Notice, event: &mut Event, connection: &mut match resp.data.match_schedule { None => {}, Some(schedule) => { - event.parse_match_schedule(schedule); + event.parse_match_schedule(schedule, connection.time_offset); for (division_id, division) in &event.divisions { messages.push(MQTTMessage{ topic: format!("division/{}/schedule", division_id), @@ -1055,7 +1064,7 @@ fn get_float_time() -> f64 { return (millis.as_millis() as f64)/1000.0; } -fn on_field_assigned(notice: tm::Notice, event: &mut Event, _connection: &mut TMConnection) -> Vec { +fn on_field_assigned(notice: tm::Notice, event: &mut Event, connection: &mut TMConnection) -> Vec { let Some(field_info) = get_field_tuple(¬ice.field) else { return Vec::new() }; let Some(tuple) = get_affected_match(¬ice) else { return Vec::new() }; let Some(field) = get_field(&mut event.field_sets, field_info) else { return Vec::new() }; @@ -1063,7 +1072,7 @@ fn on_field_assigned(notice: tm::Notice, event: &mut Event, _connection: &mut TM m.state = MatchState{ state: GameState::Scheduled, - start: get_float_time(), + start: get_float_time() - connection.time_offset, }; field.last_known_match = Some(tuple); @@ -1097,7 +1106,7 @@ fn on_field_assigned(notice: tm::Notice, event: &mut Event, _connection: &mut TM return messages; } -fn on_timer_stop(notice: tm::Notice, event: &mut Event, _connection: &mut TMConnection) -> Vec { +fn on_timer_stop(notice: tm::Notice, event: &mut Event, connection: &mut TMConnection) -> Vec { let mut messages = Vec::new(); let Some(field_time) = ¬ice.field_time else { return Vec::new() }; let Some(field_info) = get_field_tuple(&field_time.field) else { return Vec::new() }; @@ -1107,7 +1116,7 @@ fn on_timer_stop(notice: tm::Notice, event: &mut Event, _connection: &mut TMConn m.state = MatchState{ state: GameState::Stopped, - start: get_float_time(), + start: get_float_time() - connection.time_offset, }; let match_state_topic = current_match.topic("/state"); @@ -1144,7 +1153,7 @@ fn on_timer_start(notice: tm::Notice, event: &mut Event, connection: &mut TMConn if current_block.r#type == Some(2) { //Auto m.state = MatchState{ state: GameState::Autonomous, - start: *current_block_start, + start: *current_block_start - connection.time_offset, }; let field_state_topic = field_info.topic("/state"); let match_state_topic = tuple.topic("/state"); @@ -1165,7 +1174,7 @@ fn on_timer_start(notice: tm::Notice, event: &mut Event, connection: &mut TMConn let cancel_state = connection.queue_state_change(Duration::from_secs(current_block.seconds() as u64), StateChange{ next_state: MatchState{ state: GameState::AutonomousDone, - start: *current_block_end, + start: *current_block_end - connection.time_offset, }, tuple, field: field_tuple.clone(), @@ -1174,7 +1183,7 @@ fn on_timer_start(notice: tm::Notice, event: &mut Event, connection: &mut TMConn } else if current_block.r#type == Some(3) { //Driver m.state = MatchState{ state: GameState::Driver, - start: *current_block_start, + start: *current_block_start - connection.time_offset, }; let field_state_topic = field_info.topic("/state"); let match_state_topic = tuple.topic("/state"); @@ -1195,7 +1204,7 @@ fn on_timer_start(notice: tm::Notice, event: &mut Event, connection: &mut TMConn let cancel_state = connection.queue_state_change(Duration::from_secs(current_block.seconds() as u64), StateChange{ next_state: MatchState{ state: GameState::DriverDone, - start: *current_block_end, + start: *current_block_end - connection.time_offset, }, tuple, field: field_tuple.clone(), @@ -1264,14 +1273,14 @@ fn main() { // Get the match list let match_schedule_resp = tm_connection.request(1004, tm::BackendMessageData::default()); - event.parse_match_schedule(match_schedule_resp.data.match_schedule.unwrap()); + event.parse_match_schedule(match_schedule_resp.data.match_schedule.unwrap(), tm_connection.time_offset); // For each match, get the score and make the initial publish for (_, division) in &mut event.divisions { for m in &mut division.matches { let serialized = serde_json::to_string_pretty(&m.info).unwrap(); client.publish(m.info.tuple.topic(""), QoS::AtLeastOnce, true, serialized).unwrap(); - m.score = get_match_score(&tm_connection, struct_tuple_to_tm(m.info.tuple)); + m.score = get_match_score(&mut tm_connection, struct_tuple_to_tm(m.info.tuple)); let state_serialized = serde_json::to_string_pretty(&m.state).unwrap(); client.publish(m.info.tuple.topic("/state"), QoS::AtLeastOnce, true, state_serialized).unwrap();