diff --git a/modules/core/src/main/java/org/apache/ignite/dump/DumpEntry.java b/modules/core/src/main/java/org/apache/ignite/dump/DumpEntry.java index ca34f3cbacea9..6fa8b3c58a062 100644 --- a/modules/core/src/main/java/org/apache/ignite/dump/DumpEntry.java +++ b/modules/core/src/main/java/org/apache/ignite/dump/DumpEntry.java @@ -17,7 +17,9 @@ package org.apache.ignite.dump; +import java.util.Collection; import java.util.Iterator; +import org.apache.ignite.cache.CacheEntryVersion; import org.apache.ignite.internal.processors.cache.persistence.snapshot.dump.Dump; import org.apache.ignite.lang.IgniteExperimental; @@ -26,7 +28,7 @@ * * @see Dump#iterator(String, int, int) * @see DumpConsumer#onPartition(int, int, Iterator) - * @see org.apache.ignite.IgniteSnapshot#createDump(String) + * @see org.apache.ignite.IgniteSnapshot#createDump(String, Collection) */ @IgniteExperimental public interface DumpEntry { @@ -36,6 +38,11 @@ public interface DumpEntry { /** @return Expiration time. */ public long expireTime(); + /** + * @return Version of the entry. + */ + public CacheEntryVersion version(); + /** @return Key. */ public Object key(); diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/SnapshotPartitionsVerifyHandler.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/SnapshotPartitionsVerifyHandler.java index 3847b19b3a911..71cafd158286d 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/SnapshotPartitionsVerifyHandler.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/SnapshotPartitionsVerifyHandler.java @@ -409,7 +409,7 @@ private PartitionHashRecordV2 calculateDumpedPartitionHash(Dump dump, String grp while (iter.hasNext()) { DumpEntry e = iter.next(); - ctx.update((KeyCacheObject)e.key(), (CacheObject)e.value(), null); + ctx.update((KeyCacheObject)e.key(), (CacheObject)e.value(), e.version()); size++; } diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/dump/CreateDumpFutureTask.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/dump/CreateDumpFutureTask.java index 11314e9d5e5ff..7e80a8dc64d3d 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/dump/CreateDumpFutureTask.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/dump/CreateDumpFutureTask.java @@ -488,7 +488,7 @@ else if (!changed.get(cache).add(key)) // Entry changed several time during dump else if (val == null) reasonToSkip = "newly created or already removed"; // Previous value is null. Entry created after dump start, skip. else { - write(cache, expireTime, key, val); + write(cache, expireTime, key, val, ver); changedCnt.increment(); } @@ -532,7 +532,7 @@ public boolean writeForIterator( else if (changed.get(cache).contains(key)) written = false; else - write(cache, expireTime, key, val); + write(cache, expireTime, key, val, ver); if (log.isTraceEnabled()) { log.trace("Iterator [" + @@ -540,17 +540,18 @@ else if (changed.get(cache).contains(key)) ", cache=" + cache + ", part=" + part + ", key=" + key + - ", written=" + written + ']'); + ", written=" + written + + ", ver=" + ver + ']'); } return written; } /** */ - private void write(int cache, long expireTime, KeyCacheObject key, CacheObject val) { + private void write(int cache, long expireTime, KeyCacheObject key, CacheObject val, GridCacheVersion ver) { synchronized (serializer) { // Prevent concurrent access to the dump file. try { - ByteBuffer buf = serializer.writeToBuffer(cache, expireTime, key, val, cctx.cacheObjectContext(cache)); + ByteBuffer buf = serializer.writeToBuffer(cache, expireTime, key, val, ver, cctx.cacheObjectContext(cache)); if (file.writeFully(buf) != buf.limit()) throw new IgniteException("Can't write row"); diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/dump/DumpEntrySerializer.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/dump/DumpEntrySerializer.java index 27146ec3644ef..4a1fd7f1c2910 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/dump/DumpEntrySerializer.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/dump/DumpEntrySerializer.java @@ -24,6 +24,7 @@ import org.apache.ignite.IgniteCheckedException; import org.apache.ignite.IgniteException; import org.apache.ignite.binary.BinaryObject; +import org.apache.ignite.cache.CacheEntryVersion; import org.apache.ignite.dump.DumpEntry; import org.apache.ignite.internal.GridKernalContext; import org.apache.ignite.internal.pagemem.wal.record.UnwrapDataEntry; @@ -32,6 +33,8 @@ import org.apache.ignite.internal.processors.cache.KeyCacheObject; import org.apache.ignite.internal.processors.cache.persistence.file.FileIO; import org.apache.ignite.internal.processors.cache.persistence.wal.crc.FastCrc; +import org.apache.ignite.internal.processors.cache.version.GridCacheVersion; +import org.apache.ignite.internal.processors.cache.version.GridCacheVersionEx; import org.apache.ignite.internal.processors.cacheobject.IgniteCacheObjectProcessor; /** @@ -88,11 +91,11 @@ public void raw(boolean raw) { /** * Dump entry structure: *
-     * +---------+-----------+----------+-----------------+-----+-------+
-     * | 4 bytes | 4 bytes   | 4 bytes  | 8 bytes         |     |       |
-     * +---------+-----------+----------+-----------------+-----+-------+
-     * | CRC     | Data size | cache ID | expiration time | key | value |
-     * +---------+-----------+----------+-----------------+-----+-------+
+     * +---------+-----------+----------+-----------------+-----+-------------+
+     * | 4 bytes | 4 bytes   | 4 bytes  | 8 bytes         |     |     |       |
+     * +---------+-----------+----------+-----------------+-----+-------------+
+     * | CRC     | Data size | cache ID | expiration time | ver | key | value |
+     * +---------+-----------+----------+-----------------+-----+-------------+
      * 
