This repository has been archived by the owner on Dec 16, 2021. It is now read-only.

New Feature: Metastore Database Prefix #45

Open · wants to merge 1 commit into master

@@ -317,6 +317,7 @@ private void mergeConfiguration(Configuration inputConfig, Configuration merged)
         ConfigurationKeys.DEST_CLUSTER_METASTORE_URL,
         ConfigurationKeys.DEST_HDFS_ROOT,
         ConfigurationKeys.DEST_HDFS_TMP,
+        ConfigurationKeys.DEST_CLUSTER_METASTORE_DB_PREFIX,
         ConfigurationKeys.BATCH_JOB_METASTORE_BLACKLIST,
         ConfigurationKeys.BATCH_JOB_CLUSTER_FACTORY_CLASS,
         ConfigurationKeys.BATCH_JOB_OUTPUT_DIR,

@@ -55,14 +55,19 @@ public Cluster getDestCluster() throws ConfigurationException {
         ConfigurationKeys.DEST_HDFS_ROOT);
     String destHdfsTmp = conf.get(
         ConfigurationKeys.DEST_HDFS_TMP);
+    // Note that the DEST_CLUSTER_METASTORE_DB_PREFIX is a special config that only exists on the
+    // dest cluster (but not on the src cluster).
+    String destDbPrefix = conf.get(
+        ConfigurationKeys.DEST_CLUSTER_METASTORE_DB_PREFIX);
     return new HardCodedCluster(
         destClusterName,
         destMetastoreUrl.getHost(),
         destMetastoreUrl.getPort(),
         null,
         null,
         new Path(destHdfsRoot),
-        new Path(destHdfsTmp));
+        new Path(destHdfsTmp),
+        destDbPrefix);
   }

   @Override

@@ -1,5 +1,7 @@
 package com.airbnb.reair.incremental.configuration;

+import com.airbnb.reair.common.DbPrefixHiveMetastoreClient;
+import com.airbnb.reair.common.HiveMetastoreClient;
 import com.airbnb.reair.common.HiveMetastoreException;
 import com.airbnb.reair.common.ThriftHiveMetastoreClient;

@@ -17,7 +19,8 @@ public class HardCodedCluster implements Cluster {
   private String jobtrackerPort;
   private Path hdfsRoot;
   private Path tmpDir;
-  private ThreadLocal<ThriftHiveMetastoreClient> metastoreClient;
+  private String dbNamePrefix;
+  private ThreadLocal<HiveMetastoreClient> metastoreClient;

   /**
    * Constructor with specific values.
@@ -29,6 +32,7 @@ public class HardCodedCluster implements Cluster {
    * @param jobtrackerPort port of the job tracker
    * @param hdfsRoot the path for the root HDFS directory
    * @param tmpDir the path for the temporary HDFS directory (should be under root)
+   * @param dbNamePrefix optional prefix for all databases in the Hive metastore.
    */
   public HardCodedCluster(
       String name,
@@ -37,15 +41,38 @@ public HardCodedCluster(
       String jobtrackerHost,
       String jobtrackerPort,
       Path hdfsRoot,
-      Path tmpDir) {
+      Path tmpDir,
+      String dbNamePrefix) {
     this.name = name;
     this.metastoreHost = metastoreHost;
     this.metastorePort = metastorePort;
     this.jobtrackerHost = jobtrackerHost;
     this.jobtrackerPort = jobtrackerPort;
     this.hdfsRoot = hdfsRoot;
     this.tmpDir = tmpDir;
-    this.metastoreClient = new ThreadLocal<ThriftHiveMetastoreClient>();
+    this.dbNamePrefix = dbNamePrefix;
+    this.metastoreClient = new ThreadLocal<>();
   }

+  /**
+   * Constructor with specific values, with a default dbNamePrefix of null.
+   */
+  public HardCodedCluster(
+      String name,
+      String metastoreHost,
+      int metastorePort,
+      String jobtrackerHost,
+      String jobtrackerPort,
+      Path hdfsRoot,
+      Path tmpDir) {
+    this(name,
+        metastoreHost,
+        metastorePort,
+        jobtrackerHost,
+        jobtrackerPort,
+        hdfsRoot,
+        tmpDir,
+        null);
+  }
+
   public String getMetastoreHost() {
@@ -58,11 +85,17 @@ public int getMetastorePort() {

   /**
    * Get a cached ThreadLocal metastore client.
+   *
+   * <p>Also applies a DbPrefixHiveMetastoreClient wrapper if dbNamePrefix is not null or
+   * empty.
    */
-  public ThriftHiveMetastoreClient getMetastoreClient() throws HiveMetastoreException {
-    ThriftHiveMetastoreClient result = this.metastoreClient.get();
+  public HiveMetastoreClient getMetastoreClient() throws HiveMetastoreException {
+    HiveMetastoreClient result = this.metastoreClient.get();
     if (result == null) {
       result = new ThriftHiveMetastoreClient(getMetastoreHost(), getMetastorePort());
+      if (dbNamePrefix != null && !dbNamePrefix.isEmpty()) {
+        result = new DbPrefixHiveMetastoreClient(result, dbNamePrefix);
+      }
       this.metastoreClient.set(result);
     }
     return result;
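
For reviewers, a minimal usage sketch (not part of this diff) of how the new constructor argument and the wrapped client fit together, assuming DbPrefixHiveMetastoreClient simply prepends dbNamePrefix to database names before delegating to the wrapped client. The class name, hostname, paths, and the "test_" prefix below are hypothetical.

import com.airbnb.reair.common.HiveMetastoreClient;
import com.airbnb.reair.common.HiveMetastoreException;
import com.airbnb.reair.incremental.configuration.HardCodedCluster;
import org.apache.hadoop.fs.Path;

public class PrefixedDestClusterExample {
  public static HiveMetastoreClient openDestClient() throws HiveMetastoreException {
    HardCodedCluster destCluster = new HardCodedCluster(
        "dest",                                       // cluster name
        "dest-metastore.example.com",                 // hypothetical metastore host
        9083,                                         // metastore Thrift port
        null,                                         // jobtracker host (unused, mirroring getDestCluster() above)
        null,                                         // jobtracker port
        new Path("hdfs://dest/user/hive/warehouse"),  // hdfsRoot (hypothetical)
        new Path("hdfs://dest/tmp/reair"),            // tmpDir (hypothetical)
        "test_");                                     // dbNamePrefix; null or "" keeps the old behavior

    // With a non-empty prefix, getMetastoreClient() wraps the ThriftHiveMetastoreClient in a
    // DbPrefixHiveMetastoreClient, so a request for database "core_data" should resolve to
    // "test_core_data" on the destination metastore.
    return destCluster.getMetastoreClient();
  }
}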

@@ -52,6 +52,9 @@ public class ConfigurationKeys {
   public static final String DEST_HDFS_ROOT = "airbnb.reair.clusters.dest.hdfs.root";
   // The root of the temporary directory for storing temporary files on the destination cluster
   public static final String DEST_HDFS_TMP = "airbnb.reair.clusters.dest.hdfs.tmp";
+  // Prefix to add to any databases on the destination cluster. Mostly used for production testing.
+  public static final String DEST_CLUSTER_METASTORE_DB_PREFIX =
+      "airbnb.reair.clusters.dest.metastore.db.prefix";

   // Class to use for filtering out entries from the audit log
   public static final String OBJECT_FILTER_CLASS = "airbnb.reair.object.filter";

@@ -2,8 +2,8 @@

 import com.airbnb.reair.common.ArgumentException;
 import com.airbnb.reair.common.CliUtils;
+import com.airbnb.reair.common.HiveMetastoreClient;
 import com.airbnb.reair.common.HiveObjectSpec;
-import com.airbnb.reair.common.ThriftHiveMetastoreClient;
 import com.airbnb.reair.incremental.DirectoryCopier;
 import com.airbnb.reair.incremental.ReplicationUtils;
 import com.airbnb.reair.incremental.RunInfo;
@@ -160,7 +160,7 @@ public static int main(String[] args) throws Exception {

     if ("copy-unpartitioned-table".equals(op)) {
       LOG.info("Copying an unpartitioned table");
-      ThriftHiveMetastoreClient ms = srcCluster.getMetastoreClient();
+      HiveMetastoreClient ms = srcCluster.getMetastoreClient();
       Table srcTable = ms.getTable(spec.getDbName(), spec.getTableName());
       CopyUnpartitionedTableTask job = new CopyUnpartitionedTableTask(conf,
           destinationObjectFactory, conflictHandler, srcCluster, destCluster, spec,
@@ -172,7 +172,7 @@ public static int main(String[] args) throws Exception {
       }
     } else if ("copy-partitioned-table".equals(op)) {
       LOG.info("Copying a partitioned table");
-      ThriftHiveMetastoreClient ms = srcCluster.getMetastoreClient();
+      HiveMetastoreClient ms = srcCluster.getMetastoreClient();
       Table srcTable = ms.getTable(spec.getDbName(), spec.getTableName());
       CopyPartitionedTableTask job = new CopyPartitionedTableTask(conf, destinationObjectFactory,
           conflictHandler, srcCluster, destCluster, spec, ReplicationUtils.getLocation(srcTable));
@@ -183,7 +183,7 @@ public static int main(String[] args) throws Exception {
       }
     } else if (op.equals("copy-partition")) {
       LOG.info("Copying a partition");
-      ThriftHiveMetastoreClient ms = srcCluster.getMetastoreClient();
+      HiveMetastoreClient ms = srcCluster.getMetastoreClient();
       Partition srcPartition =
           ms.getPartition(spec.getDbName(), spec.getTableName(), spec.getPartitionName());
       CopyPartitionTask job = new CopyPartitionTask(conf, destinationObjectFactory, conflictHandler,

@@ -62,6 +62,12 @@
     </comment>
   </property>

+  <property>
+    <name>airbnb.reair.clusters.dest.metastore.db.prefix</name>
+    <value></value>
+    <comment>Optional prefix to add to destination Hive database names.</comment>
+  </property>
+
   <property>
     <name>airbnb.reair.clusters.batch.output.dir</name>
     <value>hdfs://airfs-test/user/test/test_output</value>
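
Usage note (not part of the diff): for production testing against a shared destination metastore, the new key would be set to a non-empty value in the destination-side config. The "test_" value and database name below are illustrative only; leaving the value empty, as in the committed example above, disables prefixing, since the client is only wrapped when dbNamePrefix is non-null and non-empty.

  <property>
    <name>airbnb.reair.clusters.dest.metastore.db.prefix</name>
    <value>test_</value>
    <comment>With this setting, a source database such as "core_data" would be written to
      "test_core_data" on the destination cluster.</comment>
  </property>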