Merge branch 'asyncversion' of github.com:velaboratory/VelNetServerRust into asyncversion

asyncversion
Anton Franzluebbers 2022-03-26 19:06:52 +00:00
commit ec86134356
3 changed files with 217 additions and 94 deletions

View File

@ -1,10 +1,21 @@
# VelNetServerRust # VelNetServerRust
This basic, single-file relay server is designed to be used for network games, and is similar to Photon Realtime in design. It is written in Rust, with a single-threaded, non-blocking design and does not rely on any network frameworks (pure TCP/UDP). A Unity/C# client implementation can be found in our VelNetUnity repository.
Like Photon, there is no built-in persistence of rooms or data. Rooms are created when the first client joins and destroyed when the last client leaves.
The only game logic implemented by the server is that of a "master client", which is an easier way to negotiate a leader in a room that can perform room level operations.
The "group" functionality is used to specify specific clients to communicate with. Note, these client ids can bridge across rooms.
The server supports both TCP and UDP transports.
## Running ## Running
1. Get a linoox server 1. Get a linoox server (also runs fine on windows & osx, but the instructions below are for linux)
2. Clone this repo 2. Clone this repo
3. Install rust: `sudo apt install cargo` 3. Edit config.txt to an open port on your firewall
4. Install rust through cargo: e.g. `sudo apt install cargo`
5. Build: `cargo build --release` 5. Build: `cargo build --release`
6. Run: `sudo ./target/release/VelNetServerRust` 6. Run: `sudo ./target/release/VelNetServerRust`
7. Or run in the background so that it doesn't quit when you leave ssh: `nohup sudo ./target/release/VelNetServerRust`. You'll have to install `nohup` with apt. 7. Or run in the background so that it doesn't quit when you leave ssh: `nohup sudo ./target/release/VelNetServerRust`. You'll have to install `nohup` with apt.

View File

@ -1,5 +1,4 @@
{ {
"port":5000, "port":5000,
"tcp_timeout":30, "tcp_timeout":30
"tcp_send_buffer":100000
} }

View File

