Skip to content

Commit

Permalink
feat: Glue Catalog - table operations (3/3) (apache#314)
Browse files Browse the repository at this point in the history
* add GlueSchemaBuilder

* add warehouse

* add serde_json, tokio, uuid

* add minio

* add create_table

* add tests utils

* add load_table

* add drop_table + table_exists

* add rename_table

* add docs

* fix: docs + err_msg

* fix: remove unused const

* fix: default_table_location

* fix: remove single quotes error message

* chore: add test-condition `test_rename_table`

* chore: add test-condition `test_table_exists`
  • Loading branch information
marvinlanhenke authored Apr 21, 2024
1 parent 7666bb6 commit dcc380a
Show file tree
Hide file tree
Showing 7 changed files with 1,243 additions and 28 deletions.
4 changes: 3 additions & 1 deletion crates/catalog/glue/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -35,9 +35,11 @@ aws-config = { workspace = true }
aws-sdk-glue = { workspace = true }
iceberg = { workspace = true }
log = { workspace = true }
serde_json = { workspace = true }
tokio = { workspace = true }
typed-builder = { workspace = true }
uuid = { workspace = true }

[dev-dependencies]
iceberg_test_utils = { path = "../../test_utils", features = ["tests"] }
port_scanner = { workspace = true }
tokio = { workspace = true }
304 changes: 286 additions & 18 deletions crates/catalog/glue/src/catalog.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,18 +16,23 @@
// under the License.

use async_trait::async_trait;
use aws_sdk_glue::types::TableInput;
use iceberg::io::FileIO;
use iceberg::spec::{TableMetadata, TableMetadataBuilder};
use iceberg::table::Table;
use iceberg::{
Catalog, Error, ErrorKind, Namespace, NamespaceIdent, Result, TableCommit, TableCreation,
TableIdent,
};
use std::{collections::HashMap, fmt::Debug};
use tokio::io::{AsyncReadExt, AsyncWriteExt};

use typed_builder::TypedBuilder;

use crate::error::from_aws_sdk_error;
use crate::error::{from_aws_build_error, from_aws_sdk_error};
use crate::utils::{
convert_to_database, convert_to_namespace, create_sdk_config, validate_namespace,
convert_to_database, convert_to_glue_table, convert_to_namespace, create_metadata_location,
create_sdk_config, get_default_table_location, get_metadata_location, validate_namespace,
};
use crate::with_catalog_id;

Expand All @@ -38,6 +43,7 @@ pub struct GlueCatalogConfig {
uri: Option<String>,
#[builder(default, setter(strip_option))]
catalog_id: Option<String>,
warehouse: String,
#[builder(default)]
props: HashMap<String, String>,
}
Expand All @@ -48,6 +54,7 @@ struct GlueClient(aws_sdk_glue::Client);
pub struct GlueCatalog {
config: GlueCatalogConfig,
client: GlueClient,
file_io: FileIO,
}

impl Debug for GlueCatalog {
Expand All @@ -60,15 +67,24 @@ impl Debug for GlueCatalog {

impl GlueCatalog {
/// Create a new glue catalog
pub async fn new(config: GlueCatalogConfig) -> Self {
pub async fn new(config: GlueCatalogConfig) -> Result<Self> {
let sdk_config = create_sdk_config(&config.props, config.uri.as_ref()).await;

let client = aws_sdk_glue::Client::new(&sdk_config);

GlueCatalog {
let file_io = FileIO::from_path(&config.warehouse)?
.with_props(&config.props)
.build()?;

Ok(GlueCatalog {
config,
client: GlueClient(client),
}
file_io,
})
}
/// Get the catalog's `FileIO`.
///
/// Returns a clone of the `FileIO` held by this catalog, so the caller
/// receives its own handle.
pub fn file_io(&self) -> FileIO {
self.file_io.clone()
}
}

Expand All @@ -77,7 +93,7 @@ impl Catalog for GlueCatalog {
/// List namespaces from glue catalog.
///
/// Glue doesn't support nested namespaces.
/// We will return an empty list if parent is some
/// We will return an empty list if parent is some.
async fn list_namespaces(
&self,
parent: Option<&NamespaceIdent>,
Expand Down Expand Up @@ -277,6 +293,7 @@ impl Catalog for GlueCatalog {
/// querying the database.
async fn list_tables(&self, namespace: &NamespaceIdent) -> Result<Vec<TableIdent>> {
let db_name = validate_namespace(namespace)?;

let mut table_list: Vec<TableIdent> = Vec::new();
let mut next_token: Option<String> = None;

Expand Down Expand Up @@ -310,31 +327,282 @@ impl Catalog for GlueCatalog {
Ok(table_list)
}

/// Creates a new table within a specified namespace using the provided
/// table creation settings.
///
/// The table location is taken from `creation.location` when present;
/// otherwise a default location is derived from the namespace, the
/// database name, the table name and the configured warehouse. The table
/// metadata is serialized as JSON to the metadata location before the
/// table is registered in the Glue Catalog.
///
/// # Returns
/// A `Result` wrapping a `Table` object representing the newly created
/// table.
///
/// # Errors
/// This function may return an error in several cases, including invalid
/// namespace identifiers, failure to determine a default storage location,
/// issues generating or writing table metadata, and errors communicating
/// with the Glue Catalog.
async fn create_table(
    &self,
    namespace: &NamespaceIdent,
    creation: TableCreation,
) -> Result<Table> {
    let db_name = validate_namespace(namespace)?;
    let table_name = creation.name.clone();

    let location = match &creation.location {
        Some(location) => location.clone(),
        None => {
            let ns = self.get_namespace(namespace).await?;
            get_default_table_location(&ns, &db_name, &table_name, &self.config.warehouse)
        }
    };

    // The metadata is built from `creation`, so the resolved location has
    // to be stored back into it. Otherwise a table created without an
    // explicit location would be built from a `TableCreation` whose
    // location is still `None`.
    let mut creation = creation;
    creation.location = Some(location.clone());

    let metadata = TableMetadataBuilder::from_table_creation(creation)?.build()?;
    // NOTE(review): `0` is presumably the initial metadata version — confirm
    // against `create_metadata_location`.
    let metadata_location = create_metadata_location(&location, 0)?;

    // Persist the metadata file first; Glue only stores a pointer to it.
    let mut file = self
        .file_io
        .new_output(&metadata_location)?
        .writer()
        .await?;
    file.write_all(&serde_json::to_vec(&metadata)?).await?;
    file.shutdown().await?;

    let glue_table = convert_to_glue_table(
        &table_name,
        metadata_location.clone(),
        &metadata,
        metadata.properties(),
        None,
    )?;

    let builder = self
        .client
        .0
        .create_table()
        .database_name(&db_name)
        .table_input(glue_table);
    let builder = with_catalog_id!(builder, self.config);

    builder.send().await.map_err(from_aws_sdk_error)?;

    let table = Table::builder()
        .file_io(self.file_io())
        .metadata_location(metadata_location)
        .metadata(metadata)
        .identifier(TableIdent::new(NamespaceIdent::new(db_name), table_name))
        .build();

    Ok(table)
}

async fn load_table(&self, _table: &TableIdent) -> Result<Table> {
todo!()
/// Loads a table from the Glue Catalog and constructs a `Table` object
/// based on its metadata.
///
/// # Returns
/// A `Result` wrapping a `Table` object that represents the loaded table.
///
/// # Errors
/// This function may return an error in several scenarios, including:
/// - Failure to validate the namespace.
/// - Failure to retrieve the table from the Glue Catalog.
/// - Absence of metadata location information in the table's properties.
/// - Issues reading or deserializing the table's metadata file.
async fn load_table(&self, table: &TableIdent) -> Result<Table> {
let db_name = validate_namespace(table.namespace())?;
let table_name = table.name();

let builder = self
.client
.0
.get_table()
.database_name(&db_name)
.name(table_name);
let builder = with_catalog_id!(builder, self.config);

let glue_table_output = builder.send().await.map_err(from_aws_sdk_error)?;

match glue_table_output.table() {
None => Err(Error::new(
ErrorKind::Unexpected,
format!(
"Table object for database: {} and table: {} does not exist",
db_name, table_name
),
)),
Some(table) => {
let metadata_location = get_metadata_location(&table.parameters)?;

let mut reader = self.file_io.new_input(&metadata_location)?.reader().await?;
let mut metadata_str = String::new();
reader.read_to_string(&mut metadata_str).await?;
let metadata = serde_json::from_str::<TableMetadata>(&metadata_str)?;

let table = Table::builder()
.file_io(self.file_io())
.metadata_location(metadata_location)
.metadata(metadata)
.identifier(TableIdent::new(
NamespaceIdent::new(db_name),
table_name.to_owned(),
))
.build();

Ok(table)
}
}
}

async fn drop_table(&self, _table: &TableIdent) -> Result<()> {
todo!()
/// Drops a table from the Glue Catalog.
///
/// Issues a Glue `delete_table` request for the table's database and
/// name, adding the catalog id via `with_catalog_id!`.
///
/// # Errors
/// Returns an error if:
/// - the namespace given in `table` fails validation,
/// - the `delete_table` request fails; the SDK error is converted with
///   `from_aws_sdk_error`.
async fn drop_table(&self, table: &TableIdent) -> Result<()> {
    let db_name = validate_namespace(table.namespace())?;

    let request = self
        .client
        .0
        .delete_table()
        .database_name(&db_name)
        .name(table.name());
    let request = with_catalog_id!(request, self.config);

    request
        .send()
        .await
        .map(|_| ())
        .map_err(from_aws_sdk_error)
}

async fn table_exists(&self, _table: &TableIdent) -> Result<bool> {
todo!()
/// Checks whether a table exists in the Glue Catalog.
///
/// Sends a Glue `get_table` request and interprets the outcome: a
/// successful response means the table exists, an "entity not found"
/// service error means it does not, and any other error is propagated.
///
/// # Returns
/// - `Ok(true)` if the `get_table` request succeeds.
/// - `Ok(false)` if the service reports an entity-not-found exception.
/// - `Err(...)` if the namespace is invalid or any other error occurs.
async fn table_exists(&self, table: &TableIdent) -> Result<bool> {
    let db_name = validate_namespace(table.namespace())?;

    let request = self
        .client
        .0
        .get_table()
        .database_name(&db_name)
        .name(table.name());
    let request = with_catalog_id!(request, self.config);

    match request.send().await {
        Ok(_) => Ok(true),
        // Only a service-level "entity not found" counts as "does not exist".
        Err(err)
            if matches!(
                err.as_service_error(),
                Some(e) if e.is_entity_not_found_exception()
            ) =>
        {
            Ok(false)
        }
        Err(err) => Err(from_aws_sdk_error(err)),
    }
}

async fn rename_table(&self, _src: &TableIdent, _dest: &TableIdent) -> Result<()> {
todo!()
/// Asynchronously renames a table within the database
/// or moves it between namespaces (databases).
///
/// # Returns
/// - `Ok(())` on successful rename or move of the table.
/// - `Err(...)` if an error occurs during the process.
async fn rename_table(&self, src: &TableIdent, dest: &TableIdent) -> Result<()> {
let src_db_name = validate_namespace(src.namespace())?;
let dest_db_name = validate_namespace(dest.namespace())?;

let src_table_name = src.name();
let dest_table_name = dest.name();

let builder = self
.client
.0
.get_table()
.database_name(&src_db_name)
.name(src_table_name);
let builder = with_catalog_id!(builder, self.config);

let glue_table_output = builder.send().await.map_err(from_aws_sdk_error)?;

match glue_table_output.table() {
None => Err(Error::new(
ErrorKind::Unexpected,
format!(
"'Table' object for database: {} and table: {} does not exist",
src_db_name, src_table_name
),
)),
Some(table) => {
let rename_table_input = TableInput::builder()
.name(dest_table_name)
.set_parameters(table.parameters.clone())
.set_storage_descriptor(table.storage_descriptor.clone())
.set_table_type(table.table_type.clone())
.set_description(table.description.clone())
.build()
.map_err(from_aws_build_error)?;

let builder = self
.client
.0
.create_table()
.database_name(&dest_db_name)
.table_input(rename_table_input);
let builder = with_catalog_id!(builder, self.config);

builder.send().await.map_err(from_aws_sdk_error)?;

let drop_src_table_result = self.drop_table(src).await;

match drop_src_table_result {
Ok(_) => Ok(()),
Err(_) => {
let err_msg_src_table = format!(
"Failed to drop old table {}.{}.",
src_db_name, src_table_name
);

let drop_dest_table_result = self.drop_table(dest).await;

match drop_dest_table_result {
Ok(_) => Err(Error::new(
ErrorKind::Unexpected,
format!(
"{} Rolled back table creation for {}.{}.",
err_msg_src_table, dest_db_name, dest_table_name
),
)),
Err(_) => Err(Error::new(
ErrorKind::Unexpected,
format!(
"{} Failed to roll back table creation for {}.{}. Please clean up manually.",
err_msg_src_table, dest_db_name, dest_table_name
),
)),
}
}
}
}
}
}

/// Updating a table is not supported yet.
///
/// # Errors
/// Always returns an error of kind `ErrorKind::FeatureUnsupported`;
/// the commit is not inspected.
async fn update_table(&self, _commit: TableCommit) -> Result<Table> {
    Err(Error::new(
        ErrorKind::FeatureUnsupported,
        "Updating a table is not supported yet",
    ))
}
}
Loading

0 comments on commit dcc380a

Please sign in to comment.