Skip to content

Commit

Permalink
tests: improve cleanup and generic testing
Browse files Browse the repository at this point in the history
  • Loading branch information
geofmureithi committed Jul 19, 2024
1 parent 1f37478 commit 49e707a
Show file tree
Hide file tree
Showing 4 changed files with 26 additions and 37 deletions.
10 changes: 5 additions & 5 deletions packages/apalis-redis/src/storage.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1071,7 +1071,7 @@ mod tests {

let _job = consume_one(&mut storage, &worker_id).await;

cleanup(storage, &worker_id).await;
cleanup(&mut storage, &worker_id).await;
}

#[tokio::test]
Expand All @@ -1095,7 +1095,7 @@ mod tests {
.expect("failed to acknowledge the job");

let _job = get_job(&mut storage, &job_id).await;
cleanup(storage, &worker_id).await;
cleanup(&mut storage, &worker_id).await;
}

#[tokio::test]
Expand All @@ -1116,7 +1116,7 @@ mod tests {

let _job = get_job(&mut storage, &job_id).await;

cleanup(storage, &worker_id).await;
cleanup(&mut storage, &worker_id).await;
}

#[tokio::test]
Expand All @@ -1132,7 +1132,7 @@ mod tests {
.reenqueue_orphaned(5, 300)
.await
.expect("failed to reenqueue_orphaned");
cleanup(storage, &worker_id).await;
cleanup(&mut storage, &worker_id).await;
}

