From a7442326404326cd5fca56a5b3491135270a3ab5 Mon Sep 17 00:00:00 2001 From: Russell Dempsey <1173416+SgtPooki@users.noreply.github.com> Date: Mon, 9 Dec 2024 12:08:03 -0600 Subject: [PATCH] feat: enhanced dag traversal --- packages/verified-fetch/package.json | 1 + .../src/utils/enhanced-dag-traversal.ts | 90 +++++++++++++++++++ .../src/utils/set-content-type.ts | 2 +- packages/verified-fetch/src/verified-fetch.ts | 51 ++++++++--- 4 files changed, 129 insertions(+), 15 deletions(-) create mode 100644 packages/verified-fetch/src/utils/enhanced-dag-traversal.ts diff --git a/packages/verified-fetch/package.json b/packages/verified-fetch/package.json index 52320ce..dcc9dfa 100644 --- a/packages/verified-fetch/package.json +++ b/packages/verified-fetch/package.json @@ -166,6 +166,7 @@ "interface-datastore": "^8.3.1", "ipfs-unixfs-exporter": "^13.6.1", "ipns": "^10.0.0", + "it-first": "^3.0.6", "it-map": "^3.1.1", "it-pipe": "^3.0.1", "it-tar": "^6.0.5", diff --git a/packages/verified-fetch/src/utils/enhanced-dag-traversal.ts b/packages/verified-fetch/src/utils/enhanced-dag-traversal.ts new file mode 100644 index 0000000..d114d84 --- /dev/null +++ b/packages/verified-fetch/src/utils/enhanced-dag-traversal.ts @@ -0,0 +1,90 @@ +/* eslint-disable @typescript-eslint/no-unused-vars */ +import { type ComponentLogger } from '@libp2p/interface' +import { type ReadableStorage, exporter, type ExporterOptions } from 'ipfs-unixfs-exporter' +import first from 'it-first' +import toBrowserReadableStream from 'it-to-browser-readablestream' +import { type CID } from 'multiformats/cid' +import { type ContentTypeParser } from '../types.js' +import { getStreamFromAsyncIterable } from './get-stream-from-async-iterable.js' +import { setContentType } from './set-content-type.js' + +export interface EnhancedDagTraversalOptions extends ExporterOptions { + blockstore: ReadableStorage + cidOrPath: string | CID + response: Response + logger: ComponentLogger + path: string + contentTypeParser?: ContentTypeParser +} + +export interface EnhancedDagTraversalResponse { + stream: ReadableStream + firstChunk: Uint8Array +} + +export async function enhancedDagTraversal ({ blockstore, signal, onProgress, cidOrPath, offset, length, path, logger, contentTypeParser, response }: EnhancedDagTraversalOptions): Promise { + const log = logger.forComponent('helia:verified-fetch:enhanced-dag-traversal') + let firstChunk: any + // try { + const singleBlockEntry = await exporter(cidOrPath, blockstore, { + signal, + onProgress, + blockReadConcurrency: 1 + }) + + const singleBlockIter = singleBlockEntry.content({ + signal, + onProgress, + offset, + length + }) + log.trace('got single concurrency iterator for %s', cidOrPath) + + firstChunk = await first(singleBlockIter) + await setContentType({ bytes: firstChunk, path, response, contentTypeParser, log }) + + const contentType = response.headers.get('content-type') + + // if video or image, return toBrowserReadableStream(asyncIter) + if (contentType?.startsWith('video/') === true || contentType?.startsWith('image/') === true) { + log('returning stream for image/video') + return { + // stream: toBrowserReadableStream(singleBlockIter), + stream: (await getStreamFromAsyncIterable(singleBlockIter, path, logger, { signal })).stream, + firstChunk + } + } + // } catch (err: any) { + // // signal?.throwIfAborted() + // log.error('Unknown error', err) + // throw err + // } + + // try { + log.trace('getting iterator for non-image/video content') + // otherwise, use blockReadConcurrency: undefined + const entry = await exporter(cidOrPath, blockstore, { + signal, + onProgress + }) + const iter = entry.content({ + signal, + onProgress, + offset, + length + }) + firstChunk ??= await first(iter) + + log('returning stream for non-image/video content') + return { + // stream: toBrowserReadableStream(iter), + stream: (await getStreamFromAsyncIterable(iter, path, logger, { signal })).stream, + firstChunk + } + // } catch (err: any) { + // // if aborted + // // signal?.throwIfAborted() + // log.error('Unknown error', err) + // throw err + // } +} diff --git a/packages/verified-fetch/src/utils/set-content-type.ts b/packages/verified-fetch/src/utils/set-content-type.ts index d3b9af4..ab8f7d8 100644 --- a/packages/verified-fetch/src/utils/set-content-type.ts +++ b/packages/verified-fetch/src/utils/set-content-type.ts @@ -7,7 +7,7 @@ export interface SetContentTypeOptions { path: string response: Response defaultContentType?: string - contentTypeParser: ContentTypeParser | undefined + contentTypeParser?: ContentTypeParser log: Logger } diff --git a/packages/verified-fetch/src/verified-fetch.ts b/packages/verified-fetch/src/verified-fetch.ts index a721f9a..e3c824b 100644 --- a/packages/verified-fetch/src/verified-fetch.ts +++ b/packages/verified-fetch/src/verified-fetch.ts @@ -19,11 +19,12 @@ import { fromString as uint8ArrayFromString } from 'uint8arrays/from-string' import { toString as uint8ArrayToString } from 'uint8arrays/to-string' import { ByteRangeContext } from './utils/byte-range-context.js' import { dagCborToSafeJSON } from './utils/dag-cbor-to-safe-json.js' +import { enhancedDagTraversal } from './utils/enhanced-dag-traversal.js' import { getContentDispositionFilename } from './utils/get-content-disposition-filename.js' import { getETag } from './utils/get-e-tag.js' import { getPeerIdFromString } from './utils/get-peer-id-from-string.js' import { getResolvedAcceptHeader } from './utils/get-resolved-accept-header.js' -import { getStreamFromAsyncIterable } from './utils/get-stream-from-async-iterable.js' +// import { getStreamFromAsyncIterable } from './utils/get-stream-from-async-iterable.js' import { tarStream } from './utils/get-tar-stream.js' import { getRedirectResponse } from './utils/handle-redirects.js' import { parseResource } from './utils/parse-resource.js' @@ -375,35 +376,57 @@ export class VerifiedFetch { this.log.trace('calling exporter for %c/%s with offset=%o & length=%o', resolvedCID, path, offset, length) try { - const entry = await exporter(resolvedCID, this.helia.blockstore, { - signal: options?.signal, - onProgress: options?.onProgress - }) + // const entry = await exporter(resolvedCID, this.helia.blockstore, { + // signal: options?.signal, + // onProgress: options?.onProgress + // }) + + // const asyncIter = entry.content({ + // signal: options?.signal, + // onProgress: options?.onProgress, + // offset, + // length + // }) + this.log('got async iterator for %c/%s', cid, path) - const asyncIter = entry.content({ + // const { stream, firstChunk } = await getStreamFromAsyncIterable(asyncIter, path ?? '', this.helia.logger, { + // onProgress: options?.onProgress, + // signal: options?.signal + // }) + const tmpResponse = new Response() + const { stream } = await enhancedDagTraversal({ + blockstore: this.helia.blockstore, signal: options?.signal, onProgress: options?.onProgress, + cidOrPath: resolvedCID, offset, - length - }) - this.log('got async iterator for %c/%s', cid, path) - - const { stream, firstChunk } = await getStreamFromAsyncIterable(asyncIter, path ?? '', this.helia.logger, { - onProgress: options?.onProgress, - signal: options?.signal + length, + path, + response: tmpResponse, + logger: this.helia.logger, + contentTypeParser: this.contentTypeParser }) byteRangeContext.setBody(stream) // if not a valid range request, okRangeRequest will call okResponse const response = okRangeResponse(resource, byteRangeContext.getBody(), { byteRangeContext, log: this.log }, { redirected }) + const contentType = tmpResponse.headers.get('content-type') + if (contentType != null) { + response.headers.set('content-type', contentType) + } else { + this.log('FIXME: content-type should be set') + } - await setContentType({ bytes: firstChunk, path, response, contentTypeParser: this.contentTypeParser, log: this.log }) + // await setContentType({ bytes: firstChunk, path, response, contentTypeParser: this.contentTypeParser, log: this.log }) setIpfsRoots(response, ipfsRoots) return response } catch (err: any) { options?.signal?.throwIfAborted() + // if (options?.signal?.aborted === true) { + // throw new Error('aborted') + // } this.log.error('error streaming %c/%s', cid, path, err) if (byteRangeContext.isRangeRequest && err.code === 'ERR_INVALID_PARAMS') { return badRangeResponse(resource)