Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: improve on benches #379

Merged
merged 12 commits into from
Aug 5, 2024
1 change: 1 addition & 0 deletions .github/workflows/bench.yaml
Original file line number Diff line number Diff line change
@@ -3,6 +3,7 @@ on:
paths:
- 'packages/**'
- '.github/workflows/bench.yaml'
- 'benches/**'
name: Benchmark
jobs:
storageBenchmark:
14 changes: 12 additions & 2 deletions Cargo.toml
Original file line number Diff line number Diff line change
@@ -80,7 +80,13 @@ pprof = { version = "0.13", features = ["flamegraph"] }
paste = "1.0.14"
serde = "1"
tokio = { version = "1", features = ["macros", "rt-multi-thread"] }
apalis = { path = ".", features = ["limit"]}
apalis-redis = { path = "./packages/apalis-redis" }
apalis-sql = { path = "./packages/apalis-sql", features = [
"postgres",
"mysql",
"sqlite",
] }
redis = { version = "0.25.3", default-features = false, features = [
"tokio-comp",
"script",
@@ -91,7 +97,7 @@ redis = { version = "0.25.3", default-features = false, features = [
[dev-dependencies.sqlx]
version = "0.8.0"
default-features = false
features = ["chrono", "mysql", "sqlite", "postgres"]
features = ["chrono", "mysql", "sqlite", "postgres", "runtime-tokio"]


[[bench]]
@@ -120,7 +126,11 @@ members = [
"examples/redis-with-msg-pack",
"examples/redis-deadpool",
"examples/redis-mq-example",
"examples/cron", "examples/catch-panic", "examples/graceful-shutdown", "examples/unmonitored-worker", "examples/fn-args",
"examples/cron",
"examples/catch-panic",
"examples/graceful-shutdown",
"examples/unmonitored-worker",
"examples/fn-args",
]


125 changes: 67 additions & 58 deletions benches/storages.rs
Original file line number Diff line number Diff line change
@@ -1,19 +1,18 @@
use apalis::prelude::*;

use apalis::{
mysql::{MySqlPool, MysqlStorage},
postgres::{PgPool, PostgresStorage},
sqlite::{SqlitePool, SqliteStorage},
};
use apalis_redis::RedisStorage;
use apalis_sql::mysql::MysqlStorage;
use apalis_sql::postgres::PostgresStorage;
use apalis_sql::sqlite::SqliteStorage;
use apalis_sql::Config;
use criterion::*;
use futures::Future;
use paste::paste;
use serde::{Deserialize, Serialize};
use std::sync::atomic::AtomicUsize;
use std::sync::atomic::Ordering;
use std::sync::Arc;
use std::time::{Duration, Instant};
use sqlx::MySqlPool;
use sqlx::PgPool;
use sqlx::SqlitePool;
use std::time::Duration;
use std::time::Instant;
use tokio::runtime::Runtime;
macro_rules! define_bench {
($name:expr, $setup:expr ) => {
@@ -23,50 +22,46 @@ macro_rules! define_bench {

let mut group = c.benchmark_group($name);
group.sample_size(10);
group.bench_with_input(BenchmarkId::new("consume", size), &size, |b, &s| {
group.bench_with_input(BenchmarkId::new("consume", size), &size, |b, &size| {
b.to_async(Runtime::new().unwrap())
.iter_custom(|iters| async move {
let mut interval = tokio::time::interval(Duration::from_millis(150));
let storage = { $setup };
let mut s1 = storage.clone();
let counter = Counter::default();
let c = counter.clone();
tokio::spawn(async move {
Monitor::<TokioExecutor>::new()
.register({
let worker =
WorkerBuilder::new(format!("{}-bench", $name))
.data(c)
.backend(storage)
.build_fn(handle_test_job);
worker
})
.run()
.await
.unwrap();
});
.iter(|| async move {

let start = Instant::now();
for _ in 0..iters {
for _i in 0..s {
let _ = s1.push(TestJob).await;
let mut storage = { $setup };
let mut s = storage.clone();
storage.cleanup().await;
tokio::spawn(async move {
for i in 0..=size {
let _ = s.push(TestJob(i)).await;
}
while (counter.0.load(Ordering::Relaxed) != s) || (s1.len().await.unwrap_or(-1) != 0) {
interval.tick().await;
});
async fn handle_test_job(
req: TestJob,
size: Data<usize>,
wrk: Context<TokioExecutor>,
) -> Result<(), Error> {
if req.0 == *size {
wrk.stop();
}
counter.0.store(0, Ordering::Relaxed);
Ok(())
}
let elapsed = start.elapsed();
s1.cleanup().await;
elapsed
let start = Instant::now();
WorkerBuilder::new(format!("{}-bench", $name))
.data(size as usize)
.backend(storage.clone())
.build_fn(handle_test_job)
.with_executor(TokioExecutor)
.run()
.await;
storage.cleanup().await;
start.elapsed()
})
});
group.bench_with_input(BenchmarkId::new("push", size), &size, |b, &s| {
b.to_async(Runtime::new().unwrap()).iter(|| async move {
let mut storage = { $setup };
let start = Instant::now();
for _i in 0..s {
let _ = black_box(storage.push(TestJob).await);
for i in 0..s {
let _ = black_box(storage.push(TestJob(i)).await);
}
start.elapsed()
});
@@ -76,15 +71,7 @@ macro_rules! define_bench {
}

#[derive(Serialize, Deserialize, Debug)]
struct TestJob;
#[derive(Debug, Default, Clone)]
struct Counter(Arc<AtomicUsize>);

async fn handle_test_job(_req: TestJob, counter: Data<Counter>) -> Result<(), Error> {
counter.0.fetch_add(1, std::sync::atomic::Ordering::Relaxed);
Ok(())
}

struct TestJob(usize);
trait CleanUp {
fn cleanup(&mut self) -> impl Future<Output = ()> + Send;
}
@@ -110,7 +97,9 @@ impl CleanUp for PostgresStorage<TestJob> {
impl CleanUp for MysqlStorage<TestJob> {
async fn cleanup(&mut self) {
let pool = self.pool();
let query = "DELETE FROM jobs; DELETE from workers;";
let query = "DELETE FROM jobs;";
sqlx::query(query).execute(pool).await.unwrap();
let query = "DELETE from workers;";
sqlx::query(query).execute(pool).await.unwrap();
}
}
@@ -128,26 +117,46 @@ impl CleanUp for RedisStorage<TestJob> {
define_bench!("sqlite_in_memory", {
let pool = SqlitePool::connect("sqlite::memory:").await.unwrap();
let _ = SqliteStorage::setup(&pool).await;
SqliteStorage::new(pool)
SqliteStorage::new_with_config(
pool,
Config::default()
.set_buffer_size(100)
.set_poll_interval(Duration::from_millis(50)),
)
});

define_bench!("redis", {
let conn = apalis_redis::connect(env!("REDIS_URL")).await.unwrap();
let redis = RedisStorage::new(conn);
let redis = RedisStorage::new_with_config(
conn,
apalis_redis::Config::default()
.set_namespace("redis-bench")
.set_buffer_size(100),
);
redis
});

define_bench!("postgres", {
let pool = PgPool::connect(env!("POSTGRES_URL")).await.unwrap();
let _ = PostgresStorage::setup(&pool).await.unwrap();
PostgresStorage::new(pool)
PostgresStorage::new_with_config(
pool,
Config::new("postgres:bench")
.set_buffer_size(100)
.set_poll_interval(Duration::from_millis(50)),
)
});

define_bench!("mysql", {
let pool = MySqlPool::connect(env!("MYSQL_URL")).await.unwrap();
let _ = MysqlStorage::setup(&pool).await.unwrap();
MysqlStorage::new(pool)
MysqlStorage::new_with_config(
pool,
Config::new("mysql:bench")
.set_buffer_size(100)
.set_poll_interval(Duration::from_millis(50)),
)
});

criterion_group!(benches, sqlite_in_memory);
criterion_group!(benches, sqlite_in_memory, redis, postgres, mysql);
criterion_main!(benches);
9 changes: 1 addition & 8 deletions packages/apalis-sql/src/mysql.rs
Original file line number Diff line number Diff line change
@@ -95,10 +95,9 @@ impl MysqlStorage<(), JsonCodec<Value>> {
}
}

impl<T, C> MysqlStorage<T, C>
impl<T> MysqlStorage<T>
where
T: Serialize + DeserializeOwned,
C: Codec,
{
/// Create a new instance from a pool
pub fn new(pool: MySqlPool) -> Self {
@@ -122,12 +121,6 @@ where
&self.pool
}

/// Expose the codec
#[doc(hidden)]
pub fn codec(&self) -> &PhantomData<C> {
&self.codec
}

/// Get the config used by the storage
pub fn get_config(&self) -> &Config {
&self.config
2 changes: 2 additions & 0 deletions src/layers/mod.rs
Original file line number Diff line number Diff line change
@@ -18,6 +18,8 @@ pub mod tracing;
#[cfg(feature = "limit")]
#[cfg_attr(docsrs, doc(cfg(feature = "limit")))]
pub mod limit {
pub use tower::limit::ConcurrencyLimitLayer;
pub use tower::limit::GlobalConcurrencyLimitLayer;
pub use tower::limit::RateLimitLayer;
}

Loading