Skip to content

Commit

Permalink
Merge branch 'main' into qmalcolm--10867-default-lookback-to-1
Browse files Browse the repository at this point in the history
  • Loading branch information
QMalcolm committed Oct 23, 2024
2 parents ae9c7a1 + bdb79e8 commit 02d44f4
Show file tree
Hide file tree
Showing 22 changed files with 966 additions and 43 deletions.
6 changes: 6 additions & 0 deletions .changes/unreleased/Features-20241001-134051.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
kind: Features
body: Enable use of multi-column unique key in snapshots
time: 2024-10-01T13:40:51.297529-04:00
custom:
Author: gshank
Issue: "9992"
6 changes: 6 additions & 0 deletions .changes/unreleased/Fixes-20241018-135810.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
kind: Fixes
body: Exclude hook result from results in on-run-end context
time: 2024-10-18T13:58:10.396884-07:00
custom:
Author: ChenyuLInx
Issue: "7387"
6 changes: 6 additions & 0 deletions .changes/unreleased/Fixes-20241022-222927.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
kind: Fixes
body: Implement partial parsing for all-yaml snapshots
time: 2024-10-22T22:29:27.396378-04:00
custom:
Author: gshank
Issue: "10903"
2 changes: 1 addition & 1 deletion core/dbt/artifacts/resources/v1/snapshot.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ class SnapshotMetaColumnNames(dbtClassMixin):
class SnapshotConfig(NodeConfig):
materialized: str = "snapshot"
strategy: Optional[str] = None
unique_key: Optional[str] = None
unique_key: Optional[Union[str, List[str]]] = None
target_schema: Optional[str] = None
target_database: Optional[str] = None
updated_at: Optional[str] = None
Expand Down
8 changes: 7 additions & 1 deletion core/dbt/artifacts/schemas/results.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,12 @@

@dataclass
class TimingInfo(dbtClassMixin):
"""
Represents a step in the execution of a node.
`name` should be one of: compile, execute, or other
Do not call directly, use `collect_timing_info` instead.
"""

name: str
started_at: Optional[datetime] = None
completed_at: Optional[datetime] = None
Expand All @@ -21,7 +27,7 @@ def end(self):
self.completed_at = datetime.utcnow()

