Skip to content

Commit

Permalink
IGNITE-20836 Support zipping of dump files (#11040)
Browse files Browse the repository at this point in the history
(cherry picked from commit 36bd6ff)
  • Loading branch information
yurinaryshkin authored and nizhikov committed Nov 27, 2023
1 parent e0d23e4 commit ef7fc63
Show file tree
Hide file tree
Showing 17 changed files with 458 additions and 53 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -69,26 +69,6 @@ public DumpReaderConfiguration(File dir, DumpConsumer cnsmr) {
this(dir, cnsmr, DFLT_THREAD_CNT, DFLT_TIMEOUT, true, true, null, false);
}

/**
* @param dir Root dump directory.
* @param cnsmr Dump consumer.
* @param thCnt Count of threads to consume dumped partitions.
* @param timeout Timeout of dump reader invocation.
* @param failFast Stop processing partitions if consumer fail to process one.
* @param keepBinary If {@code true} then don't deserialize {@link KeyCacheObject} and {@link CacheObject}.
* @param cacheGroupNames Cache group names.
*/
public DumpReaderConfiguration(File dir,
DumpConsumer cnsmr,
int thCnt,
Duration timeout,
boolean failFast,
boolean keepBinary,
String[] cacheGroupNames
) {
this(dir, cnsmr, thCnt, timeout, failFast, keepBinary, cacheGroupNames, false);
}

/**
* @param dir Root dump directory.
* @param cnsmr Dump consumer.
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.ignite.internal.processors.cache.persistence.file;

import java.io.File;
import java.io.IOException;
import java.nio.file.OpenOption;

/**
* File I/O factory which provides {@link UnzipFileIO} implementation of FileIO.
*/
public class UnzipFileIOFactory implements FileIOFactory {
/** */
private static final long serialVersionUID = 0L;

/** {@inheritDoc} */
@Override public UnzipFileIO create(File file, OpenOption... modes) throws IOException {
return new UnzipFileIO(file);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -1193,7 +1193,9 @@ private IgniteInternalFuture<SnapshotOperationResponse> initLocalFullSnapshot(
parts,
withMetaStorage,
req.dump(),
locSndrFactory.apply(req.snapshotName(), req.snapshotPath()));
req.compress(),
locSndrFactory.apply(req.snapshotName(), req.snapshotPath())
);

if (withMetaStorage && task0 instanceof SnapshotFutureTask) {
((DistributedMetaStorageImpl)cctx.kernalContext().distributedMetastorage())
Expand All @@ -1220,6 +1222,7 @@ private IgniteInternalFuture<SnapshotOperationResponse> initLocalFullSnapshot(
req.snapshotName(),
cctx.localNode().consistentId().toString(),
pdsSettings.folderName(),
req.compress(),
cctx.gridConfig().getDataStorageConfiguration().getPageSize(),
grpIds,
comprGrpIds,
Expand Down Expand Up @@ -1805,7 +1808,7 @@ private boolean cancelLocalSnapshotTask0(Function<AbstractSnapshotFutureTask<?>,

/** {@inheritDoc} */
@Override public IgniteFuture<Void> createDump(String name, @Nullable Collection<String> cacheGroupNames) {
return createSnapshot(name, null, cacheGroupNames, false, false, true);
return createSnapshot(name, null, cacheGroupNames, false, false, true, false);
}

/**
Expand Down Expand Up @@ -2193,7 +2196,7 @@ public IgniteFutureImpl<Void> createSnapshot(
boolean incremental,
boolean onlyPrimary
) {
return createSnapshot(name, snpPath, null, incremental, onlyPrimary, false);
return createSnapshot(name, snpPath, null, incremental, onlyPrimary, false, false);
}

/**
Expand All @@ -2205,6 +2208,7 @@ public IgniteFutureImpl<Void> createSnapshot(
* @param incremental Incremental snapshot flag.
* @param onlyPrimary If {@code true} snapshot only primary copies of partitions.
* @param dump If {@code true} cache dump must be created.
* @param compress If {@code true} then compress partition files.
* @return Future which will be completed when a process ends.
*/
public IgniteFutureImpl<Void> createSnapshot(
Expand All @@ -2213,13 +2217,15 @@ public IgniteFutureImpl<Void> createSnapshot(
@Nullable Collection<String> cacheGroupNames,
boolean incremental,
boolean onlyPrimary,
boolean dump
boolean dump,
boolean compress
) {
A.notNullOrEmpty(name, "Snapshot name cannot be null or empty.");
A.ensure(U.alphanumericUnderscore(name), "Snapshot name must satisfy the following name pattern: a-zA-Z0-9_");
A.ensure(!(incremental && onlyPrimary), "Only primary not supported for incremental snapshots");
A.ensure(!(dump && incremental), "Incremental dump not supported");
A.ensure(!(cacheGroupNames != null && !dump), "Cache group names filter supported only for dump");
A.ensure(!compress || dump, "Compression is supported only for dumps");

try {
cctx.kernalContext().security().authorize(ADMIN_SNAPSHOT);
Expand All @@ -2244,7 +2250,7 @@ public IgniteFutureImpl<Void> createSnapshot(
return new IgniteSnapshotFutureImpl(cctx.kernalContext().closure()
.callAsync(
BALANCE,
new CreateSnapshotCallable(name, cacheGroupNames, incremental, onlyPrimary, dump),
new CreateSnapshotCallable(name, cacheGroupNames, incremental, onlyPrimary, dump, compress),
options(Collections.singletonList(crd)).withFailoverDisabled()
));
}
Expand Down Expand Up @@ -2346,7 +2352,8 @@ else if (grps.isEmpty())
incremental,
incIdx,
onlyPrimary,
dump
dump,
compress
));

String msg =
Expand Down Expand Up @@ -2725,6 +2732,7 @@ public GridCloseableIterator<CacheDataRow> partitionRowIterator(String snpName,
* @param parts Collection of pairs group and appropriate cache partition to be snapshot.
* @param withMetaStorage {@code true} if all metastorage data must be also included into snapshot.
* @param dump {@code true} if cache group dump must be created.
* @param compress If {@code true} then compress partition files.
* @param snpSndr Factory which produces snapshot receiver instance.
* @return Snapshot operation task which should be registered on checkpoint to run.
*/
Expand All @@ -2736,10 +2744,20 @@ AbstractSnapshotFutureTask<?> registerSnapshotTask(
Map<Integer, Set<Integer>> parts,
boolean withMetaStorage,
boolean dump,
boolean compress,
SnapshotSender snpSndr
) {
AbstractSnapshotFutureTask<?> task = registerTask(snpName, dump
? new CreateDumpFutureTask(cctx, srcNodeId, requestId, snpName, snapshotLocalDir(snpName, snpPath), ioFactory, snpSndr, parts)
? new CreateDumpFutureTask(cctx,
srcNodeId,
requestId,
snpName,
snapshotLocalDir(snpName, snpPath),
ioFactory,
snpSndr,
parts,
compress
)
: new SnapshotFutureTask(cctx, srcNodeId, requestId, snpName, tmpWorkDir, ioFactory, snpSndr, parts, withMetaStorage, locBuff));

if (!withMetaStorage) {
Expand Down Expand Up @@ -4659,6 +4677,9 @@ private static class CreateSnapshotCallable implements IgniteCallable<Void> {
/** If {@code true} create cache dump. */
private final boolean dump;

/** If {@code true} then compress partition files. */
private final boolean comprParts;

/** Auto-injected grid instance. */
@IgniteInstanceResource
private transient IgniteEx ignite;
Expand All @@ -4669,19 +4690,22 @@ private static class CreateSnapshotCallable implements IgniteCallable<Void> {
* @param incremental If {@code true} then incremental snapshot must be created.
* @param onlyPrimary If {@code true} then only copy of primary partitions will be created.
* @param dump If {@code true} then cache dump must be created.
* @param comprParts If {@code true} then compress partition files.
*/
public CreateSnapshotCallable(
String snpName,
@Nullable Collection<String> cacheGroupNames,
boolean incremental,
boolean onlyPrimary,
boolean dump
boolean dump,
boolean comprParts
) {
this.snpName = snpName;
this.cacheGroupNames = cacheGroupNames;
this.incremental = incremental;
this.onlyPrimary = onlyPrimary;
this.dump = dump;
this.comprParts = comprParts;
}

/** {@inheritDoc} */
Expand All @@ -4695,7 +4719,8 @@ public CreateSnapshotCallable(
cacheGroupNames,
false,
onlyPrimary,
dump
dump,
comprParts
).get();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,13 @@ public class SnapshotMetadata implements Serializable {
*/
private final String folderName;

/**
* If {@code true} then compress partition files.
* This shouldn't be confused with {@link SnapshotMetadata#comprGrpIds} which represents how Ignite keeps data in memory pages
* while {@link SnapshotMetadata#comprParts} represents how dump files are stored on disk.
*/
private final boolean comprParts;

/** Page size of stored snapshot data. */
private final int pageSize;

Expand Down Expand Up @@ -109,6 +116,7 @@ public class SnapshotMetadata implements Serializable {
* @param snpName Snapshot name.
* @param consId Consistent id of a node to which this metadata relates.
* @param folderName Directory name which stores the data files.
* @param comprParts If {@code true} then compress partition files.
* @param pageSize Page size of stored snapshot data.
* @param grpIds The list of cache groups ids which were included into snapshot.
* @param bltNodes The set of affected by snapshot baseline nodes.
Expand All @@ -122,6 +130,7 @@ public SnapshotMetadata(
String snpName,
String consId,
String folderName,
boolean comprParts,
int pageSize,
List<Integer> grpIds,
Collection<Integer> compGrpIds,
Expand All @@ -136,6 +145,7 @@ public SnapshotMetadata(
this.snpName = snpName;
this.consId = consId;
this.folderName = folderName;
this.comprParts = comprParts;
this.pageSize = pageSize;
this.grpIds = grpIds;
this.bltNodes = bltNodes;
Expand Down Expand Up @@ -183,6 +193,13 @@ public String folderName() {
return folderName;
}

/**
* @return {@code true} if compress partition files.
*/
public boolean compressPartitions() {
return comprParts;
}

/**
* @return Page size of stored snapshot data.
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -94,6 +94,9 @@ public class SnapshotOperationRequest implements Serializable {
/** If {@code true} then create dump. */
private final boolean dump;

/** If {@code true} then compress partition files. */
private final boolean compress;

/**
* @param reqId Request ID.
* @param opNodeId Operational node ID.
Expand All @@ -105,6 +108,7 @@ public class SnapshotOperationRequest implements Serializable {
* @param incIdx Incremental snapshot index.
* @param onlyPrimary If {@code true} snapshot only primary copies of partitions.
* @param dump If {@code true} then create dump.
* @param compress If {@code true} then compress partition files.
*/
public SnapshotOperationRequest(
UUID reqId,
Expand All @@ -116,7 +120,8 @@ public SnapshotOperationRequest(
boolean incremental,
int incIdx,
boolean onlyPrimary,
boolean dump
boolean dump,
boolean compress
) {
this.reqId = reqId;
this.opNodeId = opNodeId;
Expand All @@ -128,6 +133,7 @@ public SnapshotOperationRequest(
this.incIdx = incIdx;
this.onlyPrimary = onlyPrimary;
this.dump = dump;
this.compress = compress;
startTime = U.currentTimeMillis();
}

Expand Down Expand Up @@ -207,6 +213,11 @@ public boolean dump() {
return dump;
}

/** @return If {@code true} then compress partition files. */
public boolean compress() {
return compress;
}

/** @return Start time. */
public long startTime() {
return startTime;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,7 @@
import static org.apache.ignite.internal.processors.cache.distributed.dht.topology.GridDhtPartitionState.fromOrdinal;
import static org.apache.ignite.internal.processors.cache.persistence.file.FilePageStoreManager.CACHE_DATA_FILENAME;
import static org.apache.ignite.internal.processors.cache.persistence.file.FilePageStoreManager.FILE_SUFFIX;
import static org.apache.ignite.internal.processors.cache.persistence.file.FilePageStoreManager.ZIP_SUFFIX;
import static org.apache.ignite.internal.processors.cache.persistence.file.FilePageStoreManager.cacheDirectories;
import static org.apache.ignite.internal.processors.cache.persistence.file.FilePageStoreManager.cacheGroupName;
import static org.apache.ignite.internal.processors.cache.persistence.file.FilePageStoreManager.cachePartitionFiles;
Expand Down Expand Up @@ -143,7 +144,9 @@ public SnapshotPartitionsVerifyHandler(GridCacheSharedContext<?, ?> cctx) {
Set<Integer> parts = meta.partitions().get(grpId) == null ? Collections.emptySet() :
new HashSet<>(meta.partitions().get(grpId));

for (File part : cachePartitionFiles(dir, meta.dump() ? DUMP_FILE_EXT : FILE_SUFFIX)) {
for (File part : cachePartitionFiles(dir,
(meta.dump() ? DUMP_FILE_EXT : FILE_SUFFIX) + (meta.compressPartitions() ? ZIP_SUFFIX : "")
)) {
int partId = partId(part.getName());

if (!parts.remove(partId))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -421,6 +421,7 @@ public IgniteFutureImpl<Void> start(
false,
incIdx,
onlyPrimary,
false,
false
);

Expand Down
Loading

0 comments on commit ef7fc63

Please sign in to comment.