diff --git a/config.txt b/config.txt index fbdf4df..81bd2ee 100644 --- a/config.txt +++ b/config.txt @@ -1,5 +1,4 @@ { "port":5000, -"tcp_timeout":30, -"tcp_send_buffer":100000 +"tcp_timeout":30 } diff --git a/src/main.rs b/src/main.rs index cdd38ec..c4279d4 100644 --- a/src/main.rs +++ b/src/main.rs @@ -160,8 +160,8 @@ async fn process_client(socket: TcpStream, udp_socket: Rc>, c } - let read_async = client_read(client.clone(), socket.clone(), clients.clone(), rooms.clone()).fuse(); - let write_async = client_write(client.clone(), socket, client_notify.clone()).fuse(); + let read_async = client_read(client.clone(), socket.clone(), clients.clone(), rooms.clone(), config.tcp_timeout*1000).fuse(); + let write_async = client_write(client.clone(), socket, client_notify.clone(), config.tcp_timeout*1000).fuse(); let write_async_udp = client_write_udp(client.clone(), udp_socket.clone(), client_notify_udp.clone()).fuse(); pin_mut!(read_async,write_async,write_async_udp); //not sure why this is necessary, since select @@ -208,19 +208,19 @@ async fn read_timeout(mut socket: &TcpStream, buf: &mut [u8], duration: u64) -> } -async fn client_read(client: Rc>, mut socket: TcpStream, clients: Rc>>>>, rooms: Rc>>>>){ +async fn client_read(client: Rc>, mut socket: TcpStream, clients: Rc>>>>, rooms: Rc>>>>, duration: u64){ let mut buf = [0; 1]; loop { - match read_timeout(&mut socket, &mut buf, 5000).await { + match read_timeout(&mut socket, &mut buf, duration).await { Ok(_) => { let t = buf[0]; if t == FromClientTCPMessageType::LogIn as u8 { //[0:u8][username.length():u8][username:shortstring][password.length():u8][password:shortstring] - match read_login_message(socket.clone(), client.clone()).await{ + match read_login_message(socket.clone(), client.clone(), duration).await{ Ok(_)=>(), Err(_)=>{eprintln!("failed to read from socket"); return;} }; @@ -230,12 +230,12 @@ async fn client_read(client: Rc>, mut socket: TcpStream, clients Err(_)=>{eprintln!("failed to read from socket"); return;} }; } else if t == FromClientTCPMessageType::GetRoomData as u8 { - match read_roomdata_message(socket.clone(), client.clone(), rooms.clone()).await{ + match read_roomdata_message(socket.clone(), client.clone(), rooms.clone(), duration).await{ Ok(_)=>(), Err(_)=>{eprintln!("failed to read from socket"); return;} }; } else if t == FromClientTCPMessageType::JoinRoom as u8 {//[2:u8][roomname.length():u8][roomname:shortstring] - match read_join_message(socket.clone(), client.clone(), rooms.clone()).await{ + match read_join_message(socket.clone(), client.clone(), rooms.clone(), duration).await{ Ok(_)=>(), Err(_)=>{eprintln!("failed to read from socket"); return;} }; @@ -244,12 +244,12 @@ async fn client_read(client: Rc>, mut socket: TcpStream, clients t == FromClientTCPMessageType::SendMessageGroupUnbuffered as u8 || t == FromClientTCPMessageType::SendMessageOthersBuffered as u8 || t == FromClientTCPMessageType::SendMessageAllBuffered as u8 { //others,all,group[t:u8][message.length():i32][message:u8array] - match read_send_message(socket.clone(), client.clone(), rooms.clone(), t).await{ + match read_send_message(socket.clone(), client.clone(), rooms.clone(), t, duration).await{ Ok(_)=>(), Err(_)=>{eprintln!("failed to read from socket"); return;} }; } else if t == FromClientTCPMessageType::CreateGroup as u8 { //[t:u8][list.lengthbytes:i32][clients:i32array] - match read_group_message(socket.clone(), client.clone(), clients.clone()).await{ + match read_group_message(socket.clone(), client.clone(), clients.clone(),duration).await{ Ok(_)=>(), Err(_)=>{eprintln!("failed to read from socket"); return;} }; @@ -296,7 +296,7 @@ async fn write_timeout(mut socket: &TcpStream, buf: &[u8], duration: u64) -> Res } -async fn client_write(client: Rc>, mut socket: TcpStream, notify: Rc){ +async fn client_write(client: Rc>, mut socket: TcpStream, notify: Rc,duration:u64){ //wait on messages in my queue loop { @@ -314,7 +314,7 @@ async fn client_write(client: Rc>, mut socket: TcpStream, notify client_ref.message_queue.clear(); } - match write_timeout(&mut socket, &to_write, 5000).await { + match write_timeout(&mut socket, &to_write, duration).await { Ok(_) => (), Err(_) => {eprintln!("failed to write to the tcp socket"); return;} } @@ -498,11 +498,11 @@ async fn process_udp(socket: Rc>,clients: Rc>) -> Result<(),Box>{ +async fn read_login_message(mut stream: TcpStream, client: Rc>, duration: u64) -> Result<(),Box>{ //byte,shortstring,byte,shortstring - let username = read_short_string(&mut stream).await?; - let application = read_short_string(&mut stream).await?; + let username = read_short_string(&mut stream, duration).await?; + let application = read_short_string(&mut stream, duration).await?; println!("{}: Got application {} and userid {}",Local::now().format("%Y-%m-%d %H:%M:%S"),application,username); { let mut client = client.borrow_mut(); @@ -558,10 +558,10 @@ async fn read_rooms_message(mut stream: TcpStream, client: Rc>, return Ok(()); } -async fn read_join_message(mut stream: TcpStream, client: Rc>, rooms: Rc>>>>)-> Result<(),Box>{ +async fn read_join_message(mut stream: TcpStream, client: Rc>, rooms: Rc>>>>, duration: u64)-> Result<(),Box>{ //byte,shortstring - let short_room_name = read_short_string(&mut stream).await?; + let short_room_name = read_short_string(&mut stream, duration).await?; let extended_room_name; let mut leave_room = false; { @@ -629,7 +629,7 @@ async fn read_join_message(mut stream: TcpStream, client: Rc>, r return Ok(()); } -async fn read_roomdata_message(mut stream: TcpStream, client: Rc>, rooms: Rc>>>>)-> Result<(),Box>{ +async fn read_roomdata_message(mut stream: TcpStream, client: Rc>, rooms: Rc>>>>, duration: u64)-> Result<(),Box>{ //type, room_name //will respond with type, numclients u32, id1 u32, name_len u8, name_bytes ... @@ -637,7 +637,7 @@ async fn read_roomdata_message(mut stream: TcpStream, client: Rc - let short_room_name = read_short_string(&mut stream).await?; + let short_room_name = read_short_string(&mut stream, duration).await?; let mut client_ref = client.borrow_mut(); let room_name = format!("{}_{}", client_ref.application, short_room_name); @@ -681,10 +681,10 @@ async fn read_roomdata_message(mut stream: TcpStream, client: Rc } -async fn read_send_message(mut stream: TcpStream, client: Rc>, rooms: Rc>>>>, message_type: u8)-> Result<(),Box>{ +async fn read_send_message(mut stream: TcpStream, client: Rc>, rooms: Rc>>>>, message_type: u8, duration: u64)-> Result<(),Box>{ //4 byte length, array //this is a message for everyone in the room (maybe) - let to_send = read_vec(&mut stream).await?; + let to_send = read_vec(&mut stream, duration).await?; if message_type == FromClientTCPMessageType::SendMessageOthersUnbuffered as u8 { send_room_message(client,&to_send,rooms.clone(),false,false); }else if message_type == FromClientTCPMessageType::SendMessageAllUnbuffered as u8 { @@ -694,16 +694,16 @@ async fn read_send_message(mut stream: TcpStream, client: Rc>, r }else if message_type == FromClientTCPMessageType::SendMessageAllBuffered as u8 { //ordered send_room_message(client,&to_send,rooms.clone(),true,true); }else if message_type == FromClientTCPMessageType::SendMessageGroupUnbuffered as u8 { - let group = read_short_string(&mut stream).await?; + let group = read_short_string(&mut stream, duration).await?; send_group_message(client,&to_send, &group); } return Ok(()); } -async fn read_group_message(mut stream: TcpStream, client: Rc>, clients: Rc>>>>)-> Result<(),Box>{ +async fn read_group_message(mut stream: TcpStream, client: Rc>, clients: Rc>>>>, duration: u64)-> Result<(),Box>{ - let group = read_short_string(&mut stream).await?; - let id_bytes = read_vec(&mut stream).await?; + let group = read_short_string(&mut stream, duration).await?; + let id_bytes = read_vec(&mut stream, duration).await?; let num = id_bytes.len(); let mut client_ref = client.borrow_mut(); @@ -933,35 +933,35 @@ fn send_group_message(sender: Rc>, message: &Vec, group: &St } -async fn read_u8(stream: &mut TcpStream) -> Result> { +async fn read_u8(stream: &mut TcpStream,duration: u64) -> Result> { let mut buf = [0; 1]; - read_timeout(stream, &mut buf, 5000).await?; + read_timeout(stream, &mut buf, duration).await?; return Ok(buf[0]); } -async fn read_u32(stream: &mut TcpStream) -> Result> { +async fn read_u32(stream: &mut TcpStream,duration: u64) -> Result> { let mut buf:[u8;4] = [0; 4]; - read_timeout(stream, &mut buf, 5000).await?; + read_timeout(stream, &mut buf, duration).await?; let size = u32::from_be_bytes(buf); return Ok(size); } -async fn _read_string(stream: &mut TcpStream) -> Result> { - let size = read_u32(stream).await?; +async fn _read_string(stream: &mut TcpStream,duration: u64) -> Result> { + let size = read_u32(stream,duration).await?; let mut string_bytes = vec![0;size as usize]; - read_timeout(stream, &mut string_bytes, 5000).await?; + read_timeout(stream, &mut string_bytes, duration).await?; return Ok(String::from_utf8(string_bytes).unwrap()); } -async fn read_short_string(stream: &mut TcpStream) -> Result> { - let size = read_u8(stream).await?; +async fn read_short_string(stream: &mut TcpStream,duration: u64) -> Result> { + let size = read_u8(stream,duration).await?; let mut string_bytes = vec![0;size as usize]; - read_timeout(stream, &mut string_bytes, 5000).await?; + read_timeout(stream, &mut string_bytes, duration).await?; return Ok(String::from_utf8(string_bytes).unwrap()); } -async fn read_vec(stream: &mut TcpStream) -> Result,Box> { - let message_size = read_u32(stream).await?; +async fn read_vec(stream: &mut TcpStream,duration: u64) -> Result,Box> { + let message_size = read_u32(stream,duration).await?; let mut message = vec![0u8;message_size as usize]; - read_timeout(stream, &mut message, 5000).await?; + read_timeout(stream, &mut message, duration).await?; return Ok(message); }