Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Refactoring deprecated accumulo references #2484

Open
wants to merge 11 commits into
base: integration
Choose a base branch
from
Original file line number Diff line number Diff line change
Expand Up @@ -269,8 +269,8 @@ protected void init(FilterOptions options, final long scanStart, IteratorEnviron

isIndextable = false;
if (options.getOption(AgeOffConfigParams.IS_INDEX_TABLE) == null) {
if (iterEnv != null && iterEnv.getConfig() != null) {
isIndextable = Boolean.parseBoolean(iterEnv.getConfig().get("table.custom." + AgeOffConfigParams.IS_INDEX_TABLE));
if (iterEnv != null && iterEnv.getPluginEnv().getConfiguration() != null) {
isIndextable = Boolean.parseBoolean(iterEnv.getPluginEnv().getConfiguration().get("table.custom." + AgeOffConfigParams.IS_INDEX_TABLE));
}
} else { // legacy
isIndextable = Boolean.valueOf(options.getOption(AgeOffConfigParams.IS_INDEX_TABLE));
Expand Down Expand Up @@ -298,7 +298,7 @@ protected void init(FilterOptions options, final long scanStart, IteratorEnviron
final String dataTypeHasScanTime = options.getOption(dataType + ".hasScanTime");
if (Boolean.parseBoolean(dataTypeHasScanTime)) {
if (iterEnv != null) {
final String scanTime = iterEnv.getConfig().get("table.custom.timestamp.current." + dataType);
final String scanTime = iterEnv.getPluginEnv().getConfiguration().get("table.custom.timestamp.current." + dataType);
try {
dataTypeScanTimes.put(dataType, Long.parseLong(scanTime, 10));
} catch (final NumberFormatException e) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
import org.apache.accumulo.core.data.ArrayByteSequence;
import org.apache.accumulo.core.data.ByteSequence;
import org.apache.accumulo.core.data.Key;
import org.apache.accumulo.core.data.TableId;
import org.apache.accumulo.core.data.Value;
import org.apache.accumulo.core.iterators.IteratorEnvironment;
import org.apache.hadoop.io.Text;
Expand Down Expand Up @@ -266,8 +267,8 @@ protected void init(FilterOptions options, final long startScan, IteratorEnviron

isIndextable = false;
if (options.getOption(AgeOffConfigParams.IS_INDEX_TABLE) == null) {
if (iterEnv != null && iterEnv.getConfig() != null) {
isIndextable = Boolean.parseBoolean(iterEnv.getConfig().get("table.custom." + AgeOffConfigParams.IS_INDEX_TABLE));
if (iterEnv != null && iterEnv.getPluginEnv().getConfiguration() != null) {
isIndextable = Boolean.parseBoolean(iterEnv.getPluginEnv().getConfiguration().get("table.custom." + AgeOffConfigParams.IS_INDEX_TABLE));
}
} else { // legacy
isIndextable = Boolean.parseBoolean(options.getOption(AgeOffConfigParams.IS_INDEX_TABLE));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -65,11 +65,6 @@ public boolean isUserCompaction() {
throw new UnsupportedOperationException();
}

@Override
public void registerSideChannel(SortedKeyValueIterator<Key,Value> sortedKeyValueIterator) {
throw new UnsupportedOperationException();
}

@Override
public Authorizations getAuthorizations() {
throw new UnsupportedOperationException();
Expand All @@ -96,7 +91,11 @@ public PluginEnvironment getPluginEnv() {

@Override
public Configuration getConfiguration() {
return null;
if (conf != null) {
return new ConfigurationImpl(conf);
} else {
return null;
}
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,29 +5,16 @@
import org.apache.accumulo.core.client.PluginEnvironment;
import org.apache.accumulo.core.client.SampleNotPresentException;
import org.apache.accumulo.core.client.sample.SamplerConfiguration;
import org.apache.accumulo.core.conf.AccumuloConfiguration;
import org.apache.accumulo.core.conf.DefaultConfiguration;
import org.apache.accumulo.core.data.Key;
import org.apache.accumulo.core.data.Value;
import org.apache.accumulo.core.iterators.IteratorEnvironment;
import org.apache.accumulo.core.iterators.IteratorUtil.IteratorScope;
import org.apache.accumulo.core.iterators.SortedKeyValueIterator;
import org.apache.accumulo.core.security.Authorizations;
import org.apache.accumulo.core.spi.common.ServiceEnvironment;

public class BulkIteratorEnvironment implements IteratorEnvironment {

private IteratorScope scope;
private AccumuloConfiguration conf;

public BulkIteratorEnvironment(IteratorScope scope) {
this.scope = scope;
this.conf = DefaultConfiguration.getInstance();
}

@Override
public AccumuloConfiguration getConfig() {
return conf;
}

@Override
Expand All @@ -45,11 +32,6 @@ public boolean isUserCompaction() {
throw new UnsupportedOperationException();
}

@Override
public void registerSideChannel(SortedKeyValueIterator<Key,Value> iter) {
throw new UnsupportedOperationException();
}

@Override
public Authorizations getAuthorizations() {
throw new UnsupportedOperationException();
Expand All @@ -70,17 +52,6 @@ public SamplerConfiguration getSamplerConfiguration() {
return null;
}

@Override
public SortedKeyValueIterator<Key,Value> reserveMapFileReader(String mapFileName) throws IOException {
// TODO Auto-generated method stub
return null;
}

@Override
public ServiceEnvironment getServiceEnv() {
return null;
}

@Override
public PluginEnvironment getPluginEnv() {
return null;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -123,11 +123,6 @@ public boolean isUserCompaction() {
return isUser;
}

@Override
public void registerSideChannel(SortedKeyValueIterator<Key,Value> iter) {
throw new UnsupportedOperationException();
}

@Override
public Authorizations getAuthorizations() {
throw new UnsupportedOperationException();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,8 +11,6 @@
import java.util.concurrent.TimeUnit;

import org.apache.accumulo.core.client.PluginEnvironment;
import org.apache.accumulo.core.conf.AccumuloConfiguration;
import org.apache.accumulo.core.conf.DefaultConfiguration;
import org.apache.accumulo.core.iterators.IteratorEnvironment;
import org.apache.log4j.Logger;

Expand Down Expand Up @@ -42,14 +40,11 @@ private IteratorThreadPoolManager(IteratorEnvironment env) {
}

private ThreadPoolExecutor createExecutorService(final String prop, final String name, final IteratorEnvironment env) {
final AccumuloConfiguration accumuloConfiguration;
final PluginEnvironment pluginEnv;
if (env != null) {
pluginEnv = env.getPluginEnv();
accumuloConfiguration = env.getConfig();
} else {
pluginEnv = null;
accumuloConfiguration = DefaultConfiguration.getInstance();
}
final ThreadPoolExecutor service = createExecutorService(getMaxThreads(prop, pluginEnv), name + " (" + instanceId + ')');
threadPools.put(name, service);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -175,11 +175,6 @@ public RFileEnvironment() {
this.conf = DefaultConfiguration.getInstance();
}

@Override
public AccumuloConfiguration getConfig() {
return conf;
}

@Override
public IteratorScope getIteratorScope() {
return IteratorScope.scan;
Expand Down Expand Up @@ -215,11 +210,6 @@ public SamplerConfiguration getSamplerConfiguration() {
return null;
}

@Override
public ServiceEnvironment getServiceEnv() {
return null;
}

@Override
public PluginEnvironment getPluginEnv() {
return null;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -266,7 +266,7 @@ public void init(SortedKeyValueIterator<Key,Value> source, Map<String,String> op

// update ActiveQueryLog with (potentially) updated config
if (env != null) {
ActiveQueryLog.setConfig(env.getConfig());
ActiveQueryLog.setConfig(env.getPluginEnv().getConfiguration());
}

DatawaveFieldIndexListIteratorJexl.FSTManager.setHdfsFileSystem(this.getFileSystemCache());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -227,7 +227,7 @@ public void close(TaskAttemptContext attempt) throws IOException, InterruptedExc
if (e.getSecurityErrorCodes().size() >= 0) {
HashSet<String> tables = new HashSet<>();
for (TabletId tabletId : e.getSecurityErrorCodes().keySet()) {
tables.add(tabletId.getTableId().toString());
tables.add(tabletId.getTable().toString());
}

log.error("Not authorized to write to tables : " + tables);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.locks.ReentrantReadWriteLock;

import org.apache.accumulo.core.conf.AccumuloConfiguration;
import org.apache.accumulo.core.client.PluginEnvironment;
import org.apache.accumulo.core.data.Range;
import org.apache.commons.lang.StringUtils;
import org.apache.log4j.Logger;
Expand All @@ -25,7 +25,7 @@ public class ActiveQueryLog {
private static final Logger log = Logger.getLogger(ActiveQueryLog.class);
private static Cache<String,ActiveQueryLog> logCache = null;
private static final Object logCacheLock = new Object();
private static AccumuloConfiguration conf = null;
private static PluginEnvironment.Configuration conf = null;

// Accumulo properties
public static final String MAX_IDLE = "datawave.query.active.maxIdleMs";
Expand All @@ -50,9 +50,9 @@ public class ActiveQueryLog {

private final String name;

synchronized public static void setConfig(final AccumuloConfiguration conf) {
synchronized public static void setConfig(final PluginEnvironment.Configuration conf) {
if (conf != null) {
if (ActiveQueryLog.conf == null || conf.getUpdateCount() > ActiveQueryLog.conf.getUpdateCount()) {
if (ActiveQueryLog.conf == null) {
ActiveQueryLog.conf = conf;
}
// Do not allow access to the cache while updating each log's settings.
Expand Down Expand Up @@ -126,7 +126,7 @@ private ActiveQueryLog() {
this(null, null);
}

private ActiveQueryLog(AccumuloConfiguration conf, String name) {
private ActiveQueryLog(PluginEnvironment.Configuration conf, String name) {
if (conf != null) {
checkSettings(conf, true);
} else {
Expand Down Expand Up @@ -217,7 +217,7 @@ public void setMaxIdle(long maxIdle) {
}
}

private void checkSettings(AccumuloConfiguration conf, boolean useDefaults) {
private void checkSettings(PluginEnvironment.Configuration conf, boolean useDefaults) {

String maxIdleStr = conf.get(MAX_IDLE);
if (maxIdleStr != null) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -142,7 +142,7 @@ protected void build(K key, String myTableName) throws TableNotFoundException {
public boolean equals(Object obj) {
if (obj instanceof AccumuloLoader) {
AccumuloLoader loaderObj = AccumuloLoader.class.cast(obj);
if (client.instanceOperations().getInstanceID().equals(loaderObj.client.instanceOperations().getInstanceID())) {
if (client.instanceOperations().getInstanceId().equals(loaderObj.client.instanceOperations().getInstanceId())) {
if (tableName.equals(loaderObj.tableName))
return true;
}
Expand All @@ -152,7 +152,7 @@ public boolean equals(Object obj) {

@Override
public int hashCode() {
return tableName.hashCode() + 31 + client.instanceOperations().getInstanceID().hashCode();
return tableName.hashCode() + 31 + client.instanceOperations().getInstanceId().hashCode();
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -17,21 +17,21 @@ public LoaderKey(AccumuloClient client, String table, String user) {
}

public int hashCode() {
return client.instanceOperations().getInstanceID().hashCode() + table.hashCode() + user.hashCode();
return client.instanceOperations().getInstanceId().hashCode() + table.hashCode() + user.hashCode();
}

public boolean equals(Object other) {
if (other instanceof LoaderKey) {
LoaderKey otherLoader = (LoaderKey) other;
return otherLoader.client.instanceOperations().getInstanceID().equals(client.instanceOperations().getInstanceID())
return otherLoader.client.instanceOperations().getInstanceId().equals(client.instanceOperations().getInstanceId())
&& otherLoader.table.equals(table) && otherLoader.user.equals(user);
}
return false;
}

@Override
public String toString() {
return client.instanceOperations().getInstanceID() + "/" + "/" + table + "/" + user;
return client.instanceOperations().getInstanceId() + "/" + "/" + table + "/" + user;
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -19,15 +19,13 @@
import org.apache.accumulo.core.client.IteratorSetting;
import org.apache.accumulo.core.client.MutationsRejectedException;
import org.apache.accumulo.core.client.Scanner;
import org.apache.accumulo.core.client.security.tokens.PasswordToken;
import org.apache.accumulo.core.data.Key;
import org.apache.accumulo.core.data.Mutation;
import org.apache.accumulo.core.data.Range;
import org.apache.accumulo.core.data.Value;
import org.apache.accumulo.core.iteratorsImpl.system.SortedMapIterator;
import org.apache.accumulo.core.security.Authorizations;
import org.apache.accumulo.core.security.ColumnVisibility;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.ArrayWritable;
import org.apache.hadoop.io.DataInputBuffer;
import org.apache.hadoop.io.Writable;
Expand All @@ -39,9 +37,7 @@

import datawave.accumulo.inmemory.InMemoryAccumuloClient;
import datawave.accumulo.inmemory.InMemoryInstance;
import datawave.ingest.data.config.ingest.AccumuloHelper;
import datawave.ingest.protobuf.Uid;
import datawave.query.iterator.SourceManagerTest;

public class DiscoveryIteratorTest {

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
import java.util.Set;
import java.util.SortedMap;

import org.apache.accumulo.core.conf.AccumuloConfiguration;
import org.apache.accumulo.core.data.Key;
import org.apache.accumulo.core.data.Range;
import org.apache.accumulo.core.data.Value;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -363,26 +363,11 @@ public MockIteratorEnvironment() {
this.conf = DefaultConfiguration.getInstance();
}

@Override
public SortedKeyValueIterator<Key,Value> reserveMapFileReader(String mapFileName) throws IOException {
Configuration conf = new Configuration();
FileSystem fs = FileSystem.get(conf);
CryptoService cs = CryptoFactoryLoader.getServiceForClient(CryptoEnvironment.Scope.TABLE,
DefaultConfiguration.getInstance().getAllCryptoProperties());
return RFileOperations.getInstance().newReaderBuilder().forFile(mapFileName, fs, conf, cs)
.withTableConfiguration(DefaultConfiguration.getInstance()).seekToBeginning().build();
}

@Override
public PluginEnvironment getPluginEnv() {
return new MockPluginEnvironment();
}

@Override
public AccumuloConfiguration getConfig() {
return conf;
}

@Override
public IteratorScope getIteratorScope() {
return IteratorScope.scan;
Expand Down Expand Up @@ -418,11 +403,6 @@ public SamplerConfiguration getSamplerConfiguration() {
return null;
}

@Override
public void registerSideChannel(SortedKeyValueIterator<Key,Value> iter) {
throw new UnsupportedOperationException();
}

public class MockPluginEnvironment implements PluginEnvironment {
@Override
public Configuration getConfiguration() {
Expand Down
Loading