Skip to content

Commit

Permalink
[common][samza-producer] Add a new option in stream job to leverage m…
Browse files Browse the repository at this point in the history
…ultiple producers (#1267)

* [common][samza-producer] Add a new option in stream job to leverage multiple producers

In a recent experiment, we noticed two issues:
1. KafkaProducer compression is quite time consuming for large records.
2. KafkaProducer is not scalable when using it in a multiple-thread env.

Streaming job typically runs in a container to process the upstreaming topic partition,
and they need to process these events sequentially even these events belong
to different Venice partitions, and the single-threaded KafkaProducer
is limiting the total throughput and cpu resources are mostly
under utilized.

To get round of this issue, in this PR, we add a capability to use multiple producers
even in a single-threaded streaming application. And we don't want to disable
compression as we would like to reduce pubsub usage and cross-colo replication
bandwidth.

Two new options in VeniceWriter:
1. venice.writer.producer.thread.count : default 1
2. venice.writer.producer.queue.size: default 5MB

Stream job can configure it with a higher number of producer thread count
to reduce the latency of `Producer#send` invocation, which is a blocking call,
so that the total throughput will be improved.
  • Loading branch information
gaojieliu authored Nov 4, 2024
1 parent 9b6f027 commit 3f2a130
Show file tree
Hide file tree
Showing 18 changed files with 767 additions and 59 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -185,6 +185,7 @@
import com.linkedin.venice.utils.Time;
import com.linkedin.venice.utils.Utils;
import com.linkedin.venice.utils.VeniceProperties;
import com.linkedin.venice.utils.collections.MemoryBoundBlockingQueue;
import com.linkedin.venice.utils.concurrent.BlockingQueueType;
import io.netty.channel.WriteBufferWaterMark;
import java.io.File;
Expand Down Expand Up @@ -269,14 +270,14 @@ public class VeniceServerConfig extends VeniceClusterConfig {

/**
* Considering the consumer thread could put various sizes of messages into the shared queue, the internal
* {@link com.linkedin.davinci.kafka.consumer.MemoryBoundBlockingQueue} won't notify the waiting thread (consumer thread)
* {@link MemoryBoundBlockingQueue} won't notify the waiting thread (consumer thread)
* right away when some message gets processed until the freed memory hit the follow config: {@link #storeWriterBufferNotifyDelta}.
* The reason behind this design:
* When the buffered queue is full, and the processing thread keeps processing small message, the bigger message won't
* have chance to get queued into the buffer since the memory freed by the processed small message is not enough to
* fit the bigger message.
*
* With this delta config, {@link com.linkedin.davinci.kafka.consumer.MemoryBoundBlockingQueue} will guarantee some fairness
* With this delta config, {@link MemoryBoundBlockingQueue} will guarantee some fairness
* among various sizes of messages when buffered queue is full.
*
* When tuning this config, we need to consider the following tradeoffs:
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
package com.linkedin.davinci.kafka.consumer;

import static com.linkedin.venice.utils.ProtocolUtils.getEstimateOfMessageEnvelopeSizeOnHeap;
import static java.util.Collections.reverseOrder;
import static java.util.Comparator.comparing;
import static java.util.stream.Collectors.toList;
Expand All @@ -10,13 +11,11 @@
import com.linkedin.venice.exceptions.VeniceChecksumException;
import com.linkedin.venice.exceptions.VeniceException;
import com.linkedin.venice.kafka.protocol.KafkaMessageEnvelope;
import com.linkedin.venice.kafka.protocol.Put;
import com.linkedin.venice.kafka.protocol.Update;
import com.linkedin.venice.kafka.protocol.enums.MessageType;
import com.linkedin.venice.message.KafkaKey;
import com.linkedin.venice.pubsub.api.PubSubMessage;
import com.linkedin.venice.pubsub.api.PubSubTopicPartition;
import com.linkedin.venice.utils.DaemonThreadFactory;
import com.linkedin.venice.utils.collections.MemoryBoundBlockingQueue;
import io.tehuti.metrics.MetricsRepository;
import java.util.ArrayList;
import java.util.List;
Expand Down Expand Up @@ -450,40 +449,15 @@ public int hashCode() {

@Override
public int getSize() {
// For FakePubSubMessage, the key and the value are null, return 0.
// For FakePubSubMessage, the key and the value are null.
if (consumerRecord instanceof FakePubSubMessage) {
return 0;
return QUEUE_NODE_OVERHEAD_IN_BYTE;
}

// N.B.: This is just an estimate. TODO: Consider if it is really useful, and whether to get rid of it.
return this.consumerRecord.getKey().getEstimatedObjectSizeOnHeap()
+ getEstimateOfMessageEnvelopeSizeOnHeap(this.consumerRecord.getValue()) + QUEUE_NODE_OVERHEAD_IN_BYTE;
}

private int getEstimateOfMessageEnvelopeSizeOnHeap(KafkaMessageEnvelope messageEnvelope) {
int kmeBaseOverhead = 100; // Super rough estimate. TODO: Measure with a more precise library and store statically
switch (MessageType.valueOf(messageEnvelope)) {
case PUT:
Put put = (Put) messageEnvelope.payloadUnion;
int size = put.putValue.capacity();
if (put.replicationMetadataPayload != null
/**
* N.B.: When using the {@link org.apache.avro.io.OptimizedBinaryDecoder}, the {@link put.putValue} and the
* {@link put.replicationMetadataPayload} will be backed by the same underlying array. If that is the
* case, then we don't want to account for the capacity twice.
*/
&& put.replicationMetadataPayload.array() != put.putValue.array()) {
size += put.replicationMetadataPayload.capacity();
}
return size + kmeBaseOverhead;
case UPDATE:
Update update = (Update) messageEnvelope.payloadUnion;
return update.updateValue.capacity() + kmeBaseOverhead;
default:
return kmeBaseOverhead;
}
}

