diff --git a/arrow-json/src/reader/list_array.rs b/arrow-json/src/reader/list_array.rs index b6f8c18ea9c..92484e23c06 100644 --- a/arrow-json/src/reader/list_array.rs +++ b/arrow-json/src/reader/list_array.rs @@ -16,7 +16,7 @@ // under the License. use crate::reader::tape::{Tape, TapeElement}; -use crate::reader::{make_decoder, ArrayDecoder}; +use crate::reader::{make_decoder, ArrayDecoder, StructParseMode}; use arrow_array::builder::{BooleanBufferBuilder, BufferBuilder}; use arrow_array::OffsetSizeTrait; use arrow_buffer::buffer::NullBuffer; @@ -37,6 +37,7 @@ impl ListArrayDecoder { coerce_primitive: bool, strict_mode: bool, is_nullable: bool, + struct_parse_mode: StructParseMode, ) -> Result { let field = match &data_type { DataType::List(f) if !O::IS_LARGE => f, @@ -48,6 +49,7 @@ impl ListArrayDecoder { coerce_primitive, strict_mode, field.is_nullable(), + struct_parse_mode, )?; Ok(Self { diff --git a/arrow-json/src/reader/map_array.rs b/arrow-json/src/reader/map_array.rs index cd1ca5f71fa..4433dad05c0 100644 --- a/arrow-json/src/reader/map_array.rs +++ b/arrow-json/src/reader/map_array.rs @@ -16,7 +16,7 @@ // under the License. use crate::reader::tape::{Tape, TapeElement}; -use crate::reader::{make_decoder, ArrayDecoder}; +use crate::reader::{make_decoder, ArrayDecoder, StructParseMode}; use arrow_array::builder::{BooleanBufferBuilder, BufferBuilder}; use arrow_buffer::buffer::NullBuffer; use arrow_buffer::ArrowNativeType; @@ -36,6 +36,7 @@ impl MapArrayDecoder { coerce_primitive: bool, strict_mode: bool, is_nullable: bool, + struct_parse_mode: StructParseMode, ) -> Result { let fields = match &data_type { DataType::Map(_, true) => { @@ -59,12 +60,14 @@ impl MapArrayDecoder { coerce_primitive, strict_mode, fields[0].is_nullable(), + struct_parse_mode, )?; let values = make_decoder( fields[1].data_type().clone(), coerce_primitive, strict_mode, fields[1].is_nullable(), + struct_parse_mode, )?; Ok(Self { diff --git a/arrow-json/src/reader/mod.rs b/arrow-json/src/reader/mod.rs index bcacf6f706b..81dd593a464 100644 --- a/arrow-json/src/reader/mod.rs +++ b/arrow-json/src/reader/mod.rs @@ -170,12 +170,30 @@ mod struct_array; mod tape; mod timestamp_array; +/// Specifies what is considered valid JSON when parsing StructArrays. +/// +/// If a struct with fields `("a", Int32)` and `("b", Utf8)`, it could be represented as +/// a JSON object (`{"a": 1, "b": "c"}`) or a JSON list (`[1, "c"]`). This enum controls +/// which form(s) the Reader will accept. +/// +/// For objects, the order of the key does not matter. +/// For lists, the entries must be the same number and in the same order as the struct fields. +#[derive(Copy, Clone, Debug, Default, PartialEq, Eq)] +pub enum StructParseMode { + #[default] + /// Only parse objects (e.g., {"a": 1, "b": "c"}) + ObjectOnly, + /// Only parse lists (e.g., [1, "c"]) + ListOnly, +} + /// A builder for [`Reader`] and [`Decoder`] pub struct ReaderBuilder { batch_size: usize, coerce_primitive: bool, strict_mode: bool, is_field: bool, + struct_parse_mode: StructParseMode, schema: SchemaRef, } @@ -195,6 +213,7 @@ impl ReaderBuilder { coerce_primitive: false, strict_mode: false, is_field: false, + struct_parse_mode: StructParseMode::ObjectOnly, schema, } } @@ -235,6 +254,7 @@ impl ReaderBuilder { coerce_primitive: false, strict_mode: false, is_field: true, + struct_parse_mode: StructParseMode::ObjectOnly, schema: Arc::new(Schema::new([field.into()])), } } @@ -269,6 +289,15 @@ impl ReaderBuilder { } } + /// Set the [`StructParseMode`] for the reader, which determines whether + /// structs can be represented by JSON objects, lists, or either. + pub fn with_struct_parse_mode(self, struct_parse_mode: StructParseMode) -> Self { + Self { + struct_parse_mode, + ..self + } + } + /// Create a [`Reader`] with the provided [`BufRead`] pub fn build(self, reader: R) -> Result, ArrowError> { Ok(Reader { @@ -287,7 +316,13 @@ impl ReaderBuilder { } }; - let decoder = make_decoder(data_type, self.coerce_primitive, self.strict_mode, nullable)?; + let decoder = make_decoder( + data_type, + self.coerce_primitive, + self.strict_mode, + nullable, + self.struct_parse_mode, + )?; let num_fields = self.schema.flattened_fields().len(); @@ -650,6 +685,7 @@ fn make_decoder( coerce_primitive: bool, strict_mode: bool, is_nullable: bool, + struct_parse_mode: StructParseMode, ) -> Result, ArrowError> { downcast_integer! { data_type => (primitive_decoder, data_type), @@ -696,13 +732,13 @@ fn make_decoder( DataType::Boolean => Ok(Box::::default()), DataType::Utf8 => Ok(Box::new(StringArrayDecoder::::new(coerce_primitive))), DataType::LargeUtf8 => Ok(Box::new(StringArrayDecoder::::new(coerce_primitive))), - DataType::List(_) => Ok(Box::new(ListArrayDecoder::::new(data_type, coerce_primitive, strict_mode, is_nullable)?)), - DataType::LargeList(_) => Ok(Box::new(ListArrayDecoder::::new(data_type, coerce_primitive, strict_mode, is_nullable)?)), - DataType::Struct(_) => Ok(Box::new(StructArrayDecoder::new(data_type, coerce_primitive, strict_mode, is_nullable)?)), + DataType::List(_) => Ok(Box::new(ListArrayDecoder::::new(data_type, coerce_primitive, strict_mode, is_nullable, struct_parse_mode)?)), + DataType::LargeList(_) => Ok(Box::new(ListArrayDecoder::::new(data_type, coerce_primitive, strict_mode, is_nullable, struct_parse_mode)?)), + DataType::Struct(_) => Ok(Box::new(StructArrayDecoder::new(data_type, coerce_primitive, strict_mode, is_nullable, struct_parse_mode)?)), DataType::Binary | DataType::LargeBinary | DataType::FixedSizeBinary(_) => { Err(ArrowError::JsonError(format!("{data_type} is not supported by JSON"))) } - DataType::Map(_, _) => Ok(Box::new(MapArrayDecoder::new(data_type, coerce_primitive, strict_mode, is_nullable)?)), + DataType::Map(_, _) => Ok(Box::new(MapArrayDecoder::new(data_type, coerce_primitive, strict_mode, is_nullable, struct_parse_mode)?)), d => Err(ArrowError::NotYetImplemented(format!("Support for {d} in JSON reader"))) } } @@ -718,7 +754,7 @@ mod tests { use arrow_buffer::{ArrowNativeType, Buffer}; use arrow_cast::display::{ArrayFormatter, FormatOptions}; use arrow_data::ArrayDataBuilder; - use arrow_schema::Field; + use arrow_schema::{Field, Fields}; use super::*; @@ -2316,4 +2352,220 @@ mod tests { .unwrap() ); } + + #[test] + fn test_struct_decoding_list_length() { + use arrow_array::array; + + let row = "[1, 2]"; + + let mut fields = vec![Field::new("a", DataType::Int32, true)]; + let too_few_fields = Fields::from(fields.clone()); + fields.push(Field::new("b", DataType::Int32, true)); + let correct_fields = Fields::from(fields.clone()); + fields.push(Field::new("c", DataType::Int32, true)); + let too_many_fields = Fields::from(fields.clone()); + + let parse = |fields: Fields, as_field: bool| { + let builder = if as_field { + ReaderBuilder::new_with_field(Field::new("r", DataType::Struct(fields), true)) + } else { + ReaderBuilder::new(Arc::new(Schema::new(fields))) + }; + builder + .with_struct_parse_mode(StructParseMode::ListOnly) + .build(Cursor::new(row.as_bytes())) + .unwrap() + .next() + .unwrap() + }; + + let expected_row = StructArray::new( + correct_fields.clone(), + vec![ + Arc::new(array::Int32Array::from(vec![1])), + Arc::new(array::Int32Array::from(vec![2])), + ], + None, + ); + let row_field = Field::new("r", DataType::Struct(correct_fields.clone()), true); + + assert_eq!( + parse(too_few_fields.clone(), true).unwrap_err().to_string(), + "Json error: found extra columns for 1 fields".to_string() + ); + assert_eq!( + parse(too_few_fields, false).unwrap_err().to_string(), + "Json error: found extra columns for 1 fields".to_string() + ); + assert_eq!( + parse(correct_fields.clone(), true).unwrap(), + RecordBatch::try_new( + Arc::new(Schema::new(vec![row_field])), + vec![Arc::new(expected_row.clone())] + ) + .unwrap() + ); + assert_eq!( + parse(correct_fields, false).unwrap(), + RecordBatch::from(expected_row) + ); + assert_eq!( + parse(too_many_fields.clone(), true) + .unwrap_err() + .to_string(), + "Json error: found 2 columns for 3 fields".to_string() + ); + assert_eq!( + parse(too_many_fields, false).unwrap_err().to_string(), + "Json error: found 2 columns for 3 fields".to_string() + ); + } + + #[test] + fn test_struct_decoding() { + use arrow_array::builder; + + let nested_object_json = r#"{"a": {"b": [1, 2], "c": {"d": 3}}}"#; + let nested_list_json = r#"[[[1, 2], {"d": 3}]]"#; + let nested_mixed_json = r#"{"a": [[1, 2], {"d": 3}]}"#; + + let struct_fields = Fields::from(vec![ + Field::new("b", DataType::new_list(DataType::Int32, true), true), + Field::new_map( + "c", + "entries", + Field::new("keys", DataType::Utf8, false), + Field::new("values", DataType::Int32, true), + false, + false, + ), + ]); + + let list_array = + ListArray::from_iter_primitive::(vec![Some(vec![Some(1), Some(2)])]); + + let map_array = { + let mut map_builder = builder::MapBuilder::new( + None, + builder::StringBuilder::new(), + builder::Int32Builder::new(), + ); + map_builder.keys().append_value("d"); + map_builder.values().append_value(3); + map_builder.append(true).unwrap(); + map_builder.finish() + }; + + let struct_array = StructArray::new( + struct_fields.clone(), + vec![Arc::new(list_array), Arc::new(map_array)], + None, + ); + + let schema = Arc::new(Schema::new(vec![Field::new( + "a", + DataType::Struct(struct_fields), + true, + )])); + let expected = RecordBatch::try_new(schema.clone(), vec![Arc::new(struct_array)]).unwrap(); + + let parse = |s: &str, mode: StructParseMode| { + ReaderBuilder::new(schema.clone()) + .with_struct_parse_mode(mode) + .build(Cursor::new(s.as_bytes())) + .unwrap() + .next() + .unwrap() + }; + + assert_eq!( + parse(nested_object_json, StructParseMode::ObjectOnly).unwrap(), + expected + ); + assert_eq!( + parse(nested_list_json, StructParseMode::ObjectOnly) + .unwrap_err() + .to_string(), + "Json error: expected { got [[[1, 2], {\"d\": 3}]]".to_owned() + ); + assert_eq!( + parse(nested_mixed_json, StructParseMode::ObjectOnly) + .unwrap_err() + .to_string(), + "Json error: whilst decoding field 'a': expected { got [[1, 2], {\"d\": 3}]".to_owned() + ); + + assert_eq!( + parse(nested_list_json, StructParseMode::ListOnly).unwrap(), + expected + ); + assert_eq!( + parse(nested_object_json, StructParseMode::ListOnly) + .unwrap_err() + .to_string(), + "Json error: expected [ got {\"a\": {\"b\": [1, 2]\"c\": {\"d\": 3}}}".to_owned() + ); + assert_eq!( + parse(nested_mixed_json, StructParseMode::ListOnly) + .unwrap_err() + .to_string(), + "Json error: expected [ got {\"a\": [[1, 2], {\"d\": 3}]}".to_owned() + ); + } + + // Test cases: + // [] -> RecordBatch row with no entries. Schema = [('a', Int32)] -> Error + // [] -> RecordBatch row with no entries. Schema = [('r', [('a', Int32)])] -> Error + // [] -> StructArray row with no entries. Fields [('a', Int32')] -> Error + // [[]] -> RecordBatch row with empty struct entry. Schema = [('r', [('a', Int32)])] -> Error + #[test] + fn test_struct_decoding_empty_list() { + let int_field = Field::new("a", DataType::Int32, true); + let struct_field = Field::new( + "r", + DataType::Struct(Fields::from(vec![int_field.clone()])), + true, + ); + + let parse = |json: &str, as_field: bool, field: Field| { + let builder = if as_field { + ReaderBuilder::new_with_field(field.clone()) + } else { + ReaderBuilder::new(Arc::new(Schema::new(vec![field].clone()))) + }; + builder + .with_struct_parse_mode(StructParseMode::ListOnly) + .build(Cursor::new(json.as_bytes())) + .unwrap() + .next() + .unwrap() + }; + + assert_eq!( + parse("[]", true, struct_field.clone()) + .unwrap_err() + .to_string(), + "Json error: found 0 columns for 1 fields".to_owned() + ); + assert_eq!( + parse("[]", false, int_field.clone()) + .unwrap_err() + .to_string(), + "Json error: found 0 columns for 1 fields".to_owned() + ); + assert_eq!( + parse("[]", false, struct_field.clone()) + .unwrap_err() + .to_string(), + "Json error: found 0 columns for 1 fields".to_owned() + ); + + assert_eq!( + parse("[[]]", false, struct_field.clone()) + .unwrap_err() + .to_string(), + "Json error: whilst decoding field 'r': found 0 columns for 1 fields".to_owned() + ); + } } diff --git a/arrow-json/src/reader/struct_array.rs b/arrow-json/src/reader/struct_array.rs index 6c805591d39..9931744aaf8 100644 --- a/arrow-json/src/reader/struct_array.rs +++ b/arrow-json/src/reader/struct_array.rs @@ -16,7 +16,7 @@ // under the License. use crate::reader::tape::{Tape, TapeElement}; -use crate::reader::{make_decoder, ArrayDecoder}; +use crate::reader::{make_decoder, ArrayDecoder, StructParseMode}; use arrow_array::builder::BooleanBufferBuilder; use arrow_buffer::buffer::NullBuffer; use arrow_data::{ArrayData, ArrayDataBuilder}; @@ -27,6 +27,7 @@ pub struct StructArrayDecoder { decoders: Vec>, strict_mode: bool, is_nullable: bool, + struct_parse_mode: StructParseMode, } impl StructArrayDecoder { @@ -35,6 +36,7 @@ impl StructArrayDecoder { coerce_primitive: bool, strict_mode: bool, is_nullable: bool, + struct_parse_mode: StructParseMode, ) -> Result { let decoders = struct_fields(&data_type) .iter() @@ -48,6 +50,7 @@ impl StructArrayDecoder { coerce_primitive, strict_mode, nullable, + struct_parse_mode, ) }) .collect::, ArrowError>>()?; @@ -57,6 +60,7 @@ impl StructArrayDecoder { decoders, strict_mode, is_nullable, + struct_parse_mode, }) } } @@ -71,42 +75,70 @@ impl ArrayDecoder for StructArrayDecoder { .then(|| BooleanBufferBuilder::new(pos.len())); for (row, p) in pos.iter().enumerate() { - let end_idx = match (tape.get(*p), nulls.as_mut()) { - (TapeElement::StartObject(end_idx), None) => end_idx, - (TapeElement::StartObject(end_idx), Some(nulls)) => { + let end_idx = match (tape.get(*p), nulls.as_mut(), self.struct_parse_mode) { + (TapeElement::StartObject(end_idx), None, StructParseMode::ObjectOnly) => end_idx, + (TapeElement::StartObject(end_idx), Some(nulls), StructParseMode::ObjectOnly) => { nulls.append(true); end_idx } - (TapeElement::Null, Some(nulls)) => { + (TapeElement::StartList(end_idx), None, StructParseMode::ListOnly) => end_idx, + (TapeElement::StartList(end_idx), Some(nulls), StructParseMode::ListOnly) => { + nulls.append(true); + end_idx + } + (TapeElement::Null, Some(nulls), _) => { nulls.append(false); continue; } - _ => return Err(tape.error(*p, "{")), + (_, _, StructParseMode::ObjectOnly) => return Err(tape.error(*p, "{")), + (_, _, StructParseMode::ListOnly) => return Err(tape.error(*p, "[")), }; let mut cur_idx = *p + 1; - while cur_idx < end_idx { - // Read field name - let field_name = match tape.get(cur_idx) { - TapeElement::String(s) => tape.get_string(s), - _ => return Err(tape.error(cur_idx, "field name")), - }; - - // Update child pos if match found - match fields.iter().position(|x| x.name() == field_name) { - Some(field_idx) => child_pos[field_idx][row] = cur_idx + 1, - None => { - if self.strict_mode { - return Err(ArrowError::JsonError(format!( - "column '{}' missing from schema", - field_name - ))); + if self.struct_parse_mode == StructParseMode::ObjectOnly { + while cur_idx < end_idx { + // Read field name + let field_name = match tape.get(cur_idx) { + TapeElement::String(s) => tape.get_string(s), + _ => return Err(tape.error(cur_idx, "field name")), + }; + + // Update child pos if match found + match fields.iter().position(|x| x.name() == field_name) { + Some(field_idx) => child_pos[field_idx][row] = cur_idx + 1, + None => { + if self.strict_mode { + return Err(ArrowError::JsonError(format!( + "column '{}' missing from schema", + field_name + ))); + } } } + // Advance to next field + cur_idx = tape.next(cur_idx + 1, "field value")?; + } + } else { + let mut entry_idx = 0; + while cur_idx < end_idx { + if entry_idx >= fields.len() { + return Err(ArrowError::JsonError(format!( + "found extra columns for {} fields", + fields.len() + ))); + } + child_pos[entry_idx][row] = cur_idx; + entry_idx += 1; + // Advance to next field + cur_idx = tape.next(cur_idx, "field value")?; + } + if entry_idx != fields.len() { + return Err(ArrowError::JsonError(format!( + "found {} columns for {} fields", + entry_idx, + fields.len() + ))); } - - // Advance to next field - cur_idx = tape.next(cur_idx + 1, "field value")?; } }