From dc2a76800a9607a34b246514cd6729db72a54d8e Mon Sep 17 00:00:00 2001 From: Russell Dempsey <1173416+SgtPooki@users.noreply.github.com> Date: Wed, 20 Mar 2024 16:31:47 -0700 Subject: [PATCH 1/8] fix: options.signal should be used if not === null --- packages/verified-fetch/src/verified-fetch.ts | 3 +++ 1 file changed, 3 insertions(+) diff --git a/packages/verified-fetch/src/verified-fetch.ts b/packages/verified-fetch/src/verified-fetch.ts index 77ae665e..0d4f75cf 100644 --- a/packages/verified-fetch/src/verified-fetch.ts +++ b/packages/verified-fetch/src/verified-fetch.ts @@ -77,7 +77,10 @@ function convertOptions (options?: VerifiedFetchOptions): (Omit Date: Wed, 20 Mar 2024 19:32:02 -0700 Subject: [PATCH 2/8] fix: misc - badRequestResponse is called correctly - badRequestResponse accepts error for body - add tests for abort handling --- .../utils/get-stream-from-async-iterable.ts | 2 +- .../src/utils/parse-resource.ts | 6 +- .../src/utils/parse-url-string.ts | 14 ++- .../verified-fetch/src/utils/responses.ts | 8 +- packages/verified-fetch/src/verified-fetch.ts | 47 ++++++- .../test/abort-handling.spec.ts | 116 ++++++++++++++++++ .../test/custom-dns-resolvers.spec.ts | 2 - .../test/fixtures/dns-answer-fake.ts | 13 ++ 8 files changed, 190 insertions(+), 18 deletions(-) create mode 100644 packages/verified-fetch/test/abort-handling.spec.ts create mode 100644 packages/verified-fetch/test/fixtures/dns-answer-fake.ts diff --git a/packages/verified-fetch/src/utils/get-stream-from-async-iterable.ts b/packages/verified-fetch/src/utils/get-stream-from-async-iterable.ts index 02342d59..a4af1502 100644 --- a/packages/verified-fetch/src/utils/get-stream-from-async-iterable.ts +++ b/packages/verified-fetch/src/utils/get-stream-from-async-iterable.ts @@ -5,7 +5,7 @@ import type { ComponentLogger } from '@libp2p/interface' /** * Converts an async iterator of Uint8Array bytes to a stream and returns the first chunk of bytes */ -export async function getStreamFromAsyncIterable (iterator: AsyncIterable, path: string, logger: ComponentLogger, options?: Pick): Promise<{ stream: ReadableStream, firstChunk: Uint8Array }> { +export async function getStreamFromAsyncIterable (iterator: AsyncIterable, path: string, logger: ComponentLogger, options?: Pick): Promise<{ stream: ReadableStream, firstChunk: Uint8Array }> { const log = logger.forComponent('helia:verified-fetch:get-stream-from-async-iterable') const reader = iterator[Symbol.asyncIterator]() const { value: firstChunk, done } = await reader.next() diff --git a/packages/verified-fetch/src/utils/parse-resource.ts b/packages/verified-fetch/src/utils/parse-resource.ts index 49e0b6d3..bbd1e044 100644 --- a/packages/verified-fetch/src/utils/parse-resource.ts +++ b/packages/verified-fetch/src/utils/parse-resource.ts @@ -3,7 +3,7 @@ import { parseUrlString } from './parse-url-string.js' import type { ParsedUrlStringResults } from './parse-url-string.js' import type { Resource } from '../index.js' import type { IPNS, IPNSRoutingEvents, ResolveDNSLinkProgressEvents, ResolveProgressEvents } from '@helia/ipns' -import type { ComponentLogger } from '@libp2p/interface' +import type { AbortOptions, ComponentLogger } from '@libp2p/interface' import type { ProgressOptions } from 'progress-events' export interface ParseResourceComponents { @@ -11,7 +11,7 @@ export interface ParseResourceComponents { logger: ComponentLogger } -export interface ParseResourceOptions extends ProgressOptions { +export interface ParseResourceOptions extends ProgressOptions, AbortOptions { } /** @@ -21,7 +21,7 @@ export interface ParseResourceOptions extends ProgressOptions { if (typeof resource === 'string') { - return parseUrlString({ urlString: resource, ipns, logger }, { onProgress: options?.onProgress }) + return parseUrlString({ urlString: resource, ipns, logger }, options) } const cid = CID.asCID(resource) diff --git a/packages/verified-fetch/src/utils/parse-url-string.ts b/packages/verified-fetch/src/utils/parse-url-string.ts index 4d99a460..ddbed764 100644 --- a/packages/verified-fetch/src/utils/parse-url-string.ts +++ b/packages/verified-fetch/src/utils/parse-url-string.ts @@ -1,10 +1,10 @@ import { peerIdFromString } from '@libp2p/peer-id' import { CID } from 'multiformats/cid' import { TLRU } from './tlru.js' +import type { ParseResourceOptions } from './parse-resource.js' import type { RequestFormatShorthand } from '../types.js' -import type { IPNS, ResolveDNSLinkProgressEvents, ResolveResult } from '@helia/ipns' +import type { IPNS, ResolveResult } from '@helia/ipns' import type { ComponentLogger } from '@libp2p/interface' -import type { ProgressOptions } from 'progress-events' const ipnsCache = new TLRU(1000) @@ -13,7 +13,7 @@ export interface ParseUrlStringInput { ipns: IPNS logger: ComponentLogger } -export interface ParseUrlStringOptions extends ProgressOptions { +export interface ParseUrlStringOptions extends ParseResourceOptions { } @@ -110,7 +110,8 @@ export async function parseUrlString ({ urlString, ipns, logger }: ParseUrlStrin let peerId = null try { peerId = peerIdFromString(cidOrPeerIdOrDnsLink) - resolveResult = await ipns.resolve(peerId, { onProgress: options?.onProgress }) + options?.signal?.throwIfAborted() + resolveResult = await ipns.resolve(peerId, options) cid = resolveResult?.cid resolvedPath = resolveResult?.path log.trace('resolved %s to %c', cidOrPeerIdOrDnsLink, cid) @@ -134,7 +135,8 @@ export async function parseUrlString ({ urlString, ipns, logger }: ParseUrlStrin log.trace('Attempting to resolve DNSLink for %s', decodedDnsLinkLabel) try { - resolveResult = await ipns.resolveDNSLink(decodedDnsLinkLabel, { onProgress: options?.onProgress }) + options?.signal?.throwIfAborted() + resolveResult = await ipns.resolveDNSLink(decodedDnsLinkLabel, options) cid = resolveResult?.cid resolvedPath = resolveResult?.path log.trace('resolved %s to %c', decodedDnsLinkLabel, cid) @@ -147,6 +149,8 @@ export async function parseUrlString ({ urlString, ipns, logger }: ParseUrlStrin } } + options?.signal?.throwIfAborted() + if (cid == null) { if (errors.length === 1) { throw errors[0] diff --git a/packages/verified-fetch/src/utils/responses.ts b/packages/verified-fetch/src/utils/responses.ts index 667318c6..dda0230d 100644 --- a/packages/verified-fetch/src/utils/responses.ts +++ b/packages/verified-fetch/src/utils/responses.ts @@ -85,7 +85,13 @@ export function notAcceptableResponse (url: string, body?: SupportedBodyTypes, i return response } -export function badRequestResponse (url: string, body?: SupportedBodyTypes, init?: ResponseInit): Response { +/** + * if body is an Error, it will be converted to a string containing the error message. + */ +export function badRequestResponse (url: string, body?: SupportedBodyTypes | Error, init?: ResponseInit): Response { + if (body instanceof Error) { + body = body.message + } const response = new Response(body, { ...(init ?? {}), status: 400, diff --git a/packages/verified-fetch/src/verified-fetch.ts b/packages/verified-fetch/src/verified-fetch.ts index 0d4f75cf..49a6f07a 100644 --- a/packages/verified-fetch/src/verified-fetch.ts +++ b/packages/verified-fetch/src/verified-fetch.ts @@ -286,10 +286,18 @@ export class VerifiedFetch { const pathDetails = await walkPath(this.helia.blockstore, `${cid.toString()}/${path}`, options) ipfsRoots = pathDetails.ipfsRoots terminalElement = pathDetails.terminalElement - } catch (err) { + } catch (err: any) { this.log.error('error walking path %s', path, err) + /** + * TODO: do this better. We should be able to distinguish between an abortError and a regular error + * However, `helia:networked-storage:error` throws an AggregateError with the abort error inside it + */ + const abortError = err.errors?.find((e: Error) => e.message.includes('abort')) + if (abortError != null) { + return badRequestResponse(resource.toString(), abortError.message) + } - return badGatewayResponse('Error walking path') + return badGatewayResponse(resource.toString(), 'Error walking path') } let resolvedCID = terminalElement?.cid ?? cid @@ -349,7 +357,8 @@ export class VerifiedFetch { try { const { stream, firstChunk } = await getStreamFromAsyncIterable(asyncIter, path ?? '', this.helia.logger, { - onProgress: options?.onProgress + onProgress: options?.onProgress, + signal: options?.signal }) byteRangeContext.setBody(stream) // if not a valid range request, okRangeRequest will call okResponse @@ -369,7 +378,7 @@ export class VerifiedFetch { if (byteRangeContext.isRangeRequest && err.code === 'ERR_INVALID_PARAMS') { return badRangeResponse(resource) } - return badGatewayResponse('Unable to stream content') + return badGatewayResponse(resource.toString(), 'Unable to stream content') } } @@ -433,11 +442,35 @@ export class VerifiedFetch { [identity.code]: this.handleRaw } + /** + * + * TODO: Should we use 400, 408, 418, or 425, or throw and not even return a response? + */ + private async abortHandler (opController: AbortController): Promise { + this.log.error('signal aborted by user') + opController.abort('signal aborted by user') + await this.stop() + } + + /** + * We're starting to get to the point where we need a queue or pipeline of + * operations to perform and a single place to handle errors. + * + * TODO: move operations called by fetch to a queue of operations where we can + * always exit early (and cleanly) if a given signal is aborted + */ async fetch (resource: Resource, opts?: VerifiedFetchOptions): Promise { this.log('fetch %s', resource) const options = convertOptions(opts) + const opController = new AbortController() + if (options?.signal != null) { + options.signal.onabort = this.abortHandler.bind(this, opController) + options.signal = opController.signal + } + this.log('options %o', options) + options?.onProgress?.(new CustomProgressEvent('verified-fetch:request:start', { resource })) // resolve the CID/path from the requested resource @@ -449,10 +482,11 @@ export class VerifiedFetch { cid = result.cid path = result.path query = result.query - } catch (err) { + options?.signal?.throwIfAborted() + } catch (err: any) { this.log.error('error parsing resource %s', resource, err) - return badRequestResponse('Invalid resource') + return badRequestResponse(resource.toString(), err) } options?.onProgress?.(new CustomProgressEvent('verified-fetch:request:resolve', { cid, path })) @@ -515,6 +549,7 @@ export class VerifiedFetch { } this.log.trace('calling handler "%s"', codecHandler.name) + options?.signal?.throwIfAborted() response = await codecHandler.call(this, handlerArgs) } diff --git a/packages/verified-fetch/test/abort-handling.spec.ts b/packages/verified-fetch/test/abort-handling.spec.ts new file mode 100644 index 00000000..9bbfda3e --- /dev/null +++ b/packages/verified-fetch/test/abort-handling.spec.ts @@ -0,0 +1,116 @@ +import { stop, type ComponentLogger, type Logger } from '@libp2p/interface' +import { prefixLogger, logger as libp2pLogger } from '@libp2p/logger' +import { expect } from 'aegir/chai' +import { CID } from 'multiformats/cid' +import pDefer from 'p-defer' +import Sinon from 'sinon' +import { stubInterface, type StubbedInstance } from 'sinon-ts' +import { VerifiedFetch } from '../src/verified-fetch.js' +import { createHelia } from './fixtures/create-offline-helia.js' +import type { BlockRetriever, Helia } from '@helia/interface' + +async function makeAbortedRequest (verifiedFetch: VerifiedFetch, [resource, options = {}]: Parameters, promise: Promise): Promise { + const controller = new AbortController() + const resultPromise = verifiedFetch.fetch(resource, { + ...options, + signal: controller.signal + }) + + void promise.then(() => { + controller.abort() + }) + return resultPromise +} + +describe('abort-handling', () => { + let helia: Helia + let heliaStopSpy: Sinon.SinonSpy<[], Promise> + const sandbox = Sinon.createSandbox() + let logger: ComponentLogger + let componentLoggers: Logger[] = [] + const notPublishedCid = CID.parse('bafybeichqiz32cw5c3vdpvh2xtfgl42veqbsr6sw2g6c7ffz6atvh2vise') + let superSlowBlockRetriever: StubbedInstance + + let blockBrokerRetrieveCalled: ReturnType + let dnsResolverCalled: ReturnType + + beforeEach(async () => { + blockBrokerRetrieveCalled = pDefer() + dnsResolverCalled = pDefer() + superSlowBlockRetriever = stubInterface({ + retrieve: Sinon.stub().callsFake(async (cid, options) => { + blockBrokerRetrieveCalled.resolve() + + return new Promise((resolve, reject) => { + const timeoutId = setTimeout(() => { + reject(new Error('timeout while resolving')) + }, 10000) + + /** + * we need to emulate signal handling (blockBrokers should handle abort signals too) + * this is a simplified version of what the blockBroker should do, and the + * tests in this file verify how verified-fetch would handle the failure + */ + options.signal?.addEventListener('abort', () => { + clearTimeout(timeoutId) + reject(new Error('aborted')) + }) + }) + }) + }) + + helia = await createHelia({ blockBrokers: [() => superSlowBlockRetriever] }) + logger = prefixLogger('helia:verified-fetch-test:abort-handling') + + sandbox.stub(logger, 'forComponent').callsFake((name) => { + const newLogger = libp2pLogger(`helia:verified-fetch-test:abort-handling:child-logger-${componentLoggers.length}:${name}`) + componentLoggers.push(sandbox.stub(newLogger)) + return newLogger + }) + helia.logger = logger + heliaStopSpy = sandbox.spy(helia, 'stop') + }) + + afterEach(async () => { + await stop(helia) + componentLoggers = [] + sandbox.restore() + }) + + it('should abort a request before dns resolution', async function () { + this.timeout(1000) + + const customDnsResolver = Sinon.stub().resolves(new Promise((resolve, reject) => { + dnsResolverCalled.resolve() + setTimeout(() => { reject(new Error('timeout while resolving')) }, 5000) + })) + const verifiedFetch = new VerifiedFetch({ + helia + }, { + dnsResolvers: [customDnsResolver] + }) + const abortedResult = await makeAbortedRequest(verifiedFetch, ['ipns://this-doesnt-matter'], Promise.resolve()) + + expect(superSlowBlockRetriever.retrieve.called).to.be.false() + expect(abortedResult).to.be.ok() + expect(abortedResult.status).to.equal(400) + expect(abortedResult.statusText).to.equal('Bad Request') + await expect(abortedResult.text()).to.eventually.contain('aborted') + expect(heliaStopSpy.calledOnce).to.be.true() + }) + + it('should abort a request while looking for cid', async function () { + const verifiedFetch = new VerifiedFetch({ + helia + }) + + const abortedResult = await makeAbortedRequest(verifiedFetch, [notPublishedCid, { headers: { accept: 'application/octet-stream' } }], blockBrokerRetrieveCalled.promise) + + expect(superSlowBlockRetriever.retrieve.called).to.be.true() + expect(abortedResult).to.be.ok() + expect(abortedResult.status).to.equal(400) + expect(abortedResult.statusText).to.equal('Bad Request') + await expect(abortedResult.text()).to.eventually.contain('aborted') + expect(heliaStopSpy.calledOnce).to.be.true() + }) +}) diff --git a/packages/verified-fetch/test/custom-dns-resolvers.spec.ts b/packages/verified-fetch/test/custom-dns-resolvers.spec.ts index d6bdce65..9d330f85 100644 --- a/packages/verified-fetch/test/custom-dns-resolvers.spec.ts +++ b/packages/verified-fetch/test/custom-dns-resolvers.spec.ts @@ -35,7 +35,6 @@ describe('custom dns-resolvers', () => { expect(customDnsResolver.callCount).to.equal(1) expect(customDnsResolver.getCall(0).args).to.deep.equal(['_dnslink.some-non-cached-domain.com', { - onProgress: undefined, types: [ RecordType.TXT ] @@ -68,7 +67,6 @@ describe('custom dns-resolvers', () => { expect(customDnsResolver.callCount).to.equal(1) expect(customDnsResolver.getCall(0).args).to.deep.equal(['_dnslink.some-non-cached-domain2.com', { - onProgress: undefined, types: [ RecordType.TXT ] diff --git a/packages/verified-fetch/test/fixtures/dns-answer-fake.ts b/packages/verified-fetch/test/fixtures/dns-answer-fake.ts new file mode 100644 index 00000000..7c6fede4 --- /dev/null +++ b/packages/verified-fetch/test/fixtures/dns-answer-fake.ts @@ -0,0 +1,13 @@ +import { stubInterface } from 'sinon-ts' +import type { DNSResponse } from '@multiformats/dns' + +export function answerFake (data: string, TTL: number, name: string, type: number): DNSResponse { + const fake = stubInterface() + fake.Answer = [{ + data, + TTL, + name, + type + }] + return fake +} From f03be52c8bd398f8372c7aff9f65dc0e1ab05073 Mon Sep 17 00:00:00 2001 From: Russell Dempsey <1173416+SgtPooki@users.noreply.github.com> Date: Thu, 21 Mar 2024 12:02:37 -0700 Subject: [PATCH 3/8] chore: remove throwIfAborted expressions --- packages/verified-fetch/src/utils/parse-url-string.ts | 4 ---- packages/verified-fetch/src/verified-fetch.ts | 2 -- packages/verified-fetch/test/abort-handling.spec.ts | 3 ++- 3 files changed, 2 insertions(+), 7 deletions(-) diff --git a/packages/verified-fetch/src/utils/parse-url-string.ts b/packages/verified-fetch/src/utils/parse-url-string.ts index 6d854ed2..d7fd317c 100644 --- a/packages/verified-fetch/src/utils/parse-url-string.ts +++ b/packages/verified-fetch/src/utils/parse-url-string.ts @@ -165,7 +165,6 @@ export async function parseUrlString ({ urlString, ipns, logger }: ParseUrlStrin try { // try resolving as an IPNS name peerId = peerIdFromString(cidOrPeerIdOrDnsLink) - options?.signal?.throwIfAborted() resolveResult = await ipns.resolve(peerId, options) cid = resolveResult?.cid resolvedPath = resolveResult?.path @@ -190,7 +189,6 @@ export async function parseUrlString ({ urlString, ipns, logger }: ParseUrlStrin log.trace('Attempting to resolve DNSLink for %s', decodedDnsLinkLabel) try { - options?.signal?.throwIfAborted() resolveResult = await ipns.resolveDNSLink(decodedDnsLinkLabel, options) cid = resolveResult?.cid resolvedPath = resolveResult?.path @@ -203,8 +201,6 @@ export async function parseUrlString ({ urlString, ipns, logger }: ParseUrlStrin } } - options?.signal?.throwIfAborted() - if (cid == null) { if (errors.length === 1) { throw errors[0] diff --git a/packages/verified-fetch/src/verified-fetch.ts b/packages/verified-fetch/src/verified-fetch.ts index cc65e9d6..cf1ca425 100644 --- a/packages/verified-fetch/src/verified-fetch.ts +++ b/packages/verified-fetch/src/verified-fetch.ts @@ -487,7 +487,6 @@ export class VerifiedFetch { query = result.query ttl = result.ttl protocol = result.protocol - options?.signal?.throwIfAborted() } catch (err: any) { this.log.error('error parsing resource %s', resource, err) @@ -554,7 +553,6 @@ export class VerifiedFetch { } this.log.trace('calling handler "%s"', codecHandler.name) - options?.signal?.throwIfAborted() response = await codecHandler.call(this, handlerArgs) } diff --git a/packages/verified-fetch/test/abort-handling.spec.ts b/packages/verified-fetch/test/abort-handling.spec.ts index 9bbfda3e..0442733b 100644 --- a/packages/verified-fetch/test/abort-handling.spec.ts +++ b/packages/verified-fetch/test/abort-handling.spec.ts @@ -95,7 +95,8 @@ describe('abort-handling', () => { expect(abortedResult).to.be.ok() expect(abortedResult.status).to.equal(400) expect(abortedResult.statusText).to.equal('Bad Request') - await expect(abortedResult.text()).to.eventually.contain('aborted') + // TODO: we should be able to tell that the error was aborted instead of falling through to "invalid resource" + await expect(abortedResult.text()).to.eventually.contain('Invalid resource') expect(heliaStopSpy.calledOnce).to.be.true() }) From 59686df38c00a6620ac16c39c9bac8a0a5bc7d67 Mon Sep 17 00:00:00 2001 From: Russell Dempsey <1173416+SgtPooki@users.noreply.github.com> Date: Thu, 21 Mar 2024 13:50:52 -0700 Subject: [PATCH 4/8] chore: significant cleanup of abort handling --- .../src/utils/parse-url-string.ts | 5 + packages/verified-fetch/src/verified-fetch.ts | 10 +- .../test/abort-handling.spec.ts | 167 ++++++++++++------ 3 files changed, 117 insertions(+), 65 deletions(-) diff --git a/packages/verified-fetch/src/utils/parse-url-string.ts b/packages/verified-fetch/src/utils/parse-url-string.ts index d7fd317c..0a7a5e7b 100644 --- a/packages/verified-fetch/src/utils/parse-url-string.ts +++ b/packages/verified-fetch/src/utils/parse-url-string.ts @@ -131,7 +131,10 @@ function dnsLinkLabelDecoder (linkLabel: string): string { * After determining the protocol successfully, we process the cidOrPeerIdOrDnsLink: * * If it's ipfs, it parses the CID or throws an Aggregate error * * If it's ipns, it attempts to resolve the PeerId and then the DNSLink. If both fail, an Aggregate error is thrown. + * + * @todo we need to break out each step of this function (cid parsing, ipns resolving, dnslink resolving) into separate functions and then remove the eslint-disable comment */ +// eslint-disable-next-line complexity export async function parseUrlString ({ urlString, ipns, logger }: ParseUrlStringInput, options?: ParseUrlStringOptions): Promise { const log = logger.forComponent('helia:verified-fetch:parse-url-string') const { protocol, cidOrPeerIdOrDnsLink, path: urlPath, queryString } = matchURLString(urlString) @@ -170,6 +173,7 @@ export async function parseUrlString ({ urlString, ipns, logger }: ParseUrlStrin resolvedPath = resolveResult?.path log.trace('resolved %s to %c', cidOrPeerIdOrDnsLink, cid) } catch (err) { + options?.signal?.throwIfAborted() if (peerId == null) { log.error('could not parse PeerId string "%s"', cidOrPeerIdOrDnsLink, err) errors.push(new TypeError(`Could not parse PeerId in ipns url "${cidOrPeerIdOrDnsLink}", ${(err as Error).message}`)) @@ -194,6 +198,7 @@ export async function parseUrlString ({ urlString, ipns, logger }: ParseUrlStrin resolvedPath = resolveResult?.path log.trace('resolved %s to %c', decodedDnsLinkLabel, cid) } catch (err: any) { + options?.signal?.throwIfAborted() log.error('could not resolve DnsLink for "%s"', cidOrPeerIdOrDnsLink, err) errors.push(err) } diff --git a/packages/verified-fetch/src/verified-fetch.ts b/packages/verified-fetch/src/verified-fetch.ts index cf1ca425..8d57b6fd 100644 --- a/packages/verified-fetch/src/verified-fetch.ts +++ b/packages/verified-fetch/src/verified-fetch.ts @@ -289,13 +289,8 @@ export class VerifiedFetch { terminalElement = pathDetails.terminalElement } catch (err: any) { this.log.error('error walking path %s', path, err) - /** - * TODO: do this better. We should be able to distinguish between an abortError and a regular error - * However, `helia:networked-storage:error` throws an AggregateError with the abort error inside it - */ - const abortError = err.errors?.find((e: Error) => e.message.includes('abort')) - if (abortError != null) { - return badRequestResponse(resource.toString(), abortError.message) + if (options?.signal?.aborted === true) { + return badRequestResponse(resource.toString(), new Error('signal aborted by user')) } return badGatewayResponse(resource.toString(), 'Error walking path') @@ -450,7 +445,6 @@ export class VerifiedFetch { private async abortHandler (opController: AbortController): Promise { this.log.error('signal aborted by user') opController.abort('signal aborted by user') - await this.stop() } /** diff --git a/packages/verified-fetch/test/abort-handling.spec.ts b/packages/verified-fetch/test/abort-handling.spec.ts index 0442733b..be1b26d2 100644 --- a/packages/verified-fetch/test/abort-handling.spec.ts +++ b/packages/verified-fetch/test/abort-handling.spec.ts @@ -1,8 +1,11 @@ +import { dagCbor } from '@helia/dag-cbor' +import { type DNSLinkResolveResult, type IPNS, type IPNSResolveResult } from '@helia/ipns' import { stop, type ComponentLogger, type Logger } from '@libp2p/interface' import { prefixLogger, logger as libp2pLogger } from '@libp2p/logger' +import { createEd25519PeerId } from '@libp2p/peer-id-factory' import { expect } from 'aegir/chai' import { CID } from 'multiformats/cid' -import pDefer from 'p-defer' +import pDefer, { type DeferredPromise } from 'p-defer' import Sinon from 'sinon' import { stubInterface, type StubbedInstance } from 'sinon-ts' import { VerifiedFetch } from '../src/verified-fetch.js' @@ -22,96 +25,146 @@ async function makeAbortedRequest (verifiedFetch: VerifiedFetch, [resource, opti return resultPromise } -describe('abort-handling', () => { - let helia: Helia - let heliaStopSpy: Sinon.SinonSpy<[], Promise> +/** + * we need to emulate signal handling (blockBrokers/dnsResolvers/etc should handle abort signals too) + * this is a simplified version of what libs we depend on should do, and the + * tests in this file verify how verified-fetch would handle the failure + */ +async function getAbortablePromise (signal?: AbortSignal): Promise { + return new Promise((resolve, reject) => { + const timeoutId = setTimeout(() => { + reject(new Error('timeout while resolving')) + }, 5000) + + signal?.addEventListener('abort', () => { + clearTimeout(timeoutId) + reject(new Error('aborted')) + }) + }) +} + +describe('abort-handling', function () { + this.timeout(500) // these tests should all fail extremely quickly. if they don't, they're not aborting properly, or they're being ran on an extremely slow machine. const sandbox = Sinon.createSandbox() - let logger: ComponentLogger - let componentLoggers: Logger[] = [] + /** + * CID I created by running `npx kubo add --cid-version 1 -r dist` in the `verified-fetch` package folder + */ const notPublishedCid = CID.parse('bafybeichqiz32cw5c3vdpvh2xtfgl42veqbsr6sw2g6c7ffz6atvh2vise') - let superSlowBlockRetriever: StubbedInstance + let helia: Helia + // let name: IPNS + let name: StubbedInstance - let blockBrokerRetrieveCalled: ReturnType - let dnsResolverCalled: ReturnType + let logger: ComponentLogger + let componentLoggers: Logger[] = [] + let verifiedFetch: VerifiedFetch + + /** + * Stubbed networking components + */ + let blockRetriever: StubbedInstance + let dnsLinkResolver: Sinon.SinonStub> + let peerIdResolver: Sinon.SinonStub> + + /** + * used as promises to pass to makeAbortedRequest that will abort the request as soon as it's resolved. + */ + let blockBrokerRetrieveCalled: DeferredPromise + let dnsLinkResolverCalled: DeferredPromise + let peerIdResolverCalled: DeferredPromise beforeEach(async () => { + peerIdResolver = sandbox.stub() + dnsLinkResolver = sandbox.stub() + peerIdResolverCalled = pDefer() + dnsLinkResolverCalled = pDefer() blockBrokerRetrieveCalled = pDefer() - dnsResolverCalled = pDefer() - superSlowBlockRetriever = stubInterface({ - retrieve: Sinon.stub().callsFake(async (cid, options) => { - blockBrokerRetrieveCalled.resolve() - return new Promise((resolve, reject) => { - const timeoutId = setTimeout(() => { - reject(new Error('timeout while resolving')) - }, 10000) - - /** - * we need to emulate signal handling (blockBrokers should handle abort signals too) - * this is a simplified version of what the blockBroker should do, and the - * tests in this file verify how verified-fetch would handle the failure - */ - options.signal?.addEventListener('abort', () => { - clearTimeout(timeoutId) - reject(new Error('aborted')) - }) - }) + dnsLinkResolver.withArgs('timeout-5000-example.com', Sinon.match.any).callsFake(async (_domain, options) => { + dnsLinkResolverCalled.resolve() + return getAbortablePromise(options.signal) + }) + peerIdResolver.callsFake(async (peerId, options) => { + peerIdResolverCalled.resolve() + return getAbortablePromise(options.signal) + }) + blockRetriever = stubInterface({ + retrieve: sandbox.stub().callsFake(async (cid, options) => { + blockBrokerRetrieveCalled.resolve() + return getAbortablePromise(options.signal) }) }) - helia = await createHelia({ blockBrokers: [() => superSlowBlockRetriever] }) - logger = prefixLogger('helia:verified-fetch-test:abort-handling') - + logger = prefixLogger('test:abort-handling') sandbox.stub(logger, 'forComponent').callsFake((name) => { - const newLogger = libp2pLogger(`helia:verified-fetch-test:abort-handling:child-logger-${componentLoggers.length}:${name}`) + const newLogger = libp2pLogger(`test:abort-handling:child-logger-${componentLoggers.length}:${name}`) componentLoggers.push(sandbox.stub(newLogger)) return newLogger }) - helia.logger = logger - heliaStopSpy = sandbox.spy(helia, 'stop') + helia = await createHelia({ + logger, + blockBrokers: [() => blockRetriever] + }) + name = stubInterface({ + resolveDNSLink: dnsLinkResolver, + resolve: peerIdResolver + }) + verifiedFetch = new VerifiedFetch({ + helia, + ipns: name + }) }) afterEach(async () => { - await stop(helia) + await stop(helia, verifiedFetch, name) componentLoggers = [] sandbox.restore() }) - it('should abort a request before dns resolution', async function () { - this.timeout(1000) - - const customDnsResolver = Sinon.stub().resolves(new Promise((resolve, reject) => { - dnsResolverCalled.resolve() - setTimeout(() => { reject(new Error('timeout while resolving')) }, 5000) - })) - const verifiedFetch = new VerifiedFetch({ - helia - }, { - dnsResolvers: [customDnsResolver] + it('should abort a request before peerId resolution', async function () { + const c = dagCbor(helia) + const cid = await c.add({ + hello: 'world' }) - const abortedResult = await makeAbortedRequest(verifiedFetch, ['ipns://this-doesnt-matter'], Promise.resolve()) - expect(superSlowBlockRetriever.retrieve.called).to.be.false() + const peerId = await createEd25519PeerId() + + await name.publish(peerId, cid, { lifetime: 1000 * 60 * 60 }) + + const abortedResult = await makeAbortedRequest(verifiedFetch, [`ipns://${peerId}`], peerIdResolverCalled.promise) + + expect(peerIdResolver.callCount).to.equal(1) + expect(dnsLinkResolver.callCount).to.equal(0) // not called because signal abort was detected + expect(blockRetriever.retrieve.callCount).to.equal(0) // not called because we never got the cid expect(abortedResult).to.be.ok() expect(abortedResult.status).to.equal(400) expect(abortedResult.statusText).to.equal('Bad Request') - // TODO: we should be able to tell that the error was aborted instead of falling through to "invalid resource" - await expect(abortedResult.text()).to.eventually.contain('Invalid resource') - expect(heliaStopSpy.calledOnce).to.be.true() + await expect(abortedResult.text()).to.eventually.contain('aborted') }) - it('should abort a request while looking for cid', async function () { - const verifiedFetch = new VerifiedFetch({ - helia - }) + it('should abort a request before dns resolution', async function () { + const abortedResult = await makeAbortedRequest(verifiedFetch, ['ipns://timeout-5000-example.com'], dnsLinkResolverCalled.promise) + expect(peerIdResolver.callCount).to.equal(0) // not called because peerIdFromString fails + expect(dnsLinkResolver.callCount).to.equal(1) + expect(blockRetriever.retrieve.callCount).to.equal(0) // not called because we never got the cid + expect(abortedResult).to.be.ok() + expect(abortedResult.status).to.equal(400) + expect(abortedResult.statusText).to.equal('Bad Request') + await expect(abortedResult.text()).to.eventually.contain('aborted') + }) + + it('should abort a request while looking for cid', async function () { const abortedResult = await makeAbortedRequest(verifiedFetch, [notPublishedCid, { headers: { accept: 'application/octet-stream' } }], blockBrokerRetrieveCalled.promise) - expect(superSlowBlockRetriever.retrieve.called).to.be.true() + expect(peerIdResolver.callCount).to.equal(0) // not called because parseResource never passes the resource to parseUrlString + expect(dnsLinkResolver.callCount).to.equal(0) // not called because parseResource never passes the resource to parseUrlString + expect(blockRetriever.retrieve.callCount).to.equal(1) expect(abortedResult).to.be.ok() expect(abortedResult.status).to.equal(400) expect(abortedResult.statusText).to.equal('Bad Request') + // this error is exactly what blockRetriever throws, so we can check for "aborted" in the error message await expect(abortedResult.text()).to.eventually.contain('aborted') - expect(heliaStopSpy.calledOnce).to.be.true() }) + + // TODO: verify that the request is aborted when calling unixfs.cat and unixfs.walkPath }) From 043490fa88ea35dae1276786eeabcf6253ec3b2b Mon Sep 17 00:00:00 2001 From: Russell Dempsey <1173416+SgtPooki@users.noreply.github.com> Date: Thu, 21 Mar 2024 13:53:57 -0700 Subject: [PATCH 5/8] chore: remove debugging log statement --- packages/verified-fetch/src/verified-fetch.ts | 1 - 1 file changed, 1 deletion(-) diff --git a/packages/verified-fetch/src/verified-fetch.ts b/packages/verified-fetch/src/verified-fetch.ts index 8d57b6fd..ef0d3720 100644 --- a/packages/verified-fetch/src/verified-fetch.ts +++ b/packages/verified-fetch/src/verified-fetch.ts @@ -464,7 +464,6 @@ export class VerifiedFetch { options.signal.onabort = this.abortHandler.bind(this, opController) options.signal = opController.signal } - this.log('options %o', options) options?.onProgress?.(new CustomProgressEvent('verified-fetch:request:start', { resource })) From d3dc77716dd4a665204183f577ccb63e453ae401 Mon Sep 17 00:00:00 2001 From: Russell Dempsey <1173416+SgtPooki@users.noreply.github.com> Date: Thu, 21 Mar 2024 13:57:52 -0700 Subject: [PATCH 6/8] chore: move test fixtures --- .../test/abort-handling.spec.ts | 33 ++----------------- .../test/cache-control-header.spec.ts | 12 +------ .../test/fixtures/get-abortable-promise.ts | 17 ++++++++++ .../test/fixtures/make-aborted-request.ts | 14 ++++++++ 4 files changed, 34 insertions(+), 42 deletions(-) create mode 100644 packages/verified-fetch/test/fixtures/get-abortable-promise.ts create mode 100644 packages/verified-fetch/test/fixtures/make-aborted-request.ts diff --git a/packages/verified-fetch/test/abort-handling.spec.ts b/packages/verified-fetch/test/abort-handling.spec.ts index be1b26d2..54daa0ef 100644 --- a/packages/verified-fetch/test/abort-handling.spec.ts +++ b/packages/verified-fetch/test/abort-handling.spec.ts @@ -10,39 +10,10 @@ import Sinon from 'sinon' import { stubInterface, type StubbedInstance } from 'sinon-ts' import { VerifiedFetch } from '../src/verified-fetch.js' import { createHelia } from './fixtures/create-offline-helia.js' +import { getAbortablePromise } from './fixtures/get-abortable-promise.js' +import { makeAbortedRequest } from './fixtures/make-aborted-request.js' import type { BlockRetriever, Helia } from '@helia/interface' -async function makeAbortedRequest (verifiedFetch: VerifiedFetch, [resource, options = {}]: Parameters, promise: Promise): Promise { - const controller = new AbortController() - const resultPromise = verifiedFetch.fetch(resource, { - ...options, - signal: controller.signal - }) - - void promise.then(() => { - controller.abort() - }) - return resultPromise -} - -/** - * we need to emulate signal handling (blockBrokers/dnsResolvers/etc should handle abort signals too) - * this is a simplified version of what libs we depend on should do, and the - * tests in this file verify how verified-fetch would handle the failure - */ -async function getAbortablePromise (signal?: AbortSignal): Promise { - return new Promise((resolve, reject) => { - const timeoutId = setTimeout(() => { - reject(new Error('timeout while resolving')) - }, 5000) - - signal?.addEventListener('abort', () => { - clearTimeout(timeoutId) - reject(new Error('aborted')) - }) - }) -} - describe('abort-handling', function () { this.timeout(500) // these tests should all fail extremely quickly. if they don't, they're not aborting properly, or they're being ran on an extremely slow machine. const sandbox = Sinon.createSandbox() diff --git a/packages/verified-fetch/test/cache-control-header.spec.ts b/packages/verified-fetch/test/cache-control-header.spec.ts index 5c234fea..6bd84e54 100644 --- a/packages/verified-fetch/test/cache-control-header.spec.ts +++ b/packages/verified-fetch/test/cache-control-header.spec.ts @@ -5,23 +5,13 @@ import { createEd25519PeerId } from '@libp2p/peer-id-factory' import { dns } from '@multiformats/dns' import { expect } from 'aegir/chai' import Sinon from 'sinon' -import { stubInterface } from 'sinon-ts' import { VerifiedFetch } from '../src/verified-fetch.js' import { createHelia } from './fixtures/create-offline-helia.js' +import { answerFake } from './fixtures/dns-answer-fake.js' import type { Helia } from '@helia/interface' import type { IPNS } from '@helia/ipns' import type { DNSResponse } from '@multiformats/dns' -function answerFake (data: string, TTL: number, name: string, type: number): DNSResponse { - const fake = stubInterface() - fake.Answer = [{ - data, - TTL, - name, - type - }] - return fake -} describe('cache-control header', () => { let helia: Helia let name: IPNS diff --git a/packages/verified-fetch/test/fixtures/get-abortable-promise.ts b/packages/verified-fetch/test/fixtures/get-abortable-promise.ts new file mode 100644 index 00000000..65ca6624 --- /dev/null +++ b/packages/verified-fetch/test/fixtures/get-abortable-promise.ts @@ -0,0 +1,17 @@ +/** + * we need to emulate signal handling (blockBrokers/dnsResolvers/etc should handle abort signals too) + * this is a simplified version of what libs we depend on should do, and the + * tests in this file verify how verified-fetch would handle the failure + */ +export async function getAbortablePromise (signal?: AbortSignal): Promise { + return new Promise((resolve, reject) => { + const timeoutId = setTimeout(() => { + reject(new Error('timeout while resolving')) + }, 5000) + + signal?.addEventListener('abort', () => { + clearTimeout(timeoutId) + reject(new Error('aborted')) + }) + }) +} diff --git a/packages/verified-fetch/test/fixtures/make-aborted-request.ts b/packages/verified-fetch/test/fixtures/make-aborted-request.ts new file mode 100644 index 00000000..6fc8ac73 --- /dev/null +++ b/packages/verified-fetch/test/fixtures/make-aborted-request.ts @@ -0,0 +1,14 @@ +import type { VerifiedFetch } from '../../src/verified-fetch.js' + +export async function makeAbortedRequest (verifiedFetch: VerifiedFetch, [resource, options = {}]: Parameters, promise: Promise): Promise { + const controller = new AbortController() + const resultPromise = verifiedFetch.fetch(resource, { + ...options, + signal: controller.signal + }) + + void promise.then(() => { + controller.abort() + }) + return resultPromise +} From 41b8e0430d4132063f37e7770dcacad6f6be6090 Mon Sep 17 00:00:00 2001 From: Russell Dempsey <1173416+SgtPooki@users.noreply.github.com> Date: Thu, 21 Mar 2024 13:59:24 -0700 Subject: [PATCH 7/8] chore: remove commented out test fixture --- packages/verified-fetch/test/abort-handling.spec.ts | 2 -- 1 file changed, 2 deletions(-) diff --git a/packages/verified-fetch/test/abort-handling.spec.ts b/packages/verified-fetch/test/abort-handling.spec.ts index 54daa0ef..41edd017 100644 --- a/packages/verified-fetch/test/abort-handling.spec.ts +++ b/packages/verified-fetch/test/abort-handling.spec.ts @@ -22,9 +22,7 @@ describe('abort-handling', function () { */ const notPublishedCid = CID.parse('bafybeichqiz32cw5c3vdpvh2xtfgl42veqbsr6sw2g6c7ffz6atvh2vise') let helia: Helia - // let name: IPNS let name: StubbedInstance - let logger: ComponentLogger let componentLoggers: Logger[] = [] let verifiedFetch: VerifiedFetch From 3b58fa0bf5502a9e2868866a0be8accba4efc2e7 Mon Sep 17 00:00:00 2001 From: Russell Dempsey <1173416+SgtPooki@users.noreply.github.com> Date: Thu, 21 Mar 2024 14:04:54 -0700 Subject: [PATCH 8/8] fix: getStreamFromAsyncIterable closes on signal abort --- .../src/utils/get-stream-from-async-iterable.ts | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/packages/verified-fetch/src/utils/get-stream-from-async-iterable.ts b/packages/verified-fetch/src/utils/get-stream-from-async-iterable.ts index a4af1502..e417d11c 100644 --- a/packages/verified-fetch/src/utils/get-stream-from-async-iterable.ts +++ b/packages/verified-fetch/src/utils/get-stream-from-async-iterable.ts @@ -23,6 +23,11 @@ export async function getStreamFromAsyncIterable (iterator: AsyncIterable