Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[dagster-powerbi] Use Power BI translator instance in spec loader and state-backed defs #26734

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion docs/content/integrations/powerbi.mdx
Original file line number Diff line number Diff line change
Expand Up @@ -122,7 +122,8 @@ class MyCustomPowerBITranslator(DagsterPowerBITranslator):


power_bi_specs = load_powerbi_asset_specs(
power_bi_workspace, dagster_powerbi_translator=MyCustomPowerBITranslator
power_bi_workspace,
dagster_powerbi_translator=MyCustomPowerBITranslator,
)
defs = dg.Definitions(
assets=[*power_bi_specs], resources={"power_bi": power_bi_workspace}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,8 @@ def get_asset_spec(self, data: PowerBIContentData) -> dg.AssetSpec:


power_bi_specs = load_powerbi_asset_specs(
power_bi_workspace, dagster_powerbi_translator=MyCustomPowerBITranslator
power_bi_workspace,
dagster_powerbi_translator=MyCustomPowerBITranslator,
)
defs = dg.Definitions(
assets=[*power_bi_specs], resources={"power_bi": power_bi_workspace}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
import time
from dataclasses import dataclass
from functools import cached_property
from typing import Any, Dict, Mapping, Optional, Sequence, Type
from typing import Any, Dict, Mapping, Optional, Sequence, Type, Union
from urllib.parse import urlencode

import requests
Expand All @@ -21,6 +21,7 @@
from dagster._time import get_current_timestamp
from dagster._utils.cached_method import cached_method
from dagster._utils.security import non_secure_md5_hash_str
from dagster._utils.warnings import deprecation_warning
from pydantic import Field, PrivateAttr

from dagster_powerbi.translator import (
Expand Down Expand Up @@ -385,7 +386,7 @@ def build_defs(
if PowerBITagSet.extract(spec.tags).asset_type == "semantic_model"
else spec
for spec in load_powerbi_asset_specs(
self, dagster_powerbi_translator, use_workspace_scan=False
self, dagster_powerbi_translator(), use_workspace_scan=False
)
],
resources={resource_key: self},
Expand All @@ -395,24 +396,39 @@ def build_defs(
@experimental
def load_powerbi_asset_specs(
workspace: PowerBIWorkspace,
dagster_powerbi_translator: Type[DagsterPowerBITranslator] = DagsterPowerBITranslator,
dagster_powerbi_translator: Optional[
Union[DagsterPowerBITranslator, Type[DagsterPowerBITranslator]]
] = None,
use_workspace_scan: bool = True,
) -> Sequence[AssetSpec]:
"""Returns a list of AssetSpecs representing the Power BI content in the workspace.

Args:
workspace (PowerBIWorkspace): The Power BI workspace to load assets from.
dagster_powerbi_translator (Optional[Union[DagsterPowerBITranslator, Type[DagsterPowerBITranslator]]]):
The translator to use to convert Power BI content into :py:class:`dagster.AssetSpec`.
Defaults to :py:class:`DagsterPowerBITranslator`.
use_workspace_scan (bool): Whether to scan the entire workspace using admin APIs
at once to get all content. Defaults to True.

Returns:
List[AssetSpec]: The set of assets representing the Power BI content in the workspace.
"""
if isinstance(dagster_powerbi_translator, type):
deprecation_warning(
subject="Support of `dagster_powerbi_translator` as a Type[DagsterPowerBITranslator]",
breaking_version="1.10",
additional_warn_text=(
"Pass an instance of DagsterPowerBITranslator or subclass to `dagster_powerbi_translator` instead."
),
)
dagster_powerbi_translator = dagster_powerbi_translator()

with workspace.process_config_and_initialize_cm() as initialized_workspace:
return check.is_list(
PowerBIWorkspaceDefsLoader(
workspace=initialized_workspace,
translator_cls=dagster_powerbi_translator,
translator=dagster_powerbi_translator or DagsterPowerBITranslator(),
use_workspace_scan=use_workspace_scan,
)
.build_defs()
Expand All @@ -424,7 +440,7 @@ def load_powerbi_asset_specs(
@dataclass
class PowerBIWorkspaceDefsLoader(StateBackedDefinitionsLoader[PowerBIWorkspaceData]):
workspace: PowerBIWorkspace
translator_cls: Type[DagsterPowerBITranslator]
translator: DagsterPowerBITranslator
use_workspace_scan: bool

@property
Expand All @@ -438,15 +454,13 @@ def fetch_state(self) -> PowerBIWorkspaceData:
)

def defs_from_state(self, state: PowerBIWorkspaceData) -> Definitions:
translator = self.translator_cls()

all_external_data = [
*state.dashboards_by_id.values(),
*state.reports_by_id.values(),
*state.semantic_models_by_id.values(),
]
all_external_asset_specs = [
translator.get_asset_spec(
self.translator.get_asset_spec(
PowerBITranslatorData(
content_data=content,
workspace_data=state,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -99,7 +99,9 @@ def test_translator_custom_metadata(workspace_data_api_mocks: None, workspace_id
workspace_id=workspace_id,
)
all_asset_specs = load_powerbi_asset_specs(
workspace=resource, dagster_powerbi_translator=MyCustomTranslator, use_workspace_scan=False
workspace=resource,
dagster_powerbi_translator=MyCustomTranslator(),
use_workspace_scan=False,
)
asset_spec = next(spec for spec in all_asset_specs)

Expand All @@ -109,6 +111,32 @@ def test_translator_custom_metadata(workspace_data_api_mocks: None, workspace_id
assert "dagster/kind/powerbi" in asset_spec.tags


def test_translator_custom_metadata_legacy(
workspace_data_api_mocks: None, workspace_id: str
) -> None:
fake_token = uuid.uuid4().hex
resource = PowerBIWorkspace(
credentials=PowerBIToken(api_token=fake_token),
workspace_id=workspace_id,
)
with pytest.warns(
DeprecationWarning,
match=r"Support of `dagster_powerbi_translator` as a Type\[DagsterPowerBITranslator\]",
):
# Pass the translator type
all_asset_specs = load_powerbi_asset_specs(
workspace=resource,
dagster_powerbi_translator=MyCustomTranslator,
use_workspace_scan=False,
)
asset_spec = next(spec for spec in all_asset_specs)

assert "custom" in asset_spec.metadata
assert asset_spec.metadata["custom"] == "metadata"
assert asset_spec.key.path == ["prefix", "dashboard", "Sales_Returns_Sample_v201912"]
assert "dagster/kind/powerbi" in asset_spec.tags


@lazy_definitions
def state_derived_defs_two_workspaces() -> Definitions:
resource = PowerBIWorkspace(
Expand Down
Loading