From 5cf69ce3e5c87cf6964d5e2b1c5d2312f239ff31 Mon Sep 17 00:00:00 2001
From: yikaifei
Date: Thu, 12 Dec 2024 21:04:28 +0800
Subject: [PATCH] consistent hash
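
Soft affinity currently maps a file to executors with a plain modulo over the
sorted executor list (`file.hashCode % candidatesSize`), so adding or removing
one executor reshuffles the preferred locations of almost every file. This
patch replaces that strategy with a consistent hash ring: each executor is
expanded into a configurable number of virtual nodes (controlled by
GlutenConfig.GLUTEN_SOFT_AFFINITY_REPLICATIONS_NUM) whose slots are placed on
the ring with MurmurHash3, and a file is assigned to the first distinct
executors found by walking the ring clockwise from the file's own hash. When
an executor joins or leaves, only the files whose walk crossed its partitions
are remapped.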
---
 .../apache/gluten/hash/ConsistentHash.java    | 277 ++++++++++++++++++
 .../softaffinity/SoftAffinityManager.scala    |  21 +-
 .../ConsistentHashSoftAffinityStrategy.scala  |  47 +++
 .../SoftAffinityAllocationTrait.scala         |  15 +-
 .../strategy/SoftAffinityStrategy.scala       |  48 ---
 .../gluten/hash/ConsistentHashTest.java       | 148 ++++++++++
 6 files changed, 492 insertions(+), 64 deletions(-)
 create mode 100644 gluten-core/src/main/java/org/apache/gluten/hash/ConsistentHash.java
 create mode 100644 gluten-core/src/main/scala/org/apache/gluten/softaffinity/strategy/ConsistentHashSoftAffinityStrategy.scala
 delete mode 100644 gluten-core/src/main/scala/org/apache/gluten/softaffinity/strategy/SoftAffinityStrategy.scala
 create mode 100644 gluten-core/src/test/java/org/apache/gluten/hash/ConsistentHashTest.java
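
Note for reviewers (not part of the patch, git am ignores this region): a
minimal sketch of how the new ring is meant to be used, assuming the
ConsistentHash class added below is on the classpath together with its guava
and commons-codec dependencies. The ConsistentHashDemo/Exec names and the
"exec-N"/file-name values are made up for illustration; ExecutorNode in the
patch plays the same role as Exec here.

    import org.apache.gluten.hash.ConsistentHash;

    import java.util.Set;

    class ConsistentHashDemo {
      // A trivial ring member, analogous to ExecutorNode in the patch.
      static final class Exec implements ConsistentHash.Node {
        final String id;

        Exec(String id) {
          this.id = id;
        }

        @Override
        public String key() {
          return id;
        }

        @Override
        public String toString() {
          return id;
        }

        // equals/hashCode matter: nodes are used as HashMap keys in the ring.
        @Override
        public int hashCode() {
          return id.hashCode();
        }

        @Override
        public boolean equals(Object o) {
          return o instanceof Exec && ((Exec) o).id.equals(id);
        }
      }

      public static void main(String[] args) {
        // Three virtual nodes per executor, mirroring REPLICAS in the test.
        ConsistentHash<Exec> ring = new ConsistentHash<>(3);
        for (int i = 0; i < 4; i++) {
          ring.addNode(new Exec("exec-" + i));
        }
        // The same file name always starts the clockwise walk at the same
        // slot, so its preferred executors are stable across calls.
        Set<Exec> preferred = ring.allocateNodes("part-00000-demo.snappy.parquet", 2);
        System.out.println(preferred);
        // Removing one executor only remaps files whose walks hit its slots.
        ring.removeNode(new Exec("exec-2"));
        System.out.println(ring.allocateNodes("part-00000-demo.snappy.parquet", 2));
      }
    }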
diff --git a/gluten-core/src/main/java/org/apache/gluten/hash/ConsistentHash.java b/gluten-core/src/main/java/org/apache/gluten/hash/ConsistentHash.java
new file mode 100644
index 000000000000..e3ef9ac43b3e
--- /dev/null
+++ b/gluten-core/src/main/java/org/apache/gluten/hash/ConsistentHash.java
@@ -0,0 +1,277 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gluten.hash;
+
+import com.google.common.base.Preconditions;
+import org.apache.commons.codec.digest.MurmurHash3;
+
+import javax.annotation.concurrent.ThreadSafe;
+
+import java.util.*;
+import java.util.concurrent.locks.ReadWriteLock;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
+
+/**
+ * A consistent hash ring implementation. The class is thread-safe.
+ *
+ * @param <T> the type of node to be used in the ring.
+ */
+@ThreadSafe
+public class ConsistentHash<T extends ConsistentHash.Node> {
+
+  // Read-write lock guarding the ring.
+  private final ReadWriteLock lock = new ReentrantReadWriteLock(true);
+
+  // Mapping from a node to its partitions; each partition occupies a slot in the ring.
+  private final Map<T, Set<Partition<T>>> nodes = new HashMap<>();
+
+  // Mapping from a slot to its partition; a partition is effectively a virtual node.
+  private final SortedMap<Long, Partition<T>> ring = new TreeMap<>();
+
+  // The number of virtual nodes for each physical node.
+  private final int replicate;
+
+  // The hash function used to place keys on the ring.
+  private final Hasher hasher;
+
+  public ConsistentHash(int replicate) {
+    Preconditions.checkArgument(replicate > 0, "HashRing requires a positive replicate number.");
+    this.replicate = replicate;
+    this.hasher =
+        (key, seed) -> {
+          byte[] data = key.getBytes();
+          return MurmurHash3.hash32x86(data, 0, data.length, seed);
+        };
+  }
+
+  public ConsistentHash(int replicate, Hasher hasher) {
+    Preconditions.checkArgument(replicate > 0, "HashRing requires a positive replicate number.");
+    Preconditions.checkArgument(hasher != null, "HashRing requires a non-null hasher.");
+    this.replicate = replicate;
+    this.hasher = hasher;
+  }
+
+  /**
+   * Add a node to the ring; the node is replicated into `replicate` virtual nodes.
+   *
+   * @param node the node to be added to the ring.
+   * @return true if the node was added successfully, false otherwise.
+   */
+  public boolean addNode(T node) {
+    lock.writeLock().lock();
+    try {
+      return add(node);
+    } finally {
+      lock.writeLock().unlock();
+    }
+  }
+
+  /**
+   * Remove the node from the ring; all of its virtual nodes are removed with it.
+   *
+   * @param node the node to be removed.
+   * @return true if the node was removed successfully, false otherwise.
+   */
+  public boolean removeNode(T node) {
+    lock.writeLock().lock();
+    boolean removed = false;
+    try {
+      if (nodes.containsKey(node)) {
+        Set<Partition<T>> partitions = nodes.remove(node);
+        partitions.forEach(p -> ring.remove(p.getSlot()));
+        removed = true;
+      }
+    } finally {
+      lock.writeLock().unlock();
+    }
+    return removed;
+  }
+
+  /**
+   * Allocate nodes for the given key; the number of nodes to locate is specified by count.
+   *
+   * @param key the key used to locate nodes.
+   * @param count the number of nodes to locate.
+   * @return a set of nodes located for the key.
+   */
+  public Set<T> allocateNodes(String key, int count) {
+    lock.readLock().lock();
+    try {
+      Set<T> res = new HashSet<>();
+      if (key != null && count > 0) {
+        if (count < nodes.size()) {
+          long slot = hasher.hash(key, 0);
+          Iterator<Partition<T>> it = new AllocateIterator(slot);
+          while (it.hasNext() && res.size() < count) {
+            Partition<T> part = it.next();
+            res.add(part.getNode());
+          }
+        } else {
+          res.addAll(nodes.keySet());
+        }
+      }
+      return res;
+    } finally {
+      lock.readLock().unlock();
+    }
+  }
+
+  /**
+   * Get all the nodes in the ring.
+   *
+   * @return a set of nodes in the ring.
+   */
+  public Set<T> getNodes() {
+    lock.readLock().lock();
+    try {
+      return new HashSet<>(nodes.keySet());
+    } finally {
+      lock.readLock().unlock();
+    }
+  }
+
+  /**
+   * Get the partitions of the node. Returns null if the node does not exist.
+   *
+   * @param node the node whose partitions are requested.
+   * @return a set of partitions of the node.
+   */
+  public Set<Partition<T>> getPartition(T node) {
+    lock.readLock().lock();
+    try {
+      return nodes.get(node);
+    } finally {
+      lock.readLock().unlock();
+    }
+  }
+
+  /**
+   * Check if the node is in the ring.
+   *
+   * @param node the node to be checked.
+   * @return true if the node is in the ring, false otherwise.
+   */
+  public boolean contains(T node) {
+    lock.readLock().lock();
+    try {
+      return nodes.containsKey(node);
+    } finally {
+      lock.readLock().unlock();
+    }
+  }
+
+  /**
+   * Check if the ring contains the slot.
+   *
+   * @param slot the slot to be checked.
+   * @return true if the slot is in the ring, false otherwise.
+   */
+  public boolean ringContain(long slot) {
+    lock.readLock().lock();
+    try {
+      return ring.containsKey(slot);
+    } finally {
+      lock.readLock().unlock();
+    }
+  }
+
+  private boolean add(T node) {
+    boolean added = false;
+    if (node != null && !nodes.containsKey(node)) {
+      Set<Partition<T>> partitions =
+          IntStream.range(0, replicate)
+              .mapToObj(idx -> new Partition<>(node, idx))
+              .collect(Collectors.toSet());
+      nodes.put(node, partitions);
+
+      // Allocate a slot for each partition, re-hashing with a new seed on collision.
+      for (Partition<T> partition : partitions) {
+        long slot;
+        int seed = 0;
+        do {
+          slot = this.hasher.hash(partition.getPartitionKey(), seed++);
+        } while (ring.containsKey(slot));
+
+        partition.setSlot(slot);
+        ring.put(slot, partition);
+      }
+      added = true;
+    }
+    return added;
+  }
+
+  public static class Partition<T> {
+    private final T node;
+
+    private final int index;
+
+    private long slot;
+
+    public Partition(T node, int index) {
+      this.node = node;
+      this.index = index;
+    }
+
+    public String getPartitionKey() {
+      return String.format("%s:%d", node, index);
+    }
+
+    public T getNode() {
+      return node;
+    }
+
+    public void setSlot(long slot) {
+      this.slot = slot;
+    }
+
+    public long getSlot() {
+      return this.slot;
+    }
+  }
+
+  /** Base interface for the node in the ring. */
+  public interface Node {
+    String key();
+  }
+
+  /** Base interface for the hash function. */
+  public interface Hasher {
+    long hash(String key, int seed);
+  }
+
+  private class AllocateIterator implements Iterator<Partition<T>> {
+    // Walk the ring clockwise from the slot: the tail map first, then wrap to the head.
+    private final Iterator<Partition<T>> head;
+    private final Iterator<Partition<T>> tail;
+
+    AllocateIterator(long slot) {
+      this.head = ring.headMap(slot).values().iterator();
+      this.tail = ring.tailMap(slot).values().iterator();
+    }
+
+    @Override
+    public boolean hasNext() {
+      return head.hasNext() || tail.hasNext();
+    }
+
+    @Override
+    public Partition<T> next() {
+      return tail.hasNext() ? tail.next() : head.next();
+    }
+  }
+}
diff --git a/gluten-core/src/main/scala/org/apache/gluten/softaffinity/SoftAffinityManager.scala b/gluten-core/src/main/scala/org/apache/gluten/softaffinity/SoftAffinityManager.scala
index c72a2680b5f7..68d95af76040 100644
--- a/gluten-core/src/main/scala/org/apache/gluten/softaffinity/SoftAffinityManager.scala
+++ b/gluten-core/src/main/scala/org/apache/gluten/softaffinity/SoftAffinityManager.scala
@@ -17,8 +17,9 @@
 package org.apache.gluten.softaffinity
 
 import org.apache.gluten.GlutenConfig
+import org.apache.gluten.hash.ConsistentHash
 import org.apache.gluten.logging.LogLevelUtil
-import org.apache.gluten.softaffinity.strategy.SoftAffinityStrategy
+import org.apache.gluten.softaffinity.strategy.{ConsistentHashSoftAffinityStrategy, ExecutorNode}
 import org.apache.gluten.sql.shims.SparkShimLoader
 
 import org.apache.spark.SparkEnv
@@ -38,7 +39,8 @@ abstract class AffinityManager extends LogLevelUtil with Logging {
 
   private val resourceRWLock = new ReentrantReadWriteLock(true)
 
-  private val softAffinityAllocation = new SoftAffinityStrategy
+  lazy val softAffinityReplicationNum: Int =
+    GlutenConfig.GLUTEN_SOFT_AFFINITY_REPLICATIONS_NUM_DEFAULT_VALUE
 
   lazy val minOnTargetHosts: Int =
     GlutenConfig.GLUTEN_SOFT_AFFINITY_MIN_TARGET_HOSTS_DEFAULT_VALUE
@@ -88,6 +90,13 @@ abstract class AffinityManager extends LogLevelUtil with Logging {
     }
   })
 
+  private var hashRing: Option[ConsistentHash[ExecutorNode]] = _
+
+  private val softAffinityStrategy = {
+    hashRing = Some(new ConsistentHash[ExecutorNode](softAffinityReplicationNum))
+    new ConsistentHashSoftAffinityStrategy(hashRing.get)
+  }
+
   private val rand = new Random(System.currentTimeMillis)
 
   def totalExecutors(): Int = totalRegisteredExecutors.intValue()
@@ -107,6 +116,7 @@ abstract class AffinityManager extends LogLevelUtil with Logging {
       executorsSet.add(execHostId._1)
       idForExecutors += execHostId
       sortedIdForExecutors = idForExecutors.sortBy(_._2)
+      hashRing.foreach(_.addNode(ExecutorNode(execHostId._1, execHostId._2)))
       totalRegisteredExecutors.addAndGet(1)
     }
     logOnLevel(
@@ -139,6 +149,7 @@ abstract class AffinityManager extends LogLevelUtil with Logging {
         nodesExecutorsMap.remove(findedExecId._2)
       }
       sortedIdForExecutors = idForExecutors.sortBy(_._2)
+      hashRing.foreach(_.removeNode(ExecutorNode(execId, findedExecId._2)))
      totalRegisteredExecutors.addAndGet(-1)
     }
     logOnLevel(
@@ -237,7 +248,7 @@ abstract class AffinityManager extends LogLevelUtil with Logging {
       if (nodesExecutorsMap.size < 1) {
         Array.empty
       } else {
-        softAffinityAllocation.allocateExecs(file, sortedIdForExecutors)
+        softAffinityStrategy.allocateExecs(file, softAffinityReplicationNum)
       }
     } finally {
       resourceRWLock.readLock().unlock()
@@ -302,6 +313,10 @@ object SoftAffinityManager extends AffinityManager {
     GlutenConfig.GLUTEN_SOFT_AFFINITY_ENABLED_DEFAULT_VALUE
   )
 
+  override lazy val softAffinityReplicationNum: Int = SparkEnv.get.conf.getInt(
+    GlutenConfig.GLUTEN_SOFT_AFFINITY_REPLICATIONS_NUM,
+    GlutenConfig.GLUTEN_SOFT_AFFINITY_REPLICATIONS_NUM_DEFAULT_VALUE)
+
   override lazy val minOnTargetHosts: Int = SparkEnv.get.conf.getInt(
     GlutenConfig.GLUTEN_SOFT_AFFINITY_MIN_TARGET_HOSTS,
     GlutenConfig.GLUTEN_SOFT_AFFINITY_MIN_TARGET_HOSTS_DEFAULT_VALUE
diff --git a/gluten-core/src/main/scala/org/apache/gluten/softaffinity/strategy/ConsistentHashSoftAffinityStrategy.scala b/gluten-core/src/main/scala/org/apache/gluten/softaffinity/strategy/ConsistentHashSoftAffinityStrategy.scala
new file mode 100644
index 000000000000..578f4986d15a
--- /dev/null
+++ b/gluten-core/src/main/scala/org/apache/gluten/softaffinity/strategy/ConsistentHashSoftAffinityStrategy.scala
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gluten.softaffinity.strategy
+
+import org.apache.gluten.hash.ConsistentHash
+
+import java.util.Objects
+
+import scala.collection.JavaConverters._
+
+class ConsistentHashSoftAffinityStrategy(candidates: ConsistentHash[ExecutorNode])
+  extends SoftAffinityAllocationTrait {
+
+  /** Allocate `count` executors for the given file from the candidates. */
+  override def allocateExecs(file: String, count: Int): Array[(String, String)] = {
+    candidates.allocateNodes(file, count).asScala.map(node => (node.exeId, node.host)).toArray
+  }
+}
+
+case class ExecutorNode(exeId: String, host: String) extends ConsistentHash.Node {
+  override def key(): String = s"$exeId-$host"
+
+  override def toString: String = s"$exeId-$host"
+
+  override def equals(o: Any): Boolean = {
+    o match {
+      case ExecutorNode(exeId, host) => this.exeId == exeId && this.host == host
+      case _ => false
+    }
+  }
+
+  override def hashCode(): Int = Objects.hash(exeId, host)
+}
diff --git a/gluten-core/src/main/scala/org/apache/gluten/softaffinity/strategy/SoftAffinityAllocationTrait.scala b/gluten-core/src/main/scala/org/apache/gluten/softaffinity/strategy/SoftAffinityAllocationTrait.scala
index 639efd22e976..9a6abfe18af3 100644
--- a/gluten-core/src/main/scala/org/apache/gluten/softaffinity/strategy/SoftAffinityAllocationTrait.scala
+++ b/gluten-core/src/main/scala/org/apache/gluten/softaffinity/strategy/SoftAffinityAllocationTrait.scala
@@ -16,19 +16,8 @@
  */
 package org.apache.gluten.softaffinity.strategy
 
-import org.apache.gluten.GlutenConfig
-
-import org.apache.spark.SparkEnv
-
-import scala.collection.mutable.ListBuffer
-
 trait SoftAffinityAllocationTrait {
-  lazy val softAffinityReplicationNum = SparkEnv.get.conf.getInt(
-    GlutenConfig.GLUTEN_SOFT_AFFINITY_REPLICATIONS_NUM,
-    GlutenConfig.GLUTEN_SOFT_AFFINITY_REPLICATIONS_NUM_DEFAULT_VALUE
-  )
-
-  /** allocate target executors for file */
-  def allocateExecs(file: String, candidates: ListBuffer[(String, String)]): Array[(String, String)]
+  /** Allocate `count` executors for the given file from the candidates. */
+  def allocateExecs(file: String, count: Int): Array[(String, String)]
 }
diff --git a/gluten-core/src/main/scala/org/apache/gluten/softaffinity/strategy/SoftAffinityStrategy.scala b/gluten-core/src/main/scala/org/apache/gluten/softaffinity/strategy/SoftAffinityStrategy.scala
deleted file mode 100644
index 7af5f212c1b8..000000000000
--- a/gluten-core/src/main/scala/org/apache/gluten/softaffinity/strategy/SoftAffinityStrategy.scala
+++ /dev/null
@@ -1,48 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.gluten.softaffinity.strategy
-
-import org.apache.spark.internal.Logging
-
-import scala.collection.mutable
-import scala.collection.mutable.ListBuffer
-
-class SoftAffinityStrategy extends SoftAffinityAllocationTrait with Logging {
-
-  /** allocate target executors for file */
-  override def allocateExecs(
-      file: String,
-      candidates: ListBuffer[(String, String)]): Array[(String, String)] = {
-    if (candidates.size < 1) {
-      Array.empty
-    } else {
-      val candidatesSize = candidates.size
-      val halfCandidatesSize = candidatesSize / softAffinityReplicationNum
-      val resultSet = new mutable.LinkedHashSet[(String, String)]
-
-      // TODO: try to use ConsistentHash
-      val mod = file.hashCode % candidatesSize
-      val c1 = if (mod < 0) (mod + candidatesSize) else mod
-      resultSet.add(candidates(c1))
-      for (i <- 1 until softAffinityReplicationNum) {
-        val c2 = (c1 + halfCandidatesSize + i) % candidatesSize
-        resultSet.add(candidates(c2))
-      }
-      resultSet.toArray
-    }
-  }
-}
diff --git a/gluten-core/src/test/java/org/apache/gluten/hash/ConsistentHashTest.java b/gluten-core/src/test/java/org/apache/gluten/hash/ConsistentHashTest.java
new file mode 100644
index 000000000000..7b27594202bf
--- /dev/null
+++ b/gluten-core/src/test/java/org/apache/gluten/hash/ConsistentHashTest.java
@@ -0,0 +1,148 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gluten.hash;
+
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.util.Objects;
+import java.util.Set;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
+
+public class ConsistentHashTest {
+
+  private ConsistentHash<HostNode> consistentHash;
+  private static final int REPLICAS = 3;
+
+  @Before
+  public void setUp() throws Exception {
+    consistentHash = new ConsistentHash<>(REPLICAS);
+  }
+
+  @Test
+  public void testAddNode() {
+    Set<HostNode> nodes =
+        IntStream.range(0, 10)
+            .mapToObj(i -> new ConsistentHashTest.HostNode(String.format("executor-%d", i)))
+            .collect(Collectors.toSet());
+    nodes.forEach(n -> consistentHash.addNode(n));
+    Assert.assertEquals(10, consistentHash.getNodes().size());
+
+    HostNode existsNode = new HostNode("executor-1");
+    HostNode nonExistsNode = new HostNode("executor-100");
+    Assert.assertTrue(consistentHash.contains(existsNode));
+    Assert.assertFalse(consistentHash.contains(nonExistsNode));
+
+    Set<ConsistentHash.Partition<HostNode>> existsPartitions =
+        consistentHash.getPartition(existsNode);
+    Assert.assertEquals(REPLICAS, existsPartitions.size());
+    Set<ConsistentHash.Partition<HostNode>> nonExistsPartitions =
+        consistentHash.getPartition(nonExistsNode);
+    Assert.assertNull(nonExistsPartitions);
+  }
+
+  @Test
+  public void testRemoveNode() {
+    Set<HostNode> nodes =
+        IntStream.range(0, 10)
+            .mapToObj(i -> new ConsistentHashTest.HostNode(String.format("executor-%d", i)))
+            .collect(Collectors.toSet());
+    nodes.forEach(n -> consistentHash.addNode(n));
+    HostNode existsNode = new HostNode("executor-11");
+    consistentHash.addNode(existsNode);
+    Assert.assertEquals(11, consistentHash.getNodes().size());
+    Set<ConsistentHash.Partition<HostNode>> partitions =
+        consistentHash.getPartition(existsNode);
+    Assert.assertEquals(REPLICAS, partitions.size());
+    ConsistentHash.Partition<HostNode> partition = partitions.iterator().next();
+    Assert.assertTrue(consistentHash.ringContain(partition.getSlot()));
+    consistentHash.removeNode(existsNode);
+    Assert.assertEquals(10, consistentHash.getNodes().size());
+    Set<ConsistentHash.Partition<HostNode>> removedPartitions =
+        consistentHash.getPartition(existsNode);
+    Assert.assertNull(removedPartitions);
+    Assert.assertFalse(consistentHash.ringContain(partition.getSlot()));
+  }
+
+  @Test
+  public void testContain() {
+    Set<HostNode> nodes =
+        IntStream.range(0, 10)
+            .mapToObj(i -> new ConsistentHashTest.HostNode(String.format("executor-%d", i)))
+            .collect(Collectors.toSet());
+    nodes.forEach(n -> consistentHash.addNode(n));
+    HostNode existsNode = new HostNode("executor-11");
+    consistentHash.addNode(existsNode);
+    Assert.assertTrue(consistentHash.contains(existsNode));
+  }
+
+  @Test
+  public void testAllocateNodes() {
+    Set<HostNode> nodes =
+        IntStream.range(0, 10)
+            .mapToObj(i -> new ConsistentHashTest.HostNode(String.format("executor-%d", i)))
+            .collect(Collectors.toSet());
+    nodes.forEach(n -> consistentHash.addNode(n));
+
+    Set<HostNode> allocateNodes =
+        consistentHash.allocateNodes("part-00000-38af6778-964a-4a86-b1f9-8bf783cc65aa-c000", 3);
+    Assert.assertEquals(3, allocateNodes.size());
+    for (HostNode node : allocateNodes) {
+      Assert.assertTrue(consistentHash.contains(node));
+    }
+
+    Set<HostNode> allocateAllNodes =
+        consistentHash.allocateNodes("part-00000-38af6778-964a-4a86-b1f9-8bf783cc65aa-c000", 11);
+    Assert.assertEquals(10, allocateAllNodes.size());
+    for (HostNode node : allocateAllNodes) {
+      Assert.assertTrue(nodes.contains(node));
+    }
+  }
+
+  private static class HostNode implements ConsistentHash.Node {
+    private final String host;
+
+    HostNode(String host) {
+      this.host = host;
+    }
+
+    @Override
+    public String key() {
+      return host;
+    }
+
+    @Override
+    public String toString() {
+      return host;
+    }
+
+    @Override
+    public int hashCode() {
+      return Objects.hashCode(host);
+    }
+
+    @Override
+    public boolean equals(Object o) {
+      if (this == o) return true;
+      if (!(o instanceof HostNode)) return false;
+      HostNode that = (HostNode) o;
+      return Objects.equals(host, that.host);
+    }
+  }
+}