Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Azure Blob Storage Resource #27394

Open
wants to merge 6 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Binary file modified docs/content/api/modules.json.gz
Binary file not shown.
Binary file modified docs/content/api/searchindex.json.gz
Binary file not shown.
Binary file modified docs/content/api/sections.json.gz
Binary file not shown.
Binary file modified docs/next/public/objects.inv
Binary file not shown.
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,9 @@ Resources
.. autoconfigurable:: dagster_azure.adls2.FakeADLS2Resource
:annotation: ResourceDefinition

.. autoconfigurable:: dagster_azure.blob.AzureBlobStorageResource
:annotation: ResourceDefinition

.. autoclass:: dagster_azure.blob.AzureBlobComputeLogManager


Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,4 +2,11 @@
AzureBlobComputeLogManager as AzureBlobComputeLogManager,
)
from dagster_azure.blob.fake_blob_client import FakeBlobServiceClient as FakeBlobServiceClient
from dagster_azure.blob.resources import (
AzureBlobStorageAnonymousCredential as AzureBlobStorageAnonymousCredential,
AzureBlobStorageDefaultCredential as AzureBlobStorageDefaultCredential,
AzureBlobStorageKeyCredential as AzureBlobStorageKeyCredential,
AzureBlobStorageResource as AzureBlobStorageResource,
AzureBlobStorageSASTokenCredential as AzureBlobStorageSASTokenCredential,
)
from dagster_azure.blob.utils import create_blob_client as create_blob_client
126 changes: 126 additions & 0 deletions python_modules/libraries/dagster-azure/dagster_azure/blob/resources.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,126 @@
from collections.abc import Generator
from contextlib import contextmanager
from typing import Any, Literal, Union

from azure.identity import DefaultAzureCredential
from azure.storage.blob import BlobServiceClient
from dagster import Config, ConfigurableResource
from pydantic import Field


class AzureBlobStorageSASTokenCredential(Config):
    """Authentication using an Azure SAS token.

    ``credential_type`` is fixed to ``"sas"``; it is the tag that
    ``AzureBlobStorageResource.credential`` discriminates on.
    """

    credential_type: Literal["sas"] = "sas"

    # Declared through ``Field`` so the description travels with the field; a
    # bare string literal placed after the attribute is only an expression
    # statement and is not picked up as a field description by default.
    token: str = Field(description="an azure SAS token")


class AzureBlobStorageKeyCredential(Config):
    """Authentication using an Azure shared key.

    ``credential_type`` is fixed to ``"key"``; it is the tag that
    ``AzureBlobStorageResource.credential`` discriminates on.
    """

    credential_type: Literal["key"] = "key"

    # Declared through ``Field`` so the description travels with the field; a
    # bare string literal placed after the attribute is only an expression
    # statement and is not picked up as a field description by default.
    key: str = Field(description="an azure shared-key")


class AzureBlobStorageDefaultCredential(Config):
    """Authenticate using azure.identity.DefaultAzureCredential.

    ``credential_type`` is fixed to ``"default_azure_credential"``; it is the
    tag that ``AzureBlobStorageResource.credential`` discriminates on.
    """

    credential_type: Literal["default_azure_credential"] = "default_azure_credential"

    # The default stays an empty dict, exactly as before. The description is
    # attached through ``Field`` because the two bare string literals that used
    # to follow the attribute were separate expression statements and are not
    # picked up as a field description by default.
    kwargs: dict[str, Any] = Field(
        default={},
        description=(
            "additional arguments to be passed to azure.identity.DefaultAzureCredential."
            ' e.g. AzureBlobStorageDefaultCredential(kwargs={"exclude_environment_credential": True})'
        ),
    )


class AzureBlobStorageAnonymousCredential(Config):
    """Credential marker for anonymous (unauthenticated) access to Azure Blob Storage.

    Carries no secret; only the ``"anonymous"`` discriminator tag.
    """

    credential_type: Literal["anonymous"] = "anonymous"


