Skip to content

Commit

Permalink
server: add more prometheus metrics (#141)
Browse files Browse the repository at this point in the history
* prometheus metrics for cache hit rate
* status codes for bucket requests
* differentiate between root/leaf requests
  • Loading branch information
msbarry authored Feb 18, 2024
1 parent 82257ce commit f242507
Show file tree
Hide file tree
Showing 6 changed files with 384 additions and 80 deletions.
1 change: 1 addition & 0 deletions main.go
Original file line number Diff line number Diff line change
Expand Up @@ -136,6 +136,7 @@ func main() {
logger.Fatalf("Failed to create new server, %v", err)
}

pmtiles.SetBuildInfo(version, commit, date)
server.Start()

http.HandleFunc("/", func(w http.ResponseWriter, r *http.Request) {
Expand Down
59 changes: 32 additions & 27 deletions pmtiles/bucket.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ import (
type Bucket interface {
Close() error
NewRangeReader(ctx context.Context, key string, offset int64, length int64) (io.ReadCloser, error)
NewRangeReaderEtag(ctx context.Context, key string, offset int64, length int64, etag string) (io.ReadCloser, string, error)
NewRangeReaderEtag(ctx context.Context, key string, offset int64, length int64, etag string) (io.ReadCloser, string, int, error)
}

// RefreshRequiredError is an error that indicates the etag has chanced on the remote file
Expand All @@ -46,25 +46,25 @@ func (m mockBucket) Close() error {
}

func (m mockBucket) NewRangeReader(ctx context.Context, key string, offset int64, length int64) (io.ReadCloser, error) {
body, _, err := m.NewRangeReaderEtag(ctx, key, offset, length, "")
body, _, _, err := m.NewRangeReaderEtag(ctx, key, offset, length, "")
return body, err

}
func (m mockBucket) NewRangeReaderEtag(_ context.Context, key string, offset int64, length int64, etag string) (io.ReadCloser, string, error) {
func (m mockBucket) NewRangeReaderEtag(_ context.Context, key string, offset int64, length int64, etag string) (io.ReadCloser, string, int, error) {
bs, ok := m.items[key]
if !ok {
return nil, "", fmt.Errorf("Not found %s", key)
return nil, "", 404, fmt.Errorf("Not found %s", key)
}

resultEtag := generateEtag(bs)
if len(etag) > 0 && resultEtag != etag {
return nil, "", &RefreshRequiredError{}
return nil, "", 412, &RefreshRequiredError{}
}
if offset+length > int64(len(bs)) {
return nil, "", &RefreshRequiredError{416}
return nil, "", 416, &RefreshRequiredError{416}
}

return io.NopCloser(bytes.NewReader(bs[offset:(offset + length)])), resultEtag, nil
return io.NopCloser(bytes.NewReader(bs[offset:(offset + length)])), resultEtag, 206, nil
}

// FileBucket is a bucket backed by a directory on disk
Expand All @@ -73,7 +73,7 @@ type FileBucket struct {
}

func (b FileBucket) NewRangeReader(ctx context.Context, key string, offset, length int64) (io.ReadCloser, error) {
body, _, err := b.NewRangeReaderEtag(ctx, key, offset, length, "")
body, _, _, err := b.NewRangeReaderEtag(ctx, key, offset, length, "")
return body, err
}

Expand Down Expand Up @@ -102,30 +102,30 @@ func generateEtagFromInts(ns ...int64) string {
return hasherToEtag(hasher)
}

func (b FileBucket) NewRangeReaderEtag(_ context.Context, key string, offset, length int64, etag string) (io.ReadCloser, string, error) {
func (b FileBucket) NewRangeReaderEtag(_ context.Context, key string, offset, length int64, etag string) (io.ReadCloser, string, int, error) {
name := filepath.Join(b.path, key)
file, err := os.Open(name)
defer file.Close()
if err != nil {
return nil, "", err
return nil, "", 404, err
}
info, err := file.Stat()
if err != nil {
return nil, "", err
return nil, "", 404, err
}
newEtag := generateEtagFromInts(info.ModTime().UnixNano(), info.Size())
if len(etag) > 0 && etag != newEtag {
return nil, "", &RefreshRequiredError{}
return nil, "", 412, &RefreshRequiredError{}
}
result := make([]byte, length)
read, err := file.ReadAt(result, offset)
if err != nil {
return nil, "", err
return nil, "", 500, err
}
if read != int(length) {
return nil, "", fmt.Errorf("Expected to read %d bytes but only read %d", length, read)
return nil, "", 416, fmt.Errorf("Expected to read %d bytes but only read %d", length, read)
}
return io.NopCloser(bytes.NewReader(result)), newEtag, nil
return io.NopCloser(bytes.NewReader(result)), newEtag, 206, nil
}

func (b FileBucket) Close() error {
Expand All @@ -143,16 +143,16 @@ type HTTPBucket struct {
}

func (b HTTPBucket) NewRangeReader(ctx context.Context, key string, offset, length int64) (io.ReadCloser, error) {
body, _, err := b.NewRangeReaderEtag(ctx, key, offset, length, "")
body, _, _, err := b.NewRangeReaderEtag(ctx, key, offset, length, "")
return body, err
}

func (b HTTPBucket) NewRangeReaderEtag(ctx context.Context, key string, offset, length int64, etag string) (io.ReadCloser, string, error) {
func (b HTTPBucket) NewRangeReaderEtag(ctx context.Context, key string, offset, length int64, etag string) (io.ReadCloser, string, int, error) {
reqURL := b.baseURL + "/" + key

req, err := http.NewRequestWithContext(ctx, "GET", reqURL, nil)
if err != nil {
return nil, "", err
return nil, "", 500, err
}

req.Header.Set("Range", fmt.Sprintf("bytes=%d-%d", offset, offset+length-1))
Expand All @@ -162,7 +162,7 @@ func (b HTTPBucket) NewRangeReaderEtag(ctx context.Context, key string, offset,

resp, err := b.client.Do(req)
if err != nil {
return nil, "", err
return nil, "", resp.StatusCode, err
}

if resp.StatusCode != http.StatusOK && resp.StatusCode != http.StatusPartialContent {
Expand All @@ -172,10 +172,10 @@ func (b HTTPBucket) NewRangeReaderEtag(ctx context.Context, key string, offset,
} else {
err = fmt.Errorf("HTTP error: %d", resp.StatusCode)
}
return nil, "", err
return nil, "", resp.StatusCode, err
}

return resp.Body, resp.Header.Get("ETag"), nil
return resp.Body, resp.Header.Get("ETag"), resp.StatusCode, nil
}

func (b HTTPBucket) Close() error {
Expand All @@ -191,11 +191,11 @@ type BucketAdapter struct {
}

func (ba BucketAdapter) NewRangeReader(ctx context.Context, key string, offset, length int64) (io.ReadCloser, error) {
body, _, err := ba.NewRangeReaderEtag(ctx, key, offset, length, "")
body, _, _, err := ba.NewRangeReaderEtag(ctx, key, offset, length, "")
return body, err
}

func (ba BucketAdapter) NewRangeReaderEtag(ctx context.Context, key string, offset, length int64, etag string) (io.ReadCloser, string, error) {
func (ba BucketAdapter) NewRangeReaderEtag(ctx context.Context, key string, offset, length int64, etag string) (io.ReadCloser, string, int, error) {
reader, err := ba.Bucket.NewRangeReader(ctx, key, offset, length, &blob.ReaderOptions{
BeforeRead: func(asFunc func(interface{}) bool) error {
var req *s3.GetObjectInput
Expand All @@ -205,20 +205,25 @@ func (ba BucketAdapter) NewRangeReaderEtag(ctx context.Context, key string, offs
return nil
},
})
status := 206
if err != nil {
var resp awserr.RequestFailure
errors.As(err, &resp)
if resp != nil && isRefreshRequredCode(resp.StatusCode()) {
return nil, "", &RefreshRequiredError{resp.StatusCode()}
status = 404
if resp != nil {
status = resp.StatusCode()
if isRefreshRequredCode(resp.StatusCode()) {
return nil, "", resp.StatusCode(), &RefreshRequiredError{resp.StatusCode()}
}
}
return nil, "", err
return nil, "", status, err
}
resultETag := ""
var resp s3.GetObjectOutput
if reader.As(&resp) {
resultETag = *resp.ETag
}
return reader, resultETag, nil
return reader, resultETag, status, nil
}

func (ba BucketAdapter) Close() error {
Expand Down
33 changes: 22 additions & 11 deletions pmtiles/bucket_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,10 +63,11 @@ func TestHttpBucketRequestNormal(t *testing.T) {
Body: io.NopCloser(strings.NewReader("abc")),
Header: header,
}
data, etag, err := bucket.NewRangeReaderEtag(context.Background(), "a/b/c", 100, 3, "")
data, etag, status, err := bucket.NewRangeReaderEtag(context.Background(), "a/b/c", 100, 3, "")
assert.Equal(t, "", mock.request.Header.Get("If-Match"))
assert.Equal(t, "bytes=100-102", mock.request.Header.Get("Range"))
assert.Equal(t, "http://tiles.example.com/tiles/a/b/c", mock.request.URL.String())
assert.Equal(t, 200, status)
assert.Nil(t, err)
b, err := io.ReadAll(data)
assert.Nil(t, err)
Expand All @@ -85,8 +86,9 @@ func TestHttpBucketRequestRequestEtag(t *testing.T) {
Body: io.NopCloser(strings.NewReader("abc")),
Header: header,
}
data, etag, err := bucket.NewRangeReaderEtag(context.Background(), "a/b/c", 0, 3, "etag1")
data, etag, status, err := bucket.NewRangeReaderEtag(context.Background(), "a/b/c", 0, 3, "etag1")
assert.Equal(t, "etag1", mock.request.Header.Get("If-Match"))
assert.Equal(t, 200, status)
assert.Nil(t, err)
b, err := io.ReadAll(data)
assert.Nil(t, err)
Expand All @@ -105,17 +107,20 @@ func TestHttpBucketRequestRequestEtagFailed(t *testing.T) {
Body: io.NopCloser(strings.NewReader("abc")),
Header: header,
}
_, _, err := bucket.NewRangeReaderEtag(context.Background(), "a/b/c", 0, 3, "etag1")
_, _, status, err := bucket.NewRangeReaderEtag(context.Background(), "a/b/c", 0, 3, "etag1")
assert.Equal(t, "etag1", mock.request.Header.Get("If-Match"))
assert.Equal(t, 412, status)
assert.True(t, isRefreshRequredError(err))

mock.response.StatusCode = 416
_, _, err = bucket.NewRangeReaderEtag(context.Background(), "a/b/c", 0, 3, "etag1")
_, _, status, err = bucket.NewRangeReaderEtag(context.Background(), "a/b/c", 0, 3, "etag1")
assert.Equal(t, 416, status)
assert.True(t, isRefreshRequredError(err))

mock.response.StatusCode = 404
_, _, err = bucket.NewRangeReaderEtag(context.Background(), "a/b/c", 0, 3, "etag1")
_, _, status, err = bucket.NewRangeReaderEtag(context.Background(), "a/b/c", 0, 3, "etag1")
assert.False(t, isRefreshRequredError(err))
assert.Equal(t, 404, status)
}

func TestFileBucketReplace(t *testing.T) {
Expand All @@ -129,23 +134,26 @@ func TestFileBucketReplace(t *testing.T) {
assert.Nil(t, os.WriteFile(filepath.Join(tmp, "archive.pmtiles"), []byte{1, 2, 3}, 0666))

// first read from file
reader, etag1, err := bucket.NewRangeReaderEtag(context.Background(), "archive.pmtiles", 1, 1, "")
reader, etag1, status, err := bucket.NewRangeReaderEtag(context.Background(), "archive.pmtiles", 1, 1, "")
assert.Equal(t, 206, status)
assert.Nil(t, err)
data, err := io.ReadAll(reader)
assert.Nil(t, err)
assert.Equal(t, []byte{2}, data)

// change file, verify etag changes
assert.Nil(t, os.WriteFile(filepath.Join(tmp, "archive.pmtiles"), []byte{4, 5, 6, 7}, 0666))
reader, etag2, err := bucket.NewRangeReaderEtag(context.Background(), "archive.pmtiles", 1, 1, "")
reader, etag2, status, err := bucket.NewRangeReaderEtag(context.Background(), "archive.pmtiles", 1, 1, "")
assert.Equal(t, 206, status)
assert.Nil(t, err)
data, err = io.ReadAll(reader)
assert.Nil(t, err)
assert.NotEqual(t, etag1, etag2)
assert.Equal(t, []byte{5}, data)

// and requesting with old etag fails with refresh required error
_, _, err = bucket.NewRangeReaderEtag(context.Background(), "archive.pmtiles", 1, 1, etag1)
_, _, status, err = bucket.NewRangeReaderEtag(context.Background(), "archive.pmtiles", 1, 1, etag1)
assert.Equal(t, 412, status)
assert.True(t, isRefreshRequredError(err))
}

Expand All @@ -163,7 +171,8 @@ func TestFileBucketRename(t *testing.T) {
assert.Nil(t, os.WriteFile(filepath.Join(tmp, "archive.pmtiles"), []byte{1, 2, 3}, 0666))

// first read from file
reader, etag1, err := bucket.NewRangeReaderEtag(context.Background(), "archive.pmtiles", 1, 1, "")
reader, etag1, status, err := bucket.NewRangeReaderEtag(context.Background(), "archive.pmtiles", 1, 1, "")
assert.Equal(t, 206, status)
assert.Nil(t, err)
data, err := io.ReadAll(reader)
assert.Nil(t, err)
Expand All @@ -172,14 +181,16 @@ func TestFileBucketRename(t *testing.T) {
// change file, verify etag changes
os.Rename(filepath.Join(tmp, "archive.pmtiles"), filepath.Join(tmp, "archive3.pmtiles"))
os.Rename(filepath.Join(tmp, "archive2.pmtiles"), filepath.Join(tmp, "archive.pmtiles"))
reader, etag2, err := bucket.NewRangeReaderEtag(context.Background(), "archive.pmtiles", 1, 1, "")
reader, etag2, status, err := bucket.NewRangeReaderEtag(context.Background(), "archive.pmtiles", 1, 1, "")
assert.Equal(t, 206, status)
assert.Nil(t, err)
data, err = io.ReadAll(reader)
assert.Nil(t, err)
assert.NotEqual(t, etag1, etag2)
assert.Equal(t, []byte{5}, data)

// and requesting with old etag fails with refresh required error
_, _, err = bucket.NewRangeReaderEtag(context.Background(), "archive.pmtiles", 1, 1, etag1)
_, _, status, err = bucket.NewRangeReaderEtag(context.Background(), "archive.pmtiles", 1, 1, etag1)
assert.Equal(t, 412, status)
assert.True(t, isRefreshRequredError(err))
}
Loading

0 comments on commit f242507

Please sign in to comment.