diff --git a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/test/java/org/apache/flink/cdc/connectors/mysql/source/MySqlAncientDateAndTimeITCase.java b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/test/java/org/apache/flink/cdc/connectors/mysql/source/MySqlAncientDateAndTimeITCase.java new file mode 100644 index 00000000000..7d38d829940 --- /dev/null +++ b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/test/java/org/apache/flink/cdc/connectors/mysql/source/MySqlAncientDateAndTimeITCase.java @@ -0,0 +1,419 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.cdc.connectors.mysql.source; + +import org.apache.flink.api.common.eventtime.WatermarkStrategy; +import org.apache.flink.api.common.restartstrategy.RestartStrategies; +import org.apache.flink.api.java.tuple.Tuple2; +import org.apache.flink.cdc.common.data.RecordData; +import org.apache.flink.cdc.common.event.CreateTableEvent; +import org.apache.flink.cdc.common.event.DataChangeEvent; +import org.apache.flink.cdc.common.event.Event; +import org.apache.flink.cdc.common.event.TableId; +import org.apache.flink.cdc.common.schema.Schema; +import org.apache.flink.cdc.common.source.FlinkSourceProvider; +import org.apache.flink.cdc.common.types.DataTypes; +import org.apache.flink.cdc.common.utils.SchemaUtils; +import org.apache.flink.cdc.connectors.mysql.source.config.MySqlSourceConfigFactory; +import org.apache.flink.cdc.connectors.mysql.table.StartupOptions; +import org.apache.flink.cdc.connectors.mysql.testutils.MySqSourceTestUtils; +import org.apache.flink.cdc.connectors.mysql.testutils.MySqlContainer; +import org.apache.flink.cdc.connectors.mysql.testutils.MySqlVersion; +import org.apache.flink.cdc.connectors.mysql.testutils.UniqueDatabase; +import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; +import org.apache.flink.util.CloseableIterator; + +import org.assertj.core.api.Assertions; +import org.junit.After; +import org.junit.AfterClass; +import org.junit.Before; +import org.junit.BeforeClass; +import org.junit.Test; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.testcontainers.lifecycle.Startables; + +import java.sql.Connection; +import java.sql.SQLException; +import java.sql.Statement; +import java.time.ZoneId; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Iterator; +import java.util.List; +import java.util.Properties; +import java.util.stream.Stream; + +/** Integration tests for MySQL Table source to handle ancient date and time records. */ +public class MySqlAncientDateAndTimeITCase extends MySqlSourceTestBase { + private static final Logger LOG = LoggerFactory.getLogger(MySqlAncientDateAndTimeITCase.class); + + private static final String TEST_USER = "mysqluser"; + private static final String TEST_PASSWORD = "mysqlpw"; + + // We need an extra "no_zero_in_date = false" config to insert malformed date and time records. + private static final MySqlContainer MYSQL_CONTAINER = + createMySqlContainer(MySqlVersion.V8_0, "docker/server-allow-ancient-date-time/my.cnf"); + + private final UniqueDatabase ancientDatabase = + new UniqueDatabase(MYSQL_CONTAINER, "ancient_date_and_time", TEST_USER, TEST_PASSWORD); + + private final StreamExecutionEnvironment env = + StreamExecutionEnvironment.getExecutionEnvironment(); + + @BeforeClass + public static void beforeClass() { + LOG.info("Starting MySql container..."); + Startables.deepStart(Stream.of(MYSQL_CONTAINER)).join(); + LOG.info("Container MySql is started."); + } + + @AfterClass + public static void afterClass() { + LOG.info("Stopping MySql containers..."); + MYSQL_CONTAINER.stop(); + LOG.info("Container MySql is stopped."); + } + + @Before + public void before() { + env.setParallelism(DEFAULT_PARALLELISM); + env.enableCheckpointing(200); + env.setRestartStrategy(RestartStrategies.noRestart()); + ancientDatabase.createAndInitialize(); + } + + @After + public void after() { + ancientDatabase.dropDatabase(); + } + + /** + * With the TimeAdjuster in Debezium, all date / time records between year 0001 and 0099 will be + * shifted to 1971 ~ 2069. + */ + @Test + public void testAncientDateAndTimeWithTimeAdjuster() throws Exception { + // LocalDate.ofEpochDay reference: + // +---------------------------------------------------------------------------------+ + // | 17390 | 11323 | 11720 | 23072 | -557266 | -1 | 18261 | + // | 2017/8/12 | 2001/1/1 | 2002/2/2 | 2033/3/3 | 0444/4/4 | 1969/12/31 | 2019/12/31 | + // +---------------------------------------------------------------------------------+ + runGenericAncientDateAndTimeTest( + ancientDatabase, + true, + Arrays.asList( + "[1, 17390, 2016-07-13T17:17:17, 2015-06-14T17:17:17.100, 2014-05-15T17:17:17.120, 2013-04-16T17:17:17.123, 2012-03-17T17:17:17.123400, 2011-02-18T17:17:17.123450, 2010-01-19T17:17:17.123456]", + "[2, null, null, null, null, null, null, null, null]", + "[3, 11323, 2001-01-01T16:16:16, 2001-01-01T16:16:16.100, 2001-01-01T16:16:16.120, 2001-01-01T16:16:16.123, 2001-01-01T16:16:16.123400, 2001-01-01T16:16:16.123450, 2001-01-01T16:16:16.123456]", + "[4, 11720, 2002-02-02T15:15:15, 2002-02-02T15:15:15.100, 2002-02-02T15:15:15.120, 2002-02-02T15:15:15.123, 2002-02-02T15:15:15.123400, 2002-02-02T15:15:15.123450, 2002-02-02T15:15:15.123456]", + "[5, 23072, 2033-03-03T14:14:14, 2033-03-03T14:14:14.100, 2033-03-03T14:14:14.120, 2033-03-03T14:14:14.123, 2033-03-03T14:14:14.123400, 2033-03-03T14:14:14.123450, 2033-03-03T14:14:14.123456]", + "[6, -557266, 0444-04-04T13:13:13, 0444-04-04T13:13:13.100, 0444-04-04T13:13:13.120, 0444-04-04T13:13:13.123, 0444-04-04T13:13:13.123400, 0444-04-04T13:13:13.123450, 0444-04-04T13:13:13.123456]", + "[7, -1, 1969-12-31T12:12:12, 1969-12-31T12:12:12.100, 1969-12-31T12:12:12.120, 1969-12-31T12:12:12.123, 1969-12-31T12:12:12.123400, 1969-12-31T12:12:12.123450, 1969-12-31T12:12:12.123456]", + "[8, 18261, 2019-12-31T23:11:11, 2019-12-31T23:11:11.100, 2019-12-31T23:11:11.120, 2019-12-31T23:11:11.123, 2019-12-31T23:11:11.123400, 2019-12-31T23:11:11.123450, 2019-12-31T23:11:11.123456]"), + Arrays.asList( + "[9, 17390, 2016-07-13T17:17:17, 2015-06-14T17:17:17.100, 2014-05-15T17:17:17.120, 2013-04-16T17:17:17.123, 2012-03-17T17:17:17.123400, 2011-02-18T17:17:17.123450, 2010-01-19T17:17:17.123456]", + "[10, null, null, null, null, null, null, null, null]", + "[11, 11323, 2001-01-01T16:16:16, 2001-01-01T16:16:16.100, 2001-01-01T16:16:16.120, 2001-01-01T16:16:16.123, 2001-01-01T16:16:16.123400, 2001-01-01T16:16:16.123450, 2001-01-01T16:16:16.123456]", + "[12, 11720, 2002-02-02T15:15:15, 2002-02-02T15:15:15.100, 2002-02-02T15:15:15.120, 2002-02-02T15:15:15.123, 2002-02-02T15:15:15.123400, 2002-02-02T15:15:15.123450, 2002-02-02T15:15:15.123456]", + "[13, 23072, 2033-03-03T14:14:14, 2033-03-03T14:14:14.100, 2033-03-03T14:14:14.120, 2033-03-03T14:14:14.123, 2033-03-03T14:14:14.123400, 2033-03-03T14:14:14.123450, 2033-03-03T14:14:14.123456]", + "[14, -557266, 0444-04-04T13:13:13, 0444-04-04T13:13:13.100, 0444-04-04T13:13:13.120, 0444-04-04T13:13:13.123, 0444-04-04T13:13:13.123400, 0444-04-04T13:13:13.123450, 0444-04-04T13:13:13.123456]", + "[15, -1, 1969-12-31T12:12:12, 1969-12-31T12:12:12.100, 1969-12-31T12:12:12.120, 1969-12-31T12:12:12.123, 1969-12-31T12:12:12.123400, 1969-12-31T12:12:12.123450, 1969-12-31T12:12:12.123456]", + "[16, 18261, 2019-12-31T23:11:11, 2019-12-31T23:11:11.100, 2019-12-31T23:11:11.120, 2019-12-31T23:11:11.123, 2019-12-31T23:11:11.123400, 2019-12-31T23:11:11.123450, 2019-12-31T23:11:11.123456]")); + } + + @Test + public void testAncientDateAndTimeWithoutTimeAdjuster() throws Exception { + // LocalDate.ofEpochDay reference: + // +---------------------------------------------------------------------------------+ + // | -713095 | -719162 | -718765 | -707413 | -557266 | -1 | 18261 | + // | 0017/8/12 | 0001/1/1 | 0002/2/2 | 0033/3/3 | 0444/4/4 | 1969/12/31 | 2019/12/31 | + // +---------------------------------------------------------------------------------+ + runGenericAncientDateAndTimeTest( + ancientDatabase, + false, + Arrays.asList( + "[1, -713095, 0016-07-13T17:17:17, 0015-06-14T17:17:17.100, 0014-05-15T17:17:17.120, 0013-04-16T17:17:17.123, 0012-03-17T17:17:17.123400, 0011-02-18T17:17:17.123450, 0010-01-19T17:17:17.123456]", + "[2, null, null, null, null, null, null, null, null]", + "[3, -719162, 0001-01-01T16:16:16, 0001-01-01T16:16:16.100, 0001-01-01T16:16:16.120, 0001-01-01T16:16:16.123, 0001-01-01T16:16:16.123400, 0001-01-01T16:16:16.123450, 0001-01-01T16:16:16.123456]", + "[4, -718765, 0002-02-02T15:15:15, 0002-02-02T15:15:15.100, 0002-02-02T15:15:15.120, 0002-02-02T15:15:15.123, 0002-02-02T15:15:15.123400, 0002-02-02T15:15:15.123450, 0002-02-02T15:15:15.123456]", + "[5, -707413, 0033-03-03T14:14:14, 0033-03-03T14:14:14.100, 0033-03-03T14:14:14.120, 0033-03-03T14:14:14.123, 0033-03-03T14:14:14.123400, 0033-03-03T14:14:14.123450, 0033-03-03T14:14:14.123456]", + "[6, -557266, 0444-04-04T13:13:13, 0444-04-04T13:13:13.100, 0444-04-04T13:13:13.120, 0444-04-04T13:13:13.123, 0444-04-04T13:13:13.123400, 0444-04-04T13:13:13.123450, 0444-04-04T13:13:13.123456]", + "[7, -1, 1969-12-31T12:12:12, 1969-12-31T12:12:12.100, 1969-12-31T12:12:12.120, 1969-12-31T12:12:12.123, 1969-12-31T12:12:12.123400, 1969-12-31T12:12:12.123450, 1969-12-31T12:12:12.123456]", + "[8, 18261, 2019-12-31T23:11:11, 2019-12-31T23:11:11.100, 2019-12-31T23:11:11.120, 2019-12-31T23:11:11.123, 2019-12-31T23:11:11.123400, 2019-12-31T23:11:11.123450, 2019-12-31T23:11:11.123456]"), + Arrays.asList( + "[9, -713095, 0016-07-13T17:17:17, 0015-06-14T17:17:17.100, 0014-05-15T17:17:17.120, 0013-04-16T17:17:17.123, 0012-03-17T17:17:17.123400, 0011-02-18T17:17:17.123450, 0010-01-19T17:17:17.123456]", + "[10, null, null, null, null, null, null, null, null]", + "[11, -719162, 0001-01-01T16:16:16, 0001-01-01T16:16:16.100, 0001-01-01T16:16:16.120, 0001-01-01T16:16:16.123, 0001-01-01T16:16:16.123400, 0001-01-01T16:16:16.123450, 0001-01-01T16:16:16.123456]", + "[12, -718765, 0002-02-02T15:15:15, 0002-02-02T15:15:15.100, 0002-02-02T15:15:15.120, 0002-02-02T15:15:15.123, 0002-02-02T15:15:15.123400, 0002-02-02T15:15:15.123450, 0002-02-02T15:15:15.123456]", + "[13, -707413, 0033-03-03T14:14:14, 0033-03-03T14:14:14.100, 0033-03-03T14:14:14.120, 0033-03-03T14:14:14.123, 0033-03-03T14:14:14.123400, 0033-03-03T14:14:14.123450, 0033-03-03T14:14:14.123456]", + "[14, -557266, 0444-04-04T13:13:13, 0444-04-04T13:13:13.100, 0444-04-04T13:13:13.120, 0444-04-04T13:13:13.123, 0444-04-04T13:13:13.123400, 0444-04-04T13:13:13.123450, 0444-04-04T13:13:13.123456]", + "[15, -1, 1969-12-31T12:12:12, 1969-12-31T12:12:12.100, 1969-12-31T12:12:12.120, 1969-12-31T12:12:12.123, 1969-12-31T12:12:12.123400, 1969-12-31T12:12:12.123450, 1969-12-31T12:12:12.123456]", + "[16, 18261, 2019-12-31T23:11:11, 2019-12-31T23:11:11.100, 2019-12-31T23:11:11.120, 2019-12-31T23:11:11.123, 2019-12-31T23:11:11.123400, 2019-12-31T23:11:11.123450, 2019-12-31T23:11:11.123456]")); + } + + private void runGenericAncientDateAndTimeTest( + UniqueDatabase database, + boolean enableTimeAdjuster, + List expectedSnapshotResults, + List expectedStreamingResults) + throws Exception { + Schema ancientSchema = + Schema.newBuilder() + .physicalColumn("id", DataTypes.INT().notNull()) + .physicalColumn("date_col", DataTypes.DATE(), null, "0017-08-12") + .physicalColumn( + "datetime_0_col", + DataTypes.TIMESTAMP(0), + null, + "0016-07-13 17:17:17") + .physicalColumn( + "datetime_1_col", + DataTypes.TIMESTAMP(1), + null, + "0015-06-14 17:17:17.1") + .physicalColumn( + "datetime_2_col", + DataTypes.TIMESTAMP(2), + null, + "0014-05-15 17:17:17.12") + .physicalColumn( + "datetime_3_col", + DataTypes.TIMESTAMP(3), + null, + "0013-04-16 17:17:17.123") + .physicalColumn( + "datetime_4_col", + DataTypes.TIMESTAMP(4), + null, + "0012-03-17 17:17:17.1234") + .physicalColumn( + "datetime_5_col", + DataTypes.TIMESTAMP(5), + null, + "0011-02-18 17:17:17.12345") + .physicalColumn( + "datetime_6_col", + DataTypes.TIMESTAMP(6), + null, + "0010-01-19 17:17:17.123456") + .primaryKey("id") + .build(); + List ancientSchemaFieldGetters = + SchemaUtils.createFieldGetters(ancientSchema); + + try (CloseableIterator iterator = + env.fromSource( + getFlinkSourceProvider( + new String[] {"ancient_times"}, + database, + enableTimeAdjuster) + .getSource(), + WatermarkStrategy.noWatermarks(), + "Event-Source") + .executeAndCollect()) { + + { + Tuple2, List> snapshotResults = + fetchResultsAndCreateTableEvent(iterator, expectedSnapshotResults.size()); + Assertions.assertThat(snapshotResults.f1) + .containsOnly( + new CreateTableEvent( + TableId.tableId( + ancientDatabase.getDatabaseName(), "ancient_times"), + ancientSchema)); + Assertions.assertThat(snapshotResults.f0) + .map(evt -> (DataChangeEvent) evt) + .map( + evt -> + SchemaUtils.restoreOriginalData( + evt.after(), ancientSchemaFieldGetters) + .toString()) + .containsExactlyInAnyOrderElementsOf(expectedSnapshotResults); + } + + createBinlogEvents(ancientDatabase); + + { + Tuple2, List> streamingResults = + fetchResultsAndCreateTableEvent(iterator, expectedSnapshotResults.size()); + Assertions.assertThat(streamingResults.f1) + .containsOnly( + new CreateTableEvent( + TableId.tableId( + ancientDatabase.getDatabaseName(), "ancient_times"), + ancientSchema)); + Assertions.assertThat(streamingResults.f0) + .map(evt -> (DataChangeEvent) evt) + .map( + evt -> + SchemaUtils.restoreOriginalData( + evt.after(), ancientSchemaFieldGetters) + .toString()) + .containsExactlyInAnyOrderElementsOf(expectedStreamingResults); + } + } + } + + private FlinkSourceProvider getFlinkSourceProvider( + String[] captureTables, UniqueDatabase database, boolean enableTimeAdjuster) { + String[] captureTableIds = + Arrays.stream(captureTables) + .map(tableName -> database.getDatabaseName() + "." + tableName) + .toArray(String[]::new); + + Properties dbzProperties = new Properties(); + dbzProperties.put("enable.time.adjuster", String.valueOf(enableTimeAdjuster)); + + MySqlSourceConfigFactory configFactory = + new MySqlSourceConfigFactory() + .startupOptions(StartupOptions.initial()) + .databaseList(database.getDatabaseName()) + .tableList(captureTableIds) + .includeSchemaChanges(false) + .hostname(database.getHost()) + .port(database.getDatabasePort()) + .splitSize(10) + .fetchSize(2) + .username(database.getUsername()) + .password(database.getPassword()) + .serverTimeZone(ZoneId.of("UTC").toString()) + .serverId(MySqSourceTestUtils.getServerId(env.getParallelism())) + .debeziumProperties(dbzProperties); + return (FlinkSourceProvider) new MySqlDataSource(configFactory).getEventSourceProvider(); + } + + private static void createBinlogEvents(UniqueDatabase database) throws SQLException { + // Test reading identical data in binlog stage again + try (Connection connection = database.getJdbcConnection(); + Statement statement = connection.createStatement()) { + statement.execute( + "INSERT INTO ancient_times VALUES (\n" + + " DEFAULT,\n" + + " DEFAULT,\n" + + " DEFAULT,\n" + + " DEFAULT,\n" + + " DEFAULT,\n" + + " DEFAULT,\n" + + " DEFAULT,\n" + + " DEFAULT,\n" + + " DEFAULT\n" + + ");"); + statement.execute( + "INSERT INTO ancient_times VALUES (\n" + + " DEFAULT,\n" + + " '0000-00-00',\n" + + " '0000-00-00 00:00:00',\n" + + " '0000-00-00 00:00:00.0',\n" + + " '0000-00-00 00:00:00.00',\n" + + " '0000-00-00 00:00:00.000',\n" + + " '0000-00-00 00:00:00.0000',\n" + + " '0000-00-00 00:00:00.00000',\n" + + " '0000-00-00 00:00:00.000000'\n" + + ");"); + statement.execute( + "INSERT INTO ancient_times VALUES (\n" + + " DEFAULT,\n" + + " '0001-01-01',\n" + + " '0001-01-01 16:16:16',\n" + + " '0001-01-01 16:16:16.1',\n" + + " '0001-01-01 16:16:16.12',\n" + + " '0001-01-01 16:16:16.123',\n" + + " '0001-01-01 16:16:16.1234',\n" + + " '0001-01-01 16:16:16.12345',\n" + + " '0001-01-01 16:16:16.123456'\n" + + ");"); + statement.execute( + "INSERT INTO ancient_times VALUES (\n" + + " DEFAULT,\n" + + " '0002-02-02',\n" + + " '0002-02-02 15:15:15',\n" + + " '0002-02-02 15:15:15.1',\n" + + " '0002-02-02 15:15:15.12',\n" + + " '0002-02-02 15:15:15.123',\n" + + " '0002-02-02 15:15:15.1234',\n" + + " '0002-02-02 15:15:15.12345',\n" + + " '0002-02-02 15:15:15.123456'\n" + + ");"); + statement.execute( + "INSERT INTO ancient_times VALUES (\n" + + " DEFAULT,\n" + + " '0033-03-03',\n" + + " '0033-03-03 14:14:14',\n" + + " '0033-03-03 14:14:14.1',\n" + + " '0033-03-03 14:14:14.12',\n" + + " '0033-03-03 14:14:14.123',\n" + + " '0033-03-03 14:14:14.1234',\n" + + " '0033-03-03 14:14:14.12345',\n" + + " '0033-03-03 14:14:14.123456'\n" + + ");"); + statement.execute( + "INSERT INTO ancient_times VALUES (\n" + + " DEFAULT,\n" + + " '0444-04-04',\n" + + " '0444-04-04 13:13:13',\n" + + " '0444-04-04 13:13:13.1',\n" + + " '0444-04-04 13:13:13.12',\n" + + " '0444-04-04 13:13:13.123',\n" + + " '0444-04-04 13:13:13.1234',\n" + + " '0444-04-04 13:13:13.12345',\n" + + " '0444-04-04 13:13:13.123456'\n" + + ");"); + statement.execute( + "INSERT INTO ancient_times VALUES (\n" + + " DEFAULT,\n" + + " '1969-12-31',\n" + + " '1969-12-31 12:12:12',\n" + + " '1969-12-31 12:12:12.1',\n" + + " '1969-12-31 12:12:12.12',\n" + + " '1969-12-31 12:12:12.123',\n" + + " '1969-12-31 12:12:12.1234',\n" + + " '1969-12-31 12:12:12.12345',\n" + + " '1969-12-31 12:12:12.123456'\n" + + ");"); + statement.execute( + "INSERT INTO ancient_times VALUES (\n" + + " DEFAULT,\n" + + " '2019-12-31',\n" + + " '2019-12-31 23:11:11',\n" + + " '2019-12-31 23:11:11.1',\n" + + " '2019-12-31 23:11:11.12',\n" + + " '2019-12-31 23:11:11.123',\n" + + " '2019-12-31 23:11:11.1234',\n" + + " '2019-12-31 23:11:11.12345',\n" + + " '2019-12-31 23:11:11.123456'\n" + + ");"); + } + } + + public static Tuple2, List> fetchResultsAndCreateTableEvent( + Iterator iter, int size) { + List result = new ArrayList<>(size); + List createTableEvents = new ArrayList<>(); + while (size > 0 && iter.hasNext()) { + T event = iter.next(); + if (event instanceof CreateTableEvent) { + createTableEvents.add((CreateTableEvent) event); + } else { + result.add(event); + size--; + } + } + return Tuple2.of(result, createTableEvents); + } +} diff --git a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/test/resources/ddl/ancient_date_and_time.sql b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/test/resources/ddl/ancient_date_and_time.sql new file mode 100644 index 00000000000..93832b9b7b1 --- /dev/null +++ b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/test/resources/ddl/ancient_date_and_time.sql @@ -0,0 +1,124 @@ +-- Licensed to the Apache Software Foundation (ASF) under one or more +-- contributor license agreements. See the NOTICE file distributed with +-- this work for additional information regarding copyright ownership. +-- The ASF licenses this file to You under the Apache License, Version 2.0 +-- (the "License"); you may not use this file except in compliance with +-- the License. You may obtain a copy of the License at +-- +-- http://www.apache.org/licenses/LICENSE-2.0 +-- +-- Unless required by applicable law or agreed to in writing, software +-- distributed under the License is distributed on an "AS IS" BASIS, +-- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +-- See the License for the specific language governing permissions and +-- limitations under the License. + +CREATE TABLE ancient_times +( + id INT NOT NULL AUTO_INCREMENT, + date_col DATE DEFAULT '0017-08-12', + datetime_0_col DATETIME(0) DEFAULT '0016-07-13 17:17:17', + datetime_1_col DATETIME(1) DEFAULT '0015-06-14 17:17:17.1', + datetime_2_col DATETIME(2) DEFAULT '0014-05-15 17:17:17.12', + datetime_3_col DATETIME(3) DEFAULT '0013-04-16 17:17:17.123', + datetime_4_col DATETIME(4) DEFAULT '0012-03-17 17:17:17.1234', + datetime_5_col DATETIME(5) DEFAULT '0011-02-18 17:17:17.12345', + datetime_6_col DATETIME(6) DEFAULT '0010-01-19 17:17:17.123456', + PRIMARY KEY (id) +); + +INSERT INTO ancient_times VALUES ( + DEFAULT, + DEFAULT, + DEFAULT, + DEFAULT, + DEFAULT, + DEFAULT, + DEFAULT, + DEFAULT, + DEFAULT +); + +INSERT INTO ancient_times VALUES ( + DEFAULT, + '0000-00-00', + '0000-00-00 00:00:00', + '0000-00-00 00:00:00.0', + '0000-00-00 00:00:00.00', + '0000-00-00 00:00:00.000', + '0000-00-00 00:00:00.0000', + '0000-00-00 00:00:00.00000', + '0000-00-00 00:00:00.000000' +); + +INSERT INTO ancient_times VALUES ( + DEFAULT, + '0001-01-01', + '0001-01-01 16:16:16', + '0001-01-01 16:16:16.1', + '0001-01-01 16:16:16.12', + '0001-01-01 16:16:16.123', + '0001-01-01 16:16:16.1234', + '0001-01-01 16:16:16.12345', + '0001-01-01 16:16:16.123456' +); + +INSERT INTO ancient_times VALUES ( + DEFAULT, + '0002-02-02', + '0002-02-02 15:15:15', + '0002-02-02 15:15:15.1', + '0002-02-02 15:15:15.12', + '0002-02-02 15:15:15.123', + '0002-02-02 15:15:15.1234', + '0002-02-02 15:15:15.12345', + '0002-02-02 15:15:15.123456' +); + +INSERT INTO ancient_times VALUES ( + DEFAULT, + '0033-03-03', + '0033-03-03 14:14:14', + '0033-03-03 14:14:14.1', + '0033-03-03 14:14:14.12', + '0033-03-03 14:14:14.123', + '0033-03-03 14:14:14.1234', + '0033-03-03 14:14:14.12345', + '0033-03-03 14:14:14.123456' +); + +INSERT INTO ancient_times VALUES ( + DEFAULT, + '0444-04-04', + '0444-04-04 13:13:13', + '0444-04-04 13:13:13.1', + '0444-04-04 13:13:13.12', + '0444-04-04 13:13:13.123', + '0444-04-04 13:13:13.1234', + '0444-04-04 13:13:13.12345', + '0444-04-04 13:13:13.123456' +); + +INSERT INTO ancient_times VALUES ( + DEFAULT, + '1969-12-31', + '1969-12-31 12:12:12', + '1969-12-31 12:12:12.1', + '1969-12-31 12:12:12.12', + '1969-12-31 12:12:12.123', + '1969-12-31 12:12:12.1234', + '1969-12-31 12:12:12.12345', + '1969-12-31 12:12:12.123456' +); + +INSERT INTO ancient_times VALUES ( + DEFAULT, + '2019-12-31', + '2019-12-31 23:11:11', + '2019-12-31 23:11:11.1', + '2019-12-31 23:11:11.12', + '2019-12-31 23:11:11.123', + '2019-12-31 23:11:11.1234', + '2019-12-31 23:11:11.12345', + '2019-12-31 23:11:11.123456' +); diff --git a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/test/resources/docker/server-allow-ancient-date-time/my.cnf b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/test/resources/docker/server-allow-ancient-date-time/my.cnf new file mode 100644 index 00000000000..ca0483780fb --- /dev/null +++ b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/test/resources/docker/server-allow-ancient-date-time/my.cnf @@ -0,0 +1,58 @@ +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# http://www.apache.org/licenses/LICENSE-2.0 +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +# For advice on how to change settings please see +# http://dev.mysql.com/doc/refman/5.7/en/server-configuration-defaults.html + +[mysqld] +# +# Remove leading # and set to the amount of RAM for the most important data +# cache in MySQL. Start at 70% of total RAM for dedicated server, else 10%. +# innodb_buffer_pool_size = 128M +# +# Remove leading # to turn on a very important data integrity option: logging +# changes to the binary log between backups. +# log_bin +# +# Remove leading # to set options mainly useful for reporting servers. +# The server defaults are faster for transactions and fast SELECTs. +# Adjust sizes as needed, experiment to find the optimal values. +# join_buffer_size = 128M +# sort_buffer_size = 2M +# read_rnd_buffer_size = 2M +skip-host-cache +skip-name-resolve +#datadir=/var/lib/mysql +#socket=/var/lib/mysql/mysql.sock +secure-file-priv=/var/lib/mysql +user=mysql + +# Disabling symbolic-links is recommended to prevent assorted security risks +symbolic-links=0 + +#log-error=/var/log/mysqld.log +#pid-file=/var/run/mysqld/mysqld.pid + +# ---------------------------------------------- +# Enable the binlog for replication & CDC +# ---------------------------------------------- + +# Enable binary replication log and set the prefix, expiration, and log format. +# The prefix is arbitrary, expiration can be short for integration tests but would +# be longer on a production system. Row-level info is required for ingest to work. +# Server ID is required, but this will vary on production systems +server-id = 223344 +log_bin = mysql-bin +expire_logs_days = 1 +binlog_format = row +sql_mode = "STRICT_TRANS_TABLES,NO_ENGINE_SUBSTITUTION" diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-debezium/src/main/java/org/apache/flink/cdc/debezium/event/DebeziumEventDeserializationSchema.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-debezium/src/main/java/org/apache/flink/cdc/debezium/event/DebeziumEventDeserializationSchema.java index f77d1aa184c..ad7ec458e53 100644 --- a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-debezium/src/main/java/org/apache/flink/cdc/debezium/event/DebeziumEventDeserializationSchema.java +++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-debezium/src/main/java/org/apache/flink/cdc/debezium/event/DebeziumEventDeserializationSchema.java @@ -309,10 +309,12 @@ protected Object convertToTimestamp(Object dbzObj, Schema schema) { return TimestampData.fromMillis((Long) dbzObj); case MicroTimestamp.SCHEMA_NAME: long micro = (long) dbzObj; - return TimestampData.fromMillis(micro / 1000, (int) (micro % 1000 * 1000)); + return TimestampData.fromMillis( + Math.floorDiv(micro, 1000), (int) (Math.floorMod(micro, 1000) * 1000)); case NanoTimestamp.SCHEMA_NAME: long nano = (long) dbzObj; - return TimestampData.fromMillis(nano / 1000_000, (int) (nano % 1000_000)); + return TimestampData.fromMillis( + Math.floorDiv(nano, 1000_000), (int) (Math.floorMod(nano, 1000_000))); } } throw new IllegalArgumentException( diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-debezium/src/main/java/org/apache/flink/cdc/debezium/table/RowDataDebeziumDeserializeSchema.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-debezium/src/main/java/org/apache/flink/cdc/debezium/table/RowDataDebeziumDeserializeSchema.java index 6a96420753b..b68dcd171a8 100644 --- a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-debezium/src/main/java/org/apache/flink/cdc/debezium/table/RowDataDebeziumDeserializeSchema.java +++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-debezium/src/main/java/org/apache/flink/cdc/debezium/table/RowDataDebeziumDeserializeSchema.java @@ -481,12 +481,17 @@ public Object convert(Object dbzObj, Schema schema) { return TimestampData.fromEpochMillis((Long) dbzObj); case MicroTimestamp.SCHEMA_NAME: long micro = (long) dbzObj; + // Use Math#floorDiv and Math#floorMod instead of `/` and `%`, because + // timestamp number could be negative if we're handling timestamps prior + // to 1970. return TimestampData.fromEpochMillis( - micro / 1000, (int) (micro % 1000 * 1000)); + Math.floorDiv(micro, 1000), + (int) (Math.floorMod(micro, 1000) * 1000)); case NanoTimestamp.SCHEMA_NAME: long nano = (long) dbzObj; return TimestampData.fromEpochMillis( - nano / 1000_000, (int) (nano % 1000_000)); + Math.floorDiv(nano, 1000_000), + (int) (Math.floorMod(nano, 1000_000))); } } LocalDateTime localDateTime = diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/test/java/org/apache/flink/cdc/connectors/mysql/source/MySqlAncientDateAndTimeITCase.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/test/java/org/apache/flink/cdc/connectors/mysql/source/MySqlAncientDateAndTimeITCase.java new file mode 100644 index 00000000000..13e3dd29a2b --- /dev/null +++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/test/java/org/apache/flink/cdc/connectors/mysql/source/MySqlAncientDateAndTimeITCase.java @@ -0,0 +1,260 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.cdc.connectors.mysql.source; + +import org.apache.flink.api.common.eventtime.WatermarkStrategy; +import org.apache.flink.cdc.connectors.mysql.table.StartupOptions; +import org.apache.flink.cdc.connectors.mysql.testutils.MySqlContainer; +import org.apache.flink.cdc.connectors.mysql.testutils.MySqlVersion; +import org.apache.flink.cdc.connectors.mysql.testutils.UniqueDatabase; +import org.apache.flink.cdc.debezium.table.RowDataDebeziumDeserializeSchema; +import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; +import org.apache.flink.table.api.DataTypes; +import org.apache.flink.table.data.RowData; +import org.apache.flink.table.planner.factories.TestValuesTableFactory; +import org.apache.flink.table.runtime.typeutils.InternalTypeInfo; +import org.apache.flink.table.types.DataType; +import org.apache.flink.table.types.logical.LogicalType; +import org.apache.flink.table.types.logical.RowType; +import org.apache.flink.table.types.utils.TypeConversions; +import org.apache.flink.types.RowUtils; +import org.apache.flink.util.CloseableIterator; + +import org.assertj.core.api.Assertions; +import org.junit.After; +import org.junit.AfterClass; +import org.junit.Before; +import org.junit.BeforeClass; +import org.junit.Test; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.testcontainers.lifecycle.Startables; + +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Iterator; +import java.util.LinkedHashMap; +import java.util.List; +import java.util.Properties; +import java.util.Random; +import java.util.function.BiFunction; +import java.util.stream.Collectors; +import java.util.stream.Stream; + +/** Integration tests for MySQL Table source to handle ancient date and time records. */ +public class MySqlAncientDateAndTimeITCase extends MySqlSourceTestBase { + private static final Logger LOG = LoggerFactory.getLogger(MySqlAncientDateAndTimeITCase.class); + + private static final String TEST_USER = "mysqluser"; + private static final String TEST_PASSWORD = "mysqlpw"; + + // We need an extra "no_zero_in_date = false" config to insert malformed date and time records. + private static final MySqlContainer MYSQL_CONTAINER = + createMySqlContainer(MySqlVersion.V8_0, "docker/server-allow-ancient-date-time/my.cnf"); + + private final UniqueDatabase ancientDatabase = + new UniqueDatabase(MYSQL_CONTAINER, "ancient_date_and_time", TEST_USER, TEST_PASSWORD); + + private final StreamExecutionEnvironment env = + StreamExecutionEnvironment.getExecutionEnvironment(); + + @BeforeClass + public static void beforeClass() { + LOG.info("Starting MySql container..."); + Startables.deepStart(Stream.of(MYSQL_CONTAINER)).join(); + LOG.info("Container MySql is started."); + } + + @AfterClass + public static void afterClass() { + LOG.info("Stopping MySql containers..."); + MYSQL_CONTAINER.stop(); + LOG.info("Container MySql is stopped."); + } + + @Before + public void before() { + TestValuesTableFactory.clearAllData(); + env.setParallelism(DEFAULT_PARALLELISM); + env.enableCheckpointing(200); + ancientDatabase.createAndInitialize(); + } + + @After + public void after() { + ancientDatabase.dropDatabase(); + } + + /** + * With the TimeAdjuster in Debezium, all date / time records between year 0001 and 0099 will be + * shifted to 1971 ~ 2069. + */ + @Test + public void testAncientDateAndTimeWithTimeAdjuster() throws Exception { + // LocalDate.ofEpochDay reference: + // +---------------------------------------------------------------------------------+ + // | 17390 | 11323 | 11720 | 23072 | -557266 | -1 | 18261 | + // | 2017/8/12 | 2001/1/1 | 2002/2/2 | 2033/3/3 | 0444/4/4 | 1969/12/31 | 2019/12/31 | + // +---------------------------------------------------------------------------------+ + runGenericAncientDateAndTimeTest( + MYSQL_CONTAINER, + ancientDatabase, + true, + Arrays.asList( + "+I[1, 17390, 2016-07-13T17:17:17, 2015-06-14T17:17:17.100, 2014-05-15T17:17:17.120, 2013-04-16T17:17:17.123, 2012-03-17T17:17:17.123400, 2011-02-18T17:17:17.123450, 2010-01-19T17:17:17.123456]", + "+I[2, null, null, null, null, null, null, null, null]", + "+I[3, 11323, 2001-01-01T16:16:16, 2001-01-01T16:16:16.100, 2001-01-01T16:16:16.120, 2001-01-01T16:16:16.123, 2001-01-01T16:16:16.123400, 2001-01-01T16:16:16.123450, 2001-01-01T16:16:16.123456]", + "+I[4, 11720, 2002-02-02T15:15:15, 2002-02-02T15:15:15.100, 2002-02-02T15:15:15.120, 2002-02-02T15:15:15.123, 2002-02-02T15:15:15.123400, 2002-02-02T15:15:15.123450, 2002-02-02T15:15:15.123456]", + "+I[5, 23072, 2033-03-03T14:14:14, 2033-03-03T14:14:14.100, 2033-03-03T14:14:14.120, 2033-03-03T14:14:14.123, 2033-03-03T14:14:14.123400, 2033-03-03T14:14:14.123450, 2033-03-03T14:14:14.123456]", + "+I[6, -557266, 0444-04-04T13:13:13, 0444-04-04T13:13:13.100, 0444-04-04T13:13:13.120, 0444-04-04T13:13:13.123, 0444-04-04T13:13:13.123400, 0444-04-04T13:13:13.123450, 0444-04-04T13:13:13.123456]", + "+I[7, -1, 1969-12-31T12:12:12, 1969-12-31T12:12:12.100, 1969-12-31T12:12:12.120, 1969-12-31T12:12:12.123, 1969-12-31T12:12:12.123400, 1969-12-31T12:12:12.123450, 1969-12-31T12:12:12.123456]", + "+I[8, 18261, 2019-12-31T23:11:11, 2019-12-31T23:11:11.100, 2019-12-31T23:11:11.120, 2019-12-31T23:11:11.123, 2019-12-31T23:11:11.123400, 2019-12-31T23:11:11.123450, 2019-12-31T23:11:11.123456]")); + } + + @Test + public void testAncientDateAndTimeWithoutTimeAdjuster() throws Exception { + // LocalDate.ofEpochDay reference: + // +---------------------------------------------------------------------------------+ + // | -713095 | -719162 | -718765 | -707413 | -557266 | -1 | 18261 | + // | 0017/8/12 | 0001/1/1 | 0002/2/2 | 0033/3/3 | 0444/4/4 | 1969/12/31 | 2019/12/31 | + // +---------------------------------------------------------------------------------+ + runGenericAncientDateAndTimeTest( + MYSQL_CONTAINER, + ancientDatabase, + false, + Arrays.asList( + "+I[1, -713095, 0016-07-13T17:17:17, 0015-06-14T17:17:17.100, 0014-05-15T17:17:17.120, 0013-04-16T17:17:17.123, 0012-03-17T17:17:17.123400, 0011-02-18T17:17:17.123450, 0010-01-19T17:17:17.123456]", + "+I[2, null, null, null, null, null, null, null, null]", + "+I[3, -719162, 0001-01-01T16:16:16, 0001-01-01T16:16:16.100, 0001-01-01T16:16:16.120, 0001-01-01T16:16:16.123, 0001-01-01T16:16:16.123400, 0001-01-01T16:16:16.123450, 0001-01-01T16:16:16.123456]", + "+I[4, -718765, 0002-02-02T15:15:15, 0002-02-02T15:15:15.100, 0002-02-02T15:15:15.120, 0002-02-02T15:15:15.123, 0002-02-02T15:15:15.123400, 0002-02-02T15:15:15.123450, 0002-02-02T15:15:15.123456]", + "+I[5, -707413, 0033-03-03T14:14:14, 0033-03-03T14:14:14.100, 0033-03-03T14:14:14.120, 0033-03-03T14:14:14.123, 0033-03-03T14:14:14.123400, 0033-03-03T14:14:14.123450, 0033-03-03T14:14:14.123456]", + "+I[6, -557266, 0444-04-04T13:13:13, 0444-04-04T13:13:13.100, 0444-04-04T13:13:13.120, 0444-04-04T13:13:13.123, 0444-04-04T13:13:13.123400, 0444-04-04T13:13:13.123450, 0444-04-04T13:13:13.123456]", + "+I[7, -1, 1969-12-31T12:12:12, 1969-12-31T12:12:12.100, 1969-12-31T12:12:12.120, 1969-12-31T12:12:12.123, 1969-12-31T12:12:12.123400, 1969-12-31T12:12:12.123450, 1969-12-31T12:12:12.123456]", + "+I[8, 18261, 2019-12-31T23:11:11, 2019-12-31T23:11:11.100, 2019-12-31T23:11:11.120, 2019-12-31T23:11:11.123, 2019-12-31T23:11:11.123400, 2019-12-31T23:11:11.123450, 2019-12-31T23:11:11.123456]")); + } + + private void runGenericAncientDateAndTimeTest( + MySqlContainer container, + UniqueDatabase database, + boolean enableTimeAdjuster, + List expectedResults) + throws Exception { + // Build deserializer + DataType dataType = + DataTypes.ROW( + DataTypes.FIELD("id", DataTypes.INT()), + DataTypes.FIELD("date_col", DataTypes.DATE()), + DataTypes.FIELD("datetime_0_col", DataTypes.TIMESTAMP(0)), + DataTypes.FIELD("datetime_1_col", DataTypes.TIMESTAMP(1)), + DataTypes.FIELD("datetime_2_col", DataTypes.TIMESTAMP(2)), + DataTypes.FIELD("datetime_3_col", DataTypes.TIMESTAMP(3)), + DataTypes.FIELD("datetime_4_col", DataTypes.TIMESTAMP(4)), + DataTypes.FIELD("datetime_5_col", DataTypes.TIMESTAMP(5)), + DataTypes.FIELD("datetime_6_col", DataTypes.TIMESTAMP(6))); + LogicalType logicalType = TypeConversions.fromDataToLogicalType(dataType); + InternalTypeInfo typeInfo = InternalTypeInfo.of(logicalType); + RowDataDebeziumDeserializeSchema deserializer = + RowDataDebeziumDeserializeSchema.newBuilder() + .setPhysicalRowType((RowType) dataType.getLogicalType()) + .setResultTypeInfo(typeInfo) + .build(); + + Properties dbzProperties = new Properties(); + dbzProperties.put("enable.time.adjuster", String.valueOf(enableTimeAdjuster)); + // Build source + MySqlSource mySqlSource = + MySqlSource.builder() + .hostname(container.getHost()) + .port(container.getDatabasePort()) + .databaseList(database.getDatabaseName()) + .serverTimeZone("UTC") + .tableList(database.getDatabaseName() + ".ancient_times") + .username(database.getUsername()) + .password(database.getPassword()) + .serverId(getServerId()) + .deserializer(deserializer) + .startupOptions(StartupOptions.initial()) + .debeziumProperties(dbzProperties) + .build(); + + try (CloseableIterator iterator = + env.fromSource( + mySqlSource, + WatermarkStrategy.noWatermarks(), + "Backfill Skipped Source") + .executeAndCollect()) { + Assertions.assertThat(fetchRows(iterator, expectedResults.size())) + .containsExactlyInAnyOrderElementsOf(expectedResults); + } + } + + private static List fetchRows(Iterator iter, int size) { + List rows = new ArrayList<>(size); + while (size > 0 && iter.hasNext()) { + RowData row = iter.next(); + rows.add(row); + size--; + } + return convertRowDataToRowString(rows); + } + + private static List convertRowDataToRowString(List rows) { + LinkedHashMap map = new LinkedHashMap<>(); + map.put("id_col", 0); + map.put("date_col", 1); + map.put("datetime_0_col", 2); + map.put("datetime_1_col", 3); + map.put("datetime_2_col", 4); + map.put("datetime_3_col", 5); + map.put("datetime_4_col", 6); + map.put("datetime_5_col", 7); + map.put("datetime_6_col", 8); + return rows.stream() + .map( + row -> + RowUtils.createRowWithNamedPositions( + row.getRowKind(), + new Object[] { + wrap(row, 0, RowData::getInt), + wrap(row, 1, RowData::getInt), + wrap(row, 2, (r, i) -> r.getTimestamp(i, 0)), + wrap(row, 3, (r, i) -> r.getTimestamp(i, 1)), + wrap(row, 4, (r, i) -> r.getTimestamp(i, 2)), + wrap(row, 5, (r, i) -> r.getTimestamp(i, 3)), + wrap(row, 6, (r, i) -> r.getTimestamp(i, 4)), + wrap(row, 7, (r, i) -> r.getTimestamp(i, 5)), + wrap(row, 8, (r, i) -> r.getTimestamp(i, 6)) + }, + map) + .toString()) + .collect(Collectors.toList()); + } + + private static Object wrap(RowData row, int index, BiFunction getter) { + if (row.isNullAt(index)) { + return null; + } + return getter.apply(row, index); + } + + private String getServerId() { + final Random random = new Random(); + int serverId = random.nextInt(100) + 5400; + return serverId + "-" + (serverId + env.getParallelism()); + } +} diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/test/java/org/apache/flink/cdc/connectors/mysql/table/MySqlAncientDateAndTimeITCase.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/test/java/org/apache/flink/cdc/connectors/mysql/table/MySqlAncientDateAndTimeITCase.java new file mode 100644 index 00000000000..b8e2510a495 --- /dev/null +++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/test/java/org/apache/flink/cdc/connectors/mysql/table/MySqlAncientDateAndTimeITCase.java @@ -0,0 +1,363 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.cdc.connectors.mysql.table; + +import org.apache.flink.cdc.connectors.mysql.source.MySqlSourceTestBase; +import org.apache.flink.cdc.connectors.mysql.testutils.MySqlContainer; +import org.apache.flink.cdc.connectors.mysql.testutils.MySqlVersion; +import org.apache.flink.cdc.connectors.mysql.testutils.UniqueDatabase; +import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; +import org.apache.flink.table.api.EnvironmentSettings; +import org.apache.flink.table.api.TableResult; +import org.apache.flink.table.api.bridge.java.StreamTableEnvironment; +import org.apache.flink.table.planner.factories.TestValuesTableFactory; +import org.apache.flink.types.Row; +import org.apache.flink.util.CloseableIterator; + +import org.assertj.core.api.Assertions; +import org.junit.After; +import org.junit.AfterClass; +import org.junit.Before; +import org.junit.BeforeClass; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.Parameterized; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.testcontainers.lifecycle.Startables; + +import java.sql.Connection; +import java.sql.SQLException; +import java.sql.Statement; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Iterator; +import java.util.List; +import java.util.Random; +import java.util.stream.Stream; + +import static org.apache.flink.api.common.JobStatus.RUNNING; + +/** Integration tests for MySQL Table source to handle ancient date and time records. */ +@RunWith(Parameterized.class) +public class MySqlAncientDateAndTimeITCase extends MySqlSourceTestBase { + private static final Logger LOG = LoggerFactory.getLogger(MySqlAncientDateAndTimeITCase.class); + + private static final String TEST_USER = "mysqluser"; + private static final String TEST_PASSWORD = "mysqlpw"; + + // We need an extra "no_zero_in_date = false" config to insert malformed date and time records. + private static final MySqlContainer MYSQL_CONTAINER = + createMySqlContainer(MySqlVersion.V8_0, "docker/server-allow-ancient-date-time/my.cnf"); + + private final UniqueDatabase ancientDatabase = + new UniqueDatabase(MYSQL_CONTAINER, "ancient_date_and_time", TEST_USER, TEST_PASSWORD); + + private final StreamExecutionEnvironment env = + StreamExecutionEnvironment.getExecutionEnvironment(); + private final StreamTableEnvironment tEnv = + StreamTableEnvironment.create( + env, EnvironmentSettings.newInstance().inStreamingMode().build()); + + private final boolean incrementalSnapshot; + + @Parameterized.Parameters(name = "incrementalSnapshot: {0}") + public static Object[] parameters() { + return new Object[][] {new Object[] {false}, new Object[] {true}}; + } + + public MySqlAncientDateAndTimeITCase(boolean incrementalSnapshot) { + this.incrementalSnapshot = incrementalSnapshot; + } + + @BeforeClass + public static void beforeClass() { + LOG.info("Starting MySql container..."); + Startables.deepStart(Stream.of(MYSQL_CONTAINER)).join(); + LOG.info("Container MySql is started."); + } + + @AfterClass + public static void afterClass() { + LOG.info("Stopping MySql containers..."); + MYSQL_CONTAINER.stop(); + LOG.info("Container MySql is stopped."); + } + + @Before + public void before() { + TestValuesTableFactory.clearAllData(); + if (incrementalSnapshot) { + env.setParallelism(DEFAULT_PARALLELISM); + env.enableCheckpointing(200); + } else { + env.setParallelism(1); + } + ancientDatabase.createAndInitialize(); + } + + @After + public void after() { + ancientDatabase.dropDatabase(); + } + + /** + * With the TimeAdjuster in Debezium, all date / time records between year 0001 and 0099 will be + * shifted to 1971 ~ 2069. + */ + @Test + public void testAncientDateAndTimeWithTimeAdjuster() throws Exception { + runGenericAncientDateAndTimeTest( + MYSQL_CONTAINER, + ancientDatabase, + incrementalSnapshot, + true, + Arrays.asList( + "+I[1, 2017-08-12, 2016-07-13T17:17:17, 2015-06-14T17:17:17.100, 2014-05-15T17:17:17.120, 2013-04-16T17:17:17.123, 2012-03-17T17:17:17.123400, 2011-02-18T17:17:17.123450, 2010-01-19T17:17:17.123456]", + "+I[2, null, null, null, null, null, null, null, null]", + "+I[3, 2001-01-01, 2001-01-01T16:16:16, 2001-01-01T16:16:16.100, 2001-01-01T16:16:16.120, 2001-01-01T16:16:16.123, 2001-01-01T16:16:16.123400, 2001-01-01T16:16:16.123450, 2001-01-01T16:16:16.123456]", + "+I[4, 2002-02-02, 2002-02-02T15:15:15, 2002-02-02T15:15:15.100, 2002-02-02T15:15:15.120, 2002-02-02T15:15:15.123, 2002-02-02T15:15:15.123400, 2002-02-02T15:15:15.123450, 2002-02-02T15:15:15.123456]", + "+I[5, 2033-03-03, 2033-03-03T14:14:14, 2033-03-03T14:14:14.100, 2033-03-03T14:14:14.120, 2033-03-03T14:14:14.123, 2033-03-03T14:14:14.123400, 2033-03-03T14:14:14.123450, 2033-03-03T14:14:14.123456]", + "+I[6, 0444-04-04, 0444-04-04T13:13:13, 0444-04-04T13:13:13.100, 0444-04-04T13:13:13.120, 0444-04-04T13:13:13.123, 0444-04-04T13:13:13.123400, 0444-04-04T13:13:13.123450, 0444-04-04T13:13:13.123456]", + "+I[7, 1969-12-31, 1969-12-31T12:12:12, 1969-12-31T12:12:12.100, 1969-12-31T12:12:12.120, 1969-12-31T12:12:12.123, 1969-12-31T12:12:12.123400, 1969-12-31T12:12:12.123450, 1969-12-31T12:12:12.123456]", + "+I[8, 2019-12-31, 2019-12-31T23:11:11, 2019-12-31T23:11:11.100, 2019-12-31T23:11:11.120, 2019-12-31T23:11:11.123, 2019-12-31T23:11:11.123400, 2019-12-31T23:11:11.123450, 2019-12-31T23:11:11.123456]"), + Arrays.asList( + "+I[9, 2017-08-12, 2016-07-13T17:17:17, 2015-06-14T17:17:17.100, 2014-05-15T17:17:17.120, 2013-04-16T17:17:17.123, 2012-03-17T17:17:17.123400, 2011-02-18T17:17:17.123450, 2010-01-19T17:17:17.123456]", + "+I[10, null, null, null, null, null, null, null, null]", + "+I[11, 2001-01-01, 2001-01-01T16:16:16, 2001-01-01T16:16:16.100, 2001-01-01T16:16:16.120, 2001-01-01T16:16:16.123, 2001-01-01T16:16:16.123400, 2001-01-01T16:16:16.123450, 2001-01-01T16:16:16.123456]", + "+I[12, 2002-02-02, 2002-02-02T15:15:15, 2002-02-02T15:15:15.100, 2002-02-02T15:15:15.120, 2002-02-02T15:15:15.123, 2002-02-02T15:15:15.123400, 2002-02-02T15:15:15.123450, 2002-02-02T15:15:15.123456]", + "+I[13, 2033-03-03, 2033-03-03T14:14:14, 2033-03-03T14:14:14.100, 2033-03-03T14:14:14.120, 2033-03-03T14:14:14.123, 2033-03-03T14:14:14.123400, 2033-03-03T14:14:14.123450, 2033-03-03T14:14:14.123456]", + "+I[14, 0444-04-04, 0444-04-04T13:13:13, 0444-04-04T13:13:13.100, 0444-04-04T13:13:13.120, 0444-04-04T13:13:13.123, 0444-04-04T13:13:13.123400, 0444-04-04T13:13:13.123450, 0444-04-04T13:13:13.123456]", + "+I[15, 1969-12-31, 1969-12-31T12:12:12, 1969-12-31T12:12:12.100, 1969-12-31T12:12:12.120, 1969-12-31T12:12:12.123, 1969-12-31T12:12:12.123400, 1969-12-31T12:12:12.123450, 1969-12-31T12:12:12.123456]", + "+I[16, 2019-12-31, 2019-12-31T23:11:11, 2019-12-31T23:11:11.100, 2019-12-31T23:11:11.120, 2019-12-31T23:11:11.123, 2019-12-31T23:11:11.123400, 2019-12-31T23:11:11.123450, 2019-12-31T23:11:11.123456]")); + } + + @Test + public void testAncientDateAndTimeWithoutTimeAdjuster() throws Exception { + runGenericAncientDateAndTimeTest( + MYSQL_CONTAINER, + ancientDatabase, + incrementalSnapshot, + false, + Arrays.asList( + "+I[1, 0017-08-12, 0016-07-13T17:17:17, 0015-06-14T17:17:17.100, 0014-05-15T17:17:17.120, 0013-04-16T17:17:17.123, 0012-03-17T17:17:17.123400, 0011-02-18T17:17:17.123450, 0010-01-19T17:17:17.123456]", + "+I[2, null, null, null, null, null, null, null, null]", + "+I[3, 0001-01-01, 0001-01-01T16:16:16, 0001-01-01T16:16:16.100, 0001-01-01T16:16:16.120, 0001-01-01T16:16:16.123, 0001-01-01T16:16:16.123400, 0001-01-01T16:16:16.123450, 0001-01-01T16:16:16.123456]", + "+I[4, 0002-02-02, 0002-02-02T15:15:15, 0002-02-02T15:15:15.100, 0002-02-02T15:15:15.120, 0002-02-02T15:15:15.123, 0002-02-02T15:15:15.123400, 0002-02-02T15:15:15.123450, 0002-02-02T15:15:15.123456]", + "+I[5, 0033-03-03, 0033-03-03T14:14:14, 0033-03-03T14:14:14.100, 0033-03-03T14:14:14.120, 0033-03-03T14:14:14.123, 0033-03-03T14:14:14.123400, 0033-03-03T14:14:14.123450, 0033-03-03T14:14:14.123456]", + "+I[6, 0444-04-04, 0444-04-04T13:13:13, 0444-04-04T13:13:13.100, 0444-04-04T13:13:13.120, 0444-04-04T13:13:13.123, 0444-04-04T13:13:13.123400, 0444-04-04T13:13:13.123450, 0444-04-04T13:13:13.123456]", + "+I[7, 1969-12-31, 1969-12-31T12:12:12, 1969-12-31T12:12:12.100, 1969-12-31T12:12:12.120, 1969-12-31T12:12:12.123, 1969-12-31T12:12:12.123400, 1969-12-31T12:12:12.123450, 1969-12-31T12:12:12.123456]", + "+I[8, 2019-12-31, 2019-12-31T23:11:11, 2019-12-31T23:11:11.100, 2019-12-31T23:11:11.120, 2019-12-31T23:11:11.123, 2019-12-31T23:11:11.123400, 2019-12-31T23:11:11.123450, 2019-12-31T23:11:11.123456]"), + Arrays.asList( + "+I[9, 0017-08-12, 0016-07-13T17:17:17, 0015-06-14T17:17:17.100, 0014-05-15T17:17:17.120, 0013-04-16T17:17:17.123, 0012-03-17T17:17:17.123400, 0011-02-18T17:17:17.123450, 0010-01-19T17:17:17.123456]", + "+I[10, null, null, null, null, null, null, null, null]", + "+I[11, 0001-01-01, 0001-01-01T16:16:16, 0001-01-01T16:16:16.100, 0001-01-01T16:16:16.120, 0001-01-01T16:16:16.123, 0001-01-01T16:16:16.123400, 0001-01-01T16:16:16.123450, 0001-01-01T16:16:16.123456]", + "+I[12, 0002-02-02, 0002-02-02T15:15:15, 0002-02-02T15:15:15.100, 0002-02-02T15:15:15.120, 0002-02-02T15:15:15.123, 0002-02-02T15:15:15.123400, 0002-02-02T15:15:15.123450, 0002-02-02T15:15:15.123456]", + "+I[13, 0033-03-03, 0033-03-03T14:14:14, 0033-03-03T14:14:14.100, 0033-03-03T14:14:14.120, 0033-03-03T14:14:14.123, 0033-03-03T14:14:14.123400, 0033-03-03T14:14:14.123450, 0033-03-03T14:14:14.123456]", + "+I[14, 0444-04-04, 0444-04-04T13:13:13, 0444-04-04T13:13:13.100, 0444-04-04T13:13:13.120, 0444-04-04T13:13:13.123, 0444-04-04T13:13:13.123400, 0444-04-04T13:13:13.123450, 0444-04-04T13:13:13.123456]", + "+I[15, 1969-12-31, 1969-12-31T12:12:12, 1969-12-31T12:12:12.100, 1969-12-31T12:12:12.120, 1969-12-31T12:12:12.123, 1969-12-31T12:12:12.123400, 1969-12-31T12:12:12.123450, 1969-12-31T12:12:12.123456]", + "+I[16, 2019-12-31, 2019-12-31T23:11:11, 2019-12-31T23:11:11.100, 2019-12-31T23:11:11.120, 2019-12-31T23:11:11.123, 2019-12-31T23:11:11.123400, 2019-12-31T23:11:11.123450, 2019-12-31T23:11:11.123456]")); + } + + private void runGenericAncientDateAndTimeTest( + MySqlContainer container, + UniqueDatabase database, + boolean incrementalSnapshot, + boolean enableTimeAdjuster, + List expectedSnapshotResults, + List expectedStreamingResults) + throws Exception { + String sourceDDL = + String.format( + "CREATE TABLE ancient_db (" + + " `id` INT NOT NULL," + + " date_col DATE," + + " datetime_0_col TIMESTAMP(0)," + + " datetime_1_col TIMESTAMP(1)," + + " datetime_2_col TIMESTAMP(2)," + + " datetime_3_col TIMESTAMP(3)," + + " datetime_4_col TIMESTAMP(4)," + + " datetime_5_col TIMESTAMP(5)," + + " datetime_6_col TIMESTAMP(6)," + + " primary key (`id`) not enforced" + + ") WITH (" + + " 'connector' = 'mysql-cdc'," + + " 'hostname' = '%s'," + + " 'port' = '%s'," + + " 'username' = '%s'," + + " 'password' = '%s'," + + " 'database-name' = '%s'," + + " 'table-name' = '%s'," + + " 'scan.incremental.snapshot.enabled' = '%s'," + + " 'server-time-zone' = 'UTC'," + + " 'server-id' = '%s'," + + " 'debezium.enable.time.adjuster' = '%s'" + + ")", + container.getHost(), + container.getDatabasePort(), + TEST_USER, + TEST_PASSWORD, + database.getDatabaseName(), + "ancient_times", + incrementalSnapshot, + getServerId(), + enableTimeAdjuster); + + tEnv.executeSql(sourceDDL); + + TableResult result = tEnv.executeSql("SELECT * FROM ancient_db"); + do { + Thread.sleep(5000L); + } while (result.getJobClient().get().getJobStatus().get() != RUNNING); + + CloseableIterator iterator = result.collect(); + + List expectedRows = new ArrayList<>(expectedSnapshotResults); + + Assertions.assertThat(fetchRows(iterator, expectedRows.size())) + .containsExactlyInAnyOrderElementsOf(expectedRows); + + createBinlogEvents(database); + + Assertions.assertThat(fetchRows(iterator, expectedStreamingResults.size())) + .containsExactlyInAnyOrderElementsOf(expectedStreamingResults); + result.getJobClient().get().cancel().get(); + } + + private static void createBinlogEvents(UniqueDatabase database) throws SQLException { + // Test reading identical data in binlog stage again + try (Connection connection = database.getJdbcConnection(); + Statement statement = connection.createStatement()) { + statement.execute( + "INSERT INTO ancient_times VALUES (\n" + + " DEFAULT,\n" + + " DEFAULT,\n" + + " DEFAULT,\n" + + " DEFAULT,\n" + + " DEFAULT,\n" + + " DEFAULT,\n" + + " DEFAULT,\n" + + " DEFAULT,\n" + + " DEFAULT\n" + + ");"); + statement.execute( + "INSERT INTO ancient_times VALUES (\n" + + " DEFAULT,\n" + + " '0000-00-00',\n" + + " '0000-00-00 00:00:00',\n" + + " '0000-00-00 00:00:00.0',\n" + + " '0000-00-00 00:00:00.00',\n" + + " '0000-00-00 00:00:00.000',\n" + + " '0000-00-00 00:00:00.0000',\n" + + " '0000-00-00 00:00:00.00000',\n" + + " '0000-00-00 00:00:00.000000'\n" + + ");"); + statement.execute( + "INSERT INTO ancient_times VALUES (\n" + + " DEFAULT,\n" + + " '0001-01-01',\n" + + " '0001-01-01 16:16:16',\n" + + " '0001-01-01 16:16:16.1',\n" + + " '0001-01-01 16:16:16.12',\n" + + " '0001-01-01 16:16:16.123',\n" + + " '0001-01-01 16:16:16.1234',\n" + + " '0001-01-01 16:16:16.12345',\n" + + " '0001-01-01 16:16:16.123456'\n" + + ");"); + statement.execute( + "INSERT INTO ancient_times VALUES (\n" + + " DEFAULT,\n" + + " '0002-02-02',\n" + + " '0002-02-02 15:15:15',\n" + + " '0002-02-02 15:15:15.1',\n" + + " '0002-02-02 15:15:15.12',\n" + + " '0002-02-02 15:15:15.123',\n" + + " '0002-02-02 15:15:15.1234',\n" + + " '0002-02-02 15:15:15.12345',\n" + + " '0002-02-02 15:15:15.123456'\n" + + ");"); + statement.execute( + "INSERT INTO ancient_times VALUES (\n" + + " DEFAULT,\n" + + " '0033-03-03',\n" + + " '0033-03-03 14:14:14',\n" + + " '0033-03-03 14:14:14.1',\n" + + " '0033-03-03 14:14:14.12',\n" + + " '0033-03-03 14:14:14.123',\n" + + " '0033-03-03 14:14:14.1234',\n" + + " '0033-03-03 14:14:14.12345',\n" + + " '0033-03-03 14:14:14.123456'\n" + + ");"); + statement.execute( + "INSERT INTO ancient_times VALUES (\n" + + " DEFAULT,\n" + + " '0444-04-04',\n" + + " '0444-04-04 13:13:13',\n" + + " '0444-04-04 13:13:13.1',\n" + + " '0444-04-04 13:13:13.12',\n" + + " '0444-04-04 13:13:13.123',\n" + + " '0444-04-04 13:13:13.1234',\n" + + " '0444-04-04 13:13:13.12345',\n" + + " '0444-04-04 13:13:13.123456'\n" + + ");"); + statement.execute( + "INSERT INTO ancient_times VALUES (\n" + + " DEFAULT,\n" + + " '1969-12-31',\n" + + " '1969-12-31 12:12:12',\n" + + " '1969-12-31 12:12:12.1',\n" + + " '1969-12-31 12:12:12.12',\n" + + " '1969-12-31 12:12:12.123',\n" + + " '1969-12-31 12:12:12.1234',\n" + + " '1969-12-31 12:12:12.12345',\n" + + " '1969-12-31 12:12:12.123456'\n" + + ");"); + statement.execute( + "INSERT INTO ancient_times VALUES (\n" + + " DEFAULT,\n" + + " '2019-12-31',\n" + + " '2019-12-31 23:11:11',\n" + + " '2019-12-31 23:11:11.1',\n" + + " '2019-12-31 23:11:11.12',\n" + + " '2019-12-31 23:11:11.123',\n" + + " '2019-12-31 23:11:11.1234',\n" + + " '2019-12-31 23:11:11.12345',\n" + + " '2019-12-31 23:11:11.123456'\n" + + ");"); + } + } + + private static List fetchRows(Iterator iter, int size) { + List rows = new ArrayList<>(size); + while (size > 0 && iter.hasNext()) { + Row row = iter.next(); + rows.add(row.toString()); + size--; + } + return rows; + } + + private String getServerId() { + final Random random = new Random(); + int serverId = random.nextInt(100) + 5400; + if (incrementalSnapshot) { + return serverId + "-" + (serverId + env.getParallelism()); + } + return String.valueOf(serverId); + } +} diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/test/resources/ddl/ancient_date_and_time.sql b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/test/resources/ddl/ancient_date_and_time.sql new file mode 100644 index 00000000000..878c687c350 --- /dev/null +++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/test/resources/ddl/ancient_date_and_time.sql @@ -0,0 +1,124 @@ +-- Licensed to the Apache Software Foundation (ASF) under one or more +-- contributor license agreements. See the NOTICE file distributed with +-- this work for additional information regarding copyright ownership. +-- The ASF licenses this file to You under the Apache License, Version 2.0 +-- (the "License"); you may not use this file except in compliance with +-- the License. You may obtain a copy of the License at +-- +-- http://www.apache.org/licenses/LICENSE-2.0 +-- +-- Unless required by applicable law or agreed to in writing, software +-- distributed under the License is distributed on an "AS IS" BASIS, +-- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +-- See the License for the specific language governing permissions and +-- limitations under the License. + +CREATE TABLE ancient_times +( + id SERIAL, + date_col DATE DEFAULT '0017-08-12', + datetime_0_col DATETIME(0) DEFAULT '0016-07-13 17:17:17', + datetime_1_col DATETIME(1) DEFAULT '0015-06-14 17:17:17.1', + datetime_2_col DATETIME(2) DEFAULT '0014-05-15 17:17:17.12', + datetime_3_col DATETIME(3) DEFAULT '0013-04-16 17:17:17.123', + datetime_4_col DATETIME(4) DEFAULT '0012-03-17 17:17:17.1234', + datetime_5_col DATETIME(5) DEFAULT '0011-02-18 17:17:17.12345', + datetime_6_col DATETIME(6) DEFAULT '0010-01-19 17:17:17.123456', + PRIMARY KEY (id) +); + +INSERT INTO ancient_times VALUES ( + DEFAULT, + DEFAULT, + DEFAULT, + DEFAULT, + DEFAULT, + DEFAULT, + DEFAULT, + DEFAULT, + DEFAULT +); + +INSERT INTO ancient_times VALUES ( + DEFAULT, + '0000-00-00', + '0000-00-00 00:00:00', + '0000-00-00 00:00:00.0', + '0000-00-00 00:00:00.00', + '0000-00-00 00:00:00.000', + '0000-00-00 00:00:00.0000', + '0000-00-00 00:00:00.00000', + '0000-00-00 00:00:00.000000' +); + +INSERT INTO ancient_times VALUES ( + DEFAULT, + '0001-01-01', + '0001-01-01 16:16:16', + '0001-01-01 16:16:16.1', + '0001-01-01 16:16:16.12', + '0001-01-01 16:16:16.123', + '0001-01-01 16:16:16.1234', + '0001-01-01 16:16:16.12345', + '0001-01-01 16:16:16.123456' +); + +INSERT INTO ancient_times VALUES ( + DEFAULT, + '0002-02-02', + '0002-02-02 15:15:15', + '0002-02-02 15:15:15.1', + '0002-02-02 15:15:15.12', + '0002-02-02 15:15:15.123', + '0002-02-02 15:15:15.1234', + '0002-02-02 15:15:15.12345', + '0002-02-02 15:15:15.123456' +); + +INSERT INTO ancient_times VALUES ( + DEFAULT, + '0033-03-03', + '0033-03-03 14:14:14', + '0033-03-03 14:14:14.1', + '0033-03-03 14:14:14.12', + '0033-03-03 14:14:14.123', + '0033-03-03 14:14:14.1234', + '0033-03-03 14:14:14.12345', + '0033-03-03 14:14:14.123456' +); + +INSERT INTO ancient_times VALUES ( + DEFAULT, + '0444-04-04', + '0444-04-04 13:13:13', + '0444-04-04 13:13:13.1', + '0444-04-04 13:13:13.12', + '0444-04-04 13:13:13.123', + '0444-04-04 13:13:13.1234', + '0444-04-04 13:13:13.12345', + '0444-04-04 13:13:13.123456' +); + +INSERT INTO ancient_times VALUES ( + DEFAULT, + '1969-12-31', + '1969-12-31 12:12:12', + '1969-12-31 12:12:12.1', + '1969-12-31 12:12:12.12', + '1969-12-31 12:12:12.123', + '1969-12-31 12:12:12.1234', + '1969-12-31 12:12:12.12345', + '1969-12-31 12:12:12.123456' +); + +INSERT INTO ancient_times VALUES ( + DEFAULT, + '2019-12-31', + '2019-12-31 23:11:11', + '2019-12-31 23:11:11.1', + '2019-12-31 23:11:11.12', + '2019-12-31 23:11:11.123', + '2019-12-31 23:11:11.1234', + '2019-12-31 23:11:11.12345', + '2019-12-31 23:11:11.123456' +); diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/test/resources/docker/server-allow-ancient-date-time/my.cnf b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/test/resources/docker/server-allow-ancient-date-time/my.cnf new file mode 100644 index 00000000000..ca0483780fb --- /dev/null +++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/test/resources/docker/server-allow-ancient-date-time/my.cnf @@ -0,0 +1,58 @@ +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# http://www.apache.org/licenses/LICENSE-2.0 +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +# For advice on how to change settings please see +# http://dev.mysql.com/doc/refman/5.7/en/server-configuration-defaults.html + +[mysqld] +# +# Remove leading # and set to the amount of RAM for the most important data +# cache in MySQL. Start at 70% of total RAM for dedicated server, else 10%. +# innodb_buffer_pool_size = 128M +# +# Remove leading # to turn on a very important data integrity option: logging +# changes to the binary log between backups. +# log_bin +# +# Remove leading # to set options mainly useful for reporting servers. +# The server defaults are faster for transactions and fast SELECTs. +# Adjust sizes as needed, experiment to find the optimal values. +# join_buffer_size = 128M +# sort_buffer_size = 2M +# read_rnd_buffer_size = 2M +skip-host-cache +skip-name-resolve +#datadir=/var/lib/mysql +#socket=/var/lib/mysql/mysql.sock +secure-file-priv=/var/lib/mysql +user=mysql + +# Disabling symbolic-links is recommended to prevent assorted security risks +symbolic-links=0 + +#log-error=/var/log/mysqld.log +#pid-file=/var/run/mysqld/mysqld.pid + +# ---------------------------------------------- +# Enable the binlog for replication & CDC +# ---------------------------------------------- + +# Enable binary replication log and set the prefix, expiration, and log format. +# The prefix is arbitrary, expiration can be short for integration tests but would +# be longer on a production system. Row-level info is required for ingest to work. +# Server ID is required, but this will vary on production systems +server-id = 223344 +log_bin = mysql-bin +expire_logs_days = 1 +binlog_format = row +sql_mode = "STRICT_TRANS_TABLES,NO_ENGINE_SUBSTITUTION"