diff --git a/rust_snuba/rust_arroyo/examples/base_processor.rs b/rust_snuba/rust_arroyo/examples/base_processor.rs index 362c8bb17a..58c4141a04 100644 --- a/rust_snuba/rust_arroyo/examples/base_processor.rs +++ b/rust_snuba/rust_arroyo/examples/base_processor.rs @@ -7,6 +7,7 @@ use rust_arroyo::processing::strategies::commit_offsets::CommitOffsets; use rust_arroyo::processing::strategies::{ProcessingStrategy, ProcessingStrategyFactory}; use rust_arroyo::processing::StreamProcessor; use rust_arroyo::types::Topic; +use std::sync::{Arc, Mutex}; use std::time::Duration; struct TestFactory {} @@ -24,7 +25,7 @@ fn main() { false, None, ); - let consumer = Box::new(KafkaConsumer::new(config)); + let consumer = Arc::new(Mutex::new(KafkaConsumer::new(config))); let topic = Topic { name: "test_static".to_string(), }; diff --git a/rust_snuba/rust_arroyo/examples/transform_and_produce.rs b/rust_snuba/rust_arroyo/examples/transform_and_produce.rs index 134294fecd..6f88455ffb 100644 --- a/rust_snuba/rust_arroyo/examples/transform_and_produce.rs +++ b/rust_snuba/rust_arroyo/examples/transform_and_produce.rs @@ -16,6 +16,7 @@ use rust_arroyo::processing::strategies::{ }; use rust_arroyo::processing::StreamProcessor; use rust_arroyo::types::{Message, Topic, TopicOrPartition}; +use std::sync::{Arc, Mutex}; use std::time::Duration; fn reverse_string(value: KafkaPayload) -> Result { @@ -73,7 +74,7 @@ async fn main() { None, ); - let consumer = Box::new(KafkaConsumer::new(config.clone())); + let consumer = Arc::new(Mutex::new(KafkaConsumer::new(config.clone()))); let mut processor = StreamProcessor::new( consumer, Box::new(ReverseStringAndProduceStrategyFactory { diff --git a/rust_snuba/rust_arroyo/src/backends/kafka/mod.rs b/rust_snuba/rust_arroyo/src/backends/kafka/mod.rs index 2e2d24f5d9..08717f435c 100644 --- a/rust_snuba/rust_arroyo/src/backends/kafka/mod.rs +++ b/rust_snuba/rust_arroyo/src/backends/kafka/mod.rs @@ -160,7 +160,7 @@ impl KafkaConsumer { } } -impl<'a> ArroyoConsumer<'a, KafkaPayload> for KafkaConsumer { +impl ArroyoConsumer for KafkaConsumer { fn subscribe( &mut self, topics: &[Topic], diff --git a/rust_snuba/rust_arroyo/src/backends/local/broker.rs b/rust_snuba/rust_arroyo/src/backends/local/broker.rs index 572feb15bf..900957b7bd 100644 --- a/rust_snuba/rust_arroyo/src/backends/local/broker.rs +++ b/rust_snuba/rust_arroyo/src/backends/local/broker.rs @@ -32,7 +32,7 @@ impl From for BrokerError { } } -impl LocalBroker { +impl LocalBroker { pub fn new(storage: Box>, clock: Box) -> Self { Self { storage, diff --git a/rust_snuba/rust_arroyo/src/backends/local/mod.rs b/rust_snuba/rust_arroyo/src/backends/local/mod.rs index e162ffe950..d04cc037fa 100644 --- a/rust_snuba/rust_arroyo/src/backends/local/mod.rs +++ b/rust_snuba/rust_arroyo/src/backends/local/mod.rs @@ -24,10 +24,10 @@ struct SubscriptionState { last_eof_at: HashMap, } -pub struct LocalConsumer<'a, TPayload: Clone> { +pub struct LocalConsumer { id: Uuid, group: String, - broker: &'a mut LocalBroker, + broker: LocalBroker, pending_callback: VecDeque, paused: HashSet, // The offset that a the last ``EndOfPartition`` exception that was @@ -40,10 +40,10 @@ pub struct LocalConsumer<'a, TPayload: Clone> { closed: bool, } -impl<'a, TPayload: Clone> LocalConsumer<'a, TPayload> { +impl LocalConsumer { pub fn new( id: Uuid, - broker: &'a mut LocalBroker, + broker: LocalBroker, group: String, enable_end_of_partition: bool, ) -> Self { @@ -68,7 +68,7 @@ impl<'a, TPayload: Clone> LocalConsumer<'a, TPayload> { } } -impl<'a, TPayload: Clone> Consumer<'a, TPayload> for LocalConsumer<'a, TPayload> { +impl Consumer for LocalConsumer { fn subscribe( &mut self, topics: &[Topic], @@ -327,7 +327,7 @@ mod tests { #[test] fn test_consumer_subscription() { - let mut broker = build_broker(); + let broker = build_broker(); let topic1 = Topic { name: "test1".to_string(), @@ -337,8 +337,7 @@ mod tests { }; let my_callbacks: Box = Box::new(EmptyCallbacks {}); - let mut consumer = - LocalConsumer::new(Uuid::nil(), &mut broker, "test_group".to_string(), true); + let mut consumer = LocalConsumer::new(Uuid::nil(), broker, "test_group".to_string(), true); assert!(consumer.subscription_state.topics.is_empty()); let res = consumer.subscribe(&[topic1.clone(), topic2.clone()], my_callbacks); @@ -381,7 +380,7 @@ mod tests { #[test] fn test_subscription_callback() { - let mut broker = build_broker(); + let broker = build_broker(); let topic1 = Topic { name: "test1".to_string(), @@ -455,8 +454,7 @@ mod tests { let my_callbacks: Box = Box::new(TheseCallbacks {}); - let mut consumer = - LocalConsumer::new(Uuid::nil(), &mut broker, "test_group".to_string(), true); + let mut consumer = LocalConsumer::new(Uuid::nil(), broker, "test_group".to_string(), true); let _ = consumer.subscribe(&[topic1, topic2], my_callbacks); let _ = consumer.poll(Some(Duration::from_millis(100))); @@ -500,8 +498,7 @@ mod tests { } let my_callbacks: Box = Box::new(TheseCallbacks {}); - let mut consumer = - LocalConsumer::new(Uuid::nil(), &mut broker, "test_group".to_string(), true); + let mut consumer = LocalConsumer::new(Uuid::nil(), broker, "test_group".to_string(), true); let _ = consumer.subscribe(&[topic2], my_callbacks); @@ -523,7 +520,7 @@ mod tests { #[test] fn test_paused() { - let mut broker = build_broker(); + let broker = build_broker(); let topic2 = Topic { name: "test2".to_string(), }; @@ -532,8 +529,7 @@ mod tests { index: 0, }; let my_callbacks: Box = Box::new(EmptyCallbacks {}); - let mut consumer = - LocalConsumer::new(Uuid::nil(), &mut broker, "test_group".to_string(), false); + let mut consumer = LocalConsumer::new(Uuid::nil(), broker, "test_group".to_string(), false); let _ = consumer.subscribe(&[topic2], my_callbacks); assert_eq!(consumer.poll(None).unwrap(), None); @@ -549,10 +545,9 @@ mod tests { #[test] fn test_commit() { - let mut broker = build_broker(); + let broker = build_broker(); let my_callbacks: Box = Box::new(EmptyCallbacks {}); - let mut consumer = - LocalConsumer::new(Uuid::nil(), &mut broker, "test_group".to_string(), false); + let mut consumer = LocalConsumer::new(Uuid::nil(), broker, "test_group".to_string(), false); let topic2 = Topic { name: "test2".to_string(), }; diff --git a/rust_snuba/rust_arroyo/src/backends/mod.rs b/rust_snuba/rust_arroyo/src/backends/mod.rs index 1a75b71a16..6806f74dfa 100755 --- a/rust_snuba/rust_arroyo/src/backends/mod.rs +++ b/rust_snuba/rust_arroyo/src/backends/mod.rs @@ -41,7 +41,7 @@ pub enum ProducerError { /// This is basically an observer pattern to receive the callbacks from /// the consumer when partitions are assigned/revoked. -pub trait AssignmentCallbacks: Send + Sync { +pub trait AssignmentCallbacks: Send { fn on_assign(&mut self, partitions: HashMap); fn on_revoke(&mut self, partitions: Vec); } @@ -80,7 +80,7 @@ pub trait AssignmentCallbacks: Send + Sync { /// occurs even if the consumer retains ownership of the partition across /// assignments.) For this reason, it is generally good practice to ensure /// offsets are committed as part of the revocation callback. -pub trait Consumer<'a, TPayload: Clone> { +pub trait Consumer: Send { fn subscribe( &mut self, topic: &[Topic], diff --git a/rust_snuba/rust_arroyo/src/backends/storages/memory.rs b/rust_snuba/rust_arroyo/src/backends/storages/memory.rs index 5ace8de907..b696d318d3 100755 --- a/rust_snuba/rust_arroyo/src/backends/storages/memory.rs +++ b/rust_snuba/rust_arroyo/src/backends/storages/memory.rs @@ -68,7 +68,7 @@ impl Default for MemoryMessageStorage { } } -impl MessageStorage for MemoryMessageStorage { +impl MessageStorage for MemoryMessageStorage { fn create_topic(&mut self, topic: Topic, partitions: u16) -> Result<(), TopicExists> { if self.topics.contains_key(&topic) { return Err(TopicExists); diff --git a/rust_snuba/rust_arroyo/src/backends/storages/mod.rs b/rust_snuba/rust_arroyo/src/backends/storages/mod.rs index 2ac3caaace..d4789e5d1e 100755 --- a/rust_snuba/rust_arroyo/src/backends/storages/mod.rs +++ b/rust_snuba/rust_arroyo/src/backends/storages/mod.rs @@ -21,7 +21,7 @@ pub enum ConsumeError { OffsetOutOfRange, } -pub trait MessageStorage { +pub trait MessageStorage: Send { // Create a topic with the given number of partitions. // // If the topic already exists, a ``TopicExists`` exception will be diff --git a/rust_snuba/rust_arroyo/src/processing/mod.rs b/rust_snuba/rust_arroyo/src/processing/mod.rs index f521cc4cc2..46d2360262 100644 --- a/rust_snuba/rust_arroyo/src/processing/mod.rs +++ b/rust_snuba/rust_arroyo/src/processing/mod.rs @@ -94,8 +94,8 @@ impl Callbacks { /// instance and a ``ProcessingStrategy``, ensuring that processing /// strategies are instantiated on partition assignment and closed on /// partition revocation. -pub struct StreamProcessor<'a, TPayload: Clone> { - consumer: Box + 'a>, +pub struct StreamProcessor { + consumer: Arc>>, strategies: Arc>>, message: Option>, processor_handle: ProcessorHandle, @@ -104,9 +104,9 @@ pub struct StreamProcessor<'a, TPayload: Clone> { metrics_buffer: metrics_buffer::MetricsBuffer, } -impl<'a, TPayload: 'static + Clone> StreamProcessor<'a, TPayload> { +impl StreamProcessor { pub fn new( - consumer: Box + 'a>, + consumer: Arc>>, processing_factory: Box>, ) -> Self { let strategies = Arc::new(Mutex::new(Strategies { @@ -130,14 +130,23 @@ impl<'a, TPayload: 'static + Clone> StreamProcessor<'a, TPayload> { pub fn subscribe(&mut self, topic: Topic) { let callbacks: Box = Box::new(Callbacks::new(self.strategies.clone())); - self.consumer.subscribe(&[topic], callbacks).unwrap(); + self.consumer + .lock() + .unwrap() + .subscribe(&[topic], callbacks) + .unwrap(); } pub fn run_once(&mut self) -> Result<(), RunError> { if self.is_paused { // If the consumer waas paused, it should not be returning any messages // on ``poll``. - let res = self.consumer.poll(Some(Duration::ZERO)).unwrap(); + let res = self + .consumer + .lock() + .unwrap() + .poll(Some(Duration::ZERO)) + .unwrap(); match res { None => {} @@ -148,7 +157,12 @@ impl<'a, TPayload: 'static + Clone> StreamProcessor<'a, TPayload> { // even if there is no active assignment and/or processing strategy. let poll_start = Instant::now(); //TODO: Support errors properly - match self.consumer.poll(Some(Duration::from_secs(1))) { + match self + .consumer + .lock() + .unwrap() + .poll(Some(Duration::from_secs(1))) + { Ok(msg) => { self.message = msg.map(|inner| Message { inner_message: InnerMessage::BrokerMessage(inner), @@ -174,8 +188,12 @@ impl<'a, TPayload: 'static + Clone> StreamProcessor<'a, TPayload> { match commit_request { Ok(None) => {} Ok(Some(request)) => { - self.consumer.stage_offsets(request.positions).unwrap(); - self.consumer.commit_offsets().unwrap(); + self.consumer + .lock() + .unwrap() + .stage_offsets(request.positions) + .unwrap(); + self.consumer.lock().unwrap().commit_offsets().unwrap(); } Err(e) => { println!("TODOO: Handle invalid message {:?}", e); @@ -194,10 +212,17 @@ impl<'a, TPayload: 'static + Clone> StreamProcessor<'a, TPayload> { Ok(()) => { // Resume if we are currently in a paused state if self.is_paused { - let partitions: std::collections::HashSet = - self.consumer.tell().unwrap().keys().cloned().collect(); - - let res = self.consumer.resume(partitions); + let partitions: std::collections::HashSet = self + .consumer + .lock() + .unwrap() + .tell() + .unwrap() + .keys() + .cloned() + .collect(); + + let res = self.consumer.lock().unwrap().resume(partitions); match res { Ok(()) => { self.is_paused = false; @@ -236,10 +261,17 @@ impl<'a, TPayload: 'static + Clone> StreamProcessor<'a, TPayload> { log::warn!("Consumer is in backpressure state for more than 1 second, pausing",); - let partitions = - self.consumer.tell().unwrap().keys().cloned().collect(); - - let res = self.consumer.pause(partitions); + let partitions = self + .consumer + .lock() + .unwrap() + .tell() + .unwrap() + .keys() + .cloned() + .collect(); + + let res = self.consumer.lock().unwrap().pause(partitions); match res { Ok(()) => { self.is_paused = true; @@ -279,7 +311,7 @@ impl<'a, TPayload: 'static + Clone> StreamProcessor<'a, TPayload> { } } drop(trait_callbacks); // unlock mutex so we can close consumer - self.consumer.close(); + self.consumer.lock().unwrap().close(); return Err(e); } } @@ -293,11 +325,11 @@ impl<'a, TPayload: 'static + Clone> StreamProcessor<'a, TPayload> { } pub fn shutdown(&mut self) { - self.consumer.close(); + self.consumer.lock().unwrap().close(); } pub fn tell(self) -> HashMap { - self.consumer.tell().unwrap() + self.consumer.lock().unwrap().tell().unwrap() } } @@ -313,6 +345,7 @@ mod tests { use crate::types::{Message, Partition, Topic}; use crate::utils::clock::SystemClock; use std::collections::HashMap; + use std::sync::{Arc, Mutex}; use std::time::Duration; use uuid::Uuid; @@ -366,13 +399,13 @@ mod tests { #[test] fn test_processor() { - let mut broker = build_broker(); - let consumer = Box::new(LocalConsumer::new( + let broker = build_broker(); + let consumer = Arc::new(Mutex::new(LocalConsumer::new( Uuid::nil(), - &mut broker, + broker, "test_group".to_string(), false, - )); + ))); let mut processor = StreamProcessor::new(consumer, Box::new(TestFactory {})); processor.subscribe(Topic { @@ -395,12 +428,12 @@ mod tests { let _ = broker.produce(&partition, "message1".to_string()); let _ = broker.produce(&partition, "message2".to_string()); - let consumer = Box::new(LocalConsumer::new( + let consumer = Arc::new(Mutex::new(LocalConsumer::new( Uuid::nil(), - &mut broker, + broker, "test_group".to_string(), false, - )); + ))); let mut processor = StreamProcessor::new(consumer, Box::new(TestFactory {})); processor.subscribe(Topic { diff --git a/rust_snuba/rust_arroyo/src/utils/clock.rs b/rust_snuba/rust_arroyo/src/utils/clock.rs index 0b161dec2a..26f84fb78c 100644 --- a/rust_snuba/rust_arroyo/src/utils/clock.rs +++ b/rust_snuba/rust_arroyo/src/utils/clock.rs @@ -1,7 +1,7 @@ use std::thread::sleep; use std::time::{Duration, SystemTime}; -pub trait Clock { +pub trait Clock: Send { fn time(&self) -> SystemTime; fn sleep(self, duration: Duration); diff --git a/rust_snuba/src/consumer.rs b/rust_snuba/src/consumer.rs index 188a9b675a..ccdfb50857 100644 --- a/rust_snuba/src/consumer.rs +++ b/rust_snuba/src/consumer.rs @@ -1,4 +1,5 @@ use std::collections::HashMap; +use std::sync::{Arc, Mutex}; use std::time::Duration; use chrono::{DateTime, NaiveDateTime, Utc}; @@ -123,9 +124,8 @@ pub fn consumer_impl( Some(broker_config), ); - let consumer = Box::new(KafkaConsumer::new(config)); + let consumer = Arc::new(Mutex::new(KafkaConsumer::new(config))); let logical_topic_name = consumer_config.raw_topic.logical_topic_name; - let mut processor = StreamProcessor::new( consumer, Box::new(ConsumerStrategyFactory::new(