Skip to content

Commit

Permalink
fix: aborted signals throw libp2p AbortError (#30)
Browse files Browse the repository at this point in the history
* fix: aborted signals throw

* chore: update url for not implemented codec requests

* test: aborts during .stat and .cat are handled

* docs: add note about AbortError type
  • Loading branch information
SgtPooki authored Mar 22, 2024
1 parent 8715740 commit 4575791
Show file tree
Hide file tree
Showing 4 changed files with 94 additions and 26 deletions.
2 changes: 1 addition & 1 deletion packages/verified-fetch/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -615,7 +615,7 @@ Known Errors that can be thrown:
1. `TypeError` - If the resource argument is not a string, CID, or CID string.
2. `TypeError` - If the options argument is passed and not an object.
3. `TypeError` - If the options argument is passed and is malformed.
4. `AbortError` - If the content request is aborted due to user aborting provided AbortSignal.
4. `AbortError` - If the content request is aborted due to user aborting provided AbortSignal. Note that this is a `AbortError` from `@libp2p/interface` and not the standard `AbortError` from the Fetch API.

# Install

Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
import { AbortError, type ComponentLogger } from '@libp2p/interface'
import { CustomProgressEvent } from 'progress-events'
import type { VerifiedFetchInit } from '../index.js'
import type { ComponentLogger } from '@libp2p/interface'

/**
* Converts an async iterator of Uint8Array bytes to a stream and returns the first chunk of bytes
Expand All @@ -24,7 +24,7 @@ export async function getStreamFromAsyncIterable (iterator: AsyncIterable<Uint8A
async pull (controller) {
const { value, done } = await reader.next()
if (options?.signal?.aborted === true) {
controller.error(new Error(options.signal.reason ?? 'signal aborted by user'))
controller.error(new AbortError(options.signal.reason ?? 'signal aborted by user'))
controller.close()
return
}
Expand Down
18 changes: 14 additions & 4 deletions packages/verified-fetch/src/verified-fetch.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import { unixfs as heliaUnixFs, type UnixFS as HeliaUnixFs } from '@helia/unixfs
import * as ipldDagCbor from '@ipld/dag-cbor'
import * as ipldDagJson from '@ipld/dag-json'
import { code as dagPbCode } from '@ipld/dag-pb'
import { AbortError, type AbortOptions, type Logger, type PeerId } from '@libp2p/interface'
import { Record as DHTRecord } from '@libp2p/kad-dht'
import { peerIdFromString } from '@libp2p/peer-id'
import { Key } from 'interface-datastore'
Expand All @@ -30,7 +31,6 @@ import type { CIDDetail, ContentTypeParser, Resource, VerifiedFetchInit as Verif
import type { RequestFormatShorthand } from './types.js'
import type { ParsedUrlStringResults } from './utils/parse-url-string'
import type { Helia } from '@helia/interface'
import type { AbortOptions, Logger, PeerId } from '@libp2p/interface'
import type { DNSResolver } from '@multiformats/dns/resolvers'
import type { UnixFSEntry } from 'ipfs-unixfs-exporter'
import type { CID } from 'multiformats/cid'
Expand Down Expand Up @@ -288,10 +288,10 @@ export class VerifiedFetch {
ipfsRoots = pathDetails.ipfsRoots
terminalElement = pathDetails.terminalElement
} catch (err: any) {
this.log.error('error walking path %s', path, err)
if (options?.signal?.aborted === true) {
return badRequestResponse(resource.toString(), new Error('signal aborted by user'))
throw new AbortError('signal aborted by user')
}
this.log.error('error walking path %s', path, err)

return badGatewayResponse(resource.toString(), 'Error walking path')
}
Expand Down Expand Up @@ -327,6 +327,9 @@ export class VerifiedFetch {
path = rootFilePath
resolvedCID = stat.cid
} catch (err: any) {
if (options?.signal?.aborted === true) {
throw new AbortError('signal aborted by user')
}
this.log('error loading path %c/%s', dirCid, rootFilePath, err)
return notSupportedResponse('Unable to find index.html for directory at given path. Support for directories with implicit root is not implemented')
} finally {
Expand Down Expand Up @@ -370,6 +373,9 @@ export class VerifiedFetch {

return response
} catch (err: any) {
if (options?.signal?.aborted === true) {
throw new AbortError('signal aborted by user')
}
this.log.error('error streaming %c/%s', cid, path, err)
if (byteRangeContext.isRangeRequest && err.code === 'ERR_INVALID_PARAMS') {
return badRangeResponse(resource)
Expand Down Expand Up @@ -454,6 +460,7 @@ export class VerifiedFetch {
* TODO: move operations called by fetch to a queue of operations where we can
* always exit early (and cleanly) if a given signal is aborted
*/
// eslint-disable-next-line complexity
async fetch (resource: Resource, opts?: VerifiedFetchOptions): Promise<Response> {
this.log('fetch %s', resource)

Expand Down Expand Up @@ -481,6 +488,9 @@ export class VerifiedFetch {
ttl = result.ttl
protocol = result.protocol
} catch (err: any) {
if (options?.signal?.aborted === true) {
throw new AbortError('signal aborted by user')
}
this.log.error('error parsing resource %s', resource, err)

return badRequestResponse(resource.toString(), err)
Expand Down Expand Up @@ -542,7 +552,7 @@ export class VerifiedFetch {
const codecHandler = this.codecHandlers[cid.code]

if (codecHandler == null) {
return notSupportedResponse(`Support for codec with code ${cid.code} is not yet implemented. Please open an issue at https://github.com/ipfs/helia/issues/new`)
return notSupportedResponse(`Support for codec with code ${cid.code} is not yet implemented. Please open an issue at https://github.com/ipfs/helia-verified-fetch/issues/new`)
}
this.log.trace('calling handler "%s"', codecHandler.name)

Expand Down
96 changes: 77 additions & 19 deletions packages/verified-fetch/test/abort-handling.spec.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
import { dagCbor } from '@helia/dag-cbor'
import { type DNSLinkResolveResult, type IPNS, type IPNSResolveResult } from '@helia/ipns'
import { type UnixFS, type UnixFSStats, unixfs } from '@helia/unixfs'
import { stop, type ComponentLogger, type Logger } from '@libp2p/interface'
import { prefixLogger, logger as libp2pLogger } from '@libp2p/logger'
import { createEd25519PeerId } from '@libp2p/peer-id-factory'
Expand All @@ -23,6 +24,7 @@ describe('abort-handling', function () {
const notPublishedCid = CID.parse('bafybeichqiz32cw5c3vdpvh2xtfgl42veqbsr6sw2g6c7ffz6atvh2vise')
let helia: Helia
let name: StubbedInstance<IPNS>
let fs: StubbedInstance<UnixFS>
let logger: ComponentLogger
let componentLoggers: Logger[] = []
let verifiedFetch: VerifiedFetch
Expand All @@ -33,20 +35,28 @@ describe('abort-handling', function () {
let blockRetriever: StubbedInstance<BlockRetriever>
let dnsLinkResolver: Sinon.SinonStub<any[], Promise<DNSLinkResolveResult>>
let peerIdResolver: Sinon.SinonStub<any[], Promise<IPNSResolveResult>>
let unixFsCatStub: Sinon.SinonStub<any[], AsyncIterable<Uint8Array>>
let unixFsStatStub: Sinon.SinonStub<any[], Promise<UnixFSStats>>

/**
* used as promises to pass to makeAbortedRequest that will abort the request as soon as it's resolved.
*/
let blockBrokerRetrieveCalled: DeferredPromise<void>
let dnsLinkResolverCalled: DeferredPromise<void>
let peerIdResolverCalled: DeferredPromise<void>
let unixFsStatCalled: DeferredPromise<void>
let unixFsCatCalled: DeferredPromise<void>

beforeEach(async () => {
peerIdResolver = sandbox.stub()
dnsLinkResolver = sandbox.stub()
unixFsCatStub = sandbox.stub()
unixFsStatStub = sandbox.stub()
peerIdResolverCalled = pDefer()
dnsLinkResolverCalled = pDefer()
blockBrokerRetrieveCalled = pDefer()
unixFsStatCalled = pDefer()
unixFsCatCalled = pDefer()

dnsLinkResolver.withArgs('timeout-5000-example.com', Sinon.match.any).callsFake(async (_domain, options) => {
dnsLinkResolverCalled.resolve()
Expand All @@ -62,6 +72,29 @@ describe('abort-handling', function () {
return getAbortablePromise(options.signal)
})
})
unixFsCatStub.callsFake((cid, options) => {
unixFsCatCalled.resolve()
return {
async * [Symbol.asyncIterator] () {
await getAbortablePromise(options.signal)
yield new Uint8Array([0, 1, 2, 3, 4, 5, 6, 7, 8, 9])
}
}
})

unixFsStatStub.callsFake(async (cid, options): Promise<UnixFSStats> => {
unixFsStatCalled.resolve()
await getAbortablePromise(options.signal)
return {
cid,
type: 'file',
fileSize: BigInt(0),
dagSize: BigInt(0),
blocks: 1,
localFileSize: BigInt(0),
localDagSize: BigInt(0)
}
})

logger = prefixLogger('test:abort-handling')
sandbox.stub(logger, 'forComponent').callsFake((name) => {
Expand All @@ -77,9 +110,14 @@ describe('abort-handling', function () {
resolveDNSLink: dnsLinkResolver,
resolve: peerIdResolver
})
fs = stubInterface<UnixFS>({
cat: unixFsCatStub,
stat: unixFsStatStub
})
verifiedFetch = new VerifiedFetch({
helia,
ipns: name
ipns: name,
unixfs: fs
})
})

Expand All @@ -99,41 +137,61 @@ describe('abort-handling', function () {

await name.publish(peerId, cid, { lifetime: 1000 * 60 * 60 })

const abortedResult = await makeAbortedRequest(verifiedFetch, [`ipns://${peerId}`], peerIdResolverCalled.promise)

await expect(makeAbortedRequest(verifiedFetch, [`ipns://${peerId}`], peerIdResolverCalled.promise)).to.eventually.be.rejectedWith('aborted')
expect(peerIdResolver.callCount).to.equal(1)
expect(dnsLinkResolver.callCount).to.equal(0) // not called because signal abort was detected
expect(blockRetriever.retrieve.callCount).to.equal(0) // not called because we never got the cid
expect(abortedResult).to.be.ok()
expect(abortedResult.status).to.equal(400)
expect(abortedResult.statusText).to.equal('Bad Request')
await expect(abortedResult.text()).to.eventually.contain('aborted')
})

it('should abort a request before dns resolution', async function () {
const abortedResult = await makeAbortedRequest(verifiedFetch, ['ipns://timeout-5000-example.com'], dnsLinkResolverCalled.promise)
await expect(makeAbortedRequest(verifiedFetch, ['ipns://timeout-5000-example.com'], dnsLinkResolverCalled.promise)).to.eventually.be.rejectedWith('aborted')

expect(peerIdResolver.callCount).to.equal(0) // not called because peerIdFromString fails
expect(dnsLinkResolver.callCount).to.equal(1)
expect(blockRetriever.retrieve.callCount).to.equal(0) // not called because we never got the cid
expect(abortedResult).to.be.ok()
expect(abortedResult.status).to.equal(400)
expect(abortedResult.statusText).to.equal('Bad Request')
await expect(abortedResult.text()).to.eventually.contain('aborted')
})

it('should abort a request while looking for cid', async function () {
const abortedResult = await makeAbortedRequest(verifiedFetch, [notPublishedCid, { headers: { accept: 'application/octet-stream' } }], blockBrokerRetrieveCalled.promise)
await expect(makeAbortedRequest(verifiedFetch, [notPublishedCid, { headers: { accept: 'application/octet-stream' } }], blockBrokerRetrieveCalled.promise)).to.eventually.be.rejectedWith('aborted')

expect(peerIdResolver.callCount).to.equal(0) // not called because parseResource never passes the resource to parseUrlString
expect(dnsLinkResolver.callCount).to.equal(0) // not called because parseResource never passes the resource to parseUrlString
expect(blockRetriever.retrieve.callCount).to.equal(1)
expect(abortedResult).to.be.ok()
expect(abortedResult.status).to.equal(400)
expect(abortedResult.statusText).to.equal('Bad Request')
// this error is exactly what blockRetriever throws, so we can check for "aborted" in the error message
await expect(abortedResult.text()).to.eventually.contain('aborted')
})

// TODO: verify that the request is aborted when calling unixfs.cat and unixfs.walkPath
it('should abort a request during unixfs.stat call', async function () {
const fs = unixfs(helia)
const fileCid = await fs.addBytes(Uint8Array.from([0, 1, 2, 3]))
const directoryCid = await fs.addDirectory()
const cid = await fs.cp(fileCid, directoryCid, 'index.html')

await expect(makeAbortedRequest(verifiedFetch, [cid], unixFsStatCalled.promise)).to.eventually.be.rejectedWith('aborted')

expect(peerIdResolver.callCount).to.equal(0) // not called because parseResource never passes the resource to parseUrlString
expect(dnsLinkResolver.callCount).to.equal(0) // not called because parseResource never passes the resource to parseUrlString
expect(blockRetriever.retrieve.callCount).to.equal(0) // not called because the blockstore has the content
expect(unixFsStatStub.callCount).to.equal(1)
expect(unixFsCatStub.callCount).to.equal(0) // not called because the request was aborted during .stat call
})

it('should abort a request during unixfs.cat call', async function () {
const fs = unixfs(helia)
const fileCid = await fs.addBytes(Uint8Array.from([0, 1, 2, 3]))
const directoryCid = await fs.addDirectory()
const cid = await fs.cp(fileCid, directoryCid, 'index.html')

// override the default fake set in beforeEach that would timeout.
unixFsStatStub.callsFake(async (cid, options) => {
unixFsStatCalled.resolve()
return fs.stat(cid, options)
})

await expect(makeAbortedRequest(verifiedFetch, [cid], unixFsCatCalled.promise)).to.eventually.be.rejectedWith('aborted')

expect(peerIdResolver.callCount).to.equal(0) // not called because parseResource never passes the resource to parseUrlString
expect(dnsLinkResolver.callCount).to.equal(0) // not called because parseResource never passes the resource to parseUrlString
expect(blockRetriever.retrieve.callCount).to.equal(0) // not called because the blockstore has the content
expect(unixFsStatStub.callCount).to.equal(1)
expect(unixFsCatStub.callCount).to.equal(1)
})
})

0 comments on commit 4575791

Please sign in to comment.