Skip to content

Commit

Permalink
fix
Browse files Browse the repository at this point in the history
  • Loading branch information
dataroaring committed Sep 16, 2024
1 parent 98d40cb commit e59728b
Show file tree
Hide file tree
Showing 14 changed files with 71 additions and 43 deletions.
4 changes: 3 additions & 1 deletion be/src/agent/heartbeat_server.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -276,7 +276,9 @@ Status HeartbeatServer::_heartbeat(const TMasterInfo& master_info) {
}
}

if (master_info.__isset.cloud_unique_id && config::cloud_unique_id.empty()) {
if (master_info.__isset.cloud_unique_id &&
config::cloud_unique_id != master_info.cloud_unique_id &&
config::enable_use_cloud_unique_id_from_fe) {
auto st = config::set_config("cloud_unique_id", master_info.cloud_unique_id, true);
LOG(INFO) << "set config cloud_unique_id " << master_info.cloud_unique_id << " " << st;
}
Expand Down
10 changes: 3 additions & 7 deletions be/src/cloud/cloud_storage_engine.h
Original file line number Diff line number Diff line change
Expand Up @@ -76,16 +76,12 @@ class CloudStorageEngine final : public BaseStorageEngine {

std::optional<StorageResource> get_storage_resource(const std::string& vault_id) {
LOG(INFO) << "Getting storage resource for vault_id: " << vault_id;
if (vault_id.empty()) {
if (latest_fs() == nullptr) {
LOG(INFO) << "there is not latest fs";
return std::nullopt;
}
return StorageResource {latest_fs()};
}

bool synced = false;
do {
if (vault_id.empty() && latest_fs() != nullptr) {
return StorageResource {latest_fs()};
}
if (auto storage_resource = doris::get_storage_resource(vault_id); storage_resource) {
return storage_resource->first;
}
Expand Down
2 changes: 2 additions & 0 deletions be/src/cloud/config.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -71,4 +71,6 @@ DEFINE_mInt32(remove_expired_tablet_txn_info_interval_seconds, "300");

DEFINE_mInt32(tablet_txn_info_min_expired_seconds, "120");

DEFINE_mBool(enable_use_cloud_unique_id_from_fe, "true");

} // namespace doris::config
2 changes: 2 additions & 0 deletions be/src/cloud/config.h
Original file line number Diff line number Diff line change
Expand Up @@ -104,4 +104,6 @@ DECLARE_mInt32(remove_expired_tablet_txn_info_interval_seconds);

DECLARE_mInt32(tablet_txn_info_min_expired_seconds);

DECLARE_mBool(enable_use_cloud_unique_id_from_fe);

} // namespace doris::config
7 changes: 6 additions & 1 deletion fe/fe-core/src/main/java/org/apache/doris/catalog/Env.java
Original file line number Diff line number Diff line change
Expand Up @@ -3007,6 +3007,11 @@ public void addFrontend(FrontendNodeType role, String host, int editLogPort) thr
}

public void addFrontend(FrontendNodeType role, String host, int editLogPort, String nodeName) throws DdlException {
addFrontend(role, host, editLogPort, nodeName, "");
}

public void addFrontend(FrontendNodeType role, String host, int editLogPort, String nodeName, String cloudUniqueId)
throws DdlException {
if (!tryLock(false)) {
throw new DdlException("Failed to acquire env lock. Try again");
}
Expand Down Expand Up @@ -3037,6 +3042,7 @@ public void addFrontend(FrontendNodeType role, String host, int editLogPort, Str

// Only add frontend after removing the conflict nodes, to ensure the exception safety.
fe = new Frontend(role, nodeName, host, editLogPort);
fe.setCloudUniqueId(cloudUniqueId);
frontends.put(nodeName, fe);

LOG.info("add frontend: {}", fe);
Expand Down Expand Up @@ -3087,7 +3093,6 @@ public void dropFrontend(FrontendNodeType role, String host, int port) throws Dd
}

public void dropFrontendFromBDBJE(FrontendNodeType role, String host, int port) throws DdlException {

if (port == selfNode.getPort() && feType == FrontendNodeType.MASTER
&& selfNode.getHost().equals(host)) {
throw new DdlException("can not drop current master node.");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -398,12 +398,14 @@ private void checkCloudFes() {
List<Cloud.NodeInfoPB> expectedFes = cpb.getNodesList();

if (!isUpdateCloudUniqueId) {
// just run once and number of fes is small, so iterating is ok.
// Just run once and number of fes is small, so iterating is ok.
// newly addde fe has cloudUniqueId.
for (Frontend fe : currentFes) {
for (Cloud.NodeInfoPB node : expectedFes) {
if (fe.getHost().equals(Config.enable_fqdn_mode ? node.getHost() : node.getIp())
&& fe.getEditLogPort() == node.getEditLogPort()) {
fe.setCloudUniqueId(node.getCloudUniqueId());
LOG.info("update cloud unique id result {}", fe);
break;
}
}
Expand All @@ -424,6 +426,7 @@ private void checkCloudFes() {
endpoint = endpoint + "_" + fe.getRole();
currentMap.put(endpoint, fe);
}
LOG.info("fes in memory {}", currentMap);
return currentMap;
}, () -> {
// meta service
Expand All @@ -442,10 +445,10 @@ private void checkCloudFes() {
Cloud.NodeInfoPB.NodeType type = node.getNodeType();
// ATTN: just allow to add follower or observer
if (Cloud.NodeInfoPB.NodeType.FE_MASTER.equals(type)) {
LOG.warn("impossible !!!, get fe node {} type equel master from ms", node);
LOG.warn("impossible !!!, get fe node {} type equal master from ms", node);
}
FrontendNodeType role = type == Cloud.NodeInfoPB.NodeType.FE_FOLLOWER
? FrontendNodeType.FOLLOWER : FrontendNodeType.OBSERVER;
FrontendNodeType role = type == Cloud.NodeInfoPB.NodeType.FE_OBSERVER
? FrontendNodeType.OBSERVER : FrontendNodeType.FOLLOWER;
Frontend fe = new Frontend(role,
CloudEnv.genFeNodeNameFromMeta(host, node.getEditLogPort(),
node.getCtime() * 1000), host, node.getEditLogPort());
Expand All @@ -454,6 +457,8 @@ private void checkCloudFes() {
endpoint = endpoint + "_" + fe.getRole();
nodeMap.put(endpoint, fe);
}
LOG.info("fes in ms {}", nodeMap);

return nodeMap;
});
LOG.info("diffFrontends nodes: {}, current: {}, toAdd: {}, toDel: {}, enable auto start: {}",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,6 @@

import java.io.DataInputStream;
import java.io.IOException;
import java.util.Comparator;
import java.util.List;
import java.util.Map;
import java.util.Optional;
Expand Down Expand Up @@ -181,23 +180,12 @@ private Cloud.NodeInfoPB getLocalTypeFromMetaService() {
.stream().filter(NodeInfoPB::hasNodeType).collect(Collectors.toList());

helperNodes.clear();
if (allNodes.stream().anyMatch(n -> n.getNodeType() == NodeInfoPB.NodeType.FE_FOLLOWER)) {
// multi followers mode, select first
Optional<HostInfo> helperNode = allNodes.stream()
.filter(nodeInfoPB -> nodeInfoPB.getNodeType() != NodeInfoPB.NodeType.FE_OBSERVER)
.map(nodeInfoPB -> new HostInfo(
Config.enable_fqdn_mode ? nodeInfoPB.getHost() : nodeInfoPB.getIp(), nodeInfoPB.getEditLogPort()))
.min(Comparator.comparing(HostInfo::getHost));
helperNode.ifPresent(hostInfo -> helperNodes.add(hostInfo));
} else {
// master observers mode
// helper node select follower's first, just one
helperNodes.addAll(allNodes.stream()
.filter(nodeInfoPB -> nodeInfoPB.getNodeType() == NodeInfoPB.NodeType.FE_MASTER)
.map(nodeInfoPB -> new HostInfo(
Config.enable_fqdn_mode ? nodeInfoPB.getHost() : nodeInfoPB.getIp(), nodeInfoPB.getEditLogPort()))
.collect(Collectors.toList()));
// check only have one master node.
Optional<Cloud.NodeInfoPB> firstNonObserverNode = allNodes.stream().findFirst();
if (firstNonObserverNode.isPresent()) {
helperNodes.add(new HostInfo(
Config.enable_fqdn_mode ? firstNonObserverNode.get().getHost()
: firstNonObserverNode.get().getIp(),
firstNonObserverNode.get().getEditLogPort()));
}
Preconditions.checkState(helperNodes.size() == 1);

Expand Down Expand Up @@ -435,6 +423,10 @@ public void dropFrontend(FrontendNodeType role, String host, int port) throws Dd
.getHostPortInAccessibleFormat(host, port) + "]");
}

if (Strings.isNullOrEmpty(frontend.getCloudUniqueId())) {
throw new DdlException("Frontend does not have a cloudUniqueId, wait for a minute.");
}

getCloudSystemInfoService().dropFrontend(frontend);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -184,7 +184,7 @@ protected Partition createPartitionWithIndices(long dbId, OlapTable tbl, long pa
requestBuilder.setDbId(dbId);

LOG.info("create tablets, dbId: {}, tableId: {}, tableName: {}, partitionId: {}, partitionName: {}, "
+ "indexId: {}, vault name {}",
+ "indexId: {}, vault name: {}",
dbId, tbl.getId(), tbl.getName(), partitionId, partitionName, indexId, storageVaultName);
Cloud.CreateTabletsResponse resp = sendCreateTabletsRpc(requestBuilder);
// If the resp has no vault id set, it means the MS is running with enable_storage_vault false
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -306,7 +306,7 @@ public synchronized void updateFrontends(List<Frontend> toAdd, List<Frontend> to
}
try {
Env.getCurrentEnv().addFrontend(fe.getRole(),
fe.getHost(), fe.getEditLogPort(), fe.getNodeName());
fe.getHost(), fe.getEditLogPort(), fe.getNodeName(), fe.getCloudUniqueId());
LOG.info("added cloud frontend={} ", fe);
} catch (DdlException e) {
LOG.warn("failed to add cloud frontend={} ", fe);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -833,6 +833,11 @@ class Suite implements GroovyInterceptable {
return s3Url
}

String getJdbcPassword() {
String sk = context.config.otherConfigs.get("jdbcPassword");
return sk
}

static void scpFiles(String username, String host, String files, String filePath, boolean fromDst=true) {
String cmd = "scp -o StrictHostKeyChecking=no -r ${username}@${host}:${files} ${filePath}"
if (!fromDst) {
Expand Down Expand Up @@ -1769,7 +1774,7 @@ class Suite implements GroovyInterceptable {

drop_cluster_api.call(js) {
respCode, body ->
log.info("dorp cluster resp: ${body} ${respCode}".toString())
log.info("drop cluster resp: ${body} ${respCode}".toString())
def json = parseJson(body)
assertTrue(json.code.equalsIgnoreCase("OK") || json.code.equalsIgnoreCase("ALREADY_EXISTED"))
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -296,7 +296,7 @@ class SuiteContext implements Closeable {
public <T> T connect(String user, String password, String url, Closure<T> actionSupplier) {
def originConnection = threadLocalConn.get()
try {
log.info("Create new connection for user '${user}'")
log.info("Create new connection for user '${user}' to '${url}'")
return DriverManager.getConnection(url, user, password).withCloseable { newConn ->
def newConnInfo = new ConnectionInfo()
newConnInfo.conn = newConn
Expand All @@ -306,7 +306,7 @@ class SuiteContext implements Closeable {
return actionSupplier.call()
}
} finally {
log.info("Recover original connection")
log.info("Recover original connection to '${url}'")
if (originConnection == null) {
threadLocalConn.remove()
} else {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -100,7 +100,7 @@ suite('test_auto_start_in_cloud', 'multi_cluster, docker') {

def jsonSlurper = new JsonSlurper()
def jsonObject = jsonSlurper.parseText(tag)
String cloudClusterId = jsonObject.cloud_cluster_id
String cloudClusterId = jsonObject.compute_group_id
String uniqueId = jsonObject.cloud_unique_id

sleep(5 * 1000)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ suite('test_tvf_in_cloud', 'multi_cluster,docker') {

def jsonSlurper = new JsonSlurper()
def jsonObject = jsonSlurper.parseText(tag)
def cloudClusterId = jsonObject.cloud_cluster_id
def cloudClusterId = jsonObject.compute_group_id
// multi cluster env

// current cluster
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -157,8 +157,24 @@ suite('test_sql_mode_node_mgr', 'docker,p1') {
cluster.restartFrontends();
cluster.restartBackends();

sleep(30000)
context.reconnectFe()
def reconnectFe = {
sleep(10000)
logger.info("Reconnecting to a new frontend...")
def newFe = cluster.getMasterFe()
if (newFe) {
logger.info("New frontend found: ${newFe.host}:${newFe.httpPort}")
def url = String.format(
"jdbc:mysql://%s:%s/?useLocalSessionState=true&allowLoadLocalInfile=false",
newFe.host, newFe.queryPort)
url = context.config.buildUrlWithDb(url, context.dbName)
context.connectTo(url, context.config.jdbcUser, context.config.jdbcPassword)
logger.info("Successfully reconnected to the new frontend")
} else {
logger.error("No new frontend found to reconnect")
}
}

reconnectFe()

checkClusterStatus(3, 3, 1)

Expand Down Expand Up @@ -265,6 +281,7 @@ suite('test_sql_mode_node_mgr', 'docker,p1') {
maxWaitSeconds = 300
waited = 0
while (waited < maxWaitSeconds) {
reconnectFe()
def currentFrontends = sql_return_maparray("SHOW FRONTENDS")
if (currentFrontends.size() == frontends.size() - 1) {
logger.info("Non-master frontend successfully dropped")
Expand Down Expand Up @@ -332,6 +349,8 @@ suite('test_sql_mode_node_mgr', 'docker,p1') {

// Drop the frontend
sql """ ALTER SYSTEM DROP FOLLOWER "${feHost}:${feEditLogPort}"; """
sleep(30000)
reconnectFe()

// Wait for the frontend to be fully dropped
maxWaitSeconds = 300
Expand Down Expand Up @@ -371,7 +390,7 @@ suite('test_sql_mode_node_mgr', 'docker,p1') {
}

// Verify cluster status after adding the frontend back
checkClusterStatus(3, 3, 5)
checkClusterStatus(3, 3, 6)

logger.info("Frontend successfully added back and cluster status verified")
// CASE 6. If fe can not drop itself.
Expand Down Expand Up @@ -421,7 +440,7 @@ suite('test_sql_mode_node_mgr', 'docker,p1') {
def originalBackendCount = 3 // As per the initial setup in this test
assert currentBackends.size() == originalBackendCount, "Number of backends should remain unchanged after attempting to drop a non-existent backend"

checkClusterStatus(3, 3, 6)
checkClusterStatus(3, 3, 7)

// CASE 8. Decommission a backend and verify the process
logger.info("Attempting to decommission a backend")
Expand Down Expand Up @@ -469,7 +488,7 @@ suite('test_sql_mode_node_mgr', 'docker,p1') {

logger.info("Successfully decommissioned backend and verified its status")

checkClusterStatus(3, 3, 7)
checkClusterStatus(3, 3, 8)
}
}

Expand Down

0 comments on commit e59728b

Please sign in to comment.