diff --git a/fe/fe-common/src/main/java/org/apache/doris/common/Config.java b/fe/fe-common/src/main/java/org/apache/doris/common/Config.java
index 861968ffb31392..614cfca37455a4 100644
--- a/fe/fe-common/src/main/java/org/apache/doris/common/Config.java
+++ b/fe/fe-common/src/main/java/org/apache/doris/common/Config.java
@@ -2161,6 +2161,8 @@ public class Config extends ConfigBase {
     @ConfField(masterOnly = true)
     public static boolean enable_hms_events_incremental_sync = false;
 
+    public static String external_catalog_config_dir = EnvUtils.getDorisHome() + "/conf/catalogs/";
+
     /**
      * If set to true, doris will try to parse the ddl of a hive view and try to execute the query
      * otherwise it will throw an AnalysisException.
diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/CatalogProperty.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/CatalogProperty.java
index a54a28d59dd68a..a8838f65c6b646 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/datasource/CatalogProperty.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/CatalogProperty.java
@@ -22,12 +22,15 @@
 import org.apache.doris.common.io.Text;
 import org.apache.doris.common.io.Writable;
 import org.apache.doris.datasource.property.PropertyConverter;
+import org.apache.doris.fs.HdfsUtil;
+import org.apache.doris.fs.remote.dfs.DFSFileSystem;
 import org.apache.doris.persist.gson.GsonUtils;
 
 import com.google.common.base.Strings;
 import com.google.common.collect.Maps;
 import com.google.gson.annotations.SerializedName;
 import lombok.Data;
+import org.apache.hadoop.conf.Configuration;
 import org.apache.logging.log4j.LogManager;
 import org.apache.logging.log4j.Logger;
 
@@ -50,6 +53,8 @@ public class CatalogProperty implements Writable {
     @SerializedName(value = "properties")
     private Map<String, String> properties;
 
+    private Map<String, String> fileCatalogConfigProperties;
+
     private volatile Resource catalogResource = null;
 
     public CatalogProperty(String resource, Map<String, String> properties) {
@@ -57,7 +62,19 @@ public CatalogProperty(String resource, Map<String, String> properties) {
         this.properties = properties;
         if (this.properties == null) {
             this.properties = Maps.newConcurrentMap();
+            this.fileCatalogConfigProperties = new HashMap<>();
+        }
+        checkNeedReloadFileProperties(this.properties);
+    }
+
+    private void checkNeedReloadFileProperties(Map<String, String> properties) {
+        if (!properties.isEmpty() && properties.containsKey(DFSFileSystem.HADOOP_CONFIG_RESOURCES)) {
+            Configuration configuration = HdfsUtil.loadConfigurationFromHadoopConfDir(
+                    properties.get(DFSFileSystem.HADOOP_CONFIG_RESOURCES));
+            this.fileCatalogConfigProperties = configuration.getValByRegex(".*");
+            return;
         }
+        this.fileCatalogConfigProperties = new HashMap<>();
     }
 
     private Resource catalogResource() {
@@ -98,25 +115,36 @@ public Map<String, String> getProperties() {
 
     public void modifyCatalogProps(Map<String, String> props) {
         properties.putAll(PropertyConverter.convertToMetaProperties(props));
+        checkNeedReloadFileProperties(properties);
     }
 
     public void rollBackCatalogProps(Map<String, String> props) {
         properties.clear();
         properties = new HashMap<>(props);
+        checkNeedReloadFileProperties(properties);
     }
 
     public Map<String, String> getHadoopProperties() {
-        Map<String, String> hadoopProperties = getProperties();
+        Map<String, String> hadoopProperties = new HashMap<>(fileCatalogConfigProperties); // copy, do not mutate the cache
+        hadoopProperties.putAll(getProperties());
         hadoopProperties.putAll(PropertyConverter.convertToHadoopFSProperties(getProperties()));
         return hadoopProperties;
     }
 
     public void addProperty(String key, String val) {
         this.properties.put(key, val);
+        if (key.equals(DFSFileSystem.HADOOP_CONFIG_RESOURCES)) {
+            Configuration configuration = HdfsUtil.loadConfigurationFromHadoopConfDir(
+                    this.properties.get(DFSFileSystem.HADOOP_CONFIG_RESOURCES));
+            this.fileCatalogConfigProperties = configuration.getValByRegex(".*");
+        }
     }
 
     public void deleteProperty(String key) {
         this.properties.remove(key);
+        if (key.equals(DFSFileSystem.HADOOP_CONFIG_RESOURCES)) {
+            this.fileCatalogConfigProperties = new HashMap<>();
+        }
     }
 
     @Override
@@ -126,6 +154,15 @@ public void write(DataOutput out) throws IOException {
     }
 
     public static CatalogProperty read(DataInput in) throws IOException {
         String json = Text.readString(in);
-        return GsonUtils.GSON.fromJson(json, CatalogProperty.class);
+        CatalogProperty catalogProperty = GsonUtils.GSON.fromJson(json, CatalogProperty.class);
+        // fileCatalogConfigProperties is not serialized, so rebuild it after deserialization.
+        catalogProperty.fileCatalogConfigProperties = new HashMap<>();
+        if (catalogProperty.properties != null && !catalogProperty.properties.isEmpty()
+                && catalogProperty.properties.containsKey(DFSFileSystem.HADOOP_CONFIG_RESOURCES)) {
+            Configuration configuration = HdfsUtil.loadConfigurationFromHadoopConfDir(
+                    catalogProperty.properties.get(DFSFileSystem.HADOOP_CONFIG_RESOURCES));
+            catalogProperty.fileCatalogConfigProperties = configuration.getValByRegex(".*");
+        }
+        return catalogProperty;
     }
 }
diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HMSExternalCatalog.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HMSExternalCatalog.java
index 20b9482041df02..540a7f067e389d 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HMSExternalCatalog.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HMSExternalCatalog.java
@@ -53,11 +53,13 @@
 import com.google.common.collect.Maps;
 import lombok.Getter;
 import org.apache.commons.lang3.math.NumberUtils;
+import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.iceberg.hive.HiveCatalog;
 import org.apache.logging.log4j.LogManager;
 import org.apache.logging.log4j.Logger;
 
+import java.io.File;
 import java.util.List;
 import java.util.Map;
 import java.util.Objects;
@@ -185,6 +187,7 @@ protected void initLocalObjectsImpl() {
             jdbcClientConfig.setDriverClass(catalogProperty.getOrDefault("driver_class", ""));
         } else {
             hiveConf = new HiveConf();
+            initHiveConfIfNeeded(hiveConf);
             for (Map.Entry<String, String> kv : catalogProperty.getHadoopProperties().entrySet()) {
                 hiveConf.set(kv.getKey(), kv.getValue());
             }
@@ -203,6 +206,26 @@ protected void initLocalObjectsImpl() {
         icebergHiveCatalog = IcebergUtils.createIcebergHiveCatalog(this, getName());
     }
 
+    private void initHiveConfIfNeeded(HiveConf hiveConf) {
+        String resourcesPath = catalogProperty.getOrDefault(HMSProperties.HIVE_CONFIG_RESOURCES, null);
+        if (Strings.isNullOrEmpty(resourcesPath)) {
+            return;
+        }
+        for (String resource : resourcesPath.split(",")) {
+            // Resolve the resource name against the external catalog config dir
+            String resourcePath = Config.external_catalog_config_dir + File.separator + resource.trim();
+            File file = new File(resourcePath);
+
+            // Load only files that exist and are regular files
+            if (file.exists() && file.isFile()) {
+                hiveConf.addResource(new Path(file.toURI()));
+            } else {
+                // A missing file is tolerated here: warn and continue
+                LOG.warn("Hive configuration resource file {} does not exist", resourcePath);
+            }
+        }
+    }
+
     @Override
     public void onRefresh(boolean invalidCache) {
         super.onRefresh(invalidCache);
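For context before the remaining files, here is a minimal, self-contained sketch of the resolution behavior that initHiveConfIfNeeded applies above. The directory value, file names, and class name are hypothetical examples for illustration, not values taken from this patch:

    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.hive.conf.HiveConf;

    import java.io.File;

    public class HiveConfResourceSketch {
        public static void main(String[] args) {
            // Hypothetical stand-ins for Config.external_catalog_config_dir and the
            // catalog property hive.config.resources = "hive-site.xml,core-site.xml".
            String configDir = "/opt/doris/fe/conf/catalogs";
            String resourcesPath = "hive-site.xml,core-site.xml";

            HiveConf hiveConf = new HiveConf();
            for (String resource : resourcesPath.split(",")) {
                File file = new File(configDir + File.separator + resource.trim());
                // Mirror the patch: a missing file is simply skipped (the patch logs a warning).
                if (file.exists() && file.isFile()) {
                    hiveConf.addResource(new Path(file.toURI()));
                }
            }
            // Values from the loaded XML files are now visible through hiveConf.
            System.out.println(hiveConf.get("hive.metastore.uris"));
        }
    }
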
diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/property/constants/HMSProperties.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/property/constants/HMSProperties.java
index 81baf042faed37..5e748a769d4cef 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/datasource/property/constants/HMSProperties.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/property/constants/HMSProperties.java
@@ -28,6 +28,9 @@ public class HMSProperties {
     // required
     public static final String HIVE_METASTORE_URIS = "hive.metastore.uris";
     public static final List<String> REQUIRED_FIELDS = Collections.singletonList(HMSProperties.HIVE_METASTORE_URIS);
-    public static final String ENABLE_HMS_EVENTS_INCREMENTAL_SYNC = "hive.enable_hms_events_incremental_sync";
-    public static final String HMS_EVENTIS_BATCH_SIZE_PER_RPC = "hive.hms_events_batch_size_per_rpc";
+    public static final String ENABLE_HMS_EVENTS_INCREMENTAL_SYNC = "hive.enable_hms_events_incremental_sync";
+    public static final String HMS_EVENTIS_BATCH_SIZE_PER_RPC = "hive.hms_events_batch_size_per_rpc";
+
+    public static final String HIVE_CONFIG_RESOURCES = "hive.config.resources";
+
 }
diff --git a/fe/fe-core/src/main/java/org/apache/doris/fs/HdfsUtil.java b/fe/fe-core/src/main/java/org/apache/doris/fs/HdfsUtil.java
new file mode 100644
index 00000000000000..b1ebb28987e77c
--- /dev/null
+++ b/fe/fe-core/src/main/java/org/apache/doris/fs/HdfsUtil.java
@@ -0,0 +1,71 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.fs;
+
+import org.apache.doris.common.Config;
+
+import org.apache.commons.lang3.StringUtils;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+
+import java.io.File;
+
+public class HdfsUtil {
+
+    /**
+     * Loads the Hadoop configuration files listed in {@code resourcesPath}.
+     * <p>
+     * This method reads a comma-separated list of resource file names from the given
+     * {@code resourcesPath}, resolves each of them against {@code Config.external_catalog_config_dir},
+     * and loads the resulting files into a Hadoop {@code Configuration} object.
+     *
+     * @param resourcesPath the comma-separated list of Hadoop configuration resource files to load;
+     *                      must not be null or empty
+     * @return the Hadoop {@code Configuration} object populated from the loaded files
+     * @throws IllegalArgumentException if {@code resourcesPath} is blank, or if any of the specified
+     *                                  configuration files does not exist or is not a regular file
+     */
+    public static Configuration loadConfigurationFromHadoopConfDir(String resourcesPath) {
+        // An empty resource list is a caller error.
+        if (StringUtils.isBlank(resourcesPath)) {
+            throw new IllegalArgumentException("Hadoop config resource path is empty");
+        }
+
+        // Create a new Hadoop Configuration object without loading default resources.
+        Configuration conf = new Configuration(false);
+
+        // Iterate over the comma-separated list of resource files.
+        for (String resource : resourcesPath.split(",")) {
+            // Resolve the resource name against the external catalog config directory.
+            String resourcePath = Config.external_catalog_config_dir + File.separator + resource.trim();
+            File file = new File(resourcePath);
+
+            // Only files that exist and are regular files can be loaded.
+            if (file.exists() && file.isFile()) {
+                // Add the resource file to the Hadoop Configuration object.
+                conf.addResource(new Path(file.toURI()));
+            } else {
+                // Fail fast so a misconfigured catalog is reported immediately.
+                throw new IllegalArgumentException("Hadoop config resource file does not exist: " + resourcePath);
+            }
+        }
+        // Return the populated Hadoop Configuration object.
+        return conf;
+    }
+}
+
diff --git a/fe/fe-core/src/main/java/org/apache/doris/fs/remote/dfs/DFSFileSystem.java b/fe/fe-core/src/main/java/org/apache/doris/fs/remote/dfs/DFSFileSystem.java
index 7034641a9fc128..6c4dc5944c3053 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/fs/remote/dfs/DFSFileSystem.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/fs/remote/dfs/DFSFileSystem.java
@@ -19,10 +19,12 @@
 import org.apache.doris.analysis.StorageBackend;
 import org.apache.doris.backup.Status;
+import org.apache.doris.common.Config;
 import org.apache.doris.common.UserException;
 import org.apache.doris.common.security.authentication.AuthenticationConfig;
 import org.apache.doris.common.security.authentication.HadoopAuthenticator;
 import org.apache.doris.common.util.URI;
+import org.apache.doris.fs.HdfsUtil;
 import org.apache.doris.fs.operations.HDFSFileOperations;
 import org.apache.doris.fs.operations.HDFSOpParams;
 import org.apache.doris.fs.operations.OpParams;
@@ -58,7 +60,7 @@
 import java.util.Map;
 
 public class DFSFileSystem extends RemoteFileSystem {
-
+    public static final String HADOOP_CONFIG_RESOURCES = "hadoop.config.resources";
     public static final String PROP_ALLOW_FALLBACK_TO_SIMPLE_AUTH = "ipc.client.fallback-to-simple-auth-allowed";
     private static final Logger LOG = LogManager.getLogger(DFSFileSystem.class);
     private HDFSFileOperations operations = null;
@@ -85,7 +87,12 @@ public FileSystem nativeFileSystem(String remotePath) throws UserException {
             throw new UserException("FileSystem is closed.");
         }
         if (dfsFileSystem == null) {
-            Configuration conf = getHdfsConf(ifNotSetFallbackToSimpleAuth());
+            Configuration conf;
+            if (properties.containsKey(HADOOP_CONFIG_RESOURCES)) {
+                conf = HdfsUtil.loadConfigurationFromHadoopConfDir(properties.get(HADOOP_CONFIG_RESOURCES));
+            } else {
+                conf = getHdfsConf(ifNotSetFallbackToSimpleAuth());
+            }
             for (Map.Entry<String, String> propEntry : properties.entrySet()) {
                 conf.set(propEntry.getKey(), propEntry.getValue());
             }
@@ -111,7 +118,8 @@ public FileSystem nativeFileSystem(String remotePath) throws UserException {
     }
 
     protected RemoteIterator<LocatedFileStatus> getLocatedFiles(boolean recursive,
-            FileSystem fileSystem, Path locatedPath) throws IOException {
+            FileSystem fileSystem, Path locatedPath)
+            throws IOException {
         return authenticator.doAs(() -> fileSystem.listFiles(locatedPath, recursive));
     }
 
@@ -128,6 +136,26 @@ public static Configuration getHdfsConf(boolean fallbackToSimpleAuth) {
         return hdfsConf;
     }
 
+    private void loadConfigFromResources(Configuration conf) {
+        // Get the Hadoop config resource list from the catalog properties
+        String hadoopConfigResources = properties.get(HADOOP_CONFIG_RESOURCES);
+        if (hadoopConfigResources != null) {
+            for (String resource : hadoopConfigResources.split(",")) {
+                // Resolve the resource name against the external catalog config dir
+                String resourcePath = Config.external_catalog_config_dir + File.separator + resource.trim();
+                File file = new File(resourcePath);
+
+                // Only files that exist and are regular files can be loaded
+                if (file.exists() && file.isFile()) {
+                    conf.addResource(new Path(file.toURI()));
+                } else {
+                    // Fail fast on a missing file so misconfiguration is reported immediately
+                    throw new IllegalArgumentException("Hadoop config resource file does not exist: " + resourcePath);
+                }
+            }
+        }
+    }
+
     @Override
     public Status downloadWithFileSize(String remoteFilePath, String localFilePath, long fileSize) {
         if (LOG.isDebugEnabled()) {
@@ -221,7 +249,7 @@ public Status downloadWithFileSize(String remoteFilePath, String localFilePath,
      * @throws IOException when read data error.
      */
     private static ByteBuffer readStreamBuffer(FSDataInputStream fsDataInputStream, long readOffset, long length)
-        throws IOException {
+            throws IOException {
         synchronized (fsDataInputStream) {
             long currentStreamOffset;
            try {
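
Finally, a hedged usage sketch of the new HdfsUtil entry point. The resource file names are examples and must exist under Config.external_catalog_config_dir; otherwise loadConfigurationFromHadoopConfDir throws IllegalArgumentException. Note that explicit catalog properties still take precedence, since nativeFileSystem() calls conf.set(...) for every catalog property after this load:

    import org.apache.doris.fs.HdfsUtil;

    import org.apache.hadoop.conf.Configuration;

    public class HdfsUtilUsageSketch {
        public static void main(String[] args) {
            // Example resource list, resolved against Config.external_catalog_config_dir.
            Configuration conf = HdfsUtil.loadConfigurationFromHadoopConfDir("core-site.xml,hdfs-site.xml");
            // The printed value comes from the loaded XML files, if present.
            System.out.println(conf.get("fs.defaultFS"));
        }
    }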