Skip to content

Commit

Permalink
Add function serializeSingleColumn to PrestoVectorSerde
Browse files Browse the repository at this point in the history
  • Loading branch information
pramodsatya committed Nov 14, 2024
1 parent f37dc00 commit 6e3d6dd
Show file tree
Hide file tree
Showing 3 changed files with 81 additions and 13 deletions.
28 changes: 28 additions & 0 deletions velox/serializers/PrestoSerializer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -4272,6 +4272,34 @@ void PrestoVectorSerde::deserializeSingleColumn(
*result = row->childAt(0);
}

void PrestoVectorSerde::serializeSingleColumn(
const VectorPtr& vector,
const Options* opts,
memory::MemoryPool* pool,
std::ostream* output) {
const auto prestoOptions = toPrestoOptions(opts);
VELOX_USER_CHECK_EQ(
prestoOptions.compressionKind,
common::CompressionKind::CompressionKind_NONE);
VELOX_USER_CHECK_EQ(prestoOptions.nullsFirst, false);

const IndexRange range{0, vector->size()};
const auto arena = std::make_unique<StreamArena>(pool);
auto stream = std::make_unique<VectorStream>(
vector->type(),
std::nullopt,
std::nullopt,
arena.get(),
vector->size(),
prestoOptions);
Scratch scratch;
serializeColumn(vector, folly::Range(&range, 1), stream.get(), scratch);

PrestoOutputStreamListener listener;
OStreamOutputStream outputStream(output, &listener);
stream->flush(&outputStream);
}

namespace {
void initBitsToMapOnce() {
static folly::once_flag initOnceFlag;
Expand Down
16 changes: 16 additions & 0 deletions velox/serializers/PrestoSerializer.h
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,10 @@ namespace facebook::velox::serializer::presto {
/// 2. To serialize a single RowVector, one can use the BatchVectorSerializer
/// returned by createBatchSerializer(). Since it serializes a single RowVector,
/// it tries to preserve the encodings of the input data.
///
/// 3. To serialize data from a vector into a single column, adhering to
/// PrestoPage's column format and excluding the PrestoPage header, one can use
/// serializeSingleColumn() directly.
class PrestoVectorSerde : public VectorSerde {
public:
// Input options that the serializer recognizes.
Expand Down Expand Up @@ -145,6 +149,18 @@ class PrestoVectorSerde : public VectorSerde {
VectorPtr* result,
const Options* options);

/// Serializes the data in the input vector into a single column that conforms
/// to PrestoPage's column format. The serialized binary data is uncompressed
/// and starts at the column header, since the PrestoPage header is not
/// included. The deserializeSingleColumn function can be used to deserialize
/// the binary data produced by this function back into a vector equal to the
/// input vector.
///
/// @param vector The vector whose rows [0, vector->size()) are serialized.
/// @param opts Serde options. The compression kind must be
/// CompressionKind_NONE and nullsFirst must be false; otherwise a user check
/// fails.
/// @param pool Memory pool backing the arena used while serializing.
/// @param output Stream that receives the serialized column bytes.
void serializeSingleColumn(
const VectorPtr& vector,
const Options* opts,
memory::MemoryPool* pool,
std::ostream* output);

enum class TokenType {
HEADER,
NUM_COLUMNS,
Expand Down
50 changes: 37 additions & 13 deletions velox/serializers/tests/PrestoSerializerTest.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -773,6 +773,16 @@ class PrestoSerializerTest
makeMapVector<StringView, int32_t>({})});
}

void testDeserializeSingleColumn(
const std::string& serializedData,
const VectorPtr& expected) {
auto byteStream = toByteStream(serializedData);
VectorPtr deserialized;
serde_->deserializeSingleColumn(
byteStream.get(), pool(), expected->type(), &deserialized, nullptr);
assertEqualVectors(expected, deserialized);
}

std::unique_ptr<serializer::presto::PrestoVectorSerde> serde_;
folly::Random::DefaultGenerator rng_;
};
Expand Down Expand Up @@ -1536,14 +1546,16 @@ INSTANTIATE_TEST_SUITE_P(
common::CompressionKind::CompressionKind_LZ4,
common::CompressionKind::CompressionKind_GZIP));

TEST_F(PrestoSerializerTest, deserializeSingleColumn) {
// Verify that deserializeSingleColumn API can handle all supported types.
static const size_t kPrestoPageHeaderBytes = 21;
static const size_t kNumOfColumnsSerializedBytes = sizeof(int32_t);
static const size_t kBytesToTrim =
kPrestoPageHeaderBytes + kNumOfColumnsSerializedBytes;
TEST_F(PrestoSerializerTest, serdeSingleColumn) {
// The difference between serialized data obtained from
// PrestoIterativeVectorSerializer and serializeSingleColumn() is the
// PrestoPage header and number of columns section in the serialized data.
auto testSerializeRoundTrip = [&](const VectorPtr& vector) {
static const size_t kPrestoPageHeaderBytes = 21;
static const size_t kNumOfColumnsSerializedBytes = sizeof(int32_t);
static const size_t kBytesToTrim =
kPrestoPageHeaderBytes + kNumOfColumnsSerializedBytes;

auto testRoundTripSingleColumn = [&](const VectorPtr& vector) {
auto rowVector = makeRowVector({vector});
// Serialize to PrestoPage format.
std::ostringstream output;
Expand All @@ -1562,14 +1574,17 @@ TEST_F(PrestoSerializerTest, deserializeSingleColumn) {
// Remove the PrestoPage header and Number of columns section from the
// serialized data.
std::string input = output.str().substr(kBytesToTrim);
testDeserializeSingleColumn(input, vector);
};

auto byteStream = toByteStream(input);
VectorPtr deserialized;
serde_->deserializeSingleColumn(
byteStream.get(), pool(), vector->type(), &deserialized, nullptr);
assertEqualVectors(vector, deserialized);
// Serializes 'vector' with serializeSingleColumn() using default options and
// checks that deserializeSingleColumn() reproduces it.
auto testSerializeSingleColumnRoundTrip = [&](const VectorPtr& vector) {
  std::ostringstream sink;
  serde_->serializeSingleColumn(
      vector, /*opts=*/nullptr, pool_.get(), &sink);
  testDeserializeSingleColumn(sink.str(), vector);
};

// Verify that (de)serializeSingleColumn API can handle all supported types.
std::vector<TypePtr> typesToTest = {
BOOLEAN(),
TINYINT(),
Expand Down Expand Up @@ -1607,7 +1622,16 @@ TEST_F(PrestoSerializerTest, deserializeSingleColumn) {
for (const auto& type : typesToTest) {
SCOPED_TRACE(fmt::format("Type: {}", type->toString()));
auto data = fuzzer.fuzz(type);
testRoundTripSingleColumn(data);

// Test deserializeSingleColumn() round trip with serialized data obtained
// from PrestoIterativeVectorSerializer. This serialized data includes the
// PrestoPage header and number of columns, which are removed for testing.
testSerializeRoundTrip(data);

// Test serializeSingleColumn() round trip with deserializeSingleColumn().
// Neither of these functions includes the PrestoPage header or the number
// of columns when (de)serializing the data.
testSerializeSingleColumnRoundTrip(data);
}
}

Expand Down

0 comments on commit 6e3d6dd

Please sign in to comment.