Skip to content

Commit

Permalink
Basic Integration with Datafusion (apache#324)
Browse files Browse the repository at this point in the history
* chore: basic structure

* feat: add IcebergCatalogProvider

* feat: add IcebergSchemaProvider

* feat: add IcebergTableProvider

* chore: add integration test infr

* fix: remove old test

* chore: update crate structure

* fix: remove workspace dep

* refactor: use try_join_all

* chore: remove feature flag

* chore: rename package

* chore: update readme

* feat: add TableType

* fix: import + async_trait

* fix: imports + async_trait

* chore: remove feature flag

* fix: cargo sort

* refactor: CatalogProvider `fn try_new`

* refactor: SchemaProvider `fn try_new`

* chore: update docs

* chore: update docs

* chore: update doc

* feat: impl `fn schema` on TableProvider

* chore: rename ArrowSchema

* refactor: remove DashMap

* feat: add basic IcebergTableScan

* chore: fix docs

* chore: add comments

* fix: clippy

* fix: typo

* fix: license

* chore: update docs

* chore: move derive stmt

* fix: collect into hashmap

* chore: use DFResult

* Update crates/integrations/datafusion/README.md

Co-authored-by: Liang-Chi Hsieh <viirya@gmail.com>

---------

Co-authored-by: Renjie Liu <liurenjie2008@gmail.com>
Co-authored-by: Liang-Chi Hsieh <viirya@gmail.com>
  • Loading branch information
3 people authored May 2, 2024
1 parent 7dfc548 commit bbd042d
Show file tree
Hide file tree
Showing 15 changed files with 889 additions and 1 deletion.
2 changes: 2 additions & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ members = [
"crates/catalog/*",
"crates/examples",
"crates/iceberg",
"crates/integrations/*",
"crates/test_utils",
]

Expand Down Expand Up @@ -56,6 +57,7 @@ fnv = "1"
futures = "0.3"
iceberg = { version = "0.2.0", path = "./crates/iceberg" }
iceberg-catalog-rest = { version = "0.2.0", path = "./crates/catalog/rest" }
iceberg-catalog-hms = { version = "0.2.0", path = "./crates/catalog/hms" }
itertools = "0.12"
lazy_static = "1"
log = "^0.4"
Expand Down
2 changes: 1 addition & 1 deletion crates/iceberg/src/table.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ use futures::AsyncReadExt;
use typed_builder::TypedBuilder;

/// Table represents a table in the catalog.
#[derive(TypedBuilder, Debug)]
#[derive(TypedBuilder, Debug, Clone)]
pub struct Table {
file_io: FileIO,
#[builder(default, setter(strip_option, into))]
Expand Down
43 changes: 43 additions & 0 deletions crates/integrations/datafusion/Cargo.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.

[package]
name = "iceberg-datafusion"
version = { workspace = true }
edition = { workspace = true }
homepage = { workspace = true }
rust-version = { workspace = true }

categories = ["database"]
description = "Apache Iceberg Datafusion Integration"
repository = { workspace = true }
license = { workspace = true }
keywords = ["iceberg", "integrations", "datafusion"]

[dependencies]
anyhow = { workspace = true }
async-trait = { workspace = true }
datafusion = { version = "37.0.0" }
futures = { workspace = true }
iceberg = { workspace = true }
log = { workspace = true }
tokio = { workspace = true }

[dev-dependencies]
iceberg-catalog-hms = { workspace = true }
iceberg_test_utils = { path = "../../test_utils", features = ["tests"] }
port_scanner = { workspace = true }
22 changes: 22 additions & 0 deletions crates/integrations/datafusion/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
<!--
~ Licensed to the Apache Software Foundation (ASF) under one
~ or more contributor license agreements. See the NOTICE file
~ distributed with this work for additional information
~ regarding copyright ownership. The ASF licenses this file
~ to you under the Apache License, Version 2.0 (the
~ "License"); you may not use this file except in compliance
~ with the License. You may obtain a copy of the License at
~
~ http://www.apache.org/licenses/LICENSE-2.0
~
~ Unless required by applicable law or agreed to in writing,
~ software distributed under the License is distributed on an
~ "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
~ KIND, either express or implied. See the License for the
~ specific language governing permissions and limitations
~ under the License.
-->

# Apache Iceberg DataFusion Integration

This crate contains the integration of Apache DataFusion and Apache Iceberg.
95 changes: 95 additions & 0 deletions crates/integrations/datafusion/src/catalog.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,95 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

use std::{any::Any, collections::HashMap, sync::Arc};

use datafusion::catalog::{schema::SchemaProvider, CatalogProvider};
use futures::future::try_join_all;
use iceberg::{Catalog, NamespaceIdent, Result};

use crate::schema::IcebergSchemaProvider;

/// Provides an interface to manage and access multiple schemas
/// within an Iceberg [`Catalog`].
///
/// Acts as a centralized catalog provider that aggregates
/// multiple [`SchemaProvider`], each associated with distinct namespaces.
pub struct IcebergCatalogProvider {
/// A `HashMap` where keys are namespace names
/// and values are dynamic references to objects implementing the
/// [`SchemaProvider`] trait.
schemas: HashMap<String, Arc<dyn SchemaProvider>>,
}

impl IcebergCatalogProvider {
/// Asynchronously tries to construct a new [`IcebergCatalogProvider`]
/// using the given client to fetch and initialize schema providers for
/// each namespace in the Iceberg [`Catalog`].
///
/// This method retrieves the list of namespace names
/// attempts to create a schema provider for each namespace, and
/// collects these providers into a `HashMap`.
pub async fn try_new(client: Arc<dyn Catalog>) -> Result<Self> {
// TODO:
// Schemas and providers should be cached and evicted based on time
// As of right now; schemas might become stale.
let schema_names: Vec<_> = client
.list_namespaces(None)
.await?
.iter()
.flat_map(|ns| ns.as_ref().clone())
.collect();

let providers = try_join_all(
schema_names
.iter()
.map(|name| {
IcebergSchemaProvider::try_new(
client.clone(),
NamespaceIdent::new(name.clone()),
)
})
.collect::<Vec<_>>(),
)
.await?;

let schemas: HashMap<String, Arc<dyn SchemaProvider>> = schema_names
.into_iter()
.zip(providers.into_iter())
.map(|(name, provider)| {
let provider = Arc::new(provider) as Arc<dyn SchemaProvider>;
(name, provider)
})
.collect();

Ok(IcebergCatalogProvider { schemas })
}
}

impl CatalogProvider for IcebergCatalogProvider {
fn as_any(&self) -> &dyn Any {
self
}

fn schema_names(&self) -> Vec<String> {
self.schemas.keys().cloned().collect()
}

fn schema(&self, name: &str) -> Option<Arc<dyn SchemaProvider>> {
self.schemas.get(name).cloned()
}
}
32 changes: 32 additions & 0 deletions crates/integrations/datafusion/src/error.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

use anyhow::anyhow;
use iceberg::{Error, ErrorKind};

/// Converts a datafusion error into an iceberg error.
pub fn from_datafusion_error(error: datafusion::error::DataFusionError) -> Error {
Error::new(
ErrorKind::Unexpected,
"Operation failed for hitting datafusion error".to_string(),
)
.with_source(anyhow!("datafusion error: {:?}", error))
}
/// Converts an iceberg error into a datafusion error.
pub fn to_datafusion_error(error: Error) -> datafusion::error::DataFusionError {
datafusion::error::DataFusionError::External(error.into())
}
26 changes: 26 additions & 0 deletions crates/integrations/datafusion/src/lib.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

mod catalog;
pub use catalog::*;

mod error;
pub use error::*;

mod physical_plan;
mod schema;
mod table;
18 changes: 18 additions & 0 deletions crates/integrations/datafusion/src/physical_plan/mod.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

pub(crate) mod scan;
Loading

0 comments on commit bbd042d

Please sign in to comment.