Skip to content

Commit

Permalink
JSON: write struct array nulls as null (#5133)
Browse files Browse the repository at this point in the history
* JSON: write struct array nulls as null

* Fix

* Fix

* Update arrow-json/src/writer.rs

Co-authored-by: Raphael Taylor-Davies <1781103+tustvold@users.noreply.github.com>

* Refactoring

---------

Co-authored-by: Raphael Taylor-Davies <1781103+tustvold@users.noreply.github.com>
  • Loading branch information
Jefffrey and tustvold authored Nov 28, 2023
1 parent c161456 commit ef6932f
Showing 1 changed file with 117 additions and 39 deletions.
156 changes: 117 additions & 39 deletions arrow-json/src/writer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -129,12 +129,14 @@ where
fn struct_array_to_jsonmap_array(
array: &StructArray,
explicit_nulls: bool,
) -> Result<Vec<JsonMap<String, Value>>, ArrowError> {
) -> Result<Vec<Option<JsonMap<String, Value>>>, ArrowError> {
let inner_col_names = array.column_names();

let mut inner_objs = iter::repeat(JsonMap::new())
.take(array.len())
.collect::<Vec<JsonMap<String, Value>>>();
let mut inner_objs = (0..array.len())
// Ensure we write nulls for struct arrays as nulls in JSON
// Instead of writing a struct with nulls
.map(|index| array.is_valid(index).then(JsonMap::new))
.collect::<Vec<Option<JsonMap<String, Value>>>>();

for (j, struct_col) in array.columns().iter().enumerate() {
set_column_for_json_rows(
Expand Down Expand Up @@ -227,7 +229,11 @@ fn array_to_json_array_internal(
.collect(),
DataType::Struct(_) => {
let jsonmaps = struct_array_to_jsonmap_array(array.as_struct(), explicit_nulls)?;
Ok(jsonmaps.into_iter().map(Value::Object).collect())
let json_values = jsonmaps
.into_iter()
.map(|maybe_map| maybe_map.map(Value::Object).unwrap_or(Value::Null))
.collect();
Ok(json_values)
}
DataType::Map(_, _) => as_map_array(array)
.iter()
Expand All @@ -251,6 +257,7 @@ macro_rules! set_column_by_array_type {
$rows
.iter_mut()
.zip(arr.iter())
.filter_map(|(maybe_row, maybe_value)| maybe_row.as_mut().map(|row| (row, maybe_value)))
.for_each(|(row, maybe_value)| {
if let Some(j) = maybe_value.map(Into::into) {
row.insert($col_name.to_string(), j);
Expand All @@ -262,7 +269,7 @@ macro_rules! set_column_by_array_type {
}

fn set_column_by_primitive_type<T>(
rows: &mut [JsonMap<String, Value>],
rows: &mut [Option<JsonMap<String, Value>>],
array: &ArrayRef,
col_name: &str,
explicit_nulls: bool,
Expand All @@ -274,6 +281,7 @@ fn set_column_by_primitive_type<T>(

rows.iter_mut()
.zip(primitive_arr.iter())
.filter_map(|(maybe_row, maybe_value)| maybe_row.as_mut().map(|row| (row, maybe_value)))
.for_each(|(row, maybe_value)| {
if let Some(j) = maybe_value.and_then(|v| v.into_json_value()) {
row.insert(col_name.to_string(), j);
Expand All @@ -284,7 +292,7 @@ fn set_column_by_primitive_type<T>(
}

fn set_column_for_json_rows(
rows: &mut [JsonMap<String, Value>],
rows: &mut [Option<JsonMap<String, Value>>],
array: &ArrayRef,
col_name: &str,
explicit_nulls: bool,
Expand Down Expand Up @@ -325,9 +333,11 @@ fn set_column_for_json_rows(
}
DataType::Null => {
if explicit_nulls {
rows.iter_mut().for_each(|row| {
row.insert(col_name.to_string(), Value::Null);
});
rows.iter_mut()
.filter_map(|maybe_row| maybe_row.as_mut())
.for_each(|row| {
row.insert(col_name.to_string(), Value::Null);
});
}
}
DataType::Boolean => {
Expand All @@ -348,28 +358,43 @@ fn set_column_for_json_rows(
let options = FormatOptions::default();
let formatter = ArrayFormatter::try_new(array.as_ref(), &options)?;
let nulls = array.nulls();
rows.iter_mut().enumerate().for_each(|(idx, row)| {
let maybe_value = nulls
.map(|x| x.is_valid(idx))
.unwrap_or(true)
.then(|| formatter.value(idx).to_string().into());
if let Some(j) = maybe_value {
row.insert(col_name.to_string(), j);
} else if explicit_nulls {
row.insert(col_name.to_string(), Value::Null);
};
});
rows.iter_mut()
.enumerate()
.filter_map(|(idx, maybe_row)| maybe_row.as_mut().map(|row| (idx, row)))
.for_each(|(idx, row)| {
let maybe_value = nulls
.map(|x| x.is_valid(idx))
.unwrap_or(true)
.then(|| formatter.value(idx).to_string().into());
if let Some(j) = maybe_value {
row.insert(col_name.to_string(), j);
} else if explicit_nulls {
row.insert(col_name.to_string(), Value::Null);
}
});
}
DataType::Struct(_) => {
let inner_objs = struct_array_to_jsonmap_array(array.as_struct(), explicit_nulls)?;
rows.iter_mut().zip(inner_objs).for_each(|(row, obj)| {
row.insert(col_name.to_string(), Value::Object(obj));
});
rows.iter_mut()
.zip(inner_objs)
.filter_map(|(maybe_row, maybe_obj)| maybe_row.as_mut().map(|row| (row, maybe_obj)))
.for_each(|(row, maybe_obj)| {
let json = if let Some(obj) = maybe_obj {
Value::Object(obj)
} else {
Value::Null
};
row.insert(col_name.to_string(), json);
});
}
DataType::List(_) => {
let listarr = as_list_array(array);
rows.iter_mut().zip(listarr.iter()).try_for_each(
|(row, maybe_value)| -> Result<(), ArrowError> {
rows.iter_mut()
.zip(listarr.iter())
.filter_map(|(maybe_row, maybe_value)| {
maybe_row.as_mut().map(|row| (row, maybe_value))
})
.try_for_each(|(row, maybe_value)| -> Result<(), ArrowError> {
let maybe_value = maybe_value
.map(|v| array_to_json_array_internal(&v, explicit_nulls).map(Value::Array))
.transpose()?;
Expand All @@ -379,13 +404,16 @@ fn set_column_for_json_rows(
row.insert(col_name.to_string(), Value::Null);
}
Ok(())
},
)?;
})?;
}
DataType::LargeList(_) => {
let listarr = as_large_list_array(array);
rows.iter_mut().zip(listarr.iter()).try_for_each(
|(row, maybe_value)| -> Result<(), ArrowError> {
rows.iter_mut()
.zip(listarr.iter())
.filter_map(|(maybe_row, maybe_value)| {
maybe_row.as_mut().map(|row| (row, maybe_value))
})
.try_for_each(|(row, maybe_value)| -> Result<(), ArrowError> {
let maybe_value = maybe_value
.map(|v| array_to_json_array_internal(&v, explicit_nulls).map(Value::Array))
.transpose()?;
Expand All @@ -395,8 +423,7 @@ fn set_column_for_json_rows(
row.insert(col_name.to_string(), Value::Null);
}
Ok(())
},
)?;
})?;
}
DataType::Dictionary(_, value_type) => {
let hydrated = arrow_cast::cast::cast(&array, value_type)
Expand All @@ -422,7 +449,11 @@ fn set_column_for_json_rows(

let mut kv = keys.iter().zip(values);

for (i, row) in rows.iter_mut().enumerate() {
for (i, row) in rows
.iter_mut()
.enumerate()
.filter_map(|(i, maybe_row)| maybe_row.as_mut().map(|row| (i, row)))
{
if maparr.is_null(i) {
row.insert(col_name.to_string(), serde_json::Value::Null);
continue;
Expand Down Expand Up @@ -461,7 +492,7 @@ fn record_batches_to_json_rows_internal(
batches: &[&RecordBatch],
explicit_nulls: bool,
) -> Result<Vec<JsonMap<String, Value>>, ArrowError> {
let mut rows: Vec<JsonMap<String, Value>> = iter::repeat(JsonMap::new())
let mut rows: Vec<Option<JsonMap<String, Value>>> = iter::repeat(Some(JsonMap::new()))
.take(batches.iter().map(|b| b.num_rows()).sum())
.collect();

Expand All @@ -479,6 +510,7 @@ fn record_batches_to_json_rows_internal(
}
}

let rows = rows.into_iter().map(|a| a.unwrap()).collect::<Vec<_>>();
Ok(rows)
}

Expand Down Expand Up @@ -1478,18 +1510,64 @@ mod tests {
writer.write_batches(&[&batch]).unwrap();
}

// NOTE: The last value should technically be {"list": [null]} but it appears
// that implementations differ on the treatment of a null struct.
// It would be more accurate to return a null struct, so this can be done
// as a follow up.
assert_json_eq(
&buf,
r#"{"list":[{"ints":1}]}
{"list":[{}]}
{"list":[]}
{}
{"list":[{}]}
{"list":[{}]}
{"list":[null]}
"#,
);
}

#[test]
fn json_struct_array_nulls() {
let inner = ListArray::from_iter_primitive::<Int32Type, _, _>(vec![
Some(vec![Some(1), Some(2)]),
Some(vec![None]),
Some(vec![]),
Some(vec![Some(3), None]), // masked for a
Some(vec![Some(4), Some(5)]),
None, // masked for a
None,
]);

let field = Arc::new(Field::new("list", inner.data_type().clone(), true));
let array = Arc::new(inner) as ArrayRef;
let struct_array_a = StructArray::from((
vec![(field.clone(), array.clone())],
Buffer::from([0b01010111]),
));
let struct_array_b = StructArray::from(vec![(field, array)]);

let schema = Schema::new(vec![
Field::new_struct("a", struct_array_a.fields().clone(), true),
Field::new_struct("b", struct_array_b.fields().clone(), true),
]);

let batch = RecordBatch::try_new(
Arc::new(schema),
vec![Arc::new(struct_array_a), Arc::new(struct_array_b)],
)
.unwrap();

let mut buf = Vec::new();
{
let mut writer = LineDelimitedWriter::new(&mut buf);
writer.write_batches(&[&batch]).unwrap();
}

assert_json_eq(
&buf,
r#"{"a":{"list":[1,2]},"b":{"list":[1,2]}}
{"a":{"list":[null]},"b":{"list":[null]}}
{"a":{"list":[]},"b":{"list":[]}}
{"a":null,"b":{"list":[3,null]}}
{"a":{"list":[4,5]},"b":{"list":[4,5]}}
{"a":null,"b":{}}
{"a":{},"b":{}}
"#,
);
}
Expand Down

0 comments on commit ef6932f

Please sign in to comment.