Skip to content

Commit

Permalink
Merge pull request #190 from Aiven-Open/jad-elkik-handle-bucket-creation-failure
Browse files Browse the repository at this point in the history

Handle deleted buckets gracefully
  • Loading branch information
Prime541 authored Aug 19, 2024
2 parents 9358041 + 31bfb06 commit 6b695f1
Show file tree
Hide file tree
Showing 12 changed files with 582 additions and 96 deletions.
12 changes: 12 additions & 0 deletions rohmu/errors.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,18 @@ class InvalidConfigurationError(Error):
"""Invalid configuration"""


class TransferObjectStoreInitializationError(Error):
    """Raised when a transient network or permission issue does not allow us to validate access to the object store.

    Also serves as the base class of TransferObjectStorePermissionError and TransferObjectStoreMissingError, so
    catching this type covers those more specific failures as well.
    """


class TransferObjectStorePermissionError(TransferObjectStoreInitializationError):
    """Raised when a permission issue does not allow us to validate access to the object store."""


class TransferObjectStoreMissingError(TransferObjectStoreInitializationError):
    """Raised when we know for sure the backing object store (e.g. the bucket or container) is missing."""


class LocalFileIsRemoteFileError(StorageError):
"""File transfer operation source and destination point to the same file"""

Expand Down
12 changes: 8 additions & 4 deletions rohmu/factory.py
Original file line number Diff line number Diff line change
Expand Up @@ -66,19 +66,23 @@ def get_transfer_model(storage_config: Config) -> StorageModel:
return storage_class.config_model(**storage_config)


def get_transfer(storage_config: Config, ensure_object_store_available: bool = True) -> BaseTransfer[Any]:
    """Build a transfer object from a raw storage configuration mapping.

    The optional ``"notifier"`` entry is split off and turned into a notifier with ``get_notifier``; the
    remaining entries are converted into a storage model by ``get_transfer_model``.

    Args:
        storage_config: Storage configuration. A shallow copy is taken before the ``"notifier"`` key is
            popped, so the caller's mapping keeps that key.
        ensure_object_store_available: Forwarded unchanged to ``get_transfer_from_model``.

    Returns:
        The transfer instance produced by ``get_transfer_from_model``.
    """
    remaining_config = storage_config.copy()
    notifier_config = remaining_config.pop("notifier", None)
    notifier = get_notifier(notifier_config) if notifier_config is not None else None
    model = get_transfer_model(remaining_config)
    return get_transfer_from_model(model, notifier, ensure_object_store_available=ensure_object_store_available)


def get_transfer_from_model(
    model: StorageModelT,
    notifier: Optional[Notifier] = None,
    ensure_object_store_available: bool = True,
) -> BaseTransfer[StorageModelT]:
    """Instantiate the transfer class matching ``model.storage_type``.

    Args:
        model: Storage model; its ``storage_type`` selects the transfer class via
            ``get_class_for_storage_driver``.
        notifier: Optional notifier handed to the transfer class.
        ensure_object_store_available: Forwarded unchanged to the transfer class's ``from_model``.

    Returns:
        The transfer instance returned by the selected class's ``from_model``.
    """
    storage_class = get_class_for_storage_driver(model.storage_type)
    return storage_class.from_model(
        model,
        notifier,
        ensure_object_store_available=ensure_object_store_available,
    )


