From d00e50ef65fd5cad0aa75e77c68393dcae2c5672 Mon Sep 17 00:00:00 2001 From: Pramod Date: Fri, 15 Nov 2024 19:45:36 -0800 Subject: [PATCH] Add function serializeSingleColumn to PrestoVectorSerde (#10657) Summary: https://github.com/prestodb/presto/pull/23331 adds a native expression optimizer to delegate expression evaluation to the native sidecar. This is used to constant-fold expressions on the Presto native sidecar instead of on the Presto Java coordinator (which is the current behavior). https://github.com/prestodb/presto/pull/22927 implements a proxygen endpoint to accept `RowExpression`s from `NativeSidecarExpressionInterpreter`, optimize them if possible (rewrite special form expressions), and compile the `RowExpression` to a Velox expression with constant folding enabled. This Velox expression is then converted back to a `RowExpression` and returned by the sidecar to the coordinator. When the constant-folded Velox expression is of type `velox::exec::ConstantExpr`, we need to return a `RowExpression` of type `ConstantExpression`. This requires us to serialize the constant value from `velox::exec::ConstantExpr` into `protocol::ConstantExpression::valueBlock`. This can be done by serializing the constant value vector to the Presto SerializedPage::column format, followed by Base64-encoding the result (reversing the logic in `Base64Util.cpp::readBlock`). This PR adds a new function, `serializeSingleColumn`, to `PrestoVectorSerde`. This can be used to serialize input data from vectors containing a single element into a single PrestoPage column format (without the PrestoPage header). This function is not added to `PrestoBatchVectorSerializer` alongside the existing `serialize` function since that would require adding it as a virtual function in `BatchVectorSerializer` as well, and this is not desirable since the `PrestoPage` format is not relevant in this base class. 
There is an existing function `deserializeSingleColumn` in `PrestoVectorSerde` which is used to deserialize data from a single column. Since `serializeSingleColumn` performs the inverse operation of this function, it is added alongside it in `PrestoVectorSerde`. Pull Request resolved: https://github.com/facebookincubator/velox/pull/10657 Reviewed By: amitkdutta Differential Revision: D66044754 Pulled By: pedroerp fbshipit-source-id: e509605067920f8207e5a3fa67552badc2ce0eba --- velox/serializers/PrestoSerializer.cpp | 28 +++++++++++ velox/serializers/PrestoSerializer.h | 16 ++++++ .../tests/PrestoSerializerTest.cpp | 50 ++++++++++++++----- 3 files changed, 81 insertions(+), 13 deletions(-) diff --git a/velox/serializers/PrestoSerializer.cpp b/velox/serializers/PrestoSerializer.cpp index 95c2313d495a..80301dde0a7f 100644 --- a/velox/serializers/PrestoSerializer.cpp +++ b/velox/serializers/PrestoSerializer.cpp @@ -4272,6 +4272,34 @@ void PrestoVectorSerde::deserializeSingleColumn( *result = row->childAt(0); } +void PrestoVectorSerde::serializeSingleColumn( + const VectorPtr& vector, + const Options* opts, + memory::MemoryPool* pool, + std::ostream* output) { + const auto prestoOptions = toPrestoOptions(opts); + VELOX_USER_CHECK_EQ( + prestoOptions.compressionKind, + common::CompressionKind::CompressionKind_NONE); + VELOX_USER_CHECK_EQ(prestoOptions.nullsFirst, false); + + const IndexRange range{0, vector->size()}; + const auto arena = std::make_unique(pool); + auto stream = std::make_unique( + vector->type(), + std::nullopt, + std::nullopt, + arena.get(), + vector->size(), + prestoOptions); + Scratch scratch; + serializeColumn(vector, folly::Range(&range, 1), stream.get(), scratch); + + PrestoOutputStreamListener listener; + OStreamOutputStream outputStream(output, &listener); + stream->flush(&outputStream); +} + namespace { void initBitsToMapOnce() { static folly::once_flag initOnceFlag; diff --git a/velox/serializers/PrestoSerializer.h 
b/velox/serializers/PrestoSerializer.h index a769488ff52f..b6b2385e3d72 100644 --- a/velox/serializers/PrestoSerializer.h +++ b/velox/serializers/PrestoSerializer.h @@ -41,6 +41,10 @@ namespace facebook::velox::serializer::presto { /// 2. To serialize a single RowVector, one can use the BatchVectorSerializer /// returned by createBatchSerializer(). Since it serializes a single RowVector, /// it tries to preserve the encodings of the input data. +/// +/// 3. To serialize data from a vector into a single column, adhering to +/// PrestoPage's column format and excluding the PrestoPage header, one can use +/// serializeSingleColumn() directly. class PrestoVectorSerde : public VectorSerde { public: // Input options that the serializer recognizes. @@ -145,6 +149,18 @@ class PrestoVectorSerde : public VectorSerde { VectorPtr* result, const Options* options); + /// This function is used to serialize data from input vector into a single + /// column that conforms to PrestoPage's column format. The serialized binary + /// data is uncompressed and starts at the column header since the PrestoPage + /// header is not included. The deserializeSingleColumn function can be used + /// to deserialize the serialized binary data returned by this function back + /// to the input vector. 
+ void serializeSingleColumn( + const VectorPtr& vector, + const Options* opts, + memory::MemoryPool* pool, + std::ostream* output); + enum class TokenType { HEADER, NUM_COLUMNS, diff --git a/velox/serializers/tests/PrestoSerializerTest.cpp b/velox/serializers/tests/PrestoSerializerTest.cpp index 898939f97a27..472d6b05debf 100644 --- a/velox/serializers/tests/PrestoSerializerTest.cpp +++ b/velox/serializers/tests/PrestoSerializerTest.cpp @@ -773,6 +773,16 @@ class PrestoSerializerTest makeMapVector({})}); } + void testDeserializeSingleColumn( + const std::string& serializedData, + const VectorPtr& expected) { + auto byteStream = toByteStream(serializedData); + VectorPtr deserialized; + serde_->deserializeSingleColumn( + byteStream.get(), pool(), expected->type(), &deserialized, nullptr); + assertEqualVectors(expected, deserialized); + } + std::unique_ptr serde_; folly::Random::DefaultGenerator rng_; }; @@ -1536,14 +1546,16 @@ INSTANTIATE_TEST_SUITE_P( common::CompressionKind::CompressionKind_LZ4, common::CompressionKind::CompressionKind_GZIP)); -TEST_F(PrestoSerializerTest, deserializeSingleColumn) { - // Verify that deserializeSingleColumn API can handle all supported types. - static const size_t kPrestoPageHeaderBytes = 21; - static const size_t kNumOfColumnsSerializedBytes = sizeof(int32_t); - static const size_t kBytesToTrim = - kPrestoPageHeaderBytes + kNumOfColumnsSerializedBytes; +TEST_F(PrestoSerializerTest, serdeSingleColumn) { + // The difference between serialized data obtained from + // PrestoIterativeVectorSerializer and serializeSingleColumn() is the + // PrestoPage header and number of columns section in the serialized data. 
+ auto testSerializeRoundTrip = [&](const VectorPtr& vector) { + static const size_t kPrestoPageHeaderBytes = 21; + static const size_t kNumOfColumnsSerializedBytes = sizeof(int32_t); + static const size_t kBytesToTrim = + kPrestoPageHeaderBytes + kNumOfColumnsSerializedBytes; - auto testRoundTripSingleColumn = [&](const VectorPtr& vector) { auto rowVector = makeRowVector({vector}); // Serialize to PrestoPage format. std::ostringstream output; @@ -1562,14 +1574,17 @@ TEST_F(PrestoSerializerTest, deserializeSingleColumn) { // Remove the PrestoPage header and Number of columns section from the // serialized data. std::string input = output.str().substr(kBytesToTrim); + testDeserializeSingleColumn(input, vector); + }; - auto byteStream = toByteStream(input); - VectorPtr deserialized; - serde_->deserializeSingleColumn( - byteStream.get(), pool(), vector->type(), &deserialized, nullptr); - assertEqualVectors(vector, deserialized); + auto testSerializeSingleColumnRoundTrip = [&](const VectorPtr& vector) { + std::ostringstream output; + serde_->serializeSingleColumn(vector, nullptr, pool_.get(), &output); + const auto serialized = output.str(); + testDeserializeSingleColumn(serialized, vector); }; + // Verify that (de)serializeSingleColumn API can handle all supported types. std::vector typesToTest = { BOOLEAN(), TINYINT(), @@ -1607,7 +1622,16 @@ TEST_F(PrestoSerializerTest, deserializeSingleColumn) { for (const auto& type : typesToTest) { SCOPED_TRACE(fmt::format("Type: {}", type->toString())); auto data = fuzzer.fuzz(type); - testRoundTripSingleColumn(data); + + // Test deserializeSingleColumn() round trip with serialized data obtained + // by PrestoIterativeVectorSerializer. This serialized data includes the + // PrestoPage header and number of columns, which is removed for testing. 
+ testSerializeRoundTrip(data); + + // Test serializeSingleColumn() round trip with deserializeSingleColumn(), + // both of these functions do not consider the PrestoPage header and number + // of columns when (de)serializing the data. + testSerializeSingleColumnRoundTrip(data); } }