class AzureBlobStorageResource(ConfigurableResource):
    """Resource for interacting with Azure Blob Storage.

    Use :py:meth:`get_client` as a context manager to obtain a
    ``BlobServiceClient`` built from ``account_url`` and ``credential``; the
    client is closed when the ``with`` block exits.

    Examples:
        .. code-block:: python

            import os
            from dagster import Definitions, asset, EnvVar
            from dagster_azure.blob import (
                AzureBlobStorageResource,
                AzureBlobStorageKeyCredential,
                AzureBlobStorageDefaultCredential
            )

            @asset
            def my_table(azure_blob_storage: AzureBlobStorageResource):
                with azure_blob_storage.get_client() as blob_storage_client:
                    response = blob_storage_client.list_containers()

            defs = Definitions(
                assets=[my_table],
                resources={
                    "azure_blob_storage": AzureBlobStorageResource(
                        account_url=EnvVar("AZURE_BLOB_STORAGE_ACCOUNT_URL"),
                        credential=AzureBlobStorageDefaultCredential() if os.getenv("DEV") else
                            AzureBlobStorageKeyCredential(key=EnvVar("AZURE_BLOB_STORAGE_KEY"))
                    ),
                },
            )

    """

    account_url: str = Field(
        description=(
            "The URL to the blob storage account. Any other entities included"
            " in the URL path (e.g. container or blob) will be discarded. This URL can be optionally"
            " authenticated with a SAS token."
        ),
    )

    # Tagged union: the concrete credential class is selected by the value of
    # its ``credential_type`` field.
    credential: Union[
        AzureBlobStorageKeyCredential,
        AzureBlobStorageSASTokenCredential,
        AzureBlobStorageDefaultCredential,
        AzureBlobStorageAnonymousCredential,
    ] = Field(
        discriminator="credential_type",
        description=(
            "The credential used to authenticate to the storage account. One of:"
            " AzureBlobStorageSASTokenCredential,"
            " AzureBlobStorageKeyCredential,"
            " AzureBlobStorageDefaultCredential,"
            " AzureBlobStorageAnonymousCredential"
        ),
    )

    @classmethod
    def _is_dagster_maintained(cls):
        """Return ``True`` unconditionally."""
        return True

    def _raw_credential(self) -> Any:
        """Translate the configured credential into the value given to ``BlobServiceClient``.

        Returns:
            The SAS token string for ``"sas"``, the shared-key string for
            ``"key"``, a new ``DefaultAzureCredential`` built from the
            configured ``kwargs`` for ``"default_azure_credential"``, or
            ``None`` for ``"anonymous"``.

        Raises:
            Exception: If ``credential_type`` is none of the four known tags.
        """
        credential = self.credential

        if credential.credential_type == "sas":
            return credential.token
        if credential.credential_type == "key":
            return credential.key
        if credential.credential_type == "default_azure_credential":
            return DefaultAzureCredential(**credential.kwargs)
        if credential.credential_type == "anonymous":
            return None

        # Fallback for a tag outside the four handled above. The literals are
        # joined with exactly one space after each comma (the previous message
        # contained a doubled space after the first class name).
        raise Exception(
            "Invalid credential type - use one of AzureBlobStorageKeyCredential,"
            " AzureBlobStorageSASTokenCredential, AzureBlobStorageDefaultCredential,"
            " AzureBlobStorageAnonymousCredential"
        )

    @contextmanager
    def get_client(self) -> Generator[BlobServiceClient, None, None]:
        """Yield a ``BlobServiceClient`` for ``account_url`` and close it afterwards.

        Yields:
            A ``BlobServiceClient`` constructed with ``account_url`` and the
            value returned by :py:meth:`_raw_credential`.
        """
        service = BlobServiceClient(account_url=self.account_url, credential=self._raw_credential())

        try:
            yield service
        finally:
            # Close the client even when the caller's ``with`` body raises.
            service.close()
Original file line number Diff line number Diff line change
@@ -0,0 +1,103 @@
from unittest.mock import MagicMock, patch

from dagster import asset, materialize_to_memory
from dagster_azure.blob import (
AzureBlobStorageAnonymousCredential,
AzureBlobStorageDefaultCredential,
AzureBlobStorageKeyCredential,
AzureBlobStorageResource,
AzureBlobStorageSASTokenCredential,
)

