-
Notifications
You must be signed in to change notification settings - Fork 5
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
fix: aborted signals throw libp2p AbortError #30
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -4,6 +4,7 @@ | |
import * as ipldDagCbor from '@ipld/dag-cbor' | ||
import * as ipldDagJson from '@ipld/dag-json' | ||
import { code as dagPbCode } from '@ipld/dag-pb' | ||
import { AbortError, type AbortOptions, type Logger, type PeerId } from '@libp2p/interface' | ||
import { Record as DHTRecord } from '@libp2p/kad-dht' | ||
import { peerIdFromString } from '@libp2p/peer-id' | ||
import { Key } from 'interface-datastore' | ||
|
@@ -30,7 +31,6 @@ | |
import type { RequestFormatShorthand } from './types.js' | ||
import type { ParsedUrlStringResults } from './utils/parse-url-string' | ||
import type { Helia } from '@helia/interface' | ||
import type { AbortOptions, Logger, PeerId } from '@libp2p/interface' | ||
import type { DNSResolver } from '@multiformats/dns/resolvers' | ||
import type { UnixFSEntry } from 'ipfs-unixfs-exporter' | ||
import type { CID } from 'multiformats/cid' | ||
|
@@ -288,10 +288,10 @@ | |
ipfsRoots = pathDetails.ipfsRoots | ||
terminalElement = pathDetails.terminalElement | ||
} catch (err: any) { | ||
this.log.error('error walking path %s', path, err) | ||
if (options?.signal?.aborted === true) { | ||
return badRequestResponse(resource.toString(), new Error('signal aborted by user')) | ||
throw new AbortError('signal aborted by user') | ||
} | ||
this.log.error('error walking path %s', path, err) | ||
|
||
return badGatewayResponse(resource.toString(), 'Error walking path') | ||
} | ||
|
@@ -327,6 +327,9 @@ | |
path = rootFilePath | ||
resolvedCID = stat.cid | ||
} catch (err: any) { | ||
if (options?.signal?.aborted === true) { | ||
throw new AbortError('signal aborted by user') | ||
} | ||
this.log('error loading path %c/%s', dirCid, rootFilePath, err) | ||
return notSupportedResponse('Unable to find index.html for directory at given path. Support for directories with implicit root is not implemented') | ||
} finally { | ||
|
@@ -370,6 +373,9 @@ | |
|
||
return response | ||
} catch (err: any) { | ||
if (options?.signal?.aborted === true) { | ||
throw new AbortError('signal aborted by user') | ||
} | ||
this.log.error('error streaming %c/%s', cid, path, err) | ||
if (byteRangeContext.isRangeRequest && err.code === 'ERR_INVALID_PARAMS') { | ||
return badRangeResponse(resource) | ||
|
@@ -454,6 +460,7 @@ | |
* TODO: move operations called by fetch to a queue of operations where we can | ||
* always exit early (and cleanly) if a given signal is aborted | ||
*/ | ||
// eslint-disable-next-line complexity | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. the additional |
||
async fetch (resource: Resource, opts?: VerifiedFetchOptions): Promise<Response> { | ||
this.log('fetch %s', resource) | ||
|
||
|
@@ -481,6 +488,9 @@ | |
ttl = result.ttl | ||
protocol = result.protocol | ||
} catch (err: any) { | ||
if (options?.signal?.aborted === true) { | ||
throw new AbortError('signal aborted by user') | ||
} | ||
this.log.error('error parsing resource %s', resource, err) | ||
|
||
return badRequestResponse(resource.toString(), err) | ||
|
@@ -542,7 +552,7 @@ | |
const codecHandler = this.codecHandlers[cid.code] | ||
|
||
if (codecHandler == null) { | ||
return notSupportedResponse(`Support for codec with code ${cid.code} is not yet implemented. Please open an issue at https://github.com/ipfs/helia/issues/new`) | ||
return notSupportedResponse(`Support for codec with code ${cid.code} is not yet implemented. Please open an issue at https://github.com/ipfs/helia-verified-fetch/issues/new`) | ||
} | ||
this.log.trace('calling handler "%s"', codecHandler.name) | ||
|
||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,5 +1,6 @@ | ||
import { dagCbor } from '@helia/dag-cbor' | ||
import { type DNSLinkResolveResult, type IPNS, type IPNSResolveResult } from '@helia/ipns' | ||
import { type UnixFS, type UnixFSStats, unixfs } from '@helia/unixfs' | ||
import { stop, type ComponentLogger, type Logger } from '@libp2p/interface' | ||
import { prefixLogger, logger as libp2pLogger } from '@libp2p/logger' | ||
import { createEd25519PeerId } from '@libp2p/peer-id-factory' | ||
|
@@ -23,6 +24,7 @@ | |
const notPublishedCid = CID.parse('bafybeichqiz32cw5c3vdpvh2xtfgl42veqbsr6sw2g6c7ffz6atvh2vise') | ||
let helia: Helia | ||
let name: StubbedInstance<IPNS> | ||
let fs: StubbedInstance<UnixFS> | ||
let logger: ComponentLogger | ||
let componentLoggers: Logger[] = [] | ||
let verifiedFetch: VerifiedFetch | ||
|
@@ -33,20 +35,28 @@ | |
let blockRetriever: StubbedInstance<BlockRetriever> | ||
let dnsLinkResolver: Sinon.SinonStub<any[], Promise<DNSLinkResolveResult>> | ||
let peerIdResolver: Sinon.SinonStub<any[], Promise<IPNSResolveResult>> | ||
let unixFsCatStub: Sinon.SinonStub<any[], AsyncIterable<Uint8Array>> | ||
let unixFsStatStub: Sinon.SinonStub<any[], Promise<UnixFSStats>> | ||
|
||
/** | ||
* used as promises to pass to makeAbortedRequest that will abort the request as soon as it's resolved. | ||
*/ | ||
let blockBrokerRetrieveCalled: DeferredPromise<void> | ||
let dnsLinkResolverCalled: DeferredPromise<void> | ||
let peerIdResolverCalled: DeferredPromise<void> | ||
let unixFsStatCalled: DeferredPromise<void> | ||
let unixFsCatCalled: DeferredPromise<void> | ||
|
||
beforeEach(async () => { | ||
peerIdResolver = sandbox.stub() | ||
dnsLinkResolver = sandbox.stub() | ||
unixFsCatStub = sandbox.stub() | ||
unixFsStatStub = sandbox.stub() | ||
peerIdResolverCalled = pDefer() | ||
dnsLinkResolverCalled = pDefer() | ||
blockBrokerRetrieveCalled = pDefer() | ||
unixFsStatCalled = pDefer() | ||
unixFsCatCalled = pDefer() | ||
|
||
dnsLinkResolver.withArgs('timeout-5000-example.com', Sinon.match.any).callsFake(async (_domain, options) => { | ||
dnsLinkResolverCalled.resolve() | ||
|
@@ -62,6 +72,29 @@ | |
return getAbortablePromise(options.signal) | ||
}) | ||
}) | ||
unixFsCatStub.callsFake((cid, options) => { | ||
unixFsCatCalled.resolve() | ||
return { | ||
async * [Symbol.asyncIterator] () { | ||
await getAbortablePromise(options.signal) | ||
yield new Uint8Array([0, 1, 2, 3, 4, 5, 6, 7, 8, 9]) | ||
} | ||
} | ||
}) | ||
|
||
unixFsStatStub.callsFake(async (cid, options): Promise<UnixFSStats> => { | ||
unixFsStatCalled.resolve() | ||
await getAbortablePromise(options.signal) | ||
return { | ||
cid, | ||
type: 'file', | ||
fileSize: BigInt(0), | ||
dagSize: BigInt(0), | ||
blocks: 1, | ||
localFileSize: BigInt(0), | ||
localDagSize: BigInt(0) | ||
} | ||
}) | ||
Comment on lines
+75
to
+97
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. defaults that timeout.... override with new |
||
|
||
logger = prefixLogger('test:abort-handling') | ||
sandbox.stub(logger, 'forComponent').callsFake((name) => { | ||
|
@@ -77,9 +110,14 @@ | |
resolveDNSLink: dnsLinkResolver, | ||
resolve: peerIdResolver | ||
}) | ||
fs = stubInterface<UnixFS>({ | ||
cat: unixFsCatStub, | ||
stat: unixFsStatStub | ||
}) | ||
verifiedFetch = new VerifiedFetch({ | ||
helia, | ||
ipns: name | ||
ipns: name, | ||
unixfs: fs | ||
}) | ||
}) | ||
|
||
|
@@ -99,41 +137,61 @@ | |
|
||
await name.publish(peerId, cid, { lifetime: 1000 * 60 * 60 }) | ||
|
||
const abortedResult = await makeAbortedRequest(verifiedFetch, [`ipns://${peerId}`], peerIdResolverCalled.promise) | ||
|
||
await expect(makeAbortedRequest(verifiedFetch, [`ipns://${peerId}`], peerIdResolverCalled.promise)).to.eventually.be.rejectedWith('aborted') | ||
expect(peerIdResolver.callCount).to.equal(1) | ||
expect(dnsLinkResolver.callCount).to.equal(0) // not called because signal abort was detected | ||
expect(blockRetriever.retrieve.callCount).to.equal(0) // not called because we never got the cid | ||
expect(abortedResult).to.be.ok() | ||
expect(abortedResult.status).to.equal(400) | ||
expect(abortedResult.statusText).to.equal('Bad Request') | ||
await expect(abortedResult.text()).to.eventually.contain('aborted') | ||
}) | ||
|
||
it('should abort a request before dns resolution', async function () { | ||
const abortedResult = await makeAbortedRequest(verifiedFetch, ['ipns://timeout-5000-example.com'], dnsLinkResolverCalled.promise) | ||
await expect(makeAbortedRequest(verifiedFetch, ['ipns://timeout-5000-example.com'], dnsLinkResolverCalled.promise)).to.eventually.be.rejectedWith('aborted') | ||
|
||
expect(peerIdResolver.callCount).to.equal(0) // not called because peerIdFromString fails | ||
expect(dnsLinkResolver.callCount).to.equal(1) | ||
expect(blockRetriever.retrieve.callCount).to.equal(0) // not called because we never got the cid | ||
expect(abortedResult).to.be.ok() | ||
expect(abortedResult.status).to.equal(400) | ||
expect(abortedResult.statusText).to.equal('Bad Request') | ||
await expect(abortedResult.text()).to.eventually.contain('aborted') | ||
}) | ||
|
||
it('should abort a request while looking for cid', async function () { | ||
const abortedResult = await makeAbortedRequest(verifiedFetch, [notPublishedCid, { headers: { accept: 'application/octet-stream' } }], blockBrokerRetrieveCalled.promise) | ||
await expect(makeAbortedRequest(verifiedFetch, [notPublishedCid, { headers: { accept: 'application/octet-stream' } }], blockBrokerRetrieveCalled.promise)).to.eventually.be.rejectedWith('aborted') | ||
|
||
expect(peerIdResolver.callCount).to.equal(0) // not called because parseResource never passes the resource to parseUrlString | ||
expect(dnsLinkResolver.callCount).to.equal(0) // not called because parseResource never passes the resource to parseUrlString | ||
expect(blockRetriever.retrieve.callCount).to.equal(1) | ||
expect(abortedResult).to.be.ok() | ||
expect(abortedResult.status).to.equal(400) | ||
expect(abortedResult.statusText).to.equal('Bad Request') | ||
// this error is exactly what blockRetriever throws, so we can check for "aborted" in the error message | ||
await expect(abortedResult.text()).to.eventually.contain('aborted') | ||
}) | ||
|
||
// TODO: verify that the request is aborted when calling unixfs.cat and unixfs.walkPath | ||
it('should abort a request during unixfs.stat call', async function () { | ||
const fs = unixfs(helia) | ||
const fileCid = await fs.addBytes(Uint8Array.from([0, 1, 2, 3])) | ||
const directoryCid = await fs.addDirectory() | ||
const cid = await fs.cp(fileCid, directoryCid, 'index.html') | ||
|
||
await expect(makeAbortedRequest(verifiedFetch, [cid], unixFsStatCalled.promise)).to.eventually.be.rejectedWith('aborted') | ||
|
||
expect(peerIdResolver.callCount).to.equal(0) // not called because parseResource never passes the resource to parseUrlString | ||
expect(dnsLinkResolver.callCount).to.equal(0) // not called because parseResource never passes the resource to parseUrlString | ||
expect(blockRetriever.retrieve.callCount).to.equal(0) // not called because the blockstore has the content | ||
expect(unixFsStatStub.callCount).to.equal(1) | ||
expect(unixFsCatStub.callCount).to.equal(0) // not called because the request was aborted during .stat call | ||
}) | ||
|
||
it('should abort a request during unixfs.cat call', async function () { | ||
const fs = unixfs(helia) | ||
const fileCid = await fs.addBytes(Uint8Array.from([0, 1, 2, 3])) | ||
const directoryCid = await fs.addDirectory() | ||
const cid = await fs.cp(fileCid, directoryCid, 'index.html') | ||
|
||
// override the default fake set in beforeEach that would timeout. | ||
unixFsStatStub.callsFake(async (cid, options) => { | ||
unixFsStatCalled.resolve() | ||
return fs.stat(cid, options) | ||
}) | ||
|
||
await expect(makeAbortedRequest(verifiedFetch, [cid], unixFsCatCalled.promise)).to.eventually.be.rejectedWith('aborted') | ||
|
||
expect(peerIdResolver.callCount).to.equal(0) // not called because parseResource never passes the resource to parseUrlString | ||
expect(dnsLinkResolver.callCount).to.equal(0) // not called because parseResource never passes the resource to parseUrlString | ||
expect(blockRetriever.retrieve.callCount).to.equal(0) // not called because the blockstore has the content | ||
expect(unixFsStatStub.callCount).to.equal(1) | ||
expect(unixFsCatStub.callCount).to.equal(1) | ||
}) | ||
}) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
moved this to after throw because we dont want logs showing there was an error walking if the caught error is due to aborted signal. we could probably be smarter about determining this.. but not a huge issue