diff --git a/chainbase/src/main/java/org/tron/core/ChainBaseManager.java b/chainbase/src/main/java/org/tron/core/ChainBaseManager.java index dad76041777..939d95dc584 100644 --- a/chainbase/src/main/java/org/tron/core/ChainBaseManager.java +++ b/chainbase/src/main/java/org/tron/core/ChainBaseManager.java @@ -28,6 +28,7 @@ import org.tron.core.db.PbftSignDataStore; import org.tron.core.db.RecentBlockStore; import org.tron.core.db.RecentTransactionStore; +import org.tron.core.db.TransactionCache; import org.tron.core.db.TransactionStore; import org.tron.core.db2.core.ITronChainBase; import org.tron.core.exception.BadItemException; @@ -237,6 +238,9 @@ public class ChainBaseManager { @Autowired private DbStatService dbStatService; + @Autowired + private TransactionCache transactionCache; + @Getter @Setter private NodeType nodeType; @@ -291,6 +295,7 @@ public void closeAllStore() { closeOneStore(pbftSignDataStore); closeOneStore(sectionBloomStore); closeOneStore(accountAssetStore); + closeOneStore(transactionCache); } // for test only diff --git a/framework/src/main/java/org/tron/core/db/TransactionCache.java b/chainbase/src/main/java/org/tron/core/db/TransactionCache.java similarity index 100% rename from framework/src/main/java/org/tron/core/db/TransactionCache.java rename to chainbase/src/main/java/org/tron/core/db/TransactionCache.java diff --git a/chainbase/src/main/java/org/tron/core/db2/common/TxCacheDB.java b/chainbase/src/main/java/org/tron/core/db2/common/TxCacheDB.java index 71cf361b06a..9923a876cad 100644 --- a/chainbase/src/main/java/org/tron/core/db2/common/TxCacheDB.java +++ b/chainbase/src/main/java/org/tron/core/db2/common/TxCacheDB.java @@ -3,10 +3,24 @@ import com.google.common.hash.BloomFilter; import com.google.common.hash.Funnels; import com.google.common.primitives.Longs; +import java.io.BufferedInputStream; +import java.io.BufferedOutputStream; +import java.io.InputStream; +import java.io.InputStreamReader; +import java.io.OutputStream; +import java.io.Reader; +import java.io.Writer; +import java.nio.charset.StandardCharsets; +import java.nio.file.Files; +import java.nio.file.Path; import java.nio.file.Paths; +import java.nio.file.StandardOpenOption; import java.util.Iterator; import java.util.Map; import java.util.Map.Entry; +import java.util.Properties; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.atomic.AtomicBoolean; import lombok.extern.slf4j.Slf4j; import org.apache.commons.lang3.ArrayUtils; import org.bouncycastle.util.encoders.Hex; @@ -17,6 +31,7 @@ import org.tron.common.storage.leveldb.LevelDbDataSourceImpl; import org.tron.common.storage.rocksdb.RocksDbDataSourceImpl; import org.tron.common.utils.ByteArray; +import org.tron.common.utils.FileUtil; import org.tron.common.utils.JsonUtil; import org.tron.common.utils.StorageUtils; import org.tron.core.capsule.BytesCapsule; @@ -42,6 +57,7 @@ public class TxCacheDB implements DB, Flusher { private BloomFilter[] bloomFilters = new BloomFilter[2]; // filterStartBlock record the start block of the active filter private volatile long filterStartBlock = INVALID_BLOCK; + private volatile long currentBlockNum = INVALID_BLOCK; // currentFilterIndex records the index of the active filter private volatile int currentFilterIndex = 0; @@ -57,6 +73,12 @@ public class TxCacheDB implements DB, Flusher { // replace persistentStore and optimizes startup performance private RecentTransactionStore recentTransactionStore; + private final Path cacheFile0; + private final Path cacheFile1; + private final Path cacheProperties; + private final Path cacheDir; + private AtomicBoolean isValid = new AtomicBoolean(false); + public TxCacheDB(String name, RecentTransactionStore recentTransactionStore) { this.name = name; this.TRANSACTION_COUNT = @@ -85,6 +107,10 @@ public TxCacheDB(String name, RecentTransactionStore recentTransactionStore) { MAX_BLOCK_SIZE * TRANSACTION_COUNT); this.bloomFilters[1] = BloomFilter.create(Funnels.byteArrayFunnel(), MAX_BLOCK_SIZE * TRANSACTION_COUNT); + cacheDir = Paths.get(CommonParameter.getInstance().getOutputDirectory(), ".cache"); + this.cacheFile0 = Paths.get(cacheDir.toString(), "bloomFilters_0"); + this.cacheFile1 = Paths.get(cacheDir.toString(), "bloomFilters_1"); + this.cacheProperties = Paths.get(cacheDir.toString(), "txCache.properties"); } @@ -110,6 +136,10 @@ private void initCache() { } public void init() { + if (recovery()) { + isValid.set(true); + return; + } long size = recentTransactionStore.size(); if (size != MAX_BLOCK_SIZE) { // 0. load from persistentStore @@ -129,6 +159,7 @@ public void init() { logger.info("Load cache from recentTransactionStore, filter: {}, filter-fpp: {}, cost: {} ms.", bloomFilters[1].approximateElementCount(), bloomFilters[1].expectedFpp(), System.currentTimeMillis() - start); + isValid.set(true); } @Override @@ -172,7 +203,7 @@ public void put(byte[] key, byte[] value) { MAX_BLOCK_SIZE * TRANSACTION_COUNT); } bloomFilters[currentFilterIndex].put(key); - + currentBlockNum = blockNum; if (lastMetricBlock != blockNum) { lastMetricBlock = blockNum; Metrics.gaugeSet(MetricKeys.Gauge.TX_CACHE, @@ -208,13 +239,15 @@ public Iterator> iterator() { } @Override - public void flush(Map batch) { + public synchronized void flush(Map batch) { + isValid.set(false); batch.forEach((k, v) -> this.put(k.getBytes(), v.getBytes())); + isValid.set(true); } @Override public void close() { - reset(); + dump(); bloomFilters[0] = null; bloomFilters[1] = null; persistentStore.close(); @@ -224,6 +257,116 @@ public void close() { public void reset() { } + private boolean recovery() { + FileUtil.createDirIfNotExists(this.cacheDir.toString()); + logger.info("recovery bloomFilters start."); + CompletableFuture loadProperties = CompletableFuture.supplyAsync(this::loadProperties); + CompletableFuture tk0 = loadProperties.thenApplyAsync( + v -> recovery(0, this.cacheFile0)); + CompletableFuture tk1 = loadProperties.thenApplyAsync( + v -> recovery(1, this.cacheFile1)); + + return CompletableFuture.allOf(tk0, tk1).thenApply(v -> { + logger.info("recovery bloomFilters success."); + return true; + }).exceptionally(this::handleException).join(); + } + + private boolean recovery(int index, Path file) { + try (InputStream in = new BufferedInputStream(Files.newInputStream(file, + StandardOpenOption.READ, StandardOpenOption.DELETE_ON_CLOSE))) { + logger.info("recovery bloomFilter[{}] from file.", index); + long start = System.currentTimeMillis(); + bloomFilters[index] = BloomFilter.readFrom(in, Funnels.byteArrayFunnel()); + logger.info("recovery bloomFilter[{}] from file done,filter: {}, filter-fpp: {}, cost {} ms.", + index, bloomFilters[index].approximateElementCount(), bloomFilters[index].expectedFpp(), + System.currentTimeMillis() - start); + return true; + } catch (Exception e) { + throw new RuntimeException(e); + } + } + + private boolean handleException(Throwable e) { + bloomFilters[0] = BloomFilter.create(Funnels.byteArrayFunnel(), + MAX_BLOCK_SIZE * TRANSACTION_COUNT); + bloomFilters[1] = BloomFilter.create(Funnels.byteArrayFunnel(), + MAX_BLOCK_SIZE * TRANSACTION_COUNT); + try { + Files.deleteIfExists(this.cacheFile0); + Files.deleteIfExists(this.cacheFile1); + } catch (Exception ignored) { + + } + logger.info("recovery bloomFilters failed. {}", e.getMessage()); + logger.info("rollback to previous mode."); + return false; + } + + private void dump() { + if (!isValid.get()) { + logger.info("bloomFilters is not valid."); + } + FileUtil.createDirIfNotExists(this.cacheDir.toString()); + logger.info("dump bloomFilters start."); + CompletableFuture task0 = CompletableFuture.runAsync( + () -> dump(0, this.cacheFile0)); + CompletableFuture task1 = CompletableFuture.runAsync( + () -> dump(1, this.cacheFile1)); + CompletableFuture.allOf(task0, task1).thenRun(() -> { + writeProperties(); + logger.info("dump bloomFilters done."); + + }).exceptionally(e -> { + logger.info("dump bloomFilters to file failed. {}", e.getMessage()); + return null; + }).join(); + } + + private void dump(int index, Path file) { + try (OutputStream out = new BufferedOutputStream(Files.newOutputStream(file))) { + logger.info("dump bloomFilters[{}] to file.", index); + long start = System.currentTimeMillis(); + bloomFilters[index].writeTo(out); + logger.info("dump bloomFilters[{}] to file done,filter: {}, filter-fpp: {}, cost {} ms.", + index, bloomFilters[index].approximateElementCount(), bloomFilters[index].expectedFpp(), + System.currentTimeMillis() - start); + } catch (Exception e) { + throw new RuntimeException(e); + } + } + + private boolean loadProperties() { + try (Reader r = new InputStreamReader(new BufferedInputStream(Files.newInputStream( + this.cacheProperties, StandardOpenOption.READ, StandardOpenOption.DELETE_ON_CLOSE)), + StandardCharsets.UTF_8)) { + Properties properties = new Properties(); + properties.load(r); + filterStartBlock = Long.parseLong(properties.getProperty("filterStartBlock")); + currentBlockNum = Long.parseLong(properties.getProperty("currentBlockNum")); + currentFilterIndex = Integer.parseInt(properties.getProperty("currentFilterIndex")); + logger.info("filterStartBlock: {}, currentBlockNum: {}, currentFilterIndex: {}, load done.", + filterStartBlock, currentBlockNum, currentFilterIndex); + return true; + } catch (Exception e) { + throw new RuntimeException(e); + } + } + + private void writeProperties() { + try (Writer w = Files.newBufferedWriter(this.cacheProperties, StandardCharsets.UTF_8)) { + Properties properties = new Properties(); + properties.setProperty("filterStartBlock", String.valueOf(filterStartBlock)); + properties.setProperty("currentBlockNum", String.valueOf(currentBlockNum)); + properties.setProperty("currentFilterIndex", String.valueOf(currentFilterIndex)); + properties.store(w, "Generated by the application. PLEASE DO NOT EDIT! "); + logger.info("filterStartBlock: {}, currentBlockNum: {}, currentFilterIndex: {}, write done.", + filterStartBlock, currentBlockNum, currentFilterIndex); + } catch (Exception e) { + throw new RuntimeException(e); + } + } + @Override public TxCacheDB newInstance() { return new TxCacheDB(name, recentTransactionStore); diff --git a/framework/src/test/java/org/tron/core/db/TxCacheDBInitTest.java b/framework/src/test/java/org/tron/core/db/TxCacheDBInitTest.java new file mode 100644 index 00000000000..c3cb7cb2eb6 --- /dev/null +++ b/framework/src/test/java/org/tron/core/db/TxCacheDBInitTest.java @@ -0,0 +1,90 @@ +package org.tron.core.db; + +import java.io.IOException; +import lombok.extern.slf4j.Slf4j; +import org.junit.AfterClass; +import org.junit.Assert; +import org.junit.BeforeClass; +import org.junit.ClassRule; +import org.junit.Test; +import org.junit.rules.TemporaryFolder; +import org.springframework.beans.factory.support.DefaultListableBeanFactory; +import org.tron.common.application.TronApplicationContext; +import org.tron.common.utils.ByteArray; +import org.tron.core.Constant; +import org.tron.core.capsule.BytesCapsule; +import org.tron.core.config.DefaultConfig; +import org.tron.core.config.args.Args; +import org.tron.keystore.Wallet; + +@Slf4j +public class TxCacheDBInitTest { + + private static TronApplicationContext context; + + @ClassRule + public static final TemporaryFolder temporaryFolder = new TemporaryFolder(); + + private final byte[][] hash = new byte[140000][64]; + + @AfterClass + public static void destroy() { + context.destroy(); + Args.clearParam(); + } + + /** + * Init data. + */ + @BeforeClass + public static void init() throws IOException { + Args.setParam(new String[]{"--output-directory", temporaryFolder.newFolder().toString(), + "--p2p-disable", "true"}, Constant.TEST_CONF); + context = new TronApplicationContext(DefaultConfig.class); + } + + @Test + public void reload() { + TransactionCache db = context.getBean(TransactionCache.class); + db.initCache(); + putTransaction(); + DefaultListableBeanFactory defaultListableBeanFactory = + (DefaultListableBeanFactory) context.getAutowireCapableBeanFactory(); + queryTransaction(); + defaultListableBeanFactory.destroySingleton("transactionCache"); + TransactionCache transactionCache = new TransactionCache("transactionCache", + context.getBean(RecentTransactionStore.class)); + transactionCache.initCache(); + defaultListableBeanFactory.registerSingleton("transactionCache",transactionCache); + queryTransaction(); + } + + private void putTransaction() { + TransactionCache db = context.getBean(TransactionCache.class); + for (int i = 1; i < 140000; i++) { + hash[i] = Wallet.generateRandomBytes(64); + db.put(hash[i], new BytesCapsule(ByteArray.fromLong(i))); + } + } + + private void queryTransaction() { + TransactionCache db = context.getBean(TransactionCache.class); + // [1,65537] are expired + for (int i = 1; i < 65538; i++) { + try { + Assert.assertFalse("index = " + i, db.has(hash[i])); + } catch (Exception e) { + Assert.fail("transaction should be expired index = " + i); + } + } + // [65538,140000] are in cache + for (int i = 65538; i < 140000; i++) { + try { + Assert.assertTrue("index = " + i, db.has(hash[i])); + } catch (Exception e) { + Assert.fail("transaction should not be expired index = " + i); + } + } + } + +} \ No newline at end of file