From 99f4a45ddac044abb7ff5b56fa89d326e2414a13 Mon Sep 17 00:00:00 2001 From: Christian Thiel Date: Mon, 7 Oct 2024 09:56:26 +0200 Subject: [PATCH] Add Project APIs --- ...b0302f8ff81b7046efaa73c0d2433e1867780.json | 29 ++ ...e028e5b5f57e0486177ae63aa2ec266d329c9.json | 20 - ...ad0a3a1d35b555715e7c720959c73b569d129.json | 15 + ...bc29710601746acb3b6e4e085a9019dfa85aa.json | 28 ++ ...5fa4d940951bf20555d77e727b523074f33ae.json | 23 ++ ...af1656ed5ddfe52f1c0c3185b6d35112d5b39.json | 14 + crates/iceberg-catalog-bin/src/main.rs | 1 - .../20241006083546_projects-table.sql | 16 + .../iceberg-catalog/src/api/management/mod.rs | 116 +++++- .../src/api/management/v1/project.rs | 202 +++++++++++ .../src/api/management/v1/warehouse.rs | 80 ++-- crates/iceberg-catalog/src/catalog/config.rs | 4 +- .../iceberg-catalog/src/catalog/namespace.rs | 2 +- crates/iceberg-catalog/src/catalog/tables.rs | 8 +- crates/iceberg-catalog/src/catalog/views.rs | 1 + .../src/catalog/views/commit.rs | 2 +- .../src/catalog/views/create.rs | 2 +- .../iceberg-catalog/src/catalog/views/drop.rs | 2 +- .../iceberg-catalog/src/catalog/views/load.rs | 2 +- .../src/implementations/postgres/catalog.rs | 194 ++++++---- .../src/implementations/postgres/namespace.rs | 8 +- .../implementations/postgres/tabular/table.rs | 24 +- .../implementations/postgres/tabular/view.rs | 4 +- .../src/implementations/postgres/warehouse.rs | 342 +++++++++++++----- crates/iceberg-catalog/src/service/catalog.rs | 67 +++- crates/iceberg-catalog/src/service/config.rs | 44 ++- crates/iceberg-catalog/src/service/mod.rs | 6 +- .../src/service/task_queue/mod.rs | 1 + .../service/task_queue/tabular_purge_queue.rs | 2 +- .../create-default-project.json | 4 + examples/self-contained/docker-compose.yaml | 27 ++ justfile | 7 +- openapi/management-open-api.yaml | 158 +++++++- tests/docker-compose.yaml | 2 +- tests/python/tests/conftest.py | 36 +- 35 files changed, 1172 insertions(+), 321 deletions(-) create mode 100644 
.sqlx/query-04c6fd4a25005469a8342fe751db0302f8ff81b7046efaa73c0d2433e1867780.json delete mode 100644 .sqlx/query-730013736b62e29d934e1c320b3e028e5b5f57e0486177ae63aa2ec266d329c9.json create mode 100644 .sqlx/query-8528499bf3e9b68a051e0e68660ad0a3a1d35b555715e7c720959c73b569d129.json create mode 100644 .sqlx/query-99e4e108d554390e9b6d96270cfbc29710601746acb3b6e4e085a9019dfa85aa.json create mode 100644 .sqlx/query-9f23e7ec362c786e9524941bf385fa4d940951bf20555d77e727b523074f33ae.json create mode 100644 .sqlx/query-f64d0b807474d82210944a5004eaf1656ed5ddfe52f1c0c3185b6d35112d5b39.json create mode 100644 crates/iceberg-catalog/migrations/20241006083546_projects-table.sql create mode 100644 crates/iceberg-catalog/src/api/management/v1/project.rs create mode 100644 examples/self-contained/create-default-project.json diff --git a/.sqlx/query-04c6fd4a25005469a8342fe751db0302f8ff81b7046efaa73c0d2433e1867780.json b/.sqlx/query-04c6fd4a25005469a8342fe751db0302f8ff81b7046efaa73c0d2433e1867780.json new file mode 100644 index 00000000..40ca6ba1 --- /dev/null +++ b/.sqlx/query-04c6fd4a25005469a8342fe751db0302f8ff81b7046efaa73c0d2433e1867780.json @@ -0,0 +1,29 @@ +{ + "db_name": "PostgreSQL", + "query": "\n SELECT project_id, project_name FROM project WHERE project_id = ANY($1) or $2\n ", + "describe": { + "columns": [ + { + "ordinal": 0, + "name": "project_id", + "type_info": "Uuid" + }, + { + "ordinal": 1, + "name": "project_name", + "type_info": "Text" + } + ], + "parameters": { + "Left": [ + "UuidArray", + "Bool" + ] + }, + "nullable": [ + false, + false + ] + }, + "hash": "04c6fd4a25005469a8342fe751db0302f8ff81b7046efaa73c0d2433e1867780" +} diff --git a/.sqlx/query-730013736b62e29d934e1c320b3e028e5b5f57e0486177ae63aa2ec266d329c9.json b/.sqlx/query-730013736b62e29d934e1c320b3e028e5b5f57e0486177ae63aa2ec266d329c9.json deleted file mode 100644 index 72579ed5..00000000 --- a/.sqlx/query-730013736b62e29d934e1c320b3e028e5b5f57e0486177ae63aa2ec266d329c9.json +++ /dev/null @@ -1,20 
+0,0 @@ -{ - "db_name": "PostgreSQL", - "query": "SELECT DISTINCT project_id FROM warehouse", - "describe": { - "columns": [ - { - "ordinal": 0, - "name": "project_id", - "type_info": "Uuid" - } - ], - "parameters": { - "Left": [] - }, - "nullable": [ - false - ] - }, - "hash": "730013736b62e29d934e1c320b3e028e5b5f57e0486177ae63aa2ec266d329c9" -} diff --git a/.sqlx/query-8528499bf3e9b68a051e0e68660ad0a3a1d35b555715e7c720959c73b569d129.json b/.sqlx/query-8528499bf3e9b68a051e0e68660ad0a3a1d35b555715e7c720959c73b569d129.json new file mode 100644 index 00000000..e97c8887 --- /dev/null +++ b/.sqlx/query-8528499bf3e9b68a051e0e68660ad0a3a1d35b555715e7c720959c73b569d129.json @@ -0,0 +1,15 @@ +{ + "db_name": "PostgreSQL", + "query": "UPDATE project\n SET project_name = $1\n WHERE project_id = $2", + "describe": { + "columns": [], + "parameters": { + "Left": [ + "Text", + "Uuid" + ] + }, + "nullable": [] + }, + "hash": "8528499bf3e9b68a051e0e68660ad0a3a1d35b555715e7c720959c73b569d129" +} diff --git a/.sqlx/query-99e4e108d554390e9b6d96270cfbc29710601746acb3b6e4e085a9019dfa85aa.json b/.sqlx/query-99e4e108d554390e9b6d96270cfbc29710601746acb3b6e4e085a9019dfa85aa.json new file mode 100644 index 00000000..f67196c9 --- /dev/null +++ b/.sqlx/query-99e4e108d554390e9b6d96270cfbc29710601746acb3b6e4e085a9019dfa85aa.json @@ -0,0 +1,28 @@ +{ + "db_name": "PostgreSQL", + "query": "\n SELECT\n project_name,\n project_id\n FROM project\n WHERE project_id = $1\n ", + "describe": { + "columns": [ + { + "ordinal": 0, + "name": "project_name", + "type_info": "Text" + }, + { + "ordinal": 1, + "name": "project_id", + "type_info": "Uuid" + } + ], + "parameters": { + "Left": [ + "Uuid" + ] + }, + "nullable": [ + false, + false + ] + }, + "hash": "99e4e108d554390e9b6d96270cfbc29710601746acb3b6e4e085a9019dfa85aa" +} diff --git a/.sqlx/query-9f23e7ec362c786e9524941bf385fa4d940951bf20555d77e727b523074f33ae.json b/.sqlx/query-9f23e7ec362c786e9524941bf385fa4d940951bf20555d77e727b523074f33ae.json new file 
mode 100644 index 00000000..6550e646 --- /dev/null +++ b/.sqlx/query-9f23e7ec362c786e9524941bf385fa4d940951bf20555d77e727b523074f33ae.json @@ -0,0 +1,23 @@ +{ + "db_name": "PostgreSQL", + "query": "\n INSERT INTO project (project_name, project_id)\n VALUES ($1, $2)\n RETURNING project_id\n ", + "describe": { + "columns": [ + { + "ordinal": 0, + "name": "project_id", + "type_info": "Uuid" + } + ], + "parameters": { + "Left": [ + "Text", + "Uuid" + ] + }, + "nullable": [ + false + ] + }, + "hash": "9f23e7ec362c786e9524941bf385fa4d940951bf20555d77e727b523074f33ae" +} diff --git a/.sqlx/query-f64d0b807474d82210944a5004eaf1656ed5ddfe52f1c0c3185b6d35112d5b39.json b/.sqlx/query-f64d0b807474d82210944a5004eaf1656ed5ddfe52f1c0c3185b6d35112d5b39.json new file mode 100644 index 00000000..3b3e8ad3 --- /dev/null +++ b/.sqlx/query-f64d0b807474d82210944a5004eaf1656ed5ddfe52f1c0c3185b6d35112d5b39.json @@ -0,0 +1,14 @@ +{ + "db_name": "PostgreSQL", + "query": "DELETE FROM project WHERE project_id = $1", + "describe": { + "columns": [], + "parameters": { + "Left": [ + "Uuid" + ] + }, + "nullable": [] + }, + "hash": "f64d0b807474d82210944a5004eaf1656ed5ddfe52f1c0c3185b6d35112d5b39" +} diff --git a/crates/iceberg-catalog-bin/src/main.rs b/crates/iceberg-catalog-bin/src/main.rs index fdde3e99..44b8acdb 100644 --- a/crates/iceberg-catalog-bin/src/main.rs +++ b/crates/iceberg-catalog-bin/src/main.rs @@ -121,7 +121,6 @@ async fn main() -> anyhow::Result<()> { // This embeds database migrations in the application binary so we can ensure the database // is migrated correctly on startup - iceberg_catalog::implementations::postgres::migrate(&write_pool).await?; println!("Database migration complete."); } diff --git a/crates/iceberg-catalog/migrations/20241006083546_projects-table.sql b/crates/iceberg-catalog/migrations/20241006083546_projects-table.sql new file mode 100644 index 00000000..6c319d9e --- /dev/null +++ b/crates/iceberg-catalog/migrations/20241006083546_projects-table.sql @@ -0,0 
+1,16 @@ +-- Factors project into a separate table. +-- Previously projects were nameless and only existed as part of a warehouse. +create table project +( + project_id uuid primary key, + project_name text not null +); + +call add_time_columns('project'); +select trigger_updated_at('"project"'); + +INSERT INTO project (project_id, project_name) +SELECT warehouse.project_id, 'Unnamed Project' FROM warehouse; + +alter table warehouse + add constraint warehouse_project_id_fk foreign key (project_id) references project (project_id) on update cascade; diff --git a/crates/iceberg-catalog/src/api/management/mod.rs b/crates/iceberg-catalog/src/api/management/mod.rs index 3bfe25f0..dcc31b46 100644 --- a/crates/iceberg-catalog/src/api/management/mod.rs +++ b/crates/iceberg-catalog/src/api/management/mod.rs @@ -1,4 +1,5 @@ pub mod v1 { + pub mod project; pub mod warehouse; use axum::{Extension, Json, Router}; use utoipa::OpenApi; @@ -14,13 +15,17 @@ pub mod v1 { use crate::service::{storage::S3Flavor, Catalog, SecretStore, State}; use axum::extract::{Path, Query, State as AxumState}; use axum::routing::{get, post}; + use project::{ + CreateProjectRequest, CreateProjectResponse, GetProjectResponse, ListProjectsResponse, + RenameProjectRequest, Service as _, + }; use serde::Serialize; use warehouse::{ AzCredential, AzdlsProfile, CreateWarehouseRequest, CreateWarehouseResponse, GcsCredential, - GcsProfile, GcsServiceKey, GetWarehouseResponse, ListProjectsResponse, - ListWarehousesRequest, ListWarehousesResponse, ProjectResponse, RenameWarehouseRequest, - S3Credential, S3Profile, Service, StorageCredential, StorageProfile, TabularDeleteProfile, - UpdateWarehouseCredentialRequest, UpdateWarehouseStorageRequest, WarehouseStatus, + GcsProfile, GcsServiceKey, GetWarehouseResponse, ListWarehousesRequest, + ListWarehousesResponse, RenameWarehouseRequest, S3Credential, S3Profile, Service as _, + StorageCredential, StorageProfile, TabularDeleteProfile, UpdateWarehouseCredentialRequest, 
+ UpdateWarehouseStorageRequest, WarehouseStatus, }; #[derive(Debug, OpenApi)] @@ -30,44 +35,50 @@ ), paths( activate_warehouse, + create_project, create_warehouse, deactivate_warehouse, + delete_project, delete_warehouse, + get_project, get_warehouse, + list_deleted_tabulars, list_projects, list_warehouses, + rename_project, rename_warehouse, update_storage_credential, update_storage_profile, - list_deleted_tabulars ), components(schemas( AzCredential, AzdlsProfile, + CreateProjectRequest, + CreateProjectResponse, CreateWarehouseRequest, CreateWarehouseResponse, + DeleteKind, + DeletedTabularResponse, GcsCredential, GcsProfile, GcsServiceKey, GetWarehouseResponse, + ListDeletedTabularsResponse, ListProjectsResponse, ListWarehousesRequest, ListWarehousesResponse, - ProjectResponse, + GetProjectResponse, RenameWarehouseRequest, S3Credential, - S3Profile, S3Flavor, + S3Profile, StorageCredential, StorageProfile, + TabularDeleteProfile, + TabularType, UpdateWarehouseCredentialRequest, UpdateWarehouseStorageRequest, WarehouseStatus, - ListDeletedTabularsResponse, - DeletedTabularResponse, - TabularType, - DeleteKind, - TabularDeleteProfile, )) )] pub struct ManagementApiDoc; @@ -102,7 +113,7 @@ pub mod v1 { ApiServer::::create_warehouse(request, api_context, metadata).await } - /// List all existing projects + /// List all projects the requesting user has access to #[utoipa::path( get, tag = "management", @@ -118,6 +129,78 @@ pub mod v1 { ApiServer::::list_projects(api_context, metadata).await } + /// Create a new project + #[utoipa::path( + post, + tag = "management", + path = "/management/v1/project", + responses( + (status = 201, description = "Project created successfully", body = [CreateProjectResponse]) + ) + )] + async fn create_project( + AxumState(api_context): AxumState>>, + Extension(metadata): Extension, + Json(request): Json, + ) -> Result { + ApiServer::::create_project(request, api_context, metadata).await + } + + /// Get a Project by ID + 
#[utoipa::path( + get, + tag = "management", + path = "/management/v1/project/{project_id}", + responses( + (status = 200, description = "Project details", body = [GetProjectResponse]) + ) + )] + async fn get_project( + Path(project_id): Path, + AxumState(api_context): AxumState>>, + Extension(metadata): Extension, + ) -> Result { + ApiServer::::get_project(project_id, api_context, metadata).await + } + + /// Delete a project by ID + /// + /// No warehouses must be present in the project to delete it. + #[utoipa::path( + delete, + tag = "management", + path = "/management/v1/project/{project_id}", + responses( + (status = 200, description = "Project deleted successfully") + ) + )] + async fn delete_project( + Path(project_id): Path, + AxumState(api_context): AxumState>>, + Extension(metadata): Extension, + ) -> Result<()> { + ApiServer::::delete_project(project_id, api_context, metadata).await + } + + /// Rename a project + #[utoipa::path( + post, + tag = "management", + path = "/management/v1/project/{project_id}/rename", + responses( + (status = 200, description = "Project renamed successfully") + ) + )] + async fn rename_project( + Path(project_id): Path, + AxumState(api_context): AxumState>>, + Extension(metadata): Extension, + Json(request): Json, + ) -> Result<()> { + ApiServer::::rename_project(project_id.into(), request, api_context, metadata) + .await + } + /// List all warehouses in a project /// /// By default, this endpoint does not return deactivated warehouses. 
@@ -349,6 +432,13 @@ pub mod v1 { impl ApiServer { pub fn new_v1_router() -> Router>> { Router::new() + // Create a new project + .route("/project", post(create_project)) + .route( + "/project/:project_id", + get(get_project).delete(delete_project), + ) + .route("/project/:project_id/rename", post(rename_project)) // Create a new warehouse .route("/warehouse", post(create_warehouse)) // List all projects diff --git a/crates/iceberg-catalog/src/api/management/v1/project.rs b/crates/iceberg-catalog/src/api/management/v1/project.rs new file mode 100644 index 00000000..10512b2e --- /dev/null +++ b/crates/iceberg-catalog/src/api/management/v1/project.rs @@ -0,0 +1,202 @@ +use crate::api::management::v1::ApiServer; +use crate::api::{ApiContext, Result}; +use crate::request_metadata::RequestMetadata; +pub use crate::service::storage::{ + AzCredential, AzdlsProfile, GcsCredential, GcsProfile, GcsServiceKey, S3Credential, S3Profile, + StorageCredential, StorageProfile, +}; + +pub use crate::service::WarehouseStatus; +use crate::service::{auth::AuthZHandler, secrets::SecretStore, Catalog, State, Transaction}; +use crate::ProjectIdent; +use iceberg_ext::catalog::rest::ErrorModel; +use utoipa::ToSchema; + +#[derive(Debug, Clone, serde::Serialize, serde::Deserialize, ToSchema)] +pub struct GetProjectResponse { + /// ID of the project. + pub project_id: uuid::Uuid, + /// Name of the project + pub project_name: String, +} + +#[derive(Debug, Clone, serde::Serialize, serde::Deserialize, ToSchema)] +#[serde(rename_all = "kebab-case")] +pub struct RenameProjectRequest { + /// New name for the project. 
+ pub new_name: String, +} + +#[derive(Debug, Clone, serde::Serialize, serde::Deserialize, ToSchema)] +#[serde(rename_all = "kebab-case")] +pub struct ListProjectsResponse { + /// List of projects + pub projects: Vec, +} + +#[derive(Debug, Clone, serde::Serialize, serde::Deserialize, ToSchema)] +#[serde(rename_all = "kebab-case")] +pub struct CreateProjectRequest { + /// Name of the project to create. + pub project_name: String, + /// Request a specific project ID - optional. + /// If not provided, a new project ID will be generated (recommended). + pub project_id: Option, +} + +#[derive(Debug, Clone, serde::Serialize, serde::Deserialize, ToSchema)] +#[serde(rename_all = "kebab-case")] +pub struct CreateProjectResponse { + /// ID of the created project. + pub project_id: uuid::Uuid, +} + +impl axum::response::IntoResponse for CreateProjectResponse { + fn into_response(self) -> axum::http::Response { + (http::StatusCode::CREATED, axum::Json(self)).into_response() + } +} + +impl axum::response::IntoResponse for GetProjectResponse { + fn into_response(self) -> axum::http::Response { + axum::Json(self).into_response() + } +} + +impl Service for ApiServer {} + +#[async_trait::async_trait] +pub(super) trait Service { + async fn create_project( + request: CreateProjectRequest, + context: ApiContext>, + _request_metadata: RequestMetadata, + ) -> Result { + // ------------------- AuthZ ------------------- + // Todo: AuthZ + + // ------------------- Business Logic ------------------- + let CreateProjectRequest { + project_name, + project_id, + } = request; + validate_project_name(&project_name)?; + let mut t = C::Transaction::begin_write(context.v1_state.catalog).await?; + let project_id: ProjectIdent = project_id.unwrap_or(uuid::Uuid::now_v7()).into(); + C::create_project(project_id, project_name, t.transaction()).await?; + t.commit().await?; + + Ok(CreateProjectResponse { + project_id: *project_id, + }) + } + + async fn rename_project( + warehouse_id: ProjectIdent, + 
request: RenameProjectRequest, + context: ApiContext>, + _request_metadata: RequestMetadata, + ) -> Result<()> { + // ------------------- AuthZ ------------------- + // ToDo AuthZ + + // ------------------- Business Logic ------------------- + validate_project_name(&request.new_name)?; + let mut transaction = C::Transaction::begin_write(context.v1_state.catalog).await?; + C::rename_project(warehouse_id, &request.new_name, transaction.transaction()).await?; + transaction.commit().await?; + + Ok(()) + } + + async fn get_project( + project_id: uuid::Uuid, + context: ApiContext>, + _request_metadata: RequestMetadata, + ) -> Result { + // ------------------- AuthZ ------------------- + // Todo: AuthZ + + // ------------------- Business Logic ------------------- + let mut t = C::Transaction::begin_read(context.v1_state.catalog).await?; + let project = C::get_project(project_id.into(), t.transaction()) + .await? + .ok_or(ErrorModel::not_found( + format!("Project with id {project_id} not found."), + "ProjectNotFound", + None, + ))?; + + Ok(GetProjectResponse { + project_id, + project_name: project.name, + }) + } + + async fn delete_project( + project_id: uuid::Uuid, + context: ApiContext>, + _request_metadata: RequestMetadata, + ) -> Result<()> { + // ------------------- AuthZ ------------------- + // Todo: AuthZ + + // ------------------- Business Logic ------------------- + let mut transaction = C::Transaction::begin_write(context.v1_state.catalog).await?; + + C::delete_project(project_id.into(), transaction.transaction()).await?; + + transaction.commit().await?; + + Ok(()) + } + + async fn list_projects( + context: ApiContext>, + request_metadata: RequestMetadata, + ) -> Result { + // ------------------- AuthZ ------------------- + let projects = A::check_list_projects(&request_metadata, context.v1_state.auth).await?; + + // ------------------- Business Logic ------------------- + + let projects = C::list_projects(projects, context.v1_state.catalog).await?; + + 
Ok(ListProjectsResponse { + projects: projects + .into_iter() + .map(|r| GetProjectResponse { + project_id: *r.project_id, + project_name: r.name, + }) + .collect(), + }) + } +} + +impl axum::response::IntoResponse for ListProjectsResponse { + fn into_response(self) -> axum::http::Response { + axum::Json(self).into_response() + } +} + +fn validate_project_name(project_name: &str) -> Result<()> { + if project_name.is_empty() { + return Err(ErrorModel::bad_request( + "Project name cannot be empty", + "EmptyProjectName", + None, + ) + .into()); + } + + if project_name.len() > 128 { + return Err(ErrorModel::bad_request( + "Project name must be shorter than 128 chars", + "ProjectNameTooLong", + None, + ) + .into()); + } + Ok(()) +} diff --git a/crates/iceberg-catalog/src/api/management/v1/warehouse.rs b/crates/iceberg-catalog/src/api/management/v1/warehouse.rs index af947e12..fd5e0e94 100644 --- a/crates/iceberg-catalog/src/api/management/v1/warehouse.rs +++ b/crates/iceberg-catalog/src/api/management/v1/warehouse.rs @@ -108,17 +108,11 @@ pub struct RenameWarehouseRequest { pub new_name: String, } -#[derive(Debug, Clone, serde::Serialize, serde::Deserialize, ToSchema)] -pub struct ProjectResponse { - /// ID of the project. - pub project_id: uuid::Uuid, -} - #[derive(Debug, Clone, serde::Serialize, serde::Deserialize, ToSchema)] #[serde(rename_all = "kebab-case")] -pub struct ListProjectsResponse { - /// List of projects - pub projects: Vec, +pub struct RenameProjectRequest { + /// New name for the project. 
+ pub new_name: String, } #[derive(Debug, Clone, serde::Serialize, serde::Deserialize, ToSchema)] @@ -160,8 +154,7 @@ impl axum::response::IntoResponse for CreateWarehouseResponse { impl Service for ApiServer {} #[async_trait::async_trait] - -pub trait Service { +pub(super) trait Service { async fn create_warehouse( request: CreateWarehouseRequest, context: ApiContext>, @@ -180,6 +173,7 @@ pub trait Service { A::check_create_warehouse(&request_metadata, &project_ident, context.v1_state.auth).await?; // ------------------- Business Logic ------------------- + validate_warehouse_name(&warehouse_name)?; storage_profile.normalize()?; storage_profile .validate_access(storage_credential.as_ref(), None) @@ -215,36 +209,6 @@ pub trait Service { }) } - async fn list_projects( - context: ApiContext>, - request_metadata: RequestMetadata, - ) -> Result { - // ------------------- AuthZ ------------------- - let projects = A::check_list_projects(&request_metadata, context.v1_state.auth).await?; - - // ------------------- Business Logic ------------------- - if let Some(projects) = projects { - return Ok(ListProjectsResponse { - projects: projects - .into_iter() - .map(|project_id| ProjectResponse { - project_id: *project_id, - }) - .collect(), - }); - } - - let projects = C::list_projects(context.v1_state.catalog).await?; - Ok(ListProjectsResponse { - projects: projects - .into_iter() - .map(|project_id| ProjectResponse { - project_id: *project_id, - }) - .collect(), - }) - } - async fn list_warehouses( request: ListWarehousesRequest, context: ApiContext>, @@ -294,7 +258,7 @@ pub trait Service { // ------------------- Business Logic ------------------- let mut transaction = C::Transaction::begin_read(context.v1_state.catalog).await?; - let warehouses = C::get_warehouse(warehouse_id, transaction.transaction()).await?; + let warehouses = C::require_warehouse(warehouse_id, transaction.transaction()).await?; Ok(warehouses.into()) } @@ -326,6 +290,7 @@ pub trait Service { 
A::check_rename_warehouse(&request_metadata, warehouse_id, context.v1_state.auth).await?; // ------------------- Business Logic ------------------- + validate_warehouse_name(&request.new_name)?; let mut transaction = C::Transaction::begin_write(context.v1_state.catalog).await?; C::rename_warehouse(warehouse_id, &request.new_name, transaction.transaction()).await?; @@ -403,7 +368,7 @@ pub trait Service { .await?; let mut transaction = C::Transaction::begin_write(context.v1_state.catalog).await?; - let warehouse = C::get_warehouse(warehouse_id, transaction.transaction()).await?; + let warehouse = C::require_warehouse(warehouse_id, transaction.transaction()).await?; warehouse .storage_profile .can_be_updated_with(&storage_profile)?; @@ -462,7 +427,7 @@ pub trait Service { } = request; let mut transaction = C::Transaction::begin_write(context.v1_state.catalog).await?; - let warehouse = C::get_warehouse(warehouse_id, transaction.transaction()).await?; + let warehouse = C::require_warehouse(warehouse_id, transaction.transaction()).await?; let old_secret_id = warehouse.storage_secret_id; let storage_profile = warehouse.storage_profile; @@ -558,12 +523,6 @@ pub trait Service { } } -impl axum::response::IntoResponse for ListProjectsResponse { - fn into_response(self) -> axum::http::Response { - axum::Json(self).into_response() - } -} - impl axum::response::IntoResponse for ListWarehousesResponse { fn into_response(self) -> axum::http::Response { axum::Json(self).into_response() @@ -588,6 +547,27 @@ impl From for GetWarehouseResponse { } } +fn validate_warehouse_name(warehouse_name: &str) -> Result<()> { + if warehouse_name.is_empty() { + return Err(ErrorModel::bad_request( + "Warehouse name cannot be empty", + "EmptyWarehouseName", + None, + ) + .into()); + } + + if warehouse_name.len() > 128 { + return Err(ErrorModel::bad_request( + "Warehouse must be shorter than 128 chars", + "WarehouseNameTooLong", + None, + ) + .into()); + } + Ok(()) +} + #[cfg(test)] mod test { 
#[test] diff --git a/crates/iceberg-catalog/src/catalog/config.rs b/crates/iceberg-catalog/src/catalog/config.rs index 524c87c3..27197fd3 100644 --- a/crates/iceberg-catalog/src/catalog/config.rs +++ b/crates/iceberg-catalog/src/catalog/config.rs @@ -86,7 +86,7 @@ impl< })?; let warehouse_id = if let Some(warehouse_from_arg) = warehouse_from_arg { - C::get_warehouse_by_name( + C::require_warehouse_by_name( &warehouse_from_arg, project_id, api_context.v1_state.catalog.clone(), @@ -112,7 +112,7 @@ impl< .await?; // Get config from DB and new token from AuthHandler simultaneously - let config = C::get_config_for_warehouse(warehouse_id, api_context.v1_state.catalog); + let config = C::require_config_for_warehouse(warehouse_id, api_context.v1_state.catalog); // Give the auth-handler a chance to exchange / enrich the token let new_token = T::exchange_token_for_warehouse( diff --git a/crates/iceberg-catalog/src/catalog/namespace.rs b/crates/iceberg-catalog/src/catalog/namespace.rs index 4f59c218..65db468f 100644 --- a/crates/iceberg-catalog/src/catalog/namespace.rs +++ b/crates/iceberg-catalog/src/catalog/namespace.rs @@ -98,7 +98,7 @@ impl // Set location if not specified - validate location if specified let mut t = C::Transaction::begin_write(state.v1_state.catalog).await?; - let warehouse = C::get_warehouse(warehouse_id, t.transaction()).await?; + let warehouse = C::require_warehouse(warehouse_id, t.transaction()).await?; let mut namespace_props = NamespaceProperties::try_from_maybe_props(properties.clone()) .map_err(|e| ErrorModel::bad_request(e.to_string(), e.err_type(), None))?; diff --git a/crates/iceberg-catalog/src/catalog/tables.rs b/crates/iceberg-catalog/src/catalog/tables.rs index d217c0b8..68479976 100644 --- a/crates/iceberg-catalog/src/catalog/tables.rs +++ b/crates/iceberg-catalog/src/catalog/tables.rs @@ -127,7 +127,7 @@ impl let mut t = C::Transaction::begin_write(state.v1_state.catalog).await?; let namespace = C::get_namespace(warehouse_id, &namespace, 
t.transaction()).await?; - let warehouse = C::get_warehouse(warehouse_id, t.transaction()).await?; + let warehouse = C::require_warehouse(warehouse_id, t.transaction()).await?; let storage_profile = &warehouse.storage_profile; require_active_warehouse(warehouse.status)?; @@ -445,7 +445,7 @@ impl ) .await?; let previous_table = remove_table(&table_id, &table_ident, &mut previous_table)?; - let warehouse = C::get_warehouse(warehouse_id, t.transaction()).await?; + let warehouse = C::require_warehouse(warehouse_id, t.transaction()).await?; // Contract verification state @@ -590,7 +590,7 @@ impl let purge = purge_requested.unwrap_or(false); let mut transaction = C::Transaction::begin_write(state.v1_state.catalog).await?; - let warehouse = C::get_warehouse(warehouse_id, transaction.transaction()).await?; + let warehouse = C::require_warehouse(warehouse_id, transaction.transaction()).await?; let table_id = table_id.ok_or_else(|| { ErrorModel::not_found( @@ -922,7 +922,7 @@ impl let table_ids = require_table_ids(table_ids)?; let mut transaction = C::Transaction::begin_write(state.v1_state.catalog).await?; - let warehouse = C::get_warehouse(warehouse_id, transaction.transaction()).await?; + let warehouse = C::require_warehouse(warehouse_id, transaction.transaction()).await?; // Store data for events before it is moved let mut events = vec![]; diff --git a/crates/iceberg-catalog/src/catalog/views.rs b/crates/iceberg-catalog/src/catalog/views.rs index 796dade7..564d6070 100644 --- a/crates/iceberg-catalog/src/catalog/views.rs +++ b/crates/iceberg-catalog/src/catalog/views.rs @@ -166,6 +166,7 @@ mod test { Some(StorageProfile::Test(TestProfile)), None, None, + true, ) .await; diff --git a/crates/iceberg-catalog/src/catalog/views/commit.rs b/crates/iceberg-catalog/src/catalog/views/commit.rs index b2ab5d6f..867cdb0d 100644 --- a/crates/iceberg-catalog/src/catalog/views/commit.rs +++ b/crates/iceberg-catalog/src/catalog/views/commit.rs @@ -96,7 +96,7 @@ pub(crate) async fn 
commit_view( storage_secret_id, status, tabular_delete_profile: _, - } = C::get_warehouse(warehouse_id, transaction.transaction()).await?; + } = C::require_warehouse(warehouse_id, transaction.transaction()).await?; require_active_warehouse(status)?; check_asserts(requirements, view_id)?; diff --git a/crates/iceberg-catalog/src/catalog/views/create.rs b/crates/iceberg-catalog/src/catalog/views/create.rs index 97c5807d..425fff9d 100644 --- a/crates/iceberg-catalog/src/catalog/views/create.rs +++ b/crates/iceberg-catalog/src/catalog/views/create.rs @@ -69,7 +69,7 @@ pub(crate) async fn create_view( let mut t = C::Transaction::begin_write(state.v1_state.catalog.clone()).await?; let namespace = C::get_namespace(warehouse_id, &namespace, t.transaction()).await?; - let warehouse = C::get_warehouse(warehouse_id, t.transaction()).await?; + let warehouse = C::require_warehouse(warehouse_id, t.transaction()).await?; let storage_profile = warehouse.storage_profile; require_active_warehouse(warehouse.status)?; diff --git a/crates/iceberg-catalog/src/catalog/views/drop.rs b/crates/iceberg-catalog/src/catalog/views/drop.rs index 7406ecdd..555f6971 100644 --- a/crates/iceberg-catalog/src/catalog/views/drop.rs +++ b/crates/iceberg-catalog/src/catalog/views/drop.rs @@ -56,7 +56,7 @@ pub(crate) async fn drop_view( let mut transaction = C::Transaction::begin_write(state.v1_state.catalog).await?; - let warehouse = C::get_warehouse(warehouse_id, transaction.transaction()).await?; + let warehouse = C::require_warehouse(warehouse_id, transaction.transaction()).await?; state .v1_state diff --git a/crates/iceberg-catalog/src/catalog/views/load.rs b/crates/iceberg-catalog/src/catalog/views/load.rs index 5e8b21ca..8e9c7427 100644 --- a/crates/iceberg-catalog/src/catalog/views/load.rs +++ b/crates/iceberg-catalog/src/catalog/views/load.rs @@ -69,7 +69,7 @@ pub(crate) async fn load_view( storage_secret_id, status, tabular_delete_profile: _, - } = C::get_warehouse(warehouse_id, 
transaction.transaction()).await?; + } = C::require_warehouse(warehouse_id, transaction.transaction()).await?; require_active_warehouse(status)?; let ViewMetadataWithLocation { diff --git a/crates/iceberg-catalog/src/implementations/postgres/catalog.rs b/crates/iceberg-catalog/src/implementations/postgres/catalog.rs index f2a02464..0f6c01c3 100644 --- a/crates/iceberg-catalog/src/implementations/postgres/catalog.rs +++ b/crates/iceberg-catalog/src/implementations/postgres/catalog.rs @@ -9,8 +9,9 @@ use super::{ table_ident_to_id, table_idents_to_ids, }, warehouse::{ - create_warehouse, delete_warehouse, get_warehouse, list_projects, list_warehouses, - rename_warehouse, set_warehouse_status, update_storage_profile, + create_project, create_warehouse, delete_project, delete_warehouse, get_project, + get_warehouse, list_projects, list_warehouses, rename_project, rename_warehouse, + set_warehouse_status, update_storage_profile, }, CatalogState, PostgresTransaction, }; @@ -21,9 +22,9 @@ use crate::implementations::postgres::tabular::view::{ use crate::implementations::postgres::tabular::{list_tabulars, mark_tabular_as_deleted}; use crate::service::tabular_idents::{TabularIdentOwned, TabularIdentUuid}; use crate::service::{ - CreateNamespaceRequest, CreateNamespaceResponse, DeletionDetails, GetWarehouseResponse, - ListFlags, ListNamespacesQuery, ListNamespacesResponse, NamespaceIdent, Result, TableCreation, - TableIdent, WarehouseStatus, + CreateNamespaceRequest, CreateNamespaceResponse, DeletionDetails, GetProjectResponse, + GetWarehouseResponse, ListFlags, ListNamespacesQuery, ListNamespacesResponse, NamespaceIdent, + Result, TableCreation, TableIdent, WarehouseStatus, }; use crate::{ api::iceberg::v1::{PaginatedTabulars, PaginationQuery}, @@ -46,40 +47,6 @@ impl Catalog for super::PostgresCatalog { type Transaction = PostgresTransaction; type State = CatalogState; - async fn create_warehouse<'a>( - warehouse_name: String, - project_id: ProjectIdent, - storage_profile: 
StorageProfile, - tabular_delete_profile: TabularDeleteProfile, - storage_secret_id: Option, - transaction: >::Transaction<'a>, - ) -> Result { - create_warehouse( - warehouse_name, - project_id, - storage_profile, - tabular_delete_profile, - storage_secret_id, - transaction, - ) - .await - } - - async fn get_warehouse<'a>( - warehouse_id: WarehouseIdent, - transaction: >::Transaction<'a>, - ) -> Result { - get_warehouse(warehouse_id, transaction).await - } - - async fn get_namespace<'a>( - warehouse_id: WarehouseIdent, - namespace: &NamespaceIdent, - transaction: >::Transaction<'a>, - ) -> Result { - get_namespace(warehouse_id, namespace, transaction).await - } - async fn list_namespaces( warehouse_id: WarehouseIdent, query: &ListNamespacesQuery, @@ -97,6 +64,14 @@ impl Catalog for super::PostgresCatalog { create_namespace(warehouse_id, namespace_id, request, transaction).await } + async fn get_namespace<'a>( + warehouse_id: WarehouseIdent, + namespace: &NamespaceIdent, + transaction: >::Transaction<'a>, + ) -> Result { + get_namespace(warehouse_id, namespace, transaction).await + } + async fn namespace_ident_to_id( warehouse_id: WarehouseIdent, namespace: &NamespaceIdent, @@ -146,6 +121,24 @@ impl Catalog for super::PostgresCatalog { .await } + async fn table_ident_to_id( + warehouse_id: WarehouseIdent, + table: &TableIdent, + list_flags: crate::service::ListFlags, + catalog_state: Self::State, + ) -> Result> { + table_ident_to_id(warehouse_id, table, list_flags, &catalog_state.read_pool()).await + } + + async fn table_idents_to_ids( + warehouse_id: WarehouseIdent, + tables: HashSet<&TableIdent>, + list_flags: crate::service::ListFlags, + catalog_state: Self::State, + ) -> Result>> { + table_idents_to_ids(warehouse_id, tables, list_flags, &catalog_state.read_pool()).await + } + // Should also load staged tables but not tables of inactive warehouses async fn load_tables<'a>( warehouse_id: WarehouseIdent, @@ -174,15 +167,6 @@ impl Catalog for super::PostgresCatalog 
{ get_table_metadata_by_s3_location(warehouse_id, location, list_flags, catalog_state).await } - async fn table_ident_to_id( - warehouse_id: WarehouseIdent, - table: &TableIdent, - list_flags: crate::service::ListFlags, - catalog_state: Self::State, - ) -> Result> { - table_ident_to_id(warehouse_id, table, list_flags, &catalog_state.read_pool()).await - } - async fn rename_table<'a>( warehouse_id: WarehouseIdent, source_id: TableIdentUuid, @@ -200,13 +184,11 @@ impl Catalog for super::PostgresCatalog { drop_table(table_id, transaction).await } - async fn table_idents_to_ids( - warehouse_id: WarehouseIdent, - tables: HashSet<&TableIdent>, - list_flags: crate::service::ListFlags, - catalog_state: Self::State, - ) -> Result>> { - table_idents_to_ids(warehouse_id, tables, list_flags, &catalog_state.read_pool()).await + async fn mark_tabular_as_deleted( + table_id: TabularIdentUuid, + transaction: >::Transaction<'_>, + ) -> Result<()> { + mark_tabular_as_deleted(table_id, transaction).await } async fn commit_table_transaction<'a>( @@ -217,9 +199,63 @@ impl Catalog for super::PostgresCatalog { commit_table_transaction(warehouse_id, commits, transaction).await } + async fn create_warehouse<'a>( + warehouse_name: String, + project_id: ProjectIdent, + storage_profile: StorageProfile, + tabular_delete_profile: TabularDeleteProfile, + storage_secret_id: Option, + transaction: >::Transaction<'a>, + ) -> Result { + create_warehouse( + warehouse_name, + project_id, + storage_profile, + tabular_delete_profile, + storage_secret_id, + transaction, + ) + .await + } + // ---------------- Management API ---------------- - async fn list_projects(catalog_state: Self::State) -> Result> { - list_projects(catalog_state).await + async fn create_project<'a>( + project_id: ProjectIdent, + project_name: String, + transaction: >::Transaction<'a>, + ) -> Result<()> { + create_project(project_id, project_name, transaction).await + } + + /// Delete a project + async fn delete_project<'a>( + 
project_id: ProjectIdent, + transaction: >::Transaction<'a>, + ) -> Result<()> { + delete_project(project_id, transaction).await + } + + /// Get the project metadata + async fn get_project<'a>( + project_id: ProjectIdent, + transaction: >::Transaction<'a>, + ) -> Result> { + get_project(project_id, transaction).await + } + + async fn list_projects( + project_ids: Option>, + catalog_state: Self::State, + ) -> Result> { + list_projects(project_ids, catalog_state).await + } + + async fn rename_project<'a>( + project_id: ProjectIdent, + new_name: &str, + transaction: >::Transaction<'a>, + ) -> Result<()> { + rename_project(project_id, new_name, transaction).await } async fn list_warehouses( @@ -237,6 +273,13 @@ impl Catalog for super::PostgresCatalog { .await } + async fn get_warehouse<'a>( + warehouse_id: WarehouseIdent, + transaction: >::Transaction<'a>, + ) -> Result> { + get_warehouse(warehouse_id, transaction).await + } + async fn delete_warehouse<'a>( warehouse_id: WarehouseIdent, transaction: >::Transaction<'a>, @@ -275,6 +318,14 @@ impl Catalog for super::PostgresCatalog { .await } + async fn view_ident_to_id( + warehouse_id: WarehouseIdent, + view: &TableIdent, + catalog_state: Self::State, + ) -> Result> { + view_ident_to_id(warehouse_id, view, false, &catalog_state.read_pool()).await + } + async fn create_view<'a>( namespace_id: NamespaceIdentUuid, view: &TableIdent, @@ -294,14 +345,6 @@ impl Catalog for super::PostgresCatalog { .await } - async fn view_ident_to_id( - warehouse_id: WarehouseIdent, - view: &TableIdent, - catalog_state: Self::State, - ) -> Result> { - view_ident_to_id(warehouse_id, view, false, &catalog_state.read_pool()).await - } - async fn load_view<'a>( view_id: TableIdentUuid, include_deleted: bool, @@ -348,6 +391,13 @@ impl Catalog for super::PostgresCatalog { .await } + async fn drop_view<'a>( + table_id: TableIdentUuid, + transaction: >::Transaction<'a>, + ) -> Result { + drop_view(table_id, transaction).await + } + async fn 
rename_view( warehouse_id: WarehouseIdent, source_id: TableIdentUuid, @@ -358,13 +408,6 @@ impl Catalog for super::PostgresCatalog { rename_view(warehouse_id, source_id, source, destination, transaction).await } - async fn drop_view<'a>( - table_id: TableIdentUuid, - transaction: >::Transaction<'a>, - ) -> Result { - drop_view(table_id, transaction).await - } - async fn list_tabulars( warehouse_id: WarehouseIdent, list_flags: ListFlags, @@ -382,11 +425,4 @@ impl Catalog for super::PostgresCatalog { ) .await } - - async fn mark_tabular_as_deleted( - table_id: TabularIdentUuid, - transaction: >::Transaction<'_>, - ) -> Result<()> { - mark_tabular_as_deleted(table_id, transaction).await - } } diff --git a/crates/iceberg-catalog/src/implementations/postgres/namespace.rs b/crates/iceberg-catalog/src/implementations/postgres/namespace.rs index 9d88d8c6..6fc44235 100644 --- a/crates/iceberg-catalog/src/implementations/postgres/namespace.rs +++ b/crates/iceberg-catalog/src/implementations/postgres/namespace.rs @@ -421,7 +421,7 @@ pub(crate) mod tests { async fn test_namespace_lifecycle(pool: sqlx::PgPool) { let state = CatalogState::from_pools(pool.clone(), pool.clone()); - let warehouse_id = initialize_warehouse(state.clone(), None, None, None).await; + let warehouse_id = initialize_warehouse(state.clone(), None, None, None, true).await; let namespace = NamespaceIdent::from_vec(vec!["test".to_string()]).unwrap(); let properties = Some(HashMap::from_iter(vec![ @@ -512,7 +512,7 @@ pub(crate) mod tests { async fn test_pagination(pool: sqlx::PgPool) { let state = CatalogState::from_pools(pool.clone(), pool.clone()); - let warehouse_id = initialize_warehouse(state.clone(), None, None, None).await; + let warehouse_id = initialize_warehouse(state.clone(), None, None, None, true).await; let namespace = NamespaceIdent::from_vec(vec!["test".to_string()]).unwrap(); let properties = Some(HashMap::from_iter(vec![ ("key1".to_string(), "value1".to_string()), @@ -607,7 +607,7 @@ 
pub(crate) mod tests { async fn test_cannot_drop_nonempty_namespace(pool: sqlx::PgPool) { let state = CatalogState::from_pools(pool.clone(), pool.clone()); - let warehouse_id = initialize_warehouse(state.clone(), None, None, None).await; + let warehouse_id = initialize_warehouse(state.clone(), None, None, None, true).await; let staged = false; let table = initialize_table(warehouse_id, state.clone(), staged, None, None).await; @@ -625,7 +625,7 @@ pub(crate) mod tests { async fn test_case_insensitive_but_preserve_case(pool: sqlx::PgPool) { let state = CatalogState::from_pools(pool.clone(), pool.clone()); - let warehouse_id = initialize_warehouse(state.clone(), None, None, None).await; + let warehouse_id = initialize_warehouse(state.clone(), None, None, None, true).await; let namespace_1 = NamespaceIdent::from_vec(vec!["Test".to_string()]).unwrap(); let namespace_2 = NamespaceIdent::from_vec(vec!["test".to_string()]).unwrap(); diff --git a/crates/iceberg-catalog/src/implementations/postgres/tabular/table.rs b/crates/iceberg-catalog/src/implementations/postgres/tabular/table.rs index 2daeacef..e80c7d08 100644 --- a/crates/iceberg-catalog/src/implementations/postgres/tabular/table.rs +++ b/crates/iceberg-catalog/src/implementations/postgres/tabular/table.rs @@ -752,7 +752,7 @@ pub(crate) mod tests { async fn test_final_create(pool: sqlx::PgPool) { let state = CatalogState::from_pools(pool.clone(), pool.clone()); - let warehouse_id = initialize_warehouse(state.clone(), None, None, None).await; + let warehouse_id = initialize_warehouse(state.clone(), None, None, None, true).await; let namespace = NamespaceIdent::from_vec(vec!["my_namespace".to_string()]).unwrap(); initialize_namespace(state.clone(), warehouse_id, &namespace, None).await; let namespace_id = get_namespace_id(state.clone(), warehouse_id, &namespace).await; @@ -824,7 +824,7 @@ pub(crate) mod tests { async fn test_stage_create(pool: sqlx::PgPool) { let state = CatalogState::from_pools(pool.clone(), 
pool.clone()); - let warehouse_id = initialize_warehouse(state.clone(), None, None, None).await; + let warehouse_id = initialize_warehouse(state.clone(), None, None, None, true).await; let namespace = NamespaceIdent::from_vec(vec!["my_namespace".to_string()]).unwrap(); initialize_namespace(state.clone(), warehouse_id, &namespace, None).await; let namespace_id = get_namespace_id(state.clone(), warehouse_id, &namespace).await; @@ -931,7 +931,7 @@ pub(crate) mod tests { async fn test_to_id(pool: sqlx::PgPool) { let state = CatalogState::from_pools(pool.clone(), pool.clone()); - let warehouse_id = initialize_warehouse(state.clone(), None, None, None).await; + let warehouse_id = initialize_warehouse(state.clone(), None, None, None, true).await; let namespace = NamespaceIdent::from_vec(vec!["my_namespace".to_string()]).unwrap(); initialize_namespace(state.clone(), warehouse_id, &namespace, None).await; @@ -982,7 +982,7 @@ pub(crate) mod tests { async fn test_to_ids(pool: sqlx::PgPool) { let state = CatalogState::from_pools(pool.clone(), pool.clone()); - let warehouse_id = initialize_warehouse(state.clone(), None, None, None).await; + let warehouse_id = initialize_warehouse(state.clone(), None, None, None, true).await; let namespace = NamespaceIdent::from_vec(vec!["my_namespace".to_string()]).unwrap(); initialize_namespace(state.clone(), warehouse_id, &namespace, None).await; @@ -1079,7 +1079,7 @@ pub(crate) mod tests { async fn test_rename_without_namespace(pool: sqlx::PgPool) { let state = CatalogState::from_pools(pool.clone(), pool.clone()); - let warehouse_id = initialize_warehouse(state.clone(), None, None, None).await; + let warehouse_id = initialize_warehouse(state.clone(), None, None, None, true).await; let table = initialize_table(warehouse_id, state.clone(), false, None, None).await; let new_table_ident = TableIdent { @@ -1125,7 +1125,7 @@ pub(crate) mod tests { async fn test_rename_with_namespace(pool: sqlx::PgPool) { let state = 
CatalogState::from_pools(pool.clone(), pool.clone()); - let warehouse_id = initialize_warehouse(state.clone(), None, None, None).await; + let warehouse_id = initialize_warehouse(state.clone(), None, None, None, true).await; let table = initialize_table(warehouse_id, state.clone(), false, None, None).await; let new_namespace = NamespaceIdent::from_vec(vec!["new_namespace".to_string()]).unwrap(); @@ -1173,7 +1173,7 @@ pub(crate) mod tests { async fn test_list_tables(pool: sqlx::PgPool) { let state = CatalogState::from_pools(pool.clone(), pool.clone()); - let warehouse_id = initialize_warehouse(state.clone(), None, None, None).await; + let warehouse_id = initialize_warehouse(state.clone(), None, None, None, true).await; let namespace = NamespaceIdent::from_vec(vec!["my_namespace".to_string()]).unwrap(); initialize_namespace(state.clone(), warehouse_id, &namespace, None).await; let tables = list_tables( @@ -1232,7 +1232,7 @@ pub(crate) mod tests { async fn test_list_tables_pagination(pool: sqlx::PgPool) { let state = CatalogState::from_pools(pool.clone(), pool.clone()); - let warehouse_id = initialize_warehouse(state.clone(), None, None, None).await; + let warehouse_id = initialize_warehouse(state.clone(), None, None, None, true).await; let namespace = NamespaceIdent::from_vec(vec!["my_namespace".to_string()]).unwrap(); initialize_namespace(state.clone(), warehouse_id, &namespace, None).await; let tables = list_tables( @@ -1332,7 +1332,7 @@ pub(crate) mod tests { async fn test_commit_transaction(pool: sqlx::PgPool) { let state = CatalogState::from_pools(pool.clone(), pool.clone()); - let warehouse_id = initialize_warehouse(state.clone(), None, None, None).await; + let warehouse_id = initialize_warehouse(state.clone(), None, None, None, true).await; let table1 = initialize_table(warehouse_id, state.clone(), true, None, None).await; let table2 = initialize_table(warehouse_id, state.clone(), false, None, None).await; let _ = initialize_table(warehouse_id, state.clone(), 
false, None, None).await; @@ -1422,7 +1422,7 @@ pub(crate) mod tests { async fn test_get_id_by_location(pool: sqlx::PgPool) { let state = CatalogState::from_pools(pool.clone(), pool.clone()); - let warehouse_id = initialize_warehouse(state.clone(), None, None, None).await; + let warehouse_id = initialize_warehouse(state.clone(), None, None, None, true).await; let table = initialize_table(warehouse_id, state.clone(), false, None, None).await; let metadata = get_table_metadata_by_id( @@ -1504,7 +1504,7 @@ pub(crate) mod tests { async fn test_cannot_get_table_of_inactive_warehouse(pool: sqlx::PgPool) { let state = CatalogState::from_pools(pool.clone(), pool.clone()); - let warehouse_id = initialize_warehouse(state.clone(), None, None, None).await; + let warehouse_id = initialize_warehouse(state.clone(), None, None, None, true).await; let table = initialize_table(warehouse_id, state.clone(), false, None, None).await; let mut transaction = pool.begin().await.unwrap(); set_warehouse_status(warehouse_id, WarehouseStatus::Inactive, &mut transaction) @@ -1527,7 +1527,7 @@ pub(crate) mod tests { async fn test_drop_table_works(pool: sqlx::PgPool) { let state = CatalogState::from_pools(pool.clone(), pool.clone()); - let warehouse_id = initialize_warehouse(state.clone(), None, None, None).await; + let warehouse_id = initialize_warehouse(state.clone(), None, None, None, true).await; let table = initialize_table(warehouse_id, state.clone(), false, None, None).await; let mut transaction = pool.begin().await.unwrap(); diff --git a/crates/iceberg-catalog/src/implementations/postgres/tabular/view.rs b/crates/iceberg-catalog/src/implementations/postgres/tabular/view.rs index a9aa3464..a2c2f305 100644 --- a/crates/iceberg-catalog/src/implementations/postgres/tabular/view.rs +++ b/crates/iceberg-catalog/src/implementations/postgres/tabular/view.rs @@ -654,7 +654,7 @@ pub(crate) mod tests { #[sqlx::test] async fn create_view(pool: sqlx::PgPool) { let state = 
CatalogState::from_pools(pool.clone(), pool.clone()); - let warehouse_id = initialize_warehouse(state.clone(), None, None, None).await; + let warehouse_id = initialize_warehouse(state.clone(), None, None, None, true).await; let namespace = NamespaceIdent::from_vec(vec!["my_namespace".to_string()]).unwrap(); initialize_namespace(state.clone(), warehouse_id, &namespace, None).await; let namespace_id = @@ -858,7 +858,7 @@ pub(crate) mod tests { String, ) { let state = CatalogState::from_pools(pool.clone(), pool.clone()); - let warehouse_id = initialize_warehouse(state.clone(), None, None, None).await; + let warehouse_id = initialize_warehouse(state.clone(), None, None, None, true).await; let namespace = NamespaceIdent::from_vec(vec!["my_namespace".to_string()]).unwrap(); initialize_namespace(state.clone(), warehouse_id, &namespace, None).await; let namespace_id = diff --git a/crates/iceberg-catalog/src/implementations/postgres/warehouse.rs b/crates/iceberg-catalog/src/implementations/postgres/warehouse.rs index e165885e..2a113fc2 100644 --- a/crates/iceberg-catalog/src/implementations/postgres/warehouse.rs +++ b/crates/iceberg-catalog/src/implementations/postgres/warehouse.rs @@ -1,7 +1,7 @@ use super::dbutils::DBErrorHandler as _; use crate::api::{CatalogConfig, ErrorModel, Result}; use crate::service::config::ConfigProvider; -use crate::service::{GetWarehouseResponse, WarehouseStatus}; +use crate::service::{GetProjectResponse, GetWarehouseResponse, WarehouseStatus}; use crate::{service::storage::StorageProfile, ProjectIdent, SecretIdent, WarehouseIdent}; use http::StatusCode; use sqlx::Error; @@ -18,44 +18,46 @@ impl ConfigProvider for super::PostgresCatalog { warehouse_name: &str, project_id: ProjectIdent, catalog_state: CatalogState, - ) -> Result { - let warehouse_id = sqlx::query_scalar!( - r#" + ) -> Result> { + let warehouse_id = row_not_found_to_option( + sqlx::query_scalar!( + r#" SELECT warehouse_id FROM warehouse WHERE warehouse_name = $1 AND project_id = 
$2 AND status = 'active' "#, - warehouse_name.to_string(), - *project_id - ) - .fetch_one(&catalog_state.read_pool()) - .await - .map_err(map_select_warehouse_err)?; - - Ok(warehouse_id.into()) + warehouse_name.to_string(), + *project_id + ) + .fetch_one(&catalog_state.read_pool()) + .await, + )?; + + Ok(warehouse_id.map(Into::into)) } async fn get_config_for_warehouse( warehouse_id: WarehouseIdent, catalog_state: CatalogState, - ) -> Result { - let storage_profile = sqlx::query_scalar!( - r#" + ) -> Result> { + let storage_profile = row_not_found_to_option( + sqlx::query_scalar!( + r#" SELECT storage_profile as "storage_profile: Json" FROM warehouse WHERE warehouse_id = $1 AND status = 'active' "#, - *warehouse_id - ) - .fetch_one(&catalog_state.read_pool()) - .await - .map_err(map_select_warehouse_err)?; + *warehouse_id + ) + .fetch_one(&catalog_state.read_pool()) + .await, + )?; - Ok(storage_profile.generate_catalog_config(warehouse_id)) + Ok(storage_profile.map(|p| p.generate_catalog_config(warehouse_id))) } } @@ -67,7 +69,6 @@ pub(crate) async fn create_warehouse<'a>( storage_secret_id: Option, transaction: &mut sqlx::Transaction<'_, sqlx::Postgres>, ) -> Result { - validate_warehouse_name(&warehouse_name)?; let storage_profile_ser = serde_json::to_value(storage_profile).map_err(|e| { ErrorModel::internal( "Error serializing storage profile", @@ -98,10 +99,11 @@ pub(crate) async fn create_warehouse<'a>( .await .map_err(|e| match &e { sqlx::Error::Database(db_err) => match db_err.constraint() { - // ToDo: Get constarint name from const + // ToDo: Get constraint name from const Some("unique_warehouse_name_in_project") => ErrorModel::conflict("Warehouse with this name already exists in the project.", "WarehouseNameAlreadyExists", Some(Box::new(e))) , + Some("warehouse_project_id_fk") => ErrorModel::not_found("Project not found", "ProjectNotFound", Some(Box::new(e))), _ => e.into_error_model("Error creating Warehouse".into()), }, _ => e.into_error_model("Error 
creating Warehouse".into()), @@ -110,6 +112,111 @@ pub(crate) async fn create_warehouse<'a>( Ok(warehouse_id.into()) } +pub(crate) async fn rename_project<'a>( + project_id: ProjectIdent, + new_name: &str, + transaction: &mut sqlx::Transaction<'_, sqlx::Postgres>, +) -> Result<()> { + let row_count = sqlx::query!( + "UPDATE project + SET project_name = $1 + WHERE project_id = $2", + new_name, + *project_id + ) + .execute(&mut **transaction) + .await + .map_err(|e| e.into_error_model("Error renaming project".into()))? + .rows_affected(); + + if row_count == 0 { + return Err(ErrorModel::not_found("Project not found", "ProjectNotFound", None).into()); + } + + Ok(()) +} + +pub(crate) async fn create_project<'a>( + project_id: ProjectIdent, + project_name: String, + transaction: &mut sqlx::Transaction<'_, sqlx::Postgres>, +) -> Result<()> { + let _project_id = sqlx::query_scalar!( + r#" + INSERT INTO project (project_name, project_id) + VALUES ($1, $2) + RETURNING project_id + "#, + project_name, + *project_id + ) + .fetch_one(&mut **transaction) + .await + .map_err(|e| e.into_error_model("Error creating Project".into()))?; + + Ok(()) +} + +pub(crate) async fn get_project<'a>( + project_id: ProjectIdent, + transaction: &mut sqlx::Transaction<'_, sqlx::Postgres>, +) -> Result> { + let project = row_not_found_to_option( + sqlx::query!( + r#" + SELECT + project_name, + project_id + FROM project + WHERE project_id = $1 + "#, + *project_id + ) + .fetch_one(&mut **transaction) + .await, + )?; + + if let Some(project) = project { + Ok(Some(GetProjectResponse { + project_id: project.project_id.into(), + name: project.project_name, + })) + } else { + Ok(None) + } +} + +pub(crate) async fn delete_project<'a>( + project_id: ProjectIdent, + transaction: &mut sqlx::Transaction<'_, sqlx::Postgres>, +) -> Result<()> { + let row_count = + sqlx::query_scalar!(r#"DELETE FROM project WHERE project_id = $1"#, *project_id) + .execute(&mut **transaction) + .await + .map_err(|e| match &e { + 
sqlx::Error::Database(db_error) => { + if db_error.is_foreign_key_violation() { + ErrorModel::conflict( + "Project is not empty", + "ProjectNotEmpty", + Some(Box::new(e)), + ) + } else { + e.into_error_model("Error deleting project".into()) + } + } + _ => e.into_error_model("Error deleting project".into()), + })? + .rows_affected(); + + if row_count == 0 { + return Err(ErrorModel::not_found("Project not found", "ProjectNotFound", None).into()); + } + + Ok(()) +} + pub(crate) async fn list_warehouses( project_id: ProjectIdent, include_status: Option>, @@ -213,9 +320,10 @@ pub(crate) async fn list_warehouses( pub(crate) async fn get_warehouse<'a>( warehouse_id: WarehouseIdent, transaction: &mut sqlx::Transaction<'_, sqlx::Postgres>, -) -> Result { - let warehouse = sqlx::query!( - r#" +) -> Result> { + let warehouse = row_not_found_to_option( + sqlx::query!( + r#" SELECT warehouse_name, project_id, @@ -227,47 +335,66 @@ pub(crate) async fn get_warehouse<'a>( FROM warehouse WHERE warehouse_id = $1 "#, - *warehouse_id - ) - .fetch_one(&mut **transaction) - .await - .map_err(map_select_warehouse_err)?; - - let tabular_delete_profile = match warehouse.tabular_delete_mode { - DbTabularDeleteProfile::Soft => TabularDeleteProfile::Soft { - expiration_seconds: chrono::Duration::seconds( - warehouse - .tabular_expiration_seconds - .ok_or(ErrorModel::internal( - "Tabular expiration seconds not found", - "TabularExpirationSecondsNotFound", - None, - ))?, - ), - }, - DbTabularDeleteProfile::Hard => TabularDeleteProfile::Hard {}, - }; - - Ok(GetWarehouseResponse { - id: warehouse_id, - name: warehouse.warehouse_name, - project_id: ProjectIdent::from(warehouse.project_id), - storage_profile: warehouse.storage_profile.deref().clone(), - storage_secret_id: warehouse.storage_secret_id.map(std::convert::Into::into), - status: warehouse.status, - tabular_delete_profile, - }) + *warehouse_id + ) + .fetch_one(&mut **transaction) + .await, + )?; + + if let Some(warehouse) = warehouse { + 
let tabular_delete_profile = match warehouse.tabular_delete_mode { + DbTabularDeleteProfile::Soft => TabularDeleteProfile::Soft { + expiration_seconds: chrono::Duration::seconds( + warehouse + .tabular_expiration_seconds + .ok_or(ErrorModel::internal( + "Tabular expiration seconds not found", + "TabularExpirationSecondsNotFound", + None, + ))?, + ), + }, + DbTabularDeleteProfile::Hard => TabularDeleteProfile::Hard {}, + }; + + Ok(Some(GetWarehouseResponse { + id: warehouse_id, + name: warehouse.warehouse_name, + project_id: ProjectIdent::from(warehouse.project_id), + storage_profile: warehouse.storage_profile.deref().clone(), + storage_secret_id: warehouse.storage_secret_id.map(std::convert::Into::into), + status: warehouse.status, + tabular_delete_profile, + })) + } else { + Ok(None) + } } -pub(crate) async fn list_projects(catalog_state: CatalogState) -> Result> { - let projects = sqlx::query!(r#"SELECT DISTINCT project_id FROM warehouse"#) - .fetch_all(&catalog_state.read_pool()) - .await - .map_err(|e| e.into_error_model("Error fetching projects".into()))?; +pub(crate) async fn list_projects( + project_ids: Option>, + catalog_state: CatalogState, +) -> Result> { + let return_all = project_ids.is_none(); + let projects = sqlx::query!( + r#" + SELECT project_id, project_name FROM project WHERE project_id = ANY($1) or $2 + "#, + project_ids + .map(|ids| ids.into_iter().map(|i| *i).collect::>()) + .unwrap_or_default() as Vec, + return_all + ) + .fetch_all(&catalog_state.read_pool()) + .await + .map_err(|e| e.into_error_model("Error fetching projects".into()))?; Ok(projects .into_iter() - .map(|project| ProjectIdent::from(project.project_id)) + .map(|project| GetProjectResponse { + project_id: ProjectIdent::from(project.project_id), + name: project.project_name, + }) .collect()) } @@ -309,8 +436,6 @@ pub(crate) async fn rename_warehouse<'a>( new_name: &str, transaction: &mut sqlx::Transaction<'_, sqlx::Postgres>, ) -> Result<()> { - 
validate_warehouse_name(new_name)?; - let row_count = sqlx::query!( "UPDATE warehouse SET warehouse_name = $1 @@ -393,32 +518,18 @@ pub(crate) async fn update_storage_profile<'a>( Ok(()) } -fn map_select_warehouse_err(e: Error) -> ErrorModel { - match e { - sqlx::Error::RowNotFound => ErrorModel::not_found( - "Warehouse not found", - "WarehouseNotFound", - Some(Box::new(e)), - ), - _ => ErrorModel::internal( +fn row_not_found_to_option(t: Result) -> Result, ErrorModel> { + match t { + Ok(t) => Ok(Some(t)), + Err(Error::RowNotFound) => Ok(None), + Err(e) => Err(ErrorModel::internal( "Error fetching warehouse", "WarehouseFetchError", Some(Box::new(e)), - ), + )), } } -fn validate_warehouse_name(warehouse_name: &str) -> Result<()> { - if warehouse_name.is_empty() { - return Err(ErrorModel::bad_request( - "Warehouse name cannot be empty", - "EmptyWarehouseName", - None, - ) - .into()); - } - Ok(()) -} #[derive(Debug, Clone, Copy, PartialEq, Eq, sqlx::Type)] #[sqlx(type_name = "tabular_delete_mode", rename_all = "kebab-case")] enum DbTabularDeleteProfile { @@ -449,14 +560,25 @@ pub(crate) mod test { storage_profile: Option, project_id: Option<&ProjectIdent>, secret_id: Option, + create_project: bool, ) -> crate::WarehouseIdent { let project_id = project_id.map_or( ProjectIdent::from(uuid::Uuid::nil()), std::borrow::ToOwned::to_owned, ); - let mut transaction = PostgresTransaction::begin_write(state.clone()) + let mut t = PostgresTransaction::begin_write(state.clone()) + .await + .unwrap(); + + if create_project { + PostgresCatalog::create_project( + project_id, + format!("Project {project_id}"), + t.transaction(), + ) .await .unwrap(); + } let storage_profile = storage_profile.unwrap_or(StorageProfile::S3(S3Profile { bucket: "test_bucket".to_string(), @@ -478,19 +600,19 @@ pub(crate) mod test { expiration_seconds: chrono::Duration::seconds(5), }, secret_id, - transaction.transaction(), + t.transaction(), ) .await .unwrap(); - transaction.commit().await.unwrap(); + 
t.commit().await.unwrap(); warehouse_id } #[sqlx::test] async fn test_get_warehouse_by_name(pool: sqlx::PgPool) { let state = CatalogState::from_pools(pool.clone(), pool.clone()); - let warehouse_id = initialize_warehouse(state.clone(), None, None, None).await; + let warehouse_id = initialize_warehouse(state.clone(), None, None, None, true).await; let fetched_warehouse_id = PostgresCatalog::get_warehouse_by_name( "test_warehouse", @@ -500,26 +622,49 @@ pub(crate) mod test { .await .unwrap(); - assert_eq!(warehouse_id, fetched_warehouse_id); + assert_eq!(Some(warehouse_id), fetched_warehouse_id); } #[sqlx::test] async fn test_list_projects(pool: sqlx::PgPool) { let state = CatalogState::from_pools(pool.clone(), pool.clone()); let project_id_1 = ProjectIdent::from(uuid::Uuid::new_v4()); - initialize_warehouse(state.clone(), None, Some(&project_id_1), None).await; + initialize_warehouse(state.clone(), None, Some(&project_id_1), None, true).await; - let projects = PostgresCatalog::list_projects(state.clone()).await.unwrap(); + let projects = PostgresCatalog::list_projects(None, state.clone()) + .await + .unwrap() + .into_iter() + .map(|p| p.project_id) + .collect::>(); assert_eq!(projects.len(), 1); assert!(projects.contains(&project_id_1)); let project_id_2 = ProjectIdent::from(uuid::Uuid::new_v4()); - initialize_warehouse(state.clone(), None, Some(&project_id_2), None).await; + initialize_warehouse(state.clone(), None, Some(&project_id_2), None, true).await; - let projects = PostgresCatalog::list_projects(state.clone()).await.unwrap(); + let projects = PostgresCatalog::list_projects(None, state.clone()) + .await + .unwrap() + .into_iter() + .map(|p| p.project_id) + .collect::>(); assert_eq!(projects.len(), 2); assert!(projects.contains(&project_id_1)); assert!(projects.contains(&project_id_2)); + + let projects = PostgresCatalog::list_projects( + Some(HashSet::from_iter(vec![project_id_1])), + state.clone(), + ) + .await + .unwrap() + .into_iter() + .map(|p| 
p.project_id) + .collect::>(); + + assert_eq!(projects.len(), 1); + assert!(projects.contains(&project_id_1)); } #[sqlx::test] @@ -527,7 +672,7 @@ pub(crate) mod test { let state = CatalogState::from_pools(pool.clone(), pool.clone()); let project_id = ProjectIdent::from(uuid::Uuid::new_v4()); let warehouse_id_1 = - initialize_warehouse(state.clone(), None, Some(&project_id), None).await; + initialize_warehouse(state.clone(), None, Some(&project_id), None, true).await; let warehouses = PostgresCatalog::list_warehouses(project_id, None, None, state.clone()) .await @@ -542,7 +687,7 @@ pub(crate) mod test { let state = CatalogState::from_pools(pool.clone(), pool.clone()); let project_id = ProjectIdent::from(uuid::Uuid::new_v4()); let warehouse_id_1 = - initialize_warehouse(state.clone(), None, Some(&project_id), None).await; + initialize_warehouse(state.clone(), None, Some(&project_id), None, true).await; // Rename warehouse 1 let mut transaction = PostgresTransaction::begin_write(state.clone()) @@ -562,7 +707,7 @@ pub(crate) mod test { // Create warehouse 2 let warehouse_id_2 = - initialize_warehouse(state.clone(), None, Some(&project_id), None).await; + initialize_warehouse(state.clone(), None, Some(&project_id), None, false).await; // Assert active whs let warehouses = PostgresCatalog::list_warehouses( @@ -589,7 +734,8 @@ pub(crate) mod test { async fn test_rename_warehouse(pool: sqlx::PgPool) { let state = CatalogState::from_pools(pool.clone(), pool.clone()); let project_id = ProjectIdent::from(uuid::Uuid::new_v4()); - let warehouse_id = initialize_warehouse(state.clone(), None, Some(&project_id), None).await; + let warehouse_id = + initialize_warehouse(state.clone(), None, Some(&project_id), None, true).await; let mut transaction = PostgresTransaction::begin_write(state.clone()) .await @@ -606,6 +752,6 @@ pub(crate) mod test { PostgresCatalog::get_warehouse(warehouse_id, read_transaction.transaction()) .await .unwrap(); - assert_eq!(warehouse.name, "new_name"); + 
assert_eq!(warehouse.unwrap().name, "new_name"); } } diff --git a/crates/iceberg-catalog/src/service/catalog.rs b/crates/iceberg-catalog/src/service/catalog.rs index e69636d1..1f1da90e 100644 --- a/crates/iceberg-catalog/src/service/catalog.rs +++ b/crates/iceberg-catalog/src/service/catalog.rs @@ -14,6 +14,7 @@ use crate::SecretIdent; use crate::api::management::v1::warehouse::TabularDeleteProfile; use crate::service::tabular_idents::{TabularIdentOwned, TabularIdentUuid}; use iceberg::spec::{Schema, SortOrder, TableMetadata, UnboundPartitionSpec, ViewMetadata}; +use iceberg_ext::catalog::rest::ErrorModel; pub use iceberg_ext::catalog::rest::{CommitTableResponse, CreateTableRequest}; use iceberg_ext::configs::Location; use std::collections::{HashMap, HashSet}; @@ -21,11 +22,11 @@ use std::collections::{HashMap, HashSet}; #[async_trait::async_trait] pub trait Transaction where - Self: Sized + Send + Sync, + Self: Sized + Send + Sync + Unpin, { - type Transaction<'a> + type Transaction<'a>: Send + Sync + 'a where - Self: 'a; + Self: 'static; async fn begin_write(db_state: D) -> Result; @@ -97,6 +98,14 @@ pub struct GetWarehouseResponse { pub tabular_delete_profile: TabularDeleteProfile, } +#[derive(Debug, Clone)] +pub struct GetProjectResponse { + /// ID of the project. + pub project_id: ProjectIdent, + /// Name of the project. 
+ pub name: String, +} + #[derive(Debug, Clone)] pub struct TableCommit { pub new_metadata: TableMetadata, @@ -280,8 +289,32 @@ where transaction: >::Transaction<'a>, ) -> Result; + /// Create a project + async fn create_project<'a>( + project_id: ProjectIdent, + project_name: String, + transaction: >::Transaction<'a>, + ) -> Result<()>; + + /// Delete a project + async fn delete_project<'a>( + project_id: ProjectIdent, + transaction: >::Transaction<'a>, + ) -> Result<()>; + + /// Get the project metadata + async fn get_project<'a>( + project_id: ProjectIdent, + transaction: >::Transaction<'a>, + ) -> Result>; + /// Return a list of all project ids in the catalog - async fn list_projects(catalog_state: Self::State) -> Result>; + /// + /// If project_ids is None, return all projects, otherwise return only the projects in the set + async fn list_projects( + project_ids: Option>, + catalog_state: Self::State, + ) -> Result>; /// Return a list of all warehouse in a project async fn list_warehouses( @@ -296,10 +329,27 @@ where ) -> Result>; /// Get the warehouse metadata - should only return active warehouses. + /// + /// Return Ok(None) if the warehouse does not exist. async fn get_warehouse<'a>( warehouse_id: WarehouseIdent, transaction: >::Transaction<'a>, - ) -> Result; + ) -> Result>; + + /// Wrapper around get_warehouse that returns a not-found error if the warehouse does not exist. + async fn require_warehouse<'a>( + warehouse_id: WarehouseIdent, + transaction: >::Transaction<'a>, + ) -> Result { + Self::get_warehouse(warehouse_id, transaction).await?.ok_or( + ErrorModel::not_found( + format!("Warehouse {warehouse_id} not found"), + "WarehouseNotFound", + None, + ) + .into(), + ) + } /// Delete a warehouse. async fn delete_warehouse<'a>( @@ -314,6 +364,13 @@ where transaction: >::Transaction<'a>, ) -> Result<()>; + /// Rename a project. 
+ async fn rename_project<'a>( + project_id: ProjectIdent, + new_name: &str, + transaction: >::Transaction<'a>, + ) -> Result<()>; + /// Set the status of a warehouse. async fn set_warehouse_status<'a>( warehouse_id: WarehouseIdent, diff --git a/crates/iceberg-catalog/src/service/config.rs b/crates/iceberg-catalog/src/service/config.rs index b88d8115..e8de6266 100644 --- a/crates/iceberg-catalog/src/service/config.rs +++ b/crates/iceberg-catalog/src/service/config.rs @@ -1,5 +1,6 @@ use super::{Catalog, ProjectIdent, WarehouseIdent}; use crate::api::{CatalogConfig, Result}; +use iceberg_ext::catalog::rest::ErrorModel; #[async_trait::async_trait] @@ -7,16 +8,53 @@ pub trait ConfigProvider where Self: Clone + Send + Sync + 'static, { - // Should only return a warehouse if the warehouse is active. + /// Return Ok(Some(x)) only for active warehouses async fn get_warehouse_by_name( warehouse_name: &str, project_id: ProjectIdent, catalog_state: C::State, - ) -> Result; + ) -> Result>; + + /// Wrapper around get_warehouse_by_name that returns + /// not found error if the warehouse does not exist. + async fn require_warehouse_by_name( + warehouse_name: &str, + project_id: ProjectIdent, + catalog_state: C::State, + ) -> Result { + Self::get_warehouse_by_name(warehouse_name, project_id, catalog_state) + .await? + .ok_or( + ErrorModel::not_found( + format!("Warehouse {warehouse_name} not found"), + "WarehouseNotFound", + None, + ) + .into(), + ) + } // Should only return a warehouse if the warehouse is active. async fn get_config_for_warehouse( warehouse_id: WarehouseIdent, catalog_state: C::State, - ) -> Result; + ) -> Result>; + + /// Wrapper around get_config_for_warehouse that returns + /// not found error if the warehouse does not exist. + async fn require_config_for_warehouse( + warehouse_id: WarehouseIdent, + catalog_state: C::State, + ) -> Result { + Self::get_config_for_warehouse(warehouse_id, catalog_state) + .await? 
+ .ok_or( + ErrorModel::not_found( + format!("Warehouse {warehouse_id} not found"), + "WarehouseNotFound", + None, + ) + .into(), + ) + } } diff --git a/crates/iceberg-catalog/src/service/mod.rs b/crates/iceberg-catalog/src/service/mod.rs index 1e2b2813..6df808f9 100644 --- a/crates/iceberg-catalog/src/service/mod.rs +++ b/crates/iceberg-catalog/src/service/mod.rs @@ -13,9 +13,9 @@ pub mod token_verification; pub use catalog::{ Catalog, CommitTableResponse, CreateNamespaceRequest, CreateNamespaceResponse, CreateTableRequest, CreateTableResponse, DeletionDetails, DropFlags, GetNamespaceResponse, - GetStorageConfigResponse, GetTableMetadataResponse, GetWarehouseResponse, ListFlags, - ListNamespacesQuery, ListNamespacesResponse, LoadTableResponse, NamespaceIdent, Result, - TableCommit, TableCreation, TableIdent, Transaction, UpdateNamespacePropertiesRequest, + GetProjectResponse, GetStorageConfigResponse, GetTableMetadataResponse, GetWarehouseResponse, + ListFlags, ListNamespacesQuery, ListNamespacesResponse, LoadTableResponse, NamespaceIdent, + Result, TableCommit, TableCreation, TableIdent, Transaction, UpdateNamespacePropertiesRequest, UpdateNamespacePropertiesResponse, ViewMetadataWithLocation, }; use std::ops::Deref; diff --git a/crates/iceberg-catalog/src/service/task_queue/mod.rs b/crates/iceberg-catalog/src/service/task_queue/mod.rs index 23856381..8c22ec7c 100644 --- a/crates/iceberg-catalog/src/service/task_queue/mod.rs +++ b/crates/iceberg-catalog/src/service/task_queue/mod.rs @@ -307,6 +307,7 @@ mod test { Some(StorageProfile::S3(profile)), None, Some(secret_ident), + true, ) .await; diff --git a/crates/iceberg-catalog/src/service/task_queue/tabular_purge_queue.rs b/crates/iceberg-catalog/src/service/task_queue/tabular_purge_queue.rs index 934844f3..a60c56ef 100644 --- a/crates/iceberg-catalog/src/service/task_queue/tabular_purge_queue.rs +++ b/crates/iceberg-catalog/src/service/task_queue/tabular_purge_queue.rs @@ -102,7 +102,7 @@ where e })?; - let 
warehouse = C::get_warehouse(*warehouse_ident, trx.transaction()) + let warehouse = C::require_warehouse(*warehouse_ident, trx.transaction()) .await .map_err(|e| { tracing::error!("Failed to get warehouse: {:?}", e); diff --git a/examples/self-contained/create-default-project.json b/examples/self-contained/create-default-project.json new file mode 100644 index 00000000..d36d615d --- /dev/null +++ b/examples/self-contained/create-default-project.json @@ -0,0 +1,4 @@ +{ + "project-name": "My Project", + "project-id": "00000000-0000-0000-0000-000000000000" +} \ No newline at end of file diff --git a/examples/self-contained/docker-compose.yaml b/examples/self-contained/docker-compose.yaml index 2829398e..cbdf77b5 100644 --- a/examples/self-contained/docker-compose.yaml +++ b/examples/self-contained/docker-compose.yaml @@ -63,6 +63,33 @@ services: server: condition: service_healthy restart: "no" + command: + - -w + - "%{http_code}" + - "-X" + - "POST" + - "-v" + - "http://server:8080/management/v1/project" + - "-H" + - "Content-Type: application/json" + - "--data" + - "@create-default-project.json" + - "-o" + - "/dev/null" + - "--fail-with-body" + volumes: + - ./create-default-project.json:/home/curl_user/create-default-project.json + networks: + iceberg_net: + + initialwarehouse: + image: curlimages/curl + depends_on: + server: + condition: service_healthy + initialproject: + condition: service_completed_successfully + restart: "no" command: - -w - "%{http_code}" diff --git a/justfile b/justfile index a6db68fa..6d82ae83 100644 --- a/justfile +++ b/justfile @@ -32,13 +32,16 @@ unit-test: doc-test test: doc-test cargo test --no-fail-fast --all-targets --all-features --workspace -update-openapi: +update-rest-openapi: # Download from https://raw.githubusercontent.com/apache/iceberg/main/open-api/rest-catalog-open-api.yaml and put into openapi folder curl -o openapi/rest-catalog-open-api.yaml 
 https://raw.githubusercontent.com/apache/iceberg/main/open-api/rest-catalog-open-api.yaml + + +update-management-openapi: # For rust-server generation only: # Fix until https://github.com/OpenAPITools/openapi-generator/issues/7802 is resolved: # Parse the downloaded yaml. Then set the for the existing object components.schemas.Namespace properties.length.type to integer # yq e '.components.schemas.Namespace.properties.length.type = "integer"' -i openapi/rest-catalog-open-api.yaml # Replace 5XX with 500 (gnu-sed) # gsed -i 's/5XX/500/g' openapi/rest-catalog-open-api.yaml - cargo run management-openapi > openapi/management-open-api.yaml \ No newline at end of file + cargo run management-openapi > openapi/management-open-api.yaml diff --git a/openapi/management-open-api.yaml b/openapi/management-open-api.yaml index c309d1ec..51abbadb 100644 --- a/openapi/management-open-api.yaml +++ b/openapi/management-open-api.yaml @@ -5,13 +5,13 @@ info: Implementation of the Iceberg REST Catalog server. 
license: name: Apache-2.0 - version: 0.3.0 + version: 0.4.0 paths: /management/v1/project: get: tags: - management - summary: List all existing projects + summary: List all projects the requesting user has access to operationId: list_projects responses: '200': @@ -22,6 +22,88 @@ paths: type: array items: $ref: '#/components/schemas/ListProjectsResponse' + post: + tags: + - management + summary: Create a new project + operationId: create_project + requestBody: + description: '' + content: + application/json: + schema: + $ref: '#/components/schemas/CreateProjectRequest' + required: true + responses: + '201': + description: Project created successfully + content: + application/json: + schema: + type: array + items: + $ref: '#/components/schemas/ProjectResponse' + /management/v1/project/{project_id}: + get: + tags: + - management + summary: Get a Project by ID + operationId: get_project + parameters: + - name: project_id + in: path + required: true + schema: + type: string + format: uuid + responses: + '200': + description: Project details + content: + application/json: + schema: + type: array + items: + $ref: '#/components/schemas/GetProjectResponse' + delete: + tags: + - management + summary: Delete a project by ID + description: No warehouses must be present in the project to delete it. 
+ operationId: delete_project + parameters: + - name: project_id + in: path + required: true + schema: + type: string + format: uuid + responses: + '200': + description: Project deleted successfully + /management/v1/project/{project_id}/rename: + post: + tags: + - management + summary: Rename a project + operationId: rename_project + parameters: + - name: project_id + in: path + required: true + schema: + type: string + format: uuid + requestBody: + description: '' + content: + application/json: + schema: + $ref: '#/components/schemas/RenameProjectRequest' + required: true + responses: + '200': + description: Project renamed successfully /management/v1/warehouse: get: tags: @@ -301,6 +383,30 @@ components: description: 'The validity of the sas token in seconds. Default: 3600.' nullable: true minimum: 0 + CreateProjectRequest: + type: object + required: + - project-name + properties: + project-id: + type: string + format: uuid + description: |- + Request a specific project ID - optional. + If not provided, a new project ID will be generated (recommended). + nullable: true + project-name: + type: string + description: Name of the project to create. + CreateProjectResponse: + type: object + required: + - project-id + properties: + project-id: + type: string + format: uuid + description: ID of the created project. CreateWarehouseRequest: type: object required: @@ -384,6 +490,10 @@ components: GcsCredential: oneOf: - type: object + description: |- + Service Account Key + + The key is the JSON object obtained when creating a service account key in the GCP console. 
required: - key - credential-type @@ -394,6 +504,26 @@ components: - service-account-key key: $ref: '#/components/schemas/GcsServiceKey' + description: |- + GCS Credentials + + Currently only supports Service Account Key + Example of a key: + ```json + { + "type": "service_account", + "project_id": "example-project-1234", + "private_key_id": "....", + "private_key": "-----BEGIN PRIVATE KEY-----\n.....\n-----END PRIVATE KEY-----\n", + "client_email": "abc@example-project-1234.iam.gserviceaccount.com", + "client_id": "123456789012345678901", + "auth_uri": "https://accounts.google.com/o/oauth2/auth", + "token_uri": "https://oauth2.googleapis.com/token", + "auth_provider_x509_cert_url": "https://www.googleapis.com/oauth2/v1/certs", + "client_x509_cert_url": "https://www.googleapis.com/robot/v1/metadata/x509/abc%example-project-1234.iam.gserviceaccount.com", + "universe_domain": "googleapis.com" + } + ``` discriminator: propertyName: credential-type GcsProfile: @@ -447,6 +577,19 @@ components: type: string universe_domain: type: string + GetProjectResponse: + type: object + required: + - project_id + - project_name + properties: + project_id: + type: string + format: uuid + description: ID of the project. + project_name: + type: string + description: Name of the project GetWarehouseResponse: type: object required: @@ -493,7 +636,7 @@ components: projects: type: array items: - $ref: '#/components/schemas/ProjectResponse' + $ref: '#/components/schemas/GetProjectResponse' description: List of projects ListWarehousesRequest: type: object @@ -524,15 +667,6 @@ components: items: $ref: '#/components/schemas/GetWarehouseResponse' description: List of warehouses in the project. - ProjectResponse: - type: object - required: - - project_id - properties: - project_id: - type: string - format: uuid - description: ID of the project. 
RenameWarehouseRequest: type: object required: diff --git a/tests/docker-compose.yaml b/tests/docker-compose.yaml index 292ffa47..1b0e9bc8 100644 --- a/tests/docker-compose.yaml +++ b/tests/docker-compose.yaml @@ -11,7 +11,7 @@ services: interval: 2s timeout: 80s retries: 2 - start_period: 40s + start_period: 60s ports: - "31100:8080" command: [ "start-dev", "--metrics-enabled=true", "--health-enabled=true", "--import-realm" ] diff --git a/tests/python/tests/conftest.py b/tests/python/tests/conftest.py index fc51208a..676fc681 100644 --- a/tests/python/tests/conftest.py +++ b/tests/python/tests/conftest.py @@ -169,10 +169,27 @@ class Server: management_url: str access_token: str + def create_project(self, name: str) -> uuid.UUID: + create_payload = {"project-name": name} + project_url = self.project_url + response = requests.post( + project_url, + json=create_payload, + headers={"Authorization": f"Bearer {self.access_token}"}, + ) + if not response.ok: + raise ValueError( + f"Failed to create project ({response.status_code}): {response.text}" + ) + + project_id = response.json()["project-id"] + return uuid.UUID(project_id) + def create_warehouse( self, name: str, project_id: uuid.UUID, storage_config: dict ) -> uuid.UUID: """Create a warehouse in this server""" + create_payload = { "project-id": str(project_id), "warehouse-name": name, @@ -197,6 +214,10 @@ def create_warehouse( def warehouse_url(self) -> str: return urllib.parse.urljoin(self.management_url, "v1/warehouse") + @property + def project_url(self) -> str: + return urllib.parse.urljoin(self.management_url, "v1/project") + @dataclasses.dataclass class Warehouse: @@ -266,17 +287,24 @@ def server(access_token) -> Server: @pytest.fixture(scope="session") -def warehouse(server: Server, storage_config) -> Warehouse: - project_id = uuid.uuid4() +def project(server: Server) -> uuid.UUID: + test_id = uuid.uuid4() + project_name = f"project-{test_id}" + project_id = server.create_project(project_name) + return 
project_id + + +@pytest.fixture(scope="session") +def warehouse(server: Server, storage_config, project) -> Warehouse: test_id = uuid.uuid4() warehouse_name = f"warehouse-{test_id}" warehouse_id = server.create_warehouse( - warehouse_name, project_id=project_id, storage_config=storage_config + warehouse_name, project_id=project, storage_config=storage_config ) return Warehouse( access_token=server.access_token, server=server, - project_id=project_id, + project_id=project, warehouse_id=warehouse_id, warehouse_name=warehouse_name, )