Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: aborted signals throw libp2p AbortError #30

Merged
merged 4 commits into from
Mar 22, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion packages/verified-fetch/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -615,7 +615,7 @@ Known Errors that can be thrown:
1. `TypeError` - If the resource argument is not a string, CID, or CID string.
2. `TypeError` - If the options argument is passed and not an object.
3. `TypeError` - If the options argument is passed and is malformed.
4. `AbortError` - If the content request is aborted due to user aborting provided AbortSignal.
4. `AbortError` - If the content request is aborted due to user aborting provided AbortSignal. Note that this is a `AbortError` from `@libp2p/interface` and not the standard `AbortError` from the Fetch API.

# Install

Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
import { AbortError, type ComponentLogger } from '@libp2p/interface'
import { CustomProgressEvent } from 'progress-events'
import type { VerifiedFetchInit } from '../index.js'
import type { ComponentLogger } from '@libp2p/interface'

/**
* Converts an async iterator of Uint8Array bytes to a stream and returns the first chunk of bytes
Expand All @@ -24,7 +24,7 @@
async pull (controller) {
const { value, done } = await reader.next()
if (options?.signal?.aborted === true) {
controller.error(new Error(options.signal.reason ?? 'signal aborted by user'))
controller.error(new AbortError(options.signal.reason ?? 'signal aborted by user'))

Check warning on line 27 in packages/verified-fetch/src/utils/get-stream-from-async-iterable.ts

View check run for this annotation

Codecov / codecov/patch

packages/verified-fetch/src/utils/get-stream-from-async-iterable.ts#L27

Added line #L27 was not covered by tests
controller.close()
return
}
Expand Down
18 changes: 14 additions & 4 deletions packages/verified-fetch/src/verified-fetch.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
import * as ipldDagCbor from '@ipld/dag-cbor'
import * as ipldDagJson from '@ipld/dag-json'
import { code as dagPbCode } from '@ipld/dag-pb'
import { AbortError, type AbortOptions, type Logger, type PeerId } from '@libp2p/interface'
import { Record as DHTRecord } from '@libp2p/kad-dht'
import { peerIdFromString } from '@libp2p/peer-id'
import { Key } from 'interface-datastore'
Expand All @@ -30,7 +31,6 @@
import type { RequestFormatShorthand } from './types.js'
import type { ParsedUrlStringResults } from './utils/parse-url-string'
import type { Helia } from '@helia/interface'
import type { AbortOptions, Logger, PeerId } from '@libp2p/interface'
import type { DNSResolver } from '@multiformats/dns/resolvers'
import type { UnixFSEntry } from 'ipfs-unixfs-exporter'
import type { CID } from 'multiformats/cid'
Expand Down Expand Up @@ -288,10 +288,10 @@
ipfsRoots = pathDetails.ipfsRoots
terminalElement = pathDetails.terminalElement
} catch (err: any) {
this.log.error('error walking path %s', path, err)
if (options?.signal?.aborted === true) {
return badRequestResponse(resource.toString(), new Error('signal aborted by user'))
throw new AbortError('signal aborted by user')
}
this.log.error('error walking path %s', path, err)
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

moved this to after throw because we dont want logs showing there was an error walking if the caught error is due to aborted signal. we could probably be smarter about determining this.. but not a huge issue


return badGatewayResponse(resource.toString(), 'Error walking path')
}
Expand Down Expand Up @@ -327,6 +327,9 @@
path = rootFilePath
resolvedCID = stat.cid
} catch (err: any) {
if (options?.signal?.aborted === true) {
throw new AbortError('signal aborted by user')
}
this.log('error loading path %c/%s', dirCid, rootFilePath, err)
return notSupportedResponse('Unable to find index.html for directory at given path. Support for directories with implicit root is not implemented')
} finally {
Expand Down Expand Up @@ -370,6 +373,9 @@

return response
} catch (err: any) {
if (options?.signal?.aborted === true) {
throw new AbortError('signal aborted by user')
}
this.log.error('error streaming %c/%s', cid, path, err)
if (byteRangeContext.isRangeRequest && err.code === 'ERR_INVALID_PARAMS') {
return badRangeResponse(resource)
Expand Down Expand Up @@ -454,6 +460,7 @@
* TODO: move operations called by fetch to a queue of operations where we can
* always exit early (and cleanly) if a given signal is aborted
*/
// eslint-disable-next-line complexity
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

