Apoursam/spark kafka input #1272

Open · wants to merge 12 commits into base: main
@@ -23,6 +23,8 @@
import org.apache.hadoop.mapred.Reducer;
import org.apache.hadoop.mapred.Reporter;

// For the Spark implementation, this needs to be moved out to the next stages, after all the Kafka records are captured.


/**
* This class is a Combiner, which is a functionality of the MR framework where we can plug a {@link Reducer}
@@ -159,6 +159,8 @@ public KafkaInputRecordReader(

/**
 * This method is given two objects, a key and a value, and fills them with the next key and value
 * read from the Kafka topic partition. All control messages are currently skipped.
 */
@Override
public boolean next(KafkaInputMapperKey key, KafkaInputMapperValue value) throws IOException {
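For context, a minimal sketch of how a mapper or test harness typically drives this reader through the standard Hadoop RecordReader contract; the reader variable and process() call are illustrative, not part of this diff:

KafkaInputMapperKey key = reader.createKey();
KafkaInputMapperValue value = reader.createValue();
while (reader.next(key, value)) {
  // key and value now hold the next non-control record from the topic partition
  process(key, value);
}
reader.close();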
@@ -20,6 +20,16 @@ public class SparkConstants {
new StructField[] { new StructField(KEY_COLUMN_NAME, BinaryType, false, Metadata.empty()),
new StructField(VALUE_COLUMN_NAME, BinaryType, true, Metadata.empty()) });

public static final StructType KAFKA_INPUT_TABLE_SCHEMA = new StructType(
Contributor: Would we need to version this at all?

Collaborator Author: My plan is to keep this as is for the most part (after getting a few runs in EI). For the consistency checker, I'll have an extended form of the table schema that contains the offset vectors and all the goodies prebaked and present for the consistency checker.
If there are any specifics that you think we need for this (and the ETL functionality), please advise.

Contributor: I guess what I'm wondering is whether we need to be concerned if this struct type changes... my initial thinking is that it's probably OK, since this is used to generate NEW datasets, not process old ones?

new StructField[] { new StructField("__offset__", IntegerType, false, Metadata.empty()),
new StructField(KEY_COLUMN_NAME, BinaryType, false, Metadata.empty()),
new StructField(VALUE_COLUMN_NAME, BinaryType, true, Metadata.empty()),
new StructField("__partition__", IntegerType, false, Metadata.empty()),
new StructField("__message_type__", IntegerType, false, Metadata.empty()),
new StructField("__schema_id__", IntegerType, false, Metadata.empty()),
new StructField("__replication_metadata_version_id__", IntegerType, false, Metadata.empty()),
new StructField("__replication_metadata_payload__", BinaryType, false, Metadata.empty()) });

public static final StructType DEFAULT_SCHEMA_WITH_PARTITION = new StructType(
new StructField[] { new StructField(KEY_COLUMN_NAME, BinaryType, false, Metadata.empty()),
new StructField(VALUE_COLUMN_NAME, BinaryType, true, Metadata.empty()),
@@ -0,0 +1,151 @@
package com.linkedin.venice.spark.input.pubsub;

import com.linkedin.venice.spark.input.pubsub.table.VenicePubsubInputPartition;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;


public class PartitionSplitters {
// fullPartitionSplitter takes in a map of partition start and end offsets
Contributor: Why not use proper javadoc style comments?

// and returns one whole-partition split per partition (later wrapped into VenicePubsubInputPartition objects by convertToInputPartitions)
public static Map<Integer, List<List<Long>>> fullPartitionSplitter(Map<Integer, List<Long>> partitionOffsetsMap) {
Contributor: It looks like there's an assumption that the list in each entry of Map<Integer, List<Long>> partitionOffsetsMap needs to be two entries, a start and an end. I'm not sure I see yet in this PR how this method is used, but it looks like between this and assembleSegment, an interface which uses a Pair might make more sense?

Or is there a flexibility gain in using a List that I'm missing?

Map<Integer, List<List<Long>>> splits = new HashMap<>();
for (Map.Entry<Integer, List<Long>> entry: partitionOffsetsMap.entrySet()) {
int partitionNumber = entry.getKey();
List<Long> offsets = entry.getValue(); // start and end offsets
List<List<Long>> split = new ArrayList<>();
split.add(assembleSegment(offsets.get(0), offsets.get(1)));
splits.put(partitionNumber, split);
}
return splits;
}
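A minimal usage sketch, assuming inclusive start/end offsets and the usual java.util imports (the offset values are made up):

Map<Integer, List<Long>> offsets = new HashMap<>();
offsets.put(0, Arrays.asList(0L, 1000L));
offsets.put(1, Arrays.asList(0L, 500L));
Map<Integer, List<List<Long>>> splits = PartitionSplitters.fullPartitionSplitter(offsets);
// splits = {0=[[0, 1000]], 1=[[0, 500]]}: one whole-partition segment per partition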

// this one splits the partitions into equal-length segments based on the total number of segments intended.
// the final segment count is going to be less than desired count + partition count, to accommodate partial segments
// and division remainders. This method prioritizes parallelism over segment count.
public static Map<Integer, List<List<Long>>> segmentCountSplitter(
Map<Integer, List<Long>> partitionOffsetsMap,
int totalSegments) {
Map<Integer, List<List<Long>>> splits = new HashMap<>();
long intendedSplitLength = computeIntendedSplitLengthBasedOnCount(partitionOffsetsMap, totalSegments);
for (Map.Entry<Integer, List<Long>> entry: partitionOffsetsMap.entrySet()) {
int partitionNumber = entry.getKey();
List<Long> offsets = entry.getValue();
long startOffset = offsets.get(0);
long endOffset = offsets.get(1);
long partitionLength = (endOffset - startOffset);
if (intendedSplitLength >= partitionLength) {
// this whole partition fits nicely in one chunk
List<List<Long>> split = new ArrayList<>();
split.add(assembleSegment(startOffset, endOffset));
splits.put(partitionNumber, split); // this partition is going to be consumed by a single task
} else {
// this partition needs to be split into multiple segments
// first lets see how many segments we can get out of this partition
long count = (long) Math.ceil((double) partitionLength / intendedSplitLength);
long segmentLength = (long) Math.ceil((double) partitionLength / count);
List<List<Long>> split = new ArrayList<>();

// generate the segments
for (int i = 0; i < count; i++) {
List<Long> segment = new ArrayList<>();
segment.add(startOffset + i * segmentLength);
if (i == count - 1) {
segment.add(endOffset); // last segment
} else {
segment.add(startOffset + (i + 1) * segmentLength - 1);
}
split.add(segment);
}
splits.put(partitionNumber, split); // Multiple splits for this partition
}
}
return splits;
}
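To make the comment above concrete, a small worked example with made-up offsets:

Map<Integer, List<Long>> offsets = new HashMap<>();
offsets.put(0, Arrays.asList(0L, 1000L));
offsets.put(1, Arrays.asList(0L, 200L));
Map<Integer, List<List<Long>>> splits = PartitionSplitters.segmentCountSplitter(offsets, 6);
// intendedSplitLength = ceil(1200 / 6) = 200
// partition 0 -> [[0, 199], [200, 399], [400, 599], [600, 799], [800, 1000]]
// partition 1 -> [[0, 200]] (fits in a single segment)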

// and a method that cuts the partitions into segments based on a maximum number of messages (offsets) per segment
public static Map<Integer, List<List<Long>>> messageCountSplitter(
Map<Integer, List<Long>> partitionOffsetsMap,
long chunkOffsetSize) {
Map<Integer, List<List<Long>>> splits = new HashMap<>();
for (Map.Entry<Integer, List<Long>> entry: partitionOffsetsMap.entrySet()) {
int partitionNumber = entry.getKey();
List<Long> offsets = entry.getValue();
long startOffset = offsets.get(0);
long endOffset = offsets.get(1);
long partitionLength = (endOffset - startOffset);
if (chunkOffsetSize >= partitionLength) {
// this whole partition fits nicely in one chunk
List<List<Long>> split = new ArrayList<>();
split.add(assembleSegment(startOffset, endOffset));
splits.put(partitionNumber, split); // this partition is going to be consumed by a single task
} else {
// this partition needs to be split into multiple segments
// first lets see how many segments we can get out of this partition
long count = (long) Math.ceil((double) partitionLength / chunkOffsetSize);
long segmentLength = (long) Math.ceil((double) partitionLength / count);
List<List<Long>> split = new ArrayList<>();

// generate the segments
for (int i = 0; i < count; i++) {
List<Long> segment = new ArrayList<>();
segment.add(startOffset + i * segmentLength);
if (i == count - 1) {
segment.add(endOffset); // cap the last segment at the partition's end offset
} else {
segment.add(startOffset + (i + 1) * segmentLength - 1);
}
split.add(segment);
}
splits.put(partitionNumber, split); // Multiple splits for this partition
}
}
return splits;
}
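And the analogous sketch for the message-count variant, again with made-up numbers:

Map<Integer, List<Long>> offsets = new HashMap<>();
offsets.put(0, Arrays.asList(0L, 450L));
Map<Integer, List<List<Long>>> splits = PartitionSplitters.messageCountSplitter(offsets, 200L);
// count = ceil(450 / 200) = 3, segmentLength = ceil(450 / 3) = 150
// partition 0 -> [[0, 149], [150, 299], [300, 450]]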

// assemble and wrap the splits into VenicePubsubInputPartition objects.
public static List<VenicePubsubInputPartition> convertToInputPartitions(
String region,
String topicName,
Map<Integer, List<List<Long>>> splits) {
List<VenicePubsubInputPartition> veniceInputPartitions = new ArrayList<>();
for (Map.Entry<Integer, List<List<Long>>> entry: splits.entrySet()) {
int partitionNumber = entry.getKey();
List<List<Long>> segments = entry.getValue();
for (List<Long> segment: segments) {
long startOffset = segment.get(0);
long endOffset = segment.get(1);
VenicePubsubInputPartition partition =
new VenicePubsubInputPartition(region, topicName, partitionNumber, startOffset, endOffset);
veniceInputPartitions.add(partition);
}
}
return veniceInputPartitions;
}
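Putting the pieces together, a hedged end-to-end sketch of how a scan planner might go from the offsets map in the previous sketch to Spark input partitions (the region and topic name are illustrative):

Map<Integer, List<List<Long>>> splits = PartitionSplitters.segmentCountSplitter(offsets, 6);
List<VenicePubsubInputPartition> partitions =
    PartitionSplitters.convertToInputPartitions("ei-ltx1", "store_v1", splits);
// Each VenicePubsubInputPartition carries (region, topic, partition number, start offset, end offset)
// and can be returned from a Batch#planInputPartitions() implementation.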

// utility methods
private static List<Long> assembleSegment(long startOffset, long endOffset) {
List<Long> segment = new ArrayList<>();
segment.add(startOffset);
segment.add(endOffset);
return segment;
}

static long computeIntendedSplitLengthBasedOnCount(Map<Integer, List<Long>> partitionOffsetsMap, int totalSegments) {
Contributor: What is the semantic difference between offsets and segments?

Double totalLength = computeTopicLengthInOffsets(partitionOffsetsMap);
return (long) Math.ceil(totalLength / totalSegments);
}

static long computeIntendedSplitCountBasedOnOffset(Map<Integer, List<Long>> partitionOffsetsMap, long offsetCount) {
Double totalLength = computeTopicLengthInOffsets(partitionOffsetsMap);
return (long) Math.ceil(totalLength / offsetCount);
}

protected static Double computeTopicLengthInOffsets(Map<Integer, List<Long>> partitionOffsetsMap) {
Double totalLength = 0.0;
for (Map.Entry<Integer, List<Long>> entry: partitionOffsetsMap.entrySet()) {
List<Long> offsets = entry.getValue();
totalLength += offsets.get(1) - offsets.get(0);
}
return totalLength;
}
}
@@ -0,0 +1,54 @@
package com.linkedin.venice.spark.input.pubsub.table;

import org.apache.spark.sql.connector.read.InputPartition;

/*
This split can be a whole partition or a sub-part of a pubsub partition, hence the name "segment".
This naming is intentional, to avoid mixing up the Kafka partition with Spark's idea of a split.
The equivalent class for HDFS is VeniceHdfsInputPartition.
*/


public class VenicePubsubInputPartition implements InputPartition {
private static final long serialVersionUID = 1L;

private final String region;
private final String topicName;
private final int partitionNumber;
private final long segmentStartOffset;
private final long segmentEndOffset;

public VenicePubsubInputPartition(
String region,
String topicName,
int partitionNumber,
long startOffset,
long endOffset) {
this.region = region;
this.topicName = topicName;
this.partitionNumber = partitionNumber;
this.segmentStartOffset = startOffset;
this.segmentEndOffset = endOffset;
}

public String getRegion() {
return region;
}

public String getTopicName() {
return topicName;
}

public int getPartitionNumber() {
return partitionNumber;
}

public long getSegmentStartOffset() {
return segmentStartOffset;
}

public long getSegmentEndOffset() {
return segmentEndOffset;
}
}
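For illustration only (the reader side is not part of this diff), a sketch of how a partition reader factory would typically unpack one of these splits:

// Hypothetical consumption inside a Spark PartitionReaderFactory; names are illustrative.
VenicePubsubInputPartition split = (VenicePubsubInputPartition) inputPartition;
String topic = split.getTopicName();
int partition = split.getPartitionNumber();
long start = split.getSegmentStartOffset();
long end = split.getSegmentEndOffset();
// a reader would consume records for (topic, partition) from start through end, in region split.getRegion()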