Skip to content

Commit

Permalink
IGNITE-20429 Use per thread buffer for serializers.
Browse files Browse the repository at this point in the history
  • Loading branch information
nizhikov committed Sep 25, 2023
1 parent 7bb5f8c commit ce17a8d
Show file tree
Hide file tree
Showing 7 changed files with 218 additions and 49 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ public interface DumpConsumer {
/**
* Starts the consumer.
*/
public void start();
void start();

/**
* Handles type mappings.
Expand All @@ -61,19 +61,21 @@ public interface DumpConsumer {

/**
* Handles cache data.
* This method can be invoced by several threads concurrently.
* Note, there can be several copies of group partition in the dump.
* This can happen if dump contains data from several nodes.
* In this case callback will be invoked several time for the same pair of [grp, part] values.
*
* @param grp Group id.
* @param part Partition.
* @param data Cache data iterator.
* @see DumpReaderConfiguration#threadCount()
*/
void onPartition(int grp, int part, Iterator<DumpEntry> data);

/**
* Stops the consumer.
* This method can be invoked only after {@link #start()}.
*/
public void stop();
void stop();
}
Original file line number Diff line number Diff line change
Expand Up @@ -24,22 +24,25 @@
import java.util.Map;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.ignite.IgniteException;
import org.apache.ignite.IgniteLogger;
import org.apache.ignite.internal.GridKernalContext;
import org.apache.ignite.internal.binary.BinaryUtils;
import org.apache.ignite.internal.cdc.CdcMain;
import org.apache.ignite.internal.processors.cache.persistence.snapshot.SnapshotMetadata;
import org.apache.ignite.internal.processors.cache.persistence.snapshot.dump.Dump;
import org.apache.ignite.internal.processors.cache.persistence.snapshot.dump.Dump.DumpedPartitionIterator;

import static java.util.concurrent.TimeUnit.MILLISECONDS;
import static org.apache.ignite.configuration.DataStorageConfiguration.DFLT_MARSHALLER_PATH;

