Skip to content

Commit

Permalink
progress
Browse files Browse the repository at this point in the history
  • Loading branch information
msbarry committed Feb 14, 2024
1 parent 800168b commit 6fbba22
Show file tree
Hide file tree
Showing 3 changed files with 317 additions and 39 deletions.
1 change: 1 addition & 0 deletions main.go
Original file line number Diff line number Diff line change
Expand Up @@ -136,6 +136,7 @@ func main() {
logger.Fatalf("Failed to create new server, %v", err)
}

pmtiles.SetBuildInfo(version, commit, date)
server.Start()

http.HandleFunc("/", func(w http.ResponseWriter, r *http.Request) {
Expand Down
152 changes: 113 additions & 39 deletions pmtiles/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,8 +13,6 @@ import (
"regexp"
"strconv"
"time"

"github.com/prometheus/client_golang/prometheus"
)

type cacheKey struct {
Expand Down Expand Up @@ -53,6 +51,7 @@ type Server struct {
cacheSize int
cors string
publicURL string
metrics *metrics
}

// NewServer creates a new pmtiles HTTP server.
Expand Down Expand Up @@ -87,18 +86,12 @@ func NewServerWithBucket(bucket Bucket, _ string, logger *log.Logger, cacheSize
cacheSize: cacheSize,
cors: cors,
publicURL: publicURL,
metrics: createMetrics("", logger), // change scope string if there are multiple servers running in one process
}

return l, nil
}

func register[K prometheus.Collector](server *Server, metric K) K {
if err := prometheus.Register(metric); err != nil {
server.logger.Println(err)
}
return metric
}

// Start the server HTTP listener.
func (server *Server) Start() {

Expand All @@ -109,19 +102,14 @@ func (server *Server) Start() {
evictList := list.New()
totalSize := 0
ctx := context.Background()

cacheSize := register(server, prometheus.NewGauge(prometheus.GaugeOpts{
Namespace: "pmtiles",
Subsystem: "cache",
Name: "size",
Help: "Current number or directories in the cache",
}))
server.metrics.initCacheStats(server.cacheSize * 1000 * 1000)

for {
select {
case req := <-server.reqs:
if len(req.purgeEtag) > 0 {
if _, dup := inflight[req.key]; !dup {
server.metrics.reloadFile(req.key.name)
server.logger.Printf("re-fetching directories for changed file %s", req.key.name)
}
for k, v := range cache {
Expand All @@ -132,17 +120,23 @@ func (server *Server) Start() {
totalSize -= resp.size
}
}
cacheSize.Set(float64(len(cache)))
server.metrics.updateCacheStats(totalSize, len(cache))
}
key := req.key
if val, ok := cache[key]; ok {
evictList.MoveToFront(val)
req.value <- val.Value.(*response).value
server.metrics.cacheRequest(key.name, "hit")
} else if _, ok := inflight[key]; ok {
inflight[key] = append(inflight[key], req)
server.metrics.cacheRequest(key.name, "inflight")
} else {
inflight[key] = []request{req}
server.metrics.cacheRequest(key.name, "miss")
go func() {
status := "ok"
tracker := server.metrics.startBucketRequest(key.name)
defer func() { tracker.finish(status) }()
var result cachedValue
isRoot := (key.offset == 0 && key.length == 0)

Expand All @@ -160,6 +154,11 @@ func (server *Server) Start() {
if err != nil {
ok = false
result.badEtag = isRefreshRequredError(err)
if result.badEtag {
status = "refresh_required"
} else {
status = canceledOrError(ctx)
}
resps <- response{key: key, value: result}
server.logger.Printf("failed to fetch %s %d-%d, %v", key.name, key.offset, key.length, err)
return
Expand All @@ -168,6 +167,7 @@ func (server *Server) Start() {
b, err := io.ReadAll(r)
if err != nil {
ok = false
status = canceledOrError(ctx)
resps <- response{key: key, value: result}
server.logger.Printf("failed to fetch %s %d-%d, %v", key.name, key.offset, key.length, err)
return
Expand All @@ -176,6 +176,7 @@ func (server *Server) Start() {
if isRoot {
header, err := deserializeHeader(b[0:HeaderV3LenBytes])
if err != nil {
status = "error"
server.logger.Printf("parsing header failed: %v", err)
return
}
Expand Down Expand Up @@ -224,7 +225,7 @@ func (server *Server) Start() {
totalSize -= kv.size
}
}
cacheSize.Set(float64(len(cache)))
server.metrics.updateCacheStats(totalSize, len(cache))
}
}
}
Expand All @@ -249,11 +250,16 @@ func (server *Server) getHeaderMetadataAttempt(ctx context.Context, name, purgeE
return false, HeaderV3{}, nil, "", nil
}

status := "ok"
tracker := server.metrics.startBucketRequest(name)
defer func() { tracker.finish(status) }()
r, _, err := server.bucket.NewRangeReaderEtag(ctx, name+".pmtiles", int64(header.MetadataOffset), int64(header.MetadataLength), rootValue.etag)
if isRefreshRequredError(err) {
status = "refresh_required"
return false, HeaderV3{}, nil, rootValue.etag, nil
}
if err != nil {
status = canceledOrError(ctx)
return false, HeaderV3{}, nil, "", nil
}
defer r.Close()
Expand All @@ -266,6 +272,7 @@ func (server *Server) getHeaderMetadataAttempt(ctx context.Context, name, purgeE
} else if header.InternalCompression == NoCompression {
metadataBytes, err = io.ReadAll(r)
} else {
status = "error"
return true, HeaderV3{}, nil, "", errors.New("unknown compression")
}

Expand Down Expand Up @@ -381,19 +388,31 @@ func (server *Server) getTileAttempt(ctx context.Context, httpHeaders map[string
}

if entry.RunLength > 0 {
status := "ok"
tracker := server.metrics.startBucketRequest(name)
defer func() { tracker.finish(status) }()
r, _, err := server.bucket.NewRangeReaderEtag(ctx, name+".pmtiles", int64(header.TileDataOffset+entry.Offset), int64(entry.Length), rootValue.etag)
if isRefreshRequredError(err) {
status = "refresh_required"
return 500, httpHeaders, []byte("I/O Error"), rootValue.etag
}
// possible we have the header/directory cached but the archive has disappeared
if err != nil {
server.logger.Printf("failed to fetch tile %s %d-%d", name, entry.Offset, entry.Length)
return 404, httpHeaders, []byte("archive not found"), ""
status = canceledOrError(ctx)
if isCanceled(ctx) {
return 499, httpHeaders, []byte("Canceled"), ""
}
server.logger.Printf("failed to fetch tile %s %d-%d %v", name, entry.Offset, entry.Length, err)
return 404, httpHeaders, []byte("Tile not found"), ""
}
defer r.Close()
b, err := io.ReadAll(r)
if err != nil {
return 500, httpHeaders, []byte("I/O error"), ""
status = canceledOrError(ctx)
if isCanceled(ctx) {
return 499, httpHeaders, []byte("Canceled"), ""
}
return canceledOrStatusCode(ctx, 500), httpHeaders, []byte("I/O error"), ""
}

httpHeaders["Etag"] = generateEtag(b)
Expand All @@ -416,6 +435,24 @@ func isRefreshRequredError(err error) bool {
return ok
}

func canceledOrError(ctx context.Context) string {
if isCanceled(ctx) {
return "canceled"
}
return "error"
}

func canceledOrStatusCode(ctx context.Context, code int) int {
if isCanceled(ctx) {
return 499
}
return code
}

func isCanceled(ctx context.Context) bool {
return errors.Is(ctx.Err(), context.Canceled)
}

var tilePattern = regexp.MustCompile(`^\/([-A-Za-z0-9_\/!-_\.\*'\(\)']+)\/(\d+)\/(\d+)\/(\d+)\.([a-z]+)$`)
var metadataPattern = regexp.MustCompile(`^\/([-A-Za-z0-9_\/!-_\.\*'\(\)']+)\/metadata$`)
var tileJSONPattern = regexp.MustCompile(`^\/([-A-Za-z0-9_\/!-_\.\*'\(\)']+)\.json$`)
Expand Down Expand Up @@ -448,48 +485,85 @@ func parseMetadataPath(path string) (bool, string) {
return false, ""
}

// Get a response for the given path.
// Return status code, HTTP headers, and body.
func (server *Server) Get(ctx context.Context, path string) (int, map[string]string, []byte) {
httpHeaders := make(map[string]string)
func (server *Server) get(ctx context.Context, path string) (archive, handler string, status int, headers map[string]string, data []byte) {
handler = ""
archive = ""
headers = make(map[string]string)
if len(server.cors) > 0 {
httpHeaders["Access-Control-Allow-Origin"] = server.cors
headers["Access-Control-Allow-Origin"] = server.cors
}

if ok, key, z, x, y, ext := parseTilePath(path); ok {
return server.getTile(ctx, httpHeaders, key, z, x, y, ext)
}
if ok, key := parseTilejsonPath(path); ok {
return server.getTileJSON(ctx, httpHeaders, key)
}
if ok, key := parseMetadataPath(path); ok {
return server.getMetadata(ctx, httpHeaders, key)
archive, handler = key, "tile"
status, headers, data = server.getTile(ctx, headers, key, z, x, y, ext)
} else if ok, key := parseTilejsonPath(path); ok {
archive, handler = key, "tilejson"
status, headers, data = server.getTileJSON(ctx, headers, key)
} else if ok, key := parseMetadataPath(path); ok {
archive, handler = key, "metadata"
status, headers, data = server.getMetadata(ctx, headers, key)
} else if path == "/" {
handler, status, data = "/", 204, []byte{}
} else {
handler, status, data = "404", 404, []byte("Path not found")
}

if path == "/" {
return 204, httpHeaders, []byte{}
}
return
}

return 404, httpHeaders, []byte("Path not found")
// Get a response for the given path.
// Return status code, HTTP headers, and body.
func (server *Server) Get(ctx context.Context, path string) (int, map[string]string, []byte) {
tracker := server.metrics.startRequest()
archive, handler, status, headers, data := server.get(ctx, path)
tracker.finish(archive, handler, status, len(data), true)
return status, headers, data
}

type loggingResponseWriter struct {
http.ResponseWriter
statusCode int
}

func (lrw *loggingResponseWriter) WriteHeader(code int) {
lrw.statusCode = code
lrw.ResponseWriter.WriteHeader(code)
}

// Serve an HTTP response from the archive
func (server *Server) ServeHTTP(w http.ResponseWriter, r *http.Request) int {
statusCode, headers, body := server.Get(r.Context(), r.URL.Path)
tracker := server.metrics.startRequest()
if r.Method == http.MethodOptions {
if len(server.cors) > 0 {
w.Header().Set("Access-Control-Allow-Origin", server.cors)
}
w.WriteHeader(204)
tracker.finish("", r.Method, 204, 0, false)
return 204
} else if r.Method != http.MethodGet && r.Method != http.MethodHead {
w.WriteHeader(405)
tracker.finish("", r.Method, 405, 0, false)
return 405
}
archive, handler, statusCode, headers, body := server.get(r.Context(), r.URL.Path)
for k, v := range headers {
w.Header().Set(k, v)
}
if statusCode == 200 {
lrw := &loggingResponseWriter{w, 200}
// handle if-match, if-none-match request headers based on response etag
http.ServeContent(
w, r,
lrw, r,
"", // name used to infer content-type, but we've already set that
time.UnixMilli(0), // ignore setting last-modified time and handling if-modified-since headers
bytes.NewReader(body),
)
statusCode = lrw.statusCode
} else {
w.WriteHeader(statusCode)
w.Write(body)
}
tracker.finish(archive, handler, statusCode, len(body), true)

return statusCode
}
Loading

0 comments on commit 6fbba22

Please sign in to comment.