diff --git a/rohmu/errors.py b/rohmu/errors.py index b513d675..5d278d2a 100644 --- a/rohmu/errors.py +++ b/rohmu/errors.py @@ -19,6 +19,18 @@ class InvalidConfigurationError(Error): """Invalid configuration""" +class TransferObjectStoreInitializationError(Error): + """Raised when a transient network or permission issue does not allow us to validate access to the object store""" + + +class TransferObjectStorePermissionError(TransferObjectStoreInitializationError): + """Raised when a permission issue does not allow us to validate access to the object store""" + + +class TransferObjectStoreMissingError(TransferObjectStoreInitializationError): + """Raised when we know for sure the bucket is missing""" + + class LocalFileIsRemoteFileError(StorageError): """File transfer operation source and destination point to the same file""" diff --git a/rohmu/factory.py b/rohmu/factory.py index ee1ec7c1..c285ac11 100644 --- a/rohmu/factory.py +++ b/rohmu/factory.py @@ -66,19 +66,23 @@ def get_transfer_model(storage_config: Config) -> StorageModel: return storage_class.config_model(**storage_config) -def get_transfer(storage_config: Config) -> BaseTransfer[Any]: +def get_transfer(storage_config: Config, ensure_object_store_available: bool = True) -> BaseTransfer[Any]: storage_config = storage_config.copy() notifier_config = storage_config.pop("notifier", None) notifier = None if notifier_config is not None: notifier = get_notifier(notifier_config) model = get_transfer_model(storage_config) - return get_transfer_from_model(model, notifier) + return get_transfer_from_model(model, notifier, ensure_object_store_available=ensure_object_store_available) -def get_transfer_from_model(model: StorageModelT, notifier: Optional[Notifier] = None) -> BaseTransfer[StorageModelT]: +def get_transfer_from_model( + model: StorageModelT, + notifier: Optional[Notifier] = None, + ensure_object_store_available: bool = True, +) -> BaseTransfer[StorageModelT]: storage_class = get_class_for_storage_driver(model.storage_type) - return storage_class.from_model(model, notifier) + return storage_class.from_model(model, notifier, ensure_object_store_available=ensure_object_store_available) def _to_storage_driver(storage_type: str) -> StorageDriver: diff --git a/rohmu/object_storage/azure.py b/rohmu/object_storage/azure.py index 3e15e4af..f627926e 100644 --- a/rohmu/object_storage/azure.py +++ b/rohmu/object_storage/azure.py @@ -8,7 +8,14 @@ from azure.core.exceptions import HttpResponseError, ResourceExistsError from azure.storage.blob import BlobServiceClient, ContentSettings from rohmu.common.statsd import StatsdConfig -from rohmu.errors import FileNotFoundFromStorageError, InvalidConfigurationError, StorageError +from rohmu.errors import ( + FileNotFoundFromStorageError, + InvalidConfigurationError, + StorageError, + TransferObjectStoreInitializationError, + TransferObjectStoreMissingError, + TransferObjectStorePermissionError, +) from rohmu.notifier.interface import Notifier from rohmu.object_storage.base import ( BaseTransfer, @@ -61,15 +68,21 @@ def __init__( proxy_info: Optional[dict[str, Union[str, int]]] = None, notifier: Optional[Notifier] = None, statsd_info: Optional[StatsdConfig] = None, + ensure_object_store_available: bool = True, ) -> None: prefix = prefix.lstrip("/") if prefix else "" - super().__init__(prefix=prefix, notifier=notifier, statsd_info=statsd_info) + super().__init__( + prefix=prefix, + notifier=notifier, + statsd_info=statsd_info, + ensure_object_store_available=ensure_object_store_available, + ) if not account_key and not sas_token: raise InvalidConfigurationError("One of account_key or sas_token must be specified to authenticate") self.account_name = account_name self.account_key = account_key - self.container_name = bucket_name + self.container = self.container_name = bucket_name self.sas_token = sas_token self._conn_str = self.conn_string( account_name=account_name, @@ -95,9 +108,34 @@ def __init__( schema = "http" self._config["proxies"] = {"https": f"{schema}://{auth}{proxy_host}:{proxy_port}"} self._blob_service_client: Optional[BlobServiceClient] = None - self.container = self.get_or_create_container(self.container_name) + if ensure_object_store_available: + self._create_object_store_if_needed_unwrapped() self.log.debug("AzureTransfer initialized, %r", self.container_name) + def _verify_object_storage_unwrapped(self) -> None: + self.get_or_create_container(self.container_name, create_if_needed=False) + + def verify_object_storage(self) -> None: + try: + self._verify_object_storage_unwrapped() + except HttpResponseError as ex: + if ex.status_code == 403: + raise TransferObjectStorePermissionError() from ex + else: + raise TransferObjectStoreInitializationError() from ex + + def _create_object_store_if_needed_unwrapped(self) -> None: + self.get_or_create_container(self.container_name, create_if_needed=True) + + def create_object_store_if_needed(self) -> None: + try: + self._create_object_store_if_needed_unwrapped() + except HttpResponseError as ex: + if ex.status_code == 403: + raise TransferObjectStorePermissionError() from ex + else: + raise TransferObjectStoreInitializationError() from ex + def get_blob_service_client(self) -> BlobServiceClient: if self._blob_service_client is None: self._blob_service_client = BlobServiceClient.from_connection_string( @@ -168,7 +206,16 @@ def _copy_file_from_bucket( if time.monotonic() - start < timeout: time.sleep(0.1) else: - destination_client.abort_copy(copy_props.id, timeout=timeout) + if copy_props.id is None: + # The description of CopyProperties tells us copy_id should not be None unless status is also None + # The type checker cannot know this, but we still log a warning if this ever happens + self.log.warning( + "Pending copy operation from %r to %r was missing a copy_id, will not be aborted after timeout", + source_key, + destination_key, + ) + else: + destination_client.abort_copy(copy_props.id, timeout=timeout) raise StorageError( f"Copying {repr(source_key)} to {repr(destination_key)} did not complete in {timeout} seconds" ) @@ -402,21 +449,38 @@ def progress_callback(pipeline_response: Any) -> None: else: delattr(fd, "tell") - def get_or_create_container(self, container_name: str) -> str: + def get_or_create_container(self, container_name: str, create_if_needed: bool = True) -> str: if isinstance(container_name, enum.Enum): # ensure that the enum value is used rather than the enum name # https://github.com/Azure/azure-sdk-for-python/blob/azure-storage-blob_12.8.1/sdk/storage/azure-storage-blob/azure/storage/blob/_blob_service_client.py#L667 container_name = container_name.value start_time = time.monotonic() - try: - self.get_blob_service_client().create_container(container_name) - except ResourceExistsError: - pass - except HttpResponseError as e: - if "request is not authorized" in e.exc_msg: - self.log.debug("Container creation unauthorized. Assuming container %r already exists", container_name) - return container_name - else: - raise e + if create_if_needed: + try: + self.get_blob_service_client().create_container(container_name) + except ResourceExistsError: + pass + except HttpResponseError as e: + if "request is not authorized" in e.exc_msg: + # Corresponds to a 403/AuthorizationFailure + self.log.debug("Container creation unauthorized. Assuming container %r already exists", container_name) + return container_name + else: + raise e + else: + try: + container_exists = self.get_blob_service_client().get_container_client(container_name).exists() + except HttpResponseError as e: + if e.status_code == 403: + # We have to handle 403/AuthorizationFailure gracefully as container-scoped SAS tokens might have + # all permissions inside the container but won't have permissions on the container itself and we cannot + # assume we'd have an account-scoped SAS token or account key here + self.log.debug("Container metadata unauthorized. Assuming container %r already exists", container_name) + return container_name + raise + if not container_exists: + self.log.debug("Container %r does not exist, creation not requested", container_name) + raise TransferObjectStoreMissingError() + self.log.debug("Container %r already exists", container_name) self.log.debug("Got/Created container: %r successfully, took: %.3fs", container_name, time.monotonic() - start_time) return container_name diff --git a/rohmu/object_storage/base.py b/rohmu/object_storage/base.py index 3a18eb73..09b3f082 100644 --- a/rohmu/object_storage/base.py +++ b/rohmu/object_storage/base.py @@ -10,7 +10,11 @@ from io import BytesIO from rohmu.common.models import StorageModel from rohmu.common.statsd import StatsClient, StatsdConfig -from rohmu.errors import FileNotFoundFromStorageError, InvalidByteRangeError, StorageError +from rohmu.errors import ( + FileNotFoundFromStorageError, + InvalidByteRangeError, + StorageError, +) from rohmu.notifier.interface import Notifier from rohmu.notifier.null import NullNotifier from rohmu.object_storage.config import StorageModelT @@ -71,8 +75,18 @@ class BaseTransfer(Generic[StorageModelT]): supports_concurrent_upload: bool = False def __init__( - self, prefix: Optional[str], notifier: Optional[Notifier] = None, statsd_info: Optional[StatsdConfig] = None + self, + prefix: Optional[str], + notifier: Optional[Notifier] = None, + statsd_info: Optional[StatsdConfig] = None, + ensure_object_store_available: bool = True, ) -> None: + """ + Initialize a Transfer instance. Although it's possible to do implementation-specific IO here, it's preferable to + delegate to verify_object_storage and create_object_store_if_needed. + + The ensure_object_store_available flag is here to allow this transition in a backwards-compatible manner. + """ self.log = logging.getLogger(self.__class__.__name__) if not prefix: prefix = "" @@ -82,6 +96,40 @@ def __init__( self.notifier = notifier or NullNotifier() self.stats = StatsClient(statsd_info) + def _verify_object_storage_unwrapped(self) -> None: + """ + Perform read-only operations to verify the backing object store is available and accessible. + + Raises the implementation-specific exception as-is. This is mainly useful for backwards-compatibility and should + never be called directly. + """ + raise NotImplementedError + + def verify_object_storage(self) -> None: + """ + Perform read-only operations to verify the backing object store is available and accessible. + + Raise Rohmu-specific error TransferObjectStoreInitializationError to abstract away implementation-specific details. + """ + raise NotImplementedError + + def _create_object_store_if_needed_unwrapped(self) -> None: + """ + Create the backing object store if it's needed (e.g. creating directories, buckets, etc.). + + Raises the implementation-specific exception as-is. This is mainly useful for backwards-compatibility and should + never be called directly. + """ + raise NotImplementedError + + def create_object_store_if_needed(self) -> None: + """ + Create the backing object store if it's needed (e.g. creating directories, buckets, etc.). + + Raise Rohmu-specific error TransferObjectStoreInitializationError to abstract away implementation-specific details. + """ + raise NotImplementedError + def close(self) -> None: """Release all resources associated with the Transfer object.""" pass @@ -138,8 +186,14 @@ def _should_multipart( return int(size) > chunk_size @classmethod - def from_model(cls, model: StorageModelT, notifier: Optional[Notifier] = None) -> Self: - return cls(**model.dict(by_alias=True, exclude={"storage_type"}), notifier=notifier) + def from_model( + cls, model: StorageModelT, notifier: Optional[Notifier] = None, ensure_object_store_available: bool = True + ) -> Self: + return cls( + **model.dict(by_alias=True, exclude={"storage_type"}), + notifier=notifier, + ensure_object_store_available=ensure_object_store_available, + ) def copy_file( self, *, source_key: str, destination_key: str, metadata: Optional[Metadata] = None, **_kwargs: Any diff --git a/rohmu/object_storage/google.py b/rohmu/object_storage/google.py index 81643a2e..3977b243 100644 --- a/rohmu/object_storage/google.py +++ b/rohmu/object_storage/google.py @@ -6,7 +6,7 @@ from __future__ import annotations from contextlib import contextmanager -from googleapiclient.discovery import build, Resource +from googleapiclient.discovery import build from googleapiclient.errors import HttpError from googleapiclient.http import ( build_http, @@ -22,7 +22,14 @@ from oauth2client.client import GoogleCredentials from rohmu.common.models import StorageOperation from rohmu.common.statsd import StatsClient, StatsdConfig -from rohmu.errors import FileNotFoundFromStorageError, InvalidByteRangeError, InvalidConfigurationError +from rohmu.errors import ( + FileNotFoundFromStorageError, + InvalidByteRangeError, + InvalidConfigurationError, + TransferObjectStoreInitializationError, + TransferObjectStoreMissingError, + TransferObjectStorePermissionError, +) from rohmu.notifier.interface import Notifier from rohmu.object_storage.base import ( BaseTransfer, @@ -40,7 +47,21 @@ ) from rohmu.typing import AnyPath, Metadata from rohmu.util import get_total_size_from_content_range -from typing import Any, BinaryIO, Callable, cast, Collection, Iterable, Iterator, Optional, TextIO, Tuple, TypeVar, Union +from typing import ( + Any, + BinaryIO, + Callable, + cast, + Collection, + Iterable, + Iterator, + Optional, + TextIO, + Tuple, + TYPE_CHECKING, + TypeVar, + Union, +) from typing_extensions import Protocol import codecs @@ -81,6 +102,9 @@ def ServiceAccountCredentials_from_dict( ) +if TYPE_CHECKING: + from googleapiclient._apis.storage.v1 import StorageResource + # Silence Google API client verbose spamming logging.getLogger("googleapiclient.discovery_cache").setLevel(logging.ERROR) logging.getLogger("googleapiclient").setLevel(logging.WARNING) @@ -182,25 +206,37 @@ def __init__( proxy_info: Optional[dict[str, Union[str, int]]] = None, notifier: Optional[Notifier] = None, statsd_info: Optional[StatsdConfig] = None, + ensure_object_store_available: bool = True, ) -> None: - super().__init__(prefix=prefix, notifier=notifier, statsd_info=statsd_info) + super().__init__( + prefix=prefix, + notifier=notifier, + statsd_info=statsd_info, + ensure_object_store_available=ensure_object_store_available, + ) self.project_id = project_id self.proxy_info = proxy_info self.google_creds = get_credentials(credential_file=credential_file, credentials=credentials) - self.gs: Optional[Resource] = self._init_google_client() - self.gs_object_client: Any = None - self.bucket_name = self.get_or_create_bucket(bucket_name) + self.gs: Optional[StorageResource] = self._init_google_client() + self.gs_object_client: Optional[StorageResource.ObjectsResource] = None + self.gs_bucket_client: Optional[StorageResource.BucketsResource] = None + self.bucket_name = bucket_name + if ensure_object_store_available: + self._create_object_store_if_needed_unwrapped() self.log.debug("GoogleTransfer initialized") def close(self) -> None: if self.gs_object_client is not None: self.gs_object_client.close() self.gs_object_client = None + if self.gs_bucket_client is not None: + self.gs_bucket_client.close() + self.gs_bucket_client = None if self.gs is not None: self.gs.close() self.gs = None - def _init_google_client(self) -> Resource: + def _init_google_client(self) -> StorageResource: start_time = time.monotonic() delay = 2 while True: @@ -242,7 +278,7 @@ def _object_client(self, *, not_found: Optional[str] = None) -> Iterator[Any]: if self.gs is None: self.gs = self._init_google_client() # https://googleapis.github.io/google-api-python-client/docs/dyn/storage_v1.objects.html - self.gs_object_client = self.gs.objects() # type: ignore[attr-defined] + self.gs_object_client = self.gs.objects() try: yield self.gs_object_client except HttpError as ex: @@ -254,6 +290,20 @@ def _object_client(self, *, not_found: Optional[str] = None) -> Iterator[Any]: self.gs_object_client = None raise + @contextmanager + def _bucket_client(self) -> Iterator[Any]: + """ + (Re-)initialize object client lazily if required. + There is no reset logic for the buckets client (as opposed to the object client) as that's not strictly needed + since it's only used once at setup. + """ + if self.gs_bucket_client is None: + if self.gs is None: + self.gs = self._init_google_client() + # https://googleapis.github.io/google-api-python-client/docs/dyn/storage_v1.objects.html + self.gs_bucket_client = self.gs.buckets() + yield self.gs_bucket_client + def _retry_on_reset(self, request: HttpRequest, action: Callable[[], ResType], retry_reporter: Reporter) -> ResType: retries = 60 retry_wait = 2.0 @@ -586,7 +636,40 @@ def store_file_object( ) self.notifier.object_created(key=key, size=int(result["size"]), metadata=sanitized_metadata) - def get_or_create_bucket(self, bucket_name: str) -> str: + def _verify_object_storage_unwrapped(self) -> None: + """Look up the bucket to see if it already exists or raise TransferObjectStoreMissingError if not.""" + start_time = time.time() + with self._bucket_client() as gs_buckets: + try: + self._try_get_bucket(gs_buckets) + self.log.debug("Bucket: %r already exists, took: %.3fs", self.bucket_name, time.time() - start_time) + except HttpError as ex: + if ex.resp["status"] == "404": + raise TransferObjectStoreMissingError() + elif ex.resp["status"] == "403": + raise InvalidConfigurationError(f"Bucket {repr(self.bucket_name)} exists but isn't accessible") + else: + raise + + def _try_get_bucket(self, gs_buckets: StorageResource.BucketsResource) -> None: + """Useful for mocking in tests""" + request = gs_buckets.get(bucket=self.bucket_name) + reporter = Reporter(StorageOperation.head_request) + self._retry_on_reset(request, request.execute, retry_reporter=reporter) + reporter.report(self.stats) + + def verify_object_storage(self) -> None: + try: + self._verify_object_storage_unwrapped() + except InvalidConfigurationError as ex: + # The only reason we'd raise this exception is if we caught a 403 and raised this exception for the older apps + # In this method, we can raise a "proper" exception for permission errors + raise TransferObjectStorePermissionError() from ex + except HttpError as ex: + # Wrap implementation-specific exceptions with rohmu's + raise TransferObjectStoreInitializationError() from ex + + def _create_object_store_if_needed_unwrapped(self) -> None: """Look up the bucket if it already exists and try to create the bucket in case it doesn't. Note that we can't just always try to unconditionally create the bucket as Google imposes a strict rate @@ -598,41 +681,52 @@ def get_or_create_bucket(self, bucket_name: str) -> str: invalid bucket names ("Invalid bucket name") as well as for invalid project ("Invalid argument"), try to handle both gracefully.""" start_time = time.time() - gs_buckets = self.gs.buckets() # type: ignore[union-attr] try: - request = gs_buckets.get(bucket=bucket_name) - reporter = Reporter(StorageOperation.head_request) - self._retry_on_reset(request, request.execute, retry_reporter=reporter) - reporter.report(self.stats) - self.log.debug("Bucket: %r already exists, took: %.3fs", bucket_name, time.time() - start_time) - except HttpError as ex: - if ex.resp["status"] == "404": - pass # we need to create it - elif ex.resp["status"] == "403": - raise InvalidConfigurationError(f"Bucket {repr(bucket_name)} exists but isn't accessible") - else: - raise + self._verify_object_storage_unwrapped() + except TransferObjectStoreMissingError: + pass # We only continue with creation in case the bucket does not exist else: - return bucket_name + return + with self._bucket_client() as gs_buckets: + try: + self._try_create_bucket(gs_buckets) + self.log.debug("Created bucket: %r successfully, took: %.3fs", self.bucket_name, time.time() - start_time) + except HttpError as ex: + error = json.loads(ex.content.decode("utf-8"))["error"] + if error["message"].startswith("You already own this bucket"): + self.log.debug("Bucket: %r already exists, took: %.3fs", self.bucket_name, time.time() - start_time) + elif error["message"] == "Invalid argument.": + raise InvalidConfigurationError(f"Invalid project id {repr(self.project_id)}") + elif error["message"].startswith("Invalid bucket name"): + raise InvalidConfigurationError(f"Invalid bucket name {repr(self.bucket_name)}") + else: + raise + + def _try_create_bucket(self, gs_buckets: StorageResource.BucketsResource) -> None: + """Useful for mocking in tests""" + req = gs_buckets.insert(project=self.project_id, body={"name": self.bucket_name}) + reporter = Reporter(StorageOperation.create_bucket) + self._retry_on_reset(req, req.execute, retry_reporter=reporter) + reporter.report(self.stats) + def create_object_store_if_needed(self) -> None: try: - req = gs_buckets.insert(project=self.project_id, body={"name": bucket_name}) - reporter = Reporter(StorageOperation.create_bucket) - self._retry_on_reset(req, req.execute, retry_reporter=reporter) - reporter.report(self.stats) - self.log.debug("Created bucket: %r successfully, took: %.3fs", bucket_name, time.time() - start_time) + self._create_object_store_if_needed_unwrapped() except HttpError as ex: - error = json.loads(ex.content.decode("utf-8"))["error"] - if error["message"].startswith("You already own this bucket"): - self.log.debug("Bucket: %r already exists, took: %.3fs", bucket_name, time.time() - start_time) - elif error["message"] == "Invalid argument.": - raise InvalidConfigurationError(f"Invalid project id {repr(self.project_id)}") - elif error["message"].startswith("Invalid bucket name"): - raise InvalidConfigurationError(f"Invalid bucket name {repr(bucket_name)}") + if ex.resp["status"] == "403": + # Translate 403 errors to the proper exception + raise TransferObjectStorePermissionError() from ex else: - raise + # Other special cases involving invalid input are already handled + # Wrap implementation-specific exceptions with rohmu's + raise TransferObjectStoreInitializationError() from ex - return bucket_name + def get_or_create_bucket(self, bucket_name: str) -> str: + """Deprecated: use create_object_store_if_needed() instead""" + if self.bucket_name != bucket_name: + raise ValueError("This method is not meant to be used with a different bucket name than the one configured") + self._verify_object_storage_unwrapped() + return self.bucket_name class MediaStreamUpload(MediaUpload): diff --git a/rohmu/object_storage/local.py b/rohmu/object_storage/local.py index 7c8695a5..b793c559 100644 --- a/rohmu/object_storage/local.py +++ b/rohmu/object_storage/local.py @@ -50,11 +50,29 @@ def __init__( prefix: Optional[str] = None, notifier: Optional[Notifier] = None, statsd_info: Optional[StatsdConfig] = None, + ensure_object_store_available: bool = True, ) -> None: prefix = os.path.join(directory, (prefix or "").strip("/")) - super().__init__(prefix=prefix, notifier=notifier, statsd_info=statsd_info) + super().__init__( + prefix=prefix, + notifier=notifier, + statsd_info=statsd_info, + ensure_object_store_available=ensure_object_store_available, + ) self.log.debug("LocalTransfer initialized") + def _verify_object_storage_unwrapped(self) -> None: + """No-op as there's no need to check for the existence of the directory at setup time.""" + + def verify_object_storage(self) -> None: + """No-op as there's no need to check for the existence of the directory at setup time.""" + + def _create_object_store_if_needed_unwrapped(self) -> None: + """No-op as there's no need to create the directory ahead of time.""" + + def create_object_store_if_needed(self) -> None: + """No-op as there's no need to create the directory ahead of time.""" + def copy_file( self, *, source_key: str, destination_key: str, metadata: Optional[Metadata] = None, **_kwargs: Any ) -> None: diff --git a/rohmu/object_storage/s3.py b/rohmu/object_storage/s3.py index 04ee1073..22e5e73e 100644 --- a/rohmu/object_storage/s3.py +++ b/rohmu/object_storage/s3.py @@ -11,7 +11,15 @@ from pathlib import Path from rohmu.common.models import StorageOperation from rohmu.common.statsd import StatsdConfig -from rohmu.errors import ConcurrentUploadError, FileNotFoundFromStorageError, InvalidConfigurationError, StorageError +from rohmu.errors import ( + ConcurrentUploadError, + FileNotFoundFromStorageError, + InvalidConfigurationError, + StorageError, + TransferObjectStoreInitializationError, + TransferObjectStoreMissingError, + TransferObjectStorePermissionError, +) from rohmu.notifier.interface import Notifier from rohmu.object_storage.base import ( BaseTransfer, @@ -124,8 +132,14 @@ def __init__( aws_session_token: Optional[str] = None, use_dualstack_endpoint: Optional[bool] = True, statsd_info: Optional[StatsdConfig] = None, + ensure_object_store_available: bool = True, ) -> None: - super().__init__(prefix=prefix, notifier=notifier, statsd_info=statsd_info) + super().__init__( + prefix=prefix, + notifier=notifier, + statsd_info=statsd_info, + ensure_object_store_available=ensure_object_store_available, + ) self.bucket_name = bucket_name self.region = region self.aws_access_key_id = aws_access_key_id @@ -153,9 +167,34 @@ def __init__( self.location = self.region if not self.is_verify_tls and self.cert_path is not None: raise ValueError("cert_path is set but is_verify_tls is False") - self.check_or_create_bucket() + if ensure_object_store_available: + self._create_object_store_if_needed_unwrapped() self.log.debug("S3Transfer initialized") + def _verify_object_storage_unwrapped(self) -> None: + self.check_or_create_bucket(create_if_needed=False) + + def verify_object_storage(self) -> None: + try: + self._verify_object_storage_unwrapped() + except botocore.exceptions.ClientError as ex: + if ex.response.get("Error", {}).get("Code") == "AccessDenied": + raise TransferObjectStorePermissionError() from ex + else: + raise TransferObjectStoreInitializationError() from ex + + def _create_object_store_if_needed_unwrapped(self) -> None: + self.check_or_create_bucket(create_if_needed=True) + + def create_object_store_if_needed(self) -> None: + try: + self._create_object_store_if_needed_unwrapped() + except botocore.exceptions.ClientError as ex: + if ex.response.get("Error", {}).get("Code") == "AccessDenied": + raise TransferObjectStorePermissionError() from ex + else: + raise TransferObjectStoreInitializationError() from ex + def get_client(self) -> S3Client: if self.s3_client is None: timeouts: dict[str, Any] = {} @@ -586,8 +625,7 @@ def store_file_object( progress_fn=self._proportional_to_incremental_progress(upload_progress_fn), ) - def check_or_create_bucket(self) -> None: - create_bucket = False + def check_or_create_bucket(self, create_if_needed: bool = True) -> None: self.stats.operation(StorageOperation.head_request) try: self.get_client().head_bucket(Bucket=self.bucket_name) @@ -598,24 +636,27 @@ def check_or_create_bucket(self) -> None: raise InvalidConfigurationError(f"Wrong region for bucket {self.bucket_name}, check configuration") elif status_code == HTTPStatus.FORBIDDEN: # Access denied on bucket check, most likely due to missing s3:ListBucket, assuming write permissions - pass + return elif status_code in {HTTPStatus.BAD_REQUEST, HTTPStatus.NOT_FOUND}: - create_bucket = True + if not create_if_needed: + raise TransferObjectStoreMissingError() else: raise + else: + # Bucket exists - bail out + return - if create_bucket: - self.log.debug("Creating bucket: %r in location: %r", self.bucket_name, self.region) - args: dict[str, Any] = { - "Bucket": self.bucket_name, + self.log.debug("Creating bucket: %r in location: %r", self.bucket_name, self.region) + args: dict[str, Any] = { + "Bucket": self.bucket_name, + } + if self.location: + args["CreateBucketConfiguration"] = { + "LocationConstraint": self.location, } - if self.location: - args["CreateBucketConfiguration"] = { - "LocationConstraint": self.location, - } - self.stats.operation(StorageOperation.create_bucket) - self.get_client().create_bucket(**args) + self.stats.operation(StorageOperation.create_bucket) + self.get_client().create_bucket(**args) def create_concurrent_upload( self, diff --git a/rohmu/object_storage/sftp.py b/rohmu/object_storage/sftp.py index 97cdc0ad..8695ad50 100644 --- a/rohmu/object_storage/sftp.py +++ b/rohmu/object_storage/sftp.py @@ -41,8 +41,14 @@ def __init__( prefix: Optional[str] = None, notifier: Optional[Notifier] = None, statsd_info: Optional[StatsdConfig] = None, + ensure_object_store_available: bool = True, ) -> None: - super().__init__(prefix=prefix, notifier=notifier, statsd_info=statsd_info) + super().__init__( + prefix=prefix, + notifier=notifier, + statsd_info=statsd_info, + ensure_object_store_available=ensure_object_store_available, + ) self.server = server self.port = port self.username = username @@ -69,6 +75,18 @@ def __init__( self.log.debug("SFTPTransfer initialized") + def _verify_object_storage_unwrapped(self) -> None: + """No-op for now. Eventually, the SFTP connection could be tested here instead of in the constructor.""" + + def verify_object_storage(self) -> None: + """No-op for now. Eventually, the SFTP connection could be tested here instead of in the constructor.""" + + def _create_object_store_if_needed_unwrapped(self) -> None: + """No-op as it's not applicable to SFTP transfers""" + + def create_object_store_if_needed(self) -> None: + """No-op as it's not applicable to SFTP transfers""" + def get_contents_to_fileobj( self, key: str, diff --git a/rohmu/object_storage/swift.py b/rohmu/object_storage/swift.py index dfa6650d..81efa12c 100644 --- a/rohmu/object_storage/swift.py +++ b/rohmu/object_storage/swift.py @@ -8,7 +8,11 @@ from contextlib import suppress from rohmu.common.statsd import StatsdConfig from rohmu.dates import parse_timestamp -from rohmu.errors import FileNotFoundFromStorageError +from rohmu.errors import ( + FileNotFoundFromStorageError, + TransferObjectStoreInitializationError, + TransferObjectStoreMissingError, +) from rohmu.notifier.interface import Notifier from rohmu.object_storage.base import ( BaseTransfer, @@ -80,10 +84,16 @@ def __init__( endpoint_type: Optional[str] = None, notifier: Optional[Notifier] = None, statsd_info: Optional[StatsdConfig] = None, + ensure_object_store_available: bool = True, ) -> None: prefix = prefix.lstrip("/") if prefix else "" - super().__init__(prefix=prefix, notifier=notifier, statsd_info=statsd_info) - self.container_name = container_name + super().__init__( + prefix=prefix, + notifier=notifier, + statsd_info=statsd_info, + ensure_object_store_available=ensure_object_store_available, + ) + self.container = self.container_name = container_name if auth_version == "3.0": os_options = { @@ -108,10 +118,29 @@ def __init__( self.conn = client.Connection( user=user, key=key, authurl=auth_url, tenant_name=tenant_name, auth_version=auth_version, os_options=os_options ) - self.container = self.get_or_create_container(self.container_name) + if ensure_object_store_available: + self._create_object_store_if_needed_unwrapped() self.segment_size = segment_size self.log.debug("SwiftTransfer initialized") + def _verify_object_storage_unwrapped(self) -> None: + self.get_or_create_container(self.container_name, create_if_needed=False) + + def verify_object_storage(self) -> None: + try: + self._verify_object_storage_unwrapped() + except exceptions.ClientException as ex: + raise TransferObjectStoreInitializationError() from ex + + def _create_object_store_if_needed_unwrapped(self) -> None: + self.get_or_create_container(self.container_name, create_if_needed=True) + + def create_object_store_if_needed(self) -> None: + try: + self._create_object_store_if_needed_unwrapped() + except exceptions.ClientException as ex: + raise TransferObjectStoreInitializationError() from ex + @staticmethod def _headers_to_metadata(headers: dict[str, str]) -> Metadata: return {name[len("x-object-meta-") :]: value for name, value in headers.items() if name.startswith("x-object-meta-")} @@ -250,12 +279,19 @@ def get_file_size(self, key: str) -> int: # PGHoard itself, this is only called by external apps that utilize PGHoard's object storage abstraction. raise NotImplementedError - def get_or_create_container(self, container_name: str) -> str: + def get_or_create_container(self, container_name: str, create_if_needed: bool = True) -> str: start_time = time.monotonic() try: self.conn.get_container(container_name, headers={}, limit=1) # Limit 1 here to not traverse the entire folder except exceptions.ClientException as ex: if ex.http_status == 404: + if not create_if_needed: + self.log.debug( + "Will not create container: %r, check took: %.3fs", + container_name, + time.monotonic() - start_time, + ) + raise TransferObjectStoreMissingError() self.conn.put_container(container_name, headers={}) self.log.debug( "Created container: %r successfully, took: %.3fs", diff --git a/rohmu/transfer_pool.py b/rohmu/transfer_pool.py index 784d1149..f438204b 100644 --- a/rohmu/transfer_pool.py +++ b/rohmu/transfer_pool.py @@ -135,7 +135,9 @@ def __getattribute__(self, attr: str) -> Any: return super().__getattribute__(attr) @classmethod - def from_model(cls, model: StorageModel, notifier: Optional[Notifier] = None) -> Self: + def from_model( + cls, model: StorageModel, notifier: Optional[Notifier] = None, ensure_object_store_available: bool = True + ) -> Self: raise InvalidTransferError("You should not call class methods on SafeTransfer instances") def return_to_pool(self) -> None: diff --git a/test/object_storage/test_google.py b/test/object_storage/test_google.py index 1b57754e..11d0644b 100644 --- a/test/object_storage/test_google.py +++ b/test/object_storage/test_google.py @@ -3,23 +3,27 @@ from contextlib import ExitStack from datetime import datetime, timezone +from googleapiclient.errors import HttpError from googleapiclient.http import MediaUploadProgress from io import BytesIO +from rohmu import InvalidConfigurationError from rohmu.common.models import StorageOperation -from rohmu.errors import InvalidByteRangeError +from rohmu.errors import InvalidByteRangeError, TransferObjectStoreMissingError, TransferObjectStorePermissionError from rohmu.object_storage.base import IterKeyItem from rohmu.object_storage.google import GoogleTransfer, MediaIoBaseDownloadWithByteRange, Reporter from tempfile import NamedTemporaryFile from unittest.mock import ANY, call, MagicMock, Mock, patch import base64 +import googleapiclient.errors +import httplib2 import pytest def test_close() -> None: with ExitStack() as stack: stack.enter_context(patch("rohmu.object_storage.google.get_credentials")) - stack.enter_context(patch("rohmu.object_storage.google.GoogleTransfer.get_or_create_bucket")) + stack.enter_context(patch("rohmu.object_storage.google.GoogleTransfer._create_object_store_if_needed_unwrapped")) mock_gs = Mock() stack.enter_context(patch("rohmu.object_storage.google.GoogleTransfer._init_google_client", return_value=mock_gs)) transfer = GoogleTransfer( @@ -36,11 +40,94 @@ def test_close() -> None: assert transfer.gs is None +def _mock_403_response_from_google_api() -> Exception: + resp = httplib2.Response({"status": "403", "reason": "Unused"}) + uri = "https://storage.googleapis.com/storage/v1/b?project=project&alt=json" + content = ( + b'{\n "error": {\n "code": 403,\n "message": "account@project.iam.gserviceaccount.com does not have stor' + b"age.buckets.create access to the Google Cloud project. Permission 'storage.buckets.create' denied on resource " + b'(or it may not exist).",\n "errors": [\n {\n "message": "account@project.iam.gserviceaccount.com ' + b"does not have storage.buckets.create access to the Google Cloud project. Permission 'storage.buckets.create' " + b'denied on resource (or it may not exist).",\n "domain": "global",\n "reason": "forbidden"' + b"\n }\n ]\n }\n}\n" + ) + return googleapiclient.errors.HttpError(resp, content, uri) + + +def _mock_404_response_from_google_api() -> Exception: + resp = httplib2.Response({"status": "404", "reason": "Unused"}) + uri = "https://storage.googleapis.com/storage/v1/b?project=project&alt=json" + content = b"""{"error": {"code": 404, "message": "Does not matter"}}""" + return googleapiclient.errors.HttpError(resp, content, uri) + + +@pytest.mark.parametrize( + "ensure_object_store_available,bucket_exists,sabotage_create,expect_create_call", + [ + # Happy path + pytest.param(True, True, False, False, id="happy-path-exists"), + pytest.param(True, False, False, True, id="happy-path-not-exists"), + # Happy path - without attempting to create buckets + pytest.param(False, True, False, False, id="no-create-exists"), + pytest.param(False, False, False, False, id="no-create-not-exists"), + # 403 failures when trying to create should not matter with ensure_object_store_available=False + pytest.param(False, False, True, False, id="error-behaviour"), + # 403 failures when trying to create should crash with ensure_object_store_available=False + pytest.param(True, False, True, True, id="graceful-403-handling"), + ], +) +def test_handle_missing_bucket( + ensure_object_store_available: bool, bucket_exists: bool, sabotage_create: bool, expect_create_call: bool +) -> None: + """ + As part of having nicer exception handling for bucket initialization, we need to make sure the behaviour is unchanged + when the backwards-compatibility-flag ensure_object_store_available is set. + """ + # Sanity check: We expect a call to the create function when in "legacy mode" and the bucket is missing + assert expect_create_call == (ensure_object_store_available and not bucket_exists) + + with ExitStack() as stack: + stack.enter_context(patch("rohmu.object_storage.google.get_credentials")) + + _try_get_bucket = stack.enter_context(patch("rohmu.object_storage.google.GoogleTransfer._try_get_bucket")) + if not bucket_exists: + # If the bucket exists, the return value is ignored. This simulates a missing bucket. + _try_get_bucket.side_effect = _mock_404_response_from_google_api() + + _try_create_bucket = stack.enter_context(patch("rohmu.object_storage.google.GoogleTransfer._try_create_bucket")) + if sabotage_create: + _try_create_bucket.side_effect = _mock_403_response_from_google_api() + + if expect_create_call and sabotage_create: + with pytest.raises(googleapiclient.errors.HttpError): + _ = GoogleTransfer( + project_id="test-project-id", + bucket_name="test-bucket", + ensure_object_store_available=ensure_object_store_available, + ) + else: + GoogleTransfer( + project_id="test-project-id", + bucket_name="test-bucket", + ensure_object_store_available=ensure_object_store_available, + ) + + if ensure_object_store_available: + _try_get_bucket.assert_called_once() + else: + _try_get_bucket.assert_not_called() + + if expect_create_call: + _try_create_bucket.assert_called_once() + else: + _try_create_bucket.assert_not_called() + + def test_store_file_from_memory() -> None: notifier = MagicMock() with ExitStack() as stack: stack.enter_context(patch("rohmu.object_storage.google.get_credentials")) - stack.enter_context(patch("rohmu.object_storage.google.GoogleTransfer.get_or_create_bucket")) + stack.enter_context(patch("rohmu.object_storage.google.GoogleTransfer._create_object_store_if_needed_unwrapped")) upload = stack.enter_context(patch("rohmu.object_storage.google.GoogleTransfer._upload")) transfer = GoogleTransfer( project_id="test-project-id", @@ -62,7 +149,7 @@ def test_store_file_from_disk() -> None: notifier = MagicMock() with ExitStack() as stack: stack.enter_context(patch("rohmu.object_storage.google.get_credentials")) - stack.enter_context(patch("rohmu.object_storage.google.GoogleTransfer.get_or_create_bucket")) + stack.enter_context(patch("rohmu.object_storage.google.GoogleTransfer._create_object_store_if_needed_unwrapped")) upload = stack.enter_context(patch("rohmu.object_storage.google.GoogleTransfer._upload")) transfer = GoogleTransfer( @@ -88,7 +175,7 @@ def test_store_file_object() -> None: notifier = MagicMock() with ExitStack() as stack: stack.enter_context(patch("rohmu.object_storage.google.get_credentials")) - stack.enter_context(patch("rohmu.object_storage.google.GoogleTransfer.get_or_create_bucket")) + stack.enter_context(patch("rohmu.object_storage.google.GoogleTransfer._create_object_store_if_needed_unwrapped")) upload = stack.enter_context(patch("rohmu.object_storage.google.GoogleTransfer._upload")) transfer = GoogleTransfer( project_id="test-project-id", @@ -113,7 +200,7 @@ def test_upload_size_unknown_to_reporter() -> None: notifier = MagicMock() with ExitStack() as stack: stack.enter_context(patch("rohmu.object_storage.google.get_credentials")) - stack.enter_context(patch("rohmu.object_storage.google.GoogleTransfer.get_or_create_bucket")) + stack.enter_context(patch("rohmu.object_storage.google.GoogleTransfer._create_object_store_if_needed_unwrapped")) mock_retry = stack.enter_context(patch("rohmu.object_storage.google.GoogleTransfer._retry_on_reset")) stack.enter_context(patch("rohmu.object_storage.google.GoogleTransfer._object_client")) mock_operation = stack.enter_context(patch("rohmu.common.statsd.StatsClient.operation")) @@ -152,7 +239,7 @@ def test_get_contents_to_fileobj_raises_error_on_invalid_byte_range() -> None: notifier = MagicMock() with ExitStack() as stack: stack.enter_context(patch("rohmu.object_storage.google.get_credentials")) - stack.enter_context(patch("rohmu.object_storage.google.GoogleTransfer.get_or_create_bucket")) + stack.enter_context(patch("rohmu.object_storage.google.GoogleTransfer._create_object_store_if_needed_unwrapped")) transfer = GoogleTransfer( project_id="test-project-id", bucket_name="test-bucket", @@ -238,7 +325,7 @@ def test_object_listed_when_missing_md5hash_size_and_updated() -> None: notifier = MagicMock() with ExitStack() as stack: stack.enter_context(patch("rohmu.object_storage.google.get_credentials")) - stack.enter_context(patch("rohmu.object_storage.google.GoogleTransfer.get_or_create_bucket")) + stack.enter_context(patch("rohmu.object_storage.google.GoogleTransfer._create_object_store_if_needed_unwrapped")) mock_operation = stack.enter_context(patch("rohmu.common.statsd.StatsClient.operation")) transfer = GoogleTransfer( project_id="test-project-id", @@ -311,3 +398,57 @@ def test_object_listed_when_missing_md5hash_size_and_updated() -> None: ] assert len(got) == len(expected) assert got == expected + + +def test_error_handling() -> None: + with patch("rohmu.object_storage.google.get_credentials"): + transfer = GoogleTransfer( + project_id="test-project-id", + bucket_name="test-bucket", + ensure_object_store_available=False, + ) + + with patch("rohmu.object_storage.google.GoogleTransfer._try_get_bucket") as _try_get_bucket: + # Unexpected exceptions bubble up + _try_get_bucket.side_effect = RuntimeError("Bad unexpected error") + with pytest.raises(RuntimeError, match="Bad unexpected error"): + transfer.verify_object_storage() + + # Bucket not found is wrapped with our own exception + _try_get_bucket.side_effect = _mock_404_response_from_google_api() + with pytest.raises(TransferObjectStoreMissingError): + transfer.verify_object_storage() + + # Permission error when checking for bucket existence is also our own exception... + _try_get_bucket.side_effect = _mock_403_response_from_google_api() + with pytest.raises(TransferObjectStorePermissionError): + transfer.verify_object_storage() + + with ExitStack() as stack: + _try_get_bucket = stack.enter_context(patch("rohmu.object_storage.google.GoogleTransfer._try_get_bucket")) + _try_create_bucket = stack.enter_context(patch("rohmu.object_storage.google.GoogleTransfer._try_create_bucket")) + # ... and the legacy behaviour of raising InvalidConfigurationError should not regress + _try_get_bucket.side_effect = _mock_403_response_from_google_api() + with pytest.raises(InvalidConfigurationError): + transfer._create_object_store_if_needed_unwrapped() + _try_create_bucket.assert_not_called() + + with ExitStack() as stack: + _try_get_bucket = stack.enter_context(patch("rohmu.object_storage.google.GoogleTransfer._try_get_bucket")) + _try_create_bucket = stack.enter_context(patch("rohmu.object_storage.google.GoogleTransfer._try_create_bucket")) + # Simulate a missing bucket to make it attempt to create + _try_get_bucket.side_effect = _mock_404_response_from_google_api() + + # Unexpected exceptions bubble up + _try_create_bucket.side_effect = RuntimeError("Bad unexpected error") + with pytest.raises(RuntimeError, match="Bad unexpected error"): + transfer.create_object_store_if_needed() + + # Permission errors when trying to create the bucket is wrapped with our own exception + _try_create_bucket.side_effect = _mock_403_response_from_google_api() + with pytest.raises(TransferObjectStorePermissionError): + transfer.create_object_store_if_needed() + + # ... and the legacy behaviour of bubbling up should not regress + with pytest.raises(HttpError, match="403"): + transfer._create_object_store_if_needed_unwrapped() diff --git a/test/test_factory.py b/test/test_factory.py index 9ac59418..8cf8fc86 100644 --- a/test/test_factory.py +++ b/test/test_factory.py @@ -54,7 +54,9 @@ def test_get_transfer_s3( transfer_object.get_client() mock_config_model.assert_called_once_with(**expected_config_arg) - mock_from_model.assert_called_once_with(mock_config_model(), mock_notifier.return_value) + mock_from_model.assert_called_once_with( + mock_config_model(), mock_notifier.return_value, ensure_object_store_available=True + ) mock_notifier.assert_called_once_with(url=config["notifier"]["url"]) assert transfer_object.bucket_name == "dummy-bucket" mock_botocore_config.assert_called_once_with(**expected_botocore_config)