diff --git a/packages/verified-fetch/src/utils/get-stream-from-async-iterable.ts b/packages/verified-fetch/src/utils/get-stream-from-async-iterable.ts index 02342d59..e417d11c 100644 --- a/packages/verified-fetch/src/utils/get-stream-from-async-iterable.ts +++ b/packages/verified-fetch/src/utils/get-stream-from-async-iterable.ts @@ -5,7 +5,7 @@ import type { ComponentLogger } from '@libp2p/interface' /** * Converts an async iterator of Uint8Array bytes to a stream and returns the first chunk of bytes */ -export async function getStreamFromAsyncIterable (iterator: AsyncIterable, path: string, logger: ComponentLogger, options?: Pick): Promise<{ stream: ReadableStream, firstChunk: Uint8Array }> { +export async function getStreamFromAsyncIterable (iterator: AsyncIterable, path: string, logger: ComponentLogger, options?: Pick): Promise<{ stream: ReadableStream, firstChunk: Uint8Array }> { const log = logger.forComponent('helia:verified-fetch:get-stream-from-async-iterable') const reader = iterator[Symbol.asyncIterator]() const { value: firstChunk, done } = await reader.next() @@ -23,6 +23,11 @@ export async function getStreamFromAsyncIterable (iterator: AsyncIterable { +export interface ParseResourceOptions extends ParseUrlStringOptions { } /** @@ -21,7 +20,7 @@ export interface ParseResourceOptions extends ProgressOptions { if (typeof resource === 'string') { - return parseUrlString({ urlString: resource, ipns, logger }, { onProgress: options?.onProgress }) + return parseUrlString({ urlString: resource, ipns, logger }, options) } const cid = CID.asCID(resource) diff --git a/packages/verified-fetch/src/utils/parse-url-string.ts b/packages/verified-fetch/src/utils/parse-url-string.ts index bf3b6127..0a7a5e7b 100644 --- a/packages/verified-fetch/src/utils/parse-url-string.ts +++ b/packages/verified-fetch/src/utils/parse-url-string.ts @@ -2,8 +2,8 @@ import { peerIdFromString } from '@libp2p/peer-id' import { CID } from 'multiformats/cid' import { TLRU } from './tlru.js' import type { RequestFormatShorthand } from '../types.js' -import type { DNSLinkResolveResult, IPNS, IPNSResolveResult, ResolveDNSLinkProgressEvents, ResolveResult } from '@helia/ipns' -import type { ComponentLogger } from '@libp2p/interface' +import type { DNSLinkResolveResult, IPNS, IPNSResolveResult, IPNSRoutingEvents, ResolveDNSLinkProgressEvents, ResolveProgressEvents, ResolveResult } from '@helia/ipns' +import type { AbortOptions, ComponentLogger } from '@libp2p/interface' import type { ProgressOptions } from 'progress-events' const ipnsCache = new TLRU(1000) @@ -13,7 +13,7 @@ export interface ParseUrlStringInput { ipns: IPNS logger: ComponentLogger } -export interface ParseUrlStringOptions extends ProgressOptions { +export interface ParseUrlStringOptions extends ProgressOptions, AbortOptions { } @@ -131,7 +131,10 @@ function dnsLinkLabelDecoder (linkLabel: string): string { * After determining the protocol successfully, we process the cidOrPeerIdOrDnsLink: * * If it's ipfs, it parses the CID or throws an Aggregate error * * If it's ipns, it attempts to resolve the PeerId and then the DNSLink. If both fail, an Aggregate error is thrown. + * + * @todo we need to break out each step of this function (cid parsing, ipns resolving, dnslink resolving) into separate functions and then remove the eslint-disable comment */ +// eslint-disable-next-line complexity export async function parseUrlString ({ urlString, ipns, logger }: ParseUrlStringInput, options?: ParseUrlStringOptions): Promise { const log = logger.forComponent('helia:verified-fetch:parse-url-string') const { protocol, cidOrPeerIdOrDnsLink, path: urlPath, queryString } = matchURLString(urlString) @@ -165,11 +168,12 @@ export async function parseUrlString ({ urlString, ipns, logger }: ParseUrlStrin try { // try resolving as an IPNS name peerId = peerIdFromString(cidOrPeerIdOrDnsLink) - resolveResult = await ipns.resolve(peerId, { onProgress: options?.onProgress }) - cid = resolveResult.cid - resolvedPath = resolveResult.path + resolveResult = await ipns.resolve(peerId, options) + cid = resolveResult?.cid + resolvedPath = resolveResult?.path log.trace('resolved %s to %c', cidOrPeerIdOrDnsLink, cid) } catch (err) { + options?.signal?.throwIfAborted() if (peerId == null) { log.error('could not parse PeerId string "%s"', cidOrPeerIdOrDnsLink, err) errors.push(new TypeError(`Could not parse PeerId in ipns url "${cidOrPeerIdOrDnsLink}", ${(err as Error).message}`)) @@ -189,11 +193,12 @@ export async function parseUrlString ({ urlString, ipns, logger }: ParseUrlStrin log.trace('Attempting to resolve DNSLink for %s', decodedDnsLinkLabel) try { - resolveResult = await ipns.resolveDNSLink(decodedDnsLinkLabel, { onProgress: options?.onProgress }) + resolveResult = await ipns.resolveDNSLink(decodedDnsLinkLabel, options) cid = resolveResult?.cid resolvedPath = resolveResult?.path log.trace('resolved %s to %c', decodedDnsLinkLabel, cid) } catch (err: any) { + options?.signal?.throwIfAborted() log.error('could not resolve DnsLink for "%s"', cidOrPeerIdOrDnsLink, err) errors.push(err) } diff --git a/packages/verified-fetch/src/utils/responses.ts b/packages/verified-fetch/src/utils/responses.ts index 667318c6..dda0230d 100644 --- a/packages/verified-fetch/src/utils/responses.ts +++ b/packages/verified-fetch/src/utils/responses.ts @@ -85,7 +85,13 @@ export function notAcceptableResponse (url: string, body?: SupportedBodyTypes, i return response } -export function badRequestResponse (url: string, body?: SupportedBodyTypes, init?: ResponseInit): Response { +/** + * if body is an Error, it will be converted to a string containing the error message. + */ +export function badRequestResponse (url: string, body?: SupportedBodyTypes | Error, init?: ResponseInit): Response { + if (body instanceof Error) { + body = body.message + } const response = new Response(body, { ...(init ?? {}), status: 400, diff --git a/packages/verified-fetch/src/verified-fetch.ts b/packages/verified-fetch/src/verified-fetch.ts index 15280d91..ef0d3720 100644 --- a/packages/verified-fetch/src/verified-fetch.ts +++ b/packages/verified-fetch/src/verified-fetch.ts @@ -78,7 +78,10 @@ function convertOptions (options?: VerifiedFetchOptions): (Omit { + this.log.error('signal aborted by user') + opController.abort('signal aborted by user') + } + + /** + * We're starting to get to the point where we need a queue or pipeline of + * operations to perform and a single place to handle errors. + * + * TODO: move operations called by fetch to a queue of operations where we can + * always exit early (and cleanly) if a given signal is aborted + */ async fetch (resource: Resource, opts?: VerifiedFetchOptions): Promise { this.log('fetch %s', resource) const options = convertOptions(opts) + const opController = new AbortController() + if (options?.signal != null) { + options.signal.onabort = this.abortHandler.bind(this, opController) + options.signal = opController.signal + } + options?.onProgress?.(new CustomProgressEvent('verified-fetch:request:start', { resource })) // resolve the CID/path from the requested resource @@ -451,10 +480,10 @@ export class VerifiedFetch { query = result.query ttl = result.ttl protocol = result.protocol - } catch (err) { + } catch (err: any) { this.log.error('error parsing resource %s', resource, err) - return badRequestResponse('Invalid resource') + return badRequestResponse(resource.toString(), err) } options?.onProgress?.(new CustomProgressEvent('verified-fetch:request:resolve', { cid, path })) diff --git a/packages/verified-fetch/test/abort-handling.spec.ts b/packages/verified-fetch/test/abort-handling.spec.ts new file mode 100644 index 00000000..41edd017 --- /dev/null +++ b/packages/verified-fetch/test/abort-handling.spec.ts @@ -0,0 +1,139 @@ +import { dagCbor } from '@helia/dag-cbor' +import { type DNSLinkResolveResult, type IPNS, type IPNSResolveResult } from '@helia/ipns' +import { stop, type ComponentLogger, type Logger } from '@libp2p/interface' +import { prefixLogger, logger as libp2pLogger } from '@libp2p/logger' +import { createEd25519PeerId } from '@libp2p/peer-id-factory' +import { expect } from 'aegir/chai' +import { CID } from 'multiformats/cid' +import pDefer, { type DeferredPromise } from 'p-defer' +import Sinon from 'sinon' +import { stubInterface, type StubbedInstance } from 'sinon-ts' +import { VerifiedFetch } from '../src/verified-fetch.js' +import { createHelia } from './fixtures/create-offline-helia.js' +import { getAbortablePromise } from './fixtures/get-abortable-promise.js' +import { makeAbortedRequest } from './fixtures/make-aborted-request.js' +import type { BlockRetriever, Helia } from '@helia/interface' + +describe('abort-handling', function () { + this.timeout(500) // these tests should all fail extremely quickly. if they don't, they're not aborting properly, or they're being ran on an extremely slow machine. + const sandbox = Sinon.createSandbox() + /** + * CID I created by running `npx kubo add --cid-version 1 -r dist` in the `verified-fetch` package folder + */ + const notPublishedCid = CID.parse('bafybeichqiz32cw5c3vdpvh2xtfgl42veqbsr6sw2g6c7ffz6atvh2vise') + let helia: Helia + let name: StubbedInstance + let logger: ComponentLogger + let componentLoggers: Logger[] = [] + let verifiedFetch: VerifiedFetch + + /** + * Stubbed networking components + */ + let blockRetriever: StubbedInstance + let dnsLinkResolver: Sinon.SinonStub> + let peerIdResolver: Sinon.SinonStub> + + /** + * used as promises to pass to makeAbortedRequest that will abort the request as soon as it's resolved. + */ + let blockBrokerRetrieveCalled: DeferredPromise + let dnsLinkResolverCalled: DeferredPromise + let peerIdResolverCalled: DeferredPromise + + beforeEach(async () => { + peerIdResolver = sandbox.stub() + dnsLinkResolver = sandbox.stub() + peerIdResolverCalled = pDefer() + dnsLinkResolverCalled = pDefer() + blockBrokerRetrieveCalled = pDefer() + + dnsLinkResolver.withArgs('timeout-5000-example.com', Sinon.match.any).callsFake(async (_domain, options) => { + dnsLinkResolverCalled.resolve() + return getAbortablePromise(options.signal) + }) + peerIdResolver.callsFake(async (peerId, options) => { + peerIdResolverCalled.resolve() + return getAbortablePromise(options.signal) + }) + blockRetriever = stubInterface({ + retrieve: sandbox.stub().callsFake(async (cid, options) => { + blockBrokerRetrieveCalled.resolve() + return getAbortablePromise(options.signal) + }) + }) + + logger = prefixLogger('test:abort-handling') + sandbox.stub(logger, 'forComponent').callsFake((name) => { + const newLogger = libp2pLogger(`test:abort-handling:child-logger-${componentLoggers.length}:${name}`) + componentLoggers.push(sandbox.stub(newLogger)) + return newLogger + }) + helia = await createHelia({ + logger, + blockBrokers: [() => blockRetriever] + }) + name = stubInterface({ + resolveDNSLink: dnsLinkResolver, + resolve: peerIdResolver + }) + verifiedFetch = new VerifiedFetch({ + helia, + ipns: name + }) + }) + + afterEach(async () => { + await stop(helia, verifiedFetch, name) + componentLoggers = [] + sandbox.restore() + }) + + it('should abort a request before peerId resolution', async function () { + const c = dagCbor(helia) + const cid = await c.add({ + hello: 'world' + }) + + const peerId = await createEd25519PeerId() + + await name.publish(peerId, cid, { lifetime: 1000 * 60 * 60 }) + + const abortedResult = await makeAbortedRequest(verifiedFetch, [`ipns://${peerId}`], peerIdResolverCalled.promise) + + expect(peerIdResolver.callCount).to.equal(1) + expect(dnsLinkResolver.callCount).to.equal(0) // not called because signal abort was detected + expect(blockRetriever.retrieve.callCount).to.equal(0) // not called because we never got the cid + expect(abortedResult).to.be.ok() + expect(abortedResult.status).to.equal(400) + expect(abortedResult.statusText).to.equal('Bad Request') + await expect(abortedResult.text()).to.eventually.contain('aborted') + }) + + it('should abort a request before dns resolution', async function () { + const abortedResult = await makeAbortedRequest(verifiedFetch, ['ipns://timeout-5000-example.com'], dnsLinkResolverCalled.promise) + + expect(peerIdResolver.callCount).to.equal(0) // not called because peerIdFromString fails + expect(dnsLinkResolver.callCount).to.equal(1) + expect(blockRetriever.retrieve.callCount).to.equal(0) // not called because we never got the cid + expect(abortedResult).to.be.ok() + expect(abortedResult.status).to.equal(400) + expect(abortedResult.statusText).to.equal('Bad Request') + await expect(abortedResult.text()).to.eventually.contain('aborted') + }) + + it('should abort a request while looking for cid', async function () { + const abortedResult = await makeAbortedRequest(verifiedFetch, [notPublishedCid, { headers: { accept: 'application/octet-stream' } }], blockBrokerRetrieveCalled.promise) + + expect(peerIdResolver.callCount).to.equal(0) // not called because parseResource never passes the resource to parseUrlString + expect(dnsLinkResolver.callCount).to.equal(0) // not called because parseResource never passes the resource to parseUrlString + expect(blockRetriever.retrieve.callCount).to.equal(1) + expect(abortedResult).to.be.ok() + expect(abortedResult.status).to.equal(400) + expect(abortedResult.statusText).to.equal('Bad Request') + // this error is exactly what blockRetriever throws, so we can check for "aborted" in the error message + await expect(abortedResult.text()).to.eventually.contain('aborted') + }) + + // TODO: verify that the request is aborted when calling unixfs.cat and unixfs.walkPath +}) diff --git a/packages/verified-fetch/test/cache-control-header.spec.ts b/packages/verified-fetch/test/cache-control-header.spec.ts index 5c234fea..6bd84e54 100644 --- a/packages/verified-fetch/test/cache-control-header.spec.ts +++ b/packages/verified-fetch/test/cache-control-header.spec.ts @@ -5,23 +5,13 @@ import { createEd25519PeerId } from '@libp2p/peer-id-factory' import { dns } from '@multiformats/dns' import { expect } from 'aegir/chai' import Sinon from 'sinon' -import { stubInterface } from 'sinon-ts' import { VerifiedFetch } from '../src/verified-fetch.js' import { createHelia } from './fixtures/create-offline-helia.js' +import { answerFake } from './fixtures/dns-answer-fake.js' import type { Helia } from '@helia/interface' import type { IPNS } from '@helia/ipns' import type { DNSResponse } from '@multiformats/dns' -function answerFake (data: string, TTL: number, name: string, type: number): DNSResponse { - const fake = stubInterface() - fake.Answer = [{ - data, - TTL, - name, - type - }] - return fake -} describe('cache-control header', () => { let helia: Helia let name: IPNS diff --git a/packages/verified-fetch/test/custom-dns-resolvers.spec.ts b/packages/verified-fetch/test/custom-dns-resolvers.spec.ts index d6bdce65..9d330f85 100644 --- a/packages/verified-fetch/test/custom-dns-resolvers.spec.ts +++ b/packages/verified-fetch/test/custom-dns-resolvers.spec.ts @@ -35,7 +35,6 @@ describe('custom dns-resolvers', () => { expect(customDnsResolver.callCount).to.equal(1) expect(customDnsResolver.getCall(0).args).to.deep.equal(['_dnslink.some-non-cached-domain.com', { - onProgress: undefined, types: [ RecordType.TXT ] @@ -68,7 +67,6 @@ describe('custom dns-resolvers', () => { expect(customDnsResolver.callCount).to.equal(1) expect(customDnsResolver.getCall(0).args).to.deep.equal(['_dnslink.some-non-cached-domain2.com', { - onProgress: undefined, types: [ RecordType.TXT ] diff --git a/packages/verified-fetch/test/fixtures/dns-answer-fake.ts b/packages/verified-fetch/test/fixtures/dns-answer-fake.ts new file mode 100644 index 00000000..7c6fede4 --- /dev/null +++ b/packages/verified-fetch/test/fixtures/dns-answer-fake.ts @@ -0,0 +1,13 @@ +import { stubInterface } from 'sinon-ts' +import type { DNSResponse } from '@multiformats/dns' + +export function answerFake (data: string, TTL: number, name: string, type: number): DNSResponse { + const fake = stubInterface() + fake.Answer = [{ + data, + TTL, + name, + type + }] + return fake +} diff --git a/packages/verified-fetch/test/fixtures/get-abortable-promise.ts b/packages/verified-fetch/test/fixtures/get-abortable-promise.ts new file mode 100644 index 00000000..65ca6624 --- /dev/null +++ b/packages/verified-fetch/test/fixtures/get-abortable-promise.ts @@ -0,0 +1,17 @@ +/** + * we need to emulate signal handling (blockBrokers/dnsResolvers/etc should handle abort signals too) + * this is a simplified version of what libs we depend on should do, and the + * tests in this file verify how verified-fetch would handle the failure + */ +export async function getAbortablePromise (signal?: AbortSignal): Promise { + return new Promise((resolve, reject) => { + const timeoutId = setTimeout(() => { + reject(new Error('timeout while resolving')) + }, 5000) + + signal?.addEventListener('abort', () => { + clearTimeout(timeoutId) + reject(new Error('aborted')) + }) + }) +} diff --git a/packages/verified-fetch/test/fixtures/make-aborted-request.ts b/packages/verified-fetch/test/fixtures/make-aborted-request.ts new file mode 100644 index 00000000..6fc8ac73 --- /dev/null +++ b/packages/verified-fetch/test/fixtures/make-aborted-request.ts @@ -0,0 +1,14 @@ +import type { VerifiedFetch } from '../../src/verified-fetch.js' + +export async function makeAbortedRequest (verifiedFetch: VerifiedFetch, [resource, options = {}]: Parameters, promise: Promise): Promise { + const controller = new AbortController() + const resultPromise = verifiedFetch.fetch(resource, { + ...options, + signal: controller.signal + }) + + void promise.then(() => { + controller.abort() + }) + return resultPromise +}