From d15d84e001116f1c566cb02dab35e20847fabd05 Mon Sep 17 00:00:00 2001 From: Mike Alfare Date: Thu, 25 Apr 2024 19:04:39 -0400 Subject: [PATCH] update changelog --- .../unreleased/Fixes-20240425-133401.yaml | 6 ++ .../dbt/adapters/postgres/relation.py | 18 +++-- .../relation_configs/materialized_view.py | 4 +- tests/unit/test_postgres_adapter.py | 65 ++++++++++++++++++- 4 files changed, 82 insertions(+), 11 deletions(-) create mode 100644 .changes/unreleased/Fixes-20240425-133401.yaml diff --git a/.changes/unreleased/Fixes-20240425-133401.yaml b/.changes/unreleased/Fixes-20240425-133401.yaml new file mode 100644 index 00000000000..f7a9f0085d2 --- /dev/null +++ b/.changes/unreleased/Fixes-20240425-133401.yaml @@ -0,0 +1,6 @@ +kind: Fixes +body: Replace usage of `Set` with `List` to fix issue with index updates intermittently happening out of order +time: 2024-04-25T13:34:01.018399-04:00 +custom: + Author: mikealfare + Issue: "72" diff --git a/plugins/postgres/dbt/adapters/postgres/relation.py b/plugins/postgres/dbt/adapters/postgres/relation.py index 43822efb11f..d4fa56c8826 100644 --- a/plugins/postgres/dbt/adapters/postgres/relation.py +++ b/plugins/postgres/dbt/adapters/postgres/relation.py @@ -1,5 +1,5 @@ from dataclasses import dataclass -from typing import Optional, Set, FrozenSet +from typing import FrozenSet, List, Optional from dbt.adapters.base.relation import BaseRelation from dbt.adapters.relation_configs import ( @@ -63,7 +63,7 @@ def _get_index_config_changes( self, existing_indexes: FrozenSet[PostgresIndexConfig], new_indexes: FrozenSet[PostgresIndexConfig], - ) -> Set[PostgresIndexConfigChange]: + ) -> List[PostgresIndexConfigChange]: """ Get the index updates that will occur as a result of a new run @@ -74,18 +74,22 @@ def _get_index_config_changes( 3. Index is old -> drop these 4. Indexes are not equal -> drop old, create new -> two actions + *Note:* + The order of the operations matters here because if the same index is dropped and recreated + (e.g. via --full-refresh) then we need to drop it first, then create it. + Returns: a set of index updates in the form {"action": "drop/create", "context": } """ - drop_changes = set( + drop_changes = [ PostgresIndexConfigChange.from_dict( {"action": RelationConfigChangeAction.drop, "context": index} ) for index in existing_indexes.difference(new_indexes) - ) - create_changes = set( + ] + create_changes = [ PostgresIndexConfigChange.from_dict( {"action": RelationConfigChangeAction.create, "context": index} ) for index in new_indexes.difference(existing_indexes) - ) - return set().union(drop_changes, create_changes) + ] + return drop_changes + create_changes diff --git a/plugins/postgres/dbt/adapters/postgres/relation_configs/materialized_view.py b/plugins/postgres/dbt/adapters/postgres/relation_configs/materialized_view.py index 15e700e777a..4bfa1f61242 100644 --- a/plugins/postgres/dbt/adapters/postgres/relation_configs/materialized_view.py +++ b/plugins/postgres/dbt/adapters/postgres/relation_configs/materialized_view.py @@ -102,7 +102,7 @@ def parse_relation_results(cls, relation_results: RelationResults) -> dict: @dataclass class PostgresMaterializedViewConfigChangeCollection: - indexes: Set[PostgresIndexConfigChange] = field(default_factory=set) + indexes: List[PostgresIndexConfigChange] = field(default_factory=list) @property def requires_full_refresh(self) -> bool: @@ -110,4 +110,4 @@ def requires_full_refresh(self) -> bool: @property def has_changes(self) -> bool: - return self.indexes != set() + return self.indexes != [] diff --git a/tests/unit/test_postgres_adapter.py b/tests/unit/test_postgres_adapter.py index d7d91a8988b..193171c68bc 100644 --- a/tests/unit/test_postgres_adapter.py +++ b/tests/unit/test_postgres_adapter.py @@ -1,3 +1,5 @@ +from copy import deepcopy + import agate import decimal import unittest @@ -6,10 +8,16 @@ from dbt.task.debug import DebugTask from dbt.adapters.base.query_headers import MacroQueryStringSetter -from dbt.adapters.postgres import PostgresAdapter -from dbt.adapters.postgres import Plugin as PostgresPlugin +from dbt.adapters.postgres import ( + Plugin as PostgresPlugin, + PostgresAdapter, + PostgresRelation, +) +from dbt.adapters.postgres.relation_configs import PostgresIndexConfig +from dbt.adapters.relation_configs.config_change import RelationConfigChangeAction from dbt.contracts.files import FileHash from dbt.contracts.graph.manifest import ManifestStateCheck +from dbt.contracts.relation import RelationType from dbt.clients import agate_helper from dbt.exceptions import DbtValidationError, DbtConfigError from psycopg2 import extensions as psycopg2_extensions @@ -625,3 +633,56 @@ def test_convert_time_type(self): expected = ["time", "time", "time"] for col_idx, expect in enumerate(expected): assert PostgresAdapter.convert_time_type(agate_table, col_idx) == expect + + +def test_index_config_changes(): + index_0_old = { + "name": "my_index_0", + "column_names": {"column_0"}, + "unique": True, + "method": "btree", + } + index_1_old = { + "name": "my_index_1", + "column_names": {"column_1"}, + "unique": True, + "method": "btree", + } + index_2_old = { + "name": "my_index_2", + "column_names": {"column_2"}, + "unique": True, + "method": "btree", + } + existing_indexes = frozenset( + PostgresIndexConfig.from_dict(index) for index in [index_0_old, index_1_old, index_2_old] + ) + + index_0_new = deepcopy(index_0_old) + index_2_new = deepcopy(index_2_old) + index_2_new.update(method="hash") + index_3_new = { + "name": "my_index_3", + "column_names": {"column_3"}, + "unique": True, + "method": "hash", + } + new_indexes = frozenset( + PostgresIndexConfig.from_dict(index) for index in [index_0_new, index_2_new, index_3_new] + ) + + relation = PostgresRelation.create( + database="my_database", + schema="my_schema", + identifier="my_materialized_view", + type=RelationType.MaterializedView, + ) + + index_changes = relation._get_index_config_changes(existing_indexes, new_indexes) + + assert isinstance(index_changes, list) + assert len(index_changes) == len(["drop 1", "drop 2", "create 2", "create 3"]) + assert index_changes[0].action == RelationConfigChangeAction.drop + assert index_changes[1].action == RelationConfigChangeAction.drop + assert index_changes[2].action == RelationConfigChangeAction.create + assert index_changes[3].action == RelationConfigChangeAction.create