the additional if (options?.signal?.aborted === true) { required adding this.

async fetch (resource: Resource, opts?: VerifiedFetchOptions): Promise<Response> {
this.log('fetch %s', resource)

Expand Down Expand Up @@ -481,6 +488,9 @@
ttl = result.ttl
protocol = result.protocol
} catch (err: any) {
if (options?.signal?.aborted === true) {
throw new AbortError('signal aborted by user')
}
this.log.error('error parsing resource %s', resource, err)

return badRequestResponse(resource.toString(), err)
Expand Down Expand Up @@ -542,7 +552,7 @@
const codecHandler = this.codecHandlers[cid.code]

if (codecHandler == null) {
return notSupportedResponse(`Support for codec with code ${cid.code} is not yet implemented. Please open an issue at https://github.com/ipfs/helia/issues/new`)
return notSupportedResponse(`Support for codec with code ${cid.code} is not yet implemented. Please open an issue at https://github.com/ipfs/helia-verified-fetch/issues/new`)

Check warning on line 555 in packages/verified-fetch/src/verified-fetch.ts

View check run for this annotation

Codecov / codecov/patch

packages/verified-fetch/src/verified-fetch.ts#L555

Added line #L555 was not covered by tests
}
this.log.trace('calling handler "%s"', codecHandler.name)

Expand Down
96 changes: 77 additions & 19 deletions packages/verified-fetch/test/abort-handling.spec.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
import { dagCbor } from '@helia/dag-cbor'
import { type DNSLinkResolveResult, type IPNS, type IPNSResolveResult } from '@helia/ipns'
import { type UnixFS, type UnixFSStats, unixfs } from '@helia/unixfs'
import { stop, type ComponentLogger, type Logger } from '@libp2p/interface'
import { prefixLogger, logger as libp2pLogger } from '@libp2p/logger'
import { createEd25519PeerId } from '@libp2p/peer-id-factory'
Expand All @@ -23,6 +24,7 @@
const notPublishedCid = CID.parse('bafybeichqiz32cw5c3vdpvh2xtfgl42veqbsr6sw2g6c7ffz6atvh2vise')
let helia: Helia
let name: StubbedInstance<IPNS>
let fs: StubbedInstance<UnixFS>
let logger: ComponentLogger
let componentLoggers: Logger[] = []
let verifiedFetch: VerifiedFetch
Expand All @@ -33,20 +35,28 @@
let blockRetriever: StubbedInstance<BlockRetriever>
let dnsLinkResolver: Sinon.SinonStub<any[], Promise<DNSLinkResolveResult>>
let peerIdResolver: Sinon.SinonStub<any[], Promise<IPNSResolveResult>>
let unixFsCatStub: Sinon.SinonStub<any[], AsyncIterable<Uint8Array>>
let unixFsStatStub: Sinon.SinonStub<any[], Promise<UnixFSStats>>

/**
* used as promises to pass to makeAbortedRequest that will abort the request as soon as it's resolved.
*/
let blockBrokerRetrieveCalled: DeferredPromise<void>
let dnsLinkResolverCalled: DeferredPromise<void>
let peerIdResolverCalled: DeferredPromise<void>
let unixFsStatCalled: DeferredPromise<void>
let unixFsCatCalled: DeferredPromise<void>

beforeEach(async () => {
peerIdResolver = sandbox.stub()
dnsLinkResolver = sandbox.stub()
unixFsCatStub = sandbox.stub()
unixFsStatStub = sandbox.stub()
peerIdResolverCalled = pDefer()
dnsLinkResolverCalled = pDefer()
blockBrokerRetrieveCalled = pDefer()
unixFsStatCalled = pDefer()
unixFsCatCalled = pDefer()

dnsLinkResolver.withArgs('timeout-5000-example.com', Sinon.match.any).callsFake(async (_domain, options) => {
dnsLinkResolverCalled.resolve()
Expand All @@ -62,6 +72,29 @@
return getAbortablePromise(options.signal)
})
})
unixFsCatStub.callsFake((cid, options) => {
unixFsCatCalled.resolve()
return {
async * [Symbol.asyncIterator] () {
await getAbortablePromise(options.signal)
yield new Uint8Array([0, 1, 2, 3, 4, 5, 6, 7, 8, 9])
}

Check warning on line 81 in packages/verified-fetch/test/abort-handling.spec.ts

View check run for this annotation

Codecov / codecov/patch

packages/verified-fetch/test/abort-handling.spec.ts#L80-L81

Added lines #L80 - L81 were not covered by tests
}
})

unixFsStatStub.callsFake(async (cid, options): Promise<UnixFSStats> => {
unixFsStatCalled.resolve()
await getAbortablePromise(options.signal)
return {
cid,
type: 'file',
fileSize: BigInt(0),
dagSize: BigInt(0),
blocks: 1,
localFileSize: BigInt(0),
localDagSize: BigInt(0)
}

Check warning on line 96 in packages/verified-fetch/test/abort-handling.spec.ts

View check run for this annotation

Codecov / codecov/patch

packages/verified-fetch/test/abort-handling.spec.ts#L88-L96

Added lines #L88 - L96 were not covered by tests
})
Comment on lines +75 to +97
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

defaults that timeout.... override with new .callsFake if you want it to return something valid.


logger = prefixLogger('test:abort-handling')
sandbox.stub(logger, 'forComponent').callsFake((name) => {
Expand All @@ -77,9 +110,14 @@
resolveDNSLink: dnsLinkResolver,
resolve: peerIdResolver
})
fs = stubInterface<UnixFS>({
cat: unixFsCatStub,
stat: unixFsStatStub
})
verifiedFetch = new VerifiedFetch({
helia,
ipns: name
ipns: name,
unixfs: fs
})
})

