diff --git a/services/venice-router/src/main/java/com/linkedin/venice/router/stats/AggRouterHttpRequestStats.java b/services/venice-router/src/main/java/com/linkedin/venice/router/stats/AggRouterHttpRequestStats.java index 517f1485e7..da777d5c21 100644 --- a/services/venice-router/src/main/java/com/linkedin/venice/router/stats/AggRouterHttpRequestStats.java +++ b/services/venice-router/src/main/java/com/linkedin/venice/router/stats/AggRouterHttpRequestStats.java @@ -8,11 +8,14 @@ import com.linkedin.venice.utils.concurrent.VeniceConcurrentHashMap; import io.tehuti.metrics.MetricsRepository; import java.util.Map; +import java.util.function.Consumer; import java.util.function.Function; public class AggRouterHttpRequestStats extends AbstractVeniceAggStoreStats { private final Map scatterGatherStatsMap = new VeniceConcurrentHashMap<>(); + private final boolean isStoreStatsEnabled; + private final ScatterGatherStats totalScatterGatherStats; public AggRouterHttpRequestStats( MetricsRepository metricsRepository, @@ -29,6 +32,10 @@ public AggRouterHttpRequestStats( ReadOnlyStoreRepository metadataRepository, boolean isUnregisterMetricForDeletedStoreEnabled) { super(metricsRepository, metadataRepository, isUnregisterMetricForDeletedStoreEnabled); + // Disable store level non-streaming multi get stats reporting because it's no longer used in clients. We still + // report to the total stats for visibility of potential old clients. + isStoreStatsEnabled = !RequestType.MULTI_GET.equals(requestType); + totalScatterGatherStats = new AggScatterGatherStats(); /** * Use a setter function to bypass the restriction that the supertype constructor could not * touch member fields of current object. @@ -36,7 +43,7 @@ public AggRouterHttpRequestStats( setStatsSupplier((metricsRepo, storeName) -> { ScatterGatherStats stats; if (storeName.equals(AbstractVeniceAggStats.STORE_NAME_FOR_TOTAL_STAT)) { - stats = new AggScatterGatherStats(); + stats = totalScatterGatherStats; } else { stats = scatterGatherStatsMap.computeIfAbsent(storeName, k -> new ScatterGatherStats()); } @@ -46,35 +53,45 @@ public AggRouterHttpRequestStats( } public ScatterGatherStats getScatterGatherStatsForStore(String storeName) { - return scatterGatherStatsMap.computeIfAbsent(storeName, k -> new ScatterGatherStats()); + if (isStoreStatsEnabled) { + return scatterGatherStatsMap.computeIfAbsent(storeName, k -> new ScatterGatherStats()); + } else { + return totalScatterGatherStats; + } + } + + private void recordStoreStats(String storeName, Consumer statsConsumer) { + if (isStoreStatsEnabled) { + statsConsumer.accept(getStoreStats(storeName)); + } } public void recordRequest(String storeName) { totalStats.recordRequest(); - getStoreStats(storeName).recordRequest(); + recordStoreStats(storeName, RouterHttpRequestStats::recordRequest); } public void recordHealthyRequest(String storeName, double latency) { totalStats.recordHealthyRequest(latency); - getStoreStats(storeName).recordHealthyRequest(latency); + recordStoreStats(storeName, stats -> stats.recordHealthyRequest(latency)); } public void recordUnhealthyRequest(String storeName) { totalStats.recordUnhealthyRequest(); if (storeName != null) { - getStoreStats(storeName).recordUnhealthyRequest(); + recordStoreStats(storeName, RouterHttpRequestStats::recordUnhealthyRequest); } } public void recordUnavailableReplicaStreamingRequest(String storeName) { totalStats.recordUnavailableReplicaStreamingRequest(); - getStoreStats(storeName).recordUnavailableReplicaStreamingRequest(); + recordStoreStats(storeName, RouterHttpRequestStats::recordUnavailableReplicaStreamingRequest); } public void recordUnhealthyRequest(String storeName, double latency) { totalStats.recordUnhealthyRequest(latency); if (storeName != null) { - getStoreStats(storeName).recordUnhealthyRequest(latency); + recordStoreStats(storeName, stats -> stats.recordUnhealthyRequest(latency)); } } @@ -86,12 +103,12 @@ public void recordUnhealthyRequest(String storeName, double latency) { */ public void recordReadQuotaUsage(String storeName, int quotaUsage) { totalStats.recordReadQuotaUsage(quotaUsage); - getStoreStats(storeName).recordReadQuotaUsage(quotaUsage); + recordStoreStats(storeName, stats -> stats.recordReadQuotaUsage(quotaUsage)); } public void recordTardyRequest(String storeName, double latency) { totalStats.recordTardyRequest(latency); - getStoreStats(storeName).recordTardyRequest(latency); + recordStoreStats(storeName, stats -> stats.recordTardyRequest(latency)); } /** @@ -103,79 +120,79 @@ public void recordTardyRequest(String storeName, double latency) { */ public void recordThrottledRequest(String storeName) { totalStats.recordThrottledRequest(); - getStoreStats(storeName).recordThrottledRequest(); + recordStoreStats(storeName, RouterHttpRequestStats::recordThrottledRequest); } public void recordThrottledRequest(String storeName, double latency) { totalStats.recordThrottledRequest(latency); - getStoreStats(storeName).recordThrottledRequest(latency); + recordStoreStats(storeName, stats -> stats.recordThrottledRequest(latency)); } public void recordBadRequest(String storeName) { totalStats.recordBadRequest(); if (storeName != null) { - getStoreStats(storeName).recordBadRequest(); + recordStoreStats(storeName, RouterHttpRequestStats::recordBadRequest); } } public void recordBadRequestKeyCount(String storeName, int keyCount) { totalStats.recordBadRequestKeyCount(keyCount); if (storeName != null) { - getStoreStats(storeName).recordBadRequestKeyCount(keyCount); + recordStoreStats(storeName, stats -> stats.recordBadRequestKeyCount(keyCount)); } } public void recordRequestThrottledByRouterCapacity(String storeName) { totalStats.recordRequestThrottledByRouterCapacity(); if (storeName != null) { - getStoreStats(storeName).recordRequestThrottledByRouterCapacity(); + recordStoreStats(storeName, RouterHttpRequestStats::recordRequestThrottledByRouterCapacity); } } public void recordErrorRetryCount(String storeName) { totalStats.recordErrorRetryCount(); if (storeName != null) { - getStoreStats(storeName).recordErrorRetryCount(); + recordStoreStats(storeName, RouterHttpRequestStats::recordErrorRetryCount); } } public void recordFanoutRequestCount(String storeName, int count) { totalStats.recordFanoutRequestCount(count); - getStoreStats(storeName).recordFanoutRequestCount(count); + recordStoreStats(storeName, stats -> stats.recordFanoutRequestCount(count)); } public void recordLatency(String storeName, double latency) { totalStats.recordLatency(latency); - getStoreStats(storeName).recordLatency(latency); + recordStoreStats(storeName, stats -> stats.recordLatency(latency)); } public void recordResponseWaitingTime(String storeName, double waitingTime) { totalStats.recordResponseWaitingTime(waitingTime); - getStoreStats(storeName).recordResponseWaitingTime(waitingTime); + recordStoreStats(storeName, stats -> stats.recordResponseWaitingTime(waitingTime)); } public void recordRequestSize(String storeName, double keySize) { totalStats.recordRequestSize(keySize); - getStoreStats(storeName).recordRequestSize(keySize); + recordStoreStats(storeName, stats -> stats.recordRequestSize(keySize)); } public void recordCompressedResponseSize(String storeName, double compressedResponseSize) { totalStats.recordCompressedResponseSize(compressedResponseSize); - getStoreStats(storeName).recordCompressedResponseSize(compressedResponseSize); + recordStoreStats(storeName, stats -> stats.recordCompressedResponseSize(compressedResponseSize)); } public void recordResponseSize(String storeName, double valueSize) { totalStats.recordResponseSize(valueSize); - getStoreStats(storeName).recordResponseSize(valueSize); + recordStoreStats(storeName, stats -> stats.recordResponseSize(valueSize)); } public void recordDecompressionTime(String storeName, double decompressionTime) { totalStats.recordDecompressionTime(decompressionTime); - getStoreStats(storeName).recordDecompressionTime(decompressionTime); + recordStoreStats(storeName, stats -> stats.recordDecompressionTime(decompressionTime)); } public void recordQuota(String storeName, double quota) { - getStoreStats(storeName).recordQuota(quota); + recordStoreStats(storeName, stats -> stats.recordQuota(quota)); } public void recordTotalQuota(double totalQuota) { @@ -184,17 +201,17 @@ public void recordTotalQuota(double totalQuota) { public void recordFindUnhealthyHostRequest(String storeName) { totalStats.recordFindUnhealthyHostRequest(); - getStoreStats(storeName).recordFindUnhealthyHostRequest(); + recordStoreStats(storeName, RouterHttpRequestStats::recordFindUnhealthyHostRequest); } public void recordResponse(String storeName) { totalStats.recordResponse(); - getStoreStats(storeName).recordResponse(); + recordStoreStats(storeName, RouterHttpRequestStats::recordResponse); } public void recordMetaStoreShadowRead(String storeName) { totalStats.recordMetaStoreShadowRead(); - getStoreStats(storeName).recordMetaStoreShadowRead(); + recordStoreStats(storeName, RouterHttpRequestStats::recordMetaStoreShadowRead); } private class AggScatterGatherStats extends ScatterGatherStats { @@ -234,47 +251,47 @@ public long getTotalRetriesError() { public void recordKeyNum(String storeName, int keyNum) { totalStats.recordKeyNum(keyNum); - getStoreStats(storeName).recordKeyNum(keyNum); + recordStoreStats(storeName, stats -> stats.recordKeyNum(keyNum)); } public void recordRequestUsage(String storeName, int usage) { totalStats.recordRequestUsage(usage); - getStoreStats(storeName).recordRequestUsage(usage); + recordStoreStats(storeName, stats -> stats.recordRequestUsage(usage)); } public void recordMultiGetFallback(String storeName, int keyCount) { totalStats.recordMultiGetFallback(keyCount); - getStoreStats(storeName).recordMultiGetFallback(keyCount); + recordStoreStats(storeName, stats -> stats.recordMultiGetFallback(keyCount)); } public void recordRequestParsingLatency(String storeName, double latency) { totalStats.recordRequestParsingLatency(latency); - getStoreStats(storeName).recordRequestParsingLatency(latency); + recordStoreStats(storeName, stats -> stats.recordRequestParsingLatency(latency)); } public void recordRequestRoutingLatency(String storeName, double latency) { totalStats.recordRequestRoutingLatency(latency); - getStoreStats(storeName).recordRequestRoutingLatency(latency); + recordStoreStats(storeName, stats -> stats.recordRequestRoutingLatency(latency)); } public void recordUnavailableRequest(String storeName) { totalStats.recordUnavailableRequest(); - getStoreStats(storeName).recordUnavailableRequest(); + recordStoreStats(storeName, RouterHttpRequestStats::recordUnavailableRequest); } public void recordDelayConstraintAbortedRetryRequest(String storeName) { totalStats.recordDelayConstraintAbortedRetryRequest(); - getStoreStats(storeName).recordDelayConstraintAbortedRetryRequest(); + recordStoreStats(storeName, RouterHttpRequestStats::recordDelayConstraintAbortedRetryRequest); } public void recordSlowRouteAbortedRetryRequest(String storeName) { totalStats.recordSlowRouteAbortedRetryRequest(); - getStoreStats(storeName).recordSlowRouteAbortedRetryRequest(); + recordStoreStats(storeName, RouterHttpRequestStats::recordSlowRouteAbortedRetryRequest); } public void recordRetryRouteLimitAbortedRetryRequest(String storeName) { totalStats.recordRetryRouteLimitAbortedRetryRequest(); - getStoreStats(storeName).recordRetryRouteLimitAbortedRetryRequest(); + recordStoreStats(storeName, RouterHttpRequestStats::recordRetryRouteLimitAbortedRetryRequest); } public void recordKeySize(String storeName, long keySize) { @@ -283,26 +300,26 @@ public void recordKeySize(String storeName, long keySize) { public void recordAllowedRetryRequest(String storeName) { totalStats.recordAllowedRetryRequest(); - getStoreStats(storeName).recordAllowedRetryRequest(); + recordStoreStats(storeName, RouterHttpRequestStats::recordAllowedRetryRequest); } public void recordDisallowedRetryRequest(String storeName) { totalStats.recordDisallowedRetryRequest(); - getStoreStats(storeName).recordDisallowedRetryRequest(); + recordStoreStats(storeName, RouterHttpRequestStats::recordDisallowedRetryRequest); } public void recordNoAvailableReplicaAbortedRetryRequest(String storeName) { totalStats.recordNoAvailableReplicaAbortedRetryRequest(); - getStoreStats(storeName).recordRetryRouteLimitAbortedRetryRequest(); + recordStoreStats(storeName, RouterHttpRequestStats::recordNoAvailableReplicaAbortedRetryRequest); } public void recordErrorRetryAttemptTriggeredByPendingRequestCheck(String storeName) { totalStats.recordErrorRetryAttemptTriggeredByPendingRequestCheck(); - getStoreStats(storeName).recordErrorRetryAttemptTriggeredByPendingRequestCheck(); + recordStoreStats(storeName, RouterHttpRequestStats::recordErrorRetryAttemptTriggeredByPendingRequestCheck); } public void recordRetryDelay(String storeName, double delay) { totalStats.recordRetryDelay(delay); - getStoreStats(storeName).recordRetryDelay(delay); + recordStoreStats(storeName, stats -> stats.recordRetryDelay(delay)); } } diff --git a/services/venice-router/src/test/java/com/linkedin/venice/router/AggRouterHttpRequestStatsTest.java b/services/venice-router/src/test/java/com/linkedin/venice/router/AggRouterHttpRequestStatsTest.java index 3490d97483..0e45af9545 100644 --- a/services/venice-router/src/test/java/com/linkedin/venice/router/AggRouterHttpRequestStatsTest.java +++ b/services/venice-router/src/test/java/com/linkedin/venice/router/AggRouterHttpRequestStatsTest.java @@ -4,6 +4,8 @@ import com.linkedin.venice.read.RequestType; import com.linkedin.venice.router.stats.AggRouterHttpRequestStats; import com.linkedin.venice.tehuti.MockTehutiReporter; +import com.linkedin.venice.utils.Utils; +import io.tehuti.TehutiException; import io.tehuti.metrics.MetricsRepository; import org.mockito.Mockito; import org.testng.Assert; @@ -76,4 +78,37 @@ public void testProfilingMetrics() { Assert.assertEquals((int) reporter.query(".total--compute_response_size.3thPercentile").value(), 3); Assert.assertEquals((int) reporter.query(".total--compute_response_size.4thPercentile").value(), 4); } + + @Test + public void testDisableMultiGetStoreMetrics() { + AggRouterHttpRequestStats multiGetStats = + new AggRouterHttpRequestStats(metricsRepository, RequestType.MULTI_GET, storeMetadataRepository, true); + AggRouterHttpRequestStats streamingMultiGetStats = new AggRouterHttpRequestStats( + metricsRepository, + RequestType.MULTI_GET_STREAMING, + storeMetadataRepository, + true); + String storeName = Utils.getUniqueString("test-store"); + multiGetStats.recordRequest(storeName); + streamingMultiGetStats.recordRequest(storeName); + multiGetStats.recordHealthyRequest(storeName, 10); + streamingMultiGetStats.recordHealthyRequest(storeName, 10); + // Total stats should exist for streaming and non-streaming multi-get + Assert.assertEquals((int) reporter.query(".total--multiget_request.Count").value(), 1); + Assert.assertEquals((int) reporter.query(".total--multiget_streaming_request.Count").value(), 1); + Assert.assertEquals((int) reporter.query(".total--multiget_healthy_request_latency.Max").value(), 10); + Assert.assertEquals((int) reporter.query(".total--multiget_streaming_healthy_request_latency.Max").value(), 10); + // Store level stats should only exist for streaming multi-get + Assert.assertEquals((int) reporter.query("." + storeName + "--multiget_streaming_request.Count").value(), 1); + Assert.assertEquals( + (int) reporter.query("." + storeName + "--multiget_streaming_healthy_request_latency.Max").value(), + 10); + TehutiException exception = + Assert.expectThrows(TehutiException.class, () -> reporter.query("." + storeName + "--multiget_request.Count")); + Assert.assertTrue(exception.getMessage().contains("does not exist")); + exception = Assert.expectThrows( + TehutiException.class, + () -> reporter.query("." + storeName + "--multiget_healthy_request_latency.Max")); + Assert.assertTrue(exception.getMessage().contains("does not exist")); + } }