Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: abort signals are respected #26

Merged
merged 9 commits into from
Mar 21, 2024
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
/**
* Converts an async iterator of Uint8Array bytes to a stream and returns the first chunk of bytes
*/
export async function getStreamFromAsyncIterable (iterator: AsyncIterable<Uint8Array>, path: string, logger: ComponentLogger, options?: Pick<VerifiedFetchInit, 'onProgress'>): Promise<{ stream: ReadableStream<Uint8Array>, firstChunk: Uint8Array }> {
export async function getStreamFromAsyncIterable (iterator: AsyncIterable<Uint8Array>, path: string, logger: ComponentLogger, options?: Pick<VerifiedFetchInit, 'onProgress' | 'signal'>): Promise<{ stream: ReadableStream<Uint8Array>, firstChunk: Uint8Array }> {
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

another part of the bug fix... we need to be passing progressOptions and signal through. I just realized I wasn't checking for signal abort here... the iterator should handle this.. but i'll update just in case.

const log = logger.forComponent('helia:verified-fetch:get-stream-from-async-iterable')
const reader = iterator[Symbol.asyncIterator]()
const { value: firstChunk, done } = await reader.next()
Expand All @@ -23,6 +23,11 @@
},
async pull (controller) {
const { value, done } = await reader.next()
if (options?.signal?.aborted === true) {
controller.error(new Error(options.signal.reason ?? 'signal aborted by user'))
controller.close()
return
}

Check warning on line 30 in packages/verified-fetch/src/utils/get-stream-from-async-iterable.ts

View check run for this annotation

Codecov / codecov/patch

packages/verified-fetch/src/utils/get-stream-from-async-iterable.ts#L27-L30

Added lines #L27 - L30 were not covered by tests
Comment on lines +26 to +30
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

added in 3b58fa0 (#26).

iterator from unixfs.cat should handle failing this stream, but handling this explicitly may allow us to exit earlier


if (done === true) {
if (value != null) {
Expand Down
9 changes: 4 additions & 5 deletions packages/verified-fetch/src/utils/parse-resource.ts
Original file line number Diff line number Diff line change
@@ -1,17 +1,16 @@
import { CID } from 'multiformats/cid'
import { parseUrlString } from './parse-url-string.js'
import type { ParsedUrlStringResults } from './parse-url-string.js'
import type { ParseUrlStringOptions, ParsedUrlStringResults } from './parse-url-string.js'
import type { Resource } from '../index.js'
import type { IPNS, IPNSRoutingEvents, ResolveDNSLinkProgressEvents, ResolveProgressEvents } from '@helia/ipns'
import type { IPNS } from '@helia/ipns'
import type { ComponentLogger } from '@libp2p/interface'
import type { ProgressOptions } from 'progress-events'

export interface ParseResourceComponents {
ipns: IPNS
logger: ComponentLogger
}

export interface ParseResourceOptions extends ProgressOptions<ResolveProgressEvents | IPNSRoutingEvents | ResolveDNSLinkProgressEvents> {
export interface ParseResourceOptions extends ParseUrlStringOptions {
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

we extend options accepted by ParseUrlStringOptions


}
/**
Expand All @@ -21,7 +20,7 @@ export interface ParseResourceOptions extends ProgressOptions<ResolveProgressEve
*/
export async function parseResource (resource: Resource, { ipns, logger }: ParseResourceComponents, options?: ParseResourceOptions): Promise<ParsedUrlStringResults> {
if (typeof resource === 'string') {
return parseUrlString({ urlString: resource, ipns, logger }, { onProgress: options?.onProgress })
return parseUrlString({ urlString: resource, ipns, logger }, options)
}

const cid = CID.asCID(resource)
Expand Down
19 changes: 12 additions & 7 deletions packages/verified-fetch/src/utils/parse-url-string.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,8 @@ import { peerIdFromString } from '@libp2p/peer-id'
import { CID } from 'multiformats/cid'
import { TLRU } from './tlru.js'
import type { RequestFormatShorthand } from '../types.js'
import type { DNSLinkResolveResult, IPNS, IPNSResolveResult, ResolveDNSLinkProgressEvents, ResolveResult } from '@helia/ipns'
import type { ComponentLogger } from '@libp2p/interface'
import type { DNSLinkResolveResult, IPNS, IPNSResolveResult, IPNSRoutingEvents, ResolveDNSLinkProgressEvents, ResolveProgressEvents, ResolveResult } from '@helia/ipns'
import type { AbortOptions, ComponentLogger } from '@libp2p/interface'
import type { ProgressOptions } from 'progress-events'

const ipnsCache = new TLRU<DNSLinkResolveResult | IPNSResolveResult>(1000)
Expand All @@ -13,7 +13,7 @@ export interface ParseUrlStringInput {
ipns: IPNS
logger: ComponentLogger
}
export interface ParseUrlStringOptions extends ProgressOptions<ResolveDNSLinkProgressEvents> {
export interface ParseUrlStringOptions extends ProgressOptions<ResolveProgressEvents | IPNSRoutingEvents | ResolveDNSLinkProgressEvents>, AbortOptions {
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

another part of the bug fix. we should accept a {signal}


}

Expand Down Expand Up @@ -131,7 +131,10 @@ function dnsLinkLabelDecoder (linkLabel: string): string {
* After determining the protocol successfully, we process the cidOrPeerIdOrDnsLink:
* * If it's ipfs, it parses the CID or throws an Aggregate error
* * If it's ipns, it attempts to resolve the PeerId and then the DNSLink. If both fail, an Aggregate error is thrown.
*
* @todo we need to break out each step of this function (cid parsing, ipns resolving, dnslink resolving) into separate functions and then remove the eslint-disable comment
*/
// eslint-disable-next-line complexity
export async function parseUrlString ({ urlString, ipns, logger }: ParseUrlStringInput, options?: ParseUrlStringOptions): Promise<ParsedUrlStringResults> {
const log = logger.forComponent('helia:verified-fetch:parse-url-string')
const { protocol, cidOrPeerIdOrDnsLink, path: urlPath, queryString } = matchURLString(urlString)
Expand Down Expand Up @@ -165,11 +168,12 @@ export async function parseUrlString ({ urlString, ipns, logger }: ParseUrlStrin
try {
// try resolving as an IPNS name
peerId = peerIdFromString(cidOrPeerIdOrDnsLink)
resolveResult = await ipns.resolve(peerId, { onProgress: options?.onProgress })
cid = resolveResult.cid
resolvedPath = resolveResult.path
resolveResult = await ipns.resolve(peerId, options)
cid = resolveResult?.cid
resolvedPath = resolveResult?.path
log.trace('resolved %s to %c', cidOrPeerIdOrDnsLink, cid)
} catch (err) {
options?.signal?.throwIfAborted()
if (peerId == null) {
log.error('could not parse PeerId string "%s"', cidOrPeerIdOrDnsLink, err)
errors.push(new TypeError(`Could not parse PeerId in ipns url "${cidOrPeerIdOrDnsLink}", ${(err as Error).message}`))
Expand All @@ -189,11 +193,12 @@ export async function parseUrlString ({ urlString, ipns, logger }: ParseUrlStrin
log.trace('Attempting to resolve DNSLink for %s', decodedDnsLinkLabel)

try {
resolveResult = await ipns.resolveDNSLink(decodedDnsLinkLabel, { onProgress: options?.onProgress })
resolveResult = await ipns.resolveDNSLink(decodedDnsLinkLabel, options)
cid = resolveResult?.cid
resolvedPath = resolveResult?.path
log.trace('resolved %s to %c', decodedDnsLinkLabel, cid)
} catch (err: any) {
options?.signal?.throwIfAborted()
log.error('could not resolve DnsLink for "%s"', cidOrPeerIdOrDnsLink, err)
errors.push(err)
}
Expand Down
8 changes: 7 additions & 1 deletion packages/verified-fetch/src/utils/responses.ts
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,13 @@ export function notAcceptableResponse (url: string, body?: SupportedBodyTypes, i
return response
}

export function badRequestResponse (url: string, body?: SupportedBodyTypes, init?: ResponseInit): Response {
/**
* if body is an Error, it will be converted to a string containing the error message.
*/
export function badRequestResponse (url: string, body?: SupportedBodyTypes | Error, init?: ResponseInit): Response {
if (body instanceof Error) {
body = body.message
}
const response = new Response(body, {
...(init ?? {}),
status: 400,
Expand Down
41 changes: 35 additions & 6 deletions packages/verified-fetch/src/verified-fetch.ts
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,10 @@
let signal: AbortSignal | undefined
if (options?.signal === null) {
signal = undefined
} else {
signal = options?.signal
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

the main part that fixes the bug mentioned by #25

}

return {
...options,
signal
Expand Down Expand Up @@ -284,10 +287,13 @@
const pathDetails = await walkPath(this.helia.blockstore, `${cid.toString()}/${path}`, options)
ipfsRoots = pathDetails.ipfsRoots
terminalElement = pathDetails.terminalElement
} catch (err) {
} catch (err: any) {
this.log.error('error walking path %s', path, err)
if (options?.signal?.aborted === true) {
return badRequestResponse(resource.toString(), new Error('signal aborted by user'))
}

return badGatewayResponse('Error walking path')
return badGatewayResponse(resource.toString(), 'Error walking path')
}

let resolvedCID = terminalElement?.cid ?? cid
Expand Down Expand Up @@ -347,7 +353,8 @@

try {
const { stream, firstChunk } = await getStreamFromAsyncIterable(asyncIter, path ?? '', this.helia.logger, {
onProgress: options?.onProgress
onProgress: options?.onProgress,
signal: options?.signal
})
byteRangeContext.setBody(stream)
// if not a valid range request, okRangeRequest will call okResponse
Expand All @@ -367,7 +374,7 @@
if (byteRangeContext.isRangeRequest && err.code === 'ERR_INVALID_PARAMS') {
return badRangeResponse(resource)
}
return badGatewayResponse('Unable to stream content')
return badGatewayResponse(resource.toString(), 'Unable to stream content')

Check warning on line 377 in packages/verified-fetch/src/verified-fetch.ts

View check run for this annotation

Codecov / codecov/patch

packages/verified-fetch/src/verified-fetch.ts#L377

Added line #L377 was not covered by tests
}
}

Expand Down Expand Up @@ -431,11 +438,33 @@
[identity.code]: this.handleRaw
}

/**
*
* TODO: Should we use 400, 408, 418, or 425, or throw and not even return a response?
Comment on lines +442 to +443
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

we should discuss this.

*/
private async abortHandler (opController: AbortController): Promise<void> {
this.log.error('signal aborted by user')
opController.abort('signal aborted by user')
}

/**
* We're starting to get to the point where we need a queue or pipeline of
* operations to perform and a single place to handle errors.
*
* TODO: move operations called by fetch to a queue of operations where we can
* always exit early (and cleanly) if a given signal is aborted
*/
async fetch (resource: Resource, opts?: VerifiedFetchOptions): Promise<Response> {
this.log('fetch %s', resource)

const options = convertOptions(opts)

const opController = new AbortController()
if (options?.signal != null) {
options.signal.onabort = this.abortHandler.bind(this, opController)
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

we override any handlers they have on their signal.. they should wrap any signals they want to provide

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

we can change this later if we want.. but it would be good to be aware of this. @achingbrain

options.signal = opController.signal
}

options?.onProgress?.(new CustomProgressEvent<CIDDetail>('verified-fetch:request:start', { resource }))

// resolve the CID/path from the requested resource
Expand All @@ -451,10 +480,10 @@
query = result.query
ttl = result.ttl
protocol = result.protocol
} catch (err) {
} catch (err: any) {
this.log.error('error parsing resource %s', resource, err)

return badRequestResponse('Invalid resource')
return badRequestResponse(resource.toString(), err)
}

options?.onProgress?.(new CustomProgressEvent<CIDDetail>('verified-fetch:request:resolve', { cid, path }))
Expand Down
139 changes: 139 additions & 0 deletions packages/verified-fetch/test/abort-handling.spec.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,139 @@
import { dagCbor } from '@helia/dag-cbor'
import { type DNSLinkResolveResult, type IPNS, type IPNSResolveResult } from '@helia/ipns'
import { stop, type ComponentLogger, type Logger } from '@libp2p/interface'
import { prefixLogger, logger as libp2pLogger } from '@libp2p/logger'
import { createEd25519PeerId } from '@libp2p/peer-id-factory'
import { expect } from 'aegir/chai'
import { CID } from 'multiformats/cid'
import pDefer, { type DeferredPromise } from 'p-defer'
import Sinon from 'sinon'
import { stubInterface, type StubbedInstance } from 'sinon-ts'
import { VerifiedFetch } from '../src/verified-fetch.js'
import { createHelia } from './fixtures/create-offline-helia.js'
import { getAbortablePromise } from './fixtures/get-abortable-promise.js'
import { makeAbortedRequest } from './fixtures/make-aborted-request.js'
import type { BlockRetriever, Helia } from '@helia/interface'

describe('abort-handling', function () {
this.timeout(500) // these tests should all fail extremely quickly. if they don't, they're not aborting properly, or they're being ran on an extremely slow machine.
const sandbox = Sinon.createSandbox()
/**
* CID I created by running `npx kubo add --cid-version 1 -r dist` in the `verified-fetch` package folder
*/
const notPublishedCid = CID.parse('bafybeichqiz32cw5c3vdpvh2xtfgl42veqbsr6sw2g6c7ffz6atvh2vise')
let helia: Helia
let name: StubbedInstance<IPNS>
let logger: ComponentLogger
let componentLoggers: Logger[] = []
let verifiedFetch: VerifiedFetch

/**
* Stubbed networking components
*/
let blockRetriever: StubbedInstance<BlockRetriever>
let dnsLinkResolver: Sinon.SinonStub<any[], Promise<DNSLinkResolveResult>>
let peerIdResolver: Sinon.SinonStub<any[], Promise<IPNSResolveResult>>

/**
* used as promises to pass to makeAbortedRequest that will abort the request as soon as it's resolved.
*/
let blockBrokerRetrieveCalled: DeferredPromise<void>
let dnsLinkResolverCalled: DeferredPromise<void>
let peerIdResolverCalled: DeferredPromise<void>

beforeEach(async () => {
peerIdResolver = sandbox.stub()
dnsLinkResolver = sandbox.stub()
peerIdResolverCalled = pDefer()
dnsLinkResolverCalled = pDefer()
blockBrokerRetrieveCalled = pDefer()

dnsLinkResolver.withArgs('timeout-5000-example.com', Sinon.match.any).callsFake(async (_domain, options) => {
dnsLinkResolverCalled.resolve()
return getAbortablePromise(options.signal)
})
peerIdResolver.callsFake(async (peerId, options) => {
peerIdResolverCalled.resolve()
return getAbortablePromise(options.signal)
})
blockRetriever = stubInterface<BlockRetriever>({
retrieve: sandbox.stub().callsFake(async (cid, options) => {
blockBrokerRetrieveCalled.resolve()
return getAbortablePromise(options.signal)
})
})

logger = prefixLogger('test:abort-handling')
sandbox.stub(logger, 'forComponent').callsFake((name) => {
const newLogger = libp2pLogger(`test:abort-handling:child-logger-${componentLoggers.length}:${name}`)
componentLoggers.push(sandbox.stub(newLogger))
return newLogger
})
helia = await createHelia({
logger,
blockBrokers: [() => blockRetriever]
})
name = stubInterface<IPNS>({
resolveDNSLink: dnsLinkResolver,
resolve: peerIdResolver
})
verifiedFetch = new VerifiedFetch({
helia,
ipns: name
})
})

afterEach(async () => {
await stop(helia, verifiedFetch, name)
componentLoggers = []
sandbox.restore()
})

it('should abort a request before peerId resolution', async function () {
const c = dagCbor(helia)
const cid = await c.add({
hello: 'world'
})

const peerId = await createEd25519PeerId()

await name.publish(peerId, cid, { lifetime: 1000 * 60 * 60 })

const abortedResult = await makeAbortedRequest(verifiedFetch, [`ipns://${peerId}`], peerIdResolverCalled.promise)

expect(peerIdResolver.callCount).to.equal(1)
expect(dnsLinkResolver.callCount).to.equal(0) // not called because signal abort was detected
expect(blockRetriever.retrieve.callCount).to.equal(0) // not called because we never got the cid
expect(abortedResult).to.be.ok()
expect(abortedResult.status).to.equal(400)
expect(abortedResult.statusText).to.equal('Bad Request')
await expect(abortedResult.text()).to.eventually.contain('aborted')
})

it('should abort a request before dns resolution', async function () {
const abortedResult = await makeAbortedRequest(verifiedFetch, ['ipns://timeout-5000-example.com'], dnsLinkResolverCalled.promise)

expect(peerIdResolver.callCount).to.equal(0) // not called because peerIdFromString fails
expect(dnsLinkResolver.callCount).to.equal(1)
expect(blockRetriever.retrieve.callCount).to.equal(0) // not called because we never got the cid
expect(abortedResult).to.be.ok()
expect(abortedResult.status).to.equal(400)
expect(abortedResult.statusText).to.equal('Bad Request')
await expect(abortedResult.text()).to.eventually.contain('aborted')
})

it('should abort a request while looking for cid', async function () {
const abortedResult = await makeAbortedRequest(verifiedFetch, [notPublishedCid, { headers: { accept: 'application/octet-stream' } }], blockBrokerRetrieveCalled.promise)

expect(peerIdResolver.callCount).to.equal(0) // not called because parseResource never passes the resource to parseUrlString
expect(dnsLinkResolver.callCount).to.equal(0) // not called because parseResource never passes the resource to parseUrlString
expect(blockRetriever.retrieve.callCount).to.equal(1)
expect(abortedResult).to.be.ok()
expect(abortedResult.status).to.equal(400)
expect(abortedResult.statusText).to.equal('Bad Request')
// this error is exactly what blockRetriever throws, so we can check for "aborted" in the error message
await expect(abortedResult.text()).to.eventually.contain('aborted')
})

// TODO: verify that the request is aborted when calling unixfs.cat and unixfs.walkPath
})
12 changes: 1 addition & 11 deletions packages/verified-fetch/test/cache-control-header.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,23 +5,13 @@ import { createEd25519PeerId } from '@libp2p/peer-id-factory'
import { dns } from '@multiformats/dns'
import { expect } from 'aegir/chai'
import Sinon from 'sinon'
import { stubInterface } from 'sinon-ts'
import { VerifiedFetch } from '../src/verified-fetch.js'
import { createHelia } from './fixtures/create-offline-helia.js'
import { answerFake } from './fixtures/dns-answer-fake.js'
import type { Helia } from '@helia/interface'
import type { IPNS } from '@helia/ipns'
import type { DNSResponse } from '@multiformats/dns'

function answerFake (data: string, TTL: number, name: string, type: number): DNSResponse {
const fake = stubInterface<DNSResponse>()
fake.Answer = [{
data,
TTL,
name,
type
}]
return fake
}
describe('cache-control header', () => {
let helia: Helia
let name: IPNS
Expand Down
Loading
Loading