Fix bug in recovering invalid lines in JSONL inputs (#17098)
Addresses #16999

Authors:
  - Shruti Shivakumar (https://github.com/shrshi)
  - Karthikeyan (https://github.com/karthikeyann)
  - Nghia Truong (https://github.com/ttnghia)

Approvers:
  - Basit Ayantunde (https://github.com/lamarrr)
  - Nghia Truong (https://github.com/ttnghia)

URL: #17098
shrshi authored Oct 30, 2024
1 parent 6c2eb4e commit 0b9277b
Showing 4 changed files with 52 additions and 12 deletions.
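
For context, the scenario this change addresses: a JSON Lines input whose last record is invalid (and not delimiter-terminated), read with RECOVER_WITH_NULL. A minimal sketch using the public C++ reader API, mirroring the LastRecordInvalid test added below; the standalone main() wrapper and the expectation comments are illustrative and not part of this commit:

#include <cudf/io/json.hpp>

#include <string>

int main()
{
  // JSONL buffer whose last record is malformed and has no trailing newline.
  std::string data = R"({"key": "1"}
{"key": "})";

  auto opts = cudf::io::json_reader_options::builder(
                cudf::io::source_info{data.data(), data.size()})
                .lines(true)
                .recovery_mode(cudf::io::json_recovery_mode_t::RECOVER_WITH_NULL)
                .build();

  // With this fix, the invalid trailing record is recovered as a null row,
  // so the result has two rows rather than dropping or mis-parsing the last line.
  auto const result = cudf::io::read_json(opts);
  return 0;
}
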
44 changes: 32 additions & 12 deletions cpp/src/io/json/read_json.cu
@@ -20,6 +20,7 @@

#include <cudf/concatenate.hpp>
#include <cudf/detail/nvtx/ranges.hpp>
+ #include <cudf/detail/utilities/cuda_memcpy.hpp>
#include <cudf/detail/utilities/integer_utils.hpp>
#include <cudf/detail/utilities/stream_pool.hpp>
#include <cudf/detail/utilities/vector_factories.hpp>
@@ -127,18 +128,19 @@ datasource::owning_buffer<rmm::device_buffer> get_record_range_raw_input(

std::size_t const total_source_size = sources_size(sources, 0, 0);
auto constexpr num_delimiter_chars = 1;
- auto const num_extra_delimiters = num_delimiter_chars * (sources.size() - 1);
+ auto const delimiter = reader_opts.get_delimiter();
+ auto const num_extra_delimiters = num_delimiter_chars * sources.size();
compression_type const reader_compression = reader_opts.get_compression();
std::size_t const chunk_offset = reader_opts.get_byte_range_offset();
std::size_t chunk_size = reader_opts.get_byte_range_size();

CUDF_EXPECTS(total_source_size ? chunk_offset < total_source_size : !chunk_offset,
"Invalid offsetting",
std::invalid_argument);
- auto should_load_all_sources = !chunk_size || chunk_size >= total_source_size - chunk_offset;
- chunk_size = should_load_all_sources ? total_source_size - chunk_offset : chunk_size;
+ auto should_load_till_last_source = !chunk_size || chunk_size >= total_source_size - chunk_offset;
+ chunk_size = should_load_till_last_source ? total_source_size - chunk_offset : chunk_size;

- int num_subchunks_prealloced = should_load_all_sources ? 0 : max_subchunks_prealloced;
+ int num_subchunks_prealloced = should_load_till_last_source ? 0 : max_subchunks_prealloced;
std::size_t const size_per_subchunk = estimate_size_per_subchunk(chunk_size);

// The allocation for single source compressed input is estimated by assuming a ~4:1
@@ -155,17 +157,17 @@ datasource::owning_buffer<rmm::device_buffer> get_record_range_raw_input(

// Offset within buffer indicating first read position
std::int64_t buffer_offset = 0;
- auto readbufspan =
-   ingest_raw_input(bufspan, sources, reader_compression, chunk_offset, chunk_size, stream);
+ auto readbufspan = ingest_raw_input(
+   bufspan, sources, reader_compression, chunk_offset, chunk_size, delimiter, stream);

auto const shift_for_nonzero_offset = std::min<std::int64_t>(chunk_offset, 1);
auto const first_delim_pos =
- chunk_offset == 0 ? 0 : find_first_delimiter(readbufspan, '\n', stream);
+ chunk_offset == 0 ? 0 : find_first_delimiter(readbufspan, delimiter, stream);
if (first_delim_pos == -1) {
// return empty owning datasource buffer
auto empty_buf = rmm::device_buffer(0, stream);
return datasource::owning_buffer<rmm::device_buffer>(std::move(empty_buf));
- } else if (!should_load_all_sources) {
+ } else if (!should_load_till_last_source) {
// Find next delimiter
std::int64_t next_delim_pos = -1;
std::size_t next_subchunk_start = chunk_offset + chunk_size;
@@ -180,14 +182,15 @@ datasource::owning_buffer<rmm::device_buffer> get_record_range_raw_input(
reader_compression,
next_subchunk_start,
size_per_subchunk,
+ delimiter,
stream);
- next_delim_pos = find_first_delimiter(readbufspan, '\n', stream) + buffer_offset;
+ next_delim_pos = find_first_delimiter(readbufspan, delimiter, stream) + buffer_offset;
next_subchunk_start += size_per_subchunk;
}
if (next_delim_pos < buffer_offset) {
if (next_subchunk_start >= total_source_size) {
// If we have reached the end of source list but the source does not terminate with a
- // newline character
+ // delimiter character
next_delim_pos = buffer_offset + readbufspan.size();
} else {
// Our buffer_size estimate is insufficient to read until the end of the line! We need to
@@ -209,10 +212,26 @@ datasource::owning_buffer<rmm::device_buffer> get_record_range_raw_input(
reinterpret_cast<uint8_t*>(buffer.data()) + first_delim_pos + shift_for_nonzero_offset,
next_delim_pos - first_delim_pos - shift_for_nonzero_offset);
}

+ // Add delimiter to end of buffer - possibly adding an empty line to the input buffer - iff we are
+ // reading till the end of the last source, i.e. should_load_till_last_source is true. Note that the
+ // table generated from the JSONL input remains unchanged since empty lines are ignored by the
+ // parser.
+ size_t num_chars = readbufspan.size() - first_delim_pos - shift_for_nonzero_offset;
+ if (num_chars) {
+   auto last_char = delimiter;
+   cudf::detail::cuda_memcpy_async<char>(
+     device_span<char>(reinterpret_cast<char*>(buffer.data()), buffer.size())
+       .subspan(readbufspan.size(), 1),
+     host_span<char const>(&last_char, 1, false),
+     stream);
+   num_chars++;
+ }

return datasource::owning_buffer<rmm::device_buffer>(
std::move(buffer),
reinterpret_cast<uint8_t*>(buffer.data()) + first_delim_pos + shift_for_nonzero_offset,
- readbufspan.size() - first_delim_pos - shift_for_nonzero_offset);
+ num_chars);
}

// Helper function to read the current batch using byte range offsets and size
@@ -245,6 +264,7 @@ device_span<char> ingest_raw_input(device_span<char> buffer,
compression_type compression,
std::size_t range_offset,
std::size_t range_size,
+ char delimiter,
rmm::cuda_stream_view stream)
{
CUDF_FUNC_RANGE();
@@ -296,7 +316,7 @@ device_span<char> ingest_raw_input(device_span<char> buffer,
if (sources.size() > 1) {
static_assert(num_delimiter_chars == 1,
"Currently only single-character delimiters are supported");
- auto const delimiter_source = thrust::make_constant_iterator('\n');
+ auto const delimiter_source = thrust::make_constant_iterator(delimiter);
auto const d_delimiter_map = cudf::detail::make_device_uvector_async(
delimiter_map, stream, cudf::get_current_device_resource_ref());
thrust::scatter(rmm::exec_policy_nosync(stream),
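
The essence of the read_json.cu change, restated as a host-side sketch: whenever the chunk reaches the end of the last source, one delimiter is appended after the data so that a trailing record, valid or not, always ends in a delimiter; any resulting empty line is ignored by the parser. The helper below is illustrative only (plain std::string instead of device buffers and cuda_memcpy_async; the function name is not a cuDF API):

#include <string>

// Illustrative analogue of the device-side delimiter append shown above.
std::string terminate_last_record(std::string chunk, char delimiter = '\n')
{
  // Append unconditionally for non-empty chunks; if the chunk already ends with the
  // delimiter this merely adds an empty line, which the JSONL parser ignores.
  if (!chunk.empty()) { chunk.push_back(delimiter); }
  return chunk;
}

This is also why num_extra_delimiters changes from num_delimiter_chars * (sources.size() - 1) to num_delimiter_chars * sources.size(): the buffer now reserves room for one delimiter per source, including the one appended after the last source.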
1 change: 1 addition & 0 deletions cpp/src/io/json/read_json.hpp
@@ -56,6 +56,7 @@ device_span<char> ingest_raw_input(device_span<char> buffer,
compression_type compression,
size_t range_offset,
size_t range_size,
+ char delimiter,
rmm::cuda_stream_view stream);

/**
18 changes: 18 additions & 0 deletions cpp/tests/io/json/json_test.cpp
@@ -2973,4 +2973,22 @@ TEST_F(JsonReaderTest, JsonDtypeSchema)
cudf::test::debug_output_level::ALL_ERRORS);
}

+ TEST_F(JsonReaderTest, LastRecordInvalid)
+ {
+   std::string data = R"({"key": "1"}
+ {"key": "})";
+   std::map<std::string, cudf::io::schema_element> schema{{"key", {dtype<cudf::string_view>()}}};
+   auto opts =
+     cudf::io::json_reader_options::builder(cudf::io::source_info{data.data(), data.size()})
+       .dtypes(schema)
+       .lines(true)
+       .recovery_mode(cudf::io::json_recovery_mode_t::RECOVER_WITH_NULL)
+       .build();
+   auto const result = cudf::io::read_json(opts);
+
+   EXPECT_EQ(result.metadata.schema_info[0].name, "key");
+   cudf::test::strings_column_wrapper expected{{"1", ""}, cudf::test::iterators::nulls_at({1})};
+   CUDF_TEST_EXPECT_TABLES_EQUAL(result.tbl->view(), cudf::table_view{{expected}});
+ }

CUDF_TEST_PROGRAM_MAIN()
1 change: 1 addition & 0 deletions cpp/tests/io/json/json_utils.cuh
@@ -52,6 +52,7 @@ std::vector<cudf::io::table_with_metadata> split_byte_range_reading(
reader_opts.get_compression(),
reader_opts.get_byte_range_offset(),
reader_opts.get_byte_range_size(),
+ reader_opts.get_delimiter(),
stream);
// Note: we cannot reuse cudf::io::json::detail::find_first_delimiter since the
// return type of that function is size_type. However, when the chunk_size is
