update longtail to 0.3.0 (#199)
* use get existing content in unpack
* progress cleanup
* update longtail to 0.3.0
* update to new API
* release notes
DanEngelbrecht authored Apr 20, 2022
1 parent f978afb commit 0dee82e
Showing 27 changed files with 208 additions and 89 deletions.
3 changes: 2 additions & 1 deletion .github/workflows/create-release.yml
@@ -126,7 +126,8 @@ jobs:
body: |
Changes in this Release
- **ADDED** Added `--cache-target-index` option to downsync/unpack that automatically caches target folder state. Default on, turn off with `--no-cache-target-index`
- - **UPDATED** Updated longtail to v0.2.18
+ - **ADDED** Added `--enable-file-mapping` option to relevant commands to enable reading using memory mapped files. Default off.
+ - **UPDATED** Updated longtail to v0.3.0
draft: false
prerelease: false
- name: Download Linux artifacts
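The new option is a plain boolean switch and, per the release note above, defaults to off, so existing pipelines keep their current behavior. A hypothetical invocation (paths and store URI are illustrative, not taken from this commit) would be `longtail downsync --source-path "build.lvi" --target-path "build" --storage-uri "s3://bucket/store" --enable-file-mapping`. Memory-mapped reads let block data come straight from the OS page cache instead of going through read-buffer copies, which mainly pays off when the same local store or cache blocks are read repeatedly.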
42 changes: 32 additions & 10 deletions commands/cmd_clonestore.go
@@ -188,6 +188,7 @@ func updateCurrentVersionFromLongtail(
pathFilter longtaillib.Longtail_PathFilterAPI,
retainPermissions bool,
sourceStore longtaillib.Longtail_BlockStoreAPI,
+ enableFileMapping bool,
sourceFilePath string,
sourceFileZipPath string,
targetBlockSize uint32,
@@ -234,6 +235,7 @@ func updateCurrentVersionFromLongtail(
fs,
jobs,
hashRegistry,
+ enableFileMapping,
&targetFolderScanner)

localVersionIndex, hash, _, err = targetIndexReader.Get()
@@ -270,7 +272,7 @@ func updateCurrentVersionFromLongtail(
}
defer existingStoreIndex.Dispose()

- changeVersionProgress := longtailutils.CreateProgress("Updating version", 2)
+ changeVersionProgress := longtailutils.CreateProgress("Updating version ", 1)
defer changeVersionProgress.Dispose()

// Try to change local version
@@ -309,6 +311,7 @@ func updateCurrentVersionFromLongtail(
fs,
jobs,
hashRegistry,
+ enableFileMapping,
&targetFolderScanner)

localVersionIndex, hash, _, err = targetIndexReader.Get()
@@ -338,7 +341,8 @@ func cloneOneVersion(
targetFilePath string,
sourceFilePath string,
sourceFileZipPath string,
- currentVersionIndex longtaillib.Longtail_VersionIndex) (longtaillib.Longtail_VersionIndex, error) {
+ currentVersionIndex longtaillib.Longtail_VersionIndex,
+ enableFileMapping bool) (longtaillib.Longtail_VersionIndex, error) {
const fname = "cloneOneVersion"

log := logrus.WithFields(logrus.Fields{
@@ -353,6 +357,7 @@
"targetFilePath": targetFilePath,
"sourceFilePath": sourceFilePath,
"sourceFileZipPath": sourceFileZipPath,
"enableFileMapping": enableFileMapping,
})
log.Debug(fname)

@@ -367,7 +372,20 @@

log.Infof("`%s` -> `%s`", sourceFilePath, targetFilePath)

- targetVersionIndex, hash, err := updateCurrentVersionFromLongtail(targetPath, currentVersionIndex, jobs, hashRegistry, fs, pathFilter, retainPermissions, sourceStore, sourceFilePath, sourceFileZipPath, targetBlockSize, maxChunksPerBlock)
+ targetVersionIndex, hash, err := updateCurrentVersionFromLongtail(
+     targetPath,
+     currentVersionIndex,
+     jobs,
+     hashRegistry,
+     fs,
+     pathFilter,
+     retainPermissions,
+     sourceStore,
+     enableFileMapping,
+     sourceFilePath,
+     sourceFileZipPath,
+     targetBlockSize,
+     maxChunksPerBlock)
if err != nil {
return targetVersionIndex, errors.Wrap(err, fname)
}
@@ -394,7 +412,7 @@ func cloneOneVersion(
defer versionMissingStoreIndex.Dispose()

if versionMissingStoreIndex.GetBlockCount() > 0 {
- writeContentProgress := longtailutils.CreateProgress("Writing content blocks", 2)
+ writeContentProgress := longtailutils.CreateProgress("Writing content blocks ", 1)

err = longtaillib.WriteContent(
fs,
@@ -473,7 +491,8 @@ func cloneStore(
hashing string,
compression string,
minBlockUsagePercent uint32,
- skipValidate bool) ([]longtailutils.StoreStat, []longtailutils.TimeStat, error) {
+ skipValidate bool,
+ enableFileMapping bool) ([]longtailutils.StoreStat, []longtailutils.TimeStat, error) {
const fname = "cloneStore"
log := logrus.WithFields(logrus.Fields{
"fname": fname,
@@ -514,7 +533,7 @@ func cloneStore(
localFS := longtaillib.CreateFSStorageAPI()
defer localFS.Dispose()

- sourceRemoteIndexStore, err := remotestore.CreateBlockStoreForURI(sourceStoreURI, "", jobs, numWorkerCount, 8388608, 1024, remotestore.ReadOnly)
+ sourceRemoteIndexStore, err := remotestore.CreateBlockStoreForURI(sourceStoreURI, "", jobs, numWorkerCount, 8388608, 1024, remotestore.ReadOnly, enableFileMapping)
if err != nil {
return storeStats, timeStats, errors.Wrap(err, fname)
}
@@ -524,7 +543,7 @@ func cloneStore(
var sourceCompressBlockStore longtaillib.Longtail_BlockStoreAPI

if len(localCachePath) > 0 {
- localIndexStore = longtaillib.CreateFSBlockStore(jobs, localFS, longtailutils.NormalizePath(localCachePath))
+ localIndexStore = longtaillib.CreateFSBlockStore(jobs, localFS, longtailutils.NormalizePath(localCachePath), enableFileMapping)

cacheBlockStore = longtaillib.CreateCacheBlockStore(jobs, localIndexStore, sourceRemoteIndexStore)

@@ -542,7 +561,7 @@ func cloneStore(
sourceStore := longtaillib.CreateShareBlockStore(sourceLRUBlockStore)
defer sourceStore.Dispose()

- targetRemoteStore, err := remotestore.CreateBlockStoreForURI(targetStoreURI, "", jobs, numWorkerCount, targetBlockSize, maxChunksPerBlock, remotestore.ReadWrite)
+ targetRemoteStore, err := remotestore.CreateBlockStoreForURI(targetStoreURI, "", jobs, numWorkerCount, targetBlockSize, maxChunksPerBlock, remotestore.ReadWrite, enableFileMapping)
if err != nil {
return storeStats, timeStats, errors.Wrap(err, fname)
}
@@ -613,7 +632,8 @@ func cloneStore(
targetFilePath,
sourceFilePath,
sourceFileZipPath,
- currentVersionIndex)
+ currentVersionIndex,
+ enableFileMapping)
currentVersionIndex.Dispose()
currentVersionIndex = newCurrentVersionIndex

@@ -653,6 +673,7 @@ type CloneStoreCmd struct {
HashingOption
CompressionOption
MinBlockUsagePercentOption
+ EnableFileMappingOption
}

func (r *CloneStoreCmd) Run(ctx *Context) error {
@@ -672,7 +693,8 @@ func (r *CloneStoreCmd) Run(ctx *Context) error {
r.Hashing,
r.Compression,
r.MinBlockUsagePercent,
- r.SkipValidate)
+ r.SkipValidate,
+ r.EnableFileMapping)
ctx.StoreStats = append(ctx.StoreStats, storeStats...)
ctx.TimeStats = append(ctx.TimeStats, timeStats...)
return err
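Every store constructor in this file now threads the flag through as its final argument. The sketch below is a hypothetical helper, not code from this commit: it assumes the golongtail import paths and reuses only call shapes visible in the hunks above, with error handling and `Dispose` bookkeeping trimmed.

```go
package example

import (
	"github.com/DanEngelbrecht/golongtail/longtaillib"
	"github.com/DanEngelbrecht/golongtail/remotestore"
)

// openReadOnlyStore mirrors the call sites above: the new enableFileMapping
// flag simply rides along as the final argument to the store constructor.
// NOTE: jobs must outlive the returned store and be disposed by the caller.
func openReadOnlyStore(uri string, workers int, enableFileMapping bool) (longtaillib.Longtail_BlockStoreAPI, error) {
	jobs := longtaillib.CreateBikeshedJobAPI(uint32(workers), 0)
	// 8388608 (8 MiB) and 1024 are the block-size / chunks-per-block values
	// used at the read-only call sites in this commit.
	return remotestore.CreateBlockStoreForURI(
		uri, "", jobs, workers, 8388608, 1024,
		remotestore.ReadOnly, enableFileMapping)
}
```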
26 changes: 15 additions & 11 deletions commands/cmd_cp.go
@@ -17,16 +17,18 @@ func cpVersionIndex(
versionIndexPath string,
localCachePath string,
sourcePath string,
- targetPath string) ([]longtailutils.StoreStat, []longtailutils.TimeStat, error) {
+ targetPath string,
+ enableFileMapping bool) ([]longtailutils.StoreStat, []longtailutils.TimeStat, error) {
const fname = "cpVersionIndex"
log := logrus.WithContext(context.Background()).WithFields(logrus.Fields{
"fname": fname,
"numWorkerCount": numWorkerCount,
"blobStoreURI": blobStoreURI,
"versionIndexPath": versionIndexPath,
"localCachePath": localCachePath,
"sourcePath": sourcePath,
"targetPath": targetPath,
"fname": fname,
"numWorkerCount": numWorkerCount,
"blobStoreURI": blobStoreURI,
"versionIndexPath": versionIndexPath,
"localCachePath": localCachePath,
"sourcePath": sourcePath,
"targetPath": targetPath,
"enableFileMapping": enableFileMapping,
})
log.Debug(fname)

@@ -43,7 +45,7 @@ func cpVersionIndex(
defer hashRegistry.Dispose()

// MaxBlockSize and MaxChunksPerBlock are just temporary values until we get the remote index settings
- remoteIndexStore, err := remotestore.CreateBlockStoreForURI(blobStoreURI, "", jobs, numWorkerCount, 8388608, 1024, remotestore.ReadOnly)
+ remoteIndexStore, err := remotestore.CreateBlockStoreForURI(blobStoreURI, "", jobs, numWorkerCount, 8388608, 1024, remotestore.ReadOnly, enableFileMapping)
if err != nil {
return storeStats, timeStats, errors.Wrap(err, fname)
}
@@ -59,7 +61,7 @@ func cpVersionIndex(
if localCachePath == "" {
compressBlockStore = longtaillib.CreateCompressBlockStore(remoteIndexStore, creg)
} else {
- localIndexStore = longtaillib.CreateFSBlockStore(jobs, localFS, longtailutils.NormalizePath(localCachePath))
+ localIndexStore = longtaillib.CreateFSBlockStore(jobs, localFS, longtailutils.NormalizePath(localCachePath), enableFileMapping)

cacheBlockStore = longtaillib.CreateCacheBlockStore(jobs, localIndexStore, remoteIndexStore)

@@ -208,6 +210,7 @@ type CpCmd struct {
CachePathOption
SourcePath string `name:"source path" arg:"" help:"Source path inside the version index to copy"`
TargetPath string `name:"target path" arg:"" help:"Target uri path"`
+ EnableFileMappingOption
}

func (r *CpCmd) Run(ctx *Context) error {
@@ -217,7 +220,8 @@ func (r *CpCmd) Run(ctx *Context) error {
r.VersionIndexPath,
r.CachePath,
r.SourcePath,
- r.TargetPath)
+ r.TargetPath,
+ r.EnableFileMapping)
ctx.StoreStats = append(ctx.StoreStats, storeStats...)
ctx.TimeStats = append(ctx.TimeStats, timeStats...)
return err
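Commands opt in by embedding `EnableFileMappingOption`, the same mixin pattern used by `CachePathOption` above, and read the value back as `r.EnableFileMapping`. The mixin's definition is not part of the hunks shown on this page; judging by the kong-style tags on `SourcePath` and `TargetPath`, a plausible reconstruction is:

```go
// Hypothetical reconstruction; the real definition lives elsewhere in the
// commands package. "Default off" comes from the release notes above.
type EnableFileMappingOption struct {
	EnableFileMapping bool `name:"enable-file-mapping" help:"Enable memory mapped files when reading blocks" default:"false"`
}
```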
2 changes: 1 addition & 1 deletion commands/cmd_createversionstoreindex.go
@@ -33,7 +33,7 @@ func createVersionStoreIndex(
jobs := longtaillib.CreateBikeshedJobAPI(uint32(numWorkerCount), 0)
defer jobs.Dispose()

- indexStore, err := remotestore.CreateBlockStoreForURI(blobStoreURI, "", jobs, numWorkerCount, 8388608, 1024, remotestore.ReadOnly)
+ indexStore, err := remotestore.CreateBlockStoreForURI(blobStoreURI, "", jobs, numWorkerCount, 8388608, 1024, remotestore.ReadOnly, false)
if err != nil {
return storeStats, timeStats, errors.Wrap(err, fname)
}
20 changes: 13 additions & 7 deletions commands/cmd_downsync.go
@@ -25,7 +25,8 @@ func downsync(
includeFilterRegEx string,
excludeFilterRegEx string,
scanTarget bool,
- cacheTargetIndex bool) ([]longtailutils.StoreStat, []longtailutils.TimeStat, error) {
+ cacheTargetIndex bool,
+ enableFileMapping bool) ([]longtailutils.StoreStat, []longtailutils.TimeStat, error) {
const fname = "downsync"
log := logrus.WithFields(logrus.Fields{
"fname": fname,
@@ -42,6 +43,7 @@
"excludeFilterRegEx": excludeFilterRegEx,
"scanTarget": scanTarget,
"cacheTargetIndex": cacheTargetIndex,
"enableFileMapping": enableFileMapping,
})
log.Debug(fname)

@@ -124,6 +126,7 @@ func downsync(
fs,
jobs,
hashRegistry,
+ enableFileMapping,
&targetFolderScanner)

creg := longtaillib.CreateFullCompressionRegistry()
@@ -133,7 +136,7 @@ func downsync(
defer localFS.Dispose()

// MaxBlockSize and MaxChunksPerBlock are just temporary values until we get the remote index settings
- remoteIndexStore, err := remotestore.CreateBlockStoreForURI(blobStoreURI, versionLocalStoreIndexPath, jobs, numWorkerCount, 8388608, 1024, remotestore.ReadOnly)
+ remoteIndexStore, err := remotestore.CreateBlockStoreForURI(blobStoreURI, versionLocalStoreIndexPath, jobs, numWorkerCount, 8388608, 1024, remotestore.ReadOnly, enableFileMapping)
if err != nil {
return storeStats, timeStats, errors.Wrap(err, fname)
}
@@ -146,7 +149,7 @@ func downsync(
if localCachePath == "" {
compressBlockStore = longtaillib.CreateCompressBlockStore(remoteIndexStore, creg)
} else {
- localIndexStore = longtaillib.CreateFSBlockStore(jobs, localFS, longtailutils.NormalizePath(localCachePath))
+ localIndexStore = longtaillib.CreateFSBlockStore(jobs, localFS, longtailutils.NormalizePath(localCachePath), enableFileMapping)

cacheBlockStore = longtaillib.CreateCacheBlockStore(jobs, localIndexStore, remoteIndexStore)

@@ -213,7 +216,7 @@ func downsync(
}

changeVersionStartTime := time.Now()
- changeVersionProgress := longtailutils.CreateProgress("Updating version", 2)
+ changeVersionProgress := longtailutils.CreateProgress("Updating version ", 1)
defer changeVersionProgress.Dispose()
err = longtaillib.ChangeVersion(
indexStore,
@@ -293,7 +296,7 @@ func downsync(
chunker := longtaillib.CreateHPCDCChunkerAPI()
defer chunker.Dispose()

- createVersionIndexProgress := longtailutils.CreateProgress("Validating version", 2)
+ createVersionIndexProgress := longtailutils.CreateProgress("Validating version ", 1)
defer createVersionIndexProgress.Dispose()
validateVersionIndex, err := longtaillib.CreateVersionIndex(
fs,
@@ -304,7 +307,8 @@ func downsync(
longtailutils.NormalizePath(resolvedTargetFolderPath),
validateFileInfos,
nil,
- targetChunkSize)
+ targetChunkSize,
+ enableFileMapping)
if err != nil {
err = errors.Wrapf(err, "Failed to create version index for `%s`", resolvedTargetFolderPath)
return storeStats, timeStats, errors.Wrap(err, fname)
@@ -378,6 +382,7 @@ type DownsyncCmd struct {
TargetPathExcludeRegExOption
ScanTargetOption
CacheTargetIndexOption
+ EnableFileMappingOption
}

func (r *DownsyncCmd) Run(ctx *Context) error {
@@ -394,7 +399,8 @@ func (r *DownsyncCmd) Run(ctx *Context) error {
r.IncludeFilterRegEx,
r.ExcludeFilterRegEx,
r.ScanTarget,
- r.CacheTargetIndex)
+ r.CacheTargetIndex,
+ r.EnableFileMapping)
ctx.StoreStats = append(ctx.StoreStats, storeStats...)
ctx.TimeStats = append(ctx.TimeStats, timeStats...)
return err
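Taken together, the downsync hunks show the flag reaching three layers: the remote block store, the optional local file-system cache, and version-index creation when validating the target. Below is a condensed sketch of the cached store stack, assuming the golongtail import paths and using only constructor shapes from this diff; the final compress-store wrapping is inferred from the no-cache branch, and disposal plus the no-cache path are omitted.

```go
package example

import (
	"github.com/DanEngelbrecht/golongtail/longtaillib"
	"github.com/DanEngelbrecht/golongtail/longtailutils"
	"github.com/DanEngelbrecht/golongtail/remotestore"
)

// buildCachedStore stacks block stores the way downsync does when a local
// cache path is set; enableFileMapping reaches both stores that read real
// files, while the cache and compression wrappers take no such flag.
func buildCachedStore(blobStoreURI, localCachePath string, workers int, enableFileMapping bool) (longtaillib.Longtail_BlockStoreAPI, error) {
	jobs := longtaillib.CreateBikeshedJobAPI(uint32(workers), 0)
	localFS := longtaillib.CreateFSStorageAPI()
	creg := longtaillib.CreateFullCompressionRegistry()

	// Remote store: mapping affects reads of fetched block files.
	remoteIndexStore, err := remotestore.CreateBlockStoreForURI(
		blobStoreURI, "", jobs, workers, 8388608, 1024,
		remotestore.ReadOnly, enableFileMapping)
	if err != nil {
		return longtaillib.Longtail_BlockStoreAPI{}, err
	}

	// Local FS cache: same flag, same meaning.
	localIndexStore := longtaillib.CreateFSBlockStore(
		jobs, localFS, longtailutils.NormalizePath(localCachePath), enableFileMapping)
	cacheBlockStore := longtaillib.CreateCacheBlockStore(jobs, localIndexStore, remoteIndexStore)

	// Inferred: wrap for decompression, as the no-cache branch does with
	// CreateCompressBlockStore(remoteIndexStore, creg).
	return longtaillib.CreateCompressBlockStore(cacheBlockStore, creg), nil
}
```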
11 changes: 8 additions & 3 deletions commands/cmd_get.go
@@ -22,7 +22,8 @@ func get(
includeFilterRegEx string,
excludeFilterRegEx string,
scanTarget bool,
- cacheTargetIndex bool) ([]longtailutils.StoreStat, []longtailutils.TimeStat, error) {
+ cacheTargetIndex bool,
+ enableFileMapping bool) ([]longtailutils.StoreStat, []longtailutils.TimeStat, error) {
const fname = "get"
log := logrus.WithFields(logrus.Fields{
"fname": fname,
@@ -37,6 +38,7 @@
"excludeFilterRegEx": excludeFilterRegEx,
"scanTarget": scanTarget,
"cacheTargetIndex": cacheTargetIndex,
"enableFileMapping": enableFileMapping,
})
log.Debug(fname)

@@ -88,7 +90,8 @@ func get(
includeFilterRegEx,
excludeFilterRegEx,
scanTarget,
- cacheTargetIndex)
+ cacheTargetIndex,
+ enableFileMapping)

storeStats = append(storeStats, downSyncStoreStats...)
timeStats = append(timeStats, downSyncTimeStats...)
@@ -108,6 +111,7 @@ type GetCmd struct {
TargetPathExcludeRegExOption
ScanTargetOption
CacheTargetIndexOption
+ EnableFileMappingOption
}

func (r *GetCmd) Run(ctx *Context) error {
@@ -122,7 +126,8 @@ func (r *GetCmd) Run(ctx *Context) error {
r.IncludeFilterRegEx,
r.ExcludeFilterRegEx,
r.ScanTarget,
- r.CacheTargetIndex)
+ r.CacheTargetIndex,
+ r.EnableFileMapping)
ctx.StoreStats = append(ctx.StoreStats, storeStats...)
ctx.TimeStats = append(ctx.TimeStats, timeStats...)
return err
2 changes: 1 addition & 1 deletion commands/cmd_initremotestore.go
@@ -31,7 +31,7 @@ func initRemoteStore(
jobs := longtaillib.CreateBikeshedJobAPI(uint32(numWorkerCount), 0)
defer jobs.Dispose()

- remoteIndexStore, err := remotestore.CreateBlockStoreForURI(blobStoreURI, "", jobs, numWorkerCount, 8388608, 1024, remotestore.Init)
+ remoteIndexStore, err := remotestore.CreateBlockStoreForURI(blobStoreURI, "", jobs, numWorkerCount, 8388608, 1024, remotestore.Init, false)
if err != nil {
return storeStats, timeStats, errors.Wrap(err, fname)
}
2 changes: 1 addition & 1 deletion commands/cmd_ls.go
@@ -57,7 +57,7 @@ func ls(
fakeBlockStoreFS := longtaillib.CreateInMemStorageAPI()
defer fakeBlockStoreFS.Dispose()

- fakeBlockStore := longtaillib.CreateFSBlockStore(jobs, fakeBlockStoreFS, "store")
+ fakeBlockStore := longtaillib.CreateFSBlockStore(jobs, fakeBlockStoreFS, "store", false)
defer fakeBlockStore.Dispose()

storeIndex, err := longtaillib.CreateStoreIndex(
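Note that the commands that never read block data from real files pass `false` outright: `ls` runs against an in-memory fake store, and `init-remote-store` and `create-version-store-index` only manipulate store indexes, so memory mapping presumably has nothing to offer them.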
(19 of the 27 changed files are not shown here.)
