Apoursam/spark kafka input #1272

Open · eldernewborn wants to merge 12 commits into main
Conversation

@eldernewborn (Collaborator) commented Nov 1, 2024

[vpj] Spark input module to read from a Kafka topic and populate a dataframe

Introduces VenicePubsubSource, a Spark table provider backed by Kafka topics, which materializes the contents of a Venice Kafka topic as a DataFrame for use by Spark jobs. This is a foundational building block for KIF functionality (repushing data from a Kafka topic source) as well as the data consistency checker.

The implementation offers some niceties, such as a splitter that breaks the work into chunks for better parallelism. The splitter considers the available start and end offsets in each topic partition, to account for message TTL and compaction.
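For illustration, here is a minimal sketch of how a table provider like this is typically consumed from Spark. The fully-qualified class path, option key, and topic name are assumptions for the example, not the PR's actual constants; the PR wires its real configuration through VeniceProperties.

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;

public class VenicePubsubReadSketch {
  public static void main(String[] args) {
    SparkSession spark = SparkSession.builder().appName("kif-sketch").getOrCreate();

    // Hypothetical invocation: class path and option key are illustrative only.
    Dataset<Row> df = spark.read()
        .format("com.linkedin.venice.spark.input.pubsub.table.VenicePubsubSource")
        .option("kafka.input.topic", "store_v1")
        .load();

    df.printSchema(); // expected: binary key/value columns per the table schema
  }
}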

Future work:
Add support for chunked messages and compressed messages.

How was this PR tested?

Unit tests, integration tests, and manual testing.

Does this PR introduce any user-facing changes?

No.

@eldernewborn marked this pull request as ready for review November 13, 2024 16:50
@@ -20,6 +20,16 @@ public class SparkConstants {
new StructField[] { new StructField(KEY_COLUMN_NAME, BinaryType, false, Metadata.empty()),
new StructField(VALUE_COLUMN_NAME, BinaryType, true, Metadata.empty()) });

public static final StructType KAFKA_INPUT_TABLE_SCHEMA = new StructType(
Contributor:
Would we need to version this at all?

Collaborator (Author):

My plan is to keep this as-is for the most part (after getting a few runs in EI). For the consistency checker, I'll have an extended form of the table schema that contains the offset vectors and all the goodies prebaked and present for the consistency checker. If there are any specifics you think we need for this (and the ETL functionality), please advise.

Contributor:

I guess what I'm wondering is: do we need to be concerned if this struct type changes? My initial thinking is that it's probably OK, since this is used to generate new datasets, not to process old ones.
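For reference, a hedged sketch of what an extended schema of the kind discussed above could look like. The column names and types here are illustrative assumptions, not the PR's actual KAFKA_INPUT_TABLE_SCHEMA.

import static org.apache.spark.sql.types.DataTypes.BinaryType;
import static org.apache.spark.sql.types.DataTypes.IntegerType;
import static org.apache.spark.sql.types.DataTypes.LongType;

import org.apache.spark.sql.types.Metadata;
import org.apache.spark.sql.types.StructField;
import org.apache.spark.sql.types.StructType;

public class KafkaInputSchemaSketch {
  // Illustrative only: raw key/value bytes plus the pub-sub coordinates
  // (partition, offset) that a consistency checker would need for ordering.
  public static final StructType EXAMPLE_KAFKA_INPUT_TABLE_SCHEMA = new StructType(
      new StructField[] { new StructField("key", BinaryType, false, Metadata.empty()),
          new StructField("value", BinaryType, true, Metadata.empty()),
          new StructField("__partition__", IntegerType, false, Metadata.empty()),
          new StructField("__offset__", LongType, false, Metadata.empty()) });
}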



public class PartitionSplitters {
// need a method called fullPartitionSplitter, takes in list of partition start and end offsets
Contributor:
Why not use proper javadoc style comments?
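For example, the note could be promoted to a javadoc block along these lines. The wording of the description is inferred from the existing comment, and the wrapper class and empty body are placeholders:

import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class PartitionSplittersDocSketch {
  /**
   * Splits each partition's full [startOffset, endOffset] range into the chunks
   * that back the VenicePubsubInputPartition splits.
   *
   * @param partitionOffsetsMap map from partition number to a two-element list
   *        holding that partition's start and end offsets
   * @return map from partition number to its list of [startOffset, endOffset] splits
   */
  public static Map<Integer, List<List<Long>>> fullPartitionSplitter(Map<Integer, List<Long>> partitionOffsetsMap) {
    return new HashMap<>(); // body elided; this sketch is about the javadoc shape
  }
}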

public class PartitionSplitters {
// need a method called fullPartitionSplitter, takes in list of partition start and end offsets
// and returns a list of VenicePubsubInputPartition splits
public static Map<Integer, List<List<Long>>> fullPartitionSplitter(Map<Integer, List<Long>> partitionOffsetsMap) {
Contributor:
It looks like there's an assumption that the list in each entry of Map<Integer, List<Long>> partitionOffsetsMap must have exactly two elements, a start and an end. I'm not sure I see yet in this PR how this method is used, but between this and assembleSegment, an interface which uses a Pair might make more sense?

Or is there a flexibility gain in using a List that I'm missing?
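A minimal sketch of the Pair-based shape being suggested, assuming Apache Commons Lang's Pair; the wrapper class and trivial body are hypothetical:

import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import org.apache.commons.lang3.tuple.Pair;

public class PairBasedSplitterSketch {
  // Hypothetical Pair-based variant of the interface suggested above: each
  // partition maps to an explicit (startOffset, endOffset) pair, so the
  // two-element arity is enforced by the type system instead of by convention.
  public static Map<Integer, List<Pair<Long, Long>>> fullPartitionSplitter(
      Map<Integer, Pair<Long, Long>> partitionOffsetsMap) {
    Map<Integer, List<Pair<Long, Long>>> splits = new HashMap<>();
    partitionOffsetsMap
        .forEach((partition, range) -> splits.put(partition, Collections.singletonList(range)));
    return splits;
  }
}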

return segment;
}

static long computeIntendedSplitLengthBasedOnCount(Map<Integer, List<Long>> partitionOffsetsMap, int totalSegments) {
Contributor:
What is the semantic difference between offsets and segments?
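For context, a sketch of what a count-based split-length computation conventionally looks like; the body below is an assumption inferred from the method name, not the PR's actual implementation. Under this reading, an offset is a position in a partition's log, while a segment is a run of consecutive offsets served by one input split.

import java.util.List;
import java.util.Map;

public class SplitLengthSketch {
  // Assumed behavior: sum the offset ranges (end - start) across partitions and
  // divide by the desired segment count, rounding up so no segment needs to be
  // longer than the intended length.
  static long computeIntendedSplitLengthBasedOnCount(
      Map<Integer, List<Long>> partitionOffsetsMap, int totalSegments) {
    long totalOffsetCount = 0;
    for (List<Long> range: partitionOffsetsMap.values()) {
      totalOffsetCount += range.get(1) - range.get(0); // [startOffset, endOffset]
    }
    return (totalOffsetCount + totalSegments - 1) / totalSegments; // ceiling division
  }
}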

properties.putAll(configs);
// the properties here is the entry point for all the configurations
// we receive from the outer layer.
// schem and partitioning are useless and should be discarded?
Contributor:
I think these were notes?

Properties properties = jobConfig.getPropertiesCopy();
properties.putAll(options.asCaseSensitiveMap());

return new VenicePubsubInputScanBuilder(new VeniceProperties(properties)); // should we flip this to
Contributor:
It looks like this comment isn't necessary?

PubSubTopicRepository pubSubTopicRepository = new PubSubTopicRepository();
PubSubTopic pubSubTopic = pubSubTopicRepository.getTopic(topicName);
PubSubClientsFactory clientsFactory = new PubSubClientsFactory(jobConfig);
// PubSubAdminAdapter pubsubAdminClient =
Contributor:
Let's clean these out
