feat(eap): Mutations consumer MVP #6216

Merged Sep 25, 2024 · 23 commits

Changes from 7 commits
86 changes: 60 additions & 26 deletions rust_snuba/src/consumer.rs
@@ -21,6 +21,7 @@ use crate::factory::ConsumerStrategyFactory;
use crate::logging::{setup_logging, setup_sentry};
use crate::metrics::global_tags::set_global_tag;
use crate::metrics::statsd::StatsDBackend;
use crate::mutations_factory::MutConsumerStrategyFactory;
use crate::processors;
use crate::types::{InsertOrReplacement, KafkaMessageMetadata};

@@ -38,6 +39,7 @@ pub fn consumer(
enforce_schema: bool,
max_poll_interval_ms: usize,
async_inserts: bool,
allow_mutability: bool,
python_max_queue_depth: Option<usize>,
health_check_file: Option<&str>,
stop_at_timestamp: Option<i64>,
@@ -61,6 +63,7 @@ pub fn consumer(
stop_at_timestamp,
batch_write_timeout_ms,
max_bytes_before_external_group_by,
allow_mutability,
)
});
}
@@ -82,6 +85,7 @@ pub fn consumer_impl(
stop_at_timestamp: Option<i64>,
batch_write_timeout_ms: Option<u64>,
max_bytes_before_external_group_by: Option<usize>,
allow_mutability: bool,
) -> usize {
setup_logging();

@@ -228,33 +232,63 @@ pub fn consumer_impl(
None
};

let factory = ConsumerStrategyFactory {
storage_config: first_storage,
env_config,
logical_topic_name,
max_batch_size,
max_batch_time,
processing_concurrency: ConcurrencyConfig::new(concurrency),
clickhouse_concurrency: ConcurrencyConfig::new(clickhouse_concurrency),
commitlog_concurrency: ConcurrencyConfig::new(2),
replacements_concurrency: ConcurrencyConfig::new(4),
async_inserts,
python_max_queue_depth,
use_rust_processor,
health_check_file: health_check_file.map(ToOwned::to_owned),
enforce_schema,
commit_log_producer,
replacements_config,
physical_consumer_group: consumer_group.to_owned(),
physical_topic_name: Topic::new(&consumer_config.raw_topic.physical_topic_name),
accountant_topic_config: consumer_config.accountant_topic,
stop_at_timestamp,
batch_write_timeout,
max_bytes_before_external_group_by,
};

let topic = Topic::new(&consumer_config.raw_topic.physical_topic_name);
let processor = StreamProcessor::with_kafka(config, factory, topic, dlq_policy);

let processor = if allow_mutability {
let mut_factory = MutConsumerStrategyFactory {
storage_config: first_storage,
env_config,
logical_topic_name,
max_batch_size,
max_batch_time,
processing_concurrency: ConcurrencyConfig::new(concurrency),
clickhouse_concurrency: ConcurrencyConfig::new(clickhouse_concurrency),
commitlog_concurrency: ConcurrencyConfig::new(2),
replacements_concurrency: ConcurrencyConfig::new(4),
async_inserts,
python_max_queue_depth,
use_rust_processor,
health_check_file: health_check_file.map(ToOwned::to_owned),
enforce_schema,
commit_log_producer,
replacements_config,
physical_consumer_group: consumer_group.to_owned(),
physical_topic_name: Topic::new(&consumer_config.raw_topic.physical_topic_name),
accountant_topic_config: consumer_config.accountant_topic,
stop_at_timestamp,
batch_write_timeout,
max_bytes_before_external_group_by,
};

StreamProcessor::with_kafka(config, mut_factory, topic, dlq_policy)
} else {
let factory = ConsumerStrategyFactory {
storage_config: first_storage,
env_config,
logical_topic_name,
max_batch_size,
max_batch_time,
processing_concurrency: ConcurrencyConfig::new(concurrency),
clickhouse_concurrency: ConcurrencyConfig::new(clickhouse_concurrency),
commitlog_concurrency: ConcurrencyConfig::new(2),
replacements_concurrency: ConcurrencyConfig::new(4),
async_inserts,
python_max_queue_depth,
use_rust_processor,
health_check_file: health_check_file.map(ToOwned::to_owned),
enforce_schema,
commit_log_producer,
replacements_config,
physical_consumer_group: consumer_group.to_owned(),
physical_topic_name: Topic::new(&consumer_config.raw_topic.physical_topic_name),
accountant_topic_config: consumer_config.accountant_topic,
stop_at_timestamp,
batch_write_timeout,
max_bytes_before_external_group_by,
};

StreamProcessor::with_kafka(config, factory, topic, dlq_policy)
};

let mut handle = processor.get_handle();

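A note on the branch above: StreamProcessor::with_kafka is generic over the factory type, and MutConsumerStrategyFactory and ConsumerStrategyFactory are distinct concrete types, so each arm of the if must construct the processor itself; the two arms otherwise pass an identical field set. Below is a minimal sketch of that pattern with simplified stand-in types (Factory, build_processor, and the labels are illustrative, not snuba's API):

trait Factory {
    fn label(&self) -> &'static str;
}

struct MutFactory;
struct DefaultFactory;

impl Factory for MutFactory {
    fn label(&self) -> &'static str {
        "mutations"
    }
}

impl Factory for DefaultFactory {
    fn label(&self) -> &'static str {
        "default"
    }
}

// Stand-in for StreamProcessor::with_kafka: generic over the concrete
// factory type, so it is called once per branch rather than after the if.
fn build_processor<F: Factory>(factory: F) -> String {
    format!("processor using the {} factory", factory.label())
}

fn main() {
    let allow_mutability = true;
    // Mirrors consumer_impl: the flag picks the factory, and both
    // branches yield the same processor type.
    let processor = if allow_mutability {
        build_processor(MutFactory)
    } else {
        build_processor(DefaultFactory)
    };
    println!("{processor}");
}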
2 changes: 2 additions & 0 deletions rust_snuba/src/lib.rs
@@ -3,6 +3,7 @@ mod consumer;
mod factory;
mod logging;
mod metrics;
mod mutations_factory;
mod processors;
mod runtime_config;
mod strategies;
@@ -27,6 +28,7 @@ pub use config::{
};
pub use factory::ConsumerStrategyFactory;
pub use metrics::statsd::StatsDBackend;
pub use mutations_factory::MutConsumerStrategyFactory;
pub use processors::{ProcessingFunction, ProcessingFunctionType, PROCESSORS};
pub use strategies::noop::Noop;
pub use strategies::python::PythonTransformStep;
87 changes: 87 additions & 0 deletions rust_snuba/src/mutations_factory.rs
@@ -0,0 +1,87 @@
use std::collections::HashMap;
use std::sync::Arc;
use std::time::Duration;

use rust_arroyo::backends::kafka::config::KafkaConfig;
use rust_arroyo::backends::kafka::producer::KafkaProducer;
use rust_arroyo::backends::kafka::types::KafkaPayload;
use rust_arroyo::processing::strategies::commit_offsets::CommitOffsets;
use rust_arroyo::processing::strategies::run_task_in_threads::ConcurrencyConfig;
use rust_arroyo::processing::strategies::run_task_in_threads::{
RunTaskError, RunTaskFunc, TaskRunner,
};
use rust_arroyo::processing::strategies::{ProcessingStrategy, ProcessingStrategyFactory};
use rust_arroyo::types::Message;
use rust_arroyo::types::{Partition, Topic};
use sentry::{Hub, SentryFutureExt};
use sentry_kafka_schemas::Schema;

use crate::config;
use crate::metrics::global_tags::set_global_tag;
use crate::strategies::processor::validate_schema;

pub struct MutConsumerStrategyFactory {
pub storage_config: config::StorageConfig,
pub env_config: config::EnvConfig,
pub logical_topic_name: String,
pub max_batch_size: usize,
pub max_batch_time: Duration,
pub processing_concurrency: ConcurrencyConfig,
pub clickhouse_concurrency: ConcurrencyConfig,
pub commitlog_concurrency: ConcurrencyConfig,
pub replacements_concurrency: ConcurrencyConfig,
pub async_inserts: bool,
pub python_max_queue_depth: Option<usize>,
pub use_rust_processor: bool,
pub health_check_file: Option<String>,
pub enforce_schema: bool,
pub commit_log_producer: Option<(Arc<KafkaProducer>, Topic)>,
pub replacements_config: Option<(KafkaConfig, Topic)>,
pub physical_consumer_group: String,
pub physical_topic_name: Topic,
pub accountant_topic_config: config::TopicConfig,
pub stop_at_timestamp: Option<i64>,
pub batch_write_timeout: Option<Duration>,
pub max_bytes_before_external_group_by: Option<usize>,
}

impl ProcessingStrategyFactory<KafkaPayload> for MutConsumerStrategyFactory {
fn update_partitions(&self, partitions: &HashMap<Partition, u64>) {
match partitions.keys().map(|partition| partition.index).min() {
Some(min) => set_global_tag("min_partition".to_owned(), min.to_string()),
None => set_global_tag("min_partition".to_owned(), "none".to_owned()),
}
}

fn create(&self) -> Box<dyn ProcessingStrategy<KafkaPayload>> {
let next_step = CommitOffsets::new(Duration::from_secs(1));

Box::new(next_step)
}
}

#[derive(Clone)]
struct SchemaValidator {
schema: Option<Arc<Schema>>,
enforce_schema: bool,
}

impl SchemaValidator {
async fn process_message(
self,
message: Message<KafkaPayload>,
) -> Result<Message<KafkaPayload>, RunTaskError<anyhow::Error>> {
validate_schema(&message, &self.schema, self.enforce_schema)?;
Ok(message)
}
}

impl TaskRunner<KafkaPayload, KafkaPayload, anyhow::Error> for SchemaValidator {
fn get_task(&self, message: Message<KafkaPayload>) -> RunTaskFunc<KafkaPayload, anyhow::Error> {
Box::pin(
self.clone()
.process_message(message)
.bind_hub(Hub::new_from_top(Hub::current())),
)
}
}
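
In this MVP, create() only commits offsets on a one-second interval; SchemaValidator implements TaskRunner but is not yet wired into the strategy chain. As a rough sketch only — assuming rust_arroyo's RunTaskInThreads::new takes (next_step, task_runner, &concurrency, name), the shape the main ConsumerStrategyFactory uses — a later revision could run validation ahead of the commit step:

// Hypothetical create() body, not part of this PR. The schema would be
// resolved from logical_topic_name rather than hardcoded to None.
use rust_arroyo::processing::strategies::run_task_in_threads::RunTaskInThreads;

// Inside impl ProcessingStrategyFactory<KafkaPayload> for MutConsumerStrategyFactory:
fn create(&self) -> Box<dyn ProcessingStrategy<KafkaPayload>> {
    let validator = SchemaValidator {
        schema: None, // assumption: looked up from the logical topic name
        enforce_schema: self.enforce_schema,
    };
    Box::new(RunTaskInThreads::new(
        CommitOffsets::new(Duration::from_secs(1)),
        Box::new(validator),
        &self.processing_concurrency,
        Some("validate_schema"),
    ))
}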
12 changes: 11 additions & 1 deletion snuba/cli/rust_consumer.py
@@ -140,6 +140,14 @@
default=False,
help="Enable async inserts for ClickHouse",
)
@click.option(
    "--allow-mutability",
    is_flag=True,
    default=False,
    help="Only to be used for the mutations consumer",
)
@click.option(
"--health-check-file",
default=None,
@@ -203,7 +211,8 @@ def rust_consumer(
enforce_schema: bool,
stop_at_timestamp: Optional[int],
batch_write_timeout_ms: Optional[int],
max_bytes_before_external_group_by: Optional[int]
max_bytes_before_external_group_by: Optional[int],
allow_mutability: bool
) -> None:
"""
Experimental alternative to `snuba consumer`
@@ -250,6 +259,7 @@ def rust_consumer(
enforce_schema,
max_poll_interval_ms,
async_inserts,
allow_mutability,
python_max_queue_depth,
health_check_file,
stop_at_timestamp,
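For reference, a hypothetical invocation enabling the new flag. The option names other than --allow-mutability follow the existing snuba rust-consumer CLI, and the storage and group values are placeholders, not prescribed by this PR:

snuba rust-consumer \
  --storage eap_spans \
  --consumer-group eap-mutations \
  --allow-mutability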