Skip to content

Commit

Permalink
ref(run_task): Remove unnecessary boxing, make FnMut
Browse files Browse the repository at this point in the history
This will allow you to use mutable state from within RunTask where
previously one couldn't, and remove some boilerplate

Also fix a warning drive-by.
  • Loading branch information
untitaker committed Oct 17, 2024
1 parent 9a773d5 commit c475f0c
Show file tree
Hide file tree
Showing 2 changed files with 16 additions and 22 deletions.
2 changes: 1 addition & 1 deletion rust-arroyo/src/processing/dlq.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ pub trait DlqProducer<TPayload>: Send + Sync {
}

// Drops all invalid messages. Produce returns an immediately resolved future.
struct NoopDlqProducer {}
pub struct NoopDlqProducer;

impl<TPayload: Send + Sync + 'static> DlqProducer<TPayload> for NoopDlqProducer {
fn produce(
Expand Down
36 changes: 15 additions & 21 deletions rust-arroyo/src/processing/strategies/run_task.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,38 +5,32 @@ use crate::processing::strategies::{
use crate::types::Message;
use std::time::Duration;

type Function<TPayload, TTransformed> = dyn Fn(Message<TPayload>) -> Result<Message<TTransformed>, InvalidMessage>
+ Send
+ Sync
+ 'static;

pub struct RunTask<TPayload, TTransformed> {
pub function: Box<Function<TPayload, TTransformed>>,
pub next_step: Box<dyn ProcessingStrategy<TTransformed>>,
pub struct RunTask<TTransformed, F, N> {
pub function: F,
pub next_step: N,
pub message_carried_over: Option<Message<TTransformed>>,
pub commit_request_carried_over: Option<CommitRequest>,
}

impl<TPayload, TTransformed> RunTask<TPayload, TTransformed> {
pub fn new<N, F>(function: F, next_step: N) -> Self
where
N: ProcessingStrategy<TTransformed> + 'static,
F: Fn(Message<TPayload>) -> Result<Message<TTransformed>, InvalidMessage>
+ Send
+ Sync
+ 'static,
{
impl<TTransformed, F, N> RunTask<TTransformed, F, N> {
pub fn new(function: F, next_step: N) -> Self {
Self {
function: Box::new(function),
next_step: Box::new(next_step),
function,
next_step,
message_carried_over: None,
commit_request_carried_over: None,
}
}
}

impl<TPayload, TTransformed: Send + Sync> ProcessingStrategy<TPayload>
for RunTask<TPayload, TTransformed>
impl<TPayload, TTransformed, F, N> ProcessingStrategy<TPayload> for RunTask<TTransformed, F, N>
where
TTransformed: Send + Sync,
F: FnMut(Message<TPayload>) -> Result<Message<TTransformed>, InvalidMessage>
+ Send
+ Sync
+ 'static,
N: ProcessingStrategy<TTransformed> + 'static,
{
fn poll(&mut self) -> Result<Option<CommitRequest>, StrategyError> {
match self.next_step.poll() {
Expand Down

0 comments on commit c475f0c

Please sign in to comment.