Skip to content

Commit

Permalink
[fix][dingo-executor] fix issues mysql client hanging and chinese cha…
Browse files Browse the repository at this point in the history
…racter encoding.
  • Loading branch information
nokiaMS committed Sep 30, 2024
1 parent fb8501c commit be7db3a
Show file tree
Hide file tree
Showing 5 changed files with 124 additions and 53 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -70,20 +70,23 @@ public class MysqlCommands {
MysqlPacketFactory mysqlPacketFactory = MysqlPacketFactory.getInstance();

public static void executeShowFields(String table, AtomicLong packetId, MysqlConnection mysqlConnection) {
String connCharSet = null;
try {
connCharSet = mysqlConnection.getConnection().getClientInfo(CONNECTION_CHARSET);
ResultSet rs = mysqlConnection.getConnection().getMetaData().getColumns(null, null,
table, null);
MysqlResponseHandler.responseShowField(rs, packetId, mysqlConnection);
} catch (SQLException e) {
MysqlResponseHandler.responseError(packetId, mysqlConnection.channel, e);
MysqlResponseHandler.responseError(packetId, mysqlConnection.channel, e, connCharSet);
}
}

public void execute(QueryPacket queryPacket,
MysqlConnection mysqlConnection) {
String sql;
String characterSet = null;
try {
String characterSet = mysqlConnection.getConnection().getClientInfo(CONNECTION_CHARSET);
characterSet = mysqlConnection.getConnection().getClientInfo(CONNECTION_CHARSET);
characterSet = getCharacterSet(characterSet);
sql = new String(queryPacket.message, characterSet);
} catch (Exception e) {
Expand All @@ -92,7 +95,8 @@ public void execute(QueryPacket queryPacket,
AtomicLong packetId = new AtomicLong(queryPacket.packetId + 1);
LogUtils.debug(log, "dingo connection:{}, receive sql:{}", mysqlConnection.getConnection().toString(), sql);
if (mysqlConnection.passwordExpire && !doExpire(mysqlConnection, sql, packetId)) {
MysqlResponseHandler.responseError(packetId, mysqlConnection.channel, ErrorCode.ER_PASSWORD_EXPIRE);
MysqlResponseHandler.responseError(packetId, mysqlConnection.channel, ErrorCode.ER_PASSWORD_EXPIRE,
characterSet);
return;
}
executeSingleQuery(sql, packetId, mysqlConnection);
Expand All @@ -115,20 +119,27 @@ private static boolean doExpire(MysqlConnection mysqlConnection, String sql, Ato
}
}

if (sql.startsWith(setPwdSql1) || sql.startsWith(alterUserPwdSql1)) {
MysqlResponseHandler.responseError(packetId, mysqlConnection.channel,
ErrorCode.ER_PASSWORD_EXPIRE);
return true;
try {
if (sql.startsWith(setPwdSql1) || sql.startsWith(alterUserPwdSql1)) {
MysqlResponseHandler.responseError(packetId, mysqlConnection.channel,
ErrorCode.ER_PASSWORD_EXPIRE, mysqlConnection.getConnection().getClientInfo(CONNECTION_CHARSET));
return true;
}
} catch(SQLException e) {
LogUtils.info(log, e.getMessage(), e);
}
return false;
}

public void prepare(MysqlConnection mysqlConnection, String sql) {
DingoConnection connection = (DingoConnection) mysqlConnection.getConnection();
AtomicLong packetId = new AtomicLong(2);
String connCharSet = null;

try {
DingoPreparedStatement preparedStatement = (DingoPreparedStatement) connection
.prepareStatement(sql);
connCharSet = mysqlConnection.getConnection().getClientInfo(CONNECTION_CHARSET);
Meta.StatementHandle statementHandle = preparedStatement.handle;
String placeholder = "?";
int i = 0;
Expand Down Expand Up @@ -173,16 +184,19 @@ public void prepare(MysqlConnection mysqlConnection, String sql) {
MysqlResponseHandler.responsePrepare(preparePacket, mysqlConnection.channel);
} catch (SQLException e) {
LogUtils.info(log, e.getMessage(), e);
MysqlResponseHandler.responseError(packetId, mysqlConnection.channel, e);
MysqlResponseHandler.responseError(packetId, mysqlConnection.channel, e, connCharSet);
}
}

public void executeSingleQuery(String sql, AtomicLong packetId,
MysqlConnection mysqlConnection) {
Statement statement = null;
boolean hasResults;
String connCharSet = null;

try {
statement = mysqlConnection.getConnection().createStatement();
connCharSet = mysqlConnection.getConnection().getClientInfo(CONNECTION_CHARSET);
hasResults = statement.execute(sql);
if (hasResults) {
// select
Expand Down Expand Up @@ -212,10 +226,9 @@ public void executeSingleQuery(String sql, AtomicLong packetId,
} catch (SQLException sqlException) {
LogUtils.error(log, "sql exception sqlstate:" + sqlException.getSQLState() + ", code:" + sqlException.getErrorCode()
+ ", message:" + sqlException.getMessage());
MysqlResponseHandler.responseError(packetId, mysqlConnection.channel, sqlException);
MysqlResponseHandler.responseError(packetId, mysqlConnection.channel, sqlException, connCharSet);
} catch (Exception e) {
LogUtils.error(log, e.getMessage(), e);
//MysqlResponseHandler.responseError(bakPacketId, mysqlConnection.channel, ErrorCode.ER_UNKNOWN_ERROR, "");
throw e;
} finally {
try {
Expand All @@ -242,9 +255,13 @@ public void executeStatement(ExecuteStatementPacket statementPacket,
AtomicLong packetId,
MysqlConnection mysqlConnection
) {
String connectionCharSet = null;
try {
connectionCharSet = mysqlConnection.getConnection().getClientInfo(CONNECTION_CHARSET);
statementPacket.paramValMap.forEach((k, v) -> {
String connCharSet = null;
try {
connCharSet = mysqlConnection.getConnection().getClientInfo(CONNECTION_CHARSET);
switch (v.getType()) {
case MysqlType.FIELD_TYPE_TINY:
byte byteVal = v.getValue()[0];
Expand Down Expand Up @@ -323,15 +340,17 @@ public void executeStatement(ExecuteStatementPacket statementPacket,
preparedStatement.setObject(k, charVal);
}
} catch (SQLException e) {
MysqlResponseHandler.responseError(packetId, mysqlConnection.channel, e);
MysqlResponseHandler.responseError(packetId, mysqlConnection.channel, e, connCharSet);
}
});
if (statementType == Meta.StatementType.SELECT) {
String connCharSet = null;
try (ResultSet resultSet = preparedStatement.executeQuery()) {
connCharSet = mysqlConnection.getConnection().getClientInfo(CONNECTION_CHARSET);
MysqlResponseHandler.responsePrepareExecute(resultSet, packetId, mysqlConnection);
} catch (SQLException e) {
LogUtils.error(log, e.getMessage(), e);
MysqlResponseHandler.responseError(packetId, mysqlConnection.channel, e);
MysqlResponseHandler.responseError(packetId, mysqlConnection.channel, e, connCharSet);
}
} else if (statementType == Meta.StatementType.OTHER_DDL) {
OKPacket okPacket = mysqlPacketFactory.getOkPacket(0, packetId, null);
Expand All @@ -353,9 +372,9 @@ public void executeStatement(ExecuteStatementPacket statementPacket,
MysqlResponseHandler.responseOk(okPacket, mysqlConnection.channel);
}
} catch (SQLException e) {
MysqlResponseHandler.responseError(packetId, mysqlConnection.channel, e);
MysqlResponseHandler.responseError(packetId, mysqlConnection.channel, e, connectionCharSet);
} catch (Exception e) {
MysqlResponseHandler.responseError(packetId, mysqlConnection.channel, ErrorCode.ER_UNKNOWN_ERROR, "");
MysqlResponseHandler.responseError(packetId, mysqlConnection.channel, ErrorCode.ER_UNKNOWN_ERROR, connectionCharSet);
LogUtils.error(log, e.getMessage(), e);
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,9 @@ public static void responseShowField(ResultSet resultSet,
MysqlConnection mysqlConnection) {
// 1. column packet
// 2. ok packet
String connCharSet = null;
try {
connCharSet = mysqlConnection.getConnection().getClientInfo(CONNECTION_CHARSET);
List<ColumnPacket> columnPackets = factory.getColumnPackets(packetId, resultSet, true);
ByteBuf buffer = ByteBufAllocator.DEFAULT.buffer();
for (ColumnPacket columnPacket : columnPackets) {
Expand All @@ -74,7 +76,7 @@ public static void responseShowField(ResultSet resultSet,
okPacket.write(buffer);
mysqlConnection.channel.writeAndFlush(buffer);
} catch (SQLException e) {
responseError(packetId, mysqlConnection.channel, e);
responseError(packetId, mysqlConnection.channel, e, connCharSet);
}
}

Expand All @@ -94,7 +96,9 @@ public static void responseResultSet(ResultSet resultSet,
// 5. eof packet
boolean deprecateEof = (mysqlConnection.authPacket.extendClientFlags
& ExtendedClientCapabilities.CLIENT_DEPRECATE_EOF) != 0;
String connCharSet = null;
try {
connCharSet = mysqlConnection.getConnection().getClientInfo(CONNECTION_CHARSET);
ByteBuf buffer = ByteBufAllocator.DEFAULT.buffer();
ResultSetMetaData metaData = resultSet.getMetaData();
ColumnsNumberPacket columnsNumberPacket = new ColumnsNumberPacket();
Expand Down Expand Up @@ -127,7 +131,7 @@ public static void responseResultSet(ResultSet resultSet,

mysqlConnection.channel.writeAndFlush(buffer);
} catch (SQLException e) {
responseError(packetId, mysqlConnection.channel, e);
responseError(packetId, mysqlConnection.channel, e, connCharSet);
}
}

Expand Down Expand Up @@ -204,33 +208,40 @@ private static void handlerPrepareRowPacket(ResultSet resultSet,

public static void responseError(AtomicLong packetId,
SocketChannel channel,
io.dingodb.common.mysql.constant.ErrorCode errorCode) {
responseError(packetId, channel, errorCode, errorCode.message);
io.dingodb.common.mysql.constant.ErrorCode errorCode,
String characterSet) {
responseError(packetId, channel, errorCode, errorCode.message, characterSet);
}

public static void responseError(AtomicLong packetId,
SocketChannel channel,
io.dingodb.common.mysql.constant.ErrorCode errorCode, String message) {
io.dingodb.common.mysql.constant.ErrorCode errorCode,
String message,
String characterSet) {
ERRPacket ep = new ERRPacket();
ep.packetId = (byte) packetId.getAndIncrement();
ep.capabilities = MysqlServer.getServerCapabilities();
ep.errorCode = errorCode.code;
ep.sqlState = errorCode.sqlState;
ep.errorMessage = message;
ep.characterSet = characterSet;
ByteBuf buffer = ByteBufAllocator.DEFAULT.buffer();
ep.write(buffer);
channel.writeAndFlush(buffer);
}

public static void responseError(AtomicLong packetId,
SocketChannel channel,
SQLException exception) {
SQLException exception,
String characterSet) {
exception = errorDingo2Mysql(exception);
ERRPacket ep = new ERRPacket();
ep.packetId = (byte) packetId.getAndIncrement();
ep.capabilities = MysqlServer.getServerCapabilities();
ep.errorCode = exception.getErrorCode();
ep.sqlState = exception.getSQLState();
ep.characterSet = characterSet;

if (exception.getMessage() == null) {
ep.errorMessage = "";
} else if (exception.getMessage().startsWith("Encountered")) {
Expand All @@ -243,23 +254,6 @@ public static void responseError(AtomicLong packetId,
channel.writeAndFlush(buffer);
}

public static String encodeResponseError(String src) {
if(src == null) {
return null;
}

StringBuilder builder = new StringBuilder();
for(char c : src.toCharArray()) {
if(c > 0x7f) {
//deal with chinese character.
builder.append("\\u").append(String.format("%04x", (int)c));
} else {
builder.append(c);
}
}
return builder.toString();
}

public static SQLException errorDingo2Mysql(SQLException e) {
if (e.getMessage() != null && e.getMessage().contains("Duplicate data")) {
return new SQLException("Duplicate data for key 'PRIMARY'", "23000", 1062);
Expand All @@ -268,11 +262,11 @@ public static SQLException errorDingo2Mysql(SQLException e) {
return new SQLException("Query execution was interrupted", "70100", 1317);
} else if (e.getMessage() != null && e.getMessage().contains("Statement canceled")) {
LogUtils.info(log, e.getMessage(), e);
return new SQLException(encodeResponseError(e.getMessage()), "HY000", 1105);
return new SQLException(e.getMessage(), "HY000", 1105);
} else if (e.getErrorCode() == 9001 && e.getSQLState().equals("45000")) {
int code = 1105;
String state = "HY000";
String reason = encodeResponseError(e.getMessage());
String reason = e.getMessage();
if (reason.contains("Duplicate entry")) {
code = 1062;
state = "23000";
Expand All @@ -282,15 +276,15 @@ public static SQLException errorDingo2Mysql(SQLException e) {
}
return new SQLException(reason, state, code);
} else if (e.getErrorCode() == 1054 && e.getSQLState().equals("42S22")) {
return new SQLException(encodeResponseError(e.getMessage()), "HY000", 1105);
return new SQLException(e.getMessage(), "HY000", 1105);
} else if (e.getErrorCode() == 5001 && e.getSQLState().equals("45000")) {
if (e.getMessage().contains("Syntax Error")) {
return new SQLException(
encodeResponseError(e.getMessage()),
e.getMessage(),
e.getSQLState(),
e.getErrorCode());
} else if (e.getMessage().contains("io.dingodb.store.api.transaction.exception.DuplicateEntryException:")) {
return new SQLException(encodeResponseError(e.getMessage())
return new SQLException(e.getMessage()
.replace("io.dingodb.store.api.transaction.exception.DuplicateEntryException:", ""),
"23000", 1062);
}
Expand Down Expand Up @@ -328,7 +322,9 @@ public static void responsePrepareExecute(ResultSet resultSet,
// 5. eof packet
boolean deprecateEof = (mysqlConnection.authPacket.extendClientFlags
& ExtendedClientCapabilities.CLIENT_DEPRECATE_EOF) != 0;
String connCharSet = null;
try {
connCharSet = mysqlConnection.getConnection().getClientInfo(CONNECTION_CHARSET);
ByteBuf buffer = ByteBufAllocator.DEFAULT.buffer();
ResultSetMetaData metaData = resultSet.getMetaData();
ColumnsNumberPacket columnsNumberPacket = new ColumnsNumberPacket();
Expand Down Expand Up @@ -361,7 +357,7 @@ public static void responsePrepareExecute(ResultSet resultSet,

mysqlConnection.channel.writeAndFlush(buffer);
} catch (SQLException e) {
responseError(packetId, mysqlConnection.channel, e);
responseError(packetId, mysqlConnection.channel, e, connCharSet);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -160,7 +160,7 @@ protected void channelRead0(ChannelHandlerContext ctx, ByteBuf msg) {
String error =
String.format(ErrorCode.ER_ACCESS_DENIED_ERROR.message, user, ip, "YES");
MysqlResponseHandler.responseError(packetId,
mysqlConnection.channel, ErrorCode.ER_ACCESS_DENIED_ERROR, error);
mysqlConnection.channel, ErrorCode.ER_ACCESS_DENIED_ERROR, error, null);
if (mysqlConnection.channel.isActive()) {
mysqlConnection.channel.close();
}
Expand Down Expand Up @@ -217,7 +217,7 @@ protected void channelRead0(ChannelHandlerContext ctx, ByteBuf msg) {
String error =
String.format(ErrorCode.ER_ACCESS_DENIED_ERROR.message, user, ip, "YES");
MysqlResponseHandler.responseError(packetId,
mysqlConnection.channel, ErrorCode.ER_ACCESS_DENIED_ERROR, error);
mysqlConnection.channel, ErrorCode.ER_ACCESS_DENIED_ERROR, error, null);
if (mysqlConnection.channel.isActive()) {
mysqlConnection.channel.close();
}
Expand Down
Loading

0 comments on commit be7db3a

Please sign in to comment.