From 52657036dfd9c2664436cc48bab48713a6668b4c Mon Sep 17 00:00:00 2001 From: Adin Schmahmann Date: Mon, 24 Jul 2023 17:20:22 -0400 Subject: [PATCH] fix: resolve issue with file resumption on error and add test --- graph_gateway_test.go | 115 ++++++++++++++++++++++++++++++++++++++++++ lib/files.go | 57 ++++++++++++++------- 2 files changed, 153 insertions(+), 19 deletions(-) diff --git a/graph_gateway_test.go b/graph_gateway_test.go index adcefa6..d70ef96 100644 --- a/graph_gateway_test.go +++ b/graph_gateway_test.go @@ -568,6 +568,121 @@ func TestGetFile(t *testing.T) { } } +func TestGetFileWithBadBlockReturned(t *testing.T) { + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + requestNum := 0 + s := httptest.NewServer(http.HandlerFunc(func(writer http.ResponseWriter, request *http.Request) { + requestNum++ + switch requestNum { + case 1: + // Expect the full request, but return one that terminates at the root block + expectedUri := "/ipfs/bafybeigcisqd7m5nf3qmuvjdbakl5bdnh4ocrmacaqkpuh77qjvggmt2sa" + if request.URL.Path != expectedUri { + panic(fmt.Errorf("expected URI %s, got %s", expectedUri, request.RequestURI)) + } + + if err := sendBlocks(ctx, dirWithMultiblockHAMTandFiles, writer, []string{ + "bafybeigcisqd7m5nf3qmuvjdbakl5bdnh4ocrmacaqkpuh77qjvggmt2sa", // file root + }); err != nil { + panic(err) + } + case 2: + // Expect the full request, but return a totally unrelated block + expectedUri := "/ipfs/bafybeigcisqd7m5nf3qmuvjdbakl5bdnh4ocrmacaqkpuh77qjvggmt2sa" + if request.URL.Path != expectedUri { + panic(fmt.Errorf("expected URI %s, got %s", expectedUri, request.RequestURI)) + } + + if err := sendBlocks(ctx, dirWithMultiblockHAMTandFiles, writer, []string{ + "bafybeid3fd2xxdcd3dbj7trb433h2aqssn6xovjbwnkargjv7fuog4xjdi", // unrelated block (deliberately not the file root) + }); err != nil { + panic(err) + } + case 3: + // Expect the full request and return most of the file + // Note: this is an implementation detail, it could be in the future that we request less data 
(e.g. partial path and file range) + expectedUri := "/ipfs/bafybeigcisqd7m5nf3qmuvjdbakl5bdnh4ocrmacaqkpuh77qjvggmt2sa" + if request.URL.Path != expectedUri { + panic(fmt.Errorf("expected URI %s, got %s", expectedUri, request.RequestURI)) + } + + if err := sendBlocks(ctx, dirWithMultiblockHAMTandFiles, writer, []string{ + "bafybeigcisqd7m5nf3qmuvjdbakl5bdnh4ocrmacaqkpuh77qjvggmt2sa", // file root + "bafkreie5noke3mb7hqxukzcy73nl23k6lxszxi5w3dtmuwz62wnvkpsscm", // file chunks start here + "bafkreih4ephajybraj6wnxsbwjwa77fukurtpl7oj7t7pfq545duhot7cq", + }); err != nil { + panic(err) + } + + case 4: + // Expect a request for the remainder of the file + // Note: this is an implementation detail, it could be that the requester really asks for more information + expectedUri := "/ipfs/bafybeigcisqd7m5nf3qmuvjdbakl5bdnh4ocrmacaqkpuh77qjvggmt2sa" + if request.URL.Path != expectedUri { + panic(fmt.Errorf("expected URI %s, got %s", expectedUri, request.RequestURI)) + } + + if err := sendBlocks(ctx, dirWithMultiblockHAMTandFiles, writer, []string{ + "bafybeigcisqd7m5nf3qmuvjdbakl5bdnh4ocrmacaqkpuh77qjvggmt2sa", // file root + "bafkreigu7buvm3cfunb35766dn7tmqyh2um62zcio63en2btvxuybgcpue", // middle of the file starts here + "bafkreicll3huefkc3qnrzeony7zcfo7cr3nbx64hnxrqzsixpceg332fhe", + "bafkreifst3pqztuvj57lycamoi7z34b4emf7gawxs74nwrc2c7jncmpaqm", + }); err != nil { + panic(err) + } + + default: + t.Fatal("unsupported request number") // NOTE(review): this runs on the HTTP handler goroutine, not the test goroutine; the testing docs require FailNow/Fatal to be called from the test goroutine - consider t.Error or panic as in the cases above + } + })) + defer s.Close() + + bs := newProxyBlockStore([]string{s.URL}, newCachedDNS(dnsCacheRefreshInterval)) + backend, err := lib.NewGraphGatewayBackend(&retryFetcher{inner: bs.(lib.CarFetcher), allowedRetries: 3, retriesRemaining: 3}) + if err != nil { + t.Fatal(err) + } + + trustedGatewayServer := httptest.NewServer(gateway.NewHandler(gateway.Config{DeserializedResponses: true}, backend)) + defer trustedGatewayServer.Close() + + resp, err := http.Get(trustedGatewayServer.URL + 
"/ipfs/bafybeigcisqd7m5nf3qmuvjdbakl5bdnh4ocrmacaqkpuh77qjvggmt2sa") + if err != nil { + t.Fatal(err) + } + + data, err := io.ReadAll(resp.Body) // NOTE(review): resp.Body is never closed in this test + if err != nil { + t.Fatal(err) + } + + robs, err := carbs.NewReadOnly(bytes.NewReader(dirWithMultiblockHAMTandFiles), nil) + if err != nil { + t.Fatal(err) + } + + dsrv := merkledag.NewDAGService(blockservice.New(robs, offline.Exchange(robs))) + fileRootNd, err := dsrv.Get(ctx, cid.MustParse("bafybeigcisqd7m5nf3qmuvjdbakl5bdnh4ocrmacaqkpuh77qjvggmt2sa")) + if err != nil { + t.Fatal(err) + } + uio, err := unixfile.NewUnixfsFile(ctx, dsrv, fileRootNd) + if err != nil { + t.Fatal(err) + } + f := uio.(files.File) + expectedFileData, err := io.ReadAll(f) + if err != nil { + t.Fatal(err) + } + + if !bytes.Equal(data, expectedFileData) { + t.Fatalf("expected %s, got %s", string(expectedFileData), string(data)) + } +} + func TestGetHAMTDirectory(t *testing.T) { ctx, cancel := context.WithCancel(context.Background()) defer cancel() diff --git a/lib/files.go b/lib/files.go index 05e5f2e..fdbffe3 100644 --- a/lib/files.go +++ b/lib/files.go @@ -66,21 +66,39 @@ func (b *backpressuredFile) Read(p []byte) (n int, err error) { err = b.retErr } - from, err := b.f.Seek(0, io.SeekCurrent) - if err != nil { - return 0, err - } - nd, err := loadTerminalUnixFSElementWithRecursiveDirectories(b.ctx, b.fileCid, nil, nil, gateway.CarParams{Scope: gateway.DagScopeEntity, Range: &gateway.DagByteRange{From: from, To: b.byteRange.To}}, b.getLsys) - if err != nil { - return 0, err + from, seekErr := b.f.Seek(0, io.SeekCurrent) + if seekErr != nil { + // Return the seek error since by this point seeking failures like this should be impossible + return 0, seekErr } - f, ok := nd.(files.File) - if !ok { - return 0, fmt.Errorf("not a file, should be unreachable") + // we had an error while reading so attempt to reset the underlying reader + for { + if b.ctx.Err() != nil { + return 0, b.ctx.Err() + } + + retry, processedErr := isRetryableError(err) + if 
!retry { + return 0, processedErr + } + + var nd files.Node + nd, err = loadTerminalUnixFSElementWithRecursiveDirectories(b.ctx, b.fileCid, nil, nil, gateway.CarParams{Scope: gateway.DagScopeEntity, Range: &gateway.DagByteRange{From: from, To: b.byteRange.To}}, b.getLsys) + if err != nil { + continue + } + + f, ok := nd.(files.File) + if !ok { + return 0, fmt.Errorf("not a file, should be unreachable") + } + + b.f = f + break } - b.f = f + // now that we've reset the reader try reading again return b.Read(p) } @@ -168,15 +186,16 @@ func (it *backpressuredFlatDirIter) Next() bool { } nd, err = loadTerminalUnixFSElementWithRecursiveDirectories(it.ctx, c, nil, it.lsys, params, it.getLsys) if err != nil { - if err := it.ctx.Err(); err == nil { - retry, processedErr := isRetryableError(err) - if retry { - err = processedErr - continue - } - it.err = processedErr - return false + if ctxErr := it.ctx.Err(); ctxErr != nil { + continue } + retry, processedErr := isRetryableError(err) + if retry { + err = processedErr + continue + } + it.err = processedErr + return false } break }