Skip to content

Commit

Permalink
Updated storage impl
Browse files Browse the repository at this point in the history
  • Loading branch information
niemyjski committed Jan 4, 2024
1 parent 34fb77b commit 5afca51
Showing 1 changed file with 28 additions and 20 deletions.
48 changes: 28 additions & 20 deletions src/Foundatio.Minio/Storage/MinioFileStorage.cs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
using System;
using System;
using System.Collections.Generic;
using System.IO;
using System.Linq;
Expand Down Expand Up @@ -27,7 +27,7 @@ public class MinioFileStorage : IFileStorage {
public MinioFileStorage(MinioFileStorageOptions options) {
if (options == null)
throw new ArgumentNullException(nameof(options));

_serializer = options.Serializer ?? DefaultSerializer.Instance;
_logger = options.LoggerFactory?.CreateLogger(GetType()) ?? NullLogger.Instance;

Expand All @@ -36,7 +36,7 @@ public MinioFileStorage(MinioFileStorageOptions options) {
_bucket = bucket;
_shouldAutoCreateBucket = options.AutoCreateBucket;
}

public MinioFileStorage(Builder<MinioFileStorageOptionsBuilder, MinioFileStorageOptions> builder)
: this(builder(new MinioFileStorageOptionsBuilder()).Build()) { }

Expand All @@ -58,15 +58,23 @@ private async Task EnsureBucketExists() {
_bucketExistsChecked = true;
}

public async Task<Stream> GetFileStreamAsync(string path, CancellationToken cancellationToken = default) {
[Obsolete($"Use {nameof(GetFileStreamAsync)} with {nameof(FileAccess)} instead to define read or write behaviour of stream")]
public Task<Stream> GetFileStreamAsync(string path, CancellationToken cancellationToken = default)
=> GetFileStreamAsync(path, StreamMode.Read, cancellationToken);

public async Task<Stream> GetFileStreamAsync(string path, StreamMode streamMode, CancellationToken cancellationToken = default)

Check failure on line 65 in src/Foundatio.Minio/Storage/MinioFileStorage.cs

View workflow job for this annotation

GitHub Actions / build / build

The type or namespace name 'StreamMode' could not be found (are you missing a using directive or an assembly reference?)

Check failure on line 65 in src/Foundatio.Minio/Storage/MinioFileStorage.cs

View workflow job for this annotation

GitHub Actions / build / build

The type or namespace name 'StreamMode' could not be found (are you missing a using directive or an assembly reference?)
{
if (String.IsNullOrEmpty(path))
throw new ArgumentNullException(nameof(path));

if (streamMode is StreamMode.Write)
throw new NotSupportedException($"Stream mode {streamMode} is not supported.");

await EnsureBucketExists().AnyContext();

string normalizedPath = NormalizePath(path);
_logger.LogTrace("Getting file stream for {Path}", normalizedPath);

try {
Stream result = new MemoryStream();
await _client.GetObjectAsync(new GetObjectArgs().WithBucket(_bucket).WithObject(normalizedPath).WithCallbackStream(async (stream, _) => await stream.CopyToAsync(result).AnyContext()), cancellationToken).AnyContext();
Expand Down Expand Up @@ -106,7 +114,7 @@ public async Task<bool> ExistsAsync(string path) {
throw new ArgumentNullException(nameof(path));

await EnsureBucketExists().AnyContext();

string normalizedPath = NormalizePath(path);
_logger.LogTrace("Checking if {Path} exists", normalizedPath);

Expand All @@ -128,7 +136,7 @@ public async Task<bool> SaveFileAsync(string path, Stream stream, CancellationTo

string normalizedPath = NormalizePath(path);
_logger.LogTrace("Saving {Path}", normalizedPath);

var seekableStream = stream.CanSeek ? stream : new MemoryStream();
if (!stream.CanSeek) {
await stream.CopyToAsync(seekableStream).AnyContext();
Expand Down Expand Up @@ -158,7 +166,7 @@ public async Task<bool> RenameFileAsync(string path, string newPath, Cancellatio
string normalizedPath = NormalizePath(path);
string normalizedNewPath = NormalizePath(newPath);
_logger.LogInformation("Renaming {Path} to {NewPath}", normalizedPath, normalizedNewPath);

return await CopyFileAsync(normalizedPath, normalizedNewPath, cancellationToken).AnyContext() &&
await DeleteFileAsync(normalizedPath, cancellationToken).AnyContext();
}
Expand Down Expand Up @@ -194,7 +202,7 @@ public async Task<bool> DeleteFileAsync(string path, CancellationToken cancellat
throw new ArgumentNullException(nameof(path));

await EnsureBucketExists().AnyContext();

string normalizedPath = NormalizePath(path);
_logger.LogTrace("Deleting {Path}", normalizedPath);

Expand Down Expand Up @@ -270,23 +278,23 @@ private Task<List<FileSpec>> GetFileListAsync(string searchPattern = null, int?
var criteria = GetRequestCriteria(searchPattern);

_logger.LogTrace(
s => s.Property("SearchPattern", searchPattern).Property("Limit", limit).Property("Skip", skip),
s => s.Property("SearchPattern", searchPattern).Property("Limit", limit).Property("Skip", skip),
"Getting file list recursively matching {Prefix} and {Pattern}...", criteria.Prefix, criteria.Pattern
);

ExceptionDispatchInfo exception = null;
var resetEvent = new AutoResetEvent(false);
var observable = _client.ListObjectsAsync(new ListObjectsArgs().WithBucket(_bucket).WithPrefix(criteria.Prefix).WithRecursive(true), cancellationToken);
observable.Subscribe(item => {
if (item.IsDir)
return;
if (criteria.Pattern != null && !criteria.Pattern.IsMatch(item.Key)) {

if (criteria.Pattern != null && !criteria.Pattern.IsMatch(item.Key)) {
_logger.LogTrace("Skipping {Path}: Doesn't match pattern", item.Key);
return;
}
list.Add(item);

list.Add(item);
}, error => {
if (error.GetType().ToString() != "Minio.EmptyBucketOperation") {
_logger.LogError(error, "Error getting file list: {Message}", error.Message);
Expand All @@ -301,7 +309,7 @@ private Task<List<FileSpec>> GetFileListAsync(string searchPattern = null, int?

if (skip.HasValue)
list = list.Skip(skip.Value).ToList();

if (limit.HasValue)
list = list.Take(limit.Value).ToList();

Expand All @@ -325,14 +333,14 @@ private class SearchCriteria {
private SearchCriteria GetRequestCriteria(string searchPattern) {
if (String.IsNullOrEmpty(searchPattern))
return new SearchCriteria { Prefix = String.Empty };

string normalizedSearchPattern = NormalizePath(searchPattern);
int wildcardPos = normalizedSearchPattern.IndexOf('*');
bool hasWildcard = wildcardPos >= 0;

string prefix = normalizedSearchPattern;
Regex patternRegex = null;

if (hasWildcard) {
patternRegex = new Regex($"^{Regex.Escape(normalizedSearchPattern).Replace("\\*", ".*?")}$");
int slashPos = normalizedSearchPattern.LastIndexOf('/');
Expand Down Expand Up @@ -372,10 +380,10 @@ private SearchCriteria GetRequestCriteria(string searchPattern) {

if (secure)
client.WithSSL();

return (client, connectionString.Bucket);
}

public void Dispose() { }
}
}

0 comments on commit 5afca51

Please sign in to comment.