addtions to REGISTER request

This commit is contained in:
2025-07-26 15:03:53 +02:00
parent 2e08a3093e
commit 3bcccc7620
8 changed files with 557 additions and 102 deletions
+21 -20
View File
@@ -1,7 +1,7 @@
use pea_2_pea::SERVER_PORT;
use std::{
io::{ErrorKind, Read, Write},
io::{Error, ErrorKind, Read, Write},
net::UdpSocket,
process::exit,
time::Duration,
@@ -15,35 +15,35 @@ struct Cli {
#[arg(help = "registrar ip address or hostname")]
registrar: String,
#[arg(short = 'p', long = "registrar-port")]
#[arg(help = format!("optional Port number for the registrar service (1-65535) Default: {}", SERVER_PORT))]
registrar_port: Option<u16>,
#[arg(short = 'n', long = "network-id")]
#[arg(help = "your virtual network id that allows other people to connect to you")]
network_id: String,
#[arg(short = 'P', long = "password")]
#[arg(
help = "encryption password for your virtual network if not provided transmitions will be unencrypted"
)]
password: Option<String>,
#[arg(short = 'v', long = "verbose")]
verbose: bool,
#[arg(short = 'V', long = "version")]
version: bool,
#[arg(short = 'p', long = "registrar-port")]
#[arg(help = format!("Port number for the registrar service (1-65535) Default: {}", SERVER_PORT))]
registrar_port: Option<u16>,
#[arg(short = 'P', long = "bind-port")]
bind_port: Option<u16>,
}
fn main() -> std::io::Result<()> {
let cli = <Cli as clap::Parser>::parse();
{
let socket: UdpSocket = (|| -> std::io::Result<UdpSocket> {
let mut port: u16;
match cli.bind_port {
Some(port_proveded) => port = port_proveded,
None => port = 59999, // Magic number
}
loop {
port += 1;
match UdpSocket::bind(format!("0.0.0.0:{}", port)) {
Ok(socket) => return Ok(socket),
Err(_) => continue, // Retry on error
}
match UdpSocket::bind("0.0.0.0:0") {
// bind to OS assigned random port
Ok(socket) => return Ok(socket),
Err(e) => Err(e), // exit on error
}
})()
.expect("Failed to bind to any available port");
@@ -52,12 +52,13 @@ fn main() -> std::io::Result<()> {
// send query request to get server public key
let server_port: u16 = (|| -> u16 {
match cli.bind_port {
match cli.registrar_port {
Some(port_proveded) => return port_proveded,
None => return pea_2_pea::SERVER_PORT,
}
})();
#[allow(non_snake_case)] // i think this is valid snake case but rustc doesnt think so
let server_SocketAddr: core::net::SocketAddr = format!("{}:{}", cli.registrar, server_port)
.parse()
.unwrap();
+16 -1
View File
@@ -6,10 +6,25 @@ pub const RSA_SIZE: usize = 2048;
#[repr(u8)]
pub enum ServerMethods {
QUERY = 0,
QUERY = 0, // return IP and port of the client
REGISTER = 1,
GET = 2,
HEARTBEAT = 3,
}
pub enum ServerResponse {
// avoid 0 from empty buffers
OK = 1,
GENERAL_ERROR = 255,
ID_EXISTS = 254,
ID_DOESNT_EXIST = 253, // both error since sometimes it is the problem that the id exist and somethimes problem is that is doesn't
}
pub enum RegisterRequestDataPositions {
ENCRYPTED = 1, // this feeld should be 0 if not encrypted
ID_LEN = 2,
SOCKADDR_LEN = 3,
DATA = 4, // after this there will be id and sockaddr in string or encrypted form after
}
pub mod shared;
+19 -3
View File
@@ -1,6 +1,13 @@
mod net;
use std::{net::UdpSocket, process::exit, sync::Arc};
mod types;
mod utils;
use std::{
net::UdpSocket,
process::exit,
sync::{Arc, RwLock},
};
use orx_concurrent_vec::ConcurrentVec;
fn main() -> std::io::Result<()> {
{
let socket: Arc<UdpSocket> = Arc::new(
@@ -14,13 +21,22 @@ fn main() -> std::io::Result<()> {
.expect("Failed to bind to any available port"),
);
let registration_vector: Arc<ConcurrentVec<types::Registration>> =
Arc::new(orx_concurrent_vec::ConcurrentVec::new());
let mut buf: [u8; pea_2_pea::BUFFER_SIZE] = [0; pea_2_pea::BUFFER_SIZE];
smol::block_on(async {
loop {
match socket.recv_from(&mut buf) {
Ok((data_length, src)) => {
smol::spawn(net::handle_request(buf, socket.clone(), src, data_length))
.detach();
smol::spawn(net::handle_request(
buf,
socket.clone(),
src,
data_length,
registration_vector.clone(),
))
.detach();
}
Err(e) => {
eprintln!("Error receiving data: {}", e);
+51 -8
View File
@@ -1,19 +1,24 @@
use super::types;
use super::utils;
use orx_concurrent_vec::ConcurrentVec;
use pea_2_pea::*;
use std::sync::Arc;
pub async fn handle_request(
buf: [u8; pea_2_pea::BUFFER_SIZE],
buf: [u8; BUFFER_SIZE],
socket: std::sync::Arc<std::net::UdpSocket>,
src: core::net::SocketAddr,
data_len: usize,
registration_vector: Arc<ConcurrentVec<types::Registration>>,
) {
let mut rng: rand::prelude::ThreadRng = rand::thread_rng();
match buf[0] {
x if x == pea_2_pea::ServerMethods::QUERY as u8 => {
x if x == ServerMethods::QUERY as u8 => {
#[cfg(debug_assertions)]
eprintln!("QUERY method");
let client_sock_addr_str: String = src.to_string();
let mut send_vec: Vec<u8> = client_sock_addr_str.into();
send_vec.insert(0, pea_2_pea::ServerMethods::QUERY as u8);
send_vec.insert(0, ServerMethods::QUERY as u8);
match socket.send_to(&send_vec, &src) {
Ok(s) => {
@@ -26,15 +31,53 @@ pub async fn handle_request(
}
}
x if x == pea_2_pea::ServerMethods::GET as u8 => {
x if x == ServerMethods::GET as u8 => {
#[cfg(debug_assertions)]
println!("GET method");
}
x if x == pea_2_pea::ServerMethods::REGISTER as u8 => {
x if x == ServerMethods::REGISTER as u8 => {
#[cfg(debug_assertions)]
println!("REGISTER method");
let encrypted: bool = buf[RegisterRequestDataPositions::ENCRYPTED as usize] != 0;
//read lenght of sockaddr
// rustc be like RUST HAS NO TERNARY OPERATON USE if-else
let len_id: u8 = if buf[RegisterRequestDataPositions::ID_LEN as usize] != 0 {
buf[RegisterRequestDataPositions::ID_LEN as usize]
} else {
return;
};
let sock_addr_len: u8 = if buf[RegisterRequestDataPositions::SOCKADDR_LEN as usize] != 0
{
buf[RegisterRequestDataPositions::SOCKADDR_LEN as usize]
} else {
return;
};
registration_vector.push(types::Registration::new(
match std::str::from_utf8(
&buf[(RegisterRequestDataPositions::DATA as usize)
..(len_id as usize) + (RegisterRequestDataPositions::DATA as usize)],
) {
Ok(s) => s.to_string(),
Err(e) => {
eprint!("id to utf-8 failed: {}", e);
utils::send_general_error_to_client(src, e, socket);
return;
}
},
buf[(len_id as usize) + (RegisterRequestDataPositions::DATA as usize)
..(len_id as usize)
+ (RegisterRequestDataPositions::DATA as usize)
+ (sock_addr_len as usize)]
.to_vec(),
encrypted,
chrono::Utc::now().timestamp(),
));
}
x if x == pea_2_pea::ServerMethods::HEARTBEAT as u8 => {
x if x == ServerMethods::HEARTBEAT as u8 => {
#[cfg(debug_assertions)]
println!("HEARTBEAT method");
}
+96
View File
@@ -0,0 +1,96 @@
use std::sync::{Arc, atomic::Ordering};
#[readonly::make]
pub struct Client {
#[readonly]
pub client_sock_addr: Vec<u8>,
pub last_heart_beat: i64,
}
impl Client {
pub fn new(client_addr: Vec<u8>, heart_beat: i64) -> Self {
Client {
client_sock_addr: client_addr,
last_heart_beat: heart_beat,
}
}
}
#[readonly::make]
pub struct Registration {
#[readonly]
pub net_id: String,
pub clients: Vec<Client>,
#[readonly]
pub encrypted: bool,
pub last_heart_beat: i64,
}
impl Registration {
pub fn new(net_id: String, client_addr: Vec<u8>, encrypted: bool, heart_beat: i64) -> Self {
Registration {
net_id: net_id,
clients: vec![Client::new(client_addr, heart_beat)],
encrypted,
last_heart_beat: heart_beat,
}
}
}
pub struct BatchLock {
inner: std::sync::Mutex<bool>, // true = blocking new locks
condvar: std::sync::Condvar,
active_count: std::sync::atomic::AtomicUsize,
}
pub struct LockGuard {
lock: Arc<BatchLock>,
}
impl BatchLock {
pub fn new() -> Arc<Self> {
Arc::new(BatchLock {
inner: std::sync::Mutex::new(false),
condvar: std::sync::Condvar::new(),
active_count: std::sync::atomic::AtomicUsize::new(0),
})
}
// Acquire a lock (blocks if waiting for all to unlock)
pub fn lock(self: &Arc<Self>) -> LockGuard {
let mut blocking = self.inner.lock().unwrap();
// Wait while new locks are blocked
while *blocking {
blocking = self.condvar.wait(blocking).unwrap();
}
self.active_count.fetch_add(1, Ordering::SeqCst);
LockGuard {
lock: Arc::clone(self),
}
}
// Block new locks and wait for all current locks to finish
pub fn wait_all_unlock(self: &Arc<Self>) {
// Block new locks
*self.inner.lock().unwrap() = true;
// Wait for all active locks to finish
while self.active_count.load(Ordering::SeqCst) > 0 {
std::thread::sleep(std::time::Duration::from_millis(1));
}
// Allow new locks again
*self.inner.lock().unwrap() = false;
self.condvar.notify_all();
}
}
impl Drop for LockGuard {
fn drop(&mut self) {
// Automatically release lock when guard is dropped
self.lock.active_count.fetch_sub(1, Ordering::SeqCst);
}
}
+14
View File
@@ -0,0 +1,14 @@
use pea_2_pea::*;
pub fn send_general_error_to_client<T: std::error::Error>(
dst: core::net::SocketAddr,
e: T,
socket: std::sync::Arc<std::net::UdpSocket>,
) {
let mut resp_buf: Box<[u8]> = vec![0; e.to_string().len() + 1].into_boxed_slice();
resp_buf[0] = ServerResponse::GENERAL_ERROR as u8; // set 1st byte to ERROR
resp_buf[1..1 + e.to_string().len()] // send error text to client
.copy_from_slice(e.to_string().as_bytes());
let _ = socket.send_to(&[ServerResponse::GENERAL_ERROR as u8], dst);
}