diff --git a/README.md b/README.md index 38d59e6..4e6ffbb 100644 --- a/README.md +++ b/README.md @@ -1,10 +1,21 @@ # VelNetServerRust +This basic, single-file relay server is designed to be used for network games, and is similar to Photon Realtime in design. It is written in Rust, with a single-threaded, non-blocking design and does not rely on any network frameworks (pure TCP/UDP). A Unity/C# client implementation can be found in our VelNetUnity repository. + +Like Photon, there is no built-in persistence of rooms or data. Rooms are created when the first client joins and destroyed when the last client leaves. + +The only game logic implemented by the server is that of a "master client", which is an easier way to negotiate a leader in a room that can perform room level operations. + +The "group" functionality is used to specify specific clients to communicate with. Note, these client ids can bridge across rooms. + +The server supports both TCP and UDP transports. + ## Running -1. Get a linoox server +1. Get a linoox server (also runs fine on windows & osx, but the instructions below are for linux) 2. Clone this repo -3. Install rust: `sudo apt install cargo` +3. Edit config.txt to an open port on your firewall +4. Install rust through cargo: e.g. `sudo apt install cargo` 5. Build: `cargo build --release` 6. Run: `sudo ./target/release/VelNetServerRust` 7. Or run in the background so that it doesn't quit when you leave ssh: `nohup sudo ./target/release/VelNetServerRust`. You'll have to install `nohup` with apt. diff --git a/config.txt b/config.txt index fbdf4df..81bd2ee 100644 --- a/config.txt +++ b/config.txt @@ -1,5 +1,4 @@ { "port":5000, -"tcp_timeout":30, -"tcp_send_buffer":100000 +"tcp_timeout":30 } diff --git a/src/main.rs b/src/main.rs index 010c030..4037adb 100644 --- a/src/main.rs +++ b/src/main.rs @@ -1,10 +1,10 @@ use async_std::prelude::*; +use async_std::future; use async_std::net::TcpListener; use async_std::net::TcpStream; use async_std::net::UdpSocket; use async_notify::Notify; use futures::stream::StreamExt; -use futures::future; use futures::join; use futures::select; use futures::pin_mut; @@ -15,9 +15,10 @@ use std::rc::{Rc}; use std::cell::{RefCell}; use std::fs; use chrono::Local; -use std::time; +use std::time::Duration; use serde::{Serialize, Deserialize}; use std::error::Error; +use std::ptr; enum ToClientTCPMessageType { LoggedIn = 0, RoomList = 1, @@ -77,8 +78,7 @@ struct Room { #[derive(Serialize, Deserialize)] struct Config { port: u16, - tcp_timeout: u64, - tcp_send_buffer: usize + tcp_timeout: u64 } @@ -101,15 +101,22 @@ async fn main() { let tcp_future = tcp_listener .incoming() - .for_each_concurrent(None, |tcpstream| process_client(tcpstream.unwrap(), udp_socket.clone(), clients.clone(),rooms.clone(),last_client_id.clone())); + .for_each_concurrent(None, |tcpstream| process_client(tcpstream.unwrap(), udp_socket.clone(), clients.clone(),rooms.clone(),last_client_id.clone(),&config)); let udp_future = process_udp(udp_socket.clone(),clients.clone(),rooms.clone()); join!(tcp_future,udp_future); } -async fn process_client(socket: TcpStream, udp_socket: Rc>, clients: Rc>>>>, rooms: Rc>>>>, last_client_id: Rc>){ +async fn process_client(socket: TcpStream, udp_socket: Rc>, clients: Rc>>>>, rooms: Rc>>>>, last_client_id: Rc>, config: &Config){ println!("started tcp"); + + match socket.set_nodelay(true) { + Ok(_)=>{}, + Err(_)=>{eprintln!("could not set no delay"); return;} + } + //socket.set_read_timeout(Some(time::Duration::new(config.tcp_timeout,0))).unwrap(); + //socket.set_write_timeout(Some(time::Duration::new(config.tcp_timeout,0))).unwrap(); let my_id; { @@ -121,6 +128,14 @@ async fn process_client(socket: TcpStream, udp_socket: Rc>, c let client_notify = Rc::new(Notify::new()); let client_notify_udp = Rc::new(Notify::new()); + + let ip; + + match socket.peer_addr() { + Ok(p)=>ip=p.ip(), + Err(_)=>{return;} + } + let client = Rc::new(RefCell::new(Client{ id: my_id, username: String::from(""), @@ -128,7 +143,7 @@ async fn process_client(socket: TcpStream, udp_socket: Rc>, c roomname: String::from(""), application: String::from(""), groups: HashMap::new(), - ip: socket.peer_addr().unwrap().ip(), + ip: ip, udp_port: 0 as u16, message_queue: vec![], message_queue_udp: vec![], @@ -147,9 +162,8 @@ async fn process_client(socket: TcpStream, udp_socket: Rc>, c } - - let read_async = client_read(client.clone(), socket.clone(), clients.clone(), rooms.clone()).fuse(); - let write_async = client_write(client.clone(), socket, client_notify.clone()).fuse(); + let read_async = client_read(client.clone(), socket.clone(), clients.clone(), rooms.clone(), config.tcp_timeout*1000).fuse(); + let write_async = client_write(client.clone(), socket, client_notify.clone(), config.tcp_timeout*1000).fuse(); let write_async_udp = client_write_udp(client.clone(), udp_socket.clone(), client_notify_udp.clone()).fuse(); pin_mut!(read_async,write_async,write_async_udp); //not sure why this is necessary, since select @@ -171,22 +185,54 @@ async fn process_client(socket: TcpStream, udp_socket: Rc>, c } -async fn client_read(client: Rc>, mut socket: TcpStream, clients: Rc>>>>, rooms: Rc>>>>){ +async fn read_timeout(mut socket: &TcpStream, buf: &mut [u8], duration: u64) -> Result> { + + //this is a read exact function. The buffer passed should be the exact size wanted + + let num_to_read = buf.len(); + + let mut num_read = 0; + + while num_read < num_to_read { + match future::timeout(Duration::from_millis(duration), socket.read(&mut buf[num_read..])).await { + Ok(r) => { + match r { + + Ok(n) if n == 0 => { + + return Err(format!("{}", "no bytes read"))? + + }, + Ok(n) => { + num_read += n; + }, + Err(e) => {return Err(format!("{}", e.to_string()))?} + } + + }, + + Err(e) => {return Err(format!("{}", e.to_string()))?} + + } + } + + return Ok(num_read); + +} + +async fn client_read(client: Rc>, mut socket: TcpStream, clients: Rc>>>>, rooms: Rc>>>>, duration: u64){ let mut buf = [0; 1]; loop { - match socket.read(&mut buf).await { - // socket closed - Ok(n) if n == 0 => { - println!("client read ended naturally?"); - return; - }, + + match read_timeout(&mut socket, &mut buf, duration).await { + Ok(_) => { let t = buf[0]; if t == FromClientTCPMessageType::LogIn as u8 { //[0:u8][username.length():u8][username:shortstring][password.length():u8][password:shortstring] - match read_login_message(socket.clone(), client.clone()).await{ + match read_login_message(socket.clone(), client.clone(), duration).await{ Ok(_)=>(), Err(_)=>{eprintln!("failed to read from socket"); return;} }; @@ -196,26 +242,26 @@ async fn client_read(client: Rc>, mut socket: TcpStream, clients Err(_)=>{eprintln!("failed to read from socket"); return;} }; } else if t == FromClientTCPMessageType::GetRoomData as u8 { - match read_roomdata_message(socket.clone(), client.clone(), rooms.clone()).await{ + match read_roomdata_message(socket.clone(), client.clone(), rooms.clone(), duration).await{ Ok(_)=>(), Err(_)=>{eprintln!("failed to read from socket"); return;} }; } else if t == FromClientTCPMessageType::JoinRoom as u8 {//[2:u8][roomname.length():u8][roomname:shortstring] - match read_join_message(socket.clone(), client.clone(), rooms.clone()).await{ - Ok(_)=>(), + match read_join_message(socket.clone(), client.clone(), rooms.clone(), duration).await{ + Ok(_)=>(), Err(_)=>{eprintln!("failed to read from socket"); return;} }; } else if t == FromClientTCPMessageType::SendMessageOthersUnbuffered as u8 || - t == FromClientTCPMessageType::SendMessageAllUnbuffered as u8 || - t == FromClientTCPMessageType::SendMessageGroupUnbuffered as u8 || - t == FromClientTCPMessageType::SendMessageOthersBuffered as u8 || - t == FromClientTCPMessageType::SendMessageAllBuffered as u8 { //others,all,group[t:u8][message.length():i32][message:u8array] - match read_send_message(socket.clone(), client.clone(), rooms.clone(), t).await{ + t == FromClientTCPMessageType::SendMessageAllUnbuffered as u8 || + t == FromClientTCPMessageType::SendMessageGroupUnbuffered as u8 || + t == FromClientTCPMessageType::SendMessageOthersBuffered as u8 || + t == FromClientTCPMessageType::SendMessageAllBuffered as u8 { //others,all,group[t:u8][message.length():i32][message:u8array] + match read_send_message(socket.clone(), client.clone(), rooms.clone(), t, duration).await{ Ok(_)=>(), Err(_)=>{eprintln!("failed to read from socket"); return;} }; } else if t == FromClientTCPMessageType::CreateGroup as u8 { //[t:u8][list.lengthbytes:i32][clients:i32array] - match read_group_message(socket.clone(), client.clone(), clients.clone()).await{ + match read_group_message(socket.clone(), client.clone(), clients.clone(),duration).await{ Ok(_)=>(), Err(_)=>{eprintln!("failed to read from socket"); return;} }; @@ -231,10 +277,38 @@ async fn client_read(client: Rc>, mut socket: TcpStream, clients //remove the client return; } + + }; } } -async fn client_write(client: Rc>, mut socket: TcpStream, notify: Rc){ + + +async fn write_timeout(mut socket: &TcpStream, buf: &[u8], duration: u64) -> Result> { + + match future::timeout(Duration::from_millis(duration), socket.write(buf)).await { + Ok(r) => { + match r { + + Ok(n)=> { + + return Ok(n); + + }, + Err(e) => {return Err(format!("{}", e.to_string()))?} + + } + + }, + + Err(e) => {return Err(format!("{}", e.to_string()))?} + + } + +} + + +async fn client_write(client: Rc>, mut socket: TcpStream, notify: Rc,duration:u64){ //wait on messages in my queue loop { @@ -252,7 +326,7 @@ async fn client_write(client: Rc>, mut socket: TcpStream, notify client_ref.message_queue.clear(); } - match socket.write(&to_write).await { + match write_timeout(&mut socket, &to_write, duration).await { Ok(_) => (), Err(_) => {eprintln!("failed to write to the tcp socket"); return;} } @@ -382,26 +456,60 @@ async fn process_udp(socket: Rc>,clients: Rc{}, + Err(_)=>{eprintln!("Could not convert group name"); return;} + } + + let group_name = res.unwrap(); let clients = clients.borrow(); + + let mut message_to_send = vec![]; + message_to_send.push(ToClientUDPMessageType::DataMessage as u8); + message_to_send.extend([buf[1],buf[2],buf[3],buf[4]]); + message_to_send.extend(message_bytes); + + let mut send_to_client = false; if clients.contains_key(&client_id){ - let client = clients.get(&client_id).unwrap().borrow(); - if client.groups.contains_key(&group_name) { - let clients = client.groups.get(&group_name).unwrap(); - //we need to form a new message without the group name - let mut message_to_send = vec![]; - message_to_send.push(ToClientUDPMessageType::DataMessage as u8); - message_to_send.extend([buf[1],buf[2],buf[3],buf[4]]); - message_to_send.extend(message_bytes); - - for v in clients.iter() { - let mut v_ref = v.borrow_mut(); - v_ref.message_queue_udp.push(message_to_send.clone()); - v_ref.notify_udp.notify(); + { + { + let client = clients.get(&client_id).unwrap().borrow(); + if client.groups.contains_key(&group_name) { + let clients = client.groups.get(&group_name).unwrap(); + //we need to form a new message without the group name + + + for v in clients.iter() { + let mut skip_client=false; + + { + let v_ref = v.borrow(); + if(v_ref.id == client.id){ + skip_client=true; + send_to_client = true; + } + } + if !skip_client { + let mut v_ref = v.borrow_mut(); + v_ref.message_queue_udp.push(message_to_send.clone()); + v_ref.notify_udp.notify(); + } + } + } } + if send_to_client { + let mut client = clients.get(&client_id).unwrap().borrow_mut(); + client.message_queue_udp.push(message_to_send.clone()); + client.notify_udp.notify(); + } + } } + + } } @@ -409,11 +517,11 @@ async fn process_udp(socket: Rc>,clients: Rc>) -> Result<(),Box>{ +async fn read_login_message(mut stream: TcpStream, client: Rc>, duration: u64) -> Result<(),Box>{ //byte,shortstring,byte,shortstring - let username = read_short_string(&mut stream).await?; - let application = read_short_string(&mut stream).await?; + let username = read_short_string(&mut stream, duration).await?; + let application = read_short_string(&mut stream, duration).await?; println!("{}: Got application {} and userid {}",Local::now().format("%Y-%m-%d %H:%M:%S"),application,username); { let mut client = client.borrow_mut(); @@ -469,10 +577,10 @@ async fn read_rooms_message(mut stream: TcpStream, client: Rc>, return Ok(()); } -async fn read_join_message(mut stream: TcpStream, client: Rc>, rooms: Rc>>>>)-> Result<(),Box>{ +async fn read_join_message(mut stream: TcpStream, client: Rc>, rooms: Rc>>>>, duration: u64)-> Result<(),Box>{ //byte,shortstring - let short_room_name = read_short_string(&mut stream).await?; + let short_room_name = read_short_string(&mut stream, duration).await?; let extended_room_name; let mut leave_room = false; { @@ -540,7 +648,7 @@ async fn read_join_message(mut stream: TcpStream, client: Rc>, r return Ok(()); } -async fn read_roomdata_message(mut stream: TcpStream, client: Rc>, rooms: Rc>>>>)-> Result<(),Box>{ +async fn read_roomdata_message(mut stream: TcpStream, client: Rc>, rooms: Rc>>>>, duration: u64)-> Result<(),Box>{ //type, room_name //will respond with type, numclients u32, id1 u32, name_len u8, name_bytes ... @@ -548,7 +656,7 @@ async fn read_roomdata_message(mut stream: TcpStream, client: Rc - let short_room_name = read_short_string(&mut stream).await?; + let short_room_name = read_short_string(&mut stream, duration).await?; let mut client_ref = client.borrow_mut(); let room_name = format!("{}_{}", client_ref.application, short_room_name); @@ -592,10 +700,10 @@ async fn read_roomdata_message(mut stream: TcpStream, client: Rc } -async fn read_send_message(mut stream: TcpStream, client: Rc>, rooms: Rc>>>>, message_type: u8)-> Result<(),Box>{ +async fn read_send_message(mut stream: TcpStream, client: Rc>, rooms: Rc>>>>, message_type: u8, duration: u64)-> Result<(),Box>{ //4 byte length, array //this is a message for everyone in the room (maybe) - let to_send = read_vec(&mut stream).await?; + let to_send = read_vec(&mut stream, duration).await?; if message_type == FromClientTCPMessageType::SendMessageOthersUnbuffered as u8 { send_room_message(client,&to_send,rooms.clone(),false,false); }else if message_type == FromClientTCPMessageType::SendMessageAllUnbuffered as u8 { @@ -605,16 +713,16 @@ async fn read_send_message(mut stream: TcpStream, client: Rc>, r }else if message_type == FromClientTCPMessageType::SendMessageAllBuffered as u8 { //ordered send_room_message(client,&to_send,rooms.clone(),true,true); }else if message_type == FromClientTCPMessageType::SendMessageGroupUnbuffered as u8 { - let group = read_short_string(&mut stream).await?; + let group = read_short_string(&mut stream, duration).await?; send_group_message(client,&to_send, &group); } return Ok(()); } -async fn read_group_message(mut stream: TcpStream, client: Rc>, clients: Rc>>>>)-> Result<(),Box>{ +async fn read_group_message(mut stream: TcpStream, client: Rc>, clients: Rc>>>>, duration: u64)-> Result<(),Box>{ - let group = read_short_string(&mut stream).await?; - let id_bytes = read_vec(&mut stream).await?; + let group = read_short_string(&mut stream, duration).await?; + let id_bytes = read_vec(&mut stream, duration).await?; let num = id_bytes.len(); let mut client_ref = client.borrow_mut(); @@ -671,7 +779,7 @@ fn client_leave_room(client: Rc>, send_to_client: bool, rooms: R let mut destroy_room = false; { - let mut rooms_ref = rooms.borrow_mut(); + let rooms_ref = rooms.borrow_mut(); let mut room_ref = rooms_ref.get(&roomname).unwrap().borrow_mut(); for (_k,v) in room_ref.clients.iter() { @@ -801,15 +909,8 @@ fn send_room_message(sender: Rc>, message: &Vec, rooms: Rc>, message: &Vec, rooms: Rc>, message: &Vec, group: &String){ let mut write_buf = vec![]; - let sender_ref = sender.borrow(); + let mut sender_ref = sender.borrow_mut(); write_buf.push(ToClientTCPMessageType::DataMessage as u8); write_buf.extend_from_slice(&sender_ref.id.to_be_bytes()); write_buf.extend_from_slice(&(message.len() as u32).to_be_bytes()); write_buf.extend_from_slice(message); //get the list of client ids for this group - let groups = &sender_ref.groups; - if groups.contains_key(group) { - let group = groups.get(group).unwrap(); - for c in group { - let mut temp_mut = c.borrow_mut(); - temp_mut.message_queue.extend_from_slice(&write_buf); - temp_mut.notify.notify(); - } + let mut send_to_client = false; + if sender_ref.groups.contains_key(group) { + let group = sender_ref.groups.get(group).unwrap(); + for c in group { + if ptr::eq(sender.as_ref(),c.as_ref()){ + send_to_client = true; + } + else{ + let mut temp_mut = c.borrow_mut(); + temp_mut.message_queue.extend_from_slice(&write_buf); + temp_mut.notify.notify(); + } + } + } + + if send_to_client { + sender_ref.message_queue.extend_from_slice(&write_buf); + sender_ref.notify.notify(); } } -async fn read_u8(stream: &mut TcpStream) -> Result> { +async fn read_u8(stream: &mut TcpStream,duration: u64) -> Result> { let mut buf = [0; 1]; - stream.read_exact(&mut buf).await?; + read_timeout(stream, &mut buf, duration).await?; return Ok(buf[0]); + } -async fn read_u32(stream: &mut TcpStream) -> Result> { +async fn read_u32(stream: &mut TcpStream,duration: u64) -> Result> { let mut buf:[u8;4] = [0; 4]; - stream.read_exact(&mut buf).await?; + read_timeout(stream, &mut buf, duration).await?; let size = u32::from_be_bytes(buf); return Ok(size); } -async fn _read_string(stream: &mut TcpStream) -> Result> { - let size = read_u32(stream).await?; +async fn _read_string(stream: &mut TcpStream,duration: u64) -> Result> { + let size = read_u32(stream,duration).await?; let mut string_bytes = vec![0;size as usize]; - stream.read_exact(&mut string_bytes).await?; - return Ok(String::from_utf8(string_bytes).unwrap()); + read_timeout(stream, &mut string_bytes, duration).await?; + return Ok(String::from_utf8(string_bytes)?); + } -async fn read_short_string(stream: &mut TcpStream) -> Result> { - let size = read_u8(stream).await?; +async fn read_short_string(stream: &mut TcpStream,duration: u64) -> Result> { + let size = read_u8(stream,duration).await?; let mut string_bytes = vec![0;size as usize]; - stream.read_exact(&mut string_bytes).await?; - return Ok(String::from_utf8(string_bytes).unwrap()); + read_timeout(stream, &mut string_bytes, duration).await?; + return Ok(String::from_utf8(string_bytes)?); } -async fn read_vec(stream: &mut TcpStream) -> Result,Box> { - let message_size = read_u32(stream).await?; +async fn read_vec(stream: &mut TcpStream,duration: u64) -> Result,Box> { + let message_size = read_u32(stream,duration).await?; let mut message = vec![0u8;message_size as usize]; - stream.read_exact(&mut message).await?; + read_timeout(stream, &mut message, duration).await?; return Ok(message); }