Skip to content

Commit

Permalink
dev: udp client timeouts and errors
Browse files Browse the repository at this point in the history
  • Loading branch information
da2ce7 committed Mar 30, 2024
1 parent 052d068 commit 2a72445
Show file tree
Hide file tree
Showing 13 changed files with 556 additions and 403 deletions.
8 changes: 4 additions & 4 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -62,9 +62,9 @@ serde_bytes = "0"
serde_json = "1"
serde_repr = "0"
thiserror = "1"
tokio = { version = "1", features = ["macros", "net", "rt-multi-thread", "signal", "sync"] }
torrust-tracker-configuration = { version = "3.0.0-alpha.12-develop", path = "packages/configuration" }
tokio = { version = "1", features = ["macros", "rt-multi-thread", "signal", "sync"] }
torrust-tracker-clock = { version = "3.0.0-alpha.12-develop", path = "packages/clock" }
torrust-tracker-configuration = { version = "3.0.0-alpha.12-develop", path = "packages/configuration" }
torrust-tracker-contrib-bencode = { version = "3.0.0-alpha.12-develop", path = "contrib/bencode" }
torrust-tracker-located-error = { version = "3.0.0-alpha.12-develop", path = "packages/located-error" }
torrust-tracker-primitives = { version = "3.0.0-alpha.12-develop", path = "packages/primitives" }
Expand All @@ -89,7 +89,7 @@ members = [
"packages/located-error",
"packages/primitives",
"packages/test-helpers",
"packages/torrent-repository"
"packages/torrent-repository",
]

[profile.dev]
Expand All @@ -104,4 +104,4 @@ opt-level = 3

[target.x86_64-unknown-linux-gnu]
linker = "/usr/bin/clang"
rustflags = ["-Clink-arg=-fuse-ld=lld", "-Clink-arg=-Wl,--no-rosegment"]
rustflags = ["-Clink-arg=-Wl,--no-rosegment", "-Clink-arg=-fuse-ld=lld"]
14 changes: 14 additions & 0 deletions packages/configuration/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -233,6 +233,7 @@ use std::collections::HashMap;
use std::net::IpAddr;
use std::str::FromStr;
use std::sync::Arc;
use std::time::Duration;
use std::{env, fs};

use config::{Config, ConfigError, File, FileFormat};
Expand All @@ -246,6 +247,19 @@ use torrust_tracker_primitives::{DatabaseDriver, TrackerMode};
/// The maximum number of returned peers for a torrent.
pub const TORRENT_PEERS_LIMIT: usize = 74;

/// Client Timeout
pub const CLIENT_TIMEOUT_DEFAULT: Duration = Duration::from_secs(5);

/// The maximum number of bytes in a UDP packet.
pub const MAX_PACKET_SIZE: usize = 1496;

/// The a free port is dynamically chosen by the operating system.
pub const PORT_ASSIGNED_BY_OS: u16 = 0;

/// A magic 64-bit integer constant defined in the protocol that is used to
/// identify the protocol.
pub const PROTOCOL_ID: i64 = 0x0417_2710_1980;

