Added structures and methods for connect messages

master
noah metz 2024-01-20 17:12:16 -07:00
parent 7b4fafa3bd
commit 3ebd80fedb
1 changed files with 126 additions and 11 deletions

@ -187,11 +187,112 @@ impl BackendMessage {
return None; return None;
} }
None return Some(BackendMessage{
status: bytes[0],
request_id: u32::from_le_bytes(bytes[0..4].try_into().unwrap()),
data: bytes[5..].to_vec(),
});
} }
fn as_bytes(self: BackendMessage) -> Vec<u8> { fn as_bytes(self: BackendMessage) -> Vec<u8> {
return Vec::new(); let mut bytes = Vec::new();
bytes.push(self.status);
bytes.extend(self.request_id.to_le_bytes());
bytes.extend(self.data);
return bytes;
}
}
const BACKEND_PACKET_HEADER_SIZE: usize = 28;
#[derive(Debug)]
struct BackendPacket {
header: u32,
timestamp: f64,
msg_type: u32,
seq_num: u64,
size: u32,
data: Vec<u8>,
}
impl BackendPacket {
fn from_bytes(bytes: Vec<u8>) -> Option<BackendPacket> {
if bytes.len() < BACKEND_PACKET_HEADER_SIZE {
return None;
}
return Some(BackendPacket{
header: u32::from_le_bytes(bytes[0..4].try_into().unwrap()),
timestamp: f64::from_le_bytes(bytes[4..12].try_into().unwrap()),
msg_type: u32::from_le_bytes(bytes[12..16].try_into().unwrap()),
seq_num: u64::from_le_bytes(bytes[16..24].try_into().unwrap()),
size: u32::from_le_bytes(bytes[24..28].try_into().unwrap()),
data: bytes[28..].to_vec(),
});
}
fn to_bytes(self: BackendPacket) -> Vec<u8> {
let mut bytes = Vec::new();
bytes.extend(self.header.to_le_bytes());
bytes.extend(self.timestamp.to_le_bytes());
bytes.extend(self.msg_type.to_le_bytes());
bytes.extend(self.seq_num.to_le_bytes());
bytes.extend(self.size.to_le_bytes());
bytes.extend(self.data);
return bytes;
}
}
const CONNECT_MSG_LEN: usize = 114;
#[derive(Debug)]
struct ConnectMsg {
version: u32,
uuid: [u8; 16],
last_notice_id: u64,
username: [u8; 16],
pass_hash: [u8; 32],
pw_valid: u8,
state_valid: u8,
client_name: [u8; 32],
server_time_zone: i32,
}
impl ConnectMsg {
fn from_bytes(bytes: Vec<u8>) -> Option<ConnectMsg> {
if bytes.len() < CONNECT_MSG_LEN {
return None;
}
return Some(ConnectMsg{
version: u32::from_le_bytes(bytes[0..4].try_into().unwrap()),
uuid: bytes[4..20].to_owned().try_into().unwrap(),
last_notice_id: u64::from_le_bytes(bytes[20..28].try_into().unwrap()),
username: bytes[28..44].to_owned().try_into().unwrap(),
pass_hash: bytes[44..76].to_owned().try_into().unwrap(),
pw_valid: bytes[77].to_owned(),
state_valid: bytes[78].to_owned(),
client_name: bytes[78..110].to_owned().try_into().unwrap(),
server_time_zone: i32::from_le_bytes(bytes[110..114].try_into().unwrap()),
});
}
fn to_bytes(self: ConnectMsg) -> Vec<u8> {
let mut bytes = Vec::new();
bytes.extend(self.version.to_le_bytes());
bytes.extend(self.uuid);
bytes.extend(self.last_notice_id.to_le_bytes());
bytes.extend(self.username);
bytes.extend(self.pass_hash);
bytes.extend(self.pw_valid.to_le_bytes());
bytes.extend(self.state_valid.to_le_bytes());
bytes.extend(self.client_name);
bytes.extend(self.server_time_zone.to_le_bytes());
return bytes;
} }
} }
@ -215,28 +316,42 @@ impl TMClient {
stream_config.set_use_server_name_indication(false); stream_config.set_use_server_name_indication(false);
let stream = stream_config.connect("127.0.0.1", stream).unwrap(); let stream = stream_config.connect("127.0.0.1", stream).unwrap();
let stream = Arc::new(Mutex::new(stream)); let stream_arc = Arc::new(Mutex::new(stream));
return (TMClient{ return (TMClient{
stream: stream.clone(), stream: stream_arc.clone(),
notices: notice_tx, notices: notice_tx,
responses: response_tx responses: response_tx
}, },
TMConnection{ TMConnection{
stream: stream, stream: stream_arc,
notices: notice_rx, notices: notice_rx,
responses: response_rx responses: response_rx
},); },);
} }
fn process(self: &TMClient) { fn process(self: &TMClient) {
let mut incoming = [0; 2048]; let mut incoming = [0; 2048];
let mut stream = self.stream.lock().unwrap(); let mut stream = self.stream.lock().unwrap();
match stream.read(&mut incoming) { match stream.read(&mut incoming) {
Ok(read) => { Ok(read) => {
println!("Data({}): {}", read, String::from_utf8_lossy(&incoming)); let data = incoming[0..read].to_vec();
// If the data is a notice, parse it and send it to the notices mpsc match BackendPacket::from_bytes(data) {
// If the data is a response, parse it and send it to the responses mpsc Some(packet) => {
println!("Packet: {:?}", packet);
match packet.msg_type {
2 => {
match ConnectMsg::from_bytes(packet.data) {
Some(welcome_msg) => println!("Welcome msg: {:?}", welcome_msg),
None => println!("Failed to parse wlecome msg"),
}
},
_ => println!("Unhandled message type: {}", packet.msg_type),
}
},
None => println!("Failed to parse BackendPacket"),
}
}, },
Err(error) => println!("Error: {}", error), Err(error) => println!("Error: {}", error),
} }
@ -391,8 +506,7 @@ fn main() {
client.publish("bridge/status", QoS::AtLeastOnce, true, "{\"online\": true}").unwrap(); client.publish("bridge/status", QoS::AtLeastOnce, true, "{\"online\": true}").unwrap();
let mqtt_recv_thread = thread::spawn(move || let mqtt_recv_thread = thread::spawn(move ||
for message in connection.iter() { for _ in connection.iter() {
println!("Message = {:?}", message);
} }
); );
@ -405,6 +519,7 @@ fn main() {
); );
while running { while running {
thread::sleep(Duration::from_millis(1000));
match tm_connection.notices.recv() { match tm_connection.notices.recv() {
Ok(notice) => { Ok(notice) => {
let callback = callbacks.get(&notice.id()); let callback = callbacks.get(&notice.id());