Skip to content

Commit

Permalink
Search Index Issues (#18648)
Browse files Browse the repository at this point in the history
* Search Index Issues

* Log Thread

* Make logs debug

* Remove Current Therad Name
  • Loading branch information
mohityadav766 authored Nov 14, 2024
1 parent 75d417d commit 560e28d
Showing 1 changed file with 46 additions and 38 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -100,6 +100,7 @@
public class SearchIndexApp extends AbstractNativeApplication {

private static final String ALL = "all";

public static final Set<String> ALL_ENTITIES =
Set.of(
TABLE,
Expand Down Expand Up @@ -162,8 +163,9 @@ public class SearchIndexApp extends AbstractNativeApplication {
private volatile boolean stopped = false;
private ExecutorService consumerExecutor;
private ExecutorService producerExecutor;
private ExecutorService jobExecutor = Executors.newFixedThreadPool(2);
private final ExecutorService jobExecutor = Executors.newCachedThreadPool();
private BlockingQueue<IndexingTask<?>> taskQueue = new LinkedBlockingQueue<>(100);
private BlockingQueue<Runnable> producerQueue = new LinkedBlockingQueue<>(100);
private final AtomicReference<Stats> searchIndexStats = new AtomicReference<>();
private final AtomicReference<Integer> batchSize = new AtomicReference<>(5);

Expand Down Expand Up @@ -289,6 +291,7 @@ private void performReindex(JobExecutionContext jobExecutionContext) throws Inte
LOG.info("Starting reindexing with {} producers and {} consumers.", numProducers, numConsumers);

taskQueue = new LinkedBlockingQueue<>(jobData.getQueueSize());
producerQueue = new LinkedBlockingQueue<>(jobData.getQueueSize());
consumerExecutor =
new ThreadPoolExecutor(
numConsumers,
Expand All @@ -302,7 +305,7 @@ private void performReindex(JobExecutionContext jobExecutionContext) throws Inte
numProducers,
0L,
TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<>(jobData.getQueueSize()),
producerQueue,
new ThreadPoolExecutor.CallerRunsPolicy());

try {
Expand All @@ -321,38 +324,48 @@ private void processEntityReindex(JobExecutionContext jobExecutionContext)
throws InterruptedException {
int numConsumers = jobData.getConsumerThreads();
CountDownLatch producerLatch = new CountDownLatch(getTotalLatchCount(jobData.getEntities()));
jobExecutor.submit(() -> submitProducerTask(producerLatch));
jobExecutor.submit(() -> submitConsumerTask(jobExecutionContext));
submitProducerTask(producerLatch);
submitConsumerTask(jobExecutionContext);

producerLatch.await();
sendPoisonPills(numConsumers);
}

private void submitProducerTask(CountDownLatch producerLatch) {
for (String entityType : jobData.getEntities()) {
try {
reCreateIndexes(entityType);
int totalEntityRecords = getTotalEntityRecords(entityType);
Source<?> source = createSource(entityType);
int noOfThreads = calculateNumberOfThreads(totalEntityRecords);
if (totalEntityRecords > 0) {
for (int i = 0; i < noOfThreads; i++) {
int currentOffset = i * batchSize.get();
producerExecutor.submit(
() -> {
try {
processReadTask(entityType, source, currentOffset);
} catch (Exception e) {
LOG.error("Error processing entity type {}", entityType, e);
} finally {
producerLatch.countDown();
}
});
}
}
} catch (Exception e) {
LOG.error("Error processing entity type {}", entityType, e);
}
jobExecutor.submit(
() -> {
try {
reCreateIndexes(entityType);
int totalEntityRecords = getTotalEntityRecords(entityType);
Source<?> source = createSource(entityType);
int noOfThreads = calculateNumberOfThreads(totalEntityRecords);
if (totalEntityRecords > 0) {
for (int i = 0; i < noOfThreads; i++) {
LOG.debug(
"Submitting producer task current queue size: {}", producerQueue.size());
int currentOffset = i * batchSize.get();
producerExecutor.submit(
() -> {
try {
LOG.debug(
"Running Task for CurrentOffset: {}, Producer Latch Down, Current : {}",
currentOffset,
producerLatch.getCount());
processReadTask(entityType, source, currentOffset);
} catch (Exception e) {
LOG.error("Error processing entity type {}", entityType, e);
} finally {
LOG.debug("Producer Latch Down, Current : {}", producerLatch.getCount());
producerLatch.countDown();
}
});
}
}
} catch (Exception e) {
LOG.error("Error processing entity type {}", entityType, e);
}
});
}
}

Expand Down Expand Up @@ -572,6 +585,7 @@ private void reCreateIndexes(String entityType) throws SearchIndexException {
public void stopJob() {
LOG.info("Stopping reindexing job.");
stopped = true;
shutdownExecutor(jobExecutor, "JobExecutor", 60, TimeUnit.SECONDS);
shutdownExecutor(producerExecutor, "ProducerExecutor", 60, TimeUnit.SECONDS);
shutdownExecutor(consumerExecutor, "ConsumerExecutor", 60, TimeUnit.SECONDS);
}
Expand Down Expand Up @@ -679,12 +693,12 @@ private int getTotalEntityRecords(String entityType) {
private void processReadTask(String entityType, Source<?> source, int offset) {
try {
Object resultList = source.readWithCursor(RestUtil.encodeCursor(String.valueOf(offset)));
LOG.info("Read Entities with CurrentOffset: {}", offset);
if (resultList != null) {
ResultList<?> entities = extractEntities(entityType, resultList);
if (!nullOrEmpty(entities.getData())) {
LOG.info(
"Creating Indexing Task for entityType: {}, current offset: {}", entityType, offset);
createIndexingTask(entityType, entities, offset);
IndexingTask<?> task = new IndexingTask<>(entityType, entities, offset);
taskQueue.put(task);
}
}
} catch (InterruptedException e) {
Expand All @@ -708,13 +722,7 @@ private void processReadTask(String entityType, Source<?> source, int offset) {
}
}

private void createIndexingTask(String entityType, ResultList<?> entities, int offset)
throws InterruptedException {
IndexingTask<?> task = new IndexingTask<>(entityType, entities, offset);
taskQueue.put(task);
}

private synchronized int calculateNumberOfThreads(int totalEntityRecords) {
private int calculateNumberOfThreads(int totalEntityRecords) {
int mod = totalEntityRecords % batchSize.get();
if (mod == 0) {
return totalEntityRecords / batchSize.get();
Expand All @@ -732,7 +740,7 @@ private ResultList<?> extractEntities(String entityType, Object resultList) {
}
}

private synchronized int getRemainingRecordsToProcess(String entityType) {
private int getRemainingRecordsToProcess(String entityType) {
StepStats entityStats =
((StepStats)
searchIndexStats.get().getEntityStats().getAdditionalProperties().get(entityType));
Expand Down

0 comments on commit 560e28d

Please sign in to comment.