Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add function serializeSingleColumn to PrestoVectorSerde #10657

Closed
wants to merge 1 commit into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
28 changes: 28 additions & 0 deletions velox/serializers/PrestoSerializer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -4272,6 +4272,34 @@ void PrestoVectorSerde::deserializeSingleColumn(
*result = row->childAt(0);
}

void PrestoVectorSerde::serializeSingleColumn(
pramodsatya marked this conversation as resolved.
Show resolved Hide resolved
const VectorPtr& vector,
const Options* opts,
memory::MemoryPool* pool,
std::ostream* output) {
const auto prestoOptions = toPrestoOptions(opts);
VELOX_USER_CHECK_EQ(
prestoOptions.compressionKind,
common::CompressionKind::CompressionKind_NONE);
VELOX_USER_CHECK_EQ(prestoOptions.nullsFirst, false);

const IndexRange range{0, vector->size()};
pramodsatya marked this conversation as resolved.
Show resolved Hide resolved
const auto arena = std::make_unique<StreamArena>(pool);
auto stream = std::make_unique<VectorStream>(
vector->type(),
std::nullopt,
std::nullopt,
arena.get(),
vector->size(),
prestoOptions);
Scratch scratch;
serializeColumn(vector, folly::Range(&range, 1), stream.get(), scratch);

PrestoOutputStreamListener listener;
OStreamOutputStream outputStream(output, &listener);
stream->flush(&outputStream);
}

namespace {
void initBitsToMapOnce() {
static folly::once_flag initOnceFlag;
Expand Down
16 changes: 16 additions & 0 deletions velox/serializers/PrestoSerializer.h
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,10 @@ namespace facebook::velox::serializer::presto {
/// 2. To serialize a single RowVector, one can use the BatchVectorSerializer
/// returned by createBatchSerializer(). Since it serializes a single RowVector,
/// it tries to preserve the encodings of the input data.
///
/// 3. To serialize data from a vector into a single column, adhering to
/// PrestoPage's column format and excluding the PrestoPage header, one can use
/// serializeSingleColumn() directly.
class PrestoVectorSerde : public VectorSerde {
public:
// Input options that the serializer recognizes.
Expand Down Expand Up @@ -145,6 +149,18 @@ class PrestoVectorSerde : public VectorSerde {
VectorPtr* result,
const Options* options);

/// This function is used to serialize data from input vector into a single
/// column that conforms to PrestoPage's column format. The serialized binary
/// data is uncompressed and starts at the column header since the PrestoPage
/// header is not included. The deserializeSingleColumn function can be used
/// to deserialize the serialized binary data returned by this function back
/// to the input vector.
void serializeSingleColumn(
const VectorPtr& vector,
const Options* opts,
memory::MemoryPool* pool,
std::ostream* output);

enum class TokenType {
HEADER,
NUM_COLUMNS,
Expand Down
50 changes: 37 additions & 13 deletions velox/serializers/tests/PrestoSerializerTest.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -773,6 +773,16 @@ class PrestoSerializerTest
makeMapVector<StringView, int32_t>({})});
}

void testDeserializeSingleColumn(
const std::string& serializedData,
const VectorPtr& expected) {
auto byteStream = toByteStream(serializedData);
VectorPtr deserialized;
serde_->deserializeSingleColumn(
byteStream.get(), pool(), expected->type(), &deserialized, nullptr);
assertEqualVectors(expected, deserialized);
}

std::unique_ptr<serializer::presto::PrestoVectorSerde> serde_;
folly::Random::DefaultGenerator rng_;
};
Expand Down Expand Up @@ -1536,14 +1546,16 @@ INSTANTIATE_TEST_SUITE_P(
common::CompressionKind::CompressionKind_LZ4,
common::CompressionKind::CompressionKind_GZIP));

TEST_F(PrestoSerializerTest, deserializeSingleColumn) {
// Verify that deserializeSingleColumn API can handle all supported types.
static const size_t kPrestoPageHeaderBytes = 21;
static const size_t kNumOfColumnsSerializedBytes = sizeof(int32_t);
static const size_t kBytesToTrim =
kPrestoPageHeaderBytes + kNumOfColumnsSerializedBytes;
TEST_F(PrestoSerializerTest, serdeSingleColumn) {
// The difference between serialized data obtained from
// PrestoIterativeVectorSerializer and serializeSingleColumn() is the
// PrestoPage header and number of columns section in the serialized data.
auto testSerializeRoundTrip = [&](const VectorPtr& vector) {
static const size_t kPrestoPageHeaderBytes = 21;
static const size_t kNumOfColumnsSerializedBytes = sizeof(int32_t);
static const size_t kBytesToTrim =
kPrestoPageHeaderBytes + kNumOfColumnsSerializedBytes;

auto testRoundTripSingleColumn = [&](const VectorPtr& vector) {
auto rowVector = makeRowVector({vector});
// Serialize to PrestoPage format.
std::ostringstream output;
Expand All @@ -1562,14 +1574,17 @@ TEST_F(PrestoSerializerTest, deserializeSingleColumn) {
// Remove the PrestoPage header and Number of columns section from the
// serialized data.
std::string input = output.str().substr(kBytesToTrim);
testDeserializeSingleColumn(input, vector);
};

auto byteStream = toByteStream(input);
VectorPtr deserialized;
serde_->deserializeSingleColumn(
byteStream.get(), pool(), vector->type(), &deserialized, nullptr);
assertEqualVectors(vector, deserialized);
auto testSerializeSingleColumnRoundTrip = [&](const VectorPtr& vector) {
std::ostringstream output;
serde_->serializeSingleColumn(vector, nullptr, pool_.get(), &output);
const auto serialized = output.str();
testDeserializeSingleColumn(serialized, vector);
};

// Verify that (de)serializeSingleColumn API can handle all supported types.
std::vector<TypePtr> typesToTest = {
BOOLEAN(),
TINYINT(),
Expand Down Expand Up @@ -1607,7 +1622,16 @@ TEST_F(PrestoSerializerTest, deserializeSingleColumn) {
for (const auto& type : typesToTest) {
SCOPED_TRACE(fmt::format("Type: {}", type->toString()));
auto data = fuzzer.fuzz(type);
testRoundTripSingleColumn(data);

// Test deserializeSingleColumn() round trip with serialized data obtained
// by PrestoIterativeVectorSerializer. This serialized data includes the
// PrestoPage header and number of columns, which is removed for testing.
testSerializeRoundTrip(data);

// Test serializeSingleColumn() round trip with deserializeSingleColumn(),
// both of these functions do not consider the PrestoPage header and number
// of columns when (de)serializing the data.
testSerializeSingleColumnRoundTrip(data);
}
}

Expand Down
Loading