[flink] Introduce scan bounded to force bounded in streaming job (#4941)

JingsongLi authored Jan 17, 2025
1 parent 59c038a commit 43abe2d
Showing 10 changed files with 83 additions and 42 deletions.
@@ -110,6 +110,12 @@
<td>Boolean</td>
<td>If true, it will add a compact coordinator and worker operator after the writer operator, in order to compact several changelog files (for primary key tables) or newly created data files (for unaware bucket tables) from the same partition into large ones, which can decrease the number of small files.</td>
</tr>
<tr>
<td><h5>scan.bounded</h5></td>
<td style="word-wrap: break-word;">(none)</td>
<td>Boolean</td>
<td>Bounded mode for the Paimon consumer. By default, Paimon automatically selects bounded mode based on the execution mode of the Flink job.</td>
</tr>
<tr>
<td><h5>scan.infer-parallelism</h5></td>
<td style="word-wrap: break-word;">true</td>
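A usage sketch for the new option (the table name T and an already registered Paimon catalog are assumptions, mirroring the testScanBounded case added at the end of this commit): even in a streaming TableEnvironment, the OPTIONS hint forces a bounded scan.

import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

public class ScanBoundedUsage {
    public static void main(String[] args) {
        // Streaming-mode environment; the hint below still yields a bounded scan.
        TableEnvironment tEnv =
                TableEnvironment.create(EnvironmentSettings.newInstance().inStreamingMode().build());
        tEnv.executeSql("SELECT * FROM T /*+ OPTIONS('scan.bounded'='true') */").print();
    }
}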
@@ -38,13 +38,13 @@ public class DataTableSource extends BaseDataTableSource {
public DataTableSource(
ObjectIdentifier tableIdentifier,
Table table,
boolean streaming,
boolean unbounded,
DynamicTableFactory.Context context,
@Nullable LogStoreTableFactory logStoreTableFactory) {
this(
tableIdentifier,
table,
streaming,
unbounded,
context,
logStoreTableFactory,
null,
@@ -57,7 +57,7 @@ public DataTableSource(
public DataTableSource(
ObjectIdentifier tableIdentifier,
Table table,
boolean streaming,
boolean unbounded,
DynamicTableFactory.Context context,
@Nullable LogStoreTableFactory logStoreTableFactory,
@Nullable Predicate predicate,
@@ -68,7 +68,7 @@
super(
tableIdentifier,
table,
streaming,
unbounded,
context,
logStoreTableFactory,
predicate,
@@ -83,7 +83,7 @@ public DataTableSource copy() {
return new DataTableSource(
tableIdentifier,
table,
streaming,
unbounded,
context,
logStoreTableFactory,
predicate,
4 changes: 4 additions & 0 deletions paimon-flink/paimon-flink-common/pom.xml
@@ -125,6 +125,10 @@ under the License.
<groupId>log4j</groupId>
<artifactId>log4j</artifactId>
</exclusion>
<exclusion>
<groupId>org.apache.logging.log4j</groupId>
<artifactId>log4j-slf4j-impl</artifactId>
</exclusion>
</exclusions>
</dependency>

@@ -67,6 +67,7 @@
import java.util.Set;
import java.util.regex.Pattern;

import static java.lang.Boolean.parseBoolean;
import static org.apache.paimon.CoreOptions.LOG_CHANGELOG_MODE;
import static org.apache.paimon.CoreOptions.LOG_CONSISTENCY;
import static org.apache.paimon.CoreOptions.SCAN_MODE;
@@ -75,6 +76,7 @@
import static org.apache.paimon.CoreOptions.StartupMode.FROM_SNAPSHOT_FULL;
import static org.apache.paimon.flink.FlinkConnectorOptions.LOG_SYSTEM;
import static org.apache.paimon.flink.FlinkConnectorOptions.NONE;
import static org.apache.paimon.flink.FlinkConnectorOptions.SCAN_BOUNDED;
import static org.apache.paimon.flink.LogicalTypeConversion.toLogicalType;
import static org.apache.paimon.flink.log.LogStoreTableFactory.discoverLogStoreFactory;

@@ -93,19 +95,25 @@ public AbstractFlinkTableFactory(@Nullable FlinkCatalog flinkCatalog) {
@Override
public DynamicTableSource createDynamicTableSource(Context context) {
CatalogTable origin = context.getCatalogTable().getOrigin();
boolean isStreamingMode =
Table table =
origin instanceof SystemCatalogTable
? ((SystemCatalogTable) origin).table()
: buildPaimonTable(context);
boolean unbounded =
context.getConfiguration().get(ExecutionOptions.RUNTIME_MODE)
== RuntimeExecutionMode.STREAMING;
Map<String, String> options = table.options();
if (options.containsKey(SCAN_BOUNDED.key())
&& parseBoolean(options.get(SCAN_BOUNDED.key()))) {
unbounded = false;
}
if (origin instanceof SystemCatalogTable) {
return new SystemTableSource(
((SystemCatalogTable) origin).table(),
isStreamingMode,
context.getObjectIdentifier());
return new SystemTableSource(table, unbounded, context.getObjectIdentifier());
} else {
return new DataTableSource(
context.getObjectIdentifier(),
buildPaimonTable(context),
isStreamingMode,
table,
unbounded,
context,
createOptionalLogStoreFactory(context).orElse(null));
}
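A minimal sketch of the resolution order implemented above (the class and method names are illustrative, not part of the commit): only an explicit 'scan.bounded'='true' overrides the runtime mode; 'false' or an unset value leaves the batch/streaming default in place.

import org.apache.flink.api.common.RuntimeExecutionMode;

import java.util.Map;

class BoundednessSketch {
    /** Returns true when the source should be unbounded. */
    static boolean resolveUnbounded(RuntimeExecutionMode mode, Map<String, String> tableOptions) {
        boolean unbounded = mode == RuntimeExecutionMode.STREAMING;
        // Boolean.parseBoolean(null) is false, so only an explicit 'true'
        // flips a streaming job to a bounded scan; batch jobs stay bounded.
        if (Boolean.parseBoolean(tableOptions.get("scan.bounded"))) {
            unbounded = false;
        }
        return unbounded;
    }
}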
@@ -444,6 +444,14 @@ public class FlinkConnectorOptions {
+ "${UID_PREFIX}_${TABLE_NAME}_${USER_UID_SUFFIX}. If the uid suffix is not set, flink will "
+ "automatically generate the operator uid, which may be incompatible when the topology changes.");

public static final ConfigOption<Boolean> SCAN_BOUNDED =
key("scan.bounded")
.booleanType()
.noDefaultValue()
.withDescription(
"Bounded mode for Paimon consumer. "
+ "By default, Paimon automatically selects bounded mode based on the mode of the Flink job.");

public static List<ConfigOption<?>> getOptions() {
final Field[] fields = FlinkConnectorOptions.class.getFields();
final List<ConfigOption<?>> list = new ArrayList<>(fields.length);
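Because scan.bounded is declared with noDefaultValue(), a typed read returns null when the user has not set it, which is what lets the factory above fall back to the Flink runtime mode. A short sketch using the same Options.fromMap pattern that SystemTableSource uses later in this commit (the helper class is illustrative):

import org.apache.paimon.flink.FlinkConnectorOptions;
import org.apache.paimon.options.Options;
import org.apache.paimon.table.Table;

class ScanBoundedRead {
    /** Null means "not set"; callers then decide boundedness from the runtime mode. */
    static Boolean readScanBounded(Table table) {
        Options options = Options.fromMap(table.options());
        return options.get(FlinkConnectorOptions.SCAN_BOUNDED);
    }
}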
@@ -94,7 +94,7 @@ public abstract class BaseDataTableSource extends FlinkTableSource
CoreOptions.SCAN_VERSION);

protected final ObjectIdentifier tableIdentifier;
protected final boolean streaming;
protected final boolean unbounded;
protected final DynamicTableFactory.Context context;
@Nullable protected final LogStoreTableFactory logStoreTableFactory;

@@ -104,7 +104,7 @@ public abstract class BaseDataTableSource extends FlinkTableSource
public BaseDataTableSource(
ObjectIdentifier tableIdentifier,
Table table,
boolean streaming,
boolean unbounded,
DynamicTableFactory.Context context,
@Nullable LogStoreTableFactory logStoreTableFactory,
@Nullable Predicate predicate,
@@ -114,7 +114,7 @@ public BaseDataTableSource(
@Nullable Long countPushed) {
super(table, predicate, projectFields, limit);
this.tableIdentifier = tableIdentifier;
this.streaming = streaming;
this.unbounded = unbounded;
this.context = context;
this.logStoreTableFactory = logStoreTableFactory;
this.predicate = predicate;
@@ -126,7 +126,7 @@ public BaseDataTableSource(

@Override
public ChangelogMode getChangelogMode() {
if (!streaming) {
if (!unbounded) {
// batch merge all, return insert only
return ChangelogMode.insertOnly();
}
@@ -195,7 +195,7 @@ public ScanRuntimeProvider getScanRuntimeProvider(ScanContext scanContext) {
FlinkSourceBuilder sourceBuilder =
new FlinkSourceBuilder(table)
.sourceName(tableIdentifier.asSummaryString())
.sourceBounded(!streaming)
.sourceBounded(!unbounded)
.logSourceProvider(logSourceProvider)
.projection(projectFields)
.predicate(predicate)
@@ -204,7 +204,7 @@
.dynamicPartitionFilteringFields(dynamicPartitionFilteringFields());

return new PaimonDataStreamScanProvider(
!streaming,
!unbounded,
env ->
sourceBuilder
.sourceParallelism(inferSourceParallelism(env))
@@ -294,7 +294,7 @@ public boolean applyAggregates(
List<int[]> groupingSets,
List<AggregateExpression> aggregateExpressions,
DataType producedDataType) {
if (isStreaming()) {
if (isUnbounded()) {
return false;
}

@@ -349,7 +349,7 @@ public String asSummaryString() {
}

@Override
public boolean isStreaming() {
return streaming;
public boolean isUnbounded() {
return unbounded;
}
}
@@ -56,13 +56,13 @@ public class DataTableSource extends BaseDataTableSource
public DataTableSource(
ObjectIdentifier tableIdentifier,
Table table,
boolean streaming,
boolean unbounded,
DynamicTableFactory.Context context,
@Nullable LogStoreTableFactory logStoreTableFactory) {
this(
tableIdentifier,
table,
streaming,
unbounded,
context,
logStoreTableFactory,
null,
@@ -76,7 +76,7 @@ public DataTableSource(
public DataTableSource(
ObjectIdentifier tableIdentifier,
Table table,
boolean streaming,
boolean unbounded,
DynamicTableFactory.Context context,
@Nullable LogStoreTableFactory logStoreTableFactory,
@Nullable Predicate predicate,
@@ -88,7 +88,7 @@ public DataTableSource(
super(
tableIdentifier,
table,
streaming,
unbounded,
context,
logStoreTableFactory,
predicate,
@@ -104,7 +104,7 @@ public DataTableSource copy() {
return new DataTableSource(
tableIdentifier,
table,
streaming,
unbounded,
context,
logStoreTableFactory,
predicate,
@@ -117,7 +117,7 @@ public DataTableSource copy() {

@Override
public TableStats reportStatistics() {
if (streaming) {
if (unbounded) {
return TableStats.UNKNOWN;
}
Optional<Statistics> optionStatistics = table.statistics();
@@ -142,13 +142,13 @@ public TableStats reportStatistics() {
@Override
public List<String> listAcceptedFilterFields() {
// note that streaming query doesn't support dynamic filtering
return streaming ? Collections.emptyList() : table.partitionKeys();
return unbounded ? Collections.emptyList() : table.partitionKeys();
}

@Override
public void applyDynamicFiltering(List<String> candidateFilterFields) {
checkState(
!streaming,
!unbounded,
"Cannot apply dynamic filtering to Paimon table '%s' when streaming reading.",
table.name());

@@ -108,7 +108,7 @@ public Result applyFilters(List<ResolvedExpression> filters) {
unConsumedFilters.add(filter);
} else {
Predicate p = predicateOptional.get();
if (isStreaming() || !p.visit(onlyPartFieldsVisitor)) {
if (isUnbounded() || !p.visit(onlyPartFieldsVisitor)) {
unConsumedFilters.add(filter);
} else {
consumedFilters.add(filter);
@@ -137,7 +137,7 @@ public void applyLimit(long limit) {
this.limit = limit;
}

public abstract boolean isStreaming();
public abstract boolean isUnbounded();

@Nullable
protected Integer inferSourceParallelism(StreamExecutionEnvironment env) {
@@ -150,7 +150,7 @@ protected Integer inferSourceParallelism(StreamExecutionEnvironment env) {
}
Integer parallelism = options.get(FlinkConnectorOptions.SCAN_PARALLELISM);
if (parallelism == null && options.get(FlinkConnectorOptions.INFER_SCAN_PARALLELISM)) {
if (isStreaming()) {
if (isUnbounded()) {
parallelism = Math.max(1, options.get(CoreOptions.BUCKET));
} else {
scanSplitsForInference();
@@ -42,15 +42,14 @@
/** A {@link FlinkTableSource} for system table. */
public class SystemTableSource extends FlinkTableSource {

private final boolean isStreamingMode;
private final boolean unbounded;
private final int splitBatchSize;
private final FlinkConnectorOptions.SplitAssignMode splitAssignMode;
private final ObjectIdentifier tableIdentifier;

public SystemTableSource(
Table table, boolean isStreamingMode, ObjectIdentifier tableIdentifier) {
public SystemTableSource(Table table, boolean unbounded, ObjectIdentifier tableIdentifier) {
super(table);
this.isStreamingMode = isStreamingMode;
this.unbounded = unbounded;
Options options = Options.fromMap(table.options());
this.splitBatchSize = options.get(FlinkConnectorOptions.SCAN_SPLIT_ENUMERATOR_BATCH_SIZE);
this.splitAssignMode = options.get(FlinkConnectorOptions.SCAN_SPLIT_ENUMERATOR_ASSIGN_MODE);
@@ -59,15 +58,15 @@ public SystemTableSource(

public SystemTableSource(
Table table,
boolean isStreamingMode,
boolean unbounded,
@Nullable Predicate predicate,
@Nullable int[][] projectFields,
@Nullable Long limit,
int splitBatchSize,
FlinkConnectorOptions.SplitAssignMode splitAssignMode,
ObjectIdentifier tableIdentifier) {
super(table, predicate, projectFields, limit);
this.isStreamingMode = isStreamingMode;
this.unbounded = unbounded;
this.splitBatchSize = splitBatchSize;
this.splitAssignMode = splitAssignMode;
this.tableIdentifier = tableIdentifier;
@@ -96,7 +95,7 @@ public ScanRuntimeProvider getScanRuntimeProvider(ScanContext scanContext) {
}
readBuilder.withFilter(predicate);

if (isStreamingMode && table instanceof DataTable) {
if (unbounded && table instanceof DataTable) {
source =
new ContinuousFileStoreSource(
readBuilder, table.options(), limit, BucketMode.HASH_FIXED, rowData);
@@ -125,7 +124,7 @@ public ScanRuntimeProvider getScanRuntimeProvider(ScanContext scanContext) {
public SystemTableSource copy() {
return new SystemTableSource(
table,
isStreamingMode,
unbounded,
predicate,
projectFields,
limit,
@@ -140,7 +139,7 @@ public String asSummaryString() {
}

@Override
public boolean isStreaming() {
return isStreamingMode;
public boolean isUnbounded() {
return unbounded;
}
}
@@ -25,6 +25,8 @@
import org.apache.paimon.utils.DateTimeUtils;
import org.apache.paimon.utils.SnapshotNotExistException;

import org.apache.paimon.shade.guava30.com.google.common.collect.ImmutableList;

import org.apache.flink.api.dag.Transformation;
import org.apache.flink.types.Row;
import org.apache.flink.types.RowKind;
@@ -636,6 +638,20 @@ public void testParquetRowDecimalAndTimestamp() {
Row.of(Row.of(DateTimeUtils.toLocalDateTime("2024-11-13 18:00:00", 0))));
}

@Test
public void testScanBounded() {
sql("INSERT INTO T VALUES (1, 11, 111), (2, 22, 222)");
List<Row> result;
try (CloseableIterator<Row> iter =
sEnv.executeSql("SELECT * FROM T /*+ OPTIONS('scan.bounded'='true') */")
.collect()) {
result = ImmutableList.copyOf(iter);
} catch (Exception e) {
throw new RuntimeException(e);
}
assertThat(result).containsExactlyInAnyOrder(Row.of(1, 11, 111), Row.of(2, 22, 222));
}

private void validateCount1PushDown(String sql) {
Transformation<?> transformation = AbstractTestBase.translate(tEnv, sql);
while (!transformation.getInputs().isEmpty()) {