Skip to content

Commit

Permalink
ref(rust): Add logs to debug deadlock in hybrid consumers (#5125)
Browse files Browse the repository at this point in the history
* ref(rust): Add logs to debug deadlock in hybrid consumers

* fmt
  • Loading branch information
untitaker authored Nov 30, 2023
1 parent 825f8d4 commit 853926e
Show file tree
Hide file tree
Showing 2 changed files with 22 additions and 2 deletions.
21 changes: 20 additions & 1 deletion rust_snuba/src/strategies/python.rs
Original file line number Diff line number Diff line change
Expand Up @@ -103,11 +103,15 @@ impl PythonTransformStep {
})
}

#[tracing::instrument(skip(self))]
fn check_for_results(&mut self, max_queue_depth: usize) {
if let Some(message_carried_over) = self.message_carried_over.take() {
tracing::debug!("resubmitting carried over message");
// TODO: handle InvalidMessage here
if let Err(SubmitError::MessageRejected(MessageRejected { message })) =
self.next_step.submit(message_carried_over)
{
tracing::debug!("failed to resubmit, returning");
self.message_carried_over = Some(message);
return;
}
Expand All @@ -116,6 +120,7 @@ impl PythonTransformStep {
debug_assert!(self.message_carried_over.is_none());

if !self.queue_needs_drain(max_queue_depth) {
tracing::debug!("queue_needs_drain=false, returning");
return;
}

Expand All @@ -126,6 +131,7 @@ impl PythonTransformStep {
..
}) => {
let handle = join_handle.into_inner().unwrap();
tracing::debug!("joining procspawn handle");
let result = handle.join().expect("procspawn failed");
(original_message_meta, result)
}
Expand All @@ -134,7 +140,10 @@ impl PythonTransformStep {
result,
..
}) => (original_message_meta, result),
None => return,
None => {
tracing::debug!("self.handles is empty, returning");
return;
}
};

match message_result {
Expand All @@ -158,10 +167,13 @@ impl PythonTransformStep {
original_message_meta.timestamp,
);

tracing::debug!("forwarding new result to next step");

if let Err(SubmitError::MessageRejected(MessageRejected {
message: transformed_message,
})) = self.next_step.submit(new_message)
{
tracing::debug!("failed to forward");
self.message_carried_over = Some(transformed_message);
}
}
Expand All @@ -170,9 +182,14 @@ impl PythonTransformStep {
tracing::error!(error, "Invalid message");
}
}

tracing::debug!("done with check_for_results, returning");
}

#[tracing::instrument(skip_all)]
fn queue_needs_drain(&self, max_queue_depth: usize) -> bool {
tracing::debug!(self.handles.len = self.handles.len());

if self.handles.len() > max_queue_depth {
return true;
}
Expand All @@ -191,7 +208,9 @@ impl PythonTransformStep {

impl ProcessingStrategy<KafkaPayload> for PythonTransformStep {
fn poll(&mut self) -> Result<Option<CommitRequest>, InvalidMessage> {
tracing::debug!("python poll");
self.check_for_results(self.max_queue_depth);
tracing::debug!("python end poll");

self.next_step.poll()
}
Expand Down
3 changes: 2 additions & 1 deletion snuba/cli/rust_consumer.py
Original file line number Diff line number Diff line change
Expand Up @@ -164,7 +164,8 @@ def rust_consumer(

import rust_snuba

os.environ["RUST_LOG"] = log_level.lower()
# TODO: remove after debugging
os.environ["RUST_LOG"] = "debug" if not use_rust_processor else log_level.lower()

# XXX: Temporary way to quickly test different values for concurrency
# Should be removed before this is put into prod
Expand Down

0 comments on commit 853926e

Please sign in to comment.