[compat][server][client][test] Global RT DIV improvement (part 2): Chunking support for DIV message #1257

Open
wants to merge 10 commits into
base: main

@lluwm lluwm commented Oct 23, 2024

This change adds chunking support for DIV messages produced to Kafka topics, since a DIV message can be large. It reuses the existing chunking mechanism for regular records and extends it to DIV messages with the following modifications:

  1. All DIV messages have type CONTROL_MESSAGE_DIV in their KafkaKey, and the corresponding KafkaMessageEnvelope uses Put as the payload.
  2. Inside the Put payload, the actual message is stored in the putValue field, and the schemaId takes one of the following values:
    • If the DIV message is non-chunked, the schemaId is set to GLOBAL_DIV_STATE.
    • If the DIV message is a chunk, the schemaId is set to CHUNK.
    • If the DIV message is a chunk manifest, the schemaId is set to CHUNKED_VALUE_MANIFEST.
  3. On the receiver side, ChunkAssembler is adapted to buffer, assemble, and deserialize DIV messages (chunked or non-chunked).
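
The schemaId dispatch described in the list above can be sketched roughly as follows. This is an illustration only, not the code in this PR: the class `DivChunkingSketch`, its nested `Assembler`, and the numeric schema ids are hypothetical placeholders, and the sketch treats the manifest purely as an "all chunks received" signal, whereas a real manifest would describe which chunks belong to the value.

```java
import java.io.ByteArrayOutputStream;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;

final class DivChunkingSketch {
  // Hypothetical placeholder ids; the real values come from the project's protocol definitions.
  static final int GLOBAL_DIV_STATE = -1;
  static final int CHUNK = -2;
  static final int CHUNKED_VALUE_MANIFEST = -3;

  enum DivPayloadKind { NON_CHUNKED, CHUNK_PART, MANIFEST }

  /** Maps the schemaId of a Put payload to the kind of DIV message it carries. */
  static DivPayloadKind classify(int schemaId) {
    switch (schemaId) {
      case GLOBAL_DIV_STATE:
        return DivPayloadKind.NON_CHUNKED;
      case CHUNK:
        return DivPayloadKind.CHUNK_PART;
      case CHUNKED_VALUE_MANIFEST:
        return DivPayloadKind.MANIFEST;
      default:
        throw new IllegalArgumentException("Unexpected schemaId for a DIV message: " + schemaId);
    }
  }

  /** Simplified stand-in for a receiver-side assembler (assumption: chunks arrive in order). */
  static final class Assembler {
    private final List<byte[]> bufferedChunks = new ArrayList<>();

    /** Returns the complete payload, or null while chunks are still being buffered. */
    byte[] onPut(int schemaId, byte[] putValue) {
      switch (classify(schemaId)) {
        case NON_CHUNKED:
          return putValue; // no buffering needed for a non-chunked message
        case CHUNK_PART:
          bufferedChunks.add(putValue);
          return null;
        case MANIFEST:
        default:
          // The manifest closes the sequence: concatenate and release the buffered chunks.
          ByteArrayOutputStream out = new ByteArrayOutputStream();
          for (byte[] chunk : bufferedChunks) {
            out.write(chunk, 0, chunk.length);
          }
          bufferedChunks.clear();
          return out.toByteArray();
      }
    }
  }

  public static void main(String[] args) {
    Assembler assembler = new Assembler();
    assembler.onPut(CHUNK, "hello ".getBytes(StandardCharsets.UTF_8));
    assembler.onPut(CHUNK, "world".getBytes(StandardCharsets.UTF_8));
    byte[] full = assembler.onPut(CHUNKED_VALUE_MANIFEST, new byte[0]);
    System.out.println(new String(full, StandardCharsets.UTF_8)); // prints "hello world"
  }
}
```

Keeping the classification in one place means the receiver only has to distinguish three cases before deciding whether to return, buffer, or assemble.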

How was this PR tested?

  • New integration test.
  • Passed CI.

Does this PR introduce any user-facing changes?

  • No. You can skip the rest of this section.
  • Yes. Make sure to explain your proposed changes and call out the behavior change.

@lusong64 lusong64 left a comment

Just some minor comments. No need to be a blocker.

@sixpluszero sixpluszero changed the title [compat][server][client][test] Global RT DIV improvement (part 2): Ch… [compat][server][client][test] Global RT DIV improvement (part 2): Chunking support for DIV message Nov 7, 2024
@sixpluszero sixpluszero left a comment

Thanks for the 2nd part of the change. I missed the first part, but I went over the first PR's description to make sure I am following. I left some comments, but they should be easy to resolve.

@@ -0,0 +1,85 @@
{
Let's start with v1 and follow the folder naming convention.

* Note that if the passed-in record is a regular record (not chunked), we will return the record after
* deserializing it without buffering it in memory.
*/
public <T> T bufferAndAssembleRecord(
I see there is already a method that does almost exactly the same thing as this one. Can we reuse that method and just pass in the Avro protocol's deserializer instead?
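
One way to read this suggestion, as a minimal sketch: keep a single assembly path and make the deserializer a parameter, so regular records and DIV messages differ only in the function passed in. The class `ReusableAssemblySketch` and the method `assembleAndDeserialize` are hypothetical names for illustration and do not correspond to the existing method in the codebase.

```java
import java.nio.charset.StandardCharsets;
import java.util.function.Function;

final class ReusableAssemblySketch {
  /**
   * Hypothetical shared entry point: the caller supplies the deserializer,
   * so the assembly logic does not need a second, nearly identical method.
   * A null input stands for "still waiting for more chunks".
   */
  static <T> T assembleAndDeserialize(byte[] assembledBytes, Function<byte[], T> deserializer) {
    if (assembledBytes == null) {
      return null;
    }
    return deserializer.apply(assembledBytes);
  }

  public static void main(String[] args) {
    byte[] payload = "div-state".getBytes(StandardCharsets.UTF_8);

    // Two callers reuse the same method with different deserializers.
    String asText = assembleAndDeserialize(payload, b -> new String(b, StandardCharsets.UTF_8));
    Integer asLength = assembleAndDeserialize(payload, b -> b.length);

    System.out.println(asText + " " + asLength); // prints "div-state 9"
  }
}
```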

@@ -1083,6 +1089,13 @@ private int handleSingleMessage(
record.getTopicPartition().getPartitionNumber(),
partitionConsumptionStateMap.get(topicPartition.getPartitionNumber()));
}
} else if (record.getKey().isDivControlMessage()) {
// This is a control message from the DIV topic, process it and return early.
I am a bit confused about this "DIV topic" part. Do you mean a follower sees a DIV control message from the local VT?

"Leader for replica: %s received a div control message in remote version topic. Skipping the message.",
partitionConsumptionState.getReplicaId());
if (!REDUNDANT_LOGGING_FILTER.isRedundantException(msg)) {
LOGGER.info(msg);
I think we can just skip it without logging, as it is part of native replication from VT, not from RT.

3 participants