Removed Consumer
mohityadav766 committed Nov 18, 2024
1 parent ae1e7e6 commit 537f887
Showing 2 changed files with 19 additions and 78 deletions.
File: SearchIndexApp.java
@@ -58,6 +58,7 @@
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.Semaphore;
 import java.util.concurrent.ThreadPoolExecutor;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicReference;
@@ -161,10 +162,8 @@ public class SearchIndexApp extends AbstractNativeApplication {
   @Getter private EventPublisherJob jobData;
   private final Object jobDataLock = new Object();
   private volatile boolean stopped = false;
-  private ExecutorService consumerExecutor;
   private ExecutorService producerExecutor;
   private final ExecutorService jobExecutor = Executors.newCachedThreadPool();
-  private BlockingQueue<IndexingTask<?>> taskQueue = new LinkedBlockingQueue<>(100);
   private BlockingQueue<Runnable> producerQueue = new LinkedBlockingQueue<>(100);
   private final AtomicReference<Stats> searchIndexStats = new AtomicReference<>();
   private final AtomicReference<Integer> batchSize = new AtomicReference<>(5);
@@ -290,15 +289,7 @@ private void performReindex(JobExecutionContext jobExecutionContext) throws InterruptedException {
     int numConsumers = jobData.getConsumerThreads();
     LOG.info("Starting reindexing with {} producers and {} consumers.", numProducers, numConsumers);

-    taskQueue = new LinkedBlockingQueue<>(jobData.getQueueSize());
     producerQueue = new LinkedBlockingQueue<>(jobData.getQueueSize());
-    consumerExecutor =
-        new ThreadPoolExecutor(
-            numConsumers,
-            numConsumers,
-            0L,
-            TimeUnit.MILLISECONDS,
-            new LinkedBlockingQueue<>(jobData.getQueueSize()));
     producerExecutor =
         new ThreadPoolExecutor(
             numProducers,
@@ -316,35 +307,31 @@ private void performReindex(JobExecutionContext jobExecutionContext) throws InterruptedException {
     } finally {
       shutdownExecutor(jobExecutor, "JobExecutor", 20, TimeUnit.SECONDS);
       shutdownExecutor(producerExecutor, "ReaderExecutor", 1, TimeUnit.MINUTES);
-      shutdownExecutor(consumerExecutor, "ConsumerExecutor", 20, TimeUnit.SECONDS);
     }
   }

   private void processEntityReindex(JobExecutionContext jobExecutionContext)
       throws InterruptedException {
-    int numConsumers = jobData.getConsumerThreads();
     int latchCount = getTotalLatchCount(jobData.getEntities());
     CountDownLatch producerLatch = new CountDownLatch(latchCount);
-    CountDownLatch consumerLatch = new CountDownLatch(latchCount + numConsumers);
-    submitProducerTask(producerLatch);
-    submitConsumerTask(jobExecutionContext, consumerLatch);
-
+    submitProducerTask(jobExecutionContext, producerLatch);
     producerLatch.await();
-    sendPoisonPills(numConsumers);
-    consumerLatch.await();
   }

-  private void submitProducerTask(CountDownLatch producerLatch) {
+  private void submitProducerTask(
+      JobExecutionContext jobExecutionContext, CountDownLatch producerLatch) {
     for (String entityType : jobData.getEntities()) {
       jobExecutor.submit(
           () -> {
             try {
               reCreateIndexes(entityType);
               int totalEntityRecords = getTotalEntityRecords(entityType);
               Source<?> source = createSource(entityType);
-              int noOfThreads = calculateNumberOfThreads(totalEntityRecords);
+              int loadPerThread = calculateNumberOfThreads(totalEntityRecords);
+              Semaphore semaphore = new Semaphore(jobData.getQueueSize());
               if (totalEntityRecords > 0) {
-                for (int i = 0; i < noOfThreads; i++) {
+                for (int i = 0; i < loadPerThread; i++) {
+                  semaphore.acquire();
                   LOG.debug(
                       "Submitting producer task current queue size: {}", producerQueue.size());
                   int currentOffset = i * batchSize.get();
@@ -355,12 +342,15 @@ private void submitProducerTask(CountDownLatch producerLatch) {
                             "Running Task for CurrentOffset: {}, Producer Latch Down, Current : {}",
                             currentOffset,
                             producerLatch.getCount());
-                        processReadTask(entityType, source, currentOffset);
+                        processReadTask(jobExecutionContext, entityType, source, currentOffset);
                       } catch (Exception e) {
                         LOG.error("Error processing entity type {}", entityType, e);
                       } finally {
-                        LOG.debug("Producer Latch Down, Current : {}", producerLatch.getCount());
+                        LOG.debug(
+                            "Producer Latch Down and Semaphore Release, Current : {}",
+                            producerLatch.getCount());
                         producerLatch.countDown();
+                        semaphore.release();
                       }
                     });
                 }
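
Note: this commit removes the consumer stage entirely. Producers no longer hand IndexingTask objects to a bounded taskQueue drained by consumer threads; each producer task now indexes its batch inline, a Semaphore sized to jobData.getQueueSize() provides the backpressure the bounded queue used to, and the single producer latch replaces the producer-plus-consumer latch pair and the poison pills. A minimal, self-contained sketch of that coordination pattern, with illustrative names (totalBatches, maxInFlight, workers) rather than the app's real API:

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Semaphore;

// Coordinator/worker sketch: acquire a permit before submitting a batch,
// release it only after the batch is fully indexed. The semaphore bounds
// in-flight batches (what the bounded taskQueue used to do) and one latch
// signals completion: no consumer threads, no poison pills.
public class SemaphoreBackpressureSketch {

  public static void main(String[] args) throws InterruptedException {
    int totalBatches = 20; // stand-in for totalEntityRecords / batchSize
    int maxInFlight = 5; // stand-in for jobData.getQueueSize()
    ExecutorService workers = Executors.newFixedThreadPool(3); // "producerExecutor"

    Semaphore semaphore = new Semaphore(maxInFlight);
    CountDownLatch producerLatch = new CountDownLatch(totalBatches);

    for (int i = 0; i < totalBatches; i++) {
      semaphore.acquire(); // blocks once maxInFlight batches are in flight
      int currentOffset = i * 25; // stand-in for i * batchSize.get()
      workers.submit(
          () -> {
            try {
              // read + index happen right here, on the worker thread
              System.out.println("indexed batch at offset " + currentOffset);
            } finally {
              producerLatch.countDown();
              semaphore.release();
            }
          });
    }

    producerLatch.await(); // all batches done; nothing left to drain
    workers.shutdown();
  }
}
```

The key property is that semaphore.release() runs in the worker's finally block, so at most maxInFlight batches can be submitted but not yet indexed at any moment, exactly the bound the old queue capacity enforced.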
@@ -372,50 +362,6 @@ private void submitProducerTask(CountDownLatch producerLatch) {
     }
   }

-  private void submitConsumerTask(JobExecutionContext jobExecutionContext, CountDownLatch latch) {
-    for (int i = 0; i < jobData.getConsumerThreads(); i++) {
-      consumerExecutor.submit(
-          () -> {
-            try {
-              consumeTasks(jobExecutionContext, latch);
-            } catch (InterruptedException e) {
-              Thread.currentThread().interrupt();
-              LOG.warn("Consumer thread interrupted.");
-            }
-          });
-    }
-  }
-
-  private void consumeTasks(JobExecutionContext jobExecutionContext, CountDownLatch latch)
-      throws InterruptedException {
-    while (true) {
-      IndexingTask<?> task = taskQueue.take();
-      LOG.info(
-          "Consuming Indexing Task for entityType: {}, entity offset : {}",
-          task.entityType(),
-          task.currentEntityOffset());
-      if (task == IndexingTask.POISON_PILL) {
-        LOG.debug("Received POISON_PILL. Consumer thread terminating.");
-        latch.countDown();
-        break;
-      }
-      processTask(task, jobExecutionContext, latch);
-    }
-  }
-
-  /**
-   * Sends POISON_PILLs to signal consumer threads to terminate.
-   *
-   * @param numConsumers The number of consumers to signal.
-   * @throws InterruptedException If the thread is interrupted while waiting.
-   */
-  private void sendPoisonPills(int numConsumers) throws InterruptedException {
-    for (int i = 0; i < numConsumers; i++) {
-      taskQueue.put(IndexingTask.POISON_PILL);
-    }
-    LOG.debug("Sent {} POISON_PILLs to consumers.", numConsumers);
-  }
-
   /**
    * Shuts down an executor service gracefully.
    *
@@ -592,11 +538,9 @@ public void stopJob() {
     stopped = true;
     shutdownExecutor(jobExecutor, "JobExecutor", 60, TimeUnit.SECONDS);
     shutdownExecutor(producerExecutor, "ProducerExecutor", 60, TimeUnit.SECONDS);
-    shutdownExecutor(consumerExecutor, "ConsumerExecutor", 60, TimeUnit.SECONDS);
   }

-  private void processTask(
-      IndexingTask<?> task, JobExecutionContext jobExecutionContext, CountDownLatch latch) {
+  private void processTask(IndexingTask<?> task, JobExecutionContext jobExecutionContext) {
     String entityType = task.entityType();
     ResultList<?> entities = task.entities();
     Map<String, Object> contextData = new HashMap<>();
@@ -653,7 +597,6 @@ private void processTask(
       LOG.error("Unexpected error during processing task for entity {}", entityType, e);
     } finally {
       sendUpdates(jobExecutionContext);
-      latch.countDown();
     }
   }

@@ -697,20 +640,18 @@ private int getTotalEntityRecords(String entityType) {
         .getTotalRecords();
   }

-  private void processReadTask(String entityType, Source<?> source, int offset) {
+  private void processReadTask(
+      JobExecutionContext jobExecutionContext, String entityType, Source<?> source, int offset) {
     try {
       Object resultList = source.readWithCursor(RestUtil.encodeCursor(String.valueOf(offset)));
-      LOG.info("Read Entities with CurrentOffset: {}", offset);
+      LOG.debug("Read Entities with entityType: {}, CurrentOffset: {}", entityType, offset);
       if (resultList != null) {
         ResultList<?> entities = extractEntities(entityType, resultList);
         if (!nullOrEmpty(entities.getData())) {
           IndexingTask<?> task = new IndexingTask<>(entityType, entities, offset);
-          taskQueue.put(task);
+          processTask(task, jobExecutionContext);
         }
       }
-    } catch (InterruptedException e) {
-      Thread.currentThread().interrupt();
-      LOG.warn("Reader thread interrupted for entityType: {}", entityType);
     } catch (SearchIndexException e) {
       LOG.error("Error while reading source for entityType: {}", entityType, e);
       synchronized (jobDataLock) {
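
Note: with the consumer executor gone, shutdownExecutor (its javadoc is retained above) is the only teardown path, and the InterruptedException catch disappears from processReadTask because there is no longer a blocking taskQueue.put. The method body is folded out of this diff; what follows is a sketch of the standard two-phase graceful-shutdown idiom its javadoc describes, with logging swapped for stderr so the example is self-contained. Details are assumptions, not the actual implementation:

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

// Standard two-phase shutdown: stop intake, wait, then interrupt stragglers.
public class ShutdownSketch {

  static void shutdownExecutor(
      ExecutorService executor, String name, long timeout, TimeUnit unit) {
    if (executor == null) {
      return; // nothing to shut down
    }
    executor.shutdown(); // reject new tasks; already-queued tasks may finish
    try {
      if (!executor.awaitTermination(timeout, unit)) {
        executor.shutdownNow(); // timed out: interrupt running tasks
        System.err.printf("%s did not terminate within %d %s%n", name, timeout, unit);
      }
    } catch (InterruptedException e) {
      executor.shutdownNow(); // interrupted while waiting: force shutdown
      Thread.currentThread().interrupt(); // preserve the interrupt status
    }
  }

  public static void main(String[] args) {
    ExecutorService pool = Executors.newFixedThreadPool(2);
    pool.submit(() -> System.out.println("some indexing work"));
    shutdownExecutor(pool, "DemoExecutor", 20, TimeUnit.SECONDS);
  }
}
```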
File: second changed file (JDBI DAO interface)
@@ -333,7 +333,7 @@ List<String> migrationListAfterWithOffset(
       @Define("nameHashColumn") String nameHashColumnName,
       @Bind("limit") int limit);

-  @SqlQuery("SELECT json FROM <table> <cond> ORDER BY name LIMIT :limit OFFSET :offset")
+  @SqlQuery("SELECT json FROM <table> <cond> ORDER BY id LIMIT :limit OFFSET :offset")
   List<String> listAfter(
       @Define("table") String table,
       @BindMap Map<String, ?> params,
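
Note: the second change swaps the paging query's sort key from name to id. LIMIT/OFFSET paging is only deterministic when the ORDER BY key is unique and stable; ties on a non-unique name column can make consecutive pages overlap or skip rows as producers walk offsets, which surfaces as duplicated or missed documents during reindexing. A sketch of the same query shape as a JDBI SqlObject method; the interface name is illustrative, and the real query also templates a <cond> clause (the project configures a template engine for the <table> define elsewhere):

```java
import java.util.List;
import org.jdbi.v3.sqlobject.customizer.Bind;
import org.jdbi.v3.sqlobject.customizer.Define;
import org.jdbi.v3.sqlobject.statement.SqlQuery;

// Deterministic offset paging: id is unique, so ORDER BY imposes a total
// order and page N / page N+1 can never overlap or skip rows the way ties
// on a non-unique name column can.
public interface PagedJsonDao {

  // <table> is substituted by JDBI's template engine (configured elsewhere);
  // :limit and :offset are ordinary bound parameters.
  @SqlQuery("SELECT json FROM <table> ORDER BY id LIMIT :limit OFFSET :offset")
  List<String> listPage(
      @Define("table") String table,
      @Bind("limit") int limit,
      @Bind("offset") int offset);
}
```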
