[compat][server][client][test] Global RT DIV improvement (part 2): Chunking support for DIV message #1257

Open: wants to merge 10 commits into base: main
@@ -1907,6 +1907,20 @@ protected boolean shouldProcessRecord(PubSubMessage<KafkaKey, KafkaMessageEnvelo
}
}
}

/**
* The leader consumes from the remote version topic, and if it reads DIV control messages there, it should
* ignore them and not process them or apply them to its own DIV state.
*/
if (record.getKey().isDivControlMessage()) {
String msg = String.format(
"Leader for replica: %s received a div control message in remote version topic. Skipping the message.",
partitionConsumptionState.getReplicaId());
if (!REDUNDANT_LOGGING_FILTER.isRedundantException(msg)) {
LOGGER.info(msg);
Review comment (Contributor): I think we can just skip it without logging, as it is part of native replication from the VT, not from the RT.
}
return false;
}
}
if (!record.getTopicPartition().getPubSubTopic().equals(currentLeaderTopic)) {
String errorMsg =
@@ -12,6 +12,7 @@
import static com.linkedin.venice.ConfigKeys.KAFKA_BOOTSTRAP_SERVERS;
import static com.linkedin.venice.LogMessages.KILLED_JOB_MESSAGE;
import static com.linkedin.venice.kafka.protocol.enums.ControlMessageType.START_OF_SEGMENT;
import static com.linkedin.venice.serialization.avro.AvroProtocolDefinition.GLOBAL_DIV_STATE;
import static com.linkedin.venice.utils.Utils.FATAL_DATA_VALIDATION_ERROR;
import static com.linkedin.venice.utils.Utils.getReplicaId;
import static java.util.concurrent.TimeUnit.HOURS;
@@ -47,6 +48,7 @@
import com.linkedin.venice.common.VeniceSystemStoreType;
import com.linkedin.venice.common.VeniceSystemStoreUtils;
import com.linkedin.venice.compression.CompressionStrategy;
import com.linkedin.venice.compression.NoopCompressor;
import com.linkedin.venice.compression.VeniceCompressor;
import com.linkedin.venice.exceptions.DiskLimitExhaustedException;
import com.linkedin.venice.exceptions.MemoryLimitExhaustedException;
@@ -241,6 +243,8 @@ public abstract class StoreIngestionTask implements Runnable, Closeable {
* flushed to the metadata partition of the storage engine regularly in {@link #syncOffset(String, PartitionConsumptionState)}
*/
private final KafkaDataIntegrityValidator kafkaDataIntegrityValidator;
private final ChunkAssembler divChunkAssembler;

protected final HostLevelIngestionStats hostLevelIngestionStats;
protected final AggVersionedDIVStats versionedDIVStats;
protected final AggVersionedIngestionStats versionedIngestionStats;
@@ -451,6 +455,8 @@ public StoreIngestionTask(
new IngestionNotificationDispatcher(notifiers, kafkaVersionTopic, isCurrentVersion);
this.missingSOPCheckExecutor.execute(() -> waitForStateVersion(kafkaVersionTopic));
this.chunkAssembler = new ChunkAssembler(storeName);
this.divChunkAssembler =
builder.getDivChunkAssembler() != null ? builder.getDivChunkAssembler() : new ChunkAssembler(storeName);
this.cacheBackend = cacheBackend;

if (recordTransformerFunction != null) {
@@ -1125,6 +1131,13 @@ private int handleSingleMessage(
record.getTopicPartition().getPartitionNumber(),
partitionConsumptionStateMap.get(topicPartition.getPartitionNumber()));
}
} else if (record.getKey().isDivControlMessage()) {
// This is a control message from the DIV topic, process it and return early.
Review comment (Contributor): I am a bit confused about this "DIV topic" part. Do you mean the follower sees a DIV control message from the local VT?
// TODO: This is a placeholder for the actual implementation.
if (isGlobalRtDivEnabled) {
processDivControlMessage(record);
}
return 0;
}

// This function may modify the original record in KME and it is unsafe to use the payload from KME directly after
@@ -1169,6 +1182,28 @@ private int handleSingleMessage(
return record.getPayloadSize();
}

void processDivControlMessage(PubSubMessage<KafkaKey, KafkaMessageEnvelope, Long> record) {
KafkaKey key = record.getKey();
KafkaMessageEnvelope value = record.getValue();
Put put = (Put) value.getPayloadUnion();

Object assembledObject = divChunkAssembler.bufferAndAssembleRecord(
record.getTopicPartition(),
put.getSchemaId(),
key.getKey(),
put.getPutValue(),
record.getOffset(),
GLOBAL_DIV_STATE,
put.getSchemaId(),
new NoopCompressor());

// If the assembled object is null, it means that the object is not yet fully assembled, so we can return early.
if (assembledObject == null) {
return;
}
// TODO: We will add the code to process DIV control message later in here.
}

/**
* This function is in charge of producing the consumer records to the writer buffers maintained by {@link StoreBufferService}.
*
@@ -2374,6 +2409,15 @@ protected boolean shouldProcessRecord(PubSubMessage<KafkaKey, KafkaMessageEnvelo
return false;
}

/**
* DIV control messages are only produced to and consumed from version topics; reading one from a real-time
* topic is unexpected. Skip such messages and log a warning.
*/
if (record.getKey().isDivControlMessage() && record.getTopic().isRealTime()) {
LOGGER.warn("Skipping DIV control message from real-time topic-partition: {}", record.getTopicPartition());
return false;
}

if (partitionConsumptionState.isEndOfPushReceived() && partitionConsumptionState.isBatchOnly()) {
KafkaKey key = record.getKey();
KafkaMessageEnvelope value = record.getValue();
@@ -3740,6 +3784,11 @@ private void waitReadyToProcessRecord(PubSubMessage<KafkaKey, KafkaMessageEnvelo
return;
}

// No need to check schema availability for DIV messages, as the schema is already known.
if (record.getKey().isDivControlMessage()) {
return;
}

switch (MessageType.valueOf(kafkaValue)) {
case PUT:
Put put = (Put) kafkaValue.payloadUnion;
@@ -4443,6 +4492,10 @@ void setVersionRole(PartitionReplicaIngestionContext.VersionRole versionRole) {
this.versionRole = versionRole;
}

ChunkAssembler getDivChunkAssembler() {
return this.divChunkAssembler;
}

protected boolean isDaVinciClient() {
return isDaVinciClient;
}
@@ -14,6 +14,7 @@
import com.linkedin.davinci.storage.StorageService;
import com.linkedin.davinci.store.cache.backend.ObjectCacheBackend;
import com.linkedin.davinci.store.view.VeniceViewWriterFactory;
import com.linkedin.davinci.utils.ChunkAssembler;
import com.linkedin.venice.kafka.protocol.state.PartitionState;
import com.linkedin.venice.meta.ReadOnlySchemaRepository;
import com.linkedin.venice.meta.ReadOnlyStoreRepository;
@@ -122,6 +123,8 @@ public static class Builder {
private Runnable runnableForKillIngestionTasksForNonCurrentVersions;
private ExecutorService aaWCWorkLoadProcessingThreadPool;

private ChunkAssembler divChunkAssembler;

private interface Setter {
void apply();
}
@@ -331,5 +334,13 @@ public Builder setAAWCWorkLoadProcessingThreadPool(ExecutorService executorServi
public ExecutorService getAAWCWorkLoadProcessingThreadPool() {
return this.aaWCWorkLoadProcessingThreadPool;
}

public Builder setDivChunkAssembler(ChunkAssembler divChunkAssembler) {
return set(() -> this.divChunkAssembler = divChunkAssembler);
}

public ChunkAssembler getDivChunkAssembler() {
return divChunkAssembler;
}
}
}
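As a point of reference, a minimal sketch of how a test might use the new setDivChunkAssembler hook on the Builder above to inject a stub assembler. The wiring is assumed rather than shown in this diff: builder stands for a fully configured instance of that Builder, and Mockito is assumed to be available on the test classpath.

ChunkAssembler mockDivChunkAssembler = Mockito.mock(ChunkAssembler.class);
builder.setDivChunkAssembler(mockDivChunkAssembler);
// The StoreIngestionTask constructor picks up the injected assembler via builder.getDivChunkAssembler() and only
// falls back to new ChunkAssembler(storeName) when none was provided (see the constructor change above), so a test
// can stub bufferAndAssembleRecord(...) on the mock and verify processDivControlMessage(...) against it.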
@@ -8,10 +8,13 @@
import com.linkedin.venice.pubsub.api.PubSubTopicPartition;
import com.linkedin.venice.serialization.RawBytesStoreDeserializerCache;
import com.linkedin.venice.serialization.avro.AvroProtocolDefinition;
import com.linkedin.venice.serialization.avro.InternalAvroSpecificSerializer;
import com.linkedin.venice.serializer.RecordDeserializer;
import com.linkedin.venice.utils.ByteUtils;
import com.linkedin.venice.utils.lazy.Lazy;
import java.io.IOException;
import java.nio.ByteBuffer;
import org.apache.avro.specific.SpecificRecord;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;

@@ -84,6 +87,51 @@ public <T> T bufferAndAssembleRecord(
return decompressedAndDeserializedRecord;
}

/**
* Buffers and assembles chunked records consumed from a Kafka topic. For a chunked record, the individual chunks
* are buffered in memory until all chunks for the key have been received, which is signaled by the arrival of the
* chunk manifest record. The chunks are then assembled and deserialized from binary back into an object using the
* provided deserializer, and the fully assembled record is returned.
*
* The deserializer is selected via the given AvroProtocolDefinition, which determines the protocol serializer
* used for deserialization.
*
* Note that if the passed-in record is a regular (non-chunked) record, it is deserialized and returned immediately
* without being buffered in memory.
*/
Review comment (Contributor): I see there is already a method that's doing almost exactly the same as this one. Can we try to reuse that method and just pass in the avro protocol's deserializer instead?

public <T> T bufferAndAssembleRecord(
PubSubTopicPartition pubSubTopicPartition,
int schemaId,
byte[] keyBytes,
ByteBuffer valueBytes,
long recordOffset,
AvroProtocolDefinition protocol,
int readerSchemaId,
VeniceCompressor compressor) {
ByteBuffer assembledRecord = bufferAndAssembleRecord(
pubSubTopicPartition,
schemaId,
keyBytes,
valueBytes,
recordOffset,
readerSchemaId,
compressor);
T decompressedAndDeserializedRecord = null;

// The record is a chunk that is still being buffered; return null.
if (assembledRecord == null) {
return decompressedAndDeserializedRecord;
}

try {
decompressedAndDeserializedRecord = decompressAndDeserialize(protocol, compressor, assembledRecord);
} catch (Exception e) {
throw new RuntimeException(e);
}

return decompressedAndDeserializedRecord;
}
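For illustration, a minimal usage sketch of this overload, mirroring the call site added in StoreIngestionTask.processDivControlMessage earlier in this diff. The consumedDivRecords iterable and the divChunkAssembler instance are assumed to be supplied by the surrounding code, and the result is kept as a plain Object because the concrete GLOBAL_DIV_STATE record class is not shown here.

for (PubSubMessage<KafkaKey, KafkaMessageEnvelope, Long> record : consumedDivRecords) {
  Put put = (Put) record.getValue().getPayloadUnion();
  Object assembled = divChunkAssembler.bufferAndAssembleRecord(
      record.getTopicPartition(),
      put.getSchemaId(), // CHUNK / CHUNKED_VALUE_MANIFEST protocol version for chunked writes
      record.getKey().getKey(),
      put.getPutValue(),
      record.getOffset(),
      AvroProtocolDefinition.GLOBAL_DIV_STATE, // selects the protocol serializer used for deserialization
      put.getSchemaId(),
      new NoopCompressor()); // the DIV path in this PR passes a no-op compressor
  if (assembled == null) {
    continue; // still buffering chunks for this key; the manifest has not arrived yet
  }
  // assembled now holds the fully reassembled and deserialized record
}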

/**
* Buffers and assembles chunks of a record.
*
@@ -105,17 +153,20 @@ public ByteBuffer bufferAndAssembleRecord(
}
// If this is a record chunk, store the chunk and return null for processing this record
if (schemaId == AvroProtocolDefinition.CHUNK.getCurrentProtocolVersion()) {
// We need to extract the data from valueBytes; otherwise the serialized value could include bytes that are not part of the logical value.
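// (For example, if valueBytes is a view over a larger backing array, with a non-zero position or arrayOffset,
// valueBytes.array() would return bytes outside that view, whereas ByteUtils.extractByteArray copies only the
// bytes between position and limit.)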
inMemoryStorageEngine.put(
pubSubTopicPartition.getPartitionNumber(),
keyBytes,
ValueRecord.create(schemaId, valueBytes.array()).serialize());
ValueRecord.create(schemaId, ByteUtils.extractByteArray(valueBytes)).serialize());
return null;
} else if (schemaId == AvroProtocolDefinition.CHUNKED_VALUE_MANIFEST.getCurrentProtocolVersion()) {
}

if (schemaId == AvroProtocolDefinition.CHUNKED_VALUE_MANIFEST.getCurrentProtocolVersion()) {
// This is the last value. Store it, and now read it from the in memory store as a fully assembled value
inMemoryStorageEngine.put(
pubSubTopicPartition.getPartitionNumber(),
keyBytes,
ValueRecord.create(schemaId, valueBytes.array()).serialize());
ValueRecord.create(schemaId, ByteUtils.extractByteArray(valueBytes)).serialize());
try {
assembledRecord = RawBytesChunkingAdapter.INSTANCE.get(
inMemoryStorageEngine,
@@ -137,7 +188,8 @@ LOGGER.warn(
LOGGER.warn(
"Encountered error assembling chunked record, this can happen when seeking between chunked records. Skipping offset {} on topic {}",
recordOffset,
pubSubTopicPartition.getPubSubTopic().getName());
pubSubTopicPartition.getPubSubTopic().getName(),
ex);
}
} else {
// this is a fully specified record, no need to buffer and assemble it, just return the valueBytes
@@ -163,6 +215,15 @@ protected <T> T decompressAndDeserialize(
return deserializer.deserialize(compressor.decompress(value));
}

protected <T extends SpecificRecord> T decompressAndDeserialize(
AvroProtocolDefinition protocol,
VeniceCompressor compressor,
ByteBuffer value) throws IOException {
InternalAvroSpecificSerializer<T> deserializer = protocol.getSerializer();
return deserializer
.deserialize(ByteUtils.extractByteArray(compressor.decompress(value)), protocol.getCurrentProtocolVersion());
}

public void clearInMemoryDB() {
inMemoryStorageEngine.drop();
}