diff --git a/velox/serializers/PrestoSerializer.cpp b/velox/serializers/PrestoSerializer.cpp
index 95c2313d495a..80301dde0a7f 100644
--- a/velox/serializers/PrestoSerializer.cpp
+++ b/velox/serializers/PrestoSerializer.cpp
@@ -4272,6 +4272,37 @@ void PrestoVectorSerde::deserializeSingleColumn(
   *result = row->childAt(0);
 }
 
+void PrestoVectorSerde::serializeSingleColumn(
+    const VectorPtr& vector,
+    const Options* opts,
+    memory::MemoryPool* pool,
+    std::ostream* output) {
+  const auto prestoOptions = toPrestoOptions(opts);
+  // Only uncompressed output with nullsFirst disabled is supported.
+  VELOX_USER_CHECK_EQ(
+      prestoOptions.compressionKind,
+      common::CompressionKind::CompressionKind_NONE);
+  VELOX_USER_CHECK_EQ(prestoOptions.nullsFirst, false);
+
+  // Serialize all rows of 'vector' as a single range.
+  const IndexRange range{0, vector->size()};
+  const auto arena = std::make_unique<StreamArena>(pool);
+  auto stream = std::make_unique<VectorStream>(
+      vector->type(),
+      std::nullopt,
+      std::nullopt,
+      arena.get(),
+      vector->size(),
+      prestoOptions);
+  Scratch scratch;
+  serializeColumn(vector, folly::Range(&range, 1), stream.get(), scratch);
+
+  // Write the serialized column to 'output'.
+  PrestoOutputStreamListener listener;
+  OStreamOutputStream outputStream(output, &listener);
+  stream->flush(&outputStream);
+}
+
 namespace {
 void initBitsToMapOnce() {
   static folly::once_flag initOnceFlag;
diff --git a/velox/serializers/PrestoSerializer.h b/velox/serializers/PrestoSerializer.h
index a769488ff52f..b6b2385e3d72 100644
--- a/velox/serializers/PrestoSerializer.h
+++ b/velox/serializers/PrestoSerializer.h
@@ -41,6 +41,10 @@ namespace facebook::velox::serializer::presto {
 /// 2. To serialize a single RowVector, one can use the BatchVectorSerializer
 /// returned by createBatchSerializer(). Since it serializes a single RowVector,
 /// it tries to preserve the encodings of the input data.
+///
+/// 3. To serialize a single vector as one column in PrestoPage's column
+/// format, without the PrestoPage header, one can call
+/// serializeSingleColumn() directly.
 class PrestoVectorSerde : public VectorSerde {
  public:
   // Input options that the serializer recognizes.
@@ -145,6 +149,18 @@ class PrestoVectorSerde : public VectorSerde {
       VectorPtr* result,
       const Options* options);
 
+  /// Serializes 'vector' as a single column in PrestoPage's column format and
+  /// writes the bytes to 'output'. The data is uncompressed and starts at the
+  /// column header: the PrestoPage header and the number-of-columns field are
+  /// not written. The options derived from 'opts' must use no compression and
+  /// nullsFirst = false (enforced via VELOX_USER_CHECK_EQ). Use
+  /// deserializeSingleColumn() to read the data back into a vector.
+  void serializeSingleColumn(
+      const VectorPtr& vector,
+      const Options* opts,
+      memory::MemoryPool* pool,
+      std::ostream* output);
+
   enum class TokenType {
     HEADER,
     NUM_COLUMNS,
diff --git a/velox/serializers/tests/PrestoSerializerTest.cpp b/velox/serializers/tests/PrestoSerializerTest.cpp
index 898939f97a27..472d6b05debf 100644
--- a/velox/serializers/tests/PrestoSerializerTest.cpp
+++ b/velox/serializers/tests/PrestoSerializerTest.cpp
@@ -773,6 +773,16 @@ class PrestoSerializerTest
          makeMapVector({})});
   }
 
+  void testDeserializeSingleColumn(
+      const std::string& serializedData,
+      const VectorPtr& expected) {
+    auto byteStream = toByteStream(serializedData);
+    VectorPtr deserialized;
+    serde_->deserializeSingleColumn(
+        byteStream.get(), pool(), expected->type(), &deserialized, nullptr);
+    assertEqualVectors(expected, deserialized);
+  }
+
   std::unique_ptr<serializer::presto::PrestoVectorSerde> serde_;
   folly::Random::DefaultGenerator rng_;
 };
@@ -1536,14 +1546,16 @@ INSTANTIATE_TEST_SUITE_P(
         common::CompressionKind::CompressionKind_LZ4,
         common::CompressionKind::CompressionKind_GZIP));
 
