Skip to content
This repository has been archived by the owner on Dec 16, 2021. It is now read-only.

Commit

Permalink
Feature: Add option for destination database name prefix.
Browse files Browse the repository at this point in the history
  • Loading branch information
uzshao committed Nov 5, 2016
1 parent ac86065 commit 7a5af64
Show file tree
Hide file tree
Showing 7 changed files with 309 additions and 10 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -317,6 +317,7 @@ private void mergeConfiguration(Configuration inputConfig, Configuration merged)
ConfigurationKeys.DEST_CLUSTER_METASTORE_URL,
ConfigurationKeys.DEST_HDFS_ROOT,
ConfigurationKeys.DEST_HDFS_TMP,
ConfigurationKeys.DEST_CLUSTER_METASTORE_DB_PREFIX,
ConfigurationKeys.BATCH_JOB_METASTORE_BLACKLIST,
ConfigurationKeys.BATCH_JOB_CLUSTER_FACTORY_CLASS,
ConfigurationKeys.BATCH_JOB_OUTPUT_DIR,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -55,14 +55,19 @@ public Cluster getDestCluster() throws ConfigurationException {
ConfigurationKeys.DEST_HDFS_ROOT);
String destHdfsTmp = conf.get(
ConfigurationKeys.DEST_HDFS_TMP);
// Note that the DEST_CLUSTER_METASTORE_DB_PREFIX is a special config that only exists on the
// dest cluster (but not on the src cluster).
String destDbPrefix = conf.get(
ConfigurationKeys.DEST_CLUSTER_METASTORE_DB_PREFIX);
return new HardCodedCluster(
destClusterName,
destMetastoreUrl.getHost(),
destMetastoreUrl.getPort(),
null,
null,
new Path(destHdfsRoot),
new Path(destHdfsTmp));
new Path(destHdfsTmp),
destDbPrefix);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
package com.airbnb.reair.incremental.configuration;

import com.airbnb.reair.common.DbPrefixHiveMetastoreClient;
import com.airbnb.reair.common.HiveMetastoreClient;
import com.airbnb.reair.common.HiveMetastoreException;
import com.airbnb.reair.common.ThriftHiveMetastoreClient;

