Skip to content

Commit

Permalink
fix error handling on failing to read store index (#241)
Browse files Browse the repository at this point in the history
* Gracefully handle non-existing store index in contentIndexWorker
* Don't do a fatal exit early if remotestore workers fail, log an error instead
* Don't exit contentIndexWorker if failing to read store index, just return proper error
* Add test with broken store.lsi file
* Clearer logging when failing to read remote data
* changelog
  • Loading branch information
DanEngelbrecht authored Feb 19, 2023
1 parent 0e62ff1 commit 6700717
Show file tree
Hide file tree
Showing 4 changed files with 120 additions and 127 deletions.
2 changes: 2 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
##
- **CHANGED** Only `init` operation will try to rebuild the store index from blocks
- **ADDED** Added retry logic when reading remote store index
- **FIXED** Don't do fatal exit if reading store index fails, just report error back to caller and log error at exit
- **FIXED** Gracefully handle missing store.lsi condition as separate from failing to read existing store index
- **UPDATED** All golang module dependencies updated

## v0.3.6
Expand Down
22 changes: 22 additions & 0 deletions commands/cmd_upsync_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package commands

import (
"io/ioutil"
"os"
"testing"
)

Expand Down Expand Up @@ -42,6 +43,27 @@ func TestUpsyncWithLSI(t *testing.T) {
}
}

func TestUpsyncWithBrokenLSI(t *testing.T) {
testPath, _ := ioutil.TempDir("", "test")
fsBlobPathPrefix := "fsblob://" + testPath
createVersionData(t, fsBlobPathPrefix)

data := [11]uint8{8, 21, 141, 3, 1, 4, 124, 213, 1, 23, 123}
err := os.MkdirAll(fsBlobPathPrefix[9:]+"/storage", 0777)
if err != nil {
t.Errorf("%s", err)
}
err = ioutil.WriteFile(fsBlobPathPrefix[9:]+"/storage/store.lsi", data[:11], 0644)
if err != nil {
t.Errorf("%s", err)
}

cmd, err := executeCommandLine("upsync", "--source-path", testPath+"/version/v1", "--target-path", fsBlobPathPrefix+"/index/v1.lvi", "--storage-uri", fsBlobPathPrefix+"/storage", "--version-local-store-index-path", fsBlobPathPrefix+"/index/v1.lsi")
if err == nil {
t.Errorf("%s", cmd)
}
}

//func TestUpsyncWithGetConfig(t *testing.T) {
// testPath, _ := ioutil.TempDir("", "test")
// fsBlobPathPrefix := "fsblob://" + testPath
Expand Down
2 changes: 1 addition & 1 deletion longtailutils/longtailutils.go
Original file line number Diff line number Diff line change
Expand Up @@ -421,7 +421,7 @@ func ReadBlobWithRetry(
return nil, retryCount, errors.Wrap(err, fname)
}
if !exists {
err = errors.Wrap(longtaillib.NotExistErr(), fmt.Sprintf("%s does not exist", key))
err = errors.Wrap(longtaillib.NotExistErr(), fmt.Sprintf("%s/%s does not exist", client.String(), key))
return nil, retryCount, errors.Wrap(err, fname)
}
retryDelay := []time.Duration{0, 100 * time.Millisecond, 250 * time.Millisecond, 500 * time.Millisecond, 1 * time.Second, 2 * time.Second}
Expand Down
221 changes: 95 additions & 126 deletions remotestore/remotestore.go
Original file line number Diff line number Diff line change
Expand Up @@ -596,33 +596,6 @@ func addBlocksToRemoteStoreIndex(
return addToRemoteStoreIndex(ctx, blobClient, addedStoreIndex)
}

