Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

add --remote-worker-count #258

Merged
merged 4 commits into from
Nov 23, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 4 additions & 2 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,11 +1,13 @@
##
- **FIXED** fix(s3): use HeadObject for checking if blob exists [bergemalm](https://github.com/bergemalm)
update longtaillib to 0.4.3 (#260)
- **ADDED** New option to control the number of worker threads in remote stores to avoid overflowing the network connection.
  - `--remote-worker-count` Set the number of workers created for the remote store; defaults to the number of logical CPUs, with an upper limit of 8 for networked remote stores
- **FIXED** Don't update store index if we failed to upload blocks
- **UPDATED** Update to golang 1.23
- **UPDATED** Updated all golang dependencies
- **UPDATED** Update longtaillib to v0.4.3

## 0.4.3
## v0.4.3
- **CHANGED** For multi-source downsync/get the separator for paths is changed to | to avoid problems with paths that contain spaces
- **CHANGED** Made multi-path options separate from single-path
-- `--source-path` vs `--source-paths`
Expand Down
2 changes: 2 additions & 0 deletions cmd/longtail/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -96,6 +96,8 @@ func runCommand() error {
context.NumWorkerCount = commands.Cli.WorkerCount
}

context.NumRemoteWorkerCount = commands.Cli.RemoteWorkerCount

if commands.Cli.MemTrace || commands.Cli.MemTraceDetailed || commands.Cli.MemTraceCSV != "" {
longtaillib.EnableMemtrace()
defer func() {
Expand Down
7 changes: 5 additions & 2 deletions commands/cmd_clonestore.go
Original file line number Diff line number Diff line change
Expand Up @@ -516,6 +516,7 @@ func cloneOneVersion(

func cloneStore(
numWorkerCount int,
remoteStoreWorkerCount int,
sourceStoreURI string,
sourceEndpointResolverURI string,
targetStoreURI string,
Expand All @@ -539,6 +540,7 @@ func cloneStore(
log := logrus.WithFields(logrus.Fields{
"fname": fname,
"numWorkerCount": numWorkerCount,
"remoteStoreWorkerCount": remoteStoreWorkerCount,
"sourceStoreURI": sourceStoreURI,
"sourceEndpointResolverURI": sourceEndpointResolverURI,
"targetStoreURI": targetStoreURI,
Expand Down Expand Up @@ -578,7 +580,7 @@ func cloneStore(
localFS := longtaillib.CreateFSStorageAPI()
defer localFS.Dispose()

sourceRemoteIndexStore, err := remotestore.CreateBlockStoreForURI(sourceStoreURI, nil, jobs, numWorkerCount, 8388608, 1024, remotestore.ReadOnly, enableFileMapping, longtailutils.WithS3EndpointResolverURI(sourceEndpointResolverURI))
sourceRemoteIndexStore, err := remotestore.CreateBlockStoreForURI(sourceStoreURI, nil, jobs, remoteStoreWorkerCount, 8388608, 1024, remotestore.ReadOnly, enableFileMapping, longtailutils.WithS3EndpointResolverURI(sourceEndpointResolverURI))
if err != nil {
return storeStats, timeStats, errors.Wrap(err, fname)
}
Expand Down Expand Up @@ -612,7 +614,7 @@ func cloneStore(
defer sourceLRUBlockStore.Dispose()
defer sourceStore.Dispose()

targetRemoteStore, err := remotestore.CreateBlockStoreForURI(targetStoreURI, nil, jobs, numWorkerCount, targetBlockSize, maxChunksPerBlock, remotestore.ReadWrite, enableFileMapping, longtailutils.WithS3EndpointResolverURI(targetEndpointResolverURI))
targetRemoteStore, err := remotestore.CreateBlockStoreForURI(targetStoreURI, nil, jobs, remoteStoreWorkerCount, targetBlockSize, maxChunksPerBlock, remotestore.ReadWrite, enableFileMapping, longtailutils.WithS3EndpointResolverURI(targetEndpointResolverURI))
if err != nil {
return storeStats, timeStats, errors.Wrap(err, fname)
}
Expand Down Expand Up @@ -736,6 +738,7 @@ type CloneStoreCmd struct {
func (r *CloneStoreCmd) Run(ctx *Context) error {
storeStats, timeStats, err := cloneStore(
ctx.NumWorkerCount,
ctx.NumRemoteWorkerCount,
r.SourceStorageURI,
r.SourceS3EndpointResolverURL,
r.TargetStorageURI,
Expand Down
23 changes: 13 additions & 10 deletions commands/cmd_cp.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ import (

func cpVersionIndex(
numWorkerCount int,
remoteStoreWorkerCount int,
blobStoreURI string,
s3EndpointResolverURI string,
versionIndexPath string,
Expand All @@ -23,15 +24,16 @@ func cpVersionIndex(
enableFileMapping bool) ([]longtailutils.StoreStat, []longtailutils.TimeStat, error) {
const fname = "cpVersionIndex"
log := logrus.WithContext(context.Background()).WithFields(logrus.Fields{
"fname": fname,
"numWorkerCount": numWorkerCount,
"blobStoreURI": blobStoreURI,
"s3EndpointResolverURI": s3EndpointResolverURI,
"versionIndexPath": versionIndexPath,
"localCachePath": localCachePath,
"sourcePath": sourcePath,
"targetPath": targetPath,
"enableFileMapping": enableFileMapping,
"fname": fname,
"numWorkerCount": numWorkerCount,
"remoteStoreWorkerCount": remoteStoreWorkerCount,
"blobStoreURI": blobStoreURI,
"s3EndpointResolverURI": s3EndpointResolverURI,
"versionIndexPath": versionIndexPath,
"localCachePath": localCachePath,
"sourcePath": sourcePath,
"targetPath": targetPath,
"enableFileMapping": enableFileMapping,
})
log.Info(fname)

Expand All @@ -48,7 +50,7 @@ func cpVersionIndex(
defer hashRegistry.Dispose()

// MaxBlockSize and MaxChunksPerBlock are just temporary values until we get the remote index settings
remoteIndexStore, err := remotestore.CreateBlockStoreForURI(blobStoreURI, nil, jobs, numWorkerCount, 8388608, 1024, remotestore.ReadOnly, enableFileMapping, longtailutils.WithS3EndpointResolverURI(s3EndpointResolverURI))
remoteIndexStore, err := remotestore.CreateBlockStoreForURI(blobStoreURI, nil, jobs, remoteStoreWorkerCount, 8388608, 1024, remotestore.ReadOnly, enableFileMapping, longtailutils.WithS3EndpointResolverURI(s3EndpointResolverURI))
if err != nil {
return storeStats, timeStats, errors.Wrap(err, fname)
}
Expand Down Expand Up @@ -220,6 +222,7 @@ type CpCmd struct {
func (r *CpCmd) Run(ctx *Context) error {
storeStats, timeStats, err := cpVersionIndex(
ctx.NumWorkerCount,
ctx.NumRemoteWorkerCount,
r.StorageURI,
r.S3EndpointResolverURL,
r.VersionIndexPath,
Expand Down
5 changes: 4 additions & 1 deletion commands/cmd_createversionstoreindex.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import (

func createVersionStoreIndex(
numWorkerCount int,
remoteStoreWorkerCount int,
blobStoreURI string,
s3EndpointResolverURI string,
sourceFilePath string,
Expand All @@ -20,6 +21,7 @@ func createVersionStoreIndex(
log := logrus.WithFields(logrus.Fields{
"fname": fname,
"numWorkerCount": numWorkerCount,
"remoteStoreWorkerCount": remoteStoreWorkerCount,
"blobStoreURI": blobStoreURI,
"s3EndpointResolverURI": s3EndpointResolverURI,
"sourceFilePath": sourceFilePath,
Expand All @@ -35,7 +37,7 @@ func createVersionStoreIndex(
jobs := longtaillib.CreateBikeshedJobAPI(uint32(numWorkerCount), 0)
defer jobs.Dispose()

indexStore, err := remotestore.CreateBlockStoreForURI(blobStoreURI, nil, jobs, numWorkerCount, 8388608, 1024, remotestore.ReadOnly, false, longtailutils.WithS3EndpointResolverURI(s3EndpointResolverURI))
indexStore, err := remotestore.CreateBlockStoreForURI(blobStoreURI, nil, jobs, remoteStoreWorkerCount, 8388608, 1024, remotestore.ReadOnly, false, longtailutils.WithS3EndpointResolverURI(s3EndpointResolverURI))
if err != nil {
return storeStats, timeStats, errors.Wrap(err, fname)
}
Expand Down Expand Up @@ -96,6 +98,7 @@ type CreateVersionStoreIndexCmd struct {
func (r *CreateVersionStoreIndexCmd) Run(ctx *Context) error {
storeStats, timeStats, err := createVersionStoreIndex(
ctx.NumWorkerCount,
ctx.NumRemoteWorkerCount,
r.StorageURI,
r.S3EndpointResolverURL,
r.SourcePath,
Expand Down
5 changes: 4 additions & 1 deletion commands/cmd_downsync.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ func readVersionIndex(sourceFilePath string, opts ...longtailstorelib.BlobStoreO

func downsync(
numWorkerCount int,
remoteStoreWorkerCount int,
blobStoreURI string,
s3EndpointResolverURI string,
sourceFilePath string,
Expand All @@ -50,6 +51,7 @@ func downsync(
log := logrus.WithFields(logrus.Fields{
"fname": fname,
"numWorkerCount": numWorkerCount,
"remoteStoreWorkerCount": remoteStoreWorkerCount,
"blobStoreURI": blobStoreURI,
"s3EndpointResolverURI": s3EndpointResolverURI,
"sourceFilePath": sourceFilePath,
Expand Down Expand Up @@ -191,7 +193,7 @@ func downsync(
}

// MaxBlockSize and MaxChunksPerBlock are just temporary values until we get the remote index settings
remoteIndexStore, err := remotestore.CreateBlockStoreForURI(blobStoreURI, versionLocalStoreIndexPaths, jobs, numWorkerCount, 8388608, 1024, remotestore.ReadOnly, enableFileMapping, longtailutils.WithS3EndpointResolverURI(s3EndpointResolverURI))
remoteIndexStore, err := remotestore.CreateBlockStoreForURI(blobStoreURI, versionLocalStoreIndexPaths, jobs, remoteStoreWorkerCount, 8388608, 1024, remotestore.ReadOnly, enableFileMapping, longtailutils.WithS3EndpointResolverURI(s3EndpointResolverURI))
if err != nil {
return storeStats, timeStats, errors.Wrap(err, fname)
}
Expand Down Expand Up @@ -486,6 +488,7 @@ type DownsyncCmd struct {
func (r *DownsyncCmd) Run(ctx *Context) error {
storeStats, timeStats, err := downsync(
ctx.NumWorkerCount,
ctx.NumRemoteWorkerCount,
r.StorageURI,
r.S3EndpointResolverURL,
r.SourcePath,
Expand Down
4 changes: 4 additions & 0 deletions commands/cmd_get.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ import (

func get(
numWorkerCount int,
numRemoteWorkerCount int,
getConfigPath string,
getConfigPaths []string,
s3EndpointResolverURI string,
Expand All @@ -31,6 +32,7 @@ func get(
log := logrus.WithFields(logrus.Fields{
"fname": fname,
"numWorkerCount": numWorkerCount,
"numRemoteWorkerCount": numRemoteWorkerCount,
"getConfigPath": getConfigPath,
"getConfigPaths": getConfigPaths,
"s3EndpointResolverURI": s3EndpointResolverURI,
Expand Down Expand Up @@ -113,6 +115,7 @@ func get(

downSyncStoreStats, downSyncTimeStats, err := downsync(
numWorkerCount,
numRemoteWorkerCount,
blobStoreURI,
s3EndpointResolverURI,
"",
Expand Down Expand Up @@ -158,6 +161,7 @@ type GetCmd struct {
func (r *GetCmd) Run(ctx *Context) error {
storeStats, timeStats, err := get(
ctx.NumWorkerCount,
ctx.NumRemoteWorkerCount,
r.GetConfigURI,
r.GetConfigURIs,
r.S3EndpointResolverURL,
Expand Down
15 changes: 9 additions & 6 deletions commands/cmd_initremotestore.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,16 +12,18 @@ import (

func initRemoteStore(
numWorkerCount int,
remoteStoreWorkerCount int,
blobStoreURI string,
s3EndpointResolverURI string,
hashAlgorithm string) ([]longtailutils.StoreStat, []longtailutils.TimeStat, error) {
const fname = "initRemoteStore"
log := logrus.WithFields(logrus.Fields{
"fname": fname,
"numWorkerCount": numWorkerCount,
"blobStoreURI": blobStoreURI,
"s3EndpointResolverURI": s3EndpointResolverURI,
"hashAlgorithm": hashAlgorithm,
"fname": fname,
"numWorkerCount": numWorkerCount,
"remoteStoreWorkerCount": remoteStoreWorkerCount,
"blobStoreURI": blobStoreURI,
"s3EndpointResolverURI": s3EndpointResolverURI,
"hashAlgorithm": hashAlgorithm,
})
log.Info(fname)

Expand All @@ -33,7 +35,7 @@ func initRemoteStore(
jobs := longtaillib.CreateBikeshedJobAPI(uint32(numWorkerCount), 0)
defer jobs.Dispose()

remoteIndexStore, err := remotestore.CreateBlockStoreForURI(blobStoreURI, nil, jobs, numWorkerCount, 8388608, 1024, remotestore.Init, false, longtailutils.WithS3EndpointResolverURI(s3EndpointResolverURI))
remoteIndexStore, err := remotestore.CreateBlockStoreForURI(blobStoreURI, nil, jobs, remoteStoreWorkerCount, 8388608, 1024, remotestore.Init, false, longtailutils.WithS3EndpointResolverURI(s3EndpointResolverURI))
if err != nil {
return storeStats, timeStats, errors.Wrap(err, fname)
}
Expand Down Expand Up @@ -78,6 +80,7 @@ type InitRemoteStoreCmd struct {
func (r *InitRemoteStoreCmd) Run(ctx *Context) error {
storeStats, timeStats, err := initRemoteStore(
ctx.NumWorkerCount,
ctx.NumRemoteWorkerCount,
r.StorageURI,
r.S3EndpointResolverURL,
r.Hashing)
Expand Down
10 changes: 8 additions & 2 deletions commands/cmd_printVersionUsage.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package commands

import (
"fmt"
"runtime"
"time"

"github.com/DanEngelbrecht/golongtail/longtaillib"
Expand All @@ -14,6 +15,7 @@ import (

func printVersionUsage(
numWorkerCount int,
remoteStoreWorkerCount int,
blobStoreURI string,
s3EndpointResolverURI string,
versionIndexPath string,
Expand Down Expand Up @@ -41,7 +43,7 @@ func printVersionUsage(

var indexStore longtaillib.Longtail_BlockStoreAPI

remoteIndexStore, err := remotestore.CreateBlockStoreForURI(blobStoreURI, nil, jobs, numWorkerCount, 8388608, 1024, remotestore.ReadOnly, false, longtailutils.WithS3EndpointResolverURI(s3EndpointResolverURI))
remoteIndexStore, err := remotestore.CreateBlockStoreForURI(blobStoreURI, nil, jobs, remoteStoreWorkerCount, 8388608, 1024, remotestore.ReadOnly, false, longtailutils.WithS3EndpointResolverURI(s3EndpointResolverURI))
if err != nil {
return storeStats, timeStats, errors.Wrap(err, fname)
}
Expand Down Expand Up @@ -103,7 +105,10 @@ func printVersionUsage(
defer progress.Dispose()

blockHashes := existingStoreIndex.GetBlockHashes()
maxBatchSize := int(numWorkerCount)
maxBatchSize := int(remoteStoreWorkerCount)
if maxBatchSize == 0 {
maxBatchSize = runtime.NumCPU()
}
for i := 0; i < len(blockHashes); {
batchSize := len(blockHashes) - i
if batchSize > maxBatchSize {
Expand Down Expand Up @@ -216,6 +221,7 @@ type PrintVersionUsageCmd struct {
func (r *PrintVersionUsageCmd) Run(ctx *Context) error {
storeStats, timeStats, err := printVersionUsage(
ctx.NumWorkerCount,
ctx.NumRemoteWorkerCount,
r.StorageURI,
r.S3EndpointResolverURL,
r.VersionIndexPath,
Expand Down
Loading
Loading