Skip to content

Commit

Permalink
chore: significant cleanup of abort handling
Browse files Browse the repository at this point in the history
  • Loading branch information
SgtPooki committed Mar 21, 2024
1 parent f03be52 commit 59686df
Show file tree
Hide file tree
Showing 3 changed files with 117 additions and 65 deletions.
5 changes: 5 additions & 0 deletions packages/verified-fetch/src/utils/parse-url-string.ts
Original file line number Diff line number Diff line change
Expand Up @@ -131,7 +131,10 @@ function dnsLinkLabelDecoder (linkLabel: string): string {
* After determining the protocol successfully, we process the cidOrPeerIdOrDnsLink:
* * If it's ipfs, it parses the CID or throws an Aggregate error
* * If it's ipns, it attempts to resolve the PeerId and then the DNSLink. If both fail, an Aggregate error is thrown.
*
* @todo we need to break out each step of this function (cid parsing, ipns resolving, dnslink resolving) into separate functions and then remove the eslint-disable comment
*/
// eslint-disable-next-line complexity
export async function parseUrlString ({ urlString, ipns, logger }: ParseUrlStringInput, options?: ParseUrlStringOptions): Promise<ParsedUrlStringResults> {
const log = logger.forComponent('helia:verified-fetch:parse-url-string')
const { protocol, cidOrPeerIdOrDnsLink, path: urlPath, queryString } = matchURLString(urlString)
Expand Down Expand Up @@ -170,6 +173,7 @@ export async function parseUrlString ({ urlString, ipns, logger }: ParseUrlStrin
resolvedPath = resolveResult?.path
log.trace('resolved %s to %c', cidOrPeerIdOrDnsLink, cid)
} catch (err) {
options?.signal?.throwIfAborted()
if (peerId == null) {
log.error('could not parse PeerId string "%s"', cidOrPeerIdOrDnsLink, err)
errors.push(new TypeError(`Could not parse PeerId in ipns url "${cidOrPeerIdOrDnsLink}", ${(err as Error).message}`))
Expand All @@ -194,6 +198,7 @@ export async function parseUrlString ({ urlString, ipns, logger }: ParseUrlStrin
resolvedPath = resolveResult?.path
log.trace('resolved %s to %c', decodedDnsLinkLabel, cid)
} catch (err: any) {
options?.signal?.throwIfAborted()
log.error('could not resolve DnsLink for "%s"', cidOrPeerIdOrDnsLink, err)
errors.push(err)
}
Expand Down
10 changes: 2 additions & 8 deletions packages/verified-fetch/src/verified-fetch.ts
Original file line number Diff line number Diff line change
Expand Up @@ -289,13 +289,8 @@ export class VerifiedFetch {
terminalElement = pathDetails.terminalElement
} catch (err: any) {
this.log.error('error walking path %s', path, err)
/**
* TODO: do this better. We should be able to distinguish between an abortError and a regular error
* However, `helia:networked-storage:error` throws an AggregateError with the abort error inside it
*/
const abortError = err.errors?.find((e: Error) => e.message.includes('abort'))
if (abortError != null) {
return badRequestResponse(resource.toString(), abortError.message)
if (options?.signal?.aborted === true) {
return badRequestResponse(resource.toString(), new Error('signal aborted by user'))
}

return badGatewayResponse(resource.toString(), 'Error walking path')
Expand Down Expand Up @@ -450,7 +445,6 @@ export class VerifiedFetch {
private async abortHandler (opController: AbortController): Promise<void> {
this.log.error('signal aborted by user')
opController.abort('signal aborted by user')
await this.stop()
}

/**
Expand Down
167 changes: 110 additions & 57 deletions packages/verified-fetch/test/abort-handling.spec.ts
Original file line number Diff line number Diff line change
@@ -1,8 +1,11 @@
import { dagCbor } from '@helia/dag-cbor'
import { type DNSLinkResolveResult, type IPNS, type IPNSResolveResult } from '@helia/ipns'
import { stop, type ComponentLogger, type Logger } from '@libp2p/interface'
import { prefixLogger, logger as libp2pLogger } from '@libp2p/logger'
import { createEd25519PeerId } from '@libp2p/peer-id-factory'
import { expect } from 'aegir/chai'
import { CID } from 'multiformats/cid'
import pDefer from 'p-defer'
import pDefer, { type DeferredPromise } from 'p-defer'
import Sinon from 'sinon'
import { stubInterface, type StubbedInstance } from 'sinon-ts'
import { VerifiedFetch } from '../src/verified-fetch.js'
Expand All @@ -22,96 +25,146 @@ async function makeAbortedRequest (verifiedFetch: VerifiedFetch, [resource, opti
return resultPromise
}

describe('abort-handling', () => {
let helia: Helia
let heliaStopSpy: Sinon.SinonSpy<[], Promise<void>>
/**
* we need to emulate signal handling (blockBrokers/dnsResolvers/etc should handle abort signals too)
* this is a simplified version of what libs we depend on should do, and the
* tests in this file verify how verified-fetch would handle the failure
*/
async function getAbortablePromise <T> (signal?: AbortSignal): Promise<T> {
return new Promise((resolve, reject) => {
const timeoutId = setTimeout(() => {
reject(new Error('timeout while resolving'))
}, 5000)

signal?.addEventListener('abort', () => {
clearTimeout(timeoutId)
reject(new Error('aborted'))
})
})
}

describe('abort-handling', function () {
this.timeout(500) // these tests should all fail extremely quickly. if they don't, they're not aborting properly, or they're being ran on an extremely slow machine.
const sandbox = Sinon.createSandbox()
let logger: ComponentLogger
let componentLoggers: Logger[] = []
/**
* CID I created by running `npx kubo add --cid-version 1 -r dist` in the `verified-fetch` package folder
*/
const notPublishedCid = CID.parse('bafybeichqiz32cw5c3vdpvh2xtfgl42veqbsr6sw2g6c7ffz6atvh2vise')
let superSlowBlockRetriever: StubbedInstance<BlockRetriever>
let helia: Helia
// let name: IPNS
let name: StubbedInstance<IPNS>

let blockBrokerRetrieveCalled: ReturnType<typeof pDefer>
let dnsResolverCalled: ReturnType<typeof pDefer>
let logger: ComponentLogger
let componentLoggers: Logger[] = []
let verifiedFetch: VerifiedFetch

/**
* Stubbed networking components
*/
let blockRetriever: StubbedInstance<BlockRetriever>
let dnsLinkResolver: Sinon.SinonStub<any[], Promise<DNSLinkResolveResult>>
let peerIdResolver: Sinon.SinonStub<any[], Promise<IPNSResolveResult>>

/**
* used as promises to pass to makeAbortedRequest that will abort the request as soon as it's resolved.
*/
let blockBrokerRetrieveCalled: DeferredPromise<void>
let dnsLinkResolverCalled: DeferredPromise<void>
let peerIdResolverCalled: DeferredPromise<void>

beforeEach(async () => {
peerIdResolver = sandbox.stub()
dnsLinkResolver = sandbox.stub()
peerIdResolverCalled = pDefer()
dnsLinkResolverCalled = pDefer()
blockBrokerRetrieveCalled = pDefer()
dnsResolverCalled = pDefer()
superSlowBlockRetriever = stubInterface<BlockRetriever>({
retrieve: Sinon.stub().callsFake(async (cid, options) => {
blockBrokerRetrieveCalled.resolve()

return new Promise((resolve, reject) => {
const timeoutId = setTimeout(() => {
reject(new Error('timeout while resolving'))
}, 10000)

/**
* we need to emulate signal handling (blockBrokers should handle abort signals too)
* this is a simplified version of what the blockBroker should do, and the
* tests in this file verify how verified-fetch would handle the failure
*/
options.signal?.addEventListener('abort', () => {
clearTimeout(timeoutId)
reject(new Error('aborted'))
})
})
dnsLinkResolver.withArgs('timeout-5000-example.com', Sinon.match.any).callsFake(async (_domain, options) => {
dnsLinkResolverCalled.resolve()
return getAbortablePromise(options.signal)
})
peerIdResolver.callsFake(async (peerId, options) => {
peerIdResolverCalled.resolve()
return getAbortablePromise(options.signal)
})
blockRetriever = stubInterface<BlockRetriever>({
retrieve: sandbox.stub().callsFake(async (cid, options) => {
blockBrokerRetrieveCalled.resolve()
return getAbortablePromise(options.signal)
})
})

helia = await createHelia({ blockBrokers: [() => superSlowBlockRetriever] })
logger = prefixLogger('helia:verified-fetch-test:abort-handling')

logger = prefixLogger('test:abort-handling')
sandbox.stub(logger, 'forComponent').callsFake((name) => {
const newLogger = libp2pLogger(`helia:verified-fetch-test:abort-handling:child-logger-${componentLoggers.length}:${name}`)
const newLogger = libp2pLogger(`test:abort-handling:child-logger-${componentLoggers.length}:${name}`)
componentLoggers.push(sandbox.stub(newLogger))
return newLogger
})
helia.logger = logger
heliaStopSpy = sandbox.spy(helia, 'stop')
helia = await createHelia({
logger,
blockBrokers: [() => blockRetriever]
})
name = stubInterface<IPNS>({
resolveDNSLink: dnsLinkResolver,
resolve: peerIdResolver
})
verifiedFetch = new VerifiedFetch({
helia,
ipns: name
})
})

afterEach(async () => {
await stop(helia)
await stop(helia, verifiedFetch, name)
componentLoggers = []
sandbox.restore()
})

it('should abort a request before dns resolution', async function () {
this.timeout(1000)

const customDnsResolver = Sinon.stub().resolves(new Promise((resolve, reject) => {
dnsResolverCalled.resolve()
setTimeout(() => { reject(new Error('timeout while resolving')) }, 5000)
}))
const verifiedFetch = new VerifiedFetch({
helia
}, {
dnsResolvers: [customDnsResolver]
it('should abort a request before peerId resolution', async function () {
const c = dagCbor(helia)
const cid = await c.add({
hello: 'world'
})
const abortedResult = await makeAbortedRequest(verifiedFetch, ['ipns://this-doesnt-matter'], Promise.resolve())

expect(superSlowBlockRetriever.retrieve.called).to.be.false()
const peerId = await createEd25519PeerId()

await name.publish(peerId, cid, { lifetime: 1000 * 60 * 60 })

const abortedResult = await makeAbortedRequest(verifiedFetch, [`ipns://${peerId}`], peerIdResolverCalled.promise)

expect(peerIdResolver.callCount).to.equal(1)
expect(dnsLinkResolver.callCount).to.equal(0) // not called because signal abort was detected
expect(blockRetriever.retrieve.callCount).to.equal(0) // not called because we never got the cid
expect(abortedResult).to.be.ok()
expect(abortedResult.status).to.equal(400)
expect(abortedResult.statusText).to.equal('Bad Request')
// TODO: we should be able to tell that the error was aborted instead of falling through to "invalid resource"
await expect(abortedResult.text()).to.eventually.contain('Invalid resource')
expect(heliaStopSpy.calledOnce).to.be.true()
await expect(abortedResult.text()).to.eventually.contain('aborted')
})

it('should abort a request while looking for cid', async function () {
const verifiedFetch = new VerifiedFetch({
helia
})
it('should abort a request before dns resolution', async function () {
const abortedResult = await makeAbortedRequest(verifiedFetch, ['ipns://timeout-5000-example.com'], dnsLinkResolverCalled.promise)

expect(peerIdResolver.callCount).to.equal(0) // not called because peerIdFromString fails
expect(dnsLinkResolver.callCount).to.equal(1)
expect(blockRetriever.retrieve.callCount).to.equal(0) // not called because we never got the cid
expect(abortedResult).to.be.ok()
expect(abortedResult.status).to.equal(400)
expect(abortedResult.statusText).to.equal('Bad Request')
await expect(abortedResult.text()).to.eventually.contain('aborted')
})

it('should abort a request while looking for cid', async function () {
const abortedResult = await makeAbortedRequest(verifiedFetch, [notPublishedCid, { headers: { accept: 'application/octet-stream' } }], blockBrokerRetrieveCalled.promise)

expect(superSlowBlockRetriever.retrieve.called).to.be.true()
expect(peerIdResolver.callCount).to.equal(0) // not called because parseResource never passes the resource to parseUrlString
expect(dnsLinkResolver.callCount).to.equal(0) // not called because parseResource never passes the resource to parseUrlString
expect(blockRetriever.retrieve.callCount).to.equal(1)
expect(abortedResult).to.be.ok()
expect(abortedResult.status).to.equal(400)
expect(abortedResult.statusText).to.equal('Bad Request')
// this error is exactly what blockRetriever throws, so we can check for "aborted" in the error message
await expect(abortedResult.text()).to.eventually.contain('aborted')
expect(heliaStopSpy.calledOnce).to.be.true()
})

// TODO: verify that the request is aborted when calling unixfs.cat and unixfs.walkPath
})

0 comments on commit 59686df

Please sign in to comment.