From 43aabfc720912a2773fac91e4061460b335d2a73 Mon Sep 17 00:00:00 2001 From: Lari Hotari Date: Wed, 2 Oct 2024 11:13:36 +0300 Subject: [PATCH] [fix][broker] Fix out-of-order issues with ConsistentHashingStickyKeyConsumerSelector (#23327) (cherry picked from commit adb9014dbac21afdfb5fc252ac38e07ed2d6b19c) --- ...stentHashingStickyKeyConsumerSelector.java | 104 ++--- .../service/ConsumerIdentityWrapper.java | 70 ++++ .../service/ConsumerNameIndexTracker.java | 136 +++++++ ...tHashingStickyKeyConsumerSelectorTest.java | 366 +++++++++++++++++- .../service/ConsumerIdentityWrapperTest.java | 68 ++++ .../service/ConsumerNameIndexTrackerTest.java | 157 ++++++++ ...ckyKeyDispatcherMultipleConsumersTest.java | 9 +- .../org/apache/pulsar/client/api/Range.java | 11 +- 8 files changed, 853 insertions(+), 68 deletions(-) create mode 100644 pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ConsumerIdentityWrapper.java create mode 100644 pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ConsumerNameIndexTracker.java create mode 100644 pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ConsumerIdentityWrapperTest.java create mode 100644 pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ConsumerNameIndexTrackerTest.java diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ConsistentHashingStickyKeyConsumerSelector.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ConsistentHashingStickyKeyConsumerSelector.java index b2b2b512c8cfc..1ae9a6ff96b7d 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ConsistentHashingStickyKeyConsumerSelector.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ConsistentHashingStickyKeyConsumerSelector.java @@ -18,10 +18,8 @@ */ package org.apache.pulsar.broker.service; -import com.google.common.collect.Lists; import java.util.ArrayList; -import java.util.Comparator; -import java.util.LinkedHashMap; +import java.util.IdentityHashMap; import java.util.List; import java.util.Map; import java.util.NavigableMap; @@ -44,7 +42,9 @@ public class ConsistentHashingStickyKeyConsumerSelector implements StickyKeyCons private final ReadWriteLock rwLock = new ReentrantReadWriteLock(); // Consistent-Hash ring - private final NavigableMap> hashRing; + private final NavigableMap hashRing; + // Tracks the used consumer name indexes for each consumer name + private final ConsumerNameIndexTracker consumerNameIndexTracker = new ConsumerNameIndexTracker(); private final int numberOfPoints; @@ -57,21 +57,20 @@ public ConsistentHashingStickyKeyConsumerSelector(int numberOfPoints) { public CompletableFuture addConsumer(Consumer consumer) { rwLock.writeLock().lock(); try { + ConsumerIdentityWrapper consumerIdentityWrapper = new ConsumerIdentityWrapper(consumer); // Insert multiple points on the hash ring for every consumer // The points are deterministically added based on the hash of the consumer name for (int i = 0; i < numberOfPoints; i++) { - int hash = calculateHashForConsumerAndIndex(consumer, i); - hashRing.compute(hash, (k, v) -> { - if (v == null) { - return Lists.newArrayList(consumer); - } else { - if (!v.contains(consumer)) { - v.add(consumer); - v.sort(Comparator.comparing(Consumer::consumerName, String::compareTo)); - } - return v; - } - }); + int consumerNameIndex = + consumerNameIndexTracker.increaseConsumerRefCountAndReturnIndex(consumerIdentityWrapper); + int hash = calculateHashForConsumerAndIndex(consumer, consumerNameIndex, i); + // When there's a collision, the new consumer will replace the old one. + // This is a rare case, and it is acceptable to replace the old consumer since there + // are multiple points for each consumer. This won't affect the overall distribution significantly. + ConsumerIdentityWrapper removed = hashRing.put(hash, consumerIdentityWrapper); + if (removed != null) { + consumerNameIndexTracker.decreaseConsumerRefCount(removed); + } } return CompletableFuture.completedFuture(null); } finally { @@ -79,8 +78,19 @@ public CompletableFuture addConsumer(Consumer consumer) { } } - private static int calculateHashForConsumerAndIndex(Consumer consumer, int index) { - String key = consumer.consumerName() + KEY_SEPARATOR + index; + /** + * Calculate the hash for a consumer and hash ring point. + * The hash is calculated based on the consumer name, consumer name index, and hash ring point index. + * The resulting hash is used as the key to insert the consumer into the hash ring. + * + * @param consumer the consumer + * @param consumerNameIndex the index of the consumer name + * @param hashRingPointIndex the index of the hash ring point + * @return the hash value + */ + private static int calculateHashForConsumerAndIndex(Consumer consumer, int consumerNameIndex, + int hashRingPointIndex) { + String key = consumer.consumerName() + KEY_SEPARATOR + consumerNameIndex + KEY_SEPARATOR + hashRingPointIndex; return Murmur3_32Hash.getInstance().makeHash(key.getBytes()); } @@ -88,20 +98,16 @@ private static int calculateHashForConsumerAndIndex(Consumer consumer, int index public void removeConsumer(Consumer consumer) { rwLock.writeLock().lock(); try { - // Remove all the points that were added for this consumer - for (int i = 0; i < numberOfPoints; i++) { - int hash = calculateHashForConsumerAndIndex(consumer, i); - hashRing.compute(hash, (k, v) -> { - if (v == null) { - return null; - } else { - v.removeIf(c -> c.equals(consumer)); - if (v.isEmpty()) { - v = null; - } - return v; + ConsumerIdentityWrapper consumerIdentityWrapper = new ConsumerIdentityWrapper(consumer); + int consumerNameIndex = consumerNameIndexTracker.getTrackedIndex(consumerIdentityWrapper); + if (consumerNameIndex > -1) { + // Remove all the points that were added for this consumer + for (int i = 0; i < numberOfPoints; i++) { + int hash = calculateHashForConsumerAndIndex(consumer, consumerNameIndex, i); + if (hashRing.remove(hash, consumerIdentityWrapper)) { + consumerNameIndexTracker.decreaseConsumerRefCount(consumerIdentityWrapper); } - }); + } } } finally { rwLock.writeLock().unlock(); @@ -115,16 +121,13 @@ public Consumer select(int hash) { if (hashRing.isEmpty()) { return null; } - - List consumerList; - Map.Entry> ceilingEntry = hashRing.ceilingEntry(hash); + Map.Entry ceilingEntry = hashRing.ceilingEntry(hash); if (ceilingEntry != null) { - consumerList = ceilingEntry.getValue(); + return ceilingEntry.getValue().consumer; } else { - consumerList = hashRing.firstEntry().getValue(); + // Handle wrap-around in the hash ring, return the first consumer + return hashRing.firstEntry().getValue().consumer; } - - return consumerList.get(hash % consumerList.size()); } finally { rwLock.readLock().unlock(); } @@ -132,16 +135,27 @@ public Consumer select(int hash) { @Override public Map> getConsumerKeyHashRanges() { - Map> result = new LinkedHashMap<>(); + Map> result = new IdentityHashMap<>(); rwLock.readLock().lock(); try { + if (hashRing.isEmpty()) { + return result; + } int start = 0; - for (Map.Entry> entry: hashRing.entrySet()) { - for (Consumer consumer: entry.getValue()) { - result.computeIfAbsent(consumer, key -> new ArrayList<>()) - .add(Range.of(start, entry.getKey())); - } - start = entry.getKey() + 1; + int lastKey = 0; + for (Map.Entry entry: hashRing.entrySet()) { + Consumer consumer = entry.getValue().consumer; + result.computeIfAbsent(consumer, key -> new ArrayList<>()) + .add(Range.of(start, entry.getKey())); + lastKey = entry.getKey(); + start = lastKey + 1; + } + // Handle wrap-around in the hash ring, the first consumer will also contain the range from the last key + // to the maximum value of the hash range + Consumer firstConsumer = hashRing.firstEntry().getValue().consumer; + List ranges = result.get(firstConsumer); + if (lastKey != Integer.MAX_VALUE - 1) { + ranges.add(Range.of(lastKey + 1, Integer.MAX_VALUE - 1)); } } finally { rwLock.readLock().unlock(); diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ConsumerIdentityWrapper.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ConsumerIdentityWrapper.java new file mode 100644 index 0000000000000..2aae1d9b0622e --- /dev/null +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ConsumerIdentityWrapper.java @@ -0,0 +1,70 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.broker.service; + +/** + * A wrapper class for a Consumer instance that provides custom implementations + * of equals and hashCode methods. The equals method returns true if and only if + * the compared instance is the same instance. + * + *

The reason for this class is the custom implementation of {@link Consumer#equals(Object)}. + * Using this wrapper class will be useful in use cases where it's necessary to match a key + * in a map by instance or a value in a set by instance.

+ */ +class ConsumerIdentityWrapper { + final Consumer consumer; + + public ConsumerIdentityWrapper(Consumer consumer) { + this.consumer = consumer; + } + + /** + * Compares this wrapper to the specified object. The result is true if and only if + * the argument is not null and is a ConsumerIdentityWrapper object that wraps + * the same Consumer instance. + * + * @param obj the object to compare this ConsumerIdentityWrapper against + * @return true if the given object represents a ConsumerIdentityWrapper + * equivalent to this wrapper, false otherwise + */ + @Override + public boolean equals(Object obj) { + if (obj instanceof ConsumerIdentityWrapper) { + ConsumerIdentityWrapper other = (ConsumerIdentityWrapper) obj; + return consumer == other.consumer; + } + return false; + } + + /** + * Returns a hash code for this wrapper. The hash code is computed based on + * the wrapped Consumer instance. + * + * @return a hash code value for this object + */ + @Override + public int hashCode() { + return consumer.hashCode(); + } + + @Override + public String toString() { + return consumer.toString(); + } +} \ No newline at end of file diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ConsumerNameIndexTracker.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ConsumerNameIndexTracker.java new file mode 100644 index 0000000000000..1f93313ab1b71 --- /dev/null +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ConsumerNameIndexTracker.java @@ -0,0 +1,136 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.broker.service; + +import java.util.HashMap; +import java.util.Map; +import javax.annotation.concurrent.NotThreadSafe; +import org.apache.commons.lang3.mutable.MutableInt; +import org.roaringbitmap.RoaringBitmap; + +/** + * Tracks the used consumer name indexes for each consumer name. + * This is used by {@link ConsistentHashingStickyKeyConsumerSelector} to get a unique "consumer name index" + * for each consumer name. It is useful when there are multiple consumers with the same name, but they are + * different consumers. The purpose of the index is to prevent collisions in the hash ring. + * + * The consumer name index serves as an additional key for the hash ring assignment. The logic keeps track of + * used "index slots" for each consumer name and assigns the first unused index when a new consumer is added. + * This approach minimizes hash collisions due to using the same consumer name. + * + * An added benefit of this tracking approach is that a consumer that leaves and then rejoins immediately will get the + * same index and therefore the same assignments in the hash ring. This improves stability since the hash assignment + * changes are minimized over time, although a better solution would be to avoid reusing the same consumer name + * in the first place. + * + * When a consumer is removed, the index is deallocated. RoaringBitmap is used to keep track of the used indexes. + * The data structure to track a consumer name is removed when the reference count of the consumer name is zero. + * + * This class is not thread-safe and should be used in a synchronized context in the caller. + */ +@NotThreadSafe +class ConsumerNameIndexTracker { + // tracks the used index slots for each consumer name + private final Map consumerNameIndexSlotsMap = new HashMap<>(); + // tracks the active consumer entries + private final Map consumerEntries = new HashMap<>(); + + // Represents a consumer entry in the tracker, including the consumer name, index, and reference count. + record ConsumerEntry(String consumerName, int nameIndex, MutableInt refCount) { + } + + /* + * Tracks the used indexes for a consumer name using a RoaringBitmap. + * A specific index slot is used when the bit is set. + * When all bits are cleared, the customer name can be removed from tracking. + */ + static class ConsumerNameIndexSlots { + private RoaringBitmap indexSlots = new RoaringBitmap(); + + public int allocateIndexSlot() { + // find the first index that is not set, if there is no such index, add a new one + int index = (int) indexSlots.nextAbsentValue(0); + if (index == -1) { + index = indexSlots.getCardinality(); + } + indexSlots.add(index); + return index; + } + + public boolean deallocateIndexSlot(int index) { + indexSlots.remove(index); + return indexSlots.isEmpty(); + } + } + + /* + * Adds a reference to the consumer and returns the index assigned to this consumer. + */ + public int increaseConsumerRefCountAndReturnIndex(ConsumerIdentityWrapper wrapper) { + ConsumerEntry entry = consumerEntries.computeIfAbsent(wrapper, k -> { + String consumerName = wrapper.consumer.consumerName(); + return new ConsumerEntry(consumerName, allocateConsumerNameIndex(consumerName), new MutableInt(0)); + }); + entry.refCount.increment(); + return entry.nameIndex; + } + + private int allocateConsumerNameIndex(String consumerName) { + return getConsumerNameIndexBitmap(consumerName).allocateIndexSlot(); + } + + private ConsumerNameIndexSlots getConsumerNameIndexBitmap(String consumerName) { + return consumerNameIndexSlotsMap.computeIfAbsent(consumerName, k -> new ConsumerNameIndexSlots()); + } + + /* + * Decreases the reference count of the consumer and removes the consumer name from tracking if the ref count is + * zero. + */ + public void decreaseConsumerRefCount(ConsumerIdentityWrapper removed) { + ConsumerEntry consumerEntry = consumerEntries.get(removed); + int refCount = consumerEntry.refCount.decrementAndGet(); + if (refCount == 0) { + deallocateConsumerNameIndex(consumerEntry.consumerName, consumerEntry.nameIndex); + consumerEntries.remove(removed, consumerEntry); + } + } + + private void deallocateConsumerNameIndex(String consumerName, int index) { + if (getConsumerNameIndexBitmap(consumerName).deallocateIndexSlot(index)) { + consumerNameIndexSlotsMap.remove(consumerName); + } + } + + /* + * Returns the currently tracked index for the consumer. + */ + public int getTrackedIndex(ConsumerIdentityWrapper wrapper) { + ConsumerEntry consumerEntry = consumerEntries.get(wrapper); + return consumerEntry != null ? consumerEntry.nameIndex : -1; + } + + int getTrackedConsumerNamesCount() { + return consumerNameIndexSlotsMap.size(); + } + + int getTrackedConsumersCount() { + return consumerEntries.size(); + } +} diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ConsistentHashingStickyKeyConsumerSelectorTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ConsistentHashingStickyKeyConsumerSelectorTest.java index 48311c57338b5..04aafc49b47e6 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ConsistentHashingStickyKeyConsumerSelectorTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ConsistentHashingStickyKeyConsumerSelectorTest.java @@ -18,19 +18,27 @@ */ package org.apache.pulsar.broker.service; +import static org.assertj.core.api.Assertions.assertThat; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.when; - +import java.nio.charset.StandardCharsets; import java.util.ArrayList; import java.util.Arrays; +import java.util.Collections; +import java.util.Comparator; import java.util.HashMap; +import java.util.HashSet; import java.util.List; import java.util.Map; +import java.util.Set; import java.util.UUID; import java.util.stream.Collectors; import java.util.stream.IntStream; +import org.apache.commons.lang3.mutable.MutableInt; import org.apache.pulsar.broker.service.BrokerServiceException.ConsumerAssignException; import org.apache.pulsar.client.api.Range; +import org.assertj.core.data.Offset; +import org.mockito.Mockito; import org.testng.Assert; import org.testng.annotations.Test; @@ -40,7 +48,7 @@ public class ConsistentHashingStickyKeyConsumerSelectorTest { @Test public void testConsumerSelect() throws ConsumerAssignException { - ConsistentHashingStickyKeyConsumerSelector selector = new ConsistentHashingStickyKeyConsumerSelector(100); + ConsistentHashingStickyKeyConsumerSelector selector = new ConsistentHashingStickyKeyConsumerSelector(200); String key1 = "anyKey"; Assert.assertNull(selector.select(key1.getBytes())); @@ -146,31 +154,115 @@ public void testGetConsumerKeyHashRanges() throws BrokerServiceException.Consume ConsistentHashingStickyKeyConsumerSelector selector = new ConsistentHashingStickyKeyConsumerSelector(3); List consumerName = Arrays.asList("consumer1", "consumer2", "consumer3"); List consumers = new ArrayList<>(); + long id=0; for (String s : consumerName) { - Consumer consumer = mock(Consumer.class); - when(consumer.consumerName()).thenReturn(s); + Consumer consumer = createMockConsumer(s, s, id++); selector.addConsumer(consumer); consumers.add(consumer); } + + // check that results are the same when called multiple times + assertThat(selector.getConsumerKeyHashRanges()) + .containsExactlyEntriesOf(selector.getConsumerKeyHashRanges()); + Map> expectedResult = new HashMap<>(); + assertThat(consumers.get(0).consumerName()).isEqualTo("consumer1"); expectedResult.put(consumers.get(0), Arrays.asList( - Range.of(119056335, 242013991), - Range.of(722195657, 1656011842), - Range.of(1707482098, 1914695766))); + Range.of(95615213, 440020355), + Range.of(440020356, 455987436), + Range.of(1189794593, 1264144431))); + assertThat(consumers.get(1).consumerName()).isEqualTo("consumer2"); expectedResult.put(consumers.get(1), Arrays.asList( - Range.of(0, 90164503), - Range.of(90164504, 119056334), - Range.of(382436668, 722195656))); + Range.of(939655188, 1189794592), + Range.of(1314727625, 1977451233), + Range.of(1977451234, 2016237253))); + assertThat(consumers.get(2).consumerName()).isEqualTo("consumer3"); expectedResult.put(consumers.get(2), Arrays.asList( - Range.of(242013992, 242377547), - Range.of(242377548, 382436667), - Range.of(1656011843, 1707482097))); - for (Map.Entry> entry : selector.getConsumerKeyHashRanges().entrySet()) { - System.out.println(entry.getValue()); - Assert.assertEquals(entry.getValue(), expectedResult.get(entry.getKey())); - expectedResult.remove(entry.getKey()); + Range.of(0, 95615212), + Range.of(455987437, 939655187), + Range.of(1264144432, 1314727624), + Range.of(2016237254, 2147483646))); + Map> consumerKeyHashRanges = selector.getConsumerKeyHashRanges(); + assertThat(consumerKeyHashRanges).containsExactlyInAnyOrderEntriesOf(expectedResult); + + // check that ranges are continuous and cover the whole range + List allRanges = + consumerKeyHashRanges.values().stream().flatMap(List::stream).sorted().collect(Collectors.toList()); + Range previousRange = null; + for (Range range : allRanges) { + if (previousRange != null) { + assertThat(range.getStart()).isEqualTo(previousRange.getEnd() + 1); + } + previousRange = range; + } + assertThat(allRanges.stream().mapToInt(r -> r.getEnd() - r.getStart() + 1).sum()).isEqualTo(Integer.MAX_VALUE); + } + + @Test + public void testConsumersGetSufficientlyAccuratelyEvenlyMapped() + throws BrokerServiceException.ConsumerAssignException { + ConsistentHashingStickyKeyConsumerSelector selector = new ConsistentHashingStickyKeyConsumerSelector(200); + List consumers = new ArrayList<>(); + for (int i = 0; i < 20; i++) { + // use the same name for all consumers, use toString to distinguish them + Consumer consumer = createMockConsumer("consumer", String.format("index %02d", i), i); + selector.addConsumer(consumer); + consumers.add(consumer); } - Assert.assertEquals(expectedResult.size(), 0); + printConsumerRangesStats(selector); + + int totalSelections = 10000; + + Map consumerSelectionCount = new HashMap<>(); + for (int i = 0; i < totalSelections; i++) { + Consumer selectedConsumer = selector.select(("key " + i).getBytes(StandardCharsets.UTF_8)); + consumerSelectionCount.computeIfAbsent(selectedConsumer, c -> new MutableInt()).increment(); + } + + printSelectionCountStats(consumerSelectionCount); + + int averageCount = totalSelections / consumers.size(); + int allowedVariance = (int) (0.2d * averageCount); + System.out.println("averageCount: " + averageCount + " allowedVariance: " + allowedVariance); + + for (Map.Entry entry : consumerSelectionCount.entrySet()) { + assertThat(entry.getValue().intValue()).describedAs("consumer: %s", entry.getKey()) + .isCloseTo(averageCount, Offset.offset(allowedVariance)); + } + + consumers.forEach(selector::removeConsumer); + assertThat(selector.getConsumerKeyHashRanges()).isEmpty(); + } + + private static void printSelectionCountStats(Map consumerSelectionCount) { + int totalSelections = consumerSelectionCount.values().stream().mapToInt(MutableInt::intValue).sum(); + consumerSelectionCount.entrySet().stream() + .sorted(Map.Entry.comparingByKey(Comparator.comparing(Consumer::toString))) + .forEach(entry -> System.out.println( + String.format("consumer: %s got selected %d times. ratio: %.2f%%", entry.getKey(), + entry.getValue().intValue(), + ((double) entry.getValue().intValue() / totalSelections) * 100.0d))); + } + + private static void printConsumerRangesStats(ConsistentHashingStickyKeyConsumerSelector selector) { + selector.getConsumerKeyHashRanges().entrySet().stream() + .map(entry -> Map.entry(entry.getKey(), + entry.getValue().stream().mapToInt(r -> r.getEnd() - r.getStart() + 1).sum())) + .sorted(Map.Entry.comparingByKey(Comparator.comparing(Consumer::toString))) + .forEach(entry -> System.out.println( + String.format("consumer: %s total ranges size: %d ratio: %.2f%%", entry.getKey(), + entry.getValue(), + ((double) entry.getValue() / (Integer.MAX_VALUE - 1)) * 100.0d))); + } + + private static Consumer createMockConsumer(String consumerName, String toString, long id) { + // without stubOnly, the mock will record method invocations and run into OOME + Consumer consumer = mock(Consumer.class, Mockito.withSettings().stubOnly()); + when(consumer.consumerName()).thenReturn(consumerName); + when(consumer.getPriorityLevel()).thenReturn(0); + when(consumer.toString()).thenReturn(toString); + when(consumer.consumerId()).thenReturn(id); + return consumer; } // reproduces https://github.com/apache/pulsar/issues/22050 @@ -215,5 +307,243 @@ public void shouldRemoveConsumersFromConsumerKeyHashRanges() { consumers.forEach(selector::removeConsumer); // then there should be no mapping remaining Assert.assertEquals(selector.getConsumerKeyHashRanges().size(), 0); + // when consumers are removed again, should not fail + consumers.forEach(selector::removeConsumer); + } + + @Test + public void testShouldNotChangeSelectedConsumerWhenConsumerIsRemoved() { + final ConsistentHashingStickyKeyConsumerSelector selector = new ConsistentHashingStickyKeyConsumerSelector(100); + final String consumerName = "consumer"; + final int numOfInitialConsumers = 100; + List consumers = new ArrayList<>(); + for (int i = 0; i < numOfInitialConsumers; i++) { + final Consumer consumer = createMockConsumer(consumerName, "index " + i, i); + consumers.add(consumer); + selector.addConsumer(consumer); + } + + int hashRangeSize = Integer.MAX_VALUE; + int validationPointCount = 200; + int increment = hashRangeSize / (validationPointCount + 1); + List selectedConsumerBeforeRemoval = new ArrayList<>(); + + for (int i = 0; i < validationPointCount; i++) { + selectedConsumerBeforeRemoval.add(selector.select(i * increment)); + } + + for (int i = 0; i < validationPointCount; i++) { + Consumer selected = selector.select(i * increment); + Consumer expected = selectedConsumerBeforeRemoval.get(i); + assertThat(selected.consumerId()).as("validationPoint %d", i).isEqualTo(expected.consumerId()); + } + + Set removedConsumers = new HashSet<>(); + for (Consumer removedConsumer : consumers) { + selector.removeConsumer(removedConsumer); + removedConsumers.add(removedConsumer); + for (int i = 0; i < validationPointCount; i++) { + int hash = i * increment; + Consumer selected = selector.select(hash); + Consumer expected = selectedConsumerBeforeRemoval.get(i); + if (!removedConsumers.contains(expected)) { + assertThat(selected.consumerId()).as("validationPoint %d, removed %s, hash %d ranges %s", i, + removedConsumer.toString(), hash, selector.getConsumerKeyHashRanges()).isEqualTo(expected.consumerId()); + } + } + } + } + + @Test + public void testShouldNotChangeSelectedConsumerWhenConsumerIsRemovedCheckHashRanges() { + final ConsistentHashingStickyKeyConsumerSelector selector = new ConsistentHashingStickyKeyConsumerSelector(100); + final String consumerName = "consumer"; + final int numOfInitialConsumers = 25; + List consumers = new ArrayList<>(); + for (int i = 0; i < numOfInitialConsumers; i++) { + final Consumer consumer = createMockConsumer(consumerName, "index " + i, i); + consumers.add(consumer); + selector.addConsumer(consumer); + } + + Map> expected = selector.getConsumerKeyHashRanges(); + assertThat(selector.getConsumerKeyHashRanges()).as("sanity check").containsExactlyInAnyOrderEntriesOf(expected); + System.out.println(expected); + + for (Consumer removedConsumer : consumers) { + selector.removeConsumer(removedConsumer); + for (Map.Entry> entry : expected.entrySet()) { + if (entry.getKey() == removedConsumer) { + continue; + } + for (Range range : entry.getValue()) { + Consumer rangeStartConsumer = selector.select(range.getStart()); + assertThat(rangeStartConsumer).as("removed %s, range %s", removedConsumer, range) + .isEqualTo(entry.getKey()); + Consumer rangeEndConsumer = selector.select(range.getEnd()); + assertThat(rangeEndConsumer).as("removed %s, range %s", removedConsumer, range) + .isEqualTo(entry.getKey()); + assertThat(rangeStartConsumer).isSameAs(rangeEndConsumer); + } + } + expected = selector.getConsumerKeyHashRanges(); + } + } + + @Test + public void testShouldNotChangeSelectedConsumerUnnecessarilyWhenConsumerIsAddedCheckHashRanges() { + final ConsistentHashingStickyKeyConsumerSelector selector = new ConsistentHashingStickyKeyConsumerSelector(100); + final String consumerName = "consumer"; + final int numOfInitialConsumers = 25; + List consumers = new ArrayList<>(); + for (int i = 0; i < numOfInitialConsumers; i++) { + final Consumer consumer = createMockConsumer(consumerName, "index " + i, i); + consumers.add(consumer); + selector.addConsumer(consumer); + } + + Map> expected = selector.getConsumerKeyHashRanges(); + assertThat(selector.getConsumerKeyHashRanges()).as("sanity check").containsExactlyInAnyOrderEntriesOf(expected); + + for (int i = numOfInitialConsumers; i < numOfInitialConsumers * 2; i++) { + final Consumer addedConsumer = createMockConsumer(consumerName, "index " + i, i); + selector.addConsumer(addedConsumer); + for (Map.Entry> entry : expected.entrySet()) { + if (entry.getKey() == addedConsumer) { + continue; + } + for (Range range : entry.getValue()) { + Consumer rangeStartConsumer = selector.select(range.getStart()); + if (rangeStartConsumer != addedConsumer) { + assertThat(rangeStartConsumer).as("added %s, range start %s", addedConsumer, range) + .isEqualTo(entry.getKey()); + } + Consumer rangeEndConsumer = selector.select(range.getStart()); + if (rangeEndConsumer != addedConsumer) { + assertThat(rangeEndConsumer).as("added %s, range end %s", addedConsumer, range) + .isEqualTo(entry.getKey()); + } + } + } + expected = selector.getConsumerKeyHashRanges(); + } + } + + @Test + public void testShouldNotChangeSelectedConsumerWhenConsumerIsAdded() { + final ConsistentHashingStickyKeyConsumerSelector selector = new ConsistentHashingStickyKeyConsumerSelector(100); + final String consumerName = "consumer"; + final int numOfInitialConsumers = 50; + List consumers = new ArrayList<>(); + for (int i = 0; i < numOfInitialConsumers; i++) { + final Consumer consumer = createMockConsumer(consumerName, "index " + i, i); + consumers.add(consumer); + selector.addConsumer(consumer); + } + + int hashRangeSize = Integer.MAX_VALUE; + int validationPointCount = 200; + int increment = hashRangeSize / (validationPointCount + 1); + List selectedConsumerBeforeRemoval = new ArrayList<>(); + + for (int i = 0; i < validationPointCount; i++) { + selectedConsumerBeforeRemoval.add(selector.select(i * increment)); + } + + for (int i = 0; i < validationPointCount; i++) { + Consumer selected = selector.select(i * increment); + Consumer expected = selectedConsumerBeforeRemoval.get(i); + assertThat(selected.consumerId()).as("validationPoint %d", i).isEqualTo(expected.consumerId()); + } + + Set addedConsumers = new HashSet<>(); + for (int i = numOfInitialConsumers; i < numOfInitialConsumers * 2; i++) { + final Consumer addedConsumer = createMockConsumer(consumerName, "index " + i, i); + selector.addConsumer(addedConsumer); + addedConsumers.add(addedConsumer); + for (int j = 0; j < validationPointCount; j++) { + int hash = j * increment; + Consumer selected = selector.select(hash); + Consumer expected = selectedConsumerBeforeRemoval.get(j); + if (!addedConsumers.contains(addedConsumer)) { + assertThat(selected.consumerId()).as("validationPoint %d, hash %d", j, hash).isEqualTo(expected.consumerId()); + } + } + } + } + + @Test + public void testShouldNotChangeMappingWhenConsumerLeavesAndRejoins() { + final ConsistentHashingStickyKeyConsumerSelector selector = new ConsistentHashingStickyKeyConsumerSelector(100); + final String consumerName = "consumer"; + final int numOfInitialConsumers = 25; + List consumers = new ArrayList<>(); + for (int i = 0; i < numOfInitialConsumers; i++) { + final Consumer consumer = createMockConsumer(consumerName, "index " + i, i); + consumers.add(consumer); + selector.addConsumer(consumer); + } + + Map> expected = selector.getConsumerKeyHashRanges(); + assertThat(selector.getConsumerKeyHashRanges()).as("sanity check").containsExactlyInAnyOrderEntriesOf(expected); + + selector.removeConsumer(consumers.get(0)); + selector.removeConsumer(consumers.get(numOfInitialConsumers / 2)); + selector.addConsumer(consumers.get(0)); + selector.addConsumer(consumers.get(numOfInitialConsumers / 2)); + + assertThat(selector.getConsumerKeyHashRanges()).as("ranges shouldn't change").containsExactlyInAnyOrderEntriesOf(expected); + } + + @Test + public void testConsumersReconnect() { + final ConsistentHashingStickyKeyConsumerSelector selector = new ConsistentHashingStickyKeyConsumerSelector(100); + final String consumerName = "consumer"; + final int numOfInitialConsumers = 50; + final int validationPointCount = 200; + final List pointsToTest = pointsToTest(validationPointCount); + List consumers = new ArrayList<>(); + for (int i = 0; i < numOfInitialConsumers; i++) { + final Consumer consumer = createMockConsumer(consumerName, "index " + i, i); + consumers.add(consumer); + selector.addConsumer(consumer); + } + + // Mark original results. + List selectedConsumersBeforeRemove = new ArrayList<>(); + for (int i = 0; i < validationPointCount; i++) { + int point = pointsToTest.get(i); + selectedConsumersBeforeRemove.add(selector.select(point)); + } + + // All consumers leave (in any order) + List randomOrderConsumers = new ArrayList<>(consumers); + Collections.shuffle(randomOrderConsumers); + for (Consumer c : randomOrderConsumers) { + selector.removeConsumer(c); + } + + // All consumers reconnect in the same order as originally + for (Consumer c : consumers) { + selector.addConsumer(c); + } + + // Check that the same consumers are selected as before + for (int j = 0; j < validationPointCount; j++) { + int point = pointsToTest.get(j); + Consumer selected = selector.select(point); + Consumer expected = selectedConsumersBeforeRemove.get(j); + assertThat(selected.consumerId()).as("validationPoint %d, hash %d", j, point).isEqualTo(expected.consumerId()); + } + } + + private List pointsToTest(int validationPointCount) { + List res = new ArrayList<>(); + int hashRangeSize = Integer.MAX_VALUE; + final int increment = hashRangeSize / (validationPointCount + 1); + for (int i = 0; i < validationPointCount; i++) { + res.add(i * increment); + } + return res; } } diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ConsumerIdentityWrapperTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ConsumerIdentityWrapperTest.java new file mode 100644 index 0000000000000..75c8e6db5d2a0 --- /dev/null +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ConsumerIdentityWrapperTest.java @@ -0,0 +1,68 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.broker.service; + +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; +import static org.testng.Assert.assertEquals; +import static org.testng.Assert.assertNotEquals; +import org.testng.annotations.Test; + +@Test(groups = "broker") +public class ConsumerIdentityWrapperTest { + private static Consumer mockConsumer() { + return mockConsumer("consumer"); + } + + private static Consumer mockConsumer(String consumerName) { + Consumer consumer = mock(Consumer.class); + when(consumer.consumerName()).thenReturn(consumerName); + return consumer; + } + + @Test + public void testEquals() { + Consumer consumer = mockConsumer(); + assertEquals(new ConsumerIdentityWrapper(consumer), new ConsumerIdentityWrapper(consumer)); + } + + @Test + public void testHashCode() { + Consumer consumer = mockConsumer(); + assertEquals(new ConsumerIdentityWrapper(consumer).hashCode(), + new ConsumerIdentityWrapper(consumer).hashCode()); + } + + @Test + public void testEqualsAndHashCode() { + Consumer consumer1 = mockConsumer(); + Consumer consumer2 = mockConsumer(); + ConsumerIdentityWrapper wrapper1 = new ConsumerIdentityWrapper(consumer1); + ConsumerIdentityWrapper wrapper2 = new ConsumerIdentityWrapper(consumer1); + ConsumerIdentityWrapper wrapper3 = new ConsumerIdentityWrapper(consumer2); + + // Test equality + assertEquals(wrapper1, wrapper2); + assertNotEquals(wrapper1, wrapper3); + + // Test hash code + assertEquals(wrapper1.hashCode(), wrapper2.hashCode()); + assertNotEquals(wrapper1.hashCode(), wrapper3.hashCode()); + } +} \ No newline at end of file diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ConsumerNameIndexTrackerTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ConsumerNameIndexTrackerTest.java new file mode 100644 index 0000000000000..0f18ecce2ffb4 --- /dev/null +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ConsumerNameIndexTrackerTest.java @@ -0,0 +1,157 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.broker.service; + +import static org.assertj.core.api.Assertions.assertThat; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; +import static org.testng.Assert.assertEquals; +import static org.testng.Assert.assertNotEquals; +import java.util.List; +import java.util.Map; +import java.util.stream.Collectors; +import java.util.stream.IntStream; +import org.testng.annotations.BeforeMethod; +import org.testng.annotations.Test; + +@Test(groups = "broker") +public class ConsumerNameIndexTrackerTest { + private ConsumerNameIndexTracker tracker; + + @BeforeMethod + public void setUp() { + tracker = new ConsumerNameIndexTracker(); + } + + private static Consumer mockConsumer() { + return mockConsumer("consumer"); + } + + + private static Consumer mockConsumer(String consumerName) { + Consumer consumer = mock(Consumer.class); + when(consumer.consumerName()).thenReturn(consumerName); + return consumer; + } + + @Test + public void testIncreaseConsumerRefCountAndReturnIndex() { + Consumer consumer1 = mockConsumer(); + Consumer consumer2 = mockConsumer(); + ConsumerIdentityWrapper wrapper1 = new ConsumerIdentityWrapper(consumer1); + ConsumerIdentityWrapper wrapper2 = new ConsumerIdentityWrapper(consumer2); + int index1 = tracker.increaseConsumerRefCountAndReturnIndex(wrapper1); + int index2 = tracker.increaseConsumerRefCountAndReturnIndex(wrapper2); + assertNotEquals(index1, index2); + assertEquals(index1, tracker.getTrackedIndex(wrapper1)); + assertEquals(index2, tracker.getTrackedIndex(wrapper2)); + } + + @Test + public void testTrackingReturnsStableIndexWhenRemovedAndAddedInSameOrder() { + List consumerIdentityWrappers = + IntStream.range(0, 100).mapToObj(i -> mockConsumer()).map(ConsumerIdentityWrapper::new).toList(); + Map trackedIndexes = + consumerIdentityWrappers.stream().collect(Collectors.toMap( + wrapper -> wrapper, wrapper -> tracker.increaseConsumerRefCountAndReturnIndex(wrapper))); + // stop tracking every other consumer + for (int i = 0; i < consumerIdentityWrappers.size(); i++) { + if (i % 2 == 0) { + tracker.decreaseConsumerRefCount(consumerIdentityWrappers.get(i)); + } + } + // check that others are tracked + for (int i = 0; i < consumerIdentityWrappers.size(); i++) { + ConsumerIdentityWrapper wrapper = consumerIdentityWrappers.get(i); + int trackedIndex = tracker.getTrackedIndex(wrapper); + assertEquals(trackedIndex, i % 2 == 0 ? -1 : trackedIndexes.get(wrapper)); + } + // check that new consumers are tracked with the same index + for (int i = 0; i < consumerIdentityWrappers.size(); i++) { + ConsumerIdentityWrapper wrapper = consumerIdentityWrappers.get(i); + if (i % 2 == 0) { + int trackedIndex = tracker.increaseConsumerRefCountAndReturnIndex(wrapper); + assertEquals(trackedIndex, trackedIndexes.get(wrapper)); + } + } + // check that all consumers are tracked with the original indexes + for (int i = 0; i < consumerIdentityWrappers.size(); i++) { + ConsumerIdentityWrapper wrapper = consumerIdentityWrappers.get(i); + int trackedIndex = tracker.getTrackedIndex(wrapper); + assertEquals(trackedIndex, trackedIndexes.get(wrapper)); + } + } + + @Test + public void testTrackingMultipleTimes() { + List consumerIdentityWrappers = + IntStream.range(0, 100).mapToObj(i -> mockConsumer()).map(ConsumerIdentityWrapper::new).toList(); + Map trackedIndexes = + consumerIdentityWrappers.stream().collect(Collectors.toMap( + wrapper -> wrapper, wrapper -> tracker.increaseConsumerRefCountAndReturnIndex(wrapper))); + Map trackedIndexes2 = + consumerIdentityWrappers.stream().collect(Collectors.toMap( + wrapper -> wrapper, wrapper -> tracker.increaseConsumerRefCountAndReturnIndex(wrapper))); + assertThat(tracker.getTrackedConsumerNamesCount()).isEqualTo(1); + assertThat(trackedIndexes).containsExactlyInAnyOrderEntriesOf(trackedIndexes2); + consumerIdentityWrappers.forEach(wrapper -> tracker.decreaseConsumerRefCount(wrapper)); + for (ConsumerIdentityWrapper wrapper : consumerIdentityWrappers) { + int trackedIndex = tracker.getTrackedIndex(wrapper); + assertEquals(trackedIndex, trackedIndexes.get(wrapper)); + } + consumerIdentityWrappers.forEach(wrapper -> tracker.decreaseConsumerRefCount(wrapper)); + assertThat(tracker.getTrackedConsumersCount()).isEqualTo(0); + assertThat(tracker.getTrackedConsumerNamesCount()).isEqualTo(0); + } + + @Test + public void testDecreaseConsumerRefCount() { + Consumer consumer1 = mockConsumer(); + ConsumerIdentityWrapper wrapper1 = new ConsumerIdentityWrapper(consumer1); + int index1 = tracker.increaseConsumerRefCountAndReturnIndex(wrapper1); + assertNotEquals(index1, -1); + tracker.decreaseConsumerRefCount(wrapper1); + assertEquals(tracker.getTrackedIndex(wrapper1), -1); + } + + @Test + public void testGetTrackedIndex() { + Consumer consumer1 = mockConsumer(); + Consumer consumer2 = mockConsumer(); + ConsumerIdentityWrapper wrapper1 = new ConsumerIdentityWrapper(consumer1); + ConsumerIdentityWrapper wrapper2 = new ConsumerIdentityWrapper(consumer2); + int index1 = tracker.increaseConsumerRefCountAndReturnIndex(wrapper1); + int index2 = tracker.increaseConsumerRefCountAndReturnIndex(wrapper2); + assertEquals(index1, tracker.getTrackedIndex(wrapper1)); + assertEquals(index2, tracker.getTrackedIndex(wrapper2)); + } + + @Test + public void testTrackingMultipleNames() { + List consumerIdentityWrappers = + IntStream.range(0, 100).mapToObj(i -> mockConsumer("consumer" + i)).map(ConsumerIdentityWrapper::new) + .toList(); + consumerIdentityWrappers.forEach(wrapper -> tracker.increaseConsumerRefCountAndReturnIndex(wrapper)); + assertThat(tracker.getTrackedConsumerNamesCount()).isEqualTo(100); + assertThat(tracker.getTrackedConsumersCount()).isEqualTo(100); + consumerIdentityWrappers.forEach(wrapper -> tracker.decreaseConsumerRefCount(wrapper)); + assertThat(tracker.getTrackedConsumersCount()).isEqualTo(0); + assertThat(tracker.getTrackedConsumerNamesCount()).isEqualTo(0); + } +} \ No newline at end of file diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/PersistentStickyKeyDispatcherMultipleConsumersTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/PersistentStickyKeyDispatcherMultipleConsumersTest.java index 03eb01e958a31..3fe3fef329943 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/PersistentStickyKeyDispatcherMultipleConsumersTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/PersistentStickyKeyDispatcherMultipleConsumersTest.java @@ -20,6 +20,7 @@ import static java.nio.charset.StandardCharsets.UTF_8; import static org.apache.pulsar.common.protocol.Commands.serializeMetadataAndPayload; +import static org.assertj.core.api.Assertions.assertThat; import static org.mockito.Mockito.any; import static org.mockito.Mockito.anyBoolean; import static org.mockito.Mockito.anyInt; @@ -262,7 +263,7 @@ public void testSkipRedeliverTemporally() { redeliverEntries.add(EntryImpl.create(1, 1, createMessage("message1", 1, "key1"))); final List readEntries = new ArrayList<>(); readEntries.add(EntryImpl.create(1, 2, createMessage("message2", 2, "key1"))); - readEntries.add(EntryImpl.create(1, 3, createMessage("message3", 3, "key22"))); + readEntries.add(EntryImpl.create(1, 3, createMessage("message3", 3, "key2"))); try { Field totalAvailablePermitsField = PersistentDispatcherMultipleConsumers.class.getDeclaredField("totalAvailablePermits"); @@ -358,7 +359,7 @@ public void testMessageRedelivery() throws Exception { // Messages with key1 are routed to consumer1 and messages with key2 are routed to consumer2 final List allEntries = new ArrayList<>(); - allEntries.add(EntryImpl.create(1, 1, createMessage("message1", 1, "key22"))); + allEntries.add(EntryImpl.create(1, 1, createMessage("message1", 1, "key2"))); allEntries.add(EntryImpl.create(1, 2, createMessage("message2", 2, "key1"))); allEntries.add(EntryImpl.create(1, 3, createMessage("message3", 3, "key1"))); allEntries.forEach(entry -> ((EntryImpl) entry).retain()); @@ -459,8 +460,8 @@ public void testMessageRedelivery() throws Exception { persistentDispatcher.readMoreEntries(); } - assertEquals(actualEntriesToConsumer1, expectedEntriesToConsumer1); - assertEquals(actualEntriesToConsumer2, expectedEntriesToConsumer2); + assertThat(actualEntriesToConsumer1).containsExactlyElementsOf(expectedEntriesToConsumer1); + assertThat(actualEntriesToConsumer2).containsExactlyElementsOf(expectedEntriesToConsumer2); allEntries.forEach(entry -> entry.release()); } diff --git a/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/Range.java b/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/Range.java index 4437ffc4ac6a2..488083f484b76 100644 --- a/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/Range.java +++ b/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/Range.java @@ -27,7 +27,7 @@ */ @InterfaceAudience.Public @InterfaceStability.Stable -public class Range { +public class Range implements Comparable { private final int start; private final int end; @@ -84,4 +84,13 @@ public int hashCode() { public String toString() { return "[" + start + ", " + end + "]"; } + + @Override + public int compareTo(Range o) { + int result = Integer.compare(start, o.start); + if (result == 0) { + result = Integer.compare(end, o.end); + } + return result; + } }