diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/models/TopKBundles.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/models/TopKBundles.java index 624546fdff837..8a55138bd1672 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/models/TopKBundles.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/models/TopKBundles.java @@ -72,6 +72,12 @@ public void update(Map bundleStats, int topk) { pulsar.getConfiguration().isLoadBalancerSheddingBundlesWithPoliciesEnabled(); for (var etr : bundleStats.entrySet()) { String bundle = etr.getKey(); + var stat = etr.getValue(); + + // skip zero traffic bundles + if (stat.msgThroughputIn + stat.msgThroughputOut == 0) { + continue; + } // TODO: do not filter system topic while shedding if (NamespaceService.isSystemServiceNamespace(NamespaceBundle.getBundleNamespace(bundle))) { continue; diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/scheduler/TransferShedder.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/scheduler/TransferShedder.java index 7126ccb034196..72d671aa4ca8d 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/scheduler/TransferShedder.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/scheduler/TransferShedder.java @@ -528,6 +528,13 @@ public Set findBundlesForUnloading(LoadManagerContext context, var bundleData = e.stats(); double maxBrokerBundleThroughput = bundleData.msgThroughputIn + bundleData.msgThroughputOut; + if (maxBrokerBundleThroughput == 0) { + if (debugMode) { + log.info(String.format(CANNOT_UNLOAD_BUNDLE_MSG + + " It has zero throughput.", bundle)); + } + continue; + } boolean swap = false; List minToMaxUnloads = new ArrayList<>(); double minBrokerBundleSwapThroughput = 0.0; @@ -549,6 +556,9 @@ public Set findBundlesForUnloading(LoadManagerContext context, var minBrokerBundleThroughput = minBrokerBundleData.stats().msgThroughputIn + minBrokerBundleData.stats().msgThroughputOut; + if (minBrokerBundleThroughput == 0) { + continue; + } var maxBrokerNewThroughputTmp = maxBrokerNewThroughput + minBrokerBundleThroughput; var minBrokerNewThroughputTmp = minBrokerNewThroughput - minBrokerBundleThroughput; if (maxBrokerNewThroughputTmp < maxBrokerThroughput diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/models/TopKBundlesTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/models/TopKBundlesTest.java index 472d44df8906d..3445ab393be7a 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/models/TopKBundlesTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/models/TopKBundlesTest.java @@ -88,14 +88,17 @@ public void testTopBundlesLoadData() { var topKBundles = new TopKBundles(pulsar); NamespaceBundleStats stats1 = new NamespaceBundleStats(); stats1.msgRateIn = 100000; + stats1.msgThroughputOut = 10; bundleStats.put(bundle1, stats1); NamespaceBundleStats stats2 = new NamespaceBundleStats(); stats2.msgRateIn = 500; + stats2.msgThroughputOut = 10; bundleStats.put(bundle2, stats2); NamespaceBundleStats stats3 = new NamespaceBundleStats(); stats3.msgRateIn = 10000; + stats3.msgThroughputOut = 10; bundleStats.put(bundle3, stats3); NamespaceBundleStats stats4 = new NamespaceBundleStats(); @@ -118,10 +121,12 @@ public void testSystemNamespace() { var topKBundles = new TopKBundles(pulsar); NamespaceBundleStats stats1 = new NamespaceBundleStats(); stats1.msgRateIn = 500; + stats1.msgThroughputOut = 10; bundleStats.put("pulsar/system/0x00000000_0x0FFFFFFF", stats1); NamespaceBundleStats stats2 = new NamespaceBundleStats(); stats2.msgRateIn = 10000; + stats2.msgThroughputOut = 10; bundleStats.put(bundle1, stats2); topKBundles.update(bundleStats, 2); @@ -131,6 +136,21 @@ public void testSystemNamespace() { assertEquals(top0.bundleName(), bundle1); } + @Test + public void testZeroMsgThroughputBundleStats() { + Map bundleStats = new HashMap<>(); + var topKBundles = new TopKBundles(pulsar); + NamespaceBundleStats stats1 = new NamespaceBundleStats(); + bundleStats.put(bundle1, stats1); + + NamespaceBundleStats stats2 = new NamespaceBundleStats(); + bundleStats.put(bundle1, stats2); + + topKBundles.update(bundleStats, 2); + + assertEquals(topKBundles.getLoadData().getTopBundlesLoadData().size(), 0); + } + private void setAntiAffinityGroup() throws MetadataStoreException { LocalPolicies localPolicies = new LocalPolicies(null, null, "namespaceAntiAffinityGroup"); @@ -166,10 +186,12 @@ public void testIsolationPolicy() throws MetadataStoreException { var topKBundles = new TopKBundles(pulsar); NamespaceBundleStats stats1 = new NamespaceBundleStats(); stats1.msgRateIn = 500; + stats1.msgThroughputOut = 10; bundleStats.put(bundle1, stats1); NamespaceBundleStats stats2 = new NamespaceBundleStats(); stats2.msgRateIn = 10000; + stats2.msgThroughputOut = 10; bundleStats.put(bundle2, stats2); topKBundles.update(bundleStats, 2); @@ -188,10 +210,12 @@ public void testAntiAffinityGroupPolicy() throws MetadataStoreException { var topKBundles = new TopKBundles(pulsar); NamespaceBundleStats stats1 = new NamespaceBundleStats(); stats1.msgRateIn = 500; + stats1.msgThroughputOut = 10; bundleStats.put(bundle1, stats1); NamespaceBundleStats stats2 = new NamespaceBundleStats(); stats2.msgRateIn = 10000; + stats2.msgThroughputOut = 10; bundleStats.put(bundle2, stats2); topKBundles.update(bundleStats, 2); @@ -213,10 +237,12 @@ public void testLoadBalancerSheddingBundlesWithPoliciesEnabledConfig() throws Me var topKBundles = new TopKBundles(pulsar); NamespaceBundleStats stats1 = new NamespaceBundleStats(); stats1.msgRateIn = 500; + stats1.msgThroughputOut = 10; bundleStats.put(bundle1, stats1); NamespaceBundleStats stats2 = new NamespaceBundleStats(); stats2.msgRateIn = 10000; + stats2.msgThroughputOut = 10; bundleStats.put(bundle2, stats2); topKBundles.update(bundleStats, 2); diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/scheduler/TransferShedderTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/scheduler/TransferShedderTest.java index efca2880949f2..a8edf19d5f90e 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/scheduler/TransferShedderTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/scheduler/TransferShedderTest.java @@ -918,6 +918,26 @@ public void testBundleThroughputLargerThanOffloadThreshold() { assertEquals(counter.getLoadStd(), setupLoadStd); } + @Test + public void testZeroBundleThroughput() { + UnloadCounter counter = new UnloadCounter(); + TransferShedder transferShedder = new TransferShedder(counter); + var ctx = setupContext(); + var topBundlesLoadDataStore = ctx.topBundleLoadDataStore(); + for (var e : topBundlesLoadDataStore.entrySet()) { + for (var stat : e.getValue().getTopBundlesLoadData()) { + stat.stats().msgThroughputOut = 0; + stat.stats().msgThroughputIn = 0; + + } + } + var res = transferShedder.findBundlesForUnloading(ctx, Map.of(), Map.of()); + assertTrue(res.isEmpty()); + assertEquals(counter.getBreakdownCounters().get(Skip).get(NoBundles).get(), 1); + assertEquals(counter.getLoadAvg(), setupLoadAvg); + assertEquals(counter.getLoadStd(), setupLoadStd); + } + @Test public void testTargetStdAfterTransfer() {