Add time offset publish

master
noah metz 2024-01-27 16:32:43 -07:00
parent b7c09dcbd5
commit 6bcc750e0a
1 changed files with 28 additions and 4 deletions

@ -552,6 +552,7 @@ struct TMClient {
username: [u8; 16], username: [u8; 16],
connected: bool, connected: bool,
time_offset: f64, time_offset: f64,
offset_tick: u16,
} }
impl TMClient { impl TMClient {
@ -590,6 +591,7 @@ impl TMClient {
username, username,
connected: false, connected: false,
time_offset: 0.0, time_offset: 0.0,
offset_tick: 0,
}, },
TMConnection{ TMConnection{
time_offset: 0.0, time_offset: 0.0,
@ -633,6 +635,15 @@ impl TMClient {
} }
}, },
Err(ref error) if error.kind() == std::io::ErrorKind::WouldBlock => { Err(ref error) if error.kind() == std::io::ErrorKind::WouldBlock => {
if self.offset_tick == 0 {
match self.work_queue.send(Work::Offset(self.time_offset)) {
Ok(_) => log::debug!("Sent time offset"),
Err(error) => log::error!("Offset send error: {:#?}", error),
}
self.offset_tick = 100;
} else {
self.offset_tick -= 1;
}
log::debug!("Resource temporarily unavailable, retrying later."); log::debug!("Resource temporarily unavailable, retrying later.");
return; return;
}, },
@ -674,7 +685,7 @@ impl TMClient {
Ok(_) => log::debug!("Sent ACK for notice {}", notice.notice_id), Ok(_) => log::debug!("Sent ACK for notice {}", notice.notice_id),
Err(error) => log::error!("ACK error: {:?}", error), Err(error) => log::error!("ACK error: {:?}", error),
} }
match self.work_queue.send(Work::Notice(Box::new(notice.notice))) { match self.work_queue.send(Work::Notice(Box::new(NoticeWithOffset{notice: notice.notice, offset: self.time_offset}))) {
Ok(_) => log::debug!("Forwarded notice to callback engine"), Ok(_) => log::debug!("Forwarded notice to callback engine"),
Err(error) => log::error!("Notice forward error {:?}", error), Err(error) => log::error!("Notice forward error {:?}", error),
} }
@ -1302,8 +1313,14 @@ fn on_timer_start(notice: tm::Notice, event: &mut Event, connection: &mut TMConn
return messages; return messages;
} }
struct NoticeWithOffset {
notice: tm::Notice,
offset: f64,
}
enum Work { enum Work {
Notice(Box<tm::Notice>), Notice(Box<NoticeWithOffset>),
Offset(f64),
State(StateChange), State(StateChange),
} }
@ -1428,8 +1445,15 @@ fn main() {
while running { while running {
match tm_connection.work_queue.recv() { match tm_connection.work_queue.recv() {
Ok(work) => match work { Ok(work) => match work {
Work::Notice(notice) => { Work::Offset(offset) => {
let current = get_float_time() - offset;
client.publish("time", QoS::AtMostOnce, false, serde_json::to_vec(&current).unwrap()).unwrap();
},
Work::Notice(package) => {
let current = get_float_time() - package.offset;
let notice = package.notice;
let callback = callbacks.get(&notice.id()); let callback = callbacks.get(&notice.id());
client.publish("time", QoS::AtMostOnce, false, serde_json::to_vec(&current).unwrap()).unwrap();
match callback { match callback {
None => { None => {
match notice.id { match notice.id {
@ -1438,7 +1462,7 @@ fn main() {
} }
}, },
Some(callback) => { Some(callback) => {
let messages = callback(*notice, &mut event, &mut tm_connection); let messages = callback(notice, &mut event, &mut tm_connection);
for message in messages { for message in messages {
let result = client.publish(message.topic, QoS::AtLeastOnce, true, message.payload); let result = client.publish(message.topic, QoS::AtLeastOnce, true, message.payload);
match result { match result {