/**
* Dump Reader application.
* The application runs independently of Ignite node process and provides the ability to the {@link DumpConsumer} to consume
* all data stored in cache dump ({@link Dump})
*/
public class IgniteDumpReader implements Runnable {
public class DumpReader implements Runnable {
/** Configuration. */
private final DumpReaderConfiguration cfg;

Expand All @@ -50,7 +53,7 @@ public class IgniteDumpReader implements Runnable {
* @param cfg Dump reader configuration.
* @param kctx Kernal context.
*/
public IgniteDumpReader(DumpReaderConfiguration cfg, GridKernalContext kctx) {
public DumpReader(DumpReaderConfiguration cfg, GridKernalContext kctx) {
this.cfg = cfg;
this.kctx = kctx;
}
Expand Down Expand Up @@ -84,22 +87,48 @@ public IgniteDumpReader(DumpReaderConfiguration cfg, GridKernalContext kctx) {

ExecutorService execSvc = Executors.newFixedThreadPool(cfg.threadCount());

IgniteLogger log = kctx.log(DumpReader.class);
AtomicBoolean skip = new AtomicBoolean(false);

for (Map.Entry<Integer, List<String>> e : grpToNodes.entrySet()) {
int grp = e.getKey();

for (String node : e.getValue()) {
for (int part : dump.partitions(node, grp)) {
execSvc.submit(() -> {
if (skip.get()) {
if (log.isDebugEnabled()) {
log.debug("Skip partition due to previous error [node=" + node + ", grp=" + grp +
", part=" + part + ']');
}

return;
}

try (DumpedPartitionIterator iter = dump.iterator(node, grp, part)) {
if (log.isDebugEnabled()) {
log.debug("Consuming partition [node=" + node + ", grp=" + grp +
", part=" + part + ']');
}

cnsmr.onPartition(grp, part, iter);
}
catch (Exception ex) {
skip.set(cfg.failFast());

log.error("Error consuming partition [node=" + node + ", grp=" + grp +
", part=" + part + ']', ex);

throw new IgniteException(ex);
}
});
}
}
}

execSvc.shutdown();

execSvc.awaitTermination(cfg.timeout().toMillis(), MILLISECONDS);
}
finally {
cnsmr.stop();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,12 +18,13 @@
package org.apache.ignite.dump;

import java.io.File;
import java.time.Duration;
import org.apache.ignite.lang.IgniteExperimental;

/**
* Configuration class of {@link IgniteDumpReader}.
* Configuration class of {@link DumpReader}.
*
* @see IgniteDumpReader
* @see DumpReader
* @see DumpConsumer
*/
@IgniteExperimental
Expand All @@ -37,15 +38,33 @@ public class DumpReaderConfiguration {
/** Count of threads to consume dumped partitions. */
private final int thCnt;

/** Timeout of dump reader. 1 week, by default. */
private final Duration timeout;

/** Stop processing partitions if consumer fail to process one. */
private final boolean failFast;

/**
* @param dir Root dump directory.
* @param cnsmr Dump consumer.
*/
public DumpReaderConfiguration(File dir, DumpConsumer cnsmr) {
this(dir, cnsmr, 1, Duration.ofDays(7), true);
}

/**
* @param dir Root dump directory.
* @param cnsmr Dump consumer.
* @param thCnt Count of threads to consume dumped partitions.
* @param timeout Timeout of dump reader invocation.
* @param failFast Stop processing partitions if consumer fail to process one.
*/
public DumpReaderConfiguration(File dir, DumpConsumer cnsmr, int thCnt) {
public DumpReaderConfiguration(File dir, DumpConsumer cnsmr, int thCnt, Duration timeout, boolean failFast) {
this.dir = dir;
this.cnsmr = cnsmr;
this.thCnt = thCnt;
this.timeout = timeout;
this.failFast = failFast;
}

/** @return Root dump directiory. */
Expand All @@ -62,4 +81,14 @@ public DumpConsumer consumer() {
public int threadCount() {
return thCnt;
}

/** @return Timeout of dump reader invocation. */
public Duration timeout() {
return timeout;
}

/** @return {@code True} if stop processing after first consumer error. */
public boolean failFast() {
return failFast;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -105,7 +105,7 @@ public class CreateDumpFutureTask extends AbstractCreateSnapshotFutureTask imple
* Map shared across all instances of {@link PartitionDumpContext} and {@link DumpEntrySerializer}.
* We use per thread buffer because number of threads is fewer then number of partitions.
* Regular count of partitions is {@link RendezvousAffinityFunction#DFLT_PARTITION_COUNT}
* and thread is {@link IgniteConfiguration#DFLT_PUBLIC_THREAD_CNT} whic is significantly less.
* and thread is {@link IgniteConfiguration#DFLT_PUBLIC_THREAD_CNT} which is significantly less.
*/
private final ConcurrentMap<Long, ByteBuffer> thLocBufs = new ConcurrentHashMap<>();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,7 @@ public class Dump {
* Map shared across all instances of {@link DumpEntrySerializer}.
* We use per thread buffer because number of threads is fewer then number of partitions.
* Regular count of partitions is {@link RendezvousAffinityFunction#DFLT_PARTITION_COUNT}
* and thread is {@link IgniteConfiguration#DFLT_PUBLIC_THREAD_CNT} whic is significantly less.
* and thread is {@link IgniteConfiguration#DFLT_PUBLIC_THREAD_CNT} which is significantly less.
*/
private final ConcurrentMap<Long, ByteBuffer> thLocBufs = new ConcurrentHashMap<>();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import java.util.ArrayList;
import java.util.Comparator;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Set;
import java.util.concurrent.CountDownLatch;
Expand All @@ -33,13 +34,16 @@
import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.IgniteException;
import org.apache.ignite.Ignition;
import org.apache.ignite.binary.BinaryType;
import org.apache.ignite.cache.CacheAtomicityMode;
import org.apache.ignite.cache.affinity.rendezvous.RendezvousAffinityFunction;
import org.apache.ignite.cdc.TypeMapping;
import org.apache.ignite.cluster.ClusterState;
import org.apache.ignite.configuration.CacheConfiguration;
import org.apache.ignite.configuration.DataRegionConfiguration;
import org.apache.ignite.configuration.DataStorageConfiguration;
import org.apache.ignite.configuration.IgniteConfiguration;
import org.apache.ignite.dump.DumpConsumer;
import org.apache.ignite.dump.DumpEntry;
import org.apache.ignite.internal.IgniteEx;
import org.apache.ignite.internal.IgniteInternalFuture;
Expand Down Expand Up @@ -461,4 +465,86 @@ public static String invokeCheckCommand(IgniteEx ign, String name) throws Ignite
void createDump(IgniteEx ign, String name) {
ign.snapshot().createDump(name).get();
}

/** */
public abstract static class TestDumpConsumer implements DumpConsumer {
/** */
private boolean started;

/** */
private boolean stopped;

/** */
private boolean typesCb;

/** */
private boolean mappingcCb;

/** */
private boolean cacheCfgCb;

/** {@inheritDoc} */
@Override public void start() {
assertFalse(started);
assertFalse(mappingcCb);
assertFalse(typesCb);
assertFalse(cacheCfgCb);
assertFalse(stopped);

started = true;
}

/** {@inheritDoc} */
@Override public void onMappings(Iterator<TypeMapping> mappings) {
assertTrue(started);
assertFalse(mappingcCb);
assertFalse(typesCb);
assertFalse(cacheCfgCb);
assertFalse(stopped);

mappingcCb = true;
}

/** {@inheritDoc} */
@Override public void onTypes(Iterator<BinaryType> types) {
assertTrue(started);
assertTrue(mappingcCb);
assertFalse(typesCb);
assertFalse(cacheCfgCb);
assertFalse(stopped);

typesCb = true;
}

/** {@inheritDoc} */
@Override public void onCacheConfigs(Iterator<StoredCacheData> caches) {
assertTrue(started);
assertTrue(mappingcCb);
assertTrue(typesCb);
assertFalse(cacheCfgCb);
assertFalse(stopped);

cacheCfgCb = true;
}

/** {@inheritDoc} */
@Override public void stop() {
assertTrue(started);
assertTrue(typesCb);
assertTrue(mappingcCb);
assertTrue(cacheCfgCb);
assertFalse(stopped);

stopped = true;
}

/** */
public void check() {
assertTrue(started);
assertTrue(typesCb);
assertTrue(mappingcCb);
assertTrue(cacheCfgCb);
assertTrue(stopped);
}
}
}
Loading

0 comments on commit ce17a8d

Please sign in to comment.