Skip to content

Commit

Permalink
chore: update utils
Browse files Browse the repository at this point in the history
  • Loading branch information
achingbrain committed Apr 15, 2024
1 parent 8230eb0 commit 7f29e5b
Show file tree
Hide file tree
Showing 3 changed files with 152 additions and 81 deletions.
27 changes: 14 additions & 13 deletions packages/verified-fetch/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -57,12 +57,12 @@
"release": "aegir release"
},
"dependencies": {
"@helia/block-brokers": "^2.0.3",
"@helia/car": "^3.1.2",
"@helia/http": "^1.0.3",
"@helia/interface": "^4.1.0",
"@helia/ipns": "^7.2.0",
"@helia/routers": "^1.0.2",
"@helia/block-brokers": "^2.1.0",
"@helia/car": "^3.1.3",
"@helia/http": "^1.0.4",
"@helia/interface": "^4.2.0",
"@helia/ipns": "^7.2.1",
"@helia/routers": "^1.0.3",
"@ipld/dag-cbor": "^9.2.0",
"@ipld/dag-json": "^10.2.0",
"@ipld/dag-pb": "^4.1.0",
Expand All @@ -84,12 +84,12 @@
"uint8arrays": "^5.0.3"
},
"devDependencies": {
"@helia/car": "^3.1.2",
"@helia/dag-cbor": "^3.0.2",
"@helia/dag-json": "^3.0.2",
"@helia/json": "^3.0.2",
"@helia/unixfs": "^3.0.3",
"@helia/utils": "^0.1.0",
"@helia/car": "^3.1.3",
"@helia/dag-cbor": "^3.0.3",
"@helia/dag-json": "^3.0.3",
"@helia/json": "^3.0.3",
"@helia/unixfs": "^3.0.4",
"@helia/utils": "^0.2.0",
"@ipld/car": "^5.3.0",
"@libp2p/interface-compliance-tests": "^5.3.4",
"@libp2p/logger": "^4.0.9",
Expand All @@ -100,10 +100,11 @@
"blockstore-core": "^4.4.1",
"browser-readablestream-to-it": "^2.0.5",
"datastore-core": "^9.2.9",
"helia": "^4.1.0",
"helia": "^4.1.1",
"ipfs-unixfs-importer": "^15.2.5",
"ipns": "^9.1.0",
"it-all": "^3.0.4",
"it-drain": "^3.0.5",
"it-last": "^3.0.4",
"it-to-buffer": "^4.0.5",
"magic-bytes.js": "^1.10.0",
Expand Down
26 changes: 13 additions & 13 deletions packages/verified-fetch/src/verified-fetch.ts
Original file line number Diff line number Diff line change
Expand Up @@ -375,19 +375,20 @@ export class VerifiedFetch {
const length = byteRangeContext.length
this.log.trace('calling exporter for %c/%s with offset=%o & length=%o', resolvedCID, path, offset, length)

const entry = await exporter(resolvedCID, this.helia.blockstore, {
signal: options?.signal,
onProgress: options?.onProgress
})
const asyncIter = entry.content({
signal: options?.signal,
onProgress: options?.onProgress,
offset,
length
})
this.log('got async iterator for %c/%s', cid, path)

try {
const entry = await exporter(resolvedCID, this.helia.blockstore, {
signal: options?.signal,
onProgress: options?.onProgress
})

const asyncIter = entry.content({
signal: options?.signal,
onProgress: options?.onProgress,
offset,
length
})
this.log('got async iterator for %c/%s', cid, path)

const { stream, firstChunk } = await getStreamFromAsyncIterable(asyncIter, path ?? '', this.helia.logger, {
onProgress: options?.onProgress,
signal: options?.signal
Expand All @@ -403,7 +404,6 @@ export class VerifiedFetch {
if (ipfsRoots != null) {
response.headers.set('X-Ipfs-Roots', ipfsRoots.map(cid => cid.toV1().toString()).join(',')) // https://specs.ipfs.tech/http-gateways/path-gateway/#x-ipfs-roots-response-header
}

return response
} catch (err: any) {
options?.signal?.throwIfAborted()
Expand Down
180 changes: 125 additions & 55 deletions packages/verified-fetch/test/abort-handling.spec.ts
Original file line number Diff line number Diff line change
@@ -1,10 +1,13 @@
import { dagCbor } from '@helia/dag-cbor'
import { type DNSLinkResolveResult, type IPNS, type IPNSResolveResult } from '@helia/ipns'
import { type UnixFSStats, unixfs } from '@helia/unixfs'
import { unixfs } from '@helia/unixfs'
import { stop, type ComponentLogger, type Logger } from '@libp2p/interface'
import { prefixLogger, logger as libp2pLogger } from '@libp2p/logger'
import { createEd25519PeerId } from '@libp2p/peer-id-factory'
import { expect } from 'aegir/chai'
import browserReadableStreamToIt from 'browser-readablestream-to-it'
import { fixedSize } from 'ipfs-unixfs-importer/chunker'
import drain from 'it-drain'
import { CID } from 'multiformats/cid'
import pDefer, { type DeferredPromise } from 'p-defer'
import Sinon from 'sinon'
Expand All @@ -13,7 +16,7 @@ import { VerifiedFetch } from '../src/verified-fetch.js'
import { createHelia } from './fixtures/create-offline-helia.js'
import { getAbortablePromise } from './fixtures/get-abortable-promise.js'
import { makeAbortedRequest } from './fixtures/make-aborted-request.js'
import type { BlockRetriever, Helia } from '@helia/interface'
import type { BlockBroker, Helia } from '@helia/interface'

describe('abort-handling', function () {
this.timeout(500) // these tests should all fail extremely quickly. if they don't, they're not aborting properly, or they're being ran on an extremely slow machine.
Expand All @@ -31,31 +34,23 @@ describe('abort-handling', function () {
/**
* Stubbed networking components
*/
let blockRetriever: StubbedInstance<BlockRetriever>
let blockRetriever: StubbedInstance<Required<Pick<BlockBroker, 'retrieve'>>>
let dnsLinkResolver: Sinon.SinonStub<any[], Promise<DNSLinkResolveResult>>
let peerIdResolver: Sinon.SinonStub<any[], Promise<IPNSResolveResult>>
let unixFsCatStub: Sinon.SinonStub<any[], AsyncIterable<Uint8Array>>
let unixFsStatStub: Sinon.SinonStub<any[], Promise<UnixFSStats>>

/**
* used as promises to pass to makeAbortedRequest that will abort the request as soon as it's resolved.
*/
let blockBrokerRetrieveCalled: DeferredPromise<void>
let dnsLinkResolverCalled: DeferredPromise<void>
let peerIdResolverCalled: DeferredPromise<void>
let unixFsStatCalled: DeferredPromise<void>
let unixFsCatCalled: DeferredPromise<void>

beforeEach(async () => {
peerIdResolver = sandbox.stub()
dnsLinkResolver = sandbox.stub()
unixFsCatStub = sandbox.stub()
unixFsStatStub = sandbox.stub()
peerIdResolverCalled = pDefer()
dnsLinkResolverCalled = pDefer()
blockBrokerRetrieveCalled = pDefer()
unixFsStatCalled = pDefer()
unixFsCatCalled = pDefer()

dnsLinkResolver.withArgs('timeout-5000-example.com', Sinon.match.any).callsFake(async (_domain, options) => {
dnsLinkResolverCalled.resolve()
Expand All @@ -65,35 +60,12 @@ describe('abort-handling', function () {
peerIdResolverCalled.resolve()
return getAbortablePromise(options.signal)
})
blockRetriever = stubInterface<BlockRetriever>({
blockRetriever = stubInterface<Required<Pick<BlockBroker, 'retrieve'>>>({
retrieve: sandbox.stub().callsFake(async (cid, options) => {
blockBrokerRetrieveCalled.resolve()
return getAbortablePromise(options.signal)
})
})
unixFsCatStub.callsFake((cid, options) => {
unixFsCatCalled.resolve()
return {
async * [Symbol.asyncIterator] () {
await getAbortablePromise(options.signal)
yield new Uint8Array([0, 1, 2, 3, 4, 5, 6, 7, 8, 9])
}
}
})

unixFsStatStub.callsFake(async (cid, options): Promise<UnixFSStats> => {
unixFsStatCalled.resolve()
await getAbortablePromise(options.signal)
return {
cid,
type: 'file',
fileSize: BigInt(0),
dagSize: BigInt(0),
blocks: 1,
localFileSize: BigInt(0),
localDagSize: BigInt(0)
}
})

logger = prefixLogger('test:abort-handling')
sandbox.stub(logger, 'forComponent').callsFake((name) => {
Expand Down Expand Up @@ -153,39 +125,137 @@ describe('abort-handling', function () {
expect(blockRetriever.retrieve.callCount).to.equal(1)
})

it.skip('should abort a request during unixfs.stat call', async function () {
it('should abort a request while loading a file root', async function () {
const fs = unixfs(helia)
const fileCid = await fs.addBytes(Uint8Array.from([0, 1, 2, 3]))

// add a file with a very small chunk size - this is to ensure we end up
// with a DAG that contains a root and some leaf nodes
const fileCid = await fs.addBytes(Uint8Array.from([0, 1, 2, 3]), {
chunker: fixedSize({ chunkSize: 2 })
})
const directoryCid = await fs.addDirectory()
const cid = await fs.cp(fileCid, directoryCid, 'index.html')

await expect(makeAbortedRequest(verifiedFetch, [cid], unixFsStatCalled.promise)).to.eventually.be.rejectedWith('aborted')
const leaf1 = CID.parse('bafkreifucp2h2e7of7tmqrns5ykbv6a55bmn6twfjgsyw6lqxolgiw6i2i')
const leaf2 = CID.parse('bafkreihosbapmxbudbk6a4h7iohlb2u5lobrwkrme4h3p32zfv2qichdwm')

expect(peerIdResolver.callCount).to.equal(0) // not called because parseResource never passes the resource to parseUrlString
expect(dnsLinkResolver.callCount).to.equal(0) // not called because parseResource never passes the resource to parseUrlString
expect(blockRetriever.retrieve.callCount).to.equal(0) // not called because the blockstore has the content
expect(unixFsStatStub.callCount).to.equal(1)
expect(unixFsCatStub.callCount).to.equal(0) // not called because the request was aborted during .stat call
// file root
await expect(helia.blockstore.has(fileCid))
.to.eventually.be.true()

// leaf nodes
await expect(helia.blockstore.has(leaf1))
.to.eventually.be.true()
await expect(helia.blockstore.has(leaf2))
.to.eventually.be.true()

const fileRootGot = pDefer()
const blockstoreGetSpy = Sinon.stub(helia.blockstore, 'get')
blockstoreGetSpy.callsFake(async (cid, options) => {
if (cid.equals(fileCid)) {
fileRootGot.resolve()
}

return blockstoreGetSpy.wrappedMethod.call(helia.blockstore, cid, options)
})

await expect(makeAbortedRequest(verifiedFetch, [cid], fileRootGot.promise))
.to.eventually.be.rejectedWith('aborted')

// not called because parseResource never passes the resource to
// parseUrlString
expect(peerIdResolver.callCount).to.equal(0)

// not called because parseResource never passes the resource to
// parseUrlString
expect(dnsLinkResolver.callCount).to.equal(0)

// not called because the blockstore has the content
expect(blockRetriever.retrieve.callCount).to.equal(0)

// the file root was loaded
expect(blockstoreGetSpy.getCalls().map(call => call.args[0].toString()))
.to.include(fileCid.toString())

// the leaf nodes were not have been loaded because the request was aborted
// after the root node was loaded
expect(blockstoreGetSpy.getCalls().map(call => call.args[0].toString()))
.to.not.include(leaf1.toString())
expect(blockstoreGetSpy.getCalls().map(call => call.args[0].toString()))
.to.not.include(leaf2.toString())
})

it.skip('should abort a request during unixfs.cat call', async function () {
it('should abort a request while loading file data', async function () {
const fs = unixfs(helia)
const fileCid = await fs.addBytes(Uint8Array.from([0, 1, 2, 3]))

// add a file with a very small chunk size - this is to ensure we end up
// with a DAG that contains a root and some leaf nodes
const fileCid = await fs.addBytes(Uint8Array.from([0, 1, 2, 3]), {
chunker: fixedSize({ chunkSize: 2 })
})
const directoryCid = await fs.addDirectory()
const cid = await fs.cp(fileCid, directoryCid, 'index.html')

// override the default fake set in beforeEach that would timeout.
unixFsStatStub.callsFake(async (cid, options) => {
unixFsStatCalled.resolve()
return fs.stat(cid, options)
const leaf1 = CID.parse('bafkreifucp2h2e7of7tmqrns5ykbv6a55bmn6twfjgsyw6lqxolgiw6i2i')
const leaf2 = CID.parse('bafkreihosbapmxbudbk6a4h7iohlb2u5lobrwkrme4h3p32zfv2qichdwm')

// file root
await expect(helia.blockstore.has(fileCid))
.to.eventually.be.true()

// leaf nodes
await expect(helia.blockstore.has(leaf1))
.to.eventually.be.true()
await expect(helia.blockstore.has(leaf2))
.to.eventually.be.true()

const leaf1Got = pDefer()
let leaf2Loaded = false
const blockstoreGetSpy = Sinon.stub(helia.blockstore, 'get')
blockstoreGetSpy.callsFake(async (cid, options) => {
if (cid.equals(leaf1)) {
leaf1Got.resolve()
}

const b = await blockstoreGetSpy.wrappedMethod.call(helia.blockstore, cid, options)

if (cid.equals(leaf2)) {
leaf2Loaded = true
}

return b
})

await expect(makeAbortedRequest(verifiedFetch, [cid], unixFsCatCalled.promise)).to.eventually.be.rejectedWith('aborted')
const response = await makeAbortedRequest(verifiedFetch, [cid], leaf1Got.promise)

expect(peerIdResolver.callCount).to.equal(0) // not called because parseResource never passes the resource to parseUrlString
expect(dnsLinkResolver.callCount).to.equal(0) // not called because parseResource never passes the resource to parseUrlString
expect(blockRetriever.retrieve.callCount).to.equal(0) // not called because the blockstore has the content
expect(unixFsStatStub.callCount).to.equal(1)
expect(unixFsCatStub.callCount).to.equal(1)
if (response.body == null) {
throw new Error('Body was not set')
}

// error occurs during streaming response
await expect(drain(browserReadableStreamToIt(response.body)))
.to.eventually.be.rejectedWith('aborted')

// not called because parseResource never passes the resource to
// parseUrlString
expect(peerIdResolver.callCount).to.equal(0)

// not called because parseResource never passes the resource to
// parseUrlString
expect(dnsLinkResolver.callCount).to.equal(0)

// not called because the blockstore has the content
expect(blockRetriever.retrieve.callCount).to.equal(0)

// the file root was loaded
expect(blockstoreGetSpy.getCalls().map(call => call.args[0].toString()))
.to.include(fileCid.toString())

// the first leaf was loaded
expect(blockstoreGetSpy.getCalls().map(call => call.args[0].toString()))
.to.include(leaf1.toString())

// the signal was aborted before the second leaf was loaded
expect(leaf2Loaded).to.be.false()
})
})

0 comments on commit 7f29e5b

Please sign in to comment.