def _to_storage_driver(storage_type: str) -> StorageDriver:
Expand Down
96 changes: 80 additions & 16 deletions rohmu/object_storage/azure.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,14 @@
from azure.core.exceptions import HttpResponseError, ResourceExistsError
from azure.storage.blob import BlobServiceClient, ContentSettings
from rohmu.common.statsd import StatsdConfig
from rohmu.errors import FileNotFoundFromStorageError, InvalidConfigurationError, StorageError
from rohmu.errors import (
FileNotFoundFromStorageError,
InvalidConfigurationError,
StorageError,
TransferObjectStoreInitializationError,
TransferObjectStoreMissingError,
TransferObjectStorePermissionError,
)
from rohmu.notifier.interface import Notifier
from rohmu.object_storage.base import (
BaseTransfer,
Expand Down Expand Up @@ -61,15 +68,21 @@ def __init__(
proxy_info: Optional[dict[str, Union[str, int]]] = None,
notifier: Optional[Notifier] = None,
statsd_info: Optional[StatsdConfig] = None,
ensure_object_store_available: bool = True,
) -> None:
prefix = prefix.lstrip("/") if prefix else ""
super().__init__(prefix=prefix, notifier=notifier, statsd_info=statsd_info)
super().__init__(
prefix=prefix,
notifier=notifier,
statsd_info=statsd_info,
ensure_object_store_available=ensure_object_store_available,
)
if not account_key and not sas_token:
raise InvalidConfigurationError("One of account_key or sas_token must be specified to authenticate")

self.account_name = account_name
self.account_key = account_key
self.container_name = bucket_name
self.container = self.container_name = bucket_name
self.sas_token = sas_token
self._conn_str = self.conn_string(
account_name=account_name,
Expand All @@ -95,9 +108,34 @@ def __init__(
schema = "http"
self._config["proxies"] = {"https": f"{schema}://{auth}{proxy_host}:{proxy_port}"}
self._blob_service_client: Optional[BlobServiceClient] = None
self.container = self.get_or_create_container(self.container_name)
if ensure_object_store_available:
self._create_object_store_if_needed_unwrapped()
self.log.debug("AzureTransfer initialized, %r", self.container_name)

def _verify_object_storage_unwrapped(self) -> None:
    """Check for the configured container without creating it.

    Delegates to ``get_or_create_container`` with ``create_if_needed=False``; exceptions raised there are
    not caught or translated here.
    """
    self.get_or_create_container(self.container_name, create_if_needed=False)

def verify_object_storage(self) -> None:
    """Check that the configured container is reachable, without creating it.

    Azure ``HttpResponseError`` failures are translated into Rohmu errors. Exceptions of other types raised by
    ``_verify_object_storage_unwrapped`` propagate unchanged.

    Raises:
        TransferObjectStorePermissionError: The service answered with HTTP status 403.
        TransferObjectStoreInitializationError: Any other ``HttpResponseError``.
    """
    try:
        self._verify_object_storage_unwrapped()
    except HttpResponseError as ex:
        # Keep the original exception as the cause, but carry the container name and status in the message so
        # the translated error is diagnosable on its own.
        if ex.status_code == 403:
            raise TransferObjectStorePermissionError(
                f"Access to container {self.container_name!r} was denied (HTTP 403)"
            ) from ex
        raise TransferObjectStoreInitializationError(
            f"Could not verify container {self.container_name!r} (HTTP status {ex.status_code})"
        ) from ex

def _create_object_store_if_needed_unwrapped(self) -> None:
    """Create the configured container if needed.

    Delegates to ``get_or_create_container`` with ``create_if_needed=True``; exceptions raised there are
    not caught or translated here.
    """
    self.get_or_create_container(self.container_name, create_if_needed=True)

def create_object_store_if_needed(self) -> None:
    """Create the configured container if needed, translating Azure errors into Rohmu errors.

    Exceptions that are not ``HttpResponseError`` propagate unchanged.

    Raises:
        TransferObjectStorePermissionError: The service answered with HTTP status 403.
        TransferObjectStoreInitializationError: Any other ``HttpResponseError``.
    """
    try:
        self._create_object_store_if_needed_unwrapped()
    except HttpResponseError as ex:
        # Keep the original exception as the cause, but carry the container name and status in the message so
        # the translated error is diagnosable on its own.
        if ex.status_code == 403:
            raise TransferObjectStorePermissionError(
                f"Creating or accessing container {self.container_name!r} was denied (HTTP 403)"
            ) from ex
        raise TransferObjectStoreInitializationError(
            f"Could not create or access container {self.container_name!r} (HTTP status {ex.status_code})"
        ) from ex

def get_blob_service_client(self) -> BlobServiceClient:
if self._blob_service_client is None:
self._blob_service_client = BlobServiceClient.from_connection_string(
Expand Down Expand Up @@ -168,7 +206,16 @@ def _copy_file_from_bucket(
if time.monotonic() - start < timeout:
time.sleep(0.1)
else:
destination_client.abort_copy(copy_props.id, timeout=timeout)
if copy_props.id is None:
# The description of CopyProperties tells us copy_id should not be None unless status is also None
# The type checker cannot know this, but we still log a warning if this ever happens
self.log.warning(
"Pending copy operation from %r to %r was missing a copy_id, will not be aborted after timeout",
source_key,
destination_key,
)
else:
destination_client.abort_copy(copy_props.id, timeout=timeout)
raise StorageError(
f"Copying {repr(source_key)} to {repr(destination_key)} did not complete in {timeout} seconds"
)
Expand Down Expand Up @@ -402,21 +449,38 @@ def progress_callback(pipeline_response: Any) -> None:
else:
delattr(fd, "tell")

def get_or_create_container(self, container_name: str, create_if_needed: bool = True) -> str:
    """Make sure the container is usable and return its name.

    Args:
        container_name: Name of the container. An ``enum.Enum`` member is replaced by its ``value``.
        create_if_needed: When true, attempt to create the container. When false, only check that it exists.

    Returns:
        The container name (the enum value when an enum member was passed).

    Raises:
        TransferObjectStoreMissingError: ``create_if_needed`` is false and the container does not exist.
        HttpResponseError: Any service error other than the authorization failures tolerated below.
    """
    if isinstance(container_name, enum.Enum):
        # ensure that the enum value is used rather than the enum name
        # https://github.com/Azure/azure-sdk-for-python/blob/azure-storage-blob_12.8.1/sdk/storage/azure-storage-blob/azure/storage/blob/_blob_service_client.py#L667
        container_name = container_name.value
    start_time = time.monotonic()
    if create_if_needed:
        try:
            self.get_blob_service_client().create_container(container_name)
        except ResourceExistsError:
            # The container is already there, which is the state we want.
            pass
        except HttpResponseError as e:
            if "request is not authorized" not in e.exc_msg:
                raise
            # Corresponds to a 403/AuthorizationFailure
            self.log.debug("Container creation unauthorized. Assuming container %r already exists", container_name)
            return container_name
    else:
        try:
            container_exists = self.get_blob_service_client().get_container_client(container_name).exists()
        except HttpResponseError as e:
            if e.status_code == 403:
                # We have to handle 403/AuthorizationFailure gracefully as container-scoped SAS tokens might have
                # all permissions inside the container but won't have permissions on the container itself and we cannot
                # assume we'd have an account-scoped SAS token or account key here
                self.log.debug("Container metadata unauthorized. Assuming container %r already exists", container_name)
                return container_name
            raise
        if not container_exists:
            self.log.debug("Container %r does not exist, creation not requested", container_name)
            raise TransferObjectStoreMissingError(
                f"Container {container_name!r} does not exist and creation was not requested"
            )
        self.log.debug("Container %r already exists", container_name)
    self.log.debug("Got/Created container: %r successfully, took: %.3fs", container_name, time.monotonic() - start_time)
    return container_name
62 changes: 58 additions & 4 deletions rohmu/object_storage/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,11 @@
from io import BytesIO
from rohmu.common.models import StorageModel
from rohmu.common.statsd import StatsClient, StatsdConfig
from rohmu.errors import FileNotFoundFromStorageError, InvalidByteRangeError, StorageError
from rohmu.errors import (
FileNotFoundFromStorageError,
InvalidByteRangeError,
StorageError,
)
from rohmu.notifier.interface import Notifier
from rohmu.notifier.null import NullNotifier
from rohmu.object_storage.config import StorageModelT
Expand Down Expand Up @@ -71,8 +75,18 @@ class BaseTransfer(Generic[StorageModelT]):
supports_concurrent_upload: bool = False

def __init__(
self, prefix: Optional[str], notifier: Optional[Notifier] = None, statsd_info: Optional[StatsdConfig] = None
self,
prefix: Optional[str],
notifier: Optional[Notifier] = None,
statsd_info: Optional[StatsdConfig] = None,
ensure_object_store_available: bool = True,
) -> None:
"""
Initialize a Transfer instance. Although it's possible to do implementation-specific IO here, it's preferable to
delegate to verify_object_storage and create_object_store_if_needed.
The ensure_object_store_available flag is here to allow this transition in a backwards-compatible manner.
"""
self.log = logging.getLogger(self.__class__.__name__)
if not prefix:
prefix = ""
Expand All @@ -82,6 +96,40 @@ def __init__(
self.notifier = notifier or NullNotifier()
self.stats = StatsClient(statsd_info)

def _verify_object_storage_unwrapped(self) -> None:
    """
    Perform read-only operations to verify the backing object store is available and accessible.

    Raises the implementation-specific exception as-is. This is mainly useful for backwards-compatibility and should
    never be called directly.

    The base class provides no implementation and raises NotImplementedError.
    """
    raise NotImplementedError

def verify_object_storage(self) -> None:
    """
    Perform read-only operations to verify the backing object store is available and accessible.

    Raise Rohmu-specific error TransferObjectStoreInitializationError to abstract away implementation-specific details.

    The base class provides no implementation and raises NotImplementedError.
    """
    raise NotImplementedError

def _create_object_store_if_needed_unwrapped(self) -> None:
    """
    Create the backing object store if it's needed (e.g. creating directories, buckets, etc.).

    Raises the implementation-specific exception as-is. This is mainly useful for backwards-compatibility and should
    never be called directly.

    The base class provides no implementation and raises NotImplementedError.
    """
    raise NotImplementedError

def create_object_store_if_needed(self) -> None:
    """
    Create the backing object store if it's needed (e.g. creating directories, buckets, etc.).

    Raise Rohmu-specific error TransferObjectStoreInitializationError to abstract away implementation-specific details.

    The base class provides no implementation and raises NotImplementedError.
    """
    raise NotImplementedError

def close(self) -> None:
"""Release all resources associated with the Transfer object."""
pass
Expand Down Expand Up @@ -138,8 +186,14 @@ def _should_multipart(
return int(size) > chunk_size

@classmethod
def from_model(
    cls, model: StorageModelT, notifier: Optional[Notifier] = None, ensure_object_store_available: bool = True
) -> Self:
    """Alternate constructor: build a transfer instance from a storage model.

    Args:
        model: Storage model. Its fields are dumped by alias, without ``storage_type``, and passed to the
            constructor as keyword arguments.
        notifier: Optional notifier passed to the constructor.
        ensure_object_store_available: Passed to the constructor unchanged.

    Returns:
        A new instance of ``cls``.
    """
    constructor_kwargs = model.dict(by_alias=True, exclude={"storage_type"})
    return cls(
        **constructor_kwargs,
        notifier=notifier,
        ensure_object_store_available=ensure_object_store_available,
    )

def copy_file(
self, *, source_key: str, destination_key: str, metadata: Optional[Metadata] = None, **_kwargs: Any
Expand Down
Loading

0 comments on commit 6b695f1

Please sign in to comment.