From 3a03ceb8ceeed1927e23d707ad2fea658ef75820 Mon Sep 17 00:00:00 2001 From: Kyle Johnsen Date: Sat, 12 Mar 2022 21:54:52 -0500 Subject: [PATCH 01/10] fixed another unwrap bug --- src/main.rs | 10 +++++++++- 1 file changed, 9 insertions(+), 1 deletion(-) diff --git a/src/main.rs b/src/main.rs index 010c030..5bbca05 100644 --- a/src/main.rs +++ b/src/main.rs @@ -121,6 +121,14 @@ async fn process_client(socket: TcpStream, udp_socket: Rc>, c let client_notify = Rc::new(Notify::new()); let client_notify_udp = Rc::new(Notify::new()); + + let ip; + + match socket.peer_addr() { + Ok(p)=>ip=p.ip(), + Err(e)=>{return;} + } + let client = Rc::new(RefCell::new(Client{ id: my_id, username: String::from(""), @@ -128,7 +136,7 @@ async fn process_client(socket: TcpStream, udp_socket: Rc>, c roomname: String::from(""), application: String::from(""), groups: HashMap::new(), - ip: socket.peer_addr().unwrap().ip(), + ip: ip, udp_port: 0 as u16, message_queue: vec![], message_queue_udp: vec![], From 49d7628ad50333fc945fbe16f97811c671afa8ee Mon Sep 17 00:00:00 2001 From: kjjohnsen Date: Sun, 13 Mar 2022 10:28:35 -0400 Subject: [PATCH 02/10] Update README.md --- README.md | 15 +++++++++++++-- 1 file changed, 13 insertions(+), 2 deletions(-) diff --git a/README.md b/README.md index 38d59e6..4e6ffbb 100644 --- a/README.md +++ b/README.md @@ -1,10 +1,21 @@ # VelNetServerRust +This basic, single-file relay server is designed to be used for network games, and is similar to Photon Realtime in design. It is written in Rust, with a single-threaded, non-blocking design and does not rely on any network frameworks (pure TCP/UDP). A Unity/C# client implementation can be found in our VelNetUnity repository. + +Like Photon, there is no built-in persistence of rooms or data. Rooms are created when the first client joins and destroyed when the last client leaves. + +The only game logic implemented by the server is that of a "master client", which is an easier way to negotiate a leader in a room that can perform room level operations. + +The "group" functionality is used to specify specific clients to communicate with. Note, these client ids can bridge across rooms. + +The server supports both TCP and UDP transports. + ## Running -1. Get a linoox server +1. Get a linoox server (also runs fine on windows & osx, but the instructions below are for linux) 2. Clone this repo -3. Install rust: `sudo apt install cargo` +3. Edit config.txt to an open port on your firewall +4. Install rust through cargo: e.g. `sudo apt install cargo` 5. Build: `cargo build --release` 6. Run: `sudo ./target/release/VelNetServerRust` 7. Or run in the background so that it doesn't quit when you leave ssh: `nohup sudo ./target/release/VelNetServerRust`. You'll have to install `nohup` with apt. From 983308aabebf3cc5628be0d7b6ee0b8de1deb73c Mon Sep 17 00:00:00 2001 From: Kyle Johnsen Date: Mon, 14 Mar 2022 15:07:24 -0400 Subject: [PATCH 03/10] untested hotfix --- src/main.rs | 82 +++++++++++++++++++++++++++++++++++++++-------------- 1 file changed, 60 insertions(+), 22 deletions(-) diff --git a/src/main.rs b/src/main.rs index 5bbca05..a0319f3 100644 --- a/src/main.rs +++ b/src/main.rs @@ -18,6 +18,7 @@ use chrono::Local; use std::time; use serde::{Serialize, Deserialize}; use std::error::Error; +use std::ptr; enum ToClientTCPMessageType { LoggedIn = 0, RoomList = 1, @@ -393,23 +394,50 @@ async fn process_udp(socket: Rc>,clients: Rc>, message: &Vec, rooms: Rc>, message: &Vec, group: &String){ let mut write_buf = vec![]; - let sender_ref = sender.borrow(); + let mut sender_ref = sender.borrow_mut(); write_buf.push(ToClientTCPMessageType::DataMessage as u8); write_buf.extend_from_slice(&sender_ref.id.to_be_bytes()); write_buf.extend_from_slice(&(message.len() as u32).to_be_bytes()); write_buf.extend_from_slice(message); //get the list of client ids for this group - let groups = &sender_ref.groups; - if groups.contains_key(group) { - let group = groups.get(group).unwrap(); - for c in group { - let mut temp_mut = c.borrow_mut(); - temp_mut.message_queue.extend_from_slice(&write_buf); - temp_mut.notify.notify(); - } + let mut send_to_client = false; + if sender_ref.groups.contains_key(group) { + let group = sender_ref.groups.get(group).unwrap(); + for c in group { + if ptr::eq(sender.as_ref(),c.as_ref()){ + send_to_client = true; + } + else{ + let mut temp_mut = c.borrow_mut(); + temp_mut.message_queue.extend_from_slice(&write_buf); + temp_mut.notify.notify(); + } + } + } + + if send_to_client { + sender_ref.message_queue.extend_from_slice(&write_buf); + sender_ref.notify.notify(); } } From 5fa0855b9b04c6ee3fc1a2954eb6d959a3d99e2d Mon Sep 17 00:00:00 2001 From: Kyle Johnsen Date: Tue, 15 Mar 2022 13:27:09 -0400 Subject: [PATCH 04/10] bug fix --- src/main.rs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/main.rs b/src/main.rs index a0319f3..650214e 100644 --- a/src/main.rs +++ b/src/main.rs @@ -412,9 +412,9 @@ async fn process_udp(socket: Rc>,clients: Rc Date: Wed, 16 Mar 2022 00:30:59 -0400 Subject: [PATCH 05/10] accidentally sending messages twice --- src/main.rs | 11 ++--------- 1 file changed, 2 insertions(+), 9 deletions(-) diff --git a/src/main.rs b/src/main.rs index 650214e..aecb577 100644 --- a/src/main.rs +++ b/src/main.rs @@ -837,15 +837,8 @@ fn send_room_message(sender: Rc>, message: &Vec, rooms: Rc Date: Fri, 18 Mar 2022 08:59:41 -0400 Subject: [PATCH 06/10] important change, added 5s timeout on all connections. Must send heartbeat --- src/main.rs | 98 +++++++++++++++++++++++++++++++++++++++++------------ 1 file changed, 76 insertions(+), 22 deletions(-) diff --git a/src/main.rs b/src/main.rs index aecb577..cdd38ec 100644 --- a/src/main.rs +++ b/src/main.rs @@ -1,10 +1,10 @@ use async_std::prelude::*; +use async_std::future; use async_std::net::TcpListener; use async_std::net::TcpStream; use async_std::net::UdpSocket; use async_notify::Notify; use futures::stream::StreamExt; -use futures::future; use futures::join; use futures::select; use futures::pin_mut; @@ -15,7 +15,7 @@ use std::rc::{Rc}; use std::cell::{RefCell}; use std::fs; use chrono::Local; -use std::time; +use std::time::Duration; use serde::{Serialize, Deserialize}; use std::error::Error; use std::ptr; @@ -102,15 +102,19 @@ async fn main() { let tcp_future = tcp_listener .incoming() - .for_each_concurrent(None, |tcpstream| process_client(tcpstream.unwrap(), udp_socket.clone(), clients.clone(),rooms.clone(),last_client_id.clone())); + .for_each_concurrent(None, |tcpstream| process_client(tcpstream.unwrap(), udp_socket.clone(), clients.clone(),rooms.clone(),last_client_id.clone(),&config)); let udp_future = process_udp(udp_socket.clone(),clients.clone(),rooms.clone()); join!(tcp_future,udp_future); } -async fn process_client(socket: TcpStream, udp_socket: Rc>, clients: Rc>>>>, rooms: Rc>>>>, last_client_id: Rc>){ +async fn process_client(socket: TcpStream, udp_socket: Rc>, clients: Rc>>>>, rooms: Rc>>>>, last_client_id: Rc>, config: &Config){ println!("started tcp"); + + socket.set_nodelay(true).unwrap(); + //socket.set_read_timeout(Some(time::Duration::new(config.tcp_timeout,0))).unwrap(); + //socket.set_write_timeout(Some(time::Duration::new(config.tcp_timeout,0))).unwrap(); let my_id; { @@ -156,7 +160,6 @@ async fn process_client(socket: TcpStream, udp_socket: Rc>, c } - let read_async = client_read(client.clone(), socket.clone(), clients.clone(), rooms.clone()).fuse(); let write_async = client_write(client.clone(), socket, client_notify.clone()).fuse(); let write_async_udp = client_write_udp(client.clone(), udp_socket.clone(), client_notify_udp.clone()).fuse(); @@ -180,17 +183,39 @@ async fn process_client(socket: TcpStream, udp_socket: Rc>, c } +async fn read_timeout(mut socket: &TcpStream, buf: &mut [u8], duration: u64) -> Result> { + + match future::timeout(Duration::from_millis(duration), socket.read(buf)).await { + Ok(r) => { + match r { + + Ok(n) if n == 0 => { + + return Err(format!("{}", "no bytes read"))? + + }, + Ok(n) => { + return Ok(n); + }, + Err(e) => {return Err(format!("{}", e.to_string()))?} + } + + }, + + Err(e) => {return Err(format!("{}", e.to_string()))?} + + } + +} + async fn client_read(client: Rc>, mut socket: TcpStream, clients: Rc>>>>, rooms: Rc>>>>){ let mut buf = [0; 1]; loop { - match socket.read(&mut buf).await { - // socket closed - Ok(n) if n == 0 => { - println!("client read ended naturally?"); - return; - }, + + match read_timeout(&mut socket, &mut buf, 5000).await { + Ok(_) => { let t = buf[0]; @@ -211,14 +236,14 @@ async fn client_read(client: Rc>, mut socket: TcpStream, clients }; } else if t == FromClientTCPMessageType::JoinRoom as u8 {//[2:u8][roomname.length():u8][roomname:shortstring] match read_join_message(socket.clone(), client.clone(), rooms.clone()).await{ - Ok(_)=>(), + Ok(_)=>(), Err(_)=>{eprintln!("failed to read from socket"); return;} }; } else if t == FromClientTCPMessageType::SendMessageOthersUnbuffered as u8 || - t == FromClientTCPMessageType::SendMessageAllUnbuffered as u8 || - t == FromClientTCPMessageType::SendMessageGroupUnbuffered as u8 || - t == FromClientTCPMessageType::SendMessageOthersBuffered as u8 || - t == FromClientTCPMessageType::SendMessageAllBuffered as u8 { //others,all,group[t:u8][message.length():i32][message:u8array] + t == FromClientTCPMessageType::SendMessageAllUnbuffered as u8 || + t == FromClientTCPMessageType::SendMessageGroupUnbuffered as u8 || + t == FromClientTCPMessageType::SendMessageOthersBuffered as u8 || + t == FromClientTCPMessageType::SendMessageAllBuffered as u8 { //others,all,group[t:u8][message.length():i32][message:u8array] match read_send_message(socket.clone(), client.clone(), rooms.clone(), t).await{ Ok(_)=>(), Err(_)=>{eprintln!("failed to read from socket"); return;} @@ -240,9 +265,37 @@ async fn client_read(client: Rc>, mut socket: TcpStream, clients //remove the client return; } + + }; } } + + +async fn write_timeout(mut socket: &TcpStream, buf: &[u8], duration: u64) -> Result> { + + match future::timeout(Duration::from_millis(duration), socket.write(buf)).await { + Ok(r) => { + match r { + + Ok(n)=> { + + return Ok(n); + + }, + Err(e) => {return Err(format!("{}", e.to_string()))?} + + } + + }, + + Err(e) => {return Err(format!("{}", e.to_string()))?} + + } + +} + + async fn client_write(client: Rc>, mut socket: TcpStream, notify: Rc){ //wait on messages in my queue @@ -261,7 +314,7 @@ async fn client_write(client: Rc>, mut socket: TcpStream, notify client_ref.message_queue.clear(); } - match socket.write(&to_write).await { + match write_timeout(&mut socket, &to_write, 5000).await { Ok(_) => (), Err(_) => {eprintln!("failed to write to the tcp socket"); return;} } @@ -882,32 +935,33 @@ fn send_group_message(sender: Rc>, message: &Vec, group: &St async fn read_u8(stream: &mut TcpStream) -> Result> { let mut buf = [0; 1]; - stream.read_exact(&mut buf).await?; + read_timeout(stream, &mut buf, 5000).await?; return Ok(buf[0]); + } async fn read_u32(stream: &mut TcpStream) -> Result> { let mut buf:[u8;4] = [0; 4]; - stream.read_exact(&mut buf).await?; + read_timeout(stream, &mut buf, 5000).await?; let size = u32::from_be_bytes(buf); return Ok(size); } async fn _read_string(stream: &mut TcpStream) -> Result> { let size = read_u32(stream).await?; let mut string_bytes = vec![0;size as usize]; - stream.read_exact(&mut string_bytes).await?; + read_timeout(stream, &mut string_bytes, 5000).await?; return Ok(String::from_utf8(string_bytes).unwrap()); } async fn read_short_string(stream: &mut TcpStream) -> Result> { let size = read_u8(stream).await?; let mut string_bytes = vec![0;size as usize]; - stream.read_exact(&mut string_bytes).await?; + read_timeout(stream, &mut string_bytes, 5000).await?; return Ok(String::from_utf8(string_bytes).unwrap()); } async fn read_vec(stream: &mut TcpStream) -> Result,Box> { let message_size = read_u32(stream).await?; let mut message = vec![0u8;message_size as usize]; - stream.read_exact(&mut message).await?; + read_timeout(stream, &mut message, 5000).await?; return Ok(message); } From 85cc1d5a397458a0afe48699c41c495b407f83e5 Mon Sep 17 00:00:00 2001 From: Kyle Johnsen Date: Fri, 18 Mar 2022 16:33:35 -0400 Subject: [PATCH 07/10] moved timeout to config --- config.txt | 3 +-- src/main.rs | 74 ++++++++++++++++++++++++++--------------------------- 2 files changed, 38 insertions(+), 39 deletions(-) diff --git a/config.txt b/config.txt index fbdf4df..81bd2ee 100644 --- a/config.txt +++ b/config.txt @@ -1,5 +1,4 @@ { "port":5000, -"tcp_timeout":30, -"tcp_send_buffer":100000 +"tcp_timeout":30 } diff --git a/src/main.rs b/src/main.rs index cdd38ec..c4279d4 100644 --- a/src/main.rs +++ b/src/main.rs @@ -160,8 +160,8 @@ async fn process_client(socket: TcpStream, udp_socket: Rc>, c } - let read_async = client_read(client.clone(), socket.clone(), clients.clone(), rooms.clone()).fuse(); - let write_async = client_write(client.clone(), socket, client_notify.clone()).fuse(); + let read_async = client_read(client.clone(), socket.clone(), clients.clone(), rooms.clone(), config.tcp_timeout*1000).fuse(); + let write_async = client_write(client.clone(), socket, client_notify.clone(), config.tcp_timeout*1000).fuse(); let write_async_udp = client_write_udp(client.clone(), udp_socket.clone(), client_notify_udp.clone()).fuse(); pin_mut!(read_async,write_async,write_async_udp); //not sure why this is necessary, since select @@ -208,19 +208,19 @@ async fn read_timeout(mut socket: &TcpStream, buf: &mut [u8], duration: u64) -> } -async fn client_read(client: Rc>, mut socket: TcpStream, clients: Rc>>>>, rooms: Rc>>>>){ +async fn client_read(client: Rc>, mut socket: TcpStream, clients: Rc>>>>, rooms: Rc>>>>, duration: u64){ let mut buf = [0; 1]; loop { - match read_timeout(&mut socket, &mut buf, 5000).await { + match read_timeout(&mut socket, &mut buf, duration).await { Ok(_) => { let t = buf[0]; if t == FromClientTCPMessageType::LogIn as u8 { //[0:u8][username.length():u8][username:shortstring][password.length():u8][password:shortstring] - match read_login_message(socket.clone(), client.clone()).await{ + match read_login_message(socket.clone(), client.clone(), duration).await{ Ok(_)=>(), Err(_)=>{eprintln!("failed to read from socket"); return;} }; @@ -230,12 +230,12 @@ async fn client_read(client: Rc>, mut socket: TcpStream, clients Err(_)=>{eprintln!("failed to read from socket"); return;} }; } else if t == FromClientTCPMessageType::GetRoomData as u8 { - match read_roomdata_message(socket.clone(), client.clone(), rooms.clone()).await{ + match read_roomdata_message(socket.clone(), client.clone(), rooms.clone(), duration).await{ Ok(_)=>(), Err(_)=>{eprintln!("failed to read from socket"); return;} }; } else if t == FromClientTCPMessageType::JoinRoom as u8 {//[2:u8][roomname.length():u8][roomname:shortstring] - match read_join_message(socket.clone(), client.clone(), rooms.clone()).await{ + match read_join_message(socket.clone(), client.clone(), rooms.clone(), duration).await{ Ok(_)=>(), Err(_)=>{eprintln!("failed to read from socket"); return;} }; @@ -244,12 +244,12 @@ async fn client_read(client: Rc>, mut socket: TcpStream, clients t == FromClientTCPMessageType::SendMessageGroupUnbuffered as u8 || t == FromClientTCPMessageType::SendMessageOthersBuffered as u8 || t == FromClientTCPMessageType::SendMessageAllBuffered as u8 { //others,all,group[t:u8][message.length():i32][message:u8array] - match read_send_message(socket.clone(), client.clone(), rooms.clone(), t).await{ + match read_send_message(socket.clone(), client.clone(), rooms.clone(), t, duration).await{ Ok(_)=>(), Err(_)=>{eprintln!("failed to read from socket"); return;} }; } else if t == FromClientTCPMessageType::CreateGroup as u8 { //[t:u8][list.lengthbytes:i32][clients:i32array] - match read_group_message(socket.clone(), client.clone(), clients.clone()).await{ + match read_group_message(socket.clone(), client.clone(), clients.clone(),duration).await{ Ok(_)=>(), Err(_)=>{eprintln!("failed to read from socket"); return;} }; @@ -296,7 +296,7 @@ async fn write_timeout(mut socket: &TcpStream, buf: &[u8], duration: u64) -> Res } -async fn client_write(client: Rc>, mut socket: TcpStream, notify: Rc){ +async fn client_write(client: Rc>, mut socket: TcpStream, notify: Rc,duration:u64){ //wait on messages in my queue loop { @@ -314,7 +314,7 @@ async fn client_write(client: Rc>, mut socket: TcpStream, notify client_ref.message_queue.clear(); } - match write_timeout(&mut socket, &to_write, 5000).await { + match write_timeout(&mut socket, &to_write, duration).await { Ok(_) => (), Err(_) => {eprintln!("failed to write to the tcp socket"); return;} } @@ -498,11 +498,11 @@ async fn process_udp(socket: Rc>,clients: Rc>) -> Result<(),Box>{ +async fn read_login_message(mut stream: TcpStream, client: Rc>, duration: u64) -> Result<(),Box>{ //byte,shortstring,byte,shortstring - let username = read_short_string(&mut stream).await?; - let application = read_short_string(&mut stream).await?; + let username = read_short_string(&mut stream, duration).await?; + let application = read_short_string(&mut stream, duration).await?; println!("{}: Got application {} and userid {}",Local::now().format("%Y-%m-%d %H:%M:%S"),application,username); { let mut client = client.borrow_mut(); @@ -558,10 +558,10 @@ async fn read_rooms_message(mut stream: TcpStream, client: Rc>, return Ok(()); } -async fn read_join_message(mut stream: TcpStream, client: Rc>, rooms: Rc>>>>)-> Result<(),Box>{ +async fn read_join_message(mut stream: TcpStream, client: Rc>, rooms: Rc>>>>, duration: u64)-> Result<(),Box>{ //byte,shortstring - let short_room_name = read_short_string(&mut stream).await?; + let short_room_name = read_short_string(&mut stream, duration).await?; let extended_room_name; let mut leave_room = false; { @@ -629,7 +629,7 @@ async fn read_join_message(mut stream: TcpStream, client: Rc>, r return Ok(()); } -async fn read_roomdata_message(mut stream: TcpStream, client: Rc>, rooms: Rc>>>>)-> Result<(),Box>{ +async fn read_roomdata_message(mut stream: TcpStream, client: Rc>, rooms: Rc>>>>, duration: u64)-> Result<(),Box>{ //type, room_name //will respond with type, numclients u32, id1 u32, name_len u8, name_bytes ... @@ -637,7 +637,7 @@ async fn read_roomdata_message(mut stream: TcpStream, client: Rc - let short_room_name = read_short_string(&mut stream).await?; + let short_room_name = read_short_string(&mut stream, duration).await?; let mut client_ref = client.borrow_mut(); let room_name = format!("{}_{}", client_ref.application, short_room_name); @@ -681,10 +681,10 @@ async fn read_roomdata_message(mut stream: TcpStream, client: Rc } -async fn read_send_message(mut stream: TcpStream, client: Rc>, rooms: Rc>>>>, message_type: u8)-> Result<(),Box>{ +async fn read_send_message(mut stream: TcpStream, client: Rc>, rooms: Rc>>>>, message_type: u8, duration: u64)-> Result<(),Box>{ //4 byte length, array //this is a message for everyone in the room (maybe) - let to_send = read_vec(&mut stream).await?; + let to_send = read_vec(&mut stream, duration).await?; if message_type == FromClientTCPMessageType::SendMessageOthersUnbuffered as u8 { send_room_message(client,&to_send,rooms.clone(),false,false); }else if message_type == FromClientTCPMessageType::SendMessageAllUnbuffered as u8 { @@ -694,16 +694,16 @@ async fn read_send_message(mut stream: TcpStream, client: Rc>, r }else if message_type == FromClientTCPMessageType::SendMessageAllBuffered as u8 { //ordered send_room_message(client,&to_send,rooms.clone(),true,true); }else if message_type == FromClientTCPMessageType::SendMessageGroupUnbuffered as u8 { - let group = read_short_string(&mut stream).await?; + let group = read_short_string(&mut stream, duration).await?; send_group_message(client,&to_send, &group); } return Ok(()); } -async fn read_group_message(mut stream: TcpStream, client: Rc>, clients: Rc>>>>)-> Result<(),Box>{ +async fn read_group_message(mut stream: TcpStream, client: Rc>, clients: Rc>>>>, duration: u64)-> Result<(),Box>{ - let group = read_short_string(&mut stream).await?; - let id_bytes = read_vec(&mut stream).await?; + let group = read_short_string(&mut stream, duration).await?; + let id_bytes = read_vec(&mut stream, duration).await?; let num = id_bytes.len(); let mut client_ref = client.borrow_mut(); @@ -933,35 +933,35 @@ fn send_group_message(sender: Rc>, message: &Vec, group: &St } -async fn read_u8(stream: &mut TcpStream) -> Result> { +async fn read_u8(stream: &mut TcpStream,duration: u64) -> Result> { let mut buf = [0; 1]; - read_timeout(stream, &mut buf, 5000).await?; + read_timeout(stream, &mut buf, duration).await?; return Ok(buf[0]); } -async fn read_u32(stream: &mut TcpStream) -> Result> { +async fn read_u32(stream: &mut TcpStream,duration: u64) -> Result> { let mut buf:[u8;4] = [0; 4]; - read_timeout(stream, &mut buf, 5000).await?; + read_timeout(stream, &mut buf, duration).await?; let size = u32::from_be_bytes(buf); return Ok(size); } -async fn _read_string(stream: &mut TcpStream) -> Result> { - let size = read_u32(stream).await?; +async fn _read_string(stream: &mut TcpStream,duration: u64) -> Result> { + let size = read_u32(stream,duration).await?; let mut string_bytes = vec![0;size as usize]; - read_timeout(stream, &mut string_bytes, 5000).await?; + read_timeout(stream, &mut string_bytes, duration).await?; return Ok(String::from_utf8(string_bytes).unwrap()); } -async fn read_short_string(stream: &mut TcpStream) -> Result> { - let size = read_u8(stream).await?; +async fn read_short_string(stream: &mut TcpStream,duration: u64) -> Result> { + let size = read_u8(stream,duration).await?; let mut string_bytes = vec![0;size as usize]; - read_timeout(stream, &mut string_bytes, 5000).await?; + read_timeout(stream, &mut string_bytes, duration).await?; return Ok(String::from_utf8(string_bytes).unwrap()); } -async fn read_vec(stream: &mut TcpStream) -> Result,Box> { - let message_size = read_u32(stream).await?; +async fn read_vec(stream: &mut TcpStream,duration: u64) -> Result,Box> { + let message_size = read_u32(stream,duration).await?; let mut message = vec![0u8;message_size as usize]; - read_timeout(stream, &mut message, 5000).await?; + read_timeout(stream, &mut message, duration).await?; return Ok(message); } From 3caa73304f92bd6ccc6e0021e5ddc5a9ae2f0fb8 Mon Sep 17 00:00:00 2001 From: Kyle Johnsen Date: Fri, 18 Mar 2022 16:34:46 -0400 Subject: [PATCH 08/10] mistake of commiting without testing --- src/main.rs | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/src/main.rs b/src/main.rs index c4279d4..697a69f 100644 --- a/src/main.rs +++ b/src/main.rs @@ -78,8 +78,7 @@ struct Room { #[derive(Serialize, Deserialize)] struct Config { port: u16, - tcp_timeout: u64, - tcp_send_buffer: usize + tcp_timeout: u64 } From e64ddafeb67889fde8fe10ac24cf09efca97d837 Mon Sep 17 00:00:00 2001 From: Kyle Johnsen Date: Sun, 20 Mar 2022 15:36:37 -0400 Subject: [PATCH 09/10] removed some potential failure paths, server should never crash --- src/main.rs | 23 +++++++++++++++++------ 1 file changed, 17 insertions(+), 6 deletions(-) diff --git a/src/main.rs b/src/main.rs index 697a69f..b4932b7 100644 --- a/src/main.rs +++ b/src/main.rs @@ -111,7 +111,10 @@ async fn process_client(socket: TcpStream, udp_socket: Rc>, c println!("started tcp"); - socket.set_nodelay(true).unwrap(); + match socket.set_nodelay(true) { + Ok(_)=>{}, + Err(_)=>{eprintln!("could not set no delay"); return;} + } //socket.set_read_timeout(Some(time::Duration::new(config.tcp_timeout,0))).unwrap(); //socket.set_write_timeout(Some(time::Duration::new(config.tcp_timeout,0))).unwrap(); @@ -130,7 +133,7 @@ async fn process_client(socket: TcpStream, udp_socket: Rc>, c match socket.peer_addr() { Ok(p)=>ip=p.ip(), - Err(e)=>{return;} + Err(_)=>{return;} } let client = Rc::new(RefCell::new(Client{ @@ -443,7 +446,14 @@ async fn process_udp(socket: Rc>,clients: Rc{}, + Err(_)=>{eprintln!("Could not convert group name"); return;} + } + + let group_name = res.unwrap(); let clients = clients.borrow(); @@ -759,7 +769,7 @@ fn client_leave_room(client: Rc>, send_to_client: bool, rooms: R let mut destroy_room = false; { - let mut rooms_ref = rooms.borrow_mut(); + let rooms_ref = rooms.borrow_mut(); let mut room_ref = rooms_ref.get(&roomname).unwrap().borrow_mut(); for (_k,v) in room_ref.clients.iter() { @@ -948,13 +958,14 @@ async fn _read_string(stream: &mut TcpStream,duration: u64) -> Result Result> { let size = read_u8(stream,duration).await?; let mut string_bytes = vec![0;size as usize]; read_timeout(stream, &mut string_bytes, duration).await?; - return Ok(String::from_utf8(string_bytes).unwrap()); + return Ok(String::from_utf8(string_bytes)?); } async fn read_vec(stream: &mut TcpStream,duration: u64) -> Result,Box> { From 2f894d77296640aa4fef9b29d6c8d2946504f389 Mon Sep 17 00:00:00 2001 From: Kyle Johnsen Date: Sun, 20 Mar 2022 16:29:28 -0400 Subject: [PATCH 10/10] fixed potential tcp errors that may occur when packets are split --- src/main.rs | 38 ++++++++++++++++++++++++-------------- 1 file changed, 24 insertions(+), 14 deletions(-) diff --git a/src/main.rs b/src/main.rs index b4932b7..4037adb 100644 --- a/src/main.rs +++ b/src/main.rs @@ -187,27 +187,37 @@ async fn process_client(socket: TcpStream, udp_socket: Rc>, c async fn read_timeout(mut socket: &TcpStream, buf: &mut [u8], duration: u64) -> Result> { - match future::timeout(Duration::from_millis(duration), socket.read(buf)).await { - Ok(r) => { - match r { + //this is a read exact function. The buffer passed should be the exact size wanted - Ok(n) if n == 0 => { - - return Err(format!("{}", "no bytes read"))? + let num_to_read = buf.len(); + + let mut num_read = 0; - }, - Ok(n) => { - return Ok(n); - }, - Err(e) => {return Err(format!("{}", e.to_string()))?} - } + while num_read < num_to_read { + match future::timeout(Duration::from_millis(duration), socket.read(&mut buf[num_read..])).await { + Ok(r) => { + match r { - }, + Ok(n) if n == 0 => { + + return Err(format!("{}", "no bytes read"))? - Err(e) => {return Err(format!("{}", e.to_string()))?} + }, + Ok(n) => { + num_read += n; + }, + Err(e) => {return Err(format!("{}", e.to_string()))?} + } + }, + + Err(e) => {return Err(format!("{}", e.to_string()))?} + + } } + return Ok(num_read); + } async fn client_read(client: Rc>, mut socket: TcpStream, clients: Rc>>>>, rooms: Rc>>>>, duration: u64){