Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[fix][dingo-executor] fix issues mysql client hanging and chinese character encoding. #1239

Merged
merged 1 commit into from
Sep 30, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -70,20 +70,23 @@ public class MysqlCommands {
MysqlPacketFactory mysqlPacketFactory = MysqlPacketFactory.getInstance();

public static void executeShowFields(String table, AtomicLong packetId, MysqlConnection mysqlConnection) {
String connCharSet = null;
try {
connCharSet = mysqlConnection.getConnection().getClientInfo(CONNECTION_CHARSET);
ResultSet rs = mysqlConnection.getConnection().getMetaData().getColumns(null, null,
table, null);
MysqlResponseHandler.responseShowField(rs, packetId, mysqlConnection);
} catch (SQLException e) {
MysqlResponseHandler.responseError(packetId, mysqlConnection.channel, e);
MysqlResponseHandler.responseError(packetId, mysqlConnection.channel, e, connCharSet);
}
}

public void execute(QueryPacket queryPacket,
MysqlConnection mysqlConnection) {
String sql;
String characterSet = null;
try {
String characterSet = mysqlConnection.getConnection().getClientInfo(CONNECTION_CHARSET);
characterSet = mysqlConnection.getConnection().getClientInfo(CONNECTION_CHARSET);
characterSet = getCharacterSet(characterSet);
sql = new String(queryPacket.message, characterSet);
} catch (Exception e) {
Expand All @@ -92,7 +95,8 @@ public void execute(QueryPacket queryPacket,
AtomicLong packetId = new AtomicLong(queryPacket.packetId + 1);
LogUtils.debug(log, "dingo connection:{}, receive sql:{}", mysqlConnection.getConnection().toString(), sql);
if (mysqlConnection.passwordExpire && !doExpire(mysqlConnection, sql, packetId)) {
MysqlResponseHandler.responseError(packetId, mysqlConnection.channel, ErrorCode.ER_PASSWORD_EXPIRE);
MysqlResponseHandler.responseError(packetId, mysqlConnection.channel, ErrorCode.ER_PASSWORD_EXPIRE,
characterSet);
return;
}
executeSingleQuery(sql, packetId, mysqlConnection);
Expand All @@ -115,20 +119,27 @@ private static boolean doExpire(MysqlConnection mysqlConnection, String sql, Ato
}
}

if (sql.startsWith(setPwdSql1) || sql.startsWith(alterUserPwdSql1)) {
MysqlResponseHandler.responseError(packetId, mysqlConnection.channel,
ErrorCode.ER_PASSWORD_EXPIRE);
return true;
try {
if (sql.startsWith(setPwdSql1) || sql.startsWith(alterUserPwdSql1)) {
MysqlResponseHandler.responseError(packetId, mysqlConnection.channel,
ErrorCode.ER_PASSWORD_EXPIRE, mysqlConnection.getConnection().getClientInfo(CONNECTION_CHARSET));
return true;
}
} catch(SQLException e) {
LogUtils.info(log, e.getMessage(), e);
}
return false;
}

public void prepare(MysqlConnection mysqlConnection, String sql) {
DingoConnection connection = (DingoConnection) mysqlConnection.getConnection();
AtomicLong packetId = new AtomicLong(2);
String connCharSet = null;

try {
DingoPreparedStatement preparedStatement = (DingoPreparedStatement) connection
.prepareStatement(sql);
connCharSet = mysqlConnection.getConnection().getClientInfo(CONNECTION_CHARSET);
Meta.StatementHandle statementHandle = preparedStatement.handle;
String placeholder = "?";
int i = 0;
Expand Down Expand Up @@ -173,16 +184,19 @@ public void prepare(MysqlConnection mysqlConnection, String sql) {
MysqlResponseHandler.responsePrepare(preparePacket, mysqlConnection.channel);
} catch (SQLException e) {
LogUtils.info(log, e.getMessage(), e);
MysqlResponseHandler.responseError(packetId, mysqlConnection.channel, e);
MysqlResponseHandler.responseError(packetId, mysqlConnection.channel, e, connCharSet);
}
}

public void executeSingleQuery(String sql, AtomicLong packetId,
MysqlConnection mysqlConnection) {
Statement statement = null;
boolean hasResults;
String connCharSet = null;

try {
statement = mysqlConnection.getConnection().createStatement();
connCharSet = mysqlConnection.getConnection().getClientInfo(CONNECTION_CHARSET);
hasResults = statement.execute(sql);
if (hasResults) {
// select
Expand Down Expand Up @@ -212,10 +226,9 @@ public void executeSingleQuery(String sql, AtomicLong packetId,
} catch (SQLException sqlException) {
LogUtils.error(log, "sql exception sqlstate:" + sqlException.getSQLState() + ", code:" + sqlException.getErrorCode()
+ ", message:" + sqlException.getMessage());
MysqlResponseHandler.responseError(packetId, mysqlConnection.channel, sqlException);
MysqlResponseHandler.responseError(packetId, mysqlConnection.channel, sqlException, connCharSet);
} catch (Exception e) {
LogUtils.error(log, e.getMessage(), e);
//MysqlResponseHandler.responseError(bakPacketId, mysqlConnection.channel, ErrorCode.ER_UNKNOWN_ERROR, "");
throw e;
} finally {
try {
Expand All @@ -242,9 +255,13 @@ public void executeStatement(ExecuteStatementPacket statementPacket,
AtomicLong packetId,
MysqlConnection mysqlConnection
) {
String connectionCharSet = null;
try {
connectionCharSet = mysqlConnection.getConnection().getClientInfo(CONNECTION_CHARSET);
statementPacket.paramValMap.forEach((k, v) -> {
String connCharSet = null;
try {
connCharSet = mysqlConnection.getConnection().getClientInfo(CONNECTION_CHARSET);
switch (v.getType()) {
case MysqlType.FIELD_TYPE_TINY:
byte byteVal = v.getValue()[0];
Expand Down Expand Up @@ -323,15 +340,17 @@ public void executeStatement(ExecuteStatementPacket statementPacket,
preparedStatement.setObject(k, charVal);
}
} catch (SQLException e) {
MysqlResponseHandler.responseError(packetId, mysqlConnection.channel, e);
MysqlResponseHandler.responseError(packetId, mysqlConnection.channel, e, connCharSet);
}
});
if (statementType == Meta.StatementType.SELECT) {
String connCharSet = null;
try (ResultSet resultSet = preparedStatement.executeQuery()) {
connCharSet = mysqlConnection.getConnection().getClientInfo(CONNECTION_CHARSET);
MysqlResponseHandler.responsePrepareExecute(resultSet, packetId, mysqlConnection);
} catch (SQLException e) {
LogUtils.error(log, e.getMessage(), e);
MysqlResponseHandler.responseError(packetId, mysqlConnection.channel, e);
MysqlResponseHandler.responseError(packetId, mysqlConnection.channel, e, connCharSet);
}
} else if (statementType == Meta.StatementType.OTHER_DDL) {
OKPacket okPacket = mysqlPacketFactory.getOkPacket(0, packetId, null);
Expand All @@ -353,9 +372,9 @@ public void executeStatement(ExecuteStatementPacket statementPacket,
MysqlResponseHandler.responseOk(okPacket, mysqlConnection.channel);
}
} catch (SQLException e) {
MysqlResponseHandler.responseError(packetId, mysqlConnection.channel, e);
MysqlResponseHandler.responseError(packetId, mysqlConnection.channel, e, connectionCharSet);
} catch (Exception e) {
MysqlResponseHandler.responseError(packetId, mysqlConnection.channel, ErrorCode.ER_UNKNOWN_ERROR, "");
MysqlResponseHandler.responseError(packetId, mysqlConnection.channel, ErrorCode.ER_UNKNOWN_ERROR, connectionCharSet);
LogUtils.error(log, e.getMessage(), e);
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,9 @@ public static void responseShowField(ResultSet resultSet,
MysqlConnection mysqlConnection) {
// 1. column packet
// 2. ok packet
String connCharSet = null;
try {
connCharSet = mysqlConnection.getConnection().getClientInfo(CONNECTION_CHARSET);
List<ColumnPacket> columnPackets = factory.getColumnPackets(packetId, resultSet, true);
ByteBuf buffer = ByteBufAllocator.DEFAULT.buffer();
for (ColumnPacket columnPacket : columnPackets) {
Expand All @@ -74,7 +76,7 @@ public static void responseShowField(ResultSet resultSet,
okPacket.write(buffer);
mysqlConnection.channel.writeAndFlush(buffer);
} catch (SQLException e) {
responseError(packetId, mysqlConnection.channel, e);
responseError(packetId, mysqlConnection.channel, e, connCharSet);
}
}

Expand All @@ -94,7 +96,9 @@ public static void responseResultSet(ResultSet resultSet,
// 5. eof packet
boolean deprecateEof = (mysqlConnection.authPacket.extendClientFlags
& ExtendedClientCapabilities.CLIENT_DEPRECATE_EOF) != 0;
String connCharSet = null;
try {
connCharSet = mysqlConnection.getConnection().getClientInfo(CONNECTION_CHARSET);
ByteBuf buffer = ByteBufAllocator.DEFAULT.buffer();
ResultSetMetaData metaData = resultSet.getMetaData();
ColumnsNumberPacket columnsNumberPacket = new ColumnsNumberPacket();
Expand Down Expand Up @@ -127,7 +131,7 @@ public static void responseResultSet(ResultSet resultSet,

mysqlConnection.channel.writeAndFlush(buffer);
} catch (SQLException e) {
responseError(packetId, mysqlConnection.channel, e);
responseError(packetId, mysqlConnection.channel, e, connCharSet);
}
}

Expand Down Expand Up @@ -204,33 +208,40 @@ private static void handlerPrepareRowPacket(ResultSet resultSet,

public static void responseError(AtomicLong packetId,
SocketChannel channel,
io.dingodb.common.mysql.constant.ErrorCode errorCode) {
responseError(packetId, channel, errorCode, errorCode.message);
io.dingodb.common.mysql.constant.ErrorCode errorCode,
String characterSet) {
responseError(packetId, channel, errorCode, errorCode.message, characterSet);
}

public static void responseError(AtomicLong packetId,
SocketChannel channel,
io.dingodb.common.mysql.constant.ErrorCode errorCode, String message) {
io.dingodb.common.mysql.constant.ErrorCode errorCode,
String message,
String characterSet) {
ERRPacket ep = new ERRPacket();
ep.packetId = (byte) packetId.getAndIncrement();
ep.capabilities = MysqlServer.getServerCapabilities();
ep.errorCode = errorCode.code;
ep.sqlState = errorCode.sqlState;
ep.errorMessage = message;
ep.characterSet = characterSet;
ByteBuf buffer = ByteBufAllocator.DEFAULT.buffer();
ep.write(buffer);
channel.writeAndFlush(buffer);
}

public static void responseError(AtomicLong packetId,
SocketChannel channel,
SQLException exception) {
SQLException exception,
String characterSet) {
exception = errorDingo2Mysql(exception);
ERRPacket ep = new ERRPacket();
ep.packetId = (byte) packetId.getAndIncrement();
ep.capabilities = MysqlServer.getServerCapabilities();
ep.errorCode = exception.getErrorCode();
ep.sqlState = exception.getSQLState();
ep.characterSet = characterSet;

if (exception.getMessage() == null) {
ep.errorMessage = "";
} else if (exception.getMessage().startsWith("Encountered")) {
Expand All @@ -243,23 +254,6 @@ public static void responseError(AtomicLong packetId,
channel.writeAndFlush(buffer);
}

public static String encodeResponseError(String src) {
if(src == null) {
return null;
}

StringBuilder builder = new StringBuilder();
for(char c : src.toCharArray()) {
if(c > 0x7f) {
//deal with chinese character.
builder.append("\\u").append(String.format("%04x", (int)c));
} else {
builder.append(c);
}
}
return builder.toString();
}

public static SQLException errorDingo2Mysql(SQLException e) {
if (e.getMessage() != null && e.getMessage().contains("Duplicate data")) {
return new SQLException("Duplicate data for key 'PRIMARY'", "23000", 1062);
Expand All @@ -268,11 +262,11 @@ public static SQLException errorDingo2Mysql(SQLException e) {
return new SQLException("Query execution was interrupted", "70100", 1317);
} else if (e.getMessage() != null && e.getMessage().contains("Statement canceled")) {
LogUtils.info(log, e.getMessage(), e);
return new SQLException(encodeResponseError(e.getMessage()), "HY000", 1105);
return new SQLException(e.getMessage(), "HY000", 1105);
} else if (e.getErrorCode() == 9001 && e.getSQLState().equals("45000")) {
int code = 1105;
String state = "HY000";
String reason = encodeResponseError(e.getMessage());
String reason = e.getMessage();
if (reason.contains("Duplicate entry")) {
code = 1062;
state = "23000";
Expand All @@ -282,15 +276,15 @@ public static SQLException errorDingo2Mysql(SQLException e) {
}
return new SQLException(reason, state, code);
} else if (e.getErrorCode() == 1054 && e.getSQLState().equals("42S22")) {
return new SQLException(encodeResponseError(e.getMessage()), "HY000", 1105);
return new SQLException(e.getMessage(), "HY000", 1105);
} else if (e.getErrorCode() == 5001 && e.getSQLState().equals("45000")) {
if (e.getMessage().contains("Syntax Error")) {
return new SQLException(
encodeResponseError(e.getMessage()),
e.getMessage(),
e.getSQLState(),
e.getErrorCode());
} else if (e.getMessage().contains("io.dingodb.store.api.transaction.exception.DuplicateEntryException:")) {
return new SQLException(encodeResponseError(e.getMessage())
return new SQLException(e.getMessage()
.replace("io.dingodb.store.api.transaction.exception.DuplicateEntryException:", ""),
"23000", 1062);
}
Expand Down Expand Up @@ -328,7 +322,9 @@ public static void responsePrepareExecute(ResultSet resultSet,
// 5. eof packet
boolean deprecateEof = (mysqlConnection.authPacket.extendClientFlags
& ExtendedClientCapabilities.CLIENT_DEPRECATE_EOF) != 0;
String connCharSet = null;
try {
connCharSet = mysqlConnection.getConnection().getClientInfo(CONNECTION_CHARSET);
ByteBuf buffer = ByteBufAllocator.DEFAULT.buffer();
ResultSetMetaData metaData = resultSet.getMetaData();
ColumnsNumberPacket columnsNumberPacket = new ColumnsNumberPacket();
Expand Down Expand Up @@ -361,7 +357,7 @@ public static void responsePrepareExecute(ResultSet resultSet,

mysqlConnection.channel.writeAndFlush(buffer);
} catch (SQLException e) {
responseError(packetId, mysqlConnection.channel, e);
responseError(packetId, mysqlConnection.channel, e, connCharSet);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -160,7 +160,7 @@ protected void channelRead0(ChannelHandlerContext ctx, ByteBuf msg) {
String error =
String.format(ErrorCode.ER_ACCESS_DENIED_ERROR.message, user, ip, "YES");
MysqlResponseHandler.responseError(packetId,
mysqlConnection.channel, ErrorCode.ER_ACCESS_DENIED_ERROR, error);
mysqlConnection.channel, ErrorCode.ER_ACCESS_DENIED_ERROR, error, null);
if (mysqlConnection.channel.isActive()) {
mysqlConnection.channel.close();
}
Expand Down Expand Up @@ -217,7 +217,7 @@ protected void channelRead0(ChannelHandlerContext ctx, ByteBuf msg) {
String error =
String.format(ErrorCode.ER_ACCESS_DENIED_ERROR.message, user, ip, "YES");
MysqlResponseHandler.responseError(packetId,
mysqlConnection.channel, ErrorCode.ER_ACCESS_DENIED_ERROR, error);
mysqlConnection.channel, ErrorCode.ER_ACCESS_DENIED_ERROR, error, null);
if (mysqlConnection.channel.isActive()) {
mysqlConnection.channel.close();
}
Expand Down
Loading
Loading