Skip to content

Commit

Permalink
fixed PutStoreLSI sometimes deleting unmerged source lsis
Browse files Browse the repository at this point in the history
  • Loading branch information
DanEngelbrecht committed Apr 11, 2023
1 parent df26e5a commit f10bfc1
Show file tree
Hide file tree
Showing 2 changed files with 124 additions and 167 deletions.
217 changes: 104 additions & 113 deletions remotestore/lsistore.go
Original file line number Diff line number Diff line change
Expand Up @@ -95,13 +95,49 @@ func OverwriteStoreLSI(ctx context.Context, remoteStore longtailstorelib.BlobSto
return nil
}

func PutStoreLSI(ctx context.Context, remoteStore longtailstorelib.BlobStore, localStore *longtailstorelib.BlobStore, LSI longtaillib.Longtail_StoreIndex, maxStoreIndexSize int64) (longtaillib.Longtail_StoreIndex, error) {
func mergeLSIs(LSIs []longtaillib.Longtail_StoreIndex) (longtaillib.Longtail_StoreIndex, error) {
const fname = "mergeLSIs"
log := logrus.WithFields(logrus.Fields{
"fname": fname,
"LSIs": LSIs,
})
log.Debug(fname)

if len(LSIs) < 2 {
return longtaillib.Longtail_StoreIndex{}, errors.Wrap(errors.New("need at least two indexes to merge"), fname)
}

result := LSIs[0]
disposeResult := false
defer func() {
if disposeResult {
result.Dispose()
}
}()

for i := 1; i < len(LSIs); i++ {
mergedLSI, err := longtaillib.MergeStoreIndex(result, LSIs[i])
if err != nil {
return longtaillib.Longtail_StoreIndex{}, errors.Wrap(err, fname)
}
if disposeResult {
result.Dispose()
}
result = mergedLSI
disposeResult = true
}

disposeResult = false
return result, nil
}

func PutStoreLSI(ctx context.Context, remoteStore longtailstorelib.BlobStore, localStore *longtailstorelib.BlobStore, newLSI longtaillib.Longtail_StoreIndex, maxStoreIndexSize int64) (longtaillib.Longtail_StoreIndex, error) {
const fname = "PutStoreLSI"
log := logrus.WithFields(logrus.Fields{
"fname": fname,
"ctx": ctx,
"remoteStore": remoteStore,
"LSI": LSI,
"LSI": newLSI,
"maxStoreIndexSize": maxStoreIndexSize,
})
log.Debug(fname)
Expand All @@ -124,11 +160,11 @@ func PutStoreLSI(ctx context.Context, remoteStore longtailstorelib.BlobStore, lo
}
}()

ConsolidatedLSI := longtaillib.Longtail_StoreIndex{}
DisposeConsolidatedLSI := true
ResultLSI := newLSI
DisposeResultLSI := false
defer func() {
if DisposeConsolidatedLSI {
ConsolidatedLSI.Dispose()
if DisposeResultLSI {
ResultLSI.Dispose()
}
}()

Expand All @@ -139,22 +175,16 @@ func PutStoreLSI(ctx context.Context, remoteStore longtailstorelib.BlobStore, lo
for i := 0; i < len(LSIs); i++ {
lsiSize := LSIs[i].LSI.GetSize()
if lsiSize > maxStoreIndexSize {
unmergedLSIs = append(unmergedLSIs, i)
continue
for i < len(LSIs) {
unmergedLSIs = append(unmergedLSIs, i)
i++
}
break
}

MergedLSI := longtaillib.Longtail_StoreIndex{}

if ConsolidatedLSI.IsValid() {
MergedLSI, err = longtaillib.MergeStoreIndex(ConsolidatedLSI, LSIs[i].LSI)
if err != nil {
return longtaillib.Longtail_StoreIndex{}, errors.Wrap(err, fname)
}
} else {
MergedLSI, err = longtaillib.MergeStoreIndex(LSI, LSIs[i].LSI)
if err != nil {
return longtaillib.Longtail_StoreIndex{}, errors.Wrap(err, fname)
}
MergedLSI, err := longtaillib.MergeStoreIndex(ResultLSI, LSIs[i].LSI)
if err != nil {
return longtaillib.Longtail_StoreIndex{}, errors.Wrap(err, fname)
}

if MergedLSI.GetSize() > maxStoreIndexSize {
Expand All @@ -163,23 +193,22 @@ func PutStoreLSI(ctx context.Context, remoteStore longtailstorelib.BlobStore, lo
continue
}

ConsolidatedLSI.Dispose()
ConsolidatedLSI = MergedLSI
if DisposeResultLSI {
ResultLSI.Dispose()
}
ResultLSI = MergedLSI
DisposeResultLSI = true
log.Debugf("merged store index `%s`", LSIs[i].Name)
mergedLSIs = append(mergedLSIs, i)

// We don't need the source LSI anymore
LSIs[i].LSI.Dispose()
mergedLSIs = append(mergedLSIs, i)
}
}

log.Debugf("merged in %d store indexes, leaving %d unmerged", len(mergedLSIs), len(unmergedLSIs))

var buffer longtaillib.NativeBuffer
if ConsolidatedLSI.IsValid() {
buffer, err = longtaillib.WriteStoreIndexToBuffer(ConsolidatedLSI)
} else {
buffer, err = longtaillib.WriteStoreIndexToBuffer(LSI)
}
buffer, err := longtaillib.WriteStoreIndexToBuffer(ResultLSI)
if err != nil {
return longtaillib.Longtail_StoreIndex{}, errors.Wrap(err, fname)
}
Expand All @@ -202,68 +231,41 @@ func PutStoreLSI(ctx context.Context, remoteStore longtailstorelib.BlobStore, lo
}
log.Debugf("stored new store index `%s`", newName)

success := false

defer func() {
if success {
for Index := range mergedLSIs {
if LSIs[Index].Name == newName {
continue
}
_, err = longtailutils.DeleteBlobWithRetry(ctx, remoteClient, LSIs[Index].Name)
if err != nil && !longtaillib.IsNotExist(err) {
log.WithError(err).Warnf("failed to delete `%s` from store `%s`", LSIs[Index].Name, remoteClient.String())
}
{
for _, Index := range mergedLSIs {
if LSIs[Index].Name == newName {
continue
}
_, err = longtailutils.DeleteBlobWithRetry(ctx, remoteClient, LSIs[Index].Name)
if err == nil || longtaillib.IsNotExist(err) {
log.Debugf("deleted merged store index `%s`", LSIs[Index].Name)
continue
}
log.WithError(err).Warnf("failed to delete `%s` from store `%s`", LSIs[Index].Name, remoteClient.String())
}
}()
}

if len(unmergedLSIs) == 0 {
if ConsolidatedLSI.IsValid() {
DisposeConsolidatedLSI = false
success = true
return ConsolidatedLSI, nil
if DisposeResultLSI {
DisposeResultLSI = false
return ResultLSI, nil
}
result, err := LSI.Copy()
CopyLSI, err := ResultLSI.Copy()
if err != nil {
return longtaillib.Longtail_StoreIndex{}, errors.Wrap(err, fname)
}
success = true
return result, nil
}

result := longtaillib.Longtail_StoreIndex{}
if ConsolidatedLSI.IsValid() {
result = ConsolidatedLSI
} else {
result = LSI
return CopyLSI, nil
}

disposeResult := false
defer func() {
if disposeResult {
result.Dispose()
}
}()

toMerge := []longtaillib.Longtail_StoreIndex{ResultLSI}
for _, i := range unmergedLSIs {
mergedLSI, err := longtaillib.MergeStoreIndex(result, LSIs[i].LSI)
if err != nil {
return longtaillib.Longtail_StoreIndex{}, errors.Wrap(err, fname)
}
if disposeResult {
result.Dispose()
}
result = mergedLSI
LSIs[i].LSI.Dispose()
disposeResult = true
log.Debugf("merged in store index `%s` into result", LSIs[i].Name)
toMerge = append(toMerge, LSIs[i].LSI)
}

disposeResult = false
success = true
return result, nil
fullLSI, err := mergeLSIs(toMerge)
if err != nil {
return longtaillib.Longtail_StoreIndex{}, errors.Wrap(err, fname)
}
return fullLSI, nil
}

type LSIEntry struct {
Expand Down Expand Up @@ -387,8 +389,7 @@ func GetStoreLSIs(ctx context.Context, remoteStore longtailstorelib.BlobStore, l

// Download all LSIs not in local cache from remote and store them locally
for _, newLSIName := range newLSIs {
// TODO: We probably need to limit the number of goroutines we spawn ere as each one
// wil do a network request
// TODO: We probably need to limit the number of goroutines we spawn ere as each one will do a network request
go func(ctx context.Context, remoteStore longtailstorelib.BlobStore, localStore *longtailstorelib.BlobStore, lsiName string, resultChan chan Result) {
remoteClient, err := remoteStore.NewClient(ctx)
if err != nil {
Expand Down Expand Up @@ -443,37 +444,28 @@ func GetStoreLSIs(ctx context.Context, remoteStore longtailstorelib.BlobStore, l
if err == nil {
log.WithFields(logrus.Fields{
"error": LSIResult.Error,
}).Warn("failed reading remote lsi")
"lsi": LSIResult.Entry.Name,
}).Debug("remote lsi was removed before reading")
err = LSIResult.Error
}
} else if err == nil || longtaillib.IsNotExist(err) {
continue
}

if err == nil || longtaillib.IsNotExist(err) {
log.WithFields(logrus.Fields{
"error": LSIResult.Error,
"lsi": LSIResult.Entry.Name,
}).Error("failed reading remote lsi")
err = LSIResult.Error
}
continue
}
LSIs = append(LSIs, LSIResult.Entry)
}
if err != nil {
return nil, errors.Wrap(err, fname)
}

// remoteLSIs2, err := remoteClient.GetObjects("store", ".lsi")
// if err != nil && !longtaillib.IsNotExist(err) {
// return nil, errors.Wrap(err, fname)
// }
// log.Debugf("post result, found %d store indexes in remote store", len(remoteLSIs2))
// sort.Slice(remoteLSIs2, func(i, j int) bool { return remoteLSIs2[i].Name < remoteLSIs2[j].Name })
// if len(remoteLSIs) != len(remoteLSIs2) {
// return nil, errors.Wrap(os.ErrNotExist, "remote store indexes have changed during get")
// }
// for i := range remoteLSIs {
// if remoteLSIs[i].Name != remoteLSIs2[i].Name {
// return nil, errors.Wrap(os.ErrNotExist, "remote store indexes have changed during get")
// }
// }

success = true
log.Debugf("found %d store indexes", len(LSIs))
return LSIs, nil
Expand All @@ -499,27 +491,26 @@ func GetStoreLSI(ctx context.Context, remoteStore longtailstorelib.BlobStore, lo
if err != nil {
return longtaillib.Longtail_StoreIndex{}, errors.Wrap(err, fname)
}
if len(LSIs) == 0 {
return longtaillib.CreateStoreIndexFromBlocks([]longtaillib.Longtail_BlockIndex{})
}
if len(LSIs) == 1 {
return LSIs[0].LSI, nil
}

defer func() {
for _, LSI := range LSIs {
LSI.LSI.Dispose()
}
}()

if len(LSIs) > 0 {
result := LSIs[0].LSI
for i := 1; i < len(LSIs); i++ {
newLSI, err := longtaillib.MergeStoreIndex(result, LSIs[i].LSI)
if err != nil {
return longtaillib.Longtail_StoreIndex{}, errors.Wrap(err, fname)
}
LSIs[i].LSI.Dispose()
result.Dispose()
result = newLSI
}

// Reset reference first LSI as we will either dispose it (at merge) or return it
LSIs[0].LSI = longtaillib.Longtail_StoreIndex{}
return result, nil
toMerge := []longtaillib.Longtail_StoreIndex{}
for i := 0; i < len(LSIs); i++ {
toMerge = append(toMerge, LSIs[i].LSI)
}
fullLSI, err := mergeLSIs(toMerge)
if err != nil {
return longtaillib.Longtail_StoreIndex{}, errors.Wrap(err, fname)
}
return longtaillib.CreateStoreIndexFromBlocks([]longtaillib.Longtail_BlockIndex{})
return fullLSI, nil
}
Loading

0 comments on commit f10bfc1

Please sign in to comment.