Skip to content

Commit

Permalink
fix
Browse files Browse the repository at this point in the history
  • Loading branch information
dataroaring committed Sep 14, 2024
1 parent 16ad094 commit 806738a
Show file tree
Hide file tree
Showing 16 changed files with 61 additions and 92 deletions.
31 changes: 10 additions & 21 deletions be/src/agent/heartbeat_server.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -263,30 +263,19 @@ Status HeartbeatServer::_heartbeat(const TMasterInfo& master_info) {
LOG(INFO) << "set config meta_service_endpoing " << master_info.meta_service_endpoint
<< " " << st;
}
}

if (config::cluster_id == -1 && master_info.cluster_id != -1) {
auto st =
config::set_config("cluster_id", std::to_string(master_info.cluster_id), true);
config::set_cloud_unique_id(std::to_string(master_info.cluster_id));
LOG(INFO) << "set config cluster_id " << master_info.cluster_id << " " << st;
}

if (config::cluster_id == -1 && master_info.cluster_id != -1) {
auto st =
config::set_config("cluster_id", std::to_string(master_info.cluster_id), true);
config::set_cloud_unique_id(std::to_string(master_info.cluster_id));
LOG(INFO) << "set config unique_id according to cluster_id " << master_info.cluster_id
<< " " << st;
}

if (config::cluster_id != master_info.cluster_id && master_info.cluster_id != -1) {
LOG(WARNING) << "fe and be run in different cluster, fe in cluster_id: "
<< master_info.cluster_id
<< " while be in cluster_id: " << config::cluster_id;
if (master_info.__isset.cloud_unique_id) {
if (!config::cloud_unique_id.empty() && config::cloud_unique_id != master_info.cloud_unique_id) {
LOG(ERROR) << "Cloud unique ID mismatch between FE and BE. FE cloud unique ID: "
<< master_info.cloud_unique_id << ", BE cloud unique ID: " << config::cloud_unique_id;
return Status::InvalidArgument<false>(
"cluster_id in be and fe are different, fe: {}, be : {}",
master_info.cluster_id, config::cluster_id);
"fe and be do not work with same cloud unique ID, fe cloud unique ID: {},"
" be cloud unique ID: {}",
master_info.cloud_unique_id, config::cloud_unique_id);
}
auto st = config::set_config("cloud_unique_id", master_info.cloud_unique_id, true);
LOG(INFO) << "set config cloud_unique_id " << master_info.cloud_unique_id << " " << st;
}

return Status::OK();
Expand Down
6 changes: 0 additions & 6 deletions be/src/cloud/config.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -71,10 +71,4 @@ DEFINE_mInt32(remove_expired_tablet_txn_info_interval_seconds, "300");

DEFINE_mInt32(tablet_txn_info_min_expired_seconds, "120");

// Derives the `cloud_unique_id` config from `instance_id` when no value has
// been configured yet. The derived value has the form "1:<instance_id>:compute".
//
// The config is left untouched when `cloud_unique_id` is already non-empty, or
// when `instance_id` is empty or equal to the literal string "-1".
void set_cloud_unique_id(std::string instance_id) {
    // An already-populated id always takes precedence.
    if (!cloud_unique_id.empty()) {
        return;
    }

    // Nothing usable to derive an id from.
    if (instance_id.empty() || instance_id == "-1") {
        return;
    }

    const std::string derived_unique_id = "1:" + instance_id + ":compute";
    // The status returned by set_config is intentionally discarded.
    static_cast<void>(set_config("cloud_unique_id", derived_unique_id, true));
}

} // namespace doris::config
2 changes: 0 additions & 2 deletions be/src/cloud/config.h
Original file line number Diff line number Diff line change
Expand Up @@ -29,8 +29,6 @@ static inline bool is_cloud_mode() {
return deploy_mode == "cloud" || !cloud_unique_id.empty();
}

void set_cloud_unique_id(std::string instance_id);

