Skip to content

Commit

Permalink
IGNITE-20429 Code review fixes.
Browse files Browse the repository at this point in the history
  • Loading branch information
nizhikov committed Sep 25, 2023
1 parent 9badcdb commit 0eac15a
Show file tree
Hide file tree
Showing 5 changed files with 76 additions and 28 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -316,15 +316,35 @@ private Map<PartitionKeyV2, PartitionHashRecordV2> checkDumpFiles(
SnapshotHandlerContext opCtx,
Set<File> partFiles
) throws IgniteCheckedException {
Dump dump = new Dump(cctx.kernalContext(), opCtx.snapshotDirectory());

Collection<PartitionHashRecordV2> partitionHashRecordV2s = U.doInParallel(
cctx.snapshotMgr().snapshotExecutorService(),
partFiles,
part -> caclucateDumpedPartitionHash(dump, cacheGroupName(part.getParentFile()), partId(part.getName()))
GridKernalContext snpCtx = cctx.snapshotMgr().createStandaloneKernalContext(
cctx.kernalContext().compress(),
opCtx.snapshotDirectory(),
opCtx.metadata().folderName()
);

return partitionHashRecordV2s.stream().collect(Collectors.toMap(PartitionHashRecordV2::partitionKey, r -> r));
for (GridComponent comp : snpCtx)
comp.start();

try {
Dump dump = new Dump(snpCtx, opCtx.snapshotDirectory());

Collection<PartitionHashRecordV2> partitionHashRecordV2s = U.doInParallel(
cctx.snapshotMgr().snapshotExecutorService(),
partFiles,
part -> caclucateDumpedPartitionHash(dump, cacheGroupName(part.getParentFile()), partId(part.getName()))
);

return partitionHashRecordV2s.stream().collect(Collectors.toMap(PartitionHashRecordV2::partitionKey, r -> r));
}
catch (Throwable t) {
log.error("Error executing handler: ", t);

throw t;
}
finally {
for (GridComponent comp : snpCtx)
comp.stop(true);
}
}

/** */
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,7 @@ public class Dump {
private final ConcurrentMap<Long, ByteBuffer> thLocBufs = new ConcurrentHashMap<>();

/**
* @param cctx Kernal context.
* @param dumpDir Dump directory.
*/
public Dump(GridKernalContext cctx, File dumpDir) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,19 +19,16 @@

import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.Objects;
import java.util.concurrent.ConcurrentMap;
import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.IgniteException;
import org.apache.ignite.internal.GridKernalContext;
import org.apache.ignite.internal.processors.cache.CacheObject;
import org.apache.ignite.internal.processors.cache.CacheObjectContext;
import org.apache.ignite.internal.processors.cache.GridCacheContext;
import org.apache.ignite.internal.processors.cache.KeyCacheObject;
import org.apache.ignite.internal.processors.cache.persistence.file.FileIO;
import org.apache.ignite.internal.processors.cache.persistence.wal.crc.FastCrc;
import org.apache.ignite.internal.processors.cacheobject.IgniteCacheObjectProcessor;
import org.jetbrains.annotations.Nullable;

/**
* Serialization logic for dump.
Expand All @@ -46,12 +43,12 @@ public class DumpEntrySerializer {
/** */
private final FastCrc crc = new FastCrc();

/** Kernal context. */
private @Nullable GridKernalContext cctx;

/** Cache object processor. */
private IgniteCacheObjectProcessor co;

/** Fake context. */
private CacheObjectContext fakeCacheObjCtx;

/**
* @param thLocBufs Thread local buffers.
*/
Expand All @@ -61,8 +58,8 @@ public DumpEntrySerializer(ConcurrentMap<Long, ByteBuffer> thLocBufs) {

/** */
public void kernalContext(GridKernalContext cctx) {
this.cctx = cctx;
co = cctx.cacheObjects();
fakeCacheObjCtx = new CacheObjectContext(cctx, null, null, false, false, false, false, false);
}

/**
Expand Down Expand Up @@ -135,7 +132,7 @@ public ByteBuffer writeToBuffer(
* @return dump entry.
*/
public DumpEntry read(FileIO dumpFile, int grp, int part) throws IOException, IgniteCheckedException {
assert cctx != null : "Set kernalContext first";
assert co != null : "Set kernalContext first";

ByteBuffer buf = threadLocalBuffer();

Expand Down Expand Up @@ -183,14 +180,7 @@ public DumpEntry read(FileIO dumpFile, int grp, int part) throws IOException, Ig

buf.get(keyBytes, 0, keyBytes.length);

GridCacheContext<?, ?> cacheCtx = Objects.requireNonNull(
cctx.cache().cacheGroup(grp).shared().cacheContext(cache),
"Can't find cache context!"
);

CacheObjectContext coCtx = Objects.requireNonNull(cacheCtx.cacheObjectContext(), "Can't find cache object context!");

KeyCacheObject key = co.toKeyCacheObject(coCtx, keyType, keyBytes);
KeyCacheObject key = co.toKeyCacheObject(fakeCacheObjCtx, keyType, keyBytes);

if (key.partition() == -1)
key.partition(part);
Expand All @@ -201,7 +191,7 @@ public DumpEntry read(FileIO dumpFile, int grp, int part) throws IOException, Ig

buf.get(valBytes, 0, valBytes.length);

CacheObject val = co.toCacheObject(coCtx, valType, valBytes);
CacheObject val = co.toCacheObject(fakeCacheObjCtx, valType, valBytes);

return new DumpEntry() {
@Override public int cacheId() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -88,7 +88,7 @@ public abstract class AbstractCacheDumpTest extends GridCommonAbstractTest {
public static final String DMP_NAME = "dump";

/** */
protected static final IntFunction<User> USER_FACTORY = i ->
static final IntFunction<User> USER_FACTORY = i ->
new User(i, ACL.values()[Math.abs(i) % ACL.values().length], new Role("Role" + i, SUPER));

/** */
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@
import org.apache.ignite.internal.processors.cacheobject.UserCacheObjectImpl;
import org.apache.ignite.internal.processors.dr.GridDrType;
import org.apache.ignite.internal.util.typedef.internal.CU;
import org.apache.ignite.platform.model.User;
import org.apache.ignite.testframework.ListeningTestLogger;
import org.apache.ignite.testframework.LogListener;
import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
Expand All @@ -58,6 +59,7 @@
import static org.apache.ignite.internal.processors.cache.persistence.snapshot.IgniteSnapshotManager.DUMP_LOCK;
import static org.apache.ignite.internal.processors.cache.persistence.snapshot.dump.AbstractCacheDumpTest.DMP_NAME;
import static org.apache.ignite.internal.processors.cache.persistence.snapshot.dump.AbstractCacheDumpTest.KEYS_CNT;
import static org.apache.ignite.internal.processors.cache.persistence.snapshot.dump.AbstractCacheDumpTest.USER_FACTORY;
import static org.apache.ignite.internal.processors.cache.persistence.snapshot.dump.AbstractCacheDumpTest.dump;
import static org.apache.ignite.internal.processors.cache.persistence.snapshot.dump.AbstractCacheDumpTest.invokeCheckCommand;
import static org.apache.ignite.internal.processors.cache.persistence.snapshot.dump.CreateDumpFutureTask.DUMP_FILE_EXT;
Expand Down Expand Up @@ -234,8 +236,8 @@ public void testCheckFailOnCorruptedData() throws Exception {

IntStream.range(0, KEYS_CNT).forEach(i -> cache.put(i, i));

int correuptedPart = 1;
int corruptedKey = partitionKeys(cache, correuptedPart, 1, 0).get(0);
int corruptedPart = 1;
int corruptedKey = partitionKeys(cache, corruptedPart, 1, 0).get(0);

cache.put(corruptedKey, corruptedKey);

Expand Down Expand Up @@ -285,7 +287,42 @@ public void testCheckFailOnCorruptedData() throws Exception {
invokeCheckCommand(ign, DMP_NAME),
"Conflict partition: PartitionKeyV2 [grpId=" + CU.cacheId(DEFAULT_CACHE_NAME) +
", grpName=" + DEFAULT_CACHE_NAME +
", partId=" + correuptedPart + "]"
", partId=" + corruptedPart + "]"
);
}

/** */
@Test
public void testCheckOnEmptyNode() throws Exception {
String id = "test";

IgniteEx ign = startGrid(getConfiguration(id).setConsistentId(id));

IgniteCache<Integer, Integer> cache = ign.createCache(new CacheConfiguration<Integer, Integer>()
.setName(DEFAULT_CACHE_NAME)
.setBackups(1)
.setAtomicityMode(CacheAtomicityMode.ATOMIC));

IgniteCache<Integer, User> cache2 = ign.createCache(new CacheConfiguration<Integer, User>()
.setName("users")
.setBackups(1)
.setAtomicityMode(CacheAtomicityMode.ATOMIC));

IntStream.range(0, KEYS_CNT).forEach(i -> {
cache.put(i, i);
cache2.put(i, USER_FACTORY.apply(i));
});

ign.snapshot().createDump(DMP_NAME).get();

assertEquals("The check procedure has finished, no conflicts have been found.\n\n", invokeCheckCommand(ign, DMP_NAME));

stopAllGrids();

cleanPersistenceDir(true);

ign = startGrid(getConfiguration(id).setConsistentId(id));

assertEquals("The check procedure has finished, no conflicts have been found.\n\n", invokeCheckCommand(ign, DMP_NAME));
}
}

0 comments on commit 0eac15a

Please sign in to comment.