Merge branch 'linkedin:main' into dvrt_blob_transfer
elijahgrimaldi authored Nov 4, 2024
2 parents 73bf831 + e4fac90 commit d442adb
Showing 64 changed files with 2,322 additions and 200 deletions.
16 changes: 10 additions & 6 deletions build.gradle
@@ -40,8 +40,9 @@ if (project.hasProperty('overrideBuildEnvironment')) {
def avroVersion = '1.10.2'
def avroUtilVersion = '0.3.21'
def grpcVersion = '1.49.2'
def kafkaGroup = 'com.linkedin.kafka'
def kafkaVersion = '2.4.1.78'
// N.B.: The build should also work when substituting Kafka from the Apache fork to LinkedIn's fork:
def kafkaGroup = 'org.apache.kafka' // 'com.linkedin.kafka'
def kafkaVersion = '2.4.1' // '2.4.1.65'
def log4j2Version = '2.17.1'
def pegasusVersion = '29.31.0'
def protobufVersion = '3.21.7'
@@ -52,14 +53,15 @@ def alpnAgentVersion = '2.0.10'
def hadoopVersion = '2.10.2'
def apacheSparkVersion = '3.3.3'
def antlrVersion = '4.8'
def scala = '2.12'

ext.libraries = [
alpnAgent: "org.mortbay.jetty.alpn:jetty-alpn-agent:${alpnAgentVersion}",
antlr4: "org.antlr:antlr4:${antlrVersion}",
antlr4Runtime: "org.antlr:antlr4-runtime:${antlrVersion}",
apacheSparkAvro: "org.apache.spark:spark-avro_2.12:${apacheSparkVersion}",
apacheSparkCore: "org.apache.spark:spark-core_2.12:${apacheSparkVersion}",
apacheSparkSql: "org.apache.spark:spark-sql_2.12:${apacheSparkVersion}",
apacheSparkAvro: "org.apache.spark:spark-avro_${scala}:${apacheSparkVersion}",
apacheSparkCore: "org.apache.spark:spark-core_${scala}:${apacheSparkVersion}",
apacheSparkSql: "org.apache.spark:spark-sql_${scala}:${apacheSparkVersion}",
avro: "org.apache.avro:avro:${avroVersion}",
avroCompiler: "org.apache.avro:avro-compiler:${avroVersion}",
avroMapred: "org.apache.avro:avro-mapred:${avroVersion}",
@@ -101,7 +103,7 @@ ext.libraries = [
jna: 'net.java.dev.jna:jna:4.5.1',
jsr305: 'com.google.code.findbugs:jsr305:3.0.2',
joptSimple: 'net.sf.jopt-simple:jopt-simple:3.2',
kafka: "${kafkaGroup}:kafka_2.12:${kafkaVersion}",
kafka: "${kafkaGroup}:kafka_${scala}:${kafkaVersion}",
kafkaClients: "${kafkaGroup}:kafka-clients:${kafkaVersion}",
kafkaClientsTest: "${kafkaGroup}:kafka-clients:${kafkaVersion}:test",
log4j2api: "org.apache.logging.log4j:log4j-api:${log4j2Version}",
@@ -194,6 +196,8 @@ subprojects {

if (JavaVersion.current() >= JavaVersion.VERSION_1_9) {
tasks.withType(JavaCompile) {
// Compiler arguments can be injected here...
// options.compilerArgs << '-Xlint:unchecked'
options.release = 8
}
}
VeniceClusterConfig.java
@@ -27,8 +27,10 @@
import com.linkedin.venice.exceptions.ConfigurationException;
import com.linkedin.venice.exceptions.UndefinedPropertyException;
import com.linkedin.venice.meta.PersistenceType;
import com.linkedin.venice.pubsub.api.PubSubSecurityProtocol;
import com.linkedin.venice.utils.KafkaSSLUtils;
import com.linkedin.venice.utils.RegionUtils;
import com.linkedin.venice.utils.Utils;
import com.linkedin.venice.utils.VeniceProperties;
import it.unimi.dsi.fastutil.ints.Int2ObjectMap;
import it.unimi.dsi.fastutil.ints.Int2ObjectMaps;
@@ -44,7 +46,6 @@
import java.util.concurrent.TimeUnit;
import java.util.function.Function;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.protocol.SecurityProtocol;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;

@@ -89,8 +90,8 @@ public class VeniceClusterConfig {

private final VeniceProperties clusterProperties;

private final SecurityProtocol kafkaSecurityProtocol;
private final Map<String, SecurityProtocol> kafkaBootstrapUrlToSecurityProtocol;
private final PubSubSecurityProtocol kafkaSecurityProtocol;
private final Map<String, PubSubSecurityProtocol> kafkaBootstrapUrlToSecurityProtocol;
private final Optional<SSLConfig> sslConfig;

public VeniceClusterConfig(VeniceProperties clusterProps, Map<String, Map<String, String>> kafkaClusterMap)
@@ -135,20 +136,35 @@ public VeniceClusterConfig(VeniceProperties clusterProps, Map<String, Map<String
LOGGER.info("Final region name for this node: {}", this.regionName);

String kafkaSecurityProtocolString =
clusterProps.getString(KAFKA_SECURITY_PROTOCOL, SecurityProtocol.PLAINTEXT.name());
clusterProps.getString(KAFKA_SECURITY_PROTOCOL, PubSubSecurityProtocol.PLAINTEXT.name());
if (!KafkaSSLUtils.isKafkaProtocolValid(kafkaSecurityProtocolString)) {
throw new ConfigurationException("Invalid kafka security protocol: " + kafkaSecurityProtocolString);
}
this.kafkaSecurityProtocol = SecurityProtocol.forName(kafkaSecurityProtocolString);
this.kafkaSecurityProtocol = PubSubSecurityProtocol.forName(kafkaSecurityProtocolString);

Int2ObjectMap<String> tmpKafkaClusterIdToUrlMap = new Int2ObjectOpenHashMap<>();
Object2IntMap<String> tmpKafkaClusterUrlToIdMap = new Object2IntOpenHashMap<>();
Int2ObjectMap<String> tmpKafkaClusterIdToAliasMap = new Int2ObjectOpenHashMap<>();
Object2IntMap<String> tmpKafkaClusterAliasToIdMap = new Object2IntOpenHashMap<>();
Map<String, SecurityProtocol> tmpKafkaBootstrapUrlToSecurityProtocol = new HashMap<>();
Map<String, PubSubSecurityProtocol> tmpKafkaBootstrapUrlToSecurityProtocol = new HashMap<>();
Map<String, String> tmpKafkaUrlResolution = new HashMap<>();

boolean foundBaseKafkaUrlInMappingIfItIsPopulated = kafkaClusterMap.isEmpty();
/**
* The cluster ID, alias, and Kafka URL mappings are defined in the service config file. To support multiple
* cluster ID mappings, we pass them as separate entries. For example, a new cluster ID can be added with its
* alias and URL:
* <entry key="2">
*   <map>
*     <entry key="name" value="region1_sep"/>
*     <entry key="url" value="${venice.kafka.ssl.bootstrap.servers.region1}_sep"/>
*   </map>
* </entry>
*
* For the separate incremental push topic feature, we duplicate entries with the "_sep" suffix and a different
* cluster ID, to support two RT topics (the regular RT and the incremental RT) with different cluster IDs.
*/

for (Map.Entry<String, Map<String, String>> kafkaCluster: kafkaClusterMap.entrySet()) {
int clusterId = Integer.parseInt(kafkaCluster.getKey());
Map<String, String> mappings = kafkaCluster.getValue();
@@ -167,7 +183,7 @@ public VeniceClusterConfig(VeniceProperties clusterProps, Map<String, Map<String
tmpKafkaClusterUrlToIdMap.put(url, clusterId);
tmpKafkaUrlResolution.put(url, url);
if (securityProtocolString != null) {
tmpKafkaBootstrapUrlToSecurityProtocol.put(url, SecurityProtocol.valueOf(securityProtocolString));
tmpKafkaBootstrapUrlToSecurityProtocol.put(url, PubSubSecurityProtocol.valueOf(securityProtocolString));
}
}
if (baseKafkaBootstrapServers.equals(url)) {
@@ -207,11 +223,11 @@ public VeniceClusterConfig(VeniceProperties clusterProps, Map<String, Map<String
/**
* If the {@link kafkaClusterIdToUrlMap} and {@link kafkaClusterUrlToIdMap} are equal in size, then it means
* that {@link KAFKA_CLUSTER_MAP_KEY_OTHER_URLS} was never specified in the {@link kafkaClusterMap}, in which
* case, the resolver needs not lookup anything, and it will always return the same as its input.
* case, the resolver need not look anything up, and it will always return its input, potentially after stripping the separate-topic ("_sep") suffix.
*/
this.kafkaClusterUrlResolver = this.kafkaClusterIdToUrlMap.size() == this.kafkaClusterUrlToIdMap.size()
? String::toString
: url -> kafkaUrlResolution.getOrDefault(url, url);
? Utils::resolveKafkaUrlForSepTopic
: url -> Utils.resolveKafkaUrlForSepTopic(kafkaUrlResolution.getOrDefault(url, url));
this.kafkaBootstrapServers = this.kafkaClusterUrlResolver.apply(baseKafkaBootstrapServers);
if (this.kafkaBootstrapServers == null || this.kafkaBootstrapServers.isEmpty()) {
throw new ConfigurationException("kafkaBootstrapServers can't be empty");
Expand All @@ -228,7 +244,7 @@ public VeniceClusterConfig(VeniceProperties clusterProps, Map<String, Map<String
throw new ConfigurationException("Invalid kafka security protocol: " + kafkaSecurityProtocolString);
}
if (KafkaSSLUtils.isKafkaSSLProtocol(kafkaSecurityProtocolString)
|| kafkaBootstrapUrlToSecurityProtocol.containsValue(SecurityProtocol.SSL)) {
|| kafkaBootstrapUrlToSecurityProtocol.containsValue(PubSubSecurityProtocol.SSL)) {
this.sslConfig = Optional.of(new SSLConfig(clusterProps));
} else {
this.sslConfig = Optional.empty();
Expand Down Expand Up @@ -260,8 +276,8 @@ public String getKafkaBootstrapServers() {
return kafkaBootstrapServers;
}

public SecurityProtocol getKafkaSecurityProtocol(String kafkaBootstrapUrl) {
SecurityProtocol clusterSpecificSecurityProtocol = kafkaBootstrapUrlToSecurityProtocol.get(kafkaBootstrapUrl);
public PubSubSecurityProtocol getKafkaSecurityProtocol(String kafkaBootstrapUrl) {
PubSubSecurityProtocol clusterSpecificSecurityProtocol = kafkaBootstrapUrlToSecurityProtocol.get(kafkaBootstrapUrl);
return clusterSpecificSecurityProtocol == null ? kafkaSecurityProtocol : clusterSpecificSecurityProtocol;
}

Expand Down Expand Up @@ -364,4 +380,20 @@ public VeniceProperties getClusterProperties() {
public Map<String, Map<String, String>> getKafkaClusterMap() {
return kafkaClusterMap;
}

/**
* For the separate incremental push topic feature, we need to resolve the cluster ID to the original one for
* monitoring purposes, as the incremental push topic essentially uses the same pubsub clusters as the regular
* push topic, though it appears to have a different cluster ID.
* @param clusterId the cluster ID to resolve, which may belong to a "_sep" alias
* @return the cluster ID of the corresponding regular topic, or the input cluster ID if no "_sep" mapping applies
*/
public int getEquivalentKafkaClusterIdForSepTopic(int clusterId) {
String alias = kafkaClusterIdToAliasMap.get(clusterId);
if (alias == null || !alias.endsWith(Utils.SEPARATE_TOPIC_SUFFIX)) {
return clusterId;
}
String originalAlias = alias.substring(0, alias.length() - Utils.SEPARATE_TOPIC_SUFFIX.length());
return kafkaClusterAliasToIdMap.getInt(originalAlias);
}
}
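To make the new "_sep" handling in this file easier to follow, here is a minimal, self-contained sketch of the resolution logic it relies on. It assumes that Utils.SEPARATE_TOPIC_SUFFIX is the literal "_sep" and that Utils.resolveKafkaUrlForSepTopic simply strips that suffix from a bootstrap URL; neither helper is shown in this diff, so treat the code below as illustrative only, not the actual implementation.

```java
import java.util.HashMap;
import java.util.Map;

public class SepTopicResolutionSketch {
  // Assumed value of Utils.SEPARATE_TOPIC_SUFFIX; not shown in this diff.
  static final String SEPARATE_TOPIC_SUFFIX = "_sep";

  // Assumed behavior of Utils.resolveKafkaUrlForSepTopic: strip the "_sep" suffix when present.
  static String resolveKafkaUrlForSepTopic(String url) {
    return url.endsWith(SEPARATE_TOPIC_SUFFIX)
        ? url.substring(0, url.length() - SEPARATE_TOPIC_SUFFIX.length())
        : url;
  }

  // Mirrors getEquivalentKafkaClusterIdForSepTopic above: map a "_sep" alias back to the original cluster id.
  static int equivalentClusterIdForSepTopic(
      Map<Integer, String> clusterIdToAlias,
      Map<String, Integer> aliasToClusterId,
      int clusterId) {
    String alias = clusterIdToAlias.get(clusterId);
    if (alias == null || !alias.endsWith(SEPARATE_TOPIC_SUFFIX)) {
      return clusterId;
    }
    String originalAlias = alias.substring(0, alias.length() - SEPARATE_TOPIC_SUFFIX.length());
    return aliasToClusterId.getOrDefault(originalAlias, clusterId);
  }

  public static void main(String[] args) {
    Map<Integer, String> clusterIdToAlias = new HashMap<>();
    clusterIdToAlias.put(0, "region1");
    clusterIdToAlias.put(2, "region1_sep"); // duplicated entry for the separate incremental push RT topic

    Map<String, Integer> aliasToClusterId = new HashMap<>();
    aliasToClusterId.put("region1", 0);
    aliasToClusterId.put("region1_sep", 2);

    // Prints the base URL with the assumed "_sep" suffix stripped.
    System.out.println(resolveKafkaUrlForSepTopic("kafka.region1.example.com:16637_sep"));
    // Prints 0: the "_sep" cluster id maps back to the original region1 cluster id.
    System.out.println(equivalentClusterIdForSepTopic(clusterIdToAlias, aliasToClusterId, 2));
  }
}
```

With such a mapping, the duplicated region1_sep entry shares the pubsub cluster of region1 while carrying its own cluster ID, which is exactly what getEquivalentKafkaClusterIdForSepTopic undoes for monitoring.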
VeniceServerConfig.java
@@ -185,6 +185,7 @@
import com.linkedin.venice.utils.Time;
import com.linkedin.venice.utils.Utils;
import com.linkedin.venice.utils.VeniceProperties;
import com.linkedin.venice.utils.collections.MemoryBoundBlockingQueue;
import com.linkedin.venice.utils.concurrent.BlockingQueueType;
import io.netty.channel.WriteBufferWaterMark;
import java.io.File;
@@ -269,14 +270,14 @@ public class VeniceServerConfig extends VeniceClusterConfig {

/**
* Considering the consumer thread could put various sizes of messages into the shared queue, the internal
* {@link com.linkedin.davinci.kafka.consumer.MemoryBoundBlockingQueue} won't notify the waiting thread (consumer thread)
* {@link MemoryBoundBlockingQueue} won't notify the waiting thread (consumer thread)
* right away when some message gets processed until the freed memory hits the following config: {@link #storeWriterBufferNotifyDelta}.
* The reason behind this design:
* When the buffered queue is full and the processing thread keeps processing small messages, a bigger message won't
* have a chance to get queued into the buffer, since the memory freed by each processed small message is not enough
* to fit the bigger message.
*
* With this delta config, {@link com.linkedin.davinci.kafka.consumer.MemoryBoundBlockingQueue} will guarantee some fairness
* With this delta config, {@link MemoryBoundBlockingQueue} will guarantee some fairness
* among various sizes of messages when the buffered queue is full.
*
* When tuning this config, we need to consider the following tradeoffs:
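The javadoc above describes the notify-delta fairness trick behind MemoryBoundBlockingQueue. The toy queue below is not the Venice class (its names and the extra empty-queue wakeup are invented for this sketch), but it shows the core idea: blocked producers are only woken once enough memory has been freed, so a large message eventually fits instead of being starved by a stream of small ones.

```java
import java.util.ArrayDeque;
import java.util.Queue;

// Toy illustration of the notify-delta idea; NOT the Venice MemoryBoundBlockingQueue.
public class NotifyDeltaQueueSketch<T> {
  private static final class Node<E> {
    final E item;
    final long bytes;

    Node(E item, long bytes) {
      this.item = item;
      this.bytes = bytes;
    }
  }

  private final Queue<Node<T>> queue = new ArrayDeque<>();
  private final long memoryCapBytes;
  private final long notifyDeltaBytes; // analogous role to storeWriterBufferNotifyDelta
  private long usedBytes = 0;
  private long freedSinceLastNotify = 0;

  public NotifyDeltaQueueSketch(long memoryCapBytes, long notifyDeltaBytes) {
    this.memoryCapBytes = memoryCapBytes;
    this.notifyDeltaBytes = notifyDeltaBytes;
  }

  public synchronized void put(T item, long itemBytes) throws InterruptedException {
    while (usedBytes + itemBytes > memoryCapBytes) {
      wait(); // producer blocks until a notify signals that enough memory was freed
    }
    usedBytes += itemBytes;
    queue.add(new Node<>(item, itemBytes));
    notifyAll(); // wake consumers waiting on an empty queue
  }

  public synchronized T take() throws InterruptedException {
    while (queue.isEmpty()) {
      wait();
    }
    Node<T> node = queue.poll();
    usedBytes -= node.bytes;
    freedSinceLastNotify += node.bytes;
    // Only wake blocked producers once the freed memory crosses the delta (or the queue drains),
    // so a waiting large message gets a real chance to fit rather than losing every small free.
    if (freedSinceLastNotify >= notifyDeltaBytes || queue.isEmpty()) {
      freedSinceLastNotify = 0;
      notifyAll();
    }
    return node.item;
  }
}
```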
@@ -264,6 +264,7 @@ protected static Runnable getStuckConsumerDetectionAndRepairRunnable(
AbstractKafkaConsumerService getKafkaConsumerService(final String kafkaURL) {
AbstractKafkaConsumerService consumerService = kafkaServerToConsumerServiceMap.get(kafkaURL);
if (consumerService == null && kafkaClusterUrlResolver != null) {
// The resolver is needed to resolve a special format of kafka URL to the original kafka URL
consumerService = kafkaServerToConsumerServiceMap.get(kafkaClusterUrlResolver.apply(kafkaURL));
}
return consumerService;
@@ -391,8 +392,8 @@ public ConsumedDataReceiver<List<PubSubMessage<KafkaKey, KafkaMessageEnvelope, L
new StorePartitionDataReceiver(
storeIngestionTask,
pubSubTopicPartition,
kafkaURL,
kafkaClusterUrlToIdMap.getOrDefault(kafkaURL, -1));
kafkaURL, // do not resolve and let it pass downstream for offset tracking purpose
kafkaClusterUrlToIdMap.getOrDefault(kafkaURL, -1)); // same pubsub url but different id for sep topic

versionTopicStoreIngestionTaskMapping.put(storeIngestionTask.getVersionTopic().getName(), storeIngestionTask);
consumerService.startConsumptionIntoDataReceiver(partitionReplicaIngestionContext, lastOffset, dataReceiver);
KafkaConsumerService.java
@@ -173,7 +173,10 @@ protected KafkaConsumerService(
}

/** May be overridden to clean up state in sub-classes */
void handleUnsubscription(SharedKafkaConsumer consumer, PubSubTopicPartition topicPartition) {
void handleUnsubscription(
SharedKafkaConsumer consumer,
PubSubTopic versionTopic,
PubSubTopicPartition topicPartition) {
}

private String getUniqueClientId(String kafkaUrl, int suffix) {
@@ -563,7 +566,8 @@ private interface OffsetGetter {
*/
public enum ConsumerAssignmentStrategy {
TOPIC_WISE_SHARED_CONSUMER_ASSIGNMENT_STRATEGY(TopicWiseKafkaConsumerService::new),
PARTITION_WISE_SHARED_CONSUMER_ASSIGNMENT_STRATEGY(PartitionWiseKafkaConsumerService::new);
PARTITION_WISE_SHARED_CONSUMER_ASSIGNMENT_STRATEGY(PartitionWiseKafkaConsumerService::new),
STORE_AWARE_PARTITION_WISE_SHARED_CONSUMER_ASSIGNMENT_STRATEGY(StoreAwarePartitionWiseKafkaConsumerService::new);

final KCSConstructor constructor;

@@ -576,4 +580,8 @@ public enum ConsumerAssignmentStrategy {
public void setThreadFactory(RandomAccessDaemonThreadFactory threadFactory) {
this.threadFactory = threadFactory;
}

IndexedMap<SharedKafkaConsumer, ConsumptionTask> getConsumerToConsumptionTask() {
return consumerToConsumptionTask;
}
}
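The hunk above registers the new store-aware strategy by adding one enum constant that points at the StoreAwarePartitionWiseKafkaConsumerService constructor. As a hedged illustration of that enum-of-constructors pattern, the sketch below collapses the real multi-argument KCSConstructor into a plain Supplier; every class name in it is invented for the example.

```java
import java.util.function.Supplier;

// Invented names; sketches the enum-of-constructors pattern used by ConsumerAssignmentStrategy.
public class AssignmentStrategySketch {
  interface ConsumerService {
    String name();
  }

  static class TopicWiseService implements ConsumerService {
    public String name() {
      return "topic-wise";
    }
  }

  static class PartitionWiseService implements ConsumerService {
    public String name() {
      return "partition-wise";
    }
  }

  static class StoreAwarePartitionWiseService extends PartitionWiseService {
    @Override
    public String name() {
      return "store-aware-partition-wise";
    }
  }

  enum Strategy {
    TOPIC_WISE(TopicWiseService::new),
    PARTITION_WISE(PartitionWiseService::new),
    STORE_AWARE_PARTITION_WISE(StoreAwarePartitionWiseService::new); // new constant, mirroring the diff

    final Supplier<ConsumerService> constructor;

    Strategy(Supplier<ConsumerService> constructor) {
      this.constructor = constructor;
    }
  }

  public static void main(String[] args) {
    // Selecting a strategy and instantiating its service is a single lookup plus a factory call.
    ConsumerService service = Strategy.STORE_AWARE_PARTITION_WISE.constructor.get();
    System.out.println(service.name());
  }
}
```

The appeal of the pattern is that adding a strategy stays a one-line enum change, exactly as in the diff above.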
@@ -58,6 +58,7 @@
import com.linkedin.venice.pubsub.PubSubTopicRepository;
import com.linkedin.venice.pubsub.adapter.kafka.producer.ApacheKafkaProducerConfig;
import com.linkedin.venice.pubsub.api.PubSubMessageDeserializer;
import com.linkedin.venice.pubsub.api.PubSubSecurityProtocol;
import com.linkedin.venice.pubsub.api.PubSubTopic;
import com.linkedin.venice.pubsub.api.PubSubTopicPartition;
import com.linkedin.venice.pubsub.manager.TopicManagerContext;
@@ -113,7 +114,6 @@
import java.util.function.BooleanSupplier;
import org.apache.avro.Schema;
import org.apache.kafka.clients.CommonClientConfigs;
import org.apache.kafka.common.protocol.SecurityProtocol;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;

@@ -1119,15 +1119,15 @@ private VeniceProperties getPubSubSSLPropertiesFromServerConfig(String kafkaBoot
kafkaBootstrapUrls = resolvedKafkaUrl;
}
properties.setProperty(KAFKA_BOOTSTRAP_SERVERS, kafkaBootstrapUrls);
SecurityProtocol securityProtocol = serverConfig.getKafkaSecurityProtocol(kafkaBootstrapUrls);
PubSubSecurityProtocol securityProtocol = serverConfig.getKafkaSecurityProtocol(kafkaBootstrapUrls);
if (KafkaSSLUtils.isKafkaSSLProtocol(securityProtocol)) {
Optional<SSLConfig> sslConfig = serverConfig.getSslConfig();
if (!sslConfig.isPresent()) {
throw new VeniceException("SSLConfig should be present when Kafka SSL is enabled");
}
properties.putAll(sslConfig.get().getKafkaSSLConfig());
}
properties.setProperty(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, securityProtocol.name);
properties.setProperty(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, securityProtocol.name());
return new VeniceProperties(properties);
}

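The hunk above swaps Kafka's SecurityProtocol for Venice's own PubSubSecurityProtocol and reads the enum name via name() instead of the Kafka-specific public name field. A rough, self-contained sketch of the protocol-dependent property assembly follows; PubSubSecurityProtocol is modeled as a plain enum, the config keys are written as string literals, and the real KafkaSSLUtils/SSLConfig behavior is reduced to a bag of SSL properties, so this is an assumption-laden illustration rather than the Venice API.

```java
import java.util.Optional;
import java.util.Properties;

public class PubSubSslPropsSketch {
  // Stand-in for com.linkedin.venice.pubsub.api.PubSubSecurityProtocol; values assumed.
  enum PubSubSecurityProtocol { PLAINTEXT, SSL, SASL_PLAINTEXT, SASL_SSL }

  // Stand-in for KafkaSSLUtils.isKafkaSSLProtocol.
  static boolean isSslProtocol(PubSubSecurityProtocol protocol) {
    return protocol == PubSubSecurityProtocol.SSL || protocol == PubSubSecurityProtocol.SASL_SSL;
  }

  static Properties buildPubSubProperties(
      String bootstrapServers,
      PubSubSecurityProtocol protocol,
      Optional<Properties> sslConfig) {
    Properties properties = new Properties();
    // "bootstrap.servers" / "security.protocol" replace the CommonClientConfigs constants for brevity.
    properties.setProperty("bootstrap.servers", bootstrapServers);
    if (isSslProtocol(protocol)) {
      // Mirrors the diff: SSL-enabled URLs must come with an SSLConfig, otherwise fail fast.
      Properties ssl = sslConfig.orElseThrow(
          () -> new IllegalStateException("SSLConfig should be present when Kafka SSL is enabled"));
      properties.putAll(ssl);
    }
    // Note the change in the diff: the enum name is read via name() rather than a public `name` field.
    properties.setProperty("security.protocol", protocol.name());
    return properties;
  }
}
```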
@@ -2238,20 +2238,24 @@ protected void recordHeartbeatReceived(
return;
}

// Separate incremental push pubsub entries have the same pubsub url but a different cluster id, which creates
// confusion for heartbeat tracking. We need to resolve the kafka url to the actual kafka cluster url.
String resolvedKafkaUrl = kafkaClusterUrlResolver != null ? kafkaClusterUrlResolver.apply(kafkaUrl) : kafkaUrl;
if (partitionConsumptionState.getLeaderFollowerState().equals(LEADER)) {
heartbeatMonitoringService.recordLeaderHeartbeat(
storeName,
versionNumber,
partitionConsumptionState.getPartition(),
serverConfig.getKafkaClusterUrlToAliasMap().get(kafkaUrl),
serverConfig.getKafkaClusterUrlToAliasMap().get(resolvedKafkaUrl),
consumerRecord.getValue().producerMetadata.messageTimestamp,
partitionConsumptionState.isComplete());
} else {
heartbeatMonitoringService.recordFollowerHeartbeat(
storeName,
versionNumber,
partitionConsumptionState.getPartition(),
serverConfig.getKafkaClusterUrlToAliasMap().get(kafkaUrl),
serverConfig.getKafkaClusterUrlToAliasMap().get(resolvedKafkaUrl),
consumerRecord.getValue().producerMetadata.messageTimestamp,
partitionConsumptionState.isComplete());
}
@@ -2421,12 +2425,14 @@ protected DelegateConsumerRecordResult delegateConsumerRecord(

// If we are here the message must be produced to local kafka or silently consumed.
LeaderProducedRecordContext leaderProducedRecordContext;

// No need to resolve cluster id and kafka url because sep topics are real time topic and it's not VT
validateRecordBeforeProducingToLocalKafka(consumerRecord, partitionConsumptionState, kafkaUrl, kafkaClusterId);

if (consumerRecord.getTopicPartition().getPubSubTopic().isRealTime()) {
recordRegionHybridConsumptionStats(
kafkaClusterId,
// convert the cluster id back to the original cluster id for monitoring purpose
serverConfig.getEquivalentKafkaClusterIdForSepTopic(
serverConfig.getEquivalentKafkaClusterIdForSepTopic(kafkaClusterId)),
consumerRecord.getPayloadSize(),
consumerRecord.getOffset(),
beforeProcessingBatchRecordsTimestampMs);
@@ -2449,7 +2455,7 @@
switch (controlMessageType) {
case START_OF_PUSH:
/**
* N.B.: This is expected to be the first time we call {@link veniceWriter#get()}. During this time
* N.B.: This is expected to be the first time we call {@link veniceWriter#get()}. During this time
* since startOfPush has not been processed yet, {@link StoreVersionState} is not created yet (unless
* this is a server restart scenario). So the {@link com.linkedin.venice.writer.VeniceWriter#isChunkingEnabled} field
* will not be set correctly at this point. This is fine as chunking is mostly not applicable for control messages.
@@ -2759,6 +2765,8 @@ private void validateRecordBeforeProducingToLocalKafka(
int kafkaClusterId) {
// Check whether the message is from local version topic; leader shouldn't consume from local VT and then produce
// back to VT again
// localKafkaClusterId will always be the regular one without the "_sep" suffix, so kafkaClusterId should be
// converted for comparison. Likewise for the kafkaUrl.
if (kafkaClusterId == localKafkaClusterId
&& consumerRecord.getTopicPartition().getPubSubTopic().equals(this.versionTopic)
&& kafkaUrl.equals(this.localKafkaServer)) {
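Finally, the heartbeat hunk above resolves the kafka URL before looking up the region alias, because the "_sep" variant of a URL is not a key in kafkaClusterUrlToAliasMap, so an unresolved lookup would return null. The snippet below is a hedged, self-contained illustration of that failure mode; the map contents and the inline resolver are invented for the example.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.Function;

public class HeartbeatUrlResolutionSketch {
  public static void main(String[] args) {
    // Hypothetical alias map keyed only by the regular (non-"_sep") bootstrap URL.
    Map<String, String> kafkaClusterUrlToAliasMap = new HashMap<>();
    kafkaClusterUrlToAliasMap.put("kafka.region1.example.com:16637", "region1");

    // Assumed resolver behavior: strip a trailing "_sep" suffix, otherwise pass the URL through.
    Function<String, String> kafkaClusterUrlResolver =
        url -> url.endsWith("_sep") ? url.substring(0, url.length() - "_sep".length()) : url;

    String kafkaUrl = "kafka.region1.example.com:16637_sep"; // URL as seen by the sep-topic consumer

    // Without resolution, the alias lookup misses and prints null.
    System.out.println(kafkaClusterUrlToAliasMap.get(kafkaUrl));
    // With resolution (as in the diff), the heartbeat is attributed to the right region: region1.
    System.out.println(kafkaClusterUrlToAliasMap.get(kafkaClusterUrlResolver.apply(kafkaUrl)));
  }
}
```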