diff --git a/CHANGELOG.md b/CHANGELOG.md index 99d5a1c01bb..cc5e9e0debb 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -37,6 +37,8 @@ as necessary. Empty sections will not end in the release notes. store type, but uses way less columns, which reduces storage overhead for example in PostgreSQL a lot. - Introduce new `MONGODB2` version store type, which is has the same functionality as the `MONGODB` version store type, but uses way less attributes, which reduces storage overhead. +- Introduce new `CASSANDRA2` version store type, which is has the same functionality as the `CASSANDRA` version + store type, but uses way less attributes, which reduces storage overhead. ### Changes @@ -48,6 +50,9 @@ as necessary. Empty sections will not end in the release notes. - The current version store type `MONGODB` is deprecated, please migrate to the new `MONGODB2` version store type. Please use the [Nessie Server Admin Tool](https://projectnessie.org/nessie-latest/export_import) to migrate from the `MONGODB` version store type to `MONGODB2`. +- The current version store type `CASSANDRA` is deprecated, please migrate to the new `CASSANDRA2` version store + type. Please use the [Nessie Server Admin Tool](https://projectnessie.org/nessie-latest/export_import) + to migrate from the `CASSANDRA` version store type to `CASSANDRA2`. ### Fixes diff --git a/bom/build.gradle.kts b/bom/build.gradle.kts index f9e3bb32e96..0507dd1f473 100644 --- a/bom/build.gradle.kts +++ b/bom/build.gradle.kts @@ -81,6 +81,8 @@ dependencies { api(project(":nessie-versioned-storage-cache")) api(project(":nessie-versioned-storage-cassandra")) api(project(":nessie-versioned-storage-cassandra-tests")) + api(project(":nessie-versioned-storage-cassandra2")) + api(project(":nessie-versioned-storage-cassandra2-tests")) api(project(":nessie-versioned-storage-common")) api(project(":nessie-versioned-storage-common-proto")) api(project(":nessie-versioned-storage-common-serialize")) diff --git a/gradle/projects.main.properties b/gradle/projects.main.properties index 0cfb5cc0207..0979e1ee07e 100644 --- a/gradle/projects.main.properties +++ b/gradle/projects.main.properties @@ -68,6 +68,8 @@ nessie-versioned-storage-bigtable-tests=versioned/storage/bigtable-tests nessie-versioned-storage-cache=versioned/storage/cache nessie-versioned-storage-cassandra=versioned/storage/cassandra nessie-versioned-storage-cassandra-tests=versioned/storage/cassandra-tests +nessie-versioned-storage-cassandra2=versioned/storage/cassandra2 +nessie-versioned-storage-cassandra2-tests=versioned/storage/cassandra2-tests nessie-versioned-storage-common=versioned/storage/common nessie-versioned-storage-common-proto=versioned/storage/common-proto nessie-versioned-storage-common-serialize=versioned/storage/common-serialize diff --git a/helm/nessie/values.yaml b/helm/nessie/values.yaml index 600dba7f3a7..b28840f4b80 100644 --- a/helm/nessie/values.yaml +++ b/helm/nessie/values.yaml @@ -28,9 +28,10 @@ imagePullSecrets: [] # `quarkus.log.category."io.smallrye.config".level: DEBUG` logLevel: INFO -# -- Which type of version store to use: IN_MEMORY, ROCKSDB, DYNAMODB, MONGODB2, CASSANDRA, JDBC2, BIGTABLE. +# -- Which type of version store to use: IN_MEMORY, ROCKSDB, DYNAMODB, MONGODB2, CASSANDRA2, JDBC2, BIGTABLE. # Note: the version store type JDBC is deprecated, please use the Nessie Server Admin Tool to migrate to JDBC2. # Note: the version store type MONGODB is deprecated, please use the Nessie Server Admin Tool to migrate to MONGODB2. +# Note: the version store type CASSANDRA is deprecated, please use the Nessie Server Admin Tool to migrate to CASSANDRA2. versionStoreType: IN_MEMORY # Cassandra settings. Only required when using CASSANDRA version store type; ignored otherwise. diff --git a/servers/quarkus-common/build.gradle.kts b/servers/quarkus-common/build.gradle.kts index 7d7ed7291a7..5b06b275140 100644 --- a/servers/quarkus-common/build.gradle.kts +++ b/servers/quarkus-common/build.gradle.kts @@ -43,6 +43,7 @@ dependencies { implementation(project(":nessie-versioned-storage-bigtable")) implementation(project(":nessie-versioned-storage-cache")) implementation(project(":nessie-versioned-storage-cassandra")) + implementation(project(":nessie-versioned-storage-cassandra2")) implementation(project(":nessie-versioned-storage-common")) implementation(project(":nessie-versioned-storage-dynamodb")) implementation(project(":nessie-versioned-storage-inmemory")) diff --git a/servers/quarkus-common/src/main/java/org/projectnessie/quarkus/config/QuarkusCassandraConfig.java b/servers/quarkus-common/src/main/java/org/projectnessie/quarkus/config/QuarkusCassandraConfig.java index 71c7affe5f6..29168cbd72a 100644 --- a/servers/quarkus-common/src/main/java/org/projectnessie/quarkus/config/QuarkusCassandraConfig.java +++ b/servers/quarkus-common/src/main/java/org/projectnessie/quarkus/config/QuarkusCassandraConfig.java @@ -19,7 +19,7 @@ import io.smallrye.config.ConfigMapping; import io.smallrye.config.WithDefault; import java.time.Duration; -import org.projectnessie.versioned.storage.cassandra.CassandraConfig; +import org.projectnessie.versioned.storage.cassandra2.Cassandra2Config; /** * When setting {@code nessie.version.store.type=CASSANDRA} which enables Apache Cassandra or @@ -28,7 +28,7 @@ */ @StaticInitSafe @ConfigMapping(prefix = "nessie.version.store.cassandra") -public interface QuarkusCassandraConfig extends CassandraConfig { +public interface QuarkusCassandraConfig extends Cassandra2Config { @Override @WithDefault(DEFAULT_DML_TIMEOUT) Duration dmlTimeout(); diff --git a/servers/quarkus-common/src/main/java/org/projectnessie/quarkus/providers/storage/Cassandra2BackendBuilder.java b/servers/quarkus-common/src/main/java/org/projectnessie/quarkus/providers/storage/Cassandra2BackendBuilder.java new file mode 100644 index 00000000000..3612fd4a210 --- /dev/null +++ b/servers/quarkus-common/src/main/java/org/projectnessie/quarkus/providers/storage/Cassandra2BackendBuilder.java @@ -0,0 +1,60 @@ +/* + * Copyright (C) 2022 Dremio + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.projectnessie.quarkus.providers.storage; + +import static org.projectnessie.quarkus.config.VersionStoreConfig.VersionStoreType.CASSANDRA2; + +import com.datastax.oss.quarkus.runtime.api.session.QuarkusCqlSession; +import jakarta.enterprise.context.Dependent; +import jakarta.inject.Inject; +import java.util.concurrent.CompletionStage; +import java.util.concurrent.ExecutionException; +import org.eclipse.microprofile.config.inject.ConfigProperty; +import org.projectnessie.quarkus.config.QuarkusCassandraConfig; +import org.projectnessie.quarkus.providers.versionstore.StoreType; +import org.projectnessie.versioned.storage.cassandra2.Cassandra2BackendConfig; +import org.projectnessie.versioned.storage.cassandra2.Cassandra2BackendFactory; +import org.projectnessie.versioned.storage.common.persist.Backend; + +@StoreType(CASSANDRA2) +@Dependent +public class Cassandra2BackendBuilder implements BackendBuilder { + + @Inject CompletionStage client; + + @Inject + @ConfigProperty(name = "quarkus.cassandra.keyspace") + String keyspace; + + @Inject QuarkusCassandraConfig config; + + @Override + public Backend buildBackend() { + Cassandra2BackendFactory factory = new Cassandra2BackendFactory(); + try { + Cassandra2BackendConfig c = + Cassandra2BackendConfig.builder() + .client(client.toCompletableFuture().get()) + .keyspace(keyspace) + .ddlTimeout(config.ddlTimeout()) + .dmlTimeout(config.dmlTimeout()) + .build(); + return factory.buildBackend(c); + } catch (InterruptedException | ExecutionException e) { + throw new RuntimeException(e); + } + } +} diff --git a/servers/quarkus-config/src/main/java/org/projectnessie/quarkus/config/VersionStoreConfig.java b/servers/quarkus-config/src/main/java/org/projectnessie/quarkus/config/VersionStoreConfig.java index 860817d90d0..87ef89929db 100644 --- a/servers/quarkus-config/src/main/java/org/projectnessie/quarkus/config/VersionStoreConfig.java +++ b/servers/quarkus-config/src/main/java/org/projectnessie/quarkus/config/VersionStoreConfig.java @@ -33,7 +33,10 @@ enum VersionStoreType { MONGODB, /** MongoDB variant using few attributes, saves storage overhead. */ MONGODB2, + /** Cassandra variant using many distinct columns. */ CASSANDRA, + /** Cassandra variant using few columns, saves storage overhead. */ + CASSANDRA2, /** JDBC variant using many distinct columns. */ JDBC, /** JDBC variant using few columns, saves storage overhead for example in PostgreSQL. */ diff --git a/servers/quarkus-server/src/main/resources/application.properties b/servers/quarkus-server/src/main/resources/application.properties index 8bdabf73c2c..ead933dcecc 100644 --- a/servers/quarkus-server/src/main/resources/application.properties +++ b/servers/quarkus-server/src/main/resources/application.properties @@ -146,9 +146,10 @@ nessie.server.send-stacktrace-to-client=false # nessie.server.authorization.rules.allow_listing_reflog=\ # op=='VIEW_REFLOG' && role=='admin_user' -### which type of version store to use: IN_MEMORY, ROCKSDB, DYNAMODB, MONGODB2, CASSANDRA, JDBC2, BIGTABLE. +### which type of version store to use: IN_MEMORY, ROCKSDB, DYNAMODB, MONGODB2, CASSANDRA2, JDBC2, BIGTABLE. # Note: the version store type JDBC is deprecated, please use the Nessie Server Admin Tool to migrate to JDBC2. # Note: the version store type MONGODB is deprecated, please use the Nessie Server Admin Tool to migrate to MONGODB2. +# Note: the version store type CASSANDRA is deprecated, please use the Nessie Server Admin Tool to migrate to CASSANDRA2. nessie.version.store.type=IN_MEMORY # Object cache size as a value relative to the JVM's max heap size. The `cache-capacity-fraction-adjust-mb` diff --git a/servers/quarkus-tests/build.gradle.kts b/servers/quarkus-tests/build.gradle.kts index 6a2188d6019..bb1eae38820 100644 --- a/servers/quarkus-tests/build.gradle.kts +++ b/servers/quarkus-tests/build.gradle.kts @@ -31,6 +31,7 @@ dependencies { implementation(project(":nessie-versioned-tests")) implementation(project(":nessie-versioned-storage-bigtable-tests")) implementation(project(":nessie-versioned-storage-cassandra-tests")) + implementation(project(":nessie-versioned-storage-cassandra2-tests")) implementation(project(":nessie-versioned-storage-dynamodb-tests")) implementation(project(":nessie-versioned-storage-jdbc-tests")) implementation(project(":nessie-versioned-storage-jdbc2-tests")) diff --git a/servers/quarkus-tests/src/main/java/org/projectnessie/quarkus/tests/profiles/CassandraTestResourceLifecycleManager.java b/servers/quarkus-tests/src/main/java/org/projectnessie/quarkus/tests/profiles/CassandraTestResourceLifecycleManager.java index e3a8904f064..770a9b6029c 100644 --- a/servers/quarkus-tests/src/main/java/org/projectnessie/quarkus/tests/profiles/CassandraTestResourceLifecycleManager.java +++ b/servers/quarkus-tests/src/main/java/org/projectnessie/quarkus/tests/profiles/CassandraTestResourceLifecycleManager.java @@ -19,7 +19,7 @@ import io.quarkus.test.common.QuarkusTestResourceLifecycleManager; import java.util.Map; import java.util.Optional; -import org.projectnessie.versioned.storage.cassandratests.CassandraBackendTestFactory; +import org.projectnessie.versioned.storage.cassandra2tests.CassandraBackendTestFactory; public class CassandraTestResourceLifecycleManager implements QuarkusTestResourceLifecycleManager, DevServicesContext.ContextAware { diff --git a/servers/services-bench/build.gradle.kts b/servers/services-bench/build.gradle.kts index 1189569f4f6..063532514ca 100644 --- a/servers/services-bench/build.gradle.kts +++ b/servers/services-bench/build.gradle.kts @@ -45,6 +45,7 @@ dependencies { jmhRuntimeOnly(project(":nessie-versioned-storage-inmemory")) jmhRuntimeOnly(project(":nessie-versioned-storage-bigtable")) jmhRuntimeOnly(project(":nessie-versioned-storage-cassandra")) + jmhRuntimeOnly(project(":nessie-versioned-storage-cassandra2")) jmhRuntimeOnly(project(":nessie-versioned-storage-rocksdb")) jmhRuntimeOnly(project(":nessie-versioned-storage-mongodb")) jmhRuntimeOnly(project(":nessie-versioned-storage-mongodb2")) diff --git a/servers/services/build.gradle.kts b/servers/services/build.gradle.kts index 525b73ab17a..8d3b12725ff 100644 --- a/servers/services/build.gradle.kts +++ b/servers/services/build.gradle.kts @@ -69,6 +69,7 @@ dependencies { testFixturesApi(project(":nessie-services-config")) testFixturesImplementation(libs.logback.classic) intTestImplementation(project(":nessie-versioned-storage-cassandra-tests")) + intTestImplementation(project(":nessie-versioned-storage-cassandra2-tests")) intTestImplementation(project(":nessie-versioned-storage-rocksdb-tests")) intTestImplementation(project(":nessie-versioned-storage-mongodb-tests")) intTestImplementation(project(":nessie-versioned-storage-mongodb2-tests")) diff --git a/servers/services/src/intTest/java/org/projectnessie/services/impl/ITApiImplsPersistCassandra.java b/servers/services/src/intTest/java/org/projectnessie/services/impl/ITApiImplsPersistCassandra.java index c6a031d6451..b636093cf46 100644 --- a/servers/services/src/intTest/java/org/projectnessie/services/impl/ITApiImplsPersistCassandra.java +++ b/servers/services/src/intTest/java/org/projectnessie/services/impl/ITApiImplsPersistCassandra.java @@ -18,7 +18,7 @@ import org.junit.jupiter.api.condition.DisabledOnOs; import org.junit.jupiter.api.condition.OS; import org.junit.jupiter.api.extension.ExtendWith; -import org.projectnessie.versioned.storage.cassandratests.CassandraBackendTestFactory; +import org.projectnessie.versioned.storage.cassandra2tests.CassandraBackendTestFactory; import org.projectnessie.versioned.storage.testextension.NessieBackend; import org.projectnessie.versioned.storage.testextension.PersistExtension; diff --git a/site/in-dev/configuration.md b/site/in-dev/configuration.md index ca63f6569a6..da008d70f46 100644 --- a/site/in-dev/configuration.md +++ b/site/in-dev/configuration.md @@ -209,6 +209,11 @@ Related Quarkus settings: The `MONGODB` version store type is _deprecated for removal_, please use the [Nessie Server Admin Tool](export_import.md) to migrate from the `MONGODB` version store type to `MONGODB2`. +!!! warn + Prefer the `CASSANDRA2` version store type over the `CASSANDRA` version store type, because it has way less storage overhead. + The `CASSANDRA` version store type is _deprecated for removal_, please use the + [Nessie Server Admin Tool](export_import.md) to migrate from the `CASSANDRA` version store type to `CASSANDRA2`. + !!! warn Prefer the `JDBC2` version store type over the `JDBC` version store type, because it has way less storage overhead. The `JDBC` version store type is _deprecated for removal_, please use the diff --git a/tools/doc-generator/doclet/build.gradle.kts b/tools/doc-generator/doclet/build.gradle.kts index c43423ad213..6e90365b637 100644 --- a/tools/doc-generator/doclet/build.gradle.kts +++ b/tools/doc-generator/doclet/build.gradle.kts @@ -30,6 +30,7 @@ val genProjectPaths = ":nessie-quarkus-auth", ":nessie-versioned-storage-bigtable", ":nessie-versioned-storage-cassandra", + ":nessie-versioned-storage-cassandra2", ":nessie-versioned-storage-common", ":nessie-versioned-storage-dynamodb", ":nessie-versioned-storage-inmemory", diff --git a/tools/doc-generator/site-gen/build.gradle.kts b/tools/doc-generator/site-gen/build.gradle.kts index ef8ba079c31..f0d58dc26a6 100644 --- a/tools/doc-generator/site-gen/build.gradle.kts +++ b/tools/doc-generator/site-gen/build.gradle.kts @@ -38,6 +38,7 @@ val genProjectPaths = listOf( ":nessie-services-config", ":nessie-versioned-storage-bigtable", ":nessie-versioned-storage-cassandra", + ":nessie-versioned-storage-cassandra2", ":nessie-versioned-storage-common", ":nessie-versioned-storage-dynamodb", ":nessie-versioned-storage-inmemory", diff --git a/tools/server-admin/build.gradle.kts b/tools/server-admin/build.gradle.kts index 7a9d13149d2..54b06956bab 100644 --- a/tools/server-admin/build.gradle.kts +++ b/tools/server-admin/build.gradle.kts @@ -56,6 +56,7 @@ dependencies { implementation(project(":nessie-versioned-storage-bigtable")) implementation(project(":nessie-versioned-storage-cache")) implementation(project(":nessie-versioned-storage-cassandra")) + implementation(project(":nessie-versioned-storage-cassandra2")) implementation(project(":nessie-versioned-storage-common")) implementation(project(":nessie-versioned-storage-dynamodb")) implementation(project(":nessie-versioned-storage-inmemory")) @@ -97,6 +98,7 @@ dependencies { intTestImplementation(project(":nessie-versioned-storage-jdbc-tests")) intTestImplementation(project(":nessie-versioned-storage-jdbc2-tests")) intTestImplementation(project(":nessie-versioned-storage-cassandra-tests")) + intTestImplementation(project(":nessie-versioned-storage-cassandra2-tests")) intTestImplementation(project(":nessie-versioned-storage-bigtable-tests")) intTestImplementation(project(":nessie-versioned-storage-dynamodb-tests")) intTestImplementation(project(":nessie-multi-env-test-engine")) diff --git a/tools/server-admin/src/intTest/java/org/projectnessie/tools/admin/cli/ITExportImport.java b/tools/server-admin/src/intTest/java/org/projectnessie/tools/admin/cli/ITExportImport.java index ca1a07c1c00..401d5265f91 100644 --- a/tools/server-admin/src/intTest/java/org/projectnessie/tools/admin/cli/ITExportImport.java +++ b/tools/server-admin/src/intTest/java/org/projectnessie/tools/admin/cli/ITExportImport.java @@ -77,7 +77,7 @@ public class ITExportImport { private static final String VERSION_STORES = - "(ROCKSDB|DYNAMODB|MONGODB2|CASSANDRA|JDBC2|BIGTABLE)"; + "(ROCKSDB|DYNAMODB|MONGODB2|CASSANDRA2|JDBC2|BIGTABLE)"; @InjectSoftAssertions private SoftAssertions soft; diff --git a/tools/server-admin/src/intTest/java/org/projectnessie/tools/admin/cli/NessieServerAdminTestBackends.java b/tools/server-admin/src/intTest/java/org/projectnessie/tools/admin/cli/NessieServerAdminTestBackends.java index c5b66cba4d0..de1bf5bf3e9 100644 --- a/tools/server-admin/src/intTest/java/org/projectnessie/tools/admin/cli/NessieServerAdminTestBackends.java +++ b/tools/server-admin/src/intTest/java/org/projectnessie/tools/admin/cli/NessieServerAdminTestBackends.java @@ -18,7 +18,7 @@ import java.util.Map; import org.projectnessie.quarkus.config.VersionStoreConfig.VersionStoreType; import org.projectnessie.versioned.storage.bigtabletests.BigTableBackendContainerTestFactory; -import org.projectnessie.versioned.storage.cassandratests.CassandraBackendTestFactory; +import org.projectnessie.versioned.storage.cassandra2tests.CassandraBackendTestFactory; import org.projectnessie.versioned.storage.dynamodbtests.DynamoDBBackendTestFactory; import org.projectnessie.versioned.storage.jdbc2tests.MariaDBBackendTestFactory; import org.projectnessie.versioned.storage.jdbc2tests.MySQLBackendTestFactory; @@ -71,7 +71,7 @@ BackendTestFactory backendFactory() { @Override Map quarkusConfig() { - return Map.of("nessie.version.store.type", VersionStoreType.CASSANDRA.name()); + return Map.of("nessie.version.store.type", VersionStoreType.CASSANDRA2.name()); } }, diff --git a/versioned/storage/cassandra/src/intTest/java/org/projectnessie/versioned/storage/cassandra/ITScyllaDBPersist.java b/versioned/storage/cassandra/src/intTest/java/org/projectnessie/versioned/storage/cassandra/ITScyllaDBPersist.java index 114d1196568..6ca6ca1dc3e 100644 --- a/versioned/storage/cassandra/src/intTest/java/org/projectnessie/versioned/storage/cassandra/ITScyllaDBPersist.java +++ b/versioned/storage/cassandra/src/intTest/java/org/projectnessie/versioned/storage/cassandra/ITScyllaDBPersist.java @@ -15,6 +15,7 @@ */ package org.projectnessie.versioned.storage.cassandra; +import org.junit.jupiter.api.Disabled; import org.junit.jupiter.api.condition.DisabledOnOs; import org.junit.jupiter.api.condition.OS; import org.projectnessie.versioned.storage.cassandratests.ScyllaDBBackendTestFactory; @@ -26,4 +27,5 @@ disabledReason = "ScyllaDB fails to start, see https://github.com/scylladb/scylladb/issues/10135") @NessieBackend(ScyllaDBBackendTestFactory.class) +@Disabled("Disabled in favor of CASSANDRA2") public class ITScyllaDBPersist extends AbstractPersistTests {} diff --git a/versioned/storage/cassandra/src/intTest/java/org/projectnessie/versioned/storage/cassandra/ITScyllaDBVersionStore.java b/versioned/storage/cassandra/src/intTest/java/org/projectnessie/versioned/storage/cassandra/ITScyllaDBVersionStore.java index 2eb1a2d386d..a65db730541 100644 --- a/versioned/storage/cassandra/src/intTest/java/org/projectnessie/versioned/storage/cassandra/ITScyllaDBVersionStore.java +++ b/versioned/storage/cassandra/src/intTest/java/org/projectnessie/versioned/storage/cassandra/ITScyllaDBVersionStore.java @@ -15,6 +15,7 @@ */ package org.projectnessie.versioned.storage.cassandra; +import org.junit.jupiter.api.Disabled; import org.junit.jupiter.api.condition.DisabledOnOs; import org.junit.jupiter.api.condition.OS; import org.projectnessie.versioned.storage.cassandratests.ScyllaDBBackendTestFactory; @@ -26,4 +27,5 @@ disabledReason = "ScyllaDB fails to start, see https://github.com/scylladb/scylladb/issues/10135") @NessieBackend(ScyllaDBBackendTestFactory.class) +@Disabled("Disabled in favor of CASSANDRA2") public class ITScyllaDBVersionStore extends AbstractVersionStoreTests {} diff --git a/versioned/storage/cassandra2-tests/build.gradle.kts b/versioned/storage/cassandra2-tests/build.gradle.kts new file mode 100644 index 00000000000..431d6e0d490 --- /dev/null +++ b/versioned/storage/cassandra2-tests/build.gradle.kts @@ -0,0 +1,45 @@ +/* + * Copyright (C) 2022 Dremio + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +plugins { + id("nessie-conventions-server") + id("nessie-jacoco") +} + +publishingHelper { mavenName = "Nessie - Storage - Cassandra & ScyllaDB - Tests" } + +description = "Base test code for creating test backends using Cassandra & ScyllaDB." + +dependencies { + implementation(project(":nessie-versioned-storage-cassandra2")) + implementation(project(":nessie-versioned-storage-common")) + implementation(project(":nessie-versioned-storage-testextension")) + implementation(project(":nessie-container-spec-helper")) + + compileOnly(libs.jakarta.annotation.api) + + compileOnly(libs.immutables.builder) + compileOnly(libs.immutables.value.annotations) + annotationProcessor(libs.immutables.value.processor) + + implementation(platform(libs.cassandra.driver.bom)) + implementation("com.datastax.oss:java-driver-core") + + implementation(platform(libs.testcontainers.bom)) + implementation("org.testcontainers:cassandra") { + exclude("com.datastax.cassandra", "cassandra-driver-core") + } +} diff --git a/versioned/storage/cassandra2-tests/src/main/java/org/projectnessie/versioned/storage/cassandra2tests/AbstractCassandraBackendTestFactory.java b/versioned/storage/cassandra2-tests/src/main/java/org/projectnessie/versioned/storage/cassandra2tests/AbstractCassandraBackendTestFactory.java new file mode 100644 index 00000000000..d3450ee4fdb --- /dev/null +++ b/versioned/storage/cassandra2-tests/src/main/java/org/projectnessie/versioned/storage/cassandra2tests/AbstractCassandraBackendTestFactory.java @@ -0,0 +1,187 @@ +/* + * Copyright (C) 2024 Dremio + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.projectnessie.versioned.storage.cassandra2tests; + +import static java.lang.String.format; +import static org.testcontainers.containers.CassandraContainer.CQL_PORT; + +import com.datastax.oss.driver.api.core.CqlSession; +import com.datastax.oss.driver.api.core.metadata.Metadata; +import com.datastax.oss.driver.api.core.metadata.Node; +import java.net.InetSocketAddress; +import java.util.List; +import java.util.Map; +import java.util.Optional; +import java.util.stream.Collectors; +import org.projectnessie.nessie.testing.containerspec.ContainerSpecHelper; +import org.projectnessie.versioned.storage.cassandra2.Cassandra2Backend; +import org.projectnessie.versioned.storage.cassandra2.Cassandra2BackendConfig; +import org.projectnessie.versioned.storage.testextension.BackendTestFactory; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.testcontainers.containers.CassandraContainer; +import org.testcontainers.containers.ContainerLaunchException; +import org.testcontainers.containers.output.Slf4jLogConsumer; +import org.testcontainers.utility.DockerImageName; + +public abstract class AbstractCassandraBackendTestFactory implements BackendTestFactory { + + private static final Logger LOGGER = + LoggerFactory.getLogger(AbstractCassandraBackendTestFactory.class); + public static final String KEYSPACE_FOR_TEST = "nessie"; + + private final String dbName; + private final List args; + + private CassandraContainer container; + private InetSocketAddress hostAndPort; + private String localDc; + + AbstractCassandraBackendTestFactory(String dbName, List args) { + this.dbName = dbName; + this.args = args; + } + + @Override + public Cassandra2Backend createNewBackend() { + CqlSession client = buildNewClient(); + maybeCreateKeyspace(client); + return new Cassandra2Backend( + Cassandra2BackendConfig.builder().client(client).keyspace(KEYSPACE_FOR_TEST).build(), true); + } + + public void maybeCreateKeyspace() { + try (CqlSession client = buildNewClient()) { + maybeCreateKeyspace(client); + } + } + + private void maybeCreateKeyspace(CqlSession session) { + int replicationFactor = 1; + + Metadata metadata = session.getMetadata(); + + String datacenters = + metadata.getNodes().values().stream() + .map(Node::getDatacenter) + .distinct() + .map(dc -> format("'%s': %d", dc, replicationFactor)) + .collect(Collectors.joining(", ")); + + session.execute( + format( + "CREATE KEYSPACE IF NOT EXISTS %s WITH replication = {'class': 'NetworkTopologyStrategy', %s};", + KEYSPACE_FOR_TEST, datacenters)); + session.refreshSchema(); + } + + public CqlSession buildNewClient() { + return CassandraClientProducer.builder() + .addContactPoints(hostAndPort) + .localDc(localDc) + .build() + .createClient(); + } + + @Override + @SuppressWarnings("resource") + public void start(Optional containerNetworkId) { + if (container != null) { + throw new IllegalStateException("Already started"); + } + + DockerImageName dockerImageName = + ContainerSpecHelper.builder() + .name(dbName) + .containerClass(AbstractCassandraBackendTestFactory.class) + .build() + .dockerImageName(null) + .asCompatibleSubstituteFor("cassandra"); + + for (int retry = 0; ; retry++) { + CassandraContainer c = + new CassandraContainer<>(dockerImageName) + .withLogConsumer(new Slf4jLogConsumer(LOGGER)) + .withCommand(args.toArray(new String[0])); + configureContainer(c); + containerNetworkId.ifPresent(c::withNetworkMode); + try { + c.start(); + container = c; + break; + } catch (ContainerLaunchException e) { + c.close(); + if (e.getCause() != null && retry < 3) { + LOGGER.warn("Launch of container {} failed, will retry...", c.getDockerImageName(), e); + continue; + } + LOGGER.error("Launch of container {} failed", c.getDockerImageName(), e); + throw new RuntimeException(e); + } + } + + int port = containerNetworkId.isPresent() ? CQL_PORT : container.getMappedPort(CQL_PORT); + String host = + containerNetworkId.isPresent() + ? container.getCurrentContainerInfo().getConfig().getHostName() + : container.getHost(); + + localDc = container.getLocalDatacenter(); + + hostAndPort = InetSocketAddress.createUnresolved(host, port); + } + + public String getKeyspace() { + return KEYSPACE_FOR_TEST; + } + + public InetSocketAddress getHostAndPort() { + return hostAndPort; + } + + public String getLocalDc() { + return localDc; + } + + protected abstract void configureContainer(CassandraContainer c); + + @Override + public void start() { + start(Optional.empty()); + } + + @Override + public void stop() { + try { + if (container != null) { + container.stop(); + } + } finally { + container = null; + } + } + + @Override + public Map getQuarkusConfig() { + return Map.of( + "quarkus.cassandra.contact-points", + String.format("%s:%d", getHostAndPort().getHostName(), getHostAndPort().getPort()), + "quarkus.cassandra.local-datacenter", + getLocalDc(), + "quarkus.cassandra.keyspace", + getKeyspace()); + } +} diff --git a/versioned/storage/cassandra2-tests/src/main/java/org/projectnessie/versioned/storage/cassandra2tests/CassandraBackendTestFactory.java b/versioned/storage/cassandra2-tests/src/main/java/org/projectnessie/versioned/storage/cassandra2tests/CassandraBackendTestFactory.java new file mode 100644 index 00000000000..18cabfaea8b --- /dev/null +++ b/versioned/storage/cassandra2-tests/src/main/java/org/projectnessie/versioned/storage/cassandra2tests/CassandraBackendTestFactory.java @@ -0,0 +1,43 @@ +/* + * Copyright (C) 2024 Dremio + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.projectnessie.versioned.storage.cassandra2tests; + +import static java.util.Collections.emptyList; + +import org.projectnessie.versioned.storage.cassandra2.Cassandra2BackendFactory; +import org.testcontainers.containers.CassandraContainer; + +public class CassandraBackendTestFactory extends AbstractCassandraBackendTestFactory { + + private static final String JVM_OPTS_TEST = + "-Dcassandra.skip_wait_for_gossip_to_settle=0 " + + "-Dcassandra.num_tokens=1 " + + "-Dcassandra.initial_token=0"; + + public CassandraBackendTestFactory() { + super("cassandra", emptyList()); + } + + @Override + public String getName() { + return Cassandra2BackendFactory.NAME; + } + + @Override + protected void configureContainer(CassandraContainer c) { + c.withEnv("JVM_OPTS", JVM_OPTS_TEST); + } +} diff --git a/versioned/storage/cassandra2-tests/src/main/java/org/projectnessie/versioned/storage/cassandra2tests/CassandraClientProducer.java b/versioned/storage/cassandra2-tests/src/main/java/org/projectnessie/versioned/storage/cassandra2tests/CassandraClientProducer.java new file mode 100644 index 00000000000..88c7b18683c --- /dev/null +++ b/versioned/storage/cassandra2-tests/src/main/java/org/projectnessie/versioned/storage/cassandra2tests/CassandraClientProducer.java @@ -0,0 +1,87 @@ +/* + * Copyright (C) 2024 Dremio + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.projectnessie.versioned.storage.cassandra2tests; + +import static com.datastax.oss.driver.api.core.config.TypedDriverOption.CONNECTION_CONNECT_TIMEOUT; +import static com.datastax.oss.driver.api.core.config.TypedDriverOption.CONNECTION_INIT_QUERY_TIMEOUT; +import static com.datastax.oss.driver.api.core.config.TypedDriverOption.CONNECTION_SET_KEYSPACE_TIMEOUT; +import static com.datastax.oss.driver.api.core.config.TypedDriverOption.CONTROL_CONNECTION_TIMEOUT; +import static com.datastax.oss.driver.api.core.config.TypedDriverOption.HEARTBEAT_TIMEOUT; +import static com.datastax.oss.driver.api.core.config.TypedDriverOption.METADATA_SCHEMA_REQUEST_TIMEOUT; +import static com.datastax.oss.driver.api.core.config.TypedDriverOption.REQUEST_LOG_WARNINGS; +import static com.datastax.oss.driver.api.core.config.TypedDriverOption.REQUEST_TIMEOUT; + +import com.datastax.oss.driver.api.core.CqlSession; +import com.datastax.oss.driver.api.core.CqlSessionBuilder; +import com.datastax.oss.driver.api.core.auth.AuthProvider; +import com.datastax.oss.driver.api.core.config.DriverConfigLoader; +import com.datastax.oss.driver.api.core.config.OptionsMap; +import com.datastax.oss.driver.api.core.loadbalancing.LoadBalancingPolicy; +import jakarta.annotation.Nullable; +import java.net.InetSocketAddress; +import java.time.Duration; +import java.util.List; +import org.immutables.value.Value; + +@Value.Immutable +abstract class CassandraClientProducer { + + public static ImmutableCassandraClientProducer.Builder builder() { + return ImmutableCassandraClientProducer.builder(); + } + + abstract List contactPoints(); + + @Nullable + abstract String localDc(); + + @Nullable + abstract AuthProvider authProvider(); + + @Nullable + abstract LoadBalancingPolicy loadBalancingPolicy(); + + public CqlSession createClient() { + CqlSessionBuilder client = CqlSession.builder().addContactPoints(contactPoints()); + + String localDc = localDc(); + if (localDc != null) { + client.withLocalDatacenter(localDc); + } + + AuthProvider auth = authProvider(); + if (auth != null) { + client.withAuthProvider(auth); + } + + OptionsMap options = OptionsMap.driverDefaults(); + + // Increase some timeouts to avoid flakiness + Duration timeout = Duration.ofSeconds(15); + options.put(CONNECTION_CONNECT_TIMEOUT, timeout); + options.put(CONNECTION_INIT_QUERY_TIMEOUT, timeout); + options.put(CONNECTION_SET_KEYSPACE_TIMEOUT, timeout); + options.put(REQUEST_TIMEOUT, timeout); + options.put(HEARTBEAT_TIMEOUT, timeout); + options.put(METADATA_SCHEMA_REQUEST_TIMEOUT, timeout); + options.put(CONTROL_CONNECTION_TIMEOUT, timeout); + + // Disable warnings due to tombstone_warn_threshold + options.put(REQUEST_LOG_WARNINGS, false); + + return client.withConfigLoader(DriverConfigLoader.fromMap(options)).build(); + } +} diff --git a/versioned/storage/cassandra2-tests/src/main/java/org/projectnessie/versioned/storage/cassandra2tests/ScyllaDBBackendTestFactory.java b/versioned/storage/cassandra2-tests/src/main/java/org/projectnessie/versioned/storage/cassandra2tests/ScyllaDBBackendTestFactory.java new file mode 100644 index 00000000000..18e0c5d1738 --- /dev/null +++ b/versioned/storage/cassandra2-tests/src/main/java/org/projectnessie/versioned/storage/cassandra2tests/ScyllaDBBackendTestFactory.java @@ -0,0 +1,51 @@ +/* + * Copyright (C) 2024 Dremio + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.projectnessie.versioned.storage.cassandra2tests; + +import static java.lang.Math.max; +import static java.lang.Math.min; +import static java.lang.Runtime.getRuntime; +import static java.lang.String.format; +import static java.util.Arrays.asList; + +import org.testcontainers.containers.CassandraContainer; + +public class ScyllaDBBackendTestFactory extends AbstractCassandraBackendTestFactory { + + public ScyllaDBBackendTestFactory() { + super( + "scylladb", + asList( + "--smp", + Integer.toString(max(getRuntime().availableProcessors() / 3, 2)), + "--developer-mode", + "1", + "--skip-wait-for-gossip-to-settle", + "1", + "--memory", + format("%dG", min(4, max(getRuntime().availableProcessors(), 2))), + "--overprovisioned", + "1")); + } + + @Override + public String getName() { + return "Scylla"; + } + + @Override + protected void configureContainer(CassandraContainer c) {} +} diff --git a/versioned/storage/cassandra2-tests/src/main/resources/META-INF/services/org.projectnessie.versioned.storage.testextension.BackendTestFactory b/versioned/storage/cassandra2-tests/src/main/resources/META-INF/services/org.projectnessie.versioned.storage.testextension.BackendTestFactory new file mode 100644 index 00000000000..57aed05558c --- /dev/null +++ b/versioned/storage/cassandra2-tests/src/main/resources/META-INF/services/org.projectnessie.versioned.storage.testextension.BackendTestFactory @@ -0,0 +1,17 @@ +# +# Copyright (C) 2022 Dremio +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# +org.projectnessie.versioned.storage.cassandra2tests.CassandraBackendTestFactory +org.projectnessie.versioned.storage.cassandra2tests.ScyllaDBBackendTestFactory diff --git a/versioned/storage/cassandra2-tests/src/main/resources/org/projectnessie/versioned/storage/cassandra2tests/Dockerfile-cassandra-version b/versioned/storage/cassandra2-tests/src/main/resources/org/projectnessie/versioned/storage/cassandra2tests/Dockerfile-cassandra-version new file mode 100644 index 00000000000..e6a55c3e597 --- /dev/null +++ b/versioned/storage/cassandra2-tests/src/main/resources/org/projectnessie/versioned/storage/cassandra2tests/Dockerfile-cassandra-version @@ -0,0 +1,3 @@ +# Dockerfile to provide the image name and tag to a test. +# Version is managed by Renovate - do not edit. +FROM docker.io/cassandra:5.0 diff --git a/versioned/storage/cassandra2-tests/src/main/resources/org/projectnessie/versioned/storage/cassandra2tests/Dockerfile-scylladb-version b/versioned/storage/cassandra2-tests/src/main/resources/org/projectnessie/versioned/storage/cassandra2tests/Dockerfile-scylladb-version new file mode 100644 index 00000000000..914c34a459b --- /dev/null +++ b/versioned/storage/cassandra2-tests/src/main/resources/org/projectnessie/versioned/storage/cassandra2tests/Dockerfile-scylladb-version @@ -0,0 +1,3 @@ +# Dockerfile to provide the image name and tag to a test. +# Version is managed by Renovate - do not edit. +FROM docker.io/scylladb/scylla:5.4.9 diff --git a/versioned/storage/cassandra2/build.gradle.kts b/versioned/storage/cassandra2/build.gradle.kts new file mode 100644 index 00000000000..13effe7d503 --- /dev/null +++ b/versioned/storage/cassandra2/build.gradle.kts @@ -0,0 +1,70 @@ +/* + * Copyright (C) 2022 Dremio + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +import org.apache.tools.ant.taskdefs.condition.Os + +plugins { + id("nessie-conventions-server") + id("nessie-jacoco") +} + +publishingHelper { mavenName = "Nessie - Storage - Cassandra & ScyllaDB" } + +description = "Storage implementation for Cassandra & ScyllaDB." + +dependencies { + implementation(project(":nessie-versioned-storage-common")) + implementation(project(":nessie-versioned-storage-common-proto")) + implementation(project(":nessie-versioned-storage-common-serialize")) + + compileOnly(libs.jakarta.validation.api) + compileOnly(libs.jakarta.annotation.api) + + compileOnly(libs.errorprone.annotations) + implementation(libs.agrona) + implementation(libs.guava) + implementation(libs.slf4j.api) + + implementation(platform(libs.cassandra.driver.bom)) + implementation("com.datastax.oss:java-driver-core") { + // spotbugs-annotations has only a GPL license! + exclude("com.github.spotbugs", "spotbugs-annotations") + } + + compileOnly(libs.immutables.builder) + compileOnly(libs.immutables.value.annotations) + annotationProcessor(libs.immutables.value.processor) + + intTestImplementation(project(":nessie-versioned-storage-cassandra2-tests")) + intTestImplementation(project(":nessie-versioned-storage-common-tests")) + intTestImplementation(project(":nessie-versioned-storage-testextension")) + intTestImplementation(project(":nessie-versioned-tests")) + intTestRuntimeOnly(platform(libs.testcontainers.bom)) + intTestRuntimeOnly("org.testcontainers:cassandra") + intTestImplementation(platform(libs.junit.bom)) + intTestImplementation(libs.bundles.junit.testing) + intTestRuntimeOnly(libs.logback.classic) +} + +// Testcontainers is not supported on Windows :( +if (Os.isFamily(Os.FAMILY_WINDOWS)) { + tasks.withType().configureEach { this.enabled = false } +} + +// Issue w/ testcontainers/podman in GH workflows :( +if (Os.isFamily(Os.FAMILY_MAC) && System.getenv("CI") != null) { + tasks.named("intTest").configure { this.enabled = false } +} diff --git a/versioned/storage/cassandra2/src/intTest/java/org/projectnessie/versioned/storage/cassandra2/AbstractTestCassandraBackendFactory.java b/versioned/storage/cassandra2/src/intTest/java/org/projectnessie/versioned/storage/cassandra2/AbstractTestCassandraBackendFactory.java new file mode 100644 index 00000000000..0daa2576918 --- /dev/null +++ b/versioned/storage/cassandra2/src/intTest/java/org/projectnessie/versioned/storage/cassandra2/AbstractTestCassandraBackendFactory.java @@ -0,0 +1,242 @@ +/* + * Copyright (C) 2024 Dremio + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.projectnessie.versioned.storage.cassandra2; + +import static java.lang.String.format; +import static org.projectnessie.versioned.storage.cassandra2.Cassandra2Constants.COL_REFS_NAME; +import static org.projectnessie.versioned.storage.cassandra2.Cassandra2Constants.COL_REPO_ID; +import static org.projectnessie.versioned.storage.cassandra2.Cassandra2Constants.TABLE_OBJS; +import static org.projectnessie.versioned.storage.cassandra2.Cassandra2Constants.TABLE_REFS; +import static org.projectnessie.versioned.storage.cassandra2tests.AbstractCassandraBackendTestFactory.KEYSPACE_FOR_TEST; +import static org.projectnessie.versioned.storage.common.logic.Logics.repositoryLogic; + +import com.datastax.oss.driver.api.core.CqlSession; +import com.datastax.oss.driver.api.core.cql.SimpleStatement; +import com.datastax.oss.driver.api.core.metadata.Node; +import java.time.Duration; +import java.util.stream.Collectors; +import org.assertj.core.api.SoftAssertions; +import org.assertj.core.api.junit.jupiter.InjectSoftAssertions; +import org.assertj.core.api.junit.jupiter.SoftAssertionsExtension; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.ExtendWith; +import org.projectnessie.versioned.storage.cassandra2tests.AbstractCassandraBackendTestFactory; +import org.projectnessie.versioned.storage.common.config.StoreConfig; +import org.projectnessie.versioned.storage.common.logic.RepositoryDescription; +import org.projectnessie.versioned.storage.common.logic.RepositoryLogic; +import org.projectnessie.versioned.storage.common.persist.Backend; +import org.projectnessie.versioned.storage.common.persist.BackendFactory; +import org.projectnessie.versioned.storage.common.persist.Persist; +import org.projectnessie.versioned.storage.common.persist.PersistFactory; +import org.projectnessie.versioned.storage.common.persist.PersistLoader; + +@ExtendWith(SoftAssertionsExtension.class) +public abstract class AbstractTestCassandraBackendFactory { + @InjectSoftAssertions protected SoftAssertions soft; + + @Test + public void productionLike() throws Exception { + AbstractCassandraBackendTestFactory testFactory = testFactory(); + testFactory.start(); + try { + BackendFactory factory = + PersistLoader.findFactoryByName(Cassandra2BackendFactory.NAME); + soft.assertThat(factory).isNotNull().isInstanceOf(Cassandra2BackendFactory.class); + + try (CqlSession client = testFactory.buildNewClient()) { + setupKeyspace(client); + + RepositoryDescription repoDesc; + try (Backend backend = factory.buildBackend(buildConfig(client))) { + soft.assertThat(backend).isNotNull().isInstanceOf(Cassandra2Backend.class); + backend.setupSchema(); + PersistFactory persistFactory = backend.createFactory(); + soft.assertThat(persistFactory).isNotNull().isInstanceOf(Cassandra2PersistFactory.class); + Persist persist = persistFactory.newPersist(StoreConfig.Adjustable.empty()); + soft.assertThat(persist).isNotNull().isInstanceOf(Cassandra2Persist.class); + + RepositoryLogic repositoryLogic = repositoryLogic(persist); + repositoryLogic.initialize("initializeAgain"); + repoDesc = repositoryLogic.fetchRepositoryDescription(); + soft.assertThat(repoDesc).isNotNull(); + } + + try (Backend backend = factory.buildBackend(buildConfig(client))) { + soft.assertThat(backend).isNotNull().isInstanceOf(Cassandra2Backend.class); + backend.setupSchema(); + PersistFactory persistFactory = backend.createFactory(); + soft.assertThat(persistFactory).isNotNull().isInstanceOf(Cassandra2PersistFactory.class); + Persist persist = persistFactory.newPersist(StoreConfig.Adjustable.empty()); + soft.assertThat(persist).isNotNull().isInstanceOf(Cassandra2Persist.class); + + RepositoryLogic repositoryLogic = repositoryLogic(persist); + repositoryLogic.initialize("initializeAgain"); + soft.assertThat(repositoryLogic.fetchRepositoryDescription()).isEqualTo(repoDesc); + } + } + } finally { + testFactory.stop(); + } + } + + @Test + public void backendTestFactory() throws Exception { + AbstractCassandraBackendTestFactory testFactory = testFactory(); + testFactory.start(); + try { + BackendFactory factory = + PersistLoader.findFactoryByName(Cassandra2BackendFactory.NAME); + soft.assertThat(factory).isNotNull().isInstanceOf(Cassandra2BackendFactory.class); + + RepositoryDescription repoDesc; + try (Backend backend = testFactory.createNewBackend()) { + soft.assertThat(backend).isNotNull().isInstanceOf(Cassandra2Backend.class); + backend.setupSchema(); + PersistFactory persistFactory = backend.createFactory(); + soft.assertThat(persistFactory).isNotNull().isInstanceOf(Cassandra2PersistFactory.class); + Persist persist = persistFactory.newPersist(StoreConfig.Adjustable.empty()); + soft.assertThat(persist).isNotNull().isInstanceOf(Cassandra2Persist.class); + + RepositoryLogic repositoryLogic = repositoryLogic(persist); + repositoryLogic.initialize("initializeAgain"); + repoDesc = repositoryLogic.fetchRepositoryDescription(); + soft.assertThat(repoDesc).isNotNull(); + } + + try (Backend backend = testFactory.createNewBackend()) { + soft.assertThat(backend).isNotNull().isInstanceOf(Cassandra2Backend.class); + backend.setupSchema(); + PersistFactory persistFactory = backend.createFactory(); + soft.assertThat(persistFactory).isNotNull().isInstanceOf(Cassandra2PersistFactory.class); + Persist persist = persistFactory.newPersist(StoreConfig.Adjustable.empty()); + soft.assertThat(persist).isNotNull().isInstanceOf(Cassandra2Persist.class); + + RepositoryLogic repositoryLogic = repositoryLogic(persist); + repositoryLogic.initialize("initializeAgain"); + soft.assertThat(repositoryLogic.fetchRepositoryDescription()).isEqualTo(repoDesc); + } + } finally { + testFactory.stop(); + } + } + + static void executeDDL(CqlSession client, String cql) { + client.execute(SimpleStatement.newInstance(cql).setTimeout(Duration.ofSeconds(30))); + } + + @Test + public void incompatibleTableSchema() throws Exception { + AbstractCassandraBackendTestFactory testFactory = testFactory(); + testFactory.start(); + try (CqlSession client = testFactory.buildNewClient()) { + setupKeyspace(client); + + BackendFactory factory = + PersistLoader.findFactoryByName(Cassandra2BackendFactory.NAME); + soft.assertThat(factory).isNotNull().isInstanceOf(Cassandra2BackendFactory.class); + try (Backend backend = factory.buildBackend(buildConfig(client))) { + soft.assertThat(backend).isNotNull().isInstanceOf(Cassandra2Backend.class); + backend.setupSchema(); + } + + executeDDL(client, "DROP TABLE IF EXISTS nessie." + TABLE_REFS); + executeDDL(client, "DROP TABLE IF EXISTS nessie." + TABLE_OBJS); + + executeDDL(client, "CREATE TABLE nessie." + TABLE_REFS + " (foobarbaz VARCHAR PRIMARY KEY)"); + + try (Backend backend = factory.buildBackend(buildConfig(client))) { + soft.assertThat(backend).isNotNull().isInstanceOf(Cassandra2Backend.class); + soft.assertThatIllegalStateException() + .isThrownBy(backend::setupSchema) + .withMessageStartingWith( + "Expected primary key columns {repo=TEXT, ref_name=TEXT} " + + "do not match existing primary key columns {foobarbaz=TEXT} for table '" + + TABLE_REFS + + "'. DDL template:\nCREATE TABLE nessie." + + TABLE_REFS); + } + + executeDDL(client, "DROP TABLE IF EXISTS nessie." + TABLE_REFS); + executeDDL(client, "DROP TABLE IF EXISTS nessie." + TABLE_OBJS); + + executeDDL( + client, + "CREATE TABLE nessie." + + TABLE_REFS + + " (" + + COL_REPO_ID + + " VARCHAR, " + + COL_REFS_NAME + + " VARCHAR, meep VARCHAR, boo BIGINT, PRIMARY KEY ((" + + COL_REPO_ID + + ", " + + COL_REFS_NAME + + ")))"); + + try (Backend backend = factory.buildBackend(buildConfig(client))) { + soft.assertThat(backend).isNotNull().isInstanceOf(Cassandra2Backend.class); + soft.assertThatIllegalStateException() + .isThrownBy(backend::setupSchema) + .withMessageStartingWith( + "The database table " + + TABLE_REFS + + " is missing mandatory columns created_at,deleted,ext_info,pointer,prev_ptr.\n" + + "Found columns : boo,meep,ref_name,repo\n" + + "Expected columns : ") + .withMessageContaining("DDL template:\nCREATE TABLE nessie." + TABLE_REFS); + } + + executeDDL(client, "DROP TABLE IF EXISTS nessie." + TABLE_REFS); + executeDDL(client, "DROP TABLE IF EXISTS nessie." + TABLE_OBJS); + + executeDDL(client, "CREATE TABLE nessie." + TABLE_OBJS + " (foobarbaz VARCHAR PRIMARY KEY)"); + + try (Backend backend = factory.buildBackend(buildConfig(client))) { + soft.assertThat(backend).isNotNull().isInstanceOf(Cassandra2Backend.class); + soft.assertThatIllegalStateException() + .isThrownBy(backend::setupSchema) + .withMessageStartingWith( + "Expected primary key columns {repo=TEXT, obj_id=BLOB} " + + "do not match existing primary key columns {foobarbaz=TEXT} for table '" + + TABLE_OBJS + + "'. DDL template:\nCREATE TABLE nessie." + + TABLE_OBJS); + } + } finally { + testFactory.stop(); + } + } + + private static ImmutableCassandra2BackendConfig buildConfig(CqlSession client) { + return Cassandra2BackendConfig.builder().client(client).build(); + } + + private void setupKeyspace(CqlSession client) { + executeDDL(client, format("DROP KEYSPACE IF EXISTS %s", KEYSPACE_FOR_TEST)); + executeDDL( + client, + format( + "CREATE KEYSPACE IF NOT EXISTS %s WITH replication = {'class': 'NetworkTopologyStrategy', %s}", + KEYSPACE_FOR_TEST, + client.getMetadata().getNodes().values().stream() + .map(Node::getDatacenter) + .distinct() + .map(dc -> format("'%s': 1", dc)) + .collect(Collectors.joining(", ")))); + } + + protected abstract AbstractCassandraBackendTestFactory testFactory(); +} diff --git a/versioned/storage/cassandra2/src/intTest/java/org/projectnessie/versioned/storage/cassandra2/ITCassandraBackendFactory.java b/versioned/storage/cassandra2/src/intTest/java/org/projectnessie/versioned/storage/cassandra2/ITCassandraBackendFactory.java new file mode 100644 index 00000000000..98ef2479226 --- /dev/null +++ b/versioned/storage/cassandra2/src/intTest/java/org/projectnessie/versioned/storage/cassandra2/ITCassandraBackendFactory.java @@ -0,0 +1,26 @@ +/* + * Copyright (C) 2024 Dremio + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.projectnessie.versioned.storage.cassandra2; + +import org.projectnessie.versioned.storage.cassandra2tests.AbstractCassandraBackendTestFactory; +import org.projectnessie.versioned.storage.cassandra2tests.CassandraBackendTestFactory; + +public class ITCassandraBackendFactory extends AbstractTestCassandraBackendFactory { + @Override + protected AbstractCassandraBackendTestFactory testFactory() { + return new CassandraBackendTestFactory(); + } +} diff --git a/versioned/storage/cassandra2/src/intTest/java/org/projectnessie/versioned/storage/cassandra2/ITCassandraPersist.java b/versioned/storage/cassandra2/src/intTest/java/org/projectnessie/versioned/storage/cassandra2/ITCassandraPersist.java new file mode 100644 index 00000000000..af7fca4a845 --- /dev/null +++ b/versioned/storage/cassandra2/src/intTest/java/org/projectnessie/versioned/storage/cassandra2/ITCassandraPersist.java @@ -0,0 +1,23 @@ +/* + * Copyright (C) 2024 Dremio + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.projectnessie.versioned.storage.cassandra2; + +import org.projectnessie.versioned.storage.cassandra2tests.CassandraBackendTestFactory; +import org.projectnessie.versioned.storage.commontests.AbstractPersistTests; +import org.projectnessie.versioned.storage.testextension.NessieBackend; + +@NessieBackend(CassandraBackendTestFactory.class) +public class ITCassandraPersist extends AbstractPersistTests {} diff --git a/versioned/storage/cassandra2/src/intTest/java/org/projectnessie/versioned/storage/cassandra2/ITCassandraVersionStore.java b/versioned/storage/cassandra2/src/intTest/java/org/projectnessie/versioned/storage/cassandra2/ITCassandraVersionStore.java new file mode 100644 index 00000000000..c687195f053 --- /dev/null +++ b/versioned/storage/cassandra2/src/intTest/java/org/projectnessie/versioned/storage/cassandra2/ITCassandraVersionStore.java @@ -0,0 +1,23 @@ +/* + * Copyright (C) 2024 Dremio + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.projectnessie.versioned.storage.cassandra2; + +import org.projectnessie.versioned.storage.cassandra2tests.CassandraBackendTestFactory; +import org.projectnessie.versioned.storage.commontests.AbstractVersionStoreTests; +import org.projectnessie.versioned.storage.testextension.NessieBackend; + +@NessieBackend(CassandraBackendTestFactory.class) +public class ITCassandraVersionStore extends AbstractVersionStoreTests {} diff --git a/versioned/storage/cassandra2/src/intTest/java/org/projectnessie/versioned/storage/cassandra2/ITScyllaDBPersist.java b/versioned/storage/cassandra2/src/intTest/java/org/projectnessie/versioned/storage/cassandra2/ITScyllaDBPersist.java new file mode 100644 index 00000000000..a91595959b6 --- /dev/null +++ b/versioned/storage/cassandra2/src/intTest/java/org/projectnessie/versioned/storage/cassandra2/ITScyllaDBPersist.java @@ -0,0 +1,29 @@ +/* + * Copyright (C) 2024 Dremio + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.projectnessie.versioned.storage.cassandra2; + +import org.junit.jupiter.api.condition.DisabledOnOs; +import org.junit.jupiter.api.condition.OS; +import org.projectnessie.versioned.storage.cassandra2tests.ScyllaDBBackendTestFactory; +import org.projectnessie.versioned.storage.commontests.AbstractPersistTests; +import org.projectnessie.versioned.storage.testextension.NessieBackend; + +@DisabledOnOs( + value = OS.MAC, + disabledReason = + "ScyllaDB fails to start, see https://github.com/scylladb/scylladb/issues/10135") +@NessieBackend(ScyllaDBBackendTestFactory.class) +public class ITScyllaDBPersist extends AbstractPersistTests {} diff --git a/versioned/storage/cassandra2/src/intTest/java/org/projectnessie/versioned/storage/cassandra2/ITScyllaDBVersionStore.java b/versioned/storage/cassandra2/src/intTest/java/org/projectnessie/versioned/storage/cassandra2/ITScyllaDBVersionStore.java new file mode 100644 index 00000000000..64c18690f0f --- /dev/null +++ b/versioned/storage/cassandra2/src/intTest/java/org/projectnessie/versioned/storage/cassandra2/ITScyllaDBVersionStore.java @@ -0,0 +1,29 @@ +/* + * Copyright (C) 2024 Dremio + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.projectnessie.versioned.storage.cassandra2; + +import org.junit.jupiter.api.condition.DisabledOnOs; +import org.junit.jupiter.api.condition.OS; +import org.projectnessie.versioned.storage.cassandra2tests.ScyllaDBBackendTestFactory; +import org.projectnessie.versioned.storage.commontests.AbstractVersionStoreTests; +import org.projectnessie.versioned.storage.testextension.NessieBackend; + +@DisabledOnOs( + value = OS.MAC, + disabledReason = + "ScyllaDB fails to start, see https://github.com/scylladb/scylladb/issues/10135") +@NessieBackend(ScyllaDBBackendTestFactory.class) +public class ITScyllaDBVersionStore extends AbstractVersionStoreTests {} diff --git a/versioned/storage/cassandra2/src/main/java/org/projectnessie/versioned/storage/cassandra2/Cassandra2Backend.java b/versioned/storage/cassandra2/src/main/java/org/projectnessie/versioned/storage/cassandra2/Cassandra2Backend.java new file mode 100644 index 00000000000..74fe0fc915b --- /dev/null +++ b/versioned/storage/cassandra2/src/main/java/org/projectnessie/versioned/storage/cassandra2/Cassandra2Backend.java @@ -0,0 +1,529 @@ +/* + * Copyright (C) 2024 Dremio + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.projectnessie.versioned.storage.cassandra2; + +import static com.datastax.oss.driver.api.core.ConsistencyLevel.LOCAL_QUORUM; +import static com.datastax.oss.driver.api.core.ConsistencyLevel.LOCAL_SERIAL; +import static com.google.common.base.Preconditions.checkState; +import static com.google.common.collect.ImmutableMap.toImmutableMap; +import static com.google.common.collect.ImmutableSet.toImmutableSet; +import static java.lang.String.format; +import static java.util.Map.entry; +import static java.util.Objects.requireNonNull; +import static java.util.concurrent.TimeUnit.SECONDS; +import static org.projectnessie.versioned.storage.cassandra2.Cassandra2Constants.COL_OBJ_ID; +import static org.projectnessie.versioned.storage.cassandra2.Cassandra2Constants.COL_OBJ_TYPE; +import static org.projectnessie.versioned.storage.cassandra2.Cassandra2Constants.COL_OBJ_VALUE; +import static org.projectnessie.versioned.storage.cassandra2.Cassandra2Constants.COL_OBJ_VERS; +import static org.projectnessie.versioned.storage.cassandra2.Cassandra2Constants.COL_REFS_CREATED_AT; +import static org.projectnessie.versioned.storage.cassandra2.Cassandra2Constants.COL_REFS_DELETED; +import static org.projectnessie.versioned.storage.cassandra2.Cassandra2Constants.COL_REFS_EXTENDED_INFO; +import static org.projectnessie.versioned.storage.cassandra2.Cassandra2Constants.COL_REFS_NAME; +import static org.projectnessie.versioned.storage.cassandra2.Cassandra2Constants.COL_REFS_POINTER; +import static org.projectnessie.versioned.storage.cassandra2.Cassandra2Constants.COL_REFS_PREVIOUS; +import static org.projectnessie.versioned.storage.cassandra2.Cassandra2Constants.COL_REPO_ID; +import static org.projectnessie.versioned.storage.cassandra2.Cassandra2Constants.CREATE_TABLE_OBJS; +import static org.projectnessie.versioned.storage.cassandra2.Cassandra2Constants.CREATE_TABLE_REFS; +import static org.projectnessie.versioned.storage.cassandra2.Cassandra2Constants.ERASE_OBJ; +import static org.projectnessie.versioned.storage.cassandra2.Cassandra2Constants.ERASE_OBJS_SCAN; +import static org.projectnessie.versioned.storage.cassandra2.Cassandra2Constants.ERASE_REF; +import static org.projectnessie.versioned.storage.cassandra2.Cassandra2Constants.ERASE_REFS_SCAN; +import static org.projectnessie.versioned.storage.cassandra2.Cassandra2Constants.MAX_CONCURRENT_BATCH_READS; +import static org.projectnessie.versioned.storage.cassandra2.Cassandra2Constants.MAX_CONCURRENT_DELETES; +import static org.projectnessie.versioned.storage.cassandra2.Cassandra2Constants.SELECT_BATCH_SIZE; +import static org.projectnessie.versioned.storage.cassandra2.Cassandra2Constants.TABLE_OBJS; +import static org.projectnessie.versioned.storage.cassandra2.Cassandra2Constants.TABLE_REFS; + +import com.datastax.oss.driver.api.core.AllNodesFailedException; +import com.datastax.oss.driver.api.core.CqlSession; +import com.datastax.oss.driver.api.core.DriverException; +import com.datastax.oss.driver.api.core.DriverTimeoutException; +import com.datastax.oss.driver.api.core.cql.AsyncResultSet; +import com.datastax.oss.driver.api.core.cql.BoundStatement; +import com.datastax.oss.driver.api.core.cql.BoundStatementBuilder; +import com.datastax.oss.driver.api.core.cql.PreparedStatement; +import com.datastax.oss.driver.api.core.cql.ResultSet; +import com.datastax.oss.driver.api.core.cql.Row; +import com.datastax.oss.driver.api.core.cql.SimpleStatement; +import com.datastax.oss.driver.api.core.metadata.Metadata; +import com.datastax.oss.driver.api.core.metadata.schema.ColumnMetadata; +import com.datastax.oss.driver.api.core.metadata.schema.KeyspaceMetadata; +import com.datastax.oss.driver.api.core.metadata.schema.TableMetadata; +import com.datastax.oss.driver.api.core.servererrors.QueryConsistencyException; +import jakarta.annotation.Nonnull; +import java.lang.reflect.Array; +import java.nio.ByteBuffer; +import java.util.ArrayList; +import java.util.Collection; +import java.util.List; +import java.util.Map; +import java.util.Map.Entry; +import java.util.Optional; +import java.util.Set; +import java.util.concurrent.CompletionStage; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.Semaphore; +import java.util.concurrent.atomic.AtomicLong; +import java.util.concurrent.atomic.AtomicReferenceArray; +import java.util.function.BiFunction; +import java.util.function.Consumer; +import java.util.function.Function; +import java.util.stream.Collectors; +import java.util.stream.Stream; +import org.agrona.collections.Hashing; +import org.agrona.collections.Object2IntHashMap; +import org.projectnessie.versioned.storage.common.exceptions.UnknownOperationResultException; +import org.projectnessie.versioned.storage.common.persist.Backend; +import org.projectnessie.versioned.storage.common.persist.PersistFactory; + +public final class Cassandra2Backend implements Backend { + + private final Cassandra2BackendConfig config; + private final boolean closeClient; + + private final Map statements = new ConcurrentHashMap<>(); + private final CqlSession session; + + public Cassandra2Backend(Cassandra2BackendConfig config, boolean closeClient) { + this.config = config; + this.session = requireNonNull(config.client()); + this.closeClient = closeClient; + } + + BatchedQuery newBatchedQuery( + Function, CompletionStage> queryBuilder, + Function rowToResult, + Function idExtractor, + int results, + Class elementType) { + return new BatchedQueryImpl<>(queryBuilder, rowToResult, idExtractor, results, elementType); + } + + interface BatchedQuery extends AutoCloseable { + void add(K key, int index); + + R[] finish(); + + @Override + void close(); + } + + private static final class BatchedQueryImpl implements BatchedQuery { + + private static final AtomicLong ID_GEN = new AtomicLong(); + private static final long BATCH_TIMEOUT_MILLIS = SECONDS.toMillis(30); + private final long id; + private final Function, CompletionStage> queryBuilder; + private final List keys = new ArrayList<>(); + private final Semaphore permits = new Semaphore(MAX_CONCURRENT_BATCH_READS); + private final Function rowToResult; + private final Function idExtractor; + private final Object2IntHashMap idToIndex; + private final AtomicReferenceArray result; + private final Class elementType; + private volatile Throwable failure; + private volatile int queryCount; + private volatile int queriesCompleted; + // A "hard", long timeout that's reset for every new submitted query. + private volatile long timeoutAt; + + BatchedQueryImpl( + Function, CompletionStage> queryBuilder, + Function rowToResult, + Function idExtractor, + int results, + Class elementType) { + this.idToIndex = new Object2IntHashMap<>(results * 2, Hashing.DEFAULT_LOAD_FACTOR, -1); + this.result = new AtomicReferenceArray<>(results); + this.elementType = elementType; + this.rowToResult = rowToResult; + this.idExtractor = idExtractor; + this.queryBuilder = queryBuilder; + this.id = ID_GEN.incrementAndGet(); + setNewTimeout(); + } + + private void setNewTimeout() { + this.timeoutAt = System.currentTimeMillis() + BATCH_TIMEOUT_MILLIS; + } + + @Override + public void add(K key, int index) { + idToIndex.put(key, index); + keys.add(key); + if (keys.size() == SELECT_BATCH_SIZE) { + flush(); + } + } + + private void noteException(Throwable ex) { + synchronized (this) { + Throwable curr = failure; + if (curr != null) { + curr.addSuppressed(ex); + } else { + failure = ex; + } + } + } + + private void flush() { + if (keys.isEmpty()) { + return; + } + + List batchKeys = new ArrayList<>(keys); + keys.clear(); + + synchronized (this) { + queryCount++; + } + + Consumer> terminate = + query -> { + // Remove the completed query from the queue, so another query can be submitted + permits.release(); + // Increment the number of completed queries and notify the "driver" + synchronized (this) { + queriesCompleted++; + this.notify(); + } + }; + + CompletionStage query = queryBuilder.apply(batchKeys); + + BiFunction pageHandler = + new BiFunction<>() { + @Override + public Object apply(AsyncResultSet rs, Throwable ex) { + if (ex != null) { + noteException(ex); + terminate.accept(query); + } else { + try { + for (Row row : rs.currentPage()) { + R resultItem = rowToResult.apply(row); + if (resultItem != null) { + K id = idExtractor.apply(resultItem); + int i = idToIndex.getValue(id); + if (i != -1) { + result.set(i, resultItem); + } + } + } + + if (rs.hasMorePages()) { + rs.fetchNextPage().handleAsync(this); + } else { + terminate.accept(query); + } + + } catch (Throwable t) { + noteException(t); + terminate.accept(query); + } + } + return null; + } + }; + + try { + permits.acquire(); + setNewTimeout(); + } catch (InterruptedException e) { + throw new RuntimeException(e); + } + + query.handleAsync(pageHandler); + } + + @Override + public void close() { + finish(); + } + + @Override + public R[] finish() { + flush(); + + while (true) { + synchronized (this) { + // If a failure happened, there's not much that can be done, just re-throw + Throwable f = failure; + if (f != null) { + if (f instanceof RuntimeException) { + throw (RuntimeException) f; + } + throw new RuntimeException(f); + } else if (queriesCompleted == queryCount) { + // No failure, all queries completed. + break; + } + + // Such a timeout should really never happen, it indicates a bug in the code above. + checkState( + System.currentTimeMillis() < timeoutAt, + "Batched Cassandra queries bcq%s timed out: completed: %s, queries: %s", + id, + queriesCompleted, + queryCount); + + try { + this.wait(10); + } catch (InterruptedException e) { + throw new RuntimeException(e); + } + } + } + + return resultToArray(); + } + + private R[] resultToArray() { + int l = result.length(); + @SuppressWarnings("unchecked") + R[] r = (R[]) Array.newInstance(elementType, l); + for (int i = 0; i < l; i++) { + r[i] = result.get(i); + } + return r; + } + } + + @Nonnull + BoundStatement buildStatement(String cql, boolean idempotent, Object... values) { + PreparedStatement prepared = + statements.computeIfAbsent(cql, c -> session.prepare(format(c, config.keyspace()))); + return prepared + .boundStatementBuilder(values) + .setTimeout(config.dmlTimeout()) + .setConsistencyLevel(LOCAL_QUORUM) + .setSerialConsistencyLevel(LOCAL_SERIAL) + .setIdempotence(idempotent) + .build(); + } + + @Nonnull + BoundStatementBuilder newBoundStatementBuilder(String cql, boolean idempotent) { + PreparedStatement prepared = + statements.computeIfAbsent(cql, c -> session.prepare(format(c, config.keyspace()))); + return prepared.boundStatementBuilder().setIdempotence(idempotent); + } + + boolean executeCas(BoundStatement stmt) { + try { + ResultSet rs = session.execute(stmt); + return rs.wasApplied(); + } catch (DriverException e) { + throw unhandledException(e); + } + } + + ResultSet execute(BoundStatement stmt) { + try { + return session.execute(stmt); + } catch (DriverException e) { + throw unhandledException(e); + } + } + + CompletionStage executeAsync(BoundStatement stmt) { + return session.executeAsync(stmt); + } + + static RuntimeException unhandledException(DriverException e) { + if (isUnknownOperationResult(e)) { + return new UnknownOperationResultException(e); + } else if (e instanceof AllNodesFailedException) { + AllNodesFailedException all = (AllNodesFailedException) e; + if (all.getAllErrors().values().stream() + .flatMap(List::stream) + .filter(DriverException.class::isInstance) + .map(DriverException.class::cast) + .anyMatch(Cassandra2Backend::isUnknownOperationResult)) { + return new UnknownOperationResultException(e); + } + } + return e; + } + + private static boolean isUnknownOperationResult(DriverException e) { + return e instanceof QueryConsistencyException || e instanceof DriverTimeoutException; + } + + @Override + @Nonnull + public PersistFactory createFactory() { + return new Cassandra2PersistFactory(this); + } + + @Override + public void close() { + if (closeClient) { + session.close(); + } + } + + @Override + public Optional setupSchema() { + Metadata metadata = session.getMetadata(); + Optional keyspace = metadata.getKeyspace(config.keyspace()); + + checkState( + keyspace.isPresent(), + "Cassandra Keyspace '%s' must exist, but does not exist.", + config.keyspace()); + + createTableIfNotExists( + keyspace.get(), + TABLE_REFS, + CREATE_TABLE_REFS, + Stream.of( + COL_REPO_ID, + COL_REFS_NAME, + COL_REFS_POINTER, + COL_REFS_DELETED, + COL_REFS_CREATED_AT, + COL_REFS_EXTENDED_INFO, + COL_REFS_PREVIOUS) + .collect(toImmutableSet()), + List.of(COL_REPO_ID, COL_REFS_NAME)); + createTableIfNotExists( + keyspace.get(), + TABLE_OBJS, + CREATE_TABLE_OBJS, + Stream.of(COL_REPO_ID, COL_OBJ_ID, COL_OBJ_TYPE, COL_OBJ_VERS, COL_OBJ_VALUE) + .collect(toImmutableSet()), + List.of(COL_REPO_ID, COL_OBJ_ID)); + return Optional.of( + "keyspace: " + + config.keyspace() + + " DDL timeout: " + + config.ddlTimeout() + + " DML timeout: " + + config.dmlTimeout()); + } + + private void createTableIfNotExists( + KeyspaceMetadata meta, + String tableName, + String createTable, + Set expectedColumns, + List expectedPrimaryKey) { + + Optional table = meta.getTable(tableName); + + createTable = format(createTable, meta.getName()); + + if (table.isPresent()) { + + checkState( + checkPrimaryKey(table.get(), expectedPrimaryKey), + "Expected primary key columns %s do not match existing primary key columns %s for table '%s'. DDL template:\n%s", + expectedPrimaryKey.stream() + .map(col -> entry(col.name(), col.type().dataType())) + .collect(toImmutableMap(Entry::getKey, Entry::getValue)), + table.get().getPartitionKey().stream() + .map(col -> entry(col.getName(), col.getType())) + .collect(toImmutableMap(Entry::getKey, Entry::getValue)), + tableName, + createTable); + + List missingColumns = checkColumns(table.get(), expectedColumns); + if (!missingColumns.isEmpty()) { + throw new IllegalStateException( + format( + "The database table %s is missing mandatory columns %s.%nFound columns : %s%nExpected columns : %s%nDDL template:\n%s", + tableName, + sortedColumnNames(missingColumns), + sortedColumnNames(table.get().getColumns().keySet()), + sortedColumnNames(expectedColumns), + createTable)); + } + + // Existing table looks compatible + return; + } + + SimpleStatement stmt = + SimpleStatement.builder(createTable).setTimeout(config.ddlTimeout()).build(); + session.execute(stmt); + } + + private static String sortedColumnNames(Collection input) { + return input.stream().map(Object::toString).sorted().collect(Collectors.joining(",")); + } + + private boolean checkPrimaryKey(TableMetadata table, List expectedPrimaryKey) { + List partitionKey = table.getPartitionKey(); + if (partitionKey.size() == expectedPrimaryKey.size()) { + for (int i = 0; i < partitionKey.size(); i++) { + ColumnMetadata column = partitionKey.get(i); + CqlColumn expectedColumn = expectedPrimaryKey.get(i); + if (!column.getName().asInternal().equals(expectedColumn.name()) + || !column.getType().equals(expectedColumn.type().dataType())) { + return false; + } + } + return true; + } + return false; + } + + private List checkColumns(TableMetadata table, Set expectedColumns) { + List missing = new ArrayList<>(); + for (CqlColumn expectedColumn : expectedColumns) { + if (table.getColumn(expectedColumn.name()).isEmpty()) { + missing.add(expectedColumn.name()); + } + } + return missing; + } + + @Override + public void eraseRepositories(Set repositoryIds) { + if (repositoryIds == null || repositoryIds.isEmpty()) { + return; + } + + ArrayList repoIdList = new ArrayList<>(repositoryIds); + + try (LimitedConcurrentRequests requests = + new LimitedConcurrentRequests(MAX_CONCURRENT_DELETES)) { + for (Row row : execute(buildStatement(ERASE_REFS_SCAN, true, repoIdList))) { + String repoId = row.getString(0); + String ref = row.getString(1); + requests.submitted(executeAsync(buildStatement(ERASE_REF, true, repoId, ref))); + } + + for (Row row : execute(buildStatement(ERASE_OBJS_SCAN, true, repoIdList))) { + String repoId = row.getString(0); + ByteBuffer objId = row.getByteBuffer(1); + requests.submitted(executeAsync(buildStatement(ERASE_OBJ, true, repoId, objId))); + } + } + // We must ensure that the system clock advances a little, so that C*'s next write-timestamp + // does not collide with the write-timestamps of the DELETE statements above. Otherwise, the + // above DELETEs will silently "overrule" a following INSERT/UPDATE statement. In C*, if a + // DELETE and another INSERT/UPDATE have the same write-timestamp, the DELETE wins. This makes + // Nessie tests fail on machines that are "fast enough". + try { + Thread.sleep(2L); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + } + } +} diff --git a/versioned/storage/cassandra2/src/main/java/org/projectnessie/versioned/storage/cassandra2/Cassandra2BackendConfig.java b/versioned/storage/cassandra2/src/main/java/org/projectnessie/versioned/storage/cassandra2/Cassandra2BackendConfig.java new file mode 100644 index 00000000000..dceeaa45293 --- /dev/null +++ b/versioned/storage/cassandra2/src/main/java/org/projectnessie/versioned/storage/cassandra2/Cassandra2BackendConfig.java @@ -0,0 +1,46 @@ +/* + * Copyright (C) 2024 Dremio + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.projectnessie.versioned.storage.cassandra2; + +import com.datastax.oss.driver.api.core.CqlSession; +import java.time.Duration; +import org.immutables.value.Value; + +@Value.Immutable +public interface Cassandra2BackendConfig extends Cassandra2Config { + CqlSession client(); + + @Value.Default + default String keyspace() { + return "nessie"; + } + + @Override + @Value.Default + default Duration ddlTimeout() { + return Duration.parse(DEFAULT_DDL_TIMEOUT); + } + + @Override + @Value.Default + default Duration dmlTimeout() { + return Duration.parse(DEFAULT_DML_TIMEOUT); + } + + static ImmutableCassandra2BackendConfig.Builder builder() { + return ImmutableCassandra2BackendConfig.builder(); + } +} diff --git a/versioned/storage/cassandra2/src/main/java/org/projectnessie/versioned/storage/cassandra2/Cassandra2BackendFactory.java b/versioned/storage/cassandra2/src/main/java/org/projectnessie/versioned/storage/cassandra2/Cassandra2BackendFactory.java new file mode 100644 index 00000000000..f5f0bb73d5b --- /dev/null +++ b/versioned/storage/cassandra2/src/main/java/org/projectnessie/versioned/storage/cassandra2/Cassandra2BackendFactory.java @@ -0,0 +1,42 @@ +/* + * Copyright (C) 2024 Dremio + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.projectnessie.versioned.storage.cassandra2; + +import jakarta.annotation.Nonnull; +import org.projectnessie.versioned.storage.common.persist.BackendFactory; + +public class Cassandra2BackendFactory implements BackendFactory { + + public static final String NAME = "Cassandra"; + + @Override + @Nonnull + public String name() { + return NAME; + } + + @Override + @Nonnull + public Cassandra2BackendConfig newConfigInstance() { + return Cassandra2BackendConfig.builder().build(); + } + + @Override + @Nonnull + public Cassandra2Backend buildBackend(@Nonnull Cassandra2BackendConfig config) { + return new Cassandra2Backend(config, false); + } +} diff --git a/versioned/storage/cassandra2/src/main/java/org/projectnessie/versioned/storage/cassandra2/Cassandra2Config.java b/versioned/storage/cassandra2/src/main/java/org/projectnessie/versioned/storage/cassandra2/Cassandra2Config.java new file mode 100644 index 00000000000..27904b316c4 --- /dev/null +++ b/versioned/storage/cassandra2/src/main/java/org/projectnessie/versioned/storage/cassandra2/Cassandra2Config.java @@ -0,0 +1,31 @@ +/* + * Copyright (C) 2024 Dremio + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.projectnessie.versioned.storage.cassandra2; + +import java.time.Duration; + +public interface Cassandra2Config { + + /** Timeout used when creating tables. */ + Duration ddlTimeout(); + + /** Timeout used for queries and updates. */ + Duration dmlTimeout(); + + String DEFAULT_DDL_TIMEOUT = "PT5S"; + + String DEFAULT_DML_TIMEOUT = "PT3S"; +} diff --git a/versioned/storage/cassandra2/src/main/java/org/projectnessie/versioned/storage/cassandra2/Cassandra2Constants.java b/versioned/storage/cassandra2/src/main/java/org/projectnessie/versioned/storage/cassandra2/Cassandra2Constants.java new file mode 100644 index 00000000000..a998c8f086e --- /dev/null +++ b/versioned/storage/cassandra2/src/main/java/org/projectnessie/versioned/storage/cassandra2/Cassandra2Constants.java @@ -0,0 +1,349 @@ +/* + * Copyright (C) 2024 Dremio + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.projectnessie.versioned.storage.cassandra2; + +public final class Cassandra2Constants { + + static final int SELECT_BATCH_SIZE = 20; + static final int MAX_CONCURRENT_BATCH_READS = 20; + static final int MAX_CONCURRENT_DELETES = 20; + static final int MAX_CONCURRENT_STORES = 20; + + static final String TABLE_REFS = "refs2"; + static final String TABLE_OBJS = "objs2"; + + static final CqlColumn COL_REPO_ID = new CqlColumn("repo", CqlColumnType.NAME); + static final CqlColumn COL_OBJ_ID = new CqlColumn("obj_id", CqlColumnType.OBJ_ID); + static final CqlColumn COL_OBJ_TYPE = new CqlColumn("obj_type", CqlColumnType.NAME); + static final CqlColumn COL_OBJ_VERS = new CqlColumn("obj_vers", CqlColumnType.VARCHAR); + static final CqlColumn COL_OBJ_VALUE = new CqlColumn("obj_value", CqlColumnType.VARBINARY); + + static final String DELETE_OBJ = + "DELETE FROM %s." + TABLE_OBJS + " WHERE " + COL_REPO_ID + "=? AND " + COL_OBJ_ID + "=?"; + + public static final String EXPECTED_SUFFIX = "_expected"; + + static final String UPSERT_OBJ = + "INSERT INTO %s." + + TABLE_OBJS + + " (" + + COL_REPO_ID + + ", " + + COL_OBJ_ID + + ", " + + COL_OBJ_TYPE + + ", " + + COL_OBJ_VERS + + ", " + + COL_OBJ_VALUE + + ") VALUES (:" + + COL_REPO_ID + + ", :" + + COL_OBJ_ID + + ", :" + + COL_OBJ_TYPE + + ", :" + + COL_OBJ_VERS + + ", :" + + COL_OBJ_VALUE + + ")"; + + static final String STORE_OBJ = UPSERT_OBJ + " IF NOT EXISTS"; + + static final String UPDATE_OBJ = + "UPDATE %s." + + TABLE_OBJS + + " SET " + + COL_OBJ_VERS + + "=:" + + COL_OBJ_VERS + + ", " + + COL_OBJ_VALUE + + "=:" + + COL_OBJ_VALUE + + " WHERE " + + COL_REPO_ID + + "=:" + + COL_REPO_ID + + " AND " + + COL_OBJ_ID + + "=:" + + COL_OBJ_ID + + " IF " + + COL_OBJ_TYPE + + "=:" + + COL_OBJ_TYPE + + EXPECTED_SUFFIX + + " AND " + + COL_OBJ_VERS + + "=:" + + COL_OBJ_VERS + + EXPECTED_SUFFIX; + + static final String DELETE_OBJ_CONDITIONAL = + "DELETE FROM %s." + + TABLE_OBJS + + " WHERE " + + COL_REPO_ID + + "=? AND " + + COL_OBJ_ID + + "=? IF " + + COL_OBJ_TYPE + + "=? AND " + + COL_OBJ_VERS + + "=?"; + + static final String CREATE_TABLE_OBJS = + "CREATE TABLE %s." + + TABLE_OBJS + + "\n (\n " + + COL_REPO_ID + + " " + + COL_REPO_ID.type().cqlName() + + ",\n " + + COL_OBJ_ID + + " " + + COL_OBJ_ID.type().cqlName() + + ",\n " + + COL_OBJ_TYPE + + " " + + COL_OBJ_TYPE.type().cqlName() + + ",\n " + + COL_OBJ_VERS + + " " + + COL_OBJ_VERS.type().cqlName() + + ",\n " + + COL_OBJ_VALUE + + " " + + COL_OBJ_VALUE.type().cqlName() + + ",\n PRIMARY KEY ((" + + COL_REPO_ID + + ", " + + COL_OBJ_ID + + "))\n )"; + + static final CqlColumn COL_REFS_NAME = new CqlColumn("ref_name", CqlColumnType.NAME); + static final CqlColumn COL_REFS_POINTER = new CqlColumn("pointer", CqlColumnType.OBJ_ID); + static final CqlColumn COL_REFS_DELETED = new CqlColumn("deleted", CqlColumnType.BOOL); + static final CqlColumn COL_REFS_CREATED_AT = new CqlColumn("created_at", CqlColumnType.BIGINT); + static final CqlColumn COL_REFS_EXTENDED_INFO = new CqlColumn("ext_info", CqlColumnType.OBJ_ID); + static final CqlColumn COL_REFS_PREVIOUS = new CqlColumn("prev_ptr", CqlColumnType.VARBINARY); + + static final String UPDATE_REFERENCE_POINTER = + "UPDATE %s." + + TABLE_REFS + + " SET " + + COL_REFS_POINTER + + "=?, " + + COL_REFS_PREVIOUS + + "=? WHERE " + + COL_REPO_ID + + "=? AND " + + COL_REFS_NAME + + "=? IF " + + COL_REFS_POINTER + + "=? AND " + + COL_REFS_DELETED + + "=? AND " + + COL_REFS_CREATED_AT + + "=? AND " + + COL_REFS_EXTENDED_INFO + + "=?"; + static final String PURGE_REFERENCE = + "DELETE FROM %s." + + TABLE_REFS + + " WHERE " + + COL_REPO_ID + + "=? AND " + + COL_REFS_NAME + + "=? IF " + + COL_REFS_POINTER + + "=? AND " + + COL_REFS_DELETED + + "=? AND " + + COL_REFS_CREATED_AT + + "=? AND " + + COL_REFS_EXTENDED_INFO + + "=?"; + static final String MARK_REFERENCE_AS_DELETED = + "UPDATE %s." + + TABLE_REFS + + " SET " + + COL_REFS_DELETED + + "=? WHERE " + + COL_REPO_ID + + "=? AND " + + COL_REFS_NAME + + "=? IF " + + COL_REFS_POINTER + + "=? AND " + + COL_REFS_DELETED + + "=? AND " + + COL_REFS_CREATED_AT + + "=? AND " + + COL_REFS_EXTENDED_INFO + + "=?"; + static final String ADD_REFERENCE = + "INSERT INTO %s." + + TABLE_REFS + + " (" + + COL_REPO_ID + + ", " + + COL_REFS_NAME + + ", " + + COL_REFS_POINTER + + ", " + + COL_REFS_DELETED + + ", " + + COL_REFS_CREATED_AT + + ", " + + COL_REFS_EXTENDED_INFO + + ", " + + COL_REFS_PREVIOUS + + ") VALUES (?, ?, ?, ?, ?, ?, ?) IF NOT EXISTS"; + static final String FIND_REFERENCES = + "SELECT " + + COL_REFS_NAME + + ", " + + COL_REFS_POINTER + + ", " + + COL_REFS_DELETED + + ", " + + COL_REFS_CREATED_AT + + ", " + + COL_REFS_EXTENDED_INFO + + ", " + + COL_REFS_PREVIOUS + + " FROM %s." + + TABLE_REFS + + " WHERE " + + COL_REPO_ID + + "=? AND " + + COL_REFS_NAME + + " IN ?"; + static final String CREATE_TABLE_REFS = + "CREATE TABLE %s." + + TABLE_REFS + + "\n (\n " + + COL_REPO_ID + + " " + + COL_REPO_ID.type().cqlName() + + ",\n " + + COL_REFS_NAME + + " " + + COL_REFS_NAME.type().cqlName() + + ",\n " + + COL_REFS_POINTER + + " " + + COL_REFS_POINTER.type().cqlName() + + ",\n " + + COL_REFS_DELETED + + " " + + COL_REFS_DELETED.type().cqlName() + + ",\n " + + COL_REFS_CREATED_AT + + " " + + COL_REFS_CREATED_AT.type().cqlName() + + ",\n " + + COL_REFS_EXTENDED_INFO + + " " + + COL_REFS_EXTENDED_INFO.type().cqlName() + + ",\n " + + COL_REFS_PREVIOUS + + " " + + COL_REFS_PREVIOUS.type().cqlName() + + ",\n PRIMARY KEY ((" + + COL_REPO_ID + + ", " + + COL_REFS_NAME + + "))\n )"; + + static final String FETCH_OBJ_TYPE = + "SELECT " + + COL_OBJ_TYPE + + " FROM %s." + + TABLE_OBJS + + " WHERE " + + COL_REPO_ID + + "=? AND " + + COL_OBJ_ID + + " IN ?"; + + static final String FIND_OBJS = + "SELECT " + + COL_OBJ_ID + + ", " + + COL_OBJ_TYPE + + ", " + + COL_OBJ_VERS + + ", " + + COL_OBJ_VALUE + + " FROM %s." + + TABLE_OBJS + + " WHERE " + + COL_REPO_ID + + "=? AND " + + COL_OBJ_ID + + " IN ?"; + + static final String SCAN_OBJS = + "SELECT " + + COL_OBJ_ID + + ", " + + COL_OBJ_TYPE + + ", " + + COL_OBJ_VERS + + ", " + + COL_OBJ_VALUE + + " FROM %s." + + TABLE_OBJS + + " WHERE " + + COL_REPO_ID + + "=? ALLOW FILTERING"; + + static final String ERASE_OBJS_SCAN = + "SELECT " + + COL_REPO_ID + + ", " + + COL_OBJ_ID + + " FROM %s." + + TABLE_OBJS + + " WHERE " + + COL_REPO_ID + + " IN ? ALLOW FILTERING"; + static final String ERASE_REFS_SCAN = + "SELECT " + + COL_REPO_ID + + ", " + + COL_REFS_NAME + + " FROM %s." + + TABLE_REFS + + " WHERE " + + COL_REPO_ID + + " IN ? ALLOW FILTERING"; + + static final String ERASE_OBJ = DELETE_OBJ; + static final String ERASE_REF = + "DELETE FROM %s." + + TABLE_REFS + + " WHERE " + + COL_REPO_ID + + " = ? AND " + + COL_REFS_NAME + + " = ?"; + + private Cassandra2Constants() {} +} diff --git a/versioned/storage/cassandra2/src/main/java/org/projectnessie/versioned/storage/cassandra2/Cassandra2Persist.java b/versioned/storage/cassandra2/src/main/java/org/projectnessie/versioned/storage/cassandra2/Cassandra2Persist.java new file mode 100644 index 00000000000..8d74200a0cf --- /dev/null +++ b/versioned/storage/cassandra2/src/main/java/org/projectnessie/versioned/storage/cassandra2/Cassandra2Persist.java @@ -0,0 +1,529 @@ +/* + * Copyright (C) 2024 Dremio + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.projectnessie.versioned.storage.cassandra2; + +import static com.google.common.base.Preconditions.checkArgument; +import static java.util.Collections.singleton; +import static java.util.Collections.singletonList; +import static java.util.Objects.requireNonNull; +import static org.projectnessie.versioned.storage.cassandra2.Cassandra2Backend.unhandledException; +import static org.projectnessie.versioned.storage.cassandra2.Cassandra2Constants.ADD_REFERENCE; +import static org.projectnessie.versioned.storage.cassandra2.Cassandra2Constants.COL_OBJ_ID; +import static org.projectnessie.versioned.storage.cassandra2.Cassandra2Constants.COL_OBJ_TYPE; +import static org.projectnessie.versioned.storage.cassandra2.Cassandra2Constants.COL_OBJ_VALUE; +import static org.projectnessie.versioned.storage.cassandra2.Cassandra2Constants.COL_OBJ_VERS; +import static org.projectnessie.versioned.storage.cassandra2.Cassandra2Constants.COL_REPO_ID; +import static org.projectnessie.versioned.storage.cassandra2.Cassandra2Constants.DELETE_OBJ; +import static org.projectnessie.versioned.storage.cassandra2.Cassandra2Constants.DELETE_OBJ_CONDITIONAL; +import static org.projectnessie.versioned.storage.cassandra2.Cassandra2Constants.EXPECTED_SUFFIX; +import static org.projectnessie.versioned.storage.cassandra2.Cassandra2Constants.FETCH_OBJ_TYPE; +import static org.projectnessie.versioned.storage.cassandra2.Cassandra2Constants.FIND_OBJS; +import static org.projectnessie.versioned.storage.cassandra2.Cassandra2Constants.FIND_REFERENCES; +import static org.projectnessie.versioned.storage.cassandra2.Cassandra2Constants.MARK_REFERENCE_AS_DELETED; +import static org.projectnessie.versioned.storage.cassandra2.Cassandra2Constants.MAX_CONCURRENT_STORES; +import static org.projectnessie.versioned.storage.cassandra2.Cassandra2Constants.PURGE_REFERENCE; +import static org.projectnessie.versioned.storage.cassandra2.Cassandra2Constants.SCAN_OBJS; +import static org.projectnessie.versioned.storage.cassandra2.Cassandra2Constants.STORE_OBJ; +import static org.projectnessie.versioned.storage.cassandra2.Cassandra2Constants.UPDATE_OBJ; +import static org.projectnessie.versioned.storage.cassandra2.Cassandra2Constants.UPDATE_REFERENCE_POINTER; +import static org.projectnessie.versioned.storage.cassandra2.Cassandra2Constants.UPSERT_OBJ; +import static org.projectnessie.versioned.storage.cassandra2.Cassandra2Serde.deserializeObjId; +import static org.projectnessie.versioned.storage.cassandra2.Cassandra2Serde.serializeObjId; +import static org.projectnessie.versioned.storage.common.persist.ObjTypes.objTypeByName; +import static org.projectnessie.versioned.storage.serialize.ProtoSerialization.deserializeObj; +import static org.projectnessie.versioned.storage.serialize.ProtoSerialization.serializePreviousPointers; + +import com.datastax.oss.driver.api.core.DriverException; +import com.datastax.oss.driver.api.core.cql.AsyncResultSet; +import com.datastax.oss.driver.api.core.cql.BoundStatement; +import com.datastax.oss.driver.api.core.cql.BoundStatementBuilder; +import com.datastax.oss.driver.api.core.cql.Row; +import com.google.common.collect.AbstractIterator; +import jakarta.annotation.Nonnull; +import jakarta.annotation.Nullable; +import java.nio.ByteBuffer; +import java.util.Iterator; +import java.util.List; +import java.util.Set; +import java.util.concurrent.CompletionStage; +import java.util.concurrent.atomic.AtomicIntegerArray; +import java.util.function.Function; +import java.util.stream.Collectors; +import org.projectnessie.versioned.storage.cassandra2.Cassandra2Backend.BatchedQuery; +import org.projectnessie.versioned.storage.common.config.StoreConfig; +import org.projectnessie.versioned.storage.common.exceptions.ObjNotFoundException; +import org.projectnessie.versioned.storage.common.exceptions.ObjTooLargeException; +import org.projectnessie.versioned.storage.common.exceptions.RefAlreadyExistsException; +import org.projectnessie.versioned.storage.common.exceptions.RefConditionFailedException; +import org.projectnessie.versioned.storage.common.exceptions.RefNotFoundException; +import org.projectnessie.versioned.storage.common.objtypes.UpdateableObj; +import org.projectnessie.versioned.storage.common.persist.CloseableIterator; +import org.projectnessie.versioned.storage.common.persist.Obj; +import org.projectnessie.versioned.storage.common.persist.ObjId; +import org.projectnessie.versioned.storage.common.persist.ObjType; +import org.projectnessie.versioned.storage.common.persist.Persist; +import org.projectnessie.versioned.storage.common.persist.Reference; +import org.projectnessie.versioned.storage.serialize.ProtoSerialization; + +public class Cassandra2Persist implements Persist { + + private final Cassandra2Backend backend; + private final StoreConfig config; + + Cassandra2Persist(Cassandra2Backend backend, StoreConfig config) { + this.backend = backend; + this.config = config; + } + + @Nonnull + @Override + public String name() { + return Cassandra2BackendFactory.NAME; + } + + @Nonnull + @Override + public StoreConfig config() { + return config; + } + + @Override + public Reference fetchReference(@Nonnull String name) { + return fetchReferences(new String[] {name})[0]; + } + + @Nonnull + @Override + public Reference[] fetchReferences(@Nonnull String[] names) { + try (BatchedQuery batchedQuery = + backend.newBatchedQuery( + keys -> + backend.executeAsync( + backend.buildStatement(FIND_REFERENCES, true, config.repositoryId(), keys)), + Cassandra2Serde::deserializeReference, + Reference::name, + names.length, + Reference.class)) { + + for (int i = 0; i < names.length; i++) { + String name = names[i]; + if (name != null) { + batchedQuery.add(name, i); + } + } + + return batchedQuery.finish(); + } catch (DriverException e) { + throw unhandledException(e); + } + } + + @Nonnull + @Override + public Reference addReference(@Nonnull Reference reference) throws RefAlreadyExistsException { + checkArgument(!reference.deleted(), "Deleted references must not be added"); + + byte[] serializedPreviousPointers = serializePreviousPointers(reference.previousPointers()); + ByteBuffer previous = + serializedPreviousPointers != null ? ByteBuffer.wrap(serializedPreviousPointers) : null; + BoundStatement stmt = + backend.buildStatement( + ADD_REFERENCE, + false, + config.repositoryId(), + reference.name(), + serializeObjId(reference.pointer()), + reference.deleted(), + reference.createdAtMicros(), + serializeObjId(reference.extendedInfoObj()), + previous); + if (backend.executeCas(stmt)) { + return reference; + } + throw new RefAlreadyExistsException(fetchReference(reference.name())); + } + + @Nonnull + @Override + public Reference markReferenceAsDeleted(@Nonnull Reference reference) + throws RefNotFoundException, RefConditionFailedException { + BoundStatement stmt = + backend.buildStatement( + MARK_REFERENCE_AS_DELETED, + false, + true, + config().repositoryId(), + reference.name(), + serializeObjId(reference.pointer()), + false, + reference.createdAtMicros(), + serializeObjId(reference.extendedInfoObj())); + if (backend.executeCas(stmt)) { + return reference.withDeleted(true); + } + + Reference ref = fetchReference(reference.name()); + if (ref == null) { + throw new RefNotFoundException(reference); + } + throw new RefConditionFailedException(ref); + } + + @Override + public void purgeReference(@Nonnull Reference reference) + throws RefNotFoundException, RefConditionFailedException { + BoundStatement stmt = + backend.buildStatement( + PURGE_REFERENCE, + false, + config().repositoryId(), + reference.name(), + serializeObjId(reference.pointer()), + true, + reference.createdAtMicros(), + serializeObjId(reference.extendedInfoObj())); + if (!backend.executeCas(stmt)) { + Reference ref = fetchReference(reference.name()); + if (ref == null) { + throw new RefNotFoundException(reference); + } + throw new RefConditionFailedException(ref); + } + } + + @Nonnull + @Override + public Reference updateReferencePointer(@Nonnull Reference reference, @Nonnull ObjId newPointer) + throws RefNotFoundException, RefConditionFailedException { + Reference updated = reference.forNewPointer(newPointer, config); + byte[] serializedPreviousPointers = serializePreviousPointers(updated.previousPointers()); + ByteBuffer previous = + serializedPreviousPointers != null ? ByteBuffer.wrap(serializedPreviousPointers) : null; + BoundStatement stmt = + backend.buildStatement( + UPDATE_REFERENCE_POINTER, + false, + serializeObjId(newPointer), + previous, + config().repositoryId(), + reference.name(), + serializeObjId(reference.pointer()), + false, + reference.createdAtMicros(), + serializeObjId(reference.extendedInfoObj())); + if (!backend.executeCas(stmt)) { + Reference ref = fetchReference(reference.name()); + if (ref == null) { + throw new RefNotFoundException(reference); + } + throw new RefConditionFailedException(ref); + } + + return updated; + } + + @SuppressWarnings("unused") + @Override + @Nonnull + public T fetchTypedObj( + @Nonnull ObjId id, ObjType type, @Nonnull Class typeClass) throws ObjNotFoundException { + T obj = fetchTypedObjsIfExist(new ObjId[] {id}, type, typeClass)[0]; + + if (obj == null || (type != null && !type.equals(obj.type()))) { + throw new ObjNotFoundException(id); + } + + return obj; + } + + @Override + @Nonnull + public ObjType fetchObjType(@Nonnull ObjId id) throws ObjNotFoundException { + BoundStatement stmt = + backend.buildStatement( + FETCH_OBJ_TYPE, true, config.repositoryId(), singletonList(serializeObjId(id))); + Row row = backend.execute(stmt).one(); + if (row != null) { + String objType = requireNonNull(row.getString(0)); + return objTypeByName(objType); + } + throw new ObjNotFoundException(id); + } + + @Nonnull + @Override + public T[] fetchTypedObjsIfExist( + @Nonnull ObjId[] ids, ObjType type, @Nonnull Class typeClass) { + Function, List> idsToByteBuffers = + queryIds -> queryIds.stream().map(ObjId::asByteBuffer).collect(Collectors.toList()); + + Function, CompletionStage> queryFunc = + keys -> + backend.executeAsync( + backend.buildStatement( + FIND_OBJS, true, config.repositoryId(), idsToByteBuffers.apply(keys))); + + Function rowMapper = + row -> { + ObjType objType = objTypeByName(requireNonNull(row.getString(COL_OBJ_TYPE.name()))); + if (type != null && !type.equals(objType)) { + return null; + } + ObjId id = deserializeObjId(row.getByteBuffer(COL_OBJ_ID.name())); + String versionToken = row.getString(COL_OBJ_VERS.name()); + ByteBuffer serialized = row.getByteBuffer(COL_OBJ_VALUE.name()); + @SuppressWarnings("unchecked") + T typed = (T) deserializeObj(id, serialized, versionToken); + return typed; + }; + + T[] r; + try (BatchedQuery batchedQuery = + backend.newBatchedQuery(queryFunc, rowMapper, Obj::id, ids.length, typeClass)) { + + for (int i = 0; i < ids.length; i++) { + ObjId id = ids[i]; + if (id != null) { + batchedQuery.add(id, i); + } + } + + r = batchedQuery.finish(); + } catch (DriverException e) { + throw unhandledException(e); + } + + return r; + } + + @Override + public boolean storeObj(@Nonnull Obj obj, boolean ignoreSoftSizeRestrictions) + throws ObjTooLargeException { + return writeSingleObj(obj, false, ignoreSoftSizeRestrictions, backend::executeCas); + } + + @Nonnull + @Override + public boolean[] storeObjs(@Nonnull Obj[] objs) throws ObjTooLargeException { + return persistObjs(objs, false); + } + + @Override + public void upsertObj(@Nonnull Obj obj) throws ObjTooLargeException { + writeSingleObj(obj, true, false, backend::execute); + } + + @Override + public void upsertObjs(@Nonnull Obj[] objs) throws ObjTooLargeException { + persistObjs(objs, true); + } + + @Override + public boolean deleteConditional(@Nonnull UpdateableObj obj) { + BoundStatement stmt = + backend.buildStatement( + DELETE_OBJ_CONDITIONAL, + false, + config.repositoryId(), + serializeObjId(obj.id()), + obj.type().shortName(), + obj.versionToken()); + return backend.executeCas(stmt); + } + + @Override + public boolean updateConditional(@Nonnull UpdateableObj expected, @Nonnull UpdateableObj newValue) + throws ObjTooLargeException { + ObjId id = expected.id(); + ObjType type = expected.type(); + String expectedVersion = expected.versionToken(); + String newVersion = newValue.versionToken(); + + checkArgument(id != null && id.equals(newValue.id())); + checkArgument(type.equals(newValue.type())); + checkArgument(!expectedVersion.equals(newVersion)); + + byte[] serialized = + ProtoSerialization.serializeObj( + newValue, + effectiveIncrementalIndexSizeLimit(), + effectiveIndexSegmentSizeLimit(), + false); + + BoundStatementBuilder stmt = + backend + .newBoundStatementBuilder(UPDATE_OBJ, false) + .setString(COL_REPO_ID.name(), config.repositoryId()) + .setByteBuffer(COL_OBJ_ID.name(), serializeObjId(id)) + .setString(COL_OBJ_TYPE.name() + EXPECTED_SUFFIX, type.shortName()) + .setString(COL_OBJ_VERS.name() + EXPECTED_SUFFIX, expectedVersion) + .setString(COL_OBJ_VERS.name(), newVersion) + .setByteBuffer(COL_OBJ_VALUE.name(), ByteBuffer.wrap(serialized)); + + return backend.executeCas(stmt.build()); + } + + @Nonnull + private boolean[] persistObjs(@Nonnull Obj[] objs, boolean upsert) throws ObjTooLargeException { + AtomicIntegerArray results = new AtomicIntegerArray(objs.length); + + try (LimitedConcurrentRequests requests = + new LimitedConcurrentRequests(MAX_CONCURRENT_STORES)) { + for (int i = 0; i < objs.length; i++) { + Obj o = objs[i]; + if (o != null) { + int idx = i; + CompletionStage cs = + writeSingleObj(o, upsert, false, backend::executeAsync) + .handle( + (resultSet, e) -> { + if (e != null) { + if (e instanceof DriverException) { + throw unhandledException((DriverException) e); + } + if (e instanceof RuntimeException) { + throw (RuntimeException) e; + } + throw new RuntimeException(e); + } + if (resultSet.wasApplied()) { + results.set(idx, 1); + } + return null; + }); + requests.submitted(cs); + } + } + } catch (DriverException e) { + throw unhandledException(e); + } + + int l = results.length(); + boolean[] array = new boolean[l]; + for (int i = 0; i < l; i++) { + array[i] = results.get(i) == 1; + } + return array; + } + + @FunctionalInterface + private interface WriteSingleObj { + R apply(BoundStatement stmt); + } + + private R writeSingleObj( + @Nonnull Obj obj, + boolean upsert, + boolean ignoreSoftSizeRestrictions, + WriteSingleObj consumer) + throws ObjTooLargeException { + ObjId id = obj.id(); + ObjType type = obj.type(); + String versionToken = UpdateableObj.extractVersionToken(obj).orElse(null); + + int incrementalIndexSizeLimit = + ignoreSoftSizeRestrictions ? Integer.MAX_VALUE : effectiveIncrementalIndexSizeLimit(); + int indexSegmentSizeLimit = + ignoreSoftSizeRestrictions ? Integer.MAX_VALUE : effectiveIndexSegmentSizeLimit(); + + byte[] serialized = + ProtoSerialization.serializeObj( + obj, incrementalIndexSizeLimit, indexSegmentSizeLimit, false); + + BoundStatementBuilder stmt = + backend + .newBoundStatementBuilder(upsert ? UPSERT_OBJ : STORE_OBJ, upsert) + .setString(COL_REPO_ID.name(), config.repositoryId()) + .setByteBuffer(COL_OBJ_ID.name(), serializeObjId(id)) + .setString(COL_OBJ_TYPE.name(), type.shortName()) + .setString(COL_OBJ_VERS.name(), versionToken) + .setByteBuffer(COL_OBJ_VALUE.name(), ByteBuffer.wrap(serialized)); + + return consumer.apply(stmt.build()); + } + + @Override + public void deleteObj(@Nonnull ObjId id) { + BoundStatement stmt = + backend.buildStatement(DELETE_OBJ, true, config.repositoryId(), serializeObjId(id)); + backend.execute(stmt); + } + + @Override + public void deleteObjs(@Nonnull ObjId[] ids) { + try (LimitedConcurrentRequests requests = + new LimitedConcurrentRequests(MAX_CONCURRENT_STORES)) { + String repoId = config.repositoryId(); + for (ObjId id : ids) { + if (id != null) { + BoundStatement stmt = + backend.buildStatement(DELETE_OBJ, true, repoId, serializeObjId(id)); + requests.submitted(backend.executeAsync(stmt)); + } + } + } catch (DriverException e) { + throw unhandledException(e); + } + } + + @Override + public void erase() { + backend.eraseRepositories(singleton(config().repositoryId())); + } + + @Override + @Nonnull + public CloseableIterator scanAllObjects(@Nonnull Set returnedObjTypes) { + return new ScanAllObjectsIterator(returnedObjTypes); + } + + private class ScanAllObjectsIterator extends AbstractIterator + implements CloseableIterator { + + private final Iterator rs; + private final Set returnedObjTypes; + + ScanAllObjectsIterator(Set returnedObjTypes) { + this.returnedObjTypes = returnedObjTypes; + BoundStatement stmt = backend.buildStatement(SCAN_OBJS, true, config.repositoryId()); + rs = backend.execute(stmt).iterator(); + } + + @Override + public void close() {} + + @Nullable + @Override + protected Obj computeNext() { + while (true) { + if (!rs.hasNext()) { + return endOfData(); + } + + Row row = rs.next(); + ObjType type = objTypeByName(requireNonNull(row.getString(1))); + if (!returnedObjTypes.contains(type)) { + continue; + } + + ObjId id = deserializeObjId(row.getByteBuffer(COL_OBJ_ID.name())); + String versionToken = row.getString(COL_OBJ_VERS.name()); + ByteBuffer serialized = row.getByteBuffer(COL_OBJ_VALUE.name()); + return deserializeObj(id, serialized, versionToken); + } + } + } +} diff --git a/versioned/storage/cassandra2/src/main/java/org/projectnessie/versioned/storage/cassandra2/Cassandra2PersistFactory.java b/versioned/storage/cassandra2/src/main/java/org/projectnessie/versioned/storage/cassandra2/Cassandra2PersistFactory.java new file mode 100644 index 00000000000..a132c4e90cf --- /dev/null +++ b/versioned/storage/cassandra2/src/main/java/org/projectnessie/versioned/storage/cassandra2/Cassandra2PersistFactory.java @@ -0,0 +1,35 @@ +/* + * Copyright (C) 2024 Dremio + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.projectnessie.versioned.storage.cassandra2; + +import jakarta.annotation.Nonnull; +import org.projectnessie.versioned.storage.common.config.StoreConfig; +import org.projectnessie.versioned.storage.common.persist.Persist; +import org.projectnessie.versioned.storage.common.persist.PersistFactory; + +final class Cassandra2PersistFactory implements PersistFactory { + private final Cassandra2Backend backend; + + Cassandra2PersistFactory(Cassandra2Backend backend) { + this.backend = backend; + } + + @Override + @Nonnull + public Persist newPersist(@Nonnull StoreConfig config) { + return new Cassandra2Persist(backend, config); + } +} diff --git a/versioned/storage/cassandra2/src/main/java/org/projectnessie/versioned/storage/cassandra2/Cassandra2Serde.java b/versioned/storage/cassandra2/src/main/java/org/projectnessie/versioned/storage/cassandra2/Cassandra2Serde.java new file mode 100644 index 00000000000..7e51d36b033 --- /dev/null +++ b/versioned/storage/cassandra2/src/main/java/org/projectnessie/versioned/storage/cassandra2/Cassandra2Serde.java @@ -0,0 +1,62 @@ +/* + * Copyright (C) 2024 Dremio + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.projectnessie.versioned.storage.cassandra2; + +import static org.projectnessie.nessie.relocated.protobuf.UnsafeByteOperations.unsafeWrap; +import static org.projectnessie.versioned.storage.common.persist.ObjId.objIdFromByteBuffer; +import static org.projectnessie.versioned.storage.serialize.ProtoSerialization.deserializePreviousPointers; + +import com.datastax.oss.driver.api.core.cql.Row; +import java.nio.ByteBuffer; +import org.projectnessie.nessie.relocated.protobuf.ByteString; +import org.projectnessie.versioned.storage.common.persist.ObjId; +import org.projectnessie.versioned.storage.common.persist.Reference; + +public final class Cassandra2Serde { + + private Cassandra2Serde() {} + + public static ByteString deserializeBytes(Row row, String col) { + ByteBuffer bytes = row.getByteBuffer(col); + return bytes != null ? unsafeWrap(bytes) : null; + } + + public static Reference deserializeReference(Row row) { + ByteBuffer previous = row.getByteBuffer(5); + byte[] bytes; + if (previous != null) { + bytes = new byte[previous.remaining()]; + previous.get(bytes); + } else { + bytes = null; + } + return Reference.reference( + row.getString(0), + deserializeObjId(row.getByteBuffer(1)), + row.getBoolean(2), + row.getLong(3), + deserializeObjId(row.getByteBuffer(4)), + deserializePreviousPointers(bytes)); + } + + public static ObjId deserializeObjId(ByteBuffer id) { + return id != null ? objIdFromByteBuffer(id) : null; + } + + public static ByteBuffer serializeObjId(ObjId id) { + return id != null ? id.asByteBuffer() : null; + } +} diff --git a/versioned/storage/cassandra2/src/main/java/org/projectnessie/versioned/storage/cassandra2/CqlColumn.java b/versioned/storage/cassandra2/src/main/java/org/projectnessie/versioned/storage/cassandra2/CqlColumn.java new file mode 100644 index 00000000000..f2a007d4f8e --- /dev/null +++ b/versioned/storage/cassandra2/src/main/java/org/projectnessie/versioned/storage/cassandra2/CqlColumn.java @@ -0,0 +1,56 @@ +/* + * Copyright (C) 2024 Dremio + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.projectnessie.versioned.storage.cassandra2; + +import java.util.Objects; + +public final class CqlColumn { + + private final String name; + + private final CqlColumnType type; + + public CqlColumn(String name, CqlColumnType type) { + this.name = name; + this.type = type; + } + + public String name() { + return name; + } + + public CqlColumnType type() { + return type; + } + + @Override + public String toString() { + return name(); + } + + @Override + public boolean equals(Object o) { + if (this == o) return true; + if (o == null || getClass() != o.getClass()) return false; + CqlColumn cqlColumn = (CqlColumn) o; + return Objects.equals(name, cqlColumn.name) && type == cqlColumn.type; + } + + @Override + public int hashCode() { + return Objects.hash(name, type); + } +} diff --git a/versioned/storage/cassandra2/src/main/java/org/projectnessie/versioned/storage/cassandra2/CqlColumnType.java b/versioned/storage/cassandra2/src/main/java/org/projectnessie/versioned/storage/cassandra2/CqlColumnType.java new file mode 100644 index 00000000000..1e2c3ca5a03 --- /dev/null +++ b/versioned/storage/cassandra2/src/main/java/org/projectnessie/versioned/storage/cassandra2/CqlColumnType.java @@ -0,0 +1,50 @@ +/* + * Copyright (C) 2024 Dremio + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.projectnessie.versioned.storage.cassandra2; + +import com.datastax.oss.driver.api.core.type.DataType; +import com.datastax.oss.driver.api.core.type.DataTypes; + +public enum CqlColumnType { + // 0 + NAME(DataTypes.TEXT), + // 1 + OBJ_ID(DataTypes.BLOB), + // 3 + BOOL(DataTypes.BOOLEAN), + // 4 + VARBINARY(DataTypes.BLOB), + // 5 + BIGINT(DataTypes.BIGINT), + // 6 + VARCHAR(DataTypes.TEXT), + // 7 + INT(DataTypes.INT); + + private final DataType dataType; + + CqlColumnType(DataType dataType) { + this.dataType = dataType; + } + + public String cqlName() { + return dataType.asCql(false, false); + } + + public DataType dataType() { + return dataType; + } +} diff --git a/versioned/storage/cassandra2/src/main/java/org/projectnessie/versioned/storage/cassandra2/LimitedConcurrentRequests.java b/versioned/storage/cassandra2/src/main/java/org/projectnessie/versioned/storage/cassandra2/LimitedConcurrentRequests.java new file mode 100644 index 00000000000..47d13e5f6f2 --- /dev/null +++ b/versioned/storage/cassandra2/src/main/java/org/projectnessie/versioned/storage/cassandra2/LimitedConcurrentRequests.java @@ -0,0 +1,126 @@ +/* + * Copyright (C) 2024 Dremio + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.projectnessie.versioned.storage.cassandra2; + +import com.google.errorprone.annotations.concurrent.GuardedBy; +import java.util.concurrent.CompletionStage; +import java.util.concurrent.Semaphore; +import org.projectnessie.versioned.storage.common.persist.Obj; +import org.projectnessie.versioned.storage.common.persist.ObjId; + +/** + * Synchronization helper for asynchronous "child" queries for {@link Cassandra2Persist#erase()}, + * {@link Cassandra2Persist#deleteObjs(ObjId[])}, {@link Cassandra2Persist#storeObjs(Obj[])}. + * + *

