Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Initial implementation of cross-project ref #7276

Merged
merged 58 commits into from
May 3, 2023
Merged
Show file tree
Hide file tree
Changes from 54 commits
Commits
Show all changes
58 commits
Select commit Hold shift + click to select a range
0daea9f
Create publication.py, various Publication classes, Dependency class
gshank Apr 12, 2023
6d511eb
fix merge error
gshank Apr 12, 2023
7ffadaf
Fix test
gshank Apr 12, 2023
8ba476a
Move test_publication into multi_project dir
gshank Apr 13, 2023
9add2a6
Load dependencies.yml and the corresponding publication file
gshank Apr 13, 2023
77d2a5b
use is_latest_version
gshank Apr 14, 2023
6bb6e57
Add version to PublicModel for now
gshank Apr 14, 2023
8dcc2b5
change test to use parse and get manifest
gshank Apr 14, 2023
ede647a
Add unique_id and a couple of properties to PublicModel
gshank Apr 14, 2023
86045ca
add "name" and "package_name" to PublicModel, some changes to ref_lookup
gshank Apr 14, 2023
8009349
Add "external_nodes" and populate ref_lookup
gshank Apr 14, 2023
6a9c06d
resolve_ref working
gshank Apr 14, 2023
f8a7bfd
add "external_nodes" dictionary, save external ref unique_ids to it
gshank Apr 14, 2023
22d6e85
Merge branch 'main' into ct-2327-model_publication
gshank Apr 17, 2023
babaf6a
rename external_nodes to public_nodes
gshank Apr 17, 2023
bd652a7
relation from relation_name. Not tested.
gshank Apr 17, 2023
c96c2a2
Test working with public_node
gshank Apr 17, 2023
17542fb
Add public nodes to parent and child maps
gshank Apr 17, 2023
c3c2940
Bump manifest version and fix tests, use ModelDependsOn
gshank Apr 17, 2023
fb12f7c
re-comment line in test_previous_version_state.py
gshank Apr 18, 2023
22cb844
Merge branch 'main' into ct-2327-model_publication
gshank Apr 18, 2023
0dd963a
Fix refs to manifest.json version, typo in Dependencies
gshank Apr 18, 2023
f7202ae
Split out PublicationArtifact and PublicationConfig, store public_models
gshank Apr 18, 2023
5bd828e
Store dependencies in publication artifact
gshank Apr 19, 2023
a43327d
Remove ModelDependsOn
gshank Apr 19, 2023
942f712
Fix is_latest_version and add some comments
gshank Apr 19, 2023
3054aed
change detection of PublicModel for >= python3.10
gshank Apr 20, 2023
fcd545b
Changie
gshank Apr 20, 2023
a5429cc
Merge branch 'main' into ct-2327-model_publication
gshank Apr 20, 2023
c4db8c0
Fix another isinstance of public model
gshank Apr 20, 2023
3f54bb0
Update another instance check of PublicModel
gshank Apr 21, 2023
e9d5a4c
Handle removing references for re-processing if publication has changed
gshank Apr 21, 2023
9e2284c
Handle only changed publication artifacts
gshank Apr 24, 2023
34b813a
Add another test
gshank Apr 24, 2023
719ab9a
Add some logging events
gshank Apr 24, 2023
41364be
Merge branch 'main' into ct-2327-model_publication
gshank Apr 24, 2023
bac92ea
Remove duplicate nodes from manifest
gshank Apr 24, 2023
dac9fa8
Merge branch 'main' into ct-2327-model_publication
gshank Apr 25, 2023
2e938e3
refactor relation_from_relation_name
gshank Apr 25, 2023
6b01773
Some comments and minor cleanup
gshank Apr 25, 2023
f286af2
get quote character from class, quoting from publication artifact
gshank Apr 25, 2023
4fdfb2b
cleanup depends_on_public_nodes; move call to rebuild_ref_lookup; tweak
gshank Apr 26, 2023
0425c3b
Remove duplicate writing of manifest.json
gshank Apr 26, 2023
acfae95
Add public_nodes to flat_graph
gshank Apr 26, 2023
3254154
Rename some dependencies
gshank Apr 26, 2023
2aad937
Fix test_manifest.py
gshank Apr 26, 2023
fce5c07
Move some file name constants to core/dbt/constants.py
gshank Apr 27, 2023
a256083
Remove "environment" from ProjectDependency. Add
gshank Apr 27, 2023
1589c75
Include external publication dependencies in publication artifact dep…
gshank Apr 27, 2023
97f1fc9
Remove create_from_relation_name, call create_from_node instead
gshank Apr 27, 2023
3fc17fe
Merge branch 'main' into ct-2327-model_publication
gshank Apr 27, 2023
b81157f
Change PublicationArtifactChanged message to debug level
gshank Apr 27, 2023
552b623
Make write_publication_artifact a function in parser/manifest.py
gshank Apr 27, 2023
6369410
Merge branch 'main' into ct-2327-model_publication
gshank Apr 28, 2023
f705d26
Code review cleanup
gshank May 1, 2023
f12e9b6
Merge branch 'main' into ct-2327-model_publication
gshank May 1, 2023
a0611a4
Create fixture to create minimal alternate project (just models)
gshank May 2, 2023
7471b2f
develop multi project test case
gshank May 3, 2023
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions .changes/unreleased/Features-20230420-124756.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
kind: Features
body: Publication artifacts and cross-project ref
time: 2023-04-20T12:47:56.92683-04:00
custom:
Author: gshank
Issue: "7227"
2 changes: 1 addition & 1 deletion core/dbt/adapters/base/relation.py
Original file line number Diff line number Diff line change
Expand Up @@ -227,7 +227,7 @@ def create_ephemeral_from_node(
def create_from_node(
cls: Type[Self],
config: HasQuoting,
node: ManifestNode,
node,
quote_policy: Optional[Dict[str, bool]] = None,
**kwargs: Any,
) -> Self:
Expand Down
191 changes: 101 additions & 90 deletions core/dbt/compilation.py
Original file line number Diff line number Diff line change
Expand Up @@ -163,6 +163,94 @@ def write_graph(self, outfile: str, manifest: Manifest):
with open(outfile, "wb") as outfh:
pickle.dump(out_graph, outfh, protocol=pickle.HIGHEST_PROTOCOL)

def link_node(self, node: GraphMemberNode, manifest: Manifest):
self.add_node(node.unique_id)

for dependency in node.depends_on_nodes:
if dependency in manifest.nodes:
self.dependency(node.unique_id, (manifest.nodes[dependency].unique_id))
elif dependency in manifest.sources:
self.dependency(node.unique_id, (manifest.sources[dependency].unique_id))
elif dependency in manifest.metrics:
self.dependency(node.unique_id, (manifest.metrics[dependency].unique_id))
else:
raise GraphDependencyNotFoundError(node, dependency)

def link_graph(self, manifest: Manifest, add_test_edges: bool = False):
for source in manifest.sources.values():
self.add_node(source.unique_id)
for node in manifest.nodes.values():
self.link_node(node, manifest)
for exposure in manifest.exposures.values():
self.link_node(exposure, manifest)
for metric in manifest.metrics.values():
self.link_node(metric, manifest)

cycle = self.find_cycles()

if cycle:
raise RuntimeError("Found a cycle: {}".format(cycle))

if add_test_edges:
manifest.build_parent_and_child_maps()
self.add_test_edges(manifest)

def add_test_edges(self, manifest: Manifest) -> None:
"""This method adds additional edges to the DAG. For a given non-test
executable node, add an edge from an upstream test to the given node if
the set of nodes the test depends on is a subset of the upstream nodes
for the given node."""

# Given a graph:
# model1 --> model2 --> model3
# | |
# | \/
# \/ test 2
# test1
#
# Produce the following graph:
# model1 --> model2 --> model3
# | /\ | /\ /\
# | | \/ | |
# \/ | test2 ----| |
# test1 ----|---------------|

for node_id in self.graph:
# If node is executable (in manifest.nodes) and does _not_
# represent a test, continue.
if (
node_id in manifest.nodes
and manifest.nodes[node_id].resource_type != NodeType.Test
):
# Get *everything* upstream of the node
all_upstream_nodes = nx.traversal.bfs_tree(self.graph, node_id, reverse=True)
# Get the set of upstream nodes not including the current node.
upstream_nodes = set([n for n in all_upstream_nodes if n != node_id])

# Get all tests that depend on any upstream nodes.
upstream_tests = []
for upstream_node in upstream_nodes:
upstream_tests += _get_tests_for_node(manifest, upstream_node)

for upstream_test in upstream_tests:
# Get the set of all nodes that the test depends on
# including the upstream_node itself. This is necessary
# because tests can depend on multiple nodes (ex:
# relationship tests). Test nodes do not distinguish
# between what node the test is "testing" and what
# node(s) it depends on.
test_depends_on = set(manifest.nodes[upstream_test].depends_on_nodes)

# If the set of nodes that an upstream test depends on
# is a subset of all upstream nodes of the current node,
# add an edge from the upstream test to the current node.
if test_depends_on.issubset(upstream_nodes):
self.graph.add_edge(upstream_test, node_id)

def get_graph(self, manifest: Manifest) -> Graph:
self.link_graph(manifest)
return Graph(self.graph)

def get_graph_summary(self, manifest: Manifest) -> Dict[int, Dict[str, Any]]:
"""Create a smaller summary of the graph, suitable for basic diagnostics
and performance tuning. The summary includes only the edge structure,
Expand Down Expand Up @@ -406,98 +494,13 @@ def _compile_code(

return node

def write_graph_file(self, linker: Linker, manifest: Manifest):
filename = graph_file_name
graph_path = os.path.join(self.config.target_path, filename)
flags = get_flags()
if flags.WRITE_JSON:
linker.write_graph(graph_path, manifest)

def link_node(self, linker: Linker, node: GraphMemberNode, manifest: Manifest):
linker.add_node(node.unique_id)

for dependency in node.depends_on_nodes:
if dependency in manifest.nodes:
linker.dependency(node.unique_id, (manifest.nodes[dependency].unique_id))
elif dependency in manifest.sources:
linker.dependency(node.unique_id, (manifest.sources[dependency].unique_id))
elif dependency in manifest.metrics:
linker.dependency(node.unique_id, (manifest.metrics[dependency].unique_id))
else:
raise GraphDependencyNotFoundError(node, dependency)

def link_graph(self, linker: Linker, manifest: Manifest):
for source in manifest.sources.values():
linker.add_node(source.unique_id)
for node in manifest.nodes.values():
self.link_node(linker, node, manifest)
for exposure in manifest.exposures.values():
self.link_node(linker, exposure, manifest)
for metric in manifest.metrics.values():
self.link_node(linker, metric, manifest)

cycle = linker.find_cycles()

if cycle:
raise RuntimeError("Found a cycle: {}".format(cycle))

def add_test_edges(self, linker: Linker, manifest: Manifest) -> None:
"""This method adds additional edges to the DAG. For a given non-test
executable node, add an edge from an upstream test to the given node if
the set of nodes the test depends on is a subset of the upstream nodes
for the given node."""

# Given a graph:
# model1 --> model2 --> model3
# | |
# | \/
# \/ test 2
# test1
#
# Produce the following graph:
# model1 --> model2 --> model3
# | /\ | /\ /\
# | | \/ | |
# \/ | test2 ----| |
# test1 ----|---------------|

for node_id in linker.graph:
# If node is executable (in manifest.nodes) and does _not_
# represent a test, continue.
if (
node_id in manifest.nodes
and manifest.nodes[node_id].resource_type != NodeType.Test
):
# Get *everything* upstream of the node
all_upstream_nodes = nx.traversal.bfs_tree(linker.graph, node_id, reverse=True)
# Get the set of upstream nodes not including the current node.
upstream_nodes = set([n for n in all_upstream_nodes if n != node_id])

# Get all tests that depend on any upstream nodes.
upstream_tests = []
for upstream_node in upstream_nodes:
upstream_tests += _get_tests_for_node(manifest, upstream_node)

for upstream_test in upstream_tests:
# Get the set of all nodes that the test depends on
# including the upstream_node itself. This is necessary
# because tests can depend on multiple nodes (ex:
# relationship tests). Test nodes do not distinguish
# between what node the test is "testing" and what
# node(s) it depends on.
test_depends_on = set(manifest.nodes[upstream_test].depends_on_nodes)

# If the set of nodes that an upstream test depends on
# is a subset of all upstream nodes of the current node,
# add an edge from the upstream test to the current node.
if test_depends_on.issubset(upstream_nodes):
linker.graph.add_edge(upstream_test, node_id)

# This method doesn't actually "compile" any of the nodes. That is done by the
# "compile_node" method. This creates a Linker and builds the networkx graph,
# writes out the graph.gpickle file, and prints the stats, returning a Graph object.
def compile(self, manifest: Manifest, write=True, add_test_edges=False) -> Graph:
self.initialize()
linker = Linker()

self.link_graph(linker, manifest)
linker.link_graph(manifest)

# Create a file containing basic information about graph structure,
# supporting diagnostics and performance analysis.
Expand All @@ -507,7 +510,7 @@ def compile(self, manifest: Manifest, write=True, add_test_edges=False) -> Graph

if add_test_edges:
manifest.build_parent_and_child_maps()
self.add_test_edges(linker, manifest)
linker.add_test_edges(manifest)

# Create another diagnostic summary, just as above, but this time
# including the test edges.
Expand All @@ -533,10 +536,18 @@ def compile(self, manifest: Manifest, write=True, add_test_edges=False) -> Graph
self.config.args.__class__ == argparse.Namespace
and self.config.args.cls == list_task.ListTask
):
stats = _generate_stats(manifest)
print_compile_stats(stats)

return Graph(linker.graph)

def write_graph_file(self, linker: Linker, manifest: Manifest):
filename = graph_file_name
graph_path = os.path.join(self.config.target_path, filename)
flags = get_flags()
if flags.WRITE_JSON:
linker.write_graph(graph_path, manifest)

# writes the "compiled_code" into the target/compiled directory
def _write_node(self, node: ManifestSQLNode) -> ManifestSQLNode:
if not node.extra_ctes_injected or node.resource_type in (
Expand Down
4 changes: 4 additions & 0 deletions core/dbt/constants.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,3 +8,7 @@
PIN_PACKAGE_URL = (
"https://docs.getdbt.com/docs/package-management#section-specifying-package-versions"
)

DEPENDENCIES_FILE_NAME = "dependencies.yml"
MANIFEST_FILE_NAME = "manifest.json"
PARTIAL_PARSE_FILE_NAME = "partial_parse.msgpack"
12 changes: 10 additions & 2 deletions core/dbt/context/providers.py
Original file line number Diff line number Diff line change
Expand Up @@ -467,6 +467,7 @@ def resolve(
) -> RelationProxy:
self.model.refs.append(self._repack_args(name, package, version))

# This is not the ref for the "name" passed in, but for the current model.
return self.Relation.create_from(self.config, self.model)


Expand Down Expand Up @@ -512,7 +513,11 @@ def resolve(
return self.create_relation(target_model)

def create_relation(self, target_model: ManifestNode) -> RelationProxy:
if target_model.is_ephemeral_model:
if target_model.is_public_node:
# Get quoting from publication artifact
pub_metadata = self.manifest.publications[target_model.package_name].metadata
return self.Relation.create_from_node(pub_metadata, target_model)
elif target_model.is_ephemeral_model:
self.model.set_cte(target_model.unique_id, None)
return self.Relation.create_ephemeral_from_node(self.config, target_model)
else:
Expand All @@ -525,7 +530,10 @@ def validate(
target_package: Optional[str],
target_version: Optional[NodeVersion],
) -> None:
if resolved.unique_id not in self.model.depends_on.nodes:
if (
resolved.unique_id not in self.model.depends_on.nodes
and resolved.unique_id not in self.model.depends_on.public_nodes
):
args = self._repack_args(target_name, target_package, target_version)
raise RefBadContextError(node=self.model, args=args)

Expand Down
Loading