Skip to content

Commit

Permalink
Selectors in docs generate limits catalog generation (#8772)
Browse files Browse the repository at this point in the history
  • Loading branch information
gshank authored and QMalcolm committed Oct 6, 2023
1 parent e5d5fe4 commit 81da465
Show file tree
Hide file tree
Showing 5 changed files with 68 additions and 25 deletions.
6 changes: 6 additions & 0 deletions .changes/unreleased/Features-20231004-170155.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
kind: Features
body: Using selectors with "docs generate" limits catalog generation to the selected nodes
time: 2023-10-04T17:01:55.845479-04:00
custom:
Author: gshank
Issue: "6014"
45 changes: 31 additions & 14 deletions core/dbt/adapters/base/impl.py
Original file line number Diff line number Diff line change
Expand Up @@ -434,9 +434,8 @@ def _get_catalog_schemas(self, manifest: Manifest) -> SchemaSearchMap:
return info_schema_name_map

def _get_catalog_relations_by_info_schema(
self, manifest: Manifest
self, relations
) -> Dict[InformationSchema, List[BaseRelation]]:
relations = self._get_catalog_relations(manifest)
relations_by_info_schema: Dict[InformationSchema, List[BaseRelation]] = dict()
for relation in relations:
info_schema = relation.information_schema_only()
Expand All @@ -446,15 +445,30 @@ def _get_catalog_relations_by_info_schema(

return relations_by_info_schema

def _get_catalog_relations(self, manifest: Manifest) -> List[BaseRelation]:
nodes: Iterator[ResultNode] = chain(
[
node
for node in manifest.nodes.values()
if (node.is_relational and not node.is_ephemeral_model)
],
manifest.sources.values(),
)
def _get_catalog_relations(
self, manifest: Manifest, selected_nodes: Optional[Set] = None
) -> List[BaseRelation]:
nodes: Iterator[ResultNode]
if selected_nodes:
selected: List[ResultNode] = []
for unique_id in selected_nodes:
if unique_id in manifest.nodes:
node = manifest.nodes[unique_id]
if node.is_relational and not node.is_ephemeral_model:
selected.append(node)
elif unique_id in manifest.sources:
source = manifest.sources[unique_id]
selected.append(source)
nodes = iter(selected)
else:
nodes = chain(
[
node
for node in manifest.nodes.values()
if (node.is_relational and not node.is_ephemeral_model)
],
manifest.sources.values(),
)

relations = [self.Relation.create_from(self.config, n) for n in nodes]
return relations
Expand Down Expand Up @@ -1142,13 +1156,16 @@ def _get_one_catalog_by_relations(
results = self._catalog_filter_table(table, manifest) # type: ignore[arg-type]
return results

def get_catalog(self, manifest: Manifest) -> Tuple[agate.Table, List[Exception]]:
def get_catalog(
self, manifest: Manifest, selected_nodes: Optional[Set] = None
) -> Tuple[agate.Table, List[Exception]]:

with executor(self.config) as tpe:
futures: List[Future[agate.Table]] = []
relation_count = len(self._get_catalog_relations(manifest))
catalog_relations = self._get_catalog_relations(manifest, selected_nodes)
relation_count = len(catalog_relations)
if relation_count <= 100 and self.has_feature(AdapterFeature.CatalogByRelations):
relations_by_schema = self._get_catalog_relations_by_info_schema(manifest)
relations_by_schema = self._get_catalog_relations_by_info_schema(catalog_relations)
for info_schema in relations_by_schema:
name = ".".join([str(info_schema.database), "information_schema"])
relations = relations_by_schema[info_schema]
Expand Down
20 changes: 19 additions & 1 deletion core/dbt/task/generate.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,8 @@
CatalogArtifact,
)
from dbt.exceptions import DbtInternalError, AmbiguousCatalogMatchError
from dbt.graph import ResourceTypeSelector
from dbt.node_types import NodeType
from dbt.include.global_project import DOCS_INDEX_FILE_PATH
from dbt.events.functions import fire_event
from dbt.events.types import (
Expand Down Expand Up @@ -218,6 +220,11 @@ def run(self) -> CatalogArtifact:
DOCS_INDEX_FILE_PATH, os.path.join(self.config.project_target_path, "index.html")
)

# Get the list of nodes that have been selected
selected_nodes = None
if self.job_queue is not None:
selected_nodes = self.job_queue.get_selected_nodes()

for asset_path in self.config.asset_paths:
to_asset_path = os.path.join(self.config.project_target_path, asset_path)

Expand All @@ -237,7 +244,8 @@ def run(self) -> CatalogArtifact:
adapter = get_adapter(self.config)
with adapter.connection_named("generate_catalog"):
fire_event(BuildingCatalog())
catalog_table, exceptions = adapter.get_catalog(self.manifest)
# This generates the catalog as an agate.Table
catalog_table, exceptions = adapter.get_catalog(self.manifest, selected_nodes)

catalog_data: List[PrimitiveDict] = [
dict(zip(catalog_table.column_names, map(dbt.utils._coerce_decimal, row)))
Expand Down Expand Up @@ -269,6 +277,16 @@ def run(self) -> CatalogArtifact:
fire_event(CatalogWritten(path=os.path.abspath(path)))
return results

def get_node_selector(self) -> ResourceTypeSelector:
    """Build the node selector for this task.

    :return: a ``ResourceTypeSelector`` over this task's graph, manifest and
        previous state, limited to ``NodeType.executable()`` resource types.
    :raises DbtInternalError: if the manifest or the graph is ``None``.
    """
    prerequisites_missing = self.manifest is None or self.graph is None
    if prerequisites_missing:
        raise DbtInternalError("manifest and graph must be set to perform node selection")

    selector = ResourceTypeSelector(
        graph=self.graph,
        manifest=self.manifest,
        previous_state=self.previous_state,
        resource_types=NodeType.executable(),
    )
    return selector

def get_catalog_results(
self,
nodes: Dict[str, CatalogTable],
Expand Down
3 changes: 1 addition & 2 deletions tests/functional/defer_state/test_defer_state.py
Original file line number Diff line number Diff line change
Expand Up @@ -178,8 +178,7 @@ def test_run_and_defer(self, project, unique_schema, other_schema):
"otherschema",
]
)
assert other_schema not in catalog.nodes["seed.test.seed"].metadata.schema
assert unique_schema in catalog.nodes["seed.test.seed"].metadata.schema
assert "seed.test.seed" not in catalog.nodes

# with state it should work though
results = run_dbt(
Expand Down
19 changes: 11 additions & 8 deletions tests/functional/docs/test_generate.py
Original file line number Diff line number Diff line change
@@ -1,13 +1,12 @@
import pytest

from dbt.tests.util import run_dbt, get_manifest
import json


class TestGenerate:
@pytest.fixture(scope="class")
def models(self):
return {"my_model.sql": "select 1 as fun"}
return {"my_model.sql": "select 1 as fun", "alt_model.sql": "select 1 as notfun"}

def test_manifest_not_compiled(self, project):
run_dbt(["docs", "generate", "--no-compile"])
Expand All @@ -19,9 +18,13 @@ def test_manifest_not_compiled(self, project):
assert manifest.nodes[model_id].compiled is False

def test_generate_empty_catalog(self, project):
run_dbt(["docs", "generate", "--empty-catalog"])
with open("./target/catalog.json") as file:
catalog = json.load(file)
assert catalog["nodes"] == {}, "nodes should be empty"
assert catalog["sources"] == {}, "sources should be empty"
assert catalog["errors"] is None, "errors should be null"
catalog = run_dbt(["docs", "generate", "--empty-catalog"])
assert catalog.nodes == {}, "nodes should be empty"
assert catalog.sources == {}, "sources should be empty"
assert catalog.errors is None, "errors should be null"

def test_select_limits_catalog(self, project):
    """``docs generate --select my_model`` yields a catalog with only that model."""
    run_dbt(["run"])

    # The models fixture defines both my_model and alt_model; only the
    # selected one is expected in the catalog.
    generate_args = ["docs", "generate", "--select", "my_model"]
    catalog = run_dbt(generate_args)

    assert len(catalog.nodes) == 1
    assert "model.test.my_model" in catalog.nodes

0 comments on commit 81da465

Please sign in to comment.