Skip to content

Commit

Permalink
Merge pull request #25 from AfricasVoices/retry_blob_file_upload
Browse files Browse the repository at this point in the history
handle HTTP 408 in blob file upload by retrying with a smaller chunk size
  • Loading branch information
IsaackMwenda authored May 4, 2020
2 parents abc1acf + 7464f11 commit dfc0a90
Showing 1 changed file with 31 additions and 6 deletions.
37 changes: 31 additions & 6 deletions storage/google_cloud/google_cloud_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@

from google.cloud import storage
from core_data_modules.logging import Logger
from requests import ConnectionError, Timeout
import socket

log = Logger(__name__)

Expand Down Expand Up @@ -74,7 +76,7 @@ def download_blob_to_file(bucket_credentials_file_path, blob_url, f):
log.info(f"Downloaded blob to file")


def upload_file_to_blob(bucket_credentials_file_path, target_blob_url, f, max_retries=4, blob_chunk_size=100 * 1024):
    """
    Uploads a file to a Google Cloud Storage blob, retrying with a smaller chunk size if the
    upload fails due to a connection or timeout error.

    :param bucket_credentials_file_path: Path to a service account credentials file for the bucket.
    :type bucket_credentials_file_path: str
    :param target_blob_url: gs:// URL of the blob to upload to.
    :type target_blob_url: str
    :param f: File to upload, opened in binary mode.
    :type f: file-like
    :param max_retries: Maximum number of times to retry uploading the file.
    :type max_retries: int
    :param blob_chunk_size: The chunk size to use for resumable uploads, in KiB.
                            Google Cloud Storage requires resumable-upload chunk sizes to be a
                            multiple of 256 KiB, so values that halve below 256 abort the retries.
    :type blob_chunk_size: float
    """
    try:
        log.info(f"Uploading file to blob '{target_blob_url}'...")
        storage_client = storage.Client.from_service_account_json(bucket_credentials_file_path)
        blob = _blob_at_url(storage_client, target_blob_url)
        # blob.chunk_size is in bytes and must be an int (halving blob_chunk_size can make it a float)
        blob.chunk_size = int(blob_chunk_size * 1024)
        blob.upload_from_file(f)
        log.info(f"Uploaded file to blob")

    except (ConnectionError, socket.timeout, Timeout) as ex:
        log.warning("Failed to upload due to connection/timeout error")

        if max_retries <= 0:
            log.error(f"Failed to upload file to blob")
            raise ex

        # Guard on the chunk size we are about to retry with (not the one that just failed),
        # so we never attempt an upload below the 256KB minimum allowed for resumable uploads.
        reduced_chunk_size = blob_chunk_size / 2
        if reduced_chunk_size < 256:
            log.error(f"Not retrying because the reduced blob_chunk_size {reduced_chunk_size} "
                      f"is below the minimum allowed (256KB)")
            raise ex

        log.info(f"Retrying up to {max_retries} more times with a reduced chunk_size of {reduced_chunk_size}KB")
        # A resumable upload can't be resumed with a different chunk size, so rewind and
        # restart the upload from the beginning of the file.
        f.seek(0)
        upload_file_to_blob(bucket_credentials_file_path, target_blob_url, f,
                            max_retries - 1, reduced_chunk_size)

0 comments on commit dfc0a90

Please sign in to comment.