Skip to content

Commit

Permalink
Merge torrust#955: udp: processor for requests
Browse files Browse the repository at this point in the history
019cf9f udp: processor for requests (Cameron Garnham)

Pull request description:

  Little work to cleanup the server udp launcher module.

ACKs for top commit:
  da2ce7:
    ACK 019cf9f

Tree-SHA512: fc32f73f7969d6c6b7f112306b2132cb853fbbe120979ea5826f835b570dff646fc73df66585a33c3223c85a3fd73373d9afcad6761ec5b67d7dca6a4688547d
  • Loading branch information
da2ce7 committed Jul 13, 2024
2 parents 035d630 + 019cf9f commit f2629ce
Show file tree
Hide file tree
Showing 7 changed files with 81 additions and 84 deletions.
6 changes: 3 additions & 3 deletions src/servers/udp/handlers.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ use crate::shared::bit_torrent::common::MAX_SCRAPE_TORRENTS;
/// - Delegating the request to the correct handler depending on the request type.
///
/// It will return an `Error` response if the request is invalid.
pub(crate) async fn handle_packet(udp_request: RawRequest, tracker: &Arc<Tracker>, addr: SocketAddr) -> Response {
pub(crate) async fn handle_packet(udp_request: RawRequest, tracker: &Tracker, local_addr: SocketAddr) -> Response {
debug!("Handling Packets: {udp_request:?}");

let start_time = Instant::now();
Expand All @@ -47,7 +47,7 @@ pub(crate) async fn handle_packet(udp_request: RawRequest, tracker: &Arc<Tracker
}
}) {
Ok(request) => {
log_request(&request, &request_id, &addr);
log_request(&request, &request_id, &local_addr);

let transaction_id = match &request {
Request::Connect(connect_request) => connect_request.transaction_id,
Expand All @@ -62,7 +62,7 @@ pub(crate) async fn handle_packet(udp_request: RawRequest, tracker: &Arc<Tracker

let latency = start_time.elapsed();

log_response(&response, &transaction_id, &request_id, &addr, latency);
log_response(&response, &transaction_id, &request_id, &local_addr, latency);

response
}
Expand Down
7 changes: 2 additions & 5 deletions src/servers/udp/server/bound_socket.rs
Original file line number Diff line number Diff line change
@@ -1,15 +1,14 @@
use std::fmt::Debug;
use std::net::SocketAddr;
use std::ops::Deref;
use std::sync::Arc;

use url::Url;

use crate::servers::udp::UDP_TRACKER_LOG_TARGET;

/// Wrapper for Tokio [`UdpSocket`][`tokio::net::UdpSocket`] that is bound to a particular socket.
pub struct BoundSocket {
socket: Arc<tokio::net::UdpSocket>,
socket: tokio::net::UdpSocket,
}

impl BoundSocket {
Expand All @@ -30,9 +29,7 @@ impl BoundSocket {
let local_addr = format!("udp://{addr}");
tracing::debug!(target: UDP_TRACKER_LOG_TARGET, local_addr, "UdpSocket::new (bound)");

Ok(Self {
socket: Arc::new(socket),
})
Ok(Self { socket })
}

/// # Panics
Expand Down
65 changes: 5 additions & 60 deletions src/servers/udp/server/launcher.rs
Original file line number Diff line number Diff line change
@@ -1,26 +1,23 @@
use std::io::Cursor;
use std::net::SocketAddr;
use std::sync::Arc;
use std::time::Duration;

use aquatic_udp_protocol::Response;
use derive_more::Constructor;
use futures_util::StreamExt;
use tokio::select;
use tokio::sync::oneshot;

use super::request_buffer::ActiveRequests;
use super::RawRequest;
use crate::bootstrap::jobs::Started;
use crate::core::Tracker;
use crate::servers::logging::STARTED_ON;
use crate::servers::registar::ServiceHealthCheckJob;
use crate::servers::signals::{shutdown_signal_with_message, Halted};
use crate::servers::udp::server::bound_socket::BoundSocket;
use crate::servers::udp::server::processor::Processor;
use crate::servers::udp::server::receiver::Receiver;
use crate::servers::udp::{handlers, UDP_TRACKER_LOG_TARGET};
use crate::servers::udp::UDP_TRACKER_LOG_TARGET;
use crate::shared::bit_torrent::tracker::udp::client::check;
use crate::shared::bit_torrent::tracker::udp::MAX_PACKET_SIZE;

/// A UDP server instance launcher.
#[derive(Constructor)]
Expand Down Expand Up @@ -109,6 +106,8 @@ impl Launcher {
let local_addr = format!("udp://{addr}");

loop {
let processor = Processor::new(receiver.socket.clone(), tracker.clone());

if let Some(req) = {
tracing::trace!(target: UDP_TRACKER_LOG_TARGET, local_addr, "Udp::run_udp_server (wait for request)");
receiver.next().await
Expand Down Expand Up @@ -138,9 +137,7 @@ impl Launcher {
// are only adding and removing tasks without given them the
// chance to finish. However, the buffer is yielding before
// aborting one tasks, giving it the chance to finish.
let abort_handle: tokio::task::AbortHandle =
tokio::task::spawn(Launcher::process_request(req, tracker.clone(), receiver.bound_socket.clone()))
.abort_handle();
let abort_handle: tokio::task::AbortHandle = tokio::task::spawn(processor.process_request(req)).abort_handle();

if abort_handle.is_finished() {
continue;
Expand All @@ -156,56 +153,4 @@ impl Launcher {
}
}
}

async fn process_request(request: RawRequest, tracker: Arc<Tracker>, socket: Arc<BoundSocket>) {
tracing::trace!(target: UDP_TRACKER_LOG_TARGET, request = %request.from, "Udp::process_request (receiving)");
Self::process_valid_request(tracker, socket, request).await;
}

async fn process_valid_request(tracker: Arc<Tracker>, socket: Arc<BoundSocket>, udp_request: RawRequest) {
tracing::trace!(target: UDP_TRACKER_LOG_TARGET, "Udp::process_valid_request. Making Response to {udp_request:?}");
let from = udp_request.from;
let response = handlers::handle_packet(udp_request, &tracker.clone(), socket.address()).await;
Self::send_response(&socket.clone(), from, response).await;
}

async fn send_response(bound_socket: &Arc<BoundSocket>, to: SocketAddr, response: Response) {
let response_type = match &response {
Response::Connect(_) => "Connect".to_string(),
Response::AnnounceIpv4(_) => "AnnounceIpv4".to_string(),
Response::AnnounceIpv6(_) => "AnnounceIpv6".to_string(),
Response::Scrape(_) => "Scrape".to_string(),
Response::Error(e) => format!("Error: {e:?}"),
};

tracing::debug!(target: UDP_TRACKER_LOG_TARGET, target = ?to, response_type, "Udp::send_response (sending)");

let buffer = vec![0u8; MAX_PACKET_SIZE];
let mut cursor = Cursor::new(buffer);

match response.write_bytes(&mut cursor) {
Ok(()) => {
#[allow(clippy::cast_possible_truncation)]
let position = cursor.position() as usize;
let inner = cursor.get_ref();

tracing::debug!(target: UDP_TRACKER_LOG_TARGET, ?to, bytes_count = &inner[..position].len(), "Udp::send_response (sending...)" );
tracing::trace!(target: UDP_TRACKER_LOG_TARGET, ?to, bytes_count = &inner[..position].len(), payload = ?&inner[..position], "Udp::send_response (sending...)");

Self::send_packet(bound_socket, &to, &inner[..position]).await;

tracing::trace!(target:UDP_TRACKER_LOG_TARGET, ?to, bytes_count = &inner[..position].len(), "Udp::send_response (sent)");
}
Err(e) => {
tracing::error!(target: UDP_TRACKER_LOG_TARGET, ?to, response_type, err = %e, "Udp::send_response (error)");
}
}
}

async fn send_packet(bound_socket: &Arc<BoundSocket>, remote_addr: &SocketAddr, payload: &[u8]) {
tracing::trace!(target: UDP_TRACKER_LOG_TARGET, to = %remote_addr, ?payload, "Udp::send_response (sending)");

// doesn't matter if it reaches or not
drop(bound_socket.send_to(payload, remote_addr).await);
}
}
1 change: 1 addition & 0 deletions src/servers/udp/server/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ use super::RawRequest;

pub mod bound_socket;
pub mod launcher;
pub mod processor;
pub mod receiver;
pub mod request_buffer;
pub mod spawner;
Expand Down
66 changes: 66 additions & 0 deletions src/servers/udp/server/processor.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,66 @@
use std::io::Cursor;
use std::net::SocketAddr;
use std::sync::Arc;

use aquatic_udp_protocol::Response;

use super::bound_socket::BoundSocket;
use crate::core::Tracker;
use crate::servers::udp::{handlers, RawRequest, UDP_TRACKER_LOG_TARGET};

pub struct Processor {
socket: Arc<BoundSocket>,
tracker: Arc<Tracker>,
}

impl Processor {
pub fn new(socket: Arc<BoundSocket>, tracker: Arc<Tracker>) -> Self {
Self { socket, tracker }
}

pub async fn process_request(self, request: RawRequest) {
tracing::trace!(target: UDP_TRACKER_LOG_TARGET, request = %request.from, "Udp::process_request (receiving)");

let from = request.from;
let response = handlers::handle_packet(request, &self.tracker, self.socket.address()).await;
self.send_response(from, response).await;
}

async fn send_response(self, to: SocketAddr, response: Response) {
let response_type = match &response {
Response::Connect(_) => "Connect".to_string(),
Response::AnnounceIpv4(_) => "AnnounceIpv4".to_string(),
Response::AnnounceIpv6(_) => "AnnounceIpv6".to_string(),
Response::Scrape(_) => "Scrape".to_string(),
Response::Error(e) => format!("Error: {e:?}"),
};

tracing::debug!(target: UDP_TRACKER_LOG_TARGET, target = ?to, response_type, "Udp::send_response (sending)");

let mut writer = Cursor::new(Vec::with_capacity(200));

match response.write_bytes(&mut writer) {
Ok(()) => {
let bytes_count = writer.get_ref().len();
let payload = writer.get_ref();

tracing::debug!(target: UDP_TRACKER_LOG_TARGET, ?to, bytes_count, "Udp::send_response (sending...)" );
tracing::trace!(target: UDP_TRACKER_LOG_TARGET, ?to, bytes_count, ?payload, "Udp::send_response (sending...)");

self.send_packet(&to, payload).await;

tracing::trace!(target:UDP_TRACKER_LOG_TARGET, ?to, bytes_count, "Udp::send_response (sent)");
}
Err(e) => {
tracing::error!(target: UDP_TRACKER_LOG_TARGET, ?to, response_type, err = %e, "Udp::send_response (error)");
}
}
}

async fn send_packet(&self, remote_addr: &SocketAddr, payload: &[u8]) {
tracing::trace!(target: UDP_TRACKER_LOG_TARGET, to = %remote_addr, ?payload, "Udp::send_response (sending)");

// doesn't matter if it reaches or not
drop(self.socket.send_to(payload, remote_addr).await);
}
}
8 changes: 4 additions & 4 deletions src/servers/udp/server/receiver.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,21 +11,21 @@ use super::RawRequest;
use crate::shared::bit_torrent::tracker::udp::MAX_PACKET_SIZE;

pub struct Receiver {
pub bound_socket: Arc<BoundSocket>,
pub socket: Arc<BoundSocket>,
data: RefCell<[u8; MAX_PACKET_SIZE]>,
}

impl Receiver {
#[must_use]
pub fn new(bound_socket: Arc<BoundSocket>) -> Self {
Receiver {
bound_socket,
socket: bound_socket,
data: RefCell::new([0; MAX_PACKET_SIZE]),
}
}

pub fn bound_socket_address(&self) -> SocketAddr {
self.bound_socket.address()
self.socket.address()
}
}

Expand All @@ -36,7 +36,7 @@ impl Stream for Receiver {
let mut buf = *self.data.borrow_mut();
let mut buf = tokio::io::ReadBuf::new(&mut buf);

let Poll::Ready(ready) = self.bound_socket.poll_recv_from(cx, &mut buf) else {
let Poll::Ready(ready) = self.socket.poll_recv_from(cx, &mut buf) else {
return Poll::Pending;
};

Expand Down
12 changes: 0 additions & 12 deletions src/shared/bit_torrent/common.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
//! `BitTorrent` protocol primitive types
//!
//! [BEP 3. The `BitTorrent` Protocol Specification](https://www.bittorrent.org/beps/bep_0003.html)
use serde::{Deserialize, Serialize};
/// The maximum number of torrents that can be returned in an `scrape` response.
///
Expand All @@ -21,14 +20,3 @@ pub const MAX_SCRAPE_TORRENTS: u8 = 74;
/// See function to [`generate`](crate::core::auth::generate) the
/// [`ExpiringKeys`](crate::core::auth::ExpiringKey) for more information.
pub const AUTH_KEY_LENGTH: usize = 32;

#[repr(u32)]
#[derive(Serialize, Deserialize, Debug, PartialEq, Eq, Clone)]
enum Actions {
// todo: it seems this enum is not used anywhere. Values match the ones in
// aquatic_udp_protocol::request::Request::from_bytes.
Connect = 0,
Announce = 1,
Scrape = 2,
Error = 3,
}

0 comments on commit f2629ce

Please sign in to comment.