Skip to content

Commit

Permalink
dev: more work on the torrent repository package
Browse files Browse the repository at this point in the history
  • Loading branch information
da2ce7 committed Mar 22, 2024
1 parent 90f7102 commit 412adc3
Show file tree
Hide file tree
Showing 22 changed files with 472 additions and 494 deletions.
6 changes: 1 addition & 5 deletions .vscode/settings.json
Original file line number Diff line number Diff line change
Expand Up @@ -31,9 +31,5 @@
"evenBetterToml.formatter.trailingNewline": true,
"evenBetterToml.formatter.reorderKeys": true,
"evenBetterToml.formatter.reorderArrays": true,
"rust-analyzer.linkedProjects": [
"./packages/torrent-repository/Cargo.toml",
"./packages/primitives/Cargo.toml",
"./packages/primitives/Cargo.toml"
],

}
32 changes: 20 additions & 12 deletions packages/torrent-repository/benches/helpers/asyn.rs
Original file line number Diff line number Diff line change
@@ -1,14 +1,15 @@
use std::sync::Arc;
use std::time::{Duration, Instant};

use futures::stream::FuturesUnordered;
use torrust_tracker_primitives::info_hash::InfoHash;
use torrust_tracker_torrent_repository::repository::UpdateTorrentAsync;
use torrust_tracker_torrent_repository::repository::RepositoryAsync;

use super::utils::{generate_unique_info_hashes, DEFAULT_PEER};

pub async fn add_one_torrent<V>(samples: u64) -> Duration
pub async fn add_one_torrent<V, T>(samples: u64) -> Duration
where
V: UpdateTorrentAsync + Default,
V: RepositoryAsync<T> + Default,
{
let start = Instant::now();

Expand All @@ -26,11 +27,12 @@ where
}

// Add one torrent ten thousand times in parallel (depending on the set worker threads)
pub async fn update_one_torrent_in_parallel<V>(runtime: &tokio::runtime::Runtime, samples: u64, sleep: Option<u64>) -> Duration
pub async fn update_one_torrent_in_parallel<V, T>(runtime: &tokio::runtime::Runtime, samples: u64, sleep: Option<u64>) -> Duration
where
V: UpdateTorrentAsync + Default + Clone + Send + Sync + 'static,
V: RepositoryAsync<T> + Default,
Arc<V>: Clone + Send + Sync + 'static,
{
let torrent_repository = V::default();
let torrent_repository = Arc::<V>::default();
let info_hash: &'static InfoHash = &InfoHash([0; 20]);
let handles = FuturesUnordered::new();

Expand Down Expand Up @@ -66,11 +68,16 @@ where
}

// Add ten thousand torrents in parallel (depending on the set worker threads)
pub async fn add_multiple_torrents_in_parallel<V>(runtime: &tokio::runtime::Runtime, samples: u64, sleep: Option<u64>) -> Duration
pub async fn add_multiple_torrents_in_parallel<V, T>(
runtime: &tokio::runtime::Runtime,
samples: u64,
sleep: Option<u64>,
) -> Duration
where
V: UpdateTorrentAsync + Default + Clone + Send + Sync + 'static,
V: RepositoryAsync<T> + Default,
Arc<V>: Clone + Send + Sync + 'static,
{
let torrent_repository = V::default();
let torrent_repository = Arc::<V>::default();
let info_hashes = generate_unique_info_hashes(samples.try_into().expect("it should fit in a usize"));
let handles = FuturesUnordered::new();

Expand Down Expand Up @@ -101,15 +108,16 @@ where
}

