Skip to content

Commit

Permalink
Retry individual messages/requests when failing with 429/`Data too …
Browse files Browse the repository at this point in the history
…large`.
  • Loading branch information
dennisoelkers committed Jan 10, 2025
1 parent 75bf486 commit 8727f5a
Show file tree
Hide file tree
Showing 10 changed files with 106 additions and 24 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@
import org.graylog.shaded.elasticsearch7.org.elasticsearch.client.RestHighLevelClient;
import org.graylog.storage.errors.ResponseError;
import org.graylog2.indexer.BatchSizeTooLargeException;
import org.graylog2.indexer.CircuitBreakerException;
import org.graylog2.indexer.IndexNotFoundException;
import org.graylog2.indexer.InvalidWriteTargetException;
import org.graylog2.indexer.MapperParsingException;
Expand Down Expand Up @@ -206,12 +207,24 @@ private ElasticsearchException exceptionFrom(Exception e, String errorMessage) {
if (isMapperParsingExceptionException(elasticsearchException)) {
throw new MapperParsingException(elasticsearchException.getMessage());
}
if (isCircuitBreakerException(elasticsearchException)) {
throw new CircuitBreakerException(elasticsearchException.getMessage());
}
} else if (e instanceof IOException && e.getCause() instanceof ContentTooLongException) {
throw new BatchSizeTooLargeException(e.getMessage());
}
return new ElasticsearchException(errorMessage, e);
}

private boolean isCircuitBreakerException(ElasticsearchException elasticsearchException) {
try {
final var parsedException = ParsedElasticsearchException.from(elasticsearchException.getMessage());
return parsedException.type().equals("circuit_breaking_exception");
} catch (Exception e) {
return false;
}
}

