From 2808625bea15401e89343cd184deefca931243e2 Mon Sep 17 00:00:00 2001
From: Andrew Lamb
Date: Mon, 16 Dec 2024 16:32:08 -0500
Subject: [PATCH] Add `ArrowToParquetSchemaConverter`, deprecate `arrow_to_parquet_schema` (#6840)

* Add ArrowToParquetSchemaConverter, deprecate `arrow_to_parquet_schema` et al

* Fmt

* update test

* Update parquet/src/arrow/schema/mod.rs

Co-authored-by: Ed Seidl

* Apply suggestions from code review

Co-authored-by: Ed Seidl

* Improve comments

* Add more detail to WriterPropertiesBuilder docs

* Update parquet/src/file/properties.rs

Co-authored-by: Ed Seidl

* Fix some more capitalization and add a link to Parquet date spec

* Update parquet/src/arrow/schema/mod.rs

Co-authored-by: Raphael Taylor-Davies <1781103+tustvold@users.noreply.github.com>

* Revert "Update parquet/src/arrow/schema/mod.rs"

This reverts commit bd4e2d5135e3e4fe2d3dc61e043659243f4423bb.

* rename to ArrowSchemaConverter

* change from build --> convert

* update doc

* fix fmt

---------

Co-authored-by: Ed Seidl
Co-authored-by: Raphael Taylor-Davies <1781103+tustvold@users.noreply.github.com>
---
 parquet/src/arrow/arrow_writer/mod.rs |  26 ++--
 parquet/src/arrow/mod.rs              |   8 +-
 parquet/src/arrow/schema/mod.rs       | 172 +++++++++++++++++++++-----
 parquet/src/file/properties.rs        |  37 +++---
 4 files changed, 180 insertions(+), 63 deletions(-)

diff --git a/parquet/src/arrow/arrow_writer/mod.rs b/parquet/src/arrow/arrow_writer/mod.rs
index 59cfac047194..871b140768cb 100644
--- a/parquet/src/arrow/arrow_writer/mod.rs
+++ b/parquet/src/arrow/arrow_writer/mod.rs
@@ -30,12 +30,10 @@ use arrow_array::types::*;
 use arrow_array::{ArrayRef, RecordBatch, RecordBatchWriter};
 use arrow_schema::{ArrowError, DataType as ArrowDataType, Field, IntervalUnit, SchemaRef};

-use super::schema::{
-    add_encoded_arrow_schema_to_metadata, arrow_to_parquet_schema,
-    arrow_to_parquet_schema_with_root, decimal_length_from_precision,
-};
+use super::schema::{add_encoded_arrow_schema_to_metadata, decimal_length_from_precision};

 use crate::arrow::arrow_writer::byte_array::ByteArrayEncoder;
+use crate::arrow::ArrowSchemaConverter;
 use crate::column::page::{CompressedPage, PageWriteSpec, PageWriter};
 use crate::column::writer::encoder::ColumnValueEncoder;
 use crate::column::writer::{
@@ -181,10 +179,11 @@ impl<W: Write + Send> ArrowWriter<W> {
         options: ArrowWriterOptions,
     ) -> Result<Self> {
         let mut props = options.properties;
-        let schema = match options.schema_root {
-            Some(s) => arrow_to_parquet_schema_with_root(&arrow_schema, &s, props.coerce_types())?,
-            None => arrow_to_parquet_schema(&arrow_schema, props.coerce_types())?,
-        };
+        let mut converter = ArrowSchemaConverter::new().with_coerce_types(props.coerce_types());
+        if let Some(schema_root) = &options.schema_root {
+            converter = converter.schema_root(schema_root);
+        }
+        let schema = converter.convert(&arrow_schema)?;
         if !options.skip_arrow_metadata {
             // add serialized arrow schema
             add_encoded_arrow_schema_to_metadata(&arrow_schema, &mut props);
         }
@@ -390,9 +389,9 @@ impl ArrowWriterOptions {
     }

     /// Set the name of the root parquet schema element (defaults to `"arrow_schema"`)
-    pub fn with_schema_root(self, name: String) -> Self {
+    pub fn with_schema_root(self, schema_root: String) -> Self {
         Self {
-            schema_root: Some(name),
+            schema_root: Some(schema_root),
             ..self
         }
     }
@@ -538,7 +537,7 @@ impl ArrowColumnChunk {
 /// # use std::sync::Arc;
 /// # use arrow_array::*;
 /// # use arrow_schema::*;
-/// # use parquet::arrow::arrow_to_parquet_schema;
+/// # use parquet::arrow::ArrowSchemaConverter;
 /// # use 
parquet::arrow::arrow_writer::{ArrowLeafColumn, compute_leaves, get_column_writers}; /// # use parquet::file::properties::WriterProperties; /// # use parquet::file::writer::SerializedFileWriter; @@ -550,7 +549,10 @@ impl ArrowColumnChunk { /// /// // Compute the parquet schema /// let props = Arc::new(WriterProperties::default()); -/// let parquet_schema = arrow_to_parquet_schema(schema.as_ref(), props.coerce_types()).unwrap(); +/// let parquet_schema = ArrowSchemaConverter::new() +/// .with_coerce_types(props.coerce_types()) +/// .convert(&schema) +/// .unwrap(); /// /// // Create writers for each of the leaf columns /// let col_writers = get_column_writers(&parquet_schema, &props, &schema).unwrap(); diff --git a/parquet/src/arrow/mod.rs b/parquet/src/arrow/mod.rs index 2d09cd19203f..d77436bc1ff7 100644 --- a/parquet/src/arrow/mod.rs +++ b/parquet/src/arrow/mod.rs @@ -116,9 +116,13 @@ pub use self::async_writer::AsyncArrowWriter; use crate::schema::types::SchemaDescriptor; use arrow_schema::{FieldRef, Schema}; +// continue to export deprecated methods until they are removed +#[allow(deprecated)] +pub use self::schema::arrow_to_parquet_schema; + pub use self::schema::{ - arrow_to_parquet_schema, parquet_to_arrow_field_levels, parquet_to_arrow_schema, - parquet_to_arrow_schema_by_columns, FieldLevels, + parquet_to_arrow_field_levels, parquet_to_arrow_schema, parquet_to_arrow_schema_by_columns, + ArrowSchemaConverter, FieldLevels, }; /// Schema metadata key used to store serialized Arrow IPC schema diff --git a/parquet/src/arrow/schema/mod.rs b/parquet/src/arrow/schema/mod.rs index 26b0d8b7b885..4ae3fdb8e5cf 100644 --- a/parquet/src/arrow/schema/mod.rs +++ b/parquet/src/arrow/schema/mod.rs @@ -15,13 +15,7 @@ // specific language governing permissions and limitations // under the License. -//! Provides API for converting parquet schema to arrow schema and vice versa. -//! -//! The main interfaces for converting parquet schema to arrow schema are -//! `parquet_to_arrow_schema`, `parquet_to_arrow_schema_by_columns` and -//! `parquet_to_arrow_field`. -//! -//! The interfaces for converting arrow schema to parquet schema is coming. +//! 
Converting Parquet schema <--> Arrow schema: [`ArrowSchemaConverter`] and [`parquet_to_arrow_schema`]

 use base64::prelude::BASE64_STANDARD;
 use base64::Engine;
@@ -226,27 +220,134 @@ pub(crate) fn add_encoded_arrow_schema_to_metadata(schema: &Schema, props: &mut
     }
 }

-/// Convert arrow schema to parquet schema
+/// Converter for Arrow schema to Parquet schema
 ///
-/// The name of the root schema element defaults to `"arrow_schema"`, this can be
-/// overridden with [`arrow_to_parquet_schema_with_root`]
-pub fn arrow_to_parquet_schema(schema: &Schema, coerce_types: bool) -> Result<SchemaDescriptor> {
-    arrow_to_parquet_schema_with_root(schema, "arrow_schema", coerce_types)
+/// Example:
+/// ```
+/// # use std::sync::Arc;
+/// # use arrow_schema::{Field, Schema, DataType};
+/// # use parquet::arrow::ArrowSchemaConverter;
+/// use parquet::schema::types::{SchemaDescriptor, Type};
+/// use parquet::basic; // note there are two `Type`s in the following example
+/// // create an Arrow Schema
+/// let arrow_schema = Schema::new(vec![
+///   Field::new("a", DataType::Int64, true),
+///   Field::new("b", DataType::Date32, true),
+/// ]);
+/// // convert the Arrow schema to a Parquet schema
+/// let parquet_schema = ArrowSchemaConverter::new()
+///   .convert(&arrow_schema)
+///   .unwrap();
+///
+/// let expected_parquet_schema = SchemaDescriptor::new(
+///   Arc::new(
+///     Type::group_type_builder("arrow_schema")
+///       .with_fields(vec![
+///         Arc::new(
+///          Type::primitive_type_builder("a", basic::Type::INT64)
+///           .build().unwrap()
+///         ),
+///         Arc::new(
+///          Type::primitive_type_builder("b", basic::Type::INT32)
+///           .with_converted_type(basic::ConvertedType::DATE)
+///           .with_logical_type(Some(basic::LogicalType::Date))
+///           .build().unwrap()
+///         ),
+///       ])
+///       .build().unwrap()
+///   )
+/// );
+/// assert_eq!(parquet_schema, expected_parquet_schema);
+/// ```
+#[derive(Debug)]
+pub struct ArrowSchemaConverter<'a> {
+    /// Name of the root schema in Parquet
+    schema_root: &'a str,
+    /// Should we coerce Arrow types to compatible Parquet types?
+    ///
+    /// See docs on [`Self::with_coerce_types`]
+    coerce_types: bool,
+}

-/// Convert arrow schema to parquet schema specifying the name of the root schema element
-pub fn arrow_to_parquet_schema_with_root(
-    schema: &Schema,
-    root: &str,
-    coerce_types: bool,
-) -> Result<SchemaDescriptor> {
-    let fields = schema
-        .fields()
-        .iter()
-        .map(|field| arrow_to_parquet_type(field, coerce_types).map(Arc::new))
-        .collect::<Result<_>>()?;
-    let group = Type::group_type_builder(root).with_fields(fields).build()?;
-    Ok(SchemaDescriptor::new(Arc::new(group)))
+impl Default for ArrowSchemaConverter<'_> {
+    fn default() -> Self {
+        Self::new()
+    }
+}
+
+impl<'a> ArrowSchemaConverter<'a> {
+    /// Create a new converter
+    pub fn new() -> Self {
+        Self {
+            schema_root: "arrow_schema",
+            coerce_types: false,
+        }
+    }
+
+    /// Should Arrow types be coerced into Parquet native types (default `false`).
+    ///
+    /// Setting this option to `true` will result in Parquet files that can be
+    /// read by more readers, but may lose precision for Arrow types such as
+    /// [`DataType::Date64`] which have no direct [corresponding Parquet type].
+    ///
+    /// By default, this converter does not coerce to native Parquet types. Enabling type
+    /// coercion allows for meaningful representations that do not require
+    /// downstream readers to consider the embedded Arrow schema, and can allow
+    /// for greater compatibility with other Parquet implementations. However,
+    /// type coercion also prevents data from being losslessly round-tripped. 
+    ///
+    /// # Discussion
+    ///
+    /// Some Arrow types such as `Date64`, `Timestamp` and `Interval` have no
+    /// corresponding Parquet logical type. Thus, they cannot be losslessly
+    /// round-tripped when stored using the closest Parquet logical type.
+    /// For example, some `Date64` values may be truncated when stored with
+    /// Parquet's native 32-bit date type.
+    ///
+    /// For [`List`] and [`Map`] types, some Parquet readers expect certain
+    /// schema elements to have specific names (earlier versions of the spec
+    /// were somewhat ambiguous on this point). Type coercion will use the names
+    /// prescribed by the Parquet specification, potentially losing naming
+    /// metadata from the Arrow schema.
+    ///
+    /// [`List`]: https://github.com/apache/parquet-format/blob/master/LogicalTypes.md#lists
+    /// [`Map`]: https://github.com/apache/parquet-format/blob/master/LogicalTypes.md#maps
+    /// [corresponding Parquet type]: https://github.com/apache/parquet-format/blob/master/LogicalTypes.md#date
+    ///
+    pub fn with_coerce_types(mut self, coerce_types: bool) -> Self {
+        self.coerce_types = coerce_types;
+        self
+    }
+
+    /// Set the root schema element name (defaults to `"arrow_schema"`).
+    pub fn schema_root(mut self, schema_root: &'a str) -> Self {
+        self.schema_root = schema_root;
+        self
+    }
+
+    /// Convert the specified Arrow [`Schema`] to the desired Parquet [`SchemaDescriptor`]
+    ///
+    /// See example in [`ArrowSchemaConverter`]
+    pub fn convert(&self, schema: &Schema) -> Result<SchemaDescriptor> {
+        let fields = schema
+            .fields()
+            .iter()
+            .map(|field| arrow_to_parquet_type(field, self.coerce_types).map(Arc::new))
+            .collect::<Result<_>>()?;
+        let group = Type::group_type_builder(self.schema_root)
+            .with_fields(fields)
+            .build()?;
+        Ok(SchemaDescriptor::new(Arc::new(group)))
+    }
+}
+
+/// Convert arrow schema to parquet schema
+///
+/// The name of the root schema element defaults to `"arrow_schema"`; this can be
+/// overridden with [`ArrowSchemaConverter`]
+#[deprecated(since = "54.0.0", note = "Use `ArrowSchemaConverter` instead")]
+pub fn arrow_to_parquet_schema(schema: &Schema) -> Result<SchemaDescriptor> {
+    ArrowSchemaConverter::new().convert(schema)
 }

 fn parse_key_value_metadata(
@@ -1488,7 +1589,10 @@ mod tests {
     ";
         let parquet_group_type = parse_message_type(message_type).unwrap();
         let parquet_schema = SchemaDescriptor::new(Arc::new(parquet_group_type));
-        let converted_arrow_schema = arrow_to_parquet_schema(&arrow_schema, true).unwrap();
+        let converted_arrow_schema = ArrowSchemaConverter::new()
+            .with_coerce_types(true)
+            .convert(&arrow_schema)
+            .unwrap();
         assert_eq!(
             parquet_schema.columns().len(),
             converted_arrow_schema.columns().len()
@@ -1512,7 +1616,10 @@ mod tests {
     ";
         let parquet_group_type = parse_message_type(message_type).unwrap();
         let parquet_schema = SchemaDescriptor::new(Arc::new(parquet_group_type));
-        let converted_arrow_schema = arrow_to_parquet_schema(&arrow_schema, false).unwrap();
+        let converted_arrow_schema = ArrowSchemaConverter::new()
+            .with_coerce_types(false)
+            .convert(&arrow_schema)
+            .unwrap();
         assert_eq!(
             parquet_schema.columns().len(),
             converted_arrow_schema.columns().len()
@@ -1668,7 +1775,7 @@ mod tests {
             Field::new("decimal256", DataType::Decimal256(39, 2), false),
         ];
         let arrow_schema = Schema::new(arrow_fields);
-        let converted_arrow_schema = arrow_to_parquet_schema(&arrow_schema, false).unwrap();
+        let converted_arrow_schema = ArrowSchemaConverter::new().convert(&arrow_schema).unwrap();

         assert_eq!(
             parquet_schema.columns().len(),
@@ -1705,9 +1812,10 @@ mod tests { 
            false,
         )];
         let arrow_schema = Schema::new(arrow_fields);
-        let converted_arrow_schema = arrow_to_parquet_schema(&arrow_schema, true);
+        let converted_arrow_schema = ArrowSchemaConverter::new()
+            .with_coerce_types(true)
+            .convert(&arrow_schema);

-        assert!(converted_arrow_schema.is_err());
         converted_arrow_schema.unwrap();
     }

@@ -1978,7 +2086,9 @@ mod tests {

         // don't pass metadata so field ids are read from Parquet and not from serialized Arrow schema
         let arrow_schema = crate::arrow::parquet_to_arrow_schema(&schema_descriptor, None)?;
-        let parq_schema_descr = crate::arrow::arrow_to_parquet_schema(&arrow_schema, true)?;
+        let parq_schema_descr = ArrowSchemaConverter::new()
+            .with_coerce_types(true)
+            .convert(&arrow_schema)?;
         let parq_fields = parq_schema_descr.root_schema().get_fields();
         assert_eq!(parq_fields.len(), 2);
         assert_eq!(parq_fields[0].get_basic_info().id(), 1);
diff --git a/parquet/src/file/properties.rs b/parquet/src/file/properties.rs
index aac450acd82f..7b688333e540 100644
--- a/parquet/src/file/properties.rs
+++ b/parquet/src/file/properties.rs
@@ -16,14 +16,13 @@
 // under the License.

 //! Configuration via [`WriterProperties`] and [`ReaderProperties`]
-use std::str::FromStr;
-use std::{collections::HashMap, sync::Arc};
-
 use crate::basic::{Compression, Encoding};
 use crate::compression::{CodecOptions, CodecOptionsBuilder};
 use crate::file::metadata::KeyValue;
 use crate::format::SortingColumn;
 use crate::schema::types::ColumnPath;
+use std::str::FromStr;
+use std::{collections::HashMap, sync::Arc};

 /// Default value for [`WriterProperties::data_page_size_limit`]
 pub const DEFAULT_PAGE_SIZE: usize = 1024 * 1024;
@@ -780,22 +779,24 @@ impl WriterPropertiesBuilder {
         self
     }

-    /// Sets flag to control if type coercion is enabled (defaults to `false`).
+    /// Should the writer coerce types to Parquet native types (defaults to `false`).
     ///
-    /// # Notes
-    /// Some Arrow types do not have a corresponding Parquet logical type.
-    /// Affected Arrow data types include `Date64`, `Timestamp` and `Interval`.
-    /// Also, for [`List`] and [`Map`] types, Parquet expects certain schema elements
-    /// to have specific names to be considered fully compliant.
-    /// Writers have the option to coerce these types and names to match those required
-    /// by the Parquet specification.
-    /// This type coercion allows for meaningful representations that do not require
-    /// downstream readers to consider the embedded Arrow schema, and can allow for greater
-    /// compatibility with other Parquet implementations. However, type
-    /// coercion also prevents the data from being losslessly round-tripped.
-    ///
-    /// [`List`]: https://github.com/apache/parquet-format/blob/master/LogicalTypes.md#lists
-    /// [`Map`]: https://github.com/apache/parquet-format/blob/master/LogicalTypes.md#maps
+    /// Leaving this option as the default `false` ensures that the exact same data
+    /// written to Parquet using this library can be read back.
+    ///
+    /// Setting this option to `true` will result in Parquet files that can be
+    /// read by more readers, but may lose information in the process.
+    ///
+    /// * Types such as [`DataType::Date64`], which have no direct corresponding
+    ///   Parquet type, may be stored with lower precision.
+    ///
+    /// * The internal field names of `List` and `Map` types will be renamed if
+    ///   necessary to match what is required by the newest Parquet specification. 
+    ///
+    /// See [`ArrowSchemaConverter::with_coerce_types`] for more details.
+    ///
+    /// [`DataType::Date64`]: arrow_schema::DataType::Date64
+    /// [`ArrowSchemaConverter::with_coerce_types`]: crate::arrow::ArrowSchemaConverter::with_coerce_types
     pub fn set_coerce_types(mut self, coerce_types: bool) -> Self {
         self.coerce_types = coerce_types;
         self
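
Usage note: the following is a minimal, self-contained sketch (not part of the patch) showing how code migrates from the deprecated `arrow_to_parquet_schema` to the `ArrowSchemaConverter` API added above. The schema, the field names, and the `"my_root"` root name are illustrative assumptions; `with_coerce_types` and `schema_root` are optional calls shown only to exercise the builder.

    use arrow_schema::{DataType, Field, Schema};
    use parquet::arrow::ArrowSchemaConverter;

    fn main() -> Result<(), Box<dyn std::error::Error>> {
        // An example Arrow schema to convert (hypothetical fields)
        let schema = Schema::new(vec![
            Field::new("id", DataType::Int64, false),
            Field::new("name", DataType::Utf8, true),
        ]);

        // Before this patch (deprecated in 54.0.0):
        // let descr = parquet::arrow::arrow_to_parquet_schema(&schema)?;

        // After: configure the conversion explicitly via the builder
        let descr = ArrowSchemaConverter::new()
            .with_coerce_types(true) // coerce Arrow-only types to Parquet native types
            .schema_root("my_root")  // optional; defaults to "arrow_schema"
            .convert(&schema)?;

        // The root group of the resulting Parquet schema carries the chosen name
        assert_eq!(descr.root_schema().name(), "my_root");
        Ok(())
    }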