From 815e55700c5fb0dc0b1574deb4a796865e0169e6 Mon Sep 17 00:00:00 2001
From: Brandon Liu
Date: Tue, 16 Jan 2024 19:57:22 +0800
Subject: [PATCH] optimizations to sync/makesync

---
 main.go             |   5 +-
 pmtiles/makesync.go | 273 +++++++++++++++++++++++++++++---------------
 2 files changed, 181 insertions(+), 97 deletions(-)

diff --git a/main.go b/main.go
index 1305257..19e6f3a 100644
--- a/main.go
+++ b/main.go
@@ -69,7 +69,7 @@ var cli struct {
 
 	Makesync struct {
 		Input        string `arg:"" type:"existingfile"`
-		BlockSizeKb  int    `default:"1000" help:"The approximate block size, in kilobytes. 0 means 1 tile = 1 block."`
+		BlockSizeKb  int    `default:"20" help:"The approximate block size, in kilobytes. 0 means 1 tile = 1 block."`
 		HashFunction string `default:"fnv1a" help:"The hash function."`
 		Checksum     string `help:"Store a checksum in the syncfile."`
 	} `cmd:"" hidden:""`
@@ -77,6 +77,7 @@ var cli struct {
 	Sync struct {
 		Existing  string  `arg:"" type:"existingfile"`
 		Syncfile  string  `arg:"" type:"existingfile"`
+		Overfetch float32 `default:"0.05" help:"What ratio of extra data to download to minimize # requests; 0.2 is 20%"`
 	} `cmd:"" hidden:""`
 
 	Serve struct {
@@ -211,7 +212,7 @@ func main() {
 			logger.Fatalf("Failed to makesync archive, %v", err)
 		}
 	case "sync <existing> <syncfile>":
-		err := pmtiles.Sync(logger, cli.Sync.Existing, cli.Sync.Syncfile)
+		err := pmtiles.Sync(logger, cli.Sync.Existing, cli.Sync.Syncfile, cli.Sync.Overfetch)
 		if err != nil {
 			logger.Fatalf("Failed to sync archive, %v", err)
 		}
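
Note on the `case "sync <existing> <syncfile>":` arm above: kong names a parsed command by its command word plus the placeholder names of its positional arguments, so `Context.Command()` returns exactly that string. A minimal sketch, separate from this patch, with a trimmed, hypothetical copy of the CLI grammar (the `type:"existingfile"` validation is dropped so it runs anywhere):

    package main

    import (
    	"fmt"

    	"github.com/alecthomas/kong"
    )

    var cli struct {
    	Sync struct {
    		Existing  string  `arg:""`
    		Syncfile  string  `arg:""`
    		Overfetch float32 `default:"0.05" help:"Ratio of extra data to download."`
    	} `cmd:""`
    }

    func main() {
    	// For `prog sync old.pmtiles remote.sync`, Command() returns
    	// "sync <existing> <syncfile>", which is the string main.go switches on.
    	ctx := kong.Parse(&cli)
    	fmt.Println(ctx.Command())
    }
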
diff --git a/pmtiles/makesync.go b/pmtiles/makesync.go
index d8f44c4..2092fa2 100644
--- a/pmtiles/makesync.go
+++ b/pmtiles/makesync.go
@@ -5,6 +5,8 @@ import (
 	"bytes"
 	"context"
 	"crypto/md5"
+	"encoding/binary"
+	"encoding/json"
 	"fmt"
 	"github.com/dustin/go-humanize"
 	"github.com/schollz/progressbar/v3"
@@ -15,32 +17,72 @@ import (
 	"os"
 	"runtime"
 	"sort"
-	"strconv"
-	"strings"
 	"sync"
 	"time"
 )
 
-type block struct {
-	Index  uint64 // starts at 0
-	Start  uint64 // the start tileID
-	Offset uint64 // the offset in the file, in bytes
-	Length uint64 // the length, in bytes
+type syncBlock struct {
+	Start  uint64 // the start tileID of the block
+	Offset uint64 // the offset in the source archive
+	Length uint64 // the length of the block
+	Hash   uint64 // the hash of the block
 }
 
-type result struct {
-	Block block
-	Hash  uint64
+type syncMetadata struct {
+	Version      string
+	BlockSize    uint64
+	HashType     string
+	HashSize     uint8
+	ChecksumType string
+	Checksum     string
+	NumBlocks    int
 }
 
-type syncline struct {
-	Offset uint64
-	Length uint64
-	Hash   uint64
+type syncTask struct {
+	NewBlock  syncBlock
+	OldOffset uint64
 }
 
-// Makesync generates a syncfile for an archive on disk. (experimental)
-func Makesync(_ *log.Logger, cliVersion string, file string, blockSizeKb int, checksum string) error {
+func serializeSyncBlocks(output io.Writer, blocks []syncBlock) {
+	tmp := make([]byte, binary.MaxVarintLen64)
+	var n int
+
+	lastStartID := uint64(0)
+	for _, block := range blocks {
+		n = binary.PutUvarint(tmp, block.Start-lastStartID)
+		output.Write(tmp[:n])
+		n = binary.PutUvarint(tmp, block.Length)
+		output.Write(tmp[:n])
+		binary.LittleEndian.PutUint64(tmp, block.Hash)
+		output.Write(tmp[0:8])
+
+		lastStartID = block.Start
+	}
+}
+
+func deserializeSyncBlocks(numBlocks int, reader *bufio.Reader) []syncBlock {
+	blocks := make([]syncBlock, 0)
+
+	lastStartID := uint64(0)
+	offset := uint64(0)
+	buf := make([]byte, 8)
+
+	for i := 0; i < numBlocks; i++ {
+		start, _ := binary.ReadUvarint(reader)
+		length, _ := binary.ReadUvarint(reader)
+		_, _ = io.ReadFull(reader, buf)
+		blocks = append(blocks, syncBlock{Start: lastStartID + start, Offset: offset, Length: length, Hash: binary.LittleEndian.Uint64(buf)})
+
+		lastStartID = lastStartID + start
+		offset = offset + length
+	}
+
+	return blocks
+}
+
+// Makesync generates a syncfile for an archive on disk. (experimental)
+// TODO: measure the number of "missing blocks".
+// TODO: decide what to do about extra blocks at the end.
+func Makesync(logger *log.Logger, cliVersion string, file string, blockSizeKb int, checksum string) error {
 	ctx := context.Background()
 	start := time.Now()
@@ -103,7 +145,6 @@ func Makesync(_ *log.Logger, cliVersion string, file string, blockSizeKb int, ch
 		panic(err)
 	}
 	defer output.Close()
-	output.Write([]byte(fmt.Sprintf("version=%s\n", cliVersion)))
 
 	if checksum == "md5" {
 		localfile, err := os.Open(file)
@@ -118,25 +159,22 @@ func Makesync(_ *log.Logger, cliVersion string, file string, blockSizeKb int, ch
 		}
 		md5checksum := md5hasher.Sum(nil)
 		fmt.Printf("Completed md5 in %v.\n", time.Since(start))
-		output.Write([]byte(fmt.Sprintf("md5=%x\n", md5checksum)))
+		fmt.Printf("md5=%x\n", md5checksum)
 	}
 
-	output.Write([]byte("hash=fnv1a\n"))
-	output.Write([]byte(fmt.Sprintf("blocksize=%d\n", blockSizeBytes)))
-
 	bar := progressbar.Default(
 		int64(header.TileEntriesCount),
 		"writing syncfile",
 	)
 
-	var current block
+	var current syncBlock
 
-	tasks := make(chan block, 1000)
+	tasks := make(chan syncBlock, 1000)
 
 	var wg sync.WaitGroup
 	var mu sync.Mutex
 
-	synclines := make(map[uint64]syncline)
+	blocks := make([]syncBlock, 0)
 
 	errs, _ := errgroup.WithContext(ctx)
 	// workers
@@ -155,9 +193,10 @@ func Makesync(_ *log.Logger, cliVersion string, file string, blockSizeKb int, ch
 			}
 			r.Close()
 
-			sum64 := hasher.Sum64()
+			block.Hash = hasher.Sum64()
+
 			mu.Lock()
-			synclines[block.Start] = syncline{block.Offset, block.Length, sum64}
+			blocks = append(blocks, block)
 			mu.Unlock()
 
 			hasher.Reset()
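
The workers in the hunk above hash each block with 64-bit FNV-1a. A self-contained sketch, separate from the patch, of that per-block digest (the input bytes are invented):

    package main

    import (
    	"bytes"
    	"fmt"
    	"hash/fnv"
    	"io"
    )

    func main() {
    	// Stand-in for one block's byte range read out of the archive.
    	blockData := bytes.NewReader([]byte("tile bytes for one block"))

    	hasher := fnv.New64a()
    	if _, err := io.Copy(hasher, blockData); err != nil {
    		panic(err)
    	}
    	// A single uint64 per block is what gets stored and later compared.
    	fmt.Printf("%016x\n", hasher.Sum64())
    }
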
@@ -167,13 +206,9 @@ func Makesync(_ *log.Logger, cliVersion string, file string, blockSizeKb int, ch
 		})
 	}
 
-	currentIndex := uint64(0)
-
-	blocks := 0
 	CollectEntries(header.RootOffset, header.RootLength, func(e EntryV3) {
 		bar.Add(1)
 		if current.Length == 0 {
-			current.Index = currentIndex
 			current.Start = e.TileID
 			current.Offset = e.Offset
 			current.Length = uint64(e.Length)
@@ -182,12 +217,10 @@ func Makesync(_ *log.Logger, cliVersion string, file string, blockSizeKb int, ch
 		} else if e.Offset > current.Offset+uint64(current.Length) {
 			panic("Invalid clustering of archive detected - check with verify")
 		} else {
+			// TODO: check this logic
 			if current.Length+uint64(e.Length) > blockSizeBytes {
-				tasks <- block{current.Index, current.Start, current.Offset, current.Length}
-				blocks++
+				tasks <- syncBlock{current.Start, current.Offset, current.Length, 0}
 
-				currentIndex++
-				current.Index = currentIndex
 				current.Start = e.TileID
 				current.Offset = e.Offset
 				current.Length = uint64(e.Length)
@@ -197,57 +230,48 @@ func Makesync(_ *log.Logger, cliVersion string, file string, blockSizeKb int, ch
 		}
 	})
 
-	tasks <- block{current.Index, current.Start, current.Offset, current.Length}
-	blocks++
+	tasks <- syncBlock{current.Start, current.Offset, current.Length, 0}
 	close(tasks)
 	wg.Wait()
 
-	var keys []uint64
-	for k := range synclines {
-		keys = append(keys, k)
-	}
-	sort.Slice(keys, func(i, j int) bool { return keys[i] < keys[j] })
+	sort.Slice(blocks, func(i, j int) bool { return blocks[i].Start < blocks[j].Start })
 
-	for _, k := range keys {
-		syncline := synclines[k]
-		output.Write([]byte(fmt.Sprintf("%d\t%d\t%d\t%x\n", k, syncline.Offset, syncline.Length, syncline.Hash)))
-	}
+	metadataBytes, err := json.Marshal(syncMetadata{
+		Version:   cliVersion,
+		HashSize:  8,
+		BlockSize: blockSizeBytes,
+		HashType:  "fnv1a",
+		NumBlocks: len(blocks),
+	})
+
+	output.Write(metadataBytes)
+	output.Write([]byte{'\n'})
+
+	serializeSyncBlocks(output, blocks)
 
-	fmt.Printf("Created syncfile with %d blocks.\n", blocks)
+	fmt.Printf("Created syncfile with %d blocks.\n", len(blocks))
 	fmt.Printf("Completed makesync in %v.\n", time.Since(start))
 	return nil
 }
 
-// Sync calculates the diff between an archive on disk and a syncfile. (experimental)
-func Sync(_ *log.Logger, file string, syncfile string) error {
+// Sync calculates the diff between an archive on disk and a syncfile. (experimental)
+func Sync(logger *log.Logger, file string, syncfilename string, overfetch float32) error {
 	start := time.Now()
 
-	totalRemoteBytes := uint64(0)
-
-	byStartID := make(map[uint64]syncline)
-	sync, err := os.Open(syncfile)
+	syncfile, err := os.Open(syncfilename)
 	if err != nil {
 		return fmt.Errorf("error opening syncfile: %v", err)
 	}
-	defer sync.Close()
+	defer syncfile.Close()
 
-	scanner := bufio.NewScanner(sync)
-	for scanner.Scan() {
-		line := scanner.Text()
-		parts := strings.Fields(line)
-		if len(parts) != 4 {
-			continue
-		}
+	bufferedReader := bufio.NewReader(syncfile)
+
+	var metadata syncMetadata
+	jsonBytes, _ := bufferedReader.ReadSlice('\n')
 
-		startID, _ := strconv.ParseUint(parts[0], 10, 64)
-		offset, _ := strconv.ParseUint(parts[1], 10, 64)
-		length, _ := strconv.ParseUint(parts[2], 10, 64)
-		totalRemoteBytes += length
-		hash, _ := strconv.ParseUint(parts[3], 16, 64)
-		byStartID[startID] = syncline{offset, length, hash}
-	}
+	json.Unmarshal(jsonBytes, &metadata)
+
+	blocks := deserializeSyncBlocks(metadata.NumBlocks, bufferedReader)
 
-	// open the existing archive
 	ctx := context.Background()
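
The hunk above pins down the new syncfile layout: one line of JSON metadata, then for each block a uvarint delta-encoded start tile ID, a uvarint length, and a fixed 8-byte little-endian hash. A self-contained round-trip sketch, separate from the patch, using trimmed copies of syncMetadata/syncBlock and invented sample values:

    package main

    import (
    	"bufio"
    	"bytes"
    	"encoding/binary"
    	"encoding/json"
    	"fmt"
    	"io"
    )

    type syncBlock struct {
    	Start  uint64
    	Length uint64
    	Hash   uint64
    }

    type syncMetadata struct {
    	Version   string
    	BlockSize uint64
    	HashType  string
    	HashSize  uint8
    	NumBlocks int
    }

    func main() {
    	blocks := []syncBlock{{Start: 0, Length: 1000, Hash: 0xdeadbeef}, {Start: 40, Length: 500, Hash: 0xcafe}}

    	// Write: one JSON line, then the packed block records.
    	var buf bytes.Buffer
    	meta, _ := json.Marshal(syncMetadata{Version: "1.0", BlockSize: 20000, HashType: "fnv1a", HashSize: 8, NumBlocks: len(blocks)})
    	buf.Write(meta)
    	buf.WriteByte('\n')

    	tmp := make([]byte, binary.MaxVarintLen64)
    	last := uint64(0)
    	for _, b := range blocks {
    		n := binary.PutUvarint(tmp, b.Start-last) // delta against previous start
    		buf.Write(tmp[:n])
    		n = binary.PutUvarint(tmp, b.Length)
    		buf.Write(tmp[:n])
    		binary.LittleEndian.PutUint64(tmp, b.Hash)
    		buf.Write(tmp[:8])
    		last = b.Start
    	}

    	// Read it back the same way Sync does.
    	r := bufio.NewReader(&buf)
    	line, _ := r.ReadSlice('\n')
    	var m syncMetadata
    	json.Unmarshal(line, &m)

    	last = 0
    	hashBuf := make([]byte, 8)
    	for i := 0; i < m.NumBlocks; i++ {
    		delta, _ := binary.ReadUvarint(r)
    		length, _ := binary.ReadUvarint(r)
    		io.ReadFull(r, hashBuf)
    		last += delta
    		fmt.Printf("block start=%d length=%d hash=%x\n", last, length, binary.LittleEndian.Uint64(hashBuf))
    	}
    }
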
@@ -281,20 +305,6 @@ func Sync(_ *log.Logger, file string, syncfile string) error {
 		return fmt.Errorf("archive must be clustered for makesync")
 	}
 
-	GetHash := func(offset uint64, length uint64) uint64 {
-		hasher := fnv.New64a()
-		r, err := bucket.NewRangeReader(ctx, key, int64(header.TileDataOffset+offset), int64(length))
-		if err != nil {
-			log.Fatal(err)
-		}
-
-		if _, err := io.Copy(hasher, r); err != nil {
-			log.Fatal(err)
-		}
-		r.Close()
-		return hasher.Sum64()
-	}
-
 	var CollectEntries func(uint64, uint64, func(EntryV3))
 
 	CollectEntries = func(dir_offset uint64, dir_length uint64, f func(EntryV3)) {
@@ -319,35 +329,108 @@ func Sync(_ *log.Logger, file string, syncfile string) error {
 	}
 
 	bar := progressbar.Default(
-		int64(header.TileEntriesCount),
+		int64(len(blocks)),
 		"calculating diff",
 	)
 
-	totalBlocks := len(byStartID)
-	hits := 0
+	wanted := make([]syncBlock, 0)
+	have := make([]syncBlock, 0)
+
+	idx := 0
+
+	tasks := make(chan syncTask, 1000)
+
+	var wg sync.WaitGroup
+	var mu sync.Mutex
+
+	errs, _ := errgroup.WithContext(ctx)
+	// workers
+	for i := 0; i < runtime.GOMAXPROCS(0); i++ {
+		wg.Add(1)
+		errs.Go(func() error {
+			defer wg.Done()
+			hasher := fnv.New64a()
+			for task := range tasks {
+				r, err := bucket.NewRangeReader(ctx, key, int64(header.TileDataOffset+task.OldOffset), int64(task.NewBlock.Length))
+				if err != nil {
+					log.Fatal(err)
+				}
+
+				if _, err := io.Copy(hasher, r); err != nil {
+					log.Fatal(err)
+				}
+				r.Close()
+
+				mu.Lock()
+				if task.NewBlock.Hash == hasher.Sum64() {
+					have = append(have, task.NewBlock)
+				} else {
+					wanted = append(wanted, task.NewBlock)
+				}
+				mu.Unlock()
+
+				hasher.Reset()
+			}
+			return nil
+		})
+	}
 
 	CollectEntries(header.RootOffset, header.RootLength, func(e EntryV3) {
-		bar.Add(1)
-
-		potentialMatch, ok := byStartID[e.TileID]
-		if ok {
-			hashResult := GetHash(e.Offset, potentialMatch.Length)
-			if hashResult == potentialMatch.Hash {
-				hits++
-				delete(byStartID, e.TileID)
-			}
+		for idx < len(blocks) && e.TileID > blocks[idx].Start {
+			mu.Lock()
+			wanted = append(wanted, blocks[idx])
+			mu.Unlock()
+			bar.Add(1)
+			idx = idx + 1
+		}
+
+		if idx < len(blocks) && e.TileID == blocks[idx].Start {
+			tasks <- syncTask{NewBlock: blocks[idx], OldOffset: e.Offset}
+			bar.Add(1)
+			idx = idx + 1
 		}
 	})
 
+	// any blocks left over were never matched by a local entry
+	for idx < len(blocks) {
+		mu.Lock()
+		wanted = append(wanted, blocks[idx])
+		mu.Unlock()
+		bar.Add(1)
+		idx = idx + 1
+	}
+
+	close(tasks)
+	wg.Wait()
+
+	sort.Slice(wanted, func(i, j int) bool { return wanted[i].Start < wanted[j].Start })
+
 	toTransfer := uint64(0)
-	for _, v := range byStartID {
+	totalRemoteBytes := uint64(0)
+	for _, v := range wanted {
 		toTransfer += v.Length
+		totalRemoteBytes += v.Length
+	}
+
+	for _, v := range have {
+		totalRemoteBytes += v.Length
 	}
 
-	blocksMatched := float64(hits) / float64(totalBlocks) * 100
+	blocksMatched := float64(len(have)) / float64(len(blocks)) * 100
 	pct := float64(toTransfer) / float64(totalRemoteBytes) * 100
 
-	fmt.Printf("%d/%d blocks matched (%.1f%%), need to transfer %s/%s (%.1f%%).\n", hits, totalBlocks, blocksMatched, humanize.Bytes(toTransfer), humanize.Bytes(totalRemoteBytes), pct)
+	fmt.Printf("%d/%d blocks matched (%.1f%%), need to transfer %s/%s (%.1f%%).\n", len(have), len(blocks), blocksMatched, humanize.Bytes(toTransfer), humanize.Bytes(totalRemoteBytes), pct)
+
+	ranges := make([]srcDstRange, 0)
+	for _, v := range wanted {
+		l := len(ranges)
+		if l > 0 && (ranges[l-1].SrcOffset+ranges[l-1].Length) == v.Offset {
+			ranges[l-1].Length = ranges[l-1].Length + v.Length
+		} else {
+			ranges = append(ranges, srcDstRange{SrcOffset: v.Offset, DstOffset: v.Offset, Length: v.Length})
+		}
+	}
+	fmt.Printf("need %d chunks.\n", len(ranges))
+
+	requests, _ := MergeRanges(ranges, overfetch)
+
+	fmt.Printf("need %d requests with overfetch=%f.\n", requests.Len(), overfetch)
 
 	fmt.Printf("Completed sync in %v.\n", time.Since(start))
 	return nil
 }
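
A standalone sketch of the range-coalescing loop above: adjacent wanted blocks merge into one contiguous request before MergeRanges (from the existing fetch code) applies the overfetch budget. srcDstRange is re-declared locally as a stand-in for that shared type, and the offsets are invented:

    package main

    import "fmt"

    type syncBlock struct {
    	Offset uint64
    	Length uint64
    }

    type srcDstRange struct {
    	SrcOffset uint64
    	DstOffset uint64
    	Length    uint64
    }

    func coalesce(wanted []syncBlock) []srcDstRange {
    	ranges := make([]srcDstRange, 0)
    	for _, v := range wanted {
    		l := len(ranges)
    		if l > 0 && ranges[l-1].SrcOffset+ranges[l-1].Length == v.Offset {
    			// v starts exactly where the previous range ends: extend it.
    			ranges[l-1].Length += v.Length
    		} else {
    			ranges = append(ranges, srcDstRange{SrcOffset: v.Offset, DstOffset: v.Offset, Length: v.Length})
    		}
    	}
    	return ranges
    }

    func main() {
    	wanted := []syncBlock{{0, 100}, {100, 50}, {200, 25}} // first two are adjacent
    	fmt.Println(coalesce(wanted))                         // [{0 0 150} {200 200 25}]
    }
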