From 552828e3977d23a1bd8feb47cf068c438b31a675 Mon Sep 17 00:00:00 2001 From: Raphael Taylor-Davies Date: Sat, 20 Jan 2024 12:55:45 +0000 Subject: [PATCH] Raw JSON writer (#5314) --- arrow-json/src/writer.rs | 99 +++++--- arrow-json/src/writer/encoder.rs | 418 +++++++++++++++++++++++++++++++ arrow-json/test/data/basic.json | 4 +- 3 files changed, 478 insertions(+), 43 deletions(-) create mode 100644 arrow-json/src/writer/encoder.rs diff --git a/arrow-json/src/writer.rs b/arrow-json/src/writer.rs index cabda5e2dca8..80cb76b2e0fe 100644 --- a/arrow-json/src/writer.rs +++ b/arrow-json/src/writer.rs @@ -20,28 +20,6 @@ //! This JSON writer converts Arrow [`RecordBatch`]es into arrays of //! JSON objects or JSON formatted byte streams. //! -//! ## Writing JSON Objects -//! -//! To serialize [`RecordBatch`]es into array of -//! [JSON](https://docs.serde.rs/serde_json/) objects, use -//! [`record_batches_to_json_rows`]: -//! -//! ``` -//! # use std::sync::Arc; -//! # use arrow_array::{Int32Array, RecordBatch}; -//! # use arrow_schema::{DataType, Field, Schema}; -//! -//! let schema = Schema::new(vec![Field::new("a", DataType::Int32, false)]); -//! let a = Int32Array::from(vec![1, 2, 3]); -//! let batch = RecordBatch::try_new(Arc::new(schema), vec![Arc::new(a)]).unwrap(); -//! -//! let json_rows = arrow_json::writer::record_batches_to_json_rows(&[&batch]).unwrap(); -//! assert_eq!( -//! serde_json::Value::Object(json_rows[1].clone()), -//! serde_json::json!({"a": 2}), -//! ); -//! ``` -//! //! ## Writing JSON formatted byte streams //! //! To serialize [`RecordBatch`]es into line-delimited JSON bytes, use @@ -97,6 +75,8 @@ //! In order to explicitly write null values for keys, configure a custom [`Writer`] by //! using a [`WriterBuilder`] to construct a [`Writer`]. +mod encoder; + use std::iter; use std::{fmt::Debug, io::Write}; @@ -109,7 +89,9 @@ use arrow_array::types::*; use arrow_array::*; use arrow_schema::*; +use crate::writer::encoder::EncoderOptions; use arrow_cast::display::{ArrayFormatter, FormatOptions}; +use encoder::make_encoder; fn primitive_array_to_json(array: &dyn Array) -> Result, ArrowError> where @@ -481,6 +463,7 @@ fn set_column_for_json_rows( /// Converts an arrow [`RecordBatch`] into a `Vec` of Serde JSON /// [`JsonMap`]s (objects) +#[deprecated(note = "Use Writer")] pub fn record_batches_to_json_rows( batches: &[&RecordBatch], ) -> Result>, ArrowError> { @@ -597,11 +580,7 @@ pub type ArrayWriter = Writer; /// JSON writer builder. #[derive(Debug, Clone, Default)] -pub struct WriterBuilder { - /// Controls whether null values should be written explicitly for keys - /// in objects, or whether the key should be omitted entirely. - explicit_nulls: bool, -} +pub struct WriterBuilder(EncoderOptions); impl WriterBuilder { /// Create a new builder for configuring JSON writing options. @@ -629,7 +608,7 @@ impl WriterBuilder { /// Returns `true` if this writer is configured to keep keys with null values. pub fn explicit_nulls(&self) -> bool { - self.explicit_nulls + self.0.explicit_nulls } /// Set whether to keep keys with null values, or to omit writing them. @@ -654,7 +633,7 @@ impl WriterBuilder { /// /// Default is to skip nulls (set to `false`). pub fn with_explicit_nulls(mut self, explicit_nulls: bool) -> Self { - self.explicit_nulls = explicit_nulls; + self.0.explicit_nulls = explicit_nulls; self } @@ -669,7 +648,7 @@ impl WriterBuilder { started: false, finished: false, format: F::default(), - explicit_nulls: self.explicit_nulls, + options: self.0, } } } @@ -703,7 +682,7 @@ where format: F, /// Whether keys with null values should be written or skipped - explicit_nulls: bool, + options: EncoderOptions, } impl Writer @@ -718,11 +697,12 @@ where started: false, finished: false, format: F::default(), - explicit_nulls: false, + options: EncoderOptions::default(), } } /// Write a single JSON row to the output writer + #[deprecated(note = "Use Writer::write")] pub fn write_row(&mut self, row: &Value) -> Result<(), ArrowError> { let is_first_row = !self.started; if !self.started { @@ -738,18 +718,48 @@ where Ok(()) } - /// Convert the `RecordBatch` into JSON rows, and write them to the output + /// Serialize `batch` to JSON output pub fn write(&mut self, batch: &RecordBatch) -> Result<(), ArrowError> { - for row in record_batches_to_json_rows_internal(&[batch], self.explicit_nulls)? { - self.write_row(&Value::Object(row))?; + if batch.num_rows() == 0 { + return Ok(()); } + + // BufWriter uses a buffer size of 8KB + // We therefore double this and flush once we have more than 8KB + let mut buffer = Vec::with_capacity(16 * 1024); + + let mut is_first_row = !self.started; + if !self.started { + self.format.start_stream(&mut buffer)?; + self.started = true; + } + + let array = StructArray::from(batch.clone()); + let mut encoder = make_encoder(&array, &self.options)?; + + for idx in 0..batch.num_rows() { + self.format.start_row(&mut buffer, is_first_row)?; + is_first_row = false; + + encoder.encode(idx, &mut buffer); + if buffer.len() > 8 * 1024 { + self.writer.write_all(&buffer)?; + buffer.clear(); + } + self.format.end_row(&mut buffer)?; + } + + if !buffer.is_empty() { + self.writer.write_all(&buffer)?; + } + Ok(()) } - /// Convert the [`RecordBatch`] into JSON rows, and write them to the output + /// Serialize `batches` to JSON output pub fn write_batches(&mut self, batches: &[&RecordBatch]) -> Result<(), ArrowError> { - for row in record_batches_to_json_rows_internal(batches, self.explicit_nulls)? { - self.write_row(&Value::Object(row))?; + for b in batches { + self.write(b)?; } Ok(()) } @@ -803,6 +813,9 @@ mod tests { /// Asserts that the NDJSON `input` is semantically identical to `expected` fn assert_json_eq(input: &[u8], expected: &str) { + let s = std::str::from_utf8(input).unwrap(); + println!("{s}"); + let expected: Vec> = expected .split('\n') .map(|s| (!s.is_empty()).then(|| serde_json::from_str(s).unwrap())) @@ -1453,6 +1466,7 @@ mod tests { } #[test] + #[allow(deprecated)] fn json_writer_one_row() { let mut writer = ArrayWriter::new(vec![] as Vec); let v = json!({ "an": "object" }); @@ -1465,6 +1479,7 @@ mod tests { } #[test] + #[allow(deprecated)] fn json_writer_two_rows() { let mut writer = ArrayWriter::new(vec![] as Vec); let v = json!({ "an": "object" }); @@ -1564,9 +1579,9 @@ mod tests { r#"{"a":{"list":[1,2]},"b":{"list":[1,2]}} {"a":{"list":[null]},"b":{"list":[null]}} {"a":{"list":[]},"b":{"list":[]}} -{"a":null,"b":{"list":[3,null]}} +{"b":{"list":[3,null]}} {"a":{"list":[4,5]},"b":{"list":[4,5]}} -{"a":null,"b":{}} +{"b":{}} {"a":{},"b":{}} "#, ); @@ -1621,7 +1636,7 @@ mod tests { assert_json_eq( &buf, r#"{"map":{"foo":10}} -{"map":null} +{} {"map":{}} {"map":{"bar":20,"baz":30,"qux":40}} {"map":{"quux":50}} @@ -1918,6 +1933,8 @@ mod tests { writer.finish()?; } + println!("{}", std::str::from_utf8(&buf).unwrap()); + let actual = serde_json::from_slice::>(&buf).unwrap(); let expected = serde_json::from_value::>(json!([ { diff --git a/arrow-json/src/writer/encoder.rs b/arrow-json/src/writer/encoder.rs new file mode 100644 index 000000000000..6c1f9bb538fd --- /dev/null +++ b/arrow-json/src/writer/encoder.rs @@ -0,0 +1,418 @@ +use arrow_array::cast::AsArray; +use arrow_array::types::*; +use arrow_array::*; +use arrow_buffer::{ArrowNativeType, NullBuffer, OffsetBuffer, ScalarBuffer}; +use arrow_cast::display::{ArrayFormatter, FormatOptions}; +use arrow_schema::{ArrowError, DataType, FieldRef}; +use half::f16; +use lexical_core::FormattedSize; +use serde::Serializer; +use std::io::Write; + +#[derive(Debug, Clone, Default)] +pub struct EncoderOptions { + pub explicit_nulls: bool, +} + +pub trait Encoder { + fn encode(&mut self, idx: usize, out: &mut Vec); +} + +pub fn make_encoder<'a>( + array: &'a dyn Array, + options: &EncoderOptions, +) -> Result, ArrowError> { + let (encoder, nulls) = make_encoder_impl(array, options)?; + assert!(nulls.is_none(), "root cannot be nullable"); + Ok(encoder) +} + +fn make_encoder_impl<'a>( + array: &'a dyn Array, + options: &EncoderOptions, +) -> Result<(Box, Option), ArrowError> { + macro_rules! primitive_helper { + ($t:ty) => {{ + let array = array.as_primitive::<$t>(); + let nulls = array.nulls().cloned(); + (Box::new(PrimitiveEncoder::new(array)) as _, nulls) + }}; + } + + Ok(downcast_integer! { + array.data_type() => (primitive_helper), + DataType::Float16 => primitive_helper!(Float16Type), + DataType::Float32 => primitive_helper!(Float32Type), + DataType::Float64 => primitive_helper!(Float64Type), + DataType::Boolean => { + let array = array.as_boolean(); + (Box::new(BooleanEncoder(array.clone())), array.nulls().cloned()) + } + DataType::Null => (Box::new(NullEncoder), array.logical_nulls()), + DataType::Utf8 => { + let array = array.as_string::(); + (Box::new(StringEncoder(array.clone())) as _, array.nulls().cloned()) + } + DataType::LargeUtf8 => { + let array = array.as_string::(); + (Box::new(StringEncoder(array.clone())) as _, array.nulls().cloned()) + } + DataType::List(_) => { + let array = array.as_list::(); + (Box::new(ListEncoder::try_new(array, options)?) as _, array.nulls().cloned()) + } + DataType::LargeList(_) => { + let array = array.as_list::(); + (Box::new(ListEncoder::try_new(array, options)?) as _, array.nulls().cloned()) + } + + DataType::Dictionary(_, _) => downcast_dictionary_array! { + array => (Box::new(DictionaryEncoder::try_new(array, options)?) as _, array.logical_nulls()), + _ => unreachable!() + } + + DataType::Map(_, _) => { + let array = array.as_map(); + (Box::new(MapEncoder::try_new(array, options)?) as _, array.nulls().cloned()) + } + + DataType::Struct(fields) => { + let array = array.as_struct(); + let encoders = fields.iter().zip(array.columns()).map(|(field, array)| { + let (encoder, nulls) = make_encoder_impl(array, options)?; + Ok(FieldEncoder{ + field: field.clone(), + encoder, nulls + }) + }).collect::, ArrowError>>()?; + + let encoder = StructArrayEncoder{ + encoders, + explicit_nulls: options.explicit_nulls, + }; + (Box::new(encoder) as _, array.nulls().cloned()) + } + d => match d.is_temporal() { + true => { + // Note: the implementation assumes that escaping is not necessary + // If this is extended to support user-provided format specifications + // this assumption may need to be revisited + let options = FormatOptions::new().with_display_error(true); + let formatter = ArrayFormatter::try_new(array, &options)?; + (Box::new(formatter) as _, array.nulls().cloned()) + } + false => return Err(ArrowError::InvalidArgumentError(format!("JSON Writer does not support data type: {d}"))), + } + }) +} + +fn encode_string(s: &str, out: &mut Vec) { + let mut serializer = serde_json::Serializer::new(out); + serializer.serialize_str(s).unwrap(); +} + +struct FieldEncoder<'a> { + field: FieldRef, + encoder: Box, + nulls: Option, +} + +struct StructArrayEncoder<'a> { + encoders: Vec>, + explicit_nulls: bool, +} + +impl<'a> Encoder for StructArrayEncoder<'a> { + fn encode(&mut self, idx: usize, out: &mut Vec) { + out.push(b'{'); + let mut is_first = true; + for field_encoder in &mut self.encoders { + let is_null = field_encoder.nulls.as_ref().is_some_and(|n| n.is_null(idx)); + if is_null && !self.explicit_nulls { + continue; + } + + if !is_first { + out.extend_from_slice(b", "); + } + is_first = false; + + encode_string(field_encoder.field.name(), out); + out.extend_from_slice(b": "); + + match is_null { + true => out.extend_from_slice(b"null"), + false => field_encoder.encoder.encode(idx, out), + } + } + out.push(b'}'); + } +} + +trait PrimitiveEncode: ArrowNativeType { + type Buffer; + + // Workaround https://github.com/rust-lang/rust/issues/61415 + fn init_buffer() -> Self::Buffer; + + fn encode(self, buf: &mut Self::Buffer) -> &[u8]; +} + +macro_rules! integer_encode { + ($($t:ty),*) => { + $( + impl PrimitiveEncode for $t { + type Buffer = [u8; Self::FORMATTED_SIZE]; + + fn init_buffer() -> Self::Buffer { + [0; Self::FORMATTED_SIZE] + } + + fn encode(self, buf: &mut Self::Buffer) -> &[u8] { + lexical_core::write(self, buf) + } + } + )* + }; +} +integer_encode!(i8, i16, i32, i64, u8, u16, u32, u64); + +macro_rules! float_encode { + ($($t:ty),*) => { + $( + impl PrimitiveEncode for $t { + type Buffer = [u8; Self::FORMATTED_SIZE]; + + fn init_buffer() -> Self::Buffer { + [0; Self::FORMATTED_SIZE] + } + + fn encode(self, buf: &mut Self::Buffer) -> &[u8] { + if self.is_infinite() || self.is_nan() { + b"null" + } else { + lexical_core::write(self, buf) + } + } + } + )* + }; +} +float_encode!(f32, f64); + +impl PrimitiveEncode for f16 { + type Buffer = ::Buffer; + + fn init_buffer() -> Self::Buffer { + f64::init_buffer() + } + + fn encode(self, buf: &mut Self::Buffer) -> &[u8] { + self.to_f64().encode(buf) + } +} + +struct PrimitiveEncoder { + values: ScalarBuffer, + buffer: N::Buffer, +} + +impl PrimitiveEncoder { + fn new>(array: &PrimitiveArray

) -> Self { + Self { + values: array.values().clone(), + buffer: N::init_buffer(), + } + } +} + +impl Encoder for PrimitiveEncoder { + fn encode(&mut self, idx: usize, out: &mut Vec) { + out.extend_from_slice(self.values[idx].encode(&mut self.buffer)); + } +} + +struct BooleanEncoder(BooleanArray); + +impl Encoder for BooleanEncoder { + fn encode(&mut self, idx: usize, out: &mut Vec) { + match self.0.value(idx) { + true => out.extend_from_slice(b"true"), + false => out.extend_from_slice(b"false"), + } + } +} + +struct StringEncoder(GenericStringArray); + +impl Encoder for StringEncoder { + fn encode(&mut self, idx: usize, out: &mut Vec) { + encode_string(self.0.value(idx), out); + } +} + +struct ListEncoder<'a, O: OffsetSizeTrait> { + offsets: OffsetBuffer, + nulls: Option, + encoder: Box, +} + +impl<'a, O: OffsetSizeTrait> ListEncoder<'a, O> { + fn try_new( + array: &'a GenericListArray, + options: &EncoderOptions, + ) -> Result { + let (encoder, nulls) = make_encoder_impl(array.values().as_ref(), options)?; + Ok(Self { + offsets: array.offsets().clone(), + encoder, + nulls, + }) + } +} + +impl<'a, O: OffsetSizeTrait> Encoder for ListEncoder<'a, O> { + fn encode(&mut self, idx: usize, out: &mut Vec) { + let end = self.offsets[idx + 1].as_usize(); + let start = self.offsets[idx].as_usize(); + out.push(b'['); + match self.nulls.as_ref() { + Some(n) => (start..end).for_each(|idx| { + if idx != start { + out.extend_from_slice(b", ") + } + match n.is_null(idx) { + true => out.extend_from_slice(b"null"), + false => self.encoder.encode(idx, out), + } + }), + None => (start..end).for_each(|idx| { + if idx != start { + out.extend_from_slice(b", ") + } + self.encoder.encode(idx, out); + }), + } + out.push(b']'); + } +} + +struct DictionaryEncoder<'a, K: ArrowDictionaryKeyType> { + keys: ScalarBuffer, + encoder: Box, +} + +impl<'a, K: ArrowDictionaryKeyType> DictionaryEncoder<'a, K> { + fn try_new( + array: &'a DictionaryArray, + options: &EncoderOptions, + ) -> Result { + let encoder = make_encoder(array.values().as_ref(), options)?; + + Ok(Self { + keys: array.keys().values().clone(), + encoder, + }) + } +} + +impl<'a, K: ArrowDictionaryKeyType> Encoder for DictionaryEncoder<'a, K> { + fn encode(&mut self, idx: usize, out: &mut Vec) { + self.encoder.encode(self.keys[idx].as_usize(), out) + } +} + +impl<'a> Encoder for ArrayFormatter<'a> { + fn encode(&mut self, idx: usize, out: &mut Vec) { + out.push(b'"'); + // Should be infallible + // Note: We are making an assumption that the formatter does not produce characters that require escaping + let _ = write!(out, "{}", self.value(idx)); + out.push(b'"') + } +} + +struct NullEncoder; + +impl Encoder for NullEncoder { + fn encode(&mut self, _idx: usize, _out: &mut Vec) { + unreachable!() + } +} + +struct MapEncoder<'a> { + offsets: OffsetBuffer, + keys: Box, + values: Box, + value_nulls: Option, + explicit_nulls: bool, +} + +impl<'a> MapEncoder<'a> { + fn try_new(array: &'a MapArray, options: &EncoderOptions) -> Result { + let values = array.values(); + let keys = array.keys(); + + if !matches!(keys.data_type(), DataType::Utf8 | DataType::LargeUtf8) { + return Err(ArrowError::JsonError(format!( + "Only UTF8 keys supported by JSON MapArray Writer: got {:?}", + keys.data_type() + ))); + } + + let (keys, key_nulls) = make_encoder_impl(keys, options)?; + let (values, value_nulls) = make_encoder_impl(values, options)?; + + // We sanity check nulls as these are currently not enforced by MapArray (#1697) + if key_nulls.is_some_and(|x| x.null_count() != 0) { + return Err(ArrowError::InvalidArgumentError( + "Encountered nulls in MapArray keys".to_string(), + )); + } + + if array.entries().nulls().is_some_and(|x| x.null_count() != 0) { + return Err(ArrowError::InvalidArgumentError( + "Encountered nulls in MapArray entries".to_string(), + )); + } + + Ok(Self { + offsets: array.offsets().clone(), + keys, + values, + value_nulls, + explicit_nulls: options.explicit_nulls, + }) + } +} + +impl<'a> Encoder for MapEncoder<'a> { + fn encode(&mut self, idx: usize, out: &mut Vec) { + let end = self.offsets[idx + 1].as_usize(); + let start = self.offsets[idx].as_usize(); + + let mut is_first = true; + + out.push(b'{'); + for idx in start..end { + let is_null = self.value_nulls.as_ref().is_some_and(|n| n.is_null(idx)); + if is_null && !self.explicit_nulls { + continue; + } + + if !is_first { + out.extend_from_slice(b", "); + } + is_first = false; + + self.keys.encode(idx, out); + out.extend_from_slice(b": "); + + match is_null { + true => out.extend_from_slice(b"null"), + false => self.values.encode(idx, out), + } + } + out.push(b'}'); + } +} diff --git a/arrow-json/test/data/basic.json b/arrow-json/test/data/basic.json index a6a8766bf97c..7578ff064691 100644 --- a/arrow-json/test/data/basic.json +++ b/arrow-json/test/data/basic.json @@ -1,5 +1,5 @@ -{"a":1, "b":2.0, "c":false, "d":"4", "e":"1970-1-2", "f": "1.02", "g": "2012-04-23T18:25:43.511", "h": 1.1} -{"a":-10, "b":-3.5, "c":true, "d":"4", "e": "1969-12-31", "f": "-0.3", "g": "2016-04-23T18:25:43.511", "h": 3.141} +{"a":1, "b":2.0, "c":false, "d":"4", "e":"1970-1-2", "f": "1.02", "g": "2012-04-23T18:25:43.511", "h": 1.2802734375} +{"a":-10, "b":-3.5, "c":true, "d":"4", "e": "1969-12-31", "f": "-0.3", "g": "2016-04-23T18:25:43.511", "h": 3.140625} {"a":2, "b":0.6, "c":false, "d":"text", "e": "1970-01-02 11:11:11", "f": "1377.223"} {"a":1, "b":2.0, "c":false, "d":"4", "f": "1337.009"} {"a":7, "b":-3.5, "c":true, "d":"4", "f": "1"}