Skip to content

Commit

Permalink
[dagster-powerbi] Use Power BI translator instance in spec loader and…
Browse files Browse the repository at this point in the history
… state-backed defs (#26734)

## Summary & Motivation

Updates `load_powerbi_asset_specs()` and state-backed definitions to
accept an instance of `DagsterPowerBITranslator`.

See #26133 for more details about the motivation.

## How I Tested These Changes

BK

## Changelog

[dagster-powerbi] `load_powerbi_asset_specs` is updated to accept an
instance of `DagsterPowerBITranslator` or custom subclass.
  • Loading branch information
maximearmstrong authored Jan 2, 2025
1 parent 6e0eb02 commit 3f62106
Show file tree
Hide file tree
Showing 4 changed files with 55 additions and 11 deletions.
3 changes: 2 additions & 1 deletion docs/content/integrations/powerbi.mdx
Original file line number Diff line number Diff line change
Expand Up @@ -122,7 +122,8 @@ class MyCustomPowerBITranslator(DagsterPowerBITranslator):


power_bi_specs = load_powerbi_asset_specs(
power_bi_workspace, dagster_powerbi_translator=MyCustomPowerBITranslator
power_bi_workspace,
dagster_powerbi_translator=MyCustomPowerBITranslator,
)
defs = dg.Definitions(
assets=[*power_bi_specs], resources={"power_bi": power_bi_workspace}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,8 @@ def get_asset_spec(self, data: PowerBIContentData) -> dg.AssetSpec:


power_bi_specs = load_powerbi_asset_specs(
power_bi_workspace, dagster_powerbi_translator=MyCustomPowerBITranslator
power_bi_workspace,
dagster_powerbi_translator=MyCustomPowerBITranslator,
)
defs = dg.Definitions(
assets=[*power_bi_specs], resources={"power_bi": power_bi_workspace}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
import time
from dataclasses import dataclass
from functools import cached_property
from typing import Any, Dict, Mapping, Optional, Sequence, Type
from typing import Any, Dict, Mapping, Optional, Sequence, Type, Union
from urllib.parse import urlencode

import requests
Expand All @@ -21,6 +21,7 @@
from dagster._time import get_current_timestamp
from dagster._utils.cached_method import cached_method
from dagster._utils.security import non_secure_md5_hash_str
from dagster._utils.warnings import deprecation_warning
from pydantic import Field, PrivateAttr

from dagster_powerbi.translator import (
Expand Down Expand Up @@ -385,7 +386,7 @@ def build_defs(
if PowerBITagSet.extract(spec.tags).asset_type == "semantic_model"
else spec
for spec in load_powerbi_asset_specs(
self, dagster_powerbi_translator, use_workspace_scan=False
self, dagster_powerbi_translator(), use_workspace_scan=False
)
],
resources={resource_key: self},
Expand All @@ -395,24 +396,39 @@ def build_defs(
@experimental
def load_powerbi_asset_specs(
workspace: PowerBIWorkspace,
dagster_powerbi_translator: Type[DagsterPowerBITranslator] = DagsterPowerBITranslator,
dagster_powerbi_translator: Optional[
Union[DagsterPowerBITranslator, Type[DagsterPowerBITranslator]]
] = None,
use_workspace_scan: bool = True,
) -> Sequence[AssetSpec]:
"""Returns a list of AssetSpecs representing the Power BI content in the workspace.
Args:
workspace (PowerBIWorkspace): The Power BI workspace to load assets from.
dagster_powerbi_translator (Optional[Union[DagsterPowerBITranslator, Type[DagsterPowerBITranslator]]]):
The translator to use to convert Power BI content into :py:class:`dagster.AssetSpec`.
Defaults to :py:class:`DagsterPowerBITranslator`.
use_workspace_scan (bool): Whether to scan the entire workspace using admin APIs
at once to get all content. Defaults to True.
Returns:
List[AssetSpec]: The set of assets representing the Power BI content in the workspace.
"""
if isinstance(dagster_powerbi_translator, type):
deprecation_warning(
subject="Support of `dagster_powerbi_translator` as a Type[DagsterPowerBITranslator]",
breaking_version="1.10",
additional_warn_text=(
"Pass an instance of DagsterPowerBITranslator or subclass to `dagster_powerbi_translator` instead."
),
)
dagster_powerbi_translator = dagster_powerbi_translator()

with workspace.process_config_and_initialize_cm() as initialized_workspace:
return check.is_list(
PowerBIWorkspaceDefsLoader(
workspace=initialized_workspace,
translator_cls=dagster_powerbi_translator,
translator=dagster_powerbi_translator or DagsterPowerBITranslator(),
use_workspace_scan=use_workspace_scan,
)
.build_defs()
Expand All @@ -424,7 +440,7 @@ def load_powerbi_asset_specs(
@dataclass
class PowerBIWorkspaceDefsLoader(StateBackedDefinitionsLoader[PowerBIWorkspaceData]):
workspace: PowerBIWorkspace
translator_cls: Type[DagsterPowerBITranslator]
translator: DagsterPowerBITranslator
use_workspace_scan: bool

@property
Expand All @@ -438,15 +454,13 @@ def fetch_state(self) -> PowerBIWorkspaceData:
)

def defs_from_state(self, state: PowerBIWorkspaceData) -> Definitions:
translator = self.translator_cls()

all_external_data = [
*state.dashboards_by_id.values(),
*state.reports_by_id.values(),
*state.semantic_models_by_id.values(),
]
all_external_asset_specs = [
translator.get_asset_spec(
self.translator.get_asset_spec(
PowerBITranslatorData(
content_data=content,
workspace_data=state,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -99,7 +99,9 @@ def test_translator_custom_metadata(workspace_data_api_mocks: None, workspace_id
workspace_id=workspace_id,
)
all_asset_specs = load_powerbi_asset_specs(
workspace=resource, dagster_powerbi_translator=MyCustomTranslator, use_workspace_scan=False
workspace=resource,
dagster_powerbi_translator=MyCustomTranslator(),
use_workspace_scan=False,
)
asset_spec = next(spec for spec in all_asset_specs)

Expand All @@ -109,6 +111,32 @@ def test_translator_custom_metadata(workspace_data_api_mocks: None, workspace_id
assert "dagster/kind/powerbi" in asset_spec.tags


def test_translator_custom_metadata_legacy(
workspace_data_api_mocks: None, workspace_id: str
) -> None:
fake_token = uuid.uuid4().hex
resource = PowerBIWorkspace(
credentials=PowerBIToken(api_token=fake_token),
workspace_id=workspace_id,
)
with pytest.warns(
DeprecationWarning,
match=r"Support of `dagster_powerbi_translator` as a Type\[DagsterPowerBITranslator\]",
):
# Pass the translator type
all_asset_specs = load_powerbi_asset_specs(
workspace=resource,
dagster_powerbi_translator=MyCustomTranslator,
use_workspace_scan=False,
)
asset_spec = next(spec for spec in all_asset_specs)

assert "custom" in asset_spec.metadata
assert asset_spec.metadata["custom"] == "metadata"
assert asset_spec.key.path == ["prefix", "dashboard", "Sales_Returns_Sample_v201912"]
assert "dagster/kind/powerbi" in asset_spec.tags


@lazy_definitions
def state_derived_defs_two_workspaces() -> Definitions:
resource = PowerBIWorkspace(
Expand Down

0 comments on commit 3f62106

Please sign in to comment.