@Override
public String toString() {
return this.consumerRecord.toString();
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package com.linkedin.venice.samza;

import static com.linkedin.venice.ConfigKeys.KAFKA_BOOTSTRAP_SERVERS;
import static com.linkedin.venice.pubsub.adapter.kafka.producer.ApacheKafkaProducerConfig.KAFKA_BUFFER_MEMORY;
import static com.linkedin.venice.schema.AvroSchemaParseUtils.parseSchemaFromJSONLooseValidation;
import static com.linkedin.venice.schema.AvroSchemaParseUtils.parseSchemaFromJSONStrictValidation;

Expand Down Expand Up @@ -371,10 +372,34 @@ public String getTopicName() {
*/
protected VeniceWriter<byte[], byte[], byte[]> getVeniceWriter(VersionCreationResponse store) {
Properties veniceWriterProperties = new Properties();
veniceWriterProperties.putAll(additionalWriterConfigs);
veniceWriterProperties.put(KAFKA_BOOTSTRAP_SERVERS, store.getKafkaBootstrapServers());
return getVeniceWriter(store, veniceWriterProperties);
}

protected static void extractConcurrentProducerConfig(
Properties veniceWriterProperties,
VeniceWriterOptions.Builder builder) {
String producerThreadCountProp = veniceWriterProperties.getProperty(VeniceWriter.PRODUCER_THREAD_COUNT);
if (producerThreadCountProp != null) {
int producerThreadCount = Integer.parseInt(producerThreadCountProp);
builder.setProducerThreadCount(producerThreadCount);
if (producerThreadCount > 1) {
/**
* Try to limit the producer buffer if the following config is not specified to reduce the total Kafka producer memory usage.
*/
String kafkaBufferConfig = veniceWriterProperties.getProperty(KAFKA_BUFFER_MEMORY);
if (kafkaBufferConfig == null) {
veniceWriterProperties.put(KAFKA_BUFFER_MEMORY, "8388608"); // 8MB
}
}
}
String producerQueueSizeProp = veniceWriterProperties.getProperty(VeniceWriter.PRODUCER_QUEUE_SIZE);
if (producerQueueSizeProp != null) {
builder.setProducerQueueSize(Integer.parseInt(producerQueueSizeProp));
}
}

/**
* Please don't remove this method since it is still being used by LinkedIn internally.
*/
Expand All @@ -394,13 +419,12 @@ protected VeniceWriter<byte[], byte[], byte[]> getVeniceWriter(
partitionerProperties.putAll(store.getPartitionerParams());
VenicePartitioner venicePartitioner =
PartitionUtils.getVenicePartitioner(store.getPartitionerClass(), new VeniceProperties(partitionerProperties));
return constructVeniceWriter(
veniceWriterProperties,
new VeniceWriterOptions.Builder(store.getKafkaTopic()).setTime(time)
.setPartitioner(venicePartitioner)
.setPartitionCount(partitionCount)
.setChunkingEnabled(isChunkingEnabled)
.build());
VeniceWriterOptions.Builder builder = new VeniceWriterOptions.Builder(store.getKafkaTopic()).setTime(time)
.setPartitioner(venicePartitioner)
.setPartitionCount(partitionCount)
.setChunkingEnabled(isChunkingEnabled);
extractConcurrentProducerConfig(veniceWriterProperties, builder);
return constructVeniceWriter(veniceWriterProperties, builder.build());
}

// trickery for unit testing
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package com.linkedin.venice.samza;

import static com.linkedin.venice.ConfigKeys.KAFKA_BOOTSTRAP_SERVERS;
import static com.linkedin.venice.pubsub.adapter.kafka.producer.ApacheKafkaProducerConfig.KAFKA_BUFFER_MEMORY;
import static org.mockito.Mockito.*;
import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertNotNull;
Expand Down Expand Up @@ -269,4 +270,35 @@ public Version.PushType[] batchOrStreamReprocessing() {
return new Version.PushType[] { Version.PushType.BATCH, Version.PushType.STREAM_REPROCESSING,
Version.PushType.STREAM, Version.PushType.INCREMENTAL };
}

@Test
public void testExtractConcurrentProducerConfig() {
Properties properties = new Properties();
properties.put(VeniceWriter.PRODUCER_THREAD_COUNT, "2");
properties.put(VeniceWriter.PRODUCER_QUEUE_SIZE, "102400000");

VeniceWriterOptions.Builder builder = new VeniceWriterOptions.Builder("test_rt");
VeniceSystemProducer.extractConcurrentProducerConfig(properties, builder);
VeniceWriterOptions options = builder.build();
assertEquals(options.getProducerThreadCount(), 2);
assertEquals(options.getProducerQueueSize(), 102400000);
assertEquals(properties.getProperty(KAFKA_BUFFER_MEMORY), "8388608");

/**
* if {@link KAFKA_BUFFER_MEMORY} is specified, {@link VeniceSystemProducer} shouldn't override.
*/

properties = new Properties();
properties.put(VeniceWriter.PRODUCER_THREAD_COUNT, "2");
properties.put(VeniceWriter.PRODUCER_QUEUE_SIZE, "102400000");
properties.put(KAFKA_BUFFER_MEMORY, "10240");

builder = new VeniceWriterOptions.Builder("test_rt");
VeniceSystemProducer.extractConcurrentProducerConfig(properties, builder);
options = builder.build();
assertEquals(options.getProducerThreadCount(), 2);
assertEquals(options.getProducerQueueSize(), 102400000);
assertEquals(properties.getProperty(KAFKA_BUFFER_MEMORY), "10240");

}
}
Original file line number Diff line number Diff line change
Expand Up @@ -73,14 +73,14 @@ public int getKeyLength() {
return key == null ? 0 : key.length;
}

public String toString() {
return getClass().getSimpleName() + "(" + (isControlMessage() ? "CONTROL_MESSAGE" : "PUT or DELETE") + ", "
+ ByteUtils.toHexString(key) + ")";
}

public int getEstimatedObjectSizeOnHeap() {
// This constant is the estimated size of the enclosing object + the byte[]'s overhead.
// TODO: Find a library that would allow us to precisely measure this and store it in a static constant.
return getKeyLength() + 36;
}

public String toString() {
return getClass().getSimpleName() + "(" + (isControlMessage() ? "CONTROL_MESSAGE" : "PUT or DELETE") + ", "
+ ByteUtils.toHexString(key) + ")";
}
}
Original file line number Diff line number Diff line change
@@ -1,13 +1,14 @@
package com.linkedin.venice.pubsub.api;

