Added some more setup functions to populate the event on boot

master
noah metz 2024-01-22 02:08:53 -07:00
parent d1dd10e090
commit 10a1175941
1 changed files with 305 additions and 73 deletions

@ -1,4 +1,3 @@
use log::warn;
use rumqttc::{MqttOptions, Client, QoS, LastWill};
use bytes::Bytes;
use std::time::Duration;
@ -58,12 +57,14 @@ struct MatchScore {
#[derive(Serialize, Deserialize, Debug, Clone)]
enum GameState {
Scheduled,
Stopped,
Timeout,
Driver,
DriverDone,
Autonomous,
AutonomousDone,
Abandoned,
Scored,
}
#[derive(Serialize, Deserialize, Debug, Clone, Copy, Eq, PartialEq)]
@ -112,6 +113,12 @@ struct MatchTuple {
session: i32,
}
impl MatchTuple {
fn topic(self: &MatchTuple, suffix: &str) -> String {
format!("division/{}/{:?}/{}{}", &self.division, &self.round, &self.match_num, suffix)
}
}
#[derive(Debug)]
struct MQTTMessage {
topic: String,
@ -132,6 +139,34 @@ struct Event {
rankings: Vec<Rank>,
}
fn get_field(sets: &mut HashMap<i32, FieldSet>, tuple: FieldTuple) -> Option<&mut Field> {
match sets.get_mut(&tuple.set) {
None => {},
Some(set) => {
for (_, field) in &mut set.fields {
if field.tuple == tuple {
return Some(field);
}
}
},
};
None
}
fn get_match(divisions: &mut HashMap<i32, Division>, tuple: MatchTuple) -> Option<&mut Match> {
match divisions.get_mut(&tuple.division) {
None => {},
Some(division) => {
for m in &mut division.matches {
if m.tuple == tuple {
return Some(m);
}
}
},
};
None
}
impl Event {
fn new() -> Event {
Event{
@ -141,26 +176,16 @@ impl Event {
}
}
fn get_match(self: &mut Event, tuple: MatchTuple) -> Option<&mut Match> {
match self.divisions.get_mut(&tuple.division) {
None => {},
Some(division) => {
for m in &mut division.matches {
if m.tuple == tuple {
return Some(m);
}
}
},
};
None
}
// TODO: remove extra entries instead of just adding new ones
fn parse_field_sets(self: &mut Event, sets: tm::FieldSetList) {
for set in sets.field_sets {
let mut fields = HashMap::new();
for field in &set.fields {
fields.insert(field.id(), Field{
tuple: FieldTuple{
set: set.id(),
id: field.id(),
},
name: String::from(field.name()),
last_known_match: None,
});
@ -242,6 +267,7 @@ struct FieldSet {
#[derive(Serialize, Deserialize, Debug, Clone)]
struct Field {
name: String,
tuple: FieldTuple,
last_known_match: Option<MatchTuple>,
}
@ -255,6 +281,7 @@ struct MatchState {
struct MatchInfo {
red_teams: [String; 2],
blue_teams: [String; 2],
field: FieldTuple,
}
#[derive(Serialize, Deserialize, Debug, Clone)]
@ -625,6 +652,12 @@ struct FieldTuple {
id: i32,
}
impl FieldTuple {
fn topic(self: &FieldTuple, suffix: &str) -> String {
format!("field/{}/{}{}", &self.set, &self.id, suffix)
}
}
struct TMConnection {
work_queuer: mpsc::Sender<Work>,
work_queue: mpsc::Receiver<Work>,
@ -644,11 +677,11 @@ impl TMConnection {
let (cancel_tx, cancel_rx) = mpsc::channel();
thread::spawn(move || {
match cancel_rx.recv_timeout(wait) {
Ok(_) => match work_queuer.send(Work::State(state_change)) {
Ok(_) => log::debug!("state change cancelled"),
Err(_) => match work_queuer.send(Work::State(state_change)) {
Ok(_) => {},
Err(error) => log::error!("State change send error: {:?}", error),
},
Err(error) => log::error!("state change queue error: {:?}", error),
}
});
return cancel_tx;
@ -672,6 +705,75 @@ fn get_affected_match(notice: &tm::Notice) -> Option<MatchTuple> {
}
}
fn get_field_tuple(field: &Option<tm::Field>) -> Option<FieldTuple> {
match field {
None => None,
Some(field) => {
Some(FieldTuple{
set: field.field_set_id(),
id: field.id(),
})
},
}
}
fn tm_tuple_to_struct(tuple: tm::MatchTuple) -> MatchTuple {
return MatchTuple{
division: tuple.division(),
round: int_to_round(tuple.round() as i32),
instance: tuple.instance(),
match_num: tuple.r#match(),
session: tuple.session(),
};
}
fn struct_tuple_to_tm(tuple: MatchTuple) -> tm::MatchTuple {
let mut out = tm::MatchTuple::default();
out.division = Some(tuple.division);
out.session = Some(tuple.session);
out.round = Some(tuple.round as i32);
out.r#match = Some(tuple.match_num);
out.instance = Some(tuple.instance);
return out;
}
fn get_match_info(connection: &TMConnection, filter: tm::MatchTuple) -> Option<MatchInfo> {
let mut req = tm::BackendMessageData::default();
req.match_tuple = Some(filter);
let resp = connection.request(1004, req);
let Some(schedule) = resp.data.match_schedule else { return None; };
if schedule.matches.len() != 1 {
return None;
}
let Some(field) = &schedule.matches[0].assigned_field else { return None; };
let red_1 = schedule.matches[0].alliances[0].teams[0].number();
let red_2 = schedule.matches[0].alliances[0].teams[1].number();
let blue_1 = schedule.matches[0].alliances[1].teams[0].number();
let blue_2 = schedule.matches[0].alliances[1].teams[1].number();
return Some(MatchInfo{
red_teams: [String::from(red_1), String::from(red_2)],
blue_teams: [String::from(blue_1), String::from(blue_2)],
field: FieldTuple{
set: field.field_set_id(),
id: field.id(),
},
});
}
fn get_match_score(connection: &TMConnection, filter: tm::MatchTuple) -> Option<MatchScore> {
let mut req = tm::BackendMessageData::default();
req.match_tuple = Some(filter);
let resp = connection.request(1000, req);
match resp.data.match_score {
None => None,
Some(scores) => get_game_score(&scores),
}
}
fn get_game_score(scores: &tm::MatchScore) -> Option<MatchScore> {
if scores.alliances.len() != 2 {
return None;
@ -790,6 +892,58 @@ fn get_game_score(scores: &tm::MatchScore) -> Option<MatchScore> {
return Some(out);
}
fn on_score_set(notice: tm::Notice, event: &mut Event, _connection: &mut TMConnection) -> Vec<MQTTMessage> {
let Some(tuple) = get_affected_match(&notice) else { return Vec::new() };
let Some(scores) = notice.match_score else { return Vec::new() };
let Some(score) = get_game_score(&scores) else { return Vec::new() };
let Some(division) = &mut event.divisions.get_mut(&tuple.division) else { return Vec::new() };
let Some(m) = &mut division.matches.iter_mut().find(|a| a.tuple == tuple) else { return Vec::new() };
m.score = Some(score.clone());
m.state = Some(MatchState{
state: GameState::Scored,
start: get_float_time(),
});
let score_serialized = serde_json::to_string_pretty(&m.score).unwrap();
let state_serialized = serde_json::to_string_pretty(&m.state).unwrap();
let game_score_topic = format!("division/{}/{:?}/{}/score", tuple.division, tuple.round, tuple.match_num);
let game_state_topic = format!("division/{}/{:?}/{}/state", tuple.division, tuple.round, tuple.match_num);
let mut out = Vec::new();
out.push(MQTTMessage{
topic: game_score_topic,
payload: score_serialized.clone(),
});
out.push(MQTTMessage{
topic: game_state_topic,
payload: state_serialized.clone(),
});
for (field_set_id, field_set) in &event.field_sets {
for (field_id, field) in &field_set.fields {
match field.last_known_match {
None => {},
Some(last_known_match) => {
if last_known_match == tuple {
let field_score_topic = format!("field/{}/{}/score", field_set_id, field_id);
let field_state_topic = format!("field/{}/{}/state", field_set_id, field_id);
out.push(MQTTMessage{
topic: field_score_topic,
payload: score_serialized.clone(),
});
out.push(MQTTMessage{
topic: field_state_topic,
payload: state_serialized.clone(),
});
}
},
}
}
}
return out;
}
fn on_score_change(notice: tm::Notice, event: &mut Event, _connection: &mut TMConnection) -> Vec<MQTTMessage> {
let Some(tuple) = get_affected_match(&notice) else { return Vec::new() };
let Some(scores) = notice.match_score else { return Vec::new() };
@ -849,13 +1003,12 @@ fn get_float_time() -> f64 {
return (millis.as_millis() as f64)/1000.0;
}
fn on_field_assigned(notice: tm::Notice, event: &mut Event, connection: &mut TMConnection) -> Vec<MQTTMessage> {
let Some(field_info) = &notice.field else { return Vec::new() };
fn on_field_assigned(notice: tm::Notice, event: &mut Event, _connection: &mut TMConnection) -> Vec<MQTTMessage> {
let Some(field_info) = get_field_tuple(&notice.field) else { return Vec::new() };
let Some(tuple) = get_affected_match(&notice) else { return Vec::new() };
let Some(field_set) = &mut event.field_sets.get_mut(&field_info.field_set_id()) else { return Vec::new() };
let Some(field) = &mut field_set.fields.get_mut(&field_info.id()) else { return Vec::new() };
let Some(division) = &mut event.divisions.get_mut(&tuple.division) else { return Vec::new() };
let Some(m) = &mut division.matches.iter_mut().find(|a| a.tuple == tuple) else { return Vec::new() };
let Some(field) = get_field(&mut event.field_sets, field_info) else { return Vec::new() };
let Some(m) = get_match(&mut event.divisions, tuple) else { return Vec::new() };
m.state = Some(MatchState{
state: GameState::Scheduled,
start: get_float_time(),
@ -866,8 +1019,8 @@ fn on_field_assigned(notice: tm::Notice, event: &mut Event, connection: &mut TMC
if let Some(state) = &m.state {
let serialized = serde_json::to_string_pretty(&state).unwrap();
let field_topic = format!("field/{}/{}/state", &field_info.field_set_id(), &field_info.id());
let match_topic = format!("division/{}/{:?}/{}/state", &m.tuple.division, &m.tuple.round, &m.tuple.match_num);
let field_topic = field_info.topic("/state");
let match_topic = tuple.topic("/state");
messages.push(MQTTMessage{
topic: field_topic,
payload: serialized.clone(),
@ -881,7 +1034,7 @@ fn on_field_assigned(notice: tm::Notice, event: &mut Event, connection: &mut TMC
if let Some(score) = &m.score {
let serialized = serde_json::to_string_pretty(&score).unwrap();
let topic = format!("field/{}/{}/score", &field_info.field_set_id(), &field_info.id());
let topic = tuple.topic("/score");
messages.push(MQTTMessage{
topic,
payload: serialized,
@ -891,45 +1044,107 @@ fn on_field_assigned(notice: tm::Notice, event: &mut Event, connection: &mut TMC
return messages;
}
fn on_timer_stop(notice: tm::Notice, event: &mut Event, _connection: &mut TMConnection) -> Vec<MQTTMessage> {
let mut messages = Vec::new();
let Some(field_time) = &notice.field_time else { return Vec::new() };
let Some(field_info) = get_field_tuple(&field_time.field) else { return Vec::new() };
let Some(field) = get_field(&mut event.field_sets, field_info) else { return Vec::new() };
let Some(current_match) = field.last_known_match else { return Vec::new() };
let Some(m) = get_match(&mut event.divisions, current_match) else { return Vec::new() };
m.state = Some(MatchState{
state: GameState::Stopped,
start: get_float_time(),
});
let match_state_topic = current_match.topic("/state");
let field_state_topic = field_info.topic("/state");
let serialized = serde_json::to_string_pretty(&m.state).unwrap();
messages.push(MQTTMessage{
topic: match_state_topic,
payload: serialized.clone(),
});
messages.push(MQTTMessage{
topic: field_state_topic,
payload: serialized,
});
return messages;
}
fn on_timer_start(notice: tm::Notice, event: &mut Event, connection: &mut TMConnection) -> Vec<MQTTMessage> {
// 1) Find the state associated with the current block(driver or auton)
// 2) get the match tuple from the field
// 3) add the mqtt messages for match & field states
// 4) queue the state change to ${state}_done
// 5) add the cancel_send to the tmconnections map of timeout_sends
let Some(field_time) = notice.field_time else { return Vec::new() };
let Some(field_info) = &field_time.field else { return Vec::new() };
let Some(field_time) = &notice.field_time else { return Vec::new() };
let Some(field_info) = get_field_tuple(&field_time.field) else { return Vec::new() };
let Some(field) = get_field(&mut event.field_sets, field_info) else { return Vec::new() };
let Some(tuple) = field.last_known_match else { return Vec::new() };
let Some(m) = get_match(&mut event.divisions, tuple) else { return Vec::new() };
let Some(block_list) = &field_time.block_list else { return Vec::new() };
let Some(current_block_idx) = &field_time.current_block else { return Vec::new() };
let Some(current_block_start) = &field_time.current_block_start else { return Vec::new() };
let Some(current_block_end) = &field_time.current_block_end else { return Vec::new() };
let Some(field_set) = &event.field_sets.get(&field_info.field_set_id()) else { return Vec::new() };
let Some(field) = &field_set.fields.get(&field_info.id()) else { return Vec::new() };
let Some(tuple) = &field.last_known_match else { return Vec::new() };
let Some(division) = &mut event.divisions.get_mut(&tuple.division) else { return Vec::new() };
let Some(m) = &mut division.matches.iter_mut().find(|a| a.tuple == *tuple) else { return Vec::new() };
let current_block = &block_list.entries[*current_block_idx as usize];
let messages = Vec::new();
let mut messages = Vec::new();
if current_block.r#type == Some(2) { //Auto
m.state = Some(MatchState{
state: GameState::Autonomous,
start: *current_block_start,
});
let field_state_topic = field_info.topic("/state");
let match_state_topic = tuple.topic("/state");
let payload = serde_json::to_string_pretty(&m.state).unwrap();
messages.push(MQTTMessage{
topic: field_state_topic,
payload: payload.clone(),
});
messages.push(MQTTMessage{
topic: match_state_topic,
payload,
});
let field_tuple = FieldTuple{
set: field_info.set,
id: field_info.id,
};
let cancel_state = connection.queue_state_change(Duration::from_secs(current_block.seconds() as u64), StateChange{
next_state: MatchState{
state: GameState::AutonomousDone,
start: *current_block_end,
},
tuple,
field: field_tuple.clone(),
});
connection.state_cancels.insert(field_tuple, cancel_state);
} else if current_block.r#type == Some(3) { //Driver
m.state = Some(MatchState{
state: GameState::Driver,
start: *current_block_start,
});
let field_state_topic = field_info.topic("/state");
let match_state_topic = tuple.topic("/state");
let payload = serde_json::to_string_pretty(&m.state).unwrap();
messages.push(MQTTMessage{
topic: field_state_topic,
payload: payload.clone(),
});
messages.push(MQTTMessage{
topic: match_state_topic,
payload,
});
let field_tuple = FieldTuple{
set: field_info.field_set_id(),
id: field_info.id(),
set: field_info.set,
id: field_info.id,
};
let cancel_state = connection.queue_state_change(Duration::from_secs(current_block.seconds() as u64), StateChange{
next_state: MatchState{
state: GameState::DriverDone,
start: *current_block_end,
},
tuple: *tuple,
tuple,
field: field_tuple.clone(),
});
connection.state_cancels.insert(field_tuple, cancel_state);
@ -948,10 +1163,11 @@ fn main() {
let mut callbacks: HashMap<tm::NoticeId, NoticeCallback> = HashMap::new();
callbacks.insert(tm::NoticeId::NoticeRealtimeScoreChanged, on_score_change);
callbacks.insert(tm::NoticeId::NoticeMatchScoreUpdated, on_score_change);
callbacks.insert(tm::NoticeId::NoticeMatchScoreUpdated, on_score_set);
callbacks.insert(tm::NoticeId::NoticeMatchListUpdated, on_match_list_update);
callbacks.insert(tm::NoticeId::NoticeFieldTimerStarted, on_timer_start);
callbacks.insert(tm::NoticeId::NoticeFieldMatchAssigned, on_field_assigned);
callbacks.insert(tm::NoticeId::NoticeFieldTimerStopped, on_timer_stop);
let mut mqttoptions = MqttOptions::new("vex-bridge", "localhost", 1883);
mqttoptions.set_keep_alive(Duration::from_secs(5));
@ -971,7 +1187,7 @@ fn main() {
}
);
let mut running = true;
let running = true;
let mut uuid = [0u8; 16];
rand::thread_rng().fill_bytes(&mut uuid);
let mut client_name = [0u8;32];
@ -985,53 +1201,69 @@ fn main() {
);
let mut event = Event::new();
// Get the division list
let division_list_resp = tm_connection.request(200, tm::BackendMessageData::default());
event.parse_division_list(division_list_resp.data.division_list.unwrap());
// Get the field list
let field_set_resp = tm_connection.request(300, tm::BackendMessageData::default());
event.parse_field_sets(field_set_resp.data.field_set_list.unwrap());
// Get the match list
let match_list_resp = tm_connection.request(1002, tm::BackendMessageData::default());
event.parse_match_list(match_list_resp.data.match_list.unwrap());
// For each match, get the score and info(field & teams)
for (_, division) in &mut event.divisions {
for m in &mut division.matches {
let mut req = tm::BackendMessageData::default();
let mut filter = tm::MatchTuple::default();
filter.division = Some(m.tuple.division);
filter.r#match = Some(m.tuple.match_num);
filter.instance = Some(m.tuple.instance);
filter.session = Some(m.tuple.session);
filter.round = Some(m.tuple.round as i32);
req.match_tuple = Some(filter);
let resp = tm_connection.request(1000, req);
match resp.data.match_score {
None => {},
Some(scores) => {
let score = get_game_score(&scores);
m.score = score.clone();
client.publish(format!("division/{}/{:?}/{}", m.tuple.division, m.tuple.round, m.tuple.match_num), QoS::AtLeastOnce, true, serde_json::to_string_pretty(&score).unwrap()).expect("MQTT publish fail");
},
m.score = get_match_score(&tm_connection, struct_tuple_to_tm(m.tuple));
if let Some(score) = &m.score {
let serialized = serde_json::to_string_pretty(score).unwrap();
client.publish(m.tuple.topic("/score"), QoS::AtLeastOnce, true, serialized).expect("MQTT publish fail");
}
m.info = get_match_info(&tm_connection, struct_tuple_to_tm(m.tuple));
if let Some(info) = &m.info {
let serialized = serde_json::to_string_pretty(info).unwrap();
client.publish(m.tuple.topic(""), QoS::AtLeastOnce, true, serialized).expect("MQTT publish fail");
}
}
}
for (field_set_id, field_set) in &event.field_sets {
for (field_id, field) in &field_set.fields {
let mut field_req = tm::BackendMessageData::default();
let mut field_data = tm::OnFieldMatch::default();
let mut f = tm::Field::default();
f.id = Some(*field_id);
f.field_set_id = Some(*field_set_id);
field_data.field = Some(f);
field_req.on_field_match = Some(field_data);
let field_resp = tm_connection.request(309, field_req);
// For each field set, get the active match and assign it to the scheduled field
for (field_set_id, field_set) in &mut event.field_sets {
let mut field_req = tm::BackendMessageData::default();
let mut field_data = tm::OnFieldMatch::default();
let mut f = tm::Field::default();
f.field_set_id = Some(*field_set_id);
field_data.field = Some(f);
field_req.on_field_match = Some(field_data);
let field_resp = tm_connection.request(309, field_req);
match field_resp.data.on_field_match {
None => {},
Some(ofm) => match ofm.match_tuple {
None => {},
Some(match_tuple) => {
let tuple = tm_tuple_to_struct(match_tuple);
match get_match(&mut event.divisions, tuple) {
None => {},
Some(m) => match &m.info {
None => {},
Some(info) => match field_set.fields.get_mut(&info.field.id) {
None => {},
Some(field) => field.last_known_match = Some(tuple),
},
},
}
},
},
}
}
println!("Event: {:#?}", event);
log::info!("EVENT: {:#?}", &event);
// Callback loop
while running {
match tm_connection.work_queue.recv() {
Ok(work) => match work {
@ -1057,7 +1289,7 @@ fn main() {
}
},
Work::State(state_change) => {
match event.get_match(state_change.tuple) {
match get_match(&mut event.divisions, state_change.tuple) {
None => log::warn!("Received state change for unknown match {:#?}", state_change.tuple),
Some(m) => {
m.state = Some(state_change.next_state.clone());