diff --git a/CHANGELOG.md b/CHANGELOG.md index c33f330..dec93d7 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -7,6 +7,7 @@ - 支持缓存多个 `v4/query` 查询结果 - 上传重试支持切换上传域名 - 支持上传加速域名 +- 增加上传进度回调支持 ## v7.7.0 (2023-12-25) diff --git a/qiniu/http.c b/qiniu/http.c index 8a8a215..ce5c31e 100644 --- a/qiniu/http.c +++ b/qiniu/http.c @@ -553,12 +553,30 @@ static Qiniu_Error Qiniu_Client_callWithBody( Qiniu_Error Qiniu_Client_CallWithMethod( Qiniu_Client *self, Qiniu_Json **ret, const char *url, Qiniu_Reader body, Qiniu_Int64 bodyLen, const char *mimeType, const char *httpMethod, const char *md5) +{ + return Qiniu_Client_CallWithMethodAndProgressCallback(self, ret, url, body, bodyLen, mimeType, httpMethod, md5, NULL, NULL); +} + +Qiniu_Error Qiniu_Client_CallWithMethodAndProgressCallback( + Qiniu_Client *self, Qiniu_Json **ret, const char *url, + Qiniu_Reader body, Qiniu_Int64 bodyLen, const char *mimeType, const char *httpMethod, const char *md5, + int (*callback)(void *, double, double, double, double), void *callbackData) { CURL *curl = Qiniu_Client_initcall_withMethod(self, url, httpMethod); curl_easy_setopt(curl, CURLOPT_INFILESIZE_LARGE, bodyLen); curl_easy_setopt(curl, CURLOPT_READFUNCTION, body.Read); curl_easy_setopt(curl, CURLOPT_READDATA, body.self); + if (callback != NULL) + { + curl_easy_setopt(curl, CURLOPT_PROGRESSFUNCTION, callback); + curl_easy_setopt(curl, CURLOPT_PROGRESSDATA, callbackData); + curl_easy_setopt(curl, CURLOPT_NOPROGRESS, 0); + } + else + { + curl_easy_setopt(curl, CURLOPT_NOPROGRESS, 1); + } return Qiniu_Client_callWithBody(self, ret, url, NULL, bodyLen, mimeType, md5); } @@ -566,12 +584,30 @@ Qiniu_Error Qiniu_Client_CallWithMethod( Qiniu_Error Qiniu_Client_CallWithBinary( Qiniu_Client *self, Qiniu_Json **ret, const char *url, Qiniu_Reader body, Qiniu_Int64 bodyLen, const char *mimeType) +{ + return Qiniu_Client_CallWithBinaryAndProgressCallback(self, ret, url, body, bodyLen, mimeType, NULL, NULL); +} + +Qiniu_Error Qiniu_Client_CallWithBinaryAndProgressCallback( + Qiniu_Client *self, Qiniu_Json **ret, const char *url, + Qiniu_Reader body, Qiniu_Int64 bodyLen, const char *mimeType, + int (*callback)(void *, double, double, double, double), void *callbackData) { CURL *curl = Qiniu_Client_initcall(self, url); curl_easy_setopt(curl, CURLOPT_INFILESIZE_LARGE, bodyLen); curl_easy_setopt(curl, CURLOPT_READFUNCTION, body.Read); curl_easy_setopt(curl, CURLOPT_READDATA, body.self); + if (callback != NULL) + { + curl_easy_setopt(curl, CURLOPT_PROGRESSFUNCTION, callback); + curl_easy_setopt(curl, CURLOPT_PROGRESSDATA, callbackData); + curl_easy_setopt(curl, CURLOPT_NOPROGRESS, 0); + } + else + { + curl_easy_setopt(curl, CURLOPT_NOPROGRESS, 1); + } return Qiniu_Client_callWithBody(self, ret, url, NULL, bodyLen, mimeType, NULL); } diff --git a/qiniu/http.h b/qiniu/http.h index 45f73d4..c64f160 100644 --- a/qiniu/http.h +++ b/qiniu/http.h @@ -177,6 +177,16 @@ extern "C" QINIU_DLLAPI extern Qiniu_Error Qiniu_Client_CallWithMethod( Qiniu_Client *self, Qiniu_Json **ret, const char *url, Qiniu_Reader body, Qiniu_Int64 bodyLen, const char *mimeType, const char *httpMethod, const char *md5); + + QINIU_DLLAPI extern Qiniu_Error Qiniu_Client_CallWithBinaryAndProgressCallback( + Qiniu_Client *self, Qiniu_Json **ret, const char *url, + Qiniu_Reader body, Qiniu_Int64 bodyLen, const char *mimeType, + int (*callback)(void *, double, double, double, double), void *callbackData); + + QINIU_DLLAPI extern Qiniu_Error Qiniu_Client_CallWithMethodAndProgressCallback( + Qiniu_Client *self, Qiniu_Json **ret, const char *url, + Qiniu_Reader body, Qiniu_Int64 bodyLen, const char *mimeType, const char *httpMethod, const char *md5, + int (*callback)(void *, double, double, double, double), void *callbackData); /*============================================================================*/ /* func Qiniu_Client_InitNoAuth/InitMacAuth */ diff --git a/qiniu/io.c b/qiniu/io.c index 4d1f99a..5ad35b5 100644 --- a/qiniu/io.c +++ b/qiniu/io.c @@ -152,6 +152,13 @@ Qiniu_Error Qiniu_Client_config(Qiniu_Client *self) return err; } +static int _Qiniu_Progress_Callback(void *clientp, double dltotal, double dlnow, double ultotal, double ulnow) +{ + void (*uploadingProgress)(size_t, size_t) = (void (*)(size_t, size_t))clientp; + uploadingProgress((size_t)ultotal, (size_t)ulnow); + return 0; +} + static Qiniu_Error Qiniu_Io_call( Qiniu_Client *self, const char *accessKey, const char *bucketName, Qiniu_Io_PutRet *ret, struct curl_httppost *formpost, Qiniu_Io_PutExtra *extra) @@ -187,6 +194,16 @@ static Qiniu_Error Qiniu_Io_call( curl_easy_setopt(curl, CURLOPT_URL, upHosts[retries % upHostsCount]); curl_easy_setopt(curl, CURLOPT_HTTPPOST, formpost); curl_easy_setopt(curl, CURLOPT_HTTPHEADER, headers); + if (extra->uploadingProgress != NULL) + { + curl_easy_setopt(curl, CURLOPT_PROGRESSFUNCTION, _Qiniu_Progress_Callback); + curl_easy_setopt(curl, CURLOPT_PROGRESSDATA, extra->uploadingProgress); + curl_easy_setopt(curl, CURLOPT_NOPROGRESS, 0); + } + else + { + curl_easy_setopt(curl, CURLOPT_NOPROGRESS, 1); + } //// For aborting uploading file. if (extra->upAbortCallback) @@ -379,6 +396,17 @@ static Qiniu_Error Qiniu_Io_call_with_callback( curl_easy_setopt(curl, CURLOPT_HTTPHEADER, headers); curl_easy_setopt(curl, CURLOPT_READFUNCTION, rdr); + if (extra->uploadingProgress != NULL) + { + curl_easy_setopt(curl, CURLOPT_PROGRESSFUNCTION, _Qiniu_Progress_Callback); + curl_easy_setopt(curl, CURLOPT_PROGRESSDATA, extra->uploadingProgress); + curl_easy_setopt(curl, CURLOPT_NOPROGRESS, 0); + } + else + { + curl_easy_setopt(curl, CURLOPT_NOPROGRESS, 1); + } + err = Qiniu_callex(curl, &self->b, &self->root, Qiniu_False, &self->respHeader); if (err.code == 200) { diff --git a/qiniu/io.h b/qiniu/io.h index d98c3cf..e5eb036 100644 --- a/qiniu/io.h +++ b/qiniu/io.h @@ -61,6 +61,9 @@ extern "C" // Specify multiple upHosts const char *const *upHosts; size_t upHostsCount; + + // Uploading file progress + void (*uploadingProgress)(size_t ultotal, size_t ulnow); } Qiniu_Io_PutExtra; /*============================================================================*/ diff --git a/qiniu/multipart_upload.c b/qiniu/multipart_upload.c index 500159f..d217b08 100644 --- a/qiniu/multipart_upload.c +++ b/qiniu/multipart_upload.c @@ -50,6 +50,12 @@ typedef struct int totalPartNum; } Qiniu_UploadParts_Ret; +struct _Qiniu_Progress_Callback_Data +{ + size_t base, totalSize, previousUlNow; + void (*callback)(size_t, size_t); +}; + static Qiniu_Error readMedium(struct Qiniu_Record_Medium *medium, char **uploadId, Qiniu_Uint64 *expireAt, Qiniu_UploadPartResp *ret); static Qiniu_Error writeMedium(struct Qiniu_Record_Medium *medium, const Qiniu_InitPart_Ret *, const Qiniu_UploadPartResp *); static Qiniu_Error initializeRecorder(Qiniu_Multipart_PutExtra *param, const char *uptoken, const char *key, const char *fileName, Qiniu_FileInfo *fi, Qiniu_Record_Medium *medium, Qiniu_Multipart_Recorder *recorder); @@ -166,13 +172,31 @@ static Qiniu_Error init_upload(Qiniu_Client *client, const char *bucket, const c return err; } +static int _Qiniu_Progress_Callback(void *clientp, double dltotal, double dlnow, double ultotal, double ulnow) +{ + struct _Qiniu_Progress_Callback_Data *data = (struct _Qiniu_Progress_Callback_Data *)clientp; + if (data->previousUlNow != (size_t)ulnow) + { + data->callback((size_t)data->totalSize, data->base + (size_t)ulnow); + data->previousUlNow = (size_t)ulnow; + } + return 0; +} + Qiniu_Error upload_one_part(Qiniu_Client *client, Qiniu_Multipart_PutExtra *extraParam, const char *path, int partNum, Qiniu_ReaderAt reader, Qiniu_Int64 partOffset, Qiniu_Int64 partSize, const char *md5str, - Qiniu_UploadPartResp *ret) + Qiniu_UploadPartResp *ret, struct _Qiniu_Progress_Callback_Data *progressCallback) { Qiniu_Error err = Qiniu_OK; const char *const *upHosts; size_t upHostsCount; + int (*callback)(void *, double, double, double, double) = NULL; + void *callbackData = NULL; + if (progressCallback != NULL && progressCallback->callback != NULL) + { + callback = _Qiniu_Progress_Callback; + callbackData = (void *)progressCallback; + } if (extraParam->upHost != NULL) { @@ -192,7 +216,7 @@ Qiniu_Error upload_one_part(Qiniu_Client *client, Qiniu_Multipart_PutExtra *extr Qiniu_Json *callRet = NULL; // don't cJSON_Delete(callRet), it will be automatically freed on next http request by Qiniu_Client_Call. const char *upHost = upHosts[tries % upHostsCount]; const char *reqUrl = Qiniu_String_Concat(upHost, path, NULL); - err = Qiniu_Client_CallWithMethod(client, &callRet, reqUrl, thisPartBody, partSize, NULL, "PUT", md5str); + err = Qiniu_Client_CallWithMethodAndProgressCallback(client, &callRet, reqUrl, thisPartBody, partSize, NULL, "PUT", md5str, callback, callbackData); Qiniu_Free((void *)reqUrl); if (err.code == 200) { @@ -229,18 +253,17 @@ static Qiniu_Error upload_parts(Qiniu_Client *client, const char *bucket, const Qiniu_Error err = Qiniu_OK; Qiniu_Int64 partSize = extraParam->partSize; const int lastPart = totalPartNum - 1; - for (int partNum = 0; partNum < totalPartNum; partNum++) - { - if ((uploadPartsRet->PartsRet + partNum)->etag != NULL) - { - continue; - } + struct _Qiniu_Progress_Callback_Data progressCallbackData; + Qiniu_Zero(progressCallbackData); - const int partNumInReq = partNum + 1; // partNum start from 1 - char partNumStr[10]; // valid partNum ={"1"~"10000"} - snprintf(partNumStr, 10, "%d", partNumInReq); - const char *path = Qiniu_String_Concat("/buckets/", bucket, "/objects/", encodedKey, "/uploads/", initParts->uploadId, "/", partNumStr, NULL); + if (extraParam->uploadingProgress != NULL) + { + progressCallbackData.callback = extraParam->uploadingProgress; + progressCallbackData.totalSize = (size_t)fsize; + } + for (int partNum = 0; partNum < totalPartNum; partNum++) + { Qiniu_Int64 thisPartOffset = partNum * partSize; Qiniu_Int64 thisPartSize = partSize; if (partNum == lastPart) @@ -248,28 +271,36 @@ static Qiniu_Error upload_parts(Qiniu_Client *client, const char *bucket, const thisPartSize = fsize - (totalPartNum - 1) * partSize; } - const char *md5str = NULL; - if (extraParam->enableContentMd5) + if ((uploadPartsRet->PartsRet + partNum)->etag == NULL) { - md5str = caculatePartMd5(*reader, thisPartOffset, thisPartSize); - // Qiniu_Log_Debug("partNum:%d, local Md5:%s ", partNumInReq, md5str); - } - err = upload_one_part(client, extraParam, path, partNumInReq, *reader, thisPartOffset, thisPartSize, md5str, &uploadPartsRet->PartsRet[partNum]); - Qiniu_Multi_Free(2, (void *)path, (void *)md5str); + const int partNumInReq = partNum + 1; // partNum start from 1 + char partNumStr[10]; // valid partNum ={"1"~"10000"} + snprintf(partNumStr, 10, "%d", partNumInReq); + const char *path = Qiniu_String_Concat("/buckets/", bucket, "/objects/", encodedKey, "/uploads/", initParts->uploadId, "/", partNumStr, NULL); - if (err.code != 200) - { - return err; - } + const char *md5str = NULL; + if (extraParam->enableContentMd5) + { + md5str = caculatePartMd5(*reader, thisPartOffset, thisPartSize); + // Qiniu_Log_Debug("partNum:%d, local Md5:%s ", partNumInReq, md5str); + } + err = upload_one_part(client, extraParam, path, partNumInReq, *reader, thisPartOffset, thisPartSize, md5str, &uploadPartsRet->PartsRet[partNum], &progressCallbackData); + Qiniu_Multi_Free(2, (void *)path, (void *)md5str); - if (recorder != NULL && recorder->recorderMedium != NULL) - { - err = writeMedium(recorder->recorderMedium, initParts, &uploadPartsRet->PartsRet[partNum]); if (err.code != 200) { return err; } + if (recorder != NULL && recorder->recorderMedium != NULL) + { + err = writeMedium(recorder->recorderMedium, initParts, &uploadPartsRet->PartsRet[partNum]); + if (err.code != 200) + { + return err; + } + } } + progressCallbackData.base += thisPartSize; } return err; } diff --git a/qiniu/multipart_upload.h b/qiniu/multipart_upload.h index d8f069f..4c536d7 100644 --- a/qiniu/multipart_upload.h +++ b/qiniu/multipart_upload.h @@ -63,6 +63,9 @@ extern "C" // Specify multiple upHosts, if not set explicitly, will global QINIU_UP_HOST const char *const *upHosts; size_t upHostsCount; + + // Uploading file progress + void (*uploadingProgress)(size_t ultotal, size_t ulnow); } Qiniu_Multipart_PutExtra; typedef struct diff --git a/qiniu/resumable_io.c b/qiniu/resumable_io.c index 77b8128..1131554 100644 --- a/qiniu/resumable_io.c +++ b/qiniu/resumable_io.c @@ -16,6 +16,7 @@ #include "recorder_key.h" #include "recorder_utils.h" #include "../cJSON/cJSON.h" +#include "../hashmap/hashmap.h" #include "private/region.h" #include "private/code.h" @@ -278,7 +279,7 @@ static Qiniu_Error Qiniu_Rio_PutExtra_Init( } else { - memset(self, 0, sizeof(Qiniu_Rio_PutExtra)); + Qiniu_Zero_Ptr(self); } cbprog = sizeof(Qiniu_Rio_BlkputRet) * blockCnt; @@ -350,19 +351,133 @@ static void Qiniu_Io_PutExtra_initFrom(Qiniu_Io_PutExtra *self, Qiniu_Rio_PutExt } else { - memset(self, 0, sizeof(*self)); + Qiniu_Zero_Ptr(self); } } /*============================================================================*/ -static Qiniu_Error Qiniu_Rio_bput( - Qiniu_Client *self, Qiniu_Rio_BlkputRet *ret, Qiniu_Reader body, int bodyLength, const char *url) +struct _Qiniu_Uploading_Parts_Progress +{ + size_t uploaded; + struct hashmap *uploading; + Qiniu_Mutex mutex; +}; + +struct _Qiniu_Uploading_Parts_Progress_Pair +{ + int blkIdx; + size_t uploaded; +}; + +static uint64_t _Qiniu_Uploading_Parts_Progress_Hash(const void *item, uint64_t seed0, uint64_t seed1) +{ + const struct _Qiniu_Uploading_Parts_Progress_Pair *pair = (const struct _Qiniu_Uploading_Parts_Progress_Pair *)item; + return hashmap_sip(&pair->blkIdx, sizeof(pair->blkIdx), seed0, seed1); +} + +static int _Qiniu_Uploading_Parts_Progress_Compare(const void *a, const void *b, void *udata) +{ + const struct _Qiniu_Uploading_Parts_Progress_Pair *left = (const struct _Qiniu_Uploading_Parts_Progress_Pair *)a; + const struct _Qiniu_Uploading_Parts_Progress_Pair *right = (const struct _Qiniu_Uploading_Parts_Progress_Pair *)b; + return left->blkIdx - right->blkIdx; +} + +static struct _Qiniu_Uploading_Parts_Progress *_Qiniu_Uploading_Parts_Progress_New() +{ + struct _Qiniu_Uploading_Parts_Progress *data = (struct _Qiniu_Uploading_Parts_Progress *)malloc(sizeof(struct _Qiniu_Uploading_Parts_Progress)); + Qiniu_Zero_Ptr(data); + Qiniu_Mutex_Init(&data->mutex); + data->uploading = hashmap_new( + sizeof(struct _Qiniu_Uploading_Parts_Progress_Pair), 0, rand(), rand(), + _Qiniu_Uploading_Parts_Progress_Hash, _Qiniu_Uploading_Parts_Progress_Compare, + free, NULL); + return data; +} + +static void _Qiniu_Uploading_Parts_Progress_Free(struct _Qiniu_Uploading_Parts_Progress *data) +{ + hashmap_free(data->uploading); + Qiniu_Mutex_Cleanup(&data->mutex); + free((void *)data); +} + +static void _Qiniu_Uploading_Parts_Progress_Set_Progress(struct _Qiniu_Uploading_Parts_Progress *progress, int blkIdx, size_t uploaded) +{ + Qiniu_Mutex_Lock(&progress->mutex); + const struct _Qiniu_Uploading_Parts_Progress_Pair pair = {.blkIdx = blkIdx}; + struct _Qiniu_Uploading_Parts_Progress_Pair *pPair = (struct _Qiniu_Uploading_Parts_Progress_Pair *)hashmap_get(progress->uploading, &pair); + if (pPair == NULL) + { + pPair = (struct _Qiniu_Uploading_Parts_Progress_Pair *)malloc(sizeof(struct _Qiniu_Uploading_Parts_Progress_Pair)); + pPair->blkIdx = blkIdx; + pPair->uploaded = uploaded; + hashmap_set(progress->uploading, pPair); + } + else + { + pPair->uploaded = uploaded; + } + + Qiniu_Mutex_Unlock(&progress->mutex); +} + +static void _Qiniu_Uploading_Parts_Progress_Part_Uploaded(struct _Qiniu_Uploading_Parts_Progress *progress, int blkIdx, size_t partSize) +{ + Qiniu_Mutex_Lock(&progress->mutex); + const struct _Qiniu_Uploading_Parts_Progress_Pair pair = {.blkIdx = blkIdx}; + hashmap_delete(progress->uploading, (const void *)&pair); + progress->uploaded += partSize; + Qiniu_Mutex_Unlock(&progress->mutex); +} + +static size_t _Qiniu_Uploading_Parts_Progress_Get_Total_Size(struct _Qiniu_Uploading_Parts_Progress *progress) +{ + Qiniu_Mutex_Lock(&progress->mutex); + size_t total = progress->uploaded, iter = 0; + void *iter_val; + while (hashmap_iter(progress->uploading, &iter, &iter_val)) + { + struct _Qiniu_Uploading_Parts_Progress_Pair *pair = (struct _Qiniu_Uploading_Parts_Progress_Pair *)iter_val; + total += pair->uploaded; + } + Qiniu_Mutex_Unlock(&progress->mutex); + return total; +} + +struct _Qiniu_Progress_Callback_Data +{ + size_t totalSize, previousUlNow; + int blkIdx; + struct _Qiniu_Uploading_Parts_Progress *progress; + void (*callback)(size_t, size_t); +}; + +static int _Qiniu_Progress_Callback(void *clientp, double dltotal, double dlnow, double ultotal, double ulnow) +{ + struct _Qiniu_Progress_Callback_Data *data = (struct _Qiniu_Progress_Callback_Data *)clientp; + if (data->previousUlNow != (size_t)ulnow) + { + _Qiniu_Uploading_Parts_Progress_Set_Progress(data->progress, data->blkIdx, (size_t)ulnow); + data->callback((size_t)data->totalSize, _Qiniu_Uploading_Parts_Progress_Get_Total_Size(data->progress)); + data->previousUlNow = (size_t)ulnow; + } + return 0; +} + +static Qiniu_Error Qiniu_Rio_bput(Qiniu_Client *self, Qiniu_Rio_BlkputRet *ret, Qiniu_Reader body, int bodyLength, const char *url, struct _Qiniu_Progress_Callback_Data *progressCallback) { Qiniu_Rio_BlkputRet retFromResp; Qiniu_Json *root; + int (*callback)(void *, double, double, double, double) = NULL; + void *callbackData = NULL; + if (progressCallback != NULL && progressCallback->callback != NULL) + { + callback = _Qiniu_Progress_Callback; + callbackData = (void *)progressCallback; + } - Qiniu_Error err = Qiniu_Client_CallWithBinary(self, &root, url, body, bodyLength, NULL); + Qiniu_Error err = Qiniu_Client_CallWithBinaryAndProgressCallback(self, &root, url, body, bodyLength, NULL, callback, callbackData); if (err.code == 200) { retFromResp.ctx = Qiniu_Json_GetString(root, "ctx", NULL); @@ -386,20 +501,20 @@ static Qiniu_Error Qiniu_Rio_bput( } static Qiniu_Error Qiniu_Rio_Mkblock( - Qiniu_Client *self, Qiniu_Rio_BlkputRet *ret, int blkSize, const char *upHost, Qiniu_Reader body, int bodyLength) + Qiniu_Client *self, Qiniu_Rio_BlkputRet *ret, int blkSize, const char *upHost, Qiniu_Reader body, int bodyLength, struct _Qiniu_Progress_Callback_Data *progressCallback) { const char *url = Qiniu_String_Format(128, "%s/mkblk/%d", upHost, blkSize); - Qiniu_Error err = Qiniu_Rio_bput(self, ret, body, bodyLength, url); + Qiniu_Error err = Qiniu_Rio_bput(self, ret, body, bodyLength, url, progressCallback); Qiniu_Free((void *)url); return err; } static Qiniu_Error Qiniu_Rio_Blockput( - Qiniu_Client *self, Qiniu_Rio_BlkputRet *ret, const char *upHost, Qiniu_Reader body, int bodyLength) + Qiniu_Client *self, Qiniu_Rio_BlkputRet *ret, const char *upHost, Qiniu_Reader body, int bodyLength, struct _Qiniu_Progress_Callback_Data *progressCallback) { const char *url = Qiniu_String_Format(1024, "%s/bput/%s/%d", upHost, ret->ctx, (int)ret->offset); - Qiniu_Error err = Qiniu_Rio_bput(self, ret, body, bodyLength, url); + Qiniu_Error err = Qiniu_Rio_bput(self, ret, body, bodyLength, url, progressCallback); Qiniu_Free((void *)url); return err; } @@ -409,9 +524,10 @@ static Qiniu_Error Qiniu_Rio_Blockput( static Qiniu_Error ErrUnmatchedChecksum = { Qiniu_Rio_UnmatchedChecksum, "unmatched checksum"}; -static Qiniu_Error Qiniu_Rio_ResumableBlockput( - Qiniu_Client *c, Qiniu_Rio_BlkputRet *ret, Qiniu_ReaderAt f, int blkIdx, int blkSize, - Qiniu_Rio_PutExtra *extra, size_t *pChunksUploaded) +static Qiniu_Error +Qiniu_Rio_ResumableBlockput( + Qiniu_Client *c, Qiniu_Rio_BlkputRet *ret, Qiniu_ReaderAt f, int blkIdx, int blkSize, int fsize, + struct _Qiniu_Uploading_Parts_Progress *uploadingPartsProgress, Qiniu_Rio_PutExtra *extra, size_t *pChunksUploaded) { Qiniu_Error err = Qiniu_OK; Qiniu_Tee tee; @@ -431,6 +547,17 @@ static Qiniu_Error Qiniu_Rio_ResumableBlockput( int notifyRet = 0; size_t chunksUploaded = 0; + struct _Qiniu_Progress_Callback_Data progressCallbackData; + Qiniu_Zero(progressCallbackData); + + if (extra->uploadingProgress != NULL) + { + progressCallbackData.callback = extra->uploadingProgress; + progressCallbackData.totalSize = (size_t)fsize; + progressCallbackData.progress = uploadingPartsProgress; + progressCallbackData.blkIdx = blkIdx; + } + if (extra->upHost != NULL) { upHosts = &extra->upHost; @@ -459,14 +586,25 @@ static Qiniu_Error Qiniu_Rio_ResumableBlockput( body = Qiniu_TeeReader(&tee, body1, h); upHost = upHosts[tries % upHostsCount]; - err = Qiniu_Rio_Mkblock(c, ret, blkSize, upHost, body, bodyLength); + err = Qiniu_Rio_Mkblock(c, ret, blkSize, upHost, body, bodyLength, &progressCallbackData); if (err.code == 200) { + if (extra->uploadingProgress != NULL) + { + _Qiniu_Uploading_Parts_Progress_Part_Uploaded(uploadingPartsProgress, blkIdx, bodyLength); + } break; } - else if (_Qiniu_Should_Retry(err.code) == QINIU_DONT_RETRY) + else { - goto handleErr; + if (extra->uploadingProgress != NULL) + { + _Qiniu_Uploading_Parts_Progress_Set_Progress(uploadingPartsProgress, blkIdx, 0); + } + if (_Qiniu_Should_Retry(err.code) == QINIU_DONT_RETRY) + { + goto handleErr; + } } } if (err.code != 200) @@ -488,9 +626,14 @@ static Qiniu_Error Qiniu_Rio_ResumableBlockput( goto handleErr; } } + else if (extra->uploadingProgress != NULL) + { + _Qiniu_Uploading_Parts_Progress_Part_Uploaded(uploadingPartsProgress, blkIdx, ret->offset); + } while ((int)(ret->offset) < blkSize) { + if (chunkSize < blkSize - (int)(ret->offset)) { bodyLength = chunkSize; @@ -512,7 +655,7 @@ static Qiniu_Error Qiniu_Rio_ResumableBlockput( body = Qiniu_TeeReader(&tee, body1, h); upHost = upHosts[tryTimes % upHostsCount]; - err = Qiniu_Rio_Blockput(c, ret, upHost, body, bodyLength); + err = Qiniu_Rio_Blockput(c, ret, upHost, body, bodyLength, &progressCallbackData); if (err.code == 200) { chunksUploaded++; @@ -526,12 +669,20 @@ static Qiniu_Error Qiniu_Rio_ResumableBlockput( err.message = "Interrupted by the caller"; goto handleErr; } + if (extra->uploadingProgress != NULL) + { + _Qiniu_Uploading_Parts_Progress_Part_Uploaded(uploadingPartsProgress, blkIdx, bodyLength); + } continue; } else { Qiniu_Log_Warn("ResumableBlockput: invalid checksum, retry"); err = ErrUnmatchedChecksum; + if (extra->uploadingProgress != NULL) + { + _Qiniu_Uploading_Parts_Progress_Set_Progress(uploadingPartsProgress, blkIdx, 0); + } } } else if (err.code == Qiniu_Rio_InvalidCtx) @@ -543,6 +694,10 @@ static Qiniu_Error Qiniu_Rio_ResumableBlockput( else { Qiniu_Log_Warn("ResumableBlockput %d off:%d failed - %E", blkIdx, (int)ret->offset, err); + if (extra->uploadingProgress != NULL) + { + _Qiniu_Uploading_Parts_Progress_Set_Progress(uploadingPartsProgress, blkIdx, 0); + } } if (tryTimes > 1 && _Qiniu_Should_Retry(err.code) != QINIU_DONT_RETRY) { @@ -701,6 +856,8 @@ typedef struct _Qiniu_Rio_task Qiniu_Count *ninterrupts; int blkIdx; int blkSize1; + int fsize; + struct _Qiniu_Uploading_Parts_Progress *progress; } Qiniu_Rio_task; static void Qiniu_Rio_doTask(void *params) @@ -728,7 +885,7 @@ static void Qiniu_Rio_doTask(void *params) lzRetry: Qiniu_Rio_BlkputRet_Assign(&ret, &extra->progresses[blkIdx]); - Qiniu_Error err = Qiniu_Rio_ResumableBlockput(c, &ret, task->f, blkIdx, task->blkSize1, extra, &chunksUploaded); + Qiniu_Error err = Qiniu_Rio_ResumableBlockput(c, &ret, task->f, blkIdx, task->blkSize1, task->fsize, task->progress, extra, &chunksUploaded); if (err.code != 200) { if (err.code == Qiniu_Rio_PutInterrupted) @@ -832,6 +989,7 @@ static Qiniu_Error _Qiniu_Rio_Put( int nfails; int retCode; Qiniu_Count ninterrupts; + struct _Qiniu_Uploading_Parts_Progress *uploadingPartProgress = NULL; Qiniu_Error err = Qiniu_Rio_PutExtra_Init(&extra, fsize, extra1); if (err.code != 200) { @@ -853,6 +1011,10 @@ static Qiniu_Error _Qiniu_Rio_Put( return err; } } + if (extra.uploadingProgress != NULL) + { + uploadingPartProgress = _Qiniu_Uploading_Parts_Progress_New(); + } tm = extra.threadModel; wg = tm.itbl->WaitGroup(tm.self); @@ -878,6 +1040,8 @@ static Qiniu_Error _Qiniu_Rio_Put( task->ninterrupts = &ninterrupts; task->blkIdx = i; task->blkSize1 = blkSize; + task->fsize = fsize; + task->progress = uploadingPartProgress; if (i == last) { offbase = (Qiniu_Int64)(i) << blockBits; @@ -945,6 +1109,10 @@ static Qiniu_Error _Qiniu_Rio_Put( } } + if (uploadingPartProgress != NULL) + { + _Qiniu_Uploading_Parts_Progress_Free(uploadingPartProgress); + } Qiniu_Rio_PutExtra_Cleanup(&extra); Qiniu_Free((void *)accessKey); Qiniu_Free((void *)bucketName); diff --git a/qiniu/resumable_io.h b/qiniu/resumable_io.h index d4b14b5..091da12 100644 --- a/qiniu/resumable_io.h +++ b/qiniu/resumable_io.h @@ -143,6 +143,9 @@ extern "C" // Specify multiple upHosts, if not set explicitly, will global QINIU_UP_HOST const char *const *upHosts; size_t upHostsCount; + + // Uploading file progress + void (*uploadingProgress)(size_t ultotal, size_t ulnow); } Qiniu_Rio_PutExtra; /*============================================================================*/