Changed notice queue to work_queue so that multiple threads can pass the main thread work to do on the event/connection

master
noah metz 2024-01-21 20:10:23 -07:00
parent 2b30e64c14
commit ea21d4728a
1 changed files with 131 additions and 91 deletions

@ -59,13 +59,13 @@ enum GameState {
Scheduled, Scheduled,
Timeout, Timeout,
Driver, Driver,
Driverdone, DriverDone,
Autonomous, Autonomous,
AutonomousDone, AutonomousDone,
Abandoned, Abandoned,
} }
#[derive(Serialize, Deserialize, Debug, Clone, Copy)] #[derive(Serialize, Deserialize, Debug, Clone, Copy, Eq, PartialEq)]
enum Round { enum Round {
None = 0, None = 0,
Practice = 1, Practice = 1,
@ -102,7 +102,7 @@ fn int_to_round(round: i32) -> Round {
} }
} }
#[derive(Serialize, Deserialize, Debug, Clone, Copy)] #[derive(Serialize, Deserialize, Debug, Clone, Copy, Eq, PartialEq)]
struct MatchTuple { struct MatchTuple {
division: i32, division: i32,
round: Round, round: Round,
@ -140,6 +140,20 @@ impl Event {
} }
} }
fn get_match(self: &mut Event, tuple: MatchTuple) -> Option<&mut Match> {
match self.divisions.get_mut(&tuple.division) {
None => {},
Some(division) => {
for m in &mut division.matches {
if m.tuple == tuple {
return Some(m);
}
}
},
};
None
}
// TODO: remove extra entries instead of just adding new ones // TODO: remove extra entries instead of just adding new ones
fn parse_field_sets(self: &mut Event, sets: tm::FieldSetList) { fn parse_field_sets(self: &mut Event, sets: tm::FieldSetList) {
for set in sets.field_sets { for set in sets.field_sets {
@ -170,19 +184,19 @@ impl Event {
fn parse_match_list(self: &mut Event, match_list: tm::MatchList) { fn parse_match_list(self: &mut Event, match_list: tm::MatchList) {
let mut matches: HashMap<i32, Vec<Match>> = HashMap::new(); let mut matches: HashMap<i32, Vec<Match>> = HashMap::new();
for m in match_list.matches.iter() { for m in match_list.matches.iter() {
let match_tuple = MatchTuple{ let tuple = MatchTuple{
division: m.division.unwrap(), division: m.division.unwrap(),
round: int_to_round(m.round.unwrap()), round: int_to_round(m.round.unwrap()),
instance: m.instance.unwrap(), instance: m.instance.unwrap(),
match_num: m.r#match.unwrap(), match_num: m.r#match.unwrap(),
session: m.session.unwrap(), session: m.session.unwrap(),
}; };
match matches.get_mut(&match_tuple.division) { match matches.get_mut(&tuple.division) {
Some(match_list) => { Some(match_list) => {
match_list.push(Match{ match_list.push(Match{
state: None, state: None,
info: None, info: None,
match_tuple: match_tuple.clone(), tuple: tuple.clone(),
}) })
}, },
None => { None => {
@ -190,9 +204,9 @@ impl Event {
new_match_list.push(Match{ new_match_list.push(Match{
state: None, state: None,
info: None, info: None,
match_tuple: match_tuple.clone(), tuple: tuple.clone(),
}); });
matches.insert(match_tuple.division, new_match_list); matches.insert(tuple.division, new_match_list);
}, },
} }
} }
@ -247,7 +261,7 @@ struct MatchInfo {
struct Match { struct Match {
state: Option<MatchState>, state: Option<MatchState>,
info: Option<MatchInfo>, info: Option<MatchInfo>,
match_tuple: MatchTuple, tuple: MatchTuple,
} }
#[derive(Debug)] #[derive(Debug)]
@ -447,7 +461,7 @@ impl NoticeMsg {
struct TMClient { struct TMClient {
stream: openssl::ssl::SslStream<TcpStream>, stream: openssl::ssl::SslStream<TcpStream>,
notices: mpsc::Sender<Box<tm::Notice>>, work_queue: mpsc::Sender<Work>,
responses: mpsc::Sender<Box<BackendMessage>>, responses: mpsc::Sender<Box<BackendMessage>>,
requests: mpsc::Receiver<Box<BackendMessage>>, requests: mpsc::Receiver<Box<BackendMessage>>,
uuid: [u8; 16], uuid: [u8; 16],
@ -461,7 +475,7 @@ struct TMClient {
const TCP_BUFFER_SIZE: usize = 10000; const TCP_BUFFER_SIZE: usize = 10000;
impl TMClient { impl TMClient {
fn new(uuid: [u8; 16], client_name: [u8; 32], password: String, username: [u8; 16]) -> (TMClient, TMConnection) { fn new(uuid: [u8; 16], client_name: [u8; 32], password: String, username: [u8; 16]) -> (TMClient, TMConnection) {
let (notice_tx, notice_rx) = mpsc::channel(); let (work_tx, work_rx) = mpsc::channel();
let (response_tx, response_rx) = mpsc::channel(); let (response_tx, response_rx) = mpsc::channel();
let (request_tx, request_rx) = mpsc::channel(); let (request_tx, request_rx) = mpsc::channel();
@ -482,9 +496,10 @@ impl TMClient {
let stream = stream_config.connect("127.0.0.1", stream).unwrap(); let stream = stream_config.connect("127.0.0.1", stream).unwrap();
stream.get_ref().set_read_timeout(Some(Duration::from_millis(100))).expect("Failed to set read timeout on socket"); stream.get_ref().set_read_timeout(Some(Duration::from_millis(100))).expect("Failed to set read timeout on socket");
return (TMClient{ return (TMClient{
stream, stream,
notices: notice_tx, work_queue: work_tx.clone(),
responses: response_tx, responses: response_tx,
requests: request_rx, requests: request_rx,
uuid, uuid,
@ -495,8 +510,10 @@ impl TMClient {
connected: false, connected: false,
}, },
TMConnection{ TMConnection{
work_queuer: work_tx,
state_cancels: HashMap::new(),
requests: request_tx, requests: request_tx,
notices: notice_rx, work_queue: work_rx,
responses: response_rx, responses: response_rx,
},); },);
} }
@ -533,14 +550,14 @@ impl TMClient {
4 => { 4 => {
match NoticeMsg::from_bytes(packet.data.clone()) { match NoticeMsg::from_bytes(packet.data.clone()) {
Some(notice) => { Some(notice) => {
log::debug!("Received notice: {:?}", notice); log::debug!("Received notice: {:#?}", notice);
let ack = BackendPacket::new(packet.header, packet.timestamp, 5, self.last_seq_num+1, notice.notice_id.to_le_bytes().to_vec()); let ack = BackendPacket::new(packet.header, packet.timestamp, 5, self.last_seq_num+1, notice.notice_id.to_le_bytes().to_vec());
self.last_seq_num += 1; self.last_seq_num += 1;
match self.stream.write(&ack.as_bytes()) { match self.stream.write(&ack.as_bytes()) {
Ok(_) => log::debug!("Sent ACK for notice {}", notice.notice_id), Ok(_) => log::debug!("Sent ACK for notice {}", notice.notice_id),
Err(error) => log::error!("ACK error: {:?}", error), Err(error) => log::error!("ACK error: {:?}", error),
} }
match self.notices.send(Box::new(notice.notice)) { match self.work_queue.send(Work::Notice(Box::new(notice.notice))) {
Ok(_) => log::debug!("Forwarded notice to callback engine"), Ok(_) => log::debug!("Forwarded notice to callback engine"),
Err(error) => log::error!("Notice forward error {:?}", error), Err(error) => log::error!("Notice forward error {:?}", error),
} }
@ -597,10 +614,23 @@ impl TMClient {
} }
} }
struct StateChange {
next_state: MatchState,
tuple: MatchTuple,
field: FieldTuple,
}
struct FieldTuple {
set: i32,
id: i32,
}
struct TMConnection { struct TMConnection {
notices: mpsc::Receiver<Box<tm::Notice>>, work_queuer: mpsc::Sender<Work>,
work_queue: mpsc::Receiver<Work>,
responses: mpsc::Receiver<Box<BackendMessage>>, responses: mpsc::Receiver<Box<BackendMessage>>,
requests: mpsc::Sender<Box<BackendMessage>>, requests: mpsc::Sender<Box<BackendMessage>>,
state_cancels: HashMap<FieldTuple, mpsc::Sender<()>>,
} }
impl TMConnection { impl TMConnection {
@ -608,9 +638,24 @@ impl TMConnection {
self.requests.send(Box::new(BackendMessage::new(request_id, data))).unwrap(); self.requests.send(Box::new(BackendMessage::new(request_id, data))).unwrap();
return *self.responses.recv().unwrap(); return *self.responses.recv().unwrap();
} }
fn queue_state_change(self: &mut TMConnection, wait: Duration, state_change: StateChange) -> mpsc::Sender<()> {
let work_queuer = self.work_queuer.clone();
let (cancel_tx, cancel_rx) = mpsc::channel();
thread::spawn(move || {
match cancel_rx.recv_timeout(wait) {
Ok(_) => match work_queuer.send(Work::State(state_change)) {
Ok(_) => {},
Err(error) => log::error!("State change send error: {:?}", error),
},
Err(error) => log::error!("state change queue error: {:?}", error),
}
});
return cancel_tx;
}
} }
type NoticeCallback = fn(tm::Notice, &mut Event, &TMConnection) -> Vec<MQTTMessage>; type NoticeCallback = fn(tm::Notice, &mut Event, &mut TMConnection) -> Vec<MQTTMessage>;
fn get_affected_match(notice: &tm::Notice) -> Option<MatchTuple> { fn get_affected_match(notice: &tm::Notice) -> Option<MatchTuple> {
match &notice.affected_match { match &notice.affected_match {
@ -751,12 +796,10 @@ fn get_game_score(notice: &tm::Notice) -> Option<GameScore> {
} }
} }
fn on_score_change(notice: tm::Notice, event: &mut Event, _connection: &TMConnection) -> Vec<MQTTMessage> { fn on_score_change(notice: tm::Notice, event: &mut Event, _connection: &mut TMConnection) -> Vec<MQTTMessage> {
match get_affected_match(&notice) { match get_affected_match(&notice) {
None => Vec::new(), None => Vec::new(),
Some(tuple) => { Some(tuple) => {
// Use `event` to figure out which arena topic to publish to
// Also add the match score topic based on the tuple
match get_game_score(&notice) { match get_game_score(&notice) {
None => Vec::new(), None => Vec::new(),
Some(score) => { Some(score) => {
@ -765,8 +808,24 @@ fn on_score_change(notice: tm::Notice, event: &mut Event, _connection: &TMConnec
let mut out = Vec::new(); let mut out = Vec::new();
out.push(MQTTMessage{ out.push(MQTTMessage{
topic: game_topic, topic: game_topic,
payload: serialized, payload: serialized.clone(),
});
for (_, field_set) in &event.field_sets {
for field in &field_set.fields {
match field.last_known_match {
None => {},
Some(last_known_match) => {
if last_known_match == tuple {
let field_topic = format!("field/{}/score", field.id);
out.push(MQTTMessage{
topic: field_topic,
payload: serialized.clone(),
}); });
}
},
}
}
}
return out; return out;
}, },
} }
@ -774,43 +833,7 @@ fn on_score_change(notice: tm::Notice, event: &mut Event, _connection: &TMConnec
} }
} }
fn on_match_start(_notice: tm::Notice, event: &mut Event, _connection: &TMConnection) -> Vec<MQTTMessage> { fn on_match_list_update(_notice: tm::Notice, event: &mut Event, connection: &mut TMConnection) -> Vec<MQTTMessage> {
return Vec::new();
}
fn on_match_cancel(_notice: tm::Notice, event: &mut Event, _connection: &TMConnection) -> Vec<MQTTMessage> {
return Vec::new();
}
fn on_match_reset(_notice: tm::Notice, event: &mut Event, _connection: &TMConnection) -> Vec<MQTTMessage> {
return Vec::new();
}
fn on_match_assigned(_notice: tm::Notice, event: &mut Event, _connection: &TMConnection) -> Vec<MQTTMessage> {
return Vec::new();
}
fn on_active_field_changed(_notice: tm::Notice, event: &mut Event, _connection: &TMConnection) -> Vec<MQTTMessage> {
return Vec::new();
}
fn on_rankings_updated(_notice: tm::Notice, event: &mut Event, _connection: &TMConnection) -> Vec<MQTTMessage> {
return Vec::new();
}
fn on_event_status_updated(_notice: tm::Notice, event: &mut Event, _connection: &TMConnection) -> Vec<MQTTMessage> {
return Vec::new();
}
fn on_elim_alliance_update(_notice: tm::Notice, event: &mut Event, _connection: &TMConnection) -> Vec<MQTTMessage> {
return Vec::new();
}
fn on_elim_unavail_teams_update(_notice: tm::Notice, event: &mut Event, _connection: &TMConnection) -> Vec<MQTTMessage> {
return Vec::new();
}
fn on_match_list_update(_notice: tm::Notice, event: &mut Event, connection: &TMConnection) -> Vec<MQTTMessage> {
let mut messages = Vec::new(); let mut messages = Vec::new();
let match_list_resp = connection.request(1002, tm::BackendMessageData::default()); let match_list_resp = connection.request(1002, tm::BackendMessageData::default());
match match_list_resp.data.match_list { match match_list_resp.data.match_list {
@ -828,22 +851,30 @@ fn on_match_list_update(_notice: tm::Notice, event: &mut Event, connection: &TMC
return messages; return messages;
} }
fn on_timer_start(notice: tm::Notice, _event: &mut Event, _connection: &mut TMConnection) -> Vec<MQTTMessage> {
println!("State: {:#?}", notice.field_time.unwrap());
// 1) Find the state associated with the current block(driver or auton)
// 2) get the match tuple from the arena
// 3) add the mqtt messages for match & arena states
// 4) queue the state change to ${state}_done
// 5) add the cancel_send to the tmconnections map of timeout_sends
return Vec::new();
}
enum Work {
Exit,
Notice(Box<tm::Notice>),
State(StateChange),
}
fn main() { fn main() {
env_logger::init(); env_logger::init();
let mut callbacks: HashMap<tm::NoticeId, NoticeCallback> = HashMap::new(); let mut callbacks: HashMap<tm::NoticeId, NoticeCallback> = HashMap::new();
callbacks.insert(tm::NoticeId::NoticeRealtimeScoreChanged, on_score_change); callbacks.insert(tm::NoticeId::NoticeRealtimeScoreChanged, on_score_change);
callbacks.insert(tm::NoticeId::NoticeMatchScoreUpdated, on_score_change); callbacks.insert(tm::NoticeId::NoticeMatchScoreUpdated, on_score_change);
callbacks.insert(tm::NoticeId::NoticeFieldTimerStarted, on_match_start);
callbacks.insert(tm::NoticeId::NoticeFieldTimerStopped, on_match_cancel);
callbacks.insert(tm::NoticeId::NoticeFieldResetTimer, on_match_reset);
callbacks.insert(tm::NoticeId::NoticeFieldMatchAssigned, on_match_assigned);
callbacks.insert(tm::NoticeId::NoticeActiveFieldChanged, on_active_field_changed);
callbacks.insert(tm::NoticeId::NoticeRankingsUpdated, on_rankings_updated);
callbacks.insert(tm::NoticeId::NoticeEventStatusUpdated, on_event_status_updated);
callbacks.insert(tm::NoticeId::NoticeElimAllianceUpdated, on_elim_alliance_update);
callbacks.insert(tm::NoticeId::NoticeElimUnavailTeamsUpdated, on_elim_unavail_teams_update);
callbacks.insert(tm::NoticeId::NoticeMatchListUpdated, on_match_list_update); callbacks.insert(tm::NoticeId::NoticeMatchListUpdated, on_match_list_update);
callbacks.insert(tm::NoticeId::NoticeFieldTimerStarted, on_timer_start);
let mut mqttoptions = MqttOptions::new("vex-bridge", "localhost", 1883); let mut mqttoptions = MqttOptions::new("vex-bridge", "localhost", 1883);
mqttoptions.set_keep_alive(Duration::from_secs(5)); mqttoptions.set_keep_alive(Duration::from_secs(5));
@ -863,13 +894,13 @@ fn main() {
} }
); );
let running = true; let mut running = true;
let mut uuid = [0u8; 16]; let mut uuid = [0u8; 16];
rand::thread_rng().fill_bytes(&mut uuid); rand::thread_rng().fill_bytes(&mut uuid);
let mut client_name = [0u8;32]; let mut client_name = [0u8;32];
rand::thread_rng().fill_bytes(&mut client_name); rand::thread_rng().fill_bytes(&mut client_name);
let username: [u8;16] = [0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0]; let username: [u8;16] = [0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0];
let (mut tm_client, tm_connection) = TMClient::new(uuid, client_name, String::from(""), username); let (mut tm_client, mut tm_connection) = TMClient::new(uuid, client_name, String::from(""), username);
let tm_thread = thread::spawn(move || let tm_thread = thread::spawn(move ||
while running { while running {
tm_client.process(); tm_client.process();
@ -897,14 +928,14 @@ fn main() {
field_req.on_field_match = Some(field_data); field_req.on_field_match = Some(field_data);
let field_resp = tm_connection.request(309, field_req); let field_resp = tm_connection.request(309, field_req);
println!("Field {}/{}: {:#?}", field_set_id, field.id, field_resp);
} }
} }
while running { while running {
thread::sleep(Duration::from_millis(1000)); match tm_connection.work_queue.recv() {
match tm_connection.notices.recv() { Ok(work) => match work {
Ok(notice) => { Work::Exit => running = false,
Work::Notice(notice) => {
let callback = callbacks.get(&notice.id()); let callback = callbacks.get(&notice.id());
match callback { match callback {
None => { None => {
@ -914,7 +945,7 @@ fn main() {
} }
}, },
Some(callback) => { Some(callback) => {
let messages = callback(*notice, &mut event, &tm_connection); let messages = callback(*notice, &mut event, &mut tm_connection);
for message in messages { for message in messages {
let result = client.publish(message.topic, QoS::AtMostOnce, true, message.payload); let result = client.publish(message.topic, QoS::AtMostOnce, true, message.payload);
match result { match result {
@ -925,6 +956,15 @@ fn main() {
}, },
} }
}, },
Work::State(state_change) => {
match event.get_match(state_change.tuple) {
None => log::warn!("Received state change for unknown match {:#?}", state_change.tuple),
Some(m) => {
m.state = Some(state_change.next_state);
},
}
},
},
Err(error) => log::error!("Notice recv error: {}", error), Err(error) => log::error!("Notice recv error: {}", error),
} }