Multicore improvements #69

Merged · 24 commits · Sep 2, 2021

Commits (24)
d90bb21
feat: allow control of the maximum number of cpus using BELLMAN_NUM_…
dignifiedquire Mar 25, 2019
2b7a58b
feat(groth16): control parallel proving threads
dignifiedquire Jun 8, 2020
4970b8f
fix: only spawn threads into a pool, to avoid unlimited thread spawns
dignifiedquire Sep 29, 2020
b4276a3
feat: limited thread spawning and unroll recursion in multiexp
dignifiedquire Oct 2, 2020
4d85780
chore: log error if waiter is used within a thread pool
vmx Feb 11, 2021
7d7c453
feat: fix bug in parallelism inherent in the Worker/Waiter model
vmx Feb 12, 2021
103113a
style: remove verbose logging
cryptonemo Mar 8, 2021
360788d
Bump crossbeam-channel dependence
str4d Jun 1, 2021
9f05fec
multicore: Simplify NUM_CPUS definition
str4d Jun 1, 2021
d7ec539
Update changelog with multicore changes
str4d Jun 1, 2021
510b908
CI: Build for wasm32-wasi target to ensure no-multicore works
str4d Jun 1, 2021
6e9f575
Remove broken intra-doc link
str4d Jun 1, 2021
22caab9
multicore: Unconditionally panic if Waiter::wait is called from threa…
str4d Jun 1, 2021
a4b4e14
multicore: Code simplification
str4d Jun 1, 2021
7d81964
multicore: Documentation tweaks
str4d Jun 1, 2021
6916c3b
Update dependencies.
daira Aug 15, 2021
1815042
Use global threadpool unless we exceed `WORKER_SPAWN_MAX_COUNT`. This…
daira Aug 15, 2021
1e39743
Rename `multicore::log_num_cpus` to `log_num_threads`.
daira Aug 15, 2021
313245b
Relax dependencies for `log` and `num_cpus`
daira Aug 25, 2021
e3a4ea6
Simplify by removing the overflow thread pool
daira Aug 26, 2021
a94da42
Bugfix: `log_num_cpus` should be renamed also for the dummy (non-mult…
daira Aug 26, 2021
6af1fcf
Merge pull request #71 from daira/daira-multicore
str4d Aug 26, 2021
5e68a2e
Fix code comment after switch to global threadpool
str4d Aug 27, 2021
2c96556
Remove println! statements from test
str4d Sep 2, 2021
23 changes: 23 additions & 0 deletions .github/workflows/ci.yml
@@ -31,6 +31,29 @@ jobs:
command: test
args: --verbose --release

build:
name: Build target ${{ matrix.target }}
runs-on: ubuntu-latest
strategy:
matrix:
target:
- wasm32-wasi

steps:
- uses: actions/checkout@v2
- uses: actions-rs/toolchain@v1
with:
toolchain: 1.51.0
override: true
- name: Add target
run: rustup target add ${{ matrix.target }}
- name: cargo fetch
uses: actions-rs/cargo@v1
with:
command: fetch
- name: Build for target
run: cargo build --verbose --no-default-features --target ${{ matrix.target }}

bitrot:
name: Bitrot check
runs-on: ubuntu-latest
19 changes: 19 additions & 0 deletions CHANGELOG.md
@@ -6,6 +6,25 @@ and this project adheres to Rust's notion of
[Semantic Versioning](https://semver.org/spec/v2.0.0.html).

## [Unreleased]
### Added
- `bellman` now uses `rayon` for multithreading when the (default) `multicore`
feature flag is enabled. This means that, when this flag is enabled, the
`RAYON_NUM_THREADS` environment variable controls the number of threads that
`bellman` will use. The default, which has not changed, is to use the same
number of threads as logical CPUs.
- `bellman::multicore::Waiter`

### Changed
- `bellman::multicore` has migrated from `crossbeam` to `rayon`:
- `bellman::multicore::Worker::compute` now returns
`bellman::multicore::Waiter`.
- `bellman::multiexp::multiexp` now returns
`bellman::multicore::Waiter<Result<G, SynthesisError>>` instead of
`Box<dyn Future<Item = G, Error = SynthesisError>>`.
- `bellman::multicore::log_num_cpus` is renamed to `log_num_threads`.

### Removed
- `bellman::multicore::WorkerFuture` (replaced by `Waiter`).

## [0.10.0] - 2021-06-04
### Added
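For downstream callers, the upshot of these changes is that results are now retrieved with a blocking `Waiter::wait()` call rather than by polling a `futures` 0.1 future. A minimal usage sketch (hypothetical caller code, assuming the default `multicore` feature; the closure and values are illustrative only):

```rust
use bellman::multicore::Worker;

fn example() -> u64 {
    // Work is scheduled on rayon's global thread pool, whose size is
    // controlled by RAYON_NUM_THREADS (default: the number of logical CPUs).
    let worker = Worker::new();

    // `compute` now returns a `Waiter<R>` instead of a future.
    let waiter = worker.compute(|| 2u64 + 2);

    // `wait()` blocks the calling thread. It must not be called from within
    // the rayon pool itself; doing so panics to avoid deadlocks.
    waiter.wait()
}
```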
13 changes: 8 additions & 5 deletions Cargo.toml
@@ -16,16 +16,19 @@ edition = "2018"
bitvec = "0.22"
blake2s_simd = "0.5"
ff = "0.10"
futures = "0.1"
futures-cpupool = { version = "0.1", optional = true }
group = "0.10"
num_cpus = { version = "1", optional = true }
crossbeam = { version = "0.7", optional = true }
pairing = { version = "0.20", optional = true }
rand_core = "0.6"
byteorder = "1"
subtle = "2.2.1"

# Multicore dependencies
crossbeam-channel = { version = "0.5.1", optional = true }
lazy_static = { version = "1.4.0", optional = true }
log = { version = "0.4", optional = true }
num_cpus = { version = "1", optional = true }
rayon = { version = "1.5.1", optional = true }

[dev-dependencies]
bls12_381 = "0.5"
criterion = "0.3"
@@ -36,7 +39,7 @@ sha2 = "0.9"

[features]
groth16 = ["pairing"]
multicore = ["futures-cpupool", "crossbeam", "num_cpus"]
multicore = ["crossbeam-channel", "lazy_static", "log", "num_cpus", "rayon"]
default = ["groth16", "multicore"]

[[test]]
1 change: 0 additions & 1 deletion benches/slow.rs
@@ -5,7 +5,6 @@ use bellman::{
use bls12_381::{Bls12, Scalar};
use criterion::{criterion_group, criterion_main, BenchmarkId, Criterion};
use ff::{Field, PrimeFieldBits};
use futures::Future;
use group::{Curve, Group};
use pairing::Engine;
use rand_core::SeedableRng;
2 changes: 1 addition & 1 deletion src/domain.rs
@@ -259,7 +259,7 @@ impl<S: PrimeField> Group<S> for Scalar<S> {
}

fn best_fft<S: PrimeField, T: Group<S>>(a: &mut [T], worker: &Worker, omega: &S, log_n: u32) {
let log_cpus = worker.log_num_cpus();
let log_cpus = worker.log_num_threads();

if log_n <= log_cpus {
serial_fft(a, omega, log_n);
2 changes: 0 additions & 2 deletions src/groth16/prover.rs
@@ -2,8 +2,6 @@ use rand_core::RngCore;
use std::ops::{AddAssign, MulAssign};
use std::sync::Arc;

use futures::Future;

use ff::{Field, PrimeField, PrimeFieldBits};
use group::{prime::PrimeCurveAffine, Curve};
use pairing::Engine;
175 changes: 113 additions & 62 deletions src/multicore.rs
@@ -1,81 +1,119 @@
//! An interface for dealing with the kinds of parallel computations involved in
//! `bellman`. It's currently just a thin wrapper around [`CpuPool`] and
//! [`crossbeam`] but may be extended in the future to allow for various
//! parallelism strategies.
//!
//! [`CpuPool`]: futures_cpupool::CpuPool
//! `bellman`. It's currently just a thin wrapper around [`rayon`] but may be
//! extended in the future to allow for various parallelism strategies.

#[cfg(feature = "multicore")]
mod implementation {
use crossbeam::{self, thread::Scope};
use futures::{Future, IntoFuture, Poll};
use futures_cpupool::{CpuFuture, CpuPool};
use num_cpus;
use std::sync::atomic::{AtomicUsize, Ordering};

#[derive(Clone)]
pub struct Worker {
cpus: usize,
pool: CpuPool,
use crossbeam_channel::{bounded, Receiver};
use lazy_static::lazy_static;
use log::{error, trace};
use rayon::current_num_threads;

static WORKER_SPAWN_COUNTER: AtomicUsize = AtomicUsize::new(0);

lazy_static! {
// See Worker::compute below for a description of this.
static ref WORKER_SPAWN_MAX_COUNT: usize = current_num_threads() * 4;
}

impl Worker {
// We don't expose this outside the library so that
// all `Worker` instances have the same number of
// CPUs configured.
pub(crate) fn new_with_cpus(cpus: usize) -> Worker {
Worker {
cpus,
pool: CpuPool::new(cpus),
}
}
#[derive(Clone, Default)]
pub struct Worker {}

impl Worker {
pub fn new() -> Worker {
Self::new_with_cpus(num_cpus::get())
Worker {}
}

pub fn log_num_cpus(&self) -> u32 {
log2_floor(self.cpus)
pub fn log_num_threads(&self) -> u32 {
log2_floor(current_num_threads())
}

pub fn compute<F, R>(&self, f: F) -> WorkerFuture<R::Item, R::Error>
pub fn compute<F, R>(&self, f: F) -> Waiter<R>
where
F: FnOnce() -> R + Send + 'static,
R: IntoFuture + 'static,
R::Future: Send + 'static,
R::Item: Send + 'static,
R::Error: Send + 'static,
R: Send + 'static,
{
WorkerFuture {
future: self.pool.spawn_fn(f),
let (sender, receiver) = bounded(1);

// We keep track here of how many times spawn has been called.
// It can be called without limit, each time, putting a
// request for a new thread to execute a method on the
// ThreadPool. However, if we allow it to be called without
// limits, we run the risk of memory exhaustion due to limited
// stack space consumed by all of the pending closures to be
// executed.
let previous_count = WORKER_SPAWN_COUNTER.fetch_add(1, Ordering::SeqCst);

// If the number of spawns requested has exceeded the number
// of cores available for processing by some factor (the
// default being 4), instead of requesting that we spawn a new
// thread, we instead execute the closure in the context of a
// scope call (which blocks the current thread) to help clear
// the growing work queue and minimize the chances of memory
// exhaustion.
if previous_count > *WORKER_SPAWN_MAX_COUNT {
let thread_index = rayon::current_thread_index().unwrap_or(0);
rayon::scope(move |_| {
trace!("[{}] switching to scope to help clear backlog [threads: current {}, requested {}]",
thread_index,
current_num_threads(),
WORKER_SPAWN_COUNTER.load(Ordering::SeqCst));
let res = f();
sender.send(res).unwrap();
WORKER_SPAWN_COUNTER.fetch_sub(1, Ordering::SeqCst);
});
} else {
rayon::spawn(move || {
let res = f();
sender.send(res).unwrap();
WORKER_SPAWN_COUNTER.fetch_sub(1, Ordering::SeqCst);
});
}

Waiter { receiver }
}

pub fn scope<'a, F, R>(&self, elements: usize, f: F) -> R
where
F: FnOnce(&Scope<'a>, usize) -> R,
F: FnOnce(&rayon::Scope<'a>, usize) -> R + Send,
R: Send,
{
let chunk_size = if elements < self.cpus {
let num_threads = current_num_threads();
let chunk_size = if elements < num_threads {
1
} else {
elements / self.cpus
elements / num_threads
};

// TODO: Handle case where threads fail
crossbeam::scope(|scope| f(scope, chunk_size))
.expect("Threads aren't allowed to fail yet")
rayon::scope(|scope| f(scope, chunk_size))
}
}

pub struct WorkerFuture<T, E> {
future: CpuFuture<T, E>,
pub struct Waiter<T> {
receiver: Receiver<T>,
}

impl<T: Send + 'static, E: Send + 'static> Future for WorkerFuture<T, E> {
type Item = T;
type Error = E;
impl<T> Waiter<T> {
/// Wait for the result.
pub fn wait(&self) -> T {
// This will be Some if this thread is in the global thread pool.
if rayon::current_thread_index().is_some() {
let msg = "wait() cannot be called from within a thread pool since that would lead to deadlocks";
// panic! doesn't necessarily kill the process, so we log as well.
error!("{}", msg);
panic!("{}", msg);
}
self.receiver.recv().unwrap()
}

/// One-off sending.
pub fn done(val: T) -> Self {
let (sender, receiver) = bounded(1);
sender.send(val).unwrap();

fn poll(&mut self) -> Poll<Self::Item, Self::Error> {
self.future.poll()
Waiter { receiver }
}
}

@@ -106,8 +144,6 @@ mod implementation {

#[cfg(not(feature = "multicore"))]
mod implementation {
use futures::{future, Future, IntoFuture, Poll};

#[derive(Clone)]
pub struct Worker;

@@ -116,19 +152,16 @@ mod implementation {
Worker
}

pub fn log_num_cpus(&self) -> u32 {
pub fn log_num_threads(&self) -> u32 {
0
}

pub fn compute<F, R>(&self, f: F) -> R::Future
pub fn compute<F, R>(&self, f: F) -> Waiter<R>
where
F: FnOnce() -> R + Send + 'static,
R: IntoFuture + 'static,
R::Future: Send + 'static,
R::Item: Send + 'static,
R::Error: Send + 'static,
R: Send + 'static,
{
f().into_future()
Waiter::done(f())
}

pub fn scope<F, R>(&self, elements: usize, f: F) -> R
@@ -139,16 +172,19 @@ mod implementation {
}
}

pub struct WorkerFuture<T, E> {
future: future::FutureResult<T, E>,
pub struct Waiter<T> {
val: Option<T>,
}

impl<T: Send + 'static, E: Send + 'static> Future for WorkerFuture<T, E> {
type Item = T;
type Error = E;
impl<T> Waiter<T> {
/// Wait for the result.
[Review comment] @daira (Contributor, Jun 1, 2021): As far as I can see, this is inaccurate: `std::Option::take` will never wait.
[Author reply]: It won't, but this is emulating the threaded API. Both instances are effectively blocking, one just blocks for longer.
pub fn wait(&mut self) -> T {
self.val.take().expect("unmet data dependency")
}

fn poll(&mut self) -> Poll<Self::Item, Self::Error> {
self.future.poll()
/// One-off sending.
pub fn done(val: T) -> Self {
Waiter { val: Some(val) }
}
}

@@ -159,6 +195,21 @@ mod implementation {
f(self);
}
}

/// A fake rayon ParallelIterator that is just a serial iterator.
pub(crate) trait FakeParallelIterator {
type Iter: Iterator<Item = Self::Item>;
type Item: Send;
fn into_par_iter(self) -> Self::Iter;
}

impl FakeParallelIterator for core::ops::Range<u32> {
type Iter = Self;
type Item = u32;
fn into_par_iter(self) -> Self::Iter {
self
}
}
}

pub use self::implementation::*;
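The `FakeParallelIterator` shim above lets the crate's internal call sites write `(0..n).into_par_iter()` regardless of whether the `multicore` feature is enabled; without the feature it simply yields the serial `Range<u32>`. A rough sketch of such a call site (illustrative only — the trait is `pub(crate)`, so this pattern applies only to code inside `bellman` itself):

```rust
// Inside bellman, built without `multicore`, this import resolves to the serial
// shim above; with `multicore` enabled, equivalent call sites would use rayon's
// `IntoParallelIterator` instead.
use crate::multicore::FakeParallelIterator;

fn sum_of_squares(n: u32) -> u64 {
    (0..n)
        .into_par_iter() // a plain `Range<u32>` here; a rayon parallel iterator otherwise
        .map(|i| u64::from(i) * u64::from(i))
        .sum()
}
```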