// Set the endpoint of meta service.
//
// If meta services are deployed behind a load balancer, set this config to "host:port" of the load balancer.
Expand Down
5 changes: 1 addition & 4 deletions be/src/common/config.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ DEFINE_String(custom_config_dir, "${DORIS_HOME}/conf");
DEFINE_String(jdbc_drivers_dir, "${DORIS_HOME}/jdbc_drivers");

// cluster id
DEFINE_mInt32(cluster_id, "-1");
DEFINE_Int32(cluster_id, "-1");
// port on which BackendService is exported
DEFINE_Int32(be_port, "9060");

Expand Down Expand Up @@ -1670,9 +1670,6 @@ bool init(const char* conf_file, bool fill_conf_map, bool must_exist, bool set_t
SET_FIELD(it.second, std::vector<std::string>, fill_conf_map, set_to_default);
}

std::string cluster_id_str = std::to_string(cluster_id);
set_cloud_unique_id(cluster_id_str);

return true;
}

Expand Down
2 changes: 1 addition & 1 deletion be/src/common/config.h
Original file line number Diff line number Diff line change
Expand Up @@ -89,7 +89,7 @@ DECLARE_String(custom_config_dir);
DECLARE_String(jdbc_drivers_dir);

// cluster id
DECLARE_mInt32(cluster_id);
DECLARE_Int32(cluster_id);
// port on which BackendService is exported
DECLARE_Int32(be_port);

