From f5e72f034af46609f22592e66ebcefc902848229 Mon Sep 17 00:00:00 2001
From: Brandon Liu
Date: Sat, 9 Sep 2023 10:06:44 +0800
Subject: [PATCH] wip on multithreading

---
 go.mod             |  1 +
 go.sum             |  1 +
 main.go            |  3 +-
 pmtiles/extract.go | 70 ++++++++++++++++++++++++++++++++++++++++------
 4 files changed, 65 insertions(+), 10 deletions(-)

diff --git a/go.mod b/go.mod
index 3886aff..a5d3c94 100644
--- a/go.mod
+++ b/go.mod
@@ -10,6 +10,7 @@ require (
     github.com/schollz/progressbar/v3 v3.11.0
     github.com/stretchr/testify v1.8.0
     gocloud.dev v0.27.0
+    golang.org/x/sync v0.0.0-20220722155255-886fb9371eb4
     zombiezen.com/go/sqlite v0.10.1
 )

diff --git a/go.sum b/go.sum
index f75bf3b..dd3daba 100644
--- a/go.sum
+++ b/go.sum
@@ -1658,6 +1658,7 @@ golang.org/x/sync v0.0.0-20201020160332-67f06af15bc9/go.mod h1:RxMgew5VJxzue5/jJ
 golang.org/x/sync v0.0.0-20201207232520-09787c993a3a/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
 golang.org/x/sync v0.0.0-20210220032951-036812b2e83c/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
 golang.org/x/sync v0.0.0-20220601150217-0de741cfad7f/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
+golang.org/x/sync v0.0.0-20220722155255-886fb9371eb4 h1:uVc8UZUe6tr40fFVnUP5Oj+veunVezqYl9z7DYw9xzw=
 golang.org/x/sync v0.0.0-20220722155255-886fb9371eb4/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
 golang.org/x/sys v0.0.0-20180823144017-11551d06cbcc/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
 golang.org/x/sys v0.0.0-20180830151530-49385e6e1522/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
diff --git a/main.go b/main.go
index c6f4a22..db6d464 100644
--- a/main.go
+++ b/main.go
@@ -50,6 +50,7 @@ var cli struct {
         Bucket string `help:"Remote bucket of input archive."`
         Region string `help:"local GeoJSON Polygon or MultiPolygon file for area of interest." type:"existingfile"`
         Maxzoom uint8 `help:"Maximum zoom level, inclusive."`
+        DownloadThreads int `default:1 help:"Number of download threads."`
         DryRun bool `help:"Calculate tiles to extract, but don't download them."`
         Overfetch float32 `default:0.1 help:"What ratio of extra data to download to minimize # requests; 0.2 is 20%"`
     } `cmd:"" help:"Create an archive from a larger archive for a subset of zoom levels or geographic region."`
@@ -120,7 +121,7 @@ func main() {
         logger.Printf("Serving %s %s on port %d with Access-Control-Allow-Origin: %s\n", cli.Serve.Bucket, cli.Serve.Path, cli.Serve.Port, cli.Serve.Cors)
         logger.Fatal(http.ListenAndServe(":"+strconv.Itoa(cli.Serve.Port), nil))
     case "extract <input> <output>":
-        err := pmtiles.Extract(logger, cli.Extract.Bucket, cli.Extract.Input, cli.Extract.Maxzoom, cli.Extract.Region, cli.Extract.Output, cli.Extract.Overfetch, cli.Extract.DryRun)
+        err := pmtiles.Extract(logger, cli.Extract.Bucket, cli.Extract.Input, cli.Extract.Maxzoom, cli.Extract.Region, cli.Extract.Output, cli.Extract.DownloadThreads, cli.Extract.Overfetch, cli.Extract.DryRun)
         if err != nil {
             logger.Fatalf("Failed to extract, %v", err)
         }
diff --git a/pmtiles/extract.go b/pmtiles/extract.go
index 745c66e..a85904c 100644
--- a/pmtiles/extract.go
+++ b/pmtiles/extract.go
@@ -10,6 +10,7 @@ import (
     "github.com/paulmach/orb/geojson"
     "github.com/schollz/progressbar/v3"
     "gocloud.dev/blob"
+    "golang.org/x/sync/errgroup"
     "io"
     "io/ioutil"
     "log"
@@ -17,6 +18,7 @@ import (
     "os"
     "sort"
     "strings"
+    "sync"
     "time"
 )

@@ -225,7 +227,7 @@ func MergeRanges(ranges []SrcDstRange, overfetch float32) []OverfetchRange {
     result := make([]OverfetchRange, len(shortest))

     sort.Slice(shortest, func(i, j int) bool {
-        return shortest[i].Rng.DstOffset < shortest[j].Rng.DstOffset
+        return shortest[i].Rng.Length > shortest[j].Rng.Length
     })

     for i, x := range shortest {
@@ -252,7 +254,7 @@
 // 10. write the leaf directories (if any)
 // 11. Get all tiles, and write directly to the output.

-func Extract(logger *log.Logger, bucketURL string, file string, maxzoom uint8, region_file string, output string, overfetch float32, dry_run bool) error {
+func Extract(logger *log.Logger, bucketURL string, file string, maxzoom uint8, region_file string, output string, download_threads int, overfetch float32, dry_run bool) error {
     // 1. fetch the header

     if bucketURL == "" {
@@ -388,6 +390,7 @@ func Extract(logger *log.Logger, bucketURL string, file string, maxzoom uint8, r
     overfetch_ranges := MergeRanges(tile_parts, overfetch)

     fmt.Printf("fetching %d tiles, %d chunks, %d requests\n", len(reencoded), len(tile_parts), len(overfetch_ranges))

+    // TODO: takes up too much RAM
     // construct the directories
     new_root_bytes, new_leaves_bytes, _ := optimize_directories(reencoded, 16384-HEADERV3_LEN_BYTES)
@@ -419,10 +422,18 @@ func Extract(logger *log.Logger, bucketURL string, file string, maxzoom uint8, r
         total_bytes += x.Rng.Length
     }

+    total_actual_bytes := uint64(0)
+    for _, x := range tile_parts {
+        total_actual_bytes += x.Length
+    }
+
     if !dry_run {
         outfile, err := os.Create(output)
         defer outfile.Close()
+
+        outfile.Truncate(127 + int64(len(new_root_bytes)) + int64(header.MetadataLength) + int64(len(new_leaves_bytes)) + int64(total_actual_bytes))
+
         _, err = outfile.Write(header_bytes)
         if err != nil {
             return err
         }
@@ -458,31 +469,72 @@ func Extract(logger *log.Logger, bucketURL string, file string, maxzoom uint8, r
             "fetching chunks",
         )

-        for _, or := range overfetch_ranges {
+        var mu sync.Mutex
+
+        // go func() {
+        //     for r := range progress {
+        //         bar.Add(int(r))
+        //     }
+        // }()
+        downloadPart := func(or OverfetchRange) error {
             tile_r, err := bucket.NewRangeReader(ctx, file, int64(source_tile_data_offset+or.Rng.SrcOffset), int64(or.Rng.Length), nil)
             if err != nil {
                 return err
             }
+            offset_writer := io.NewOffsetWriter(outfile, int64(header.TileDataOffset)+int64(or.Rng.DstOffset))
             for _, cd := range or.CopyDiscards {
-                _, err := io.CopyN(io.MultiWriter(outfile, bar), tile_r, int64(cd.Wanted))
+
+                _, err := io.CopyN(io.MultiWriter(offset_writer, bar), tile_r, int64(cd.Wanted))
                 if err != nil {
                     return err
                 }
-                _, err = io.CopyN(io.MultiWriter(io.Discard, bar), tile_r, int64(cd.Discard))
+                _, err = io.CopyN(bar, tile_r, int64(cd.Discard))
                 if err != nil {
                     return err
                 }
             }
             tile_r.Close()
+            return nil
         }
-    }
-    total_actual_bytes := uint64(0)
-    for _, x := range tile_parts {
-        total_actual_bytes += x.Length
+        errs, _ := errgroup.WithContext(ctx)
+
+        for i := 0; i < download_threads; i++ {
+            work_back := (i == 0 && download_threads > 1)
+            errs.Go(func() error {
+                done := false
+                var or OverfetchRange
+                for {
+                    mu.Lock()
+                    if len(overfetch_ranges) == 0 {
+                        done = true
+                    } else {
+                        if work_back {
+                            or = overfetch_ranges[len(overfetch_ranges)-1]
+                            overfetch_ranges = overfetch_ranges[:len(overfetch_ranges)-1]
+                        } else {
+                            or = overfetch_ranges[0]
+                            overfetch_ranges = overfetch_ranges[1:]
+                        }
+                    }
+                    mu.Unlock()
+                    if done {
+                        return nil
+                    }
+                    downloadPart(or)
+                }
+
+                return nil
+            })
+        }
+
+        err = errs.Wait()
+        if err != nil {
+            return err
+        }
     }

     fmt.Printf("Completed in %v seconds with 1 download thread.\n", time.Since(start))
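
Note on the final extract.go hunk: it replaces the single sequential download loop with a pool of download_threads goroutines managed by golang.org/x/sync/errgroup. The workers share one slice of ranges guarded by a sync.Mutex; most workers pop from the front, while the first worker (work_back) pops from the back. Below is a minimal, self-contained sketch of that queue-draining pattern only; the job type, the fetch function, and the thread count are placeholders, not the archive-specific types and values from extract.go.

package main

import (
	"fmt"
	"sync"

	"golang.org/x/sync/errgroup"
)

// fetch stands in for downloading and writing one chunk; a non-nil error
// aborts the whole group via errgroup.
func fetch(job int) error {
	fmt.Println("fetched chunk", job)
	return nil
}

func main() {
	// Stand-in for overfetch_ranges: one shared queue drained by all workers.
	jobs := []int{1, 2, 3, 4, 5, 6, 7, 8}
	downloadThreads := 3

	var mu sync.Mutex
	var errs errgroup.Group

	for i := 0; i < downloadThreads; i++ {
		// Mirrors the patch's work_back flag: the first worker takes jobs
		// from the back of the queue while the others take from the front.
		workBack := i == 0 && downloadThreads > 1
		errs.Go(func() error {
			for {
				mu.Lock()
				if len(jobs) == 0 {
					mu.Unlock()
					return nil
				}
				var job int
				if workBack {
					job = jobs[len(jobs)-1]
					jobs = jobs[:len(jobs)-1]
				} else {
					job = jobs[0]
					jobs = jobs[1:]
				}
				mu.Unlock()
				if err := fetch(job); err != nil {
					return err
				}
			}
		})
	}

	if err := errs.Wait(); err != nil {
		fmt.Println("extract failed:", err)
	}
}

Assuming kong's default kebab-case flag naming, the new DownloadThreads field would surface on the CLI as a --download-threads flag, e.g. pmtiles extract <input> <output> --download-threads=4.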