Skip to content

Commit

Permalink
isolate underway migrations
Browse files Browse the repository at this point in the history
This is something of a hack to work around the fact that SQLx migrations
do not currently support specifying a schema under which the migrations
table will live.

Here we provide a search path throughout our migrations and in the
transaction that will run the migrations to ensure that migrations are
applied to `underway._sqlx_migrations`. Note that this assumes a
`public._sqlx_migrations` exists.

In the future we should be able to use `sqlx.toml` to address this more
robustly. That's expected as part of the `0.9.0` release of SQLx. Please
see: launchbadge/sqlx#3383

Closes #11
  • Loading branch information
maxcountryman committed Oct 31, 2024
1 parent 707f284 commit 95badb9
Show file tree
Hide file tree
Showing 14 changed files with 173 additions and 17 deletions.

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

6 changes: 3 additions & 3 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,7 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
let pool = PgPool::connect(database_url).await?;

// Run migrations.
underway::MIGRATOR.run(&pool).await?;
underway::run_migrations(&pool).await?;

// Build the job.
let job = Job::builder()
Expand Down Expand Up @@ -148,7 +148,7 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
let pool = PgPool::connect(database_url).await?;

// Run migrations.
underway::MIGRATOR.run(&pool).await?;
underway::run_migrations(&pool).await?;

// Build the job.
let job = Job::builder()
Expand Down Expand Up @@ -207,7 +207,7 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
let pool = PgPool::connect(database_url).await?;

// Run migrations.
underway::MIGRATOR.run(&pool).await?;
underway::run_migrations(&pool).await?;

// Build the job.
let job = Job::builder()
Expand Down
2 changes: 1 addition & 1 deletion examples/basic/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
let pool = PgPool::connect(database_url).await?;

// Run migrations.
underway::MIGRATOR.run(&pool).await?;
underway::run_migrations(&pool).await?;

// Build the job.
let job = Job::builder()
Expand Down
2 changes: 1 addition & 1 deletion examples/graceful_shutdown/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
.await?;

// Run migrations.
underway::MIGRATOR.run(&pool).await?;
underway::run_migrations(&pool).await?;

// Build the job.
let job = Job::builder()
Expand Down
2 changes: 1 addition & 1 deletion examples/multitask/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -155,7 +155,7 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
let pool = PgPool::connect(database_url).await?;

// Run migrations.
underway::MIGRATOR.run(&pool).await?;
underway::run_migrations(&pool).await?;

// Create the task queue.
let queue = Queue::builder()
Expand Down
2 changes: 1 addition & 1 deletion examples/rag/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -152,7 +152,7 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
let database_url = &env::var("DATABASE_URL").expect("DATABASE_URL should be set");
let pool = PgPool::connect(database_url).await?;

underway::MIGRATOR.run(&pool).await?;
underway::run_migrations(&pool).await?;

let openai_client = Client::new();

Expand Down
2 changes: 1 addition & 1 deletion examples/scheduled/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
let pool = PgPool::connect(&database_url).await?;

// Run migrations.
underway::MIGRATOR.run(&pool).await?;
underway::run_migrations(&pool).await?;

// Build the job.
let job = Job::builder()
Expand Down
2 changes: 1 addition & 1 deletion examples/step/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
let pool = PgPool::connect(database_url).await?;

// Run migrations.
underway::MIGRATOR.run(&pool).await?;
underway::run_migrations(&pool).await?;

// Create our job.
let job = Job::builder()
Expand Down
2 changes: 1 addition & 1 deletion examples/tracing/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
let pool = PgPool::connect(database_url).await?;

// Run migrations.
underway::MIGRATOR.run(&pool).await?;
underway::run_migrations(&pool).await?;

// Build the job.
let job = Job::builder()
Expand Down
4 changes: 4 additions & 0 deletions migrations/20240921151751_0.sql
Original file line number Diff line number Diff line change
@@ -1,5 +1,9 @@
create schema if not exists underway;

-- Manage Underway migrations within the Underway schema.
create table if not exists underway._sqlx_migrations
(like public._sqlx_migrations including all);

