Skip to content

Commit

Permalink
[Fix-3006] Fix CDCSOURCE does not supports delete event after optimiz…
Browse files Browse the repository at this point in the history
…e code (#3012)

Co-authored-by: wenmo <32723967+wenmo@users.noreply.github.com>
  • Loading branch information
aiwenmo and aiwenmo authored Jan 17, 2024
1 parent 3e50168 commit adf013f
Show file tree
Hide file tree
Showing 2 changed files with 4 additions and 2 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,8 @@ private String addSourceTableView(
CustomTableEnvironment customTableEnvironment, DataStream<Row> rowDataDataStream, Table table) {
// 上游表名称
String viewName = "VIEW_" + table.getSchemaTableNameWithUnderline();
customTableEnvironment.createTemporaryView(viewName, rowDataDataStream);
customTableEnvironment.createTemporaryView(
viewName, customTableEnvironment.fromChangelogStream(rowDataDataStream));
logger.info("Create {} temporaryView successful...", viewName);
return viewName;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,8 @@ public void addTableSink(
String sinkTableName = catalogName + "." + sinkSchemaName + "." + tableName;
String viewName = "VIEW_" + table.getSchemaTableNameWithUnderline();

customTableEnvironment.createTemporaryView(viewName, rowDataDataStream);
customTableEnvironment.createTemporaryView(
viewName, customTableEnvironment.fromChangelogStream(rowDataDataStream));
logger.info("Create {} temporaryView successful...", viewName);

createInsertOperations(customTableEnvironment, table, viewName, sinkTableName);
Expand Down

0 comments on commit adf013f

Please sign in to comment.