Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add optional column_order in JSON reader #17029

Merged
merged 27 commits into from
Nov 8, 2024
Merged
Show file tree
Hide file tree
Changes from 15 commits
Commits
Show all changes
27 commits
Select commit Hold shift + click to select a range
1b6ca58
add optional column_order to schema_element
karthikeyann Oct 9, 2024
e0c373a
Merge branch 'branch-24.12' into fea-json_column_order
karthikeyann Oct 21, 2024
732f234
doc fixes
karthikeyann Oct 21, 2024
ffdd817
fix ambiguous std::map call
karthikeyann Oct 21, 2024
02e8ab3
simplify schema_element interface
karthikeyann Oct 22, 2024
ac05ae9
create all null columns
karthikeyann Oct 23, 2024
f10d9c2
metadata for all null non-present columns
karthikeyann Oct 24, 2024
71b4142
Merge branch 'branch-24.12' into fea-json_column_order
karthikeyann Oct 24, 2024
c8e223d
address review comments, unit test
karthikeyann Oct 24, 2024
1c871a8
Merge branch 'fea-json_column_order' of github.com:karthikeyann/cudf …
karthikeyann Oct 24, 2024
9297b7e
fix empty all-null rows issue at top level
karthikeyann Oct 25, 2024
eb9f8fc
Merge branch 'branch-24.12' into fea-json_column_order
karthikeyann Nov 4, 2024
9e31b71
add validation for dtypes with column order
karthikeyann Nov 4, 2024
82cf186
cleanup
karthikeyann Nov 4, 2024
105250b
address review comments
karthikeyann Nov 4, 2024
1a7a99c
add docs to dtype_variant
karthikeyann Nov 4, 2024
fcd8e3c
fix docs
karthikeyann Nov 4, 2024
15ef1d5
Merge branch 'branch-24.12' into fea-json_column_order
ttnghia Nov 5, 2024
b2dd7cd
moved dtype_variant alias to public
karthikeyann Nov 5, 2024
2272f72
Merge branch 'branch-24.12' into fea-json_column_order
karthikeyann Nov 5, 2024
e8e1c28
Merge branch 'branch-24.12' into fea-json_column_order
karthikeyann Nov 6, 2024
0bfe85e
remove chars in string column metadata
karthikeyann Nov 6, 2024
da24f1d
fix string col metadata in unit test
karthikeyann Nov 6, 2024
7e31f91
Merge branch 'branch-24.12' into fea-json_column_order
karthikeyann Nov 6, 2024
83b979c
add missing doc
karthikeyann Nov 7, 2024
a5442b9
Merge branch 'branch-24.12' into fea-json_column_order
karthikeyann Nov 7, 2024
4b82cf6
address review comments
karthikeyann Nov 7, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
49 changes: 37 additions & 12 deletions cpp/include/cudf/io/json.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@

#include "types.hpp"

#include <cudf/detail/utilities/visitor_overload.hpp>
#include <cudf/table/table_view.hpp>
#include <cudf/types.hpp>
#include <cudf/utilities/error.hpp>
Expand Down Expand Up @@ -53,6 +54,11 @@ struct schema_element {
* @brief Allows specifying this column's child columns target type
*/
std::map<std::string, schema_element> child_types;

/**
* @brief Allows specifying the order of the columns
*/
std::optional<std::vector<std::string>> column_order;
};

/**
Expand Down Expand Up @@ -90,10 +96,11 @@ class json_reader_options {
source_info _source;

// Data types of the column; empty to infer dtypes
std::variant<std::vector<data_type>,
std::map<std::string, data_type>,
std::map<std::string, schema_element>>
_dtypes;
using dtype_variant = std::variant<std::vector<data_type>,
std::map<std::string, data_type>,
std::map<std::string, schema_element>,
schema_element>;
dtype_variant _dtypes;
// Specify the compression format of the source or infer from file extension
compression_type _compression = compression_type::AUTO;

Expand Down Expand Up @@ -178,13 +185,7 @@ class json_reader_options {
*
* @returns Data types of the columns
*/
[[nodiscard]] std::variant<std::vector<data_type>,
std::map<std::string, data_type>,
std::map<std::string, schema_element>> const&
get_dtypes() const
{
return _dtypes;
}
[[nodiscard]] dtype_variant const& get_dtypes() const { return _dtypes; }

