From 0c4ad153e02c374a8a4839170e55a6f133f8ed8a Mon Sep 17 00:00:00 2001
From: Matteo Ferrando
Date: Mon, 25 Nov 2024 15:35:34 -0400
Subject: [PATCH] feat: automatically do multi-part upload on 90+ MB files

---
 libs/client/src/storage.ts | 162 +++++++++++++++++++++++++++++++++++--
 package-lock.json          |   6 ++
 package.json               |   1 +
 3 files changed, 164 insertions(+), 5 deletions(-)

diff --git a/libs/client/src/storage.ts b/libs/client/src/storage.ts
index ebb4186..46f1d56 100644
--- a/libs/client/src/storage.ts
+++ b/libs/client/src/storage.ts
@@ -1,7 +1,7 @@
+import { Sema } from "async-sema";
 import { getRestApiUrl, RequiredConfig } from "./config";
 import { dispatchRequest } from "./request";
 import { isPlainObject } from "./utils";
-
 /**
  * File support for the client. This interface establishes the contract for
  * uploading files to the server and transforming the input to replace file
@@ -53,17 +53,15 @@ function getExtensionFromContentType(contentType: string): string {
 /**
  * Initiate the upload of a file to the server. This returns the URL to upload
  * the file to and the URL of the file once it is uploaded.
- *
- * @param file the file to upload
- * @returns the URL to upload the file to and the URL of the file once it is uploaded.
  */
 async function initiateUpload(
   file: Blob,
   config: RequiredConfig,
+  contentType: string,
 ): Promise<InitiateUploadResult> {
-  const contentType = file.type || "application/octet-stream";
   const filename =
     file.name || `${Date.now()}.${getExtensionFromContentType(contentType)}`;
+
   return await dispatchRequest({
     method: "POST",
     // NOTE: We want to test V3 without making it the default at the API level
@@ -76,6 +74,152 @@
   });
 }
 
+/**
+ * Initiate the multipart upload of a file to the server. This returns the URL to upload
+ * the file to and the URL of the file once it is uploaded.
+ */
+async function initiateMultipartUpload(
+  file: Blob,
+  config: RequiredConfig,
+  contentType: string,
+): Promise<InitiateUploadResult> {
+  const filename =
+    file.name || `${Date.now()}.${getExtensionFromContentType(contentType)}`;
+
+  return await dispatchRequest({
+    method: "POST",
+    targetUrl: `${getRestApiUrl()}/storage/upload/initiate-multipart?storage_type=fal-cdn-v3`,
+    input: {
+      content_type: contentType,
+      file_name: filename,
+    },
+    config,
+  });
+}
+
+async function partUploadRetries(
+  uploadUrl: string,
+  chunk: Blob,
+  config: RequiredConfig,
+  cancelled: { current: boolean },
+  tries: number,
+) {
+  if (cancelled.current) {
+    throw new Error("Part upload failed, upload cancelled");
+  }
+
+  if (tries === 0) {
+    throw new Error("Part upload failed, retries exhausted");
+  }
+
+  const { fetch, responseHandler } = config;
+
+  try {
+    const response = await fetch(uploadUrl, {
+      method: "PUT",
+      body: chunk,
+    });
+    await responseHandler(response);
+    return response;
+  } catch (error) {
+    console.error("Part upload failed, retrying", uploadUrl, error);
+    return await partUploadRetries(
+      uploadUrl,
+      chunk,
+      config,
+      cancelled,
+      tries - 1,
+    );
+  }
+}
+
+async function partUpload(
+  uploadUrl: string,
+  chunk: Blob,
+  config: RequiredConfig,
+  parallelUploads: Sema,
+  cancelled: { current: boolean },
+) {
+  await parallelUploads.acquire();
+
+  try {
+    return await partUploadRetries(uploadUrl, chunk, config, cancelled, 3);
+  } finally {
+    parallelUploads.release();
+  }
+}
+
+async function multipartUpload(
+  file: Blob,
+  config: RequiredConfig,
+): Promise<string> {
+  const contentType = file.type || "application/octet-stream";
+  const { upload_url: uploadUrl, file_url: url } =
+    await initiateMultipartUpload(file, config, contentType);
+
+  // Break the file into 10MB chunks
+  const chunkSize = 10 * 1024 * 1024;
+  const chunks = Math.ceil(file.size / chunkSize);
+
+  const parsedUrl = new URL(uploadUrl);
+
+  // Max 3 parallel part uploads
+  const limitedParallelUploads = new Sema(3);
+
+  const partPromises: Promise<Response>[] = [];
+
+  // To be able to cancel the upload if any of the parts fail
+  const cancelled = { current: false };
+  for (let i = 0; i < chunks; i++) {
+    const start = i * chunkSize;
+    const end = Math.min(start + chunkSize, file.size);
+
+    const chunk = file.slice(start, end);
+
+    const partNumber = i + 1;
+    // {uploadUrl}/{part_number}?uploadUrlParams=...
+    const partUploadUrl = `${parsedUrl.origin}${parsedUrl.pathname}/${partNumber}${parsedUrl.search}`;
+
+    partPromises.push(
+      partUpload(
+        partUploadUrl,
+        chunk,
+        config,
+        limitedParallelUploads,
+        cancelled,
+      ),
+    );
+  }
+
+  let responses: Response[];
+  try {
+    // Promise.all rejects as soon as any part fails; the catch below cancels the rest
+    responses = await Promise.all(partPromises);
+  } catch (error) {
+    // If any of the parts fail, cancel other uploads
+    cancelled.current = true;
+
+    console.error("Multipart upload failed, aborting upload", error);
+    throw error;
+  }
+
+  // Complete the upload
+  const completeUrl = `${parsedUrl.origin}${parsedUrl.pathname}/complete${parsedUrl.search}`;
+  await dispatchRequest({
+    method: "POST",
+    targetUrl: completeUrl,
+    input: {
+      parts: responses.map((response, index) => ({
+        partNumber: index + 1, // Parts are 1-indexed
+        etag: response.headers.get("ETag"),
+      })),
+    },
+    config,
+  });
+
+  return url;
+}
+
 // eslint-disable-next-line @typescript-eslint/no-explicit-any
 type KeyValuePair = [string, any];
 
@@ -88,10 +232,18 @@ export function createStorageClient({
 }: StorageClientDependencies): StorageClient {
   const ref: StorageClient = {
     upload: async (file: Blob) => {
+      // Check for 90+ MB file size to do multipart upload
+      if (file.size > 90 * 1024 * 1024) {
+        return await multipartUpload(file, config);
+      }
+
+      const contentType = file.type || "application/octet-stream";
+
       const { fetch, responseHandler } = config;
       const { upload_url: uploadUrl, file_url: url } = await initiateUpload(
         file,
         config,
+        contentType,
       );
       const response = await fetch(uploadUrl, {
         method: "PUT",
diff --git a/package-lock.json b/package-lock.json
index 73d41ae..94fcf26 100644
--- a/package-lock.json
+++ b/package-lock.json
@@ -16,6 +16,7 @@
         "@oclif/plugin-help": "^5.2.5",
         "@remix-run/dev": "^2.11.1",
         "@remix-run/node": "^2.11.1",
+        "async-sema": "^3.1.1",
         "axios": "^1.0.0",
         "chalk": "^5.3.0",
         "change-case": "^4.1.2",
@@ -10422,6 +10423,11 @@
       "resolved": "https://registry.npmjs.org/async/-/async-3.2.4.tgz",
       "integrity": "sha512-iAB+JbDEGXhyIUavoDl9WP/Jj106Kz9DEn1DPgYw5ruDn0e3Wgi3sKFm55sASdGBNOQB8F59d9qQ7deqrHA8wQ=="
     },
+    "node_modules/async-sema": {
+      "version": "3.1.1",
+      "resolved": "https://registry.npmjs.org/async-sema/-/async-sema-3.1.1.tgz",
+      "integrity": "sha512-tLRNUXati5MFePdAk8dw7Qt7DpxPB60ofAgn8WRhW6a2rcimZnYBP9oxHiv0OHy+Wz7kPMG+t4LGdt31+4EmGg=="
+    },
     "node_modules/asynciterator.prototype": {
       "version": "1.0.0",
       "resolved": "https://registry.npmjs.org/asynciterator.prototype/-/asynciterator.prototype-1.0.0.tgz",
diff --git a/package.json b/package.json
index 3d6231f..1689a2c 100644
--- a/package.json
+++ b/package.json
@@ -36,6 +36,7 @@
     "@oclif/plugin-help": "^5.2.5",
     "@remix-run/dev": "^2.11.1",
     "@remix-run/node": "^2.11.1",
+    "async-sema": "^3.1.1",
     "axios": "^1.0.0",
     "chalk": "^5.3.0",
     "change-case": "^4.1.2",
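
Reviewer note: a minimal usage sketch of the behavior this patch introduces, assuming
the library is consumed as "@fal-ai/client" with the storage API exposed as
fal.storage.upload (both names are assumptions from outside this patch; adjust to how
the client is actually imported). Files of 90 MB or less keep the existing single PUT
upload; anything larger is split into 10 MB parts, uploaded at most 3 at a time with
up to 3 attempts per part, and then completed server-side before the file URL is returned.

// Usage sketch only, not part of the patch; package name and accessor are assumed.
import { fal } from "@fal-ai/client";

async function uploadExamples() {
  // Small file: goes through the unchanged single PUT upload path.
  const small = new Blob(["hello"], { type: "text/plain" });
  const smallUrl = await fal.storage.upload(small);

  // ~100 MB file: crosses the 90 MB threshold, so the client initiates a
  // multipart upload, PUTs 10 MB parts (max 3 in parallel), and calls the
  // /complete endpoint before returning the file URL.
  const big = new Blob([new Uint8Array(100 * 1024 * 1024)], {
    type: "application/octet-stream",
  });
  const bigUrl = await fal.storage.upload(big);

  console.log({ smallUrl, bigUrl });
}

uploadExamples().catch(console.error);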