-
Notifications
You must be signed in to change notification settings - Fork 8
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
WIP Add basic dbt objects and rule definitions
- Loading branch information
1 parent
7647ddf
commit 794e79b
Showing
5 changed files
with
285 additions
and
0 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,125 @@ | ||
from dataclasses import dataclass, field | ||
from typing import Any, List | ||
|
||
|
||
@dataclass | ||
class Constraint: | ||
"""Constraint for a column in a model.""" | ||
|
||
type: str | ||
expression: str | ||
name: str | ||
|
||
|
||
@dataclass | ||
class Test: | ||
"""Test for a column or model.""" | ||
|
||
name: str | ||
type: str | ||
tags: list[str] = field(default_factory=list) | ||
|
||
|
||
@dataclass | ||
class Column: | ||
"""Represents a column in a model.""" | ||
|
||
name: str | ||
description: str | ||
constraints: List[Constraint] | ||
tests: List[Test] = field(default_factory=list) | ||
|
||
|
||
@dataclass | ||
class Model: | ||
"""Represents a dbt model.""" | ||
|
||
id: str | ||
name: str | ||
description: str | ||
file_path: str | ||
config: dict[str, Any] | ||
meta: dict[str, Any] | ||
columns: dict[str, Column] | ||
tests: list[Test] = field(default_factory=list) | ||
|
||
@classmethod | ||
def from_node(cls, node_values: dict[str, Any]) -> "Model": | ||
"""Create a model object from a node in the manifest.""" | ||
columns = { | ||
name: Column( | ||
name=values.get("name"), | ||
description=values.get("description"), | ||
constraints=[ | ||
Constraint( | ||
name=constraint.get("name"), | ||
type=constraint.get("type"), | ||
expression=constraint.get("expression"), | ||
) | ||
for constraint in values.get("constraints", []) | ||
], | ||
) | ||
for name, values in node_values.get("columns", {}).items() | ||
} | ||
|
||
model = cls( | ||
id=node_values["unique_id"], | ||
file_path=node_values["patch_path"], | ||
config=node_values.get("config", {}), | ||
name=node_values["name"], | ||
description=node_values.get("description", ""), | ||
meta=node_values.get("meta", {}), | ||
columns=columns, | ||
) | ||
|
||
return model | ||
|
||
|
||
class ManifestLoader: | ||
"""Load the models and tests from the manifest.""" | ||
|
||
def __init__(self, raw_manifest: dict[str, Any]): | ||
self.raw_manifest = raw_manifest | ||
self.raw_nodes = raw_manifest.get("nodes", {}) | ||
self.models: dict[str, Model] = {} | ||
self.tests: dict[str, Test] = {} | ||
|
||
# Load models first so the tests can be attached to them later. | ||
self.load_models() | ||
self.load_tests() | ||
|
||
def load_models(self) -> None: | ||
"""Load the models from the manifest.""" | ||
for node_values in self.raw_nodes.values(): | ||
if node_values.get("resource_type") == "model": | ||
model = Model.from_node(node_values) | ||
self.models[model.id] = model | ||
|
||
def load_tests(self) -> None: | ||
"""Load the tests from the manifest and attach them to the right object.""" | ||
for node_values in self.raw_nodes.values(): | ||
# Only include tests that are attached to a model. | ||
if node_values.get("resource_type") == "test" and node_values.get( | ||
"attached_node" | ||
): | ||
model = self.models.get(node_values.get("attached_node")) | ||
|
||
if not model: | ||
raise ValueError( | ||
f"Model {node_values.get('attached_node')}" | ||
f"not found, while tests are attached to it." | ||
) | ||
|
||
test = Test( | ||
name=node_values.get("name"), | ||
type=node_values.get("test_metadata").get("name"), | ||
tags=node_values.get("tags"), | ||
) | ||
column_name = ( | ||
node_values.get("test_metadata").get("kwargs").get("column_name") | ||
) | ||
|
||
if column_name: # Test is a column-level test. | ||
model.columns[column_name].tests.append(test) | ||
else: | ||
model.tests.append(test) |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,47 @@ | ||
import functools | ||
import logging | ||
from dataclasses import dataclass | ||
from enum import Enum | ||
from typing import Any, Callable | ||
|
||
from dbt_score.manifest import Model | ||
|
||
logging.basicConfig() | ||
logger = logging.getLogger(__name__) | ||
logger.setLevel(logging.INFO) | ||
|
||
|
||
class Severity(Enum): | ||
"""The severity/weight of a rule.""" | ||
|
||
LOW = 1 | ||
MEDIUM = 2 | ||
HIGH = 3 | ||
CRITICAL = 4 | ||
|
||
|
||
@dataclass | ||
class RuleViolation: | ||
"""The violation of a rule.""" | ||
|
||
message: str | None = None | ||
|
||
|
||
def rule( | ||
description: str, | ||
hint: str, | ||
severity: Severity = Severity.MEDIUM, | ||
) -> Callable[[Callable[[Model], RuleViolation | None]], Callable[..., None]]: | ||
"""Rule decorator.""" | ||
|
||
def decorator_rule( | ||
func: Callable[[Model], RuleViolation | None], | ||
) -> Callable[..., None]: | ||
@functools.wraps(func) | ||
def wrapper_rule(*args: Any, **kwargs: Any) -> Any: | ||
logger.debug("Executing `%s` with severity: %s.", func.__name__, severity) | ||
return func(*args, **kwargs) | ||
|
||
return wrapper_rule | ||
|
||
return decorator_rule |
Empty file.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,93 @@ | ||
"""All general rules.""" | ||
|
||
from ..manifest import Model | ||
from ..rule import RuleViolation, Severity, rule | ||
|
||
|
||
@rule( | ||
description="A model should have an owner defined.", | ||
hint="Define the owner of the model in the meta section.", | ||
severity=Severity.HIGH, | ||
) | ||
def has_owner(model: Model) -> RuleViolation | None: | ||
"""A model should have an owner defined.""" | ||
if "owner" not in model.meta: | ||
return RuleViolation() | ||
|
||
return None | ||
|
||
|
||
@rule(description="A model should have a primary key defined.", hint="Some hint.") | ||
def has_primary_key(model: Model) -> RuleViolation | None: | ||
"""A model should have a primary key defined, unless it's a view.""" | ||
if not model.config.get("materialized") == "picnic_view": | ||
has_pk = False | ||
for column in model.columns.values(): | ||
if "primary_key" in [constraint.type for constraint in column.constraints]: | ||
has_pk = True | ||
break | ||
|
||
if not has_pk: | ||
return RuleViolation() | ||
|
||
return None | ||
|
||
|
||
@rule( | ||
description="Primary key columns should have a uniqueness test defined.", | ||
hint="Some hint.", | ||
) | ||
def primary_key_has_uniqueness_test(model: Model) -> RuleViolation | None: | ||
"""Primary key columns should have a uniqueness test defined.""" | ||
columns_with_pk = [] | ||
if not model.config.get("materialized") == "picnic_view": | ||
for column_name, column in model.columns.items(): | ||
if "primary_key" in [constraint.type for constraint in column.constraints]: | ||
columns_with_pk.append(column_name) | ||
|
||
tests = ( | ||
model.columns[columns_with_pk[0]].tests | ||
if len(columns_with_pk) == 1 | ||
else model.tests | ||
) | ||
|
||
if columns_with_pk and "unique" not in [test.type for test in tests]: | ||
return RuleViolation() | ||
|
||
return None | ||
|
||
|
||
@rule( | ||
description="All columns of a model should have a description.", hint="Some hint." | ||
) | ||
def columns_have_description(model: Model) -> RuleViolation | None: | ||
"""All columns of a model should have a description.""" | ||
invalid_columns = [ | ||
column_name | ||
for column_name, column in model.columns.items() | ||
if not column.description | ||
] | ||
if invalid_columns: | ||
return RuleViolation( | ||
message=f"The following columns lack a description: " | ||
f"{', '.join(invalid_columns)}." | ||
) | ||
|
||
return None | ||
|
||
|
||
@rule(description="A model should have at least one test defined.", hint="Some hint.") | ||
def has_test(model: Model) -> RuleViolation | None: | ||
"""A model should have at least one model-level and one column-level test. | ||
This does not include singular tests, which are tests defined in a separate .sql | ||
file and not linked to the model in the metadata. | ||
""" | ||
column_tests = [] | ||
for column in model.columns.values(): | ||
column_tests.extend(column.tests) | ||
|
||
if len(model.tests) == 0 or len(column_tests) == 0: | ||
return RuleViolation() | ||
|
||
return None |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,20 @@ | ||
"""Utility functions.""" | ||
|
||
import json | ||
from pathlib import Path | ||
from typing import Any | ||
|
||
|
||
class JsonOpenError(RuntimeError): | ||
"""Raised when there is an error opening a JSON file.""" | ||
|
||
pass | ||
|
||
|
||
def get_json(json_filename: str) -> Any: | ||
"""Get JSON from a file.""" | ||
try: | ||
file_content = Path(json_filename).read_text(encoding="utf-8") | ||
return json.loads(file_content) | ||
except Exception as e: | ||
raise JsonOpenError(f"Error opening {json_filename}.") from e |