Skip to content

Commit

Permalink
feat: improve enhanced-dag-traversal
Browse files Browse the repository at this point in the history
  • Loading branch information
SgtPooki committed Dec 9, 2024
1 parent a744232 commit b1598a9
Showing 1 changed file with 57 additions and 44 deletions.
101 changes: 57 additions & 44 deletions packages/verified-fetch/src/utils/enhanced-dag-traversal.ts
Original file line number Diff line number Diff line change
@@ -1,11 +1,9 @@
/* eslint-disable @typescript-eslint/no-unused-vars */
import { type ComponentLogger } from '@libp2p/interface'
import { type ReadableStorage, exporter, type ExporterOptions } from 'ipfs-unixfs-exporter'
import first from 'it-first'
import toBrowserReadableStream from 'it-to-browser-readablestream'
import { type CID } from 'multiformats/cid'
import { type ContentTypeParser } from '../types.js'
import { getStreamFromAsyncIterable } from './get-stream-from-async-iterable.js'
import { setContentType } from './set-content-type.js'

export interface EnhancedDagTraversalOptions extends ExporterOptions {
Expand All @@ -22,69 +20,84 @@ export interface EnhancedDagTraversalResponse {
firstChunk: Uint8Array
}

export async function enhancedDagTraversal ({ blockstore, signal, onProgress, cidOrPath, offset, length, path, logger, contentTypeParser, response }: EnhancedDagTraversalOptions): Promise<EnhancedDagTraversalResponse> {
export async function enhancedDagTraversal ({
blockstore,
signal,
onProgress,
cidOrPath,
offset,
length,
path,
logger,
contentTypeParser,
response
}: EnhancedDagTraversalOptions): Promise<EnhancedDagTraversalResponse> {
const log = logger.forComponent('helia:verified-fetch:enhanced-dag-traversal')
let firstChunk: any
// try {
const singleBlockEntry = await exporter(cidOrPath, blockstore, {

// Fetch the first chunk eagerly
const dfsEntry = await exporter(cidOrPath, blockstore, {
signal,
onProgress,
blockReadConcurrency: 1
})

const singleBlockIter = singleBlockEntry.content({
const dfsIter = dfsEntry.content({
signal,
onProgress,
offset,
length
})
log.trace('got single concurrency iterator for %s', cidOrPath)

firstChunk = await first(singleBlockIter)
let firstChunk
let error: Error
try {
firstChunk = await first(dfsIter)
} catch (err: any) {
if (signal?.aborted === true) {
error = err
log.trace('Request aborted while fetching first chunk')

Check warning on line 58 in packages/verified-fetch/src/utils/enhanced-dag-traversal.ts

View check run for this annotation

Codecov / codecov/patch

packages/verified-fetch/src/utils/enhanced-dag-traversal.ts#L57-L58

Added lines #L57 - L58 were not covered by tests
} else {
throw err
}
}

// Determine content type based on the first chunk
await setContentType({ bytes: firstChunk, path, response, contentTypeParser, log })

const contentType = response.headers.get('content-type')
const isImageOrVideo = contentType?.startsWith('video/') === true || contentType?.startsWith('image/') === true
log.trace('Content type determined: %s', contentType)

// if video or image, return toBrowserReadableStream(asyncIter)
if (contentType?.startsWith('video/') === true || contentType?.startsWith('image/') === true) {
log('returning stream for image/video')
return {
// stream: toBrowserReadableStream(singleBlockIter),
stream: (await getStreamFromAsyncIterable(singleBlockIter, path, logger, { signal })).stream,
firstChunk
const enhancedIter = async function * (): AsyncGenerator<Uint8Array, void, undefined> {
if (error != null) {
throw error
}

Check warning on line 74 in packages/verified-fetch/src/utils/enhanced-dag-traversal.ts

View check run for this annotation

Codecov / codecov/patch

packages/verified-fetch/src/utils/enhanced-dag-traversal.ts#L73-L74

Added lines #L73 - L74 were not covered by tests
if (isImageOrVideo) {
yield * dfsIter
return
}

Check warning on line 78 in packages/verified-fetch/src/utils/enhanced-dag-traversal.ts

View check run for this annotation

Codecov / codecov/patch

packages/verified-fetch/src/utils/enhanced-dag-traversal.ts#L76-L78

Added lines #L76 - L78 were not covered by tests

// If not image/video, switch to a BFS iterator
const bfsEntry = await exporter(cidOrPath, blockstore, {
signal,
onProgress
})

const bfsIter = bfsEntry.content({
signal,
onProgress,
offset,
length
})

// continue with the BFS iterator
yield * bfsIter
}
// } catch (err: any) {
// // signal?.throwIfAborted()
// log.error('Unknown error', err)
// throw err
// }

// try {
log.trace('getting iterator for non-image/video content')
// otherwise, use blockReadConcurrency: undefined
const entry = await exporter(cidOrPath, blockstore, {
signal,
onProgress
})
const iter = entry.content({
signal,
onProgress,
offset,
length
})
firstChunk ??= await first(iter)
const stream = toBrowserReadableStream(enhancedIter())

log('returning stream for non-image/video content')
return {
// stream: toBrowserReadableStream(iter),
stream: (await getStreamFromAsyncIterable(iter, path, logger, { signal })).stream,
stream,
firstChunk
}
// } catch (err: any) {
// // if aborted
// // signal?.throwIfAborted()
// log.error('Unknown error', err)
// throw err
// }
}

0 comments on commit b1598a9

Please sign in to comment.