19 Commits

Author SHA1 Message Date
PoliEcho 6ede42a096 v1.1 2025-08-03 15:49:43 +02:00
PoliEcho ef83f0216f remove some warnings 2025-08-03 15:12:10 +02:00
PoliEcho 206012e72d add periodic heart beat 2025-08-03 15:05:35 +02:00
PoliEcho b9e36d9f8c maybe it works now? 2025-08-02 10:34:58 +02:00
PoliEcho b8d02b2077 add more debuging 2025-08-02 10:11:33 +02:00
PoliEcho 8e9d179d49 add windows crosscompilation 2025-08-01 23:32:49 +02:00
PoliEcho bc17ffac68 bump version to 1.0 2025-08-01 19:48:35 +02:00
PoliEcho 4ca652cea5 fix unecryted network 2025-08-01 19:38:59 +02:00
PoliEcho 4a70fb61f9 another off by one error 2025-08-01 19:35:09 +02:00
PoliEcho 6aa9fb27e6 add debuging of packets 2025-08-01 19:20:58 +02:00
PoliEcho 20a4907ea0 remove off by one error 2025-08-01 18:44:44 +02:00
PoliEcho 582d458f70 maybe works now 2025-08-01 18:41:17 +02:00
PoliEcho 0a32061960 add some debug messages 2025-08-01 18:33:56 +02:00
PoliEcho b1cc5ddd32 minimal changes 2025-08-01 17:57:23 +02:00
PoliEcho c5a4059a84 this has to work 2025-08-01 16:58:42 +02:00
PoliEcho be37082b87 a 2025-08-01 16:16:00 +02:00
PoliEcho 07df839b6d WHY DOESNT IT WORK :((((((( 2025-08-01 16:07:38 +02:00
PoliEcho bcff895858 fix do nothing packets chaos 2025-08-01 15:50:46 +02:00
PoliEcho d5a5dc33a9 fix some async issues 2025-08-01 14:06:47 +02:00
12 changed files with 403 additions and 206 deletions
Generated
+90
View File
@@ -468,12 +468,48 @@ version = "2.3.0"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "37909eebbb50d72f9059c3b6d82c0463f2ff062c9e95845c43a6c9c0355411be" checksum = "37909eebbb50d72f9059c3b6d82c0463f2ff062c9e95845c43a6c9c0355411be"
[[package]]
name = "futures"
version = "0.3.31"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "65bc07b1a8bc7c85c5f2e110c476c7389b4554ba72af57d8445ea63a576b0876"
dependencies = [
"futures-channel",
"futures-core",
"futures-executor",
"futures-io",
"futures-sink",
"futures-task",
"futures-util",
]
[[package]]
name = "futures-channel"
version = "0.3.31"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "2dff15bf788c671c1934e366d07e30c1814a8ef514e1af724a602e8a2fbe1b10"
dependencies = [
"futures-core",
"futures-sink",
]
[[package]] [[package]]
name = "futures-core" name = "futures-core"
version = "0.3.31" version = "0.3.31"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "05f29059c0c2090612e8d742178b0580d2dc940c837851ad723096f87af6663e" checksum = "05f29059c0c2090612e8d742178b0580d2dc940c837851ad723096f87af6663e"
[[package]]
name = "futures-executor"
version = "0.3.31"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "1e28d1d997f585e54aebc3f97d39e72338912123a67330d723fdbb564d646c9f"
dependencies = [
"futures-core",
"futures-task",
"futures-util",
]
[[package]] [[package]]
name = "futures-io" name = "futures-io"
version = "0.3.31" version = "0.3.31"
@@ -493,6 +529,47 @@ dependencies = [
"pin-project-lite", "pin-project-lite",
] ]
[[package]]
name = "futures-macro"
version = "0.3.31"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "162ee34ebcb7c64a8abebc059ce0fee27c2262618d7b60ed8faf72fef13c3650"
dependencies = [
"proc-macro2",
"quote",
"syn",
]
[[package]]
name = "futures-sink"
version = "0.3.31"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "e575fab7d1e0dcb8d0c7bcf9a63ee213816ab51902e6d244a95819acacf1d4f7"
[[package]]
name = "futures-task"
version = "0.3.31"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "f90f7dce0722e95104fcb095585910c0977252f286e354b5e3bd38902cd99988"
[[package]]
name = "futures-util"
version = "0.3.31"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "9fa08315bb612088cc391249efdc3bc77536f16c91f6cf495e6fbe85b20a4a81"
dependencies = [
"futures-channel",
"futures-core",
"futures-io",
"futures-macro",
"futures-sink",
"futures-task",
"memchr",
"pin-project-lite",
"pin-utils",
"slab",
]
[[package]] [[package]]
name = "generic-array" name = "generic-array"
version = "0.14.7" version = "0.14.7"
@@ -604,6 +681,12 @@ version = "0.4.27"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "13dc2df351e3202783a1fe0d44375f7295ffb4049267b0f3018346dc122a1d94" checksum = "13dc2df351e3202783a1fe0d44375f7295ffb4049267b0f3018346dc122a1d94"
[[package]]
name = "memchr"
version = "2.7.5"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "32a282da65faaf38286cf3be983213fcf1d2e2a58700e808f83f4ea9a4804bc0"
[[package]] [[package]]
name = "num-traits" name = "num-traits"
version = "0.2.19" version = "0.2.19"
@@ -748,6 +831,7 @@ dependencies = [
"cipher", "cipher",
"clap", "clap",
"colored", "colored",
"futures",
"hmac", "hmac",
"libc", "libc",
"orx-concurrent-vec", "orx-concurrent-vec",
@@ -767,6 +851,12 @@ version = "0.2.16"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "3b3cff922bd51709b605d9ead9aa71031d81447142d828eb4a6eba76fe619f9b" checksum = "3b3cff922bd51709b605d9ead9aa71031d81447142d828eb4a6eba76fe619f9b"
[[package]]
name = "pin-utils"
version = "0.1.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "8b870d8c151b6f2fb93e84a13146138f05d02ed11c7e7c54f8826aaaf7c9f184"
[[package]] [[package]]
name = "piper" name = "piper"
version = "0.2.4" version = "0.2.4"
+12 -1
View File
@@ -19,6 +19,7 @@ chrono = "0.4.41"
cipher = { version = "0.4.4", features = ["block-padding", "alloc"] } cipher = { version = "0.4.4", features = ["block-padding", "alloc"] }
clap = { version = "4.5.41", features = ["derive"] } clap = { version = "4.5.41", features = ["derive"] }
colored = "3.0.0" colored = "3.0.0"
futures = "0.3.31"
hmac = "0.12.1" hmac = "0.12.1"
orx-concurrent-vec = "3.6.0" orx-concurrent-vec = "3.6.0"
pbkdf2 = "0.12.2" pbkdf2 = "0.12.2"
@@ -27,13 +28,23 @@ rayon = "1.10.0"
readonly = "0.2.13" readonly = "0.2.13"
sha2 = "0.10.9" sha2 = "0.10.9"
smol = "2.0.2" smol = "2.0.2"
tappers = "0.4.2"
[target.'cfg(windows)'.dependencies] [target.'cfg(windows)'.dependencies]
winapi = { version = "0.3", features = ["winsock2", "mswsock", "minwindef"] } winapi = { version = "0.3", features = ["winsock2", "mswsock", "minwindef"] }
tappers = { version = "0.4.2", features = ["wintun"] }
[target.'cfg(unix)'.dependencies] [target.'cfg(unix)'.dependencies]
libc = "0.2" libc = "0.2"
tappers = "0.4.2"
[target.x86_64-pc-windows-gnu]
linker = "/usr/bin/x86_64-w64-mingw32-gcc"
ar = "/usr/bin/x86_64-w64-mingw32-ar"
[target.i686-pc-windows-gnu]
linker = "/usr/bin/i686-w64-mingw32-gcc"
ar = "/usr/bin/i686-w64-mingw32-ar"
[features] [features]
+17
View File
@@ -0,0 +1,17 @@
# Pea 2 Pea
very simple P2P VPN(Virtual Network yes, Private maybe),
this program is intended to help you play LAN games over internet
when all clients are behind Full-cone NAT, does not work with clients behind Symmetric NAT
at least for now
## how to run
> install rustc and cargo or rustup, you will need 2024 edition
> build using
> ```bash
> # to build
> cargo build --release
> # to run server(registrar)
> ./target/release/server
> # to run client
> sudo ./target/release/client -r SERVER_IP -n NETWORK_ID -P PASSWORD # password is optional
> ```
+60 -13
View File
@@ -21,7 +21,8 @@ use crate::types::Network;
struct Cli { struct Cli {
#[arg(short = 'r', long = "registrar")] #[arg(short = 'r', long = "registrar")]
#[arg(help = "registrar ip address or hostname")] #[arg(help = "registrar ip address or hostname")]
registrar: String, #[arg(required_unless_present = "version")]
registrar: Option<String>,
#[arg(short = 'p', long = "registrar-port")] #[arg(short = 'p', long = "registrar-port")]
#[arg(help = format!("optional Port number for the registrar service (1-65535) Default: {}", SERVER_PORT))] #[arg(help = format!("optional Port number for the registrar service (1-65535) Default: {}", SERVER_PORT))]
@@ -29,7 +30,8 @@ struct Cli {
#[arg(short = 'n', long = "network-id")] #[arg(short = 'n', long = "network-id")]
#[arg(help = "your virtual network id that allows other people to connect to you")] #[arg(help = "your virtual network id that allows other people to connect to you")]
network_id: String, #[arg(required_unless_present = "version")]
network_id: Option<String>,
#[arg(short = 'P', long = "password")] #[arg(short = 'P', long = "password")]
#[arg( #[arg(
@@ -46,16 +48,30 @@ struct Cli {
#[arg(short = 'V', long = "version")] #[arg(short = 'V', long = "version")]
version: bool, version: bool,
#[arg(short = 'S', long = "symmetric_NAT_bypass_mode")]
#[arg(help = "NOT IMPLEMENTED")]
symmetric_nat_bypass_mode: bool,
}
fn print_version() {
println!("Pea 2 Pea {}", VERSION);
} }
fn main() -> std::io::Result<()> { fn main() -> std::io::Result<()> {
let cli = <Cli as clap::Parser>::parse(); let cli = <Cli as clap::Parser>::parse();
if cli.network_id.len() > 0xff { if cli.version {
print_version();
exit(0);
}
let network_id = cli.network_id.unwrap();
let registrar = cli.registrar.unwrap();
if network_id.len() > 0xff {
eprintln!("network id cannot have more then 255 charactes"); eprintln!("network id cannot have more then 255 charactes");
exit(7); // posix for E2BIG exit(7); // posix for E2BIG
} }
let mut buf: [u8; UDP_BUFFER_SIZE] = [0; UDP_BUFFER_SIZE]; let mut buf: [u8; UDP_BUFFER_SIZE] = [0; UDP_BUFFER_SIZE];
let (socket, virtual_network, my_public_sock_addr) = { let (socket, virtual_network, _my_public_sock_addr) = {
let socket: Arc<UdpSocket> = Arc::new(|| -> std::io::Result<UdpSocket> { let socket: Arc<UdpSocket> = Arc::new(|| -> std::io::Result<UdpSocket> {
match UdpSocket::bind("0.0.0.0:0") { match UdpSocket::bind("0.0.0.0:0") {
// bind to OS assigned random port // bind to OS assigned random port
@@ -77,11 +93,11 @@ fn main() -> std::io::Result<()> {
})(); })();
#[allow(non_snake_case)] // i think this is valid snake case but rustc doesnt think so #[allow(non_snake_case)] // i think this is valid snake case but rustc doesnt think so
let server_SocketAddr: core::net::SocketAddr = format!("{}:{}", cli.registrar, server_port) let server_SocketAddr: core::net::SocketAddr = format!("{}:{}", registrar, server_port)
.parse() .parse()
.expect(&format!( .expect(&format!(
"{}:{} is invalid sock addr", "{}:{} is invalid sock addr",
cli.registrar, server_port registrar, server_port
)); ));
// query here // query here
@@ -90,6 +106,11 @@ fn main() -> std::io::Result<()> {
Ok(s) => s, Ok(s) => s,
Err(e) => return Err(ServerErrorResponses::into_io_error(e)), Err(e) => return Err(ServerErrorResponses::into_io_error(e)),
}; };
println!(
"{} my bublic sockaddr: {}",
"[LOG]".blue(),
public_sock_addr_raw
);
let mut salt: [u8; BLOCK_SIZE] = [0u8; BLOCK_SIZE]; let mut salt: [u8; BLOCK_SIZE] = [0u8; BLOCK_SIZE];
let mut iv: [u8; BLOCK_SIZE] = [0u8; BLOCK_SIZE]; let mut iv: [u8; BLOCK_SIZE] = [0u8; BLOCK_SIZE];
@@ -125,7 +146,7 @@ fn main() -> std::io::Result<()> {
&mut buf, &mut buf,
&server_SocketAddr, &server_SocketAddr,
&socket, &socket,
&cli.network_id, &network_id,
&cli.password, &cli.password,
) { ) {
Ok(n) => { Ok(n) => {
@@ -137,7 +158,7 @@ fn main() -> std::io::Result<()> {
let _ = net::send_heartbeat( let _ = net::send_heartbeat(
&mut buf, &mut buf,
&server_SocketAddr, &server_SocketAddr,
&socket, socket.clone(),
&n, &n,
&public_sock_addr, &public_sock_addr,
&iv, &iv,
@@ -152,7 +173,7 @@ fn main() -> std::io::Result<()> {
None => false, None => false,
}, },
encryption_key, encryption_key,
cli.network_id, network_id,
salt, salt,
Vec::with_capacity(1), Vec::with_capacity(1),
); );
@@ -195,18 +216,43 @@ fn main() -> std::io::Result<()> {
let encrypted = network_write_lock.encrypted; let encrypted = network_write_lock.encrypted;
let key = network_write_lock.key; let key = network_write_lock.key;
network_write_lock.peers.iter_mut().for_each(|peer| { network_write_lock.peers.iter_mut().for_each(|peer| {
println!(
"{} firing salvo of PUNCHING packets to {}",
"[LOG]".blue(),
peer.sock_addr
);
for _ in 0..MAPPING_SHOT_COUNT {
match socket.send_to(&[P2PMethods::DO_NOTHING as u8], peer.sock_addr) {
Ok(s) => {
#[cfg(debug_assertions)]
eprintln!("send {} bytes", s);
}
Err(e) => eprintln!("{} failed to send puching packet: {}", "[ERROR]".red(), e),
}
}
println!(
"{} packets away!, awiting a bit for NAT mappings to estabilish",
"[LOG]".blue()
);
std::thread::sleep(Duration::from_millis(2000));
for _ in 0..STANDARD_RETRY_MAX {
match net::P2P_query(&mut buf, &peer.sock_addr, &socket, encrypted, key) { match net::P2P_query(&mut buf, &peer.sock_addr, &socket, encrypted, key) {
Ok(ip) => { Ok(ip) => {
ips_used[ip.octets()[3] as usize] = true; ips_used[ip.octets()[3] as usize] = true;
peer.private_ip = ip; peer.private_ip = ip;
break;
} }
Err(e) => eprintln!( Err(e) => {
eprintln!(
"{} while getting ip from peer: {}, Error: {}", "{} while getting ip from peer: {}, Error: {}",
"[ERROR]".red(), "[ERROR]".red(),
peer.sock_addr, peer.sock_addr,
e e
), );
}; std::thread::sleep(Duration::from_millis(2000));
}
}
}
}); });
network_write_lock.private_ip = std::net::Ipv4Addr::new( network_write_lock.private_ip = std::net::Ipv4Addr::new(
@@ -273,6 +319,7 @@ fn main() -> std::io::Result<()> {
} // just let me have my thread } // just let me have my thread
smol::block_on(async { smol::block_on(async {
println!("{} listener started!", "[LOG]".blue());
loop { loop {
buf.fill(0); buf.fill(0);
match socket.recv_from(&mut buf) { match socket.recv_from(&mut buf) {
@@ -287,7 +334,7 @@ fn main() -> std::io::Result<()> {
socket.clone(), socket.clone(),
data_lenght, data_lenght,
)) ))
.detach(); .await;
} }
Err(e) => { Err(e) => {
eprintln!( eprintln!(
+73 -11
View File
@@ -8,6 +8,7 @@ use super::types;
use colored::Colorize; use colored::Colorize;
use pea_2_pea::{shared::net::send_and_recv_with_retry, *}; use pea_2_pea::{shared::net::send_and_recv_with_retry, *};
use rand::{RngCore, rng}; use rand::{RngCore, rng};
use sha2::Digest;
pub fn query_request( pub fn query_request(
buf: &mut [u8; UDP_BUFFER_SIZE], buf: &mut [u8; UDP_BUFFER_SIZE],
@@ -254,7 +255,7 @@ pub fn get_request(
pub fn send_heartbeat( pub fn send_heartbeat(
buf: &mut [u8; UDP_BUFFER_SIZE], buf: &mut [u8; UDP_BUFFER_SIZE],
dst: &SocketAddr, dst: &SocketAddr,
socket: &UdpSocket, socket: Arc<std::net::UdpSocket>,
network: &types::Network, network: &types::Network,
my_public_sock_addr: &Box<[u8]>, my_public_sock_addr: &Box<[u8]>,
iv: &[u8; BLOCK_SIZE as usize], iv: &[u8; BLOCK_SIZE as usize],
@@ -299,7 +300,16 @@ pub fn send_heartbeat(
.collect::<String>(), .collect::<String>(),
); );
match send_and_recv_with_retry(buf, &send_buf, dst, socket, STANDARD_RETRY_MAX) { {
let sock_clone = socket.clone();
let send_buf_clone: Box<[u8]> = send_buf.clone();
let dst_clone: SocketAddr = dst.clone();
std::thread::spawn(move || {
periodic_heart_beat(sock_clone, send_buf_clone, dst_clone);
});
}
match send_and_recv_with_retry(buf, &send_buf, dst, &socket, STANDARD_RETRY_MAX) {
Ok((data_lenght, _)) => return Ok(data_lenght), Ok((data_lenght, _)) => return Ok(data_lenght),
Err(e) => return Err(e), Err(e) => return Err(e),
} }
@@ -413,7 +423,7 @@ pub fn P2P_hello(
} }
pub async fn handle_incoming_connection( pub async fn handle_incoming_connection(
mut buf: [u8; UDP_BUFFER_SIZE], buf: [u8; UDP_BUFFER_SIZE],
src: SocketAddr, src: SocketAddr,
network: Arc<RwLock<types::Network>>, network: Arc<RwLock<types::Network>>,
tun_iface: Arc<tappers::Tun>, tun_iface: Arc<tappers::Tun>,
@@ -425,23 +435,37 @@ pub async fn handle_incoming_connection(
match buf[0] { match buf[0] {
x if x == P2PMethods::PACKET as u8 => { x if x == P2PMethods::PACKET as u8 => {
#[cfg(debug_assertions)] #[cfg(debug_assertions)]
println!("PACKET from difernt peer receved"); println!("PACKET from different peer receved");
if network.read().unwrap().encrypted { if network.read().unwrap().encrypted {
match shared::crypto::decrypt( match shared::crypto::decrypt(
&network.read().unwrap().key, &network.read().unwrap().key,
&buf[P2PStandardDataPositions::IV as usize &buf[P2PStandardDataPositions::IV as usize
..P2PStandardDataPositions::IV as usize + BLOCK_SIZE], ..P2PStandardDataPositions::IV as usize + BLOCK_SIZE],
&buf[P2PStandardDataPositions::DATA as usize..data_lenght as usize-1 /*compensate for size and index diference*/], &buf[P2PStandardDataPositions::DATA as usize..data_lenght as usize],
) { ) {
Ok(data) => match tun_iface.send(&data) { Ok(data) => {
#[cfg(debug_assertions)]
eprintln!(
"packet contets: {}\nhash: {:x}",
data.iter()
.map(|x| format!("{:02X} ", x))
.collect::<String>(),
{
let mut hasher = sha2::Sha256::new();
hasher.update(&data);
hasher.finalize()
}
);
match tun_iface.send(&data) {
Ok(_) => {} Ok(_) => {}
Err(e) => eprintln!( Err(e) => eprintln!(
"{} failed to write packet to tun interface, Error: {}", "{} failed to write packet to tun interface, Error: {}",
"[WARNING]".yellow(), "[WARNING]".yellow(),
e e
), ),
}, }
}
Err(e) => eprintln!( Err(e) => eprintln!(
"{} failed to decrypt packet, Error: {}", "{} failed to decrypt packet, Error: {}",
"[WARNING]".yellow(), "[WARNING]".yellow(),
@@ -449,9 +473,15 @@ pub async fn handle_incoming_connection(
), ),
} }
} else { } else {
match tun_iface.send(&buf[P2PStandardDataPositions::DATA as usize..data_lenght as usize-1 /*compensate for size and index diference*/]) { match tun_iface
Ok(_) => {}, .send(&buf[P2PStandardDataPositions::DATA as usize..data_lenght as usize])
Err(e) => eprintln!("{} failed to write packet to tun interface, Error: {}", "[WARNING]".yellow(), e), {
Ok(_) => {}
Err(e) => eprintln!(
"{} failed to write packet to tun interface, Error: {}",
"[WARNING]".yellow(),
e
),
}; };
} }
} }
@@ -511,6 +541,11 @@ pub async fn handle_incoming_connection(
x if x == P2PMethods::PEER_HELLO as u8 => { x if x == P2PMethods::PEER_HELLO as u8 => {
println!("{} peer hello receved from: {}", "[LOG]".blue(), src); println!("{} peer hello receved from: {}", "[LOG]".blue(), src);
if data_lenght - 1 < P2PStandardDataPositions::DATA as usize {
eprintln!("{} peer hello packet too small", "[ERROR]".red());
return;
}
let tmp_data: Vec<u8>; let tmp_data: Vec<u8>;
{ {
let mut network_write_lock = network.write().unwrap(); let mut network_write_lock = network.write().unwrap();
@@ -677,7 +712,7 @@ pub async fn handle_incoming_connection(
} }
}; };
for _ in 0..MAPPING_SHOT_COUNT { for _ in 0..MAPPING_SHOT_COUNT {
match socket.send_to(&[P2PMethods::PEER_QUERY as u8], peer_addr) { match socket.send_to(&[P2PMethods::DO_NOTHING as u8], peer_addr) {
Ok(s) => { Ok(s) => {
#[cfg(debug_assertions)] #[cfg(debug_assertions)]
eprintln!("send {} bytes", s); eprintln!("send {} bytes", s);
@@ -686,6 +721,12 @@ pub async fn handle_incoming_connection(
} }
} }
} }
x if x == P2PMethods::DO_NOTHING as u8 => {
println!(
"{} punching succesful DO_NOTHING receved",
"[SUCCESS]".green()
);
}
_ => { _ => {
eprintln!( eprintln!(
"{} unknown method ID: 0x{:02x}, Droping!", "{} unknown method ID: 0x{:02x}, Droping!",
@@ -695,3 +736,24 @@ pub async fn handle_incoming_connection(
} }
} }
} }
pub fn periodic_heart_beat(socket: Arc<UdpSocket>, send_buf: Box<[u8]>, dst: SocketAddr) {
loop {
std::thread::sleep(std::time::Duration::from_secs(30));
println!("{} sending heartbeat to server", "[LOG]".blue());
match socket.send_to(&send_buf, dst) {
Ok(size) => {
#[cfg(debug_assertions)]
println!("send {} bytes", size);
}
Err(e) => {
eprintln!(
"{} failed to send heartbeat to server Error: {}",
"[ERROR]".red(),
e
);
}
}
}
}
+55 -11
View File
@@ -1,8 +1,9 @@
use std::sync::{Arc, RwLock};
use pea_2_pea::*; use pea_2_pea::*;
use rand::RngCore; use rand::RngCore;
use rayon::prelude::*; use rayon::prelude::*;
use sha2::Digest;
use std::sync::{Arc, RwLock};
use tappers::Interface;
use crate::types::Network; use crate::types::Network;
@@ -10,20 +11,41 @@ pub fn create_tun_interface(
private_ip: std::net::Ipv4Addr, private_ip: std::net::Ipv4Addr,
if_name: Option<String>, if_name: Option<String>,
) -> Result<tappers::Tun, std::io::Error> { ) -> Result<tappers::Tun, std::io::Error> {
let mut tun_iface: tappers::Tun = tappers::Tun::new_named(tappers::Interface::new( #[cfg(not(target_os = "windows"))]
if_name.unwrap_or("pea0".to_owned()), let mut tun_iface: tappers::Tun = tappers::Tun::new_named(Interface::new(
&if_name.unwrap_or(DEFAULT_INTERFACE_NAME.to_owned()),
)?)?; )?)?;
#[cfg(target_os = "windows")]
let mut tun_iface: tappers::Tun = tappers::Tun::new()?;
#[cfg(not(target_os = "windows"))]
{
let mut addr_req = tappers::AddAddressV4::new(private_ip); let mut addr_req = tappers::AddAddressV4::new(private_ip);
addr_req.set_netmask(24); addr_req.set_netmask(24);
let mut broadcast_addr_oct = private_ip.octets(); let mut broadcast_addr_oct = private_ip.octets();
broadcast_addr_oct[3] = 255; broadcast_addr_oct[3] = 255;
addr_req.set_broadcast(std::net::Ipv4Addr::from(broadcast_addr_oct)); addr_req.set_broadcast(std::net::Ipv4Addr::from(broadcast_addr_oct));
tun_iface.add_addr(addr_req)?; tun_iface.add_addr(addr_req)?;
}
#[cfg(target_os = "windows")]
std::process::Command::new("netsh").args([
"interface",
"ipv4",
"set",
"address",
&format!(
"name=\"{}\"",
tun_iface.name()?.name().into_string().unwrap()
),
"static",
&private_ip.to_string(),
"255.255.255.0",
]);
tun_iface.set_up()?; tun_iface.set_up()?;
return Ok(tun_iface); return Ok(tun_iface);
} }
pub async fn read_tun_iface( pub fn read_tun_iface(
tun_iface: Arc<tappers::Tun>, tun_iface: Arc<tappers::Tun>,
socket: Arc<std::net::UdpSocket>, socket: Arc<std::net::UdpSocket>,
network: Arc<RwLock<Network>>, network: Arc<RwLock<Network>>,
@@ -31,10 +53,12 @@ pub async fn read_tun_iface(
let mut buf: [u8; IP_BUFFER_SIZE] = [0u8; IP_BUFFER_SIZE]; let mut buf: [u8; IP_BUFFER_SIZE] = [0u8; IP_BUFFER_SIZE];
smol::block_on(async { smol::block_on(async {
#[cfg(debug_assertions)]
eprintln!("Started listening for ip packets");
loop { loop {
let data_lenght = tun_iface.recv(&mut buf).unwrap(); // build in auto termination, isn't it great let data_lenght = tun_iface.recv(&mut buf).unwrap(); // build in auto termination, isn't it great
smol::spawn(handle_ip_packet( smol::spawn(handle_ip_packet(
buf[..data_lenght - 1].to_vec().into(), buf[..data_lenght].to_vec().into(),
network.clone(), network.clone(),
socket.clone(), socket.clone(),
)) ))
@@ -48,6 +72,23 @@ pub async fn handle_ip_packet(
network: Arc<RwLock<Network>>, network: Arc<RwLock<Network>>,
socket: Arc<std::net::UdpSocket>, socket: Arc<std::net::UdpSocket>,
) { ) {
#[cfg(debug_assertions)]
eprintln!("Processing IP packet");
#[cfg(debug_assertions)]
eprintln!(
"packet contets: {}\nhash: {:x}",
packet_data
.iter()
.map(|x| format!("{:02X} ", x))
.collect::<String>(),
{
let mut hasher = sha2::Sha256::new();
hasher.update(&packet_data);
hasher.finalize()
}
);
let dst_ip = std::net::Ipv4Addr::from( let dst_ip = std::net::Ipv4Addr::from(
match <[u8; 4]>::try_from( match <[u8; 4]>::try_from(
&packet_data[DEST_IN_IPV4_OFFSET..DEST_IN_IPV4_OFFSET + IPV4_SIZE], &packet_data[DEST_IN_IPV4_OFFSET..DEST_IN_IPV4_OFFSET + IPV4_SIZE],
@@ -64,22 +105,25 @@ pub async fn handle_ip_packet(
let mut iv: [u8; BLOCK_SIZE] = [0u8; BLOCK_SIZE]; let mut iv: [u8; BLOCK_SIZE] = [0u8; BLOCK_SIZE];
rng.fill_bytes(&mut iv); rng.fill_bytes(&mut iv);
let mut encrypted_data = let mut procesed_data: Vec<u8> = if network.read().unwrap().encrypted {
match shared::crypto::encrypt(&network.read().unwrap().key, &iv, &packet_data) { match shared::crypto::encrypt(&network.read().unwrap().key, &iv, &packet_data) {
Ok(cr) => cr, Ok(cr) => cr,
Err(e) => { Err(e) => {
eprintln!("Failed to encrypt packet droping it: {}", e); eprintln!("Failed to encrypt packet droping it: {}", e);
return; return;
} }
}
} else {
packet_data.to_vec()
}; };
encrypted_data.insert(0, P2PMethods::PACKET as u8); procesed_data.insert(0, P2PMethods::PACKET as u8);
encrypted_data.splice(1..1, iv); procesed_data.splice(1..1, iv);
if dst_ip.octets()[3] == 255 { if dst_ip.octets()[3] == 255 {
network.read().unwrap().peers.par_iter().for_each(|peer| { network.read().unwrap().peers.par_iter().for_each(|peer| {
// broadcast // broadcast
match socket.send_to(&encrypted_data, peer.sock_addr) { match socket.send_to(&procesed_data, peer.sock_addr) {
Ok(_) => {} Ok(_) => {}
Err(e) => eprintln!("failed to send packet: {}", e), Err(e) => eprintln!("failed to send packet: {}", e),
}; };
@@ -97,7 +141,7 @@ pub async fn handle_ip_packet(
None => return, None => return,
}; };
match socket.send_to(&encrypted_data, dst) { match socket.send_to(&procesed_data, dst) {
Ok(_) => {} Ok(_) => {}
Err(e) => eprintln!("failed to send packet: {}", e), Err(e) => eprintln!("failed to send packet: {}", e),
}; };
+4 -1
View File
@@ -4,7 +4,7 @@ pub const SERVER_PORT: u16 = 3543;
pub const UDP_BUFFER_SIZE: usize = 65527; pub const UDP_BUFFER_SIZE: usize = 65527;
pub const IP_BUFFER_SIZE: usize = 65535; pub const IP_BUFFER_SIZE: usize = 65535;
pub const DEFAULT_TIMEOUT: u64 = 30; pub const DEFAULT_TIMEOUT: u64 = 30;
pub const VERSION: &str = "v0.1"; pub const VERSION: &str = "v1.1";
pub const BLOCK_SIZE: usize = 16; pub const BLOCK_SIZE: usize = 16;
pub const STANDARD_RETRY_MAX: usize = 10; pub const STANDARD_RETRY_MAX: usize = 10;
@@ -15,6 +15,8 @@ pub const MAPPING_SHOT_COUNT: u8 = 5;
pub const DEFAULT_NETWORK_PREFIX: [u8; 3] = [172, 22, 44]; pub const DEFAULT_NETWORK_PREFIX: [u8; 3] = [172, 22, 44];
pub const DEFAULT_INTERFACE_NAME: &str = "pea0";
#[repr(u8)] #[repr(u8)]
#[allow(non_camel_case_types)] #[allow(non_camel_case_types)]
pub enum ServerMethods { pub enum ServerMethods {
@@ -122,6 +124,7 @@ pub enum P2PMethods {
PEER_GOODBYE = 22, // sends private ip encrypted if on PEER_GOODBYE = 22, // sends private ip encrypted if on
PACKET = 23, // sends IP packet encrypted if on PACKET = 23, // sends IP packet encrypted if on
NEW_CLIENT_NOTIFY = 24, NEW_CLIENT_NOTIFY = 24,
DO_NOTHING = 25,
} }
#[repr(usize)] #[repr(usize)]
pub enum P2PStandardDataPositions { pub enum P2PStandardDataPositions {
+13 -12
View File
@@ -1,34 +1,35 @@
mod net; mod net;
mod types; mod types;
mod utils; mod utils;
use std::{ use smol::net::UdpSocket;
net::UdpSocket, use std::{process::exit, sync::Arc};
process::exit,
sync::{Arc, RwLock},
};
use orx_concurrent_vec::ConcurrentVec; use orx_concurrent_vec::ConcurrentVec;
fn main() -> std::io::Result<()> { fn main() -> std::io::Result<()> {
{ {
let socket: Arc<UdpSocket> = Arc::new( let socket: Arc<UdpSocket> = Arc::new(
(|| -> std::io::Result<UdpSocket> { smol::block_on(async {
let listen_port: u16 = pea_2_pea::SERVER_PORT; let listen_port: u16 = pea_2_pea::SERVER_PORT;
match UdpSocket::bind(format!("0.0.0.0:{}", listen_port)) { UdpSocket::bind(format!("0.0.0.0:{}", listen_port)).await
Ok(socket) => return Ok(socket), })
Err(e) => return Err(e),
}
})()
.expect("Failed to bind to any available port"), .expect("Failed to bind to any available port"),
); );
let registration_vector: Arc<ConcurrentVec<types::Registration>> = let registration_vector: Arc<ConcurrentVec<types::Registration>> =
Arc::new(orx_concurrent_vec::ConcurrentVec::new()); Arc::new(orx_concurrent_vec::ConcurrentVec::new());
{
let reg_clone = registration_vector.clone();
std::thread::spawn(move || {
utils::disconnected_cleaner(reg_clone);
});
}
let mut buf: [u8; pea_2_pea::UDP_BUFFER_SIZE] = [0u8; pea_2_pea::UDP_BUFFER_SIZE]; let mut buf: [u8; pea_2_pea::UDP_BUFFER_SIZE] = [0u8; pea_2_pea::UDP_BUFFER_SIZE];
smol::block_on(async { smol::block_on(async {
loop { loop {
buf.fill(0); buf.fill(0);
match socket.recv_from(&mut buf) { match socket.recv_from(&mut buf).await {
Ok((data_length, src)) => { Ok((data_length, src)) => {
smol::spawn(net::handle_request( smol::spawn(net::handle_request(
buf, buf,
+24 -73
View File
@@ -1,17 +1,29 @@
use crate::utils::send_general_error_to_client; use crate::utils::send_general_error_to_client;
use smol::net::UdpSocket;
use super::types; use super::types;
use super::utils; use super::utils;
use colored::Colorize;
use orx_concurrent_vec::ConcurrentVec; use orx_concurrent_vec::ConcurrentVec;
use pea_2_pea::*; use pea_2_pea::*;
use rayon::prelude::*; use rayon::prelude::*;
use std::sync::Arc; use std::sync::Arc;
use std::u8; use std::u8;
async fn send_with_count(socket: std::sync::Arc<UdpSocket> , dst: &core::net::SocketAddr, buf: &[u8]) {
match socket.send_to(buf, dst).await {
Ok(s) => {
#[cfg(debug_assertions)]
eprintln!("send {} bytes", s);
}
Err(e) => {
eprintln!("Error snding data: {}", e);
}
}
}
pub async fn handle_request( pub async fn handle_request(
buf: [u8; UDP_BUFFER_SIZE], buf: [u8; UDP_BUFFER_SIZE],
socket: std::sync::Arc<std::net::UdpSocket>, socket: std::sync::Arc<UdpSocket>,
src: core::net::SocketAddr, src: core::net::SocketAddr,
data_len: usize, data_len: usize,
registration_vector: Arc<ConcurrentVec<types::Registration>>, registration_vector: Arc<ConcurrentVec<types::Registration>>,
@@ -25,15 +37,7 @@ pub async fn handle_request(
let mut send_vec: Vec<u8> = client_sock_addr_str.into(); let mut send_vec: Vec<u8> = client_sock_addr_str.into();
send_vec.insert(0, ServerMethods::QUERY as u8); send_vec.insert(0, ServerMethods::QUERY as u8);
match socket.send_to(&send_vec, &src) { send_with_count(socket, &src, &send_vec).await;
Ok(s) => {
#[cfg(debug_assertions)]
eprintln!("send {} bytes", s);
}
Err(e) => {
eprintln!("Error snding data: {}", e);
}
}
} }
x if x == ServerMethods::GET as u8 => { x if x == ServerMethods::GET as u8 => {
@@ -63,15 +67,7 @@ pub async fn handle_request(
.find(|elem| elem.map(|s| &s.net_id == &net_id)) // find if id exists .find(|elem| elem.map(|s| &s.net_id == &net_id)) // find if id exists
{ {
Some(registration) => registration, Some(registration) => registration,
None => {match socket.send_to(&[ServerResponse::ID_DOESNT_EXIST as u8], src){ None => {futures::executor::block_on(send_with_count(socket, &src ,&[ServerResponse::ID_DOESNT_EXIST as u8]));
Ok(s) => {
#[cfg(debug_assertions)]
eprintln!("send {} bytes", s);
}
Err(e) => {
eprintln!("Error sending data: {}", e);
}
};
return; return;
}, },
} }
@@ -131,15 +127,7 @@ pub async fn handle_request(
return; return;
} }
match socket.send_to(&send_vec, &src) { send_with_count(socket, &src, &send_vec).await;
Ok(s) => {
#[cfg(debug_assertions)]
eprintln!("send {} bytes", s);
}
Err(e) => {
eprintln!("Error snding data: {}", e);
}
}
} }
x if x == ServerMethods::REGISTER as u8 => { x if x == ServerMethods::REGISTER as u8 => {
#[cfg(debug_assertions)] #[cfg(debug_assertions)]
@@ -178,15 +166,7 @@ pub async fn handle_request(
.find(|elem| elem.map(|s| &s.net_id == &net_id)) // find if id exists .find(|elem| elem.map(|s| &s.net_id == &net_id)) // find if id exists
{ {
Some(_) => { Some(_) => {
match socket.send_to(&[ServerResponse::ID_EXISTS as u8], src) { futures::executor::block_on(send_with_count(socket, &src, &[ServerResponse::ID_EXISTS as u8]));
Ok(s) => {
#[cfg(debug_assertions)]
eprintln!("send {} bytes", s);
}
Err(e) => {
eprintln!("Error sending data: {}", e);
}
};
return; return;
} }
None => {} None => {}
@@ -245,15 +225,7 @@ pub async fn handle_request(
iv, iv,
src src
)); ));
match socket.send_to(&[ServerMethods::REGISTER as u8], src) { send_with_count(socket, &src, &[ServerMethods::REGISTER as u8]).await;
Ok(s) => {
#[cfg(debug_assertions)]
eprintln!("send {} bytes", s);
}
Err(e) => {
eprintln!("Error sending data: {}", e);
}
}
#[cfg(debug_assertions)] #[cfg(debug_assertions)]
println!("network registered"); println!("network registered");
} }
@@ -333,13 +305,9 @@ pub async fn handle_request(
send_buf[0] = P2PMethods::NEW_CLIENT_NOTIFY as u8; send_buf[0] = P2PMethods::NEW_CLIENT_NOTIFY as u8;
send_buf[P2PStandardDataPositions::IV as usize..P2PStandardDataPositions::IV as usize+ BLOCK_SIZE].copy_from_slice(&iv); send_buf[P2PStandardDataPositions::IV as usize..P2PStandardDataPositions::IV as usize+ BLOCK_SIZE].copy_from_slice(&iv);
send_buf[P2PStandardDataPositions::DATA as usize..P2PStandardDataPositions::DATA as usize + sock_addr_len as usize].copy_from_slice(&sock_addr); send_buf[P2PStandardDataPositions::DATA as usize..P2PStandardDataPositions::DATA as usize + sock_addr_len as usize].copy_from_slice(&sock_addr);
match socket.send_to(&send_buf, src) { let sock_clone = socket.clone();
Ok(data_lenght) => { futures::executor::block_on(async move {
#[cfg(debug_assertions)] send_with_count(sock_clone, &c.src, &send_buf).await});
eprintln!("send {} bytes", data_lenght);
},
Err(e) => eprintln!("{} failed to send data to client Error: {}", "[ERROR]".red(), e),
};
}); });
r.clients.push(types::Client::new(sock_addr.clone(), current_time, iv, src)); r.clients.push(types::Client::new(sock_addr.clone(), current_time, iv, src));
@@ -347,26 +315,9 @@ pub async fn handle_request(
}; };
}); });
} }
None => {match socket.send_to(&[ServerResponse::ID_DOESNT_EXIST as u8], src) { None => {futures::executor::block_on(send_with_count(socket, &src, &[ServerResponse::ID_DOESNT_EXIST as u8])); return;}
Ok(s) => {
#[cfg(debug_assertions)]
eprintln!("send {} bytes", s);
}
Err(e) => {
eprintln!("Error sending data: {}", e);
}
} return;}
}
match socket.send_to(&[ServerMethods::HEARTBEAT as u8], src) {
// succes responce
Ok(s) => {
#[cfg(debug_assertions)]
eprintln!("send {} bytes", s);
}
Err(e) => {
eprintln!("Error sending data: {}", e);
}
} }
send_with_count(socket, &src, &[ServerMethods::HEARTBEAT as u8]).await;
return; return;
} }
_ => { _ => {
+2 -59
View File
@@ -1,5 +1,4 @@
use pea_2_pea::*; use pea_2_pea::*;
use std::sync::{Arc, atomic::Ordering};
#[derive(Clone)] #[derive(Clone)]
#[readonly::make] #[readonly::make]
@@ -41,6 +40,7 @@ pub struct Registration {
pub encrypted: bool, pub encrypted: bool,
#[readonly] #[readonly]
pub salt: [u8; BLOCK_SIZE as usize], pub salt: [u8; BLOCK_SIZE as usize],
pub invalid: bool,
} }
impl Registration { impl Registration {
@@ -64,64 +64,7 @@ impl Registration {
encrypted, encrypted,
last_heart_beat: heart_beat, last_heart_beat: heart_beat,
salt: salt.unwrap_or([0; BLOCK_SIZE as usize]), salt: salt.unwrap_or([0; BLOCK_SIZE as usize]),
invalid: false,
} }
} }
} }
pub struct BatchLock {
inner: std::sync::Mutex<bool>, // true = blocking new locks
condvar: std::sync::Condvar,
active_count: std::sync::atomic::AtomicUsize,
}
pub struct LockGuard {
lock: Arc<BatchLock>,
}
impl BatchLock {
pub fn new() -> Arc<Self> {
Arc::new(BatchLock {
inner: std::sync::Mutex::new(false),
condvar: std::sync::Condvar::new(),
active_count: std::sync::atomic::AtomicUsize::new(0),
})
}
// Acquire a lock (blocks if waiting for all to unlock)
pub fn lock(self: &Arc<Self>) -> LockGuard {
let mut blocking = self.inner.lock().unwrap();
// Wait while new locks are blocked
while *blocking {
blocking = self.condvar.wait(blocking).unwrap();
}
self.active_count.fetch_add(1, Ordering::SeqCst);
LockGuard {
lock: Arc::clone(self),
}
}
// Block new locks and wait for all current locks to finish
pub fn wait_all_unlock(self: &Arc<Self>) {
// Block new locks
*self.inner.lock().unwrap() = true;
// Wait for all active locks to finish
while self.active_count.load(Ordering::SeqCst) > 0 {
std::thread::sleep(std::time::Duration::from_millis(1));
}
// Allow new locks again
*self.inner.lock().unwrap() = false;
self.condvar.notify_all();
}
}
impl Drop for LockGuard {
fn drop(&mut self) {
// Automatically release lock when guard is dropped
self.lock.active_count.fetch_sub(1, Ordering::SeqCst);
}
}
+23 -1
View File
@@ -1,8 +1,10 @@
use colored::Colorize;
use pea_2_pea::*; use pea_2_pea::*;
pub fn send_general_error_to_client<T: std::error::Error>( pub fn send_general_error_to_client<T: std::error::Error>(
dst: core::net::SocketAddr, dst: core::net::SocketAddr,
e: T, e: T,
socket: std::sync::Arc<std::net::UdpSocket>, socket: std::sync::Arc<smol::net::UdpSocket>,
) { ) {
let mut resp_buf: Box<[u8]> = vec![0; e.to_string().len() + 1].into_boxed_slice(); let mut resp_buf: Box<[u8]> = vec![0; e.to_string().len() + 1].into_boxed_slice();
@@ -11,3 +13,23 @@ pub fn send_general_error_to_client<T: std::error::Error>(
let _ = socket.send_to(&[ServerResponse::GENERAL_ERROR as u8], dst); let _ = socket.send_to(&[ServerResponse::GENERAL_ERROR as u8], dst);
} }
pub fn disconnected_cleaner(
registration_vector: std::sync::Arc<
orx_concurrent_vec::ConcurrentVec<crate::types::Registration>,
>,
) {
loop {
std::thread::sleep(std::time::Duration::from_secs(120));
println!("{} starting cleanup", "[LOG]".blue());
let time_now = chrono::Utc::now().timestamp();
unsafe {
registration_vector.iter_mut().for_each(|reg| {
reg.clients.retain(|c| time_now - c.last_heart_beat < 120);
if time_now - reg.last_heart_beat > 120 {
reg.invalid = true;
}
})
}
}
}
+8 -2
View File
@@ -127,8 +127,10 @@ pub fn send_and_recv_with_retry(
let mut retry_count: usize = 0; let mut retry_count: usize = 0;
let mut resend: bool = true;
loop { loop {
match socket.send_to(send_buf, dst) { if resend {match socket.send_to(send_buf, dst) {
Ok(s) => { Ok(s) => {
#[cfg(debug_assertions)] #[cfg(debug_assertions)]
eprintln!("send {} bytes", s); eprintln!("send {} bytes", s);
@@ -145,7 +147,7 @@ pub fn send_and_recv_with_retry(
} }
_ => return Err(ServerErrorResponses::IO(e)), _ => return Err(ServerErrorResponses::IO(e)),
}, },
} }} else {resend = true;}
#[cfg(target_os = "linux")] #[cfg(target_os = "linux")]
if let Err(icmp_error) = check_icmp_error_queue(socket) { if let Err(icmp_error) = check_icmp_error_queue(socket) {
@@ -176,6 +178,10 @@ pub fn send_and_recv_with_retry(
x if x == ServerResponse::ID_EXISTS as u8 => { x if x == ServerResponse::ID_EXISTS as u8 => {
return Err(ServerErrorResponses::ID_EXISTS); return Err(ServerErrorResponses::ID_EXISTS);
} }
x if x == P2PMethods::DO_NOTHING as u8 => {
resend = false;
continue;
}
_ => { _ => {
continue; continue;
} }