Skip to content

Commit

Permalink
update changelog
Browse files Browse the repository at this point in the history
  • Loading branch information
mikealfare committed Apr 25, 2024
1 parent 8bfa890 commit d15d84e
Show file tree
Hide file tree
Showing 4 changed files with 82 additions and 11 deletions.
6 changes: 6 additions & 0 deletions .changes/unreleased/Fixes-20240425-133401.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
kind: Fixes
body: Replace usage of `Set` with `List` to fix issue with index updates intermittently happening out of order
time: 2024-04-25T13:34:01.018399-04:00
custom:
Author: mikealfare
Issue: "72"
18 changes: 11 additions & 7 deletions plugins/postgres/dbt/adapters/postgres/relation.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
from dataclasses import dataclass
from typing import Optional, Set, FrozenSet
from typing import FrozenSet, List, Optional

from dbt.adapters.base.relation import BaseRelation
from dbt.adapters.relation_configs import (
Expand Down Expand Up @@ -63,7 +63,7 @@ def _get_index_config_changes(
self,
existing_indexes: FrozenSet[PostgresIndexConfig],
new_indexes: FrozenSet[PostgresIndexConfig],
) -> Set[PostgresIndexConfigChange]:
) -> List[PostgresIndexConfigChange]:
"""
Get the index updates that will occur as a result of a new run
Expand All @@ -74,18 +74,22 @@ def _get_index_config_changes(
3. Index is old -> drop these
4. Indexes are not equal -> drop old, create new -> two actions
*Note:*
The order of the operations matters here because if the same index is dropped and recreated
(e.g. via --full-refresh) then we need to drop it first, then create it.
Returns: a set of index updates in the form {"action": "drop/create", "context": <IndexConfig>}
"""
drop_changes = set(
drop_changes = [
PostgresIndexConfigChange.from_dict(
{"action": RelationConfigChangeAction.drop, "context": index}
)
for index in existing_indexes.difference(new_indexes)
)
create_changes = set(
]
create_changes = [
PostgresIndexConfigChange.from_dict(
{"action": RelationConfigChangeAction.create, "context": index}
)
for index in new_indexes.difference(existing_indexes)
)
return set().union(drop_changes, create_changes)
]
return drop_changes + create_changes
Original file line number Diff line number Diff line change
Expand Up @@ -102,12 +102,12 @@ def parse_relation_results(cls, relation_results: RelationResults) -> dict:

@dataclass
class PostgresMaterializedViewConfigChangeCollection:
indexes: Set[PostgresIndexConfigChange] = field(default_factory=set)
indexes: List[PostgresIndexConfigChange] = field(default_factory=list)

@property
def requires_full_refresh(self) -> bool:
return any(index.requires_full_refresh for index in self.indexes)

@property
def has_changes(self) -> bool:
return self.indexes != set()
return self.indexes != []
65 changes: 63 additions & 2 deletions tests/unit/test_postgres_adapter.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
from copy import deepcopy

import agate
import decimal
import unittest
Expand All @@ -6,10 +8,16 @@
from dbt.task.debug import DebugTask

from dbt.adapters.base.query_headers import MacroQueryStringSetter
from dbt.adapters.postgres import PostgresAdapter
from dbt.adapters.postgres import Plugin as PostgresPlugin
from dbt.adapters.postgres import (
Plugin as PostgresPlugin,
PostgresAdapter,
PostgresRelation,
)
from dbt.adapters.postgres.relation_configs import PostgresIndexConfig
from dbt.adapters.relation_configs.config_change import RelationConfigChangeAction
from dbt.contracts.files import FileHash
from dbt.contracts.graph.manifest import ManifestStateCheck
from dbt.contracts.relation import RelationType
from dbt.clients import agate_helper
from dbt.exceptions import DbtValidationError, DbtConfigError
from psycopg2 import extensions as psycopg2_extensions
Expand Down Expand Up @@ -625,3 +633,56 @@ def test_convert_time_type(self):
expected = ["time", "time", "time"]
for col_idx, expect in enumerate(expected):
assert PostgresAdapter.convert_time_type(agate_table, col_idx) == expect


def test_index_config_changes():
index_0_old = {
"name": "my_index_0",
"column_names": {"column_0"},
"unique": True,
"method": "btree",
}
index_1_old = {
"name": "my_index_1",
"column_names": {"column_1"},
"unique": True,
"method": "btree",
}
index_2_old = {
"name": "my_index_2",
"column_names": {"column_2"},
"unique": True,
"method": "btree",
}
existing_indexes = frozenset(
PostgresIndexConfig.from_dict(index) for index in [index_0_old, index_1_old, index_2_old]
)

index_0_new = deepcopy(index_0_old)
index_2_new = deepcopy(index_2_old)
index_2_new.update(method="hash")
index_3_new = {
"name": "my_index_3",
"column_names": {"column_3"},
"unique": True,
"method": "hash",
}
new_indexes = frozenset(
PostgresIndexConfig.from_dict(index) for index in [index_0_new, index_2_new, index_3_new]
)

relation = PostgresRelation.create(
database="my_database",
schema="my_schema",
identifier="my_materialized_view",
type=RelationType.MaterializedView,
)

index_changes = relation._get_index_config_changes(existing_indexes, new_indexes)

assert isinstance(index_changes, list)
assert len(index_changes) == len(["drop 1", "drop 2", "create 2", "create 3"])
assert index_changes[0].action == RelationConfigChangeAction.drop
assert index_changes[1].action == RelationConfigChangeAction.drop
assert index_changes[2].action == RelationConfigChangeAction.create
assert index_changes[3].action == RelationConfigChangeAction.create

0 comments on commit d15d84e

Please sign in to comment.