
feat(eap): Mutations consumer MVP #6216

Merged
merged 23 commits into master from mutations-compat-factory on Sep 25, 2024

Conversation

@ayirr7 (Member) commented Aug 15, 2024

Rolling out the mutations consumer, which uses the general pattern of INSERT INTO .. SELECT FROM in ClickHouse to simultaneously insert updated rows and cancelled (negated) rows, so that a single mutation is reflected in the table.

This PR includes:

  • The Rust consumer entrypoint
  • The strategies that parse mutations and extract the updates to specific span attributes
  • The strategies that map updates to attributes of old data already in the table
  • The actual write to Clickhouse
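The write step described above can be sketched roughly as follows. This is an illustrative sketch only: the table name, column names, and the free function are hypothetical stand-ins (the real query construction lives in rust_snuba), and ClickHouse's `SELECT * REPLACE (...)` modifier stands in for the actual column mapping.

```rust
// Minimal sketch of the "INSERT INTO .. SELECT FROM" write step: rewrite
// the rows matched by the mutation's filter with the updated attribute
// values, inserting them back into the same table. All names here are
// hypothetical, not the real eap_spans schema.
pub fn build_update_insert(table: &str, set_clause: &str, filter: &str) -> String {
    format!(
        "INSERT INTO {table} SELECT * REPLACE ({set_clause}) FROM {table} WHERE {filter}"
    )
}
```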

@ayirr7 changed the title from "do not merge/wip" to "wip: Scaffolding for mutations consumer CLI" Aug 28, 2024
@ayirr7 requested a review from untitaker August 28, 2024 06:04
@ayirr7 changed the title from "wip: Scaffolding for mutations consumer CLI" to "wip: Scaffolding for mutations consumer" Sep 9, 2024
@untitaker self-requested a review September 9, 2024 19:23
rust_snuba/src/consumer.rs (review thread, resolved)
snuba/cli/rust_consumer.py (review thread, outdated, resolved)
rust_snuba/src/consumer.rs (review thread, outdated, resolved)
));
}

let mut body = format!(
A Member commented:

TODO: Write cancellation rows

@ayirr7 changed the title from "wip: Scaffolding for mutations consumer" to "wip: Mutations consumer MVP" Sep 16, 2024

let mut body = format!(
"
INSERT INTO {table}
ayirr7 (Member Author) commented:

@untitaker 2 thoughts on this query:

  1. We can rewrite this to avoid the JOIN by using a WITH clause. We would need to look into the performance differences.

  2. What if we tried a UNION to get both the cancellation and the update in this one query? (SELECT ... UNION SELECT ...)

I'm also wondering if there's a way to do something like INSERT INTO ... VALUES (SELECT ...) (SELECT ...) so we can have 2 insertions back to back?

Just some thoughts -- I can try these out on some local data
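For illustration, the UNION ALL idea from point 2 might look roughly like the sketch below. This is untested and uses hypothetical names; `SELECT * REPLACE (...)` is ClickHouse's modifier for overriding individual columns, and the real query in rust_snuba is built differently.

```rust
// Untested sketch: one INSERT whose SELECT yields both the cancellation
// rows (sign negated to -1) and the updated rows (new attribute values)
// via UNION ALL. Table and column names are illustrative stand-ins.
pub fn build_union_mutation(table: &str, set_clause: &str, filter: &str) -> String {
    format!(
        "INSERT INTO {table} \
         SELECT * REPLACE (-sign AS sign) FROM {table} WHERE {filter} \
         UNION ALL \
         SELECT * REPLACE ({set_clause}) FROM {table} WHERE {filter}"
    )
}
```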

untitaker (Member) replied:
I think WITH conflicts directly with input() in my tests. I was also trying to use WITH to make the cancellation rows happen (using both WITH and UNION ALL). I'm not sure how it would help to avoid the join?

If you have a specific query let's discuss 1:1 or just push it, if it works already.

> so we can have 2 insertions back to back?

then we should just merge those MutationBatches at the JSON level; I don't think the query needs to adapt to accommodate larger batches.
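The JSON-level merge being suggested could look like the sketch below. The struct shape is a hypothetical stand-in for the real MutationBatch type in rust_snuba.

```rust
// Hypothetical MutationBatch shape: merging two batches before building
// the query means a single INSERT serves both, so the SQL never has to
// grow extra insert branches for larger batches.
pub struct MutationBatch {
    pub rows: Vec<String>, // serialized JSON rows, e.g. fed to input()
}

impl MutationBatch {
    pub fn merge(mut self, other: MutationBatch) -> MutationBatch {
        self.rows.extend(other.rows);
        self
    }
}
```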

@ayirr7 (Member Author) commented Sep 17, 2024

From local testing:

  • OPTIMIZE TABLE ... FINAL for some reason clears out the table. I inserted one "normal" row and then one "updated" row; running OPTIMIZE somehow cancelled out the two, even though they had the same sign (+1).
  • Seeing 2 rows inserted into eap_spans_local instead of 1 when running snuba devserver

@untitaker (Member) commented:

@ayirr7 with the test command I gave you, it sends duplicate rows, and the rows contain timestamps that are immediately subjected to TTL.

@untitaker changed the title from "wip: Mutations consumer MVP" to "feat: Mutations consumer MVP" Sep 18, 2024
codecov bot commented Sep 18, 2024

❌ 1 test failed:

Tests completed: 2557 | Failed: 1 | Passed: 2556 | Skipped: 4

Top failed test by shortest run time: tests.utils.streams.test_topics::test_valid_topics (0.141s run time)

Stack trace:
Traceback (most recent call last):
  File ".../local/lib/python3.11....../site-packages/sentry_kafka_schemas/sentry_kafka_schemas.py", line 79, in get_topic
    with open(topic_path) as f:
         ^^^^^^^^^^^^^^^^
FileNotFoundError: [Errno 2] No such file or directory: '.../local/lib/python3.11.../sentry_kafka_schemas/topics/snuba-eap-mutations.yaml'

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File ".../utils/streams/test_topics.py", line 10, in test_valid_topics
    sentry_kafka_schemas.get_topic(
  File ".../local/lib/python3.11....../site-packages/sentry_kafka_schemas/sentry_kafka_schemas.py", line 82, in get_topic
    raise SchemaNotFound
sentry_kafka_schemas.sentry_kafka_schemas.SchemaNotFound


@ayirr7 changed the title from "feat: Mutations consumer MVP" to "feat(eap): Mutations consumer MVP" Sep 18, 2024
@ayirr7 (Member Author) commented Sep 18, 2024

I have published the new topic and its schema: getsentry/sentry-kafka-schemas#333. Will bump it in Snuba tomorrow.

As I see it, we'll have to modify the process_message function; we'll have to do something a bit more complicated than the current

let parsed: MutationMessage = serde_json::from_slice(payload)?;

since the payload will not deserialize directly into the MutationMessage struct. I can give this a try, @untitaker?
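One way that extra step could look is sketched below, assuming (purely for illustration) that the published schema carries the mutation as key/value pair lists that need reshaping before they fit MutationMessage. Both struct shapes here are invented stand-ins for the real rust_snuba types; in practice serde would first read the intermediate shape, which is then converted.

```rust
use std::collections::BTreeMap;

// Illustrative two-step deserialization: serde would populate RawMutation
// from the wire payload, and From<RawMutation> performs the reshaping into
// the struct the rest of the consumer expects. Field names are hypothetical.
pub struct RawMutation {
    pub filters: Vec<(String, String)>,
    pub updates: Vec<(String, String)>,
}

pub struct MutationMessage {
    pub filters: BTreeMap<String, String>,
    pub update: BTreeMap<String, String>,
}

impl From<RawMutation> for MutationMessage {
    fn from(raw: RawMutation) -> Self {
        MutationMessage {
            filters: raw.filters.into_iter().collect(),
            update: raw.updates.into_iter().collect(),
        }
    }
}
```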

@@ -82,19 +134,21 @@ fn fnv_1a(input: &[u8]) -> u32 {
impl From<FromSpanMessage> for EAPSpan {
fn from(from: FromSpanMessage) -> EAPSpan {
let mut res = Self {
organization_id: from.organization_id,
primary_key: PrimaryKey {
@ayirr7 (Member Author) commented Sep 19, 2024:

@untitaker I was curious why we initialize the PrimaryKey here. Is it just so that we have consistency between the mutations and EAP spans schemas?

ayirr7 added a commit that referenced this pull request Sep 20, 2024
Doing this separately so we can roll it out independently of the actual
consumer: #6216

This will be rolled out after the schema is published
getsentry/sentry-kafka-schemas#333
@untitaker untitaker marked this pull request as ready for review September 24, 2024 14:56
@untitaker untitaker requested a review from a team as a code owner September 24, 2024 14:56
@untitaker (Member) commented:

going to try and get this merged even though the query is not performant yet -- otherwise we'll keep fighting merge conflicts

));
}

// rewriting the update query
A Member commented:

Do these still need to be here?

@untitaker untitaker merged commit a612e70 into master Sep 25, 2024
30 checks passed
@untitaker untitaker deleted the mutations-compat-factory branch September 25, 2024 09:19
ayirr7 added a commit that referenced this pull request Sep 30, 2024
A follow-up to #6216 

More rigorous testing for the main query run on the mutations consumer.
We need to be able to catch any correctness issues with the query in CI
itself.

---------

Co-authored-by: Markus Unterwaditzer <markus-tarpit+git@unterwaditzer.net>
Co-authored-by: Markus Unterwaditzer <markus-honeypot@unterwaditzer.net>
untitaker added a commit that referenced this pull request Oct 10, 2024
* span_id should be a string
* _sort_timestamp is no longer directly exposed, instead users provide
  start_timestamp_ms and it gets converted using the same logic used in
  the ingestion consumer

This is a follow-up to #6216
4 participants