diff --git a/e2e_test/source/cdc_inline/auto_schema_map_mysql.slt b/e2e_test/source/cdc_inline/auto_schema_map_mysql.slt index 08afa5d1988a..098ddfc8cf02 100644 --- a/e2e_test/source/cdc_inline/auto_schema_map_mysql.slt +++ b/e2e_test/source/cdc_inline/auto_schema_map_mysql.slt @@ -32,7 +32,7 @@ mysql --protocol=tcp -u root mytest -e " c_time time, c_datetime datetime, c_timestamp timestamp, - c_enum ENUM('happy','sad','ok'), + c_enum ENUM('happy','sad','ok') DEFAULT 'ok', c_json JSON, PRIMARY KEY (c_boolean,c_Bigint,c_date) ); diff --git a/src/connector/src/source/cdc/external/mysql.rs b/src/connector/src/source/cdc/external/mysql.rs index be0ecf119929..d58fcf87f957 100644 --- a/src/connector/src/source/cdc/external/mysql.rs +++ b/src/connector/src/source/cdc/external/mysql.rs @@ -121,7 +121,19 @@ impl MySqlExternalTable { DataType::Int16 => Some(ScalarImpl::Int16(val as _)), DataType::Int32 => Some(ScalarImpl::Int32(val as _)), DataType::Int64 => Some(ScalarImpl::Int64(val)), - _ => Err(anyhow!("unexpected default value type for integer column"))?, + DataType::Varchar => { + // should be the Enum type which is mapped to Varchar + Some(ScalarImpl::from(val.to_string())) + } + _ => { + tracing::error!( + column = col_name, + ?data_type, + default_val = val, + "unexpected default value type for column, set default to null" + ); + None + } }, ColumnDefault::Real(val) => match data_type { DataType::Float32 => Some(ScalarImpl::Float32(F32::from(val as f32))), @@ -131,7 +143,15 @@ impl MySqlExternalTable { anyhow!("failed to convert default value to decimal").context(err) })?, )), - _ => Err(anyhow!("unexpected default value type for float column"))?, + _ => { + tracing::error!( + column = col_name, + ?data_type, + default_val = val, + "unexpected default value type for column, set default to null" + ); + None + } }, ColumnDefault::String(mut val) => { // mysql timestamp is mapped to timestamptz, we use UTC timezone to