Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

ADAP-835: Optimize manual refresh on auto-refreshed materialized views #637

Closed
wants to merge 13 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions .changes/unreleased/Features-20231019-131629.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
kind: Features
body: Optimize refreshing materialized views when autorefreshing
time: 2023-10-19T13:16:29.243999-07:00
custom:
Author: mikealfare
Issue: "637"
24 changes: 18 additions & 6 deletions dbt/adapters/redshift/impl.py
Original file line number Diff line number Diff line change
@@ -1,18 +1,23 @@
from collections import namedtuple
from dataclasses import dataclass
from typing import Optional, Set, Any, Dict, Type
from collections import namedtuple
from dbt.adapters.base import PythonJobHelper
from dbt.adapters.base.impl import AdapterConfig, ConstraintSupport
from dbt.adapters.base.meta import available

from dbt.adapters.base import (
AdapterConfig,
ConstraintSupport,
PythonJobHelper,
available,
)
from dbt.adapters.relation_configs import RelationConfigFactory
from dbt.adapters.sql import SQLAdapter
from dbt.contracts.connection import AdapterResponse
from dbt.contracts.graph.nodes import ConstraintType
from dbt.contracts.relation import RelationType
from dbt.events import AdapterLogger


import dbt.exceptions

from dbt.adapters.redshift import RedshiftConnectionManager, RedshiftRelation
from dbt.adapters.redshift.relation_configs import RedshiftMaterializedViewConfig


logger = AdapterLogger("Redshift")
Expand Down Expand Up @@ -46,6 +51,13 @@ class RedshiftAdapter(SQLAdapter):
ConstraintType.foreign_key: ConstraintSupport.NOT_ENFORCED,
}

def _relation_config_factory(self) -> RelationConfigFactory:
return RelationConfigFactory(
relation_configs={
RelationType.MaterializedView: RedshiftMaterializedViewConfig,
},
)

@classmethod
def date_function(cls):
return "getdate()"
Expand Down
21 changes: 1 addition & 20 deletions dbt/adapters/redshift/relation.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,12 +3,10 @@

from dbt.adapters.base.relation import BaseRelation
from dbt.adapters.relation_configs import (
RelationConfigBase,
RelationConfigChangeAction,
RelationResults,
)
from dbt.context.providers import RuntimeConfigObject
from dbt.contracts.graph.nodes import ModelNode
from dbt.contracts.relation import RelationType
from dbt.exceptions import DbtRuntimeError

Expand All @@ -29,9 +27,6 @@
class RedshiftRelation(BaseRelation):
include_policy = RedshiftIncludePolicy # type: ignore
quote_policy = RedshiftQuotePolicy # type: ignore
relation_configs = {
RelationType.MaterializedView.value: RedshiftMaterializedViewConfig,
}
renameable_relations = frozenset(
{
RelationType.View,
Expand Down Expand Up @@ -60,18 +55,6 @@ def __post_init__(self):
def relation_max_name_length(self):
return MAX_CHARACTERS_IN_IDENTIFIER

@classmethod
def from_runtime_config(cls, runtime_config: RuntimeConfigObject) -> RelationConfigBase:
model_node: ModelNode = runtime_config.model
relation_type: str = model_node.config.materialized

if relation_config := cls.relation_configs.get(relation_type):
return relation_config.from_model_node(model_node)

raise DbtRuntimeError(
f"from_runtime_config() is not supported for the provided relation type: {relation_type}"
)

@classmethod
def materialized_view_config_changeset(
cls, relation_results: RelationResults, runtime_config: RuntimeConfigObject
Expand All @@ -81,9 +64,7 @@ def materialized_view_config_changeset(
existing_materialized_view = RedshiftMaterializedViewConfig.from_relation_results(
relation_results
)
new_materialized_view = RedshiftMaterializedViewConfig.from_model_node(
runtime_config.model
)
new_materialized_view = RedshiftMaterializedViewConfig.from_node(runtime_config.model)
assert isinstance(existing_materialized_view, RedshiftMaterializedViewConfig)
assert isinstance(new_materialized_view, RedshiftMaterializedViewConfig)

Expand Down
11 changes: 11 additions & 0 deletions dbt/adapters/redshift/relation_configs/agate_helpers.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
import agate


def get_first_row(table: agate.Table) -> agate.Row:
"""
Returns the first row of the table. If the table is empty, it returns an empty row.
"""
try:
return table.rows[0]
except IndexError:
return agate.Row(values=set())
70 changes: 0 additions & 70 deletions dbt/adapters/redshift/relation_configs/base.py

This file was deleted.

23 changes: 11 additions & 12 deletions dbt/adapters/redshift/relation_configs/dist.py
Original file line number Diff line number Diff line change
@@ -1,19 +1,19 @@
from dataclasses import dataclass
from typing import Optional, Set
from typing import Any, Dict, Optional, Set
from typing_extensions import Self

import agate
from dbt.adapters.relation_configs import (
RelationConfigBase,
RelationConfigChange,
RelationConfigChangeAction,
RelationConfigValidationMixin,
RelationConfigValidationRule,
)
from dbt.contracts.graph.nodes import ModelNode
from dbt.contracts.graph.nodes import ParsedNode
from dbt.dataclass_schema import StrEnum
from dbt.exceptions import DbtRuntimeError

from dbt.adapters.redshift.relation_configs.base import RedshiftRelationConfigBase


class RedshiftDistStyle(StrEnum):
auto = "auto"
Expand All @@ -27,7 +27,7 @@ def default(cls) -> "RedshiftDistStyle":


@dataclass(frozen=True, eq=True, unsafe_hash=True)
class RedshiftDistConfig(RedshiftRelationConfigBase, RelationConfigValidationMixin):
class RedshiftDistConfig(RelationConfigBase, RelationConfigValidationMixin):
"""
This config fallows the specs found here:
https://docs.aws.amazon.com/redshift/latest/dg/r_CREATE_TABLE_NEW.html
Expand Down Expand Up @@ -65,29 +65,28 @@ def validation_rules(self) -> Set[RelationConfigValidationRule]:
}

@classmethod
def from_dict(cls, config_dict) -> "RedshiftDistConfig":
def from_dict(cls, config_dict: Dict[str, Any]) -> Self:
kwargs_dict = {
"diststyle": config_dict.get("diststyle"),
"distkey": config_dict.get("distkey"),
}
dist: "RedshiftDistConfig" = super().from_dict(kwargs_dict) # type: ignore
return dist
return super().from_dict(kwargs_dict)

@classmethod
def parse_model_node(cls, model_node: ModelNode) -> dict:
def parse_node(cls, node: ParsedNode) -> Dict[str, Any]:
"""
Translate ModelNode objects from the user-provided config into a standard dictionary.

Args:
model_node: the description of the distkey and diststyle from the user in this format:
node: the description of the distkey and diststyle from the user in this format:

{
"dist": any("auto", "even", "all") or "<column_name>"
}

Returns: a standard dictionary describing this `RedshiftDistConfig` instance
"""
dist = model_node.config.extra.get("dist", "")
dist = node.config.extra.get("dist", "")

diststyle = dist.lower()

Expand All @@ -107,7 +106,7 @@ def parse_model_node(cls, model_node: ModelNode) -> dict:
return config

@classmethod
def parse_relation_results(cls, relation_results_entry: agate.Row) -> dict:
def parse_relation_results(cls, relation_results_entry: agate.Row) -> Dict[str, Any]:
"""
Translate agate objects from the database into a standard dictionary.

Expand Down
Loading
Loading