Skip to content

Commit

Permalink
[FLINK-37000] Fix MySQL CDC could not handle datetime prior to 1970 p…
Browse files Browse the repository at this point in the history
…roperly

Signed-off-by: yuxiqian <34335406+yuxiqian@users.noreply.github.com>
  • Loading branch information
yuxiqian committed Jan 4, 2025
1 parent ddb5f00 commit bbd21bb
Show file tree
Hide file tree
Showing 9 changed files with 1,417 additions and 4 deletions.

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
@@ -0,0 +1,124 @@
-- Licensed to the Apache Software Foundation (ASF) under one or more
-- contributor license agreements. See the NOTICE file distributed with
-- this work for additional information regarding copyright ownership.
-- The ASF licenses this file to You under the Apache License, Version 2.0
-- (the "License"); you may not use this file except in compliance with
-- the License. You may obtain a copy of the License at
--
-- http://www.apache.org/licenses/LICENSE-2.0
--
-- Unless required by applicable law or agreed to in writing, software
-- distributed under the License is distributed on an "AS IS" BASIS,
-- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-- See the License for the specific language governing permissions and
-- limitations under the License.

CREATE TABLE ancient_times
(
id INT NOT NULL AUTO_INCREMENT,
date_col DATE DEFAULT '0017-08-12',
datetime_0_col DATETIME(0) DEFAULT '0016-07-13 17:17:17',
datetime_1_col DATETIME(1) DEFAULT '0015-06-14 17:17:17.1',
datetime_2_col DATETIME(2) DEFAULT '0014-05-15 17:17:17.12',
datetime_3_col DATETIME(3) DEFAULT '0013-04-16 17:17:17.123',
datetime_4_col DATETIME(4) DEFAULT '0012-03-17 17:17:17.1234',
datetime_5_col DATETIME(5) DEFAULT '0011-02-18 17:17:17.12345',
datetime_6_col DATETIME(6) DEFAULT '0010-01-19 17:17:17.123456',
PRIMARY KEY (id)
);

INSERT INTO ancient_times VALUES (
DEFAULT,
DEFAULT,
DEFAULT,
DEFAULT,
DEFAULT,
DEFAULT,
DEFAULT,
DEFAULT,
DEFAULT
);

INSERT INTO ancient_times VALUES (
DEFAULT,
'0000-00-00',
'0000-00-00 00:00:00',
'0000-00-00 00:00:00.0',
'0000-00-00 00:00:00.00',
'0000-00-00 00:00:00.000',
'0000-00-00 00:00:00.0000',
'0000-00-00 00:00:00.00000',
'0000-00-00 00:00:00.000000'
);

INSERT INTO ancient_times VALUES (
DEFAULT,
'0001-01-01',
'0001-01-01 16:16:16',
'0001-01-01 16:16:16.1',
'0001-01-01 16:16:16.12',
'0001-01-01 16:16:16.123',
'0001-01-01 16:16:16.1234',
'0001-01-01 16:16:16.12345',
'0001-01-01 16:16:16.123456'
);

INSERT INTO ancient_times VALUES (
DEFAULT,
'0002-02-02',
'0002-02-02 15:15:15',
'0002-02-02 15:15:15.1',
'0002-02-02 15:15:15.12',
'0002-02-02 15:15:15.123',
'0002-02-02 15:15:15.1234',
'0002-02-02 15:15:15.12345',
'0002-02-02 15:15:15.123456'
);

INSERT INTO ancient_times VALUES (
DEFAULT,
'0033-03-03',
'0033-03-03 14:14:14',
'0033-03-03 14:14:14.1',
'0033-03-03 14:14:14.12',
'0033-03-03 14:14:14.123',
'0033-03-03 14:14:14.1234',
'0033-03-03 14:14:14.12345',
'0033-03-03 14:14:14.123456'
);

INSERT INTO ancient_times VALUES (
DEFAULT,
'0444-04-04',
'0444-04-04 13:13:13',
'0444-04-04 13:13:13.1',
'0444-04-04 13:13:13.12',
'0444-04-04 13:13:13.123',
'0444-04-04 13:13:13.1234',
'0444-04-04 13:13:13.12345',
'0444-04-04 13:13:13.123456'
);

