diff --git a/Cargo.lock b/Cargo.lock index 1f59244..4933522 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -4,19 +4,198 @@ version = 3 [[package]] name = "VelNetServerRust" -version = "0.1.0" +version = "0.2.0" dependencies = [ + "async-notify", + "async-std", "chrono", + "futures", "serde", "serde_json", ] +[[package]] +name = "async-attributes" +version = "1.1.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a3203e79f4dd9bdda415ed03cf14dae5a2bf775c683a00f94e9cd1faf0f596e5" +dependencies = [ + "quote", + "syn", +] + +[[package]] +name = "async-channel" +version = "1.6.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "2114d64672151c0c5eaa5e131ec84a74f06e1e559830dabba01ca30605d66319" +dependencies = [ + "concurrent-queue", + "event-listener", + "futures-core", +] + +[[package]] +name = "async-executor" +version = "1.4.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "871f9bb5e0a22eeb7e8cf16641feb87c9dc67032ccf8ff49e772eb9941d3a965" +dependencies = [ + "async-task", + "concurrent-queue", + "fastrand", + "futures-lite", + "once_cell", + "slab", +] + +[[package]] +name = "async-global-executor" +version = "2.0.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c026b7e44f1316b567ee750fea85103f87fcb80792b860e979f221259796ca0a" +dependencies = [ + "async-channel", + "async-executor", + "async-io", + "async-mutex", + "blocking", + "futures-lite", + "num_cpus", + "once_cell", +] + +[[package]] +name = "async-io" +version = "1.6.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a811e6a479f2439f0c04038796b5cfb3d2ad56c230e0f2d3f7b04d68cfee607b" +dependencies = [ + "concurrent-queue", + "futures-lite", + "libc", + "log", + "once_cell", + "parking", + "polling", + "slab", + "socket2", + "waker-fn", + "winapi", +] + +[[package]] +name = "async-lock" +version = "2.5.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e97a171d191782fba31bb902b14ad94e24a68145032b7eedf871ab0bc0d077b6" +dependencies = [ + "event-listener", +] + +[[package]] +name = "async-mutex" +version = "1.4.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "479db852db25d9dbf6204e6cb6253698f175c15726470f78af0d918e99d6156e" +dependencies = [ + "event-listener", +] + +[[package]] +name = "async-notify" +version = "0.2.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8356f653934a654817bceada97a857ef8d68ab8992753d23ed8e8ccd5fc8fa31" +dependencies = [ + "futures-channel", + "futures-util", +] + +[[package]] +name = "async-std" +version = "1.10.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f8056f1455169ab86dd47b47391e4ab0cbd25410a70e9fe675544f49bafaf952" +dependencies = [ + "async-attributes", + "async-channel", + "async-global-executor", + "async-io", + "async-lock", + "crossbeam-utils", + "futures-channel", + "futures-core", + "futures-io", + "futures-lite", + "gloo-timers", + "kv-log-macro", + "log", + "memchr", + "num_cpus", + "once_cell", + "pin-project-lite", + "pin-utils", + "slab", + "wasm-bindgen-futures", +] + +[[package]] +name = "async-task" +version = "4.2.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "30696a84d817107fc028e049980e09d5e140e8da8f1caeb17e8e950658a3cea9" + +[[package]] +name = "atomic-waker" +version = "1.0.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "065374052e7df7ee4047b1160cca5e1467a12351a40b3da123c870ba0b8eda2a" + [[package]] name = "autocfg" version = "1.1.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "d468802bab17cbc0cc575e9b053f41e72aa36bfa6b7f55e3529ffa43161b97fa" +[[package]] +name = "blocking" +version = "1.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "046e47d4b2d391b1f6f8b407b1deb8dee56c1852ccd868becf2710f601b5f427" +dependencies = [ + "async-channel", + "async-task", + "atomic-waker", + "fastrand", + "futures-lite", + "once_cell", +] + +[[package]] +name = "bumpalo" +version = "3.9.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a4a45a46ab1f2412e53d3a0ade76ffad2025804294569aae387231a0cd6e0899" + +[[package]] +name = "cache-padded" +version = "1.2.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c1db59621ec70f09c5e9b597b220c7a2b43611f4710dc03ceb8748637775692c" + +[[package]] +name = "cc" +version = "1.0.73" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "2fff2a6927b3bb87f9595d67196a70493f627687a71d87a0d692242c33f58c11" + +[[package]] +name = "cfg-if" +version = "1.0.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "baf1de4339761588bc0619e3cbc0120ee582ebb74b53b4efbf79117bd2da40fd" + [[package]] name = "chrono" version = "0.4.19" @@ -30,18 +209,236 @@ dependencies = [ "winapi", ] +[[package]] +name = "concurrent-queue" +version = "1.2.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "30ed07550be01594c6026cff2a1d7fe9c8f683caa798e12b68694ac9e88286a3" +dependencies = [ + "cache-padded", +] + +[[package]] +name = "crossbeam-utils" +version = "0.8.7" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b5e5bed1f1c269533fa816a0a5492b3545209a205ca1a54842be180eb63a16a6" +dependencies = [ + "cfg-if", + "lazy_static", +] + +[[package]] +name = "ctor" +version = "0.1.21" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ccc0a48a9b826acdf4028595adc9db92caea352f7af011a3034acd172a52a0aa" +dependencies = [ + "quote", + "syn", +] + +[[package]] +name = "event-listener" +version = "2.5.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "77f3309417938f28bf8228fcff79a4a37103981e3e186d2ccd19c74b38f4eb71" + +[[package]] +name = "fastrand" +version = "1.7.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c3fcf0cee53519c866c09b5de1f6c56ff9d647101f81c1964fa632e148896cdf" +dependencies = [ + "instant", +] + +[[package]] +name = "futures" +version = "0.3.21" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f73fe65f54d1e12b726f517d3e2135ca3125a437b6d998caf1962961f7172d9e" +dependencies = [ + "futures-channel", + "futures-core", + "futures-executor", + "futures-io", + "futures-sink", + "futures-task", + "futures-util", +] + +[[package]] +name = "futures-channel" +version = "0.3.21" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c3083ce4b914124575708913bca19bfe887522d6e2e6d0952943f5eac4a74010" +dependencies = [ + "futures-core", + "futures-sink", +] + +[[package]] +name = "futures-core" +version = "0.3.21" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0c09fd04b7e4073ac7156a9539b57a484a8ea920f79c7c675d05d289ab6110d3" + +[[package]] +name = "futures-executor" +version = "0.3.21" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9420b90cfa29e327d0429f19be13e7ddb68fa1cccb09d65e5706b8c7a749b8a6" +dependencies = [ + "futures-core", + "futures-task", + "futures-util", +] + +[[package]] +name = "futures-io" +version = "0.3.21" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "fc4045962a5a5e935ee2fdedaa4e08284547402885ab326734432bed5d12966b" + +[[package]] +name = "futures-lite" +version = "1.12.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7694489acd39452c77daa48516b894c153f192c3578d5a839b62c58099fcbf48" +dependencies = [ + "fastrand", + "futures-core", + "futures-io", + "memchr", + "parking", + "pin-project-lite", + "waker-fn", +] + +[[package]] +name = "futures-macro" +version = "0.3.21" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "33c1e13800337f4d4d7a316bf45a567dbcb6ffe087f16424852d97e97a91f512" +dependencies = [ + "proc-macro2", + "quote", + "syn", +] + +[[package]] +name = "futures-sink" +version = "0.3.21" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "21163e139fa306126e6eedaf49ecdb4588f939600f0b1e770f4205ee4b7fa868" + +[[package]] +name = "futures-task" +version = "0.3.21" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "57c66a976bf5909d801bbef33416c41372779507e7a6b3a5e25e4749c58f776a" + +[[package]] +name = "futures-util" +version = "0.3.21" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d8b7abd5d659d9b90c8cba917f6ec750a74e2dc23902ef9cd4cc8c8b22e6036a" +dependencies = [ + "futures-channel", + "futures-core", + "futures-io", + "futures-macro", + "futures-sink", + "futures-task", + "memchr", + "pin-project-lite", + "pin-utils", + "slab", +] + +[[package]] +name = "gloo-timers" +version = "0.2.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "4d12a7f4e95cfe710f1d624fb1210b7d961a5fb05c4fd942f4feab06e61f590e" +dependencies = [ + "futures-channel", + "futures-core", + "js-sys", + "wasm-bindgen", +] + +[[package]] +name = "hermit-abi" +version = "0.1.19" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "62b467343b94ba476dcb2500d242dadbb39557df889310ac77c5d99100aaac33" +dependencies = [ + "libc", +] + +[[package]] +name = "instant" +version = "0.1.12" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7a5bbe824c507c5da5956355e86a746d82e0e1464f65d862cc5e71da70e94b2c" +dependencies = [ + "cfg-if", +] + [[package]] name = "itoa" version = "1.0.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "1aab8fc367588b89dcee83ab0fd66b72b50b72fa1904d7095045ace2b0c81c35" +[[package]] +name = "js-sys" +version = "0.3.56" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a38fc24e30fd564ce974c02bf1d337caddff65be6cc4735a1f7eab22a7440f04" +dependencies = [ + "wasm-bindgen", +] + +[[package]] +name = "kv-log-macro" +version = "1.0.7" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0de8b303297635ad57c9f5059fd9cee7a47f8e8daa09df0fcd07dd39fb22977f" +dependencies = [ + "log", +] + +[[package]] +name = "lazy_static" +version = "1.4.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e2abad23fbc42b3700f2f279844dc832adb2b2eb069b2df918f455c4e18cc646" + [[package]] name = "libc" version = "0.2.117" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "e74d72e0f9b65b5b4ca49a346af3976df0f9c61d550727f349ecd559f251a26c" +[[package]] +name = "log" +version = "0.4.14" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "51b9bbe6c47d51fc3e1a9b945965946b4c44142ab8792c50835a980d362c2710" +dependencies = [ + "cfg-if", + "value-bag", +] + +[[package]] +name = "memchr" +version = "2.4.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "308cc39be01b73d0d18f82a0e7b2a3df85245f84af96fdddc5d202d27e47b86a" + [[package]] name = "num-integer" version = "0.1.44" @@ -61,6 +458,53 @@ dependencies = [ "autocfg", ] +[[package]] +name = "num_cpus" +version = "1.13.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "19e64526ebdee182341572e50e9ad03965aa510cd94427a4549448f285e957a1" +dependencies = [ + "hermit-abi", + "libc", +] + +[[package]] +name = "once_cell" +version = "1.10.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "87f3e037eac156d1775da914196f0f37741a274155e34a0b7e427c35d2a2ecb9" + +[[package]] +name = "parking" +version = "2.0.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "427c3892f9e783d91cc128285287e70a59e206ca452770ece88a76f7a3eddd72" + +[[package]] +name = "pin-project-lite" +version = "0.2.8" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e280fbe77cc62c91527259e9442153f4688736748d24660126286329742b4c6c" + +[[package]] +name = "pin-utils" +version = "0.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8b870d8c151b6f2fb93e84a13146138f05d02ed11c7e7c54f8826aaaf7c9f184" + +[[package]] +name = "polling" +version = "2.2.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "685404d509889fade3e86fe3a5803bca2ec09b0c0778d5ada6ec8bf7a8de5259" +dependencies = [ + "cfg-if", + "libc", + "log", + "wepoll-ffi", + "winapi", +] + [[package]] name = "proc-macro2" version = "1.0.36" @@ -116,6 +560,22 @@ dependencies = [ "serde", ] +[[package]] +name = "slab" +version = "0.4.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9def91fd1e018fe007022791f865d0ccc9b3a0d5001e01aabb8b40e46000afb5" + +[[package]] +name = "socket2" +version = "0.4.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "66d72b759436ae32898a2af0a14218dbf55efde3feeb170eb623637db85ee1e0" +dependencies = [ + "libc", + "winapi", +] + [[package]] name = "syn" version = "1.0.86" @@ -144,12 +604,119 @@ version = "0.2.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "8ccb82d61f80a663efe1f787a51b16b5a51e3314d6ac365b08639f52387b33f3" +[[package]] +name = "value-bag" +version = "1.0.0-alpha.8" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "79923f7731dc61ebfba3633098bf3ac533bbd35ccd8c57e7088d9a5eebe0263f" +dependencies = [ + "ctor", + "version_check", +] + +[[package]] +name = "version_check" +version = "0.9.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "49874b5167b65d7193b8aba1567f5c7d93d001cafc34600cee003eda787e483f" + +[[package]] +name = "waker-fn" +version = "1.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9d5b2c62b4012a3e1eca5a7e077d13b3bf498c4073e33ccd58626607748ceeca" + [[package]] name = "wasi" version = "0.10.0+wasi-snapshot-preview1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "1a143597ca7c7793eff794def352d41792a93c481eb1042423ff7ff72ba2c31f" +[[package]] +name = "wasm-bindgen" +version = "0.2.79" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "25f1af7423d8588a3d840681122e72e6a24ddbcb3f0ec385cac0d12d24256c06" +dependencies = [ + "cfg-if", + "wasm-bindgen-macro", +] + +[[package]] +name = "wasm-bindgen-backend" +version = "0.2.79" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8b21c0df030f5a177f3cba22e9bc4322695ec43e7257d865302900290bcdedca" +dependencies = [ + "bumpalo", + "lazy_static", + "log", + "proc-macro2", + "quote", + "syn", + "wasm-bindgen-shared", +] + +[[package]] +name = "wasm-bindgen-futures" +version = "0.4.29" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "2eb6ec270a31b1d3c7e266b999739109abce8b6c87e4b31fcfcd788b65267395" +dependencies = [ + "cfg-if", + "js-sys", + "wasm-bindgen", + "web-sys", +] + +[[package]] +name = "wasm-bindgen-macro" +version = "0.2.79" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "2f4203d69e40a52ee523b2529a773d5ffc1dc0071801c87b3d270b471b80ed01" +dependencies = [ + "quote", + "wasm-bindgen-macro-support", +] + +[[package]] +name = "wasm-bindgen-macro-support" +version = "0.2.79" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "bfa8a30d46208db204854cadbb5d4baf5fcf8071ba5bf48190c3e59937962ebc" +dependencies = [ + "proc-macro2", + "quote", + "syn", + "wasm-bindgen-backend", + "wasm-bindgen-shared", +] + +[[package]] +name = "wasm-bindgen-shared" +version = "0.2.79" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "3d958d035c4438e28c70e4321a2911302f10135ce78a9c7834c0cab4123d06a2" + +[[package]] +name = "web-sys" +version = "0.3.56" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c060b319f29dd25724f09a2ba1418f142f539b2be99fbf4d2d5a8f7330afb8eb" +dependencies = [ + "js-sys", + "wasm-bindgen", +] + +[[package]] +name = "wepoll-ffi" +version = "0.1.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d743fdedc5c64377b5fc2bc036b01c7fd642205a0d96356034ae3404d49eb7fb" +dependencies = [ + "cc", +] + [[package]] name = "winapi" version = "0.3.9" diff --git a/Cargo.toml b/Cargo.toml index 90aeba8..8f52e53 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "VelNetServerRust" -version = "0.1.0" +version = "0.2.0" edition = "2021" # See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html @@ -9,3 +9,6 @@ edition = "2021" chrono = "*" serde = { version = "1.0", features = ["derive"] } serde_json = "1.0" +async-std = { version = "1.6", features = ["attributes"]} +futures = "*" +async-notify = "*" \ No newline at end of file diff --git a/src/main.rs b/src/main.rs index b1f5a5e..883f92f 100644 --- a/src/main.rs +++ b/src/main.rs @@ -1,15 +1,20 @@ -extern crate chrono; -extern crate serde; -extern crate serde_json; -use std::io::prelude::*; -use std::thread; -use std::net::{TcpListener, TcpStream,UdpSocket,IpAddr,SocketAddr}; +use async_std::prelude::*; +use async_std::net::TcpListener; +use async_std::net::TcpStream; +use async_std::net::UdpSocket; +use async_notify::Notify; +use futures::stream::StreamExt; +use futures::future; +use futures::join; +use futures::select; +use futures::pin_mut; +use futures::future::FutureExt; +use async_std::net::{IpAddr,SocketAddr}; use std::collections::HashMap; -use std::sync::{Arc,RwLock}; -use std::sync::mpsc; -use std::sync::mpsc::{SyncSender,Receiver}; -use chrono::Local; +use std::rc::{Rc}; +use std::cell::{RefCell}; use std::fs; +use chrono::Local; use std::time; use serde::{Serialize, Deserialize}; enum ToClientTCPMessageType { @@ -48,23 +53,25 @@ enum FromClientUDPMessageType { SendMessageGroupUnbuffered = FromClientTCPMessageType::SendMessageGroupUnbuffered as isize } struct Client { - logged_in: Arc>, + logged_in: bool, id: u32, - username: Arc>, - application: Arc>, - room: Arc>>>, - sender: SyncSender>, - rooms_mutex: Arc>>>, - clients_mutex: Arc>>>, - groups: Arc>>>>, - ip: Arc>, - port: Arc> + username: String, + roomname: String, + application: String, + groups: HashMap>>>, + ip: IpAddr, + udp_port: u16, + message_queue: Vec, + message_queue_udp: Vec>, + notify: Rc, + notify_udp: Rc, + is_master: bool } struct Room { name: String, - clients: RwLock>>, - master_client: Arc>> + clients: HashMap>>, + master_client: Rc> } #[derive(Serialize, Deserialize)] struct Config { @@ -74,80 +81,369 @@ struct Config { } -fn read_u8(stream: &mut TcpStream) -> u8 { - let mut buf = [0; 1]; - stream.read_exact(&mut buf).unwrap(); - return buf[0]; -} -fn read_u32(stream: &mut TcpStream) -> u32 { - let mut buf:[u8;4] = [0; 4]; - stream.read_exact(&mut buf).unwrap(); - let size = u32::from_be_bytes(buf); - return size; -} -fn _read_string(stream: &mut TcpStream) -> String { - let size = read_u32(stream); - let mut string_bytes = vec![0;size as usize]; - stream.read_exact(&mut string_bytes).unwrap(); - return String::from_utf8(string_bytes).unwrap(); -} -fn read_short_string(stream: &mut TcpStream) -> String { - let size = read_u8(stream); - let mut string_bytes = vec![0;size as usize]; - stream.read_exact(&mut string_bytes).unwrap(); - return String::from_utf8(string_bytes).unwrap(); +#[async_std::main] +async fn main() { + + println!("{}: VelNet Server Starting",Local::now().format("%Y-%m-%d %H:%M:%S")); + + //read the config file + let foo = fs::read_to_string("config.txt").unwrap(); + let config: Config = serde_json::from_str(&foo).unwrap(); + println!("{}",config.port); + + let tcp_listener = TcpListener::bind(format!("0.0.0.0:{}",config.port)).await.unwrap(); + let udp_socket = Rc::new(RefCell::new(UdpSocket::bind(format!("0.0.0.0:{}",config.port)).await.unwrap())); + + let clients: Rc>>>> = Rc::new(RefCell::new(HashMap::new())); + let rooms: Rc>>>> = Rc::new(RefCell::new(HashMap::new())); + let last_client_id = Rc::new(RefCell::new(0)); + + let tcp_future = tcp_listener + .incoming() + .for_each_concurrent(None, |tcpstream| process_client(tcpstream.unwrap(), udp_socket.clone(), clients.clone(),rooms.clone(),last_client_id.clone())); + let udp_future = process_udp(udp_socket.clone(),clients.clone(),rooms.clone()); + join!(tcp_future,udp_future); + } -fn read_vec(stream: &mut TcpStream) -> Vec { - let message_size = read_u32(stream); - let mut message = vec![0u8;message_size as usize]; - stream.read_exact(&mut message).unwrap(); - return message; +async fn process_client(socket: TcpStream, udp_socket: Rc>, clients: Rc>>>>, rooms: Rc>>>>, last_client_id: Rc>){ + + println!("started tcp"); + + let my_id; + { + let mut reference = last_client_id.borrow_mut(); + *reference = *reference + 1; + my_id = *reference; + } + + + let client_notify = Rc::new(Notify::new()); + let client_notify_udp = Rc::new(Notify::new()); + let client = Rc::new(RefCell::new(Client{ + id: my_id, + username: String::from(""), + logged_in: false, + roomname: String::from(""), + application: String::from(""), + groups: HashMap::new(), + ip: socket.peer_addr().unwrap().ip(), + udp_port: 0 as u16, + message_queue: vec![], + message_queue_udp: vec![], + notify: client_notify.clone(), + notify_udp: client_notify_udp.clone(), + is_master: false + })); + + + + { + println!("Spawned client handler = {}",my_id); + } + { + let temp_client = client.clone(); + let mut clients_temp = clients.borrow_mut(); + clients_temp.insert(my_id,temp_client); + } + + + + let read_async = client_read(client.clone(), socket.clone(), clients.clone(), rooms.clone()).fuse(); + let write_async = client_write(client.clone(), socket, client_notify.clone()).fuse(); + let write_async_udp = client_write_udp(client.clone(), udp_socket.clone(), client_notify_udp.clone()).fuse(); + + pin_mut!(read_async,write_async,write_async_udp); //not sure why this is necessary, since select + + select! { // + () = read_async => println!("read async ended"), + () = write_async => println!("write async ended"), + () = write_async_udp => println!("write async udp ended") + } + + + + { + client_leave_room(client.clone(), false, rooms.clone()); + let mut clients_temp = clients.borrow_mut(); + clients_temp.remove(&client.borrow().id); + } + { + println!("Client {} left",client.borrow().id); + } + +} + +async fn client_read(client: Rc>, mut socket: TcpStream, clients: Rc>>>>, rooms: Rc>>>>){ + + let mut buf = [0; 1]; + + loop { + match socket.read(&mut buf).await { + // socket closed + Ok(n) if n == 0 => { + return; + }, + Ok(n) => { + + let t = buf[0]; + if t == FromClientTCPMessageType::LogIn as u8 { //[0:u8][username.length():u8][username:shortstring][password.length():u8][password:shortstring] + read_login_message(socket.clone(), client.clone()).await; + } else if t == FromClientTCPMessageType::GetRooms as u8 {//[1:u8] + read_rooms_message(socket.clone(), client.clone(), rooms.clone()).await; + } else if t == FromClientTCPMessageType::GetRoomData as u8 { + read_roomdata_message(socket.clone(), client.clone(), rooms.clone()).await; + } else if t == FromClientTCPMessageType::JoinRoom as u8 {//[2:u8][roomname.length():u8][roomname:shortstring] + read_join_message(socket.clone(), client.clone(), rooms.clone()).await; + } else if t == FromClientTCPMessageType::SendMessageOthersUnbuffered as u8 || + t == FromClientTCPMessageType::SendMessageAllUnbuffered as u8 || + t == FromClientTCPMessageType::SendMessageGroupUnbuffered as u8 || + t == FromClientTCPMessageType::SendMessageOthersBuffered as u8 || + t == FromClientTCPMessageType::SendMessageAllBuffered as u8 { //others,all,group[t:u8][message.length():i32][message:u8array] + read_send_message(socket.clone(), client.clone(), rooms.clone(), t).await; + } else if t == FromClientTCPMessageType::CreateGroup as u8 { //[t:u8][list.lengthbytes:i32][clients:i32array] + read_group_message(socket.clone(), client.clone(), clients.clone()).await; + } else { + //die...not correct protocol + println!("Incorrect protocol, killing"); + return; + } + + }, + Err(e) => { + eprintln!("failed to read from socket; err = {:?}", e); + //remove the client + return; + } + }; + } +} +async fn client_write(client: Rc>, mut socket: TcpStream, notify: Rc){ + + //wait on messages in my queue + loop { + + notify.notified().await; //there is something to write + println!("Notified"); + let mut to_write = vec![]; + { + let client_ref = client.borrow(); + to_write.extend_from_slice(&client_ref.message_queue); //must do this so that the borrow ends + } + + { + let mut client_ref = client.borrow_mut(); + client_ref.message_queue.clear(); + } + + match socket.write(&to_write).await { + Ok(_) => (), + Err(_) => return + } + + } +} + +async fn client_write_udp(client: Rc>, socket: Rc>, notify: Rc){ + + loop { + notify.notified().await; //there is something to write + println!("Notified udp"); + + + let ip; + let port; + let mut messages = vec![]; + { + let mut client_ref = client.borrow_mut(); + ip = client_ref.ip; + port = client_ref.udp_port; + for msg in client_ref.message_queue_udp.iter() { + messages.push(msg.clone()); + } + client_ref.message_queue_udp.clear(); + } + + for msg in messages.iter() { + let socket = socket.borrow(); + match socket.send_to(&msg,SocketAddr::new(ip, port)).await { + Ok(_)=> (), + Err(_) => () + } + } + + } + + + +} + +async fn process_udp(socket: Rc>,clients: Rc>>>>, rooms: Rc>>>>){ + println!("started udp"); + let mut buf = [0u8;1024]; + loop { + let socket = socket.borrow(); + let res = socket.recv_from(&mut buf).await; + + match res { + Ok(_) => (), + Err(_) => continue + } + + let (packet_size,addr) = res.unwrap(); + let t = buf[0]; + if packet_size >= 5{ + //get the client id, which has to be sent with every udp message, because you don't know where udp messages are coming from + let client_id_bytes = [buf[1],buf[2],buf[3],buf[4]]; + let client_id = u32::from_be_bytes(client_id_bytes); + + + + if t == FromClientUDPMessageType::Connect as u8 { //1 byte, 0. Nothing else. This is just to establish the udp port, Echos back the same thing sent + //connect message, respond back + + + let clients = clients.borrow(); + if clients.contains_key(&client_id){ + + let mut client = clients.get(&client_id).unwrap().borrow_mut(); + client.udp_port = addr.port(); //set the udp port to send data to + client.message_queue_udp.push(vec![0]); + client.notify_udp.notify(); + + } + + } else if t == FromClientUDPMessageType::SendMesssageOthersUnbuffered as u8 { //[3:u8][from:i32][contents:u8array] note that it must fit into the packet of 1024 bytes + + + let clients = clients.borrow(); + if clients.contains_key(&client_id){ + let client = clients.get(&client_id).unwrap().borrow(); + let rooms_ref = rooms.borrow(); + if client.roomname != "" { + let room = rooms_ref[&client.roomname].borrow(); + buf[0] = ToClientUDPMessageType::DataMessage as u8; //technically unecessary, unless we change this number + for (_k,v) in room.clients.iter() { + if *_k != client_id{ + let mut msg = vec![]; + let mut v_ref = v.borrow_mut(); + msg.extend_from_slice(&buf[0..packet_size]); + v_ref.message_queue_udp.push(msg); + v_ref.notify.notify(); + } + } + } + } + + + } else if t == FromClientUDPMessageType::SendMessageAllUnbuffered as u8 { //see above + let clients = clients.borrow(); + if clients.contains_key(&client_id){ + let mut client = clients.get(&client_id).unwrap().borrow_mut(); + let rooms_ref = rooms.borrow(); + if client.roomname != "" { + let room = rooms_ref[&client.roomname].borrow(); + buf[0] = ToClientUDPMessageType::DataMessage as u8; //technically unecessary, unless we change this number + for (_k,v) in room.clients.iter() { + if *_k != client_id{ + let mut msg = vec![]; + let mut v_ref = v.borrow_mut(); + msg.extend_from_slice(&buf[0..packet_size]); + v_ref.message_queue_udp.push(msg); + v_ref.notify.notify(); + }else{ + let mut msg = vec![]; + msg.extend_from_slice(&buf[0..packet_size]); + client.message_queue_udp.push(msg); + client.notify.notify(); + } + } + } + } + } else if t == FromClientUDPMessageType::SendMessageGroupUnbuffered as u8 { //[5:byte][from:i32][group.length():u8][message:u8array] + //this one is a little different, because we don't send the group in the message, so we have to formulate another message (like a 3 message) + //send a message to a group + //read the group name + + let group_name_size = buf[5]; + let message_vec = buf[6..packet_size].to_vec(); + let (group_name_bytes, message_bytes) = message_vec.split_at(group_name_size as usize); + let group_name = String::from_utf8(group_name_bytes.to_vec()).unwrap(); + + + let clients = clients.borrow(); + if clients.contains_key(&client_id){ + let client = clients.get(&client_id).unwrap().borrow(); + if client.groups.contains_key(&group_name) { + let clients = client.groups.get(&group_name).unwrap(); + //we need to form a new message without the group name + let mut message_to_send = vec![]; + message_to_send.push(ToClientUDPMessageType::DataMessage as u8); + message_to_send.extend([buf[1],buf[2],buf[3],buf[4]]); + message_to_send.extend(message_bytes); + + for v in clients.iter() { + let mut v_ref = v.borrow_mut(); + v_ref.message_queue_udp.push(message_to_send.clone()); + v_ref.notify.notify(); + } + } + } + } + } + + } } //this is in response to someone asking to login (this is where usernames and passwords would be processed, in theory) -fn read_login_message(stream: &mut TcpStream, client: &Arc) { +async fn read_login_message(mut stream: TcpStream, client: Rc>) { //byte,shortstring,byte,shortstring + println!("Reading login message"); - let username = read_short_string(stream); - let application = read_short_string(stream); - + let username = read_short_string(&mut stream).await; + let application = read_short_string(&mut stream).await; println!("{}: Got application {} and userid {}",Local::now().format("%Y-%m-%d %H:%M:%S"),application,username); - let mut client_user = client.username.write().unwrap(); - *client_user = username; - let mut client_application = client.application.write().unwrap(); - *client_application = application; - let mut client_loggedin = client.logged_in.write().unwrap(); - *client_loggedin = true; - let mut write_buf = vec![]; - write_buf.push(ToClientTCPMessageType::LoggedIn as u8); - write_buf.extend_from_slice(&(client.id).to_be_bytes()); //send the client the id + { + let mut client = client.borrow_mut(); + client.username = username; + client.application = application; + client.logged_in = true; + } + - client.sender.try_send(write_buf).unwrap(); + + { + let mut client = client.borrow_mut(); + let mut write_buf = vec![]; + write_buf.push(ToClientTCPMessageType::LoggedIn as u8); + write_buf.extend_from_slice(&(client.id).to_be_bytes()); //send the client the id + client.message_queue.extend(write_buf); + client.notify.notify(); + } } -//this is in response to a request for rooms. -fn read_rooms_message(_stream: &mut TcpStream, client: &Arc){ +async fn read_rooms_message(mut stream: TcpStream, client: Rc>, rooms: Rc>>>>){ + println!("Reading rooms message"); let mut write_buf = vec![]; write_buf.push(ToClientTCPMessageType::RoomList as u8); //first we need to get the room names - let rooms = client.rooms_mutex.read().unwrap(); + let rooms = rooms.borrow(); + let mut client = client.borrow_mut(); let mut rooms_vec = vec![]; for (k,v) in rooms.iter() { - let app_name = client.application.read().unwrap(); + - if !k.starts_with(&app_name.to_string()) { + if !k.starts_with(&client.application.to_string()) { continue; } - let clients = v.clients.read().unwrap(); let mut iter = k.chars(); - iter.by_ref().nth(app_name.len()); + iter.by_ref().nth(client.application.len()); let application_stripped_room = iter.as_str(); - let room_string = format!("{}:{}",application_stripped_room,clients.len()); + let room_string = format!("{}:{}",application_stripped_room,v.borrow().clients.len()); rooms_vec.push(room_string); } @@ -156,25 +452,98 @@ fn read_rooms_message(_stream: &mut TcpStream, client: &Arc){ let message_len = message_bytes.len() as u32; write_buf.extend_from_slice(&(message_len).to_be_bytes()); write_buf.extend_from_slice(message_bytes); - client.sender.try_send(write_buf).unwrap(); + client.message_queue.extend_from_slice(&write_buf); + client.notify.notify(); } -fn read_roomdata_message(stream: &mut TcpStream, client: &Arc){ +async fn read_join_message(mut stream: TcpStream, client: Rc>, rooms: Rc>>>>){ + //byte,shortstring + + println!("Reading join message"); + let short_room_name = read_short_string(&mut stream).await; + let extended_room_name; + let mut leave_room = false; + { + let client_ref = client.borrow(); + extended_room_name = format!("{}_{}", client_ref.application, short_room_name); + if client_ref.roomname != "" { + leave_room = true; + } + } + if leave_room { + //todo + client_leave_room(client.clone(), true, rooms.clone()); + } + + let mut client_ref = client.borrow_mut(); + let mut rooms_ref = rooms.borrow_mut(); + + if short_room_name.trim() == "" || short_room_name == "-1" { + return; + } + + if !rooms_ref.contains_key(&extended_room_name) { //new room, must create it + let map: HashMap>> = HashMap::new(); + let r = Rc::new(RefCell::new(Room { + name: extended_room_name.to_string(), + clients: map, + master_client: client.clone() //client is the master, since they joined first + })); + client_ref.is_master = true; + rooms_ref.insert(String::from(&extended_room_name),r); + println!("{}: {}: New room {} created",Local::now().format("%Y-%m-%d %H:%M:%S"), client_ref.application,&extended_room_name); + }else{ + client_ref.is_master = false; + } + + //the room is guaranteed to exist now, so this call can't fail + let mut room_to_join = rooms_ref[&extended_room_name].borrow_mut(); + room_to_join.clients.insert(client_ref.id,client.clone()); + println!("{}: {}: Client {} joined {}",Local::now().format("%Y-%m-%d %H:%M:%S"), client_ref.application, client_ref.id,&extended_room_name); + + client_ref.roomname = extended_room_name; //we create an option and assign it back to the room + + //send a join message to everyone in the room (except the client) + for (_k,v) in room_to_join.clients.iter() { + if *_k != client_ref.id { + send_client_join_message(v, client_ref.id, &short_room_name); + } + } + + //send a join message to the client that has all of the ids in the room + let mut ids_in_room = vec![]; + for (_k,v) in room_to_join.clients.iter() { + ids_in_room.push(*_k); + + } + send_you_joined_message(&mut *client_ref, ids_in_room, &short_room_name); + + if client_ref.is_master { + let temp = client_ref.id; + send_client_master_message(&mut *client_ref, temp); + }else{ + send_client_master_message(&mut *client_ref, room_to_join.master_client.borrow().id); + } +} + +async fn read_roomdata_message(mut stream: TcpStream, client: Rc>, rooms: Rc>>>>){ //type, room_name //will respond with type, numclients u32, id1 u32, name_len u8, name_bytes ... //read the room name and append the client application - let short_room_name = read_short_string(stream); - let application = client.application.read().unwrap().to_string(); - let room_name = format!("{}_{}", application, short_room_name); + + + let short_room_name = read_short_string(&mut stream).await; + let mut client_ref = client.borrow_mut(); + let room_name = format!("{}_{}", client_ref.application, short_room_name); //we need to access the rooms list - let rooms = client.rooms_mutex.read().unwrap(); - if rooms.contains_key(&room_name) { + let rooms_ref = rooms.borrow(); + if rooms_ref.contains_key(&room_name) { - let room = rooms.get(&room_name).unwrap(); + let room = rooms_ref.get(&room_name).unwrap(); //form and send the message let mut write_buf = vec![]; write_buf.push(ToClientTCPMessageType::RoomData as u8); @@ -184,357 +553,57 @@ fn read_roomdata_message(stream: &mut TcpStream, client: &Arc){ write_buf.push(roomname_bytes.len() as u8); write_buf.extend_from_slice(&roomname_bytes); - let clients = room.clients.read().unwrap(); + let clients = &room.borrow().clients; write_buf.extend_from_slice(&(clients.len() as u32).to_be_bytes()); for (_k,c) in clients.iter() { - //write out the client id (u32) and the username (short string) - write_buf.extend_from_slice(&(c.id).to_be_bytes()); - let username = c.username.read().unwrap(); - let username_bytes = username.as_bytes(); - write_buf.push(username_bytes.len() as u8); - write_buf.extend_from_slice(&username_bytes); + write_buf.extend_from_slice(&(_k).to_be_bytes()); + + if *_k != client_ref.id { + let c_ref = c.borrow(); + let username_bytes = c_ref.username.as_bytes(); + write_buf.push(username_bytes.len() as u8); + write_buf.extend_from_slice(&username_bytes); + + }else{ + let username_bytes = client_ref.username.as_bytes(); + write_buf.push(username_bytes.len() as u8); + write_buf.extend_from_slice(&username_bytes); + } + } - client.sender.try_send(write_buf).unwrap(); + client_ref.message_queue.extend_from_slice(&write_buf); + client_ref.notify.notify(); } } - -fn send_client_master_message(to: &Arc, master_id: u32){ - //2u8, person_id_u32, room_name_len_u8, room_name_bytes - let mut write_buf = vec![]; - write_buf.push(ToClientTCPMessageType::MasterMessage as u8); - write_buf.extend_from_slice(&(master_id).to_be_bytes()); //send everyone that the client id joined the room - let res = to.sender.try_send(write_buf); - match res { - Ok(_) => (), - Err(_) => () - } - -} - -fn send_client_join_message(to: &Arc, from: u32, room: &str){ - //2u8, person_id_u32, room_name_len_u8, room_name_bytes - let mut write_buf = vec![]; - write_buf.push(ToClientTCPMessageType::PlayerJoined as u8); - write_buf.extend_from_slice(&(from).to_be_bytes()); //send everyone that the client id joined the room - write_buf.push(room.as_bytes().len() as u8); - write_buf.extend_from_slice(room.as_bytes()); - let res = to.sender.try_send(write_buf); - match res { - Ok(_) => (), - Err(_) => () - } -} - -fn send_you_joined_message(to: &Arc, in_room: Vec, room: &str){ - //you_joined_u8, ids_len_u32, id_list_array_u32, room_name_len_u8, room_name_bytes - let mut write_buf = vec![]; - write_buf.push(ToClientTCPMessageType::YouJoined as u8); - write_buf.extend_from_slice(&(in_room.len() as u32).to_be_bytes()); - for id in in_room { - write_buf.extend_from_slice(&(id).to_be_bytes()); - } - write_buf.push(room.as_bytes().len() as u8); - write_buf.extend_from_slice(room.as_bytes()); - let res = to.sender.try_send(write_buf); - match res { - Ok(_) => (), - Err(_) => () - } -} - -fn send_you_left_message(to: &Arc, room: &str){ - let mut write_buf = vec![]; - write_buf.push(ToClientTCPMessageType::YouLeft as u8); - write_buf.push(room.as_bytes().len() as u8); - write_buf.extend_from_slice(room.as_bytes()); - let res = to.sender.try_send(write_buf); - match res { - Ok(_) => (), - Err(_) => () - } -} -fn send_client_left_message(to: &Arc, from: u32, room: &str){ - let mut write_buf = vec![]; - write_buf.push(ToClientTCPMessageType::PlayerLeft as u8); - write_buf.extend_from_slice(&(from).to_be_bytes()); //send everyone that the client id left the room - write_buf.push(room.as_bytes().len() as u8); - write_buf.extend_from_slice(room.as_bytes()); - let res = to.sender.try_send(write_buf); - match res { - Ok(_) => (), - Err(_) => () - } -} - - -//helper function, because clients leave room in multiple places -fn client_leave_room(client: &Arc, send_to_client: bool){ - //first remove the client from the room they are in - - { - let room = client.room.read().unwrap(); //I need to get the room, because I'll be modifying the clients in it - if room.is_some(){ - { - let mut change_master = false; - let mut new_master_id = 0; - { - println!("{}: {}: Client {} in room, leaving",Local::now().format("%Y-%m-%d %H:%M:%S"), client.application.read().unwrap().to_string(),client.id); - } - let room = room.as_ref().unwrap(); - - //may have to choose a new master - - { - - let clients = room.clients.read().unwrap(); - let master_client = room.master_client.read().unwrap(); - if master_client.id == client.id { - //change the master - change_master = true; - - } - - for (_k,v) in clients.iter() { - if !send_to_client && v.id == client.id{ - continue; - }else if v.id == client.id { - send_you_left_message(v, &room.name); - }else{ - send_client_left_message(v, client.id, &room.name); - } - } - } - - { - let mut clients = room.clients.write().unwrap(); - clients.remove(&client.id); //remove the client from that list in the room - } - - let clients = room.clients.read().unwrap(); - - //if the room is empty, destroy it as well - - if clients.len() == 0 { - let mut rooms = client.rooms_mutex.write().unwrap(); - rooms.remove(&room.name); - { - println!("{}: {}: Destroyed room {}",Local::now().format("%Y-%m-%d %H:%M:%S"), client.application.read().unwrap().to_string(), &room.name) - } - }else if change_master{ - - for (_k,v) in clients.iter() { - if v.id != client.id { - new_master_id = v.id; - break; - } - } - - { - println!("{}: {}: Changing master to {}",Local::now().format("%Y-%m-%d %H:%M:%S"),client.application.read().unwrap().to_string(), new_master_id); - } - for (_k,v) in clients.iter() { - send_client_master_message(&v, new_master_id); - } - { - let mut master_client = room.master_client.write().unwrap(); - *master_client = clients.get(&new_master_id).unwrap().clone(); - } - - } - } - } - } - { - let mut room = client.room.write().unwrap(); - *room = Option::None; - } - - -} - -fn read_join_message(stream: &mut TcpStream, client: &Arc){ - //byte,shortstring - let short_room_name = read_short_string(stream); - let application = client.application.read().unwrap().to_string(); - let extended_room_name = format!("{}_{}", application, short_room_name); - - - //if the client is in a room, leave it - let mut leave_room = false; - { - let room = client.room.read().unwrap(); //must release this mutex before calling into a function that uses it - if room.as_ref().is_some(){ - leave_room = true; - } - } - if leave_room { - client_leave_room(client, true); - } - - if short_room_name.trim() == "" || short_room_name == "-1" { - return; - } - - - //join room_name - { - { - let mut rooms = client.rooms_mutex.write().unwrap(); - if !rooms.contains_key(&extended_room_name) { //new room, must create it - let map: HashMap> = HashMap::new(); - let r = Arc::new(Room { - name: extended_room_name.to_string(), - clients: RwLock::new(map), - master_client: Arc::new(RwLock::new(client.clone())) //client is the master, since they joined first - }); - - rooms.insert(String::from(&extended_room_name),r); - println!("{}: {}: New room {} created",Local::now().format("%Y-%m-%d %H:%M:%S"), application,&extended_room_name); - } - //the room is guaranteed to exist now, so this call can't fail - let room_to_join = &rooms[&extended_room_name]; - let mut clients = room_to_join.clients.write().unwrap(); - clients.insert(client.id,client.clone()); - println!("{}: {}: Client {} joined {}",Local::now().format("%Y-%m-%d %H:%M:%S"), application, client.id,&extended_room_name); - let mut room = client.room.write().unwrap(); - *room = Some(room_to_join.clone()); //we create an option and assign it back to the room - } - - //once the client is in the room, it can't suddenly die, so we can release the write lock, and can use the extended_room name without issue - - { - let rooms = client.rooms_mutex.read().unwrap(); - let clients = rooms[&extended_room_name].clients.read().unwrap(); //only need a read lock now - //send a join message to everyone in the room (except the client) - for (_k,v) in clients.iter() { - if v.id != client.id { - send_client_join_message(v, client.id, &short_room_name); - } - } - - //send a join message to the client that has all of the ids in the room - let mut ids_in_room = vec![]; - for (_k,v) in clients.iter() { - ids_in_room.push(v.id); - - } - send_you_joined_message(client, ids_in_room, &short_room_name); - } - - - let room = client.room.read().unwrap(); - //tell the client who the master is - let master_client = room.as_ref().unwrap().master_client.read().unwrap(); - send_client_master_message(client, master_client.id); - - } - -} - -// function send_message_to_clients_dictionary(clients: message: &Vec, include_sender: bool){ - - - - -// } - -fn send_room_message(sender: &Arc, message: &Vec, include_sender: bool, ordered: bool){ - //this message is 3u8, sender_id_u32, message_len_u32, message_bytes - let mut write_buf = vec![]; - write_buf.push(ToClientTCPMessageType::DataMessage as u8); - write_buf.extend_from_slice(&sender.id.to_be_bytes()); - write_buf.extend_from_slice(&(message.len() as u32).to_be_bytes()); - write_buf.extend_from_slice(message); - //println!("sending {} bytes from {}",message.len(),sender.id); - { - - if !ordered { - let room = sender.room.read().unwrap(); - - if room.is_some() { - - let clients = room.as_ref().unwrap().clients.read().unwrap(); - for (_k,v) in clients.iter(){ - if !include_sender && v.id == sender.id { - continue; - } - match v.sender.try_send(write_buf.clone()){ - Ok(_) => (), - Err(x) => println!("{}: {}: Error sending to client {}: {}",Local::now().format("%Y-%m-%d %H:%M:%S"), v.application.read().unwrap().to_string(),v.id,x) - } //this sometimes fails. - - } - } - }else{ //I'm bad at rust, so I don't know how else to do this other than repeat the code above because the types are so different - let room = sender.room.write().unwrap(); - - if room.is_some() { - - let clients = room.as_ref().unwrap().clients.read().unwrap(); - for (_k,v) in clients.iter(){ - if !include_sender && v.id == sender.id { - continue; - } - match v.sender.try_send(write_buf.clone()){ - Ok(_) => (), - Err(x) => println!("{}: {}: Error sending to client {}: {}",Local::now().format("%Y-%m-%d %H:%M:%S"),v.application.read().unwrap().to_string(),v.id,x) - } //this sometimes fails. - - } - } - } - } -} -fn send_group_message(sender: &Arc, message: &Vec, group: &String){ - let mut write_buf = vec![]; - write_buf.push(ToClientTCPMessageType::DataMessage as u8); - write_buf.extend_from_slice(&sender.id.to_be_bytes()); - write_buf.extend_from_slice(&(message.len() as u32).to_be_bytes()); - write_buf.extend_from_slice(message); - - //get the list of client ids for this group - let groups = sender.groups.read().unwrap(); - if groups.contains_key(group) { - let group = groups.get(group).unwrap(); - for c in group { - //there may be a leftover when a client leaves...will fix itself - - match c.sender.try_send(write_buf.clone()) { - Ok(_) => (), - Err(_) => () - } - } - } - -} -fn read_send_message(stream: &mut TcpStream, client: &Arc, message_type: u8){ +async fn read_send_message(mut stream: TcpStream, client: Rc>, rooms: Rc>>>>, message_type: u8){ //4 byte length, array //this is a message for everyone in the room (maybe) - let to_send = read_vec(stream); + let to_send = read_vec(&mut stream).await; if message_type == FromClientTCPMessageType::SendMessageOthersUnbuffered as u8 { - send_room_message(client,&to_send,false,false); + send_room_message(client,&to_send,rooms.clone(),false,false); }else if message_type == FromClientTCPMessageType::SendMessageAllUnbuffered as u8 { - send_room_message(client,&to_send,true,false); + send_room_message(client,&to_send,rooms.clone(),true,false); } else if message_type == FromClientTCPMessageType::SendMessageOthersBuffered as u8 { //ordered - send_room_message(client,&to_send,false,true); + send_room_message(client,&to_send,rooms.clone(),false,true); }else if message_type == FromClientTCPMessageType::SendMessageAllBuffered as u8 { //ordered - send_room_message(client,&to_send,true,true); + send_room_message(client,&to_send,rooms.clone(),true,true); }else if message_type == FromClientTCPMessageType::SendMessageGroupUnbuffered as u8 { - let group = read_short_string(stream); + let group = read_short_string(&mut stream).await; send_group_message(client,&to_send, &group); } } -fn read_group_message(stream: &mut TcpStream, client: &Arc){ +async fn read_group_message(mut stream: TcpStream, client: Rc>, clients: Rc>>>>){ - let group = read_short_string(stream); - let id_bytes = read_vec(stream); + let group = read_short_string(&mut stream).await; + let id_bytes = read_vec(&mut stream).await; let num = id_bytes.len(); - let mut groups = client.groups.write().unwrap(); - let clients = client.clients_mutex.read().unwrap(); + let mut client_ref = client.borrow_mut(); + + let clients_ref = clients.borrow(); let mut group_clients = vec![]; let mut i = 0; loop { @@ -549,7 +618,7 @@ fn read_group_message(stream: &mut TcpStream, client: &Arc){ let id = u32::from_be_bytes(slice); - match clients.get(&id) { + match clients_ref.get(&id) { Some(client) => {group_clients.push(client.clone());}, None => () //not there, so don't add it } @@ -558,289 +627,230 @@ fn read_group_message(stream: &mut TcpStream, client: &Arc){ } //delete the group if it exists - if groups.contains_key(&group) { - groups.remove(&group); //ensures the client references go away + if client_ref.groups.contains_key(&group) { + client_ref.groups.remove(&group); //ensures the client references go away } - groups.insert(group.clone(),group_clients); + client_ref.groups.insert(group.clone(),group_clients); } -fn client_read_thread(mut stream: TcpStream, mut client: Arc) { - let mut read_buf:[u8;1] = [0; 1]; - //messages come through as a 1 byte type identifier, that can be one of 0 (login) 1 (get rooms), 2 (join/leave room) 3 (send message to room), 4 (send message to room including me), 5 (send message to group), 6 (establish group) - loop { +fn client_leave_room(client: Rc>, send_to_client: bool, rooms: Rc>>>>){ + //first remove the client from the room they are in + + { - //read exactly 1 byte - stream.read_exact(&mut read_buf).unwrap(); - //println!("Got a message {}",read_buf[0]); - let t = read_buf[0]; - if t == FromClientTCPMessageType::LogIn as u8 { //[0:u8][username.length():u8][username:shortstring][password.length():u8][password:shortstring] - read_login_message(&mut stream, &mut client); - } else if t == FromClientTCPMessageType::GetRooms as u8 {//[1:u8] - read_rooms_message(&mut stream, &mut client); - } else if t == FromClientTCPMessageType::GetRoomData as u8 { - read_roomdata_message(&mut stream, &mut client); - } else if t == FromClientTCPMessageType::JoinRoom as u8 {//[2:u8][roomname.length():u8][roomname:shortstring] - read_join_message(&mut stream, &mut client); - } else if t == FromClientTCPMessageType::SendMessageOthersUnbuffered as u8 || - t == FromClientTCPMessageType::SendMessageAllUnbuffered as u8 || - t == FromClientTCPMessageType::SendMessageGroupUnbuffered as u8 || - t == FromClientTCPMessageType::SendMessageOthersBuffered as u8 || - t == FromClientTCPMessageType::SendMessageAllBuffered as u8 { //others,all,group[t:u8][message.length():i32][message:u8array] - read_send_message(&mut stream, &client, t); - } else if t == FromClientTCPMessageType::CreateGroup as u8 { //[t:u8][list.lengthbytes:i32][clients:i32array] - read_group_message(&mut stream, &client); - } else { - //die...not correct protocol - println!("Incorrect protocol, killing"); + let mut client_ref = client.borrow_mut(); + let roomname = String::from(client_ref.roomname.clone()); + + if roomname == "" { //client not in room, leave return; } - - std::io::stdout().flush().unwrap(); - } -} - -fn client_write_thread(mut stream: TcpStream, rx: Receiver> ) { - - //wait on messages in my queue - loop { - let m = rx.recv().unwrap(); - //println!("Sending a message {}",m.len()); - if m.len() == 1{ - break; + let mut new_master_id = 0; + { + println!("{}: {}: Client {} in room, leaving",Local::now().format("%Y-%m-%d %H:%M:%S"), client_ref.application,client_ref.id); } - stream.write(&m).unwrap(); - } -} + + let mut destroy_room = false; + { + let mut rooms_ref = rooms.borrow_mut(); + let mut room_ref = rooms_ref.get(&roomname).unwrap().borrow_mut(); - -fn handle_client(stream: TcpStream, client_id: u32, clients_mutex: Arc>>>, rooms_mutex: Arc>>>,tcp_timeout: u64,tcp_send_buffer:usize){ - - stream.set_nodelay(true).unwrap(); - stream.set_read_timeout(Some(time::Duration::new(tcp_timeout,0))).unwrap(); - stream.set_write_timeout(Some(time::Duration::new(tcp_timeout,0))).unwrap(); - println!("{}: Accepted new connection, assigned client id {}",Local::now().format("%Y-%m-%d %H:%M:%S"),client_id); - - let (tx, rx) = mpsc::sync_channel(tcp_send_buffer); //the server is very fast. However, if the clients throttle the sending, this buffer may overflow, and when it does, data is lost - //create a new client structure and add it to the list of clients - let client = Arc::new(Client{ - id: client_id, - username: Arc::new(RwLock::new(String::from(""))), - logged_in: Arc::new(RwLock::new(false)), - room: Arc::new(RwLock::new(Option::None)), - application: Arc::new(RwLock::new(String::from(""))), - sender: tx, - rooms_mutex: rooms_mutex.clone(), - clients_mutex: clients_mutex.clone(), - groups: Arc::new(RwLock::new(HashMap::new())), - ip: Arc::new(RwLock::new(stream.peer_addr().unwrap().ip())), - port: Arc::new(RwLock::new(0)) - }); - - { - let mut clients = clients_mutex.write().unwrap(); - clients.insert(client_id, client.clone()); - } - - - let read_clone = stream.try_clone().expect("clone failed"); - let read_client = client.clone(); - let read_handle = thread::spawn(move ||{client_read_thread(read_clone, read_client)}); - let write_handle = thread::spawn(move ||{client_write_thread(stream, rx)}); - - //handle writing to the thread as needed - - match read_handle.join(){ - Ok(_)=>(), - Err(_)=>() - } - - match client.sender.try_send(vec![0]){ //force send thread to exit - Ok(_)=>(), - Err(_)=>() - } - - match write_handle.join() { - Ok(_)=>(), - Err(_)=>() - } - println!("{}: {}: Client {} left",Local::now().format("%Y-%m-%d %H:%M:%S"),client.application.read().unwrap().to_string(),client_id); - //now we can kill the client. - { - //make sure we remove the client from all rooms - client_leave_room(&client, false); - let mut clients = clients_mutex.write().unwrap(); - clients.remove(&client_id); - } - -} - -fn tcp_listen(client_mutex: Arc>>>, room_mutex: Arc>>>,port:u16,tcp_timeout:u64,tcp_send_buffer:usize){ - println!("{}: Started TCP Listener",Local::now().format("%Y-%m-%d %H:%M:%S")); - let listener = TcpListener::bind(format!("0.0.0.0:{}",port)).expect("could not bind port"); - - let mut next_client_id = 0; - // accept connections and process them serially - for stream in listener.incoming() { - let client_mutex = Arc::clone(&client_mutex); - let room_mutex = Arc::clone(&room_mutex); - thread::spawn(move || {handle_client(stream.unwrap(), next_client_id, client_mutex, room_mutex,tcp_timeout,tcp_send_buffer)}); - next_client_id+=1; - } - println!("{}: Ended TCP Listener",Local::now().format("%Y-%m-%d %H:%M:%S")); -} - -fn udp_listen(client_mutex: Arc>>>, _room_mutex: Arc>>>,port:u16){ - let mut buf = [0u8;1024]; - let s = UdpSocket::bind(format!("0.0.0.0:{}",port)).unwrap(); - println!("{}: UDP Thread Started",Local::now().format("%Y-%m-%d %H:%M:%S")); - loop { - let res = s.recv_from(&mut buf); - match res { - Ok(_) => (), - Err(_) => continue - } - let (packet_size,addr) = res.unwrap(); - let t = buf[0]; - if packet_size >= 5{ - //get the client id, which has to be sent with every udp message, because you don't know where udp messages are coming from - let client_id_bytes = [buf[1],buf[2],buf[3],buf[4]]; - let client_id = u32::from_be_bytes(client_id_bytes); - - - - if t == FromClientUDPMessageType::Connect as u8 { //1 byte, 0. Nothing else. This is just to establish the udp port, Echos back the same thing sent - //connect message, respond back - { - let clients = client_mutex.read().unwrap(); - if clients.contains_key(&client_id){ - let client = clients.get(&client_id).unwrap(); - let mut port = client.port.write().unwrap(); - *port = addr.port(); //set the udp port to send data to - buf[0] = ToClientUDPMessageType::Connected as u8; - match s.send_to(&buf,addr) { - Ok(_)=>(), - Err(_)=>() - } - } - } - - - } else if t == FromClientUDPMessageType::SendMesssageOthersUnbuffered as u8 { //[3:u8][from:i32][contents:u8array] note that it must fit into the packet of 1024 bytes - { - let clients = client_mutex.read().unwrap(); - if clients.contains_key(&client_id){ - let client = clients.get(&client_id).unwrap(); - let room_option = client.room.read().unwrap(); - let room = room_option.as_ref().unwrap(); - let room_clients = room.clients.read().unwrap(); //we finally got to the room! - buf[0] = ToClientUDPMessageType::DataMessage as u8; //technically unecessary, unless we change this number - for (_k,v) in room_clients.iter() { - if v.id != client_id{ - let ip = v.ip.read().unwrap(); - let port = v.port.read().unwrap(); - match s.send_to(&buf,SocketAddr::new(*ip, *port)) { - Ok(_)=> (), - Err(_) => () - } - } - } - } - } - - } else if t == FromClientUDPMessageType::SendMessageAllUnbuffered as u8 { //see above - { - let clients = client_mutex.read().unwrap(); - if clients.contains_key(&client_id){ - let client = clients.get(&client_id).unwrap(); - let room_option = client.room.read().unwrap(); - let room = room_option.as_ref().unwrap(); - let room_clients = room.clients.read().unwrap(); //we finally got to the room! - buf[0] = ToClientUDPMessageType::DataMessage as u8; //messages are always 3s, even though this came in as 4 - for (_k,v) in room_clients.iter() { - - let ip = v.ip.read().unwrap(); - let port = v.port.read().unwrap(); - match s.send_to(&buf,SocketAddr::new(*ip, *port)) { - Ok(_)=> (), - Err(_) => () - } - - } - } - } - } else if t == FromClientUDPMessageType::SendMessageGroupUnbuffered as u8 { //[5:byte][from:i32][group.length():u8][message:u8array] - //this one is a little different, because we don't send the group in the message, so we have to formulate another message (like a 3 message) - //send a message to a group - //read the group name - - let group_name_size = buf[5]; - let message_vec = buf[6..packet_size].to_vec(); - let (group_name_bytes, message_bytes) = message_vec.split_at(group_name_size as usize); - let group_name = String::from_utf8(group_name_bytes.to_vec()).unwrap(); - let clients = client_mutex.read().unwrap(); - if clients.contains_key(&client_id){ - let client = clients.get(&client_id).unwrap(); - let groups = client.groups.read().unwrap(); - if groups.contains_key(&group_name) { - - - let clients = groups.get(&group_name).unwrap(); - - - - //we need to form a new message without the group name - let mut message_to_send = vec![]; - message_to_send.push(ToClientUDPMessageType::DataMessage as u8); - message_to_send.extend([buf[1],buf[2],buf[3],buf[4]]); - message_to_send.extend(message_bytes); - - for v in clients.iter() { - - let ip = v.ip.read().unwrap(); - let port = v.port.read().unwrap(); - - match s.send_to(&message_to_send,SocketAddr::new(*ip, *port)) { - Ok(_)=> (), - Err(_) => () - } - } - } + for (_k,v) in room_ref.clients.iter() { + if *_k != client_ref.id { + send_client_left_message(v, client_ref.id, &roomname); } } - + if send_to_client && client_ref.roomname != "" { + send_you_left_message(&mut *client_ref, &roomname); + } + + room_ref.clients.remove(&client_ref.id); //remove the client from that list in the room + if room_ref.clients.len() == 0 { + destroy_room = true; + } } + //if the room is empty, destroy it as well + let mut rooms_ref = rooms.borrow_mut(); + if destroy_room { + + rooms_ref.remove(&roomname); + println!("{}: {}: Destroyed room {}",Local::now().format("%Y-%m-%d %H:%M:%S"), client_ref.application, &roomname) + }else if client_ref.is_master{ //we need to change the master! + let mut room_ref = rooms_ref.get(&roomname).unwrap().borrow_mut(); + for (_k,v) in room_ref.clients.iter() { + if *_k != client_ref.id { + new_master_id = v.borrow().id; + break; + } + } + + println!("{}: {}: Changing master to {}",Local::now().format("%Y-%m-%d %H:%M:%S"),client_ref.application, new_master_id); + + for (_k,v) in room_ref.clients.iter() { + let mut c = v.borrow_mut(); + send_client_master_message(&mut *c, new_master_id); + } + room_ref.master_client = room_ref.clients.get(&new_master_id).unwrap().clone(); + } + } + let mut client_ref = client.borrow_mut(); + client_ref.roomname = String::from(""); +} + +fn send_you_left_message(client_ref: &mut Client, room: &str){ + println!("Sending ou left message {}", room); + let mut write_buf = vec![]; + write_buf.push(ToClientTCPMessageType::YouLeft as u8); + write_buf.push(room.as_bytes().len() as u8); + write_buf.extend_from_slice(room.as_bytes()); + client_ref.message_queue.extend_from_slice(&write_buf); + client_ref.notify.notify(); +} +fn send_client_left_message(to: &Rc>, from: u32, room: &str){ + println!("Sending client left message {} {}",from,room); + let mut write_buf = vec![]; + write_buf.push(ToClientTCPMessageType::PlayerLeft as u8); + write_buf.extend_from_slice(&(from).to_be_bytes()); //send everyone that the client id left the room + write_buf.push(room.as_bytes().len() as u8); + write_buf.extend_from_slice(room.as_bytes()); + let mut client_ref = to.borrow_mut(); + client_ref.message_queue.extend_from_slice(&write_buf); + client_ref.notify.notify(); +} + +fn send_client_join_message(to: &Rc>, from: u32, room: &str){ + println!("Sending client joined message {} {}",from,room); + //2u8, person_id_u32, room_name_len_u8, room_name_bytes + let mut write_buf = vec![]; + write_buf.push(ToClientTCPMessageType::PlayerJoined as u8); + write_buf.extend_from_slice(&(from).to_be_bytes()); //send everyone that the client id joined the room + write_buf.push(room.as_bytes().len() as u8); + write_buf.extend_from_slice(room.as_bytes()); + let mut client_ref = to.borrow_mut(); + client_ref.message_queue.extend_from_slice(&write_buf); + client_ref.notify.notify(); + +} + +fn send_you_joined_message(client_ref: &mut Client, in_room: Vec, room: &str){ + //you_joined_u8, ids_len_u32, id_list_array_u32, room_name_len_u8, room_name_bytes + println!("Sending you joined message {}",room); + let mut write_buf = vec![]; + write_buf.push(ToClientTCPMessageType::YouJoined as u8); + write_buf.extend_from_slice(&(in_room.len() as u32).to_be_bytes()); + for id in in_room { + write_buf.extend_from_slice(&(id).to_be_bytes()); + } + write_buf.push(room.as_bytes().len() as u8); + write_buf.extend_from_slice(room.as_bytes()); + client_ref.message_queue.extend_from_slice(&write_buf); + client_ref.notify.notify(); +} + +fn send_client_master_message(client_ref: &mut Client, master_id: u32){ + println!("Sending client master message {}",master_id); + //2u8, person_id_u32, room_name_len_u8, room_name_bytes + let mut write_buf = vec![]; + write_buf.push(ToClientTCPMessageType::MasterMessage as u8); + write_buf.extend_from_slice(&(master_id).to_be_bytes()); //send everyone that the client id joined the room + client_ref.message_queue.extend_from_slice(&write_buf); + client_ref.notify.notify(); + +} + +fn send_room_message(sender: Rc>, message: &Vec, rooms: Rc>>>>, include_sender: bool, ordered: bool){ + //this message is 3u8, sender_id_u32, message_len_u32, message_bytes + + let mut write_buf = vec![]; + + + let mut sender_ref = sender.borrow_mut(); + write_buf.push(ToClientTCPMessageType::DataMessage as u8); + write_buf.extend_from_slice(&sender_ref.id.to_be_bytes()); + write_buf.extend_from_slice(&(message.len() as u32).to_be_bytes()); + write_buf.extend_from_slice(message); + + + if sender_ref.roomname=="" { + return; } - println!("{}: UDP Thread Ended",Local::now().format("%Y-%m-%d %H:%M:%S")); - + if include_sender { + sender_ref.message_queue.extend_from_slice(&write_buf); + sender_ref.notify.notify(); + } + + let rooms_ref = rooms.borrow(); + let room_ref = rooms_ref[&sender_ref.roomname].borrow(); + + for (_k,v) in room_ref.clients.iter(){ + if !include_sender && *_k == sender_ref.id { + continue; + } + + let mut temp_mut = v.borrow_mut(); + temp_mut.message_queue.extend_from_slice(&write_buf); + temp_mut.notify.notify(); + + } + + } +fn send_group_message(sender: Rc>, message: &Vec, group: &String){ + let mut write_buf = vec![]; + let sender_ref = sender.borrow(); + write_buf.push(ToClientTCPMessageType::DataMessage as u8); + write_buf.extend_from_slice(&sender_ref.id.to_be_bytes()); + write_buf.extend_from_slice(&(message.len() as u32).to_be_bytes()); + write_buf.extend_from_slice(message); -fn main() { - println!("{}: VelNet Server Starting",Local::now().format("%Y-%m-%d %H:%M:%S")); - - //read the config file - let foo = fs::read_to_string("config.txt").unwrap(); - let config: Config = serde_json::from_str(&foo).unwrap(); - println!("{}",config.port); + //get the list of client ids for this group + let groups = &sender_ref.groups; + if groups.contains_key(group) { + let group = groups.get(group).unwrap(); + for c in group { + let mut temp_mut = c.borrow_mut(); + temp_mut.message_queue.extend_from_slice(&write_buf); + temp_mut.notify.notify(); + } + } - let clients: HashMap> = HashMap::new(); - let rooms: HashMap> = HashMap::new(); - let client_mutex = Arc::new(RwLock::new(clients)); - let room_mutex = Arc::new(RwLock::new(rooms)); - - //start the UDP thread - let udp_clients = Arc::clone(&client_mutex); - let udp_rooms = Arc::clone(&room_mutex); - let udp_handle = thread::spawn(move ||{udp_listen(udp_clients, udp_rooms, config.port);}); - //start the TCP thread - tcp_listen(client_mutex, room_mutex,config.port,config.tcp_timeout,config.tcp_send_buffer); - udp_handle.join().unwrap(); - println!("{}: VelNet Ended", Local::now().format("%Y-%m-%d %H:%M:%S")); } + +async fn read_u8(stream: &mut TcpStream) -> u8 { + let mut buf = [0; 1]; + stream.read_exact(&mut buf).await.unwrap(); + return buf[0]; +} +async fn read_u32(stream: &mut TcpStream) -> u32 { + let mut buf:[u8;4] = [0; 4]; + stream.read_exact(&mut buf).await.unwrap(); + let size = u32::from_be_bytes(buf); + return size; +} +async fn _read_string(stream: &mut TcpStream) -> String { + let size = read_u32(stream).await; + let mut string_bytes = vec![0;size as usize]; + stream.read_exact(&mut string_bytes).await.unwrap(); + return String::from_utf8(string_bytes).unwrap(); +} +async fn read_short_string(stream: &mut TcpStream) -> String { + let size = read_u8(stream).await; + let mut string_bytes = vec![0;size as usize]; + stream.read_exact(&mut string_bytes).await.unwrap(); + return String::from_utf8(string_bytes).unwrap(); +} + +async fn read_vec(stream: &mut TcpStream) -> Vec { + let message_size = read_u32(stream).await; + let mut message = vec![0u8;message_size as usize]; + stream.read_exact(&mut message).await.unwrap(); + return message; +} + diff --git a/src/main_save.rs b/src/main_save.rs new file mode 100644 index 0000000..d6859c9 --- /dev/null +++ b/src/main_save.rs @@ -0,0 +1,846 @@ +extern crate chrono; +extern crate serde; +extern crate serde_json; +use std::io::prelude::*; +use std::thread; +use std::net::{TcpListener, TcpStream,UdpSocket,IpAddr,SocketAddr}; +use std::collections::HashMap; +use std::sync::{Arc,RwLock}; +use std::sync::mpsc; +use std::sync::mpsc::{SyncSender,Receiver}; +use chrono::Local; +use std::fs; +use std::time; +use serde::{Serialize, Deserialize}; +enum ToClientTCPMessageType { + LoggedIn = 0, + RoomList = 1, + PlayerJoined = 2, + DataMessage = 3, + MasterMessage = 4, + YouJoined = 5, + PlayerLeft = 6, + YouLeft = 7, + RoomData = 8 +} + +enum FromClientTCPMessageType { + LogIn = 0, + GetRooms = 1, + JoinRoom = 2, + SendMessageOthersUnbuffered = 3, + SendMessageAllUnbuffered = 4, + SendMessageGroupUnbuffered = 5, + CreateGroup = 6, + SendMessageOthersBuffered = 7, + SendMessageAllBuffered = 8, + GetRoomData =9 +} + +enum ToClientUDPMessageType { + Connected = 0, + DataMessage = ToClientTCPMessageType::DataMessage as isize +} +enum FromClientUDPMessageType { + Connect = 0, + SendMesssageOthersUnbuffered = FromClientTCPMessageType::SendMessageOthersUnbuffered as isize, + SendMessageAllUnbuffered = FromClientTCPMessageType::SendMessageAllUnbuffered as isize, + SendMessageGroupUnbuffered = FromClientTCPMessageType::SendMessageGroupUnbuffered as isize +} +struct Client { + logged_in: Arc>, + id: u32, + username: Arc>, + application: Arc>, + room: Arc>>>, + sender: SyncSender>, + rooms_mutex: Arc>>>, + clients_mutex: Arc>>>, + groups: Arc>>>>, + ip: Arc>, + port: Arc> +} + +struct Room { + name: String, + clients: RwLock>>, + master_client: Arc>> +} +#[derive(Serialize, Deserialize)] +struct Config { + port: u16, + tcp_timeout: u64, + tcp_send_buffer: usize +} + + +fn read_u8(stream: &mut TcpStream) -> u8 { + let mut buf = [0; 1]; + stream.read_exact(&mut buf).unwrap(); + return buf[0]; +} +fn read_u32(stream: &mut TcpStream) -> u32 { + let mut buf:[u8;4] = [0; 4]; + stream.read_exact(&mut buf).unwrap(); + let size = u32::from_be_bytes(buf); + return size; +} +fn _read_string(stream: &mut TcpStream) -> String { + let size = read_u32(stream); + let mut string_bytes = vec![0;size as usize]; + stream.read_exact(&mut string_bytes).unwrap(); + return String::from_utf8(string_bytes).unwrap(); +} +fn read_short_string(stream: &mut TcpStream) -> String { + let size = read_u8(stream); + let mut string_bytes = vec![0;size as usize]; + stream.read_exact(&mut string_bytes).unwrap(); + return String::from_utf8(string_bytes).unwrap(); +} + +fn read_vec(stream: &mut TcpStream) -> Vec { + let message_size = read_u32(stream); + let mut message = vec![0u8;message_size as usize]; + stream.read_exact(&mut message).unwrap(); + return message; +} + +//this is in response to someone asking to login (this is where usernames and passwords would be processed, in theory) +fn read_login_message(stream: &mut TcpStream, client: &Arc) { + //byte,shortstring,byte,shortstring + + let username = read_short_string(stream); + let application = read_short_string(stream); + + println!("{}: Got application {} and userid {}",Local::now().format("%Y-%m-%d %H:%M:%S"),application,username); + let mut client_user = client.username.write().unwrap(); + *client_user = username; + let mut client_application = client.application.write().unwrap(); + *client_application = application; + let mut client_loggedin = client.logged_in.write().unwrap(); + *client_loggedin = true; + let mut write_buf = vec![]; + write_buf.push(ToClientTCPMessageType::LoggedIn as u8); + write_buf.extend_from_slice(&(client.id).to_be_bytes()); //send the client the id + + client.sender.try_send(write_buf).unwrap(); +} + +//this is in response to a request for rooms. +fn read_rooms_message(_stream: &mut TcpStream, client: &Arc){ + + let mut write_buf = vec![]; + write_buf.push(ToClientTCPMessageType::RoomList as u8); + //first we need to get the room names + + let rooms = client.rooms_mutex.read().unwrap(); + let mut rooms_vec = vec![]; + for (k,v) in rooms.iter() { + let app_name = client.application.read().unwrap(); + + if !k.starts_with(&app_name.to_string()) { + continue; + } + let clients = v.clients.read().unwrap(); + let mut iter = k.chars(); + + iter.by_ref().nth(app_name.len()); + let application_stripped_room = iter.as_str(); + + let room_string = format!("{}:{}",application_stripped_room,clients.len()); + rooms_vec.push(room_string); + } + + let rooms_message = rooms_vec.join(","); + let message_bytes = rooms_message.as_bytes(); + let message_len = message_bytes.len() as u32; + write_buf.extend_from_slice(&(message_len).to_be_bytes()); + write_buf.extend_from_slice(message_bytes); + client.sender.try_send(write_buf).unwrap(); + +} + +fn read_roomdata_message(stream: &mut TcpStream, client: &Arc){ + //type, room_name + //will respond with type, numclients u32, id1 u32, name_len u8, name_bytes ... + + //read the room name and append the client application + + let short_room_name = read_short_string(stream); + let application = client.application.read().unwrap().to_string(); + let room_name = format!("{}_{}", application, short_room_name); + + //we need to access the rooms list + let rooms = client.rooms_mutex.read().unwrap(); + if rooms.contains_key(&room_name) { + + let room = rooms.get(&room_name).unwrap(); + //form and send the message + let mut write_buf = vec![]; + write_buf.push(ToClientTCPMessageType::RoomData as u8); + + + let roomname_bytes = short_room_name.as_bytes(); + write_buf.push(roomname_bytes.len() as u8); + write_buf.extend_from_slice(&roomname_bytes); + + let clients = room.clients.read().unwrap(); + write_buf.extend_from_slice(&(clients.len() as u32).to_be_bytes()); + for (_k,c) in clients.iter() { + //write out the client id (u32) and the username (short string) + write_buf.extend_from_slice(&(c.id).to_be_bytes()); + let username = c.username.read().unwrap(); + let username_bytes = username.as_bytes(); + write_buf.push(username_bytes.len() as u8); + write_buf.extend_from_slice(&username_bytes); + } + client.sender.try_send(write_buf).unwrap(); + } + +} + + +fn send_client_master_message(to: &Arc, master_id: u32){ + //2u8, person_id_u32, room_name_len_u8, room_name_bytes + let mut write_buf = vec![]; + write_buf.push(ToClientTCPMessageType::MasterMessage as u8); + write_buf.extend_from_slice(&(master_id).to_be_bytes()); //send everyone that the client id joined the room + let res = to.sender.try_send(write_buf); + match res { + Ok(_) => (), + Err(_) => () + } + +} + +fn send_client_join_message(to: &Arc, from: u32, room: &str){ + //2u8, person_id_u32, room_name_len_u8, room_name_bytes + let mut write_buf = vec![]; + write_buf.push(ToClientTCPMessageType::PlayerJoined as u8); + write_buf.extend_from_slice(&(from).to_be_bytes()); //send everyone that the client id joined the room + write_buf.push(room.as_bytes().len() as u8); + write_buf.extend_from_slice(room.as_bytes()); + let res = to.sender.try_send(write_buf); + match res { + Ok(_) => (), + Err(_) => () + } +} + +fn send_you_joined_message(to: &Arc, in_room: Vec, room: &str){ + //you_joined_u8, ids_len_u32, id_list_array_u32, room_name_len_u8, room_name_bytes + let mut write_buf = vec![]; + write_buf.push(ToClientTCPMessageType::YouJoined as u8); + write_buf.extend_from_slice(&(in_room.len() as u32).to_be_bytes()); + for id in in_room { + write_buf.extend_from_slice(&(id).to_be_bytes()); + } + write_buf.push(room.as_bytes().len() as u8); + write_buf.extend_from_slice(room.as_bytes()); + let res = to.sender.try_send(write_buf); + match res { + Ok(_) => (), + Err(_) => () + } +} + +fn send_you_left_message(to: &Arc, room: &str){ + let mut write_buf = vec![]; + write_buf.push(ToClientTCPMessageType::YouLeft as u8); + write_buf.push(room.as_bytes().len() as u8); + write_buf.extend_from_slice(room.as_bytes()); + let res = to.sender.try_send(write_buf); + match res { + Ok(_) => (), + Err(_) => () + } +} +fn send_client_left_message(to: &Arc, from: u32, room: &str){ + let mut write_buf = vec![]; + write_buf.push(ToClientTCPMessageType::PlayerLeft as u8); + write_buf.extend_from_slice(&(from).to_be_bytes()); //send everyone that the client id left the room + write_buf.push(room.as_bytes().len() as u8); + write_buf.extend_from_slice(room.as_bytes()); + let res = to.sender.try_send(write_buf); + match res { + Ok(_) => (), + Err(_) => () + } +} + + +//helper function, because clients leave room in multiple places +fn client_leave_room(client: &Arc, send_to_client: bool){ + //first remove the client from the room they are in + + { + let room = client.room.read().unwrap(); //I need to get the room, because I'll be modifying the clients in it + if room.is_some(){ + { + let mut change_master = false; + let mut new_master_id = 0; + { + println!("{}: {}: Client {} in room, leaving",Local::now().format("%Y-%m-%d %H:%M:%S"), client.application.read().unwrap().to_string(),client.id); + } + let room = room.as_ref().unwrap(); + + //may have to choose a new master + + { + + let clients = room.clients.read().unwrap(); + let master_client = room.master_client.read().unwrap(); + if master_client.id == client.id { + //change the master + change_master = true; + + } + + for (_k,v) in clients.iter() { + if !send_to_client && v.id == client.id{ + continue; + }else if v.id == client.id { + send_you_left_message(v, &room.name); + }else{ + send_client_left_message(v, client.id, &room.name); + } + } + } + + { + let mut clients = room.clients.write().unwrap(); + clients.remove(&client.id); //remove the client from that list in the room + } + + let clients = room.clients.read().unwrap(); + + //if the room is empty, destroy it as well + + if clients.len() == 0 { + let mut rooms = client.rooms_mutex.write().unwrap(); + rooms.remove(&room.name); + { + println!("{}: {}: Destroyed room {}",Local::now().format("%Y-%m-%d %H:%M:%S"), client.application.read().unwrap().to_string(), &room.name) + } + }else if change_master{ + + for (_k,v) in clients.iter() { + if v.id != client.id { + new_master_id = v.id; + break; + } + } + + { + println!("{}: {}: Changing master to {}",Local::now().format("%Y-%m-%d %H:%M:%S"),client.application.read().unwrap().to_string(), new_master_id); + } + for (_k,v) in clients.iter() { + send_client_master_message(&v, new_master_id); + } + { + let mut master_client = room.master_client.write().unwrap(); + *master_client = clients.get(&new_master_id).unwrap().clone(); + } + + } + } + } + } + { + let mut room = client.room.write().unwrap(); + *room = Option::None; + } + + +} + +fn read_join_message(stream: &mut TcpStream, client: &Arc){ + //byte,shortstring + let short_room_name = read_short_string(stream); + let application = client.application.read().unwrap().to_string(); + let extended_room_name = format!("{}_{}", application, short_room_name); + + + //if the client is in a room, leave it + let mut leave_room = false; + { + let room = client.room.read().unwrap(); //must release this mutex before calling into a function that uses it + if room.as_ref().is_some(){ + leave_room = true; + } + } + if leave_room { + client_leave_room(client, true); + } + + if short_room_name.trim() == "" || short_room_name == "-1" { + return; + } + + + //join room_name + { + { + let mut rooms = client.rooms_mutex.write().unwrap(); + if !rooms.contains_key(&extended_room_name) { //new room, must create it + let map: HashMap> = HashMap::new(); + let r = Arc::new(Room { + name: extended_room_name.to_string(), + clients: RwLock::new(map), + master_client: Arc::new(RwLock::new(client.clone())) //client is the master, since they joined first + }); + + rooms.insert(String::from(&extended_room_name),r); + println!("{}: {}: New room {} created",Local::now().format("%Y-%m-%d %H:%M:%S"), application,&extended_room_name); + } + //the room is guaranteed to exist now, so this call can't fail + let room_to_join = &rooms[&extended_room_name]; + let mut clients = room_to_join.clients.write().unwrap(); + clients.insert(client.id,client.clone()); + println!("{}: {}: Client {} joined {}",Local::now().format("%Y-%m-%d %H:%M:%S"), application, client.id,&extended_room_name); + let mut room = client.room.write().unwrap(); + *room = Some(room_to_join.clone()); //we create an option and assign it back to the room + } + + //once the client is in the room, it can't suddenly die, so we can release the write lock, and can use the extended_room name without issue + + { + let rooms = client.rooms_mutex.read().unwrap(); + let clients = rooms[&extended_room_name].clients.read().unwrap(); //only need a read lock now + //send a join message to everyone in the room (except the client) + for (_k,v) in clients.iter() { + if v.id != client.id { + send_client_join_message(v, client.id, &short_room_name); + } + } + + //send a join message to the client that has all of the ids in the room + let mut ids_in_room = vec![]; + for (_k,v) in clients.iter() { + ids_in_room.push(v.id); + + } + send_you_joined_message(client, ids_in_room, &short_room_name); + } + + + let room = client.room.read().unwrap(); + //tell the client who the master is + let master_client = room.as_ref().unwrap().master_client.read().unwrap(); + send_client_master_message(client, master_client.id); + + } + +} + +// function send_message_to_clients_dictionary(clients: message: &Vec, include_sender: bool){ + + + + +// } + +fn send_room_message(sender: &Arc, message: &Vec, include_sender: bool, ordered: bool){ + //this message is 3u8, sender_id_u32, message_len_u32, message_bytes + let mut write_buf = vec![]; + write_buf.push(ToClientTCPMessageType::DataMessage as u8); + write_buf.extend_from_slice(&sender.id.to_be_bytes()); + write_buf.extend_from_slice(&(message.len() as u32).to_be_bytes()); + write_buf.extend_from_slice(message); + //println!("sending {} bytes from {}",message.len(),sender.id); + { + + if !ordered { + let room = sender.room.read().unwrap(); + + if room.is_some() { + + let clients = room.as_ref().unwrap().clients.read().unwrap(); + for (_k,v) in clients.iter(){ + if !include_sender && v.id == sender.id { + continue; + } + match v.sender.try_send(write_buf.clone()){ + Ok(_) => (), + Err(x) => println!("{}: {}: Error sending to client {}: {}",Local::now().format("%Y-%m-%d %H:%M:%S"), v.application.read().unwrap().to_string(),v.id,x) + } //this sometimes fails. + + } + } + }else{ //I'm bad at rust, so I don't know how else to do this other than repeat the code above because the types are so different + let room = sender.room.write().unwrap(); + + if room.is_some() { + + let clients = room.as_ref().unwrap().clients.read().unwrap(); + for (_k,v) in clients.iter(){ + if !include_sender && v.id == sender.id { + continue; + } + match v.sender.try_send(write_buf.clone()){ + Ok(_) => (), + Err(x) => println!("{}: {}: Error sending to client {}: {}",Local::now().format("%Y-%m-%d %H:%M:%S"),v.application.read().unwrap().to_string(),v.id,x) + } //this sometimes fails. + + } + } + } + } +} +fn send_group_message(sender: &Arc, message: &Vec, group: &String){ + let mut write_buf = vec![]; + write_buf.push(ToClientTCPMessageType::DataMessage as u8); + write_buf.extend_from_slice(&sender.id.to_be_bytes()); + write_buf.extend_from_slice(&(message.len() as u32).to_be_bytes()); + write_buf.extend_from_slice(message); + + //get the list of client ids for this group + let groups = sender.groups.read().unwrap(); + if groups.contains_key(group) { + let group = groups.get(group).unwrap(); + for c in group { + //there may be a leftover when a client leaves...will fix itself + + match c.sender.try_send(write_buf.clone()) { + Ok(_) => (), + Err(_) => () + } + } + } + +} +fn read_send_message(stream: &mut TcpStream, client: &Arc, message_type: u8){ + //4 byte length, array + //this is a message for everyone in the room (maybe) + let to_send = read_vec(stream); + if message_type == FromClientTCPMessageType::SendMessageOthersUnbuffered as u8 { + send_room_message(client,&to_send,false,false); + }else if message_type == FromClientTCPMessageType::SendMessageAllUnbuffered as u8 { + send_room_message(client,&to_send,true,false); + } else if message_type == FromClientTCPMessageType::SendMessageOthersBuffered as u8 { //ordered + send_room_message(client,&to_send,false,true); + }else if message_type == FromClientTCPMessageType::SendMessageAllBuffered as u8 { //ordered + send_room_message(client,&to_send,true,true); + }else if message_type == FromClientTCPMessageType::SendMessageGroupUnbuffered as u8 { + let group = read_short_string(stream); + send_group_message(client,&to_send, &group); + } +} + +fn read_group_message(stream: &mut TcpStream, client: &Arc){ + + let group = read_short_string(stream); + let id_bytes = read_vec(stream); + let num = id_bytes.len(); + + let mut groups = client.groups.write().unwrap(); + let clients = client.clients_mutex.read().unwrap(); + let mut group_clients = vec![]; + let mut i = 0; + loop { + if i >= num { + break; + } + let mut slice = [0u8;4]; + slice[0] = id_bytes[i]; + slice[1] = id_bytes[i+1]; + slice[2] = id_bytes[i+2]; + slice[3] = id_bytes[i+3]; //probably a better way to do this + let id = u32::from_be_bytes(slice); + + + match clients.get(&id) { + Some(client) => {group_clients.push(client.clone());}, + None => () //not there, so don't add it + } + + i = i + 4; + } + + //delete the group if it exists + if groups.contains_key(&group) { + groups.remove(&group); //ensures the client references go away + } + + groups.insert(group.clone(),group_clients); + +} + +fn client_read_thread(mut stream: TcpStream, mut client: Arc) { + let mut read_buf:[u8;1] = [0; 1]; + //messages come through as a 1 byte type identifier, that can be one of 0 (login) 1 (get rooms), 2 (join/leave room) 3 (send message to room), 4 (send message to room including me), 5 (send message to group), 6 (establish group) + loop { + + //read exactly 1 byte + stream.read_exact(&mut read_buf).unwrap(); + //println!("Got a message {}",read_buf[0]); + let t = read_buf[0]; + if t == FromClientTCPMessageType::LogIn as u8 { //[0:u8][username.length():u8][username:shortstring][password.length():u8][password:shortstring] + read_login_message(&mut stream, &mut client); + } else if t == FromClientTCPMessageType::GetRooms as u8 {//[1:u8] + read_rooms_message(&mut stream, &mut client); + } else if t == FromClientTCPMessageType::GetRoomData as u8 { + read_roomdata_message(&mut stream, &mut client); + } else if t == FromClientTCPMessageType::JoinRoom as u8 {//[2:u8][roomname.length():u8][roomname:shortstring] + read_join_message(&mut stream, &mut client); + } else if t == FromClientTCPMessageType::SendMessageOthersUnbuffered as u8 || + t == FromClientTCPMessageType::SendMessageAllUnbuffered as u8 || + t == FromClientTCPMessageType::SendMessageGroupUnbuffered as u8 || + t == FromClientTCPMessageType::SendMessageOthersBuffered as u8 || + t == FromClientTCPMessageType::SendMessageAllBuffered as u8 { //others,all,group[t:u8][message.length():i32][message:u8array] + read_send_message(&mut stream, &client, t); + } else if t == FromClientTCPMessageType::CreateGroup as u8 { //[t:u8][list.lengthbytes:i32][clients:i32array] + read_group_message(&mut stream, &client); + } else { + //die...not correct protocol + println!("Incorrect protocol, killing"); + return; + } + + std::io::stdout().flush().unwrap(); + } +} + +fn client_write_thread(mut stream: TcpStream, rx: Receiver> ) { + + //wait on messages in my queue + loop { + let m = rx.recv().unwrap(); + //println!("Sending a message {}",m.len()); + if m.len() == 1{ + break; + } + stream.write(&m).unwrap(); + + } +} + + +fn handle_client(stream: TcpStream, client_id: u32, clients_mutex: Arc>>>, rooms_mutex: Arc>>>,tcp_timeout: u64,tcp_send_buffer:usize){ + + stream.set_nodelay(true).unwrap(); + stream.set_read_timeout(Some(time::Duration::new(tcp_timeout,0))).unwrap(); + stream.set_write_timeout(Some(time::Duration::new(tcp_timeout,0))).unwrap(); + println!("{}: Accepted new connection, assigned client id {}",Local::now().format("%Y-%m-%d %H:%M:%S"),client_id); + + let (tx, rx) = mpsc::sync_channel(tcp_send_buffer); //the server is very fast. However, if the clients throttle the sending, this buffer may overflow, and when it does, data is lost + //create a new client structure and add it to the list of clients + let client = Arc::new(Client{ + id: client_id, + username: Arc::new(RwLock::new(String::from(""))), + logged_in: Arc::new(RwLock::new(false)), + room: Arc::new(RwLock::new(Option::None)), + application: Arc::new(RwLock::new(String::from(""))), + sender: tx, + rooms_mutex: rooms_mutex.clone(), + clients_mutex: clients_mutex.clone(), + groups: Arc::new(RwLock::new(HashMap::new())), + ip: Arc::new(RwLock::new(stream.peer_addr().unwrap().ip())), + port: Arc::new(RwLock::new(0)) + }); + + { + let mut clients = clients_mutex.write().unwrap(); + clients.insert(client_id, client.clone()); + } + + + let read_clone = stream.try_clone().expect("clone failed"); + let read_client = client.clone(); + let read_handle = thread::spawn(move ||{client_read_thread(read_clone, read_client)}); + let write_handle = thread::spawn(move ||{client_write_thread(stream, rx)}); + + //handle writing to the thread as needed + + match read_handle.join(){ + Ok(_)=>(), + Err(_)=>() + } + + match client.sender.try_send(vec![0]){ //force send thread to exit + Ok(_)=>(), + Err(_)=>() + } + + match write_handle.join() { + Ok(_)=>(), + Err(_)=>() + } + println!("{}: {}: Client {} left",Local::now().format("%Y-%m-%d %H:%M:%S"),client.application.read().unwrap().to_string(),client_id); + //now we can kill the client. + { + //make sure we remove the client from all rooms + client_leave_room(&client, false); + let mut clients = clients_mutex.write().unwrap(); + clients.remove(&client_id); + } + +} + +fn tcp_listen(client_mutex: Arc>>>, room_mutex: Arc>>>,port:u16,tcp_timeout:u64,tcp_send_buffer:usize){ + println!("{}: Started TCP Listener",Local::now().format("%Y-%m-%d %H:%M:%S")); + let listener = TcpListener::bind(format!("0.0.0.0:{}",port)).expect("could not bind port"); + + let mut next_client_id = 0; + // accept connections and process them serially + for stream in listener.incoming() { + let client_mutex = Arc::clone(&client_mutex); + let room_mutex = Arc::clone(&room_mutex); + thread::spawn(move || {handle_client(stream.unwrap(), next_client_id, client_mutex, room_mutex,tcp_timeout,tcp_send_buffer)}); + next_client_id+=1; + } + println!("{}: Ended TCP Listener",Local::now().format("%Y-%m-%d %H:%M:%S")); +} + +fn udp_listen(client_mutex: Arc>>>, _room_mutex: Arc>>>,port:u16){ + let mut buf = [0u8;1024]; + let s = UdpSocket::bind(format!("0.0.0.0:{}",port)).unwrap(); + println!("{}: UDP Thread Started",Local::now().format("%Y-%m-%d %H:%M:%S")); + loop { + let res = s.recv_from(&mut buf); + match res { + Ok(_) => (), + Err(_) => continue + } + let (packet_size,addr) = res.unwrap(); + let t = buf[0]; + if packet_size >= 5{ + //get the client id, which has to be sent with every udp message, because you don't know where udp messages are coming from + let client_id_bytes = [buf[1],buf[2],buf[3],buf[4]]; + let client_id = u32::from_be_bytes(client_id_bytes); + + + + if t == FromClientUDPMessageType::Connect as u8 { //1 byte, 0. Nothing else. This is just to establish the udp port, Echos back the same thing sent + //connect message, respond back + { + let clients = client_mutex.read().unwrap(); + if clients.contains_key(&client_id){ + let client = clients.get(&client_id).unwrap(); + let mut port = client.port.write().unwrap(); + *port = addr.port(); //set the udp port to send data to + buf[0] = ToClientUDPMessageType::Connected as u8; + match s.send_to(&buf,addr) { + Ok(_)=>(), + Err(_)=>() + } + } + } + + + } else if t == FromClientUDPMessageType::SendMesssageOthersUnbuffered as u8 { //[3:u8][from:i32][contents:u8array] note that it must fit into the packet of 1024 bytes + { + let clients = client_mutex.read().unwrap(); + if clients.contains_key(&client_id){ + let client = clients.get(&client_id).unwrap(); + let room_option = client.room.read().unwrap(); + let room = room_option.as_ref().unwrap(); + let room_clients = room.clients.read().unwrap(); //we finally got to the room! + buf[0] = ToClientUDPMessageType::DataMessage as u8; //technically unecessary, unless we change this number + for (_k,v) in room_clients.iter() { + if v.id != client_id{ + let ip = v.ip.read().unwrap(); + let port = v.port.read().unwrap(); + match s.send_to(&buf,SocketAddr::new(*ip, *port)) { + Ok(_)=> (), + Err(_) => () + } + } + } + } + } + + } else if t == FromClientUDPMessageType::SendMessageAllUnbuffered as u8 { //see above + { + let clients = client_mutex.read().unwrap(); + if clients.contains_key(&client_id){ + let client = clients.get(&client_id).unwrap(); + let room_option = client.room.read().unwrap(); + let room = room_option.as_ref().unwrap(); + let room_clients = room.clients.read().unwrap(); //we finally got to the room! + buf[0] = ToClientUDPMessageType::DataMessage as u8; //messages are always 3s, even though this came in as 4 + for (_k,v) in room_clients.iter() { + + let ip = v.ip.read().unwrap(); + let port = v.port.read().unwrap(); + match s.send_to(&buf,SocketAddr::new(*ip, *port)) { + Ok(_)=> (), + Err(_) => () + } + + } + } + } + } else if t == FromClientUDPMessageType::SendMessageGroupUnbuffered as u8 { //[5:byte][from:i32][group.length():u8][message:u8array] + //this one is a little different, because we don't send the group in the message, so we have to formulate another message (like a 3 message) + //send a message to a group + //read the group name + + let group_name_size = buf[5]; + let message_vec = buf[6..packet_size].to_vec(); + let (group_name_bytes, message_bytes) = message_vec.split_at(group_name_size as usize); + let group_name = String::from_utf8(group_name_bytes.to_vec()).unwrap(); + let clients = client_mutex.read().unwrap(); + if clients.contains_key(&client_id){ + let client = clients.get(&client_id).unwrap(); + let groups = client.groups.read().unwrap(); + if groups.contains_key(&group_name) { + + + let clients = groups.get(&group_name).unwrap(); + + + + //we need to form a new message without the group name + let mut message_to_send = vec![]; + message_to_send.push(ToClientUDPMessageType::DataMessage as u8); + message_to_send.extend([buf[1],buf[2],buf[3],buf[4]]); + message_to_send.extend(message_bytes); + + for v in clients.iter() { + + let ip = v.ip.read().unwrap(); + let port = v.port.read().unwrap(); + + match s.send_to(&message_to_send,SocketAddr::new(*ip, *port)) { + Ok(_)=> (), + Err(_) => () + } + } + } + } + } + + + } + + } + + println!("{}: UDP Thread Ended",Local::now().format("%Y-%m-%d %H:%M:%S")); + + + +} + +fn main_save() { + println!("{}: VelNet Server Starting",Local::now().format("%Y-%m-%d %H:%M:%S")); + + //read the config file + let foo = fs::read_to_string("config.txt").unwrap(); + let config: Config = serde_json::from_str(&foo).unwrap(); + println!("{}",config.port); + + let clients: HashMap> = HashMap::new(); + let rooms: HashMap> = HashMap::new(); + let client_mutex = Arc::new(RwLock::new(clients)); + let room_mutex = Arc::new(RwLock::new(rooms)); + + //start the UDP thread + let udp_clients = Arc::clone(&client_mutex); + let udp_rooms = Arc::clone(&room_mutex); + let udp_handle = thread::spawn(move ||{udp_listen(udp_clients, udp_rooms, config.port);}); + //start the TCP thread + tcp_listen(client_mutex, room_mutex,config.port,config.tcp_timeout,config.tcp_send_buffer); + udp_handle.join().unwrap(); + println!("{}: VelNet Ended", Local::now().format("%Y-%m-%d %H:%M:%S")); +}