Skip to content

Commit

Permalink
1. Enhance the functionality of s3filesystem to support multipart uploads.
Browse files Browse the repository at this point in the history

2. Support the use of s3 storage for BML materials and workspaces.
  • Loading branch information
sjgllgh committed Dec 4, 2024
1 parent ebc08a0 commit 4c3352a
Show file tree
Hide file tree
Showing 9 changed files with 558 additions and 85 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -21,13 +21,17 @@
import org.apache.linkis.storage.domain.FsPathListWithError;
import org.apache.linkis.storage.exception.StorageWarnException;
import org.apache.linkis.storage.fs.FileSystem;
import org.apache.linkis.storage.fs.stream.S3OutputStream;
import org.apache.linkis.storage.utils.StorageConfiguration;
import org.apache.linkis.storage.utils.StorageUtils;

import org.apache.commons.io.IOUtils;
import org.apache.commons.lang3.StringUtils;

import java.io.*;
import java.io.File;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
Expand Down Expand Up @@ -99,6 +103,7 @@ public String rootUserName() {
public FsPath get(String dest) throws IOException {
FsPath ret = new FsPath(dest);
if (exists(ret)) {
ret.setIsdir(isDir(buildKey(ret.getPath())));
return ret;
} else {
logger.warn("File or folder does not exist or file name is garbled(文件或者文件夹不存在或者文件名乱码)");
Expand All @@ -111,21 +116,31 @@ public FsPath get(String dest) throws IOException {
/**
 * Opens the S3 object addressed by {@code dest} for reading.
 *
 * @param dest path whose object key is derived with {@code buildKey(dest.getPath())}
 * @return the content stream of the object; the caller is responsible for closing it
 * @throws IOException if S3 rejects the request; the {@code AmazonS3Exception} is kept as cause
 */
@Override
public InputStream read(FsPath dest) throws IOException {
  try {
    return s3Client.getObject(bucket, buildKey(dest.getPath())).getObjectContent();
  } catch (AmazonS3Exception e) {
    // Chain the S3 error so the real failure (missing key, access denied, ...) stays visible.
    throw new IOException("You have not permission to access path " + dest.getPath(), e);
  }
}

@Override
public OutputStream write(FsPath dest, boolean overwrite) throws IOException {
try (InputStream inputStream = read(dest);
OutputStream outputStream =
new S3OutputStream(s3Client, bucket, buildPrefix(dest.getPath(), false))) {
InputStream inputStream = null;
try {
if (!exists(dest)) {
create(dest.getPath());
}

OutputStream outputStream = new S3OutputStream(s3Client, bucket, buildKey(dest.getPath()));

if (!overwrite) {
inputStream = read(dest);
IOUtils.copy(inputStream, outputStream);
}
return outputStream;
} catch (IOException e) {
throw new IOException("You have not permission to access path " + dest.getPath());
} finally {
IOUtils.closeQuietly(inputStream);
}
}

Expand All @@ -134,24 +149,39 @@ public boolean create(String dest) throws IOException {
if (exists(new FsPath(dest))) {
return false;
}
s3Client.putObject(bucket, dest, "");
s3Client.putObject(bucket, buildKey(dest), "");
return true;
}

@Override
public List<FsPath> list(FsPath path) throws IOException {
try {
if (!StringUtils.isEmpty(path.getPath())) {
ListObjectsV2Result listObjectsV2Result = s3Client.listObjectsV2(bucket, path.getPath());
List<S3ObjectSummary> s3ObjectSummaries = listObjectsV2Result.getObjectSummaries();
return s3ObjectSummaries.stream()
.filter(summary -> !isInitFile(summary))
.map(
summary -> {
FsPath newPath = new FsPath(buildPath(summary.getKey()));
return fillStorageFile(newPath, summary);
})
.collect(Collectors.toList());
ListObjectsV2Request listObjectsV2Request =
new ListObjectsV2Request()
.withBucketName(bucket)
.withPrefix(buildKey(path.getPath()) + "/")
.withDelimiter("/");
ListObjectsV2Result dirResult = s3Client.listObjectsV2(listObjectsV2Request);
List<S3ObjectSummary> s3ObjectSummaries = dirResult.getObjectSummaries();
List<String> commonPrefixes = dirResult.getCommonPrefixes();
List<FsPath> fsPaths =
s3ObjectSummaries.stream()
.filter(summary -> !isInitFile(summary))
.map(
summary -> {
FsPath newPath = new FsPath(buildPath(summary.getKey()));
return fillStorageFile(newPath, summary);
})
.collect(Collectors.toList());
if (commonPrefixes != null) {
for (String dir : commonPrefixes) {
FsPath newPath = new FsPath(buildPath(dir));
newPath.setIsdir(true);
fsPaths.add(newPath);
}
}
return fsPaths;
}
} catch (AmazonS3Exception e) {
throw new IOException("You have not permission to access path " + path.getPath());
Expand All @@ -173,7 +203,7 @@ public FsPathListWithError listPathWithError(FsPath path, boolean ignoreInitFile
ListObjectsV2Request listObjectsV2Request =
new ListObjectsV2Request()
.withBucketName(bucket)
.withPrefix(buildPrefix(path.getPath()))
.withPrefix(buildKey(path.getPath()) + "/")
.withDelimiter("/");
ListObjectsV2Result dirResult = s3Client.listObjectsV2(listObjectsV2Request);
List<S3ObjectSummary> s3ObjectSummaries = dirResult.getObjectSummaries();
Expand Down Expand Up @@ -204,25 +234,15 @@ public FsPathListWithError listPathWithError(FsPath path, boolean ignoreInitFile
@Override
public boolean exists(FsPath dest) throws IOException {
try {
if (new File(dest.getPath()).getName().contains(".")) {
return existsFile(dest);
if (dest == null) {
return false;
}
ListObjectsV2Request listObjectsV2Request = new ListObjectsV2Request();
listObjectsV2Request
.withBucketName(bucket)
.withPrefix(buildPrefix(dest.getPath()))
.withDelimiter("/");
return s3Client.listObjectsV2(listObjectsV2Request).getObjectSummaries().size()
+ s3Client.listObjectsV2(listObjectsV2Request).getCommonPrefixes().size()
> 0;
} catch (AmazonS3Exception e) {
return false;
}
}

public boolean existsFile(FsPath dest) {
try {
return s3Client.doesObjectExist(bucket, buildPrefix(dest.getPath(), false));
.withPrefix(buildKey(dest.getPath()))
.withMaxKeys(1);
return !s3Client.listObjectsV2(listObjectsV2Request).getObjectSummaries().isEmpty();
} catch (AmazonS3Exception e) {
return false;
}
Expand All @@ -231,25 +251,41 @@ public boolean existsFile(FsPath dest) {
/**
 * Deletes the object addressed by {@code dest}, or every object below it when it is a directory.
 *
 * <p>The keys are collected with {@code delete(FsPath, List)} and removed in batches, because a
 * single S3 DeleteObjects request accepts at most 1000 keys.
 *
 * @param dest file or directory path to remove
 * @return {@code true} once all collected keys have been submitted for deletion
 * @throws IOException if S3 rejects a request; the {@code AmazonS3Exception} is kept as cause
 */
@Override
public boolean delete(FsPath dest) throws IOException {
  final int maxKeysPerRequest = 1000;
  try {
    List<String> deleteKeys = new ArrayList<>();
    delete(dest, deleteKeys);
    for (int from = 0; from < deleteKeys.size(); from += maxKeysPerRequest) {
      int to = Math.min(from + maxKeysPerRequest, deleteKeys.size());
      String[] batch = deleteKeys.subList(from, to).toArray(new String[0]);
      s3Client.deleteObjects(new DeleteObjectsRequest(bucket).withKeys(batch));
    }
    return true;
  } catch (AmazonS3Exception e) {
    throw new IOException("You have not permission to access path " + dest.getPath(), e);
  }
}

/**
 * Collects into {@code keys} the object keys that must be removed to delete {@code dest}.
 *
 * <p>If {@code dest} is a directory (per {@code isDir}), its entries are listed with
 * {@code listPathWithError(dest, false)} and visited recursively; otherwise the key of
 * {@code dest} itself is added.
 *
 * @param dest file or directory path to expand
 * @param keys accumulator that receives the keys; modified in place
 * @throws IOException if listing a directory fails; propagated unchanged rather than wrapped in
 *     a {@code RuntimeException}
 */
public void delete(FsPath dest, List<String> keys) throws IOException {
  String key = buildKey(dest.getPath());
  if (!isDir(key)) {
    keys.add(key);
    return;
  }
  // A plain loop (instead of forEach with a lambda) lets the checked IOException propagate.
  for (FsPath child : listPathWithError(dest, false).getFsPaths()) {
    delete(child, keys);
  }
}

@Override
public boolean renameTo(FsPath oldDest, FsPath newDest) throws IOException {
try {
String newOriginPath = buildPrefix(oldDest.getPath(), false);
String newDestPath = buildPrefix(newDest.getPath(), false);
String newOriginPath = buildKey(oldDest.getPath());
String newDestPath = buildKey(newDest.getPath());
ListObjectsV2Request listObjectsV2Request = new ListObjectsV2Request();
listObjectsV2Request.withBucketName(bucket).withPrefix(newOriginPath);
ListObjectsV2Result result = s3Client.listObjectsV2(listObjectsV2Request);
Expand Down Expand Up @@ -281,8 +317,8 @@ public boolean renameTo(FsPath oldDest, FsPath newDest) throws IOException {
@Override
public boolean copy(String origin, String dest) throws IOException {
try {
String newOrigin = buildPrefix(origin, false);
String newDest = buildPrefix(dest, false);
String newOrigin = buildKey(origin);
String newDest = buildKey(dest);
ListObjectsV2Request listObjectsV2Request = new ListObjectsV2Request();
listObjectsV2Request.withBucketName(bucket).withPrefix(newOrigin);
ListObjectsV2Result result = s3Client.listObjectsV2(listObjectsV2Request);
Expand All @@ -305,8 +341,16 @@ public boolean copy(String origin, String dest) throws IOException {
}
}

private boolean isDir(S3ObjectSummary s3ObjectSummary, String prefix) {
return s3ObjectSummary.getKey().substring(prefix.length()).contains("/");
private boolean isDir(String key) {
ListObjectsV2Request listObjectsV2Request = new ListObjectsV2Request();
listObjectsV2Request
.withBucketName(bucket)
.withPrefix(key + "/")
.withDelimiter("/")
.withMaxKeys(1);

return !(s3Client.listObjectsV2(listObjectsV2Request).getCommonPrefixes().isEmpty()
&& s3Client.listObjectsV2(listObjectsV2Request).getObjectSummaries().isEmpty());
}

private boolean isInitFile(S3ObjectSummary s3ObjectSummary) {
Expand All @@ -318,6 +362,13 @@ public String listRoot() {
return "/";
}

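/**
 * S3 lacks the concept of directories, so a directory cannot be created directly; this method
 * works with an {@code INIT_FILE_NAME} placeholder path below {@code dest} instead.
 *
 * @param dest the directory path to emulate
 * @return whether the directory could be set up (presumably the result of creating the
 *     placeholder object; confirm against the method body)
 * @throws IOException if the underlying storage operation fails
 */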
@Override
public boolean mkdir(FsPath dest) throws IOException {
String path = new File(dest.getPath(), INIT_FILE_NAME).getPath();
Expand All @@ -339,7 +390,7 @@ private FsPath fillStorageFile(FsPath fsPath, S3ObjectSummary s3ObjectSummary) {
fsPath.setOwner(owner.getDisplayName());
}
try {
fsPath.setIsdir(isDir(s3ObjectSummary, fsPath.getParent().getPath()));
fsPath.setIsdir(isDir(s3ObjectSummary.getKey()));
} catch (Throwable e) {
logger.warn("Failed to fill storage file:" + fsPath.getPath(), e);
}
Expand All @@ -359,7 +410,7 @@ public boolean canRead(FsPath dest) {

@Override
public boolean canRead(FsPath dest, String user) throws IOException {
return false;
return true;
}

@Override
Expand All @@ -384,7 +435,10 @@ public long getUsableSpace(FsPath dest) {

/**
 * Returns the size in bytes of the object addressed by {@code dest}.
 *
 * <p>Only the object's metadata is requested. Fetching the whole object with {@code getObject}
 * would open a content stream that has to be closed by the caller and would otherwise keep its
 * HTTP connection occupied.
 *
 * @param dest path whose object key is derived with {@code buildKey(dest.getPath())}
 * @return the content length reported by S3 for the object
 * @throws IOException declared by the interface
 */
@Override
public long getLength(FsPath dest) throws IOException {
  return s3Client.getObjectMetadata(bucket, buildKey(dest.getPath())).getContentLength();
}

@Override
Expand Down Expand Up @@ -418,7 +472,9 @@ public boolean setPermission(FsPath dest, String permission) {
}

@Override
public void close() throws IOException {}
public void close() throws IOException {
s3Client.shutdown();
}

public String getLabel() {
return label;
Expand All @@ -429,46 +485,22 @@ public void setLabel(String label) {
}

public String buildPath(String path) {
if (path == null || "".equals(path)) return "";
if (path == null || path.isEmpty()) return "";
if (path.startsWith("/")) {
return StorageUtils.S3_SCHEMA() + path;
}
return StorageUtils.S3_SCHEMA() + "/" + path;
}

public String buildPrefix(String path, boolean addTail) {
public String buildKey(String path) {
String res = path;
if (path == null || "".equals(path)) return "";
if (path == null || path.isEmpty()) return "";
if (path.startsWith("/")) {
res = path.replaceFirst("/", "");
}
if (!path.endsWith("/") && addTail) {
res = res + "/";
if (path.endsWith("/") && !res.isEmpty()) {
res = res.substring(0, res.length() - 1);
}
return res;
}

public String buildPrefix(String path) {
return buildPrefix(path, true);
}
}

class S3OutputStream extends ByteArrayOutputStream {
private AmazonS3 s3Client;
private String bucket;
private String path;

public S3OutputStream(AmazonS3 s3Client, String bucket, String path) {
this.s3Client = s3Client;
this.bucket = bucket;
this.path = path;
}

@Override
public void close() throws IOException {
byte[] buffer = this.toByteArray();
try (InputStream in = new ByteArrayInputStream(buffer)) {
s3Client.putObject(bucket, path, in, new ObjectMetadata());
}
}
}
Loading

0 comments on commit 4c3352a

Please sign in to comment.