Skip to content

Commit

Permalink
IGNITE-20708 Support cacheGroupNames for DumpReader (#11008)
Browse files Browse the repository at this point in the history
  • Loading branch information
yurinaryshkin authored Oct 25, 2023
1 parent 2aeba5c commit 6ac26a0
Show file tree
Hide file tree
Showing 4 changed files with 106 additions and 34 deletions.
14 changes: 12 additions & 2 deletions modules/core/src/main/java/org/apache/ignite/dump/DumpReader.java
Original file line number Diff line number Diff line change
Expand Up @@ -19,12 +19,15 @@

import java.io.File;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.stream.Collectors;
import org.apache.ignite.IgniteException;
import org.apache.ignite.IgniteLogger;
import org.apache.ignite.internal.GridLoggerProxy;
Expand All @@ -34,6 +37,7 @@
import org.apache.ignite.internal.processors.cache.persistence.snapshot.dump.Dump;
import org.apache.ignite.internal.processors.cache.persistence.snapshot.dump.Dump.DumpedPartitionIterator;
import org.apache.ignite.internal.util.typedef.F;
import org.apache.ignite.internal.util.typedef.internal.CU;
import org.apache.ignite.internal.util.typedef.internal.U;
import org.apache.ignite.lang.IgniteExperimental;

Expand Down Expand Up @@ -85,9 +89,15 @@ public DumpReader(DumpReaderConfiguration cfg, IgniteLogger log) {

Map<Integer, List<String>> grpToNodes = new HashMap<>();

Set<Integer> cacheGroupIds = cfg.cacheGroupNames() != null
? Arrays.stream(cfg.cacheGroupNames()).map(CU::cacheId).collect(Collectors.toSet())
: null;

for (SnapshotMetadata meta : dump.metadata()) {
for (Integer grp : meta.cacheGroupIds())
grpToNodes.computeIfAbsent(grp, key -> new ArrayList<>()).add(meta.folderName());
for (Integer grp : meta.cacheGroupIds()) {
if (cacheGroupIds == null || cacheGroupIds.contains(grp))
grpToNodes.computeIfAbsent(grp, key -> new ArrayList<>()).add(meta.folderName());
}
}

cnsmr.onCacheConfigs(grpToNodes.entrySet().stream()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -55,12 +55,15 @@ public class DumpReaderConfiguration {
/** If {@code true} then don't deserialize {@link KeyCacheObject} and {@link CacheObject}. */
private final boolean keepBinary;

/** Cache group names. */
private String[] cacheGroupNames;

/**
* @param dir Root dump directory.
* @param cnsmr Dump consumer.
*/
public DumpReaderConfiguration(File dir, DumpConsumer cnsmr) {
this(dir, cnsmr, DFLT_THREAD_CNT, DFLT_TIMEOUT, true, true);
this(dir, cnsmr, DFLT_THREAD_CNT, DFLT_TIMEOUT, true, true, null);
}

/**
Expand All @@ -70,14 +73,18 @@ public DumpReaderConfiguration(File dir, DumpConsumer cnsmr) {
* @param timeout Timeout of dump reader invocation.
* @param failFast Stop processing partitions if consumer fail to process one.
* @param keepBinary If {@code true} then don't deserialize {@link KeyCacheObject} and {@link CacheObject}.
* @param cacheGroupNames Cache group names.
*/
public DumpReaderConfiguration(File dir, DumpConsumer cnsmr, int thCnt, Duration timeout, boolean failFast, boolean keepBinary) {
public DumpReaderConfiguration(File dir, DumpConsumer cnsmr, int thCnt, Duration timeout, boolean failFast, boolean keepBinary,
String[] cacheGroupNames
) {
this.dir = dir;
this.cnsmr = cnsmr;
this.thCnt = thCnt;
this.timeout = timeout;
this.failFast = failFast;
this.keepBinary = keepBinary;
this.cacheGroupNames = cacheGroupNames;
}

/** @return Root dump directiory. */
Expand Down Expand Up @@ -109,4 +116,9 @@ public boolean failFast() {
public boolean keepBinary() {
return keepBinary;
}

/** @return Cache group names. */
public String[] cacheGroupNames() {
return cacheGroupNames;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@

import java.io.File;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.HashSet;
import java.util.Iterator;
Expand Down Expand Up @@ -284,6 +285,25 @@ protected void checkDump(IgniteEx ign) throws Exception {

/** */
void checkDump(IgniteEx ign, String name) throws Exception {
checkDump(ign,
name,
null,
new HashSet<>(Arrays.asList(DEFAULT_CACHE_NAME, CACHE_0, CACHE_1)),
KEYS_CNT + (onlyPrimary ? 0 : KEYS_CNT * backups),
2 * (KEYS_CNT + (onlyPrimary ? 0 : KEYS_CNT * backups)),
KEYS_CNT);
}

/** */
void checkDump(
IgniteEx ign,
String name,
String[] cacheGroupNames,
Set<String> expectedFoundCaches,
int expectedDfltDumpSz,
int expectedGrpDumpSz,
int expectedCount
) throws Exception {
checkDumpWithCommand(ign, name, backups);

if (persistence)
Expand Down Expand Up @@ -318,41 +338,26 @@ void checkDump(IgniteEx ign, String name) throws Exception {
@Override public void onCacheConfigs(Iterator<StoredCacheData> caches) {
super.onCacheConfigs(caches);

boolean[] cachesFound = new boolean[3];
Set<String> cachesFound = new HashSet<>();

caches.forEachRemaining(data -> {
if (data.config().getName().equals(DEFAULT_CACHE_NAME)) {
assertFalse(cachesFound[0]);
cachesFound[0] = true;
String cacheName = data.config().getName();

assertEquals(DEFAULT_CACHE_NAME, data.config().getName());
assertFalse(data.sql());
assertTrue(data.queryEntities().isEmpty());
}
else if (data.config().getName().equals(CACHE_0)) {
assertFalse(cachesFound[1]);
cachesFound[1] = true;
assertTrue(cachesFound.add(cacheName));

assertEquals(GRP, data.configuration().getGroupName());
assertEquals(CACHE_0, data.configuration().getName());
assertFalse(data.sql());
assertTrue(data.queryEntities().isEmpty());
}
else if (data.config().getName().equals(CACHE_1)) {
assertFalse(cachesFound[2]);
cachesFound[2] = true;
assertEquals(cacheName, data.configuration().getName());

assertFalse(data.sql());

assertTrue(data.queryEntities().isEmpty());

if (cacheName.startsWith("cache-"))
assertEquals(GRP, data.configuration().getGroupName());
assertEquals(CACHE_1, data.configuration().getName());
assertFalse(data.sql());
assertTrue(data.queryEntities().isEmpty());
}
else
else if (!cacheName.equals(DEFAULT_CACHE_NAME))
throw new IgniteException("Unknown cache");
});

for (boolean found : cachesFound)
assertTrue(found);
assertEquals(expectedFoundCaches, cachesFound);
}

@Override public void onPartition(int grp, int part, Iterator<DumpEntry> iter) {
Expand Down Expand Up @@ -389,10 +394,10 @@ else if (data.config().getName().equals(CACHE_1)) {
@Override public void check() {
super.check();

assertEquals(KEYS_CNT + (onlyPrimary ? 0 : KEYS_CNT * backups), dfltDumpSz);
assertEquals(2 * (KEYS_CNT + (onlyPrimary ? 0 : KEYS_CNT * backups)), grpDumpSz);
assertEquals(expectedDfltDumpSz, dfltDumpSz);
assertEquals(expectedGrpDumpSz, grpDumpSz);

IntStream.range(0, KEYS_CNT).forEach(key -> assertTrue(keys.contains(key)));
IntStream.range(0, expectedCount).forEach(key -> assertTrue(keys.contains(key)));
}
};

Expand All @@ -402,7 +407,8 @@ else if (data.config().getName().equals(CACHE_1)) {
cnsmr,
DFLT_THREAD_CNT, DFLT_TIMEOUT,
true,
false
false,
cacheGroupNames
),
log
).run();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -146,6 +146,50 @@ public void testCacheDump() throws Exception {
}
}

/** */
@Test
public void testCacheDumpWithReadGroupFilter() throws Exception {
snpPoolSz = 4;

try {
IgniteEx ign = startGridAndFillCaches();

createDump(ign);

checkDump(
ign,
DMP_NAME,
new String[]{GRP},
new HashSet<>(Arrays.asList(CACHE_0, CACHE_1)),
0,
2 * (KEYS_CNT + (onlyPrimary ? 0 : KEYS_CNT * backups)),
0);

checkDump(
ign,
DMP_NAME,
new String[]{DEFAULT_CACHE_NAME},
new HashSet<>(Arrays.asList(DEFAULT_CACHE_NAME)),
KEYS_CNT + (onlyPrimary ? 0 : KEYS_CNT * backups),
0,
KEYS_CNT
);

checkDump(
ign,
DMP_NAME,
new String[]{DEFAULT_CACHE_NAME, GRP},
new HashSet<>(Arrays.asList(DEFAULT_CACHE_NAME, CACHE_0, CACHE_1)),
KEYS_CNT + (onlyPrimary ? 0 : KEYS_CNT * backups),
2 * (KEYS_CNT + (onlyPrimary ? 0 : KEYS_CNT * backups)),
KEYS_CNT
);
}
finally {
snpPoolSz = 1;
}
}

/** */
@Test
public void testCacheDumpWithGroupFilter() throws Exception {
Expand Down

0 comments on commit 6ac26a0

Please sign in to comment.