From 5ed94e3441d927316b892c599187c1ee9a2ada39 Mon Sep 17 00:00:00 2001 From: Cameron Garnham Date: Wed, 20 Mar 2024 04:44:51 +0800 Subject: [PATCH] dev: add some torrent repo tests --- packages/primitives/src/announce_event.rs | 2 +- packages/primitives/src/info_hash.rs | 12 + packages/primitives/src/lib.rs | 2 +- packages/primitives/src/peer.rs | 2 +- packages/primitives/src/torrent_metrics.rs | 12 +- packages/torrent-repository/src/entry/mod.rs | 2 +- .../torrent-repository/src/entry/mutex_std.rs | 8 +- .../src/entry/mutex_tokio.rs | 8 +- .../torrent-repository/src/repository/mod.rs | 38 +- .../src/repository/rw_lock_std.rs | 6 +- .../src/repository/rw_lock_std_mutex_std.rs | 6 +- .../src/repository/rw_lock_std_mutex_tokio.rs | 6 +- .../src/repository/rw_lock_tokio.rs | 6 +- .../src/repository/rw_lock_tokio_mutex_std.rs | 6 +- .../repository/rw_lock_tokio_mutex_tokio.rs | 6 +- .../torrent-repository/tests/common/mod.rs | 2 + .../tests/common/torrent.rs | 88 +++ .../tests/common/torrents.rs | 148 +++++ .../torrent-repository/tests/entry/mod.rs | 553 ++++++------------ .../tests/repository/mod.rs | 193 ++++++ src/core/mod.rs | 18 +- .../apis/v1/context/stats/resources.rs | 12 +- 22 files changed, 726 insertions(+), 410 deletions(-) create mode 100644 packages/torrent-repository/tests/common/torrent.rs create mode 100644 packages/torrent-repository/tests/common/torrents.rs diff --git a/packages/primitives/src/announce_event.rs b/packages/primitives/src/announce_event.rs index 16e47da99..3bd560084 100644 --- a/packages/primitives/src/announce_event.rs +++ b/packages/primitives/src/announce_event.rs @@ -6,7 +6,7 @@ use serde::{Deserialize, Serialize}; /// Announce events. Described on the /// [BEP 3. The `BitTorrent` Protocol Specification](https://www.bittorrent.org/beps/bep_0003.html) -#[derive(PartialEq, Eq, Hash, Clone, Copy, Debug, Serialize, Deserialize)] +#[derive(Hash, Clone, Copy, Debug, Serialize, Deserialize, PartialEq, Eq, PartialOrd, Ord)] pub enum AnnounceEvent { /// The peer has started downloading the torrent. Started, diff --git a/packages/primitives/src/info_hash.rs b/packages/primitives/src/info_hash.rs index 46ae6283e..1275e7d52 100644 --- a/packages/primitives/src/info_hash.rs +++ b/packages/primitives/src/info_hash.rs @@ -1,3 +1,4 @@ +use std::hash::{DefaultHasher, Hash, Hasher}; use std::panic::Location; use thiserror::Error; @@ -77,6 +78,17 @@ impl std::convert::From<&[u8]> for InfoHash { } } +/// for testing +impl std::convert::From<&DefaultHasher> for InfoHash { + fn from(data: &DefaultHasher) -> InfoHash { + let n = data.finish().to_le_bytes(); + InfoHash([ + n[0], n[1], n[2], n[3], n[4], n[5], n[6], n[7], n[0], n[1], n[2], n[3], n[4], n[5], n[6], n[7], n[0], n[1], n[2], + n[3], + ]) + } +} + impl std::convert::From<[u8; 20]> for InfoHash { fn from(val: [u8; 20]) -> Self { InfoHash(val) diff --git a/packages/primitives/src/lib.rs b/packages/primitives/src/lib.rs index 664c0c82d..f89c69af3 100644 --- a/packages/primitives/src/lib.rs +++ b/packages/primitives/src/lib.rs @@ -38,7 +38,7 @@ pub enum IPVersion { } /// Number of bytes downloaded, uploaded or pending to download (left) by the peer. -#[derive(PartialEq, Eq, Hash, Clone, Copy, Debug, Serialize, Deserialize)] +#[derive(Hash, Clone, Copy, Debug, Serialize, Deserialize, PartialEq, Eq, PartialOrd, Ord)] pub struct NumberOfBytes(pub i64); /// The database management system used by the tracker. diff --git a/packages/primitives/src/peer.rs b/packages/primitives/src/peer.rs index 65f7b83ef..f5b009f2a 100644 --- a/packages/primitives/src/peer.rs +++ b/packages/primitives/src/peer.rs @@ -51,7 +51,7 @@ use crate::{ser_unix_time_value, DurationSinceUnixEpoch, IPVersion, NumberOfByte /// event: AnnounceEvent::Started, /// }; /// ``` -#[derive(PartialEq, Eq, Debug, Clone, Serialize, Copy)] +#[derive(Debug, Clone, Serialize, Copy, PartialEq, Eq, PartialOrd, Ord, Hash)] pub struct Peer { /// ID used by the downloader peer pub peer_id: Id, diff --git a/packages/primitives/src/torrent_metrics.rs b/packages/primitives/src/torrent_metrics.rs index c60507171..02de02954 100644 --- a/packages/primitives/src/torrent_metrics.rs +++ b/packages/primitives/src/torrent_metrics.rs @@ -6,20 +6,20 @@ use std::ops::AddAssign; #[derive(Copy, Clone, Debug, PartialEq, Default)] pub struct TorrentsMetrics { /// Total number of seeders for all torrents - pub seeders: u64, + pub complete: u64, /// Total number of peers that have ever completed downloading for all torrents. - pub completed: u64, + pub downloaded: u64, /// Total number of leechers for all torrents. - pub leechers: u64, + pub incomplete: u64, /// Total number of torrents. pub torrents: u64, } impl AddAssign for TorrentsMetrics { fn add_assign(&mut self, rhs: Self) { - self.seeders += rhs.seeders; - self.completed += rhs.completed; - self.leechers += rhs.leechers; + self.complete += rhs.complete; + self.downloaded += rhs.downloaded; + self.incomplete += rhs.incomplete; self.torrents += rhs.torrents; } } diff --git a/packages/torrent-repository/src/entry/mod.rs b/packages/torrent-repository/src/entry/mod.rs index 93a064af3..cdfa88c45 100644 --- a/packages/torrent-repository/src/entry/mod.rs +++ b/packages/torrent-repository/src/entry/mod.rs @@ -86,7 +86,7 @@ pub trait EntryAsync { /// This is the tracker entry for a given torrent and contains the swarm data, /// that's the list of all the peers trying to download the same torrent. /// The tracker keeps one entry like this for every torrent. -#[derive(Clone, Debug, Default)] +#[derive(Clone, Debug, Default, PartialEq, Eq, PartialOrd, Ord, Hash)] pub struct Torrent { /// The swarm: a network of peers that are all trying to download the torrent associated to this entry // #[serde(skip)] diff --git a/packages/torrent-repository/src/entry/mutex_std.rs b/packages/torrent-repository/src/entry/mutex_std.rs index 20df3514f..6151f5f03 100644 --- a/packages/torrent-repository/src/entry/mutex_std.rs +++ b/packages/torrent-repository/src/entry/mutex_std.rs @@ -5,7 +5,7 @@ use torrust_tracker_primitives::swarm_metadata::SwarmMetadata; use torrust_tracker_primitives::{peer, DurationSinceUnixEpoch}; use super::{Entry, EntrySync}; -use crate::EntryMutexStd; +use crate::{EntryMutexStd, EntrySingle}; impl EntrySync for EntryMutexStd { fn get_stats(&self) -> SwarmMetadata { @@ -48,3 +48,9 @@ impl EntrySync for EntryMutexStd { .remove_inactive_peers(current_cutoff); } } + +impl From for EntryMutexStd { + fn from(entry: EntrySingle) -> Self { + Arc::new(std::sync::Mutex::new(entry)) + } +} diff --git a/packages/torrent-repository/src/entry/mutex_tokio.rs b/packages/torrent-repository/src/entry/mutex_tokio.rs index 622c20bbd..cc7fd5934 100644 --- a/packages/torrent-repository/src/entry/mutex_tokio.rs +++ b/packages/torrent-repository/src/entry/mutex_tokio.rs @@ -5,7 +5,7 @@ use torrust_tracker_primitives::swarm_metadata::SwarmMetadata; use torrust_tracker_primitives::{peer, DurationSinceUnixEpoch}; use super::{Entry, EntryAsync}; -use crate::EntryMutexTokio; +use crate::{EntryMutexTokio, EntrySingle}; impl EntryAsync for EntryMutexTokio { async fn get_stats(&self) -> SwarmMetadata { @@ -44,3 +44,9 @@ impl EntryAsync for EntryMutexTokio { self.lock().await.remove_inactive_peers(current_cutoff); } } + +impl From for EntryMutexTokio { + fn from(entry: EntrySingle) -> Self { + Arc::new(tokio::sync::Mutex::new(entry)) + } +} diff --git a/packages/torrent-repository/src/repository/mod.rs b/packages/torrent-repository/src/repository/mod.rs index b46771163..494040c9d 100644 --- a/packages/torrent-repository/src/repository/mod.rs +++ b/packages/torrent-repository/src/repository/mod.rs @@ -12,7 +12,9 @@ pub mod rw_lock_tokio; pub mod rw_lock_tokio_mutex_std; pub mod rw_lock_tokio_mutex_tokio; -pub trait Repository: Default + 'static { +use std::fmt::Debug; + +pub trait Repository: Debug + Default + Sized + 'static { fn get(&self, key: &InfoHash) -> Option; fn get_metrics(&self) -> TorrentsMetrics; fn get_paginated(&self, pagination: Option<&Pagination>) -> Vec<(InfoHash, T)>; @@ -24,7 +26,7 @@ pub trait Repository: Default + 'static { } #[allow(clippy::module_name_repetitions)] -pub trait RepositoryAsync: Default + 'static { +pub trait RepositoryAsync: Debug + Default + Sized + 'static { fn get(&self, key: &InfoHash) -> impl std::future::Future> + Send; fn get_metrics(&self) -> impl std::future::Future + Send; fn get_paginated(&self, pagination: Option<&Pagination>) -> impl std::future::Future> + Send; @@ -39,12 +41,36 @@ pub trait RepositoryAsync: Default + 'static { ) -> impl std::future::Future + Send; } -#[derive(Default)] +#[derive(Default, Debug)] +pub struct RwLockStd { + torrents: std::sync::RwLock>, +} + +#[derive(Default, Debug)] pub struct RwLockTokio { torrents: tokio::sync::RwLock>, } -#[derive(Default)] -pub struct RwLockStd { - torrents: std::sync::RwLock>, +impl RwLockStd { + /// # Panics + /// + /// Panics if unable to get a lock. + pub fn write( + &self, + ) -> std::sync::RwLockWriteGuard<'_, std::collections::BTreeMap> { + self.torrents.write().expect("it should get lock") + } +} + +impl RwLockTokio { + pub fn write( + &self, + ) -> impl std::future::Future< + Output = tokio::sync::RwLockWriteGuard< + '_, + std::collections::BTreeMap, + >, + > { + self.torrents.write() + } } diff --git a/packages/torrent-repository/src/repository/rw_lock_std.rs b/packages/torrent-repository/src/repository/rw_lock_std.rs index 5da4e8b82..29fa24155 100644 --- a/packages/torrent-repository/src/repository/rw_lock_std.rs +++ b/packages/torrent-repository/src/repository/rw_lock_std.rs @@ -49,9 +49,9 @@ where for entry in self.get_torrents().values() { let stats = entry.get_stats(); - metrics.seeders += u64::from(stats.complete); - metrics.completed += u64::from(stats.downloaded); - metrics.leechers += u64::from(stats.incomplete); + metrics.complete += u64::from(stats.complete); + metrics.downloaded += u64::from(stats.downloaded); + metrics.incomplete += u64::from(stats.incomplete); metrics.torrents += 1; } diff --git a/packages/torrent-repository/src/repository/rw_lock_std_mutex_std.rs b/packages/torrent-repository/src/repository/rw_lock_std_mutex_std.rs index daafd0a0e..0b65234e3 100644 --- a/packages/torrent-repository/src/repository/rw_lock_std_mutex_std.rs +++ b/packages/torrent-repository/src/repository/rw_lock_std_mutex_std.rs @@ -57,9 +57,9 @@ where for entry in self.get_torrents().values() { let stats = entry.lock().expect("it should get a lock").get_stats(); - metrics.seeders += u64::from(stats.complete); - metrics.completed += u64::from(stats.downloaded); - metrics.leechers += u64::from(stats.incomplete); + metrics.complete += u64::from(stats.complete); + metrics.downloaded += u64::from(stats.downloaded); + metrics.incomplete += u64::from(stats.incomplete); metrics.torrents += 1; } diff --git a/packages/torrent-repository/src/repository/rw_lock_std_mutex_tokio.rs b/packages/torrent-repository/src/repository/rw_lock_std_mutex_tokio.rs index 55b6e56e1..d039be240 100644 --- a/packages/torrent-repository/src/repository/rw_lock_std_mutex_tokio.rs +++ b/packages/torrent-repository/src/repository/rw_lock_std_mutex_tokio.rs @@ -75,9 +75,9 @@ where for entry in entries { let stats = entry.lock().await.get_stats(); - metrics.seeders += u64::from(stats.complete); - metrics.completed += u64::from(stats.downloaded); - metrics.leechers += u64::from(stats.incomplete); + metrics.complete += u64::from(stats.complete); + metrics.downloaded += u64::from(stats.downloaded); + metrics.incomplete += u64::from(stats.incomplete); metrics.torrents += 1; } diff --git a/packages/torrent-repository/src/repository/rw_lock_tokio.rs b/packages/torrent-repository/src/repository/rw_lock_tokio.rs index 5ad47ac5b..fa84e2451 100644 --- a/packages/torrent-repository/src/repository/rw_lock_tokio.rs +++ b/packages/torrent-repository/src/repository/rw_lock_tokio.rs @@ -64,9 +64,9 @@ where for entry in self.get_torrents().await.values() { let stats = entry.get_stats(); - metrics.seeders += u64::from(stats.complete); - metrics.completed += u64::from(stats.downloaded); - metrics.leechers += u64::from(stats.incomplete); + metrics.complete += u64::from(stats.complete); + metrics.downloaded += u64::from(stats.downloaded); + metrics.incomplete += u64::from(stats.incomplete); metrics.torrents += 1; } diff --git a/packages/torrent-repository/src/repository/rw_lock_tokio_mutex_std.rs b/packages/torrent-repository/src/repository/rw_lock_tokio_mutex_std.rs index 2429e07da..fbbc51a09 100644 --- a/packages/torrent-repository/src/repository/rw_lock_tokio_mutex_std.rs +++ b/packages/torrent-repository/src/repository/rw_lock_tokio_mutex_std.rs @@ -72,9 +72,9 @@ where for entry in self.get_torrents().await.values() { let stats = entry.get_stats(); - metrics.seeders += u64::from(stats.complete); - metrics.completed += u64::from(stats.downloaded); - metrics.leechers += u64::from(stats.incomplete); + metrics.complete += u64::from(stats.complete); + metrics.downloaded += u64::from(stats.downloaded); + metrics.incomplete += u64::from(stats.incomplete); metrics.torrents += 1; } diff --git a/packages/torrent-repository/src/repository/rw_lock_tokio_mutex_tokio.rs b/packages/torrent-repository/src/repository/rw_lock_tokio_mutex_tokio.rs index 5837211ed..7f0394179 100644 --- a/packages/torrent-repository/src/repository/rw_lock_tokio_mutex_tokio.rs +++ b/packages/torrent-repository/src/repository/rw_lock_tokio_mutex_tokio.rs @@ -72,9 +72,9 @@ where for entry in self.get_torrents().await.values() { let stats = entry.get_stats().await; - metrics.seeders += u64::from(stats.complete); - metrics.completed += u64::from(stats.downloaded); - metrics.leechers += u64::from(stats.incomplete); + metrics.complete += u64::from(stats.complete); + metrics.downloaded += u64::from(stats.downloaded); + metrics.incomplete += u64::from(stats.incomplete); metrics.torrents += 1; } diff --git a/packages/torrent-repository/tests/common/mod.rs b/packages/torrent-repository/tests/common/mod.rs index c77ca2769..84639ef60 100644 --- a/packages/torrent-repository/tests/common/mod.rs +++ b/packages/torrent-repository/tests/common/mod.rs @@ -1 +1,3 @@ +pub mod torrent; pub mod torrent_peer_builder; +pub mod torrents; diff --git a/packages/torrent-repository/tests/common/torrent.rs b/packages/torrent-repository/tests/common/torrent.rs new file mode 100644 index 000000000..012f84e20 --- /dev/null +++ b/packages/torrent-repository/tests/common/torrent.rs @@ -0,0 +1,88 @@ +use std::sync::Arc; + +use torrust_tracker_configuration::TrackerPolicy; +use torrust_tracker_primitives::swarm_metadata::SwarmMetadata; +use torrust_tracker_primitives::{peer, DurationSinceUnixEpoch}; +use torrust_tracker_torrent_repository::entry::{Entry as _, EntryAsync as _, EntrySync as _}; +use torrust_tracker_torrent_repository::{EntryMutexStd, EntryMutexTokio, EntrySingle}; + +#[derive(Debug, Clone)] +pub(crate) enum Torrent { + Single(EntrySingle), + MutexStd(EntryMutexStd), + MutexTokio(EntryMutexTokio), +} + +impl Torrent { + pub(crate) async fn get_stats(&self) -> SwarmMetadata { + match self { + Torrent::Single(entry) => entry.get_stats(), + Torrent::MutexStd(entry) => entry.get_stats(), + Torrent::MutexTokio(entry) => entry.clone().get_stats().await, + } + } + + pub(crate) async fn is_good(&self, policy: &TrackerPolicy) -> bool { + match self { + Torrent::Single(entry) => entry.is_good(policy), + Torrent::MutexStd(entry) => entry.is_good(policy), + Torrent::MutexTokio(entry) => entry.clone().is_good(policy).await, + } + } + + pub(crate) async fn peers_is_empty(&self) -> bool { + match self { + Torrent::Single(entry) => entry.peers_is_empty(), + Torrent::MutexStd(entry) => entry.peers_is_empty(), + Torrent::MutexTokio(entry) => entry.clone().peers_is_empty().await, + } + } + + pub(crate) async fn get_peers_len(&self) -> usize { + match self { + Torrent::Single(entry) => entry.get_peers_len(), + Torrent::MutexStd(entry) => entry.get_peers_len(), + Torrent::MutexTokio(entry) => entry.clone().get_peers_len().await, + } + } + + pub(crate) async fn get_peers(&self, limit: Option) -> Vec> { + match self { + Torrent::Single(entry) => entry.get_peers(limit), + Torrent::MutexStd(entry) => entry.get_peers(limit), + Torrent::MutexTokio(entry) => entry.clone().get_peers(limit).await, + } + } + + pub(crate) async fn get_peers_for_peer(&self, client: &peer::Peer, limit: Option) -> Vec> { + match self { + Torrent::Single(entry) => entry.get_peers_for_peer(client, limit), + Torrent::MutexStd(entry) => entry.get_peers_for_peer(client, limit), + Torrent::MutexTokio(entry) => entry.clone().get_peers_for_peer(client, limit).await, + } + } + + pub(crate) async fn insert_or_update_peer(&mut self, peer: &peer::Peer) -> bool { + match self { + Torrent::Single(entry) => entry.insert_or_update_peer(peer), + Torrent::MutexStd(entry) => entry.insert_or_update_peer(peer), + Torrent::MutexTokio(entry) => entry.clone().insert_or_update_peer(peer).await, + } + } + + pub(crate) async fn insert_or_update_peer_and_get_stats(&mut self, peer: &peer::Peer) -> (bool, SwarmMetadata) { + match self { + Torrent::Single(entry) => entry.insert_or_update_peer_and_get_stats(peer), + Torrent::MutexStd(entry) => entry.insert_or_update_peer_and_get_stats(peer), + Torrent::MutexTokio(entry) => entry.clone().insert_or_update_peer_and_get_stats(peer).await, + } + } + + pub(crate) async fn remove_inactive_peers(&mut self, current_cutoff: DurationSinceUnixEpoch) { + match self { + Torrent::Single(entry) => entry.remove_inactive_peers(current_cutoff), + Torrent::MutexStd(entry) => entry.remove_inactive_peers(current_cutoff), + Torrent::MutexTokio(entry) => entry.clone().remove_inactive_peers(current_cutoff).await, + } + } +} diff --git a/packages/torrent-repository/tests/common/torrents.rs b/packages/torrent-repository/tests/common/torrents.rs new file mode 100644 index 000000000..0b5c5bdba --- /dev/null +++ b/packages/torrent-repository/tests/common/torrents.rs @@ -0,0 +1,148 @@ +use torrust_tracker_configuration::TrackerPolicy; +use torrust_tracker_primitives::info_hash::InfoHash; +use torrust_tracker_primitives::pagination::Pagination; +use torrust_tracker_primitives::swarm_metadata::SwarmMetadata; +use torrust_tracker_primitives::torrent_metrics::TorrentsMetrics; +use torrust_tracker_primitives::{peer, DurationSinceUnixEpoch, PersistentTorrents}; +use torrust_tracker_torrent_repository::repository::{Repository as _, RepositoryAsync as _}; +use torrust_tracker_torrent_repository::{ + EntrySingle, TorrentsRwLockStd, TorrentsRwLockStdMutexStd, TorrentsRwLockStdMutexTokio, TorrentsRwLockTokio, + TorrentsRwLockTokioMutexStd, TorrentsRwLockTokioMutexTokio, +}; + +#[derive(Debug)] +pub(crate) enum Torrents { + Std(TorrentsRwLockStd), + StdMutexStd(TorrentsRwLockStdMutexStd), + StdMutexTokio(TorrentsRwLockStdMutexTokio), + Tokio(TorrentsRwLockTokio), + TokioMutexStd(TorrentsRwLockTokioMutexStd), + TokioMutexTokio(TorrentsRwLockTokioMutexTokio), +} + +#[allow(dead_code)] +impl Torrents { + pub(crate) async fn get(&self, key: &InfoHash) -> Option { + match self { + Torrents::Std(repo) => repo.get(key), + Torrents::StdMutexStd(repo) => Some(repo.get(key)?.lock().unwrap().clone()), + Torrents::StdMutexTokio(repo) => Some(repo.get(key).await?.lock().await.clone()), + Torrents::Tokio(repo) => repo.get(key).await, + Torrents::TokioMutexStd(repo) => Some(repo.get(key).await?.lock().unwrap().clone()), + Torrents::TokioMutexTokio(repo) => Some(repo.get(key).await?.lock().await.clone()), + } + } + pub(crate) async fn get_metrics(&self) -> TorrentsMetrics { + match self { + Torrents::Std(repo) => repo.get_metrics(), + Torrents::StdMutexStd(repo) => repo.get_metrics(), + Torrents::StdMutexTokio(repo) => repo.get_metrics().await, + Torrents::Tokio(repo) => repo.get_metrics().await, + Torrents::TokioMutexStd(repo) => repo.get_metrics().await, + Torrents::TokioMutexTokio(repo) => repo.get_metrics().await, + } + } + pub(crate) async fn get_paginated(&self, pagination: Option<&Pagination>) -> Vec<(InfoHash, EntrySingle)> { + match self { + Torrents::Std(repo) => repo.get_paginated(pagination), + Torrents::StdMutexStd(repo) => repo + .get_paginated(pagination) + .iter() + .map(|(i, t)| (*i, t.lock().expect("it should get a lock").clone())) + .collect(), + Torrents::StdMutexTokio(repo) => { + let mut v: Vec<(InfoHash, EntrySingle)> = vec![]; + + for (i, t) in repo.get_paginated(pagination).await { + v.push((i, t.lock().await.clone())); + } + v + } + Torrents::Tokio(repo) => repo.get_paginated(pagination).await, + Torrents::TokioMutexStd(repo) => repo + .get_paginated(pagination) + .await + .iter() + .map(|(i, t)| (*i, t.lock().expect("it should get a lock").clone())) + .collect(), + Torrents::TokioMutexTokio(repo) => { + let mut v: Vec<(InfoHash, EntrySingle)> = vec![]; + + for (i, t) in repo.get_paginated(pagination).await { + v.push((i, t.lock().await.clone())); + } + v + } + } + } + pub(crate) async fn import_persistent(&self, persistent_torrents: &PersistentTorrents) { + match self { + Torrents::Std(repo) => repo.import_persistent(persistent_torrents), + Torrents::StdMutexStd(repo) => repo.import_persistent(persistent_torrents), + Torrents::StdMutexTokio(repo) => repo.import_persistent(persistent_torrents).await, + Torrents::Tokio(repo) => repo.import_persistent(persistent_torrents).await, + Torrents::TokioMutexStd(repo) => repo.import_persistent(persistent_torrents).await, + Torrents::TokioMutexTokio(repo) => repo.import_persistent(persistent_torrents).await, + } + } + pub(crate) async fn remove(&self, key: &InfoHash) -> Option { + match self { + Torrents::Std(repo) => repo.remove(key), + Torrents::StdMutexStd(repo) => Some(repo.remove(key)?.lock().unwrap().clone()), + Torrents::StdMutexTokio(repo) => Some(repo.remove(key).await?.lock().await.clone()), + Torrents::Tokio(repo) => repo.remove(key).await, + Torrents::TokioMutexStd(repo) => Some(repo.remove(key).await?.lock().unwrap().clone()), + Torrents::TokioMutexTokio(repo) => Some(repo.remove(key).await?.lock().await.clone()), + } + } + pub(crate) async fn remove_inactive_peers(&self, current_cutoff: DurationSinceUnixEpoch) { + match self { + Torrents::Std(repo) => repo.remove_inactive_peers(current_cutoff), + Torrents::StdMutexStd(repo) => repo.remove_inactive_peers(current_cutoff), + Torrents::StdMutexTokio(repo) => repo.remove_inactive_peers(current_cutoff).await, + Torrents::Tokio(repo) => repo.remove_inactive_peers(current_cutoff).await, + Torrents::TokioMutexStd(repo) => repo.remove_inactive_peers(current_cutoff).await, + Torrents::TokioMutexTokio(repo) => repo.remove_inactive_peers(current_cutoff).await, + } + } + pub(crate) async fn remove_peerless_torrents(&self, policy: &TrackerPolicy) { + match self { + Torrents::Std(repo) => repo.remove_peerless_torrents(policy), + Torrents::StdMutexStd(repo) => repo.remove_peerless_torrents(policy), + Torrents::StdMutexTokio(repo) => repo.remove_peerless_torrents(policy).await, + Torrents::Tokio(repo) => repo.remove_peerless_torrents(policy).await, + Torrents::TokioMutexStd(repo) => repo.remove_peerless_torrents(policy).await, + Torrents::TokioMutexTokio(repo) => repo.remove_peerless_torrents(policy).await, + } + } + pub(crate) async fn update_torrent_with_peer_and_get_stats( + &self, + info_hash: &InfoHash, + peer: &peer::Peer, + ) -> (bool, SwarmMetadata) { + match self { + Torrents::Std(repo) => repo.update_torrent_with_peer_and_get_stats(info_hash, peer), + Torrents::StdMutexStd(repo) => repo.update_torrent_with_peer_and_get_stats(info_hash, peer), + Torrents::StdMutexTokio(repo) => repo.update_torrent_with_peer_and_get_stats(info_hash, peer).await, + Torrents::Tokio(repo) => repo.update_torrent_with_peer_and_get_stats(info_hash, peer).await, + Torrents::TokioMutexStd(repo) => repo.update_torrent_with_peer_and_get_stats(info_hash, peer).await, + Torrents::TokioMutexTokio(repo) => repo.update_torrent_with_peer_and_get_stats(info_hash, peer).await, + } + } + pub(crate) async fn insert(&self, info_hash: &InfoHash, torrent: EntrySingle) -> Option { + match self { + Torrents::Std(repo) => repo.write().insert(*info_hash, torrent), + Torrents::StdMutexStd(repo) => Some(repo.write().insert(*info_hash, torrent.into())?.lock().unwrap().clone()), + Torrents::StdMutexTokio(repo) => { + let r = repo.write().insert(*info_hash, torrent.into()); + match r { + Some(t) => Some(t.lock().await.clone()), + None => None, + } + } + Torrents::Tokio(repo) => repo.write().await.insert(*info_hash, torrent), + Torrents::TokioMutexStd(repo) => Some(repo.write().await.insert(*info_hash, torrent.into())?.lock().unwrap().clone()), + Torrents::TokioMutexTokio(repo) => Some(repo.write().await.insert(*info_hash, torrent.into())?.lock().await.clone()), + } + } +} diff --git a/packages/torrent-repository/tests/entry/mod.rs b/packages/torrent-repository/tests/entry/mod.rs index e7d791304..2afbb1b37 100644 --- a/packages/torrent-repository/tests/entry/mod.rs +++ b/packages/torrent-repository/tests/entry/mod.rs @@ -1,6 +1,5 @@ use std::net::{IpAddr, Ipv4Addr, SocketAddr}; use std::ops::Sub; -use std::sync::Arc; use std::time::Duration; use rstest::{fixture, rstest}; @@ -9,185 +8,25 @@ use torrust_tracker_clock::clock::{self, Time as _}; use torrust_tracker_configuration::{TrackerPolicy, TORRENT_PEERS_LIMIT}; use torrust_tracker_primitives::announce_event::AnnounceEvent; use torrust_tracker_primitives::peer::Peer; -use torrust_tracker_primitives::swarm_metadata::SwarmMetadata; -use torrust_tracker_primitives::{peer, DurationSinceUnixEpoch, NumberOfBytes}; -use torrust_tracker_torrent_repository::entry::{Entry, EntryAsync, EntrySync}; +use torrust_tracker_primitives::{peer, NumberOfBytes}; use torrust_tracker_torrent_repository::{EntryMutexStd, EntryMutexTokio, EntrySingle}; +use crate::common::torrent::Torrent; use crate::common::torrent_peer_builder::{a_completed_peer, a_started_peer}; use crate::CurrentClock; -#[derive(Debug)] -pub enum Torrent { - Single(EntrySingle), - MutexStd(EntryMutexStd), - MutexTokio(EntryMutexTokio), -} - -impl Torrent { - async fn get_stats(&self) -> SwarmMetadata { - match self { - Torrent::Single(entry) => entry.get_stats(), - Torrent::MutexStd(entry) => entry.get_stats(), - Torrent::MutexTokio(entry) => entry.clone().get_stats().await, - } - } - - async fn is_good(&self, policy: &TrackerPolicy) -> bool { - match self { - Torrent::Single(entry) => entry.is_good(policy), - Torrent::MutexStd(entry) => entry.is_good(policy), - Torrent::MutexTokio(entry) => entry.clone().is_good(policy).await, - } - } - - async fn peers_is_empty(&self) -> bool { - match self { - Torrent::Single(entry) => entry.peers_is_empty(), - Torrent::MutexStd(entry) => entry.peers_is_empty(), - Torrent::MutexTokio(entry) => entry.clone().peers_is_empty().await, - } - } - - async fn get_peers_len(&self) -> usize { - match self { - Torrent::Single(entry) => entry.get_peers_len(), - Torrent::MutexStd(entry) => entry.get_peers_len(), - Torrent::MutexTokio(entry) => entry.clone().get_peers_len().await, - } - } - - async fn get_peers(&self, limit: Option) -> Vec> { - match self { - Torrent::Single(entry) => entry.get_peers(limit), - Torrent::MutexStd(entry) => entry.get_peers(limit), - Torrent::MutexTokio(entry) => entry.clone().get_peers(limit).await, - } - } - - async fn get_peers_for_peer(&self, client: &peer::Peer, limit: Option) -> Vec> { - match self { - Torrent::Single(entry) => entry.get_peers_for_peer(client, limit), - Torrent::MutexStd(entry) => entry.get_peers_for_peer(client, limit), - Torrent::MutexTokio(entry) => entry.clone().get_peers_for_peer(client, limit).await, - } - } - - async fn insert_or_update_peer(&mut self, peer: &peer::Peer) -> bool { - match self { - Torrent::Single(entry) => entry.insert_or_update_peer(peer), - Torrent::MutexStd(entry) => entry.insert_or_update_peer(peer), - Torrent::MutexTokio(entry) => entry.clone().insert_or_update_peer(peer).await, - } - } - - async fn insert_or_update_peer_and_get_stats(&mut self, peer: &peer::Peer) -> (bool, SwarmMetadata) { - match self { - Torrent::Single(entry) => entry.insert_or_update_peer_and_get_stats(peer), - Torrent::MutexStd(entry) => entry.insert_or_update_peer_and_get_stats(peer), - Torrent::MutexTokio(entry) => entry.clone().insert_or_update_peer_and_get_stats(peer).await, - } - } - - async fn remove_inactive_peers(&mut self, current_cutoff: DurationSinceUnixEpoch) { - match self { - Torrent::Single(entry) => entry.remove_inactive_peers(current_cutoff), - Torrent::MutexStd(entry) => entry.remove_inactive_peers(current_cutoff), - Torrent::MutexTokio(entry) => entry.clone().remove_inactive_peers(current_cutoff).await, - } - } -} - #[fixture] -async fn single_empty() -> (Torrent, Peer) { - (Torrent::Single(EntrySingle::default()), a_started_peer()) -} -#[fixture] -async fn mutex_std_empty() -> (Torrent, Peer) { - (Torrent::MutexStd(EntryMutexStd::default()), a_started_peer()) -} - -#[fixture] -async fn mutex_tokio_empty() -> (Torrent, Peer) { - (Torrent::MutexTokio(EntryMutexTokio::default()), a_started_peer()) -} - -#[fixture] -async fn single_with_started() -> (Torrent, Peer) { - let mut torrent = Torrent::Single(EntrySingle::default()); - let peer = a_started_peer(); - torrent.insert_or_update_peer(&peer).await; - (torrent, peer) -} - -#[fixture] -async fn mutex_std_with_started() -> (Torrent, Peer) { - let mut torrent = Torrent::MutexStd(EntryMutexStd::default()); - let peer = a_started_peer(); - torrent.insert_or_update_peer(&peer).await; - (torrent, peer) -} -#[fixture] -async fn mutex_tokio_with_started() -> (Torrent, Peer) { - let mut torrent = Torrent::MutexTokio(EntryMutexTokio::default()); - let peer = a_started_peer(); - torrent.insert_or_update_peer(&peer).await; - (torrent, peer) -} - -#[fixture] -async fn single_with_completed() -> (Torrent, Peer) { - let mut torrent = Torrent::Single(EntrySingle::default()); - let peer = a_completed_peer(); - torrent.insert_or_update_peer(&peer).await; - (torrent, peer) -} - -#[fixture] -async fn mutex_std_with_completed() -> (Torrent, Peer) { - let mut torrent = Torrent::MutexStd(EntryMutexStd::default()); - let peer = a_completed_peer(); - torrent.insert_or_update_peer(&peer).await; - (torrent, peer) -} -#[fixture] -async fn mutex_tokio_with_completed() -> (Torrent, Peer) { - let mut torrent = Torrent::MutexTokio(EntryMutexTokio::default()); - let peer = a_completed_peer(); - torrent.insert_or_update_peer(&peer).await; - (torrent, peer) +fn single() -> Torrent { + Torrent::Single(EntrySingle::default()) } - #[fixture] -async fn single_with_downloaded() -> (Torrent, Peer) { - let mut torrent = Torrent::Single(EntrySingle::default()); - let mut peer = a_started_peer(); - torrent.insert_or_update_peer(&peer).await; - peer.event = AnnounceEvent::Completed; - peer.left = NumberOfBytes(0); - torrent.insert_or_update_peer(&peer).await; - (torrent, peer) +fn standard_mutex() -> Torrent { + Torrent::MutexStd(EntryMutexStd::default()) } #[fixture] -async fn mutex_std_with_downloaded() -> (Torrent, Peer) { - let mut torrent = Torrent::MutexStd(EntryMutexStd::default()); - let mut peer = a_started_peer(); - torrent.insert_or_update_peer(&peer).await; - peer.event = AnnounceEvent::Completed; - peer.left = NumberOfBytes(0); - torrent.insert_or_update_peer(&peer).await; - (torrent, peer) -} -#[fixture] -async fn mutex_tokio_with_downloaded() -> (Torrent, Peer) { - let mut torrent = Torrent::MutexTokio(EntryMutexTokio::default()); - let mut peer = a_started_peer(); - torrent.insert_or_update_peer(&peer).await; - peer.event = AnnounceEvent::Completed; - peer.left = NumberOfBytes(0); - torrent.insert_or_update_peer(&peer).await; - (torrent, peer) +fn mutex_tokio() -> Torrent { + Torrent::MutexTokio(EntryMutexTokio::default()) } #[fixture] @@ -210,42 +49,61 @@ fn policy_remove_persist() -> TrackerPolicy { TrackerPolicy::new(true, 0, true) } +pub enum Makes { + Empty, + Started, + Completed, + Downloaded, +} + +async fn make(torrent: &mut Torrent, makes: &Makes) -> Vec { + match makes { + Makes::Empty => vec![], + Makes::Started => { + let peer = a_started_peer(); + torrent.insert_or_update_peer(&peer).await; + vec![peer] + } + Makes::Completed => { + let peer = a_completed_peer(); + torrent.insert_or_update_peer(&peer).await; + vec![peer] + } + Makes::Downloaded => { + let mut peer = a_started_peer(); + torrent.insert_or_update_peer(&peer).await; + peer.event = AnnounceEvent::Completed; + peer.left = NumberOfBytes(0); + torrent.insert_or_update_peer(&peer).await; + vec![peer] + } + } +} + #[rstest] -#[case::single(single_empty())] -#[case::mutex_std(mutex_std_empty())] -#[case::mutex_tokio(mutex_tokio_empty())] +#[case::empty(&Makes::Empty)] #[tokio::test] async fn it_should_have_a_empty_peerlist_for_a_default_torrent_entry( - #[future] - #[case] - torrent_peer: (Torrent, Peer), + #[values(single(), standard_mutex(), mutex_tokio())] mut torrent: Torrent, + #[case] makes: &Makes, ) { - let (torrent, _) = torrent_peer.await; + make(&mut torrent, makes).await; assert_eq!(torrent.get_peers_len().await, 0); } #[rstest] -#[case::single_empty(single_empty())] -#[case::mutex_std_empty(mutex_std_empty())] -#[case::mutex_tokio_empty(mutex_tokio_empty())] -#[case::single_with_started(single_with_started())] -#[case::mutex_std_with_started(mutex_std_with_started())] -#[case::mutex_tokio_with_started(mutex_tokio_with_started())] -#[case::single_with_completed(single_with_completed())] -#[case::mutex_std_with_completed(mutex_std_with_completed())] -#[case::mutex_tokio_with_completed(mutex_tokio_with_completed())] -#[case::single_with_downloaded(single_with_downloaded())] -#[case::mutex_std_with_downloaded(mutex_std_with_downloaded())] -#[case::mutex_tokio_with_downloaded(mutex_tokio_with_downloaded())] +#[case::empty(&Makes::Empty)] +#[case::started(&Makes::Started)] +#[case::completed(&Makes::Completed)] +#[case::downloaded(&Makes::Downloaded)] #[tokio::test] async fn it_should_check_if_torrent_entry_is_good( - #[future] - #[case] - torrent_peer: (Torrent, Peer), + #[values(single(), standard_mutex(), mutex_tokio())] mut torrent: Torrent, + #[case] makes: &Makes, #[values(policy_none(), policy_persist(), policy_remove(), policy_remove_persist())] policy: TrackerPolicy, ) { - let (torrent, _) = torrent_peer.await; + make(&mut torrent, makes).await; let has_peers = !torrent.peers_is_empty().await; let has_downloads = torrent.get_stats().await.downloaded != 0; @@ -272,112 +130,124 @@ async fn it_should_check_if_torrent_entry_is_good( } #[rstest] -#[case::single_with_started(single_with_started())] -#[case::mutex_std_with_started(mutex_std_with_started())] -#[case::mutex_tokio_with_started(mutex_tokio_with_started())] -#[case::single_with_completed(single_with_completed())] -#[case::mutex_std_with_completed(mutex_std_with_completed())] -#[case::mutex_tokio_with_completed(mutex_tokio_with_completed())] -#[case::single_with_downloaded(single_with_downloaded())] -#[case::mutex_std_with_downloaded(mutex_std_with_downloaded())] -#[case::mutex_tokio_with_downloaded(mutex_tokio_with_downloaded())] +#[case::empty(&Makes::Empty)] +#[case::started(&Makes::Started)] +#[case::completed(&Makes::Completed)] +#[case::downloaded(&Makes::Downloaded)] +#[tokio::test] async fn it_should_add_a_peer_to_a_torrent_entry( - #[future] - #[case] - torrent_peer: (Torrent, Peer), + #[values(single(), standard_mutex(), mutex_tokio())] mut torrent: Torrent, + #[case] makes: &Makes, ) { - let (torrent, peer) = torrent_peer.await; + let peers = make(&mut torrent, makes).await; - assert_eq!(*torrent.get_peers(None).await[0], peer); - assert_eq!(torrent.get_peers(None).await.len(), 1); -} + let torrent_peers = torrent.get_peers(None).await; -#[rstest] -#[case::single_with_started(single_with_started())] -#[case::mutex_std_with_started(mutex_std_with_started())] -#[case::mutex_tokio_with_started(mutex_tokio_with_started())] -#[case::single_with_completed(single_with_completed())] -#[case::mutex_std_with_completed(mutex_std_with_completed())] -#[case::mutex_tokio_with_completed(mutex_tokio_with_completed())] -#[case::single_with_downloaded(single_with_downloaded())] -#[case::mutex_std_with_downloaded(mutex_std_with_downloaded())] -#[case::mutex_tokio_with_downloaded(mutex_tokio_with_downloaded())] -#[tokio::test] -async fn a_torrent_entry_should_contain_the_list_of_peers_that_were_added_to_the_torrent( - #[future] - #[case] - torrent_peer: (Torrent, Peer), -) { - let (torrent, peer) = torrent_peer.await; + assert_eq!(torrent_peers.len(), peers.len()); - assert_eq!(torrent.get_peers(None).await, vec![Arc::new(peer)]); + for peer in torrent_peers { + assert!(peers.contains(&peer)); + } } #[rstest] -#[case::single(single_with_started())] -#[case::mutex_std(mutex_std_with_started())] -#[case::mutex_tokio(mutex_tokio_with_started())] +#[case::empty(&Makes::Empty)] +#[case::started(&Makes::Started)] +#[case::completed(&Makes::Completed)] +#[case::downloaded(&Makes::Downloaded)] #[tokio::test] async fn a_peer_can_be_updated_in_a_torrent_entry( - #[future] - #[case] - torrent_peer: (Torrent, Peer), + #[values(single(), standard_mutex(), mutex_tokio())] mut torrent: Torrent, + #[case] makes: &Makes, ) { - let (mut torrent, mut peer) = torrent_peer.await; + use torrust_tracker_primitives::peer::ReadInfo as _; + + make(&mut torrent, makes).await; + + let peer_id = peer::Id::from(1); + let mut peer = a_started_peer(); + peer.peer_id = peer_id; + + torrent.insert_or_update_peer(&peer).await; + + let peers = torrent.get_peers(None).await; - assert_eq!(torrent.get_peers(None).await[0].event, AnnounceEvent::Started); + // Original + let original = peers + .iter() + .find(|p| p.get_id() == peer_id) + .expect("it should find peer by id"); + + assert_eq!(original.event, AnnounceEvent::Started); // Announce "Completed" torrent download event. peer.event = AnnounceEvent::Completed; torrent.insert_or_update_peer(&peer).await; // Update the peer in the torrent entry - assert_eq!(torrent.get_peers(None).await[0].event, AnnounceEvent::Completed); + let peers = torrent.get_peers(None).await; + + // It should be updated now. + let updated = peers + .iter() + .find(|p| p.get_id() == peer_id) + .expect("it should find peer by id"); + + assert_eq!(updated.event, AnnounceEvent::Completed); } #[rstest] -#[case::single_with_started(single_with_started())] -#[case::mutex_std_with_started(mutex_std_with_started())] -#[case::mutex_tokio_with_started(mutex_tokio_with_started())] -#[case::single_with_completed(single_with_completed())] -#[case::mutex_std_with_completed(mutex_std_with_completed())] -#[case::mutex_tokio_with_completed(mutex_tokio_with_completed())] -#[case::single_with_downloaded(single_with_downloaded())] -#[case::mutex_std_with_downloaded(mutex_std_with_downloaded())] -#[case::mutex_tokio_with_downloaded(mutex_tokio_with_downloaded())] +#[case::empty(&Makes::Empty)] +#[case::started(&Makes::Started)] +#[case::completed(&Makes::Completed)] +#[case::downloaded(&Makes::Downloaded)] #[tokio::test] async fn a_peer_should_be_removed_from_a_torrent_entry_when_the_peer_announces_it_has_stopped( - #[future] - #[case] - torrent_peer: (Torrent, Peer), + #[values(single(), standard_mutex(), mutex_tokio())] mut torrent: Torrent, + #[case] makes: &Makes, ) { - let (mut torrent, mut peer) = torrent_peer.await; + use torrust_tracker_primitives::peer::ReadInfo as _; + + make(&mut torrent, makes).await; + + let peer_id = peer::Id::from(1); + let mut peer = a_started_peer(); + peer.peer_id = peer_id; + + torrent.insert_or_update_peer(&peer).await; + + // The started peer should be inserted. + let peers = torrent.get_peers(None).await; + let original = peers + .iter() + .find(|p| p.get_id() == peer_id) + .expect("it should find peer by id"); - assert_eq!(torrent.get_peers_len().await, 1); + assert_eq!(original.event, AnnounceEvent::Started); - // Announce "Stopped" torrent event. + // Change peer to "Stopped" and insert. peer.event = AnnounceEvent::Stopped; - torrent.insert_or_update_peer(&peer).await; // Update the peer in the torrent entry + torrent.insert_or_update_peer(&peer).await; - assert_eq!(torrent.get_peers_len().await, 0); + // It should be removed now. + let peers = torrent.get_peers(None).await; + let updated = peers.iter().find(|p| p.get_id() == peer_id); + + assert_eq!(updated, None); } #[rstest] -#[case::single_with_started(single_with_started())] -#[case::mutex_std_with_started(mutex_std_with_started())] -#[case::mutex_tokio_with_started(mutex_tokio_with_started())] -#[case::single_with_completed(single_with_completed())] -#[case::mutex_std_with_completed(mutex_std_with_completed())] -#[case::mutex_tokio_with_completed(mutex_tokio_with_completed())] -#[case::single_with_downloaded(single_with_downloaded())] -#[case::mutex_std_with_downloaded(mutex_std_with_downloaded())] -#[case::mutex_tokio_with_downloaded(mutex_tokio_with_downloaded())] +#[case::started(&Makes::Started)] +#[case::completed(&Makes::Completed)] +#[case::downloaded(&Makes::Downloaded)] #[tokio::test] async fn torrent_stats_change_when_a_previously_known_peer_announces_it_has_completed_the_torrent( - #[future] - #[case] - torrent_peer: (Torrent, Peer), + #[values(single(), standard_mutex(), mutex_tokio())] mut torrent: Torrent, + #[case] makes: &Makes, ) { - let (mut torrent, mut peer) = torrent_peer.await; + make(&mut torrent, makes).await; + + let peers = torrent.get_peers(None).await; + let mut peer = **peers.first().expect("there should be a peer"); let is_already_completed = peer.event == AnnounceEvent::Completed; @@ -392,22 +262,18 @@ async fn torrent_stats_change_when_a_previously_known_peer_announces_it_has_comp } #[rstest] -#[case::single_with_started(single_with_started())] -#[case::mutex_std_with_started(mutex_std_with_started())] -#[case::mutex_tokio_with_started(mutex_tokio_with_started())] -#[case::single_with_completed(single_with_completed())] -#[case::mutex_std_with_completed(mutex_std_with_completed())] -#[case::mutex_tokio_with_completed(mutex_tokio_with_completed())] -#[case::single_with_downloaded(single_with_downloaded())] -#[case::mutex_std_with_downloaded(mutex_std_with_downloaded())] -#[case::mutex_tokio_with_downloaded(mutex_tokio_with_downloaded())] +#[case::started(&Makes::Started)] +#[case::completed(&Makes::Completed)] +#[case::downloaded(&Makes::Downloaded)] #[tokio::test] async fn torrent_stats_should_have_the_number_of_peers_that_having_announced_at_least_two_events_the_latest_one_is_the_completed_event( - #[future] - #[case] - torrent_peer: (Torrent, Peer), + #[values(single(), standard_mutex(), mutex_tokio())] mut torrent: Torrent, + #[case] makes: &Makes, ) { - let (mut torrent, mut peer) = torrent_peer.await; + make(&mut torrent, makes).await; + + let peers = torrent.get_peers(None).await; + let mut peer = **peers.first().expect("there should be a peer"); // Announce "Completed" torrent download event. peer.event = AnnounceEvent::Completed; @@ -418,69 +284,55 @@ async fn torrent_stats_should_have_the_number_of_peers_that_having_announced_at_ } #[rstest] -#[case::single(single_with_started())] -#[case::mutex_std(mutex_std_with_started())] -#[case::mutex_tokio(mutex_tokio_with_started())] +#[case::started(&Makes::Started)] #[tokio::test] async fn torrent_stats_should_have_the_number_of_leechers_for_a_torrent( - #[future] - #[case] - torrent_peer: (Torrent, Peer), + #[values(single(), standard_mutex(), mutex_tokio())] mut torrent: Torrent, + #[case] makes: &Makes, ) { - let (torrent, _) = torrent_peer.await; + make(&mut torrent, makes).await; assert_eq!(torrent.get_stats().await.incomplete, 1); } #[rstest] -#[case::single(single_with_completed())] -#[case::mutex_std(mutex_std_with_completed())] -#[case::mutex_tokio(mutex_tokio_with_completed())] -#[case::single_with_downloaded(single_with_downloaded())] -#[case::mutex_std_with_downloaded(mutex_std_with_downloaded())] -#[case::mutex_tokio_with_downloaded(mutex_tokio_with_downloaded())] +#[case::completed(&Makes::Completed)] +#[case::downloaded(&Makes::Downloaded)] #[tokio::test] async fn torrent_stats_should_have_the_number_of_seeders_for_a_torrent( - #[future] - #[case] - torrent_peer: (Torrent, Peer), + #[values(single(), standard_mutex(), mutex_tokio())] mut torrent: Torrent, + #[case] makes: &Makes, ) { - let (torrent, _) = torrent_peer.await; + make(&mut torrent, makes).await; assert_eq!(torrent.get_stats().await.complete, 1); } #[rstest] -#[case::single(single_empty())] -#[case::mutex_std(mutex_std_empty())] -#[case::mutex_tokio(mutex_tokio_empty())] +#[case::empty(&Makes::Empty)] #[tokio::test] async fn torrent_stats_should_not_change_when_a_peer_announces_it_has_completed_the_torrent_if_it_is_the_first_announce_from_the_peer( - #[future] - #[case] - torrent_peer: (Torrent, Peer), + #[values(single(), standard_mutex(), mutex_tokio())] mut torrent: Torrent, + #[case] makes: &Makes, ) { - let (mut torrent, mut peer) = torrent_peer.await; + make(&mut torrent, makes).await; - peer.event = AnnounceEvent::Completed; + let peer = a_completed_peer(); // Add a completed peer that did not exist before in the entry assert!(!torrent.insert_or_update_peer(&peer).await); } #[rstest] -#[case::single(single_empty())] -#[case::mutex_std(mutex_std_empty())] -#[case::mutex_tokio(mutex_tokio_empty())] +#[case::empty(&Makes::Empty)] #[tokio::test] async fn torrent_stats_should_not_include_a_peer_in_the_completed_counter_if_the_peer_has_announced_only_one_event( - #[future] - #[case] - torrent_peer: (Torrent, Peer), + #[values(single(), standard_mutex(), mutex_tokio())] mut torrent: Torrent, + #[case] makes: &Makes, ) { - let (mut torrent, mut peer) = torrent_peer.await; + make(&mut torrent, makes).await; - peer.event = AnnounceEvent::Completed; + let peer = a_completed_peer(); // Announce "Completed" torrent download event. // It's the first event announced from this peer. @@ -490,25 +342,18 @@ async fn torrent_stats_should_not_include_a_peer_in_the_completed_counter_if_the } #[rstest] -#[case::single_empty(single_empty())] -#[case::mutex_std_empty(mutex_std_empty())] -#[case::mutex_tokio_empty(mutex_tokio_empty())] -#[case::single_with_started(single_with_started())] -#[case::mutex_std_with_started(mutex_std_with_started())] -#[case::mutex_tokio_with_started(mutex_tokio_with_started())] -#[case::single_with_completed(single_with_completed())] -#[case::mutex_std_with_completed(mutex_std_with_completed())] -#[case::mutex_tokio_with_completed(mutex_tokio_with_completed())] -#[case::single_with_downloaded(single_with_downloaded())] -#[case::mutex_std_with_downloaded(mutex_std_with_downloaded())] -#[case::mutex_tokio_with_downloaded(mutex_tokio_with_downloaded())] +#[case::started(&Makes::Started)] +#[case::completed(&Makes::Completed)] +#[case::downloaded(&Makes::Downloaded)] #[tokio::test] async fn a_torrent_entry_should_return_the_list_of_peers_for_a_given_peer_filtering_out_the_client_that_is_making_the_request( - #[future] - #[case] - torrent_peer: (Torrent, Peer), + #[values(single(), standard_mutex(), mutex_tokio())] mut torrent: Torrent, + #[case] makes: &Makes, ) { - let (mut torrent, mut peer) = torrent_peer.await; + make(&mut torrent, makes).await; + + let peers = torrent.get_peers(None).await; + let mut peer = **peers.first().expect("there should be a peer"); peer.peer_addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)), 8080); @@ -521,19 +366,17 @@ async fn a_torrent_entry_should_return_the_list_of_peers_for_a_given_peer_filter } #[rstest] -#[case::single(single_empty())] -#[case::mutex_std(mutex_std_empty())] -#[case::mutex_tokio(mutex_tokio_empty())] +#[case::empty(&Makes::Empty)] #[tokio::test] async fn two_peers_with_the_same_ip_but_different_port_should_be_considered_different_peers( - #[future] - #[case] - torrent_peer: (Torrent, Peer), + #[values(single(), standard_mutex(), mutex_tokio())] mut torrent: Torrent, + #[case] makes: &Makes, ) { const IP: IpAddr = IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)); - let (mut torrent, peer) = torrent_peer.await; - let (mut peer_a, mut peer_b) = (peer, peer); + make(&mut torrent, makes).await; + + let (mut peer_a, mut peer_b) = (a_started_peer(), a_started_peer()); peer_a.peer_addr = SocketAddr::new(IP, 8080); peer_b.peer_addr = SocketAddr::new(IP, 8081); @@ -550,29 +393,19 @@ async fn two_peers_with_the_same_ip_but_different_port_should_be_considered_diff } #[rstest] -#[case::single_empty(single_empty())] -#[case::mutex_std_empty(mutex_std_empty())] -#[case::mutex_tokio_empty(mutex_tokio_empty())] -#[case::single_with_started(single_with_started())] -#[case::mutex_std_with_started(mutex_std_with_started())] -#[case::mutex_tokio_with_started(mutex_tokio_with_started())] -#[case::single_with_completed(single_with_completed())] -#[case::mutex_std_with_completed(mutex_std_with_completed())] -#[case::mutex_tokio_with_completed(mutex_tokio_with_completed())] -#[case::single_with_downloaded(single_with_downloaded())] -#[case::mutex_std_with_downloaded(mutex_std_with_downloaded())] -#[case::mutex_tokio_with_downloaded(mutex_tokio_with_downloaded())] +#[case::started(&Makes::Started)] +#[case::completed(&Makes::Completed)] +#[case::downloaded(&Makes::Downloaded)] #[tokio::test] async fn the_tracker_should_limit_the_list_of_peers_to_74_when_clients_scrape_torrents( - #[future] - #[case] - torrent_peer: (Torrent, Peer), + #[values(single(), standard_mutex(), mutex_tokio())] mut torrent: Torrent, + #[case] makes: &Makes, ) { - let (mut torrent, peer) = torrent_peer.await; + make(&mut torrent, makes).await; // We add one more peer than the scrape limit for peer_number in 1..=74 + 1 { - let mut peer = peer; + let mut peer = a_started_peer(); peer.peer_id = peer::Id::from(peer_number); torrent.insert_or_update_peer(&peer).await; } @@ -583,19 +416,21 @@ async fn the_tracker_should_limit_the_list_of_peers_to_74_when_clients_scrape_to } #[rstest] -#[case::single(single_empty())] -#[case::mutex_std(mutex_std_empty())] -#[case::mutex_tokio(mutex_tokio_empty())] +#[case::empty(&Makes::Empty)] +#[case::started(&Makes::Started)] +#[case::completed(&Makes::Completed)] +#[case::downloaded(&Makes::Downloaded)] #[tokio::test] async fn a_torrent_entry_should_remove_a_peer_not_updated_after_a_timeout_in_seconds( - #[future] - #[case] - torrent_peer: (Torrent, Peer), + #[values(single(), standard_mutex(), mutex_tokio())] mut torrent: Torrent, + #[case] makes: &Makes, ) { const TIMEOUT: Duration = Duration::from_secs(120); const EXPIRE: Duration = Duration::from_secs(121); - let (mut torrent, mut peer) = torrent_peer.await; + make(&mut torrent, makes).await; + + let mut peer = a_completed_peer(); let now = clock::Working::now(); clock::Stopped::local_set(&now); diff --git a/packages/torrent-repository/tests/repository/mod.rs b/packages/torrent-repository/tests/repository/mod.rs index 8b1378917..c5883a951 100644 --- a/packages/torrent-repository/tests/repository/mod.rs +++ b/packages/torrent-repository/tests/repository/mod.rs @@ -1 +1,194 @@ +use std::hash::{DefaultHasher, Hash}; +use rstest::{fixture, rstest}; +use torrust_tracker_primitives::announce_event::AnnounceEvent; +use torrust_tracker_primitives::info_hash::InfoHash; +use torrust_tracker_primitives::NumberOfBytes; +use torrust_tracker_torrent_repository::entry::Entry as _; +use torrust_tracker_torrent_repository::repository::{RwLockStd, RwLockTokio}; +use torrust_tracker_torrent_repository::EntrySingle; + +use crate::common::torrent_peer_builder::{a_completed_peer, a_started_peer}; +use crate::common::torrents::Torrents; + +#[fixture] +fn standard() -> Torrents { + Torrents::Std(RwLockStd::default()) +} +#[fixture] +fn standard_mutex() -> Torrents { + Torrents::StdMutexStd(RwLockStd::default()) +} + +#[fixture] +fn standard_tokio() -> Torrents { + Torrents::StdMutexTokio(RwLockStd::default()) +} + +#[fixture] +fn tokio_std() -> Torrents { + Torrents::Tokio(RwLockTokio::default()) +} +#[fixture] +fn tokio_mutex() -> Torrents { + Torrents::TokioMutexStd(RwLockTokio::default()) +} + +#[fixture] +fn tokio_tokio() -> Torrents { + Torrents::TokioMutexTokio(RwLockTokio::default()) +} + +type Entries = Vec<(InfoHash, EntrySingle)>; + +#[fixture] +fn empty() -> Entries { + vec![] +} + +#[fixture] +fn default() -> Entries { + vec![(InfoHash::default(), EntrySingle::default())] +} + +#[fixture] +fn started() -> Entries { + let mut torrent = EntrySingle::default(); + torrent.insert_or_update_peer(&a_started_peer()); + vec![(InfoHash::default(), torrent)] +} + +#[fixture] +fn completed() -> Entries { + let mut torrent = EntrySingle::default(); + torrent.insert_or_update_peer(&a_completed_peer()); + vec![(InfoHash::default(), torrent)] +} + +#[fixture] +fn downloaded() -> Entries { + let mut torrent = EntrySingle::default(); + let mut peer = a_started_peer(); + torrent.insert_or_update_peer(&peer); + peer.event = AnnounceEvent::Completed; + peer.left = NumberOfBytes(0); + torrent.insert_or_update_peer(&peer); + vec![(InfoHash::default(), torrent)] +} + +#[fixture] +fn four() -> Entries { + let default = EntrySingle::default(); + let default_h = &mut DefaultHasher::default(); + default.hash(default_h); + + let mut started = EntrySingle::default(); + let started_h = &mut DefaultHasher::default(); + started.insert_or_update_peer(&a_started_peer()); + started.hash(started_h); + + let mut completed = EntrySingle::default(); + let completed_h = &mut DefaultHasher::default(); + completed.insert_or_update_peer(&a_completed_peer()); + completed.hash(completed_h); + + let mut downloaded = EntrySingle::default(); + let downloaded_h = &mut DefaultHasher::default(); + let mut downloaded_peer = a_started_peer(); + downloaded.insert_or_update_peer(&downloaded_peer); + downloaded_peer.event = AnnounceEvent::Completed; + downloaded_peer.left = NumberOfBytes(0); + downloaded.insert_or_update_peer(&downloaded_peer); + downloaded.hash(downloaded_h); + + vec![ + (InfoHash::from(&default_h.clone()), default), + (InfoHash::from(&started_h.clone()), started), + (InfoHash::from(&completed_h.clone()), completed), + (InfoHash::from(&downloaded_h.clone()), downloaded), + ] +} + +async fn make(torrents: &Torrents, entries: &Entries) { + for (info_hash, entry) in entries { + torrents.insert(info_hash, entry.clone()).await; + } +} + +#[rstest] +#[case::empty(empty())] +#[case::default(default())] +#[case::started(started())] +#[case::completed(completed())] +#[case::downloaded(downloaded())] +#[case::four(four())] +#[tokio::test] +async fn it_should_get_a_torrent_entry( + #[values(standard(), standard_mutex(), standard_tokio(), tokio_std(), tokio_mutex(), tokio_tokio())] torrents: Torrents, + #[case] entries: Entries, +) { + make(&torrents, &entries).await; + + if let Some((info_hash, torrent)) = entries.first() { + assert_eq!(torrents.get(info_hash).await, Some(torrent.clone())); + } else { + assert_eq!(torrents.get(&InfoHash::default()).await, None); + } +} + +#[rstest] +#[case::empty(empty())] +#[case::default(default())] +#[case::started(started())] +#[case::completed(completed())] +#[case::downloaded(downloaded())] +#[case::four(four())] +#[tokio::test] +async fn it_should_get_entries( + #[values(standard(), standard_mutex(), standard_tokio(), tokio_std(), tokio_mutex(), tokio_tokio())] torrents: Torrents, + #[case] entries: Entries, +) { + make(&torrents, &entries).await; + + if entries.first().is_some() { + assert!(entries.contains( + torrents + .get_paginated(None) + .await + .first() + .expect("it should have at least one") + )); + } else { + assert!(torrents.get_paginated(None).await.is_empty()); + } +} + +#[rstest] +#[case::empty(empty())] +#[case::default(default())] +#[case::started(started())] +#[case::completed(completed())] +#[case::downloaded(downloaded())] +#[case::four(four())] +#[tokio::test] +async fn it_should_get_metrics( + #[values(standard(), standard_mutex(), standard_tokio(), tokio_std(), tokio_mutex(), tokio_tokio())] torrents: Torrents, + #[case] entries: Entries, +) { + use torrust_tracker_primitives::torrent_metrics::TorrentsMetrics; + + make(&torrents, &entries).await; + + let mut metrics = TorrentsMetrics::default(); + + for (_, torrent) in entries { + let stats = torrent.get_stats(); + + metrics.torrents += 1; + metrics.incomplete += u64::from(stats.incomplete); + metrics.complete += u64::from(stats.complete); + metrics.downloaded += u64::from(stats.downloaded); + } + + assert_eq!(torrents.get_metrics().await, metrics); +} diff --git a/src/core/mod.rs b/src/core/mod.rs index 21cd1b501..d5b9c52b5 100644 --- a/src/core/mod.rs +++ b/src/core/mod.rs @@ -1115,9 +1115,9 @@ mod tests { assert_eq!( torrents_metrics, TorrentsMetrics { - seeders: 0, - completed: 0, - leechers: 0, + complete: 0, + downloaded: 0, + incomplete: 0, torrents: 0 } ); @@ -1164,9 +1164,9 @@ mod tests { assert_eq!( torrent_metrics, TorrentsMetrics { - seeders: 0, - completed: 0, - leechers: 1, + complete: 0, + downloaded: 0, + incomplete: 1, torrents: 1, } ); @@ -1191,9 +1191,9 @@ mod tests { assert_eq!( (torrent_metrics), (TorrentsMetrics { - seeders: 0, - completed: 0, - leechers: 1_000_000, + complete: 0, + downloaded: 0, + incomplete: 1_000_000, torrents: 1_000_000, }), "{result_a:?} {result_b:?}" diff --git a/src/servers/apis/v1/context/stats/resources.rs b/src/servers/apis/v1/context/stats/resources.rs index 48ac660cf..9e8ab6bab 100644 --- a/src/servers/apis/v1/context/stats/resources.rs +++ b/src/servers/apis/v1/context/stats/resources.rs @@ -50,9 +50,9 @@ impl From for Stats { fn from(metrics: TrackerMetrics) -> Self { Self { torrents: metrics.torrents_metrics.torrents, - seeders: metrics.torrents_metrics.seeders, - completed: metrics.torrents_metrics.completed, - leechers: metrics.torrents_metrics.leechers, + seeders: metrics.torrents_metrics.complete, + completed: metrics.torrents_metrics.downloaded, + leechers: metrics.torrents_metrics.incomplete, tcp4_connections_handled: metrics.protocol_metrics.tcp4_connections_handled, tcp4_announces_handled: metrics.protocol_metrics.tcp4_announces_handled, tcp4_scrapes_handled: metrics.protocol_metrics.tcp4_scrapes_handled, @@ -82,9 +82,9 @@ mod tests { assert_eq!( Stats::from(TrackerMetrics { torrents_metrics: TorrentsMetrics { - seeders: 1, - completed: 2, - leechers: 3, + complete: 1, + downloaded: 2, + incomplete: 3, torrents: 4 }, protocol_metrics: Metrics {