diff --git a/python_modules/libraries/dagster-sigma/dagster_sigma/resource.py b/python_modules/libraries/dagster-sigma/dagster_sigma/resource.py index 92de6c544e412..5e533517fc1a2 100644 --- a/python_modules/libraries/dagster-sigma/dagster_sigma/resource.py +++ b/python_modules/libraries/dagster-sigma/dagster_sigma/resource.py @@ -63,6 +63,28 @@ class SigmaMaterializationStatus(str, enum.Enum): READY = "ready" +def build_folder_path_err(folder: Any, idx: int, param_name: str): + return ( + f"{param_name} at index {idx} is not a sequence: `{folder!r}`.\n" + "Paths should be specified as a list of folder names, starting from the root folder." + ) + + +def validate_folder_path_input(folder_input: Optional[Sequence[Sequence[str]]], param_name: str): + check.opt_sequence_param(folder_input, param_name, of_type=Sequence) + if folder_input: + for idx, folder in enumerate(folder_input): + check.invariant( + not isinstance(folder, str), + build_folder_path_err(folder, idx, param_name), + ) + check.is_iterable( + folder, + of_type=str, + additional_message=build_folder_path_err(folder, idx, param_name), + ) + + @record_custom class SigmaFilter(IHaveNew): """Filters the set of Sigma objects to fetch. @@ -71,21 +93,30 @@ class SigmaFilter(IHaveNew): workbook_folders (Optional[Sequence[Sequence[str]]]): A list of folder paths to fetch workbooks from. Each folder path is a list of folder names, starting from the root folder. All workbooks contained in the specified folders will be fetched. If not provided, all workbooks will be fetched. + workbooks (Optional[Sequence[Sequence[str]]]): A list of fully qualified workbook paths to fetch. + Each workbook path is a list of folder names, starting from the root folder, and ending + with the workbook name. If not provided, all workbooks will be fetched. include_unused_datasets (bool): Whether to include datasets that are not used in any workbooks. Defaults to True. """ workbook_folders: Optional[Sequence[Sequence[str]]] = None + workbooks: Optional[Sequence[Sequence[str]]] = None include_unused_datasets: bool = True def __new__( cls, workbook_folders: Optional[Sequence[Sequence[str]]] = None, + workbooks: Optional[Sequence[Sequence[str]]] = None, include_unused_datasets: bool = True, ): + validate_folder_path_input(workbook_folders, "workbook_folders") + validate_folder_path_input(workbooks, "workbooks") + return super().__new__( cls, workbook_folders=tuple([tuple(folder) for folder in workbook_folders or []]), + workbooks=tuple([tuple(workbook) for workbook in workbooks or []]), include_unused_datasets=include_unused_datasets, ) @@ -575,14 +606,21 @@ async def safe_fetch_lineage_for_element( async def _fetch_workbooks_and_filter(self, sigma_filter: SigmaFilter) -> List[Dict[str, Any]]: raw_workbooks = await self._fetch_workbooks() workbooks_to_fetch = [] - if sigma_filter.workbook_folders: + if sigma_filter.workbook_folders or sigma_filter.workbooks: workbook_filter_strings = [ - "/".join(folder).lower() for folder in sigma_filter.workbook_folders + "/".join(folder).lower() for folder in sigma_filter.workbook_folders or [] ] + workbook_strings = ["/".join(folder).lower() for folder in sigma_filter.workbooks or []] for workbook in raw_workbooks: - workbook_path = str(workbook["path"]).lower() - if any( - workbook_path.startswith(folder_str) for folder_str in workbook_filter_strings + workbook_path_and_name = ( + f"{str(workbook['path']).lower()}/{workbook['name'].lower()}" + ) + if ( + any( + workbook_path_and_name.startswith(folder_str) + for folder_str in workbook_filter_strings + ) + or workbook_path_and_name in workbook_strings ): workbooks_to_fetch.append(workbook) else: diff --git a/python_modules/libraries/dagster-sigma/dagster_sigma_tests/test_resource.py b/python_modules/libraries/dagster-sigma/dagster_sigma_tests/test_resource.py index 5c3056ca2e229..b933ff62a95c6 100644 --- a/python_modules/libraries/dagster-sigma/dagster_sigma_tests/test_resource.py +++ b/python_modules/libraries/dagster-sigma/dagster_sigma_tests/test_resource.py @@ -99,8 +99,11 @@ def test_model_organization_data(sigma_auth_token: str, sigma_sample_data: None) assert data.datasets[0].inputs == {"TESTDB.JAFFLE_SHOP.STG_ORDERS"} +@pytest.mark.parametrize("filter_type", ["workbook_folders", "workbook_names", "both"]) @responses.activate -def test_model_organization_data_filter(sigma_auth_token: str, sigma_sample_data: None) -> None: +def test_model_organization_data_filter( + sigma_auth_token: str, sigma_sample_data: None, filter_type: str +) -> None: fake_client_id = uuid.uuid4().hex fake_client_secret = uuid.uuid4().hex @@ -110,6 +113,17 @@ def test_model_organization_data_filter(sigma_auth_token: str, sigma_sample_data client_secret=fake_client_secret, ) + non_matching_filter = ( + {"workbook_folders": [("My Documents", "Test Folder")]} + if filter_type == "workbook_folders" + else {"workbooks": [("My Documents", "Test Folder", "Sample Workbook")]} + ) + matching_filter = ( + {"workbook_folders": [("My Documents", "My Subfolder")]} + if filter_type == "workbook_folders" + else {"workbooks": [("My Documents", "My Subfolder", "Sample Workbook")]} + ) + with mock.patch.object( SigmaOrganization, "_fetch_pages_for_workbook", @@ -117,7 +131,7 @@ def test_model_organization_data_filter(sigma_auth_token: str, sigma_sample_data ) as mock_fetch_pages: data = asyncio.run( resource.build_organization_data( - sigma_filter=SigmaFilter(workbook_folders=[("My Documents", "Test Folder")]), + sigma_filter=SigmaFilter(**non_matching_filter, include_unused_datasets=True), fetch_column_data=True, fetch_lineage_data=True, ) @@ -130,10 +144,7 @@ def test_model_organization_data_filter(sigma_auth_token: str, sigma_sample_data data = asyncio.run( resource.build_organization_data( - sigma_filter=SigmaFilter( - workbook_folders=[("My Documents", "Test Folder")], - include_unused_datasets=False, - ), + sigma_filter=SigmaFilter(**non_matching_filter, include_unused_datasets=False), fetch_column_data=True, fetch_lineage_data=True, ) @@ -146,10 +157,7 @@ def test_model_organization_data_filter(sigma_auth_token: str, sigma_sample_data data = asyncio.run( resource.build_organization_data( - sigma_filter=SigmaFilter( - workbook_folders=[("My Documents", "My Subfolder")], - include_unused_datasets=False, - ), + sigma_filter=SigmaFilter(**matching_filter, include_unused_datasets=False), fetch_column_data=True, fetch_lineage_data=True, ) @@ -166,7 +174,9 @@ def test_model_organization_data_filter(sigma_auth_token: str, sigma_sample_data data = asyncio.run( resource.build_organization_data( - sigma_filter=SigmaFilter(workbook_folders=[("My Documents",)]), + sigma_filter=SigmaFilter( + workbook_folders=[("My Documents",)], workbooks=[("Does", "Not", "Exist")] + ), fetch_column_data=True, fetch_lineage_data=True, )