Skip to content

Commit

Permalink
makesync hashing is multithreaded
Browse files Browse the repository at this point in the history
  • Loading branch information
bdon committed Dec 25, 2023
1 parent 4be7245 commit 067a2f0
Showing 1 changed file with 61 additions and 4 deletions.
65 changes: 61 additions & 4 deletions pmtiles/makesync.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,19 +5,27 @@ import (
"context"
"fmt"
"github.com/schollz/progressbar/v3"
"golang.org/x/sync/errgroup"
"hash/fnv"
"io"
"log"
"os"
"runtime"
"time"
)

// Block describes one span of tile data that Makesync hashes as a unit and
// reports as a single line of output.
type Block struct {
Index uint64 // sequence number of the block, starts at 0; used to emit results in order
Start uint64 // the start tileID (TileId of the first entry in the block)
Offset uint64 // the offset in the file, in bytes
Length uint64 // the length, in bytes
}

// Result pairs a Block with the 64-bit hash computed for it by a worker.
type Result struct {
Block Block // the block that was hashed
Hash uint64 // hash obtained for the block's Offset and Length
}

func Makesync(logger *log.Logger, file string, block_size_megabytes int) error {
start := time.Now()
ctx := context.Background()
Expand Down Expand Up @@ -108,9 +116,56 @@ func Makesync(logger *log.Logger, file string, block_size_megabytes int) error {
return hasher.Sum64()
}

tasks := make(chan Block, 10000)
intermediate := make(chan Result, 10000)

errs, _ := errgroup.WithContext(ctx)
// workers
for i := 0; i < runtime.GOMAXPROCS(0); i++ {
errs.Go(func() error {
for block := range tasks {
intermediate <- Result{block, GetHash(block.Offset, block.Length)}
}
return nil
})
}

done := make(chan struct{})

go func() {
buffer := make(map[uint64]Result)
nextIndex := uint64(0)

for i := range intermediate {
buffer[i.Block.Index] = i

for {
if next, ok := buffer[nextIndex]; ok {

output.Write([]byte(fmt.Sprintf("%d\t%d\t%d\t%x\n", next.Block.Start, next.Block.Offset, next.Block.Length, next.Hash)))

delete(buffer, nextIndex)
nextIndex++

if next.Block.Offset+next.Block.Length == header.TileDataLength {
close(intermediate)
}

} else {
break
}
}
}

done <- struct{}{}
}()

current_index := uint64(0)

CollectEntries(header.RootOffset, header.RootLength, func(e EntryV3) {
bar.Add(1)
if current.Length == 0 {
current.Index = current_index
current.Start = e.TileId
current.Offset = e.Offset
current.Length = uint64(e.Length)
Expand All @@ -120,9 +175,10 @@ func Makesync(logger *log.Logger, file string, block_size_megabytes int) error {
panic("Invalid clustering of archive detected - check with verify")
} else {
if current.Length+uint64(e.Length) > max_block_bytes {
hsh := GetHash(current.Offset, current.Length)
output.Write([]byte(fmt.Sprintf("%d\t%d\t%d\t%x\n", current.Start, current.Offset, current.Length, hsh)))
tasks <- Block{current.Index, current.Start, current.Offset, current.Length}

current_index += 1
current.Index = current_index
current.Start = e.TileId
current.Offset = e.Offset
current.Length = uint64(e.Length)
Expand All @@ -132,9 +188,10 @@ func Makesync(logger *log.Logger, file string, block_size_megabytes int) error {
}
})

hsh := GetHash(current.Offset, current.Length)
output.Write([]byte(fmt.Sprintf("%d\t%d\t%d\t%x", current.Start, current.Offset, current.Length, hsh)))
tasks <- Block{current.Index, current.Start, current.Offset, current.Length}
close(tasks)

<-done
fmt.Printf("Completed makesync in %v.\n", time.Since(start))
return nil
}

0 comments on commit 067a2f0

Please sign in to comment.