Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

filter(ticdc): fix incorrect event filter with "rename" DDLs #11956

Merged
merged 7 commits into from
Jan 3, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 13 additions & 1 deletion pkg/filter/sql_event_filter.go
Original file line number Diff line number Diff line change
Expand Up @@ -136,9 +136,21 @@ func (f *sqlEventFilter) shouldSkipDDL(ddl *model.DDLEvent) (skip bool, err erro
if len(f.rules) == 0 {
return false, nil
}
evenType := ddlToEventType(ddl.Type)
schema := ddl.TableInfo.TableName.Schema
table := ddl.TableInfo.TableName.Table
evenType := ddlToEventType(ddl.Type)
if evenType == bf.RenameTable {
if ddl.PreTableInfo == nil {
log.Warn("sql event filter doesn't find old table info when the event type is `rename table`",
zap.String("schema", schema),
zap.String("table", table),
zap.Stringer("type", ddl.Type),
zap.String("query", ddl.Query))
return true, cerror.ErrTableIneligible.GenWithStackByArgs(ddl.TableInfo.TableName)
}
schema = ddl.PreTableInfo.TableName.Schema
table = ddl.PreTableInfo.TableName.Table
}
if evenType == bf.NullEvent {
log.Warn("sql event filter unsupported ddl type, do nothing",
zap.String("type", ddl.Type.String()),
Expand Down
80 changes: 75 additions & 5 deletions pkg/filter/sql_event_filter_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,11 +28,13 @@ import (
func TestShouldSkipDDL(t *testing.T) {
t.Parallel()
type innerCase struct {
schema string
table string
query string
ddlType timodel.ActionType
skip bool
schema string
table string
preSchema string
preTable string
query string
ddlType timodel.ActionType
skip bool
}

type testCase struct {
Expand Down Expand Up @@ -286,6 +288,74 @@ func TestShouldSkipDDL(t *testing.T) {
Type: c.ddlType,
}
skip, err := f.shouldSkipDDL(ddl)
if c.ddlType == timodel.ActionRenameTable || c.ddlType == timodel.ActionRenameTables {
require.ErrorIs(t, err, cerror.ErrTableIneligible)
require.Equal(t, true, skip, "case: %+v", c)
} else {
require.NoError(t, err)
require.Equal(t, c.skip, skip, "case: %+v", c)
}
}
case7 := testCase{
cfg: &config.FilterConfig{
Rules: []string{"*.t1", "*.t2"},
EventFilters: []*config.EventFilterRule{
{
Matcher: []string{"*.t1"},
IgnoreEvent: allEventTypes,
},
},
},
cases: []innerCase{
{
schema: "test",
table: "t2",
preSchema: "test",
preTable: "t1",
query: "no matter",
ddlType: timodel.ActionRenameTable,
skip: true,
},
{
schema: "test",
table: "t3",
preSchema: "test",
preTable: "t1",
query: "no matter",
ddlType: timodel.ActionRenameTable,
skip: true,
},
{
schema: "test",
table: "t1",
preSchema: "test",
preTable: "t2",
query: "no matter",
ddlType: timodel.ActionRenameTable,
skip: false,
},
},
}
f, err = newSQLEventFilter(case7.cfg)
require.NoError(t, err)
for _, c := range case7.cases {
ddl := &model.DDLEvent{
TableInfo: &model.TableInfo{
TableName: model.TableName{
Schema: c.schema,
Table: c.table,
},
},
PreTableInfo: &model.TableInfo{
TableName: model.TableName{
Schema: c.preSchema,
Table: c.preTable,
},
},
Query: c.query,
Type: c.ddlType,
}
skip, err := f.shouldSkipDDL(ddl)
require.NoError(t, err)
require.Equal(t, c.skip, skip, "case: %+v", c)
}
Expand Down
4 changes: 4 additions & 0 deletions tests/integration_tests/event_filter/conf/cf.toml
Original file line number Diff line number Diff line change
Expand Up @@ -13,3 +13,7 @@ ignore-event = ["truncate table"]
[[filter.event-filters]]
matcher = ["event_filter.t_alter"]
ignore-event = ["alter table"]

[[filter.event-filters]]
matcher = ["event_filter.t_name*"]
ignore-event = ["rename table"]
2 changes: 1 addition & 1 deletion tests/integration_tests/event_filter/conf/diff_config.toml
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ source-instances = ["tidb0"]

target-instance = "mysql1"

target-check-tables = ["event_filter.t_normal", "event_filter.t_truncate", "event_filter.t_alter"]
target-check-tables = ["event_filter.t_normal", "event_filter.t_truncate", "event_filter.t_alter", "event_filter.t_name*", "event_filter.t_rename*"]

[data-sources]
[data-sources.tidb0]
Expand Down
23 changes: 22 additions & 1 deletion tests/integration_tests/event_filter/data/test.sql
Original file line number Diff line number Diff line change
Expand Up @@ -89,4 +89,25 @@ CREATE TABLE t_alter
) ENGINE = InnoDB
DEFAULT CHARSET = utf8
COLLATE = utf8_bin;



CREATE TABLE t_name (
id INT,
name varchar(128),
PRIMARY KEY (id)
);
CREATE TABLE t_name1 (
id INT,
name varchar(128),
PRIMARY KEY (id)
);
CREATE TABLE t_name2 (
id INT,
name varchar(128),
PRIMARY KEY (id)
);
CREATE TABLE t_name3 (
id INT,
name varchar(128),
PRIMARY KEY (id)
);
27 changes: 27 additions & 0 deletions tests/integration_tests/event_filter/data/test_rename.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
USE `event_filter`;

RENAME TABLE t_name TO t_rename;

INSERT INTO t_rename
VALUES (1, 'guagua');

INSERT INTO t_rename
VALUES (2, 'huahua');

INSERT INTO t_rename
VALUES (3, 'xigua');

INSERT INTO t_rename
VALUES (4, 'yuko');

-- rename tables
RENAME TABLE t_name1 TO t_rename1, t_name2 TO t_rename2, t_name3 TO t_rename3;

INSERT INTO t_rename1
VALUES (1, 'guagua');

INSERT INTO t_rename2
VALUES (2, 'huahua');

INSERT INTO t_rename3
VALUES (3, 'xigua');
9 changes: 9 additions & 0 deletions tests/integration_tests/event_filter/run.sh
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,10 @@ function run() {
check_table_exists "event_filter.t_normal" ${DOWN_TIDB_HOST} ${DOWN_TIDB_PORT}
check_table_exists "event_filter.t_truncate" ${DOWN_TIDB_HOST} ${DOWN_TIDB_PORT}
check_table_exists "event_filter.t_alter" ${DOWN_TIDB_HOST} ${DOWN_TIDB_PORT}
check_table_exists "event_filter.t_name" ${DOWN_TIDB_HOST} ${DOWN_TIDB_PORT}
check_table_exists "event_filter.t_name1" ${DOWN_TIDB_HOST} ${DOWN_TIDB_PORT}
check_table_exists "event_filter.t_name2" ${DOWN_TIDB_HOST} ${DOWN_TIDB_PORT}
check_table_exists "event_filter.t_name3" ${DOWN_TIDB_HOST} ${DOWN_TIDB_PORT}

# check those rows that are not filtered are synced to downstream
run_sql "select count(1) from event_filter.t1;" ${DOWN_TIDB_HOST} ${DOWN_TIDB_PORT}
Expand All @@ -61,6 +65,11 @@ function run() {
run_sql_file $CUR/data/test_truncate.sql ${UP_TIDB_HOST} ${UP_TIDB_PORT}
run_sql "ALTER TABLE event_filter.t_alter MODIFY t_bigint BIGINT;" ${DOWN_TIDB_HOST} ${DOWN_TIDB_PORT}
run_sql_file $CUR/data/test_alter.sql ${UP_TIDB_HOST} ${UP_TIDB_PORT}
run_sql "RENAME TABLE event_filter.t_name TO event_filter.t_rename;" ${DOWN_TIDB_HOST} ${DOWN_TIDB_PORT}
run_sql "RENAME TABLE event_filter.t_name1 TO event_filter.t_rename1;" ${DOWN_TIDB_HOST} ${DOWN_TIDB_PORT}
run_sql "RENAME TABLE event_filter.t_name2 TO event_filter.t_rename2;" ${DOWN_TIDB_HOST} ${DOWN_TIDB_PORT}
run_sql "RENAME TABLE event_filter.t_name3 TO event_filter.t_rename3;" ${DOWN_TIDB_HOST} ${DOWN_TIDB_PORT}
run_sql_file $CUR/data/test_rename.sql ${UP_TIDB_HOST} ${UP_TIDB_PORT}
run_sql "create table event_filter.finish_mark(id int primary key);" ${DOWN_TIDB_HOST} ${DOWN_TIDB_PORT}
check_table_exists "event_filter.finish_mark" ${DOWN_TIDB_HOST} ${DOWN_TIDB_PORT}

Expand Down
Loading