Skip to content

Commit

Permalink
[update][plugin][kudureader] Remove duplicate key definition
Browse files Browse the repository at this point in the history
  • Loading branch information
wgzhao committed Oct 17, 2024
1 parent 160d86e commit f825a49
Show file tree
Hide file tree
Showing 2 changed files with 26 additions and 32 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -19,18 +19,21 @@

package com.wgzhao.addax.plugin.reader.kudureader;

import com.google.common.collect.ImmutableMap;
import com.wgzhao.addax.common.base.Key;
import org.apache.kudu.client.KuduPredicate;

import java.util.Map;

/**
* Created by roy on 2019/12/12 1543.
*/
public final class KuduKey extends Key
public final class KuduKey
extends Key
{

public static final String KUDU_MASTER_ADDRESSES = "masterAddress";

public static final String KUDU_TABlE_NAME = "table";

public static final String LOWER_BOUND = "lowerBound";

public static final String UPPER_BOUND = "upperBound";
Expand All @@ -39,10 +42,15 @@ public final class KuduKey extends Key

public static final String SPLIT_UPPER_BOUND = "splitUpperBound";

public static final String SPLIT_KEY = "splitPk";

public static final String SOCKET_READ_TIMEOUT = "readTimeout";

public static final String SCAN_REQUEST_TIMEOUT = "scanTimeout";

public static final Map<String, KuduPredicate.ComparisonOp> KUDU_OPERATORS = ImmutableMap.of(
"=", KuduPredicate.ComparisonOp.EQUAL,
">", KuduPredicate.ComparisonOp.GREATER,
">=", KuduPredicate.ComparisonOp.GREATER_EQUAL,
"<", KuduPredicate.ComparisonOp.LESS,
"<=", KuduPredicate.ComparisonOp.LESS_EQUAL
);
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@

package com.wgzhao.addax.plugin.reader.kudureader;

import com.google.common.collect.ImmutableMap;
import com.wgzhao.addax.common.element.BoolColumn;
import com.wgzhao.addax.common.element.BytesColumn;
import com.wgzhao.addax.common.element.DateColumn;
Expand Down Expand Up @@ -48,7 +47,6 @@
import java.sql.Timestamp;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

Expand All @@ -57,6 +55,7 @@
import static com.wgzhao.addax.common.spi.ErrorCode.ILLEGAL_VALUE;
import static com.wgzhao.addax.common.spi.ErrorCode.NOT_SUPPORT_TYPE;
import static com.wgzhao.addax.common.spi.ErrorCode.RUNTIME_ERROR;
import static com.wgzhao.addax.plugin.reader.kudureader.KuduKey.KUDU_OPERATORS;

/**
* Kudu reader plugin
Expand All @@ -78,13 +77,15 @@ public static class Job
// match where clause such as age > 18
private static final String PATTERN_FOR_WHERE = "^(\\w+)\\s+(=|>|>=|<|<=)\\s+(.*)$";
private static final Pattern pattern = Pattern.compile(PATTERN_FOR_WHERE);
private static final Map<String, KuduPredicate.ComparisonOp> KUDU_OPERATORS = ImmutableMap.of(
"=", KuduPredicate.ComparisonOp.EQUAL,
">", KuduPredicate.ComparisonOp.GREATER,
">=", KuduPredicate.ComparisonOp.GREATER_EQUAL,
"<", KuduPredicate.ComparisonOp.LESS,
"<=", KuduPredicate.ComparisonOp.LESS_EQUAL
);

@Override
public void init()
{
originalConfig = super.getPluginJobConf();
splitKey = originalConfig.getString(KuduKey.SPLIT_PK);
lowerBound = originalConfig.getString(KuduKey.LOWER_BOUND, "min");
upperBound = originalConfig.getString(KuduKey.UPPER_BOUND, "max");
}

@Override
public List<Configuration> split(int adviceNumber)
Expand Down Expand Up @@ -160,15 +161,6 @@ public void prepare()
}
}

@Override
public void init()
{
originalConfig = super.getPluginJobConf();
splitKey = originalConfig.getString(KuduKey.SPLIT_KEY);
lowerBound = originalConfig.getString(KuduKey.LOWER_BOUND);
upperBound = originalConfig.getString(KuduKey.UPPER_BOUND);
}

@Override
public void destroy()
{
Expand All @@ -194,13 +186,7 @@ public static class Task
private List<String> columns;
private boolean specifyColumn = false;
List<KuduPredicate> customPredicate;
private static final Map<String, KuduPredicate.ComparisonOp> KUDU_OPERATORS = ImmutableMap.of(
"=", KuduPredicate.ComparisonOp.EQUAL,
">", KuduPredicate.ComparisonOp.GREATER,
">=", KuduPredicate.ComparisonOp.GREATER_EQUAL,
"<", KuduPredicate.ComparisonOp.LESS,
"<=", KuduPredicate.ComparisonOp.LESS_EQUAL
);


List<Configuration> where;

Expand Down Expand Up @@ -359,7 +345,7 @@ public void init()
{
Configuration readerSliceConfig = super.getPluginJobConf();
String masterAddresses = readerSliceConfig.getString(KuduKey.KUDU_MASTER_ADDRESSES);
tableName = readerSliceConfig.getString(KuduKey.KUDU_TABlE_NAME);
tableName = readerSliceConfig.getString(KuduKey.TABLE);
long socketReadTimeoutMs = readerSliceConfig.getLong(KuduKey.SOCKET_READ_TIMEOUT, 10) * 1000L;
scanRequestTimeout = readerSliceConfig.getLong(KuduKey.SCAN_REQUEST_TIMEOUT, 20L) * 1000L;
KuduClient.KuduClientBuilder kuduClientBuilder = (new KuduClient.KuduClientBuilder(masterAddresses));
Expand All @@ -368,7 +354,7 @@ public void init()
kuduClient = kuduClientBuilder.build();
lowerBound = readerSliceConfig.getString(KuduKey.SPLIT_LOWER_BOUND);
upperBound = readerSliceConfig.getString(KuduKey.SPLIT_UPPER_BOUND);
splitKey = readerSliceConfig.getString(KuduKey.SPLIT_KEY);
splitKey = readerSliceConfig.getString(KuduKey.SPLIT_PK);
columns = readerSliceConfig.getList(COLUMN, String.class);
if (!columns.isEmpty()) {
specifyColumn = columns.size() != 1 || (!"*".equals(columns.get(0)) && !"\"*\"".equals(columns.get(0)));
Expand Down

0 comments on commit f825a49

Please sign in to comment.