Compare commits

43 Commits

Author SHA1 Message Date
PoliEcho 934a053713 bug fixes 2025-08-03 20:23:23 +02:00
PoliEcho a955710350 some more heartbeat modifications 2025-08-03 18:34:31 +02:00
PoliEcho 23dea395d3 Merge branch 'master' of https://git.pupes.org/PoliEcho/pea_2_pea 2025-08-03 15:51:16 +02:00
PoliEcho 6ede42a096 v1.1 2025-08-03 15:49:43 +02:00
PoliEcho 8f2c66c195 Update README.md 2025-08-03 13:41:37 +00:00
PoliEcho 6898f946fa Update README.md 2025-08-03 13:38:18 +00:00
PoliEcho ef83f0216f remove some warnings 2025-08-03 15:12:10 +02:00
PoliEcho 206012e72d add periodic heart beat 2025-08-03 15:05:35 +02:00
PoliEcho b9e36d9f8c maybe it works now? 2025-08-02 10:34:58 +02:00
PoliEcho b8d02b2077 add more debuging 2025-08-02 10:11:33 +02:00
PoliEcho 8e9d179d49 add windows crosscompilation 2025-08-01 23:32:49 +02:00
PoliEcho bc17ffac68 bump version to 1.0 2025-08-01 19:48:35 +02:00
PoliEcho 4ca652cea5 fix unecryted network 2025-08-01 19:38:59 +02:00
PoliEcho 4a70fb61f9 another off by one error 2025-08-01 19:35:09 +02:00
PoliEcho 6aa9fb27e6 add debuging of packets 2025-08-01 19:20:58 +02:00
PoliEcho 20a4907ea0 remove off by one error 2025-08-01 18:44:44 +02:00
PoliEcho 582d458f70 maybe works now 2025-08-01 18:41:17 +02:00
PoliEcho 0a32061960 add some debug messages 2025-08-01 18:33:56 +02:00
PoliEcho b1cc5ddd32 minimal changes 2025-08-01 17:57:23 +02:00
PoliEcho c5a4059a84 this has to work 2025-08-01 16:58:42 +02:00
PoliEcho be37082b87 a 2025-08-01 16:16:00 +02:00
PoliEcho 07df839b6d WHY DOESNT IT WORK :((((((( 2025-08-01 16:07:38 +02:00
PoliEcho bcff895858 fix do nothing packets chaos 2025-08-01 15:50:46 +02:00
PoliEcho d5a5dc33a9 fix some async issues 2025-08-01 14:06:47 +02:00
PoliEcho 3675650864 holepuching fixes 2025-08-01 10:44:05 +02:00
PoliEcho 28dd37bdec remove off by 1 error 2025-07-31 21:07:08 +02:00
PoliEcho 0987a46062 actualy fix size of buffer 2025-07-31 21:00:23 +02:00
PoliEcho b65445e3be fix size of buffer 2025-07-31 20:59:12 +02:00
PoliEcho 2f811db0a3 add debug message 2025-07-31 20:57:06 +02:00
PoliEcho 5dfad8264e add actual hole punching 2025-07-31 20:39:07 +02:00
PoliEcho 4d6ea8e626 fixes 2025-07-31 14:56:44 +02:00
PoliEcho a87899c402 somefixed to P2P communication 2025-07-31 13:40:48 +02:00
PoliEcho dc55e4e1f6 fix some network stuff 2025-07-30 18:41:32 +02:00
PoliEcho c6583ea534 finalize P2P comm now just debugging 2025-07-30 18:09:10 +02:00
PoliEcho ddbe156846 add P2P protocol 2025-07-30 12:50:02 +02:00
PoliEcho b1335bef08 fix some other protocol errors 2025-07-28 22:02:50 +02:00
PoliEcho 752541c9f6 fix some protocol bugs 2025-07-28 20:23:55 +02:00
PoliEcho 81649bf2fd client registrar communication works 2025-07-28 17:30:06 +02:00
PoliEcho 72703aa46b add some basoc logic 2025-07-28 17:12:47 +02:00
PoliEcho 63d485b8b5 add heart beat to client 2025-07-28 14:43:20 +02:00
PoliEcho 7f3aa3076d add get method sender 2025-07-28 13:56:13 +02:00
PoliEcho 1a25e882f3 misc changes 2025-07-27 22:45:44 +02:00
PoliEcho 0095606b60 add heartbet method 2025-07-27 22:44:17 +02:00
15 changed files with 2020 additions and 287 deletions
Generated
+124
View File
@@ -346,6 +346,15 @@ version = "1.0.4"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "b05b61dc5112cbb17e4b6cd61790d9845d13888356391624cbe7e41efeac1e75"
[[package]]
name = "colored"
version = "3.0.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "fde0e0ec90c9dfb3b4b1a0891a7dcd0e2bffde2f7efed5fe7c9bb00e5bfb915e"
dependencies = [
"windows-sys 0.59.0",
]
[[package]]
name = "concurrent-queue"
version = "2.5.0"
@@ -459,12 +468,48 @@ version = "2.3.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "37909eebbb50d72f9059c3b6d82c0463f2ff062c9e95845c43a6c9c0355411be"
[[package]]
name = "futures"
version = "0.3.31"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "65bc07b1a8bc7c85c5f2e110c476c7389b4554ba72af57d8445ea63a576b0876"
dependencies = [
"futures-channel",
"futures-core",
"futures-executor",
"futures-io",
"futures-sink",
"futures-task",
"futures-util",
]
[[package]]
name = "futures-channel"
version = "0.3.31"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "2dff15bf788c671c1934e366d07e30c1814a8ef514e1af724a602e8a2fbe1b10"
dependencies = [
"futures-core",
"futures-sink",
]
[[package]]
name = "futures-core"
version = "0.3.31"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "05f29059c0c2090612e8d742178b0580d2dc940c837851ad723096f87af6663e"
[[package]]
name = "futures-executor"
version = "0.3.31"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "1e28d1d997f585e54aebc3f97d39e72338912123a67330d723fdbb564d646c9f"
dependencies = [
"futures-core",
"futures-task",
"futures-util",
]
[[package]]
name = "futures-io"
version = "0.3.31"
@@ -484,6 +529,47 @@ dependencies = [
"pin-project-lite",
]
[[package]]
name = "futures-macro"
version = "0.3.31"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "162ee34ebcb7c64a8abebc059ce0fee27c2262618d7b60ed8faf72fef13c3650"
dependencies = [
"proc-macro2",
"quote",
"syn",
]
[[package]]
name = "futures-sink"
version = "0.3.31"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "e575fab7d1e0dcb8d0c7bcf9a63ee213816ab51902e6d244a95819acacf1d4f7"
[[package]]
name = "futures-task"
version = "0.3.31"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "f90f7dce0722e95104fcb095585910c0977252f286e354b5e3bd38902cd99988"
[[package]]
name = "futures-util"
version = "0.3.31"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "9fa08315bb612088cc391249efdc3bc77536f16c91f6cf495e6fbe85b20a4a81"
dependencies = [
"futures-channel",
"futures-core",
"futures-io",
"futures-macro",
"futures-sink",
"futures-task",
"memchr",
"pin-project-lite",
"pin-utils",
"slab",
]
[[package]]
name = "generic-array"
version = "0.14.7"
@@ -595,6 +681,12 @@ version = "0.4.27"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "13dc2df351e3202783a1fe0d44375f7295ffb4049267b0f3018346dc122a1d94"
[[package]]
name = "memchr"
version = "2.7.5"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "32a282da65faaf38286cf3be983213fcf1d2e2a58700e808f83f4ea9a4804bc0"
[[package]]
name = "num-traits"
version = "0.2.19"
@@ -738,7 +830,10 @@ dependencies = [
"chrono",
"cipher",
"clap",
"colored",
"futures",
"hmac",
"libc",
"orx-concurrent-vec",
"pbkdf2",
"rand",
@@ -747,6 +842,7 @@ dependencies = [
"sha2",
"smol",
"tappers",
"winapi",
]
[[package]]
@@ -755,6 +851,12 @@ version = "0.2.16"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "3b3cff922bd51709b605d9ead9aa71031d81447142d828eb4a6eba76fe619f9b"
[[package]]
name = "pin-utils"
version = "0.1.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "8b870d8c151b6f2fb93e84a13146138f05d02ed11c7e7c54f8826aaaf7c9f184"
[[package]]
name = "piper"
version = "0.2.4"
@@ -1066,6 +1168,28 @@ dependencies = [
"unicode-ident",
]
[[package]]
name = "winapi"
version = "0.3.9"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "5c839a674fcd7a98952e593242ea400abe93992746761e38641405d28b00f419"
dependencies = [
"winapi-i686-pc-windows-gnu",
"winapi-x86_64-pc-windows-gnu",
]
[[package]]
name = "winapi-i686-pc-windows-gnu"
version = "0.4.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "ac3b87c63620426dd9b991e5ce0329eff545bccbbb34f3be09ff6fb6ab51b7b6"
[[package]]
name = "winapi-x86_64-pc-windows-gnu"
version = "0.4.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "712e227841d057c1ee1cd2fb22fa7e5a5461ae8e48fa2ca79ec42cfc1931183f"
[[package]]
name = "windows-core"
version = "0.61.2"
+24
View File
@@ -18,6 +18,8 @@ cbc = "0.1.2"
chrono = "0.4.41"
cipher = { version = "0.4.4", features = ["block-padding", "alloc"] }
clap = { version = "4.5.41", features = ["derive"] }
colored = "3.0.0"
futures = "0.3.31"
hmac = "0.12.1"
orx-concurrent-vec = "3.6.0"
pbkdf2 = "0.12.2"
@@ -26,4 +28,26 @@ rayon = "1.10.0"
readonly = "0.2.13"
sha2 = "0.10.9"
smol = "2.0.2"
[target.'cfg(windows)'.dependencies]
winapi = { version = "0.3", features = ["winsock2", "mswsock", "minwindef"] }
tappers = { version = "0.4.2", features = ["wintun"] }
[target.'cfg(unix)'.dependencies]
libc = "0.2"
tappers = "0.4.2"
[target.x86_64-pc-windows-gnu]
linker = "/usr/bin/x86_64-w64-mingw32-gcc"
ar = "/usr/bin/x86_64-w64-mingw32-ar"
[target.i686-pc-windows-gnu]
linker = "/usr/bin/i686-w64-mingw32-gcc"
ar = "/usr/bin/i686-w64-mingw32-ar"
[target.aarch64-unknown-linux-gnu]
linker = "aarch64-linux-gnu-gcc"
[features]
no-timeout = []
+21
View File
@@ -0,0 +1,21 @@
# Pea 2 Pea
very simple P2P VPN(Virtual Network yes, Private maybe),
this program is intended to help you play LAN games over internet and as proof of concept
when all clients are behind Full-cone NAT, does not work with clients behind Symmetric NAT
at least for now
> [!WARNING]
> Piercing NAT may fail based on network configuration
## how to run
> install rustc and cargo or rustup, you will need 2024 edition
> build using
> ```bash
> # to build
> cargo build --release
> # to run server(registrar)
> ./target/release/server
> # to run client
> sudo ./target/release/client -r SERVER_IP -n NETWORK_ID -P PASSWORD # password is optional
> ```
+295 -14
View File
@@ -1,21 +1,28 @@
mod net;
mod tun;
mod types;
use colored::Colorize;
use pea_2_pea::*;
use rand::RngCore;
use rayon::prelude::*;
use std::{
io::{Error, ErrorKind, Read, Write},
net::UdpSocket,
process::exit,
sync::{Arc, RwLock},
time::Duration,
};
use crate::types::Network;
#[derive(clap::Parser)]
#[command(name = "pea_2_pea")]
#[command(about = "A CLI tool for pea_2_pea P2P vpn client")]
struct Cli {
#[arg(short = 'r', long = "registrar")]
#[arg(help = "registrar ip address or hostname")]
registrar: String,
#[arg(required_unless_present = "version")]
registrar: Option<String>,
#[arg(short = 'p', long = "registrar-port")]
#[arg(help = format!("optional Port number for the registrar service (1-65535) Default: {}", SERVER_PORT))]
@@ -23,7 +30,8 @@ struct Cli {
#[arg(short = 'n', long = "network-id")]
#[arg(help = "your virtual network id that allows other people to connect to you")]
network_id: String,
#[arg(required_unless_present = "version")]
network_id: Option<String>,
#[arg(short = 'P', long = "password")]
#[arg(
@@ -31,29 +39,50 @@ struct Cli {
)]
password: Option<String>,
#[arg(short = 'i', long = "interface-name")]
#[arg(help = "select tun interface name Default: pea0")]
if_name: Option<String>,
#[arg(short = 'v', long = "verbose")]
verbose: bool,
#[arg(short = 'V', long = "version")]
version: bool,
#[arg(short = 'S', long = "symmetric_NAT_bypass_mode")]
#[arg(help = "NOT IMPLEMENTED")]
symmetric_nat_bypass_mode: bool,
}
fn print_version() {
println!("Pea 2 Pea {}", VERSION);
}
fn main() -> std::io::Result<()> {
let cli = <Cli as clap::Parser>::parse();
if cli.network_id.len() > 0xff {
if cli.version {
print_version();
exit(0);
}
let network_id = cli.network_id.unwrap();
let registrar = cli.registrar.unwrap();
if network_id.len() > 0xff {
eprintln!("network id cannot have more then 255 charactes");
exit(7); // posix for E2BIG
}
{
let socket: UdpSocket = (|| -> std::io::Result<UdpSocket> {
let mut buf: [u8; UDP_BUFFER_SIZE] = [0; UDP_BUFFER_SIZE];
let (socket, virtual_network, _my_public_sock_addr) = {
let socket: Arc<UdpSocket> = Arc::new(|| -> std::io::Result<UdpSocket> {
match UdpSocket::bind("0.0.0.0:0") {
// bind to OS assigned random port
Ok(socket) => return Ok(socket),
Err(e) => Err(e), // exit on error
}
})()
.expect("Failed to bind to any available port");
.expect("Failed to bind to any available port")
.into();
#[cfg(not(feature = "no-timeout"))]
socket.set_read_timeout(Some(Duration::new(10, 0)))?; // set timeout to 10 seconds
let server_port: u16 = (|| -> u16 {
@@ -64,19 +93,271 @@ fn main() -> std::io::Result<()> {
})();
#[allow(non_snake_case)] // i think this is valid snake case but rustc doesnt think so
let server_SocketAddr: core::net::SocketAddr = format!("{}:{}", cli.registrar, server_port)
let server_SocketAddr: core::net::SocketAddr = format!("{}:{}", registrar, server_port)
.parse()
.unwrap();
.expect(&format!(
"{}:{} is invalid sock addr",
registrar, server_port
));
let mut buf: [u8; BUFFER_SIZE] = [0; BUFFER_SIZE];
// query here
let mut data_lenght: usize = net::query_request(&mut buf, &server_SocketAddr, socket)?;
let public_sock_addr_raw: String =
match net::query_request(&mut buf, &server_SocketAddr, &socket) {
Ok(s) => s,
Err(e) => return Err(ServerErrorResponses::into_io_error(e)),
};
println!(
"{} my bublic sockaddr: {}",
"[LOG]".blue(),
public_sock_addr_raw
);
let mut public_sock_addr: Vec<u8> = buf[1..data_lenght].to_vec();
let mut salt: [u8; BLOCK_SIZE] = [0u8; BLOCK_SIZE];
let mut iv: [u8; BLOCK_SIZE] = [0u8; BLOCK_SIZE];
let (mut public_sock_addr, encryption_key) = match cli.password {
Some(ref p) => {
let mut rng = rand::rng();
rng.fill_bytes(&mut salt);
rng.fill_bytes(&mut iv);
let enc_key_tmp = shared::crypto::derive_key_from_password(p.as_bytes(), &salt);
#[cfg(debug_assertions)]
eprintln!(
"key: {}",
enc_key_tmp
.iter()
.map(|x| format!("{:02X} ", x))
.collect::<String>()
);
(
shared::crypto::encrypt(&enc_key_tmp, &iv, public_sock_addr_raw.as_bytes())
.unwrap()
.into_boxed_slice(),
enc_key_tmp,
)
}
None => (
public_sock_addr_raw.as_bytes().to_vec().into_boxed_slice(),
[0u8; 32],
),
};
// register network
let virtual_network: Arc<RwLock<Network>> = RwLock::new({
match net::get_request(
&mut buf,
&server_SocketAddr,
&socket,
&network_id,
&cli.password,
) {
Ok(n) => {
eprintln!("Network exists joining it");
public_sock_addr =
shared::crypto::encrypt(&n.key, &iv, public_sock_addr_raw.as_bytes())
.unwrap()
.into_boxed_slice();
let _ = net::send_heartbeat(
&mut buf,
&server_SocketAddr,
socket.clone(),
&n,
&public_sock_addr,
&iv,
);
n
}
Err(e) if e.kind() == ServerResponse::ID_DOESNT_EXIST => {
eprintln!("Network does not exist creating it!");
let tmp_v_net: Network = Network::new(
match cli.password {
Some(_) => true,
None => false,
},
encryption_key,
network_id,
salt,
Vec::with_capacity(1),
);
net::register_request(
&mut buf,
&server_SocketAddr,
&socket,
&tmp_v_net,
&public_sock_addr,
&iv,
)
.unwrap();
let mut salt: Option<[u8; SALT_AND_IV_SIZE as usize]>;
let _ = net::send_heartbeat(
// send heart beat to start periodic heart beat
&mut buf,
&server_SocketAddr,
socket.clone(),
&tmp_v_net,
&public_sock_addr,
&iv,
);
tmp_v_net
}
Err(e) => {
eprintln!("Failed to get data from server. Reason: {}", e);
exit(5); //EIO
}
}
})
.into();
(
socket,
virtual_network,
types::EncryptablePulicSockAddr::new(iv, public_sock_addr),
)
};
{
// all loops here will be auto skiped if there are no peers yet
let mut ips_used: [bool; u8::MAX as usize + 1] = [false; u8::MAX as usize + 1];
ips_used[0] = true; // ignore net addr
ips_used[u8::MAX as usize] = true; // ignore broadcast
println!(
"{} reaching to other peers to obtain ip address",
"[LOG]".blue()
);
let mut network_write_lock = virtual_network.write().unwrap(); // avoid deadlock
let encrypted = network_write_lock.encrypted;
let key = network_write_lock.key;
network_write_lock.peers.iter_mut().for_each(|peer| {
println!(
"{} firing salvo of PUNCHING packets to {}",
"[LOG]".blue(),
peer.sock_addr
);
for _ in 0..MAPPING_SHOT_COUNT {
match socket.send_to(&[P2PMethods::DO_NOTHING as u8], peer.sock_addr) {
Ok(s) => {
#[cfg(debug_assertions)]
eprintln!("send {} bytes", s);
}
Err(e) => eprintln!("{} failed to send puching packet: {}", "[ERROR]".red(), e),
}
}
println!(
"{} packets away!, awiting a bit for NAT mappings to estabilish",
"[LOG]".blue()
);
std::thread::sleep(Duration::from_millis(2000));
for _ in 0..STANDARD_RETRY_MAX {
match net::P2P_query(&mut buf, &peer.sock_addr, &socket, encrypted, key) {
Ok(ip) => {
ips_used[ip.octets()[3] as usize] = true;
peer.private_ip = ip;
break;
}
Err(e) => {
eprintln!(
"{} while getting ip from peer: {}, Error: {}",
"[ERROR]".red(),
peer.sock_addr,
e
);
std::thread::sleep(Duration::from_millis(2000));
}
}
}
});
network_write_lock.private_ip = std::net::Ipv4Addr::new(
DEFAULT_NETWORK_PREFIX[0],
DEFAULT_NETWORK_PREFIX[1],
DEFAULT_NETWORK_PREFIX[2],
ips_used.par_iter().position_first(|&b| !b).unwrap() as u8,
); // find first element that is false
network_write_lock
.peers
.retain(|peer| peer.private_ip != std::net::Ipv4Addr::UNSPECIFIED); // remove all peers without ip
network_write_lock.peers.iter().for_each(|peer| {
match net::P2P_hello(
&mut buf,
&peer.sock_addr,
&socket,
network_write_lock.private_ip,
encrypted,
key,
) {
Ok(_) => eprintln!(
"{} registered with peer: {}",
"[SUCCESS]".green(),
peer.sock_addr
),
Err(e) => eprintln!(
"{} failed to register with peer: {}, Error: {}",
"[ERROR]".red(),
peer.sock_addr,
e
),
}
});
}
let tun_iface = Arc::new(
match tun::create_tun_interface(virtual_network.read().unwrap().private_ip, cli.if_name) {
Ok(t) => t,
Err(e) => {
eprintln!(
"{} failed to create Tun interface, Error: {}, are you running as root?",
"[CRITICAL]".red().bold(),
e
);
return Err(e);
}
},
);
// timeout is no longer needed
#[cfg(not(feature = "no-timeout"))]
socket.set_read_timeout(None)?;
{
let tun_iface_clone = tun_iface.clone();
let socket_clone = socket.clone();
let virtual_network_clone = virtual_network.clone();
std::thread::spawn(move || {
tun::read_tun_iface(tun_iface_clone, socket_clone, virtual_network_clone)
});
} // just let me have my thread
smol::block_on(async {
println!("{} listener started!", "[LOG]".blue());
loop {
buf.fill(0);
match socket.recv_from(&mut buf) {
Ok((data_lenght, src)) => {
#[cfg(debug_assertions)]
eprintln!("recived method 0x{:02x} spawning handler", buf[0]);
smol::spawn(net::handle_incoming_connection(
buf,
src,
virtual_network.clone(),
tun_iface.clone(),
socket.clone(),
data_lenght,
))
.await;
}
Err(e) => {
eprintln!(
"{} failed to read from socket Error: {}\n{}",
"[WARNING]".red(),
e,
"Retrying".bright_yellow()
);
}
}
}
});
Ok(())
}
+739 -120
View File
@@ -1,144 +1,763 @@
use std::{
io::ErrorKind,
net::{SocketAddr, UdpSocket},
net::{Ipv4Addr, SocketAddr, UdpSocket},
str::FromStr,
sync::{Arc, RwLock},
};
use pea_2_pea::*;
use rand::RngCore;
// return data_lenght and number of retryes
pub fn send_and_recv_with_retry(
buf: &mut [u8; BUFFER_SIZE],
dst: &SocketAddr,
socket: UdpSocket,
retry_max: usize,
) -> Result<(usize, usize), ServerErrorResponses> {
let mut send_buf = *buf;
let mut retry_count: usize = 0;
loop {
match socket.send_to(&mut send_buf, dst) {
Ok(s) => {
#[cfg(debug_assertions)]
eprintln!("send {} bytes", s);
}
Err(e) => {
panic!("Error sending data: {}", e);
}
}
match socket.recv_from(buf) {
Ok((data_lenght, src)) => {
if src != *dst {
continue;
}
match buf[0] {
x if x == send_buf[0] as u8 => {
return Ok((data_lenght, retry_count));
}
x if x == ServerResponse::GENERAL_ERROR as u8 => {
return Err(ServerErrorResponses::IO(std::io::Error::new(
std::io::ErrorKind::InvalidData,
match std::str::from_utf8(&buf[1..data_lenght]) {
// the firts byte is compensated for sice this is len not index
Ok(s) => s.to_string(),
Err(e) => format!("invalid error string: {}", e).to_string(),
},
)));
}
x if x == ServerResponse::ID_DOESNT_EXIST as u8 => {
return Err(ServerErrorResponses::ID_DOESNT_EXIST);
}
x if x == ServerResponse::ID_EXISTS as u8 => {
return Err(ServerErrorResponses::ID_EXISTS);
}
_ => {
continue;
}
}
}
Err(e) if e.kind() == ErrorKind::WouldBlock || e.kind() == ErrorKind::TimedOut => {
// timedout
if retry_count >= retry_max {
return Err(ServerErrorResponses::IO(std::io::Error::new(
ErrorKind::TimedOut,
"max retry count reached without responce",
)));
}
retry_count += 1;
continue;
}
Err(e) => {
return Err(ServerErrorResponses::IO(e));
}
}
}
}
use super::types;
use colored::Colorize;
use pea_2_pea::{shared::net::send_and_recv_with_retry, *};
use rand::{RngCore, rng};
use sha2::Digest;
pub fn query_request(
buf: &mut [u8; BUFFER_SIZE],
buf: &mut [u8; UDP_BUFFER_SIZE],
dst: &SocketAddr,
socket: UdpSocket,
) -> Result<usize, ServerErrorResponses> {
match send_and_recv_with_retry(buf, dst, socket, STANDARD_RETRY_MAX) {
Ok((data_lenght, _)) => return Ok(data_lenght),
socket: &UdpSocket,
) -> Result<String, ServerErrorResponses> {
#[cfg(debug_assertions)]
println!("QUERY method");
match send_and_recv_with_retry(
buf,
&[ServerMethods::QUERY as u8],
dst,
socket,
STANDARD_RETRY_MAX,
) {
Ok((data_lenght, _)) => {
return Ok(match std::str::from_utf8(&buf[1..data_lenght]) {
Ok(s) => s.to_string(),
Err(e) => {
eprint!("id to utf-8 failed: {}", e);
return Err(ServerErrorResponses::GENERAL_ERROR(format!("{}", e)));
}
});
}
Err(e) => return Err(e),
}
}
pub fn register_request(
buf: &mut [u8; BUFFER_SIZE],
buf: &mut [u8; UDP_BUFFER_SIZE],
dst: &SocketAddr,
socket: UdpSocket,
encryption_key: Option<[u8; 32]>,
salt_opt: Option<[u8; SALT_AND_IV_SIZE as usize]>,
mut public_sock_addr: Vec<u8>,
network_id: String,
socket: &UdpSocket,
network: &types::Network,
public_sock_addr: &Box<[u8]>,
iv: &[u8; BLOCK_SIZE as usize],
) -> Result<usize, ServerErrorResponses> {
buf[0] = ServerMethods::REGISTER as u8; // set metod identification byte
buf[RegisterRequestDataPositions::ENCRYPTED as usize] = match encryption_key {
// stor encryption flag byte
Some(_) => true as u8,
None => false as u8,
};
buf[RegisterRequestDataPositions::ID_LEN as usize] = network_id.len() as u8;
#[cfg(debug_assertions)]
println!("REGISTER method");
let mut send_buf: Box<[u8]> = vec![
0u8;
RegisterRequestDataPositions::DATA as usize
+ network.net_id.len()
+ public_sock_addr.len()
]
.into_boxed_slice();
buf[RegisterRequestDataPositions::DATA as usize
..RegisterRequestDataPositions::DATA as usize + network_id.len()]
.copy_from_slice(network_id.as_bytes()); // store network id
#[cfg(debug_assertions)]
eprintln!(
"registering network:\niv: {}\nSockAddr: {}\nsalt: {}",
iv.iter().map(|x| format!("{:02X} ", x)).collect::<String>(),
public_sock_addr
.iter()
.map(|x| format!("{:02X} ", x))
.collect::<String>(),
network
.salt
.iter()
.map(|x| format!("{:02X} ", x))
.collect::<String>(),
);
let mut iv: [u8; SALT_AND_IV_SIZE as usize] = [0; SALT_AND_IV_SIZE as usize];
let salt: [u8; SALT_AND_IV_SIZE as usize];
match salt_opt {
Some(s) => salt = s,
None => salt = [0; SALT_AND_IV_SIZE as usize],
}
match encryption_key {
Some(encryption_key) => {
let mut rng = rand::rng();
rng.fill_bytes(&mut iv);
public_sock_addr =
shared::crypto::encrypt(&encryption_key, &iv, public_sock_addr.as_slice()).unwrap();
}
None => {
iv = [0; SALT_AND_IV_SIZE as usize];
}
};
send_buf[0] = ServerMethods::REGISTER as u8; // set metod identification byte
send_buf[RegisterRequestDataPositions::ENCRYPTED as usize] = network.encrypted as u8;
buf[RegisterRequestDataPositions::IV as usize
..RegisterRequestDataPositions::IV as usize + SALT_AND_IV_SIZE as usize]
.copy_from_slice(&iv); // copy iv ad salt do the request
buf[RegisterRequestDataPositions::SALT as usize
..RegisterRequestDataPositions::SALT as usize + SALT_AND_IV_SIZE as usize]
.copy_from_slice(&salt);
send_buf[RegisterRequestDataPositions::ID_LEN as usize] = network.net_id.len() as u8;
buf[RegisterRequestDataPositions::SOCKADDR_LEN as usize] = public_sock_addr.len() as u8;
send_buf[RegisterRequestDataPositions::DATA as usize
..RegisterRequestDataPositions::DATA as usize + network.net_id.len()]
.copy_from_slice(network.net_id.as_bytes()); // store network id
buf[RegisterRequestDataPositions::DATA as usize + network_id.len()
..RegisterRequestDataPositions::DATA as usize + network_id.len() + public_sock_addr.len()]
send_buf[RegisterRequestDataPositions::IV as usize
..RegisterRequestDataPositions::IV as usize + BLOCK_SIZE as usize]
.copy_from_slice(iv); // copy iv ad salt do the request
send_buf[RegisterRequestDataPositions::SALT as usize
..RegisterRequestDataPositions::SALT as usize + BLOCK_SIZE as usize]
.copy_from_slice(&network.salt);
send_buf[RegisterRequestDataPositions::SOCKADDR_LEN as usize] = public_sock_addr.len() as u8;
send_buf[RegisterRequestDataPositions::DATA as usize + network.net_id.len()
..RegisterRequestDataPositions::DATA as usize
+ network.net_id.len()
+ public_sock_addr.len()]
.copy_from_slice(&public_sock_addr);
match send_and_recv_with_retry(buf, dst, socket, STANDARD_RETRY_MAX) {
match send_and_recv_with_retry(buf, &send_buf, dst, socket, STANDARD_RETRY_MAX) {
Ok((data_lenght, _)) => return Ok(data_lenght),
Err(e) => return Err(e),
}
}
pub fn get_request(
buf: &mut [u8; UDP_BUFFER_SIZE],
dst: &SocketAddr,
socket: &UdpSocket,
network_id: &String,
password: &Option<String>,
) -> Result<types::Network, ServerErrorResponses> {
#[cfg(debug_assertions)]
println!("GET method");
let mut send_buf: Box<[u8]> =
vec![0u8; GetRequestDataPositions::ID as usize + network_id.len()].into_boxed_slice();
send_buf[0] = ServerMethods::GET as u8;
send_buf[GetRequestDataPositions::ID as usize
..GetRequestDataPositions::ID as usize + network_id.len()]
.copy_from_slice(network_id.as_bytes());
// this is unused now it will be used to bounds check in the future
let data_lenght: usize =
match send_and_recv_with_retry(buf, &send_buf, dst, socket, STANDARD_RETRY_MAX) {
Ok((data_lenght, _)) => data_lenght,
Err(e) => return Err(e),
};
let encrypted: bool = if buf[GetResponseDataPositions::ENCRYPTED as usize] != 0 {
match password {
Some(_) => true,
None => panic!("Network is encrypted but no password was provided"),
}
} else {
match password {
Some(_) => {
eprintln!(
"Warning! Network is not encrypted but password was provided, ignoring password!"
)
}
None => {}
}
false
};
let mut num_of_clients: u8 = buf[GetResponseDataPositions::NUM_OF_CLIENTS as usize];
let salt: [u8; BLOCK_SIZE as usize] = buf[GetResponseDataPositions::SALT as usize
..GetResponseDataPositions::SALT as usize + BLOCK_SIZE as usize]
.try_into()
.unwrap();
let mut offset: usize = 0;
let mut peers: Vec<types::Peer> = Vec::with_capacity(1); // at least one client
let key: [u8; 32] = match password {
Some(p) => shared::crypto::derive_key_from_password(p.as_bytes(), &salt),
None => [0; 32],
};
#[cfg(debug_assertions)]
eprintln!(
"key: {}",
key.iter()
.map(|x| format!("{:02X} ", x))
.collect::<String>()
);
while num_of_clients != 0 {
let sock_addr_len: u8 = buf[GetResponseDataPositions::CLIENTS as usize + offset];
let mut iv: [u8; BLOCK_SIZE as usize] = [0; BLOCK_SIZE as usize];
let sock_addr_raw: Box<[u8]> =
buf[GetResponseDataPositions::CLIENTS as usize + 1 + offset + BLOCK_SIZE as usize
..GetResponseDataPositions::CLIENTS as usize
+ 1
+ offset
+ BLOCK_SIZE as usize
+ sock_addr_len as usize]
.to_vec()
.into_boxed_slice();
loop {
// loop used to easily skip peer
let peer: SocketAddr = if encrypted {
iv.copy_from_slice(
&buf[GetResponseDataPositions::CLIENTS as usize + 1 + offset
..GetResponseDataPositions::CLIENTS as usize
+ 1
+ offset
+ BLOCK_SIZE as usize],
);
#[cfg(debug_assertions)]
eprintln!(
"IV: {}\nSockAddr: {}",
iv.iter().map(|x| format!("{:02X} ", x)).collect::<String>(),
sock_addr_raw
.iter()
.map(|x| format!("{:02X} ", x))
.collect::<String>(),
);
match SocketAddr::from_str(&{
// sacrificed a goat to borrow checker to make this work
let decrypted = match shared::crypto::decrypt(&key, &iv, &sock_addr_raw) {
Ok(v) => v,
Err(e) => {
eprintln!("Warning peer ignored due to invalid data\nError: {}", e);
break;
}
};
match std::str::from_utf8(decrypted.as_slice()) {
Ok(s) => s.to_string(),
Err(e) => {
eprint!("id to utf-8 failed: {}", e);
eprintln!("Warning peer ignored due to invalid data");
break;
}
}
}) {
Ok(s) => s,
Err(e) => {
eprintln!("Warning peer ignored due to invalid data\nError: {}", e);
break;
}
}
} else {
match SocketAddr::from_str(&match std::str::from_utf8(&sock_addr_raw) {
Ok(s) => s.to_string(),
Err(e) => {
eprint!("id to utf-8 failed: {}", e);
eprintln!("Warning peer ignored due to invalid data");
break;
}
}) {
Ok(s) => s,
Err(e) => {
eprintln!("Warning peer ignored due to invalid data\nError: {}", e);
break;
}
}
};
peers.push(types::Peer::new(peer, None));
break;
}
offset += BLOCK_SIZE as usize + sock_addr_len as usize + 1 /*for size byte */;
num_of_clients -= 1;
}
return Ok(types::Network::new(
encrypted,
key,
network_id.to_string(),
salt,
peers,
));
}
pub fn send_heartbeat(
buf: &mut [u8; UDP_BUFFER_SIZE],
dst: &SocketAddr,
socket: Arc<std::net::UdpSocket>,
network: &types::Network,
my_public_sock_addr: &Box<[u8]>,
iv: &[u8; BLOCK_SIZE as usize],
) -> Result<usize, ServerErrorResponses> {
#[cfg(debug_assertions)]
println!("HEARTBEAT method");
let mut send_buf: Box<[u8]> = vec![
0u8;
HeartBeatRequestDataPositions::IV as usize
+ BLOCK_SIZE as usize
+ my_public_sock_addr.len()
+ network.net_id.len()
]
.into_boxed_slice();
send_buf[0] = ServerMethods::HEARTBEAT as u8;
send_buf[HeartBeatRequestDataPositions::ID_LEN as usize] = network.net_id.len() as u8;
send_buf[HeartBeatRequestDataPositions::SOCKADDR_LEN as usize] =
my_public_sock_addr.len() as u8;
send_buf[HeartBeatRequestDataPositions::IV as usize
..HeartBeatRequestDataPositions::IV as usize + BLOCK_SIZE as usize]
.copy_from_slice(iv);
send_buf[HeartBeatRequestDataPositions::DATA as usize
..HeartBeatRequestDataPositions::DATA as usize + network.net_id.len()]
.copy_from_slice(network.net_id.as_bytes());
send_buf[HeartBeatRequestDataPositions::DATA as usize + network.net_id.len()
..HeartBeatRequestDataPositions::DATA as usize
+ network.net_id.len()
+ my_public_sock_addr.len()]
.copy_from_slice(&my_public_sock_addr);
#[cfg(debug_assertions)]
eprintln!(
"IV: {}\nSockAddr: {}",
iv.iter().map(|x| format!("{:02X} ", x)).collect::<String>(),
my_public_sock_addr
.iter()
.map(|x| format!("{:02X} ", x))
.collect::<String>(),
);
{
let sock_clone = socket.clone();
let send_buf_clone: Box<[u8]> = send_buf.clone();
let dst_clone: SocketAddr = dst.clone();
std::thread::spawn(move || {
periodic_heart_beat(sock_clone, send_buf_clone, dst_clone);
});
}
match send_and_recv_with_retry(buf, &send_buf, dst, &socket, STANDARD_RETRY_MAX) {
Ok((data_lenght, _)) => return Ok(data_lenght),
Err(e) => return Err(e),
}
}
#[allow(non_snake_case)]
pub fn P2P_query(
buf: &mut [u8; UDP_BUFFER_SIZE],
dst: &SocketAddr,
socket: &UdpSocket,
encrypted: bool, // avoid deadlock
key: [u8; 32],
) -> Result<std::net::Ipv4Addr, Box<dyn std::error::Error>> {
#[cfg(debug_assertions)]
println!("P2P QUERY method");
let (data_lenght, _) = send_and_recv_with_retry(
buf,
&[P2PMethods::PEER_QUERY as u8],
dst,
socket,
STANDARD_RETRY_MAX,
)?;
let iv: [u8; BLOCK_SIZE] = buf
[P2PStandardDataPositions::IV as usize..P2PStandardDataPositions::IV as usize + BLOCK_SIZE]
.try_into()
.expect("this should never happen");
let tmp_decrypted: Vec<u8>;
return Ok(std::net::Ipv4Addr::from_str(if encrypted {
match shared::crypto::decrypt(
&key,
&iv,
&buf[P2PStandardDataPositions::DATA as usize..data_lenght - 1],
) {
Ok(decrypted) => {
tmp_decrypted = decrypted;
match std::str::from_utf8(&tmp_decrypted) {
Ok(s) => s,
Err(e) => return Err(Box::new(e)),
}
}
Err(e) => {
return Err(Box::new(ServerErrorResponses::GENERAL_ERROR(format!(
"{}",
e
))));
}
}
} else {
match std::str::from_utf8(&buf[P2PStandardDataPositions::DATA as usize..data_lenght - 1]) {
Ok(s) => s,
Err(e) => return Err(Box::new(e)),
}
})?);
}
#[allow(non_snake_case)]
pub fn P2P_hello(
buf: &mut [u8; UDP_BUFFER_SIZE],
dst: &SocketAddr,
socket: &UdpSocket,
private_ip: Ipv4Addr,
encrypted: bool, // avoid deadlock
key: [u8; 32],
) -> Result<usize, ServerErrorResponses> {
let private_ip_str = private_ip.to_string();
let (private_ip_final, iv) = if encrypted {
let mut rng = rng();
let mut iv: [u8; BLOCK_SIZE] = [0u8; BLOCK_SIZE];
rng.fill_bytes(&mut iv);
(
shared::crypto::encrypt(&key, &iv, &private_ip_str.as_bytes())
.unwrap()
.into_boxed_slice(),
iv,
)
} else {
(
private_ip_str.as_bytes().to_vec().into_boxed_slice(),
[0u8; BLOCK_SIZE],
)
};
let mut send_buf: Box<[u8]> =
vec![0u8; P2PStandardDataPositions::DATA as usize + private_ip_final.len()].into();
#[cfg(debug_assertions)]
eprintln!(
"registering network:\niv: {}\nIP: {}",
iv.iter().map(|x| format!("{:02X} ", x)).collect::<String>(),
private_ip_final
.iter()
.map(|x| format!("{:02X} ", x))
.collect::<String>(),
);
send_buf[0] = P2PMethods::PEER_HELLO as u8;
send_buf
[P2PStandardDataPositions::IV as usize..P2PStandardDataPositions::IV as usize + BLOCK_SIZE]
.copy_from_slice(&iv);
send_buf[P2PStandardDataPositions::DATA as usize..].copy_from_slice(&private_ip_final);
match send_and_recv_with_retry(buf, &send_buf, dst, socket, STANDARD_RETRY_MAX) {
Ok((data_lenght, _)) => return Ok(data_lenght),
Err(e) => return Err(e),
}
}
pub async fn handle_incoming_connection(
buf: [u8; UDP_BUFFER_SIZE],
src: SocketAddr,
network: Arc<RwLock<types::Network>>,
tun_iface: Arc<tappers::Tun>,
socket: Arc<std::net::UdpSocket>,
data_lenght: usize,
) {
#[cfg(debug_assertions)]
eprintln!("recived method 0x{:02x}", buf[0]);
match buf[0] {
x if x == P2PMethods::PACKET as u8 => {
#[cfg(debug_assertions)]
println!("PACKET from different peer receved");
if network.read().unwrap().encrypted {
match shared::crypto::decrypt(
&network.read().unwrap().key,
&buf[P2PStandardDataPositions::IV as usize
..P2PStandardDataPositions::IV as usize + BLOCK_SIZE],
&buf[P2PStandardDataPositions::DATA as usize..data_lenght as usize],
) {
Ok(data) => {
#[cfg(debug_assertions)]
eprintln!(
"packet contets: {}\nhash: {:x}",
data.iter()
.map(|x| format!("{:02X} ", x))
.collect::<String>(),
{
let mut hasher = sha2::Sha256::new();
hasher.update(&data);
hasher.finalize()
}
);
match tun_iface.send(&data) {
Ok(_) => {}
Err(e) => eprintln!(
"{} failed to write packet to tun interface, Error: {}",
"[WARNING]".yellow(),
e
),
}
}
Err(e) => eprintln!(
"{} failed to decrypt packet, Error: {}",
"[WARNING]".yellow(),
e
),
}
} else {
match tun_iface
.send(&buf[P2PStandardDataPositions::DATA as usize..data_lenght as usize])
{
Ok(_) => {}
Err(e) => eprintln!(
"{} failed to write packet to tun interface, Error: {}",
"[WARNING]".yellow(),
e
),
};
}
}
x if x == P2PMethods::PEER_QUERY as u8 => {
let encrypted = network.read().unwrap().encrypted;
let private_ip = network.read().unwrap().private_ip;
let private_ip_str = private_ip.to_string();
let mut send_buf: Box<[u8]> = if encrypted {
vec![
0;
P2PStandardDataPositions::DATA as usize
+ 1
+ (private_ip_str.len()
+ (BLOCK_SIZE - (private_ip_str.len() % BLOCK_SIZE)))
]
.into() // calculate lenght of data with block alligment
} else {
vec![0; P2PStandardDataPositions::DATA as usize + 1 + private_ip_str.len()].into()
};
send_buf[0] = P2PMethods::PEER_QUERY as u8;
let mut iv = [0u8; BLOCK_SIZE];
if encrypted {
let mut rng = rng();
rng.fill_bytes(&mut iv);
send_buf[P2PStandardDataPositions::IV as usize
..P2PStandardDataPositions::IV as usize + BLOCK_SIZE]
.copy_from_slice(&iv);
send_buf[P2PStandardDataPositions::DATA as usize
..P2PStandardDataPositions::DATA as usize
+ (private_ip_str.len()
+ (BLOCK_SIZE - (private_ip_str.len() % BLOCK_SIZE)))]
.copy_from_slice(
shared::crypto::encrypt(
&network.read().unwrap().key,
&iv,
private_ip_str.as_bytes(),
)
.unwrap()
.as_slice(),
);
} else {
send_buf[P2PStandardDataPositions::DATA as usize
..P2PStandardDataPositions::DATA as usize + private_ip_str.len()]
.copy_from_slice(private_ip_str.as_bytes());
}
match socket.send_to(&send_buf, &src) {
Ok(s) => {
#[cfg(debug_assertions)]
eprintln!("send {} bytes", s);
}
Err(e) => {
eprintln!("Error sending data: {}", e);
}
}
}
x if x == P2PMethods::PEER_HELLO as u8 => {
println!("{} peer hello receved from: {}", "[LOG]".blue(), src);
if data_lenght - 1 < P2PStandardDataPositions::DATA as usize {
eprintln!("{} peer hello packet too small", "[ERROR]".red());
return;
}
let tmp_data: Vec<u8>;
{
let mut network_write_lock = network.write().unwrap();
let key: [u8; 32] = network_write_lock.key;
let encrypted: bool = network_write_lock.encrypted;
#[cfg(debug_assertions)]
eprintln!(
"registering network:\niv: {}\nIP: {}",
&buf[P2PStandardDataPositions::IV as usize
..P2PStandardDataPositions::IV as usize + BLOCK_SIZE].iter().map(|x| format!("{:02X} ", x)).collect::<String>(),
&buf[P2PStandardDataPositions::DATA as usize..data_lenght as usize-1 /*compensate for size and index diference*/]
.iter()
.map(|x| format!("{:02X} ", x))
.collect::<String>(),
);
network_write_lock.peers.push(types::Peer::new(
src,
Some(
match std::net::Ipv4Addr::from_str(
match std::str::from_utf8(if encrypted {
match shared::crypto::decrypt(&key, &buf[P2PStandardDataPositions::IV as usize
..P2PStandardDataPositions::IV as usize + BLOCK_SIZE], &buf[P2PStandardDataPositions::DATA as usize..data_lenght as usize]) {
Ok(data) => {tmp_data = data; &tmp_data},
Err(e) => {
eprintln!(
"{} failed to decrypt ip from peer, ignoring it Error: {}",
"[WARNING]".yellow(),
e
);
return;
},
}
} else {
&buf[P2PStandardDataPositions::DATA as usize..data_lenght as usize]
}) {
Ok(s) => s,
Err(e) => {
eprintln!(
"{} failed to parse ip from peer, ignoring it Error: {}",
"[WARNING]".yellow(),
e
);
return;
}
},
) {
Ok(ip) => ip,
Err(e) => {
eprintln!(
"{} failed to parse ip from peer, ignoring it Error: {}",
"[WARNING]".yellow(),
e
);
return;
}
},
),
));
}
match socket.send_to(&[P2PMethods::PEER_HELLO as u8], &src) {
Ok(s) => {
#[cfg(debug_assertions)]
eprintln!("send {} bytes", s);
}
Err(e) => {
eprintln!("Error sending data: {}", e);
}
}
}
x if x == P2PMethods::PEER_GOODBYE as u8 => {
println!("{} peer goodbye receved from: {}", "[LOG]".blue(), src);
let mut network_lock = network.write().unwrap();
let key = network_lock.key;
let encrypted: bool = network_lock.encrypted;
let mut data_tmp: Vec<u8> = Vec::with_capacity(BLOCK_SIZE); // block size
network_lock.peers.retain(|peer| !{peer.private_ip == match std::net::Ipv4Addr::from_str(match std::str::from_utf8( if encrypted {
match shared::crypto::decrypt(&key, &buf[P2PStandardDataPositions::IV as usize..P2PStandardDataPositions::IV as usize + BLOCK_SIZE], &buf[P2PStandardDataPositions::DATA as usize..data_lenght as usize-1 /*compensate for size and index diference*/]) {
Ok(data) => {data_tmp = data;
&data_tmp},
Err(e) => {eprintln!("{} error parsing ip, Error: {}", "[ERROR]".red(), e); return false;},
}
} else {&buf[P2PStandardDataPositions::DATA as usize..data_lenght as usize-1 /*compensate for size and index diference*/]}) {
Ok(s) => s,
Err(e) => {eprintln!("{} error parsing ip, Error: {}", "[ERROR]".red(), e); return false;},
}) {
Ok(ip) => ip,
Err(e) => {eprintln!("{} error parsing ip, Error: {}", "[ERROR]".red(), e); return false;},
} && peer.sock_addr == src});
match socket.send_to(&[P2PMethods::PEER_GOODBYE as u8], &src) {
Ok(s) => {
#[cfg(debug_assertions)]
eprintln!("send {} bytes", s);
}
Err(e) => {
eprintln!("Error sending data: {}", e);
}
}
}
x if x == P2PMethods::NEW_CLIENT_NOTIFY as u8 => {
println!(
"{} Notified about new client, creating NAT mapping",
"[LOG]".blue()
);
#[cfg(debug_assertions)]
eprintln!(
"registering network:\niv: {}\nsockaddr: {}",
&buf[P2PStandardDataPositions::IV as usize
..P2PStandardDataPositions::IV as usize + BLOCK_SIZE].iter().map(|x| format!("{:02X} ", x)).collect::<String>(),
&buf[P2PStandardDataPositions::DATA as usize..data_lenght as usize-1 /*compensate for size and index diference*/]
.iter()
.map(|x| format!("{:02X} ", x))
.collect::<String>(),
);
let data_tmp: Box<[u8]>;
let peer_addr: std::net::SocketAddr = match std::net::SocketAddr::from_str(
match std::str::from_utf8(if network.read().unwrap().encrypted {
match shared::crypto::decrypt(
&network.read().unwrap().key,
&buf[P2PStandardDataPositions::IV as usize
..P2PStandardDataPositions::IV as usize + BLOCK_SIZE],
&buf[P2PStandardDataPositions::DATA as usize..data_lenght as usize /*compensate for size and index diference*/],
) {
Ok(v) => {
data_tmp = v.into_boxed_slice();
&data_tmp
}
Err(e) => {
eprintln!(
"{} failed to decrypt sock addr of new client connection not posible Error: {}",
"[ERROR]".red(),
e
);
return;
}
}
} else {
&buf[P2PStandardDataPositions::DATA as usize..]
}) {
Ok(s) => s,
Err(e) => {
eprintln!(
"{} failed to decode sock addr of new client connection not posible Error: {}",
"[ERROR]".red(),
e
);
return;
}
},
) {
Ok(sa) => sa,
Err(e) => {
eprintln!(
"{} failed to parse sock addr of new client connection not posible Error: {}",
"[ERROR]".red(),
e
);
return;
}
};
for _ in 0..MAPPING_SHOT_COUNT {
match socket.send_to(&[P2PMethods::DO_NOTHING as u8], peer_addr) {
Ok(s) => {
#[cfg(debug_assertions)]
eprintln!("send {} bytes", s);
}
Err(e) => eprintln!("{} failed to send puching packet: {}", "[ERROR]".red(), e),
}
}
}
x if x == P2PMethods::DO_NOTHING as u8 => {
println!(
"{} punching succesful DO_NOTHING receved",
"[SUCCESS]".green()
);
}
x if x == ServerMethods::HEARTBEAT as u8 => {
println!("{} heart beat recive confirmed", "[OK]".green());
}
_ => {
eprintln!(
"{} unknown method ID: 0x{:02x}, Droping!",
"[WARNING]".bright_yellow(),
buf[0]
)
}
}
}
pub fn periodic_heart_beat(socket: Arc<UdpSocket>, send_buf: Box<[u8]>, dst: SocketAddr) {
println!("{} periodic heartbeat started", "[LOG]".blue());
loop {
std::thread::sleep(std::time::Duration::from_secs(30));
println!("{} sending heartbeat to server", "[LOG]".blue());
match socket.send_to(&send_buf, dst) {
Ok(size) => {
#[cfg(debug_assertions)]
println!("send {} bytes", size);
}
Err(e) => {
eprintln!(
"{} failed to send heartbeat to server Error: {}",
"[ERROR]".red(),
e
);
}
}
}
}
+149
View File
@@ -0,0 +1,149 @@
use pea_2_pea::*;
use rand::RngCore;
use rayon::prelude::*;
use sha2::Digest;
use std::sync::{Arc, RwLock};
use tappers::Interface;
use crate::types::Network;
pub fn create_tun_interface(
private_ip: std::net::Ipv4Addr,
if_name: Option<String>,
) -> Result<tappers::Tun, std::io::Error> {
#[cfg(not(target_os = "windows"))]
let mut tun_iface: tappers::Tun = tappers::Tun::new_named(Interface::new(
&if_name.unwrap_or(DEFAULT_INTERFACE_NAME.to_owned()),
)?)?;
#[cfg(target_os = "windows")]
let mut tun_iface: tappers::Tun = tappers::Tun::new()?;
#[cfg(not(target_os = "windows"))]
{
let mut addr_req = tappers::AddAddressV4::new(private_ip);
addr_req.set_netmask(24);
let mut broadcast_addr_oct = private_ip.octets();
broadcast_addr_oct[3] = 255;
addr_req.set_broadcast(std::net::Ipv4Addr::from(broadcast_addr_oct));
tun_iface.add_addr(addr_req)?;
}
#[cfg(target_os = "windows")]
std::process::Command::new("netsh").args([
"interface",
"ipv4",
"set",
"address",
&format!(
"name=\"{}\"",
tun_iface.name()?.name().into_string().unwrap()
),
"static",
&private_ip.to_string(),
"255.255.255.0",
]);
tun_iface.set_up()?;
return Ok(tun_iface);
}
pub fn read_tun_iface(
tun_iface: Arc<tappers::Tun>,
socket: Arc<std::net::UdpSocket>,
network: Arc<RwLock<Network>>,
) {
let mut buf: [u8; IP_BUFFER_SIZE] = [0u8; IP_BUFFER_SIZE];
smol::block_on(async {
#[cfg(debug_assertions)]
eprintln!("Started listening for ip packets");
loop {
let data_lenght = tun_iface.recv(&mut buf).unwrap(); // build in auto termination, isn't it great
smol::spawn(handle_ip_packet(
buf[..data_lenght].to_vec().into(),
network.clone(),
socket.clone(),
))
.detach();
}
});
}
pub async fn handle_ip_packet(
packet_data: Box<[u8]>,
network: Arc<RwLock<Network>>,
socket: Arc<std::net::UdpSocket>,
) {
#[cfg(debug_assertions)]
eprintln!("Processing IP packet");
#[cfg(debug_assertions)]
eprintln!(
"packet contets: {}\nhash: {:x}",
packet_data
.iter()
.map(|x| format!("{:02X} ", x))
.collect::<String>(),
{
let mut hasher = sha2::Sha256::new();
hasher.update(&packet_data);
hasher.finalize()
}
);
let dst_ip = std::net::Ipv4Addr::from(
match <[u8; 4]>::try_from(
&packet_data[DEST_IN_IPV4_OFFSET..DEST_IN_IPV4_OFFSET + IPV4_SIZE],
) {
Ok(slice) => slice,
Err(e) => {
eprintln!("Procesing of IP packet failed, Invalid dst IP: {}", e);
return;
}
},
);
let mut rng = rand::rng();
let mut iv: [u8; BLOCK_SIZE] = [0u8; BLOCK_SIZE];
rng.fill_bytes(&mut iv);
let mut procesed_data: Vec<u8> = if network.read().unwrap().encrypted {
match shared::crypto::encrypt(&network.read().unwrap().key, &iv, &packet_data) {
Ok(cr) => cr,
Err(e) => {
eprintln!("Failed to encrypt packet droping it: {}", e);
return;
}
}
} else {
packet_data.to_vec()
};
procesed_data.insert(0, P2PMethods::PACKET as u8);
procesed_data.splice(1..1, iv);
if dst_ip.octets()[3] == 255 {
network.read().unwrap().peers.par_iter().for_each(|peer| {
// broadcast
match socket.send_to(&procesed_data, peer.sock_addr) {
Ok(_) => {}
Err(e) => eprintln!("failed to send packet: {}", e),
};
});
} else {
let dst = match network
.read()
.unwrap()
.peers
.par_iter()
.find_any(|&p| p.private_ip == dst_ip)
.map(|p| p.sock_addr)
{
Some(sa) => sa,
None => return,
};
match socket.send_to(&procesed_data, dst) {
Ok(_) => {}
Err(e) => eprintln!("failed to send packet: {}", e),
};
}
}
+66
View File
@@ -0,0 +1,66 @@
use pea_2_pea::*;
#[readonly::make]
pub struct Peer {
#[readonly]
pub sock_addr: std::net::SocketAddr,
pub private_ip: std::net::Ipv4Addr,
}
impl Peer {
pub fn new(sock_addr: std::net::SocketAddr, private_ip: Option<std::net::Ipv4Addr>) -> Self {
Peer {
sock_addr,
private_ip: match private_ip {
Some(ip) => ip,
None => std::net::Ipv4Addr::UNSPECIFIED,
},
}
}
}
#[readonly::make]
pub struct Network {
#[readonly]
pub encrypted: bool,
#[readonly]
pub key: [u8; 32],
#[readonly]
pub net_id: String,
#[readonly]
pub salt: [u8; BLOCK_SIZE as usize],
pub peers: Vec<Peer>,
pub private_ip: std::net::Ipv4Addr,
}
impl Network {
pub fn new(
encrypted: bool,
key: [u8; 32],
net_id: String,
salt: [u8; BLOCK_SIZE as usize],
peers: Vec<Peer>,
) -> Self {
Network {
encrypted,
key,
net_id,
salt,
peers,
private_ip: std::net::Ipv4Addr::UNSPECIFIED,
}
}
}
#[readonly::make]
pub struct EncryptablePulicSockAddr {
#[readonly]
pub iv: [u8; BLOCK_SIZE],
#[readonly]
pub sock_addr: Box<[u8]>,
}
impl EncryptablePulicSockAddr {
pub fn new(iv: [u8; BLOCK_SIZE], sock_addr: Box<[u8]>) -> Self {
EncryptablePulicSockAddr { iv, sock_addr }
}
}
+68 -10
View File
@@ -1,29 +1,44 @@
use core::fmt;
pub const SERVER_PORT: u16 = 3543;
pub const BUFFER_SIZE: usize = 65535;
pub const UDP_BUFFER_SIZE: usize = 65527;
pub const IP_BUFFER_SIZE: usize = 65535;
pub const DEFAULT_TIMEOUT: u64 = 30;
pub const VERSION: &str = "v0.1";
pub const SALT_AND_IV_SIZE: u8 = 16;
pub const VERSION: &str = "v1.1.1";
pub const BLOCK_SIZE: usize = 16;
pub const STANDARD_RETRY_MAX: usize = 10;
pub const DEST_IN_IPV4_OFFSET: usize = 16;
pub const IPV4_SIZE: usize = 4;
pub const MAPPING_SHOT_COUNT: u8 = 5;
pub const DEFAULT_NETWORK_PREFIX: [u8; 3] = [172, 22, 44];
pub const DEFAULT_INTERFACE_NAME: &str = "pea0";
#[repr(u8)]
#[allow(non_camel_case_types)]
pub enum ServerMethods {
QUERY = 0, // return IP and port of the client
REGISTER = 1,
GET = 2,
HEARTBEAT = 3,
HEARTBEAT = 3, // this also registers addtional clients
}
#[derive(Debug, PartialEq, Eq, Clone, Copy)]
#[allow(non_camel_case_types)]
#[repr(u8)]
pub enum ServerResponse {
GENERAL_ERROR = 255,
ID_EXISTS = 254,
ID_DOESNT_EXIST = 253, // both error since sometimes it is the problem that the id exist and somethimes problem is that is doesn't
IO = 252, // had to place it here to avoid creating anther enum
}
#[allow(non_camel_case_types)]
#[derive(Debug)]
pub enum ServerErrorResponses {
// success server returns id of method
GENERAL_ERROR(String),
ID_EXISTS,
ID_DOESNT_EXIST,
@@ -54,25 +69,68 @@ impl ServerErrorResponses {
}
}
}
impl ServerErrorResponses {
pub fn kind(&self) -> ServerResponse {
match self {
ServerErrorResponses::GENERAL_ERROR(_) => ServerResponse::GENERAL_ERROR,
ServerErrorResponses::ID_EXISTS => ServerResponse::ID_EXISTS,
ServerErrorResponses::ID_DOESNT_EXIST => ServerResponse::ID_DOESNT_EXIST,
ServerErrorResponses::IO(_) => ServerResponse::IO,
}
}
}
#[allow(non_camel_case_types)]
#[repr(usize)]
pub enum RegisterRequestDataPositions {
ENCRYPTED = 1, // this feeld should be 0 if not encrypted
ID_LEN = 2,
SOCKADDR_LEN = 3,
SALT = 4,
IV = (SALT_AND_IV_SIZE + RegisterRequestDataPositions::SALT as u8) as isize,
DATA = (SALT_AND_IV_SIZE + RegisterRequestDataPositions::IV as u8) as isize, // after this there will be id and sockaddr in string or encrypted form after
IV = (BLOCK_SIZE as usize + RegisterRequestDataPositions::SALT as usize) as usize,
DATA = (BLOCK_SIZE as usize + RegisterRequestDataPositions::IV as usize) as usize, // after this there will be id and sockaddr in string or encrypted form after
}
#[allow(non_camel_case_types)]
#[repr(usize)]
pub enum GetRequestDataPositions {
ID = 1, // no need for len since id is the whoule rest of the packet
}
#[allow(non_camel_case_types)]
#[repr(usize)]
pub enum GetResponseDataPositions {
ENCRYPTED = 1, // this feeld should be 0 if not encrypted
ID_LEN = 2,
NUM_OF_CLIENTS = 3,
SALT = 4,
CLIENTS = (SALT_AND_IV_SIZE + RegisterRequestDataPositions::SALT as u8) as isize,
NUM_OF_CLIENTS = 2,
SALT = 3,
CLIENTS = (BLOCK_SIZE as usize + RegisterRequestDataPositions::SALT as usize) - 1 as usize,
// after this there will be blocks of this sturcture: one byte size of sockaddr than there will be IV that is SALT_AND_IV_SIZE long and after that there will be sockaddr this repeats until the end of packet
}
#[allow(non_camel_case_types)]
#[repr(usize)]
pub enum HeartBeatRequestDataPositions {
ID_LEN = 1,
SOCKADDR_LEN = 2,
IV = 3,
DATA = (HeartBeatRequestDataPositions::IV as usize + BLOCK_SIZE as usize) as usize, // first ID than sockaddr
}
#[allow(non_camel_case_types)]
#[repr(u8)]
pub enum P2PMethods {
PEER_QUERY = 20, // responds with its private ip
PEER_HELLO = 21, // sends private ip encrypted if on
PEER_GOODBYE = 22, // sends private ip encrypted if on
PACKET = 23, // sends IP packet encrypted if on
NEW_CLIENT_NOTIFY = 24,
DO_NOTHING = 25,
}
#[repr(usize)]
pub enum P2PStandardDataPositions {
// sould apply to all P2P Methods
IV = 1,
DATA = P2PStandardDataPositions::IV as usize + BLOCK_SIZE,
}
pub mod shared;
+15 -13
View File
@@ -1,33 +1,35 @@
mod net;
mod types;
mod utils;
use std::{
net::UdpSocket,
process::exit,
sync::{Arc, RwLock},
};
use smol::net::UdpSocket;
use std::{process::exit, sync::Arc};
use orx_concurrent_vec::ConcurrentVec;
fn main() -> std::io::Result<()> {
{
let socket: Arc<UdpSocket> = Arc::new(
(|| -> std::io::Result<UdpSocket> {
smol::block_on(async {
let listen_port: u16 = pea_2_pea::SERVER_PORT;
match UdpSocket::bind(format!("0.0.0.0:{}", listen_port)) {
Ok(socket) => return Ok(socket),
Err(e) => return Err(e),
}
})()
UdpSocket::bind(format!("0.0.0.0:{}", listen_port)).await
})
.expect("Failed to bind to any available port"),
);
let registration_vector: Arc<ConcurrentVec<types::Registration>> =
Arc::new(orx_concurrent_vec::ConcurrentVec::new());
let mut buf: [u8; pea_2_pea::BUFFER_SIZE] = [0; pea_2_pea::BUFFER_SIZE];
{
let reg_clone = registration_vector.clone();
std::thread::spawn(move || {
utils::disconnected_cleaner(reg_clone);
});
}
let mut buf: [u8; pea_2_pea::UDP_BUFFER_SIZE] = [0u8; pea_2_pea::UDP_BUFFER_SIZE];
smol::block_on(async {
loop {
match socket.recv_from(&mut buf) {
buf.fill(0);
match socket.recv_from(&mut buf).await {
Ok((data_length, src)) => {
smol::spawn(net::handle_request(
buf,
+190 -61
View File
@@ -1,5 +1,6 @@
use crate::utils::send_general_error_to_client;
use smol::net::UdpSocket;
use super::types;
use super::utils;
use orx_concurrent_vec::ConcurrentVec;
@@ -8,9 +9,21 @@ use rayon::prelude::*;
use std::sync::Arc;
use std::u8;
async fn send_with_count(socket: std::sync::Arc<UdpSocket> , dst: &core::net::SocketAddr, buf: &[u8]) {
match socket.send_to(buf, dst).await {
Ok(s) => {
#[cfg(debug_assertions)]
eprintln!("send {} bytes", s);
}
Err(e) => {
eprintln!("Error snding data: {}", e);
}
}
}
pub async fn handle_request(
buf: [u8; BUFFER_SIZE],
socket: std::sync::Arc<std::net::UdpSocket>,
buf: [u8; UDP_BUFFER_SIZE],
socket: std::sync::Arc<UdpSocket>,
src: core::net::SocketAddr,
data_len: usize,
registration_vector: Arc<ConcurrentVec<types::Registration>>,
@@ -24,15 +37,7 @@ pub async fn handle_request(
let mut send_vec: Vec<u8> = client_sock_addr_str.into();
send_vec.insert(0, ServerMethods::QUERY as u8);
match socket.send_to(&send_vec, &src) {
Ok(s) => {
#[cfg(debug_assertions)]
eprintln!("send {} bytes", s);
}
Err(e) => {
eprintln!("Error snding data: {}", e);
}
}
send_with_count(socket, &src, &send_vec).await;
}
x if x == ServerMethods::GET as u8 => {
@@ -48,7 +53,7 @@ pub async fn handle_request(
return; // drop packet if id lenght is biger than posible
}
let net_id: String = match std::str::from_utf8(&buf[1..]) {
let net_id: String = match std::str::from_utf8(&buf[1..data_len]) {
Ok(s) => s.to_string(),
Err(e) => {
eprint!("id to utf-8 failed: {}", e);
@@ -59,26 +64,45 @@ pub async fn handle_request(
let registration = match registration_vector
.iter()
.find(|elem| elem.map(|s| &s.net_id == &net_id)) // find if id exists
.find(|elem| elem.map(|s| &s.net_id == &net_id && !s.invalid)) // find if id exists
{
Some(registration) => registration,
None => {let _ = socket.send_to(&[ServerResponse::ID_DOESNT_EXIST as u8], src);
None => {futures::executor::block_on(send_with_count(socket, &src ,&[ServerResponse::ID_DOESNT_EXIST as u8]));
return;
},
}
.cloned();
let mut send_vec: Vec<u8> = Vec::with_capacity(
GetResponseDataPositions::SALT as usize + /*2 times one for SALT and other for first IV*/ 2*SALT_AND_IV_SIZE as usize + 20, /*magic number guess for how long is encrypted residencial ipv4 with port long */
1/*initial status byte */ +
GetResponseDataPositions::SALT as usize + /*2 times one for SALT and other for first IV*/ 2*BLOCK_SIZE as usize + 20, /*magic number guess for how long is encrypted residencial ipv4 with port long */
); // use vector to handle many clients
send_vec.push(ServerMethods::GET as u8); // this means success
// lets start serializing
send_vec.push(registration.encrypted as u8);
send_vec.push(registration.net_id.len() as u8);
send_vec.push(registration.clients.len() as u8);
// todo!("make sure it allows only 255 client per network max");
send_vec.extend_from_slice(&registration.salt);
#[cfg(debug_assertions)]
eprintln!("Found {} clients", registration.clients.len());
registration.clients.iter().for_each(|client| {
#[cfg(debug_assertions)]
eprintln!(
"Client:\nIV: {}\nSockAddr: {}",
client
.iv
.iter()
.map(|x| format!("{:02X} ", x))
.collect::<String>(),
client
.client_sock_addr
.iter()
.map(|x| format!("{:02X} ", x))
.collect::<String>(),
);
let sock_addr_len: u8 = client.client_sock_addr.len() as u8;
send_vec.push(sock_addr_len);
@@ -88,7 +112,7 @@ pub async fn handle_request(
send_vec.extend_from_slice(&client.client_sock_addr);
});
if (send_vec.len() > BUFFER_SIZE) {
if send_vec.len() > UDP_BUFFER_SIZE {
send_general_error_to_client(
src,
std::io::Error::new(
@@ -103,15 +127,7 @@ pub async fn handle_request(
return;
}
match socket.send_to(&send_vec, &src) {
Ok(s) => {
#[cfg(debug_assertions)]
eprintln!("send {} bytes", s);
}
Err(e) => {
eprintln!("Error snding data: {}", e);
}
}
send_with_count(socket, &src, &send_vec).await;
}
x if x == ServerMethods::REGISTER as u8 => {
#[cfg(debug_assertions)]
@@ -120,7 +136,7 @@ pub async fn handle_request(
//read lenght of sockaddr
// rustc be like RUST HAS NO TERNARY OPERATON USE if-else
let len_id: u8 = if buf[RegisterRequestDataPositions::ID_LEN as usize] != 0 {
let id_len: u8 = if buf[RegisterRequestDataPositions::ID_LEN as usize] != 0 {
buf[RegisterRequestDataPositions::ID_LEN as usize]
} else {
return;
@@ -135,7 +151,7 @@ pub async fn handle_request(
let net_id: String = match std::str::from_utf8(
&buf[(RegisterRequestDataPositions::DATA as usize)
..(len_id as usize) + (RegisterRequestDataPositions::DATA as usize)],
..(id_len as usize) + (RegisterRequestDataPositions::DATA as usize)],
) {
Ok(s) => s.to_string(),
Err(e) => {
@@ -147,38 +163,28 @@ pub async fn handle_request(
match registration_vector
.iter()
.find(|elem| elem.map(|s| &s.net_id == &net_id)) // find if id exists
.find(|elem| elem.map(|s| &s.net_id == &net_id && !s.invalid)) // find if id exists
{
Some(_) => {
match socket.send_to(&[ServerResponse::ID_EXISTS as u8], src) {
Ok(s) => {
#[cfg(debug_assertions)]
eprintln!("send {} bytes", s);
}
Err(e) => {
eprintln!("Error sending data: {}", e);
}
};
futures::executor::block_on(send_with_count(socket, &src, &[ServerResponse::ID_EXISTS as u8]));
return;
}
None => {}
}
let salt: Option<[u8; SALT_AND_IV_SIZE as usize]>;
let iv: Option<[u8; SALT_AND_IV_SIZE as usize]>;
let salt: Option<[u8; BLOCK_SIZE as usize]>;
let iv: Option<[u8; BLOCK_SIZE as usize]>;
if encrypted {
salt = Some(
buf[(RegisterRequestDataPositions::SALT as usize)
..(RegisterRequestDataPositions::SALT as usize)
+ (SALT_AND_IV_SIZE as usize)]
..(RegisterRequestDataPositions::SALT as usize) + (BLOCK_SIZE as usize)]
.try_into()
.expect("this should never happen"),
);
iv = Some(
buf[(RegisterRequestDataPositions::IV as usize)
..(RegisterRequestDataPositions::IV as usize)
+ (SALT_AND_IV_SIZE as usize)]
..(RegisterRequestDataPositions::IV as usize) + (BLOCK_SIZE as usize)]
.try_into()
.expect("this should never happen"),
)
@@ -186,26 +192,60 @@ pub async fn handle_request(
salt = None;
iv = None;
}
let client_sock_addr: Vec<u8> = buf[RegisterRequestDataPositions::DATA as usize
+ id_len as usize
..RegisterRequestDataPositions::DATA as usize
+ id_len as usize
+ (sock_addr_len as usize)]
.to_vec();
registration_vector.push(types::Registration::new(
net_id,
buf[(RegisterRequestDataPositions::DATA as usize)
..(RegisterRequestDataPositions::DATA as usize) + (sock_addr_len as usize)]
.to_vec(),
#[cfg(debug_assertions)]
eprintln!(
"first client registerd:\n iv: {}\nSockAddr: {}\nsalt: {}",
iv.iter()
.flatten()
.map(|x| format!("{:02X} ", x))
.collect::<String>(),
client_sock_addr
.iter()
.map(|x| format!("{:02X} ", x))
.collect::<String>(),
salt.iter()
.flatten()
.map(|x| format!("{:02X} ", x))
.collect::<String>(),
);
let mut first_invalid_registration: Option<usize> = None;
for (i, reg) in registration_vector.iter().enumerate() {
if reg.map(|r| r.invalid) {
first_invalid_registration = Some(i);
break;
}
}
match first_invalid_registration {
Some(i) => registration_vector[i].update(|r|{*r = types::Registration::new(
net_id.clone(),
client_sock_addr.clone(),
encrypted,
chrono::Utc::now().timestamp(),
salt,
iv,
));
match socket.send_to(&[ServerMethods::REGISTER as u8], src) {
Ok(s) => {
#[cfg(debug_assertions)]
eprintln!("send {} bytes", s);
}
Err(e) => {
eprintln!("Error sending data: {}", e);
}
}
src
);}),
None => {registration_vector.push(types::Registration::new(
net_id,
client_sock_addr,
encrypted,
chrono::Utc::now().timestamp(),
salt,
iv,
src
));},
};
send_with_count(socket, &src, &[ServerMethods::REGISTER as u8]).await;
#[cfg(debug_assertions)]
println!("network registered");
}
@@ -213,10 +253,99 @@ pub async fn handle_request(
x if x == ServerMethods::HEARTBEAT as u8 => {
#[cfg(debug_assertions)]
println!("HEARTBEAT method");
let id_len: u8 = if buf[HeartBeatRequestDataPositions::ID_LEN as usize] != 0 {
buf[HeartBeatRequestDataPositions::ID_LEN as usize]
} else {
send_general_error_to_client(
src,
std::io::Error::new(std::io::ErrorKind::InvalidInput, "ID too short!"),
socket,
);
return;
};
let sock_addr_len: u8 = if buf[HeartBeatRequestDataPositions::SOCKADDR_LEN as usize]
!= 0
{
buf[HeartBeatRequestDataPositions::SOCKADDR_LEN as usize]
} else {
send_general_error_to_client(
src,
std::io::Error::new(std::io::ErrorKind::InvalidInput, "SockAddr too short!"),
socket,
);
return;
};
let net_id: String = match std::str::from_utf8(
&buf[(HeartBeatRequestDataPositions::DATA as usize)
..(id_len as usize) + (HeartBeatRequestDataPositions::DATA as usize)],
) {
Ok(s) => s.to_string(),
Err(e) => {
eprint!("id to utf-8 failed: {}", e);
utils::send_general_error_to_client(src, e, socket);
return;
}
};
let iv: [u8; BLOCK_SIZE as usize] = buf[HeartBeatRequestDataPositions::IV as usize
..HeartBeatRequestDataPositions::IV as usize + BLOCK_SIZE as usize]
.try_into()
.unwrap();
let sock_addr: Vec<u8> = buf[HeartBeatRequestDataPositions::DATA as usize
+ id_len as usize
..HeartBeatRequestDataPositions::DATA as usize
+ id_len as usize
+ sock_addr_len as usize]
.to_vec();
#[cfg(debug_assertions)]
eprintln!(
"IV: {}\nSockAddr: {}",
iv.iter().map(|x| format!("{:02X} ", x)).collect::<String>(),
sock_addr
.iter()
.map(|x| format!("{:02X} ", x))
.collect::<String>(),
);
match registration_vector
.iter()
.find(|elem| elem.map(|s| &s.net_id == &net_id)) // find if id exists
{
Some(reg) => {
let current_time = chrono::Utc::now().timestamp();
reg.update(|r| {r.last_heart_beat = current_time;
match r.clients.par_iter_mut().find_any(|c| *c.client_sock_addr == *sock_addr && c.iv == iv) {
Some(c) => c.last_heart_beat = current_time,
None => {// add new client if it isn't found
r.clients.par_iter().for_each(|c| {let mut send_buf: Box<[u8]> = vec![0; P2PStandardDataPositions::DATA as usize + sock_addr_len as usize].into();
send_buf[0] = P2PMethods::NEW_CLIENT_NOTIFY as u8;
send_buf[P2PStandardDataPositions::IV as usize..P2PStandardDataPositions::IV as usize+ BLOCK_SIZE].copy_from_slice(&iv);
send_buf[P2PStandardDataPositions::DATA as usize..P2PStandardDataPositions::DATA as usize + sock_addr_len as usize].copy_from_slice(&sock_addr);
let sock_clone = socket.clone();
futures::executor::block_on(async move {
send_with_count(sock_clone, &c.src, &send_buf).await});
});
r.clients.push(types::Client::new(sock_addr.clone(), current_time, iv, src));
}
};
});
}
None => {futures::executor::block_on(send_with_count(socket, &src, &[ServerResponse::ID_DOESNT_EXIST as u8])); return;}
}
send_with_count(socket, &src, &[ServerMethods::HEARTBEAT as u8]).await;
return;
}
_ => {
#[cfg(debug_assertions)]
println!("Unknown method");
println!(
"Warning!: client: {} called Unknown method: 0x{:02x}",
src.to_string(),
buf[0]
);
return;
}
}
+19 -67
View File
@@ -1,5 +1,4 @@
use pea_2_pea::*;
use std::sync::{Arc, atomic::Ordering};
#[derive(Clone)]
#[readonly::make]
@@ -8,15 +7,23 @@ pub struct Client {
pub client_sock_addr: Vec<u8>,
pub last_heart_beat: i64,
#[readonly]
pub iv: [u8; SALT_AND_IV_SIZE as usize],
pub iv: [u8; BLOCK_SIZE as usize],
#[readonly]
pub src: std::net::SocketAddr,
}
impl Client {
pub fn new(client_addr: Vec<u8>, heart_beat: i64, iv: [u8; SALT_AND_IV_SIZE as usize]) -> Self {
pub fn new(
client_addr: Vec<u8>,
heart_beat: i64,
iv: [u8; BLOCK_SIZE as usize],
src: std::net::SocketAddr,
) -> Self {
Client {
client_sock_addr: client_addr,
last_heart_beat: heart_beat,
iv,
src,
}
}
}
@@ -25,7 +32,6 @@ impl Client {
pub struct Registration {
#[readonly]
pub net_id: String,
#[readonly]
pub clients: Vec<Client>,
pub last_heart_beat: i64,
@@ -33,7 +39,8 @@ pub struct Registration {
#[readonly]
pub encrypted: bool,
#[readonly]
pub salt: [u8; SALT_AND_IV_SIZE as usize],
pub salt: [u8; BLOCK_SIZE as usize],
pub invalid: bool,
}
impl Registration {
@@ -42,77 +49,22 @@ impl Registration {
client_addr: Vec<u8>,
encrypted: bool,
heart_beat: i64,
salt: Option<[u8; SALT_AND_IV_SIZE as usize]>,
iv: Option<[u8; SALT_AND_IV_SIZE as usize]>,
salt: Option<[u8; BLOCK_SIZE as usize]>,
iv: Option<[u8; BLOCK_SIZE as usize]>,
src: std::net::SocketAddr,
) -> Self {
Registration {
net_id,
clients: vec![Client::new(
client_addr,
heart_beat,
iv.unwrap_or([0; SALT_AND_IV_SIZE as usize]),
iv.unwrap_or([0; BLOCK_SIZE as usize]),
src,
)],
encrypted,
last_heart_beat: heart_beat,
salt: salt.unwrap_or([0; SALT_AND_IV_SIZE as usize]),
salt: salt.unwrap_or([0; BLOCK_SIZE as usize]),
invalid: false,
}
}
}
pub struct BatchLock {
inner: std::sync::Mutex<bool>, // true = blocking new locks
condvar: std::sync::Condvar,
active_count: std::sync::atomic::AtomicUsize,
}
pub struct LockGuard {
lock: Arc<BatchLock>,
}
impl BatchLock {
pub fn new() -> Arc<Self> {
Arc::new(BatchLock {
inner: std::sync::Mutex::new(false),
condvar: std::sync::Condvar::new(),
active_count: std::sync::atomic::AtomicUsize::new(0),
})
}
// Acquire a lock (blocks if waiting for all to unlock)
pub fn lock(self: &Arc<Self>) -> LockGuard {
let mut blocking = self.inner.lock().unwrap();
// Wait while new locks are blocked
while *blocking {
blocking = self.condvar.wait(blocking).unwrap();
}
self.active_count.fetch_add(1, Ordering::SeqCst);
LockGuard {
lock: Arc::clone(self),
}
}
// Block new locks and wait for all current locks to finish
pub fn wait_all_unlock(self: &Arc<Self>) {
// Block new locks
*self.inner.lock().unwrap() = true;
// Wait for all active locks to finish
while self.active_count.load(Ordering::SeqCst) > 0 {
std::thread::sleep(std::time::Duration::from_millis(1));
}
// Allow new locks again
*self.inner.lock().unwrap() = false;
self.condvar.notify_all();
}
}
impl Drop for LockGuard {
fn drop(&mut self) {
// Automatically release lock when guard is dropped
self.lock.active_count.fetch_sub(1, Ordering::SeqCst);
}
}
+23 -1
View File
@@ -1,8 +1,10 @@
use colored::Colorize;
use pea_2_pea::*;
pub fn send_general_error_to_client<T: std::error::Error>(
dst: core::net::SocketAddr,
e: T,
socket: std::sync::Arc<std::net::UdpSocket>,
socket: std::sync::Arc<smol::net::UdpSocket>,
) {
let mut resp_buf: Box<[u8]> = vec![0; e.to_string().len() + 1].into_boxed_slice();
@@ -11,3 +13,23 @@ pub fn send_general_error_to_client<T: std::error::Error>(
let _ = socket.send_to(&[ServerResponse::GENERAL_ERROR as u8], dst);
}
pub fn disconnected_cleaner(
registration_vector: std::sync::Arc<
orx_concurrent_vec::ConcurrentVec<crate::types::Registration>,
>,
) {
loop {
std::thread::sleep(std::time::Duration::from_secs(120));
println!("{} starting cleanup", "[LOG]".blue());
let time_now = chrono::Utc::now().timestamp();
unsafe {
registration_vector.iter_mut().for_each(|reg| {
reg.clients.retain(|c| time_now - c.last_heart_beat < 120);
if time_now - reg.last_heart_beat > 120 {
reg.invalid = true;
}
})
}
}
}
+67 -1
View File
@@ -30,5 +30,71 @@ pub fn decrypt(
ciphertext: &[u8],
) -> Result<Vec<u8>, Box<dyn std::error::Error>> {
let cipher = Aes256CbcDec::new_from_slices(key, iv)?;
Ok(cipher.decrypt_padded_vec_mut::<Pkcs7>(ciphertext).unwrap())
match cipher.decrypt_padded_vec_mut::<Pkcs7>(ciphertext) {
Ok(v) => Ok(v),
Err(e) => Err(format!("Decryption unpad error: {:?}", e).into()),
}
}
#[cfg(debug_assertions)]
pub fn test_all_crypto_functions() {
// Test data
let password = b"test_password_123";
let salt = b"random_salt_data";
let iv = b"1234567890123456"; // 16 bytes for AES-256-CBC
let test_data = b"Hello, this is secret data to encrypt and decrypt!";
println!("Testing crypto functions...");
// Test 1: Key derivation
println!("1. Testing key derivation...");
let key = derive_key_from_password(password, salt);
println!(" ✓ Key derived successfully: {} bytes", key.len());
// Test 2: Encryption
println!("2. Testing encryption...");
match encrypt(&key, iv, test_data) {
Ok(ciphertext) => {
println!(" ✓ Encryption successful");
println!(" Original data length: {} bytes", test_data.len());
println!(" Encrypted data length: {} bytes", ciphertext.len());
// Test 3: Decryption
println!("3. Testing decryption...");
match decrypt(&key, iv, &ciphertext) {
Ok(decrypted) => {
println!(" ✓ Decryption successful");
// Test 4: Verify data integrity
println!("4. Verifying data integrity...");
if decrypted == test_data {
println!(
" ✓ Data integrity verified - original and decrypted data match!"
);
} else {
println!(" ✗ Data integrity failed - data doesn't match!");
}
}
Err(e) => {
println!(" ✗ Decryption failed: {:?}", e);
}
}
}
Err(e) => {
println!(" ✗ Encryption failed: {:?}", e);
}
}
// Test 5: Test with different key (should fail to decrypt properly)
println!("5. Testing with wrong key (should fail)...");
let wrong_key = derive_key_from_password(b"wrong_password", salt);
match encrypt(&key, iv, test_data) {
Ok(ciphertext) => match decrypt(&wrong_key, iv, &ciphertext) {
Ok(_) => println!(" ⚠ Unexpected success with wrong key"),
Err(_) => println!(" ✓ Correctly failed with wrong key"),
},
Err(e) => println!(" Error in setup: {:?}", e),
}
println!("All tests completed!");
}
+1
View File
@@ -1 +1,2 @@
pub mod crypto;
pub mod net;
+219
View File
@@ -0,0 +1,219 @@
use std::io::ErrorKind;
use std::net::{SocketAddr, UdpSocket};
use crate::*;
#[cfg(target_os = "windows")]
use std::os::windows::io::AsRawSocket;
#[cfg(target_os = "windows")]
use winapi::shared::minwindef::{BOOL, DWORD, FALSE};
#[cfg(target_os = "windows")]
use winapi::um::mswsock::SIO_UDP_CONNRESET;
#[cfg(target_os = "windows")]
use winapi::um::winsock2::{SOCKET_ERROR, WSAIoctl};
#[cfg(target_os = "linux")]
use std::os::unix::io::AsRawFd;
#[cfg(target_os = "windows")]
fn enable_icmp_errors(socket: &UdpSocket) -> std::io::Result<()> {
let socket_handle = socket.as_raw_socket();
let mut bytes_returned: DWORD = 0;
let enable: BOOL = FALSE;
let result = unsafe {
WSAIoctl(
socket_handle as usize,
SIO_UDP_CONNRESET,
&enable as *const _ as *mut _,
std::mem::size_of::<BOOL>() as DWORD,
std::ptr::null_mut(),
0,
&mut bytes_returned,
std::ptr::null_mut(),
None,
)
};
if result == SOCKET_ERROR {
Err(std::io::Error::last_os_error())
} else {
Ok(())
}
}
#[cfg(target_os = "linux")]
fn enable_icmp_errors(socket: &UdpSocket) -> std::io::Result<()> {
let fd = socket.as_raw_fd();
let optval: libc::c_int = 1;
let ret = unsafe {
libc::setsockopt(
fd,
libc::SOL_IP,
libc::IP_RECVERR,
&optval as *const _ as *const libc::c_void,
std::mem::size_of::<libc::c_int>() as libc::socklen_t,
)
};
if ret < 0 {
Err(std::io::Error::last_os_error())
} else {
Ok(())
}
}
#[cfg(target_os = "linux")]
fn check_icmp_error_queue(socket: &UdpSocket) -> std::io::Result<()> {
use libc::{MSG_ERRQUEUE, iovec, msghdr, recvmsg};
let fd = socket.as_raw_fd();
let mut buf = [0u8; 1024];
let mut control_buf = [0u8; 1024];
let mut iov = iovec {
iov_base: buf.as_mut_ptr() as *mut libc::c_void,
iov_len: buf.len(),
};
let mut msg: msghdr = unsafe { std::mem::zeroed() };
msg.msg_iov = &mut iov;
msg.msg_iovlen = 1;
msg.msg_control = control_buf.as_mut_ptr() as *mut libc::c_void;
msg.msg_controllen = control_buf.len();
let result = unsafe { recvmsg(fd, &mut msg, MSG_ERRQUEUE) };
if result < 0 {
let error = std::io::Error::last_os_error();
if error.kind() == std::io::ErrorKind::WouldBlock {
return Ok(());
}
return Err(error);
}
Err(std::io::Error::new(
std::io::ErrorKind::NetworkUnreachable,
"ICMP destination unreachable received",
))
}
#[cfg(target_os = "windows")]
fn check_icmp_error_queue(_socket: &UdpSocket) -> std::io::Result<()> {
Ok(())
}
#[cfg(not(any(target_os = "linux", target_os = "windows")))]
fn enable_icmp_errors(_socket: &UdpSocket) -> std::io::Result<()> {
Ok(())
}
#[cfg(not(any(target_os = "linux", target_os = "windows")))]
fn check_icmp_error_queue(_socket: &UdpSocket) -> std::io::Result<()> {
Ok(())
}
// return data_lenght and number of retryes
pub fn send_and_recv_with_retry(
buf: &mut [u8; UDP_BUFFER_SIZE],
send_buf: &[u8],
dst: &SocketAddr,
socket: &UdpSocket,
retry_max: usize,
) -> Result<(usize, usize), ServerErrorResponses> {
#[cfg(any(target_os = "linux", target_os = "windows"))]
enable_icmp_errors(socket)?;
let mut retry_count: usize = 0;
let mut resend: bool = true;
loop {
if resend {match socket.send_to(send_buf, dst) {
Ok(s) => {
#[cfg(debug_assertions)]
eprintln!("send {} bytes", s);
}
Err(e) => match e.kind() {
ErrorKind::ConnectionReset
| ErrorKind::ConnectionRefused
| ErrorKind::NetworkUnreachable
| ErrorKind::HostUnreachable => {
return Err(ServerErrorResponses::IO(std::io::Error::new(
e.kind(),
format!("Destination unreachable: {}", e),
)));
}
_ => return Err(ServerErrorResponses::IO(e)),
},
}} else {resend = true;}
#[cfg(target_os = "linux")]
if let Err(icmp_error) = check_icmp_error_queue(socket) {
return Err(ServerErrorResponses::IO(icmp_error));
}
match socket.recv_from(buf) {
Ok((data_length, src)) => {
if src != *dst {
continue;
}
match buf[0] {
x if x == send_buf[0] as u8 => {
return Ok((data_length, retry_count));
}
x if x == ServerResponse::GENERAL_ERROR as u8 => {
return Err(ServerErrorResponses::IO(std::io::Error::new(
std::io::ErrorKind::InvalidData,
match std::str::from_utf8(&buf[1..data_length]) {
Ok(s) => s.to_string(),
Err(e) => format!("invalid error string: {}", e),
},
)));
}
x if x == ServerResponse::ID_DOESNT_EXIST as u8 => {
return Err(ServerErrorResponses::ID_DOESNT_EXIST);
}
x if x == ServerResponse::ID_EXISTS as u8 => {
return Err(ServerErrorResponses::ID_EXISTS);
}
x if x == P2PMethods::DO_NOTHING as u8 => {
resend = false;
continue;
}
_ => {
continue;
}
}
}
Err(e) if e.kind() == ErrorKind::WouldBlock || e.kind() == ErrorKind::TimedOut => {
#[cfg(target_os = "linux")]
if let Err(icmp_error) = check_icmp_error_queue(socket) {
return Err(ServerErrorResponses::IO(icmp_error));
}
if retry_count >= retry_max {
return Err(ServerErrorResponses::IO(std::io::Error::new(
ErrorKind::TimedOut,
"Max retry count reached - destination may be unreachable",
)));
}
retry_count += 1;
continue;
}
Err(e) => match e.kind() {
ErrorKind::ConnectionReset
| ErrorKind::ConnectionRefused
| ErrorKind::NetworkUnreachable
| ErrorKind::HostUnreachable => {
return Err(ServerErrorResponses::IO(std::io::Error::new(
e.kind(),
format!("Destination unreachable during receive: {}", e),
)));
}
_ => return Err(ServerErrorResponses::IO(e)),
},
}
}
}