Selecting the storage type at runtime through config #420
-
First off, thanks for this crate! It's really nice and almost exactly what I am looking for. I had a quick question. I am using apalis on an axum project and I would like to be able to specify the storage layer that apalis uses for orchestrating everything at runtime through a config file. For single-node deploys of the application, I want to be able to run it with an in-memory SQLite store, while for more intensive applications I want to use a Redis server for the storage with multiple worker machines. Any examples of how I might do this? |
Beta Was this translation helpful? Give feedback.
Replies: 4 comments 1 reply
-
OK, I went down a couple of rabbit holes trying to get this to work. I first tried dynamic dispatch, housing the different storage types in an […]. Then I went for a static dispatch approach like so: #[derive(Clone)]
/// Job storage whose concrete backend is picked at runtime.
///
/// Each variant wraps one concrete storage; the trait impls on this type
/// forward every call to whichever storage is held.
pub enum WorkerStorage<T: Clone> {
    /// Jobs are held in a `SqliteStorage`.
    SqliteMemory(SqliteStorage<T>),
    /// Jobs are held in a `RedisStorage`.
    Redis(RedisStorage<T>),
}
#[derive(Error, Debug)]
pub enum WorkerStorageError {
#[error("Error in worker sql storage: {0} ")]
SqliteError(#[from] sqlx::Error),
#[error("Error in worker redis storage: {0}")]
RedisError(#[from] RedisError),
}
/// Task acknowledgement for [`WorkerStorage`], delegated to the wrapped storage.
impl<T: Sync + Clone> Ack<T> for WorkerStorage<T> {
    type Acknowledger = TaskId;
    type Error = WorkerStorageError;

    /// Forwards the acknowledgement of `task_id` for `worker_id` to the active
    /// backend and wraps any backend failure in the matching
    /// [`WorkerStorageError`] variant.
    async fn ack(
        &self,
        worker_id: &WorkerId,
        task_id: &Self::Acknowledger,
    ) -> Result<(), Self::Error> {
        match self {
            Self::SqliteMemory(sqlite_store) => sqlite_store
                .ack(worker_id, task_id)
                .await
                .map_err(WorkerStorageError::SqliteError),
            Self::Redis(redis_store) => redis_store
                .ack(worker_id, task_id)
                .await
                .map_err(WorkerStorageError::RedisError),
        }
    }
}
/// Lets a worker poll jobs from [`WorkerStorage`] regardless of which backend
/// it wraps.
impl<T: Job + Serialize + DeserializeOwned + Sync + Send + Unpin + 'static + Clone>
    Backend<Request<T>> for WorkerStorage<T>
{
    type Stream = BackendStream<RequestStream<Request<T>>>;
    type Layer = AckLayer<WorkerStorage<T>, T>;

    /// Builds the acknowledgement layer around a clone of this storage, so
    /// acknowledgements go through the `Ack` impl of the wrapper itself.
    fn common_layer(&self, worker_id: WorkerId) -> Self::Layer {
        AckLayer::new(self.clone(), worker_id)
    }

    /// Consumes the wrapper and returns the poller produced by the wrapped
    /// storage for `worker`.
    // `self` is only moved into the match, never mutated, so it is bound
    // immutably (the former `mut self` triggered an `unused_mut` warning).
    fn poll(self, worker: WorkerId) -> Poller<Self::Stream> {
        match self {
            WorkerStorage::SqliteMemory(s) => s.poll(worker),
            WorkerStorage::Redis(r) => r.poll(worker),
        }
    }
}
impl<T> Storage for WorkerStorage<T>
where
T: Job + Serialize + DeserializeOwned + Send + 'static + Unpin + Sync + std::fmt::Debug + Clone,
{
type Job = T;
type Error = WorkerStorageError;
type Identifier = TaskId;
async fn push(&mut self, job: Self::Job) -> Result<TaskId, Self::Error> {
match self {
WorkerStorage::SqliteMemory(s) => s
.push(job)
.await
.map_err(|e| WorkerStorageError::SqliteError(e)),
WorkerStorage::Redis(r) => r
.push(job)
.await
.map_err(|e| WorkerStorageError::RedisError(e)),
}
}
async fn schedule(&mut self, job: Self::Job, on: i64) -> Result<TaskId, Self::Error> {
match self {
WorkerStorage::SqliteMemory(s) => s
.schedule(job, on)
.await
.map_err(|e| WorkerStorageError::SqliteError(e)),
WorkerStorage::Redis(r) => r
.schedule(job, on)
.await
.map_err(|e| WorkerStorageError::RedisError(e)),
}
}
async fn len(&self) -> Result<i64, Self::Error> {
match self {
WorkerStorage::SqliteMemory(s) => s
.len()
.await
.map_err(|e| WorkerStorageError::SqliteError(e)),
WorkerStorage::Redis(r) => r.len().await.map_err(|e| WorkerStorageError::RedisError(e)),
}
}
async fn fetch_by_id(
&self,
job_id: &TaskId,
) -> Result<Option<Request<Self::Job>>, Self::Error> {
match self {
WorkerStorage::SqliteMemory(s) => s
.fetch_by_id(job_id)
.await
.map_err(|e| WorkerStorageError::SqliteError(e)),
WorkerStorage::Redis(r) => r
.fetch_by_id(job_id)
.await
.map_err(|e| WorkerStorageError::RedisError(e)),
}
}
async fn update(&self, job: Request<Self::Job>) -> Result<(), Self::Error> {
match self {
WorkerStorage::SqliteMemory(s) => s
.update(job)
.await
.map_err(|e| WorkerStorageError::SqliteError(e)),
WorkerStorage::Redis(r) => r
.update(job)
.await
.map_err(|e| WorkerStorageError::RedisError(e)),
}
}
async fn reschedule(&mut self, job: Request<T>, wait: Duration) -> Result<(), Self::Error> {
match self {
WorkerStorage::SqliteMemory(s) => s
.reschedule(job, wait)
.await
.map_err(|e| WorkerStorageError::SqliteError(e)),
WorkerStorage::Redis(r) => r
.reschedule(job, wait)
.await
.map_err(|e| WorkerStorageError::RedisError(e)),
}
}
async fn is_empty(&self) -> Result<bool, Self::Error> {
match self {
WorkerStorage::SqliteMemory(s) => s
.is_empty()
.await
.map_err(|e| WorkerStorageError::SqliteError(e)),
WorkerStorage::Redis(r) => r
.is_empty()
.await
.map_err(|e| WorkerStorageError::RedisError(e)),
}
}
async fn vacuum(&self) -> Result<usize, Self::Error> {
match self {
WorkerStorage::SqliteMemory(s) => s
.vacuum()
.await
.map_err(|e| WorkerStorageError::SqliteError(e)),
WorkerStorage::Redis(r) => r
.vacuum()
.await
.map_err(|e| WorkerStorageError::RedisError(e)),
}
}
}
pub async fn start_job_workers(
config: &Config
) -> Result<(), AppWorkerError> {
info!("Starting math job worker");
let storage = match &config.workers.coordinator {
WorkerCoordinator::InMemory => {
let pool = SqlitePool::connect("sqlite::memory:?cache=shared")
.await
.map_err(|_| AppWorkerError::FailedToConnectToWorkerDB)?;
let storage: SqliteStorage<Job> = SqliteStorage::new(pool);
WorkerStorage::SqliteMemory(storage))
},
WorkerCoordinator::Redis(url) => {
let conn = apalis::redis::connect(url.to_owned())
.await
.map_err(|_| PolirsWorkerError::FailedToStartWorker)?;
let config = apalis::redis::Config::default();
WorkerStorage::Redis(RedisStorage::new_with_config(conn, config))
}
};
Monitor::<TokioExecutor>::new()
.register_with_count(2, {
WorkerBuilder::new(format!("job-worker"))
.data(0usize)
.with_storage(storage)
.build_fn(run_job)
})
.run()
.await
.map_err(|_| AppWorkerError::FailedToStartWorker)?;
Ok(())
}
Which allows me to do the following in the Axum app let app = Router::new()
.route("/test_worker", get(test_worker))
.layer(Extension(worker_storage.clone()));
...
/// Axum handler that pushes one sample `Job` into the shared worker storage.
///
/// The outcome of the push is only printed; the handler answers with
/// `StatusCode::OK` whether or not the push succeeded.
async fn test_worker(
    Extension(mut worker_storage): Extension<WorkerStorage<Job>>,
) -> impl IntoResponse {
    println!("testing worker");
    let sample_job = Job {
        text: "test".into(),
    };
    let result = worker_storage.push(sample_job).await;
    println!("Result {result:#?}");
    StatusCode::OK
}
This works and will let me pick between the in-memory and Redis storage options at runtime, but it feels very fragile to upstream changes. I wonder if there would be any appetite on the project for including an optional "Any" module which would include this kind of runtime dynamic dispatch for a set of backends? If it's in the library itself, then something like enum dispatch might be able to automate away most of this boilerplate. |
Beta Was this translation helpful? Give feedback.
-
Well, enum seems to be a runtime solution that works
You might also want to consider feature flags: #[cfg(feature = "sqlite")]
pub type WorkerStorage<T> = SqliteStorage<T>);
#[cfg(feature = "redis")]
pub type WorkerStorage<T> = RedisStorage<T>; Then you can pass the feature at compile time. |
Beta Was this translation helpful? Give feedback.
-
i'm trying to achieve static dispatch in version impl<T: Clone + Send + Sync + Unpin + Serialize + DeserializeOwned, Res>
Backend<Request<T, SqlContext>, Res> for WorkerStorage<T>
{
type Stream = BackendStream<RequestStream<Request<T, SqlContext>>>;
// NOTE(review): the layer is declared with `WorkerStorage<T>` as its first
// parameter, but the Postgres arm of `poll` below returns the inner storage's
// own `Poller`, whose layer is `AckLayer<PostgresStorage<T>, ..>`. That
// difference is the E0308 mismatch quoted right after this block.
type Layer = AckLayer<WorkerStorage<T>, T, SqlContext, Res>;
// Consumes the wrapper and forwards polling to whichever backend variant is
// enabled through the `postgres` / `sqlite` feature flags.
fn poll<Svc: Service<Request<T, SqlContext>, Response = Res>>(
self,
worker: WorkerId,
) -> Poller<Self::Stream, Self::Layer> {
match self {
#[cfg(feature = "postgres")]
WorkerStorage::Postgres(storage) => storage.poll(worker),
#[cfg(feature = "sqlite")]
WorkerStorage::Sqlite(storage) => storage.poll(worker),
}
}
} Full error message: error[E0308]: mismatched types
--> crates/task/src/lib.rs:141:49
|
138 | ) -> Poller<Self::Stream, Self::Layer> {
| --------------------------------- expected `apalis::prelude::Poller<apalis::prelude::BackendStream<std::pin::Pin<std::boxed::Box<(dyn futures_core::stream::Stream<Item = std::result::Result<std::option::Option<apalis::prelude::Request<T, apalis_sql::context::SqlContext>>, apalis::prelude::Error>> + std::marker::Send + 'static)>>>, apalis_core::layers::AckLayer<WorkerStorage<T>, T, apalis_sql::context::SqlContext, Res>>` because of return type
...
141 | WorkerStorage::Postgres(storage) => storage.poll(worker),
| ^^^^^^^^^^^^^^^^^^^^ expected `WorkerStorage<T>`, found `PostgresStorage<T>`
|
= note: expected struct `apalis::prelude::Poller<_, apalis_core::layers::AckLayer<WorkerStorage<T>, _, _, Res>>`
found struct `apalis::prelude::Poller<_, apalis_core::layers::AckLayer<apalis_sql::postgres::PostgresStorage<T>, _, _, _>>` |
Beta Was this translation helpful? Give feedback.
-
I want to mark this as resolved as the types are now public: pub struct Poller<S, L = Identity> {
// NOTE(review): with all three fields public, a wrapper backend can presumably
// take an inner storage's `Poller` apart and rebuild it with its own `layer` —
// confirm against the apalis release in use.
// Stream value; its type is the `S` parameter.
pub stream: S,
// Boxed `'static` future that resolves to `()`.
pub heartbeat: BoxFuture<'static, ()>,
// Layer value; `L` defaults to `Identity` when not specified.
pub layer: L,
} |
Beta Was this translation helpful? Give feedback.
I want to mark this as resolved as the types are now public: