From d5a5dc33a95b413b3d7051dc5e3974aa120115eb Mon Sep 17 00:00:00 2001 From: PoliEcho Date: Fri, 1 Aug 2025 14:06:47 +0200 Subject: [PATCH] fix some async issues --- Cargo.lock | 90 +++++++++++++++++++++++++++++ Cargo.toml | 1 + p.patch | 137 ++++++++++++++++++++++++++++++++++++++++++++ src/server/main.rs | 13 ++--- src/server/net.rs | 64 ++++++++------------- src/server/utils.rs | 2 +- 6 files changed, 258 insertions(+), 49 deletions(-) create mode 100644 p.patch diff --git a/Cargo.lock b/Cargo.lock index cd6911b..668c8ad 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -468,12 +468,48 @@ version = "2.3.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "37909eebbb50d72f9059c3b6d82c0463f2ff062c9e95845c43a6c9c0355411be" +[[package]] +name = "futures" +version = "0.3.31" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "65bc07b1a8bc7c85c5f2e110c476c7389b4554ba72af57d8445ea63a576b0876" +dependencies = [ + "futures-channel", + "futures-core", + "futures-executor", + "futures-io", + "futures-sink", + "futures-task", + "futures-util", +] + +[[package]] +name = "futures-channel" +version = "0.3.31" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "2dff15bf788c671c1934e366d07e30c1814a8ef514e1af724a602e8a2fbe1b10" +dependencies = [ + "futures-core", + "futures-sink", +] + [[package]] name = "futures-core" version = "0.3.31" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "05f29059c0c2090612e8d742178b0580d2dc940c837851ad723096f87af6663e" +[[package]] +name = "futures-executor" +version = "0.3.31" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1e28d1d997f585e54aebc3f97d39e72338912123a67330d723fdbb564d646c9f" +dependencies = [ + "futures-core", + "futures-task", + "futures-util", +] + [[package]] name = "futures-io" version = "0.3.31" @@ -493,6 +529,47 @@ dependencies = [ "pin-project-lite", ] +[[package]] +name = "futures-macro" +version = "0.3.31" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "162ee34ebcb7c64a8abebc059ce0fee27c2262618d7b60ed8faf72fef13c3650" +dependencies = [ + "proc-macro2", + "quote", + "syn", +] + +[[package]] +name = "futures-sink" +version = "0.3.31" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e575fab7d1e0dcb8d0c7bcf9a63ee213816ab51902e6d244a95819acacf1d4f7" + +[[package]] +name = "futures-task" +version = "0.3.31" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f90f7dce0722e95104fcb095585910c0977252f286e354b5e3bd38902cd99988" + +[[package]] +name = "futures-util" +version = "0.3.31" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9fa08315bb612088cc391249efdc3bc77536f16c91f6cf495e6fbe85b20a4a81" +dependencies = [ + "futures-channel", + "futures-core", + "futures-io", + "futures-macro", + "futures-sink", + "futures-task", + "memchr", + "pin-project-lite", + "pin-utils", + "slab", +] + [[package]] name = "generic-array" version = "0.14.7" @@ -604,6 +681,12 @@ version = "0.4.27" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "13dc2df351e3202783a1fe0d44375f7295ffb4049267b0f3018346dc122a1d94" +[[package]] +name = "memchr" +version = "2.7.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "32a282da65faaf38286cf3be983213fcf1d2e2a58700e808f83f4ea9a4804bc0" + [[package]] name = "num-traits" version = "0.2.19" @@ -748,6 +831,7 @@ dependencies = [ "cipher", "clap", "colored", + "futures", "hmac", "libc", "orx-concurrent-vec", @@ -767,6 +851,12 @@ version = "0.2.16" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "3b3cff922bd51709b605d9ead9aa71031d81447142d828eb4a6eba76fe619f9b" +[[package]] +name = "pin-utils" +version = "0.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8b870d8c151b6f2fb93e84a13146138f05d02ed11c7e7c54f8826aaaf7c9f184" + [[package]] name = "piper" version = "0.2.4" diff --git a/Cargo.toml b/Cargo.toml index 782f211..5df6b93 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -19,6 +19,7 @@ chrono = "0.4.41" cipher = { version = "0.4.4", features = ["block-padding", "alloc"] } clap = { version = "4.5.41", features = ["derive"] } colored = "3.0.0" +futures = "0.3.31" hmac = "0.12.1" orx-concurrent-vec = "3.6.0" pbkdf2 = "0.12.2" diff --git a/p.patch b/p.patch new file mode 100644 index 0000000..d1e7e5f --- /dev/null +++ b/p.patch @@ -0,0 +1,137 @@ +--- a/src/server/net.rs ++++ b/src/server/net.rs +@@ -59,19 +59,22 @@ pub async fn handle_request( + }; + } + +- let registration = match registration_vector ++ let registration_opt = registration_vector + .iter() +- .find(|elem| elem.map(|s| &s.net_id == &net_id)) // find if id exists +- { +- Some(registration) => registration, +- None => {match socket.send_to(&[ServerResponse::ID_DOESNT_EXIST as u8], src).await{ ++ .find(|elem| elem.map(|s| &s.net_id == &net_id)) ++ .cloned(); ++ ++ let registration = match registration_opt { ++ Some(registration) => registration, ++ None => { ++ match socket.send_to(&[ServerResponse::ID_DOESNT_EXIST as u8], src).await { + Ok(s) => { + #[cfg(debug_assertions)] + eprintln!("send {} bytes", s); + } + Err(e) => { + eprintln!("Error sending data: {}", e); + } +- }; ++ } + return; + }, + }; +@@ -162,19 +165,22 @@ pub async fn handle_request( + return; + }; + +- match registration_vector ++ let registration_exists = registration_vector + .iter() +- .find(|elem| elem.map(|s| &s.net_id == &net_id)) // find if id exists +- { +- Some(_) => { +- match socket.send_to(&[ServerResponse::ID_EXISTS as u8], src).await { ++ .find(|elem| elem.map(|s| &s.net_id == &net_id)) ++ .is_some(); ++ ++ if registration_exists { ++ match socket.send_to(&[ServerResponse::ID_EXISTS as u8], src).await { ++ Ok(s) => { ++ #[cfg(debug_assertions)] ++ eprintln!("send {} bytes", s); ++ } ++ Err(e) => { ++ eprintln!("Error sending data: {}", e); ++ } ++ } ++ return; +- Ok(s) => { +- #[cfg(debug_assertions)] +- eprintln!("send {} bytes", s); +- } +- Err(e) => { +- eprintln!("Error sending data: {}", e); +- } +- }; +- return; +- } +- None => {} + } + + let salt: Option<[u8; BLOCK_SIZE as usize]>; +@@ -321,19 +327,22 @@ pub async fn handle_request( + ); + +- match registration_vector ++ let registration_opt = registration_vector + .iter() +- .find(|elem| elem.map(|s| &s.net_id == &net_id)) // find if id exists +- { +- Some(reg) => { ++ .find(|elem| elem.map(|s| &s.net_id == &net_id)) ++ .cloned(); ++ ++ match registration_opt { ++ Some(reg) => { + let current_time = chrono::Utc::now().timestamp(); +- reg.update(|r| {r.last_heart_beat = current_time; ++ reg.update(|r| { ++ r.last_heart_beat = current_time; + match r.clients.par_iter_mut().find_any(|c| *c.client_sock_addr == *sock_addr && c.iv == iv) { + Some(c) => c.last_heart_beat = current_time, +- None => {// add new client if it isn't found ++ None => { + r.clients.par_iter().for_each(|c| {let mut send_buf: Box<[u8]> = vec![0; P2PStandardDataPositions::DATA as usize + sock_addr_len as usize].into(); + send_buf[0] = P2PMethods::NEW_CLIENT_NOTIFY as u8; + send_buf[P2PStandardDataPositions::IV as usize..P2PStandardDataPositions::IV as usize+ BLOCK_SIZE].copy_from_slice(&iv); + send_buf[P2PStandardDataPositions::DATA as usize..P2PStandardDataPositions::DATA as usize + sock_addr_len as usize].copy_from_slice(&sock_addr); + let sock_clone = socket.clone(); + async move { + match sock_clone.send_to(&send_buf, c.src).await { + Ok(data_lenght) => { + #[cfg(debug_assertions)] + eprintln!("send {} bytes", data_lenght); + }, + Err(e) => eprintln!("{} failed to send data to client Error: {}", "[ERROR]".red(), e), + } + }; + }); + r.clients.push(types::Client::new(sock_addr.clone(), current_time, iv, src)); + } + }; + }); + } +- None => {match socket.send_to(&[ServerResponse::ID_DOESNT_EXIST as u8], src).await { +- Ok(s) => { +- #[cfg(debug_assertions)] +- eprintln!("send {} bytes", s); +- } +- Err(e) => { +- eprintln!("Error sending data: {}", e); +- } +- }; return;} ++ None => { ++ match socket.send_to(&[ServerResponse::ID_DOESNT_EXIST as u8], src).await { ++ Ok(s) => { ++ #[cfg(debug_assertions)] ++ eprintln!("send {} bytes", s); ++ } ++ Err(e) => { ++ eprintln!("Error sending data: {}", e); ++ } ++ } ++ return; ++ } + } + + match socket.send_to(&[ServerMethods::HEARTBEAT as u8], src).await { diff --git a/src/server/main.rs b/src/server/main.rs index b5e5180..4a31784 100644 --- a/src/server/main.rs +++ b/src/server/main.rs @@ -1,8 +1,8 @@ mod net; mod types; mod utils; +use smol::net::UdpSocket; use std::{ - net::UdpSocket, process::exit, sync::{Arc, RwLock}, }; @@ -11,13 +11,10 @@ use orx_concurrent_vec::ConcurrentVec; fn main() -> std::io::Result<()> { { let socket: Arc = Arc::new( - (|| -> std::io::Result { + smol::block_on(async { let listen_port: u16 = pea_2_pea::SERVER_PORT; - match UdpSocket::bind(format!("0.0.0.0:{}", listen_port)) { - Ok(socket) => return Ok(socket), - Err(e) => return Err(e), - } - })() + UdpSocket::bind(format!("0.0.0.0:{}", listen_port)).await + }) .expect("Failed to bind to any available port"), ); @@ -28,7 +25,7 @@ fn main() -> std::io::Result<()> { smol::block_on(async { loop { buf.fill(0); - match socket.recv_from(&mut buf) { + match socket.recv_from(&mut buf).await { Ok((data_length, src)) => { smol::spawn(net::handle_request( buf, diff --git a/src/server/net.rs b/src/server/net.rs index 7f00ad1..f10c586 100644 --- a/src/server/net.rs +++ b/src/server/net.rs @@ -1,17 +1,29 @@ use crate::utils::send_general_error_to_client; +use smol::net::UdpSocket; use super::types; use super::utils; -use colored::Colorize; use orx_concurrent_vec::ConcurrentVec; use pea_2_pea::*; use rayon::prelude::*; use std::sync::Arc; use std::u8; + +async fn send_with_count(socket: std::sync::Arc , dst: &core::net::SocketAddr, buf: &[u8]) { + match socket.send_to(buf, dst).await { + Ok(s) => { + #[cfg(debug_assertions)] + eprintln!("send {} bytes", s); + } + Err(e) => { + eprintln!("Error snding data: {}", e); + } + } +} pub async fn handle_request( buf: [u8; UDP_BUFFER_SIZE], - socket: std::sync::Arc, + socket: std::sync::Arc, src: core::net::SocketAddr, data_len: usize, registration_vector: Arc>, @@ -25,7 +37,7 @@ pub async fn handle_request( let mut send_vec: Vec = client_sock_addr_str.into(); send_vec.insert(0, ServerMethods::QUERY as u8); - match socket.send_to(&send_vec, &src) { + match socket.send_to(&send_vec, &src).await { Ok(s) => { #[cfg(debug_assertions)] eprintln!("send {} bytes", s); @@ -63,15 +75,7 @@ pub async fn handle_request( .find(|elem| elem.map(|s| &s.net_id == &net_id)) // find if id exists { Some(registration) => registration, - None => {match socket.send_to(&[ServerResponse::ID_DOESNT_EXIST as u8], src){ - Ok(s) => { - #[cfg(debug_assertions)] - eprintln!("send {} bytes", s); - } - Err(e) => { - eprintln!("Error sending data: {}", e); - } - }; + None => {futures::executor::block_on(send_with_count(socket, &src ,&[ServerResponse::ID_DOESNT_EXIST as u8])); return; }, } @@ -131,7 +135,7 @@ pub async fn handle_request( return; } - match socket.send_to(&send_vec, &src) { + match socket.send_to(&send_vec, &src).await { Ok(s) => { #[cfg(debug_assertions)] eprintln!("send {} bytes", s); @@ -178,15 +182,7 @@ pub async fn handle_request( .find(|elem| elem.map(|s| &s.net_id == &net_id)) // find if id exists { Some(_) => { - match socket.send_to(&[ServerResponse::ID_EXISTS as u8], src) { - Ok(s) => { - #[cfg(debug_assertions)] - eprintln!("send {} bytes", s); - } - Err(e) => { - eprintln!("Error sending data: {}", e); - } - }; + futures::executor::block_on(send_with_count(socket, &src, &[ServerResponse::ID_EXISTS as u8])); return; } None => {} @@ -245,7 +241,7 @@ pub async fn handle_request( iv, src )); - match socket.send_to(&[ServerMethods::REGISTER as u8], src) { + match socket.send_to(&[ServerMethods::REGISTER as u8], src).await { Ok(s) => { #[cfg(debug_assertions)] eprintln!("send {} bytes", s); @@ -333,13 +329,9 @@ pub async fn handle_request( send_buf[0] = P2PMethods::NEW_CLIENT_NOTIFY as u8; send_buf[P2PStandardDataPositions::IV as usize..P2PStandardDataPositions::IV as usize+ BLOCK_SIZE].copy_from_slice(&iv); send_buf[P2PStandardDataPositions::DATA as usize..P2PStandardDataPositions::DATA as usize + sock_addr_len as usize].copy_from_slice(&sock_addr); - match socket.send_to(&send_buf, src) { - Ok(data_lenght) => { - #[cfg(debug_assertions)] - eprintln!("send {} bytes", data_lenght); - }, - Err(e) => eprintln!("{} failed to send data to client Error: {}", "[ERROR]".red(), e), - }; + let sock_clone = socket.clone(); + futures::executor::block_on(async move { + send_with_count(sock_clone, &c.src, &send_buf).await}); }); r.clients.push(types::Client::new(sock_addr.clone(), current_time, iv, src)); @@ -347,17 +339,9 @@ pub async fn handle_request( }; }); } - None => {match socket.send_to(&[ServerResponse::ID_DOESNT_EXIST as u8], src) { - Ok(s) => { - #[cfg(debug_assertions)] - eprintln!("send {} bytes", s); - } - Err(e) => { - eprintln!("Error sending data: {}", e); - } - } return;} + None => {futures::executor::block_on(send_with_count(socket, &src, &[ServerResponse::ID_DOESNT_EXIST as u8])); return;} } - match socket.send_to(&[ServerMethods::HEARTBEAT as u8], src) { + match socket.send_to(&[ServerMethods::HEARTBEAT as u8], src).await { // succes responce Ok(s) => { #[cfg(debug_assertions)] diff --git a/src/server/utils.rs b/src/server/utils.rs index 19d7f03..b06432b 100644 --- a/src/server/utils.rs +++ b/src/server/utils.rs @@ -2,7 +2,7 @@ use pea_2_pea::*; pub fn send_general_error_to_client( dst: core::net::SocketAddr, e: T, - socket: std::sync::Arc, + socket: std::sync::Arc, ) { let mut resp_buf: Box<[u8]> = vec![0; e.to_string().len() + 1].into_boxed_slice();