IGNITE-20462 Fix idle_verify hash conflicts for expiring entries #10947

Closed · wants to merge 4 commits
Changes from 3 commits
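
This patch makes partition verification tolerant to entries with an expiry policy: rows with a non-zero expire time are skipped when partition hashes are calculated, partitions whose pending tree is non-empty are flagged via a new hasExpiringEntries property on PartitionHashRecordV2, and partition size comparison is suppressed for flagged partitions, since TTL cleanup can fire at different moments on different nodes.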
File: control utility test (idle_verify)
@@ -34,6 +34,7 @@
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Random;
import java.util.Set;
import java.util.TreeMap;
import java.util.TreeSet;
@@ -52,6 +53,9 @@
import java.util.regex.Pattern;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import javax.cache.expiry.CreatedExpiryPolicy;
import javax.cache.expiry.Duration;
import javax.cache.expiry.ExpiryPolicy;
import javax.cache.processor.EntryProcessor;
import javax.cache.processor.EntryProcessorException;
import javax.cache.processor.MutableEntry;
@@ -2393,6 +2397,49 @@ public void testCacheIdleVerifyMovingParts() throws Exception {
assertContains(log, testOut.toString(), "MOVING partitions");
}

/**
* @throws Exception If failed.
*/
@Test
public void testCacheIdleVerifyExpiringEntries() throws Exception {
IgniteEx ignite = startGrids(3);

ignite.cluster().state(ACTIVE);

IgniteCache<Object, Object> cache = ignite.createCache(new CacheConfiguration<>(DEFAULT_CACHE_NAME)
.setAffinity(new RendezvousAffinityFunction(false, 32))
.setBackups(1));

Random rnd = new Random();

// Put without expiry policy.
for (int i = 0; i < 5_000; i++)
cache.put(i, i);

// Put with expiry policy.
for (int i = 5_000; i < 10_000; i++) {
ExpiryPolicy expPol = new CreatedExpiryPolicy(new Duration(TimeUnit.MILLISECONDS, rnd.nextInt(1_000)));
cache.withExpiryPolicy(expPol).put(i, i);
}

injectTestSystemOut();

assertEquals(EXIT_CODE_OK, execute("--cache", "idle_verify"));

assertContains(log, testOut.toString(), "no conflicts have been found");
}
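
To see why expiring entries used to break idle_verify, consider two copies of the same partition where TTL cleanup has already purged an entry on one node but not yet on the other. The sketch below is a hypothetical, self-contained illustration using plain maps rather than Ignite's partition API:

import java.util.HashMap;
import java.util.Map;

class IdleVerifyExpiryRace {
    /** Sums key hash codes, mimicking the order-independent key hash idle_verify computes per partition. */
    static int partitionHash(Map<Integer, Integer> partition) {
        int hash = 0;

        for (Integer key : partition.keySet())
            hash += key.hashCode();

        return hash;
    }

    public static void main(String[] args) {
        Map<Integer, Integer> primary = new HashMap<>();
        primary.put(1, 1);
        primary.put(2, 2); // Imagine this entry was written with a short TTL.

        Map<Integer, Integer> backup = new HashMap<>(primary);

        // TTL cleanup already purged the expiring entry on the backup, but not yet on the primary.
        backup.remove(2);

        // The hashes diverge, so a verification pass that includes expiring entries
        // reports a conflict even though the two copies are logically consistent.
        System.out.println(partitionHash(primary) == partitionHash(backup)); // prints "false"
    }
}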

/** Checks that the idle_verify dump file contains the given value. */
private void assertDumpContains(String val) throws IOException {
Matcher fileNameMatcher = dumpFileNameMatcher();

assertTrue(fileNameMatcher.find());

String dumpContent = new String(Files.readAllBytes(Paths.get(fileNameMatcher.group(1))));

assertContains(log, dumpContent, val);
}

/** */
@Test
public void testCacheSequence() throws Exception {
File: SnapshotPartitionsQuickVerifyHandler.java
@@ -89,7 +89,8 @@ public SnapshotPartitionsQuickVerifyHandler(GridCacheSharedContext<?, ?> cctx) {
if (other == null)
return;

- if (val.size() != other.size() || !Objects.equals(val.updateCounter(), other.updateCounter()))
+ if ((!val.hasExpiringEntries() && !other.hasExpiringEntries() && val.size() != other.size())
+     || !Objects.equals(val.updateCounter(), other.updateCounter()))
wrnGrps.add(part.groupId());
}));

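The reworked condition reads: report a warning for a size mismatch only when neither partition copy has entries waiting to expire (update counter mismatches are still always reported). A minimal sketch of the size part of the predicate, with illustrative names rather than the handler's actual API:

class SizeCheck {
    /** Returns {@code true} if a size difference between two copies of a partition is suspicious. */
    static boolean sizeMismatchSuspicious(long sizeA, boolean expiringA, long sizeB, boolean expiringB) {
        // A size difference only matters when neither copy has entries scheduled to expire;
        // otherwise TTL cleanup may have legitimately removed entries on one node first.
        return !expiringA && !expiringB && sizeA != sizeB;
    }
}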
File: SnapshotPartitionsVerifyHandler.java
@@ -45,12 +45,15 @@
import org.apache.ignite.internal.managers.encryption.EncryptionCacheKeyProvider;
import org.apache.ignite.internal.managers.encryption.GroupKey;
import org.apache.ignite.internal.managers.encryption.GroupKeyEncrypted;
import org.apache.ignite.internal.pagemem.store.PageStore;
import org.apache.ignite.internal.processors.cache.GridCacheSharedContext;
import org.apache.ignite.internal.processors.cache.StoredCacheData;
import org.apache.ignite.internal.processors.cache.distributed.dht.topology.GridDhtPartitionState;
import org.apache.ignite.internal.processors.cache.persistence.file.FilePageStore;
import org.apache.ignite.internal.processors.cache.persistence.file.FilePageStoreManager;
import org.apache.ignite.internal.processors.cache.persistence.metastorage.MetaStorage;
import org.apache.ignite.internal.processors.cache.persistence.tree.io.BPlusIO;
import org.apache.ignite.internal.processors.cache.persistence.tree.io.BPlusMetaIO;
import org.apache.ignite.internal.processors.cache.persistence.tree.io.PageIO;
import org.apache.ignite.internal.processors.cache.persistence.tree.io.PagePartitionMetaIO;
import org.apache.ignite.internal.processors.cache.verify.PartitionHashRecordV2;
@@ -102,7 +105,7 @@ public SnapshotPartitionsVerifyHandler(GridCacheSharedContext<?, ?> cctx) {
/** {@inheritDoc} */
@Override public Map<PartitionKeyV2, PartitionHashRecordV2> invoke(SnapshotHandlerContext opCtx) throws IgniteCheckedException {
if (!opCtx.snapshotDirectory().exists())
- throw new IgniteCheckedException("Snapshot directory doesn't exists: " + opCtx.snapshotDirectory());;
+ throw new IgniteCheckedException("Snapshot directory doesn't exists: " + opCtx.snapshotDirectory());

SnapshotMetadata meta = opCtx.metadata();

@@ -271,6 +274,13 @@ public SnapshotPartitionsVerifyHandler(GridCacheSharedContext<?, ?> cctx) {

assert hash != null : "OWNING must have hash: " + key;

// If we skip hash calculation, we don't iterate over all entries and can't calculate the real
// partition size (the size can include entries to expire, which may have already expired on other
// nodes by the moment of snapshot creation), so the size comparison has to be skipped if entries
// to expire exist.
if (hasExpiringEntries(snpCtx, pageStore, pageBuff, io.getPendingTreeRoot(pageAddr)))
hash.hasExpiringEntries(true);

res.put(key, hash);
}
catch (IOException e) {
@@ -294,6 +304,40 @@ public SnapshotPartitionsVerifyHandler(GridCacheSharedContext<?, ?> cctx) {
return res;
}

/** @return {@code True} if the partition contains entries to expire, i.e. its pending tree is not empty. */
private boolean hasExpiringEntries(
GridKernalContext ctx,
PageStore pageStore,
ByteBuffer pageBuff,
long pendingTreeMetaId
) throws IgniteCheckedException {
// Page id 0 means the partition has no pending tree at all, hence nothing to expire.
if (pendingTreeMetaId == 0)
return false;

long pageAddr = GridUnsafe.bufferAddress(pageBuff);

// Read the pending tree meta page to locate the tree root.
pageBuff.clear();
pageStore.read(pendingTreeMetaId, pageBuff, true);

if (PageIO.getCompressionType(pageBuff) != CompressionProcessor.UNCOMPRESSED_PAGE)
ctx.compress().decompressPage(pageBuff, pageStore.getPageSize());

BPlusMetaIO treeIO = BPlusMetaIO.VERSIONS.forPage(pageAddr);

int rootLvl = treeIO.getRootLevel(pageAddr);
long rootId = treeIO.getFirstPageId(pageAddr, rootLvl);

// Read the root page: a non-zero item count means at least one entry is scheduled to expire.
pageBuff.clear();
pageStore.read(rootId, pageBuff, true);

if (PageIO.getCompressionType(pageBuff) != CompressionProcessor.UNCOMPRESSED_PAGE)
ctx.compress().decompressPage(pageBuff, pageStore.getPageSize());

BPlusIO<?> rootIO = PageIO.getPageIO(pageBuff);

return rootIO.getCount(pageAddr) != 0;
}
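
The check relies on the fact that Ignite tracks entries with expiry in a per-partition pending tree (a B+Tree ordered by expire time), so an empty tree root is a cheap way to prove the partition has nothing to expire without iterating its data pages.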

/** {@inheritDoc} */
@Override public void complete(String name,
Collection<SnapshotHandlerResult<Map<PartitionKeyV2, PartitionHashRecordV2>>> results) throws IgniteCheckedException {
File: idle_verify partition hash calculation
@@ -304,6 +304,9 @@ public static List<Integer> compareUpdateCounters(
while (it.hasNextX()) {
CacheDataRow row = it.nextX();

if (row.expireTime() > 0)
continue;

partHash += row.key().hashCode();
partVerHash += row.version().hashCode(); // Detects ABA problem.

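With this change the partition hash is an order-independent sum over non-expiring rows only, so two copies of a partition stay comparable regardless of how far TTL cleanup has progressed on each node. A hypothetical, stripped-down version of the loop (Row is a stand-in type, not Ignite's CacheDataRow API):

import java.util.List;

class PartitionHashSketch {
    /** Minimal stand-in for a cache data row. */
    static class Row {
        final Object key;
        final Object ver;
        final long expireTime;

        Row(Object key, Object ver, long expireTime) {
            this.key = key;
            this.ver = ver;
            this.expireTime = expireTime;
        }
    }

    /** Returns {partHash, partVerHash} computed over non-expiring rows only. */
    static int[] hashPartition(List<Row> rows) {
        int partHash = 0;
        int partVerHash = 0;

        for (Row row : rows) {
            // Entries with a TTL are excluded: whether they are still present differs
            // across nodes depending on how far TTL cleanup has progressed.
            if (row.expireTime > 0)
                continue;

            partHash += row.key.hashCode();
            partVerHash += row.ver.hashCode(); // Version sum detects the ABA problem.
        }

        return new int[] {partHash, partVerHash};
    }
}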
File: PartitionHashRecordV2.java
@@ -95,6 +95,10 @@ public class PartitionHashRecordV2 extends VisorDataTransferObject {
@GridToStringExclude
private int regKeys;

/** If partition has entries to expire. */
@GridToStringExclude
private boolean hasExpiringEntries;

/**
* @param partKey Partition key.
* @param isPrimary Is primary.
@@ -219,6 +223,16 @@ public int regularKeys() {
return regKeys;
}

/** @return {@code True} if the partition has entries to expire. */
public boolean hasExpiringEntries() {
return hasExpiringEntries;
}

/** @param hasExpiringEntries {@code True} if the partition has entries to expire. */
public void hasExpiringEntries(boolean hasExpiringEntries) {
this.hasExpiringEntries = hasExpiringEntries;
}

/** {@inheritDoc} */
@Override protected void writeExternalData(ObjectOutput out) throws IOException {
out.writeObject(partKey);
@@ -233,6 +247,7 @@ public int regularKeys() {
out.writeInt(noCfKeys);
out.writeInt(binKeys);
out.writeInt(regKeys);
out.writeBoolean(hasExpiringEntries);
}

/** {@inheritDoc} */
@@ -255,6 +270,7 @@ public int regularKeys() {
noCfKeys = in.readInt();
binKeys = in.readInt();
regKeys = in.readInt();
hasExpiringEntries = in.readBoolean();
}

/** {@inheritDoc} */
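Note that the new flag is appended after every existing field in both writeExternalData and readExternalData, so the externalized field order stays symmetric between writer and reader.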
File: snapshot check test
@@ -36,7 +36,11 @@
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import javax.cache.expiry.CreatedExpiryPolicy;
import javax.cache.expiry.Duration;
import org.apache.ignite.IgniteCache;
import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.IgniteException;
import org.apache.ignite.cache.affinity.rendezvous.RendezvousAffinityFunction;
@@ -600,6 +604,32 @@ public void testClusterSnapshotCheckMultipleTimes() throws Exception {
assertTrue("Threads created: " + createdThreads, createdThreads < iterations);
}

/** Checks that snapshot verification reports no conflicts for a cache with expiring entries. */
@Test
public void testClusterSnapshotCheckWithExpiring() throws Exception {
IgniteEx ignite = startGrids(3);

ignite.cluster().state(ACTIVE);

IgniteCache<Object, Object> cache = ignite.getOrCreateCache(new CacheConfiguration<>("expCache")
.setAffinity(new RendezvousAffinityFunction(false, 32)).setBackups(1));

Random rnd = new Random();

for (int i = 0; i < 10_000; i++) {
cache.withExpiryPolicy(new CreatedExpiryPolicy(new Duration(TimeUnit.MILLISECONDS,
rnd.nextInt(10_000)))).put(i, i);
}

long timeout = getTestTimeout();

snp(ignite).createSnapshot(SNAPSHOT_NAME).get(timeout);

SnapshotPartitionsVerifyTaskResult res = snp(ignite).checkSnapshot(SNAPSHOT_NAME, null).get(timeout);

assertFalse(res.idleVerifyResult().hasConflicts());
}

/**
* @param cls Class of running task.
* @param results Results of compute.