[bugfix][hive-reader] HiveBaseResultSet#getTimestamp Method not supported #920

Merged · 1 commit · Sep 20, 2023 · Changes from all commits
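For context, Hive's JDBC driver implements the plain ResultSet#getTimestamp accessors but, as the PR comment notes, HiveBaseResultSet throws SQLException("Method not supported") for the Calendar overloads, which is what broke the generic reader path on TIMESTAMP columns. A minimal repro sketch, assuming the Hive JDBC driver is on the classpath and a HiveServer2 is reachable at the hypothetical URL below (class name, URL, and query are illustrative only):

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.ResultSet;
import java.sql.Statement;
import java.sql.Timestamp;
import java.util.Calendar;

public class HiveTimestampRepro {
    public static void main(String[] args) throws Exception {
        // Hypothetical HiveServer2 endpoint and query; adjust for your cluster
        String url = "jdbc:hive2://localhost:10000/default";
        try (Connection conn = DriverManager.getConnection(url);
                Statement stmt = conn.createStatement();
                ResultSet rs = stmt.executeQuery("SELECT current_timestamp() AS ts")) {
            while (rs.next()) {
                // Index-based accessor: implemented by Hive JDBC; this is what the fix uses
                Timestamp ok = rs.getTimestamp(1);
                System.out.println(ok);
                // Calendar overload: per the PR, HiveBaseResultSet throws
                // SQLException("Method not supported") here
                Timestamp boom = rs.getTimestamp("ts", Calendar.getInstance());
            }
        }
    }
}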
HiveReader.java (as merged):

// ... (preceding lines elided in the diff view)

package com.wgzhao.addax.plugin.reader.hivereader;

import com.wgzhao.addax.common.element.Column;
import com.wgzhao.addax.common.element.DoubleColumn;
import com.wgzhao.addax.common.element.TimestampColumn;
import com.wgzhao.addax.common.exception.AddaxException;
import com.wgzhao.addax.common.plugin.RecordSender;
import com.wgzhao.addax.common.spi.Reader;
// ... (additional imports elided in the diff view)
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.UnsupportedEncodingException;
import java.sql.ResultSet;
import java.sql.ResultSetMetaData;
import java.sql.SQLException;
import java.sql.Types;
import java.util.List;

import static com.wgzhao.addax.common.base.Constant.DEFAULT_FETCH_SIZE;
// ... (additional static imports elided in the diff view)
import static com.wgzhao.addax.common.base.Key.KERBEROS_PRINCIPAL;

public class HiveReader
        extends Reader {

    private static final DataBaseType DATABASE_TYPE = DataBaseType.Hive;

    public static class Job
            extends Reader.Job {
        private static final Logger LOG = LoggerFactory.getLogger(Job.class);

        private Configuration originalConfig = null;
        private CommonRdbmsReader.Job commonRdbmsReaderJob;

        @Override
        public void init() {
            this.originalConfig = getPluginJobConf();

            boolean haveKerberos = originalConfig.getBool(HAVE_KERBEROS, false);
            // ... (remainder of init elided in the diff view)
        }

        @Override
        public void preCheck() {
            this.commonRdbmsReaderJob.preCheck(originalConfig, DATABASE_TYPE);
        }

        @Override
        public List<Configuration> split(int adviceNumber) {
            return this.commonRdbmsReaderJob.split(originalConfig, adviceNumber);
        }

        @Override
        public void post() {
            this.commonRdbmsReaderJob.post(originalConfig);
        }

        @Override
        public void destroy() {
            this.commonRdbmsReaderJob.destroy(originalConfig);
        }

        private void kerberosAuthentication(String kerberosPrincipal, String kerberosKeytabFilePath, org.apache.hadoop.conf.Configuration hadoopConf) {
            if (StringUtils.isNotBlank(kerberosPrincipal) && StringUtils.isNotBlank(kerberosKeytabFilePath)) {
                UserGroupInformation.setConfiguration(hadoopConf);
                try {
                    UserGroupInformation.loginUserFromKeytab(kerberosPrincipal, kerberosKeytabFilePath);
                } catch (Exception e) {
                    String message = String.format("Auth failure with kerberos, Please check " +
                            "kerberosKeytabFilePath[%s] and kerberosPrincipal[%s]",
                            kerberosKeytabFilePath, kerberosPrincipal);
                    // ... (error handling elided in the diff view)
                }
            }
        }
    }

    public static class Task
            extends Reader.Task {

        private Configuration readerSliceConfig;
        private CommonRdbmsReader.Task commonRdbmsReaderTask;

        @Override
        public void init() {
            this.readerSliceConfig = getPluginJobConf();
            this.commonRdbmsReaderTask = new CommonRdbmsReader.Task(DATABASE_TYPE, getTaskGroupId(), getTaskId()) {

                @Override
                protected Column createColumn(ResultSet rs, ResultSetMetaData metaData, int i)
                        throws SQLException, UnsupportedEncodingException {
                    if (metaData.getColumnType(i) == Types.TIMESTAMP) {
                        // Hive's HiveBaseResultSet does not support getTimestamp(String columnName, Calendar cal),
                        // so read the timestamp by column index instead
                        return new TimestampColumn(rs.getTimestamp(i));
                    }
                    return super.createColumn(rs, metaData, i);
                }
            };

            this.commonRdbmsReaderTask.init(this.readerSliceConfig);
        }

        @Override
        public void startRead(RecordSender recordSender) {
            int fetchSize = this.readerSliceConfig.getInt(FETCH_SIZE, DEFAULT_FETCH_SIZE);

            this.commonRdbmsReaderTask.startRead(readerSliceConfig, recordSender, getTaskPluginCollector(), fetchSize);
        }

        @Override
        public void post() {
            this.commonRdbmsReaderTask.post(readerSliceConfig);
        }

        @Override
        public void destroy() {
            this.commonRdbmsReaderTask.destroy(readerSliceConfig);
        }
    }
}
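Design note on the fix: rather than replacing CommonRdbmsReader.Task wholesale, the reader subclasses it anonymously and overrides createColumn for Types.TIMESTAMP only, fetching the value with the index-based rs.getTimestamp(i) (an accessor the Hive driver does implement) and wrapping it in a TimestampColumn; every other column type still flows through super.createColumn.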