From ef7fc63fb8296f9b77569646eba789634a836569 Mon Sep 17 00:00:00 2001 From: yurinaryshkin <135707807+yurinaryshkin@users.noreply.github.com> Date: Fri, 24 Nov 2023 10:08:20 +0300 Subject: [PATCH] IGNITE-20836 Support zipping of dump files (#11040) (cherry picked from commit 36bd6ff35e8012616882a864c1e10aae115a00a3) --- .../ignite/dump/DumpReaderConfiguration.java | 20 --- .../persistence/file/UnzipFileIOFactory.java | 35 ++++ .../snapshot/IgniteSnapshotManager.java | 43 ++++- .../snapshot/SnapshotMetadata.java | 17 ++ .../snapshot/SnapshotOperationRequest.java | 13 +- .../SnapshotPartitionsVerifyHandler.java | 5 +- .../snapshot/SnapshotRestoreProcess.java | 1 + .../snapshot/dump/CreateDumpFutureTask.java | 14 +- .../cache/persistence/snapshot/dump/Dump.java | 25 ++- .../snapshot/dump/WriteOnlyZipFileIO.java | 158 ++++++++++++++++++ .../dump/WriteOnlyZipFileIOFactory.java | 36 ++++ .../snapshot/AbstractSnapshotSelfTest.java | 12 +- .../snapshot/EncryptedSnapshotTest.java | 13 +- .../IgniteSnapshotManagerSelfTest.java | 1 + .../snapshot/dump/AbstractCacheDumpTest.java | 22 ++- .../dump/IgniteCacheDumpSelf2Test.java | 66 +++++++- .../dump/IgniteCacheDumpSelfTest.java | 30 +++- 17 files changed, 458 insertions(+), 53 deletions(-) create mode 100644 modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/file/UnzipFileIOFactory.java create mode 100644 modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/dump/WriteOnlyZipFileIO.java create mode 100644 modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/dump/WriteOnlyZipFileIOFactory.java diff --git a/modules/core/src/main/java/org/apache/ignite/dump/DumpReaderConfiguration.java b/modules/core/src/main/java/org/apache/ignite/dump/DumpReaderConfiguration.java index 1b43554c3f009..a7a80b8227f21 100644 --- a/modules/core/src/main/java/org/apache/ignite/dump/DumpReaderConfiguration.java +++ b/modules/core/src/main/java/org/apache/ignite/dump/DumpReaderConfiguration.java @@ -69,26 +69,6 @@ public DumpReaderConfiguration(File dir, DumpConsumer cnsmr) { this(dir, cnsmr, DFLT_THREAD_CNT, DFLT_TIMEOUT, true, true, null, false); } - /** - * @param dir Root dump directory. - * @param cnsmr Dump consumer. - * @param thCnt Count of threads to consume dumped partitions. - * @param timeout Timeout of dump reader invocation. - * @param failFast Stop processing partitions if consumer fail to process one. - * @param keepBinary If {@code true} then don't deserialize {@link KeyCacheObject} and {@link CacheObject}. - * @param cacheGroupNames Cache group names. - */ - public DumpReaderConfiguration(File dir, - DumpConsumer cnsmr, - int thCnt, - Duration timeout, - boolean failFast, - boolean keepBinary, - String[] cacheGroupNames - ) { - this(dir, cnsmr, thCnt, timeout, failFast, keepBinary, cacheGroupNames, false); - } - /** * @param dir Root dump directory. * @param cnsmr Dump consumer. diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/file/UnzipFileIOFactory.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/file/UnzipFileIOFactory.java new file mode 100644 index 0000000000000..674ba29194914 --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/file/UnzipFileIOFactory.java @@ -0,0 +1,35 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.cache.persistence.file; + +import java.io.File; +import java.io.IOException; +import java.nio.file.OpenOption; + +/** + * File I/O factory which provides {@link UnzipFileIO} implementation of FileIO. + */ +public class UnzipFileIOFactory implements FileIOFactory { + /** */ + private static final long serialVersionUID = 0L; + + /** {@inheritDoc} */ + @Override public UnzipFileIO create(File file, OpenOption... modes) throws IOException { + return new UnzipFileIO(file); + } +} diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/IgniteSnapshotManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/IgniteSnapshotManager.java index 18e905635c27e..47e4e1c7f441c 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/IgniteSnapshotManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/IgniteSnapshotManager.java @@ -1193,7 +1193,9 @@ private IgniteInternalFuture initLocalFullSnapshot( parts, withMetaStorage, req.dump(), - locSndrFactory.apply(req.snapshotName(), req.snapshotPath())); + req.compress(), + locSndrFactory.apply(req.snapshotName(), req.snapshotPath()) + ); if (withMetaStorage && task0 instanceof SnapshotFutureTask) { ((DistributedMetaStorageImpl)cctx.kernalContext().distributedMetastorage()) @@ -1220,6 +1222,7 @@ private IgniteInternalFuture initLocalFullSnapshot( req.snapshotName(), cctx.localNode().consistentId().toString(), pdsSettings.folderName(), + req.compress(), cctx.gridConfig().getDataStorageConfiguration().getPageSize(), grpIds, comprGrpIds, @@ -1805,7 +1808,7 @@ private boolean cancelLocalSnapshotTask0(Function, /** {@inheritDoc} */ @Override public IgniteFuture createDump(String name, @Nullable Collection cacheGroupNames) { - return createSnapshot(name, null, cacheGroupNames, false, false, true); + return createSnapshot(name, null, cacheGroupNames, false, false, true, false); } /** @@ -2193,7 +2196,7 @@ public IgniteFutureImpl createSnapshot( boolean incremental, boolean onlyPrimary ) { - return createSnapshot(name, snpPath, null, incremental, onlyPrimary, false); + return createSnapshot(name, snpPath, null, incremental, onlyPrimary, false, false); } /** @@ -2205,6 +2208,7 @@ public IgniteFutureImpl createSnapshot( * @param incremental Incremental snapshot flag. * @param onlyPrimary If {@code true} snapshot only primary copies of partitions. * @param dump If {@code true} cache dump must be created. + * @param compress If {@code true} then compress partition files. * @return Future which will be completed when a process ends. */ public IgniteFutureImpl createSnapshot( @@ -2213,13 +2217,15 @@ public IgniteFutureImpl createSnapshot( @Nullable Collection cacheGroupNames, boolean incremental, boolean onlyPrimary, - boolean dump + boolean dump, + boolean compress ) { A.notNullOrEmpty(name, "Snapshot name cannot be null or empty."); A.ensure(U.alphanumericUnderscore(name), "Snapshot name must satisfy the following name pattern: a-zA-Z0-9_"); A.ensure(!(incremental && onlyPrimary), "Only primary not supported for incremental snapshots"); A.ensure(!(dump && incremental), "Incremental dump not supported"); A.ensure(!(cacheGroupNames != null && !dump), "Cache group names filter supported only for dump"); + A.ensure(!compress || dump, "Compression is supported only for dumps"); try { cctx.kernalContext().security().authorize(ADMIN_SNAPSHOT); @@ -2244,7 +2250,7 @@ public IgniteFutureImpl createSnapshot( return new IgniteSnapshotFutureImpl(cctx.kernalContext().closure() .callAsync( BALANCE, - new CreateSnapshotCallable(name, cacheGroupNames, incremental, onlyPrimary, dump), + new CreateSnapshotCallable(name, cacheGroupNames, incremental, onlyPrimary, dump, compress), options(Collections.singletonList(crd)).withFailoverDisabled() )); } @@ -2346,7 +2352,8 @@ else if (grps.isEmpty()) incremental, incIdx, onlyPrimary, - dump + dump, + compress )); String msg = @@ -2725,6 +2732,7 @@ public GridCloseableIterator partitionRowIterator(String snpName, * @param parts Collection of pairs group and appropriate cache partition to be snapshot. * @param withMetaStorage {@code true} if all metastorage data must be also included into snapshot. * @param dump {@code true} if cache group dump must be created. + * @param compress If {@code true} then compress partition files. * @param snpSndr Factory which produces snapshot receiver instance. * @return Snapshot operation task which should be registered on checkpoint to run. */ @@ -2736,10 +2744,20 @@ AbstractSnapshotFutureTask registerSnapshotTask( Map> parts, boolean withMetaStorage, boolean dump, + boolean compress, SnapshotSender snpSndr ) { AbstractSnapshotFutureTask task = registerTask(snpName, dump - ? new CreateDumpFutureTask(cctx, srcNodeId, requestId, snpName, snapshotLocalDir(snpName, snpPath), ioFactory, snpSndr, parts) + ? new CreateDumpFutureTask(cctx, + srcNodeId, + requestId, + snpName, + snapshotLocalDir(snpName, snpPath), + ioFactory, + snpSndr, + parts, + compress + ) : new SnapshotFutureTask(cctx, srcNodeId, requestId, snpName, tmpWorkDir, ioFactory, snpSndr, parts, withMetaStorage, locBuff)); if (!withMetaStorage) { @@ -4659,6 +4677,9 @@ private static class CreateSnapshotCallable implements IgniteCallable { /** If {@code true} create cache dump. */ private final boolean dump; + /** If {@code true} then compress partition files. */ + private final boolean comprParts; + /** Auto-injected grid instance. */ @IgniteInstanceResource private transient IgniteEx ignite; @@ -4669,19 +4690,22 @@ private static class CreateSnapshotCallable implements IgniteCallable { * @param incremental If {@code true} then incremental snapshot must be created. * @param onlyPrimary If {@code true} then only copy of primary partitions will be created. * @param dump If {@code true} then cache dump must be created. + * @param comprParts If {@code true} then compress partition files. */ public CreateSnapshotCallable( String snpName, @Nullable Collection cacheGroupNames, boolean incremental, boolean onlyPrimary, - boolean dump + boolean dump, + boolean comprParts ) { this.snpName = snpName; this.cacheGroupNames = cacheGroupNames; this.incremental = incremental; this.onlyPrimary = onlyPrimary; this.dump = dump; + this.comprParts = comprParts; } /** {@inheritDoc} */ @@ -4695,7 +4719,8 @@ public CreateSnapshotCallable( cacheGroupNames, false, onlyPrimary, - dump + dump, + comprParts ).get(); } diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/SnapshotMetadata.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/SnapshotMetadata.java index cd04a5f80efe6..4bb26d0dc3978 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/SnapshotMetadata.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/SnapshotMetadata.java @@ -63,6 +63,13 @@ public class SnapshotMetadata implements Serializable { */ private final String folderName; + /** + * If {@code true} then compress partition files. + * This shouldn't be confused with {@link SnapshotMetadata#comprGrpIds} which represents how Ignite keeps data in memory pages + * while {@link SnapshotMetadata#comprParts} represents how dump files are stored on disk. + */ + private final boolean comprParts; + /** Page size of stored snapshot data. */ private final int pageSize; @@ -109,6 +116,7 @@ public class SnapshotMetadata implements Serializable { * @param snpName Snapshot name. * @param consId Consistent id of a node to which this metadata relates. * @param folderName Directory name which stores the data files. + * @param comprParts If {@code true} then compress partition files. * @param pageSize Page size of stored snapshot data. * @param grpIds The list of cache groups ids which were included into snapshot. * @param bltNodes The set of affected by snapshot baseline nodes. @@ -122,6 +130,7 @@ public SnapshotMetadata( String snpName, String consId, String folderName, + boolean comprParts, int pageSize, List grpIds, Collection compGrpIds, @@ -136,6 +145,7 @@ public SnapshotMetadata( this.snpName = snpName; this.consId = consId; this.folderName = folderName; + this.comprParts = comprParts; this.pageSize = pageSize; this.grpIds = grpIds; this.bltNodes = bltNodes; @@ -183,6 +193,13 @@ public String folderName() { return folderName; } + /** + * @return {@code true} if compress partition files. + */ + public boolean compressPartitions() { + return comprParts; + } + /** * @return Page size of stored snapshot data. */ diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/SnapshotOperationRequest.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/SnapshotOperationRequest.java index 90777b1cc560c..795a123642944 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/SnapshotOperationRequest.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/SnapshotOperationRequest.java @@ -94,6 +94,9 @@ public class SnapshotOperationRequest implements Serializable { /** If {@code true} then create dump. */ private final boolean dump; + /** If {@code true} then compress partition files. */ + private final boolean compress; + /** * @param reqId Request ID. * @param opNodeId Operational node ID. @@ -105,6 +108,7 @@ public class SnapshotOperationRequest implements Serializable { * @param incIdx Incremental snapshot index. * @param onlyPrimary If {@code true} snapshot only primary copies of partitions. * @param dump If {@code true} then create dump. + * @param compress If {@code true} then compress partition files. */ public SnapshotOperationRequest( UUID reqId, @@ -116,7 +120,8 @@ public SnapshotOperationRequest( boolean incremental, int incIdx, boolean onlyPrimary, - boolean dump + boolean dump, + boolean compress ) { this.reqId = reqId; this.opNodeId = opNodeId; @@ -128,6 +133,7 @@ public SnapshotOperationRequest( this.incIdx = incIdx; this.onlyPrimary = onlyPrimary; this.dump = dump; + this.compress = compress; startTime = U.currentTimeMillis(); } @@ -207,6 +213,11 @@ public boolean dump() { return dump; } + /** @return If {@code true} then compress partition files. */ + public boolean compress() { + return compress; + } + /** @return Start time. */ public long startTime() { return startTime; diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/SnapshotPartitionsVerifyHandler.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/SnapshotPartitionsVerifyHandler.java index 3ea6d4c126441..3847b19b3a911 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/SnapshotPartitionsVerifyHandler.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/SnapshotPartitionsVerifyHandler.java @@ -76,6 +76,7 @@ import static org.apache.ignite.internal.processors.cache.distributed.dht.topology.GridDhtPartitionState.fromOrdinal; import static org.apache.ignite.internal.processors.cache.persistence.file.FilePageStoreManager.CACHE_DATA_FILENAME; import static org.apache.ignite.internal.processors.cache.persistence.file.FilePageStoreManager.FILE_SUFFIX; +import static org.apache.ignite.internal.processors.cache.persistence.file.FilePageStoreManager.ZIP_SUFFIX; import static org.apache.ignite.internal.processors.cache.persistence.file.FilePageStoreManager.cacheDirectories; import static org.apache.ignite.internal.processors.cache.persistence.file.FilePageStoreManager.cacheGroupName; import static org.apache.ignite.internal.processors.cache.persistence.file.FilePageStoreManager.cachePartitionFiles; @@ -143,7 +144,9 @@ public SnapshotPartitionsVerifyHandler(GridCacheSharedContext cctx) { Set parts = meta.partitions().get(grpId) == null ? Collections.emptySet() : new HashSet<>(meta.partitions().get(grpId)); - for (File part : cachePartitionFiles(dir, meta.dump() ? DUMP_FILE_EXT : FILE_SUFFIX)) { + for (File part : cachePartitionFiles(dir, + (meta.dump() ? DUMP_FILE_EXT : FILE_SUFFIX) + (meta.compressPartitions() ? ZIP_SUFFIX : "") + )) { int partId = partId(part.getName()); if (!parts.remove(partId)) diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/SnapshotRestoreProcess.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/SnapshotRestoreProcess.java index 7fa931d586433..a8e70efd0e37d 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/SnapshotRestoreProcess.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/SnapshotRestoreProcess.java @@ -421,6 +421,7 @@ public IgniteFutureImpl start( false, incIdx, onlyPrimary, + false, false ); diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/dump/CreateDumpFutureTask.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/dump/CreateDumpFutureTask.java index 237665c2ab0c6..11314e9d5e5ff 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/dump/CreateDumpFutureTask.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/dump/CreateDumpFutureTask.java @@ -71,8 +71,8 @@ import static org.apache.ignite.internal.processors.cache.GridLocalConfigManager.cacheDataFilename; import static org.apache.ignite.internal.processors.cache.persistence.file.FilePageStoreManager.CACHE_DIR_PREFIX; import static org.apache.ignite.internal.processors.cache.persistence.file.FilePageStoreManager.CACHE_GRP_DIR_PREFIX; -import static org.apache.ignite.internal.processors.cache.persistence.file.FilePageStoreManager.PART_FILE_PREFIX; import static org.apache.ignite.internal.processors.cache.persistence.snapshot.IgniteSnapshotManager.DUMP_LOCK; +import static org.apache.ignite.internal.processors.cache.persistence.snapshot.dump.Dump.dumpPartFileName; /** * Task creates cache group dump. @@ -93,6 +93,9 @@ public class CreateDumpFutureTask extends AbstractCreateSnapshotFutureTask imple /** */ private final FileIOFactory ioFactory; + /** If {@code true} then compress partition files. */ + private final boolean compress; + /** * Dump contextes. * Key is [group_id, partition_id] combined in single long value. @@ -120,6 +123,7 @@ public class CreateDumpFutureTask extends AbstractCreateSnapshotFutureTask imple * @param ioFactory IO factory. * @param snpSndr Snapshot sender. * @param parts Parts to dump. + * @param compress If {@code true} then compress partition files. */ public CreateDumpFutureTask( GridCacheSharedContext cctx, @@ -129,7 +133,8 @@ public CreateDumpFutureTask( File dumpDir, FileIOFactory ioFactory, SnapshotSender snpSndr, - Map> parts + Map> parts, + boolean compress ) { super( cctx, @@ -141,7 +146,8 @@ public CreateDumpFutureTask( ); this.dumpDir = dumpDir; - this.ioFactory = ioFactory; + this.ioFactory = compress ? new WriteOnlyZipFileIOFactory() : ioFactory; + this.compress = compress; } /** {@inheritDoc} */ @@ -444,7 +450,7 @@ public PartitionDumpContext(CacheGroupContext gctx, int part, ConcurrentMap()); - File dumpFile = new File(groupDirectory(gctx), PART_FILE_PREFIX + part + DUMP_FILE_EXT); + File dumpFile = new File(groupDirectory(gctx), dumpPartFileName(part, compress)); if (!dumpFile.createNewFile()) throw new IgniteException("Dump file can't be created: " + dumpFile); diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/dump/Dump.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/dump/Dump.java index 1874e0727d74b..c27b40375c835 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/dump/Dump.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/dump/Dump.java @@ -51,6 +51,7 @@ import org.apache.ignite.internal.processors.cache.persistence.file.FileIOFactory; import org.apache.ignite.internal.processors.cache.persistence.file.FilePageStoreManager; import org.apache.ignite.internal.processors.cache.persistence.file.RandomAccessFileIOFactory; +import org.apache.ignite.internal.processors.cache.persistence.file.UnzipFileIOFactory; import org.apache.ignite.internal.processors.cache.persistence.snapshot.SnapshotMetadata; import org.apache.ignite.internal.processors.cache.persistence.wal.reader.StandaloneGridKernalContext; import org.apache.ignite.internal.util.typedef.F; @@ -68,6 +69,7 @@ import static org.apache.ignite.internal.processors.cache.persistence.file.FilePageStoreManager.CACHE_GRP_DIR_PREFIX; import static org.apache.ignite.internal.processors.cache.persistence.file.FilePageStoreManager.DFLT_STORE_DIR; import static org.apache.ignite.internal.processors.cache.persistence.file.FilePageStoreManager.PART_FILE_PREFIX; +import static org.apache.ignite.internal.processors.cache.persistence.file.FilePageStoreManager.ZIP_SUFFIX; import static org.apache.ignite.internal.processors.cache.persistence.snapshot.IgniteSnapshotManager.SNAPSHOT_METAFILE_EXT; import static org.apache.ignite.internal.processors.cache.persistence.snapshot.dump.CreateDumpFutureTask.DUMP_FILE_EXT; import static org.apache.ignite.internal.processors.cache.persistence.wal.reader.StandaloneGridKernalContext.closeAllComponents; @@ -103,6 +105,9 @@ public class Dump implements AutoCloseable { */ private final ConcurrentMap thLocBufs = new ConcurrentHashMap<>(); + /** If {@code true} then compress partition files. */ + private final boolean comprParts; + /** * @param dumpDir Dump directory. * @param keepBinary If {@code true} then keep read entries in binary form. @@ -130,6 +135,7 @@ public Dump(File dumpDir, @Nullable String consistentId, boolean keepBinary, boo this.keepBinary = keepBinary; this.cctx = standaloneKernalContext(dumpDir, log); this.raw = raw; + this.comprParts = metadata.get(0).compressPartitions(); } /** @@ -225,14 +231,16 @@ public List configs(String node, int group) { * @return Dump iterator. */ public List partitions(String node, int group) { + String suffix = comprParts ? DUMP_FILE_EXT + ZIP_SUFFIX : DUMP_FILE_EXT; + File[] parts = dumpGroupDirectory(node, group) - .listFiles(f -> f.getName().startsWith(PART_FILE_PREFIX) && f.getName().endsWith(DUMP_FILE_EXT)); + .listFiles(f -> f.getName().startsWith(PART_FILE_PREFIX) && f.getName().endsWith(suffix)); if (parts == null) return Collections.emptyList(); return Arrays.stream(parts) - .map(partFile -> Integer.parseInt(partFile.getName().replace(PART_FILE_PREFIX, "").replace(DUMP_FILE_EXT, ""))) + .map(partFile -> Integer.parseInt(partFile.getName().replace(PART_FILE_PREFIX, "").replace(suffix, ""))) .collect(Collectors.toList()); } @@ -242,12 +250,12 @@ public List partitions(String node, int group) { * @return Dump iterator. */ public DumpedPartitionIterator iterator(String node, int group, int part) { - FileIOFactory ioFactory = new RandomAccessFileIOFactory(); + FileIOFactory ioFactory = comprParts ? new UnzipFileIOFactory() : new RandomAccessFileIOFactory(); FileIO dumpFile; try { - dumpFile = ioFactory.create(new File(dumpGroupDirectory(node, group), PART_FILE_PREFIX + part + DUMP_FILE_EXT)); + dumpFile = ioFactory.create(new File(dumpGroupDirectory(node, group), dumpPartFileName(part, comprParts))); } catch (IOException e) { throw new RuntimeException(e); @@ -317,6 +325,15 @@ private void advance() { }; } + /** + * @param part Partition number. + * @param compressed If {@code true} then compressed partition file. + * @return Dump partition file name. + */ + public static String dumpPartFileName(int part, boolean compressed) { + return PART_FILE_PREFIX + part + DUMP_FILE_EXT + (compressed ? ZIP_SUFFIX : ""); + } + /** @return Root dump directory. */ public File dumpDirectory() { return dumpDir; diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/dump/WriteOnlyZipFileIO.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/dump/WriteOnlyZipFileIO.java new file mode 100644 index 0000000000000..d52a8f6fd4de4 --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/dump/WriteOnlyZipFileIO.java @@ -0,0 +1,158 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.cache.persistence.snapshot.dump; + +import java.io.BufferedOutputStream; +import java.io.File; +import java.io.IOException; +import java.nio.ByteBuffer; +import java.nio.MappedByteBuffer; +import java.nio.channels.Channels; +import java.nio.channels.WritableByteChannel; +import java.nio.file.Files; +import java.nio.file.Paths; +import java.util.zip.ZipEntry; +import java.util.zip.ZipOutputStream; +import org.apache.ignite.internal.processors.cache.persistence.file.AbstractFileIO; +import org.apache.ignite.internal.processors.cache.persistence.file.FileIO; +import org.apache.ignite.internal.util.typedef.internal.A; + +import static java.util.zip.Deflater.BEST_COMPRESSION; +import static org.apache.ignite.internal.processors.cache.persistence.file.FilePageStoreManager.ZIP_SUFFIX; + +/** + * {@link FileIO} that allows to write ZIP compressed file. + * It doesn't support reading or random access. + * It is not designed for writing concurrently from several threads. + */ +public class WriteOnlyZipFileIO extends AbstractFileIO { + /** */ + private final ZipOutputStream zos; + + /** */ + private final WritableByteChannel ch; + + /** */ + private long pos; + + /** */ + public WriteOnlyZipFileIO(File file) throws IOException { + A.ensure(file.getName().endsWith(ZIP_SUFFIX), "File name should end with " + ZIP_SUFFIX); + + String entryName = file.getName().substring(0, file.getName().length() - ZIP_SUFFIX.length()); + + zos = new ZipOutputStream(new BufferedOutputStream(Files.newOutputStream(Paths.get(file.getPath())))); + + zos.setLevel(BEST_COMPRESSION); + + zos.putNextEntry(new ZipEntry(entryName)); + + ch = Channels.newChannel(zos); + } + + /** {@inheritDoc} */ + @Override public long position() throws IOException { + return pos; + } + + /** {@inheritDoc} */ + @Override public void position(long newPosition) throws IOException { + throw new UnsupportedOperationException(); + } + + /** {@inheritDoc} */ + @Override public int read(ByteBuffer destBuf) throws IOException { + throw new UnsupportedOperationException(); + } + + /** {@inheritDoc} */ + @Override public int read(ByteBuffer destBuf, long position) throws IOException { + throw new UnsupportedOperationException(); + } + + /** {@inheritDoc} */ + @Override public int read(byte[] buf, int off, int len) throws IOException { + throw new UnsupportedOperationException(); + } + + /** {@inheritDoc} */ + @Override public int write(ByteBuffer srcBuf) throws IOException { + int written = ch.write(srcBuf); + + pos += written; + + return written; + } + + /** {@inheritDoc} */ + @Override public int write(ByteBuffer srcBuf, long position) throws IOException { + throw new UnsupportedOperationException(); + } + + /** {@inheritDoc} */ + @Override public int write(byte[] buf, int off, int len) throws IOException { + throw new UnsupportedOperationException(); + } + + /** {@inheritDoc} */ + @Override public MappedByteBuffer map(int sizeBytes) throws IOException { + throw new UnsupportedOperationException(); + } + + /** {@inheritDoc} */ + @Override public void force(boolean withMetadata) throws IOException { + throw new UnsupportedOperationException(); + } + + /** {@inheritDoc} */ + @Override public void force() throws IOException { + force(false); + } + + /** {@inheritDoc} */ + @Override public long size() throws IOException { + throw new UnsupportedOperationException(); + } + + /** {@inheritDoc} */ + @Override public void clear() throws IOException { + throw new UnsupportedOperationException(); + } + + /** {@inheritDoc} */ + @Override public void close() throws IOException { + zos.closeEntry(); + + ch.close(); + } + + /** {@inheritDoc} */ + @Override public int getFileSystemBlockSize() { + return -1; + } + + /** {@inheritDoc} */ + @Override public int punchHole(long position, int len) { + return -1; + } + + /** {@inheritDoc} */ + @Override public long getSparseSize() { + return -1; + } +} diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/dump/WriteOnlyZipFileIOFactory.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/dump/WriteOnlyZipFileIOFactory.java new file mode 100644 index 0000000000000..d4660a6011b19 --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/dump/WriteOnlyZipFileIOFactory.java @@ -0,0 +1,36 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.cache.persistence.snapshot.dump; + +import java.io.File; +import java.io.IOException; +import java.nio.file.OpenOption; +import org.apache.ignite.internal.processors.cache.persistence.file.FileIOFactory; + +/** + * File I/O factory which provides {@link WriteOnlyZipFileIO} implementation of FileIO. + */ +public class WriteOnlyZipFileIOFactory implements FileIOFactory { + /** */ + private static final long serialVersionUID = 0L; + + /** {@inheritDoc} */ + @Override public WriteOnlyZipFileIO create(File file, OpenOption... modes) throws IOException { + return new WriteOnlyZipFileIO(file); + } +} diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/AbstractSnapshotSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/AbstractSnapshotSelfTest.java index 65857a6d93b8e..2f58e2449a88a 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/AbstractSnapshotSelfTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/AbstractSnapshotSelfTest.java @@ -823,8 +823,16 @@ protected static IgniteInternalFuture startLocalSnapshotTask( boolean withMetaStorage, SnapshotSender snpSndr ) throws IgniteCheckedException { - AbstractSnapshotFutureTask task = cctx.snapshotMgr().registerSnapshotTask(snpName, null, cctx.localNodeId(), null, - parts, withMetaStorage, false, snpSndr); + AbstractSnapshotFutureTask task = cctx.snapshotMgr().registerSnapshotTask(snpName, + null, + cctx.localNodeId(), + null, + parts, + withMetaStorage, + false, + false, + snpSndr + ); if (!(task instanceof SnapshotFutureTask)) throw new IgniteCheckedException("Snapshot task hasn't been registered: " + task); diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/EncryptedSnapshotTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/EncryptedSnapshotTest.java index a23a61c25a5f4..6f5862ef208ee 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/EncryptedSnapshotTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/EncryptedSnapshotTest.java @@ -290,9 +290,16 @@ public void testSnapshotTaskIsBlockedWithoutMetastore() throws Exception { IgniteEx ig = startGridsWithCache(1, CACHE_KEYS_RANGE, valueBuilder(), dfltCacheCfg); GridTestUtils.assertThrowsAnyCause(log, - () -> snp(ig).registerSnapshotTask(SNAPSHOT_NAME, null, ig.localNode().id(), - null, F.asMap(CU.cacheId(dfltCacheCfg.getName()), null), false, false, - snp(ig).localSnapshotSenderFactory().apply(SNAPSHOT_NAME, null)).get(TIMEOUT), + () -> snp(ig).registerSnapshotTask(SNAPSHOT_NAME, + null, + ig.localNode().id(), + null, + F.asMap(CU.cacheId(dfltCacheCfg.getName()), null), + false, + false, + false, + snp(ig).localSnapshotSenderFactory().apply(SNAPSHOT_NAME, null) + ).get(TIMEOUT), IgniteCheckedException.class, "Metastore is required because it holds encryption keys"); } diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/IgniteSnapshotManagerSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/IgniteSnapshotManagerSelfTest.java index 2002417dd1167..d226a4b6a49ba 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/IgniteSnapshotManagerSelfTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/IgniteSnapshotManagerSelfTest.java @@ -150,6 +150,7 @@ public void testSnapshotLocalPartitionMultiCpWithLoad() throws Exception { F.asMap(CU.cacheId(DEFAULT_CACHE_NAME), null), encryption, false, + false, new DelegateSnapshotSender(log, mgr.snapshotExecutorService(), mgr.localSnapshotSenderFactory().apply(SNAPSHOT_NAME, null)) { @Override public void sendPart0(File part, String cacheDirName, GroupPartitionId pair, Long length) { try { diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/dump/AbstractCacheDumpTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/dump/AbstractCacheDumpTest.java index 269939c8e2aee..fdb5207b4a65d 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/dump/AbstractCacheDumpTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/dump/AbstractCacheDumpTest.java @@ -280,11 +280,11 @@ protected void putData( /** */ protected void checkDump(IgniteEx ign) throws Exception { - checkDump(ign, DMP_NAME); + checkDump(ign, DMP_NAME, false); } /** */ - void checkDump(IgniteEx ign, String name) throws Exception { + void checkDump(IgniteEx ign, String name, boolean expectedComprParts) throws Exception { checkDump(ign, name, null, @@ -292,7 +292,9 @@ void checkDump(IgniteEx ign, String name) throws Exception { KEYS_CNT + (onlyPrimary ? 0 : KEYS_CNT * backups), 2 * (KEYS_CNT + (onlyPrimary ? 0 : KEYS_CNT * backups)), KEYS_CNT, - false); + false, + expectedComprParts + ); } /** */ @@ -304,11 +306,12 @@ void checkDump( int expectedDfltDumpSz, int expectedGrpDumpSz, int expectedCount, - boolean skipCopies + boolean skipCopies, + boolean expectedComprParts ) throws Exception { checkDumpWithCommand(ign, name, backups); - if (persistence) + if (persistence && !ign.context().clientNode()) assertNull(ign.context().cache().context().database().metaStorage().read(SNP_RUNNING_DIR_KEY)); Dump dump = dump(ign, name); @@ -322,6 +325,7 @@ void checkDump( assertEquals(name, meta.snapshotName()); assertTrue(meta.dump()); assertFalse(meta.cacheGroupIds().contains(CU.cacheId(UTILITY_CACHE_NAME))); + assertEquals(expectedComprParts, meta.compressPartitions()); } List nodesDirs = dump.nodesDirectories(); @@ -536,7 +540,13 @@ public static String invokeCheckCommand(IgniteEx ign, String name, String snpPat /** */ void createDump(IgniteEx ign, String name, @Nullable Collection cacheGroupNames) { - ign.context().cache().context().snapshotMgr().createSnapshot(name, null, cacheGroupNames, false, onlyPrimary, true).get(); + createDump(ign, name, cacheGroupNames, false); + } + + /** */ + void createDump(IgniteEx ign, String name, @Nullable Collection cacheGroupNames, boolean comprParts) { + ign.context().cache().context().snapshotMgr() + .createSnapshot(name, null, cacheGroupNames, false, onlyPrimary, true, comprParts).get(); } /** */ diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/dump/IgniteCacheDumpSelf2Test.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/dump/IgniteCacheDumpSelf2Test.java index 029224383cde3..429db9f2bd4a9 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/dump/IgniteCacheDumpSelf2Test.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/dump/IgniteCacheDumpSelf2Test.java @@ -25,12 +25,14 @@ import java.nio.file.Paths; import java.util.Arrays; import java.util.List; +import java.util.Map; import java.util.Set; import java.util.stream.Collectors; import java.util.stream.IntStream; import org.apache.ignite.IgniteCache; import org.apache.ignite.IgniteException; import org.apache.ignite.cache.CacheAtomicityMode; +import org.apache.ignite.cache.affinity.rendezvous.RendezvousAffinityFunction; import org.apache.ignite.cluster.ClusterState; import org.apache.ignite.configuration.CacheConfiguration; import org.apache.ignite.configuration.IgniteConfiguration; @@ -57,13 +59,16 @@ import static org.apache.ignite.internal.processors.cache.persistence.file.FilePageStoreManager.CACHE_DATA_FILENAME; import static org.apache.ignite.internal.processors.cache.persistence.file.FilePageStoreManager.CACHE_DIR_PREFIX; import static org.apache.ignite.internal.processors.cache.persistence.file.FilePageStoreManager.PART_FILE_PREFIX; +import static org.apache.ignite.internal.processors.cache.persistence.file.FilePageStoreManager.ZIP_SUFFIX; import static org.apache.ignite.internal.processors.cache.persistence.filename.PdsFolderResolver.DB_DEFAULT_FOLDER; import static org.apache.ignite.internal.processors.cache.persistence.snapshot.IgniteSnapshotManager.DFLT_SNAPSHOT_TMP_DIR; import static org.apache.ignite.internal.processors.cache.persistence.snapshot.IgniteSnapshotManager.DUMP_LOCK; +import static org.apache.ignite.internal.processors.cache.persistence.snapshot.dump.AbstractCacheDumpTest.CACHE_0; import static org.apache.ignite.internal.processors.cache.persistence.snapshot.dump.AbstractCacheDumpTest.DMP_NAME; import static org.apache.ignite.internal.processors.cache.persistence.snapshot.dump.AbstractCacheDumpTest.KEYS_CNT; import static org.apache.ignite.internal.processors.cache.persistence.snapshot.dump.AbstractCacheDumpTest.USER_FACTORY; import static org.apache.ignite.internal.processors.cache.persistence.snapshot.dump.AbstractCacheDumpTest.dump; +import static org.apache.ignite.internal.processors.cache.persistence.snapshot.dump.AbstractCacheDumpTest.dumpDirectory; import static org.apache.ignite.internal.processors.cache.persistence.snapshot.dump.AbstractCacheDumpTest.invokeCheckCommand; import static org.apache.ignite.internal.processors.cache.persistence.snapshot.dump.CreateDumpFutureTask.DUMP_FILE_EXT; import static org.apache.ignite.internal.processors.cache.persistence.snapshot.dump.DumpEntrySerializer.HEADER_SZ; @@ -330,7 +335,8 @@ public void testCustomLocation() throws Exception { null, false, false, - true + true, + false ).get(); assertFalse( @@ -401,4 +407,62 @@ public void testCheckOnEmptyNode() throws Exception { assertTrue(lsnr.check()); } + + /** */ + @Test + public void testCompareRawWithCompressedCacheDumps() throws Exception { + String id = "test"; + + IgniteEx ign = startGrid(getConfiguration(id).setConsistentId(id)); + + int parts = 20; + + IgniteCache cache = ign.createCache(new CacheConfiguration() + .setName(CACHE_0) + .setAffinity(new RendezvousAffinityFunction().setPartitions(parts)) + ); + + IntStream.range(0, KEYS_CNT).forEach(i -> cache.put(i, i)); + + String rawDump = "rawDump"; + String zipDump = "zipDump"; + + ign.context().cache().context().snapshotMgr() + .createSnapshot(rawDump, null, null, false, true, true, false).get(); + + ign.context().cache().context().snapshotMgr() + .createSnapshot(zipDump, null, null, false, true, true, true).get(); + + stopAllGrids(); + + Map rawSizes = Arrays + .stream(new File(dumpDirectory(ign, rawDump) + "/db/" + id + "/cache-" + CACHE_0).listFiles()) + .filter(f -> !f.getName().equals("cache_data.dat")) + .peek(f -> assertTrue(f.getName().startsWith(PART_FILE_PREFIX) && f.getName().endsWith(DUMP_FILE_EXT))) + .collect(Collectors.toMap( + f -> Integer.parseInt(f.getName().substring(PART_FILE_PREFIX.length(), f.getName().length() - DUMP_FILE_EXT.length())), + File::length + )); + + Map zipSizes = Arrays + .stream(new File(dumpDirectory(ign, zipDump) + "/db/" + id + "/cache-" + CACHE_0).listFiles()) + .filter(f -> !f.getName().equals("cache_data.dat")) + .peek(f -> assertTrue(f.getName().startsWith(PART_FILE_PREFIX) && f.getName().endsWith(DUMP_FILE_EXT + ZIP_SUFFIX))) + .collect(Collectors.toMap( + f -> Integer.parseInt(f.getName().substring(PART_FILE_PREFIX.length(), + f.getName().length() - (DUMP_FILE_EXT + ZIP_SUFFIX).length()) + ), + File::length + )); + + assertEquals(parts, rawSizes.keySet().size()); + + assertEquals("Different set of partitions", rawSizes.keySet(), zipSizes.keySet()); + + rawSizes.keySet().forEach( p -> + assertTrue("Compressed size " + rawSizes.get(p) + " should be smaller than compressed " + zipSizes.get(p), + rawSizes.get(p) > zipSizes.get(p) + ) + ); + } } diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/dump/IgniteCacheDumpSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/dump/IgniteCacheDumpSelfTest.java index 564b590c87811..344ca9b02f062 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/dump/IgniteCacheDumpSelfTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/dump/IgniteCacheDumpSelfTest.java @@ -125,7 +125,7 @@ public void testCacheDump() throws Exception { createDump(ign, DMP_NAME + 2, null); - checkDump(ign, DMP_NAME + 2); + checkDump(ign, DMP_NAME + 2, false); if (persistence) { assertThrows(null, () -> ign.snapshot().createSnapshot(DMP_NAME).get(), IgniteException.class, EXISTS_ERR_MSG); @@ -146,6 +146,27 @@ public void testCacheDump() throws Exception { } } + /** */ + @Test + public void testZippedCacheDump() throws Exception { + snpPoolSz = 4; + + try { + IgniteEx ign = startGridAndFillCaches(); + + createDump(ign, DMP_NAME, null, true); + + checkDump(ign, DMP_NAME, true); + + createDump(cli, DMP_NAME + 2, null, true); + + checkDump(cli, DMP_NAME + 2, true); + } + finally { + snpPoolSz = 1; + } + } + /** */ @Test public void testCacheDumpWithReadGroupFilter() throws Exception { @@ -164,6 +185,7 @@ public void testCacheDumpWithReadGroupFilter() throws Exception { 0, 2 * (KEYS_CNT + (onlyPrimary ? 0 : KEYS_CNT * backups)), 0, + false, false ); @@ -175,6 +197,7 @@ public void testCacheDumpWithReadGroupFilter() throws Exception { KEYS_CNT + (onlyPrimary ? 0 : KEYS_CNT * backups), 0, KEYS_CNT, + false, false ); @@ -186,6 +209,7 @@ public void testCacheDumpWithReadGroupFilter() throws Exception { KEYS_CNT + (onlyPrimary ? 0 : KEYS_CNT * backups), 2 * (KEYS_CNT + (onlyPrimary ? 0 : KEYS_CNT * backups)), KEYS_CNT, + false, false ); } @@ -212,6 +236,7 @@ public void testSkipCopies() throws Exception { KEYS_CNT + (onlyPrimary ? 0 : KEYS_CNT * backups), 2 * (KEYS_CNT + (onlyPrimary ? 0 : KEYS_CNT * backups)), KEYS_CNT, + false, false ); @@ -223,7 +248,8 @@ public void testSkipCopies() throws Exception { KEYS_CNT, 2 * KEYS_CNT, KEYS_CNT, - true + true, + false ); } finally {