Skip to content

Commit

Permalink
add --remote-worker-count (#258)
Browse files Browse the repository at this point in the history
- **ADDED** New option to control the number of worker threads in remote stores to avoid overflowing the network connection.
  - `--remote-worker-count` Set number of workers created for the remote store, defaults to match number of logical CPUs with upper limit of 8 for networked remote stores
- **FIXED** Don't update store index if we failed to upload blocks
  • Loading branch information
DanEngelbrecht authored Nov 23, 2024
1 parent af01f04 commit 8fa740d
Show file tree
Hide file tree
Showing 17 changed files with 131 additions and 41 deletions.
6 changes: 4 additions & 2 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,11 +1,13 @@
##
- **FIXED** fix(s3): use HeadObject for checking if blob exists [bergemalm](https://github.com/bergemalm)
update longtaillib to 0.4.3 (#260)
- **ADDED** New option to control the number of worker threads in remote stores to avoid overflowing the network connection.
- `--remote-worker-count` Set number of workers created for the remote store, defaults to match number of logical CPUs with upper limit of 8 for networked remote stores
- **FIXED** Don't update store index if we failed to upload blocks
- **UPDATED** Update to golang 1.23
- **UPDATED** Updated all golang dependencies
- **UPDATED** Update longtaillib to v0.4.3

## 0.4.3
## v0.4.3
- **CHANGED** For multi-source downsync/get the separator for paths is changed to | to avoid problems with paths that contain spaces
- **CHANGED** Made multi-path options separate from single-path
-- `--source-path` vs `--source-paths`
Expand Down
2 changes: 2 additions & 0 deletions cmd/longtail/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -96,6 +96,8 @@ func runCommand() error {
context.NumWorkerCount = commands.Cli.WorkerCount
}

context.NumRemoteWorkerCount = commands.Cli.RemoteWorkerCount

if commands.Cli.MemTrace || commands.Cli.MemTraceDetailed || commands.Cli.MemTraceCSV != "" {
longtaillib.EnableMemtrace()
defer func() {
Expand Down
7 changes: 5 additions & 2 deletions commands/cmd_clonestore.go
Original file line number Diff line number Diff line change
Expand Up @@ -516,6 +516,7 @@ func cloneOneVersion(

func cloneStore(
numWorkerCount int,
remoteStoreWorkerCount int,
sourceStoreURI string,
sourceEndpointResolverURI string,
targetStoreURI string,
Expand All @@ -539,6 +540,7 @@ func cloneStore(
log := logrus.WithFields(logrus.Fields{
"fname": fname,
"numWorkerCount": numWorkerCount,
"remoteStoreWorkerCount": remoteStoreWorkerCount,
"sourceStoreURI": sourceStoreURI,
"sourceEndpointResolverURI": sourceEndpointResolverURI,
"targetStoreURI": targetStoreURI,
Expand Down Expand Up @@ -578,7 +580,7 @@ func cloneStore(
localFS := longtaillib.CreateFSStorageAPI()
defer localFS.Dispose()

sourceRemoteIndexStore, err := remotestore.CreateBlockStoreForURI(sourceStoreURI, nil, jobs, numWorkerCount, 8388608, 1024, remotestore.ReadOnly, enableFileMapping, longtailutils.WithS3EndpointResolverURI(sourceEndpointResolverURI))
sourceRemoteIndexStore, err := remotestore.CreateBlockStoreForURI(sourceStoreURI, nil, jobs, remoteStoreWorkerCount, 8388608, 1024, remotestore.ReadOnly, enableFileMapping, longtailutils.WithS3EndpointResolverURI(sourceEndpointResolverURI))
if err != nil {
return storeStats, timeStats, errors.Wrap(err, fname)
}
Expand Down Expand Up @@ -612,7 +614,7 @@ func cloneStore(
defer sourceLRUBlockStore.Dispose()
defer sourceStore.Dispose()

targetRemoteStore, err := remotestore.CreateBlockStoreForURI(targetStoreURI, nil, jobs, numWorkerCount, targetBlockSize, maxChunksPerBlock, remotestore.ReadWrite, enableFileMapping, longtailutils.WithS3EndpointResolverURI(targetEndpointResolverURI))
targetRemoteStore, err := remotestore.CreateBlockStoreForURI(targetStoreURI, nil, jobs, remoteStoreWorkerCount, targetBlockSize, maxChunksPerBlock, remotestore.ReadWrite, enableFileMapping, longtailutils.WithS3EndpointResolverURI(targetEndpointResolverURI))
if err != nil {
return storeStats, timeStats, errors.Wrap(err, fname)
}
Expand Down Expand Up @@ -736,6 +738,7 @@ type CloneStoreCmd struct {
func (r *CloneStoreCmd) Run(ctx *Context) error {
storeStats, timeStats, err := cloneStore(
ctx.NumWorkerCount,
ctx.NumRemoteWorkerCount,
r.SourceStorageURI,
r.SourceS3EndpointResolverURL,
r.TargetStorageURI,
Expand Down
23 changes: 13 additions & 10 deletions commands/cmd_cp.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ import (

func cpVersionIndex(
numWorkerCount int,
remoteStoreWorkerCount int,
blobStoreURI string,
s3EndpointResolverURI string,
versionIndexPath string,
Expand All @@ -23,15 +24,16 @@ func cpVersionIndex(
enableFileMapping bool) ([]longtailutils.StoreStat, []longtailutils.TimeStat, error) {
const fname = "cpVersionIndex"
log := logrus.WithContext(context.Background()).WithFields(logrus.Fields{
"fname": fname,
"numWorkerCount": numWorkerCount,
"blobStoreURI": blobStoreURI,
"s3EndpointResolverURI": s3EndpointResolverURI,
"versionIndexPath": versionIndexPath,
"localCachePath": localCachePath,
"sourcePath": sourcePath,
"targetPath": targetPath,
"enableFileMapping": enableFileMapping,
"fname": fname,
"numWorkerCount": numWorkerCount,
"remoteStoreWorkerCount": remoteStoreWorkerCount,
"blobStoreURI": blobStoreURI,
"s3EndpointResolverURI": s3EndpointResolverURI,
"versionIndexPath": versionIndexPath,
"localCachePath": localCachePath,
"sourcePath": sourcePath,
"targetPath": targetPath,
"enableFileMapping": enableFileMapping,
})
log.Info(fname)

Expand All @@ -48,7 +50,7 @@ func cpVersionIndex(
defer hashRegistry.Dispose()

// MaxBlockSize and MaxChunksPerBlock are just temporary values until we get the remote index settings
remoteIndexStore, err := remotestore.CreateBlockStoreForURI(blobStoreURI, nil, jobs, numWorkerCount, 8388608, 1024, remotestore.ReadOnly, enableFileMapping, longtailutils.WithS3EndpointResolverURI(s3EndpointResolverURI))
remoteIndexStore, err := remotestore.CreateBlockStoreForURI(blobStoreURI, nil, jobs, remoteStoreWorkerCount, 8388608, 1024, remotestore.ReadOnly, enableFileMapping, longtailutils.WithS3EndpointResolverURI(s3EndpointResolverURI))
if err != nil {
return storeStats, timeStats, errors.Wrap(err, fname)
}
Expand Down Expand Up @@ -220,6 +222,7 @@ type CpCmd struct {
func (r *CpCmd) Run(ctx *Context) error {
storeStats, timeStats, err := cpVersionIndex(
ctx.NumWorkerCount,
ctx.NumRemoteWorkerCount,
r.StorageURI,
r.S3EndpointResolverURL,
r.VersionIndexPath,
Expand Down
5 changes: 4 additions & 1 deletion commands/cmd_createversionstoreindex.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import (

func createVersionStoreIndex(
numWorkerCount int,
remoteStoreWorkerCount int,
blobStoreURI string,
s3EndpointResolverURI string,
sourceFilePath string,
Expand All @@ -20,6 +21,7 @@ func createVersionStoreIndex(
log := logrus.WithFields(logrus.Fields{
"fname": fname,
"numWorkerCount": numWorkerCount,
"remoteStoreWorkerCount": remoteStoreWorkerCount,
"blobStoreURI": blobStoreURI,
"s3EndpointResolverURI": s3EndpointResolverURI,
"sourceFilePath": sourceFilePath,
Expand All @@ -35,7 +37,7 @@ func createVersionStoreIndex(
jobs := longtaillib.CreateBikeshedJobAPI(uint32(numWorkerCount), 0)
defer jobs.Dispose()

indexStore, err := remotestore.CreateBlockStoreForURI(blobStoreURI, nil, jobs, numWorkerCount, 8388608, 1024, remotestore.ReadOnly, false, longtailutils.WithS3EndpointResolverURI(s3EndpointResolverURI))
indexStore, err := remotestore.CreateBlockStoreForURI(blobStoreURI, nil, jobs, remoteStoreWorkerCount, 8388608, 1024, remotestore.ReadOnly, false, longtailutils.WithS3EndpointResolverURI(s3EndpointResolverURI))
if err != nil {
return storeStats, timeStats, errors.Wrap(err, fname)
}
Expand Down Expand Up @@ -96,6 +98,7 @@ type CreateVersionStoreIndexCmd struct {
func (r *CreateVersionStoreIndexCmd) Run(ctx *Context) error {
storeStats, timeStats, err := createVersionStoreIndex(
ctx.NumWorkerCount,
ctx.NumRemoteWorkerCount,
r.StorageURI,
r.S3EndpointResolverURL,
r.SourcePath,
Expand Down
5 changes: 4 additions & 1 deletion commands/cmd_downsync.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ func readVersionIndex(sourceFilePath string, opts ...longtailstorelib.BlobStoreO

func downsync(
numWorkerCount int,
remoteStoreWorkerCount int,
blobStoreURI string,
s3EndpointResolverURI string,
sourceFilePath string,
Expand All @@ -50,6 +51,7 @@ func downsync(
log := logrus.WithFields(logrus.Fields{
"fname": fname,
"numWorkerCount": numWorkerCount,
"remoteStoreWorkerCount": remoteStoreWorkerCount,
"blobStoreURI": blobStoreURI,
"s3EndpointResolverURI": s3EndpointResolverURI,
"sourceFilePath": sourceFilePath,
Expand Down Expand Up @@ -191,7 +193,7 @@ func downsync(
}

// MaxBlockSize and MaxChunksPerBlock are just temporary values until we get the remote index settings
remoteIndexStore, err := remotestore.CreateBlockStoreForURI(blobStoreURI, versionLocalStoreIndexPaths, jobs, numWorkerCount, 8388608, 1024, remotestore.ReadOnly, enableFileMapping, longtailutils.WithS3EndpointResolverURI(s3EndpointResolverURI))
remoteIndexStore, err := remotestore.CreateBlockStoreForURI(blobStoreURI, versionLocalStoreIndexPaths, jobs, remoteStoreWorkerCount, 8388608, 1024, remotestore.ReadOnly, enableFileMapping, longtailutils.WithS3EndpointResolverURI(s3EndpointResolverURI))
if err != nil {
return storeStats, timeStats, errors.Wrap(err, fname)
}
Expand Down Expand Up @@ -486,6 +488,7 @@ type DownsyncCmd struct {
func (r *DownsyncCmd) Run(ctx *Context) error {
storeStats, timeStats, err := downsync(
ctx.NumWorkerCount,
ctx.NumRemoteWorkerCount,
r.StorageURI,
r.S3EndpointResolverURL,
r.SourcePath,
Expand Down
4 changes: 4 additions & 0 deletions commands/cmd_get.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ import (

func get(
numWorkerCount int,
numRemoteWorkerCount int,
getConfigPath string,
getConfigPaths []string,
s3EndpointResolverURI string,
Expand All @@ -31,6 +32,7 @@ func get(
log := logrus.WithFields(logrus.Fields{
"fname": fname,
"numWorkerCount": numWorkerCount,
"numRemoteWorkerCount": numRemoteWorkerCount,
"getConfigPath": getConfigPath,
"getConfigPaths": getConfigPaths,
"s3EndpointResolverURI": s3EndpointResolverURI,
Expand Down Expand Up @@ -113,6 +115,7 @@ func get(

downSyncStoreStats, downSyncTimeStats, err := downsync(
numWorkerCount,
numRemoteWorkerCount,
blobStoreURI,
s3EndpointResolverURI,
"",
Expand Down Expand Up @@ -158,6 +161,7 @@ type GetCmd struct {
func (r *GetCmd) Run(ctx *Context) error {
storeStats, timeStats, err := get(
ctx.NumWorkerCount,
ctx.NumRemoteWorkerCount,
r.GetConfigURI,
r.GetConfigURIs,
r.S3EndpointResolverURL,
Expand Down
15 changes: 9 additions & 6 deletions commands/cmd_initremotestore.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,16 +12,18 @@ import (

func initRemoteStore(
numWorkerCount int,
remoteStoreWorkerCount int,
blobStoreURI string,
s3EndpointResolverURI string,
hashAlgorithm string) ([]longtailutils.StoreStat, []longtailutils.TimeStat, error) {
const fname = "initRemoteStore"
log := logrus.WithFields(logrus.Fields{
"fname": fname,
"numWorkerCount": numWorkerCount,
"blobStoreURI": blobStoreURI,
"s3EndpointResolverURI": s3EndpointResolverURI,
"hashAlgorithm": hashAlgorithm,
"fname": fname,
"numWorkerCount": numWorkerCount,
"remoteStoreWorkerCount": remoteStoreWorkerCount,
"blobStoreURI": blobStoreURI,
"s3EndpointResolverURI": s3EndpointResolverURI,
"hashAlgorithm": hashAlgorithm,
})
log.Info(fname)

Expand All @@ -33,7 +35,7 @@ func initRemoteStore(
jobs := longtaillib.CreateBikeshedJobAPI(uint32(numWorkerCount), 0)
defer jobs.Dispose()

remoteIndexStore, err := remotestore.CreateBlockStoreForURI(blobStoreURI, nil, jobs, numWorkerCount, 8388608, 1024, remotestore.Init, false, longtailutils.WithS3EndpointResolverURI(s3EndpointResolverURI))
remoteIndexStore, err := remotestore.CreateBlockStoreForURI(blobStoreURI, nil, jobs, remoteStoreWorkerCount, 8388608, 1024, remotestore.Init, false, longtailutils.WithS3EndpointResolverURI(s3EndpointResolverURI))
if err != nil {
return storeStats, timeStats, errors.Wrap(err, fname)
}
Expand Down Expand Up @@ -78,6 +80,7 @@ type InitRemoteStoreCmd struct {
func (r *InitRemoteStoreCmd) Run(ctx *Context) error {
storeStats, timeStats, err := initRemoteStore(
ctx.NumWorkerCount,
ctx.NumRemoteWorkerCount,
r.StorageURI,
r.S3EndpointResolverURL,
r.Hashing)
Expand Down
10 changes: 8 additions & 2 deletions commands/cmd_printVersionUsage.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package commands

import (
"fmt"
"runtime"
"time"

"github.com/DanEngelbrecht/golongtail/longtaillib"
Expand All @@ -14,6 +15,7 @@ import (

func printVersionUsage(
numWorkerCount int,
remoteStoreWorkerCount int,
blobStoreURI string,
s3EndpointResolverURI string,
versionIndexPath string,
Expand Down Expand Up @@ -41,7 +43,7 @@ func printVersionUsage(

var indexStore longtaillib.Longtail_BlockStoreAPI

remoteIndexStore, err := remotestore.CreateBlockStoreForURI(blobStoreURI, nil, jobs, numWorkerCount, 8388608, 1024, remotestore.ReadOnly, false, longtailutils.WithS3EndpointResolverURI(s3EndpointResolverURI))
remoteIndexStore, err := remotestore.CreateBlockStoreForURI(blobStoreURI, nil, jobs, remoteStoreWorkerCount, 8388608, 1024, remotestore.ReadOnly, false, longtailutils.WithS3EndpointResolverURI(s3EndpointResolverURI))
if err != nil {
return storeStats, timeStats, errors.Wrap(err, fname)
}
Expand Down Expand Up @@ -103,7 +105,10 @@ func printVersionUsage(
defer progress.Dispose()

blockHashes := existingStoreIndex.GetBlockHashes()
maxBatchSize := int(numWorkerCount)
maxBatchSize := int(remoteStoreWorkerCount)
if maxBatchSize == 0 {
maxBatchSize = runtime.NumCPU()
}
for i := 0; i < len(blockHashes); {
batchSize := len(blockHashes) - i
if batchSize > maxBatchSize {
Expand Down Expand Up @@ -216,6 +221,7 @@ type PrintVersionUsageCmd struct {
func (r *PrintVersionUsageCmd) Run(ctx *Context) error {
storeStats, timeStats, err := printVersionUsage(
ctx.NumWorkerCount,
ctx.NumRemoteWorkerCount,
r.StorageURI,
r.S3EndpointResolverURL,
r.VersionIndexPath,
Expand Down
Loading

0 comments on commit 8fa740d

Please sign in to comment.