Skip to content

Commit

Permalink
IGNITE-20429 Dump check command implemented (#10949)
Browse files Browse the repository at this point in the history
  • Loading branch information
nizhikov authored Sep 26, 2023
1 parent f5c7f73 commit 2b7e078
Show file tree
Hide file tree
Showing 11 changed files with 528 additions and 122 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@
import org.apache.ignite.internal.processors.cache.GridCacheOperation;
import org.apache.ignite.internal.processors.cache.GridLocalConfigManager;
import org.apache.ignite.internal.processors.cache.StoredCacheData;
import org.apache.ignite.internal.processors.cache.verify.IdleVerifyUtility.VerifyPartitionContext;
import org.apache.ignite.internal.processors.cache.verify.PartitionHashRecordV2;
import org.apache.ignite.internal.processors.cache.verify.TransactionsHashRecord;
import org.apache.ignite.internal.processors.cache.version.GridCacheVersion;
Expand Down Expand Up @@ -374,15 +375,10 @@ else if (txRec.state() == TransactionState.ROLLED_BACK) {
e.getKey(),
false,
consId,
e.getValue().hash,
e.getValue().verHash,
null,
0,
null,
0,
0,
0,
0
new VerifyPartitionContext(e.getValue())
)
));

Expand Down Expand Up @@ -453,12 +449,12 @@ private Map<Integer, StoredCacheData> readTxCachesData() throws IgniteCheckedExc
}

