-
Notifications
You must be signed in to change notification settings - Fork 5
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
feat: abort signals are respected #26
Changes from all commits
dc2a768
565c321
5d222a8
f03be52
59686df
043490f
d3dc777
41b8e04
3b58fa0
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -5,7 +5,7 @@ | |
/** | ||
* Converts an async iterator of Uint8Array bytes to a stream and returns the first chunk of bytes | ||
*/ | ||
export async function getStreamFromAsyncIterable (iterator: AsyncIterable<Uint8Array>, path: string, logger: ComponentLogger, options?: Pick<VerifiedFetchInit, 'onProgress'>): Promise<{ stream: ReadableStream<Uint8Array>, firstChunk: Uint8Array }> { | ||
export async function getStreamFromAsyncIterable (iterator: AsyncIterable<Uint8Array>, path: string, logger: ComponentLogger, options?: Pick<VerifiedFetchInit, 'onProgress' | 'signal'>): Promise<{ stream: ReadableStream<Uint8Array>, firstChunk: Uint8Array }> { | ||
const log = logger.forComponent('helia:verified-fetch:get-stream-from-async-iterable') | ||
const reader = iterator[Symbol.asyncIterator]() | ||
const { value: firstChunk, done } = await reader.next() | ||
|
@@ -23,6 +23,11 @@ | |
}, | ||
async pull (controller) { | ||
const { value, done } = await reader.next() | ||
if (options?.signal?.aborted === true) { | ||
controller.error(new Error(options.signal.reason ?? 'signal aborted by user')) | ||
controller.close() | ||
return | ||
} | ||
Comment on lines
+26
to
+30
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. added in iterator from |
||
|
||
if (done === true) { | ||
if (value != null) { | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,17 +1,16 @@ | ||
import { CID } from 'multiformats/cid' | ||
import { parseUrlString } from './parse-url-string.js' | ||
import type { ParsedUrlStringResults } from './parse-url-string.js' | ||
import type { ParseUrlStringOptions, ParsedUrlStringResults } from './parse-url-string.js' | ||
import type { Resource } from '../index.js' | ||
import type { IPNS, IPNSRoutingEvents, ResolveDNSLinkProgressEvents, ResolveProgressEvents } from '@helia/ipns' | ||
import type { IPNS } from '@helia/ipns' | ||
import type { ComponentLogger } from '@libp2p/interface' | ||
import type { ProgressOptions } from 'progress-events' | ||
|
||
export interface ParseResourceComponents { | ||
ipns: IPNS | ||
logger: ComponentLogger | ||
} | ||
|
||
export interface ParseResourceOptions extends ProgressOptions<ResolveProgressEvents | IPNSRoutingEvents | ResolveDNSLinkProgressEvents> { | ||
export interface ParseResourceOptions extends ParseUrlStringOptions { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. we extend options accepted by ParseUrlStringOptions |
||
|
||
} | ||
/** | ||
|
@@ -21,7 +20,7 @@ export interface ParseResourceOptions extends ProgressOptions<ResolveProgressEve | |
*/ | ||
export async function parseResource (resource: Resource, { ipns, logger }: ParseResourceComponents, options?: ParseResourceOptions): Promise<ParsedUrlStringResults> { | ||
if (typeof resource === 'string') { | ||
return parseUrlString({ urlString: resource, ipns, logger }, { onProgress: options?.onProgress }) | ||
return parseUrlString({ urlString: resource, ipns, logger }, options) | ||
} | ||
|
||
const cid = CID.asCID(resource) | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -2,8 +2,8 @@ import { peerIdFromString } from '@libp2p/peer-id' | |
import { CID } from 'multiformats/cid' | ||
import { TLRU } from './tlru.js' | ||
import type { RequestFormatShorthand } from '../types.js' | ||
import type { DNSLinkResolveResult, IPNS, IPNSResolveResult, ResolveDNSLinkProgressEvents, ResolveResult } from '@helia/ipns' | ||
import type { ComponentLogger } from '@libp2p/interface' | ||
import type { DNSLinkResolveResult, IPNS, IPNSResolveResult, IPNSRoutingEvents, ResolveDNSLinkProgressEvents, ResolveProgressEvents, ResolveResult } from '@helia/ipns' | ||
import type { AbortOptions, ComponentLogger } from '@libp2p/interface' | ||
import type { ProgressOptions } from 'progress-events' | ||
|
||
const ipnsCache = new TLRU<DNSLinkResolveResult | IPNSResolveResult>(1000) | ||
|
@@ -13,7 +13,7 @@ export interface ParseUrlStringInput { | |
ipns: IPNS | ||
logger: ComponentLogger | ||
} | ||
export interface ParseUrlStringOptions extends ProgressOptions<ResolveDNSLinkProgressEvents> { | ||
export interface ParseUrlStringOptions extends ProgressOptions<ResolveProgressEvents | IPNSRoutingEvents | ResolveDNSLinkProgressEvents>, AbortOptions { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. another part of the bug fix. we should accept a |
||
|
||
} | ||
|
||
|
@@ -131,7 +131,10 @@ function dnsLinkLabelDecoder (linkLabel: string): string { | |
* After determining the protocol successfully, we process the cidOrPeerIdOrDnsLink: | ||
* * If it's ipfs, it parses the CID or throws an Aggregate error | ||
* * If it's ipns, it attempts to resolve the PeerId and then the DNSLink. If both fail, an Aggregate error is thrown. | ||
* | ||
* @todo we need to break out each step of this function (cid parsing, ipns resolving, dnslink resolving) into separate functions and then remove the eslint-disable comment | ||
*/ | ||
// eslint-disable-next-line complexity | ||
export async function parseUrlString ({ urlString, ipns, logger }: ParseUrlStringInput, options?: ParseUrlStringOptions): Promise<ParsedUrlStringResults> { | ||
const log = logger.forComponent('helia:verified-fetch:parse-url-string') | ||
const { protocol, cidOrPeerIdOrDnsLink, path: urlPath, queryString } = matchURLString(urlString) | ||
|
@@ -165,11 +168,12 @@ export async function parseUrlString ({ urlString, ipns, logger }: ParseUrlStrin | |
try { | ||
// try resolving as an IPNS name | ||
peerId = peerIdFromString(cidOrPeerIdOrDnsLink) | ||
resolveResult = await ipns.resolve(peerId, { onProgress: options?.onProgress }) | ||
cid = resolveResult.cid | ||
resolvedPath = resolveResult.path | ||
resolveResult = await ipns.resolve(peerId, options) | ||
cid = resolveResult?.cid | ||
resolvedPath = resolveResult?.path | ||
log.trace('resolved %s to %c', cidOrPeerIdOrDnsLink, cid) | ||
} catch (err) { | ||
options?.signal?.throwIfAborted() | ||
if (peerId == null) { | ||
log.error('could not parse PeerId string "%s"', cidOrPeerIdOrDnsLink, err) | ||
errors.push(new TypeError(`Could not parse PeerId in ipns url "${cidOrPeerIdOrDnsLink}", ${(err as Error).message}`)) | ||
|
@@ -189,11 +193,12 @@ export async function parseUrlString ({ urlString, ipns, logger }: ParseUrlStrin | |
log.trace('Attempting to resolve DNSLink for %s', decodedDnsLinkLabel) | ||
|
||
try { | ||
resolveResult = await ipns.resolveDNSLink(decodedDnsLinkLabel, { onProgress: options?.onProgress }) | ||
resolveResult = await ipns.resolveDNSLink(decodedDnsLinkLabel, options) | ||
cid = resolveResult?.cid | ||
resolvedPath = resolveResult?.path | ||
log.trace('resolved %s to %c', decodedDnsLinkLabel, cid) | ||
} catch (err: any) { | ||
options?.signal?.throwIfAborted() | ||
log.error('could not resolve DnsLink for "%s"', cidOrPeerIdOrDnsLink, err) | ||
errors.push(err) | ||
} | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -78,7 +78,10 @@ | |
let signal: AbortSignal | undefined | ||
if (options?.signal === null) { | ||
signal = undefined | ||
} else { | ||
signal = options?.signal | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. the main part that fixes the bug mentioned by #25 |
||
} | ||
|
||
return { | ||
...options, | ||
signal | ||
|
@@ -284,10 +287,13 @@ | |
const pathDetails = await walkPath(this.helia.blockstore, `${cid.toString()}/${path}`, options) | ||
ipfsRoots = pathDetails.ipfsRoots | ||
terminalElement = pathDetails.terminalElement | ||
} catch (err) { | ||
} catch (err: any) { | ||
this.log.error('error walking path %s', path, err) | ||
if (options?.signal?.aborted === true) { | ||
return badRequestResponse(resource.toString(), new Error('signal aborted by user')) | ||
} | ||
|
||
return badGatewayResponse('Error walking path') | ||
return badGatewayResponse(resource.toString(), 'Error walking path') | ||
} | ||
|
||
let resolvedCID = terminalElement?.cid ?? cid | ||
|
@@ -347,7 +353,8 @@ | |
|
||
try { | ||
const { stream, firstChunk } = await getStreamFromAsyncIterable(asyncIter, path ?? '', this.helia.logger, { | ||
onProgress: options?.onProgress | ||
onProgress: options?.onProgress, | ||
signal: options?.signal | ||
}) | ||
byteRangeContext.setBody(stream) | ||
// if not a valid range request, okRangeRequest will call okResponse | ||
|
@@ -367,7 +374,7 @@ | |
if (byteRangeContext.isRangeRequest && err.code === 'ERR_INVALID_PARAMS') { | ||
return badRangeResponse(resource) | ||
} | ||
return badGatewayResponse('Unable to stream content') | ||
return badGatewayResponse(resource.toString(), 'Unable to stream content') | ||
} | ||
} | ||
|
||
|
@@ -431,11 +438,33 @@ | |
[identity.code]: this.handleRaw | ||
} | ||
|
||
/** | ||
* | ||
* TODO: Should we use 400, 408, 418, or 425, or throw and not even return a response? | ||
Comment on lines
+442
to
+443
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. we should discuss this. |
||
*/ | ||
private async abortHandler (opController: AbortController): Promise<void> { | ||
this.log.error('signal aborted by user') | ||
opController.abort('signal aborted by user') | ||
} | ||
|
||
/** | ||
* We're starting to get to the point where we need a queue or pipeline of | ||
* operations to perform and a single place to handle errors. | ||
* | ||
* TODO: move operations called by fetch to a queue of operations where we can | ||
* always exit early (and cleanly) if a given signal is aborted | ||
*/ | ||
async fetch (resource: Resource, opts?: VerifiedFetchOptions): Promise<Response> { | ||
this.log('fetch %s', resource) | ||
|
||
const options = convertOptions(opts) | ||
|
||
const opController = new AbortController() | ||
if (options?.signal != null) { | ||
options.signal.onabort = this.abortHandler.bind(this, opController) | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. we override any handlers they have on their signal.. they should wrap any signals they want to provide There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. we can change this later if we want.. but it would be good to be aware of this. @achingbrain |
||
options.signal = opController.signal | ||
} | ||
|
||
options?.onProgress?.(new CustomProgressEvent<CIDDetail>('verified-fetch:request:start', { resource })) | ||
|
||
// resolve the CID/path from the requested resource | ||
|
@@ -451,10 +480,10 @@ | |
query = result.query | ||
ttl = result.ttl | ||
protocol = result.protocol | ||
} catch (err) { | ||
} catch (err: any) { | ||
this.log.error('error parsing resource %s', resource, err) | ||
|
||
return badRequestResponse('Invalid resource') | ||
return badRequestResponse(resource.toString(), err) | ||
} | ||
|
||
options?.onProgress?.(new CustomProgressEvent<CIDDetail>('verified-fetch:request:resolve', { cid, path })) | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,139 @@ | ||
import { dagCbor } from '@helia/dag-cbor' | ||
import { type DNSLinkResolveResult, type IPNS, type IPNSResolveResult } from '@helia/ipns' | ||
import { stop, type ComponentLogger, type Logger } from '@libp2p/interface' | ||
import { prefixLogger, logger as libp2pLogger } from '@libp2p/logger' | ||
import { createEd25519PeerId } from '@libp2p/peer-id-factory' | ||
import { expect } from 'aegir/chai' | ||
import { CID } from 'multiformats/cid' | ||
import pDefer, { type DeferredPromise } from 'p-defer' | ||
import Sinon from 'sinon' | ||
import { stubInterface, type StubbedInstance } from 'sinon-ts' | ||
import { VerifiedFetch } from '../src/verified-fetch.js' | ||
import { createHelia } from './fixtures/create-offline-helia.js' | ||
import { getAbortablePromise } from './fixtures/get-abortable-promise.js' | ||
import { makeAbortedRequest } from './fixtures/make-aborted-request.js' | ||
import type { BlockRetriever, Helia } from '@helia/interface' | ||
|
||
describe('abort-handling', function () { | ||
this.timeout(500) // these tests should all fail extremely quickly. if they don't, they're not aborting properly, or they're being ran on an extremely slow machine. | ||
const sandbox = Sinon.createSandbox() | ||
/** | ||
* CID I created by running `npx kubo add --cid-version 1 -r dist` in the `verified-fetch` package folder | ||
*/ | ||
const notPublishedCid = CID.parse('bafybeichqiz32cw5c3vdpvh2xtfgl42veqbsr6sw2g6c7ffz6atvh2vise') | ||
let helia: Helia | ||
let name: StubbedInstance<IPNS> | ||
let logger: ComponentLogger | ||
let componentLoggers: Logger[] = [] | ||
let verifiedFetch: VerifiedFetch | ||
|
||
/** | ||
* Stubbed networking components | ||
*/ | ||
let blockRetriever: StubbedInstance<BlockRetriever> | ||
let dnsLinkResolver: Sinon.SinonStub<any[], Promise<DNSLinkResolveResult>> | ||
let peerIdResolver: Sinon.SinonStub<any[], Promise<IPNSResolveResult>> | ||
|
||
/** | ||
* used as promises to pass to makeAbortedRequest that will abort the request as soon as it's resolved. | ||
*/ | ||
let blockBrokerRetrieveCalled: DeferredPromise<void> | ||
let dnsLinkResolverCalled: DeferredPromise<void> | ||
let peerIdResolverCalled: DeferredPromise<void> | ||
|
||
beforeEach(async () => { | ||
peerIdResolver = sandbox.stub() | ||
dnsLinkResolver = sandbox.stub() | ||
peerIdResolverCalled = pDefer() | ||
dnsLinkResolverCalled = pDefer() | ||
blockBrokerRetrieveCalled = pDefer() | ||
|
||
dnsLinkResolver.withArgs('timeout-5000-example.com', Sinon.match.any).callsFake(async (_domain, options) => { | ||
dnsLinkResolverCalled.resolve() | ||
return getAbortablePromise(options.signal) | ||
}) | ||
peerIdResolver.callsFake(async (peerId, options) => { | ||
peerIdResolverCalled.resolve() | ||
return getAbortablePromise(options.signal) | ||
}) | ||
blockRetriever = stubInterface<BlockRetriever>({ | ||
retrieve: sandbox.stub().callsFake(async (cid, options) => { | ||
blockBrokerRetrieveCalled.resolve() | ||
return getAbortablePromise(options.signal) | ||
}) | ||
}) | ||
|
||
logger = prefixLogger('test:abort-handling') | ||
sandbox.stub(logger, 'forComponent').callsFake((name) => { | ||
const newLogger = libp2pLogger(`test:abort-handling:child-logger-${componentLoggers.length}:${name}`) | ||
componentLoggers.push(sandbox.stub(newLogger)) | ||
return newLogger | ||
}) | ||
helia = await createHelia({ | ||
logger, | ||
blockBrokers: [() => blockRetriever] | ||
}) | ||
name = stubInterface<IPNS>({ | ||
resolveDNSLink: dnsLinkResolver, | ||
resolve: peerIdResolver | ||
}) | ||
verifiedFetch = new VerifiedFetch({ | ||
helia, | ||
ipns: name | ||
}) | ||
}) | ||
|
||
afterEach(async () => { | ||
await stop(helia, verifiedFetch, name) | ||
componentLoggers = [] | ||
sandbox.restore() | ||
}) | ||
|
||
it('should abort a request before peerId resolution', async function () { | ||
const c = dagCbor(helia) | ||
const cid = await c.add({ | ||
hello: 'world' | ||
}) | ||
|
||
const peerId = await createEd25519PeerId() | ||
|
||
await name.publish(peerId, cid, { lifetime: 1000 * 60 * 60 }) | ||
|
||
const abortedResult = await makeAbortedRequest(verifiedFetch, [`ipns://${peerId}`], peerIdResolverCalled.promise) | ||
|
||
expect(peerIdResolver.callCount).to.equal(1) | ||
expect(dnsLinkResolver.callCount).to.equal(0) // not called because signal abort was detected | ||
expect(blockRetriever.retrieve.callCount).to.equal(0) // not called because we never got the cid | ||
expect(abortedResult).to.be.ok() | ||
expect(abortedResult.status).to.equal(400) | ||
expect(abortedResult.statusText).to.equal('Bad Request') | ||
await expect(abortedResult.text()).to.eventually.contain('aborted') | ||
}) | ||
|
||
it('should abort a request before dns resolution', async function () { | ||
const abortedResult = await makeAbortedRequest(verifiedFetch, ['ipns://timeout-5000-example.com'], dnsLinkResolverCalled.promise) | ||
|
||
expect(peerIdResolver.callCount).to.equal(0) // not called because peerIdFromString fails | ||
expect(dnsLinkResolver.callCount).to.equal(1) | ||
expect(blockRetriever.retrieve.callCount).to.equal(0) // not called because we never got the cid | ||
expect(abortedResult).to.be.ok() | ||
expect(abortedResult.status).to.equal(400) | ||
expect(abortedResult.statusText).to.equal('Bad Request') | ||
await expect(abortedResult.text()).to.eventually.contain('aborted') | ||
}) | ||
|
||
it('should abort a request while looking for cid', async function () { | ||
const abortedResult = await makeAbortedRequest(verifiedFetch, [notPublishedCid, { headers: { accept: 'application/octet-stream' } }], blockBrokerRetrieveCalled.promise) | ||
|
||
expect(peerIdResolver.callCount).to.equal(0) // not called because parseResource never passes the resource to parseUrlString | ||
expect(dnsLinkResolver.callCount).to.equal(0) // not called because parseResource never passes the resource to parseUrlString | ||
expect(blockRetriever.retrieve.callCount).to.equal(1) | ||
expect(abortedResult).to.be.ok() | ||
expect(abortedResult.status).to.equal(400) | ||
expect(abortedResult.statusText).to.equal('Bad Request') | ||
// this error is exactly what blockRetriever throws, so we can check for "aborted" in the error message | ||
await expect(abortedResult.text()).to.eventually.contain('aborted') | ||
}) | ||
|
||
// TODO: verify that the request is aborted when calling unixfs.cat and unixfs.walkPath | ||
}) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
another part of the bug fix... we need to be passing progressOptions and signal through. I just realized I wasn't checking for signal abort here... the iterator should handle this.. but i'll update just in case.