/**
* @brief Returns compression format of the source.
Expand Down Expand Up @@ -228,7 +229,11 @@ class json_reader_options {
*/
[[nodiscard]] size_t get_byte_range_padding() const
{
auto const num_columns = std::visit([](auto const& dtypes) { return dtypes.size(); }, _dtypes);
auto const num_columns =
std::visit(cudf::detail::visitor_overload{
[](auto const& dtypes) { return dtypes.size(); },
[](schema_element const& dtypes) { return dtypes.child_types.size(); }},
_dtypes);

auto const max_row_bytes = 16 * 1024; // 16KB
auto const column_bytes = 64;
Expand Down Expand Up @@ -390,6 +395,14 @@ class json_reader_options {
*/
void set_dtypes(std::map<std::string, schema_element> types) { _dtypes = std::move(types); }

/**
* @brief Set data types for a potentially nested column hierarchy, with an optional column order.
*
* @param types Root schema element: its `child_types` map column names to (arbitrarily nested)
* schema elements, and its optional `column_order` lists the column names in the desired order
*/
void set_dtypes(schema_element types);

/**
* @brief Set the compression type.
*
Expand Down Expand Up @@ -624,6 +637,18 @@ class json_reader_options_builder {
return *this;
}

/**
* @brief Set data types for columns to be read, given as a single root schema element.
*
* Forwards `types` to `json_reader_options::set_dtypes(schema_element)`.
*
* @param types Root schema_element whose `child_types` map column names to their
* schema_element and whose optional `column_order` lists the column names in order
* @return this for chaining
*/
json_reader_options_builder& dtypes(schema_element types)
{
options.set_dtypes(std::move(types));
return *this;
}

/**
* @brief Set the compression type.
*
Expand Down
16 changes: 14 additions & 2 deletions cpp/src/io/json/host_tree_algorithms.cu
Original file line number Diff line number Diff line change
Expand Up @@ -269,7 +269,8 @@ std::map<std::string, schema_element> unified_schema(cudf::io::json_reader_optio
});
return dnew;
},
[](std::map<std::string, schema_element> const& user_dtypes) { return user_dtypes; }},
[](std::map<std::string, schema_element> const& user_dtypes) { return user_dtypes; },
[](schema_element const& user_dtypes) { return user_dtypes.child_types; }},
options.get_dtypes());
}

Expand Down Expand Up @@ -593,6 +594,15 @@ std::pair<cudf::detail::host_vector<bool>, hashmap_of_device_columns> build_tree
if (user_dtypes.count(name))
mark_is_pruned(first_child_id, user_dtypes.at(name));
}
},
[&root_list_col_id, &adj, &mark_is_pruned, &column_names](
schema_element const& user_dtypes) -> void {
for (size_t i = 0; i < adj[root_list_col_id].size(); i++) {
auto const first_child_id = adj[root_list_col_id][i];
auto name = column_names[first_child_id];
karthikeyann marked this conversation as resolved.
Show resolved Hide resolved
if (user_dtypes.child_types.count(name))
karthikeyann marked this conversation as resolved.
Show resolved Hide resolved
mark_is_pruned(first_child_id, user_dtypes.child_types.at(name));
ttnghia marked this conversation as resolved.
Show resolved Hide resolved
karthikeyann marked this conversation as resolved.
Show resolved Hide resolved
}
}},
options.get_dtypes());
} else {
Expand Down Expand Up @@ -626,7 +636,9 @@ std::pair<cudf::detail::host_vector<bool>, hashmap_of_device_columns> build_tree
[&root_struct_col_id, &adj, &mark_is_pruned, &u_schema](
std::map<std::string, schema_element> const& user_dtypes) -> void {
mark_is_pruned(root_struct_col_id, u_schema);
}},
},
[&root_struct_col_id, &adj, &mark_is_pruned, &u_schema](schema_element const& user_dtypes)
-> void { mark_is_pruned(root_struct_col_id, u_schema); }},
options.get_dtypes());
}
// Useful for array of arrays
Expand Down
110 changes: 85 additions & 25 deletions cpp/src/io/json/json_column.cu
Original file line number Diff line number Diff line change
Expand Up @@ -411,11 +411,33 @@ std::pair<std::unique_ptr<column>, std::vector<column_name_info>> device_json_co
std::vector<column_name_info> column_names{};
size_type num_rows{json_col.num_rows};
// Create children columns
for (auto const& col_name : json_col.column_order) {
auto const& col = json_col.child_columns.find(col_name);
column_names.emplace_back(col->first);
auto& child_col = col->second;
auto const& col_order = prune_columns and schema.has_value() and
karthikeyann marked this conversation as resolved.
Show resolved Hide resolved
schema.value().column_order.has_value() and
not schema.value().column_order->empty()
karthikeyann marked this conversation as resolved.
Show resolved Hide resolved
? schema.value().column_order.value()
: json_col.column_order;

for (auto const& col_name : col_order) {
auto child_schema_element = get_child_schema(col_name);
auto const found_it = json_col.child_columns.find(col_name);

if (prune_columns and found_it == std::end(json_col.child_columns)) {
CUDF_EXPECTS(child_schema_element.has_value(),
"Column name not found in input schema map, but present in column order and "
"prune_columns is enabled");
column_names.emplace_back(make_column_name_info(
child_schema_element.value_or(schema_element{data_type{type_id::EMPTY}}), col_name));
auto all_null_column = make_all_nulls_column(
child_schema_element.value_or(schema_element{data_type{type_id::EMPTY}}),
karthikeyann marked this conversation as resolved.
Show resolved Hide resolved
num_rows,
stream,
mr);
child_columns.emplace_back(std::move(all_null_column));
continue;
}
column_names.emplace_back(found_it->first);

auto& child_col = found_it->second;
if (!prune_columns or child_schema_element.has_value()) {
auto [child_column, names] = device_json_column_to_cudf_column(
child_col, d_input, options, prune_columns, child_schema_element, stream, mr);
Expand Down Expand Up @@ -576,11 +598,24 @@ table_with_metadata device_parse_nested_json(device_span<SymbolT const> d_input,
std::vector<column_name_info> out_column_names;
auto parse_opt = parsing_options(options, stream);

// Iterate over the struct's child columns and convert to cudf column
size_type column_index = 0;
for (auto const& col_name : root_struct_col.column_order) {
auto& json_col = root_struct_col.child_columns.find(col_name)->second;
bool const has_column_order =
options.is_enabled_prune_columns() and
std::holds_alternative<schema_element>(options.get_dtypes()) and
std::get<schema_element>(options.get_dtypes()).column_order.has_value() and
not std::get<schema_element>(options.get_dtypes()).column_order->empty();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we throw if options.is_enabled_prune_columns() is false but a column order is given? Users may want to specify a column order but forget to enable prune_columns.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It is possible that the user wants to prune columns but keep the column order as in the JSON file. That's why that throw was not added.

auto const& col_order = has_column_order
? std::get<schema_element>(options.get_dtypes()).column_order.value()
: root_struct_col.column_order;
if (has_column_order) {
CUDF_EXPECTS(
std::get<schema_element>(options.get_dtypes()).child_types.size() == col_order.size(),
"Input schema column order size mismatch with input schema child types");
}
auto root_col_size = root_struct_col.num_rows;

// Iterate over the struct's child columns/column_order and convert to cudf column
size_type column_index = 0;
for (auto const& col_name : col_order) {
std::optional<schema_element> child_schema_element = std::visit(
cudf::detail::visitor_overload{
[column_index](std::vector<data_type> const& user_dtypes) -> std::optional<schema_element> {
Expand All @@ -590,38 +625,63 @@ table_with_metadata device_parse_nested_json(device_span<SymbolT const> d_input,
},
[col_name](
std::map<std::string, data_type> const& user_dtypes) -> std::optional<schema_element> {
return (user_dtypes.find(col_name) != std::end(user_dtypes))
? std::optional<schema_element>{{user_dtypes.find(col_name)->second}}
: std::optional<schema_element>{};
if (auto it = user_dtypes.find(col_name); it != std::end(user_dtypes))
return std::optional<schema_element>{{it->second}};
return std::nullopt;
},
[col_name](std::map<std::string, schema_element> const& user_dtypes)
-> std::optional<schema_element> {
return (user_dtypes.find(col_name) != std::end(user_dtypes))
? user_dtypes.find(col_name)->second
: std::optional<schema_element>{};
if (auto it = user_dtypes.find(col_name); it != std::end(user_dtypes)) return it->second;
return std::nullopt;
},
[col_name](schema_element const& user_dtypes) -> std::optional<schema_element> {
if (auto it = user_dtypes.child_types.find(col_name);
it != std::end(user_dtypes.child_types))
return it->second;
return std::nullopt;
}},
options.get_dtypes());

#ifdef NJP_DEBUG_PRINT
auto debug_schema_print = [](auto ret) {
std::cout << ", type id: "
<< (ret.has_value() ? std::to_string(static_cast<int>(ret->type.id())) : "n/a")
<< ", with " << (ret.has_value() ? ret->child_types.size() : 0) << " children"
<< "\n";
};
std::visit(
cudf::detail::visitor_overload{[column_index](std::vector<data_type> const&) {
std::cout << "Column by index: #" << column_index;
},
[col_name](std::map<std::string, data_type> const&) {
std::cout << "Column by flat name: '" << col_name;
},
[col_name](std::map<std::string, schema_element> const&) {
std::cout << "Column by nested name: #" << col_name;
}},
options.get_dtypes());
std::visit(cudf::detail::visitor_overload{
[column_index](std::vector<data_type> const&) {
std::cout << "Column by index: #" << column_index;
},
[col_name](std::map<std::string, data_type> const&) {
std::cout << "Column by flat name: '" << col_name;
},
[col_name](std::map<std::string, schema_element> const&) {
std::cout << "Column by nested name: #" << col_name;
},
[col_name](schema_element const&) {
std::cout << "Column by nested schema with column order: #" << col_name;
}},
options.get_dtypes());
debug_schema_print(child_schema_element);
#endif

auto const found_it = root_struct_col.child_columns.find(col_name);
if (options.is_enabled_prune_columns() and
found_it == std::end(root_struct_col.child_columns)) {
CUDF_EXPECTS(child_schema_element.has_value(),
"Column name not found in input schema map, but present in column order and "
"prune_columns is enabled");
// inserts all null column
out_column_names.emplace_back(make_column_name_info(child_schema_element.value(), col_name));
auto all_null_column =
make_all_nulls_column(child_schema_element.value(), root_col_size, stream, mr);
out_columns.emplace_back(std::move(all_null_column));
column_index++;
continue;
}
auto& json_col = found_it->second;

if (!options.is_enabled_prune_columns() or child_schema_element.has_value()) {
// Get this JSON column's cudf column and schema info, (modifies json_col)
auto [cudf_col, col_name_info] =
Expand Down
22 changes: 22 additions & 0 deletions cpp/src/io/json/nested_json.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -429,6 +429,28 @@ table_with_metadata device_parse_nested_json(device_span<SymbolT const> input,
rmm::cuda_stream_view stream,
rmm::device_async_resource_ref mr);

/**
* @brief Create an all-null column for a given (possibly nested) schema
*
* @param schema The schema of the column to create
* @param num_rows The number of rows in the column
* @param stream The CUDA stream to which kernels are dispatched
* @param mr Memory resource with which to allocate
* @return The all-null column
*/
std::unique_ptr<column> make_all_nulls_column(schema_element const& schema,
size_type num_rows,
rmm::cuda_stream_view stream,
rmm::device_async_resource_ref mr);

/**
* @brief Create metadata for a column of a given schema
*
* @param schema The schema of the column
* @param col_name The name of the column
* @return Metadata (column_name_info) for the column
karthikeyann marked this conversation as resolved.
Show resolved Hide resolved
*/
column_name_info make_column_name_info(schema_element const& schema, std::string const& col_name);

/**
* @brief Get the path data type of a column by path if present in input schema
*
Expand Down
Loading
Loading