From f242507c8455971c7c4f97388d321da4980f15be Mon Sep 17 00:00:00 2001
From: Michael Barry
Date: Sun, 18 Feb 2024 01:30:00 -0500
Subject: [PATCH] server: add more prometheus metrics (#141)

* prometheus metrics for cache hit rate
* status codes for bucket requests
* differentiate between root/leaf requests
---
 main.go                   |   1 +
 pmtiles/bucket.go         |  59 +++++-----
 pmtiles/bucket_test.go    |  33 ++++--
 pmtiles/server.go         | 143 +++++++++++++++++-------
 pmtiles/server_metrics.go | 226 ++++++++++++++++++++++++++++++++++++++
 pmtiles/server_test.go    |   2 +
 6 files changed, 384 insertions(+), 80 deletions(-)
 create mode 100644 pmtiles/server_metrics.go

diff --git a/main.go b/main.go
index e6e1a80..b117b46 100644
--- a/main.go
+++ b/main.go
@@ -136,6 +136,7 @@ func main() {
 			logger.Fatalf("Failed to create new server, %v", err)
 		}
 
+		pmtiles.SetBuildInfo(version, commit, date)
 		server.Start()
 
 		http.HandleFunc("/", func(w http.ResponseWriter, r *http.Request) {
diff --git a/pmtiles/bucket.go b/pmtiles/bucket.go
index e83c15d..8dbc11c 100644
--- a/pmtiles/bucket.go
+++ b/pmtiles/bucket.go
@@ -25,7 +25,7 @@ import (
 type Bucket interface {
 	Close() error
 	NewRangeReader(ctx context.Context, key string, offset int64, length int64) (io.ReadCloser, error)
-	NewRangeReaderEtag(ctx context.Context, key string, offset int64, length int64, etag string) (io.ReadCloser, string, error)
+	NewRangeReaderEtag(ctx context.Context, key string, offset int64, length int64, etag string) (io.ReadCloser, string, int, error)
 }
 
 // RefreshRequiredError is an error that indicates the etag has changed on the remote file
@@ -46,25 +46,25 @@ func (m mockBucket) Close() error {
 }
 
 func (m mockBucket) NewRangeReader(ctx context.Context, key string, offset int64, length int64) (io.ReadCloser, error) {
-	body, _, err := m.NewRangeReaderEtag(ctx, key, offset, length, "")
+	body, _, _, err := m.NewRangeReaderEtag(ctx, key, offset, length, "")
 	return body, err
 }
 
-func (m mockBucket) NewRangeReaderEtag(_ context.Context, key string, offset int64, length int64, etag string) (io.ReadCloser, string, error) {
+func (m mockBucket) NewRangeReaderEtag(_ context.Context, key string, offset int64, length int64, etag string) (io.ReadCloser, string, int, error) {
 	bs, ok := m.items[key]
 	if !ok {
-		return nil, "", fmt.Errorf("Not found %s", key)
+		return nil, "", 404, fmt.Errorf("Not found %s", key)
 	}
 	resultEtag := generateEtag(bs)
 	if len(etag) > 0 && resultEtag != etag {
-		return nil, "", &RefreshRequiredError{}
+		return nil, "", 412, &RefreshRequiredError{}
 	}
 	if offset+length > int64(len(bs)) {
-		return nil, "", &RefreshRequiredError{416}
+		return nil, "", 416, &RefreshRequiredError{416}
 	}
 
-	return io.NopCloser(bytes.NewReader(bs[offset:(offset + length)])), resultEtag, nil
+	return io.NopCloser(bytes.NewReader(bs[offset:(offset + length)])), resultEtag, 206, nil
 }
 
 // FileBucket is a bucket backed by a directory on disk
@@ -73,7 +73,7 @@ type FileBucket struct {
 }
 
 func (b FileBucket) NewRangeReader(ctx context.Context, key string, offset, length int64) (io.ReadCloser, error) {
-	body, _, err := b.NewRangeReaderEtag(ctx, key, offset, length, "")
+	body, _, _, err := b.NewRangeReaderEtag(ctx, key, offset, length, "")
 	return body, err
 }
 
@@ -102,30 +102,30 @@ func generateEtagFromInts(ns ...int64) string {
 	return hasherToEtag(hasher)
 }
 
-func (b FileBucket) NewRangeReaderEtag(_ context.Context, key string, offset, length int64, etag string) (io.ReadCloser, string, error) {
+func (b FileBucket) NewRangeReaderEtag(_ context.Context, key string, offset, length int64, etag string) (io.ReadCloser, string, int, error) {
 	name := filepath.Join(b.path, key)
 	file, err := os.Open(name)
 	defer file.Close()
 	if err != nil {
-		return nil, "", err
+		return nil, "", 404, err
 	}
 
 	info, err := file.Stat()
 	if err != nil {
-		return nil, "", err
+		return nil, "", 404, err
 	}
 	newEtag := generateEtagFromInts(info.ModTime().UnixNano(), info.Size())
 	if len(etag) > 0 && etag != newEtag {
-		return nil, "", &RefreshRequiredError{}
+		return nil, "", 412, &RefreshRequiredError{}
 	}
 	result := make([]byte, length)
 	read, err := file.ReadAt(result, offset)
 	if err != nil {
-		return nil, "", err
+		return nil, "", 500, err
 	}
 	if read != int(length) {
-		return nil, "", fmt.Errorf("Expected to read %d bytes but only read %d", length, read)
+		return nil, "", 416, fmt.Errorf("Expected to read %d bytes but only read %d", length, read)
 	}
 
-	return io.NopCloser(bytes.NewReader(result)), newEtag, nil
+	return io.NopCloser(bytes.NewReader(result)), newEtag, 206, nil
 }
 
 func (b FileBucket) Close() error {
@@ -143,16 +143,16 @@ type HTTPBucket struct {
 }
 
 func (b HTTPBucket) NewRangeReader(ctx context.Context, key string, offset, length int64) (io.ReadCloser, error) {
-	body, _, err := b.NewRangeReaderEtag(ctx, key, offset, length, "")
+	body, _, _, err := b.NewRangeReaderEtag(ctx, key, offset, length, "")
 	return body, err
 }
 
-func (b HTTPBucket) NewRangeReaderEtag(ctx context.Context, key string, offset, length int64, etag string) (io.ReadCloser, string, error) {
+func (b HTTPBucket) NewRangeReaderEtag(ctx context.Context, key string, offset, length int64, etag string) (io.ReadCloser, string, int, error) {
 	reqURL := b.baseURL + "/" + key
 
 	req, err := http.NewRequestWithContext(ctx, "GET", reqURL, nil)
 	if err != nil {
-		return nil, "", err
+		return nil, "", 500, err
 	}
 
 	req.Header.Set("Range", fmt.Sprintf("bytes=%d-%d", offset, offset+length-1))
@@ -162,7 +162,7 @@ func (b HTTPBucket) NewRangeReaderEtag(ctx context.Context, key string, offset,
 
 	resp, err := b.client.Do(req)
 	if err != nil {
-		return nil, "", err
+		return nil, "", 500, err // resp can be nil when client.Do fails, so no response status code is available here
 	}
 
 	if resp.StatusCode != http.StatusOK && resp.StatusCode != http.StatusPartialContent {
@@ -172,10 +172,10 @@ func (b HTTPBucket) NewRangeReaderEtag(ctx context.Context, key string, offset,
 		} else {
 			err = fmt.Errorf("HTTP error: %d", resp.StatusCode)
 		}
-		return nil, "", err
+		return nil, "", resp.StatusCode, err
 	}
 
-	return resp.Body, resp.Header.Get("ETag"), nil
+	return resp.Body, resp.Header.Get("ETag"), resp.StatusCode, nil
 }
 
 func (b HTTPBucket) Close() error {
@@ -191,11 +191,11 @@ type BucketAdapter struct {
 }
 
 func (ba BucketAdapter) NewRangeReader(ctx context.Context, key string, offset, length int64) (io.ReadCloser, error) {
-	body, _, err := ba.NewRangeReaderEtag(ctx, key, offset, length, "")
+	body, _, _, err := ba.NewRangeReaderEtag(ctx, key, offset, length, "")
 	return body, err
 }
 
-func (ba BucketAdapter) NewRangeReaderEtag(ctx context.Context, key string, offset, length int64, etag string) (io.ReadCloser, string, error) {
+func (ba BucketAdapter) NewRangeReaderEtag(ctx context.Context, key string, offset, length int64, etag string) (io.ReadCloser, string, int, error) {
 	reader, err := ba.Bucket.NewRangeReader(ctx, key, offset, length, &blob.ReaderOptions{
 		BeforeRead: func(asFunc func(interface{}) bool) error {
 			var req *s3.GetObjectInput
@@ -205,20 +205,25 @@ func (ba BucketAdapter) NewRangeReaderEtag(ctx context.Context, key string, offs
 			return nil
 		},
 	})
+	status := 206
 	if err != nil {
 		var resp awserr.RequestFailure
 		errors.As(err, &resp)
-		if resp != nil && isRefreshRequredCode(resp.StatusCode()) {
-			return nil, "", &RefreshRequiredError{resp.StatusCode()}
+		status = 404
+		if resp != nil {
+			status = resp.StatusCode()
+			if isRefreshRequredCode(resp.StatusCode()) {
+				return nil, "", resp.StatusCode(), &RefreshRequiredError{resp.StatusCode()}
+			}
 		}
-		return nil, "", err
+		return nil, "", status, err
 	}
 	resultETag := ""
 	var resp s3.GetObjectOutput
 	if reader.As(&resp) {
 		resultETag = *resp.ETag
 	}
-	return reader, resultETag, nil
+	return reader, resultETag, status, nil
 }
 
 func (ba BucketAdapter) Close() error {
diff --git a/pmtiles/bucket_test.go b/pmtiles/bucket_test.go
index 257b2f0..bf247c0 100644
--- a/pmtiles/bucket_test.go
+++ b/pmtiles/bucket_test.go
@@ -63,10 +63,11 @@ func TestHttpBucketRequestNormal(t *testing.T) {
 		Body:   io.NopCloser(strings.NewReader("abc")),
 		Header: header,
 	}
-	data, etag, err := bucket.NewRangeReaderEtag(context.Background(), "a/b/c", 100, 3, "")
+	data, etag, status, err := bucket.NewRangeReaderEtag(context.Background(), "a/b/c", 100, 3, "")
 	assert.Equal(t, "", mock.request.Header.Get("If-Match"))
 	assert.Equal(t, "bytes=100-102", mock.request.Header.Get("Range"))
 	assert.Equal(t, "http://tiles.example.com/tiles/a/b/c", mock.request.URL.String())
+	assert.Equal(t, 200, status)
 	assert.Nil(t, err)
 	b, err := io.ReadAll(data)
 	assert.Nil(t, err)
@@ -85,8 +86,9 @@ func TestHttpBucketRequestRequestEtag(t *testing.T) {
 		Body:   io.NopCloser(strings.NewReader("abc")),
 		Header: header,
 	}
-	data, etag, err := bucket.NewRangeReaderEtag(context.Background(), "a/b/c", 0, 3, "etag1")
+	data, etag, status, err := bucket.NewRangeReaderEtag(context.Background(), "a/b/c", 0, 3, "etag1")
 	assert.Equal(t, "etag1", mock.request.Header.Get("If-Match"))
+	assert.Equal(t, 200, status)
 	assert.Nil(t, err)
 	b, err := io.ReadAll(data)
 	assert.Nil(t, err)
@@ -105,17 +107,20 @@ func TestHttpBucketRequestRequestEtagFailed(t *testing.T) {
 		Body:   io.NopCloser(strings.NewReader("abc")),
 		Header: header,
 	}
-	_, _, err := bucket.NewRangeReaderEtag(context.Background(), "a/b/c", 0, 3, "etag1")
+	_, _, status, err := bucket.NewRangeReaderEtag(context.Background(), "a/b/c", 0, 3, "etag1")
 	assert.Equal(t, "etag1", mock.request.Header.Get("If-Match"))
+	assert.Equal(t, 412, status)
 	assert.True(t, isRefreshRequredError(err))
 
 	mock.response.StatusCode = 416
-	_, _, err = bucket.NewRangeReaderEtag(context.Background(), "a/b/c", 0, 3, "etag1")
+	_, _, status, err = bucket.NewRangeReaderEtag(context.Background(), "a/b/c", 0, 3, "etag1")
+	assert.Equal(t, 416, status)
 	assert.True(t, isRefreshRequredError(err))
 
 	mock.response.StatusCode = 404
-	_, _, err = bucket.NewRangeReaderEtag(context.Background(), "a/b/c", 0, 3, "etag1")
+	_, _, status, err = bucket.NewRangeReaderEtag(context.Background(), "a/b/c", 0, 3, "etag1")
 	assert.False(t, isRefreshRequredError(err))
+	assert.Equal(t, 404, status)
 }
 
 func TestFileBucketReplace(t *testing.T) {
@@ -129,7 +134,8 @@ func TestFileBucketReplace(t *testing.T) {
 	assert.Nil(t, os.WriteFile(filepath.Join(tmp, "archive.pmtiles"), []byte{1, 2, 3}, 0666))
 
 	// first read from file
-	reader, etag1, err := bucket.NewRangeReaderEtag(context.Background(), "archive.pmtiles", 1, 1, "")
+	reader, etag1, status, err := bucket.NewRangeReaderEtag(context.Background(), "archive.pmtiles", 1, 1, "")
+	assert.Equal(t, 206, status)
 	assert.Nil(t, err)
 	data, err := io.ReadAll(reader)
 	assert.Nil(t, err)
@@ -137,7 +143,8 @@ func TestFileBucketReplace(t *testing.T) {
 
 	// change file, verify etag changes
 	assert.Nil(t, os.WriteFile(filepath.Join(tmp, "archive.pmtiles"), []byte{4, 5, 6, 7}, 0666))
-	reader, etag2, err := bucket.NewRangeReaderEtag(context.Background(), "archive.pmtiles", 1, 1, "")
+	reader, etag2, status, err := bucket.NewRangeReaderEtag(context.Background(), "archive.pmtiles", 1, 1, "")
+	assert.Equal(t, 206, status)
 	assert.Nil(t, err)
 	data, err = io.ReadAll(reader)
 	assert.Nil(t, err)
@@ -145,7 +152,8 @@ func TestFileBucketReplace(t *testing.T) {
 	assert.Equal(t, []byte{5}, data)
 
 	// and requesting with old etag fails with refresh required error
-	_, _, err = bucket.NewRangeReaderEtag(context.Background(), "archive.pmtiles", 1, 1, etag1)
+	_, _, status, err = bucket.NewRangeReaderEtag(context.Background(), "archive.pmtiles", 1, 1, etag1)
+	assert.Equal(t, 412, status)
 	assert.True(t, isRefreshRequredError(err))
 }
 
@@ -163,7 +171,8 @@ func TestFileBucketRename(t *testing.T) {
 	assert.Nil(t, os.WriteFile(filepath.Join(tmp, "archive.pmtiles"), []byte{1, 2, 3}, 0666))
 
 	// first read from file
-	reader, etag1, err := bucket.NewRangeReaderEtag(context.Background(), "archive.pmtiles", 1, 1, "")
+	reader, etag1, status, err := bucket.NewRangeReaderEtag(context.Background(), "archive.pmtiles", 1, 1, "")
+	assert.Equal(t, 206, status)
 	assert.Nil(t, err)
 	data, err := io.ReadAll(reader)
 	assert.Nil(t, err)
@@ -172,7 +181,8 @@ func TestFileBucketRename(t *testing.T) {
 	// change file, verify etag changes
 	os.Rename(filepath.Join(tmp, "archive.pmtiles"), filepath.Join(tmp, "archive3.pmtiles"))
 	os.Rename(filepath.Join(tmp, "archive2.pmtiles"), filepath.Join(tmp, "archive.pmtiles"))
-	reader, etag2, err := bucket.NewRangeReaderEtag(context.Background(), "archive.pmtiles", 1, 1, "")
+	reader, etag2, status, err := bucket.NewRangeReaderEtag(context.Background(), "archive.pmtiles", 1, 1, "")
+	assert.Equal(t, 206, status)
 	assert.Nil(t, err)
 	data, err = io.ReadAll(reader)
 	assert.Nil(t, err)
@@ -180,6 +190,7 @@ func TestFileBucketRename(t *testing.T) {
 	assert.Equal(t, []byte{5}, data)
 
 	// and requesting with old etag fails with refresh required error
-	_, _, err = bucket.NewRangeReaderEtag(context.Background(), "archive.pmtiles", 1, 1, etag1)
+	_, _, status, err = bucket.NewRangeReaderEtag(context.Background(), "archive.pmtiles", 1, 1, etag1)
+	assert.Equal(t, 412, status)
 	assert.True(t, isRefreshRequredError(err))
 }
diff --git a/pmtiles/server.go b/pmtiles/server.go
index e9fb6cd..8111b3d 100644
--- a/pmtiles/server.go
+++ b/pmtiles/server.go
@@ -13,8 +13,6 @@ import (
 	"regexp"
 	"strconv"
 	"time"
-
-	"github.com/prometheus/client_golang/prometheus"
 )
 
 type cacheKey struct {
@@ -53,6 +51,7 @@ type Server struct {
 	cacheSize int
 	cors      string
 	publicURL string
+	metrics   *metrics
 }
 
 // NewServer creates a new pmtiles HTTP server.
@@ -87,18 +86,12 @@ func NewServerWithBucket(bucket Bucket, _ string, logger *log.Logger, cacheSize
 		cacheSize: cacheSize,
 		cors:      cors,
 		publicURL: publicURL,
+		metrics:   createMetrics("", logger), // change scope string if there are multiple servers running in one process
 	}
 
 	return l, nil
 }
 
-func register[K prometheus.Collector](server *Server, metric K) K {
-	if err := prometheus.Register(metric); err != nil {
-		server.logger.Println(err)
-	}
-	return metric
-}
-
 // Start the server HTTP listener.
 func (server *Server) Start() {
 
@@ -109,19 +102,14 @@
 	evictList := list.New()
 	totalSize := 0
 	ctx := context.Background()
-
-	cacheSize := register(server, prometheus.NewGauge(prometheus.GaugeOpts{
-		Namespace: "pmtiles",
-		Subsystem: "cache",
-		Name:      "size",
-		Help:      "Current number or directories in the cache",
-	}))
+	server.metrics.initCacheStats(server.cacheSize * 1000 * 1000)
 
 	for {
 		select {
 		case req := <-server.reqs:
 			if len(req.purgeEtag) > 0 {
 				if _, dup := inflight[req.key]; !dup {
+					server.metrics.reloadFile(req.key.name)
 					server.logger.Printf("re-fetching directories for changed file %s", req.key.name)
 				}
 				for k, v := range cache {
@@ -132,19 +120,26 @@
 						totalSize -= resp.size
 					}
 				}
-				cacheSize.Set(float64(len(cache)))
+				server.metrics.updateCacheStats(totalSize, len(cache))
 			}
 			key := req.key
+			isRoot := (key.offset == 0 && key.length == 0)
+			kind := "leaf"
+			if isRoot {
+				kind = "root"
+			}
 			if val, ok := cache[key]; ok {
 				evictList.MoveToFront(val)
 				req.value <- val.Value.(*response).value
+				server.metrics.cacheRequest(key.name, kind, "hit")
 			} else if _, ok := inflight[key]; ok {
 				inflight[key] = append(inflight[key], req)
+				server.metrics.cacheRequest(key.name, kind, "hit") // treat inflight as a hit since it doesn't make a new server request
 			} else {
 				inflight[key] = []request{req}
+				server.metrics.cacheRequest(key.name, kind, "miss")
 				go func() {
 					var result cachedValue
-					isRoot := (key.offset == 0 && key.length == 0)
 
 					offset := int64(key.offset)
 					length := int64(key.length)
@@ -154,8 +149,13 @@
 						length = 16384
 					}
 
+					status := ""
+					tracker := server.metrics.startBucketRequest(key.name, kind)
+					defer func() { tracker.finish(ctx, status) }()
+
 					server.logger.Printf("fetching %s %d-%d", key.name, offset, length)
-					r, etag, err := server.bucket.NewRangeReaderEtag(ctx, key.name+".pmtiles", offset, length, key.etag)
+					r, etag, statusCode, err := server.bucket.NewRangeReaderEtag(ctx, key.name+".pmtiles", offset, length, key.etag)
+					status = strconv.Itoa(statusCode)
 
 					if err != nil {
 						ok = false
@@ -168,6 +168,7 @@
 					b, err := io.ReadAll(r)
 					if err != nil {
 						ok = false
+						status = "error"
 						resps <- response{key: key, value: result}
 						server.logger.Printf("failed to fetch %s %d-%d, %v", key.name, key.offset, key.length, err)
 						return
@@ -176,6 +177,7 @@
 					if isRoot {
 						header, err := deserializeHeader(b[0:HeaderV3LenBytes])
 						if err != nil {
+							status = "error"
 							server.logger.Printf("parsing header failed: %v", err)
 							return
 						}
@@ -224,7 +226,7 @@
 						totalSize -= kv.size
 					}
 				}
-				cacheSize.Set(float64(len(cache)))
+				server.metrics.updateCacheStats(totalSize, len(cache))
 			}
 		}
 	}
@@ -249,7 +251,11 @@ func (server *Server) getHeaderMetadataAttempt(ctx context.Context, name, purgeE
 		return false, HeaderV3{}, nil, "", nil
 	}
 
-	r, _, err := server.bucket.NewRangeReaderEtag(ctx, name+".pmtiles", int64(header.MetadataOffset), int64(header.MetadataLength), rootValue.etag)
+	status := ""
+	tracker := server.metrics.startBucketRequest(name, "metadata")
+	defer func() { tracker.finish(ctx, status) }()
+	r, _, statusCode, err := server.bucket.NewRangeReaderEtag(ctx, name+".pmtiles", int64(header.MetadataOffset), int64(header.MetadataLength), rootValue.etag)
+	status = strconv.Itoa(statusCode)
 	if isRefreshRequredError(err) {
 		return false, HeaderV3{}, nil, rootValue.etag, nil
 	}
@@ -266,6 +272,7 @@ func (server *Server) getHeaderMetadataAttempt(ctx context.Context, name, purgeE
 	} else if header.InternalCompression == NoCompression {
 		metadataBytes, err = io.ReadAll(r)
 	} else {
+		status = "error"
 		return true, HeaderV3{}, nil, "", errors.New("unknown compression")
 	}
 
@@ -381,18 +388,29 @@ func (server *Server) getTileAttempt(ctx context.Context, httpHeaders map[string
 	}
 
 	if entry.RunLength > 0 {
-		r, _, err := server.bucket.NewRangeReaderEtag(ctx, name+".pmtiles", int64(header.TileDataOffset+entry.Offset), int64(entry.Length), rootValue.etag)
+		status := ""
+		tracker := server.metrics.startBucketRequest(name, "tile")
+		defer func() { tracker.finish(ctx, status) }()
+		r, _, statusCode, err := server.bucket.NewRangeReaderEtag(ctx, name+".pmtiles", int64(header.TileDataOffset+entry.Offset), int64(entry.Length), rootValue.etag)
+		status = strconv.Itoa(statusCode)
 		if isRefreshRequredError(err) {
 			return 500, httpHeaders, []byte("I/O Error"), rootValue.etag
 		}
 		// possible we have the header/directory cached but the archive has disappeared
 		if err != nil {
-			server.logger.Printf("failed to fetch tile %s %d-%d", name, entry.Offset, entry.Length)
-			return 404, httpHeaders, []byte("archive not found"), ""
+			if isCanceled(ctx) {
+				return 499, httpHeaders, []byte("Canceled"), ""
+			}
+			server.logger.Printf("failed to fetch tile %s %d-%d %v", name, entry.Offset, entry.Length, err)
+			return 404, httpHeaders, []byte("Tile not found"), ""
 		}
 		defer r.Close()
 		b, err := io.ReadAll(r)
 		if err != nil {
+			status = "error"
+			if isCanceled(ctx) {
+				return 499, httpHeaders, []byte("Canceled"), ""
+			}
 			return 500, httpHeaders, []byte("I/O error"), ""
 		}
 
@@ -416,6 +434,10 @@ func isRefreshRequredError(err error) bool {
 	return ok
 }
 
+func isCanceled(ctx context.Context) bool {
+	return errors.Is(ctx.Err(), context.Canceled)
+}
+
 var tilePattern = regexp.MustCompile(`^\/([-A-Za-z0-9_\/!-_\.\*'\(\)']+)\/(\d+)\/(\d+)\/(\d+)\.([a-z]+)$`)
 var metadataPattern = regexp.MustCompile(`^\/([-A-Za-z0-9_\/!-_\.\*'\(\)']+)\/metadata$`)
 var tileJSONPattern = regexp.MustCompile(`^\/([-A-Za-z0-9_\/!-_\.\*'\(\)']+)\.json$`)
@@ -448,48 +470,85 @@ func parseMetadataPath(path string) (bool, string) {
 	return false, ""
 }
 
-// Get a response for the given path.
-// Return status code, HTTP headers, and body.
-func (server *Server) Get(ctx context.Context, path string) (int, map[string]string, []byte) {
-	httpHeaders := make(map[string]string)
+func (server *Server) get(ctx context.Context, path string) (archive, handler string, status int, headers map[string]string, data []byte) {
+	handler = ""
+	archive = ""
+	headers = make(map[string]string)
 	if len(server.cors) > 0 {
-		httpHeaders["Access-Control-Allow-Origin"] = server.cors
+		headers["Access-Control-Allow-Origin"] = server.cors
 	}
 
 	if ok, key, z, x, y, ext := parseTilePath(path); ok {
-		return server.getTile(ctx, httpHeaders, key, z, x, y, ext)
-	}
-	if ok, key := parseTilejsonPath(path); ok {
-		return server.getTileJSON(ctx, httpHeaders, key)
-	}
-	if ok, key := parseMetadataPath(path); ok {
-		return server.getMetadata(ctx, httpHeaders, key)
+		archive, handler = key, "tile"
+		status, headers, data = server.getTile(ctx, headers, key, z, x, y, ext)
+	} else if ok, key := parseTilejsonPath(path); ok {
+		archive, handler = key, "tilejson"
+		status, headers, data = server.getTileJSON(ctx, headers, key)
+	} else if ok, key := parseMetadataPath(path); ok {
+		archive, handler = key, "metadata"
+		status, headers, data = server.getMetadata(ctx, headers, key)
+	} else if path == "/" {
+		handler, status, data = "/", 204, []byte{}
+	} else {
+		handler, status, data = "404", 404, []byte("Path not found")
 	}
-	if path == "/" {
-		return 204, httpHeaders, []byte{}
-	}
+	return
+}
 
-	return 404, httpHeaders, []byte("Path not found")
+// Get a response for the given path.
+// Return status code, HTTP headers, and body.
+func (server *Server) Get(ctx context.Context, path string) (int, map[string]string, []byte) {
+	tracker := server.metrics.startRequest()
+	archive, handler, status, headers, data := server.get(ctx, path)
+	tracker.finish(ctx, archive, handler, status, len(data), true)
+	return status, headers, data
+}
+
+type loggingResponseWriter struct {
+	http.ResponseWriter
+	statusCode int
+}
+
+func (lrw *loggingResponseWriter) WriteHeader(code int) {
+	lrw.statusCode = code
+	lrw.ResponseWriter.WriteHeader(code)
 }
 
 // Serve an HTTP response from the archive
 func (server *Server) ServeHTTP(w http.ResponseWriter, r *http.Request) int {
-	statusCode, headers, body := server.Get(r.Context(), r.URL.Path)
+	tracker := server.metrics.startRequest()
+	if r.Method == http.MethodOptions {
+		if len(server.cors) > 0 {
+			w.Header().Set("Access-Control-Allow-Origin", server.cors)
+		}
+		w.WriteHeader(204)
+		tracker.finish(r.Context(), "", r.Method, 204, 0, false)
+		return 204
+	} else if r.Method != http.MethodGet && r.Method != http.MethodHead {
+		w.WriteHeader(405)
+		tracker.finish(r.Context(), "", r.Method, 405, 0, false)
+		return 405
+	}
+	archive, handler, statusCode, headers, body := server.get(r.Context(), r.URL.Path)
 	for k, v := range headers {
 		w.Header().Set(k, v)
 	}
 
 	if statusCode == 200 {
+		lrw := &loggingResponseWriter{w, 200}
 		// handle if-match, if-none-match request headers based on response etag
 		http.ServeContent(
-			w, r,
+			lrw, r,
 			"",                // name used to infer content-type, but we've already set that
 			time.UnixMilli(0), // ignore setting last-modified time and handling if-modified-since headers
 			bytes.NewReader(body),
 		)
+		statusCode = lrw.statusCode
 	} else {
 		w.WriteHeader(statusCode)
 		w.Write(body)
 	}
 
+	tracker.finish(r.Context(), archive, handler, statusCode, len(body), true)
+
 	return statusCode
 }
diff --git a/pmtiles/server_metrics.go b/pmtiles/server_metrics.go
new file mode 100644
index 0000000..c9f5385
--- /dev/null
+++ b/pmtiles/server_metrics.go
@@ -0,0 +1,226 @@
+package pmtiles
+
+import (
+	"context"
+	"fmt"
+	"log"
+	"strconv"
+	"time"
+
+	"github.com/prometheus/client_golang/prometheus"
+)
+
+var buildInfoMetric = prometheus.NewGaugeVec(prometheus.GaugeOpts{
+	Namespace: "pmtiles",
+	Name:      "buildinfo",
+}, []string{"version", "revision"})
+
+var buildTimeMetric = prometheus.NewGauge(prometheus.GaugeOpts{
+	Namespace: "pmtiles",
+	Name:      "buildtime",
+})
+
+func init() {
+	err := prometheus.Register(buildInfoMetric)
+	if err != nil {
+		fmt.Println("Error registering metric", err)
+	}
+	err = prometheus.Register(buildTimeMetric)
+	if err != nil {
+		fmt.Println("Error registering metric", err)
+	}
+}
+
+// SetBuildInfo initializes static metrics with pmtiles version, git hash, and build time
+func SetBuildInfo(version, commit, date string) {
+	buildInfoMetric.WithLabelValues(version, commit).Set(1)
+	time, err := time.Parse(time.RFC3339, date)
+	if err == nil {
+		buildTimeMetric.Set(float64(time.Unix()))
+	} else {
+		buildTimeMetric.Set(0)
+	}
+}
+
+type metrics struct {
+	// overall requests: # requests, request duration, response size by archive/status code
+	requests        *prometheus.CounterVec
+	responseSize    *prometheus.HistogramVec
+	requestDuration *prometheus.HistogramVec
+	// dir cache: # requests, hits, cache entries, cache bytes, cache bytes limit
+	dirCacheEntries    prometheus.Gauge
+	dirCacheSizeBytes  prometheus.Gauge
+	dirCacheLimitBytes prometheus.Gauge
+	dirCacheRequests   *prometheus.CounterVec
+	// requests to bucket: # total, response duration by archive/status code
+	bucketRequests        *prometheus.CounterVec
+	bucketRequestDuration *prometheus.HistogramVec
+	// misc
+	reloads *prometheus.CounterVec
+}
+
+// utility to time an overall tile request
+type requestTracker struct {
+	finished bool
+	start    time.Time
+	metrics  *metrics
+}
+
+func (m *metrics) startRequest() *requestTracker {
+	return &requestTracker{start: time.Now(), metrics: m}
+}
+
+func (r *requestTracker) finish(ctx context.Context, archive, handler string, status, responseSize int, logDetails bool) {
+	if !r.finished {
+		r.finished = true
+		// exclude archive path from "not found" metrics to limit cardinality on requests for nonexistent archives
+		statusString := strconv.Itoa(status)
+		if status == 404 {
+			archive = ""
+		} else if isCanceled(ctx) {
+			statusString = "canceled"
+		}
+
+		labels := []string{archive, handler, statusString}
+		r.metrics.requests.WithLabelValues(labels...).Inc()
+		if logDetails {
+			r.metrics.responseSize.WithLabelValues(labels...).Observe(float64(responseSize))
+			r.metrics.requestDuration.WithLabelValues(labels...).Observe(time.Since(r.start).Seconds())
+		}
+	}
+}
+
+// utility to time an individual request to the underlying bucket
+type bucketRequestTracker struct {
+	finished bool
+	start    time.Time
+	metrics  *metrics
+	archive  string
+	kind     string
+}
+
+func (m *metrics) startBucketRequest(archive, kind string) *bucketRequestTracker {
+	return &bucketRequestTracker{start: time.Now(), metrics: m, archive: archive, kind: kind}
+}
+
+func (r *bucketRequestTracker) finish(ctx context.Context, status string) {
+	if !r.finished {
+		r.finished = true
+		// exclude archive path from "not found" metrics to limit cardinality on requests for nonexistent archives
+		if status == "404" || status == "403" {
+			r.archive = ""
+		} else if isCanceled(ctx) {
+			status = "canceled"
+		}
+		r.metrics.bucketRequests.WithLabelValues(r.archive, r.kind, status).Inc()
+		r.metrics.bucketRequestDuration.WithLabelValues(r.archive, status).Observe(time.Since(r.start).Seconds())
+	}
+}
+
+// misc helpers
+
+func (m *metrics) reloadFile(name string) {
+	m.reloads.WithLabelValues(name).Inc()
+}
+
+func (m *metrics) initCacheStats(limitBytes int) {
+	m.dirCacheLimitBytes.Set(float64(limitBytes))
+	m.updateCacheStats(0, 0)
+}
+
+func (m *metrics) updateCacheStats(sizeBytes, entries int) {
+	m.dirCacheEntries.Set(float64(entries))
+	m.dirCacheSizeBytes.Set(float64(sizeBytes))
+}
+
+func (m *metrics) cacheRequest(archive, kind, status string) {
+	m.dirCacheRequests.WithLabelValues(archive, kind, status).Inc()
+}
+
+func register[K prometheus.Collector](logger *log.Logger, metric K) K {
+	if err := prometheus.Register(metric); err != nil {
+		logger.Println(err)
+	}
+	return metric
+}
+
+func createMetrics(scope string, logger *log.Logger) *metrics {
+	namespace := "pmtiles"
+	durationBuckets := prometheus.DefBuckets
+	kib := 1024.0
+	mib := kib * kib
+	sizeBuckets := []float64{1.0 * kib, 5.0 * kib, 10.0 * kib, 25.0 * kib, 50.0 * kib, 100 * kib, 250 * kib, 500 * kib, 1.0 * mib}
+
+	return &metrics{
+		// overall requests
+		requests: register(logger, prometheus.NewCounterVec(prometheus.CounterOpts{
+			Namespace: namespace,
+			Subsystem: scope,
+			Name:      "requests_total",
+			Help:      "Overall number of requests to the service",
+		}, []string{"archive", "handler", "status"})),
+		responseSize: register(logger, prometheus.NewHistogramVec(prometheus.HistogramOpts{
+			Namespace: namespace,
+			Subsystem: scope,
+			Name:      "response_size_bytes",
+			Help:      "Overall response size in bytes",
+			Buckets:   sizeBuckets,
+		}, []string{"archive", "handler", "status"})),
+		requestDuration: register(logger, prometheus.NewHistogramVec(prometheus.HistogramOpts{
+			Namespace: namespace,
+			Subsystem: scope,
+			Name:      "request_duration_seconds",
+			Help:      "Overall request duration in seconds",
+			Buckets:   durationBuckets,
+		}, []string{"archive", "handler", "status"})),
+
+		// dir cache
+		dirCacheEntries: register(logger, prometheus.NewGauge(prometheus.GaugeOpts{
+			Namespace: namespace,
+			Subsystem: scope,
+			Name:      "dir_cache_entries",
+			Help:      "Number of directories in the cache",
+		})),
+		dirCacheSizeBytes: register(logger, prometheus.NewGauge(prometheus.GaugeOpts{
+			Namespace: namespace,
+			Subsystem: scope,
+			Name:      "dir_cache_size_bytes",
+			Help:      "Current directory cache usage in bytes",
+		})),
+		dirCacheLimitBytes: register(logger, prometheus.NewGauge(prometheus.GaugeOpts{
+			Namespace: namespace,
+			Subsystem: scope,
+			Name:      "dir_cache_limit_bytes",
+			Help:      "Maximum directory cache size limit in bytes",
+		})),
+		dirCacheRequests: register(logger, prometheus.NewCounterVec(prometheus.CounterOpts{
+			Namespace: namespace,
+			Subsystem: scope,
+			Name:      "dir_cache_requests",
+			Help:      "Requests to the directory cache by archive and status (hit/miss)",
+		}, []string{"archive", "kind", "status"})),
+
+		// requests to bucket
+		bucketRequests: register(logger, prometheus.NewCounterVec(prometheus.CounterOpts{
+			Namespace: namespace,
+			Subsystem: scope,
+			Name:      "bucket_requests_total",
+			Help:      "Requests to the underlying bucket",
+		}, []string{"archive", "kind", "status"})),
+		bucketRequestDuration: register(logger, prometheus.NewHistogramVec(prometheus.HistogramOpts{
+			Namespace: namespace,
+			Subsystem: scope,
+			Name:      "bucket_request_duration_seconds",
+			Help:      "Request duration in seconds for individual requests to the underlying bucket",
+			Buckets:   durationBuckets,
+		}, []string{"archive", "status"})),
+
+		// misc
+		reloads: register(logger, prometheus.NewCounterVec(prometheus.CounterOpts{
+			Namespace: namespace,
+			Subsystem: scope,
+			Name:      "bucket_reloads",
+			Help:      "Number of times an archive was reloaded due to the etag changing",
+		}, []string{"archive"})),
+	}
+}
diff --git a/pmtiles/server_test.go b/pmtiles/server_test.go
index c12725f..76bbf46 100644
--- a/pmtiles/server_test.go
+++ b/pmtiles/server_test.go
@@ -9,6 +9,7 @@ import (
 	"sort"
 	"testing"
 
+	"github.com/prometheus/client_golang/prometheus"
 	"github.com/stretchr/testify/assert"
 )
 
@@ -108,6 +109,7 @@ func fakeArchive(t *testing.T, header HeaderV3, metadata map[string]interface{},
 }
 
 func newServer(t *testing.T) (mockBucket, *Server) {
+	prometheus.DefaultRegisterer = prometheus.NewRegistry()
 	bucket := mockBucket{make(map[string][]byte)}
 	server, err := NewServerWithBucket(bucket, "", log.Default(), 10, "", "tiles.example.com")
 	assert.Nil(t, err)
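
Note for readers of this patch (not part of the change itself): the collectors above all register against prometheus.DefaultRegisterer, so a binary linking this package only needs to expose the default registry over HTTP to scrape them. A minimal sketch follows; the /metrics path and port 2112 are illustrative assumptions, not something this patch configures.

	package main

	import (
		"log"
		"net/http"

		"github.com/prometheus/client_golang/prometheus/promhttp"
	)

	func main() {
		// promhttp.Handler() serves prometheus.DefaultGatherer, which is where
		// pmtiles_requests_total, pmtiles_dir_cache_* and pmtiles_bucket_* land.
		http.Handle("/metrics", promhttp.Handler())
		log.Fatal(http.ListenAndServe(":2112", nil))
	}

With those series flowing, the cache hit rate called out in the commit message is a ratio over the new dir-cache counter, for example sum(rate(pmtiles_dir_cache_requests{status="hit"}[5m])) / sum(rate(pmtiles_dir_cache_requests[5m])), and the kind label ("root"/"leaf" on cache and directory fetches, "tile"/"metadata" on other bucket requests) separates root-directory from leaf-directory traffic.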