Skip to content

Commit

Permalink
update callback function interfaces
Browse files Browse the repository at this point in the history
  • Loading branch information
bachue committed Aug 1, 2024
1 parent f23754d commit 8d7f9b3
Show file tree
Hide file tree
Showing 20 changed files with 198 additions and 128 deletions.
4 changes: 2 additions & 2 deletions storage/form_upload.go
Original file line number Diff line number Diff line change
Expand Up @@ -198,8 +198,8 @@ func (p *FormUploader) newObjectParams(upToken string, key string, hasKey bool,
}
objectParams.Metadata, objectParams.CustomVars = splitParams(extra.Params)
if extra.OnProgress != nil {
objectParams.OnUploadingProgress = func(uploaded, fileSize uint64) {
extra.OnProgress(int64(fileSize), int64(uploaded))
objectParams.OnUploadingProgress = func(progress *uploader.UploadingProgress) {
extra.OnProgress(int64(progress.TotalSize), int64(progress.Uploaded))
}
}
return &objectParams
Expand Down
15 changes: 10 additions & 5 deletions storagev2/downloader/download_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -74,10 +74,10 @@ type (
BeforeObjectDownload func(objectName string, objectOptions *ObjectOptions)

// 下载进度
OnDownloadingProgress func(objectName string, downloaded, totalSize uint64)
OnDownloadingProgress func(objectName string, progress *DownloadingProgress)

// 对象下载成功后回调
OnObjectDownloaded func(objectName string, size uint64)
OnObjectDownloaded func(objectName string, info *DownloadedObjectInfo)

// 是否下载指定对象
ShouldDownloadObject func(objectName string) bool
Expand All @@ -86,6 +86,11 @@ type (
PathSeparator string
}

// 已经下载的对象信息
DownloadedObjectInfo struct {
Size uint64 // 对象大小
}

writeSeekCloser struct {
w io.Writer
}
Expand Down Expand Up @@ -202,8 +207,8 @@ func (downloadManager *DownloadManager) DownloadDirectory(ctx context.Context, t
g.Go(func() error {
var destinationDownloadOptions DestinationDownloadOptions
if onDownloadingProgress := options.OnDownloadingProgress; onDownloadingProgress != nil {
destinationDownloadOptions.OnDownloadingProgress = func(downloaded, totalSize uint64) {
onDownloadingProgress(objectName, downloaded, totalSize)
destinationDownloadOptions.OnDownloadingProgress = func(progress *DownloadingProgress) {
onDownloadingProgress(objectName, progress)
}
}
objectOptions := ObjectOptions{
Expand All @@ -222,7 +227,7 @@ func (downloadManager *DownloadManager) DownloadDirectory(ctx context.Context, t
}
n, err := downloadManager.DownloadToFile(ctx, objectName, fullPath, &objectOptions)
if err == nil && options.OnObjectDownloaded != nil {
options.OnObjectDownloaded(objectName, n)
options.OnObjectDownloaded(objectName, &DownloadedObjectInfo{Size: n})
}
return err
})
Expand Down
6 changes: 3 additions & 3 deletions storagev2/downloader/downloaders.go
Original file line number Diff line number Diff line change
Expand Up @@ -133,7 +133,7 @@ func (downloader concurrentDownloader) Download(ctx context.Context, urlsIter UR
var progress func(uint64)
if onDownloadingProgress := options.OnDownloadingProgress; onDownloadingProgress != nil {
progress = func(downloaded uint64) {
onDownloadingProgress(downloaded, 0)
onDownloadingProgress(&DownloadingProgress{Downloaded: downloaded})
}
}
return downloadToPartReader(ctx, urlsIter, etag, options.Header, downloader.client, dest, progress)
Expand Down Expand Up @@ -198,13 +198,13 @@ func (downloader concurrentDownloader) Download(ctx context.Context, urlsIter UR
n, err := downloader.downloadToPart(ctx, urlsIterClone, etag, offset, options.Header, p, writeableMedium, &downloadingProgressMutex, func(downloaded uint64) {
downloadingProgress.setPartDownloadingProgress(p.Offset(), downloaded)
if onDownloadingProgress := options.OnDownloadingProgress; onDownloadingProgress != nil {
onDownloadingProgress(downloadingProgress.totalDownloaded(), needToDownload)
onDownloadingProgress(&DownloadingProgress{Downloaded: downloadingProgress.totalDownloaded(), TotalSize: needToDownload})
}
})
if n > 0 {
downloadingProgress.partDownloaded(p.Offset(), n)
if onDownloadingProgress := options.OnDownloadingProgress; onDownloadingProgress != nil {
onDownloadingProgress(downloadingProgress.totalDownloaded(), needToDownload)
onDownloadingProgress(&DownloadingProgress{Downloaded: downloadingProgress.totalDownloaded(), TotalSize: needToDownload})
}
}
return err
Expand Down
40 changes: 20 additions & 20 deletions storagev2/downloader/downloaders_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -103,12 +103,12 @@ func TestConcurrentDownloaderWithSinglePart(t *testing.T) {
downloader.NewURLsIter([]*url.URL{url1, url2, url3}),
destination.NewWriteCloserDestination(&buf, ""),
&downloader.DestinationDownloadOptions{
OnDownloadingProgress: func(downloaded, totalSize uint64) {
if downloaded < lastDownloaded {
OnDownloadingProgress: func(progress *downloader.DownloadingProgress) {
if progress.Downloaded < lastDownloaded {
t.Fatalf("unexpected downloaded progress")
}
lastDownloaded = downloaded
if totalSize != 1024*1024 {
lastDownloaded = progress.Downloaded
if progress.TotalSize != 1024*1024 {
t.Fatalf("unexpected downloaded progress")
}
},
Expand Down Expand Up @@ -200,12 +200,12 @@ func TestConcurrentDownloaderWithCompression(t *testing.T) {
context.Background(),
downloader.NewURLsIter([]*url.URL{url1}),
destination.NewWriteCloserDestination(&buf, ""), &downloader.DestinationDownloadOptions{
OnDownloadingProgress: func(downloaded, totalSize uint64) {
if downloaded < lastDownloaded {
OnDownloadingProgress: func(progress *downloader.DownloadingProgress) {
if progress.Downloaded < lastDownloaded {
t.Fatalf("unexpected downloaded progress")
}
lastDownloaded = downloaded
if totalSize != 0 {
lastDownloaded = progress.Downloaded
if progress.TotalSize != 0 {
t.Fatalf("unexpected downloaded progress")
}
},
Expand Down Expand Up @@ -285,12 +285,12 @@ func TestConcurrentDownloaderWithMultipleParts(t *testing.T) {
downloader.NewURLsIter([]*url.URL{url1}),
dest,
&downloader.DestinationDownloadOptions{
OnDownloadingProgress: func(downloaded, totalSize uint64) {
if downloaded < atomic.LoadUint64(&lastDownloaded) {
OnDownloadingProgress: func(progress *downloader.DownloadingProgress) {
if progress.Downloaded < atomic.LoadUint64(&lastDownloaded) {
t.Fatalf("unexpected downloaded progress")
}
atomic.StoreUint64(&lastDownloaded, downloaded)
if totalSize != SIZE {
atomic.StoreUint64(&lastDownloaded, progress.Downloaded)
if progress.TotalSize != SIZE {
t.Fatalf("unexpected downloaded progress")
}
},
Expand Down Expand Up @@ -383,12 +383,12 @@ func TestConcurrentDownloaderWithMultiplePartsAndRange(t *testing.T) {
dest,
&downloader.DestinationDownloadOptions{
Header: http.Header{"Range": []string{fmt.Sprintf("bytes=%d-", SIZE-REQUEST_SIZE)}},
OnDownloadingProgress: func(downloaded, totalSize uint64) {
if downloaded < atomic.LoadUint64(&lastDownloaded) {
OnDownloadingProgress: func(progress *downloader.DownloadingProgress) {
if progress.Downloaded < atomic.LoadUint64(&lastDownloaded) {
t.Fatalf("unexpected downloaded progress")
}
atomic.StoreUint64(&lastDownloaded, downloaded)
if totalSize != REQUEST_SIZE {
atomic.StoreUint64(&lastDownloaded, progress.Downloaded)
if progress.TotalSize != REQUEST_SIZE {
t.Fatalf("unexpected downloaded progress")
}
},
Expand Down Expand Up @@ -533,12 +533,12 @@ func TestConcurrentDownloaderWithResumableRecorder(t *testing.T) {
downloader.NewURLsIter([]*url.URL{url1}),
dest,
&downloader.DestinationDownloadOptions{
OnDownloadingProgress: func(downloaded, totalSize uint64) {
if downloaded < lastDownloaded {
OnDownloadingProgress: func(progress *downloader.DownloadingProgress) {
if progress.Downloaded < lastDownloaded {
t.Fatalf("unexpected downloaded progress")
}
lastDownloaded = downloaded
if totalSize != 10*1024*1024 {
lastDownloaded = progress.Downloaded
if progress.TotalSize != 10*1024*1024 {
t.Fatalf("unexpected downloaded progress")
}
},
Expand Down
8 changes: 7 additions & 1 deletion storagev2/downloader/interfaces.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,12 +32,18 @@ type (
Sign(context.Context, *url.URL, *SignOptions) error
}

// 下载进度
DownloadingProgress struct {
Downloaded uint64 // 已经下载的数据量,单位为字节
TotalSize uint64 // 总数据量,单位为字节
}

// 目标下载选项
DestinationDownloadOptions struct {
// 对象下载附加 HTTP Header
Header http.Header
// 对象下载进度
OnDownloadingProgress func(downloaded, totalSize uint64)
OnDownloadingProgress func(*DownloadingProgress)
// 对象 Header 获取回调
OnResponseHeader func(http.Header)
}
Expand Down
1 change: 1 addition & 0 deletions storagev2/objects/batch.go
Original file line number Diff line number Diff line change
Expand Up @@ -398,6 +398,7 @@ func (wm *workersManager) asyncWorker(ctx internal_context.Context, id uint) {
for getCtxError(wm.parentCtx) == nil {
if operations := wm.requestsManager.takeOperations(); len(operations) > 0 {
if operations, err := wm.doOperations(ctx, operations); err != nil {
// 确定错误是否可以重试
wm.requestsManager.putBackOperations(operations)
if isTimeoutError(err) { // 超时,说明 batchSize 过大
wm.requestsManager.handleTimeoutError()
Expand Down
7 changes: 7 additions & 0 deletions storagev2/objects/lister.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,9 @@ type (

// 获取错误信息
Error() error

// 获取位置标记
Marker() string
}

listerV1 struct {
Expand Down Expand Up @@ -63,6 +66,10 @@ func (v1 *listerV1) Next(object *ObjectDetails) bool {
return true
}

func (v1 *listerV1) Marker() string {
return v1.marker
}

func (v1 *listerV1) callListApi() error {
if v1.marker == "" && !v1.firstCall {
return nil
Expand Down
8 changes: 4 additions & 4 deletions storagev2/uploader/form_uploader_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -110,12 +110,12 @@ func TestFormUploader(t *testing.T) {
ContentType: "application/json",
Metadata: map[string]string{"a": "b", "c": "d"},
CustomVars: map[string]string{"a": "b", "c": "d"},
OnUploadingProgress: func(uploaded, fileSize uint64) {
if fileSize != 1024*1024 {
OnUploadingProgress: func(progress *UploadingProgress) {
if progress.TotalSize != 1024*1024 {
t.Fatalf("unexpected file size")
} else if uploaded > fileSize {
} else if progress.Uploaded > progress.TotalSize {
t.Fatalf("unexpected uploaded")
} else if lu := atomic.SwapUint64(&lastUploaded, uploaded); lu > uploaded || lu > fileSize {
} else if lu := atomic.SwapUint64(&lastUploaded, progress.Uploaded); lu > progress.Uploaded || lu > progress.TotalSize {
t.Fatalf("unexpected uploaded")
}
},
Expand Down
6 changes: 6 additions & 0 deletions storagev2/uploader/interfaces.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,8 +46,14 @@ type (

// 已经上传的分片
UploadedPart interface {
// 分片编号
PartNumber() uint64

// 分片偏移量
Offset() uint64

// 分片大小
PartSize() uint64
}

// 分片上传调度器
Expand Down
22 changes: 11 additions & 11 deletions storagev2/uploader/multi_parts_uploader_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -220,12 +220,12 @@ func TestMultiPartsUploader(t *testing.T) {
ContentType: "application/json",
Metadata: map[string]string{"a": "b", "c": "d"},
CustomVars: map[string]string{"a": "b", "c": "d"},
OnUploadingProgress: func(uploaded uint64, fileSize uint64) {
if fileSize != 5*1024*1024 {
OnUploadingProgress: func(progress *UploadingProgress) {
if progress.TotalSize != 5*1024*1024 {
t.Fatalf("unexpected file size")
} else if uploaded > fileSize {
} else if progress.Uploaded > progress.TotalSize {
t.Fatalf("unexpected uploaded")
} else if lu := atomic.SwapUint64(&lastUploaded, uploaded); lu > uploaded || lu > fileSize {
} else if lu := atomic.SwapUint64(&lastUploaded, progress.Uploaded); lu > progress.Uploaded || lu > progress.TotalSize {
t.Fatalf("unexpected uploaded")
}
},
Expand Down Expand Up @@ -414,12 +414,12 @@ func TestMultiPartsUploaderResuming(t *testing.T) {
ContentType: "application/json",
Metadata: map[string]string{"a": "b", "c": "d"},
CustomVars: map[string]string{"a": "b", "c": "d"},
OnUploadingProgress: func(uploaded uint64, fileSize uint64) {
if fileSize != 5*1024*1024 {
OnUploadingProgress: func(progress *UploadingProgress) {
if progress.TotalSize != 5*1024*1024 {
t.Fatalf("unexpected file size")
} else if uploaded > fileSize {
} else if progress.Uploaded > progress.TotalSize {
t.Fatalf("unexpected uploaded")
} else if lu := atomic.SwapUint64(&lastUploaded, uploaded); lu > uploaded || lu > fileSize {
} else if lu := atomic.SwapUint64(&lastUploaded, progress.Uploaded); lu > progress.Uploaded || lu > progress.TotalSize {
t.Fatalf("unexpected uploaded")
}
},
Expand Down Expand Up @@ -644,11 +644,11 @@ func TestMultiPartsUploaderRetry(t *testing.T) {
ContentType: "application/json",
Metadata: map[string]string{"a": "b", "c": "d"},
CustomVars: map[string]string{"a": "b", "c": "d"},
OnUploadingProgress: func(uploaded uint64, fileSize uint64) {
if fileSize != 5*1024*1024 {
OnUploadingProgress: func(progress *UploadingProgress) {
if progress.TotalSize != 5*1024*1024 {
t.Fatalf("unexpected file size")
}
atomic.StoreUint64(&lastUploaded, uploaded)
atomic.StoreUint64(&lastUploaded, progress.Uploaded)
},
}, &returnValue); err != nil {
t.Fatal(err)
Expand Down
36 changes: 22 additions & 14 deletions storagev2/uploader/multi_parts_uploader_v1.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,9 +28,9 @@ type (
}

multiPartsUploaderV1UploadedPart struct {
ctx string
crc32 uint32
offset, size uint64
ctx string
crc32 uint32
partNumber, offset, size uint64
}

resumedMultiPartsUploaderV1Record struct {
Expand Down Expand Up @@ -103,13 +103,14 @@ func (uploader *multiPartsUploaderV1) UploadPart(ctx context.Context, initialize
if record, ok := initializedParts.records[part.PartNumber()]; ok {
if record.offset == part.Offset() && record.size == part.Size() {
if options != nil && options.OnUploadingProgress != nil {
options.OnUploadingProgress(record.size, record.size)
options.OnUploadingProgress(&UploadingPartProgress{Uploaded: record.size, PartSize: record.size})
}
return multiPartsUploaderV1UploadedPart{
ctx: record.ctx,
crc32: record.crc32,
offset: record.offset,
size: record.size,
ctx: record.ctx,
crc32: record.crc32,
offset: record.offset,
size: record.size,
partNumber: part.PartNumber(),
}, nil
}
}
Expand All @@ -122,7 +123,9 @@ func (uploader *multiPartsUploaderV1) uploadPart(ctx context.Context, initialize
OverwrittenRegion: initialized.multiPartsObjectOptions.RegionsProvider,
}
if options != nil && options.OnUploadingProgress != nil {
apisOptions.OnRequestProgress = options.OnUploadingProgress
apisOptions.OnRequestProgress = func(uploaded, totalSize uint64) {
options.OnUploadingProgress(&UploadingPartProgress{Uploaded: uploaded, PartSize: totalSize})
}
}
upToken, err := getUpToken(uploader.options.Credentials, &initialized.multiPartsObjectOptions.ObjectOptions, uploader.options.UpTokenProvider)
if err != nil {
Expand Down Expand Up @@ -161,10 +164,11 @@ func (uploader *multiPartsUploaderV1) uploadPart(ctx context.Context, initialize
}

return multiPartsUploaderV1UploadedPart{
ctx: response.Ctx,
crc32: uint32(response.Crc32),
offset: part.Offset(),
size: part.Size(),
ctx: response.Ctx,
crc32: uint32(response.Crc32),
offset: part.Offset(),
size: part.Size(),
partNumber: part.PartNumber(),
}, nil
}

Expand Down Expand Up @@ -229,7 +233,11 @@ func (uploadedPart multiPartsUploaderV1UploadedPart) Offset() uint64 {
return uploadedPart.offset
}

func (uploadedPart multiPartsUploaderV1UploadedPart) Size() uint64 {
func (uploadedPart multiPartsUploaderV1UploadedPart) PartNumber() uint64 {
return uploadedPart.partNumber
}

func (uploadedPart multiPartsUploaderV1UploadedPart) PartSize() uint64 {
return uploadedPart.size
}

Expand Down
Loading

0 comments on commit 8d7f9b3

Please sign in to comment.