INSERT INTO ancient_times VALUES (
DEFAULT,
'1969-12-31',
'1969-12-31 12:12:12',
'1969-12-31 12:12:12.1',
'1969-12-31 12:12:12.12',
'1969-12-31 12:12:12.123',
'1969-12-31 12:12:12.1234',
'1969-12-31 12:12:12.12345',
'1969-12-31 12:12:12.123456'
);

INSERT INTO ancient_times VALUES (
DEFAULT,
'2019-12-31',
'2019-12-31 23:11:11',
'2019-12-31 23:11:11.1',
'2019-12-31 23:11:11.12',
'2019-12-31 23:11:11.123',
'2019-12-31 23:11:11.1234',
'2019-12-31 23:11:11.12345',
'2019-12-31 23:11:11.123456'
);
Original file line number Diff line number Diff line change
@@ -0,0 +1,58 @@
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
# http://www.apache.org/licenses/LICENSE-2.0
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

# For advice on how to change settings please see
# http://dev.mysql.com/doc/refman/5.7/en/server-configuration-defaults.html

[mysqld]
#
# Remove leading # and set to the amount of RAM for the most important data
# cache in MySQL. Start at 70% of total RAM for dedicated server, else 10%.
# innodb_buffer_pool_size = 128M
#
# Remove leading # to turn on a very important data integrity option: logging
# changes to the binary log between backups.
# log_bin
#
# Remove leading # to set options mainly useful for reporting servers.
# The server defaults are faster for transactions and fast SELECTs.
# Adjust sizes as needed, experiment to find the optimal values.
# join_buffer_size = 128M
# sort_buffer_size = 2M
# read_rnd_buffer_size = 2M
skip-host-cache
skip-name-resolve
#datadir=/var/lib/mysql
#socket=/var/lib/mysql/mysql.sock
secure-file-priv=/var/lib/mysql
user=mysql

# Disabling symbolic-links is recommended to prevent assorted security risks
symbolic-links=0

#log-error=/var/log/mysqld.log
#pid-file=/var/run/mysqld/mysqld.pid

# ----------------------------------------------
# Enable the binlog for replication & CDC
# ----------------------------------------------

# Enable binary replication log and set the prefix, expiration, and log format.
# The prefix is arbitrary, expiration can be short for integration tests but would
# be longer on a production system. Row-level info is required for ingest to work.
# Server ID is required, but this will vary on production systems
server-id = 223344
log_bin = mysql-bin
expire_logs_days = 1
binlog_format = row
sql_mode = "STRICT_TRANS_TABLES,NO_ENGINE_SUBSTITUTION"
Original file line number Diff line number Diff line change
Expand Up @@ -309,10 +309,12 @@ protected Object convertToTimestamp(Object dbzObj, Schema schema) {
return TimestampData.fromMillis((Long) dbzObj);
case MicroTimestamp.SCHEMA_NAME:
long micro = (long) dbzObj;
return TimestampData.fromMillis(micro / 1000, (int) (micro % 1000 * 1000));
return TimestampData.fromMillis(
Math.floorDiv(micro, 1000), (int) (Math.floorMod(micro, 1000) * 1000));
case NanoTimestamp.SCHEMA_NAME:
long nano = (long) dbzObj;
return TimestampData.fromMillis(nano / 1000_000, (int) (nano % 1000_000));
return TimestampData.fromMillis(
Math.floorDiv(nano, 1000_000), (int) (Math.floorMod(nano, 1000_000)));
}
}
throw new IllegalArgumentException(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -481,12 +481,17 @@ public Object convert(Object dbzObj, Schema schema) {
return TimestampData.fromEpochMillis((Long) dbzObj);
case MicroTimestamp.SCHEMA_NAME:
long micro = (long) dbzObj;
// Use Math#floorDiv and Math#floorMod instead of `/` and `%`, because
// timestamp number could be negative if we're handling timestamps prior
// to 1970.
return TimestampData.fromEpochMillis(
micro / 1000, (int) (micro % 1000 * 1000));
Math.floorDiv(micro, 1000),
(int) (Math.floorMod(micro, 1000) * 1000));
case NanoTimestamp.SCHEMA_NAME:
long nano = (long) dbzObj;
return TimestampData.fromEpochMillis(
nano / 1000_000, (int) (nano % 1000_000));
Math.floorDiv(nano, 1000_000),
(int) (Math.floorMod(nano, 1000_000)));
}
}
LocalDateTime localDateTime =
Expand Down
Loading

0 comments on commit bbd21bb

Please sign in to comment.