Skip to content

Commit

Permalink
Adding copy operation to Azure Blob Storage and AWS S3 bindings
Browse files Browse the repository at this point in the history
Signed-off-by: ytimocin <ytimocin@microsoft.com>
  • Loading branch information
ytimocin committed Nov 4, 2024
1 parent b969bbf commit 720e252
Show file tree
Hide file tree
Showing 7 changed files with 206 additions and 13 deletions.
2 changes: 2 additions & 0 deletions bindings/aws/s3/metadata.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@ binding:
description: "Delete blob"
- name: list
description: "List blob"
- name: copy
description: "Copy blob"
capabilities: []
builtinAuthenticationProfiles:
- name: "aws"
Expand Down
54 changes: 52 additions & 2 deletions bindings/aws/s3/s3.go
Original file line number Diff line number Diff line change
Expand Up @@ -157,6 +157,7 @@ func (s *AWSS3) Operations() []bindings.OperationKind {
bindings.GetOperation,
bindings.DeleteOperation,
bindings.ListOperation,
bindings.CopyOperation,
presignOperation,
}
}
Expand Down Expand Up @@ -240,7 +241,7 @@ func (s *AWSS3) create(ctx context.Context, req *bindings.InvokeRequest) (*bindi
}, nil
}

func (s *AWSS3) presign(ctx context.Context, req *bindings.InvokeRequest) (*bindings.InvokeResponse, error) {
func (s *AWSS3) presign(req *bindings.InvokeRequest) (*bindings.InvokeResponse, error) {
metadata, err := s.metadata.mergeWithRequestMetadata(req)
if err != nil {
return nil, fmt.Errorf("s3 binding error: error merging metadata: %w", err)
Expand Down Expand Up @@ -389,6 +390,53 @@ func (s *AWSS3) list(ctx context.Context, req *bindings.InvokeRequest) (*binding
}, nil
}

func (s *AWSS3) copy(req *bindings.InvokeRequest) (*bindings.InvokeResponse, error) {
_, err := s.metadata.mergeWithRequestMetadata(req)
if err != nil {
return nil, fmt.Errorf("s3 binding error: error merging metadata: %w", err)
}

source := req.Metadata["source"]
if source == "" {
return nil, fmt.Errorf("s3 binding error: required metadata 'source' missing")
}

destinationBucket := req.Metadata["bucket"]
if destinationBucket == "" {
return nil, fmt.Errorf("s3 binding error: required metadata 'bucket' missing")
}

destinationKey := req.Metadata["destinationKey"]
if destinationKey == "" {
return nil, fmt.Errorf("s3 binding error: required metadata 'destinationKey' missing")
}

_, err = s.s3Client.CopyObject(&s3.CopyObjectInput{
// Bucket is the destination bucket.
Bucket: ptr.Of(destinationBucket),

// CopySource is the source bucket and key.
CopySource: ptr.Of(source),

// Key is the key of the destination object.
Key: ptr.Of(destinationKey),

// MetadataDirective is the directive to apply to the metadata of the destination object.
MetadataDirective: ptr.Of(s3.MetadataDirectiveCopy),
})
if err != nil {
return nil, fmt.Errorf("s3 binding error: copy operation failed: %w", err)
}

return &bindings.InvokeResponse{
Metadata: map[string]string{
"source": source,
"destinationBucket": destinationBucket,
"destinationKey": destinationKey,
},
}, nil
}

func (s *AWSS3) Invoke(ctx context.Context, req *bindings.InvokeRequest) (*bindings.InvokeResponse, error) {
switch req.Operation {
case bindings.CreateOperation:
Expand All @@ -399,8 +447,10 @@ func (s *AWSS3) Invoke(ctx context.Context, req *bindings.InvokeRequest) (*bindi
return s.delete(ctx, req)
case bindings.ListOperation:
return s.list(ctx, req)
case bindings.CopyOperation:
return s.copy(req)
case presignOperation:
return s.presign(ctx, req)
return s.presign(req)
default:
return nil, fmt.Errorf("s3 binding error: unsupported operation %s", req.Operation)
}
Expand Down
118 changes: 113 additions & 5 deletions bindings/azure/blobstorage/blobstorage.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,17 +22,19 @@ import (
"io"
"reflect"
"strconv"
"time"

"github.com/Azure/azure-sdk-for-go/sdk/storage/azblob"
"github.com/Azure/azure-sdk-for-go/sdk/storage/azblob/blob"
"github.com/Azure/azure-sdk-for-go/sdk/storage/azblob/bloberror"
"github.com/Azure/azure-sdk-for-go/sdk/storage/azblob/blockblob"
"github.com/Azure/azure-sdk-for-go/sdk/storage/azblob/container"
"github.com/Azure/azure-sdk-for-go/sdk/storage/azblob/sas"
"github.com/google/uuid"

"github.com/dapr/components-contrib/bindings"
storagecommon "github.com/dapr/components-contrib/common/component/azure/blobstorage"
contribMetadata "github.com/dapr/components-contrib/metadata"
contribmetadata "github.com/dapr/components-contrib/metadata"
"github.com/dapr/kit/logger"
"github.com/dapr/kit/ptr"
)
Expand All @@ -57,8 +59,9 @@ const (
// Specifies the maximum number of blobs to return, including all BlobPrefix elements. If the request does not
// specify maxresults the server will return up to 5,000 items.
// See: https://docs.microsoft.com/en-us/rest/api/storageservices/list-blobs#uri-parameters
maxResults int32 = 5000
endpointKey = "endpoint"
maxResults int32 = 5000
endpointKey = "endpoint"
presignOperation = "presign"
)

var ErrMissingBlobName = errors.New("blobName is a required attribute")
Expand All @@ -76,6 +79,10 @@ type createResponse struct {
BlobName string `json:"blobName"`
}

type presignResponse struct {
PresignURL string `json:"presignURL"`
}

type listInclude struct {
Copy bool `json:"copy"`
Metadata bool `json:"metadata"`
Expand Down Expand Up @@ -112,6 +119,7 @@ func (a *AzureBlobStorage) Operations() []bindings.OperationKind {
bindings.GetOperation,
bindings.DeleteOperation,
bindings.ListOperation,
bindings.CopyOperation,
}
}

Expand Down Expand Up @@ -344,6 +352,102 @@ func (a *AzureBlobStorage) list(ctx context.Context, req *bindings.InvokeRequest
}, nil
}

func (a *AzureBlobStorage) copy(ctx context.Context, req *bindings.InvokeRequest) (*bindings.InvokeResponse, error) {
sourceBlobName := req.Metadata["sourceBlobName"]
if sourceBlobName == "" {
return nil, fmt.Errorf("azure blob storage error: required metadata 'sourceBlobName' missing")
}

destinationBlobName := req.Metadata["destinationBlobName"]
if destinationBlobName == "" {
return nil, fmt.Errorf("azure blob storage error: required metadata 'destinationBlobName' missing")
}

sourceContainerName := req.Metadata["sourceContainerName"]
if sourceContainerName == "" {
return nil, fmt.Errorf("azure blob storage error: required metadata 'sourceContainerName' missing")
}

destinationContainerName := req.Metadata["destinationContainerName"]
if destinationContainerName == "" {
return nil, fmt.Errorf("azure blob storage error: required metadata 'destinationContainerName' missing")
}

sourceBlobClient := a.containerClient.NewBlockBlobClient(fmt.Sprintf("%s/%s", sourceContainerName, sourceBlobName))
destinationBlobClient := a.containerClient.NewBlockBlobClient(fmt.Sprintf("%s/%s", destinationContainerName, destinationBlobName))

copyURL := sourceBlobClient.URL()
_, err := destinationBlobClient.StartCopyFromURL(ctx, copyURL, nil)
if err != nil {
return nil, fmt.Errorf("azure blob storage error: copy operation failed: %w", err)
}

return &bindings.InvokeResponse{
Metadata: map[string]string{
"sourceBlobName": sourceBlobName,
"destinationBlobName": destinationBlobName,
},
}, nil
}

func (a *AzureBlobStorage) presign(req *bindings.InvokeRequest) (*bindings.InvokeResponse, error) {
blobName := req.Metadata[metadataKeyBlobName]
if blobName == "" {
return nil, fmt.Errorf("azure blob storage error: required metadata '%s' missing", metadataKeyBlobName)
}

presignTTL := req.Metadata["presignTTL"]
if presignTTL == "" {
return nil, fmt.Errorf("azure blob storage error: required metadata 'presignTTL' missing")
}

ttl, err := time.ParseDuration(presignTTL)
if err != nil {
return nil, fmt.Errorf("azure blob storage error: cannot parse duration %s: %w", presignTTL, err)
}

blobClient := a.containerClient.NewBlockBlobClient(blobName)
sasURL, err := a.generateSASURL(blobClient, ttl)
if err != nil {
return nil, fmt.Errorf("azure blob storage error: %w", err)
}

jsonResponse, err := json.Marshal(presignResponse{
PresignURL: sasURL,
})
if err != nil {
return nil, fmt.Errorf("s3 binding error: error marshalling presign response: %w", err)
}

return &bindings.InvokeResponse{
Data: jsonResponse,
}, nil
}

func (a *AzureBlobStorage) generateSASURL(blobClient *blockblob.Client, ttl time.Duration) (string, error) {
permissions := sas.AccountPermissions{
Read: true,
}

sasValues := sas.AccountSignatureValues{
Protocol: sas.ProtocolHTTPS,
ExpiryTime: time.Now().UTC().Add(ttl),
Permissions: permissions.String(),
}

credential, err := azblob.NewSharedKeyCredential(a.metadata.AccountName, a.metadata.AccountKey)
if err != nil {
return "", fmt.Errorf("error creating shared key credential: %w", err)
}

sasQueryParams, err := sasValues.SignWithSharedKey(credential)
if err != nil {
return "", fmt.Errorf("error generating SAS query parameters: %w", err)
}

return fmt.Sprintf("%s?%s", blobClient.URL(), sasQueryParams.Encode()), nil
}

func (a *AzureBlobStorage) Invoke(ctx context.Context, req *bindings.InvokeRequest) (*bindings.InvokeResponse, error) {
switch req.Operation {
case bindings.CreateOperation:
Expand All @@ -354,6 +458,10 @@ func (a *AzureBlobStorage) Invoke(ctx context.Context, req *bindings.InvokeReque
return a.delete(ctx, req)
case bindings.ListOperation:
return a.list(ctx, req)
case bindings.CopyOperation:
return a.copy(ctx, req)
case presignOperation:
return a.presign(req)
default:
return nil, fmt.Errorf("unsupported operation %s", req.Operation)
}
Expand All @@ -371,9 +479,9 @@ func (a *AzureBlobStorage) isValidDeleteSnapshotsOptionType(accessType azblob.De
}

// GetComponentMetadata returns the metadata of the component.
func (a *AzureBlobStorage) GetComponentMetadata() (metadataInfo contribMetadata.MetadataMap) {
func (a *AzureBlobStorage) GetComponentMetadata() (metadataInfo contribmetadata.MetadataMap) {
metadataStruct := storagecommon.BlobStorageMetadata{}
contribMetadata.GetMetadataInfoFromStructType(reflect.TypeOf(metadataStruct), &metadataInfo, contribMetadata.BindingType)
contribmetadata.GetMetadataInfoFromStructType(reflect.TypeOf(metadataStruct), &metadataInfo, contribmetadata.BindingType)
return
}

Expand Down
2 changes: 2 additions & 0 deletions bindings/azure/blobstorage/metadata.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@ binding:
description: "Delete blob"
- name: list
description: "List blob"
- name: copy
description: "Copy blob"
capabilities: []
builtinAuthenticationProfiles:
- name: "azuread"
Expand Down
34 changes: 32 additions & 2 deletions bindings/gcp/bucket/bucket.go
Original file line number Diff line number Diff line change
Expand Up @@ -137,6 +137,7 @@ func (g *GCPStorage) Operations() []bindings.OperationKind {
bindings.GetOperation,
bindings.DeleteOperation,
bindings.ListOperation,
bindings.CopyOperation,
signOperation,
}
}
Expand All @@ -153,8 +154,10 @@ func (g *GCPStorage) Invoke(ctx context.Context, req *bindings.InvokeRequest) (*
return g.delete(ctx, req)
case bindings.ListOperation:
return g.list(ctx, req)
case bindings.CopyOperation:
return g.copy(ctx, req)
case signOperation:
return g.sign(ctx, req)
return g.sign(req)
default:
return nil, fmt.Errorf("unsupported operation %s", req.Operation)
}
Expand Down Expand Up @@ -307,6 +310,33 @@ func (g *GCPStorage) list(ctx context.Context, req *bindings.InvokeRequest) (*bi
}, nil
}

func (g *GCPStorage) copy(ctx context.Context, req *bindings.InvokeRequest) (*bindings.InvokeResponse, error) {
sourceKey := req.Metadata["sourceKey"]
if sourceKey == "" {
return nil, fmt.Errorf("gcp bucket binding error: required metadata 'sourceKey' missing")
}

destinationKey := req.Metadata["destinationKey"]
if destinationKey == "" {
return nil, fmt.Errorf("gcp bucket binding error: required metadata 'destinationKey' missing")
}

src := g.client.Bucket(g.metadata.Bucket).Object(sourceKey)
dst := g.client.Bucket(g.metadata.Bucket).Object(destinationKey)

_, err := dst.CopierFrom(src).Run(ctx)
if err != nil {
return nil, fmt.Errorf("gcp bucket binding error: copy operation failed: %w", err)
}

return &bindings.InvokeResponse{
Metadata: map[string]string{
"sourceKey": sourceKey,
"destinationKey": destinationKey,
},
}, nil
}

func (g *GCPStorage) Close() error {
return g.client.Close()
}
Expand Down Expand Up @@ -345,7 +375,7 @@ func (g *GCPStorage) GetComponentMetadata() (metadataInfo metadata.MetadataMap)
return
}

func (g *GCPStorage) sign(ctx context.Context, req *bindings.InvokeRequest) (*bindings.InvokeResponse, error) {
func (g *GCPStorage) sign(req *bindings.InvokeRequest) (*bindings.InvokeResponse, error) {
metadata, err := g.metadata.mergeWithRequestMetadata(req)
if err != nil {
return nil, fmt.Errorf("gcp binding error. error merge metadata : %w", err)
Expand Down
8 changes: 4 additions & 4 deletions bindings/gcp/bucket/bucket_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -76,10 +76,10 @@ func TestParseMetadata(t *testing.T) {
t.Run("check backward compatibility", func(t *testing.T) {
gs := GCPStorage{logger: logger.NewLogger("test")}

request := bindings.InvokeRequest{}
request.Operation = bindings.CreateOperation
request.Metadata = map[string]string{
"name": "my_file.txt",
request := bindings.InvokeRequest{
Metadata: map[string]string{
"name": "my_file.txt",
},
}
result := gs.handleBackwardCompatibilityForMetadata(request.Metadata)
assert.NotEmpty(t, result["key"])
Expand Down
1 change: 1 addition & 0 deletions bindings/requests.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ const (
CreateOperation OperationKind = "create"
DeleteOperation OperationKind = "delete"
ListOperation OperationKind = "list"
CopyOperation OperationKind = "copy"
)

// GetMetadataAsBool parses metadata as bool.
Expand Down

0 comments on commit 720e252

Please sign in to comment.