Skip to content

Commit

Permalink
add support for parquet_datapage_version session property
Browse files Browse the repository at this point in the history
  • Loading branch information
svm1 committed Nov 26, 2024
1 parent 059337f commit cd990a4
Show file tree
Hide file tree
Showing 4 changed files with 114 additions and 0 deletions.
29 changes: 29 additions & 0 deletions velox/connectors/hive/HiveConnectorUtil.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -921,6 +921,21 @@ std::optional<std::string> getTimestampTimeZone(
return std::nullopt;
}

/// Reads `configKey` from `config` and translates its string value into a
/// parquet::WriterOptions::DataPageVersion.
///
/// @param config Configuration to look the key up in.
/// @param configKey Name of the property holding the data page version.
/// @return V1 for "PARQUET_1_0", V2 for "PARQUET_2_0", and kDefault when the
/// key has no value in `config`.
/// Fails via VELOX_FAIL when the key is set to any other string.
parquet::WriterOptions::DataPageVersion getParquetDataPageVersion(
    const config::ConfigBase& config,
    const char* configKey) {
  using DataPageVersion = parquet::WriterOptions::DataPageVersion;

  const auto configured = config.get<std::string>(configKey);
  if (!configured.has_value()) {
    // Nothing configured under this key; let the caller pick a fallback.
    return DataPageVersion::kDefault;
  }

  if (configured == "PARQUET_1_0") {
    return DataPageVersion::V1;
  }
  if (configured == "PARQUET_2_0") {
    return DataPageVersion::V2;
  }
  VELOX_FAIL("Unsupported parquet datapage version {}", configured.value());
}

void updateParquetWriterOptions(
const std::shared_ptr<const HiveConfig>& hiveConfig,
const config::ConfigBase* sessionProperties,
Expand Down Expand Up @@ -956,6 +971,20 @@ void updateParquetWriterOptions(
*hiveConfig->config(), core::QueryConfig::kSessionTimezone);
}

// Resolve the Parquet data page version. A value found in the session
// properties wins; only when the session lookup yields kDefault (key not set)
// is the hive connector config consulted.
// NOTE(review): sessionProperties is a raw pointer and is dereferenced here
// without a visible null check - confirm callers never pass nullptr.
auto sessionParquetDataPageVersion = getParquetDataPageVersion(
*sessionProperties,
parquet::WriterOptions::kParquetSessionDataPageVersion);

if (sessionParquetDataPageVersion !=
parquet::WriterOptions::DataPageVersion::kDefault) {
parquetWriterOptions->parquetDataPageVersion =
sessionParquetDataPageVersion;
} else {
// NOTE(review): the hive config is queried with the same key constant as the
// session properties (kParquetSessionDataPageVersion). If the hive config
// also has no value, the option stays kDefault.
parquetWriterOptions->parquetDataPageVersion = getParquetDataPageVersion(
*hiveConfig->config(),
parquet::WriterOptions::kParquetSessionDataPageVersion);
}

writerOptions = std::move(parquetWriterOptions);
}
#endif
Expand Down
66 changes: 66 additions & 0 deletions velox/dwio/parquet/tests/writer/ParquetWriterTest.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,9 @@
#include "velox/connectors/hive/HiveConnector.h" // @manual
#include "velox/core/QueryCtx.h"
#include "velox/dwio/parquet/RegisterParquetWriter.h" // @manual
#include "velox/dwio/parquet/reader/PageReader.h"
#include "velox/dwio/parquet/tests/ParquetTestBase.h"
#include "velox/dwio/parquet/writer/arrow/Properties.h"
#include "velox/exec/tests/utils/AssertQueryBuilder.h"
#include "velox/exec/tests/utils/Cursor.h"
#include "velox/exec/tests/utils/PlanBuilder.h"
Expand Down Expand Up @@ -77,6 +79,26 @@ class ParquetWriterTest : public ParquetTestBase {
opts);
};

// Returns the thrift page type found in the page header located at the data
// page offset of `colChunkPtr`, reading the bytes held by `sinkPtr`.
//
// The sink contents are wrapped in an in-memory read file, a seekable stream
// is positioned at the column chunk's data page offset, and a PageReader
// decodes the header found there.
facebook::velox::parquet::thrift::PageType::type getDataPageVersion(
    const dwio::common::MemorySink* sinkPtr,
    const facebook::velox::parquet::ColumnChunkMetaDataPtr& colChunkPtr) {
  // View over the bytes written to the sink; no copy is made here.
  const std::string_view writtenBytes(sinkPtr->data(), sinkPtr->size());

  auto fileStream = std::make_shared<ReadFileInputStream>(
      std::make_shared<InMemoryReadFile>(writtenBytes));

  // NOTE(review): 150 is presumably the number of bytes exposed to the
  // stream, assumed to be enough to cover the page header - confirm.
  auto headerStream = std::make_unique<SeekableFileInputStream>(
      std::move(fileStream),
      colChunkPtr.dataPageOffset(),
      150,
      *leafPool_,
      LogType::TEST);

  auto headerReader = std::make_unique<PageReader>(
      std::move(headerStream),
      *leafPool_,
      colChunkPtr.compression(),
      colChunkPtr.totalCompressedSize());

  const auto pageHeader = headerReader->readPageHeader();
  return pageHeader.type;
}

inline static const std::string kHiveConnectorId = "test-hive";
};

