Skip to content
This repository has been archived by the owner on Jun 20, 2024. It is now read-only.

Commit

Permalink
fix: resolve issue with file resumption on error and add test
Browse files Browse the repository at this point in the history
  • Loading branch information
aschmahmann committed Jul 24, 2023
1 parent 7bdb4bb commit 5265703
Show file tree
Hide file tree
Showing 2 changed files with 153 additions and 19 deletions.
115 changes: 115 additions & 0 deletions graph_gateway_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -568,6 +568,121 @@ func TestGetFile(t *testing.T) {
}
}

func TestGetFileWithBadBlockReturned(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()

requestNum := 0
s := httptest.NewServer(http.HandlerFunc(func(writer http.ResponseWriter, request *http.Request) {
requestNum++
switch requestNum {
case 1:
// Expect the full request, but return one that terminates at the root block
expectedUri := "/ipfs/bafybeigcisqd7m5nf3qmuvjdbakl5bdnh4ocrmacaqkpuh77qjvggmt2sa"
if request.URL.Path != expectedUri {
panic(fmt.Errorf("expected URI %s, got %s", expectedUri, request.RequestURI))
}

if err := sendBlocks(ctx, dirWithMultiblockHAMTandFiles, writer, []string{
"bafybeigcisqd7m5nf3qmuvjdbakl5bdnh4ocrmacaqkpuh77qjvggmt2sa", // file root
}); err != nil {
panic(err)
}
case 2:
// Expect the full request, but return a totally unrelated block
expectedUri := "/ipfs/bafybeigcisqd7m5nf3qmuvjdbakl5bdnh4ocrmacaqkpuh77qjvggmt2sa"
if request.URL.Path != expectedUri {
panic(fmt.Errorf("expected URI %s, got %s", expectedUri, request.RequestURI))
}

if err := sendBlocks(ctx, dirWithMultiblockHAMTandFiles, writer, []string{
"bafybeid3fd2xxdcd3dbj7trb433h2aqssn6xovjbwnkargjv7fuog4xjdi", // file root
}); err != nil {
panic(err)
}
case 3:
// Expect the full request and return most of the file
// Note: this is an implementation detail, it could be in the future that we request less data (e.g. partial path and file range)
expectedUri := "/ipfs/bafybeigcisqd7m5nf3qmuvjdbakl5bdnh4ocrmacaqkpuh77qjvggmt2sa"
if request.URL.Path != expectedUri {
panic(fmt.Errorf("expected URI %s, got %s", expectedUri, request.RequestURI))
}

if err := sendBlocks(ctx, dirWithMultiblockHAMTandFiles, writer, []string{
"bafybeigcisqd7m5nf3qmuvjdbakl5bdnh4ocrmacaqkpuh77qjvggmt2sa", // file root
"bafkreie5noke3mb7hqxukzcy73nl23k6lxszxi5w3dtmuwz62wnvkpsscm", // file chunks start here
"bafkreih4ephajybraj6wnxsbwjwa77fukurtpl7oj7t7pfq545duhot7cq",
}); err != nil {
panic(err)
}

case 4:
// Expect a request for the remainder of the file
// Note: this is an implementation detail, it could be that the requester really asks for more information
expectedUri := "/ipfs/bafybeigcisqd7m5nf3qmuvjdbakl5bdnh4ocrmacaqkpuh77qjvggmt2sa"
if request.URL.Path != expectedUri {
panic(fmt.Errorf("expected URI %s, got %s", expectedUri, request.RequestURI))
}

if err := sendBlocks(ctx, dirWithMultiblockHAMTandFiles, writer, []string{
"bafybeigcisqd7m5nf3qmuvjdbakl5bdnh4ocrmacaqkpuh77qjvggmt2sa", // file root
"bafkreigu7buvm3cfunb35766dn7tmqyh2um62zcio63en2btvxuybgcpue", // middle of the file starts here
"bafkreicll3huefkc3qnrzeony7zcfo7cr3nbx64hnxrqzsixpceg332fhe",
"bafkreifst3pqztuvj57lycamoi7z34b4emf7gawxs74nwrc2c7jncmpaqm",
}); err != nil {
panic(err)
}

default:
t.Fatal("unsupported request number")
}
}))
defer s.Close()

bs := newProxyBlockStore([]string{s.URL}, newCachedDNS(dnsCacheRefreshInterval))
backend, err := lib.NewGraphGatewayBackend(&retryFetcher{inner: bs.(lib.CarFetcher), allowedRetries: 3, retriesRemaining: 3})
if err != nil {
t.Fatal(err)
}

trustedGatewayServer := httptest.NewServer(gateway.NewHandler(gateway.Config{DeserializedResponses: true}, backend))
defer trustedGatewayServer.Close()

resp, err := http.Get(trustedGatewayServer.URL + "/ipfs/bafybeigcisqd7m5nf3qmuvjdbakl5bdnh4ocrmacaqkpuh77qjvggmt2sa")
if err != nil {
t.Fatal(err)
}

data, err := io.ReadAll(resp.Body)
if err != nil {
t.Fatal(err)
}

robs, err := carbs.NewReadOnly(bytes.NewReader(dirWithMultiblockHAMTandFiles), nil)
if err != nil {
t.Fatal(err)
}

dsrv := merkledag.NewDAGService(blockservice.New(robs, offline.Exchange(robs)))
fileRootNd, err := dsrv.Get(ctx, cid.MustParse("bafybeigcisqd7m5nf3qmuvjdbakl5bdnh4ocrmacaqkpuh77qjvggmt2sa"))
if err != nil {
t.Fatal(err)
}
uio, err := unixfile.NewUnixfsFile(ctx, dsrv, fileRootNd)
if err != nil {
t.Fatal(err)
}
f := uio.(files.File)
expectedFileData, err := io.ReadAll(f)
if err != nil {
t.Fatal(err)
}

if !bytes.Equal(data, expectedFileData) {
t.Fatalf("expected %s, got %s", string(expectedFileData), string(data))
}
}

