Skip to content

Commit

Permalink
IGNITE-20917 Add entry version to DumpEntry (#11069)
Browse files Browse the repository at this point in the history
(cherry picked from commit b7a8f02)
  • Loading branch information
nizhikov committed Nov 27, 2023
1 parent ef7fc63 commit 0a200d5
Show file tree
Hide file tree
Showing 7 changed files with 247 additions and 16 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,9 @@

package org.apache.ignite.dump;

import java.util.Collection;
import java.util.Iterator;
import org.apache.ignite.cache.CacheEntryVersion;
import org.apache.ignite.internal.processors.cache.persistence.snapshot.dump.Dump;
import org.apache.ignite.lang.IgniteExperimental;

Expand All @@ -26,7 +28,7 @@
*
* @see Dump#iterator(String, int, int)
* @see DumpConsumer#onPartition(int, int, Iterator)
* @see org.apache.ignite.IgniteSnapshot#createDump(String)
* @see org.apache.ignite.IgniteSnapshot#createDump(String, Collection)
*/
@IgniteExperimental
public interface DumpEntry {
Expand All @@ -36,6 +38,11 @@ public interface DumpEntry {
/** @return Expiration time. */
public long expireTime();

/**
* @return Version of the entry.
*/
public CacheEntryVersion version();

/** @return Key. */
public Object key();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -409,7 +409,7 @@ private PartitionHashRecordV2 calculateDumpedPartitionHash(Dump dump, String grp
while (iter.hasNext()) {
DumpEntry e = iter.next();

ctx.update((KeyCacheObject)e.key(), (CacheObject)e.value(), null);
ctx.update((KeyCacheObject)e.key(), (CacheObject)e.value(), e.version());

size++;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -488,7 +488,7 @@ else if (!changed.get(cache).add(key)) // Entry changed several time during dump
else if (val == null)
reasonToSkip = "newly created or already removed"; // Previous value is null. Entry created after dump start, skip.
else {
write(cache, expireTime, key, val);
write(cache, expireTime, key, val, ver);

changedCnt.increment();
}
Expand Down Expand Up @@ -532,25 +532,26 @@ public boolean writeForIterator(
else if (changed.get(cache).contains(key))
written = false;
else
write(cache, expireTime, key, val);
write(cache, expireTime, key, val, ver);

if (log.isTraceEnabled()) {
log.trace("Iterator [" +
"grp=" + grp +
", cache=" + cache +
", part=" + part +
", key=" + key +
", written=" + written + ']');
", written=" + written +
", ver=" + ver + ']');
}

return written;
}

/** */
private void write(int cache, long expireTime, KeyCacheObject key, CacheObject val) {
private void write(int cache, long expireTime, KeyCacheObject key, CacheObject val, GridCacheVersion ver) {
synchronized (serializer) { // Prevent concurrent access to the dump file.
try {
ByteBuffer buf = serializer.writeToBuffer(cache, expireTime, key, val, cctx.cacheObjectContext(cache));
ByteBuffer buf = serializer.writeToBuffer(cache, expireTime, key, val, ver, cctx.cacheObjectContext(cache));

if (file.writeFully(buf) != buf.limit())
throw new IgniteException("Can't write row");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.IgniteException;
import org.apache.ignite.binary.BinaryObject;
import org.apache.ignite.cache.CacheEntryVersion;
import org.apache.ignite.dump.DumpEntry;
import org.apache.ignite.internal.GridKernalContext;
import org.apache.ignite.internal.pagemem.wal.record.UnwrapDataEntry;
Expand All @@ -32,6 +33,8 @@
import org.apache.ignite.internal.processors.cache.KeyCacheObject;
import org.apache.ignite.internal.processors.cache.persistence.file.FileIO;
import org.apache.ignite.internal.processors.cache.persistence.wal.crc.FastCrc;
import org.apache.ignite.internal.processors.cache.version.GridCacheVersion;
import org.apache.ignite.internal.processors.cache.version.GridCacheVersionEx;
import org.apache.ignite.internal.processors.cacheobject.IgniteCacheObjectProcessor;

/**
Expand Down Expand Up @@ -88,11 +91,11 @@ public void raw(boolean raw) {
/**
* Dump entry structure:
* <pre>
* +---------+-----------+----------+-----------------+-----+-------+
* | 4 bytes | 4 bytes | 4 bytes | 8 bytes | | |
* +---------+-----------+----------+-----------------+-----+-------+
* | CRC | Data size | cache ID | expiration time | key | value |
* +---------+-----------+----------+-----------------+-----+-------+
* +---------+-----------+----------+-----------------+-----+-------------+
* | 4 bytes | 4 bytes | 4 bytes | 8 bytes | | | |
* +---------+-----------+----------+-----------------+-----+-------------+
* | CRC | Data size | cache ID | expiration time | ver | key | value |
* +---------+-----------+----------+-----------------+-----+-------------+
* </pre>
*
* @param cache Cache id.
Expand All @@ -108,11 +111,22 @@ public ByteBuffer writeToBuffer(
long expireTime,
KeyCacheObject key,
CacheObject val,
GridCacheVersion ver,
CacheObjectContext coCtx
) throws IgniteCheckedException {
int verSz = Integer.BYTES/*topVer*/ + Long.BYTES/*order*/ + Integer.BYTES/*nodeOrderDrId*/;

boolean hasConflictVer = ver.otherClusterVersion() != null;

if (hasConflictVer)
verSz *= 2 /*GridCacheVersion otherClusterVersion*/;

assert ver.fieldsCount() == (hasConflictVer ? 4 : 3);

int keySz = key.valueBytesLength(coCtx);
int valSz = val.valueBytesLength(coCtx);
int dataSz = /*cache ID*/Integer.BYTES + /*expire time*/Long.BYTES + /*key*/keySz + /*value*/valSz;
int dataSz = /*cache ID*/Integer.BYTES + /*expire time*/Long.BYTES
+ /* hasConflictVersion */1 + /*version*/verSz + /*key*/keySz + /*value*/valSz;

int fullSz = dataSz + /*extra bytes for row size*/Integer.BYTES + /*CRC*/Integer.BYTES;

Expand All @@ -128,6 +142,19 @@ public ByteBuffer writeToBuffer(
buf.putInt(cache);
buf.putLong(expireTime);

buf.put((byte)(hasConflictVer ? 1 : 0));
buf.putInt(ver.topologyVersion());
buf.putLong(ver.order());
buf.putInt(ver.nodeOrderAndDrIdRaw());

if (hasConflictVer) {
GridCacheVersion ver0 = (GridCacheVersion)ver.otherClusterVersion();

buf.putInt(ver0.topologyVersion());
buf.putLong(ver0.order());
buf.putInt(ver0.nodeOrderAndDrIdRaw());
}

if (!key.putValue(buf))
throw new IgniteCheckedException("Can't write key");

Expand Down Expand Up @@ -195,6 +222,8 @@ public DumpEntry read(FileIO dumpFile, int grp, int part) throws IOException, Ig
int cache = buf.getInt();
long expireTime = buf.getLong();

GridCacheVersion ver = readVersion(buf);

int keySz = buf.getInt();

byte keyType = buf.get();
Expand Down Expand Up @@ -225,6 +254,10 @@ public DumpEntry read(FileIO dumpFile, int grp, int part) throws IOException, Ig
return expireTime;
}

@Override public CacheEntryVersion version() {
return ver;
}

@Override public Object key() {
return raw ? key : UnwrapDataEntry.unwrapKey(key, keepBinary, fakeCacheObjCtx);
}
Expand All @@ -235,6 +268,24 @@ public DumpEntry read(FileIO dumpFile, int grp, int part) throws IOException, Ig
};
}

/** @return Written entry version. */
private static GridCacheVersion readVersion(ByteBuffer buf) {
boolean hasConflictVer = buf.get() == 1;

int topVer = buf.getInt();
long order = buf.getLong();
int nodeOrderDrId = buf.getInt();

if (!hasConflictVer)
return new GridCacheVersion(topVer, nodeOrderDrId, order);

int topVer0 = buf.getInt();
long order0 = buf.getLong();
int nodeOrderDrId0 = buf.getInt();

return new GridCacheVersionEx(topVer, nodeOrderDrId, order, new GridCacheVersion(topVer0, nodeOrderDrId0, order0));
}

/** @return Thread local buffer. */
private ByteBuffer threadLocalBuffer() {
return thLocBufs.computeIfAbsent(Thread.currentThread().getId(), DFLT_BUF_ALLOC);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
import java.util.function.BiConsumer;
import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.IgniteException;
import org.apache.ignite.cache.CacheEntryVersion;
import org.apache.ignite.internal.IgniteEx;
import org.apache.ignite.internal.binary.BinaryObjectEx;
import org.apache.ignite.internal.management.cache.PartitionKeyV2;
Expand All @@ -46,7 +47,6 @@
import org.apache.ignite.internal.processors.cache.persistence.CacheDataRow;
import org.apache.ignite.internal.processors.cache.persistence.file.FilePageStore;
import org.apache.ignite.internal.processors.cache.persistence.snapshot.IncrementalSnapshotVerificationTask.HashHolder;
import org.apache.ignite.internal.processors.cache.version.GridCacheVersion;
import org.apache.ignite.internal.util.lang.GridIterator;
import org.apache.ignite.internal.util.lang.IgniteThrowableSupplier;
import org.apache.ignite.internal.util.typedef.F;
Expand Down Expand Up @@ -392,7 +392,7 @@ public VerifyPartitionContext(HashHolder hash) {
public void update(
KeyCacheObject key,
CacheObject val,
@Nullable GridCacheVersion ver
CacheEntryVersion ver
) throws IgniteCheckedException {
partHash += key.hashCode();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -386,6 +386,8 @@ else if (!cacheName.equals(DEFAULT_CACHE_NAME))
DumpEntry e = iter.next();

assertNotNull(e);
assertNotNull(e.version());
assertNull(e.version().otherClusterVersion());

if (e.cacheId() == CU.cacheId(CACHE_0))
assertEquals(USER_FACTORY.apply((Integer)e.key()), e.value());
Expand Down Expand Up @@ -430,6 +432,8 @@ protected void checkDefaultCacheEntry(DumpEntry e) {
Integer key = (Integer)e.key();

assertEquals(key, e.value());
assertNotNull(e.version());
assertNull(e.version().otherClusterVersion());
}

/** */
Expand Down
Loading

0 comments on commit 0a200d5

Please sign in to comment.