import com.linkedin.venice.common.Measurable;
import java.util.Arrays;
import java.util.Objects;


/**
* A key-value pair that is associated with a message
*/
public class PubSubMessageHeader {
public class PubSubMessageHeader implements Measurable {
private final String key;
private final byte[] value;

Expand Down Expand Up @@ -41,4 +42,12 @@ public boolean equals(Object otherObj) {
PubSubMessageHeader otherHeader = (PubSubMessageHeader) otherObj;
return key.equals(otherHeader.key()) && Arrays.equals(value, otherHeader.value());
}

/**
* TODO: the following estimation doesn't consider the overhead of the internal structure.
*/
@Override
public int getSize() {
return key.length() + value.length;
}
}
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
package com.linkedin.venice.pubsub.api;

import com.linkedin.venice.common.Measurable;
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
Expand All @@ -10,7 +11,7 @@
* Set of key-value pairs to tagged with messages produced to a topic.
* In case of headers with the same key, only the most recently added headers value will be kept.
*/
public class PubSubMessageHeaders {
public class PubSubMessageHeaders implements Measurable {
/**
* N.B.: Kafka allows duplicate keys in the headers but some pubsub systems may not
* allow it. Hence, we will enforce uniqueness of keys in headers from the beginning.
Expand Down Expand Up @@ -44,4 +45,16 @@ public PubSubMessageHeaders remove(String key) {
public List<PubSubMessageHeader> toList() {
return new ArrayList<>(headers.values());
}

/**
* TODO: the following estimation doesn't consider the overhead of the internal structure.
*/
@Override
public int getSize() {
int size = 0;
for (Map.Entry<String, PubSubMessageHeader> entry: headers.entrySet()) {
size += entry.getKey().length() + entry.getValue().getSize();
}
return size;
}
}
Loading

0 comments on commit 3f2a130

Please sign in to comment.