Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

queue: Allow stuck entries (same node) to be reaped. See #169 #170

Merged
merged 2 commits into from
Apr 12, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
20 changes: 4 additions & 16 deletions queue/src/main/java/org/killbill/queue/DBBackedQueue.java
Original file line number Diff line number Diff line change
Expand Up @@ -282,39 +282,27 @@ public Void inTransaction(final QueueSqlDao<T> transactional, final TransactionS
return null;
}

final Collection<T> entriesToMove = new ArrayList<T>(entriesLeftBehind.size());
final List<T> entriesToReInsert = new ArrayList<T>(entriesLeftBehind.size());
final List<T> stuckEntries = new LinkedList<T>();
final List<T> lateEntries = new LinkedList<T>();
for (final T entryLeftBehind : entriesLeftBehind) {
// entryIsBeingProcessedByThisNode is a sign of a stuck entry on this node
final boolean entryIsBeingProcessedByThisNode = owner.equals(entryLeftBehind.getProcessingOwner());
// entryCreatedByThisNodeAndNeverProcessed is likely a sign of the queue being late
final boolean entryCreatedByThisNodeAndNeverProcessed = owner.equals(entryLeftBehind.getCreatingOwner()) && entryLeftBehind.getProcessingOwner() == null;
if (entryIsBeingProcessedByThisNode) {
// See https://github.com/killbill/killbill-commons/issues/47
stuckEntries.add(entryLeftBehind);
} else if (entryCreatedByThisNodeAndNeverProcessed) {
if (entryCreatedByThisNodeAndNeverProcessed) {
lateEntries.add(entryLeftBehind);
} else {
// Fields will be reset appropriately in insertReapedEntriesFromTransaction
entriesToReInsert.add(entryLeftBehind);

} else { /* This includes entryIsBeingProcessedByThisNode (owner.equals(entryLeftBehind.getProcessingOwner())). See https://github.com/killbill/killbill-commons/issues/169 */
// Set the status to REAPED in the history table
entryLeftBehind.setProcessingState(PersistentQueueEntryLifecycleState.REAPED);
entriesToMove.add(entryLeftBehind);
entriesToReInsert.add(entryLeftBehind);
}
}

if (!stuckEntries.isEmpty()) {
log.warn("{} reapEntries: stuck queue entries {}", DB_QUEUE_LOG_ID, stuckEntries);
}
if (!lateEntries.isEmpty()) {
log.warn("{} reapEntries: late queue entries {}", DB_QUEUE_LOG_ID, lateEntries);
}

if (!entriesToReInsert.isEmpty()) {
moveEntriesToHistoryFromTransaction(transactional, entriesToMove);
moveEntriesToHistoryFromTransaction(transactional, entriesToReInsert);
insertReapedEntriesFromTransaction(transactional, entriesToReInsert, now);
log.warn("{} reapEntries: {} entries were reaped by {} {}",
DB_QUEUE_LOG_ID,
Expand Down
3 changes: 3 additions & 0 deletions queue/src/test/java/org/killbill/TestSetup.java
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,9 @@ public class TestSetup {

private static final String TEST_DB_PROPERTY_PREFIX = "org.killbill.billing.dbi.test.";

protected static final Long SEARCH_KEY_1 = 1L;
protected static final Long SEARCH_KEY_2 = 2L;

protected EmbeddedDB embeddedDB;

protected DBI dbi;
Expand Down
16 changes: 8 additions & 8 deletions queue/src/test/java/org/killbill/queue/TestReaper.java
Original file line number Diff line number Diff line change
Expand Up @@ -42,8 +42,6 @@

public class TestReaper extends TestSetup {

private static final Long SEARCH_KEY_1 = 1L;
private static final Long SEARCH_KEY_2 = 2L;

private DBBackedQueue<BusEventModelDao> queue;
private PersistentBusSqlDao sqlDao;
Expand Down Expand Up @@ -102,26 +100,28 @@ public void testReapEntries() {
queue.reapEntries(now.minus(config.getReapThreshold().getMillis()).toDate());

final List<BusEventModelDao> readyEntriesAfterReaping = sqlDao.getReadyEntries(now.toDate(), 10, CreatorName.get(), config.getTableName());
assertEquals(readyEntriesAfterReaping.size(), 2);
assertEquals(readyEntriesAfterReaping.size(), 3);
assertEquals(readyEntriesAfterReaping.get(0).getRecordId(), (Long) 1L);
assertTrue(readyEntriesAfterReaping.get(1).getRecordId() > (Long) 6L);
assertTrue(readyEntriesAfterReaping.get(2).getRecordId() > (Long) 6L);

final List<BusEventModelDao> readyOrInProcessingAfterReaping = Iterators.toUnmodifiableList(sqlDao.getReadyOrInProcessingQueueEntriesForSearchKeys(SEARCH_KEY_1, SEARCH_KEY_2, config.getTableName()));
assertEquals(readyOrInProcessingAfterReaping.size(), 6);
assertEquals(readyOrInProcessingAfterReaping.get(0).getRecordId(), (Long) 1L);
assertEquals(readyOrInProcessingAfterReaping.get(1).getRecordId(), (Long) 2L);
assertEquals(readyOrInProcessingAfterReaping.get(2).getRecordId(), (Long) 3L);
assertEquals(readyOrInProcessingAfterReaping.get(3).getRecordId(), (Long) 4L);
// That stuck entry hasn't moved (https://github.com/killbill/killbill-commons/issues/47)
assertEquals(readyOrInProcessingAfterReaping.get(4).getRecordId(), (Long) 5L);
// New (reaped) one
// New (reaped) ones
assertTrue(readyOrInProcessingAfterReaping.get(4).getRecordId() > (Long) 6L);
assertTrue(readyOrInProcessingAfterReaping.get(5).getRecordId() > (Long) 6L);

// Check history table
final List<BusEventModelDao> historicalQueueEntries = Iterators.toUnmodifiableList(sqlDao.getHistoricalQueueEntriesForSearchKeys(SEARCH_KEY_1, SEARCH_KEY_2, config.getHistoryTableName()));
assertEquals(historicalQueueEntries.size(), 1);
assertEquals(historicalQueueEntries.size(), 2);
assertEquals(historicalQueueEntries.get(0).getProcessingState(), PersistentQueueEntryLifecycleState.REAPED);
assertEquals(historicalQueueEntries.get(0).getUserToken(), readyOrInProcessingAfterReaping.get(5).getUserToken());
assertEquals(historicalQueueEntries.get(0).getUserToken(), readyOrInProcessingAfterReaping.get(4).getUserToken());
assertEquals(historicalQueueEntries.get(1).getProcessingState(), PersistentQueueEntryLifecycleState.REAPED);
assertEquals(historicalQueueEntries.get(1).getUserToken(), readyOrInProcessingAfterReaping.get(5).getUserToken());
}

private BusEventModelDao createEntry(final long recordId,
Expand Down
20 changes: 15 additions & 5 deletions queue/src/test/java/org/killbill/queue/TestReaperIntegration.java
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
import org.killbill.TestSetup;
import org.killbill.bus.DefaultPersistentBus;
import org.killbill.bus.api.BusEvent;
import org.killbill.bus.api.BusEventWithMetadata;
import org.killbill.bus.api.PersistentBus.EventBusException;
import org.killbill.bus.api.PersistentBusConfig;
import org.killbill.bus.dao.BusEventModelDao;
Expand All @@ -39,6 +40,7 @@
import org.killbill.commons.eventbus.Subscribe;
import org.killbill.queue.api.PersistentQueueEntryLifecycleState;
import org.skife.config.TimeSpan;
import org.testng.Assert;
import org.testng.annotations.AfterMethod;
import org.testng.annotations.BeforeClass;
import org.testng.annotations.BeforeMethod;
Expand Down Expand Up @@ -164,9 +166,17 @@ public void testWithStuckEntryProcessedByThisNode() throws EventBusException, Js

// Trigger reaper
clock.addDeltaFromReality(config.getReapThreshold().getMillis());
// It's a no-op though (it won't reap itself)
handler.ensureNotSeen(event2);
handler.assertSeenEvents(2);
// Give a chance to the reaper to run
Thread.sleep(500);

clock.addDeltaFromReality(1000);
handler.waitFor(event2);

// See https://github.com/killbill/killbill-commons/issues/169
handler.assertSeenEvents(3);
final Iterable<BusEventWithMetadata<BusEvent>> result = bus.getHistoricalBusEventsForSearchKey2(now, SEARCH_KEY_2);
final long nbItems = result.spliterator().getExactSizeIfKnown();
assertEquals(nbItems, 4);
}


Expand Down Expand Up @@ -278,8 +288,8 @@ public DummyEvent(@JsonProperty("name") final String name,

public DummyEvent() {
this(UUID.randomUUID().toString(),
System.currentTimeMillis(),
System.currentTimeMillis(),
SEARCH_KEY_1,
SEARCH_KEY_2,
UUID.randomUUID());
}

Expand Down
Loading