diff --git a/.github/workflows/scheduled_snyk.yaml b/.github/workflows/scheduled_snyk.yaml
index d3923b3e..2aeb232a 100644
--- a/.github/workflows/scheduled_snyk.yaml
+++ b/.github/workflows/scheduled_snyk.yaml
@@ -2,6 +2,10 @@ name: Snyk scheduled test
 on:
   schedule:
     - cron: '0 2 * * 1'
+  push:
+    branches:
+      - master
+
 jobs:
   security:
     runs-on: ubuntu-latest
@@ -17,7 +21,7 @@ jobs:
       - uses: actions/checkout@v3
       - uses: snyk/actions/setup@master
         with:
-          snyk-version: v1.931.0
+          snyk-version: v1.1032.0
 
       - uses: actions/setup-java@v3
         with:
@@ -34,13 +38,15 @@ jobs:
           snyk test
           --all-sub-projects
           --configuration-matching='^runtimeClasspath$'
+          --fail-on=upgradable
          --json-file-output=${{ env.REPORT_FILE }}
           --org=radar-base
+          --policy-path=$PWD/.snyk
 
       - name: Report new vulnerabilities
         uses: thehyve/report-vulnerability@master
+        if: success() || failure()
         with:
           report-file: ${{ env.REPORT_FILE }}
         env:
           TOKEN: ${{ secrets.GITHUB_TOKEN }}
-        if: ${{ failure() }}
diff --git a/.github/workflows/snyk.yaml b/.github/workflows/snyk.yaml
index 2c8ec962..8731bc7e 100644
--- a/.github/workflows/snyk.yaml
+++ b/.github/workflows/snyk.yaml
@@ -3,6 +3,7 @@ on:
   pull_request:
     branches:
       - master
+
 jobs:
   security:
     runs-on: ubuntu-latest
@@ -15,7 +16,7 @@ jobs:
       - uses: actions/checkout@v3
       - uses: snyk/actions/setup@master
         with:
-          snyk-version: v1.931.0
+          snyk-version: v1.1032.0
 
       - uses: actions/setup-java@v3
         with:
@@ -32,6 +33,5 @@ jobs:
           snyk test
           --all-sub-projects
           --configuration-matching='^runtimeClasspath$'
-          --fail-on=upgradable
           --org=radar-base
-          --severity-threshold=high
+          --policy-path=$PWD/.snyk
diff --git a/java-sdk/build.gradle.kts b/java-sdk/build.gradle.kts
index a330cd26..04e4a2ee 100644
--- a/java-sdk/build.gradle.kts
+++ b/java-sdk/build.gradle.kts
@@ -9,7 +9,7 @@ plugins {
 }
 
 allprojects {
-    version = "0.8.1"
+    version = "0.8.2"
     group = "org.radarbase"
 }
 
@@ -126,7 +126,7 @@ nexusPublishing {
 }
 
 tasks.wrapper {
-    gradleVersion = "7.5.1"
+    gradleVersion = "7.6"
 }
 
 /** Set the given Java [version] for compiled Java and Kotlin code. */
diff --git a/java-sdk/gradle.properties b/java-sdk/gradle.properties
index 8d1885df..c2f80d26 100644
--- a/java-sdk/gradle.properties
+++ b/java-sdk/gradle.properties
@@ -1,23 +1,21 @@
 org.gradle.jvmargs=-XX:MaxMetaspaceSize=512m
-kotlinVersion=1.7.20
-dokkaVersion=1.7.10
+kotlinVersion=1.7.22
+dokkaVersion=1.7.20
 nexusPluginVersion=1.1.0
-dependencyUpdateVersion=0.42.0
-jacksonVersion=2.13.4
+dependencyUpdateVersion=0.44.0
+jacksonVersion=2.14.1
 avroGeneratorVersion=1.5.0
 avroVersion=1.11.1
 argparseVersion=0.9.0
-radarJerseyVersion=0.9.0
+radarJerseyVersion=0.9.1
 junitVersion=5.9.1
-confluentVersion=7.2.2
-kafkaVersion=7.2.2-ce
+confluentVersion=7.3.0
+kafkaVersion=7.3.0-ce
 okHttpVersion=4.10.0
 radarCommonsVersion=0.15.0
-slf4jVersion=2.0.3
+slf4jVersion=2.0.5
 javaxValidationVersion=2.0.1.Final
 jsoupVersion=1.15.3
 log4j2Version=2.19.0
-nettyVersion=4.1.82.Final
-jettyVersion=9.4.48.v20220622
diff --git a/java-sdk/gradle/wrapper/gradle-wrapper.jar b/java-sdk/gradle/wrapper/gradle-wrapper.jar
index 249e5832..943f0cbf 100644
Binary files a/java-sdk/gradle/wrapper/gradle-wrapper.jar and b/java-sdk/gradle/wrapper/gradle-wrapper.jar differ
diff --git a/java-sdk/gradle/wrapper/gradle-wrapper.properties b/java-sdk/gradle/wrapper/gradle-wrapper.properties
index ae04661e..f398c33c 100644
--- a/java-sdk/gradle/wrapper/gradle-wrapper.properties
+++ b/java-sdk/gradle/wrapper/gradle-wrapper.properties
@@ -1,5 +1,6 @@
 distributionBase=GRADLE_USER_HOME
 distributionPath=wrapper/dists
-distributionUrl=https\://services.gradle.org/distributions/gradle-7.5.1-bin.zip
+distributionUrl=https\://services.gradle.org/distributions/gradle-7.6-bin.zip
+networkTimeout=10000
 zipStoreBase=GRADLE_USER_HOME
 zipStorePath=wrapper/dists
diff --git a/java-sdk/gradlew b/java-sdk/gradlew
index a69d9cb6..65dcd68d 100755
--- a/java-sdk/gradlew
+++ b/java-sdk/gradlew
@@ -55,7 +55,7 @@
 #       Darwin, MinGW, and NonStop.
 #
 #   (3) This script is generated from the Groovy template
-#       https://github.com/gradle/gradle/blob/master/subprojects/plugins/src/main/resources/org/gradle/api/internal/plugins/unixStartScript.txt
+#       https://github.com/gradle/gradle/blob/HEAD/subprojects/plugins/src/main/resources/org/gradle/api/internal/plugins/unixStartScript.txt
 #       within the Gradle project.
 #
 #       You can find Gradle at https://github.com/gradle/gradle/.
@@ -80,10 +80,10 @@ do
     esac
 done
 
-APP_HOME=$( cd "${APP_HOME:-./}" && pwd -P ) || exit
-
-APP_NAME="Gradle"
+# This is normally unused
+# shellcheck disable=SC2034
 APP_BASE_NAME=${0##*/}
+APP_HOME=$( cd "${APP_HOME:-./}" && pwd -P ) || exit
 
 # Add default JVM options here. You can also use JAVA_OPTS and GRADLE_OPTS to pass JVM options to this script.
 DEFAULT_JVM_OPTS='"-Xmx64m" "-Xms64m"'
@@ -143,12 +143,16 @@ fi
 if ! "$cygwin" && ! "$darwin" && ! "$nonstop" ; then
     case $MAX_FD in #(
       max*)
+        # In POSIX sh, ulimit -H is undefined. That's why the result is checked to see if it worked.
+        # shellcheck disable=SC3045
         MAX_FD=$( ulimit -H -n ) ||
             warn "Could not query maximum file descriptor limit"
     esac
     case $MAX_FD in #(
       '' | soft) :;; #(
       *)
+        # In POSIX sh, ulimit -n is undefined. That's why the result is checked to see if it worked.
+        # shellcheck disable=SC3045
         ulimit -n "$MAX_FD" ||
             warn "Could not set maximum file descriptor limit to $MAX_FD"
     esac
diff --git a/java-sdk/gradlew.bat b/java-sdk/gradlew.bat
index 53a6b238..6689b85b 100644
--- a/java-sdk/gradlew.bat
+++ b/java-sdk/gradlew.bat
@@ -26,6 +26,7 @@ if "%OS%"=="Windows_NT" setlocal
 
 set DIRNAME=%~dp0
 if "%DIRNAME%"=="" set DIRNAME=.
+@rem This is normally unused
 set APP_BASE_NAME=%~n0
 set APP_HOME=%DIRNAME%
 
diff --git a/java-sdk/radar-catalog-server/src/main/java/org/radarbase/schema/service/SourceCatalogueServer.kt b/java-sdk/radar-catalog-server/src/main/java/org/radarbase/schema/service/SourceCatalogueServer.kt
index a9d65c97..b3754d02 100644
--- a/java-sdk/radar-catalog-server/src/main/java/org/radarbase/schema/service/SourceCatalogueServer.kt
+++ b/java-sdk/radar-catalog-server/src/main/java/org/radarbase/schema/service/SourceCatalogueServer.kt
@@ -84,7 +84,11 @@ class SourceCatalogueServer(private val serverPort: Int) : Closeable {
         }
         val config = loadConfig(parsedArgs.getString("config"))
         val sourceCatalogue: SourceCatalogue = try {
-            load(Paths.get(parsedArgs.getString("root")), config.schemas)
+            load(
+                Paths.get(parsedArgs.getString("root")),
+                schemaConfig = config.schemas,
+                sourceConfig = config.sources,
+            )
         } catch (e: IOException) {
             logger.error("Failed to load source catalogue", e)
             logger.error(parser.formatUsage())
diff --git a/java-sdk/radar-catalog-server/src/test/java/org/radarbase/schema/service/SourceCatalogueServerTest.java b/java-sdk/radar-catalog-server/src/test/java/org/radarbase/schema/service/SourceCatalogueServerTest.java
index f81806f0..09172062 100644
--- a/java-sdk/radar-catalog-server/src/test/java/org/radarbase/schema/service/SourceCatalogueServerTest.java
+++ b/java-sdk/radar-catalog-server/src/test/java/org/radarbase/schema/service/SourceCatalogueServerTest.java
@@ -12,6 +12,8 @@
 import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
 import org.radarbase.schema.specification.SourceCatalogue;
+import org.radarbase.schema.specification.config.SchemaConfig;
+import org.radarbase.schema.specification.config.SourceConfig;
 
 import static org.junit.jupiter.api.Assertions.assertNotNull;
 import static org.junit.jupiter.api.Assertions.assertTrue;
@@ -27,7 +29,7 @@ public void setUp() {
         server = new SourceCatalogueServer(9876);
         serverThread = new Thread(() -> {
             try {
-                SourceCatalogue sourceCatalog = SourceCatalogue.Companion.load(Paths.get("../.."));
+                SourceCatalogue sourceCatalog = SourceCatalogue.Companion.load(Paths.get("../.."), new SchemaConfig(), new SourceConfig());
                 server.start(sourceCatalog);
             } catch (IllegalStateException e) {
                 // this is acceptable
diff --git a/java-sdk/radar-schemas-commons/build.gradle.kts b/java-sdk/radar-schemas-commons/build.gradle.kts
index 2d26297d..f9f5c3e2 100644
--- a/java-sdk/radar-schemas-commons/build.gradle.kts
+++ b/java-sdk/radar-schemas-commons/build.gradle.kts
@@ -5,7 +5,7 @@ plugins {
 }
 
 // Generated avro files
-val avroOutputDir = file("src/generated/java")
+val avroOutputDir = file("$projectDir/src/generated/java")
 
 description = "RADAR Schemas Commons SDK"
 
diff --git a/java-sdk/radar-schemas-core/src/main/java/org/radarbase/schema/specification/SourceCatalogue.kt b/java-sdk/radar-schemas-core/src/main/java/org/radarbase/schema/specification/SourceCatalogue.kt
index 29998a74..60f57506 100644
--- a/java-sdk/radar-schemas-core/src/main/java/org/radarbase/schema/specification/SourceCatalogue.kt
+++ b/java-sdk/radar-schemas-core/src/main/java/org/radarbase/schema/specification/SourceCatalogue.kt
@@ -80,11 +80,10 @@ class SourceCatalogue internal constructor(
         * @throws IOException if the source catalogue could not be read.
         */
        @Throws(IOException::class, InvalidPathException::class)
-       @JvmOverloads
        fun load(
            root: Path,
-           schemaConfig: SchemaConfig = SchemaConfig(),
-           sourceConfig: SourceConfig = SourceConfig(),
+           schemaConfig: SchemaConfig,
+           sourceConfig: SourceConfig,
        ): SourceCatalogue {
            val specRoot = root.resolve(SPECIFICATIONS_PATH)
            val mapper = ObjectMapper(YAMLFactory()).apply {
diff --git a/java-sdk/radar-schemas-core/src/test/java/org/radarbase/schema/validation/SchemaValidatorTest.java b/java-sdk/radar-schemas-core/src/test/java/org/radarbase/schema/validation/SchemaValidatorTest.java
index 95a6d3da..eb0ece1e 100644
--- a/java-sdk/radar-schemas-core/src/test/java/org/radarbase/schema/validation/SchemaValidatorTest.java
+++ b/java-sdk/radar-schemas-core/src/test/java/org/radarbase/schema/validation/SchemaValidatorTest.java
@@ -24,6 +24,7 @@
 import org.radarbase.schema.Scope;
 import org.radarbase.schema.specification.SourceCatalogue;
 import org.radarbase.schema.specification.config.SchemaConfig;
+import org.radarbase.schema.specification.config.SourceConfig;
 
 import java.io.IOException;
 import java.nio.file.Path;
@@ -115,7 +116,7 @@ public void connectorSpecifications() throws IOException {
     }
 
     private void testFromSpecification(Scope scope) throws IOException {
-        SourceCatalogue sourceCatalogue = SourceCatalogue.Companion.load(ROOT);
+        SourceCatalogue sourceCatalogue = SourceCatalogue.Companion.load(ROOT, new SchemaConfig(), new SourceConfig());
         String result = SchemaValidator.format(
                 validator.analyseSourceCatalogue(scope, sourceCatalogue));
 
diff --git a/java-sdk/radar-schemas-core/src/test/java/org/radarbase/schema/validation/SourceCatalogueValidationTest.java b/java-sdk/radar-schemas-core/src/test/java/org/radarbase/schema/validation/SourceCatalogueValidationTest.java
index 9ca512a1..9ac5fb93 100644
--- a/java-sdk/radar-schemas-core/src/test/java/org/radarbase/schema/validation/SourceCatalogueValidationTest.java
+++ b/java-sdk/radar-schemas-core/src/test/java/org/radarbase/schema/validation/SourceCatalogueValidationTest.java
@@ -22,6 +22,8 @@
 import org.opentest4j.MultipleFailuresError;
 import org.radarbase.schema.specification.DataProducer;
 import org.radarbase.schema.specification.SourceCatalogue;
+import org.radarbase.schema.specification.config.SchemaConfig;
+import org.radarbase.schema.specification.config.SourceConfig;
 
 import java.io.IOException;
 import java.nio.file.Path;
@@ -48,7 +50,7 @@ public class SourceCatalogueValidationTest {
 
     @BeforeAll
     public static void setUp() throws IOException {
-        catalogue = SourceCatalogue.Companion.load(BASE_PATH);
+        catalogue = SourceCatalogue.Companion.load(BASE_PATH, new SchemaConfig(), new SourceConfig());
     }
 
     @Test
diff --git a/java-sdk/radar-schemas-registration/build.gradle.kts b/java-sdk/radar-schemas-registration/build.gradle.kts
index 07984f0f..ceba6259 100644
--- a/java-sdk/radar-schemas-registration/build.gradle.kts
+++ b/java-sdk/radar-schemas-registration/build.gradle.kts
@@ -12,16 +12,10 @@ dependencies {
     val radarCommonsVersion: String by project
     api("org.radarbase:radar-commons-server:$radarCommonsVersion")
 
-    val nettyVersion: String by project
-    implementation(platform("io.netty:netty-bom:$nettyVersion"))
-    val jettyVersion: String by project
-    implementation(platform("org.eclipse.jetty:jetty-bom:$jettyVersion"))
-
     val confluentVersion: String by project
     implementation("io.confluent:kafka-connect-avro-converter:$confluentVersion")
-    implementation("io.confluent:kafka-schema-registry:$confluentVersion") {
-        exclude(group = "org.slf4j", module =
"slf4j-reload4j") - } + implementation("io.confluent:kafka-schema-registry-client:$confluentVersion") + val kafkaVersion: String by project implementation("org.apache.kafka:connect-json:$kafkaVersion") diff --git a/java-sdk/radar-schemas-registration/src/main/java/org/radarbase/schema/registration/JsonSchemaBackupStorage.java b/java-sdk/radar-schemas-registration/src/main/java/org/radarbase/schema/registration/JsonSchemaBackupStorage.java deleted file mode 100644 index c6d7e560..00000000 --- a/java-sdk/radar-schemas-registration/src/main/java/org/radarbase/schema/registration/JsonSchemaBackupStorage.java +++ /dev/null @@ -1,121 +0,0 @@ -package org.radarbase.schema.registration; - -import static java.nio.file.StandardCopyOption.ATOMIC_MOVE; - -import com.fasterxml.jackson.annotation.JsonInclude.Include; -import com.fasterxml.jackson.databind.ObjectMapper; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.io.IOException; -import java.io.InputStream; -import java.io.Reader; -import java.io.Writer; -import java.nio.file.Files; -import java.nio.file.Path; -import java.nio.file.attribute.FileTime; -import java.util.Arrays; -import java.util.Locale; -import javax.validation.constraints.NotNull; - -/** - * Schema topic backup storage to JSON files. - */ -public class JsonSchemaBackupStorage implements SchemaBackupStorage { - private static final Logger logger = LoggerFactory.getLogger(JsonSchemaBackupStorage.class); - - private static final String EXT = ".json"; - private static final String INVALID_EXT = ".invalid" + EXT; - private static final ObjectMapper MAPPER = new ObjectMapper() - .setSerializationInclusion(Include.NON_NULL); - private final Path path; - - public JsonSchemaBackupStorage(@NotNull Path path) { - this.path = path.toAbsolutePath(); - } - - static boolean contentEquals(@NotNull Path path, @NotNull Path backupPath) throws IOException { - if (Files.size(path) != Files.size(backupPath)) { - return false; - } - - try (InputStream input1 = Files.newInputStream(path); - InputStream input2 = Files.newInputStream(backupPath)) { - - byte[] buf1 = new byte[4096]; - byte[] buf2 = new byte[4096]; - - int numRead1 = input1.read(buf1); - - while (numRead1 != -1) { - int numRead2 = input2.readNBytes(buf2, 0, numRead1); - if (numRead2 != numRead1 - || !Arrays.equals(buf1, 0, numRead1, buf2, 0, numRead1)) { - return false; - } - numRead1 = input1.read(buf1); - } - - return input2.read() == -1; - } - } - - @Override - public void store(SchemaTopicBackup topic) throws IOException { - Path tmpPath = Files.createTempFile(path.getParent(), ".schema-backup", EXT); - - try (Writer writer = Files.newBufferedWriter(tmpPath)) { - MAPPER.writeValue(writer, topic); - } - - replaceAndBackup(tmpPath, path, EXT); - } - - private void replaceAndBackup(Path tmpPath, Path mainPath, String suffix) throws IOException { - if (!Files.exists(mainPath)) { - logger.info("Creating new {}", mainPath); - Files.move(tmpPath, mainPath, ATOMIC_MOVE); - } else if (contentEquals(mainPath, tmpPath)) { - logger.info("Not replacing old identical value {}", mainPath); - Files.delete(tmpPath); - } else { - FileTime lastModified = Files.getLastModifiedTime(mainPath); - Path backupPath = changeJsonSuffix(mainPath, "." 
+ lastModified.toInstant() + suffix); - logger.info("Creating new {} and moving the existing value to {}", - mainPath, backupPath); - Files.copy(mainPath, backupPath); - Files.move(tmpPath, mainPath, ATOMIC_MOVE); - } - } - - @Override - public void storeInvalid(@NotNull SchemaTopicBackup topic) throws IOException { - Path tmpPath = Files.createTempFile(path.getParent(), ".schema-backup", INVALID_EXT); - - try (Writer writer = Files.newBufferedWriter(tmpPath)) { - MAPPER.writeValue(writer, topic); - } - - replaceAndBackup(tmpPath, changeJsonSuffix(path, INVALID_EXT), INVALID_EXT); - } - - private Path changeJsonSuffix(@NotNull Path path, @NotNull String suffix) { - String filename = path.getFileName().toString(); - if (filename.toLowerCase(Locale.US).endsWith(EXT)) { - filename = filename.substring(0, filename.length() - EXT.length()); - } - return path.getParent().resolve(filename + suffix); - } - - @Override - public SchemaTopicBackup load() throws IOException { - try (Reader reader = Files.newBufferedReader(path)) { - return MAPPER.readValue(reader, SchemaTopicBackup.class); - } - } - - @Override - public Path getPath() { - return path; - } -} diff --git a/java-sdk/radar-schemas-registration/src/main/java/org/radarbase/schema/registration/SchemaBackupStorage.java b/java-sdk/radar-schemas-registration/src/main/java/org/radarbase/schema/registration/SchemaBackupStorage.java deleted file mode 100644 index 0127dcf9..00000000 --- a/java-sdk/radar-schemas-registration/src/main/java/org/radarbase/schema/registration/SchemaBackupStorage.java +++ /dev/null @@ -1,36 +0,0 @@ -package org.radarbase.schema.registration; - -import java.io.IOException; -import java.nio.file.Path; - -/** - * Storage for _schemas topic backups. - */ -public interface SchemaBackupStorage { - - /** - * Store a valid _schemas topic backup. - * - * @param topic backup to store. - * @throws IOException if the data cannot be stored. - */ - void store(SchemaTopicBackup topic) throws IOException; - - /** - * Store an invalid _schemas topic backup. - * - * @param topic backup to store. - * @throws IOException if the data cannot be stored. - */ - void storeInvalid(SchemaTopicBackup topic) throws IOException; - - /** - * Load a valid _schemas topic backup from storage. - * - * @return backup or {@code null} if no backup was available. - * @throws IOException if the data cannot be stored. 
- */ - SchemaTopicBackup load() throws IOException; - - Path getPath(); -} diff --git a/java-sdk/radar-schemas-registration/src/main/java/org/radarbase/schema/registration/SchemaTopicBackup.java b/java-sdk/radar-schemas-registration/src/main/java/org/radarbase/schema/registration/SchemaTopicBackup.java deleted file mode 100644 index abf7bd26..00000000 --- a/java-sdk/radar-schemas-registration/src/main/java/org/radarbase/schema/registration/SchemaTopicBackup.java +++ /dev/null @@ -1,242 +0,0 @@ -package org.radarbase.schema.registration; - -import static org.apache.kafka.common.config.TopicConfig.CLEANUP_POLICY_COMPACT; -import static org.apache.kafka.common.config.TopicConfig.CLEANUP_POLICY_CONFIG; - -import com.fasterxml.jackson.annotation.JsonCreator; -import com.fasterxml.jackson.annotation.JsonGetter; -import com.fasterxml.jackson.annotation.JsonIgnore; -import com.fasterxml.jackson.annotation.JsonProperty; -import io.confluent.kafka.schemaregistry.storage.SchemaRegistryKey; -import io.confluent.kafka.schemaregistry.storage.SchemaRegistryKeyType; -import io.confluent.kafka.schemaregistry.storage.SchemaRegistryValue; -import io.confluent.kafka.schemaregistry.storage.SchemaValue; -import io.confluent.kafka.schemaregistry.storage.exceptions.SerializationException; -import io.confluent.kafka.schemaregistry.storage.serialization.SchemaRegistrySerializer; -import java.io.IOException; -import java.util.ArrayList; -import java.util.Arrays; -import java.util.HashMap; -import java.util.LinkedHashSet; -import java.util.List; -import java.util.Map; -import java.util.Objects; -import java.util.Set; -import java.util.stream.Collectors; -import javax.validation.constraints.NotNull; -import org.apache.kafka.clients.admin.Config; -import org.apache.kafka.clients.admin.ConfigEntry; -import org.apache.kafka.clients.consumer.ConsumerRecord; - -/** - * Data class for containing the data and metadata of the _schemas topic. - */ -public class SchemaTopicBackup { - - @JsonProperty - private final Map settings; - @JsonProperty - private final Set records; - - /** - * Empty backup. - */ - public SchemaTopicBackup() { - settings = new HashMap<>(); - records = new LinkedHashSet<>(); - } - - /** - * Fully ready backup. - */ - @JsonCreator - public SchemaTopicBackup( - @JsonProperty("settings") Map settings, - @JsonProperty("records") List records) { - this.settings = new HashMap<>(settings); - this.records = new LinkedHashSet<>(records); - } - - /** - * Whether the backup contains schemas starting from ID 1. If so, the schema topic is valid. - */ - public boolean startsAtFirstId() { - return records.stream() - .map(SchemaRecord::getSchemaId) - .filter(Objects::nonNull) - .anyMatch(r -> r == 1); - } - - @JsonGetter - @NotNull - public Map getSettings() { - return settings; - } - - /** - * Put settings. - * - * @param settings Kafka topic config settings - */ - public void putAll(@NotNull Map settings) { - this.settings.putAll(settings); - if (!Objects.equals(settings.get(CLEANUP_POLICY_CONFIG), CLEANUP_POLICY_COMPACT)) { - this.settings.put(CLEANUP_POLICY_CONFIG, CLEANUP_POLICY_COMPACT); - } - } - - /** - * Add schema from topic. This will deduplicate using record key. 
- */ - public void addSchemaRecord( - SchemaRegistrySerializer serializer, - ConsumerRecord record) throws IOException { - - SchemaRecord schemaRecord = null; - SchemaRegistryKey messageKey; - try { - messageKey = serializer.deserializeKey(record.key()); - - if (messageKey.getKeyType() == SchemaRegistryKeyType.SCHEMA - && record.value() != null) { - SchemaRegistryValue message = serializer.deserializeValue( - messageKey, record.value()); - - if (message instanceof SchemaValue) { - SchemaValue schemaValue = (SchemaValue) message; - schemaRecord = new SchemaRecord( - messageKey.getKeyType(), - schemaValue.getSubject(), - schemaValue.getId(), - record.key(), - record.value()); - } - } - } catch (SerializationException ex) { - throw new IOException("Cannot deserialize message", ex); - } - if (schemaRecord == null) { - schemaRecord = new SchemaRecord( - messageKey.getKeyType(), - null, - null, - record.key(), - record.value()); - } - - // preserve order - records.remove(schemaRecord); - records.add(schemaRecord); - } - - @JsonGetter - @NotNull - public List getRecords() { - return new ArrayList<>(records); - } - - /** - * Get the Kafka config of a topic. - * - * @return configuration of the topic. It may be empty if not initialized. - */ - @JsonIgnore - @NotNull - public Config getConfig() { - return new Config(settings.entrySet().stream() - .map(e -> new ConfigEntry(e.getKey(), e.getValue())) - .collect(Collectors.toList())); - } - - /** - * Set the Kafka config of a topic. - * - * @param config configuration of the topic. - */ - @JsonIgnore - public void setConfig(@NotNull Config config) { - putAll(config.entries().stream() - .collect(Collectors.toMap(ConfigEntry::name, ConfigEntry::value))); - } - - /** - * Schema record. - */ - public static class SchemaRecord { - - private final SchemaRegistryKeyType type; - private final Integer schemaId; - private final String subject; - private final byte[] key; - private final byte[] value; - - /** - * Full constructor of all properties. - * - * @param type record type - * @param subject schema subject that the record belongs to, or null if not a schema. - * @param schemaId schema ID or null if not a schema. 
- * @param key raw key data - * @param value raw value data - */ - @JsonCreator - @SuppressWarnings("PMD.ArrayIsStoredDirectly") - public SchemaRecord( - @NotNull @JsonProperty("type") SchemaRegistryKeyType type, - @JsonProperty("subject") String subject, - @JsonProperty("schemaId") Integer schemaId, - @JsonProperty("key") byte[] key, - @JsonProperty("value") byte[] value) { - this.type = type; - this.schemaId = schemaId; - this.subject = subject; - this.key = key; - this.value = value; - } - - public SchemaRegistryKeyType getType() { - return type; - } - - public Integer getSchemaId() { - return schemaId; - } - - public String getSubject() { - return subject; - } - - public byte[] getKey() { - if (key != null) { - return Arrays.copyOf(key, key.length); - } else { - return null; - } - } - - public byte[] getValue() { - if (value != null) { - return Arrays.copyOf(value, value.length); - } else { - return null; - } - } - - @Override - public boolean equals(Object o) { - if (this == o) { - return true; - } - if (o == null || getClass() != o.getClass()) { - return false; - } - SchemaRecord that = (SchemaRecord) o; - return Arrays.equals(key, that.key); - } - - @Override - public int hashCode() { - return Arrays.hashCode(key); - } - } -} diff --git a/java-sdk/radar-schemas-registration/src/main/java/org/radarbase/schema/registration/SchemaTopicManager.java b/java-sdk/radar-schemas-registration/src/main/java/org/radarbase/schema/registration/SchemaTopicManager.java deleted file mode 100644 index 92ce1dc9..00000000 --- a/java-sdk/radar-schemas-registration/src/main/java/org/radarbase/schema/registration/SchemaTopicManager.java +++ /dev/null @@ -1,384 +0,0 @@ -package org.radarbase.schema.registration; - -import static org.apache.kafka.common.config.ConfigResource.Type.TOPIC; - -import io.confluent.kafka.schemaregistry.storage.serialization.SchemaRegistrySerializer; -import java.io.IOException; -import java.time.Duration; -import java.util.HashMap; -import java.util.List; -import java.util.Map; -import java.util.UUID; -import java.util.concurrent.ExecutionException; -import java.util.concurrent.Future; -import java.util.stream.Collectors; -import java.util.stream.Stream; -import javax.validation.constraints.NotNull; -import org.apache.kafka.clients.admin.AlterConfigOp; -import org.apache.kafka.clients.admin.AlterConfigOp.OpType; -import org.apache.kafka.clients.admin.AlterConfigsResult; -import org.apache.kafka.clients.consumer.Consumer; -import org.apache.kafka.clients.consumer.ConsumerConfig; -import org.apache.kafka.clients.consumer.ConsumerRecord; -import org.apache.kafka.clients.consumer.ConsumerRecords; -import org.apache.kafka.clients.consumer.KafkaConsumer; -import org.apache.kafka.clients.producer.KafkaProducer; -import org.apache.kafka.clients.producer.ProducerConfig; -import org.apache.kafka.clients.producer.ProducerRecord; -import org.apache.kafka.clients.producer.RecordMetadata; -import org.apache.kafka.common.PartitionInfo; -import org.apache.kafka.common.TopicPartition; -import org.apache.kafka.common.config.ConfigResource; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -/** - * Manages the _schemas topic. Currently, this backs up and restores all data in the _schemas - * topic. 
- */ -@SuppressWarnings("WeakerAccess") -public class SchemaTopicManager { - private static final Logger logger = LoggerFactory.getLogger(SchemaTopicManager.class); - private static final String TOPIC_NAME = "_schemas"; - private static final Duration SECONDARY_TIMEOUT = Duration.ofSeconds(10L); - private final TopicRegistrar topics; - private final SchemaBackupStorage storage; - private final SchemaRegistrySerializer serializer; - private final ConfigResource topicResource; - private boolean isInitialized; - - /** - * Schema topic manager. - * - * @param topicRegistrar topic registrar - * @param storage storage medium to read and write backups from and to. - */ - public SchemaTopicManager( - @NotNull TopicRegistrar topicRegistrar, - @NotNull SchemaBackupStorage storage - ) { - topics = topicRegistrar; - this.storage = storage; - serializer = new SchemaRegistrySerializer(); - topicResource = new ConfigResource(TOPIC, TOPIC_NAME); - isInitialized = false; - } - - /** - * Wait for brokers and topics to become available. - * - * @param numBrokers number of brokers to wait for - * @throws InterruptedException if waiting for the brokers or topics was interrupted. - * @throws IllegalStateException if the brokers or topics are not available - */ - public void initialize(int numBrokers) throws InterruptedException { - topics.initialize(numBrokers); - isInitialized = true; - } - - private void ensureInitialized() { - if (!isInitialized) { - throw new IllegalStateException("Manager is not initialized yet"); - } - } - - /** - * Read a backup from the _schemas topic. This backup only includes the actual schemas, not - * configuration changes or NOOP messages. - * - * @param timeout time to wait for first schema records to become available. - * @return backup of the _schemas topic. - * @throws IOException if a message in the topic cannot be read - * @throws ExecutionException if the topic configuration cannot be read - * @throws InterruptedException if the process was interrupted before finishing - * @throws RuntimeException storage failure or any other error. - * @throws IllegalStateException if this manager was not initialized - */ - @NotNull - public SchemaTopicBackup readBackup(Duration timeout) - throws IOException, ExecutionException, InterruptedException { - ensureInitialized(); - - SchemaTopicBackup storeTopic = new SchemaTopicBackup(); - - try { - readSchemas(getConsumerProps(), storeTopic, timeout); - - storeTopic.setConfig(topics.getKafkaClient() - .describeConfigs(List.of(topicResource)) - .values() - .get(topicResource) - .get()); - } catch (IOException e) { - logger.error("Failed to deserialize the schema or config key", e); - throw e; - } catch (ExecutionException e) { - logger.error("Failed to get _schemas config", e); - throw e; - } catch (RuntimeException ex) { - logger.error("Failed to store schemas", ex); - throw ex; - } catch (InterruptedException ex) { - logger.error("Failed waiting for _schemas records", ex); - Thread.currentThread().interrupt(); - throw ex; - } - - return storeTopic; - } - - /** - * Read a backup from the _schemas topic and store it. This backup only includes the actual - * schemas, not configuration changes or NOOP messages. - * - * @param timeout time to wait for first schema records to become available. 
- * @throws IOException if a message in the topic cannot be read - * @throws ExecutionException if the topic configuration cannot be read - * @throws InterruptedException if the process was interrupted before finishing - * @throws RuntimeException storage failure or any other error. - * @throws IllegalStateException if this manager was not initialized - */ - public void makeBackup(Duration timeout) - throws IOException, InterruptedException, ExecutionException { - logger.info("Reading backup data"); - SchemaTopicBackup storeTopic = readBackup(timeout); - try { - if (storeTopic.startsAtFirstId()) { - logger.info("Storing valid backup"); - storage.store(storeTopic); - } else { - logger.info("Storing invalid backup"); - storage.storeInvalid(storeTopic); - } - } catch (IOException e) { - logger.error("Failed to store _schemas data", e); - throw e; - } - } - - @NotNull - private Map getConsumerProps() { - Map consumerProps = new HashMap<>(); - - consumerProps.put(ConsumerConfig.GROUP_ID_CONFIG, - "schema-backup-" + UUID.randomUUID()); - consumerProps.put(ConsumerConfig.CLIENT_ID_CONFIG, "schema-backup"); - - consumerProps.putAll(topics.getKafkaProperties()); - consumerProps.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); - consumerProps.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false"); - consumerProps.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, - org.apache.kafka.common.serialization.ByteArrayDeserializer.class); - consumerProps.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, - org.apache.kafka.common.serialization.ByteArrayDeserializer.class); - return consumerProps; - } - - @NotNull - private Map getProducerProps() { - Map producerProps = new HashMap<>(); - producerProps.put(ProducerConfig.CLIENT_ID_CONFIG, "schema-backup"); - - producerProps.putAll(topics.getKafkaProperties()); - producerProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, - org.apache.kafka.common.serialization.ByteArraySerializer.class); - producerProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, - org.apache.kafka.common.serialization.ByteArraySerializer.class); - return producerProps; - } - - /** - * Checks if the current _schemas topic is valid. If not, the topic is replaced by the data in - * the backup. The _schemas topic is considered valid if it exists, is not empty and starts at - * id 1. - * - * @param timeout time to wait for first schema records to become available. - * @throws IOException if a message in the topic cannot be read or written - * @throws ExecutionException if the topic configuration cannot be read or written - * @throws InterruptedException if the process was interrupted before finishing - * @throws RuntimeException storage failure or any other error. - * @throws IllegalStateException if this manager was not initialized or schema registry was - * running - */ - public void ensure(short replication, Duration timeout) - throws InterruptedException, ExecutionException, IOException { - ensureInitialized(); - - boolean topicExists = topics.getTopics().contains(TOPIC_NAME); - if (topicExists) { - SchemaTopicBackup backup = readBackup(timeout); - if (backup.startsAtFirstId()) { - logger.info("Existing topic is valid."); - return; - } - logger.info("Existing _schemas topic is invalid. 
Backing up current topic."); - - try { - storage.storeInvalid(backup); - } catch (IOException e) { - logger.error("Backup storage failure.", e); - throw e; - } - } - SchemaTopicBackup newBackup; - try { - logger.info("Loading data from backup storage at {}", storage.getPath()); - newBackup = storage.load(); - } catch (IOException e) { - logger.error("Backup storage loading failure at {}", storage.getPath(), e); - throw e; - } - if (newBackup == null) { - logger.error("No valid backup in storage at {}", storage.getPath()); - return; - } - - // Backups successful. Remove old topic. - if (topicExists) { - logger.info("Removing existing {} topic", TOPIC_NAME); - topics.getKafkaClient().deleteTopics(List.of(TOPIC_NAME)) - .all().get(); - try { - topics.refreshTopics(); - } catch (InterruptedException e) { - logger.info("Failed to wait to refresh topics"); - Thread.currentThread().interrupt(); - throw e; - } - } - - if (!topics.createTopics(Stream.of(TOPIC_NAME), 1, replication)) { - throw new IllegalStateException("Failed to create _schemas topic"); - } - - commitBackup(newBackup); - } - - /** - * Read the schemas in the _schemas topic and put them in a backup. - */ - private void readSchemas(Map consumerProps, SchemaTopicBackup storeTopic, - Duration timeout) throws IOException { - try (Consumer consumer = new KafkaConsumer<>(consumerProps)) { - ensurePartitions(consumer); - - TopicPartition topicPartition = new TopicPartition(TOPIC_NAME, 0); - consumer.assign(List.of(topicPartition)); - consumer.seekToBeginning(List.of(topicPartition)); - - logger.info("Kafka store reader thread started"); - - int numRecords = -1; - Duration duration = timeout; - - while (numRecords != 0) { - ConsumerRecords records = consumer.poll(duration); - duration = SECONDARY_TIMEOUT; - numRecords = records.count(); - for (ConsumerRecord record : records) { - storeTopic.addSchemaRecord(serializer, record); - } - } - } - } - - /** - * Ensure that _schemas has exactly one partition. - * - * @param consumer consumer to subscribe partitions with. - * @throws IllegalArgumentException if the _schemas topic cannot be found. - * @throws IllegalStateException if the _schemas topic has more than one partition. - */ - private void ensurePartitions(Consumer consumer) { - // Include a few retries since topic creation may take some time to propagate and schema - // registry is often started immediately after creating the schemas topic. - int retries = 0; - List partitions; - do { - partitions = consumer.partitionsFor(TOPIC_NAME); - if (partitions != null && !partitions.isEmpty()) { - break; - } - - try { - Thread.sleep(1000); - } catch (InterruptedException e) { - // ignore - } - retries++; - } - while (retries < 10); - - if (partitions == null || partitions.isEmpty()) { - throw new IllegalArgumentException("Unable to subscribe to the Kafka topic " - + TOPIC_NAME - + " backing this data store. Topic may not exist."); - } else if (partitions.size() > 1) { - throw new IllegalStateException("Unexpected number of partitions in the " - + TOPIC_NAME - + " topic. Expected 1 and instead got " + partitions.size()); - } - } - - /** - * Restores the _schemas from backup. - * - * @throws RuntimeException storage failure or any other error. - * @throws ExecutionException if the topic configuration cannot be written - * @throws IllegalStateException if this manager was not initialized or if the schema registry - * is running. 
- */ - public void restoreBackup(short replication) - throws IOException, ExecutionException, InterruptedException { - ensureInitialized(); - SchemaTopicBackup storeTopic; - logger.info("Loading backup from {}", storage.getPath()); - try { - storeTopic = storage.load(); - } catch (IOException e) { - logger.error("Failed to load _schemas data", e); - throw e; - } - - if (storeTopic == null) { - logger.error("Backup not available"); - return; - } - - if (topics.getTopics().contains(TOPIC_NAME)) { - throw new IllegalStateException( - "Topic _schemas already exists, cannot restore it from backup"); - } - - if (!topics.createTopics(Stream.of(TOPIC_NAME), 1, replication)) { - throw new IllegalStateException("Failed to create _schemas topic"); - } - - logger.info("Restoring backup"); - commitBackup(storeTopic); - } - - private void commitBackup(SchemaTopicBackup backup) - throws ExecutionException, InterruptedException { - AlterConfigsResult alterResult = topics.getKafkaClient() - .incrementalAlterConfigs(Map.of(topicResource, backup.getConfig().entries().stream() - .map(e -> new AlterConfigOp(e, OpType.SET)) - .collect(Collectors.toList()))); - - try (KafkaProducer producer = new KafkaProducer<>(getProducerProps())) { - List> futures = backup.getRecords().stream() - .map(r -> new ProducerRecord<>(TOPIC_NAME, r.getKey(), r.getValue())) - .map(producer::send) - .collect(Collectors.toList()); - - logger.info("Waiting for records to be committed"); - // collect so we can do a blocking operation after all records have been sent - for (Future future : futures) { - future.get(); - } - logger.info("Records have been committed"); - } - - alterResult.all().get(); - } -} diff --git a/java-sdk/radar-schemas-registration/src/test/java/org/radarbase/schema/registration/JsonSchemaBackupStorageTest.java b/java-sdk/radar-schemas-registration/src/test/java/org/radarbase/schema/registration/JsonSchemaBackupStorageTest.java deleted file mode 100644 index 6603f1f9..00000000 --- a/java-sdk/radar-schemas-registration/src/test/java/org/radarbase/schema/registration/JsonSchemaBackupStorageTest.java +++ /dev/null @@ -1,31 +0,0 @@ -package org.radarbase.schema.registration; - -import org.junit.jupiter.api.Test; - -import java.io.IOException; -import java.nio.file.Files; -import java.nio.file.Path; - -import static org.junit.jupiter.api.Assertions.assertFalse; -import static org.junit.jupiter.api.Assertions.assertTrue; - -public class JsonSchemaBackupStorageTest { - - @Test - public void contentEquals() throws IOException { - Path path1 = Files.createTempFile("some", "test"); - Path path2 = Files.createTempFile("some", "test"); - Path path3 = Files.createTempFile("some", "test"); - Path path4 = Files.createTempFile("some", "test"); - Files.writeString(path1, "some"); - Files.writeString(path2, "some"); - Files.writeString(path3, "soma"); - Files.writeString(path4, "somee"); - - assertTrue(JsonSchemaBackupStorage.contentEquals(path1, path2)); - assertTrue(JsonSchemaBackupStorage.contentEquals(path1, path1)); - assertFalse(JsonSchemaBackupStorage.contentEquals(path1, path3)); - assertFalse(JsonSchemaBackupStorage.contentEquals(path1, path4)); - assertFalse(JsonSchemaBackupStorage.contentEquals(path4, path1)); - } -} diff --git a/java-sdk/radar-schemas-registration/src/test/resources/key_measurement_test.avsc b/java-sdk/radar-schemas-registration/src/test/resources/key_measurement_test.avsc deleted file mode 100644 index a8fc5f2b..00000000 --- a/java-sdk/radar-schemas-registration/src/test/resources/key_measurement_test.avsc 
+++ /dev/null @@ -1,10 +0,0 @@ -{ - "namespace": "org.radarcns.kafka.key", - "type": "record", - "name": "ObservationKeyTest", - "doc": "Measurement key in the RADAR-CNS project.", - "fields": [ - {"name": "userTestId", "type": "string", "doc": "User Identifier created during the enrolment."}, - {"name": "sourceTestId", "type": "string", "doc": "Unique identifier associated with the source."} - ] -} diff --git a/java-sdk/radar-schemas-registration/src/test/resources/key_windowed_test.avsc b/java-sdk/radar-schemas-registration/src/test/resources/key_windowed_test.avsc deleted file mode 100644 index bf682e32..00000000 --- a/java-sdk/radar-schemas-registration/src/test/resources/key_windowed_test.avsc +++ /dev/null @@ -1,11 +0,0 @@ -{"namespace": "org.radarcns.kafka.key", - "type": "record", - "name": "AggregateKeyTest", - "doc": "Windowed key in the RADAR-CNS project.", - "fields": [ - {"name": "userTestId", "type": "string", "doc": "User Identifier created during the enrolment."}, - {"name": "sourceTestId", "type": "string", "doc": "Unique identifier associated with the source."}, - {"name": "start", "type": "long", "doc": "First timestamp in UNIX time contained in the time window."}, - {"name": "end", "type": "long", "doc": "Last timestamp in UNIX time contained in the time window."} - ] -} diff --git a/java-sdk/radar-schemas-registration/src/test/resources/log4j2.xml b/java-sdk/radar-schemas-registration/src/test/resources/log4j2.xml deleted file mode 100644 index cee4d6bf..00000000 --- a/java-sdk/radar-schemas-registration/src/test/resources/log4j2.xml +++ /dev/null @@ -1,14 +0,0 @@ - - - - - - - - - - - - - - diff --git a/java-sdk/radar-schemas-registration/src/test/resources/schema.yml b/java-sdk/radar-schemas-registration/src/test/resources/schema.yml deleted file mode 100644 index c56f343d..00000000 --- a/java-sdk/radar-schemas-registration/src/test/resources/schema.yml +++ /dev/null @@ -1,14 +0,0 @@ -#============================= SKIP FILE =============================# -files: - - README.md - - .DS_Store - -#====================== SKIP TEST CONFIGURATION ======================# -validation: - org.radarcns.passive.biovotion.BiovotionVsm1OxygenSaturation: - fields: - - spO2 - - spO2Quality - org.radarcns.passive.empatica.EmpaticaE4InterBeatInterval: - fields: - - interBeatInterval diff --git a/java-sdk/radar-schemas-tools/src/main/java/org/radarbase/schema/tools/CommandLineApp.kt b/java-sdk/radar-schemas-tools/src/main/java/org/radarbase/schema/tools/CommandLineApp.kt index 1a6a9f04..10dc867a 100644 --- a/java-sdk/radar-schemas-tools/src/main/java/org/radarbase/schema/tools/CommandLineApp.kt +++ b/java-sdk/radar-schemas-tools/src/main/java/org/radarbase/schema/tools/CommandLineApp.kt @@ -111,7 +111,6 @@ class CommandLineApp( SchemaRegistryCommand(), ListCommand(), ValidatorCommand(), - SchemaTopicManagerCommand(), ).sortedBy { it.name } val parser = getArgumentParser(subCommands) diff --git a/java-sdk/radar-schemas-tools/src/main/java/org/radarbase/schema/tools/SchemaTopicManagerCommand.kt b/java-sdk/radar-schemas-tools/src/main/java/org/radarbase/schema/tools/SchemaTopicManagerCommand.kt deleted file mode 100644 index 5e604e1c..00000000 --- a/java-sdk/radar-schemas-tools/src/main/java/org/radarbase/schema/tools/SchemaTopicManagerCommand.kt +++ /dev/null @@ -1,92 +0,0 @@ -package org.radarbase.schema.tools - -import net.sourceforge.argparse4j.impl.action.StoreConstArgumentAction -import net.sourceforge.argparse4j.inf.ArgumentParser -import net.sourceforge.argparse4j.inf.Namespace 
-import org.radarbase.schema.registration.* -import org.radarbase.schema.registration.KafkaTopics.Companion.configureKafka -import org.radarbase.schema.tools.SubCommand.Companion.addRootArgument -import org.slf4j.LoggerFactory -import java.nio.file.Paths -import java.time.Duration - -class SchemaTopicManagerCommand : SubCommand { - override val name = "schema-topic" - - override fun execute(options: Namespace, app: CommandLineApp): Int { - val toolConfig = app.config - .configureKafka(bootstrapServers = options.getString("bootstrap_servers")) - try { - KafkaTopics(toolConfig).use { topics -> - val jsonStorage = JsonSchemaBackupStorage( - Paths.get(options.getString("file"))) - val manager = SchemaTopicManager(topics, jsonStorage) - manager.initialize(options.getInt("brokers")) - val timeout = Duration.ofSeconds(options.getInt("timeout").toLong()) - when (options.get(SUBACTION)) { - SubAction.BACKUP -> manager.makeBackup(timeout) - SubAction.RESTORE -> manager.restoreBackup(options.getShort("replication")) - SubAction.ENSURE -> manager.ensure(options.getShort("replication"), timeout) - else -> { - logger.error("Unknown action") - return 3 - } - } - return 0 - } - } catch (ex: Exception) { - logger.error("Action failed: {}", ex.toString()) - return 2 - } - } - - override fun addParser(parser: ArgumentParser) { - parser.apply { - description("Manage the _schemas topic") - addArgument("--backup") - .help("back up schema topic data") - .action(StoreConstArgumentAction()) - .setConst(SubAction.BACKUP) - .dest(SUBACTION) - addArgument("--restore") - .help("restore schema topic from backup") - .action(StoreConstArgumentAction()) - .setConst(SubAction.RESTORE) - .dest(SUBACTION) - addArgument("--ensure") - .help("ensure that the schema topic is restored if needed") - .action(StoreConstArgumentAction()) - .setConst(SubAction.ENSURE) - .dest(SUBACTION) - addArgument("-r", "--replication") - .help("number of replicas per data packet") - .type(Short::class.java).default = 3.toShort() - addArgument("-t", "--timeout") - .help("time (seconds) to wait for records in the _schemas topic to become" - + " available") - .setDefault(600) - .type(Int::class.java) - addArgument("-f", "--file") - .help("JSON file to load _schemas from") - .type(String::class.java) - .required(true) - addArgument("-b", "--brokers") - .help("number of brokers that are expected to be available.") - .type(Int::class.java).default = 3 - addArgument("-s", "--bootstrap-servers") - .help("Kafka hosts, ports and protocols, comma-separated") - .type(String::class.java) - addRootArgument() - } - } - - private enum class SubAction { - BACKUP, RESTORE, ENSURE - } - - companion object { - private const val SUBACTION = "subaction" - private val logger = LoggerFactory.getLogger( - SchemaTopicManagerCommand::class.java) - } -}
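
Note on the SourceCatalogue.load change above: with @JvmOverloads and the default arguments removed, every caller now has to pass a SchemaConfig and a SourceConfig explicitly, as the updated SourceCatalogueServer and the changed tests do. The following is a minimal Kotlin sketch of the new call shape, not code from this repository; the main wrapper, the chosen root path and the println are illustrative only, while the imports, parameter names and no-argument config constructors mirror the diff.

import java.nio.file.Paths
import org.radarbase.schema.specification.SourceCatalogue
import org.radarbase.schema.specification.config.SchemaConfig
import org.radarbase.schema.specification.config.SourceConfig

fun main() {
    // Both configs are now mandatory; there is no overload without them anymore.
    val catalogue: SourceCatalogue = SourceCatalogue.load(
        root = Paths.get("."),
        schemaConfig = SchemaConfig(),
        sourceConfig = SourceConfig(),
    )
    // load() declares @Throws(IOException::class), so callers should be
    // prepared for a failed read of the specification tree.
    println("Loaded source catalogue: $catalogue")
}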