Skip to content

Commit

Permalink
[dagster-sigma] Add ability to filter to specific workbooks (#26749)
Browse files Browse the repository at this point in the history
## Summary

Adds an option to the `SigmaFilter` which lets users filter to specific
workbooks rather than folders.

## How I Tested These Changes

New unit test.

## Changelog

> [dagster-sigma] Added the option to filter to specific workbooks in
addition to folders.
  • Loading branch information
benpankow authored Dec 31, 2024
1 parent 1665f69 commit 5a4e2c0
Show file tree
Hide file tree
Showing 2 changed files with 64 additions and 16 deletions.
48 changes: 43 additions & 5 deletions python_modules/libraries/dagster-sigma/dagster_sigma/resource.py
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,28 @@ class SigmaMaterializationStatus(str, enum.Enum):
READY = "ready"


def build_folder_path_err(folder: Any, idx: int, param_name: str):
    """Build the error message shown when a folder-path entry is not a valid sequence.

    Args:
        folder: The offending entry from the user-supplied input.
        idx: Position of the entry within the input sequence.
        param_name: Name of the parameter being validated, for the message.
    """
    message_lines = [
        f"{param_name} at index {idx} is not a sequence: `{folder!r}`.",
        "Paths should be specified as a list of folder names, starting from the root folder.",
    ]
    return "\n".join(message_lines)


def validate_folder_path_input(folder_input: Optional[Sequence[Sequence[str]]], param_name: str):
    """Validate that *folder_input* is an optional sequence of folder paths.

    Each path must itself be a sequence of folder-name strings; a bare string is
    rejected explicitly (it would otherwise slip through, since ``str`` is a
    ``Sequence``). Raises via ``check`` on the first invalid entry.

    Args:
        folder_input: The user-supplied list of paths, or ``None``.
        param_name: Name of the parameter being validated, used in error messages.
    """
    check.opt_sequence_param(folder_input, param_name, of_type=Sequence)
    for idx, path in enumerate(folder_input or []):
        err = build_folder_path_err(path, idx, param_name)
        check.invariant(not isinstance(path, str), err)
        check.is_iterable(path, of_type=str, additional_message=err)


@record_custom
class SigmaFilter(IHaveNew):
    """Filters the set of Sigma objects to fetch.

    Args:
        workbook_folders (Optional[Sequence[Sequence[str]]]): A list of folder paths to fetch workbooks from.
            Each folder path is a list of folder names, starting from the root folder. All workbooks
            contained in the specified folders will be fetched. If not provided, all workbooks will be fetched.
        workbooks (Optional[Sequence[Sequence[str]]]): A list of fully qualified workbook paths to fetch.
            Each workbook path is a list of folder names, starting from the root folder, and ending
            with the workbook name. If not provided, all workbooks will be fetched.
        include_unused_datasets (bool): Whether to include datasets that are not used in any workbooks.
            Defaults to True.
    """

    workbook_folders: Optional[Sequence[Sequence[str]]] = None
    workbooks: Optional[Sequence[Sequence[str]]] = None
    include_unused_datasets: bool = True

    def __new__(
        cls,
        workbook_folders: Optional[Sequence[Sequence[str]]] = None,
        workbooks: Optional[Sequence[Sequence[str]]] = None,
        include_unused_datasets: bool = True,
    ):
        # Fail fast on malformed path input before constructing the record.
        validate_folder_path_input(workbook_folders, "workbook_folders")
        validate_folder_path_input(workbooks, "workbooks")

        def _freeze(paths: Optional[Sequence[Sequence[str]]]):
            # Normalize to nested tuples so the record is hashable/immutable.
            return tuple(tuple(path) for path in paths or ())

        return super().__new__(
            cls,
            workbook_folders=_freeze(workbook_folders),
            workbooks=_freeze(workbooks),
            include_unused_datasets=include_unused_datasets,
        )

Expand Down Expand Up @@ -575,14 +606,21 @@ async def safe_fetch_lineage_for_element(
async def _fetch_workbooks_and_filter(self, sigma_filter: SigmaFilter) -> List[Dict[str, Any]]:
raw_workbooks = await self._fetch_workbooks()
workbooks_to_fetch = []
if sigma_filter.workbook_folders:
if sigma_filter.workbook_folders or sigma_filter.workbooks:
workbook_filter_strings = [
"/".join(folder).lower() for folder in sigma_filter.workbook_folders
"/".join(folder).lower() for folder in sigma_filter.workbook_folders or []
]
workbook_strings = ["/".join(folder).lower() for folder in sigma_filter.workbooks or []]
for workbook in raw_workbooks:
workbook_path = str(workbook["path"]).lower()
if any(
workbook_path.startswith(folder_str) for folder_str in workbook_filter_strings
workbook_path_and_name = (
f"{str(workbook['path']).lower()}/{workbook['name'].lower()}"
)
if (
any(
workbook_path_and_name.startswith(folder_str)
for folder_str in workbook_filter_strings
)
or workbook_path_and_name in workbook_strings
):
workbooks_to_fetch.append(workbook)
else:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -99,8 +99,11 @@ def test_model_organization_data(sigma_auth_token: str, sigma_sample_data: None)
assert data.datasets[0].inputs == {"TESTDB.JAFFLE_SHOP.STG_ORDERS"}


@pytest.mark.parametrize("filter_type", ["workbook_folders", "workbook_names", "both"])
@responses.activate
def test_model_organization_data_filter(sigma_auth_token: str, sigma_sample_data: None) -> None:
def test_model_organization_data_filter(
sigma_auth_token: str, sigma_sample_data: None, filter_type: str
) -> None:
fake_client_id = uuid.uuid4().hex
fake_client_secret = uuid.uuid4().hex

Expand All @@ -110,14 +113,25 @@ def test_model_organization_data_filter(sigma_auth_token: str, sigma_sample_data
client_secret=fake_client_secret,
)

non_matching_filter = (
{"workbook_folders": [("My Documents", "Test Folder")]}
if filter_type == "workbook_folders"
else {"workbooks": [("My Documents", "Test Folder", "Sample Workbook")]}
)
matching_filter = (
{"workbook_folders": [("My Documents", "My Subfolder")]}
if filter_type == "workbook_folders"
else {"workbooks": [("My Documents", "My Subfolder", "Sample Workbook")]}
)

with mock.patch.object(
SigmaOrganization,
"_fetch_pages_for_workbook",
wraps=resource._fetch_pages_for_workbook, # noqa: SLF001
) as mock_fetch_pages:
data = asyncio.run(
resource.build_organization_data(
sigma_filter=SigmaFilter(workbook_folders=[("My Documents", "Test Folder")]),
sigma_filter=SigmaFilter(**non_matching_filter, include_unused_datasets=True),
fetch_column_data=True,
fetch_lineage_data=True,
)
Expand All @@ -130,10 +144,7 @@ def test_model_organization_data_filter(sigma_auth_token: str, sigma_sample_data

data = asyncio.run(
resource.build_organization_data(
sigma_filter=SigmaFilter(
workbook_folders=[("My Documents", "Test Folder")],
include_unused_datasets=False,
),
sigma_filter=SigmaFilter(**non_matching_filter, include_unused_datasets=False),
fetch_column_data=True,
fetch_lineage_data=True,
)
Expand All @@ -146,10 +157,7 @@ def test_model_organization_data_filter(sigma_auth_token: str, sigma_sample_data

data = asyncio.run(
resource.build_organization_data(
sigma_filter=SigmaFilter(
workbook_folders=[("My Documents", "My Subfolder")],
include_unused_datasets=False,
),
sigma_filter=SigmaFilter(**matching_filter, include_unused_datasets=False),
fetch_column_data=True,
fetch_lineage_data=True,
)
Expand All @@ -166,7 +174,9 @@ def test_model_organization_data_filter(sigma_auth_token: str, sigma_sample_data

data = asyncio.run(
resource.build_organization_data(
sigma_filter=SigmaFilter(workbook_folders=[("My Documents",)]),
sigma_filter=SigmaFilter(
workbook_folders=[("My Documents",)], workbooks=[("Does", "Not", "Exist")]
),
fetch_column_data=True,
fetch_lineage_data=True,
)
Expand Down

0 comments on commit 5a4e2c0

Please sign in to comment.