Skip to content

Commit

Permalink
Add support for deserializing list-encoded JSON structs [apache#6558]
Browse files Browse the repository at this point in the history
Currently, a StructArray can only be deserialized from a JSON object
(e.g. `{a: 1, b: "c"}`), but some services (e.g. Presto and Trino)
encode ROW types as JSON lists (e.g. `[1, "c"]`) because this is more
compact, and the schema is known.

This PR adds the ability to parse JSON lists into StructArrays, if the
StructParseMode is set to ListOnly.  In ListOnly mode, object-encoded
structs raise an error.  Setting to ObjectOnly (the default) has the
original parsing behavior.

Some notes/questions/points for discussion:
1. I've made a JsonParseMode struct instead of a bool flag for two
   reasons.  One is that it's self-descriptive (what would `true` be?),
   and the other is that it allows a future Mixed mode that could
   deserialize either.  The latter isn't currently requested by anyone.
2. I kept the error messages as similar to the old messages as possible.
   I considered having more specific error messages (like "Encountered a
   '[' when parsing a Struct, but the StructParseMode is ObjectOnly" or
   similar), but wanted to hear opinions before I went that route.
3. I'm not attached to any name/code-style/etc, so happy to modify to
   fit local conventions.

Fixes apache#6558
  • Loading branch information
jagill committed Oct 29, 2024
1 parent 9f889aa commit 8fc7592
Show file tree
Hide file tree
Showing 4 changed files with 322 additions and 33 deletions.
4 changes: 3 additions & 1 deletion arrow-json/src/reader/list_array.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@
// under the License.

use crate::reader::tape::{Tape, TapeElement};
use crate::reader::{make_decoder, ArrayDecoder};
use crate::reader::{make_decoder, ArrayDecoder, StructParseMode};
use arrow_array::builder::{BooleanBufferBuilder, BufferBuilder};
use arrow_array::OffsetSizeTrait;
use arrow_buffer::buffer::NullBuffer;
Expand All @@ -37,6 +37,7 @@ impl<O: OffsetSizeTrait> ListArrayDecoder<O> {
coerce_primitive: bool,
strict_mode: bool,
is_nullable: bool,
struct_parse_mode: StructParseMode,
) -> Result<Self, ArrowError> {
let field = match &data_type {
DataType::List(f) if !O::IS_LARGE => f,
Expand All @@ -48,6 +49,7 @@ impl<O: OffsetSizeTrait> ListArrayDecoder<O> {
coerce_primitive,
strict_mode,
field.is_nullable(),
struct_parse_mode,
)?;

Ok(Self {
Expand Down
5 changes: 4 additions & 1 deletion arrow-json/src/reader/map_array.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@
// under the License.

use crate::reader::tape::{Tape, TapeElement};
use crate::reader::{make_decoder, ArrayDecoder};
use crate::reader::{make_decoder, ArrayDecoder, StructParseMode};
use arrow_array::builder::{BooleanBufferBuilder, BufferBuilder};
use arrow_buffer::buffer::NullBuffer;
use arrow_buffer::ArrowNativeType;
Expand All @@ -36,6 +36,7 @@ impl MapArrayDecoder {
coerce_primitive: bool,
strict_mode: bool,
is_nullable: bool,
struct_parse_mode: StructParseMode,
) -> Result<Self, ArrowError> {
let fields = match &data_type {
DataType::Map(_, true) => {
Expand All @@ -59,12 +60,14 @@ impl MapArrayDecoder {
coerce_primitive,
strict_mode,
fields[0].is_nullable(),
struct_parse_mode,
)?;
let values = make_decoder(
fields[1].data_type().clone(),
coerce_primitive,
strict_mode,
fields[1].is_nullable(),
struct_parse_mode,
)?;

Ok(Self {
Expand Down
264 changes: 258 additions & 6 deletions arrow-json/src/reader/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -170,12 +170,30 @@ mod struct_array;
mod tape;
mod timestamp_array;

/// Specifies what is considered valie JSON when parsing StructArrays.
///
/// If a struct with fields `("a", Int32)` and `("b", Utf8)`, it could be represented as
/// a JSON object (`{"a": 1, "b": "c"}`) or a JSON list (`[1, "c"]`). This enum controls
/// which form(s) the Reader will accept.
///
/// For objects, the order of the key does not matter. (??? Extra keys?)
/// For lists, the entries must be the same number and in the same order as the struct fields.
#[derive(Copy, Clone, Debug, Default, PartialEq, Eq)]
pub enum StructParseMode {
#[default]
/// Only parse objects (e.g., {"a": 1, "b": "c"})
ObjectOnly,
/// Only parse lists (e.g., [1, "c"])
ListOnly,
}

/// A builder for [`Reader`] and [`Decoder`]
pub struct ReaderBuilder {
batch_size: usize,
coerce_primitive: bool,
strict_mode: bool,
is_field: bool,
struct_parse_mode: StructParseMode,

schema: SchemaRef,
}
Expand All @@ -195,6 +213,7 @@ impl ReaderBuilder {
coerce_primitive: false,
strict_mode: false,
is_field: false,
struct_parse_mode: StructParseMode::ObjectOnly,
schema,
}
}
Expand Down Expand Up @@ -235,6 +254,7 @@ impl ReaderBuilder {
coerce_primitive: false,
strict_mode: false,
is_field: true,
struct_parse_mode: StructParseMode::ObjectOnly,
schema: Arc::new(Schema::new([field.into()])),
}
}
Expand Down Expand Up @@ -269,6 +289,15 @@ impl ReaderBuilder {
}
}

/// Set the [`StructParseMode`] for the reader, which determines whether
/// structs can be represented by JSON objects, lists, or either.
pub fn with_struct_parse_mode(self, struct_parse_mode: StructParseMode) -> Self {
Self {
struct_parse_mode,
..self
}
}

/// Create a [`Reader`] with the provided [`BufRead`]
pub fn build<R: BufRead>(self, reader: R) -> Result<Reader<R>, ArrowError> {
Ok(Reader {
Expand All @@ -287,7 +316,13 @@ impl ReaderBuilder {
}
};

let decoder = make_decoder(data_type, self.coerce_primitive, self.strict_mode, nullable)?;
let decoder = make_decoder(
data_type,
self.coerce_primitive,
self.strict_mode,
nullable,
self.struct_parse_mode,
)?;

let num_fields = self.schema.flattened_fields().len();

Expand Down Expand Up @@ -650,6 +685,7 @@ fn make_decoder(
coerce_primitive: bool,
strict_mode: bool,
is_nullable: bool,
struct_parse_mode: StructParseMode,
) -> Result<Box<dyn ArrayDecoder>, ArrowError> {
downcast_integer! {
data_type => (primitive_decoder, data_type),
Expand Down Expand Up @@ -696,13 +732,13 @@ fn make_decoder(
DataType::Boolean => Ok(Box::<BooleanArrayDecoder>::default()),
DataType::Utf8 => Ok(Box::new(StringArrayDecoder::<i32>::new(coerce_primitive))),
DataType::LargeUtf8 => Ok(Box::new(StringArrayDecoder::<i64>::new(coerce_primitive))),
DataType::List(_) => Ok(Box::new(ListArrayDecoder::<i32>::new(data_type, coerce_primitive, strict_mode, is_nullable)?)),
DataType::LargeList(_) => Ok(Box::new(ListArrayDecoder::<i64>::new(data_type, coerce_primitive, strict_mode, is_nullable)?)),
DataType::Struct(_) => Ok(Box::new(StructArrayDecoder::new(data_type, coerce_primitive, strict_mode, is_nullable)?)),
DataType::List(_) => Ok(Box::new(ListArrayDecoder::<i32>::new(data_type, coerce_primitive, strict_mode, is_nullable, struct_parse_mode)?)),
DataType::LargeList(_) => Ok(Box::new(ListArrayDecoder::<i64>::new(data_type, coerce_primitive, strict_mode, is_nullable, struct_parse_mode)?)),
DataType::Struct(_) => Ok(Box::new(StructArrayDecoder::new(data_type, coerce_primitive, strict_mode, is_nullable, struct_parse_mode)?)),
DataType::Binary | DataType::LargeBinary | DataType::FixedSizeBinary(_) => {
Err(ArrowError::JsonError(format!("{data_type} is not supported by JSON")))
}
DataType::Map(_, _) => Ok(Box::new(MapArrayDecoder::new(data_type, coerce_primitive, strict_mode, is_nullable)?)),
DataType::Map(_, _) => Ok(Box::new(MapArrayDecoder::new(data_type, coerce_primitive, strict_mode, is_nullable, struct_parse_mode)?)),
d => Err(ArrowError::NotYetImplemented(format!("Support for {d} in JSON reader")))
}
}
Expand All @@ -718,7 +754,7 @@ mod tests {
use arrow_buffer::{ArrowNativeType, Buffer};
use arrow_cast::display::{ArrayFormatter, FormatOptions};
use arrow_data::ArrayDataBuilder;
use arrow_schema::Field;
use arrow_schema::{Field, Fields};

use super::*;

Expand Down Expand Up @@ -2316,4 +2352,220 @@ mod tests {
.unwrap()
);
}

#[test]
fn test_struct_decoding_list_length() {
use arrow_array::array;

let row = "[1, 2]";

let mut fields = vec![Field::new("a", DataType::Int32, true)];
let too_few_fields = Fields::from(fields.clone());
fields.push(Field::new("b", DataType::Int32, true));
let correct_fields = Fields::from(fields.clone());
fields.push(Field::new("c", DataType::Int32, true));
let too_many_fields = Fields::from(fields.clone());

let parse = |fields: Fields, as_field: bool| {
let builder = if as_field {
ReaderBuilder::new_with_field(Field::new("r", DataType::Struct(fields), true))
} else {
ReaderBuilder::new(Arc::new(Schema::new(fields)))
};
builder
.with_struct_parse_mode(StructParseMode::ListOnly)
.build(Cursor::new(row.as_bytes()))
.unwrap()
.next()
.unwrap()
};

let expected_row = StructArray::new(
correct_fields.clone(),
vec![
Arc::new(array::Int32Array::from(vec![1])),
Arc::new(array::Int32Array::from(vec![2])),
],
None,
);
let row_field = Field::new("r", DataType::Struct(correct_fields.clone()), true);

assert_eq!(
parse(too_few_fields.clone(), true).unwrap_err().to_string(),
"Json error: found extra columns for 1 fields".to_string()
);
assert_eq!(
parse(too_few_fields, false).unwrap_err().to_string(),
"Json error: found extra columns for 1 fields".to_string()
);
assert_eq!(
parse(correct_fields.clone(), true).unwrap(),
RecordBatch::try_new(
Arc::new(Schema::new(vec![row_field])),
vec![Arc::new(expected_row.clone())]
)
.unwrap()
);
assert_eq!(
parse(correct_fields, false).unwrap(),
RecordBatch::from(expected_row)
);
assert_eq!(
parse(too_many_fields.clone(), true)
.unwrap_err()
.to_string(),
"Json error: found 2 columns for 3 fields".to_string()
);
assert_eq!(
parse(too_many_fields, false).unwrap_err().to_string(),
"Json error: found 2 columns for 3 fields".to_string()
);
}

#[test]
fn test_struct_decoding() {
use arrow_array::builder;

let nested_object_json = r#"{"a": {"b": [1, 2], "c": {"d": 3}}}"#;
let nested_list_json = r#"[[[1, 2], {"d": 3}]]"#;
let nested_mixed_json = r#"{"a": [[1, 2], {"d": 3}]}"#;

let struct_fields = Fields::from(vec![
Field::new("b", DataType::new_list(DataType::Int32, true), true),
Field::new_map(
"c",
"entries",
Field::new("keys", DataType::Utf8, false),
Field::new("values", DataType::Int32, true),
false,
false,
),
]);

let list_array =
ListArray::from_iter_primitive::<Int32Type, _, _>(vec![Some(vec![Some(1), Some(2)])]);

let map_array = {
let mut map_builder = builder::MapBuilder::new(
None,
builder::StringBuilder::new(),
builder::Int32Builder::new(),
);
map_builder.keys().append_value("d");
map_builder.values().append_value(3);
map_builder.append(true).unwrap();
map_builder.finish()
};

let struct_array = StructArray::new(
struct_fields.clone(),
vec![Arc::new(list_array), Arc::new(map_array)],
None,
);

let schema = Arc::new(Schema::new(vec![Field::new(
"a",
DataType::Struct(struct_fields),
true,
)]));
let expected = RecordBatch::try_new(schema.clone(), vec![Arc::new(struct_array)]).unwrap();

let parse = |s: &str, mode: StructParseMode| {
ReaderBuilder::new(schema.clone())
.with_struct_parse_mode(mode)
.build(Cursor::new(s.as_bytes()))
.unwrap()
.next()
.unwrap()
};

assert_eq!(
parse(nested_object_json, StructParseMode::ObjectOnly).unwrap(),
expected
);
assert_eq!(
parse(nested_list_json, StructParseMode::ObjectOnly)
.unwrap_err()
.to_string(),
"Json error: expected { got [[[1, 2], {\"d\": 3}]]".to_owned()
);
assert_eq!(
parse(nested_mixed_json, StructParseMode::ObjectOnly)
.unwrap_err()
.to_string(),
"Json error: whilst decoding field 'a': expected { got [[1, 2], {\"d\": 3}]".to_owned()
);

assert_eq!(
parse(nested_list_json, StructParseMode::ListOnly).unwrap(),
expected
);
assert_eq!(
parse(nested_object_json, StructParseMode::ListOnly)
.unwrap_err()
.to_string(),
"Json error: expected [ got {\"a\": {\"b\": [1, 2]\"c\": {\"d\": 3}}}".to_owned()
);
assert_eq!(
parse(nested_mixed_json, StructParseMode::ListOnly)
.unwrap_err()
.to_string(),
"Json error: expected [ got {\"a\": [[1, 2], {\"d\": 3}]}".to_owned()
);
}

// Test cases:
// [] -> RecordBatch row with no entries. Schema = [('a', Int32)] -> Error
// [] -> RecordBatch row with no entries. Schema = [('r', [('a', Int32)])] -> Error
// [] -> StructArray row with no entries. Fields [('a', Int32')] -> Error
// [[]] -> RecordBatch row with empty struct entry. Schema = [('r', [('a', Int32)])] -> Error
#[test]
fn test_struct_decoding_empty_list() {
let int_field = Field::new("a", DataType::Int32, true);
let struct_field = Field::new(
"r",
DataType::Struct(Fields::from(vec![int_field.clone()])),
true,
);

let parse = |json: &str, as_field: bool, field: Field| {
let builder = if as_field {
ReaderBuilder::new_with_field(field.clone())
} else {
ReaderBuilder::new(Arc::new(Schema::new(vec![field].clone())))
};
builder
.with_struct_parse_mode(StructParseMode::ListOnly)
.build(Cursor::new(json.as_bytes()))
.unwrap()
.next()
.unwrap()
};

assert_eq!(
parse("[]", true, struct_field.clone())
.unwrap_err()
.to_string(),
"Json error: found 0 columns for 1 fields".to_owned()
);
assert_eq!(
parse("[]", false, int_field.clone())
.unwrap_err()
.to_string(),
"Json error: found 0 columns for 1 fields".to_owned()
);
assert_eq!(
parse("[]", false, struct_field.clone())
.unwrap_err()
.to_string(),
"Json error: found 0 columns for 1 fields".to_owned()
);

assert_eq!(
parse("[[]]", false, struct_field.clone())
.unwrap_err()
.to_string(),
"Json error: whilst decoding field 'r': found 0 columns for 1 fields".to_owned()
);
}
}
Loading

0 comments on commit 8fc7592

Please sign in to comment.