From ce5647b25259f0f358a47e9a77dd11bcd457fd5c Mon Sep 17 00:00:00 2001 From: Christian Thiel Date: Sun, 2 Jun 2024 19:04:03 +0200 Subject: [PATCH] Various fixes --- Cargo.toml | 2 +- crates/iceberg-ext/Cargo.toml | 3 + .../iceberg-ext/src/spec/partition_binder.rs | 354 ++++---- crates/iceberg-ext/src/spec/table_metadata.rs | 832 +++++++++++------- .../src/implementations/postgres/table.rs | 11 +- tests/python/tests/test_spark.py | 32 +- 6 files changed, 730 insertions(+), 504 deletions(-) diff --git a/Cargo.toml b/Cargo.toml index e1075912..2d5005e5 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -24,7 +24,7 @@ serde_json = { version = "^1.0", features = ["raw_value"] } url = { version = "^2.5", features = ["serde"] } uuid = { version = "^1.6", features = ["serde", "v4", "v7"] } reqwest = { version = "^0.12", features = ["json"] } -iceberg = { git = "https://github.com/hansetag/iceberg-rust.git", tag = "0.2.1-rc1" } +iceberg = { git = "https://github.com/hansetag/iceberg-rust.git", tag = "0.2.1-rc3" } typed-builder = "^0.18" strum_macros = "^0.26" axum = { version = "^0.7" } diff --git a/crates/iceberg-ext/Cargo.toml b/crates/iceberg-ext/Cargo.toml index a92c00f8..357ef5b7 100644 --- a/crates/iceberg-ext/Cargo.toml +++ b/crates/iceberg-ext/Cargo.toml @@ -31,3 +31,6 @@ strum_macros = { workspace = true } typed-builder = { workspace = true } url = { workspace = true } uuid = { workspace = true, features = ["serde"] } + +[dev-dependencies] +lazy_static = { workspace = true } diff --git a/crates/iceberg-ext/src/spec/partition_binder.rs b/crates/iceberg-ext/src/spec/partition_binder.rs index d6b8e303..3c3856cb 100644 --- a/crates/iceberg-ext/src/spec/partition_binder.rs +++ b/crates/iceberg-ext/src/spec/partition_binder.rs @@ -2,7 +2,8 @@ use crate::catalog::rest::ErrorModel; use crate::spec::partition_binder::bindable::Bindable; use http::StatusCode; use iceberg::spec::{ - NestedFieldRef, PartitionField, PartitionSpec, SchemaRef, Transform, Type, UnboundPartitionSpec, + NestedFieldRef, PartitionField, PartitionSpec, SchemaRef, Transform, Type, + UnboundPartitionField, UnboundPartitionSpec, }; use std::cmp::max; use std::collections::HashSet; @@ -67,7 +68,8 @@ pub(crate) struct PartitionSpecBinder { } impl PartitionSpecBinder { - const PARTITION_DATA_ID_START: i32 = 1000; + pub(crate) const PARTITION_DATA_ID_START: i32 = 1000; + pub(crate) const UNPARTITIONED_LAST_ASSIGNED_ID: i32 = 999; pub(crate) fn new(schema: SchemaRef, spec_id: i32) -> Self { Self { @@ -79,18 +81,53 @@ impl PartitionSpecBinder { } } - pub(crate) fn bind_spec_schema(mut self, spec: UnboundPartitionSpec) -> Result { + pub(crate) fn bind_spec(mut self, spec: UnboundPartitionSpec) -> Result { self.bind(self.spec_id, spec.fields) } - pub(crate) fn update_spec_schema(mut self, other: &PartitionSpec) -> Result { - self.bind(self.spec_id, other.fields.clone()) + pub(crate) fn update_spec_schema(mut self, other: PartitionSpec) -> Result { + // build without validation because the schema may have changed in a way that makes this spec + // invalid. the spec should still be preserved so that older metadata can be interpreted. + + // We perform some basic checks here similar to java implementation. + // They should never fail if the spec was valid before the schema update. + other + .fields + .iter() + .map(|field| self.add_bound_field(field)) + .collect::>>()?; + + Ok(PartitionSpec { + spec_id: self.spec_id, + fields: other.fields, + }) } - fn bind( + fn add_bound_field(&mut self, field: &PartitionField) -> Result<()> { + self.get_schema_field_by_id(field.source_id) + // Schema field is almost always going to be NULL, as the field is typically bound to a different schema + .ok() + .map(|schema_field| { + self.check_name_does_not_collide_with_schema( + schema_field, + &field.name, + field.transform, + ) + }) + .transpose()?; + + self.check_partition_name_set_and_unique(&field.get_name())?; + + self.last_assigned_field_id = max(self.last_assigned_field_id, field.field_id); + self.update_names(&field.name); + + Ok(()) + } + + fn bind( &mut self, spec_id: i32, - fields: impl IntoIterator, + fields: impl IntoIterator, ) -> Result { Ok(PartitionSpec { spec_id, @@ -101,20 +138,26 @@ impl PartitionSpecBinder { }) } - fn bind_field(&mut self, spec_field: &T) -> Result { + fn bind_field(&mut self, spec_field: &UnboundPartitionField) -> Result { let spec_field_name = spec_field.get_name(); - self.check_partition_names(&spec_field_name)?; + let spec_field_id = spec_field.get_source_id(); + let transform = spec_field.get_transform(); + self.check_partition_name_set_and_unique(&spec_field_name)?; - let schema_field = self.get_schema_field_by_name(&spec_field_name)?; + let schema_field = self.get_schema_field_by_id(spec_field_id)?; + Self::check_transform_compatibility(transform, &schema_field.field_type)?; - Self::check_transform_compatibility(spec_field.get_transform(), &schema_field.field_type)?; - Self::check_schema_field_eq_source_id(schema_field, spec_field.get_source_id())?; + self.check_name_does_not_collide_with_schema(schema_field, &spec_field_name, transform)?; + + // Self::check_schema_field_eq_source_id(schema_field, spec_field.get_source_id())?; self.check_for_redundant_partitions( spec_field.get_source_id(), spec_field.get_transform(), )?; self.update_names(&spec_field_name); + self.dedup_fields + .insert((spec_field.get_source_id(), transform.dedup_name())); let field_id = spec_field .get_spec_id() @@ -146,7 +189,7 @@ impl PartitionSpecBinder { self.last_assigned_field_id } - fn check_partition_names(&self, new_name: &str) -> Result<()> { + fn check_partition_name_set_and_unique(&self, new_name: &str) -> Result<()> { if new_name.is_empty() { return Err(Self::err("Cannot use empty partition name.")); } @@ -160,25 +203,39 @@ impl PartitionSpecBinder { Ok(()) } - fn get_schema_field_by_name(&self, field_name: &str) -> Result<&NestedFieldRef> { + fn get_schema_field_by_id(&self, field_id: i32) -> Result<&NestedFieldRef> { self.schema - .as_struct() - .field_by_name(field_name) - .ok_or_else(|| Self::err(format!("Field '{field_name}' not found in schema."))) + .field_by_id(field_id) + .ok_or_else(|| Self::err(format!("Field '{field_id}' not found in schema."))) } - fn check_schema_field_eq_source_id( + fn check_name_does_not_collide_with_schema( + &self, schema_field: &NestedFieldRef, - source_id: i32, + partition_name: &str, + transform: Transform, ) -> Result<()> { - if schema_field.id == source_id { - return Ok(()); + let schema_collision = self.schema.field_by_name(partition_name); + + if let Some(schema_collision) = schema_collision { + // for identity transform case we allow conflicts between partition and schema field name + // as long as they are sourced from the same schema field + if transform == Transform::Identity { + if schema_collision.id == schema_field.id { + Ok(()) + } else { + Err(Self::err(format!( + "Cannot create identity partition sourced from different field in schema: Schema Name '{}' != Partition Name '{partition_name}'.", schema_field.name + ))) + } + } else { + Err(Self::err(format!( + "Cannot create partition with name: '{partition_name}' that conflicts with schema field and is not an identity transform." + ))) + } + } else { + Ok(()) } - - Err(Self::err(format!( - "Cannot create identity partition sourced from different field in schema: '{}'.", - schema_field.name - ))) } fn update_names(&mut self, new_name: &str) { @@ -216,7 +273,7 @@ impl PartitionSpecBinder { } Err(Self::err(format!( - "Cannot add redundant partition for: '{source_id}' with transform: '{transform}' !", + "Cannot add redundant partition for: '{source_id}' with transform: '{transform}'", ))) } } @@ -237,7 +294,7 @@ mod test { const MOCK_SPEC_ID: i32 = 0; - fn is_all_elements_uniq(elements: T) -> bool + fn is_all_elements_unique(elements: T) -> bool where T: IntoIterator, T::Item: Eq + Hash, @@ -269,25 +326,25 @@ mod test { let schema_fields = vec![ NestedField::required(1, "id", Primitive(PrimitiveType::Uuid)).into(), NestedField::required(2, "data", Primitive(PrimitiveType::Date)).into(), - NestedField::required(3, "category", Primitive(PrimitiveType::String)).into(), + NestedField::required(3, "category", Primitive(PrimitiveType::Int)).into(), ]; let spec_fields = vec![ UnboundPartitionField::builder() .name("id".to_owned()) - .transform(Transform::Void) + .transform(Transform::Identity) .partition_id(1) .source_id(1) .build(), UnboundPartitionField::builder() - .name("data".to_owned()) + .name("data_day".to_owned()) .transform(Transform::Day) .partition_id(2) .source_id(2) .build(), UnboundPartitionField::builder() - .name("category".to_owned()) - .transform(Transform::Unknown) + .name("category_bucket[2]".to_owned()) + .transform(Transform::Bucket(2)) .partition_id(3) .source_id(3) .build(), @@ -296,63 +353,6 @@ mod test { create_schema_and_spec(schema_fields, spec_fields) } - fn mock_incompatible_schema_and_spec() -> (SchemaRef, UnboundPartitionSpec) { - let schema_fields = vec![ - NestedField::required(1, "id", Primitive(PrimitiveType::Int)).into(), - NestedField::required(2, "data", Primitive(PrimitiveType::String)).into(), - NestedField::required(3, "category", Primitive(PrimitiveType::String)).into(), - ]; - - let spec_fields = vec![ - UnboundPartitionField::builder() - .name("id".to_owned()) - .transform(Transform::Identity) - .partition_id(1) - .source_id(1) - .build(), - UnboundPartitionField::builder() - .name("full_name".to_owned()) - .transform(Transform::Identity) - .partition_id(2) - .source_id(2) - .build(), - UnboundPartitionField::builder() - .name("email".to_owned()) - .transform(Transform::Identity) - .partition_id(4) - .source_id(4) - .build(), - ]; - - create_schema_and_spec(schema_fields, spec_fields) - } - - /// Ensure that the field names in the `UnboundPartitionSpec` are consistent with the names in the schema. - #[test] - fn names_consistent() { - let (schema, spec) = mock_compatible_schema_and_spec(); - let binder = PartitionSpecBinder::new(schema, MOCK_SPEC_ID); - - assert!(spec - .fields - .iter() - .map(|unbound| binder.get_schema_field_by_name(&unbound.name)) - .all(|field| field.is_ok())); - } - - /// Ensure that if field names in the `UnboundPartitionSpec` is inconsistent return an error. - #[test] - fn names_inconsistent() { - let (schema, spec) = mock_incompatible_schema_and_spec(); - let binder = PartitionSpecBinder::new(schema, MOCK_SPEC_ID); - - assert!(spec - .fields - .iter() - .map(|unbound| binder.get_schema_field_by_name(&unbound.name)) - .any(|field| field.is_err())); - } - /// Ensure each field referenced in the `UnboundPartitionSpec` exists in the schema. #[test] fn names_exist() { @@ -362,21 +362,27 @@ mod test { assert!(spec .fields .iter() - .map(|unbound| binder.get_schema_field_by_name(&unbound.name)) + .map(|unbound| binder.get_schema_field_by_id(unbound.source_id)) .all(|schema_field| schema_field.is_ok())); } /// Ensure that if a field referenced in the `UnboundPartitionSpec` not exists an error is returned. #[test] fn names_not_exist() { - let (schema, spec) = mock_incompatible_schema_and_spec(); + let schema_fields = + vec![NestedField::required(1, "id", Primitive(PrimitiveType::Uuid)).into()]; + + let spec_fields = vec![UnboundPartitionField::builder() + .name("id".to_owned()) + .transform(Transform::Identity) + .partition_id(2) + .source_id(2) + .build()]; + + let (schema, spec) = create_schema_and_spec(schema_fields, spec_fields); let binder = PartitionSpecBinder::new(schema, MOCK_SPEC_ID); - assert!(spec - .fields - .iter() - .map(|unbound| binder.get_schema_field_by_name(&unbound.name)) - .any(|schema_field| schema_field.is_err())); + binder.bind_spec(spec).expect_err("Should fail binding!"); } /// Ensure there are no duplicate fields in the `PartitionSpec`. @@ -388,12 +394,12 @@ mod test { assert!(spec .fields .iter() - .map(|unbound| binder.check_partition_names(&unbound.name)) + .map(|unbound| binder.check_partition_name_set_and_unique(&unbound.name)) .all(|partition_name| partition_name.is_ok())); - assert!(is_all_elements_uniq( + assert!(is_all_elements_unique( binder - .bind_spec_schema(spec) + .bind_spec(spec) .expect("Cannot bind spec!") .fields .into_iter() @@ -402,74 +408,96 @@ mod test { )); } - /// Verify that the field IDs in the `UnboundPartitionSpec` match the field IDs in the schema. + /// Verify we can add a duplicate partition for identity transform #[test] - fn field_ids_match() { - let (schema, spec) = mock_compatible_schema_and_spec(); + fn duplicate_identity_partition() { + let (schema, spec) = { + let schema_fields = + vec![NestedField::required(1, "id", Primitive(PrimitiveType::Int)).into()]; + + let spec_fields = vec![UnboundPartitionField::builder() + .name("id".to_owned()) + .transform(Transform::Identity) + .partition_id(1) + .source_id(1) + .build()]; + + create_schema_and_spec(schema_fields, spec_fields) + }; + let binder = PartitionSpecBinder::new(schema, MOCK_SPEC_ID); + binder + .bind_spec(spec) + .expect("Can bind duplicate field name for identity transform."); + } - assert!(spec - .fields - .iter() - .map( - |field| PartitionSpecBinder::check_schema_field_eq_source_id( - binder - .get_schema_field_by_name(&field.name) - .expect("Cannot get field."), - field.source_id - ) - ) - .all(|res| res.is_ok())); + /// Validate that we cannot add a duplicate partition for non-identity transform + #[test] + fn duplicate_partition() { + let (schema, spec) = { + let schema_fields = + vec![NestedField::required(1, "id", Primitive(PrimitiveType::Int)).into()]; + + let spec_fields = vec![UnboundPartitionField::builder() + .name("id".to_owned()) + .transform(Transform::Unknown) + .partition_id(1) + .source_id(1) + .build()]; + + create_schema_and_spec(schema_fields, spec_fields) + }; + + let binder = PartitionSpecBinder::new(schema, MOCK_SPEC_ID); + binder + .bind_spec(spec) + .expect_err("Cannot bind duplicate field name for non-identity transform."); } - /// Verify that if the field IDs in the `UnboundPartitionSpec` doesn't match the field IDs in the schema an error returns. + /// Validate that for identity transforms, the partition name can also be different from the schema field name #[test] - fn field_ids_not_match() { + fn identity_partition_with_different_name() { let (schema, spec) = { - let schema_fields = vec![ - NestedField::required(1, "id", Primitive(PrimitiveType::Int)).into(), - NestedField::required(3, "data", Primitive(PrimitiveType::String)).into(), - NestedField::required(3, "category", Primitive(PrimitiveType::String)).into(), - ]; + let schema_fields = + vec![NestedField::required(1, "id", Primitive(PrimitiveType::Int)).into()]; - let spec_fields = vec![ - UnboundPartitionField::builder() - .name("id".to_owned()) - .transform(Transform::Void) - .partition_id(1) - .source_id(1) - .build(), - UnboundPartitionField::builder() - .name("data".to_owned()) - .transform(Transform::Unknown) - .partition_id(3) - .source_id(3) - .build(), - UnboundPartitionField::builder() - .name("category".to_owned()) - .transform(Transform::Unknown) - .partition_id(4) - .source_id(4) - .build(), - ]; + let spec_fields = vec![UnboundPartitionField::builder() + .name("id_partition".to_owned()) + .transform(Transform::Identity) + .partition_id(2) + .source_id(1) + .build()]; create_schema_and_spec(schema_fields, spec_fields) }; let binder = PartitionSpecBinder::new(schema, MOCK_SPEC_ID); + binder + .bind_spec(spec) + .expect("Can bind identity partition with different name."); + } - assert!(spec - .fields - .iter() - .map( - |field| PartitionSpecBinder::check_schema_field_eq_source_id( - binder - .get_schema_field_by_name(&field.name) - .expect("Cannot get field."), - field.source_id - ) - ) - .any(|res| res.is_err())); + /// Validate that for identity transforms, the field id must match if the name is identical + #[test] + fn identity_partition_with_different_field_id() { + let (schema, spec) = { + let schema_fields = + vec![NestedField::required(1, "id", Primitive(PrimitiveType::Int)).into()]; + + let spec_fields = vec![UnboundPartitionField::builder() + .name("id".to_owned()) + .transform(Transform::Identity) + .partition_id(3) + .source_id(2) + .build()]; + + create_schema_and_spec(schema_fields, spec_fields) + }; + + let binder = PartitionSpecBinder::new(schema, MOCK_SPEC_ID); + binder + .bind_spec(spec) + .expect_err("Cannot bind identity partition with different field id."); } /// Ensure the transforms applied to the fields in the `UnboundPartitionSpec` are valid and supported. @@ -484,7 +512,7 @@ mod test { ( spec.fields[index].transform, &*binder - .get_schema_field_by_name(&spec.fields[index].name) + .get_schema_field_by_id(spec.fields[index].partition_id.unwrap()) .expect("Cannot get name!") .field_type, ) @@ -493,28 +521,28 @@ mod test { let (schema, spec) = { let schema_fields = vec![ NestedField::required(1, "id", Primitive(PrimitiveType::Int)).into(), - NestedField::required(3, "data", Primitive(PrimitiveType::String)).into(), + NestedField::required(2, "data", Primitive(PrimitiveType::String)).into(), NestedField::required(3, "category", Struct(StructType::default())).into(), ]; let spec_fields = vec![ UnboundPartitionField::builder() - .name("id".to_owned()) + .name("id_void".to_owned()) .transform(Transform::Void) .partition_id(1) .source_id(1) .build(), UnboundPartitionField::builder() - .name("data".to_owned()) + .name("data_unknown".to_owned()) .transform(Transform::Unknown) - .partition_id(3) - .source_id(3) + .partition_id(2) + .source_id(2) .build(), UnboundPartitionField::builder() - .name("category".to_owned()) + .name("category_bucket_1".to_owned()) .transform(Transform::Bucket(1)) - .partition_id(4) - .source_id(4) + .partition_id(3) + .source_id(3) .build(), ]; diff --git a/crates/iceberg-ext/src/spec/table_metadata.rs b/crates/iceberg-ext/src/spec/table_metadata.rs index f12f47cf..4cbd66eb 100644 --- a/crates/iceberg-ext/src/spec/table_metadata.rs +++ b/crates/iceberg-ext/src/spec/table_metadata.rs @@ -1,15 +1,13 @@ use std::cmp::max; use std::collections::HashSet; use std::ops::BitAnd; -use std::{ - collections::{hash_map::Entry as HashMapEntry, HashMap}, - vec, -}; +use std::{collections::HashMap, vec}; use http::StatusCode; use iceberg::spec::{ FormatVersion, PartitionField, PartitionSpec, PartitionSpecRef, Schema, SchemaRef, Snapshot, SnapshotLog, SnapshotReference, SortOrder, SortOrderRef, TableMetadata, UnboundPartitionSpec, + DEFAULT_SORT_ORDER_ID, DEFAULT_SPEC_ID, MAIN_BRANCH, }; use iceberg::TableUpdate; use uuid::Uuid; @@ -31,13 +29,9 @@ pub struct TableMetadataAggregate { } impl TableMetadataAggregate { - const MAIN_BRANCH: &'static str = "main"; - const INITIAL_SPEC_ID: i32 = 0; - const INITIAL_SORT_ORDER_ID: i64 = 0; - const PARTITION_DATA_ID_START: usize = 1000; + const PARTITION_DATA_ID_START: i32 = 1000; const LAST_ADDED_I32: i32 = -1; const LAST_ADDED_I64: i64 = -1; - const INITIAL_SCHEMA_ID: i32 = 0; const RESERVED_PROPERTIES: [&'static str; 9] = [ "format-version", "uuid", @@ -50,9 +44,11 @@ impl TableMetadataAggregate { "default-sort-order", ]; - /// Initialize new table metadata aggregate. #[must_use] - pub fn new(location: String) -> Self { + /// Creates a new table metadata builder. + pub fn new(location: String, schema: Schema) -> Self { + // ToDo: Assign fresh IDs? + // https://github.com/apache/iceberg/blob/6a594546b06df9fb75dd7e9713a8dc173e67c870/core/src/main/java/org/apache/iceberg/TableMetadata.java#L119 Self { metadata: TableMetadata { format_version: FormatVersion::V2, @@ -60,19 +56,19 @@ impl TableMetadataAggregate { location, last_sequence_number: 0, last_updated_ms: chrono::Utc::now().timestamp_millis(), - last_column_id: Self::LAST_ADDED_I32, - current_schema_id: Self::INITIAL_SCHEMA_ID, - schemas: HashMap::new(), + last_column_id: schema.highest_field_id(), + current_schema_id: schema.schema_id(), + schemas: HashMap::from_iter(vec![(schema.schema_id(), schema.into())]), partition_specs: HashMap::new(), - default_spec_id: Self::LAST_ADDED_I32, - last_partition_id: 0, + default_spec_id: DEFAULT_SPEC_ID - 1, + last_partition_id: Self::PARTITION_DATA_ID_START - 1, properties: HashMap::new(), current_snapshot_id: None, snapshots: HashMap::new(), snapshot_log: vec![], sort_orders: HashMap::new(), metadata_log: vec![], - default_sort_order_id: i64::from(Self::LAST_ADDED_I32), + default_sort_order_id: DEFAULT_SORT_ORDER_ID - 1, refs: HashMap::default(), }, changes: Vec::default(), @@ -217,61 +213,73 @@ impl TableMetadataAggregate { .build()); } - let new_schema_id = self.reuse_or_create_new_schema_id(&schema)?; + let new_schema_id = self.reuse_or_create_new_schema_id(&schema); let schema_found = self.metadata.schemas.contains_key(&new_schema_id); + if schema_found && new_last_column_id == self.metadata.last_column_id { + // the new spec and last column id is already current and no change is needed + // update lastAddedSchemaId if the schema was added in this set of changes (since it is now + // the last) + let is_new_schema = self.last_added_schema_id + .map_or(false, |id| { + self.changes + .iter() + .any(|update| matches!(update, TableUpdate::AddSchema { schema, .. } if schema.schema_id() == id)) + }); + + self.last_added_schema_id = is_new_schema.then_some(new_schema_id); + + return Ok(self); + } + + let schema = if new_schema_id == schema.schema_id() { + schema + } else { + Schema::into_builder(schema) + .with_schema_id(new_schema_id) + .build() + .map_err(|e| { + ErrorModel::builder() + .code(StatusCode::INTERNAL_SERVER_ERROR.into()) + .message("Failed to assign new schema id") + .r#type("FailedToAssignSchemaId") + .stack(Some(vec![e.to_string()])) + .build() + })? + }; + + self.metadata.last_column_id = new_last_column_id; + if !schema_found { self.metadata .schemas .insert(schema.schema_id(), schema.clone().into()); - self.changes.push(TableUpdate::AddSchema { - schema, - last_column_id: Some(new_last_column_id), - }); - self.metadata.last_column_id = new_last_column_id; } + self.changes.push(TableUpdate::AddSchema { + schema, + last_column_id: Some(new_last_column_id), + }); self.last_added_schema_id = Some(new_schema_id); Ok(self) } - fn reuse_or_create_new_schema_id(&self, other: &Schema) -> Result { - let (other_id, other_fields) = (other.schema_id(), other.identifier_field_ids()); - - let err = Self::throw_err( - format!("Schema with id '{other_id}' already exists and is different!"), - "SchemaIdAlreadyExistsAndDifferent", - ); - - let is_schemas_eq = |(id, this_schema): (&i32, &SchemaRef)| -> Option { - other - .as_struct() - .eq(this_schema.as_struct()) - .bitand(other_fields.eq(this_schema.identifier_field_ids())) - .then_some(*id) + fn reuse_or_create_new_schema_id(&self, new_schema: &Schema) -> i32 { + // ToDo: Migrate to schema impl + let same_schema = |(first, second): (&SchemaRef, &Schema)| -> bool { + first.as_struct().eq(second.as_struct()).bitand( + first + .identifier_field_ids() + .eq(second.identifier_field_ids()), + ) }; - if other_id == Self::LAST_ADDED_I32 { - // Search for an identical schema and reuse the ID. - // If no identical schema is found, use the next available ID. - Ok(self - .metadata - .schemas - .iter() - .find_map(is_schemas_eq) - .unwrap_or_else(|| self.get_highest_schema_id() + 1)) - } else { - // If the schema_id() already exists, it must be the same schema. - Ok(self - .metadata - .schemas - .get(&other_id) - .map(AsRef::as_ref) - .map(|exising| exising.eq(other).then_some(other_id).ok_or_else(err)) - .transpose()? - .unwrap_or(other_id)) - } + self.metadata + .schemas + .iter() + .find_map(|(id, schema)| same_schema((schema, new_schema)).then_some(*id)) + .unwrap_or_else(|| self.get_highest_schema_id() + 1) } fn get_highest_schema_id(&self) -> i32 { @@ -298,7 +306,7 @@ impl TableMetadataAggregate { Err(ErrorModel::builder() .code(StatusCode::CONFLICT.into()) .message("Cannot set last added schema: no schema has been added.") - .r#type("CannotSetCurrentSchema") + .r#type("CurrentSchemaNotAdded") .build()) }; } @@ -311,24 +319,28 @@ impl TableMetadataAggregate { return Err(ErrorModel::builder() .code(StatusCode::CONFLICT.into()) .message(format!( - "Cannot set current schema to schema with unknown Id: '{schema_id}'!" + "Cannot set current schema to schema with unknown Id: '{schema_id}'" )) - .r#type("CannotSetCurrentSchema") + .r#type("CurrentSchemaNotFound") .build()); }; + // rebuild all the partition specs and sort orders for the new current schema self.metadata.partition_specs = self .metadata .partition_specs .values() - .cloned() + // ToDo: Check new() impl + // ToDo: Check Snapshot impl .map(|spec| { - PartitionSpecBinder::new(schema.clone(), spec.spec_id).update_spec_schema(&spec) + PartitionSpecBinder::new(schema.clone(), spec.spec_id) + .update_spec_schema((**spec).clone()) }) .collect::>>()? .into_iter() .map(|spec| (spec.spec_id, spec.into())) .collect::>(); + self.metadata.sort_orders = self .metadata .sort_orders @@ -338,12 +350,12 @@ impl TableMetadataAggregate { SortOrder::builder() .with_order_id(sort_order.order_id) .with_fields(sort_order.fields.clone()) - .build(schema.as_ref().clone()) + .build_unbound() .map_err(|e| { ErrorModel::builder() - .message("Failed to bound 'SortOrder'!") + .message(e.message()) .code(StatusCode::CONFLICT.into()) - .r#type("CannotSetCurrentSchema") + .r#type(e.kind().into_static()) .stack(Some(vec![e.to_string()])) .build() }) @@ -372,14 +384,37 @@ impl TableMetadataAggregate { /// # Errors /// None yet. Fails during build if `spec` cannot be bound. pub fn add_partition_spec(&mut self, unbound_spec: UnboundPartitionSpec) -> Result<&mut Self> { + if self.metadata.current_schema_id == Self::LAST_ADDED_I32 { + return Err(ErrorModel::builder() + .code(StatusCode::CONFLICT.into()) + .message("Cannot add partition spec before current schema has been set.") + .r#type("AddPartitionSpecBeforeSchema") + .build()); + } let mut spec = PartitionSpecBinder::new( self.get_current_schema()?.clone(), unbound_spec.spec_id.unwrap_or_default(), ) - .bind_spec_schema(unbound_spec.clone())?; + .bind_spec(unbound_spec.clone())?; // No spec_id specified, we need to reuse or create a new one. - spec.spec_id = self.reuse_or_create_new_spec_id(&spec)?; + let new_spec_id = self.reuse_or_create_new_spec_id(&spec); + + if self.metadata.partition_specs.contains_key(&new_spec_id) { + // update lastAddedSpecId if the spec was added in this set of changes (since it is now the + // last) + let is_new_spec = self.last_added_spec_id + .map_or(false, |id| { + self.changes + .iter() + // spec.spec_id should always be some, because we set the final id in this function before adding the change. + .any(|update| matches!(update, TableUpdate::AddSpec { spec, .. } if spec.spec_id == Some(id))) + }); + + self.last_added_spec_id = is_new_spec.then_some(new_spec_id); + + return Ok(self); + } if self.metadata.format_version <= FormatVersion::V1 && !Self::has_sequential_ids(&spec.fields) @@ -391,54 +426,63 @@ impl TableMetadataAggregate { .build()); } + // We already checked compatibility in the binder. + spec.spec_id = new_spec_id; + let mut unbound_spec = unbound_spec; + unbound_spec.spec_id = Some(new_spec_id); + self.last_added_spec_id = Some(spec.spec_id); + self.metadata.last_partition_id = max( + self.metadata.last_partition_id, + // ToDo: Move to last_assigned_field_id impl in PartitionSpec + spec.fields + .iter() + .map(|field| field.field_id) + .max() + .unwrap_or(PartitionSpecBinder::UNPARTITIONED_LAST_ASSIGNED_ID), + ); - if let HashMapEntry::Vacant(e) = self.metadata.partition_specs.entry(spec.spec_id) { - self.changes - .push(TableUpdate::AddSpec { spec: unbound_spec }); - self.metadata.last_partition_id = max( - self.metadata.last_partition_id, - spec.fields.iter().last().map_or(0, |field| field.field_id), - ); - e.insert(spec.into()); - } + self.metadata + .partition_specs + .insert(spec.spec_id, spec.clone().into()); + self.changes + .push(TableUpdate::AddSpec { spec: unbound_spec }); Ok(self) } /// If the spec already exists, use the same ID. Otherwise, use 1 more than the highest ID. - fn reuse_or_create_new_spec_id(&self, other: &PartitionSpec) -> Result { - let (other_id, other_fields) = (other.spec_id, &other.fields); + fn reuse_or_create_new_spec_id(&self, new_spec: &PartitionSpec) -> i32 { + // ToDo: Migrate to PartitionSpec impl + /// Returns true if this spec is equivalent to the other, with partition field ids ignored. That + /// is, if both specs have the same number of fields, field order, field name, source columns, and + /// transforms. + fn compatible_with(this: &PartitionSpec, other: &PartitionSpec) -> bool { + if this.eq(other) { + return true; + } - let err = Self::throw_err( - format!("PartitionSpec with id '{other_id}' already exists and it's different!",), - "PartitionSpecIdAlreadyExistsAndDifferent", - ); + if this.fields.len() != other.fields.len() { + return false; + } - let is_specs_eq = |(id, this_partition): (&i32, &PartitionSpecRef)| -> Option { - this_partition.fields.eq(other_fields).then_some(*id) - }; + for (this_field, other_field) in this.fields.iter().zip(other.fields.iter()) { + if this_field.source_id != other_field.source_id + || this_field.transform.to_string() != other_field.transform.to_string() + || this_field.name != other_field.name + { + return false; + } + } - if other_id == Self::LAST_ADDED_I32 { - // Search for identical `spec_id` and reuse the ID if possible. - // Otherwise, use the next available ID. - Ok(self - .metadata - .partition_specs - .iter() - .find_map(is_specs_eq) - .unwrap_or_else(|| self.highest_spec_id() + 1)) - } else { - // If the `spec_id` already exists, it must be the same spec. - Ok(self - .metadata - .partition_specs - .get(&other_id) - .map(AsRef::as_ref) - .map(|exising| exising.eq(other).then_some(other_id).ok_or_else(err)) - .transpose()? - .unwrap_or(other_id)) + true } + + self.metadata + .partition_specs + .iter() + .find_map(|(id, old_spec)| compatible_with(old_spec, new_spec).then_some(*id)) + .unwrap_or_else(|| self.highest_spec_id() + 1) } fn highest_spec_id(&self) -> i32 { @@ -447,7 +491,7 @@ impl TableMetadataAggregate { .partition_specs .keys() .max() - .unwrap_or(&Self::INITIAL_SPEC_ID) + .unwrap_or(&(DEFAULT_SPEC_ID - 1)) } /// Set the default partition spec. @@ -489,6 +533,30 @@ impl TableMetadataAggregate { /// # Errors /// - Sort Order ID to add already exists. pub fn add_sort_order(&mut self, sort_order: SortOrder) -> Result<&mut Self> { + let new_order_id = self.reuse_or_create_new_sort_id(&sort_order); + if self.metadata.sort_orders.contains_key(&new_order_id) { + // update lastAddedOrderId if the order was added in this set of changes (since it is now + // the last) + let is_new_order = self.last_added_order_id + .map_or(false, |id| { + self.changes + .iter() + .any(|update| matches!(update, TableUpdate::AddSortOrder { sort_order: order, .. } if order.order_id == id)) + }); + + self.last_added_order_id = is_new_order.then_some(new_order_id); + + return Ok(self); + } + + if self.metadata.current_schema_id == Self::LAST_ADDED_I32 { + return Err(ErrorModel::builder() + .code(StatusCode::CONFLICT.into()) + .message("Cannot add sort order before current schema has been set.") + .r#type("AddSortOrderBeforeSchema") + .build()); + } + let schema = self.get_current_schema()?.clone().as_ref().clone(); let mut sort_order = SortOrder::builder() .with_order_id(sort_order.order_id) @@ -496,62 +564,38 @@ impl TableMetadataAggregate { .build(schema) .map_err(|e| { ErrorModel::builder() - .message("Failed to bound 'SortOrder'!") + .message("Failed to bind 'SortOrder'") .code(StatusCode::CONFLICT.into()) - .r#type("FailedToBuildSortOrder") + .r#type("FailedToBindSortOrder") .stack(Some(vec![e.to_string()])) .build() })?; - sort_order.order_id = if sort_order.is_unsorted() { - 0 - } else { - self.reuse_or_create_new_sort_id(&sort_order)? - }; - self.last_added_order_id = Some(sort_order.order_id); - if let HashMapEntry::Vacant(e) = self.metadata.sort_orders.entry(sort_order.order_id) { - self.changes.push(TableUpdate::AddSortOrder { - sort_order: sort_order.clone(), - }); + sort_order.order_id = self.reuse_or_create_new_sort_id(&sort_order); - e.insert(sort_order.into()); - } + self.last_added_order_id = Some(sort_order.order_id); + self.metadata + .sort_orders + .insert(sort_order.order_id, sort_order.clone().into()); + self.changes.push(TableUpdate::AddSortOrder { sort_order }); Ok(self) } - fn reuse_or_create_new_sort_id(&self, other: &SortOrder) -> Result { - let (other_id, other_fields) = (other.order_id, &other.fields); - - let err = Self::throw_err( - format!("Sort Order with id '{other_id}' already exists and is different!",), - "SortOrderIdAlreadyExistsAndDifferent", - ); + fn reuse_or_create_new_sort_id(&self, new_sort_order: &SortOrder) -> i64 { + if new_sort_order.is_unsorted() { + return SortOrder::unsorted_order().order_id; + } - let is_order_eq = |(id, this_order): (&i64, &SortOrderRef)| -> Option { - this_order.fields.eq(other_fields).then_some(*id) + let same_order = |(first, second): (&SortOrderRef, &SortOrder)| -> bool { + first.fields.eq(&second.fields) }; - if other_id == Self::LAST_ADDED_I64 { - // Search for identical order_id and reuse the ID if possible. - // Otherwise, use the next available ID. - Ok(self - .metadata - .sort_orders - .iter() - .find_map(is_order_eq) - .unwrap_or_else(|| self.highest_sort_id() + 1)) - } else { - // If the order_id already exists, it must be the same sort order. - Ok(self - .metadata - .sort_orders - .get(&other_id) - .map(AsRef::as_ref) - .map(|exising| exising.eq(other).then_some(other_id).ok_or_else(err)) - .transpose()? - .unwrap_or(other_id)) - } + self.metadata + .sort_orders + .iter() + .find_map(|(id, sort_order)| same_order((sort_order, new_sort_order)).then_some(*id)) + .unwrap_or_else(|| self.highest_sort_id() + 1) } fn highest_sort_id(&self) -> i64 { @@ -560,7 +604,7 @@ impl TableMetadataAggregate { .sort_orders .keys() .max() - .unwrap_or(&Self::INITIAL_SORT_ORDER_ID) + .unwrap_or(&DEFAULT_SORT_ORDER_ID) } /// Set the default sort order. @@ -728,7 +772,7 @@ impl TableMetadataAggregate { self.metadata.last_updated_ms = snapshot.timestamp().timestamp_millis(); - if ref_name == Self::MAIN_BRANCH { + if ref_name == MAIN_BRANCH { self.metadata.current_snapshot_id = Some(snapshot.snapshot_id()); self.metadata.last_updated_ms = if self.metadata.last_updated_ms == 0 { chrono::Utc::now().timestamp_millis() @@ -756,7 +800,7 @@ impl TableMetadataAggregate { /// # Errors /// None yet. pub fn remove_snapshot_by_ref(&mut self, snapshot_ref: &str) -> Result<&mut Self> { - if snapshot_ref == Self::MAIN_BRANCH { + if snapshot_ref == MAIN_BRANCH { self.metadata.current_snapshot_id = Some(i64::from(Self::LAST_ADDED_I32)); self.metadata.snapshot_log.clear(); } @@ -777,19 +821,7 @@ impl TableMetadataAggregate { /// - Default sort order is set to -1 but no sort order has been added. /// - Default partition spec is set to -1 but no partition spec has been added. /// - Ref is set to an unknown snapshot. - pub fn build(self) -> Result { - if self.changes.is_empty() { - return Ok(self.metadata); - } - - if self.metadata.last_column_id < 0 { - return Err(ErrorModel::builder() - .message("Cannot create a table without last_column_id") - .code(StatusCode::CONFLICT.into()) - .r#type("LastColumnIdMissing") - .build()); - } - + pub fn build(mut self) -> Result { if self.metadata.current_schema_id < 0 { return Err(ErrorModel::builder() .message("Cannot create a table without current_schema_id") @@ -798,20 +830,36 @@ impl TableMetadataAggregate { .build()); } - if self.metadata.default_spec_id < 0 { + if self.metadata.last_column_id < 0 { return Err(ErrorModel::builder() - .message("Cannot create a table without default_spec_id") + .message("Cannot create a table without last_column_id") .code(StatusCode::CONFLICT.into()) - .r#type("DefaultSpecIdMissing") + .r#type("LastColumnIdMissing") .build()); } - if self.metadata.default_sort_order_id < 0 { - return Err(ErrorModel::builder() - .message("Cannot create a table without default_sort_order_id") - .code(StatusCode::CONFLICT.into()) - .r#type("DefaultSortOrderIdMissing") - .build()); + // It hasn't been changed at all + if self.metadata.default_spec_id == DEFAULT_SPEC_ID - 1 { + self.metadata.default_spec_id = DEFAULT_SPEC_ID; + self.metadata + .partition_specs + .entry(DEFAULT_SPEC_ID) + .or_insert_with(|| { + PartitionSpec { + spec_id: DEFAULT_SPEC_ID, + fields: vec![], + } + .into() + }); + } + + if self.metadata.default_sort_order_id == (DEFAULT_SORT_ORDER_ID - 1) { + let unsorted = SortOrder::unsorted_order(); + self.metadata.default_sort_order_id = DEFAULT_SORT_ORDER_ID; + self.metadata + .sort_orders + .entry(DEFAULT_SORT_ORDER_ID) + .or_insert_with(|| unsorted.into()); } Ok(self.metadata) @@ -835,7 +883,7 @@ impl TableMetadataAggregate { #[allow(clippy::cast_sign_loss)] fn has_sequential_ids(fields: &[PartitionField]) -> bool { for (index, field) in fields.iter().enumerate() { - if (field.field_id as usize).ne(&(Self::PARTITION_DATA_ID_START + index)) { + if (field.field_id as usize).ne(&(Self::PARTITION_DATA_ID_START as usize + index)) { return false; } } @@ -860,23 +908,61 @@ impl TableMetadataAggregate { #[cfg(test)] mod test { - use super::*; - use iceberg::spec::Type::Primitive; - use iceberg::spec::{NestedField, PrimitiveType}; + use std::sync::Arc; - fn get_mock_schema(from_id: i32) -> (Schema, i32) { - let schema_fields = vec![ - NestedField::required(1, "id", Primitive(PrimitiveType::Uuid)).into(), - NestedField::required(2, "data", Primitive(PrimitiveType::Date)).into(), - NestedField::required(3, "category", Primitive(PrimitiveType::String)).into(), - ]; - - let schema = Schema::builder() - .with_schema_id(from_id) - .with_fields(schema_fields) + use super::*; + use iceberg::spec::Type::{self, Primitive}; + use iceberg::spec::{ + NestedField, NullOrder, PrimitiveType, SortDirection, SortField, Transform, + UnboundPartitionField, + }; + + lazy_static::lazy_static! { + static ref SCHEMA: Schema = Schema::builder() + .with_schema_id(1) + .with_fields(vec![Arc::new(NestedField::required( + 1, + "id", + Type::Primitive(PrimitiveType::Int), + ))]) .build() - .expect("Cannot create schema mock!"); - (schema, from_id) + .unwrap(); + + static ref PARTITION_SPEC: PartitionSpec = PartitionSpec::builder() + .with_spec_id(1) + .with_partition_field(PartitionField { + name: "id".to_string(), + transform: Transform::Identity, + source_id: 1, + field_id: 1000, + }) + .build() + .unwrap(); + + static ref TABLE_METADATA: TableMetadata = TableMetadata { + format_version: FormatVersion::V2, + table_uuid: Uuid::parse_str("fb072c92-a02b-11e9-ae9c-1bb7bc9eca94").unwrap(), + location: "s3://b/wh/data.db/table".to_string(), + last_updated_ms: chrono::Utc::now().timestamp_millis(), + last_column_id: 1, + schemas: HashMap::from_iter(vec![(1, Arc::new(SCHEMA.clone()))]), + current_schema_id: 1, + partition_specs: HashMap::from_iter(vec![(1, PARTITION_SPEC.clone().into())]), + default_spec_id: 0, + last_partition_id: 1000, + default_sort_order_id: 0, + sort_orders: HashMap::from_iter(vec![]), + snapshots: HashMap::default(), + current_snapshot_id: None, + last_sequence_number: 1, + properties: HashMap::from_iter(vec![( + "commit.retry.num-retries".to_string(), + "1".to_string(), + )]), + snapshot_log: Vec::new(), + metadata_log: vec![], + refs: HashMap::new(), + }; } fn get_allowed_props() -> HashMap { @@ -885,108 +971,128 @@ mod test { .collect() } - fn get_unbounded_spec() -> (UnboundPartitionSpec, i32) { - let id = 1; - ( - UnboundPartitionSpec::builder() - .with_spec_id(id) - .with_fields(vec![]) - .build() - .expect("Cannot build partition spec."), - id, - ) - } - - fn get_sort_order(schema: Schema) -> (SortOrder, i64) { - let sort_id = 0; - ( - SortOrder::builder() - .with_order_id(sort_id) - .with_fields(vec![]) - .build(schema) - .expect("Cannot build sort order."), - sort_id, - ) - } - - fn get_empty_aggregate() -> TableMetadataAggregate { - TableMetadataAggregate::new(ToOwned::to_owned("location")) + #[test] + fn default_order_id_is_unsorted() { + let unsorted = SortOrder::unsorted_order(); + assert_eq!(unsorted.order_id, DEFAULT_SORT_ORDER_ID); } - fn get_full_aggregate() -> TableMetadataAggregate { - let (schema, schema_id) = get_mock_schema(1); - let (spec, spec_id) = get_unbounded_spec(); - let (sort, sort_id) = get_sort_order(schema.clone()); - - let mut aggregate = TableMetadataAggregate::new(ToOwned::to_owned("location")); + #[test] + fn get_full_aggregate() { + let new_schema = Schema::builder() + .with_schema_id(2) + .with_fields(vec![Arc::new(NestedField::required( + 2, + "name", + Type::Primitive(PrimitiveType::String), + ))]) + .build() + .unwrap(); + + let new_spec = UnboundPartitionSpec::builder() + .with_spec_id(2) + .with_fields(vec![UnboundPartitionField { + name: "name".to_string(), + transform: Transform::Identity, + source_id: 2, + partition_id: None, + }]) + .build() + .unwrap(); + + let new_sort_order = SortOrder::builder() + .with_order_id(1) + .with_fields(vec![SortField { + source_id: 2, + transform: Transform::Identity, + direction: SortDirection::Ascending, + null_order: NullOrder::First, + }]) + .build_unbound() + .unwrap(); + + let mut aggregate = TableMetadataAggregate::new_from_metadata(TABLE_METADATA.clone()); + + let new_uuid = Uuid::now_v7(); aggregate - .assign_uuid(Uuid::now_v7()) + .assign_uuid(new_uuid) .expect("Cannot assign uuid.") - .add_schema(schema, None) + .add_schema(new_schema, None) .expect("Cannot set schema.") - .set_current_schema(schema_id) + .set_current_schema(-1) .expect("Cannot set current schema.") .upgrade_format_version(FormatVersion::V2) .expect("Cannot set format version.") - .add_partition_spec(spec) + .add_partition_spec(new_spec) .expect("Cannot set partition spec.") - .set_default_partition_spec(spec_id) + .set_default_partition_spec(-1) .expect("Cannot set default partition spec.") - .add_sort_order(sort) + .add_sort_order(new_sort_order) .expect("Cannot set sort order.") - .set_default_sort_order(sort_id) + .set_default_sort_order(-1) .expect("Cannot set default sort order.") .set_location("location".to_owned()) .expect("Cannot set location") .set_properties(get_allowed_props()) .expect("Cannot set properties"); - aggregate + let metadata = aggregate.build().expect("Cannot build metadata."); + assert_eq!(metadata.format_version, FormatVersion::V2); + assert_eq!(metadata.table_uuid, new_uuid); + assert_eq!(metadata.location, "location"); + assert_eq!(metadata.schemas.len(), 2); + assert_eq!(metadata.partition_specs.len(), 2); + assert_eq!(metadata.sort_orders.len(), 1); } #[test] fn downgrade_version() { - let mut aggregate = get_full_aggregate(); + let mut aggregate = TableMetadataAggregate::new_from_metadata(TABLE_METADATA.clone()); assert!(aggregate.upgrade_format_version(FormatVersion::V1).is_err()); } #[test] fn same_version() { - let mut aggregate = get_empty_aggregate(); + let mut aggregate = TableMetadataAggregate::new_from_metadata(TABLE_METADATA.clone()); assert!(aggregate.upgrade_format_version(FormatVersion::V2).is_ok()); } #[test] fn add_schema() { - let (schema, _) = get_mock_schema(1); - let mut aggregate = get_empty_aggregate(); - assert!(aggregate.add_schema(schema, None).is_ok()); + let mut aggregate = TableMetadataAggregate::new_from_metadata(TABLE_METADATA.clone()); + let new_schema = Schema::builder() + .with_schema_id(2) + .with_fields(vec![Arc::new(NestedField::required( + 2, + "name", + Type::Primitive(PrimitiveType::String), + ))]) + .build() + .unwrap(); + assert!(aggregate.add_schema(new_schema, None).is_ok()); + let metadata = aggregate.build().expect("Cannot build metadata."); + assert_eq!(metadata.schemas.len(), 2); + assert!(metadata.schemas.contains_key(&2)); + assert_eq!(metadata.current_schema_id, 1); } #[test] fn set_default_schema() { - let (schema, schema_id) = get_mock_schema(1); - let mut aggregate = get_empty_aggregate(); - assert!(aggregate - .set_current_schema(schema_id) - .inspect_err(|e| { - dbg!(e); - }) - .is_err()); - assert!(aggregate.add_schema(schema, None).is_ok()); - assert!(aggregate.set_current_schema(schema_id).is_ok()); - } - - #[test] - fn get_current_schema() { - let (schema, schema_id) = get_mock_schema(1); - let mut aggregate = get_empty_aggregate(); - assert!(aggregate.set_current_schema(schema_id).is_err()); - assert!(aggregate.add_schema(schema, None).is_ok()); - assert!(aggregate.set_current_schema(schema_id).is_ok()); - assert!(aggregate - .get_current_schema() - .is_ok_and(|schema| schema.schema_id().eq(&schema_id))); + let mut aggregate = TableMetadataAggregate::new_from_metadata(TABLE_METADATA.clone()); + let new_schema = Schema::builder() + .with_schema_id(2) + .with_fields(vec![Arc::new(NestedField::required( + 2, + "name", + Type::Primitive(PrimitiveType::String), + ))]) + .build() + .unwrap(); + assert!(aggregate.add_schema(new_schema.clone(), None).is_ok()); + assert!(aggregate.set_current_schema(-1).is_ok()); + let metadata = aggregate.build().expect("Cannot build metadata."); + assert_eq!(metadata.current_schema_id, 2); + assert_eq!(metadata.current_schema(), &Arc::new(new_schema)); } #[test] @@ -998,103 +1104,199 @@ mod test { .collect(); let allowed_props = get_allowed_props(); - let mut aggregate = get_empty_aggregate(); + let mut aggregate = TableMetadataAggregate::new_from_metadata(TABLE_METADATA.clone()); assert!(aggregate.set_properties(forbidden_props).is_err()); assert!(aggregate.set_properties(allowed_props).is_ok()); } #[test] - fn can_create_new_schema_id_for_uniq_schema() { - let mut aggregate = get_empty_aggregate(); - - for schema_id in 1..=10 { - let (schema, schema_id) = get_mock_schema(schema_id); - aggregate.add_schema(schema, None).unwrap_or_else(|_| panic!("Cannot add {schema_id} new schema.")); - assert!(aggregate.metadata.schemas.contains_key(&schema_id)); - } - } - - #[test] - fn cannot_add_schema_with_same_id_but_with_different_fields() { - let mut aggregate = get_empty_aggregate(); + fn cannot_add_schema_with_column_id_too_low() { + let mut aggregate = TableMetadataAggregate::new_from_metadata(TABLE_METADATA.clone()); - let schema_fields1 = vec![ + let schema_fields_1 = vec![ NestedField::required(1, "id", Primitive(PrimitiveType::Uuid)).into(), NestedField::required(2, "data", Primitive(PrimitiveType::Date)).into(), NestedField::required(3, "category", Primitive(PrimitiveType::String)).into(), ]; - let schema_fields2 = vec![ + let schema_fields_2 = vec![ NestedField::required(1, "id", Primitive(PrimitiveType::Uuid)).into(), NestedField::required(2, "data", Primitive(PrimitiveType::Date)).into(), ]; - let schema_id1 = 1; - let schema1 = Schema::builder() - .with_schema_id(schema_id1) - .with_fields(schema_fields1) + let schema_1 = Schema::builder() + .with_schema_id(2) + .with_fields(schema_fields_1) .build() .expect("Cannot create schema mock!"); - let schema2 = Schema::builder() - .with_schema_id(schema_id1) - .with_fields(schema_fields2) + let schema_2 = Schema::builder() + .with_schema_id(3) + .with_fields(schema_fields_2) .build() .expect("Cannot create schema mock!"); - aggregate.add_schema(schema1, None).expect("Cannot add new schema."); - assert!(dbg!(aggregate.add_schema(schema2, None)).is_err()); + aggregate + .add_schema(schema_1, None) + .expect("Cannot add new schema."); + assert!(dbg!(aggregate.add_schema(schema_2, None)).is_err()); } #[test] fn reuse_schema_id_if_schemas_is_eq() { - let mut aggregate = get_empty_aggregate(); - - let schema_id = 1; - let (schema, schema_id) = get_mock_schema(schema_id); - - aggregate.add_schema(schema, None).expect("Cannot add new schema."); - assert!(aggregate.metadata.schemas.contains_key(&schema_id)); - - let (schema, schema_id) = get_mock_schema(schema_id); - - aggregate.add_schema(schema, None).expect("Cannot add new schema."); - assert!(aggregate.metadata.schemas.contains_key(&schema_id)); + let mut aggregate = TableMetadataAggregate::new_from_metadata(TABLE_METADATA.clone()); + aggregate + .add_schema(SCHEMA.clone(), None) + .expect("Cannot add new schema."); assert!(aggregate.metadata.schemas.len().eq(&1)); } - // This test will fail because of changes in this commit: - // https://github.com/hansetag/iceberg-rest-server/pull/36/commits/6faeb8d360f5c9779a4cd0d4ac237da1c6ec97ca - // In `table_metadata.rs` file on line 248. - // Because of this changes it do not assign new schema id to schema after `reuse_or_create_new_schema_id` call. #[test] - #[ignore] - fn should_take_highest_schema_id_plus_one_is() { - let mut aggregate = get_empty_aggregate(); - let highest = 5; - - for schema_id in 1..=highest { - let (schema, schema_id) = get_mock_schema(schema_id); - aggregate.add_schema(schema, None).unwrap_or_else(|_| panic!("Cannot add {schema_id} new schema.")); - assert!(aggregate.metadata.schemas.contains_key(&schema_id)); - } + fn should_take_highest_schema_id_plus_one() { + let mut aggregate = TableMetadataAggregate::new_from_metadata(TABLE_METADATA.clone()); + + let fields_1 = vec![ + NestedField::required(1, "id", Primitive(PrimitiveType::Uuid)).into(), + NestedField::required(2, "data", Primitive(PrimitiveType::Date)).into(), + NestedField::required(3, "email", Primitive(PrimitiveType::String)).into(), + ]; + let schema_1 = Schema::builder() + .with_schema_id(0) + .with_fields(fields_1) + .build() + .expect("Cannot create schema mock!"); - let fields = vec![ + let fields_2 = vec![ NestedField::required(1, "id", Primitive(PrimitiveType::Uuid)).into(), NestedField::required(2, "data", Primitive(PrimitiveType::Date)).into(), NestedField::required(3, "email", Primitive(PrimitiveType::String)).into(), - NestedField::required(4, "name", Primitive(PrimitiveType::String)).into() + NestedField::required(4, "name", Primitive(PrimitiveType::String)).into(), ]; - let schema_id = -1; - let schema = Schema::builder() - .with_schema_id(schema_id) - .with_fields(fields) + let schema_2 = Schema::builder() + .with_schema_id(1) + .with_fields(fields_2) .build() .expect("Cannot create schema mock!"); - aggregate.add_schema(schema, None).unwrap_or_else(|e| panic!("Cannot add new schema: {e:?}.")); - assert!(dbg!(aggregate.metadata.schemas).contains_key(&(highest + 1))); + + aggregate + .add_schema(schema_1, None) + .unwrap() + .set_current_schema(-1) + .unwrap(); + let schema_id = aggregate.metadata.current_schema_id; + aggregate.add_schema(schema_2, None).unwrap(); + + assert!(aggregate.metadata.schemas.contains_key(&(schema_id + 1))); + assert!(aggregate.metadata.last_column_id.eq(&4)); + } + + #[test] + fn partition_spec_with_transform() { + let mut aggregate = TableMetadataAggregate::new_from_metadata(TABLE_METADATA.clone()); + let new_schema = Schema::builder() + .with_schema_id(2) + .with_fields(vec![Arc::new(NestedField::required( + 2, + "ints", + Type::Primitive(PrimitiveType::Int), + ))]) + .build() + .unwrap(); + let partition_spec = UnboundPartitionSpec::builder() + .with_spec_id(2) + .with_fields(vec![UnboundPartitionField::builder() + .source_id(2) + .name("ints_bucket".to_string()) + .transform(iceberg::spec::Transform::Bucket(16)) + .build()]) + .build() + .unwrap(); + aggregate + .add_schema(new_schema.clone(), None) + .unwrap() + .set_current_schema(-1) + .unwrap() + .add_partition_spec(partition_spec) + .unwrap() + .set_default_partition_spec(-1) + .unwrap(); + let metadata = aggregate.build().unwrap(); + assert_eq!( + metadata.partition_specs[&2].fields[0].transform, + iceberg::spec::Transform::Bucket(16) + ); + assert_eq!(metadata.partition_specs[&2].fields[0].name, "ints_bucket"); + assert_eq!(metadata.partition_specs[&2].fields[0].source_id, 2); + assert_eq!(metadata.default_spec_id, 2); + } + + #[test] + fn default_sort_order_and_partitioning() { + let aggregate = TableMetadataAggregate::new("foo".to_string(), SCHEMA.clone()); + let metadata = aggregate.build().unwrap(); + assert_eq!(metadata.default_spec_id, 0); + assert!(metadata + .default_partition_spec() + .unwrap() + .fields + .len() + .eq(&0)); + assert_eq!(metadata.default_sort_order_id, 0); + assert!(metadata.default_sort_order().unwrap().fields.len().eq(&0),); } + #[test] + fn test_first_partition_gets_id_0() { + let mut aggregate = TableMetadataAggregate::new("foo".to_string(), SCHEMA.clone()); + let partition_spec = UnboundPartitionSpec { + spec_id: None, + fields: vec![], + }; + assert!(aggregate.add_partition_spec(partition_spec.clone()).is_ok()); + let metadata = aggregate.build().unwrap(); + + assert_eq!(metadata.partition_specs[&0].fields.len(), 0); + assert_eq!(metadata.default_spec_id, 0); + } + + #[test] + fn test_add_unsort_order() { + let mut aggregate = TableMetadataAggregate::new("foo".to_string(), SCHEMA.clone()); + let sort_order = SortOrder::builder() + .with_order_id(0) + .with_fields(vec![]) + .build_unbound() + .unwrap(); + assert!(aggregate.add_sort_order(sort_order).is_ok()); + let metadata = aggregate.build().unwrap(); + + assert_eq!(metadata.sort_orders[&0].fields.len(), 0); + assert_eq!(metadata.default_sort_order_id, 0); + } + + #[test] + fn add_sort_order() { + let mut aggregate = TableMetadataAggregate::new("foo".to_string(), SCHEMA.clone()); + let sort_order = SortOrder::builder() + .with_order_id(1) + .with_fields(vec![SortField { + source_id: 1, + transform: Transform::Identity, + direction: SortDirection::Ascending, + null_order: NullOrder::First, + }]) + .build_unbound() + .unwrap(); + aggregate + .add_sort_order(sort_order) + .unwrap() + .set_default_sort_order(-1) + .unwrap(); + let metadata = aggregate.build().unwrap(); + + assert_eq!(metadata.sort_orders[&1].fields.len(), 1); + assert_eq!(metadata.default_sort_order_id, 1); + } } diff --git a/crates/iceberg-rest-server/src/implementations/postgres/table.rs b/crates/iceberg-rest-server/src/implementations/postgres/table.rs index 5d5dfc22..4deb1725 100644 --- a/crates/iceberg-rest-server/src/implementations/postgres/table.rs +++ b/crates/iceberg-rest-server/src/implementations/postgres/table.rs @@ -13,7 +13,6 @@ use iceberg_ext::{ NamespaceIdent, TableRequirement, TableUpdate, }; -use iceberg::spec::{SortOrder, UnboundPartitionSpec}; use iceberg_rest_service::{ v1::TableIdent, CommitTableResponse, CommitTransactionRequest, CreateTableRequest, ErrorModel, Result, TableRequirementExt as _, TableUpdateExt, @@ -198,22 +197,14 @@ pub(crate) async fn create_table( .build() })?; - let mut builder = TableMetadataAggregate::new(location.clone()); - builder.add_schema(schema, None)?; - builder.set_current_schema(-1)?; + let mut builder = TableMetadataAggregate::new(location.clone(), schema); if let Some(partition_spec) = partition_spec { builder.add_partition_spec(partition_spec)?; builder.set_default_partition_spec(-1)?; - } else { - builder.add_partition_spec(UnboundPartitionSpec::default())?; - builder.set_default_partition_spec(-1)?; } if let Some(write_order) = write_order { builder.add_sort_order(write_order)?; builder.set_default_sort_order(-1)?; - } else { - builder.add_sort_order(SortOrder::default())?; - builder.set_default_sort_order(-1)?; } builder.set_properties(properties.unwrap_or_default())?; builder.assign_uuid(table_id.as_uuid().to_owned())?; diff --git a/tests/python/tests/test_spark.py b/tests/python/tests/test_spark.py index ea8586cb..7aecac77 100644 --- a/tests/python/tests/test_spark.py +++ b/tests/python/tests/test_spark.py @@ -4,16 +4,18 @@ def test_create_namespace(spark, warehouse: conftest.Warehouse): - spark.sql("CREATE NAMESPACE test_create_namespace") - assert ("test_create_namespace",) in warehouse.pyiceberg_catalog.list_namespaces() + spark.sql("CREATE NAMESPACE test_create_namespace_spark") + assert ( + "test_create_namespace_spark", + ) in warehouse.pyiceberg_catalog.list_namespaces() def test_list_namespaces(spark, warehouse: conftest.Warehouse): - spark.sql("CREATE NAMESPACE test_list_namespaces_1") - spark.sql("CREATE NAMESPACE test_list_namespaces_2") + spark.sql("CREATE NAMESPACE test_list_namespaces_spark_1") + spark.sql("CREATE NAMESPACE test_list_namespaces_spark_2") pdf = spark.sql("SHOW NAMESPACES").toPandas() - assert "test_list_namespaces_1" in pdf["namespace"].values - assert "test_list_namespaces_2" in pdf["namespace"].values + assert "test_list_namespaces_spark_1" in pdf["namespace"].values + assert "test_list_namespaces_spark_2" in pdf["namespace"].values def test_namespace_create_if_not_exists(spark, warehouse: conftest.Warehouse): @@ -173,15 +175,15 @@ def test_change_partitioning(spark, namespace): # ToDo: Fix {"error_id":"018fcac4-44a3-7333-b3c2-c0a4e0127339","message":"Field 'my_ints_bucket' not found in schema.","type":"FailedToBuildPartitionSpec","code":409,"stack":null} -# def test_partition_bucket(spark, namespace): -# spark.sql( -# f"CREATE TABLE {namespace.spark_name}.my_table (my_ints INT, my_floats DOUBLE, strings STRING) USING iceberg PARTITIONED BY (bucket(16, my_ints))" -# ) -# spark.sql( -# f"INSERT INTO {namespace.spark_name}.my_table VALUES (1, 1.2, 'foo'), (2, 2.2, 'bar')" -# ) -# pdf = spark.sql(f"SELECT * FROM {namespace.spark_name}.my_table").toPandas() -# assert len(pdf) == 2 +def test_partition_bucket(spark, namespace): + spark.sql( + f"CREATE TABLE {namespace.spark_name}.my_table (my_ints INT, my_floats DOUBLE, strings STRING) USING iceberg PARTITIONED BY (bucket(16, my_ints))" + ) + spark.sql( + f"INSERT INTO {namespace.spark_name}.my_table VALUES (1, 1.2, 'foo'), (2, 2.2, 'bar')" + ) + pdf = spark.sql(f"SELECT * FROM {namespace.spark_name}.my_table").toPandas() + assert len(pdf) == 2 def test_alter_schema(spark, namespace):