Skip to content

Commit

Permalink
All commands support --defer
Browse files Browse the repository at this point in the history
  • Loading branch information
jtcohen6 committed Dec 3, 2023
1 parent 20262ab commit 94bcaab
Show file tree
Hide file tree
Showing 8 changed files with 36 additions and 108 deletions.
8 changes: 7 additions & 1 deletion core/dbt/cli/flags.py
Original file line number Diff line number Diff line change
Expand Up @@ -347,7 +347,13 @@ def add_fn(x):
if k == "macro" and command == CliCommand.RUN_OPERATION:
add_fn(v)
# None is a Singleton, False is a Flyweight, only one instance of each.
elif v is None or v is False:
elif (v is None or v is False) and k not in (
# These are None by default but they do not support --no-{flag}
"defer_state",
"warn_error",
"warn_error_options",
"log_format",
):
add_fn(f"--no-{spinal_cased}")
elif v is True:
add_fn(f"--{spinal_cased}")
Expand Down
74 changes: 10 additions & 64 deletions core/dbt/cli/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -123,11 +123,18 @@ def invoke(self, args: List[str], **kwargs) -> dbtRunnerResult:
def global_flags(func):
@p.cache_selected_only
@p.debug
@p.defer
@p.deprecated_defer
@p.defer_state
@p.deprecated_favor_state
@p.deprecated_print
@p.deprecated_state
@p.enable_legacy_logger
@p.fail_fast
@p.favor_state
@p.log_cache_events
@p.log_file_max_bytes
@p.log_format
@p.log_format_file
@p.log_level
@p.log_level_file
Expand All @@ -143,12 +150,15 @@ def global_flags(func):
@p.record_timing_info
@p.send_anonymous_usage_stats
@p.single_threaded
@p.state
@p.static_parser
@p.use_colors
@p.use_colors_file
@p.use_experimental_parser
@p.version
@p.version_check
@p.warn_error
@p.warn_error_options
@p.write_json
@functools.wraps(func)
def wrapper(*args, **kwargs):
Expand All @@ -166,9 +176,6 @@ def wrapper(*args, **kwargs):
)
@click.pass_context
@global_flags
@p.warn_error
@p.warn_error_options
@p.log_format
@p.show_resource_report
def cli(ctx, **kwargs):
"""An ELT tool for managing your SQL transformations and data models.
Expand All @@ -180,11 +187,7 @@ def cli(ctx, **kwargs):
@cli.command("build")
@click.pass_context
@global_flags
@p.defer
@p.deprecated_defer
@p.exclude
@p.favor_state
@p.deprecated_favor_state
@p.full_refresh
@p.include_saved_query
@p.indirect_selection
Expand All @@ -195,9 +198,6 @@ def cli(ctx, **kwargs):
@p.select
@p.selector
@p.show
@p.state
@p.defer_state
@p.deprecated_state
@p.store_failures
@p.target
@p.target_path
Expand Down Expand Up @@ -259,21 +259,14 @@ def docs(ctx, **kwargs):
@click.pass_context
@global_flags
@p.compile_docs
@p.defer
@p.deprecated_defer
@p.exclude
@p.favor_state
@p.deprecated_favor_state
@p.profile
@p.profiles_dir
@p.project_dir
@p.select
@p.selector
@p.empty_catalog
@p.static
@p.state
@p.defer_state
@p.deprecated_state
@p.target
@p.target_path
@p.threads
Expand Down Expand Up @@ -330,11 +323,7 @@ def docs_serve(ctx, **kwargs):
@cli.command("compile")
@click.pass_context
@global_flags
@p.defer
@p.deprecated_defer
@p.exclude
@p.favor_state
@p.deprecated_favor_state
@p.full_refresh
@p.show_output_format
@p.indirect_selection
Expand All @@ -346,9 +335,6 @@ def docs_serve(ctx, **kwargs):
@p.select
@p.selector
@p.inline
@p.state
@p.defer_state
@p.deprecated_state
@p.compile_inject_ephemeral_ctes
@p.target
@p.target_path
Expand Down Expand Up @@ -378,11 +364,7 @@ def compile(ctx, **kwargs):
@cli.command("show")
@click.pass_context
@global_flags
@p.defer
@p.deprecated_defer
@p.exclude
@p.favor_state
@p.deprecated_favor_state
@p.full_refresh
@p.show_output_format
@p.show_limit
Expand All @@ -394,9 +376,6 @@ def compile(ctx, **kwargs):
@p.select
@p.selector
@p.inline
@p.state
@p.defer_state
@p.deprecated_state
@p.target
@p.target_path
@p.threads
Expand Down Expand Up @@ -533,9 +512,6 @@ def init(ctx, **kwargs):
@p.resource_type
@p.raw_select
@p.selector
@p.state
@p.defer_state
@p.deprecated_state
@p.target
@p.target_path
@p.vars
Expand Down Expand Up @@ -591,10 +567,6 @@ def parse(ctx, **kwargs):
@cli.command("run")
@click.pass_context
@global_flags
@p.defer
@p.deprecated_defer
@p.favor_state
@p.deprecated_favor_state
@p.exclude
@p.full_refresh
@p.profile
Expand All @@ -603,9 +575,6 @@ def parse(ctx, **kwargs):
@p.empty
@p.select
@p.selector
@p.state
@p.defer_state
@p.deprecated_state
@p.target
@p.target_path
@p.threads
Expand Down Expand Up @@ -638,7 +607,6 @@ def run(ctx, **kwargs):
@p.vars
@p.profile
@p.target
@p.state
@p.threads
@requires.postflight
@requires.preflight
Expand All @@ -663,7 +631,6 @@ def retry(ctx, **kwargs):
@cli.command("clone")
@click.pass_context
@global_flags
@p.defer_state
@p.exclude
@p.full_refresh
@p.profile
Expand All @@ -672,7 +639,6 @@ def retry(ctx, **kwargs):
@p.resource_type
@p.select
@p.selector
@p.state # required
@p.target
@p.target_path
@p.threads
Expand Down Expand Up @@ -740,9 +706,6 @@ def run_operation(ctx, **kwargs):
@p.select
@p.selector
@p.show
@p.state
@p.defer_state
@p.deprecated_state
@p.target
@p.target_path
@p.threads
Expand All @@ -769,19 +732,12 @@ def seed(ctx, **kwargs):
@cli.command("snapshot")
@click.pass_context
@global_flags
@p.defer
@p.deprecated_defer
@p.exclude
@p.favor_state
@p.deprecated_favor_state
@p.profile
@p.profiles_dir
@p.project_dir
@p.select
@p.selector
@p.state
@p.defer_state
@p.deprecated_state
@p.target
@p.target_path
@p.threads
Expand Down Expand Up @@ -824,9 +780,6 @@ def source(ctx, **kwargs):
@p.project_dir
@p.select
@p.selector
@p.state
@p.defer_state
@p.deprecated_state
@p.target
@p.target_path
@p.threads
Expand Down Expand Up @@ -860,20 +813,13 @@ def freshness(ctx, **kwargs):
@cli.command("test")
@click.pass_context
@global_flags
@p.defer
@p.deprecated_defer
@p.exclude
@p.favor_state
@p.deprecated_favor_state
@p.indirect_selection
@p.profile
@p.profiles_dir
@p.project_dir
@p.select
@p.selector
@p.state
@p.defer_state
@p.deprecated_state
@p.store_failures
@p.target
@p.target_path
Expand Down
24 changes: 1 addition & 23 deletions core/dbt/task/compile.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,5 @@
import threading
from typing import AbstractSet, Optional

from dbt.contracts.graph.manifest import WritableManifest
from dbt.contracts.results import RunStatus, RunResult
from dbt.events.base_types import EventLevel
from dbt.events.functions import fire_event
Expand All @@ -14,7 +12,7 @@

from dbt.graph import ResourceTypeSelector
from dbt.node_types import NodeType
from dbt.parser.manifest import write_manifest, process_node
from dbt.parser.manifest import process_node
from dbt.parser.sql import SqlBlockParser
from dbt.task.base import BaseRunner
from dbt.task.runnable import GraphRunnableTask
Expand Down Expand Up @@ -101,26 +99,6 @@ def task_end_messages(self, results):
)
)

def _get_deferred_manifest(self) -> Optional[WritableManifest]:
return super()._get_deferred_manifest() if self.args.defer else None

def defer_to_manifest(self, adapter, selected_uids: AbstractSet[str]):
deferred_manifest = self._get_deferred_manifest()
if deferred_manifest is None:
return
if self.manifest is None:
raise DbtInternalError(
"Expected to defer to manifest, but there is no runtime manifest to defer from!"
)
self.manifest.merge_from_artifact(
adapter=adapter,
other=deferred_manifest,
selected=selected_uids,
favor_state=bool(self.args.favor_state),
)
# TODO: is it wrong to write the manifest here? I think it's right...
write_manifest(self.manifest, self.config.project_target_path)

def _runtime_initialize(self):
if getattr(self.args, "inline", None):
try:
Expand Down
4 changes: 0 additions & 4 deletions core/dbt/task/freshness.py
Original file line number Diff line number Diff line change
Expand Up @@ -171,10 +171,6 @@ def node_is_match(self, node):


class FreshnessTask(GraphRunnableTask):
def defer_to_manifest(self, adapter, selected_uids):
# freshness don't defer
return

def result_path(self):
if self.args.output:
return os.path.realpath(self.args.output)
Expand Down
4 changes: 0 additions & 4 deletions core/dbt/task/list.py
Original file line number Diff line number Diff line change
Expand Up @@ -186,10 +186,6 @@ def selection_arg(self):
else:
return self.args.select

def defer_to_manifest(self, adapter, selected_uids):
# list don't defer
return

def get_node_selector(self):
if self.manifest is None or self.graph is None:
raise DbtInternalError("manifest and graph must be set to get perform node selection")
Expand Down
20 changes: 18 additions & 2 deletions core/dbt/task/runnable.py
Original file line number Diff line number Diff line change
Expand Up @@ -130,9 +130,22 @@ def get_selection_spec(self) -> SelectionSpec:
def get_node_selector(self) -> NodeSelector:
raise NotImplementedError(f"get_node_selector not implemented for task {type(self)}")

@abstractmethod
def defer_to_manifest(self, adapter, selected_uids: AbstractSet[str]):
raise NotImplementedError(f"defer_to_manifest not implemented for task {type(self)}")
deferred_manifest = self._get_deferred_manifest()
if deferred_manifest is None:
return
if self.manifest is None:
raise DbtInternalError(
"Expected to defer to manifest, but there is no runtime manifest to defer from!"
)
self.manifest.merge_from_artifact(
adapter=adapter,
other=deferred_manifest,
selected=selected_uids,
favor_state=bool(self.args.favor_state),
)
# TODO: is it wrong to write the manifest here? I think it's right...
write_manifest(self.manifest, self.config.project_target_path)

def get_graph_queue(self) -> GraphQueue:
selector = self.get_node_selector()
Expand Down Expand Up @@ -605,6 +618,9 @@ def task_end_messages(self, results):
print_run_end_messages(results)

def _get_deferred_manifest(self) -> Optional[WritableManifest]:
if not self.args.defer:
return None

state = self.previous_defer_state or self.previous_state
if not state:
raise DbtRuntimeError(
Expand Down
4 changes: 0 additions & 4 deletions core/dbt/task/seed.py
Original file line number Diff line number Diff line change
Expand Up @@ -63,10 +63,6 @@ def print_result_line(self, result):


class SeedTask(RunTask):
def defer_to_manifest(self, adapter, selected_uids):
# seeds don't defer
return

def raise_on_first_error(self):
return False

Expand Down
6 changes: 0 additions & 6 deletions tests/functional/defer_state/test_defer_state.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@

import pytest

from dbt.cli.exceptions import DbtUsageException
from dbt.contracts.results import RunStatus
from dbt.exceptions import DbtRuntimeError
from dbt.tests.util import run_dbt, write_file, rm_file
Expand Down Expand Up @@ -105,11 +104,6 @@ def run_and_save_state(self, project_root, with_snapshot=False):


class TestDeferStateUnsupportedCommands(BaseDeferState):
def test_unsupported_commands(self, project):
# make sure these commands don"t work with --defer
with pytest.raises(DbtUsageException):
run_dbt(["seed", "--defer"])

def test_no_state(self, project):
# no "state" files present, snapshot fails
with pytest.raises(DbtRuntimeError):
Expand Down

0 comments on commit 94bcaab

Please sign in to comment.