func storeIndexWorkerReplyErrorState(
blockIndexMessages <-chan blockIndexMessage,
getExistingContentMessages <-chan getExistingContentMessage,
pruneBlocksMessages <-chan pruneBlocksMessage,
flushMessages <-chan int,
flushReplyMessages chan<- error) {
const fname = "storeIndexWorkerReplyErrorState"
log := logrus.WithFields(logrus.Fields{
"fname": fname,
})
log.Debug(fname)
for {
select {
case <-flushMessages:
flushReplyMessages <- nil
case _, more := <-blockIndexMessages:
if !more {
return
}
case getExistingContentMessage := <-getExistingContentMessages:
getExistingContentMessage.asyncCompleteAPI.OnComplete(longtaillib.Longtail_StoreIndex{}, errors.Wrap(longtaillib.InvalidArgumentError(), fname))
case pruneBlocksMessage := <-pruneBlocksMessages:
pruneBlocksMessage.asyncCompleteAPI.OnComplete(0, errors.Wrap(longtaillib.InvalidArgumentError(), fname))
}
}
}

func onPreflighMessage(
s *remoteStore,
storeIndex longtaillib.Longtail_StoreIndex,
Expand Down Expand Up @@ -767,7 +740,21 @@ func contentIndexWorker(
log.Debug(fname)
client, err := s.blobStore.NewClient(ctx)
if err != nil {
storeIndexWorkerReplyErrorState(blockIndexMessages, getExistingContentMessages, pruneBlocksMessages, flushMessages, flushReplyMessages)
out:
for {
select {
case <-flushMessages:
flushReplyMessages <- nil
case _, more := <-blockIndexMessages:
if !more {
break out
}
case getExistingContentMessage := <-getExistingContentMessages:
getExistingContentMessage.asyncCompleteAPI.OnComplete(longtaillib.Longtail_StoreIndex{}, errors.Wrap(longtaillib.InvalidArgumentError(), fname))
case pruneBlocksMessage := <-pruneBlocksMessages:
pruneBlocksMessage.asyncCompleteAPI.OnComplete(0, errors.Wrap(longtaillib.InvalidArgumentError(), fname))
}
}
return errors.Wrap(err, fname)
}
defer client.Close()
Expand All @@ -792,10 +779,7 @@ func contentIndexWorker(
if err != nil {
storeIndex.Dispose()
preflightGetMsg.asyncCompleteAPI.OnComplete([]uint64{}, errors.Wrap(err, fname))
storeIndexWorkerReplyErrorState(blockIndexMessages, getExistingContentMessages, pruneBlocksMessages, flushMessages, flushReplyMessages)
return errors.Wrap(err, fname)
}
if updatedStoreIndex.IsValid() {
} else if updatedStoreIndex.IsValid() {
onPreflighMessage(s, updatedStoreIndex, preflightGetMsg, prefetchBlockMessages)
updatedStoreIndex.Dispose()
} else {
Expand All @@ -814,10 +798,7 @@ func contentIndexWorker(
if err != nil {
storeIndex.Dispose()
getExistingContentMessage.asyncCompleteAPI.OnComplete(longtaillib.Longtail_StoreIndex{}, errors.Wrap(err, fname))
storeIndexWorkerReplyErrorState(blockIndexMessages, getExistingContentMessages, pruneBlocksMessages, flushMessages, flushReplyMessages)
return errors.Wrap(err, fname)
}
if accessType == ReadOnly {
} else if accessType == ReadOnly {
go func() {
onGetExistingContentMessage(s, storeIndex, getExistingContentMessage)
}()
Expand All @@ -837,22 +818,21 @@ func contentIndexWorker(
if err != nil {
storeIndex.Dispose()
pruneBlocksMessage.asyncCompleteAPI.OnComplete(0, errors.Wrap(err, fname))
storeIndexWorkerReplyErrorState(blockIndexMessages, getExistingContentMessages, pruneBlocksMessages, flushMessages, flushReplyMessages)
return errors.Wrap(err, fname)
}
prunedCount := uint32(0)
prunedStoreIndex := longtaillib.Longtail_StoreIndex{}
if updatedStoreIndex.IsValid() {
prunedCount, prunedStoreIndex, err = onPruneBlocksMessage(ctx, s, client, updatedStoreIndex, pruneBlocksMessage.keepBlockHashes)
updatedStoreIndex.Dispose()
} else {
prunedCount, prunedStoreIndex, err = onPruneBlocksMessage(ctx, s, client, storeIndex, pruneBlocksMessage.keepBlockHashes)
}
if prunedStoreIndex.IsValid() {
storeIndex.Dispose()
storeIndex = prunedStoreIndex
prunedCount := uint32(0)
prunedStoreIndex := longtaillib.Longtail_StoreIndex{}
if updatedStoreIndex.IsValid() {
prunedCount, prunedStoreIndex, err = onPruneBlocksMessage(ctx, s, client, updatedStoreIndex, pruneBlocksMessage.keepBlockHashes)
updatedStoreIndex.Dispose()
} else {
prunedCount, prunedStoreIndex, err = onPruneBlocksMessage(ctx, s, client, storeIndex, pruneBlocksMessage.keepBlockHashes)
}
if prunedStoreIndex.IsValid() {
storeIndex.Dispose()
storeIndex = prunedStoreIndex
}
pruneBlocksMessage.asyncCompleteAPI.OnComplete(prunedCount, errors.Wrap(err, fname))
}
pruneBlocksMessage.asyncCompleteAPI.OnComplete(prunedCount, errors.Wrap(err, fname))
default:
}

Expand Down Expand Up @@ -880,10 +860,7 @@ func contentIndexWorker(
if err != nil {
storeIndex.Dispose()
preflightGetMsg.asyncCompleteAPI.OnComplete([]uint64{}, errors.Wrap(err, fname))
storeIndexWorkerReplyErrorState(blockIndexMessages, getExistingContentMessages, pruneBlocksMessages, flushMessages, flushReplyMessages)
return errors.Wrap(err, fname)
}
if updatedStoreIndex.IsValid() {
} else if updatedStoreIndex.IsValid() {
onPreflighMessage(s, updatedStoreIndex, preflightGetMsg, prefetchBlockMessages)
updatedStoreIndex.Dispose()
} else {
Expand All @@ -900,10 +877,7 @@ func contentIndexWorker(
if err != nil {
storeIndex.Dispose()
getExistingContentMessage.asyncCompleteAPI.OnComplete(longtaillib.Longtail_StoreIndex{}, errors.Wrap(err, fname))
storeIndexWorkerReplyErrorState(blockIndexMessages, getExistingContentMessages, pruneBlocksMessages, flushMessages, flushReplyMessages)
return errors.Wrap(err, fname)
}
if accessType == ReadOnly {
} else if accessType == ReadOnly {
go func() {
onGetExistingContentMessage(s, storeIndex, getExistingContentMessage)
}()
Expand All @@ -922,22 +896,21 @@ func contentIndexWorker(
if err != nil {
storeIndex.Dispose()
pruneBlocksMessage.asyncCompleteAPI.OnComplete(0, errors.Wrap(err, fname))
storeIndexWorkerReplyErrorState(blockIndexMessages, getExistingContentMessages, pruneBlocksMessages, flushMessages, flushReplyMessages)
return errors.Wrap(err, fname)
}
prunedCount := uint32(0)
prunedStoreIndex := longtaillib.Longtail_StoreIndex{}
if updatedStoreIndex.IsValid() {
prunedCount, prunedStoreIndex, err = onPruneBlocksMessage(ctx, s, client, updatedStoreIndex, pruneBlocksMessage.keepBlockHashes)
updatedStoreIndex.Dispose()
} else {
prunedCount, prunedStoreIndex, err = onPruneBlocksMessage(ctx, s, client, storeIndex, pruneBlocksMessage.keepBlockHashes)
}
if prunedStoreIndex.IsValid() {
storeIndex.Dispose()
storeIndex = prunedStoreIndex
prunedCount := uint32(0)
prunedStoreIndex := longtaillib.Longtail_StoreIndex{}
if updatedStoreIndex.IsValid() {
prunedCount, prunedStoreIndex, err = onPruneBlocksMessage(ctx, s, client, updatedStoreIndex, pruneBlocksMessage.keepBlockHashes)
updatedStoreIndex.Dispose()
} else {
prunedCount, prunedStoreIndex, err = onPruneBlocksMessage(ctx, s, client, storeIndex, pruneBlocksMessage.keepBlockHashes)
}
if prunedStoreIndex.IsValid() {
storeIndex.Dispose()
storeIndex = prunedStoreIndex
}
pruneBlocksMessage.asyncCompleteAPI.OnComplete(prunedCount, errors.Wrap(err, fname))
}
pruneBlocksMessage.asyncCompleteAPI.OnComplete(prunedCount, errors.Wrap(err, fname))
}
}

Expand Down Expand Up @@ -1112,13 +1085,13 @@ func (s *remoteStore) Close() {
for i := 0; i < s.workerCount; i++ {
err := <-s.workerErrorChan
if err != nil {
logrus.Fatal(err)
logrus.Error(err)
}
}
close(s.blockIndexChan)
err := <-s.workerErrorChan
if err != nil {
logrus.Fatal(err)
logrus.Error(err)
}

s.defaultClient.Close()
Expand Down Expand Up @@ -1156,22 +1129,22 @@ func tryAddRemoteStoreIndexWithLocking(

remoteStoreIndex, err := longtaillib.ReadStoreIndexFromBuffer(blob)
if err != nil {
err = errors.Wrap(err, fmt.Sprintf("Cant parse store index from `%s`", key))
err = errors.Wrap(err, fmt.Sprintf("Cant parse store index from `%s/%s`", blobClient.String(), key))
return false, longtaillib.Longtail_StoreIndex{}, errors.Wrap(err, fname)
}
defer remoteStoreIndex.Dispose()
log.WithFields(logrus.Fields{"path": objHandle.String(), "bytes": len(blob)}).Info("read store index")

newStoreIndex, err := longtaillib.MergeStoreIndex(remoteStoreIndex, addStoreIndex)
if err != nil {
err = errors.Wrap(err, fmt.Sprintf("Failed merging store index for `%s`", key))
err = errors.Wrap(err, fmt.Sprintf("Failed merging store index for `%s/%s`", blobClient.String(), key))
return false, longtaillib.Longtail_StoreIndex{}, errors.Wrap(err, fname)
}

storeBlob, err := longtaillib.WriteStoreIndexToBuffer(newStoreIndex)
if err != nil {
newStoreIndex.Dispose()
err = errors.Wrap(err, fmt.Sprintf("Failed serializing store index for `%s`", key))
err = errors.Wrap(err, fmt.Sprintf("Failed serializing store index for `%s/%s`", blobClient.String(), key))
return false, longtaillib.Longtail_StoreIndex{}, errors.Wrap(err, fname)
}
defer storeBlob.Dispose()
Expand All @@ -1190,7 +1163,7 @@ func tryAddRemoteStoreIndexWithLocking(
}
storeBlob, err := longtaillib.WriteStoreIndexToBuffer(addStoreIndex)
if err != nil {
err = errors.Wrap(err, fmt.Sprintf("Failed serializing store index for `%s`", key))
err = errors.Wrap(err, fmt.Sprintf("Failed serializing store index for `%s/%s`", blobClient.String(), key))
return false, longtaillib.Longtail_StoreIndex{}, errors.Wrap(err, fname)
}
defer storeBlob.Dispose()
Expand Down Expand Up @@ -1665,12 +1638,12 @@ func readStoreStoreIndexFromPath(
return longtaillib.Longtail_StoreIndex{}, errors.Wrapf(err, fname)
}
if len(blobData) == 0 {
err = errors.Wrap(os.ErrNotExist, fmt.Sprintf("%s contains no data", key))
err = errors.Wrap(os.ErrNotExist, fmt.Sprintf("%s/%s contains no data", client.String(), key))
return longtaillib.Longtail_StoreIndex{}, errors.Wrap(err, fname)
}
storeIndex, err := longtaillib.ReadStoreIndexFromBuffer(blobData)
if err != nil {
err = errors.Wrap(err, fmt.Sprintf("Cant parse store index from `%s`", key))
err = errors.Wrap(err, fmt.Sprintf("Cant parse store index from `%s/%s`", client.String(), key))
return longtaillib.Longtail_StoreIndex{}, errors.Wrapf(err, fname)
}
return storeIndex, nil
Expand Down Expand Up @@ -1885,51 +1858,6 @@ func readRemoteStoreIndex(

var err error
var storeIndex longtaillib.Longtail_StoreIndex
if accessType != Init {
if accessType == ReadOnly && len(optionalStoreIndexPaths) > 0 {
for index, optionalStoreIndexPath := range optionalStoreIndexPaths {
oneStoreIndex, err := readStoreIndexFromURI(ctx, optionalStoreIndexPath, blobStoreOptions...)
if err != nil {
storeIndex.Dispose()
break
}
if index == 0 {
storeIndex = oneStoreIndex
} else {
mergedStoreIndex, err := longtaillib.MergeStoreIndex(storeIndex, oneStoreIndex)
if err != nil {
log.WithError(err).Infof("Failed merging in store index from '%s'", optionalStoreIndexPath)
oneStoreIndex.Dispose()
storeIndex.Dispose()
break
}
storeIndex.Dispose()
oneStoreIndex.Dispose()
storeIndex = mergedStoreIndex
}
}
}
if !storeIndex.IsValid() {
storeIndex, _, err = readStoreStoreIndexWithItems(ctx, client)
if err != nil {
log.WithError(err).Info("Failed reading existsing store index")
}
}
if storeIndex.IsValid() {
return storeIndex, nil
}
}

if accessType == ReadOnly {
log.Info("No remote index found, using empty store index")
storeIndex, err = longtaillib.CreateStoreIndexFromBlocks([]longtaillib.Longtail_BlockIndex{})
if err != nil {
err := errors.Wrap(err, "Failed creating empty store index")
return longtaillib.Longtail_StoreIndex{}, errors.Wrap(err, fname)
}
return storeIndex, nil
}

if accessType == Init {
log.Info("bulding store index from blocks")
storeIndex, err = buildStoreIndexFromStoreBlocks(
Expand All @@ -1953,7 +1881,48 @@ func readRemoteStoreIndex(
return storeIndex, nil
}

return storeIndex, errors.Wrap(err, fname)
if accessType == ReadOnly && len(optionalStoreIndexPaths) > 0 {
for index, optionalStoreIndexPath := range optionalStoreIndexPaths {
oneStoreIndex, err := readStoreIndexFromURI(ctx, optionalStoreIndexPath, blobStoreOptions...)
if err != nil {
storeIndex.Dispose()
break
}
if index == 0 {
storeIndex = oneStoreIndex
} else {
mergedStoreIndex, err := longtaillib.MergeStoreIndex(storeIndex, oneStoreIndex)
if err != nil {
log.WithError(err).Infof("Failed merging in store index from '%s'", optionalStoreIndexPath)
oneStoreIndex.Dispose()
storeIndex.Dispose()
break
}
storeIndex.Dispose()
oneStoreIndex.Dispose()
storeIndex = mergedStoreIndex
}
}
}

if storeIndex.IsValid() && err == nil {
return storeIndex, nil
}

storeIndex, _, err = readStoreStoreIndexWithItems(ctx, client)
if err == nil {
return storeIndex, nil
} else if !longtaillib.IsNotExist(err) {
return longtaillib.Longtail_StoreIndex{}, errors.Wrap(err, fname)
}

storeIndex, err = longtaillib.CreateStoreIndexFromBlocks([]longtaillib.Longtail_BlockIndex{})
if err != nil {
err := errors.Wrap(err, "Failed creating empty store index")
return longtaillib.Longtail_StoreIndex{}, errors.Wrap(err, fname)
}
log.Info("No remote index found, using empty store index")
return storeIndex, nil
}

func getBlockPath(basePath string, blockHash uint64) string {
Expand Down

0 comments on commit 6700717

Please sign in to comment.