// Async update ten thousand torrents in parallel (depending on the set worker threads)
pub async fn update_multiple_torrents_in_parallel<V>(
pub async fn update_multiple_torrents_in_parallel<V, T>(
runtime: &tokio::runtime::Runtime,
samples: u64,
sleep: Option<u64>,
) -> Duration
where
V: UpdateTorrentAsync + Default + Clone + Send + Sync + 'static,
V: RepositoryAsync<T> + Default,
Arc<V>: Clone + Send + Sync + 'static,
{
let torrent_repository = V::default();
let torrent_repository = Arc::<V>::default();
let info_hashes = generate_unique_info_hashes(samples.try_into().expect("it should fit in usize"));
let handles = FuturesUnordered::new();

Expand Down
32 changes: 20 additions & 12 deletions packages/torrent-repository/benches/helpers/sync.rs
Original file line number Diff line number Diff line change
@@ -1,16 +1,17 @@
use std::sync::Arc;
use std::time::{Duration, Instant};

use futures::stream::FuturesUnordered;
use torrust_tracker_primitives::info_hash::InfoHash;
use torrust_tracker_torrent_repository::repository::UpdateTorrentSync;
use torrust_tracker_torrent_repository::repository::Repository;

use super::utils::{generate_unique_info_hashes, DEFAULT_PEER};

// Simply add one torrent
#[must_use]
pub fn add_one_torrent<V>(samples: u64) -> Duration
pub fn add_one_torrent<V, T>(samples: u64) -> Duration
where
V: UpdateTorrentSync + Default,
V: Repository<T> + Default,
{
let start = Instant::now();

Expand All @@ -26,11 +27,12 @@ where
}

// Add one torrent ten thousand times in parallel (depending on the set worker threads)
pub async fn update_one_torrent_in_parallel<V>(runtime: &tokio::runtime::Runtime, samples: u64, sleep: Option<u64>) -> Duration
pub async fn update_one_torrent_in_parallel<V, T>(runtime: &tokio::runtime::Runtime, samples: u64, sleep: Option<u64>) -> Duration
where
V: UpdateTorrentSync + Default + Clone + Send + Sync + 'static,
V: Repository<T> + Default,
Arc<V>: Clone + Send + Sync + 'static,
{
let torrent_repository = V::default();
let torrent_repository = Arc::<V>::default();
let info_hash: &'static InfoHash = &InfoHash([0; 20]);
let handles = FuturesUnordered::new();

Expand Down Expand Up @@ -62,11 +64,16 @@ where
}

// Add ten thousand torrents in parallel (depending on the set worker threads)
pub async fn add_multiple_torrents_in_parallel<V>(runtime: &tokio::runtime::Runtime, samples: u64, sleep: Option<u64>) -> Duration
pub async fn add_multiple_torrents_in_parallel<V, T>(
runtime: &tokio::runtime::Runtime,
samples: u64,
sleep: Option<u64>,
) -> Duration
where
V: UpdateTorrentSync + Default + Clone + Send + Sync + 'static,
V: Repository<T> + Default,
Arc<V>: Clone + Send + Sync + 'static,
{
let torrent_repository = V::default();
let torrent_repository = Arc::<V>::default();
let info_hashes = generate_unique_info_hashes(samples.try_into().expect("it should fit in a usize"));
let handles = FuturesUnordered::new();

Expand Down Expand Up @@ -95,15 +102,16 @@ where
}

// Update ten thousand torrents in parallel (depending on the set worker threads)
pub async fn update_multiple_torrents_in_parallel<V>(
pub async fn update_multiple_torrents_in_parallel<V, T>(
runtime: &tokio::runtime::Runtime,
samples: u64,
sleep: Option<u64>,
) -> Duration
where
V: UpdateTorrentSync + Default + Clone + Send + Sync + 'static,
V: Repository<T> + Default,
Arc<V>: Clone + Send + Sync + 'static,
{
let torrent_repository = V::default();
let torrent_repository = Arc::<V>::default();
let info_hashes = generate_unique_info_hashes(samples.try_into().expect("it should fit in usize"));
let handles = FuturesUnordered::new();

Expand Down
55 changes: 26 additions & 29 deletions packages/torrent-repository/benches/repository_benchmark.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,3 @@
use std::sync::Arc;
use std::time::Duration;

mod helpers;
Expand All @@ -20,30 +19,30 @@ fn add_one_torrent(c: &mut Criterion) {
group.measurement_time(Duration::from_millis(1000));

group.bench_function("RwLockStd", |b| {
b.iter_custom(sync::add_one_torrent::<Arc<TorrentsRwLockStd>>);
b.iter_custom(sync::add_one_torrent::<TorrentsRwLockStd, _>);
});

group.bench_function("RwLockStdMutexStd", |b| {
b.iter_custom(sync::add_one_torrent::<Arc<TorrentsRwLockStdMutexStd>>);
b.iter_custom(sync::add_one_torrent::<TorrentsRwLockStdMutexStd, _>);
});

group.bench_function("RwLockStdMutexTokio", |b| {
b.to_async(&rt)
.iter_custom(asyn::add_one_torrent::<Arc<TorrentsRwLockStdMutexTokio>>);
.iter_custom(asyn::add_one_torrent::<TorrentsRwLockStdMutexTokio, _>);
});

group.bench_function("RwLockTokio", |b| {
b.to_async(&rt).iter_custom(asyn::add_one_torrent::<Arc<TorrentsRwLockTokio>>);
b.to_async(&rt).iter_custom(asyn::add_one_torrent::<TorrentsRwLockTokio, _>);
});

group.bench_function("RwLockTokioMutexStd", |b| {
b.to_async(&rt)
.iter_custom(asyn::add_one_torrent::<Arc<TorrentsRwLockTokioMutexStd>>);
.iter_custom(asyn::add_one_torrent::<TorrentsRwLockTokioMutexStd, _>);
});

group.bench_function("RwLockTokioMutexTokio", |b| {
b.to_async(&rt)
.iter_custom(asyn::add_one_torrent::<Arc<TorrentsRwLockTokioMutexTokio>>);
.iter_custom(asyn::add_one_torrent::<TorrentsRwLockTokioMutexTokio, _>);
});

group.finish();
Expand All @@ -62,32 +61,32 @@ fn add_multiple_torrents_in_parallel(c: &mut Criterion) {

group.bench_function("RwLockStd", |b| {
b.to_async(&rt)
.iter_custom(|iters| sync::add_multiple_torrents_in_parallel::<Arc<TorrentsRwLockStd>>(&rt, iters, None));
.iter_custom(|iters| sync::add_multiple_torrents_in_parallel::<TorrentsRwLockStd, _>(&rt, iters, None));
});

group.bench_function("RwLockStdMutexStd", |b| {
b.to_async(&rt)
.iter_custom(|iters| sync::add_multiple_torrents_in_parallel::<Arc<TorrentsRwLockStdMutexStd>>(&rt, iters, None));
.iter_custom(|iters| sync::add_multiple_torrents_in_parallel::<TorrentsRwLockStdMutexStd, _>(&rt, iters, None));
});

group.bench_function("RwLockStdMutexTokio", |b| {
b.to_async(&rt)
.iter_custom(|iters| asyn::add_multiple_torrents_in_parallel::<Arc<TorrentsRwLockStdMutexTokio>>(&rt, iters, None));
.iter_custom(|iters| asyn::add_multiple_torrents_in_parallel::<TorrentsRwLockStdMutexTokio, _>(&rt, iters, None));
});

group.bench_function("RwLockTokio", |b| {
b.to_async(&rt)
.iter_custom(|iters| asyn::add_multiple_torrents_in_parallel::<Arc<TorrentsRwLockTokio>>(&rt, iters, None));
.iter_custom(|iters| asyn::add_multiple_torrents_in_parallel::<TorrentsRwLockTokio, _>(&rt, iters, None));
});

group.bench_function("RwLockTokioMutexStd", |b| {
b.to_async(&rt)
.iter_custom(|iters| asyn::add_multiple_torrents_in_parallel::<Arc<TorrentsRwLockTokioMutexStd>>(&rt, iters, None));
.iter_custom(|iters| asyn::add_multiple_torrents_in_parallel::<TorrentsRwLockTokioMutexStd, _>(&rt, iters, None));
});

group.bench_function("RwLockTokioMutexTokio", |b| {
b.to_async(&rt)
.iter_custom(|iters| asyn::add_multiple_torrents_in_parallel::<Arc<TorrentsRwLockTokioMutexTokio>>(&rt, iters, None));
.iter_custom(|iters| asyn::add_multiple_torrents_in_parallel::<TorrentsRwLockTokioMutexTokio, _>(&rt, iters, None));
});

group.finish();
Expand All @@ -106,32 +105,32 @@ fn update_one_torrent_in_parallel(c: &mut Criterion) {

group.bench_function("RwLockStd", |b| {
b.to_async(&rt)
.iter_custom(|iters| sync::update_one_torrent_in_parallel::<Arc<TorrentsRwLockStd>>(&rt, iters, None));
.iter_custom(|iters| sync::update_one_torrent_in_parallel::<TorrentsRwLockStd, _>(&rt, iters, None));
});

group.bench_function("RwLockStdMutexStd", |b| {
b.to_async(&rt)
.iter_custom(|iters| sync::update_one_torrent_in_parallel::<Arc<TorrentsRwLockStdMutexStd>>(&rt, iters, None));
.iter_custom(|iters| sync::update_one_torrent_in_parallel::<TorrentsRwLockStdMutexStd, _>(&rt, iters, None));
});

group.bench_function("RwLockStdMutexTokio", |b| {
b.to_async(&rt)
.iter_custom(|iters| asyn::update_one_torrent_in_parallel::<Arc<TorrentsRwLockStdMutexTokio>>(&rt, iters, None));
.iter_custom(|iters| asyn::update_one_torrent_in_parallel::<TorrentsRwLockStdMutexTokio, _>(&rt, iters, None));
});

group.bench_function("RwLockTokio", |b| {
b.to_async(&rt)
.iter_custom(|iters| asyn::update_one_torrent_in_parallel::<Arc<TorrentsRwLockTokio>>(&rt, iters, None));
.iter_custom(|iters| asyn::update_one_torrent_in_parallel::<TorrentsRwLockTokio, _>(&rt, iters, None));
});

group.bench_function("RwLockTokioMutexStd", |b| {
b.to_async(&rt)
.iter_custom(|iters| asyn::update_one_torrent_in_parallel::<Arc<TorrentsRwLockTokioMutexStd>>(&rt, iters, None));
.iter_custom(|iters| asyn::update_one_torrent_in_parallel::<TorrentsRwLockTokioMutexStd, _>(&rt, iters, None));
});

group.bench_function("RwLockTokioMutexTokio", |b| {
b.to_async(&rt)
.iter_custom(|iters| asyn::update_one_torrent_in_parallel::<Arc<TorrentsRwLockTokioMutexTokio>>(&rt, iters, None));
.iter_custom(|iters| asyn::update_one_torrent_in_parallel::<TorrentsRwLockTokioMutexTokio, _>(&rt, iters, None));
});

group.finish();
Expand All @@ -150,34 +149,32 @@ fn update_multiple_torrents_in_parallel(c: &mut Criterion) {

group.bench_function("RwLockStd", |b| {
b.to_async(&rt)
.iter_custom(|iters| sync::update_multiple_torrents_in_parallel::<Arc<TorrentsRwLockStd>>(&rt, iters, None));
.iter_custom(|iters| sync::update_multiple_torrents_in_parallel::<TorrentsRwLockStd, _>(&rt, iters, None));
});

group.bench_function("RwLockStdMutexStd", |b| {
b.to_async(&rt)
.iter_custom(|iters| sync::update_multiple_torrents_in_parallel::<Arc<TorrentsRwLockStdMutexStd>>(&rt, iters, None));
.iter_custom(|iters| sync::update_multiple_torrents_in_parallel::<TorrentsRwLockStdMutexStd, _>(&rt, iters, None));
});

group.bench_function("RwLockStdMutexTokio", |b| {
b.to_async(&rt).iter_custom(|iters| {
asyn::update_multiple_torrents_in_parallel::<Arc<TorrentsRwLockStdMutexTokio>>(&rt, iters, None)
});
b.to_async(&rt)
.iter_custom(|iters| asyn::update_multiple_torrents_in_parallel::<TorrentsRwLockStdMutexTokio, _>(&rt, iters, None));
});

group.bench_function("RwLockTokio", |b| {
b.to_async(&rt)
.iter_custom(|iters| asyn::update_multiple_torrents_in_parallel::<Arc<TorrentsRwLockTokio>>(&rt, iters, None));
.iter_custom(|iters| asyn::update_multiple_torrents_in_parallel::<TorrentsRwLockTokio, _>(&rt, iters, None));
});

group.bench_function("RwLockTokioMutexStd", |b| {
b.to_async(&rt).iter_custom(|iters| {
asyn::update_multiple_torrents_in_parallel::<Arc<TorrentsRwLockTokioMutexStd>>(&rt, iters, None)
});
b.to_async(&rt)
.iter_custom(|iters| asyn::update_multiple_torrents_in_parallel::<TorrentsRwLockTokioMutexStd, _>(&rt, iters, None));
});

group.bench_function("RwLockTokioMutexTokio", |b| {
b.to_async(&rt).iter_custom(|iters| {
asyn::update_multiple_torrents_in_parallel::<Arc<TorrentsRwLockTokioMutexTokio>>(&rt, iters, None)
asyn::update_multiple_torrents_in_parallel::<TorrentsRwLockTokioMutexTokio, _>(&rt, iters, None)
});
});

Expand Down
Loading

0 comments on commit 412adc3

Please sign in to comment.