Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add support for parquet_writer_version session property #11151

Open
wants to merge 1 commit into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
66 changes: 66 additions & 0 deletions velox/dwio/parquet/tests/writer/ParquetWriterTest.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -22,8 +22,10 @@
#include "velox/connectors/hive/HiveConnector.h" // @manual
#include "velox/core/QueryCtx.h"
#include "velox/dwio/parquet/RegisterParquetWriter.h" // @manual
#include "velox/dwio/parquet/reader/PageReader.h"
#include "velox/dwio/parquet/tests/ParquetTestBase.h"
#include "velox/exec/Cursor.h"
#include "velox/dwio/parquet/writer/arrow/Properties.h"
#include "velox/exec/tests/utils/AssertQueryBuilder.h"
#include "velox/exec/tests/utils/PlanBuilder.h"
#include "velox/exec/tests/utils/QueryAssertions.h"
Expand Down Expand Up @@ -77,6 +79,26 @@ class ParquetWriterTest : public ParquetTestBase {
opts);
};

/// Returns the page type recorded in the page header found at the data page
/// offset of the given column chunk, reading the bytes held by 'sinkPtr'.
///
/// @param sinkPtr In-memory sink holding the written Parquet file bytes.
/// @param colChunkPtr Metadata of the column chunk whose page header is read.
/// @return The 'type' field of the page header that was read.
facebook::velox::parquet::thrift::PageType::type getDataPageVersion(
    const dwio::common::MemorySink* sinkPtr,
    const facebook::velox::parquet::ColumnChunkMetaDataPtr& colChunkPtr) {
  // Non-owning view over the sink's buffer.
  const std::string_view writtenBytes(sinkPtr->data(), sinkPtr->size());
  auto fileStream = std::make_shared<ReadFileInputStream>(
      std::make_shared<InMemoryReadFile>(writtenBytes));

  // Position the stream at the column chunk's data page offset.
  // NOTE(review): the literal 150 is presumably the number of bytes made
  // readable from that offset - confirm against SeekableFileInputStream.
  auto pageStream = std::make_unique<SeekableFileInputStream>(
      std::move(fileStream),
      colChunkPtr.dataPageOffset(),
      150,
      *leafPool_,
      LogType::TEST);

  PageReader headerReader(
      std::move(pageStream),
      *leafPool_,
      colChunkPtr.compression(),
      colChunkPtr.totalCompressedSize());
  return headerReader.readPageHeader().type;
}

inline static const std::string kHiveConnectorId = "test-hive";
};

Expand Down Expand Up @@ -146,6 +168,50 @@ TEST_F(ParquetWriterTest, compression) {
assertReadWithReaderAndExpected(schema, *rowReader, data, *leafPool_);
};

// Verifies that the data page version requested through WriterOptions is the
// one found in the page header of the written file.
TEST_F(ParquetWriterTest, datapageVersion) {
  using facebook::velox::parquet::arrow::ParquetDataPageVersion;

  const auto rowType = ROW({"c0"}, {INTEGER()});
  constexpr int64_t kNumRows = 1;
  const auto input = makeRowVector({
      makeFlatVector<int32_t>(kNumRows, [](auto /*row*/) { return 987; }),
  });

  // Writes 'input' to an in-memory sink using the requested data page version
  // and returns the page type read back from the first column chunk of the
  // first row group.
  const auto writtenPageType = [&](ParquetDataPageVersion requestedVersion) {
    auto memorySink = std::make_unique<MemorySink>(
        200 * 1024 * 1024,
        dwio::common::FileSink::Options{.pool = leafPool_.get()});
    // Keep a raw handle: ownership of the sink moves into the writer below.
    auto* rawSink = memorySink.get();

    facebook::velox::parquet::WriterOptions options;
    options.memoryPool = leafPool_.get();
    options.parquetDataPageVersion = requestedVersion;

    auto parquetWriter = std::make_unique<facebook::velox::parquet::Writer>(
        std::move(memorySink), options, rootPool_, rowType);
    parquetWriter->write(input);
    parquetWriter->close();

    dwio::common::ReaderOptions readOptions{leafPool_.get()};
    auto fileReader = createReaderInMemory(*rawSink, readOptions);
    return getDataPageVersion(
        rawSink, fileReader->fileMetaData().rowGroup(0).columnChunk(0));
  };

  ASSERT_EQ(
      writtenPageType(ParquetDataPageVersion::V1),
      thrift::PageType::type::DATA_PAGE);

  ASSERT_EQ(
      writtenPageType(ParquetDataPageVersion::V2),
      thrift::PageType::type::DATA_PAGE_V2);
}