Expand All @@ -99,41 +137,61 @@

await name.publish(peerId, cid, { lifetime: 1000 * 60 * 60 })

const abortedResult = await makeAbortedRequest(verifiedFetch, [`ipns://${peerId}`], peerIdResolverCalled.promise)

await expect(makeAbortedRequest(verifiedFetch, [`ipns://${peerId}`], peerIdResolverCalled.promise)).to.eventually.be.rejectedWith('aborted')
expect(peerIdResolver.callCount).to.equal(1)
expect(dnsLinkResolver.callCount).to.equal(0) // not called because signal abort was detected
expect(blockRetriever.retrieve.callCount).to.equal(0) // not called because we never got the cid
expect(abortedResult).to.be.ok()
expect(abortedResult.status).to.equal(400)
expect(abortedResult.statusText).to.equal('Bad Request')
await expect(abortedResult.text()).to.eventually.contain('aborted')
})

it('should abort a request before dns resolution', async function () {
const abortedResult = await makeAbortedRequest(verifiedFetch, ['ipns://timeout-5000-example.com'], dnsLinkResolverCalled.promise)
await expect(makeAbortedRequest(verifiedFetch, ['ipns://timeout-5000-example.com'], dnsLinkResolverCalled.promise)).to.eventually.be.rejectedWith('aborted')

expect(peerIdResolver.callCount).to.equal(0) // not called because peerIdFromString fails
expect(dnsLinkResolver.callCount).to.equal(1)
expect(blockRetriever.retrieve.callCount).to.equal(0) // not called because we never got the cid
expect(abortedResult).to.be.ok()
expect(abortedResult.status).to.equal(400)
expect(abortedResult.statusText).to.equal('Bad Request')
await expect(abortedResult.text()).to.eventually.contain('aborted')
})

it('should abort a request while looking for cid', async function () {
const abortedResult = await makeAbortedRequest(verifiedFetch, [notPublishedCid, { headers: { accept: 'application/octet-stream' } }], blockBrokerRetrieveCalled.promise)
await expect(makeAbortedRequest(verifiedFetch, [notPublishedCid, { headers: { accept: 'application/octet-stream' } }], blockBrokerRetrieveCalled.promise)).to.eventually.be.rejectedWith('aborted')

expect(peerIdResolver.callCount).to.equal(0) // not called because parseResource never passes the resource to parseUrlString
expect(dnsLinkResolver.callCount).to.equal(0) // not called because parseResource never passes the resource to parseUrlString
expect(blockRetriever.retrieve.callCount).to.equal(1)
expect(abortedResult).to.be.ok()
expect(abortedResult.status).to.equal(400)
expect(abortedResult.statusText).to.equal('Bad Request')
// this error is exactly what blockRetriever throws, so we can check for "aborted" in the error message
await expect(abortedResult.text()).to.eventually.contain('aborted')
})

// TODO: verify that the request is aborted when calling unixfs.cat and unixfs.walkPath
it('should abort a request during unixfs.stat call', async function () {
const fs = unixfs(helia)
const fileCid = await fs.addBytes(Uint8Array.from([0, 1, 2, 3]))
const directoryCid = await fs.addDirectory()
const cid = await fs.cp(fileCid, directoryCid, 'index.html')

await expect(makeAbortedRequest(verifiedFetch, [cid], unixFsStatCalled.promise)).to.eventually.be.rejectedWith('aborted')

expect(peerIdResolver.callCount).to.equal(0) // not called because parseResource never passes the resource to parseUrlString
expect(dnsLinkResolver.callCount).to.equal(0) // not called because parseResource never passes the resource to parseUrlString
expect(blockRetriever.retrieve.callCount).to.equal(0) // not called because the blockstore has the content
expect(unixFsStatStub.callCount).to.equal(1)
expect(unixFsCatStub.callCount).to.equal(0) // not called because the request was aborted during .stat call
})

it('should abort a request during unixfs.cat call', async function () {
const fs = unixfs(helia)
const fileCid = await fs.addBytes(Uint8Array.from([0, 1, 2, 3]))
const directoryCid = await fs.addDirectory()
const cid = await fs.cp(fileCid, directoryCid, 'index.html')

// override the default fake set in beforeEach that would timeout.
unixFsStatStub.callsFake(async (cid, options) => {
unixFsStatCalled.resolve()
return fs.stat(cid, options)
})

await expect(makeAbortedRequest(verifiedFetch, [cid], unixFsCatCalled.promise)).to.eventually.be.rejectedWith('aborted')

expect(peerIdResolver.callCount).to.equal(0) // not called because parseResource never passes the resource to parseUrlString
expect(dnsLinkResolver.callCount).to.equal(0) // not called because parseResource never passes the resource to parseUrlString
expect(blockRetriever.retrieve.callCount).to.equal(0) // not called because the blockstore has the content
expect(unixFsStatStub.callCount).to.equal(1)
expect(unixFsCatStub.callCount).to.equal(1)
})
})
Loading