diff --git a/rust-arroyo/src/processing/dlq.rs b/rust-arroyo/src/processing/dlq.rs index 7cb5b1c1..15866dc4 100644 --- a/rust-arroyo/src/processing/dlq.rs +++ b/rust-arroyo/src/processing/dlq.rs @@ -32,7 +32,7 @@ pub trait DlqProducer: Send + Sync { } // Drops all invalid messages. Produce returns an immediately resolved future. -struct NoopDlqProducer {} +pub struct NoopDlqProducer; impl DlqProducer for NoopDlqProducer { fn produce( diff --git a/rust-arroyo/src/processing/strategies/run_task.rs b/rust-arroyo/src/processing/strategies/run_task.rs index 59dec1ba..33d798ed 100644 --- a/rust-arroyo/src/processing/strategies/run_task.rs +++ b/rust-arroyo/src/processing/strategies/run_task.rs @@ -5,38 +5,32 @@ use crate::processing::strategies::{ use crate::types::Message; use std::time::Duration; -type Function = dyn Fn(Message) -> Result, InvalidMessage> - + Send - + Sync - + 'static; - -pub struct RunTask { - pub function: Box>, - pub next_step: Box>, +pub struct RunTask { + pub function: F, + pub next_step: N, pub message_carried_over: Option>, pub commit_request_carried_over: Option, } -impl RunTask { - pub fn new(function: F, next_step: N) -> Self - where - N: ProcessingStrategy + 'static, - F: Fn(Message) -> Result, InvalidMessage> - + Send - + Sync - + 'static, - { +impl RunTask { + pub fn new(function: F, next_step: N) -> Self { Self { - function: Box::new(function), - next_step: Box::new(next_step), + function, + next_step, message_carried_over: None, commit_request_carried_over: None, } } } -impl ProcessingStrategy - for RunTask +impl ProcessingStrategy for RunTask +where + TTransformed: Send + Sync, + F: FnMut(Message) -> Result, InvalidMessage> + + Send + + Sync + + 'static, + N: ProcessingStrategy + 'static, { fn poll(&mut self) -> Result, StrategyError> { match self.next_step.poll() {