Merge branch 'master' into jferg/add-replay-id-column
JoshFerge authored Apr 17, 2024
2 parents a1086b5 + 2f1509f commit 36023cc
Showing 15 changed files with 310 additions and 9 deletions.
4 changes: 2 additions & 2 deletions .vscode/settings.json
@@ -57,13 +57,13 @@
},

"python.linting.mypyEnabled": true,
"python.linting.mypyArgs": ["--strict"],
"python.linting.flake8Enabled": true,
"python.formatting.provider": "black",
// https://github.com/DonJayamanne/pythonVSCode/issues/992
"python.pythonPath": ".venv/bin/python",
// test discovery is sluggish and the UI around running
// tests is often in your way and misclicked
"python.testing.pytestEnabled": true,
"python.testing.pytestArgs": ["tests"]
"python.testing.pytestArgs": ["tests"],
"mypy-type-checker.args": ["--strict"]
}
31 changes: 31 additions & 0 deletions CHANGELOG.md
@@ -1,5 +1,36 @@
# Changelog

## 24.4.0

### Various fixes & improvements

- chore(on-call): Add metric for concurrent queries by referrer that violate policy (#5767) by @enochtangg
- ref: make auto.offset.reset=earliest everywhere (#5765) by @lynnagara
- feat(trace): Add trace id to transactions (#5768) by @wmak
- fix(spans): Move span id above trace id in the prewhere (#5766) by @wmak
- feat(meta): Add record meta column to gauges (#5760) by @evanh
- feat(meta): Add record meta column to distributions (#5759) by @evanh
- feat(meta): Add record_meta column to sets (#5735) by @evanh
- ref: bump sentry-kafka-schemas to 0.1.68 (#5764) by @getsentry-bot
- chore(capman): set default bytes scanned limit for rejecting policy (#5755) by @volokluev
- ref(ch-upgrades): create dist tables functionality (#5737) by @MeredithAnya
- perf(metrics): Use kafka header optimization (#5756) by @nikhars
- feat(meta): Add updated versions of meta tables for counters (#5734) by @evanh
- chore(rust): Update dependencies (#5751) by @nikhars
- chore: update sdk version (#5754) by @kylemumma
- fix: Fix default auto-offset-reset value (#5753) by @lynnagara
- lower max query size to 128KiB (#5750) by @enochtangg
- clean up old simple pipeline (#5732) by @enochtangg
- feat(capman): Long term rejection allocation policy (#5718) by @volokluev
- ref(fetcher): --tables optional now (#5730) by @MeredithAnya
- fix: parser, bug in pushdown filter (#5731) by @kylemumma
- feat(generic-metrics): Add a killswitch to processor (#5617) by @ayirr7
- feat(replays): Replace python processor with a rust-based processor (#5380) by @cmanallen
- feat(replay): add ReplayViewedEvent to replay processor (#5712) by @aliu3ntry
- chore(deps): bump h2 from 0.3.22 to 0.3.26 in /rust_snuba (#5727) by @dependabot

_Plus 52 more_

## 24.3.0

### Various fixes & improvements
2 changes: 1 addition & 1 deletion docs/source/conf.py
@@ -7,7 +7,7 @@
copyright = "2021, Sentry Team and Contributors"
author = "Sentry Team and Contributors"

release = "24.4.0.dev0"
release = "24.5.0.dev0"


# -- General configuration ---------------------------------------------------
1 change: 1 addition & 0 deletions rust_snuba/benches/processors.rs
@@ -96,6 +96,7 @@ fn create_factory(
logical_topic_name: "shared-resources-usage".to_string(),
broker_config: BrokerConfig::default(),
},
None,
);
Box::new(factory)
}
4 changes: 4 additions & 0 deletions rust_snuba/src/consumer.rs
@@ -39,6 +39,7 @@ pub fn consumer(
max_poll_interval_ms: usize,
python_max_queue_depth: Option<usize>,
health_check_file: Option<&str>,
stop_at_timestamp: Option<i64>,
) {
py.allow_threads(|| {
consumer_impl(
@@ -53,6 +54,7 @@
max_poll_interval_ms,
python_max_queue_depth,
health_check_file,
stop_at_timestamp,
)
});
}
@@ -70,6 +72,7 @@ pub fn consumer_impl(
max_poll_interval_ms: usize,
python_max_queue_depth: Option<usize>,
health_check_file: Option<&str>,
stop_at_timestamp: Option<i64>,
) -> usize {
setup_logging();

@@ -217,6 +220,7 @@ pub fn consumer_impl(
consumer_group.to_owned(),
Topic::new(&consumer_config.raw_topic.physical_topic_name),
consumer_config.accountant_topic,
stop_at_timestamp,
);

let topic = Topic::new(&consumer_config.raw_topic.physical_topic_name);
5 changes: 5 additions & 0 deletions rust_snuba/src/factory.rs
@@ -54,6 +54,7 @@ pub struct ConsumerStrategyFactory {
physical_consumer_group: String,
physical_topic_name: Topic,
accountant_topic_config: config::TopicConfig,
stop_at_timestamp: Option<i64>,
}

impl ConsumerStrategyFactory {
@@ -78,6 +79,7 @@
physical_consumer_group: String,
physical_topic_name: Topic,
accountant_topic_config: config::TopicConfig,
stop_at_timestamp: Option<i64>,
) -> Self {
Self {
storage_config,
@@ -99,6 +101,7 @@
physical_consumer_group,
physical_topic_name,
accountant_topic_config,
stop_at_timestamp,
}
}
}
@@ -220,6 +223,7 @@ impl ProcessingStrategyFactory<KafkaPayload> for ConsumerStrategyFactory {
config::ProcessorConfig {
env_config: self.env_config.clone(),
},
self.stop_at_timestamp,
);
}
(true, Some(processors::ProcessingFunctionType::ProcessingFunction(func))) => {
@@ -232,6 +236,7 @@
config::ProcessorConfig {
env_config: self.env_config.clone(),
},
self.stop_at_timestamp,
)
}
(
42 changes: 40 additions & 2 deletions rust_snuba/src/strategies/processor.rs
@@ -26,6 +26,7 @@ pub fn make_rust_processor(
enforce_schema: bool,
concurrency: &ConcurrencyConfig,
processor_config: ProcessorConfig,
stop_at_timestamp: Option<i64>,
) -> Box<dyn ProcessingStrategy<KafkaPayload>> {
let schema = get_schema(schema_name, enforce_schema);

@@ -34,7 +35,18 @@
partition: Partition,
offset: u64,
timestamp: DateTime<Utc>,
stop_at_timestamp: Option<i64>,
) -> anyhow::Result<Message<BytesInsertBatch<RowData>>> {
// Don't process any more messages
if let Some(stop) = stop_at_timestamp {
if stop < timestamp.timestamp() {
let payload = BytesInsertBatch::default();
return Ok(Message::new_broker_message(
payload, partition, offset, timestamp,
));
}
}

let payload = BytesInsertBatch::new(
transformed.rows,
Some(timestamp),
@@ -62,6 +74,7 @@
func,
result_to_next_msg,
processor_config,
stop_at_timestamp,
};

Box::new(RunTaskInThreads::new(
@@ -79,6 +92,7 @@ pub fn make_rust_processor_with_replacements(
enforce_schema: bool,
concurrency: &ConcurrencyConfig,
processor_config: ProcessorConfig,
stop_at_timestamp: Option<i64>,
) -> Box<dyn ProcessingStrategy<KafkaPayload>> {
let schema = get_schema(schema_name, enforce_schema);

@@ -87,7 +101,21 @@
partition: Partition,
offset: u64,
timestamp: DateTime<Utc>,
stop_at_timestamp: Option<i64>,
) -> anyhow::Result<Message<InsertOrReplacement<BytesInsertBatch<RowData>>>> {
// Don't process any more messages
if let Some(stop) = stop_at_timestamp {
if stop < timestamp.timestamp() {
let payload = BytesInsertBatch::default();
return Ok(Message::new_broker_message(
InsertOrReplacement::Insert(payload),
partition,
offset,
timestamp,
));
}
}

let payload = match transformed {
InsertOrReplacement::Insert(transformed) => {
InsertOrReplacement::Insert(BytesInsertBatch::new(
@@ -120,6 +148,7 @@
func,
result_to_next_msg,
processor_config,
stop_at_timestamp,
};

Box::new(RunTaskInThreads::new(
@@ -155,9 +184,11 @@ struct MessageProcessor<TResult: Clone, TNext: Clone> {
fn(KafkaPayload, KafkaMessageMetadata, config: &ProcessorConfig) -> anyhow::Result<TResult>,
// Function that return Message<TNext> to be passed to the next strategy. Gets passed TResult,
// as well as the message's partition, offset and timestamp.
#[allow(clippy::type_complexity)]
result_to_next_msg:
fn(TResult, Partition, u64, DateTime<Utc>) -> anyhow::Result<Message<TNext>>,
fn(TResult, Partition, u64, DateTime<Utc>, Option<i64>) -> anyhow::Result<Message<TNext>>,
processor_config: ProcessorConfig,
stop_at_timestamp: Option<i64>,
}

impl<TResult: Clone, TNext: Clone> MessageProcessor<TResult, TNext> {
@@ -216,7 +247,13 @@ impl<TResult: Clone, TNext: Clone> MessageProcessor<TResult, TNext> {

let transformed = (self.func)(msg.payload, metadata, &self.processor_config)?;

(self.result_to_next_msg)(transformed, msg.partition, msg.offset, msg.timestamp)
(self.result_to_next_msg)(
transformed,
msg.partition,
msg.offset,
msg.timestamp,
self.stop_at_timestamp,
)
}
}

@@ -329,6 +366,7 @@ mod tests {
true,
&concurrency,
ProcessorConfig::default(),
None,
);

let example = "{
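Taken together, the two hunks above add the same guard to `make_rust_processor` and `make_rust_processor_with_replacements`: once a message's timestamp passes `stop_at_timestamp`, the processor emits an empty `BytesInsertBatch` instead of transformed rows. A minimal standalone sketch of that cutoff check, using plain unix timestamps in place of the `chrono` types (function and variable names here are illustrative, not from the commit):

```rust
// Sketch of the stop-at-timestamp guard, reduced to plain i64 unix timestamps.
// Returns true when the message should be replaced with an empty batch.
fn past_cutoff(stop_at_timestamp: Option<i64>, message_timestamp: i64) -> bool {
    match stop_at_timestamp {
        // Mirrors `stop < timestamp.timestamp()` in the diff:
        // only strictly-later messages are skipped.
        Some(stop) => stop < message_timestamp,
        // No cutoff configured: always process.
        None => false,
    }
}

fn main() {
    let cutoff = Some(1_713_312_000); // 2024-04-17 00:00:00 UTC
    assert!(!past_cutoff(cutoff, 1_713_311_999)); // before the cutoff: processed
    assert!(!past_cutoff(cutoff, 1_713_312_000)); // exactly at the cutoff: still processed
    assert!(past_cutoff(cutoff, 1_713_312_001)); // after the cutoff: skipped
    assert!(!past_cutoff(None, i64::MAX)); // no cutoff: never skipped
}
```

Note that a skipped message still yields a `Message` carrying the original partition, offset, and timestamp, so offset commits keep advancing even though nothing is written.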
2 changes: 1 addition & 1 deletion setup.py
@@ -2,7 +2,7 @@

from setuptools import find_packages, setup

VERSION = "24.4.0.dev0"
VERSION = "24.5.0.dev0"


def get_requirements() -> Sequence[str]:
9 changes: 8 additions & 1 deletion snuba/cli/rust_consumer.py
@@ -105,7 +105,7 @@
"--skip-write/--no-skip-write",
"skip_write",
help="Skip the write to clickhouse",
default=True,
default=False,
)
@click.option(
"--concurrency",
@@ -148,6 +148,11 @@
default=False,
help="Enforce schema on the raw events topic.",
)
@click.option(
"--stop-at-timestamp",
type=int,
help="Unix timestamp after which to stop processing messages",
)
def rust_consumer(
*,
storage_names: Sequence[str],
@@ -174,6 +179,7 @@ def rust_consumer(
python_max_queue_depth: Optional[int],
health_check_file: Optional[str],
enforce_schema: bool,
stop_at_timestamp: Optional[int],
) -> None:
"""
Experimental alternative to `snuba consumer`
@@ -215,6 +221,7 @@ def rust_consumer(
max_poll_interval_ms,
python_max_queue_depth,
health_check_file,
stop_at_timestamp,
)

sys.exit(exitcode)
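With the option wired through from Click to the Rust consumer, a bounded run can be expressed entirely from the CLI. A hypothetical invocation (only `--stop-at-timestamp` is taken from this diff; the other flags and their exact spellings are assumptions) might look like `snuba rust-consumer --storage errors --consumer-group my-backfill --stop-at-timestamp 1713312000`, after which messages stamped later than the cutoff are no longer written.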
11 changes: 11 additions & 0 deletions snuba/datasets/configuration/events/storages/errors.yaml
@@ -268,6 +268,17 @@ allocation_policies:
- referrer
default_config_overrides:
is_enforced: 0
- name: CrossOrgQueryAllocationPolicy
args:
required_tenant_types:
- referrer
default_config_overrides:
is_enforced: 0
is_active: 0
cross_org_referrer_limits:
getsentry.tasks.backfill_grouping_records:
max_threads: 2
concurrent_limit: 4

query_processors:
- processor: UniqInSelectAndHavingProcessor
11 changes: 11 additions & 0 deletions snuba/datasets/configuration/events/storages/errors_ro.yaml
@@ -266,6 +266,17 @@ allocation_policies:
- referrer
default_config_overrides:
is_enforced: 0
- name: CrossOrgQueryAllocationPolicy
args:
required_tenant_types:
- referrer
default_config_overrides:
is_enforced: 0
is_active: 0
cross_org_referrer_limits:
getsentry.tasks.backfill_grouping_records:
max_threads: 2
concurrent_limit: 4


query_processors:
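As in `errors.yaml`, the `CrossOrgQueryAllocationPolicy` entry added here ships disabled (`is_active: 0`, `is_enforced: 0`). Once enabled, it would cap the `getsentry.tasks.backfill_grouping_records` referrer at 4 concurrent queries, presumably with 2 ClickHouse threads each (reading `max_threads` as the per-query thread grant; the policy's exact semantics are not shown in this diff).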
1 change: 1 addition & 0 deletions snuba/migrations/group_loader.py
@@ -331,6 +331,7 @@ def get_migrations(self) -> Sequence[str]:
"0037_add_record_meta_column_sets",
"0038_add_record_meta_column_distributions",
"0039_add_record_meta_column_gauges",
"0040_remove_counters_meta_tables",
]


_3 more changed files not shown._