def to_msg_dict(self):
msg_dict = {"name": self.name}
msg_dict = {"name": str(self.name)}
if self.started_at:
msg_dict["started_at"] = datetime_to_json_string(self.started_at)
if self.completed_at:
Expand Down
1 change: 1 addition & 0 deletions core/dbt/contracts/files.py
Original file line number Diff line number Diff line change
Expand Up @@ -192,6 +192,7 @@ class SchemaSourceFile(BaseSourceFile):
sources: List[str] = field(default_factory=list)
exposures: List[str] = field(default_factory=list)
metrics: List[str] = field(default_factory=list)
snapshots: List[str] = field(default_factory=list)
# The following field will no longer be used. Leaving
# here to avoid breaking existing projects. To be removed
# later if possible.
Expand Down
9 changes: 6 additions & 3 deletions core/dbt/contracts/graph/manifest.py
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,7 @@
SeedNode,
SemanticModel,
SingularTestNode,
SnapshotNode,
SourceDefinition,
UnitTestDefinition,
UnitTestFileFixture,
Expand Down Expand Up @@ -1600,12 +1601,14 @@ def add_node(self, source_file: AnySourceFile, node: ManifestNode, test_from=Non
if isinstance(node, GenericTestNode):
assert test_from
source_file.add_test(node.unique_id, test_from)
if isinstance(node, Metric):
elif isinstance(node, Metric):
source_file.metrics.append(node.unique_id)
if isinstance(node, Exposure):
elif isinstance(node, Exposure):
source_file.exposures.append(node.unique_id)
if isinstance(node, Group):
elif isinstance(node, Group):
source_file.groups.append(node.unique_id)
elif isinstance(node, SnapshotNode):
source_file.snapshots.append(node.unique_id)
elif isinstance(source_file, FixtureSourceFile):
pass
else:
Expand Down
21 changes: 21 additions & 0 deletions core/dbt/parser/partial.py
Original file line number Diff line number Diff line change
Expand Up @@ -658,10 +658,14 @@ def handle_schema_file_changes(self, schema_file, saved_yaml_dict, new_yaml_dict
key_diff = self.get_diff_for(dict_key, saved_yaml_dict, new_yaml_dict)
if key_diff["changed"]:
for elem in key_diff["changed"]:
if dict_key == "snapshots" and "relation" in elem:
self.delete_yaml_snapshot(schema_file, elem)
self.delete_schema_mssa_links(schema_file, dict_key, elem)
self.merge_patch(schema_file, dict_key, elem, True)
if key_diff["deleted"]:
for elem in key_diff["deleted"]:
if dict_key == "snapshots" and "relation" in elem:
self.delete_yaml_snapshot(schema_file, elem)
self.delete_schema_mssa_links(schema_file, dict_key, elem)
if key_diff["added"]:
for elem in key_diff["added"]:
Expand All @@ -673,6 +677,8 @@ def handle_schema_file_changes(self, schema_file, saved_yaml_dict, new_yaml_dict
continue
elem = self.get_schema_element(new_yaml_dict[dict_key], name)
if elem:
if dict_key == "snapshots" and "relation" in elem:
self.delete_yaml_snapshot(schema_file, elem)
self.delete_schema_mssa_links(schema_file, dict_key, elem)
self.merge_patch(schema_file, dict_key, elem, True)

Expand Down Expand Up @@ -828,6 +834,8 @@ def delete_schema_mssa_links(self, schema_file, dict_key, elem):
# remove elem node and remove unique_id from node_patches
for elem_unique_id in elem_unique_ids:
# might have been already removed
# For all-yaml snapshots, we don't do this, since the node
# should have already been removed.
if (
elem_unique_id in self.saved_manifest.nodes
or elem_unique_id in self.saved_manifest.disabled
Expand Down Expand Up @@ -868,6 +876,19 @@ def remove_tests(self, schema_file, dict_key, name):
self.saved_manifest.nodes.pop(test_unique_id)
schema_file.remove_tests(dict_key, name)

def delete_yaml_snapshot(self, schema_file, snapshot_dict):
    """Remove a yaml-defined snapshot's node from the saved manifest.

    Called during partial parsing when the snapshot's yaml entry in
    `schema_file` has changed or been deleted, so the stale node must be
    dropped before the entry is re-parsed (or not re-added at all).

    :param schema_file: the SchemaSourceFile whose `snapshots` list holds
        the unique_ids of yaml-defined snapshot nodes for that file.
    :param snapshot_dict: the yaml dict for one snapshot entry; only its
        "name" key is used to locate the matching node.
    """
    snapshot_name = snapshot_dict["name"]
    # Iterate over a copy: we call schema_file.snapshots.remove() below,
    # and mutating the list while iterating it would skip elements.
    snapshots = schema_file.snapshots.copy()
    for unique_id in snapshots:
        if unique_id in self.saved_manifest.nodes:
            snapshot = self.saved_manifest.nodes[unique_id]
            # The file may track several snapshots; only drop the node
            # whose name matches this yaml entry.
            if snapshot.name == snapshot_name:
                self.saved_manifest.nodes.pop(unique_id)
                schema_file.snapshots.remove(unique_id)
        elif unique_id in self.saved_manifest.disabled:
            # Disabled nodes live in a separate collection; delegate the
            # manifest-side cleanup to delete_disabled.
            # NOTE(review): this branch removes the id without comparing
            # the disabled node's name to snapshot_name — confirm that is
            # intended (it may remove ids for other snapshots in the file).
            self.delete_disabled(unique_id, schema_file.file_id)
            schema_file.snapshots.remove(unique_id)

def delete_schema_source(self, schema_file, source_dict):
# both patches, tests, and source nodes
source_name = source_dict["name"]
Expand Down
5 changes: 3 additions & 2 deletions core/dbt/parser/schemas.py
Original file line number Diff line number Diff line change
Expand Up @@ -309,8 +309,9 @@ def _add_yaml_snapshot_nodes_to_manifest(
snapshot_node.raw_code = "select * from {{ " + snapshot["relation"] + " }}"

# Add our new node to the manifest, and note that ref lookup collections
# will need to be rebuilt.
self.manifest.add_node_nofile(snapshot_node)
# will need to be rebuilt. This adds the node unique_id to the "snapshots"
# list in the SchemaSourceFile.
self.manifest.add_node(block.file, snapshot_node)
rebuild_refs = True

if rebuild_refs:
Expand Down
23 changes: 16 additions & 7 deletions core/dbt/task/run.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
RunningStatus,
RunStatus,
TimingInfo,
collect_timing_info,
)
from dbt.artifacts.schemas.run import RunResult
from dbt.cli.flags import Flags
Expand Down Expand Up @@ -633,7 +634,6 @@ def get_hooks_by_type(self, hook_type: RunHookType) -> List[HookNode]:
def safe_run_hooks(
self, adapter: BaseAdapter, hook_type: RunHookType, extra_context: Dict[str, Any]
) -> RunStatus:
started_at = datetime.utcnow()
ordered_hooks = self.get_hooks_by_type(hook_type)

if hook_type == RunHookType.End and ordered_hooks:
Expand All @@ -653,14 +653,20 @@ def safe_run_hooks(
hook.index = idx
hook_name = f"{hook.package_name}.{hook_type}.{hook.index - 1}"
execution_time = 0.0
timing = []
timing: List[TimingInfo] = []
failures = 1

if not failed:
with collect_timing_info("compile", timing.append):
sql = self.get_hook_sql(
adapter, hook, hook.index, num_hooks, extra_context
)

started_at = timing[0].started_at or datetime.utcnow()
hook.update_event_status(
started_at=started_at.isoformat(), node_status=RunningStatus.Started
)
sql = self.get_hook_sql(adapter, hook, hook.index, num_hooks, extra_context)

fire_event(
LogHookStartLine(
statement=hook_name,
Expand All @@ -670,11 +676,12 @@ def safe_run_hooks(
)
)

status, message = get_execution_status(sql, adapter)
finished_at = datetime.utcnow()
with collect_timing_info("execute", timing.append):
status, message = get_execution_status(sql, adapter)

finished_at = timing[1].completed_at or datetime.utcnow()
hook.update_event_status(finished_at=finished_at.isoformat())
execution_time = (finished_at - started_at).total_seconds()
timing = [TimingInfo(hook_name, started_at, finished_at)]
failures = 0 if status == RunStatus.Success else 1

if status == RunStatus.Success:
Expand Down Expand Up @@ -767,7 +774,9 @@ def after_run(self, adapter, results) -> None:

extras = {
"schemas": list({s for _, s in database_schema_set}),
"results": results,
"results": [
r for r in results if r.thread_id != "main" or r.status == RunStatus.Error
],  # exclude hook results that didn't fail, to preserve backwards compatibility
"database_schemas": list(database_schema_set),
}
with adapter.connection_named("master"):
Expand Down
46 changes: 26 additions & 20 deletions core/dbt/task/run_operation.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,11 +2,11 @@
import threading
import traceback
from datetime import datetime
from typing import TYPE_CHECKING
from typing import TYPE_CHECKING, List

import dbt_common.exceptions
from dbt.adapters.factory import get_adapter
from dbt.artifacts.schemas.results import RunStatus, TimingInfo
from dbt.artifacts.schemas.results import RunStatus, TimingInfo, collect_timing_info
from dbt.artifacts.schemas.run import RunResult, RunResultsArtifact
from dbt.contracts.files import FileHash
from dbt.contracts.graph.nodes import HookNode
Expand Down Expand Up @@ -51,25 +51,29 @@ def _run_unsafe(self, package_name, macro_name) -> "agate.Table":
return res

def run(self) -> RunResultsArtifact:
start = datetime.utcnow()
self.compile_manifest()
timing: List[TimingInfo] = []

success = True
with collect_timing_info("compile", timing.append):
self.compile_manifest()

start = timing[0].started_at

success = True
package_name, macro_name = self._get_macro_parts()

try:
self._run_unsafe(package_name, macro_name)
except dbt_common.exceptions.DbtBaseException as exc:
fire_event(RunningOperationCaughtError(exc=str(exc)))
fire_event(LogDebugStackTrace(exc_info=traceback.format_exc()))
success = False
except Exception as exc:
fire_event(RunningOperationUncaughtError(exc=str(exc)))
fire_event(LogDebugStackTrace(exc_info=traceback.format_exc()))
success = False
with collect_timing_info("execute", timing.append):
try:
self._run_unsafe(package_name, macro_name)
except dbt_common.exceptions.DbtBaseException as exc:
fire_event(RunningOperationCaughtError(exc=str(exc)))
fire_event(LogDebugStackTrace(exc_info=traceback.format_exc()))
success = False
except Exception as exc:
fire_event(RunningOperationUncaughtError(exc=str(exc)))
fire_event(LogDebugStackTrace(exc_info=traceback.format_exc()))
success = False

end = datetime.utcnow()
end = timing[1].completed_at

macro = (
self.manifest.find_macro_by_name(macro_name, self.config.project_name, package_name)
Expand All @@ -85,10 +89,12 @@ def run(self) -> RunResultsArtifact:
f"dbt could not find a macro with the name '{macro_name}' in any package"
)

execution_time = (end - start).total_seconds() if start and end else 0.0

run_result = RunResult(
adapter_response={},
status=RunStatus.Success if success else RunStatus.Error,
execution_time=(end - start).total_seconds(),
execution_time=execution_time,
failures=0 if success else 1,
message=None,
node=HookNode(
Expand All @@ -105,13 +111,13 @@ def run(self) -> RunResultsArtifact:
original_file_path="",
),
thread_id=threading.current_thread().name,
timing=[TimingInfo(name=macro_name, started_at=start, completed_at=end)],
timing=timing,
batch_results=None,
)

results = RunResultsArtifact.from_execution_results(
generated_at=end,
elapsed_time=(end - start).total_seconds(),
generated_at=end or datetime.utcnow(),
elapsed_time=execution_time,
args={
k: v
for k, v in self.args.__dict__.items()
Expand Down
1 change: 0 additions & 1 deletion core/dbt/task/runnable.py
Original file line number Diff line number Diff line change
Expand Up @@ -512,7 +512,6 @@ def execute_with_hooks(self, selected_uids: AbstractSet[str]):
self.started_at = time.time()
try:
before_run_status = self.before_run(adapter, selected_uids)

if before_run_status == RunStatus.Success or (
not get_flags().skip_nodes_if_on_run_start_fails
):
Expand Down
2 changes: 1 addition & 1 deletion core/setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,7 @@
"dbt-extractor>=0.5.0,<=0.6",
"dbt-semantic-interfaces>=0.7.3,<0.8",
# Minor versions for these are expected to be backwards-compatible
"dbt-common>=1.9.0,<2.0",
"dbt-common>=1.11.0,<2.0",
"dbt-adapters>=1.7.0,<2.0",
# ----
# Expect compatibility with all new versions of these packages, so lower bounds only.
Expand Down
12 changes: 12 additions & 0 deletions schemas/dbt/manifest/v12.json
Original file line number Diff line number Diff line change
Expand Up @@ -6540,6 +6540,12 @@
{
"type": "string"
},
{
"type": "array",
"items": {
"type": "string"
}
},
{
"type": "null"
}
Expand Down Expand Up @@ -16425,6 +16431,12 @@
{
"type": "string"
},
{
"type": "array",
"items": {
"type": "string"
}
},
{
"type": "null"
}
Expand Down
Loading

0 comments on commit 02d44f4

Please sign in to comment.