Skip to content

Commit

Permalink
Update translog writeLocation for flushListener after commit (#109603)
Browse files Browse the repository at this point in the history
During flush, the shard can keep processing concurrent indexing requests
which advance the translog location. If all changes are then commited by
the indexWriter, we will update the `flushListener` with a more
up-to-date translog location that is read after the commit.
  • Loading branch information
ywangd committed Jun 13, 2024
1 parent d58da2c commit bdc65cb
Show file tree
Hide file tree
Showing 3 changed files with 87 additions and 0 deletions.
5 changes: 5 additions & 0 deletions docs/changelog/109603.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
pr: 109603
summary: Update translog `writeLocation` for `flushListener` after commit
area: Engine
type: enhancement
issues: []
Original file line number Diff line number Diff line change
Expand Up @@ -2223,6 +2223,14 @@ protected void flushHoldingLock(boolean force, boolean waitIfOngoing, ActionList
// we need to refresh in order to clear older version values
refresh("version_table_flush", SearcherScope.INTERNAL, true);
translog.trimUnreferencedReaders();
// Update the translog location for flushListener if (1) the writeLocation has changed during the flush and
// (2) indexWriter has committed all the changes (checks must be done in this order).
// If the indexWriter has uncommitted changes, they will be flushed by the next flush as intended.
final Translog.Location writeLocationAfterFlush = translog.getLastWriteLocation();
if (writeLocationAfterFlush.equals(commitLocation) == false && hasUncommittedChanges() == false) {
assert writeLocationAfterFlush.compareTo(commitLocation) > 0 : writeLocationAfterFlush + " <= " + commitLocation;
commitLocation = writeLocationAfterFlush;
}
// Use the timestamp from when the flush started, but only update it in case of success, so that any exception in
// the above lines would not lead the engine to think that it recently flushed, when it did not.
this.lastFlushTimestamp = lastFlushTimestamp;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7737,6 +7737,80 @@ public void testFlushListener() throws Exception {
}
}

public void testFlushListenerWithConcurrentIndexing() throws IOException, InterruptedException {
engine.close();
final var barrierReference = new AtomicReference<CyclicBarrier>();
engine = new InternalTestEngine(engine.config()) {
@Override
protected void commitIndexWriter(IndexWriter writer, Translog translog) throws IOException {
final CyclicBarrier barrier = barrierReference.get();
if (barrier != null) {
safeAwait(barrier);
safeAwait(barrier);
}
super.commitIndexWriter(writer, translog);
if (barrier != null) {
safeAwait(barrier);
safeAwait(barrier);
}
}
};
recoverFromTranslog(engine, translogHandler, Long.MAX_VALUE);
final var barrier = new CyclicBarrier(2);
barrierReference.set(barrier);

// (1) Indexing the 1st doc before flush and it should be visible after flush
final Engine.IndexResult result1 = engine.index(indexForDoc(createParsedDoc(randomIdentifier(), null)));
final PlainActionFuture<Long> future1 = new PlainActionFuture<>();
engine.addFlushListener(result1.getTranslogLocation(), future1);
assertFalse(future1.isDone());
final Thread flushThread = new Thread(() -> engine.flush());
flushThread.start();

// (2) Wait till flush thread block before commitIndexWriter and indexing the 2nd doc
safeAwait(barrier);
final Engine.IndexResult result2 = engine.index(indexForDoc(createParsedDoc(randomIdentifier(), null)));
final PlainActionFuture<Long> future2 = new PlainActionFuture<>();
engine.addFlushListener(result2.getTranslogLocation(), future2);
assertFalse(future2.isDone());

// Let flush completes the commit
safeAwait(barrier);
safeAwait(barrier);

// Randomly indexing the 3rd doc after commit.
final PlainActionFuture<Long> future3;
final boolean indexingAfterCommit = randomBoolean();
if (indexingAfterCommit) {
final Engine.IndexResult result3 = engine.index(indexForDoc(createParsedDoc(randomIdentifier(), null)));
future3 = new PlainActionFuture<>();
engine.addFlushListener(result3.getTranslogLocation(), future3);
assertFalse(future3.isDone());
} else {
future3 = null;
}
safeAwait(barrier);
flushThread.join();

// The translog location before flush (1st doc) is always visible
assertThat(safeGet(future1), equalTo(engine.getLastCommittedSegmentInfos().getGeneration()));

if (indexingAfterCommit) {
// Indexing after the commit makes indexWriter.hasUncommittedChanges() return true which in turn makes
// it unsafe to advance flushListener's commitLocation after commit. That is, the flushListener
// will not learn the translog location of the 2nd doc.
assertFalse(future2.isDone());
// It requires a 2nd flush to make all translog locations to be visible
barrierReference.set(null); // remove the flush barrier
engine.flush();
assertThat(safeGet(future2), equalTo(engine.getLastCommittedSegmentInfos().getGeneration()));
assertThat(safeGet(future3), equalTo(engine.getLastCommittedSegmentInfos().getGeneration()));
} else {
// If no indexing after commit, translog location of the 2nd doc should be visible.
assertThat(safeGet(future2), equalTo(engine.getLastCommittedSegmentInfos().getGeneration()));
}
}

private static void assertCommitGenerations(Map<IndexCommit, Engine.IndexCommitRef> commits, List<Long> expectedGenerations) {
assertCommitGenerations(commits.values().stream().map(Engine.IndexCommitRef::getIndexCommit).toList(), expectedGenerations);
}
Expand Down

0 comments on commit bdc65cb

Please sign in to comment.