Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[dagster-looker] Use Looker API translator instance in spec loader and state-backed defs #26743

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion docs/content/integrations/looker.mdx
Original file line number Diff line number Diff line change
Expand Up @@ -125,7 +125,8 @@ class CustomDagsterLookerApiTranslator(DagsterLookerApiTranslator):


looker_specs = load_looker_asset_specs(
looker_resource, dagster_looker_translator=CustomDagsterLookerApiTranslator
looker_resource,
dagster_looker_translator=CustomDagsterLookerApiTranslator,
)
defs = dg.Definitions(assets=[*looker_specs], resources={"looker": looker_resource})
```
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ def get_asset_spec(self, looker_structure: LookerStructureData) -> dg.AssetSpec:


looker_specs = load_looker_asset_specs(
looker_resource, dagster_looker_translator=CustomDagsterLookerApiTranslator
looker_resource,
dagster_looker_translator=CustomDagsterLookerApiTranslator,
)
defs = dg.Definitions(assets=[*looker_specs], resources={"looker": looker_resource})
Original file line number Diff line number Diff line change
@@ -1,7 +1,8 @@
from typing import Sequence, Type, cast
from typing import Optional, Sequence, Type, Union, cast

from dagster import AssetExecutionContext, AssetsDefinition, Failure, multi_asset
from dagster._annotations import experimental
from dagster._utils.warnings import deprecation_warning

from dagster_looker.api.dagster_looker_api_translator import (
DagsterLookerApiTranslator,
Expand All @@ -18,7 +19,9 @@
def build_looker_pdt_assets_definitions(
resource_key: str,
request_start_pdt_builds: Sequence[RequestStartPdtBuild],
dagster_looker_translator: Type[DagsterLookerApiTranslator] = DagsterLookerApiTranslator,
dagster_looker_translator: Optional[
Union[DagsterLookerApiTranslator, Type[DagsterLookerApiTranslator]]
] = None,
) -> Sequence[AssetsDefinition]:
"""Returns the AssetsDefinitions of the executable assets for the given the list of refreshable PDTs.

Expand All @@ -27,13 +30,24 @@ def build_looker_pdt_assets_definitions(
request_start_pdt_builds (Optional[Sequence[RequestStartPdtBuild]]): A list of requests to start PDT builds.
See https://developers.looker.com/api/explorer/4.0/types/DerivedTable/RequestStartPdtBuild?sdk=py
for documentation on all available fields.
dagster_looker_translator (Optional[DagsterLookerApiTranslator]): The translator to
use to convert Looker structures into assets. Defaults to DagsterLookerApiTranslator.
dagster_looker_translator (Optional[Union[DagsterLookerApiTranslator, Type[DagsterLookerApiTranslator]]]):
The translator to use to convert Looker structures into :py:class:`dagster.AssetSpec`.
Defaults to :py:class:`DagsterLookerApiTranslator`.

Returns:
AssetsDefinition: The AssetsDefinitions of the executable assets for the given the list of refreshable PDTs.
"""
translator = dagster_looker_translator()
if isinstance(dagster_looker_translator, type):
deprecation_warning(
subject="Support of `dagster_looker_translator` as a Type[DagsterLookerApiTranslator]",
breaking_version="1.10",
additional_warn_text=(
"Pass an instance of DagsterLookerApiTranslator or subclass to `dagster_looker_translator` instead."
),
)
dagster_looker_translator = dagster_looker_translator()

translator = dagster_looker_translator or DagsterLookerApiTranslator()
result = []
for request_start_pdt_build in request_start_pdt_builds:

Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,18 @@
from concurrent.futures import ThreadPoolExecutor
from dataclasses import dataclass
from typing import TYPE_CHECKING, Any, Dict, List, Mapping, Optional, Sequence, Tuple, Type, cast
from typing import (
TYPE_CHECKING,
Any,
Dict,
List,
Mapping,
Optional,
Sequence,
Tuple,
Type,
Union,
cast,
)

from dagster import (
AssetSpec,
Expand All @@ -13,6 +25,7 @@
from dagster._record import record
from dagster._utils.cached_method import cached_method
from dagster._utils.log import get_dagster_logger
from dagster._utils.warnings import deprecation_warning
from looker_sdk import init40
from looker_sdk.rtl.api_settings import ApiSettings, SettingsConfig
from looker_sdk.sdk.api40.methods import Looker40SDK
Expand Down Expand Up @@ -107,44 +120,53 @@ def build_defs(
from dagster_looker.api.assets import build_looker_pdt_assets_definitions

resource_key = "looker"
translator_cls = (
dagster_looker_translator.__class__
if dagster_looker_translator
else DagsterLookerApiTranslator
)
translator = dagster_looker_translator or DagsterLookerApiTranslator()

pdts = build_looker_pdt_assets_definitions(
resource_key=resource_key,
request_start_pdt_builds=request_start_pdt_builds or [],
dagster_looker_translator=translator_cls,
dagster_looker_translator=translator,
)

return Definitions(
assets=[*pdts, *load_looker_asset_specs(self, translator_cls, looker_filter)],
assets=[*pdts, *load_looker_asset_specs(self, translator, looker_filter)],
resources={resource_key: self},
)


@experimental
def load_looker_asset_specs(
looker_resource: LookerResource,
dagster_looker_translator: Type[DagsterLookerApiTranslator] = DagsterLookerApiTranslator,
dagster_looker_translator: Optional[
Union[DagsterLookerApiTranslator, Type[DagsterLookerApiTranslator]]
] = None,
looker_filter: Optional[LookerFilter] = None,
) -> Sequence[AssetSpec]:
"""Returns a list of AssetSpecs representing the Looker structures.

Args:
looker_resource (LookerResource): The Looker resource to fetch assets from.
dagster_looker_translator (Type[DagsterLookerApiTranslator]): The translator to use
to convert Looker structures into AssetSpecs. Defaults to DagsterLookerApiTranslator.
dagster_looker_translator (Optional[Union[DagsterLookerApiTranslator, Type[DagsterLookerApiTranslator]]]):
The translator to use to convert Looker structures into :py:class:`dagster.AssetSpec`.
Defaults to :py:class:`DagsterLookerApiTranslator`.

Returns:
List[AssetSpec]: The set of AssetSpecs representing the Looker structures.
"""
if isinstance(dagster_looker_translator, type):
deprecation_warning(
subject="Support of `dagster_looker_translator` as a Type[DagsterLookerApiTranslator]",
breaking_version="1.10",
additional_warn_text=(
"Pass an instance of DagsterLookerApiTranslator or subclass to `dagster_looker_translator` instead."
),
)
dagster_looker_translator = dagster_looker_translator()

return check.is_list(
LookerApiDefsLoader(
looker_resource=looker_resource,
translator_cls=dagster_looker_translator,
translator=dagster_looker_translator or DagsterLookerApiTranslator(),
looker_filter=looker_filter or LookerFilter(),
)
.build_defs()
Expand All @@ -165,7 +187,7 @@ def build_folder_path(folder_id_to_folder: Dict[str, "Folder"], folder_id: str)
@dataclass(frozen=True)
class LookerApiDefsLoader(StateBackedDefinitionsLoader[Mapping[str, Any]]):
looker_resource: LookerResource
translator_cls: Type[DagsterLookerApiTranslator]
translator: DagsterLookerApiTranslator
looker_filter: LookerFilter

@property
Expand All @@ -178,8 +200,7 @@ def fetch_state(self) -> Mapping[str, Any]:

def defs_from_state(self, state: Mapping[str, Any]) -> Definitions:
looker_instance_data = LookerInstanceData.from_state(self.looker_resource.get_sdk(), state)
translator = self.translator_cls()
return self._build_defs_from_looker_instance_data(looker_instance_data, translator)
return self._build_defs_from_looker_instance_data(looker_instance_data, self.translator)

def _build_defs_from_looker_instance_data(
self,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -205,7 +205,7 @@ def get_asset_spec(self, looker_structure: LookerApiTranslatorStructureData) ->
all_assets = (
asset
for asset in Definitions(
assets=[*load_looker_asset_specs(looker_resource, CustomDagsterLookerApiTranslator)],
assets=[*load_looker_asset_specs(looker_resource, CustomDagsterLookerApiTranslator())],
)
.get_asset_graph()
.assets_defs
Expand All @@ -219,3 +219,40 @@ def get_asset_spec(self, looker_structure: LookerApiTranslatorStructureData) ->
assert all(key.path[0] == "my_prefix" for key in asset.keys)
for deps in asset.asset_deps.values():
assert all(key.path[0] == "my_prefix" for key in deps), str(deps)


@responses.activate
def test_custom_asset_specs_legacy(
looker_resource: LookerResource, looker_instance_data_mocks: responses.RequestsMock
) -> None:
class CustomDagsterLookerApiTranslator(DagsterLookerApiTranslator):
def get_asset_spec(self, looker_structure: LookerApiTranslatorStructureData) -> AssetSpec:
default_spec = super().get_asset_spec(looker_structure)
return default_spec.replace_attributes(
key=default_spec.key.with_prefix("my_prefix"),
).merge_attributes(metadata={"custom": "metadata"})

# Pass the translator type
with pytest.warns(
DeprecationWarning,
match=r"Support of `dagster_looker_translator` as a Type\[DagsterLookerApiTranslator\]",
):
all_assets = (
asset
for asset in Definitions(
assets=[
*load_looker_asset_specs(looker_resource, CustomDagsterLookerApiTranslator)
],
)
.get_asset_graph()
.assets_defs
if not asset.is_auto_created_stub
)

for asset in all_assets:
for metadata in asset.metadata_by_key.values():
assert "custom" in metadata
assert metadata["custom"] == "metadata"
assert all(key.path[0] == "my_prefix" for key in asset.keys)
for deps in asset.asset_deps.values():
assert all(key.path[0] == "my_prefix" for key in deps), str(deps)
Loading