diff --git a/flink-core/src/main/java/org/apache/flink/util/TernaryBoolean.java b/flink-core/src/main/java/org/apache/flink/util/TernaryBoolean.java index 4cfd71ac238e7..4f06c15e9f899 100644 --- a/flink-core/src/main/java/org/apache/flink/util/TernaryBoolean.java +++ b/flink-core/src/main/java/org/apache/flink/util/TernaryBoolean.java @@ -19,6 +19,8 @@ package org.apache.flink.util; import org.apache.flink.annotation.PublicEvolving; +import org.apache.flink.configuration.ConfigOption; +import org.apache.flink.configuration.ReadableConfig; import javax.annotation.Nullable; @@ -71,6 +73,22 @@ public Boolean getAsBoolean() { return this == UNDEFINED ? null : (this == TRUE ? Boolean.TRUE : Boolean.FALSE); } + /** + * Merges an existing value with a config, accepting the config's value only if the existing + * value is undefined. + * + * @param original the value to merge with the config. + * @param configOption the config option to merge with from the config. + * @param config the config to merge with. + */ + public static TernaryBoolean mergeTernaryBooleanWithConfig( + TernaryBoolean original, ConfigOption configOption, ReadableConfig config) { + if (original != TernaryBoolean.UNDEFINED) { + return original; + } + return TernaryBoolean.fromBoxedBoolean(config.getOptional(configOption).orElse(null)); + } + // ------------------------------------------------------------------------ /** diff --git a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/EmbeddedRocksDBStateBackend.java b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/EmbeddedRocksDBStateBackend.java index 1a489fad8c899..baca5f4a01345 100644 --- a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/EmbeddedRocksDBStateBackend.java +++ b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/EmbeddedRocksDBStateBackend.java @@ -321,21 +321,20 @@ private EmbeddedRocksDBStateBackend( "Overlap fraction threshold of restoring should be between 0 and 1"); incrementalRestoreAsyncCompactAfterRescale = - original.incrementalRestoreAsyncCompactAfterRescale == TernaryBoolean.UNDEFINED - ? TernaryBoolean.fromBoxedBoolean( - config.get(INCREMENTAL_RESTORE_ASYNC_COMPACT_AFTER_RESCALE)) - : original.incrementalRestoreAsyncCompactAfterRescale; + TernaryBoolean.mergeTernaryBooleanWithConfig( + original.incrementalRestoreAsyncCompactAfterRescale, + INCREMENTAL_RESTORE_ASYNC_COMPACT_AFTER_RESCALE, + config); useIngestDbRestoreMode = - original.useIngestDbRestoreMode == TernaryBoolean.UNDEFINED - ? TernaryBoolean.fromBoxedBoolean(config.get(USE_INGEST_DB_RESTORE_MODE)) - : TernaryBoolean.fromBoolean(original.getUseIngestDbRestoreMode()); + TernaryBoolean.mergeTernaryBooleanWithConfig( + original.useIngestDbRestoreMode, USE_INGEST_DB_RESTORE_MODE, config); rescalingUseDeleteFilesInRange = - original.rescalingUseDeleteFilesInRange == TernaryBoolean.UNDEFINED - ? TernaryBoolean.fromBoxedBoolean( - config.get(USE_DELETE_FILES_IN_RANGE_DURING_RESCALING)) - : original.rescalingUseDeleteFilesInRange; + TernaryBoolean.mergeTernaryBooleanWithConfig( + original.rescalingUseDeleteFilesInRange, + USE_DELETE_FILES_IN_RANGE_DURING_RESCALING, + config); this.rocksDBMemoryFactory = original.rocksDBMemoryFactory; } diff --git a/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/EmbeddedRocksDBStateBackendTest.java b/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/EmbeddedRocksDBStateBackendTest.java index 5feeeaf5608dd..41773e4be2813 100644 --- a/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/EmbeddedRocksDBStateBackendTest.java +++ b/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/EmbeddedRocksDBStateBackendTest.java @@ -27,6 +27,7 @@ import org.apache.flink.api.common.typeutils.TypeSerializer; import org.apache.flink.api.common.typeutils.base.IntSerializer; import org.apache.flink.api.common.typeutils.base.StringSerializer; +import org.apache.flink.configuration.ConfigOption; import org.apache.flink.configuration.Configuration; import org.apache.flink.core.fs.Path; import org.apache.flink.core.testutils.OneShotLatch; @@ -92,9 +93,12 @@ import java.util.concurrent.RunnableFuture; import java.util.stream.Collectors; +import static org.apache.flink.contrib.streaming.state.RocksDBConfigurableOptions.INCREMENTAL_RESTORE_ASYNC_COMPACT_AFTER_RESCALE; +import static org.apache.flink.contrib.streaming.state.RocksDBConfigurableOptions.USE_DELETE_FILES_IN_RANGE_DURING_RESCALING; import static org.apache.flink.contrib.streaming.state.RocksDBConfigurableOptions.USE_INGEST_DB_RESTORE_MODE; import static org.assertj.core.api.Assertions.assertThat; import static org.assertj.core.api.Assertions.assertThatThrownBy; +import static org.junit.jupiter.api.Assertions.assertEquals; import static org.mockito.Matchers.any; import static org.mockito.Mockito.doAnswer; import static org.mockito.Mockito.reset; @@ -186,14 +190,19 @@ protected ConfigurableStateBackend getStateBackend() throws IOException { dbPath = TempDirUtils.newFolder(tempFolder).getAbsolutePath(); EmbeddedRocksDBStateBackend backend = new EmbeddedRocksDBStateBackend(enableIncrementalCheckpointing); + Configuration configuration = createBackendConfig(); + backend = backend.configure(configuration, Thread.currentThread().getContextClassLoader()); + backend.setDbStoragePath(dbPath); + return backend; + } + + private Configuration createBackendConfig() { Configuration configuration = new Configuration(); configuration.set(USE_INGEST_DB_RESTORE_MODE, useIngestDB); configuration.set( RocksDBOptions.TIMER_SERVICE_FACTORY, EmbeddedRocksDBStateBackend.PriorityQueueStateType.ROCKSDB); - backend = backend.configure(configuration, Thread.currentThread().getContextClassLoader()); - backend.setDbStoragePath(dbPath); - return backend; + return configuration; } @Override @@ -656,6 +665,48 @@ public void testMapStateClear() throws Exception { assertThatThrownBy(state::clear).isInstanceOf(FlinkRuntimeException.class); } + /** Test for all configs that use {@link org.apache.flink.util.TernaryBoolean}. */ + @TestTemplate + public void testConfigureTernaryBooleanConfigs() throws Exception { + ConfigurableStateBackend stateBackend = getStateBackend(); + if (!(stateBackend instanceof EmbeddedRocksDBStateBackend)) { + return; + } + EmbeddedRocksDBStateBackend rocksDBStateBackend = + (EmbeddedRocksDBStateBackend) stateBackend; + Configuration baseConfig = createBackendConfig(); + Configuration testConfig = new Configuration(); + testConfig.setBoolean( + USE_INGEST_DB_RESTORE_MODE, !USE_INGEST_DB_RESTORE_MODE.defaultValue()); + testConfig.setBoolean( + INCREMENTAL_RESTORE_ASYNC_COMPACT_AFTER_RESCALE, + !INCREMENTAL_RESTORE_ASYNC_COMPACT_AFTER_RESCALE.defaultValue()); + testConfig.setBoolean( + USE_DELETE_FILES_IN_RANGE_DURING_RESCALING, + !USE_DELETE_FILES_IN_RANGE_DURING_RESCALING.defaultValue()); + EmbeddedRocksDBStateBackend configuredBackend = + rocksDBStateBackend.configure( + testConfig, Thread.currentThread().getContextClassLoader()); + + checkBooleanWithBaseConf( + baseConfig, + USE_INGEST_DB_RESTORE_MODE, + configuredBackend.getUseIngestDbRestoreMode()); + checkBooleanWithBaseConf( + baseConfig, + USE_DELETE_FILES_IN_RANGE_DURING_RESCALING, + configuredBackend.isRescalingUseDeleteFilesInRange()); + checkBooleanWithBaseConf( + baseConfig, + INCREMENTAL_RESTORE_ASYNC_COMPACT_AFTER_RESCALE, + configuredBackend.getIncrementalRestoreAsyncCompactAfterRescale()); + } + + private void checkBooleanWithBaseConf( + Configuration testConfig, ConfigOption option, boolean value) { + assertEquals(testConfig.getOptional(option).orElse(!option.defaultValue()), value); + } + private void runStateUpdates() throws Exception { for (int i = 50; i < 150; ++i) { if (i % 10 == 0) { diff --git a/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackendConfigTest.java b/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackendConfigTest.java index d54210e727e1c..79869038bd75c 100644 --- a/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackendConfigTest.java +++ b/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackendConfigTest.java @@ -890,6 +890,31 @@ public void testConfigureUseIngestDB() { assertTrue(rocksDBStateBackend.getUseIngestDbRestoreMode()); } + @Test + public void testDefaultUseDeleteFilesInRange() { + EmbeddedRocksDBStateBackend rocksDBStateBackend = new EmbeddedRocksDBStateBackend(true); + assertEquals( + RocksDBConfigurableOptions.USE_DELETE_FILES_IN_RANGE_DURING_RESCALING + .defaultValue(), + rocksDBStateBackend.isRescalingUseDeleteFilesInRange()); + } + + @Test + public void testConfigureUseFilesInRange() { + EmbeddedRocksDBStateBackend rocksDBStateBackend = new EmbeddedRocksDBStateBackend(true); + Configuration configuration = new Configuration(); + configuration.set( + RocksDBConfigurableOptions.USE_DELETE_FILES_IN_RANGE_DURING_RESCALING, + !RocksDBConfigurableOptions.USE_DELETE_FILES_IN_RANGE_DURING_RESCALING + .defaultValue()); + rocksDBStateBackend = + rocksDBStateBackend.configure(configuration, getClass().getClassLoader()); + assertEquals( + !RocksDBConfigurableOptions.USE_DELETE_FILES_IN_RANGE_DURING_RESCALING + .defaultValue(), + rocksDBStateBackend.isRescalingUseDeleteFilesInRange()); + } + @Test public void testDefaultIncrementalRestoreInstanceBufferSize() { EmbeddedRocksDBStateBackend rocksDBStateBackend = new EmbeddedRocksDBStateBackend(true);