Expand All @@ -17,7 +19,8 @@ public class HardCodedCluster implements Cluster {
private String jobtrackerPort;
private Path hdfsRoot;
private Path tmpDir;
private ThreadLocal<ThriftHiveMetastoreClient> metastoreClient;
private String dbNamePrefix;
private ThreadLocal<HiveMetastoreClient> metastoreClient;

/**
* Constructor with specific values.
Expand All @@ -29,6 +32,7 @@ public class HardCodedCluster implements Cluster {
* @param jobtrackerPort port of the job tracker
* @param hdfsRoot the path for the root HDFS directory
* @param tmpDir the path for the temporary HDFS directory (should be under root)
* @param dbNamePrefix optional prefix for all databases in the Hive metastore.
*/
public HardCodedCluster(
String name,
Expand All @@ -37,15 +41,38 @@ public HardCodedCluster(
String jobtrackerHost,
String jobtrackerPort,
Path hdfsRoot,
Path tmpDir) {
Path tmpDir,
String dbNamePrefix) {
this.name = name;
this.metastoreHost = metastoreHost;
this.metastorePort = metastorePort;
this.jobtrackerHost = jobtrackerHost;
this.jobtrackerPort = jobtrackerPort;
this.hdfsRoot = hdfsRoot;
this.tmpDir = tmpDir;
this.metastoreClient = new ThreadLocal<ThriftHiveMetastoreClient>();
this.dbNamePrefix = dbNamePrefix;
this.metastoreClient = new ThreadLocal<>();
}

/**
* Constructor with specific values, with a default dbNamePrefix of null.
*/
public HardCodedCluster(
String name,
String metastoreHost,
int metastorePort,
String jobtrackerHost,
String jobtrackerPort,
Path hdfsRoot,
Path tmpDir) {
this(name,
metastoreHost,
metastorePort,
jobtrackerHost,
jobtrackerPort,
hdfsRoot,
tmpDir,
null);
}

public String getMetastoreHost() {
Expand All @@ -58,11 +85,17 @@ public int getMetastorePort() {

/**
* Get a cached ThreadLocal metastore client.
*
* <p>Also applies a DbPrefixHiveMetastoreClient wrapper if dbNamePrefix is not null or
* empty.
*/
public ThriftHiveMetastoreClient getMetastoreClient() throws HiveMetastoreException {
ThriftHiveMetastoreClient result = this.metastoreClient.get();
public HiveMetastoreClient getMetastoreClient() throws HiveMetastoreException {
HiveMetastoreClient result = this.metastoreClient.get();
if (result == null) {
result = new ThriftHiveMetastoreClient(getMetastoreHost(), getMetastorePort());
if (dbNamePrefix != null && !dbNamePrefix.isEmpty()) {
result = new DbPrefixHiveMetastoreClient(result, dbNamePrefix);
}
this.metastoreClient.set(result);
}
return result;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,9 @@ public class ConfigurationKeys {
public static final String DEST_HDFS_ROOT = "airbnb.reair.clusters.dest.hdfs.root";
// The root of the temporary directory for storing temporary files on the destination cluster
public static final String DEST_HDFS_TMP = "airbnb.reair.clusters.dest.hdfs.tmp";
// Prefix to add to any databases on the destination cluster. Mostly used for production testing.
public static final String DEST_CLUSTER_METASTORE_DB_PREFIX =
"airbnb.reair.clusters.dest.metastore.db.prefix";

// Class to use for filtering out entries from the audit log
public static final String OBJECT_FILTER_CLASS = "airbnb.reair.object.filter";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,8 @@

import com.airbnb.reair.common.ArgumentException;
import com.airbnb.reair.common.CliUtils;
import com.airbnb.reair.common.HiveMetastoreClient;
import com.airbnb.reair.common.HiveObjectSpec;
import com.airbnb.reair.common.ThriftHiveMetastoreClient;
import com.airbnb.reair.incremental.DirectoryCopier;
import com.airbnb.reair.incremental.ReplicationUtils;
import com.airbnb.reair.incremental.RunInfo;
Expand Down Expand Up @@ -160,7 +160,7 @@ public static int main(String[] args) throws Exception {

if ("copy-unpartitioned-table".equals(op)) {
LOG.info("Copying an unpartitioned table");
ThriftHiveMetastoreClient ms = srcCluster.getMetastoreClient();
HiveMetastoreClient ms = srcCluster.getMetastoreClient();
Table srcTable = ms.getTable(spec.getDbName(), spec.getTableName());
CopyUnpartitionedTableTask job = new CopyUnpartitionedTableTask(conf,
destinationObjectFactory, conflictHandler, srcCluster, destCluster, spec,
Expand All @@ -172,7 +172,7 @@ public static int main(String[] args) throws Exception {
}
} else if ("copy-partitioned-table".equals(op)) {
LOG.info("Copying a partitioned table");
ThriftHiveMetastoreClient ms = srcCluster.getMetastoreClient();
HiveMetastoreClient ms = srcCluster.getMetastoreClient();
Table srcTable = ms.getTable(spec.getDbName(), spec.getTableName());
CopyPartitionedTableTask job = new CopyPartitionedTableTask(conf, destinationObjectFactory,
conflictHandler, srcCluster, destCluster, spec, ReplicationUtils.getLocation(srcTable));
Expand All @@ -183,7 +183,7 @@ public static int main(String[] args) throws Exception {
}
} else if (op.equals("copy-partition")) {
LOG.info("Copying a partition");
ThriftHiveMetastoreClient ms = srcCluster.getMetastoreClient();
HiveMetastoreClient ms = srcCluster.getMetastoreClient();
Partition srcPartition =
ms.getPartition(spec.getDbName(), spec.getTableName(), spec.getPartitionName());
CopyPartitionTask job = new CopyPartitionTask(conf, destinationObjectFactory, conflictHandler,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,12 @@
</comment>
</property>

<property>
<name>airbnb.reair.clusters.dest.metastore.db.prefix</name>
<value></value>
<comment>Optional prefix to add to destination Hive database names.</comment>
</property>

<property>
<name>airbnb.reair.clusters.batch.output.dir</name>
<value>hdfs://airfs-test/user/test/test_output</value>
Expand Down
Loading

0 comments on commit 7a5af64

Please sign in to comment.