Skip to content

Commit

Permalink
stepped tasks: adds ability to write stepped tasks
Browse files Browse the repository at this point in the history
  • Loading branch information
geofmureithi committed Dec 10, 2024
1 parent d563db3 commit 05ff8b0
Show file tree
Hide file tree
Showing 18 changed files with 423 additions and 6 deletions.
2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -122,7 +122,7 @@ members = [
"examples/unmonitored-worker",
"examples/fn-args",
"examples/persisted-cron",
"examples/rest-api",
"examples/rest-api", "examples/stepped-tasks",
]


Expand Down
2 changes: 2 additions & 0 deletions examples/redis-mq-example/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,8 @@ where

type Layer = AckLayer<Self, Req, RedisMqContext, Res>;

type Codec = C;

fn poll<Svc>(mut self, _worker: &Worker<Context>) -> Poller<Self::Stream, Self::Layer> {
let (mut tx, rx) = mpsc::channel(self.config.get_buffer_size());
let stream: RequestStream<Request<Req, RedisMqContext>> = Box::pin(rx);
Expand Down
22 changes: 22 additions & 0 deletions examples/stepped-tasks/Cargo.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
[package]
name = "stepped-tasks"
version = "0.1.0"
edition.workspace = true
repository.workspace = true

[dependencies]
tower = { version = "0.5", features = ["util"] }
tokio = { version = "1", features = ["full"] }
apalis = { path = "../../", features = ["limit", "catch-panic"] }
apalis-redis = { path = "../../packages/apalis-redis" }
# apalis-sql = { path = "../../packages/apalis-sql", features = [
# "sqlite",
# "tokio-comp",
# ] }
serde = "1"
serde_json = "1"
tracing-subscriber = "0.3.11"
futures = "0.3"
[dependencies.tracing]
default-features = false
version = "0.1"
85 changes: 85 additions & 0 deletions examples/stepped-tasks/src/main.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,85 @@
use std::time::Duration;

use apalis::prelude::*;
use apalis_redis::RedisStorage;
use serde::{Deserialize, Serialize};
use tracing::info;

#[derive(Debug, Serialize, Deserialize, Clone)]
struct WelcomeEmail {
user_id: usize,
}

#[derive(Debug, Serialize, Deserialize, Clone)]

struct FirstWeekEmail {
user_id: usize,
}

#[derive(Debug, Serialize, Deserialize, Clone)]

struct FirstMonthEmail {
user_id: usize,
}

async fn welcome(req: WelcomeEmail, ctx: Data<()>) -> Result<GoTo<FirstWeekEmail>, Error> {
Ok::<_, _>(GoTo::Next(FirstWeekEmail {
user_id: req.user_id + 1,
}))
}

async fn first_week_email(
req: FirstWeekEmail,
ctx: Data<()>,
) -> Result<GoTo<FirstMonthEmail>, Error> {
Ok::<_, _>(GoTo::Delay {
next: FirstMonthEmail {
user_id: req.user_id + 1,
},
delay: Duration::from_secs(10),
})
}

async fn first_month_email(req: FirstMonthEmail, ctx: Data<()>) -> Result<GoTo<()>, Error> {
Ok::<_, _>(GoTo::Done)
}

async fn produce_jobs(storage: &mut RedisStorage<StepRequest<Vec<u8>>>) {
storage
.push(StepRequest {
current: 0,
inner: serde_json::to_vec(&WelcomeEmail { user_id: 1 }).unwrap(),
})
.await
.unwrap();
}

#[tokio::main]
async fn main() -> Result<(), std::io::Error> {
std::env::set_var("RUST_LOG", "debug,sqlx::query=error");
tracing_subscriber::fmt::init();
let conn = apalis_redis::connect("redis://127.0.0.1/").await.unwrap();
let config = apalis_redis::Config::default()
.set_namespace("apalis_redis-with-msg-pack")
.set_max_retries(5);

let mut storage = RedisStorage::new_with_config(conn, config);
produce_jobs(&mut storage).await;

// Build steps
let steps = StepBuilder::new()
.step_fn(welcome)
.step_fn(first_week_email)
.step_fn(first_month_email);

WorkerBuilder::new("tasty-banana")
.data(())
.enable_tracing()
.concurrency(2)
.backend(storage)
.build_steps(steps)
.on_event(|e| info!("{e}"))
.run()
.await;
Ok(())
}
2 changes: 1 addition & 1 deletion packages/apalis-core/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ optional = true


[features]
default = ["test-utils"]
default = ["test-utils", "json"]
docsrs = ["document-features"]
sleep = ["futures-timer"]
json = ["serde_json"]
Expand Down
3 changes: 3 additions & 0 deletions packages/apalis-core/src/backend.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,9 @@ pub trait Backend<Req, Res> {
/// Returns the final decoration of layers
type Layer;

/// Specifies the codec type used by the backend
type Codec;

/// Returns a poller that is ready for streaming
fn poll<Svc: Service<Req, Response = Res>>(
self,
Expand Down
8 changes: 4 additions & 4 deletions packages/apalis-core/src/builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,10 +19,10 @@ use crate::{
/// Allows building a [`Worker`].
/// Usually the output is [`Worker<Ready>`]
pub struct WorkerBuilder<Req, Ctx, Source, Middleware, Serv> {
id: WorkerId,
request: PhantomData<Request<Req, Ctx>>,
layer: ServiceBuilder<Middleware>,
source: Source,
pub id: WorkerId,

Check warning on line 22 in packages/apalis-core/src/builder.rs

View workflow job for this annotation

GitHub Actions / Test Suite

missing documentation for a struct field

Check warning on line 22 in packages/apalis-core/src/builder.rs

View workflow job for this annotation

GitHub Actions / Check

missing documentation for a struct field

Check warning on line 22 in packages/apalis-core/src/builder.rs

View workflow job for this annotation

GitHub Actions / Check

missing documentation for a struct field
pub request: PhantomData<Request<Req, Ctx>>,

Check warning on line 23 in packages/apalis-core/src/builder.rs

View workflow job for this annotation

GitHub Actions / Test Suite

missing documentation for a struct field

Check warning on line 23 in packages/apalis-core/src/builder.rs

View workflow job for this annotation

GitHub Actions / Check

missing documentation for a struct field

Check warning on line 23 in packages/apalis-core/src/builder.rs

View workflow job for this annotation

GitHub Actions / Check

missing documentation for a struct field
pub layer: ServiceBuilder<Middleware>,

Check warning on line 24 in packages/apalis-core/src/builder.rs

View workflow job for this annotation

GitHub Actions / Test Suite

missing documentation for a struct field

Check warning on line 24 in packages/apalis-core/src/builder.rs

View workflow job for this annotation

GitHub Actions / Check

missing documentation for a struct field

Check warning on line 24 in packages/apalis-core/src/builder.rs

View workflow job for this annotation

GitHub Actions / Check

missing documentation for a struct field
pub source: Source,

Check warning on line 25 in packages/apalis-core/src/builder.rs

View workflow job for this annotation

GitHub Actions / Test Suite

missing documentation for a struct field

Check warning on line 25 in packages/apalis-core/src/builder.rs

View workflow job for this annotation

GitHub Actions / Check

missing documentation for a struct field

Check warning on line 25 in packages/apalis-core/src/builder.rs

View workflow job for this annotation

GitHub Actions / Check

missing documentation for a struct field
service: PhantomData<Serv>,
}

Expand Down
2 changes: 2 additions & 0 deletions packages/apalis-core/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,8 @@ pub mod task;
/// Codec for handling data
pub mod codec;

pub mod step;

Check warning on line 65 in packages/apalis-core/src/lib.rs

View workflow job for this annotation

GitHub Actions / Test Suite

missing documentation for a module

Check warning on line 65 in packages/apalis-core/src/lib.rs

View workflow job for this annotation

GitHub Actions / Clippy

missing documentation for a module

Check warning on line 65 in packages/apalis-core/src/lib.rs

View workflow job for this annotation

GitHub Actions / Check

missing documentation for a module

Check warning on line 65 in packages/apalis-core/src/lib.rs

View workflow job for this annotation

GitHub Actions / Check

missing documentation for a module

/// Sleep utilities
#[cfg(feature = "sleep")]
pub async fn sleep(duration: std::time::Duration) {
Expand Down
2 changes: 2 additions & 0 deletions packages/apalis-core/src/memory.rs
Original file line number Diff line number Diff line change
Expand Up @@ -102,6 +102,8 @@ impl<T: Send + 'static + Sync, Res> Backend<Request<T, ()>, Res> for MemoryStora

type Layer = Identity;

type Codec = ();

fn poll<Svc>(self, _worker: &Worker<worker::Context>) -> Poller<Self::Stream> {
let stream = self.inner.map(|r| Ok(Some(r))).boxed();
Poller {
Expand Down
2 changes: 2 additions & 0 deletions packages/apalis-core/src/request.rs
Original file line number Diff line number Diff line change
Expand Up @@ -165,6 +165,8 @@ impl<T, Res, Ctx> Backend<Request<T, Ctx>, Res> for RequestStream<Request<T, Ctx

type Layer = Identity;

type Codec = ();

fn poll<Svc>(self, _worker: &Worker<Context>) -> Poller<Self::Stream> {
Poller {
stream: self,
Expand Down
Loading

0 comments on commit 05ff8b0

Please sign in to comment.