Skip to content

Commit

Permalink
[dagster-tableau] Move contextual data from DagsterTableauTranslator …
Browse files Browse the repository at this point in the history
…to TableauTranslatorData
  • Loading branch information
maximearmstrong committed Dec 27, 2024
1 parent 8c4f449 commit f02a110
Show file tree
Hide file tree
Showing 5 changed files with 125 additions and 32 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@
class MyCustomTableauTranslator(DagsterTableauTranslator):
def get_asset_spec(self, data: TableauContentData) -> dg.AssetSpec:
# We create the default asset spec using super()
default_spec = super().get_asset_spec(data)
default_spec = super().get_asset_spec(data) # type: ignore
# We customize the metadata and asset key prefix for all assets, including sheets,
# and we customize the team owner tag only for sheets.
return default_spec.replace_attributes(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
TableauContentType,
TableauMetadataSet,
TableauTagSet,
TableauTranslatorData,
TableauWorkspaceData,
)

Expand Down Expand Up @@ -607,7 +608,7 @@ def fetch_state(self) -> TableauWorkspaceData:
return self.workspace.fetch_tableau_workspace_data()

def defs_from_state(self, state: TableauWorkspaceData) -> Definitions:
translator = self.translator_cls(context=state)
translator = self.translator_cls()

all_external_data = [
*state.data_sources_by_id.values(),
Expand All @@ -616,7 +617,10 @@ def defs_from_state(self, state: TableauWorkspaceData) -> Definitions:
]

all_external_asset_specs = [
translator.get_asset_spec(content) for content in all_external_data
translator.get_asset_spec(
TableauTranslatorData(content_data=content, workspace_data=state)
)
for content in all_external_data
]

return Definitions(assets=all_external_asset_specs)
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,24 @@ class TableauContentData:
properties: Mapping[str, Any]


@record
class TableauTranslatorData:
"""A record representing a piece of content in Tableau and the Tableau workspace data.
Includes the content's type and data as returned from the API.
"""

content_data: "TableauContentData"
workspace_data: "TableauWorkspaceData"

@property
def content_type(self) -> TableauContentType:
return self.content_data.content_type

@property
def properties(self) -> Mapping[str, Any]:
return self.content_data.properties


