Skip to content

Commit

Permalink
Add a Rust HealthCheck strategy (#5117)
Browse files Browse the repository at this point in the history
Similar to the python functionality, this just bumps the file timestamp in a regular interval.
  • Loading branch information
Swatinem authored Nov 30, 2023
1 parent 4d71a78 commit 388639f
Show file tree
Hide file tree
Showing 8 changed files with 148 additions and 3 deletions.
26 changes: 24 additions & 2 deletions rust_snuba/Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions rust_snuba/benches/processors.rs
Original file line number Diff line number Diff line change
Expand Up @@ -153,6 +153,7 @@ fn create_factory(
concurrency,
None,
true,
None,
);
Box::new(factory)
}
Expand Down
1 change: 1 addition & 0 deletions rust_snuba/rust_arroyo/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ edition = "2021"
chrono = "0.4.26"
clap = "2.18.0"
env_logger = "0.9.0"
filetime = "0.2.22"
futures = "0.3.21"
once_cell = "1.18.0"
rdkafka = { git = "https://github.com/getsentry/rust-rdkafka", branch = "base-consumer", features = ["cmake-build"] }
Expand Down
75 changes: 75 additions & 0 deletions rust_snuba/rust_arroyo/src/processing/strategies/healthcheck.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,75 @@
use std::path::PathBuf;
use std::time::{Duration, SystemTime};

use crate::processing::strategies::{
CommitRequest, InvalidMessage, ProcessingStrategy, SubmitError,
};
use crate::types::Message;
use crate::utils::metrics::{get_metrics, BoxMetrics};

const TOUCH_INTERVAL: Duration = Duration::from_secs(1);

pub struct HealthCheck<Next> {
next_step: Next,
path: PathBuf,
interval: Duration,
deadline: SystemTime,
metrics: BoxMetrics,
}

impl<Next> HealthCheck<Next> {
pub fn new(next_step: Next, path: impl Into<PathBuf>) -> Self {
let interval = TOUCH_INTERVAL;
let deadline = SystemTime::now() + interval;

Self {
next_step,
path: path.into(),
interval,
deadline,
metrics: get_metrics(),
}
}

fn maybe_touch_file(&mut self) {
let now = SystemTime::now();
if now < self.deadline {
return;
}
if let Err(err) = filetime::set_file_mtime(&self.path, filetime::FileTime::now()) {
let error: &dyn std::error::Error = &err;
tracing::error!(error);
}

self.metrics
.increment("arroyo.processing.strategies.healthcheck.touch", 1, None);
self.deadline = now + self.interval;
}
}

impl<TPayload, Next> ProcessingStrategy<TPayload> for HealthCheck<Next>
where
Next: ProcessingStrategy<TPayload> + 'static,
{
fn poll(&mut self) -> Result<Option<CommitRequest>, InvalidMessage> {
self.maybe_touch_file();

self.next_step.poll()
}

fn submit(&mut self, message: Message<TPayload>) -> Result<(), SubmitError<TPayload>> {
self.next_step.submit(message)
}

fn close(&mut self) {
self.next_step.close()
}

fn terminate(&mut self) {
self.next_step.terminate()
}

fn join(&mut self, timeout: Option<Duration>) -> Result<Option<CommitRequest>, InvalidMessage> {
self.next_step.join(timeout)
}
}
23 changes: 23 additions & 0 deletions rust_snuba/rust_arroyo/src/processing/strategies/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ use std::collections::HashMap;
use std::time::Duration;

pub mod commit_offsets;
pub mod healthcheck;
pub mod produce;
pub mod reduce;
pub mod run_task;
Expand Down Expand Up @@ -117,6 +118,28 @@ pub trait ProcessingStrategy<TPayload>: Send + Sync {
fn join(&mut self, timeout: Option<Duration>) -> Result<Option<CommitRequest>, InvalidMessage>;
}

impl<TPayload, S: ProcessingStrategy<TPayload> + ?Sized> ProcessingStrategy<TPayload> for Box<S> {
fn poll(&mut self) -> Result<Option<CommitRequest>, InvalidMessage> {
(**self).poll()
}

fn submit(&mut self, message: Message<TPayload>) -> Result<(), SubmitError<TPayload>> {
(**self).submit(message)
}

fn close(&mut self) {
(**self).close()
}

fn terminate(&mut self) {
(**self).terminate()
}

fn join(&mut self, timeout: Option<Duration>) -> Result<Option<CommitRequest>, InvalidMessage> {
(**self).join(timeout)
}
}

