Support loading HDFS and Hive XML config files during catalog creation
Add support for `hive.config.resources` and `hadoop.config.resources` parameters so that the paths of Hive and Hadoop XML configuration files can be specified when creating a catalog. The listed files are resolved against `Config.external_catalog_config_dir` and loaded into the corresponding Hive or Hadoop configuration.
CalvinKirs committed Oct 14, 2024
1 parent d3f63d5 commit 1abc5a5
Showing 6 changed files with 171 additions and 8 deletions.
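For context, the new parameters are passed as catalog properties at creation time. A hypothetical example (catalog name, metastore URI, and file names are illustrative, not from this commit); the listed files are resolved against Config.external_catalog_config_dir, which defaults to ${DORIS_HOME}/conf/catalogs/:

CREATE CATALOG hive_prod PROPERTIES (
    'type' = 'hms',
    'hive.metastore.uris' = 'thrift://metastore-host:9083',
    'hive.config.resources' = 'hive-site.xml',
    'hadoop.config.resources' = 'core-site.xml,hdfs-site.xml'
);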
Config.java
@@ -2161,6 +2161,8 @@ public class Config extends ConfigBase {
@ConfField(masterOnly = true)
public static boolean enable_hms_events_incremental_sync = false;

public static String external_catalog_config_dir = EnvUtils.getDorisHome() + "/conf/catalogs/";

/**
* If set to true, doris will try to parse the ddl of a hive view and try to execute the query
* otherwise it will throw an AnalysisException.
CatalogProperty.java
@@ -22,12 +22,15 @@
import org.apache.doris.common.io.Text;
import org.apache.doris.common.io.Writable;
import org.apache.doris.datasource.property.PropertyConverter;
import org.apache.doris.fs.HdfsUtil;
import org.apache.doris.fs.remote.dfs.DFSFileSystem;
import org.apache.doris.persist.gson.GsonUtils;

import com.google.common.base.Strings;
import com.google.common.collect.Maps;
import com.google.gson.annotations.SerializedName;
import lombok.Data;
import org.apache.hadoop.conf.Configuration;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;

@@ -50,14 +53,28 @@ public class CatalogProperty implements Writable {
@SerializedName(value = "properties")
private Map<String, String> properties;

private Map<String, String> fileCatalogConfigProperties;

private volatile Resource catalogResource = null;

public CatalogProperty(String resource, Map<String, String> properties) {
this.resource = Strings.nullToEmpty(resource);
this.properties = properties;
if (this.properties == null) {
this.properties = Maps.newConcurrentMap();
this.fileCatalogConfigProperties = new HashMap<>();
}
checkNeedReloadFileProperties(this.properties);
}

private void checkNeedReloadFileProperties(Map<String, String> properties) {
if (!properties.isEmpty() && properties.containsKey(DFSFileSystem.HADOOP_CONFIG_RESOURCES)) {
Configuration configuration = HdfsUtil.loadConfigurationFromHadoopConfDir(
this.properties.get(DFSFileSystem.HADOOP_CONFIG_RESOURCES));
this.fileCatalogConfigProperties = configuration.getValByRegex(".*");
return;
}
this.fileCatalogConfigProperties = new HashMap<>();
}

private Resource catalogResource() {
@@ -98,25 +115,36 @@ public Map<String, String> getProperties() {

public void modifyCatalogProps(Map<String, String> props) {
properties.putAll(PropertyConverter.convertToMetaProperties(props));
checkNeedReloadFileProperties(properties);
}

public void rollBackCatalogProps(Map<String, String> props) {
properties.clear();
properties = new HashMap<>(props);
checkNeedReloadFileProperties(properties);
}

public Map<String, String> getHadoopProperties() {
// Copy so callers do not mutate the cached file-based config
Map<String, String> hadoopProperties = new HashMap<>(fileCatalogConfigProperties);
hadoopProperties.putAll(getProperties());
hadoopProperties.putAll(PropertyConverter.convertToHadoopFSProperties(getProperties()));
return hadoopProperties;
}

public void addProperty(String key, String val) {
this.properties.put(key, val);
if (key.equals(DFSFileSystem.HADOOP_CONFIG_RESOURCES)) {
Configuration configuration = HdfsUtil.loadConfigurationFromHadoopConfDir(
this.properties.get(DFSFileSystem.HADOOP_CONFIG_RESOURCES));
this.fileCatalogConfigProperties = configuration.getValByRegex(".*");
}
}

public void deleteProperty(String key) {
this.properties.remove(key);
if (key.equals(DFSFileSystem.HADOOP_CONFIG_RESOURCES)) {
this.fileCatalogConfigProperties = new HashMap<>();
}
}

@Override
@@ -126,6 +154,14 @@ public void write(DataOutput out) throws IOException {

public static CatalogProperty read(DataInput in) throws IOException {
String json = Text.readString(in);
CatalogProperty catalogProperty = GsonUtils.GSON.fromJson(json, CatalogProperty.class);
catalogProperty.fileCatalogConfigProperties = new HashMap<>();
if (catalogProperty.properties != null && !catalogProperty.properties.isEmpty()
&& catalogProperty.properties.containsKey(DFSFileSystem.HADOOP_CONFIG_RESOURCES)) {
Configuration configuration = HdfsUtil.loadConfigurationFromHadoopConfDir(
catalogProperty.properties.get(DFSFileSystem.HADOOP_CONFIG_RESOURCES));
catalogProperty.fileCatalogConfigProperties = configuration.getValByRegex(".*");
}
return catalogProperty;
}
}
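The serialization hook above rebuilds fileCatalogConfigProperties from the persisted properties. A minimal standalone sketch of the underlying mechanism — Configuration.getValByRegex(".*") snapshots every property of a Hadoop Configuration into a Map (the XML path is illustrative):

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;

import java.util.Map;

public class ConfSnapshotDemo {
    public static void main(String[] args) {
        // Start from an empty Configuration so only the added XML files contribute values.
        Configuration conf = new Configuration(false);
        conf.addResource(new Path("file:///opt/doris/conf/catalogs/hdfs-site.xml")); // illustrative path
        // ".*" matches every key, flattening the whole Configuration into a Map.
        Map<String, String> props = conf.getValByRegex(".*");
        props.forEach((k, v) -> System.out.println(k + " = " + v));
    }
}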
HMSExternalCatalog.java
@@ -53,11 +53,13 @@
import com.google.common.collect.Maps;
import lombok.Getter;
import org.apache.commons.lang3.math.NumberUtils;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.iceberg.hive.HiveCatalog;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;

import java.io.File;
import java.util.List;
import java.util.Map;
import java.util.Objects;
@@ -185,6 +187,7 @@ protected void initLocalObjectsImpl() {
jdbcClientConfig.setDriverClass(catalogProperty.getOrDefault("driver_class", ""));
} else {
hiveConf = new HiveConf();
initHiveConfIfNeeded(hiveConf);
for (Map.Entry<String, String> kv : catalogProperty.getHadoopProperties().entrySet()) {
hiveConf.set(kv.getKey(), kv.getValue());
}
@@ -203,6 +206,26 @@ protected void initLocalObjectsImpl() {
icebergHiveCatalog = IcebergUtils.createIcebergHiveCatalog(this, getName());
}

private void initHiveConfIfNeeded(HiveConf hiveConf) {
String resourcesPath = catalogProperty.getOrDefault(HMSProperties.HIVE_CONFIG_RESOURCES, null);
if (Strings.isNullOrEmpty(resourcesPath)) {
return;
}
for (String resource : resourcesPath.split(",")) {
// Construct the full path to the resource
String resourcePath = Config.external_catalog_config_dir + File.separator + resource.trim();
File file = new File(resourcePath); // Create a File object for the resource path

// Check if the file exists and is a regular file
if (file.exists() && file.isFile()) {
hiveConf.addResource(new Path(file.toURI())); // Add the resource to the configuration
} else {
// Handle the case where the file does not exist
LOG.warn("Hive configuration resource file {} does not exist", resourcePath);
}
}
}

@Override
public void onRefresh(boolean invalidCache) {
super.onRefresh(invalidCache);
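Within initLocalObjectsImpl, the XML resources are added to the HiveConf before the catalog's explicit properties are applied. A minimal sketch of that precedence, assuming HiveConf is on the classpath; paths and values are illustrative:

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hive.conf.HiveConf;

public class HiveConfPrecedenceDemo {
    public static void main(String[] args) {
        HiveConf hiveConf = new HiveConf();
        // XML resources are added first, as initHiveConfIfNeeded does ...
        hiveConf.addResource(new Path("file:///opt/doris/conf/catalogs/hive-site.xml")); // illustrative
        // ... then explicit catalog properties are set on top and win on conflict.
        hiveConf.set("hive.metastore.uris", "thrift://metastore-host:9083");
        System.out.println(hiveConf.get("hive.metastore.uris"));
    }
}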
HMSProperties.java
@@ -28,6 +28,9 @@ public class HMSProperties {
// required
public static final String HIVE_METASTORE_URIS = "hive.metastore.uris";
public static final List<String> REQUIRED_FIELDS = Collections.singletonList(HMSProperties.HIVE_METASTORE_URIS);
public static final String ENABLE_HMS_EVENTS_INCREMENTAL_SYNC = "hive.enable_hms_events_incremental_sync";
public static final String HMS_EVENTIS_BATCH_SIZE_PER_RPC = "hive.hms_events_batch_size_per_rpc";

public static final String HIVE_CONFIG_RESOURCES = "hive.config.resources";

}
71 changes: 71 additions & 0 deletions fe/fe-core/src/main/java/org/apache/doris/fs/HdfsUtil.java
@@ -0,0 +1,71 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

package org.apache.doris.fs;

import org.apache.doris.common.Config;

import org.apache.commons.lang3.StringUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;

import java.io.File;

public class HdfsUtil {

/**
* Loads the Hadoop configuration files from the specified directory.
* <p>
* This method reads a comma-separated list of resource files from the given
* `resourcesPath`, constructs their absolute paths based on the `Config.external_catalog_config_dir`,
* and then loads these files into a Hadoop `Configuration` object.
*
* @param resourcesPath The comma-separated list of Hadoop configuration resource files to load.
* This must not be null or empty.
* @return The Hadoop `Configuration` object with the loaded configuration files.
* @throws IllegalArgumentException If the provided `resourcesPath` is blank, or if any of the specified
* configuration files do not exist or are not regular files.
*/
public static Configuration loadConfigurationFromHadoopConfDir(String resourcesPath) {
// Check if the provided resourcesPath is blank and throw an exception if so.
if (StringUtils.isBlank(resourcesPath)) {
throw new IllegalArgumentException("Hadoop config resource path is empty");
}

// Create a new Hadoop Configuration object without loading default resources.
Configuration conf = new Configuration(false);

// Iterate over the comma-separated list of resource files.
for (String resource : resourcesPath.split(",")) {
// Construct the full path to the resource file.
String resourcePath = Config.external_catalog_config_dir + File.separator + resource.trim();
File file = new File(resourcePath);

// Check if the file exists and is a regular file; if not, throw an exception.
if (file.exists() && file.isFile()) {
// Add the resource file to the Hadoop Configuration object.
conf.addResource(new Path(file.toURI()));
} else {
// Throw an exception if the file does not exist or is not a regular file.
throw new IllegalArgumentException("Hadoop config resource file does not exist: " + resourcePath);
}
}
// Return the populated Hadoop Configuration object.
return conf;
}
}
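A hypothetical call site for the new helper (file names are illustrative; each must exist under Config.external_catalog_config_dir or the call throws IllegalArgumentException):

import org.apache.doris.fs.HdfsUtil;
import org.apache.hadoop.conf.Configuration;

public class HdfsUtilDemo {
    public static void main(String[] args) {
        // Loads <external_catalog_config_dir>/core-site.xml and hdfs-site.xml into a fresh Configuration.
        Configuration conf = HdfsUtil.loadConfigurationFromHadoopConfDir("core-site.xml,hdfs-site.xml");
        System.out.println(conf.get("fs.defaultFS")); // value comes from the loaded XML files
    }
}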

DFSFileSystem.java
@@ -19,10 +19,12 @@

import org.apache.doris.analysis.StorageBackend;
import org.apache.doris.backup.Status;
import org.apache.doris.common.Config;
import org.apache.doris.common.UserException;
import org.apache.doris.common.security.authentication.AuthenticationConfig;
import org.apache.doris.common.security.authentication.HadoopAuthenticator;
import org.apache.doris.common.util.URI;
import org.apache.doris.fs.HdfsUtil;
import org.apache.doris.fs.operations.HDFSFileOperations;
import org.apache.doris.fs.operations.HDFSOpParams;
import org.apache.doris.fs.operations.OpParams;
@@ -58,7 +60,7 @@
import java.util.Map;

public class DFSFileSystem extends RemoteFileSystem {

public static final String HADOOP_CONFIG_RESOURCES = "hadoop.config.resources";
public static final String PROP_ALLOW_FALLBACK_TO_SIMPLE_AUTH = "ipc.client.fallback-to-simple-auth-allowed";
private static final Logger LOG = LogManager.getLogger(DFSFileSystem.class);
private HDFSFileOperations operations = null;
@@ -85,7 +87,12 @@ public FileSystem nativeFileSystem(String remotePath) throws UserException {
throw new UserException("FileSystem is closed.");
}
if (dfsFileSystem == null) {
Configuration conf;
if (properties.containsKey(HADOOP_CONFIG_RESOURCES)) {
conf = HdfsUtil.loadConfigurationFromHadoopConfDir(properties.get(HADOOP_CONFIG_RESOURCES));
} else {
conf = getHdfsConf(ifNotSetFallbackToSimpleAuth());
}
for (Map.Entry<String, String> propEntry : properties.entrySet()) {
conf.set(propEntry.getKey(), propEntry.getValue());
}
@@ -111,7 +118,8 @@ public FileSystem nativeFileSystem(String remotePath) throws UserException {
}

protected RemoteIterator<LocatedFileStatus> getLocatedFiles(boolean recursive,
FileSystem fileSystem, Path locatedPath)
throws IOException {
return authenticator.doAs(() -> fileSystem.listFiles(locatedPath, recursive));
}

@@ -128,6 +136,26 @@ public static Configuration getHdfsConf(boolean fallbackToSimpleAuth) {
return hdfsConf;
}

private void loadConfigFromResources(Configuration conf) {
// Get the Hadoop config resources from properties
String hadoopConfigResources = properties.get(HADOOP_CONFIG_RESOURCES);
if (hadoopConfigResources != null) {
for (String resource : hadoopConfigResources.split(",")) {
// Construct the full path to the resource
String resourcePath = Config.external_catalog_config_dir + File.separator + resource.trim();
File file = new File(resourcePath); // Create a File object for the resource path

// Check if the file exists and is a regular file
if (file.exists() && file.isFile()) {
conf.addResource(new Path(file.toURI())); // Add the resource to the configuration
} else {
// Fail fast when the file is missing or is not a regular file
throw new IllegalArgumentException("Hadoop config resource file does not exist: " + resourcePath);
}
}
}
}

@Override
public Status downloadWithFileSize(String remoteFilePath, String localFilePath, long fileSize) {
if (LOG.isDebugEnabled()) {
@@ -221,7 +249,7 @@ public Status downloadWithFileSize(String remoteFilePath, String localFilePath,
* @throws IOException when read data error.
*/
private static ByteBuffer readStreamBuffer(FSDataInputStream fsDataInputStream, long readOffset, long length)
throws IOException {
synchronized (fsDataInputStream) {
long currentStreamOffset;
try {
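To summarize the new resolution order in nativeFileSystem: when hadoop.config.resources is present, the base Configuration comes from the XML files, and explicit properties are applied afterwards. A minimal sketch under those assumptions (the property map and values are illustrative):

import org.apache.doris.fs.HdfsUtil;
import org.apache.doris.fs.remote.dfs.DFSFileSystem;
import org.apache.hadoop.conf.Configuration;

import java.util.HashMap;
import java.util.Map;

public class ConfResolutionDemo {
    public static void main(String[] args) {
        Map<String, String> properties = new HashMap<>();
        properties.put(DFSFileSystem.HADOOP_CONFIG_RESOURCES, "core-site.xml,hdfs-site.xml"); // illustrative
        properties.put("dfs.client.use.datanode.hostname", "true");

        // Base conf from the XML files when the key is set, otherwise a default Configuration;
        // explicit properties are applied on top and override file values.
        Configuration conf = properties.containsKey(DFSFileSystem.HADOOP_CONFIG_RESOURCES)
                ? HdfsUtil.loadConfigurationFromHadoopConfDir(properties.get(DFSFileSystem.HADOOP_CONFIG_RESOURCES))
                : new Configuration();
        properties.forEach(conf::set);
        System.out.println(conf.get("dfs.client.use.datanode.hostname")); // -> true
    }
}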
