Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

groups and access for sources and exposures #10426

Draft
wants to merge 15 commits into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 6 additions & 3 deletions core/dbt/artifacts/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -46,9 +46,12 @@ These types of minor, non-breaking changes are tested by [tests/unit/artifacts/t

#### Updating [schemas.getdbt.com](https://schemas.getdbt.com)
Non-breaking changes to artifact schemas require an update to the corresponding jsonschemas published to [schemas.getdbt.com](https://schemas.getdbt.com), which are defined in https://github.com/dbt-labs/schemas.getdbt.com. To do so:
1. Create a PR in https://github.com/dbt-labs/schemas.getdbt.com which reflects the schema changes to the artifact. The schema can be updated in-place for non-breaking changes. Example PR: https://github.com/dbt-labs/schemas.getdbt.com/pull/39
2. Merge the https://github.com/dbt-labs/schemas.getdbt.com PR
3. Observe the `Artifact Schema Check` CI check pass on the `dbt-core` PR that updates the artifact schemas, and merge the `dbt-core` PR!

1. Run `scripts/collect-artifact-schema.py --path schemas --artifact [manifest | catalog | run-results | sources]` to generate the jsonschema inclusive of your changes.
2. Copy the generated schema into the corresponding artifact file in the schemas.getdbt.com repository. (e.g. `cp schemas/dbt/manifest/v12.json ../../schemas.getdbt.com/dbt/manifest/`)
3. Create a PR in https://github.com/dbt-labs/schemas.getdbt.com which reflects the schema changes to the artifact. The schema can be updated in-place for non-breaking changes. Example PR: https://github.com/dbt-labs/schemas.getdbt.com/pull/39
4. Merge the https://github.com/dbt-labs/schemas.getdbt.com PR
5. Observe the `Artifact Schema Check` CI check pass on the `dbt-core` PR that updates the artifact schemas, and merge the `dbt-core` PR!

Note: Although `jsonschema` validation using the schemas in [schemas.getdbt.com](https://schemas.getdbt.com) is not encouraged or formally supported, `jsonschema` validation should still continue to work once the schemas are updated because they are forward-compatible and can therefore be used to validate previous minor versions of the schema.

Expand Down
4 changes: 3 additions & 1 deletion core/dbt/artifacts/resources/v1/exposure.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
from typing import Any, Dict, List, Literal, Optional

from dbt.artifacts.resources.base import GraphResource
from dbt.artifacts.resources.types import NodeType
from dbt.artifacts.resources.types import AccessType, NodeType
from dbt.artifacts.resources.v1.components import DependsOn, RefArgs
from dbt.artifacts.resources.v1.owner import Owner
from dbt_common.contracts.config.base import BaseConfig
Expand Down Expand Up @@ -47,3 +47,5 @@ class Exposure(GraphResource):
sources: List[List[str]] = field(default_factory=list)
metrics: List[List[str]] = field(default_factory=list)
created_at: float = field(default_factory=lambda: time.time())
access: AccessType = AccessType.Public
group: Optional[str] = None
4 changes: 3 additions & 1 deletion core/dbt/artifacts/resources/v1/source_definition.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
from typing import Any, Dict, List, Literal, Optional, Union

from dbt.artifacts.resources.base import GraphResource
from dbt.artifacts.resources.types import NodeType
from dbt.artifacts.resources.types import AccessType, NodeType
from dbt.artifacts.resources.v1.components import (
ColumnInfo,
FreshnessThreshold,
Expand Down Expand Up @@ -70,3 +70,5 @@ class SourceDefinition(ParsedSourceMandatory):
unrendered_config: Dict[str, Any] = field(default_factory=dict)
relation_name: Optional[str] = None
created_at: float = field(default_factory=lambda: time.time())
access: AccessType = AccessType.Protected
group: Optional[str] = None
32 changes: 32 additions & 0 deletions core/dbt/contracts/graph/manifest.py
Original file line number Diff line number Diff line change
Expand Up @@ -643,6 +643,8 @@ class Disabled(Generic[D]):

MaybeNonSource = Optional[Union[ManifestNode, Disabled[ManifestNode]]]

MaybeSource = Optional[Union[SourceDefinition, Disabled[SourceDefinition]]]


T = TypeVar("T", bound=GraphMemberNode)

Expand Down Expand Up @@ -1467,6 +1469,36 @@ def is_invalid_protected_ref(
node.package_name != target_model.package_name and restrict_package_access
)

def is_invalid_private_source(self, node: GraphMemberNode, target_source: MaybeSource) -> bool:
    """Return True when *node* illegally references a private source.

    A private source may only be referenced by nodes that carry a
    non-empty ``group`` matching the source's group. Non-source targets
    (None / Disabled) are never invalid here.
    """
    if not isinstance(target_source, SourceDefinition):
        return False
    if target_source.access != AccessType.Private:
        return False
    # The referencing node must have a matching, non-empty group.
    node_group = getattr(node, "group", None)
    return not node_group or node_group != target_source.group

def is_invalid_protected_source(
    self, node: GraphMemberNode, target_source: MaybeSource, dependencies: Optional[Mapping]
) -> bool:
    """Return True when *node* illegally references a protected source.

    A protected source may not be referenced from another package when
    that package's dependency entry sets ``restrict_access``. Ad hoc
    'preview' queries (SqlOperation / RPCCall) are exempt.
    """
    if not isinstance(target_source, SourceDefinition):
        return False
    if target_source.access != AccessType.Protected:
        return False
    # don't raise this reference error for ad hoc 'preview' queries
    if node.resource_type in (NodeType.SqlOperation, NodeType.RPCCall):  # TODO: rm RPCCall
        return False
    # Same-package references are always allowed.
    if node.package_name == target_source.package_name:
        return False
    target_dependency = (dependencies or {}).get(target_source.package_name)
    return bool(target_dependency and target_dependency.restrict_access)

# Called in GraphRunnableTask.before_run, RunTask.before_run, CloneTask.before_run
def merge_from_artifact(self, other: "Manifest") -> None:
"""Update this manifest by adding the 'defer_relation' attribute to all nodes
Expand Down
16 changes: 8 additions & 8 deletions core/dbt/contracts/graph/nodes.py
Original file line number Diff line number Diff line change
Expand Up @@ -1232,6 +1232,9 @@ def same_config(self, old: "SourceDefinition") -> bool:
old.unrendered_config,
)

def same_source_representation(self, old: "SourceDefinition") -> bool:
    """Return True when access level and group assignment are unchanged."""
    if self.access != old.access:
        return False
    return self.group == old.group

def same_contents(self, old: Optional["SourceDefinition"]) -> bool:
# existing when it didn't before is a change!
if old is None:
Expand All @@ -1252,6 +1255,7 @@ def same_contents(self, old: Optional["SourceDefinition"]) -> bool:
and self.same_quoting(old)
and self.same_freshness(old)
and self.same_external(old)
and self.same_source_representation(old)
and True
)

Expand Down Expand Up @@ -1297,10 +1301,6 @@ def has_freshness(self) -> bool:
def search_name(self):
return f"{self.source_name}.{self.name}"

@property
def group(self):
return None


# ====================================
# Exposure node
Expand Down Expand Up @@ -1342,6 +1342,9 @@ def same_exposure_type(self, old: "Exposure") -> bool:
def same_url(self, old: "Exposure") -> bool:
    """Return True when this exposure's URL matches *old*'s URL."""
    return old.url == self.url

def same_group(self, old: "Exposure") -> bool:
    """Return True when the group assignment is unchanged from *old*."""
    return old.group == self.group

def same_config(self, old: "Exposure") -> bool:
return self.config.same_contents(
self.unrendered_config,
Expand All @@ -1364,13 +1367,10 @@ def same_contents(self, old: Optional["Exposure"]) -> bool:
and self.same_label(old)
and self.same_depends_on(old)
and self.same_config(old)
and self.same_group(old)
and True
)

@property
def group(self):
return None


# ====================================
# Metric node
Expand Down
3 changes: 3 additions & 0 deletions core/dbt/contracts/graph/unparsed.py
Original file line number Diff line number Diff line change
Expand Up @@ -302,6 +302,8 @@ class UnparsedSourceDefinition(dbtClassMixin):
tables: List[UnparsedSourceTableDefinition] = field(default_factory=list)
tags: List[str] = field(default_factory=list)
config: Dict[str, Any] = field(default_factory=dict)
group: Optional[str] = None
access: Optional[str] = None

@classmethod
def validate(cls, data):
Expand Down Expand Up @@ -462,6 +464,7 @@ class UnparsedExposure(dbtClassMixin):
url: Optional[str] = None
depends_on: List[str] = field(default_factory=list)
config: Dict[str, Any] = field(default_factory=dict)
group: Optional[str] = None

@classmethod
def validate(cls, data):
Expand Down
64 changes: 57 additions & 7 deletions core/dbt/parser/manifest.py
Original file line number Diff line number Diff line change
Expand Up @@ -458,7 +458,7 @@ def load(self) -> Manifest:
# These check the created_at time on the nodes to
# determine whether they need processing.
start_process = time.perf_counter()
self.process_sources(self.root_project.project_name)
self.process_sources(self.root_project.project_name, self.root_project.dependencies)
self.process_refs(self.root_project.project_name, self.root_project.dependencies)
self.process_unit_tests(self.root_project.project_name)
self.process_docs(self.root_project)
Expand Down Expand Up @@ -1246,18 +1246,18 @@ def process_docs(self, config: RuntimeConfig):
# Loops through all nodes and exposures, for each element in
# 'sources' array finds the source node and updates the
# 'depends_on.nodes' array with the unique id
def process_sources(self, current_project: str):
def process_sources(self, current_project: str, dependencies: Optional[Mapping[str, Project]]):
for node in self.manifest.nodes.values():
if node.resource_type == NodeType.Source:
continue
assert not isinstance(node, SourceDefinition)
if node.created_at < self.started_at:
continue
_process_sources_for_node(self.manifest, current_project, node)
_process_sources_for_node(self.manifest, current_project, node, dependencies)
for exposure in self.manifest.exposures.values():
if exposure.created_at < self.started_at:
continue
_process_sources_for_exposure(self.manifest, current_project, exposure)
_process_sources_for_exposure(self.manifest, current_project, exposure, dependencies)

# Loops through all nodes, for each element in
# 'unit_test' array finds the node and updates the
Expand Down Expand Up @@ -1793,7 +1793,12 @@ def remove_dependent_project_references(manifest, external_node_unique_id):
node.created_at = time.time()


def _process_sources_for_exposure(manifest: Manifest, current_project: str, exposure: Exposure):
def _process_sources_for_exposure(
manifest: Manifest,
current_project: str,
exposure: Exposure,
dependencies: Optional[Mapping[str, Project]],
):
target_source: Optional[Union[Disabled, SourceDefinition]] = None
for source_name, table_name in exposure.sources:
target_source = manifest.resolve_source(
Expand All @@ -1811,6 +1816,21 @@ def _process_sources_for_exposure(manifest: Manifest, current_project: str, expo
disabled=(isinstance(target_source, Disabled)),
)
continue

if manifest.is_invalid_private_source(exposure, target_source):
raise dbt.exceptions.DbtReferenceError(
unique_id=exposure.unique_id,
ref_unique_id=target_source.unique_id,
access=AccessType.Private,
scope=dbt_common.utils.cast_to_str(target_source.group),
)
elif manifest.is_invalid_protected_source(exposure, target_source, dependencies):
raise dbt.exceptions.DbtReferenceError(
unique_id=exposure.unique_id,
ref_unique_id=target_source.unique_id,
access=AccessType.Protected,
scope=exposure.package_name,
)
target_source_id = target_source.unique_id
exposure.depends_on.add_node(target_source_id)

Expand All @@ -1837,7 +1857,12 @@ def _process_sources_for_metric(manifest: Manifest, current_project: str, metric
metric.depends_on.add_node(target_source_id)


def _process_sources_for_node(manifest: Manifest, current_project: str, node: ManifestNode):
def _process_sources_for_node(
manifest: Manifest,
current_project: str,
node: ManifestNode,
dependencies: Optional[Mapping[str, Project]],
):
if isinstance(node, SeedNode):
return

Expand All @@ -1851,6 +1876,31 @@ def _process_sources_for_node(manifest: Manifest, current_project: str, node: Ma
)

if target_source is None or isinstance(target_source, Disabled):
node.config.enabled = False
invalid_target_fail_unless_test(
node=node,
target_name=f"{source_name}.{table_name}",
target_kind="source",
disabled=(isinstance(target_source, Disabled)),
)
continue

if manifest.is_invalid_private_source(node, target_source):
raise dbt.exceptions.DbtReferenceError(
unique_id=node.unique_id,
ref_unique_id=target_source.unique_id,
access=AccessType.Private,
scope=dbt_common.utils.cast_to_str(target_source.group),
)
elif manifest.is_invalid_protected_source(node, target_source, dependencies):
raise dbt.exceptions.DbtReferenceError(
unique_id=node.unique_id,
ref_unique_id=target_source.unique_id,
access=AccessType.Protected,
scope=node.package_name,
)

elif target_source is None or isinstance(target_source, Disabled):
# this follows the same pattern as refs
node.config.enabled = False
invalid_target_fail_unless_test(
Expand Down Expand Up @@ -1879,7 +1929,7 @@ def process_macro(config: RuntimeConfig, manifest: Manifest, macro: Macro) -> No
# This is called in task.rpc.sql_commands when a "dynamic" node is
# created in the manifest, in 'add_refs'
def process_node(config: RuntimeConfig, manifest: Manifest, node: ManifestNode):
_process_sources_for_node(manifest, config.project_name, node)
_process_sources_for_node(manifest, config.project_name, node, config.dependencies)
_process_refs(manifest, config.project_name, node, config.dependencies)
ctx = generate_runtime_docs_context(config, node, manifest, config.project_name)
_process_docs_for_node(ctx, node)
Expand Down
1 change: 1 addition & 0 deletions core/dbt/parser/schema_yaml_readers.py
Original file line number Diff line number Diff line change
Expand Up @@ -123,6 +123,7 @@ def parse_exposure(self, unparsed: UnparsedExposure) -> None:
unique_id=unique_id,
fqn=fqn,
name=unparsed.name,
group=unparsed.group,
type=unparsed.type,
url=unparsed.url,
meta=unparsed.meta,
Expand Down
17 changes: 17 additions & 0 deletions core/dbt/parser/sources.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
from dbt.adapters.capability import Capability
from dbt.adapters.factory import get_adapter
from dbt.artifacts.resources import FreshnessThreshold, SourceConfig, Time
from dbt.artifacts.resources.types import AccessType
from dbt.config import RuntimeConfig
from dbt.context.context_config import (
BaseContextConfigGenerator,
Expand All @@ -26,6 +27,7 @@
UnparsedSourceTableDefinition,
)
from dbt.events.types import FreshnessConfigProblem, UnusedTables
from dbt.exceptions import InvalidAccessTypeError
from dbt.node_types import NodeType
from dbt.parser.common import ParserRef
from dbt.parser.schema_generic_tests import SchemaGenericTestParser
Expand Down Expand Up @@ -127,6 +129,19 @@ def parse_source(self, target: UnpatchedSourceDefinition) -> SourceDefinition:
unique_id = target.unique_id
description = table.description or ""
source_description = source.description or ""
group = source.group or None
access = AccessType.Protected
# make sure sources are not public and are a valid access type
if source.access:
if (
not AccessType.is_valid(source.access)
or AccessType(source.access) == AccessType.Public
):
raise InvalidAccessTypeError(
unique_id=unique_id,
field_value=source.access,
)
access = AccessType(source.access)

# We need to be able to tell the difference between explicitly setting the loaded_at_field to None/null
# and when it's simply not set. This allows a user to override the source level loaded_at_field so that
Expand Down Expand Up @@ -190,6 +205,8 @@ def parse_source(self, target: UnpatchedSourceDefinition) -> SourceDefinition:
tags=tags,
config=config,
unrendered_config=unrendered_config,
group=group,
access=access,
)

if (
Expand Down
Loading
Loading