diff --git a/storage/form_upload.go b/storage/form_upload.go index 2bc4bd3c..9174b79c 100644 --- a/storage/form_upload.go +++ b/storage/form_upload.go @@ -198,8 +198,8 @@ func (p *FormUploader) newObjectParams(upToken string, key string, hasKey bool, } objectParams.Metadata, objectParams.CustomVars = splitParams(extra.Params) if extra.OnProgress != nil { - objectParams.OnUploadingProgress = func(uploaded, fileSize uint64) { - extra.OnProgress(int64(fileSize), int64(uploaded)) + objectParams.OnUploadingProgress = func(progress *uploader.UploadingProgress) { + extra.OnProgress(int64(progress.TotalSize), int64(progress.Uploaded)) } } return &objectParams diff --git a/storagev2/downloader/download_manager.go b/storagev2/downloader/download_manager.go index b51a6c9e..d1df1693 100644 --- a/storagev2/downloader/download_manager.go +++ b/storagev2/downloader/download_manager.go @@ -74,10 +74,10 @@ type ( BeforeObjectDownload func(objectName string, objectOptions *ObjectOptions) // 下载进度 - OnDownloadingProgress func(objectName string, downloaded, totalSize uint64) + OnDownloadingProgress func(objectName string, progress *DownloadingProgress) // 对象下载成功后回调 - OnObjectDownloaded func(objectName string, size uint64) + OnObjectDownloaded func(objectName string, info *DownloadedObjectInfo) // 是否下载指定对象 ShouldDownloadObject func(objectName string) bool @@ -86,6 +86,11 @@ type ( PathSeparator string } + // 已经下载的对象信息 + DownloadedObjectInfo struct { + Size uint64 // 对象大小 + } + writeSeekCloser struct { w io.Writer } @@ -202,8 +207,8 @@ func (downloadManager *DownloadManager) DownloadDirectory(ctx context.Context, t g.Go(func() error { var destinationDownloadOptions DestinationDownloadOptions if onDownloadingProgress := options.OnDownloadingProgress; onDownloadingProgress != nil { - destinationDownloadOptions.OnDownloadingProgress = func(downloaded, totalSize uint64) { - onDownloadingProgress(objectName, downloaded, totalSize) + destinationDownloadOptions.OnDownloadingProgress = func(progress *DownloadingProgress) { + onDownloadingProgress(objectName, progress) } } objectOptions := ObjectOptions{ @@ -222,7 +227,7 @@ func (downloadManager *DownloadManager) DownloadDirectory(ctx context.Context, t } n, err := downloadManager.DownloadToFile(ctx, objectName, fullPath, &objectOptions) if err == nil && options.OnObjectDownloaded != nil { - options.OnObjectDownloaded(objectName, n) + options.OnObjectDownloaded(objectName, &DownloadedObjectInfo{Size: n}) } return err }) diff --git a/storagev2/downloader/downloaders.go b/storagev2/downloader/downloaders.go index 848675c3..54fdeb3f 100644 --- a/storagev2/downloader/downloaders.go +++ b/storagev2/downloader/downloaders.go @@ -133,7 +133,7 @@ func (downloader concurrentDownloader) Download(ctx context.Context, urlsIter UR var progress func(uint64) if onDownloadingProgress := options.OnDownloadingProgress; onDownloadingProgress != nil { progress = func(downloaded uint64) { - onDownloadingProgress(downloaded, 0) + onDownloadingProgress(&DownloadingProgress{Downloaded: downloaded}) } } return downloadToPartReader(ctx, urlsIter, etag, options.Header, downloader.client, dest, progress) @@ -198,13 +198,13 @@ func (downloader concurrentDownloader) Download(ctx context.Context, urlsIter UR n, err := downloader.downloadToPart(ctx, urlsIterClone, etag, offset, options.Header, p, writeableMedium, &downloadingProgressMutex, func(downloaded uint64) { downloadingProgress.setPartDownloadingProgress(p.Offset(), downloaded) if onDownloadingProgress := options.OnDownloadingProgress; onDownloadingProgress != nil { - onDownloadingProgress(downloadingProgress.totalDownloaded(), needToDownload) + onDownloadingProgress(&DownloadingProgress{Downloaded: downloadingProgress.totalDownloaded(), TotalSize: needToDownload}) } }) if n > 0 { downloadingProgress.partDownloaded(p.Offset(), n) if onDownloadingProgress := options.OnDownloadingProgress; onDownloadingProgress != nil { - onDownloadingProgress(downloadingProgress.totalDownloaded(), needToDownload) + onDownloadingProgress(&DownloadingProgress{Downloaded: downloadingProgress.totalDownloaded(), TotalSize: needToDownload}) } } return err diff --git a/storagev2/downloader/downloaders_test.go b/storagev2/downloader/downloaders_test.go index 0fa8f78c..7ea2cbda 100644 --- a/storagev2/downloader/downloaders_test.go +++ b/storagev2/downloader/downloaders_test.go @@ -103,12 +103,12 @@ func TestConcurrentDownloaderWithSinglePart(t *testing.T) { downloader.NewURLsIter([]*url.URL{url1, url2, url3}), destination.NewWriteCloserDestination(&buf, ""), &downloader.DestinationDownloadOptions{ - OnDownloadingProgress: func(downloaded, totalSize uint64) { - if downloaded < lastDownloaded { + OnDownloadingProgress: func(progress *downloader.DownloadingProgress) { + if progress.Downloaded < lastDownloaded { t.Fatalf("unexpected downloaded progress") } - lastDownloaded = downloaded - if totalSize != 1024*1024 { + lastDownloaded = progress.Downloaded + if progress.TotalSize != 1024*1024 { t.Fatalf("unexpected downloaded progress") } }, @@ -200,12 +200,12 @@ func TestConcurrentDownloaderWithCompression(t *testing.T) { context.Background(), downloader.NewURLsIter([]*url.URL{url1}), destination.NewWriteCloserDestination(&buf, ""), &downloader.DestinationDownloadOptions{ - OnDownloadingProgress: func(downloaded, totalSize uint64) { - if downloaded < lastDownloaded { + OnDownloadingProgress: func(progress *downloader.DownloadingProgress) { + if progress.Downloaded < lastDownloaded { t.Fatalf("unexpected downloaded progress") } - lastDownloaded = downloaded - if totalSize != 0 { + lastDownloaded = progress.Downloaded + if progress.TotalSize != 0 { t.Fatalf("unexpected downloaded progress") } }, @@ -285,12 +285,12 @@ func TestConcurrentDownloaderWithMultipleParts(t *testing.T) { downloader.NewURLsIter([]*url.URL{url1}), dest, &downloader.DestinationDownloadOptions{ - OnDownloadingProgress: func(downloaded, totalSize uint64) { - if downloaded < atomic.LoadUint64(&lastDownloaded) { + OnDownloadingProgress: func(progress *downloader.DownloadingProgress) { + if progress.Downloaded < atomic.LoadUint64(&lastDownloaded) { t.Fatalf("unexpected downloaded progress") } - atomic.StoreUint64(&lastDownloaded, downloaded) - if totalSize != SIZE { + atomic.StoreUint64(&lastDownloaded, progress.Downloaded) + if progress.TotalSize != SIZE { t.Fatalf("unexpected downloaded progress") } }, @@ -383,12 +383,12 @@ func TestConcurrentDownloaderWithMultiplePartsAndRange(t *testing.T) { dest, &downloader.DestinationDownloadOptions{ Header: http.Header{"Range": []string{fmt.Sprintf("bytes=%d-", SIZE-REQUEST_SIZE)}}, - OnDownloadingProgress: func(downloaded, totalSize uint64) { - if downloaded < atomic.LoadUint64(&lastDownloaded) { + OnDownloadingProgress: func(progress *downloader.DownloadingProgress) { + if progress.Downloaded < atomic.LoadUint64(&lastDownloaded) { t.Fatalf("unexpected downloaded progress") } - atomic.StoreUint64(&lastDownloaded, downloaded) - if totalSize != REQUEST_SIZE { + atomic.StoreUint64(&lastDownloaded, progress.Downloaded) + if progress.TotalSize != REQUEST_SIZE { t.Fatalf("unexpected downloaded progress") } }, @@ -533,12 +533,12 @@ func TestConcurrentDownloaderWithResumableRecorder(t *testing.T) { downloader.NewURLsIter([]*url.URL{url1}), dest, &downloader.DestinationDownloadOptions{ - OnDownloadingProgress: func(downloaded, totalSize uint64) { - if downloaded < lastDownloaded { + OnDownloadingProgress: func(progress *downloader.DownloadingProgress) { + if progress.Downloaded < lastDownloaded { t.Fatalf("unexpected downloaded progress") } - lastDownloaded = downloaded - if totalSize != 10*1024*1024 { + lastDownloaded = progress.Downloaded + if progress.TotalSize != 10*1024*1024 { t.Fatalf("unexpected downloaded progress") } }, diff --git a/storagev2/downloader/interfaces.go b/storagev2/downloader/interfaces.go index 78e66696..abf7a4f9 100644 --- a/storagev2/downloader/interfaces.go +++ b/storagev2/downloader/interfaces.go @@ -32,12 +32,18 @@ type ( Sign(context.Context, *url.URL, *SignOptions) error } + // 下载进度 + DownloadingProgress struct { + Downloaded uint64 // 已经下载的数据量,单位为字节 + TotalSize uint64 // 总数据量,单位为字节 + } + // 目标下载选项 DestinationDownloadOptions struct { // 对象下载附加 HTTP Header Header http.Header // 对象下载进度 - OnDownloadingProgress func(downloaded, totalSize uint64) + OnDownloadingProgress func(*DownloadingProgress) // 对象 Header 获取回调 OnResponseHeader func(http.Header) } diff --git a/storagev2/objects/batch.go b/storagev2/objects/batch.go index e6791760..450963e8 100644 --- a/storagev2/objects/batch.go +++ b/storagev2/objects/batch.go @@ -398,6 +398,7 @@ func (wm *workersManager) asyncWorker(ctx internal_context.Context, id uint) { for getCtxError(wm.parentCtx) == nil { if operations := wm.requestsManager.takeOperations(); len(operations) > 0 { if operations, err := wm.doOperations(ctx, operations); err != nil { + // 确定错误是否可以重试 wm.requestsManager.putBackOperations(operations) if isTimeoutError(err) { // 超时,说明 batchSize 过大 wm.requestsManager.handleTimeoutError() diff --git a/storagev2/objects/lister.go b/storagev2/objects/lister.go index b1448a53..f8dbda25 100644 --- a/storagev2/objects/lister.go +++ b/storagev2/objects/lister.go @@ -21,6 +21,9 @@ type ( // 获取错误信息 Error() error + + // 获取位置标记 + Marker() string } listerV1 struct { @@ -63,6 +66,10 @@ func (v1 *listerV1) Next(object *ObjectDetails) bool { return true } +func (v1 *listerV1) Marker() string { + return v1.marker +} + func (v1 *listerV1) callListApi() error { if v1.marker == "" && !v1.firstCall { return nil diff --git a/storagev2/uploader/form_uploader_test.go b/storagev2/uploader/form_uploader_test.go index 28ef925d..3e5533af 100644 --- a/storagev2/uploader/form_uploader_test.go +++ b/storagev2/uploader/form_uploader_test.go @@ -110,12 +110,12 @@ func TestFormUploader(t *testing.T) { ContentType: "application/json", Metadata: map[string]string{"a": "b", "c": "d"}, CustomVars: map[string]string{"a": "b", "c": "d"}, - OnUploadingProgress: func(uploaded, fileSize uint64) { - if fileSize != 1024*1024 { + OnUploadingProgress: func(progress *UploadingProgress) { + if progress.TotalSize != 1024*1024 { t.Fatalf("unexpected file size") - } else if uploaded > fileSize { + } else if progress.Uploaded > progress.TotalSize { t.Fatalf("unexpected uploaded") - } else if lu := atomic.SwapUint64(&lastUploaded, uploaded); lu > uploaded || lu > fileSize { + } else if lu := atomic.SwapUint64(&lastUploaded, progress.Uploaded); lu > progress.Uploaded || lu > progress.TotalSize { t.Fatalf("unexpected uploaded") } }, diff --git a/storagev2/uploader/interfaces.go b/storagev2/uploader/interfaces.go index a86a74c3..1bcce80e 100644 --- a/storagev2/uploader/interfaces.go +++ b/storagev2/uploader/interfaces.go @@ -46,8 +46,14 @@ type ( // 已经上传的分片 UploadedPart interface { + // 分片编号 + PartNumber() uint64 + // 分片偏移量 Offset() uint64 + + // 分片大小 + PartSize() uint64 } // 分片上传调度器 diff --git a/storagev2/uploader/multi_parts_uploader_test.go b/storagev2/uploader/multi_parts_uploader_test.go index 75f86cf6..5fddc90f 100644 --- a/storagev2/uploader/multi_parts_uploader_test.go +++ b/storagev2/uploader/multi_parts_uploader_test.go @@ -220,12 +220,12 @@ func TestMultiPartsUploader(t *testing.T) { ContentType: "application/json", Metadata: map[string]string{"a": "b", "c": "d"}, CustomVars: map[string]string{"a": "b", "c": "d"}, - OnUploadingProgress: func(uploaded uint64, fileSize uint64) { - if fileSize != 5*1024*1024 { + OnUploadingProgress: func(progress *UploadingProgress) { + if progress.TotalSize != 5*1024*1024 { t.Fatalf("unexpected file size") - } else if uploaded > fileSize { + } else if progress.Uploaded > progress.TotalSize { t.Fatalf("unexpected uploaded") - } else if lu := atomic.SwapUint64(&lastUploaded, uploaded); lu > uploaded || lu > fileSize { + } else if lu := atomic.SwapUint64(&lastUploaded, progress.Uploaded); lu > progress.Uploaded || lu > progress.TotalSize { t.Fatalf("unexpected uploaded") } }, @@ -414,12 +414,12 @@ func TestMultiPartsUploaderResuming(t *testing.T) { ContentType: "application/json", Metadata: map[string]string{"a": "b", "c": "d"}, CustomVars: map[string]string{"a": "b", "c": "d"}, - OnUploadingProgress: func(uploaded uint64, fileSize uint64) { - if fileSize != 5*1024*1024 { + OnUploadingProgress: func(progress *UploadingProgress) { + if progress.TotalSize != 5*1024*1024 { t.Fatalf("unexpected file size") - } else if uploaded > fileSize { + } else if progress.Uploaded > progress.TotalSize { t.Fatalf("unexpected uploaded") - } else if lu := atomic.SwapUint64(&lastUploaded, uploaded); lu > uploaded || lu > fileSize { + } else if lu := atomic.SwapUint64(&lastUploaded, progress.Uploaded); lu > progress.Uploaded || lu > progress.TotalSize { t.Fatalf("unexpected uploaded") } }, @@ -644,11 +644,11 @@ func TestMultiPartsUploaderRetry(t *testing.T) { ContentType: "application/json", Metadata: map[string]string{"a": "b", "c": "d"}, CustomVars: map[string]string{"a": "b", "c": "d"}, - OnUploadingProgress: func(uploaded uint64, fileSize uint64) { - if fileSize != 5*1024*1024 { + OnUploadingProgress: func(progress *UploadingProgress) { + if progress.TotalSize != 5*1024*1024 { t.Fatalf("unexpected file size") } - atomic.StoreUint64(&lastUploaded, uploaded) + atomic.StoreUint64(&lastUploaded, progress.Uploaded) }, }, &returnValue); err != nil { t.Fatal(err) diff --git a/storagev2/uploader/multi_parts_uploader_v1.go b/storagev2/uploader/multi_parts_uploader_v1.go index 73317014..f0756d55 100644 --- a/storagev2/uploader/multi_parts_uploader_v1.go +++ b/storagev2/uploader/multi_parts_uploader_v1.go @@ -28,9 +28,9 @@ type ( } multiPartsUploaderV1UploadedPart struct { - ctx string - crc32 uint32 - offset, size uint64 + ctx string + crc32 uint32 + partNumber, offset, size uint64 } resumedMultiPartsUploaderV1Record struct { @@ -103,13 +103,14 @@ func (uploader *multiPartsUploaderV1) UploadPart(ctx context.Context, initialize if record, ok := initializedParts.records[part.PartNumber()]; ok { if record.offset == part.Offset() && record.size == part.Size() { if options != nil && options.OnUploadingProgress != nil { - options.OnUploadingProgress(record.size, record.size) + options.OnUploadingProgress(&UploadingPartProgress{Uploaded: record.size, PartSize: record.size}) } return multiPartsUploaderV1UploadedPart{ - ctx: record.ctx, - crc32: record.crc32, - offset: record.offset, - size: record.size, + ctx: record.ctx, + crc32: record.crc32, + offset: record.offset, + size: record.size, + partNumber: part.PartNumber(), }, nil } } @@ -122,7 +123,9 @@ func (uploader *multiPartsUploaderV1) uploadPart(ctx context.Context, initialize OverwrittenRegion: initialized.multiPartsObjectOptions.RegionsProvider, } if options != nil && options.OnUploadingProgress != nil { - apisOptions.OnRequestProgress = options.OnUploadingProgress + apisOptions.OnRequestProgress = func(uploaded, totalSize uint64) { + options.OnUploadingProgress(&UploadingPartProgress{Uploaded: uploaded, PartSize: totalSize}) + } } upToken, err := getUpToken(uploader.options.Credentials, &initialized.multiPartsObjectOptions.ObjectOptions, uploader.options.UpTokenProvider) if err != nil { @@ -161,10 +164,11 @@ func (uploader *multiPartsUploaderV1) uploadPart(ctx context.Context, initialize } return multiPartsUploaderV1UploadedPart{ - ctx: response.Ctx, - crc32: uint32(response.Crc32), - offset: part.Offset(), - size: part.Size(), + ctx: response.Ctx, + crc32: uint32(response.Crc32), + offset: part.Offset(), + size: part.Size(), + partNumber: part.PartNumber(), }, nil } @@ -229,7 +233,11 @@ func (uploadedPart multiPartsUploaderV1UploadedPart) Offset() uint64 { return uploadedPart.offset } -func (uploadedPart multiPartsUploaderV1UploadedPart) Size() uint64 { +func (uploadedPart multiPartsUploaderV1UploadedPart) PartNumber() uint64 { + return uploadedPart.partNumber +} + +func (uploadedPart multiPartsUploaderV1UploadedPart) PartSize() uint64 { return uploadedPart.size } diff --git a/storagev2/uploader/multi_parts_uploader_v1_test.go b/storagev2/uploader/multi_parts_uploader_v1_test.go index 904baff1..c3a6c105 100644 --- a/storagev2/uploader/multi_parts_uploader_v1_test.go +++ b/storagev2/uploader/multi_parts_uploader_v1_test.go @@ -232,14 +232,14 @@ func TestMultiPartsUploaderV1(t *testing.T) { } lastUploaded := uint64(0) uploadedPart_1, err := multiPartsUploaderV1.UploadPart(context.Background(), initializedPart, part, &uploader.UploadPartOptions{ - OnUploadingProgress: func(uploaded, partSize uint64) { - if partSize != 4*1024*1024 { + OnUploadingProgress: func(progress *uploader.UploadingPartProgress) { + if progress.PartSize != 4*1024*1024 { t.Fatalf("unexpected partSize") } - if uploaded < lastUploaded || uploaded > partSize { + if progress.Uploaded < lastUploaded || progress.Uploaded > progress.PartSize { t.Fatalf("unexpected uploaded") } - lastUploaded = uploaded + lastUploaded = progress.Uploaded }, }) if err != nil { @@ -252,14 +252,14 @@ func TestMultiPartsUploaderV1(t *testing.T) { } lastUploaded = 0 uploadedPart_2, err := multiPartsUploaderV1.UploadPart(context.Background(), initializedPart, part, &uploader.UploadPartOptions{ - OnUploadingProgress: func(uploaded, partSize uint64) { - if partSize != 1*1024*1024 { + OnUploadingProgress: func(progress *uploader.UploadingPartProgress) { + if progress.PartSize != 1*1024*1024 { t.Fatalf("unexpected partSize") } - if uploaded < lastUploaded || uploaded > partSize { + if progress.Uploaded < lastUploaded || progress.Uploaded > progress.PartSize { t.Fatalf("unexpected uploaded") } - lastUploaded = uploaded + lastUploaded = progress.Uploaded }, }) if err != nil { @@ -465,11 +465,11 @@ func TestMultiPartsUploaderV1Resuming(t *testing.T) { t.Fatal(err) } uploadedPart_1, err := multiPartsUploaderV1.UploadPart(context.Background(), initializedPart, part, &uploader.UploadPartOptions{ - OnUploadingProgress: func(uploaded, partSize uint64) { - if partSize != 4*1024*1024 { + OnUploadingProgress: func(progress *uploader.UploadingPartProgress) { + if progress.PartSize != 4*1024*1024 { t.Fatalf("unexpected partSize") } - if uploaded != 4*1024*1024 { + if progress.Uploaded != 4*1024*1024 { t.Fatalf("unexpected uploaded") } }, @@ -483,11 +483,11 @@ func TestMultiPartsUploaderV1Resuming(t *testing.T) { t.Fatal(err) } uploadedPart_2, err := multiPartsUploaderV1.UploadPart(context.Background(), initializedPart, part, &uploader.UploadPartOptions{ - OnUploadingProgress: func(uploaded, partSize uint64) { - if partSize != 1024*1024 { + OnUploadingProgress: func(progress *uploader.UploadingPartProgress) { + if progress.PartSize != 1024*1024 { t.Fatalf("unexpected partSize") } - if uploaded != 1024*1024 { + if progress.Uploaded != 1024*1024 { t.Fatalf("unexpected uploaded") } }, diff --git a/storagev2/uploader/multi_parts_uploader_v2.go b/storagev2/uploader/multi_parts_uploader_v2.go index bbd6dd71..240597a1 100644 --- a/storagev2/uploader/multi_parts_uploader_v2.go +++ b/storagev2/uploader/multi_parts_uploader_v2.go @@ -171,7 +171,7 @@ func (uploader *multiPartsUploaderV2) UploadPart(ctx context.Context, initialize if record, ok := initializedParts.records[part.PartNumber()]; ok { if record.offset == part.Offset() && record.size == part.Size() { if options != nil && options.OnUploadingProgress != nil { - options.OnUploadingProgress(record.size, record.size) + options.OnUploadingProgress(&UploadingPartProgress{Uploaded: record.size, PartSize: record.size}) } return multiPartsUploaderV2UploadedPart{ partNumber: record.partNumber, @@ -191,7 +191,9 @@ func (uploader *multiPartsUploaderV2) uploadPart(ctx context.Context, initialize OverwrittenRegion: initialized.multiPartsObjectOptions.RegionsProvider, } if options != nil && options.OnUploadingProgress != nil { - apisOptions.OnRequestProgress = options.OnUploadingProgress + apisOptions.OnRequestProgress = func(uploaded, partSize uint64) { + options.OnUploadingProgress(&UploadingPartProgress{Uploaded: uploaded, PartSize: partSize}) + } } upToken, err := getUpToken(uploader.options.Credentials, &initialized.multiPartsObjectOptions.ObjectOptions, uploader.options.UpTokenProvider) if err != nil { @@ -313,7 +315,7 @@ func (uploadedPart multiPartsUploaderV2UploadedPart) PartNumber() uint64 { return uploadedPart.partNumber } -func (uploadedPart multiPartsUploaderV2UploadedPart) Size() uint64 { +func (uploadedPart multiPartsUploaderV2UploadedPart) PartSize() uint64 { return uploadedPart.size } diff --git a/storagev2/uploader/multi_parts_uploader_v2_test.go b/storagev2/uploader/multi_parts_uploader_v2_test.go index c0c9385a..b156c68c 100644 --- a/storagev2/uploader/multi_parts_uploader_v2_test.go +++ b/storagev2/uploader/multi_parts_uploader_v2_test.go @@ -227,14 +227,14 @@ func TestMultiPartsUploaderV2(t *testing.T) { } lastUploaded := uint64(0) uploadedPart_1, err := multiPartsUploaderV2.UploadPart(context.Background(), initializedPart, part, &uploader.UploadPartOptions{ - OnUploadingProgress: func(uploaded, partSize uint64) { - if partSize != 4*1024*1024 { + OnUploadingProgress: func(progress *uploader.UploadingPartProgress) { + if progress.PartSize != 4*1024*1024 { t.Fatalf("unexpected partSize") } - if uploaded < lastUploaded || uploaded > partSize { + if progress.Uploaded < lastUploaded || progress.Uploaded > progress.PartSize { t.Fatalf("unexpected uploaded") } - lastUploaded = uploaded + lastUploaded = progress.Uploaded }, }) if err != nil { @@ -247,14 +247,14 @@ func TestMultiPartsUploaderV2(t *testing.T) { } lastUploaded = 0 uploadedPart_2, err := multiPartsUploaderV2.UploadPart(context.Background(), initializedPart, part, &uploader.UploadPartOptions{ - OnUploadingProgress: func(uploaded, partSize uint64) { - if partSize != 1*1024*1024 { + OnUploadingProgress: func(progress *uploader.UploadingPartProgress) { + if progress.PartSize != 1*1024*1024 { t.Fatalf("unexpected partSize") } - if uploaded < lastUploaded || uploaded > partSize { + if progress.Uploaded < lastUploaded || progress.Uploaded > progress.PartSize { t.Fatalf("unexpected uploaded") } - lastUploaded = uploaded + lastUploaded = progress.Uploaded }, }) if err != nil { @@ -428,11 +428,11 @@ func TestMultiPartsUploaderV2Resuming(t *testing.T) { t.Fatal(err) } uploadedPart_1, err := multiPartsUploaderV2.UploadPart(context.Background(), initializedPart, part, &uploader.UploadPartOptions{ - OnUploadingProgress: func(uploaded, partSize uint64) { - if partSize != 4*1024*1024 { + OnUploadingProgress: func(progress *uploader.UploadingPartProgress) { + if progress.PartSize != 4*1024*1024 { t.Fatalf("unexpected partSize") } - if uploaded != 4*1024*1024 { + if progress.Uploaded != 4*1024*1024 { t.Fatalf("unexpected uploaded") } }, @@ -446,11 +446,11 @@ func TestMultiPartsUploaderV2Resuming(t *testing.T) { t.Fatal(err) } uploadedPart_2, err := multiPartsUploaderV2.UploadPart(context.Background(), initializedPart, part, &uploader.UploadPartOptions{ - OnUploadingProgress: func(uploaded, partSize uint64) { - if partSize != 1024*1024 { + OnUploadingProgress: func(progress *uploader.UploadingPartProgress) { + if progress.PartSize != 1024*1024 { t.Fatalf("unexpected partSize") } - if uploaded != 1024*1024 { + if progress.Uploaded != 1024*1024 { t.Fatalf("unexpected uploaded") } }, diff --git a/storagev2/uploader/params.go b/storagev2/uploader/params.go index 1ba8ffb4..6b9248ec 100644 --- a/storagev2/uploader/params.go +++ b/storagev2/uploader/params.go @@ -33,7 +33,7 @@ type ( CustomVars map[string]string // 对象上传进度 - OnUploadingProgress func(uploaded, totalSize uint64) + OnUploadingProgress func(*UploadingProgress) } // 分片上传对象上传选项 @@ -45,18 +45,30 @@ type ( PartSize uint64 } + // 分片上传进度 + UploadingPartProgress struct { + Uploaded uint64 // 已经上传的数据量,单位为字节 + PartSize uint64 // 分片大小,单位为字节 + } + + // 对象上传进度 + UploadingProgress struct { + Uploaded uint64 // 已经上传的数据量,单位为字节 + TotalSize uint64 // 总数据量,单位为字节 + } + // 上传分片列表选项 UploadPartsOptions struct { // 分片上传进度 - OnUploadingProgress func(partNumber, uploaded, partSize uint64) + OnUploadingProgress func(partNumber uint64, progress *UploadingPartProgress) // 分片上传成功后回调函数 - OnPartUploaded func(partNumber, partSize uint64) + OnPartUploaded func(UploadedPart) error } // 上传分片选项 UploadPartOptions struct { // 分片上传进度 - OnUploadingProgress func(uploaded, partSize uint64) + OnUploadingProgress func(*UploadingPartProgress) } DirectoryOptions struct { @@ -76,10 +88,10 @@ type ( BeforeObjectUpload func(filePath string, objectOptions *ObjectOptions) // 上传进度 - OnUploadingProgress func(filePath string, uploaded, totalSize uint64) + OnUploadingProgress func(filePath string, progress *UploadingProgress) // 对象上传成功后回调 - OnObjectUploaded func(filePath string, size uint64) + OnObjectUploaded func(filePath string, info *UploadedObjectInfo) // 是否在空间内创建目录 ShouldCreateDirectory bool @@ -93,4 +105,9 @@ type ( // 分隔符,默认为 / PathSeparator string } + + // 已经上传的对象信息 + UploadedObjectInfo struct { + Size uint64 // 对象大小 + } ) diff --git a/storagev2/uploader/schedulers.go b/storagev2/uploader/schedulers.go index 3dd95a13..5930463a 100644 --- a/storagev2/uploader/schedulers.go +++ b/storagev2/uploader/schedulers.go @@ -82,8 +82,8 @@ func (scheduler serialMultiPartsUploaderScheduler) UploadParts(ctx context.Conte } var uploadPartParam UploadPartOptions if options != nil && options.OnUploadingProgress != nil { - uploadPartParam.OnUploadingProgress = func(uploaded, partSize uint64) { - options.OnUploadingProgress(part.PartNumber(), uploaded, part.Size()) + uploadPartParam.OnUploadingProgress = func(progress *UploadingPartProgress) { + options.OnUploadingProgress(part.PartNumber(), &UploadingPartProgress{Uploaded: progress.Uploaded, PartSize: part.Size()}) } } uploadedPart, err := scheduler.uploader.UploadPart(ctx, initialized, part, &uploadPartParam) @@ -91,7 +91,9 @@ func (scheduler serialMultiPartsUploaderScheduler) UploadParts(ctx context.Conte return nil, err } if options != nil && options.OnPartUploaded != nil { - options.OnPartUploaded(part.PartNumber(), part.Size()) + if err = options.OnPartUploaded(uploadedPart); err != nil { + return nil, err + } } parts = append(parts, uploadedPart) } @@ -133,10 +135,10 @@ func (scheduler concurrentMultiPartsUploaderScheduler) UploadParts(ctx context.C g.Go(func() error { var uploadPartParam UploadPartOptions if options != nil && options.OnUploadingProgress != nil { - uploadPartParam.OnUploadingProgress = func(uploaded, partSize uint64) { + uploadPartParam.OnUploadingProgress = func(progress *UploadingPartProgress) { onUploadingProgressMutex.Lock() defer onUploadingProgressMutex.Unlock() - options.OnUploadingProgress(part.PartNumber(), uploaded, partSize) + options.OnUploadingProgress(part.PartNumber(), progress) } } uploadedPart, err := scheduler.uploader.UploadPart(ctx, initialized, part, &uploadPartParam) @@ -144,7 +146,9 @@ func (scheduler concurrentMultiPartsUploaderScheduler) UploadParts(ctx context.C return err } if options != nil && options.OnPartUploaded != nil { - options.OnPartUploaded(part.PartNumber(), part.Size()) + if err = options.OnPartUploaded(uploadedPart); err != nil { + return err + } } partsLock.Lock() diff --git a/storagev2/uploader/schedulers_test.go b/storagev2/uploader/schedulers_test.go index 6ddc4c3d..4596c28d 100644 --- a/storagev2/uploader/schedulers_test.go +++ b/storagev2/uploader/schedulers_test.go @@ -234,22 +234,23 @@ func TestMultiPartsUploaderScheduler(t *testing.T) { var lastUploaded [2]uint64 var uploadedPartSizes [2]uint64 uploadedParts, err := scheduler.UploadParts(context.Background(), initializedPart, src, &UploadPartsOptions{ - OnUploadingProgress: func(partNumber uint64, uploaded uint64, partSize uint64) { - if partNumber == 1 && partSize != 4*1024*1024 { + OnUploadingProgress: func(partNumber uint64, progress *UploadingPartProgress) { + if partNumber == 1 && progress.PartSize != 4*1024*1024 { t.Fatalf("unexpected partSize") - } else if partNumber == 2 && partSize != 1024*1024 { + } else if partNumber == 2 && progress.PartSize != 1024*1024 { t.Fatalf("unexpected partSize") - } else if uploaded < lastUploaded[partNumber-1] || uploaded > partSize { + } else if progress.Uploaded < lastUploaded[partNumber-1] || progress.Uploaded > progress.PartSize { t.Fatalf("unexpected uploaded") } - lastUploaded[partNumber-1] = uploaded + lastUploaded[partNumber-1] = progress.Uploaded }, - OnPartUploaded: func(partNumber uint64, partSize uint64) { - if uploadedPartSizes[partNumber-1] > 0 { + OnPartUploaded: func(part UploadedPart) error { + if uploadedPartSizes[part.PartNumber()-1] > 0 { t.Fatalf("unexpected OnPartUploaded call") } else { - uploadedPartSizes[partNumber-1] = partSize + uploadedPartSizes[part.PartNumber()-1] = part.PartSize() } + return nil }, }) if err != nil { diff --git a/storagev2/uploader/upload_manager.go b/storagev2/uploader/upload_manager.go index 8dfc48ac..0ec29e1a 100644 --- a/storagev2/uploader/upload_manager.go +++ b/storagev2/uploader/upload_manager.go @@ -151,13 +151,13 @@ func (uploadManager *UploadManager) UploadDirectory(ctx context.Context, directo directoryOptions.BeforeObjectUpload(path, &objectOptions) } if directoryOptions.OnUploadingProgress != nil { - objectOptions.OnUploadingProgress = func(uploaded, totalSize uint64) { - directoryOptions.OnUploadingProgress(path, uploaded, totalSize) + objectOptions.OnUploadingProgress = func(progress *UploadingProgress) { + directoryOptions.OnUploadingProgress(path, progress) } } err = uploadManager.UploadFile(ctx, path, &objectOptions, nil) if err == nil && directoryOptions.OnObjectUploaded != nil { - directoryOptions.OnObjectUploaded(path, uint64(info.Size())) + directoryOptions.OnObjectUploaded(path, &UploadedObjectInfo{Size: uint64(info.Size())}) } } else if directoryOptions.ShouldCreateDirectory && info.IsDir() { if !strings.HasSuffix(objectName, pathSeparator) { diff --git a/storagev2/uploader/upload_manager_test.go b/storagev2/uploader/upload_manager_test.go index 2c5ff5b1..06434b7e 100644 --- a/storagev2/uploader/upload_manager_test.go +++ b/storagev2/uploader/upload_manager_test.go @@ -589,22 +589,22 @@ func testUploadManagerUploadDirectory(t *testing.T, createDirectory bool) { t.Fatalf("unexpected filePath") } }, - OnUploadingProgress: func(filePath string, uploaded uint64, totalSize uint64) { - if totalSize != 1024*1024 { + OnUploadingProgress: func(filePath string, progress *uploader.UploadingProgress) { + if progress.TotalSize != 1024*1024 { t.Fatalf("unexpected totalSize") } if lastUploadedValue, ok := localFiles.Load(filePath); !ok { t.Fatalf("unexpected filePath") } else if lastUploaded, ok := lastUploadedValue.(uint64); !ok { t.Fatalf("unexpected filePath") - } else if uploaded < lastUploaded { + } else if progress.Uploaded < lastUploaded { t.Fatalf("unexpected uploaded") } else { - localFiles.Store(filePath, uploaded) + localFiles.Store(filePath, progress.Uploaded) } }, - OnObjectUploaded: func(filePath string, size uint64) { - if size != 1024*1024 { + OnObjectUploaded: func(filePath string, info *uploader.UploadedObjectInfo) { + if info.Size != 1024*1024 { t.Fatalf("unexpected size") } if _, ok := localFiles.Load(filePath); !ok { diff --git a/storagev2/uploader/uploaders.go b/storagev2/uploader/uploaders.go index e7510be8..5566f8e4 100644 --- a/storagev2/uploader/uploaders.go +++ b/storagev2/uploader/uploaders.go @@ -77,8 +77,14 @@ func (uploader formUploader) UploadFile(ctx context.Context, path string, object if err != nil { return err } + var onRequestProgress func(uploaded, totalSize uint64) + if onUploadingProgress := objectOptions.OnUploadingProgress; onUploadingProgress != nil { + onRequestProgress = func(uploaded, totalSize uint64) { + onUploadingProgress(&UploadingProgress{Uploaded: uploaded, TotalSize: totalSize}) + } + } return uploader.upload(ctx, file, fileSize, upToken, objectOptions.BucketName, objectOptions.ObjectName, objectOptions.FileName, objectOptions.ContentType, - crc32, mergeCustomVarsAndMetadata(objectOptions.Metadata, objectOptions.CustomVars), objectOptions.OnUploadingProgress, returnValue) + crc32, mergeCustomVarsAndMetadata(objectOptions.Metadata, objectOptions.CustomVars), onRequestProgress, returnValue) } func (uploader formUploader) UploadReader(ctx context.Context, reader io.Reader, objectOptions *ObjectOptions, returnValue interface{}) error { @@ -110,8 +116,14 @@ func (uploader formUploader) UploadReader(ctx context.Context, reader io.Reader, if err != nil { return err } + var onRequestProgress func(uploaded, totalSize uint64) + if onUploadingProgress := objectOptions.OnUploadingProgress; onUploadingProgress != nil { + onRequestProgress = func(uploaded, totalSize uint64) { + onUploadingProgress(&UploadingProgress{Uploaded: uploaded, TotalSize: totalSize}) + } + } return uploader.upload(ctx, rsc, size, upToken, objectOptions.BucketName, objectOptions.ObjectName, objectOptions.FileName, objectOptions.ContentType, - crc32, mergeCustomVarsAndMetadata(objectOptions.Metadata, objectOptions.CustomVars), objectOptions.OnUploadingProgress, returnValue) + crc32, mergeCustomVarsAndMetadata(objectOptions.Metadata, objectOptions.CustomVars), onRequestProgress, returnValue) } func (uploader formUploader) upload( @@ -282,13 +294,14 @@ func (uploader multiPartsUploader) uploadPartsAndComplete(ctx context.Context, s var uploadPartsOptions UploadPartsOptions if objectOptions.OnUploadingProgress != nil { progress := newUploadingPartsProgress() - uploadPartsOptions.OnUploadingProgress = func(partNumber, uploaded, _ uint64) { - progress.setPartUploadingProgress(partNumber, uploaded) - objectOptions.OnUploadingProgress(progress.totalUploaded(), size) + uploadPartsOptions.OnUploadingProgress = func(partNumber uint64, p *UploadingPartProgress) { + progress.setPartUploadingProgress(partNumber, p.Uploaded) + objectOptions.OnUploadingProgress(&UploadingProgress{Uploaded: progress.totalUploaded(), TotalSize: size}) } - uploadPartsOptions.OnPartUploaded = func(partNumber, partSize uint64) { - progress.partUploaded(partNumber, partSize) - objectOptions.OnUploadingProgress(progress.totalUploaded(), size) + uploadPartsOptions.OnPartUploaded = func(part UploadedPart) error { + progress.partUploaded(part.PartNumber(), part.PartSize()) + objectOptions.OnUploadingProgress(&UploadingProgress{Uploaded: progress.totalUploaded(), TotalSize: size}) + return nil } } uploadParts, err := uploader.scheduler.UploadParts(ctx, initializedParts, src, &uploadPartsOptions)