Skip to content

Commit

Permalink
bugfix: fix the error of active refresh failure of cross-database tab…
Browse files Browse the repository at this point in the history
…le metadata (#6759)
  • Loading branch information
GoodBoyCoder authored Sep 3, 2024
1 parent 91eb505 commit cd2e6d2
Show file tree
Hide file tree
Showing 22 changed files with 102 additions and 50 deletions.
1 change: 1 addition & 0 deletions changes/en-us/2.x.md
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ Add changes here for all PR submitted to the 2.x branch.
- [[#6781](https://github.com/apache/incubator-seata/pull/6781)] the issue where the TC occasionally fails to go offline from the NamingServer
- [[#6797](https://github.com/apache/incubator-seata/pull/6797)] fall back to any of available cluster address when query cluster address is empty
- [[#6800](https://github.com/apache/incubator-seata/pull/6800)] make exception message generic for all database drivers
- [[#6759](https://github.com/apache/incubator-seata/pull/6759)] fix the error of active refresh failure of cross-database table metadata


### optimize:
Expand Down
1 change: 1 addition & 0 deletions changes/zh-cn/2.x.md
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@
- [[#6797](https://github.com/apache/incubator-seata/pull/6797)] 当查询的集群地址为空时,获取可用的任意集群地址
- [[#6800](https://github.com/apache/incubator-seata/pull/6800)] 使异常消息对所有数据库驱动程序通用

- [[#6759](https://github.com/apache/incubator-seata/pull/6759)] 修复跨库表主动刷新`tableMeta`的异常问题

### optimize:
- [[#6499](https://github.com/apache/incubator-seata/pull/6499)] 拆分 committing 和 rollbacking 状态的任务线程池
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -84,10 +84,12 @@ public TableMeta getTableMeta(final Connection connection, final String tableNam
public void refresh(final Connection connection, String resourceId) {
ConcurrentMap<String, TableMeta> tableMetaMap = TABLE_META_CACHE.asMap();
for (Map.Entry<String, TableMeta> entry : tableMetaMap.entrySet()) {
String key = getCacheKey(connection, entry.getValue().getTableName(), resourceId);
String key = getCacheKey(connection, entry.getValue().getOriginalTableName(), resourceId);
if (entry.getKey().equals(key)) {
try {
TableMeta tableMeta = fetchSchema(connection, entry.getValue().getTableName());
String freshTableName = StringUtils.isBlank(entry.getValue().getOriginalTableName()) ?
entry.getValue().getTableName() : entry.getValue().getOriginalTableName();
TableMeta tableMeta = fetchSchema(connection, freshTableName);
if (!tableMeta.equals(entry.getValue())) {
TABLE_META_CACHE.put(entry.getKey(), tableMeta);
LOGGER.info("table meta change was found, update table meta cache automatically.");
Expand All @@ -99,6 +101,7 @@ public void refresh(final Connection connection, String resourceId) {
}
}


/**
* generate cache key
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,7 @@ protected TableMeta resultSetMetaToSchema(DatabaseMetaData dbmd, String tableNam

TableNameMeta tableNameMeta = toTableNameMeta(tableName, dbmd.getConnection().getSchema());
result.setTableName(tableNameMeta.getTableName());
result.setOriginalTableName(tableName);
try (ResultSet rsColumns = dbmd.getColumns("", tableNameMeta.getSchema(), tableNameMeta.getTableName(), "%");
ResultSet rsIndex = dbmd.getIndexInfo(null, tableNameMeta.getSchema(), tableNameMeta.getTableName(), false, true);
ResultSet rsPrimary = dbmd.getPrimaryKeys(null, tableNameMeta.getSchema(), tableNameMeta.getTableName())) {
Expand All @@ -67,7 +68,6 @@ protected TableMeta resultSetMetaToSchema(DatabaseMetaData dbmd, String tableNam
processIndexes(result, rsIndex);

processPrimaries(result, rsPrimary);

if (result.getAllIndexes().isEmpty()) {
throw new ShouldNeverHappenException(String.format("Could not found any index in the table: %s", tableName));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ protected TableMeta fetchSchema(Connection connection, String tableName) throws
String sql = "SELECT * FROM " + ColumnUtils.addEscape(tableName, JdbcConstants.MARIADB) + " LIMIT 1";
try (Statement stmt = connection.createStatement();
ResultSet rs = stmt.executeQuery(sql)) {
return resultSetMetaToSchema(rs.getMetaData(), connection.getMetaData());
return resultSetMetaToSchema(rs.getMetaData(), connection.getMetaData(), tableName);
} catch (SQLException sqlEx) {
throw sqlEx;
} catch (Exception e) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,11 +48,11 @@ public class MysqlTableMetaCache extends AbstractTableMetaCache {
protected String getCacheKey(Connection connection, String tableName, String resourceId) {
StringBuilder cacheKey = new StringBuilder(resourceId);
cacheKey.append(".");
//remove single quote and separate it to catalogName and tableName
String[] tableNameWithCatalog = tableName.replace("`", "").split("\\.");
String defaultTableName = tableNameWithCatalog.length > 1 ? tableNameWithCatalog[1] : tableNameWithCatalog[0];
//original: remove single quote and separate it to catalogName and tableName
//now: Use the original table name to avoid cache errors of tables with the same name across databases
String defaultTableName = ColumnUtils.delEscape(tableName, JdbcConstants.MYSQL);

DatabaseMetaData databaseMetaData = null;
DatabaseMetaData databaseMetaData;
try {
databaseMetaData = connection.getMetaData();
} catch (SQLException e) {
Expand Down Expand Up @@ -80,15 +80,15 @@ protected TableMeta fetchSchema(Connection connection, String tableName) throws
String sql = "SELECT * FROM " + ColumnUtils.addEscape(tableName, JdbcConstants.MYSQL) + " LIMIT 1";
try (Statement stmt = connection.createStatement();
ResultSet rs = stmt.executeQuery(sql)) {
return resultSetMetaToSchema(rs.getMetaData(), connection.getMetaData());
return resultSetMetaToSchema(rs.getMetaData(), connection.getMetaData(), tableName);
} catch (SQLException sqlEx) {
throw sqlEx;
} catch (Exception e) {
throw new SQLException(String.format("Failed to fetch schema of %s", tableName), e);
}
}

protected TableMeta resultSetMetaToSchema(ResultSetMetaData rsmd, DatabaseMetaData dbmd)
protected TableMeta resultSetMetaToSchema(ResultSetMetaData rsmd, DatabaseMetaData dbmd, String originalTableName)
throws SQLException {
//always "" for mysql
String schemaName = rsmd.getSchemaName(1);
Expand All @@ -110,6 +110,10 @@ protected TableMeta resultSetMetaToSchema(ResultSetMetaData rsmd, DatabaseMetaDa
// May be not consistent with lower_case_table_names
tm.setCaseSensitive(true);

// Save the original table name information for active cache refresh
// to avoid refresh failure caused by missing catalog information
tm.setOriginalTableName(originalTableName);

/*
* here has two different type to get the data
* make sure the table name was right
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,16 +47,14 @@ protected String getCacheKey(Connection connection, String tableName, String res
StringBuilder cacheKey = new StringBuilder(resourceId);
cacheKey.append(".");

//separate it to schemaName and tableName
String[] tableNameWithSchema = tableName.split("\\.");
String defaultTableName = tableNameWithSchema.length > 1 ? tableNameWithSchema[1] : tableNameWithSchema[0];

//original: separate it to schemaName and tableName
//now: Use the original table name to avoid cache errors of tables with the same name across databases
//oracle does not implement supportsMixedCaseIdentifiers in DatabaseMetadata
if (defaultTableName.contains("\"")) {
cacheKey.append(defaultTableName.replace("\"", ""));
if (tableName.contains("\"")) {
cacheKey.append(tableName.replace("\"", ""));
} else {
// oracle default store in upper case
cacheKey.append(defaultTableName.toUpperCase());
cacheKey.append(tableName.toUpperCase());
}

return cacheKey.toString();
Expand All @@ -75,6 +73,9 @@ protected TableMeta fetchSchema(Connection connection, String tableName) throws

protected TableMeta resultSetMetaToSchema(DatabaseMetaData dbmd, String tableName) throws SQLException {
TableMeta tm = new TableMeta();
// Save the original table name information for active cache refresh
// to avoid refresh failure caused by missing catalog information
tm.setOriginalTableName(tableName);
String[] schemaTable = tableName.split("\\.");
String schemaName = schemaTable.length > 1 ? schemaTable[0] : dbmd.getUserName();
tableName = schemaTable.length > 1 ? schemaTable[1] : tableName;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ protected TableMeta fetchSchema(Connection connection, String tableName) throws
String sql = "SELECT * FROM " + ColumnUtils.addEscape(tableName, JdbcConstants.POLARDBX) + " LIMIT 1";
try (Statement stmt = connection.createStatement();
ResultSet rs = stmt.executeQuery(sql)) {
return resultSetMetaToSchema(rs.getMetaData(), connection.getMetaData());
return resultSetMetaToSchema(rs.getMetaData(), connection.getMetaData(), tableName);
} catch (SQLException sqlEx) {
throw sqlEx;
} catch (Exception e) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,16 +43,14 @@ protected String getCacheKey(Connection connection, String tableName, String res
StringBuilder cacheKey = new StringBuilder(resourceId);
cacheKey.append(".");

//separate it to schemaName and tableName
String[] tableNameWithSchema = tableName.split("\\.");
String defaultTableName = tableNameWithSchema.length > 1 ? tableNameWithSchema[1] : tableNameWithSchema[0];

//original: separate it to schemaName and tableName
//now: Use the original table name to avoid cache errors of tables with the same name across databases
//postgres does not implement supportsMixedCaseIdentifiers in DatabaseMetadata
if (defaultTableName.contains("\"")) {
cacheKey.append(defaultTableName.replace("\"", ""));
if (tableName.contains("\"")) {
cacheKey.append(tableName.replace("\"", ""));
} else {
//postgres default store in lower case
cacheKey.append(defaultTableName.toLowerCase());
cacheKey.append(tableName.toLowerCase());
}

return cacheKey.toString();
Expand All @@ -73,6 +71,7 @@ private TableMeta resultSetMetaToSchema(Connection connection, String tableName)
DatabaseMetaData dbmd = connection.getMetaData();
TableMeta tm = new TableMeta();
tm.setTableName(tableName);
tm.setOriginalTableName(tableName);
String[] schemaTable = tableName.split("\\.");
String schemaName = schemaTable.length > 1 ? schemaTable[0] : null;
tableName = schemaTable.length > 1 ? schemaTable[1] : tableName;
Expand Down Expand Up @@ -185,8 +184,8 @@ private TableMeta resultSetMetaToSchema(Connection connection, String tableName)
}

while (rsTable.next()) {
String rsTableName = rsTable.getString("TABLE_NAME");
String rsTableSchema = rsTable.getString("TABLE_SCHEM");
String rsTableName = rsTable.getString("TABLE_NAME");
//set origin tableName with schema if necessary
if ("public".equalsIgnoreCase(rsTableSchema)) {
//for compatibility reasons, old clients generally do not have the 'public' default schema by default.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,28 +47,24 @@ protected String getCacheKey(Connection connection, String tableName, String res
StringBuilder cacheKey = new StringBuilder(resourceId);
cacheKey.append(".");

//separate it to schemaName and tableName
String[] tableNameWithSchema = tableName.split("\\.");
String defaultTableName = tableNameWithSchema[tableNameWithSchema.length - 1];

DatabaseMetaData databaseMetaData;
try {
databaseMetaData = connection.getMetaData();
} catch (SQLException e) {
LOGGER.error("Could not get connection, use default cache key {}", e.getMessage(), e);
return cacheKey.append(defaultTableName).toString();
return cacheKey.append(tableName).toString();
}

try {
//prevent duplicated cache key
if (databaseMetaData.supportsMixedCaseIdentifiers()) {
cacheKey.append(defaultTableName);
cacheKey.append(tableName);
} else {
cacheKey.append(defaultTableName.toUpperCase());
cacheKey.append(tableName.toUpperCase());
}
} catch (SQLException e) {
LOGGER.error("Could not get supportsMixedCaseIdentifiers in connection metadata, use default cache key {}", e.getMessage(), e);
return cacheKey.append(defaultTableName).toString();
return cacheKey.append(tableName).toString();
}

return cacheKey.toString();
Expand All @@ -88,6 +84,7 @@ protected TableMeta fetchSchema(Connection connection, String tableName) throws
private TableMeta resultSetMetaToSchema(Connection connection, String tableName) throws SQLException {
TableMeta tm = new TableMeta();
tm.setTableName(tableName);
tm.setOriginalTableName(tableName);

tableName = ColumnUtils.delEscape(tableName, JdbcConstants.SQLSERVER);
String[] schemaTable = tableName.split("\\.");
Expand Down Expand Up @@ -189,8 +186,8 @@ private TableMeta resultSetMetaToSchema(Connection connection, String tableName)
}

while (rsTable.next()) {
String rsTableName = rsTable.getString("TABLE_NAME");
String rsTableSchema = rsTable.getString("TABLE_SCHEM");
String rsTableName = rsTable.getString("TABLE_NAME");
//set origin tableName with schema if necessary
if ("dbo".equalsIgnoreCase(rsTableSchema)) {
//for compatibility reasons, old clients generally do not have the 'dbo' default schema by default.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -98,9 +98,9 @@ public void init() throws SQLException {
new Object[]{0, "update_time", Types.INTEGER, "INTEGER", 64, 10, 0, 0}
};

MockDriver mockDriver = new MockDriver(returnValueColumnLabels, returnValue, columnMetas, indexMetas, null, onUpdateColumnsReturnValue);
MockDriver mockDriver = new MockDriver(returnValueColumnLabels, returnValue, columnMetas, indexMetas, null, onUpdateColumnsReturnValue, new Object[][]{});
DruidDataSource dataSource = new DruidDataSource();
dataSource.setUrl("jdbc:mock:xxx");
dataSource.setUrl("jdbc:mock:xxx2");
dataSource.setDriver(mockDriver);

DataSourceProxy newDataSourceProxy = DataSourceProxyTest.getDataSourceProxy(dataSource);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -139,7 +139,7 @@ public void init() throws SQLException {
new Object[]{0, "update_time", Types.INTEGER, "INTEGER", 64, 10, 0, 0}
};

MockDriver mockDriver = new MockDriver(returnValueColumnLabels, returnValue, columnMetas, indexMetas, null, onUpdateColumnsReturnValue);
MockDriver mockDriver = new MockDriver(returnValueColumnLabels, returnValue, columnMetas, indexMetas, null, onUpdateColumnsReturnValue, new Object[][]{});
DruidDataSource dataSource = new DruidDataSource();
dataSource.setUrl("jdbc:mock:xxx");
dataSource.setDriver(mockDriver);
Expand All @@ -159,6 +159,7 @@ public void init() throws SQLException {

@Test
public void testBeforeAndAfterImage() throws SQLException {
System.out.println(newStatementProxy);
String sql = "insert into table_insert_executor_test(id, user_id, name, sex) values (1, 1, 'will', 1)";
List<SQLStatement> asts = SQLUtils.parseStatements(sql, JdbcConstants.MYSQL);
MySQLInsertRecognizer recognizer = new MySQLInsertRecognizer(sql, asts.get(0));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -101,9 +101,9 @@ public void init() throws SQLException {
new Object[]{0, "update_time", Types.INTEGER, "INTEGER", 64, 10, 0, 0}
};

MockDriver mockDriver = new MockDriver(returnValueColumnLabels, returnValue, columnMetas, indexMetas, null, onUpdateColumnsReturnValue);
MockDriver mockDriver = new MockDriver(returnValueColumnLabels, returnValue, columnMetas, indexMetas, null, onUpdateColumnsReturnValue, new Object[][]{});
DruidDataSource dataSource = new DruidDataSource();
dataSource.setUrl("jdbc:mock:xxx");
dataSource.setUrl("jdbc:mock:xxx1");
dataSource.setDriver(mockDriver);

DataSourceProxy newDataSourceProxy = DataSourceProxyTest.getDataSourceProxy(dataSource);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,7 @@ public static void init() {
new Object[]{0, "updated", Types.INTEGER, "INTEGER", 64, 10, 0, 0}
};

MockDriver mockDriver = new MockDriver(returnValueColumnLabels, returnValue, columnMetas, indexMetas, null, onUpdateColumnsReturnValue);
MockDriver mockDriver = new MockDriver(returnValueColumnLabels, returnValue, columnMetas, indexMetas, null, onUpdateColumnsReturnValue, new Object[][]{});
DruidDataSource dataSource = new DruidDataSource();
dataSource.setUrl("jdbc:mock:xxx");
dataSource.setDriver(mockDriver);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -79,8 +79,9 @@ public class MockDatabaseMetaData implements DatabaseMetaData {
);

private static List<String> tableMetaColumnLabels = Arrays.asList(
"TABLE_NAME",
"TABLE_SCHEM"
"TABLE_CAT",
"TABLE_SCHEM",
"TABLE_NAME"
);

private Object[][] columnsMetasReturnValue;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -79,19 +79,19 @@ public MockDriver() {
}

public MockDriver(Object[][] mockColumnsMetasReturnValue, Object[][] mockIndexMetasReturnValue) {
this(Lists.newArrayList(), new Object[][]{}, mockColumnsMetasReturnValue, mockIndexMetasReturnValue, new Object[][]{});
this(Lists.newArrayList(), new Object[][]{}, mockColumnsMetasReturnValue, mockIndexMetasReturnValue, new Object[][]{}, new Object[][]{});
}

public MockDriver(Object[][] mockColumnsMetasReturnValue, Object[][] mockIndexMetasReturnValue, Object[][] mockPkMetasReturnValue) {
this(Lists.newArrayList(), new Object[][]{}, mockColumnsMetasReturnValue, mockIndexMetasReturnValue, mockPkMetasReturnValue);
this(Lists.newArrayList(), new Object[][]{}, mockColumnsMetasReturnValue, mockIndexMetasReturnValue, mockPkMetasReturnValue, new Object[][]{});
}

public MockDriver(Object[][] mockColumnsMetasReturnValue, Object[][] mockIndexMetasReturnValue, Object[][] mockPkMetasReturnValue, Object[][] mockTableMetasReturnValue) {
this(Lists.newArrayList(), new Object[][]{}, mockColumnsMetasReturnValue, mockIndexMetasReturnValue, mockPkMetasReturnValue, mockTableMetasReturnValue);
}

public MockDriver(List<String> mockReturnValueColumnLabels, Object[][] mockReturnValue, Object[][] mockColumnsMetasReturnValue, Object[][] mockIndexMetasReturnValue) {
this(mockReturnValueColumnLabels, mockReturnValue, mockColumnsMetasReturnValue, mockIndexMetasReturnValue, new Object[][]{});
this(mockReturnValueColumnLabels, mockReturnValue, mockColumnsMetasReturnValue, mockIndexMetasReturnValue, new Object[][]{}, new Object[][]{});
}

public MockDriver(List<String> mockReturnValueColumnLabels, Object[][] mockReturnValue, Object[][] mockColumnsMetasReturnValue, Object[][] mockIndexMetasReturnValue, Object[][] mockPkMetasReturnValue) {
Expand Down
Loading

0 comments on commit cd2e6d2

Please sign in to comment.