diff --git a/packages/verified-fetch/README.md b/packages/verified-fetch/README.md index bb3ef513..2830a832 100644 --- a/packages/verified-fetch/README.md +++ b/packages/verified-fetch/README.md @@ -615,7 +615,7 @@ Known Errors that can be thrown: 1. `TypeError` - If the resource argument is not a string, CID, or CID string. 2. `TypeError` - If the options argument is passed and not an object. 3. `TypeError` - If the options argument is passed and is malformed. -4. `AbortError` - If the content request is aborted due to user aborting provided AbortSignal. +4. `AbortError` - If the content request is aborted due to user aborting provided AbortSignal. Note that this is a `AbortError` from `@libp2p/interface` and not the standard `AbortError` from the Fetch API. # Install diff --git a/packages/verified-fetch/src/utils/get-stream-from-async-iterable.ts b/packages/verified-fetch/src/utils/get-stream-from-async-iterable.ts index e417d11c..476276fa 100644 --- a/packages/verified-fetch/src/utils/get-stream-from-async-iterable.ts +++ b/packages/verified-fetch/src/utils/get-stream-from-async-iterable.ts @@ -1,6 +1,6 @@ +import { AbortError, type ComponentLogger } from '@libp2p/interface' import { CustomProgressEvent } from 'progress-events' import type { VerifiedFetchInit } from '../index.js' -import type { ComponentLogger } from '@libp2p/interface' /** * Converts an async iterator of Uint8Array bytes to a stream and returns the first chunk of bytes @@ -24,7 +24,7 @@ export async function getStreamFromAsyncIterable (iterator: AsyncIterable { this.log('fetch %s', resource) @@ -481,6 +488,9 @@ export class VerifiedFetch { ttl = result.ttl protocol = result.protocol } catch (err: any) { + if (options?.signal?.aborted === true) { + throw new AbortError('signal aborted by user') + } this.log.error('error parsing resource %s', resource, err) return badRequestResponse(resource.toString(), err) @@ -542,7 +552,7 @@ export class VerifiedFetch { const codecHandler = this.codecHandlers[cid.code] if (codecHandler == null) { - return notSupportedResponse(`Support for codec with code ${cid.code} is not yet implemented. Please open an issue at https://github.com/ipfs/helia/issues/new`) + return notSupportedResponse(`Support for codec with code ${cid.code} is not yet implemented. Please open an issue at https://github.com/ipfs/helia-verified-fetch/issues/new`) } this.log.trace('calling handler "%s"', codecHandler.name) diff --git a/packages/verified-fetch/test/abort-handling.spec.ts b/packages/verified-fetch/test/abort-handling.spec.ts index 41edd017..1c2e9f6b 100644 --- a/packages/verified-fetch/test/abort-handling.spec.ts +++ b/packages/verified-fetch/test/abort-handling.spec.ts @@ -1,5 +1,6 @@ import { dagCbor } from '@helia/dag-cbor' import { type DNSLinkResolveResult, type IPNS, type IPNSResolveResult } from '@helia/ipns' +import { type UnixFS, type UnixFSStats, unixfs } from '@helia/unixfs' import { stop, type ComponentLogger, type Logger } from '@libp2p/interface' import { prefixLogger, logger as libp2pLogger } from '@libp2p/logger' import { createEd25519PeerId } from '@libp2p/peer-id-factory' @@ -23,6 +24,7 @@ describe('abort-handling', function () { const notPublishedCid = CID.parse('bafybeichqiz32cw5c3vdpvh2xtfgl42veqbsr6sw2g6c7ffz6atvh2vise') let helia: Helia let name: StubbedInstance + let fs: StubbedInstance let logger: ComponentLogger let componentLoggers: Logger[] = [] let verifiedFetch: VerifiedFetch @@ -33,6 +35,8 @@ describe('abort-handling', function () { let blockRetriever: StubbedInstance let dnsLinkResolver: Sinon.SinonStub> let peerIdResolver: Sinon.SinonStub> + let unixFsCatStub: Sinon.SinonStub> + let unixFsStatStub: Sinon.SinonStub> /** * used as promises to pass to makeAbortedRequest that will abort the request as soon as it's resolved. @@ -40,13 +44,19 @@ describe('abort-handling', function () { let blockBrokerRetrieveCalled: DeferredPromise let dnsLinkResolverCalled: DeferredPromise let peerIdResolverCalled: DeferredPromise + let unixFsStatCalled: DeferredPromise + let unixFsCatCalled: DeferredPromise beforeEach(async () => { peerIdResolver = sandbox.stub() dnsLinkResolver = sandbox.stub() + unixFsCatStub = sandbox.stub() + unixFsStatStub = sandbox.stub() peerIdResolverCalled = pDefer() dnsLinkResolverCalled = pDefer() blockBrokerRetrieveCalled = pDefer() + unixFsStatCalled = pDefer() + unixFsCatCalled = pDefer() dnsLinkResolver.withArgs('timeout-5000-example.com', Sinon.match.any).callsFake(async (_domain, options) => { dnsLinkResolverCalled.resolve() @@ -62,6 +72,29 @@ describe('abort-handling', function () { return getAbortablePromise(options.signal) }) }) + unixFsCatStub.callsFake((cid, options) => { + unixFsCatCalled.resolve() + return { + async * [Symbol.asyncIterator] () { + await getAbortablePromise(options.signal) + yield new Uint8Array([0, 1, 2, 3, 4, 5, 6, 7, 8, 9]) + } + } + }) + + unixFsStatStub.callsFake(async (cid, options): Promise => { + unixFsStatCalled.resolve() + await getAbortablePromise(options.signal) + return { + cid, + type: 'file', + fileSize: BigInt(0), + dagSize: BigInt(0), + blocks: 1, + localFileSize: BigInt(0), + localDagSize: BigInt(0) + } + }) logger = prefixLogger('test:abort-handling') sandbox.stub(logger, 'forComponent').callsFake((name) => { @@ -77,9 +110,14 @@ describe('abort-handling', function () { resolveDNSLink: dnsLinkResolver, resolve: peerIdResolver }) + fs = stubInterface({ + cat: unixFsCatStub, + stat: unixFsStatStub + }) verifiedFetch = new VerifiedFetch({ helia, - ipns: name + ipns: name, + unixfs: fs }) }) @@ -99,41 +137,61 @@ describe('abort-handling', function () { await name.publish(peerId, cid, { lifetime: 1000 * 60 * 60 }) - const abortedResult = await makeAbortedRequest(verifiedFetch, [`ipns://${peerId}`], peerIdResolverCalled.promise) - + await expect(makeAbortedRequest(verifiedFetch, [`ipns://${peerId}`], peerIdResolverCalled.promise)).to.eventually.be.rejectedWith('aborted') expect(peerIdResolver.callCount).to.equal(1) expect(dnsLinkResolver.callCount).to.equal(0) // not called because signal abort was detected expect(blockRetriever.retrieve.callCount).to.equal(0) // not called because we never got the cid - expect(abortedResult).to.be.ok() - expect(abortedResult.status).to.equal(400) - expect(abortedResult.statusText).to.equal('Bad Request') - await expect(abortedResult.text()).to.eventually.contain('aborted') }) it('should abort a request before dns resolution', async function () { - const abortedResult = await makeAbortedRequest(verifiedFetch, ['ipns://timeout-5000-example.com'], dnsLinkResolverCalled.promise) + await expect(makeAbortedRequest(verifiedFetch, ['ipns://timeout-5000-example.com'], dnsLinkResolverCalled.promise)).to.eventually.be.rejectedWith('aborted') expect(peerIdResolver.callCount).to.equal(0) // not called because peerIdFromString fails expect(dnsLinkResolver.callCount).to.equal(1) expect(blockRetriever.retrieve.callCount).to.equal(0) // not called because we never got the cid - expect(abortedResult).to.be.ok() - expect(abortedResult.status).to.equal(400) - expect(abortedResult.statusText).to.equal('Bad Request') - await expect(abortedResult.text()).to.eventually.contain('aborted') }) it('should abort a request while looking for cid', async function () { - const abortedResult = await makeAbortedRequest(verifiedFetch, [notPublishedCid, { headers: { accept: 'application/octet-stream' } }], blockBrokerRetrieveCalled.promise) + await expect(makeAbortedRequest(verifiedFetch, [notPublishedCid, { headers: { accept: 'application/octet-stream' } }], blockBrokerRetrieveCalled.promise)).to.eventually.be.rejectedWith('aborted') expect(peerIdResolver.callCount).to.equal(0) // not called because parseResource never passes the resource to parseUrlString expect(dnsLinkResolver.callCount).to.equal(0) // not called because parseResource never passes the resource to parseUrlString expect(blockRetriever.retrieve.callCount).to.equal(1) - expect(abortedResult).to.be.ok() - expect(abortedResult.status).to.equal(400) - expect(abortedResult.statusText).to.equal('Bad Request') - // this error is exactly what blockRetriever throws, so we can check for "aborted" in the error message - await expect(abortedResult.text()).to.eventually.contain('aborted') }) - // TODO: verify that the request is aborted when calling unixfs.cat and unixfs.walkPath + it('should abort a request during unixfs.stat call', async function () { + const fs = unixfs(helia) + const fileCid = await fs.addBytes(Uint8Array.from([0, 1, 2, 3])) + const directoryCid = await fs.addDirectory() + const cid = await fs.cp(fileCid, directoryCid, 'index.html') + + await expect(makeAbortedRequest(verifiedFetch, [cid], unixFsStatCalled.promise)).to.eventually.be.rejectedWith('aborted') + + expect(peerIdResolver.callCount).to.equal(0) // not called because parseResource never passes the resource to parseUrlString + expect(dnsLinkResolver.callCount).to.equal(0) // not called because parseResource never passes the resource to parseUrlString + expect(blockRetriever.retrieve.callCount).to.equal(0) // not called because the blockstore has the content + expect(unixFsStatStub.callCount).to.equal(1) + expect(unixFsCatStub.callCount).to.equal(0) // not called because the request was aborted during .stat call + }) + + it('should abort a request during unixfs.cat call', async function () { + const fs = unixfs(helia) + const fileCid = await fs.addBytes(Uint8Array.from([0, 1, 2, 3])) + const directoryCid = await fs.addDirectory() + const cid = await fs.cp(fileCid, directoryCid, 'index.html') + + // override the default fake set in beforeEach that would timeout. + unixFsStatStub.callsFake(async (cid, options) => { + unixFsStatCalled.resolve() + return fs.stat(cid, options) + }) + + await expect(makeAbortedRequest(verifiedFetch, [cid], unixFsCatCalled.promise)).to.eventually.be.rejectedWith('aborted') + + expect(peerIdResolver.callCount).to.equal(0) // not called because parseResource never passes the resource to parseUrlString + expect(dnsLinkResolver.callCount).to.equal(0) // not called because parseResource never passes the resource to parseUrlString + expect(blockRetriever.retrieve.callCount).to.equal(0) // not called because the blockstore has the content + expect(unixFsStatStub.callCount).to.equal(1) + expect(unixFsCatStub.callCount).to.equal(1) + }) })