Skip to content

Commit

Permalink
[refact](bdbje) Refactor BDBEnvironment and BDBJEJournal
Browse files Browse the repository at this point in the history
* Add more unit tests for "org.apache.doris.journal.bdbje"
* Make a small refactor of "org.apache.doris.journal.bdbje"
  • Loading branch information
SWJTU-ZhangLei committed Nov 29, 2023
1 parent 83ed8d3 commit 5c3fef3
Show file tree
Hide file tree
Showing 12 changed files with 836 additions and 91 deletions.
2 changes: 1 addition & 1 deletion fe/fe-core/src/main/java/org/apache/doris/DorisFE.java
Original file line number Diff line number Diff line change
Expand Up @@ -152,7 +152,7 @@ public static void start(String dorisHomeDir, String pidDir, String[] args, Star

if (Config.enable_bdbje_debug_mode) {
// Start in BDB Debug mode
BDBDebugger.get().startDebugMode(dorisHomeDir);
BDBDebugger.get().startDebugMode(Config.meta_dir + "/bdb");
return;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,8 +28,10 @@

public class BDBStateChangeListener implements StateChangeListener {
public static final Logger LOG = LogManager.getLogger(BDBStateChangeListener.class);
private final boolean isElectable;

public BDBStateChangeListener() {
public BDBStateChangeListener(boolean isElectable) {
this.isElectable = isElectable;
}

@Override
Expand All @@ -41,7 +43,7 @@ public synchronized void stateChange(StateChangeEvent sce) throws RuntimeExcepti
break;
}
case REPLICA: {
if (Env.getCurrentEnv().isElectable()) {
if (isElectable) {
newType = FrontendNodeType.FOLLOWER;
} else {
newType = FrontendNodeType.OBSERVER;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -58,17 +58,21 @@ public class BDBDebugger {
private static final Logger LOG = LogManager.getLogger(BDBDebugger.class);
private BDBDebugEnv debugEnv;

// Holds the single BDBDebugger instance that get() hands out.
private static class SingletonHolder {
private static final BDBDebugger INSTANCE = new BDBDebugger();
}

/**
 * Returns the shared BDBDebugger instance stored in SingletonHolder.
 *
 * @return the one BDBDebugger instance
 */
public static BDBDebugger get() {
return SingletonHolder.INSTANCE;
}

/**
* Start in BDB Debug mode.
*/
public void startDebugMode(String dorisHomeDir) {
public void startDebugMode(String bdbHome) {
try {
initDebugEnv();
startService(dorisHomeDir);
initDebugEnv(bdbHome);
startService();
while (true) {
Thread.sleep(2000);
}
Expand All @@ -78,8 +82,13 @@ public void startDebugMode(String dorisHomeDir) {
}
}

/**
 * Creates the debug environment for the given BDB directory and initializes it.
 *
 * @param bdbHome path handed to the BDBDebugEnv constructor
 * @throws BDBDebugException declared here; presumably raised by debugEnv.init() (confirm against BDBDebugEnv)
 */
private void initDebugEnv(String bdbHome) throws BDBDebugException {
debugEnv = new BDBDebugEnv(bdbHome);
debugEnv.init();
}

// Only start MySQL and HttpServer
private void startService(String dorisHomeDir) throws Exception {
private void startService() throws Exception {
// HTTP server

HttpServer httpServer = new HttpServer();
Expand All @@ -94,19 +103,10 @@ private void startService(String dorisHomeDir) throws Exception {
ThreadPoolManager.registerAllThreadPoolMetric();
}

private void initDebugEnv() throws BDBDebugException {
debugEnv = new BDBDebugEnv(Config.meta_dir + "/bdb/");
debugEnv.init();
}

/**
 * Returns the debug environment assigned by initDebugEnv.
 * The field has no initializer, so this is null until initDebugEnv has run.
 *
 * @return the current BDBDebugEnv, or null if it has not been set yet
 */
public BDBDebugEnv getEnv() {
return debugEnv;
}

private static class SingletonHolder {
private static final BDBDebugger INSTANCE = new BDBDebugger();
}

/**
* A wrapper class of the BDBJE environment, used to obtain information in bdbje.
*/
Expand Down Expand Up @@ -144,7 +144,9 @@ public Long getJournalNumber(String dbName) {
dbConfig.setAllowCreate(false);
dbConfig.setReadOnly(true);
Database db = env.openDatabase(null, dbName, dbConfig);
return db.count();
long journalNumber = db.count();
db.close();
return journalNumber;
}

/**
Expand Down Expand Up @@ -172,6 +174,8 @@ public List<Long> getJournalIds(String dbName) throws BDBDebugException {
Long id = idBinding.entryToObject(key);
journalIds.add(id);
}
cursor.close();
db.close();
} catch (Exception e) {
LOG.warn("failed to get journal ids of {}", dbName, e);
throw new BDBDebugException("failed to get journal ids of database " + dbName, e);
Expand Down Expand Up @@ -205,6 +209,7 @@ public JournalEntityWrapper getJournalEntity(String dbName, Long journalId) {

// get the journal
OperationStatus status = db.get(null, key, value, LockMode.READ_COMMITTED);
db.close();
if (status == OperationStatus.SUCCESS) {
byte[] retData = value.getData();
DataInputStream in = new DataInputStream(new ByteArrayInputStream(retData));
Expand All @@ -223,6 +228,14 @@ public JournalEntityWrapper getJournalEntity(String dbName, Long journalId) {
MetaContext.remove();
return entityWrapper;
}

/**
 * Closes the wrapped env.
 * Any exception thrown by the close is logged at WARN level and not rethrown,
 * so callers never see a failure from this method.
 */
public void close() {
try {
env.close();
} catch (Exception e) {
LOG.warn("exception:", e);
}
}
}

public static class JournalEntityWrapper {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@

import org.apache.doris.catalog.Env;
import org.apache.doris.common.Config;
import org.apache.doris.common.FeConstants;
import org.apache.doris.ha.BDBHA;
import org.apache.doris.ha.BDBStateChangeListener;
import org.apache.doris.ha.FrontendNodeType;
Expand Down Expand Up @@ -72,10 +71,8 @@ public class BDBEnvironment {
private static final int MEMORY_CACHE_PERCENT = 20;
private static final List<String> BDBJE_LOG_LEVEL = ImmutableList.of("OFF", "SEVERE", "WARNING",
"INFO", "CONFIG", "FINE", "FINER", "FINEST", "ALL");

public static final String PALO_JOURNAL_GROUP = "PALO_JOURNAL_GROUP";


private ReplicatedEnvironment replicatedEnvironment;
private EnvironmentConfig environmentConfig;
private ReplicationConfig replicationConfig;
Expand All @@ -84,15 +81,19 @@ public class BDBEnvironment {
private ReentrantReadWriteLock lock;
private List<Database> openedDatabases;

public BDBEnvironment() {
private final boolean isElectable;
private final boolean metadataFailureRecovery;

public BDBEnvironment(boolean isElectable, boolean metadataFailureRecovery) {
openedDatabases = new ArrayList<Database>();
this.lock = new ReentrantReadWriteLock(true);
this.isElectable = isElectable;
this.metadataFailureRecovery = metadataFailureRecovery;
}

// The setup() method opens the environment and database
public void setup(File envHome, String selfNodeName, String selfNodeHostPort,
String helperHostPort, boolean isElectable) {
boolean metadataFailureRecovery = null != System.getProperty(FeConstants.METADATA_FAILURE_RECOVERY_KEY);
String helperHostPort) {
// Almost never used, just in case the master can not restart
if (metadataFailureRecovery) {
if (!isElectable) {
Expand All @@ -112,7 +113,7 @@ public void setup(File envHome, String selfNodeName, String selfNodeHostPort,
replicationConfig.setNodeHostPort(selfNodeHostPort);
replicationConfig.setHelperHosts(helperHostPort);
replicationConfig.setGroupName(PALO_JOURNAL_GROUP);
replicationConfig.setConfigParam(ReplicationConfig.ENV_UNKNOWN_STATE_TIMEOUT, "10");
replicationConfig.setConfigParam(ReplicationConfig.ENV_UNKNOWN_STATE_TIMEOUT, "10 s");
replicationConfig.setMaxClockDelta(Config.max_bdbje_clock_delta_ms, TimeUnit.MILLISECONDS);
replicationConfig.setConfigParam(ReplicationConfig.TXN_ROLLBACK_LIMIT,
String.valueOf(Config.txn_rollback_limit));
Expand Down Expand Up @@ -170,6 +171,9 @@ public void setup(File envHome, String selfNodeName, String selfNodeHostPort,
// open environment and epochDB
for (int i = 0; i < RETRY_TIME; i++) {
try {
if (replicatedEnvironment != null) {
this.close();
}
// open the environment
replicatedEnvironment = new ReplicatedEnvironment(envHome, replicationConfig, environmentConfig);

Expand All @@ -178,12 +182,13 @@ public void setup(File envHome, String selfNodeName, String selfNodeHostPort,
Env.getCurrentEnv().setHaProtocol(protocol);

// start state change listener
StateChangeListener listener = new BDBStateChangeListener();
StateChangeListener listener = new BDBStateChangeListener(isElectable);
replicatedEnvironment.setStateChangeListener(listener);
// open epochDB. the first parameter null means auto-commit
epochDB = replicatedEnvironment.openDatabase(null, "epochDB", dbConfig);
break;
} catch (InsufficientLogException insufficientLogEx) {
LOG.info("i:{} insufficientLogEx:", i, insufficientLogEx);
NetworkRestore restore = new NetworkRestore();
NetworkRestoreConfig config = new NetworkRestoreConfig();
config.setRetainLogFiles(false); // delete obsolete log files.
Expand All @@ -193,6 +198,7 @@ public void setup(File envHome, String selfNodeName, String selfNodeHostPort,
// default selection of providers is not suitable.
restore.execute(insufficientLogEx, config);
} catch (DatabaseException e) {
LOG.info("i:{} exception:", i, e);
if (i < RETRY_TIME - 1) {
try {
Thread.sleep(5 * 1000);
Expand Down Expand Up @@ -381,6 +387,7 @@ public void close() {
if (epochDB != null) {
try {
epochDB.close();
epochDB = null;
} catch (DatabaseException exception) {
LOG.error("Error closing db {} will exit", epochDB.getDatabaseName(), exception);
}
Expand All @@ -390,19 +397,7 @@ public void close() {
try {
// Finally, close the store and environment.
replicatedEnvironment.close();
} catch (DatabaseException exception) {
LOG.error("Error closing replicatedEnvironment", exception);
}
}
}

// Close environment
public void closeReplicatedEnvironment() {
if (replicatedEnvironment != null) {
try {
openedDatabases.clear();
// Finally, close the store and environment.
replicatedEnvironment.close();
replicatedEnvironment = null;
} catch (DatabaseException exception) {
LOG.error("Error closing replicatedEnvironment", exception);
}
Expand All @@ -413,18 +408,22 @@ public void closeReplicatedEnvironment() {
public void openReplicatedEnvironment(File envHome) {
for (int i = 0; i < RETRY_TIME; i++) {
try {
if (replicatedEnvironment != null) {
this.close();
}
// open the environment
replicatedEnvironment =
new ReplicatedEnvironment(envHome, replicationConfig, environmentConfig);

// start state change listener
StateChangeListener listener = new BDBStateChangeListener();
StateChangeListener listener = new BDBStateChangeListener(isElectable);
replicatedEnvironment.setStateChangeListener(listener);

// open epochDB. the first parameter null means auto-commit
epochDB = replicatedEnvironment.openDatabase(null, "epochDB", dbConfig);
break;
} catch (DatabaseException e) {
LOG.info("i:{} exception:", i, e);
if (i < RETRY_TIME - 1) {
try {
Thread.sleep(5 * 1000);
Expand All @@ -439,7 +438,7 @@ public void openReplicatedEnvironment(File envHome) {
}
}

private SyncPolicy getSyncPolicy(String policy) {
private static SyncPolicy getSyncPolicy(String policy) {
if (policy.equalsIgnoreCase("SYNC")) {
return Durability.SyncPolicy.SYNC;
}
Expand All @@ -450,7 +449,7 @@ private SyncPolicy getSyncPolicy(String policy) {
return Durability.SyncPolicy.WRITE_NO_SYNC;
}

private ReplicaAckPolicy getAckPolicy(String policy) {
private static ReplicaAckPolicy getAckPolicy(String policy) {
if (policy.equalsIgnoreCase("ALL")) {
return Durability.ReplicaAckPolicy.ALL;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
package org.apache.doris.journal.bdbje;

import org.apache.doris.catalog.Env;
import org.apache.doris.common.FeConstants;
import org.apache.doris.common.io.DataOutputBuffer;
import org.apache.doris.common.io.Writable;
import org.apache.doris.common.util.NetUtils;
Expand Down Expand Up @@ -73,14 +74,6 @@ public class BDBJEJournal implements Journal { // CHECKSTYLE IGNORE THIS LINE: B
private AtomicLong nextJournalId = new AtomicLong(1);

public BDBJEJournal(String nodeName) {
initBDBEnv(nodeName);
}

/*
* Initialize bdb environment.
* node name is ip_port (the port is edit_log_port)
*/
private void initBDBEnv(String nodeName) {
environmentPath = Env.getServingEnv().getBdbDir();
HostInfo selfNode = Env.getServingEnv().getSelfNode();
selfNodeName = nodeName;
Expand Down Expand Up @@ -325,13 +318,14 @@ public void close() {
public synchronized void open() {
if (bdbEnvironment == null) {
File dbEnv = new File(environmentPath);
bdbEnvironment = new BDBEnvironment();

boolean metadataFailureRecovery = null != System.getProperty(FeConstants.METADATA_FAILURE_RECOVERY_KEY);
bdbEnvironment = new BDBEnvironment(Env.getServingEnv().isElectable(), metadataFailureRecovery);

HostInfo helperNode = Env.getServingEnv().getHelperNode();
String helperHostPort = NetUtils.getHostPortInAccessibleFormat(helperNode.getHost(), helperNode.getPort());
try {
bdbEnvironment.setup(dbEnv, selfNodeName, selfNodeHostPort, helperHostPort,
Env.getServingEnv().isElectable());
bdbEnvironment.setup(dbEnv, selfNodeName, selfNodeHostPort, helperHostPort);
} catch (Exception e) {
if (e instanceof DatabaseNotFoundException) {
LOG.error("It is not allowed to set metadata_failure_recovery"
Expand Down Expand Up @@ -378,7 +372,7 @@ public synchronized void open() {
reSetupBdbEnvironment(insufficientLogEx);
} catch (RollbackException rollbackEx) {
LOG.warn("catch rollback log exception. will reopen the ReplicatedEnvironment.", rollbackEx);
bdbEnvironment.closeReplicatedEnvironment();
bdbEnvironment.close();
bdbEnvironment.openReplicatedEnvironment(new File(environmentPath));
}
}
Expand Down Expand Up @@ -412,8 +406,7 @@ private void reSetupBdbEnvironment(InsufficientLogException insufficientLogEx) {

bdbEnvironment.close();
bdbEnvironment.setup(new File(environmentPath), selfNodeName, selfNodeHostPort,
NetUtils.getHostPortInAccessibleFormat(helperNode.getHost(), helperNode.getPort()),
Env.getServingEnv().isElectable());
NetUtils.getHostPortInAccessibleFormat(helperNode.getHost(), helperNode.getPort()));
}

@Override
Expand Down Expand Up @@ -504,7 +497,7 @@ public List<Long> getDatabaseNames() {
} catch (RollbackException rollbackEx) {
if (!Env.isCheckpointThread()) {
LOG.warn("catch rollback log exception. will reopen the ReplicatedEnvironment.", rollbackEx);
bdbEnvironment.closeReplicatedEnvironment();
bdbEnvironment.close();
bdbEnvironment.openReplicatedEnvironment(new File(environmentPath));
} else {
throw rollbackEx;
Expand Down
Loading

0 comments on commit 5c3fef3

Please sign in to comment.