Commit
change to linked list
bdon committed Sep 9, 2023
1 parent f5e72f0 commit 629d627
Showing 1 changed file with 34 additions and 42 deletions.
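This commit replaces the []OverfetchRange slice returned by MergeRanges with a doubly-linked list from the Go standard library's container/list package, so workers can pop ranges from either end of a shared queue. For readers unfamiliar with that package, a minimal, self-contained sketch of the stdlib calls the new code relies on (the string values are purely illustrative):

    package main

    import (
        "container/list"
        "fmt"
    )

    func main() {
        l := list.New()
        l.PushBack("largest") // PushBack appends to the tail
        l.PushBack("middle")
        l.PushBack("smallest")

        // Remove returns the element's value as an interface{},
        // so callers type-assert back to the concrete type.
        front := l.Remove(l.Front()).(string) // pop from the head
        back := l.Remove(l.Back()).(string)   // pop from the tail

        fmt.Println(front, back, l.Len()) // largest smallest 1
    }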
76 changes: 34 additions & 42 deletions pmtiles/extract.go
@@ -3,6 +3,7 @@ package pmtiles
 import (
 	"bytes"
 	"context"
+	"container/list"
 	"fmt"
 	"github.com/RoaringBitmap/roaring/roaring64"
 	"github.com/dustin/go-humanize"
@@ -159,10 +160,10 @@ type OverfetchListItem struct {
 // input ranges are merged in order of smallest byte distance to next range
 // until the overfetch budget is consumed.
 // The slice is sorted by Length
-func MergeRanges(ranges []SrcDstRange, overfetch float32) []OverfetchRange {
+func MergeRanges(ranges []SrcDstRange, overfetch float32) (*list.List, uint64) {
 	total_size := 0
 
-	list := make([]*OverfetchListItem, len(ranges))
+	shortest := make([]*OverfetchListItem, len(ranges))
 
 	// create the heap items
 	for i, rng := range ranges {
@@ -176,7 +177,7 @@ func MergeRanges(ranges []SrcDstRange, overfetch float32) []OverfetchRange {
 			}
 		}
 
-		list[i] = &OverfetchListItem{
+		shortest[i] = &OverfetchListItem{
 			Rng: rng,
 			BytesToNext: bytes_to_next,
 			CopyDiscards: []CopyDiscard{{uint64(rng.Length), 0}},
@@ -185,21 +186,18 @@ func MergeRanges(ranges []SrcDstRange, overfetch float32) []OverfetchRange {
 	}
 
 	// make the list doubly-linked
-	for i, item := range list {
+	for i, item := range shortest {
 		if i > 0 {
-			item.prev = list[i-1]
+			item.prev = shortest[i-1]
 		}
-		if i < len(list)-1 {
-			item.next = list[i+1]
+		if i < len(shortest)-1 {
+			item.next = shortest[i+1]
 		}
 	}
 
 	overfetch_budget := int(float32(total_size) * overfetch)
 
-	// create a 2nd slice, sorted by ascending distance to next range
-	shortest := make([]*OverfetchListItem, len(list))
-	copy(shortest, list)
-
+	// sort by ascending distance to next range
 	sort.Slice(shortest, func(i, j int) bool {
 		return shortest[i].BytesToNext < shortest[j].BytesToNext
 	})
@@ -223,21 +221,21 @@ func MergeRanges(ranges []SrcDstRange, overfetch float32) []OverfetchRange {
 		overfetch_budget -= int(item.BytesToNext)
 	}
 
-	// copy out the result structs
-	result := make([]OverfetchRange, len(shortest))
-
 	sort.Slice(shortest, func(i, j int) bool {
 		return shortest[i].Rng.Length > shortest[j].Rng.Length
 	})
 
-	for i, x := range shortest {
-		result[i] = OverfetchRange{
+	total_bytes := uint64(0)
+	result := list.New()
+	for _, x := range shortest {
+		result.PushBack(OverfetchRange{
 			Rng: x.Rng,
 			CopyDiscards: x.CopyDiscards,
-		}
+		})
+		total_bytes += x.Rng.Length
 	}
 
-	return result
+	return result, total_bytes
 }
 
 // 1. Get the root directory (check that it is clustered)
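As a usage note, callers now receive the merged list together with a precomputed total byte count instead of summing range lengths themselves. The sketch below is a hypothetical helper, not part of the codebase; the input ranges are made up, using the positional SrcDstRange initialization seen elsewhere in this file:

    // Sketch only: assumes it lives in package pmtiles next to MergeRanges.
    func exampleMergeRanges() {
        ranges := []SrcDstRange{
            {0, 0, 100},    // SrcOffset, DstOffset, Length
            {120, 100, 50}, // starts 20 bytes past the previous range's end
        }

        merged, totalBytes := MergeRanges(ranges, 0.05)
        fmt.Printf("%d requests, %d bytes\n", merged.Len(), totalBytes)

        // Drain front-to-back, mirroring the leaf-fetch loop in Extract.
        for merged.Len() > 0 {
            or := merged.Remove(merged.Front()).(OverfetchRange)
            _ = or // issue one ranged read covering or.Rng, split per or.CopyDiscards
        }
    }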
@@ -344,10 +342,15 @@ func Extract(logger *log.Logger, bucketURL string, file string, maxzoom uint8, r
 		leaf_ranges = append(leaf_ranges, SrcDstRange{header.LeafDirectoryOffset + leaf.Offset, 0, uint64(leaf.Length)})
 	}
 
-	overfetch_leaves := MergeRanges(leaf_ranges, overfetch)
-	fmt.Printf("fetching %d dirs, %d chunks, %d requests\n", len(leaves), len(leaf_ranges), len(overfetch_leaves))
+	overfetch_leaves, _ := MergeRanges(leaf_ranges, overfetch)
+	num_overfetch_leaves := overfetch_leaves.Len()
+	fmt.Printf("fetching %d dirs, %d chunks, %d requests\n", len(leaves), len(leaf_ranges), overfetch_leaves.Len())
 
-	for _, or := range overfetch_leaves {
+	for {
+		if overfetch_leaves.Len() == 0 {
+			break
+		}
+		or := overfetch_leaves.Remove(overfetch_leaves.Front()).(OverfetchRange)
 
 		slab_r, err := bucket.NewRangeReader(ctx, file, int64(or.Rng.SrcOffset), int64(or.Rng.Length), nil)
 		if err != nil {
@@ -387,8 +390,10 @@ func Extract(logger *log.Logger, bucketURL string, file string, maxzoom uint8, r
 	// we now need to re-encode this entry list using cumulative offsets
 	reencoded, tile_parts, tiledata_length, addressed_tiles, tile_contents := ReencodeEntries(tile_entries)
 
-	overfetch_ranges := MergeRanges(tile_parts, overfetch)
-	fmt.Printf("fetching %d tiles, %d chunks, %d requests\n", len(reencoded), len(tile_parts), len(overfetch_ranges))
+	overfetch_ranges, total_bytes := MergeRanges(tile_parts, overfetch)
+
+	num_overfetch_ranges := overfetch_ranges.Len()
+	fmt.Printf("fetching %d tiles, %d chunks, %d requests\n", len(reencoded), len(tile_parts), overfetch_ranges.Len())
 
 	// TODO: takes up too much RAM
 	// construct the directories
@@ -417,11 +422,6 @@ func Extract(logger *log.Logger, bucketURL string, file string, maxzoom uint8, r
 
 	header_bytes := serialize_header(header)
 
-	total_bytes := uint64(0)
-	for _, x := range overfetch_ranges {
-		total_bytes += x.Rng.Length
-	}
-
 	total_actual_bytes := uint64(0)
 	for _, x := range tile_parts {
 		total_actual_bytes += x.Length
@@ -471,12 +471,6 @@ func Extract(logger *log.Logger, bucketURL string, file string, maxzoom uint8, r
 
 	var mu sync.Mutex
 
-	// go func() {
-	// 	for r := range progress {
-	// 		bar.Add(int(r))
-	// 	}
-	// }()
-
 	downloadPart := func(or OverfetchRange) error {
 		tile_r, err := bucket.NewRangeReader(ctx, file, int64(source_tile_data_offset+or.Rng.SrcOffset), int64(or.Rng.Length), nil)
 		if err != nil {
@@ -509,15 +503,13 @@ func Extract(logger *log.Logger, bucketURL string, file string, maxzoom uint8, r
 		var or OverfetchRange
 		for {
 			mu.Lock()
-			if len(overfetch_ranges) == 0 {
+			if overfetch_ranges.Len() == 0 {
 				done = true
 			} else {
 				if work_back {
-					or = overfetch_ranges[len(overfetch_ranges)-1]
-					overfetch_ranges = overfetch_ranges[:len(overfetch_ranges)-1]
+					or = overfetch_ranges.Remove(overfetch_ranges.Back()).(OverfetchRange)
 				} else {
-					or = overfetch_ranges[0]
-					overfetch_ranges = overfetch_ranges[1:]
+					or = overfetch_ranges.Remove(overfetch_ranges.Front()).(OverfetchRange)
 				}
 			}
 			mu.Unlock()
@@ -537,11 +529,11 @@ func Extract(logger *log.Logger, bucketURL string, file string, maxzoom uint8, r
 		}
 	}
 
-	fmt.Printf("Completed in %v seconds with 1 download thread.\n", time.Since(start))
+	fmt.Printf("Completed in %v with %v download threads.\n", time.Since(start), download_threads)
 	total_requests := 2 // header + root
-	total_requests += len(overfetch_leaves) // leaves
+	total_requests += num_overfetch_leaves // leaves
 	total_requests += 1 // metadata
-	total_requests += len(overfetch_ranges)
+	total_requests += num_overfetch_ranges
 	fmt.Printf("Extract required %d total requests.\n", total_requests)
 	fmt.Printf("Extract transferred %s (overfetch %v) for an archive size of %s\n", humanize.Bytes(total_bytes), overfetch, humanize.Bytes(total_actual_bytes))
 	fmt.Println("Verify your extract is usable at https://protomaps.github.io/PMTiles/")