[test] Fix the unstable random tests in PrimaryKeyFileStoreTableITCase (
tsreaper authored Jan 16, 2025
1 parent 0df8b9d commit 3d81240
Showing 1 changed file with 32 additions and 7 deletions.
@@ -28,6 +28,8 @@
 import org.apache.paimon.utils.TraceableFileIO;
 
 import org.apache.flink.api.common.JobStatus;
+import org.apache.flink.api.common.RuntimeExecutionMode;
+import org.apache.flink.configuration.ExecutionOptions;
 import org.apache.flink.core.execution.JobClient;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import org.apache.flink.table.api.TableEnvironment;
Expand All @@ -37,7 +39,6 @@
import org.apache.flink.types.RowKind;
import org.apache.flink.util.CloseableIterator;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Disabled;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.Timeout;
import org.junit.jupiter.params.ParameterizedTest;
@@ -787,7 +788,6 @@ public void testFullCompactionChangelogProducerBatchRandom() throws Exception {
         testFullCompactionChangelogProducerRandom(bEnv, 1, false);
     }
 
-    @Disabled // TODO: fix this unstable test
     @Test
     @Timeout(TIMEOUT)
     public void testFullCompactionChangelogProducerStreamingRandom() throws Exception {
@@ -1088,6 +1088,29 @@ private List<TableResult> testRandom(
         tEnv.getConfig()
                 .getConfiguration()
                 .set(ExecutionConfigOptions.TABLE_EXEC_RESOURCE_DEFAULT_PARALLELISM, 1);
+
+        // We use a large number of rows to mimic unbounded streams because there is a known
+        // consistency issue in bounded streams.
+        //
+        // For bounded streams, if COMPACT snapshot fails to commit when the stream ends (due to
+        // conflict or whatever reasons), we have no chance to modify the compaction result, so the
+        // changelogs produced by compaction will not be committed.
+        //
+        // If it happens in production, users can run another job to compact the table, or run
+        // another job to write more data into the table. These remaining changelogs will be
+        // produced again.
+        int factor;
+        RuntimeExecutionMode mode =
+                tEnv.getConfig().getConfiguration().get(ExecutionOptions.RUNTIME_MODE);
+        if (mode == RuntimeExecutionMode.BATCH) {
+            factor = 1;
+        } else if (mode == RuntimeExecutionMode.STREAMING) {
+            factor = 10;
+        } else {
+            throw new UnsupportedOperationException(
+                    "Unknown runtime execution mode " + mode.name());
+        }
+        int usefulNumRows = LIMIT + NUM_PARTS * NUM_KEYS;
         tEnv.executeSql(
                 "CREATE TABLE `default_catalog`.`default_database`.`S` ("
                         + " i INT"
@@ -1096,10 +1119,10 @@ private List<TableResult> testRandom(
                         + " 'fields.i.kind' = 'sequence',"
                         + " 'fields.i.start' = '0',"
                         + " 'fields.i.end' = '"
-                        + (LIMIT + NUM_PARTS * NUM_KEYS - 1)
+                        + (usefulNumRows - 1) * factor
                         + "',"
                         + " 'number-of-rows' = '"
-                        + (LIMIT + NUM_PARTS * NUM_KEYS)
+                        + usefulNumRows * factor
                         + "',"
                         + " 'rows-per-second' = '"
                         + (LIMIT / 20 + ThreadLocalRandom.current().nextInt(LIMIT / 20))
@@ -1129,7 +1152,7 @@ private List<TableResult> testRandom(
             String v2Sql = "CAST(i AS STRING) || '.str' AS v2";
             tEnv.executeSql(
                     String.format(
-                            "CREATE TEMPORARY VIEW myView%d AS SELECT %s, %s, %s, %s FROM `default_catalog`.`default_database`.`S`",
+                            "CREATE TEMPORARY VIEW myView%d AS SELECT %s, %s, %s, %s, i FROM `default_catalog`.`default_database`.`S`",
                             i, ptSql, kSql, v1Sql, v2Sql));
 
             // run test SQL
@@ -1138,8 +1161,10 @@ private List<TableResult> testRandom(
                     FailingFileIO.retryArtificialException(
                             () ->
                                     tEnv.executeSql(
-                                            "INSERT INTO T /*+ OPTIONS('sink.parallelism' = '2') */ SELECT * FROM myView"
-                                                    + idx));
+                                            "INSERT INTO T /*+ OPTIONS('sink.parallelism' = '2') */ SELECT pt, k, v1, v2 FROM myView"
+                                                    + idx
+                                                    + " WHERE i < "
+                                                    + usefulNumRows));
             results.add(result);
         }

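The change boils down to one pattern: read the runtime execution mode, scale the bounded datagen source by a factor in streaming mode so it behaves more like an unbounded stream, and filter the surplus rows back out when writing to the sink. Below is a minimal standalone sketch of that pattern, not the test itself; the constant USEFUL_NUM_ROWS, the table names S and T, and the blackhole sink are placeholders chosen for illustration.

import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.configuration.ExecutionOptions;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

public class ScaledDatagenSketch {
    // Hypothetical stand-in for LIMIT + NUM_PARTS * NUM_KEYS in the test.
    private static final int USEFUL_NUM_ROWS = 1_000;

    public static void main(String[] args) throws Exception {
        TableEnvironment tEnv =
                TableEnvironment.create(
                        EnvironmentSettings.newInstance().inStreamingMode().build());

        // Same idea as the commit: generate 10x the useful rows in streaming mode so the
        // source keeps running like an unbounded stream; batch mode needs no surplus rows.
        RuntimeExecutionMode mode =
                tEnv.getConfig().getConfiguration().get(ExecutionOptions.RUNTIME_MODE);
        int factor = mode == RuntimeExecutionMode.STREAMING ? 10 : 1;

        // Bounded datagen source whose sequence length is scaled by the factor.
        tEnv.executeSql(
                "CREATE TABLE S (i INT) WITH ("
                        + " 'connector' = 'datagen',"
                        + " 'fields.i.kind' = 'sequence',"
                        + " 'fields.i.start' = '0',"
                        + " 'fields.i.end' = '" + (USEFUL_NUM_ROWS - 1) * factor + "',"
                        + " 'number-of-rows' = '" + USEFUL_NUM_ROWS * factor + "')");

        // Placeholder sink; the real test writes into a Paimon table T.
        tEnv.executeSql("CREATE TABLE T (i INT) WITH ('connector' = 'blackhole')");

        // Only the useful prefix of the sequence reaches the sink; the surplus rows exist
        // solely to keep the streaming job alive long enough for compaction to commit.
        tEnv.executeSql("INSERT INTO T SELECT i FROM S WHERE i < " + USEFUL_NUM_ROWS)
                .await();
    }
}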