Expand Down
6 changes: 1 addition & 5 deletions fe/fe-core/src/main/cup/sql_parser.cup
Original file line number Diff line number Diff line change
Expand Up @@ -4534,13 +4534,9 @@ show_param ::=
{:
RESULT = new ShowWorkloadGroupsStmt(parser.wild, parser.where);
:}
| KW_BACKENDS KW_VERBOSE
{:
RESULT = new ShowBackendsStmt(true);
:}
| KW_BACKENDS
{:
RESULT = new ShowBackendsStmt(false);
RESULT = new ShowBackendsStmt();
:}
| KW_TRASH KW_ON STRING_LITERAL:backend
{:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,10 +30,7 @@

public class ShowBackendsStmt extends ShowStmt implements NotFallbackInParser {

private boolean verbose = false;

public ShowBackendsStmt(boolean verbose) {
verbose = verbose;
public ShowBackendsStmt() {
}

@Override
Expand All @@ -47,10 +44,6 @@ public void analyze(Analyzer analyzer) throws UserException {
}
}

public boolean isVerbose() {
return verbose;
}

@Override
public ShowResultSetMetaData getMetaData() {
ShowResultSetMetaData.Builder builder = ShowResultSetMetaData.builder();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -313,19 +313,22 @@ public synchronized void updateFrontends(List<Frontend> toAdd, List<Frontend> to
}
}

private void alterBackendCluster(List<HostInfo> hostInfos, String clusterId,
private void alterBackendCluster(List<HostInfo> hostInfos, String computeGroupId,
Cloud.AlterClusterRequest.Operation operation) throws DdlException {
if (Strings.isNullOrEmpty(((CloudEnv) Env.getCurrentEnv()).getCloudInstanceId())) {
throw new DdlException("unable to alter backends due to empty cloud_instance_id");
}
// Issue rpc to meta to alter node, then fe master would add this node to its frontends
Cloud.ClusterPB clusterPB = Cloud.ClusterPB.newBuilder()
.setClusterId(clusterId)
.setClusterId(computeGroupId)
.setType(Cloud.ClusterPB.Type.COMPUTE)
.build();

for (HostInfo hostInfo : hostInfos) {
String cloudUniqueId = "1" + Config.cluster_id
+ RandomIdentifierGenerator.generateRandomIdentifier(8);
Cloud.NodeInfoPB nodeInfoPB = Cloud.NodeInfoPB.newBuilder()
.setCloudUniqueId(cloudUniqueId)
.setIp(hostInfo.getHost())
.setHost(hostInfo.getHost())
.setHeartbeatPort(hostInfo.getPort())
Expand Down Expand Up @@ -367,8 +370,9 @@ public void addBackends(List<HostInfo> hostInfos, Map<String, String> tagMap) th
throw new UserException("ComputeGroup'name can not be empty");
}

String clusterId = tryCreateComputeGroup(clusterName, RandomIdentifierGenerator.generateRandomIdentifier(8));
alterBackendCluster(hostInfos, clusterId, Cloud.AlterClusterRequest.Operation.ADD_NODE);
String computeGroupId = tryCreateComputeGroup(clusterName,
RandomIdentifierGenerator.generateRandomIdentifier(8));
alterBackendCluster(hostInfos, computeGroupId, Cloud.AlterClusterRequest.Operation.ADD_NODE);
}

// final entry of dropping backend
Expand All @@ -381,28 +385,28 @@ public void dropBackend(String host, int heartbeatPort) throws DdlException {
throw new DdlException("backend does not exists[" + NetUtils
.getHostPortInAccessibleFormat(host, heartbeatPort) + "]");
}
String clusterId = droppedBackend.getTagMap().get(Tag.CLOUD_CLUSTER_ID);
if (clusterId == null || clusterId.isEmpty()) {
String computeGroupId = droppedBackend.getTagMap().get(Tag.CLOUD_CLUSTER_ID);
if (computeGroupId == null || computeGroupId.isEmpty()) {
throw new DdlException("Failed to get cluster ID for backend: " + droppedBackend.getId());
}

List<HostInfo> hostInfos = new ArrayList<>();
hostInfos.add(new HostInfo(host, heartbeatPort));

alterBackendCluster(hostInfos, clusterId, Cloud.AlterClusterRequest.Operation.DROP_NODE);
alterBackendCluster(hostInfos, computeGroupId, Cloud.AlterClusterRequest.Operation.DROP_NODE);
}

@Override
public void decommissionBackend(Backend backend) throws UserException {
String clusterId = backend.getTagMap().get(Tag.CLOUD_CLUSTER_ID);
if (clusterId == null || clusterId.isEmpty()) {
String computeGroupId = backend.getTagMap().get(Tag.CLOUD_CLUSTER_ID);
if (computeGroupId == null || computeGroupId.isEmpty()) {
throw new UserException("Failed to get cluster ID for backend: " + backend.getId());
}

List<HostInfo> hostInfos = new ArrayList<>();
hostInfos.add(new HostInfo(backend.getHost(), backend.getHeartbeatPort()));
try {
alterBackendCluster(hostInfos, clusterId, Cloud.AlterClusterRequest.Operation.DECOMMISSION_NODE);
alterBackendCluster(hostInfos, computeGroupId, Cloud.AlterClusterRequest.Operation.DECOMMISSION_NODE);
} catch (DdlException e) {
String errorMessage = e.getMessage();
LOG.warn("Failed to decommission backend: {}", errorMessage);
Expand Down Expand Up @@ -839,13 +843,13 @@ public void dropFrontend(FrontendNodeType role, String host, int editLogPort) th
alterFrontendCluster(role, host, editLogPort, Cloud.AlterClusterRequest.Operation.DROP_NODE);
}

private String tryCreateComputeGroup(String clusterName, String clusterId) throws UserException {
private String tryCreateComputeGroup(String clusterName, String computeGroupId) throws UserException {
if (Strings.isNullOrEmpty(((CloudEnv) Env.getCurrentEnv()).getCloudInstanceId())) {
throw new DdlException("unable to create compute group due to empty cluster_id");
}

Cloud.ClusterPB clusterPB = Cloud.ClusterPB.newBuilder()
.setClusterId(clusterId)
.setClusterId(computeGroupId)
.setClusterName(clusterName)
.setType(Cloud.ClusterPB.Type.COMPUTE)
.build();
Expand All @@ -868,7 +872,7 @@ private String tryCreateComputeGroup(String clusterName, String clusterId) throw
}

if (response.getStatus().getCode() == Cloud.MetaServiceCode.OK) {
return clusterId;
return computeGroupId;
} else if (response.getStatus().getCode() == Cloud.MetaServiceCode.ALREADY_EXISTED) {
Cloud.GetClusterResponse clusterResponse = getCloudCluster(clusterName, "", "");
if (clusterResponse.getStatus().getCode() == Cloud.MetaServiceCode.OK) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,7 @@ public ProcResult fetchResult() throws AnalysisException {
BaseProcResult result = new BaseProcResult();
result.setNames(TITLE_NAMES);

final List<List<String>> backendInfos = getBackendInfos(false);
final List<List<String>> backendInfos = getBackendInfos();
for (List<String> backendInfo : backendInfos) {
List<String> oneInfo = new ArrayList<>(backendInfo.size());
oneInfo.addAll(backendInfo);
Expand All @@ -85,7 +85,7 @@ public ProcResult fetchResult() throws AnalysisException {
*
* @return
*/
public static List<List<String>> getBackendInfos(boolean verbose) {
public static List<List<String>> getBackendInfos() {
final SystemInfoService systemInfoService = Env.getCurrentSystemInfo();
List<List<String>> backendInfos = new LinkedList<>();
List<Long> backendIds = systemInfoService.getAllBackendIds(false);
Expand Down Expand Up @@ -155,7 +155,7 @@ public static List<List<String>> getBackendInfos(boolean verbose) {
+ totalRemoteUsedCapacity.second);

// tags
backendInfo.add(backend.getTagMapString(verbose));
backendInfo.add(backend.getTagMapString());
// err msg
backendInfo.add(backend.getHeartbeatErrMsg());
// version
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2241,7 +2241,7 @@ private void handleShowExport() throws AnalysisException {

private void handleShowBackends() {
final ShowBackendsStmt showStmt = (ShowBackendsStmt) stmt;
List<List<String>> backendInfos = BackendsProcDir.getBackendInfos(showStmt.isVerbose());
List<List<String>> backendInfos = BackendsProcDir.getBackendInfos();

backendInfos.sort(new Comparator<List<String>>() {
@Override
Expand Down
39 changes: 16 additions & 23 deletions fe/fe-core/src/main/java/org/apache/doris/system/Backend.java
Original file line number Diff line number Diff line change
Expand Up @@ -926,31 +926,24 @@ public TNetworkAddress getArrowFlightAddress() {
}

// Only used for users, we hide and rename some internal tags.
public String getTagMapString(boolean verbose) {
public String getTagMapString() {
Map<String, String> displayTagMap = Maps.newHashMap();
displayTagMap.putAll(tagMap);
if (verbose) {
if (displayTagMap.containsKey("cloud_cluster_public_endpoint")) {
displayTagMap.put("public_endpoint", displayTagMap.remove("cloud_cluster_public_endpoint"));
}
if (displayTagMap.containsKey("cloud_cluster_private_endpoint")) {
displayTagMap.put("private_endpoint", displayTagMap.remove("cloud_cluster_private_endpoint"));
}
if (displayTagMap.containsKey("cloud_cluster_status")) {
displayTagMap.put("compute_group_status", displayTagMap.remove("cloud_cluster_status"));
}
if (displayTagMap.containsKey("cloud_cluster_id")) {
displayTagMap.put("compute_group_id", displayTagMap.remove("cloud_cluster_id"));
}
if (displayTagMap.containsKey("cloud_cluster_name")) {
displayTagMap.put("compute_group_name", displayTagMap.remove("cloud_cluster_name"));
}
} else {
displayTagMap.entrySet().removeIf(entry -> entry.getKey().startsWith("cloud_")
&& !entry.getKey().equals("cloud_cluster_name"));
if (displayTagMap.containsKey("cloud_cluster_name")) {
displayTagMap.put("compute_group_name", displayTagMap.remove("cloud_cluster_name"));
}

if (displayTagMap.containsKey("cloud_cluster_public_endpoint")) {
displayTagMap.put("public_endpoint", displayTagMap.remove("cloud_cluster_public_endpoint"));
}
if (displayTagMap.containsKey("cloud_cluster_private_endpoint")) {
displayTagMap.put("private_endpoint", displayTagMap.remove("cloud_cluster_private_endpoint"));
}
if (displayTagMap.containsKey("cloud_cluster_status")) {
displayTagMap.put("compute_group_status", displayTagMap.remove("cloud_cluster_status"));
}
if (displayTagMap.containsKey("cloud_cluster_id")) {
displayTagMap.put("compute_group_id", displayTagMap.remove("cloud_cluster_id"));
}
if (displayTagMap.containsKey("cloud_cluster_name")) {
displayTagMap.put("compute_group_name", displayTagMap.remove("cloud_cluster_name"));
}

return "{" + new PrintableMap<>(displayTagMap, ":", true, false).toString() + "}";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -250,6 +250,10 @@ public HeartbeatResponse call() {
copiedMasterInfo.setHeartbeatFlags(flags);
copiedMasterInfo.setBackendId(backendId);
copiedMasterInfo.setFrontendInfos(feInfos);
if (Config.isCloudMode()) {
String cloudUniqueId = backend.getTagMap().get(Tag.CLOUD_UNIQUE_ID);
copiedMasterInfo.setCloudUniqueId(cloudUniqueId);
}
THeartbeatResult result;
if (!FeConstants.runningUnitTest) {
client = ClientPool.backendHeartbeatPool.borrowObject(beAddr);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -423,7 +423,7 @@ private static TFetchSchemaTableDataResult backendsMetadataResult(TMetadataTable
trow.addToColumnValue(new TCell().setLongVal(backend.getRemoteUsedCapacityB()));

// tags
trow.addToColumnValue(new TCell().setStringVal(backend.getTagMapString(true)));
trow.addToColumnValue(new TCell().setStringVal(backend.getTagMapString()));
// err msg
trow.addToColumnValue(new TCell().setStringVal(backend.getHeartbeatErrMsg()));
// version
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ public boolean checkGlobalPriv(ConnectContext ctx, PrivPredicate wanted) {
}
};

ShowBackendsStmt stmt = new ShowBackendsStmt(true);
ShowBackendsStmt stmt = new ShowBackendsStmt();
Assertions.assertThrows(AnalysisException.class, () -> stmt.analyze(analyzer));

privilege.set(true);
Expand All @@ -69,15 +69,15 @@ public boolean checkGlobalPriv(ConnectContext ctx, PrivPredicate wanted) {

@Test
public void getMetaData() {
ShowBackendsStmt stmt = new ShowBackendsStmt(true);
ShowBackendsStmt stmt = new ShowBackendsStmt();
ShowResultSetMetaData result = stmt.getMetaData();
Assertions.assertEquals(result.getColumnCount(), 27);
result.getColumns().forEach(col -> Assertions.assertEquals(col.getType(), ScalarType.createVarchar(30)));
}

@Test
public void getRedirectStatus() {
ShowBackendsStmt stmt = new ShowBackendsStmt(true);
ShowBackendsStmt stmt = new ShowBackendsStmt();
Assertions.assertEquals(RedirectStatus.FORWARD_NO_SYNC, stmt.getRedirectStatus());

ctx.getSessionVariable().forwardToMaster = false;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -202,7 +202,7 @@ public void testSerialization() throws Exception {
Assert.assertNotEquals(back1, back2);

Assert.assertTrue(back1.toString().contains("tags: {location=default}"));
Assert.assertEquals("{\"compute\" : \"c1\", \"location\" : \"l1\"}", back2.getTagMapString(true));
Assert.assertEquals("{\"compute\" : \"c1\", \"location\" : \"l1\"}", back2.getTagMapString());

// 3. delete files
dis.close();
Expand Down
1 change: 1 addition & 0 deletions gensrc/thrift/HeartbeatService.thrift
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ struct TMasterInfo {
8: optional i64 backend_id
9: optional list<TFrontendInfo> frontend_infos
10: optional string meta_service_endpoint;
11: optional string cloud_unique_id;
}

struct TBackendInfo {
Expand Down

0 comments on commit 806738a

Please sign in to comment.