Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[dagster-looker] Move contextual data from DagsterLookerApiTranslator to LookerApiTranslatorStructureData #26742

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@
class CustomDagsterLookerApiTranslator(DagsterLookerApiTranslator):
def get_asset_spec(self, looker_structure: LookerStructureData) -> dg.AssetSpec:
# We create the default asset spec using super()
default_spec = super().get_asset_spec(looker_structure)
default_spec = super().get_asset_spec(looker_structure) # type: ignore
# We customize the team owner tag for all assets,
# and we customize the asset key prefix only for dashboards.
return default_spec.replace_attributes(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@

from dagster_looker.api.dagster_looker_api_translator import (
DagsterLookerApiTranslator,
LookerApiTranslatorStructureData,
LookerStructureData,
LookerStructureType,
LookmlView,
Expand Down Expand Up @@ -32,19 +33,22 @@ def build_looker_pdt_assets_definitions(
Returns:
AssetsDefinition: The AssetsDefinitions of the executable assets for the given the list of refreshable PDTs.
"""
translator = dagster_looker_translator(None)
translator = dagster_looker_translator()
result = []
for request_start_pdt_build in request_start_pdt_builds:

@multi_asset(
specs=[
translator.get_asset_spec(
LookerStructureData(
structure_type=LookerStructureType.VIEW,
data=LookmlView(
view_name=request_start_pdt_build.view_name,
sql_table_name=None,
LookerApiTranslatorStructureData(
structure_data=LookerStructureData(
structure_type=LookerStructureType.VIEW,
data=LookmlView(
view_name=request_start_pdt_build.view_name,
sql_table_name=None,
),
),
instance_data=None,
)
)
],
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -107,22 +107,37 @@ class LookerStructureData:
base_url: Optional[str] = None


class DagsterLookerApiTranslator:
def __init__(self, looker_instance_data: Optional[LookerInstanceData]):
self._looker_instance_data = looker_instance_data
@record
class LookerApiTranslatorStructureData:
"""A record representing a structure in Looker and the Looker instance data.
Includes the content's type and data as returned from the API.
"""

structure_data: "LookerStructureData"
instance_data: Optional["LookerInstanceData"]

@property
def structure_type(self) -> LookerStructureType:
return self.structure_data.structure_type

@property
def data(self) -> Union[LookmlView, LookmlModelExplore, DashboardFilter, Dashboard]:
return self.structure_data.data

@property
def instance_data(self) -> Optional[LookerInstanceData]:
return self._looker_instance_data
def base_url(self) -> Optional[str]:
return self.structure_data.base_url


class DagsterLookerApiTranslator:
@deprecated(
breaking_version="1.10",
additional_warn_text="Use `DagsterLookerApiTranslator.get_asset_spec().key` instead",
)
def get_view_asset_key(self, looker_structure: LookerStructureData) -> AssetKey:
def get_view_asset_key(self, looker_structure: LookerApiTranslatorStructureData) -> AssetKey:
return self.get_asset_spec(looker_structure).key

def get_view_asset_spec(self, looker_structure: LookerStructureData) -> AssetSpec:
def get_view_asset_spec(self, looker_structure: LookerApiTranslatorStructureData) -> AssetSpec:
lookml_view = check.inst(looker_structure.data, LookmlView)
return AssetSpec(
key=AssetKey(["view", lookml_view.view_name]),
Expand All @@ -132,10 +147,12 @@ def get_view_asset_spec(self, looker_structure: LookerStructureData) -> AssetSpe
breaking_version="1.10",
additional_warn_text="Use `DagsterLookerApiTranslator.get_asset_spec().key` instead",
)
def get_explore_asset_key(self, looker_structure: LookerStructureData) -> AssetKey:
def get_explore_asset_key(self, looker_structure: LookerApiTranslatorStructureData) -> AssetKey:
return self.get_explore_asset_spec(looker_structure).key

def get_explore_asset_spec(self, looker_structure: LookerStructureData) -> AssetSpec:
def get_explore_asset_spec(
self, looker_structure: LookerApiTranslatorStructureData
) -> AssetSpec:
lookml_explore = check.inst(looker_structure.data, (LookmlModelExplore, DashboardFilter))

if isinstance(lookml_explore, LookmlModelExplore):
Expand All @@ -157,8 +174,11 @@ def get_explore_asset_spec(self, looker_structure: LookerStructureData) -> Asset
deps=list(
{
self.get_asset_spec(
LookerStructureData(
structure_type=LookerStructureType.VIEW, data=lookml_view
LookerApiTranslatorStructureData(
structure_data=LookerStructureData(
structure_type=LookerStructureType.VIEW, data=lookml_view
),
instance_data=looker_structure.instance_data,
)
).key
for lookml_view in [explore_base_view, *explore_join_views]
Expand All @@ -185,23 +205,30 @@ def get_explore_asset_spec(self, looker_structure: LookerStructureData) -> Asset
breaking_version="1.10",
additional_warn_text="Use `DagsterLookerApiTranslator.get_asset_spec().key` instead",
)
def get_dashboard_asset_key(self, looker_structure: LookerStructureData) -> AssetKey:
def get_dashboard_asset_key(
self, looker_structure: LookerApiTranslatorStructureData
) -> AssetKey:
return self.get_asset_spec(looker_structure).key

def get_dashboard_asset_spec(self, looker_structure: LookerStructureData) -> AssetSpec:
def get_dashboard_asset_spec(
self, looker_structure: LookerApiTranslatorStructureData
) -> AssetSpec:
looker_dashboard = check.inst(looker_structure.data, Dashboard)

user = None
if self.instance_data and looker_dashboard.user_id:
user = self.instance_data.users_by_id.get(looker_dashboard.user_id)
if looker_structure.instance_data and looker_dashboard.user_id:
user = looker_structure.instance_data.users_by_id.get(looker_dashboard.user_id)

return AssetSpec(
key=AssetKey(f"{check.not_none(looker_dashboard.title)}_{looker_dashboard.id}"),
deps=list(
{
self.get_asset_spec(
LookerStructureData(
structure_type=LookerStructureType.EXPLORE, data=dashboard_filter
LookerApiTranslatorStructureData(
structure_data=LookerStructureData(
structure_type=LookerStructureType.EXPLORE, data=dashboard_filter
),
instance_data=looker_structure.instance_data,
)
).key
for dashboard_filter in looker_dashboard.dashboard_filters or []
Expand All @@ -220,7 +247,7 @@ def get_dashboard_asset_spec(self, looker_structure: LookerStructureData) -> Ass
)

@public
def get_asset_spec(self, looker_structure: LookerStructureData) -> AssetSpec:
def get_asset_spec(self, looker_structure: LookerApiTranslatorStructureData) -> AssetSpec:
if looker_structure.structure_type == LookerStructureType.VIEW:
return self.get_view_asset_spec(looker_structure)
if looker_structure.structure_type == LookerStructureType.EXPLORE:
Expand All @@ -235,5 +262,5 @@ def get_asset_spec(self, looker_structure: LookerStructureData) -> AssetSpec:
additional_warn_text="Use `DagsterLookerApiTranslator.get_asset_spec().key` instead",
)
@public
def get_asset_key(self, looker_structure: LookerStructureData) -> AssetKey:
def get_asset_key(self, looker_structure: LookerApiTranslatorStructureData) -> AssetKey:
return self.get_asset_spec(looker_structure).key
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@

from dagster_looker.api.dagster_looker_api_translator import (
DagsterLookerApiTranslator,
LookerApiTranslatorStructureData,
LookerInstanceData,
LookerStructureData,
LookerStructureType,
Expand Down Expand Up @@ -177,7 +178,7 @@ def fetch_state(self) -> Mapping[str, Any]:

def defs_from_state(self, state: Mapping[str, Any]) -> Definitions:
looker_instance_data = LookerInstanceData.from_state(self.looker_resource.get_sdk(), state)
translator = self.translator_cls(looker_instance_data)
translator = self.translator_cls()
return self._build_defs_from_looker_instance_data(looker_instance_data, translator)

def _build_defs_from_looker_instance_data(
Expand All @@ -187,20 +188,26 @@ def _build_defs_from_looker_instance_data(
) -> Definitions:
explores = [
dagster_looker_translator.get_asset_spec(
LookerStructureData(
structure_type=LookerStructureType.EXPLORE,
data=lookml_explore,
base_url=self.looker_resource.base_url,
),
LookerApiTranslatorStructureData(
structure_data=LookerStructureData(
structure_type=LookerStructureType.EXPLORE,
data=lookml_explore,
base_url=self.looker_resource.base_url,
),
instance_data=looker_instance_data,
)
)
for lookml_explore in looker_instance_data.explores_by_id.values()
]
views = [
dagster_looker_translator.get_asset_spec(
LookerStructureData(
structure_type=LookerStructureType.DASHBOARD,
data=looker_dashboard,
base_url=self.looker_resource.base_url,
LookerApiTranslatorStructureData(
structure_data=LookerStructureData(
structure_type=LookerStructureType.DASHBOARD,
data=looker_dashboard,
base_url=self.looker_resource.base_url,
),
instance_data=looker_instance_data,
)
)
for looker_dashboard in looker_instance_data.dashboards_by_id.values()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@
from dagster_looker.api.assets import build_looker_pdt_assets_definitions
from dagster_looker.api.dagster_looker_api_translator import (
DagsterLookerApiTranslator,
LookerStructureData,
LookerApiTranslatorStructureData,
RequestStartPdtBuild,
)
from dagster_looker.api.resource import LookerResource, load_looker_asset_specs
Expand Down Expand Up @@ -196,7 +196,7 @@ def test_custom_asset_specs(
looker_resource: LookerResource, looker_instance_data_mocks: responses.RequestsMock
) -> None:
class CustomDagsterLookerApiTranslator(DagsterLookerApiTranslator):
def get_asset_spec(self, looker_structure: LookerStructureData) -> AssetSpec:
def get_asset_spec(self, looker_structure: LookerApiTranslatorStructureData) -> AssetSpec:
default_spec = super().get_asset_spec(looker_structure)
return default_spec.replace_attributes(
key=default_spec.key.with_prefix("my_prefix"),
Expand Down
Loading