From 6bcc750e0ac397a2566e1c716597cc9c30a519d6 Mon Sep 17 00:00:00 2001 From: Noah Metz Date: Sat, 27 Jan 2024 16:32:43 -0700 Subject: [PATCH] Add time offset publish --- src/main.rs | 32 ++++++++++++++++++++++++++++---- 1 file changed, 28 insertions(+), 4 deletions(-) diff --git a/src/main.rs b/src/main.rs index 45f8bca..a78c072 100644 --- a/src/main.rs +++ b/src/main.rs @@ -552,6 +552,7 @@ struct TMClient { username: [u8; 16], connected: bool, time_offset: f64, + offset_tick: u16, } impl TMClient { @@ -590,6 +591,7 @@ impl TMClient { username, connected: false, time_offset: 0.0, + offset_tick: 0, }, TMConnection{ time_offset: 0.0, @@ -633,6 +635,15 @@ impl TMClient { } }, Err(ref error) if error.kind() == std::io::ErrorKind::WouldBlock => { + if self.offset_tick == 0 { + match self.work_queue.send(Work::Offset(self.time_offset)) { + Ok(_) => log::debug!("Sent time offset"), + Err(error) => log::error!("Offset send error: {:#?}", error), + } + self.offset_tick = 100; + } else { + self.offset_tick -= 1; + } log::debug!("Resource temporarily unavailable, retrying later."); return; }, @@ -674,7 +685,7 @@ impl TMClient { Ok(_) => log::debug!("Sent ACK for notice {}", notice.notice_id), Err(error) => log::error!("ACK error: {:?}", error), } - match self.work_queue.send(Work::Notice(Box::new(notice.notice))) { + match self.work_queue.send(Work::Notice(Box::new(NoticeWithOffset{notice: notice.notice, offset: self.time_offset}))) { Ok(_) => log::debug!("Forwarded notice to callback engine"), Err(error) => log::error!("Notice forward error {:?}", error), } @@ -1302,8 +1313,14 @@ fn on_timer_start(notice: tm::Notice, event: &mut Event, connection: &mut TMConn return messages; } +struct NoticeWithOffset { + notice: tm::Notice, + offset: f64, +} + enum Work { - Notice(Box), + Notice(Box), + Offset(f64), State(StateChange), } @@ -1428,8 +1445,15 @@ fn main() { while running { match tm_connection.work_queue.recv() { Ok(work) => match work { - Work::Notice(notice) => { + Work::Offset(offset) => { + let current = get_float_time() - offset; + client.publish("time", QoS::AtMostOnce, false, serde_json::to_vec(¤t).unwrap()).unwrap(); + }, + Work::Notice(package) => { + let current = get_float_time() - package.offset; + let notice = package.notice; let callback = callbacks.get(¬ice.id()); + client.publish("time", QoS::AtMostOnce, false, serde_json::to_vec(¤t).unwrap()).unwrap(); match callback { None => { match notice.id { @@ -1438,7 +1462,7 @@ fn main() { } }, Some(callback) => { - let messages = callback(*notice, &mut event, &mut tm_connection); + let messages = callback(notice, &mut event, &mut tm_connection); for message in messages { let result = client.publish(message.topic, QoS::AtLeastOnce, true, message.payload); match result {