Skip to content

Commit

Permalink
perf: Avoid unnecessarily clearing the rdkafka buffer on backpressure (
Browse files Browse the repository at this point in the history
…#296)

This is the same fix we did in the Rust version of Arroyo getsentry/snuba#4774

We saw a lot of performance improvements after this change so I'm porting it to Python too.
  • Loading branch information
lynnagara authored Oct 25, 2023
1 parent fa58454 commit ef16dc8
Show file tree
Hide file tree
Showing 2 changed files with 29 additions and 25 deletions.
48 changes: 23 additions & 25 deletions arroyo/processing/processor.py
Original file line number Diff line number Diff line change
Expand Up @@ -142,9 +142,10 @@ def __init__(

self.__message: Optional[BrokerValue[TStrategyPayload]] = None

# If the consumer is in the paused state, this is when the last call to
# ``pause`` occurred or the time the pause metric was last recorded.
self.__paused_timestamp: Optional[float] = None
# The timestamp when backpressure state started
self.__backpressure_timestamp: Optional[float] = None
# Consumer is paused after it is in backpressure state for > 1 second
self.__is_paused = False

self.__commit_policy_state = commit_policy.get_state_machine()
self.__join_timeout = join_timeout
Expand Down Expand Up @@ -337,22 +338,19 @@ def _run_once(self) -> None:

message_carried_over = self.__message is not None

if message_carried_over:
if self.__is_paused:
# If a message was carried over from the previous run, there are two reasons:
#
# * MessageRejected. the consumer should be paused and not
# returning any messages on ``poll``.
# * InvalidMessage. the message should be resubmitted.
# _handle_invalid_message is responsible for clearing out
# self.__message if it was the invalid one.
if (
self.__paused_timestamp is not None
and self.__consumer.poll(timeout=0) is not None
):
if self.__consumer.poll(timeout=0) is not None:
raise InvalidStateError(
"received message when consumer was expected to be paused"
)
else:
elif not message_carried_over:
# Otherwise, we need to try fetch a new message from the consumer,
# even if there is no active assignment and/or processing strategy.
try:
Expand Down Expand Up @@ -393,40 +391,40 @@ def _run_once(self) -> None:
# If the processing strategy rejected our message, we need
# to pause the consumer and hold the message until it is
# accepted, at which point we can resume consuming.
if not message_carried_over:
# if not message_carried_over:
if self.__backpressure_timestamp is None:
self.__backpressure_timestamp = time.time()

elif not self.__is_paused and (
time.time() - self.__backpressure_timestamp > 1
):
logger.debug(
"Caught %r while submitting %r, pausing consumer...",
e,
self.__message,
)
self.__consumer.pause([*self.__consumer.tell().keys()])
self.__is_paused = True

self.__paused_timestamp = time.time()
else:
current_time = time.time()
time.sleep(0.01)
if self.__paused_timestamp:
self.__metrics_buffer.incr_timing(
"arroyo.consumer.paused.time",
current_time - self.__paused_timestamp,
)
self.__paused_timestamp = current_time

except InvalidMessage as e:
self._handle_invalid_message(e)

else:
# If we were trying to submit a message that failed to be
# submitted on a previous run, we can resume accepting new
# messages.
if message_carried_over and self.__paused_timestamp is not None:
# Resume if we are currently in a paused state
if self.__is_paused:
self.__consumer.resume([*self.__consumer.tell().keys()])
self.__is_paused = False

# Clear backpressure timestamp if it is set
if self.__backpressure_timestamp is not None:
self.__metrics_buffer.incr_timing(
"arroyo.consumer.paused.time",
time.time() - self.__paused_timestamp,
time.time() - self.__backpressure_timestamp,
)

self.__paused_timestamp = None
self.__backpressure_timestamp = None

self.__message = None
else:
Expand Down
6 changes: 6 additions & 0 deletions tests/processing/test_processor.py
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,8 @@ def test_stream_processor_lifecycle() -> None:
with assert_changes(lambda: int(consumer.pause.call_count), 0, 1):
processor._run_once()
assert strategy.submit.call_args_list[-1] == mock.call(message)
time.sleep(1)
processor._run_once() # Should pause now

# If ``Consumer.poll`` returns a message when we expect it to be paused,
# we should raise an exception.
Expand Down Expand Up @@ -132,9 +134,13 @@ def test_stream_processor_lifecycle() -> None:
(Timing, "arroyo.consumer.poll.time"),
(Timing, "arroyo.consumer.callback.time"),
(Timing, "arroyo.consumer.processing.time"),
(Increment, "arroyo.consumer.run.count"),
(Timing, "arroyo.consumer.processing.time"),
(Timing, "arroyo.consumer.paused.time"),
(Timing, "arroyo.consumer.join.time"),
(Timing, "arroyo.consumer.shutdown.time"),
(Timing, "arroyo.consumer.callback.time"),
(Timing, "arroyo.consumer.poll.time"),
(Increment, "arroyo.consumer.run.count"),
]

Expand Down

0 comments on commit ef16dc8

Please sign in to comment.