Skip to content

Commit

Permalink
[apache#5973] feat(hadoop-catalog): Support credential when using fil…
Browse files Browse the repository at this point in the history
…eset catalog with cloud storage (apache#5974)

### What changes were proposed in this pull request?

Support dynamic credentials when accessing filesets stored in cloud storage.

### Why are the changes needed?

Static keys are not very safe, so we need to support dynamic credentials instead.

Fix: apache#5973 

### Does this PR introduce _any_ user-facing change?

N/A

### How was this patch tested?

ITs
  • Loading branch information
yuqi1129 authored and Abyss-lord committed Jan 10, 2025
1 parent 3b24ee2 commit 2f81bdc
Show file tree
Hide file tree
Showing 26 changed files with 1,765 additions and 136 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,88 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.gravitino.oss.fs;

import com.aliyun.oss.common.auth.BasicCredentials;
import com.aliyun.oss.common.auth.Credentials;
import com.aliyun.oss.common.auth.CredentialsProvider;
import com.aliyun.oss.common.auth.DefaultCredentials;
import java.net.URI;
import org.apache.gravitino.catalog.hadoop.fs.FileSystemUtils;
import org.apache.gravitino.catalog.hadoop.fs.GravitinoFileSystemCredentialsProvider;
import org.apache.gravitino.credential.Credential;
import org.apache.gravitino.credential.OSSSecretKeyCredential;
import org.apache.gravitino.credential.OSSTokenCredential;
import org.apache.hadoop.conf.Configuration;

/**
 * An Aliyun OSS {@link CredentialsProvider} that obtains its credentials from a {@link
 * GravitinoFileSystemCredentialsProvider} and caches them until they are due for renewal.
 *
 * <p>Cached credentials are renewed once {@link #EXPIRATION_TIME_FACTOR} of their remaining
 * lifetime (measured at fetch time) has elapsed. {@link #getCredentials()} may be called from
 * several threads; only one of them performs the refresh.
 */
public class OSSCredentialsProvider implements CredentialsProvider {

  /** Fraction of a credential's remaining lifetime after which it is proactively refreshed. */
  private static final double EXPIRATION_TIME_FACTOR = 0.5D;

  private final GravitinoFileSystemCredentialsProvider gravitinoFileSystemCredentialsProvider;

  // Both fields are read outside the lock in getCredentials(), hence volatile.
  private volatile Credentials basicCredentials;
  private volatile long expirationTime = Long.MAX_VALUE;

  /**
   * Creates the provider.
   *
   * @param uri The file system URI; not used by this implementation.
   * @param conf The Hadoop configuration used to look up the Gravitino credentials provider.
   */
  public OSSCredentialsProvider(URI uri, Configuration conf) {
    this.gravitinoFileSystemCredentialsProvider = FileSystemUtils.getGvfsCredentialProvider(conf);
  }

  /** Intentionally a no-op: credentials always come from the Gravitino credentials provider. */
  @Override
  public void setCredentials(Credentials credentials) {}

  /**
   * Returns the cached credentials, refreshing them first if none have been fetched yet or the
   * refresh deadline has passed.
   *
   * @return The current OSS credentials.
   * @throws RuntimeException If no suitable OSS credential is available.
   */
  @Override
  public Credentials getCredentials() {
    if (needsRefresh()) {
      synchronized (this) {
        // Re-check under the lock so that threads which queued up behind a refresh do not
        // each trigger another round trip to fetch credentials.
        if (needsRefresh()) {
          refresh();
        }
      }
    }

    return basicCredentials;
  }

  /** Tells whether credentials are missing or past their refresh deadline. */
  private boolean needsRefresh() {
    return basicCredentials == null || System.currentTimeMillis() >= expirationTime;
  }

  /** Fetches fresh credentials and recomputes the refresh deadline. Callers hold the lock. */
  private void refresh() {
    Credential[] gravitinoCredentials = gravitinoFileSystemCredentialsProvider.getCredentials();
    Credential credential = OSSUtils.getSuitableCredential(gravitinoCredentials);
    if (credential == null) {
      throw new RuntimeException("No suitable credential for OSS found...");
    }

    // Publish the new credentials before the new deadline: a concurrent reader that observes
    // the new credentials with the stale deadline merely re-checks under the lock.
    if (credential instanceof OSSSecretKeyCredential) {
      OSSSecretKeyCredential ossSecretKeyCredential = (OSSSecretKeyCredential) credential;
      basicCredentials =
          new DefaultCredentials(
              ossSecretKeyCredential.accessKeyId(), ossSecretKeyCredential.secretAccessKey());
    } else if (credential instanceof OSSTokenCredential) {
      OSSTokenCredential ossTokenCredential = (OSSTokenCredential) credential;
      basicCredentials =
          new BasicCredentials(
              ossTokenCredential.accessKeyId(),
              ossTokenCredential.secretAccessKey(),
              ossTokenCredential.securityToken());
    }

    long expireTimeInMs = credential.expireTimeInMs();
    if (expireTimeInMs > 0) {
      long now = System.currentTimeMillis();
      expirationTime = now + (long) ((expireTimeInMs - now) * EXPIRATION_TIME_FACTOR);
    } else {
      // A non-positive expire time is treated as "no expiry", matching the field's initial
      // value. Reset it so a deadline left over from an earlier credential cannot force a
      // refresh on every call.
      expirationTime = Long.MAX_VALUE;
    }
  }
}
Original file line number Diff line number Diff line change
Expand Up @@ -20,18 +20,23 @@

import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Maps;
import java.io.IOException;
import java.util.Map;
import org.apache.gravitino.catalog.hadoop.fs.FileSystemProvider;
import org.apache.gravitino.catalog.hadoop.fs.FileSystemUtils;
import org.apache.gravitino.catalog.hadoop.fs.SupportsCredentialVending;
import org.apache.gravitino.credential.Credential;
import org.apache.gravitino.credential.OSSSecretKeyCredential;
import org.apache.gravitino.credential.OSSTokenCredential;
import org.apache.gravitino.storage.OSSProperties;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.aliyun.oss.AliyunOSSFileSystem;
import org.apache.hadoop.fs.aliyun.oss.Constants;

public class OSSFileSystemProvider implements FileSystemProvider {
public class OSSFileSystemProvider implements FileSystemProvider, SupportsCredentialVending {

private static final String OSS_FILESYSTEM_IMPL = "fs.oss.impl";

Expand Down Expand Up @@ -61,9 +66,22 @@ public FileSystem getFileSystem(Path path, Map<String, String> config) throws IO
}

hadoopConfMap.forEach(configuration::set);

return AliyunOSSFileSystem.newInstance(path.toUri(), configuration);
}

/**
 * Builds the Hadoop configuration entries needed to use the given credentials with OSS.
 *
 * @param credentials The candidate credentials.
 * @return A mutable map that registers {@link OSSCredentialsProvider} as the credentials
 *     provider when a suitable OSS credential is present; an empty map otherwise.
 */
@Override
public Map<String, String> getFileSystemCredentialConf(Credential[] credentials) {
  Credential selected = OSSUtils.getSuitableCredential(credentials);
  Map<String, String> credentialConf = Maps.newHashMap();

  boolean isStaticKey = selected instanceof OSSSecretKeyCredential;
  boolean isToken = selected instanceof OSSTokenCredential;
  if (!isStaticKey && !isToken) {
    // Nothing usable for OSS: leave the configuration untouched.
    return credentialConf;
  }

  String providerClassName = OSSCredentialsProvider.class.getCanonicalName();
  credentialConf.put(Constants.CREDENTIALS_PROVIDER_KEY, providerClassName);
  return credentialConf;
}

@Override
public String scheme() {
return "oss";
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.gravitino.oss.fs;

import org.apache.gravitino.credential.Credential;
import org.apache.gravitino.credential.OSSSecretKeyCredential;
import org.apache.gravitino.credential.OSSTokenCredential;

/** Helper methods for selecting OSS credentials. */
public class OSSUtils {

  /**
   * Get the credential from the credential array. A dynamic (token) credential is preferred; if
   * none is present, the first static (secret key) credential is used.
   *
   * @param credentials The credential array. May be null, which is treated as empty.
   * @return A credential. Null if not found.
   */
  static Credential getSuitableCredential(Credential[] credentials) {
    if (credentials == null) {
      // Honor the "null if not found" contract instead of failing with an NPE.
      return null;
    }

    Credential staticCredential = null;
    for (Credential credential : credentials) {
      if (credential instanceof OSSTokenCredential) {
        // A dynamic credential always wins, wherever it sits in the array.
        return credential;
      }
      if (staticCredential == null && credential instanceof OSSSecretKeyCredential) {
        // Remember the first static credential as the fallback.
        staticCredential = credential;
      }
    }

    return staticCredential;
  }
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,88 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.gravitino.s3.fs;

import com.amazonaws.auth.AWSCredentials;
import com.amazonaws.auth.AWSCredentialsProvider;
import com.amazonaws.auth.BasicAWSCredentials;
import com.amazonaws.auth.BasicSessionCredentials;
import java.net.URI;
import org.apache.gravitino.catalog.hadoop.fs.FileSystemUtils;
import org.apache.gravitino.catalog.hadoop.fs.GravitinoFileSystemCredentialsProvider;
import org.apache.gravitino.credential.Credential;
import org.apache.gravitino.credential.S3SecretKeyCredential;
import org.apache.gravitino.credential.S3TokenCredential;
import org.apache.hadoop.conf.Configuration;

/**
 * An {@link AWSCredentialsProvider} that obtains its credentials from a {@link
 * GravitinoFileSystemCredentialsProvider} and caches them until they are due for renewal.
 *
 * <p>Cached credentials are renewed once {@link #EXPIRATION_TIME_FACTOR} of their remaining
 * lifetime (measured at fetch time) has elapsed. {@link #getCredentials()} may be called from
 * several threads; only one of them performs the refresh.
 */
public class S3CredentialsProvider implements AWSCredentialsProvider {

  /** Fraction of a credential's remaining lifetime after which it is proactively refreshed. */
  private static final double EXPIRATION_TIME_FACTOR = 0.5D;

  private final GravitinoFileSystemCredentialsProvider gravitinoFileSystemCredentialsProvider;

  // Both fields are read outside the lock in getCredentials(), hence volatile.
  private volatile AWSCredentials basicSessionCredentials;
  private volatile long expirationTime = Long.MAX_VALUE;

  /**
   * Creates the provider.
   *
   * @param uri The file system URI; not used by this implementation.
   * @param conf The Hadoop configuration used to look up the Gravitino credentials provider.
   */
  public S3CredentialsProvider(final URI uri, final Configuration conf) {
    this.gravitinoFileSystemCredentialsProvider = FileSystemUtils.getGvfsCredentialProvider(conf);
  }

  /**
   * Returns the cached credentials, refreshing them first if none have been fetched yet or the
   * refresh deadline has passed.
   *
   * @return The current AWS credentials.
   * @throws RuntimeException If no suitable S3 credential is available.
   */
  @Override
  public AWSCredentials getCredentials() {
    // Refresh credentials if they are null or about to expire.
    if (needsRefresh()) {
      synchronized (this) {
        // Re-check under the lock so that threads which queued up behind a refresh do not
        // each trigger another round trip to fetch credentials.
        if (needsRefresh()) {
          refresh();
        }
      }
    }

    return basicSessionCredentials;
  }

  /**
   * Fetches fresh credentials and recomputes the refresh deadline.
   *
   * <p>Synchronized because this method is public and may be invoked directly, not only from
   * {@link #getCredentials()}; the monitor is reentrant, so the internal call is unaffected.
   *
   * @throws RuntimeException If no suitable S3 credential is available.
   */
  @Override
  public synchronized void refresh() {
    Credential[] gravitinoCredentials = gravitinoFileSystemCredentialsProvider.getCredentials();
    Credential credential = S3Utils.getSuitableCredential(gravitinoCredentials);

    if (credential == null) {
      throw new RuntimeException("No suitable credential for S3 found...");
    }

    // Publish the new credentials before the new deadline: a concurrent reader that observes
    // the new credentials with the stale deadline merely re-checks under the lock.
    if (credential instanceof S3SecretKeyCredential) {
      S3SecretKeyCredential s3SecretKeyCredential = (S3SecretKeyCredential) credential;
      basicSessionCredentials =
          new BasicAWSCredentials(
              s3SecretKeyCredential.accessKeyId(), s3SecretKeyCredential.secretAccessKey());
    } else if (credential instanceof S3TokenCredential) {
      S3TokenCredential s3TokenCredential = (S3TokenCredential) credential;
      basicSessionCredentials =
          new BasicSessionCredentials(
              s3TokenCredential.accessKeyId(),
              s3TokenCredential.secretAccessKey(),
              s3TokenCredential.sessionToken());
    }

    long expireTimeInMs = credential.expireTimeInMs();
    if (expireTimeInMs > 0) {
      long now = System.currentTimeMillis();
      expirationTime = now + (long) ((expireTimeInMs - now) * EXPIRATION_TIME_FACTOR);
    } else {
      // A non-positive expire time is treated as "no expiry", matching the field's initial
      // value. Reset it so a deadline left over from an earlier credential cannot force a
      // refresh on every call.
      expirationTime = Long.MAX_VALUE;
    }
  }

  /** Tells whether credentials are missing or past their refresh deadline. */
  private boolean needsRefresh() {
    return basicSessionCredentials == null || System.currentTimeMillis() >= expirationTime;
  }
}
Original file line number Diff line number Diff line change
Expand Up @@ -25,11 +25,16 @@
import com.google.common.base.Splitter;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import java.io.IOException;
import java.util.List;
import java.util.Map;
import org.apache.gravitino.catalog.hadoop.fs.FileSystemProvider;
import org.apache.gravitino.catalog.hadoop.fs.FileSystemUtils;
import org.apache.gravitino.catalog.hadoop.fs.SupportsCredentialVending;
import org.apache.gravitino.credential.Credential;
import org.apache.gravitino.credential.S3SecretKeyCredential;
import org.apache.gravitino.credential.S3TokenCredential;
import org.apache.gravitino.storage.S3Properties;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
Expand All @@ -39,9 +44,9 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class S3FileSystemProvider implements FileSystemProvider {
public class S3FileSystemProvider implements FileSystemProvider, SupportsCredentialVending {

private static final Logger LOGGER = LoggerFactory.getLogger(S3FileSystemProvider.class);
private static final Logger LOG = LoggerFactory.getLogger(S3FileSystemProvider.class);

@VisibleForTesting
public static final Map<String, String> GRAVITINO_KEY_TO_S3_HADOOP_KEY =
Expand All @@ -61,18 +66,29 @@ public FileSystem getFileSystem(Path path, Map<String, String> config) throws IO
Map<String, String> hadoopConfMap =
FileSystemUtils.toHadoopConfigMap(config, GRAVITINO_KEY_TO_S3_HADOOP_KEY);

hadoopConfMap.forEach(configuration::set);
if (!hadoopConfMap.containsKey(S3_CREDENTIAL_KEY)) {
hadoopConfMap.put(S3_CREDENTIAL_KEY, S3_SIMPLE_CREDENTIAL);
configuration.set(S3_CREDENTIAL_KEY, S3_SIMPLE_CREDENTIAL);
}

hadoopConfMap.forEach(configuration::set);

// Hadoop-aws 2 does not support IAMInstanceCredentialsProvider
checkAndSetCredentialProvider(configuration);

return S3AFileSystem.newInstance(path.toUri(), configuration);
}

/**
 * Builds the Hadoop configuration entries needed to use the given credentials with S3.
 *
 * @param credentials The candidate credentials.
 * @return A mutable map that registers {@link S3CredentialsProvider} as the AWS credentials
 *     provider when a suitable S3 credential is present; an empty map otherwise.
 */
@Override
public Map<String, String> getFileSystemCredentialConf(Credential[] credentials) {
  Credential selected = S3Utils.getSuitableCredential(credentials);
  Map<String, String> credentialConf = Maps.newHashMap();

  boolean isStaticKey = selected instanceof S3SecretKeyCredential;
  boolean isToken = selected instanceof S3TokenCredential;
  if (!isStaticKey && !isToken) {
    // Nothing usable for S3: leave the configuration untouched.
    return credentialConf;
  }

  String providerClassName = S3CredentialsProvider.class.getCanonicalName();
  credentialConf.put(Constants.AWS_CREDENTIALS_PROVIDER, providerClassName);
  return credentialConf;
}

private void checkAndSetCredentialProvider(Configuration configuration) {
String provides = configuration.get(S3_CREDENTIAL_KEY);
if (provides == null) {
Expand All @@ -91,12 +107,12 @@ private void checkAndSetCredentialProvider(Configuration configuration) {
if (AWSCredentialsProvider.class.isAssignableFrom(c)) {
validProviders.add(provider);
} else {
LOGGER.warn(
LOG.warn(
"Credential provider {} is not a subclass of AWSCredentialsProvider, skipping",
provider);
}
} catch (Exception e) {
LOGGER.warn(
LOG.warn(
"Credential provider {} not found in the Hadoop runtime, falling back to default",
provider);
configuration.set(S3_CREDENTIAL_KEY, S3_SIMPLE_CREDENTIAL);
Expand Down
Loading

0 comments on commit 2f81bdc

Please sign in to comment.