* * @param cache Cache id. @@ -108,11 +111,22 @@ public ByteBuffer writeToBuffer( long expireTime, KeyCacheObject key, CacheObject val, + GridCacheVersion ver, CacheObjectContext coCtx ) throws IgniteCheckedException { + int verSz = Integer.BYTES/*topVer*/ + Long.BYTES/*order*/ + Integer.BYTES/*nodeOrderDrId*/; + + boolean hasConflictVer = ver.otherClusterVersion() != null; + + if (hasConflictVer) + verSz *= 2 /*GridCacheVersion otherClusterVersion*/; + + assert ver.fieldsCount() == (hasConflictVer ? 4 : 3); + int keySz = key.valueBytesLength(coCtx); int valSz = val.valueBytesLength(coCtx); - int dataSz = /*cache ID*/Integer.BYTES + /*expire time*/Long.BYTES + /*key*/keySz + /*value*/valSz; + int dataSz = /*cache ID*/Integer.BYTES + /*expire time*/Long.BYTES + + /* hasConflictVersion */1 + /*version*/verSz + /*key*/keySz + /*value*/valSz; int fullSz = dataSz + /*extra bytes for row size*/Integer.BYTES + /*CRC*/Integer.BYTES; @@ -128,6 +142,19 @@ public ByteBuffer writeToBuffer( buf.putInt(cache); buf.putLong(expireTime); + buf.put((byte)(hasConflictVer ? 1 : 0)); + buf.putInt(ver.topologyVersion()); + buf.putLong(ver.order()); + buf.putInt(ver.nodeOrderAndDrIdRaw()); + + if (hasConflictVer) { + GridCacheVersion ver0 = (GridCacheVersion)ver.otherClusterVersion(); + + buf.putInt(ver0.topologyVersion()); + buf.putLong(ver0.order()); + buf.putInt(ver0.nodeOrderAndDrIdRaw()); + } + if (!key.putValue(buf)) throw new IgniteCheckedException("Can't write key"); @@ -195,6 +222,8 @@ public DumpEntry read(FileIO dumpFile, int grp, int part) throws IOException, Ig int cache = buf.getInt(); long expireTime = buf.getLong(); + GridCacheVersion ver = readVersion(buf); + int keySz = buf.getInt(); byte keyType = buf.get(); @@ -225,6 +254,10 @@ public DumpEntry read(FileIO dumpFile, int grp, int part) throws IOException, Ig return expireTime; } + @Override public CacheEntryVersion version() { + return ver; + } + @Override public Object key() { return raw ? key : UnwrapDataEntry.unwrapKey(key, keepBinary, fakeCacheObjCtx); } @@ -235,6 +268,24 @@ public DumpEntry read(FileIO dumpFile, int grp, int part) throws IOException, Ig }; } + /** @return Written entry version. */ + private static GridCacheVersion readVersion(ByteBuffer buf) { + boolean hasConflictVer = buf.get() == 1; + + int topVer = buf.getInt(); + long order = buf.getLong(); + int nodeOrderDrId = buf.getInt(); + + if (!hasConflictVer) + return new GridCacheVersion(topVer, nodeOrderDrId, order); + + int topVer0 = buf.getInt(); + long order0 = buf.getLong(); + int nodeOrderDrId0 = buf.getInt(); + + return new GridCacheVersionEx(topVer, nodeOrderDrId, order, new GridCacheVersion(topVer0, nodeOrderDrId0, order0)); + } + /** @return Thread local buffer. */ private ByteBuffer threadLocalBuffer() { return thLocBufs.computeIfAbsent(Thread.currentThread().getId(), DFLT_BUF_ALLOC); diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/verify/IdleVerifyUtility.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/verify/IdleVerifyUtility.java index 290dc0352e64a..e2d22e61a0d1e 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/verify/IdleVerifyUtility.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/verify/IdleVerifyUtility.java @@ -30,6 +30,7 @@ import java.util.function.BiConsumer; import org.apache.ignite.IgniteCheckedException; import org.apache.ignite.IgniteException; +import org.apache.ignite.cache.CacheEntryVersion; import org.apache.ignite.internal.IgniteEx; import org.apache.ignite.internal.binary.BinaryObjectEx; import org.apache.ignite.internal.management.cache.PartitionKeyV2; @@ -46,7 +47,6 @@ import org.apache.ignite.internal.processors.cache.persistence.CacheDataRow; import org.apache.ignite.internal.processors.cache.persistence.file.FilePageStore; import org.apache.ignite.internal.processors.cache.persistence.snapshot.IncrementalSnapshotVerificationTask.HashHolder; -import org.apache.ignite.internal.processors.cache.version.GridCacheVersion; import org.apache.ignite.internal.util.lang.GridIterator; import org.apache.ignite.internal.util.lang.IgniteThrowableSupplier; import org.apache.ignite.internal.util.typedef.F; @@ -392,7 +392,7 @@ public VerifyPartitionContext(HashHolder hash) { public void update( KeyCacheObject key, CacheObject val, - @Nullable GridCacheVersion ver + CacheEntryVersion ver ) throws IgniteCheckedException { partHash += key.hashCode(); diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/dump/AbstractCacheDumpTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/dump/AbstractCacheDumpTest.java index fdb5207b4a65d..a61a6c8b2d41e 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/dump/AbstractCacheDumpTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/dump/AbstractCacheDumpTest.java @@ -386,6 +386,8 @@ else if (!cacheName.equals(DEFAULT_CACHE_NAME)) DumpEntry e = iter.next(); assertNotNull(e); + assertNotNull(e.version()); + assertNull(e.version().otherClusterVersion()); if (e.cacheId() == CU.cacheId(CACHE_0)) assertEquals(USER_FACTORY.apply((Integer)e.key()), e.value()); @@ -430,6 +432,8 @@ protected void checkDefaultCacheEntry(DumpEntry e) { Integer key = (Integer)e.key(); assertEquals(key, e.value()); + assertNotNull(e.version()); + assertNull(e.version().otherClusterVersion()); } /** */ diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/dump/IgniteCacheDumpSelf2Test.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/dump/IgniteCacheDumpSelf2Test.java index 429db9f2bd4a9..ff3b959073b8a 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/dump/IgniteCacheDumpSelf2Test.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/dump/IgniteCacheDumpSelf2Test.java @@ -24,38 +24,67 @@ import java.nio.file.Files; import java.nio.file.Paths; import java.util.Arrays; +import java.util.HashMap; +import java.util.Iterator; import java.util.List; import java.util.Map; +import java.util.Objects; import java.util.Set; +import java.util.regex.Matcher; +import java.util.regex.Pattern; import java.util.stream.Collectors; import java.util.stream.IntStream; import org.apache.ignite.IgniteCache; import org.apache.ignite.IgniteException; import org.apache.ignite.cache.CacheAtomicityMode; +import org.apache.ignite.cache.CacheEntryVersion; import org.apache.ignite.cache.affinity.rendezvous.RendezvousAffinityFunction; import org.apache.ignite.cluster.ClusterState; import org.apache.ignite.configuration.CacheConfiguration; import org.apache.ignite.configuration.IgniteConfiguration; +import org.apache.ignite.dump.DumpEntry; +import org.apache.ignite.dump.DumpReader; +import org.apache.ignite.dump.DumpReaderConfiguration; import org.apache.ignite.internal.IgniteEx; import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion; +import org.apache.ignite.internal.processors.cache.CacheConflictResolutionManager; +import org.apache.ignite.internal.processors.cache.CacheObject; +import org.apache.ignite.internal.processors.cache.CacheObjectImpl; +import org.apache.ignite.internal.processors.cache.CacheObjectValueContext; import org.apache.ignite.internal.processors.cache.GridCacheAdapter; import org.apache.ignite.internal.processors.cache.GridCacheEntryEx; +import org.apache.ignite.internal.processors.cache.GridCacheManagerAdapter; import org.apache.ignite.internal.processors.cache.GridCacheOperation; import org.apache.ignite.internal.processors.cache.IgniteInternalCache; +import org.apache.ignite.internal.processors.cache.KeyCacheObject; +import org.apache.ignite.internal.processors.cache.KeyCacheObjectImpl; +import org.apache.ignite.internal.processors.cache.dr.GridCacheDrInfo; +import org.apache.ignite.internal.processors.cache.persistence.snapshot.dump.AbstractCacheDumpTest.TestDumpConsumer; +import org.apache.ignite.internal.processors.cache.version.CacheVersionConflictResolver; +import org.apache.ignite.internal.processors.cache.version.GridCacheVersion; +import org.apache.ignite.internal.processors.cache.version.GridCacheVersionConflictContext; import org.apache.ignite.internal.processors.cache.version.GridCacheVersionManager; +import org.apache.ignite.internal.processors.cache.version.GridCacheVersionedEntryEx; import org.apache.ignite.internal.processors.cacheobject.UserCacheObjectImpl; import org.apache.ignite.internal.processors.dr.GridDrType; import org.apache.ignite.internal.util.typedef.internal.CU; import org.apache.ignite.internal.util.typedef.internal.U; import org.apache.ignite.platform.model.User; +import org.apache.ignite.plugin.AbstractCachePluginProvider; +import org.apache.ignite.plugin.AbstractTestPluginProvider; +import org.apache.ignite.plugin.CachePluginContext; +import org.apache.ignite.plugin.CachePluginProvider; import org.apache.ignite.testframework.ListeningTestLogger; import org.apache.ignite.testframework.LogListener; import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest; +import org.jetbrains.annotations.Nullable; import org.junit.Test; import static java.nio.file.StandardOpenOption.READ; import static java.nio.file.StandardOpenOption.WRITE; import static org.apache.ignite.configuration.IgniteConfiguration.DFLT_SNAPSHOT_DIRECTORY; +import static org.apache.ignite.dump.DumpReaderConfiguration.DFLT_THREAD_CNT; +import static org.apache.ignite.dump.DumpReaderConfiguration.DFLT_TIMEOUT; import static org.apache.ignite.internal.processors.cache.persistence.file.FilePageStoreManager.CACHE_DATA_FILENAME; import static org.apache.ignite.internal.processors.cache.persistence.file.FilePageStoreManager.CACHE_DIR_PREFIX; import static org.apache.ignite.internal.processors.cache.persistence.file.FilePageStoreManager.PART_FILE_PREFIX; @@ -305,13 +334,41 @@ public void testCheckFailOnCorruptedData() throws Exception { ign.snapshot().createDump(DMP_NAME, null).get(); + String out = invokeCheckCommand(ign, DMP_NAME); + assertContains( null, - invokeCheckCommand(ign, DMP_NAME), + out, "Conflict partition: PartitionKeyV2 [grpId=" + CU.cacheId(DEFAULT_CACHE_NAME) + ", grpName=" + DEFAULT_CACHE_NAME + ", partId=" + corruptedPart + "]" ); + + String verPattern = "partVerHash=(-)?[0-9]+"; + String hashPattern = "partHash=(-)?[0-9]+"; + + Matcher m = Pattern.compile(verPattern).matcher(out); + + assertTrue(m.find()); + String ver0 = out.substring(m.start(), m.end()); + + assertTrue(m.find()); + String ver1 = out.substring(m.start(), m.end()); + + assertFalse(m.find()); + + m = Pattern.compile(hashPattern).matcher(out); + + assertTrue(m.find()); + String hash0 = out.substring(m.start(), m.end()); + + assertTrue(m.find()); + String hash1 = out.substring(m.start(), m.end()); + + assertFalse(m.find()); + + assertFalse(Objects.equals(ver0, ver1)); + assertFalse(Objects.equals(hash0, hash1)); } /** */ @@ -465,4 +522,115 @@ public void testCompareRawWithCompressedCacheDumps() throws Exception { ) ); } + + /** */ + @Test + public void testDumpEntryConflictVersion() throws Exception { + IgniteConfiguration cfg = getConfiguration("test").setPluginProviders(new AbstractTestPluginProvider() { + @Override public String name() { + return "ConflictResolverProvider"; + } + + @Override public CachePluginProvider createCacheProvider(CachePluginContext ctx) { + if (!ctx.igniteCacheConfiguration().getName().equals(DEFAULT_CACHE_NAME)) + return null; + + return new AbstractCachePluginProvider() { + @Override public @Nullable Object createComponent(Class cls) { + if (cls != CacheConflictResolutionManager.class) + return null; + + return new TestCacheConflictResolutionManager(); + } + }; + } + }); + + IgniteEx ign = startGrid(cfg); + + IgniteCache cache = ign.createCache(new CacheConfiguration() + .setName(DEFAULT_CACHE_NAME) + .setAffinity(new RendezvousAffinityFunction().setPartitions(3)) + ); + + int topVer = 42; + int dataCenterId = 31; + int nodeOrder = 13; + + Map drMap = new HashMap<>(); + + IgniteInternalCache intCache = ign.cachex(cache.getName()); + + for (int i = 0; i < KEYS_CNT; i++) { + KeyCacheObject key = new KeyCacheObjectImpl(i, null, intCache.affinity().partition(i)); + CacheObject val = new CacheObjectImpl(i, null); + + val.prepareMarshal(intCache.context().cacheObjectContext()); + + drMap.put(key, new GridCacheDrInfo(val, new GridCacheVersion(topVer, i, nodeOrder, dataCenterId))); + } + + intCache.putAllConflict(drMap); + + ign.snapshot().createDump(DMP_NAME, null).get(getTestTimeout()); + + TestDumpConsumer cnsmr = new TestDumpConsumer() { + @Override public void onPartition(int grp, int part, Iterator data) { + data.forEachRemaining(e -> { + int key = (int)e.key(); + int val = (int)e.value(); + + assertNotNull(e.version()); + + CacheEntryVersion conflictVer = e.version().otherClusterVersion(); + + assertNotNull(conflictVer); + assertEquals(topVer, conflictVer.topologyVersion()); + assertEquals(nodeOrder, conflictVer.nodeOrder()); + assertEquals(dataCenterId, conflictVer.clusterId()); + assertEquals(key, val); + assertEquals(key, conflictVer.order()); + }); + } + }; + + new DumpReader( + new DumpReaderConfiguration( + dumpDirectory(ign, DMP_NAME), + cnsmr, + DFLT_THREAD_CNT, + DFLT_TIMEOUT, + true, + false, + null, + false + ), + log + ).run(); + + cnsmr.check(); + } + + /** */ + public class TestCacheConflictResolutionManager extends GridCacheManagerAdapter + implements CacheConflictResolutionManager { + + /** {@inheritDoc} */ + @Override public CacheVersionConflictResolver conflictResolver() { + return new CacheVersionConflictResolver() { + @Override public GridCacheVersionConflictContext resolve( + CacheObjectValueContext ctx, + GridCacheVersionedEntryEx oldEntry, + GridCacheVersionedEntryEx newEntry, + boolean atomicVerComparator + ) { + GridCacheVersionConflictContext res = new GridCacheVersionConflictContext<>(ctx, oldEntry, newEntry); + + res.useNew(); + + return res; + } + }; + } + } }