important change, added 5s timeout on all connections. Must send heartbeat

asyncversion
Kyle Johnsen 2022-03-18 08:59:41 -04:00
parent aa792c736c
commit 792054749f
1 changed files with 76 additions and 22 deletions

View File

@ -1,10 +1,10 @@
use async_std::prelude::*;
use async_std::future;
use async_std::net::TcpListener;
use async_std::net::TcpStream;
use async_std::net::UdpSocket;
use async_notify::Notify;
use futures::stream::StreamExt;
use futures::future;
use futures::join;
use futures::select;
use futures::pin_mut;
@ -15,7 +15,7 @@ use std::rc::{Rc};
use std::cell::{RefCell};
use std::fs;
use chrono::Local;
use std::time;
use std::time::Duration;
use serde::{Serialize, Deserialize};
use std::error::Error;
use std::ptr;
@ -102,16 +102,20 @@ async fn main() {
let tcp_future = tcp_listener
.incoming()
.for_each_concurrent(None, |tcpstream| process_client(tcpstream.unwrap(), udp_socket.clone(), clients.clone(),rooms.clone(),last_client_id.clone()));
.for_each_concurrent(None, |tcpstream| process_client(tcpstream.unwrap(), udp_socket.clone(), clients.clone(),rooms.clone(),last_client_id.clone(),&config));
let udp_future = process_udp(udp_socket.clone(),clients.clone(),rooms.clone());
join!(tcp_future,udp_future);
}
async fn process_client(socket: TcpStream, udp_socket: Rc<RefCell<UdpSocket>>, clients: Rc<RefCell<HashMap<u32, Rc<RefCell<Client>>>>>, rooms: Rc<RefCell<HashMap<String, Rc<RefCell<Room>>>>>, last_client_id: Rc<RefCell<u32>>){
async fn process_client(socket: TcpStream, udp_socket: Rc<RefCell<UdpSocket>>, clients: Rc<RefCell<HashMap<u32, Rc<RefCell<Client>>>>>, rooms: Rc<RefCell<HashMap<String, Rc<RefCell<Room>>>>>, last_client_id: Rc<RefCell<u32>>, config: &Config){
println!("started tcp");
socket.set_nodelay(true).unwrap();
//socket.set_read_timeout(Some(time::Duration::new(config.tcp_timeout,0))).unwrap();
//socket.set_write_timeout(Some(time::Duration::new(config.tcp_timeout,0))).unwrap();
let my_id;
{
let mut reference = last_client_id.borrow_mut();
@ -156,7 +160,6 @@ async fn process_client(socket: TcpStream, udp_socket: Rc<RefCell<UdpSocket>>, c
}
let read_async = client_read(client.clone(), socket.clone(), clients.clone(), rooms.clone()).fuse();
let write_async = client_write(client.clone(), socket, client_notify.clone()).fuse();
let write_async_udp = client_write_udp(client.clone(), udp_socket.clone(), client_notify_udp.clone()).fuse();
@ -180,17 +183,39 @@ async fn process_client(socket: TcpStream, udp_socket: Rc<RefCell<UdpSocket>>, c
}
async fn read_timeout(mut socket: &TcpStream, buf: &mut [u8], duration: u64) -> Result<usize,Box<dyn Error>> {
match future::timeout(Duration::from_millis(duration), socket.read(buf)).await {
Ok(r) => {
match r {
Ok(n) if n == 0 => {
return Err(format!("{}", "no bytes read"))?
},
Ok(n) => {
return Ok(n);
},
Err(e) => {return Err(format!("{}", e.to_string()))?}
}
},
Err(e) => {return Err(format!("{}", e.to_string()))?}
}
}
async fn client_read(client: Rc<RefCell<Client>>, mut socket: TcpStream, clients: Rc<RefCell<HashMap<u32, Rc<RefCell<Client>>>>>, rooms: Rc<RefCell<HashMap<String, Rc<RefCell<Room>>>>>){
let mut buf = [0; 1];
loop {
match socket.read(&mut buf).await {
// socket closed
Ok(n) if n == 0 => {
println!("client read ended naturally?");
return;
},
match read_timeout(&mut socket, &mut buf, 5000).await {
Ok(_) => {
let t = buf[0];
@ -240,9 +265,37 @@ async fn client_read(client: Rc<RefCell<Client>>, mut socket: TcpStream, clients
//remove the client
return;
}
};
}
}
async fn write_timeout(mut socket: &TcpStream, buf: &[u8], duration: u64) -> Result<usize,Box<dyn Error>> {
match future::timeout(Duration::from_millis(duration), socket.write(buf)).await {
Ok(r) => {
match r {
Ok(n)=> {
return Ok(n);
},
Err(e) => {return Err(format!("{}", e.to_string()))?}
}
},
Err(e) => {return Err(format!("{}", e.to_string()))?}
}
}
async fn client_write(client: Rc<RefCell<Client>>, mut socket: TcpStream, notify: Rc<Notify>){
//wait on messages in my queue
@ -261,7 +314,7 @@ async fn client_write(client: Rc<RefCell<Client>>, mut socket: TcpStream, notify
client_ref.message_queue.clear();
}
match socket.write(&to_write).await {
match write_timeout(&mut socket, &to_write, 5000).await {
Ok(_) => (),
Err(_) => {eprintln!("failed to write to the tcp socket"); return;}
}
@ -882,32 +935,33 @@ fn send_group_message(sender: Rc<RefCell<Client>>, message: &Vec<u8>, group: &St
async fn read_u8(stream: &mut TcpStream) -> Result<u8,Box<dyn Error>> {
let mut buf = [0; 1];
stream.read_exact(&mut buf).await?;
read_timeout(stream, &mut buf, 5000).await?;
return Ok(buf[0]);
}
async fn read_u32(stream: &mut TcpStream) -> Result<u32,Box<dyn Error>> {
let mut buf:[u8;4] = [0; 4];
stream.read_exact(&mut buf).await?;
read_timeout(stream, &mut buf, 5000).await?;
let size = u32::from_be_bytes(buf);
return Ok(size);
}
async fn _read_string(stream: &mut TcpStream) -> Result<String,Box<dyn Error>> {
let size = read_u32(stream).await?;
let mut string_bytes = vec![0;size as usize];
stream.read_exact(&mut string_bytes).await?;
read_timeout(stream, &mut string_bytes, 5000).await?;
return Ok(String::from_utf8(string_bytes).unwrap());
}
async fn read_short_string(stream: &mut TcpStream) -> Result<String,Box<dyn Error>> {
let size = read_u8(stream).await?;
let mut string_bytes = vec![0;size as usize];
stream.read_exact(&mut string_bytes).await?;
read_timeout(stream, &mut string_bytes, 5000).await?;
return Ok(String::from_utf8(string_bytes).unwrap());
}
async fn read_vec(stream: &mut TcpStream) -> Result<Vec<u8>,Box<dyn Error>> {
let message_size = read_u32(stream).await?;
let mut message = vec![0u8;message_size as usize];
stream.read_exact(&mut message).await?;
read_timeout(stream, &mut message, 5000).await?;
return Ok(message);
}