Add more prometheus metrics #141

Merged · 10 commits · Feb 18, 2024

Changes from 1 commit
1 change: 1 addition & 0 deletions main.go
@@ -1,4 +1,4 @@
package main

Check warning on line 1 in main.go (GitHub Actions / fmt_vet_lint): should have a package comment

import (
"fmt"
@@ -136,6 +136,7 @@
logger.Fatalf("Failed to create new server, %v", err)
}

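// record build metadata (version, commit, date) for export as a metric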
pmtiles.SetBuildInfo(version, commit, date)
server.Start()

http.HandleFunc("/", func(w http.ResponseWriter, r *http.Request) {
152 changes: 113 additions & 39 deletions pmtiles/server.go
@@ -13,8 +13,6 @@ import (
"regexp"
"strconv"
"time"

"github.com/prometheus/client_golang/prometheus"
)

type cacheKey struct {
@@ -53,6 +51,7 @@ type Server struct {
cacheSize int
cors string
publicURL string
metrics *metrics
}

// NewServer creates a new pmtiles HTTP server.
@@ -87,18 +86,12 @@ func NewServerWithBucket(bucket Bucket, _ string, logger *log.Logger, cacheSize
cacheSize: cacheSize,
cors: cors,
publicURL: publicURL,
metrics: createMetrics("", logger), // change scope string if there are multiple servers running in one process
}

return l, nil
}

func register[K prometheus.Collector](server *Server, metric K) K {
if err := prometheus.Register(metric); err != nil {
server.logger.Println(err)
}
return metric
}

// Start the server HTTP listener.
func (server *Server) Start() {

@@ -109,19 +102,14 @@ func (server *Server) Start() {
evictList := list.New()
totalSize := 0
ctx := context.Background()

cacheSize := register(server, prometheus.NewGauge(prometheus.GaugeOpts{
Namespace: "pmtiles",
Subsystem: "cache",
Name: "size",
Help: "Current number of directories in the cache",
}))
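// server.cacheSize is configured in megabytes; pass the capacity to the metrics in bytes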
server.metrics.initCacheStats(server.cacheSize * 1000 * 1000)

for {
select {
case req := <-server.reqs:
if len(req.purgeEtag) > 0 {
if _, dup := inflight[req.key]; !dup {
server.metrics.reloadFile(req.key.name)
server.logger.Printf("re-fetching directories for changed file %s", req.key.name)
}
for k, v := range cache {
@@ -132,17 +120,23 @@
totalSize -= resp.size
}
}
cacheSize.Set(float64(len(cache)))
server.metrics.updateCacheStats(totalSize, len(cache))
}
key := req.key
if val, ok := cache[key]; ok {
evictList.MoveToFront(val)
req.value <- val.Value.(*response).value
server.metrics.cacheRequest(key.name, "hit")
} else if _, ok := inflight[key]; ok {
inflight[key] = append(inflight[key], req)
server.metrics.cacheRequest(key.name, "inflight")
} else {
inflight[key] = []request{req}
server.metrics.cacheRequest(key.name, "miss")
go func() {
status := "ok"
tracker := server.metrics.startBucketRequest(key.name)
defer func() { tracker.finish(status) }()
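// note: status is captured by the closure, so the deferred finish reports whatever value it holds at return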
var result cachedValue
isRoot := (key.offset == 0 && key.length == 0)

@@ -160,6 +154,11 @@
if err != nil {
ok = false
result.badEtag = isRefreshRequredError(err)
if result.badEtag {
status = "refresh_required"
} else {
status = canceledOrError(ctx)
}
resps <- response{key: key, value: result}
server.logger.Printf("failed to fetch %s %d-%d, %v", key.name, key.offset, key.length, err)
return
@@ -168,6 +167,7 @@
b, err := io.ReadAll(r)
if err != nil {
ok = false
status = canceledOrError(ctx)
resps <- response{key: key, value: result}
server.logger.Printf("failed to fetch %s %d-%d, %v", key.name, key.offset, key.length, err)
return
@@ -176,6 +176,7 @@
if isRoot {
header, err := deserializeHeader(b[0:HeaderV3LenBytes])
if err != nil {
status = "error"
server.logger.Printf("parsing header failed: %v", err)
return
}
@@ -224,7 +225,7 @@ func (server *Server) Start() {
totalSize -= kv.size
}
}
cacheSize.Set(float64(len(cache)))
server.metrics.updateCacheStats(totalSize, len(cache))
}
}
}
@@ -249,11 +250,16 @@ func (server *Server) getHeaderMetadataAttempt(ctx context.Context, name, purgeE
return false, HeaderV3{}, nil, "", nil
}

status := "ok"
tracker := server.metrics.startBucketRequest(name)
defer func() { tracker.finish(status) }()
r, _, err := server.bucket.NewRangeReaderEtag(ctx, name+".pmtiles", int64(header.MetadataOffset), int64(header.MetadataLength), rootValue.etag)
if isRefreshRequredError(err) {
status = "refresh_required"
return false, HeaderV3{}, nil, rootValue.etag, nil
}
if err != nil {
status = canceledOrError(ctx)
return false, HeaderV3{}, nil, "", nil
}
defer r.Close()
@@ -266,6 +272,7 @@ func (server *Server) getHeaderMetadataAttempt(ctx context.Context, name, purgeE
} else if header.InternalCompression == NoCompression {
metadataBytes, err = io.ReadAll(r)
} else {
status = "error"
return true, HeaderV3{}, nil, "", errors.New("unknown compression")
}

@@ -381,19 +388,31 @@ func (server *Server) getTileAttempt(ctx context.Context, httpHeaders map[string
}

if entry.RunLength > 0 {
status := "ok"
tracker := server.metrics.startBucketRequest(name)
defer func() { tracker.finish(status) }()
r, _, err := server.bucket.NewRangeReaderEtag(ctx, name+".pmtiles", int64(header.TileDataOffset+entry.Offset), int64(entry.Length), rootValue.etag)
if isRefreshRequredError(err) {
status = "refresh_required"
return 500, httpHeaders, []byte("I/O Error"), rootValue.etag
}
// possible we have the header/directory cached but the archive has disappeared
if err != nil {
server.logger.Printf("failed to fetch tile %s %d-%d", name, entry.Offset, entry.Length)
return 404, httpHeaders, []byte("archive not found"), ""
status = canceledOrError(ctx)
if isCanceled(ctx) {
return 499, httpHeaders, []byte("Canceled"), ""
}
server.logger.Printf("failed to fetch tile %s %d-%d %v", name, entry.Offset, entry.Length, err)
return 404, httpHeaders, []byte("Tile not found"), ""
}
defer r.Close()
b, err := io.ReadAll(r)
if err != nil {
return 500, httpHeaders, []byte("I/O error"), ""
status = canceledOrError(ctx)
if isCanceled(ctx) {
return 499, httpHeaders, []byte("Canceled"), ""
}
return canceledOrStatusCode(ctx, 500), httpHeaders, []byte("I/O error"), ""
}

httpHeaders["Etag"] = generateEtag(b)
@@ -416,6 +435,24 @@ func isRefreshRequredError(err error) bool {
return ok
}

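// canceledOrError returns the status label "canceled" if the client canceled the request, "error" otherwise.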
func canceledOrError(ctx context.Context) string {
if isCanceled(ctx) {
return "canceled"
}
return "error"
}

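// canceledOrStatusCode maps client-canceled requests to 499 (nginx's de facto "client closed request" code) instead of the given error code.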
func canceledOrStatusCode(ctx context.Context, code int) int {
if isCanceled(ctx) {
return 499
}
return code
}

func isCanceled(ctx context.Context) bool {
return errors.Is(ctx.Err(), context.Canceled)
}

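// examples: tilePattern matches "/name/3/2/1.mvt", metadataPattern matches "/name/metadata", tileJSONPattern matches "/name.json"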
var tilePattern = regexp.MustCompile(`^\/([-A-Za-z0-9_\/!-_\.\*'\(\)']+)\/(\d+)\/(\d+)\/(\d+)\.([a-z]+)$`)
var metadataPattern = regexp.MustCompile(`^\/([-A-Za-z0-9_\/!-_\.\*'\(\)']+)\/metadata$`)
var tileJSONPattern = regexp.MustCompile(`^\/([-A-Za-z0-9_\/!-_\.\*'\(\)']+)\.json$`)
@@ -448,48 +485,85 @@ func parseMetadataPath(path string) (bool, string) {
return false, ""
}

// Get a response for the given path.
// Return status code, HTTP headers, and body.
func (server *Server) Get(ctx context.Context, path string) (int, map[string]string, []byte) {
httpHeaders := make(map[string]string)
func (server *Server) get(ctx context.Context, path string) (archive, handler string, status int, headers map[string]string, data []byte) {
handler = ""
archive = ""
headers = make(map[string]string)
if len(server.cors) > 0 {
httpHeaders["Access-Control-Allow-Origin"] = server.cors
headers["Access-Control-Allow-Origin"] = server.cors
}

if ok, key, z, x, y, ext := parseTilePath(path); ok {
return server.getTile(ctx, httpHeaders, key, z, x, y, ext)
}
if ok, key := parseTilejsonPath(path); ok {
return server.getTileJSON(ctx, httpHeaders, key)
}
if ok, key := parseMetadataPath(path); ok {
return server.getMetadata(ctx, httpHeaders, key)
archive, handler = key, "tile"
status, headers, data = server.getTile(ctx, headers, key, z, x, y, ext)
msbarry (Contributor Author): One concern here is that someone could make a bunch of random requests to nonexistent archives, and all of the 404s for unique resources would blow up the metric response size. To avoid that, we could either:

1. try to map all "archive does not exist" responses to handler="404", archive="" (but we don't want to miss logging legitimate failures, like a bad connection to an archive you care about), or
2. just recommend that applications put this behind a proxy that allows only requests to archives you know exist.

Thoughts @bdon ?

bdon (Member): I would prefer 1). For 2) we would have to recommend putting the whole go-pmtiles application behind an allowlist proxy, right? That is too strong a limitation, IMHO. For 1), I think discovering failures about missing archives is outside the scope of performance metrics, though I don't have a lot of experience with how people use Prometheus.

msbarry (Contributor Author): The case I want to catch is when people are reading from an archive and it suddenly goes missing, or the server starts returning 500 errors: we want to be able to see which archive is no longer working. We could limit the per-archive metrics to archives we've gotten an OK response for the root directory of, but that would miss the case where the server restarts and fails to read at startup. It would also mean two metrics you need to alert on to know whether tiles are down.

bdon (Member): Is there a way to limit the universe of stored tileset names to ~100?

bdon (Member): https://prometheus.io/docs/practices/naming/#labels seems to recommend against unbounded cardinality... I feel like detecting a spike in 404s/500s and inspecting logs is good enough?

msbarry (Contributor Author): Sounds good; changed the bucket and overall request metrics to set archive to "" on not-found errors.
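A minimal sketch of that resolution, assuming a duration histogram labeled by archive/handler/status (all names here are hypothetical; the metrics.go this PR adds is not visible in this single-commit view):

package pmtiles

import (
	"strconv"
	"time"

	"github.com/prometheus/client_golang/prometheus"
	"github.com/prometheus/client_golang/prometheus/promauto"
)

// sketch: one duration histogram covering every HTTP request
var requestDuration = promauto.NewHistogramVec(prometheus.HistogramOpts{
	Namespace: "pmtiles",
	Name:      "http_request_duration_seconds",
	Help:      "Overall HTTP request duration",
}, []string{"archive", "handler", "status"})

type requestTracker struct{ start time.Time }

func startRequest() *requestTracker {
	return &requestTracker{start: time.Now()}
}

func (t *requestTracker) finish(archive, handler string, status, size int, _ bool) {
	// blank the archive label on not-found responses so requests for
	// random nonexistent archives cannot inflate label cardinality
	if status == 404 {
		archive = ""
	}
	requestDuration.WithLabelValues(archive, handler, strconv.Itoa(status)).
		Observe(time.Since(t.start).Seconds())
	_ = size // a fuller version would also record response sizes
}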

} else if ok, key := parseTilejsonPath(path); ok {
archive, handler = key, "tilejson"
status, headers, data = server.getTileJSON(ctx, headers, key)
} else if ok, key := parseMetadataPath(path); ok {
archive, handler = key, "metadata"
status, headers, data = server.getMetadata(ctx, headers, key)
} else if path == "/" {
handler, status, data = "/", 204, []byte{}
} else {
handler, status, data = "404", 404, []byte("Path not found")
}

if path == "/" {
return 204, httpHeaders, []byte{}
}
return
}

return 404, httpHeaders, []byte("Path not found")
// Get a response for the given path.
// Return status code, HTTP headers, and body.
func (server *Server) Get(ctx context.Context, path string) (int, map[string]string, []byte) {
tracker := server.metrics.startRequest()
archive, handler, status, headers, data := server.get(ctx, path)
tracker.finish(archive, handler, status, len(data), true)
return status, headers, data
}

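// loggingResponseWriter captures the status code that http.ServeContent chooses (200, 206, 304, ...) so it can be reported to metrics.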
type loggingResponseWriter struct {
http.ResponseWriter
statusCode int
}

func (lrw *loggingResponseWriter) WriteHeader(code int) {
lrw.statusCode = code
lrw.ResponseWriter.WriteHeader(code)
}

// Serve an HTTP response from the archive
func (server *Server) ServeHTTP(w http.ResponseWriter, r *http.Request) int {
statusCode, headers, body := server.Get(r.Context(), r.URL.Path)
tracker := server.metrics.startRequest()
if r.Method == http.MethodOptions {
if len(server.cors) > 0 {
w.Header().Set("Access-Control-Allow-Origin", server.cors)
}
w.WriteHeader(204)
tracker.finish("", r.Method, 204, 0, false)
return 204
} else if r.Method != http.MethodGet && r.Method != http.MethodHead {
w.WriteHeader(405)
tracker.finish("", r.Method, 405, 0, false)
return 405
}
archive, handler, statusCode, headers, body := server.get(r.Context(), r.URL.Path)
for k, v := range headers {
w.Header().Set(k, v)
}
if statusCode == 200 {
lrw := &loggingResponseWriter{w, 200}
// handle if-match, if-none-match request headers based on response etag
http.ServeContent(
w, r,
lrw, r,
"", // name used to infer content-type, but we've already set that
time.UnixMilli(0), // ignore setting last-modified time and handling if-modified-since headers
bytes.NewReader(body),
)
statusCode = lrw.statusCode
} else {
w.WriteHeader(statusCode)
w.Write(body)
}
tracker.finish(archive, handler, statusCode, len(body), true)

return statusCode
}
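SetBuildInfo, called from main.go above, also lives in the metrics file this commit view doesn't show. A plausible sketch following the common build-info gauge convention (the metric name and label set are assumptions, not the PR's actual definitions):

package pmtiles

import "github.com/prometheus/client_golang/prometheus"

var buildInfo = prometheus.NewGaugeVec(prometheus.GaugeOpts{
	Namespace: "pmtiles",
	Name:      "build_info",
	Help:      "Build information for the running binary",
}, []string{"version", "revision", "date"})

// SetBuildInfo publishes version metadata as a constant gauge set to 1,
// so dashboards can join on its labels rather than store them per-sample.
func SetBuildInfo(version, commit, date string) {
	if err := prometheus.Register(buildInfo); err != nil {
		return // already registered
	}
	buildInfo.WithLabelValues(version, commit, date).Set(1)
}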