Skip to content

Commit

Permalink
dev: improve torrent entry tests
Browse files Browse the repository at this point in the history
  • Loading branch information
da2ce7 committed Mar 22, 2024
1 parent 5ed94e3 commit f7f4f37
Show file tree
Hide file tree
Showing 12 changed files with 325 additions and 341 deletions.
9 changes: 5 additions & 4 deletions packages/torrent-repository/src/entry/mod.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
use std::fmt::Debug;
use std::net::SocketAddr;
use std::sync::Arc;

//use serde::{Deserialize, Serialize};
Expand Down Expand Up @@ -33,7 +34,7 @@ pub trait Entry {
///
/// It filters out the input peer, typically because we want to return this
/// list of peers to that client peer.
fn get_peers_for_peer(&self, client: &peer::Peer, limit: Option<usize>) -> Vec<Arc<peer::Peer>>;
fn get_peers_for_client(&self, client: &SocketAddr, limit: Option<usize>) -> Vec<Arc<peer::Peer>>;

/// It updates a peer and returns true if the number of complete downloads have increased.
///
Expand All @@ -55,7 +56,7 @@ pub trait EntrySync {
fn peers_is_empty(&self) -> bool;
fn get_peers_len(&self) -> usize;
fn get_peers(&self, limit: Option<usize>) -> Vec<Arc<peer::Peer>>;
fn get_peers_for_peer(&self, client: &peer::Peer, limit: Option<usize>) -> Vec<Arc<peer::Peer>>;
fn get_peers_for_client(&self, client: &SocketAddr, limit: Option<usize>) -> Vec<Arc<peer::Peer>>;
fn insert_or_update_peer(&self, peer: &peer::Peer) -> bool;
fn insert_or_update_peer_and_get_stats(&self, peer: &peer::Peer) -> (bool, SwarmMetadata);
fn remove_inactive_peers(&self, current_cutoff: DurationSinceUnixEpoch);
Expand All @@ -68,9 +69,9 @@ pub trait EntryAsync {
fn peers_is_empty(&self) -> impl std::future::Future<Output = bool> + Send;
fn get_peers_len(&self) -> impl std::future::Future<Output = usize> + Send;
fn get_peers(&self, limit: Option<usize>) -> impl std::future::Future<Output = Vec<Arc<peer::Peer>>> + Send;
fn get_peers_for_peer(
fn get_peers_for_client(
&self,
client: &peer::Peer,
client: &SocketAddr,
limit: Option<usize>,
) -> impl std::future::Future<Output = Vec<Arc<peer::Peer>>> + Send;
fn insert_or_update_peer(self, peer: &peer::Peer) -> impl std::future::Future<Output = bool> + Send;
Expand Down
5 changes: 3 additions & 2 deletions packages/torrent-repository/src/entry/mutex_std.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
use std::net::SocketAddr;
use std::sync::Arc;

use torrust_tracker_configuration::TrackerPolicy;
Expand Down Expand Up @@ -28,8 +29,8 @@ impl EntrySync for EntryMutexStd {
self.lock().expect("it should get lock").get_peers(limit)
}

fn get_peers_for_peer(&self, client: &peer::Peer, limit: Option<usize>) -> Vec<Arc<peer::Peer>> {
self.lock().expect("it should get lock").get_peers_for_peer(client, limit)
fn get_peers_for_client(&self, client: &SocketAddr, limit: Option<usize>) -> Vec<Arc<peer::Peer>> {
self.lock().expect("it should get lock").get_peers_for_client(client, limit)
}

fn insert_or_update_peer(&self, peer: &peer::Peer) -> bool {
Expand Down
5 changes: 3 additions & 2 deletions packages/torrent-repository/src/entry/mutex_tokio.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
use std::net::SocketAddr;
use std::sync::Arc;

use torrust_tracker_configuration::TrackerPolicy;
Expand Down Expand Up @@ -28,8 +29,8 @@ impl EntryAsync for EntryMutexTokio {
self.lock().await.get_peers(limit)
}

async fn get_peers_for_peer(&self, client: &peer::Peer, limit: Option<usize>) -> Vec<Arc<peer::Peer>> {
self.lock().await.get_peers_for_peer(client, limit)
async fn get_peers_for_client(&self, client: &SocketAddr, limit: Option<usize>) -> Vec<Arc<peer::Peer>> {
self.lock().await.get_peers_for_client(client, limit)
}

async fn insert_or_update_peer(self, peer: &peer::Peer) -> bool {
Expand Down
7 changes: 4 additions & 3 deletions packages/torrent-repository/src/entry/single.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
use std::net::SocketAddr;
use std::sync::Arc;

use torrust_tracker_configuration::TrackerPolicy;
Expand Down Expand Up @@ -48,13 +49,13 @@ impl Entry for EntrySingle {
}
}

fn get_peers_for_peer(&self, client: &peer::Peer, limit: Option<usize>) -> Vec<Arc<peer::Peer>> {
fn get_peers_for_client(&self, client: &SocketAddr, limit: Option<usize>) -> Vec<Arc<peer::Peer>> {
match limit {
Some(limit) => self
.peers
.values()
// Take peers which are not the client peer
.filter(|peer| peer::ReadInfo::get_address(peer.as_ref()) != peer::ReadInfo::get_address(client))
.filter(|peer| peer::ReadInfo::get_address(peer.as_ref()) != *client)
// Limit the number of peers on the result
.take(limit)
.cloned()
Expand All @@ -63,7 +64,7 @@ impl Entry for EntrySingle {
.peers
.values()
// Take peers which are not the client peer
.filter(|peer| peer::ReadInfo::get_address(peer.as_ref()) != peer::ReadInfo::get_address(client))
.filter(|peer| peer::ReadInfo::get_address(peer.as_ref()) != *client)
.cloned()
.collect(),
}
Expand Down
2 changes: 1 addition & 1 deletion packages/torrent-repository/tests/common/mod.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,3 @@
pub mod repo;
pub mod torrent;
pub mod torrent_peer_builder;
pub mod torrents;
148 changes: 148 additions & 0 deletions packages/torrent-repository/tests/common/repo.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,148 @@
use torrust_tracker_configuration::TrackerPolicy;
use torrust_tracker_primitives::info_hash::InfoHash;
use torrust_tracker_primitives::pagination::Pagination;
use torrust_tracker_primitives::swarm_metadata::SwarmMetadata;
use torrust_tracker_primitives::torrent_metrics::TorrentsMetrics;
use torrust_tracker_primitives::{peer, DurationSinceUnixEpoch, PersistentTorrents};
use torrust_tracker_torrent_repository::repository::{Repository as _, RepositoryAsync as _};
use torrust_tracker_torrent_repository::{
EntrySingle, TorrentsRwLockStd, TorrentsRwLockStdMutexStd, TorrentsRwLockStdMutexTokio, TorrentsRwLockTokio,
TorrentsRwLockTokioMutexStd, TorrentsRwLockTokioMutexTokio,
};

#[derive(Debug)]
pub(crate) enum Repo {
Std(TorrentsRwLockStd),
StdMutexStd(TorrentsRwLockStdMutexStd),
StdMutexTokio(TorrentsRwLockStdMutexTokio),
Tokio(TorrentsRwLockTokio),
TokioMutexStd(TorrentsRwLockTokioMutexStd),
TokioMutexTokio(TorrentsRwLockTokioMutexTokio),
}

#[allow(dead_code)]
impl Repo {
pub(crate) async fn get(&self, key: &InfoHash) -> Option<EntrySingle> {
match self {
Repo::Std(repo) => repo.get(key),
Repo::StdMutexStd(repo) => Some(repo.get(key)?.lock().unwrap().clone()),
Repo::StdMutexTokio(repo) => Some(repo.get(key).await?.lock().await.clone()),
Repo::Tokio(repo) => repo.get(key).await,
Repo::TokioMutexStd(repo) => Some(repo.get(key).await?.lock().unwrap().clone()),
Repo::TokioMutexTokio(repo) => Some(repo.get(key).await?.lock().await.clone()),
}
}
pub(crate) async fn get_metrics(&self) -> TorrentsMetrics {
match self {
Repo::Std(repo) => repo.get_metrics(),
Repo::StdMutexStd(repo) => repo.get_metrics(),
Repo::StdMutexTokio(repo) => repo.get_metrics().await,
Repo::Tokio(repo) => repo.get_metrics().await,
Repo::TokioMutexStd(repo) => repo.get_metrics().await,
Repo::TokioMutexTokio(repo) => repo.get_metrics().await,
}
}
pub(crate) async fn get_paginated(&self, pagination: Option<&Pagination>) -> Vec<(InfoHash, EntrySingle)> {
match self {
Repo::Std(repo) => repo.get_paginated(pagination),
Repo::StdMutexStd(repo) => repo
.get_paginated(pagination)
.iter()
.map(|(i, t)| (*i, t.lock().expect("it should get a lock").clone()))
.collect(),
Repo::StdMutexTokio(repo) => {
let mut v: Vec<(InfoHash, EntrySingle)> = vec![];

for (i, t) in repo.get_paginated(pagination).await {
v.push((i, t.lock().await.clone()));
}
v
}
Repo::Tokio(repo) => repo.get_paginated(pagination).await,
Repo::TokioMutexStd(repo) => repo
.get_paginated(pagination)
.await
.iter()
.map(|(i, t)| (*i, t.lock().expect("it should get a lock").clone()))
.collect(),
Repo::TokioMutexTokio(repo) => {
let mut v: Vec<(InfoHash, EntrySingle)> = vec![];

for (i, t) in repo.get_paginated(pagination).await {
v.push((i, t.lock().await.clone()));
}
v
}
}
}
pub(crate) async fn import_persistent(&self, persistent_torrents: &PersistentTorrents) {
match self {
Repo::Std(repo) => repo.import_persistent(persistent_torrents),
Repo::StdMutexStd(repo) => repo.import_persistent(persistent_torrents),
Repo::StdMutexTokio(repo) => repo.import_persistent(persistent_torrents).await,
Repo::Tokio(repo) => repo.import_persistent(persistent_torrents).await,
Repo::TokioMutexStd(repo) => repo.import_persistent(persistent_torrents).await,
Repo::TokioMutexTokio(repo) => repo.import_persistent(persistent_torrents).await,
}
}
pub(crate) async fn remove(&self, key: &InfoHash) -> Option<EntrySingle> {
match self {
Repo::Std(repo) => repo.remove(key),
Repo::StdMutexStd(repo) => Some(repo.remove(key)?.lock().unwrap().clone()),
Repo::StdMutexTokio(repo) => Some(repo.remove(key).await?.lock().await.clone()),
Repo::Tokio(repo) => repo.remove(key).await,
Repo::TokioMutexStd(repo) => Some(repo.remove(key).await?.lock().unwrap().clone()),
Repo::TokioMutexTokio(repo) => Some(repo.remove(key).await?.lock().await.clone()),
}
}
pub(crate) async fn remove_inactive_peers(&self, current_cutoff: DurationSinceUnixEpoch) {
match self {
Repo::Std(repo) => repo.remove_inactive_peers(current_cutoff),
Repo::StdMutexStd(repo) => repo.remove_inactive_peers(current_cutoff),
Repo::StdMutexTokio(repo) => repo.remove_inactive_peers(current_cutoff).await,
Repo::Tokio(repo) => repo.remove_inactive_peers(current_cutoff).await,
Repo::TokioMutexStd(repo) => repo.remove_inactive_peers(current_cutoff).await,
Repo::TokioMutexTokio(repo) => repo.remove_inactive_peers(current_cutoff).await,
}
}
pub(crate) async fn remove_peerless_torrents(&self, policy: &TrackerPolicy) {
match self {
Repo::Std(repo) => repo.remove_peerless_torrents(policy),
Repo::StdMutexStd(repo) => repo.remove_peerless_torrents(policy),
Repo::StdMutexTokio(repo) => repo.remove_peerless_torrents(policy).await,
Repo::Tokio(repo) => repo.remove_peerless_torrents(policy).await,
Repo::TokioMutexStd(repo) => repo.remove_peerless_torrents(policy).await,
Repo::TokioMutexTokio(repo) => repo.remove_peerless_torrents(policy).await,
}
}
pub(crate) async fn update_torrent_with_peer_and_get_stats(
&self,
info_hash: &InfoHash,
peer: &peer::Peer,
) -> (bool, SwarmMetadata) {
match self {
Repo::Std(repo) => repo.update_torrent_with_peer_and_get_stats(info_hash, peer),
Repo::StdMutexStd(repo) => repo.update_torrent_with_peer_and_get_stats(info_hash, peer),
Repo::StdMutexTokio(repo) => repo.update_torrent_with_peer_and_get_stats(info_hash, peer).await,
Repo::Tokio(repo) => repo.update_torrent_with_peer_and_get_stats(info_hash, peer).await,
Repo::TokioMutexStd(repo) => repo.update_torrent_with_peer_and_get_stats(info_hash, peer).await,
Repo::TokioMutexTokio(repo) => repo.update_torrent_with_peer_and_get_stats(info_hash, peer).await,
}
}
pub(crate) async fn insert(&self, info_hash: &InfoHash, torrent: EntrySingle) -> Option<EntrySingle> {
match self {
Repo::Std(repo) => repo.write().insert(*info_hash, torrent),
Repo::StdMutexStd(repo) => Some(repo.write().insert(*info_hash, torrent.into())?.lock().unwrap().clone()),
Repo::StdMutexTokio(repo) => {
let r = repo.write().insert(*info_hash, torrent.into());
match r {
Some(t) => Some(t.lock().await.clone()),
None => None,
}
}
Repo::Tokio(repo) => repo.write().await.insert(*info_hash, torrent),
Repo::TokioMutexStd(repo) => Some(repo.write().await.insert(*info_hash, torrent.into())?.lock().unwrap().clone()),
Repo::TokioMutexTokio(repo) => Some(repo.write().await.insert(*info_hash, torrent.into())?.lock().await.clone()),
}
}
}
9 changes: 5 additions & 4 deletions packages/torrent-repository/tests/common/torrent.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
use std::net::SocketAddr;
use std::sync::Arc;

use torrust_tracker_configuration::TrackerPolicy;
Expand Down Expand Up @@ -54,11 +55,11 @@ impl Torrent {
}
}

pub(crate) async fn get_peers_for_peer(&self, client: &peer::Peer, limit: Option<usize>) -> Vec<Arc<peer::Peer>> {
pub(crate) async fn get_peers_for_client(&self, client: &SocketAddr, limit: Option<usize>) -> Vec<Arc<peer::Peer>> {
match self {
Torrent::Single(entry) => entry.get_peers_for_peer(client, limit),
Torrent::MutexStd(entry) => entry.get_peers_for_peer(client, limit),
Torrent::MutexTokio(entry) => entry.clone().get_peers_for_peer(client, limit).await,
Torrent::Single(entry) => entry.get_peers_for_client(client, limit),
Torrent::MutexStd(entry) => entry.get_peers_for_client(client, limit),
Torrent::MutexTokio(entry) => entry.clone().get_peers_for_client(client, limit).await,
}
}

Expand Down
25 changes: 14 additions & 11 deletions packages/torrent-repository/tests/common/torrent_peer_builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,13 +7,14 @@ use torrust_tracker_primitives::{peer, DurationSinceUnixEpoch, NumberOfBytes};
use crate::CurrentClock;

#[derive(Debug, Default)]
pub struct TorrentPeerBuilder {
struct TorrentPeerBuilder {
peer: peer::Peer,
}

#[allow(dead_code)]
impl TorrentPeerBuilder {
#[must_use]
pub fn new() -> Self {
fn new() -> Self {
Self {
peer: peer::Peer {
updated: CurrentClock::now(),
Expand All @@ -23,63 +24,65 @@ impl TorrentPeerBuilder {
}

#[must_use]
pub fn with_event_completed(mut self) -> Self {
fn with_event_completed(mut self) -> Self {
self.peer.event = AnnounceEvent::Completed;
self
}

#[must_use]
pub fn with_event_started(mut self) -> Self {
fn with_event_started(mut self) -> Self {
self.peer.event = AnnounceEvent::Started;
self
}

#[must_use]
pub fn with_peer_address(mut self, peer_addr: SocketAddr) -> Self {
fn with_peer_address(mut self, peer_addr: SocketAddr) -> Self {
self.peer.peer_addr = peer_addr;
self
}

#[must_use]
pub fn with_peer_id(mut self, peer_id: peer::Id) -> Self {
fn with_peer_id(mut self, peer_id: peer::Id) -> Self {
self.peer.peer_id = peer_id;
self
}

#[must_use]
pub fn with_number_of_bytes_left(mut self, left: i64) -> Self {
fn with_number_of_bytes_left(mut self, left: i64) -> Self {
self.peer.left = NumberOfBytes(left);
self
}

#[must_use]
pub fn updated_at(mut self, updated: DurationSinceUnixEpoch) -> Self {
fn updated_at(mut self, updated: DurationSinceUnixEpoch) -> Self {
self.peer.updated = updated;
self
}

#[must_use]
pub fn into(self) -> peer::Peer {
fn into(self) -> peer::Peer {
self.peer
}
}

/// A torrent seeder is a peer with 0 bytes left to download which
/// has not announced it has stopped
#[must_use]
pub fn a_completed_peer() -> peer::Peer {
pub fn a_completed_peer(id: i32) -> peer::Peer {
TorrentPeerBuilder::new()
.with_number_of_bytes_left(0)
.with_event_completed()
.with_peer_id(id.into())
.into()
}

/// A torrent leecher is a peer that is not a seeder.
/// Leecher: left > 0 OR event = Stopped
#[must_use]
pub fn a_started_peer() -> peer::Peer {
pub fn a_started_peer(id: i32) -> peer::Peer {
TorrentPeerBuilder::new()
.with_number_of_bytes_left(1)
.with_event_started()
.with_peer_id(id.into())
.into()
}
Loading

0 comments on commit f7f4f37

Please sign in to comment.