Skip to content

Commit

Permalink
Delay reading from the Kafka buffer as long as the circuit breaker is…
Browse files Browse the repository at this point in the history
… open (opensearch-project#4135)

Hold off on consuming from the Kafka topic as long as a pause-consume predicate is in place. This will allow the Kafka buffer to wait for the circuit breaker to close before reading. Also pause the topic while the circuit breaker is open.

Signed-off-by: David Venable <dlv@amazon.com>
  • Loading branch information
dlvenable authored Feb 20, 2024
1 parent e1f3167 commit 680ad7a
Show file tree
Hide file tree
Showing 3 changed files with 33 additions and 4 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -348,9 +348,19 @@ public void run() {
boolean retryingAfterException = false;
while (!shutdownInProgress.get()) {
try {
if (retryingAfterException || pauseConsumePredicate.pauseConsuming()) {
LOG.debug("Pause consuming from Kafka topic.");
if (retryingAfterException) {
LOG.debug("Pause consuming from Kafka topic due a previous exception.");
Thread.sleep(10000);
} else if (pauseConsumePredicate.pauseConsuming()) {
LOG.debug("Pause and skip consuming from Kafka topic due to an external condition: {}", pauseConsumePredicate);
paused = true;
consumer.pause(consumer.assignment());
Thread.sleep(10000);
continue;
} else if(paused) {
LOG.debug("Resume consuming from Kafka topic.");
paused = false;
consumer.resume(consumer.assignment());
}
synchronized(this) {
commitOffsets(false);
Expand Down Expand Up @@ -453,7 +463,7 @@ private void processRecord(final AcknowledgementSet acknowledgementSet, final Re
if (paused) {
ConsumerRecords<String, ?> records = doPoll();
if (records.count() > 0) {
LOG.warn("Unexpected records received while the consumer is paused. Resetting the paritions to retry from last read pointer");
LOG.warn("Unexpected records received while the consumer is paused. Resetting the partitions to retry from last read pointer");
synchronized(this) {
partitionsToReset.addAll(consumer.assignment());
};
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,17 @@ public interface PauseConsumePredicate {
static PauseConsumePredicate circuitBreakingPredicate(final CircuitBreaker circuitBreaker) {
if(circuitBreaker == null)
return noPause();
return circuitBreaker::isOpen;
return new PauseConsumePredicate() {
@Override
public boolean pauseConsuming() {
return circuitBreaker.isOpen();
}

@Override
public String toString() {
return "Circuit Breaker";
}
};
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,4 +51,13 @@ void circuitBreakingPredicate_with_a_circuit_breaker_returns_predicate_with_paus

assertThat(pauseConsumePredicate.pauseConsuming(), equalTo(isOpen));
}

@Test
void circuitBreakingPredicate_with_a_circuit_breaker_returns_predicate_with_toString() {
final CircuitBreaker circuitBreaker = mock(CircuitBreaker.class);

final PauseConsumePredicate pauseConsumePredicate = PauseConsumePredicate.circuitBreakingPredicate(circuitBreaker);

assertThat(pauseConsumePredicate.toString(), equalTo("Circuit Breaker"));
}
}

0 comments on commit 680ad7a

Please sign in to comment.