Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[bugfix][plugin][rdbms] 1.DataBaseType should not be call set method,so that the driver is error when rdbmsreader to rdbmswriter 2.hive jdbc don't support method(Timestamp getTimestamp(int columnIndex, Calendar cal)) 3.DataBaseType add vertica driver #1188

Open
wants to merge 2 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -166,7 +166,7 @@ public void init(Configuration readerSliceConfig)
}

public void startRead(Configuration readerSliceConfig, RecordSender recordSender,
TaskPluginCollector taskPluginCollector, int fetchSize)
TaskPluginCollector taskPluginCollector, int fetchSize)
{
String querySql = readerSliceConfig.getString(Key.QUERY_SQL);

Expand Down Expand Up @@ -223,7 +223,7 @@ public void destroy(Configuration originalConfig)
}

protected void transportOneRecord(RecordSender recordSender, ResultSet rs, ResultSetMetaData metaData,
int columnNumber, TaskPluginCollector taskPluginCollector)
int columnNumber, TaskPluginCollector taskPluginCollector)
{
Record record = buildRecord(recordSender, rs, metaData, columnNumber, taskPluginCollector);
recordSender.sendToWriter(record);
Expand Down Expand Up @@ -272,7 +272,12 @@ protected Column createColumn(ResultSet rs, ResultSetMetaData metaData, int i)
return new DateColumn(rs.getDate(i));

case Types.TIMESTAMP:
return new TimestampColumn(rs.getTimestamp(i, Calendar.getInstance()));
if(!"org.apache.hive.jdbc.HiveDriver".equals(this.dataBaseType.getDriverClassName())){
Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In the hivereader plugin, you can achieve specific handling for reading the Timestamp type by overriding the createColumn method. You can refer to the implementation in the following link for guidance:https://github.com/wgzhao/Addax/blob/master/plugin/reader/postgresqlreader/src/main/java/com/wgzhao/addax/plugin/reader/postgresqlreader/PostgresqlReader.java

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Because it needs to be encapsulated into a unified script, hivereader will not be used, but rdbmsreader will be used instead

return new TimestampColumn(rs.getTimestamp(i, Calendar.getInstance()));
}else{
//hive not support method(Timestamp getTimestamp(int columnIndex, Calendar cal))
return new TimestampColumn(rs.getTimestamp(i));
}

case Types.BINARY:
case Types.VARBINARY:
Expand Down Expand Up @@ -311,7 +316,7 @@ protected Column createColumn(ResultSet rs, ResultSetMetaData metaData, int i)
}

protected Record buildRecord(RecordSender recordSender, ResultSet rs, ResultSetMetaData metaData, int columnNumber,
TaskPluginCollector taskPluginCollector)
TaskPluginCollector taskPluginCollector)
{
Record record = recordSender.createRecord();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@
import org.slf4j.LoggerFactory;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

import static com.wgzhao.addax.common.spi.ErrorCode.CONFIG_ERROR;
Expand Down Expand Up @@ -118,7 +119,11 @@ private static void dealJdbcAndTable(Configuration originalConfig)
String driverClass = connConf.getString(Key.JDBC_DRIVER, null);
if (driverClass != null && !driverClass.isEmpty()) {
LOG.warn("use specified driver class: {}", driverClass);
dataBaseType.setDriverClassName(driverClass);


Arrays.stream(DataBaseType.values()).filter(
d -> d.getDriverClassName().equals(driverClass)).findFirst().ifPresent(d ->
dataBaseType=d);
}
connConf.getNecessaryValue(Key.JDBC_URL, REQUIRED_VALUE);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,15 @@ public enum DataBaseType
Sybase("sybase", "com.sybase.jdbc4.jdbc.SybDriver"),
Databend("databend", "com.databend.jdbc.DatabendDriver"),
Access("access","net.ucanaccess.jdbc.UcanaccessDriver"),
Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We cannot list all the database JDBC driver classes in this class. For drivers not specified in this class, you can resolve this by specifying the driver configuration item in the job configuration file. For more details, please refer to https://wgzhao.github.io/Addax/latest/reader/rdbmsreader/

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Because the setDriverClass method of the DataBaseType enum class can cause errors from rdbmsreader to rdbmswriter, I have to add drivers for as many database types as possible.

HANA("hana", "com.sap.db.jdbc.Driver");
HANA("hana", "com.sap.db.jdbc.Driver"),
VERTICA("vertica", "com.vertica.jdbc.Driver"),
DM("dm","dm.jdbc.driver.DmDriver"),
OSCAR("oscar","com.oscar.Driver"),
KINGBASE8("kingbase8","com.kingbase8.Driver"),
HIGHGO("highgo","com.highgo.jdbc.Driver"),
OCEANBASE("oceanbase","com.alipay.oceanbase.jdbc.Driver"),
GOLDENDB("goldendb","com.goldendb.jdbc.Driver"),
GBASEDBT("gbasedbt-sqli","com.gbasedbt.jdbc.Driver");

private static final Pattern jdbcUrlPattern = Pattern.compile("jdbc:\\w+:(?:thin:url=|//|thin:@|)([\\w\\d.,]+).*");

Expand Down Expand Up @@ -162,8 +170,4 @@ public String getTypeName()
return typeName;
}

