Skip to content

Commit

Permalink
Add function serializeSingleColumn to PrestoVectorSerde
Browse files Browse the repository at this point in the history
  • Loading branch information
pramodsatya committed Nov 5, 2024
1 parent 2ade7f7 commit 3d90559
Show file tree
Hide file tree
Showing 3 changed files with 99 additions and 32 deletions.
28 changes: 28 additions & 0 deletions velox/serializers/PrestoSerializer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -4263,6 +4263,34 @@ void PrestoVectorSerde::deserializeSingleColumn(
*result = row->childAt(0);
}

void PrestoVectorSerde::serializeSingleColumn(
const VectorPtr& vector,
const Options* opts,
memory::MemoryPool* pool,
std::ostream* output) {
const auto prestoOptions = toPrestoOptions(opts);
VELOX_USER_CHECK_EQ(
prestoOptions.compressionKind,
common::CompressionKind::CompressionKind_NONE);
VELOX_USER_CHECK_EQ(prestoOptions.nullsFirst, false);

const IndexRange range{0, vector->size()};
const auto arena = std::make_unique<StreamArena>(pool);
auto stream = std::make_unique<VectorStream>(
vector->type(),
std::nullopt,
std::nullopt,
arena.get(),
vector->size(),
prestoOptions);
Scratch scratch;
serializeColumn(vector, folly::Range(&range, 1), stream.get(), scratch);

PrestoOutputStreamListener listener;
OStreamOutputStream outputStream(output, &listener);
stream->flush(&outputStream);
}

