Skip to content

Commit

Permalink
feat: return of exposing backends to help in building apis (#457) (#458)
Browse files Browse the repository at this point in the history
  • Loading branch information
geofmureithi authored Nov 26, 2024
1 parent c70e5c6 commit c4ec3d3
Show file tree
Hide file tree
Showing 23 changed files with 802 additions and 121 deletions.
2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -125,7 +125,7 @@ members = [
"examples/catch-panic",
"examples/graceful-shutdown",
"examples/unmonitored-worker",
"examples/fn-args", "examples/persisted-cron",
"examples/fn-args", "examples/persisted-cron", "examples/rest-api",
]


Expand Down
15 changes: 15 additions & 0 deletions examples/rest-api/Cargo.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
[package]
name = "rest-api"
version = "0.1.0"
edition.workspace = true
repository.workspace = true

[dependencies]
anyhow = "1"
apalis = { path = "../../" }
apalis-redis = { path = "../../packages/apalis-redis" }
serde = "1"
env_logger = "0.10"
actix-web = "4"
futures = "0.3"
email-service = { path = "../email-service" }
108 changes: 108 additions & 0 deletions examples/rest-api/src/main.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,108 @@
use actix_web::rt::signal;
use actix_web::{web, App, HttpResponse, HttpServer};
use anyhow::Result;
use apalis::prelude::*;

use apalis_redis::RedisStorage;
use futures::future;

use email_service::{send_email, Email};
use serde::{Deserialize, Serialize};

#[derive(Deserialize, Debug)]
struct Filter {
#[serde(default)]
pub status: State,
#[serde(default = "default_page")]
pub page: i32,
}

fn default_page() -> i32 {
1
}

#[derive(Debug, Serialize, Deserialize)]
struct GetJobsResult<T> {
pub stats: Stat,
pub jobs: Vec<T>,
}

async fn push_job(job: web::Json<Email>, storage: web::Data<RedisStorage<Email>>) -> HttpResponse {
let mut storage = (**storage).clone();
let res = storage.push(job.into_inner()).await;
match res {
Ok(parts) => {
HttpResponse::Ok().body(format!("Job with ID [{}] added to queue", parts.task_id))
}
Err(e) => HttpResponse::InternalServerError().json(e.to_string()),
}
}

async fn get_jobs(
storage: web::Data<RedisStorage<Email>>,
filter: web::Query<Filter>,
) -> HttpResponse {
let stats = storage.stats().await.unwrap_or_default();
let res = storage.list_jobs(&filter.status, filter.page).await;
match res {
Ok(jobs) => HttpResponse::Ok().json(GetJobsResult { stats, jobs }),
Err(e) => HttpResponse::InternalServerError().json(e.to_string()),
}
}

async fn get_workers(storage: web::Data<RedisStorage<Email>>) -> HttpResponse {
let workers = storage.list_workers().await;
match workers {
Ok(workers) => HttpResponse::Ok().json(workers),
Err(e) => HttpResponse::InternalServerError().json(e.to_string()),
}
}

async fn get_job(
job_id: web::Path<TaskId>,
storage: web::Data<RedisStorage<Email>>,
) -> HttpResponse {
let mut storage = (**storage).clone();

let res = storage.fetch_by_id(&job_id).await;
match res {
Ok(Some(job)) => HttpResponse::Ok().json(job),
Ok(None) => HttpResponse::NotFound().finish(),
Err(e) => HttpResponse::InternalServerError().json(e.to_string()),
}
}

#[actix_web::main]
async fn main() -> Result<()> {
std::env::set_var("RUST_LOG", "debug");
env_logger::init();

let conn = apalis_redis::connect("redis://127.0.0.1/").await?;
let storage = RedisStorage::new(conn);
let data = web::Data::new(storage.clone());
let http = async {
HttpServer::new(move || {
App::new()
.app_data(data.clone())
.route("/", web::get().to(get_jobs)) // Fetch jobs in queue
.route("/workers", web::get().to(get_workers)) // Fetch workers
.route("/job", web::put().to(push_job)) // Allow add jobs via api
.route("/job/{job_id}", web::get().to(get_job)) // Allow fetch specific job
})
.bind("127.0.0.1:8000")?
.run()
.await?;
Ok(())
};
let worker = Monitor::new()
.register({
WorkerBuilder::new("tasty-avocado")
.enable_tracing()
.backend(storage)
.build_fn(send_email)
})
.run_with_signal(signal::ctrl_c());

future::try_join(http, worker).await?;
Ok(())
}
92 changes: 92 additions & 0 deletions packages/apalis-core/src/backend.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,92 @@
use std::{any::type_name, future::Future};

use futures::Stream;
use serde::{Deserialize, Serialize};
use tower::Service;

use crate::{
poller::Poller,
request::State,
worker::{Context, Worker},
};

/// A backend represents a task source
/// Both [`Storage`] and [`MessageQueue`] need to implement it for workers to be able to consume tasks
///
/// [`Storage`]: crate::storage::Storage
/// [`MessageQueue`]: crate::mq::MessageQueue
pub trait Backend<Req, Res> {
/// The stream to be produced by the backend
type Stream: Stream<Item = Result<Option<Req>, crate::error::Error>>;

/// Returns the final decoration of layers
type Layer;

/// Returns a poller that is ready for streaming
fn poll<Svc: Service<Req, Response = Res>>(
self,
worker: &Worker<Context>,
) -> Poller<Self::Stream, Self::Layer>;
}

/// Represents functionality that allows reading of jobs and stats from a backend
/// Some backends esp MessageQueues may not currently implement this
pub trait BackendExpose<T>
where
Self: Sized,
{
/// The request type being handled by the backend
type Request;
/// The error returned during reading jobs and stats
type Error;
/// List all Workers that are working on a backend
fn list_workers(
&self,
) -> impl Future<Output = Result<Vec<Worker<WorkerState>>, Self::Error>> + Send;

/// Returns the counts of jobs in different states
fn stats(&self) -> impl Future<Output = Result<Stat, Self::Error>> + Send;

/// Fetch jobs persisted in a backend
fn list_jobs(
&self,
status: &State,
page: i32,
) -> impl Future<Output = Result<Vec<Self::Request>, Self::Error>> + Send;
}

/// Represents the current statistics of a backend
#[derive(Debug, Deserialize, Serialize, Default)]
pub struct Stat {
/// Represents pending tasks
pub pending: usize,
/// Represents running tasks
pub running: usize,
/// Represents dead tasks
pub dead: usize,
/// Represents failed tasks
pub failed: usize,
/// Represents successful tasks
pub success: usize,
}

/// A serializable version of a worker's state.
#[derive(Debug, Serialize, Deserialize)]
pub struct WorkerState {
/// Type of task being consumed by the worker, useful for display and filtering
pub r#type: String,
/// The type of job stream
pub source: String,
// TODO: // The layers that were loaded for worker.
// TODO: // pub layers: Vec<Layer>,
// TODO: // last_seen: Timestamp,
}
impl WorkerState {
/// Build a new state
pub fn new<S>(r#type: String) -> Self {
Self {
r#type,
source: type_name::<S>().to_string(),
}
}
}
2 changes: 1 addition & 1 deletion packages/apalis-core/src/builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,13 +7,13 @@ use tower::{
};

use crate::{
backend::Backend,
error::Error,
layers::extensions::Data,
request::Request,
service_fn::service_fn,
service_fn::ServiceFn,
worker::{Ready, Worker, WorkerId},
Backend,
};

/// Allows building a [`Worker`].
Expand Down
2 changes: 1 addition & 1 deletion packages/apalis-core/src/codec/json.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
use std::marker::PhantomData;

use crate::Codec;
use crate::codec::Codec;
use serde::{Deserialize, Serialize};
use serde_json::Value;

Expand Down
20 changes: 20 additions & 0 deletions packages/apalis-core/src/codec/mod.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,23 @@
use serde::{Deserialize, Serialize};

use crate::error::BoxDynError;

/// A codec allows backends to encode and decode data
pub trait Codec {
/// The mode of storage by the codec
type Compact;
/// Error encountered by the codec
type Error: Into<BoxDynError>;
/// The encoding method
fn encode<I>(input: I) -> Result<Self::Compact, Self::Error>
where
I: Serialize;
/// The decoding method
fn decode<O>(input: Self::Compact) -> Result<O, Self::Error>
where
O: for<'de> Deserialize<'de>;
}

/// Encoding for tasks using json
#[cfg(feature = "json")]
pub mod json;
46 changes: 4 additions & 42 deletions packages/apalis-core/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,15 +22,11 @@
#![cfg_attr(docsrs, feature(doc_cfg))]
//! # apalis-core
//! Utilities for building job and message processing tools.
use error::BoxDynError;
use futures::Stream;
use poller::Poller;
use serde::{Deserialize, Serialize};
use tower::Service;
use worker::{Context, Worker};

/// Represent utilities for creating worker instances.
pub mod builder;

/// Represents a task source eg Postgres or Redis
pub mod backend;
/// Includes all possible error types.
pub mod error;
/// Represents middleware offered through [`tower`]
Expand Down Expand Up @@ -66,40 +62,6 @@ pub mod task;
/// Codec for handling data
pub mod codec;

/// A backend represents a task source
/// Both [`Storage`] and [`MessageQueue`] need to implement it for workers to be able to consume tasks
///
/// [`Storage`]: crate::storage::Storage
/// [`MessageQueue`]: crate::mq::MessageQueue
pub trait Backend<Req, Res> {
/// The stream to be produced by the backend
type Stream: Stream<Item = Result<Option<Req>, crate::error::Error>>;

/// Returns the final decoration of layers
type Layer;

/// Returns a poller that is ready for streaming
fn poll<Svc: Service<Req, Response = Res>>(
self,
worker: &Worker<Context>,
) -> Poller<Self::Stream, Self::Layer>;
}
/// A codec allows backends to encode and decode data
pub trait Codec {
/// The mode of storage by the codec
type Compact;
/// Error encountered by the codec
type Error: Into<BoxDynError>;
/// The encoding method
fn encode<I>(input: I) -> Result<Self::Compact, Self::Error>
where
I: Serialize;
/// The decoding method
fn decode<O>(input: Self::Compact) -> Result<O, Self::Error>
where
O: for<'de> Deserialize<'de>;
}

/// Sleep utilities
#[cfg(feature = "sleep")]
pub async fn sleep(duration: std::time::Duration) {
Expand Down Expand Up @@ -162,11 +124,11 @@ pub mod interval {
#[cfg(feature = "test-utils")]
/// Test utilities that allows you to test backends
pub mod test_utils {
use crate::backend::Backend;
use crate::error::BoxDynError;
use crate::request::Request;
use crate::task::task_id::TaskId;
use crate::worker::{Worker, WorkerId};
use crate::Backend;
use futures::channel::mpsc::{channel, Receiver, Sender};
use futures::future::BoxFuture;
use futures::stream::{Stream, StreamExt};
Expand Down
3 changes: 2 additions & 1 deletion packages/apalis-core/src/memory.rs
Original file line number Diff line number Diff line change
@@ -1,9 +1,10 @@
use crate::{
backend::Backend,
mq::MessageQueue,
poller::Poller,
poller::{controller::Controller, stream::BackendStream},
request::{Request, RequestStream},
worker::{self, Worker},
Backend, Poller,
};
use futures::{
channel::mpsc::{channel, Receiver, Sender},
Expand Down
2 changes: 1 addition & 1 deletion packages/apalis-core/src/monitor/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,10 +11,10 @@ use tower::{Layer, Service};
pub mod shutdown;

use crate::{
backend::Backend,
error::BoxDynError,
request::Request,
worker::{Context, Event, EventHandler, Ready, Worker, WorkerId},
Backend,
};

use self::shutdown::Shutdown;
Expand Down
Loading

0 comments on commit c4ec3d3

Please sign in to comment.