pub trait ProcessingStrategyFactory<TPayload>: Send + Sync {
/// Instantiate and return a ``ProcessingStrategy`` instance.
///
Expand Down
5 changes: 5 additions & 0 deletions rust_snuba/src/consumer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ pub fn consumer(
concurrency: usize,
use_rust_processor: bool,
python_max_queue_depth: Option<usize>,
health_check_file: Option<&str>,
) {
py.allow_threads(|| {
consumer_impl(
Expand All @@ -44,10 +45,12 @@ pub fn consumer(
concurrency,
use_rust_processor,
python_max_queue_depth,
health_check_file,
)
});
}

#[allow(clippy::too_many_arguments)]
pub fn consumer_impl(
consumer_group: &str,
auto_offset_reset: &str,
Expand All @@ -56,6 +59,7 @@ pub fn consumer_impl(
concurrency: usize,
use_rust_processor: bool,
python_max_queue_depth: Option<usize>,
health_check_file: Option<&str>,
) {
setup_logging();

Expand Down Expand Up @@ -151,6 +155,7 @@ pub fn consumer_impl(
ConcurrencyConfig::new(concurrency),
python_max_queue_depth,
use_rust_processor,
health_check_file.map(ToOwned::to_owned),
)),
dlq_policy,
);
Expand Down
12 changes: 11 additions & 1 deletion rust_snuba/src/factory.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ use std::time::Duration;

use rust_arroyo::backends::kafka::types::KafkaPayload;
use rust_arroyo::processing::strategies::commit_offsets::CommitOffsets;
use rust_arroyo::processing::strategies::healthcheck::HealthCheck;
use rust_arroyo::processing::strategies::reduce::Reduce;
use rust_arroyo::processing::strategies::run_task_in_threads::ConcurrencyConfig;
use rust_arroyo::processing::strategies::{ProcessingStrategy, ProcessingStrategyFactory};
Expand All @@ -23,6 +24,7 @@ pub struct ConsumerStrategyFactory {
concurrency: ConcurrencyConfig,
python_max_queue_depth: Option<usize>,
use_rust_processor: bool,
health_check_file: Option<String>,
}

impl ConsumerStrategyFactory {
Expand All @@ -36,6 +38,7 @@ impl ConsumerStrategyFactory {
concurrency: ConcurrencyConfig,
python_max_queue_depth: Option<usize>,
use_rust_processor: bool,
health_check_file: Option<String>,
) -> Self {
Self {
storage_config,
Expand All @@ -46,6 +49,7 @@ impl ConsumerStrategyFactory {
concurrency,
python_max_queue_depth,
use_rust_processor,
health_check_file,
}
}
}
Expand All @@ -72,7 +76,7 @@ impl ProcessingStrategyFactory<KafkaPayload> for ConsumerStrategyFactory {
self.max_batch_time,
);

match (
let processor = match (
self.use_rust_processor,
processors::get_processing_function(
&self.storage_config.message_processor.python_class_name,
Expand All @@ -94,6 +98,12 @@ impl ProcessingStrategyFactory<KafkaPayload> for ConsumerStrategyFactory {
)
.unwrap(),
),
};

if let Some(path) = &self.health_check_file {
Box::new(HealthCheck::new(processor, path))
} else {
processor
}
}
}
8 changes: 8 additions & 0 deletions snuba/cli/rust_consumer.py
Original file line number Diff line number Diff line change
Expand Up @@ -112,6 +112,12 @@
default=None,
help="How many messages should be queued up in the Python message processor before backpressure kicks in. Defaults to the number of processes.",
)
@click.option(
"--health-check-file",
default=None,
type=str,
help="Arroyo will touch this file at intervals to indicate health. If not provided, no health check is performed.",
)
def rust_consumer(
*,
storage_names: Sequence[str],
Expand All @@ -132,6 +138,7 @@ def rust_consumer(
use_rust_processor: bool,
group_instance_id: Optional[str],
python_max_queue_depth: Optional[int],
health_check_file: Optional[str],
) -> None:
"""
Experimental alternative to `snuba consumer`
Expand Down Expand Up @@ -173,4 +180,5 @@ def rust_consumer(
concurrency_override or concurrency or 1,
use_rust_processor,
python_max_queue_depth,
health_check_file,
)

0 comments on commit 388639f

Please sign in to comment.