From 629d627383d03551b895e97c3cd9749984792b21 Mon Sep 17 00:00:00 2001
From: Brandon Liu
Date: Sat, 9 Sep 2023 11:16:50 +0800
Subject: [PATCH] change to linked list

---
 pmtiles/extract.go | 76 +++++++++++++++++++++-------------------
 1 file changed, 34 insertions(+), 42 deletions(-)

diff --git a/pmtiles/extract.go b/pmtiles/extract.go
index a85904c..1e13db7 100644
--- a/pmtiles/extract.go
+++ b/pmtiles/extract.go
@@ -3,6 +3,7 @@ package pmtiles
 import (
 	"bytes"
 	"context"
+	"container/list"
 	"fmt"
 	"github.com/RoaringBitmap/roaring/roaring64"
 	"github.com/dustin/go-humanize"
@@ -159,10 +160,10 @@ type OverfetchListItem struct {
 // input ranges are merged in order of smallest byte distance to next range
 // until the overfetch budget is consumed.
 // The slice is sorted by Length
-func MergeRanges(ranges []SrcDstRange, overfetch float32) []OverfetchRange {
+func MergeRanges(ranges []SrcDstRange, overfetch float32) (*list.List, uint64) {
 	total_size := 0
 
-	list := make([]*OverfetchListItem, len(ranges))
+	shortest := make([]*OverfetchListItem, len(ranges))
 
 	// create the heap items
 	for i, rng := range ranges {
@@ -176,7 +177,7 @@ func MergeRanges(ranges []SrcDstRange, overfetch float32) []OverfetchRange {
 			}
 		}
 
-		list[i] = &OverfetchListItem{
+		shortest[i] = &OverfetchListItem{
 			Rng:          rng,
 			BytesToNext:  bytes_to_next,
 			CopyDiscards: []CopyDiscard{{uint64(rng.Length), 0}},
@@ -185,21 +186,18 @@ func MergeRanges(ranges []SrcDstRange, overfetch float32) []OverfetchRange {
 	}
 
 	// make the list doubly-linked
-	for i, item := range list {
+	for i, item := range shortest {
 		if i > 0 {
-			item.prev = list[i-1]
+			item.prev = shortest[i-1]
 		}
-		if i < len(list)-1 {
-			item.next = list[i+1]
+		if i < len(shortest)-1 {
+			item.next = shortest[i+1]
 		}
 	}
 
 	overfetch_budget := int(float32(total_size) * overfetch)
 
-	// create a 2nd slice, sorted by ascending distance to next range
-	shortest := make([]*OverfetchListItem, len(list))
-	copy(shortest, list)
-
+	// sort by ascending distance to next range
 	sort.Slice(shortest, func(i, j int) bool {
 		return shortest[i].BytesToNext < shortest[j].BytesToNext
 	})
@@ -223,21 +221,21 @@ func MergeRanges(ranges []SrcDstRange, overfetch float32) []OverfetchRange {
 		overfetch_budget -= int(item.BytesToNext)
 	}
 
-	// copy out the result structs
-	result := make([]OverfetchRange, len(shortest))
-
 	sort.Slice(shortest, func(i, j int) bool {
 		return shortest[i].Rng.Length > shortest[j].Rng.Length
 	})
 
-	for i, x := range shortest {
-		result[i] = OverfetchRange{
+	total_bytes := uint64(0)
+	result := list.New()
+	for _, x := range shortest {
+		result.PushBack(OverfetchRange{
 			Rng:          x.Rng,
 			CopyDiscards: x.CopyDiscards,
-		}
+		})
+		total_bytes += x.Rng.Length
 	}
 
-	return result
+	return result, total_bytes
 }
 
 // 1. Get the root directory (check that it is clustered)
@@ -344,10 +342,15 @@ func Extract(logger *log.Logger, bucketURL string, file string, maxzoom uint8, r
 		leaf_ranges = append(leaf_ranges, SrcDstRange{header.LeafDirectoryOffset + leaf.Offset, 0, uint64(leaf.Length)})
 	}
 
-	overfetch_leaves := MergeRanges(leaf_ranges, overfetch)
-	fmt.Printf("fetching %d dirs, %d chunks, %d requests\n", len(leaves), len(leaf_ranges), len(overfetch_leaves))
+	overfetch_leaves, _ := MergeRanges(leaf_ranges, overfetch)
+	num_overfetch_leaves := overfetch_leaves.Len()
+	fmt.Printf("fetching %d dirs, %d chunks, %d requests\n", len(leaves), len(leaf_ranges), overfetch_leaves.Len())
 
-	for _, or := range overfetch_leaves {
+	for {
+		if overfetch_leaves.Len() == 0 {
+			break
+		}
+		or := overfetch_leaves.Remove(overfetch_leaves.Front()).(OverfetchRange)
 
 		slab_r, err := bucket.NewRangeReader(ctx, file, int64(or.Rng.SrcOffset), int64(or.Rng.Length), nil)
 		if err != nil {
@@ -387,8 +390,10 @@ func Extract(logger *log.Logger, bucketURL string, file string, maxzoom uint8, r
 	// we now need to re-encode this entry list using cumulative offsets
 	reencoded, tile_parts, tiledata_length, addressed_tiles, tile_contents := ReencodeEntries(tile_entries)
 
-	overfetch_ranges := MergeRanges(tile_parts, overfetch)
-	fmt.Printf("fetching %d tiles, %d chunks, %d requests\n", len(reencoded), len(tile_parts), len(overfetch_ranges))
+	overfetch_ranges, total_bytes := MergeRanges(tile_parts, overfetch)
+
+	num_overfetch_ranges := overfetch_ranges.Len()
+	fmt.Printf("fetching %d tiles, %d chunks, %d requests\n", len(reencoded), len(tile_parts), overfetch_ranges.Len())
 
 	// TODO: takes up too much RAM
 	// construct the directories
@@ -417,11 +422,6 @@ func Extract(logger *log.Logger, bucketURL string, file string, maxzoom uint8, r
 
 	header_bytes := serialize_header(header)
 
-	total_bytes := uint64(0)
-	for _, x := range overfetch_ranges {
-		total_bytes += x.Rng.Length
-	}
-
 	total_actual_bytes := uint64(0)
 	for _, x := range tile_parts {
 		total_actual_bytes += x.Length
@@ -471,12 +471,6 @@ func Extract(logger *log.Logger, bucketURL string, file string, maxzoom uint8, r
 
 	var mu sync.Mutex
 
-	// go func() {
-	// 	for r := range progress {
-	// 		bar.Add(int(r))
-	// 	}
-	// }()
-
 	downloadPart := func(or OverfetchRange) error {
 		tile_r, err := bucket.NewRangeReader(ctx, file, int64(source_tile_data_offset+or.Rng.SrcOffset), int64(or.Rng.Length), nil)
 		if err != nil {
@@ -509,15 +503,13 @@ func Extract(logger *log.Logger, bucketURL string, file string, maxzoom uint8, r
 			var or OverfetchRange
 			for {
 				mu.Lock()
-				if len(overfetch_ranges) == 0 {
+				if overfetch_ranges.Len() == 0 {
 					done = true
 				} else {
 					if work_back {
-						or = overfetch_ranges[len(overfetch_ranges)-1]
-						overfetch_ranges = overfetch_ranges[:len(overfetch_ranges)-1]
+						or = overfetch_ranges.Remove(overfetch_ranges.Back()).(OverfetchRange)
 					} else {
-						or = overfetch_ranges[0]
-						overfetch_ranges = overfetch_ranges[1:]
+						or = overfetch_ranges.Remove(overfetch_ranges.Front()).(OverfetchRange)
 					}
 				}
 				mu.Unlock()
@@ -537,11 +529,11 @@ func Extract(logger *log.Logger, bucketURL string, file string, maxzoom uint8, r
 		}
 	}
 
-	fmt.Printf("Completed in %v seconds with 1 download thread.\n", time.Since(start))
+	fmt.Printf("Completed in %v with %v download threads.\n", time.Since(start), download_threads)
 	total_requests := 2 // header + root
-	total_requests += len(overfetch_leaves) // leaves
+	total_requests += num_overfetch_leaves // leaves
 	total_requests += 1 // metadata
-	total_requests += len(overfetch_ranges)
+	total_requests += num_overfetch_ranges
 	fmt.Printf("Extract required %d total requests.\n", total_requests)
 	fmt.Printf("Extract transferred %s (overfetch %v) for an archive size of %s\n", humanize.Bytes(total_bytes), overfetch, humanize.Bytes(total_actual_bytes))
 	fmt.Println("Verify your extract is usable at https://protomaps.github.io/PMTiles/")
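Note on the pattern this patch adopts: replacing the `[]OverfetchRange` slice with a `container/list` lets workers pop ranges from either end in O(1) under a mutex, without the re-slicing that the old `overfetch_ranges = overfetch_ranges[1:]` bookkeeping required. Below is a minimal standalone sketch of that consumption pattern; the `job` type, the `queue` name, and the fixed worker count are hypothetical stand-ins, not the real `OverfetchRange` or `download_threads` from `extract.go`.

```go
package main

import (
	"container/list"
	"fmt"
	"sync"
)

type job struct{ srcOffset, length uint64 } // hypothetical stand-in for OverfetchRange

func main() {
	// Build the work queue; MergeRanges in the patch fills it with PushBack.
	queue := list.New()
	for i := 0; i < 8; i++ {
		queue.PushBack(job{srcOffset: uint64(i) * 100, length: 100})
	}

	var mu sync.Mutex
	var wg sync.WaitGroup
	for w := 0; w < 4; w++ { // fixed worker count here; extract.go uses download_threads
		workBack := w == 0 // one worker drains from the back, the rest from the front
		wg.Add(1)
		go func(workBack bool) {
			defer wg.Done()
			for {
				mu.Lock()
				if queue.Len() == 0 {
					mu.Unlock()
					return
				}
				var j job
				if workBack {
					// Remove detaches the element and returns its value as
					// interface{}, so a type assertion recovers the concrete job.
					j = queue.Remove(queue.Back()).(job)
				} else {
					j = queue.Remove(queue.Front()).(job)
				}
				mu.Unlock()
				fmt.Println("fetching range at", j.srcOffset, "length", j.length)
			}
		}(workBack)
	}
	wg.Wait()
}
```

Since `Len`, `Front`, `Back`, and `Remove` are all O(1), the mutex is held only for the pop itself; the actual range request happens outside the critical section.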