refine: separate parquet reader and arrow convert (apache#313)
ZENOTME authored Apr 3, 2024
1 parent 3dcb3a9 commit 865f774
Showing 3 changed files with 213 additions and 170 deletions.
23 changes: 23 additions & 0 deletions crates/iceberg/src/arrow/mod.rs
@@ -0,0 +1,23 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

//! Conversion between Iceberg and Arrow schema
mod schema;
pub use schema::*;
mod reader;
pub use reader::*;
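
With this split, both halves stay reachable through the same module path. A minimal sketch of the imports downstream code might use — assuming the crate root declares `pub mod arrow;` and that `arrow_schema_to_schema` is public in `schema.rs`, as the `pub use` re-exports above suggest:

// Both the schema conversion and the Parquet reader now live under iceberg::arrow.
use iceberg::arrow::{arrow_schema_to_schema, ArrowReader, ArrowReaderBuilder};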
189 changes: 189 additions & 0 deletions crates/iceberg/src/arrow/reader.rs
@@ -0,0 +1,189 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

//! Parquet file data reader
use arrow_schema::SchemaRef as ArrowSchemaRef;
use async_stream::try_stream;
use futures::stream::StreamExt;
use parquet::arrow::{ParquetRecordBatchStreamBuilder, ProjectionMask, PARQUET_FIELD_ID_META_KEY};
use parquet::schema::types::SchemaDescriptor;
use std::collections::HashMap;
use std::str::FromStr;

use crate::arrow::arrow_schema_to_schema;
use crate::io::FileIO;
use crate::scan::{ArrowRecordBatchStream, FileScanTaskStream};
use crate::spec::SchemaRef;
use crate::{Error, ErrorKind};

/// Builder to create ArrowReader
pub struct ArrowReaderBuilder {
batch_size: Option<usize>,
field_ids: Vec<usize>,
file_io: FileIO,
schema: SchemaRef,
}

impl ArrowReaderBuilder {
/// Create a new ArrowReaderBuilder
pub fn new(file_io: FileIO, schema: SchemaRef) -> Self {
ArrowReaderBuilder {
batch_size: None,
field_ids: vec![],
file_io,
schema,
}
}

/// Sets the desired size of batches in the response
/// to something other than the default
pub fn with_batch_size(mut self, batch_size: usize) -> Self {
self.batch_size = Some(batch_size);
self
}

/// Sets the desired column projection with a list of field ids.
pub fn with_field_ids(mut self, field_ids: impl IntoIterator<Item = usize>) -> Self {
self.field_ids = field_ids.into_iter().collect();
self
}

/// Build the ArrowReader.
pub fn build(self) -> ArrowReader {
ArrowReader {
batch_size: self.batch_size,
field_ids: self.field_ids,
schema: self.schema,
file_io: self.file_io,
}
}
}

/// Reads data from Parquet files
pub struct ArrowReader {
batch_size: Option<usize>,
field_ids: Vec<usize>,
#[allow(dead_code)]
schema: SchemaRef,
file_io: FileIO,
}

impl ArrowReader {
/// Takes a stream of FileScanTasks and reads all the files.
/// Returns a stream of Arrow RecordBatches containing the data from the files.
pub fn read(self, mut tasks: FileScanTaskStream) -> crate::Result<ArrowRecordBatchStream> {
let file_io = self.file_io.clone();

Ok(try_stream! {
while let Some(Ok(task)) = tasks.next().await {
let parquet_reader = file_io
.new_input(task.data().data_file().file_path())?
.reader()
.await?;

let mut batch_stream_builder = ParquetRecordBatchStreamBuilder::new(parquet_reader)
.await?;

let parquet_schema = batch_stream_builder.parquet_schema();
let arrow_schema = batch_stream_builder.schema();
let projection_mask = self.get_arrow_projection_mask(parquet_schema, arrow_schema)?;
batch_stream_builder = batch_stream_builder.with_projection(projection_mask);

if let Some(batch_size) = self.batch_size {
batch_stream_builder = batch_stream_builder.with_batch_size(batch_size);
}

let mut batch_stream = batch_stream_builder.build()?;

while let Some(batch) = batch_stream.next().await {
yield batch?;
}
}
}
.boxed())
}

fn get_arrow_projection_mask(
&self,
parquet_schema: &SchemaDescriptor,
arrow_schema: &ArrowSchemaRef,
) -> crate::Result<ProjectionMask> {
if self.field_ids.is_empty() {
Ok(ProjectionMask::all())
} else {
// Build the map between field id and column index in Parquet schema.
let mut column_map = HashMap::new();

let fields = arrow_schema.fields();
let iceberg_schema = arrow_schema_to_schema(arrow_schema)?;
fields.filter_leaves(|idx, field| {
let field_id = field.metadata().get(PARQUET_FIELD_ID_META_KEY);
if field_id.is_none() {
return false;
}

let field_id = i32::from_str(field_id.unwrap());
if field_id.is_err() {
return false;
}
let field_id = field_id.unwrap();

if !self.field_ids.contains(&(field_id as usize)) {
return false;
}

let iceberg_field = self.schema.field_by_id(field_id);
let parquet_iceberg_field = iceberg_schema.field_by_id(field_id);

if iceberg_field.is_none() || parquet_iceberg_field.is_none() {
return false;
}

if iceberg_field.unwrap().field_type != parquet_iceberg_field.unwrap().field_type {
return false;
}

column_map.insert(field_id, idx);
true
});

if column_map.len() != self.field_ids.len() {
return Err(Error::new(
ErrorKind::DataInvalid,
format!(
"Parquet schema {} and Iceberg schema {} do not match.",
iceberg_schema, self.schema
),
));
}

let mut indices = vec![];
for field_id in &self.field_ids {
if let Some(col_idx) = column_map.get(&(*field_id as i32)) {
indices.push(*col_idx);
} else {
return Err(Error::new(
ErrorKind::DataInvalid,
format!("Field {} is not found in Parquet schema.", field_id),
));
}
}
Ok(ProjectionMask::leaves(parquet_schema, indices))
}
}
}
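
For orientation, a hedged sketch of how a caller might drive this new reader. The function name `collect_batches`, the example field ids, and the way `file_io`, `schema`, and `task_stream` are obtained are illustrative, not part of this commit:

use arrow_array::RecordBatch;
use futures::TryStreamExt;
use iceberg::arrow::ArrowReaderBuilder;
use iceberg::io::FileIO;
use iceberg::scan::FileScanTaskStream;
use iceberg::spec::SchemaRef;

// Reads every file referenced by the scan tasks into Arrow batches.
async fn collect_batches(
    file_io: FileIO,
    schema: SchemaRef,
    task_stream: FileScanTaskStream,
) -> iceberg::Result<Vec<RecordBatch>> {
    let reader = ArrowReaderBuilder::new(file_io, schema)
        // Optional: override the Parquet reader's default batch size.
        .with_batch_size(1024)
        // Optional: project columns by Iceberg field id; an empty list
        // (the default) reads all columns. The ids 1 and 2 are placeholders.
        .with_field_ids(vec![1, 2])
        .build();

    // `read` consumes the task stream and yields a stream of RecordBatches.
    let batches = reader.read(task_stream)?;
    batches.try_collect().await
}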
171 changes: 1 addition & 170 deletions crates/iceberg/src/arrow.rs → crates/iceberg/src/arrow/schema.rs
@@ -15,19 +15,7 @@
// specific language governing permissions and limitations
// under the License.

//! Parquet file data reader
use arrow_schema::SchemaRef as ArrowSchemaRef;
use async_stream::try_stream;
use futures::stream::StreamExt;
use parquet::arrow::{ParquetRecordBatchStreamBuilder, ProjectionMask, PARQUET_FIELD_ID_META_KEY};
use parquet::schema::types::SchemaDescriptor;
use std::collections::HashMap;
use std::str::FromStr;

use crate::io::FileIO;
use crate::scan::{ArrowRecordBatchStream, FileScanTaskStream};
use crate::spec::SchemaRef;
//! Conversion between Arrow schema and Iceberg schema.
use crate::error::Result;
use crate::spec::{
@@ -37,163 +25,6 @@ use crate::{Error, ErrorKind};
use arrow_schema::{DataType, Field, Fields, Schema as ArrowSchema, TimeUnit};
use std::sync::Arc;

/// Builder to create ArrowReader
pub struct ArrowReaderBuilder {
batch_size: Option<usize>,
field_ids: Vec<usize>,
file_io: FileIO,
schema: SchemaRef,
}

impl ArrowReaderBuilder {
/// Create a new ArrowReaderBuilder
pub fn new(file_io: FileIO, schema: SchemaRef) -> Self {
ArrowReaderBuilder {
batch_size: None,
field_ids: vec![],
file_io,
schema,
}
}

/// Sets the desired size of batches in the response
/// to something other than the default
pub fn with_batch_size(mut self, batch_size: usize) -> Self {
self.batch_size = Some(batch_size);
self
}

/// Sets the desired column projection with a list of field ids.
pub fn with_field_ids(mut self, field_ids: impl IntoIterator<Item = usize>) -> Self {
self.field_ids = field_ids.into_iter().collect();
self
}

/// Build the ArrowReader.
pub fn build(self) -> ArrowReader {
ArrowReader {
batch_size: self.batch_size,
field_ids: self.field_ids,
schema: self.schema,
file_io: self.file_io,
}
}
}

/// Reads data from Parquet files
pub struct ArrowReader {
batch_size: Option<usize>,
field_ids: Vec<usize>,
#[allow(dead_code)]
schema: SchemaRef,
file_io: FileIO,
}

impl ArrowReader {
/// Take a stream of FileScanTasks and reads all the files.
/// Returns a stream of Arrow RecordBatches containing the data from the files
pub fn read(self, mut tasks: FileScanTaskStream) -> crate::Result<ArrowRecordBatchStream> {
let file_io = self.file_io.clone();

Ok(try_stream! {
while let Some(Ok(task)) = tasks.next().await {
let parquet_reader = file_io
.new_input(task.data().data_file().file_path())?
.reader()
.await?;

let mut batch_stream_builder = ParquetRecordBatchStreamBuilder::new(parquet_reader)
.await?;

let parquet_schema = batch_stream_builder.parquet_schema();
let arrow_schema = batch_stream_builder.schema();
let projection_mask = self.get_arrow_projection_mask(parquet_schema, arrow_schema)?;
batch_stream_builder = batch_stream_builder.with_projection(projection_mask);

if let Some(batch_size) = self.batch_size {
batch_stream_builder = batch_stream_builder.with_batch_size(batch_size);
}

let mut batch_stream = batch_stream_builder.build()?;

while let Some(batch) = batch_stream.next().await {
yield batch?;
}
}
}
.boxed())
}

fn get_arrow_projection_mask(
&self,
parquet_schema: &SchemaDescriptor,
arrow_schema: &ArrowSchemaRef,
) -> crate::Result<ProjectionMask> {
if self.field_ids.is_empty() {
Ok(ProjectionMask::all())
} else {
// Build the map between field id and column index in Parquet schema.
let mut column_map = HashMap::new();

let fields = arrow_schema.fields();
let iceberg_schema = arrow_schema_to_schema(arrow_schema)?;
fields.filter_leaves(|idx, field| {
let field_id = field.metadata().get(PARQUET_FIELD_ID_META_KEY);
if field_id.is_none() {
return false;
}

let field_id = i32::from_str(field_id.unwrap());
if field_id.is_err() {
return false;
}
let field_id = field_id.unwrap();

if !self.field_ids.contains(&(field_id as usize)) {
return false;
}

let iceberg_field = self.schema.field_by_id(field_id);
let parquet_iceberg_field = iceberg_schema.field_by_id(field_id);

if iceberg_field.is_none() || parquet_iceberg_field.is_none() {
return false;
}

if iceberg_field.unwrap().field_type != parquet_iceberg_field.unwrap().field_type {
return false;
}

column_map.insert(field_id, idx);
true
});

if column_map.len() != self.field_ids.len() {
return Err(Error::new(
ErrorKind::DataInvalid,
format!(
"Parquet schema {} and Iceberg schema {} do not match.",
iceberg_schema, self.schema
),
));
}

let mut indices = vec![];
for field_id in &self.field_ids {
if let Some(col_idx) = column_map.get(&(*field_id as i32)) {
indices.push(*col_idx);
} else {
return Err(Error::new(
ErrorKind::DataInvalid,
format!("Field {} is not found in Parquet schema.", field_id),
));
}
}
Ok(ProjectionMask::leaves(parquet_schema, indices))
}
}
}

/// A post order arrow schema visitor.
///
/// For order of methods called, please refer to [`visit_schema`].
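
Finally, a small sketch of the schema-conversion half that now lives in `schema.rs`. The single-column schema is illustrative, and it assumes the converter requires a `PARQUET:field_id` metadata entry on each Arrow field (the key re-exported as `PARQUET_FIELD_ID_META_KEY`), matching how the reader above resolves field ids:

use std::collections::HashMap;

use arrow_schema::{DataType, Field, Schema as ArrowSchema};
use iceberg::arrow::arrow_schema_to_schema;
use parquet::arrow::PARQUET_FIELD_ID_META_KEY;

fn main() -> iceberg::Result<()> {
    // Each Arrow field carries an Iceberg field id in its metadata.
    let id_field = Field::new("id", DataType::Int64, false).with_metadata(HashMap::from([(
        PARQUET_FIELD_ID_META_KEY.to_string(),
        "1".to_string(),
    )]));

    let arrow_schema = ArrowSchema::new(vec![id_field]);
    let iceberg_schema = arrow_schema_to_schema(&arrow_schema)?;
    println!("{iceberg_schema}");
    Ok(())
}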