Note: this implementation does not actively prevent submitting a new query, but it prevents + * making further progress by blocking inside {@link #submitted(CompletionStage)}. + */ +final class LimitedConcurrentRequests implements AutoCloseable { + + /** Currently available "permits" for child queries. */ + final Semaphore permits; + + /** Holds the potential failure. */ + final Throwable[] failureHolder = new Throwable[1]; + + /** Number of started queries. */ + @GuardedBy("this") + int started; + + /** Number of finished queries. */ + @GuardedBy("this") + int finished; + + LimitedConcurrentRequests(int maxChildQueries) { + permits = new Semaphore(maxChildQueries); + } + + void submitted(CompletionStage cs) { + synchronized (this) { + // Increment the number of started queries. + started++; + } + + // Acquire a permit for the started query. + try { + permits.acquire(); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + throw new RuntimeException(e); + } + + cs.whenComplete( + (resultSet, throwable) -> { + try { + // Release the acquired permit + permits.release(); + + // Record the failure (if the query failed) + if (throwable != null) { + synchronized (failureHolder) { + Throwable ex = failureHolder[0]; + if (ex == null) { + failureHolder[0] = throwable; + } else { + ex.addSuppressed(throwable); + } + } + } + + } finally { + synchronized (this) { + // Increment the number of finished queries. + finished++; + // Notify potential waiter (`close()`). + notify(); + } + } + }); + } + + @Override + public void close() { + try { + // Wait until all started queries have finished. + while (true) { + synchronized (this) { + if (finished == started) { + break; + } + try { + wait(); + } catch (InterruptedException e) { + throw new RuntimeException(e); + } + } + } + } finally { + maybeThrow(); + } + } + + private void maybeThrow() { + synchronized (failureHolder) { + Throwable f = failureHolder[0]; + if (f != null) { + if (f instanceof RuntimeException) { + throw (RuntimeException) f; + } + throw new RuntimeException(f); + } + } + } +} diff --git a/versioned/storage/cassandra2/src/main/resources/META-INF/services/org.projectnessie.versioned.storage.common.persist.BackendFactory b/versioned/storage/cassandra2/src/main/resources/META-INF/services/org.projectnessie.versioned.storage.common.persist.BackendFactory new file mode 100644 index 00000000000..57dbac110a2 --- /dev/null +++ b/versioned/storage/cassandra2/src/main/resources/META-INF/services/org.projectnessie.versioned.storage.common.persist.BackendFactory @@ -0,0 +1,16 @@ +# +# Copyright (C) 2022 Dremio +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# +org.projectnessie.versioned.storage.cassandra2.Cassandra2BackendFactory diff --git a/versioned/transfer/build.gradle.kts b/versioned/transfer/build.gradle.kts index 9e933cef4e1..5724cc8f1d7 100644 --- a/versioned/transfer/build.gradle.kts +++ b/versioned/transfer/build.gradle.kts @@ -70,6 +70,7 @@ dependencies { testFixturesApi(project(":nessie-versioned-storage-inmemory")) testFixturesApi(project(":nessie-versioned-storage-testextension")) intTestImplementation(project(":nessie-versioned-storage-cassandra")) + intTestImplementation(project(":nessie-versioned-storage-cassandra2")) intTestImplementation(project(":nessie-versioned-storage-dynamodb")) intTestImplementation(project(":nessie-versioned-storage-jdbc")) intTestImplementation(project(":nessie-versioned-storage-jdbc2"))