diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/PrimaryKeyFileStoreTableITCase.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/PrimaryKeyFileStoreTableITCase.java
index 839a45a6e47a..0b243558dda3 100644
--- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/PrimaryKeyFileStoreTableITCase.java
+++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/PrimaryKeyFileStoreTableITCase.java
@@ -28,6 +28,8 @@ import org.apache.paimon.utils.TraceableFileIO;
 
 import org.apache.flink.api.common.JobStatus;
+import org.apache.flink.api.common.RuntimeExecutionMode;
+import org.apache.flink.configuration.ExecutionOptions;
 import org.apache.flink.core.execution.JobClient;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import org.apache.flink.table.api.TableEnvironment;
@@ -37,7 +39,6 @@ import org.apache.flink.types.RowKind;
 import org.apache.flink.util.CloseableIterator;
 
 import org.junit.jupiter.api.BeforeEach;
-import org.junit.jupiter.api.Disabled;
 import org.junit.jupiter.api.Test;
 import org.junit.jupiter.api.Timeout;
 import org.junit.jupiter.params.ParameterizedTest;
@@ -787,7 +788,6 @@ public void testFullCompactionChangelogProducerBatchRandom() throws Exception {
         testFullCompactionChangelogProducerRandom(bEnv, 1, false);
     }
 
-    @Disabled // TODO: fix this unstable test
     @Test
     @Timeout(TIMEOUT)
     public void testFullCompactionChangelogProducerStreamingRandom() throws Exception {
@@ -1088,6 +1088,29 @@ private List<TableResult> testRandom(
         tEnv.getConfig()
                 .getConfiguration()
                 .set(ExecutionConfigOptions.TABLE_EXEC_RESOURCE_DEFAULT_PARALLELISM, 1);
+
+        // We use a large number of rows to mimic unbounded streams because there is a known
+        // consistency issue in bounded streams.
+        //
+        // For bounded streams, if the COMPACT snapshot fails to commit when the stream ends
+        // (due to conflicts or other reasons), we have no chance to modify the compaction
+        // result and retry, so the changelogs produced by compaction will not be committed.
+        //
+        // If this happens in production, users can run another job to compact the table, or
+        // run another job to write more data into the table. The remaining changelogs will
+        // then be produced again.
+        int factor;
+        RuntimeExecutionMode mode =
+                tEnv.getConfig().getConfiguration().get(ExecutionOptions.RUNTIME_MODE);
+        if (mode == RuntimeExecutionMode.BATCH) {
+            factor = 1;
+        } else if (mode == RuntimeExecutionMode.STREAMING) {
+            factor = 10;
+        } else {
+            throw new UnsupportedOperationException(
+                    "Unknown runtime execution mode " + mode.name());
+        }
+        int usefulNumRows = LIMIT + NUM_PARTS * NUM_KEYS;
         tEnv.executeSql(
                 "CREATE TABLE `default_catalog`.`default_database`.`S` ("
                         + " i INT"
@@ -1096,10 +1119,10 @@ private List<TableResult> testRandom(
                         + " 'fields.i.kind' = 'sequence',"
                         + " 'fields.i.start' = '0',"
                         + " 'fields.i.end' = '"
-                        + (LIMIT + NUM_PARTS * NUM_KEYS - 1)
+                        + (usefulNumRows * factor - 1)
                         + "',"
                         + " 'number-of-rows' = '"
-                        + (LIMIT + NUM_PARTS * NUM_KEYS)
+                        + (usefulNumRows * factor)
                         + "',"
                         + " 'rows-per-second' = '"
                         + (LIMIT / 20 + ThreadLocalRandom.current().nextInt(LIMIT / 20))
@@ -1129,7 +1152,7 @@ private List<TableResult> testRandom(
         String v2Sql = "CAST(i AS STRING) || '.str' AS v2";
         tEnv.executeSql(
                 String.format(
-                        "CREATE TEMPORARY VIEW myView%d AS SELECT %s, %s, %s, %s FROM `default_catalog`.`default_database`.`S`",
+                        "CREATE TEMPORARY VIEW myView%d AS SELECT %s, %s, %s, %s, i FROM `default_catalog`.`default_database`.`S`",
                         i, ptSql, kSql, v1Sql, v2Sql));
 
         // run test SQL
@@ -1138,8 +1161,10 @@ private List<TableResult> testRandom(
         FailingFileIO.retryArtificialException(
                 () ->
                         tEnv.executeSql(
-                                "INSERT INTO T /*+ OPTIONS('sink.parallelism' = '2') */ SELECT * FROM myView"
-                                        + idx));
+                                "INSERT INTO T /*+ OPTIONS('sink.parallelism' = '2') */ SELECT pt, k, v1, v2 FROM myView"
+                                        + idx
+                                        + " WHERE i < "
+                                        + usefulNumRows));
         results.add(result);
     }
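
For context, the patch works around the bounded-stream limitation described in the new comment by over-provisioning a bounded datagen source in streaming mode and filtering the padding rows back out on insert. Below is a minimal, standalone sketch of that pattern, not the test code itself: limit, numParts, and numKeys are hypothetical stand-ins for the test's LIMIT, NUM_PARTS, and NUM_KEYS constants, and the table name S is illustrative.

import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.configuration.ExecutionOptions;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

public class OverProvisionedSourceSketch {
    public static void main(String[] args) {
        // Hypothetical stand-ins for the test's LIMIT, NUM_PARTS and NUM_KEYS constants.
        final int limit = 10000;
        final int numParts = 4;
        final int numKeys = 64;

        TableEnvironment tEnv =
                TableEnvironment.create(
                        EnvironmentSettings.newInstance().inStreamingMode().build());

        // As in the patch: read the runtime mode from the configuration and generate extra
        // rows in streaming mode, so the job stays alive long enough for the final COMPACT
        // snapshot to be committed (and retried on conflict).
        RuntimeExecutionMode mode =
                tEnv.getConfig().getConfiguration().get(ExecutionOptions.RUNTIME_MODE);
        int factor = mode == RuntimeExecutionMode.STREAMING ? 10 : 1;

        int usefulNumRows = limit + numParts * numKeys;
        // A datagen 'sequence' field ends when the sequence is exhausted, so the range must
        // cover exactly number-of-rows values: end = start + number-of-rows - 1.
        tEnv.executeSql(
                "CREATE TABLE S (i INT) WITH ("
                        + " 'connector' = 'datagen',"
                        + " 'fields.i.kind' = 'sequence',"
                        + " 'fields.i.start' = '0',"
                        + " 'fields.i.end' = '" + (usefulNumRows * factor - 1) + "',"
                        + " 'number-of-rows' = '" + (usefulNumRows * factor) + "')");

        // Only the first usefulNumRows rows carry test data; the rest are padding that keeps
        // the stream running, so downstream consumers filter them out.
        tEnv.executeSql("SELECT * FROM S WHERE i < " + usefulNumRows).print();
    }
}

The same filter is what the patch appends to the INSERT statement ("WHERE i < " + usefulNumRows), so the rows written to T are identical in batch and streaming mode even though the streaming source emits factor times as many rows.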