Finish initial extract [#31]
* include the DstOffset so we can multithread downloads later
* set header statistics
* implement --dry-run
* add logging messages for user feedback
bdon committed Sep 4, 2023
1 parent 27a426d commit 51f32db
Showing 3 changed files with 163 additions and 103 deletions.
4 changes: 2 additions & 2 deletions main.go
@@ -50,7 +50,7 @@ var cli struct {
Bucket string `help:"Remote bucket of input archive."`
Region string `help:"Local GeoJSON Polygon or MultiPolygon file for area of interest." type:"existingfile"`
Maxzoom uint8 `help:"Maximum zoom level, inclusive."`
DryRun bool `help:"Calculate tiles to extract based on header and directories, but don't download them."`
DryRun bool `help:"Calculate tiles to extract, but don't download them."`
Overfetch float32 `default:"0.1" help:"Ratio of extra data to download to minimize the number of requests; 0.2 is 20%."`
} `cmd:"" help:"Create an archive from a larger archive for a subset of zoom levels or geographic region."`
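
For context, a sketch of how the Overfetch ratio becomes a byte budget. This mirrors the arithmetic in MergeRanges in extract.go below; treat it as illustrative rather than the commit's exact code:

// total_size is the number of bytes we actually need; 0.1 allows 10% extra
overfetch_budget := int(float32(total_size) * overfetch)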

@@ -120,7 +120,7 @@ func main() {
logger.Printf("Serving %s %s on port %d with Access-Control-Allow-Origin: %s\n", cli.Serve.Bucket, cli.Serve.Path, cli.Serve.Port, cli.Serve.Cors)
logger.Fatal(http.ListenAndServe(":"+strconv.Itoa(cli.Serve.Port), nil))
case "extract <input> <output>":
err := pmtiles.Extract(logger, cli.Extract.Bucket, cli.Extract.Input, cli.Extract.Maxzoom, cli.Extract.Region, cli.Extract.Output, cli.Extract.Overfetch)
err := pmtiles.Extract(logger, cli.Extract.Bucket, cli.Extract.Input, cli.Extract.Maxzoom, cli.Extract.Region, cli.Extract.Output, cli.Extract.Overfetch, cli.Extract.DryRun)
if err != nil {
logger.Fatalf("Failed to extract, %v", err)
}
208 changes: 130 additions & 78 deletions pmtiles/extract.go
@@ -5,6 +5,7 @@ import (
"context"
"fmt"
"github.com/RoaringBitmap/roaring/roaring64"
"github.com/dustin/go-humanize"
"github.com/paulmach/orb"
"github.com/paulmach/orb/geojson"
"github.com/schollz/progressbar/v3"
@@ -16,8 +17,15 @@ import (
"os"
"sort"
"strings"
"time"
)

type SrcDstRange struct {
SrcOffset uint64
DstOffset uint64
Length uint64
}
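
The DstOffset field is what enables multithreading downloads later (per the commit notes): each range records its absolute position in the output, so independent workers could fetch a chunk and place it with WriteAt. A minimal sketch under that assumption; fetchPart and its io.ReaderAt source are hypothetical, not part of this commit, and it assumes offset 0 is the start of the output's tile data:

// fetchPart copies one SrcDstRange from the source to its destination offset.
// Safe to run concurrently, since WriteAt does not share a file cursor.
func fetchPart(outfile *os.File, src io.ReaderAt, r SrcDstRange) error {
	buf := make([]byte, r.Length)
	if _, err := src.ReadAt(buf, int64(r.SrcOffset)); err != nil {
		return err
	}
	_, err := outfile.WriteAt(buf, int64(r.DstOffset))
	return err
}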

// Given a bitmap of tile IDs and a set of existing entries,
// create only the relevant entries.
// Returns a sorted slice of tile entries and a slice of all leaf entries.
@@ -71,32 +79,48 @@ func RelevantEntries(bitmap *roaring64.Bitmap, maxzoom uint8, dir []EntryV3) ([]
return tiles, leaves
}

func ReencodeEntries(dir []EntryV3) ([]EntryV3, []Range) {
// Given the tile entries for a source archive, sorted in TileID order,
// return:
// * the re-encoded tile entries, with their offsets changed to contiguous (clustered) order in a new archive
// * SrcDstRange: a slice of (offset in the source archive, offset in the new archive, length) tuples
//   - each range covers one or more tiles
//   - the output ranges are non-contiguous (adjacent ranges have been merged)
//   - it is sorted by new offsets, but not necessarily by source offsets
//
// * the total size of the tile section in the new archive
// * the number of addressed tiles (the sum over RunLength)
// * the number of unique offsets ("tile contents")
//   - this might not be the last SrcDstRange's new offset + length; it is the highest offset (which can be in the middle)
func ReencodeEntries(dir []EntryV3) ([]EntryV3, []SrcDstRange, uint64, uint64, uint64) {
reencoded := make([]EntryV3, 0, len(dir))
seen_offsets := make(map[uint64]uint64)
ranges := make([]Range, 0)
offset := uint64(0)
ranges := make([]SrcDstRange, 0)
addressed_tiles := uint64(0)

dst_offset := uint64(0)
for _, entry := range dir {
if val, ok := seen_offsets[entry.Offset]; ok {
reencoded = append(reencoded, EntryV3{entry.TileId, val, entry.Length, entry.RunLength})
} else {
if len(ranges) > 0 {
last_range := ranges[len(ranges)-1]
if last_range.Offset+last_range.Length == entry.Offset {
if last_range.SrcOffset+last_range.Length == entry.Offset {
ranges[len(ranges)-1].Length += uint64(entry.Length)
} else {
ranges = append(ranges, Range{entry.Offset, uint64(entry.Length)})
ranges = append(ranges, SrcDstRange{entry.Offset, dst_offset, uint64(entry.Length)})
}
} else {
ranges = append(ranges, Range{entry.Offset, uint64(entry.Length)})
ranges = append(ranges, SrcDstRange{entry.Offset, dst_offset, uint64(entry.Length)})
}

reencoded = append(reencoded, EntryV3{entry.TileId, offset, entry.Length, entry.RunLength})
seen_offsets[entry.Offset] = offset
offset += uint64(entry.Length)
reencoded = append(reencoded, EntryV3{entry.TileId, dst_offset, entry.Length, entry.RunLength})
seen_offsets[entry.Offset] = dst_offset
dst_offset += uint64(entry.Length)
}

addressed_tiles += uint64(entry.RunLength)
}
return reencoded, ranges
return reencoded, ranges, dst_offset, addressed_tiles, uint64(len(seen_offsets))
}
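
A worked example of the re-encoding, with hypothetical values (not from this commit); the EntryV3 literals here are {TileId, Offset, Length, RunLength}:

// tiles 0 and 2 share the same 10 bytes at source offset 100
dir := []EntryV3{{0, 100, 10, 1}, {1, 110, 10, 1}, {2, 100, 10, 1}}
reencoded, ranges, size, addressed, contents := ReencodeEntries(dir)
fmt.Println(reencoded, ranges, size, addressed, contents)
// reencoded: {0 0 10 1} {1 10 10 1} {2 0 10 1} (tile 2 reuses dst offset 0)
// ranges: a single merged SrcDstRange{SrcOffset: 100, DstOffset: 0, Length: 20}
// size = 20, addressed = 3, contents = 2 (two unique source offsets)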

// "want the next N bytes, then discard N bytes"
@@ -106,26 +130,34 @@ type CopyDiscard struct {
}

type OverfetchRange struct {
Rng Range
Rng SrcDstRange
CopyDiscards []CopyDiscard
}

// A single request, where we want only some of
// the bytes in the requested range
type OverfetchListItem struct {
Rng Range
Rng SrcDstRange
CopyDiscards []CopyDiscard
BytesToNext uint64 // the "priority"
prev *OverfetchListItem
next *OverfetchListItem
index int
}

// overfetch = 0.2 means we can request an extra 20%
// overfetch = 1.00 means we can double our request size
// to minimize the # of HTTP requests
// the input ranges must be non-contiguous but might not be sorted
func MergeRanges(ranges []Range, overfetch float32) []OverfetchRange {
// MergeRanges takes:
// * a slice of SrcDstRanges that is non-contiguous and sorted by DstOffset
// * an overfetch parameter
//   - overfetch = 0.2 means we can request an extra 20%
//   - overfetch = 1.00 means we can double our total transfer size
//
// It returns a slice of OverfetchRanges.
//
// Each OverfetchRange covers one or more input ranges.
// Input ranges are merged in order of smallest byte distance to the next range,
// until the overfetch budget is consumed.
// The result is sorted by DstOffset.
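//
// An illustrative walk-through with hypothetical values (not from this commit),
// assuming the budget is computed as int(float32(total_size) * overfetch):
//
//	ranges := []SrcDstRange{{0, 0, 100}, {120, 100, 50}, {500, 150, 30}}
//	merged := MergeRanges(ranges, 0.2) // a 20% budget over 180 bytes is 36 bytes
//
// The 20-byte gap between the first two ranges fits the budget, so they merge
// into one request {SrcOffset: 0, DstOffset: 0, Length: 170} with CopyDiscards
// {100, 20}, {50, 0}. The 330-byte gap to the third range exceeds the remaining
// 16-byte budget, so three chunks become two requests.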
func MergeRanges(ranges []SrcDstRange, overfetch float32) []OverfetchRange {
total_size := 0

list := make([]*OverfetchListItem, len(ranges))
@@ -136,7 +168,7 @@ func MergeRanges(ranges []Range, overfetch float32) []OverfetchRange {
if i == len(ranges)-1 {
bytes_to_next = math.MaxUint64
} else {
bytes_to_next = ranges[i+1].Offset - (rng.Offset + rng.Length)
// SrcOffsets are not necessarily sorted, so guard against unsigned underflow
if ranges[i+1].SrcOffset < rng.SrcOffset+rng.Length {
bytes_to_next = math.MaxUint64
} else {
bytes_to_next = ranges[i+1].SrcOffset - (rng.SrcOffset + rng.Length)
}
@@ -146,7 +178,6 @@ func MergeRanges(ranges []Range, overfetch float32) []OverfetchRange {
Rng: rng,
BytesToNext: bytes_to_next,
CopyDiscards: []CopyDiscard{{uint64(rng.Length), 0}},
index: i,
}
total_size += int(rng.Length)
}
@@ -177,7 +208,7 @@ func MergeRanges(ranges []Range, overfetch float32) []OverfetchRange {

// merge this item into item.next
new_length := item.Rng.Length + item.BytesToNext + item.next.Rng.Length
item.next.Rng = Range{item.Rng.Offset, new_length}
item.next.Rng = SrcDstRange{item.Rng.SrcOffset, item.Rng.DstOffset, new_length}
item.next.prev = item.prev
if item.prev != nil {
item.prev.next = item.next
@@ -194,7 +225,7 @@ func MergeRanges(ranges []Range, overfetch float32) []OverfetchRange {
result := make([]OverfetchRange, len(shortest))

sort.Slice(shortest, func(i, j int) bool {
return shortest[i].index < shortest[j].index
return shortest[i].Rng.DstOffset < shortest[j].Rng.DstOffset
})

for i, x := range shortest {
@@ -220,9 +251,8 @@ func MergeRanges(ranges []Range, overfetch float32) []OverfetchRange {
// 9. get and write the metadata.
// 10. write the leaf directories (if any)
// 11. Get all tiles, and write directly to the output.
// (make this download multithreaded)

func Extract(logger *log.Logger, bucketURL string, file string, maxzoom uint8, region_file string, output string, overfetch float32) error {
func Extract(logger *log.Logger, bucketURL string, file string, maxzoom uint8, region_file string, output string, overfetch float32, dry_run bool) error {
// 1. fetch the header

if bucketURL == "" {
@@ -233,6 +263,9 @@ func Extract(logger *log.Logger, bucketURL string, file string, maxzoom uint8, r
}
}

fmt.Println("WARNING: extract is an experimental feature and results may not be suitable for production use.")
start := time.Now()

ctx := context.Background()
bucket, err := blob.OpenBucket(ctx, bucketURL)
if err != nil {
@@ -254,9 +287,12 @@ func Extract(logger *log.Logger, bucketURL string, file string, maxzoom uint8, r
header, err := deserialize_header(b[0:HEADERV3_LEN_BYTES])

if !header.Clustered {
return fmt.Errorf("Error: archive must be clustered for extracts.")
return fmt.Errorf("Error: source archive must be clustered for extracts.")
}

source_metadata_offset := header.MetadataOffset
source_tile_data_offset := header.TileDataOffset

if header.MaxZoom < maxzoom || maxzoom == 0 {
maxzoom = header.MaxZoom
}
@@ -301,17 +337,17 @@ func Extract(logger *log.Logger, bucketURL string, file string, maxzoom uint8, r

// 4. get all relevant leaf entries

leaf_ranges := make([]Range, 0)
leaf_ranges := make([]SrcDstRange, 0)
for _, leaf := range leaves {
leaf_ranges = append(leaf_ranges, Range{header.LeafDirectoryOffset + leaf.Offset, uint64(leaf.Length)})
leaf_ranges = append(leaf_ranges, SrcDstRange{header.LeafDirectoryOffset + leaf.Offset, 0, uint64(leaf.Length)})
}

overfetch_leaves := MergeRanges(leaf_ranges, overfetch)
fmt.Printf("fetching %d dirs, %d chunks, %d requests\n", len(leaves), len(leaf_ranges), len(overfetch_leaves))

for _, or := range overfetch_leaves {

slab_r, err := bucket.NewRangeReader(ctx, file, int64(or.Rng.Offset), int64(or.Rng.Length), nil)
slab_r, err := bucket.NewRangeReader(ctx, file, int64(or.Rng.SrcOffset), int64(or.Rng.Length), nil)
if err != nil {
return err
}
@@ -347,9 +383,7 @@ func Extract(logger *log.Logger, bucketURL string, file string, maxzoom uint8, r

// 6. create the new header and chunk list
// we now need to re-encode this entry list using cumulative offsets
reencoded, tile_parts := ReencodeEntries(tile_entries)

fmt.Println("Reencoding done, parts: ", len(tile_parts))
reencoded, tile_parts, tiledata_length, addressed_tiles, tile_contents := ReencodeEntries(tile_entries)

overfetch_ranges := MergeRanges(tile_parts, overfetch)
fmt.Printf("fetching %d tiles, %d chunks, %d requests\n", len(reencoded), len(tile_parts), len(overfetch_ranges))
Expand All @@ -360,16 +394,15 @@ func Extract(logger *log.Logger, bucketURL string, file string, maxzoom uint8, r
// 7. write the modified header
header.RootOffset = HEADERV3_LEN_BYTES
header.RootLength = uint64(len(new_root_bytes))
old_metadata_offset := header.MetadataOffset
header.MetadataOffset = header.RootOffset + header.RootLength
header.LeafDirectoryOffset = header.MetadataOffset + header.MetadataLength
header.LeafDirectoryLength = uint64(len(new_leaves_bytes))
old_tile_data_offset := header.TileDataOffset
header.TileDataOffset = header.LeafDirectoryOffset + header.LeafDirectoryLength
last_part := tile_parts[len(tile_parts)-1]
header.TileDataLength = last_part.Offset + uint64(last_part.Length)

//TODO set statistics
header.TileDataLength = tiledata_length
header.AddressedTilesCount = addressed_tiles
header.TileEntriesCount = uint64(len(tile_entries))
header.TileContentsCount = tile_contents

header.MinLonE7 = int32(bound.Left() * 10000000)
header.MinLatE7 = int32(bound.Bottom() * 10000000)
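// (illustrative note, not part of this commit: the E7 fields store degrees as
// ten-millionths in an int32, e.g. a left bound of -122.4194 degrees becomes
// int32(-122.4194 * 10000000) = -1224194000)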
@@ -381,67 +414,86 @@ func Extract(logger *log.Logger, bucketURL string, file string, maxzoom uint8, r

header_bytes := serialize_header(header)

outfile, err := os.Create(output)
defer outfile.Close()
_, err = outfile.Write(header_bytes)
if err != nil {
return err
}

// 8. write the root directory
_, err = outfile.Write(new_root_bytes)
if err != nil {
return err
total_bytes := uint64(0)
for _, x := range overfetch_ranges {
total_bytes += x.Rng.Length
}

// 9. get and write the metadata
metadata_reader, err := bucket.NewRangeReader(ctx, file, int64(old_metadata_offset), int64(header.MetadataLength), nil)
if err != nil {
return err
}
metadata_bytes, err := io.ReadAll(metadata_reader)
defer metadata_reader.Close()
if err != nil {
return err
}
if !dry_run {

outfile.Write(metadata_bytes)
outfile, err := os.Create(output)
if err != nil {
return err
}
defer outfile.Close()
_, err = outfile.Write(header_bytes)
if err != nil {
return err
}

// 10. write the leaf directories
_, err = outfile.Write(new_leaves_bytes)
if err != nil {
return err
}
// 8. write the root directory
_, err = outfile.Write(new_root_bytes)
if err != nil {
return err
}

total_bytes := uint64(0)
for _, x := range tile_parts {
total_bytes += x.Length
}
// 9. get and write the metadata
metadata_reader, err := bucket.NewRangeReader(ctx, file, int64(source_metadata_offset), int64(header.MetadataLength), nil)
if err != nil {
return err
}
metadata_bytes, err := io.ReadAll(metadata_reader)
defer metadata_reader.Close()
if err != nil {
return err
}

bar := progressbar.DefaultBytes(
int64(total_bytes),
"downloading " + output,
)
for _, or := range overfetch_ranges {
outfile.Write(metadata_bytes)

tile_r, err := bucket.NewRangeReader(ctx, file, int64(old_tile_data_offset+or.Rng.Offset), int64(or.Rng.Length), nil)
// 10. write the leaf directories
_, err = outfile.Write(new_leaves_bytes)
if err != nil {
return err
}

for _, cd := range or.CopyDiscards {
_, err := io.CopyN(io.MultiWriter(outfile, bar), tile_r, int64(cd.Wanted))
bar := progressbar.DefaultBytes(
int64(total_bytes),
"fetching chunks",
)

for _, or := range overfetch_ranges {

tile_r, err := bucket.NewRangeReader(ctx, file, int64(source_tile_data_offset+or.Rng.SrcOffset), int64(or.Rng.Length), nil)
if err != nil {
return err
}

_, err = io.CopyN(io.MultiWriter(io.Discard,bar), tile_r, int64(cd.Discard))
if err != nil {
return err
for _, cd := range or.CopyDiscards {
_, err := io.CopyN(io.MultiWriter(outfile, bar), tile_r, int64(cd.Wanted))
if err != nil {
return err
}

_, err = io.CopyN(io.MultiWriter(io.Discard, bar), tile_r, int64(cd.Discard))
if err != nil {
return err
}
}
tile_r.Close()
}
tile_r.Close()
}
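
// (illustrative note, not part of this commit: a merged range with CopyDiscards
// {100, 20}, {50, 0} writes 100 bytes to outfile, discards the 20-byte gap,
// then writes the final 50, so a single HTTP request fills two separate runs
// of the tile section)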

total_actual_bytes := uint64(0)
for _, x := range tile_parts {
total_actual_bytes += x.Length
}

fmt.Printf("Completed in %v seconds with 1 download thread.\n", time.Since(start))
total_requests := 2 // header + root
total_requests += len(overfetch_leaves) // leaves
total_requests += 1 // metadata
total_requests += len(overfetch_ranges)
fmt.Printf("Extract required %d total requests.\n", total_requests)
fmt.Printf("Extract transferred %s (overfetch %v) for an archive size of %s\n", humanize.Bytes(total_bytes), overfetch, humanize.Bytes(total_actual_bytes))
fmt.Println("Verify your extract is usable at https://protomaps.github.io/PMTiles/")
fmt.Println("Feedback wanted! report your success or failure to https://github.com/protomaps/go-pmtiles/issues")

return nil
}