Skip to content

Commit

Permalink
1
Browse files Browse the repository at this point in the history
2
  • Loading branch information
SWJTU-ZhangLei committed Mar 19, 2024
1 parent d542248 commit 6a8c95e
Show file tree
Hide file tree
Showing 10 changed files with 294 additions and 123 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -28,10 +28,6 @@
import org.apache.doris.common.DdlException;
import org.apache.doris.common.util.MasterDaemon;
import org.apache.doris.ha.FrontendNodeType;
import org.apache.doris.metric.GaugeMetricImpl;
import org.apache.doris.metric.Metric.MetricUnit;
import org.apache.doris.metric.MetricLabel;
import org.apache.doris.metric.MetricRepo;
import org.apache.doris.resource.Tag;
import org.apache.doris.system.Backend;
import org.apache.doris.system.Frontend;
Expand Down Expand Up @@ -110,7 +106,7 @@ private void checkToAddCluster(Map<String, ClusterPB> remoteClusterIdToPB, Set<S
ClusterStatus clusterStatus = remoteClusterIdToPB.get(addId).hasClusterStatus()
? remoteClusterIdToPB.get(addId).getClusterStatus() : ClusterStatus.NORMAL;
newTagMap.put(Tag.CLOUD_CLUSTER_STATUS, String.valueOf(clusterStatus));
MetricRepo.registerClusterMetrics(clusterName, clusterId);
// MetricRepo.registerClusterMetrics(clusterName, clusterId);
//toAdd.forEach(i -> i.setTagMap(newTagMap));
List<Backend> toAdd = new ArrayList<>();
for (Cloud.NodeInfoPB node : remoteClusterIdToPB.get(addId).getNodesList()) {
Expand Down Expand Up @@ -481,26 +477,26 @@ private void updateCloudMetrics() {
continue;
}
for (Backend backend : bes) {
MetricRepo.CLOUD_CLUSTER_BACKEND_ALIVE.computeIfAbsent(backend.getAddress(), key -> {
GaugeMetricImpl<Integer> backendAlive = new GaugeMetricImpl<>("backend_alive", MetricUnit.NOUNIT,
"backend alive or not");
backendAlive.addLabel(new MetricLabel("cluster_id", entry.getValue()));
backendAlive.addLabel(new MetricLabel("cluster_name", entry.getKey()));
backendAlive.addLabel(new MetricLabel("address", key));
MetricRepo.DORIS_METRIC_REGISTER.addMetrics(backendAlive);
return backendAlive;
}).setValue(backend.isAlive() ? 1 : 0);
// MetricRepo.CLOUD_CLUSTER_BACKEND_ALIVE.computeIfAbsent(backend.getAddress(), key -> {
// GaugeMetricImpl<Integer> backendAlive = new GaugeMetricImpl<>("backend_alive", MetricUnit.NOUNIT,
// "backend alive or not");
// backendAlive.addLabel(new MetricLabel("cluster_id", entry.getValue()));
// backendAlive.addLabel(new MetricLabel("cluster_name", entry.getKey()));
// backendAlive.addLabel(new MetricLabel("address", key));
// MetricRepo.DORIS_METRIC_REGISTER.addMetrics(backendAlive);
// return backendAlive;
// }).setValue(backend.isAlive() ? 1 : 0);
aliveNum = backend.isAlive() ? aliveNum + 1 : aliveNum;
}

MetricRepo.CLOUD_CLUSTER_BACKEND_ALIVE_TOTAL.computeIfAbsent(entry.getKey(), key -> {
GaugeMetricImpl<Long> backendAliveTotal = new GaugeMetricImpl<>("backend_alive_total",
MetricUnit.NOUNIT, "backend alive num in cluster");
backendAliveTotal.addLabel(new MetricLabel("cluster_id", entry.getValue()));
backendAliveTotal.addLabel(new MetricLabel("cluster_name", key));
MetricRepo.DORIS_METRIC_REGISTER.addMetrics(backendAliveTotal);
return backendAliveTotal;
}).setValue(aliveNum);
// MetricRepo.CLOUD_CLUSTER_BACKEND_ALIVE_TOTAL.computeIfAbsent(entry.getKey(), key -> {
// GaugeMetricImpl<Long> backendAliveTotal = new GaugeMetricImpl<>("backend_alive_total",
// MetricUnit.NOUNIT, "backend alive num in cluster");
// backendAliveTotal.addLabel(new MetricLabel("cluster_id", entry.getValue()));
// backendAliveTotal.addLabel(new MetricLabel("cluster_name", key));
// MetricRepo.DORIS_METRIC_REGISTER.addMetrics(backendAliveTotal);
// return backendAliveTotal;
// }).setValue(aliveNum);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -204,7 +204,7 @@ public void updateCloudClusterMap(List<Backend> toAdd, List<Backend> toDel) {
if (be == null) {
be = new ArrayList<>();
clusterIdToBackend.put(clusterId, be);
MetricRepo.registerClusterMetrics(clusterName, clusterId);
// MetricRepo.registerClusterMetrics(clusterName, clusterId);
}
Set<String> existed = be.stream().map(i -> i.getHost() + ":" + i.getHeartbeatPort())
.collect(Collectors.toSet());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,4 +34,8 @@ public M getOrAdd(String name) {
return nameToMetric.computeIfAbsent(name, metricSupplier);
}

public Map<String, M> getMetrics() {
return nameToMetric;
}

}
164 changes: 164 additions & 0 deletions fe/fe-core/src/main/java/org/apache/doris/metric/CloudMetricRepo.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,164 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

package org.apache.doris.metric;

import org.apache.doris.catalog.Env;
import org.apache.doris.cloud.system.CloudSystemInfoService;
import org.apache.doris.common.Config;
import org.apache.doris.metric.Metric.MetricUnit;

import com.codahale.metrics.Histogram;
import com.google.common.base.Strings;

import java.util.List;

public class CloudMetricRepo {
protected static AutoMappedMetric<LongCounterMetric> CLUSTER_REQUEST_ALL_COUNTER;
protected static AutoMappedMetric<LongCounterMetric> CLUSTER_QUERY_ALL_COUNTER;
protected static AutoMappedMetric<LongCounterMetric> CLUSTER_QUERY_ERR_COUNTER;

protected static AutoMappedMetric<GaugeMetricImpl<Double>> CLUSTER_REQUEST_PER_SECOND_GAUGE;
protected static AutoMappedMetric<GaugeMetricImpl<Double>> CLUSTER_QUERY_PER_SECOND_GAUGE;
protected static AutoMappedMetric<GaugeMetricImpl<Double>> CLUSTER_QUERY_ERR_RATE_GAUGE;
protected static AutoMappedMetric<Histogram> CLUSTER_QUERY_LATENCY_HISTO;

protected static AutoMappedMetric<GaugeMetricImpl<Integer>> CLUSTER_BACKEND_ALIVE;
protected static AutoMappedMetric<GaugeMetricImpl<Integer>> CLUSTER_BACKEND_ALIVE_TOTAL;

public static void init() {
if (!Config.isCloudMode()) {
return;
}
CLUSTER_REQUEST_ALL_COUNTER = new AutoMappedMetric<>(name -> {
LongCounterMetric counter = new LongCounterMetric("request_total", MetricUnit.REQUESTS,
"total request");
counter.addLabel(new MetricLabel("cluster_id", name));
MetricRepo.DORIS_METRIC_REGISTER.addMetrics(counter);
return counter;
});

CLUSTER_QUERY_ALL_COUNTER = new AutoMappedMetric<>(name -> {
LongCounterMetric counter = new LongCounterMetric("query_total", MetricUnit.REQUESTS,
"total query");
counter.addLabel(new MetricLabel("cluster_id", name));
MetricRepo.DORIS_METRIC_REGISTER.addMetrics(counter);
return counter;
});

CLUSTER_QUERY_ERR_COUNTER = new AutoMappedMetric<>(name -> {
LongCounterMetric counter = new LongCounterMetric("query_err", MetricUnit.REQUESTS,
"total error query");
counter.addLabel(new MetricLabel("cluster_id", name));
MetricRepo.DORIS_METRIC_REGISTER.addMetrics(counter);
return counter;
});

CLUSTER_REQUEST_PER_SECOND_GAUGE = new AutoMappedMetric<>(name -> {
GaugeMetricImpl<Double> gauge = new GaugeMetricImpl<Double>("rps", MetricUnit.NOUNIT,
"request per second");
gauge.addLabel(new MetricLabel("cluster_id", name));
MetricRepo.DORIS_METRIC_REGISTER.addMetrics(gauge);
return gauge;
});

CLUSTER_QUERY_PER_SECOND_GAUGE = new AutoMappedMetric<>(name -> {
GaugeMetricImpl<Double> gauge = new GaugeMetricImpl<Double>("qps", MetricUnit.NOUNIT,
"query per second");
gauge.addLabel(new MetricLabel("cluster_id", name));
MetricRepo.DORIS_METRIC_REGISTER.addMetrics(gauge);
return gauge;
});

CLUSTER_QUERY_ERR_RATE_GAUGE = new AutoMappedMetric<>(name -> {
GaugeMetricImpl<Double> gauge = new GaugeMetricImpl<Double>("query_err_rate", MetricUnit.NOUNIT,
"query error rate");
gauge.addLabel(new MetricLabel("cluster_id", name));
MetricRepo.DORIS_METRIC_REGISTER.addMetrics(gauge);
return gauge;
});
}

public static void increaseClusterRequestAll(String clusterName) {
if (!MetricRepo.isInit || !Config.isCloudMode() || Strings.isNullOrEmpty(clusterName)) {
return;
}
String clusterId = ((CloudSystemInfoService) Env.getCurrentSystemInfo())
.getCloudClusterNameToId().get(clusterName);
if (Strings.isNullOrEmpty(clusterId)) {
return;
}
LongCounterMetric metric = CLUSTER_REQUEST_ALL_COUNTER.getOrAdd(clusterId);
metric.addOrUpdateLabel(new MetricLabel("cluster_name", clusterName));
metric.increase(1L);
}

public static void increaseClusterQueryAll(String clusterName) {
if (!MetricRepo.isInit || !Config.isCloudMode() || Strings.isNullOrEmpty(clusterName)) {
return;
}
String clusterId = ((CloudSystemInfoService) Env.getCurrentSystemInfo())
.getCloudClusterNameToId().get(clusterName);
if (Strings.isNullOrEmpty(clusterId)) {
return;
}
LongCounterMetric metric = CLUSTER_QUERY_ALL_COUNTER.getOrAdd(clusterId);
metric.addOrUpdateLabel(new MetricLabel("cluster_name", clusterName));
metric.increase(1L);
}

public static void increaseClusterQueryErr(String clusterName) {
if (!MetricRepo.isInit || !Config.isCloudMode() || Strings.isNullOrEmpty(clusterName)) {
return;
}
String clusterId = ((CloudSystemInfoService) Env.getCurrentSystemInfo())
.getCloudClusterNameToId().get(clusterName);
if (Strings.isNullOrEmpty(clusterId)) {
return;
}
LongCounterMetric metric = CLUSTER_QUERY_ERR_COUNTER.getOrAdd(clusterId);
metric.addOrUpdateLabel(new MetricLabel("cluster_name", clusterName));
metric.increase(1L);
}

public static void updateClusterRequestPerSecond(String clusterId, double value, List<MetricLabel> labels) {
if (!MetricRepo.isInit || !Config.isCloudMode() || Strings.isNullOrEmpty(clusterId)) {
return;
}
GaugeMetricImpl<Double> metric = CLUSTER_REQUEST_PER_SECOND_GAUGE.getOrAdd(clusterId);
metric.setLables(labels);
metric.setValue(value);
}

public static void updateClusterQueryPerSecond(String clusterId, double value, List<MetricLabel> labels) {
if (!MetricRepo.isInit || !Config.isCloudMode() || Strings.isNullOrEmpty(clusterId)) {
return;
}
GaugeMetricImpl<Double> metric = CLUSTER_QUERY_PER_SECOND_GAUGE.getOrAdd(clusterId);
metric.setLables(labels);
metric.setValue(value);
}

public static void updateClusterQueryErrRate(String clusterId, double value, List<MetricLabel> labels) {
if (!MetricRepo.isInit || !Config.isCloudMode() || Strings.isNullOrEmpty(clusterId)) {
return;
}
GaugeMetricImpl<Double> metric = CLUSTER_QUERY_ERR_RATE_GAUGE.getOrAdd(clusterId);
metric.setLables(labels);
metric.setValue(value);
}
}
21 changes: 21 additions & 0 deletions fe/fe-core/src/main/java/org/apache/doris/metric/Metric.java
Original file line number Diff line number Diff line change
Expand Up @@ -80,9 +80,30 @@ public Metric<T> addLabel(MetricLabel label) {
return this;
}

public Metric<T> addOrUpdateLabel(MetricLabel newLabel) {
if (newLabel == null) {
return this;
}

boolean found = false;
for (MetricLabel label : labels) {
if (newLabel.getKey().equals(label.getKey())) {
found = true;
}
}
if (!found) {
labels.add(newLabel);
}
return this;
}

public List<MetricLabel> getLabels() {
return labels;
}

public void setLables(List<MetricLabel> newLabels) {
this.labels = newLabels;
}

public abstract T getValue();
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,11 @@

package org.apache.doris.metric;

import org.apache.doris.common.Config;

import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.TimerTask;

/*
Expand All @@ -30,6 +34,10 @@ public class MetricCalculator extends TimerTask {
private long lastRequestCounter = -1;
private long lastQueryErrCounter = -1;

private Map<String, Long> clusterLastRequestCounter = new HashMap<>();
private Map<String, Long> clusterLastQueryCounter = new HashMap<>();
private Map<String, Long> clusterLastQueryErrCounter = new HashMap<>();

@Override
public void run() {
update();
Expand All @@ -42,6 +50,7 @@ private void update() {
lastQueryCounter = MetricRepo.COUNTER_QUERY_ALL.getValue();
lastRequestCounter = MetricRepo.COUNTER_REQUEST_ALL.getValue();
lastQueryErrCounter = MetricRepo.COUNTER_QUERY_ERR.getValue();
initCloudMetrics();
return;
}

Expand All @@ -65,6 +74,7 @@ private void update() {
MetricRepo.GAUGE_QUERY_ERR_RATE.setValue(errRate < 0 ? 0.0 : errRate);
lastQueryErrCounter = currentErrCounter;

updateCloudMetrics(interval);
lastTs = currentTs;

// max tablet compaction score of all backends
Expand All @@ -77,4 +87,69 @@ private void update() {
}
MetricRepo.GAUGE_MAX_TABLET_COMPACTION_SCORE.setValue(maxCompactionScore);
}

private void initCloudMetrics() {
if (!Config.isCloudMode()) {
return;
}
Map<String, LongCounterMetric> requsetAllMetrics = CloudMetricRepo.CLUSTER_REQUEST_ALL_COUNTER.getMetrics();
if (requsetAllMetrics != null) {
requsetAllMetrics.forEach((clusterId, metric) -> {
clusterLastRequestCounter.put(clusterId, metric.getValue());
});
}

Map<String, LongCounterMetric> queryAllMetrics = CloudMetricRepo.CLUSTER_QUERY_ALL_COUNTER.getMetrics();
if (queryAllMetrics != null) {
queryAllMetrics.forEach((clusterId, metric) -> {
clusterLastQueryCounter.put(clusterId, metric.getValue());
});
}

Map<String, LongCounterMetric> queryErrMetrics = CloudMetricRepo.CLUSTER_QUERY_ERR_COUNTER.getMetrics();
if (queryErrMetrics != null) {
queryErrMetrics.forEach((clusterId, metric) -> {
clusterLastQueryErrCounter.put(clusterId, metric.getValue());
});
}
}

private void updateCloudMetrics(long interval) {
if (!Config.isCloudMode()) {
return;
}

Map<String, LongCounterMetric> requsetAllMetrics = CloudMetricRepo.CLUSTER_REQUEST_ALL_COUNTER.getMetrics();
if (requsetAllMetrics != null) {
requsetAllMetrics.forEach((clusterId, metric) -> {
double rps = (double) (metric.getValue() - clusterLastRequestCounter.getOrDefault(clusterId, 0L))
/ interval;
rps = Double.max(rps, 0);
CloudMetricRepo.updateClusterRequestPerSecond(clusterId, rps, metric.getLabels());
clusterLastRequestCounter.replace(clusterId, metric.getValue());
});
}

Map<String, LongCounterMetric> queryAllMetrics = CloudMetricRepo.CLUSTER_QUERY_ALL_COUNTER.getMetrics();
if (queryAllMetrics != null) {
queryAllMetrics.forEach((clusterId, metric) -> {
double rps = (double) (metric.getValue() - clusterLastQueryCounter.getOrDefault(clusterId, 0L))
/ interval;
rps = Double.max(rps, 0);
CloudMetricRepo.updateClusterQueryPerSecond(clusterId, rps, metric.getLabels());
clusterLastQueryCounter.replace(clusterId, metric.getValue());
});
}

Map<String, LongCounterMetric> queryErrMetrics = CloudMetricRepo.CLUSTER_QUERY_ERR_COUNTER.getMetrics();
if (queryErrMetrics != null) {
queryErrMetrics.forEach((clusterId, metric) -> {
double rps = (double) (metric.getValue() - clusterLastQueryErrCounter.getOrDefault(clusterId, 0L))
/ interval;
rps = Double.max(rps, 0);
CloudMetricRepo.updateClusterQueryErrRate(clusterId, rps, metric.getLabels());
clusterLastQueryCounter.replace(clusterId, metric.getValue());
});
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,10 @@ public String getValue() {
return value;
}

public void setValue(String newValue) {
this.value = newValue;
}

@Override
public boolean equals(Object obj) {
if (!(obj instanceof MetricLabel)) {
Expand Down
Loading

0 comments on commit 6a8c95e

Please sign in to comment.