Skip to content

Commit

Permalink
[Transaction] Fix cursor readPosition is bigger than maxPosition in O…
Browse files Browse the repository at this point in the history
…pReadEntry (#14667)

Fix cursor read op dead loop.
1. in OpReadEntry we can't use cursor readPosition, because it is not the OpReadEntry readPosition, it is cursor readPosition when one ledger is empty the OpReadEntry readPosition is not equals cursor readPosition, we should use OpReadEntry readPosition to judge the OpReadEntry can be finished.
2. when readPosition is bigger than maxPosition in OpReadEntry, we should complete this OpReadEntry
add test

(cherry picked from commit a242f03)
  • Loading branch information
congbobo184 authored and codelipenghui committed Mar 14, 2022
1 parent 65af6ce commit 5258f12
Show file tree
Hide file tree
Showing 2 changed files with 44 additions and 4 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -134,8 +134,9 @@ void updateReadPosition(Position newReadPosition) {
}

void checkReadCompletion() {
if (entries.size() < count && cursor.hasMoreEntries() &&
((PositionImpl) cursor.getReadPosition()).compareTo(maxPosition) < 0) {
// op readPosition is smaller or equals maxPosition then can read again
if (entries.size() < count && cursor.hasMoreEntries()
&& maxPosition.compareTo(readPosition) > 0) {
// We still have more entries to read from the next ledger, schedule a new async operation
if (nextReadPosition.getLedgerId() != readPosition.getLedgerId()) {
cursor.ledger.startReadOperationOnLedger(nextReadPosition, OpReadEntry.this);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,6 @@
import static org.testng.Assert.assertNull;
import static org.testng.Assert.assertTrue;
import static org.testng.Assert.fail;

import com.google.common.base.Charsets;
import com.google.common.collect.Lists;
import com.google.common.collect.Range;
Expand Down Expand Up @@ -59,7 +58,6 @@
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import java.util.stream.Collectors;

import io.netty.buffer.ByteBuf;
import io.netty.buffer.ByteBufAllocator;
import lombok.Cleanup;
Expand Down Expand Up @@ -3710,5 +3708,46 @@ public void testCursorNoRolloverIfNoMetadataSession() throws Exception {
assertNotEquals(cursor.getCursorLedger(), initialLedgerId);
}

@Test
public void testReadEmptyEntryList() throws Exception {
ManagedLedgerConfig managedLedgerConfig = new ManagedLedgerConfig();
managedLedgerConfig.setMaxEntriesPerLedger(1);
managedLedgerConfig.setMetadataMaxEntriesPerLedger(1);
managedLedgerConfig.setMinimumRolloverTime(0, TimeUnit.MILLISECONDS);
ManagedLedgerImpl ledger = (ManagedLedgerImpl) factory
.open("testReadEmptyEntryList", managedLedgerConfig);
ManagedCursorImpl cursor = (ManagedCursorImpl) ledger.openCursor("test");

PositionImpl lastPosition = (PositionImpl) ledger.addEntry("test".getBytes(Encoding));
ledger.rollCurrentLedgerIfFull();

AtomicBoolean flag = new AtomicBoolean();
flag.set(false);
ReadEntriesCallback callback = new ReadEntriesCallback() {
@Override
public void readEntriesComplete(List<Entry> entries, Object ctx) {
if (entries.size() == 0) {
flag.set(true);
}
}

@Override
public void readEntriesFailed(ManagedLedgerException exception, Object ctx) {

}
};

// op readPosition is bigger than maxReadPosition
OpReadEntry opReadEntry = OpReadEntry.create(cursor, ledger.lastConfirmedEntry, 10, callback,
null, PositionImpl.get(lastPosition.getLedgerId(), -1));
Field field = ManagedCursorImpl.class.getDeclaredField("readPosition");
field.setAccessible(true);
field.set(cursor, PositionImpl.earliest);
ledger.asyncReadEntries(opReadEntry);

// when readPosition is bigger than maxReadPosition, should complete the opReadEntry
Awaitility.await().untilAsserted(() -> assertTrue(flag.get()));
}

private static final Logger log = LoggerFactory.getLogger(ManagedCursorTest.class);
}

0 comments on commit 5258f12

Please sign in to comment.