DEBUG_ONLY_TEST_F(ParquetWriterTest, unitFromWriterOptions) {
SCOPED_TESTVALUE_SET(
"facebook::velox::parquet::Writer::write",
Expand Down
24 changes: 24 additions & 0 deletions velox/dwio/parquet/writer/Writer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -147,6 +147,8 @@ std::shared_ptr<WriterProperties> getArrowParquetWriterOptions(
static_cast<int64_t>(flushPolicy->rowsInRowGroup()));
properties = properties->codec_options(options.codecOptions);
properties = properties->enable_store_decimal_as_integer();
properties = properties->data_page_version(options.parquetDataPageVersion.value());

return properties->build();
}

Expand Down Expand Up @@ -238,6 +240,21 @@ std::optional<std::string> getTimestampTimeZone(
return std::nullopt;
}

/// Looks up 'configKey' in 'config' and maps its string value to a Parquet
/// data page version.
///
/// @param config Configuration to read from.
/// @param configKey Name of the property holding the writer version.
/// @return V1 for "PARQUET_1_0", V2 for "PARQUET_2_0", std::nullopt when the
/// key is not set.
/// @throws Fails via VELOX_FAIL when the key is set to any other value.
std::optional<arrow::ParquetDataPageVersion> getParquetDataPageVersion(
    const config::ConfigBase& config,
    const char* configKey) {
  const auto configured = config.get<std::string>(configKey);
  if (!configured.has_value()) {
    return std::nullopt;
  }

  const auto& versionName = configured.value();
  if (versionName == "PARQUET_1_0") {
    return arrow::ParquetDataPageVersion::V1;
  }
  if (versionName == "PARQUET_2_0") {
    return arrow::ParquetDataPageVersion::V2;
  }
  VELOX_FAIL("Unsupported parquet datapage version {}", versionName);
}

} // namespace

Writer::Writer(
Expand Down Expand Up @@ -470,6 +487,13 @@ void WriterOptions::processConfigs(
: getTimestampTimeZone(
connectorConfig, core::QueryConfig::kSessionTimezone);
}

if (!parquetDataPageVersion) {
  // An explicitly set option wins. Otherwise the session value takes
  // precedence over the connector config, and each config is looked up and
  // parsed at most once.
  parquetDataPageVersion =
      getParquetDataPageVersion(session, kParquetSessionDataPageVersion);
  if (!parquetDataPageVersion) {
    // NOTE(review): the connector config is queried under the session
    // property name - confirm whether a dedicated connector key is intended.
    parquetDataPageVersion = getParquetDataPageVersion(
        connectorConfig, kParquetSessionDataPageVersion);
  }
}
}

} // namespace facebook::velox::parquet
4 changes: 4 additions & 0 deletions velox/dwio/parquet/writer/Writer.h
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
#include "velox/dwio/common/Options.h"
#include "velox/dwio/common/Writer.h"
#include "velox/dwio/common/WriterFactory.h"
#include "velox/dwio/parquet/writer/arrow/Properties.h"
#include "velox/dwio/parquet/writer/arrow/Types.h"
#include "velox/dwio/parquet/writer/arrow/util/Compression.h"
#include "velox/vector/ComplexVector.h"
Expand Down Expand Up @@ -108,6 +109,7 @@ struct WriterOptions : public dwio::common::WriterOptions {
/// Timestamp time zone for Parquet write through Arrow bridge.
std::optional<std::string> parquetWriteTimestampTimeZone;
bool writeInt96AsTimestamp = false;
std::optional<arrow::ParquetDataPageVersion> parquetDataPageVersion;

// Parsing session and hive configs.

Expand All @@ -117,6 +119,8 @@ struct WriterOptions : public dwio::common::WriterOptions {
"hive.parquet.writer.timestamp_unit";
static constexpr const char* kParquetHiveConnectorWriteTimestampUnit =
"hive.parquet.writer.timestamp-unit";
static constexpr const char* kParquetSessionDataPageVersion =
"parquet_writer_version";

// Process hive connector and session configs.
void processConfigs(
Expand Down
Loading