Skip to content

Commit

Permalink
multithread sync
Browse files Browse the repository at this point in the history
  • Loading branch information
bdon committed Jan 19, 2024
1 parent 6767889 commit 0a3f4bd
Showing 1 changed file with 44 additions and 24 deletions.
68 changes: 44 additions & 24 deletions pmtiles/makesync.go
Original file line number Diff line number Diff line change
Expand Up @@ -254,16 +254,16 @@ func Makesync(logger *log.Logger, cli_version string, file string, block_size_kb
return nil
}

func Sync(logger *log.Logger, file string, syncfile string) error {
func Sync(logger *log.Logger, file string, syncfilename string) error {
start := time.Now()

sync, err := os.Open(syncfile)
syncfile, err := os.Open(syncfilename)
if err != nil {
return fmt.Errorf("Error opening syncfile: %v\n", err)
}
defer sync.Close()
defer syncfile.Close()

buffered_reader := bufio.NewReader(sync)
buffered_reader := bufio.NewReader(syncfile)

var metadata SyncMetadata
json_bytes, _ := buffered_reader.ReadSlice('\n')
Expand Down Expand Up @@ -304,20 +304,6 @@ func Sync(logger *log.Logger, file string, syncfile string) error {
return fmt.Errorf("Error: archive must be clustered for makesync.")
}

GetHash := func(offset uint64, length uint64) uint64 {
hasher := fnv.New64a()
r, err := bucket.NewRangeReader(ctx, key, int64(header.TileDataOffset+offset), int64(length))
if err != nil {
log.Fatal(err)
}

if _, err := io.Copy(hasher, r); err != nil {
log.Fatal(err)
}
r.Close()
return hasher.Sum64()
}

var CollectEntries func(uint64, uint64, func(EntryV3))

CollectEntries = func(dir_offset uint64, dir_length uint64, f func(EntryV3)) {
Expand Down Expand Up @@ -351,6 +337,42 @@ func Sync(logger *log.Logger, file string, syncfile string) error {

idx := 0

tasks := make(chan SyncBlock, 1000)
var wg sync.WaitGroup
var mu sync.Mutex

errs, _ := errgroup.WithContext(ctx)
// workers
for i := 0; i < runtime.GOMAXPROCS(0); i++ {
errs.Go(func() error {
wg.Add(1)
hasher := fnv.New64a()
for block := range tasks {
r, err := bucket.NewRangeReader(ctx, key, int64(header.TileDataOffset+block.Offset), int64(block.Length))
if err != nil {
log.Fatal(err)
}

if _, err := io.Copy(hasher, r); err != nil {
log.Fatal(err)
}
r.Close()

mu.Lock()
if block.Hash == hasher.Sum64() {
have = append(have, block)
} else {
wanted = append(wanted, block)
}
mu.Unlock()

hasher.Reset()
}
wg.Done()
return nil
})
}

CollectEntries(header.RootOffset, header.RootLength, func(e EntryV3) {
bar.Add(1)

Expand All @@ -361,17 +383,15 @@ func Sync(logger *log.Logger, file string, syncfile string) error {
}

if e.TileId == blocks[idx].Start {
hash_result := GetHash(e.Offset, blocks[idx].Length)
if hash_result == blocks[idx].Hash {
have = append(have, blocks[idx])
} else {
wanted = append(wanted, blocks[idx])
}
tasks <- blocks[idx]
idx = idx + 1
}
}
})

close(tasks)
wg.Wait()

to_transfer := uint64(0)
total_remote_bytes := uint64(0)
for _, v := range wanted {
Expand Down

0 comments on commit 0a3f4bd

Please sign in to comment.