From 6fbba224dd25227b7f2fd855040bbc3e2a909e94 Mon Sep 17 00:00:00 2001 From: Mike Barry Date: Wed, 14 Feb 2024 06:05:27 -0500 Subject: [PATCH] progress --- main.go | 1 + pmtiles/server.go | 152 ++++++++++++++++++++-------- pmtiles/server_metrics.go | 203 ++++++++++++++++++++++++++++++++++++++ 3 files changed, 317 insertions(+), 39 deletions(-) create mode 100644 pmtiles/server_metrics.go diff --git a/main.go b/main.go index e6e1a80..b117b46 100644 --- a/main.go +++ b/main.go @@ -136,6 +136,7 @@ func main() { logger.Fatalf("Failed to create new server, %v", err) } + pmtiles.SetBuildInfo(version, commit, date) server.Start() http.HandleFunc("/", func(w http.ResponseWriter, r *http.Request) { diff --git a/pmtiles/server.go b/pmtiles/server.go index 309df28..f9c6125 100644 --- a/pmtiles/server.go +++ b/pmtiles/server.go @@ -13,8 +13,6 @@ import ( "regexp" "strconv" "time" - - "github.com/prometheus/client_golang/prometheus" ) type cacheKey struct { @@ -53,6 +51,7 @@ type Server struct { cacheSize int cors string publicURL string + metrics *metrics } // NewServer creates a new pmtiles HTTP server. @@ -87,18 +86,12 @@ func NewServerWithBucket(bucket Bucket, _ string, logger *log.Logger, cacheSize cacheSize: cacheSize, cors: cors, publicURL: publicURL, + metrics: createMetrics("", logger), // change scope string if there are multiple servers running in one process } return l, nil } -func register[K prometheus.Collector](server *Server, metric K) K { - if err := prometheus.Register(metric); err != nil { - server.logger.Println(err) - } - return metric -} - // Start the server HTTP listener. func (server *Server) Start() { @@ -109,19 +102,14 @@ func (server *Server) Start() { evictList := list.New() totalSize := 0 ctx := context.Background() - - cacheSize := register(server, prometheus.NewGauge(prometheus.GaugeOpts{ - Namespace: "pmtiles", - Subsystem: "cache", - Name: "size", - Help: "Current number or directories in the cache", - })) + server.metrics.initCacheStats(server.cacheSize * 1000 * 1000) for { select { case req := <-server.reqs: if len(req.purgeEtag) > 0 { if _, dup := inflight[req.key]; !dup { + server.metrics.reloadFile(req.key.name) server.logger.Printf("re-fetching directories for changed file %s", req.key.name) } for k, v := range cache { @@ -132,17 +120,23 @@ func (server *Server) Start() { totalSize -= resp.size } } - cacheSize.Set(float64(len(cache))) + server.metrics.updateCacheStats(totalSize, len(cache)) } key := req.key if val, ok := cache[key]; ok { evictList.MoveToFront(val) req.value <- val.Value.(*response).value + server.metrics.cacheRequest(key.name, "hit") } else if _, ok := inflight[key]; ok { inflight[key] = append(inflight[key], req) + server.metrics.cacheRequest(key.name, "inflight") } else { inflight[key] = []request{req} + server.metrics.cacheRequest(key.name, "miss") go func() { + status := "ok" + tracker := server.metrics.startBucketRequest(key.name) + defer func() { tracker.finish(status) }() var result cachedValue isRoot := (key.offset == 0 && key.length == 0) @@ -160,6 +154,11 @@ func (server *Server) Start() { if err != nil { ok = false result.badEtag = isRefreshRequredError(err) + if result.badEtag { + status = "refresh_required" + } else { + status = canceledOrError(ctx) + } resps <- response{key: key, value: result} server.logger.Printf("failed to fetch %s %d-%d, %v", key.name, key.offset, key.length, err) return @@ -168,6 +167,7 @@ func (server *Server) Start() { b, err := io.ReadAll(r) if err != nil { ok = false + status = canceledOrError(ctx) resps <- response{key: key, value: result} server.logger.Printf("failed to fetch %s %d-%d, %v", key.name, key.offset, key.length, err) return @@ -176,6 +176,7 @@ func (server *Server) Start() { if isRoot { header, err := deserializeHeader(b[0:HeaderV3LenBytes]) if err != nil { + status = "error" server.logger.Printf("parsing header failed: %v", err) return } @@ -224,7 +225,7 @@ func (server *Server) Start() { totalSize -= kv.size } } - cacheSize.Set(float64(len(cache))) + server.metrics.updateCacheStats(totalSize, len(cache)) } } } @@ -249,11 +250,16 @@ func (server *Server) getHeaderMetadataAttempt(ctx context.Context, name, purgeE return false, HeaderV3{}, nil, "", nil } + status := "ok" + tracker := server.metrics.startBucketRequest(name) + defer func() { tracker.finish(status) }() r, _, err := server.bucket.NewRangeReaderEtag(ctx, name+".pmtiles", int64(header.MetadataOffset), int64(header.MetadataLength), rootValue.etag) if isRefreshRequredError(err) { + status = "refresh_required" return false, HeaderV3{}, nil, rootValue.etag, nil } if err != nil { + status = canceledOrError(ctx) return false, HeaderV3{}, nil, "", nil } defer r.Close() @@ -266,6 +272,7 @@ func (server *Server) getHeaderMetadataAttempt(ctx context.Context, name, purgeE } else if header.InternalCompression == NoCompression { metadataBytes, err = io.ReadAll(r) } else { + status = "error" return true, HeaderV3{}, nil, "", errors.New("unknown compression") } @@ -381,19 +388,31 @@ func (server *Server) getTileAttempt(ctx context.Context, httpHeaders map[string } if entry.RunLength > 0 { + status := "ok" + tracker := server.metrics.startBucketRequest(name) + defer func() { tracker.finish(status) }() r, _, err := server.bucket.NewRangeReaderEtag(ctx, name+".pmtiles", int64(header.TileDataOffset+entry.Offset), int64(entry.Length), rootValue.etag) if isRefreshRequredError(err) { + status = "refresh_required" return 500, httpHeaders, []byte("I/O Error"), rootValue.etag } // possible we have the header/directory cached but the archive has disappeared if err != nil { - server.logger.Printf("failed to fetch tile %s %d-%d", name, entry.Offset, entry.Length) - return 404, httpHeaders, []byte("archive not found"), "" + status = canceledOrError(ctx) + if isCanceled(ctx) { + return 499, httpHeaders, []byte("Canceled"), "" + } + server.logger.Printf("failed to fetch tile %s %d-%d %v", name, entry.Offset, entry.Length, err) + return 404, httpHeaders, []byte("Tile not found"), "" } defer r.Close() b, err := io.ReadAll(r) if err != nil { - return 500, httpHeaders, []byte("I/O error"), "" + status = canceledOrError(ctx) + if isCanceled(ctx) { + return 499, httpHeaders, []byte("Canceled"), "" + } + return canceledOrStatusCode(ctx, 500), httpHeaders, []byte("I/O error"), "" } httpHeaders["Etag"] = generateEtag(b) @@ -416,6 +435,24 @@ func isRefreshRequredError(err error) bool { return ok } +func canceledOrError(ctx context.Context) string { + if isCanceled(ctx) { + return "canceled" + } + return "error" +} + +func canceledOrStatusCode(ctx context.Context, code int) int { + if isCanceled(ctx) { + return 499 + } + return code +} + +func isCanceled(ctx context.Context) bool { + return errors.Is(ctx.Err(), context.Canceled) +} + var tilePattern = regexp.MustCompile(`^\/([-A-Za-z0-9_\/!-_\.\*'\(\)']+)\/(\d+)\/(\d+)\/(\d+)\.([a-z]+)$`) var metadataPattern = regexp.MustCompile(`^\/([-A-Za-z0-9_\/!-_\.\*'\(\)']+)\/metadata$`) var tileJSONPattern = regexp.MustCompile(`^\/([-A-Za-z0-9_\/!-_\.\*'\(\)']+)\.json$`) @@ -448,48 +485,85 @@ func parseMetadataPath(path string) (bool, string) { return false, "" } -// Get a response for the given path. -// Return status code, HTTP headers, and body. -func (server *Server) Get(ctx context.Context, path string) (int, map[string]string, []byte) { - httpHeaders := make(map[string]string) +func (server *Server) get(ctx context.Context, path string) (archive, handler string, status int, headers map[string]string, data []byte) { + handler = "" + archive = "" + headers = make(map[string]string) if len(server.cors) > 0 { - httpHeaders["Access-Control-Allow-Origin"] = server.cors + headers["Access-Control-Allow-Origin"] = server.cors } if ok, key, z, x, y, ext := parseTilePath(path); ok { - return server.getTile(ctx, httpHeaders, key, z, x, y, ext) - } - if ok, key := parseTilejsonPath(path); ok { - return server.getTileJSON(ctx, httpHeaders, key) - } - if ok, key := parseMetadataPath(path); ok { - return server.getMetadata(ctx, httpHeaders, key) + archive, handler = key, "tile" + status, headers, data = server.getTile(ctx, headers, key, z, x, y, ext) + } else if ok, key := parseTilejsonPath(path); ok { + archive, handler = key, "tilejson" + status, headers, data = server.getTileJSON(ctx, headers, key) + } else if ok, key := parseMetadataPath(path); ok { + archive, handler = key, "metadata" + status, headers, data = server.getMetadata(ctx, headers, key) + } else if path == "/" { + handler, status, data = "/", 204, []byte{} + } else { + handler, status, data = "404", 404, []byte("Path not found") } - if path == "/" { - return 204, httpHeaders, []byte{} - } + return +} - return 404, httpHeaders, []byte("Path not found") +// Get a response for the given path. +// Return status code, HTTP headers, and body. +func (server *Server) Get(ctx context.Context, path string) (int, map[string]string, []byte) { + tracker := server.metrics.startRequest() + archive, handler, status, headers, data := server.get(ctx, path) + tracker.finish(archive, handler, status, len(data), true) + return status, headers, data +} + +type loggingResponseWriter struct { + http.ResponseWriter + statusCode int +} + +func (lrw *loggingResponseWriter) WriteHeader(code int) { + lrw.statusCode = code + lrw.ResponseWriter.WriteHeader(code) } // Serve an HTTP response from the archive func (server *Server) ServeHTTP(w http.ResponseWriter, r *http.Request) int { - statusCode, headers, body := server.Get(r.Context(), r.URL.Path) + tracker := server.metrics.startRequest() + if r.Method == http.MethodOptions { + if len(server.cors) > 0 { + w.Header().Set("Access-Control-Allow-Origin", server.cors) + } + w.WriteHeader(204) + tracker.finish("", r.Method, 204, 0, false) + return 204 + } else if r.Method != http.MethodGet && r.Method != http.MethodHead { + w.WriteHeader(405) + tracker.finish("", r.Method, 405, 0, false) + return 405 + } + archive, handler, statusCode, headers, body := server.get(r.Context(), r.URL.Path) for k, v := range headers { w.Header().Set(k, v) } if statusCode == 200 { + lrw := &loggingResponseWriter{w, 200} // handle if-match, if-none-match request headers based on response etag http.ServeContent( - w, r, + lrw, r, "", // name used to infer content-type, but we've already set that time.UnixMilli(0), // ignore setting last-modified time and handling if-modified-since headers bytes.NewReader(body), ) + statusCode = lrw.statusCode } else { w.WriteHeader(statusCode) w.Write(body) } + tracker.finish(archive, handler, statusCode, len(body), true) + return statusCode } diff --git a/pmtiles/server_metrics.go b/pmtiles/server_metrics.go new file mode 100644 index 0000000..690a7ed --- /dev/null +++ b/pmtiles/server_metrics.go @@ -0,0 +1,203 @@ +package pmtiles + +import ( + "fmt" + "log" + "strconv" + "time" + + "github.com/prometheus/client_golang/prometheus" +) + +var buildInfoMetric = prometheus.NewGaugeVec(prometheus.GaugeOpts{ + Namespace: "pmtiles", + Name: "buildinfo", +}, []string{"version", "revision"}) + +var buildTimeMetric = prometheus.NewGauge(prometheus.GaugeOpts{ + Namespace: "pmtiles", + Name: "buildtime", +}) + +func init() { + err := prometheus.Register(buildInfoMetric) + if err != nil { + fmt.Println("Error registering metric", err) + } + err = prometheus.Register(buildTimeMetric) + if err != nil { + fmt.Println("Error registering metric", err) + } +} + +// SetBuildInfo initializes static metrics with pmtiles version, git hash, and build time +func SetBuildInfo(version, commit, date string) { + buildInfoMetric.WithLabelValues(version, commit).Set(1) + time, err := time.Parse(time.RFC3339, date) + if err == nil { + buildTimeMetric.Set(float64(time.Unix())) + } else { + buildTimeMetric.Set(0) + } +} + +type metrics struct { + // overall requests: # requests, request duration, response size by archive/status code + requests *prometheus.CounterVec + responseSize *prometheus.HistogramVec + requestDuration *prometheus.HistogramVec + // dir cache: # requests, hits, cache entries, cache bytes, cache bytes limit + dirCacheEntries prometheus.Gauge + dirCacheSizeBytes prometheus.Gauge + dirCacheLimitBytes prometheus.Gauge + dirCacheRequests *prometheus.CounterVec + // requests to bucket: # total, response duration by archive/status code + bucketRequests *prometheus.CounterVec + bucketRequestDuration *prometheus.HistogramVec + // misc + reloads *prometheus.CounterVec +} + +// utility to time an overall tile request +type requestTracker struct { + start time.Time + metrics *metrics +} + +func (m *metrics) startRequest() *requestTracker { + return &requestTracker{start: time.Now(), metrics: m} +} + +func (r *requestTracker) finish(archive, handler string, status, responseSize int, logDetails bool) { + labels := []string{archive, handler, strconv.Itoa(status)} + r.metrics.requests.WithLabelValues(labels...).Inc() + if logDetails { + r.metrics.responseSize.WithLabelValues(labels...).Observe(float64(responseSize)) + r.metrics.requestDuration.WithLabelValues(labels...).Observe(time.Since(r.start).Seconds()) + } +} + +// utility to time an individual request to the underlying bucket +type bucketRequestTracker struct { + start time.Time + metrics *metrics + archive string +} + +func (m *metrics) startBucketRequest(archive string) *bucketRequestTracker { + return &bucketRequestTracker{start: time.Now(), metrics: m, archive: archive} +} + +func (r *bucketRequestTracker) finish(status string) { + labels := []string{r.archive, status} + r.metrics.bucketRequests.WithLabelValues(labels...).Inc() + r.metrics.bucketRequestDuration.WithLabelValues(labels...).Observe(time.Since(r.start).Seconds()) +} + +// misc helpers + +func (m *metrics) reloadFile(name string) { + m.reloads.WithLabelValues(name).Inc() +} + +func (m *metrics) initCacheStats(limitBytes int) { + m.dirCacheLimitBytes.Set(float64(limitBytes)) + m.updateCacheStats(0, 0) +} + +func (m *metrics) updateCacheStats(sizeBytes, entries int) { + m.dirCacheEntries.Set(float64(entries)) + m.dirCacheSizeBytes.Set(float64(sizeBytes)) +} + +func (m *metrics) cacheRequest(archive, status string) { + m.dirCacheRequests.WithLabelValues(archive, status).Inc() +} + +func register[K prometheus.Collector](logger *log.Logger, metric K) K { + if err := prometheus.Register(metric); err != nil { + logger.Println(err) + } + return metric +} + +func createMetrics(scope string, logger *log.Logger) *metrics { + namespace := "pmtiles" + durationBuckets := prometheus.DefBuckets + kib := 1024.0 + mib := kib * kib + sizeBuckets := []float64{1.0 * kib, 5.0 * kib, 10.0 * kib, 50.0 * kib, 100 * kib, 200 * kib, 500 * kib, 1.0 * mib, 2.5 * mib, 5.0 * mib} + + return &metrics{ + // overall requests + requests: register(logger, prometheus.NewCounterVec(prometheus.CounterOpts{ + Namespace: namespace, + Subsystem: scope, + Name: "requests_total", + Help: "Overall number of requests to the service", + }, []string{"archive", "handler", "status"})), + responseSize: register(logger, prometheus.NewHistogramVec(prometheus.HistogramOpts{ + Namespace: namespace, + Subsystem: scope, + Name: "response_size_bytes", + Help: "Overall response size in bytes", + Buckets: sizeBuckets, + }, []string{"archive", "handler", "status"})), + requestDuration: register(logger, prometheus.NewHistogramVec(prometheus.HistogramOpts{ + Namespace: namespace, + Subsystem: scope, + Name: "request_duration_seconds", + Help: "Overall request duration in seconds", + Buckets: durationBuckets, + }, []string{"archive", "handler", "status"})), + + // dir cache + dirCacheEntries: register(logger, prometheus.NewGauge(prometheus.GaugeOpts{ + Namespace: namespace, + Subsystem: scope, + Name: "dir_cache_entries", + Help: "Number of directories in the cache", + })), + dirCacheSizeBytes: register(logger, prometheus.NewGauge(prometheus.GaugeOpts{ + Namespace: namespace, + Subsystem: scope, + Name: "dir_cache_size_bytes", + Help: "Current directory cache usage in bytes", + })), + dirCacheLimitBytes: register(logger, prometheus.NewGauge(prometheus.GaugeOpts{ + Namespace: namespace, + Subsystem: scope, + Name: "dir_cache_limit_bytes", + Help: "Maximum directory cache size limit in bytes", + })), + dirCacheRequests: register(logger, prometheus.NewCounterVec(prometheus.CounterOpts{ + Namespace: namespace, + Subsystem: scope, + Name: "dir_cache_requests", + Help: "Requests to the directory cache by archive and status (hit/inflight/miss)", + }, []string{"archive", "status"})), + + // requests to bucket + bucketRequests: register(logger, prometheus.NewCounterVec(prometheus.CounterOpts{ + Namespace: namespace, + Subsystem: scope, + Name: "bucket_requests_total", + Help: "Requests to the underlying bucket", + }, []string{"archive", "status"})), + bucketRequestDuration: register(logger, prometheus.NewHistogramVec(prometheus.HistogramOpts{ + Namespace: namespace, + Subsystem: scope, + Name: "bucket_request_duration_seconds", + Help: "Request duration in seconds for individual requests to the underlying bucket", + Buckets: durationBuckets, + }, []string{"archive", "status"})), + + // misc + reloads: register(logger, prometheus.NewCounterVec(prometheus.CounterOpts{ + Namespace: namespace, + Subsystem: scope, + Name: "bucket_reloads", + Help: "Number of times an archive was reloaded due to the etag changing", + }, []string{"archive"})), + } +}