diff --git a/src/main.rs b/src/main.rs index e1179f1..26a34ba 100644 --- a/src/main.rs +++ b/src/main.rs @@ -17,6 +17,7 @@ use std::fs; use chrono::Local; use std::time; use serde::{Serialize, Deserialize}; +use std::error::Error; enum ToClientTCPMessageType { LoggedIn = 0, RoomList = 1, @@ -178,30 +179,49 @@ async fn client_read(client: Rc>, mut socket: TcpStream, clients match socket.read(&mut buf).await { // socket closed Ok(n) if n == 0 => { + println!("client read ended naturally?"); return; }, - Ok(n) => { + Ok(_) => { let t = buf[0]; if t == FromClientTCPMessageType::LogIn as u8 { //[0:u8][username.length():u8][username:shortstring][password.length():u8][password:shortstring] - read_login_message(socket.clone(), client.clone()).await; + match read_login_message(socket.clone(), client.clone()).await{ + Ok(_)=>(), + Err(_)=>{eprintln!("failed to read from socket"); return;} + }; } else if t == FromClientTCPMessageType::GetRooms as u8 {//[1:u8] - read_rooms_message(socket.clone(), client.clone(), rooms.clone()).await; + match read_rooms_message(socket.clone(), client.clone(), rooms.clone()).await{ + Ok(_)=>(), + Err(_)=>{eprintln!("failed to read from socket"); return;} + }; } else if t == FromClientTCPMessageType::GetRoomData as u8 { - read_roomdata_message(socket.clone(), client.clone(), rooms.clone()).await; + match read_roomdata_message(socket.clone(), client.clone(), rooms.clone()).await{ + Ok(_)=>(), + Err(_)=>{eprintln!("failed to read from socket"); return;} + }; } else if t == FromClientTCPMessageType::JoinRoom as u8 {//[2:u8][roomname.length():u8][roomname:shortstring] - read_join_message(socket.clone(), client.clone(), rooms.clone()).await; + match read_join_message(socket.clone(), client.clone(), rooms.clone()).await{ + Ok(_)=>(), + Err(_)=>{eprintln!("failed to read from socket"); return;} + }; } else if t == FromClientTCPMessageType::SendMessageOthersUnbuffered as u8 || t == FromClientTCPMessageType::SendMessageAllUnbuffered as u8 || t == FromClientTCPMessageType::SendMessageGroupUnbuffered as u8 || t == FromClientTCPMessageType::SendMessageOthersBuffered as u8 || t == FromClientTCPMessageType::SendMessageAllBuffered as u8 { //others,all,group[t:u8][message.length():i32][message:u8array] - read_send_message(socket.clone(), client.clone(), rooms.clone(), t).await; + match read_send_message(socket.clone(), client.clone(), rooms.clone(), t).await{ + Ok(_)=>(), + Err(_)=>{eprintln!("failed to read from socket"); return;} + }; } else if t == FromClientTCPMessageType::CreateGroup as u8 { //[t:u8][list.lengthbytes:i32][clients:i32array] - read_group_message(socket.clone(), client.clone(), clients.clone()).await; + match read_group_message(socket.clone(), client.clone(), clients.clone()).await{ + Ok(_)=>(), + Err(_)=>{eprintln!("failed to read from socket"); return;} + }; } else { //die...not correct protocol - println!("Incorrect protocol, killing"); + eprintln!("Incorrect protocol, killing"); return; } @@ -234,7 +254,7 @@ async fn client_write(client: Rc>, mut socket: TcpStream, notify match socket.write(&to_write).await { Ok(_) => (), - Err(_) => return + Err(_) => {eprintln!("failed to write to the tcp socket"); return;} } } @@ -262,7 +282,7 @@ async fn client_write_udp(client: Rc>, socket: Rc (), - Err(_) => () + Err(_) => {eprintln!("failed to write to the udp socket"); return;} } } @@ -389,11 +409,11 @@ async fn process_udp(socket: Rc>,clients: Rc>) { +async fn read_login_message(mut stream: TcpStream, client: Rc>) -> Result<(),Box>{ //byte,shortstring,byte,shortstring - let username = read_short_string(&mut stream).await; - let application = read_short_string(&mut stream).await; + let username = read_short_string(&mut stream).await?; + let application = read_short_string(&mut stream).await?; println!("{}: Got application {} and userid {}",Local::now().format("%Y-%m-%d %H:%M:%S"),application,username); { let mut client = client.borrow_mut(); @@ -412,9 +432,10 @@ async fn read_login_message(mut stream: TcpStream, client: Rc>) client.message_queue.extend(write_buf); client.notify.notify(); } + return Ok(()); } -async fn read_rooms_message(mut stream: TcpStream, client: Rc>, rooms: Rc>>>>){ +async fn read_rooms_message(mut stream: TcpStream, client: Rc>, rooms: Rc>>>>)-> Result<(),Box>{ let mut write_buf = vec![]; write_buf.push(ToClientTCPMessageType::RoomList as u8); @@ -445,13 +466,13 @@ async fn read_rooms_message(mut stream: TcpStream, client: Rc>, write_buf.extend_from_slice(message_bytes); client.message_queue.extend_from_slice(&write_buf); client.notify.notify(); - + return Ok(()); } -async fn read_join_message(mut stream: TcpStream, client: Rc>, rooms: Rc>>>>){ +async fn read_join_message(mut stream: TcpStream, client: Rc>, rooms: Rc>>>>)-> Result<(),Box>{ //byte,shortstring - let short_room_name = read_short_string(&mut stream).await; + let short_room_name = read_short_string(&mut stream).await?; let extended_room_name; let mut leave_room = false; { @@ -470,7 +491,7 @@ async fn read_join_message(mut stream: TcpStream, client: Rc>, r let mut rooms_ref = rooms.borrow_mut(); if short_room_name.trim() == "" || short_room_name == "-1" { - return; + return Ok(()); } if !rooms_ref.contains_key(&extended_room_name) { //new room, must create it @@ -515,9 +536,11 @@ async fn read_join_message(mut stream: TcpStream, client: Rc>, r }else{ send_client_master_message(&mut *client_ref, room_to_join.master_client.borrow().id); } + + return Ok(()); } -async fn read_roomdata_message(mut stream: TcpStream, client: Rc>, rooms: Rc>>>>){ +async fn read_roomdata_message(mut stream: TcpStream, client: Rc>, rooms: Rc>>>>)-> Result<(),Box>{ //type, room_name //will respond with type, numclients u32, id1 u32, name_len u8, name_bytes ... @@ -525,7 +548,7 @@ async fn read_roomdata_message(mut stream: TcpStream, client: Rc - let short_room_name = read_short_string(&mut stream).await; + let short_room_name = read_short_string(&mut stream).await?; let mut client_ref = client.borrow_mut(); let room_name = format!("{}_{}", client_ref.application, short_room_name); @@ -565,12 +588,14 @@ async fn read_roomdata_message(mut stream: TcpStream, client: Rc client_ref.notify.notify(); } + return Ok(()); + } -async fn read_send_message(mut stream: TcpStream, client: Rc>, rooms: Rc>>>>, message_type: u8){ +async fn read_send_message(mut stream: TcpStream, client: Rc>, rooms: Rc>>>>, message_type: u8)-> Result<(),Box>{ //4 byte length, array //this is a message for everyone in the room (maybe) - let to_send = read_vec(&mut stream).await; + let to_send = read_vec(&mut stream).await?; if message_type == FromClientTCPMessageType::SendMessageOthersUnbuffered as u8 { send_room_message(client,&to_send,rooms.clone(),false,false); }else if message_type == FromClientTCPMessageType::SendMessageAllUnbuffered as u8 { @@ -580,15 +605,16 @@ async fn read_send_message(mut stream: TcpStream, client: Rc>, r }else if message_type == FromClientTCPMessageType::SendMessageAllBuffered as u8 { //ordered send_room_message(client,&to_send,rooms.clone(),true,true); }else if message_type == FromClientTCPMessageType::SendMessageGroupUnbuffered as u8 { - let group = read_short_string(&mut stream).await; + let group = read_short_string(&mut stream).await?; send_group_message(client,&to_send, &group); } + return Ok(()); } -async fn read_group_message(mut stream: TcpStream, client: Rc>, clients: Rc>>>>){ +async fn read_group_message(mut stream: TcpStream, client: Rc>, clients: Rc>>>>)-> Result<(),Box>{ - let group = read_short_string(&mut stream).await; - let id_bytes = read_vec(&mut stream).await; + let group = read_short_string(&mut stream).await?; + let id_bytes = read_vec(&mut stream).await?; let num = id_bytes.len(); let mut client_ref = client.borrow_mut(); @@ -623,6 +649,7 @@ async fn read_group_message(mut stream: TcpStream, client: Rc>, client_ref.groups.insert(group.clone(),group_clients); + return Ok(()); } fn client_leave_room(client: Rc>, send_to_client: bool, rooms: Rc>>>>){ @@ -808,34 +835,34 @@ fn send_group_message(sender: Rc>, message: &Vec, group: &St } -async fn read_u8(stream: &mut TcpStream) -> u8 { +async fn read_u8(stream: &mut TcpStream) -> Result> { let mut buf = [0; 1]; - stream.read_exact(&mut buf).await.unwrap(); - return buf[0]; + stream.read_exact(&mut buf).await?; + return Ok(buf[0]); } -async fn read_u32(stream: &mut TcpStream) -> u32 { +async fn read_u32(stream: &mut TcpStream) -> Result> { let mut buf:[u8;4] = [0; 4]; - stream.read_exact(&mut buf).await.unwrap(); + stream.read_exact(&mut buf).await?; let size = u32::from_be_bytes(buf); - return size; + return Ok(size); } -async fn _read_string(stream: &mut TcpStream) -> String { - let size = read_u32(stream).await; +async fn _read_string(stream: &mut TcpStream) -> Result> { + let size = read_u32(stream).await?; let mut string_bytes = vec![0;size as usize]; - stream.read_exact(&mut string_bytes).await.unwrap(); - return String::from_utf8(string_bytes).unwrap(); + stream.read_exact(&mut string_bytes).await?; + return Ok(String::from_utf8(string_bytes).unwrap()); } -async fn read_short_string(stream: &mut TcpStream) -> String { - let size = read_u8(stream).await; +async fn read_short_string(stream: &mut TcpStream) -> Result> { + let size = read_u8(stream).await?; let mut string_bytes = vec![0;size as usize]; - stream.read_exact(&mut string_bytes).await.unwrap(); - return String::from_utf8(string_bytes).unwrap(); + stream.read_exact(&mut string_bytes).await?; + return Ok(String::from_utf8(string_bytes).unwrap()); } -async fn read_vec(stream: &mut TcpStream) -> Vec { - let message_size = read_u32(stream).await; +async fn read_vec(stream: &mut TcpStream) -> Result,Box> { + let message_size = read_u32(stream).await?; let mut message = vec![0u8;message_size as usize]; - stream.read_exact(&mut message).await.unwrap(); - return message; + stream.read_exact(&mut message).await?; + return Ok(message); }