Add ArrowToParquetSchemaConverter, deprecate arrow_to_parquet_schema #6840

Merged Dec 16, 2024 · 21 commits · changes from all commits
26 changes: 14 additions & 12 deletions parquet/src/arrow/arrow_writer/mod.rs
@@ -30,12 +30,10 @@ use arrow_array::types::*;
use arrow_array::{ArrayRef, RecordBatch, RecordBatchWriter};
use arrow_schema::{ArrowError, DataType as ArrowDataType, Field, IntervalUnit, SchemaRef};

use super::schema::{
add_encoded_arrow_schema_to_metadata, arrow_to_parquet_schema,
arrow_to_parquet_schema_with_root, decimal_length_from_precision,
};
use super::schema::{add_encoded_arrow_schema_to_metadata, decimal_length_from_precision};

use crate::arrow::arrow_writer::byte_array::ByteArrayEncoder;
use crate::arrow::ArrowSchemaConverter;
use crate::column::page::{CompressedPage, PageWriteSpec, PageWriter};
use crate::column::writer::encoder::ColumnValueEncoder;
use crate::column::writer::{
@@ -181,10 +179,11 @@ impl<W: Write + Send> ArrowWriter<W> {
options: ArrowWriterOptions,
) -> Result<Self> {
let mut props = options.properties;
let schema = match options.schema_root {
Some(s) => arrow_to_parquet_schema_with_root(&arrow_schema, &s, props.coerce_types())?,
None => arrow_to_parquet_schema(&arrow_schema, props.coerce_types())?,
};
let mut converter = ArrowSchemaConverter::new().with_coerce_types(props.coerce_types());
if let Some(schema_root) = &options.schema_root {
converter = converter.schema_root(schema_root);
}
let schema = converter.convert(&arrow_schema)?;
if !options.skip_arrow_metadata {
// add serialized arrow schema
add_encoded_arrow_schema_to_metadata(&arrow_schema, &mut props);
@@ -390,9 +389,9 @@ impl ArrowWriterOptions {
}

/// Set the name of the root parquet schema element (defaults to `"arrow_schema"`)
pub fn with_schema_root(self, name: String) -> Self {
pub fn with_schema_root(self, schema_root: String) -> Self {
Self {
schema_root: Some(name),
schema_root: Some(schema_root),
..self
}
}
@@ -538,7 +537,7 @@ impl ArrowColumnChunk {
/// # use std::sync::Arc;
/// # use arrow_array::*;
/// # use arrow_schema::*;
/// # use parquet::arrow::arrow_to_parquet_schema;
/// # use parquet::arrow::ArrowSchemaConverter;
/// # use parquet::arrow::arrow_writer::{ArrowLeafColumn, compute_leaves, get_column_writers};
/// # use parquet::file::properties::WriterProperties;
/// # use parquet::file::writer::SerializedFileWriter;
@@ -550,7 +549,10 @@ impl ArrowColumnChunk {
///
/// // Compute the parquet schema
/// let props = Arc::new(WriterProperties::default());
/// let parquet_schema = arrow_to_parquet_schema(schema.as_ref(), props.coerce_types()).unwrap();
/// let parquet_schema = ArrowSchemaConverter::new()
/// .with_coerce_types(props.coerce_types())
/// .convert(&schema)
/// .unwrap();
///
/// // Create writers for each of the leaf columns
/// let col_writers = get_column_writers(&parquet_schema, &props, &schema).unwrap();
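To see the renamed option end to end, here is a minimal sketch of writing a batch with a custom root name (the schema, the data, and the `my_root` name are illustrative, not part of this diff):

```rust
use std::sync::Arc;
use arrow_array::{ArrayRef, Int64Array, RecordBatch};
use arrow_schema::{DataType, Field, Schema};
use parquet::arrow::arrow_writer::{ArrowWriter, ArrowWriterOptions};

fn main() -> Result<(), Box<dyn std::error::Error>> {
    let schema = Arc::new(Schema::new(vec![Field::new("a", DataType::Int64, false)]));
    let column = Arc::new(Int64Array::from(vec![1, 2, 3])) as ArrayRef;
    let batch = RecordBatch::try_new(schema.clone(), vec![column])?;

    // The renamed `schema_root` option flows through to
    // ArrowSchemaConverter::schema_root inside try_new_with_options
    let options = ArrowWriterOptions::new().with_schema_root("my_root".to_string());

    let mut buffer = Vec::new();
    let mut writer = ArrowWriter::try_new_with_options(&mut buffer, schema, options)?;
    writer.write(&batch)?;
    writer.close()?;
    Ok(())
}
```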
8 changes: 6 additions & 2 deletions parquet/src/arrow/mod.rs
@@ -116,9 +116,13 @@ pub use self::async_writer::AsyncArrowWriter;
use crate::schema::types::SchemaDescriptor;
use arrow_schema::{FieldRef, Schema};

// continue to export deprecated methods until they are removed
#[allow(deprecated)]
pub use self::schema::arrow_to_parquet_schema;

pub use self::schema::{
arrow_to_parquet_schema, parquet_to_arrow_field_levels, parquet_to_arrow_schema,
parquet_to_arrow_schema_by_columns, FieldLevels,
parquet_to_arrow_field_levels, parquet_to_arrow_schema, parquet_to_arrow_schema_by_columns,
ArrowSchemaConverter, FieldLevels,
};

/// Schema metadata key used to store serialized Arrow IPC schema
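For downstream users, the effect of this re-export change is that existing imports keep compiling but emit a deprecation warning. A small sketch of the before/after imports, using the names exported above:

```rust
// The old free function is still re-exported, but using it now
// warns unless explicitly allowed:
#[allow(deprecated)]
use parquet::arrow::arrow_to_parquet_schema;

// Preferred replacement going forward:
use parquet::arrow::ArrowSchemaConverter;
```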
172 changes: 141 additions & 31 deletions parquet/src/arrow/schema/mod.rs
@@ -15,13 +15,7 @@
// specific language governing permissions and limitations
// under the License.

//! Provides API for converting parquet schema to arrow schema and vice versa.
//!
//! The main interfaces for converting parquet schema to arrow schema are
//! `parquet_to_arrow_schema`, `parquet_to_arrow_schema_by_columns` and
//! `parquet_to_arrow_field`.
//!
//! The interfaces for converting arrow schema to parquet schema is coming.
//! Converting Parquet schema <--> Arrow schema: [`ArrowSchemaConverter`] and [`parquet_to_arrow_schema`]

use base64::prelude::BASE64_STANDARD;
use base64::Engine;
@@ -226,27 +220,134 @@ pub(crate) fn add_encoded_arrow_schema_to_metadata(schema: &Schema, props: &mut
}
}

/// Convert arrow schema to parquet schema
/// Converter for Arrow schema to Parquet schema
///
/// The name of the root schema element defaults to `"arrow_schema"`, this can be
/// overridden with [`arrow_to_parquet_schema_with_root`]
pub fn arrow_to_parquet_schema(schema: &Schema, coerce_types: bool) -> Result<SchemaDescriptor> {
arrow_to_parquet_schema_with_root(schema, "arrow_schema", coerce_types)
/// Example:
/// ```
/// # use std::sync::Arc;
/// # use arrow_schema::{Field, Schema, DataType};
/// # use parquet::arrow::ArrowSchemaConverter;
/// use parquet::schema::types::{SchemaDescriptor, Type};
/// use parquet::basic; // note there are two `Type`s in the following example
/// // create an Arrow Schema
/// let arrow_schema = Schema::new(vec![
/// Field::new("a", DataType::Int64, true),
/// Field::new("b", DataType::Date32, true),
/// ]);
/// // convert the Arrow schema to a Parquet schema
/// let parquet_schema = ArrowSchemaConverter::new()
/// .convert(&arrow_schema)
/// .unwrap();
///
/// let expected_parquet_schema = SchemaDescriptor::new(
/// Arc::new(
/// Type::group_type_builder("arrow_schema")
/// .with_fields(vec![
/// Arc::new(
/// Type::primitive_type_builder("a", basic::Type::INT64)
/// .build().unwrap()
/// ),
/// Arc::new(
/// Type::primitive_type_builder("b", basic::Type::INT32)
/// .with_converted_type(basic::ConvertedType::DATE)
/// .with_logical_type(Some(basic::LogicalType::Date))
/// .build().unwrap()
/// ),
/// ])
/// .build().unwrap()
/// )
/// );
/// assert_eq!(parquet_schema, expected_parquet_schema);
/// ```
#[derive(Debug)]
pub struct ArrowSchemaConverter<'a> {
/// Name of the root schema in Parquet
schema_root: &'a str,
/// Should we coerce Arrow types to compatible Parquet types?
///
/// See docs on [`Self::with_coerce_types`]
coerce_types: bool,
}

/// Convert arrow schema to parquet schema specifying the name of the root schema element
pub fn arrow_to_parquet_schema_with_root(
[Inline review comment from the PR author on `arrow_to_parquet_schema_with_root`:]

It turns out this function is not actually exported (it is `pub` in this module, but not publicly re-exported):

https://docs.rs/parquet/latest/parquet/?search=arrow_to_parquet_schema_with_root

returns no results. The compiler told me it was unused once I switched everything over to use `ArrowToParquetSchemaConverter`.

schema: &Schema,
root: &str,
coerce_types: bool,
) -> Result<SchemaDescriptor> {
let fields = schema
.fields()
.iter()
.map(|field| arrow_to_parquet_type(field, coerce_types).map(Arc::new))
.collect::<Result<_>>()?;
let group = Type::group_type_builder(root).with_fields(fields).build()?;
Ok(SchemaDescriptor::new(Arc::new(group)))
impl Default for ArrowSchemaConverter<'_> {
fn default() -> Self {
Self::new()
}
}

impl<'a> ArrowSchemaConverter<'a> {
/// Create a new converter
pub fn new() -> Self {
Self {
schema_root: "arrow_schema",
coerce_types: false,
}
}

/// Should Arrow types be coerced into Parquet native types (default `false`).
///
/// Setting this option to `true` will result in Parquet files that can be
/// read by more readers, but may lose precision for Arrow types such as
/// [`DataType::Date64`] which have no direct [corresponding Parquet type].
///
/// By default, this converter does not coerce to native Parquet types. Enabling type
/// coercion allows for meaningful representations that do not require
/// downstream readers to consider the embedded Arrow schema, and can allow
/// for greater compatibility with other Parquet implementations. However,
/// type coercion also prevents data from being losslessly round-tripped.
///
/// # Discussion
///
/// Some Arrow types, such as `Date64`, `Timestamp` and `Interval`, have no
/// exactly corresponding Parquet logical type. Thus, they cannot be
/// losslessly round-tripped when stored using the closest Parquet logical
/// type. For example, some `Date64` values may be truncated when stored
/// with Parquet's native 32-bit date type.
///
/// For [`List`] and [`Map`] types, some Parquet readers expect certain
/// schema elements to have specific names (earlier versions of the spec
/// were somewhat ambiguous on this point). Type coercion will use the names
/// prescribed by the Parquet specification, potentially losing naming
/// metadata from the Arrow schema.
///
/// [`List`]: https://github.com/apache/parquet-format/blob/master/LogicalTypes.md#lists
/// [`Map`]: https://github.com/apache/parquet-format/blob/master/LogicalTypes.md#maps
/// [corresponding Parquet type]: https://github.com/apache/parquet-format/blob/master/LogicalTypes.md#date
///
pub fn with_coerce_types(mut self, coerce_types: bool) -> Self {
self.coerce_types = coerce_types;
self
}

/// Set the root schema element name (defaults to `"arrow_schema"`).
pub fn schema_root(mut self, schema_root: &'a str) -> Self {
self.schema_root = schema_root;
self
}

/// Convert the specified Arrow [`Schema`] to the desired Parquet [`SchemaDescriptor`]
///
/// See example in [`ArrowSchemaConverter`]
pub fn convert(&self, schema: &Schema) -> Result<SchemaDescriptor> {
let fields = schema
.fields()
.iter()
.map(|field| arrow_to_parquet_type(field, self.coerce_types).map(Arc::new))
.collect::<Result<_>>()?;
let group = Type::group_type_builder(self.schema_root)
.with_fields(fields)
.build()?;
Ok(SchemaDescriptor::new(Arc::new(group)))
}
}

/// Convert arrow schema to parquet schema
///
/// The name of the root schema element defaults to `"arrow_schema"`, this can be
/// overridden with [`ArrowSchemaConverter`]
#[deprecated(since = "54.0.0", note = "Use `ArrowToParquetSchemaConverter` instead")]
pub fn arrow_to_parquet_schema(schema: &Schema) -> Result<SchemaDescriptor> {
ArrowSchemaConverter::new().convert(schema)
}
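As a migration sketch (mine, not part of the diff): the deprecated one-argument function is equivalent to the builder with its defaults, so call sites map over directly:

```rust
use arrow_schema::{DataType, Field, Schema};
use parquet::arrow::ArrowSchemaConverter;

fn main() -> parquet::errors::Result<()> {
    let schema = Schema::new(vec![Field::new("a", DataType::Int64, false)]);

    // Before (deprecated since 54.0.0):
    // let descriptor = arrow_to_parquet_schema(&schema)?;

    // After: `new()` already uses the "arrow_schema" root and
    // `coerce_types = false`, matching the deprecated function.
    let descriptor = ArrowSchemaConverter::new().convert(&schema)?;
    assert_eq!(descriptor.root_schema().name(), "arrow_schema");
    Ok(())
}
```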

fn parse_key_value_metadata(
@@ -1488,7 +1589,10 @@ mod tests {
";
let parquet_group_type = parse_message_type(message_type).unwrap();
let parquet_schema = SchemaDescriptor::new(Arc::new(parquet_group_type));
let converted_arrow_schema = arrow_to_parquet_schema(&arrow_schema, true).unwrap();
let converted_arrow_schema = ArrowSchemaConverter::new()
.with_coerce_types(true)
.convert(&arrow_schema)
.unwrap();
assert_eq!(
parquet_schema.columns().len(),
converted_arrow_schema.columns().len()
@@ -1512,7 +1616,10 @@
";
let parquet_group_type = parse_message_type(message_type).unwrap();
let parquet_schema = SchemaDescriptor::new(Arc::new(parquet_group_type));
let converted_arrow_schema = arrow_to_parquet_schema(&arrow_schema, false).unwrap();
let converted_arrow_schema = ArrowSchemaConverter::new()
.with_coerce_types(false)
.convert(&arrow_schema)
.unwrap();
assert_eq!(
parquet_schema.columns().len(),
converted_arrow_schema.columns().len()
@@ -1668,7 +1775,7 @@ mod tests {
Field::new("decimal256", DataType::Decimal256(39, 2), false),
];
let arrow_schema = Schema::new(arrow_fields);
let converted_arrow_schema = arrow_to_parquet_schema(&arrow_schema, false).unwrap();
let converted_arrow_schema = ArrowSchemaConverter::new().convert(&arrow_schema).unwrap();

assert_eq!(
parquet_schema.columns().len(),
@@ -1705,9 +1812,10 @@
false,
)];
let arrow_schema = Schema::new(arrow_fields);
let converted_arrow_schema = arrow_to_parquet_schema(&arrow_schema, true);
let converted_arrow_schema = ArrowSchemaConverter::new()
.with_coerce_types(true)
.convert(&arrow_schema);

assert!(converted_arrow_schema.is_err());
converted_arrow_schema.unwrap();
}

@@ -1978,7 +2086,9 @@
// don't pass metadata so field ids are read from Parquet and not from serialized Arrow schema
let arrow_schema = crate::arrow::parquet_to_arrow_schema(&schema_descriptor, None)?;

let parq_schema_descr = crate::arrow::arrow_to_parquet_schema(&arrow_schema, true)?;
let parq_schema_descr = ArrowSchemaConverter::new()
.with_coerce_types(true)
.convert(&arrow_schema)?;
let parq_fields = parq_schema_descr.root_schema().get_fields();
assert_eq!(parq_fields.len(), 2);
assert_eq!(parq_fields[0].get_basic_info().id(), 1);
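To make the `coerce_types` trade-off concrete, here is a small sketch (assuming the converter behavior documented above) showing how `Date64` maps to different physical types depending on the flag:

```rust
use arrow_schema::{DataType, Field, Schema};
use parquet::arrow::ArrowSchemaConverter;
use parquet::basic::Type as PhysicalType;

fn main() -> parquet::errors::Result<()> {
    let schema = Schema::new(vec![Field::new("d", DataType::Date64, false)]);

    // Default (coerce_types = false): kept as a lossless 64-bit integer
    let lossless = ArrowSchemaConverter::new().convert(&schema)?;
    assert_eq!(lossless.column(0).physical_type(), PhysicalType::INT64);

    // Coerced: narrowed to Parquet's native 32-bit DATE, which may truncate
    let coerced = ArrowSchemaConverter::new()
        .with_coerce_types(true)
        .convert(&schema)?;
    assert_eq!(coerced.column(0).physical_type(), PhysicalType::INT32);
    Ok(())
}
```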
37 changes: 19 additions & 18 deletions parquet/src/file/properties.rs
@@ -16,14 +16,13 @@
// under the License.

//! Configuration via [`WriterProperties`] and [`ReaderProperties`]
use std::str::FromStr;
use std::{collections::HashMap, sync::Arc};

use crate::basic::{Compression, Encoding};
use crate::compression::{CodecOptions, CodecOptionsBuilder};
use crate::file::metadata::KeyValue;
use crate::format::SortingColumn;
use crate::schema::types::ColumnPath;
use std::str::FromStr;
use std::{collections::HashMap, sync::Arc};

/// Default value for [`WriterProperties::data_page_size_limit`]
pub const DEFAULT_PAGE_SIZE: usize = 1024 * 1024;
@@ -780,22 +779,24 @@ impl WriterPropertiesBuilder {
self
}

/// Sets flag to control if type coercion is enabled (defaults to `false`).
/// Should the writer coerce types to Parquet native types (defaults to `false`).
///
/// # Notes
/// Some Arrow types do not have a corresponding Parquet logical type.
/// Affected Arrow data types include `Date64`, `Timestamp` and `Interval`.
/// Also, for [`List`] and [`Map`] types, Parquet expects certain schema elements
/// to have specific names to be considered fully compliant.
/// Writers have the option to coerce these types and names to match those required
/// by the Parquet specification.
/// This type coercion allows for meaningful representations that do not require
/// downstream readers to consider the embedded Arrow schema, and can allow for greater
/// compatibility with other Parquet implementations. However, type
/// coercion also prevents the data from being losslessly round-tripped.
///
/// [`List`]: https://github.com/apache/parquet-format/blob/master/LogicalTypes.md#lists
/// [`Map`]: https://github.com/apache/parquet-format/blob/master/LogicalTypes.md#maps
/// Leaving this option at its default of `false` ensures that exactly the
/// same data written to Parquet by this library can be read back.
///
/// Setting this option to `true` will result in Parquet files that can be
/// read by more readers, but may lose information in the process.
///
/// * Types such as [`DataType::Date64`], which have no direct corresponding
/// Parquet type, may be stored with lower precision.
///
/// * The internal field names of `List` and `Map` types will be renamed if
/// necessary to match what is required by the newest Parquet specification.
///
/// See [`ArrowSchemaConverter::with_coerce_types`] for more details
///
/// [`DataType::Date64`]: arrow_schema::DataType::Date64
/// [`ArrowSchemaConverter::with_coerce_types`]: crate::arrow::ArrowSchemaConverter::with_coerce_types
pub fn set_coerce_types(mut self, coerce_types: bool) -> Self {
self.coerce_types = coerce_types;
self
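Finally, a sketch of the writer-level switch this doc change describes; `ArrowWriter` reads the flag from the properties and forwards it into the schema conversion shown earlier:

```rust
use parquet::file::properties::WriterProperties;

fn main() {
    // Default is `false`, keeping round-trips lossless; opt in explicitly.
    let props = WriterProperties::builder()
        .set_coerce_types(true)
        .build();
    assert!(props.coerce_types());
}
```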