diff --git a/openmetadata-service/src/main/java/org/openmetadata/service/apps/bundles/searchIndex/SearchIndexApp.java b/openmetadata-service/src/main/java/org/openmetadata/service/apps/bundles/searchIndex/SearchIndexApp.java index f7ca1f577478..bf4afd568e12 100644 --- a/openmetadata-service/src/main/java/org/openmetadata/service/apps/bundles/searchIndex/SearchIndexApp.java +++ b/openmetadata-service/src/main/java/org/openmetadata/service/apps/bundles/searchIndex/SearchIndexApp.java @@ -322,12 +322,15 @@ private void performReindex(JobExecutionContext jobExecutionContext) throws Inte private void processEntityReindex(JobExecutionContext jobExecutionContext) throws InterruptedException { int numConsumers = jobData.getConsumerThreads(); - CountDownLatch producerLatch = new CountDownLatch(getTotalLatchCount(jobData.getEntities())); + int latchCount = getTotalLatchCount(jobData.getEntities()); + CountDownLatch producerLatch = new CountDownLatch(latchCount); + CountDownLatch consumerLatch = new CountDownLatch(latchCount + numConsumers); submitProducerTask(producerLatch); - submitConsumerTask(jobExecutionContext); + submitConsumerTask(jobExecutionContext, consumerLatch); producerLatch.await(); sendPoisonPills(numConsumers); + consumerLatch.await(); } private void submitProducerTask(CountDownLatch producerLatch) { @@ -368,12 +371,12 @@ private void submitProducerTask(CountDownLatch producerLatch) { } } - private void submitConsumerTask(JobExecutionContext jobExecutionContext) { + private void submitConsumerTask(JobExecutionContext jobExecutionContext, CountDownLatch latch) { for (int i = 0; i < jobData.getConsumerThreads(); i++) { consumerExecutor.submit( () -> { try { - consumeTasks(jobExecutionContext); + consumeTasks(jobExecutionContext, latch); } catch (InterruptedException e) { Thread.currentThread().interrupt(); LOG.warn("Consumer thread interrupted."); @@ -382,7 +385,8 @@ private void submitConsumerTask(JobExecutionContext jobExecutionContext) { } } - private void consumeTasks(JobExecutionContext jobExecutionContext) throws InterruptedException { + private void consumeTasks(JobExecutionContext jobExecutionContext, CountDownLatch latch) + throws InterruptedException { while (true) { IndexingTask task = taskQueue.take(); LOG.info( @@ -391,9 +395,10 @@ private void consumeTasks(JobExecutionContext jobExecutionContext) throws Interr task.currentEntityOffset()); if (task == IndexingTask.POISON_PILL) { LOG.debug("Received POISON_PILL. Consumer thread terminating."); + latch.countDown(); break; } - processTask(task, jobExecutionContext); + processTask(task, jobExecutionContext, latch); } } @@ -589,7 +594,8 @@ public void stopJob() { shutdownExecutor(consumerExecutor, "ConsumerExecutor", 60, TimeUnit.SECONDS); } - private void processTask(IndexingTask task, JobExecutionContext jobExecutionContext) { + private void processTask( + IndexingTask task, JobExecutionContext jobExecutionContext, CountDownLatch latch) { String entityType = task.entityType(); ResultList entities = task.entities(); Map contextData = new HashMap<>(); @@ -632,7 +638,7 @@ private void processTask(IndexingTask task, JobExecutionContext jobExecutionC } catch (Exception e) { synchronized (jobDataLock) { - jobData.setStatus(EventPublisherJob.Status.FAILED); + jobData.setStatus(EventPublisherJob.Status.ACTIVE_ERROR); jobData.setFailure( new IndexingError() .withErrorSource(IndexingError.ErrorSource.JOB) @@ -646,6 +652,7 @@ private void processTask(IndexingTask task, JobExecutionContext jobExecutionC LOG.error("Unexpected error during processing task for entity {}", entityType, e); } finally { sendUpdates(jobExecutionContext); + latch.countDown(); } }