Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Supporting writing schema metadata when writing Parquet in parallel #13866

Merged
merged 16 commits into from
Jan 1, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
16 commits
Select commit Hold shift + click to select a range
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions datafusion-cli/Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 2 additions & 0 deletions datafusion/common/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,9 @@ apache-avro = { version = "0.17", default-features = false, features = [
arrow = { workspace = true }
arrow-array = { workspace = true }
arrow-buffer = { workspace = true }
arrow-ipc = { workspace = true }
arrow-schema = { workspace = true }
base64 = "0.22.1"
half = { workspace = true }
hashbrown = { workspace = true }
indexmap = { workspace = true }
Expand Down
20 changes: 20 additions & 0 deletions datafusion/common/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -436,6 +436,12 @@ config_namespace! {
/// valid values are "1.0" and "2.0"
pub writer_version: String, default = "1.0".to_string()

/// (writing) Skip encoding the embedded arrow metadata in the KV_meta
///
/// This is analogous to the `ArrowWriterOptions::with_skip_arrow_metadata`.
/// Refer to <https://docs.rs/parquet/53.3.0/parquet/arrow/arrow_writer/struct.ArrowWriterOptions.html#method.with_skip_arrow_metadata>
pub skip_arrow_metadata: bool, default = false

/// (writing) Sets default parquet compression codec.
/// Valid values are: uncompressed, snappy, gzip(level),
/// lzo, brotli(level), lz4, zstd(level), and lz4_raw.
Expand Down Expand Up @@ -1496,6 +1502,20 @@ impl TableParquetOptions {
pub fn new() -> Self {
Self::default()
}

/// Set whether the encoding of the arrow metadata should occur
/// during the writing of parquet.
///
/// Default is to encode the arrow schema in the file kv_metadata.
pub fn with_skip_arrow_metadata(self, skip: bool) -> Self {
Self {
global: ParquetOptions {
skip_arrow_metadata: skip,
..self.global
},
..self
}
}
}

impl ConfigField for TableParquetOptions {
Expand Down
15 changes: 9 additions & 6 deletions datafusion/common/src/file_options/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,6 @@ pub mod parquet_writer;
mod tests {
use std::collections::HashMap;

use super::parquet_writer::ParquetWriterOptions;
use crate::{
config::{ConfigFileType, TableOptions},
file_options::{csv_writer::CsvWriterOptions, json_writer::JsonWriterOptions},
Expand All @@ -40,7 +39,7 @@ mod tests {

use parquet::{
basic::{Compression, Encoding, ZstdLevel},
file::properties::{EnabledStatistics, WriterVersion},
file::properties::{EnabledStatistics, WriterPropertiesBuilder, WriterVersion},
schema::types::ColumnPath,
};

Expand Down Expand Up @@ -79,8 +78,10 @@ mod tests {
table_config.set_config_format(ConfigFileType::PARQUET);
table_config.alter_with_string_hash_map(&option_map)?;

let parquet_options = ParquetWriterOptions::try_from(&table_config.parquet)?;
let properties = parquet_options.writer_options();
let properties = WriterPropertiesBuilder::try_from(
&table_config.parquet.with_skip_arrow_metadata(true),
)?
.build();

// Verify the expected options propagated down to parquet crate WriterProperties struct
assert_eq!(properties.max_row_group_size(), 123);
Expand Down Expand Up @@ -184,8 +185,10 @@ mod tests {
table_config.set_config_format(ConfigFileType::PARQUET);
table_config.alter_with_string_hash_map(&option_map)?;

let parquet_options = ParquetWriterOptions::try_from(&table_config.parquet)?;
let properties = parquet_options.writer_options();
let properties = WriterPropertiesBuilder::try_from(
&table_config.parquet.with_skip_arrow_metadata(true),
)?
.build();

let col1 = ColumnPath::from(vec!["col1".to_owned()]);
let col2_nested = ColumnPath::from(vec!["col2".to_owned(), "nested".to_owned()]);
Expand Down
113 changes: 106 additions & 7 deletions datafusion/common/src/file_options/parquet_writer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,18 +17,25 @@

//! Options related to how parquet files should be written

use base64::Engine;
use std::sync::Arc;

use crate::{
config::{ParquetOptions, TableParquetOptions},
DataFusionError, Result,
DataFusionError, Result, _internal_datafusion_err,
};

use arrow_schema::Schema;
use parquet::{
arrow::ARROW_SCHEMA_META_KEY,
basic::{BrotliLevel, GzipLevel, ZstdLevel},
file::properties::{
EnabledStatistics, WriterProperties, WriterPropertiesBuilder, WriterVersion,
DEFAULT_MAX_STATISTICS_SIZE, DEFAULT_STATISTICS_ENABLED,
file::{
metadata::KeyValue,
properties::{
EnabledStatistics, WriterProperties, WriterPropertiesBuilder, WriterVersion,
DEFAULT_MAX_STATISTICS_SIZE, DEFAULT_STATISTICS_ENABLED,
},
},
format::KeyValue,
schema::types::ColumnPath,
};

Expand All @@ -51,6 +58,17 @@ impl ParquetWriterOptions {
}
}

impl TableParquetOptions {
wiedld marked this conversation as resolved.
Show resolved Hide resolved
/// Add the arrow schema to the parquet kv_metadata.
/// If already exists, then overwrites.
pub fn arrow_schema(&mut self, schema: &Arc<Schema>) {
self.key_value_metadata.insert(
ARROW_SCHEMA_META_KEY.into(),
Some(encode_arrow_schema(schema)),
);
}
}

impl TryFrom<&TableParquetOptions> for ParquetWriterOptions {
type Error = DataFusionError;

Expand Down Expand Up @@ -79,6 +97,14 @@ impl TryFrom<&TableParquetOptions> for WriterPropertiesBuilder {

let mut builder = global.into_writer_properties_builder()?;

// check that the arrow schema is present in the kv_metadata, if configured to do so
if !global.skip_arrow_metadata
&& !key_value_metadata.contains_key(ARROW_SCHEMA_META_KEY)
{
return Err(_internal_datafusion_err!("arrow schema was not added to the kv_metadata, even though it is required by configuration settings"));
}

// add kv_meta, if any
if !key_value_metadata.is_empty() {
builder = builder.set_key_value_metadata(Some(
key_value_metadata
Expand Down Expand Up @@ -140,11 +166,38 @@ impl TryFrom<&TableParquetOptions> for WriterPropertiesBuilder {
}
}

/// Encodes the Arrow schema into the IPC format, and base64 encodes it
///
/// TODO: use extern parquet's private method, once publicly available.
/// Refer to <https://github.com/apache/arrow-rs/pull/6916>
fn encode_arrow_schema(schema: &Arc<Schema>) -> String {
    let options = arrow_ipc::writer::IpcWriteOptions::default();
    let mut dictionary_tracker = arrow_ipc::writer::DictionaryTracker::new(true);
    let data_gen = arrow_ipc::writer::IpcDataGenerator::default();
    let mut serialized_schema = data_gen.schema_to_bytes_with_dictionary_tracker(
        schema,
        &mut dictionary_tracker,
        &options,
    );

    // Manually prepend the legacy IPC framing (0xFFFFFFFF continuation
    // marker + little-endian message length), as arrow uses the legacy
    // IPC format for the embedded schema.
    // TODO: change after addressing ARROW-9777
    let schema_len = serialized_schema.ipc_message.len();
    let mut len_prefix_schema = Vec::with_capacity(schema_len + 8);
    // extend_from_slice avoids the intermediate Vec allocations that
    // `append(&mut vec![..])` / `.to_vec()` would incur.
    len_prefix_schema.extend_from_slice(&[255u8, 255, 255, 255]);
    len_prefix_schema.extend_from_slice(&(schema_len as u32).to_le_bytes());
    len_prefix_schema.append(&mut serialized_schema.ipc_message);

    base64::prelude::BASE64_STANDARD.encode(&len_prefix_schema)
}

impl ParquetOptions {
/// Convert the global session options, [`ParquetOptions`], into a single write action's [`WriterPropertiesBuilder`].
///
/// The returned [`WriterPropertiesBuilder`] can then be further modified with additional options
/// applied per column; a customization which is not applicable for [`ParquetOptions`].
///
/// Note that this method does not include the key_value_metadata from [`TableParquetOptions`].
pub fn into_writer_properties_builder(&self) -> Result<WriterPropertiesBuilder> {
let ParquetOptions {
data_pagesize_limit,
Expand Down Expand Up @@ -177,6 +230,7 @@ impl ParquetOptions {
bloom_filter_on_read: _, // reads not used for writer props
schema_force_view_types: _,
binary_as_string: _, // not used for writer props
skip_arrow_metadata: _,
} = self;

let mut builder = WriterProperties::builder()
Expand Down Expand Up @@ -444,6 +498,7 @@ mod tests {
bloom_filter_on_read: defaults.bloom_filter_on_read,
schema_force_view_types: defaults.schema_force_view_types,
binary_as_string: defaults.binary_as_string,
skip_arrow_metadata: defaults.skip_arrow_metadata,
}
}

Expand Down Expand Up @@ -546,19 +601,55 @@ mod tests {
bloom_filter_on_read: global_options_defaults.bloom_filter_on_read,
schema_force_view_types: global_options_defaults.schema_force_view_types,
binary_as_string: global_options_defaults.binary_as_string,
skip_arrow_metadata: global_options_defaults.skip_arrow_metadata,
},
column_specific_options,
key_value_metadata,
}
}

#[test]
fn table_parquet_opts_to_writer_props_skip_arrow_metadata() {
    // Start from the defaults: the arrow schema is required in kv_metadata.
    let mut opts = TableParquetOptions::default();
    assert!(
        !opts.global.skip_arrow_metadata,
        "default false, to not skip the arrow schema requirement"
    );

    // Without the schema present, conversion must fail under default settings.
    assert!(
        WriterPropertiesBuilder::try_from(&opts).is_err(),
        "should error without the required arrow schema in kv_metadata",
    );

    // Opting out of the requirement makes the conversion succeed.
    opts = opts.with_skip_arrow_metadata(true);
    assert!(
        WriterPropertiesBuilder::try_from(&opts).is_ok(),
        "should work with the arrow schema skipped by config",
    );

    // Re-enable the requirement, then satisfy it by adding the schema.
    opts = opts.with_skip_arrow_metadata(false);
    opts.arrow_schema(&Arc::new(Schema::empty()));
    assert!(
        WriterPropertiesBuilder::try_from(&opts).is_ok(),
        "should work with the arrow schema included in TableParquetOptions",
    );
}

#[test]
fn table_parquet_opts_to_writer_props() {
// ParquetOptions, all props set to non-default
let parquet_options = parquet_options_with_non_defaults();

// TableParquetOptions, using ParquetOptions for global settings
let key = "foo".to_string();
let key = ARROW_SCHEMA_META_KEY.to_string();
let value = Some("bar".into());
let table_parquet_opts = TableParquetOptions {
global: parquet_options.clone(),
Expand All @@ -585,14 +676,18 @@ mod tests {
#[test]
fn test_defaults_match() {
// ensure the global settings are the same
let default_table_writer_opts = TableParquetOptions::default();
let mut default_table_writer_opts = TableParquetOptions::default();
let default_parquet_opts = ParquetOptions::default();
assert_eq!(
default_table_writer_opts.global,
default_parquet_opts,
"should have matching defaults for TableParquetOptions.global and ParquetOptions",
);

// selectively skip the arrow_schema metadata, since the WriterProperties default has an empty kv_meta (no arrow schema)
default_table_writer_opts =
default_table_writer_opts.with_skip_arrow_metadata(true);

// WriterProperties::default, a.k.a. using extern parquet's defaults
let default_writer_props = WriterProperties::new();

Expand Down Expand Up @@ -640,6 +735,7 @@ mod tests {
session_config_from_writer_props(&default_writer_props);
from_extern_parquet.global.created_by = same_created_by;
from_extern_parquet.global.compression = Some("zstd(3)".into());
from_extern_parquet.global.skip_arrow_metadata = true;

assert_eq!(
default_table_writer_opts,
Expand All @@ -653,6 +749,7 @@ mod tests {
// the TableParquetOptions::default, with only the bloom filter turned on
let mut default_table_writer_opts = TableParquetOptions::default();
default_table_writer_opts.global.bloom_filter_on_write = true;
default_table_writer_opts.arrow_schema(&Arc::new(Schema::empty())); // add the required arrow schema
let from_datafusion_defaults =
WriterPropertiesBuilder::try_from(&default_table_writer_opts)
.unwrap()
Expand Down Expand Up @@ -681,6 +778,7 @@ mod tests {
let mut default_table_writer_opts = TableParquetOptions::default();
default_table_writer_opts.global.bloom_filter_on_write = true;
default_table_writer_opts.global.bloom_filter_fpp = Some(0.42);
default_table_writer_opts.arrow_schema(&Arc::new(Schema::empty())); // add the required arrow schema
let from_datafusion_defaults =
WriterPropertiesBuilder::try_from(&default_table_writer_opts)
.unwrap()
Expand Down Expand Up @@ -713,6 +811,7 @@ mod tests {
let mut default_table_writer_opts = TableParquetOptions::default();
default_table_writer_opts.global.bloom_filter_on_write = true;
default_table_writer_opts.global.bloom_filter_ndv = Some(42);
default_table_writer_opts.arrow_schema(&Arc::new(Schema::empty())); // add the required arrow schema
let from_datafusion_defaults =
WriterPropertiesBuilder::try_from(&default_table_writer_opts)
.unwrap()
Expand Down
Loading
Loading