Skip to content

Commit

Permalink
fix multithreaded hashing, add experimental sync checker
Browse files Browse the repository at this point in the history
  • Loading branch information
bdon committed Dec 26, 2023
1 parent 51c3cf6 commit 75a7e5f
Show file tree
Hide file tree
Showing 2 changed files with 163 additions and 3 deletions.
19 changes: 19 additions & 0 deletions main.go
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,11 @@ var cli struct {
HashFunction string `default:fnv1a help:"The hash function."`
} `cmd:"" help:"Create an **experimental** sync control file (.pmtiles.sync) for a local archive."`

// Sync holds the two positional arguments of the experimental `sync`
// subcommand; main passes them to pmtiles.Sync as (existing archive, sync
// control file), in that order.
Sync struct {
Existing string `arg:"" type:"existingfile"`
Syncfile string `arg:"" type:"existingfile"`
} `cmd:"" help:"This command is experimental."`

Serve struct {
Path string `arg:"" help:"Local path or bucket prefix"`
Interface string `default:"0.0.0.0"`
Expand All @@ -77,6 +82,15 @@ var cli struct {
PublicHostname string `help:"Public hostname of tile endpoint e.g. https://example.com"`
} `cmd:"" help:"Run an HTTP proxy server for Z/X/Y tiles."`

// Download holds the flags and the positional argument of the `download`
// subcommand. Every tag value is quoted so the tags follow the conventional
// key:"value" struct-tag format, and the help texts describe downloading
// rather than uploading.
Download struct {
	OldFile         string  `type:"existingfile" help:"The old archive on disk. Providing this will check the new archive for a .sync file"`
	NewFile         string  `arg:"" help:"The remote file."`
	Bucket          string  `required:"" help:"Bucket of file to download."`
	DownloadThreads int     `default:"4" help:"Number of download threads."`
	DryRun          bool    `help:"Calculate new parts to download, but don't download them."`
	Overfetch       float32 `default:"0.05" help:"What ratio of extra data to download to minimize # requests; 0.2 is 20%"`
} `cmd:"" help:"Download an archive from remote storage."`

Upload struct {
Input string `arg:"" type:"existingfile"`
Key string `arg:""`
Expand Down Expand Up @@ -183,6 +197,11 @@ func main() {
if err != nil {
logger.Fatalf("Failed to makesync archive, %v", err)
}
case "sync <existing> <syncfile>":
err := pmtiles.Sync(logger, cli.Sync.Existing, cli.Sync.Syncfile)
if err != nil {
logger.Fatalf("Failed to sync archive, %v", err)
}
case "version":
fmt.Printf("pmtiles %s, commit %s, built at %s\n", version, commit, date)
default:
Expand Down
147 changes: 144 additions & 3 deletions pmtiles/makesync.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,13 +6,16 @@ import (
"context"
"crypto/md5"
"fmt"
"github.com/dustin/go-humanize"
"github.com/schollz/progressbar/v3"
"golang.org/x/sync/errgroup"
"hash/fnv"
"io"
"log"
"os"
"runtime"
"strconv"
"strings"
"time"
)

Expand Down Expand Up @@ -118,10 +121,8 @@ func Makesync(logger *log.Logger, file string, block_size_megabytes int) error {

var current Block

hasher := fnv.New64a()

GetHash := func(offset uint64, length uint64) uint64 {
hasher.Reset()
hasher := fnv.New64a()
r, err := bucket.NewRangeReader(ctx, key, int64(header.TileDataOffset+offset), int64(length))
if err != nil {
log.Fatal(err)
Expand Down Expand Up @@ -213,3 +214,143 @@ func Makesync(logger *log.Logger, file string, block_size_megabytes int) error {
fmt.Printf("Completed makesync in %v.\n", time.Since(start))
return nil
}

// Syncline is one data record of a sync control file, as parsed by Sync:
// Offset and Length are read as decimal numbers and Hash as a hexadecimal
// 64-bit value. Field order matters because Sync builds values positionally.
type Syncline struct {
	Offset, Length, Hash uint64
}

// Sync compares an existing local archive against a sync control file and
// prints how many chunks already match and how many bytes would still have to
// be transferred. It only reads its inputs and prints a summary.
//
// Parameters:
//   - logger: currently unused; kept so the signature matches the other commands.
//   - file: path of the existing archive; it must be clustered.
//   - syncfile: path of the sync control file. Every line with exactly four
//     whitespace-separated fields is read as
//     "<start tile id> <offset> <length> <hex hash>"; lines with any other
//     field count are skipped.
//
// For each tile entry in the archive whose TileId equals a chunk's start id,
// the chunk's Length bytes starting at the entry's offset inside the tile data
// section are hashed with 64-bit FNV-1a and compared with the chunk's hash.
//
// An error is returned when the syncfile cannot be opened, read or parsed,
// when the archive header or directories cannot be read or decoded, when the
// archive is not clustered, or when a byte range cannot be hashed.
func Sync(logger *log.Logger, file string, syncfile string) error {
	start := time.Now()
	total_remote_bytes := uint64(0)

	by_start_id := make(map[uint64]Syncline)

	// Named sync_fh rather than sync so it cannot be confused with a package.
	sync_fh, err := os.Open(syncfile)
	if err != nil {
		return fmt.Errorf("Error opening syncfile: %w", err)
	}
	defer sync_fh.Close()

	scanner := bufio.NewScanner(sync_fh)
	line_no := 0
	for scanner.Scan() {
		line_no++
		parts := strings.Fields(scanner.Text())
		if len(parts) != 4 {
			continue
		}

		// A malformed number must not silently become 0: that would register
		// a bogus chunk for tile ID 0 or a zero-length chunk.
		start_id, err := strconv.ParseUint(parts[0], 10, 64)
		if err != nil {
			return fmt.Errorf("Invalid start tile ID on line %d of %s, %w", line_no, syncfile, err)
		}
		offset, err := strconv.ParseUint(parts[1], 10, 64)
		if err != nil {
			return fmt.Errorf("Invalid offset on line %d of %s, %w", line_no, syncfile, err)
		}
		length, err := strconv.ParseUint(parts[2], 10, 64)
		if err != nil {
			return fmt.Errorf("Invalid length on line %d of %s, %w", line_no, syncfile, err)
		}
		hash, err := strconv.ParseUint(parts[3], 16, 64)
		if err != nil {
			return fmt.Errorf("Invalid hash on line %d of %s, %w", line_no, syncfile, err)
		}

		total_remote_bytes += length
		by_start_id[start_id] = Syncline{offset, length, hash}
	}
	if err := scanner.Err(); err != nil {
		return fmt.Errorf("Failed to read %s, %w", syncfile, err)
	}

	// open the existing archive

	ctx := context.Background()

	bucketURL, key, err := NormalizeBucketKey("", "", file)
	if err != nil {
		return err
	}

	bucket, err := OpenBucket(ctx, bucketURL, "")
	if err != nil {
		return fmt.Errorf("Failed to open bucket for %s, %w", bucketURL, err)
	}
	defer bucket.Close()

	r, err := bucket.NewRangeReader(ctx, key, 0, 16384)
	if err != nil {
		return fmt.Errorf("Failed to create range reader for %s, %w", key, err)
	}
	b, err := io.ReadAll(r)
	// Close before inspecting err so the reader is released on both paths.
	r.Close()
	if err != nil {
		return fmt.Errorf("Failed to read %s, %w", key, err)
	}
	if len(b) < HEADERV3_LEN_BYTES {
		return fmt.Errorf("Failed to read header of %s: got %d bytes", key, len(b))
	}

	header, err := deserialize_header(b[0:HEADERV3_LEN_BYTES])
	if err != nil {
		return fmt.Errorf("Failed to deserialize header of %s, %w", key, err)
	}

	if !header.Clustered {
		return fmt.Errorf("Error: archive must be clustered for sync.")
	}

	// The directory walk below is sequential, so one hasher can be reused.
	hasher := fnv.New64a()

	// GetHash hashes length bytes starting at offset within the tile data section.
	GetHash := func(offset uint64, length uint64) (uint64, error) {
		hasher.Reset()
		r, err := bucket.NewRangeReader(ctx, key, int64(header.TileDataOffset+offset), int64(length))
		if err != nil {
			return 0, err
		}
		defer r.Close()

		if _, err := io.Copy(hasher, r); err != nil {
			return 0, err
		}
		return hasher.Sum64(), nil
	}

	var CollectEntries func(uint64, uint64, func(EntryV3) error) error

	// CollectEntries walks the directory at the given absolute offset, calls f
	// for every tile entry (RunLength > 0) and recurses into leaf directories.
	CollectEntries = func(dir_offset uint64, dir_length uint64, f func(EntryV3) error) error {
		dir_reader, err := bucket.NewRangeReader(ctx, key, int64(dir_offset), int64(dir_length))
		if err != nil {
			return fmt.Errorf("Failed to create range reader for directory at offset %d, %w", dir_offset, err)
		}
		dir_bytes, err := io.ReadAll(dir_reader)
		// Close before recursing so open readers do not pile up per level.
		dir_reader.Close()
		if err != nil {
			return fmt.Errorf("Failed to read directory at offset %d, %w", dir_offset, err)
		}

		for _, entry := range deserialize_entries(bytes.NewBuffer(dir_bytes)) {
			if entry.RunLength > 0 {
				if err := f(entry); err != nil {
					return err
				}
				continue
			}
			if err := CollectEntries(header.LeafDirectoryOffset+entry.Offset, uint64(entry.Length), f); err != nil {
				return err
			}
		}
		return nil
	}

	bar := progressbar.Default(
		int64(header.TileEntriesCount),
		"calculating diff",
	)

	total_chunks := len(by_start_id)
	hits := 0

	err = CollectEntries(header.RootOffset, header.RootLength, func(e EntryV3) error {
		bar.Add(1)

		potential_match, ok := by_start_id[e.TileId]
		if !ok {
			return nil
		}
		hash_result, err := GetHash(e.Offset, potential_match.Length)
		if err != nil {
			return fmt.Errorf("Failed to hash chunk starting at tile %d, %w", e.TileId, err)
		}
		if hash_result == potential_match.Hash {
			hits += 1
			// Whatever remains in the map afterwards still has to be transferred.
			delete(by_start_id, e.TileId)
		}
		return nil
	})
	if err != nil {
		return err
	}

	to_transfer := uint64(0)
	for _, v := range by_start_id {
		to_transfer += v.Length
	}

	// Use floating point: the integer form to_transfer/total*100 truncates to
	// 0 or 100, and divides by zero when the syncfile has no data lines.
	pct := uint64(100)
	if total_remote_bytes > 0 {
		pct = uint64(float64(total_remote_bytes-to_transfer) / float64(total_remote_bytes) * 100)
	}

	fmt.Printf("%d/%d chunks matched, need to transfer %s/%s bytes (%d%% match).\n", hits, total_chunks, humanize.Bytes(to_transfer), humanize.Bytes(total_remote_bytes), pct)

	fmt.Printf("Completed sync in %v.\n", time.Since(start))
	return nil
}

0 comments on commit 75a7e5f

Please sign in to comment.