Skip to content

Commit

Permalink
wip on multithreading
Browse files Browse the repository at this point in the history
  • Loading branch information
bdon committed Sep 9, 2023
1 parent 51f32db commit f5e72f0
Show file tree
Hide file tree
Showing 4 changed files with 65 additions and 10 deletions.
1 change: 1 addition & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ require (
github.com/schollz/progressbar/v3 v3.11.0
github.com/stretchr/testify v1.8.0
gocloud.dev v0.27.0
golang.org/x/sync v0.0.0-20220722155255-886fb9371eb4
zombiezen.com/go/sqlite v0.10.1
)

Expand Down
1 change: 1 addition & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -1658,6 +1658,7 @@ golang.org/x/sync v0.0.0-20201020160332-67f06af15bc9/go.mod h1:RxMgew5VJxzue5/jJ
golang.org/x/sync v0.0.0-20201207232520-09787c993a3a/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sync v0.0.0-20210220032951-036812b2e83c/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sync v0.0.0-20220601150217-0de741cfad7f/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sync v0.0.0-20220722155255-886fb9371eb4 h1:uVc8UZUe6tr40fFVnUP5Oj+veunVezqYl9z7DYw9xzw=
golang.org/x/sync v0.0.0-20220722155255-886fb9371eb4/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sys v0.0.0-20180823144017-11551d06cbcc/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
golang.org/x/sys v0.0.0-20180830151530-49385e6e1522/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
Expand Down
3 changes: 2 additions & 1 deletion main.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@ var cli struct {
Bucket string `help:"Remote bucket of input archive."`
Region string `help:"local GeoJSON Polygon or MultiPolygon file for area of interest." type:"existingfile"`
Maxzoom uint8 `help:"Maximum zoom level, inclusive."`
DownloadThreads int `default:1 help:"Number of download threads."`
DryRun bool `help:"Calculate tiles to extract, but don't download them."`
Overfetch float32 `default:0.1 help:"What ratio of extra data to download to minimize # requests; 0.2 is 20%"`
} `cmd:"" help:"Create an archive from a larger archive for a subset of zoom levels or geographic region."`
Expand Down Expand Up @@ -120,7 +121,7 @@ func main() {
logger.Printf("Serving %s %s on port %d with Access-Control-Allow-Origin: %s\n", cli.Serve.Bucket, cli.Serve.Path, cli.Serve.Port, cli.Serve.Cors)
logger.Fatal(http.ListenAndServe(":"+strconv.Itoa(cli.Serve.Port), nil))
case "extract <input> <output>":
err := pmtiles.Extract(logger, cli.Extract.Bucket, cli.Extract.Input, cli.Extract.Maxzoom, cli.Extract.Region, cli.Extract.Output, cli.Extract.Overfetch, cli.Extract.DryRun)
err := pmtiles.Extract(logger, cli.Extract.Bucket, cli.Extract.Input, cli.Extract.Maxzoom, cli.Extract.Region, cli.Extract.Output, cli.Extract.DownloadThreads, cli.Extract.Overfetch, cli.Extract.DryRun)
if err != nil {
logger.Fatalf("Failed to extract, %v", err)
}
Expand Down
70 changes: 61 additions & 9 deletions pmtiles/extract.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,13 +10,15 @@ import (
"github.com/paulmach/orb/geojson"
"github.com/schollz/progressbar/v3"
"gocloud.dev/blob"
"golang.org/x/sync/errgroup"
"io"
"io/ioutil"
"log"
"math"
"os"
"sort"
"strings"
"sync"
"time"
)

Expand Down Expand Up @@ -225,7 +227,7 @@ func MergeRanges(ranges []SrcDstRange, overfetch float32) []OverfetchRange {
result := make([]OverfetchRange, len(shortest))

sort.Slice(shortest, func(i, j int) bool {
return shortest[i].Rng.DstOffset < shortest[j].Rng.DstOffset
return shortest[i].Rng.Length > shortest[j].Rng.Length
})

for i, x := range shortest {
Expand All @@ -252,7 +254,7 @@ func MergeRanges(ranges []SrcDstRange, overfetch float32) []OverfetchRange {
// 10. write the leaf directories (if any)
// 11. Get all tiles, and write directly to the output.

func Extract(logger *log.Logger, bucketURL string, file string, maxzoom uint8, region_file string, output string, overfetch float32, dry_run bool) error {
func Extract(logger *log.Logger, bucketURL string, file string, maxzoom uint8, region_file string, output string, download_threads int, overfetch float32, dry_run bool) error {
// 1. fetch the header

if bucketURL == "" {
Expand Down Expand Up @@ -388,6 +390,7 @@ func Extract(logger *log.Logger, bucketURL string, file string, maxzoom uint8, r
overfetch_ranges := MergeRanges(tile_parts, overfetch)
fmt.Printf("fetching %d tiles, %d chunks, %d requests\n", len(reencoded), len(tile_parts), len(overfetch_ranges))

// TODO: takes up too much RAM
// construct the directories
new_root_bytes, new_leaves_bytes, _ := optimize_directories(reencoded, 16384-HEADERV3_LEN_BYTES)

Expand Down Expand Up @@ -419,10 +422,18 @@ func Extract(logger *log.Logger, bucketURL string, file string, maxzoom uint8, r
total_bytes += x.Rng.Length
}

total_actual_bytes := uint64(0)
for _, x := range tile_parts {
total_actual_bytes += x.Length
}

if !dry_run {

// Create the output archive. The error must be checked before the handle
// is used; otherwise a failed create only surfaces later as an opaque
// "invalid argument" from the first write.
outfile, err := os.Create(output)
if err != nil {
	return err
}
defer outfile.Close()

// Size the file up front to its final length (header + root directory +
// metadata + leaf directories + tile data) so tile chunks can be written
// at their destination offsets in any order.
// NOTE(review): 127 is presumably the fixed v3 header length
// (HEADERV3_LEN_BYTES) — confirm and use the constant.
if err := outfile.Truncate(127 + int64(len(new_root_bytes)) + int64(header.MetadataLength) + int64(len(new_leaves_bytes)) + int64(total_actual_bytes)); err != nil {
	return err
}

_, err = outfile.Write(header_bytes)
if err != nil {
return err
Expand Down Expand Up @@ -458,31 +469,72 @@ func Extract(logger *log.Logger, bucketURL string, file string, maxzoom uint8, r
"fetching chunks",
)

for _, or := range overfetch_ranges {
var mu sync.Mutex

// go func() {
// for r := range progress {
// bar.Add(int(r))
// }
// }()

downloadPart := func(or OverfetchRange) error {
tile_r, err := bucket.NewRangeReader(ctx, file, int64(source_tile_data_offset+or.Rng.SrcOffset), int64(or.Rng.Length), nil)
if err != nil {
return err
}
offset_writer := io.NewOffsetWriter(outfile, int64(header.TileDataOffset)+int64(or.Rng.DstOffset))

for _, cd := range or.CopyDiscards {
_, err := io.CopyN(io.MultiWriter(outfile, bar), tile_r, int64(cd.Wanted))

_, err := io.CopyN(io.MultiWriter(offset_writer, bar), tile_r, int64(cd.Wanted))
if err != nil {
return err
}

_, err = io.CopyN(io.MultiWriter(io.Discard, bar), tile_r, int64(cd.Discard))
_, err = io.CopyN(bar, tile_r, int64(cd.Discard))
if err != nil {
return err
}
}
tile_r.Close()
return nil
}
}

total_actual_bytes := uint64(0)
for _, x := range tile_parts {
total_actual_bytes += x.Length
// Keep the group's derived context: it is cancelled as soon as any worker
// returns an error, which lets the remaining workers stop taking new
// ranges instead of draining the whole queue after a failure.
errs, group_ctx := errgroup.WithContext(ctx)

// Always run at least one worker; a non-positive thread count would
// otherwise start no goroutines and silently download nothing.
num_workers := download_threads
if num_workers < 1 {
	num_workers = 1
}

for i := 0; i < num_workers; i++ {
	// When running in parallel, worker 0 consumes the queue from the back
	// while every other worker consumes it from the front.
	work_back := (i == 0 && num_workers > 1)
	errs.Go(func() error {
		for {
			// Fail fast once another worker has reported an error.
			if err := group_ctx.Err(); err != nil {
				return err
			}

			// overfetch_ranges is the shared work queue; mu guards
			// every read and reslice of it.
			mu.Lock()
			if len(overfetch_ranges) == 0 {
				mu.Unlock()
				return nil
			}
			var or OverfetchRange
			if work_back {
				or = overfetch_ranges[len(overfetch_ranges)-1]
				overfetch_ranges = overfetch_ranges[:len(overfetch_ranges)-1]
			} else {
				or = overfetch_ranges[0]
				overfetch_ranges = overfetch_ranges[1:]
			}
			mu.Unlock()

			// Propagate download failures; dropping them would leave
			// holes in the output while still reporting success.
			if err := downloadPart(or); err != nil {
				return err
			}
		}
	})
}

// Wait returns the first error reported by any worker.
err = errs.Wait()
if err != nil {
	return err
}
}

// time.Since returns a time.Duration, which already prints with its own
// unit (e.g. "1m2.5s"), and the thread count is reported from the actual
// setting rather than being hard-coded.
fmt.Printf("Completed in %v with %v download threads.\n", time.Since(start), download_threads)
Expand Down

0 comments on commit f5e72f0

Please sign in to comment.