/** Holder for calculated hashes. */
private static class HashHolder {
public static class HashHolder {
/** */
private int hash;
public int hash;

/** */
private int verHash;
public int verHash;

/** */
public void increment(int hash, int verHash) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,8 +51,11 @@
import org.apache.ignite.internal.processors.cache.persistence.file.FilePageStore;
import org.apache.ignite.internal.processors.cache.persistence.file.FilePageStoreManager;
import org.apache.ignite.internal.processors.cache.persistence.metastorage.MetaStorage;
import org.apache.ignite.internal.processors.cache.persistence.snapshot.dump.Dump;
import org.apache.ignite.internal.processors.cache.persistence.snapshot.dump.DumpEntry;
import org.apache.ignite.internal.processors.cache.persistence.tree.io.PageIO;
import org.apache.ignite.internal.processors.cache.persistence.tree.io.PagePartitionMetaIO;
import org.apache.ignite.internal.processors.cache.verify.IdleVerifyUtility.VerifyPartitionContext;
import org.apache.ignite.internal.processors.cache.verify.PartitionHashRecordV2;
import org.apache.ignite.internal.processors.compress.CompressionProcessor;
import org.apache.ignite.internal.util.GridStringBuilder;
Expand Down Expand Up @@ -167,7 +170,7 @@ public SnapshotPartitionsVerifyHandler(GridCacheSharedContext<?, ?> cctx) {
return Collections.emptyMap();
}

return meta.dump() ? checkDumpFiles() : checkSnapshotFiles(opCtx, grpDirs, meta, partFiles, punchHoleEnabled);
return meta.dump() ? checkDumpFiles(opCtx, partFiles) : checkSnapshotFiles(opCtx, grpDirs, meta, partFiles, punchHoleEnabled);
}

/** */
Expand Down Expand Up @@ -309,8 +312,85 @@ private Map<PartitionKeyV2, PartitionHashRecordV2> checkSnapshotFiles(
}

/** */
private Map<PartitionKeyV2, PartitionHashRecordV2> checkDumpFiles() {
return Collections.emptyMap();
/**
 * Verifies a cache dump: computes a hash record for every dumped partition file.
 *
 * @param opCtx Snapshot handler context (supplies dump directory and metadata).
 * @param partFiles Dumped partition files to verify.
 * @return Map from partition key to its calculated hash record.
 * @throws IgniteCheckedException If verification fails.
 */
private Map<PartitionKeyV2, PartitionHashRecordV2> checkDumpFiles(
    SnapshotHandlerContext opCtx,
    Set<File> partFiles
) throws IgniteCheckedException {
    // Standalone kernal context is required to read dump files outside of a regular node lifecycle.
    GridKernalContext snpCtx = cctx.snapshotMgr().createStandaloneKernalContext(
        cctx.kernalContext().compress(),
        opCtx.snapshotDirectory(),
        opCtx.metadata().folderName()
    );

    // NOTE(review): if a component fails to start here, previously started ones are not stopped —
    // confirm whether partial start-up needs a compensating stop.
    for (GridComponent comp : snpCtx)
        comp.start();

    try {
        Dump dump = new Dump(snpCtx, opCtx.snapshotDirectory());

        // Hash every partition in parallel on the snapshot executor.
        Collection<PartitionHashRecordV2> hashes = U.doInParallel(
            cctx.snapshotMgr().snapshotExecutorService(),
            partFiles,
            part -> caclucateDumpedPartitionHash(dump, cacheGroupName(part.getParentFile()), partId(part.getName()))
        );

        return hashes.stream().collect(Collectors.toMap(PartitionHashRecordV2::partitionKey, r -> r));
    }
    catch (Throwable t) {
        log.error("Error executing handler: ", t);

        throw t;
    }
    finally {
        // Always release standalone context resources, even on failure.
        for (GridComponent comp : snpCtx)
            comp.stop(true);
    }
}

/** */
/**
 * Calculates the hash record of a single dumped partition.
 * <p>
 * When {@link #skipHash()} is {@code true} only an empty record (size 0, empty hash context)
 * is produced; otherwise every dump entry of the partition is folded into the hash context.
 *
 * @param dump Dump to read the partition from.
 * @param grpName Cache group name.
 * @param part Partition id.
 * @return Hash record for the partition (state is always reported as {@code OWNING}).
 * @throws IgniteException If reading or iterating the dump fails.
 */
private PartitionHashRecordV2 calculateDumpedPartitionHash(Dump dump, String grpName, int part) {
    if (skipHash()) {
        // Hash calculation disabled: report the partition with a zero size and an empty context.
        return new PartitionHashRecordV2(
            new PartitionKeyV2(CU.cacheId(grpName), part, grpName),
            false,
            cctx.localNode().consistentId(),
            null,
            0,
            PartitionHashRecordV2.PartitionState.OWNING,
            new VerifyPartitionContext()
        );
    }

    try {
        String node = cctx.kernalContext().pdsFolderResolver().resolveFolders().folderName();

        try (Dump.DumpedPartitionIterator iter = dump.iterator(node, CU.cacheId(grpName), part)) {
            long size = 0;

            VerifyPartitionContext ctx = new VerifyPartitionContext();

            while (iter.hasNext()) {
                DumpEntry e = iter.next();

                // NOTE(review): 'ctx' is passed both as receiver and as the last argument —
                // confirm against the VerifyPartitionContext#update signature.
                ctx.update(e.key(), e.value(), null, ctx);

                size++;
            }

            return new PartitionHashRecordV2(
                new PartitionKeyV2(CU.cacheId(grpName), part, grpName),
                false,
                cctx.localNode().consistentId(),
                null,
                size,
                PartitionHashRecordV2.PartitionState.OWNING,
                ctx
            );
        }
    }
    catch (Exception e) {
        throw new IgniteException(e);
    }
}

/**
 * @deprecated Misspelled name kept so existing callers compile; use
 * {@link #calculateDumpedPartitionHash(Dump, String, int)} instead.
 */
@Deprecated
private PartitionHashRecordV2 caclucateDumpedPartitionHash(Dump dump, String grpName, int part) {
    return calculateDumpedPartitionHash(dump, grpName, part);
}

/** {@inheritDoc} */
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,14 +29,17 @@
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.LongAdder;
import java.util.concurrent.locks.LockSupport;
import java.util.stream.Collectors;
import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.IgniteException;
import org.apache.ignite.cache.affinity.rendezvous.RendezvousAffinityFunction;
import org.apache.ignite.configuration.CacheConfiguration;
import org.apache.ignite.configuration.IgniteConfiguration;
import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
import org.apache.ignite.internal.processors.cache.CacheGroupContext;
import org.apache.ignite.internal.processors.cache.CacheObject;
Expand Down Expand Up @@ -97,6 +100,14 @@ public class CreateDumpFutureTask extends AbstractCreateSnapshotFutureTask imple
/** Local node is primary for set of group partitions. */
private final Map<Integer, Set<Integer>> grpPrimaries = new ConcurrentHashMap<>();

/**
* Map shared across all instances of {@link PartitionDumpContext} and {@link DumpEntrySerializer}.
We use a per-thread buffer because the number of threads is fewer than the number of partitions.
Regular count of partitions is {@link RendezvousAffinityFunction#DFLT_PARTITION_COUNT}
and the thread count is {@link IgniteConfiguration#DFLT_PUBLIC_THREAD_CNT} which is significantly less.
*/
private final ConcurrentMap<Long, ByteBuffer> thLocBufs = new ConcurrentHashMap<>();

/**
* @param cctx Cache context.
* @param srcNodeId Node id which cause snapshot task creation.
Expand Down Expand Up @@ -298,7 +309,10 @@ private void prepare() throws IOException, IgniteCheckedException {
}

closeFut = CompletableFuture.runAsync(
() -> onDone(new SnapshotFutureTaskResult(taken, null), err0),
() -> {
thLocBufs.clear();
onDone(new SnapshotFutureTaskResult(taken, null), err0);
},
cctx.kernalContext().pools().getSystemExecutorService()
);
}
Expand Down Expand Up @@ -329,7 +343,7 @@ private void createDumpLock() throws IgniteCheckedException, IOException {
private PartitionDumpContext dumpContext(int grp, int part) {
return dumpCtxs.computeIfAbsent(
toLong(grp, part),
key -> new PartitionDumpContext(cctx.kernalContext().cache().cacheGroup(grp), part)
key -> new PartitionDumpContext(cctx.kernalContext().cache().cacheGroup(grp), part, thLocBufs)
);
}

Expand Down Expand Up @@ -359,7 +373,7 @@ private class PartitionDumpContext implements Closeable {
private final AffinityTopologyVersion topVer;

/** Partition serializer. */
DumpEntrySerializer serdes;
private final DumpEntrySerializer serdes;

/** If {@code true} context is closed. */
volatile boolean closed;
Expand All @@ -370,8 +384,9 @@ private class PartitionDumpContext implements Closeable {
/**
* @param gctx Group context.
* @param part Partition id.
* @param thLocBufs Thread local buffers.
*/
public PartitionDumpContext(CacheGroupContext gctx, int part) {
public PartitionDumpContext(CacheGroupContext gctx, int part, ConcurrentMap<Long, ByteBuffer> thLocBufs) {
assert gctx != null;

try {
Expand All @@ -381,7 +396,7 @@ public PartitionDumpContext(CacheGroupContext gctx, int part) {

startVer = grpPrimaries.get(gctx.groupId()).contains(part) ? gctx.shared().versions().last() : null;

serdes = new DumpEntrySerializer();
serdes = new DumpEntrySerializer(thLocBufs);
changed = new HashMap<>();

for (int cache : gctx.cacheIds())
Expand Down Expand Up @@ -416,28 +431,24 @@ public void writeChanged(
) {
String reasonToSkip = null;

if (closed) // Partition already saved in dump.
if (closed) // Quick exit. Partition already saved in dump.
reasonToSkip = "partition already saved";
else {
writers.getAndIncrement();

try {
if (startVer != null && ver.isGreater(startVer))
if (closed) // Partition already saved in dump.
reasonToSkip = "partition already saved";
else if (startVer != null && ver.isGreater(startVer))
reasonToSkip = "greater version";
else if (!changed.get(cache).add(key)) // Entry changed several time during dump.
reasonToSkip = "changed several times";
else if (val == null)
reasonToSkip = "newly created or already removed"; // Previous value is null. Entry created after dump start, skip.
else {
synchronized (this) {
if (closed) // Partition already saved in dump.
reasonToSkip = "partition already saved";
else {
write(cache, expireTime, key, val);

changedCnt.increment();
}
}
write(cache, expireTime, key, val);

changedCnt.increment();
}
}
finally {
Expand Down Expand Up @@ -478,11 +489,8 @@ public boolean writeForIterator(
written = false;
else if (changed.get(cache).contains(key))
written = false;
else {
synchronized (this) {
write(cache, expireTime, key, val);
}
}
else
write(cache, expireTime, key, val);

if (log.isTraceEnabled()) {
log.trace("Iterator [" +
Expand All @@ -498,20 +506,22 @@ else if (changed.get(cache).contains(key))

/** */
private void write(int cache, long expireTime, KeyCacheObject key, CacheObject val) {
try {
ByteBuffer buf = serdes.writeToBuffer(cache, expireTime, key, val, cctx.cacheObjectContext(cache));
synchronized (serdes) { // Prevent concurrent access to the dump file.
try {
ByteBuffer buf = serdes.writeToBuffer(cache, expireTime, key, val, cctx.cacheObjectContext(cache));

if (file.writeFully(buf) != buf.limit())
throw new IgniteException("Can't write row");
}
catch (IOException | IgniteCheckedException e) {
throw new IgniteException(e);
if (file.writeFully(buf) != buf.limit())
throw new IgniteException("Can't write row");
}
catch (IOException | IgniteCheckedException e) {
throw new IgniteException(e);
}
}
}

/** {@inheritDoc} */
@Override public void close() {
synchronized (this) {
synchronized (this) { // Prevent concurrent close invocation.
if (closed)
return;

Expand All @@ -524,8 +534,6 @@ private void write(int cache, long expireTime, KeyCacheObject key, CacheObject v
LockSupport.parkNanos(1_000_000);

U.closeQuiet(file);

serdes = null;
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import java.io.File;
import java.io.IOException;
import java.io.InputStream;
import java.nio.ByteBuffer;
import java.nio.file.Files;
import java.nio.file.Paths;
import java.util.Arrays;
Expand All @@ -29,9 +30,13 @@
import java.util.List;
import java.util.NoSuchElementException;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.stream.Collectors;
import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.IgniteException;
import org.apache.ignite.cache.affinity.rendezvous.RendezvousAffinityFunction;
import org.apache.ignite.configuration.IgniteConfiguration;
import org.apache.ignite.internal.GridKernalContext;
import org.apache.ignite.internal.processors.cache.KeyCacheObject;
import org.apache.ignite.internal.processors.cache.StoredCacheData;
Expand Down Expand Up @@ -67,9 +72,18 @@ public class Dump {
private final GridKernalContext cctx;

/**
* Map shared across all instances of {@link DumpEntrySerializer}.
We use a per-thread buffer because the number of threads is fewer than the number of partitions.
Regular count of partitions is {@link RendezvousAffinityFunction#DFLT_PARTITION_COUNT}
and the thread count is {@link IgniteConfiguration#DFLT_PUBLIC_THREAD_CNT} which is significantly less.
*/
private final ConcurrentMap<Long, ByteBuffer> thLocBufs = new ConcurrentHashMap<>();

/**
* @param cctx Kernal context.
* @param dumpDir Dump directory.
*/
public Dump(GridKernalContext cctx, File dumpDir) throws IgniteCheckedException {
public Dump(GridKernalContext cctx, File dumpDir) {
this.cctx = cctx;
this.dumpDir = dumpDir;

Expand Down Expand Up @@ -162,7 +176,7 @@ DumpedPartitionIterator iterator(String node, int group, int part, boolean exclu
throw new RuntimeException(e);
}

DumpEntrySerializer serializer = new DumpEntrySerializer();
DumpEntrySerializer serializer = new DumpEntrySerializer(thLocBufs);

serializer.kernalContext(cctx);

Expand Down
Loading

0 comments on commit 2b7e078

Please sign in to comment.