This repository has been archived by the owner on Dec 16, 2021. It is now read-only.

Merge pull request #40 from zshao/issue_39_cache_metastoreclient
Make HiveMetastoreClient cached in a Thread Local way
Paul Yang authored Nov 4, 2016
2 parents ac899fc + 93d80a3 commit ac86065
Showing 2 changed files with 62 additions and 2 deletions.
File 1 of 2: HardCodedCluster.java
@@ -17,6 +17,7 @@ public class HardCodedCluster implements Cluster {
private String jobtrackerPort;
private Path hdfsRoot;
private Path tmpDir;
private ThreadLocal<ThriftHiveMetastoreClient> metastoreClient;

/**
* Constructor with specific values.
@@ -44,6 +45,7 @@ public HardCodedCluster(
this.jobtrackerPort = jobtrackerPort;
this.hdfsRoot = hdfsRoot;
this.tmpDir = tmpDir;
this.metastoreClient = new ThreadLocal<ThriftHiveMetastoreClient>();
}

public String getMetastoreHost() {
@@ -54,8 +56,16 @@ public int getMetastorePort() {
return metastorePort;
}

/**
* Get a cached ThreadLocal metastore client.
*/
public ThriftHiveMetastoreClient getMetastoreClient() throws HiveMetastoreException {
-    return new ThriftHiveMetastoreClient(getMetastoreHost(), getMetastorePort());
+    ThriftHiveMetastoreClient result = this.metastoreClient.get();
+    if (result == null) {
+      result = new ThriftHiveMetastoreClient(getMetastoreHost(), getMetastorePort());
+      this.metastoreClient.set(result);
+    }
+    return result;
}

public Path getFsRoot() {
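For illustration only, not part of the commit: the sketch below shows the effect of the ThreadLocal cache added to HardCodedCluster above. Repeated calls to getMetastoreClient() on the same thread now return the same ThriftHiveMetastoreClient, while each thread still gets its own client. The example class, the thread setup, and the import locations for HardCodedCluster and HiveMetastoreException are assumptions; only ThriftHiveMetastoreClient's package is shown in this diff.

    import com.airbnb.reair.common.HardCodedCluster;
    import com.airbnb.reair.common.HiveMetastoreException;
    import com.airbnb.reair.common.ThriftHiveMetastoreClient;

    // Hypothetical demo class: exercises the per-thread caching behavior.
    public class ThreadLocalCacheSketch {

      // A configured HardCodedCluster is passed in; its constructor arguments
      // are not shown in this diff, so construction is left out of the sketch.
      static void demo(final HardCodedCluster cluster) throws InterruptedException {
        Runnable task = () -> {
          try {
            ThriftHiveMetastoreClient first = cluster.getMetastoreClient();
            ThriftHiveMetastoreClient second = cluster.getMetastoreClient();
            // Same thread: the second call returns the cached instance.
            System.out.println("Same instance on this thread: " + (first == second));
          } catch (HiveMetastoreException e) {
            throw new RuntimeException(e);
          }
        };

        // Two worker threads: each lazily creates and then reuses its own client.
        Thread t1 = new Thread(task);
        Thread t2 = new Thread(task);
        t1.start();
        t2.start();
        t1.join();
        t2.join();
      }
    }

Because the cache is thread-confined, getMetastoreClient() needs no locking, at the cost of holding one open client per calling thread.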
File 2 of 2: ThriftHiveMetastoreClient.java
@@ -1,5 +1,7 @@
package com.airbnb.reair.common;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.hive.metastore.api.Database;
import org.apache.hadoop.hive.metastore.api.MetaException;
import org.apache.hadoop.hive.metastore.api.NoSuchObjectException;
@@ -20,6 +22,8 @@
*/
public class ThriftHiveMetastoreClient implements HiveMetastoreClient {

private static final Log LOG = LogFactory.getLog(ThriftHiveMetastoreClient.class);

private static int DEFAULT_SOCKET_TIMEOUT = 600;

private String host;
@@ -52,13 +56,15 @@ public ThriftHiveMetastoreClient(String host, int port) throws HiveMetastoreExce
*/
private void connect() throws HiveMetastoreException {

LOG.info("Connecting to ThriftHiveMetastore " + host + ":" + port);
transport = new TSocket(host, port, 1000 * clientSocketTimeout);

this.client = new ThriftHiveMetastore.Client(new TBinaryProtocol(transport));

try {
transport.open();
} catch (TTransportException e) {
close();
throw new HiveMetastoreException(e);
}
}
@@ -67,7 +73,17 @@ private void connect() throws HiveMetastoreException {
* TODO.
*/
public void close() {
-    transport.close();
+    if (transport != null) {
+      transport.close();
+      transport = null;
+      client = null;
+    }
}

private void connectIfNeeded() throws HiveMetastoreException {
if (transport == null) {
connect();
}
}

/**
@@ -80,8 +96,10 @@ public void close() {
*/
public synchronized Partition addPartition(Partition partition) throws HiveMetastoreException {
try {
connectIfNeeded();
return client.add_partition(partition);
} catch (TException e) {
close();
throw new HiveMetastoreException(e);
}
}
@@ -99,10 +117,12 @@ public synchronized Table getTable(String dbName, String tableName)
throws HiveMetastoreException {

try {
connectIfNeeded();
return client.get_table(dbName, tableName);
} catch (NoSuchObjectException e) {
return null;
} catch (TException e) {
close();
throw new HiveMetastoreException(e);
}
}
@@ -121,6 +141,7 @@ public synchronized Partition getPartition(String dbName, String tableName, Stri
throws HiveMetastoreException {

try {
connectIfNeeded();
return client.get_partition_by_name(dbName, tableName, partitionName);
} catch (NoSuchObjectException e) {
return null;
@@ -136,6 +157,7 @@ public synchronized Partition getPartition(String dbName, String tableName, Stri
throw new HiveMetastoreException(e);
}
} catch (TException e) {
close();
throw new HiveMetastoreException(e);
}
}
@@ -152,8 +174,10 @@ public synchronized Partition getPartition(String dbName, String tableName, Stri
public synchronized void alterPartition(String dbName, String tableName, Partition partition)
throws HiveMetastoreException {
try {
connectIfNeeded();
client.alter_partition(dbName, tableName, partition);
} catch (TException e) {
close();
throw new HiveMetastoreException(e);
}
}
@@ -170,8 +194,10 @@ public synchronized void alterPartition(String dbName, String tableName, Partiti
public synchronized void alterTable(String dbName, String tableName, Table table)
throws HiveMetastoreException {
try {
connectIfNeeded();
client.alter_table(dbName, tableName, table);
} catch (TException e) {
close();
throw new HiveMetastoreException(e);
}
}
@@ -228,8 +254,10 @@ public synchronized boolean existsTable(String dbName, String tableName)
*/
public synchronized void createTable(Table table) throws HiveMetastoreException {
try {
connectIfNeeded();
client.create_table(table);
} catch (TException e) {
close();
throw new HiveMetastoreException(e);
}
}
@@ -246,8 +274,10 @@ public synchronized void createTable(Table table) throws HiveMetastoreException
public synchronized void dropTable(String dbName, String tableName, boolean deleteData)
throws HiveMetastoreException {
try {
connectIfNeeded();
client.drop_table(dbName, tableName, deleteData);
} catch (TException e) {
close();
throw new HiveMetastoreException(e);
}
}
@@ -265,8 +295,10 @@ public synchronized void dropTable(String dbName, String tableName, boolean dele
public synchronized void dropPartition(String dbName, String tableName, String partitionName,
boolean deleteData) throws HiveMetastoreException {
try {
connectIfNeeded();
client.drop_partition_by_name(dbName, tableName, partitionName, deleteData);
} catch (TException e) {
close();
throw new HiveMetastoreException(e);
}
}
@@ -280,28 +312,34 @@ public synchronized void dropPartition(String dbName, String tableName, String p
public synchronized Map<String, String> partitionNameToMap(String partitionName)
throws HiveMetastoreException {
try {
connectIfNeeded();
return client.partition_name_to_spec(partitionName);
} catch (TException e) {
close();
throw new HiveMetastoreException(e);
}
}

@Override
public synchronized void createDatabase(Database db) throws HiveMetastoreException {
try {
connectIfNeeded();
client.create_database(db);
} catch (TException e) {
close();
throw new HiveMetastoreException(e);
}
}

@Override
public synchronized Database getDatabase(String dbName) throws HiveMetastoreException {
try {
connectIfNeeded();
return client.get_database(dbName);
} catch (NoSuchObjectException e) {
return null;
} catch (TException e) {
close();
throw new HiveMetastoreException(e);
}
}
@@ -315,8 +353,10 @@ public synchronized boolean existsDb(String dbName) throws HiveMetastoreExceptio
public synchronized List<String> getPartitionNames(String dbName, String tableName)
throws HiveMetastoreException {
try {
connectIfNeeded();
return client.get_partition_names(dbName, tableName, (short) -1);
} catch (TException e) {
close();
throw new HiveMetastoreException(e);
}
}
@@ -325,8 +365,10 @@ public synchronized List<String> getPartitionNames(String dbName, String tableNa
public synchronized List<String> getTables(String dbName, String tableName)
throws HiveMetastoreException {
try {
connectIfNeeded();
return client.get_tables(dbName, tableName);
} catch (TException e) {
close();
throw new HiveMetastoreException(e);
}
}
@@ -339,9 +381,11 @@ public synchronized Partition exchangePartition(
String destinationTableName)
throws HiveMetastoreException {
try {
connectIfNeeded();
return client.exchange_partition(partitionSpecs, sourceDb, sourceTable, destDb,
destinationTableName);
} catch (TException e) {
close();
throw new HiveMetastoreException(e);
}
}
@@ -354,8 +398,10 @@ public void renamePartition(
Partition partition)
throws HiveMetastoreException {
try {
connectIfNeeded();
client.rename_partition(db, table, partitionValues, partition);
} catch (TException e) {
close();
throw new HiveMetastoreException(e);
}
}
@@ -367,8 +413,10 @@ public void renamePartition(
*/
public List<String> getAllDatabases() throws HiveMetastoreException {
try {
connectIfNeeded();
return client.get_all_databases();
} catch (TException e) {
close();
throw new HiveMetastoreException(e);
}
}
@@ -381,8 +429,10 @@ public List<String> getAllDatabases() throws HiveMetastoreException {
*/
public List<String> getAllTables(String dbName) throws HiveMetastoreException {
try {
connectIfNeeded();
return client.get_all_tables(dbName);
} catch (TException e) {
close();
throw new HiveMetastoreException(e);
}
}
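A rough usage sketch of the reconnect behavior added to ThriftHiveMetastoreClient above, again not taken from the commit: every metastore call now runs connectIfNeeded() first and calls close() when a TException escapes, so a client whose connection has failed re-opens its transport on the next call instead of reusing a dead socket, and close() itself becomes safe to call repeatedly. The host, port, database, and table names below are placeholders, and the import location for HiveMetastoreException is assumed.

    import com.airbnb.reair.common.HiveMetastoreException;
    import com.airbnb.reair.common.ThriftHiveMetastoreClient;

    import org.apache.hadoop.hive.metastore.api.Table;

    // Hypothetical demo class: exercises the connect-on-demand / close-on-error behavior.
    public class ReconnectSketch {

      public static void main(String[] args) throws HiveMetastoreException {
        // Placeholder host and port.
        ThriftHiveMetastoreClient client =
            new ThriftHiveMetastoreClient("metastore.example.com", 9083);

        try {
          // Any Thrift-level failure here closes the transport and surfaces
          // as a HiveMetastoreException.
          Table table = client.getTable("default", "events");
        } catch (HiveMetastoreException e) {
          // Retrying is enough: connectIfNeeded() sees the null transport
          // and opens a fresh connection before the call.
          Table retried = client.getTable("default", "events");
        }

        // close() now checks for a null transport, so repeated calls are safe.
        client.close();
        client.close();
      }
    }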
