From b9dd154e26c8d17ab2ae06dd6a24c2df8427bbb4 Mon Sep 17 00:00:00 2001 From: OwenKephart Date: Mon, 30 Dec 2024 09:47:39 -0500 Subject: [PATCH] [dagster-dbt] Spec-ify dbt cloud integration (#26662) ## Summary & Motivation Updates the dbt cloud integration to be more spec-native, taking advantage of existing utilities for generating specs from a dbt project. This results in the ability to delete a large amount of code. ## How I Tested These Changes ## Changelog NOCHANGELOG --- .../dagster-dbt/dagster_dbt/asset_utils.py | 140 --------------- .../dagster_dbt/cloud/asset_defs.py | 163 +++++++----------- .../dagster-dbt/dagster_dbt/cloud/utils.py | 4 +- .../dagster_dbt_tests/cloud/utils.py | 8 +- 4 files changed, 73 insertions(+), 242 deletions(-) diff --git a/python_modules/libraries/dagster-dbt/dagster_dbt/asset_utils.py b/python_modules/libraries/dagster-dbt/dagster_dbt/asset_utils.py index 4c557bec3b6ef..ef02b3fe45335 100644 --- a/python_modules/libraries/dagster-dbt/dagster_dbt/asset_utils.py +++ b/python_modules/libraries/dagster-dbt/dagster_dbt/asset_utils.py @@ -14,7 +14,6 @@ Sequence, Set, Tuple, - cast, ) from dagster import ( @@ -26,14 +25,10 @@ AssetSelection, AssetSpec, AutoMaterializePolicy, - AutomationCondition, DagsterInvalidDefinitionError, DagsterInvariantViolationError, DefaultScheduleStatus, FreshnessPolicy, - In, - Nothing, - Out, RunConfig, ScheduleDefinition, TableColumn, @@ -41,16 +36,12 @@ _check as check, define_asset_job, ) -from dagster._core.definitions.decorators.decorator_assets_definition_builder import ( - validate_and_assign_output_names_to_check_specs, -) from dagster._core.definitions.metadata import TableMetadataSet from dagster._core.definitions.metadata.source_code import ( CodeReferencesMetadataSet, CodeReferencesMetadataValue, LocalFileCodeReference, ) -from dagster._utils.merger import merge_dicts from dagster_dbt.metadata_set import DbtMetadataSet from dagster_dbt.utils import ( @@ -893,137 +884,6 @@ def _validate_asset_keys( ) -def get_asset_deps( - dbt_nodes, - deps, - io_manager_key, - manifest: Optional[Mapping[str, Any]], - dagster_dbt_translator: "DagsterDbtTranslator", -) -> Tuple[ - Dict[AssetKey, Set[AssetKey]], - Dict[AssetKey, Tuple[str, In]], - Dict[AssetKey, Tuple[str, Out]], - Dict[AssetKey, str], - Dict[AssetKey, FreshnessPolicy], - Dict[AssetKey, AutomationCondition], - Dict[str, AssetCheckSpec], - Dict[str, List[str]], - Dict[str, Dict[str, Any]], -]: - from dagster_dbt.dagster_dbt_translator import DbtManifestWrapper, validate_translator - - dagster_dbt_translator = validate_translator(dagster_dbt_translator) - - asset_deps: Dict[AssetKey, Set[AssetKey]] = {} - asset_ins: Dict[AssetKey, Tuple[str, In]] = {} - asset_outs: Dict[AssetKey, Tuple[str, Out]] = {} - - # These dicts could be refactored as a single dict, mapping from output name to arbitrary - # metadata that we need to store for reference. - group_names_by_key: Dict[AssetKey, str] = {} - freshness_policies_by_key: Dict[AssetKey, FreshnessPolicy] = {} - automation_conditions_by_key: Dict[AssetKey, AutomationCondition] = {} - check_specs_by_key: Dict[AssetCheckKey, AssetCheckSpec] = {} - fqns_by_output_name: Dict[str, List[str]] = {} - metadata_by_output_name: Dict[str, Dict[str, Any]] = {} - - for unique_id, parent_unique_ids in deps.items(): - dbt_resource_props = dbt_nodes[unique_id] - - output_name = dagster_name_fn(dbt_resource_props) - fqns_by_output_name[output_name] = dbt_resource_props["fqn"] - - metadata_by_output_name[output_name] = { - key: dbt_resource_props[key] for key in ["unique_id", "resource_type"] - } - - asset_key = dagster_dbt_translator.get_asset_key(dbt_resource_props) - - asset_deps[asset_key] = set() - - metadata = merge_dicts( - dagster_dbt_translator.get_metadata(dbt_resource_props), - { - DAGSTER_DBT_MANIFEST_METADATA_KEY: DbtManifestWrapper(manifest=manifest) - if manifest - else None, - DAGSTER_DBT_TRANSLATOR_METADATA_KEY: dagster_dbt_translator, - }, - ) - asset_outs[asset_key] = ( - output_name, - Out( - io_manager_key=io_manager_key, - description=dagster_dbt_translator.get_description(dbt_resource_props), - metadata=metadata, - is_required=False, - dagster_type=Nothing, - code_version=dagster_dbt_translator.get_code_version(dbt_resource_props), - ), - ) - - group_name = dagster_dbt_translator.get_group_name(dbt_resource_props) - if group_name is not None: - group_names_by_key[asset_key] = group_name - - freshness_policy = dagster_dbt_translator.get_freshness_policy(dbt_resource_props) - if freshness_policy is not None: - freshness_policies_by_key[asset_key] = freshness_policy - - automation_condition = dagster_dbt_translator.get_automation_condition(dbt_resource_props) - if automation_condition is not None: - automation_conditions_by_key[asset_key] = automation_condition - - test_unique_ids = [] - if manifest: - test_unique_ids = [ - child_unique_id - for child_unique_id in manifest["child_map"][unique_id] - if child_unique_id.startswith("test") - ] - - for test_unique_id in test_unique_ids: - check_spec = default_asset_check_fn( - manifest, - dbt_nodes, - dagster_dbt_translator, - asset_key, - test_unique_id, - ) - if check_spec: - check_specs_by_key[check_spec.key] = check_spec - - for parent_unique_id in parent_unique_ids: - parent_node_info = dbt_nodes[parent_unique_id] - parent_asset_key = dagster_dbt_translator.get_asset_key(parent_node_info) - - asset_deps[asset_key].add(parent_asset_key) - - # if this parent is not one of the selected nodes, it's an input - if parent_unique_id not in deps: - input_name = dagster_name_fn(parent_node_info) - asset_ins[parent_asset_key] = (input_name, In(Nothing)) - - check_specs_by_output_name = cast( - Dict[str, AssetCheckSpec], - validate_and_assign_output_names_to_check_specs( - list(check_specs_by_key.values()), list(asset_outs.keys()) - ), - ) - - return ( - asset_deps, - asset_ins, - asset_outs, - group_names_by_key, - freshness_policies_by_key, - automation_conditions_by_key, - check_specs_by_output_name, - fqns_by_output_name, - metadata_by_output_name, - ) - - def has_self_dependency(dbt_resource_props: Mapping[str, Any]) -> bool: dagster_metadata = dbt_resource_props.get("meta", {}).get("dagster", {}) has_self_dependency = dagster_metadata.get("has_self_dependency", False) diff --git a/python_modules/libraries/dagster-dbt/dagster_dbt/cloud/asset_defs.py b/python_modules/libraries/dagster-dbt/dagster_dbt/cloud/asset_defs.py index a7c62aa557152..a2b5af9518601 100644 --- a/python_modules/libraries/dagster-dbt/dagster_dbt/cloud/asset_defs.py +++ b/python_modules/libraries/dagster-dbt/dagster_dbt/cloud/asset_defs.py @@ -5,7 +5,6 @@ from typing import ( Any, Callable, - Dict, FrozenSet, List, Mapping, @@ -21,7 +20,6 @@ from dagster import ( AssetExecutionContext, AssetKey, - AssetOut, AssetsDefinition, AutoMaterializePolicy, FreshnessPolicy, @@ -32,6 +30,7 @@ with_resources, ) from dagster._annotations import experimental, experimental_param +from dagster._core.definitions.asset_spec import AssetSpec from dagster._core.definitions.cacheable_assets import ( AssetsDefinitionCacheableData, CacheableAssetsDefinition, @@ -39,19 +38,20 @@ from dagster._core.definitions.metadata import RawMetadataMapping from dagster._core.execution.context.init import build_init_resource_context +from dagster_dbt.asset_specs import build_dbt_asset_specs from dagster_dbt.asset_utils import ( + DAGSTER_DBT_UNIQUE_ID_METADATA_KEY, default_asset_key_fn, default_auto_materialize_policy_fn, default_description_fn, default_freshness_policy_fn, default_group_from_dbt_resource_props, - get_asset_deps, - get_upstream_unique_ids, ) from dagster_dbt.cloud.resources import DbtCloudClient, DbtCloudClientResource, DbtCloudRunStatus from dagster_dbt.cloud.utils import result_to_events from dagster_dbt.dagster_dbt_translator import DagsterDbtTranslator from dagster_dbt.errors import DagsterDbtCloudJobInvariantViolationError +from dagster_dbt.utils import get_dbt_resource_props_by_dbt_unique_id_from_manifest DAGSTER_DBT_COMPILE_RUN_ID_ENV_VAR = "DBT_DAGSTER_COMPILE_RUN_ID" @@ -96,8 +96,8 @@ def __init__( super().__init__(unique_id=f"dbt-cloud-{job_id}") def compute_cacheable_data(self) -> Sequence[AssetsDefinitionCacheableData]: - dbt_nodes, dbt_dependencies = self._get_dbt_nodes_and_dependencies() - return [self._build_dbt_cloud_assets_cacheable_data(dbt_nodes, dbt_dependencies)] + manifest_json, executed_unique_ids = self._get_manifest_json_and_executed_unique_ids() + return [self._build_dbt_cloud_assets_cacheable_data(manifest_json, executed_unique_ids)] def build_definitions( self, data: Sequence[AssetsDefinitionCacheableData] @@ -242,9 +242,9 @@ def _compile_dbt_cloud_job(self, dbt_cloud_job: Mapping[str, Any]) -> Tuple[int, return compile_run_dbt_output.run_id, compile_job_materialization_command_step - def _get_dbt_nodes_and_dependencies( + def _get_manifest_json_and_executed_unique_ids( self, - ) -> Tuple[Mapping[str, Any], Mapping[str, FrozenSet[str]]]: + ) -> Tuple[Mapping[str, Any], FrozenSet[str]]: """For a given dbt Cloud job, fetch the latest run's dependency structure of executed nodes.""" # Fetch information about the job. job = self._dbt_cloud.get_job(job_id=self._job_id) @@ -296,11 +296,6 @@ def _get_dbt_nodes_and_dependencies( ) # Filter the manifest to only include the nodes that were executed. - dbt_nodes: Dict[str, Any] = { - **manifest_json.get("nodes", {}), - **manifest_json.get("sources", {}), - **manifest_json.get("metrics", {}), - } executed_node_ids: Set[str] = set( result["unique_id"] for result in run_results_json["results"] ) @@ -315,17 +310,11 @@ def _get_dbt_nodes_and_dependencies( f"options applied. Received commands: {self._job_commands}." ) - # Generate the dependency structure for the executed nodes. - dbt_dependencies = { - unique_id: frozenset(get_upstream_unique_ids(dbt_nodes, dbt_nodes[unique_id])) - # sort to stabilize job snapshots - for unique_id in sorted(executed_node_ids) - } - - return dbt_nodes, dbt_dependencies + # sort to stabilize job snapshots + return manifest_json, frozenset(sorted(executed_node_ids)) def _build_dbt_cloud_assets_cacheable_data( - self, dbt_nodes: Mapping[str, Any], dbt_dependencies: Mapping[str, FrozenSet[str]] + self, manifest_json: Mapping[str, Any], executed_unique_ids: FrozenSet[str] ) -> AssetsDefinitionCacheableData: """Given all of the nodes and dependencies for a dbt Cloud job, build the cacheable representation that generate the asset definition for the job. @@ -354,67 +343,58 @@ def get_freshness_policy(cls, dbt_resource_props): def get_auto_materialize_policy(cls, dbt_resource_props): return self._node_info_to_auto_materialize_policy_fn(dbt_resource_props) - ( - asset_deps, - asset_ins, - asset_outs, - group_names_by_key, - freshness_policies_by_key, - automation_conditions_by_key, - _, - fqns_by_output_name, - metadata_by_output_name, - ) = get_asset_deps( - dbt_nodes=dbt_nodes, - deps=dbt_dependencies, - # TODO: In the future, allow the IO manager to be specified. - io_manager_key=None, + # generate specs for each executed node + dbt_nodes = get_dbt_resource_props_by_dbt_unique_id_from_manifest(manifest_json) + specs = build_dbt_asset_specs( + manifest=manifest_json, dagster_dbt_translator=CustomDagsterDbtTranslator(), - manifest=None, + select=" ".join( + f"fqn:{'.'.join(dbt_nodes[unique_id]['fqn'])}" for unique_id in executed_unique_ids + ), ) return AssetsDefinitionCacheableData( # TODO: In the future, we should allow additional upstream assets to be specified. - keys_by_input_name={ - input_name: asset_key for asset_key, (input_name, _) in asset_ins.items() - }, - keys_by_output_name={ - output_name: asset_key for asset_key, (output_name, _) in asset_outs.items() - }, + keys_by_output_name={spec.key.to_python_identifier(): spec.key for spec in specs}, internal_asset_deps={ - asset_outs[asset_key][0]: asset_deps for asset_key, asset_deps in asset_deps.items() + spec.key.to_python_identifier(): {dep.asset_key for dep in spec.deps} + for spec in specs }, - # We don't rely on a static group name. Instead, we map over the dbt metadata to - # determine the group name for each asset. - group_name=None, metadata_by_output_name={ - output_name: self._build_dbt_cloud_assets_metadata(dbt_metadata) - for output_name, dbt_metadata in metadata_by_output_name.items() + spec.key.to_python_identifier(): self._build_dbt_cloud_assets_metadata( + dbt_nodes[spec.metadata[DAGSTER_DBT_UNIQUE_ID_METADATA_KEY]] + ) + for spec in specs }, - # TODO: In the future, we should allow the key prefix to be specified. - key_prefix=None, - can_subset=True, extra_metadata={ "job_id": self._job_id, "job_commands": self._job_commands, "job_materialization_command_step": self._job_materialization_command_step, "group_names_by_output_name": { - asset_outs[asset_key][0]: group_name - for asset_key, group_name in group_names_by_key.items() + spec.key.to_python_identifier(): spec.group_name for spec in specs + }, + "fqns_by_output_name": { + spec.key.to_python_identifier(): dbt_nodes[ + spec.metadata[DAGSTER_DBT_UNIQUE_ID_METADATA_KEY] + ]["fqn"] + for spec in specs }, - "fqns_by_output_name": fqns_by_output_name, }, freshness_policies_by_output_name={ - asset_outs[asset_key][0]: freshness_policy - for asset_key, freshness_policy in freshness_policies_by_key.items() + spec.key.to_python_identifier(): spec.freshness_policy + for spec in specs + if spec.freshness_policy }, auto_materialize_policies_by_output_name={ - asset_outs[asset_key][0]: automation_condition.as_auto_materialize_policy() - for asset_key, automation_condition in automation_conditions_by_key.items() + spec.key.to_python_identifier(): spec.auto_materialize_policy + for spec in specs + if spec.auto_materialize_policy }, ) - def _build_dbt_cloud_assets_metadata(self, dbt_metadata: Dict[str, Any]) -> RawMetadataMapping: + def _build_dbt_cloud_assets_metadata( + self, resource_props: Mapping[str, Any] + ) -> RawMetadataMapping: metadata = { "dbt Cloud Job": MetadataValue.url( self._dbt_cloud.build_url_for_job( @@ -428,13 +408,35 @@ def _build_dbt_cloud_assets_metadata(self, dbt_metadata: Dict[str, Any]) -> RawM metadata["dbt Cloud Documentation"] = MetadataValue.url( self._dbt_cloud.build_url_for_cloud_docs( job_id=self._job_id, - resource_type=dbt_metadata["resource_type"], - unique_id=dbt_metadata["unique_id"], + resource_type=resource_props["resource_type"], + unique_id=resource_props["unique_id"], ) ) return metadata + def _rebuild_specs(self, cacheable_data: AssetsDefinitionCacheableData) -> Sequence[AssetSpec]: + specs = [] + for id, key in (cacheable_data.keys_by_output_name or {}).items(): + specs.append( + AssetSpec( + key=key, + group_name=(cacheable_data.extra_metadata or {})[ + "group_names_by_output_name" + ].get(id), + deps=(cacheable_data.internal_asset_deps or {}).get(id), + metadata=(cacheable_data.metadata_by_output_name or {}).get(id), + freshness_policy=(cacheable_data.freshness_policies_by_output_name or {}).get( + id + ), + auto_materialize_policy=( + cacheable_data.auto_materialize_policies_by_output_name or {} + ).get(id), + skippable=False, + ) + ) + return specs + def _build_dbt_cloud_assets_from_cacheable_data( self, assets_definition_cacheable_data: AssetsDefinitionCacheableData ) -> AssetsDefinition: @@ -442,44 +444,13 @@ def _build_dbt_cloud_assets_from_cacheable_data( job_id = cast(int, metadata["job_id"]) job_commands = cast(List[str], list(metadata["job_commands"])) job_materialization_command_step = cast(int, metadata["job_materialization_command_step"]) - group_names_by_output_name = cast(Mapping[str, str], metadata["group_names_by_output_name"]) fqns_by_output_name = cast(Mapping[str, List[str]], metadata["fqns_by_output_name"]) @multi_asset( name=f"dbt_cloud_job_{job_id}", - deps=list((assets_definition_cacheable_data.keys_by_input_name or {}).values()), - outs={ - output_name: AssetOut( - key=asset_key, - group_name=group_names_by_output_name.get(output_name), - freshness_policy=( - assets_definition_cacheable_data.freshness_policies_by_output_name or {} - ).get( - output_name, - ), - auto_materialize_policy=( - assets_definition_cacheable_data.auto_materialize_policies_by_output_name - or {} - ).get( - output_name, - ), - metadata=(assets_definition_cacheable_data.metadata_by_output_name or {}).get( - output_name - ), - is_required=False, - ) - for output_name, asset_key in ( - assets_definition_cacheable_data.keys_by_output_name or {} - ).items() - }, - internal_asset_deps={ - output_name: set(asset_deps) - for output_name, asset_deps in ( - assets_definition_cacheable_data.internal_asset_deps or {} - ).items() - }, + specs=self._rebuild_specs(assets_definition_cacheable_data), partitions_def=self._partitions_def, - can_subset=assets_definition_cacheable_data.can_subset, + can_subset=True, required_resource_keys={"dbt_cloud"}, compute_kind="dbt", ) diff --git a/python_modules/libraries/dagster-dbt/dagster_dbt/cloud/utils.py b/python_modules/libraries/dagster-dbt/dagster_dbt/cloud/utils.py index c7e580b77d908..e4a3b5c229359 100644 --- a/python_modules/libraries/dagster-dbt/dagster_dbt/cloud/utils.py +++ b/python_modules/libraries/dagster-dbt/dagster_dbt/cloud/utils.py @@ -12,7 +12,7 @@ from dagster._core.definitions.metadata import RawMetadataValue from dagster_dbt.cloud.types import DbtCloudOutput -from dagster_dbt.utils import ASSET_RESOURCE_TYPES, dagster_name_fn, default_node_info_to_asset_key +from dagster_dbt.utils import ASSET_RESOURCE_TYPES, default_node_info_to_asset_key def _resource_type(unique_id: str) -> str: @@ -113,7 +113,7 @@ def result_to_events( if generate_asset_outputs: yield Output( value=None, - output_name=dagster_name_fn(dbt_resource_props), + output_name=node_info_to_asset_key(dbt_resource_props).to_python_identifier(), metadata=metadata, ) else: diff --git a/python_modules/libraries/dagster-dbt/dagster_dbt_tests/cloud/utils.py b/python_modules/libraries/dagster-dbt/dagster_dbt_tests/cloud/utils.py index 1cf873d55b9ad..9053c9aa1b0e8 100644 --- a/python_modules/libraries/dagster-dbt/dagster_dbt_tests/cloud/utils.py +++ b/python_modules/libraries/dagster-dbt/dagster_dbt_tests/cloud/utils.py @@ -388,10 +388,10 @@ def assert_assets_match_project( def_outputs = sorted(set(assets_op.output_dict.keys())) expected_outputs = sorted( [ - "model_dagster_dbt_test_project_least_caloric", - "model_dagster_dbt_test_project_sort_by_calories", - "model_dagster_dbt_test_project_sort_cold_cereals_by_calories", - "model_dagster_dbt_test_project_sort_hot_cereals_by_calories", + "cold_schema__sort_cold_cereals_by_calories", + "sort_by_calories", + "sort_hot_cereals_by_calories", + "subdir_schema__least_caloric", ] + ( [