ACCOUNT_URL = "https://example-account.blob.core.windows.net"
SAS_TOKEN = "example-sas-token"
SHARED_KEY = "example-shared-key"


@patch("dagster_azure.blob.resources.BlobServiceClient")
def test_resource_sas_credential(mock_blob_service_client):
    """A SAS credential reaches ``BlobServiceClient`` as the raw token string."""
    sas_resource = AzureBlobStorageResource(
        account_url=ACCOUNT_URL,
        credential=AzureBlobStorageSASTokenCredential(token=SAS_TOKEN),
    )

    @asset
    def az_asset(azure_blob_resource: AzureBlobStorageResource):
        # Entering get_client() constructs the (patched) service client, so the
        # call arguments can be checked from inside the context.
        with azure_blob_resource.get_client():
            mock_blob_service_client.assert_called_once_with(
                account_url=ACCOUNT_URL, credential=SAS_TOKEN
            )

    outcome = materialize_to_memory(
        [az_asset],
        resources={"azure_blob_resource": sas_resource},
    )

    assert outcome.success


@patch("dagster_azure.blob.resources.BlobServiceClient")
def test_resource_shared_key_credential(mock_blob_service_client):
    """A shared-key credential reaches ``BlobServiceClient`` as the raw key string."""
    key_resource = AzureBlobStorageResource(
        account_url=ACCOUNT_URL,
        credential=AzureBlobStorageKeyCredential(key=SHARED_KEY),
    )

    @asset
    def az_asset(azure_blob_resource: AzureBlobStorageResource):
        # Entering get_client() constructs the (patched) service client, so the
        # call arguments can be checked from inside the context.
        with azure_blob_resource.get_client():
            mock_blob_service_client.assert_called_once_with(
                account_url=ACCOUNT_URL, credential=SHARED_KEY
            )

    outcome = materialize_to_memory(
        [az_asset],
        resources={"azure_blob_resource": key_resource},
    )

    assert outcome.success


@patch("dagster_azure.blob.resources.BlobServiceClient")
@patch("dagster_azure.blob.resources.DefaultAzureCredential")
def test_resource_default_credential(mock_default_credential_method, mock_blob_service_client):
    """The object built by ``DefaultAzureCredential`` is what ``BlobServiceClient`` receives."""
    # Stand-in for the credential object the patched constructor hands back.
    fake_credential = MagicMock()
    mock_default_credential_method.return_value = fake_credential

    default_resource = AzureBlobStorageResource(
        account_url=ACCOUNT_URL,
        credential=AzureBlobStorageDefaultCredential(),
    )

    @asset
    def az_asset(azure_blob_resource: AzureBlobStorageResource):
        # Entering get_client() constructs the (patched) service client, so the
        # call arguments can be checked from inside the context.
        with azure_blob_resource.get_client():
            mock_blob_service_client.assert_called_once_with(
                account_url=ACCOUNT_URL, credential=fake_credential
            )

    outcome = materialize_to_memory(
        [az_asset],
        resources={"azure_blob_resource": default_resource},
    )

    assert outcome.success


@patch("dagster_azure.blob.resources.BlobServiceClient")
def test_resource_anonymous_credential(mock_blob_service_client):
    """An anonymous credential reaches ``BlobServiceClient`` as ``credential=None``."""
    anonymous_resource = AzureBlobStorageResource(
        account_url=ACCOUNT_URL,
        credential=AzureBlobStorageAnonymousCredential(),
    )

    @asset
    def az_asset(azure_blob_resource: AzureBlobStorageResource):
        # Entering get_client() constructs the (patched) service client, so the
        # call arguments can be checked from inside the context.
        with azure_blob_resource.get_client():
            mock_blob_service_client.assert_called_once_with(
                account_url=ACCOUNT_URL, credential=None
            )

    outcome = materialize_to_memory(
        [az_asset],
        resources={"azure_blob_resource": anonymous_resource},
    )

    assert outcome.success