#[tokio::test]
Expand All @@ -1149,6 +1149,6 @@ mod tests {
.await
.expect("failed to reenqueue_orphaned");

cleanup(storage, &worker_id).await;
cleanup(&mut storage, &worker_id).await;
}
}
4 changes: 2 additions & 2 deletions packages/apalis-sql/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ impl Default for Config {
Self {
keep_alive: Duration::from_secs(30),
buffer_size: 10,
poll_interval: Duration::from_millis(50),
poll_interval: Duration::from_millis(100),
namespace: String::from("apalis::sql"),
}
}
Expand All @@ -63,7 +63,7 @@ impl Config {

/// Interval between database poll queries
///
/// Defaults to 30ms
/// Defaults to 100ms
pub fn set_poll_interval(mut self, interval: Duration) -> Self {
self.poll_interval = interval;
self
Expand Down
22 changes: 7 additions & 15 deletions packages/apalis-sql/src/mysql.rs
Original file line number Diff line number Diff line change
Expand Up @@ -538,8 +538,8 @@ mod tests {
MysqlStorage::setup(&pool)
.await
.expect("failed to migrate DB");
let storage = MysqlStorage::new(pool);

let mut storage = MysqlStorage::new(pool);
cleanup(&mut storage, &WorkerId::new("test-worker")).await;
storage
}

Expand All @@ -549,9 +549,9 @@ mod tests {
/// - worker identified by `worker_id`
///
/// You should execute this function in the end of a test
async fn cleanup(storage: MysqlStorage<Email>, worker_id: &WorkerId) {
sqlx::query("DELETE FROM jobs WHERE lock_by = ? OR status = 'Pending'")
.bind(worker_id.to_string())
async fn cleanup<T>(storage: &mut MysqlStorage<T>, worker_id: &WorkerId) {
sqlx::query("DELETE FROM jobs WHERE job_type = ?")
.bind(storage.config.namespace())
.execute(&storage.pool)
.await
.expect("failed to delete jobs");
Expand Down Expand Up @@ -612,6 +612,8 @@ mod tests {
}

async fn get_job(storage: &mut MysqlStorage<Email>, job_id: &TaskId) -> Request<Email> {
// add a slight delay to allow background actions like ack to complete
apalis_core::sleep(Duration::from_secs(1)).await;
storage
.fetch_by_id(job_id)
.await
Expand All @@ -632,8 +634,6 @@ mod tests {
assert_eq!(*ctx.status(), State::Running);
assert_eq!(*ctx.lock_by(), Some(worker_id.clone()));
assert!(ctx.lock_at().is_some());

cleanup(storage, &worker_id).await;
}

#[tokio::test]
Expand Down Expand Up @@ -663,8 +663,6 @@ mod tests {
// TODO: Fix assertions
assert_eq!(*ctx.status(), State::Done);
assert!(ctx.done_at().is_some());

cleanup(storage, &worker_id).await;
}

#[tokio::test]
Expand All @@ -690,8 +688,6 @@ mod tests {
// TODO: Fix assertions
assert_eq!(*ctx.status(), State::Killed);
assert!(ctx.done_at().is_some());

cleanup(storage, &worker_id).await;
}

#[tokio::test]
Expand Down Expand Up @@ -730,8 +726,6 @@ mod tests {
assert!(context.lock_at().is_none());
assert!(context.done_at().is_none());
assert_eq!(*context.last_error(), Some("Job was abandoned".to_string()));

cleanup(storage, &worker_id).await;
}

#[tokio::test]
Expand Down Expand Up @@ -768,7 +762,5 @@ mod tests {
// TODO: Fix assertions
assert_eq!(*context.status(), State::Running);
assert_eq!(*context.lock_by(), Some(worker_id.clone()));

cleanup(storage, &worker_id).await;
}
}
27 changes: 12 additions & 15 deletions packages/apalis-sql/src/postgres.rs
Original file line number Diff line number Diff line change
Expand Up @@ -627,24 +627,25 @@ mod tests {
// (different runtimes are created for each test),
// we don't share the storage and tests must be run sequentially.
PostgresStorage::setup(&pool).await.unwrap();
let storage = PostgresStorage::new(pool);
let mut storage = PostgresStorage::new(pool);
cleanup(&mut storage, &WorkerId::new("test-worker")).await;
storage
}

/// rollback DB changes made by tests.
/// Delete the following rows:
/// - jobs whose state is `Pending` or locked by `worker_id`
/// - jobs of the current type
/// - worker identified by `worker_id`
///
/// You should execute this function in the end of a test
async fn cleanup(storage: PostgresStorage<Email>, worker_id: &WorkerId) {
async fn cleanup<T>(storage: &mut PostgresStorage<T>, worker_id: &WorkerId) {
let mut tx = storage
.pool
.acquire()
.await
.expect("failed to get connection");
sqlx::query("Delete from apalis.jobs where lock_by = $1 or status = 'Pending'")
.bind(worker_id.to_string())
sqlx::query("Delete from apalis.jobs where job_type = $1")
.bind(storage.config.namespace())
.execute(&mut *tx)
.await
.expect("failed to delete jobs");
Expand Down Expand Up @@ -693,6 +694,8 @@ mod tests {
}

async fn get_job(storage: &mut PostgresStorage<Email>, job_id: &TaskId) -> Request<Email> {
// add a slight delay to allow background actions like ack to complete
apalis_core::sleep(Duration::from_secs(1)).await;
storage
.fetch_by_id(job_id)
.await
Expand All @@ -709,11 +712,13 @@ mod tests {

let job = consume_one(&mut storage, &worker_id).await;
let ctx = job.get::<SqlContext>().unwrap();
let job_id = ctx.id();
// Refresh our job
let job = get_job(&mut storage, job_id).await;
let ctx = job.get::<SqlContext>().unwrap();
assert_eq!(*ctx.status(), State::Running);
assert_eq!(*ctx.lock_by(), Some(worker_id.clone()));
assert!(ctx.lock_at().is_some());

cleanup(storage, &worker_id).await;
}

#[tokio::test]
Expand Down Expand Up @@ -741,8 +746,6 @@ mod tests {
let ctx = job.get::<SqlContext>().unwrap();
assert_eq!(*ctx.status(), State::Done);
assert!(ctx.done_at().is_some());

cleanup(storage, &worker_id).await;
}

#[tokio::test]
Expand All @@ -766,8 +769,6 @@ mod tests {
let ctx = job.get::<SqlContext>().unwrap();
assert_eq!(*ctx.status(), State::Killed);
assert!(ctx.done_at().is_some());

cleanup(storage, &worker_id).await;
}

#[tokio::test]
Expand All @@ -793,8 +794,6 @@ mod tests {
assert!(ctx.lock_by().is_none());
assert!(ctx.lock_at().is_none());
assert_eq!(*ctx.last_error(), Some("Job was abandoned".to_string()));

cleanup(storage, &worker_id).await;
}

#[tokio::test]
Expand Down Expand Up @@ -822,7 +821,5 @@ mod tests {

assert_eq!(*ctx.status(), State::Running);
assert_eq!(*ctx.lock_by(), Some(worker_id.clone()));

cleanup(storage, &worker_id).await;
}
}

0 comments on commit 49e707a

Please sign in to comment.