Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(rust): Connect consumer to clickhouse #4437

Open
wants to merge 10 commits into
base: master
Choose a base branch
from

Conversation

dbanda
Copy link
Contributor

@dbanda dbanda commented Jun 28, 2023

This is an attempt to get the consumers to actually write to clickhouse on submit. A challenge here was our clickhouse client is async, so I had to modify the ProcessingStrategy to use an async submit function. This change had ripple effects all across the board.

The quick solution in many places where the submit function was called in sync functions was to use block_on but this would sometimes hang, so I avoided block_on and used async await + tokio::spawn.

In some parts of the code, I had to replace mutexes with async-aware mutexes to make sure that we are not holding the locks synchronously in the submit async function.

@codecov
Copy link

codecov bot commented Jun 29, 2023

Codecov Report

Patch coverage: 100.00% and no project coverage change.

Comparison is base (46b90d2) 90.55% compared to head (ee7676d) 90.55%.

❗ Current head ee7676d differs from pull request most recent head 8736a89. Consider uploading reports for the commit 8736a89 to get more accurate results

Additional details and impacted files
@@           Coverage Diff           @@
##           master    #4437   +/-   ##
=======================================
  Coverage   90.55%   90.55%           
=======================================
  Files         806      806           
  Lines       39739    39742    +3     
  Branches      255      255           
=======================================
+ Hits        35985    35990    +5     
+ Misses       3720     3718    -2     
  Partials       34       34           
Impacted Files Coverage Δ
snuba/clusters/cluster.py 96.34% <100.00%> (+0.04%) ⬆️
snuba/consumers/consumer_config.py 98.85% <100.00%> (+0.01%) ⬆️

... and 1 file with indirect coverage changes

☔ View full report in Codecov by Sentry.
📢 Do you have feedback about the report comment? Let us know in this issue.

@dbanda dbanda force-pushed the dbanda/rust-add-clickhouse branch from 816f3a1 to 62f9c70 Compare June 30, 2023 19:57
@dbanda dbanda marked this pull request as ready for review June 30, 2023 22:40
@dbanda dbanda requested a review from a team as a code owner June 30, 2023 22:40
for batch in message.payload() {
for row in batch.rows {
let decoded_row = String::from_utf8_lossy(&row);
log::debug!("insert: {:?}", decoded_row);
let res = self.clickhouse_client.send( decoded_row.to_string()).await;
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Spawning via https://docs.rs/tokio/latest/tokio/task/fn.spawn.html or https://docs.rs/tokio/latest/tokio/runtime/struct.Runtime.html#method.spawn would allow you to run a future without making all processing strategies async

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yes I considered it but the tricky part with spawn is it runs the futures at its own time and doesn't guarantee completion. My understanding is we want to move on to the next batch once we get an ack of the current batch written in Clickhouse. That way we don't commit and then later on figure out the spawn job failed. This approach is slower, but has more guarantees that at commit time the data has been written to Clickhouse.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

you can spawn a future into the tokio runtime, then use a channel to wait for the future's completion (assuming you don't get a JoinHandle back)

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Two issues with that

  1. Tokio's JoinHandle still needs to be awaited on to actually join. So we still need async context for that.
  2. Channels would work, but then you lose out on concurrency. The standard channel primitive will block the thread waiting for a response.

Copy link
Member

@untitaker untitaker Jul 6, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

no, you have try_recv and recv_timeout on the std mpsc channel. you have teh same methods on oneshot, which is what we typically use in symbolicator/relay in situations like this.

I believe you can build this internally similarly to how RunTaskWithMultiprocessing in Python deals with the Future objects of the processing pool: Keep all those channels in a list ordered by offset/submission order, recv_timeout(1 second) the channels in the list in Strategy.poll/join, etc

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm not sure I understand the benefit of doing it that way. If we do recv_timeout we risk timing out right before the msg is sent to Clickhouse and report an incorrect failure. Using try_recv has the same issue.

Keeping the channels in a list raises challenges with bookkeeping and spinning to check channels for msgs. I guess I'm struggling to understand why that would be better vs using async and having Tokio take care of all that for us.

The limitation of the Tokio approach is that Kafka isn't async aware so the callbacks have to be run synchronously. To fix that, I'd spawn threads within the callbacks. Given that the callbacks are only executed infrequently, it shouldn't be too expensive.

Copy link
Member

@untitaker untitaker Jul 6, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If we do recv_timeout we risk timing out right before the msg is sent to Clickhouse and report an incorrect failure

PTAL at how the multiprocessing strategy works in Python. Yes we do a lot of bookkeeping there for pending futures, but we should not lose any data. In every call to Strategy.poll(), we wait for a future with a timeout of 1 second, and if the future completes, take its result and forward it to the next strategy. In join() we wait for the future to complete indefinitely. I propose we do the same in Rust here, except we wait for channels instead of futures.

Keeping the channels in a list raises challenges with bookkeeping and spinning to check channels for msgs. I guess I'm struggling to understand why that would be better vs using async and having Tokio take care of all that for us.

I want to ensure we do not introduce Tokio into the core arroyo architecture just because one strategy requires it. It's a significant amount of complexity on top of everything else, and even if we did introduce it, I would expect that each strategy has its own separate runtime either way

lynnagara added a commit that referenced this pull request Jul 11, 2023
The reduce step should accumulate rows inside the BytesInsertBatch
not produce a batch of batches.

feat(rust-consumer): Add basic clickhouse strategy

Same idea as #4437
with a few differences:
- Does not switch the arroyo interface to async
- Does not write to ClickHouse by default (only if --no-skip-write is passed)
- Batches inserted rows
lynnagara added a commit that referenced this pull request Jul 12, 2023
feat(rust-consumer): Add basic clickhouse strategy

Same idea as #4437 with a few differences:
- Does not switch the arroyo interface to async
- Does not write to ClickHouse by default (only if --no-skip-write is passed)
- Batches inserted rows into fewer writes with multiple rows
@getsantry getsantry bot added the Stale label Oct 18, 2023
untitaker pushed a commit to getsentry/arroyo that referenced this pull request Jan 17, 2024
feat(rust-consumer): Add basic clickhouse strategy

Same idea as getsentry/snuba#4437 with a few differences:
- Does not switch the arroyo interface to async
- Does not write to ClickHouse by default (only if --no-skip-write is passed)
- Batches inserted rows into fewer writes with multiple rows
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
Projects
None yet
Development

Successfully merging this pull request may close these issues.

2 participants