From 70c2a5aeba057c4e786d1ba7f7fd834cc8af3065 Mon Sep 17 00:00:00 2001 From: Sergei Nikiforov Date: Sat, 28 Dec 2024 12:02:28 +0100 Subject: [PATCH] Add a new dagster_azure.fakes submodule where we include full fakes implementation. Issue a deprecation warning on imports of the code from the original path (subclassed ones) --- .../adls2/fake_adls2_resource.py | 197 +++-------------- .../dagster_azure/fakes/__init__.py | 5 + .../fakes/fake_adls2_resource.py | 205 ++++++++++++++++++ 3 files changed, 243 insertions(+), 164 deletions(-) create mode 100644 python_modules/libraries/dagster-azure/dagster_azure/fakes/__init__.py create mode 100644 python_modules/libraries/dagster-azure/dagster_azure/fakes/fake_adls2_resource.py diff --git a/python_modules/libraries/dagster-azure/dagster_azure/adls2/fake_adls2_resource.py b/python_modules/libraries/dagster-azure/dagster_azure/adls2/fake_adls2_resource.py index 682442d3314b8..8fdc8f8ed6cca 100644 --- a/python_modules/libraries/dagster-azure/dagster_azure/adls2/fake_adls2_resource.py +++ b/python_modules/libraries/dagster-azure/dagster_azure/adls2/fake_adls2_resource.py @@ -1,15 +1,16 @@ -import io -import random -from typing import Any, Optional -from unittest import mock +import warnings from dagster import resource -from dagster._config.pythonic_config import ConfigurableResource from dagster._core.definitions.resource_definition import dagster_maintained_resource -from dagster._utils.cached_method import cached_method -from dagster_azure.adls2.utils import ResourceNotFoundError -from dagster_azure.blob import FakeBlobServiceClient +from dagster_azure.fakes.fake_adls2_resource import ( + FakeADLS2FileClient as FakeADLS2FileClientBase, + FakeADLS2FileDownloader as FakeADLS2FileDownloaderBase, + FakeADLS2FilesystemClient as FakeADLS2FilesystemClientBase, + FakeADLS2Resource as FakeADLS2ResourceBase, + FakeADLS2ServiceClient as FakeADLS2ServiceClientBase, + FakeLeaseClient as FakeLeaseClientBase, +) @dagster_maintained_resource @@ -18,188 +19,56 @@ def fake_adls2_resource(context): return FakeADLS2Resource(account_name=context.resource_config["account_name"]) -class FakeADLS2Resource(ConfigurableResource): +DEPRECATION_WARNING = "Fake class imports from dagster_azure.adls2 are deprecated and will be removed in a next minor release (1.10.0). Please use dagster_azure.fakes path instead" + + +class FakeADLS2Resource(FakeADLS2ResourceBase): """Stateful mock of an ADLS2Resource for testing. Wraps a ``mock.MagicMock``. Containers are implemented using an in-memory dict. """ - account_name: str - storage_account: Optional[str] = None - - @classmethod - def _is_dagster_maintained(cls) -> bool: - return True - - @property - @cached_method - def adls2_client(self) -> "FakeADLS2ServiceClient": - return FakeADLS2ServiceClient(self.account_name) + def __init__(self, **kwargs): + warnings.warn(DEPRECATION_WARNING, DeprecationWarning, stacklevel=2) + super().__init__(**kwargs) - @property - @cached_method - def blob_client(self) -> FakeBlobServiceClient: - return FakeBlobServiceClient(self.account_name) - @property - def lease_client_constructor(self) -> Any: - return FakeLeaseClient - - -class FakeLeaseClient: +class FakeLeaseClient(FakeLeaseClientBase): def __init__(self, client): - self.client = client - self.id = None - - # client needs a ref to self to check if a given lease is valid - self.client._lease = self # noqa: SLF001 + warnings.warn(DEPRECATION_WARNING, DeprecationWarning, stacklevel=2) + super().__init__(client) - def acquire(self, lease_duration=-1): - if self.id is None: - self.id = random.randint(0, 2**9) - else: - raise Exception("Lease already held") - def release(self): - self.id = None - - def is_valid(self, lease): - if self.id is None: - # no lease is held so any operation is valid - return True - return lease == self.id - - -class FakeADLS2ServiceClient: +class FakeADLS2ServiceClient(FakeADLS2ServiceClientBase): """Stateful mock of an ADLS2 service client for testing. Wraps a ``mock.MagicMock``. Containers are implemented using an in-memory dict. """ def __init__(self, account_name, credential="fake-creds"): - self._account_name = account_name - self._credential = mock.MagicMock() - self._credential.account_key = credential - self._file_systems = {} - - @property - def account_name(self): - return self._account_name - - @property - def credential(self): - return self._credential - - @property - def file_systems(self): - return self._file_systems - - def get_file_system_client(self, file_system): - return self._file_systems.setdefault( - file_system, FakeADLS2FilesystemClient(self.account_name, file_system) - ) + warnings.warn(DEPRECATION_WARNING, DeprecationWarning, stacklevel=2) + super().__init__(account_name, credential) - def get_file_client(self, file_system, file_path): - return self.get_file_system_client(file_system).get_file_client(file_path) - -class FakeADLS2FilesystemClient: +class FakeADLS2FilesystemClient(FakeADLS2FilesystemClientBase): """Stateful mock of an ADLS2 filesystem client for testing.""" def __init__(self, account_name, file_system_name): - self._file_system: dict[str, FakeADLS2FileClient] = {} - self._account_name = account_name - self._file_system_name = file_system_name - - @property - def account_name(self): - return self._account_name - - @property - def file_system_name(self): - return self._file_system_name - - def keys(self): - return self._file_system.keys() - - def get_file_system_properties(self): - return {"account_name": self.account_name, "file_system_name": self.file_system_name} - - def has_file(self, path): - return bool(self._file_system.get(path)) - - def get_file_client(self, file_path): - # pass fileclient a ref to self and its name so the file can delete itself - self._file_system.setdefault(file_path, FakeADLS2FileClient(self, file_path)) - return self._file_system[file_path] + warnings.warn(DEPRECATION_WARNING, DeprecationWarning, stacklevel=2) + super().__init__(account_name, file_system_name) - def create_file(self, file): - # pass fileclient a ref to self and the file's name so the file can delete itself by - # accessing the self._file_system dict - self._file_system.setdefault(file, FakeADLS2FileClient(fs_client=self, name=file)) - return self._file_system[file] - def delete_file(self, file): - for k in list(self._file_system.keys()): - if k.startswith(file): - del self._file_system[k] - - -class FakeADLS2FileClient: +class FakeADLS2FileClient(FakeADLS2FileClientBase): """Stateful mock of an ADLS2 file client for testing.""" def __init__(self, name, fs_client): - self.name = name - self.contents = None - self._lease = None - self.fs_client = fs_client - - @property - def lease(self): - return self._lease if self._lease is None else self._lease.id - - def get_file_properties(self): - if self.contents is None: - raise ResourceNotFoundError("File does not exist!") - lease_id = None if self._lease is None else self._lease.id - return {"lease": lease_id} - - def upload_data(self, contents, overwrite=False, lease=None): - if self._lease is not None: - if not self._lease.is_valid(lease): - raise Exception("Invalid lease!") - if self.contents is not None or overwrite is True: - if isinstance(contents, str): - self.contents = contents.encode("utf8") - elif isinstance(contents, io.BytesIO): - self.contents = contents.read() - elif isinstance(contents, io.StringIO): - self.contents = contents.read().encode("utf8") - elif isinstance(contents, bytes): - self.contents = contents - else: - self.contents = contents - - def download_file(self): - if self.contents is None: - raise ResourceNotFoundError("File does not exist!") - return FakeADLS2FileDownloader(contents=self.contents) - - def delete_file(self, lease=None): - if self._lease is not None: - if not self._lease.is_valid(lease): - raise Exception("Invalid lease!") - self.fs_client.delete_file(self.name) - - -class FakeADLS2FileDownloader: - """Mock of an ADLS2 file downloader for testing.""" + warnings.warn(DEPRECATION_WARNING, DeprecationWarning, stacklevel=2) + super().__init__(name, fs_client) - def __init__(self, contents): - self.contents = contents - def readall(self): - return self.contents +class FakeADLS2FileDownloader(FakeADLS2FileDownloaderBase): + """Mock of an ADLS2 file downloader for testing.""" - def readinto(self, fileobj): - fileobj.write(self.contents) + def __init__(self, contents): + warnings.warn(DEPRECATION_WARNING, DeprecationWarning, stacklevel=2) + super().__init__(contents) diff --git a/python_modules/libraries/dagster-azure/dagster_azure/fakes/__init__.py b/python_modules/libraries/dagster-azure/dagster_azure/fakes/__init__.py new file mode 100644 index 0000000000000..10db43667b06f --- /dev/null +++ b/python_modules/libraries/dagster-azure/dagster_azure/fakes/__init__.py @@ -0,0 +1,5 @@ +from dagster_azure.fakes.fake_adls2_resource import ( + FakeADLS2Resource as FakeADLS2Resource, + FakeADLS2ServiceClient as FakeADLS2ServiceClient, + fake_adls2_resource as fake_adls2_resource, +) diff --git a/python_modules/libraries/dagster-azure/dagster_azure/fakes/fake_adls2_resource.py b/python_modules/libraries/dagster-azure/dagster_azure/fakes/fake_adls2_resource.py new file mode 100644 index 0000000000000..682442d3314b8 --- /dev/null +++ b/python_modules/libraries/dagster-azure/dagster_azure/fakes/fake_adls2_resource.py @@ -0,0 +1,205 @@ +import io +import random +from typing import Any, Optional +from unittest import mock + +from dagster import resource +from dagster._config.pythonic_config import ConfigurableResource +from dagster._core.definitions.resource_definition import dagster_maintained_resource +from dagster._utils.cached_method import cached_method + +from dagster_azure.adls2.utils import ResourceNotFoundError +from dagster_azure.blob import FakeBlobServiceClient + + +@dagster_maintained_resource +@resource({"account_name": str}) +def fake_adls2_resource(context): + return FakeADLS2Resource(account_name=context.resource_config["account_name"]) + + +class FakeADLS2Resource(ConfigurableResource): + """Stateful mock of an ADLS2Resource for testing. + + Wraps a ``mock.MagicMock``. Containers are implemented using an in-memory dict. + """ + + account_name: str + storage_account: Optional[str] = None + + @classmethod + def _is_dagster_maintained(cls) -> bool: + return True + + @property + @cached_method + def adls2_client(self) -> "FakeADLS2ServiceClient": + return FakeADLS2ServiceClient(self.account_name) + + @property + @cached_method + def blob_client(self) -> FakeBlobServiceClient: + return FakeBlobServiceClient(self.account_name) + + @property + def lease_client_constructor(self) -> Any: + return FakeLeaseClient + + +class FakeLeaseClient: + def __init__(self, client): + self.client = client + self.id = None + + # client needs a ref to self to check if a given lease is valid + self.client._lease = self # noqa: SLF001 + + def acquire(self, lease_duration=-1): + if self.id is None: + self.id = random.randint(0, 2**9) + else: + raise Exception("Lease already held") + + def release(self): + self.id = None + + def is_valid(self, lease): + if self.id is None: + # no lease is held so any operation is valid + return True + return lease == self.id + + +class FakeADLS2ServiceClient: + """Stateful mock of an ADLS2 service client for testing. + + Wraps a ``mock.MagicMock``. Containers are implemented using an in-memory dict. + """ + + def __init__(self, account_name, credential="fake-creds"): + self._account_name = account_name + self._credential = mock.MagicMock() + self._credential.account_key = credential + self._file_systems = {} + + @property + def account_name(self): + return self._account_name + + @property + def credential(self): + return self._credential + + @property + def file_systems(self): + return self._file_systems + + def get_file_system_client(self, file_system): + return self._file_systems.setdefault( + file_system, FakeADLS2FilesystemClient(self.account_name, file_system) + ) + + def get_file_client(self, file_system, file_path): + return self.get_file_system_client(file_system).get_file_client(file_path) + + +class FakeADLS2FilesystemClient: + """Stateful mock of an ADLS2 filesystem client for testing.""" + + def __init__(self, account_name, file_system_name): + self._file_system: dict[str, FakeADLS2FileClient] = {} + self._account_name = account_name + self._file_system_name = file_system_name + + @property + def account_name(self): + return self._account_name + + @property + def file_system_name(self): + return self._file_system_name + + def keys(self): + return self._file_system.keys() + + def get_file_system_properties(self): + return {"account_name": self.account_name, "file_system_name": self.file_system_name} + + def has_file(self, path): + return bool(self._file_system.get(path)) + + def get_file_client(self, file_path): + # pass fileclient a ref to self and its name so the file can delete itself + self._file_system.setdefault(file_path, FakeADLS2FileClient(self, file_path)) + return self._file_system[file_path] + + def create_file(self, file): + # pass fileclient a ref to self and the file's name so the file can delete itself by + # accessing the self._file_system dict + self._file_system.setdefault(file, FakeADLS2FileClient(fs_client=self, name=file)) + return self._file_system[file] + + def delete_file(self, file): + for k in list(self._file_system.keys()): + if k.startswith(file): + del self._file_system[k] + + +class FakeADLS2FileClient: + """Stateful mock of an ADLS2 file client for testing.""" + + def __init__(self, name, fs_client): + self.name = name + self.contents = None + self._lease = None + self.fs_client = fs_client + + @property + def lease(self): + return self._lease if self._lease is None else self._lease.id + + def get_file_properties(self): + if self.contents is None: + raise ResourceNotFoundError("File does not exist!") + lease_id = None if self._lease is None else self._lease.id + return {"lease": lease_id} + + def upload_data(self, contents, overwrite=False, lease=None): + if self._lease is not None: + if not self._lease.is_valid(lease): + raise Exception("Invalid lease!") + if self.contents is not None or overwrite is True: + if isinstance(contents, str): + self.contents = contents.encode("utf8") + elif isinstance(contents, io.BytesIO): + self.contents = contents.read() + elif isinstance(contents, io.StringIO): + self.contents = contents.read().encode("utf8") + elif isinstance(contents, bytes): + self.contents = contents + else: + self.contents = contents + + def download_file(self): + if self.contents is None: + raise ResourceNotFoundError("File does not exist!") + return FakeADLS2FileDownloader(contents=self.contents) + + def delete_file(self, lease=None): + if self._lease is not None: + if not self._lease.is_valid(lease): + raise Exception("Invalid lease!") + self.fs_client.delete_file(self.name) + + +class FakeADLS2FileDownloader: + """Mock of an ADLS2 file downloader for testing.""" + + def __init__(self, contents): + self.contents = contents + + def readall(self): + return self.contents + + def readinto(self, fileobj): + fileobj.write(self.contents)