diff --git a/CHANGELOG.md b/CHANGELOG.md index e71d5ede..3ebebdfd 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,11 +1,13 @@ ## - **FIXED** fix(s3): use HeadObject for checking if blob exists [bergemalm](https://github.com/bergemalm) - update longtaillib to 0.4.3 (#260) +- **ADDED** New option to control the number of worker threads in remote stores to avoid saturating the network connection. + - `--remote-worker-count` Set number of workers created for the remote store, defaults to match number of logical CPUs with upper limit of 8 for networked remote stores +- **FIXED** Don't update store index if we failed to upload blocks - **UPDATED** Update to golang 1.23 - **UPDATED** Updated all golang dependencies - **UPDATED** Update longtaillib to v0.4.3 -## 0.4.3 +## v0.4.3 - **CHANGED** For multi-source downsync/get the separator for paths is changed to | to avoid problems with path that contains spaces - **CHANGED** Made multi-path options separate from single-path -- `--source-path` vs `--source-paths` diff --git a/cmd/longtail/main.go b/cmd/longtail/main.go index 21e97e69..c2d58ce2 100644 --- a/cmd/longtail/main.go +++ b/cmd/longtail/main.go @@ -96,6 +96,8 @@ func runCommand() error { context.NumWorkerCount = commands.Cli.WorkerCount } + context.NumRemoteWorkerCount = commands.Cli.RemoteWorkerCount + if commands.Cli.MemTrace || commands.Cli.MemTraceDetailed || commands.Cli.MemTraceCSV != "" { longtaillib.EnableMemtrace() defer func() { diff --git a/commands/cmd_clonestore.go b/commands/cmd_clonestore.go index d4549191..fde92123 100644 --- a/commands/cmd_clonestore.go +++ b/commands/cmd_clonestore.go @@ -516,6 +516,7 @@ func cloneOneVersion( func cloneStore( numWorkerCount int, + remoteStoreWorkerCount int, sourceStoreURI string, sourceEndpointResolverURI string, targetStoreURI string, @@ -539,6 +540,7 @@ func cloneStore( log := logrus.WithFields(logrus.Fields{ "fname": fname, "numWorkerCount": numWorkerCount, + "remoteStoreWorkerCount": remoteStoreWorkerCount, "sourceStoreURI": sourceStoreURI, "sourceEndpointResolverURI": sourceEndpointResolverURI, "targetStoreURI": targetStoreURI, @@ -578,7 +580,7 @@ func cloneStore( localFS := longtaillib.CreateFSStorageAPI() defer localFS.Dispose() - sourceRemoteIndexStore, err := remotestore.CreateBlockStoreForURI(sourceStoreURI, nil, jobs, numWorkerCount, 8388608, 1024, remotestore.ReadOnly, enableFileMapping, longtailutils.WithS3EndpointResolverURI(sourceEndpointResolverURI)) + sourceRemoteIndexStore, err := remotestore.CreateBlockStoreForURI(sourceStoreURI, nil, jobs, remoteStoreWorkerCount, 8388608, 1024, remotestore.ReadOnly, enableFileMapping, longtailutils.WithS3EndpointResolverURI(sourceEndpointResolverURI)) if err != nil { return storeStats, timeStats, errors.Wrap(err, fname) } @@ -612,7 +614,7 @@ func cloneStore( defer sourceLRUBlockStore.Dispose() defer sourceStore.Dispose() - targetRemoteStore, err := remotestore.CreateBlockStoreForURI(targetStoreURI, nil, jobs, numWorkerCount, targetBlockSize, maxChunksPerBlock, remotestore.ReadWrite, enableFileMapping, longtailutils.WithS3EndpointResolverURI(targetEndpointResolverURI)) + targetRemoteStore, err := remotestore.CreateBlockStoreForURI(targetStoreURI, nil, jobs, remoteStoreWorkerCount, targetBlockSize, maxChunksPerBlock, remotestore.ReadWrite, enableFileMapping, longtailutils.WithS3EndpointResolverURI(targetEndpointResolverURI)) if err != nil { return storeStats, timeStats, errors.Wrap(err, fname) } @@ -736,6 +738,7 @@ type CloneStoreCmd struct { func (r *CloneStoreCmd) Run(ctx *Context)
error { storeStats, timeStats, err := cloneStore( ctx.NumWorkerCount, + ctx.NumRemoteWorkerCount, r.SourceStorageURI, r.SourceS3EndpointResolverURL, r.TargetStorageURI, diff --git a/commands/cmd_cp.go b/commands/cmd_cp.go index fe0f0bcd..a246721a 100644 --- a/commands/cmd_cp.go +++ b/commands/cmd_cp.go @@ -14,6 +14,7 @@ import ( func cpVersionIndex( numWorkerCount int, + remoteStoreWorkerCount int, blobStoreURI string, s3EndpointResolverURI string, versionIndexPath string, @@ -23,15 +24,16 @@ func cpVersionIndex( enableFileMapping bool) ([]longtailutils.StoreStat, []longtailutils.TimeStat, error) { const fname = "cpVersionIndex" log := logrus.WithContext(context.Background()).WithFields(logrus.Fields{ - "fname": fname, - "numWorkerCount": numWorkerCount, - "blobStoreURI": blobStoreURI, - "s3EndpointResolverURI": s3EndpointResolverURI, - "versionIndexPath": versionIndexPath, - "localCachePath": localCachePath, - "sourcePath": sourcePath, - "targetPath": targetPath, - "enableFileMapping": enableFileMapping, + "fname": fname, + "numWorkerCount": numWorkerCount, + "remoteStoreWorkerCount": remoteStoreWorkerCount, + "blobStoreURI": blobStoreURI, + "s3EndpointResolverURI": s3EndpointResolverURI, + "versionIndexPath": versionIndexPath, + "localCachePath": localCachePath, + "sourcePath": sourcePath, + "targetPath": targetPath, + "enableFileMapping": enableFileMapping, }) log.Info(fname) @@ -48,7 +50,7 @@ func cpVersionIndex( defer hashRegistry.Dispose() // MaxBlockSize and MaxChunksPerBlock are just temporary values until we get the remote index settings - remoteIndexStore, err := remotestore.CreateBlockStoreForURI(blobStoreURI, nil, jobs, numWorkerCount, 8388608, 1024, remotestore.ReadOnly, enableFileMapping, longtailutils.WithS3EndpointResolverURI(s3EndpointResolverURI)) + remoteIndexStore, err := remotestore.CreateBlockStoreForURI(blobStoreURI, nil, jobs, remoteStoreWorkerCount, 8388608, 1024, remotestore.ReadOnly, enableFileMapping, longtailutils.WithS3EndpointResolverURI(s3EndpointResolverURI)) if err != nil { return storeStats, timeStats, errors.Wrap(err, fname) } @@ -220,6 +222,7 @@ type CpCmd struct { func (r *CpCmd) Run(ctx *Context) error { storeStats, timeStats, err := cpVersionIndex( ctx.NumWorkerCount, + ctx.NumRemoteWorkerCount, r.StorageURI, r.S3EndpointResolverURL, r.VersionIndexPath, diff --git a/commands/cmd_createversionstoreindex.go b/commands/cmd_createversionstoreindex.go index 4c0b0d15..aaabda08 100644 --- a/commands/cmd_createversionstoreindex.go +++ b/commands/cmd_createversionstoreindex.go @@ -12,6 +12,7 @@ import ( func createVersionStoreIndex( numWorkerCount int, + remoteStoreWorkerCount int, blobStoreURI string, s3EndpointResolverURI string, sourceFilePath string, @@ -20,6 +21,7 @@ func createVersionStoreIndex( log := logrus.WithFields(logrus.Fields{ "fname": fname, "numWorkerCount": numWorkerCount, + "remoteStoreWorkerCount": remoteStoreWorkerCount, "blobStoreURI": blobStoreURI, "s3EndpointResolverURI": s3EndpointResolverURI, "sourceFilePath": sourceFilePath, @@ -35,7 +37,7 @@ func createVersionStoreIndex( jobs := longtaillib.CreateBikeshedJobAPI(uint32(numWorkerCount), 0) defer jobs.Dispose() - indexStore, err := remotestore.CreateBlockStoreForURI(blobStoreURI, nil, jobs, numWorkerCount, 8388608, 1024, remotestore.ReadOnly, false, longtailutils.WithS3EndpointResolverURI(s3EndpointResolverURI)) + indexStore, err := remotestore.CreateBlockStoreForURI(blobStoreURI, nil, jobs, remoteStoreWorkerCount, 8388608, 1024, remotestore.ReadOnly, false, 
longtailutils.WithS3EndpointResolverURI(s3EndpointResolverURI)) if err != nil { return storeStats, timeStats, errors.Wrap(err, fname) } @@ -96,6 +98,7 @@ type CreateVersionStoreIndexCmd struct { func (r *CreateVersionStoreIndexCmd) Run(ctx *Context) error { storeStats, timeStats, err := createVersionStoreIndex( ctx.NumWorkerCount, + ctx.NumRemoteWorkerCount, r.StorageURI, r.S3EndpointResolverURL, r.SourcePath, diff --git a/commands/cmd_downsync.go b/commands/cmd_downsync.go index 4500f4b7..9fdad1b6 100644 --- a/commands/cmd_downsync.go +++ b/commands/cmd_downsync.go @@ -29,6 +29,7 @@ func readVersionIndex(sourceFilePath string, opts ...longtailstorelib.BlobStoreO func downsync( numWorkerCount int, + remoteStoreWorkerCount int, blobStoreURI string, s3EndpointResolverURI string, sourceFilePath string, @@ -50,6 +51,7 @@ func downsync( log := logrus.WithFields(logrus.Fields{ "fname": fname, "numWorkerCount": numWorkerCount, + "remoteStoreWorkerCount": remoteStoreWorkerCount, "blobStoreURI": blobStoreURI, "s3EndpointResolverURI": s3EndpointResolverURI, "sourceFilePath": sourceFilePath, @@ -191,7 +193,7 @@ func downsync( } // MaxBlockSize and MaxChunksPerBlock are just temporary values until we get the remote index settings - remoteIndexStore, err := remotestore.CreateBlockStoreForURI(blobStoreURI, versionLocalStoreIndexPaths, jobs, numWorkerCount, 8388608, 1024, remotestore.ReadOnly, enableFileMapping, longtailutils.WithS3EndpointResolverURI(s3EndpointResolverURI)) + remoteIndexStore, err := remotestore.CreateBlockStoreForURI(blobStoreURI, versionLocalStoreIndexPaths, jobs, remoteStoreWorkerCount, 8388608, 1024, remotestore.ReadOnly, enableFileMapping, longtailutils.WithS3EndpointResolverURI(s3EndpointResolverURI)) if err != nil { return storeStats, timeStats, errors.Wrap(err, fname) } @@ -486,6 +488,7 @@ type DownsyncCmd struct { func (r *DownsyncCmd) Run(ctx *Context) error { storeStats, timeStats, err := downsync( ctx.NumWorkerCount, + ctx.NumRemoteWorkerCount, r.StorageURI, r.S3EndpointResolverURL, r.SourcePath, diff --git a/commands/cmd_get.go b/commands/cmd_get.go index 66ef806c..f4c1349a 100644 --- a/commands/cmd_get.go +++ b/commands/cmd_get.go @@ -13,6 +13,7 @@ import ( func get( numWorkerCount int, + numRemoteWorkerCount int, getConfigPath string, getConfigPaths []string, s3EndpointResolverURI string, @@ -31,6 +32,7 @@ func get( log := logrus.WithFields(logrus.Fields{ "fname": fname, "numWorkerCount": numWorkerCount, + "numRemoteWorkerCount": numRemoteWorkerCount, "getConfigPath": getConfigPath, "getConfigPaths": getConfigPaths, "s3EndpointResolverURI": s3EndpointResolverURI, @@ -113,6 +115,7 @@ func get( downSyncStoreStats, downSyncTimeStats, err := downsync( numWorkerCount, + numRemoteWorkerCount, blobStoreURI, s3EndpointResolverURI, "", @@ -158,6 +161,7 @@ type GetCmd struct { func (r *GetCmd) Run(ctx *Context) error { storeStats, timeStats, err := get( ctx.NumWorkerCount, + ctx.NumRemoteWorkerCount, r.GetConfigURI, r.GetConfigURIs, r.S3EndpointResolverURL, diff --git a/commands/cmd_initremotestore.go b/commands/cmd_initremotestore.go index c0bc20c6..161bbc2c 100644 --- a/commands/cmd_initremotestore.go +++ b/commands/cmd_initremotestore.go @@ -12,16 +12,18 @@ import ( func initRemoteStore( numWorkerCount int, + remoteStoreWorkerCount int, blobStoreURI string, s3EndpointResolverURI string, hashAlgorithm string) ([]longtailutils.StoreStat, []longtailutils.TimeStat, error) { const fname = "initRemoteStore" log := logrus.WithFields(logrus.Fields{ - "fname": fname, - "numWorkerCount": 
numWorkerCount, - "blobStoreURI": blobStoreURI, - "s3EndpointResolverURI": s3EndpointResolverURI, - "hashAlgorithm": hashAlgorithm, + "fname": fname, + "numWorkerCount": numWorkerCount, + "remoteStoreWorkerCount": remoteStoreWorkerCount, + "blobStoreURI": blobStoreURI, + "s3EndpointResolverURI": s3EndpointResolverURI, + "hashAlgorithm": hashAlgorithm, }) log.Info(fname) @@ -33,7 +35,7 @@ func initRemoteStore( jobs := longtaillib.CreateBikeshedJobAPI(uint32(numWorkerCount), 0) defer jobs.Dispose() - remoteIndexStore, err := remotestore.CreateBlockStoreForURI(blobStoreURI, nil, jobs, numWorkerCount, 8388608, 1024, remotestore.Init, false, longtailutils.WithS3EndpointResolverURI(s3EndpointResolverURI)) + remoteIndexStore, err := remotestore.CreateBlockStoreForURI(blobStoreURI, nil, jobs, remoteStoreWorkerCount, 8388608, 1024, remotestore.Init, false, longtailutils.WithS3EndpointResolverURI(s3EndpointResolverURI)) if err != nil { return storeStats, timeStats, errors.Wrap(err, fname) } @@ -78,6 +80,7 @@ type InitRemoteStoreCmd struct { func (r *InitRemoteStoreCmd) Run(ctx *Context) error { storeStats, timeStats, err := initRemoteStore( ctx.NumWorkerCount, + ctx.NumRemoteWorkerCount, r.StorageURI, r.S3EndpointResolverURL, r.Hashing) diff --git a/commands/cmd_printVersionUsage.go b/commands/cmd_printVersionUsage.go index eaa16554..b155cdef 100644 --- a/commands/cmd_printVersionUsage.go +++ b/commands/cmd_printVersionUsage.go @@ -2,6 +2,7 @@ package commands import ( "fmt" + "runtime" "time" "github.com/DanEngelbrecht/golongtail/longtaillib" @@ -14,6 +15,7 @@ import ( func printVersionUsage( numWorkerCount int, + remoteStoreWorkerCount int, blobStoreURI string, s3EndpointResolverURI string, versionIndexPath string, @@ -41,7 +43,7 @@ func printVersionUsage( var indexStore longtaillib.Longtail_BlockStoreAPI - remoteIndexStore, err := remotestore.CreateBlockStoreForURI(blobStoreURI, nil, jobs, numWorkerCount, 8388608, 1024, remotestore.ReadOnly, false, longtailutils.WithS3EndpointResolverURI(s3EndpointResolverURI)) + remoteIndexStore, err := remotestore.CreateBlockStoreForURI(blobStoreURI, nil, jobs, remoteStoreWorkerCount, 8388608, 1024, remotestore.ReadOnly, false, longtailutils.WithS3EndpointResolverURI(s3EndpointResolverURI)) if err != nil { return storeStats, timeStats, errors.Wrap(err, fname) } @@ -103,7 +105,10 @@ func printVersionUsage( defer progress.Dispose() blockHashes := existingStoreIndex.GetBlockHashes() - maxBatchSize := int(numWorkerCount) + maxBatchSize := int(remoteStoreWorkerCount) + if maxBatchSize == 0 { + maxBatchSize = runtime.NumCPU() + } for i := 0; i < len(blockHashes); { batchSize := len(blockHashes) - i if batchSize > maxBatchSize { @@ -216,6 +221,7 @@ type PrintVersionUsageCmd struct { func (r *PrintVersionUsageCmd) Run(ctx *Context) error { storeStats, timeStats, err := printVersionUsage( ctx.NumWorkerCount, + ctx.NumRemoteWorkerCount, r.StorageURI, r.S3EndpointResolverURL, r.VersionIndexPath, diff --git a/commands/cmd_prunestore.go b/commands/cmd_prunestore.go index c48cfd4a..e0fb2c5e 100644 --- a/commands/cmd_prunestore.go +++ b/commands/cmd_prunestore.go @@ -4,6 +4,7 @@ import ( "bufio" "fmt" "os" + "runtime" "strings" "time" @@ -133,7 +134,7 @@ func pruneOne( } func gatherBlocksToKeep( - numWorkerCount int, + remoteStoreWorkerCount int, storageURI string, s3EndpointResolverURI string, sourceFilePaths []string, @@ -146,7 +147,7 @@ func gatherBlocksToKeep( const fname = "gatherBlocksToKeep" log := logrus.WithFields(logrus.Fields{ "fname": fname, - "numWorkerCount": 
numWorkerCount, + "remoteStoreWorkerCount": remoteStoreWorkerCount, "storageURI": storageURI, "s3EndpointResolverURI": s3EndpointResolverURI, "writeVersionLocalStoreIndex": writeVersionLocalStoreIndex, @@ -155,7 +156,7 @@ func gatherBlocksToKeep( "dryRun": dryRun, }) log.Debug(fname) - remoteStore, err := remotestore.CreateBlockStoreForURI(storageURI, nil, jobs, numWorkerCount, 8388608, 1024, remotestore.ReadOnly, false, longtailutils.WithS3EndpointResolverURI(s3EndpointResolverURI)) + remoteStore, err := remotestore.CreateBlockStoreForURI(storageURI, nil, jobs, remoteStoreWorkerCount, 8388608, 1024, remotestore.ReadOnly, false, longtailutils.WithS3EndpointResolverURI(s3EndpointResolverURI)) if err != nil { return nil, errors.Wrap(err, fname) } @@ -163,7 +164,12 @@ func gatherBlocksToKeep( usedBlocks := make(map[uint64]uint32) - resultChannel := make(chan pruneOneResult, numWorkerCount) + workerCount := remoteStoreWorkerCount + if workerCount == 0 { + workerCount = runtime.NumCPU() + } + + resultChannel := make(chan pruneOneResult, workerCount) activeWorkerCount := 0 progress := longtailutils.CreateProgress("Processing versions ", 0) @@ -178,7 +184,7 @@ func gatherBlocksToKeep( versionLocalStoreIndexFilePath = versionLocalStoreIndexFilePaths[i] } - if activeWorkerCount == numWorkerCount { + if activeWorkerCount == workerCount { result := <-resultChannel completed++ if result.err == nil { @@ -261,6 +267,7 @@ func gatherBlocksToKeep( func pruneStore( numWorkerCount int, + remoteStoreWorkerCount int, storageURI string, s3EndpointResolverURI string, sourcePaths string, @@ -273,6 +280,7 @@ func pruneStore( log := logrus.WithFields(logrus.Fields{ "fname": fname, "numWorkerCount": numWorkerCount, + "remoteStoreWorkerCount": remoteStoreWorkerCount, "storageURI": storageURI, "s3EndpointResolverURI": s3EndpointResolverURI, "sourcePaths": sourcePaths, @@ -337,7 +345,7 @@ func pruneStore( gatherBlocksToKeepStartTime := time.Now() blocksToKeep, err := gatherBlocksToKeep( - numWorkerCount, + remoteStoreWorkerCount, storageURI, s3EndpointResolverURI, sourceFilePaths, @@ -359,7 +367,7 @@ func pruneStore( fmt.Printf("Prune would keep %d blocks", len(blocksToKeep)) return storeStats, timeStats, nil } - remoteStore, err := remotestore.CreateBlockStoreForURI(storageURI, nil, jobs, numWorkerCount, 8388608, 1024, remotestore.ReadWrite, false, longtailutils.WithS3EndpointResolverURI(s3EndpointResolverURI)) + remoteStore, err := remotestore.CreateBlockStoreForURI(storageURI, nil, jobs, remoteStoreWorkerCount, 8388608, 1024, remotestore.ReadWrite, false, longtailutils.WithS3EndpointResolverURI(s3EndpointResolverURI)) if err != nil { return storeStats, timeStats, errors.Wrap(err, fname) } @@ -405,6 +413,7 @@ type PruneStoreCmd struct { func (r *PruneStoreCmd) Run(ctx *Context) error { storeStats, timeStats, err := pruneStore( ctx.NumWorkerCount, + ctx.NumRemoteWorkerCount, r.StorageURI, r.S3EndpointResolverURL, r.SourcePaths, diff --git a/commands/cmd_put.go b/commands/cmd_put.go index 3f591b10..17eac096 100644 --- a/commands/cmd_put.go +++ b/commands/cmd_put.go @@ -16,6 +16,7 @@ import ( func put( numWorkerCount int, + numRemoteWorkerCount int, blobStoreURI string, s3EndpointResolverURI string, sourceFolderPath string, @@ -37,6 +38,7 @@ func put( log := logrus.WithContext(context.Background()).WithFields(logrus.Fields{ "fname": fname, "numWorkerCount": numWorkerCount, + "numRemoteWorkerCount": numRemoteWorkerCount, "blobStoreURI": blobStoreURI, "s3EndpointResolverURI": s3EndpointResolverURI, "sourceFolderPath": 
sourceFolderPath, @@ -90,6 +92,7 @@ func put( downSyncStoreStats, downSyncTimeStats, err := upsync( numWorkerCount, + numRemoteWorkerCount, blobStoreURI, s3EndpointResolverURI, sourceFolderPath, @@ -176,6 +179,7 @@ type PutCmd struct { func (r *PutCmd) Run(ctx *Context) error { storeStats, timeStats, err := put( ctx.NumWorkerCount, + ctx.NumRemoteWorkerCount, r.OptionalStorageURI, r.S3EndpointResolverURL, r.SourcePath, diff --git a/commands/cmd_upsync.go b/commands/cmd_upsync.go index fd4fbf26..72aeb15b 100644 --- a/commands/cmd_upsync.go +++ b/commands/cmd_upsync.go @@ -14,6 +14,7 @@ import ( func upsync( numWorkerCount int, + remoteStoreWorkerCount int, blobStoreURI string, s3EndpointResolverURI string, sourceFolderPath string, @@ -33,6 +34,7 @@ func upsync( log := logrus.WithContext(context.Background()).WithFields(logrus.Fields{ "fname": fname, "numWorkerCount": numWorkerCount, + "remoteStoreWorkerCount": remoteStoreWorkerCount, "blobStoreURI": blobStoreURI, "s3EndpointResolverURI": s3EndpointResolverURI, "sourceFolderPath": sourceFolderPath, @@ -98,7 +100,7 @@ func upsync( enableFileMapping, &sourceFolderScanner) - remoteStore, err := remotestore.CreateBlockStoreForURI(blobStoreURI, nil, jobs, numWorkerCount, targetBlockSize, maxChunksPerBlock, remotestore.ReadWrite, enableFileMapping, longtailutils.WithS3EndpointResolverURI(s3EndpointResolverURI)) + remoteStore, err := remotestore.CreateBlockStoreForURI(blobStoreURI, nil, jobs, remoteStoreWorkerCount, targetBlockSize, maxChunksPerBlock, remotestore.ReadWrite, enableFileMapping, longtailutils.WithS3EndpointResolverURI(s3EndpointResolverURI)) if err != nil { return storeStats, timeStats, errors.Wrapf(err, fname) } @@ -244,6 +246,7 @@ type UpsyncCmd struct { func (r *UpsyncCmd) Run(ctx *Context) error { storeStats, timeStats, err := upsync( ctx.NumWorkerCount, + ctx.NumRemoteWorkerCount, r.StorageURI, r.S3EndpointResolverURL, r.SourcePath, diff --git a/commands/cmd_validateversion.go b/commands/cmd_validateversion.go index 87d2b415..f59dd8ee 100644 --- a/commands/cmd_validateversion.go +++ b/commands/cmd_validateversion.go @@ -12,15 +12,17 @@ import ( func validateVersion( numWorkerCount int, + remoteStoreWorkerCount int, blobStoreURI string, s3EndpointResolverURI string, versionIndexPath string) ([]longtailutils.StoreStat, []longtailutils.TimeStat, error) { const fname = "validateVersion" log := logrus.WithFields(logrus.Fields{ - "numWorkerCount": numWorkerCount, - "blobStoreURI": blobStoreURI, - "s3EndpointResolverURI": s3EndpointResolverURI, - "versionIndexPath": versionIndexPath, + "numWorkerCount": numWorkerCount, + "remoteStoreWorkerCount": remoteStoreWorkerCount, + "blobStoreURI": blobStoreURI, + "s3EndpointResolverURI": s3EndpointResolverURI, + "versionIndexPath": versionIndexPath, }) log.Info(fname) @@ -33,7 +35,7 @@ func validateVersion( defer jobs.Dispose() // MaxBlockSize and MaxChunksPerBlock are just temporary values until we get the remote index settings - indexStore, err := remotestore.CreateBlockStoreForURI(blobStoreURI, nil, jobs, numWorkerCount, 8388608, 1024, remotestore.ReadOnly, false, longtailutils.WithS3EndpointResolverURI(s3EndpointResolverURI)) + indexStore, err := remotestore.CreateBlockStoreForURI(blobStoreURI, nil, jobs, remoteStoreWorkerCount, 8388608, 1024, remotestore.ReadOnly, false, longtailutils.WithS3EndpointResolverURI(s3EndpointResolverURI)) if err != nil { return storeStats, timeStats, errors.Wrap(err, fname) } @@ -85,6 +87,7 @@ type ValidateVersionCmd struct { func (r *ValidateVersionCmd) Run(ctx 
*Context) error { storeStats, timeStats, err := validateVersion( ctx.NumWorkerCount, + ctx.NumRemoteWorkerCount, r.StorageURI, r.S3EndpointResolverURL, r.VersionIndexPath) diff --git a/commands/commands.go b/commands/commands.go index 8926803c..946d0328 100644 --- a/commands/commands.go +++ b/commands/commands.go @@ -8,7 +8,8 @@ var Cli struct { MemTrace bool `name:"mem-trace" help:"Output summary memory statistics from longtail"` MemTraceDetailed bool `name:"mem-trace-detailed" help:"Output detailed memory statistics from longtail"` MemTraceCSV string `name:"mem-trace-csv" help:"Output path for detailed memory statistics from longtail in csv format"` - WorkerCount int `name:"worker-count" help:"Limit number of workers created, defaults to match number of logical CPUs (zero for default count)" default:"0"` + WorkerCount int `name:"worker-count" help:"Set number of workers created, defaults to match number of logical CPUs (zero for default count)" default:"0"` + RemoteWorkerCount int `name:"remote-worker-count" help:"Set number of workers created for the remote store, defaults to match number of logical CPUs with upper limit of 8 for networked remote stores (zero for default count)" default:"0"` LogToConsole bool `name:"log-to-console" help:"Enable logging to console" default:"true" negatable:""` LogFilePath string `name:"log-file-path" help:"Path to log file for json formatted logging"` LogColoring bool `name:"log-coloring" help:"Use colored logging for stdout"` diff --git a/commands/options.go b/commands/options.go index 285c1559..5dc34ba9 100644 --- a/commands/options.go +++ b/commands/options.go @@ -3,9 +3,10 @@ package commands import "github.com/DanEngelbrecht/golongtail/longtailutils" type Context struct { - NumWorkerCount int - StoreStats []longtailutils.StoreStat - TimeStats []longtailutils.TimeStat + NumWorkerCount int + NumRemoteWorkerCount int + StoreStats []longtailutils.StoreStat + TimeStats []longtailutils.TimeStat } type CompressionOption struct { diff --git a/go.work.sum b/go.work.sum index 1ef21bd4..fc526d75 100644 --- a/go.work.sum +++ b/go.work.sum @@ -185,6 +185,7 @@ cloud.google.com/go/memcache v1.10.6/go.mod h1:4elGf6MwGszZCM0Yopp15qmBoo+Y8M7wg cloud.google.com/go/memcache v1.11.2/go.mod h1:jIzHn79b0m5wbkax2SdlW5vNSbpaEk0yWHbeLpMIYZE= cloud.google.com/go/metastore v1.13.5/go.mod h1:dmsJzIdQcJrpmRGhEaii3EhVq1JuhI0bxSBoy7A8hcQ= cloud.google.com/go/metastore v1.14.2/go.mod h1:dk4zOBhZIy3TFOQlI8sbOa+ef0FjAcCHEnd8dO2J+LE= +cloud.google.com/go/monitoring v1.18.1 h1:0yvFXK+xQd95VKo6thndjwnJMno7c7Xw1CwMByg0B+8= cloud.google.com/go/monitoring v1.18.1/go.mod h1:52hTzJ5XOUMRm7jYi7928aEdVxBEmGwA0EjNJXIBvt8= cloud.google.com/go/monitoring v1.21.1/go.mod h1:Rj++LKrlht9uBi8+Eb530dIrzG/cU/lB8mt+lbeFK1c= cloud.google.com/go/networkconnectivity v1.14.5/go.mod h1:Wy28mxRApI1uVwA9iHaYYxGNe74cVnSP311bCUJEpBc= @@ -313,7 +314,10 @@ github.com/beorn7/perks v0.0.0-20180321164747-3a771d992973/go.mod h1:Dwedo/Wpr24 github.com/beorn7/perks v1.0.0/go.mod h1:KWe93zE9D1o94FZ5RNwFwVgaQK1VOXiVxmqh+CedLV8= github.com/beorn7/perks v1.0.1/go.mod h1:G2ZrVWU2WbWT9wwq4/hrbKbnv/1ERSJQ0ibhJ6rlkpw= github.com/bgentry/speakeasy v0.1.0/go.mod h1:+zsyZBPWlz7T6j88CTgSN5bM796AkVf0kBD4zp0CCIs= +github.com/census-instrumentation/opencensus-proto v0.4.1 h1:iKLQ0xPNFxR/2hzXZMrBo8f1j86j5WHzznCCQxV/b8g= +github.com/census-instrumentation/opencensus-proto v0.4.1/go.mod h1:4T9NM4+4Vw91VeyqjLS6ao50K5bOcLKN6Q42XnYaRYw= github.com/cespare/xxhash/v2 v2.1.1/go.mod h1:VGX0DQ3Q6kWi7AoAeZDth3/j3BFtOZR5XLFGgcrjCOs= 
+github.com/cespare/xxhash/v2 v2.2.0 h1:DC2CZ1Ep5Y4k3ZQ899DldepgrayRUGE6BBZ/cd9Cj44= github.com/cespare/xxhash/v2 v2.2.0/go.mod h1:VGX0DQ3Q6kWi7AoAeZDth3/j3BFtOZR5XLFGgcrjCOs= github.com/chzyer/logex v1.1.10/go.mod h1:+Ywpsq7O8HXn0nuIou7OrIPyXbp3wmkHB+jjWRnGsAI= github.com/chzyer/readline v0.0.0-20180603132655-2972be24d48e/go.mod h1:nSuG5e5PlCu98SY8svDHJxuZscDgtXS6KTTbou5AhLI= @@ -323,6 +327,7 @@ github.com/circonus-labs/circonusllhist v0.1.3/go.mod h1:kMXHVDlOchFAehlya5ePtbp github.com/cncf/udpa/go v0.0.0-20200629203442-efcf912fb354/go.mod h1:WmhPx2Nbnhtbo57+VJT5O0JRkEi1Wbu0z5j0R8u5Hbk= github.com/cncf/udpa/go v0.0.0-20201120205902-5459f2c99403/go.mod h1:WmhPx2Nbnhtbo57+VJT5O0JRkEi1Wbu0z5j0R8u5Hbk= github.com/cncf/udpa/go v0.0.0-20220112060539-c52dc94e7fbe/go.mod h1:6pvJx4me5XPnfI9Z40ddWsdw2W/uZgQLFXToKeRcDiI= +github.com/cncf/xds/go v0.0.0-20231128003011-0fa0005c9caa h1:jQCWAUqqlij9Pgj2i/PB79y4KOPYVyFYdROxgaCwdTQ= github.com/cncf/xds/go v0.0.0-20231128003011-0fa0005c9caa/go.mod h1:x/1Gn8zydmfq8dk6e9PdstVsDgu9RuyIIJqAaF//0IM= github.com/cncf/xds/go v0.0.0-20240423153145-555b57ec207b/go.mod h1:W+zGtBO5Y1IgJhy4+A9GOqVhqLpfZi+vwmdNXUehLA8= github.com/coreos/go-semver v0.3.0/go.mod h1:nnelYz7RCh+5ahJtPPxZlU+153eP4D4r3EedlOD2RNk= @@ -330,8 +335,10 @@ github.com/coreos/go-systemd/v22 v22.3.2/go.mod h1:Y58oyj3AT4RCenI/lSvhwexgC+NSV github.com/creack/pty v1.1.9/go.mod h1:oKZEueFk5CKHvIhNR5MUki03XCEU+Q6VDXinZuGJ33E= github.com/envoyproxy/go-control-plane v0.9.7/go.mod h1:cwu0lG7PUMfa9snN8LXBig5ynNVH9qI8YYLbd1fK2po= github.com/envoyproxy/go-control-plane v0.9.9-0.20201210154907-fd9021fe5dad/go.mod h1:cXg6YxExXjJnVBQHBLXeUAgxn2UodCpnH306RInaBQk= +github.com/envoyproxy/go-control-plane v0.12.0 h1:4X+VP1GHd1Mhj6IB5mMeGbLCleqxjletLK6K0rbxyZI= github.com/envoyproxy/go-control-plane v0.12.0/go.mod h1:ZBTaoJ23lqITozF0M6G4/IragXCQKCnYbmlmtHvwRG0= github.com/envoyproxy/go-control-plane v0.13.0/go.mod h1:GRaKG3dwvFoTg4nj7aXdZnvMg4d7nvT/wl9WgVXn3Q8= +github.com/envoyproxy/protoc-gen-validate v1.0.4 h1:gVPz/FMfvh57HdSJQyvBtF00j8JU4zdyUgIUNhlgg0A= github.com/envoyproxy/protoc-gen-validate v1.0.4/go.mod h1:qys6tmnRsYrQqIhm2bvKZH4Blx/1gTIZ2UKVY1M+Yew= github.com/fatih/color v1.7.0/go.mod h1:Zm6kSWBoL9eyXnKyktHP6abPY2pDugNf5KwzbycvMj4= github.com/fatih/color v1.9.0/go.mod h1:eQcE1qtQxscV5RaZvpXrrb8Drkc3/DdQ+uUYCNjL+zU= diff --git a/remotestore/remotestore.go b/remotestore/remotestore.go index cab36d7b..53a5ecdd 100644 --- a/remotestore/remotestore.go +++ b/remotestore/remotestore.go @@ -7,6 +7,7 @@ import ( "net/url" "os" "path/filepath" + "runtime" "strings" "sync" "sync/atomic" @@ -842,10 +843,16 @@ func contentIndexWorker( select { case <-flushMessages: + if err != nil { + flushReplyMessages <- err + err = nil + continue + } if len(addedBlockIndexes) > 0 && accessType != ReadOnly { newStoreIndex, err := addBlocksToRemoteStoreIndex(ctx, s, client, addedBlockIndexes) if err != nil { flushReplyMessages <- err + err = nil continue } addedBlockIndexes = nil @@ -914,6 +921,11 @@ func contentIndexWorker( } } + if err != nil && !longtaillib.IsNotExist(err) { + storeIndex.Dispose() + return err + } + if accessType == ReadOnly { storeIndex.Dispose() return nil @@ -1960,6 +1972,11 @@ func CreateBlockStoreForURI( if err != nil { return longtaillib.Longtail_BlockStoreAPI{}, errors.Wrap(err, fname) } + + if numWorkerCount == 0 { + numWorkerCount = runtime.NumCPU() + } + fsBlockStore, err := NewRemoteBlockStore( jobAPI, fsBlobStore, @@ -1981,6 +1998,14 @@ func CreateBlockStoreForURI( if err != nil { return 
longtaillib.Longtail_BlockStoreAPI{}, errors.Wrap(err, fname) } + + if numWorkerCount == 0 { + numWorkerCount = runtime.NumCPU() + if numWorkerCount > 8 { + numWorkerCount = 8 + } + } + gcsBlockStore, err := NewRemoteBlockStore( jobAPI, gcsBlobStore, @@ -1997,6 +2022,14 @@ func CreateBlockStoreForURI( if err != nil { return longtaillib.Longtail_BlockStoreAPI{}, errors.Wrap(err, fname) } + + if numWorkerCount == 0 { + numWorkerCount = runtime.NumCPU() + if numWorkerCount > 8 { + numWorkerCount = 8 + } + } + s3BlockStore, err := NewRemoteBlockStore( jobAPI, s3BlobStore,
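The defaulting behaviour this diff adds to `CreateBlockStoreForURI` in remotestore/remotestore.go can be summarised as: a worker count of 0 means "pick a default", which is the number of logical CPUs, capped at 8 when the block store is a networked remote store (the gs:// and s3:// branches in the diff). Below is a minimal standalone sketch of that policy under those assumptions; `resolveRemoteWorkerCount` is an illustrative helper name, not part of the golongtail codebase.

```go
package main

import (
	"fmt"
	"runtime"
)

// resolveRemoteWorkerCount is an illustrative helper (not golongtail API)
// mirroring the policy CreateBlockStoreForURI now applies to its
// numWorkerCount argument: 0 selects runtime.NumCPU(), capped at 8 for
// networked remote stores so block transfers do not saturate the network.
func resolveRemoteWorkerCount(requested int, networked bool) int {
	if requested != 0 {
		return requested // an explicit --remote-worker-count is used as-is
	}
	workerCount := runtime.NumCPU()
	if networked && workerCount > 8 {
		workerCount = 8
	}
	return workerCount
}

func main() {
	fmt.Println(resolveRemoteWorkerCount(0, true))  // networked store default, at most 8
	fmt.Println(resolveRemoteWorkerCount(0, false)) // local file-system store default, all logical CPUs
	fmt.Println(resolveRemoteWorkerCount(4, true))  // explicit value always wins
}
```

Because `--remote-worker-count` is a global flag (commands/commands.go), it can be combined with any subcommand, e.g. `longtail upsync --remote-worker-count 4 …` with the remaining upsync arguments unchanged, while `--worker-count` still sizes the Bikeshed job API used for local work.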
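The "Don't update store index if we failed to upload blocks" fix changes `contentIndexWorker` so that a pending upload error is reported back on a flush request (and then cleared) instead of letting the accumulated block indexes be added to the remote store index. The following is a simplified, self-contained sketch of that shape only; the channel and function names are illustrative and not the golongtail API.

```go
package main

import (
	"errors"
	"fmt"
)

// runFlushLoop is an illustrative stand-in for contentIndexWorker: it
// remembers a failed block upload and, on flush, reports that failure
// instead of committing anything to the store index.
func runFlushLoop(flush <-chan struct{}, flushReply chan<- error, uploadErr <-chan error, done <-chan struct{}) {
	var pendingErr error
	for {
		select {
		case e := <-uploadErr:
			// A block upload failed earlier in the loop; remember it.
			pendingErr = e
		case <-flush:
			if pendingErr != nil {
				// The fix: reply with the failure and skip the store index
				// update so blocks that never made it are not indexed.
				flushReply <- pendingErr
				pendingErr = nil
				continue
			}
			// Here the real worker adds the accumulated block indexes to the
			// remote store index before replying.
			flushReply <- nil
		case <-done:
			return
		}
	}
}

func main() {
	flush := make(chan struct{})
	reply := make(chan error)
	uploadErr := make(chan error)
	done := make(chan struct{})
	go runFlushLoop(flush, reply, uploadErr, done)

	uploadErr <- errors.New("put block failed")
	flush <- struct{}{}
	fmt.Println("first flush:", <-reply) // reports the upload error, no index update

	flush <- struct{}{}
	fmt.Println("second flush:", <-reply) // error was cleared; index update would proceed

	close(done)
}
```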