From 434158986446d5c25122ddeb497bb64b3bb59d68 Mon Sep 17 00:00:00 2001 From: benpankow Date: Fri, 27 Dec 2024 15:16:35 -0600 Subject: [PATCH 1/2] [dagster-sigma] Add ability to filter to specific workbooks --- .../dagster-sigma/dagster_sigma/resource.py | 43 ++++++++++++++++++ .../dagster_sigma_tests/test_resource.py | 45 ++++++++++++++----- 2 files changed, 78 insertions(+), 10 deletions(-) diff --git a/python_modules/libraries/dagster-sigma/dagster_sigma/resource.py b/python_modules/libraries/dagster-sigma/dagster_sigma/resource.py index 55d20e57968e9..affbe472185dd 100644 --- a/python_modules/libraries/dagster-sigma/dagster_sigma/resource.py +++ b/python_modules/libraries/dagster-sigma/dagster_sigma/resource.py @@ -63,6 +63,28 @@ class SigmaMaterializationStatus(str, enum.Enum): READY = "ready" +def build_folder_path_err(folder: Any, idx: int, param_name: str): + return ( + f"{param_name} at index {idx} is not a sequence: `{folder!r}`.\n" + "Paths should be specified as a list of folder names, starting from the root folder." + ) + + +def validate_folder_path_input(folder_input: Optional[Sequence[Sequence[str]]], param_name: str): + check.opt_sequence_param(folder_input, param_name, of_type=Sequence) + if folder_input: + for idx, folder in enumerate(folder_input): + check.invariant( + not isinstance(folder, str), + build_folder_path_err(folder, idx, param_name), + ) + check.is_iterable( + folder, + of_type=str, + additional_message=build_folder_path_err(folder, idx, param_name), + ) + + @record_custom class SigmaFilter(IHaveNew): """Filters the set of Sigma objects to fetch. @@ -71,21 +93,34 @@ class SigmaFilter(IHaveNew): workbook_folders (Optional[Sequence[Sequence[str]]]): A list of folder paths to fetch workbooks from. Each folder path is a list of folder names, starting from the root folder. All workbooks contained in the specified folders will be fetched. If not provided, all workbooks will be fetched. + workbooks (Optional[Sequence[Sequence[str]]]): A list of fully qualified workbook paths to fetch. + Each workbook path is a list of folder names, starting from the root folder, and ending + with the workbook name. If not provided, all workbooks will be fetched. include_unused_datasets (bool): Whether to include datasets that are not used in any workbooks. Defaults to True. """ workbook_folders: Optional[Sequence[Sequence[str]]] = None + workbooks: Optional[Sequence[Sequence[str]]] = None include_unused_datasets: bool = True def __new__( cls, workbook_folders: Optional[Sequence[Sequence[str]]] = None, + workbooks: Optional[Sequence[Sequence[str]]] = None, include_unused_datasets: bool = True, ): + validate_folder_path_input(workbook_folders, "workbook_folders") + validate_folder_path_input(workbooks, "workbooks") + check.invariant( + not (workbook_folders and workbooks), + "Only one of workbook_folders or workbooks can be provided", + ) + return super().__new__( cls, workbook_folders=tuple([tuple(folder) for folder in workbook_folders or []]), + workbooks=tuple([tuple(workbook) for workbook in workbooks or []]), include_unused_datasets=include_unused_datasets, ) @@ -579,6 +614,14 @@ async def _fetch_workbooks_and_filter(self, sigma_filter: SigmaFilter) -> List[D workbook_path.startswith(folder_str) for folder_str in workbook_filter_strings ): workbooks_to_fetch.append(workbook) + elif sigma_filter.workbooks: + workbook_strings = ["/".join(folder).lower() for folder in sigma_filter.workbooks] + for workbook in raw_workbooks: + workbook_path_and_name = ( + f"{str(workbook['path']).lower()}/{str(workbook['name']).lower()}" + ) + if workbook_path_and_name in workbook_strings: + workbooks_to_fetch.append(workbook) else: workbooks_to_fetch = raw_workbooks return workbooks_to_fetch diff --git a/python_modules/libraries/dagster-sigma/dagster_sigma_tests/test_resource.py b/python_modules/libraries/dagster-sigma/dagster_sigma_tests/test_resource.py index 3e4777d2b39b1..d95e367775deb 100644 --- a/python_modules/libraries/dagster-sigma/dagster_sigma_tests/test_resource.py +++ b/python_modules/libraries/dagster-sigma/dagster_sigma_tests/test_resource.py @@ -95,8 +95,11 @@ def test_model_organization_data(sigma_auth_token: str, sigma_sample_data: None) assert data.datasets[0].inputs == {"TESTDB.JAFFLE_SHOP.STG_ORDERS"} +@pytest.mark.parametrize("filter_type", ["workbook_folders", "workbook_names"]) @responses.activate -def test_model_organization_data_filter(sigma_auth_token: str, sigma_sample_data: None) -> None: +def test_model_organization_data_filter( + sigma_auth_token: str, sigma_sample_data: None, filter_type: str +) -> None: fake_client_id = uuid.uuid4().hex fake_client_secret = uuid.uuid4().hex @@ -111,9 +114,16 @@ def test_model_organization_data_filter(sigma_auth_token: str, sigma_sample_data "_fetch_pages_for_workbook", wraps=resource._fetch_pages_for_workbook, # noqa: SLF001 ) as mock_fetch_pages: + sigma_filter = ( + SigmaFilter( + workbook_folders=[("My Documents", "Test Folder")], + ) + if filter_type == "workbook_folders" + else SigmaFilter(workbooks=[("My Documents", "Test Folder", "Sample Workbook")]) + ) data = asyncio.run( resource.build_organization_data( - sigma_filter=SigmaFilter(workbook_folders=[("My Documents", "Test Folder")]), + sigma_filter=sigma_filter, fetch_column_data=True, ) ) @@ -123,12 +133,19 @@ def test_model_organization_data_filter(sigma_auth_token: str, sigma_sample_data # We don't fetch the workbooks, so we shouldn't have made any calls assert len(mock_fetch_pages.mock_calls) == 0 + sigma_filter = ( + SigmaFilter( + workbook_folders=[("My Documents", "Test Folder")], include_unused_datasets=False + ) + if filter_type == "workbook_folders" + else SigmaFilter( + workbooks=[("My Documents", "Test Folder", "Sample Workbook")], + include_unused_datasets=False, + ) + ) data = asyncio.run( resource.build_organization_data( - sigma_filter=SigmaFilter( - workbook_folders=[("My Documents", "Test Folder")], - include_unused_datasets=False, - ), + sigma_filter=sigma_filter, fetch_column_data=True, ) ) @@ -138,12 +155,20 @@ def test_model_organization_data_filter(sigma_auth_token: str, sigma_sample_data # We still don't fetch the workbooks, so we shouldn't have made any calls assert len(mock_fetch_pages.mock_calls) == 0 + sigma_filter = ( + SigmaFilter( + workbook_folders=[("My Documents", "My Subfolder")], + include_unused_datasets=False, + ) + if filter_type == "workbook_folders" + else SigmaFilter( + workbooks=[("My Documents", "My Subfolder", "Sample Workbook")], + include_unused_datasets=False, + ) + ) data = asyncio.run( resource.build_organization_data( - sigma_filter=SigmaFilter( - workbook_folders=[("My Documents", "My Subfolder")], - include_unused_datasets=False, - ), + sigma_filter=sigma_filter, fetch_column_data=True, ) ) From 5dc5e246519ebdd57417f92d30f8d3aba6add664 Mon Sep 17 00:00:00 2001 From: benpankow Date: Fri, 27 Dec 2024 15:30:04 -0600 Subject: [PATCH 2/2] update test --- .../dagster-sigma/dagster_sigma/resource.py | 27 ++++------ .../dagster_sigma_tests/test_resource.py | 51 +++++++------------ 2 files changed, 29 insertions(+), 49 deletions(-) diff --git a/python_modules/libraries/dagster-sigma/dagster_sigma/resource.py b/python_modules/libraries/dagster-sigma/dagster_sigma/resource.py index affbe472185dd..28b4ce20cea45 100644 --- a/python_modules/libraries/dagster-sigma/dagster_sigma/resource.py +++ b/python_modules/libraries/dagster-sigma/dagster_sigma/resource.py @@ -112,10 +112,6 @@ def __new__( ): validate_folder_path_input(workbook_folders, "workbook_folders") validate_folder_path_input(workbooks, "workbooks") - check.invariant( - not (workbook_folders and workbooks), - "Only one of workbook_folders or workbooks can be provided", - ) return super().__new__( cls, @@ -604,23 +600,22 @@ async def safe_fetch_lineage_for_element( async def _fetch_workbooks_and_filter(self, sigma_filter: SigmaFilter) -> List[Dict[str, Any]]: raw_workbooks = await self._fetch_workbooks() workbooks_to_fetch = [] - if sigma_filter.workbook_folders: + if sigma_filter.workbook_folders or sigma_filter.workbooks: workbook_filter_strings = [ - "/".join(folder).lower() for folder in sigma_filter.workbook_folders + "/".join(folder).lower() for folder in sigma_filter.workbook_folders or [] ] - for workbook in raw_workbooks: - workbook_path = str(workbook["path"]).lower() - if any( - workbook_path.startswith(folder_str) for folder_str in workbook_filter_strings - ): - workbooks_to_fetch.append(workbook) - elif sigma_filter.workbooks: - workbook_strings = ["/".join(folder).lower() for folder in sigma_filter.workbooks] + workbook_strings = ["/".join(folder).lower() for folder in sigma_filter.workbooks or []] for workbook in raw_workbooks: workbook_path_and_name = ( - f"{str(workbook['path']).lower()}/{str(workbook['name']).lower()}" + f"{str(workbook['path']).lower()}/{workbook['name'].lower()}" ) - if workbook_path_and_name in workbook_strings: + if ( + any( + workbook_path_and_name.startswith(folder_str) + for folder_str in workbook_filter_strings + ) + or workbook_path_and_name in workbook_strings + ): workbooks_to_fetch.append(workbook) else: workbooks_to_fetch = raw_workbooks diff --git a/python_modules/libraries/dagster-sigma/dagster_sigma_tests/test_resource.py b/python_modules/libraries/dagster-sigma/dagster_sigma_tests/test_resource.py index d95e367775deb..6fa2b0fc50c0b 100644 --- a/python_modules/libraries/dagster-sigma/dagster_sigma_tests/test_resource.py +++ b/python_modules/libraries/dagster-sigma/dagster_sigma_tests/test_resource.py @@ -95,7 +95,7 @@ def test_model_organization_data(sigma_auth_token: str, sigma_sample_data: None) assert data.datasets[0].inputs == {"TESTDB.JAFFLE_SHOP.STG_ORDERS"} -@pytest.mark.parametrize("filter_type", ["workbook_folders", "workbook_names"]) +@pytest.mark.parametrize("filter_type", ["workbook_folders", "workbook_names", "both"]) @responses.activate def test_model_organization_data_filter( sigma_auth_token: str, sigma_sample_data: None, filter_type: str @@ -109,21 +109,25 @@ def test_model_organization_data_filter( client_secret=fake_client_secret, ) + non_matching_filter = ( + {"workbook_folders": [("My Documents", "Test Folder")]} + if filter_type == "workbook_folders" + else {"workbooks": [("My Documents", "Test Folder", "Sample Workbook")]} + ) + matching_filter = ( + {"workbook_folders": [("My Documents", "My Subfolder")]} + if filter_type == "workbook_folders" + else {"workbooks": [("My Documents", "My Subfolder", "Sample Workbook")]} + ) + with mock.patch.object( SigmaOrganization, "_fetch_pages_for_workbook", wraps=resource._fetch_pages_for_workbook, # noqa: SLF001 ) as mock_fetch_pages: - sigma_filter = ( - SigmaFilter( - workbook_folders=[("My Documents", "Test Folder")], - ) - if filter_type == "workbook_folders" - else SigmaFilter(workbooks=[("My Documents", "Test Folder", "Sample Workbook")]) - ) data = asyncio.run( resource.build_organization_data( - sigma_filter=sigma_filter, + sigma_filter=SigmaFilter(**non_matching_filter, include_unused_datasets=True), fetch_column_data=True, ) ) @@ -133,19 +137,9 @@ def test_model_organization_data_filter( # We don't fetch the workbooks, so we shouldn't have made any calls assert len(mock_fetch_pages.mock_calls) == 0 - sigma_filter = ( - SigmaFilter( - workbook_folders=[("My Documents", "Test Folder")], include_unused_datasets=False - ) - if filter_type == "workbook_folders" - else SigmaFilter( - workbooks=[("My Documents", "Test Folder", "Sample Workbook")], - include_unused_datasets=False, - ) - ) data = asyncio.run( resource.build_organization_data( - sigma_filter=sigma_filter, + sigma_filter=SigmaFilter(**non_matching_filter, include_unused_datasets=False), fetch_column_data=True, ) ) @@ -155,20 +149,9 @@ def test_model_organization_data_filter( # We still don't fetch the workbooks, so we shouldn't have made any calls assert len(mock_fetch_pages.mock_calls) == 0 - sigma_filter = ( - SigmaFilter( - workbook_folders=[("My Documents", "My Subfolder")], - include_unused_datasets=False, - ) - if filter_type == "workbook_folders" - else SigmaFilter( - workbooks=[("My Documents", "My Subfolder", "Sample Workbook")], - include_unused_datasets=False, - ) - ) data = asyncio.run( resource.build_organization_data( - sigma_filter=sigma_filter, + sigma_filter=SigmaFilter(**matching_filter, include_unused_datasets=False), fetch_column_data=True, ) ) @@ -184,7 +167,9 @@ def test_model_organization_data_filter( data = asyncio.run( resource.build_organization_data( - sigma_filter=SigmaFilter(workbook_folders=[("My Documents",)]), + sigma_filter=SigmaFilter( + workbook_folders=[("My Documents",)], workbooks=[("Does", "Not", "Exist")] + ), fetch_column_data=True, ) )