-
Notifications
You must be signed in to change notification settings - Fork 4
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
update refs in exposure and semantic model definitions when downstream of a split resource #194
Changes from all commits
c187597
59f89ed
b26c8c4
3b95b34
f283e6c
aa004af
2472420
283f291
785b35d
e61faaa
934eabc
70a2e1a
8fb895f
ec95adf
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,11 +1,17 @@ | ||
import re | ||
from pathlib import Path | ||
from typing import List, Optional, Union | ||
from typing import Any, Dict, List, Optional, Union | ||
|
||
from dbt.contracts.graph.nodes import CompiledNode | ||
from dbt.contracts.graph.nodes import CompiledNode, Exposure, Resource, SemanticModel | ||
from loguru import logger | ||
|
||
from dbt_meshify.change import ChangeSet, EntityType, FileChange, Operation | ||
from dbt_meshify.change import ( | ||
ChangeSet, | ||
EntityType, | ||
FileChange, | ||
Operation, | ||
ResourceChange, | ||
) | ||
from dbt_meshify.dbt_projects import DbtProject, DbtSubProject, PathedProject | ||
|
||
|
||
|
@@ -135,32 +141,75 @@ def replace_source_with_ref__python( | |
|
||
return new_code | ||
|
||
def update_yml_resource_references( | ||
self, | ||
project_name: str, | ||
upstream_resource_name: str, | ||
resource: Union[Exposure, SemanticModel], | ||
) -> Dict[str, Any]: | ||
new_ref = f"ref('{project_name}', '{upstream_resource_name}')" | ||
if isinstance(resource, SemanticModel): | ||
# we can return early, since semantic models only have one ref and no depends_on | ||
return {"model": new_ref} | ||
refs = resource.refs | ||
ref_to_update = f"ref('{upstream_resource_name}')" | ||
str_refs = [] | ||
for ref in refs: | ||
package_clause = f"'{ref.package}', " if ref.package else "" | ||
name_clause = f"'{ref.name}'" | ||
version_clause = f", v={ref.version}" if ref.version else "" | ||
str_ref = f"ref({package_clause}{name_clause}{version_clause})" | ||
if str_ref != ref_to_update: | ||
str_refs.append(str_ref) | ||
str_refs.append(new_ref) | ||
return {"depends_on": str_refs} | ||
|
||
def generate_reference_update( | ||
self, | ||
project_name: str, | ||
upstream_node: CompiledNode, | ||
downstream_node: CompiledNode, | ||
downstream_node: Union[Resource, CompiledNode], | ||
downstream_project: PathedProject, | ||
code: str, | ||
) -> FileChange: | ||
) -> Union[FileChange, ResourceChange]: | ||
"""Generate FileChanges that update the references in the downstream_node's code.""" | ||
|
||
updated_code = self.ref_update_methods[downstream_node.language]( | ||
model_name=upstream_node.name, | ||
project_name=project_name, | ||
model_code=code, | ||
) | ||
change: Union[FileChange, ResourceChange] | ||
if isinstance(downstream_node, CompiledNode): | ||
updated_code = self.ref_update_methods[downstream_node.language]( | ||
model_name=upstream_node.name, | ||
project_name=project_name, | ||
model_code=code, | ||
) | ||
|
||
return FileChange( | ||
operation=Operation.Update, | ||
entity_type=EntityType.Code, | ||
identifier=downstream_node.name, | ||
path=downstream_project.resolve_file_path(downstream_node), | ||
data=updated_code, | ||
) | ||
return FileChange( | ||
operation=Operation.Update, | ||
entity_type=EntityType.Code, | ||
identifier=downstream_node.name, | ||
path=downstream_project.resolve_file_path(downstream_node), | ||
data=updated_code, | ||
) | ||
|
||
elif isinstance(downstream_node, Exposure) or isinstance(downstream_node, SemanticModel): | ||
is_exposure = isinstance(downstream_node, Exposure) | ||
data = self.update_yml_resource_references( | ||
project_name=project_name, | ||
upstream_resource_name=upstream_node.name, | ||
resource=downstream_node, | ||
) | ||
return ResourceChange( | ||
operation=Operation.Update, | ||
entity_type=EntityType.Exposure if is_exposure else EntityType.SemanticModel, | ||
identifier=downstream_node.name, | ||
path=downstream_project.resolve_file_path(downstream_node), | ||
data=data, | ||
) | ||
raise Exception("Invalid node type provided to generate_reference_update.") | ||
Comment on lines
+193
to
+207
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. |
||
|
||
def update_child_refs( | ||
self, resource: CompiledNode, current_change_set: Optional[ChangeSet] = None | ||
self, | ||
resource: CompiledNode, | ||
current_change_set: Optional[ChangeSet] = None, | ||
Comment on lines
+210
to
+212
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Hooray for improved formatting! |
||
) -> ChangeSet: | ||
"""Generate a set of FileChanges to update child references""" | ||
|
||
|
@@ -175,33 +224,29 @@ def update_child_refs( | |
else: | ||
compare_project = self.project.name | ||
|
||
for model in self.project.child_map[resource.unique_id]: | ||
if model in self.project.resources or model.split(".")[1] != compare_project: | ||
continue | ||
model_node = self.project.get_manifest_node(model) | ||
if not model_node: | ||
raise KeyError(f"Resource {model} not found in manifest") | ||
|
||
# Don't process Resources missing a language attribute | ||
if not hasattr(model_node, "language") or not isinstance(model_node, CompiledNode): | ||
for child in self.project.child_map[resource.unique_id]: | ||
if child in self.project.resources or child.split(".")[1] != compare_project: | ||
continue | ||
node = self.project.get_manifest_node(child) | ||
if not node: | ||
raise KeyError(f"Resource {child} not found in manifest") | ||
|
||
if current_change_set: | ||
previous_change = get_latest_file_change( | ||
changeset=current_change_set, | ||
identifier=model_node.name, | ||
path=self.project.parent_project.resolve_file_path(model_node), | ||
identifier=node.name, | ||
path=self.project.parent_project.resolve_file_path(node), | ||
) | ||
code = ( | ||
previous_change.data | ||
if (previous_change and previous_change.data) | ||
else model_node.raw_code | ||
else getattr(node, "raw_code", "") | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This is such a better way of getting the code. Good thinking! |
||
) | ||
|
||
change = self.generate_reference_update( | ||
project_name=self.project.name, | ||
upstream_node=resource, | ||
downstream_node=model_node, | ||
downstream_node=node, | ||
code=code, | ||
downstream_project=self.project.parent_project, | ||
) | ||
|
@@ -258,8 +303,6 @@ def update_parent_refs(self, resource: CompiledNode) -> ChangeSet: | |
if change.data is None: | ||
raise Exception(f"Change has null data despite being provided code. {change}") | ||
|
||
code = change.data | ||
|
||
change_set.add(change) | ||
|
||
return change_set | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Am I correct in believing that this code fully specifies each resource ref, and then adds the updated upstream source ref? If so, this seems sufficient. Perhaps it would be valuable to create a function that generates ref strings for resources? Either way, ✅