Fixed clock drift by calculating offset every request

master
noah metz 2024-01-22 13:47:36 -07:00
parent 8732193a4c
commit 4ddef293bc
1 changed files with 40 additions and 31 deletions

@ -206,7 +206,7 @@ impl Event {
}
}
fn parse_match_schedule(self: &mut Event, schedule: tm::MatchSchedule) {
fn parse_match_schedule(self: &mut Event, schedule: tm::MatchSchedule, offset: f64) {
let mut matches: HashMap<i32, Vec<Match>> = HashMap::new();
for m in schedule.matches.iter() {
let tuple = tm_tuple_to_struct(m.match_tuple.clone().unwrap());
@ -223,20 +223,20 @@ impl Event {
match state {
0 => match_state = MatchState{
state: GameState::Scheduled,
start: scheduled as f64 / 1000.0,
start: (scheduled as f64 / 1000.0) - offset,
},
3 => { // If the match is active, use it's current state
// since I can't find a way to request the auton/driver state
match self.divisions.get(&tuple.division) {
None => match_state = MatchState{
state: GameState::Scheduled,
start: started as f64 / 1000.0,
start: (started as f64 / 1000.0) - offset,
},
Some(division) => {
match division.matches.iter().find(|a| a.info.tuple == tuple) {
None => match_state = MatchState{
state: GameState::Scheduled,
start: started as f64 / 1000.0,
start: (started as f64 / 1000.0) - offset,
},
Some(event_m) => match_state = event_m.state.clone(),
}
@ -245,15 +245,15 @@ impl Event {
},
4 => match_state = MatchState{
state: GameState::Scored,
start: resumed as f64 / 1000.0,
start: (resumed as f64 / 1000.0) - offset,
},
5 => match_state = MatchState{
state: GameState::Scored,
start: resumed as f64 / 1000.0,
start: (resumed as f64 / 1000.0) - offset,
},
_ => match_state = MatchState{
state: GameState::Scheduled,
start: started as f64 / 1000.0,
start: (started as f64 / 1000.0) - offset,
},
}
match matches.get_mut(&tuple.division) {
@ -538,7 +538,7 @@ impl NoticeMsg {
struct TMClient {
stream: openssl::ssl::SslStream<TcpStream>,
work_queue: mpsc::Sender<Work>,
responses: mpsc::Sender<Box<BackendMessage>>,
responses: mpsc::Sender<(Box<BackendMessage>, f64)>,
requests: mpsc::Receiver<Box<BackendMessage>>,
uuid: [u8; 16],
client_name: [u8; 32],
@ -546,6 +546,7 @@ struct TMClient {
last_seq_num: u64,
username: [u8; 16],
connected: bool,
time_offset: f64,
}
const TCP_BUFFER_SIZE: usize = 10000;
@ -584,8 +585,10 @@ impl TMClient {
last_seq_num: 0xFFFFFFFFFFFFFFFF,
username,
connected: false,
time_offset: 0.0,
},
TMConnection{
time_offset: 0.0,
work_queuer: work_tx,
state_cancels: HashMap::new(),
requests: request_tx,
@ -596,15 +599,12 @@ impl TMClient {
fn process(self: &mut TMClient) {
if self.connected == true {
// TODO: right now it's halfway to processing multiple requests at once, but currently
// it only processes a single requests/response at a time. This is fine since there's
// only a single callback thread though.
for request in self.requests.try_iter() {
let packet = BackendPacket::new(TM_HEADER, get_float_time(), 2, self.last_seq_num + 1, request.as_bytes());
let packet = BackendPacket::new(TM_HEADER, get_float_time() - self.time_offset, 2, self.last_seq_num + 1, request.as_bytes());
match self.stream.write(&packet.as_bytes()) {
Ok(_) => {
log::debug!("Sent: {:?}", packet);
self.last_seq_num += 1;
log::debug!("Sent: {:?}", packet);
},
Err(error) => log::error!("Request send error: {:?}", error),
}
@ -643,7 +643,13 @@ impl TMClient {
match BackendMessage::from_bytes(packet.data.clone()) {
Some(message) => {
log::debug!("Received response: {:#?}", message);
match self.responses.send(Box::new(message)) {
let offset = packet.timestamp - get_float_time();
self.time_offset = offset.clone();
log::info!("New offset: {}", offset);
log::info!("Server Timetamp: {}", packet.timestamp);
log::info!("Local Timetamp: {}", get_float_time());
match self.responses.send((Box::new(message), offset)) {
Ok(_) => log::debug!("Forwarded response to callback engine"),
Err(error) => log::error!("Response forward error {:?}", error),
}
@ -710,15 +716,18 @@ impl FieldTuple {
struct TMConnection {
work_queuer: mpsc::Sender<Work>,
work_queue: mpsc::Receiver<Work>,
responses: mpsc::Receiver<Box<BackendMessage>>,
responses: mpsc::Receiver<(Box<BackendMessage>, f64)>,
requests: mpsc::Sender<Box<BackendMessage>>,
state_cancels: HashMap<FieldTuple, mpsc::Sender<()>>,
time_offset: f64,
}
impl TMConnection {
fn request(self: &TMConnection, request_id: u32, data: tm::BackendMessageData) -> BackendMessage {
fn request(self: &mut TMConnection, request_id: u32, data: tm::BackendMessageData) -> BackendMessage {
self.requests.send(Box::new(BackendMessage::new(request_id, data))).unwrap();
return *self.responses.recv().unwrap();
let (message, offset) = self.responses.recv().unwrap();
self.time_offset = offset;
return *message;
}
fn queue_state_change(self: &mut TMConnection, wait: Duration, state_change: StateChange) -> mpsc::Sender<()> {
@ -786,7 +795,7 @@ fn struct_tuple_to_tm(tuple: MatchTuple) -> tm::MatchTuple {
return out;
}
fn get_match_score(connection: &TMConnection, filter: tm::MatchTuple) -> Option<MatchScore> {
fn get_match_score(connection: &mut TMConnection, filter: tm::MatchTuple) -> Option<MatchScore> {
let mut req = tm::BackendMessageData::default();
req.match_tuple = Some(filter);
@ -915,7 +924,7 @@ fn get_game_score(scores: &tm::MatchScore) -> Option<MatchScore> {
return Some(out);
}
fn on_score_set(notice: tm::Notice, event: &mut Event, _connection: &mut TMConnection) -> Vec<MQTTMessage> {
fn on_score_set(notice: tm::Notice, event: &mut Event, connection: &mut TMConnection) -> Vec<MQTTMessage> {
let Some(tuple) = get_affected_match(&notice) else { return Vec::new() };
let Some(scores) = notice.match_score else { return Vec::new() };
let Some(score) = get_game_score(&scores) else { return Vec::new() };
@ -925,7 +934,7 @@ fn on_score_set(notice: tm::Notice, event: &mut Event, _connection: &mut TMConne
m.score = Some(score.clone());
m.state = MatchState{
state: GameState::Scored,
start: get_float_time(),
start: get_float_time() - connection.time_offset,
};
let score_serialized = serde_json::to_string_pretty(&m.score).unwrap();
@ -1002,7 +1011,7 @@ fn on_match_list_update(_notice: tm::Notice, event: &mut Event, connection: &mut
match resp.data.match_schedule {
None => {},
Some(schedule) => {
event.parse_match_schedule(schedule);
event.parse_match_schedule(schedule, connection.time_offset);
for (division_id, division) in &event.divisions {
messages.push(MQTTMessage{
topic: format!("division/{}/schedule", division_id),
@ -1055,7 +1064,7 @@ fn get_float_time() -> f64 {
return (millis.as_millis() as f64)/1000.0;
}
fn on_field_assigned(notice: tm::Notice, event: &mut Event, _connection: &mut TMConnection) -> Vec<MQTTMessage> {
fn on_field_assigned(notice: tm::Notice, event: &mut Event, connection: &mut TMConnection) -> Vec<MQTTMessage> {
let Some(field_info) = get_field_tuple(&notice.field) else { return Vec::new() };
let Some(tuple) = get_affected_match(&notice) else { return Vec::new() };
let Some(field) = get_field(&mut event.field_sets, field_info) else { return Vec::new() };
@ -1063,7 +1072,7 @@ fn on_field_assigned(notice: tm::Notice, event: &mut Event, _connection: &mut TM
m.state = MatchState{
state: GameState::Scheduled,
start: get_float_time(),
start: get_float_time() - connection.time_offset,
};
field.last_known_match = Some(tuple);
@ -1097,7 +1106,7 @@ fn on_field_assigned(notice: tm::Notice, event: &mut Event, _connection: &mut TM
return messages;
}
fn on_timer_stop(notice: tm::Notice, event: &mut Event, _connection: &mut TMConnection) -> Vec<MQTTMessage> {
fn on_timer_stop(notice: tm::Notice, event: &mut Event, connection: &mut TMConnection) -> Vec<MQTTMessage> {
let mut messages = Vec::new();
let Some(field_time) = &notice.field_time else { return Vec::new() };
let Some(field_info) = get_field_tuple(&field_time.field) else { return Vec::new() };
@ -1107,7 +1116,7 @@ fn on_timer_stop(notice: tm::Notice, event: &mut Event, _connection: &mut TMConn
m.state = MatchState{
state: GameState::Stopped,
start: get_float_time(),
start: get_float_time() - connection.time_offset,
};
let match_state_topic = current_match.topic("/state");
@ -1144,7 +1153,7 @@ fn on_timer_start(notice: tm::Notice, event: &mut Event, connection: &mut TMConn
if current_block.r#type == Some(2) { //Auto
m.state = MatchState{
state: GameState::Autonomous,
start: *current_block_start,
start: *current_block_start - connection.time_offset,
};
let field_state_topic = field_info.topic("/state");
let match_state_topic = tuple.topic("/state");
@ -1165,7 +1174,7 @@ fn on_timer_start(notice: tm::Notice, event: &mut Event, connection: &mut TMConn
let cancel_state = connection.queue_state_change(Duration::from_secs(current_block.seconds() as u64), StateChange{
next_state: MatchState{
state: GameState::AutonomousDone,
start: *current_block_end,
start: *current_block_end - connection.time_offset,
},
tuple,
field: field_tuple.clone(),
@ -1174,7 +1183,7 @@ fn on_timer_start(notice: tm::Notice, event: &mut Event, connection: &mut TMConn
} else if current_block.r#type == Some(3) { //Driver
m.state = MatchState{
state: GameState::Driver,
start: *current_block_start,
start: *current_block_start - connection.time_offset,
};
let field_state_topic = field_info.topic("/state");
let match_state_topic = tuple.topic("/state");
@ -1195,7 +1204,7 @@ fn on_timer_start(notice: tm::Notice, event: &mut Event, connection: &mut TMConn
let cancel_state = connection.queue_state_change(Duration::from_secs(current_block.seconds() as u64), StateChange{
next_state: MatchState{
state: GameState::DriverDone,
start: *current_block_end,
start: *current_block_end - connection.time_offset,
},
tuple,
field: field_tuple.clone(),
@ -1264,14 +1273,14 @@ fn main() {
// Get the match list
let match_schedule_resp = tm_connection.request(1004, tm::BackendMessageData::default());
event.parse_match_schedule(match_schedule_resp.data.match_schedule.unwrap());
event.parse_match_schedule(match_schedule_resp.data.match_schedule.unwrap(), tm_connection.time_offset);
// For each match, get the score and make the initial publish
for (_, division) in &mut event.divisions {
for m in &mut division.matches {
let serialized = serde_json::to_string_pretty(&m.info).unwrap();
client.publish(m.info.tuple.topic(""), QoS::AtLeastOnce, true, serialized).unwrap();
m.score = get_match_score(&tm_connection, struct_tuple_to_tm(m.info.tuple));
m.score = get_match_score(&mut tm_connection, struct_tuple_to_tm(m.info.tuple));
let state_serialized = serde_json::to_string_pretty(&m.state).unwrap();
client.publish(m.info.tuple.topic("/state"), QoS::AtLeastOnce, true, state_serialized).unwrap();