From bdc65cbc901ae059ef67da9a76cac870fd8c1ea1 Mon Sep 17 00:00:00 2001 From: Yang Wang Date: Thu, 13 Jun 2024 19:26:28 +1000 Subject: [PATCH] Update translog writeLocation for flushListener after commit (#109603) During flush, the shard can keep processing concurrent indexing requests which advance the translog location. If all changes are then commited by the indexWriter, we will update the `flushListener` with a more up-to-date translog location that is read after the commit. --- docs/changelog/109603.yaml | 5 ++ .../index/engine/InternalEngine.java | 8 ++ .../index/engine/InternalEngineTests.java | 74 +++++++++++++++++++ 3 files changed, 87 insertions(+) create mode 100644 docs/changelog/109603.yaml diff --git a/docs/changelog/109603.yaml b/docs/changelog/109603.yaml new file mode 100644 index 0000000000000..2d6e8b94aa8d0 --- /dev/null +++ b/docs/changelog/109603.yaml @@ -0,0 +1,5 @@ +pr: 109603 +summary: Update translog `writeLocation` for `flushListener` after commit +area: Engine +type: enhancement +issues: [] diff --git a/server/src/main/java/org/elasticsearch/index/engine/InternalEngine.java b/server/src/main/java/org/elasticsearch/index/engine/InternalEngine.java index 245cef2d97b24..be64365fedd34 100644 --- a/server/src/main/java/org/elasticsearch/index/engine/InternalEngine.java +++ b/server/src/main/java/org/elasticsearch/index/engine/InternalEngine.java @@ -2223,6 +2223,14 @@ protected void flushHoldingLock(boolean force, boolean waitIfOngoing, ActionList // we need to refresh in order to clear older version values refresh("version_table_flush", SearcherScope.INTERNAL, true); translog.trimUnreferencedReaders(); + // Update the translog location for flushListener if (1) the writeLocation has changed during the flush and + // (2) indexWriter has committed all the changes (checks must be done in this order). + // If the indexWriter has uncommitted changes, they will be flushed by the next flush as intended. + final Translog.Location writeLocationAfterFlush = translog.getLastWriteLocation(); + if (writeLocationAfterFlush.equals(commitLocation) == false && hasUncommittedChanges() == false) { + assert writeLocationAfterFlush.compareTo(commitLocation) > 0 : writeLocationAfterFlush + " <= " + commitLocation; + commitLocation = writeLocationAfterFlush; + } // Use the timestamp from when the flush started, but only update it in case of success, so that any exception in // the above lines would not lead the engine to think that it recently flushed, when it did not. this.lastFlushTimestamp = lastFlushTimestamp; diff --git a/server/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java b/server/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java index 00de132f9200e..d4ff35fee549e 100644 --- a/server/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java +++ b/server/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java @@ -7737,6 +7737,80 @@ public void testFlushListener() throws Exception { } } + public void testFlushListenerWithConcurrentIndexing() throws IOException, InterruptedException { + engine.close(); + final var barrierReference = new AtomicReference(); + engine = new InternalTestEngine(engine.config()) { + @Override + protected void commitIndexWriter(IndexWriter writer, Translog translog) throws IOException { + final CyclicBarrier barrier = barrierReference.get(); + if (barrier != null) { + safeAwait(barrier); + safeAwait(barrier); + } + super.commitIndexWriter(writer, translog); + if (barrier != null) { + safeAwait(barrier); + safeAwait(barrier); + } + } + }; + recoverFromTranslog(engine, translogHandler, Long.MAX_VALUE); + final var barrier = new CyclicBarrier(2); + barrierReference.set(barrier); + + // (1) Indexing the 1st doc before flush and it should be visible after flush + final Engine.IndexResult result1 = engine.index(indexForDoc(createParsedDoc(randomIdentifier(), null))); + final PlainActionFuture future1 = new PlainActionFuture<>(); + engine.addFlushListener(result1.getTranslogLocation(), future1); + assertFalse(future1.isDone()); + final Thread flushThread = new Thread(() -> engine.flush()); + flushThread.start(); + + // (2) Wait till flush thread block before commitIndexWriter and indexing the 2nd doc + safeAwait(barrier); + final Engine.IndexResult result2 = engine.index(indexForDoc(createParsedDoc(randomIdentifier(), null))); + final PlainActionFuture future2 = new PlainActionFuture<>(); + engine.addFlushListener(result2.getTranslogLocation(), future2); + assertFalse(future2.isDone()); + + // Let flush completes the commit + safeAwait(barrier); + safeAwait(barrier); + + // Randomly indexing the 3rd doc after commit. + final PlainActionFuture future3; + final boolean indexingAfterCommit = randomBoolean(); + if (indexingAfterCommit) { + final Engine.IndexResult result3 = engine.index(indexForDoc(createParsedDoc(randomIdentifier(), null))); + future3 = new PlainActionFuture<>(); + engine.addFlushListener(result3.getTranslogLocation(), future3); + assertFalse(future3.isDone()); + } else { + future3 = null; + } + safeAwait(barrier); + flushThread.join(); + + // The translog location before flush (1st doc) is always visible + assertThat(safeGet(future1), equalTo(engine.getLastCommittedSegmentInfos().getGeneration())); + + if (indexingAfterCommit) { + // Indexing after the commit makes indexWriter.hasUncommittedChanges() return true which in turn makes + // it unsafe to advance flushListener's commitLocation after commit. That is, the flushListener + // will not learn the translog location of the 2nd doc. + assertFalse(future2.isDone()); + // It requires a 2nd flush to make all translog locations to be visible + barrierReference.set(null); // remove the flush barrier + engine.flush(); + assertThat(safeGet(future2), equalTo(engine.getLastCommittedSegmentInfos().getGeneration())); + assertThat(safeGet(future3), equalTo(engine.getLastCommittedSegmentInfos().getGeneration())); + } else { + // If no indexing after commit, translog location of the 2nd doc should be visible. + assertThat(safeGet(future2), equalTo(engine.getLastCommittedSegmentInfos().getGeneration())); + } + } + private static void assertCommitGenerations(Map commits, List expectedGenerations) { assertCommitGenerations(commits.values().stream().map(Engine.IndexCommitRef::getIndexCommit).toList(), expectedGenerations); }