Skip to content

Commit

Permalink
dev: more torrent repo testing
Browse files Browse the repository at this point in the history
  • Loading branch information
da2ce7 committed Mar 22, 2024
1 parent f7f4f37 commit 1b36562
Show file tree
Hide file tree
Showing 5 changed files with 169 additions and 19 deletions.
3 changes: 2 additions & 1 deletion packages/primitives/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
//! which is a `BitTorrent` tracker server. These structures are used not only
//! by the tracker server crate, but also by other crates in the Torrust
//! ecosystem.
use std::collections::BTreeMap;
use std::time::Duration;

use info_hash::InfoHash;
Expand Down Expand Up @@ -58,7 +59,7 @@ pub enum DatabaseDriver {
MySQL,
}

pub type PersistentTorrents = Vec<(InfoHash, u32)>;
pub type PersistentTorrents = BTreeMap<InfoHash, u32>;

/// The mode the tracker will run in.
///
Expand Down
4 changes: 2 additions & 2 deletions packages/torrent-repository/src/repository/rw_lock_std.rs
Original file line number Diff line number Diff line change
Expand Up @@ -75,15 +75,15 @@ where
fn import_persistent(&self, persistent_torrents: &PersistentTorrents) {
let mut torrents = self.get_torrents_mut();

for (info_hash, completed) in persistent_torrents {
for (info_hash, downloaded) in persistent_torrents {
// Skip if torrent entry already exists
if torrents.contains_key(info_hash) {
continue;
}

let entry = EntrySingle {
peers: BTreeMap::default(),
downloaded: *completed,
downloaded: *downloaded,
};

torrents.insert(*info_hash, entry);
Expand Down
164 changes: 159 additions & 5 deletions packages/torrent-repository/tests/repository/mod.rs
Original file line number Diff line number Diff line change
@@ -1,9 +1,9 @@
use std::hash::{DefaultHasher, Hash};
use std::hash::{DefaultHasher, Hash, Hasher};

use rstest::{fixture, rstest};
use torrust_tracker_primitives::announce_event::AnnounceEvent;
use torrust_tracker_primitives::info_hash::InfoHash;
use torrust_tracker_primitives::NumberOfBytes;
use torrust_tracker_primitives::{NumberOfBytes, PersistentTorrents};
use torrust_tracker_torrent_repository::entry::Entry as _;
use torrust_tracker_torrent_repository::repository::{RwLockStd, RwLockTokio};
use torrust_tracker_torrent_repository::EntrySingle;
Expand Down Expand Up @@ -104,6 +104,37 @@ fn three() -> Entries {
]
}

/// Fixture: a persisted-torrent map with no entries.
#[fixture]
fn persistent_empty() -> PersistentTorrents {
    PersistentTorrents::new()
}

#[fixture]
fn persistent_single() -> PersistentTorrents {
let hash = &mut DefaultHasher::default();

hash.write_u8(1);
let t = [(InfoHash::from(&hash.clone()), 0_u32)];

t.iter().copied().collect()
}

#[fixture]
fn persistent_three() -> PersistentTorrents {
let hash = &mut DefaultHasher::default();

hash.write_u8(1);
let info_1 = InfoHash::from(&hash.clone());
hash.write_u8(2);
let info_2 = InfoHash::from(&hash.clone());
hash.write_u8(3);
let info_3 = InfoHash::from(&hash.clone());

let t = [(info_1, 1_u32), (info_2, 2_u32), (info_3, 3_u32)];

t.iter().copied().collect()
}

async fn make(repo: &Repo, entries: &Entries) {
for (info_hash, entry) in entries {
repo.insert(info_hash, entry.clone()).await;
Expand All @@ -116,7 +147,7 @@ async fn make(repo: &Repo, entries: &Entries) {
#[case::started(started())]
#[case::completed(completed())]
#[case::downloaded(downloaded())]
#[case::four(three())]
#[case::three(three())]
#[tokio::test]
async fn it_should_get_a_torrent_entry(
#[values(standard(), standard_mutex(), standard_tokio(), tokio_std(), tokio_mutex(), tokio_tokio())] repo: Repo,
Expand All @@ -137,7 +168,7 @@ async fn it_should_get_a_torrent_entry(
#[case::started(started())]
#[case::completed(completed())]
#[case::downloaded(downloaded())]
#[case::four(three())]
#[case::three(three())]
#[tokio::test]
async fn it_should_get_entries(
#[values(standard(), standard_mutex(), standard_tokio(), tokio_std(), tokio_mutex(), tokio_tokio())] repo: Repo,
Expand All @@ -158,7 +189,7 @@ async fn it_should_get_entries(
#[case::started(started())]
#[case::completed(completed())]
#[case::downloaded(downloaded())]
#[case::four(three())]
#[case::three(three())]
#[tokio::test]
async fn it_should_get_metrics(
#[values(standard(), standard_mutex(), standard_tokio(), tokio_std(), tokio_mutex(), tokio_tokio())] repo: Repo,
Expand All @@ -181,3 +212,126 @@ async fn it_should_get_metrics(

assert_eq!(repo.get_metrics().await, metrics);
}

#[rstest]
#[case::empty(empty())]
#[case::default(default())]
#[case::started(started())]
#[case::completed(completed())]
#[case::downloaded(downloaded())]
#[case::three(three())]
#[tokio::test]
async fn it_should_import_persistent_torrents(
    #[values(standard(), standard_mutex(), standard_tokio(), tokio_std(), tokio_mutex(), tokio_tokio())] repo: Repo,
    #[case] entries: Entries,
    #[values(persistent_empty(), persistent_single(), persistent_three())] persistent_torrents: PersistentTorrents,
) {
    make(&repo, &entries).await;

    // Expected total after the import: whatever the repository already counts
    // plus every download count carried by the persisted map.
    // NOTE(review): this assumes the persisted info-hashes do not collide with
    // the pre-inserted `entries` (the fixtures guarantee that), because
    // `import_persistent` skips already-present torrents.
    let expected_downloaded = repo.get_metrics().await.downloaded
        + persistent_torrents.values().map(|d| u64::from(*d)).sum::<u64>();

    repo.import_persistent(&persistent_torrents).await;

    assert_eq!(repo.get_metrics().await.downloaded, expected_downloaded);

    // Every persisted torrent must now be retrievable from the repository.
    for info_hash in persistent_torrents.keys() {
        assert!(repo.get(info_hash).await.is_some());
    }
}

#[rstest]
#[case::empty(empty())]
#[case::default(default())]
#[case::started(started())]
#[case::completed(completed())]
#[case::downloaded(downloaded())]
#[case::three(three())]
#[tokio::test]
async fn it_should_remove_an_entry(
    #[values(standard(), standard_mutex(), standard_tokio(), tokio_std(), tokio_mutex(), tokio_tokio())] repo: Repo,
    #[case] entries: Entries,
) {
    make(&repo, &entries).await;

    for (info_hash, torrent) in entries {
        // The entry is visible before removal…
        assert_eq!(repo.get(&info_hash).await, Some(torrent.clone()));
        // …removal hands it back exactly once…
        assert_eq!(repo.remove(&info_hash).await, Some(torrent));
        // …and afterwards both lookup and a second removal find nothing.
        assert!(repo.get(&info_hash).await.is_none());
        assert!(repo.remove(&info_hash).await.is_none());
    }

    // With every entry removed the torrent count must drop to zero.
    assert_eq!(repo.get_metrics().await.torrents, 0);
}

#[rstest]
#[case::empty(empty())]
#[case::default(default())]
#[case::started(started())]
#[case::completed(completed())]
#[case::downloaded(downloaded())]
#[case::three(three())]
#[tokio::test]
async fn it_should_remove_inactive_peers(
    #[values(standard(), standard_mutex(), standard_tokio(), tokio_std(), tokio_mutex(), tokio_tokio())] repo: Repo,
    #[case] entries: Entries,
) {
    use std::ops::Sub as _;
    use std::time::Duration;

    use torrust_tracker_clock::clock::stopped::Stopped as _;
    use torrust_tracker_clock::clock::{self, Time as _};
    use torrust_tracker_primitives::peer;

    use crate::CurrentClock;

    // A peer is considered inactive after TIMEOUT; EXPIRE is one second past
    // that cutoff, so a peer last updated EXPIRE ago must be pruned.
    const TIMEOUT: Duration = Duration::from_secs(120);
    const EXPIRE: Duration = Duration::from_secs(121);

    make(&repo, &entries).await;

    let info_hash: InfoHash;
    let mut peer: peer::Peer;

    // Generate a fresh info-hash (deterministically, from one hashed byte)
    // and a completed peer to attach to it.
    {
        let hash = &mut DefaultHasher::default();
        hash.write_u8(255);
        info_hash = InfoHash::from(&hash.clone());
        peer = a_completed_peer(-1);
    }

    // Pin the test clock to "now" and back-date the peer's last-update time
    // by EXPIRE (121 s), i.e. just beyond the inactivity cutoff.
    // NOTE(review): `local_set` appears to freeze the stopped clock that
    // `CurrentClock` reads below — confirm against torrust_tracker_clock.
    {
        let now = clock::Working::now();
        clock::Stopped::local_set(&now);

        peer.updated = now.sub(EXPIRE);
    }

    // Insert the info-hash with the stale peer and verify the repository now
    // holds exactly one torrent entry more than the fixture provided.
    {
        repo.update_torrent_with_peer_and_get_stats(&info_hash, &peer).await;
        assert_eq!(repo.get_metrics().await.torrents, entries.len() as u64 + 1);
    }

    // Sanity check: the stale peer really is present before pruning.
    {
        let entry = repo.get(&info_hash).await.expect("it_should_get_some");
        assert!(entry.get_peers(None).contains(&peer.into()));
    }

    // Prune every peer not updated within the last TIMEOUT (120 s); the
    // back-dated peer falls outside that window.
    {
        repo.remove_inactive_peers(CurrentClock::now_sub(&TIMEOUT).expect("it should get a time passed"))
            .await;
    }

    // Verify that the stale peer was removed from the repository.
    {
        let entry = repo.get(&info_hash).await.expect("it_should_get_some");
        assert!(!entry.get_peers(None).contains(&peer.into()));
    }
}
6 changes: 3 additions & 3 deletions src/core/databases/mysql.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ use r2d2_mysql::mysql::prelude::Queryable;
use r2d2_mysql::mysql::{params, Opts, OptsBuilder};
use r2d2_mysql::MySqlConnectionManager;
use torrust_tracker_primitives::info_hash::InfoHash;
use torrust_tracker_primitives::DatabaseDriver;
use torrust_tracker_primitives::{DatabaseDriver, PersistentTorrents};

use super::{Database, Error};
use crate::core::auth::{self, Key};
Expand Down Expand Up @@ -105,7 +105,7 @@ impl Database for Mysql {
}

/// Refer to [`databases::Database::load_persistent_torrents`](crate::core::databases::Database::load_persistent_torrents).
async fn load_persistent_torrents(&self) -> Result<Vec<(InfoHash, u32)>, Error> {
async fn load_persistent_torrents(&self) -> Result<PersistentTorrents, Error> {
let mut conn = self.pool.get().map_err(|e| (e, DRIVER))?;

let torrents = conn.query_map(
Expand All @@ -116,7 +116,7 @@ impl Database for Mysql {
},
)?;

Ok(torrents)
Ok(torrents.iter().copied().collect())
}

/// Refer to [`databases::Database::load_keys`](crate::core::databases::Database::load_keys).
Expand Down
11 changes: 3 additions & 8 deletions src/core/databases/sqlite.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ use async_trait::async_trait;
use r2d2::Pool;
use r2d2_sqlite::SqliteConnectionManager;
use torrust_tracker_primitives::info_hash::InfoHash;
use torrust_tracker_primitives::{DatabaseDriver, DurationSinceUnixEpoch};
use torrust_tracker_primitives::{DatabaseDriver, DurationSinceUnixEpoch, PersistentTorrents};

use super::{Database, Error};
use crate::core::auth::{self, Key};
Expand Down Expand Up @@ -89,7 +89,7 @@ impl Database for Sqlite {
}

/// Refer to [`databases::Database::load_persistent_torrents`](crate::core::databases::Database::load_persistent_torrents).
async fn load_persistent_torrents(&self) -> Result<Vec<(InfoHash, u32)>, Error> {
async fn load_persistent_torrents(&self) -> Result<PersistentTorrents, Error> {
let conn = self.pool.get().map_err(|e| (e, DRIVER))?;

let mut stmt = conn.prepare("SELECT info_hash, completed FROM torrents")?;
Expand All @@ -101,12 +101,7 @@ impl Database for Sqlite {
Ok((info_hash, completed))
})?;

//torrent_iter?;
//let torrent_iter = torrent_iter.unwrap();

let torrents: Vec<(InfoHash, u32)> = torrent_iter.filter_map(std::result::Result::ok).collect();

Ok(torrents)
Ok(torrent_iter.filter_map(std::result::Result::ok).collect())
}

/// Refer to [`databases::Database::load_keys`](crate::core::databases::Database::load_keys).
Expand Down

0 comments on commit 1b36562

Please sign in to comment.