Skip to content

Commit

Permalink
fix(scanner): Switch threadpool to tokio runtime
Browse files Browse the repository at this point in the history
This will fix compatibility with `musicbrainz_rs_nova`, which will yield
errors like these:

    thread '<unnamed>' panicked at ~/.cargo/registry/src/index.crates.io-6f17d22bba15001f/musicbrainz_rs_nova-0.8.0/src/rate_limit.rs:23:5:
    there is no reactor running, must be called from the context of a Tokio 1.x runtime
    note: run with `RUST_BACKTRACE=1` environment variable to display a backtrace
  • Loading branch information
Holzhaus committed Nov 9, 2024
1 parent 7288e3e commit 1bb3f78
Show file tree
Hide file tree
Showing 2 changed files with 42 additions and 40 deletions.
1 change: 1 addition & 0 deletions src/cli/import.rs
Original file line number Diff line number Diff line change
Expand Up @@ -175,6 +175,7 @@ pub async fn run(config: &Config, cache: Option<&Cache>, args: Args) -> crate::R

drop(importer_tx);
importer_handle.await.unwrap();
scanner.shutdown();

Ok(())
}
81 changes: 41 additions & 40 deletions src/scanner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,12 +17,12 @@ use crate::release_candidate::ReleaseCandidateCollection;
use crate::util::walk_dir;
use crate::Cache;
use crate::{Config, TaggedFile, TaggedFileCollection};
use futures::executor::ThreadPool;
use futures::task::{SpawnError, SpawnExt};
use futures::{future, FutureExt};
use futures::FutureExt;
use std::collections::HashSet;
use std::path::PathBuf;
use tokio::runtime::Runtime;
use tokio::sync::mpsc::Receiver;
use tokio::task::JoinSet;

/// An error type that contains the path that was scanned when the error occurred.
pub struct ScanError {
Expand All @@ -44,8 +44,7 @@ type ScanResult = Result<
/// Scanner struct.
pub struct Scanner {
/// Worker thread pool.
#[expect(dead_code)]
pool: ThreadPool,
pool: Runtime,
/// Channel receiver for the scanner results.
results_rx: Receiver<ScanResult>,
}
Expand All @@ -56,11 +55,11 @@ impl Scanner {
log::info!("Starting scan of {}", path.display());

let (results_tx, results_rx) = tokio::sync::mpsc::channel(20);
let pool = ThreadPool::new().unwrap();
let pool = Runtime::new().unwrap();

let cloned_results_tx = results_tx.clone();
let cloned_pool = pool.clone();
pool.spawn_ok(async move {
let pool_handle = pool.handle().clone();
let _scanner = pool.spawn(async move {
// First, search the file system to find track paths.
for (path, tracks) in find_track_paths(path) {
let cloned_config = config.clone();
Expand All @@ -69,42 +68,38 @@ impl Scanner {
// Some tracks were found, spawn individual tasks for analyzing the tracks in the
// threadpool. We keep track of the spawned task handles in a Vec, so that we
// combine the results of these tasks in a track collection.
let result = tracks
.into_iter()
.map(|track| {
cloned_pool.spawn_with_handle({
let mut handles = JoinSet::new();
for track in tracks {
let _analysis_abort_handle = handles.spawn_on(
{
let config = cloned_config.clone();
async move { analyze_tagged_file(&config, track) }
})
})
.collect::<Result<Vec<_>, SpawnError>>();
},
&pool_handle,
);
}

// When all handles are joined, make a collection out of it and search similar
// releases on MusicBrainz. The result is sent to the `results_tx` queue.
match result {
Ok(handles) => {
let cloned_cache = cache.clone();
let results_tx = cloned_results_tx.clone();
cloned_pool.spawn_ok(async move {
let musicbrainz =
MusicBrainzClient::new(&cloned_config2, cloned_cache.as_ref());
if let Err(err) = results_tx
.send(
join_analysis_tasks_to_collection_and_find_release_candidates(
&musicbrainz,
path,
handles,
)
.await,
)
.await
{
log::error!("Failed to queue results: {err}");
}
});
let cloned_cache = cache.clone();
let results_tx = cloned_results_tx.clone();
let _matching_logic = pool_handle.spawn(async move {
let musicbrainz =
MusicBrainzClient::new(&cloned_config2, cloned_cache.as_ref());
if let Err(err) = results_tx
.send(
join_analysis_tasks_to_collection_and_find_release_candidates(
&musicbrainz,
path,
handles,
)
.await,
)
.await
{
log::error!("Failed to queue results: {err}");
}
Err(err) => log::error!("Failed to spawn analyzer tasks: {err}"),
}
});
}
});

Expand All @@ -115,6 +110,11 @@ impl Scanner {
pub async fn recv(&mut self) -> Option<ScanResult> {
self.results_rx.recv().await
}

/// Receive the next track collection from the scanner.
pub fn shutdown(self) {
self.pool.shutdown_background();
}
}

/// Find track collections in the given path.
Expand Down Expand Up @@ -170,9 +170,10 @@ fn analyze_tagged_file(config: &Config, tagged_file: TaggedFile) -> TaggedFile {
async fn join_analysis_tasks_to_collection_and_find_release_candidates(
musicbrainz: &MusicBrainzClient<'_>,
path: PathBuf,
handles: Vec<future::RemoteHandle<TaggedFile>>,
handles: JoinSet<TaggedFile>,
) -> ScanResult {
future::join_all(handles)
handles
.join_all()
.then(|tracks| async {
let track_collection = TaggedFileCollection::new(tracks);
musicbrainz
Expand Down

0 comments on commit 1bb3f78

Please sign in to comment.