switched to RWLock over mutex, so it should be much more performant now

main
Kyle Johnsen 2022-01-19 15:11:55 -05:00
parent 01e3e0c808
commit 2f1c216d62
1 changed files with 58 additions and 58 deletions

View File

@ -2,26 +2,26 @@ use std::io::prelude::*;
use std::thread; use std::thread;
use std::net::{TcpListener, TcpStream,UdpSocket,IpAddr,SocketAddr}; use std::net::{TcpListener, TcpStream,UdpSocket,IpAddr,SocketAddr};
use std::collections::HashMap; use std::collections::HashMap;
use std::sync::{Arc,Mutex}; use std::sync::{Arc,RwLock};
use std::sync::mpsc; use std::sync::mpsc;
use std::sync::mpsc::{SyncSender,Receiver}; use std::sync::mpsc::{SyncSender,Receiver};
struct Client { struct Client {
logged_in: Arc<Mutex<bool>>, logged_in: Arc<RwLock<bool>>,
id: u32, id: u32,
username: Arc<Mutex<String>>, username: Arc<RwLock<String>>,
room: Arc<Mutex<Option<Arc<Room>>>>, room: Arc<RwLock<Option<Arc<Room>>>>,
sender: SyncSender<Vec<u8>>, sender: SyncSender<Vec<u8>>,
rooms_mutex: Arc<Mutex<HashMap<String,Arc<Room>>>>, rooms_mutex: Arc<RwLock<HashMap<String,Arc<Room>>>>,
clients_mutex: Arc<Mutex<HashMap<u32,Arc<Client>>>>, clients_mutex: Arc<RwLock<HashMap<u32,Arc<Client>>>>,
groups: Arc<Mutex<HashMap<String,Vec<Arc<Client>>>>>, groups: Arc<RwLock<HashMap<String,Vec<Arc<Client>>>>>,
ip: Arc<Mutex<IpAddr>>, ip: Arc<RwLock<IpAddr>>,
port: Arc<Mutex<u16>> port: Arc<RwLock<u16>>
} }
struct Room { struct Room {
name: String, name: String,
clients: Mutex<HashMap<u32,Arc<Client>>>, clients: RwLock<HashMap<u32,Arc<Client>>>,
master_client: Arc<Mutex<Arc<Client>>> master_client: Arc<RwLock<Arc<Client>>>
} }
@ -64,9 +64,9 @@ fn read_login_message(stream: &mut TcpStream, client: &Arc<Client>) {
let password = read_short_string(stream); let password = read_short_string(stream);
println!("Got username {} and password {}",username,password); println!("Got username {} and password {}",username,password);
let mut client_user = client.username.lock().unwrap(); let mut client_user = client.username.write().unwrap();
*client_user = username; *client_user = username;
let mut client_loggedin = client.logged_in.lock().unwrap(); let mut client_loggedin = client.logged_in.write().unwrap();
*client_loggedin = true; *client_loggedin = true;
let mut write_buf = vec![]; let mut write_buf = vec![];
write_buf.push(0u8); write_buf.push(0u8);
@ -110,16 +110,16 @@ fn send_client_join_message(to: &Arc<Client>, from: u32, room: &str){
fn client_leave_room(client: &Arc<Client>, send_to_client: bool){ fn client_leave_room(client: &Arc<Client>, send_to_client: bool){
//first remove the client from the room they are in //first remove the client from the room they are in
let mut room = client.room.lock().unwrap(); let mut room = client.room.write().unwrap();
if room.is_some(){ if room.is_some(){
{ {
let room = room.as_ref().unwrap(); let room = room.as_ref().unwrap();
//may have to choose a new master //may have to choose a new master
let mut master_client = room.master_client.lock().unwrap(); let mut master_client = room.master_client.write().unwrap();
let mut change_master = false; let mut change_master = false;
let mut clients = room.clients.lock().unwrap(); let mut clients = room.clients.write().unwrap();
let mut new_master_id = 0; let mut new_master_id = 0;
@ -127,7 +127,7 @@ fn client_leave_room(client: &Arc<Client>, send_to_client: bool){
println!("Will change master"); println!("Will change master");
//change the master //change the master
change_master = true; change_master = true;
for (k,v) in clients.iter() { for (_k,v) in clients.iter() {
if v.id != client.id { if v.id != client.id {
new_master_id = v.id; new_master_id = v.id;
break; break;
@ -151,7 +151,7 @@ fn client_leave_room(client: &Arc<Client>, send_to_client: bool){
//if the room is empty, destroy it as well //if the room is empty, destroy it as well
if clients.len() == 0 { if clients.len() == 0 {
let mut rooms = client.rooms_mutex.lock().unwrap(); let mut rooms = client.rooms_mutex.write().unwrap();
rooms.remove(&room.name); rooms.remove(&room.name);
println!("Destroyed room {}",&room.name) println!("Destroyed room {}",&room.name)
}else if change_master{ }else if change_master{
@ -181,20 +181,20 @@ fn read_join_message(stream: &mut TcpStream, client: &Arc<Client>){
//if the client is in a room, leave it //if the client is in a room, leave it
let mut room = client.room.lock().unwrap(); let mut room = client.room.write().unwrap();
if room.as_ref().is_some(){ if room.as_ref().is_some(){
client_leave_room(client, true); client_leave_room(client, true);
} }
//join room_name //join room_name
{ {
let mut rooms = client.rooms_mutex.lock().unwrap(); let mut rooms = client.rooms_mutex.write().unwrap();
if !rooms.contains_key(&room_name) { //new room, must create it if !rooms.contains_key(&room_name) { //new room, must create it
let map: HashMap<u32, Arc<Client>> = HashMap::new(); let map: HashMap<u32, Arc<Client>> = HashMap::new();
let r = Arc::new(Room { let r = Arc::new(Room {
name: room_name.to_string(), name: room_name.to_string(),
clients: Mutex::new(map), clients: RwLock::new(map),
master_client: Arc::new(Mutex::new(client.clone())) //client is the master, since they joined first master_client: Arc::new(RwLock::new(client.clone())) //client is the master, since they joined first
}); });
rooms.insert(String::from(&room_name),r); rooms.insert(String::from(&room_name),r);
println!("New room {} created",&room_name); println!("New room {} created",&room_name);
@ -202,7 +202,7 @@ fn read_join_message(stream: &mut TcpStream, client: &Arc<Client>){
//the room is guaranteed to exist now //the room is guaranteed to exist now
{ {
let mut clients = rooms[&room_name].clients.lock().unwrap(); let mut clients = rooms[&room_name].clients.write().unwrap();
clients.insert(client.id,client.clone()); clients.insert(client.id,client.clone());
println!("Client {} joined {}",client.id,&room_name); println!("Client {} joined {}",client.id,&room_name);
*room = Some(rooms.get(&room_name).unwrap().clone()); //we create an option and assign it back to the room *room = Some(rooms.get(&room_name).unwrap().clone()); //we create an option and assign it back to the room
@ -218,7 +218,7 @@ fn read_join_message(stream: &mut TcpStream, client: &Arc<Client>){
} }
} }
//tell the client who the master is //tell the client who the master is
let master_client = room.as_ref().unwrap().master_client.lock().unwrap(); let master_client = room.as_ref().unwrap().master_client.read().unwrap();
send_client_master_message(client, master_client.id); send_client_master_message(client, master_client.id);
} }
} }
@ -234,10 +234,10 @@ fn send_room_message(sender: &Arc<Client>, message: &Vec<u8>, include_sender: bo
write_buf.extend_from_slice(message); write_buf.extend_from_slice(message);
//println!("sending {} bytes from {}",message.len(),sender.id); //println!("sending {} bytes from {}",message.len(),sender.id);
{ {
let room = sender.room.lock().unwrap(); let room = sender.room.read().unwrap();
if room.is_some() { if room.is_some() {
let clients = room.as_ref().unwrap().clients.lock().unwrap(); let clients = room.as_ref().unwrap().clients.read().unwrap();
for (_k,v) in clients.iter(){ for (_k,v) in clients.iter(){
if !include_sender && v.id == sender.id { if !include_sender && v.id == sender.id {
continue; continue;
@ -255,7 +255,7 @@ fn send_group_message(sender: &Arc<Client>, message: &Vec<u8>, group: &String){
write_buf.extend_from_slice(message); write_buf.extend_from_slice(message);
//get the list of client ids for this group //get the list of client ids for this group
let groups = sender.groups.lock().unwrap(); let groups = sender.groups.read().unwrap();
if groups.contains_key(group) { if groups.contains_key(group) {
let group = groups.get(group).unwrap(); let group = groups.get(group).unwrap();
for c in group { for c in group {
@ -284,11 +284,11 @@ fn read_send_message(stream: &mut TcpStream, client: &Arc<Client>, message_type:
} }
fn read_group_message(stream: &mut TcpStream, client: &Arc<Client>){ fn read_group_message(stream: &mut TcpStream, client: &Arc<Client>){
let mut groups = client.groups.lock().unwrap(); let mut groups = client.groups.write().unwrap();
let group = read_short_string(stream); let group = read_short_string(stream);
let id_bytes = read_vec(stream); let id_bytes = read_vec(stream);
let num = id_bytes.len(); let num = id_bytes.len();
let clients = client.clients_mutex.lock().unwrap(); let clients = client.clients_mutex.read().unwrap();
let mut group_clients = vec![]; let mut group_clients = vec![];
println!("Received form group message {} ", group); println!("Received form group message {} ", group);
let mut i = 0; let mut i = 0;
@ -362,7 +362,7 @@ fn client_write_thread(mut stream: TcpStream, rx: Receiver<Vec<u8>> ) {
} }
fn handle_client(stream: TcpStream, client_id: u32, clients_mutex: Arc<Mutex<HashMap<u32,Arc<Client>>>>, rooms_mutex: Arc<Mutex<HashMap<String,Arc<Room>>>>){ fn handle_client(stream: TcpStream, client_id: u32, clients_mutex: Arc<RwLock<HashMap<u32,Arc<Client>>>>, rooms_mutex: Arc<RwLock<HashMap<String,Arc<Room>>>>){
stream.set_nodelay(true).unwrap(); stream.set_nodelay(true).unwrap();
println!("Accepted new connection"); println!("Accepted new connection");
@ -371,19 +371,19 @@ fn handle_client(stream: TcpStream, client_id: u32, clients_mutex: Arc<Mutex<Has
//create a new client structure and add it to the list of clients //create a new client structure and add it to the list of clients
let client = Arc::new(Client{ let client = Arc::new(Client{
id: client_id, id: client_id,
username: Arc::new(Mutex::new(String::from(""))), username: Arc::new(RwLock::new(String::from(""))),
logged_in: Arc::new(Mutex::new(false)), logged_in: Arc::new(RwLock::new(false)),
room: Arc::new(Mutex::new(Option::None)), room: Arc::new(RwLock::new(Option::None)),
sender: tx, sender: tx,
rooms_mutex: rooms_mutex.clone(), rooms_mutex: rooms_mutex.clone(),
clients_mutex: clients_mutex.clone(), clients_mutex: clients_mutex.clone(),
groups: Arc::new(Mutex::new(HashMap::new())), groups: Arc::new(RwLock::new(HashMap::new())),
ip: Arc::new(Mutex::new(stream.peer_addr().unwrap().ip())), ip: Arc::new(RwLock::new(stream.peer_addr().unwrap().ip())),
port: Arc::new(Mutex::new(0)) port: Arc::new(RwLock::new(0))
}); });
{ {
let mut clients = clients_mutex.lock().unwrap(); let mut clients = clients_mutex.write().unwrap();
clients.insert(client_id, client.clone()); clients.insert(client_id, client.clone());
} }
@ -412,13 +412,13 @@ fn handle_client(stream: TcpStream, client_id: u32, clients_mutex: Arc<Mutex<Has
{ {
//make sure we remove the client from all rooms //make sure we remove the client from all rooms
client_leave_room(&client, false); client_leave_room(&client, false);
let mut clients = clients_mutex.lock().unwrap(); let mut clients = clients_mutex.write().unwrap();
clients.remove(&client_id); clients.remove(&client_id);
} }
} }
fn tcp_listen(client_mutex: Arc<Mutex<HashMap<u32, Arc<Client>>>>, room_mutex: Arc<Mutex<HashMap<String,Arc<Room>>>>){ fn tcp_listen(client_mutex: Arc<RwLock<HashMap<u32, Arc<Client>>>>, room_mutex: Arc<RwLock<HashMap<String,Arc<Room>>>>){
println!("Started TCP Listener"); println!("Started TCP Listener");
let listener = TcpListener::bind("127.0.0.1:80").expect("could not bind port"); let listener = TcpListener::bind("127.0.0.1:80").expect("could not bind port");
@ -432,7 +432,7 @@ fn tcp_listen(client_mutex: Arc<Mutex<HashMap<u32, Arc<Client>>>>, room_mutex: A
} }
} }
fn udp_listen(client_mutex: Arc<Mutex<HashMap<u32, Arc<Client>>>>, _room_mutex: Arc<Mutex<HashMap<String,Arc<Room>>>>){ fn udp_listen(client_mutex: Arc<RwLock<HashMap<u32, Arc<Client>>>>, _room_mutex: Arc<RwLock<HashMap<String,Arc<Room>>>>){
let mut buf = [0u8;1024]; let mut buf = [0u8;1024];
let s = UdpSocket::bind("127.0.0.1:80").unwrap(); let s = UdpSocket::bind("127.0.0.1:80").unwrap();
println!("UDP Thread Started"); println!("UDP Thread Started");
@ -448,9 +448,9 @@ fn udp_listen(client_mutex: Arc<Mutex<HashMap<u32, Arc<Client>>>>, _room_mutex:
if t == 0 { if t == 0 {
//connect message, respond back //connect message, respond back
{ {
let clients = client_mutex.lock().unwrap(); let clients = client_mutex.read().unwrap();
let client = clients.get(&client_id).unwrap(); let client = clients.get(&client_id).unwrap();
let mut port = client.port.lock().unwrap(); let mut port = client.port.write().unwrap();
*port = addr.port(); //set the udp port to send data to *port = addr.port(); //set the udp port to send data to
s.send_to(&buf,addr).unwrap(); //echo back s.send_to(&buf,addr).unwrap(); //echo back
} }
@ -459,15 +459,15 @@ fn udp_listen(client_mutex: Arc<Mutex<HashMap<u32, Arc<Client>>>>, _room_mutex:
} else if t == 3 { } else if t == 3 {
//message message, send to everyone else in my room //message message, send to everyone else in my room
{ {
let clients = client_mutex.lock().unwrap(); let clients = client_mutex.read().unwrap();
let client = clients.get(&client_id).unwrap(); let client = clients.get(&client_id).unwrap();
let room_option = client.room.lock().unwrap(); let room_option = client.room.read().unwrap();
let room = room_option.as_ref().unwrap(); let room = room_option.as_ref().unwrap();
let room_clients = room.clients.lock().unwrap(); //we finally got to the room! let room_clients = room.clients.read().unwrap(); //we finally got to the room!
for (_k,v) in room_clients.iter() { for (_k,v) in room_clients.iter() {
if v.id != client_id{ if v.id != client_id{
let ip = v.ip.lock().unwrap(); let ip = v.ip.read().unwrap();
let port = v.port.lock().unwrap(); let port = v.port.read().unwrap();
match s.send_to(&buf,SocketAddr::new(*ip, *port)) { match s.send_to(&buf,SocketAddr::new(*ip, *port)) {
Ok(_)=> (), Ok(_)=> (),
Err(_) => {println!("Error sending to person in room. They probably left");} Err(_) => {println!("Error sending to person in room. They probably left");}
@ -478,16 +478,16 @@ fn udp_listen(client_mutex: Arc<Mutex<HashMap<u32, Arc<Client>>>>, _room_mutex:
} else if t == 4 { } else if t == 4 {
{ {
let clients = client_mutex.lock().unwrap(); let clients = client_mutex.read().unwrap();
let client = clients.get(&client_id).unwrap(); let client = clients.get(&client_id).unwrap();
let room_option = client.room.lock().unwrap(); let room_option = client.room.read().unwrap();
let room = room_option.as_ref().unwrap(); let room = room_option.as_ref().unwrap();
let room_clients = room.clients.lock().unwrap(); //we finally got to the room! let room_clients = room.clients.read().unwrap(); //we finally got to the room!
buf[0] = 3u8; buf[0] = 3u8;
for (_k,v) in room_clients.iter() { for (_k,v) in room_clients.iter() {
let ip = v.ip.lock().unwrap(); let ip = v.ip.read().unwrap();
let port = v.port.lock().unwrap(); let port = v.port.read().unwrap();
match s.send_to(&buf,SocketAddr::new(*ip, *port)) { match s.send_to(&buf,SocketAddr::new(*ip, *port)) {
Ok(_)=> (), Ok(_)=> (),
Err(_) => {println!("Error sending to person in room. They probably left");} Err(_) => {println!("Error sending to person in room. They probably left");}
@ -504,9 +504,9 @@ fn udp_listen(client_mutex: Arc<Mutex<HashMap<u32, Arc<Client>>>>, _room_mutex:
let message_vec = buf[6..].to_vec(); let message_vec = buf[6..].to_vec();
let (group_name_bytes, message_bytes) = message_vec.split_at(group_name_size as usize); let (group_name_bytes, message_bytes) = message_vec.split_at(group_name_size as usize);
let group_name = String::from_utf8(group_name_bytes.to_vec()).unwrap(); let group_name = String::from_utf8(group_name_bytes.to_vec()).unwrap();
let clients = client_mutex.lock().unwrap(); let clients = client_mutex.read().unwrap();
let client = clients.get(&client_id).unwrap(); let client = clients.get(&client_id).unwrap();
let groups = client.groups.lock().unwrap(); let groups = client.groups.read().unwrap();
let clients = groups.get(&group_name).unwrap(); let clients = groups.get(&group_name).unwrap();
//we need to form a new message without the group name //we need to form a new message without the group name
@ -516,8 +516,8 @@ fn udp_listen(client_mutex: Arc<Mutex<HashMap<u32, Arc<Client>>>>, _room_mutex:
message_to_send.extend(message_bytes); message_to_send.extend(message_bytes);
for v in clients.iter() { for v in clients.iter() {
let ip = v.ip.lock().unwrap(); let ip = v.ip.read().unwrap();
let port = v.port.lock().unwrap(); let port = v.port.read().unwrap();
match s.send_to(&message_bytes,SocketAddr::new(*ip, *port)) { match s.send_to(&message_bytes,SocketAddr::new(*ip, *port)) {
Ok(_)=> (), Ok(_)=> (),
Err(_) => {println!("Error sending to person in room. They probably left");} Err(_) => {println!("Error sending to person in room. They probably left");}
@ -539,8 +539,8 @@ fn main() {
let clients: HashMap<u32, Arc<Client>> = HashMap::new(); let clients: HashMap<u32, Arc<Client>> = HashMap::new();
let rooms: HashMap<String, Arc<Room>> = HashMap::new(); let rooms: HashMap<String, Arc<Room>> = HashMap::new();
let client_mutex = Arc::new(Mutex::new(clients)); let client_mutex = Arc::new(RwLock::new(clients));
let room_mutex = Arc::new(Mutex::new(rooms)); let room_mutex = Arc::new(RwLock::new(rooms));
//start the UDP thread //start the UDP thread
let udp_clients = Arc::clone(&client_mutex); let udp_clients = Arc::clone(&client_mutex);