From f15984444418ada7c059f6b3c8446d9d9530966d Mon Sep 17 00:00:00 2001 From: Muhammad Haseeb <14217455+mhaseeb123@users.noreply.github.com> Date: Wed, 30 Oct 2024 00:53:12 +0000 Subject: [PATCH 01/15] Fix for invalid large strings columns in smaller chunks of chunked parquet reader --- cpp/src/io/parquet/reader_impl.cpp | 31 ++++++++++++++++--- cpp/src/io/parquet/reader_impl_chunking.hpp | 3 ++ cpp/src/io/utilities/column_buffer.cpp | 4 ++- cpp/src/io/utilities/column_buffer.hpp | 6 +++- cpp/src/io/utilities/column_buffer_strings.cu | 3 +- 5 files changed, 38 insertions(+), 9 deletions(-) diff --git a/cpp/src/io/parquet/reader_impl.cpp b/cpp/src/io/parquet/reader_impl.cpp index fed1a309064..06b4da9c069 100644 --- a/cpp/src/io/parquet/reader_impl.cpp +++ b/cpp/src/io/parquet/reader_impl.cpp @@ -108,11 +108,25 @@ void reader::impl::decode_page_data(read_mode mode, size_t skip_rows, size_t num CUDF_FAIL("String column exceeds the column size limit", std::overflow_error); } - // mark any chunks that are large string columns + // Mark any chunks that are large string columns if (has_large_strings) { + if (pass.large_strings_cols.empty()) { + pass.large_strings_cols.resize(_input_columns.size()); + } + for (auto& chunk : pass.chunks) { + auto const idx = chunk.src_col_index; + if (col_string_sizes[idx] > threshold) { + chunk.is_large_string_col = true; + pass.large_strings_cols[idx] = true; + } + } + } + // Mark any chunks belonging to large strings columns for a previous table chunk + // in the same pass + else if (pass.large_strings_cols.size()) { for (auto& chunk : pass.chunks) { auto const idx = chunk.src_col_index; - if (col_string_sizes[idx] > threshold) { chunk.is_large_string_col = true; } + if (pass.large_strings_cols[idx]) { chunk.is_large_string_col = true; } } } } @@ -192,10 +206,17 @@ void reader::impl::decode_page_data(read_mode mode, size_t skip_rows, size_t num if (owning_schema == 0 || owning_schema == input_col.schema_idx) { valids[idx] = 
out_buf.null_mask(); data[idx] = out_buf.data(); + // String size of the current column + auto const col_string_size = col_string_sizes[pass.chunks[c].src_col_index]; // only do string buffer for leaf - if (idx == max_depth - 1 and out_buf.string_size() == 0 and - col_string_sizes[pass.chunks[c].src_col_index] > 0) { - out_buf.create_string_data(col_string_sizes[pass.chunks[c].src_col_index], _stream); + if (idx == max_depth - 1 and out_buf.string_size() == 0 and col_string_size > 0) { + out_buf.create_string_data( + col_string_size, + pass.large_strings_cols.empty() + ? false + : pass.large_strings_cols[pass.chunks[c].src_col_index] or + col_string_size > static_cast(strings::detail::get_offset64_threshold()), + _stream); } if (has_strings) { str_data[idx] = out_buf.string_data(); } out_buf.user_data |= diff --git a/cpp/src/io/parquet/reader_impl_chunking.hpp b/cpp/src/io/parquet/reader_impl_chunking.hpp index a0c2dbd3e44..0c75cf6b89e 100644 --- a/cpp/src/io/parquet/reader_impl_chunking.hpp +++ b/cpp/src/io/parquet/reader_impl_chunking.hpp @@ -130,6 +130,9 @@ struct pass_intermediate_data { rmm::device_buffer decomp_dict_data{0, cudf::get_default_stream()}; rmm::device_uvector str_dict_index{0, cudf::get_default_stream()}; + // record of any large strings columns seen in a sub-pass. + std::vector large_strings_cols{}; + int level_type_size{0}; // skip_rows / num_rows for this pass. 
diff --git a/cpp/src/io/utilities/column_buffer.cpp b/cpp/src/io/utilities/column_buffer.cpp index 6d954753af8..41ed55cd090 100644 --- a/cpp/src/io/utilities/column_buffer.cpp +++ b/cpp/src/io/utilities/column_buffer.cpp @@ -63,9 +63,11 @@ void cudf::io::detail::inline_column_buffer::allocate_strings_data(bool memset_d } void cudf::io::detail::inline_column_buffer::create_string_data(size_t num_bytes, + bool is_large_strings_col, rmm::cuda_stream_view stream) { - _string_data = rmm::device_buffer(num_bytes, stream, _mr); + _is_large_strings_col = is_large_strings_col; + _string_data = rmm::device_buffer(num_bytes, stream, _mr); } namespace { diff --git a/cpp/src/io/utilities/column_buffer.hpp b/cpp/src/io/utilities/column_buffer.hpp index 31c8b781e77..da19539f509 100644 --- a/cpp/src/io/utilities/column_buffer.hpp +++ b/cpp/src/io/utilities/column_buffer.hpp @@ -246,13 +246,17 @@ class inline_column_buffer : public column_buffer_base { [[nodiscard]] size_t data_size_impl() const { return _data.size(); } std::unique_ptr make_string_column_impl(rmm::cuda_stream_view stream); - void create_string_data(size_t num_bytes, rmm::cuda_stream_view stream); + void create_string_data(size_t num_bytes, + bool is_large_strings_col, + rmm::cuda_stream_view stream); void* string_data() { return _string_data.data(); } [[nodiscard]] void const* string_data() const { return _string_data.data(); } [[nodiscard]] size_t string_size() const { return _string_data.size(); } + [[nodiscard]] bool is_large_strings_column() const { return _is_large_strings_col; } private: rmm::device_buffer _string_data{}; + bool _is_large_strings_col{}; }; using column_buffer = gather_column_buffer; diff --git a/cpp/src/io/utilities/column_buffer_strings.cu b/cpp/src/io/utilities/column_buffer_strings.cu index 4bc303a34a5..66d0a644c12 100644 --- a/cpp/src/io/utilities/column_buffer_strings.cu +++ b/cpp/src/io/utilities/column_buffer_strings.cu @@ -27,8 +27,7 @@ std::unique_ptr 
cudf::io::detail::inline_column_buffer::make_string_colu { // if the size of _string_data is over the threshold for 64bit size_type, _data will contain // sizes rather than offsets. need special handling for that case. - auto const threshold = static_cast(strings::detail::get_offset64_threshold()); - if (_string_data.size() > threshold) { + if (is_large_strings_column()) { if (not strings::detail::is_large_strings_enabled()) { CUDF_FAIL("String column exceeds the column size limit", std::overflow_error); } From 82ffa0069f841d13eb29a8fe2b9e1303add6867d Mon Sep 17 00:00:00 2001 From: Muhammad Haseeb <14217455+mhaseeb123@users.noreply.github.com> Date: Wed, 30 Oct 2024 01:13:54 +0000 Subject: [PATCH 02/15] Minor updates --- cpp/src/io/parquet/reader_impl.cpp | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/cpp/src/io/parquet/reader_impl.cpp b/cpp/src/io/parquet/reader_impl.cpp index 06b4da9c069..8eebd2ace7a 100644 --- a/cpp/src/io/parquet/reader_impl.cpp +++ b/cpp/src/io/parquet/reader_impl.cpp @@ -97,6 +97,7 @@ void reader::impl::decode_page_data(read_mode mode, size_t skip_rows, size_t num _stream); } + // column string sizes for this subpass col_string_sizes = calculate_page_string_offsets(); // check for overflow @@ -108,22 +109,21 @@ void reader::impl::decode_page_data(read_mode mode, size_t skip_rows, size_t num CUDF_FAIL("String column exceeds the column size limit", std::overflow_error); } - // Mark any chunks that are large string columns + // Mark any chunks that are large string columns in this subpass if (has_large_strings) { if (pass.large_strings_cols.empty()) { pass.large_strings_cols.resize(_input_columns.size()); } for (auto& chunk : pass.chunks) { auto const idx = chunk.src_col_index; - if (col_string_sizes[idx] > threshold) { + if (col_string_sizes[idx] > threshold or pass.large_strings_cols[idx]) { chunk.is_large_string_col = true; pass.large_strings_cols[idx] = true; } } } - // Mark any chunks belonging to large strings 
columns for a previous table chunk - // in the same pass - else if (pass.large_strings_cols.size()) { + // Mark any chunks previously marked as large strings columns + else if (not pass.large_strings_cols.empty()) { for (auto& chunk : pass.chunks) { auto const idx = chunk.src_col_index; if (pass.large_strings_cols[idx]) { chunk.is_large_string_col = true; } From 033cc14dbdc6ecf92fe709d52ae42d9c24f24c2a Mon Sep 17 00:00:00 2001 From: Muhammad Haseeb <14217455+mhaseeb123@users.noreply.github.com> Date: Wed, 30 Oct 2024 01:35:57 +0000 Subject: [PATCH 03/15] Minor improvements --- cpp/src/io/parquet/reader_impl.cpp | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/cpp/src/io/parquet/reader_impl.cpp b/cpp/src/io/parquet/reader_impl.cpp index 8eebd2ace7a..b0cba34fdd1 100644 --- a/cpp/src/io/parquet/reader_impl.cpp +++ b/cpp/src/io/parquet/reader_impl.cpp @@ -109,10 +109,10 @@ void reader::impl::decode_page_data(read_mode mode, size_t skip_rows, size_t num CUDF_FAIL("String column exceeds the column size limit", std::overflow_error); } - // Mark any chunks that are large string columns in this subpass + // Mark any chunks that are large string columns in this or previous subpasses if (has_large_strings) { if (pass.large_strings_cols.empty()) { - pass.large_strings_cols.resize(_input_columns.size()); + pass.large_strings_cols.resize(_input_columns.size(), false); } for (auto& chunk : pass.chunks) { auto const idx = chunk.src_col_index; @@ -122,7 +122,7 @@ void reader::impl::decode_page_data(read_mode mode, size_t skip_rows, size_t num } } } - // Mark any chunks previously marked as large strings columns + // Mark all chunks of previously large strings columns else if (not pass.large_strings_cols.empty()) { for (auto& chunk : pass.chunks) { auto const idx = chunk.src_col_index; From fd0c82aa22e30354d19b8e01f523594ed0915e23 Mon Sep 17 00:00:00 2001 From: Muhammad Haseeb <14217455+mhaseeb123@users.noreply.github.com> Date: Wed, 30 Oct 2024 19:04:22 +0000 
Subject: [PATCH 04/15] A better way to mark large string cols --- cpp/src/io/parquet/reader_impl.cpp | 40 ++++++++++----------- cpp/src/io/parquet/reader_impl_chunking.hpp | 4 +-- 2 files changed, 21 insertions(+), 23 deletions(-) diff --git a/cpp/src/io/parquet/reader_impl.cpp b/cpp/src/io/parquet/reader_impl.cpp index b0cba34fdd1..2813afba305 100644 --- a/cpp/src/io/parquet/reader_impl.cpp +++ b/cpp/src/io/parquet/reader_impl.cpp @@ -100,33 +100,33 @@ void reader::impl::decode_page_data(read_mode mode, size_t skip_rows, size_t num // column string sizes for this subpass col_string_sizes = calculate_page_string_offsets(); + // ensure cumulative column string sizes have been initialized + if (pass.cumulative_col_string_sizes.empty()) { + pass.cumulative_col_string_sizes.resize(_input_columns.size(), 0); + } + + // add to cumulative column string sizes from this subpass + std::transform(pass.cumulative_col_string_sizes.begin(), + pass.cumulative_col_string_sizes.end(), + col_string_sizes.begin(), + pass.cumulative_col_string_sizes.begin(), + std::plus<>{}); + // check for overflow auto const threshold = static_cast(strings::detail::get_offset64_threshold()); - auto const has_large_strings = std::any_of(col_string_sizes.cbegin(), - col_string_sizes.cend(), + auto const has_large_strings = std::any_of(pass.cumulative_col_string_sizes.cbegin(), + pass.cumulative_col_string_sizes.cend(), [=](std::size_t sz) { return sz > threshold; }); if (has_large_strings and not strings::detail::is_large_strings_enabled()) { CUDF_FAIL("String column exceeds the column size limit", std::overflow_error); } - // Mark any chunks that are large string columns in this or previous subpasses + // Mark any chunks for which the cumulative string columns size has exceeded the large strings + // threshold if (has_large_strings) { - if (pass.large_strings_cols.empty()) { - pass.large_strings_cols.resize(_input_columns.size(), false); - } - for (auto& chunk : pass.chunks) { - auto const idx = 
chunk.src_col_index; - if (col_string_sizes[idx] > threshold or pass.large_strings_cols[idx]) { - chunk.is_large_string_col = true; - pass.large_strings_cols[idx] = true; - } - } - } - // Mark all chunks of previously large strings columns - else if (not pass.large_strings_cols.empty()) { for (auto& chunk : pass.chunks) { auto const idx = chunk.src_col_index; - if (pass.large_strings_cols[idx]) { chunk.is_large_string_col = true; } + if (pass.cumulative_col_string_sizes[idx] > threshold) { chunk.is_large_string_col = true; } } } } @@ -212,10 +212,8 @@ void reader::impl::decode_page_data(read_mode mode, size_t skip_rows, size_t num if (idx == max_depth - 1 and out_buf.string_size() == 0 and col_string_size > 0) { out_buf.create_string_data( col_string_size, - pass.large_strings_cols.empty() - ? false - : pass.large_strings_cols[pass.chunks[c].src_col_index] or - col_string_size > static_cast(strings::detail::get_offset64_threshold()), + pass.cumulative_col_string_sizes[pass.chunks[c].src_col_index] > + static_cast(strings::detail::get_offset64_threshold()), _stream); } if (has_strings) { str_data[idx] = out_buf.string_data(); } diff --git a/cpp/src/io/parquet/reader_impl_chunking.hpp b/cpp/src/io/parquet/reader_impl_chunking.hpp index 0c75cf6b89e..ca46f198bb8 100644 --- a/cpp/src/io/parquet/reader_impl_chunking.hpp +++ b/cpp/src/io/parquet/reader_impl_chunking.hpp @@ -130,8 +130,8 @@ struct pass_intermediate_data { rmm::device_buffer decomp_dict_data{0, cudf::get_default_stream()}; rmm::device_uvector str_dict_index{0, cudf::get_default_stream()}; - // record of any large strings columns seen in a sub-pass. - std::vector large_strings_cols{}; + // cumulative strings column sizes. 
+ std::vector cumulative_col_string_sizes{}; int level_type_size{0}; From 902ac8905ed509a5438d7b09250450e923ad15b7 Mon Sep 17 00:00:00 2001 From: Muhammad Haseeb <14217455+mhaseeb123@users.noreply.github.com> Date: Thu, 31 Oct 2024 00:33:45 +0000 Subject: [PATCH 05/15] Add a gtest for chunked pq reader with large strings column --- cpp/tests/large_strings/parquet_tests.cpp | 56 +++++++++++++++++++++++ 1 file changed, 56 insertions(+) diff --git a/cpp/tests/large_strings/parquet_tests.cpp b/cpp/tests/large_strings/parquet_tests.cpp index f47782a2d02..f7a18555812 100644 --- a/cpp/tests/large_strings/parquet_tests.cpp +++ b/cpp/tests/large_strings/parquet_tests.cpp @@ -18,6 +18,7 @@ #include +#include #include #include #include @@ -69,3 +70,58 @@ TEST_F(ParquetStringsTest, ReadLargeStrings) // go back to normal threshold unsetenv("LIBCUDF_LARGE_STRINGS_THRESHOLD"); } + +// The test below requires a huge amount of memory, thus it is disabled by default. +TEST_F(ParquetStringsTest, DISABLED_ChunkedReadLargeStrings) +{ + // Construct a table with one large strings column > 2GB + auto const wide = this->wide_column(); + auto input = cudf::concatenate(std::vector(120000, wide)); ///< 230MB + + int constexpr multiplier = 12; + std::vector input_cols(multiplier, input->view()); + auto col0 = cudf::concatenate(input_cols); ///< 2.70GB + + // Expected table + auto const expected = cudf::table_view{{col0->view()}}; + auto expected_metadata = cudf::io::table_input_metadata{expected}; + expected_metadata.column_metadata[0].set_encoding( + cudf::io::column_encoding::DELTA_LENGTH_BYTE_ARRAY); + + // Write to Parquet + auto const filepath = g_temp_env->get_temp_filepath("ChunkedReadLargeStrings.parquet"); + cudf::io::parquet_writer_options out_opts = + cudf::io::parquet_writer_options::builder(cudf::io::sink_info{filepath}, expected) + .compression(cudf::io::compression_type::ZSTD) + .stats_level(cudf::io::STATISTICS_NONE) + .metadata(expected_metadata); + 
cudf::io::write_parquet(out_opts); + + // Read with chunked_parquet_reader + size_t constexpr pass_read_limit = + size_t{8} * 1024 * 1024 * + 1024; ///< Set to 8GB so we read almost entire table (>2GB string) in the first subpass + ///< and only a small amount in the second subpass. + cudf::io::parquet_reader_options default_in_opts = + cudf::io::parquet_reader_options::builder(cudf::io::source_info{filepath}); + auto reader = cudf::io::chunked_parquet_reader(0, pass_read_limit, default_in_opts); + + auto tables = std::vector>{}; + while (reader.has_next()) { + tables.emplace_back(reader.read_chunk().tbl); + } + auto table_views = std::vector{}; + for (auto const& tbl : tables) { + table_views.emplace_back(tbl->view()); + } + auto result = cudf::concatenate(table_views); + auto const result_view = result->view(); + + // Verify + for (auto cv : result_view) { + auto const offsets = cudf::strings_column_view(cv).offsets(); + EXPECT_EQ(offsets.type(), cudf::data_type{cudf::type_id::INT64}); + } + EXPECT_EQ(tables.size(), 2); + CUDF_TEST_EXPECT_TABLES_EQUAL(result_view, expected); +} \ No newline at end of file From ac5846f782eeafa44e7789e1aa2119cfceb7990c Mon Sep 17 00:00:00 2001 From: Muhammad Haseeb <14217455+mhaseeb123@users.noreply.github.com> Date: Thu, 31 Oct 2024 01:12:47 +0000 Subject: [PATCH 06/15] style fix --- cpp/tests/large_strings/parquet_tests.cpp | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/cpp/tests/large_strings/parquet_tests.cpp b/cpp/tests/large_strings/parquet_tests.cpp index f7a18555812..3fc63bad68c 100644 --- a/cpp/tests/large_strings/parquet_tests.cpp +++ b/cpp/tests/large_strings/parquet_tests.cpp @@ -124,4 +124,4 @@ TEST_F(ParquetStringsTest, DISABLED_ChunkedReadLargeStrings) } EXPECT_EQ(tables.size(), 2); CUDF_TEST_EXPECT_TABLES_EQUAL(result_view, expected); -} \ No newline at end of file +} From 16671df13ab1806b59926b45c20368d4ab158339 Mon Sep 17 00:00:00 2001 From: Muhammad Haseeb 
<14217455+mhaseeb123@users.noreply.github.com> Date: Thu, 31 Oct 2024 01:17:51 +0000 Subject: [PATCH 07/15] No need to disable the test --- cpp/tests/large_strings/parquet_tests.cpp | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/cpp/tests/large_strings/parquet_tests.cpp b/cpp/tests/large_strings/parquet_tests.cpp index 3fc63bad68c..d4027ab092f 100644 --- a/cpp/tests/large_strings/parquet_tests.cpp +++ b/cpp/tests/large_strings/parquet_tests.cpp @@ -71,8 +71,7 @@ TEST_F(ParquetStringsTest, ReadLargeStrings) unsetenv("LIBCUDF_LARGE_STRINGS_THRESHOLD"); } -// The test below requires a huge amount of memory, thus it is disabled by default. -TEST_F(ParquetStringsTest, DISABLED_ChunkedReadLargeStrings) +TEST_F(ParquetStringsTest, ChunkedReadLargeStrings) { // Construct a table with one large strings column > 2GB auto const wide = this->wide_column(); From b0c033deb825f4bf03e35a2677855da952dab156 Mon Sep 17 00:00:00 2001 From: Muhammad Haseeb <14217455+mhaseeb123@users.noreply.github.com> Date: Thu, 31 Oct 2024 11:44:02 -0700 Subject: [PATCH 08/15] Apply suggestion Co-authored-by: David Wendt <45795991+davidwendt@users.noreply.github.com> --- cpp/tests/large_strings/parquet_tests.cpp | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/cpp/tests/large_strings/parquet_tests.cpp b/cpp/tests/large_strings/parquet_tests.cpp index d4027ab092f..c1db8e7c3aa 100644 --- a/cpp/tests/large_strings/parquet_tests.cpp +++ b/cpp/tests/large_strings/parquet_tests.cpp @@ -110,9 +110,7 @@ TEST_F(ParquetStringsTest, ChunkedReadLargeStrings) tables.emplace_back(reader.read_chunk().tbl); } auto table_views = std::vector{}; - for (auto const& tbl : tables) { - table_views.emplace_back(tbl->view()); - } + std::transform( tables.begin(), tables.end(), std::back_inserter(table_views), [] (auto& t) { return t->view(); }); auto result = cudf::concatenate(table_views); auto const result_view = result->view(); From 9b9273e087e7296853b0acf460e6e69f77e40e2c Mon Sep 
17 00:00:00 2001 From: Muhammad Haseeb <14217455+mhaseeb123@users.noreply.github.com> Date: Thu, 31 Oct 2024 18:49:03 +0000 Subject: [PATCH 09/15] minor style fix --- cpp/tests/large_strings/parquet_tests.cpp | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/cpp/tests/large_strings/parquet_tests.cpp b/cpp/tests/large_strings/parquet_tests.cpp index c1db8e7c3aa..51afffa0aec 100644 --- a/cpp/tests/large_strings/parquet_tests.cpp +++ b/cpp/tests/large_strings/parquet_tests.cpp @@ -110,7 +110,9 @@ TEST_F(ParquetStringsTest, ChunkedReadLargeStrings) tables.emplace_back(reader.read_chunk().tbl); } auto table_views = std::vector{}; - std::transform( tables.begin(), tables.end(), std::back_inserter(table_views), [] (auto& t) { return t->view(); }); + std::transform(tables.begin(), tables.end(), std::back_inserter(table_views), [](auto& tbl) { + return tbl->view(); + }); auto result = cudf::concatenate(table_views); auto const result_view = result->view(); From ab64a46dbef8c1ba45e977910a6eedbe4e1d25d7 Mon Sep 17 00:00:00 2001 From: Muhammad Haseeb <14217455+mhaseeb123@users.noreply.github.com> Date: Thu, 31 Oct 2024 19:06:32 +0000 Subject: [PATCH 10/15] minor --- cpp/src/io/parquet/reader_impl.cpp | 7 +++---- 1 file changed, 3 insertions(+), 4 deletions(-) diff --git a/cpp/src/io/parquet/reader_impl.cpp b/cpp/src/io/parquet/reader_impl.cpp index 2813afba305..ddeaa9bebd2 100644 --- a/cpp/src/io/parquet/reader_impl.cpp +++ b/cpp/src/io/parquet/reader_impl.cpp @@ -206,12 +206,11 @@ void reader::impl::decode_page_data(read_mode mode, size_t skip_rows, size_t num if (owning_schema == 0 || owning_schema == input_col.schema_idx) { valids[idx] = out_buf.null_mask(); data[idx] = out_buf.data(); - // String size of the current column - auto const col_string_size = col_string_sizes[pass.chunks[c].src_col_index]; // only do string buffer for leaf - if (idx == max_depth - 1 and out_buf.string_size() == 0 and col_string_size > 0) { + if (idx == max_depth - 1 and 
out_buf.string_size() == 0 and + col_string_sizes[pass.chunks[c].src_col_index] > 0) { out_buf.create_string_data( - col_string_size, + col_string_sizes[pass.chunks[c].src_col_index], pass.cumulative_col_string_sizes[pass.chunks[c].src_col_index] > static_cast(strings::detail::get_offset64_threshold()), _stream); From 8cfa14a67b942bae8b0f9bb32115ec9759ffbc5f Mon Sep 17 00:00:00 2001 From: Muhammad Haseeb <14217455+mhaseeb123@users.noreply.github.com> Date: Mon, 4 Nov 2024 13:27:38 -0800 Subject: [PATCH 11/15] Apply suggestions from code review Co-authored-by: Vukasin Milovanovic --- cpp/tests/large_strings/parquet_tests.cpp | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/cpp/tests/large_strings/parquet_tests.cpp b/cpp/tests/large_strings/parquet_tests.cpp index 51afffa0aec..76ff650347f 100644 --- a/cpp/tests/large_strings/parquet_tests.cpp +++ b/cpp/tests/large_strings/parquet_tests.cpp @@ -117,7 +117,7 @@ TEST_F(ParquetStringsTest, ChunkedReadLargeStrings) auto const result_view = result->view(); // Verify - for (auto cv : result_view) { + for (auto const& cv : result_view) { auto const offsets = cudf::strings_column_view(cv).offsets(); EXPECT_EQ(offsets.type(), cudf::data_type{cudf::type_id::INT64}); } From c5721cbccfe214f517525a0ad916643d0c9e4098 Mon Sep 17 00:00:00 2001 From: Muhammad Haseeb <14217455+mhaseeb123@users.noreply.github.com> Date: Mon, 4 Nov 2024 23:55:17 +0000 Subject: [PATCH 12/15] Add comments --- cpp/src/io/parquet/reader_impl.cpp | 11 ++++++----- 1 file changed, 6 insertions(+), 5 deletions(-) diff --git a/cpp/src/io/parquet/reader_impl.cpp b/cpp/src/io/parquet/reader_impl.cpp index ddeaa9bebd2..d847d3d3a8c 100644 --- a/cpp/src/io/parquet/reader_impl.cpp +++ b/cpp/src/io/parquet/reader_impl.cpp @@ -97,7 +97,7 @@ void reader::impl::decode_page_data(read_mode mode, size_t skip_rows, size_t num _stream); } - // column string sizes for this subpass + // Compute column string sizes (using page string offsets) for this subpass 
col_string_sizes = calculate_page_string_offsets(); // ensure cumulative column string sizes have been initialized @@ -105,14 +105,15 @@ void reader::impl::decode_page_data(read_mode mode, size_t skip_rows, size_t num pass.cumulative_col_string_sizes.resize(_input_columns.size(), 0); } - // add to cumulative column string sizes from this subpass + // Add to the cumulative column string sizes of this pass std::transform(pass.cumulative_col_string_sizes.begin(), pass.cumulative_col_string_sizes.end(), col_string_sizes.begin(), pass.cumulative_col_string_sizes.begin(), std::plus<>{}); - // check for overflow + // Check for overflow in cumulative column string sizes of this pass so that the page string + // offsets of overflowing (large) string columns are treated as 64-bit. auto const threshold = static_cast(strings::detail::get_offset64_threshold()); auto const has_large_strings = std::any_of(pass.cumulative_col_string_sizes.cbegin(), pass.cumulative_col_string_sizes.cend(), @@ -121,8 +122,8 @@ void reader::impl::decode_page_data(read_mode mode, size_t skip_rows, size_t num CUDF_FAIL("String column exceeds the column size limit", std::overflow_error); } - // Mark any chunks for which the cumulative string columns size has exceeded the large strings - // threshold + // Mark any chunks for which the cumulative column string size has exceeded the + // large strings threshold if (has_large_strings) { for (auto& chunk : pass.chunks) { auto const idx = chunk.src_col_index; From a75a8777afcad85cf99cfa7d2d5cc296901c6eda Mon Sep 17 00:00:00 2001 From: Muhammad Haseeb Date: Tue, 5 Nov 2024 04:11:22 +0000 Subject: [PATCH 13/15] Apply suggestion from code review. 
--- cpp/tests/large_strings/parquet_tests.cpp | 11 ++++------- 1 file changed, 4 insertions(+), 7 deletions(-) diff --git a/cpp/tests/large_strings/parquet_tests.cpp b/cpp/tests/large_strings/parquet_tests.cpp index 76ff650347f..5e1171d2fbe 100644 --- a/cpp/tests/large_strings/parquet_tests.cpp +++ b/cpp/tests/large_strings/parquet_tests.cpp @@ -52,8 +52,6 @@ TEST_F(ParquetStringsTest, ReadLargeStrings) auto const filepath = g_temp_env->get_temp_filepath("ReadLargeStrings.parquet"); cudf::io::parquet_writer_options out_opts = cudf::io::parquet_writer_options::builder(cudf::io::sink_info{filepath}, expected) - .compression(cudf::io::compression_type::ZSTD) - .stats_level(cudf::io::STATISTICS_NONE) .metadata(expected_metadata); cudf::io::write_parquet(out_opts); @@ -88,21 +86,20 @@ TEST_F(ParquetStringsTest, ChunkedReadLargeStrings) cudf::io::column_encoding::DELTA_LENGTH_BYTE_ARRAY); // Write to Parquet - auto const filepath = g_temp_env->get_temp_filepath("ChunkedReadLargeStrings.parquet"); + std::vector buffer; cudf::io::parquet_writer_options out_opts = - cudf::io::parquet_writer_options::builder(cudf::io::sink_info{filepath}, expected) + cudf::io::parquet_writer_options::builder(cudf::io::sink_info{&buffer}, expected) .compression(cudf::io::compression_type::ZSTD) - .stats_level(cudf::io::STATISTICS_NONE) .metadata(expected_metadata); cudf::io::write_parquet(out_opts); // Read with chunked_parquet_reader size_t constexpr pass_read_limit = size_t{8} * 1024 * 1024 * - 1024; ///< Set to 8GB so we read almost entire table (>2GB string) in the first subpass + 1024; ///< Set to 8GB so we read almost entire table (>2GB string) in the first subpass ///< and only a small amount in the second subpass. 
cudf::io::parquet_reader_options default_in_opts = - cudf::io::parquet_reader_options::builder(cudf::io::source_info{filepath}); + cudf::io::parquet_reader_options::builder(cudf::io::source_info(buffer.data(), buffer.size())); auto reader = cudf::io::chunked_parquet_reader(0, pass_read_limit, default_in_opts); auto tables = std::vector>{}; From 6dc7ab783e33a69561e60fa957946f159363bd22 Mon Sep 17 00:00:00 2001 From: Muhammad Haseeb Date: Tue, 5 Nov 2024 04:24:49 +0000 Subject: [PATCH 14/15] minor fix --- cpp/tests/large_strings/parquet_tests.cpp | 12 ++++++++---- 1 file changed, 8 insertions(+), 4 deletions(-) diff --git a/cpp/tests/large_strings/parquet_tests.cpp b/cpp/tests/large_strings/parquet_tests.cpp index 5e1171d2fbe..48c303bd4a0 100644 --- a/cpp/tests/large_strings/parquet_tests.cpp +++ b/cpp/tests/large_strings/parquet_tests.cpp @@ -52,6 +52,8 @@ TEST_F(ParquetStringsTest, ReadLargeStrings) auto const filepath = g_temp_env->get_temp_filepath("ReadLargeStrings.parquet"); cudf::io::parquet_writer_options out_opts = cudf::io::parquet_writer_options::builder(cudf::io::sink_info{filepath}, expected) + .compression(cudf::io::compression_type::ZSTD) + .stats_level(cudf::io::STATISTICS_NONE) .metadata(expected_metadata); cudf::io::write_parquet(out_opts); @@ -83,21 +85,23 @@ TEST_F(ParquetStringsTest, ChunkedReadLargeStrings) auto const expected = cudf::table_view{{col0->view()}}; auto expected_metadata = cudf::io::table_input_metadata{expected}; expected_metadata.column_metadata[0].set_encoding( - cudf::io::column_encoding::DELTA_LENGTH_BYTE_ARRAY); + cudf::io::column_encoding::DELTA_LENGTH_BYTE_ARRAY); ///< Needed to get exactly 2 chunks when + ///< chunked reading // Write to Parquet std::vector buffer; cudf::io::parquet_writer_options out_opts = cudf::io::parquet_writer_options::builder(cudf::io::sink_info{&buffer}, expected) - .compression(cudf::io::compression_type::ZSTD) + .compression( + cudf::io::compression_type::ZSTD) ///< Needed to get exactly 2 chunks 
when chunked reading .metadata(expected_metadata); cudf::io::write_parquet(out_opts); // Read with chunked_parquet_reader size_t constexpr pass_read_limit = size_t{8} * 1024 * 1024 * - 1024; ///< Set to 8GB so we read almost entire table (>2GB string) in the first subpass - ///< and only a small amount in the second subpass. + 1024; ///< Empirically set to 8GB so we read almost entire table (>2GB string) in the first + ///< subpass and only a small amount in the second subpass. cudf::io::parquet_reader_options default_in_opts = cudf::io::parquet_reader_options::builder(cudf::io::source_info(buffer.data(), buffer.size())); auto reader = cudf::io::chunked_parquet_reader(0, pass_read_limit, default_in_opts); From 389571a621df996896be8acb6795f9141d99677f Mon Sep 17 00:00:00 2001 From: Muhammad Haseeb Date: Tue, 5 Nov 2024 21:08:43 +0000 Subject: [PATCH 15/15] Add comments --- cpp/tests/large_strings/parquet_tests.cpp | 44 ++++++++++++++++------- 1 file changed, 31 insertions(+), 13 deletions(-) diff --git a/cpp/tests/large_strings/parquet_tests.cpp b/cpp/tests/large_strings/parquet_tests.cpp index 48c303bd4a0..39cd783de00 100644 --- a/cpp/tests/large_strings/parquet_tests.cpp +++ b/cpp/tests/large_strings/parquet_tests.cpp @@ -71,7 +71,9 @@ TEST_F(ParquetStringsTest, ReadLargeStrings) unsetenv("LIBCUDF_LARGE_STRINGS_THRESHOLD"); } -TEST_F(ParquetStringsTest, ChunkedReadLargeStrings) +// Disabled as the test is too brittle and depends on empirically set `pass_read_limit`, +// encoding type, and the currently used `ZSTD` scratch space size. 
+TEST_F(ParquetStringsTest, DISABLED_ChunkedReadLargeStrings) { // Construct a table with one large strings column > 2GB auto const wide = this->wide_column(); @@ -84,28 +86,40 @@ TEST_F(ParquetStringsTest, ChunkedReadLargeStrings) // Expected table auto const expected = cudf::table_view{{col0->view()}}; auto expected_metadata = cudf::io::table_input_metadata{expected}; + + // Needed to get exactly 2 Parquet subpasses: first with large-strings and the second with + // regular ones. This may change in the future and lead to false failures. expected_metadata.column_metadata[0].set_encoding( - cudf::io::column_encoding::DELTA_LENGTH_BYTE_ARRAY); ///< Needed to get exactly 2 chunks when - ///< chunked reading + cudf::io::column_encoding::DELTA_LENGTH_BYTE_ARRAY); - // Write to Parquet + // Host buffer to write Parquet std::vector buffer; + + // Writer options cudf::io::parquet_writer_options out_opts = cudf::io::parquet_writer_options::builder(cudf::io::sink_info{&buffer}, expected) - .compression( - cudf::io::compression_type::ZSTD) ///< Needed to get exactly 2 chunks when chunked reading .metadata(expected_metadata); + + // Needed to get exactly 2 Parquet subpasses: first with large-strings and the second with + // regular ones. This may change in the future and lead to false failures. + out_opts.set_compression(cudf::io::compression_type::ZSTD); + + // Write to Parquet cudf::io::write_parquet(out_opts); - // Read with chunked_parquet_reader - size_t constexpr pass_read_limit = - size_t{8} * 1024 * 1024 * - 1024; ///< Empirically set to 8GB so we read almost entire table (>2GB string) in the first - ///< subpass and only a small amount in the second subpass. + // Empirically set pass_read_limit of 8GB so we read almost entire table (>2GB strings) in the + // first subpass and only a small amount in the second subpass. This may change in the future + // and lead to false failures. 
+ size_t constexpr pass_read_limit = size_t{8} * 1024 * 1024 * 1024; + + // Reader options cudf::io::parquet_reader_options default_in_opts = cudf::io::parquet_reader_options::builder(cudf::io::source_info(buffer.data(), buffer.size())); + + // Chunked parquet reader auto reader = cudf::io::chunked_parquet_reader(0, pass_read_limit, default_in_opts); + // Read chunked auto tables = std::vector>{}; while (reader.has_next()) { tables.emplace_back(reader.read_chunk().tbl); @@ -117,11 +131,15 @@ TEST_F(ParquetStringsTest, ChunkedReadLargeStrings) auto result = cudf::concatenate(table_views); auto const result_view = result->view(); - // Verify + // Verify offsets for (auto const& cv : result_view) { auto const offsets = cudf::strings_column_view(cv).offsets(); EXPECT_EQ(offsets.type(), cudf::data_type{cudf::type_id::INT64}); } - EXPECT_EQ(tables.size(), 2); + + // Verify tables to be equal CUDF_TEST_EXPECT_TABLES_EQUAL(result_view, expected); + + // Verify that we read exactly two table chunks + EXPECT_EQ(tables.size(), 2); }