-TEST_F(PrestoSerializerTest, deserializeSingleColumn) {
-  // Verify that deserializeSingleColumn API can handle all supported types.
-  static const size_t kPrestoPageHeaderBytes = 21;
-  static const size_t kNumOfColumnsSerializedBytes = sizeof(int32_t);
-  static const size_t kBytesToTrim =
-      kPrestoPageHeaderBytes + kNumOfColumnsSerializedBytes;
+TEST_F(PrestoSerializerTest, serdeSingleColumn) {
+  // The only difference between the bytes produced by
+  // PrestoIterativeVectorSerializer and by serializeSingleColumn() is the
+  // leading PrestoPage header and number-of-columns section.
+  auto testSerializeRoundTrip = [&](const VectorPtr& vector) {
+    static const size_t kPrestoPageHeaderBytes = 21;
+    static const size_t kNumOfColumnsSerializedBytes = sizeof(int32_t);
+    static const size_t kBytesToTrim =
+        kPrestoPageHeaderBytes + kNumOfColumnsSerializedBytes;
 
-  auto testRoundTripSingleColumn = [&](const VectorPtr& vector) {
     auto rowVector = makeRowVector({vector});
     // Serialize to PrestoPage format.
     std::ostringstream output;
@@ -1562,14 +1574,17 @@ TEST_F(PrestoSerializerTest, deserializeSingleColumn) {
     // Remove the PrestoPage header and Number of columns section from the
     // serialized data.
     std::string input = output.str().substr(kBytesToTrim);
+    testDeserializeSingleColumn(input, vector);
+  };
 
-    auto byteStream = toByteStream(input);
-    VectorPtr deserialized;
-    serde_->deserializeSingleColumn(
-        byteStream.get(), pool(), vector->type(), &deserialized, nullptr);
-    assertEqualVectors(vector, deserialized);
+  auto testSerializeSingleColumnRoundTrip = [&](const VectorPtr& vector) {
+    std::ostringstream output;
+    serde_->serializeSingleColumn(vector, nullptr, pool(), &output);
+    const auto serialized = output.str();
+    testDeserializeSingleColumn(serialized, vector);
   };
 
+  // Verify that the (de)serializeSingleColumn APIs handle all supported types.
   std::vector<TypePtr> typesToTest = {
       BOOLEAN(),
       TINYINT(),
@@ -1607,7 +1622,16 @@ TEST_F(PrestoSerializerTest, deserializeSingleColumn) {
   for (const auto& type : typesToTest) {
     SCOPED_TRACE(fmt::format("Type: {}", type->toString()));
     auto data = fuzzer.fuzz(type);
-    testRoundTripSingleColumn(data);
+
+    // Round-trip through deserializeSingleColumn() using bytes produced by
+    // PrestoIterativeVectorSerializer. Those bytes include the PrestoPage
+    // header and number of columns, which are trimmed before deserializing.
+    testSerializeRoundTrip(data);
+
+    // Round-trip serializeSingleColumn() with deserializeSingleColumn().
+    // Neither function reads or writes the PrestoPage header or the number
+    // of columns.
+    testSerializeSingleColumnRoundTrip(data);
   }
 }
 