From f4c3187ff2229ac11f34e74746f39ee109888829 Mon Sep 17 00:00:00 2001
From: Zheng Shao
Date: Thu, 3 Nov 2016 22:49:31 -0700
Subject: [PATCH] Feature: Add option for destination database name prefix.

---
 .../batch/hive/MetastoreReplicationJob.java    |   1 +
 .../ConfiguredClusterFactory.java              |   5 +-
 .../configuration/HardCodedCluster.java        |  43 ++-
 .../incremental/deploy/ConfigurationKeys.java  |   3 +
 .../reair/incremental/deploy/HiveCopy.java     |   8 +-
 ...tch_replication_configuration_template.xml  |   6 +
 .../common/DbPrefixHiveMetastoreClient.java    | 251 ++++++++++++++++++
 7 files changed, 307 insertions(+), 10 deletions(-)
 create mode 100644 utils/src/main/java/com/airbnb/reair/common/DbPrefixHiveMetastoreClient.java

diff --git a/main/src/main/java/com/airbnb/reair/batch/hive/MetastoreReplicationJob.java b/main/src/main/java/com/airbnb/reair/batch/hive/MetastoreReplicationJob.java
index 07bbee43..74911f4e 100644
--- a/main/src/main/java/com/airbnb/reair/batch/hive/MetastoreReplicationJob.java
+++ b/main/src/main/java/com/airbnb/reair/batch/hive/MetastoreReplicationJob.java
@@ -317,6 +317,7 @@ private void mergeConfiguration(Configuration inputConfig, Configuration merged)
         ConfigurationKeys.DEST_CLUSTER_METASTORE_URL,
         ConfigurationKeys.DEST_HDFS_ROOT,
         ConfigurationKeys.DEST_HDFS_TMP,
+        ConfigurationKeys.DEST_CLUSTER_METASTORE_DB_PREFIX,
         ConfigurationKeys.BATCH_JOB_METASTORE_BLACKLIST,
         ConfigurationKeys.BATCH_JOB_CLUSTER_FACTORY_CLASS,
         ConfigurationKeys.BATCH_JOB_OUTPUT_DIR,
diff --git a/main/src/main/java/com/airbnb/reair/incremental/configuration/ConfiguredClusterFactory.java b/main/src/main/java/com/airbnb/reair/incremental/configuration/ConfiguredClusterFactory.java
index 1a53e972..41205be8 100644
--- a/main/src/main/java/com/airbnb/reair/incremental/configuration/ConfiguredClusterFactory.java
+++ b/main/src/main/java/com/airbnb/reair/incremental/configuration/ConfiguredClusterFactory.java
@@ -55,6 +55,8 @@ public Cluster getDestCluster() throws ConfigurationException {
         ConfigurationKeys.DEST_HDFS_ROOT);
     String destHdfsTmp = conf.get(
         ConfigurationKeys.DEST_HDFS_TMP);
+    String destDbPrefix = conf.get(
+        ConfigurationKeys.DEST_CLUSTER_METASTORE_DB_PREFIX);
     return new HardCodedCluster(
         destClusterName,
         destMetastoreUrl.getHost(),
@@ -62,7 +64,8 @@ public Cluster getDestCluster() throws ConfigurationException {
         null,
         null,
         new Path(destHdfsRoot),
-        new Path(destHdfsTmp));
+        new Path(destHdfsTmp),
+        destDbPrefix);
   }
 
   @Override
diff --git a/main/src/main/java/com/airbnb/reair/incremental/configuration/HardCodedCluster.java b/main/src/main/java/com/airbnb/reair/incremental/configuration/HardCodedCluster.java
index cd08bf8f..61a953bb 100644
--- a/main/src/main/java/com/airbnb/reair/incremental/configuration/HardCodedCluster.java
+++ b/main/src/main/java/com/airbnb/reair/incremental/configuration/HardCodedCluster.java
@@ -1,5 +1,7 @@
 package com.airbnb.reair.incremental.configuration;
 
+import com.airbnb.reair.common.DbPrefixHiveMetastoreClient;
+import com.airbnb.reair.common.HiveMetastoreClient;
 import com.airbnb.reair.common.HiveMetastoreException;
 import com.airbnb.reair.common.ThriftHiveMetastoreClient;
 
@@ -17,7 +19,8 @@ public class HardCodedCluster implements Cluster {
   private String jobtrackerPort;
   private Path hdfsRoot;
   private Path tmpDir;
-  private ThreadLocal<ThriftHiveMetastoreClient> metastoreClient;
+  private String dbNamePrefix;
+  private ThreadLocal<HiveMetastoreClient> metastoreClient;
 
   /**
    * Constructor with specific values.
@@ -29,6 +32,7 @@ public class HardCodedCluster {
    * @param jobtrackerPort port of the job tracker
    * @param hdfsRoot the path for the root HDFS directory
    * @param tmpDir the path for the temporary HDFS directory (should be under root)
+   * @param dbNamePrefix optional prefix for all databases in the Hive metastore.
    */
   public HardCodedCluster(
       String name,
@@ -37,7 +41,8 @@ public HardCodedCluster(
       String jobtrackerHost,
       String jobtrackerPort,
       Path hdfsRoot,
-      Path tmpDir) {
+      Path tmpDir,
+      String dbNamePrefix) {
     this.name = name;
     this.metastoreHost = metastoreHost;
     this.metastorePort = metastorePort;
@@ -45,7 +50,29 @@ public HardCodedCluster(
     this.jobtrackerPort = jobtrackerPort;
     this.hdfsRoot = hdfsRoot;
     this.tmpDir = tmpDir;
-    this.metastoreClient = new ThreadLocal();
+    this.dbNamePrefix = dbNamePrefix;
+    this.metastoreClient = new ThreadLocal<>();
+  }
+
+  /**
+   * Constructor with specific values, with a default dbNamePrefix of null.
+   */
+  public HardCodedCluster(
+      String name,
+      String metastoreHost,
+      int metastorePort,
+      String jobtrackerHost,
+      String jobtrackerPort,
+      Path hdfsRoot,
+      Path tmpDir) {
+    this(name,
+        metastoreHost,
+        metastorePort,
+        jobtrackerHost,
+        jobtrackerPort,
+        hdfsRoot,
+        tmpDir,
+        null);
   }
 
   public String getMetastoreHost() {
@@ -58,11 +85,17 @@ public int getMetastorePort() {
 
   /**
    * Get a cached ThreadLocal metastore client.
+   *
+   * <p>Also applies a DbPrefixHiveMetastoreClient wrapper if dbNamePrefix is not null or
+   * empty.
    */
-  public ThriftHiveMetastoreClient getMetastoreClient() throws HiveMetastoreException {
-    ThriftHiveMetastoreClient result = this.metastoreClient.get();
+  public HiveMetastoreClient getMetastoreClient() throws HiveMetastoreException {
+    HiveMetastoreClient result = this.metastoreClient.get();
     if (result == null) {
       result = new ThriftHiveMetastoreClient(getMetastoreHost(), getMetastorePort());
+      if (dbNamePrefix != null && !dbNamePrefix.isEmpty()) {
+        result = new DbPrefixHiveMetastoreClient(result, dbNamePrefix);
+      }
       this.metastoreClient.set(result);
     }
     return result;
diff --git a/main/src/main/java/com/airbnb/reair/incremental/deploy/ConfigurationKeys.java b/main/src/main/java/com/airbnb/reair/incremental/deploy/ConfigurationKeys.java
index a45f779c..9cd5f133 100644
--- a/main/src/main/java/com/airbnb/reair/incremental/deploy/ConfigurationKeys.java
+++ b/main/src/main/java/com/airbnb/reair/incremental/deploy/ConfigurationKeys.java
@@ -52,6 +52,9 @@ public class ConfigurationKeys {
   public static final String DEST_HDFS_ROOT = "airbnb.reair.clusters.dest.hdfs.root";
   // The root of the temporary directory for storing temporary files on the destination cluster
   public static final String DEST_HDFS_TMP = "airbnb.reair.clusters.dest.hdfs.tmp";
+  // Prefix to add to any databases on the destination cluster. Mostly used for production testing.
+  public static final String DEST_CLUSTER_METASTORE_DB_PREFIX =
+      "airbnb.reair.clusters.dest.metastore.db.prefix";
 
   // Class to use for filtering out entries from the audit log
   public static final String OBJECT_FILTER_CLASS = "airbnb.reair.object.filter";
diff --git a/main/src/main/java/com/airbnb/reair/incremental/deploy/HiveCopy.java b/main/src/main/java/com/airbnb/reair/incremental/deploy/HiveCopy.java
index a6c49fd6..7020f522 100644
--- a/main/src/main/java/com/airbnb/reair/incremental/deploy/HiveCopy.java
+++ b/main/src/main/java/com/airbnb/reair/incremental/deploy/HiveCopy.java
@@ -2,8 +2,8 @@
 
 import com.airbnb.reair.common.ArgumentException;
 import com.airbnb.reair.common.CliUtils;
+import com.airbnb.reair.common.HiveMetastoreClient;
 import com.airbnb.reair.common.HiveObjectSpec;
-import com.airbnb.reair.common.ThriftHiveMetastoreClient;
 import com.airbnb.reair.incremental.DirectoryCopier;
 import com.airbnb.reair.incremental.ReplicationUtils;
 import com.airbnb.reair.incremental.RunInfo;
@@ -160,7 +160,7 @@ public static int main(String[] args) throws Exception {
 
     if ("copy-unpartitioned-table".equals(op)) {
       LOG.info("Copying an unpartitioned table");
-      ThriftHiveMetastoreClient ms = srcCluster.getMetastoreClient();
+      HiveMetastoreClient ms = srcCluster.getMetastoreClient();
       Table srcTable = ms.getTable(spec.getDbName(), spec.getTableName());
       CopyUnpartitionedTableTask job = new CopyUnpartitionedTableTask(conf,
           destinationObjectFactory, conflictHandler, srcCluster, destCluster, spec,
@@ -172,7 +172,7 @@ public static int main(String[] args) throws Exception {
       }
     } else if ("copy-partitioned-table".equals(op)) {
       LOG.info("Copying a partitioned table");
-      ThriftHiveMetastoreClient ms = srcCluster.getMetastoreClient();
+      HiveMetastoreClient ms = srcCluster.getMetastoreClient();
       Table srcTable = ms.getTable(spec.getDbName(), spec.getTableName());
       CopyPartitionedTableTask job = new CopyPartitionedTableTask(conf, destinationObjectFactory,
           conflictHandler, srcCluster, destCluster, spec, ReplicationUtils.getLocation(srcTable));
@@ -183,7 +183,7 @@ public static int main(String[] args) throws Exception {
       }
     } else if (op.equals("copy-partition")) {
       LOG.info("Copying a partition");
-      ThriftHiveMetastoreClient ms = srcCluster.getMetastoreClient();
+      HiveMetastoreClient ms = srcCluster.getMetastoreClient();
       Partition srcPartition = ms.getPartition(spec.getDbName(), spec.getTableName(),
           spec.getPartitionName());
       CopyPartitionTask job = new CopyPartitionTask(conf, destinationObjectFactory, conflictHandler,
diff --git a/main/src/main/resources/batch_replication_configuration_template.xml b/main/src/main/resources/batch_replication_configuration_template.xml
index 6eadd9f7..b22142d0 100644
--- a/main/src/main/resources/batch_replication_configuration_template.xml
+++ b/main/src/main/resources/batch_replication_configuration_template.xml
@@ -62,6 +62,12 @@
     </description>
   </property>
 
+  <property>
+    <name>airbnb.reair.clusters.dest.metastore.db.prefix</name>
+    <value></value>
+    <description>Optional prefix to add to destination Hive database names.</description>
+  </property>
+
   <property>
     <name>airbnb.reair.clusters.batch.output.dir</name>
     <value>hdfs://airfs-test/user/test/test_output</value>
diff --git a/utils/src/main/java/com/airbnb/reair/common/DbPrefixHiveMetastoreClient.java b/utils/src/main/java/com/airbnb/reair/common/DbPrefixHiveMetastoreClient.java
new file mode 100644
index 00000000..0e071e9e
--- /dev/null
+++ b/utils/src/main/java/com/airbnb/reair/common/DbPrefixHiveMetastoreClient.java
@@ -0,0 +1,251 @@
+package com.airbnb.reair.common;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.hive.metastore.api.Database;
+import org.apache.hadoop.hive.metastore.api.Partition;
+import org.apache.hadoop.hive.metastore.api.Table;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * A HiveMetastoreClient wrapper that automatically adds a prefix to DbName when talking with the
+ * underlying HiveMetastoreClient. This is mainly for test purposes.
+ */
+public class DbPrefixHiveMetastoreClient implements HiveMetastoreClient {
+
+  private static final Log LOG = LogFactory.getLog(DbPrefixHiveMetastoreClient.class);
+
+  private HiveMetastoreClient client;
+  private String prefix;
+
+  public DbPrefixHiveMetastoreClient(HiveMetastoreClient client, String prefix) {
+    this.client = client;
+    this.prefix = prefix;
+  }
+
+  private String addPrefix(String dbName) {
+    return prefix + dbName;
+  }
+
+  private Database addPrefix(Database db) {
+    if (db == null) {
+      return null;
+    }
+    Database result = new Database(db);
+    result.setName(addPrefix(result.getName()));
+    return result;
+  }
+
+  private Table addPrefix(Table table) {
+    if (table == null) {
+      return null;
+    }
+    Table result = new Table(table);
+    result.setDbName(addPrefix(result.getDbName()));
+    return result;
+  }
+
+  private Partition addPrefix(Partition partition) {
+    if (partition == null) {
+      return null;
+    }
+    Partition result = new Partition(partition);
+    result.setDbName(addPrefix(result.getDbName()));
+    return result;
+  }
+
+  private String removePrefix(String dbName) {
+    if (!dbName.startsWith(prefix)) {
+      LOG.warn("Cannot remove prefix " + prefix + " from String " + dbName);
+    } else {
+      dbName = dbName.substring(prefix.length());
+    }
+    return dbName;
+  }
+
+  private Database removePrefix(Database db) {
+    if (db == null) {
+      return null;
+    }
+    Database result = new Database(db);
+    String dbName = result.getName();
+    if (!dbName.startsWith(prefix)) {
+      LOG.warn("Cannot remove prefix " + prefix + " from Database " + dbName);
+    } else {
+      dbName = dbName.substring(prefix.length());
+    }
+    result.setName(dbName);
+    return result;
+  }
+
+  private Table removePrefix(Table table) {
+    if (table == null) {
+      return null;
+    }
+    Table result = new Table(table);
+    String dbName = result.getDbName();
+    if (!dbName.startsWith(prefix)) {
+      LOG.warn("Cannot remove prefix " + prefix + " from Table " + dbName + ":" + result
+          .getTableName());
+    } else {
+      dbName = dbName.substring(prefix.length());
+    }
+    result.setDbName(dbName);
+    return result;
+  }
+
+  private Partition removePrefix(Partition partition) {
+    if (partition == null) {
+      return null;
+    }
+    Partition result = new Partition(partition);
+    String dbName = result.getDbName();
+    if (!dbName.startsWith(prefix)) {
+      LOG.warn("Cannot remove prefix " + prefix + " from Partition " + dbName + ":" + result
+          .getTableName() + ":" + result.getParameters());
+    } else {
+      dbName = dbName.substring(prefix.length());
+    }
+    result.setDbName(dbName);
+    return result;
+  }
+
+  @Override
+  public Partition addPartition(Partition partition) throws HiveMetastoreException {
+    return removePrefix(client.addPartition(addPrefix(partition)));
+  }
+
+  @Override
+  public Table getTable(String dbName, String tableName) throws HiveMetastoreException {
+    return removePrefix(client.getTable(addPrefix(dbName), tableName));
+  }
+
+  @Override
+  public Partition getPartition(String dbName, String tableName, String partitionName) throws
+      HiveMetastoreException {
+    return removePrefix(client.getPartition(addPrefix(dbName), tableName, partitionName));
+  }
+
+  @Override
+  public List<String> getPartitionNames(String dbName, String tableName) throws
+      HiveMetastoreException {
+    return client.getPartitionNames(addPrefix(dbName), tableName);
+  }
+
+  @Override
+  public void alterPartition(String dbName, String tableName, Partition partition) throws
+      HiveMetastoreException {
+    client.alterPartition(addPrefix(dbName), tableName, addPrefix(partition));
+  }
+
+  @Override
+  public void alterTable(String dbName, String tableName, Table table) throws
+      HiveMetastoreException {
+    client.alterTable(addPrefix(dbName), tableName, addPrefix(table));
+  }
+
+  @Override
+  public boolean isPartitioned(String dbName, String tableName) throws HiveMetastoreException {
+    return client.isPartitioned(addPrefix(dbName), tableName);
+  }
+
+  @Override
+  public boolean existsPartition(String dbName, String tableName, String partitionName) throws
+      HiveMetastoreException {
+    return client.existsPartition(addPrefix(dbName), tableName, partitionName);
+  }
+
+  @Override
+  public boolean existsTable(String dbName, String tableName) throws HiveMetastoreException {
+    return client.existsTable(addPrefix(dbName), tableName);
+  }
+
+  @Override
+  public void createTable(Table table) throws HiveMetastoreException {
+    client.createTable(addPrefix(table));
+  }
+
+  @Override
+  public void dropTable(String dbName, String tableName, boolean deleteData) throws
+      HiveMetastoreException {
+    client.dropTable(addPrefix(dbName), tableName, deleteData);
+  }
+
+  @Override
+  public void dropPartition(String dbName, String tableName, String partitionName, boolean
+      deleteData) throws HiveMetastoreException {
+    client.dropPartition(addPrefix(dbName), tableName, partitionName, deleteData);
+  }
+
+  @Override
+  public Map<String, String> partitionNameToMap(String partitionName) throws
+      HiveMetastoreException {
+    return client.partitionNameToMap(partitionName);
+  }
+
+  @Override
+  public void createDatabase(Database db) throws HiveMetastoreException {
+    client.createDatabase(addPrefix(db));
+  }
+
+  @Override
+  public Database getDatabase(String dbName) throws HiveMetastoreException {
+    return removePrefix(client.getDatabase(addPrefix(dbName)));
+  }
+
+  @Override
+  public boolean existsDb(String dbName) throws HiveMetastoreException {
+    return client.existsDb(addPrefix(dbName));
+  }
+
+  @Override
+  public List<String> getTables(String dbName, String tableName) throws HiveMetastoreException {
+    return client.getTables(addPrefix(dbName), tableName);
+  }
+
+  @Override
+  public Partition exchangePartition(Map<String, String> partitionSpecs, String sourceDb, String
+      sourceTable, String destDb, String destinationTableName) throws HiveMetastoreException {
+    return removePrefix(client.exchangePartition(partitionSpecs, addPrefix(sourceDb),
+        sourceTable, addPrefix(destDb), destinationTableName));
+  }
+
+  @Override
+  public void renamePartition(String db, String table, List<String> partitionValues, Partition
+      partition) throws HiveMetastoreException {
+    client.renamePartition(addPrefix(db), table, partitionValues, addPrefix(partition));
+  }
+
+  /**
+   * Return all databases that start with the predefined prefix, with the prefix removed.
+   */
+  @Override
+  public List<String> getAllDatabases() throws HiveMetastoreException {
+    List<String> result = client.getAllDatabases();
+    if (result == null) {
+      return null;
+    }
+    List<String> realResult = new ArrayList<String>();
+    for (int i = 0; i < result.size(); i++) {
+      String db = result.get(i);
+      // Ignore databases that don't start with the prefix.
+      if (db.startsWith(prefix)) {
+        realResult.add(removePrefix(db));
+      }
+    }
+    return realResult;
+  }
+
+  @Override
+  public List<String> getAllTables(String dbName) throws HiveMetastoreException {
+    return client.getAllTables(addPrefix(dbName));
+  }
+
+  @Override
+  public void close() {
+    client.close();
+  }
+}
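
Usage sketch (illustration only, not part of the patch): once airbnb.reair.clusters.dest.metastore.db.prefix is set to a non-empty value, HardCodedCluster.getMetastoreClient() wraps its ThriftHiveMetastoreClient in DbPrefixHiveMetastoreClient automatically. The snippet below wires the wrapper up by hand to show the renaming behavior; the class name, host, port, prefix, and database/table names are hypothetical placeholders.

import com.airbnb.reair.common.DbPrefixHiveMetastoreClient;
import com.airbnb.reair.common.HiveMetastoreClient;
import com.airbnb.reair.common.HiveMetastoreException;
import com.airbnb.reair.common.ThriftHiveMetastoreClient;

import org.apache.hadoop.hive.metastore.api.Table;

public class PrefixClientExample {
  public static void main(String[] args) throws HiveMetastoreException {
    // Hypothetical destination metastore endpoint; substitute a real host and port.
    HiveMetastoreClient raw = new ThriftHiveMetastoreClient("dest-metastore.example.com", 9083);

    // Every database name is prefixed with "test_copy_" before it reaches the destination
    // metastore, mirroring what HardCodedCluster does when the prefix key is configured.
    HiveMetastoreClient client = new DbPrefixHiveMetastoreClient(raw, "test_copy_");

    // Reads "test_copy_logs.events" on the destination, but the caller keeps using the
    // unprefixed name; the prefix is stripped from the returned Table again.
    Table events = client.getTable("logs", "events");
    if (events != null) {
      System.out.println(events.getDbName() + "." + events.getTableName());
    }

    client.close();
  }
}

In the batch job itself nothing beyond the configuration key is needed: ConfiguredClusterFactory reads the prefix and passes it into HardCodedCluster, which applies the same wrapper around every destination metastore call.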