From 6fbba224dd25227b7f2fd855040bbc3e2a909e94 Mon Sep 17 00:00:00 2001 From: Mike Barry Date: Wed, 14 Feb 2024 06:05:27 -0500 Subject: [PATCH 01/10] progress --- main.go | 1 + pmtiles/server.go | 152 ++++++++++++++++++++-------- pmtiles/server_metrics.go | 203 ++++++++++++++++++++++++++++++++++++++ 3 files changed, 317 insertions(+), 39 deletions(-) create mode 100644 pmtiles/server_metrics.go diff --git a/main.go b/main.go index e6e1a80..b117b46 100644 --- a/main.go +++ b/main.go @@ -136,6 +136,7 @@ func main() { logger.Fatalf("Failed to create new server, %v", err) } + pmtiles.SetBuildInfo(version, commit, date) server.Start() http.HandleFunc("/", func(w http.ResponseWriter, r *http.Request) { diff --git a/pmtiles/server.go b/pmtiles/server.go index 309df28..f9c6125 100644 --- a/pmtiles/server.go +++ b/pmtiles/server.go @@ -13,8 +13,6 @@ import ( "regexp" "strconv" "time" - - "github.com/prometheus/client_golang/prometheus" ) type cacheKey struct { @@ -53,6 +51,7 @@ type Server struct { cacheSize int cors string publicURL string + metrics *metrics } // NewServer creates a new pmtiles HTTP server. @@ -87,18 +86,12 @@ func NewServerWithBucket(bucket Bucket, _ string, logger *log.Logger, cacheSize cacheSize: cacheSize, cors: cors, publicURL: publicURL, + metrics: createMetrics("", logger), // change scope string if there are multiple servers running in one process } return l, nil } -func register[K prometheus.Collector](server *Server, metric K) K { - if err := prometheus.Register(metric); err != nil { - server.logger.Println(err) - } - return metric -} - // Start the server HTTP listener. func (server *Server) Start() { @@ -109,19 +102,14 @@ func (server *Server) Start() { evictList := list.New() totalSize := 0 ctx := context.Background() - - cacheSize := register(server, prometheus.NewGauge(prometheus.GaugeOpts{ - Namespace: "pmtiles", - Subsystem: "cache", - Name: "size", - Help: "Current number or directories in the cache", - })) + server.metrics.initCacheStats(server.cacheSize * 1000 * 1000) for { select { case req := <-server.reqs: if len(req.purgeEtag) > 0 { if _, dup := inflight[req.key]; !dup { + server.metrics.reloadFile(req.key.name) server.logger.Printf("re-fetching directories for changed file %s", req.key.name) } for k, v := range cache { @@ -132,17 +120,23 @@ func (server *Server) Start() { totalSize -= resp.size } } - cacheSize.Set(float64(len(cache))) + server.metrics.updateCacheStats(totalSize, len(cache)) } key := req.key if val, ok := cache[key]; ok { evictList.MoveToFront(val) req.value <- val.Value.(*response).value + server.metrics.cacheRequest(key.name, "hit") } else if _, ok := inflight[key]; ok { inflight[key] = append(inflight[key], req) + server.metrics.cacheRequest(key.name, "inflight") } else { inflight[key] = []request{req} + server.metrics.cacheRequest(key.name, "miss") go func() { + status := "ok" + tracker := server.metrics.startBucketRequest(key.name) + defer func() { tracker.finish(status) }() var result cachedValue isRoot := (key.offset == 0 && key.length == 0) @@ -160,6 +154,11 @@ func (server *Server) Start() { if err != nil { ok = false result.badEtag = isRefreshRequredError(err) + if result.badEtag { + status = "refresh_required" + } else { + status = canceledOrError(ctx) + } resps <- response{key: key, value: result} server.logger.Printf("failed to fetch %s %d-%d, %v", key.name, key.offset, key.length, err) return @@ -168,6 +167,7 @@ func (server *Server) Start() { b, err := io.ReadAll(r) if err != nil { ok = false + status = canceledOrError(ctx) resps <- response{key: key, value: result} server.logger.Printf("failed to fetch %s %d-%d, %v", key.name, key.offset, key.length, err) return @@ -176,6 +176,7 @@ func (server *Server) Start() { if isRoot { header, err := deserializeHeader(b[0:HeaderV3LenBytes]) if err != nil { + status = "error" server.logger.Printf("parsing header failed: %v", err) return } @@ -224,7 +225,7 @@ func (server *Server) Start() { totalSize -= kv.size } } - cacheSize.Set(float64(len(cache))) + server.metrics.updateCacheStats(totalSize, len(cache)) } } } @@ -249,11 +250,16 @@ func (server *Server) getHeaderMetadataAttempt(ctx context.Context, name, purgeE return false, HeaderV3{}, nil, "", nil } + status := "ok" + tracker := server.metrics.startBucketRequest(name) + defer func() { tracker.finish(status) }() r, _, err := server.bucket.NewRangeReaderEtag(ctx, name+".pmtiles", int64(header.MetadataOffset), int64(header.MetadataLength), rootValue.etag) if isRefreshRequredError(err) { + status = "refresh_required" return false, HeaderV3{}, nil, rootValue.etag, nil } if err != nil { + status = canceledOrError(ctx) return false, HeaderV3{}, nil, "", nil } defer r.Close() @@ -266,6 +272,7 @@ func (server *Server) getHeaderMetadataAttempt(ctx context.Context, name, purgeE } else if header.InternalCompression == NoCompression { metadataBytes, err = io.ReadAll(r) } else { + status = "error" return true, HeaderV3{}, nil, "", errors.New("unknown compression") } @@ -381,19 +388,31 @@ func (server *Server) getTileAttempt(ctx context.Context, httpHeaders map[string } if entry.RunLength > 0 { + status := "ok" + tracker := server.metrics.startBucketRequest(name) + defer func() { tracker.finish(status) }() r, _, err := server.bucket.NewRangeReaderEtag(ctx, name+".pmtiles", int64(header.TileDataOffset+entry.Offset), int64(entry.Length), rootValue.etag) if isRefreshRequredError(err) { + status = "refresh_required" return 500, httpHeaders, []byte("I/O Error"), rootValue.etag } // possible we have the header/directory cached but the archive has disappeared if err != nil { - server.logger.Printf("failed to fetch tile %s %d-%d", name, entry.Offset, entry.Length) - return 404, httpHeaders, []byte("archive not found"), "" + status = canceledOrError(ctx) + if isCanceled(ctx) { + return 499, httpHeaders, []byte("Canceled"), "" + } + server.logger.Printf("failed to fetch tile %s %d-%d %v", name, entry.Offset, entry.Length, err) + return 404, httpHeaders, []byte("Tile not found"), "" } defer r.Close() b, err := io.ReadAll(r) if err != nil { - return 500, httpHeaders, []byte("I/O error"), "" + status = canceledOrError(ctx) + if isCanceled(ctx) { + return 499, httpHeaders, []byte("Canceled"), "" + } + return canceledOrStatusCode(ctx, 500), httpHeaders, []byte("I/O error"), "" } httpHeaders["Etag"] = generateEtag(b) @@ -416,6 +435,24 @@ func isRefreshRequredError(err error) bool { return ok } +func canceledOrError(ctx context.Context) string { + if isCanceled(ctx) { + return "canceled" + } + return "error" +} + +func canceledOrStatusCode(ctx context.Context, code int) int { + if isCanceled(ctx) { + return 499 + } + return code +} + +func isCanceled(ctx context.Context) bool { + return errors.Is(ctx.Err(), context.Canceled) +} + var tilePattern = regexp.MustCompile(`^\/([-A-Za-z0-9_\/!-_\.\*'\(\)']+)\/(\d+)\/(\d+)\/(\d+)\.([a-z]+)$`) var metadataPattern = regexp.MustCompile(`^\/([-A-Za-z0-9_\/!-_\.\*'\(\)']+)\/metadata$`) var tileJSONPattern = regexp.MustCompile(`^\/([-A-Za-z0-9_\/!-_\.\*'\(\)']+)\.json$`) @@ -448,48 +485,85 @@ func parseMetadataPath(path string) (bool, string) { return false, "" } -// Get a response for the given path. -// Return status code, HTTP headers, and body. -func (server *Server) Get(ctx context.Context, path string) (int, map[string]string, []byte) { - httpHeaders := make(map[string]string) +func (server *Server) get(ctx context.Context, path string) (archive, handler string, status int, headers map[string]string, data []byte) { + handler = "" + archive = "" + headers = make(map[string]string) if len(server.cors) > 0 { - httpHeaders["Access-Control-Allow-Origin"] = server.cors + headers["Access-Control-Allow-Origin"] = server.cors } if ok, key, z, x, y, ext := parseTilePath(path); ok { - return server.getTile(ctx, httpHeaders, key, z, x, y, ext) - } - if ok, key := parseTilejsonPath(path); ok { - return server.getTileJSON(ctx, httpHeaders, key) - } - if ok, key := parseMetadataPath(path); ok { - return server.getMetadata(ctx, httpHeaders, key) + archive, handler = key, "tile" + status, headers, data = server.getTile(ctx, headers, key, z, x, y, ext) + } else if ok, key := parseTilejsonPath(path); ok { + archive, handler = key, "tilejson" + status, headers, data = server.getTileJSON(ctx, headers, key) + } else if ok, key := parseMetadataPath(path); ok { + archive, handler = key, "metadata" + status, headers, data = server.getMetadata(ctx, headers, key) + } else if path == "/" { + handler, status, data = "/", 204, []byte{} + } else { + handler, status, data = "404", 404, []byte("Path not found") } - if path == "/" { - return 204, httpHeaders, []byte{} - } + return +} - return 404, httpHeaders, []byte("Path not found") +// Get a response for the given path. +// Return status code, HTTP headers, and body. +func (server *Server) Get(ctx context.Context, path string) (int, map[string]string, []byte) { + tracker := server.metrics.startRequest() + archive, handler, status, headers, data := server.get(ctx, path) + tracker.finish(archive, handler, status, len(data), true) + return status, headers, data +} + +type loggingResponseWriter struct { + http.ResponseWriter + statusCode int +} + +func (lrw *loggingResponseWriter) WriteHeader(code int) { + lrw.statusCode = code + lrw.ResponseWriter.WriteHeader(code) } // Serve an HTTP response from the archive func (server *Server) ServeHTTP(w http.ResponseWriter, r *http.Request) int { - statusCode, headers, body := server.Get(r.Context(), r.URL.Path) + tracker := server.metrics.startRequest() + if r.Method == http.MethodOptions { + if len(server.cors) > 0 { + w.Header().Set("Access-Control-Allow-Origin", server.cors) + } + w.WriteHeader(204) + tracker.finish("", r.Method, 204, 0, false) + return 204 + } else if r.Method != http.MethodGet && r.Method != http.MethodHead { + w.WriteHeader(405) + tracker.finish("", r.Method, 405, 0, false) + return 405 + } + archive, handler, statusCode, headers, body := server.get(r.Context(), r.URL.Path) for k, v := range headers { w.Header().Set(k, v) } if statusCode == 200 { + lrw := &loggingResponseWriter{w, 200} // handle if-match, if-none-match request headers based on response etag http.ServeContent( - w, r, + lrw, r, "", // name used to infer content-type, but we've already set that time.UnixMilli(0), // ignore setting last-modified time and handling if-modified-since headers bytes.NewReader(body), ) + statusCode = lrw.statusCode } else { w.WriteHeader(statusCode) w.Write(body) } + tracker.finish(archive, handler, statusCode, len(body), true) + return statusCode } diff --git a/pmtiles/server_metrics.go b/pmtiles/server_metrics.go new file mode 100644 index 0000000..690a7ed --- /dev/null +++ b/pmtiles/server_metrics.go @@ -0,0 +1,203 @@ +package pmtiles + +import ( + "fmt" + "log" + "strconv" + "time" + + "github.com/prometheus/client_golang/prometheus" +) + +var buildInfoMetric = prometheus.NewGaugeVec(prometheus.GaugeOpts{ + Namespace: "pmtiles", + Name: "buildinfo", +}, []string{"version", "revision"}) + +var buildTimeMetric = prometheus.NewGauge(prometheus.GaugeOpts{ + Namespace: "pmtiles", + Name: "buildtime", +}) + +func init() { + err := prometheus.Register(buildInfoMetric) + if err != nil { + fmt.Println("Error registering metric", err) + } + err = prometheus.Register(buildTimeMetric) + if err != nil { + fmt.Println("Error registering metric", err) + } +} + +// SetBuildInfo initializes static metrics with pmtiles version, git hash, and build time +func SetBuildInfo(version, commit, date string) { + buildInfoMetric.WithLabelValues(version, commit).Set(1) + time, err := time.Parse(time.RFC3339, date) + if err == nil { + buildTimeMetric.Set(float64(time.Unix())) + } else { + buildTimeMetric.Set(0) + } +} + +type metrics struct { + // overall requests: # requests, request duration, response size by archive/status code + requests *prometheus.CounterVec + responseSize *prometheus.HistogramVec + requestDuration *prometheus.HistogramVec + // dir cache: # requests, hits, cache entries, cache bytes, cache bytes limit + dirCacheEntries prometheus.Gauge + dirCacheSizeBytes prometheus.Gauge + dirCacheLimitBytes prometheus.Gauge + dirCacheRequests *prometheus.CounterVec + // requests to bucket: # total, response duration by archive/status code + bucketRequests *prometheus.CounterVec + bucketRequestDuration *prometheus.HistogramVec + // misc + reloads *prometheus.CounterVec +} + +// utility to time an overall tile request +type requestTracker struct { + start time.Time + metrics *metrics +} + +func (m *metrics) startRequest() *requestTracker { + return &requestTracker{start: time.Now(), metrics: m} +} + +func (r *requestTracker) finish(archive, handler string, status, responseSize int, logDetails bool) { + labels := []string{archive, handler, strconv.Itoa(status)} + r.metrics.requests.WithLabelValues(labels...).Inc() + if logDetails { + r.metrics.responseSize.WithLabelValues(labels...).Observe(float64(responseSize)) + r.metrics.requestDuration.WithLabelValues(labels...).Observe(time.Since(r.start).Seconds()) + } +} + +// utility to time an individual request to the underlying bucket +type bucketRequestTracker struct { + start time.Time + metrics *metrics + archive string +} + +func (m *metrics) startBucketRequest(archive string) *bucketRequestTracker { + return &bucketRequestTracker{start: time.Now(), metrics: m, archive: archive} +} + +func (r *bucketRequestTracker) finish(status string) { + labels := []string{r.archive, status} + r.metrics.bucketRequests.WithLabelValues(labels...).Inc() + r.metrics.bucketRequestDuration.WithLabelValues(labels...).Observe(time.Since(r.start).Seconds()) +} + +// misc helpers + +func (m *metrics) reloadFile(name string) { + m.reloads.WithLabelValues(name).Inc() +} + +func (m *metrics) initCacheStats(limitBytes int) { + m.dirCacheLimitBytes.Set(float64(limitBytes)) + m.updateCacheStats(0, 0) +} + +func (m *metrics) updateCacheStats(sizeBytes, entries int) { + m.dirCacheEntries.Set(float64(entries)) + m.dirCacheSizeBytes.Set(float64(sizeBytes)) +} + +func (m *metrics) cacheRequest(archive, status string) { + m.dirCacheRequests.WithLabelValues(archive, status).Inc() +} + +func register[K prometheus.Collector](logger *log.Logger, metric K) K { + if err := prometheus.Register(metric); err != nil { + logger.Println(err) + } + return metric +} + +func createMetrics(scope string, logger *log.Logger) *metrics { + namespace := "pmtiles" + durationBuckets := prometheus.DefBuckets + kib := 1024.0 + mib := kib * kib + sizeBuckets := []float64{1.0 * kib, 5.0 * kib, 10.0 * kib, 50.0 * kib, 100 * kib, 200 * kib, 500 * kib, 1.0 * mib, 2.5 * mib, 5.0 * mib} + + return &metrics{ + // overall requests + requests: register(logger, prometheus.NewCounterVec(prometheus.CounterOpts{ + Namespace: namespace, + Subsystem: scope, + Name: "requests_total", + Help: "Overall number of requests to the service", + }, []string{"archive", "handler", "status"})), + responseSize: register(logger, prometheus.NewHistogramVec(prometheus.HistogramOpts{ + Namespace: namespace, + Subsystem: scope, + Name: "response_size_bytes", + Help: "Overall response size in bytes", + Buckets: sizeBuckets, + }, []string{"archive", "handler", "status"})), + requestDuration: register(logger, prometheus.NewHistogramVec(prometheus.HistogramOpts{ + Namespace: namespace, + Subsystem: scope, + Name: "request_duration_seconds", + Help: "Overall request duration in seconds", + Buckets: durationBuckets, + }, []string{"archive", "handler", "status"})), + + // dir cache + dirCacheEntries: register(logger, prometheus.NewGauge(prometheus.GaugeOpts{ + Namespace: namespace, + Subsystem: scope, + Name: "dir_cache_entries", + Help: "Number of directories in the cache", + })), + dirCacheSizeBytes: register(logger, prometheus.NewGauge(prometheus.GaugeOpts{ + Namespace: namespace, + Subsystem: scope, + Name: "dir_cache_size_bytes", + Help: "Current directory cache usage in bytes", + })), + dirCacheLimitBytes: register(logger, prometheus.NewGauge(prometheus.GaugeOpts{ + Namespace: namespace, + Subsystem: scope, + Name: "dir_cache_limit_bytes", + Help: "Maximum directory cache size limit in bytes", + })), + dirCacheRequests: register(logger, prometheus.NewCounterVec(prometheus.CounterOpts{ + Namespace: namespace, + Subsystem: scope, + Name: "dir_cache_requests", + Help: "Requests to the directory cache by archive and status (hit/inflight/miss)", + }, []string{"archive", "status"})), + + // requests to bucket + bucketRequests: register(logger, prometheus.NewCounterVec(prometheus.CounterOpts{ + Namespace: namespace, + Subsystem: scope, + Name: "bucket_requests_total", + Help: "Requests to the underlying bucket", + }, []string{"archive", "status"})), + bucketRequestDuration: register(logger, prometheus.NewHistogramVec(prometheus.HistogramOpts{ + Namespace: namespace, + Subsystem: scope, + Name: "bucket_request_duration_seconds", + Help: "Request duration in seconds for individual requests to the underlying bucket", + Buckets: durationBuckets, + }, []string{"archive", "status"})), + + // misc + reloads: register(logger, prometheus.NewCounterVec(prometheus.CounterOpts{ + Namespace: namespace, + Subsystem: scope, + Name: "bucket_reloads", + Help: "Number of times an archive was reloaded due to the etag changing", + }, []string{"archive"})), + } +} From cc756c64911e27549c1373b50fd2a3db330d80b5 Mon Sep 17 00:00:00 2001 From: Mike Barry Date: Thu, 15 Feb 2024 07:06:56 -0500 Subject: [PATCH 02/10] inflight->hit --- pmtiles/server.go | 2 +- pmtiles/server_metrics.go | 4 ++-- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/pmtiles/server.go b/pmtiles/server.go index f9c6125..5390fca 100644 --- a/pmtiles/server.go +++ b/pmtiles/server.go @@ -129,7 +129,7 @@ func (server *Server) Start() { server.metrics.cacheRequest(key.name, "hit") } else if _, ok := inflight[key]; ok { inflight[key] = append(inflight[key], req) - server.metrics.cacheRequest(key.name, "inflight") + server.metrics.cacheRequest(key.name, "hit") // treat inflight as a hit since it doesn't make a new server request } else { inflight[key] = []request{req} server.metrics.cacheRequest(key.name, "miss") diff --git a/pmtiles/server_metrics.go b/pmtiles/server_metrics.go index 690a7ed..8d41bab 100644 --- a/pmtiles/server_metrics.go +++ b/pmtiles/server_metrics.go @@ -126,7 +126,7 @@ func createMetrics(scope string, logger *log.Logger) *metrics { durationBuckets := prometheus.DefBuckets kib := 1024.0 mib := kib * kib - sizeBuckets := []float64{1.0 * kib, 5.0 * kib, 10.0 * kib, 50.0 * kib, 100 * kib, 200 * kib, 500 * kib, 1.0 * mib, 2.5 * mib, 5.0 * mib} + sizeBuckets := []float64{1.0 * kib, 5.0 * kib, 10.0 * kib, 25.0 * kib, 50.0 * kib, 100 * kib, 250 * kib, 500 * kib, 1.0 * mib} return &metrics{ // overall requests @@ -174,7 +174,7 @@ func createMetrics(scope string, logger *log.Logger) *metrics { Namespace: namespace, Subsystem: scope, Name: "dir_cache_requests", - Help: "Requests to the directory cache by archive and status (hit/inflight/miss)", + Help: "Requests to the directory cache by archive and status (hit/miss)", }, []string{"archive", "status"})), // requests to bucket From e63f7569f9ff8c326ca81db98eb86feaa2d7d973 Mon Sep 17 00:00:00 2001 From: Mike Barry Date: Thu, 15 Feb 2024 07:19:38 -0500 Subject: [PATCH 03/10] exclude directory deserialization from bucket timing --- pmtiles/server.go | 1 + pmtiles/server_metrics.go | 34 +++++++++++++++++++++------------- 2 files changed, 22 insertions(+), 13 deletions(-) diff --git a/pmtiles/server.go b/pmtiles/server.go index 5390fca..7fad816 100644 --- a/pmtiles/server.go +++ b/pmtiles/server.go @@ -180,6 +180,7 @@ func (server *Server) Start() { server.logger.Printf("parsing header failed: %v", err) return } + tracker.finish(status) // track time to deserialize a header separately // populate the root first before header rootEntries := deserializeEntries(bytes.NewBuffer(b[header.RootOffset : header.RootOffset+header.RootLength])) diff --git a/pmtiles/server_metrics.go b/pmtiles/server_metrics.go index 8d41bab..2417293 100644 --- a/pmtiles/server_metrics.go +++ b/pmtiles/server_metrics.go @@ -60,8 +60,9 @@ type metrics struct { // utility to time an overall tile request type requestTracker struct { - start time.Time - metrics *metrics + finished bool + start time.Time + metrics *metrics } func (m *metrics) startRequest() *requestTracker { @@ -69,19 +70,23 @@ func (m *metrics) startRequest() *requestTracker { } func (r *requestTracker) finish(archive, handler string, status, responseSize int, logDetails bool) { - labels := []string{archive, handler, strconv.Itoa(status)} - r.metrics.requests.WithLabelValues(labels...).Inc() - if logDetails { - r.metrics.responseSize.WithLabelValues(labels...).Observe(float64(responseSize)) - r.metrics.requestDuration.WithLabelValues(labels...).Observe(time.Since(r.start).Seconds()) + if !r.finished { + r.finished = true + labels := []string{archive, handler, strconv.Itoa(status)} + r.metrics.requests.WithLabelValues(labels...).Inc() + if logDetails { + r.metrics.responseSize.WithLabelValues(labels...).Observe(float64(responseSize)) + r.metrics.requestDuration.WithLabelValues(labels...).Observe(time.Since(r.start).Seconds()) + } } } // utility to time an individual request to the underlying bucket type bucketRequestTracker struct { - start time.Time - metrics *metrics - archive string + finished bool + start time.Time + metrics *metrics + archive string } func (m *metrics) startBucketRequest(archive string) *bucketRequestTracker { @@ -89,9 +94,12 @@ func (m *metrics) startBucketRequest(archive string) *bucketRequestTracker { } func (r *bucketRequestTracker) finish(status string) { - labels := []string{r.archive, status} - r.metrics.bucketRequests.WithLabelValues(labels...).Inc() - r.metrics.bucketRequestDuration.WithLabelValues(labels...).Observe(time.Since(r.start).Seconds()) + if !r.finished { + r.finished = true + labels := []string{r.archive, status} + r.metrics.bucketRequests.WithLabelValues(labels...).Inc() + r.metrics.bucketRequestDuration.WithLabelValues(labels...).Observe(time.Since(r.start).Seconds()) + } } // misc helpers From c6b4d4d7bd29b4f587fc7d90a2a1f2ba83f4f900 Mon Sep 17 00:00:00 2001 From: Mike Barry Date: Thu, 15 Feb 2024 07:26:21 -0500 Subject: [PATCH 04/10] include bucket request type in counter stat --- pmtiles/server.go | 13 ++++++++----- pmtiles/server_metrics.go | 12 ++++++------ 2 files changed, 14 insertions(+), 11 deletions(-) diff --git a/pmtiles/server.go b/pmtiles/server.go index 7fad816..61f8923 100644 --- a/pmtiles/server.go +++ b/pmtiles/server.go @@ -134,20 +134,23 @@ func (server *Server) Start() { inflight[key] = []request{req} server.metrics.cacheRequest(key.name, "miss") go func() { - status := "ok" - tracker := server.metrics.startBucketRequest(key.name) - defer func() { tracker.finish(status) }() var result cachedValue isRoot := (key.offset == 0 && key.length == 0) offset := int64(key.offset) length := int64(key.length) + kind := "leaf" if isRoot { offset = 0 length = 16384 + kind = "root" } + status := "ok" + tracker := server.metrics.startBucketRequest(key.name, kind) + defer func() { tracker.finish(status) }() + server.logger.Printf("fetching %s %d-%d", key.name, offset, length) r, etag, err := server.bucket.NewRangeReaderEtag(ctx, key.name+".pmtiles", offset, length, key.etag) @@ -252,7 +255,7 @@ func (server *Server) getHeaderMetadataAttempt(ctx context.Context, name, purgeE } status := "ok" - tracker := server.metrics.startBucketRequest(name) + tracker := server.metrics.startBucketRequest(name, "metadata") defer func() { tracker.finish(status) }() r, _, err := server.bucket.NewRangeReaderEtag(ctx, name+".pmtiles", int64(header.MetadataOffset), int64(header.MetadataLength), rootValue.etag) if isRefreshRequredError(err) { @@ -390,7 +393,7 @@ func (server *Server) getTileAttempt(ctx context.Context, httpHeaders map[string if entry.RunLength > 0 { status := "ok" - tracker := server.metrics.startBucketRequest(name) + tracker := server.metrics.startBucketRequest(name, "tile") defer func() { tracker.finish(status) }() r, _, err := server.bucket.NewRangeReaderEtag(ctx, name+".pmtiles", int64(header.TileDataOffset+entry.Offset), int64(entry.Length), rootValue.etag) if isRefreshRequredError(err) { diff --git a/pmtiles/server_metrics.go b/pmtiles/server_metrics.go index 2417293..63bbf25 100644 --- a/pmtiles/server_metrics.go +++ b/pmtiles/server_metrics.go @@ -87,18 +87,18 @@ type bucketRequestTracker struct { start time.Time metrics *metrics archive string + kind string } -func (m *metrics) startBucketRequest(archive string) *bucketRequestTracker { - return &bucketRequestTracker{start: time.Now(), metrics: m, archive: archive} +func (m *metrics) startBucketRequest(archive, kind string) *bucketRequestTracker { + return &bucketRequestTracker{start: time.Now(), metrics: m, archive: archive, kind: kind} } func (r *bucketRequestTracker) finish(status string) { if !r.finished { r.finished = true - labels := []string{r.archive, status} - r.metrics.bucketRequests.WithLabelValues(labels...).Inc() - r.metrics.bucketRequestDuration.WithLabelValues(labels...).Observe(time.Since(r.start).Seconds()) + r.metrics.bucketRequests.WithLabelValues(r.archive, r.kind, status).Inc() + r.metrics.bucketRequestDuration.WithLabelValues(r.archive, status).Observe(time.Since(r.start).Seconds()) } } @@ -191,7 +191,7 @@ func createMetrics(scope string, logger *log.Logger) *metrics { Subsystem: scope, Name: "bucket_requests_total", Help: "Requests to the underlying bucket", - }, []string{"archive", "status"})), + }, []string{"archive", "kind", "status"})), bucketRequestDuration: register(logger, prometheus.NewHistogramVec(prometheus.HistogramOpts{ Namespace: namespace, Subsystem: scope, From a9ea3748d7bb84a8fcdfccbd2f285c074d326159 Mon Sep 17 00:00:00 2001 From: Mike Barry Date: Fri, 16 Feb 2024 07:00:09 -0500 Subject: [PATCH 05/10] limit not found cardinality, include bucket request status code --- pmtiles/bucket.go | 59 +++++++++++++++++++++------------------ pmtiles/bucket_test.go | 33 ++++++++++++++-------- pmtiles/server.go | 35 ++++++++++------------- pmtiles/server_metrics.go | 22 +++++++++++++-- 4 files changed, 87 insertions(+), 62 deletions(-) diff --git a/pmtiles/bucket.go b/pmtiles/bucket.go index e83c15d..8dbc11c 100644 --- a/pmtiles/bucket.go +++ b/pmtiles/bucket.go @@ -25,7 +25,7 @@ import ( type Bucket interface { Close() error NewRangeReader(ctx context.Context, key string, offset int64, length int64) (io.ReadCloser, error) - NewRangeReaderEtag(ctx context.Context, key string, offset int64, length int64, etag string) (io.ReadCloser, string, error) + NewRangeReaderEtag(ctx context.Context, key string, offset int64, length int64, etag string) (io.ReadCloser, string, int, error) } // RefreshRequiredError is an error that indicates the etag has chanced on the remote file @@ -46,25 +46,25 @@ func (m mockBucket) Close() error { } func (m mockBucket) NewRangeReader(ctx context.Context, key string, offset int64, length int64) (io.ReadCloser, error) { - body, _, err := m.NewRangeReaderEtag(ctx, key, offset, length, "") + body, _, _, err := m.NewRangeReaderEtag(ctx, key, offset, length, "") return body, err } -func (m mockBucket) NewRangeReaderEtag(_ context.Context, key string, offset int64, length int64, etag string) (io.ReadCloser, string, error) { +func (m mockBucket) NewRangeReaderEtag(_ context.Context, key string, offset int64, length int64, etag string) (io.ReadCloser, string, int, error) { bs, ok := m.items[key] if !ok { - return nil, "", fmt.Errorf("Not found %s", key) + return nil, "", 404, fmt.Errorf("Not found %s", key) } resultEtag := generateEtag(bs) if len(etag) > 0 && resultEtag != etag { - return nil, "", &RefreshRequiredError{} + return nil, "", 412, &RefreshRequiredError{} } if offset+length > int64(len(bs)) { - return nil, "", &RefreshRequiredError{416} + return nil, "", 416, &RefreshRequiredError{416} } - return io.NopCloser(bytes.NewReader(bs[offset:(offset + length)])), resultEtag, nil + return io.NopCloser(bytes.NewReader(bs[offset:(offset + length)])), resultEtag, 206, nil } // FileBucket is a bucket backed by a directory on disk @@ -73,7 +73,7 @@ type FileBucket struct { } func (b FileBucket) NewRangeReader(ctx context.Context, key string, offset, length int64) (io.ReadCloser, error) { - body, _, err := b.NewRangeReaderEtag(ctx, key, offset, length, "") + body, _, _, err := b.NewRangeReaderEtag(ctx, key, offset, length, "") return body, err } @@ -102,30 +102,30 @@ func generateEtagFromInts(ns ...int64) string { return hasherToEtag(hasher) } -func (b FileBucket) NewRangeReaderEtag(_ context.Context, key string, offset, length int64, etag string) (io.ReadCloser, string, error) { +func (b FileBucket) NewRangeReaderEtag(_ context.Context, key string, offset, length int64, etag string) (io.ReadCloser, string, int, error) { name := filepath.Join(b.path, key) file, err := os.Open(name) defer file.Close() if err != nil { - return nil, "", err + return nil, "", 404, err } info, err := file.Stat() if err != nil { - return nil, "", err + return nil, "", 404, err } newEtag := generateEtagFromInts(info.ModTime().UnixNano(), info.Size()) if len(etag) > 0 && etag != newEtag { - return nil, "", &RefreshRequiredError{} + return nil, "", 412, &RefreshRequiredError{} } result := make([]byte, length) read, err := file.ReadAt(result, offset) if err != nil { - return nil, "", err + return nil, "", 500, err } if read != int(length) { - return nil, "", fmt.Errorf("Expected to read %d bytes but only read %d", length, read) + return nil, "", 416, fmt.Errorf("Expected to read %d bytes but only read %d", length, read) } - return io.NopCloser(bytes.NewReader(result)), newEtag, nil + return io.NopCloser(bytes.NewReader(result)), newEtag, 206, nil } func (b FileBucket) Close() error { @@ -143,16 +143,16 @@ type HTTPBucket struct { } func (b HTTPBucket) NewRangeReader(ctx context.Context, key string, offset, length int64) (io.ReadCloser, error) { - body, _, err := b.NewRangeReaderEtag(ctx, key, offset, length, "") + body, _, _, err := b.NewRangeReaderEtag(ctx, key, offset, length, "") return body, err } -func (b HTTPBucket) NewRangeReaderEtag(ctx context.Context, key string, offset, length int64, etag string) (io.ReadCloser, string, error) { +func (b HTTPBucket) NewRangeReaderEtag(ctx context.Context, key string, offset, length int64, etag string) (io.ReadCloser, string, int, error) { reqURL := b.baseURL + "/" + key req, err := http.NewRequestWithContext(ctx, "GET", reqURL, nil) if err != nil { - return nil, "", err + return nil, "", 500, err } req.Header.Set("Range", fmt.Sprintf("bytes=%d-%d", offset, offset+length-1)) @@ -162,7 +162,7 @@ func (b HTTPBucket) NewRangeReaderEtag(ctx context.Context, key string, offset, resp, err := b.client.Do(req) if err != nil { - return nil, "", err + return nil, "", resp.StatusCode, err } if resp.StatusCode != http.StatusOK && resp.StatusCode != http.StatusPartialContent { @@ -172,10 +172,10 @@ func (b HTTPBucket) NewRangeReaderEtag(ctx context.Context, key string, offset, } else { err = fmt.Errorf("HTTP error: %d", resp.StatusCode) } - return nil, "", err + return nil, "", resp.StatusCode, err } - return resp.Body, resp.Header.Get("ETag"), nil + return resp.Body, resp.Header.Get("ETag"), resp.StatusCode, nil } func (b HTTPBucket) Close() error { @@ -191,11 +191,11 @@ type BucketAdapter struct { } func (ba BucketAdapter) NewRangeReader(ctx context.Context, key string, offset, length int64) (io.ReadCloser, error) { - body, _, err := ba.NewRangeReaderEtag(ctx, key, offset, length, "") + body, _, _, err := ba.NewRangeReaderEtag(ctx, key, offset, length, "") return body, err } -func (ba BucketAdapter) NewRangeReaderEtag(ctx context.Context, key string, offset, length int64, etag string) (io.ReadCloser, string, error) { +func (ba BucketAdapter) NewRangeReaderEtag(ctx context.Context, key string, offset, length int64, etag string) (io.ReadCloser, string, int, error) { reader, err := ba.Bucket.NewRangeReader(ctx, key, offset, length, &blob.ReaderOptions{ BeforeRead: func(asFunc func(interface{}) bool) error { var req *s3.GetObjectInput @@ -205,20 +205,25 @@ func (ba BucketAdapter) NewRangeReaderEtag(ctx context.Context, key string, offs return nil }, }) + status := 206 if err != nil { var resp awserr.RequestFailure errors.As(err, &resp) - if resp != nil && isRefreshRequredCode(resp.StatusCode()) { - return nil, "", &RefreshRequiredError{resp.StatusCode()} + status = 404 + if resp != nil { + status = resp.StatusCode() + if isRefreshRequredCode(resp.StatusCode()) { + return nil, "", resp.StatusCode(), &RefreshRequiredError{resp.StatusCode()} + } } - return nil, "", err + return nil, "", status, err } resultETag := "" var resp s3.GetObjectOutput if reader.As(&resp) { resultETag = *resp.ETag } - return reader, resultETag, nil + return reader, resultETag, status, nil } func (ba BucketAdapter) Close() error { diff --git a/pmtiles/bucket_test.go b/pmtiles/bucket_test.go index 257b2f0..bf247c0 100644 --- a/pmtiles/bucket_test.go +++ b/pmtiles/bucket_test.go @@ -63,10 +63,11 @@ func TestHttpBucketRequestNormal(t *testing.T) { Body: io.NopCloser(strings.NewReader("abc")), Header: header, } - data, etag, err := bucket.NewRangeReaderEtag(context.Background(), "a/b/c", 100, 3, "") + data, etag, status, err := bucket.NewRangeReaderEtag(context.Background(), "a/b/c", 100, 3, "") assert.Equal(t, "", mock.request.Header.Get("If-Match")) assert.Equal(t, "bytes=100-102", mock.request.Header.Get("Range")) assert.Equal(t, "http://tiles.example.com/tiles/a/b/c", mock.request.URL.String()) + assert.Equal(t, 200, status) assert.Nil(t, err) b, err := io.ReadAll(data) assert.Nil(t, err) @@ -85,8 +86,9 @@ func TestHttpBucketRequestRequestEtag(t *testing.T) { Body: io.NopCloser(strings.NewReader("abc")), Header: header, } - data, etag, err := bucket.NewRangeReaderEtag(context.Background(), "a/b/c", 0, 3, "etag1") + data, etag, status, err := bucket.NewRangeReaderEtag(context.Background(), "a/b/c", 0, 3, "etag1") assert.Equal(t, "etag1", mock.request.Header.Get("If-Match")) + assert.Equal(t, 200, status) assert.Nil(t, err) b, err := io.ReadAll(data) assert.Nil(t, err) @@ -105,17 +107,20 @@ func TestHttpBucketRequestRequestEtagFailed(t *testing.T) { Body: io.NopCloser(strings.NewReader("abc")), Header: header, } - _, _, err := bucket.NewRangeReaderEtag(context.Background(), "a/b/c", 0, 3, "etag1") + _, _, status, err := bucket.NewRangeReaderEtag(context.Background(), "a/b/c", 0, 3, "etag1") assert.Equal(t, "etag1", mock.request.Header.Get("If-Match")) + assert.Equal(t, 412, status) assert.True(t, isRefreshRequredError(err)) mock.response.StatusCode = 416 - _, _, err = bucket.NewRangeReaderEtag(context.Background(), "a/b/c", 0, 3, "etag1") + _, _, status, err = bucket.NewRangeReaderEtag(context.Background(), "a/b/c", 0, 3, "etag1") + assert.Equal(t, 416, status) assert.True(t, isRefreshRequredError(err)) mock.response.StatusCode = 404 - _, _, err = bucket.NewRangeReaderEtag(context.Background(), "a/b/c", 0, 3, "etag1") + _, _, status, err = bucket.NewRangeReaderEtag(context.Background(), "a/b/c", 0, 3, "etag1") assert.False(t, isRefreshRequredError(err)) + assert.Equal(t, 404, status) } func TestFileBucketReplace(t *testing.T) { @@ -129,7 +134,8 @@ func TestFileBucketReplace(t *testing.T) { assert.Nil(t, os.WriteFile(filepath.Join(tmp, "archive.pmtiles"), []byte{1, 2, 3}, 0666)) // first read from file - reader, etag1, err := bucket.NewRangeReaderEtag(context.Background(), "archive.pmtiles", 1, 1, "") + reader, etag1, status, err := bucket.NewRangeReaderEtag(context.Background(), "archive.pmtiles", 1, 1, "") + assert.Equal(t, 206, status) assert.Nil(t, err) data, err := io.ReadAll(reader) assert.Nil(t, err) @@ -137,7 +143,8 @@ func TestFileBucketReplace(t *testing.T) { // change file, verify etag changes assert.Nil(t, os.WriteFile(filepath.Join(tmp, "archive.pmtiles"), []byte{4, 5, 6, 7}, 0666)) - reader, etag2, err := bucket.NewRangeReaderEtag(context.Background(), "archive.pmtiles", 1, 1, "") + reader, etag2, status, err := bucket.NewRangeReaderEtag(context.Background(), "archive.pmtiles", 1, 1, "") + assert.Equal(t, 206, status) assert.Nil(t, err) data, err = io.ReadAll(reader) assert.Nil(t, err) @@ -145,7 +152,8 @@ func TestFileBucketReplace(t *testing.T) { assert.Equal(t, []byte{5}, data) // and requesting with old etag fails with refresh required error - _, _, err = bucket.NewRangeReaderEtag(context.Background(), "archive.pmtiles", 1, 1, etag1) + _, _, status, err = bucket.NewRangeReaderEtag(context.Background(), "archive.pmtiles", 1, 1, etag1) + assert.Equal(t, 412, status) assert.True(t, isRefreshRequredError(err)) } @@ -163,7 +171,8 @@ func TestFileBucketRename(t *testing.T) { assert.Nil(t, os.WriteFile(filepath.Join(tmp, "archive.pmtiles"), []byte{1, 2, 3}, 0666)) // first read from file - reader, etag1, err := bucket.NewRangeReaderEtag(context.Background(), "archive.pmtiles", 1, 1, "") + reader, etag1, status, err := bucket.NewRangeReaderEtag(context.Background(), "archive.pmtiles", 1, 1, "") + assert.Equal(t, 206, status) assert.Nil(t, err) data, err := io.ReadAll(reader) assert.Nil(t, err) @@ -172,7 +181,8 @@ func TestFileBucketRename(t *testing.T) { // change file, verify etag changes os.Rename(filepath.Join(tmp, "archive.pmtiles"), filepath.Join(tmp, "archive3.pmtiles")) os.Rename(filepath.Join(tmp, "archive2.pmtiles"), filepath.Join(tmp, "archive.pmtiles")) - reader, etag2, err := bucket.NewRangeReaderEtag(context.Background(), "archive.pmtiles", 1, 1, "") + reader, etag2, status, err := bucket.NewRangeReaderEtag(context.Background(), "archive.pmtiles", 1, 1, "") + assert.Equal(t, 206, status) assert.Nil(t, err) data, err = io.ReadAll(reader) assert.Nil(t, err) @@ -180,6 +190,7 @@ func TestFileBucketRename(t *testing.T) { assert.Equal(t, []byte{5}, data) // and requesting with old etag fails with refresh required error - _, _, err = bucket.NewRangeReaderEtag(context.Background(), "archive.pmtiles", 1, 1, etag1) + _, _, status, err = bucket.NewRangeReaderEtag(context.Background(), "archive.pmtiles", 1, 1, etag1) + assert.Equal(t, 412, status) assert.True(t, isRefreshRequredError(err)) } diff --git a/pmtiles/server.go b/pmtiles/server.go index 61f8923..bd39708 100644 --- a/pmtiles/server.go +++ b/pmtiles/server.go @@ -147,21 +147,17 @@ func (server *Server) Start() { kind = "root" } - status := "ok" + status := "" tracker := server.metrics.startBucketRequest(key.name, kind) - defer func() { tracker.finish(status) }() + defer func() { tracker.finish(ctx, status) }() server.logger.Printf("fetching %s %d-%d", key.name, offset, length) - r, etag, err := server.bucket.NewRangeReaderEtag(ctx, key.name+".pmtiles", offset, length, key.etag) + r, etag, statusCode, err := server.bucket.NewRangeReaderEtag(ctx, key.name+".pmtiles", offset, length, key.etag) + status = strconv.Itoa(statusCode) if err != nil { ok = false result.badEtag = isRefreshRequredError(err) - if result.badEtag { - status = "refresh_required" - } else { - status = canceledOrError(ctx) - } resps <- response{key: key, value: result} server.logger.Printf("failed to fetch %s %d-%d, %v", key.name, key.offset, key.length, err) return @@ -170,7 +166,7 @@ func (server *Server) Start() { b, err := io.ReadAll(r) if err != nil { ok = false - status = canceledOrError(ctx) + status = "error" resps <- response{key: key, value: result} server.logger.Printf("failed to fetch %s %d-%d, %v", key.name, key.offset, key.length, err) return @@ -183,7 +179,6 @@ func (server *Server) Start() { server.logger.Printf("parsing header failed: %v", err) return } - tracker.finish(status) // track time to deserialize a header separately // populate the root first before header rootEntries := deserializeEntries(bytes.NewBuffer(b[header.RootOffset : header.RootOffset+header.RootLength])) @@ -254,16 +249,15 @@ func (server *Server) getHeaderMetadataAttempt(ctx context.Context, name, purgeE return false, HeaderV3{}, nil, "", nil } - status := "ok" + status := "" tracker := server.metrics.startBucketRequest(name, "metadata") - defer func() { tracker.finish(status) }() - r, _, err := server.bucket.NewRangeReaderEtag(ctx, name+".pmtiles", int64(header.MetadataOffset), int64(header.MetadataLength), rootValue.etag) + defer func() { tracker.finish(ctx, status) }() + r, _, statusCode, err := server.bucket.NewRangeReaderEtag(ctx, name+".pmtiles", int64(header.MetadataOffset), int64(header.MetadataLength), rootValue.etag) + status = strconv.Itoa(statusCode) if isRefreshRequredError(err) { - status = "refresh_required" return false, HeaderV3{}, nil, rootValue.etag, nil } if err != nil { - status = canceledOrError(ctx) return false, HeaderV3{}, nil, "", nil } defer r.Close() @@ -392,17 +386,16 @@ func (server *Server) getTileAttempt(ctx context.Context, httpHeaders map[string } if entry.RunLength > 0 { - status := "ok" + status := "" tracker := server.metrics.startBucketRequest(name, "tile") - defer func() { tracker.finish(status) }() - r, _, err := server.bucket.NewRangeReaderEtag(ctx, name+".pmtiles", int64(header.TileDataOffset+entry.Offset), int64(entry.Length), rootValue.etag) + defer func() { tracker.finish(ctx, status) }() + r, _, statusCode, err := server.bucket.NewRangeReaderEtag(ctx, name+".pmtiles", int64(header.TileDataOffset+entry.Offset), int64(entry.Length), rootValue.etag) + status = strconv.Itoa(statusCode) if isRefreshRequredError(err) { - status = "refresh_required" return 500, httpHeaders, []byte("I/O Error"), rootValue.etag } // possible we have the header/directory cached but the archive has disappeared if err != nil { - status = canceledOrError(ctx) if isCanceled(ctx) { return 499, httpHeaders, []byte("Canceled"), "" } @@ -412,7 +405,7 @@ func (server *Server) getTileAttempt(ctx context.Context, httpHeaders map[string defer r.Close() b, err := io.ReadAll(r) if err != nil { - status = canceledOrError(ctx) + status = "error" if isCanceled(ctx) { return 499, httpHeaders, []byte("Canceled"), "" } diff --git a/pmtiles/server_metrics.go b/pmtiles/server_metrics.go index 63bbf25..12af15e 100644 --- a/pmtiles/server_metrics.go +++ b/pmtiles/server_metrics.go @@ -1,6 +1,7 @@ package pmtiles import ( + "context" "fmt" "log" "strconv" @@ -69,10 +70,18 @@ func (m *metrics) startRequest() *requestTracker { return &requestTracker{start: time.Now(), metrics: m} } -func (r *requestTracker) finish(archive, handler string, status, responseSize int, logDetails bool) { +func (r *requestTracker) finish(ctx context.Context, archive, handler string, status, responseSize int, logDetails bool) { if !r.finished { r.finished = true - labels := []string{archive, handler, strconv.Itoa(status)} + // exclude archive path from "not found" metrics to limit cardinality on requests for nonexistant archives + if status == 404 { + archive = "" + } + statusString := strconv.Itoa(status) + if isCanceled(ctx) { + statusString = "canceled" + } + labels := []string{archive, handler, statusString} r.metrics.requests.WithLabelValues(labels...).Inc() if logDetails { r.metrics.responseSize.WithLabelValues(labels...).Observe(float64(responseSize)) @@ -94,9 +103,16 @@ func (m *metrics) startBucketRequest(archive, kind string) *bucketRequestTracker return &bucketRequestTracker{start: time.Now(), metrics: m, archive: archive, kind: kind} } -func (r *bucketRequestTracker) finish(status string) { +func (r *bucketRequestTracker) finish(ctx context.Context, status string) { if !r.finished { r.finished = true + // exclude archive path from "not found" metrics to limit cardinality on requests for nonexistant archives + if status == "404" || status == "403" { + r.archive = "" + } + if isCanceled(ctx) { + status = "canceled" + } r.metrics.bucketRequests.WithLabelValues(r.archive, r.kind, status).Inc() r.metrics.bucketRequestDuration.WithLabelValues(r.archive, status).Observe(time.Since(r.start).Seconds()) } From d16c6b7b238c96f79f3db479430ff6f81cb27a12 Mon Sep 17 00:00:00 2001 From: Mike Barry Date: Fri, 16 Feb 2024 07:03:00 -0500 Subject: [PATCH 06/10] fix --- pmtiles/server.go | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/pmtiles/server.go b/pmtiles/server.go index bd39708..bd87090 100644 --- a/pmtiles/server.go +++ b/pmtiles/server.go @@ -513,7 +513,7 @@ func (server *Server) get(ctx context.Context, path string) (archive, handler st func (server *Server) Get(ctx context.Context, path string) (int, map[string]string, []byte) { tracker := server.metrics.startRequest() archive, handler, status, headers, data := server.get(ctx, path) - tracker.finish(archive, handler, status, len(data), true) + tracker.finish(ctx, archive, handler, status, len(data), true) return status, headers, data } @@ -535,11 +535,11 @@ func (server *Server) ServeHTTP(w http.ResponseWriter, r *http.Request) int { w.Header().Set("Access-Control-Allow-Origin", server.cors) } w.WriteHeader(204) - tracker.finish("", r.Method, 204, 0, false) + tracker.finish(r.Context(), "", r.Method, 204, 0, false) return 204 } else if r.Method != http.MethodGet && r.Method != http.MethodHead { w.WriteHeader(405) - tracker.finish("", r.Method, 405, 0, false) + tracker.finish(r.Context(), "", r.Method, 405, 0, false) return 405 } archive, handler, statusCode, headers, body := server.get(r.Context(), r.URL.Path) @@ -560,7 +560,7 @@ func (server *Server) ServeHTTP(w http.ResponseWriter, r *http.Request) int { w.WriteHeader(statusCode) w.Write(body) } - tracker.finish(archive, handler, statusCode, len(body), true) + tracker.finish(r.Context(), archive, handler, statusCode, len(body), true) return statusCode } From a91a3d04c4459574bcb0c42fc7a92513544d78f7 Mon Sep 17 00:00:00 2001 From: Mike Barry Date: Fri, 16 Feb 2024 09:01:56 -0500 Subject: [PATCH 07/10] track number of dir misses on tile requests --- pmtiles/server.go | 68 +++++++++++++++++++++------------------ pmtiles/server_metrics.go | 10 +++--- pmtiles/server_test.go | 2 ++ 3 files changed, 44 insertions(+), 36 deletions(-) diff --git a/pmtiles/server.go b/pmtiles/server.go index bd87090..874512c 100644 --- a/pmtiles/server.go +++ b/pmtiles/server.go @@ -25,6 +25,7 @@ type cacheKey struct { type request struct { key cacheKey value chan cachedValue + miss chan int purgeEtag string } @@ -125,12 +126,15 @@ func (server *Server) Start() { key := req.key if val, ok := cache[key]; ok { evictList.MoveToFront(val) + req.miss <- 0 req.value <- val.Value.(*response).value server.metrics.cacheRequest(key.name, "hit") } else if _, ok := inflight[key]; ok { + req.miss <- 0 inflight[key] = append(inflight[key], req) server.metrics.cacheRequest(key.name, "hit") // treat inflight as a hit since it doesn't make a new server request } else { + req.miss <- 1 inflight[key] = []request{req} server.metrics.cacheRequest(key.name, "miss") go func() { @@ -240,7 +244,7 @@ func (server *Server) getHeaderMetadata(ctx context.Context, name string) (bool, } func (server *Server) getHeaderMetadataAttempt(ctx context.Context, name, purgeEtag string) (bool, HeaderV3, []byte, string, error) { - rootReq := request{key: cacheKey{name: name, offset: 0, length: 0}, value: make(chan cachedValue, 1), purgeEtag: purgeEtag} + rootReq := request{key: cacheKey{name: name, offset: 0, length: 0}, value: make(chan cachedValue, 1), miss: make(chan int, 1), purgeEtag: purgeEtag} server.reqs <- rootReq rootValue := <-rootReq.value header := rootValue.header @@ -321,51 +325,52 @@ func (server *Server) getMetadata(ctx context.Context, httpHeaders map[string]st httpHeaders["Etag"] = generateEtag(metadataBytes) return 200, httpHeaders, metadataBytes } -func (server *Server) getTile(ctx context.Context, httpHeaders map[string]string, name string, z uint8, x uint32, y uint32, ext string) (int, map[string]string, []byte) { - status, headers, data, purgeEtag := server.getTileAttempt(ctx, httpHeaders, name, z, x, y, ext, "") +func (server *Server) getTile(ctx context.Context, httpHeaders map[string]string, name string, z uint8, x uint32, y uint32, ext string) (int, map[string]string, []byte, int) { + status, headers, data, dirMisses, purgeEtag := server.getTileAttempt(ctx, httpHeaders, name, z, x, y, ext, "") if len(purgeEtag) > 0 { // file has new etag, retry once force-purging the etag that is no longer value - status, headers, data, _ = server.getTileAttempt(ctx, httpHeaders, name, z, x, y, ext, purgeEtag) + status, headers, data, dirMisses, _ = server.getTileAttempt(ctx, httpHeaders, name, z, x, y, ext, purgeEtag) } - return status, headers, data + return status, headers, data, dirMisses } -func (server *Server) getTileAttempt(ctx context.Context, httpHeaders map[string]string, name string, z uint8, x uint32, y uint32, ext string, purgeEtag string) (int, map[string]string, []byte, string) { - rootReq := request{key: cacheKey{name: name, offset: 0, length: 0}, value: make(chan cachedValue, 1), purgeEtag: purgeEtag} +func (server *Server) getTileAttempt(ctx context.Context, httpHeaders map[string]string, name string, z uint8, x uint32, y uint32, ext string, purgeEtag string) (int, map[string]string, []byte, int, string) { + rootReq := request{key: cacheKey{name: name, offset: 0, length: 0}, value: make(chan cachedValue, 1), miss: make(chan int, 1), purgeEtag: purgeEtag} server.reqs <- rootReq // https://golang.org/doc/faq#atomic_maps rootValue := <-rootReq.value + dirMisses := <-rootReq.miss header := rootValue.header if !rootValue.ok { - return 404, httpHeaders, []byte("Archive not found"), "" + return 404, httpHeaders, []byte("Archive not found"), dirMisses, "" } if z < header.MinZoom || z > header.MaxZoom { - return 404, httpHeaders, []byte("Tile not found"), "" + return 404, httpHeaders, []byte("Tile not found"), dirMisses, "" } switch header.TileType { case Mvt: if ext != "mvt" { - return 400, httpHeaders, []byte("path mismatch: archive is type MVT (.mvt)"), "" + return 400, httpHeaders, []byte("path mismatch: archive is type MVT (.mvt)"), dirMisses, "" } case Png: if ext != "png" { - return 400, httpHeaders, []byte("path mismatch: archive is type PNG (.png)"), "" + return 400, httpHeaders, []byte("path mismatch: archive is type PNG (.png)"), dirMisses, "" } case Jpeg: if ext != "jpg" { - return 400, httpHeaders, []byte("path mismatch: archive is type JPEG (.jpg)"), "" + return 400, httpHeaders, []byte("path mismatch: archive is type JPEG (.jpg)"), dirMisses, "" } case Webp: if ext != "webp" { - return 400, httpHeaders, []byte("path mismatch: archive is type WebP (.webp)"), "" + return 400, httpHeaders, []byte("path mismatch: archive is type WebP (.webp)"), dirMisses, "" } case Avif: if ext != "avif" { - return 400, httpHeaders, []byte("path mismatch: archive is type AVIF (.avif)"), "" + return 400, httpHeaders, []byte("path mismatch: archive is type AVIF (.avif)"), dirMisses, "" } } @@ -373,11 +378,12 @@ func (server *Server) getTileAttempt(ctx context.Context, httpHeaders map[string dirOffset, dirLen := header.RootOffset, header.RootLength for depth := 0; depth <= 3; depth++ { - dirReq := request{key: cacheKey{name: name, offset: dirOffset, length: dirLen, etag: rootValue.etag}, value: make(chan cachedValue, 1)} + dirReq := request{key: cacheKey{name: name, offset: dirOffset, length: dirLen, etag: rootValue.etag}, value: make(chan cachedValue, 1), miss: make(chan int, 1)} server.reqs <- dirReq dirValue := <-dirReq.value + dirMisses = dirMisses + <-dirReq.miss if dirValue.badEtag { - return 500, httpHeaders, []byte("I/O Error"), rootValue.etag + return 500, httpHeaders, []byte("I/O Error"), dirMisses, rootValue.etag } directory := dirValue.directory entry, ok := findTile(directory, tileID) @@ -392,24 +398,24 @@ func (server *Server) getTileAttempt(ctx context.Context, httpHeaders map[string r, _, statusCode, err := server.bucket.NewRangeReaderEtag(ctx, name+".pmtiles", int64(header.TileDataOffset+entry.Offset), int64(entry.Length), rootValue.etag) status = strconv.Itoa(statusCode) if isRefreshRequredError(err) { - return 500, httpHeaders, []byte("I/O Error"), rootValue.etag + return 500, httpHeaders, []byte("I/O Error"), dirMisses, rootValue.etag } // possible we have the header/directory cached but the archive has disappeared if err != nil { if isCanceled(ctx) { - return 499, httpHeaders, []byte("Canceled"), "" + return 499, httpHeaders, []byte("Canceled"), dirMisses, "" } server.logger.Printf("failed to fetch tile %s %d-%d %v", name, entry.Offset, entry.Length, err) - return 404, httpHeaders, []byte("Tile not found"), "" + return 404, httpHeaders, []byte("Tile not found"), dirMisses, "" } defer r.Close() b, err := io.ReadAll(r) if err != nil { status = "error" if isCanceled(ctx) { - return 499, httpHeaders, []byte("Canceled"), "" + return 499, httpHeaders, []byte("Canceled"), dirMisses, "" } - return canceledOrStatusCode(ctx, 500), httpHeaders, []byte("I/O error"), "" + return canceledOrStatusCode(ctx, 500), httpHeaders, []byte("I/O error"), dirMisses, "" } httpHeaders["Etag"] = generateEtag(b) @@ -419,12 +425,12 @@ func (server *Server) getTileAttempt(ctx context.Context, httpHeaders map[string if headerVal, ok := headerContentEncoding(header.TileCompression); ok { httpHeaders["Content-Encoding"] = headerVal } - return 200, httpHeaders, b, "" + return 200, httpHeaders, b, dirMisses, "" } dirOffset = header.LeafDirectoryOffset + entry.Offset dirLen = uint64(entry.Length) } - return 204, httpHeaders, nil, "" + return 204, httpHeaders, nil, dirMisses, "" } func isRefreshRequredError(err error) bool { @@ -482,7 +488,7 @@ func parseMetadataPath(path string) (bool, string) { return false, "" } -func (server *Server) get(ctx context.Context, path string) (archive, handler string, status int, headers map[string]string, data []byte) { +func (server *Server) get(ctx context.Context, path string) (archive, handler string, status int, headers map[string]string, data []byte, dirMisses int) { handler = "" archive = "" headers = make(map[string]string) @@ -492,7 +498,7 @@ func (server *Server) get(ctx context.Context, path string) (archive, handler st if ok, key, z, x, y, ext := parseTilePath(path); ok { archive, handler = key, "tile" - status, headers, data = server.getTile(ctx, headers, key, z, x, y, ext) + status, headers, data, dirMisses = server.getTile(ctx, headers, key, z, x, y, ext) } else if ok, key := parseTilejsonPath(path); ok { archive, handler = key, "tilejson" status, headers, data = server.getTileJSON(ctx, headers, key) @@ -512,8 +518,8 @@ func (server *Server) get(ctx context.Context, path string) (archive, handler st // Return status code, HTTP headers, and body. func (server *Server) Get(ctx context.Context, path string) (int, map[string]string, []byte) { tracker := server.metrics.startRequest() - archive, handler, status, headers, data := server.get(ctx, path) - tracker.finish(ctx, archive, handler, status, len(data), true) + archive, handler, status, headers, data, dirMisses := server.get(ctx, path) + tracker.finish(ctx, archive, handler, status, len(data), true, dirMisses) return status, headers, data } @@ -535,14 +541,14 @@ func (server *Server) ServeHTTP(w http.ResponseWriter, r *http.Request) int { w.Header().Set("Access-Control-Allow-Origin", server.cors) } w.WriteHeader(204) - tracker.finish(r.Context(), "", r.Method, 204, 0, false) + tracker.finish(r.Context(), "", r.Method, 204, 0, false, 0) return 204 } else if r.Method != http.MethodGet && r.Method != http.MethodHead { w.WriteHeader(405) - tracker.finish(r.Context(), "", r.Method, 405, 0, false) + tracker.finish(r.Context(), "", r.Method, 405, 0, false, 0) return 405 } - archive, handler, statusCode, headers, body := server.get(r.Context(), r.URL.Path) + archive, handler, statusCode, headers, body, dirMisses := server.get(r.Context(), r.URL.Path) for k, v := range headers { w.Header().Set(k, v) } @@ -560,7 +566,7 @@ func (server *Server) ServeHTTP(w http.ResponseWriter, r *http.Request) int { w.WriteHeader(statusCode) w.Write(body) } - tracker.finish(r.Context(), archive, handler, statusCode, len(body), true) + tracker.finish(r.Context(), archive, handler, statusCode, len(body), true, dirMisses) return statusCode } diff --git a/pmtiles/server_metrics.go b/pmtiles/server_metrics.go index 12af15e..34af9de 100644 --- a/pmtiles/server_metrics.go +++ b/pmtiles/server_metrics.go @@ -70,7 +70,7 @@ func (m *metrics) startRequest() *requestTracker { return &requestTracker{start: time.Now(), metrics: m} } -func (r *requestTracker) finish(ctx context.Context, archive, handler string, status, responseSize int, logDetails bool) { +func (r *requestTracker) finish(ctx context.Context, archive, handler string, status, responseSize int, logDetails bool, dirMisses int) { if !r.finished { r.finished = true // exclude archive path from "not found" metrics to limit cardinality on requests for nonexistant archives @@ -81,7 +81,7 @@ func (r *requestTracker) finish(ctx context.Context, archive, handler string, st if isCanceled(ctx) { statusString = "canceled" } - labels := []string{archive, handler, statusString} + labels := []string{archive, handler, statusString, strconv.Itoa(dirMisses)} r.metrics.requests.WithLabelValues(labels...).Inc() if logDetails { r.metrics.responseSize.WithLabelValues(labels...).Observe(float64(responseSize)) @@ -159,21 +159,21 @@ func createMetrics(scope string, logger *log.Logger) *metrics { Subsystem: scope, Name: "requests_total", Help: "Overall number of requests to the service", - }, []string{"archive", "handler", "status"})), + }, []string{"archive", "handler", "status", "dir_misses"})), responseSize: register(logger, prometheus.NewHistogramVec(prometheus.HistogramOpts{ Namespace: namespace, Subsystem: scope, Name: "response_size_bytes", Help: "Overall response size in bytes", Buckets: sizeBuckets, - }, []string{"archive", "handler", "status"})), + }, []string{"archive", "handler", "status", "dir_misses"})), requestDuration: register(logger, prometheus.NewHistogramVec(prometheus.HistogramOpts{ Namespace: namespace, Subsystem: scope, Name: "request_duration_seconds", Help: "Overall request duration in seconds", Buckets: durationBuckets, - }, []string{"archive", "handler", "status"})), + }, []string{"archive", "handler", "status", "dir_misses"})), // dir cache dirCacheEntries: register(logger, prometheus.NewGauge(prometheus.GaugeOpts{ diff --git a/pmtiles/server_test.go b/pmtiles/server_test.go index 191d362..27fa035 100644 --- a/pmtiles/server_test.go +++ b/pmtiles/server_test.go @@ -9,6 +9,7 @@ import ( "sort" "testing" + "github.com/prometheus/client_golang/prometheus" "github.com/stretchr/testify/assert" ) @@ -108,6 +109,7 @@ func fakeArchive(t *testing.T, header HeaderV3, metadata map[string]interface{}, } func newServer(t *testing.T) (mockBucket, *Server) { + prometheus.DefaultRegisterer = prometheus.NewRegistry() bucket := mockBucket{make(map[string][]byte)} server, err := NewServerWithBucket(bucket, "", log.Default(), 10, "", "tiles.example.com") assert.Nil(t, err) From 6ee9eb0c84c8f0cca41104302dcf0d6383ecd568 Mon Sep 17 00:00:00 2001 From: Mike Barry Date: Fri, 16 Feb 2024 09:14:19 -0500 Subject: [PATCH 08/10] undo misses --- pmtiles/server.go | 68 ++++++++++++++++++--------------------- pmtiles/server_metrics.go | 10 +++--- 2 files changed, 36 insertions(+), 42 deletions(-) diff --git a/pmtiles/server.go b/pmtiles/server.go index 874512c..bd87090 100644 --- a/pmtiles/server.go +++ b/pmtiles/server.go @@ -25,7 +25,6 @@ type cacheKey struct { type request struct { key cacheKey value chan cachedValue - miss chan int purgeEtag string } @@ -126,15 +125,12 @@ func (server *Server) Start() { key := req.key if val, ok := cache[key]; ok { evictList.MoveToFront(val) - req.miss <- 0 req.value <- val.Value.(*response).value server.metrics.cacheRequest(key.name, "hit") } else if _, ok := inflight[key]; ok { - req.miss <- 0 inflight[key] = append(inflight[key], req) server.metrics.cacheRequest(key.name, "hit") // treat inflight as a hit since it doesn't make a new server request } else { - req.miss <- 1 inflight[key] = []request{req} server.metrics.cacheRequest(key.name, "miss") go func() { @@ -244,7 +240,7 @@ func (server *Server) getHeaderMetadata(ctx context.Context, name string) (bool, } func (server *Server) getHeaderMetadataAttempt(ctx context.Context, name, purgeEtag string) (bool, HeaderV3, []byte, string, error) { - rootReq := request{key: cacheKey{name: name, offset: 0, length: 0}, value: make(chan cachedValue, 1), miss: make(chan int, 1), purgeEtag: purgeEtag} + rootReq := request{key: cacheKey{name: name, offset: 0, length: 0}, value: make(chan cachedValue, 1), purgeEtag: purgeEtag} server.reqs <- rootReq rootValue := <-rootReq.value header := rootValue.header @@ -325,52 +321,51 @@ func (server *Server) getMetadata(ctx context.Context, httpHeaders map[string]st httpHeaders["Etag"] = generateEtag(metadataBytes) return 200, httpHeaders, metadataBytes } -func (server *Server) getTile(ctx context.Context, httpHeaders map[string]string, name string, z uint8, x uint32, y uint32, ext string) (int, map[string]string, []byte, int) { - status, headers, data, dirMisses, purgeEtag := server.getTileAttempt(ctx, httpHeaders, name, z, x, y, ext, "") +func (server *Server) getTile(ctx context.Context, httpHeaders map[string]string, name string, z uint8, x uint32, y uint32, ext string) (int, map[string]string, []byte) { + status, headers, data, purgeEtag := server.getTileAttempt(ctx, httpHeaders, name, z, x, y, ext, "") if len(purgeEtag) > 0 { // file has new etag, retry once force-purging the etag that is no longer value - status, headers, data, dirMisses, _ = server.getTileAttempt(ctx, httpHeaders, name, z, x, y, ext, purgeEtag) + status, headers, data, _ = server.getTileAttempt(ctx, httpHeaders, name, z, x, y, ext, purgeEtag) } - return status, headers, data, dirMisses + return status, headers, data } -func (server *Server) getTileAttempt(ctx context.Context, httpHeaders map[string]string, name string, z uint8, x uint32, y uint32, ext string, purgeEtag string) (int, map[string]string, []byte, int, string) { - rootReq := request{key: cacheKey{name: name, offset: 0, length: 0}, value: make(chan cachedValue, 1), miss: make(chan int, 1), purgeEtag: purgeEtag} +func (server *Server) getTileAttempt(ctx context.Context, httpHeaders map[string]string, name string, z uint8, x uint32, y uint32, ext string, purgeEtag string) (int, map[string]string, []byte, string) { + rootReq := request{key: cacheKey{name: name, offset: 0, length: 0}, value: make(chan cachedValue, 1), purgeEtag: purgeEtag} server.reqs <- rootReq // https://golang.org/doc/faq#atomic_maps rootValue := <-rootReq.value - dirMisses := <-rootReq.miss header := rootValue.header if !rootValue.ok { - return 404, httpHeaders, []byte("Archive not found"), dirMisses, "" + return 404, httpHeaders, []byte("Archive not found"), "" } if z < header.MinZoom || z > header.MaxZoom { - return 404, httpHeaders, []byte("Tile not found"), dirMisses, "" + return 404, httpHeaders, []byte("Tile not found"), "" } switch header.TileType { case Mvt: if ext != "mvt" { - return 400, httpHeaders, []byte("path mismatch: archive is type MVT (.mvt)"), dirMisses, "" + return 400, httpHeaders, []byte("path mismatch: archive is type MVT (.mvt)"), "" } case Png: if ext != "png" { - return 400, httpHeaders, []byte("path mismatch: archive is type PNG (.png)"), dirMisses, "" + return 400, httpHeaders, []byte("path mismatch: archive is type PNG (.png)"), "" } case Jpeg: if ext != "jpg" { - return 400, httpHeaders, []byte("path mismatch: archive is type JPEG (.jpg)"), dirMisses, "" + return 400, httpHeaders, []byte("path mismatch: archive is type JPEG (.jpg)"), "" } case Webp: if ext != "webp" { - return 400, httpHeaders, []byte("path mismatch: archive is type WebP (.webp)"), dirMisses, "" + return 400, httpHeaders, []byte("path mismatch: archive is type WebP (.webp)"), "" } case Avif: if ext != "avif" { - return 400, httpHeaders, []byte("path mismatch: archive is type AVIF (.avif)"), dirMisses, "" + return 400, httpHeaders, []byte("path mismatch: archive is type AVIF (.avif)"), "" } } @@ -378,12 +373,11 @@ func (server *Server) getTileAttempt(ctx context.Context, httpHeaders map[string dirOffset, dirLen := header.RootOffset, header.RootLength for depth := 0; depth <= 3; depth++ { - dirReq := request{key: cacheKey{name: name, offset: dirOffset, length: dirLen, etag: rootValue.etag}, value: make(chan cachedValue, 1), miss: make(chan int, 1)} + dirReq := request{key: cacheKey{name: name, offset: dirOffset, length: dirLen, etag: rootValue.etag}, value: make(chan cachedValue, 1)} server.reqs <- dirReq dirValue := <-dirReq.value - dirMisses = dirMisses + <-dirReq.miss if dirValue.badEtag { - return 500, httpHeaders, []byte("I/O Error"), dirMisses, rootValue.etag + return 500, httpHeaders, []byte("I/O Error"), rootValue.etag } directory := dirValue.directory entry, ok := findTile(directory, tileID) @@ -398,24 +392,24 @@ func (server *Server) getTileAttempt(ctx context.Context, httpHeaders map[string r, _, statusCode, err := server.bucket.NewRangeReaderEtag(ctx, name+".pmtiles", int64(header.TileDataOffset+entry.Offset), int64(entry.Length), rootValue.etag) status = strconv.Itoa(statusCode) if isRefreshRequredError(err) { - return 500, httpHeaders, []byte("I/O Error"), dirMisses, rootValue.etag + return 500, httpHeaders, []byte("I/O Error"), rootValue.etag } // possible we have the header/directory cached but the archive has disappeared if err != nil { if isCanceled(ctx) { - return 499, httpHeaders, []byte("Canceled"), dirMisses, "" + return 499, httpHeaders, []byte("Canceled"), "" } server.logger.Printf("failed to fetch tile %s %d-%d %v", name, entry.Offset, entry.Length, err) - return 404, httpHeaders, []byte("Tile not found"), dirMisses, "" + return 404, httpHeaders, []byte("Tile not found"), "" } defer r.Close() b, err := io.ReadAll(r) if err != nil { status = "error" if isCanceled(ctx) { - return 499, httpHeaders, []byte("Canceled"), dirMisses, "" + return 499, httpHeaders, []byte("Canceled"), "" } - return canceledOrStatusCode(ctx, 500), httpHeaders, []byte("I/O error"), dirMisses, "" + return canceledOrStatusCode(ctx, 500), httpHeaders, []byte("I/O error"), "" } httpHeaders["Etag"] = generateEtag(b) @@ -425,12 +419,12 @@ func (server *Server) getTileAttempt(ctx context.Context, httpHeaders map[string if headerVal, ok := headerContentEncoding(header.TileCompression); ok { httpHeaders["Content-Encoding"] = headerVal } - return 200, httpHeaders, b, dirMisses, "" + return 200, httpHeaders, b, "" } dirOffset = header.LeafDirectoryOffset + entry.Offset dirLen = uint64(entry.Length) } - return 204, httpHeaders, nil, dirMisses, "" + return 204, httpHeaders, nil, "" } func isRefreshRequredError(err error) bool { @@ -488,7 +482,7 @@ func parseMetadataPath(path string) (bool, string) { return false, "" } -func (server *Server) get(ctx context.Context, path string) (archive, handler string, status int, headers map[string]string, data []byte, dirMisses int) { +func (server *Server) get(ctx context.Context, path string) (archive, handler string, status int, headers map[string]string, data []byte) { handler = "" archive = "" headers = make(map[string]string) @@ -498,7 +492,7 @@ func (server *Server) get(ctx context.Context, path string) (archive, handler st if ok, key, z, x, y, ext := parseTilePath(path); ok { archive, handler = key, "tile" - status, headers, data, dirMisses = server.getTile(ctx, headers, key, z, x, y, ext) + status, headers, data = server.getTile(ctx, headers, key, z, x, y, ext) } else if ok, key := parseTilejsonPath(path); ok { archive, handler = key, "tilejson" status, headers, data = server.getTileJSON(ctx, headers, key) @@ -518,8 +512,8 @@ func (server *Server) get(ctx context.Context, path string) (archive, handler st // Return status code, HTTP headers, and body. func (server *Server) Get(ctx context.Context, path string) (int, map[string]string, []byte) { tracker := server.metrics.startRequest() - archive, handler, status, headers, data, dirMisses := server.get(ctx, path) - tracker.finish(ctx, archive, handler, status, len(data), true, dirMisses) + archive, handler, status, headers, data := server.get(ctx, path) + tracker.finish(ctx, archive, handler, status, len(data), true) return status, headers, data } @@ -541,14 +535,14 @@ func (server *Server) ServeHTTP(w http.ResponseWriter, r *http.Request) int { w.Header().Set("Access-Control-Allow-Origin", server.cors) } w.WriteHeader(204) - tracker.finish(r.Context(), "", r.Method, 204, 0, false, 0) + tracker.finish(r.Context(), "", r.Method, 204, 0, false) return 204 } else if r.Method != http.MethodGet && r.Method != http.MethodHead { w.WriteHeader(405) - tracker.finish(r.Context(), "", r.Method, 405, 0, false, 0) + tracker.finish(r.Context(), "", r.Method, 405, 0, false) return 405 } - archive, handler, statusCode, headers, body, dirMisses := server.get(r.Context(), r.URL.Path) + archive, handler, statusCode, headers, body := server.get(r.Context(), r.URL.Path) for k, v := range headers { w.Header().Set(k, v) } @@ -566,7 +560,7 @@ func (server *Server) ServeHTTP(w http.ResponseWriter, r *http.Request) int { w.WriteHeader(statusCode) w.Write(body) } - tracker.finish(r.Context(), archive, handler, statusCode, len(body), true, dirMisses) + tracker.finish(r.Context(), archive, handler, statusCode, len(body), true) return statusCode } diff --git a/pmtiles/server_metrics.go b/pmtiles/server_metrics.go index 34af9de..12af15e 100644 --- a/pmtiles/server_metrics.go +++ b/pmtiles/server_metrics.go @@ -70,7 +70,7 @@ func (m *metrics) startRequest() *requestTracker { return &requestTracker{start: time.Now(), metrics: m} } -func (r *requestTracker) finish(ctx context.Context, archive, handler string, status, responseSize int, logDetails bool, dirMisses int) { +func (r *requestTracker) finish(ctx context.Context, archive, handler string, status, responseSize int, logDetails bool) { if !r.finished { r.finished = true // exclude archive path from "not found" metrics to limit cardinality on requests for nonexistant archives @@ -81,7 +81,7 @@ func (r *requestTracker) finish(ctx context.Context, archive, handler string, st if isCanceled(ctx) { statusString = "canceled" } - labels := []string{archive, handler, statusString, strconv.Itoa(dirMisses)} + labels := []string{archive, handler, statusString} r.metrics.requests.WithLabelValues(labels...).Inc() if logDetails { r.metrics.responseSize.WithLabelValues(labels...).Observe(float64(responseSize)) @@ -159,21 +159,21 @@ func createMetrics(scope string, logger *log.Logger) *metrics { Subsystem: scope, Name: "requests_total", Help: "Overall number of requests to the service", - }, []string{"archive", "handler", "status", "dir_misses"})), + }, []string{"archive", "handler", "status"})), responseSize: register(logger, prometheus.NewHistogramVec(prometheus.HistogramOpts{ Namespace: namespace, Subsystem: scope, Name: "response_size_bytes", Help: "Overall response size in bytes", Buckets: sizeBuckets, - }, []string{"archive", "handler", "status", "dir_misses"})), + }, []string{"archive", "handler", "status"})), requestDuration: register(logger, prometheus.NewHistogramVec(prometheus.HistogramOpts{ Namespace: namespace, Subsystem: scope, Name: "request_duration_seconds", Help: "Overall request duration in seconds", Buckets: durationBuckets, - }, []string{"archive", "handler", "status", "dir_misses"})), + }, []string{"archive", "handler", "status"})), // dir cache dirCacheEntries: register(logger, prometheus.NewGauge(prometheus.GaugeOpts{ From 05ede4aecc10f8c56c509f52cedce513a1719578 Mon Sep 17 00:00:00 2001 From: Mike Barry Date: Fri, 16 Feb 2024 09:20:04 -0500 Subject: [PATCH 09/10] cleanup --- pmtiles/server.go | 16 +--------------- pmtiles/server_metrics.go | 9 ++++----- 2 files changed, 5 insertions(+), 20 deletions(-) diff --git a/pmtiles/server.go b/pmtiles/server.go index bd87090..f10f723 100644 --- a/pmtiles/server.go +++ b/pmtiles/server.go @@ -409,7 +409,7 @@ func (server *Server) getTileAttempt(ctx context.Context, httpHeaders map[string if isCanceled(ctx) { return 499, httpHeaders, []byte("Canceled"), "" } - return canceledOrStatusCode(ctx, 500), httpHeaders, []byte("I/O error"), "" + return 500, httpHeaders, []byte("I/O error"), "" } httpHeaders["Etag"] = generateEtag(b) @@ -432,20 +432,6 @@ func isRefreshRequredError(err error) bool { return ok } -func canceledOrError(ctx context.Context) string { - if isCanceled(ctx) { - return "canceled" - } - return "error" -} - -func canceledOrStatusCode(ctx context.Context, code int) int { - if isCanceled(ctx) { - return 499 - } - return code -} - func isCanceled(ctx context.Context) bool { return errors.Is(ctx.Err(), context.Canceled) } diff --git a/pmtiles/server_metrics.go b/pmtiles/server_metrics.go index 12af15e..9e898da 100644 --- a/pmtiles/server_metrics.go +++ b/pmtiles/server_metrics.go @@ -74,13 +74,13 @@ func (r *requestTracker) finish(ctx context.Context, archive, handler string, st if !r.finished { r.finished = true // exclude archive path from "not found" metrics to limit cardinality on requests for nonexistant archives + statusString := strconv.Itoa(status) if status == 404 { archive = "" - } - statusString := strconv.Itoa(status) - if isCanceled(ctx) { + } else if isCanceled(ctx) { statusString = "canceled" } + labels := []string{archive, handler, statusString} r.metrics.requests.WithLabelValues(labels...).Inc() if logDetails { @@ -109,8 +109,7 @@ func (r *bucketRequestTracker) finish(ctx context.Context, status string) { // exclude archive path from "not found" metrics to limit cardinality on requests for nonexistant archives if status == "404" || status == "403" { r.archive = "" - } - if isCanceled(ctx) { + } else if isCanceled(ctx) { status = "canceled" } r.metrics.bucketRequests.WithLabelValues(r.archive, r.kind, status).Inc() From 77ba3a443f8c78734f03fa735a27b29346a8234b Mon Sep 17 00:00:00 2001 From: Mike Barry Date: Sat, 17 Feb 2024 14:12:26 -0500 Subject: [PATCH 10/10] include kind on cache requests --- pmtiles/server.go | 14 ++++++++------ pmtiles/server_metrics.go | 6 +++--- 2 files changed, 11 insertions(+), 9 deletions(-) diff --git a/pmtiles/server.go b/pmtiles/server.go index f10f723..3738355 100644 --- a/pmtiles/server.go +++ b/pmtiles/server.go @@ -123,28 +123,30 @@ func (server *Server) Start() { server.metrics.updateCacheStats(totalSize, len(cache)) } key := req.key + isRoot := (key.offset == 0 && key.length == 0) + kind := "leaf" + if isRoot { + kind = "root" + } if val, ok := cache[key]; ok { evictList.MoveToFront(val) req.value <- val.Value.(*response).value - server.metrics.cacheRequest(key.name, "hit") + server.metrics.cacheRequest(key.name, kind, "hit") } else if _, ok := inflight[key]; ok { inflight[key] = append(inflight[key], req) - server.metrics.cacheRequest(key.name, "hit") // treat inflight as a hit since it doesn't make a new server request + server.metrics.cacheRequest(key.name, kind, "hit") // treat inflight as a hit since it doesn't make a new server request } else { inflight[key] = []request{req} - server.metrics.cacheRequest(key.name, "miss") + server.metrics.cacheRequest(key.name, kind, "miss") go func() { var result cachedValue - isRoot := (key.offset == 0 && key.length == 0) offset := int64(key.offset) length := int64(key.length) - kind := "leaf" if isRoot { offset = 0 length = 16384 - kind = "root" } status := "" diff --git a/pmtiles/server_metrics.go b/pmtiles/server_metrics.go index 9e898da..c9f5385 100644 --- a/pmtiles/server_metrics.go +++ b/pmtiles/server_metrics.go @@ -133,8 +133,8 @@ func (m *metrics) updateCacheStats(sizeBytes, entries int) { m.dirCacheSizeBytes.Set(float64(sizeBytes)) } -func (m *metrics) cacheRequest(archive, status string) { - m.dirCacheRequests.WithLabelValues(archive, status).Inc() +func (m *metrics) cacheRequest(archive, kind, status string) { + m.dirCacheRequests.WithLabelValues(archive, kind, status).Inc() } func register[K prometheus.Collector](logger *log.Logger, metric K) K { @@ -198,7 +198,7 @@ func createMetrics(scope string, logger *log.Logger) *metrics { Subsystem: scope, Name: "dir_cache_requests", Help: "Requests to the directory cache by archive and status (hit/miss)", - }, []string{"archive", "status"})), + }, []string{"archive", "kind", "status"})), // requests to bucket bucketRequests: register(logger, prometheus.NewCounterVec(prometheus.CounterOpts{