feat: some optimizations
SgtPooki committed Dec 10, 2024
1 parent b1598a9 commit ffa2d62
Showing 5 changed files with 75 additions and 92 deletions.
1 change: 1 addition & 0 deletions packages/verified-fetch/package.json
@@ -168,6 +168,7 @@
"ipns": "^10.0.0",
"it-first": "^3.0.6",
"it-map": "^3.1.1",
"it-peekable": "^3.0.5",
"it-pipe": "^3.0.1",
"it-tar": "^6.0.5",
"it-to-browser-readablestream": "^2.0.9",
72 changes: 33 additions & 39 deletions packages/verified-fetch/src/utils/enhanced-dag-traversal.ts
@@ -1,15 +1,15 @@
import { type ComponentLogger } from '@libp2p/interface'
import { type ReadableStorage, exporter, type ExporterOptions } from 'ipfs-unixfs-exporter'
import first from 'it-first'
// import peekable from 'it-peekable'
import toBrowserReadableStream from 'it-to-browser-readablestream'
import { type CID } from 'multiformats/cid'
import { type ContentTypeParser } from '../types.js'
import { setContentType } from './set-content-type.js'
import { getContentType } from './get-content-type.js'

export interface EnhancedDagTraversalOptions extends ExporterOptions {
blockstore: ReadableStorage
cidOrPath: string | CID
response: Response
logger: ComponentLogger
path: string
contentTypeParser?: ContentTypeParser
@@ -29,12 +29,10 @@ export async function enhancedDagTraversal ({
length,
path,
logger,
contentTypeParser,
response
contentTypeParser
}: EnhancedDagTraversalOptions): Promise<EnhancedDagTraversalResponse> {
const log = logger.forComponent('helia:verified-fetch:enhanced-dag-traversal')

// Fetch the first chunk eagerly
const dfsEntry = await exporter(cidOrPath, blockstore, {
signal,
onProgress,
@@ -48,9 +46,10 @@
length
})

let firstChunk
let firstChunk: Uint8Array = new Uint8Array()
let error: Error
try {
// Fetch the first chunk eagerly
firstChunk = await first(dfsIter)
} catch (err: any) {
if (signal?.aborted === true) {
@@ -61,43 +60,38 @@
}
}

// Determine content type based on the first chunk
await setContentType({ bytes: firstChunk, path, response, contentTypeParser, log })

const contentType = response.headers.get('content-type')
const isImageOrVideo = contentType?.startsWith('video/') === true || contentType?.startsWith('image/') === true
log.trace('Content type determined: %s', contentType)

const enhancedIter = async function * (): AsyncGenerator<Uint8Array, void, undefined> {
if (error != null) {
throw error
}
if (isImageOrVideo) {
yield * dfsIter
return
}

// If not image/video, switch to a BFS iterator
const bfsEntry = await exporter(cidOrPath, blockstore, {
signal,
onProgress
})
return {
stream: toBrowserReadableStream({
[Symbol.asyncIterator]: async function * (): AsyncGenerator<Uint8Array, void, undefined> {
if (error != null) {
throw error
}

const bfsIter = bfsEntry.content({
signal,
onProgress,
offset,
length
})
// Determine content type based on the first chunk
const contentType = await getContentType({ bytes: firstChunk, contentTypeParser, path, log })

// continue with the BFS iterator
yield * bfsIter
}
const isImageOrVideo = contentType.startsWith('video/') || contentType.startsWith('image/')
log.trace('Content type determined: %s', contentType)

const stream = toBrowserReadableStream(enhancedIter())
const exporterEntry = isImageOrVideo
? dfsEntry
// If not image/video, switch to a BFS iterator
: await exporter(cidOrPath, blockstore, {
signal,
onProgress
})

return {
stream,
// continue with the BFS iterator
for await (const chunk of exporterEntry.content({
signal,
onProgress,
offset,
length
})) {
yield chunk
}
}
}),
firstChunk
}
}
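The rewritten traversal above pulls a single chunk eagerly from the exporter's DFS iterator (via it-first) so the content type can be determined from the first bytes, then hands a browser-readable stream back to the caller; for images and videos it keeps streaming from the original DFS entry, otherwise it switches to a second, plain BFS exporter walk. A minimal sketch of the underlying "peek one chunk, then stream" shape, independent of this commit and using a hypothetical `getBytes` factory in place of `exporter(...).content(...)`:

```ts
// Minimal sketch (not part of this commit) of the "peek the first chunk,
// then stream everything" pattern used above. `getBytes` is a hypothetical
// factory standing in for exporter(...).content(...).
import first from 'it-first'
import toBrowserReadableStream from 'it-to-browser-readablestream'

export async function peekThenStream (
  getBytes: () => AsyncIterable<Uint8Array>
): Promise<{ firstChunk: Uint8Array, stream: ReadableStream<Uint8Array> }> {
  // pull a single chunk eagerly so the caller can sniff the content type
  const firstChunk = await first(getBytes()) ?? new Uint8Array()

  // the stream re-creates the iterator so consumers still receive the full
  // content, including the bytes that were already peeked
  const stream = toBrowserReadableStream(getBytes())

  return { firstChunk, stream }
}
```

In the commit itself the peeked iterator is likewise not reused: the returned stream calls `exporterEntry.content(...)` again from the requested offset, so consumers see the full range even though one chunk was already read for type sniffing.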
36 changes: 36 additions & 0 deletions packages/verified-fetch/src/utils/get-content-type.ts
@@ -0,0 +1,36 @@
import { type Logger } from '@libp2p/interface'
import { type ContentTypeParser } from '../types.js'
import { isPromise } from './type-guards.js'

export interface GetContentTypeOptions {
bytes: Uint8Array
path: string
defaultContentType?: string
contentTypeParser?: ContentTypeParser
log: Logger
}

export async function getContentType ({ bytes, contentTypeParser, path, log, defaultContentType = 'application/octet-stream' }: GetContentTypeOptions): Promise<string> {
let contentType: string | undefined

if (contentTypeParser != null) {
try {
let fileName = path.split('/').pop()?.trim()
fileName = fileName === '' ? undefined : fileName
const parsed = contentTypeParser(bytes, fileName)

if (isPromise(parsed)) {
const result = await parsed

if (result != null) {
contentType = result
}
} else if (parsed != null) {
contentType = parsed
}
} catch (err) {
log.error('error parsing content type', err)
}
}
return contentType ?? defaultContentType
}
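getContentType is the content-type resolution logic extracted from setContentType, so callers that no longer have a Response object in hand (such as the traversal above) can resolve a type from the first chunk alone. A hypothetical usage sketch, assuming the file sits next to the new util, a toy magic-byte parser standing in for a real detector (for example the file-type package), and a logger created with @libp2p/logger:

```ts
// Hypothetical usage sketch, not part of this commit.
import { logger } from '@libp2p/logger'
import { getContentType } from './get-content-type.js'

// toy parser: a real deployment might plug in something like `file-type`
const contentTypeParser = (bytes: Uint8Array, fileName?: string): string | undefined => {
  if (fileName?.endsWith('.svg') === true) {
    return 'image/svg+xml'
  }

  // PNG magic bytes
  if (bytes[0] === 0x89 && bytes[1] === 0x50 && bytes[2] === 0x4e && bytes[3] === 0x47) {
    return 'image/png'
  }

  return undefined
}

// pretend this is the first chunk pulled from the exporter
const firstChunk = Uint8Array.from([0x89, 0x50, 0x4e, 0x47, 0x0d, 0x0a, 0x1a, 0x0a])

const contentType = await getContentType({
  bytes: firstChunk,
  path: '/ipfs/bafyexample/logo.png', // hypothetical path
  contentTypeParser,
  log: logger('example:get-content-type')
})
// => 'image/png'; falls back to 'application/octet-stream' when the parser returns undefined
```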
24 changes: 2 additions & 22 deletions packages/verified-fetch/src/utils/set-content-type.ts
@@ -1,6 +1,6 @@
import { type Logger } from '@libp2p/interface'
import { type ContentTypeParser } from '../types.js'
import { isPromise } from './type-guards.js'
import { getContentType } from './get-content-type.js'

export interface SetContentTypeOptions {
bytes: Uint8Array
@@ -12,27 +12,7 @@ export interface SetContentTypeOptions {
}

export async function setContentType ({ bytes, path, response, contentTypeParser, log, defaultContentType = 'application/octet-stream' }: SetContentTypeOptions): Promise<void> {
let contentType: string | undefined

if (contentTypeParser != null) {
try {
let fileName = path.split('/').pop()?.trim()
fileName = fileName === '' ? undefined : fileName
const parsed = contentTypeParser(bytes, fileName)

if (isPromise(parsed)) {
const result = await parsed

if (result != null) {
contentType = result
}
} else if (parsed != null) {
contentType = parsed
}
} catch (err) {
log.error('error parsing content type', err)
}
}
const contentType = await getContentType({ bytes, contentTypeParser, path, log, defaultContentType })
log.trace('setting content type to "%s"', contentType ?? defaultContentType)
response.headers.set('content-type', contentType ?? defaultContentType)
}
34 changes: 3 additions & 31 deletions packages/verified-fetch/src/verified-fetch.ts
@@ -24,7 +24,6 @@ import { getContentDispositionFilename } from './utils/get-content-disposition-f
import { getETag } from './utils/get-e-tag.js'
import { getPeerIdFromString } from './utils/get-peer-id-from-string.js'
import { getResolvedAcceptHeader } from './utils/get-resolved-accept-header.js'
// import { getStreamFromAsyncIterable } from './utils/get-stream-from-async-iterable.js'
import { tarStream } from './utils/get-tar-stream.js'
import { getRedirectResponse } from './utils/handle-redirects.js'
import { parseResource } from './utils/parse-resource.js'
@@ -376,33 +375,14 @@ export class VerifiedFetch {
this.log.trace('calling exporter for %c/%s with offset=%o & length=%o', resolvedCID, path, offset, length)

try {
// const entry = await exporter(resolvedCID, this.helia.blockstore, {
// signal: options?.signal,
// onProgress: options?.onProgress
// })

// const asyncIter = entry.content({
// signal: options?.signal,
// onProgress: options?.onProgress,
// offset,
// length
// })
this.log('got async iterator for %c/%s', cid, path)

// const { stream, firstChunk } = await getStreamFromAsyncIterable(asyncIter, path ?? '', this.helia.logger, {
// onProgress: options?.onProgress,
// signal: options?.signal
// })
const tmpResponse = new Response()
const { stream } = await enhancedDagTraversal({
const { firstChunk, stream } = await enhancedDagTraversal({
blockstore: this.helia.blockstore,
signal: options?.signal,
onProgress: options?.onProgress,
cidOrPath: resolvedCID,
offset,
length,
path,
response: tmpResponse,
logger: this.helia.logger,
contentTypeParser: this.contentTypeParser
})
@@ -411,22 +391,14 @@
const response = okRangeResponse(resource, byteRangeContext.getBody(), { byteRangeContext, log: this.log }, {
redirected
})
const contentType = tmpResponse.headers.get('content-type')
if (contentType != null) {
response.headers.set('content-type', contentType)
} else {
this.log('FIXME: content-type should be set')
}

// await setContentType({ bytes: firstChunk, path, response, contentTypeParser: this.contentTypeParser, log: this.log })
await setContentType({ bytes: firstChunk, path, response, contentTypeParser: this.contentTypeParser, log: this.log })

setIpfsRoots(response, ipfsRoots)

return response
} catch (err: any) {
options?.signal?.throwIfAborted()
// if (options?.signal?.aborted === true) {
// throw new Error('aborted')
// }
this.log.error('error streaming %c/%s', cid, path, err)
if (byteRangeContext.isRangeRequest && err.code === 'ERR_INVALID_PARAMS') {
return badRangeResponse(resource)
