Commit f041141

fix

dataroaring committed Sep 15, 2024
1 parent 81f442d commit f041141
Showing 6 changed files with 185 additions and 24 deletions.
22 changes: 12 additions & 10 deletions be/src/agent/heartbeat_server.cpp
@@ -263,19 +263,21 @@ Status HeartbeatServer::_heartbeat(const TMasterInfo& master_info) {
LOG(INFO) << "set config meta_service_endpoing " << master_info.meta_service_endpoint
<< " " << st;
}
        }

-        if (master_info.__isset.cloud_unique_id) {
-            if (!config::cloud_unique_id.empty() &&
-                config::cloud_unique_id != master_info.cloud_unique_id) {
-                LOG(ERROR) << "Cloud unique ID mismatch between FE and BE. FE cloud unique ID: "
-                           << master_info.cloud_unique_id
-                           << ", BE cloud unique ID: " << config::cloud_unique_id;
-                return Status::InvalidArgument<false>(
-                        "fe and be do not work with same cloud unique ID, fe cloud unique ID: {},"
-                        " be cloud unique ID: {}",
-                        master_info.cloud_unique_id, config::cloud_unique_id);
-            }
-        }
+        if (master_info.meta_service_endpoint != config::meta_service_endpoint) {
+            LOG(WARNING) << "Detected mismatch in meta_service_endpoint configuration between FE and BE. "
+                         << "FE meta_service_endpoint: " << master_info.meta_service_endpoint
+                         << ", BE meta_service_endpoint: " << config::meta_service_endpoint;
+            return Status::InvalidArgument<false>(
+                    "fe and be do not work in same mode, fe meta_service_endpoint: {},"
+                    " be meta_service_endpoint: {}",
+                    master_info.meta_service_endpoint, config::meta_service_endpoint);
+        }
+
+        if (master_info.__isset.cloud_unique_id && config::cloud_unique_id.empty()) {
+            auto st = config::set_config("cloud_unique_id", master_info.cloud_unique_id, true);
+            LOG(INFO) << "set config cloud_unique_id " << master_info.cloud_unique_id << " " << st;
+        }
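Net effect of this hunk: the BE no longer rejects a cloud_unique_id mismatch here; instead it hard-fails on a meta_service_endpoint mismatch and adopts the FE's cloud_unique_id when its own is unset. Below is a minimal Java sketch of that new control flow — the real code is the C++ above; the `be*` fields and method names are illustrative stand-ins, not Doris APIs:

```java
// Illustrative stand-in for the new BE-side heartbeat check (the real code is
// C++ in be/src/agent/heartbeat_server.cpp; config values are simplified).
public final class HeartbeatCheckSketch {
    static String beMetaServiceEndpoint = "127.0.0.1:5000"; // stands in for config::meta_service_endpoint
    static String beCloudUniqueId = "";                     // stands in for config::cloud_unique_id

    /** Returns null on success, or an error message mirroring Status::InvalidArgument. */
    static String onHeartbeat(String feMetaServiceEndpoint, String feCloudUniqueId) {
        // 1. An endpoint mismatch is fatal: FE and BE must point at the same meta service.
        if (!feMetaServiceEndpoint.equals(beMetaServiceEndpoint)) {
            return String.format("fe and be do not work in same mode, fe meta_service_endpoint: %s,"
                    + " be meta_service_endpoint: %s", feMetaServiceEndpoint, beMetaServiceEndpoint);
        }
        // 2. An unset BE cloud_unique_id is no longer an error: adopt the FE's value.
        if (feCloudUniqueId != null && beCloudUniqueId.isEmpty()) {
            beCloudUniqueId = feCloudUniqueId;
        }
        return null;
    }

    public static void main(String[] args) {
        System.out.println(onHeartbeat("127.0.0.1:5000", "1:42:fe")); // null; BE adopts the id
        System.out.println(onHeartbeat("10.0.0.9:5000", "1:42:fe"));  // mismatch error
    }
}
```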
fe/fe-core/src/main/java/org/apache/doris/cloud/catalog/CloudClusterChecker.java
@@ -52,6 +52,8 @@ public class CloudClusterChecker extends MasterDaemon {

private CloudSystemInfoService cloudSystemInfoService;

boolean isUpdateCloudUniqueId = false;

public CloudClusterChecker(CloudSystemInfoService cloudSystemInfoService) {
super("cloud cluster check", Config.cloud_cluster_check_interval_second * 1000L);
this.cloudSystemInfoService = cloudSystemInfoService;
@@ -394,6 +396,21 @@ private void checkCloudFes() {
List<Frontend> toAdd = new ArrayList<>();
List<Frontend> toDel = new ArrayList<>();
List<Cloud.NodeInfoPB> expectedFes = cpb.getNodesList();

if (!isUpdateCloudUniqueId) {
// Runs only once, and the number of FEs is small, so iterating is fine.
for (Frontend fe : currentFes) {
for (Cloud.NodeInfoPB node : expectedFes) {
if (fe.getHost().equals(Config.enable_fqdn_mode ? node.getHost() : node.getIp())
&& fe.getEditLogPort() == node.getEditLogPort()) {
fe.setCloudUniqueId(node.getCloudUniqueId());
break;
}
}
}
isUpdateCloudUniqueId = true;
}

diffNodes(toAdd, toDel, () -> {
// memory
Map<String, Frontend> currentMap = new HashMap<>();
Expand Down Expand Up @@ -432,6 +449,7 @@ private void checkCloudFes() {
Frontend fe = new Frontend(role,
CloudEnv.genFeNodeNameFromMeta(host, node.getEditLogPort(),
node.getCtime() * 1000), host, node.getEditLogPort());
fe.setCloudUniqueId(node.getCloudUniqueId());
// add type to map key, for diff
endpoint = endpoint + "_" + fe.getRole();
nodeMap.put(endpoint, fe);
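The one-time backfill above pairs each live FE with its meta-service node by host (or IP, depending on enable_fqdn_mode) plus edit-log port. A hedged, self-contained sketch of that matching — `Fe` and `Node` are simplified stand-ins for Frontend and Cloud.NodeInfoPB, not the real types:

```java
import java.util.List;
import java.util.Optional;

// Simplified stand-ins for org.apache.doris.system.Frontend and Cloud.NodeInfoPB.
record Fe(String host, int editLogPort) {}
record Node(String host, String ip, int editLogPort, String cloudUniqueId) {}

final class BackfillSketch {
    // Mirrors the O(n*m) scan above: acceptable because it runs once and FE counts are small.
    static Optional<String> cloudUniqueIdFor(Fe fe, List<Node> nodes, boolean fqdnMode) {
        return nodes.stream()
                .filter(n -> fe.host().equals(fqdnMode ? n.host() : n.ip())
                        && fe.editLogPort() == n.editLogPort())
                .map(Node::cloudUniqueId)
                .findFirst();
    }

    public static void main(String[] args) {
        List<Node> nodes = List.of(new Node("fe-1.internal", "10.0.0.1", 9010, "1:42:fe"));
        System.out.println(cloudUniqueIdFor(new Fe("10.0.0.1", 9010), nodes, false)); // Optional[1:42:fe]
    }
}
```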
fe/fe-core/src/main/java/org/apache/doris/cloud/catalog/CloudEnv.java
@@ -38,9 +38,11 @@
import org.apache.doris.common.MetaNotFoundException;
import org.apache.doris.common.UserException;
import org.apache.doris.common.io.CountingDataOutputStream;
import org.apache.doris.common.util.NetUtils;
import org.apache.doris.ha.FrontendNodeType;
import org.apache.doris.mysql.privilege.PrivPredicate;
import org.apache.doris.qe.ConnectContext;
import org.apache.doris.system.Frontend;
import org.apache.doris.system.SystemInfoService.HostInfo;

import com.google.common.base.Preconditions;
@@ -71,6 +73,8 @@ public class CloudEnv extends Env {

private CleanCopyJobScheduler cleanCopyJobScheduler;

private String cloudInstanceId;

public CloudEnv(boolean isCheckpointCatalog) {
super(isCheckpointCatalog);
this.cleanCopyJobScheduler = new CleanCopyJobScheduler();
@@ -92,7 +96,11 @@ public CloudUpgradeMgr getCloudUpgradeMgr() {
}

    public String getCloudInstanceId() {
-        return String.valueOf(Config.cluster_id);
+        return cloudInstanceId;
    }
+
+    private void setCloudInstanceId(String cloudInstanceId) {
+        this.cloudInstanceId = cloudInstanceId;
+    }

@Override
@@ -102,12 +110,17 @@ public void initialize(String[] args) throws Exception {
+ "in cloud mode, because FE should known to which it belongs");
}

-        if (Strings.isNullOrEmpty(Config.cloud_unique_id)) {
-            Config.cloud_unique_id = "1:" + getCloudInstanceId() + ":sqlserver";
+        if (Config.cluster_id != -1) {
+            setCloudInstanceId(String.valueOf(Config.cluster_id));
+        }
+
+        if (Strings.isNullOrEmpty(Config.cloud_unique_id) && !Strings.isNullOrEmpty(cloudInstanceId)) {
+            Config.cloud_unique_id = "1:" + cloudInstanceId + ":fe";
            LOG.info("cloud_unique_id is empty, setting it to: {}", Config.cloud_unique_id);
        }

-        LOG.info("Initializing CloudEnv with cloud_unique_id: {}", Config.cloud_unique_id);
+        LOG.info("Initializing CloudEnv with cloud_unique_id: {}, cluster_id: {}, cloudInstanceId: {}",
+                Config.cloud_unique_id, Config.cluster_id, cloudInstanceId);

super.initialize(args);
}
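So after this change the FE derives cloudInstanceId from Config.cluster_id (when set) and synthesizes a default cloud_unique_id of the form 1:&lt;instance_id&gt;:fe — replacing the earlier :sqlserver suffix — only when the config is empty and an instance id exists. A small sketch of that decision, with the config fields passed in as plain parameters (names here are illustrative):

```java
final class CloudUniqueIdDefault {
    /** Returns the cloud_unique_id to use; null means "leave unset". */
    static String resolve(String configuredCloudUniqueId, int clusterId) {
        if (configuredCloudUniqueId != null && !configuredCloudUniqueId.isEmpty()) {
            return configuredCloudUniqueId; // an explicit config always wins
        }
        if (clusterId == -1) {
            return null; // no cluster_id, so no instance id to synthesize from
        }
        return "1:" + clusterId + ":fe"; // "1:<instance_id>:fe"
    }

    public static void main(String[] args) {
        System.out.println(resolve("", 42));       // 1:42:fe
        System.out.println(resolve("", -1));       // null
        System.out.println(resolve("1:7:fe", 42)); // 1:7:fe
    }
}
```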
Expand Down Expand Up @@ -231,10 +244,20 @@ protected void getClusterIdAndRole() throws IOException {
}
continue;
}

type = nodeInfoPB.getNodeType();
break;
}

try {
String instanceId = getCloudSystemInfoService().getInstanceId(Config.cloud_unique_id);
setCloudInstanceId(instanceId);
} catch (IOException e) {
LOG.error("Failed to get instance ID from cloud_unique_id: {}", Config.cloud_unique_id, e);
throw e;
}

LOG.info("current fe's role is {}", type == NodeInfoPB.NodeType.FE_MASTER ? "MASTER" :
type == NodeInfoPB.NodeType.FE_FOLLOWER ? "FOLLOWER" :
type == NodeInfoPB.NodeType.FE_OBSERVER ? "OBSERVER" : "UNKNOWN");
@@ -402,7 +425,17 @@ public void dropFrontend(FrontendNodeType role, String host, int port) throws DdlException {
throw new DdlException("can not drop current master node.");
}

-        getCloudSystemInfoService().dropFrontend(role, host, port);
+        Frontend frontend = checkFeExist(host, port);
+        if (frontend == null) {
+            throw new DdlException("Frontend does not exist.");
+        }
+
+        if (frontend.getRole() != role) {
+            throw new DdlException(role.toString() + " does not exist[" + NetUtils
+                    .getHostPortInAccessibleFormat(host, port) + "]");
+        }
+
+        getCloudSystemInfoService().dropFrontend(frontend);
}

@Override
fe/fe-core/src/main/java/org/apache/doris/cloud/system/CloudSystemInfoService.java
@@ -52,6 +52,7 @@
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Comparator;
import java.util.HashSet;
@@ -794,18 +795,29 @@ public Map<String, String> getCloudClusterNameToId() {

// FrontendCluster = SqlServerCluster
    private void alterFrontendCluster(FrontendNodeType role, String host, int editLogPort,
-            Cloud.AlterClusterRequest.Operation op) throws DdlException {
+            String cloudUniqueId, Cloud.AlterClusterRequest.Operation op) throws DdlException {
if (Strings.isNullOrEmpty(((CloudEnv) Env.getCurrentEnv()).getCloudInstanceId())) {
throw new DdlException("unable to alter frontend due to empty cloud_instance_id");
}

Cloud.NodeInfoPB.NodeType nodeType;
if (role == FrontendNodeType.MASTER) {
nodeType = Cloud.NodeInfoPB.NodeType.FE_MASTER;
} else if (role == FrontendNodeType.FOLLOWER) {
nodeType = Cloud.NodeInfoPB.NodeType.FE_FOLLOWER;
} else if (role == FrontendNodeType.OBSERVER) {
nodeType = Cloud.NodeInfoPB.NodeType.FE_OBSERVER;
} else {
throw new DdlException("unable to alter frontend due to invalid role");
}

        // Issue an RPC to the meta service to add this node; the FE master will then add it to its frontends.
        Cloud.NodeInfoPB nodeInfoPB = Cloud.NodeInfoPB.newBuilder()
+                .setCloudUniqueId(cloudUniqueId)
                .setIp(host)
                .setHost(host)
                .setEditLogPort(editLogPort)
-                .setNodeType(role == FrontendNodeType.MASTER ? Cloud.NodeInfoPB.NodeType.FE_MASTER
-                        : Cloud.NodeInfoPB.NodeType.FE_OBSERVER)
+                .setNodeType(nodeType)
.setCtime(System.currentTimeMillis() / 1000)
.build();

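Note that alterFrontendCluster now maps all three FE roles explicitly and rejects anything else, where the old ternary silently mapped every non-MASTER role to FE_OBSERVER. A standalone sketch of the mapping — the enums below are simplified stand-ins whose names mirror the PB node types:

```java
// Simplified stand-ins for FrontendNodeType and Cloud.NodeInfoPB.NodeType.
enum FeRole { MASTER, FOLLOWER, OBSERVER, UNKNOWN }
enum PbNodeType { FE_MASTER, FE_FOLLOWER, FE_OBSERVER }

final class RoleMappingSketch {
    static PbNodeType toNodeType(FeRole role) {
        switch (role) {
            case MASTER:   return PbNodeType.FE_MASTER;
            case FOLLOWER: return PbNodeType.FE_FOLLOWER;
            case OBSERVER: return PbNodeType.FE_OBSERVER;
            default:       // e.g. UNKNOWN: fail fast instead of defaulting to observer
                throw new IllegalArgumentException("unable to alter frontend due to invalid role: " + role);
        }
    }

    public static void main(String[] args) {
        System.out.println(toNodeType(FeRole.FOLLOWER)); // FE_FOLLOWER
    }
}
```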
@@ -840,11 +852,12 @@ public void addFrontend(FrontendNodeType role, String host, int editLogPort) throws DdlException {
Cloud.AlterClusterRequest.Operation op;
op = role == FrontendNodeType.MASTER ? Cloud.AlterClusterRequest.Operation.ADD_CLUSTER
: Cloud.AlterClusterRequest.Operation.ADD_NODE;
-        alterFrontendCluster(role, host, editLogPort, op);
+        alterFrontendCluster(role, host, editLogPort, Config.cloud_unique_id, op);
}

-    public void dropFrontend(FrontendNodeType role, String host, int editLogPort) throws DdlException {
-        alterFrontendCluster(role, host, editLogPort, Cloud.AlterClusterRequest.Operation.DROP_NODE);
+    public void dropFrontend(Frontend frontend) throws DdlException {
+        alterFrontendCluster(frontend.getRole(), frontend.getHost(), frontend.getEditLogPort(),
+                frontend.getCloudUniqueId(), Cloud.AlterClusterRequest.Operation.DROP_NODE);
}

private String tryCreateComputeGroup(String clusterName, String computeGroupId) throws UserException {
@@ -1096,4 +1109,24 @@ public void tryCreateInstance(String instanceId, String name, boolean sseEnabled) throws DdlException {
throw new DdlException("Failed to create instance");
}
}

public String getInstanceId(String cloudUniqueId) throws IOException {
Cloud.GetInstanceRequest.Builder builder = Cloud.GetInstanceRequest.newBuilder();
builder.setCloudUniqueId(cloudUniqueId);

Cloud.GetInstanceResponse response;
try {
Cloud.GetInstanceRequest request = builder.build();
response = MetaServiceProxy.getInstance().getInstance(request);
LOG.info("get instance info, request: {}, response: {}", request, response);
if (response.getStatus().getCode() != Cloud.MetaServiceCode.OK) {
LOG.warn("Failed to get instance info, response: {}", response);
throw new IOException("Failed to get instance info");
}
return response.getInstance().getInstanceId();
} catch (RpcException e) {
LOG.warn("Failed to get instance info {}", cloudUniqueId, e);
throw new IOException("Failed to get instance info");
}
}
}
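getInstanceId normalizes both failure modes — an RpcException and a non-OK meta-service status — into IOException, so getClusterIdAndRole has only one exception type to handle. A hedged sketch of that wrap-and-rethrow shape, with the proxy and protobuf types mocked out as minimal stand-ins:

```java
import java.io.IOException;

// Simplified stand-ins for MetaServiceProxy and Cloud.GetInstanceResponse.
record InstanceResponse(boolean ok, String instanceId) {}

interface MetaProxy {
    InstanceResponse getInstance(String cloudUniqueId) throws Exception; // Exception stands in for RpcException
}

final class InstanceIdSketch {
    static String getInstanceId(MetaProxy proxy, String cloudUniqueId) throws IOException {
        try {
            InstanceResponse resp = proxy.getInstance(cloudUniqueId);
            if (!resp.ok()) {
                throw new IOException("Failed to get instance info"); // non-OK meta-service status
            }
            return resp.instanceId();
        } catch (IOException e) {
            throw e; // already normalized
        } catch (Exception e) {
            throw new IOException("Failed to get instance info", e); // RPC failure
        }
    }
}
```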
11 changes: 11 additions & 0 deletions fe/fe-core/src/main/java/org/apache/doris/system/Frontend.java
@@ -47,6 +47,9 @@ public class Frontend implements Writable {
// used for getIpByHostname
@SerializedName("editLogPort")
private int editLogPort;
@SerializedName("cloudUniqueId")
private String cloudUniqueId;

private String version;

private int queryPort;
@@ -141,6 +144,14 @@ public List<FeDiskInfo> getDiskInfos() {
return diskInfos;
}

public void setCloudUniqueId(String cloudUniqueId) {
this.cloudUniqueId = cloudUniqueId;
}

public String getCloudUniqueId() {
return cloudUniqueId;
}

/**
 * Handle Frontend's heartbeat response. Because the replayed journal id is very likely to
 * change at each heartbeat response, we simply return true if the heartbeat status is OK.
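Since the new cloudUniqueId field carries @SerializedName, it is serialized and replayed with the rest of the Frontend metadata. A minimal Gson round-trip illustrating that behavior — the class below is a cut-down stand-in for Frontend, not the real type, and assumes Gson (the library these annotations come from) is on the classpath:

```java
import com.google.gson.Gson;
import com.google.gson.annotations.SerializedName;

// Cut-down stand-in for org.apache.doris.system.Frontend.
final class FrontendSketch {
    @SerializedName("editLogPort")
    int editLogPort = 9010;
    @SerializedName("cloudUniqueId")
    String cloudUniqueId = "1:42:fe";
}

final class PersistSketch {
    public static void main(String[] args) {
        Gson gson = new Gson();
        String json = gson.toJson(new FrontendSketch());
        System.out.println(json); // {"editLogPort":9010,"cloudUniqueId":"1:42:fe"}
        FrontendSketch back = gson.fromJson(json, FrontendSketch.class);
        System.out.println(back.cloudUniqueId); // 1:42:fe
    }
}
```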
test_sql_mode_node_mgr.groovy
@@ -26,6 +26,8 @@ suite('test_sql_mode_node_mgr', 'docker,p1') {
def clusterOptions = [
new ClusterOptions(),
new ClusterOptions(),
new ClusterOptions(),
new ClusterOptions(),
]

for (options in clusterOptions) {
@@ -40,12 +42,22 @@
"heartbeat_interval_second=1",]
}

clusterOptions[0].sqlModeNodeMgr = true;
clusterOptions[0].beClusterId = true;
clusterOptions[0].beMetaServiceEndpoint = true;

clusterOptions[1].sqlModeNodeMgr = true;
clusterOptions[1].beClusterId = false;
clusterOptions[1].beMetaServiceEndpoint = false;

clusterOptions[2].sqlModeNodeMgr = false;
clusterOptions[2].beClusterId = true;
clusterOptions[2].beMetaServiceEndpoint = true;

clusterOptions[3].sqlModeNodeMgr = false;
clusterOptions[3].beClusterId = false;
clusterOptions[3].beMetaServiceEndpoint = false;

for (options in clusterOptions) {
docker(options) {
logger.info("docker started");
@@ -217,8 +229,8 @@
assert computeGroups.size() >= 2, "Expected at least 2 compute groups, but got ${computeGroups.size()}"

// Verify that we have a 'default_compute_group' and 'another_compute_group'
-        def defaultGroup = computeGroups.find { it['Name'] == 'default_compute_group' }
-        def anotherGroup = computeGroups.find { it['Name'] == 'another_compute_group' }
+        def defaultGroup = computeGroups.find { it['IsCurrent'] == "TRUE" }
+        def anotherGroup = computeGroups.find { it['IsCurrent'] == "FALSE" }

assert defaultGroup != null, "Expected to find 'default_compute_group'"
assert anotherGroup != null, "Expected to find 'another_compute_group'"
@@ -242,11 +254,12 @@

def feHost = feToDropMap['Host']
def feEditLogPort = feToDropMap['EditLogPort']
def feRole = feToDropMap['Role']

logger.info("Dropping non-master frontend: {}:{}", feHost, feEditLogPort)

// Drop the selected non-master frontend
sql """ ALTER SYSTEM DROP FOLLOWER "${feHost}:${feEditLogPort}"; """
sql """ ALTER SYSTEM DROP ${feRole} "${feHost}:${feEditLogPort}"; """

// Wait for the frontend to be fully dropped
maxWaitSeconds = 300
@@ -310,6 +323,57 @@

logger.info("Frontend successfully added back and cluster status verified")

// CASE 6. Drop frontend and add back again
logger.info("Dropping frontend and adding back again")

// Get the frontend to be dropped
def frontendToDrop = frontends.find { it['Host'] == feHost && it['EditLogPort'] == feEditLogPort }
assert frontendToDrop != null, "Could not find the frontend to drop"

// Drop the frontend
sql """ ALTER SYSTEM DROP FOLLOWER "${feHost}:${feEditLogPort}"; """

// Wait for the frontend to be fully dropped
maxWaitSeconds = 300
waited = 0
while (waited < maxWaitSeconds) {
def updatedFrontends = sql_return_maparray("SHOW FRONTENDS")
if (!updatedFrontends.any { it['Host'] == feHost && it['EditLogPort'] == feEditLogPort }) {
logger.info("Frontend successfully dropped")
break
}
sleep(10000)
waited += 10
}

if (waited >= maxWaitSeconds) {
throw new Exception("Timeout waiting for frontend to be dropped")
}

// Add the frontend back
sql """ ALTER SYSTEM ADD FOLLOWER "${feHost}:${feEditLogPort}"; """

// Wait for the frontend to be fully added back
maxWaitSeconds = 300
waited = 0
while (waited < maxWaitSeconds) {
def updatedFrontends = sql_return_maparray("SHOW FRONTENDS")
if (updatedFrontends.any { it['Host'] == feHost && it['EditLogPort'] == feEditLogPort }) {
logger.info("Frontend successfully added back")
break
}
sleep(10000)
waited += 10
}

if (waited >= maxWaitSeconds) {
throw new Exception("Timeout waiting for frontend to be added back")
}

// Verify cluster status after adding the frontend back
checkClusterStatus(3, 3, 5)

logger.info("Frontend successfully added back and cluster status verified")

// CASE 7. FE cannot drop itself.
// 7. Attempt to drop the master FE and expect an exception
logger.info("Attempting to drop the master frontend")