// static
void PrestoVectorSerde::registerVectorSerde() {
auto toByte = [](int32_t number, int32_t bit) {
Expand Down
16 changes: 16 additions & 0 deletions velox/serializers/PrestoSerializer.h
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,10 @@ namespace facebook::velox::serializer::presto {
/// 2. To serialize a single RowVector, one can use the BatchVectorSerializer
/// returned by createBatchSerializer(). Since it serializes a single RowVector,
/// it tries to preserve the encodings of the input data.
///
/// 3. To serialize data from a vector into a single column, adhering to
/// PrestoPage's column format and excluding the PrestoPage header, one can use
/// serializeSingleColumn() directly.
class PrestoVectorSerde : public VectorSerde {
public:
// Input options that the serializer recognizes.
Expand Down Expand Up @@ -134,6 +138,18 @@ class PrestoVectorSerde : public VectorSerde {
VectorPtr* result,
const Options* options);

/// This function is used to serialize data from input vector into a single
/// column that conforms to PrestoPage's column format. The serialized binary
/// data is uncompressed and starts at the column header since the PrestoPage
/// header is not included. The deserializeSingleColumn function can be used
/// to deserialize the serialized binary data returned by this function back
/// to the input vector.
///
/// @param vector Input vector. All rows in [0, vector->size()) are
/// serialized.
/// @param opts Serializer options. The implementation fails a
/// VELOX_USER_CHECK unless the resolved options have compressionKind equal to
/// CompressionKind_NONE and nullsFirst equal to false.
/// NOTE(review): passing nullptr presumably selects the default options;
/// confirm against toPrestoOptions().
/// @param pool Memory pool passed to the StreamArena that is created for the
/// duration of the call.
/// @param output Stream that the serialized column is flushed to.
void serializeSingleColumn(
const VectorPtr& vector,
const Options* opts,
memory::MemoryPool* pool,
std::ostream* output);

enum class TokenType {
HEADER,
NUM_COLUMNS,
Expand Down
87 changes: 55 additions & 32 deletions velox/serializers/tests/PrestoSerializerTest.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -765,6 +765,41 @@ class PrestoSerializerTest
makeMapVector<StringView, int32_t>({})});
}

/// Returns the VectorFuzzer used by the single-column serde round-trip tests.
/// The fuzzer produces 5-row vectors with a 0.1 null ratio, variable-length
/// strings (stringLength 20), fixed-length containers (containerLength 10),
/// millisecond-precision timestamps and dictionaries without added nulls.
/// The seed is fixed so that the generated data is the same on every run.
///
/// Callers that want the seed to appear in assertion failure messages must
/// add their own SCOPED_TRACE in the test body.
VectorFuzzer getVectorFuzzer() {
  VectorFuzzer::Options opts;
  opts.vectorSize = 5;
  opts.nullRatio = 0.1;
  opts.dictionaryHasNulls = false;
  opts.stringVariableLength = true;
  opts.stringLength = 20;
  opts.containerVariableLength = false;
  opts.timestampPrecision =
      VectorFuzzer::Options::TimestampPrecision::kMilliSeconds;
  opts.containerLength = 10;

  const size_t seed = 0;
  // Log the seed so it is visible in the test output. A SCOPED_TRACE placed
  // here would be popped when this helper returns, i.e. before any assertion
  // in the calling test runs, so it would never annotate a failure.
  LOG(ERROR) << "Seed: " << seed;
  return {opts, pool_.get(), seed};
}

/// Types covered by the single-column serialize/deserialize round-trip tests:
/// the fixed-width scalars, VARCHAR, TIMESTAMP, and ROW/ARRAY/MAP containers,
/// including a MAP with a nested ARRAY value. Each type appears exactly once.
const std::vector<TypePtr> serdeTypesToTest_ = {
    BOOLEAN(),
    TINYINT(),
    SMALLINT(),
    INTEGER(),
    BIGINT(),
    REAL(),
    DOUBLE(),
    VARCHAR(),
    TIMESTAMP(),
    ROW({VARCHAR(), INTEGER()}),
    ARRAY(INTEGER()),
    MAP(VARCHAR(), INTEGER()),
    MAP(VARCHAR(), ARRAY(INTEGER())),
};

std::unique_ptr<serializer::presto::PrestoVectorSerde> serde_;
folly::Random::DefaultGenerator rng_;
};
Expand Down Expand Up @@ -1545,41 +1580,29 @@ TEST_F(PrestoSerializerTest, deserializeSingleColumn) {
assertEqualVectors(vector, deserialized);
};

std::vector<TypePtr> typesToTest = {
BOOLEAN(),
TINYINT(),
SMALLINT(),
INTEGER(),
BIGINT(),
REAL(),
DOUBLE(),
VARCHAR(),
TIMESTAMP(),
ROW({VARCHAR(), INTEGER()}),
ARRAY(INTEGER()),
ARRAY(INTEGER()),
MAP(VARCHAR(), INTEGER()),
MAP(VARCHAR(), ARRAY(INTEGER())),
};

VectorFuzzer::Options opts;
opts.vectorSize = 5;
opts.nullRatio = 0.1;
opts.dictionaryHasNulls = false;
opts.stringVariableLength = true;
opts.stringLength = 20;
opts.containerVariableLength = false;
opts.timestampPrecision =
VectorFuzzer::Options::TimestampPrecision::kMilliSeconds;
opts.containerLength = 10;
auto fuzzer = getVectorFuzzer();
for (const auto& type : serdeTypesToTest_) {
SCOPED_TRACE(fmt::format("Type: {}", type->toString()));
auto data = fuzzer.fuzz(type);
testRoundTripSingleColumn(data);
}
}

auto seed = 0;
TEST_F(PrestoSerializerTest, serializeSingleColumn) {
auto testRoundTripSingleColumn = [&](const VectorPtr& vector) {
std::ostringstream output;
serde_->serializeSingleColumn(vector, nullptr, pool_.get(), &output);
const auto serialized = output.str();

LOG(ERROR) << "Seed: " << seed;
SCOPED_TRACE(fmt::format("seed: {}", seed));
VectorFuzzer fuzzer(opts, pool_.get(), seed);
auto byteStream = toByteStream(serialized);
VectorPtr deserialized;
serde_->deserializeSingleColumn(
byteStream.get(), pool(), vector->type(), &deserialized, nullptr);
assertEqualVectors(vector, deserialized);
};

for (const auto& type : typesToTest) {
auto fuzzer = getVectorFuzzer();
for (const auto& type : serdeTypesToTest_) {
SCOPED_TRACE(fmt::format("Type: {}", type->toString()));
auto data = fuzzer.fuzz(type);
testRoundTripSingleColumn(data);
Expand Down

0 comments on commit 3d90559

Please sign in to comment.