From efeb05f5f1803ec6832e5a3a2b47064a67eb1898 Mon Sep 17 00:00:00 2001 From: Kyle Johnsen Date: Tue, 18 Jan 2022 02:10:26 -0500 Subject: [PATCH] now sends messages --- src/main.rs | 201 ++++++++++++++++++++++++++++++++++------------------ 1 file changed, 133 insertions(+), 68 deletions(-) diff --git a/src/main.rs b/src/main.rs index 0cf6b48..5573e38 100644 --- a/src/main.rs +++ b/src/main.rs @@ -7,12 +7,12 @@ use std::io; use std::sync::mpsc; use std::sync::mpsc::{SyncSender,Receiver}; struct Client { - logged_in: bool, + logged_in: Arc>, id: u32, - name: String, - room: Option, + username: Arc>, + room: Arc>>>, sender: SyncSender>, - rooms_mutex: Arc>> + rooms_mutex: Arc>>> } struct Room { @@ -24,9 +24,9 @@ fn udp_listen(){ println!("UDP Thread Started"); } -fn read_u8(string: &mut TcpStream) -> u8 { +fn read_u8(stream: &mut TcpStream) -> u8 { let mut buf = [0; 1]; - string.read_exact(&mut buf).unwrap(); + stream.read_exact(&mut buf).unwrap(); return buf[0]; } fn read_u32(stream: &mut TcpStream) -> u32 { @@ -50,13 +50,23 @@ fn read_short_string(stream: &mut TcpStream) -> String { return String::from_utf8(stringBytes).unwrap(); } -fn read_login_message(stream: &mut TcpStream, client: Arc) { +fn read_vec(stream: &mut TcpStream) -> Vec { + let message_size = read_u32(stream); + let mut message = vec![0u8;message_size as usize]; + stream.read_exact(&mut message).unwrap(); + return message; +} + +fn read_login_message(stream: &mut TcpStream, mut client: &Arc) { println!("Got login message"); let username = read_short_string(stream); let password = read_short_string(stream); println!("Got username {} and password {}",username,password); - + let mut client_user = client.username.lock().unwrap(); + *client_user = username; + let mut client_loggedin = client.logged_in.lock().unwrap(); + *client_loggedin = true; let mut writeBuf = vec![]; writeBuf.push(0u8); writeBuf.extend_from_slice(&(client.id).to_be_bytes()); //send the client the id @@ -64,61 +74,92 @@ fn read_login_message(stream: &mut TcpStream, client: Arc) { client.sender.send(writeBuf); } -fn read_rooms_message(stream: &mut TcpStream, client: Arc){ +fn read_rooms_message(stream: &mut TcpStream, mut client: &Arc){ } -fn read_join_message(stream: &mut TcpStream, client: Arc){ +fn send_client_join_message(to: &Arc, from: u32, room: &str){ + //this message is 2u8, person_id_u32, room_name_len_u8, room_name_bytes + let mut writeBuf = vec![]; + writeBuf.push(2u8); + writeBuf.extend_from_slice(&(from).to_be_bytes()); //send everyone that the client id joined the room + writeBuf.push(room.as_bytes().len() as u8); + writeBuf.extend_from_slice(room.as_bytes()); + to.sender.send(writeBuf).unwrap(); +} + +fn client_leave_room(mut client: &Arc, send_to_client: bool){ + //first remove the client from the room they are in + + let mut room = client.room.lock().unwrap(); + + if room.is_some(){ + { + let room = room.as_ref().unwrap(); + println!("Client leaving current room {}",&room.name); + let mut clients = room.clients.lock().unwrap(); + for (k,v) in clients.iter() { + if !send_to_client && v.id == client.id{ + continue; + } + send_client_join_message(v, client.id, "") //send the leave room message to everyone in the room + } + clients.remove(&client.id); //remove the client from that list in the room + + //if the room is empty, destroy it as well + + if clients.len() == 0 { + let mut rooms = client.rooms_mutex.lock().unwrap(); + rooms.remove(&room.name); + println!("Destroyed room {}",&room.name) + } + } + } + *room = Option::None; + + +} + +fn read_join_message(stream: &mut TcpStream, mut client: &Arc){ let room_name = read_short_string(stream); println!("Got room message {}",room_name); //if the client is in a room, leave it - if !client.room.is_none(){ - //we must leave our current room - //todo + let mut room = client.room.lock().unwrap(); + if room.as_ref().is_some(){ + client_leave_room(client, true); } - - //join that room + //join room_name { - let mut rooms = client.rooms_mutex.lock().unwrap(); - if !rooms.contains_key(&room_name) { + let mut rooms = client.rooms_mutex.lock().unwrap(); + if !rooms.contains_key(&room_name) { //new room, must create it let map: HashMap> = HashMap::new(); - let r = Room { + let r = Arc::new(Room { + name: room_name.to_string(), clients: Mutex::new(map) - }; + }); rooms.insert(String::from(&room_name),r); println!("New room {} created",&room_name); - - } + //the room is guaranteed to exist now { - let mut clients = rooms[&room_name].clients.lock().unwrap(); + let mut clients = rooms[&room_name].clients.lock().unwrap(); clients.insert(client.id,client.clone()); println!("Client {} joined {}",client.id,&room_name); - + *room = Some(rooms[&room_name].clone()); //send a join message to everyone in the room for (k,v) in clients.iter() { - let mut writeBuf = vec![]; - writeBuf.push(2u8); - writeBuf.extend_from_slice(&(client.id).to_be_bytes()); //send everyone that the client id joined the room - writeBuf.push(room_name.as_bytes().len() as u8); - writeBuf.extend_from_slice(room_name.as_bytes()); - v.sender.send(writeBuf); + send_client_join_message(v, client.id, &room_name); } //send a join message to the client for everyone else in the room (so they get a join message) for (k,v) in clients.iter() { if v.id != client.id { - let mut writeBuf = vec![]; - writeBuf.push(2u8); - writeBuf.extend_from_slice(&(v.id).to_be_bytes()); //send everyone that the client id joined the room - writeBuf.push(room_name.as_bytes().len() as u8); - writeBuf.extend_from_slice(room_name.as_bytes()); - client.sender.send(writeBuf); + send_client_join_message(&client, v.id, &room_name); } } } @@ -126,8 +167,43 @@ fn read_join_message(stream: &mut TcpStream, client: Arc){ } +fn send_room_message(sender: &Arc, message: &Vec, include_sender: bool){ + //this message is 3u8, sender_id_u32, message_len_u32, message_bytes + let mut writeBuf = vec![]; + writeBuf.push(3u8); + writeBuf.extend_from_slice(&sender.id.to_be_bytes()); + writeBuf.extend_from_slice(&(message.len() as u32).to_be_bytes()); + writeBuf.extend_from_slice(message); + println!("sending {} bytes from {}",message.len(),sender.id); + { + let room = sender.room.lock().unwrap(); + if room.is_some() { -fn client_read_thread(mut stream: TcpStream, client: Arc) { + let clients = room.as_ref().unwrap().clients.lock().unwrap(); + for (k,v) in clients.iter(){ + if !include_sender && v.id == sender.id { + continue; + } + v.sender.send(writeBuf.clone()).unwrap(); + } + } + } + //lock the clients in the room as well, because someone might leave the room in the middle...though, I suppose they'd have to lock the room to do it? +} + +fn read_send_message(stream: &mut TcpStream, client: &Arc, message_type: u8){ + //this is a message for everyone in the room (maybe) + let to_send = read_vec(stream); + if message_type == 3 { + send_room_message(client,&to_send,false); + }else if message_type == 4 { + //everyone in my group + }else if message_type == 5 { + //everyone including me + } +} + +fn client_read_thread(mut stream: TcpStream, mut client: Arc) { let mut readBuf:[u8;1] = [0; 1]; //messages come through as a 4-bit type identifier, that can be one of 0 (login) 1 (get rooms), 2 (join/leave room) 3(send message others) 4(send message all) 5(send message group) loop { @@ -138,29 +214,35 @@ fn client_read_thread(mut stream: TcpStream, client: Arc) { println!("Got a message {}",readBuf[0]); let t = readBuf[0]; if t == 0 { - read_login_message(&mut stream, client.clone()); + read_login_message(&mut stream, &mut client); } else if t == 1 { - read_rooms_message(&mut stream, client.clone()); + read_rooms_message(&mut stream, &mut client); } else if t == 2 { - read_join_message(&mut stream, client.clone()); + read_join_message(&mut stream, &mut client); + } else if t == 3 || t == 4 { + read_send_message(&mut stream, &client, t); } } } -fn client_write_thread(mut stream: TcpStream, client: Arc, rx: Receiver> ) { +fn client_write_thread(mut stream: TcpStream, rx: Receiver> ) { //wait on messages in my queue loop { let m = rx.recv().unwrap(); - //write the login message out + println!("Sending a message {}",m.len()); + if m.len() == 1{ + break; + } stream.write(&m).unwrap(); + } } -fn handle_client(mut stream: TcpStream, client_id: u32, clients_mutex: Arc>>>, rooms_mutex: Arc>>){ +fn handle_client(mut stream: TcpStream, client_id: u32, clients_mutex: Arc>>>, rooms_mutex: Arc>>>){ stream.set_nodelay(true).unwrap(); println!("Accepted new connection"); @@ -169,9 +251,9 @@ fn handle_client(mut stream: TcpStream, client_id: u32, clients_mutex: Arc> = HashMap::new(); - let rooms: HashMap = HashMap::new(); + let rooms: HashMap> = HashMap::new(); let client_mutex = Arc::new(Mutex::new(clients)); let room_mutex = Arc::new(Mutex::new(rooms)); let mut next_client_id = 0;