Skip to content

Commit

Permalink
[feat](catalog) Support loading config files from catalog properties
Browse files Browse the repository at this point in the history
  • Loading branch information
CalvinKirs committed Oct 12, 2024
1 parent de32b56 commit 816ce10
Show file tree
Hide file tree
Showing 4 changed files with 56 additions and 5 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -2161,6 +2161,8 @@ public class Config extends ConfigBase {
@ConfField(masterOnly = true)
public static boolean enable_hms_events_incremental_sync = false;

// Base directory under which external-catalog config resource files are resolved.
// File names referenced via catalog properties (e.g. "hive.config.resources") are
// looked up relative to this directory. Defaults to "${DORIS_HOME}/conf/catalogs/".
// NOTE(review): unlike the neighboring configs this field carries no @ConfField
// annotation — confirm whether it is intentionally excluded from runtime config handling.
public static String external_catalog_config_dir = EnvUtils.getDorisHome() + "/conf/catalogs/";

/**
* If set to true, doris will try to parse the ddl of a hive view and try to execute the query
* otherwise it will throw an AnalysisException.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,10 +52,12 @@
import com.google.common.collect.Maps;
import lombok.Getter;
import org.apache.commons.lang3.math.NumberUtils;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;

import java.io.File;
import java.util.List;
import java.util.Map;
import java.util.Objects;
Expand Down Expand Up @@ -181,6 +183,7 @@ protected void initLocalObjectsImpl() {
jdbcClientConfig.setDriverClass(catalogProperty.getOrDefault("driver_class", ""));
} else {
hiveConf = new HiveConf();
initHiveConfIfNeeded(hiveConf);
for (Map.Entry<String, String> kv : catalogProperty.getHadoopProperties().entrySet()) {
hiveConf.set(kv.getKey(), kv.getValue());
}
Expand All @@ -197,6 +200,26 @@ protected void initLocalObjectsImpl() {
metadataOps = hiveOps;
}

/**
 * Loads user-specified Hive configuration resource files into the given {@link HiveConf}.
 *
 * <p>The catalog property {@code hive.config.resources} holds a comma-separated list of
 * file names, each resolved against {@link Config#external_catalog_config_dir}. When the
 * property is absent or empty this method is a no-op.
 *
 * @param hiveConf the HiveConf to augment; resources are added in listed order
 * @throws AnalysisException if a listed resource does not exist as a regular file
 */
private void initHiveConfIfNeeded(HiveConf hiveConf) {
    String resourcesPath = catalogProperty.getOrDefault(HMSProperties.HIVE_CONFIG_RESOURCES, null);
    if (Strings.isNullOrEmpty(resourcesPath)) {
        return;
    }
    for (String resource : resourcesPath.split(",")) {
        String fileName = resource.trim();
        // Skip blank entries produced by stray or trailing commas, e.g. "a.xml,,b.xml" or
        // "a.xml," — the original code would resolve these to the config directory itself
        // and throw a spurious "file does not exist" error.
        if (fileName.isEmpty()) {
            continue;
        }
        // Construct the full path to the resource under the configured catalog config dir.
        String resourcePath = Config.external_catalog_config_dir + File.separator + fileName;
        File file = new File(resourcePath);

        // Only regular files are accepted as Hadoop-style configuration resources.
        if (file.exists() && file.isFile()) {
            hiveConf.addResource(new Path(file.toURI()));
        } else {
            throw new AnalysisException("Hive config resource file does not exist: " + resourcePath);
        }
    }
}

@Override
public void onRefresh(boolean invalidCache) {
super.onRefresh(invalidCache);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,9 @@ public class HMSProperties {
// required
public static final String HIVE_METASTORE_URIS = "hive.metastore.uris";
public static final List<String> REQUIRED_FIELDS = Collections.singletonList(HMSProperties.HIVE_METASTORE_URIS);
// Property key toggling incremental sync of HMS events for this catalog.
public static final String ENABLE_HMS_EVENTS_INCREMENTAL_SYNC = "hive.enable_hms_events_incremental_sync";
// Property key for the per-RPC batch size when fetching HMS events.
// NOTE(review): "EVENTIS" looks like a typo for "EVENTS"; the public constant name is
// kept unchanged for backward compatibility with existing callers.
public static final String HMS_EVENTIS_BATCH_SIZE_PER_RPC = "hive.hms_events_batch_size_per_rpc";

// Comma-separated list of Hive config resource file names (resolved against
// Config.external_catalog_config_dir) to load into the catalog's HiveConf.
public static final String HIVE_CONFIG_RESOURCES = "hive.config.resources";

}
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@

import org.apache.doris.analysis.StorageBackend;
import org.apache.doris.backup.Status;
import org.apache.doris.common.Config;
import org.apache.doris.common.UserException;
import org.apache.doris.common.security.authentication.AuthenticationConfig;
import org.apache.doris.common.security.authentication.HadoopAuthenticator;
Expand Down Expand Up @@ -58,7 +59,7 @@
import java.util.Map;

public class DFSFileSystem extends RemoteFileSystem {

public static final String HADOOP_CONFIG_RESOURCES = "hadoop.config.resources";
public static final String PROP_ALLOW_FALLBACK_TO_SIMPLE_AUTH = "ipc.client.fallback-to-simple-auth-allowed";
private static final Logger LOG = LogManager.getLogger(DFSFileSystem.class);
private HDFSFileOperations operations = null;
Expand Down Expand Up @@ -86,6 +87,7 @@ public FileSystem nativeFileSystem(String remotePath) throws UserException {
}
if (dfsFileSystem == null) {
Configuration conf = getHdfsConf(ifNotSetFallbackToSimpleAuth());
loadConfigFromResources(conf);
for (Map.Entry<String, String> propEntry : properties.entrySet()) {
conf.set(propEntry.getKey(), propEntry.getValue());
}
Expand All @@ -111,7 +113,8 @@ public FileSystem nativeFileSystem(String remotePath) throws UserException {
}

protected RemoteIterator<LocatedFileStatus> getLocatedFiles(boolean recursive,
FileSystem fileSystem, Path locatedPath) throws IOException {
FileSystem fileSystem, Path locatedPath)
throws IOException {
return authenticator.doAs(() -> fileSystem.listFiles(locatedPath, recursive));
}

Expand All @@ -128,6 +131,26 @@ public static Configuration getHdfsConf(boolean fallbackToSimpleAuth) {
return hdfsConf;
}

/**
 * Loads user-specified Hadoop configuration resource files into the given
 * {@link Configuration}.
 *
 * <p>The property {@code hadoop.config.resources} holds a comma-separated list of file
 * names, each resolved against {@link Config#external_catalog_config_dir}. When the
 * property is absent this method is a no-op.
 *
 * @param conf the Configuration to augment; resources are added in listed order
 * @throws IllegalArgumentException if a listed resource does not exist as a regular file
 */
private void loadConfigFromResources(Configuration conf) {
    // Get the Hadoop config resources from the catalog/storage properties.
    String hadoopConfigResources = properties.get(HADOOP_CONFIG_RESOURCES);
    if (hadoopConfigResources == null) {
        return;
    }
    for (String resource : hadoopConfigResources.split(",")) {
        String fileName = resource.trim();
        // Skip blank entries produced by stray or trailing commas, e.g. "a.xml,,b.xml" —
        // the original code would resolve these to the config directory itself and throw
        // a spurious "file does not exist" error.
        if (fileName.isEmpty()) {
            continue;
        }
        // Construct the full path to the resource under the configured catalog config dir.
        String resourcePath = Config.external_catalog_config_dir + File.separator + fileName;
        File file = new File(resourcePath);

        // Only regular files are accepted as Hadoop configuration resources.
        if (file.exists() && file.isFile()) {
            conf.addResource(new Path(file.toURI()));
        } else {
            throw new IllegalArgumentException("Hadoop config resource file does not exist: " + resourcePath);
        }
    }
}

@Override
public Status downloadWithFileSize(String remoteFilePath, String localFilePath, long fileSize) {
if (LOG.isDebugEnabled()) {
Expand Down Expand Up @@ -221,7 +244,7 @@ public Status downloadWithFileSize(String remoteFilePath, String localFilePath,
* @throws IOException when read data error.
*/
private static ByteBuffer readStreamBuffer(FSDataInputStream fsDataInputStream, long readOffset, long length)
throws IOException {
throws IOException {
synchronized (fsDataInputStream) {
long currentStreamOffset;
try {
Expand Down

0 comments on commit 816ce10

Please sign in to comment.