Mark column chunks in a PQ reader pass as large strings when the cumulative offsets exceed the large strings threshold. #17207

Merged
31 changes: 25 additions & 6 deletions cpp/src/io/parquet/reader_impl.cpp
@@ -97,22 +97,37 @@ void reader::impl::decode_page_data(read_mode mode, size_t skip_rows, size_t num
       _stream);
   }
 
   // Compute column string sizes (using page string offsets) for this subpass
   col_string_sizes = calculate_page_string_offsets();
 
-  // check for overflow
+  // ensure cumulative column string sizes have been initialized
+  if (pass.cumulative_col_string_sizes.empty()) {
+    pass.cumulative_col_string_sizes.resize(_input_columns.size(), 0);
+  }
+
+  // Add to the cumulative column string sizes of this pass
+  std::transform(pass.cumulative_col_string_sizes.begin(),
+                 pass.cumulative_col_string_sizes.end(),
+                 col_string_sizes.begin(),
+                 pass.cumulative_col_string_sizes.begin(),
+                 std::plus<>{});
+
+  // Check for overflow in cumulative column string sizes of this pass so that the page string
+  // offsets of overflowing (large) string columns are treated as 64-bit.
   auto const threshold = static_cast<size_t>(strings::detail::get_offset64_threshold());
-  auto const has_large_strings = std::any_of(col_string_sizes.cbegin(),
-                                             col_string_sizes.cend(),
+  auto const has_large_strings = std::any_of(pass.cumulative_col_string_sizes.cbegin(),
+                                             pass.cumulative_col_string_sizes.cend(),
                                              [=](std::size_t sz) { return sz > threshold; });
   if (has_large_strings and not strings::detail::is_large_strings_enabled()) {
     CUDF_FAIL("String column exceeds the column size limit", std::overflow_error);
   }
 
-  // mark any chunks that are large string columns
+  // Mark any chunks for which the cumulative column string size has exceeded the
+  // large strings threshold
   if (has_large_strings) {
     for (auto& chunk : pass.chunks) {
       auto const idx = chunk.src_col_index;
-      if (col_string_sizes[idx] > threshold) { chunk.is_large_string_col = true; }
+      if (pass.cumulative_col_string_sizes[idx] > threshold) { chunk.is_large_string_col = true; }
     }
   }
 }
@@ -195,7 +210,11 @@ void reader::impl::decode_page_data(read_mode mode, size_t skip_rows, size_t num
         // only do string buffer for leaf
         if (idx == max_depth - 1 and out_buf.string_size() == 0 and
             col_string_sizes[pass.chunks[c].src_col_index] > 0) {
-          out_buf.create_string_data(col_string_sizes[pass.chunks[c].src_col_index], _stream);
+          out_buf.create_string_data(
+            col_string_sizes[pass.chunks[c].src_col_index],
+            pass.cumulative_col_string_sizes[pass.chunks[c].src_col_index] >
+              static_cast<size_t>(strings::detail::get_offset64_threshold()),
+            _stream);
         }
         if (has_strings) { str_data[idx] = out_buf.string_data(); }
         out_buf.user_data |=
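Taken together, these hunks move the overflow check from per-subpass sizes to pass-level running totals. Below is a minimal, self-contained sketch of that accumulation pattern in plain standard C++; `kThreshold` is a hypothetical stand-in for `strings::detail::get_offset64_threshold()`, and the container names only echo the PR, so treat it as an illustration rather than the reader's actual code:

#include <algorithm>
#include <cstddef>
#include <functional>
#include <iostream>
#include <vector>

int main()
{
  // Hypothetical stand-in for strings::detail::get_offset64_threshold() (~2GB).
  constexpr std::size_t kThreshold = std::size_t{1} << 31;

  // Pass-level running totals, one entry per input column
  // (cf. pass.cumulative_col_string_sizes in the diff above).
  std::vector<std::size_t> cumulative(2, 0);

  // Per-subpass string sizes for two columns across two subpasses.
  std::vector<std::vector<std::size_t>> subpass_sizes = {
    {std::size_t{1} << 30, 100},  // subpass 0: col0 adds 1GB
    {std::size_t{3} << 29, 200},  // subpass 1: col0 adds 1.5GB more
  };

  for (auto const& sizes : subpass_sizes) {
    // Fold this subpass's sizes into the running totals (same std::transform + std::plus fold).
    std::transform(
      cumulative.begin(), cumulative.end(), sizes.begin(), cumulative.begin(), std::plus<>{});

    // Flag the pass as containing large strings once any running total crosses the threshold.
    bool const has_large = std::any_of(cumulative.begin(), cumulative.end(), [=](std::size_t sz) {
      return sz > kThreshold;
    });
    std::cout << std::boolalpha << "has_large_strings = " << has_large << '\n';
  }
  // Prints: false, then true. col0's cumulative 2.5GB crosses the threshold in
  // subpass 1 even though neither subpass exceeds it on its own.
}

That last point is the case the old per-subpass check missed: a column can stay under the threshold in every individual subpass while its cumulative size for the pass overflows 32-bit offsets.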
3 changes: 3 additions & 0 deletions cpp/src/io/parquet/reader_impl_chunking.hpp
@@ -130,6 +130,9 @@ struct pass_intermediate_data {
   rmm::device_buffer decomp_dict_data{0, cudf::get_default_stream()};
   rmm::device_uvector<string_index_pair> str_dict_index{0, cudf::get_default_stream()};
 
+  // Cumulative string column sizes.
+  std::vector<size_t> cumulative_col_string_sizes{};
+
   int level_type_size{0};
 
   // skip_rows / num_rows for this pass.
4 changes: 3 additions & 1 deletion cpp/src/io/utilities/column_buffer.cpp
@@ -63,9 +63,11 @@ void cudf::io::detail::inline_column_buffer::allocate_strings_data(bool memset_d
 }
 
 void cudf::io::detail::inline_column_buffer::create_string_data(size_t num_bytes,
+                                                                bool is_large_strings_col,
                                                                 rmm::cuda_stream_view stream)
 {
-  _string_data = rmm::device_buffer(num_bytes, stream, _mr);
+  _is_large_strings_col = is_large_strings_col;
+  _string_data = rmm::device_buffer(num_bytes, stream, _mr);
 }
 
 namespace {
6 changes: 5 additions & 1 deletion cpp/src/io/utilities/column_buffer.hpp
@@ -246,13 +246,17 @@ class inline_column_buffer : public column_buffer_base<inline_column_buffer> {
   [[nodiscard]] size_t data_size_impl() const { return _data.size(); }
   std::unique_ptr<column> make_string_column_impl(rmm::cuda_stream_view stream);
 
-  void create_string_data(size_t num_bytes, rmm::cuda_stream_view stream);
+  void create_string_data(size_t num_bytes,
+                          bool is_large_strings_col,
+                          rmm::cuda_stream_view stream);
   void* string_data() { return _string_data.data(); }
   [[nodiscard]] void const* string_data() const { return _string_data.data(); }
   [[nodiscard]] size_t string_size() const { return _string_data.size(); }
+  [[nodiscard]] bool is_large_strings_column() const { return _is_large_strings_col; }
 
  private:
   rmm::device_buffer _string_data{};
+  bool _is_large_strings_col{};
 };
 
 using column_buffer = gather_column_buffer;
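For intuition, here is a hedged, dependency-free sketch of the new contract; `string_buffer` is a hypothetical stand-in for `inline_column_buffer` (and drops the stream argument), so this only mirrors the shape of the change, not the real implementation:

#include <cstddef>
#include <vector>

// Hypothetical stand-in for inline_column_buffer (sketch only).
class string_buffer {
 public:
  // Mirrors the new signature: the caller passes in the large-strings decision,
  // computed from the column's cumulative size for the pass.
  void create_string_data(std::size_t num_bytes, bool is_large_strings_col)
  {
    _is_large_strings_col = is_large_strings_col;
    _data.resize(num_bytes);  // the real code allocates an rmm::device_buffer
  }

  [[nodiscard]] bool is_large_strings_column() const { return _is_large_strings_col; }

 private:
  std::vector<char> _data;
  bool _is_large_strings_col{false};
};

int main()
{
  constexpr std::size_t kThreshold = std::size_t{1} << 31;  // stand-in threshold

  std::size_t const subpass_bytes    = 512;                   // tiny buffer this subpass
  std::size_t const cumulative_bytes = std::size_t{5} << 30;  // 5GB seen so far in the pass

  string_buffer buf;
  buf.create_string_data(subpass_bytes, cumulative_bytes > kThreshold);

  // Returns 0: the buffer itself is small, but the column it belongs to is large.
  return buf.is_large_strings_column() ? 0 : 1;
}

The design point is that a later subpass may allocate a small buffer for a column that already crossed the 2GB limit earlier in the pass; deriving the decision from `_string_data.size()` alone would miss that.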
3 changes: 1 addition & 2 deletions cpp/src/io/utilities/column_buffer_strings.cu
@@ -27,8 +27,7 @@ std::unique_ptr<column> cudf::io::detail::inline_column_buffer::make_string_colu
 {
   // if the size of _string_data is over the threshold for 64bit size_type, _data will contain
   // sizes rather than offsets. need special handling for that case.
-  auto const threshold = static_cast<size_t>(strings::detail::get_offset64_threshold());
-  if (_string_data.size() > threshold) {
+  if (is_large_strings_column()) {
     if (not strings::detail::is_large_strings_enabled()) {
       CUDF_FAIL("String column exceeds the column size limit", std::overflow_error);
     }
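The "sizes rather than offsets" remark is the crux: when a column is marked large, the offsets must be materialized as INT64 because the scanned totals, not the individual sizes, overflow. A host-side illustration in plain C++ (the real conversion happens on-device):

#include <cstdint>
#include <numeric>
#include <vector>

int main()
{
  // Per-row string sizes; each one fits comfortably in 32 bits.
  std::vector<std::int64_t> sizes = {1'500'000'000, 800'000'000, 42};

  // An offsets column has one more entry than there are rows: the scan of the sizes.
  std::vector<std::int64_t> offsets(sizes.size() + 1, 0);
  std::inclusive_scan(sizes.begin(), sizes.end(), offsets.begin() + 1);

  // offsets == {0, 1500000000, 2300000000, 2300000042}; the third entry already
  // exceeds INT32_MAX (2147483647), hence the offsets column must be INT64.
}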
74 changes: 74 additions & 0 deletions cpp/tests/large_strings/parquet_tests.cpp
@@ -18,6 +18,7 @@
 
 #include <cudf_test/table_utilities.hpp>
 
+#include <cudf/concatenate.hpp>
 #include <cudf/io/parquet.hpp>
 #include <cudf/io/types.hpp>
 #include <cudf/table/table_view.hpp>
@@ -69,3 +70,76 @@ TEST_F(ParquetStringsTest, ReadLargeStrings)
   // go back to normal threshold
   unsetenv("LIBCUDF_LARGE_STRINGS_THRESHOLD");
 }

+// Disabled as the test is too brittle and depends on empirically set `pass_read_limit`,
+// encoding type, and the currently used `ZSTD` scratch space size.
+TEST_F(ParquetStringsTest, DISABLED_ChunkedReadLargeStrings)
+{
+  // Construct a table with one large strings column > 2GB
+  auto const wide = this->wide_column();
+  auto input = cudf::concatenate(std::vector<cudf::column_view>(120000, wide));  ///< 230MB
+
+  int constexpr multiplier = 12;
+  std::vector<cudf::column_view> input_cols(multiplier, input->view());
+  auto col0 = cudf::concatenate(input_cols);  ///< 2.70GB
> Review comment from @mhaseeb123 (Member, Oct 31, 2024): Same column from GTest: CaseTest.ToLower
+
+  // Expected table
+  auto const expected = cudf::table_view{{col0->view()}};
+  auto expected_metadata = cudf::io::table_input_metadata{expected};
+
+  // Needed to get exactly 2 Parquet subpasses: first with large strings and the second with
+  // regular ones. This may change in the future and lead to false failures.
+  expected_metadata.column_metadata[0].set_encoding(
+    cudf::io::column_encoding::DELTA_LENGTH_BYTE_ARRAY);
+
+  // Host buffer to write Parquet
+  std::vector<char> buffer;
+
+  // Writer options
+  cudf::io::parquet_writer_options out_opts =
+    cudf::io::parquet_writer_options::builder(cudf::io::sink_info{&buffer}, expected)
+      .metadata(expected_metadata);
+
+  // Needed to get exactly 2 Parquet subpasses: first with large strings and the second with
+  // regular ones. This may change in the future and lead to false failures.
+  out_opts.set_compression(cudf::io::compression_type::ZSTD);
+
+  // Write to Parquet
+  cudf::io::write_parquet(out_opts);
+
+  // Empirically set pass_read_limit of 8GB so we read almost the entire table (>2GB strings) in
+  // the first subpass and only a small amount in the second subpass. This may change in the
+  // future and lead to false failures.
+  size_t constexpr pass_read_limit = size_t{8} * 1024 * 1024 * 1024;
+
+  // Reader options
+  cudf::io::parquet_reader_options default_in_opts =
+    cudf::io::parquet_reader_options::builder(cudf::io::source_info(buffer.data(), buffer.size()));
+
+  // Chunked parquet reader
+  auto reader = cudf::io::chunked_parquet_reader(0, pass_read_limit, default_in_opts);
+
+  // Read chunked
+  auto tables = std::vector<std::unique_ptr<cudf::table>>{};
+  while (reader.has_next()) {
+    tables.emplace_back(reader.read_chunk().tbl);
+  }
+  auto table_views = std::vector<cudf::table_view>{};
+  std::transform(tables.begin(), tables.end(), std::back_inserter(table_views), [](auto& tbl) {
+    return tbl->view();
+  });
+  auto result = cudf::concatenate(table_views);
+  auto const result_view = result->view();
+
+  // Verify offsets
+  for (auto const& cv : result_view) {
+    auto const offsets = cudf::strings_column_view(cv).offsets();
+    EXPECT_EQ(offsets.type(), cudf::data_type{cudf::type_id::INT64});
+  }
+
+  // Verify tables to be equal
+  CUDF_TEST_EXPECT_TABLES_EQUAL(result_view, expected);
+
+  // Verify that we read exactly two table chunks
+  EXPECT_EQ(tables.size(), 2);
+}
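As context for the `unsetenv` call retained from `ReadLargeStrings` above: these tests can lower the threshold through the `LIBCUDF_LARGE_STRINGS_THRESHOLD` environment variable so that modest test data exercises the large-strings path. A sketch of that pattern, with an illustrative value and a hypothetical helper name:

#include <cstdlib>

// Hypothetical helper: shrink the 64-bit-offsets threshold so small test data
// is treated as "large strings", then restore the default afterwards.
void with_small_threshold(void (*test_body)())
{
  setenv("LIBCUDF_LARGE_STRINGS_THRESHOLD", "1000", 1);  // illustrative value
  test_body();
  unsetenv("LIBCUDF_LARGE_STRINGS_THRESHOLD");  // go back to normal threshold
}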