Skip to content

Commit

Permalink
use cluste_id instead of cloud_instance_id
Browse files Browse the repository at this point in the history
and support compute groups.
  • Loading branch information
dataroaring committed Sep 12, 2024
1 parent 1c7bbef commit 78609f6
Show file tree
Hide file tree
Showing 16 changed files with 127 additions and 40 deletions.
12 changes: 4 additions & 8 deletions docker/runtime/doris-compose/cluster.py
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,8 @@

IP_PART4_SIZE = 200

CLUSTER_ID = "12345678"

LOG = utils.get_logger()


Expand Down Expand Up @@ -412,7 +414,7 @@ def get_add_init_config(self):

if self.cluster.sql_mode_node_mgr:
cfg += [
"cluster_id = " + self.cluster_id(),
"cluster_id = " + CLUSTER_ID,
]
else:
cfg += [
Expand All @@ -439,9 +441,6 @@ def docker_env(self):
def cloud_unique_id(self):
return "sql_server_{}".format(self.id)

def cluster_id(self):
return 123456;

def entrypoint(self):
return ["bash", os.path.join(DOCKER_RESOURCE_PATH, "init_fe.sh")]

Expand Down Expand Up @@ -486,7 +485,7 @@ def get_add_init_config(self):
]
if self.cluster.be_cluster_id:
cfg += [
"cluster_id = " + self.cluster_id(),
"cluster_id = " + CLUSTER_ID,
]
if not self.cluster.sql_mode_node_mgr:
cfg += [
Expand Down Expand Up @@ -553,9 +552,6 @@ def docker_env(self):
def cloud_unique_id(self):
return "compute_node_{}".format(self.id)

def cluster_id(self):
return 12345678;

def docker_home_dir(self):
return os.path.join(DOCKER_DORIS_PATH, "be")

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2883,6 +2883,8 @@ public static int metaServiceRpcRetryTimes() {
@ConfField
public static boolean enable_cloud_snapshot_version = true;

// Interval in seconds for checking the status of compute groups (cloud clusters).
// Compute groups and cloud clusters refer to the same concept.
@ConfField
public static int cloud_cluster_check_interval_second = 10;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -183,8 +183,8 @@ unsupportedOtherStatement
| UNINSTALL PLUGIN name=identifierOrText #uninstallPlugin
| LOCK TABLES (lockTable (COMMA lockTable)*)? #lockTables
| UNLOCK TABLES #unlockTables
| WARM UP CLUSTER destination=identifier WITH
(CLUSTER source=identifier | (warmUpItem (COMMA warmUpItem)*)) FORCE? #warmUpCluster
| WARM UP (CLUSTER | (COMPUTE GROUP)) destination=identifier WITH
(CLUSTER | (COMPUTE_GROUP) source=identifier | (warmUpItem (COMMA warmUpItem)*)) FORCE? #warmUpCluster
| BACKUP SNAPSHOT label=multipartIdentifier TO repo=identifier
((ON | EXCLUDE) LEFT_PAREN baseTableRef (COMMA baseTableRef)* RIGHT_PAREN)?
properties=propertyClause? #backup
Expand Down Expand Up @@ -307,7 +307,7 @@ unsupportedShowStatement
| (FROM tableName=multipartIdentifier (ALL VERBOSE?)?))? #showQueryStats
| SHOW BUILD INDEX ((FROM | IN) database=multipartIdentifier)?
wildWhere? sortClause? limitClause? #showBuildIndex
| SHOW CLUSTERS #showClusters
| SHOW (CLUSTERS | (COMPUTE GROUPS)) #showClusters
| SHOW CONVERT_LSC ((FROM | IN) database=multipartIdentifier)? #showConvertLsc
| SHOW REPLICA STATUS FROM baseTableRef wildWhere? #showReplicaStatus
| SHOW REPLICA DISTRIBUTION FROM baseTableRef #showREplicaDistribution
Expand Down Expand Up @@ -495,13 +495,13 @@ unsupportedGrantRevokeStatement
: GRANT privilegeList ON multipartIdentifierOrAsterisk
TO (userIdentify | ROLE STRING_LITERAL) #grantTablePrivilege
| GRANT privilegeList ON
(RESOURCE | CLUSTER | STAGE | STORAGE VAULT | WORKLOAD GROUP)
(RESOURCE | CLUSTER | (COMPUTE GROUP) | STAGE | STORAGE VAULT | WORKLOAD GROUP)
identifierOrTextOrAsterisk TO (userIdentify | ROLE STRING_LITERAL) #grantResourcePrivilege
| GRANT roles+=STRING_LITERAL (COMMA roles+=STRING_LITERAL)* TO userIdentify #grantRole
| REVOKE privilegeList ON multipartIdentifierOrAsterisk
FROM (userIdentify | ROLE STRING_LITERAL) #grantTablePrivilege
| REVOKE privilegeList ON
(RESOURCE | CLUSTER | STAGE | STORAGE VAULT | WORKLOAD GROUP)
(RESOURCE | CLUSTER | (COMPUTE GROUP) | STAGE | STORAGE VAULT | WORKLOAD GROUP)
identifierOrTextOrAsterisk FROM (userIdentify | ROLE STRING_LITERAL) #grantResourcePrivilege
| REVOKE roles+=STRING_LITERAL (COMMA roles+=STRING_LITERAL)* FROM userIdentify #grantRole
;
Expand Down Expand Up @@ -1820,6 +1820,7 @@ nonReserved
| COMPACT
| COMPLETE
| COMPRESS_TYPE
| COMPUTE
| CONDITIONS
| CONFIG
| CONNECTION
Expand Down Expand Up @@ -1895,6 +1896,7 @@ nonReserved
| GENERIC
| GLOBAL
| GRAPH
| GROUP
| GROUPING
| GROUPS
| HASH
Expand Down
44 changes: 42 additions & 2 deletions fe/fe-core/src/main/cup/sql_parser.cup
Original file line number Diff line number Diff line change
Expand Up @@ -319,6 +319,7 @@ terminal String
KW_COMPACT,
KW_COMPLETE,
KW_COMPRESS_TYPE,
KW_COMPUTE,
KW_CONFIG,
KW_CONNECTION,
KW_CONNECTION_ID,
Expand Down Expand Up @@ -1481,6 +1482,14 @@ warm_up_stmt ::=
{:
RESULT = new WarmUpClusterStmt(dstClusterName, list, force);
:}
| KW_WARM KW_UP KW_COMPUTE KW_GROUP ident:dstClusterName KW_WITH KW_COMPUTE KW_GROUP ident:srcClusterName opt_force:force
{:
RESULT = new WarmUpClusterStmt(dstClusterName, srcClusterName, force);
:}
| KW_WARM KW_UP KW_COMPUTE KW_GROUP ident:dstClusterName KW_WITH warm_up_list:list opt_force:force
{:
RESULT = new WarmUpClusterStmt(dstClusterName, list, force);
:}
;

warm_up_item ::=
Expand Down Expand Up @@ -3050,6 +3059,14 @@ grant_stmt ::=
{:
RESULT = new GrantStmt(null, role, resourcePattern, privs, ResourceTypeEnum.CLUSTER);
:}
| KW_GRANT privilege_list:privs KW_ON KW_COMPUTE KW_GROUP resource_pattern:resourcePattern KW_TO user_identity:userId
{:
RESULT = new GrantStmt(userId, null, resourcePattern, privs, ResourceTypeEnum.CLUSTER);
:}
| KW_GRANT privilege_list:privs KW_ON KW_COMPUTE KW_GROUP resource_pattern:resourcePattern KW_TO KW_ROLE STRING_LITERAL:role
{:
RESULT = new GrantStmt(null, role, resourcePattern, privs, ResourceTypeEnum.CLUSTER);
:}
| KW_GRANT privilege_list:privs KW_ON KW_STAGE resource_pattern:resourcePattern KW_TO user_identity:userId
{:
RESULT = new GrantStmt(userId, null, resourcePattern, privs, ResourceTypeEnum.STAGE);
Expand Down Expand Up @@ -3154,6 +3171,14 @@ revoke_stmt ::=
{:
RESULT = new RevokeStmt(null, role, resourcePattern, privs, ResourceTypeEnum.CLUSTER);
:}
| KW_REVOKE privilege_list:privs KW_ON KW_COMPUTE KW_GROUP resource_pattern:resourcePattern KW_FROM user_identity:userId
{:
RESULT = new RevokeStmt(userId, null, resourcePattern, privs, ResourceTypeEnum.CLUSTER);
:}
| KW_REVOKE privilege_list:privs KW_ON KW_COMPUTE KW_GROUP resource_pattern:resourcePattern KW_FROM KW_ROLE STRING_LITERAL:role
{:
RESULT = new RevokeStmt(null, role, resourcePattern, privs, ResourceTypeEnum.CLUSTER);
:}
| KW_REVOKE privilege_list:privs KW_ON KW_STAGE resource_pattern:resourcePattern KW_FROM user_identity:userId
{:
RESULT = new RevokeStmt(userId, null, resourcePattern, privs, ResourceTypeEnum.STAGE);
Expand Down Expand Up @@ -3483,6 +3508,10 @@ opt_cluster_keys ::=
{:
RESULT = keys;
:}
| KW_COMPUTE KW_GROUP KW_BY LPAREN ident_list:keys RPAREN
{:
RESULT = keys;
:}
;

opt_all_partition_desc_list ::=
Expand Down Expand Up @@ -4504,9 +4533,13 @@ show_param ::=
{:
RESULT = new ShowWorkloadGroupsStmt(parser.wild, parser.where);
:}
| KW_BACKENDS KW_VERBOSE
{:
RESULT = new ShowBackendsStmt(true);
:}
| KW_BACKENDS
{:
RESULT = new ShowBackendsStmt();
RESULT = new ShowBackendsStmt(false);
:}
| KW_TRASH KW_ON STRING_LITERAL:backend
{:
Expand Down Expand Up @@ -4693,7 +4726,12 @@ show_param ::=
/* Cloud Cluster */
| KW_CLUSTERS
{:
RESULT = new ShowClusterStmt();
RESULT = new ShowClusterStmt(false);
:}
/* Cloud Cluster */
| KW_COMPUTE KW_GROUPS
{:
RESULT = new ShowClusterStmt(true);
:}
| KW_CONVERT_LSC opt_db:db
{:
Expand Down Expand Up @@ -8422,6 +8460,8 @@ keyword ::=
{: RESULT = id; :}
| KW_CLUSTERS:id
{: RESULT = id; :}
| KW_COMPUTE:id
{: RESULT = id; :}
| KW_LINK:id
{: RESULT = id; :}
| KW_MIGRATE:id
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,10 @@

public class ShowBackendsStmt extends ShowStmt implements NotFallbackInParser {

public ShowBackendsStmt() {
private boolean verbose = false;

public ShowBackendsStmt(boolean verbose) {
verbose = verbose;
}

@Override
Expand All @@ -44,6 +47,10 @@ public void analyze(Analyzer analyzer) throws UserException {
}
}

public boolean isVerbose() {
return verbose;
}

@Override
public ShowResultSetMetaData getMetaData() {
ShowResultSetMetaData.Builder builder = ShowResultSetMetaData.builder();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,9 +34,9 @@ public class ShowCloudWarmUpStmt extends ShowStmt implements NotFallbackInParser
private boolean showAllJobs = false;
private long jobId = -1;

private static final ImmutableList<String> TITLE_NAMES = new ImmutableList.Builder<String>()
private static final ImmutableList<String> COMPUTE_GROUP_TITLE_NAMES = new ImmutableList.Builder<String>()
.add("JobId")
.add("ClusterName")
.add("ComputeGroup")
.add("Status")
.add("Type")
.add("CreateTime")
Expand Down Expand Up @@ -116,7 +116,7 @@ public String toString() {
@Override
public ShowResultSetMetaData getMetaData() {
ShowResultSetMetaData.Builder builder = ShowResultSetMetaData.builder();
for (String title : ShowCloudWarmUpStmt.TITLE_NAMES) {
for (String title : ShowCloudWarmUpStmt.COMPUTE_GROUP_TITLE_NAMES) {
builder.addColumn(new Column(title, ScalarType.createVarchar(30)));
}
return builder.build();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,18 +34,28 @@
import com.google.common.collect.ImmutableList;

public class ShowClusterStmt extends ShowStmt implements NotFallbackInParser {
public static final ImmutableList<String> TITLE_NAMES = new ImmutableList.Builder<String>()
public static final ImmutableList<String> CLUSTER_TITLE_NAMES = new ImmutableList.Builder<String>()
.add("cluster").add("is_current").add("users").build();

public ShowClusterStmt() {
public static final ImmutableList<String> COMPUTE_GROUP_TITLE_NAMES = new ImmutableList.Builder<String>()
.add("Name").add("IsCurrent").add("Users").build();

boolean isComputeGroup = true;

public ShowClusterStmt(boolean isComputeGroup) {
this.isComputeGroup = isComputeGroup;
}

@Override
public ShowResultSetMetaData getMetaData() {
ShowResultSetMetaData.Builder builder = ShowResultSetMetaData.builder();

ImmutableList<String> titleNames = null;
titleNames = TITLE_NAMES;
if (isComputeGroup) {
titleNames = COMPUTE_GROUP_TITLE_NAMES;
} else {
titleNames = CLUSTER_TITLE_NAMES;
}

for (String title : titleNames) {
builder.addColumn(new Column(title, ScalarType.createVarchar(128)));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -359,9 +359,9 @@ private void alterBackendCluster(List<HostInfo> hostInfos, String clusterId,
public void addBackends(List<HostInfo> hostInfos, Map<String, String> tagMap) throws UserException {
// issue rpc to meta to add this node, then fe master would add this node to its backends

String clusterName = tagMap.getOrDefault(Tag.CLOUD_CLUSTER_NAME, Tag.VALUE_DEFAULT_CLOUD_CLUSTER_NAME);
String clusterName = tagMap.getOrDefault(Tag.COMPUTE_GROUP_NAME, Tag.VALUE_DEFAULT_COMPUTE_GROUP_NAME);
if (clusterName.isEmpty()) {
throw new UserException("clusterName empty");
throw new UserException("ComputeGroup'name can not be empty");
}

String clusterId = tryCreateComputeGroup(clusterName, RandomIdentifierGenerator.generateRandomIdentifier(8));
Expand Down Expand Up @@ -693,6 +693,7 @@ public String addCloudCluster(final String clusterName, final String userName) t
List<Backend> backends = new ArrayList<>();
for (Cloud.NodeInfoPB node : cpb.getNodesList()) {
Map<String, String> newTagMap = Tag.DEFAULT_BACKEND_TAG.toMap();

newTagMap.put(Tag.CLOUD_CLUSTER_NAME, clusterNameMeta);
newTagMap.put(Tag.CLOUD_CLUSTER_ID, clusterId);
newTagMap.put(Tag.CLOUD_CLUSTER_STATUS, String.valueOf(clusterStatus));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,7 @@ public ProcResult fetchResult() throws AnalysisException {
BaseProcResult result = new BaseProcResult();
result.setNames(TITLE_NAMES);

final List<List<String>> backendInfos = getBackendInfos();
final List<List<String>> backendInfos = getBackendInfos(false);
for (List<String> backendInfo : backendInfos) {
List<String> oneInfo = new ArrayList<>(backendInfo.size());
oneInfo.addAll(backendInfo);
Expand All @@ -85,7 +85,7 @@ public ProcResult fetchResult() throws AnalysisException {
*
* @return
*/
public static List<List<String>> getBackendInfos() {
public static List<List<String>> getBackendInfos(boolean verbose) {
final SystemInfoService systemInfoService = Env.getCurrentSystemInfo();
List<List<String>> backendInfos = new LinkedList<>();
List<Long> backendIds = systemInfoService.getAllBackendIds(false);
Expand Down Expand Up @@ -155,7 +155,7 @@ public static List<List<String>> getBackendInfos() {
+ totalRemoteUsedCapacity.second);

// tags
backendInfo.add(backend.getTagMapString());
backendInfo.add(backend.getTagMapString(verbose));
// err msg
backendInfo.add(backend.getHeartbeatErrMsg());
// version
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2237,7 +2237,7 @@ private void handleShowExport() throws AnalysisException {

private void handleShowBackends() {
final ShowBackendsStmt showStmt = (ShowBackendsStmt) stmt;
List<List<String>> backendInfos = BackendsProcDir.getBackendInfos();
List<List<String>> backendInfos = BackendsProcDir.getBackendInfos(showStmt.isVerbose());

backendInfos.sort(new Comparator<List<String>>() {
@Override
Expand Down
4 changes: 3 additions & 1 deletion fe/fe-core/src/main/java/org/apache/doris/resource/Tag.java
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,9 @@ public class Tag implements Writable {
public static final String CLOUD_CLUSTER_PRIVATE_ENDPOINT = "cloud_cluster_private_endpoint";
public static final String CLOUD_CLUSTER_STATUS = "cloud_cluster_status";

public static final String VALUE_DEFAULT_CLOUD_CLUSTER_NAME = "default_cluster";
public static final String COMPUTE_GROUP_NAME = "compute_group_name";

public static final String VALUE_DEFAULT_COMPUTE_GROUP_NAME = "default_compute_group";

public static final String WORKLOAD_GROUP = "workload_group";

Expand Down
31 changes: 29 additions & 2 deletions fe/fe-core/src/main/java/org/apache/doris/system/Backend.java
Original file line number Diff line number Diff line change
Expand Up @@ -925,8 +925,35 @@ public TNetworkAddress getArrowFlightAddress() {
return new TNetworkAddress(getHost(), getArrowFlightSqlPort());
}

public String getTagMapString() {
return "{" + new PrintableMap<>(tagMap, ":", true, false).toString() + "}";
// Only used for users, we hide and rename some internal tags.
public String getTagMapString(boolean verbose) {
Map<String, String> displayTagMap = Maps.newHashMap();
displayTagMap.putAll(tagMap);
if (verbose) {
if (displayTagMap.containsKey("cloud_cluster_public_endpoint")) {
displayTagMap.put("public_endpoint", displayTagMap.remove("cloud_cluster_public_endpoint"));
}
if (displayTagMap.containsKey("cloud_cluster_private_endpoint")) {
displayTagMap.put("private_endpoint", displayTagMap.remove("cloud_cluster_private_endpoint"));
}
if (displayTagMap.containsKey("cloud_cluster_status")) {
displayTagMap.put("compute_group_status", displayTagMap.remove("cloud_cluster_status"));
}
if (displayTagMap.containsKey("cloud_cluster_id")) {
displayTagMap.put("compute_group_id", displayTagMap.remove("cloud_cluster_id"));
}
if (displayTagMap.containsKey("cloud_cluster_name")) {
displayTagMap.put("compute_group_name", displayTagMap.remove("cloud_cluster_name"));
}
} else {
displayTagMap.entrySet().removeIf(entry -> entry.getKey().startsWith("cloud_")
&& !entry.getKey().equals("cloud_cluster_name"));
if (displayTagMap.containsKey("cloud_cluster_name")) {
displayTagMap.put("compute_group_name", displayTagMap.remove("cloud_cluster_name"));
}
}

return "{" + new PrintableMap<>(displayTagMap, ":", true, false).toString() + "}";
}

public Long getPublishTaskLastTimeAccumulated() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -423,7 +423,7 @@ private static TFetchSchemaTableDataResult backendsMetadataResult(TMetadataTable
trow.addToColumnValue(new TCell().setLongVal(backend.getRemoteUsedCapacityB()));

// tags
trow.addToColumnValue(new TCell().setStringVal(backend.getTagMapString()));
trow.addToColumnValue(new TCell().setStringVal(backend.getTagMapString(true)));
// err msg
trow.addToColumnValue(new TCell().setStringVal(backend.getHeartbeatErrMsg()));
// version
Expand Down
Loading

0 comments on commit 78609f6

Please sign in to comment.