@whitelist_for_serdes
@record
class TableauWorkspaceData:
Expand Down Expand Up @@ -104,21 +122,14 @@ class DagsterTableauTranslator:
Subclass this class to implement custom logic for each type of Tableau content.
"""

def __init__(self, context: TableauWorkspaceData):
self._context = context

@property
def workspace_data(self) -> TableauWorkspaceData:
return self._context

@deprecated(
breaking_version="1.10",
additional_warn_text="Use `DagsterTableauTranslator.get_asset_spec(...).key` instead",
)
def get_asset_key(self, data: TableauContentData) -> AssetKey:
def get_asset_key(self, data: TableauTranslatorData) -> AssetKey:
return self.get_asset_spec(data).key

def get_asset_spec(self, data: TableauContentData) -> AssetSpec:
def get_asset_spec(self, data: TableauTranslatorData) -> AssetSpec:
if data.content_type == TableauContentType.SHEET:
return self.get_sheet_spec(data)
elif data.content_type == TableauContentType.DASHBOARD:
Expand All @@ -132,10 +143,10 @@ def get_asset_spec(self, data: TableauContentData) -> AssetSpec:
breaking_version="1.10",
additional_warn_text="Use `DagsterTableauTranslator.get_asset_spec(...).key` instead",
)
def get_sheet_asset_key(self, data: TableauContentData) -> AssetKey:
def get_sheet_asset_key(self, data: TableauTranslatorData) -> AssetKey:
return self.get_sheet_spec(data).key

def get_sheet_spec(self, data: TableauContentData) -> AssetSpec:
def get_sheet_spec(self, data: TableauTranslatorData) -> AssetSpec:
sheet_embedded_data_sources = data.properties.get("parentEmbeddedDatasources", [])
data_source_ids = {
published_data_source["luid"]
Expand All @@ -144,12 +155,17 @@ def get_sheet_spec(self, data: TableauContentData) -> AssetSpec:
}

data_source_keys = [
self.get_asset_spec(self.workspace_data.data_sources_by_id[data_source_id]).key
self.get_asset_spec(
TableauTranslatorData(
content_data=data.workspace_data.data_sources_by_id[data_source_id],
workspace_data=data.workspace_data,
)
).key
for data_source_id in data_source_ids
]

workbook_id = data.properties["workbook"]["luid"]
workbook_data = self.workspace_data.workbooks_by_id[workbook_id]
workbook_data = data.workspace_data.workbooks_by_id[workbook_id]
asset_key = AssetKey(
[
_coerce_input_to_valid_name(workbook_data.properties["name"]),
Expand All @@ -173,20 +189,25 @@ def get_sheet_spec(self, data: TableauContentData) -> AssetSpec:
breaking_version="1.10",
additional_warn_text="Use `DagsterTableauTranslator.get_asset_spec(...).key` instead",
)
def get_dashboard_asset_key(self, data: TableauContentData) -> AssetKey:
def get_dashboard_asset_key(self, data: TableauTranslatorData) -> AssetKey:
return self.get_dashboard_spec(data).key

def get_dashboard_spec(self, data: TableauContentData) -> AssetSpec:
def get_dashboard_spec(self, data: TableauTranslatorData) -> AssetSpec:
dashboard_upstream_sheets = data.properties.get("sheets", [])
sheet_ids = {sheet["luid"] for sheet in dashboard_upstream_sheets if sheet["luid"]}

sheet_keys = [
self.get_asset_spec(self.workspace_data.sheets_by_id[sheet_id]).key
self.get_asset_spec(
TableauTranslatorData(
content_data=data.workspace_data.sheets_by_id[sheet_id],
workspace_data=data.workspace_data,
)
).key
for sheet_id in sheet_ids
]

workbook_id = data.properties["workbook"]["luid"]
workbook_data = self.workspace_data.workbooks_by_id[workbook_id]
workbook_data = data.workspace_data.workbooks_by_id[workbook_id]
asset_key = AssetKey(
[
_coerce_input_to_valid_name(workbook_data.properties["name"]),
Expand All @@ -210,10 +231,10 @@ def get_dashboard_spec(self, data: TableauContentData) -> AssetSpec:
breaking_version="1.10",
additional_warn_text="Use `DagsterTableauTranslator.get_asset_spec(...).key` instead",
)
def get_data_source_asset_key(self, data: TableauContentData) -> AssetKey:
def get_data_source_asset_key(self, data: TableauTranslatorData) -> AssetKey:
return self.get_data_source_spec(data).key

def get_data_source_spec(self, data: TableauContentData) -> AssetSpec:
def get_data_source_spec(self, data: TableauTranslatorData) -> AssetSpec:
return AssetSpec(
key=AssetKey([_coerce_input_to_valid_name(data.properties["name"])]),
tags={"dagster/storage_kind": "tableau", **TableauTagSet(asset_type="data_source")},
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,10 @@
import responses
from dagster._check.functions import CheckError
from dagster._config.field_utils import EnvVar
from dagster._core.definitions.asset_spec import AssetSpec
from dagster._core.test_utils import environ
from dagster_tableau import TableauCloudWorkspace, TableauServerWorkspace, load_tableau_asset_specs
from dagster_tableau.translator import DagsterTableauTranslator, TableauTranslatorData


@responses.activate
Expand Down Expand Up @@ -162,3 +164,61 @@ def test_translator_spec(

data_source_asset_key = next(key for key in all_assets_keys if "datasource" in key.path[0])
assert data_source_asset_key.path == ["superstore_datasource"]


class MyCustomTranslator(DagsterTableauTranslator):
def get_asset_spec(self, data: TableauTranslatorData) -> AssetSpec:
default_spec = super().get_asset_spec(data)
return default_spec.replace_attributes(
key=default_spec.key.with_prefix("prefix"),
).merge_attributes(metadata={"custom": "metadata"})


@responses.activate
@pytest.mark.parametrize(
"clazz,host_key,host_value",
[
(TableauServerWorkspace, "server_name", "fake_server_name"),
(TableauCloudWorkspace, "pod_name", "fake_pod_name"),
],
)
@pytest.mark.usefixtures("site_name")
@pytest.mark.usefixtures("sign_in")
@pytest.mark.usefixtures("get_workbooks")
@pytest.mark.usefixtures("get_workbook")
def test_translator_custom_metadata(
clazz: Union[Type[TableauCloudWorkspace], Type[TableauServerWorkspace]],
host_key: str,
host_value: str,
site_name: str,
sign_in: MagicMock,
get_workbooks: MagicMock,
get_workbook: MagicMock,
) -> None:
connected_app_client_id = uuid.uuid4().hex
connected_app_secret_id = uuid.uuid4().hex
connected_app_secret_value = uuid.uuid4().hex
username = "fake_username"

with environ({"TABLEAU_CLIENT_ID": connected_app_client_id}):
resource_args = {
"connected_app_client_id": EnvVar("TABLEAU_CLIENT_ID"),
"connected_app_secret_id": connected_app_secret_id,
"connected_app_secret_value": connected_app_secret_value,
"username": username,
"site_name": site_name,
host_key: host_value,
}

resource = clazz(**resource_args)
resource.build_client()

all_asset_specs = load_tableau_asset_specs(
workspace=resource, dagster_tableau_translator=MyCustomTranslator
)
asset_spec = next(spec for spec in all_asset_specs)

assert "custom" in asset_spec.metadata
assert asset_spec.metadata["custom"] == "metadata"
assert asset_spec.key.path == ["prefix", "superstore_datasource"]
assert asset_spec.tags["dagster/storage_kind"] == "tableau"
Original file line number Diff line number Diff line change
@@ -1,16 +1,18 @@
from dagster._core.definitions.asset_key import AssetKey
from dagster._core.definitions.asset_spec import AssetSpec
from dagster_tableau import DagsterTableauTranslator
from dagster_tableau.translator import TableauContentData, TableauWorkspaceData
from dagster_tableau.translator import TableauTranslatorData, TableauWorkspaceData


def test_translator_sheet_spec(
workspace_data: TableauWorkspaceData, sheet_id: str, workbook_id: str
) -> None:
sheet = next(iter(workspace_data.sheets_by_id.values()))

translator = DagsterTableauTranslator(workspace_data)
asset_spec = translator.get_asset_spec(sheet)
translator = DagsterTableauTranslator()
asset_spec = translator.get_asset_spec(
TableauTranslatorData(content_data=sheet, workspace_data=workspace_data)
)

assert asset_spec.key.path == ["test_workbook", "sheet", "sales"]
assert asset_spec.metadata == {
Expand All @@ -31,8 +33,10 @@ def test_translator_dashboard_spec(
) -> None:
dashboard = next(iter(workspace_data.dashboards_by_id.values()))

translator = DagsterTableauTranslator(workspace_data)
asset_spec = translator.get_asset_spec(dashboard)
translator = DagsterTableauTranslator()
asset_spec = translator.get_asset_spec(
TableauTranslatorData(content_data=dashboard, workspace_data=workspace_data)
)

assert asset_spec.key.path == ["test_workbook", "dashboard", "dashboard_sales"]
assert asset_spec.metadata == {
Expand All @@ -53,8 +57,10 @@ def test_translator_data_source_spec(
) -> None:
data_source = next(iter(workspace_data.data_sources_by_id.values()))

translator = DagsterTableauTranslator(workspace_data)
asset_spec = translator.get_asset_spec(data_source)
translator = DagsterTableauTranslator()
asset_spec = translator.get_asset_spec(
TableauTranslatorData(content_data=data_source, workspace_data=workspace_data)
)

assert asset_spec.key.path == ["superstore_datasource"]
assert asset_spec.metadata == {"dagster-tableau/id": data_source_id}
Expand All @@ -67,7 +73,7 @@ def test_translator_data_source_spec(


class MyCustomTranslator(DagsterTableauTranslator):
def get_asset_spec(self, data: TableauContentData) -> AssetSpec:
def get_asset_spec(self, data: TableauTranslatorData) -> AssetSpec:
default_spec = super().get_asset_spec(data)
return default_spec.replace_attributes(
key=default_spec.key.with_prefix("prefix"),
Expand All @@ -78,8 +84,10 @@ def get_asset_spec(self, data: TableauContentData) -> AssetSpec:
def test_translator_custom_metadata(workspace_data: TableauWorkspaceData) -> None:
sheet = next(iter(workspace_data.sheets_by_id.values()))

translator = MyCustomTranslator(workspace_data)
asset_spec = translator.get_asset_spec(sheet)
translator = MyCustomTranslator()
asset_spec = translator.get_asset_spec(
TableauTranslatorData(content_data=sheet, workspace_data=workspace_data)
)

assert "custom" in asset_spec.metadata
assert asset_spec.metadata["custom"] == "metadata"
Expand Down

0 comments on commit f02a110

Please sign in to comment.