Skip to content

Commit

Permalink
IGNITE-20472 Dump API implemented (#10953)
Browse files Browse the repository at this point in the history
  • Loading branch information
nizhikov authored Oct 6, 2023
1 parent 83177a0 commit 8018473
Show file tree
Hide file tree
Showing 18 changed files with 1,031 additions and 261 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,83 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.ignite.dump;

import java.util.Iterator;
import org.apache.ignite.binary.BinaryType;
import org.apache.ignite.cdc.TypeMapping;
import org.apache.ignite.internal.processors.cache.StoredCacheData;
import org.apache.ignite.internal.processors.cache.persistence.snapshot.dump.Dump;
import org.apache.ignite.lang.IgniteExperimental;

/**
* Consumer of {@link Dump}.
* This consumer will receive all {@link DumpEntry} stored in cache dump during {@code IgniteDumpReader} application invocation.
* The lifecycle of the consumer is the following:
* <ul>
* <li>Start of the consumer {@link #start()}.</li>
* <li>Stop of the consumer {@link #stop()}.</li>
* </ul>
*
*/
@IgniteExperimental
public interface DumpConsumer {
/**
* Starts the consumer.
*/
void start();

/**
* Handles type mappings.
* @param mappings Mappings iterator.
*/
void onMappings(Iterator<TypeMapping> mappings);

/**
* Handles binary types.
* @param types Binary types iterator.
*/
void onTypes(Iterator<BinaryType> types);

/**
* Handles cache configs.
* Note, there can be several copies of cache config in the dump.
* This can happen if dump contains data from several nodes.
* @param caches Stored cache data.
*/
void onCacheConfigs(Iterator<StoredCacheData> caches);

/**
* Handles cache data.
* This method can be invoced by several threads concurrently.
* Note, there can be several copies of group partition in the dump.
* This can happen if dump contains data from several nodes.
* In this case callback will be invoked several time for the same pair of [grp, part] values.
*
* @param grp Group id.
* @param part Partition.
* @param data Cache data iterator.
* @see DumpReaderConfiguration#threadCount()
*/
void onPartition(int grp, int part, Iterator<DumpEntry> data);

/**
* Stops the consumer.
* This method can be invoked only after {@link #start()}.
*/
void stop();
}
Original file line number Diff line number Diff line change
Expand Up @@ -15,14 +15,20 @@
* limitations under the License.
*/

package org.apache.ignite.internal.processors.cache.persistence.snapshot.dump;
package org.apache.ignite.dump;

import org.apache.ignite.internal.processors.cache.CacheObject;
import org.apache.ignite.internal.processors.cache.KeyCacheObject;
import java.util.Iterator;
import org.apache.ignite.internal.processors.cache.persistence.snapshot.dump.Dump;
import org.apache.ignite.lang.IgniteExperimental;

/**
* Single cache entry from dump.
*
* @see Dump#iterator(String, int, int)
* @see DumpConsumer#onPartition(int, int, Iterator)
* @see org.apache.ignite.IgniteSnapshot#createDump(String)
*/
@IgniteExperimental
public interface DumpEntry {
/** @return Cache id. */
public int cacheId();
Expand All @@ -31,8 +37,8 @@ public interface DumpEntry {
public long expireTime();

/** @return Key. */
public KeyCacheObject key();
public Object key();

/** @return Value. */
public CacheObject value();
public Object value();
}
210 changes: 210 additions & 0 deletions modules/core/src/main/java/org/apache/ignite/dump/DumpReader.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,210 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.ignite.dump;

import java.io.File;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.ignite.IgniteException;
import org.apache.ignite.IgniteLogger;
import org.apache.ignite.internal.GridLoggerProxy;
import org.apache.ignite.internal.binary.BinaryUtils;
import org.apache.ignite.internal.cdc.CdcMain;
import org.apache.ignite.internal.processors.cache.persistence.snapshot.SnapshotMetadata;
import org.apache.ignite.internal.processors.cache.persistence.snapshot.dump.Dump;
import org.apache.ignite.internal.processors.cache.persistence.snapshot.dump.Dump.DumpedPartitionIterator;
import org.apache.ignite.internal.util.typedef.F;
import org.apache.ignite.internal.util.typedef.internal.U;

import static java.util.concurrent.TimeUnit.MILLISECONDS;
import static org.apache.ignite.configuration.DataStorageConfiguration.DFLT_MARSHALLER_PATH;
import static org.apache.ignite.internal.IgniteKernal.NL;
import static org.apache.ignite.internal.IgniteKernal.SITE;
import static org.apache.ignite.internal.IgniteVersionUtils.ACK_VER_STR;
import static org.apache.ignite.internal.IgniteVersionUtils.COPYRIGHT;

/**
* Dump Reader application.
* The application runs independently of Ignite node process and provides the ability to the {@link DumpConsumer} to consume
* all data stored in cache dump ({@link Dump})
*/
public class DumpReader implements Runnable {
/** Configuration. */
private final DumpReaderConfiguration cfg;

/** Log. */
private final IgniteLogger log;

/**
* @param cfg Dump reader configuration.
* @param log Logger.
*/
public DumpReader(DumpReaderConfiguration cfg, IgniteLogger log) {
this.cfg = cfg;
this.log = log.getLogger(DumpReader.class);
}

/** {@inheritDoc} */
@Override public void run() {
ackAsciiLogo();

try (Dump dump = new Dump(cfg.dumpRoot(), cfg.keepBinary(), false, log)) {
DumpConsumer cnsmr = cfg.consumer();

cnsmr.start();

try {
File[] files = new File(cfg.dumpRoot(), DFLT_MARSHALLER_PATH).listFiles(BinaryUtils::notTmpFile);

if (files != null)
cnsmr.onMappings(CdcMain.typeMappingIterator(files, tm -> true));

cnsmr.onTypes(dump.types());

Map<Integer, List<String>> grpToNodes = new HashMap<>();

for (SnapshotMetadata meta : dump.metadata()) {
for (Integer grp : meta.cacheGroupIds())
grpToNodes.computeIfAbsent(grp, key -> new ArrayList<>()).add(meta.folderName());
}

cnsmr.onCacheConfigs(grpToNodes.entrySet().stream()
.flatMap(e -> dump.configs(F.first(e.getValue()), e.getKey()).stream())
.iterator());

ExecutorService execSvc = cfg.threadCount() > 1 ? Executors.newFixedThreadPool(cfg.threadCount()) : null;

AtomicBoolean skip = new AtomicBoolean(false);

for (Map.Entry<Integer, List<String>> e : grpToNodes.entrySet()) {
int grp = e.getKey();

for (String node : e.getValue()) {
for (int part : dump.partitions(node, grp)) {
Runnable consumePart = () -> {
if (skip.get()) {
if (log.isDebugEnabled()) {
log.debug("Skip partition due to previous error [node=" + node + ", grp=" + grp +
", part=" + part + ']');
}

return;
}

try (DumpedPartitionIterator iter = dump.iterator(node, grp, part)) {
if (log.isDebugEnabled()) {
log.debug("Consuming partition [node=" + node + ", grp=" + grp +
", part=" + part + ']');
}

cnsmr.onPartition(grp, part, iter);
}
catch (Exception ex) {
skip.set(cfg.failFast());

log.error("Error consuming partition [node=" + node + ", grp=" + grp +
", part=" + part + ']', ex);

throw new IgniteException(ex);
}
};

if (cfg.threadCount() > 1)
execSvc.submit(consumePart);
else
consumePart.run();
}
}
}

if (cfg.threadCount() > 1) {
execSvc.shutdown();

boolean res = execSvc.awaitTermination(cfg.timeout().toMillis(), MILLISECONDS);

if (!res) {
log.warning("Dump processing tasks not finished after timeout. Cancelling");

execSvc.shutdownNow();
}
}
}
finally {
cnsmr.stop();
}
}
catch (Exception e) {
throw new IgniteException(e);
}
}

/** */
private void ackAsciiLogo() {
String ver = "ver. " + ACK_VER_STR;

if (log.isInfoEnabled()) {
log.info(NL + NL +
">>> __________ ________________ ___ __ ____ ______ ___ _______ ___ _______" + NL +
">>> / _/ ___/ |/ / _/_ __/ __/ / _ \\/ / / / |/ / _ \\ / _ \\/ __/ _ | / _ \\/ __/ _ \\" + NL +
">>> _/ // (_ / // / / / / _/ / // / /_/ / /|_/ / ___/ / , _/ _// __ |/ // / _// , _/" + NL +
">>> /___/\\___/_/|_/___/ /_/ /___/ /____/\\____/_/ /_/_/ /_/|_/___/_/ |_/____/___/_/|_|" + NL +
">>> " + NL +
">>> " + ver + NL +
">>> " + COPYRIGHT + NL +
">>> " + NL +
">>> Ignite documentation: " + "http://" + SITE + NL +
">>> ConsistentId: " + cfg.dumpRoot() + NL +
">>> Consumer: " + U.toStringSafe(cfg.consumer())
);
}

if (log.isQuiet()) {
U.quiet(false,
" __________ ________________ ___ __ ____ ______ ___ _______ ___ _______",
" / _/ ___/ |/ / _/_ __/ __/ / _ \\/ / / / |/ / _ \\ / _ \\/ __/ _ | / _ \\/ __/ _ \\",
" _/ // (_ / // / / / / _/ / // / /_/ / /|_/ / ___/ / , _/ _// __ |/ // / _// , _/",
"/___/\\___/_/|_/___/ /_/ /___/ /____/\\____/_/ /_/_/ /_/|_/___/_/ |_/____/___/_/|_|",
"",
ver,
COPYRIGHT,
"",
"Ignite documentation: " + "http://" + SITE,
"Dump: " + cfg.dumpRoot(),
"Consumer: " + U.toStringSafe(cfg.consumer()),
"",
"Quiet mode.");

String fileName = log.fileName();

if (fileName != null)
U.quiet(false, " ^-- Logging to file '" + fileName + '\'');

if (log instanceof GridLoggerProxy)
U.quiet(false, " ^-- Logging by '" + ((GridLoggerProxy)log).getLoggerInfo() + '\'');

U.quiet(false,
" ^-- To see **FULL** console log here add -DIGNITE_QUIET=false or \"-v\" to ignite-cdc.{sh|bat}",
"");
}
}
}
Loading

0 comments on commit 8018473

Please sign in to comment.