Skip to content

Commit

Permalink
fix: handle blob data from mysql to postgres
Browse files Browse the repository at this point in the history
  • Loading branch information
osalvador committed Jul 4, 2023
1 parent 463129f commit 92d77a3
Show file tree
Hide file tree
Showing 4 changed files with 26 additions and 5 deletions.
2 changes: 1 addition & 1 deletion pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@
<maven.compiler.source>1.8</maven.compiler.source>
<maven.compiler.target>1.8</maven.compiler.target>
<version.debezium>1.5.2.Final</version.debezium>
<version.testContainers>1.17.6</version.testContainers>
<version.testContainers>1.18.3</version.testContainers>
</properties>


Expand Down
25 changes: 23 additions & 2 deletions src/main/java/org/replicadb/manager/PostgresqlManager.java
Original file line number Diff line number Diff line change
Expand Up @@ -14,9 +14,14 @@
import java.io.*;
import java.nio.charset.StandardCharsets;
import java.sql.*;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;

import static org.replicadb.manager.SupportedManagers.MARIADB;
import static org.replicadb.manager.SupportedManagers.MYSQL;

public class PostgresqlManager extends SqlManager {

private static final Logger LOG = LogManager.getLogger(PostgresqlManager.class.getName());
Expand Down Expand Up @@ -76,6 +81,10 @@ public int insertDataToTable(ResultSet resultSet, int taskId) throws SQLExceptio
byte[] bytes;
String colValue = null;

// determine if the source database is MySQL or MariaDB
boolean isMySQL = MYSQL.isTheManagerTypeOf(options, DataSourceType.SOURCE) || MARIADB.isTheManagerTypeOf(options, DataSourceType.SOURCE);


if (resultSet.next()) {
// Create Bandwidth Throttling
BandwidthThrottling bt = new BandwidthThrottling(options.getBandwidthThrottling(), options.getFetchSize(), resultSet);
Expand All @@ -95,11 +104,23 @@ public int insertDataToTable(ResultSet resultSet, int taskId) throws SQLExceptio
case Types.BINARY:
colValue = bytesToPostgresHex(resultSet.getBytes(i));
break;
case Types.BLOB:
case Types.BLOB:
colValue = blobToPostgresHex(getBlob(resultSet,i));
break;
default:
colValue = resultSet.getString(i);
if (isMySQL) {
// MySQL and MariaDB have a different way to handle Binary type
List<Integer> binaryTypes = Arrays.asList(-3,-4);
if (binaryTypes.contains(rsmd.getColumnType(i))) {
colValue = blobToPostgresHex(getBlob(resultSet,i));
} else {
// Any other type is converted to String
colValue = resultSet.getString(i);
}
} else {
// Any other type is converted to String
colValue = resultSet.getString(i);
}
break;
}

Expand Down
2 changes: 1 addition & 1 deletion src/test/java/org/replicadb/file/Csv2SqlserverTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,7 @@ void testSqlserverVersion2017 () throws SQLException {
}

@Test
void testCsv2PostgresComplete () throws ParseException, IOException, SQLException {
void testCsv2SqlserverComplete () throws ParseException, IOException, SQLException {
String[] args = {
"--options-file", RESOURCE_DIR + REPLICADB_CONF_FILE,
"--source-connect", "file://" + RESOURCE_DIR + CSV_SOURCE_FILE,
Expand Down
2 changes: 1 addition & 1 deletion src/test/resources/mysql/mysql-source.sql
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ create table t_source (
/*Binary Strings:*/
C_BINARY BINARY(35),
C_BINARY_VAR VARBINARY(255),
C_BINARY_LOB BLOB,
C_BINARY_LOB LONGBLOB,
/*Boolean:*/
C_BOOLEAN BOOLEAN,
/*Character Strings:*/
Expand Down

0 comments on commit 92d77a3

Please sign in to comment.