private boolean isInvalidWriteTargetException(ElasticsearchException elasticsearchException) {
try {
final ParsedElasticsearchException parsedException = ParsedElasticsearchException.from(elasticsearchException.getMessage());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,9 @@ public class MessagesAdapterES7 implements MessagesAdapter {
static final String ILLEGAL_ARGUMENT_EXCEPTION = "illegal_argument_exception";
static final String NO_WRITE_INDEX_DEFINED_FOR_ALIAS = "no write index is defined for alias";

static final String CIRCUIT_BREAKING_EXCEPTION = "circuit_breaking_exception";
static final String DATA_TOO_LARGE = "Data too large";

private final ResultMessageFactory resultMessageFactory;
private final ElasticsearchClient client;
private final Meter invalidTimestampMeter;
Expand Down Expand Up @@ -243,6 +246,9 @@ private IndexingError.Type errorTypeFromResponse(BulkItemResponse item) {
case ILLEGAL_ARGUMENT_EXCEPTION:
if (exception.reason().contains(NO_WRITE_INDEX_DEFINED_FOR_ALIAS))
return IndexingError.Type.IndexBlocked;
case CIRCUIT_BREAKING_EXCEPTION:
if (exception.reason().contains(DATA_TOO_LARGE))
return IndexingError.Type.DataTooLarge;
default:
return IndexingError.Type.Unknown;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,9 @@ public class MessagesAdapterOS2 implements MessagesAdapter {
static final String ILLEGAL_ARGUMENT_EXCEPTION = "illegal_argument_exception";
static final String NO_WRITE_INDEX_DEFINED_FOR_ALIAS = "no write index is defined for alias";

static final String CIRCUIT_BREAKING_EXCEPTION = "circuit_breaking_exception";
static final String DATA_TOO_LARGE = "Data too large";

private final ResultMessageFactory resultMessageFactory;
private final OpenSearchClient client;
private final Meter invalidTimestampMeter;
Expand Down Expand Up @@ -245,6 +248,10 @@ private IndexingError.Type errorTypeFromResponse(BulkItemResponse item) {
if (exception.reason().contains(NO_WRITE_INDEX_DEFINED_FOR_ALIAS)) {
return IndexingError.Type.IndexBlocked;
}
case CIRCUIT_BREAKING_EXCEPTION:
if (exception.reason().contains(DATA_TOO_LARGE)) {
return IndexingError.Type.DataTooLarge;
}
default:
return IndexingError.Type.Unknown;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@
import org.graylog.shaded.opensearch2.org.opensearch.client.RestHighLevelClient;
import org.graylog.storage.errors.ResponseError;
import org.graylog2.indexer.BatchSizeTooLargeException;
import org.graylog2.indexer.CircuitBreakerException;
import org.graylog2.indexer.IndexNotFoundException;
import org.graylog2.indexer.InvalidWriteTargetException;
import org.graylog2.indexer.MapperParsingException;
Expand Down Expand Up @@ -206,12 +207,24 @@ private OpenSearchException exceptionFrom(Exception e, String errorMessage) {
if (isMapperParsingExceptionException(openSearchException)) {
throw new MapperParsingException(openSearchException.getMessage());
}
if (isCircuitBreakerException(openSearchException)) {
throw new CircuitBreakerException(openSearchException.getMessage());
}
} else if (e instanceof IOException && e.getCause() instanceof ContentTooLongException) {
throw new BatchSizeTooLargeException(e.getMessage());
}
return new OpenSearchException(errorMessage, e);
}

private boolean isCircuitBreakerException(OpenSearchException openSearchException) {
try {
final var parsedException = ParsedOpenSearchException.from(openSearchException.getMessage());
return parsedException.type().equals("circuit_breaking_exception");
} catch (Exception e) {
return false;
}
}

private boolean isInvalidWriteTargetException(OpenSearchException openSearchException) {
try {
final ParsedOpenSearchException parsedException = ParsedOpenSearchException.from(openSearchException.getMessage());
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
package org.graylog2.indexer;

public class CircuitBreakerException extends ElasticsearchException {
public CircuitBreakerException(String message) {
super(message);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,8 @@ public record IndexingError(Indexable message, String index, Error error) implem
public enum Type {
IndexBlocked,
MappingError,
Unknown;
Unknown,
DataTooLarge
}

public static IndexingError create(Indexable message, String index, Type errorType, String errorMessage) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,10 @@
import com.github.rholder.retry.WaitStrategy;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.Sets;
import jakarta.inject.Inject;
import jakarta.inject.Singleton;
import org.graylog.failure.FailureSubmissionService;
import org.graylog2.indexer.CircuitBreakerException;
import org.graylog2.indexer.InvalidWriteTargetException;
import org.graylog2.indexer.MasterNotDiscoveredException;
import org.graylog2.indexer.results.ResultMessage;
Expand All @@ -36,10 +39,6 @@
import org.slf4j.LoggerFactory;

import javax.annotation.Nullable;

import jakarta.inject.Inject;
import jakarta.inject.Singleton;

import java.io.IOException;
import java.util.HashSet;
import java.util.List;
Expand Down Expand Up @@ -75,7 +74,8 @@ private RetryerBuilder<IndexingResults> createBulkRequestRetryerBuilder() {
return RetryerBuilder.<IndexingResults>newBuilder()
.retryIfException(t -> ExceptionUtils.hasCauseOf(t, IOException.class)
|| t instanceof InvalidWriteTargetException
|| t instanceof MasterNotDiscoveredException)
|| t instanceof MasterNotDiscoveredException
|| t instanceof CircuitBreakerException)
.withWaitStrategy(WaitStrategies.exponentialWait(MAX_WAIT_TIME.getQuantity(), MAX_WAIT_TIME.getUnit()))
.withRetryListener(new RetryListener() {
@Override
Expand Down Expand Up @@ -144,7 +144,7 @@ public IndexingResults bulkIndexRequests(List<IndexingRequest> indexingRequestLi
public IndexingResults bulkIndexRequests(List<IndexingRequest> indexingRequestList, boolean isSystemTraffic, IndexingListener indexingListener) {
final IndexingResults indexingResults = runBulkRequest(indexingRequestList, indexingRequestList.size(), indexingListener);

final IndexingResults retryBlockResults = retryOnlyIndexBlockItemsForever(indexingRequestList, indexingResults.errors(), indexingListener);
final IndexingResults retryBlockResults = retryQualifyingIndividualItems(indexingRequestList, indexingResults.errors(), indexingListener);

final IndexingResults finalResults = retryBlockResults.mergeWith(indexingResults.successes(), List.of());

Expand All @@ -158,32 +158,32 @@ public IndexingResults bulkIndexRequests(List<IndexingRequest> indexingRequestLi
return finalResults;
}

private IndexingResults retryOnlyIndexBlockItemsForever(List<IndexingRequest> messages, List<IndexingError> allFailedItems, IndexingListener indexingListener) {
Set<IndexingError> indexBlocks = indexBlocksFrom(allFailedItems);
final Set<IndexingError> otherFailures = new HashSet<>(Sets.difference(new HashSet<>(allFailedItems), indexBlocks));
List<IndexingRequest> blockedMessages = messagesForResultItems(messages, indexBlocks);
private IndexingResults retryQualifyingIndividualItems(List<IndexingRequest> messages, List<IndexingError> allFailedItems, IndexingListener indexingListener) {
Set<IndexingError> retryableErrors = retryableErrorsFrom(allFailedItems);
final Set<IndexingError> otherFailures = new HashSet<>(Sets.difference(new HashSet<>(allFailedItems), retryableErrors));
List<IndexingRequest> blockedMessages = messagesForResultItems(messages, retryableErrors);

if (!indexBlocks.isEmpty()) {
LOG.warn("Retrying {} messages, because their indices are blocked with status [read-only / allow delete]", indexBlocks.size());
if (!retryableErrors.isEmpty()) {
LOG.warn("Retrying {} messages, because their indices are blocked with status [read-only / allow delete]", retryableErrors.size());
}

long attempt = 1;

final IndexingResults.Builder builder = IndexingResults.Builder.create();
while (!indexBlocks.isEmpty()) {
while (!retryableErrors.isEmpty()) {
waitBeforeRetrying(attempt++);

final IndexingResults indexingResults = runBulkRequest(blockedMessages, messages.size(), indexingListener);

builder.addSuccesses(indexingResults.successes());
final var failedItems = indexingResults.errors();
indexBlocks = indexBlocksFrom(failedItems);
blockedMessages = messagesForResultItems(blockedMessages, indexBlocks);
retryableErrors = retryableErrorsFrom(failedItems);
blockedMessages = messagesForResultItems(blockedMessages, retryableErrors);

final Set<IndexingError> newOtherFailures = Sets.difference(new HashSet<>(failedItems), indexBlocks);
final Set<IndexingError> newOtherFailures = Sets.difference(new HashSet<>(failedItems), retryableErrors);
otherFailures.addAll(newOtherFailures);

if (indexBlocks.isEmpty()) {
if (retryableErrors.isEmpty()) {
LOG.info("Retries were successful after {} attempts. Ingestion will continue now.", attempt);
}
}
Expand All @@ -198,12 +198,13 @@ private List<IndexingRequest> messagesForResultItems(List<IndexingRequest> chunk
return chunk.stream().filter(entry -> blockedMessageIds.contains(entry.message().getId())).collect(Collectors.toList());
}

private Set<IndexingError> indexBlocksFrom(List<IndexingError> allFailedItems) {
return allFailedItems.stream().filter(this::hasFailedDueToBlockedIndex).collect(Collectors.toSet());
private Set<IndexingError> retryableErrorsFrom(List<IndexingError> allFailedItems) {
return allFailedItems.stream().filter(this::isRetryable).collect(Collectors.toSet());
}

private boolean hasFailedDueToBlockedIndex(IndexingError indexingError) {
return indexingError.error().type().equals(IndexingError.Type.IndexBlocked);
private boolean isRetryable(IndexingError indexingError) {
final var errorType = indexingError.error().type();
return errorType.equals(IndexingError.Type.IndexBlocked) || errorType.equals(IndexingError.Type.DataTooLarge);
}

private void waitBeforeRetrying(long attempt) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -84,4 +84,8 @@ default String createRandomIndex(String prefix) {
IndexState getStatus(String indexName);

void openIndex(String indexName);

default void setRequestCircuitBreakerLimit(String limit) {
putSetting("indices.breaker.total.limit", limit);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@ public void tearDown() {
}

@Test
public void testIfLargeBatchesGetSplitUpOnCircuitBreakerExceptions() throws Exception {
public void testIfLargeBatchesGetSplitUpOnCircuitBreakerExceptions() {
// This test assumes that ES is running with only 256MB heap size.
// This will trigger the circuit breaker when we are trying to index large batches
final int MESSAGECOUNT = 50;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -204,6 +204,37 @@ public void messagesWithTheSameIdCanBeIngestedIntoMultipleIndices() {
assertThat(results.successes()).hasSize(2);
}

@Test
public void retryIndexingMessagesWhenHeapSpaceIsMaxed() throws Exception {
lowerCircuitBreaker();
final CountDownLatch countDownLatch = new CountDownLatch(1);
final AtomicBoolean succeeded = new AtomicBoolean(false);
final var messageCount = 20;
final List<MessageWithIndex> messageBatch = createMessageBatch(10240, messageCount);

final Future<IndexingResults> resultsFuture = background(() -> this.messages.bulkIndex(messageBatch, createIndexingListener(countDownLatch, succeeded)));

countDownLatch.await();

relaxCircuitBreaker();

var results = resultsFuture.get(3, TimeUnit.MINUTES);
assertThat(results.errors()).isEmpty();

client().refreshNode();

assertThat(messageCount(INDEX_NAME)).isEqualTo(messageCount);
assertThat(succeeded.get()).isTrue();
}

private void lowerCircuitBreaker() {
client().setRequestCircuitBreakerLimit("100kb");
}

private void relaxCircuitBreaker() {
client().setRequestCircuitBreakerLimit(null);
}

@Test
public void retryIndexingMessagesDuringFloodStage() throws Exception {
triggerFloodStage(INDEX_NAME);
Expand All @@ -227,7 +258,6 @@ public void retryIndexingMessagesDuringFloodStage() throws Exception {
assertThat(succeeded.get()).isTrue();
}


private void waitForClusterBlockRelease() throws ExecutionException, RetryException {
RetryerBuilder.<String>newBuilder()
.withWaitStrategy(WaitStrategies.fixedWait(1, TimeUnit.SECONDS))
Expand Down

0 comments on commit 8727f5a

Please sign in to comment.