From db347905f3c2e01287f30ee1b5f1909c5a1113ff Mon Sep 17 00:00:00 2001 From: prakashsvmx Date: Fri, 10 May 2024 16:05:41 +0530 Subject: [PATCH] refactor compose object to ts --- README.md | 2 +- docs/API.md | 12 +- examples/compose-object-test-example.js | 136 ----------- examples/compose-object-test-example.mjs | 129 ++++++++++ .../{compose-object.js => compose-object.mjs} | 11 +- src/internal/client.ts | 210 ++++++++++++++++- src/internal/type.ts | 10 + src/internal/xml-parser.ts | 6 + src/minio.d.ts | 6 - src/minio.js | 221 +----------------- tests/unit/test.js | 28 +-- 11 files changed, 375 insertions(+), 396 deletions(-) delete mode 100644 examples/compose-object-test-example.js create mode 100644 examples/compose-object-test-example.mjs rename examples/{compose-object.js => compose-object.mjs} (88%) diff --git a/README.md b/README.md index c48a7195..69dffd41 100644 --- a/README.md +++ b/README.md @@ -237,7 +237,7 @@ The complete API Reference is available here: - [remove-object-tagging.mjs](https://github.com/minio/minio-js/blob/master/examples/remove-object-tagging.js) - [set-object-legal-hold.mjs](https://github.com/minio/minio-js/blob/master/examples/set-object-legalhold.mjs) - [get-object-legal-hold.mjs](https://github.com/minio/minio-js/blob/master/examples/get-object-legal-hold.mjs) -- [compose-object.js](https://github.com/minio/minio-js/blob/master/examples/compose-object.js) +- [compose-object.mjs](https://github.com/minio/minio-js/blob/master/examples/compose-object.js) - [select-object-content.mjs](https://github.com/minio/minio-js/blob/master/examples/select-object-content.mjs) #### Presigned Operations diff --git a/docs/API.md b/docs/API.md index 21ca08ac..a1a69f1e 100644 --- a/docs/API.md +++ b/docs/API.md @@ -1501,7 +1501,7 @@ const legalholdStatus = await minioClient.setObjectLegalHold('bucketName', 'obje -### composeObject(destObjConfig, sourceObjectList [, callback]) +### composeObject(destObjConfig, sourceObjectList) Compose an object from parts @@ -1511,7 +1511,6 @@ Compose an object from parts | ------------------ | ---------- | ------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------ | | `destObjConfig` | _object_ | Destination Object configuration of the type [CopyDestinationOptions](https://github.com/minio/minio-js/blob/master/src/helpers.js) | | `sourceObjectList` | _object[]_ | Array of object(parts) source to compose into an object. Each part configuration should be of type [CopySourceOptions](https://github.com/minio/minio-js/blob/master/src/helpers.js) | -| `callback(err)` | _function_ | Callback function is called with non `null` value in case of error. If no callback is passed, a `Promise` is returned. | **Example 1** @@ -1545,14 +1544,7 @@ const destOption = new minio.CopyDestinationOptions({ }) //using Promise style. -const composePromise = minioClient.composeObject(destOption, sourceList) -composePromise - .then((result) => { - console.log('Success...') - }) - .catch((e) => { - console.log('error', e) - }) +await minioClient.composeObject(destOption, sourceList) ``` diff --git a/examples/compose-object-test-example.js b/examples/compose-object-test-example.js deleted file mode 100644 index 8beb0324..00000000 --- a/examples/compose-object-test-example.js +++ /dev/null @@ -1,136 +0,0 @@ -/* - * MinIO Javascript Library for Amazon S3 Compatible Cloud Storage, (C) 2021 MinIO, Inc. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -// Note: YOUR-ACCESSKEYID, YOUR-SECRETACCESSKEY, my-bucketname and my-objectname -// are dummy values, please replace them with original values. -import fs from 'node:fs' -import os from 'node:os' - -import * as Minio from 'minio' -import splitFile from 'split-file' - -const s3Client = new Minio.Client({ - endPoint: 's3.amazonaws.com', - accessKey: 'YOUR-ACCESSKEYID', - secretKey: 'YOUR-SECRETACCESSKEY', -}) - -const oneMB = 1024 * 1024 - -// Create a bucket prior to running: mc mb local/source-bucket -function sampleRunComposeObject() { - const tmpDir = os.tmpdir() - - const bucketName = 'source-bucket' - // generate 100 MB buffer and write to a file. - const local100mbFileToBeSplitAndComposed = Buffer.alloc(100 * oneMB, 0) - - const composedObjName = '_100-mb-file-to-test-compose' - const tmpSubDir = `${tmpDir}/compose` - const fileToSplit = `${tmpSubDir}/${composedObjName}` - const partObjNameList = [] - - fs.mkdir(tmpSubDir, { recursive: true }, function (err) { - if (err) { - console.log(err) - } else { - console.log('New Temp directory successfully created.') - } - }) - - try { - fs.writeFileSync(fileToSplit, local100mbFileToBeSplitAndComposed) - console.log('Written 100 MB File ') - // 100 MB split into 26 MB part size. ( just to test unequal parts ). But change as required. - - splitFile - .splitFileBySize(fileToSplit, 26 * oneMB) - .then((names) => { - console.log('Split and write 100 MB File(s) ', names) - const putPartRequests = names.map((partFileName) => { - const partObjName = partFileName.slice((tmpSubDir + '/').length) - partObjNameList.push(partObjName) - return s3Client.fPutObject(bucketName, partObjName, partFileName, {}) - }) - - Promise.all(putPartRequests) - .then(() => { - console.log('Uploaded part Files: ', names) - const sourcePartObjList = partObjNameList.map((partObjName) => { - return new Minio.CopySourceOptions({ - Bucket: bucketName, - Object: partObjName, - }) - }) - - const destObjConfig = new Minio.CopyDestinationOptions({ - Bucket: bucketName, - Object: composedObjName, - }) - - s3Client - .composeObject(destObjConfig, sourcePartObjList) - .then(() => { - console.log('Composed to a single file: ', composedObjName) - - /** Begin Clean up ***/ - // To verify that the parts are uploaded properly, comment the below code blocks and verify - const sourcePartObjList = partObjNameList.map((partObjName) => { - return s3Client.removeObject(bucketName, partObjName) - }) - - Promise.all(sourcePartObjList) - .then(() => { - console.log('Removed source parts: ') - - // Uncomment to remove the composed object itself. commented for verification. - /* - s3Client.removeObject(bucketName, composedObjName).then(()=>{ - console.log("Clean up: Removed the composed Object ") - }).catch(()=>{ - console.log("Error removing composed object", er) - }) - */ - }) - .catch((er) => { - console.log('Error removing parts used in composing', er) - }) - - /** End Clean up **/ - - // Clean up generated parts locally - fs.rmSync(tmpSubDir, { recursive: true, force: true }) - console.log('Clean up temp parts directory : ') - }) - .catch((e) => { - console.log('Error Composing parts into an object', e) - }) - }) - .catch((e) => { - console.log('Error Uploading parts ', e) - }) - }) - .catch((e) => { - // this is a client error not related to compose object - console.log('Error Splitting files into parts ', e) - }) - } catch (err) { - // this is a client error not related to compose object - console.log('Error Creating local files ', err) - } -} - -sampleRunComposeObject() diff --git a/examples/compose-object-test-example.mjs b/examples/compose-object-test-example.mjs new file mode 100644 index 00000000..5263b00d --- /dev/null +++ b/examples/compose-object-test-example.mjs @@ -0,0 +1,129 @@ +/* + * MinIO Javascript Library for Amazon S3 Compatible Cloud Storage, (C) 2021 MinIO, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +// Note: YOUR-ACCESSKEYID, YOUR-SECRETACCESSKEY, my-bucketname and my-objectname +// are dummy values, please replace them with original values. +import fs from 'node:fs' +import os from 'node:os' + +import * as Minio from 'minio' +import splitFile from 'split-file' + +const s3Client = new Minio.Client({ + endPoint: 'localhost', + accessKey: 'minio', + secretKey: 'minio123', + useSSL: false, + port: 22000, + //partSize: 5 * 1024 * 1024 +}) + +const oneMB = 1024 * 1024 + +// Create a bucket prior to running: mc mb local/source-bucket +const sampleRunComposeObject=async ()=> { + const tmpDir = os.tmpdir() + + const bucketName = 'source-bucket' + // generate 100 MB buffer and write to a file. + const local100mbFileToBeSplitAndComposed = Buffer.alloc(100 * oneMB, 0) + + const composedObjName = '_100-mb-file-to-test-compose' + const tmpSubDir = `${tmpDir}/compose` + const fileToSplit = `${tmpSubDir}/${composedObjName}` + const partObjNameList = [] + + fs.mkdir(tmpSubDir, { recursive: true }, function(err) { + if (err) { + console.log(err) + } else { + console.log('New Temp directory successfully created.') + } + }) + + try { + fs.writeFileSync(fileToSplit, local100mbFileToBeSplitAndComposed) + console.log('Written 100 MB File ') + // 100 MB split into 26 MB part size. ( just to test unequal parts ). But change as required. + + const names = await splitFile + .splitFileBySize(fileToSplit, 26 * oneMB) + + + console.log('Split and write 100 MB File(s) ', names) + const putPartRequests = names.map((partFileName) => { + const partObjName = partFileName.slice((tmpSubDir + '/').length) + partObjNameList.push(partObjName) + return s3Client.fPutObject(bucketName, partObjName, partFileName, {}) + }) + await Promise.all(putPartRequests) + + console.log('Uploaded part Files: ', names) + const sourcePartObjList = partObjNameList.map((partObjName) => { + return new Minio.CopySourceOptions({ + Bucket: bucketName, + Object: partObjName, + }) + }) + + const destObjConfig = new Minio.CopyDestinationOptions({ + Bucket: bucketName, + Object: composedObjName, + }) + + try { + await s3Client.composeObject(destObjConfig, sourcePartObjList) + console.log('Composed to a single file: ', composedObjName) + } catch (err) { + console.log('Composed to a single file: ', composedObjName) + + /** Begin Clean up ***/ + // To verify that the parts are uploaded properly, comment the below code blocks and verify + const sourcePartObjList = partObjNameList.map((partObjName) => { + return s3Client.removeObject(bucketName, partObjName) + }) + + Promise.all(sourcePartObjList) + .then(() => { + console.log('Removed source parts: ') + + // Uncomment to remove the composed object itself. commented for verification. + /* + s3Client.removeObject(bucketName, composedObjName).then(()=>{ + console.log("Clean up: Removed the composed Object ") + }).catch(()=>{ + console.log("Error removing composed object", er) + }) + */ + }) + .catch((er) => { + console.log('Error removing parts used in composing', er) + }) + + /** End Clean up **/ + + // Clean up generated parts locally + fs.rmSync(tmpSubDir, { recursive: true, force: true }) + console.log('Clean up temp parts directory : ') + } + + } catch (e) { + + console.log('Error Creating local files ', err) + } +} + +sampleRunComposeObject() diff --git a/examples/compose-object.js b/examples/compose-object.mjs similarity index 88% rename from examples/compose-object.js rename to examples/compose-object.mjs index b8b08d6f..c50c8cbb 100644 --- a/examples/compose-object.js +++ b/examples/compose-object.mjs @@ -51,7 +51,7 @@ const sourceList = [ const destOption = new Minio.CopyDestinationOptions({ Bucket: bucketName, - Object: '100MB.zip', + Object: 'object-name', /** Other possible options */ /* Encryption:{ type:Helpers.ENCRYPTION_TYPES.KMS, @@ -67,11 +67,4 @@ const destOption = new Minio.CopyDestinationOptions({ */ }) -const composePromise = s3Client.composeObject(destOption, sourceList) -composePromise - .then((result) => { - console.log('ComposeObject Success...', result) - }) - .catch((e) => { - console.log('composeObject Promise Error', e) - }) +await s3Client.composeObject(destOption, sourceList) diff --git a/src/internal/client.ts b/src/internal/client.ts index 3fc75664..810b37d2 100644 --- a/src/internal/client.ts +++ b/src/internal/client.ts @@ -29,6 +29,7 @@ import { fsp, streamPromise } from './async.ts' import { CopyConditions } from './copy-conditions.ts' import { Extensions } from './extensions.ts' import { + calculateEvenSplits, extractMetadata, getContentLength, getSourceVersionId, @@ -50,6 +51,8 @@ import { isValidPrefix, isVirtualHostStyle, makeDateLong, + PART_CONSTRAINTS, + partsRequired, prependXAMZMeta, readableStream, sanitizeETag, @@ -99,16 +102,18 @@ import type { Tags, Transport, UploadedObjectInfo, + UploadPartConfig, VersionIdentificator, } from './type.ts' import type { ListMultipartResult, UploadedPart } from './xml-parser.ts' -import * as xmlParsers from './xml-parser.ts' import { parseCompleteMultipart, parseInitiateMultipart, parseObjectLegalHoldConfig, parseSelectObjectContentResponse, + uploadPartParser, } from './xml-parser.ts' +import * as xmlParsers from './xml-parser.ts' const xml = new xml2js.Builder({ renderOpts: { pretty: false }, headless: true }) @@ -2553,4 +2558,207 @@ export class TypedClient { // @ts-ignore return await this.copyObjectV2(...allArgs) } + + async uploadPart(partConfig: { + bucketName: string + objectName: string + uploadID: string + partNumber: number + headers: RequestHeaders + }) { + const { bucketName, objectName, uploadID, partNumber, headers } = partConfig + + const method = 'PUT' + const query = `uploadId=${uploadID}&partNumber=${partNumber}` + const requestOptions = { method, bucketName, objectName: objectName, query, headers } + + const res = await this.makeRequestAsync(requestOptions) + const body = await readAsString(res) + const partRes = uploadPartParser(body) + + return { + etag: sanitizeETag(partRes.ETag), + key: objectName, + part: partNumber, + } + } + + async composeObject(destObjConfig: CopyDestinationOptions, sourceObjList: CopySourceOptions[]) { + const sourceFilesLength = sourceObjList.length + + if (!Array.isArray(sourceObjList)) { + throw new errors.InvalidArgumentError('sourceConfig should an array of CopySourceOptions ') + } + if (!(destObjConfig instanceof CopyDestinationOptions)) { + throw new errors.InvalidArgumentError('destConfig should of type CopyDestinationOptions ') + } + + if (sourceFilesLength < 1 || sourceFilesLength > PART_CONSTRAINTS.MAX_PARTS_COUNT) { + throw new errors.InvalidArgumentError( + `"There must be as least one and up to ${PART_CONSTRAINTS.MAX_PARTS_COUNT} source objects.`, + ) + } + + for (let i = 0; i < sourceFilesLength; i++) { + const sObj = sourceObjList[i] as CopySourceOptions + if (!sObj.validate()) { + return false + } + } + + if (!(destObjConfig as CopyDestinationOptions).validate()) { + return false + } + + const getStatOptions = (srcConfig: CopySourceOptions) => { + let statOpts = {} + if (!_.isEmpty(srcConfig.VersionID)) { + statOpts = { + versionId: srcConfig.VersionID, + } + } + return statOpts + } + const srcObjectSizes: number[] = [] + let totalSize = 0 + let totalParts = 0 + + const sourceObjStats = sourceObjList.map((srcItem) => + this.statObject(srcItem.Bucket, srcItem.Object, getStatOptions(srcItem)), + ) + + return Promise.all(sourceObjStats).then(async (srcObjectInfos) => { + const validatedStats = srcObjectInfos.map((resItemStat, index) => { + const srcConfig: CopySourceOptions | undefined = sourceObjList[index] + + let srcCopySize = resItemStat.size + // Check if a segment is specified, and if so, is the + // segment within object bounds? + if (srcConfig && srcConfig.MatchRange) { + // Since range is specified, + // 0 <= src.srcStart <= src.srcEnd + // so only invalid case to check is: + const srcStart = srcConfig.Start + const srcEnd = srcConfig.End + if (srcEnd >= srcCopySize || srcStart < 0) { + throw new errors.InvalidArgumentError( + `CopySrcOptions ${index} has invalid segment-to-copy [${srcStart}, ${srcEnd}] (size is ${srcCopySize})`, + ) + } + srcCopySize = srcEnd - srcStart + 1 + } + + // Only the last source may be less than `absMinPartSize` + if (srcCopySize < PART_CONSTRAINTS.ABS_MIN_PART_SIZE && index < sourceFilesLength - 1) { + throw new errors.InvalidArgumentError( + `CopySrcOptions ${index} is too small (${srcCopySize}) and it is not the last part.`, + ) + } + + // Is data to copy too large? + totalSize += srcCopySize + if (totalSize > PART_CONSTRAINTS.MAX_MULTIPART_PUT_OBJECT_SIZE) { + throw new errors.InvalidArgumentError(`Cannot compose an object of size ${totalSize} (> 5TiB)`) + } + + // record source size + srcObjectSizes[index] = srcCopySize + + // calculate parts needed for current source + totalParts += partsRequired(srcCopySize) + // Do we need more parts than we are allowed? + if (totalParts > PART_CONSTRAINTS.MAX_PARTS_COUNT) { + throw new errors.InvalidArgumentError( + `Your proposed compose object requires more than ${PART_CONSTRAINTS.MAX_PARTS_COUNT} parts`, + ) + } + + return resItemStat + }) + + if ((totalParts === 1 && totalSize <= PART_CONSTRAINTS.MAX_PART_SIZE) || totalSize === 0) { + return await this.copyObject(sourceObjList[0] as CopySourceOptions, destObjConfig) // use copyObjectV2 + } + + // preserve etag to avoid modification of object while copying. + for (let i = 0; i < sourceFilesLength; i++) { + ;(sourceObjList[i] as CopySourceOptions).MatchETag = (validatedStats[i] as BucketItemStat).etag + } + + const splitPartSizeList = validatedStats.map((resItemStat, idx) => { + return calculateEvenSplits(srcObjectSizes[idx] as number, sourceObjList[idx] as CopySourceOptions) + }) + + const getUploadPartConfigList = (uploadId: string) => { + const uploadPartConfigList: UploadPartConfig[] = [] + + splitPartSizeList.forEach((splitSize, splitIndex: number) => { + if (splitSize) { + const { startIndex: startIdx, endIndex: endIdx, objInfo: objConfig } = splitSize + + const partIndex = splitIndex + 1 // part index starts from 1. + const totalUploads = Array.from(startIdx) + + const headers = (sourceObjList[splitIndex] as CopySourceOptions).getHeaders() + + totalUploads.forEach((splitStart, upldCtrIdx) => { + const splitEnd = endIdx[upldCtrIdx] + + const sourceObj = `${objConfig.Bucket}/${objConfig.Object}` + headers['x-amz-copy-source'] = `${sourceObj}` + headers['x-amz-copy-source-range'] = `bytes=${splitStart}-${splitEnd}` + + const uploadPartConfig = { + bucketName: destObjConfig.Bucket, + objectName: destObjConfig.Object, + uploadID: uploadId, + partNumber: partIndex, + headers: headers, + sourceObj: sourceObj, + } + + uploadPartConfigList.push(uploadPartConfig) + }) + } + }) + + return uploadPartConfigList + } + + const uploadAllParts = async (uploadList: UploadPartConfig[], uploadId: string) => { + try { + const partUploads = uploadList.map(async (item) => { + return this.uploadPart(item) + }) + // Process results here if needed + return await Promise.all(partUploads) + } catch (error) { + // Handle error + await this.abortMultipartUpload(destObjConfig.Bucket, destObjConfig.Object, uploadId) + } + return [] + } + + const performUploadParts = async (uploadId: string) => { + const uploadList = getUploadPartConfigList(uploadId) + const partsRes = await uploadAllParts(uploadList, uploadId) + return partsRes.map((partCopy) => ({ etag: partCopy.etag, part: partCopy.part })) + } + + const newUploadHeaders = destObjConfig.getHeaders() + + const uploadId = await this.initiateNewMultipartUpload( + destObjConfig.Bucket, + destObjConfig.Object, + newUploadHeaders, + ) + + try { + const partsDone = await performUploadParts(uploadId) + await this.completeMultipartUpload(destObjConfig.Bucket, destObjConfig.Object, uploadId, partsDone) + } catch (err) { + await this.abortMultipartUpload(destObjConfig.Bucket, destObjConfig.Object, uploadId) + } + }) + } } diff --git a/src/internal/type.ts b/src/internal/type.ts index 3cca16e3..ba14c0e7 100644 --- a/src/internal/type.ts +++ b/src/internal/type.ts @@ -1,5 +1,6 @@ import type * as http from 'node:http' import type { Readable as ReadableStream } from 'node:stream' +import { CopySourceOptions } from '../helpers.ts' export type VersionIdentificator = { versionId?: string @@ -408,3 +409,12 @@ export type CopyObjectResultV2 = { } export type CopyObjectResult = CopyObjectResultV1 | CopyObjectResultV2 + +export type UploadPartConfig = { + bucketName: string + objectName: string + uploadID: string + partNumber: number + headers: RequestHeaders + sourceObj: string +} diff --git a/src/internal/xml-parser.ts b/src/internal/xml-parser.ts index c8b685f8..45627577 100644 --- a/src/internal/xml-parser.ts +++ b/src/internal/xml-parser.ts @@ -590,3 +590,9 @@ export function parseCopyObject(xml: string): CopyObjectResultV1 { return result } + +export function uploadPartParser(xml: string) { + const xmlObj = parseXml(xml) + const respEl = xmlObj.CopyPartResult + return respEl +} diff --git a/src/minio.d.ts b/src/minio.d.ts index ec19af99..8b4d8e88 100644 --- a/src/minio.d.ts +++ b/src/minio.d.ts @@ -152,12 +152,6 @@ export class Client extends TypedClient { removeIncompleteUpload(bucketName: string, objectName: string, callback: NoResultCallback): void removeIncompleteUpload(bucketName: string, objectName: string): Promise - composeObject( - destObjConfig: CopyDestinationOptions, - sourceObjList: CopySourceOptions[], - callback: ResultCallback, - ): void - composeObject(destObjConfig: CopyDestinationOptions, sourceObjList: CopySourceOptions[]): Promise // Presigned operations presignedUrl(httpMethod: string, bucketName: string, objectName: string, callback: ResultCallback): void diff --git a/src/minio.js b/src/minio.js index 133e6630..b30b3828 100644 --- a/src/minio.js +++ b/src/minio.js @@ -743,225 +743,6 @@ export class Client extends TypedClient { return listener } - - /** - * Internal method to upload a part during compose object. - * @param partConfig __object__ contains the following. - * bucketName __string__ - * objectName __string__ - * uploadID __string__ - * partNumber __number__ - * headers __object__ - * @param cb called with null incase of error. - */ - uploadPartCopy(partConfig, cb) { - const { bucketName, objectName, uploadID, partNumber, headers } = partConfig - - const method = 'PUT' - let query = `uploadId=${uploadID}&partNumber=${partNumber}` - const requestOptions = { method, bucketName, objectName: objectName, query, headers } - return this.makeRequest(requestOptions, '', [200], '', true, (e, response) => { - let partCopyResult = Buffer.from('') - if (e) { - return cb(e) - } - pipesetup(response, transformers.uploadPartTransformer()) - .on('data', (data) => { - partCopyResult = data - }) - .on('error', cb) - .on('end', () => { - let uploadPartCopyRes = { - etag: sanitizeETag(partCopyResult.ETag), - key: objectName, - part: partNumber, - } - - cb(null, uploadPartCopyRes) - }) - }) - } - - composeObject(destObjConfig = {}, sourceObjList = [], cb) { - const me = this // many async flows. so store the ref. - const sourceFilesLength = sourceObjList.length - - if (!Array.isArray(sourceObjList)) { - throw new errors.InvalidArgumentError('sourceConfig should an array of CopySourceOptions ') - } - if (!(destObjConfig instanceof CopyDestinationOptions)) { - throw new errors.InvalidArgumentError('destConfig should of type CopyDestinationOptions ') - } - - if (sourceFilesLength < 1 || sourceFilesLength > PART_CONSTRAINTS.MAX_PARTS_COUNT) { - throw new errors.InvalidArgumentError( - `"There must be as least one and up to ${PART_CONSTRAINTS.MAX_PARTS_COUNT} source objects.`, - ) - } - - if (!isFunction(cb)) { - throw new TypeError('callback should be of type "function"') - } - - for (let i = 0; i < sourceFilesLength; i++) { - if (!sourceObjList[i].validate()) { - return false - } - } - - if (!destObjConfig.validate()) { - return false - } - - const getStatOptions = (srcConfig) => { - let statOpts = {} - if (!_.isEmpty(srcConfig.VersionID)) { - statOpts = { - versionId: srcConfig.VersionID, - } - } - return statOpts - } - const srcObjectSizes = [] - let totalSize = 0 - let totalParts = 0 - - const sourceObjStats = sourceObjList.map((srcItem) => - me.statObject(srcItem.Bucket, srcItem.Object, getStatOptions(srcItem)), - ) - - return Promise.all(sourceObjStats) - .then((srcObjectInfos) => { - const validatedStats = srcObjectInfos.map((resItemStat, index) => { - const srcConfig = sourceObjList[index] - - let srcCopySize = resItemStat.size - // Check if a segment is specified, and if so, is the - // segment within object bounds? - if (srcConfig.MatchRange) { - // Since range is specified, - // 0 <= src.srcStart <= src.srcEnd - // so only invalid case to check is: - const srcStart = srcConfig.Start - const srcEnd = srcConfig.End - if (srcEnd >= srcCopySize || srcStart < 0) { - throw new errors.InvalidArgumentError( - `CopySrcOptions ${index} has invalid segment-to-copy [${srcStart}, ${srcEnd}] (size is ${srcCopySize})`, - ) - } - srcCopySize = srcEnd - srcStart + 1 - } - - // Only the last source may be less than `absMinPartSize` - if (srcCopySize < PART_CONSTRAINTS.ABS_MIN_PART_SIZE && index < sourceFilesLength - 1) { - throw new errors.InvalidArgumentError( - `CopySrcOptions ${index} is too small (${srcCopySize}) and it is not the last part.`, - ) - } - - // Is data to copy too large? - totalSize += srcCopySize - if (totalSize > PART_CONSTRAINTS.MAX_MULTIPART_PUT_OBJECT_SIZE) { - throw new errors.InvalidArgumentError(`Cannot compose an object of size ${totalSize} (> 5TiB)`) - } - - // record source size - srcObjectSizes[index] = srcCopySize - - // calculate parts needed for current source - totalParts += partsRequired(srcCopySize) - // Do we need more parts than we are allowed? - if (totalParts > PART_CONSTRAINTS.MAX_PARTS_COUNT) { - throw new errors.InvalidArgumentError( - `Your proposed compose object requires more than ${PART_CONSTRAINTS.MAX_PARTS_COUNT} parts`, - ) - } - - return resItemStat - }) - - if ((totalParts === 1 && totalSize <= PART_CONSTRAINTS.MAX_PART_SIZE) || totalSize === 0) { - return this.copyObject(sourceObjList[0], destObjConfig, cb) // use copyObjectV2 - } - - // preserve etag to avoid modification of object while copying. - for (let i = 0; i < sourceFilesLength; i++) { - sourceObjList[i].MatchETag = validatedStats[i].etag - } - - const splitPartSizeList = validatedStats.map((resItemStat, idx) => { - const calSize = calculateEvenSplits(srcObjectSizes[idx], sourceObjList[idx]) - return calSize - }) - - function getUploadPartConfigList(uploadId) { - const uploadPartConfigList = [] - - splitPartSizeList.forEach((splitSize, splitIndex) => { - const { startIndex: startIdx, endIndex: endIdx, objInfo: objConfig } = splitSize - - let partIndex = splitIndex + 1 // part index starts from 1. - const totalUploads = Array.from(startIdx) - - const headers = sourceObjList[splitIndex].getHeaders() - - totalUploads.forEach((splitStart, upldCtrIdx) => { - let splitEnd = endIdx[upldCtrIdx] - - const sourceObj = `${objConfig.Bucket}/${objConfig.Object}` - headers['x-amz-copy-source'] = `${sourceObj}` - headers['x-amz-copy-source-range'] = `bytes=${splitStart}-${splitEnd}` - - const uploadPartConfig = { - bucketName: destObjConfig.Bucket, - objectName: destObjConfig.Object, - uploadID: uploadId, - partNumber: partIndex, - headers: headers, - sourceObj: sourceObj, - } - - uploadPartConfigList.push(uploadPartConfig) - }) - }) - - return uploadPartConfigList - } - - const performUploadParts = (uploadId) => { - const uploadList = getUploadPartConfigList(uploadId) - - async.map(uploadList, me.uploadPartCopy.bind(me), (err, res) => { - if (err) { - this.abortMultipartUpload(destObjConfig.Bucket, destObjConfig.Object, uploadId).then( - () => cb(), - (err) => cb(err), - ) - return - } - const partsDone = res.map((partCopy) => ({ etag: partCopy.etag, part: partCopy.part })) - return me.completeMultipartUpload(destObjConfig.Bucket, destObjConfig.Object, uploadId, partsDone).then( - (result) => cb(null, result), - (err) => cb(err), - ) - }) - } - - const newUploadHeaders = destObjConfig.getHeaders() - - me.initiateNewMultipartUpload(destObjConfig.Bucket, destObjConfig.Object, newUploadHeaders).then( - (uploadId) => { - performUploadParts(uploadId) - }, - (err) => { - cb(err, null) - }, - ) - }) - .catch((error) => { - cb(error, null) - }) - } } // Promisify various public-facing APIs on the Client module. @@ -975,7 +756,6 @@ Client.prototype.getBucketNotification = promisify(Client.prototype.getBucketNot Client.prototype.setBucketNotification = promisify(Client.prototype.setBucketNotification) Client.prototype.removeAllBucketNotification = promisify(Client.prototype.removeAllBucketNotification) Client.prototype.removeIncompleteUpload = promisify(Client.prototype.removeIncompleteUpload) -Client.prototype.composeObject = promisify(Client.prototype.composeObject) // refactored API use promise internally Client.prototype.makeBucket = callbackify(Client.prototype.makeBucket) @@ -1018,3 +798,4 @@ Client.prototype.getBucketEncryption = callbackify(Client.prototype.getBucketEnc Client.prototype.removeBucketEncryption = callbackify(Client.prototype.removeBucketEncryption) Client.prototype.getObjectRetention = callbackify(Client.prototype.getObjectRetention) Client.prototype.copyObject = callbackify(Client.prototype.copyObject) +Client.prototype.composeObject = callbackify(Client.prototype.composeObject) diff --git a/tests/unit/test.js b/tests/unit/test.js index 98b346d2..582a7075 100644 --- a/tests/unit/test.js +++ b/tests/unit/test.js @@ -1611,30 +1611,32 @@ describe('Client', function () { describe('Compose Object APIs', () => { describe('composeObject(destObjConfig, sourceObjectList,cb)', () => { - it('should fail on null destination config', (done) => { + it('should fail on null destination config', async () => { try { - client.composeObject(null, function () {}) - } catch (e) { - done() + await client.composeObject(null) + } catch (err) { + return } + throw new Error('callback should receive error') }) - - it('should fail on no array source config', (done) => { + it('should fail on no array source config', async () => { try { const destOptions = new CopyDestinationOptions({ Bucket: 'test-bucket', Object: 'test-object' }) - client.composeObject(destOptions, 'non-array', function () {}) - } catch (e) { - done() + await client.composeObject(destOptions, 'non-array') + } catch (err) { + return } + throw new Error('callback should receive error') }) - it('should fail on null source config', (done) => { + it('should fail on null source config', async () => { try { const destOptions = new CopyDestinationOptions({ Bucket: 'test-bucket', Object: 'test-object' }) - client.composeObject(destOptions, null, function () {}) - } catch (e) { - done() + await client.composeObject(destOptions, null) + } catch (err) { + return } + throw new Error('callback should receive error') }) }) })