public void setDriverClassName(String driverClassName)
{
this.driverClassName = driverClassName;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@

import java.sql.Connection;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

import static com.wgzhao.addax.common.spi.ErrorCode.CONFIG_ERROR;
Expand Down Expand Up @@ -92,7 +93,9 @@ public static void simplifyConf(Configuration originalConfig)
String driverClass = connConf.getString(Key.JDBC_DRIVER, null);
if (driverClass != null && !driverClass.isEmpty()) {
LOG.warn("Use specified driver class [{}]", driverClass);
dataBaseType.setDriverClassName(driverClass);
Arrays.stream(DataBaseType.values()).filter(
d -> d.getDriverClassName().equals(driverClass)).findFirst().ifPresent(d ->
dataBaseType=d);
}
String jdbcUrl = connConf.getString(Key.JDBC_URL);
if (StringUtils.isBlank(jdbcUrl)) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@
public class RdbmsReader
extends Reader
{
private static final DataBaseType DATABASE_TYPE = DataBaseType.RDBMS;
private static DataBaseType DATABASE_TYPE = DataBaseType.RDBMS;

public static class Job
extends Reader.Job
Expand Down Expand Up @@ -71,11 +71,13 @@ public void init()
final String jdbcType = connection.getString(Key.JDBC_URL).split(":")[1];
Arrays.stream(DataBaseType.values()).filter(
dataBaseType -> dataBaseType.getTypeName().equals(jdbcType)).findFirst().ifPresent(dataBaseType ->
DATABASE_TYPE.setDriverClassName(dataBaseType.getDriverClassName()));
DATABASE_TYPE=dataBaseType);
}
else {
// use custom jdbc driver
DATABASE_TYPE.setDriverClassName(jdbcDriver);
Arrays.stream(DataBaseType.values()).filter(
dataBaseType -> dataBaseType.getDriverClassName().equals(jdbcDriver)).findFirst().ifPresent(dataBaseType ->
DATABASE_TYPE=dataBaseType);
}
this.commonRdbmsReaderMaster = new SubCommonRdbmsReader.Job(DATABASE_TYPE);
this.originalConfig = this.commonRdbmsReaderMaster.init(this.originalConfig);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
import com.wgzhao.addax.rdbms.writer.CommonRdbmsWriter;
import org.apache.commons.lang3.StringUtils;

import java.util.Arrays;
import java.util.List;

import static com.wgzhao.addax.common.base.Key.JDBC_DRIVER;
Expand All @@ -38,7 +39,7 @@
public class RdbmsWriter
extends Writer
{
private static final DataBaseType DATABASE_TYPE = DataBaseType.RDBMS;
private static DataBaseType DATABASE_TYPE = DataBaseType.RDBMS;

public static class Job
extends Writer.Job
Expand All @@ -63,7 +64,9 @@ public void init()
throw AddaxException.asAddaxException(REQUIRED_VALUE, "config 'driver' is required and must not be empty");
}
// use special jdbc driver class
DATABASE_TYPE.setDriverClassName(jdbcDriver);
Arrays.stream(DataBaseType.values()).filter(
d -> d.getDriverClassName().equals(jdbcDriver)).findFirst().ifPresent(d ->
DATABASE_TYPE=d);
this.commonRdbmsWriterJob = new CommonRdbmsWriter.Job(DATABASE_TYPE);
commonRdbmsWriterJob.init(originalConfig);
}
Expand Down