From 206012e72d21102847ede7249ca8ece3c8126456 Mon Sep 17 00:00:00 2001 From: PoliEcho Date: Sun, 3 Aug 2025 15:05:35 +0200 Subject: [PATCH] add periodic heart beat --- README.md | 17 +++++++++++++ src/client/main.rs | 4 +-- src/client/net.rs | 36 ++++++++++++++++++++++++--- src/server/main.rs | 12 ++++++--- src/server/types.rs | 60 ++------------------------------------------- src/server/utils.rs | 22 +++++++++++++++++ 6 files changed, 84 insertions(+), 67 deletions(-) create mode 100644 README.md diff --git a/README.md b/README.md new file mode 100644 index 0000000..75cfb5f --- /dev/null +++ b/README.md @@ -0,0 +1,17 @@ +# Pea 2 Pea +very simple P2P VPN(Virtual Network yes, Private maybe), +this program is intended to help you play LAN games over internet +when all clients are behind Full-cone NAT, does not work with clients behind Symmetric NAT +at least for now + +## how to run +> install rustc and cargo or rustup, you will need 2024 edition +> build using +> ```bash +> # to build +> cargo build --release +> # to run server(registrar) +> ./target/release/server +> # to run client +> sudo ./target/release/client -r SERVER_IP -n NETWORK_ID -P PASSWORD # password is optional +> ``` \ No newline at end of file diff --git a/src/client/main.rs b/src/client/main.rs index fe95de0..be85997 100644 --- a/src/client/main.rs +++ b/src/client/main.rs @@ -59,7 +59,7 @@ fn main() -> std::io::Result<()> { exit(7); // posix for E2BIG } let mut buf: [u8; UDP_BUFFER_SIZE] = [0; UDP_BUFFER_SIZE]; - let (socket, virtual_network, my_public_sock_addr) = { + let (socket, virtual_network, _my_public_sock_addr) = { let socket: Arc = Arc::new(|| -> std::io::Result { match UdpSocket::bind("0.0.0.0:0") { // bind to OS assigned random port @@ -146,7 +146,7 @@ fn main() -> std::io::Result<()> { let _ = net::send_heartbeat( &mut buf, &server_SocketAddr, - &socket, + socket.clone(), &n, &public_sock_addr, &iv, diff --git a/src/client/net.rs b/src/client/net.rs index 9aa6c24..4041aaa 100644 --- a/src/client/net.rs +++ b/src/client/net.rs @@ -7,7 +7,7 @@ use std::{ use super::types; use colored::Colorize; use pea_2_pea::{shared::net::send_and_recv_with_retry, *}; -use rand::{RngCore, rng}; +use rand::{Rng, RngCore, rng}; use sha2::Digest; pub fn query_request( @@ -255,7 +255,7 @@ pub fn get_request( pub fn send_heartbeat( buf: &mut [u8; UDP_BUFFER_SIZE], dst: &SocketAddr, - socket: &UdpSocket, + socket: Arc, network: &types::Network, my_public_sock_addr: &Box<[u8]>, iv: &[u8; BLOCK_SIZE as usize], @@ -300,7 +300,16 @@ pub fn send_heartbeat( .collect::(), ); - match send_and_recv_with_retry(buf, &send_buf, dst, socket, STANDARD_RETRY_MAX) { + { + let sock_clone = socket.clone(); + let send_buf_clone: Box<[u8]> = send_buf.clone(); + let dst_clone: SocketAddr = dst.clone(); + std::thread::spawn(move || { + periodic_heart_beat(sock_clone, send_buf_clone, dst_clone); + }); + } + + match send_and_recv_with_retry(buf, &send_buf, dst, &socket, STANDARD_RETRY_MAX) { Ok((data_lenght, _)) => return Ok(data_lenght), Err(e) => return Err(e), } @@ -727,3 +736,24 @@ pub async fn handle_incoming_connection( } } } + +pub fn periodic_heart_beat(socket: Arc, send_buf: Box<[u8]>, dst: SocketAddr) { + loop { + std::thread::sleep(std::time::Duration::from_secs(30)); + println!("{} sending heartbeat to server", "[LOG]".blue()); + + match socket.send_to(&send_buf, dst) { + Ok(size) => { + #[cfg(debug_assertions)] + println!("send {} bytes", size); + } + Err(e) => { + eprintln!( + "{} failed to send heartbeat to server Error: {}", + "[ERROR]".red(), + e + ); + } + } + } +} diff --git a/src/server/main.rs b/src/server/main.rs index 4a31784..ab0f455 100644 --- a/src/server/main.rs +++ b/src/server/main.rs @@ -2,10 +2,7 @@ mod net; mod types; mod utils; use smol::net::UdpSocket; -use std::{ - process::exit, - sync::{Arc, RwLock}, -}; +use std::{process::exit, sync::Arc}; use orx_concurrent_vec::ConcurrentVec; fn main() -> std::io::Result<()> { @@ -21,6 +18,13 @@ fn main() -> std::io::Result<()> { let registration_vector: Arc> = Arc::new(orx_concurrent_vec::ConcurrentVec::new()); + { + let reg_clone = registration_vector.clone(); + std::thread::spawn(move || { + utils::disconnected_cleaner(reg_clone); + }); + } + let mut buf: [u8; pea_2_pea::UDP_BUFFER_SIZE] = [0u8; pea_2_pea::UDP_BUFFER_SIZE]; smol::block_on(async { loop { diff --git a/src/server/types.rs b/src/server/types.rs index 25b9ad9..85f20c9 100644 --- a/src/server/types.rs +++ b/src/server/types.rs @@ -41,6 +41,7 @@ pub struct Registration { pub encrypted: bool, #[readonly] pub salt: [u8; BLOCK_SIZE as usize], + pub invalid: bool, } impl Registration { @@ -64,64 +65,7 @@ impl Registration { encrypted, last_heart_beat: heart_beat, salt: salt.unwrap_or([0; BLOCK_SIZE as usize]), + invalid: false, } } } - -pub struct BatchLock { - inner: std::sync::Mutex, // true = blocking new locks - condvar: std::sync::Condvar, - active_count: std::sync::atomic::AtomicUsize, -} - -pub struct LockGuard { - lock: Arc, -} - -impl BatchLock { - pub fn new() -> Arc { - Arc::new(BatchLock { - inner: std::sync::Mutex::new(false), - condvar: std::sync::Condvar::new(), - active_count: std::sync::atomic::AtomicUsize::new(0), - }) - } - - // Acquire a lock (blocks if waiting for all to unlock) - pub fn lock(self: &Arc) -> LockGuard { - let mut blocking = self.inner.lock().unwrap(); - - // Wait while new locks are blocked - while *blocking { - blocking = self.condvar.wait(blocking).unwrap(); - } - - self.active_count.fetch_add(1, Ordering::SeqCst); - - LockGuard { - lock: Arc::clone(self), - } - } - - // Block new locks and wait for all current locks to finish - pub fn wait_all_unlock(self: &Arc) { - // Block new locks - *self.inner.lock().unwrap() = true; - - // Wait for all active locks to finish - while self.active_count.load(Ordering::SeqCst) > 0 { - std::thread::sleep(std::time::Duration::from_millis(1)); - } - - // Allow new locks again - *self.inner.lock().unwrap() = false; - self.condvar.notify_all(); - } -} - -impl Drop for LockGuard { - fn drop(&mut self) { - // Automatically release lock when guard is dropped - self.lock.active_count.fetch_sub(1, Ordering::SeqCst); - } -} diff --git a/src/server/utils.rs b/src/server/utils.rs index b06432b..6d64414 100644 --- a/src/server/utils.rs +++ b/src/server/utils.rs @@ -1,4 +1,6 @@ +use colored::Colorize; use pea_2_pea::*; + pub fn send_general_error_to_client( dst: core::net::SocketAddr, e: T, @@ -11,3 +13,23 @@ pub fn send_general_error_to_client( let _ = socket.send_to(&[ServerResponse::GENERAL_ERROR as u8], dst); } + +pub fn disconnected_cleaner( + registration_vector: std::sync::Arc< + orx_concurrent_vec::ConcurrentVec, + >, +) { + loop { + std::thread::sleep(std::time::Duration::from_secs(120)); + println!("{} starting cleanup", "[LOG]".blue()); + let time_now = chrono::Utc::now().timestamp(); + unsafe { + registration_vector.iter_mut().for_each(|reg| { + reg.clients.retain(|c| time_now - c.last_heart_beat < 120); + if time_now - reg.last_heart_beat > 120 { + reg.invalid = true; + } + }) + } + } +}