From b0cb23e4063aadb824648e3dff6b1d5f64511580 Mon Sep 17 00:00:00 2001 From: mohitdeuex Date: Fri, 15 Nov 2024 02:15:40 +0530 Subject: [PATCH] run es bulk in async --- .../java/org/openmetadata/service/Entity.java | 1 + .../service/OpenMetadataApplication.java | 1 + .../searchIndex/ElasticSearchIndexSink.java | 322 ++++++++++++----- .../searchIndex/OpenSearchIndexSink.java | 335 ++++++++++++------ .../service/search/SearchClient.java | 2 + .../elasticsearch/ElasticSearchClient.java | 2 + .../search/opensearch/OpenSearchClient.java | 3 +- 7 files changed, 460 insertions(+), 206 deletions(-) diff --git a/openmetadata-service/src/main/java/org/openmetadata/service/Entity.java b/openmetadata-service/src/main/java/org/openmetadata/service/Entity.java index 5249f86b4782..bf1a623b90b8 100644 --- a/openmetadata-service/src/main/java/org/openmetadata/service/Entity.java +++ b/openmetadata-service/src/main/java/org/openmetadata/service/Entity.java @@ -77,6 +77,7 @@ public final class Entity { private static volatile boolean initializedRepositories = false; @Getter @Setter private static CollectionDAO collectionDAO; + @Getter @Setter private static Jdbi jdbi; public static final String SEPARATOR = "."; // Fully qualified name separator // Canonical entity name to corresponding EntityRepository map diff --git a/openmetadata-service/src/main/java/org/openmetadata/service/OpenMetadataApplication.java b/openmetadata-service/src/main/java/org/openmetadata/service/OpenMetadataApplication.java index 4fcdbe40123e..9fedc0c89534 100644 --- a/openmetadata-service/src/main/java/org/openmetadata/service/OpenMetadataApplication.java +++ b/openmetadata-service/src/main/java/org/openmetadata/service/OpenMetadataApplication.java @@ -162,6 +162,7 @@ public void run(OpenMetadataApplicationConfig catalogConfig, Environment environ jdbi = createAndSetupJDBI(environment, catalogConfig.getDataSourceFactory()); Entity.setCollectionDAO(getDao(jdbi)); + Entity.setJdbi(jdbi); 
initializeSearchRepository(catalogConfig.getElasticSearchConfiguration()); // Initialize the MigrationValidationClient, used in the Settings Repository diff --git a/openmetadata-service/src/main/java/org/openmetadata/service/apps/bundles/searchIndex/ElasticSearchIndexSink.java b/openmetadata-service/src/main/java/org/openmetadata/service/apps/bundles/searchIndex/ElasticSearchIndexSink.java index 979e07e00447..af2ca8bb53e3 100644 --- a/openmetadata-service/src/main/java/org/openmetadata/service/apps/bundles/searchIndex/ElasticSearchIndexSink.java +++ b/openmetadata-service/src/main/java/org/openmetadata/service/apps/bundles/searchIndex/ElasticSearchIndexSink.java @@ -1,15 +1,17 @@ package org.openmetadata.service.apps.bundles.searchIndex; import static org.openmetadata.schema.system.IndexingError.ErrorSource.SINK; -import static org.openmetadata.service.workflows.searchIndex.ReindexingUtil.getErrorsFromBulkResponse; import static org.openmetadata.service.workflows.searchIndex.ReindexingUtil.getUpdatedStats; import es.org.elasticsearch.ElasticsearchException; +import es.org.elasticsearch.action.ActionListener; import es.org.elasticsearch.action.DocWriteRequest; +import es.org.elasticsearch.action.bulk.BulkItemResponse; import es.org.elasticsearch.action.bulk.BulkRequest; import es.org.elasticsearch.action.bulk.BulkResponse; import es.org.elasticsearch.action.update.UpdateRequest; import es.org.elasticsearch.client.RequestOptions; +import es.org.elasticsearch.client.RestHighLevelClient; import es.org.elasticsearch.rest.RestStatus; import es.org.elasticsearch.xcontent.XContentType; import java.io.Closeable; @@ -17,7 +19,6 @@ import java.util.ArrayList; import java.util.List; import java.util.Map; -import java.util.Objects; import java.util.concurrent.Semaphore; import java.util.concurrent.ThreadLocalRandom; import lombok.extern.slf4j.Slf4j; @@ -74,8 +75,7 @@ public void write(List entities, Map contextData) throws Sear long requestSize = 
estimateRequestSizeInBytes(request); if (currentBatchSize + requestSize > maxPayloadSizeInBytes) { - // Flush current batch - sendBulkRequest(requests, entityErrorList); + sendBulkRequestAsync(new ArrayList<>(requests), new ArrayList<>(entityErrorList)); requests.clear(); currentBatchSize = 0L; } @@ -93,7 +93,7 @@ public void write(List entities, Map contextData) throws Sear } if (!requests.isEmpty()) { - sendBulkRequest(requests, entityErrorList); + sendBulkRequestAsync(requests, entityErrorList); } int totalEntities = entities.size(); @@ -113,101 +113,240 @@ public void write(List entities, Map contextData) throws Sear } } - private void sendBulkRequest(List> requests, List entityErrorList) + private void sendBulkRequestAsync( + List> requests, List entityErrorList) throws SearchIndexException { BulkRequest bulkRequest = new BulkRequest(); bulkRequest.add(requests); - int attempt = 0; - long backoffMillis = initialBackoffMillis; + try { + semaphore.acquire(); + LOG.debug("Semaphore acquired. 
Available permits: {}", semaphore.availablePermits()); + ((RestHighLevelClient) client.getClient()) + .bulkAsync( + bulkRequest, + RequestOptions.DEFAULT, + new ActionListener() { + @Override + public void onResponse(BulkResponse response) { + try { + for (int i = 0; i < response.getItems().length; i++) { + BulkItemResponse itemResponse = response.getItems()[i]; + if (itemResponse.isFailed()) { + String failureMessage = itemResponse.getFailureMessage(); + String entityData = requests.get(i).toString(); + entityErrorList.add( + new EntityError().withMessage(failureMessage).withEntity(entityData)); + LOG.warn("Bulk item failed: {}", failureMessage); + } + } - while (attempt <= maxRetries) { - try { - semaphore.acquire(); - try { - BulkResponse response = client.bulk(bulkRequest, RequestOptions.DEFAULT); - entityErrorList.addAll(getErrorsFromBulkResponse(response)); - break; // Success, exit retry loop - } finally { - semaphore.release(); - } - } catch (IOException e) { - if (isRetriableException(e)) { - attempt++; - LOG.warn( - "Bulk request failed with retriable exception, retrying attempt {}/{}", - attempt, - maxRetries); - sleepWithBackoff(backoffMillis); - backoffMillis = Math.min(backoffMillis * 2, maxBackoffMillis); - } else { - LOG.error("Bulk request failed with non-retriable exception", e); - throw new SearchIndexException(createIndexingError(requests.size(), e)); - } - } catch (InterruptedException e) { - Thread.currentThread().interrupt(); - LOG.error("Bulk request interrupted", e); - throw new SearchIndexException(createIndexingError(requests.size(), e)); - } catch (ElasticsearchException e) { - if (isRetriableStatusCode(e.status())) { - attempt++; - LOG.warn( - "Bulk request failed with status {}, retrying attempt {}/{}", - e.status(), - attempt, - maxRetries); - sleepWithBackoff(backoffMillis); - backoffMillis = Math.min(backoffMillis * 2, maxBackoffMillis); - } else { - LOG.error("Bulk request failed with non-retriable status {}", e.status(), e); - 
throw new SearchIndexException(createIndexingError(requests.size(), e)); - } - } + int success = response.getItems().length; + int failed = 0; + for (BulkItemResponse item : response.getItems()) { + if (item.isFailed()) { + failed++; + } + } + success -= failed; + updateStats(success, failed); + + if (response.hasFailures()) { + LOG.warn("Bulk request completed with failures. Total Failures: {}", failed); + } else { + LOG.debug("Bulk request successful with {} operations.", success); + } + } finally { + semaphore.release(); + LOG.debug( + "Semaphore released. Available permits: {}", semaphore.availablePermits()); + } + } + + @Override + public void onFailure(Exception e) { + try { + LOG.error("Bulk request failed asynchronously", e); + if (isRetriableException(e)) { + retryBulkRequest(bulkRequest, entityErrorList, 1); + } else { + handleNonRetriableException(requests.size(), e); + } + } catch (Exception ex) { + LOG.error("Bulk request retry attempt {}/{} failed", 1, maxRetries, ex); + } finally { + semaphore.release(); + LOG.debug( + "Semaphore released after failure. 
Available permits: {}", + semaphore.availablePermits()); + } + } + }); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + LOG.error("Bulk request interrupted", e); + throw new SearchIndexException(createIndexingError(requests.size(), e)); } + } + private void retryBulkRequest( + BulkRequest bulkRequest, List entityErrorList, int attempt) + throws SearchIndexException { if (attempt > maxRetries) { + LOG.error("Exceeded maximum retries for bulk request"); throw new SearchIndexException( new IndexingError() .withErrorSource(SINK) - .withSubmittedCount(requests.size()) + .withSubmittedCount(bulkRequest.numberOfActions()) .withSuccessCount(0) - .withFailedCount(requests.size()) + .withFailedCount(bulkRequest.numberOfActions()) .withMessage("Exceeded maximum retries for bulk request")); } + + long backoffMillis = + Math.min(initialBackoffMillis * (long) Math.pow(2, attempt - 1), maxBackoffMillis); + LOG.info( + "Retrying bulk request (attempt {}/{}) after {} ms", attempt, maxRetries, backoffMillis); + + try { + Thread.sleep(backoffMillis + ThreadLocalRandom.current().nextLong(0, backoffMillis)); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + LOG.error("Sleep interrupted during backoff", e); + throw new SearchIndexException(createIndexingError(bulkRequest.numberOfActions(), e)); + } + + try { + semaphore.acquire(); + LOG.debug( + "Semaphore acquired for retry attempt {}/{}. 
Available permits: {}", + attempt, + maxRetries, + semaphore.availablePermits()); + ((RestHighLevelClient) client.getClient()) + .bulkAsync( + bulkRequest, + RequestOptions.DEFAULT, + new ActionListener() { + @Override + public void onResponse(BulkResponse response) { + try { + for (int i = 0; i < response.getItems().length; i++) { + BulkItemResponse itemResponse = response.getItems()[i]; + if (itemResponse.isFailed()) { + String failureMessage = itemResponse.getFailureMessage(); + String entityData = bulkRequest.requests().get(i).toString(); + entityErrorList.add( + new EntityError().withMessage(failureMessage).withEntity(entityData)); + LOG.warn("Bulk item failed on retry {}: {}", attempt, failureMessage); + } + } + + int success = response.getItems().length; + int failed = 0; + for (BulkItemResponse item : response.getItems()) { + if (item.isFailed()) { + failed++; + } + } + success -= failed; + updateStats(success, failed); + + if (response.hasFailures()) { + LOG.warn( + "Bulk request retry attempt {}/{} completed with {} failures.", + attempt, + maxRetries, + failed); + } else { + LOG.debug( + "Bulk request retry attempt {}/{} successful with {} operations.", + attempt, + maxRetries, + success); + } + } finally { + semaphore.release(); + LOG.debug( + "Semaphore released after retry. Available permits: {}", + semaphore.availablePermits()); + } + } + + @Override + public void onFailure(Exception e) { + try { + LOG.error("Bulk request retry attempt {}/{} failed", attempt, maxRetries, e); + if (isRetriableException(e)) { + retryBulkRequest(bulkRequest, entityErrorList, attempt + 1); + } else { + handleNonRetriableException(bulkRequest.numberOfActions(), e); + } + } catch (Exception ex) { + LOG.error("Bulk request retry attempt {}/{} failed", attempt, maxRetries, ex); + } finally { + semaphore.release(); + LOG.debug( + "Semaphore released after retry failure. 
Available permits: {}", + semaphore.availablePermits()); + } + } + }); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + LOG.error("Bulk request retry interrupted", e); + throw new SearchIndexException(createIndexingError(bulkRequest.numberOfActions(), e)); + } + } + + private void handleNonRetriableException(int requestCount, Exception e) + throws SearchIndexException { + throw new SearchIndexException( + new IndexingError() + .withErrorSource(SINK) + .withSubmittedCount(requestCount) + .withSuccessCount(0) + .withFailedCount(requestCount) + .withMessage(String.format("Issue in Sink to Elasticsearch: %s", e.getMessage())) + .withStackTrace(ExceptionUtils.exceptionStackTraceAsString(e))); } private boolean isRetriableException(Exception e) { - return e instanceof IOException; + return e instanceof IOException + || (e instanceof ElasticsearchException + && isRetriableStatusCode(((ElasticsearchException) e).status())); } private boolean isRetriableStatusCode(RestStatus status) { - return status == RestStatus.TOO_MANY_REQUESTS || status == RestStatus.SERVICE_UNAVAILABLE; + return status == RestStatus.TOO_MANY_REQUESTS + || status == RestStatus.SERVICE_UNAVAILABLE + || status == RestStatus.GATEWAY_TIMEOUT; } - private void sleepWithBackoff(long millis) { - try { - Thread.sleep(millis + ThreadLocalRandom.current().nextLong(0, millis)); - } catch (InterruptedException e) { - Thread.currentThread().interrupt(); - LOG.error("Sleep interrupted during backoff", e); - } + private long estimateRequestSizeInBytes(DocWriteRequest request) { + return new BulkRequest().add(request).estimatedSizeInBytes(); } - private IndexingError createIndexingError(int requestCount, Exception e) { - return new IndexingError() - .withErrorSource(SINK) - .withSubmittedCount(requestCount) - .withSuccessCount(0) - .withFailedCount(requestCount) - .withMessage(String.format("Issue in Sink to Elasticsearch: %s", e.getMessage())) - 
.withStackTrace(ExceptionUtils.exceptionStackTraceAsString(e)); + @Override + public void updateStats(int currentSuccess, int currentFailed) { + getUpdatedStats(stats, currentSuccess, currentFailed); + } + + @Override + public StepStats getStats() { + return stats; + } + + @Override + public void close() throws IOException { + client.close(); } private DocWriteRequest convertEntityToRequest(Object entity, String entityType) { if (entity instanceof EntityInterface) { return getEntityInterfaceRequest(entityType, (EntityInterface) entity); } else if (entity instanceof EntityTimeSeriesInterface) { - return getEntityTimeSeriesInterfaceReqeust(entityType, (EntityTimeSeriesInterface) entity); + return getEntityTimeSeriesInterfaceRequest(entityType, (EntityTimeSeriesInterface) entity); } else { throw new IllegalArgumentException("Unknown entity type: " + entity.getClass()); } @@ -219,47 +358,34 @@ private UpdateRequest getEntityInterfaceRequest(String entityType, EntityInterfa new UpdateRequest( indexMapping.getIndexName(Entity.getSearchRepository().getClusterAlias()), entity.getId().toString()); - updateRequest.doc( - JsonUtils.pojoToJson( - Objects.requireNonNull(Entity.buildSearchIndex(entityType, entity)) - .buildSearchIndexDoc()), - XContentType.JSON); + String jsonDoc = + JsonUtils.pojoToJson(Entity.buildSearchIndex(entityType, entity).buildSearchIndexDoc()); + updateRequest.doc(jsonDoc, XContentType.JSON); updateRequest.docAsUpsert(true); return updateRequest; } - private UpdateRequest getEntityTimeSeriesInterfaceReqeust( + private UpdateRequest getEntityTimeSeriesInterfaceRequest( String entityType, EntityTimeSeriesInterface entity) { IndexMapping indexMapping = Entity.getSearchRepository().getIndexMapping(entityType); UpdateRequest updateRequest = new UpdateRequest( indexMapping.getIndexName(Entity.getSearchRepository().getClusterAlias()), entity.getId().toString()); - updateRequest.doc( - JsonUtils.pojoToJson( - Objects.requireNonNull( - 
Entity.buildSearchIndex(entityType, entity).buildSearchIndexDoc())), - XContentType.JSON); + String jsonDoc = + JsonUtils.pojoToJson(Entity.buildSearchIndex(entityType, entity).buildSearchIndexDoc()); + updateRequest.doc(jsonDoc, XContentType.JSON); updateRequest.docAsUpsert(true); return updateRequest; } - private long estimateRequestSizeInBytes(DocWriteRequest request) { - return new BulkRequest().add(request).estimatedSizeInBytes(); - } - - @Override - public void updateStats(int currentSuccess, int currentFailed) { - getUpdatedStats(stats, currentSuccess, currentFailed); - } - - @Override - public StepStats getStats() { - return stats; - } - - @Override - public void close() throws IOException { - client.close(); + private IndexingError createIndexingError(int requestCount, Exception e) { + return new IndexingError() + .withErrorSource(SINK) + .withSubmittedCount(requestCount) + .withSuccessCount(0) + .withFailedCount(requestCount) + .withMessage(String.format("Issue in Sink to Elasticsearch: %s", e.getMessage())) + .withStackTrace(ExceptionUtils.exceptionStackTraceAsString(e)); } } diff --git a/openmetadata-service/src/main/java/org/openmetadata/service/apps/bundles/searchIndex/OpenSearchIndexSink.java b/openmetadata-service/src/main/java/org/openmetadata/service/apps/bundles/searchIndex/OpenSearchIndexSink.java index 3c3ca23f162b..4a3b7384619b 100644 --- a/openmetadata-service/src/main/java/org/openmetadata/service/apps/bundles/searchIndex/OpenSearchIndexSink.java +++ b/openmetadata-service/src/main/java/org/openmetadata/service/apps/bundles/searchIndex/OpenSearchIndexSink.java @@ -1,7 +1,6 @@ package org.openmetadata.service.apps.bundles.searchIndex; import static org.openmetadata.schema.system.IndexingError.ErrorSource.SINK; -import static org.openmetadata.service.workflows.searchIndex.ReindexingUtil.getErrorsFromBulkResponse; import static org.openmetadata.service.workflows.searchIndex.ReindexingUtil.getUpdatedStats; import java.io.Closeable; @@ -9,7 +8,6 
@@ import java.util.ArrayList; import java.util.List; import java.util.Map; -import java.util.Objects; import java.util.concurrent.Semaphore; import java.util.concurrent.ThreadLocalRandom; import lombok.extern.slf4j.Slf4j; @@ -25,11 +23,14 @@ import org.openmetadata.service.search.models.IndexMapping; import org.openmetadata.service.util.JsonUtils; import os.org.opensearch.OpenSearchException; +import os.org.opensearch.action.ActionListener; import os.org.opensearch.action.DocWriteRequest; +import os.org.opensearch.action.bulk.BulkItemResponse; import os.org.opensearch.action.bulk.BulkRequest; import os.org.opensearch.action.bulk.BulkResponse; import os.org.opensearch.action.update.UpdateRequest; import os.org.opensearch.client.RequestOptions; +import os.org.opensearch.client.RestHighLevelClient; import os.org.opensearch.common.xcontent.XContentType; import os.org.opensearch.rest.RestStatus; @@ -63,21 +64,19 @@ public OpenSearchIndexSink( public void write(List entities, Map contextData) throws SearchIndexException { String entityType = (String) contextData.get("entityType"); LOG.debug( - "[OpenSearchIndexSink] Processing {} entities of type {}", entities.size(), entityType); + "[OpenSearchIndexSink] Processing {} entities of type {}", entities.size(), entityType); List entityErrorList = new ArrayList<>(); List> requests = new ArrayList<>(); long currentBatchSize = 0L; - // Convert entities to DocWriteRequests for (Object entity : entities) { try { DocWriteRequest request = convertEntityToRequest(entity, entityType); long requestSize = estimateRequestSizeInBytes(request); if (currentBatchSize + requestSize > maxPayloadSizeInBytes) { - // Flush current batch - sendBulkRequest(requests, entityErrorList); + sendBulkRequestAsync(new ArrayList<>(requests), new ArrayList<>(entityErrorList)); requests.clear(); currentBatchSize = 0L; } @@ -94,18 +93,15 @@ public void write(List entities, Map contextData) throws Sear } } - // Send any remaining requests if 
(!requests.isEmpty()) { - sendBulkRequest(requests, entityErrorList); + sendBulkRequestAsync(requests, entityErrorList); } - // Update stats int totalEntities = entities.size(); int failedEntities = entityErrorList.size(); int successfulEntities = totalEntities - failedEntities; updateStats(successfulEntities, failedEntities); - // Handle errors if (!entityErrorList.isEmpty()) { throw new SearchIndexException( new IndexingError() @@ -113,159 +109,284 @@ public void write(List entities, Map contextData) throws Sear .withSubmittedCount(totalEntities) .withSuccessCount(successfulEntities) .withFailedCount(failedEntities) - .withMessage(String.format("Issues in Sink to OpenSearch: %s", entityErrorList)) + .withMessage(String.format("Issues in Sink to OpenSearch: %s", entityErrorList)) .withFailedEntities(entityErrorList)); } } - private void sendBulkRequest(List> requests, List entityErrorList) + private void sendBulkRequestAsync( + List> requests, List entityErrorList) throws SearchIndexException { BulkRequest bulkRequest = new BulkRequest(); bulkRequest.add(requests); - int attempt = 0; - long backoffMillis = initialBackoffMillis; + try { + semaphore.acquire(); + LOG.debug("Semaphore acquired. 
Available permits: {}", semaphore.availablePermits()); + ((RestHighLevelClient) client.getClient()) + .bulkAsync( + bulkRequest, + RequestOptions.DEFAULT, + new ActionListener() { + @Override + public void onResponse(BulkResponse response) { + try { + for (int i = 0; i < response.getItems().length; i++) { + BulkItemResponse itemResponse = response.getItems()[i]; + if (itemResponse.isFailed()) { + String failureMessage = itemResponse.getFailureMessage(); + String entityData = requests.get(i).toString(); + entityErrorList.add( + new EntityError().withMessage(failureMessage).withEntity(entityData)); + LOG.warn("Bulk item failed: {}", failureMessage); + } + } - while (attempt <= maxRetries) { - try { - semaphore.acquire(); - try { - BulkResponse response = client.bulk(bulkRequest, RequestOptions.DEFAULT); - entityErrorList.addAll(getErrorsFromBulkResponse(response)); - break; // Success, exit retry loop - } finally { - semaphore.release(); - } - } catch (IOException e) { - if (isRetriableException(e)) { - attempt++; - LOG.warn( - "Bulk request failed with retriable exception, retrying attempt {}/{}", - attempt, - maxRetries); - sleepWithBackoff(backoffMillis); - backoffMillis = Math.min(backoffMillis * 2, maxBackoffMillis); - } else { - LOG.error("Bulk request failed with non-retriable exception", e); - throw new SearchIndexException(createIndexingError(requests.size(), e)); - } - } catch (InterruptedException e) { - Thread.currentThread().interrupt(); - LOG.error("Bulk request interrupted", e); - throw new SearchIndexException(createIndexingError(requests.size(), e)); - } catch (OpenSearchException e) { - if (isRetriableStatusCode(e.status())) { - attempt++; - LOG.warn( - "Bulk request failed with status {}, retrying attempt {}/{}", - e.status(), - attempt, - maxRetries); - sleepWithBackoff(backoffMillis); - backoffMillis = Math.min(backoffMillis * 2, maxBackoffMillis); - } else { - LOG.error("Bulk request failed with non-retriable status {}", e.status(), e); - throw 
new SearchIndexException(createIndexingError(requests.size(), e)); - } - } + int success = response.getItems().length; + int failed = 0; + for (BulkItemResponse item : response.getItems()) { + if (item.isFailed()) { + failed++; + } + } + success -= failed; + updateStats(success, failed); + + if (response.hasFailures()) { + LOG.warn("Bulk request completed with failures. Total Failures: {}", failed); + } else { + LOG.debug("Bulk request successful with {} operations.", success); + } + } finally { + semaphore.release(); + LOG.debug( + "Semaphore released. Available permits: {}", semaphore.availablePermits()); + } + } + + @Override + public void onFailure(Exception e) { + try { + LOG.error("Bulk request failed asynchronously", e); + if (isRetriableException(e)) { + retryBulkRequest(bulkRequest, entityErrorList, 1); + } else { + handleNonRetriableException(requests.size(), e); + } + } catch (Exception ex) { + LOG.error("Bulk request retry attempt {}/{} failed", 1, maxRetries, ex); + } finally { + semaphore.release(); + LOG.debug( + "Semaphore released after failure. 
Available permits: {}", + semaphore.availablePermits()); + } + } + }); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + LOG.error("Bulk request interrupted", e); + throw new SearchIndexException(createIndexingError(requests.size(), e)); } + } + private void retryBulkRequest( + BulkRequest bulkRequest, List entityErrorList, int attempt) + throws SearchIndexException { if (attempt > maxRetries) { + LOG.error("Exceeded maximum retries for bulk request"); throw new SearchIndexException( new IndexingError() .withErrorSource(SINK) - .withSubmittedCount(requests.size()) + .withSubmittedCount(bulkRequest.numberOfActions()) .withSuccessCount(0) - .withFailedCount(requests.size()) + .withFailedCount(bulkRequest.numberOfActions()) .withMessage("Exceeded maximum retries for bulk request")); } + + long backoffMillis = + Math.min(initialBackoffMillis * (long) Math.pow(2, attempt - 1), maxBackoffMillis); + LOG.info( + "Retrying bulk request (attempt {}/{}) after {} ms", attempt, maxRetries, backoffMillis); + + try { + Thread.sleep(backoffMillis + ThreadLocalRandom.current().nextLong(0, backoffMillis)); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + LOG.error("Sleep interrupted during backoff", e); + throw new SearchIndexException(createIndexingError(bulkRequest.numberOfActions(), e)); + } + + try { + semaphore.acquire(); + LOG.debug( + "Semaphore acquired for retry attempt {}/{}. 
Available permits: {}", + attempt, + maxRetries, + semaphore.availablePermits()); + ((RestHighLevelClient) client.getClient()) + .bulkAsync( + bulkRequest, + RequestOptions.DEFAULT, + new ActionListener<>() { + @Override + public void onResponse(BulkResponse response) { + try { + for (int i = 0; i < response.getItems().length; i++) { + BulkItemResponse itemResponse = response.getItems()[i]; + if (itemResponse.isFailed()) { + String failureMessage = itemResponse.getFailureMessage(); + String entityData = bulkRequest.requests().get(i).toString(); + entityErrorList.add( + new EntityError().withMessage(failureMessage).withEntity(entityData)); + LOG.warn("Bulk item failed on retry {}: {}", attempt, failureMessage); + } + } + + int success = response.getItems().length; + int failed = 0; + for (BulkItemResponse item : response.getItems()) { + if (item.isFailed()) { + failed++; + } + } + success -= failed; + updateStats(success, failed); + + if (response.hasFailures()) { + LOG.warn( + "Bulk request retry attempt {}/{} completed with {} failures.", + attempt, + maxRetries, + failed); + } else { + LOG.debug( + "Bulk request retry attempt {}/{} successful with {} operations.", + attempt, + maxRetries, + success); + } + } finally { + semaphore.release(); + LOG.debug( + "Semaphore released after retry. Available permits: {}", + semaphore.availablePermits()); + } + } + + @Override + public void onFailure(Exception e) { + try { + LOG.error("Bulk request retry attempt {}/{} failed", attempt, maxRetries, e); + if (isRetriableException(e)) { + retryBulkRequest(bulkRequest, entityErrorList, attempt + 1); + } else { + handleNonRetriableException(bulkRequest.numberOfActions(), e); + } + } catch (Exception ex) { + LOG.error("Bulk request retry attempt {}/{} failed", attempt, maxRetries, ex); + } finally { + semaphore.release(); + LOG.debug( + "Semaphore released after retry failure. 
Available permits: {}", + semaphore.availablePermits()); + } + } + }); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + LOG.error("Bulk request retry interrupted", e); + throw new SearchIndexException(createIndexingError(bulkRequest.numberOfActions(), e)); + } + } + + private void handleNonRetriableException(int requestCount, Exception e) + throws SearchIndexException { + throw new SearchIndexException( + new IndexingError() + .withErrorSource(SINK) + .withSubmittedCount(requestCount) + .withSuccessCount(0) + .withFailedCount(requestCount) + .withMessage(String.format("Issue in Sink to OpenSearch: %s", e.getMessage())) + .withStackTrace(ExceptionUtils.exceptionStackTraceAsString(e))); } private boolean isRetriableException(Exception e) { - return e instanceof IOException; + return e instanceof IOException + || (e instanceof OpenSearchException + && isRetriableStatusCode(((OpenSearchException) e).status())); } private boolean isRetriableStatusCode(RestStatus status) { - return status == RestStatus.TOO_MANY_REQUESTS || status == RestStatus.SERVICE_UNAVAILABLE; + return status == RestStatus.TOO_MANY_REQUESTS + || status == RestStatus.SERVICE_UNAVAILABLE + || status == RestStatus.GATEWAY_TIMEOUT; } - private void sleepWithBackoff(long millis) { - try { - Thread.sleep(millis + ThreadLocalRandom.current().nextLong(0, millis)); - } catch (InterruptedException e) { - Thread.currentThread().interrupt(); - LOG.error("Sleep interrupted during backoff", e); - } + private long estimateRequestSizeInBytes(DocWriteRequest request) { + return new BulkRequest().add(request).estimatedSizeInBytes(); } - private IndexingError createIndexingError(int requestCount, Exception e) { - return new IndexingError() - .withErrorSource(SINK) - .withSubmittedCount(requestCount) - .withSuccessCount(0) - .withFailedCount(requestCount) - .withMessage(String.format("Issue in Sink to OpenSearch: %s", e.getMessage())) - 
.withStackTrace(ExceptionUtils.exceptionStackTraceAsString(e)); + @Override + public void updateStats(int currentSuccess, int currentFailed) { + getUpdatedStats(stats, currentSuccess, currentFailed); + } + + @Override + public StepStats getStats() { + return stats; + } + + @Override + public void close() throws IOException { + client.close(); } private DocWriteRequest convertEntityToRequest(Object entity, String entityType) { if (entity instanceof EntityInterface) { - return getEntityInterfaceRequest((EntityInterface) entity, entityType); + return getEntityInterfaceRequest(entityType, (EntityInterface) entity); } else if (entity instanceof EntityTimeSeriesInterface) { - return getEntityTimeSeriesInterfaceReqeust(entityType, (EntityTimeSeriesInterface) entity); + return getEntityTimeSeriesInterfaceRequest(entityType, (EntityTimeSeriesInterface) entity); } else { throw new IllegalArgumentException("Unknown entity type: " + entity.getClass()); } } - private DocWriteRequest getEntityInterfaceRequest(EntityInterface entity, String entityType) { + private UpdateRequest getEntityInterfaceRequest(String entityType, EntityInterface entity) { IndexMapping indexMapping = Entity.getSearchRepository().getIndexMapping(entityType); UpdateRequest updateRequest = new UpdateRequest( indexMapping.getIndexName(Entity.getSearchRepository().getClusterAlias()), entity.getId().toString()); - updateRequest.doc( - JsonUtils.pojoToJson( - Objects.requireNonNull(Entity.buildSearchIndex(entityType, entity)) - .buildSearchIndexDoc()), - XContentType.JSON); + String jsonDoc = + JsonUtils.pojoToJson(Entity.buildSearchIndex(entityType, entity).buildSearchIndexDoc()); + updateRequest.doc(jsonDoc, XContentType.JSON); updateRequest.docAsUpsert(true); return updateRequest; } - private UpdateRequest getEntityTimeSeriesInterfaceReqeust( + private UpdateRequest getEntityTimeSeriesInterfaceRequest( String entityType, EntityTimeSeriesInterface entity) { IndexMapping indexMapping = 
Entity.getSearchRepository().getIndexMapping(entityType); UpdateRequest updateRequest = new UpdateRequest( indexMapping.getIndexName(Entity.getSearchRepository().getClusterAlias()), entity.getId().toString()); - updateRequest.doc( - JsonUtils.pojoToJson( - Objects.requireNonNull( - Entity.buildSearchIndex(entityType, entity).buildSearchIndexDoc())), - XContentType.JSON); + String jsonDoc = + JsonUtils.pojoToJson(Entity.buildSearchIndex(entityType, entity).buildSearchIndexDoc()); + updateRequest.doc(jsonDoc, XContentType.JSON); updateRequest.docAsUpsert(true); return updateRequest; } - private long estimateRequestSizeInBytes(DocWriteRequest request) { - return new BulkRequest().add(request).estimatedSizeInBytes(); - } - - @Override - public void updateStats(int currentSuccess, int currentFailed) { - getUpdatedStats(stats, currentSuccess, currentFailed); - } - - @Override - public StepStats getStats() { - return stats; - } - - @Override - public void close() throws IOException { - // Close resources if needed - client.close(); + private IndexingError createIndexingError(int requestCount, Exception e) { + return new IndexingError() + .withErrorSource(SINK) + .withSubmittedCount(requestCount) + .withSuccessCount(0) + .withFailedCount(requestCount) + .withMessage(String.format("Issue in Sink to OpenSearch: %s", e.getMessage())) + .withStackTrace(ExceptionUtils.exceptionStackTraceAsString(e)); } } diff --git a/openmetadata-service/src/main/java/org/openmetadata/service/search/SearchClient.java b/openmetadata-service/src/main/java/org/openmetadata/service/search/SearchClient.java index 19dc90689cda..7de871a1dd8d 100644 --- a/openmetadata-service/src/main/java/org/openmetadata/service/search/SearchClient.java +++ b/openmetadata-service/src/main/java/org/openmetadata/service/search/SearchClient.java @@ -360,6 +360,8 @@ default List> fetchDIChartFields() throws IOException { Object getLowLevelClient(); + Object getClient(); + static boolean shouldApplyRbacConditions( 
SubjectContext subjectContext, RBACConditionEvaluator rbacConditionEvaluator) { return Boolean.TRUE.equals( diff --git a/openmetadata-service/src/main/java/org/openmetadata/service/search/elasticsearch/ElasticSearchClient.java b/openmetadata-service/src/main/java/org/openmetadata/service/search/elasticsearch/ElasticSearchClient.java index 09c8aae4886d..a67c4b369376 100644 --- a/openmetadata-service/src/main/java/org/openmetadata/service/search/elasticsearch/ElasticSearchClient.java +++ b/openmetadata-service/src/main/java/org/openmetadata/service/search/elasticsearch/ElasticSearchClient.java @@ -131,6 +131,7 @@ import javax.json.JsonObject; import javax.net.ssl.SSLContext; import javax.ws.rs.core.Response; +import lombok.Getter; import lombok.SneakyThrows; import lombok.extern.slf4j.Slf4j; import org.apache.commons.lang.WordUtils; @@ -217,6 +218,7 @@ public class ElasticSearchClient implements SearchClient { @SuppressWarnings("deprecated") + @Getter protected final RestHighLevelClient client; private final RBACConditionEvaluator rbacConditionEvaluator; diff --git a/openmetadata-service/src/main/java/org/openmetadata/service/search/opensearch/OpenSearchClient.java b/openmetadata-service/src/main/java/org/openmetadata/service/search/opensearch/OpenSearchClient.java index b8f3a64c9cc4..074cc6a71a76 100644 --- a/openmetadata-service/src/main/java/org/openmetadata/service/search/opensearch/OpenSearchClient.java +++ b/openmetadata-service/src/main/java/org/openmetadata/service/search/opensearch/OpenSearchClient.java @@ -51,6 +51,7 @@ import javax.json.JsonObject; import javax.net.ssl.SSLContext; import javax.ws.rs.core.Response; +import lombok.Getter; import lombok.SneakyThrows; import lombok.extern.slf4j.Slf4j; import org.apache.commons.lang.WordUtils; @@ -214,7 +215,7 @@ @Slf4j // Not tagged with Repository annotation as it is programmatically initialized public class OpenSearchClient implements SearchClient { - protected final RestHighLevelClient client; + @Getter 
protected final RestHighLevelClient client; public static final NamedXContentRegistry X_CONTENT_REGISTRY; private final boolean isClientAvailable; private final RBACConditionEvaluator rbacConditionEvaluator;