diff --git a/main.go b/main.go index 1305257..2505ce7 100644 --- a/main.go +++ b/main.go @@ -69,14 +69,15 @@ var cli struct { Makesync struct { Input string `arg:"" type:"existingfile"` - BlockSizeKb int `default:"1000" help:"The approximate block size, in kilobytes. 0 means 1 tile = 1 block."` - HashFunction string `default:"fnv1a" help:"The hash function."` + BlockSizeKb int `default:"20" help:"The approximate block size, in kilobytes. 0 means 1 tile = 1 block."` + HashFunction string `default:"xxh64" help:"The hash function."` Checksum string `help:"Store a checksum in the syncfile."` } `cmd:"" hidden:""` Sync struct { Existing string `arg:"" type:"existingfile"` - Syncfile string `arg:"" type:"existingfile"` + New string `arg:"" help:"Local or remote archive, with .sync sidecar file."` + DryRun bool `help:"Calculate new parts to download, but don't download them."` } `cmd:"" hidden:""` Serve struct { @@ -210,8 +211,8 @@ func main() { if err != nil { logger.Fatalf("Failed to makesync archive, %v", err) } - case "sync ": - err := pmtiles.Sync(logger, cli.Sync.Existing, cli.Sync.Syncfile) + case "sync ": + err := pmtiles.Sync(logger, cli.Sync.Existing, cli.Sync.New, cli.Sync.DryRun) if err != nil { logger.Fatalf("Failed to sync archive, %v", err) } diff --git a/pmtiles/makesync.go b/pmtiles/makesync.go index d8f44c4..f257ccf 100644 --- a/pmtiles/makesync.go +++ b/pmtiles/makesync.go @@ -5,42 +5,86 @@ import ( "bytes" "context" "crypto/md5" + "encoding/binary" + "encoding/json" "fmt" + "github.com/cespare/xxhash/v2" "github.com/dustin/go-humanize" "github.com/schollz/progressbar/v3" "golang.org/x/sync/errgroup" - "hash/fnv" "io" "log" + "mime" + "mime/multipart" + "net/http" "os" "runtime" "sort" - "strconv" "strings" "sync" "time" ) -type block struct { - Index uint64 // starts at 0 - Start uint64 // the start tileID - Offset uint64 // the offset in the file, in bytes - Length uint64 // the length, in bytes +type syncBlock struct { + Start uint64 // the start tileID of the block + Offset uint64 // the offset in the source archive + Length uint64 // the length of the block + Hash uint64 // the hash of the block } -type result struct { - Block block - Hash uint64 +type syncMetadata struct { + Version string + BlockSize uint64 + HashType string + HashSize uint8 + ChecksumType string + Checksum string + NumBlocks int } -type syncline struct { - Offset uint64 - Length uint64 - Hash uint64 +type syncTask struct { + NewBlock syncBlock + OldOffset uint64 } -// Makesync generates a syncfile for an archive on disk. (experimental) -func Makesync(_ *log.Logger, cliVersion string, file string, blockSizeKb int, checksum string) error { +func serializeSyncBlocks(output io.Writer, blocks []syncBlock) { + tmp := make([]byte, binary.MaxVarintLen64) + var n int + + lastStartID := uint64(0) + for _, block := range blocks { + n = binary.PutUvarint(tmp, uint64(block.Start-lastStartID)) + output.Write(tmp[:n]) + n = binary.PutUvarint(tmp, uint64(block.Length)) + output.Write(tmp[:n]) + binary.LittleEndian.PutUint64(tmp, block.Hash) + output.Write(tmp[0:8]) + + lastStartID = block.Start + } +} + +func deserializeSyncBlocks(numBlocks int, reader *bufio.Reader) []syncBlock { + blocks := make([]syncBlock, 0) + + lastStartID := uint64(0) + offset := uint64(0) + buf := make([]byte, 8) + + for i := 0; i < numBlocks; i++ { + start, _ := binary.ReadUvarint(reader) + length, _ := binary.ReadUvarint(reader) + _, _ = io.ReadFull(reader, buf) + blocks = append(blocks, syncBlock{Start: lastStartID + start, Offset: offset, Length: length, Hash: binary.LittleEndian.Uint64(buf)}) + + lastStartID = lastStartID + start + offset = offset + length + } + + return blocks +} + +func Makesync(logger *log.Logger, cliVersion string, file string, blockSizeKb int, checksum string) error { ctx := context.Background() start := time.Now() @@ -103,7 +147,6 @@ func Makesync(_ *log.Logger, cliVersion string, file string, blockSizeKb int, ch panic(err) } defer output.Close() - output.Write([]byte(fmt.Sprintf("version=%s\n", cliVersion))) if checksum == "md5" { localfile, err := os.Open(file) @@ -118,32 +161,29 @@ func Makesync(_ *log.Logger, cliVersion string, file string, blockSizeKb int, ch } md5checksum := md5hasher.Sum(nil) fmt.Printf("Completed md5 in %v.\n", time.Since(start)) - output.Write([]byte(fmt.Sprintf("md5=%x\n", md5checksum))) + fmt.Printf("md5=%x\n", md5checksum) } - output.Write([]byte("hash=fnv1a\n")) - output.Write([]byte(fmt.Sprintf("blocksize=%d\n", blockSizeBytes))) - bar := progressbar.Default( int64(header.TileEntriesCount), "writing syncfile", ) - var current block + var current syncBlock - tasks := make(chan block, 1000) + tasks := make(chan syncBlock, 1000) var wg sync.WaitGroup var mu sync.Mutex - synclines := make(map[uint64]syncline) + blocks := make([]syncBlock, 0) errs, _ := errgroup.WithContext(ctx) - // workers + for i := 0; i < runtime.GOMAXPROCS(0); i++ { errs.Go(func() error { wg.Add(1) - hasher := fnv.New64a() + hasher := xxhash.New() for block := range tasks { r, err := bucket.NewRangeReader(ctx, key, int64(header.TileDataOffset+block.Offset), int64(block.Length)) if err != nil { @@ -155,9 +195,10 @@ func Makesync(_ *log.Logger, cliVersion string, file string, blockSizeKb int, ch } r.Close() - sum64 := hasher.Sum64() + block.Hash = hasher.Sum64() + mu.Lock() - synclines[block.Start] = syncline{block.Offset, block.Length, sum64} + blocks = append(blocks, block) mu.Unlock() hasher.Reset() @@ -167,13 +208,9 @@ func Makesync(_ *log.Logger, cliVersion string, file string, blockSizeKb int, ch }) } - currentIndex := uint64(0) - - blocks := 0 CollectEntries(header.RootOffset, header.RootLength, func(e EntryV3) { bar.Add(1) if current.Length == 0 { - current.Index = currentIndex current.Start = e.TileID current.Offset = e.Offset current.Length = uint64(e.Length) @@ -182,12 +219,10 @@ func Makesync(_ *log.Logger, cliVersion string, file string, blockSizeKb int, ch } else if e.Offset > current.Offset+uint64(current.Length) { panic("Invalid clustering of archive detected - check with verify") } else { + // check this logic if current.Length+uint64(e.Length) > blockSizeBytes { - tasks <- block{current.Index, current.Start, current.Offset, current.Length} - blocks++ + tasks <- syncBlock{current.Start, current.Offset, current.Length, 0} - currentIndex++ - current.Index = currentIndex current.Start = e.TileID current.Offset = e.Offset current.Length = uint64(e.Length) @@ -197,61 +232,73 @@ func Makesync(_ *log.Logger, cliVersion string, file string, blockSizeKb int, ch } }) - tasks <- block{current.Index, current.Start, current.Offset, current.Length} - blocks++ + tasks <- syncBlock{current.Start, current.Offset, current.Length, 0} close(tasks) wg.Wait() - var keys []uint64 - for k := range synclines { - keys = append(keys, k) - } - sort.Slice(keys, func(i, j int) bool { return keys[i] < keys[j] }) + sort.Slice(blocks, func(i, j int) bool { return blocks[i].Start < blocks[j].Start }) - for _, k := range keys { - syncline := synclines[k] - output.Write([]byte(fmt.Sprintf("%d\t%d\t%d\t%x\n", k, syncline.Offset, syncline.Length, syncline.Hash))) - } + metadataBytes, err := json.Marshal(syncMetadata{ + Version: cliVersion, + HashSize: 8, + BlockSize: blockSizeBytes, + HashType: "xxh64", + NumBlocks: len(blocks), + }) + + output.Write(metadataBytes) + output.Write([]byte{'\n'}) + + serializeSyncBlocks(output, blocks) - fmt.Printf("Created syncfile with %d blocks.\n", blocks) + fmt.Printf("Created syncfile with %d blocks.\n", len(blocks)) fmt.Printf("Completed makesync in %v.\n", time.Since(start)) return nil } -// Sync calculates the diff between an archive on disk and a syncfile. (experimental) -func Sync(_ *log.Logger, file string, syncfile string) error { +func Sync(logger *log.Logger, oldVersion string, newVersion string, dryRun bool) error { start := time.Now() - totalRemoteBytes := uint64(0) - byStartID := make(map[uint64]syncline) + client := &http.Client{} - sync, err := os.Open(syncfile) - if err != nil { - return fmt.Errorf("error opening syncfile: %v", err) - } - defer sync.Close() - scanner := bufio.NewScanner(sync) - for scanner.Scan() { - line := scanner.Text() - parts := strings.Fields(line) - if len(parts) != 4 { - continue + var bufferedReader *bufio.Reader + if strings.HasPrefix(newVersion, "http") { + req, err := http.NewRequest("GET", newVersion+".sync", nil) + if err != nil { + return err } - - startID, _ := strconv.ParseUint(parts[0], 10, 64) - offset, _ := strconv.ParseUint(parts[1], 10, 64) - length, _ := strconv.ParseUint(parts[2], 10, 64) - totalRemoteBytes += length - hash, _ := strconv.ParseUint(parts[3], 16, 64) - byStartID[startID] = syncline{offset, length, hash} + resp, err := client.Do(req) + if resp.StatusCode != http.StatusOK { + return fmt.Errorf(".sync file not found") + } + if err != nil { + return err + } + bar := progressbar.DefaultBytes( + resp.ContentLength, + "downloading syncfile", + ) + bufferedReader = bufio.NewReader(io.TeeReader(resp.Body, bar)) + } else { + newFile, err := os.Open(newVersion + ".sync") + if err != nil { + return fmt.Errorf("error opening syncfile: %v", err) + } + defer newFile.Close() + bufferedReader = bufio.NewReader(newFile) } - // open the existing archive + var metadata syncMetadata + jsonBytes, _ := bufferedReader.ReadSlice('\n') + + json.Unmarshal(jsonBytes, &metadata) + + blocks := deserializeSyncBlocks(metadata.NumBlocks, bufferedReader) ctx := context.Background() - bucketURL, key, err := NormalizeBucketKey("", "", file) + bucketURL, key, err := NormalizeBucketKey("", "", oldVersion) if err != nil { return err @@ -281,20 +328,6 @@ func Sync(_ *log.Logger, file string, syncfile string) error { return fmt.Errorf("archive must be clustered for makesync") } - GetHash := func(offset uint64, length uint64) uint64 { - hasher := fnv.New64a() - r, err := bucket.NewRangeReader(ctx, key, int64(header.TileDataOffset+offset), int64(length)) - if err != nil { - log.Fatal(err) - } - - if _, err := io.Copy(hasher, r); err != nil { - log.Fatal(err) - } - r.Close() - return hasher.Sum64() - } - var CollectEntries func(uint64, uint64, func(EntryV3)) CollectEntries = func(dir_offset uint64, dir_length uint64, f func(EntryV3)) { @@ -319,35 +352,150 @@ func Sync(_ *log.Logger, file string, syncfile string) error { } bar := progressbar.Default( - int64(header.TileEntriesCount), + int64(len(blocks)), "calculating diff", ) - totalBlocks := len(byStartID) - hits := 0 + wanted := make([]syncBlock, 0) + have := make([]syncBlock, 0) + + idx := 0 + + tasks := make(chan syncTask, 1000) + var wg sync.WaitGroup + var mu sync.Mutex + + errs, _ := errgroup.WithContext(ctx) + + for i := 0; i < runtime.GOMAXPROCS(0); i++ { + errs.Go(func() error { + wg.Add(1) + for task := range tasks { + hasher := xxhash.New() + r, err := bucket.NewRangeReader(ctx, key, int64(header.TileDataOffset+task.OldOffset), int64(task.NewBlock.Length)) + if err != nil { + log.Fatal(err) + } + + if _, err := io.Copy(hasher, r); err != nil { + log.Fatal(err) + } + r.Close() + + mu.Lock() + if task.NewBlock.Hash == hasher.Sum64() { + have = append(have, task.NewBlock) + } else { + wanted = append(wanted, task.NewBlock) + } + mu.Unlock() + } + wg.Done() + return nil + }) + } CollectEntries(header.RootOffset, header.RootLength, func(e EntryV3) { - bar.Add(1) + if idx < len(blocks) { + for e.TileID > blocks[idx].Start { + mu.Lock() + wanted = append(wanted, blocks[idx]) + mu.Unlock() + bar.Add(1) + idx = idx + 1 + } - potentialMatch, ok := byStartID[e.TileID] - if ok { - hashResult := GetHash(e.Offset, potentialMatch.Length) - if hashResult == potentialMatch.Hash { - hits++ - delete(byStartID, e.TileID) + if e.TileID == blocks[idx].Start { + tasks <- syncTask{NewBlock: blocks[idx], OldOffset: e.Offset} + bar.Add(1) + idx = idx + 1 } } }) + // we may not have consumed until the end + for idx < len(blocks) { + mu.Lock() + wanted = append(wanted, blocks[idx]) + mu.Unlock() + bar.Add(1) + idx = idx + 1 + } + + close(tasks) + wg.Wait() + + sort.Slice(wanted, func(i, j int) bool { return wanted[i].Start < wanted[j].Start }) + toTransfer := uint64(0) - for _, v := range byStartID { + totalRemoteBytes := uint64(0) + for _, v := range wanted { toTransfer += v.Length + totalRemoteBytes += v.Length + } + + for _, v := range have { + totalRemoteBytes += v.Length } - blocksMatched := float64(hits) / float64(totalBlocks) * 100 + blocksMatched := float64(len(have)) / float64(len(blocks)) * 100 pct := float64(toTransfer) / float64(totalRemoteBytes) * 100 - fmt.Printf("%d/%d blocks matched (%.1f%%), need to transfer %s/%s (%.1f%%).\n", hits, totalBlocks, blocksMatched, humanize.Bytes(toTransfer), humanize.Bytes(totalRemoteBytes), pct) + fmt.Printf("%d/%d blocks matched (%.1f%%), need to transfer %s/%s (%.1f%%).\n", len(have), len(blocks), blocksMatched, humanize.Bytes(toTransfer), humanize.Bytes(totalRemoteBytes), pct) + + ranges := make([]srcDstRange, 0) + for _, v := range wanted { + l := len(ranges) + if l > 0 && (ranges[l-1].SrcOffset+ranges[l-1].Length) == v.Offset { + ranges[l-1].Length = ranges[l-1].Length + v.Length + } else { + ranges = append(ranges, srcDstRange{SrcOffset: v.Offset, DstOffset: v.Offset, Length: v.Length}) + } + } + fmt.Printf("need %d chunks.\n", len(ranges)) + + if !dryRun { + req, err := http.NewRequest("GET", newVersion, nil) + + var rangeParts []string + + for _, r := range ranges { + rangeParts = append(rangeParts, fmt.Sprintf("%d-%d", r.SrcOffset, r.SrcOffset+r.Length+1)) + } + + headerVal := strings.Join(rangeParts, ",") + req.Header.Set("Range", fmt.Sprintf("bytes=%s", headerVal)) + resp, err := client.Do(req) + if resp.StatusCode != http.StatusOK { + return fmt.Errorf("non-OK multirange request") + } + + _, params, err := mime.ParseMediaType(resp.Header.Get("Content-Type")) + if err != nil { + return err + } + + mr := multipart.NewReader(resp.Body, params["boundary"]) + + for { + part, err := mr.NextPart() + if err == io.EOF { + break + } + if err != nil { + return err + } + + _ = part.Header.Get("Content-Range") + + partBytes, err := io.ReadAll(part) + if err != nil { + return err + } + + _ = partBytes + } + } fmt.Printf("Completed sync in %v.\n", time.Since(start)) return nil