create table underway.task_queue (
name text not null,
dlq_name text,
Expand Down
102 changes: 95 additions & 7 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@
//! let pool = PgPool::connect(database_url).await?;
//!
//! // Run migrations.
//! underway::MIGRATOR.run(&pool).await?;
//! underway::run_migrations(&pool).await?;
//!
//! // Build the job.
//! let job = Job::builder()
Expand Down Expand Up @@ -135,7 +135,7 @@
//! let pool = PgPool::connect(database_url).await?;
//!
//! // Run migrations.
//! underway::MIGRATOR.run(&pool).await?;
//! underway::run_migrations(&pool).await?;
//!
//! // Build the job.
//! let job = Job::builder()
Expand Down Expand Up @@ -194,7 +194,7 @@
//! let pool = PgPool::connect(database_url).await?;
//!
//! // Run migrations.
//! underway::MIGRATOR.run(&pool).await?;
//! underway::run_migrations(&pool).await?;
//!
//! // Build the job.
//! let job = Job::builder()
Expand Down Expand Up @@ -265,7 +265,7 @@
#![warn(clippy::all, nonstandard_style, future_incompatible, missing_docs)]

use sqlx::migrate::Migrator;
use sqlx::{migrate::Migrator, PgPool};

pub use crate::{
job::{Job, To},
Expand All @@ -281,7 +281,16 @@ mod scheduler;
pub mod task;
pub mod worker;

/// A SQLx [`Migrator`] which provides Underway's schema migrations.
static MIGRATOR: Migrator = sqlx::migrate!();

/// Runs Underway migrations.
///
/// A transaction is acquired via the provided pool and migrations are run via
/// this transaction.
///
/// As no direct support for specifying the schema under which the migrations
/// table will live, we manually specify this via the search path. This ensures
/// that migrations are isolated to underway._sqlx_migrations.
///
/// These migrations must be applied before queues, tasks, and workers can be
/// run.
Expand All @@ -304,8 +313,87 @@ pub mod worker;
/// let pool = PgPool::connect(database_url).await?;
///
/// // Run migrations.
/// underway::MIGRATOR.run(&pool).await?;
/// underway::run_migrations(&pool).await?;
/// # Ok::<(), Box<dyn std::error::Error>>(())
/// # });
/// # }
pub static MIGRATOR: Migrator = sqlx::migrate!();
pub async fn run_migrations(pool: &PgPool) -> Result<(), sqlx::Error> {
let mut tx = pool.begin().await?;

// Ensure the 'underway' schema exists
sqlx::query!("create schema if not exists underway;")
.execute(&mut *tx)
.await?;

// Temporarily set search_path for this transaction
sqlx::query!("set local search_path to underway;")
.execute(&mut *tx)
.await?;

// Run migrations within the 'underway' schema
MIGRATOR.run(&mut *tx).await?;

tx.commit().await?;

Ok(())
}

#[cfg(test)]
mod tests {
use sqlx::PgPool;

use super::run_migrations;

#[sqlx::test(migrations = false)]
async fn sanity_check_run_migrations(pool: PgPool) -> Result<(), sqlx::Error> {
run_migrations(&pool).await?;

let schema_exists: bool = sqlx::query_scalar!(
r#"
select exists (
select 1 from pg_namespace where nspname = 'underway'
);
"#,
)
.fetch_one(&pool)
.await?
.unwrap();
assert!(
schema_exists,
"Schema 'underway' should exist after migrations."
);

let migrations_table_exists: bool = sqlx::query_scalar!(
r#"
select exists (
select 1 from information_schema.tables
where table_schema = 'underway' and
table_name = '_sqlx_migrations'
);
"#,
)
.fetch_one(&pool)
.await?
.unwrap();
assert!(
migrations_table_exists,
"Migrations table should exist in 'underway' schema."
);

let search_path: String = sqlx::query_scalar("show search_path;")
.fetch_one(&pool)
.await?;

assert!(
!search_path.contains("underway"),
"search_path should not include 'underway' after the transaction."
);

assert!(
search_path.contains("public"),
"Default search_path should include 'public'."
);

Ok(())
}
}

0 comments on commit 95badb9

Please sign in to comment.