diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/CloudTabletStatMgr.java b/fe/fe-core/src/main/java/org/apache/doris/catalog/CloudTabletStatMgr.java index 136c9264a4e1d15..a2ce117725d15e1 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/catalog/CloudTabletStatMgr.java +++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/CloudTabletStatMgr.java @@ -35,9 +35,8 @@ import org.apache.logging.log4j.Logger; import java.util.ArrayList; +import java.util.HashMap; import java.util.List; -import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.ForkJoinPool; /* * CloudTabletStatMgr is for collecting tablet(replica) statistics from backends. @@ -46,10 +45,8 @@ public class CloudTabletStatMgr extends MasterDaemon { private static final Logger LOG = LogManager.getLogger(CloudTabletStatMgr.class); - private ForkJoinPool taskPool = new ForkJoinPool(Runtime.getRuntime().availableProcessors()); - // <(dbId, tableId) -> CloudTableStats> - private ConcurrentHashMap, CloudTableStats> cloudTableStatsMap = new ConcurrentHashMap<>(); + private volatile HashMap, CloudTableStats> cloudTableStatsMap = new HashMap<>(); public CloudTabletStatMgr() { super("cloud tablet stat mgr", Config.tablet_stat_update_interval_second * 1000); @@ -136,7 +133,7 @@ protected void runAfterCatalogReady() { // after update replica in all backends, update index row num start = System.currentTimeMillis(); - ConcurrentHashMap, CloudTableStats> newCloudTableStatsMap = new ConcurrentHashMap<>(); + HashMap, CloudTableStats> newCloudTableStatsMap = new HashMap<>(); for (Long dbId : dbIds) { Database db = Env.getCurrentInternalCatalog().getDbNullable(dbId); if (db == null) { @@ -241,7 +238,7 @@ private GetTabletStatsResponse getTabletStats(GetTabletStatsRequest request) return response; } - public ConcurrentHashMap, CloudTableStats> getCloudTableStatsMap() { + public HashMap, CloudTableStats> getCloudTableStatsMap() { return this.cloudTableStatsMap; } diff --git a/fe/fe-core/src/main/java/org/apache/doris/cloud/catalog/CloudClusterChecker.java b/fe/fe-core/src/main/java/org/apache/doris/cloud/catalog/CloudClusterChecker.java index b4a62ae0c0ee2d6..cc4a3c09d9cb7ba 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/cloud/catalog/CloudClusterChecker.java +++ b/fe/fe-core/src/main/java/org/apache/doris/cloud/catalog/CloudClusterChecker.java @@ -28,9 +28,6 @@ import org.apache.doris.common.DdlException; import org.apache.doris.common.util.MasterDaemon; import org.apache.doris.ha.FrontendNodeType; -import org.apache.doris.metric.GaugeMetricImpl; -import org.apache.doris.metric.Metric.MetricUnit; -import org.apache.doris.metric.MetricLabel; import org.apache.doris.metric.MetricRepo; import org.apache.doris.resource.Tag; import org.apache.doris.system.Backend; @@ -110,7 +107,7 @@ private void checkToAddCluster(Map remoteClusterIdToPB, Set i.setTagMap(newTagMap)); List toAdd = new ArrayList<>(); for (Cloud.NodeInfoPB node : remoteClusterIdToPB.get(addId).getNodesList()) { @@ -474,33 +471,18 @@ private void updateCloudMetrics() { Map> clusterIdToBackend = cloudSystemInfoService.getCloudClusterIdToBackend(); Map clusterNameToId = cloudSystemInfoService.getCloudClusterNameToId(); for (Map.Entry entry : clusterNameToId.entrySet()) { - long aliveNum = 0L; + int aliveNum = 0; List bes = clusterIdToBackend.get(entry.getValue()); if (bes == null || bes.size() == 0) { LOG.info("cant get be nodes by cluster {}, bes {}", entry, bes); continue; } for (Backend backend : bes) { - MetricRepo.CLOUD_CLUSTER_BACKEND_ALIVE.computeIfAbsent(backend.getAddress(), key -> { - GaugeMetricImpl backendAlive = new GaugeMetricImpl<>("backend_alive", MetricUnit.NOUNIT, - "backend alive or not"); - backendAlive.addLabel(new MetricLabel("cluster_id", entry.getValue())); - backendAlive.addLabel(new MetricLabel("cluster_name", entry.getKey())); - backendAlive.addLabel(new MetricLabel("address", key)); - MetricRepo.DORIS_METRIC_REGISTER.addMetrics(backendAlive); - return backendAlive; - }).setValue(backend.isAlive() ? 1 : 0); + MetricRepo.updateClusterBackendAlive(entry.getKey(), entry.getValue(), + backend.getAddress(), backend.isAlive()); aliveNum = backend.isAlive() ? aliveNum + 1 : aliveNum; } - - MetricRepo.CLOUD_CLUSTER_BACKEND_ALIVE_TOTAL.computeIfAbsent(entry.getKey(), key -> { - GaugeMetricImpl backendAliveTotal = new GaugeMetricImpl<>("backend_alive_total", - MetricUnit.NOUNIT, "backend alive num in cluster"); - backendAliveTotal.addLabel(new MetricLabel("cluster_id", entry.getValue())); - backendAliveTotal.addLabel(new MetricLabel("cluster_name", key)); - MetricRepo.DORIS_METRIC_REGISTER.addMetrics(backendAliveTotal); - return backendAliveTotal; - }).setValue(aliveNum); + MetricRepo.updateClusterBackendAliveTotal(entry.getKey(), entry.getValue(), aliveNum); } } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/cloud/system/CloudSystemInfoService.java b/fe/fe-core/src/main/java/org/apache/doris/cloud/system/CloudSystemInfoService.java index 8d852c1109714b7..0567daea8e080dd 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/cloud/system/CloudSystemInfoService.java +++ b/fe/fe-core/src/main/java/org/apache/doris/cloud/system/CloudSystemInfoService.java @@ -203,7 +203,7 @@ public void updateCloudClusterMap(List toAdd, List toDel) { if (be == null) { be = new ArrayList<>(); clusterIdToBackend.put(clusterId, be); - MetricRepo.registerClusterMetrics(clusterName, clusterId); + MetricRepo.registerCloudMetrics(clusterId, clusterName); } Set existed = be.stream().map(i -> i.getHost() + ":" + i.getHeartbeatPort()) .collect(Collectors.toSet()); diff --git a/fe/fe-core/src/main/java/org/apache/doris/metric/AutoMappedMetric.java b/fe/fe-core/src/main/java/org/apache/doris/metric/AutoMappedMetric.java index 17b7e1a1041254b..440e00330b481f0 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/metric/AutoMappedMetric.java +++ b/fe/fe-core/src/main/java/org/apache/doris/metric/AutoMappedMetric.java @@ -34,4 +34,8 @@ public M getOrAdd(String name) { return nameToMetric.computeIfAbsent(name, metricSupplier); } + public Map getMetrics() { + return nameToMetric; + } + } diff --git a/fe/fe-core/src/main/java/org/apache/doris/metric/CloudMetrics.java b/fe/fe-core/src/main/java/org/apache/doris/metric/CloudMetrics.java new file mode 100644 index 000000000000000..df5d9e127ffed66 --- /dev/null +++ b/fe/fe-core/src/main/java/org/apache/doris/metric/CloudMetrics.java @@ -0,0 +1,110 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package org.apache.doris.metric; + +import org.apache.doris.common.Config; +import org.apache.doris.metric.Metric.MetricUnit; + +import com.codahale.metrics.Histogram; +import com.codahale.metrics.MetricRegistry; + +public class CloudMetrics { + protected static String CLOUD_CLUSTER_DELIMITER = "@delimiter#"; + protected static AutoMappedMetric CLUSTER_REQUEST_ALL_COUNTER; + protected static AutoMappedMetric CLUSTER_QUERY_ALL_COUNTER; + protected static AutoMappedMetric CLUSTER_QUERY_ERR_COUNTER; + + protected static AutoMappedMetric> CLUSTER_REQUEST_PER_SECOND_GAUGE; + protected static AutoMappedMetric> CLUSTER_QUERY_PER_SECOND_GAUGE; + protected static AutoMappedMetric> CLUSTER_QUERY_ERR_RATE_GAUGE; + + protected static AutoMappedMetric CLUSTER_QUERY_LATENCY_HISTO; + + protected static AutoMappedMetric> CLUSTER_BACKEND_ALIVE; + protected static AutoMappedMetric> CLUSTER_BACKEND_ALIVE_TOTAL; + + protected static void init() { + if (!Config.isCloudMode()) { + return; + } + CLUSTER_REQUEST_ALL_COUNTER = new AutoMappedMetric<>(name -> { + LongCounterMetric counter = new LongCounterMetric("request_total", MetricUnit.REQUESTS, + "total request"); + MetricRepo.DORIS_METRIC_REGISTER.addMetrics(counter); + return counter; + }); + + CLUSTER_QUERY_ALL_COUNTER = new AutoMappedMetric<>(name -> { + LongCounterMetric counter = new LongCounterMetric("query_total", MetricUnit.REQUESTS, + "total query"); + MetricRepo.DORIS_METRIC_REGISTER.addMetrics(counter); + return counter; + }); + + CLUSTER_QUERY_ERR_COUNTER = new AutoMappedMetric<>(name -> { + LongCounterMetric counter = new LongCounterMetric("query_err", MetricUnit.REQUESTS, + "total error query"); + MetricRepo.DORIS_METRIC_REGISTER.addMetrics(counter); + return counter; + }); + + CLUSTER_REQUEST_PER_SECOND_GAUGE = new AutoMappedMetric<>(name -> { + GaugeMetricImpl gauge = new GaugeMetricImpl("rps", MetricUnit.NOUNIT, + "request per second"); + MetricRepo.DORIS_METRIC_REGISTER.addMetrics(gauge); + return gauge; + }); + + CLUSTER_QUERY_PER_SECOND_GAUGE = new AutoMappedMetric<>(name -> { + GaugeMetricImpl gauge = new GaugeMetricImpl("qps", MetricUnit.NOUNIT, + "query per second"); + MetricRepo.DORIS_METRIC_REGISTER.addMetrics(gauge); + return gauge; + }); + + CLUSTER_QUERY_ERR_RATE_GAUGE = new AutoMappedMetric<>(name -> { + GaugeMetricImpl gauge = new GaugeMetricImpl("query_err_rate", MetricUnit.NOUNIT, + "query error rate"); + MetricRepo.DORIS_METRIC_REGISTER.addMetrics(gauge); + return gauge; + }); + + CLUSTER_BACKEND_ALIVE = new AutoMappedMetric<>(name -> { + GaugeMetricImpl gauge = new GaugeMetricImpl("backend_alive", MetricUnit.NOUNIT, + "backend alive or not"); + MetricRepo.DORIS_METRIC_REGISTER.addMetrics(gauge); + return gauge; + }); + + CLUSTER_BACKEND_ALIVE_TOTAL = new AutoMappedMetric<>(name -> { + GaugeMetricImpl gauge = new GaugeMetricImpl("backend_alive_total", MetricUnit.NOUNIT, + "backend alive num in cluster"); + MetricRepo.DORIS_METRIC_REGISTER.addMetrics(gauge); + return gauge; + }); + + CLUSTER_QUERY_LATENCY_HISTO = new AutoMappedMetric<>(key -> { + String[] values = key.split(CLOUD_CLUSTER_DELIMITER); + String clusterId = values[0]; + String clusterName = values[1]; + String metricName = MetricRegistry.name("query", "latency", "ms", "cluster_id=" + + clusterId, "cluster_name=" + clusterName); + return MetricRepo.METRIC_REGISTER.histogram(metricName); + }); + } +} diff --git a/fe/fe-core/src/main/java/org/apache/doris/metric/JsonMetricVisitor.java b/fe/fe-core/src/main/java/org/apache/doris/metric/JsonMetricVisitor.java index 5b3b3d292a2f4eb..69ef5f643c38c3c 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/metric/JsonMetricVisitor.java +++ b/fe/fe-core/src/main/java/org/apache/doris/metric/JsonMetricVisitor.java @@ -192,7 +192,7 @@ private void setHistogramJsonMetric(StringBuilder sb, String metric, String quan } @Override - public void getNodeInfo() { + public void visitNodeInfo() { if (Env.getCurrentEnv().isMaster()) { setNodeInfo(sb, "node_info", "is_master", null, 1, false); } @@ -208,6 +208,11 @@ public void getNodeInfo() { Env.getCurrentEnv().getBrokerMgr().getAllBrokers().stream().filter(b -> !b.isAlive).count(), true); } + @Override + public void visitCloudTableStats() { + return; + } + private void setNodeInfo(StringBuilder sb, String metric, String type, String status, long value, boolean lastMetric) { sb.append("{\n\t\"tags\":\n\t{\n"); diff --git a/fe/fe-core/src/main/java/org/apache/doris/metric/Metric.java b/fe/fe-core/src/main/java/org/apache/doris/metric/Metric.java index aa26ecde1e65e29..d6a4ac2ae844775 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/metric/Metric.java +++ b/fe/fe-core/src/main/java/org/apache/doris/metric/Metric.java @@ -84,5 +84,10 @@ public List getLabels() { return labels; } + public Metric setLabels(List newLabels) { + this.labels = newLabels; + return this; + } + public abstract T getValue(); } diff --git a/fe/fe-core/src/main/java/org/apache/doris/metric/MetricCalculator.java b/fe/fe-core/src/main/java/org/apache/doris/metric/MetricCalculator.java index ccbb1ec096a06ef..06ab8a4dd19dd18 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/metric/MetricCalculator.java +++ b/fe/fe-core/src/main/java/org/apache/doris/metric/MetricCalculator.java @@ -17,7 +17,11 @@ package org.apache.doris.metric; +import org.apache.doris.common.Config; + +import java.util.HashMap; import java.util.List; +import java.util.Map; import java.util.TimerTask; /* @@ -30,6 +34,10 @@ public class MetricCalculator extends TimerTask { private long lastRequestCounter = -1; private long lastQueryErrCounter = -1; + private Map clusterLastRequestCounter = new HashMap<>(); + private Map clusterLastQueryCounter = new HashMap<>(); + private Map clusterLastQueryErrCounter = new HashMap<>(); + @Override public void run() { update(); @@ -42,6 +50,7 @@ private void update() { lastQueryCounter = MetricRepo.COUNTER_QUERY_ALL.getValue(); lastRequestCounter = MetricRepo.COUNTER_REQUEST_ALL.getValue(); lastQueryErrCounter = MetricRepo.COUNTER_QUERY_ERR.getValue(); + initCloudMetrics(); return; } @@ -65,6 +74,7 @@ private void update() { MetricRepo.GAUGE_QUERY_ERR_RATE.setValue(errRate < 0 ? 0.0 : errRate); lastQueryErrCounter = currentErrCounter; + updateCloudMetrics(interval); lastTs = currentTs; // max tablet compaction score of all backends @@ -77,4 +87,69 @@ private void update() { } MetricRepo.GAUGE_MAX_TABLET_COMPACTION_SCORE.setValue(maxCompactionScore); } + + private void initCloudMetrics() { + if (!Config.isCloudMode()) { + return; + } + Map requsetAllMetrics = CloudMetrics.CLUSTER_REQUEST_ALL_COUNTER.getMetrics(); + if (requsetAllMetrics != null) { + requsetAllMetrics.forEach((clusterId, metric) -> { + clusterLastRequestCounter.put(clusterId, metric.getValue()); + }); + } + + Map queryAllMetrics = CloudMetrics.CLUSTER_QUERY_ALL_COUNTER.getMetrics(); + if (queryAllMetrics != null) { + queryAllMetrics.forEach((clusterId, metric) -> { + clusterLastQueryCounter.put(clusterId, metric.getValue()); + }); + } + + Map queryErrMetrics = CloudMetrics.CLUSTER_QUERY_ERR_COUNTER.getMetrics(); + if (queryErrMetrics != null) { + queryErrMetrics.forEach((clusterId, metric) -> { + clusterLastQueryErrCounter.put(clusterId, metric.getValue()); + }); + } + } + + private void updateCloudMetrics(long interval) { + if (!Config.isCloudMode()) { + return; + } + + Map requsetAllMetrics = CloudMetrics.CLUSTER_REQUEST_ALL_COUNTER.getMetrics(); + if (requsetAllMetrics != null) { + requsetAllMetrics.forEach((clusterId, metric) -> { + double rps = (double) (metric.getValue() - clusterLastRequestCounter.getOrDefault(clusterId, 0L)) + / interval; + rps = Double.max(rps, 0); + MetricRepo.updateClusterRequestPerSecond(clusterId, rps, metric.getLabels()); + clusterLastRequestCounter.replace(clusterId, metric.getValue()); + }); + } + + Map queryAllMetrics = CloudMetrics.CLUSTER_QUERY_ALL_COUNTER.getMetrics(); + if (queryAllMetrics != null) { + queryAllMetrics.forEach((clusterId, metric) -> { + double rps = (double) (metric.getValue() - clusterLastQueryCounter.getOrDefault(clusterId, 0L)) + / interval; + rps = Double.max(rps, 0); + MetricRepo.updateClusterQueryPerSecond(clusterId, rps, metric.getLabels()); + clusterLastQueryCounter.replace(clusterId, metric.getValue()); + }); + } + + Map queryErrMetrics = CloudMetrics.CLUSTER_QUERY_ERR_COUNTER.getMetrics(); + if (queryErrMetrics != null) { + queryErrMetrics.forEach((clusterId, metric) -> { + double rps = (double) (metric.getValue() - clusterLastQueryErrCounter.getOrDefault(clusterId, 0L)) + / interval; + rps = Double.max(rps, 0); + MetricRepo.updateClusterQueryErrRate(clusterId, rps, metric.getLabels()); + clusterLastQueryCounter.replace(clusterId, metric.getValue()); + }); + } + } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/metric/MetricRepo.java b/fe/fe-core/src/main/java/org/apache/doris/metric/MetricRepo.java index bcfe6ae630f5230..0905c4f651a6838 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/metric/MetricRepo.java +++ b/fe/fe-core/src/main/java/org/apache/doris/metric/MetricRepo.java @@ -17,11 +17,11 @@ package org.apache.doris.metric; - import org.apache.doris.alter.Alter; import org.apache.doris.alter.AlterJobV2.JobType; import org.apache.doris.catalog.Env; import org.apache.doris.catalog.TabletInvertedIndex; +import org.apache.doris.cloud.system.CloudSystemInfoService; import org.apache.doris.common.Config; import org.apache.doris.common.Pair; import org.apache.doris.common.ThreadPoolManager; @@ -43,17 +43,17 @@ import com.codahale.metrics.Histogram; import com.codahale.metrics.MetricRegistry; +import com.google.common.base.Strings; import com.google.common.collect.Maps; import com.google.common.collect.Sets; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; -import java.util.HashMap; +import java.util.ArrayList; import java.util.List; import java.util.Map; import java.util.Set; import java.util.SortedMap; -import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ScheduledThreadPoolExecutor; import java.util.concurrent.TimeUnit; import java.util.function.Supplier; @@ -134,110 +134,12 @@ public final class MetricRepo { public static GaugeMetricImpl GAUGE_QUERY_ERR_RATE; public static GaugeMetricImpl GAUGE_MAX_TABLET_COMPACTION_SCORE; - // cloud metrics - public static ConcurrentHashMap - CLOUD_CLUSTER_COUNTER_REQUEST_ALL = new ConcurrentHashMap<>(); - public static ConcurrentHashMap - CLOUD_CLUSTER_COUNTER_QUERY_ALL = new ConcurrentHashMap<>(); - public static ConcurrentHashMap - CLOUD_CLUSTER_COUNTER_QUERY_ERR = new ConcurrentHashMap<>(); - - public static ConcurrentHashMap> - CLOUD_CLUSTER_GAUGE_QUERY_PER_SECOND = new ConcurrentHashMap<>(); - - public static GaugeMetricImpl GAUGE_HTTP_COPY_INTO_UPLOAD_PER_SECOND; - - public static GaugeMetricImpl GAUGE_HTTP_COPY_INTO_UPLOAD_ERR_RATE; - - public static GaugeMetricImpl GAUGE_HTTP_COPY_INTO_QUERY_PER_SECOND; - - public static GaugeMetricImpl GAUGE_HTTP_COPY_INTO_QUERY_ERR_RATE; - - public static ConcurrentHashMap> - CLOUD_CLUSTER_GAUGE_REQUEST_PER_SECOND = new ConcurrentHashMap<>(); - public static ConcurrentHashMap> - CLOUD_CLUSTER_GAUGE_QUERY_ERR_RATE = new ConcurrentHashMap<>(); - - public static ConcurrentHashMap - CLOUD_CLUSTER_HISTO_QUERY_LATENCY = new ConcurrentHashMap<>(); - - public static Map> - CLOUD_CLUSTER_BACKEND_ALIVE = new HashMap<>(); - public static Map> - CLOUD_CLUSTER_BACKEND_ALIVE_TOTAL = new HashMap<>(); - private static Map, Long> loadJobNum = Maps.newHashMap(); private static ScheduledThreadPoolExecutor metricTimer = ThreadPoolManager.newDaemonScheduledThreadPool(1, "metric-timer-pool", true); private static MetricCalculator metricCalculator = new MetricCalculator(); - public static void registerClusterMetrics(String clusterName, String clusterId) { - CLOUD_CLUSTER_COUNTER_REQUEST_ALL.computeIfAbsent(clusterName, key -> { - LongCounterMetric counterRequestAll = new LongCounterMetric("request_total", MetricUnit.REQUESTS, - "total request"); - counterRequestAll.addLabel(new MetricLabel("cluster_id", clusterId)); - counterRequestAll.addLabel(new MetricLabel("cluster_name", key)); - DORIS_METRIC_REGISTER.addMetrics(counterRequestAll); - return counterRequestAll; - }); - - MetricRepo.CLOUD_CLUSTER_COUNTER_QUERY_ALL.computeIfAbsent(clusterName, key -> { - LongCounterMetric counterQueryAll = new LongCounterMetric("query_total", MetricUnit.REQUESTS, - "total query"); - counterQueryAll.addLabel(new MetricLabel("cluster_id", clusterId)); - counterQueryAll.addLabel(new MetricLabel("cluster_name", key)); - DORIS_METRIC_REGISTER.addMetrics(counterQueryAll); - return counterQueryAll; - }); - - CLOUD_CLUSTER_COUNTER_QUERY_ERR.computeIfAbsent(clusterName, key -> { - LongCounterMetric counterQueryErr = new LongCounterMetric("query_err", MetricUnit.REQUESTS, - "total error query"); - counterQueryErr.addLabel(new MetricLabel("cluster_id", clusterId)); - counterQueryErr.addLabel(new MetricLabel("cluster_name", key)); - DORIS_METRIC_REGISTER.addMetrics(counterQueryErr); - return counterQueryErr; - }); - - CLOUD_CLUSTER_GAUGE_QUERY_PER_SECOND.computeIfAbsent(clusterName, key -> { - GaugeMetricImpl gaugeQueryPerSecond = new GaugeMetricImpl<>("qps", MetricUnit.NOUNIT, - "query per second"); - gaugeQueryPerSecond.addLabel(new MetricLabel("cluster_id", clusterId)); - gaugeQueryPerSecond.addLabel(new MetricLabel("cluster_name", clusterName)); - gaugeQueryPerSecond.setValue(0.0); - DORIS_METRIC_REGISTER.addMetrics(gaugeQueryPerSecond); - return gaugeQueryPerSecond; - }).setValue(0.0); - - CLOUD_CLUSTER_GAUGE_REQUEST_PER_SECOND.computeIfAbsent(clusterName, key -> { - GaugeMetricImpl gaugeRequestPerSecond = new GaugeMetricImpl<>("rps", MetricUnit.NOUNIT, - "request per second"); - gaugeRequestPerSecond.addLabel(new MetricLabel("cluster_id", clusterId)); - gaugeRequestPerSecond.addLabel(new MetricLabel("cluster_name", clusterName)); - gaugeRequestPerSecond.setValue(0.0); - DORIS_METRIC_REGISTER.addMetrics(gaugeRequestPerSecond); - return gaugeRequestPerSecond; - }).setValue(0.0); - - CLOUD_CLUSTER_GAUGE_QUERY_ERR_RATE.computeIfAbsent(clusterName, key -> { - GaugeMetricImpl gaugeQueryErrRate = new GaugeMetricImpl<>("query_err_rate", - MetricUnit.NOUNIT, "query error rate"); - gaugeQueryErrRate.addLabel(new MetricLabel("cluster_id", clusterId)); - gaugeQueryErrRate.addLabel(new MetricLabel("cluster_name", clusterName)); - gaugeQueryErrRate.setValue(0.0); - DORIS_METRIC_REGISTER.addMetrics(gaugeQueryErrRate); - return gaugeQueryErrRate; - }).setValue(0.0); - - CLOUD_CLUSTER_HISTO_QUERY_LATENCY.computeIfAbsent(clusterName, key -> { - Histogram histoQueryLatency = MetricRepo.METRIC_REGISTER.histogram( - MetricRegistry.name("query", "latency", "ms", - MetricRepo.CLOUD_TAG, key)); - return histoQueryLatency; - }); - } - // init() should only be called after catalog is contructed. public static synchronized void init() { if (isInit) { @@ -593,6 +495,7 @@ public Long getValue() { // init system metrics initSystemMetrics(); + CloudMetrics.init(); updateMetrics(); isInit = true; @@ -777,8 +680,9 @@ public static synchronized String getMetric(MetricVisitor visitor) { visitor.visitHistogram(MetricVisitor.FE_PREFIX, entry.getKey(), entry.getValue()); } - // node info - visitor.getNodeInfo(); + visitor.visitNodeInfo(); + + visitor.visitCloudTableStats(); return visitor.finish(); } @@ -809,4 +713,156 @@ private static void updateLoadJobMetrics() { private static long getLoadJobNum(EtlJobType jobType, JobState jobState) { return MetricRepo.loadJobNum.getOrDefault(Pair.of(jobType, jobState), 0L); } + + public static void registerCloudMetrics(String clusterId, String clusterName) { + if (!MetricRepo.isInit || !Config.isCloudMode() || Strings.isNullOrEmpty(clusterName) + || Strings.isNullOrEmpty(clusterId)) { + return; + } + List labels = new ArrayList<>(); + labels.add(new MetricLabel("cluster_id", clusterId)); + labels.add(new MetricLabel("cluster_name", clusterName)); + + LongCounterMetric requestAllCounter = CloudMetrics.CLUSTER_REQUEST_ALL_COUNTER.getOrAdd(clusterId); + requestAllCounter.setLabels(labels); + + LongCounterMetric queryAllCounter = CloudMetrics.CLUSTER_QUERY_ALL_COUNTER.getOrAdd(clusterId); + queryAllCounter.setLabels(labels); + + LongCounterMetric queryErrCounter = CloudMetrics.CLUSTER_QUERY_ERR_COUNTER.getOrAdd(clusterId); + queryErrCounter.setLabels(labels); + + GaugeMetricImpl requestPerSecondGauge = CloudMetrics.CLUSTER_REQUEST_PER_SECOND_GAUGE + .getOrAdd(clusterId); + requestPerSecondGauge.setLabels(labels); + + GaugeMetricImpl queryPerSecondGauge = CloudMetrics.CLUSTER_QUERY_PER_SECOND_GAUGE.getOrAdd(clusterId); + queryPerSecondGauge.setLabels(labels); + + GaugeMetricImpl queryErrRateGauge = CloudMetrics.CLUSTER_QUERY_ERR_RATE_GAUGE.getOrAdd(clusterId); + queryErrRateGauge.setLabels(labels); + + String key = clusterId + CloudMetrics.CLOUD_CLUSTER_DELIMITER + clusterName; + CloudMetrics.CLUSTER_QUERY_LATENCY_HISTO.getOrAdd(key); + } + + public static void increaseClusterRequestAll(String clusterName) { + if (!MetricRepo.isInit || !Config.isCloudMode() || Strings.isNullOrEmpty(clusterName)) { + return; + } + String clusterId = ((CloudSystemInfoService) Env.getCurrentSystemInfo()) + .getCloudClusterNameToId().get(clusterName); + if (Strings.isNullOrEmpty(clusterId)) { + return; + } + LongCounterMetric counter = CloudMetrics.CLUSTER_REQUEST_ALL_COUNTER.getOrAdd(clusterId); + List labels = new ArrayList<>(); + counter.increase(1L); + labels.add(new MetricLabel("cluster_id", clusterId)); + labels.add(new MetricLabel("cluster_name", clusterName)); + counter.setLabels(labels); + } + + public static void increaseClusterQueryAll(String clusterName) { + if (!MetricRepo.isInit || !Config.isCloudMode() || Strings.isNullOrEmpty(clusterName)) { + return; + } + String clusterId = ((CloudSystemInfoService) Env.getCurrentSystemInfo()) + .getCloudClusterNameToId().get(clusterName); + if (Strings.isNullOrEmpty(clusterId)) { + return; + } + LongCounterMetric counter = CloudMetrics.CLUSTER_QUERY_ALL_COUNTER.getOrAdd(clusterId); + List labels = new ArrayList<>(); + counter.increase(1L); + labels.add(new MetricLabel("cluster_id", clusterId)); + labels.add(new MetricLabel("cluster_name", clusterName)); + counter.setLabels(labels); + } + + public static void increaseClusterQueryErr(String clusterName) { + if (!MetricRepo.isInit || !Config.isCloudMode() || Strings.isNullOrEmpty(clusterName)) { + return; + } + String clusterId = ((CloudSystemInfoService) Env.getCurrentSystemInfo()) + .getCloudClusterNameToId().get(clusterName); + if (Strings.isNullOrEmpty(clusterId)) { + return; + } + LongCounterMetric counter = CloudMetrics.CLUSTER_QUERY_ERR_COUNTER.getOrAdd(clusterId); + List labels = new ArrayList<>(); + counter.increase(1L); + labels.add(new MetricLabel("cluster_id", clusterId)); + labels.add(new MetricLabel("cluster_name", clusterName)); + counter.setLabels(labels); + } + + public static void updateClusterRequestPerSecond(String clusterId, double value, List labels) { + if (!MetricRepo.isInit || !Config.isCloudMode() || Strings.isNullOrEmpty(clusterId)) { + return; + } + GaugeMetricImpl gauge = CloudMetrics.CLUSTER_REQUEST_PER_SECOND_GAUGE.getOrAdd(clusterId); + gauge.setValue(value); + gauge.setLabels(labels); + } + + public static void updateClusterQueryPerSecond(String clusterId, double value, List labels) { + if (!MetricRepo.isInit || !Config.isCloudMode() || Strings.isNullOrEmpty(clusterId)) { + return; + } + GaugeMetricImpl gauge = CloudMetrics.CLUSTER_QUERY_PER_SECOND_GAUGE.getOrAdd(clusterId); + gauge.setValue(value); + gauge.setLabels(labels); + } + + public static void updateClusterQueryErrRate(String clusterId, double value, List labels) { + if (!MetricRepo.isInit || !Config.isCloudMode() || Strings.isNullOrEmpty(clusterId)) { + return; + } + GaugeMetricImpl gauge = CloudMetrics.CLUSTER_QUERY_ERR_RATE_GAUGE.getOrAdd(clusterId); + gauge.setValue(value); + gauge.setLabels(labels); + } + + public static void updateClusterBackendAlive(String clusterName, String clusterId, String ipAddress, + boolean alive) { + if (!MetricRepo.isInit || !Config.isCloudMode() || Strings.isNullOrEmpty(clusterName) + || Strings.isNullOrEmpty(clusterId) || Strings.isNullOrEmpty(ipAddress)) { + return; + } + String key = clusterId + "_" + ipAddress; + GaugeMetricImpl metric = CloudMetrics.CLUSTER_BACKEND_ALIVE.getOrAdd(key); + metric.setValue(alive ? 1 : 0); + List labels = new ArrayList<>(); + labels.add(new MetricLabel("cluster_id", clusterId)); + labels.add(new MetricLabel("cluster_name", clusterName)); + labels.add(new MetricLabel("address", ipAddress)); + metric.setLabels(labels); + } + + public static void updateClusterBackendAliveTotal(String clusterName, String clusterId, int aliveNum) { + if (!MetricRepo.isInit || !Config.isCloudMode() || Strings.isNullOrEmpty(clusterName) + || Strings.isNullOrEmpty(clusterId)) { + return; + } + GaugeMetricImpl gauge = CloudMetrics.CLUSTER_BACKEND_ALIVE_TOTAL.getOrAdd(clusterId); + gauge.setValue(aliveNum); + List labels = new ArrayList<>(); + labels.add(new MetricLabel("cluster_id", clusterId)); + labels.add(new MetricLabel("cluster_name", clusterName)); + gauge.setLabels(labels); + } + + public static void updateClusterQueryLatency(String clusterName, long elapseMs) { + if (!MetricRepo.isInit || !Config.isCloudMode() || Strings.isNullOrEmpty(clusterName)) { + return; + } + String clusterId = ((CloudSystemInfoService) Env.getCurrentSystemInfo()) + .getCloudClusterNameToId().get(clusterName); + if (Strings.isNullOrEmpty(clusterId)) { + return; + } + String key = clusterId + CloudMetrics.CLOUD_CLUSTER_DELIMITER + clusterName; + CloudMetrics.CLUSTER_QUERY_LATENCY_HISTO.getOrAdd(key).update(elapseMs); + } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/metric/MetricVisitor.java b/fe/fe-core/src/main/java/org/apache/doris/metric/MetricVisitor.java index dcbf9268b8285f6..5ca63d91d18d545 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/metric/MetricVisitor.java +++ b/fe/fe-core/src/main/java/org/apache/doris/metric/MetricVisitor.java @@ -42,7 +42,9 @@ public MetricVisitor() { public abstract void visitHistogram(String prefix, String name, Histogram histogram); - public abstract void getNodeInfo(); + public abstract void visitNodeInfo(); + + public abstract void visitCloudTableStats(); public String finish() { return sb.toString(); diff --git a/fe/fe-core/src/main/java/org/apache/doris/metric/PrometheusMetricVisitor.java b/fe/fe-core/src/main/java/org/apache/doris/metric/PrometheusMetricVisitor.java index 4cab30e8041d337..8c34a1b80eb82ab 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/metric/PrometheusMetricVisitor.java +++ b/fe/fe-core/src/main/java/org/apache/doris/metric/PrometheusMetricVisitor.java @@ -204,7 +204,7 @@ public void visitHistogram(String prefix, String name, Histogram histogram) { } @Override - public void getNodeInfo() { + public void visitNodeInfo() { final String NODE_INFO = "node_info"; sb.append(Joiner.on(" ").join(TYPE, NODE_INFO, "gauge\n")); sb.append(NODE_INFO).append("{type=\"fe_node_num\", state=\"total\"} ") @@ -225,4 +225,9 @@ public void getNodeInfo() { } return; } + + @Override + public void visitCloudTableStats() { + return; + } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/metric/SimpleCoreMetricVisitor.java b/fe/fe-core/src/main/java/org/apache/doris/metric/SimpleCoreMetricVisitor.java index bbbd13186ee175a..162828d37cd4611 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/metric/SimpleCoreMetricVisitor.java +++ b/fe/fe-core/src/main/java/org/apache/doris/metric/SimpleCoreMetricVisitor.java @@ -126,7 +126,7 @@ public void visitHistogram(String prefix, String name, Histogram histogram) { } @Override - public void getNodeInfo() { + public void visitNodeInfo() { long feDeadNum = Env.getCurrentEnv().getFrontends(null).stream().filter(f -> !f.isAlive()).count(); long beDeadNum = Env.getCurrentSystemInfo().getIdToBackend().values().stream().filter(b -> !b.isAlive()) .count(); @@ -136,4 +136,9 @@ public void getNodeInfo() { sb.append("doris_fe_backend_dead_num").append(" ").append(beDeadNum).append("\n"); sb.append("doris_fe_broker_dead_num").append(" ").append(brokerDeadNum).append("\n"); } + + @Override + public void visitCloudTableStats() { + return; + } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/AuditLogHelper.java b/fe/fe-core/src/main/java/org/apache/doris/qe/AuditLogHelper.java index fa8b1bdaf3f5b8d..57894751ee88603 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/qe/AuditLogHelper.java +++ b/fe/fe-core/src/main/java/org/apache/doris/qe/AuditLogHelper.java @@ -76,16 +76,19 @@ public static void logAuditLog(ConnectContext ctx, String origStmt, StatementBas if (ctx.getState().isQuery()) { MetricRepo.COUNTER_QUERY_ALL.increase(1L); MetricRepo.USER_COUNTER_QUERY_ALL.getOrAdd(ctx.getQualifiedUser()).increase(1L); + MetricRepo.increaseClusterQueryAll(ctx.getCloudCluster(false)); if (ctx.getState().getStateType() == MysqlStateType.ERR && ctx.getState().getErrType() != QueryState.ErrType.ANALYSIS_ERR) { // err query MetricRepo.COUNTER_QUERY_ERR.increase(1L); MetricRepo.USER_COUNTER_QUERY_ERR.getOrAdd(ctx.getQualifiedUser()).increase(1L); + MetricRepo.increaseClusterQueryErr(ctx.getCloudCluster(false)); } else if (ctx.getState().getStateType() == MysqlStateType.OK || ctx.getState().getStateType() == MysqlStateType.EOF) { // ok query MetricRepo.HISTO_QUERY_LATENCY.update(elapseMs); MetricRepo.USER_HISTO_QUERY_LATENCY.getOrAdd(ctx.getQualifiedUser()).update(elapseMs); + MetricRepo.updateClusterQueryLatency(ctx.getCloudCluster(false), elapseMs); if (elapseMs > Config.qe_slow_log_ms) { String sqlDigest = DigestUtils.md5Hex(((Queriable) parsedStmt).toDigest()); diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/ConnectProcessor.java b/fe/fe-core/src/main/java/org/apache/doris/qe/ConnectProcessor.java index 0f8f887ee901f0c..0bf9b8ac75d0d5a 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/qe/ConnectProcessor.java +++ b/fe/fe-core/src/main/java/org/apache/doris/qe/ConnectProcessor.java @@ -203,6 +203,7 @@ protected void handleQuery(MysqlCommand mysqlCommand, String originStmt) { public void executeQuery(MysqlCommand mysqlCommand, String originStmt) throws Exception { if (MetricRepo.isInit) { MetricRepo.COUNTER_REQUEST_ALL.increase(1L); + MetricRepo.increaseClusterRequestAll(ctx.getCloudCluster(false)); } String convertedStmt = convertOriginStmt(originStmt);