Skip to content

Commit

Permalink
feat(all):name thread pools (tronprotocol#5425)
Browse files Browse the repository at this point in the history
  • Loading branch information
halibobo1205 committed Aug 30, 2023
1 parent f9c42d6 commit 03e903e
Show file tree
Hide file tree
Showing 26 changed files with 201 additions and 137 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,6 @@
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import lombok.AllArgsConstructor;
Expand All @@ -46,6 +45,7 @@
import org.tron.common.crypto.zksnark.BN128G2;
import org.tron.common.crypto.zksnark.Fp;
import org.tron.common.crypto.zksnark.PairingCheck;
import org.tron.common.es.ExecutorServiceManager;
import org.tron.common.parameter.CommonParameter;
import org.tron.common.runtime.ProgramResult;
import org.tron.common.runtime.vm.DataWord;
Expand Down Expand Up @@ -983,11 +983,13 @@ public Pair<Boolean, byte[]> execute(byte[] rawData) {
public static class BatchValidateSign extends PrecompiledContract {

private static final ExecutorService workers;
private static final String workersName = "validate-sign-contract";
private static final int ENGERYPERSIGN = 1500;
private static final int MAX_SIZE = 16;

static {
workers = Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors() / 2 + 1);
workers = ExecutorServiceManager.newFixedThreadPool(workersName,
Runtime.getRuntime().availableProcessors() / 2 + 1);
}

@Override
Expand Down Expand Up @@ -1290,10 +1292,12 @@ public static class VerifyTransferProof extends VerifyProof {
private static final Integer[] SIZE = {2080, 2368, 2464, 2752};
private static final ExecutorService workersInConstantCall;
private static final ExecutorService workersInNonConstantCall;
private static final String constantCallName = "verify-transfer-constant-call";
private static final String nonConstantCallName = "verify-transfer-non-constant-call";

static {
workersInConstantCall = Executors.newFixedThreadPool(5);
workersInNonConstantCall = Executors.newFixedThreadPool(5);
workersInConstantCall = ExecutorServiceManager.newFixedThreadPool(constantCallName, 5);
workersInNonConstantCall = ExecutorServiceManager.newFixedThreadPool(nonConstantCallName, 5);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
@@ -1,22 +1,20 @@
package org.tron.common.storage.metric;

import com.google.common.util.concurrent.ThreadFactoryBuilder;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component;
import org.tron.common.es.ExecutorServiceManager;
import org.tron.common.prometheus.Metrics;
import org.tron.core.db.common.DbSourceInter;
import org.tron.core.db2.common.DB;

@Slf4j(topic = "metrics")
@Component
public class DbStatService {
private static final ScheduledExecutorService statExecutor =
Executors.newSingleThreadScheduledExecutor(
new ThreadFactoryBuilder().setNameFormat("db-stats-thread-%d").build());

private final String esName = "db-stats";
private final ScheduledExecutorService statExecutor =
ExecutorServiceManager.newSingleThreadScheduledExecutor(esName);

public void register(DB<byte[], byte[]> db) {
if (Metrics.enabled()) {
Expand All @@ -32,11 +30,7 @@ public void register(DbSourceInter<byte[]> db) {

public void shutdown() {
if (Metrics.enabled()) {
try {
statExecutor.shutdown();
} catch (Exception e) {
logger.error("{}", e.getMessage());
}
ExecutorServiceManager.shutdownAndAwaitTermination(statExecutor, esName);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,6 @@
import java.util.HashMap;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.atomic.AtomicInteger;
import lombok.Getter;
Expand All @@ -43,6 +42,7 @@
import org.tron.common.crypto.ECKey.ECDSASignature;
import org.tron.common.crypto.SignInterface;
import org.tron.common.crypto.SignUtils;
import org.tron.common.es.ExecutorServiceManager;
import org.tron.common.overlay.message.Message;
import org.tron.common.parameter.CommonParameter;
import org.tron.common.utils.ByteArray;
Expand Down Expand Up @@ -86,8 +86,9 @@
@Slf4j(topic = "capsule")
public class TransactionCapsule implements ProtoCapsule<Transaction> {

private static final ExecutorService executorService = Executors
.newFixedThreadPool(CommonParameter.getInstance()
private static final String esName = "valid-contract-proto";
private static final ExecutorService executorService = ExecutorServiceManager
.newFixedThreadPool(esName, CommonParameter.getInstance()
.getValidContractProtoThreadNum());
private static final String OWNER_ADDRESS = "ownerAddress_";

Expand Down
33 changes: 14 additions & 19 deletions chainbase/src/main/java/org/tron/core/db2/core/SnapshotManager.java
Original file line number Diff line number Diff line change
Expand Up @@ -16,21 +16,19 @@
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.LockSupport;
import java.util.stream.Collectors;
import javax.annotation.PostConstruct;
import javax.annotation.PreDestroy;
import lombok.Getter;
import lombok.Setter;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.tron.common.error.TronDBException;
import org.tron.common.es.ExecutorServiceManager;
import org.tron.common.parameter.CommonParameter;
import org.tron.common.storage.WriteOptionsWrapper;
import org.tron.common.utils.FileUtil;
Expand Down Expand Up @@ -76,6 +74,7 @@ public class SnapshotManager implements RevokingDatabase {
private Map<String, ListeningExecutorService> flushServices = new HashMap<>();

private ScheduledExecutorService pruneCheckpointThread = null;
private final String pruneName = "checkpoint-prune";

@Autowired
@Setter
Expand All @@ -95,7 +94,7 @@ public void init() {
checkpointVersion = CommonParameter.getInstance().getStorage().getCheckpointVersion();
// prune checkpoint
if (isV2Open()) {
pruneCheckpointThread = Executors.newSingleThreadScheduledExecutor();
pruneCheckpointThread = ExecutorServiceManager.newSingleThreadScheduledExecutor(pruneName);
pruneCheckpointThread.scheduleWithFixedDelay(() -> {
try {
if (!unChecked) {
Expand All @@ -117,18 +116,6 @@ public void init() {
exitThread.start();
}

@PreDestroy
public void close() {
try {
exitThread.interrupt();
// help GC
exitThread = null;
flushServices.values().forEach(ExecutorService::shutdown);
} catch (Exception e) {
logger.warn("exitThread interrupt error", e);
}
}

public static String simpleDecode(byte[] bytes) {
byte[] lengthBytes = Arrays.copyOf(bytes, 4);
int length = Ints.fromByteArray(lengthBytes);
Expand Down Expand Up @@ -177,7 +164,8 @@ public void add(IRevokingDB db) {
Chainbase revokingDB = (Chainbase) db;
dbs.add(revokingDB);
flushServices.put(revokingDB.getDbName(),
MoreExecutors.listeningDecorator(Executors.newSingleThreadExecutor()));
MoreExecutors.listeningDecorator(ExecutorServiceManager.newSingleThreadExecutor(
"flush-service-" + revokingDB.getDbName())));
}

private void advance() {
Expand Down Expand Up @@ -284,8 +272,15 @@ public synchronized void disable() {

@Override
public void shutdown() {
if (pruneCheckpointThread != null) {
pruneCheckpointThread.shutdown();
ExecutorServiceManager.shutdownAndAwaitTermination(pruneCheckpointThread, pruneName);
flushServices.forEach((key, value) -> ExecutorServiceManager.shutdownAndAwaitTermination(value,
"flush-service-" + key));
try {
exitThread.interrupt();
// help GC
exitThread = null;
} catch (Exception e) {
logger.warn("exitThread interrupt error", e);
}
}

Expand Down
Original file line number Diff line number Diff line change
@@ -1,12 +1,15 @@
package org.tron.common.es;

import com.google.common.util.concurrent.ThreadFactoryBuilder;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import lombok.extern.slf4j.Slf4j;

@Slf4j(topic = "common")
@Slf4j(topic = "common-executor")
public class ExecutorServiceManager {

public static ExecutorService newSingleThreadExecutor(String name) {
Expand All @@ -29,6 +32,31 @@ public static ScheduledExecutorService newSingleThreadScheduledExecutor(String n
new ThreadFactoryBuilder().setNameFormat(name).setDaemon(isDaemon).build());
}

public static ExecutorService newFixedThreadPool(String name, int fixThreads) {
return newFixedThreadPool(name, fixThreads, false);
}

public static ExecutorService newFixedThreadPool(String name, int fixThreads, boolean isDaemon) {
return Executors.newFixedThreadPool(fixThreads,
new ThreadFactoryBuilder().setNameFormat(name + "-%d").setDaemon(isDaemon).build());
}

public static ExecutorService newThreadPoolExecutor(int corePoolSize, int maximumPoolSize,
long keepAliveTime, TimeUnit unit,
BlockingQueue<Runnable> workQueue,
String name) {
return newThreadPoolExecutor(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
name, false);
}

public static ExecutorService newThreadPoolExecutor(int corePoolSize, int maximumPoolSize,
long keepAliveTime, TimeUnit unit,
BlockingQueue<Runnable> workQueue,
String name, boolean isDaemon) {
return new ThreadPoolExecutor(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
new ThreadFactoryBuilder().setNameFormat(name + "-%d").setDaemon(isDaemon).build());
}

public static void shutdownAndAwaitTermination(ExecutorService pool, String name) {
if (pool == null) {
return;
Expand Down
14 changes: 10 additions & 4 deletions consensus/src/main/java/org/tron/consensus/pbft/PbftManager.java
Original file line number Diff line number Diff line change
@@ -1,13 +1,14 @@
package org.tron.consensus.pbft;

import com.google.protobuf.ByteString;
import java.io.Closeable;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import javax.annotation.PostConstruct;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import org.tron.common.es.ExecutorServiceManager;
import org.tron.consensus.base.Param;
import org.tron.consensus.base.Param.Miner;
import org.tron.consensus.dpos.MaintenanceManager;
Expand All @@ -18,7 +19,7 @@

@Slf4j(topic = "pbft")
@Component
public class PbftManager {
public class PbftManager implements Closeable {

@Autowired
private PbftMessageHandle pbftMessageHandle;
Expand All @@ -29,8 +30,8 @@ public class PbftManager {
@Autowired
private ChainBaseManager chainBaseManager;

private ExecutorService executorService = Executors.newFixedThreadPool(10,
r -> new Thread(r, "Pbft"));
private final String esName = "pbft-msg-manager";
private ExecutorService executorService = ExecutorServiceManager.newFixedThreadPool(esName, 10);

@PostConstruct
public void init() {
Expand Down Expand Up @@ -111,4 +112,9 @@ public boolean verifyMsg(PbftBaseMessage msg) {
return witnessList.contains(ByteString.copyFrom(msg.getPublicKey()));
}

@Override
public void close() {
ExecutorServiceManager.shutdownAndAwaitTermination(executorService, esName);
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,6 @@
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.util.Set;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import lombok.extern.slf4j.Slf4j;
Expand All @@ -19,6 +18,7 @@
import org.tron.common.backup.socket.EventHandler;
import org.tron.common.backup.socket.MessageHandler;
import org.tron.common.backup.socket.UdpEvent;
import org.tron.common.es.ExecutorServiceManager;
import org.tron.common.parameter.CommonParameter;

@Slf4j(topic = "backup")
Expand All @@ -39,7 +39,10 @@ public class BackupManager implements EventHandler {

private Set<String> members = new ConcurrentSet<>();

private ScheduledExecutorService executorService = Executors.newSingleThreadScheduledExecutor();
private final String esName = "backup-manager";

private ScheduledExecutorService executorService =
ExecutorServiceManager.newSingleThreadScheduledExecutor(esName);

private MessageHandler messageHandler;

Expand Down Expand Up @@ -144,6 +147,10 @@ public void handleEvent(UdpEvent udpEvent) {
}
}

public void stop() {
ExecutorServiceManager.shutdownAndAwaitTermination(executorService, esName);
}

@Override
public void channelActivated() {
init();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@

@Slf4j(topic = "backup")
@Component
public class BackupServer {
public class BackupServer implements AutoCloseable {

private CommonParameter commonParameter = CommonParameter.getInstance();

Expand Down Expand Up @@ -91,10 +91,12 @@ public void initChannel(NioDatagramChannel ch)
}
}

@Override
public void close() {
logger.info("Closing backup server...");
shutdown = true;
ExecutorServiceManager.shutdownAndAwaitTermination(executor, name);
backupManager.stop();
if (channel != null) {
try {
channel.close().await(10, TimeUnit.SECONDS);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,12 +7,11 @@
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.util.List;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component;
import org.tron.common.es.ExecutorServiceManager;
import org.tron.common.parameter.CommonParameter;
import org.tron.core.Constant;
import org.tron.core.config.Configuration;
Expand All @@ -26,10 +25,12 @@ public class DynamicArgs {

private long lastModified = 0;

private ScheduledExecutorService reloadExecutor = Executors.newSingleThreadScheduledExecutor();
private ScheduledExecutorService reloadExecutor;
private final String esName = "dynamic-reload";

public void init() {
if (parameter.isDynamicConfigEnable()) {
reloadExecutor = ExecutorServiceManager.newSingleThreadScheduledExecutor(esName);
logger.info("Start the dynamic loading configuration service");
long checkInterval = parameter.getDynamicConfigCheckInterval();
File config = getConfigFile();
Expand Down Expand Up @@ -108,7 +109,6 @@ private void updateTrustNodes(Config config) {
}

public void close() {
logger.info("Closing the dynamic loading configuration service");
reloadExecutor.shutdown();
ExecutorServiceManager.shutdownAndAwaitTermination(reloadExecutor, esName);
}
}
}
Loading

0 comments on commit 03e903e

Please sign in to comment.