Skip to content

Commit

Permalink
lsi cache store path
Browse files Browse the repository at this point in the history
  • Loading branch information
DanEngelbrecht committed Apr 22, 2023
1 parent 88df138 commit ca262bd
Show file tree
Hide file tree
Showing 7 changed files with 49 additions and 42 deletions.
14 changes: 4 additions & 10 deletions commands/cmd_clonestore.go
Original file line number Diff line number Diff line change
Expand Up @@ -510,8 +510,7 @@ func cloneStore(
compression string,
minBlockUsagePercent uint32,
skipValidate bool,
enableFileMapping bool,
maxStoreIndexSize int64) ([]longtailutils.StoreStat, []longtailutils.TimeStat, error) {
enableFileMapping bool) ([]longtailutils.StoreStat, []longtailutils.TimeStat, error) {
const fname = "cloneStore"
log := logrus.WithFields(logrus.Fields{
"fname": fname,
Expand All @@ -533,7 +532,6 @@ func cloneStore(
"compression": compression,
"minBlockUsagePercent": minBlockUsagePercent,
"skipValidate": skipValidate,
"maxStoreIndexSize": maxStoreIndexSize,
})
log.Info(fname)

Expand All @@ -555,8 +553,7 @@ func cloneStore(
localFS := longtaillib.CreateFSStorageAPI()
defer localFS.Dispose()

// TODO: Cache store uri
sourceRemoteIndexStore, err := remotestore.CreateBlockStoreForURI(sourceStoreURI, "", 1024*1024*16, nil, jobs, numWorkerCount, 8388608, 1024, remotestore.ReadOnly, enableFileMapping, longtailutils.WithS3EndpointResolverURI(sourceEndpointResolverURI))
sourceRemoteIndexStore, err := remotestore.CreateBlockStoreForURI(sourceStoreURI, "", -1, nil, jobs, numWorkerCount, 8388608, 1024, remotestore.ReadOnly, enableFileMapping, longtailutils.WithS3EndpointResolverURI(sourceEndpointResolverURI))
if err != nil {
return storeStats, timeStats, errors.Wrap(err, fname)
}
Expand Down Expand Up @@ -584,8 +581,7 @@ func cloneStore(
sourceStore := longtaillib.CreateShareBlockStore(sourceLRUBlockStore)
defer sourceStore.Dispose()

// TODO: Cache store uri
targetRemoteStore, err := remotestore.CreateBlockStoreForURI(targetStoreURI, "", maxStoreIndexSize, nil, jobs, numWorkerCount, targetBlockSize, maxChunksPerBlock, remotestore.ReadWrite, enableFileMapping, longtailutils.WithS3EndpointResolverURI(targetEndpointResolverURI))
targetRemoteStore, err := remotestore.CreateBlockStoreForURI(targetStoreURI, "", -1, nil, jobs, numWorkerCount, targetBlockSize, maxChunksPerBlock, remotestore.ReadWrite, enableFileMapping, longtailutils.WithS3EndpointResolverURI(targetEndpointResolverURI))
if err != nil {
return storeStats, timeStats, errors.Wrap(err, fname)
}
Expand Down Expand Up @@ -702,7 +698,6 @@ type CloneStoreCmd struct {
CompressionOption
MinBlockUsagePercentOption
EnableFileMappingOption
MaxStoreIndexSizeOption
}

func (r *CloneStoreCmd) Run(ctx *Context) error {
Expand All @@ -725,8 +720,7 @@ func (r *CloneStoreCmd) Run(ctx *Context) error {
r.Compression,
r.MinBlockUsagePercent,
r.SkipValidate,
r.EnableFileMapping,
r.MaxStoreIndexSize)
r.EnableFileMapping)
ctx.StoreStats = append(ctx.StoreStats, storeStats...)
ctx.TimeStats = append(ctx.TimeStats, timeStats...)
return err
Expand Down
11 changes: 7 additions & 4 deletions commands/cmd_cp.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,8 @@ func cpVersionIndex(
localCachePath string,
sourcePath string,
targetPath string,
enableFileMapping bool) ([]longtailutils.StoreStat, []longtailutils.TimeStat, error) {
enableFileMapping bool,
storeIndexCachePath string) ([]longtailutils.StoreStat, []longtailutils.TimeStat, error) {
const fname = "cpVersionIndex"
log := logrus.WithContext(context.Background()).WithFields(logrus.Fields{
"fname": fname,
Expand All @@ -32,6 +33,7 @@ func cpVersionIndex(
"sourcePath": sourcePath,
"targetPath": targetPath,
"enableFileMapping": enableFileMapping,
"storeIndexCachePath": storeIndexCachePath,
})
log.Info(fname)

Expand All @@ -48,8 +50,7 @@ func cpVersionIndex(
defer hashRegistry.Dispose()

// MaxBlockSize and MaxChunksPerBlock are just temporary values until we get the remote index settings
// TODO: Cache store uri
remoteIndexStore, err := remotestore.CreateBlockStoreForURI(blobStoreURI, "", 1024*1024*16, nil, jobs, numWorkerCount, 8388608, 1024, remotestore.ReadOnly, enableFileMapping, longtailutils.WithS3EndpointResolverURI(s3EndpointResolverURI))
remoteIndexStore, err := remotestore.CreateBlockStoreForURI(blobStoreURI, storeIndexCachePath, -1, nil, jobs, numWorkerCount, 8388608, 1024, remotestore.ReadOnly, enableFileMapping, longtailutils.WithS3EndpointResolverURI(s3EndpointResolverURI))
if err != nil {
return storeStats, timeStats, errors.Wrap(err, fname)
}
Expand Down Expand Up @@ -216,6 +217,7 @@ type CpCmd struct {
SourcePath string `name:"source path" arg:"" help:"Source path inside the version index to copy"`
TargetPath string `name:"target path" arg:"" help:"Target uri path"`
EnableFileMappingOption
StoreIndexCachePathOption
}

func (r *CpCmd) Run(ctx *Context) error {
Expand All @@ -227,7 +229,8 @@ func (r *CpCmd) Run(ctx *Context) error {
r.CachePath,
r.SourcePath,
r.TargetPath,
r.EnableFileMapping)
r.EnableFileMapping,
r.StoreIndexCachePath)
ctx.StoreStats = append(ctx.StoreStats, storeStats...)
ctx.TimeStats = append(ctx.TimeStats, timeStats...)
return err
Expand Down
15 changes: 11 additions & 4 deletions commands/cmd_createversionstoreindex.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,9 @@ func createVersionStoreIndex(
blobStoreURI string,
s3EndpointResolverURI string,
sourceFilePath string,
versionLocalStoreIndexPath string) ([]longtailutils.StoreStat, []longtailutils.TimeStat, error) {
versionLocalStoreIndexPath string,
storeIndexCachePath string,
maxStoreIndexSize int64) ([]longtailutils.StoreStat, []longtailutils.TimeStat, error) {
const fname = "createVersionStoreIndex"
log := logrus.WithFields(logrus.Fields{
"fname": fname,
Expand All @@ -24,6 +26,8 @@ func createVersionStoreIndex(
"s3EndpointResolverURI": s3EndpointResolverURI,
"sourceFilePath": sourceFilePath,
"versionLocalStoreIndexPath": versionLocalStoreIndexPath,
"storeIndexCachePath": storeIndexCachePath,
"maxStoreIndexSize": maxStoreIndexSize,
})
log.Info(fname)

Expand All @@ -35,8 +39,7 @@ func createVersionStoreIndex(
jobs := longtaillib.CreateBikeshedJobAPI(uint32(numWorkerCount), 0)
defer jobs.Dispose()

// TODO: Cache store uri
indexStore, err := remotestore.CreateBlockStoreForURI(blobStoreURI, "", 1024*1024*16, nil, jobs, numWorkerCount, 8388608, 1024, remotestore.ReadOnly, false, longtailutils.WithS3EndpointResolverURI(s3EndpointResolverURI))
indexStore, err := remotestore.CreateBlockStoreForURI(blobStoreURI, storeIndexCachePath, maxStoreIndexSize, nil, jobs, numWorkerCount, 8388608, 1024, remotestore.ReadOnly, false, longtailutils.WithS3EndpointResolverURI(s3EndpointResolverURI))
if err != nil {
return storeStats, timeStats, errors.Wrap(err, fname)
}
Expand Down Expand Up @@ -92,6 +95,8 @@ type CreateVersionStoreIndexCmd struct {
S3EndpointResolverURLOption
SourceUriOption
VersionLocalStoreIndexPathOption
StoreIndexCachePathOption
MaxStoreIndexSizeOption
}

func (r *CreateVersionStoreIndexCmd) Run(ctx *Context) error {
Expand All @@ -100,7 +105,9 @@ func (r *CreateVersionStoreIndexCmd) Run(ctx *Context) error {
r.StorageURI,
r.S3EndpointResolverURL,
r.SourcePath,
r.VersionLocalStoreIndexPath)
r.VersionLocalStoreIndexPath,
r.StoreIndexCachePath,
r.MaxStoreIndexSize)
ctx.StoreStats = append(ctx.StoreStats, storeStats...)
ctx.TimeStats = append(ctx.TimeStats, timeStats...)
return err
Expand Down
11 changes: 3 additions & 8 deletions commands/cmd_initremotestore.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,16 +14,14 @@ func initRemoteStore(
numWorkerCount int,
blobStoreURI string,
s3EndpointResolverURI string,
hashAlgorithm string,
maxStoreIndexSize int64) ([]longtailutils.StoreStat, []longtailutils.TimeStat, error) {
hashAlgorithm string) ([]longtailutils.StoreStat, []longtailutils.TimeStat, error) {
const fname = "initRemoteStore"
log := logrus.WithFields(logrus.Fields{
"fname": fname,
"numWorkerCount": numWorkerCount,
"blobStoreURI": blobStoreURI,
"s3EndpointResolverURI": s3EndpointResolverURI,
"hashAlgorithm": hashAlgorithm,
"maxStoreIndexSize": maxStoreIndexSize,
})
log.Info(fname)

Expand All @@ -35,8 +33,7 @@ func initRemoteStore(
jobs := longtaillib.CreateBikeshedJobAPI(uint32(numWorkerCount), 0)
defer jobs.Dispose()

// TODO: Cache store uri
remoteIndexStore, err := remotestore.CreateBlockStoreForURI(blobStoreURI, "", maxStoreIndexSize, nil, jobs, numWorkerCount, 8388608, 1024, remotestore.Init, false, longtailutils.WithS3EndpointResolverURI(s3EndpointResolverURI))
remoteIndexStore, err := remotestore.CreateBlockStoreForURI(blobStoreURI, "", -1, nil, jobs, numWorkerCount, 8388608, 1024, remotestore.Init, false, longtailutils.WithS3EndpointResolverURI(s3EndpointResolverURI))
if err != nil {
return storeStats, timeStats, errors.Wrap(err, fname)
}
Expand Down Expand Up @@ -76,16 +73,14 @@ type InitRemoteStoreCmd struct {
StorageURIOption
S3EndpointResolverURLOption
HashingOption
MaxStoreIndexSizeOption
}

func (r *InitRemoteStoreCmd) Run(ctx *Context) error {
storeStats, timeStats, err := initRemoteStore(
ctx.NumWorkerCount,
r.StorageURI,
r.S3EndpointResolverURL,
r.Hashing,
r.MaxStoreIndexSize)
r.Hashing)
ctx.StoreStats = append(ctx.StoreStats, storeStats...)
ctx.TimeStats = append(ctx.TimeStats, timeStats...)
return err
Expand Down
11 changes: 7 additions & 4 deletions commands/cmd_printVersionUsage.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,8 @@ func printVersionUsage(
blobStoreURI string,
s3EndpointResolverURI string,
versionIndexPath string,
localCachePath string) ([]longtailutils.StoreStat, []longtailutils.TimeStat, error) {
localCachePath string,
storeIndexCachePath string) ([]longtailutils.StoreStat, []longtailutils.TimeStat, error) {
const fname = "printVersionUsage"
log := logrus.WithFields(logrus.Fields{
"fname": fname,
Expand All @@ -26,6 +27,7 @@ func printVersionUsage(
"s3EndpointResolverURI": s3EndpointResolverURI,
"versionIndexPath": versionIndexPath,
"localCachePath": localCachePath,
"storeIndexCachePath": storeIndexCachePath,
})
log.Info(fname)

Expand All @@ -41,8 +43,7 @@ func printVersionUsage(

var indexStore longtaillib.Longtail_BlockStoreAPI

// TODO: Cache store uri
remoteIndexStore, err := remotestore.CreateBlockStoreForURI(blobStoreURI, "", 1024*1024*16, nil, jobs, numWorkerCount, 8388608, 1024, remotestore.ReadOnly, false, longtailutils.WithS3EndpointResolverURI(s3EndpointResolverURI))
remoteIndexStore, err := remotestore.CreateBlockStoreForURI(blobStoreURI, storeIndexCachePath, -1, nil, jobs, numWorkerCount, 8388608, 1024, remotestore.ReadOnly, false, longtailutils.WithS3EndpointResolverURI(s3EndpointResolverURI))
if err != nil {
return storeStats, timeStats, errors.Wrap(err, fname)
}
Expand Down Expand Up @@ -212,6 +213,7 @@ type PrintVersionUsageCmd struct {
S3EndpointResolverURLOption
VersionIndexPathOption
CachePathOption
StoreIndexCachePathOption
}

func (r *PrintVersionUsageCmd) Run(ctx *Context) error {
Expand All @@ -220,7 +222,8 @@ func (r *PrintVersionUsageCmd) Run(ctx *Context) error {
r.StorageURI,
r.S3EndpointResolverURL,
r.VersionIndexPath,
r.CachePath)
r.CachePath,
r.StoreIndexCachePath)
ctx.StoreStats = append(ctx.StoreStats, storeStats...)
ctx.TimeStats = append(ctx.TimeStats, timeStats...)
return err
Expand Down
18 changes: 10 additions & 8 deletions commands/cmd_prunestore.go
Original file line number Diff line number Diff line change
Expand Up @@ -141,6 +141,7 @@ func gatherBlocksToKeep(
writeVersionLocalStoreIndex bool,
validateVersions bool,
skipInvalidVersions bool,
storeIndexCachePath string,
dryRun bool,
jobs longtaillib.Longtail_JobAPI) ([]uint64, error) {
const fname = "gatherBlocksToKeep"
Expand All @@ -152,11 +153,11 @@ func gatherBlocksToKeep(
"writeVersionLocalStoreIndex": writeVersionLocalStoreIndex,
"validateVersions": validateVersions,
"skipInvalidVersions": skipInvalidVersions,
"storeIndexCachePath": storeIndexCachePath,
"dryRun": dryRun,
})
log.Debug(fname)
// TODO: Cache store uri
remoteStore, err := remotestore.CreateBlockStoreForURI(storageURI, "", 1024*1024*16, nil, jobs, numWorkerCount, 8388608, 1024, remotestore.ReadOnly, false, longtailutils.WithS3EndpointResolverURI(s3EndpointResolverURI))
remoteStore, err := remotestore.CreateBlockStoreForURI(storageURI, storeIndexCachePath, -1, nil, jobs, numWorkerCount, 8388608, 1024, remotestore.ReadOnly, false, longtailutils.WithS3EndpointResolverURI(s3EndpointResolverURI))
if err != nil {
return nil, errors.Wrap(err, fname)
}
Expand Down Expand Up @@ -269,7 +270,7 @@ func pruneStore(
writeVersionLocalStoreIndex bool,
validateVersions bool,
skipInvalidVersions bool,
maxStoreIndexSize int64,
storeIndexCachePath string,
dryRun bool) ([]longtailutils.StoreStat, []longtailutils.TimeStat, error) {
const fname = "pruneStore"
log := logrus.WithFields(logrus.Fields{
Expand All @@ -282,7 +283,7 @@ func pruneStore(
"writeVersionLocalStoreIndex": writeVersionLocalStoreIndex,
"validateVersions": validateVersions,
"skipInvalidVersions": skipInvalidVersions,
"maxStoreIndexSize": maxStoreIndexSize,
"storeIndexCachePath": storeIndexCachePath,
"dryRun": dryRun,
})
log.Info(fname)
Expand Down Expand Up @@ -348,6 +349,7 @@ func pruneStore(
writeVersionLocalStoreIndex,
validateVersions,
skipInvalidVersions,
storeIndexCachePath,
dryRun,
jobs)

Expand All @@ -362,8 +364,8 @@ func pruneStore(
fmt.Printf("Prune would keep %d blocks", len(blocksToKeep))
return storeStats, timeStats, nil
}
// TODO: Cache store uri
remoteStore, err := remotestore.CreateBlockStoreForURI(storageURI, "", maxStoreIndexSize, nil, jobs, numWorkerCount, 8388608, 1024, remotestore.ReadWrite, false, longtailutils.WithS3EndpointResolverURI(s3EndpointResolverURI))

remoteStore, err := remotestore.CreateBlockStoreForURI(storageURI, storeIndexCachePath, -1, nil, jobs, numWorkerCount, 8388608, 1024, remotestore.ReadWrite, false, longtailutils.WithS3EndpointResolverURI(s3EndpointResolverURI))
if err != nil {
return storeStats, timeStats, errors.Wrap(err, fname)
}
Expand Down Expand Up @@ -404,7 +406,7 @@ type PruneStoreCmd struct {
WriteVersionLocalStoreIndex bool `name:"write-version-local-store-index" help:"Write a new version local store index for each version. This requires a valid version-local-store-index-paths input parameter"`
ValidateVersions bool `name:"validate-versions" help:"Verify that all content needed for a version is available in the store"`
SkipInvalidVersions bool `name:"skip-invalid-versions" help:"If an invalid version is found, disregard its blocks. If not set and validate-version is set, invalid version will abort with an error"`
MaxStoreIndexSizeOption
StoreIndexCachePathOption
}

func (r *PruneStoreCmd) Run(ctx *Context) error {
Expand All @@ -417,7 +419,7 @@ func (r *PruneStoreCmd) Run(ctx *Context) error {
r.WriteVersionLocalStoreIndex,
r.ValidateVersions,
r.SkipInvalidVersions,
r.MaxStoreIndexSize,
r.StoreIndexCachePath,
r.DryRun)
ctx.StoreStats = append(ctx.StoreStats, storeStats...)
ctx.TimeStats = append(ctx.TimeStats, timeStats...)
Expand Down
11 changes: 7 additions & 4 deletions commands/cmd_validateversion.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,13 +14,15 @@ func validateVersion(
numWorkerCount int,
blobStoreURI string,
s3EndpointResolverURI string,
versionIndexPath string) ([]longtailutils.StoreStat, []longtailutils.TimeStat, error) {
versionIndexPath string,
storeIndexCachePath string) ([]longtailutils.StoreStat, []longtailutils.TimeStat, error) {
const fname = "validateVersion"
log := logrus.WithFields(logrus.Fields{
"numWorkerCount": numWorkerCount,
"blobStoreURI": blobStoreURI,
"s3EndpointResolverURI": s3EndpointResolverURI,
"versionIndexPath": versionIndexPath,
"storeIndexCachePath": storeIndexCachePath,
})
log.Info(fname)

Expand All @@ -33,8 +35,7 @@ func validateVersion(
defer jobs.Dispose()

// MaxBlockSize and MaxChunksPerBlock are just temporary values until we get the remote index settings
// TODO: Cache store uri
indexStore, err := remotestore.CreateBlockStoreForURI(blobStoreURI, "", 1024*1024*16, nil, jobs, numWorkerCount, 8388608, 1024, remotestore.ReadOnly, false, longtailutils.WithS3EndpointResolverURI(s3EndpointResolverURI))
indexStore, err := remotestore.CreateBlockStoreForURI(blobStoreURI, storeIndexCachePath, -1, nil, jobs, numWorkerCount, 8388608, 1024, remotestore.ReadOnly, false, longtailutils.WithS3EndpointResolverURI(s3EndpointResolverURI))
if err != nil {
return storeStats, timeStats, errors.Wrap(err, fname)
}
Expand Down Expand Up @@ -81,14 +82,16 @@ type ValidateVersionCmd struct {
StorageURIOption
S3EndpointResolverURLOption
VersionIndexPathOption
StoreIndexCachePathOption
}

func (r *ValidateVersionCmd) Run(ctx *Context) error {
storeStats, timeStats, err := validateVersion(
ctx.NumWorkerCount,
r.StorageURI,
r.S3EndpointResolverURL,
r.VersionIndexPath)
r.VersionIndexPath,
r.StoreIndexCachePath)
ctx.StoreStats = append(ctx.StoreStats, storeStats...)
ctx.TimeStats = append(ctx.TimeStats, timeStats...)
return err
Expand Down

0 comments on commit ca262bd

Please sign in to comment.