add periodic heart beat
This commit is contained in:
		
							parent
							
								
									b9e36d9f8c
								
							
						
					
					
						commit
						206012e72d
					
				
							
								
								
									
										17
									
								
								README.md
									
									
									
									
									
										Normal file
									
								
							
							
						
						
									
										17
									
								
								README.md
									
									
									
									
									
										Normal file
									
								
							| @ -0,0 +1,17 @@ | ||||
| # Pea 2 Pea   | ||||
| very simple P2P VPN(Virtual Network yes, Private maybe),   | ||||
| this program is intended to help you play LAN games over internet   | ||||
| when all clients are behind Full-cone NAT, does not work with clients behind Symmetric NAT   | ||||
| at least for now   | ||||
| 
 | ||||
| ## how to run   | ||||
| > install rustc and cargo or rustup, you will need 2024 edition   | ||||
| > build using   | ||||
| > ```bash | ||||
| > # to build | ||||
| > cargo build --release | ||||
| > # to run server(registrar) | ||||
| > ./target/release/server | ||||
| > # to run client | ||||
| > sudo ./target/release/client -r SERVER_IP -n NETWORK_ID -P PASSWORD # password is optional | ||||
| > ``` | ||||
| @ -59,7 +59,7 @@ fn main() -> std::io::Result<()> { | ||||
|         exit(7); // posix for E2BIG
 | ||||
|     } | ||||
|     let mut buf: [u8; UDP_BUFFER_SIZE] = [0; UDP_BUFFER_SIZE]; | ||||
|     let (socket, virtual_network, my_public_sock_addr) = { | ||||
|     let (socket, virtual_network, _my_public_sock_addr) = { | ||||
|         let socket: Arc<UdpSocket> = Arc::new(|| -> std::io::Result<UdpSocket> { | ||||
|             match UdpSocket::bind("0.0.0.0:0") { | ||||
|                 // bind to OS assigned random port
 | ||||
| @ -146,7 +146,7 @@ fn main() -> std::io::Result<()> { | ||||
|                     let _ = net::send_heartbeat( | ||||
|                         &mut buf, | ||||
|                         &server_SocketAddr, | ||||
|                         &socket, | ||||
|                         socket.clone(), | ||||
|                         &n, | ||||
|                         &public_sock_addr, | ||||
|                         &iv, | ||||
|  | ||||
| @ -7,7 +7,7 @@ use std::{ | ||||
| use super::types; | ||||
| use colored::Colorize; | ||||
| use pea_2_pea::{shared::net::send_and_recv_with_retry, *}; | ||||
| use rand::{RngCore, rng}; | ||||
| use rand::{Rng, RngCore, rng}; | ||||
| use sha2::Digest; | ||||
| 
 | ||||
| pub fn query_request( | ||||
| @ -255,7 +255,7 @@ pub fn get_request( | ||||
| pub fn send_heartbeat( | ||||
|     buf: &mut [u8; UDP_BUFFER_SIZE], | ||||
|     dst: &SocketAddr, | ||||
|     socket: &UdpSocket, | ||||
|     socket: Arc<std::net::UdpSocket>, | ||||
|     network: &types::Network, | ||||
|     my_public_sock_addr: &Box<[u8]>, | ||||
|     iv: &[u8; BLOCK_SIZE as usize], | ||||
| @ -300,7 +300,16 @@ pub fn send_heartbeat( | ||||
|             .collect::<String>(), | ||||
|     ); | ||||
| 
 | ||||
|     match send_and_recv_with_retry(buf, &send_buf, dst, socket, STANDARD_RETRY_MAX) { | ||||
|     { | ||||
|         let sock_clone = socket.clone(); | ||||
|         let send_buf_clone: Box<[u8]> = send_buf.clone(); | ||||
|         let dst_clone: SocketAddr = dst.clone(); | ||||
|         std::thread::spawn(move || { | ||||
|             periodic_heart_beat(sock_clone, send_buf_clone, dst_clone); | ||||
|         }); | ||||
|     } | ||||
| 
 | ||||
|     match send_and_recv_with_retry(buf, &send_buf, dst, &socket, STANDARD_RETRY_MAX) { | ||||
|         Ok((data_lenght, _)) => return Ok(data_lenght), | ||||
|         Err(e) => return Err(e), | ||||
|     } | ||||
| @ -727,3 +736,24 @@ pub async fn handle_incoming_connection( | ||||
|         } | ||||
|     } | ||||
| } | ||||
| 
 | ||||
| pub fn periodic_heart_beat(socket: Arc<UdpSocket>, send_buf: Box<[u8]>, dst: SocketAddr) { | ||||
|     loop { | ||||
|         std::thread::sleep(std::time::Duration::from_secs(30)); | ||||
|         println!("{} sending heartbeat to server", "[LOG]".blue()); | ||||
| 
 | ||||
|         match socket.send_to(&send_buf, dst) { | ||||
|             Ok(size) => { | ||||
|                 #[cfg(debug_assertions)] | ||||
|                 println!("send {} bytes", size); | ||||
|             } | ||||
|             Err(e) => { | ||||
|                 eprintln!( | ||||
|                     "{} failed to send heartbeat to server Error: {}", | ||||
|                     "[ERROR]".red(), | ||||
|                     e | ||||
|                 ); | ||||
|             } | ||||
|         } | ||||
|     } | ||||
| } | ||||
|  | ||||
| @ -2,10 +2,7 @@ mod net; | ||||
| mod types; | ||||
| mod utils; | ||||
| use smol::net::UdpSocket; | ||||
| use std::{ | ||||
|     process::exit, | ||||
|     sync::{Arc, RwLock}, | ||||
| }; | ||||
| use std::{process::exit, sync::Arc}; | ||||
| 
 | ||||
| use orx_concurrent_vec::ConcurrentVec; | ||||
| fn main() -> std::io::Result<()> { | ||||
| @ -21,6 +18,13 @@ fn main() -> std::io::Result<()> { | ||||
|         let registration_vector: Arc<ConcurrentVec<types::Registration>> = | ||||
|             Arc::new(orx_concurrent_vec::ConcurrentVec::new()); | ||||
| 
 | ||||
|         { | ||||
|             let reg_clone = registration_vector.clone(); | ||||
|             std::thread::spawn(move || { | ||||
|                 utils::disconnected_cleaner(reg_clone); | ||||
|             }); | ||||
|         } | ||||
| 
 | ||||
|         let mut buf: [u8; pea_2_pea::UDP_BUFFER_SIZE] = [0u8; pea_2_pea::UDP_BUFFER_SIZE]; | ||||
|         smol::block_on(async { | ||||
|             loop { | ||||
|  | ||||
| @ -41,6 +41,7 @@ pub struct Registration { | ||||
|     pub encrypted: bool, | ||||
|     #[readonly] | ||||
|     pub salt: [u8; BLOCK_SIZE as usize], | ||||
|     pub invalid: bool, | ||||
| } | ||||
| 
 | ||||
| impl Registration { | ||||
| @ -64,64 +65,7 @@ impl Registration { | ||||
|             encrypted, | ||||
|             last_heart_beat: heart_beat, | ||||
|             salt: salt.unwrap_or([0; BLOCK_SIZE as usize]), | ||||
|             invalid: false, | ||||
|         } | ||||
|     } | ||||
| } | ||||
| 
 | ||||
| pub struct BatchLock { | ||||
|     inner: std::sync::Mutex<bool>, // true = blocking new locks
 | ||||
|     condvar: std::sync::Condvar, | ||||
|     active_count: std::sync::atomic::AtomicUsize, | ||||
| } | ||||
| 
 | ||||
| pub struct LockGuard { | ||||
|     lock: Arc<BatchLock>, | ||||
| } | ||||
| 
 | ||||
| impl BatchLock { | ||||
|     pub fn new() -> Arc<Self> { | ||||
|         Arc::new(BatchLock { | ||||
|             inner: std::sync::Mutex::new(false), | ||||
|             condvar: std::sync::Condvar::new(), | ||||
|             active_count: std::sync::atomic::AtomicUsize::new(0), | ||||
|         }) | ||||
|     } | ||||
| 
 | ||||
|     // Acquire a lock (blocks if waiting for all to unlock)
 | ||||
|     pub fn lock(self: &Arc<Self>) -> LockGuard { | ||||
|         let mut blocking = self.inner.lock().unwrap(); | ||||
| 
 | ||||
|         // Wait while new locks are blocked
 | ||||
|         while *blocking { | ||||
|             blocking = self.condvar.wait(blocking).unwrap(); | ||||
|         } | ||||
| 
 | ||||
|         self.active_count.fetch_add(1, Ordering::SeqCst); | ||||
| 
 | ||||
|         LockGuard { | ||||
|             lock: Arc::clone(self), | ||||
|         } | ||||
|     } | ||||
| 
 | ||||
|     // Block new locks and wait for all current locks to finish
 | ||||
|     pub fn wait_all_unlock(self: &Arc<Self>) { | ||||
|         // Block new locks
 | ||||
|         *self.inner.lock().unwrap() = true; | ||||
| 
 | ||||
|         // Wait for all active locks to finish
 | ||||
|         while self.active_count.load(Ordering::SeqCst) > 0 { | ||||
|             std::thread::sleep(std::time::Duration::from_millis(1)); | ||||
|         } | ||||
| 
 | ||||
|         // Allow new locks again
 | ||||
|         *self.inner.lock().unwrap() = false; | ||||
|         self.condvar.notify_all(); | ||||
|     } | ||||
| } | ||||
| 
 | ||||
| impl Drop for LockGuard { | ||||
|     fn drop(&mut self) { | ||||
|         // Automatically release lock when guard is dropped
 | ||||
|         self.lock.active_count.fetch_sub(1, Ordering::SeqCst); | ||||
|     } | ||||
| } | ||||
|  | ||||
| @ -1,4 +1,6 @@ | ||||
| use colored::Colorize; | ||||
| use pea_2_pea::*; | ||||
| 
 | ||||
| pub fn send_general_error_to_client<T: std::error::Error>( | ||||
|     dst: core::net::SocketAddr, | ||||
|     e: T, | ||||
| @ -11,3 +13,23 @@ pub fn send_general_error_to_client<T: std::error::Error>( | ||||
| 
 | ||||
|     let _ = socket.send_to(&[ServerResponse::GENERAL_ERROR as u8], dst); | ||||
| } | ||||
| 
 | ||||
| pub fn disconnected_cleaner( | ||||
|     registration_vector: std::sync::Arc< | ||||
|         orx_concurrent_vec::ConcurrentVec<crate::types::Registration>, | ||||
|     >, | ||||
| ) { | ||||
|     loop { | ||||
|         std::thread::sleep(std::time::Duration::from_secs(120)); | ||||
|         println!("{} starting cleanup", "[LOG]".blue()); | ||||
|         let time_now = chrono::Utc::now().timestamp(); | ||||
|         unsafe { | ||||
|             registration_vector.iter_mut().for_each(|reg| { | ||||
|                 reg.clients.retain(|c| time_now - c.last_heart_beat < 120); | ||||
|                 if time_now - reg.last_heart_beat > 120 { | ||||
|                     reg.invalid = true; | ||||
|                 } | ||||
|             }) | ||||
|         } | ||||
|     } | ||||
| } | ||||
|  | ||||
		Loading…
	
	
			
			x
			
			
		
	
		Reference in New Issue
	
	Block a user