From 8898d2ed27c46745bd825b4c485e3413f67cb9e8 Mon Sep 17 00:00:00 2001 From: Raphael Taylor-Davies Date: Sat, 20 Jan 2024 12:55:45 +0000 Subject: [PATCH] Raw JSON writer (#5314) --- arrow-json/src/writer.rs | 91 +++++++----- arrow-json/src/writer/encoder.rs | 247 +++++++++++++++++++++++++++++++ 2 files changed, 298 insertions(+), 40 deletions(-) create mode 100644 arrow-json/src/writer/encoder.rs diff --git a/arrow-json/src/writer.rs b/arrow-json/src/writer.rs index cabda5e2dca8..d0a416ca6583 100644 --- a/arrow-json/src/writer.rs +++ b/arrow-json/src/writer.rs @@ -20,28 +20,6 @@ //! This JSON writer converts Arrow [`RecordBatch`]es into arrays of //! JSON objects or JSON formatted byte streams. //! -//! ## Writing JSON Objects -//! -//! To serialize [`RecordBatch`]es into array of -//! [JSON](https://docs.serde.rs/serde_json/) objects, use -//! [`record_batches_to_json_rows`]: -//! -//! ``` -//! # use std::sync::Arc; -//! # use arrow_array::{Int32Array, RecordBatch}; -//! # use arrow_schema::{DataType, Field, Schema}; -//! -//! let schema = Schema::new(vec![Field::new("a", DataType::Int32, false)]); -//! let a = Int32Array::from(vec![1, 2, 3]); -//! let batch = RecordBatch::try_new(Arc::new(schema), vec![Arc::new(a)]).unwrap(); -//! -//! let json_rows = arrow_json::writer::record_batches_to_json_rows(&[&batch]).unwrap(); -//! assert_eq!( -//! serde_json::Value::Object(json_rows[1].clone()), -//! serde_json::json!({"a": 2}), -//! ); -//! ``` -//! //! ## Writing JSON formatted byte streams //! //! To serialize [`RecordBatch`]es into line-delimited JSON bytes, use @@ -97,6 +75,8 @@ //! In order to explicitly write null values for keys, configure a custom [`Writer`] by //! using a [`WriterBuilder`] to construct a [`Writer`]. +mod encoder; + use std::iter; use std::{fmt::Debug, io::Write}; @@ -109,7 +89,9 @@ use arrow_array::types::*; use arrow_array::*; use arrow_schema::*; +use crate::writer::encoder::EncoderOptions; use arrow_cast::display::{ArrayFormatter, FormatOptions}; +use encoder::make_encoder; fn primitive_array_to_json(array: &dyn Array) -> Result, ArrowError> where @@ -481,6 +463,7 @@ fn set_column_for_json_rows( /// Converts an arrow [`RecordBatch`] into a `Vec` of Serde JSON /// [`JsonMap`]s (objects) +#[deprecated(note = "Use Writer")] pub fn record_batches_to_json_rows( batches: &[&RecordBatch], ) -> Result>, ArrowError> { @@ -597,11 +580,7 @@ pub type ArrayWriter = Writer; /// JSON writer builder. #[derive(Debug, Clone, Default)] -pub struct WriterBuilder { - /// Controls whether null values should be written explicitly for keys - /// in objects, or whether the key should be omitted entirely. - explicit_nulls: bool, -} +pub struct WriterBuilder(EncoderOptions); impl WriterBuilder { /// Create a new builder for configuring JSON writing options. @@ -629,7 +608,7 @@ impl WriterBuilder { /// Returns `true` if this writer is configured to keep keys with null values. pub fn explicit_nulls(&self) -> bool { - self.explicit_nulls + self.0.explicit_nulls } /// Set whether to keep keys with null values, or to omit writing them. @@ -654,7 +633,7 @@ impl WriterBuilder { /// /// Default is to skip nulls (set to `false`). pub fn with_explicit_nulls(mut self, explicit_nulls: bool) -> Self { - self.explicit_nulls = explicit_nulls; + self.0.explicit_nulls = explicit_nulls; self } @@ -669,7 +648,7 @@ impl WriterBuilder { started: false, finished: false, format: F::default(), - explicit_nulls: self.explicit_nulls, + options: self.0, } } } @@ -703,7 +682,7 @@ where format: F, /// Whether keys with null values should be written or skipped - explicit_nulls: bool, + options: EncoderOptions, } impl Writer @@ -718,11 +697,12 @@ where started: false, finished: false, format: F::default(), - explicit_nulls: false, + options: EncoderOptions::default(), } } /// Write a single JSON row to the output writer + #[deprecated(note = "Use Writer::write")] pub fn write_row(&mut self, row: &Value) -> Result<(), ArrowError> { let is_first_row = !self.started; if !self.started { @@ -738,18 +718,46 @@ where Ok(()) } - /// Convert the `RecordBatch` into JSON rows, and write them to the output + /// Serialize `batch` to JSON output pub fn write(&mut self, batch: &RecordBatch) -> Result<(), ArrowError> { - for row in record_batches_to_json_rows_internal(&[batch], self.explicit_nulls)? { - self.write_row(&Value::Object(row))?; + if batch.num_rows() == 0 { + return Ok(()); + } + + // BufWriter uses a buffer size of 8KB + // We therefore double this and flush once we have more than 8KB + let mut buffer = Vec::with_capacity(16 * 1024); + + let is_first_row = !self.started; + if !self.started { + self.format.start_stream(&mut buffer)?; + self.started = true; } + + let array = StructArray::from(batch.clone()); + let mut encoder = make_encoder(&array, &self.options)?; + + for idx in 0..batch.num_rows() { + self.format.start_row(&mut buffer, is_first_row)?; + encoder.encode(idx, &mut buffer); + if buffer.len() > 8 * 1024 { + self.writer.write_all(&buffer)?; + buffer.clear(); + } + self.format.end_row(&mut buffer)?; + } + + if !buffer.is_empty() { + self.writer.write_all(&buffer)?; + } + Ok(()) } - /// Convert the [`RecordBatch`] into JSON rows, and write them to the output + /// Serialize `batches` to JSON output pub fn write_batches(&mut self, batches: &[&RecordBatch]) -> Result<(), ArrowError> { - for row in record_batches_to_json_rows_internal(batches, self.explicit_nulls)? { - self.write_row(&Value::Object(row))?; + for b in batches { + self.write(b)?; } Ok(()) } @@ -803,6 +811,9 @@ mod tests { /// Asserts that the NDJSON `input` is semantically identical to `expected` fn assert_json_eq(input: &[u8], expected: &str) { + let s = std::str::from_utf8(input).unwrap(); + println!("{s}"); + let expected: Vec> = expected .split('\n') .map(|s| (!s.is_empty()).then(|| serde_json::from_str(s).unwrap())) @@ -1564,9 +1575,9 @@ mod tests { r#"{"a":{"list":[1,2]},"b":{"list":[1,2]}} {"a":{"list":[null]},"b":{"list":[null]}} {"a":{"list":[]},"b":{"list":[]}} -{"a":null,"b":{"list":[3,null]}} +{"b":{"list":[3,null]}} {"a":{"list":[4,5]},"b":{"list":[4,5]}} -{"a":null,"b":{}} +{"b":{}} {"a":{},"b":{}} "#, ); diff --git a/arrow-json/src/writer/encoder.rs b/arrow-json/src/writer/encoder.rs new file mode 100644 index 000000000000..d57ff0842a58 --- /dev/null +++ b/arrow-json/src/writer/encoder.rs @@ -0,0 +1,247 @@ +use arrow_array::cast::AsArray; +use arrow_array::types::*; +use arrow_array::*; +use arrow_buffer::{ArrowNativeType, NullBuffer, OffsetBuffer, ScalarBuffer}; +use arrow_schema::{ArrowError, DataType, FieldRef}; +use lexical_core::FormattedSize; + +#[derive(Debug, Clone, Default)] +pub struct EncoderOptions { + pub explicit_nulls: bool, +} + +pub trait Encoder { + fn encode(&mut self, idx: usize, out: &mut Vec); +} + +pub fn make_encoder( + array: &dyn Array, + options: &EncoderOptions, +) -> Result, ArrowError> { + let (encoder, nulls) = make_encoder_impl(array, options)?; + assert!(nulls.is_none(), "root cannot be nullable"); + Ok(encoder) +} + +pub fn make_encoder_impl( + array: &dyn Array, + options: &EncoderOptions, +) -> Result<(Box, Option), ArrowError> { + macro_rules! primitive_helper { + ($t:ty) => {{ + let array = array.as_primitive::<$t>(); + let nulls = array.nulls().cloned(); + (Box::new(PrimitiveEncoder::new(array)) as _, nulls) + }}; + } + + Ok(downcast_integer! { + array.data_type() => (primitive_helper), + DataType::Float32 => primitive_helper!(Float32Type), + DataType::Float64 => primitive_helper!(Float64Type), + DataType::Utf8 => { + let array = array.as_string::(); + let nulls = array.nulls().cloned(); + (Box::new(StringEncoder(array.clone())) as _, nulls) + } + DataType::LargeUtf8 => { + let array = array.as_string::(); + let nulls = array.nulls().cloned(); + (Box::new(StringEncoder(array.clone())) as _, nulls) + } + DataType::List(_) => { + let array = array.as_list::(); + let nulls = array.nulls().cloned(); + (Box::new(ListEncoder::try_new(array, options)?) as _, nulls) + } + DataType::LargeList(_) => { + let array = array.as_list::(); + let nulls = array.nulls().cloned(); + (Box::new(ListEncoder::try_new(array, options)?) as _, nulls) + } + + DataType::Struct(fields) => { + let array = array.as_struct(); + let nulls = array.nulls().cloned(); + let encoders = fields.iter().zip(array.columns()).map(|(field, array)| { + let (encoder, nulls) = make_encoder_impl(array, options)?; + Ok(FieldEncoder{ + field: field.clone(), + encoder, nulls + }) + }).collect::, ArrowError>>()?; + + let encoder = StructArrayEncoder{ + encoders, + explicit_nulls: options.explicit_nulls, + }; + (Box::new(encoder) as _, nulls) + } + _ => unreachable!() + }) +} + +fn encode_string(s: &str, out: &mut Vec) { + //TODO Escaping + out.reserve(s.len() + 2); + out.push(b'"'); + out.extend_from_slice(s.as_bytes()); + out.push(b'"'); +} + +struct FieldEncoder { + field: FieldRef, + encoder: Box, + nulls: Option, +} + +struct StructArrayEncoder { + encoders: Vec, + explicit_nulls: bool, +} + +impl Encoder for StructArrayEncoder { + fn encode(&mut self, idx: usize, out: &mut Vec) { + out.push(b'{'); + let mut is_first = true; + for field_encoder in &mut self.encoders { + let is_null = field_encoder.nulls.as_ref().is_some_and(|n| n.is_null(idx)); + if is_null && !self.explicit_nulls { + continue; + } + + if !is_first { + out.extend_from_slice(b", "); + } + is_first = false; + + encode_string(field_encoder.field.name(), out); + out.extend_from_slice(b": "); + + match is_null { + true => out.extend_from_slice(b"null"), + false => field_encoder.encoder.encode(idx, out), + } + } + out.push(b'}'); + } +} + +trait PrimitiveEncode: ArrowNativeType { + type Buffer; + + // Workaround https://github.com/rust-lang/rust/issues/61415 + fn init_buffer() -> Self::Buffer; + + fn encode(self, buf: &mut Self::Buffer) -> &mut [u8]; +} + +macro_rules! integer_encode { + ($($t:ty),*) => { + $( + impl PrimitiveEncode for $t { + type Buffer = [u8; Self::FORMATTED_SIZE]; + + fn init_buffer() -> Self::Buffer { + [0; Self::FORMATTED_SIZE] + } + + fn encode(self, buf: &mut Self::Buffer) -> &mut [u8] { + lexical_core::write(self, buf) + } + } + )* + }; +} +integer_encode!(i8, i16, i32, i64, u8, u16, u32, u64); + +macro_rules! float_encode { + ($($t:ty),*) => { + $( + impl PrimitiveEncode for $t { + type Buffer = [u8; Self::FORMATTED_SIZE]; + + fn init_buffer() -> Self::Buffer { + [0; Self::FORMATTED_SIZE] + } + + fn encode(self, buf: &mut Self::Buffer) -> &mut [u8] { + // TODO: Handle NAN, infinity, etc... + lexical_core::write(self, buf) + } + } + )* + }; +} +float_encode!(f32, f64); + +struct PrimitiveEncoder { + values: ScalarBuffer, + buffer: N::Buffer, +} + +impl PrimitiveEncoder { + fn new>(array: &PrimitiveArray

) -> Self { + Self { + values: array.values().clone(), + buffer: N::init_buffer(), + } + } +} + +impl Encoder for PrimitiveEncoder { + fn encode(&mut self, idx: usize, out: &mut Vec) { + out.extend_from_slice(self.values[idx].encode(&mut self.buffer)); + } +} + +struct StringEncoder(GenericStringArray); + +impl Encoder for StringEncoder { + fn encode(&mut self, idx: usize, out: &mut Vec) { + encode_string(self.0.value(idx), out); + } +} + +struct ListEncoder { + offsets: OffsetBuffer, + nulls: Option, + encoder: Box, +} + +impl ListEncoder { + fn try_new(array: &GenericListArray, options: &EncoderOptions) -> Result { + let (encoder, nulls) = make_encoder_impl(array.values().as_ref(), options)?; + Ok(Self { + offsets: array.offsets().clone(), + encoder, + nulls, + }) + } +} + +impl Encoder for ListEncoder { + fn encode(&mut self, idx: usize, out: &mut Vec) { + let end = self.offsets[idx + 1].as_usize(); + let start = self.offsets[idx].as_usize(); + out.push(b'['); + match self.nulls.as_ref() { + Some(n) => (start..end).for_each(|idx| { + if idx != start { + out.extend_from_slice(b", ") + } + match n.is_null(idx) { + true => out.extend_from_slice(b"null"), + false => self.encoder.encode(idx, out), + } + }), + None => (start..end).for_each(|idx| { + if idx != start { + out.extend_from_slice(b", ") + } + self.encoder.encode(idx, out); + }), + } + out.push(b']'); + } +}