diff --git a/README.md b/README.md
index 019bfa1..3bab568 100644
--- a/README.md
+++ b/README.md
@@ -84,3 +84,27 @@ To upload your files to AWS S3 you will need an IAM policy for writing/reading t
     ]
 }
+## Azure-Specific Dev Notes
+Run a local blob storage emulator:
+```shell
+podman run -p 10000:10000 -v .:/log mcr.microsoft.com/azure-storage/azurite \
+    azurite-blob --debug /log/debug.log
+```
+Azurite accepts the same well-known account and key used by the legacy Azure Storage Emulator:
+
+- Account name: `devstoreaccount1`
+- Account key: `Eby8vdM02xNOcqFlqUwJPLlmEtlCDXJ1OUzFT50uSRZ6IFsuFq2UVErCz4I6tq/K1SZFPTOtr/KBHBeksoGMGw==`
+
+Uploading files:
+```sh
+export AZURE_STORAGE_CONNECTION_STRING="DefaultEndpointsProtocol=http;AccountName=devstoreaccount1;AccountKey=Eby8vdM02xNOcqFlqUwJPLlmEtlCDXJ1OUzFT50uSRZ6IFsuFq2UVErCz4I6tq/K1SZFPTOtr/KBHBeksoGMGw==;BlobEndpoint=http://127.0.0.1:10000/devstoreaccount1;QueueEndpoint=http://127.0.0.1:10001/devstoreaccount1;TableEndpoint=http://127.0.0.1:10002/devstoreaccount1;"
+az storage container create --name data
+az storage blob upload --file a1.pmtiles --container-name data --name a.pmtiles
+az storage blob upload --overwrite --file a2.pmtiles --container-name data --name a.pmtiles
+```
+
+Starting pmtiles:
+```sh
+AZURE_STORAGE_ACCOUNT=devstoreaccount1 AZURE_STORAGE_KEY="Eby8vdM02xNOcqFlqUwJPLlmEtlCDXJ1OUzFT50uSRZ6IFsuFq2UVErCz4I6tq/K1SZFPTOtr/KBHBeksoGMGw==" ./go-pmtiles serve --port=8084 / --bucket="azblob://data?protocol=http&domain=127.0.0.1:10000"
+```
diff --git a/pmtiles/bucket.go b/pmtiles/bucket.go
index 70850d5..26d4550 100644
--- a/pmtiles/bucket.go
+++ b/pmtiles/bucket.go
@@ -15,6 +15,8 @@ import (
 	"path/filepath"
 	"strings"
 
+	"github.com/Azure/azure-sdk-for-go/sdk/azcore"
+	azblobblob "github.com/Azure/azure-sdk-for-go/sdk/storage/azblob/blob"
 	"github.com/aws/aws-sdk-go/aws/awserr"
 	"github.com/aws/aws-sdk-go/service/s3"
 	"github.com/cespare/xxhash/v2"
@@ -28,7 +30,7 @@ type Bucket interface {
 	NewRangeReaderEtag(ctx context.Context, key string, offset int64, length int64, etag string) (io.ReadCloser, string, int, error)
 }
 
-// RefreshRequiredError is an error that indicates the etag has chanced on the remote file
+// RefreshRequiredError is an error that indicates the etag has changed on the remote file
 type RefreshRequiredError struct {
 	StatusCode int
 }
@@ -48,8 +50,8 @@ func (m mockBucket) Close() error {
 func (m mockBucket) NewRangeReader(ctx context.Context, key string, offset int64, length int64) (io.ReadCloser, error) {
 	body, _, _, err := m.NewRangeReaderEtag(ctx, key, offset, length, "")
 	return body, err
-
 }
+
 func (m mockBucket) NewRangeReaderEtag(_ context.Context, key string, offset int64, length int64, etag string) (io.ReadCloser, string, int, error) {
 	bs, ok := m.items[key]
 	if !ok {
@@ -202,16 +204,41 @@ func isRefreshRequiredCode(code int) bool {
 	return code == http.StatusPreconditionFailed || code == http.StatusRequestedRangeNotSatisfiable
 }
 
-type BucketAdapter struct {
-	Bucket *blob.Bucket
+// StatusCoder extracts the HTTP status code from a cloud-vendor-specific error
+type StatusCoder interface {
+	StatusCodeFromError(error) int
 }
 
-func (ba BucketAdapter) NewRangeReader(ctx context.Context, key string, offset, length int64) (io.ReadCloser, error) {
-	body, _, _, err := ba.NewRangeReaderEtag(ctx, key, offset, length, "")
-	return body, err
+// handleReaderResponse is the error-handling and ETag-extraction logic shared by all cloud provider adapters
+func handleReaderResponse(reader *blob.Reader, b StatusCoder, err error, getETag func(*blob.Reader) string) (io.ReadCloser, string, int, error) {
+	if err != nil {
+		statusCode := b.StatusCodeFromError(err)
+
+		if isRefreshRequiredCode(statusCode) {
+			return nil, "", statusCode, &RefreshRequiredError{statusCode}
+		}
+
+		return nil, "", statusCode, err
+	}
+
+	return reader, getETag(reader), 206, nil
+}
+
+// S3BucketAdapter implements the Bucket interface for S3
+type S3BucketAdapter struct {
+	*blob.Bucket
 }
 
-func (ba BucketAdapter) NewRangeReaderEtag(ctx context.Context, key string, offset, length int64, etag string) (io.ReadCloser, string, int, error) {
+func (ba S3BucketAdapter) NewRangeReaderEtag(ctx context.Context, key string, offset, length int64, etag string) (io.ReadCloser, string, int, error) {
 	reader, err := ba.Bucket.NewRangeReader(ctx, key, offset, length, &blob.ReaderOptions{
 		BeforeRead: func(asFunc func(interface{}) bool) error {
 			var req *s3.GetObjectInput
@@ -221,31 +248,83 @@ func (ba BucketAdapter) NewRangeReaderEtag(ctx context.Context, key string, offs
 			return nil
 		},
 	})
-	status := 206
-	if err != nil {
-		var resp awserr.RequestFailure
-		errors.As(err, &resp)
-		status = 404
-		if resp != nil {
-			status = resp.StatusCode()
-			if isRefreshRequiredCode(resp.StatusCode()) {
-				return nil, "", resp.StatusCode(), &RefreshRequiredError{resp.StatusCode()}
-			}
-		}
-		return nil, "", status, err
-	}
-	resultETag := ""
-	var resp s3.GetObjectOutput
-	if reader.As(&resp) {
-		resultETag = *resp.ETag
-	}
-	return reader, resultETag, status, nil
+	return handleReaderResponse(reader, ba, err, func(r *blob.Reader) string {
+		var resp s3.GetObjectOutput
+		if r.As(&resp) && resp.ETag != nil {
+			return *resp.ETag
+		}
+		return ""
+	})
+}
+
+func (ba S3BucketAdapter) NewRangeReader(ctx context.Context, key string, offset, length int64) (io.ReadCloser, error) {
+	body, _, _, err := ba.NewRangeReaderEtag(ctx, key, offset, length, "")
+	return body, err
+}
+
+func (ba S3BucketAdapter) Close() error {
+	return ba.Bucket.Close()
+}
+
+func (ba S3BucketAdapter) StatusCodeFromError(err error) int {
+	var resp awserr.RequestFailure
+	errors.As(err, &resp)
+	if resp != nil {
+		return resp.StatusCode()
+	}
+
+	return 404
+}
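+
+// Note: with this split, a new provider adapter only needs a BeforeRead hook
+// that sets its SDK's If-Match option plus a StatusCodeFromError
+// implementation; handleReaderResponse supplies the shared
+// 206 / 404 / RefreshRequiredError flow.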
+
+// AzureBucketAdapter implements the Bucket interface for Azure Blob Storage
+type AzureBucketAdapter struct {
+	Bucket *blob.Bucket
+}
+
+func (ba AzureBucketAdapter) NewRangeReader(ctx context.Context, key string, offset, length int64) (io.ReadCloser, error) {
+	body, _, _, err := ba.NewRangeReaderEtag(ctx, key, offset, length, "")
+	return body, err
+}
+
+func (ba AzureBucketAdapter) NewRangeReaderEtag(ctx context.Context, key string, offset, length int64, etag string) (io.ReadCloser, string, int, error) {
+	reader, err := ba.Bucket.NewRangeReader(ctx, key, offset, length, &blob.ReaderOptions{
+		BeforeRead: func(asFunc func(interface{}) bool) error {
+			var req *azblobblob.DownloadStreamOptions
+			if len(etag) > 0 && asFunc(&req) {
+				azureEtag := azcore.ETag(etag)
+				if req.AccessConditions == nil {
+					req.AccessConditions = &azblobblob.AccessConditions{}
+				}
+				if req.AccessConditions.ModifiedAccessConditions == nil {
+					req.AccessConditions.ModifiedAccessConditions = &azblobblob.ModifiedAccessConditions{}
+				}
+				req.AccessConditions.ModifiedAccessConditions.IfMatch = &azureEtag
+			}
+			return nil
+		},
+	})
+	return handleReaderResponse(reader, ba, err, func(r *blob.Reader) string {
+		var resp azblobblob.DownloadStreamResponse
+		if r.As(&resp) && resp.ETag != nil {
+			return string(*resp.ETag)
+		}
+		return ""
+	})
 }
 
-func (ba BucketAdapter) Close() error {
+func (ba AzureBucketAdapter) Close() error {
 	return ba.Bucket.Close()
 }
 
+func (ba AzureBucketAdapter) StatusCodeFromError(err error) int {
+	var resp *azcore.ResponseError
+	errors.As(err, &resp)
+	if resp != nil {
+		return resp.StatusCode
+	}
+
+	return 404
+}
+
 func NormalizeBucketKey(bucket string, prefix string, key string) (string, string, error) {
 	if bucket == "" {
 		if strings.HasPrefix(key, "http") {
@@ -293,6 +372,7 @@ func OpenBucket(ctx context.Context, bucketURL string, bucketPrefix string) (Buc
 		bucket := NewFileBucket(filepath.FromSlash(path))
 		return bucket, nil
 	}
+
 	bucket, err := blob.OpenBucket(ctx, bucketURL)
 	if err != nil {
 		return nil, err
@@ -300,6 +380,10 @@ func OpenBucket(ctx context.Context, bucketURL string, bucketPrefix string) (Buc
 	if bucketPrefix != "" && bucketPrefix != "/" && bucketPrefix != "." {
 		bucket = blob.PrefixedBucket(bucket, path.Clean(bucketPrefix)+string(os.PathSeparator))
 	}
-	wrappedBucket := BucketAdapter{bucket}
+	if strings.HasPrefix(bucketURL, "azblob") {
+		return AzureBucketAdapter{bucket}, nil
+	}
+	wrappedBucket := S3BucketAdapter{bucket}
 	return wrappedBucket, err
 }
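
For reference, a minimal, untested usage sketch of the conditional-read path above, run against the Azurite container from the README notes (with the `AZURE_STORAGE_ACCOUNT`/`AZURE_STORAGE_KEY` variables exported as shown there). The module path `github.com/protomaps/go-pmtiles` and the `data`/`a.pmtiles` names are assumptions carried over from the examples; everything else is the `Bucket` API in this diff.

```go
package main

import (
	"context"
	"errors"
	"fmt"
	"io"
	"log"

	"github.com/protomaps/go-pmtiles/pmtiles"
)

func main() {
	ctx := context.Background()

	// Same bucket URL as the --bucket flag in the README's serve example.
	bucket, err := pmtiles.OpenBucket(ctx, "azblob://data?protocol=http&domain=127.0.0.1:10000", "")
	if err != nil {
		log.Fatal(err)
	}
	defer bucket.Close()

	// First read: no known ETag yet, so pass "". The 127-byte range is the
	// fixed-size PMTiles v3 header.
	body, etag, _, err := bucket.NewRangeReaderEtag(ctx, "a.pmtiles", 0, 127, "")
	if err != nil {
		log.Fatal(err)
	}
	header, _ := io.ReadAll(body)
	body.Close()
	fmt.Printf("read %d header bytes, etag=%q\n", len(header), etag)

	// Later reads send the stored ETag. If the blob was overwritten in the
	// meantime (e.g. the a2.pmtiles upload above), the adapter's If-Match
	// condition fails and the 412 surfaces as a RefreshRequiredError rather
	// than silently mixing bytes from two file versions.
	_, _, _, err = bucket.NewRangeReaderEtag(ctx, "a.pmtiles", 127, 16, etag)
	var refresh *pmtiles.RefreshRequiredError
	if errors.As(err, &refresh) {
		fmt.Println("archive changed; re-read the header and directories")
	}
}
```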