Expand Down Expand Up @@ -146,6 +168,50 @@ TEST_F(ParquetWriterTest, compression) {
assertReadWithReaderAndExpected(schema, *rowReader, data, *leafPool_);
};

// Verifies that WriterOptions::parquetDataPageVersion controls the page type
// written to the file: V1 must yield DATA_PAGE and V2 must yield DATA_PAGE_V2.
TEST_F(ParquetWriterTest, datapageVersion) {
  using DataPageVersion =
      facebook::velox::parquet::WriterOptions::DataPageVersion;

  auto schema = ROW({"c0"}, {INTEGER()});
  const int64_t kRows = 1;
  const auto data = makeRowVector({
      makeFlatVector<int32_t>(kRows, [](auto row) { return 987; }),
  });

  // Writes `data` to an in-memory sink using the requested data page version
  // and returns the page type read back from the first column chunk of the
  // first row group.
  const auto writeAndReadPageType = [&](DataPageVersion requestedVersion) {
    // The sink is owned by the writer; keep a raw pointer so its contents can
    // still be inspected after the write.
    auto memorySink = std::make_unique<MemorySink>(
        200 * 1024 * 1024,
        dwio::common::FileSink::Options{.pool = leafPool_.get()});
    auto rawSink = memorySink.get();

    facebook::velox::parquet::WriterOptions options;
    options.memoryPool = leafPool_.get();
    options.parquetDataPageVersion = requestedVersion;

    auto parquetWriter = std::make_unique<facebook::velox::parquet::Writer>(
        std::move(memorySink), options, rootPool_, schema);
    parquetWriter->write(data);
    parquetWriter->close();

    dwio::common::ReaderOptions readerOptions{leafPool_.get()};
    auto reader = createReaderInMemory(*rawSink, readerOptions);
    return getDataPageVersion(
        rawSink, reader->fileMetaData().rowGroup(0).columnChunk(0));
  };

  const auto v1PageType = writeAndReadPageType(DataPageVersion::V1);
  ASSERT_EQ(v1PageType, thrift::PageType::type::DATA_PAGE);

  const auto v2PageType = writeAndReadPageType(DataPageVersion::V2);
  ASSERT_EQ(v2PageType, thrift::PageType::type::DATA_PAGE_V2);
}

DEBUG_ONLY_TEST_F(ParquetWriterTest, unitFromWriterOptions) {
SCOPED_TESTVALUE_SET(
"facebook::velox::parquet::Writer::write",
Expand Down
8 changes: 8 additions & 0 deletions velox/dwio/parquet/writer/Writer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -147,6 +147,14 @@ std::shared_ptr<WriterProperties> getArrowParquetWriterOptions(
static_cast<int64_t>(flushPolicy->rowsInRowGroup()));
properties = properties->codec_options(options.codecOptions);
properties = properties->enable_store_decimal_as_integer();

// Translate the Velox data page version option into the Arrow writer
// property. Only an explicit V2 selects Arrow's V2 data pages; every other
// value (V1 as well as kDefault) falls into the else branch and selects V1.
if (options.parquetDataPageVersion == WriterOptions::DataPageVersion::V2) {
properties = properties->data_page_version(
facebook::velox::parquet::arrow::ParquetDataPageVersion::V2);
} else {
properties = properties->data_page_version(
facebook::velox::parquet::arrow::ParquetDataPageVersion::V1);
}
return properties->build();
}

Expand Down
11 changes: 11 additions & 0 deletions velox/dwio/parquet/writer/Writer.h
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
#include "velox/dwio/common/Options.h"
#include "velox/dwio/common/Writer.h"
#include "velox/dwio/common/WriterFactory.h"
#include "velox/dwio/parquet/writer/arrow/Properties.h"
#include "velox/dwio/parquet/writer/arrow/Types.h"
#include "velox/dwio/parquet/writer/arrow/util/Compression.h"
#include "velox/vector/ComplexVector.h"
Expand Down Expand Up @@ -109,6 +110,13 @@ struct WriterOptions : public dwio::common::WriterOptions {
std::optional<std::string> parquetWriteTimestampTimeZone;
bool writeInt96AsTimestamp = false;

/// Parquet data page version the writer should produce.
enum class DataPageVersion {
/// No version was requested explicitly.
kDefault,
/// Version 1 data pages.
V1,
/// Version 2 data pages.
V2,
};
/// Data page version requested for this writer; kDefault unless overridden.
DataPageVersion parquetDataPageVersion = DataPageVersion::kDefault;

// Parsing session and hive configs.

// This isn't a typo; session and hive connector config names are different
Expand All @@ -117,6 +125,9 @@ struct WriterOptions : public dwio::common::WriterOptions {
"hive.parquet.writer.timestamp_unit";
static constexpr const char* kParquetHiveConnectorWriteTimestampUnit =
"hive.parquet.writer.timestamp-unit";
/// Config key used to select the Parquet data page version. The values
/// recognized by getParquetDataPageVersion() are "PARQUET_1_0" (V1) and
/// "PARQUET_2_0" (V2); any other value is rejected. When the key is not set
/// the writer falls back to V1.
static constexpr const char* kParquetSessionDataPageVersion =
"parquet_writer_version";
};

// Writes Velox vectors into a DataSink using Arrow Parquet writer.
Expand Down

0 comments on commit cd990a4

Please sign in to comment.