From a48b556ebcb437540f3f04ba8ee4b10dececccda Mon Sep 17 00:00:00 2001 From: Xun Yin Date: Thu, 14 Nov 2024 16:09:23 -0800 Subject: [PATCH] [router] remove store level metrics for non-streaming multiget The store level metrics in AggRouterHttpRequestStats are registered lazily. Add a flag in AggRouterHttpRequestStats to disable store level metric emission for non-streaming multiget request type. This is cleaner and the less intrusive than adding checks and handling elsewhere because AggRouterHttpRequestStats and the triggers to corresponding record functions are shared across different request types (SINGLE_GET, MULTI_GET, MULTI_GET_STREAMING, COMPUTE, etc...). We will keep the total stats for non-streaming multiget to give us visibility in case any legacy clients start to make non-streaming multiget requests. Fixed a bug where recordNoAvailableReplicaAbortedRetryRequest was calling the wrong method for the store metric recording. --- .../stats/AggRouterHttpRequestStats.java | 97 +++++++++++-------- .../router/AggRouterHttpRequestStatsTest.java | 35 +++++++ 2 files changed, 92 insertions(+), 40 deletions(-) diff --git a/services/venice-router/src/main/java/com/linkedin/venice/router/stats/AggRouterHttpRequestStats.java b/services/venice-router/src/main/java/com/linkedin/venice/router/stats/AggRouterHttpRequestStats.java index 517f1485e7..da777d5c21 100644 --- a/services/venice-router/src/main/java/com/linkedin/venice/router/stats/AggRouterHttpRequestStats.java +++ b/services/venice-router/src/main/java/com/linkedin/venice/router/stats/AggRouterHttpRequestStats.java @@ -8,11 +8,14 @@ import com.linkedin.venice.utils.concurrent.VeniceConcurrentHashMap; import io.tehuti.metrics.MetricsRepository; import java.util.Map; +import java.util.function.Consumer; import java.util.function.Function; public class AggRouterHttpRequestStats extends AbstractVeniceAggStoreStats { private final Map scatterGatherStatsMap = new VeniceConcurrentHashMap<>(); + private final boolean isStoreStatsEnabled; + private final ScatterGatherStats totalScatterGatherStats; public AggRouterHttpRequestStats( MetricsRepository metricsRepository, @@ -29,6 +32,10 @@ public AggRouterHttpRequestStats( ReadOnlyStoreRepository metadataRepository, boolean isUnregisterMetricForDeletedStoreEnabled) { super(metricsRepository, metadataRepository, isUnregisterMetricForDeletedStoreEnabled); + // Disable store level non-streaming multi get stats reporting because it's no longer used in clients. We still + // report to the total stats for visibility of potential old clients. + isStoreStatsEnabled = !RequestType.MULTI_GET.equals(requestType); + totalScatterGatherStats = new AggScatterGatherStats(); /** * Use a setter function to bypass the restriction that the supertype constructor could not * touch member fields of current object. @@ -36,7 +43,7 @@ public AggRouterHttpRequestStats( setStatsSupplier((metricsRepo, storeName) -> { ScatterGatherStats stats; if (storeName.equals(AbstractVeniceAggStats.STORE_NAME_FOR_TOTAL_STAT)) { - stats = new AggScatterGatherStats(); + stats = totalScatterGatherStats; } else { stats = scatterGatherStatsMap.computeIfAbsent(storeName, k -> new ScatterGatherStats()); } @@ -46,35 +53,45 @@ public AggRouterHttpRequestStats( } public ScatterGatherStats getScatterGatherStatsForStore(String storeName) { - return scatterGatherStatsMap.computeIfAbsent(storeName, k -> new ScatterGatherStats()); + if (isStoreStatsEnabled) { + return scatterGatherStatsMap.computeIfAbsent(storeName, k -> new ScatterGatherStats()); + } else { + return totalScatterGatherStats; + } + } + + private void recordStoreStats(String storeName, Consumer statsConsumer) { + if (isStoreStatsEnabled) { + statsConsumer.accept(getStoreStats(storeName)); + } } public void recordRequest(String storeName) { totalStats.recordRequest(); - getStoreStats(storeName).recordRequest(); + recordStoreStats(storeName, RouterHttpRequestStats::recordRequest); } public void recordHealthyRequest(String storeName, double latency) { totalStats.recordHealthyRequest(latency); - getStoreStats(storeName).recordHealthyRequest(latency); + recordStoreStats(storeName, stats -> stats.recordHealthyRequest(latency)); } public void recordUnhealthyRequest(String storeName) { totalStats.recordUnhealthyRequest(); if (storeName != null) { - getStoreStats(storeName).recordUnhealthyRequest(); + recordStoreStats(storeName, RouterHttpRequestStats::recordUnhealthyRequest); } } public void recordUnavailableReplicaStreamingRequest(String storeName) { totalStats.recordUnavailableReplicaStreamingRequest(); - getStoreStats(storeName).recordUnavailableReplicaStreamingRequest(); + recordStoreStats(storeName, RouterHttpRequestStats::recordUnavailableReplicaStreamingRequest); } public void recordUnhealthyRequest(String storeName, double latency) { totalStats.recordUnhealthyRequest(latency); if (storeName != null) { - getStoreStats(storeName).recordUnhealthyRequest(latency); + recordStoreStats(storeName, stats -> stats.recordUnhealthyRequest(latency)); } } @@ -86,12 +103,12 @@ public void recordUnhealthyRequest(String storeName, double latency) { */ public void recordReadQuotaUsage(String storeName, int quotaUsage) { totalStats.recordReadQuotaUsage(quotaUsage); - getStoreStats(storeName).recordReadQuotaUsage(quotaUsage); + recordStoreStats(storeName, stats -> stats.recordReadQuotaUsage(quotaUsage)); } public void recordTardyRequest(String storeName, double latency) { totalStats.recordTardyRequest(latency); - getStoreStats(storeName).recordTardyRequest(latency); + recordStoreStats(storeName, stats -> stats.recordTardyRequest(latency)); } /** @@ -103,79 +120,79 @@ public void recordTardyRequest(String storeName, double latency) { */ public void recordThrottledRequest(String storeName) { totalStats.recordThrottledRequest(); - getStoreStats(storeName).recordThrottledRequest(); + recordStoreStats(storeName, RouterHttpRequestStats::recordThrottledRequest); } public void recordThrottledRequest(String storeName, double latency) { totalStats.recordThrottledRequest(latency); - getStoreStats(storeName).recordThrottledRequest(latency); + recordStoreStats(storeName, stats -> stats.recordThrottledRequest(latency)); } public void recordBadRequest(String storeName) { totalStats.recordBadRequest(); if (storeName != null) { - getStoreStats(storeName).recordBadRequest(); + recordStoreStats(storeName, RouterHttpRequestStats::recordBadRequest); } } public void recordBadRequestKeyCount(String storeName, int keyCount) { totalStats.recordBadRequestKeyCount(keyCount); if (storeName != null) { - getStoreStats(storeName).recordBadRequestKeyCount(keyCount); + recordStoreStats(storeName, stats -> stats.recordBadRequestKeyCount(keyCount)); } } public void recordRequestThrottledByRouterCapacity(String storeName) { totalStats.recordRequestThrottledByRouterCapacity(); if (storeName != null) { - getStoreStats(storeName).recordRequestThrottledByRouterCapacity(); + recordStoreStats(storeName, RouterHttpRequestStats::recordRequestThrottledByRouterCapacity); } } public void recordErrorRetryCount(String storeName) { totalStats.recordErrorRetryCount(); if (storeName != null) { - getStoreStats(storeName).recordErrorRetryCount(); + recordStoreStats(storeName, RouterHttpRequestStats::recordErrorRetryCount); } } public void recordFanoutRequestCount(String storeName, int count) { totalStats.recordFanoutRequestCount(count); - getStoreStats(storeName).recordFanoutRequestCount(count); + recordStoreStats(storeName, stats -> stats.recordFanoutRequestCount(count)); } public void recordLatency(String storeName, double latency) { totalStats.recordLatency(latency); - getStoreStats(storeName).recordLatency(latency); + recordStoreStats(storeName, stats -> stats.recordLatency(latency)); } public void recordResponseWaitingTime(String storeName, double waitingTime) { totalStats.recordResponseWaitingTime(waitingTime); - getStoreStats(storeName).recordResponseWaitingTime(waitingTime); + recordStoreStats(storeName, stats -> stats.recordResponseWaitingTime(waitingTime)); } public void recordRequestSize(String storeName, double keySize) { totalStats.recordRequestSize(keySize); - getStoreStats(storeName).recordRequestSize(keySize); + recordStoreStats(storeName, stats -> stats.recordRequestSize(keySize)); } public void recordCompressedResponseSize(String storeName, double compressedResponseSize) { totalStats.recordCompressedResponseSize(compressedResponseSize); - getStoreStats(storeName).recordCompressedResponseSize(compressedResponseSize); + recordStoreStats(storeName, stats -> stats.recordCompressedResponseSize(compressedResponseSize)); } public void recordResponseSize(String storeName, double valueSize) { totalStats.recordResponseSize(valueSize); - getStoreStats(storeName).recordResponseSize(valueSize); + recordStoreStats(storeName, stats -> stats.recordResponseSize(valueSize)); } public void recordDecompressionTime(String storeName, double decompressionTime) { totalStats.recordDecompressionTime(decompressionTime); - getStoreStats(storeName).recordDecompressionTime(decompressionTime); + recordStoreStats(storeName, stats -> stats.recordDecompressionTime(decompressionTime)); } public void recordQuota(String storeName, double quota) { - getStoreStats(storeName).recordQuota(quota); + recordStoreStats(storeName, stats -> stats.recordQuota(quota)); } public void recordTotalQuota(double totalQuota) { @@ -184,17 +201,17 @@ public void recordTotalQuota(double totalQuota) { public void recordFindUnhealthyHostRequest(String storeName) { totalStats.recordFindUnhealthyHostRequest(); - getStoreStats(storeName).recordFindUnhealthyHostRequest(); + recordStoreStats(storeName, RouterHttpRequestStats::recordFindUnhealthyHostRequest); } public void recordResponse(String storeName) { totalStats.recordResponse(); - getStoreStats(storeName).recordResponse(); + recordStoreStats(storeName, RouterHttpRequestStats::recordResponse); } public void recordMetaStoreShadowRead(String storeName) { totalStats.recordMetaStoreShadowRead(); - getStoreStats(storeName).recordMetaStoreShadowRead(); + recordStoreStats(storeName, RouterHttpRequestStats::recordMetaStoreShadowRead); } private class AggScatterGatherStats extends ScatterGatherStats { @@ -234,47 +251,47 @@ public long getTotalRetriesError() { public void recordKeyNum(String storeName, int keyNum) { totalStats.recordKeyNum(keyNum); - getStoreStats(storeName).recordKeyNum(keyNum); + recordStoreStats(storeName, stats -> stats.recordKeyNum(keyNum)); } public void recordRequestUsage(String storeName, int usage) { totalStats.recordRequestUsage(usage); - getStoreStats(storeName).recordRequestUsage(usage); + recordStoreStats(storeName, stats -> stats.recordRequestUsage(usage)); } public void recordMultiGetFallback(String storeName, int keyCount) { totalStats.recordMultiGetFallback(keyCount); - getStoreStats(storeName).recordMultiGetFallback(keyCount); + recordStoreStats(storeName, stats -> stats.recordMultiGetFallback(keyCount)); } public void recordRequestParsingLatency(String storeName, double latency) { totalStats.recordRequestParsingLatency(latency); - getStoreStats(storeName).recordRequestParsingLatency(latency); + recordStoreStats(storeName, stats -> stats.recordRequestParsingLatency(latency)); } public void recordRequestRoutingLatency(String storeName, double latency) { totalStats.recordRequestRoutingLatency(latency); - getStoreStats(storeName).recordRequestRoutingLatency(latency); + recordStoreStats(storeName, stats -> stats.recordRequestRoutingLatency(latency)); } public void recordUnavailableRequest(String storeName) { totalStats.recordUnavailableRequest(); - getStoreStats(storeName).recordUnavailableRequest(); + recordStoreStats(storeName, RouterHttpRequestStats::recordUnavailableRequest); } public void recordDelayConstraintAbortedRetryRequest(String storeName) { totalStats.recordDelayConstraintAbortedRetryRequest(); - getStoreStats(storeName).recordDelayConstraintAbortedRetryRequest(); + recordStoreStats(storeName, RouterHttpRequestStats::recordDelayConstraintAbortedRetryRequest); } public void recordSlowRouteAbortedRetryRequest(String storeName) { totalStats.recordSlowRouteAbortedRetryRequest(); - getStoreStats(storeName).recordSlowRouteAbortedRetryRequest(); + recordStoreStats(storeName, RouterHttpRequestStats::recordSlowRouteAbortedRetryRequest); } public void recordRetryRouteLimitAbortedRetryRequest(String storeName) { totalStats.recordRetryRouteLimitAbortedRetryRequest(); - getStoreStats(storeName).recordRetryRouteLimitAbortedRetryRequest(); + recordStoreStats(storeName, RouterHttpRequestStats::recordRetryRouteLimitAbortedRetryRequest); } public void recordKeySize(String storeName, long keySize) { @@ -283,26 +300,26 @@ public void recordKeySize(String storeName, long keySize) { public void recordAllowedRetryRequest(String storeName) { totalStats.recordAllowedRetryRequest(); - getStoreStats(storeName).recordAllowedRetryRequest(); + recordStoreStats(storeName, RouterHttpRequestStats::recordAllowedRetryRequest); } public void recordDisallowedRetryRequest(String storeName) { totalStats.recordDisallowedRetryRequest(); - getStoreStats(storeName).recordDisallowedRetryRequest(); + recordStoreStats(storeName, RouterHttpRequestStats::recordDisallowedRetryRequest); } public void recordNoAvailableReplicaAbortedRetryRequest(String storeName) { totalStats.recordNoAvailableReplicaAbortedRetryRequest(); - getStoreStats(storeName).recordRetryRouteLimitAbortedRetryRequest(); + recordStoreStats(storeName, RouterHttpRequestStats::recordNoAvailableReplicaAbortedRetryRequest); } public void recordErrorRetryAttemptTriggeredByPendingRequestCheck(String storeName) { totalStats.recordErrorRetryAttemptTriggeredByPendingRequestCheck(); - getStoreStats(storeName).recordErrorRetryAttemptTriggeredByPendingRequestCheck(); + recordStoreStats(storeName, RouterHttpRequestStats::recordErrorRetryAttemptTriggeredByPendingRequestCheck); } public void recordRetryDelay(String storeName, double delay) { totalStats.recordRetryDelay(delay); - getStoreStats(storeName).recordRetryDelay(delay); + recordStoreStats(storeName, stats -> stats.recordRetryDelay(delay)); } } diff --git a/services/venice-router/src/test/java/com/linkedin/venice/router/AggRouterHttpRequestStatsTest.java b/services/venice-router/src/test/java/com/linkedin/venice/router/AggRouterHttpRequestStatsTest.java index 3490d97483..0e45af9545 100644 --- a/services/venice-router/src/test/java/com/linkedin/venice/router/AggRouterHttpRequestStatsTest.java +++ b/services/venice-router/src/test/java/com/linkedin/venice/router/AggRouterHttpRequestStatsTest.java @@ -4,6 +4,8 @@ import com.linkedin.venice.read.RequestType; import com.linkedin.venice.router.stats.AggRouterHttpRequestStats; import com.linkedin.venice.tehuti.MockTehutiReporter; +import com.linkedin.venice.utils.Utils; +import io.tehuti.TehutiException; import io.tehuti.metrics.MetricsRepository; import org.mockito.Mockito; import org.testng.Assert; @@ -76,4 +78,37 @@ public void testProfilingMetrics() { Assert.assertEquals((int) reporter.query(".total--compute_response_size.3thPercentile").value(), 3); Assert.assertEquals((int) reporter.query(".total--compute_response_size.4thPercentile").value(), 4); } + + @Test + public void testDisableMultiGetStoreMetrics() { + AggRouterHttpRequestStats multiGetStats = + new AggRouterHttpRequestStats(metricsRepository, RequestType.MULTI_GET, storeMetadataRepository, true); + AggRouterHttpRequestStats streamingMultiGetStats = new AggRouterHttpRequestStats( + metricsRepository, + RequestType.MULTI_GET_STREAMING, + storeMetadataRepository, + true); + String storeName = Utils.getUniqueString("test-store"); + multiGetStats.recordRequest(storeName); + streamingMultiGetStats.recordRequest(storeName); + multiGetStats.recordHealthyRequest(storeName, 10); + streamingMultiGetStats.recordHealthyRequest(storeName, 10); + // Total stats should exist for streaming and non-streaming multi-get + Assert.assertEquals((int) reporter.query(".total--multiget_request.Count").value(), 1); + Assert.assertEquals((int) reporter.query(".total--multiget_streaming_request.Count").value(), 1); + Assert.assertEquals((int) reporter.query(".total--multiget_healthy_request_latency.Max").value(), 10); + Assert.assertEquals((int) reporter.query(".total--multiget_streaming_healthy_request_latency.Max").value(), 10); + // Store level stats should only exist for streaming multi-get + Assert.assertEquals((int) reporter.query("." + storeName + "--multiget_streaming_request.Count").value(), 1); + Assert.assertEquals( + (int) reporter.query("." + storeName + "--multiget_streaming_healthy_request_latency.Max").value(), + 10); + TehutiException exception = + Assert.expectThrows(TehutiException.class, () -> reporter.query("." + storeName + "--multiget_request.Count")); + Assert.assertTrue(exception.getMessage().contains("does not exist")); + exception = Assert.expectThrows( + TehutiException.class, + () -> reporter.query("." + storeName + "--multiget_healthy_request_latency.Max")); + Assert.assertTrue(exception.getMessage().contains("does not exist")); + } }