Added callback mechanism with dummy function to retrieve notices

master
noah metz 2024-01-18 17:22:41 -07:00
parent 0b33a9e580
commit 64c77b259d
1 changed files with 86 additions and 16 deletions

@ -1,6 +1,7 @@
use rumqttc::{MqttOptions, Client, QoS}; use rumqttc::{MqttOptions, Client, QoS};
use std::time::Duration; use std::time::Duration;
use std::thread; use std::thread;
use std::collections::hash_map::HashMap;
use prost::Message; use prost::Message;
use std::io::Cursor; use std::io::Cursor;
use serde::{Serialize, Deserialize}; use serde::{Serialize, Deserialize};
@ -75,12 +76,12 @@ use serde::{Serialize, Deserialize};
// - game/{division_id}/{game_id}/score // - game/{division_id}/{game_id}/score
// - team/{team_string} // - team/{team_string}
pub mod tm_proto { pub mod tm {
include!(concat!(env!("OUT_DIR"), "/tm.rs")); include!(concat!(env!("OUT_DIR"), "/tm.rs"));
} }
pub fn deserialize_notice(buf: &[u8]) -> Result<tm_proto::Notice, prost::DecodeError> { pub fn deserialize_notice(buf: &[u8]) -> Result<tm::Notice, prost::DecodeError> {
tm_proto::Notice::decode(&mut Cursor::new(buf)) tm::Notice::decode(&mut Cursor::new(buf))
} }
#[derive(Serialize, Deserialize, Debug)] #[derive(Serialize, Deserialize, Debug)]
@ -184,29 +185,78 @@ struct ArenaInfo {
match_tuple: MatchTuple, match_tuple: MatchTuple,
} }
fn main() { struct MQTTMessage {
let score = GameScore{ topic: String,
payload: String,
}
fn get_game_score(scores: tm::MatchScore) -> Option<GameScore> {
if scores.alliances.len() != 2 {
return None;
}
let ref red_score = scores.alliances[0];
let ref blue_score = scores.alliances[1];
// 1) Get the autonomous winner
// 2) Get score object and fill AllianceScore struct
// 3) Compute total scores
let out = GameScore{
autonomous_winner: None, autonomous_winner: None,
red_score: AllianceScore{ red_total: 0,
blue_total: 0,
blue_score: AllianceScore{
team_goal: 0, team_goal: 0,
team_zone: 0, team_zone: 0,
green_goal: 0, green_goal: 0,
green_zone: 0, green_zone: 0,
elevation_tiers: [None, None], elevation_tiers: [None, None],
}, },
blue_score: AllianceScore{ red_score : AllianceScore{
team_goal: 0, team_goal: 0,
team_zone: 0, team_zone: 0,
green_goal: 0, green_goal: 0,
green_zone: 0, green_zone: 0,
elevation_tiers: [None, None], elevation_tiers: [None, None],
}, },
red_total: 0,
blue_total: 0,
}; };
let serialized = serde_json::to_string(&score).unwrap(); return Some(out);
println!("Serialized = {}", serialized); }
fn on_score_change(notice: tm::Notice) -> Vec<MQTTMessage> {
match notice.match_score {
None => return Vec::new(),
Some(game_scores) => {
match get_game_score(game_scores) {
Some(score_json) => {
let serialized = serde_json::to_string(&score_json).unwrap();
let arena_topic = String::from("arena/TEST/score");
let mut out = Vec::new();
out.push(MQTTMessage{
topic: arena_topic,
payload: serialized,
});
return out;
},
None => return Vec::new(),
}
},
}
}
fn get_next_notice() -> tm::Notice {
thread::sleep_ms(1000);
return tm::Notice::default();
}
type NoticeCallback = fn(tm::Notice) -> Vec<MQTTMessage>;
fn main() {
let mut callbacks: HashMap<tm::NoticeId, NoticeCallback> = HashMap::new();
callbacks.insert(tm::NoticeId::NoticeRealtimeScoreChanged, on_score_change);
// Set client options // Set client options
let mut mqttoptions = MqttOptions::new("vex-bridge", "localhost", 1883); let mut mqttoptions = MqttOptions::new("vex-bridge", "localhost", 1883);
@ -216,17 +266,37 @@ fn main() {
let (mut client, mut connection) = Client::new(mqttoptions, 10); let (mut client, mut connection) = Client::new(mqttoptions, 10);
client.subscribe("bridge", QoS::AtMostOnce).unwrap(); client.subscribe("bridge", QoS::AtMostOnce).unwrap();
client.publish("bridge/status", QoS::AtLeastOnce, false, "{\"status\": true}").unwrap();
// Create a thread to own connection and handle it's incoming messages // Create a thread to own connection and handle it's incoming messages
let running = true;
let mqtt_thread = thread::spawn(move || let mqtt_thread = thread::spawn(move ||
for (_, message) in connection.iter().enumerate() { for message in connection.iter() {
println!("Message = {:?}", message); println!("Message = {:?}", message);
} }
); );
// Publish using the client while running {
for _ in 0..10 { let notice = get_next_notice();
client.publish("bridge/status", QoS::AtLeastOnce, false, "{\"status\": true}").unwrap(); let callback = callbacks.get(&notice.id());
thread::sleep(Duration::from_millis(100)); match callback {
None => {
match notice.id {
None => println!("Notice without NoticeId received"),
Some(notice_id) => println!("Unhandled NoticeId: {}", notice_id),
}
},
Some(callback) => {
let messages = callback(notice);
for message in messages {
let result = client.publish(message.topic, QoS::AtMostOnce, true, message.payload);
match result {
Ok(_) => {},
Err(error) => println!("Publish error: {}", error),
}
}
},
}
} }
mqtt_thread.join().expect("Failed to join mqtt thread"); mqtt_thread.join().expect("Failed to join mqtt thread");