@ -1,10 +1,10 @@
use async_std::prelude::*; use async_std::prelude::*;
use async_std::future;
use async_std::net::TcpListener; use async_std::net::TcpListener;
use async_std::net::TcpStream; use async_std::net::TcpStream;
use async_std::net::UdpSocket; use async_std::net::UdpSocket;
use async_notify::Notify; use async_notify::Notify;
use futures::stream::StreamExt; use futures::stream::StreamExt;
use futures::future;
use futures::join; use futures::join;
use futures::select; use futures::select;
use futures::pin_mut; use futures::pin_mut;
@ -15,9 +15,10 @@ use std::rc::{Rc};
use std::cell::{RefCell}; use std::cell::{RefCell};
use std::fs; use std::fs;
use chrono::Local; use chrono::Local;
use std::time; use std::time::Duration;
use serde::{Serialize, Deserialize}; use serde::{Serialize, Deserialize};
use std::error::Error; use std::error::Error;
use std::ptr;
enum ToClientTCPMessageType { enum ToClientTCPMessageType {
LoggedIn = 0, LoggedIn = 0,
RoomList = 1, RoomList = 1,
@ -77,8 +78,7 @@ struct Room {
#[derive(Serialize, Deserialize)] #[derive(Serialize, Deserialize)]
struct Config { struct Config {
port: u16, port: u16,
tcp_timeout: u64, tcp_timeout: u64
tcp_send_buffer: usize
} }
@ -101,15 +101,22 @@ async fn main() {
let tcp_future = tcp_listener let tcp_future = tcp_listener
.incoming() .incoming()
.for_each_concurrent(None, |tcpstream| process_client(tcpstream.unwrap(), udp_socket.clone(), clients.clone(),rooms.clone(),last_client_id.clone())); .for_each_concurrent(None, |tcpstream| process_client(tcpstream.unwrap(), udp_socket.clone(), clients.clone(),rooms.clone(),last_client_id.clone(),&config));
let udp_future = process_udp(udp_socket.clone(),clients.clone(),rooms.clone()); let udp_future = process_udp(udp_socket.clone(),clients.clone(),rooms.clone());
join!(tcp_future,udp_future); join!(tcp_future,udp_future);
} }
async fn process_client(socket: TcpStream, udp_socket: Rc<RefCell<UdpSocket>>, clients: Rc<RefCell<HashMap<u32, Rc<RefCell<Client>>>>>, rooms: Rc<RefCell<HashMap<String, Rc<RefCell<Room>>>>>, last_client_id: Rc<RefCell<u32>>){ async fn process_client(socket: TcpStream, udp_socket: Rc<RefCell<UdpSocket>>, clients: Rc<RefCell<HashMap<u32, Rc<RefCell<Client>>>>>, rooms: Rc<RefCell<HashMap<String, Rc<RefCell<Room>>>>>, last_client_id: Rc<RefCell<u32>>, config: &Config){
println!("started tcp"); println!("started tcp");
match socket.set_nodelay(true) {
Ok(_)=>{},
Err(_)=>{eprintln!("could not set no delay"); return;}
}
//socket.set_read_timeout(Some(time::Duration::new(config.tcp_timeout,0))).unwrap();
//socket.set_write_timeout(Some(time::Duration::new(config.tcp_timeout,0))).unwrap();
let my_id; let my_id;
{ {
@ -121,6 +128,14 @@ async fn process_client(socket: TcpStream, udp_socket: Rc<RefCell<UdpSocket>>, c
let client_notify = Rc::new(Notify::new()); let client_notify = Rc::new(Notify::new());
let client_notify_udp = Rc::new(Notify::new()); let client_notify_udp = Rc::new(Notify::new());
let ip;
match socket.peer_addr() {
Ok(p)=>ip=p.ip(),
Err(_)=>{return;}
}
let client = Rc::new(RefCell::new(Client{ let client = Rc::new(RefCell::new(Client{
id: my_id, id: my_id,
username: String::from(""), username: String::from(""),
@ -128,7 +143,7 @@ async fn process_client(socket: TcpStream, udp_socket: Rc<RefCell<UdpSocket>>, c
roomname: String::from(""), roomname: String::from(""),
application: String::from(""), application: String::from(""),
groups: HashMap::new(), groups: HashMap::new(),
ip: socket.peer_addr().unwrap().ip(), ip: ip,
udp_port: 0 as u16, udp_port: 0 as u16,
message_queue: vec![], message_queue: vec![],
message_queue_udp: vec![], message_queue_udp: vec![],
@ -147,9 +162,8 @@ async fn process_client(socket: TcpStream, udp_socket: Rc<RefCell<UdpSocket>>, c
} }
let read_async = client_read(client.clone(), socket.clone(), clients.clone(), rooms.clone(), config.tcp_timeout*1000).fuse();
let read_async = client_read(client.clone(), socket.clone(), clients.clone(), rooms.clone()).fuse(); let write_async = client_write(client.clone(), socket, client_notify.clone(), config.tcp_timeout*1000).fuse();
let write_async = client_write(client.clone(), socket, client_notify.clone()).fuse();
let write_async_udp = client_write_udp(client.clone(), udp_socket.clone(), client_notify_udp.clone()).fuse(); let write_async_udp = client_write_udp(client.clone(), udp_socket.clone(), client_notify_udp.clone()).fuse();
pin_mut!(read_async,write_async,write_async_udp); //not sure why this is necessary, since select pin_mut!(read_async,write_async,write_async_udp); //not sure why this is necessary, since select
@ -171,22 +185,54 @@ async fn process_client(socket: TcpStream, udp_socket: Rc<RefCell<UdpSocket>>, c
} }
async fn client_read(client: Rc<RefCell<Client>>, mut socket: TcpStream, clients: Rc<RefCell<HashMap<u32, Rc<RefCell<Client>>>>>, rooms: Rc<RefCell<HashMap<String, Rc<RefCell<Room>>>>>){ async fn read_timeout(mut socket: &TcpStream, buf: &mut [u8], duration: u64) -> Result<usize,Box<dyn Error>> {
//this is a read exact function. The buffer passed should be the exact size wanted
let num_to_read = buf.len();
let mut num_read = 0;
while num_read < num_to_read {
match future::timeout(Duration::from_millis(duration), socket.read(&mut buf[num_read..])).await {
Ok(r) => {
match r {
Ok(n) if n == 0 => {
return Err(format!("{}", "no bytes read"))?
},
Ok(n) => {
num_read += n;
},
Err(e) => {return Err(format!("{}", e.to_string()))?}
}
},
Err(e) => {return Err(format!("{}", e.to_string()))?}
}
}
return Ok(num_read);
}
async fn client_read(client: Rc<RefCell<Client>>, mut socket: TcpStream, clients: Rc<RefCell<HashMap<u32, Rc<RefCell<Client>>>>>, rooms: Rc<RefCell<HashMap<String, Rc<RefCell<Room>>>>>, duration: u64){
let mut buf = [0; 1]; let mut buf = [0; 1];
loop { loop {
match socket.read(&mut buf).await {
// socket closed match read_timeout(&mut socket, &mut buf, duration).await {
Ok(n) if n == 0 => {
println!("client read ended naturally?");
return;
},
Ok(_) => { Ok(_) => {
let t = buf[0]; let t = buf[0];
if t == FromClientTCPMessageType::LogIn as u8 { //[0:u8][username.length():u8][username:shortstring][password.length():u8][password:shortstring] if t == FromClientTCPMessageType::LogIn as u8 { //[0:u8][username.length():u8][username:shortstring][password.length():u8][password:shortstring]
match read_login_message(socket.clone(), client.clone()).await{ match read_login_message(socket.clone(), client.clone(), duration).await{
Ok(_)=>(), Ok(_)=>(),
Err(_)=>{eprintln!("failed to read from socket"); return;} Err(_)=>{eprintln!("failed to read from socket"); return;}
}; };
@ -196,26 +242,26 @@ async fn client_read(client: Rc<RefCell<Client>>, mut socket: TcpStream, clients
Err(_)=>{eprintln!("failed to read from socket"); return;} Err(_)=>{eprintln!("failed to read from socket"); return;}
}; };
} else if t == FromClientTCPMessageType::GetRoomData as u8 { } else if t == FromClientTCPMessageType::GetRoomData as u8 {
match read_roomdata_message(socket.clone(), client.clone(), rooms.clone()).await{ match read_roomdata_message(socket.clone(), client.clone(), rooms.clone(), duration).await{
Ok(_)=>(), Ok(_)=>(),
Err(_)=>{eprintln!("failed to read from socket"); return;} Err(_)=>{eprintln!("failed to read from socket"); return;}
}; };
} else if t == FromClientTCPMessageType::JoinRoom as u8 {//[2:u8][roomname.length():u8][roomname:shortstring] } else if t == FromClientTCPMessageType::JoinRoom as u8 {//[2:u8][roomname.length():u8][roomname:shortstring]
match read_join_message(socket.clone(), client.clone(), rooms.clone()).await{ match read_join_message(socket.clone(), client.clone(), rooms.clone(), duration).await{
Ok(_)=>(), Ok(_)=>(),
Err(_)=>{eprintln!("failed to read from socket"); return;} Err(_)=>{eprintln!("failed to read from socket"); return;}
}; };
} else if t == FromClientTCPMessageType::SendMessageOthersUnbuffered as u8 || } else if t == FromClientTCPMessageType::SendMessageOthersUnbuffered as u8 ||
t == FromClientTCPMessageType::SendMessageAllUnbuffered as u8 || t == FromClientTCPMessageType::SendMessageAllUnbuffered as u8 ||
t == FromClientTCPMessageType::SendMessageGroupUnbuffered as u8 || t == FromClientTCPMessageType::SendMessageGroupUnbuffered as u8 ||
t == FromClientTCPMessageType::SendMessageOthersBuffered as u8 || t == FromClientTCPMessageType::SendMessageOthersBuffered as u8 ||
t == FromClientTCPMessageType::SendMessageAllBuffered as u8 { //others,all,group[t:u8][message.length():i32][message:u8array] t == FromClientTCPMessageType::SendMessageAllBuffered as u8 { //others,all,group[t:u8][message.length():i32][message:u8array]
match read_send_message(socket.clone(), client.clone(), rooms.clone(), t).await{ match read_send_message(socket.clone(), client.clone(), rooms.clone(), t, duration).await{
Ok(_)=>(), Ok(_)=>(),
Err(_)=>{eprintln!("failed to read from socket"); return;} Err(_)=>{eprintln!("failed to read from socket"); return;}
}; };
} else if t == FromClientTCPMessageType::CreateGroup as u8 { //[t:u8][list.lengthbytes:i32][clients:i32array] } else if t == FromClientTCPMessageType::CreateGroup as u8 { //[t:u8][list.lengthbytes:i32][clients:i32array]
match read_group_message(socket.clone(), client.clone(), clients.clone()).await{ match read_group_message(socket.clone(), client.clone(), clients.clone(),duration).await{
Ok(_)=>(), Ok(_)=>(),
Err(_)=>{eprintln!("failed to read from socket"); return;} Err(_)=>{eprintln!("failed to read from socket"); return;}
}; };
@ -231,10 +277,38 @@ async fn client_read(client: Rc<RefCell<Client>>, mut socket: TcpStream, clients
//remove the client //remove the client
return; return;
} }
}; };
} }
} }
async fn client_write(client: Rc<RefCell<Client>>, mut socket: TcpStream, notify: Rc<Notify>){
async fn write_timeout(mut socket: &TcpStream, buf: &[u8], duration: u64) -> Result<usize,Box<dyn Error>> {
match future::timeout(Duration::from_millis(duration), socket.write(buf)).await {
Ok(r) => {
match r {
Ok(n)=> {
return Ok(n);
},
Err(e) => {return Err(format!("{}", e.to_string()))?}
}
},
Err(e) => {return Err(format!("{}", e.to_string()))?}
}
}
async fn client_write(client: Rc<RefCell<Client>>, mut socket: TcpStream, notify: Rc<Notify>,duration:u64){
//wait on messages in my queue //wait on messages in my queue
loop { loop {
@ -252,7 +326,7 @@ async fn client_write(client: Rc<RefCell<Client>>, mut socket: TcpStream, notify
client_ref.message_queue.clear(); client_ref.message_queue.clear();
} }
match socket.write(&to_write).await { match write_timeout(&mut socket, &to_write, duration).await {
Ok(_) => (), Ok(_) => (),
Err(_) => {eprintln!("failed to write to the tcp socket"); return;} Err(_) => {eprintln!("failed to write to the tcp socket"); return;}
} }
@ -382,26 +456,60 @@ async fn process_udp(socket: Rc<RefCell<UdpSocket>>,clients: Rc<RefCell<HashMap<
let group_name_size = buf[5]; let group_name_size = buf[5];
let message_vec = buf[6..packet_size].to_vec(); let message_vec = buf[6..packet_size].to_vec();
let (group_name_bytes, message_bytes) = message_vec.split_at(group_name_size as usize); let (group_name_bytes, message_bytes) = message_vec.split_at(group_name_size as usize);
let group_name = String::from_utf8(group_name_bytes.to_vec()).unwrap();
let res = String::from_utf8(group_name_bytes.to_vec());
match res {
Ok(_)=>{},
Err(_)=>{eprintln!("Could not convert group name"); return;}
}
let group_name = res.unwrap();
let clients = clients.borrow(); let clients = clients.borrow();
let mut message_to_send = vec![];
message_to_send.push(ToClientUDPMessageType::DataMessage as u8);
message_to_send.extend([buf[1],buf[2],buf[3],buf[4]]);
message_to_send.extend(message_bytes);
let mut send_to_client = false;
if clients.contains_key(&client_id){ if clients.contains_key(&client_id){
let client = clients.get(&client_id).unwrap().borrow(); {
if client.groups.contains_key(&group_name) { {
let clients = client.groups.get(&group_name).unwrap(); let client = clients.get(&client_id).unwrap().borrow();
//we need to form a new message without the group name if client.groups.contains_key(&group_name) {
let mut message_to_send = vec![]; let clients = client.groups.get(&group_name).unwrap();
message_to_send.push(ToClientUDPMessageType::DataMessage as u8); //we need to form a new message without the group name
message_to_send.extend([buf[1],buf[2],buf[3],buf[4]]);
message_to_send.extend(message_bytes);
for v in clients.iter() {
for v in clients.iter() { let mut skip_client=false;
let mut v_ref = v.borrow_mut();
v_ref.message_queue_udp.push(message_to_send.clone()); {
v_ref.notify_udp.notify(); let v_ref = v.borrow();
if(v_ref.id == client.id){
skip_client=true;
send_to_client = true;
}
}
if !skip_client {
let mut v_ref = v.borrow_mut();
v_ref.message_queue_udp.push(message_to_send.clone());
v_ref.notify_udp.notify();
}
}
}
} }
if send_to_client {
let mut client = clients.get(&client_id).unwrap().borrow_mut();
client.message_queue_udp.push(message_to_send.clone());
client.notify_udp.notify();
}
} }
} }
} }
} }
@ -409,11 +517,11 @@ async fn process_udp(socket: Rc<RefCell<UdpSocket>>,clients: Rc<RefCell<HashMap<
} }
//this is in response to someone asking to login (this is where usernames and passwords would be processed, in theory) //this is in response to someone asking to login (this is where usernames and passwords would be processed, in theory)
async fn read_login_message(mut stream: TcpStream, client: Rc<RefCell<Client>>) -> Result<(),Box<dyn Error>>{ async fn read_login_message(mut stream: TcpStream, client: Rc<RefCell<Client>>, duration: u64) -> Result<(),Box<dyn Error>>{
//byte,shortstring,byte,shortstring //byte,shortstring,byte,shortstring
let username = read_short_string(&mut stream).await?; let username = read_short_string(&mut stream, duration).await?;
let application = read_short_string(&mut stream).await?; let application = read_short_string(&mut stream, duration).await?;
println!("{}: Got application {} and userid {}",Local::now().format("%Y-%m-%d %H:%M:%S"),application,username); println!("{}: Got application {} and userid {}",Local::now().format("%Y-%m-%d %H:%M:%S"),application,username);
{ {
let mut client = client.borrow_mut(); let mut client = client.borrow_mut();
@ -469,10 +577,10 @@ async fn read_rooms_message(mut stream: TcpStream, client: Rc<RefCell<Client>>,
return Ok(()); return Ok(());
} }
async fn read_join_message(mut stream: TcpStream, client: Rc<RefCell<Client>>, rooms: Rc<RefCell<HashMap<String, Rc<RefCell<Room>>>>>)-> Result<(),Box<dyn Error>>{ async fn read_join_message(mut stream: TcpStream, client: Rc<RefCell<Client>>, rooms: Rc<RefCell<HashMap<String, Rc<RefCell<Room>>>>>, duration: u64)-> Result<(),Box<dyn Error>>{
//byte,shortstring //byte,shortstring
let short_room_name = read_short_string(&mut stream).await?; let short_room_name = read_short_string(&mut stream, duration).await?;
let extended_room_name; let extended_room_name;
let mut leave_room = false; let mut leave_room = false;
{ {
@ -540,7 +648,7 @@ async fn read_join_message(mut stream: TcpStream, client: Rc<RefCell<Client>>, r
return Ok(()); return Ok(());
} }
async fn read_roomdata_message(mut stream: TcpStream, client: Rc<RefCell<Client>>, rooms: Rc<RefCell<HashMap<String, Rc<RefCell<Room>>>>>)-> Result<(),Box<dyn Error>>{ async fn read_roomdata_message(mut stream: TcpStream, client: Rc<RefCell<Client>>, rooms: Rc<RefCell<HashMap<String, Rc<RefCell<Room>>>>>, duration: u64)-> Result<(),Box<dyn Error>>{
//type, room_name //type, room_name
//will respond with type, numclients u32, id1 u32, name_len u8, name_bytes ... //will respond with type, numclients u32, id1 u32, name_len u8, name_bytes ...
@ -548,7 +656,7 @@ async fn read_roomdata_message(mut stream: TcpStream, client: Rc<RefCell<Client>
let short_room_name = read_short_string(&mut stream).await?; let short_room_name = read_short_string(&mut stream, duration).await?;
let mut client_ref = client.borrow_mut(); let mut client_ref = client.borrow_mut();
let room_name = format!("{}_{}", client_ref.application, short_room_name); let room_name = format!("{}_{}", client_ref.application, short_room_name);
@ -592,10 +700,10 @@ async fn read_roomdata_message(mut stream: TcpStream, client: Rc<RefCell<Client>
} }
async fn read_send_message(mut stream: TcpStream, client: Rc<RefCell<Client>>, rooms: Rc<RefCell<HashMap<String, Rc<RefCell<Room>>>>>, message_type: u8)-> Result<(),Box<dyn Error>>{ async fn read_send_message(mut stream: TcpStream, client: Rc<RefCell<Client>>, rooms: Rc<RefCell<HashMap<String, Rc<RefCell<Room>>>>>, message_type: u8, duration: u64)-> Result<(),Box<dyn Error>>{
//4 byte length, array //4 byte length, array
//this is a message for everyone in the room (maybe) //this is a message for everyone in the room (maybe)
let to_send = read_vec(&mut stream).await?; let to_send = read_vec(&mut stream, duration).await?;
if message_type == FromClientTCPMessageType::SendMessageOthersUnbuffered as u8 { if message_type == FromClientTCPMessageType::SendMessageOthersUnbuffered as u8 {
send_room_message(client,&to_send,rooms.clone(),false,false); send_room_message(client,&to_send,rooms.clone(),false,false);
}else if message_type == FromClientTCPMessageType::SendMessageAllUnbuffered as u8 { }else if message_type == FromClientTCPMessageType::SendMessageAllUnbuffered as u8 {
@ -605,16 +713,16 @@ async fn read_send_message(mut stream: TcpStream, client: Rc<RefCell<Client>>, r
}else if message_type == FromClientTCPMessageType::SendMessageAllBuffered as u8 { //ordered }else if message_type == FromClientTCPMessageType::SendMessageAllBuffered as u8 { //ordered
send_room_message(client,&to_send,rooms.clone(),true,true); send_room_message(client,&to_send,rooms.clone(),true,true);
}else if message_type == FromClientTCPMessageType::SendMessageGroupUnbuffered as u8 { }else if message_type == FromClientTCPMessageType::SendMessageGroupUnbuffered as u8 {
let group = read_short_string(&mut stream).await?; let group = read_short_string(&mut stream, duration).await?;
send_group_message(client,&to_send, &group); send_group_message(client,&to_send, &group);
} }
return Ok(()); return Ok(());
} }
async fn read_group_message(mut stream: TcpStream, client: Rc<RefCell<Client>>, clients: Rc<RefCell<HashMap<u32, Rc<RefCell<Client>>>>>)-> Result<(),Box<dyn Error>>{ async fn read_group_message(mut stream: TcpStream, client: Rc<RefCell<Client>>, clients: Rc<RefCell<HashMap<u32, Rc<RefCell<Client>>>>>, duration: u64)-> Result<(),Box<dyn Error>>{
let group = read_short_string(&mut stream).await?; let group = read_short_string(&mut stream, duration).await?;
let id_bytes = read_vec(&mut stream).await?; let id_bytes = read_vec(&mut stream, duration).await?;
let num = id_bytes.len(); let num = id_bytes.len();
let mut client_ref = client.borrow_mut(); let mut client_ref = client.borrow_mut();
@ -671,7 +779,7 @@ fn client_leave_room(client: Rc<RefCell<Client>>, send_to_client: bool, rooms: R
let mut destroy_room = false; let mut destroy_room = false;
{ {
let mut rooms_ref = rooms.borrow_mut(); let rooms_ref = rooms.borrow_mut();
let mut room_ref = rooms_ref.get(&roomname).unwrap().borrow_mut(); let mut room_ref = rooms_ref.get(&roomname).unwrap().borrow_mut();
for (_k,v) in room_ref.clients.iter() { for (_k,v) in room_ref.clients.iter() {
@ -801,15 +909,8 @@ fn send_room_message(sender: Rc<RefCell<Client>>, message: &Vec<u8>, rooms: Rc<R
let room_ref = rooms_ref[&sender_ref.roomname].borrow(); let room_ref = rooms_ref[&sender_ref.roomname].borrow();
for (_k,v) in room_ref.clients.iter(){ for (_k,v) in room_ref.clients.iter(){
if !include_sender && *_k == sender_ref.id {
continue; if *_k != sender_ref.id {
}
if *_k == sender_ref.id {
sender_ref.message_queue.extend(&write_buf);
sender_ref.notify.notify();
}else {
let mut temp_mut = v.borrow_mut(); let mut temp_mut = v.borrow_mut();
temp_mut.message_queue.extend_from_slice(&write_buf); temp_mut.message_queue.extend_from_slice(&write_buf);
temp_mut.notify.notify(); temp_mut.notify.notify();
@ -822,53 +923,65 @@ fn send_room_message(sender: Rc<RefCell<Client>>, message: &Vec<u8>, rooms: Rc<R
} }
fn send_group_message(sender: Rc<RefCell<Client>>, message: &Vec<u8>, group: &String){ fn send_group_message(sender: Rc<RefCell<Client>>, message: &Vec<u8>, group: &String){
let mut write_buf = vec![]; let mut write_buf = vec![];
let sender_ref = sender.borrow(); let mut sender_ref = sender.borrow_mut();
write_buf.push(ToClientTCPMessageType::DataMessage as u8); write_buf.push(ToClientTCPMessageType::DataMessage as u8);
write_buf.extend_from_slice(&sender_ref.id.to_be_bytes()); write_buf.extend_from_slice(&sender_ref.id.to_be_bytes());
write_buf.extend_from_slice(&(message.len() as u32).to_be_bytes()); write_buf.extend_from_slice(&(message.len() as u32).to_be_bytes());
write_buf.extend_from_slice(message); write_buf.extend_from_slice(message);
//get the list of client ids for this group //get the list of client ids for this group
let groups = &sender_ref.groups; let mut send_to_client = false;
if groups.contains_key(group) { if sender_ref.groups.contains_key(group) {
let group = groups.get(group).unwrap(); let group = sender_ref.groups.get(group).unwrap();
for c in group { for c in group {
let mut temp_mut = c.borrow_mut(); if ptr::eq(sender.as_ref(),c.as_ref()){
temp_mut.message_queue.extend_from_slice(&write_buf); send_to_client = true;
temp_mut.notify.notify(); }
} else{
let mut temp_mut = c.borrow_mut();
temp_mut.message_queue.extend_from_slice(&write_buf);
temp_mut.notify.notify();
}
}
}
if send_to_client {
sender_ref.message_queue.extend_from_slice(&write_buf);
sender_ref.notify.notify();
} }
} }
async fn read_u8(stream: &mut TcpStream) -> Result<u8,Box<dyn Error>> { async fn read_u8(stream: &mut TcpStream,duration: u64) -> Result<u8,Box<dyn Error>> {
let mut buf = [0; 1]; let mut buf = [0; 1];
stream.read_exact(&mut buf).await?; read_timeout(stream, &mut buf, duration).await?;
return Ok(buf[0]); return Ok(buf[0]);
} }
async fn read_u32(stream: &mut TcpStream) -> Result<u32,Box<dyn Error>> { async fn read_u32(stream: &mut TcpStream,duration: u64) -> Result<u32,Box<dyn Error>> {
let mut buf:[u8;4] = [0; 4]; let mut buf:[u8;4] = [0; 4];
stream.read_exact(&mut buf).await?; read_timeout(stream, &mut buf, duration).await?;
let size = u32::from_be_bytes(buf); let size = u32::from_be_bytes(buf);
return Ok(size); return Ok(size);
} }
async fn _read_string(stream: &mut TcpStream) -> Result<String,Box<dyn Error>> { async fn _read_string(stream: &mut TcpStream,duration: u64) -> Result<String,Box<dyn Error>> {
let size = read_u32(stream).await?; let size = read_u32(stream,duration).await?;
let mut string_bytes = vec![0;size as usize]; let mut string_bytes = vec![0;size as usize];
stream.read_exact(&mut string_bytes).await?; read_timeout(stream, &mut string_bytes, duration).await?;
return Ok(String::from_utf8(string_bytes).unwrap()); return Ok(String::from_utf8(string_bytes)?);
} }
async fn read_short_string(stream: &mut TcpStream) -> Result<String,Box<dyn Error>> { async fn read_short_string(stream: &mut TcpStream,duration: u64) -> Result<String,Box<dyn Error>> {
let size = read_u8(stream).await?; let size = read_u8(stream,duration).await?;
let mut string_bytes = vec![0;size as usize]; let mut string_bytes = vec![0;size as usize];
stream.read_exact(&mut string_bytes).await?; read_timeout(stream, &mut string_bytes, duration).await?;
return Ok(String::from_utf8(string_bytes).unwrap()); return Ok(String::from_utf8(string_bytes)?);
} }
async fn read_vec(stream: &mut TcpStream) -> Result<Vec<u8>,Box<dyn Error>> { async fn read_vec(stream: &mut TcpStream,duration: u64) -> Result<Vec<u8>,Box<dyn Error>> {
let message_size = read_u32(stream).await?; let message_size = read_u32(stream,duration).await?;
let mut message = vec![0u8;message_size as usize]; let mut message = vec![0u8;message_size as usize];
stream.read_exact(&mut message).await?; read_timeout(stream, &mut message, duration).await?;
return Ok(message); return Ok(message);
} }