Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

dagster_azure: do not import fake implementations by default #26754

Merged
merged 1 commit into from
Jan 14, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -1,15 +1,16 @@
import io
import random
from typing import Any, Optional
from unittest import mock
import warnings

from dagster import resource
from dagster._config.pythonic_config import ConfigurableResource
from dagster._core.definitions.resource_definition import dagster_maintained_resource
from dagster._utils.cached_method import cached_method

from dagster_azure.adls2.utils import ResourceNotFoundError
from dagster_azure.blob import FakeBlobServiceClient
from dagster_azure.fakes.fake_adls2_resource import (
FakeADLS2FileClient as FakeADLS2FileClientBase,
FakeADLS2FileDownloader as FakeADLS2FileDownloaderBase,
FakeADLS2FilesystemClient as FakeADLS2FilesystemClientBase,
FakeADLS2Resource as FakeADLS2ResourceBase,
FakeADLS2ServiceClient as FakeADLS2ServiceClientBase,
FakeLeaseClient as FakeLeaseClientBase,
)


@dagster_maintained_resource
Expand All @@ -18,188 +19,56 @@ def fake_adls2_resource(context):
return FakeADLS2Resource(account_name=context.resource_config["account_name"])


class FakeADLS2Resource(ConfigurableResource):
DEPRECATION_WARNING = "Fake class imports from dagster_azure.adls2 are deprecated and will be removed in a next minor release (1.10.0). Please use dagster_azure.fakes path instead"


class FakeADLS2Resource(FakeADLS2ResourceBase):
"""Stateful mock of an ADLS2Resource for testing.

Wraps a ``mock.MagicMock``. Containers are implemented using an in-memory dict.
"""

account_name: str
storage_account: Optional[str] = None

@classmethod
def _is_dagster_maintained(cls) -> bool:
return True

@property
@cached_method
def adls2_client(self) -> "FakeADLS2ServiceClient":
return FakeADLS2ServiceClient(self.account_name)
def __init__(self, **kwargs):
warnings.warn(DEPRECATION_WARNING, DeprecationWarning, stacklevel=2)
super().__init__(**kwargs)

@property
@cached_method
def blob_client(self) -> FakeBlobServiceClient:
return FakeBlobServiceClient(self.account_name)

@property
def lease_client_constructor(self) -> Any:
return FakeLeaseClient


class FakeLeaseClient:
class FakeLeaseClient(FakeLeaseClientBase):
def __init__(self, client):
self.client = client
self.id = None

# client needs a ref to self to check if a given lease is valid
self.client._lease = self # noqa: SLF001
warnings.warn(DEPRECATION_WARNING, DeprecationWarning, stacklevel=2)
super().__init__(client)

def acquire(self, lease_duration=-1):
if self.id is None:
self.id = random.randint(0, 2**9)
else:
raise Exception("Lease already held")

def release(self):
self.id = None

def is_valid(self, lease):
if self.id is None:
# no lease is held so any operation is valid
return True
return lease == self.id


class FakeADLS2ServiceClient:
class FakeADLS2ServiceClient(FakeADLS2ServiceClientBase):
"""Stateful mock of an ADLS2 service client for testing.

Wraps a ``mock.MagicMock``. Containers are implemented using an in-memory dict.
"""

def __init__(self, account_name, credential="fake-creds"):
self._account_name = account_name
self._credential = mock.MagicMock()
self._credential.account_key = credential
self._file_systems = {}

@property
def account_name(self):
return self._account_name

@property
def credential(self):
return self._credential

@property
def file_systems(self):
return self._file_systems

def get_file_system_client(self, file_system):
return self._file_systems.setdefault(
file_system, FakeADLS2FilesystemClient(self.account_name, file_system)
)
warnings.warn(DEPRECATION_WARNING, DeprecationWarning, stacklevel=2)
super().__init__(account_name, credential)

def get_file_client(self, file_system, file_path):
return self.get_file_system_client(file_system).get_file_client(file_path)


class FakeADLS2FilesystemClient:
class FakeADLS2FilesystemClient(FakeADLS2FilesystemClientBase):
"""Stateful mock of an ADLS2 filesystem client for testing."""

def __init__(self, account_name, file_system_name):
self._file_system: dict[str, FakeADLS2FileClient] = {}
self._account_name = account_name
self._file_system_name = file_system_name

@property
def account_name(self):
return self._account_name

@property
def file_system_name(self):
return self._file_system_name

def keys(self):
return self._file_system.keys()

def get_file_system_properties(self):
return {"account_name": self.account_name, "file_system_name": self.file_system_name}

def has_file(self, path):
return bool(self._file_system.get(path))

def get_file_client(self, file_path):
# pass fileclient a ref to self and its name so the file can delete itself
self._file_system.setdefault(file_path, FakeADLS2FileClient(self, file_path))
return self._file_system[file_path]
warnings.warn(DEPRECATION_WARNING, DeprecationWarning, stacklevel=2)
super().__init__(account_name, file_system_name)

def create_file(self, file):
# pass fileclient a ref to self and the file's name so the file can delete itself by
# accessing the self._file_system dict
self._file_system.setdefault(file, FakeADLS2FileClient(fs_client=self, name=file))
return self._file_system[file]

def delete_file(self, file):
for k in list(self._file_system.keys()):
if k.startswith(file):
del self._file_system[k]


class FakeADLS2FileClient:
class FakeADLS2FileClient(FakeADLS2FileClientBase):
"""Stateful mock of an ADLS2 file client for testing."""

def __init__(self, name, fs_client):
self.name = name
self.contents = None
self._lease = None
self.fs_client = fs_client

@property
def lease(self):
return self._lease if self._lease is None else self._lease.id

def get_file_properties(self):
if self.contents is None:
raise ResourceNotFoundError("File does not exist!")
lease_id = None if self._lease is None else self._lease.id
return {"lease": lease_id}

def upload_data(self, contents, overwrite=False, lease=None):
if self._lease is not None:
if not self._lease.is_valid(lease):
raise Exception("Invalid lease!")
if self.contents is not None or overwrite is True:
if isinstance(contents, str):
self.contents = contents.encode("utf8")
elif isinstance(contents, io.BytesIO):
self.contents = contents.read()
elif isinstance(contents, io.StringIO):
self.contents = contents.read().encode("utf8")
elif isinstance(contents, bytes):
self.contents = contents
else:
self.contents = contents

def download_file(self):
if self.contents is None:
raise ResourceNotFoundError("File does not exist!")
return FakeADLS2FileDownloader(contents=self.contents)

def delete_file(self, lease=None):
if self._lease is not None:
if not self._lease.is_valid(lease):
raise Exception("Invalid lease!")
self.fs_client.delete_file(self.name)


class FakeADLS2FileDownloader:
"""Mock of an ADLS2 file downloader for testing."""
warnings.warn(DEPRECATION_WARNING, DeprecationWarning, stacklevel=2)
super().__init__(name, fs_client)

def __init__(self, contents):
self.contents = contents

def readall(self):
return self.contents
class FakeADLS2FileDownloader(FakeADLS2FileDownloaderBase):
"""Mock of an ADLS2 file downloader for testing."""

def readinto(self, fileobj):
fileobj.write(self.contents)
def __init__(self, contents):
warnings.warn(DEPRECATION_WARNING, DeprecationWarning, stacklevel=2)
super().__init__(contents)
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
from dagster_azure.fakes.fake_adls2_resource import (
FakeADLS2Resource as FakeADLS2Resource,
FakeADLS2ServiceClient as FakeADLS2ServiceClient,
fake_adls2_resource as fake_adls2_resource,
)
Loading