Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Backport 1.6.latest] [Bug] Use a list instead of a set for index changes to perserve order (dbt-postgres#73) #10047

Merged
merged 1 commit into from
Apr 25, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions .changes/unreleased/Fixes-20240425-133401.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
kind: Fixes
body: Replace usage of `Set` with `List` to fix issue with index updates intermittently happening out of order
time: 2024-04-25T13:34:01.018399-04:00
custom:
Author: mikealfare
Issue: "72"
18 changes: 11 additions & 7 deletions plugins/postgres/dbt/adapters/postgres/relation.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
from dataclasses import dataclass
from typing import Optional, Set, FrozenSet
from typing import FrozenSet, List, Optional

from dbt.adapters.base.relation import BaseRelation
from dbt.adapters.relation_configs import (
Expand Down Expand Up @@ -63,7 +63,7 @@ def _get_index_config_changes(
self,
existing_indexes: FrozenSet[PostgresIndexConfig],
new_indexes: FrozenSet[PostgresIndexConfig],
) -> Set[PostgresIndexConfigChange]:
) -> List[PostgresIndexConfigChange]:
"""
Get the index updates that will occur as a result of a new run

Expand All @@ -74,18 +74,22 @@ def _get_index_config_changes(
3. Index is old -> drop these
4. Indexes are not equal -> drop old, create new -> two actions

*Note:*
The order of the operations matters here because if the same index is dropped and recreated
(e.g. via --full-refresh) then we need to drop it first, then create it.

Returns: a set of index updates in the form {"action": "drop/create", "context": <IndexConfig>}
"""
drop_changes = set(
drop_changes = [
PostgresIndexConfigChange.from_dict(
{"action": RelationConfigChangeAction.drop, "context": index}
)
for index in existing_indexes.difference(new_indexes)
)
create_changes = set(
]
create_changes = [
PostgresIndexConfigChange.from_dict(
{"action": RelationConfigChangeAction.create, "context": index}
)
for index in new_indexes.difference(existing_indexes)
)
return set().union(drop_changes, create_changes)
]
return drop_changes + create_changes
Original file line number Diff line number Diff line change
Expand Up @@ -102,12 +102,12 @@ def parse_relation_results(cls, relation_results: RelationResults) -> dict:

@dataclass
class PostgresMaterializedViewConfigChangeCollection:
indexes: Set[PostgresIndexConfigChange] = field(default_factory=set)
indexes: List[PostgresIndexConfigChange] = field(default_factory=list)

@property
def requires_full_refresh(self) -> bool:
return any(index.requires_full_refresh for index in self.indexes)

@property
def has_changes(self) -> bool:
return self.indexes != set()
return self.indexes != []
65 changes: 63 additions & 2 deletions tests/unit/test_postgres_adapter.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
from copy import deepcopy

import agate
import decimal
import unittest
Expand All @@ -6,10 +8,16 @@
from dbt.task.debug import DebugTask

from dbt.adapters.base.query_headers import MacroQueryStringSetter
from dbt.adapters.postgres import PostgresAdapter
from dbt.adapters.postgres import Plugin as PostgresPlugin
from dbt.adapters.postgres import (
Plugin as PostgresPlugin,
PostgresAdapter,
PostgresRelation,
)
from dbt.adapters.postgres.relation_configs import PostgresIndexConfig
from dbt.adapters.relation_configs.config_change import RelationConfigChangeAction
from dbt.contracts.files import FileHash
from dbt.contracts.graph.manifest import ManifestStateCheck
from dbt.contracts.relation import RelationType
from dbt.clients import agate_helper
from dbt.exceptions import DbtValidationError, DbtConfigError
from psycopg2 import extensions as psycopg2_extensions
Expand Down Expand Up @@ -625,3 +633,56 @@ def test_convert_time_type(self):
expected = ["time", "time", "time"]
for col_idx, expect in enumerate(expected):
assert PostgresAdapter.convert_time_type(agate_table, col_idx) == expect


def test_index_config_changes():
index_0_old = {
"name": "my_index_0",
"column_names": {"column_0"},
"unique": True,
"method": "btree",
}
index_1_old = {
"name": "my_index_1",
"column_names": {"column_1"},
"unique": True,
"method": "btree",
}
index_2_old = {
"name": "my_index_2",
"column_names": {"column_2"},
"unique": True,
"method": "btree",
}
existing_indexes = frozenset(
PostgresIndexConfig.from_dict(index) for index in [index_0_old, index_1_old, index_2_old]
)

index_0_new = deepcopy(index_0_old)
index_2_new = deepcopy(index_2_old)
index_2_new.update(method="hash")
index_3_new = {
"name": "my_index_3",
"column_names": {"column_3"},
"unique": True,
"method": "hash",
}
new_indexes = frozenset(
PostgresIndexConfig.from_dict(index) for index in [index_0_new, index_2_new, index_3_new]
)

relation = PostgresRelation.create(
database="my_database",
schema="my_schema",
identifier="my_materialized_view",
type=RelationType.MaterializedView,
)

index_changes = relation._get_index_config_changes(existing_indexes, new_indexes)

assert isinstance(index_changes, list)
assert len(index_changes) == len(["drop 1", "drop 2", "create 2", "create 3"])
assert index_changes[0].action == RelationConfigChangeAction.drop
assert index_changes[1].action == RelationConfigChangeAction.drop
assert index_changes[2].action == RelationConfigChangeAction.create
assert index_changes[3].action == RelationConfigChangeAction.create
Loading