func TestGetHAMTDirectory(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
Expand Down
57 changes: 38 additions & 19 deletions lib/files.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,21 +66,39 @@ func (b *backpressuredFile) Read(p []byte) (n int, err error) {
err = b.retErr
}

from, err := b.f.Seek(0, io.SeekCurrent)
if err != nil {
return 0, err
}
nd, err := loadTerminalUnixFSElementWithRecursiveDirectories(b.ctx, b.fileCid, nil, nil, gateway.CarParams{Scope: gateway.DagScopeEntity, Range: &gateway.DagByteRange{From: from, To: b.byteRange.To}}, b.getLsys)
if err != nil {
return 0, err
from, seekErr := b.f.Seek(0, io.SeekCurrent)
if seekErr != nil {
// Return the seek error since by this point seeking failures like this should be impossible
return 0, seekErr
}

f, ok := nd.(files.File)
if !ok {
return 0, fmt.Errorf("not a file, should be unreachable")
// we had an error while reading so attempt to reset the underlying reader
for {
if b.ctx.Err() != nil {
return 0, b.ctx.Err()
}

retry, processedErr := isRetryableError(err)
if !retry {
return 0, processedErr
}

var nd files.Node
nd, err = loadTerminalUnixFSElementWithRecursiveDirectories(b.ctx, b.fileCid, nil, nil, gateway.CarParams{Scope: gateway.DagScopeEntity, Range: &gateway.DagByteRange{From: from, To: b.byteRange.To}}, b.getLsys)
if err != nil {
continue
}

f, ok := nd.(files.File)
if !ok {
return 0, fmt.Errorf("not a file, should be unreachable")
}

b.f = f
break
}

b.f = f
// now that we've reset the reader try reading again
return b.Read(p)
}

Expand Down Expand Up @@ -168,15 +186,16 @@ func (it *backpressuredFlatDirIter) Next() bool {
}
nd, err = loadTerminalUnixFSElementWithRecursiveDirectories(it.ctx, c, nil, it.lsys, params, it.getLsys)
if err != nil {
if err := it.ctx.Err(); err == nil {
retry, processedErr := isRetryableError(err)
if retry {
err = processedErr
continue
}
it.err = processedErr
return false
if ctxErr := it.ctx.Err(); ctxErr != nil {
continue
}
retry, processedErr := isRetryableError(err)
if retry {
err = processedErr
continue
}
it.err = processedErr
return false
}
break
}
Expand Down

0 comments on commit 5265703

Please sign in to comment.