#[derive(Copy, Clone, Debug, PartialEq, Constructor)]
pub struct TrackerPolicy {
pub remove_peerless_torrents: bool,
Expand Down
10 changes: 5 additions & 5 deletions packages/torrent-repository/Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[package]
description = "A library that provides a repository of torrents files and their peers."
keywords = ["torrents", "repository", "library"]
keywords = ["library", "repository", "torrents"]
name = "torrust-tracker-torrent-repository"
readme = "README.md"

Expand All @@ -17,15 +17,15 @@ version.workspace = true

[dependencies]
futures = "0.3.29"
tokio = { version = "1", features = ["macros", "net", "rt-multi-thread", "signal", "sync"] }
torrust-tracker-primitives = { version = "3.0.0-alpha.12-develop", path = "../primitives" }
torrust-tracker-configuration = { version = "3.0.0-alpha.12-develop", path = "../configuration" }
tokio = { version = "1", features = ["macros", "rt-multi-thread", "sync"] }
torrust-tracker-clock = { version = "3.0.0-alpha.12-develop", path = "../clock" }
torrust-tracker-configuration = { version = "3.0.0-alpha.12-develop", path = "../configuration" }
torrust-tracker-primitives = { version = "3.0.0-alpha.12-develop", path = "../primitives" }

[dev-dependencies]
async-std = { version = "1", features = ["attributes", "tokio1"] }
criterion = { version = "0", features = ["async_tokio"] }
rstest = "0"
async-std = {version = "1", features = ["attributes", "tokio1"] }

[[bench]]
harness = false
Expand Down
159 changes: 98 additions & 61 deletions src/console/clients/checker/checks/udp.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
use std::net::SocketAddr;
use std::time::Duration;

use aquatic_udp_protocol::{Port, TransactionId};
use colored::Colorize;
use hex_literal::hex;
use log::debug;
Expand All @@ -10,83 +9,121 @@ use torrust_tracker_primitives::info_hash::InfoHash;
use crate::console::clients::checker::console::Console;
use crate::console::clients::checker::printer::Printer;
use crate::console::clients::checker::service::{CheckError, CheckResult};
use crate::console::clients::udp::checker;

const ASSIGNED_BY_OS: u16 = 0;
const RANDOM_TRANSACTION_ID: i32 = -888_840_697;
use crate::console::clients::udp::checker::{self, Client};
use crate::console::clients::udp::Error;

pub async fn run(udp_trackers: Vec<SocketAddr>, _: Duration, console: Console) -> Vec<CheckResult> {
let mut check_results = Vec::default();

console.println("UDP trackers ...");

for ref udp_tracker in udp_trackers {
debug!("UDP tracker: {:?}", udp_tracker);

let colored_tracker_url = udp_tracker.to_string().yellow();

let transaction_id = TransactionId(RANDOM_TRANSACTION_ID);

let mut client = checker::Client::default();

debug!("Bind and connect");
let info_hash = InfoHash(hex!("9c38422213e30bff212b30c360d26f9a02136422")); // # DevSkim: ignore DS173237

for ref addr in udp_trackers {
debug!("UDP tracker: {:?}", addr);

let colored_addr = addr.to_string().yellow();

// Setup Connection
let Ok((client, ctx)) = ({
let res = setup_connection(addr).await;

check_results.push(match res {
Ok(_) => {
console.println(&format!("{} - Setup of {} is OK", "✓".green(), colored_addr));
Ok(())
}
Err(ref e) => {
console.println(&format!("{} - Setup of {} is failing", "✗".red(), colored_addr));
Err(CheckError::UdpCheckError {
addr: *addr,
err: e.clone(),
})
}
});

res
}) else {
break;
};

let Ok(bound_to) = client.bind_and_connect(ASSIGNED_BY_OS, udp_tracker).await else {
check_results.push(Err(CheckError::UdpError {
socket_addr: *udp_tracker,
}));
console.println(&format!("{} - Can't connect to socket {}", "✗".red(), colored_tracker_url));
// Do Announce
if {
let res = check_udp_announce(&client, &ctx, info_hash).await;

check_results.push(match res {
Ok(_) => {
console.println(&format!("{} - Announce of {} is OK", "✓".green(), colored_addr));
Ok(())
}
Err(ref e) => {
console.println(&format!("{} - Announce of {} is failing", "✗".red(), colored_addr));
Err(CheckError::UdpCheckError {
addr: *addr,
err: e.clone(),
})
}
});

res
}
.is_err()
{
break;
};

debug!("Send connection request");

let Ok(connection_id) = client.send_connection_request(transaction_id).await else {
check_results.push(Err(CheckError::UdpError {
socket_addr: *udp_tracker,
}));
console.println(&format!(
"{} - Can't make tracker connection request to {}",
"✗".red(),
colored_tracker_url
));
// Do Scrape
if {
let res = check_udp_scrape(&client, &ctx, vec![info_hash]).await;

check_results.push(match res {
Ok(_) => {
console.println(&format!("{} - Announce of {} is OK", "✓".green(), colored_addr));
Ok(())
}
Err(ref e) => {
console.println(&format!("{} - Announce of {} is failing", "✗".red(), colored_addr));
Err(CheckError::UdpCheckError {
addr: *addr,
err: e.clone(),
})
}
});
res
}
.is_err()
{
break;
};
}

let info_hash = InfoHash(hex!("9c38422213e30bff212b30c360d26f9a02136422")); // # DevSkim: ignore DS173237
check_results
}

debug!("Send announce request");
async fn setup_connection(addr: &SocketAddr) -> Result<(Client, aquatic_udp_protocol::ConnectResponse), Error> {
let client = checker::Client::bind_and_connect(addr).await?;

if (client
.send_announce_request(connection_id, transaction_id, info_hash, Port(bound_to.port()))
.await)
.is_ok()
{
check_results.push(Ok(()));
console.println(&format!("{} - Announce at {} is OK", "✓".green(), colored_tracker_url));
} else {
let err = CheckError::UdpError {
socket_addr: *udp_tracker,
};
check_results.push(Err(err));
console.println(&format!("{} - Announce at {} is failing", "✗".red(), colored_tracker_url));
}
let transaction_id = aquatic_udp_protocol::TransactionId(rand::Rng::gen(&mut rand::thread_rng()));

debug!("Send scrape request");
let ctx = client.send_connection_request(transaction_id).await?;

let info_hashes = vec![InfoHash(hex!("9c38422213e30bff212b30c360d26f9a02136422"))]; // # DevSkim: ignore DS173237
Ok((client, ctx))
}

if (client.send_scrape_request(connection_id, transaction_id, info_hashes).await).is_ok() {
check_results.push(Ok(()));
console.println(&format!("{} - Announce at {} is OK", "✓".green(), colored_tracker_url));
} else {
let err = CheckError::UdpError {
socket_addr: *udp_tracker,
};
check_results.push(Err(err));
console.println(&format!("{} - Announce at {} is failing", "✗".red(), colored_tracker_url));
}
}
async fn check_udp_announce(
client: &Client,
ctx: &aquatic_udp_protocol::ConnectResponse,
info_hash: InfoHash,
) -> Result<aquatic_udp_protocol::Response, Error> {
client
.send_announce_request(ctx, info_hash, aquatic_udp_protocol::Port(client.local_addr()?.port()))
.await
}

check_results
async fn check_udp_scrape(
client: &Client,
ctx: &aquatic_udp_protocol::ConnectResponse,
infohashes: Vec<InfoHash>,
) -> Result<aquatic_udp_protocol::Response, Error> {
client.send_scrape_request(ctx, infohashes).await
}
6 changes: 3 additions & 3 deletions src/console/clients/checker/service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ use super::checks;
use super::config::Configuration;
use super::console::Console;
use crate::console::clients::checker::printer::Printer;
use crate::console::clients::http;
use crate::console::clients::{http, udp};

pub struct Service {
pub(crate) config: Arc<Configuration>,
Expand All @@ -21,8 +21,8 @@ pub type CheckResult = Result<(), CheckError>;

#[derive(Debug, Clone, Error)]
pub enum CheckError {
#[error("Error In Udp: socket: {socket_addr:?}")]
UdpError { socket_addr: SocketAddr },
#[error("Error In Udp: {addr:?}")]
UdpCheckError { addr: SocketAddr, err: udp::Error },
#[error("Error In Http: url: {url:?}")]
HttpCheckError { url: Url, err: http::Error },
#[error("Error In HeathCheck: url: {url:?}")]
Expand Down
19 changes: 9 additions & 10 deletions src/console/clients/udp/app.rs
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,6 @@ use url::Url;
use crate::console::clients::udp::checker;
use crate::console::clients::udp::responses::{AnnounceResponseDto, ScrapeResponseDto};

const ASSIGNED_BY_OS: u16 = 0;
const RANDOM_TRANSACTION_ID: i32 = -888_840_697;

#[derive(Parser, Debug)]
Expand Down Expand Up @@ -144,29 +143,29 @@ fn setup_logging(level: LevelFilter) {
async fn handle_announce(tracker_socket_addr: &SocketAddr, info_hash: &TorrustInfoHash) -> anyhow::Result<Response> {
let transaction_id = TransactionId(RANDOM_TRANSACTION_ID);

let mut client = checker::Client::default();
let client = checker::Client::bind_and_connect(tracker_socket_addr).await?;

let bound_to = client.bind_and_connect(ASSIGNED_BY_OS, tracker_socket_addr).await?;
let bound_to = client.client.local_addr()?;

let connection_id = client.send_connection_request(transaction_id).await?;
let ctx = client.send_connection_request(transaction_id).await?;

client
.send_announce_request(connection_id, transaction_id, *info_hash, Port(bound_to.port()))
.send_announce_request(&ctx, *info_hash, Port(bound_to.port()))
.await
.context("failed to handle announce")
}

async fn handle_scrape(tracker_socket_addr: &SocketAddr, info_hashes: &[TorrustInfoHash]) -> anyhow::Result<Response> {
let transaction_id = TransactionId(RANDOM_TRANSACTION_ID);

let mut client = checker::Client::default();
let client = checker::Client::bind_and_connect(tracker_socket_addr).await?;

let _bound_to = client.bind_and_connect(ASSIGNED_BY_OS, tracker_socket_addr).await?;

let connection_id = client.send_connection_request(transaction_id).await?;
let ctx = client.send_connection_request(transaction_id).await?;

client
.send_scrape_request(connection_id, transaction_id, info_hashes.to_vec())
.send_scrape_request(&ctx, info_hashes.to_vec())
.await
.context("failed to handle scrape")
}

fn print_response(response: Response) -> anyhow::Result<()> {
Expand Down
Loading

0 comments on commit 2a72445

Please sign in to comment.