Support Azure reload in case the file is changing #177

Closed · wants to merge 6 commits

Changes from 4 commits

24 changes: 24 additions & 0 deletions README.md
@@ -84,3 +84,27 @@ To upload your files to AWS S3 you will need an IAM policy for writing/reading t
]
}

## Azure-Specific Dev Notes
Run a local Azure Blob Storage emulator (Azurite):
```shell
podman run -p 10000:10000 -v .:/log mcr.microsoft.com/azure-storage/azurite \
azurite-blob --debug /log/debug.log
```
Azurite accepts the same well-known account and key used by the legacy Azure Storage Emulator.

- Account name: `devstoreaccount1`
- Account key: `Eby8vdM02xNOcqFlqUwJPLlmEtlCDXJ1OUzFT50uSRZ6IFsuFq2UVErCz4I6tq/K1SZFPTOtr/KBHBeksoGMGw==`
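As a quick sanity check (not in the original notes), once Azurite is up and the connection string from the next step is exported, listing containers should print an empty JSON array:

```sh
# az reads AZURE_STORAGE_CONNECTION_STRING from the environment (exported below
# under "Uploading files"); a fresh emulator has no containers yet.
az storage container list
```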

Uploading files:
```sh
export AZURE_STORAGE_CONNECTION_STRING="DefaultEndpointsProtocol=http;AccountName=devstoreaccount1;AccountKey=Eby8vdM02xNOcqFlqUwJPLlmEtlCDXJ1OUzFT50uSRZ6IFsuFq2UVErCz4I6tq/K1SZFPTOtr/KBHBeksoGMGw==;BlobEndpoint=http://127.0.0.1:10000/devstoreaccount1;QueueEndpoint=http://127.0.0.1:10001/devstoreaccount1;TableEndpoint=http://127.0.0.1:10002/devstoreaccount1;"
az storage container create --name data
az storage blob upload --file a1.pmtiles --container-name data --name a.pmtiles
az storage blob upload --overwrite --file a2.pmtiles --container-name data --name a.pmtiles
```
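The two uploads deliberately overwrite the same blob name; you can confirm that the blob's ETag changes between them, which is what the reload in this PR keys on. The `show` command and JMESPath query are standard az CLI usage, not taken from these notes:

```sh
# Print the blob's current ETag; run once before and once after the
# --overwrite upload to see the value change.
az storage blob show --container-name data --name a.pmtiles --query properties.etag --output tsv
```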


Starting pmtiles:
```sh
AZURE_STORAGE_ACCOUNT=devstoreaccount1 AZURE_STORAGE_KEY="Eby8vdM02xNOcqFlqUwJPLlmEtlCDXJ1OUzFT50uSRZ6IFsuFq2UVErCz4I6tq/K1SZFPTOtr/KBHBeksoGMGw==" ./go-pmtiles serve --port=8084 / --bucket="azblob://data?protocol=http&domain=127.0.0.1:10000"
```
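To exercise the reload path end to end, one option is to fetch a tile, overwrite the blob so its ETag changes, and fetch the same tile again; the second request should be served from the refreshed archive. The tile URL pattern and coordinates below are assumptions for illustration, not something these notes specify:

```sh
# Fetch a tile from the first upload (URL pattern assumed: /<archive>/<z>/<x>/<y>.mvt).
curl -s -o before.mvt "http://localhost:8084/a/0/0/0.mvt"

# Overwrite the blob with a different archive; its ETag changes.
az storage blob upload --overwrite --file a2.pmtiles --container-name data --name a.pmtiles

# Fetch again: the conditional (If-Match) read fails, the server refreshes, and the new data is served.
curl -s -o after.mvt "http://localhost:8084/a/0/0/0.mvt"
```
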
71 changes: 65 additions & 6 deletions pmtiles/bucket.go
@@ -15,6 +15,8 @@
"path/filepath"
"strings"

"github.com/Azure/azure-sdk-for-go/sdk/azcore"
azblobblob "github.com/Azure/azure-sdk-for-go/sdk/storage/azblob/blob"
"github.com/aws/aws-sdk-go/aws/awserr"
"github.com/aws/aws-sdk-go/service/s3"
"github.com/cespare/xxhash/v2"
@@ -48,8 +50,8 @@
func (m mockBucket) NewRangeReader(ctx context.Context, key string, offset int64, length int64) (io.ReadCloser, error) {
body, _, _, err := m.NewRangeReaderEtag(ctx, key, offset, length, "")
return body, err

}

func (m mockBucket) NewRangeReaderEtag(_ context.Context, key string, offset int64, length int64, etag string) (io.ReadCloser, string, int, error) {
bs, ok := m.items[key]
if !ok {
@@ -111,7 +113,7 @@
return hasherToEtag(hasher)
}

func (b FileBucket) NewRangeReaderEtag(_ context.Context, key string, offset, length int64, etag string) (io.ReadCloser, string, int, error) {

[GitHub Actions fmt_vet_lint warning on line 116 in pmtiles/bucket.go: exported method FileBucket.NewRangeReaderEtag should have comment or be unexported]
name := filepath.Join(b.path, key)
file, err := os.Open(name)
defer file.Close()
@@ -144,7 +146,7 @@
return io.NopCloser(bytes.NewReader(result)), newEtag, 206, nil
}

func (b FileBucket) Close() error {

[GitHub Actions fmt_vet_lint warning on line 149 in pmtiles/bucket.go: exported method FileBucket.Close should have comment or be unexported]
return nil
}

@@ -202,16 +204,16 @@
return code == http.StatusPreconditionFailed || code == http.StatusRequestedRangeNotSatisfiable
}

type BucketAdapter struct {
type S3BucketAdapter struct {
Bucket *blob.Bucket
}

func (ba BucketAdapter) NewRangeReader(ctx context.Context, key string, offset, length int64) (io.ReadCloser, error) {
func (ba S3BucketAdapter) NewRangeReader(ctx context.Context, key string, offset, length int64) (io.ReadCloser, error) {
body, _, _, err := ba.NewRangeReaderEtag(ctx, key, offset, length, "")
return body, err
}

func (ba BucketAdapter) NewRangeReaderEtag(ctx context.Context, key string, offset, length int64, etag string) (io.ReadCloser, string, int, error) {
func (ba S3BucketAdapter) NewRangeReaderEtag(ctx context.Context, key string, offset, length int64, etag string) (io.ReadCloser, string, int, error) {
reader, err := ba.Bucket.NewRangeReader(ctx, key, offset, length, &blob.ReaderOptions{
BeforeRead: func(asFunc func(interface{}) bool) error {
var req *s3.GetObjectInput
@@ -242,7 +244,59 @@
return reader, resultETag, status, nil
}

func (ba BucketAdapter) Close() error {
func (ba S3BucketAdapter) Close() error {
return ba.Bucket.Close()
}

type AzureBucketAdapter struct {
Bucket *blob.Bucket
}

Review comment (Member): Is there a way to centralize this logic in the existing BucketAdapter instead of creating separate implementations for each provider (Azure, S3, etc.)? That would be more maintainable in the long term and take better advantage of gocloud.

Reply (@akhenakh, author, Aug 19, 2024): I've tried that in this updated version: there is nothing left in a "BaseBucketAdapter" because everything we do here uses the provider-specific APIs (As()), so it's not generic anymore. The only common code left is now in handleReaderResponse(). I think I almost prefer the explicit version with both adapters and a few repetitions.
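For context on the reply above, here is a minimal sketch of the kind of shared piece it refers to, assuming it lives in the same package as the existing `isRefreshRequiredCode` and `RefreshRequiredError` helpers; the real `handleReaderResponse()` in the later commits may look different:

```go
// handleReaderStatus is a hypothetical stand-in for the shared logic mentioned
// in the reply: map a status code and error to either a RefreshRequiredError
// or a plain error, so only the As()-based, provider-specific request plumbing
// stays in each adapter.
func handleReaderStatus(status int, err error) (int, error) {
	if err == nil {
		return status, nil
	}
	if isRefreshRequiredCode(status) {
		return status, &RefreshRequiredError{status}
	}
	return status, err
}
```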

func (ba AzureBucketAdapter) NewRangeReader(ctx context.Context, key string, offset, length int64) (io.ReadCloser, error) {
body, _, _, err := ba.NewRangeReaderEtag(ctx, key, offset, length, "")
return body, err
}

func (ba AzureBucketAdapter) NewRangeReaderEtag(ctx context.Context, key string, offset, length int64, etag string) (io.ReadCloser, string, int, error) {
reader, err := ba.Bucket.NewRangeReader(ctx, key, offset, length, &blob.ReaderOptions{
BeforeRead: func(asFunc func(interface{}) bool) error {
var req *azblobblob.DownloadStreamOptions
if len(etag) > 0 && asFunc(&req) {
etag := azcore.ETag(etag)
if req.AccessConditions == nil {
req.AccessConditions = &azblobblob.AccessConditions{}
}
if req.AccessConditions.ModifiedAccessConditions == nil {
req.AccessConditions.ModifiedAccessConditions = &azblobblob.ModifiedAccessConditions{}
}
req.AccessConditions.ModifiedAccessConditions.IfMatch = &etag
}
return nil
},
})
status := 206
if err != nil {
var resp *azcore.ResponseError
errors.As(err, &resp)
status = 404
if resp != nil {
status = resp.StatusCode
if isRefreshRequiredCode(resp.StatusCode) {
return nil, "", resp.StatusCode, &RefreshRequiredError{resp.StatusCode}
}
}
return nil, "", status, err
}
resultETag := ""
var resp azblobblob.DownloadStreamResponse
if reader.As(&resp) {
resultETag = string(*resp.ETag)
}

return reader, resultETag, status, nil
}

func (ba AzureBucketAdapter) Close() error {
return ba.Bucket.Close()
}
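The contract both adapters implement: the caller passes the last ETag it saw, and if the object changed underneath it the conditional read fails (412/416) and comes back as a RefreshRequiredError so the caller can re-read and rebuild any cached state. A small caller sketch, assuming the same package; it is illustrative only and not code from this PR (shown against AzureBucketAdapter, but S3BucketAdapter behaves the same way):

```go
// readWithRefresh tries a conditional ranged read against the last known ETag
// and falls back to an unconditional read when the blob has changed.
func readWithRefresh(ctx context.Context, b AzureBucketAdapter, key, knownEtag string, offset, length int64) ([]byte, string, error) {
	body, etag, _, err := b.NewRangeReaderEtag(ctx, key, offset, length, knownEtag)
	var refresh *RefreshRequiredError
	if errors.As(err, &refresh) {
		// The object no longer matches knownEtag: read without a condition and
		// let the caller invalidate whatever it derived from the old version.
		body, etag, _, err = b.NewRangeReaderEtag(ctx, key, offset, length, "")
	}
	if err != nil {
		return nil, "", err
	}
	defer body.Close()
	data, err := io.ReadAll(body)
	return data, etag, err
}
```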

@@ -293,13 +347,18 @@
bucket := NewFileBucket(filepath.FromSlash(path))
return bucket, nil
}

bucket, err := blob.OpenBucket(ctx, bucketURL)
if err != nil {
return nil, err
}
if bucketPrefix != "" && bucketPrefix != "/" && bucketPrefix != "." {
bucket = blob.PrefixedBucket(bucket, path.Clean(bucketPrefix)+string(os.PathSeparator))
}
wrappedBucket := BucketAdapter{bucket}
if strings.HasPrefix(bucketURL, "azblob") {
return AzureBucketAdapter{bucket}, nil
}
wrappedBucket := S3BucketAdapter{bucket}
return wrappedBucket, err
}