feat: enhanced dag traversal
SgtPooki committed Dec 9, 2024
1 parent dc27af6 commit a744232
Showing 4 changed files with 129 additions and 15 deletions.
1 change: 1 addition & 0 deletions packages/verified-fetch/package.json
@@ -166,6 +166,7 @@
"interface-datastore": "^8.3.1",
"ipfs-unixfs-exporter": "^13.6.1",
"ipns": "^10.0.0",
"it-first": "^3.0.6",
"it-map": "^3.1.1",
"it-pipe": "^3.0.1",
"it-tar": "^6.0.5",
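The new it-first dependency is used by the traversal helper added below to pull only the first chunk out of the exporter's content iterator, so the content type can be sniffed before the rest of the DAG is read. A quick, self-contained illustration of what first() does (example data only, not part of the commit):

import first from 'it-first'

// stand-in for the exporter's content() iterator
async function * chunks (): AsyncGenerator<Uint8Array> {
  yield new Uint8Array([0x89, 0x50, 0x4e, 0x47]) // e.g. PNG magic bytes
  yield new Uint8Array([0x0d, 0x0a, 0x1a, 0x0a])
}

// first() resolves with the first yielded value, or undefined for an empty iterable
const firstChunk = await first(chunks())
console.log(firstChunk) // Uint8Array(4) [ 137, 80, 78, 71 ]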
90 changes: 90 additions & 0 deletions packages/verified-fetch/src/utils/enhanced-dag-traversal.ts
@@ -0,0 +1,90 @@
/* eslint-disable @typescript-eslint/no-unused-vars */
import { type ComponentLogger } from '@libp2p/interface'
import { type ReadableStorage, exporter, type ExporterOptions } from 'ipfs-unixfs-exporter'
import first from 'it-first'
import toBrowserReadableStream from 'it-to-browser-readablestream'
import { type CID } from 'multiformats/cid'
import { type ContentTypeParser } from '../types.js'
import { getStreamFromAsyncIterable } from './get-stream-from-async-iterable.js'
import { setContentType } from './set-content-type.js'

export interface EnhancedDagTraversalOptions extends ExporterOptions {
blockstore: ReadableStorage
cidOrPath: string | CID
response: Response
logger: ComponentLogger
path: string
contentTypeParser?: ContentTypeParser
}

export interface EnhancedDagTraversalResponse {
stream: ReadableStream<Uint8Array>
firstChunk: Uint8Array
}

export async function enhancedDagTraversal ({ blockstore, signal, onProgress, cidOrPath, offset, length, path, logger, contentTypeParser, response }: EnhancedDagTraversalOptions): Promise<EnhancedDagTraversalResponse> {
const log = logger.forComponent('helia:verified-fetch:enhanced-dag-traversal')
let firstChunk: any
// try {
const singleBlockEntry = await exporter(cidOrPath, blockstore, {
signal,
onProgress,
blockReadConcurrency: 1
})

const singleBlockIter = singleBlockEntry.content({
signal,
onProgress,
offset,
length
})
log.trace('got single concurrency iterator for %s', cidOrPath)

firstChunk = await first(singleBlockIter)
await setContentType({ bytes: firstChunk, path, response, contentTypeParser, log })

const contentType = response.headers.get('content-type')

// if video or image, return toBrowserReadableStream(asyncIter)
if (contentType?.startsWith('video/') === true || contentType?.startsWith('image/') === true) {
log('returning stream for image/video')
return {
// stream: toBrowserReadableStream(singleBlockIter),
stream: (await getStreamFromAsyncIterable(singleBlockIter, path, logger, { signal })).stream,
firstChunk
}
}
// } catch (err: any) {
// // signal?.throwIfAborted()
// log.error('Unknown error', err)
// throw err
// }

// try {
log.trace('getting iterator for non-image/video content')
// otherwise, use blockReadConcurrency: undefined
const entry = await exporter(cidOrPath, blockstore, {
signal,
onProgress
})
const iter = entry.content({
signal,
onProgress,
offset,
length
})
firstChunk ??= await first(iter)

log('returning stream for non-image/video content')
return {
// stream: toBrowserReadableStream(iter),
stream: (await getStreamFromAsyncIterable(iter, path, logger, { signal })).stream,
firstChunk
}
// } catch (err: any) {
// // if aborted
// // signal?.throwIfAborted()
// log.error('Unknown error', err)
// throw err
// }
}
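For orientation, a minimal usage sketch of the new helper. The wiring here is hypothetical and not part of the commit: it assumes a Helia node supplies the blockstore and logger, uses a placeholder CID, and relies on top-level await; the real call site in verified-fetch.ts appears further down in this diff.

import { createHelia } from 'helia'
import { CID } from 'multiformats/cid'
import { enhancedDagTraversal } from './utils/enhanced-dag-traversal.js'

const helia = await createHelia()
const cid = CID.parse('bafy...') // placeholder; substitute a real UnixFS root CID
const tmpResponse = new Response() // scratch Response that receives the detected content-type

const { stream, firstChunk } = await enhancedDagTraversal({
  blockstore: helia.blockstore,
  cidOrPath: cid,
  path: '',
  response: tmpResponse,
  logger: helia.logger
})

console.log('detected type:', tmpResponse.headers.get('content-type'))
console.log('first chunk length:', firstChunk.byteLength)
// `stream` is a ReadableStream<Uint8Array>, ready to be used as a Response body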
2 changes: 1 addition & 1 deletion packages/verified-fetch/src/utils/set-content-type.ts
@@ -7,7 +7,7 @@ export interface SetContentTypeOptions
path: string
response: Response
defaultContentType?: string
contentTypeParser: ContentTypeParser | undefined
contentTypeParser?: ContentTypeParser
log: Logger
}

51 changes: 37 additions & 14 deletions packages/verified-fetch/src/verified-fetch.ts
@@ -19,11 +19,12 @@ import { fromString as uint8ArrayFromString } from 'uint8arrays/from-string'
import { toString as uint8ArrayToString } from 'uint8arrays/to-string'
import { ByteRangeContext } from './utils/byte-range-context.js'
import { dagCborToSafeJSON } from './utils/dag-cbor-to-safe-json.js'
import { enhancedDagTraversal } from './utils/enhanced-dag-traversal.js'
import { getContentDispositionFilename } from './utils/get-content-disposition-filename.js'
import { getETag } from './utils/get-e-tag.js'
import { getPeerIdFromString } from './utils/get-peer-id-from-string.js'
import { getResolvedAcceptHeader } from './utils/get-resolved-accept-header.js'
import { getStreamFromAsyncIterable } from './utils/get-stream-from-async-iterable.js'
// import { getStreamFromAsyncIterable } from './utils/get-stream-from-async-iterable.js'
import { tarStream } from './utils/get-tar-stream.js'
import { getRedirectResponse } from './utils/handle-redirects.js'
import { parseResource } from './utils/parse-resource.js'
@@ -375,35 +376,57 @@ export class VerifiedFetch
this.log.trace('calling exporter for %c/%s with offset=%o & length=%o', resolvedCID, path, offset, length)

try {
const entry = await exporter(resolvedCID, this.helia.blockstore, {
signal: options?.signal,
onProgress: options?.onProgress
})
// const entry = await exporter(resolvedCID, this.helia.blockstore, {
// signal: options?.signal,
// onProgress: options?.onProgress
// })

// const asyncIter = entry.content({
// signal: options?.signal,
// onProgress: options?.onProgress,
// offset,
// length
// })
this.log('got async iterator for %c/%s', cid, path)

const asyncIter = entry.content({
// const { stream, firstChunk } = await getStreamFromAsyncIterable(asyncIter, path ?? '', this.helia.logger, {
// onProgress: options?.onProgress,
// signal: options?.signal
// })
const tmpResponse = new Response()
const { stream } = await enhancedDagTraversal({
blockstore: this.helia.blockstore,
signal: options?.signal,
onProgress: options?.onProgress,
cidOrPath: resolvedCID,
offset,
length
})
this.log('got async iterator for %c/%s', cid, path)

const { stream, firstChunk } = await getStreamFromAsyncIterable(asyncIter, path ?? '', this.helia.logger, {
onProgress: options?.onProgress,
signal: options?.signal
length,
path,
response: tmpResponse,
logger: this.helia.logger,
contentTypeParser: this.contentTypeParser
})
byteRangeContext.setBody(stream)
// if not a valid range request, okRangeRequest will call okResponse
const response = okRangeResponse(resource, byteRangeContext.getBody(), { byteRangeContext, log: this.log }, {
redirected
})
const contentType = tmpResponse.headers.get('content-type')
if (contentType != null) {
response.headers.set('content-type', contentType)
} else {
this.log('FIXME: content-type should be set')
}

await setContentType({ bytes: firstChunk, path, response, contentTypeParser: this.contentTypeParser, log: this.log })
// await setContentType({ bytes: firstChunk, path, response, contentTypeParser: this.contentTypeParser, log: this.log })
setIpfsRoots(response, ipfsRoots)

return response
} catch (err: any) {
options?.signal?.throwIfAborted()
// if (options?.signal?.aborted === true) {
// throw new Error('aborted')
// }
this.log.error('error streaming %c/%s', cid, path, err)
if (byteRangeContext.isRangeRequest && err.code === 'ERR_INVALID_PARAMS') {
return badRangeResponse(resource)
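For readability, here is the temporary-Response pattern from the hunk above isolated into a short sketch (illustration only; the variable names are placeholders, not part of the commit). The helper sets content-type on a scratch Response, and the caller copies it onto the response it actually returns:

// scratch Response that collects the content type sniffed from the first chunk
const tmpResponse = new Response()

// ... enhancedDagTraversal({ ..., response: tmpResponse }) runs here and may set the header ...

// copy the sniffed type onto the response returned to the caller, if one was detected
const finalResponse = new Response(null, { status: 200 })
const sniffedType = tmpResponse.headers.get('content-type')
if (sniffedType != null) {
  finalResponse.headers.set('content-type', sniffedType)
}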
