Skip to content

Commit

Permalink
Fix Producer Sync Issue (#18635)
Browse files Browse the repository at this point in the history
  • Loading branch information
mohityadav766 authored Nov 13, 2024
1 parent 887ffab commit e7e4cbf
Showing 1 changed file with 16 additions and 5 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
Expand Down Expand Up @@ -161,6 +162,7 @@ public class SearchIndexApp extends AbstractNativeApplication {
private volatile boolean stopped = false;
private ExecutorService consumerExecutor;
private ExecutorService producerExecutor;
private ExecutorService jobExecutor = Executors.newFixedThreadPool(2);
private BlockingQueue<IndexingTask<?>> taskQueue = new LinkedBlockingQueue<>(100);
private final AtomicReference<Stats> searchIndexStats = new AtomicReference<>();
private final AtomicReference<Integer> batchSize = new AtomicReference<>(5);
Expand Down Expand Up @@ -300,14 +302,16 @@ private void performReindex(JobExecutionContext jobExecutionContext) throws Inte
numProducers,
0L,
TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<>(jobData.getQueueSize()));
new LinkedBlockingQueue<>(jobData.getQueueSize()),
new ThreadPoolExecutor.CallerRunsPolicy());

try {
processEntityReindex(jobExecutionContext);
} catch (Exception e) {
LOG.error("Error during reindexing process.", e);
throw e;
} finally {
shutdownExecutor(jobExecutor, "JobExecutor", 20, TimeUnit.SECONDS);
shutdownExecutor(producerExecutor, "ReaderExecutor", 1, TimeUnit.MINUTES);
shutdownExecutor(consumerExecutor, "ConsumerExecutor", 20, TimeUnit.SECONDS);
}
Expand All @@ -317,6 +321,14 @@ private void processEntityReindex(JobExecutionContext jobExecutionContext)
throws InterruptedException {
int numConsumers = jobData.getConsumerThreads();
CountDownLatch producerLatch = new CountDownLatch(getTotalLatchCount(jobData.getEntities()));
jobExecutor.submit(() -> submitProducerTask(producerLatch));
jobExecutor.submit(() -> submitConsumerTask(jobExecutionContext));

producerLatch.await();
sendPoisonPills(numConsumers);
}

private void submitProducerTask(CountDownLatch producerLatch) {
for (String entityType : jobData.getEntities()) {
try {
reCreateIndexes(entityType);
Expand All @@ -342,8 +354,10 @@ private void processEntityReindex(JobExecutionContext jobExecutionContext)
LOG.error("Error processing entity type {}", entityType, e);
}
}
}

for (int i = 0; i < numConsumers; i++) {
private void submitConsumerTask(JobExecutionContext jobExecutionContext) {
for (int i = 0; i < jobData.getConsumerThreads(); i++) {
consumerExecutor.submit(
() -> {
try {
Expand All @@ -354,9 +368,6 @@ private void processEntityReindex(JobExecutionContext jobExecutionContext)
}
});
}

producerLatch.await();
sendPoisonPills(numConsumers);
}

private void consumeTasks(JobExecutionContext jobExecutionContext) throws InterruptedException {
Expand Down

0 comments on commit e7e4cbf

Please sign in to comment.