From 40fca9306dac2996a864b240a01b6266253af926 Mon Sep 17 00:00:00 2001 From: Colin Date: Tue, 1 Oct 2024 14:25:50 -0700 Subject: [PATCH 01/19] add dep on dbt_config & and update protocol --- dbt/adapters/contracts/connection.py | 3 +++ pyproject.toml | 2 ++ 2 files changed, 5 insertions(+) diff --git a/dbt/adapters/contracts/connection.py b/dbt/adapters/contracts/connection.py index e3baf284f..2b694e008 100644 --- a/dbt/adapters/contracts/connection.py +++ b/dbt/adapters/contracts/connection.py @@ -19,6 +19,8 @@ ValidatedStringMixin, dbtClassMixin, ) +from dbt_config.external_config import ExternalCatalogConfig + # TODO: this is a very bad dependency - shared global state from dbt_common.events.contextvars import get_node_info @@ -226,3 +228,4 @@ class AdapterRequiredConfig(HasCredentials, Protocol): cli_vars: Dict[str, Any] target_path: str log_cache_events: bool + catalogs = Optional[ExternalCatalogConfig] diff --git a/pyproject.toml b/pyproject.toml index 76ca3deeb..6356078e0 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -25,6 +25,7 @@ classifiers = [ ] dependencies = [ "dbt-common>=1.10,<2.0", + "dbt-config>=0.1,<1.0", "pytz>=2015.7", # installed via dbt-common but used directly "agate>=1.0,<2.0", @@ -54,6 +55,7 @@ include = ["dbt/adapters", "dbt/include", "dbt/__init__.py"] [tool.hatch.envs.default] dependencies = [ + "dbt-config @ git+https://github.com/dbt-labs/dbt-common.git@feature/externalCatalogConfig#subdirectory=config", "dbt_common @ git+https://github.com/dbt-labs/dbt-common.git", 'pre-commit==3.7.0;python_version>="3.9"', 'pre-commit==3.5.0;python_version=="3.8"', From 00a74fe8cc434aa2823f404dc4936db4633e69b6 Mon Sep 17 00:00:00 2001 From: Colin Date: Thu, 3 Oct 2024 15:53:25 -0700 Subject: [PATCH 02/19] add catalog concept --- dbt/adapters/base/__init__.py | 1 + dbt/adapters/capability.py | 2 ++ dbt/adapters/contracts/connection.py | 2 +- 3 files changed, 4 insertions(+), 1 deletion(-) diff --git a/dbt/adapters/base/__init__.py b/dbt/adapters/base/__init__.py index ade1af3da..f097cad9e 100644 --- a/dbt/adapters/base/__init__.py +++ b/dbt/adapters/base/__init__.py @@ -1,5 +1,6 @@ from dbt.adapters.base.meta import available from dbt.adapters.base.column import Column +from dbt.adapters.base.catalogs import Catalog from dbt.adapters.base.connections import BaseConnectionManager from dbt.adapters.base.impl import ( AdapterConfig, diff --git a/dbt/adapters/capability.py b/dbt/adapters/capability.py index 2bd491123..4e410bb22 100644 --- a/dbt/adapters/capability.py +++ b/dbt/adapters/capability.py @@ -21,6 +21,8 @@ class Capability(str, Enum): """Indicates support for getting catalog information including table-level and column-level metadata for a single relation.""" + CreateExternalCatalog = "CreateExternalCatalog" + class Support(str, Enum): Unknown = "Unknown" diff --git a/dbt/adapters/contracts/connection.py b/dbt/adapters/contracts/connection.py index 2b694e008..751d6135b 100644 --- a/dbt/adapters/contracts/connection.py +++ b/dbt/adapters/contracts/connection.py @@ -19,7 +19,7 @@ ValidatedStringMixin, dbtClassMixin, ) -from dbt_config.external_config import ExternalCatalogConfig +from dbt_config.catalog_config import ExternalCatalogConfig # TODO: this is a very bad dependency - shared global state From 345ced94d10a918bab0c61c43677a26e246d3c97 Mon Sep 17 00:00:00 2001 From: Colin Date: Wed, 9 Oct 2024 15:17:35 -0700 Subject: [PATCH 03/19] add catalogs.py --- dbt/adapters/base/catalogs.py | 19 +++++++++++++++++++ dbt/adapters/base/relation.py | 2 ++ 2 files changed, 21 insertions(+) create mode 100644 dbt/adapters/base/catalogs.py diff --git a/dbt/adapters/base/catalogs.py b/dbt/adapters/base/catalogs.py new file mode 100644 index 000000000..66428f3fa --- /dev/null +++ b/dbt/adapters/base/catalogs.py @@ -0,0 +1,19 @@ +from typing import Dict + +from dbt_config.config import ExternalCatalogConfig, ExternalCatalog, Type as CatalogType + + +class CatalogConfig: + identifier: str + type: CatalogType + + + +class CatalogManager: + catalogs: Dict[str, ExternalCatalog] + + def add_catalog(self, catalog: ExternalCatalog): + self.catalogs[catalog.name] = catalog + + def get_catalog(self, catalog_name: str): + return self.catalogs.get(catalog_name) diff --git a/dbt/adapters/base/relation.py b/dbt/adapters/base/relation.py index 80dbd34ba..7b5270045 100644 --- a/dbt/adapters/base/relation.py +++ b/dbt/adapters/base/relation.py @@ -16,6 +16,7 @@ from dbt_common.exceptions import CompilationError, DbtRuntimeError from dbt_common.utils import deep_merge, filter_null_values +from dbt_config.external_config import ExternalCatalog from dbt.adapters.contracts.relation import ( ComponentName, @@ -71,6 +72,7 @@ class BaseRelation(FakeAPIObject, Hashable): # e.g. adding RelationType.View in dbt-postgres requires that you define: # include/postgres/macros/relations/view/replace.sql::postgres__get_replace_view_sql() replaceable_relations: SerializableIterable = field(default_factory=frozenset) + external_catalog = Optional[ExternalCatalog] def _is_exactish_match(self, field: ComponentName, value: str) -> bool: if self.dbt_created and self.quote_policy.get_part(field) is False: From d75155d69825bfadc36ed07917d7f31327c9e0b4 Mon Sep 17 00:00:00 2001 From: Colin Date: Fri, 11 Oct 2024 16:39:40 -0700 Subject: [PATCH 04/19] update contracts --- dbt/adapters/base/catalog.py | 5 +++++ dbt/adapters/base/catalogs.py | 19 ------------------- 2 files changed, 5 insertions(+), 19 deletions(-) create mode 100644 dbt/adapters/base/catalog.py delete mode 100644 dbt/adapters/base/catalogs.py diff --git a/dbt/adapters/base/catalog.py b/dbt/adapters/base/catalog.py new file mode 100644 index 000000000..6d016b290 --- /dev/null +++ b/dbt/adapters/base/catalog.py @@ -0,0 +1,5 @@ +from typing import Dict + +from dbt_config.catalog_config import ExternalCatalog + + diff --git a/dbt/adapters/base/catalogs.py b/dbt/adapters/base/catalogs.py deleted file mode 100644 index 66428f3fa..000000000 --- a/dbt/adapters/base/catalogs.py +++ /dev/null @@ -1,19 +0,0 @@ -from typing import Dict - -from dbt_config.config import ExternalCatalogConfig, ExternalCatalog, Type as CatalogType - - -class CatalogConfig: - identifier: str - type: CatalogType - - - -class CatalogManager: - catalogs: Dict[str, ExternalCatalog] - - def add_catalog(self, catalog: ExternalCatalog): - self.catalogs[catalog.name] = catalog - - def get_catalog(self, catalog_name: str): - return self.catalogs.get(catalog_name) From f9eae6fc084d23415973c542db9cc6957b462e56 Mon Sep 17 00:00:00 2001 From: Colin Date: Fri, 11 Oct 2024 16:40:01 -0700 Subject: [PATCH 05/19] update contracts --- dbt/adapters/base/__init__.py | 2 +- dbt/adapters/base/catalog.py | 56 +++++++++++++++++++++++++++++- dbt/adapters/base/impl.py | 5 +++ dbt/adapters/base/relation.py | 3 +- dbt/adapters/contracts/relation.py | 2 ++ dbt/adapters/protocol.py | 6 ++++ pyproject.toml | 2 +- 7 files changed, 71 insertions(+), 5 deletions(-) diff --git a/dbt/adapters/base/__init__.py b/dbt/adapters/base/__init__.py index f097cad9e..9e7549f88 100644 --- a/dbt/adapters/base/__init__.py +++ b/dbt/adapters/base/__init__.py @@ -1,6 +1,6 @@ from dbt.adapters.base.meta import available from dbt.adapters.base.column import Column -from dbt.adapters.base.catalogs import Catalog +from dbt.adapters.base.catalog import ExternalCatalogIntegration from dbt.adapters.base.connections import BaseConnectionManager from dbt.adapters.base.impl import ( AdapterConfig, diff --git a/dbt/adapters/base/catalog.py b/dbt/adapters/base/catalog.py index 6d016b290..a1aa66e69 100644 --- a/dbt/adapters/base/catalog.py +++ b/dbt/adapters/base/catalog.py @@ -1,5 +1,59 @@ -from typing import Dict +import abc +from typing import Self, ValuesView from dbt_config.catalog_config import ExternalCatalog +from dbt.adapters.base import BaseRelation, BaseConnectionManager + +class ExternalCatalogIntegration(abc.ABC): + name: str + external_catalog: ExternalCatalog + _connection_manager: BaseConnectionManager + _exists: bool + + @classmethod + def create(cls, external_catalog: ExternalCatalog, connection_manager: BaseConnectionManager) -> Self: + integration = ExternalCatalogIntegration() + integration.external_catalog = external_catalog + integration.name = external_catalog.name + _connection_manager = connection_manager + return integration + + @abc.abstractmethod + def _exists(self) -> bool: + pass + + def exists(self) -> bool: + return self._exists + @abc.abstractmethod + def relation_exists(self, relation: BaseRelation) -> bool: + pass + + @abc.abstractmethod + def refresh_relation(self, table_name: str) -> None: + pass + + @abc.abstractmethod + def create_relation(self, table_name: str) -> None: + pass + + +class ExternalCatalogIntegrations: + def get(self, name: str) -> ExternalCatalogIntegration: + return self.integrations[name] + + @property + def integrations(self) -> dict[str, ExternalCatalogIntegration]: + return self.integrations + + @classmethod + def from_json_strings(cls, json_strings: ValuesView[str], + integration_class: ExternalCatalogIntegration, + connection_manager: BaseConnectionManager) -> Self: + new_instance = cls() + for json_string in json_strings: + external_catalog = ExternalCatalog.model_validate_json(json_string) + integration = integration_class.create(external_catalog, connection_manager) + new_instance.integrations[integration.name] = integration + return new_instance diff --git a/dbt/adapters/base/impl.py b/dbt/adapters/base/impl.py index f3788fe39..7f8820dc3 100644 --- a/dbt/adapters/base/impl.py +++ b/dbt/adapters/base/impl.py @@ -50,6 +50,7 @@ ) from dbt.adapters.base.column import Column as BaseColumn +from dbt.adapters.base.catalog import ExternalCatalogIntegration from dbt.adapters.base.connections import ( AdapterResponse, BaseConnectionManager, @@ -228,6 +229,7 @@ class BaseAdapter(metaclass=AdapterMeta): - expand_column_types - list_relations_without_caching - is_cancelable + - execute - create_schema - drop_schema - quote @@ -241,11 +243,14 @@ class BaseAdapter(metaclass=AdapterMeta): Macros: - get_catalog + + External Catalog support: Attach an implementation of ExternalCatalogIntegration """ Relation: Type[BaseRelation] = BaseRelation Column: Type[BaseColumn] = BaseColumn ConnectionManager: Type[BaseConnectionManager] + ExternalCatalogIntegration: Type[ExternalCatalogIntegration] # A set of clobber config fields accepted by this adapter # for use in materializations diff --git a/dbt/adapters/base/relation.py b/dbt/adapters/base/relation.py index 7b5270045..db6f86b68 100644 --- a/dbt/adapters/base/relation.py +++ b/dbt/adapters/base/relation.py @@ -16,7 +16,6 @@ from dbt_common.exceptions import CompilationError, DbtRuntimeError from dbt_common.utils import deep_merge, filter_null_values -from dbt_config.external_config import ExternalCatalog from dbt.adapters.contracts.relation import ( ComponentName, @@ -72,7 +71,7 @@ class BaseRelation(FakeAPIObject, Hashable): # e.g. adding RelationType.View in dbt-postgres requires that you define: # include/postgres/macros/relations/view/replace.sql::postgres__get_replace_view_sql() replaceable_relations: SerializableIterable = field(default_factory=frozenset) - external_catalog = Optional[ExternalCatalog] + catalog: Optional[str] = None def _is_exactish_match(self, field: ComponentName, value: str) -> bool: if self.dbt_created and self.quote_policy.get_part(field) is False: diff --git a/dbt/adapters/contracts/relation.py b/dbt/adapters/contracts/relation.py index 42beb579c..636ea9ccb 100644 --- a/dbt/adapters/contracts/relation.py +++ b/dbt/adapters/contracts/relation.py @@ -10,6 +10,7 @@ from dbt_common.dataclass_schema import StrEnum, dbtClassMixin from dbt_common.exceptions import CompilationError, DataclassNotDictError from dbt_common.utils import deep_merge +from dbt_config.catalog_config import ExternalCatalog from typing_extensions import Protocol @@ -58,6 +59,7 @@ class RelationConfig(Protocol): tags: List[str] quoting_dict: Dict[str, bool] config: Optional[MaterializationConfig] + catalog_name: Optional[str] class ComponentName(StrEnum): diff --git a/dbt/adapters/protocol.py b/dbt/adapters/protocol.py index 352198663..58ff001bb 100644 --- a/dbt/adapters/protocol.py +++ b/dbt/adapters/protocol.py @@ -41,6 +41,9 @@ class ConnectionManagerProtocol(Protocol): class ColumnProtocol(Protocol): pass +class ExternalCatalogIntegrationProtocol(Protocol): + pass + Self = TypeVar("Self", bound="RelationProtocol") @@ -62,6 +65,7 @@ def create_from( ConnectionManager_T = TypeVar("ConnectionManager_T", bound=ConnectionManagerProtocol) Relation_T = TypeVar("Relation_T", bound=RelationProtocol) Column_T = TypeVar("Column_T", bound=ColumnProtocol) +ExtCatInteg_T = TypeVar("ExtCatInteg_T", bound=ExternalCatalogIntegrationProtocol) class MacroContextGeneratorCallable(Protocol): @@ -82,6 +86,7 @@ class AdapterProtocol( # type: ignore[misc] ConnectionManager_T, Relation_T, Column_T, + ExtCatInteg_T, ], ): # N.B. Technically these are ClassVars, but mypy doesn't support putting type vars in a @@ -92,6 +97,7 @@ class AdapterProtocol( # type: ignore[misc] Relation: Type[Relation_T] ConnectionManager: Type[ConnectionManager_T] connections: ConnectionManager_T + ExternalCatalogIntegration: Type[ExtCatInteg_T] def __init__(self, config: AdapterRequiredConfig) -> None: ... diff --git a/pyproject.toml b/pyproject.toml index 6356078e0..6efffefee 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -25,7 +25,7 @@ classifiers = [ ] dependencies = [ "dbt-common>=1.10,<2.0", - "dbt-config>=0.1,<1.0", + "dbt-config<1.0", "pytz>=2015.7", # installed via dbt-common but used directly "agate>=1.0,<2.0", From 8386b5f48186f5cfac364fe6e9b7f074c58fcea2 Mon Sep 17 00:00:00 2001 From: Colin Date: Thu, 5 Dec 2024 16:57:08 -0800 Subject: [PATCH 06/19] update to add catalog integration client --- dbt/adapters/base/__init__.py | 2 +- dbt/adapters/base/catalog.py | 59 ------------------------ dbt/adapters/base/impl.py | 14 ++++-- dbt/adapters/base/relation.py | 3 +- dbt/adapters/capability.py | 2 - dbt/adapters/clients/catalogs.py | 24 ++++++++++ dbt/adapters/contracts/catalog.py | 36 +++++++++++++++ dbt/adapters/contracts/connection.py | 4 +- dbt/adapters/contracts/relation.py | 1 - dbt/adapters/protocol.py | 39 ++++++++++------ dbt/adapters/relation_configs/formats.py | 19 ++++++++ 11 files changed, 118 insertions(+), 85 deletions(-) delete mode 100644 dbt/adapters/base/catalog.py create mode 100644 dbt/adapters/clients/catalogs.py create mode 100644 dbt/adapters/contracts/catalog.py create mode 100644 dbt/adapters/relation_configs/formats.py diff --git a/dbt/adapters/base/__init__.py b/dbt/adapters/base/__init__.py index 0600fc009..947cda22c 100644 --- a/dbt/adapters/base/__init__.py +++ b/dbt/adapters/base/__init__.py @@ -1,6 +1,6 @@ from dbt.adapters.base.meta import available from dbt.adapters.base.column import Column -from dbt.adapters.base.catalog import ExternalCatalogIntegration +from dbt.adapters.base.catalog import CatalogIntegration from dbt.adapters.base.connections import BaseConnectionManager from dbt.adapters.base.impl import ( AdapterConfig, diff --git a/dbt/adapters/base/catalog.py b/dbt/adapters/base/catalog.py deleted file mode 100644 index a1aa66e69..000000000 --- a/dbt/adapters/base/catalog.py +++ /dev/null @@ -1,59 +0,0 @@ -import abc -from typing import Self, ValuesView - -from dbt_config.catalog_config import ExternalCatalog - -from dbt.adapters.base import BaseRelation, BaseConnectionManager - - -class ExternalCatalogIntegration(abc.ABC): - name: str - external_catalog: ExternalCatalog - _connection_manager: BaseConnectionManager - _exists: bool - - @classmethod - def create(cls, external_catalog: ExternalCatalog, connection_manager: BaseConnectionManager) -> Self: - integration = ExternalCatalogIntegration() - integration.external_catalog = external_catalog - integration.name = external_catalog.name - _connection_manager = connection_manager - return integration - - @abc.abstractmethod - def _exists(self) -> bool: - pass - - def exists(self) -> bool: - return self._exists - @abc.abstractmethod - def relation_exists(self, relation: BaseRelation) -> bool: - pass - - @abc.abstractmethod - def refresh_relation(self, table_name: str) -> None: - pass - - @abc.abstractmethod - def create_relation(self, table_name: str) -> None: - pass - - -class ExternalCatalogIntegrations: - def get(self, name: str) -> ExternalCatalogIntegration: - return self.integrations[name] - - @property - def integrations(self) -> dict[str, ExternalCatalogIntegration]: - return self.integrations - - @classmethod - def from_json_strings(cls, json_strings: ValuesView[str], - integration_class: ExternalCatalogIntegration, - connection_manager: BaseConnectionManager) -> Self: - new_instance = cls() - for json_string in json_strings: - external_catalog = ExternalCatalog.model_validate_json(json_string) - integration = integration_class.create(external_catalog, connection_manager) - new_instance.integrations[integration.name] = integration - return new_instance diff --git a/dbt/adapters/base/impl.py b/dbt/adapters/base/impl.py index 25c5e5fda..4972a3bc1 100644 --- a/dbt/adapters/base/impl.py +++ b/dbt/adapters/base/impl.py @@ -50,7 +50,6 @@ ) from dbt.adapters.base.column import Column as BaseColumn -from dbt.adapters.base.catalog import ExternalCatalogIntegration from dbt.adapters.base.connections import ( AdapterResponse, BaseConnectionManager, @@ -66,6 +65,8 @@ ) from dbt.adapters.cache import RelationsCache, _make_ref_key_dict from dbt.adapters.capability import Capability, CapabilityDict +from dbt.adapters.clients import catalogs as catalogs_client +from dbt.adapters.contracts.catalog import CatalogIntegration from dbt.adapters.contracts.connection import Credentials from dbt.adapters.contracts.macros import MacroResolverProtocol from dbt.adapters.contracts.relation import RelationConfig @@ -89,7 +90,7 @@ SnapshotTargetNotSnapshotTableError, UnexpectedNonTimestampError, ) -from dbt.adapters.protocol import AdapterConfig, MacroContextGeneratorCallable +from dbt.adapters.protocol import AdapterConfig, MacroContextGeneratorCallable, CatalogIntegrationConfig if TYPE_CHECKING: import agate @@ -261,13 +262,12 @@ class BaseAdapter(metaclass=AdapterMeta): Macros: - get_catalog - External Catalog support: Attach an implementation of ExternalCatalogIntegration """ Relation: Type[BaseRelation] = BaseRelation Column: Type[BaseColumn] = BaseColumn ConnectionManager: Type[BaseConnectionManager] - ExternalCatalogIntegration: Type[ExternalCatalogIntegration] + CatalogIntegrations: Dict[str, Type[CatalogIntegration]] # A set of clobber config fields accepted by this adapter # for use in materializations @@ -294,7 +294,13 @@ def __init__(self, config, mp_context: SpawnContext) -> None: self._macro_resolver: Optional[MacroResolverProtocol] = None self._macro_context_generator: Optional[MacroContextGeneratorCallable] = None self.behavior = DEFAULT_BASE_BEHAVIOR_FLAGS # type: ignore + self.add_catalog_integrations(config.catalog_integrations) + def add_catalog_integrations(self, catalog_integrations: Optional[List[CatalogIntegrationConfig]]) -> None: + if catalog_integrations: + for integration_config in catalog_integrations: + integration = self.CatalogIntegrations[integration_config.type](integration_config) + catalogs_client.add_catalog(integration) ### # Methods to set / access a macro resolver ### diff --git a/dbt/adapters/base/relation.py b/dbt/adapters/base/relation.py index b60528fe7..8fe724ce4 100644 --- a/dbt/adapters/base/relation.py +++ b/dbt/adapters/base/relation.py @@ -18,6 +18,7 @@ from dbt_common.exceptions import CompilationError, DbtRuntimeError from dbt_common.utils import deep_merge, filter_null_values +from dbt.adapters.base import CatalogIntegration from dbt.adapters.contracts.relation import ( ComponentName, HasQuoting, @@ -72,7 +73,7 @@ class BaseRelation(FakeAPIObject, Hashable): # e.g. adding RelationType.View in dbt-postgres requires that you define: # include/postgres/macros/relations/view/replace.sql::postgres__get_replace_view_sql() replaceable_relations: SerializableIterable = field(default_factory=frozenset) - catalog: Optional[str] = None + catalog: Optional[CatalogIntegration] = None def _is_exactish_match(self, field: ComponentName, value: str) -> bool: if self.dbt_created and self.quote_policy.get_part(field) is False: diff --git a/dbt/adapters/capability.py b/dbt/adapters/capability.py index ca61f7528..f0243053a 100644 --- a/dbt/adapters/capability.py +++ b/dbt/adapters/capability.py @@ -21,8 +21,6 @@ class Capability(str, Enum): """Indicates support for getting catalog information including table-level and column-level metadata for a single relation.""" - CreateExternalCatalog = "CreateExternalCatalog" - MicrobatchConcurrency = "MicrobatchConcurrency" """Indicates support running the microbatch incremental materialization strategy concurrently across threads.""" diff --git a/dbt/adapters/clients/catalogs.py b/dbt/adapters/clients/catalogs.py new file mode 100644 index 000000000..d5cbe65a5 --- /dev/null +++ b/dbt/adapters/clients/catalogs.py @@ -0,0 +1,24 @@ +from dbt.adapters.contracts.catalog import CatalogIntegration + + +class CatalogIntegrations: + def get(self, name: str) -> CatalogIntegration: + return self.integrations[name] + + @property + def integrations(self) -> dict[str, CatalogIntegration]: + return self.integrations + + def add_integration(self, integration: CatalogIntegration): + self.integrations[integration.name] = integration + + +_CATALOG_CLIENT = CatalogIntegrations() + + +def get_catalog(integration_name: str) -> CatalogIntegration: + return _CATALOG_CLIENT.get(integration_name) + + +def add_catalog(integration: CatalogIntegration): + _CATALOG_CLIENT.add_integration(integration) diff --git a/dbt/adapters/contracts/catalog.py b/dbt/adapters/contracts/catalog.py new file mode 100644 index 000000000..db27df0fc --- /dev/null +++ b/dbt/adapters/contracts/catalog.py @@ -0,0 +1,36 @@ +import abc +from enum import Enum +from typing import Optional + +from dbt.adapters.protocol import CatalogIntegrationConfig +from dbt.adapters.relation_configs.formats import TableFormat + + +class CatalogIntegrationType(Enum): + iceberg_rest = 'iceberg_rest' + glue = 'glue' + unity = 'unity' + + +class CatalogIntegration(abc.ABC): + """ + An external catalog integration is a connection to an external catalog that can be used to + interact with the catalog. This class is an abstract base class that should be subclassed by + specific integrations in the adapters. + + """ + name: str + table_format: TableFormat + type: CatalogIntegrationType + external_volume: Optional[str] = None + namespace: Optional[str] = None + + def __init__( + self, integration_config: CatalogIntegrationConfig + ): + self.name = integration_config.name + self.table_format = TableFormat(integration_config.table_format) + self.type = CatalogIntegrationType(integration_config.type) + self.external_volume = integration_config.external_volume + self.namespace = integration_config.namespace + diff --git a/dbt/adapters/contracts/connection.py b/dbt/adapters/contracts/connection.py index 1c317c53c..a40d06720 100644 --- a/dbt/adapters/contracts/connection.py +++ b/dbt/adapters/contracts/connection.py @@ -19,7 +19,6 @@ ValidatedStringMixin, dbtClassMixin, ) -from dbt_config.catalog_config import ExternalCatalogConfig # TODO: this is a very bad dependency - shared global state @@ -30,6 +29,7 @@ from mashumaro.jsonschema.annotations import Pattern from typing_extensions import Protocol, Annotated +from dbt.adapters.contracts.catalog import CatalogIntegrations from dbt.adapters.events.types import NewConnectionOpening from dbt.adapters.utils import translate_aliases @@ -229,4 +229,4 @@ class AdapterRequiredConfig(HasCredentials, Protocol): cli_vars: Dict[str, Any] target_path: str log_cache_events: bool - catalogs = Optional[ExternalCatalogConfig] + catalog_integrations: Optional[CatalogIntegrations] diff --git a/dbt/adapters/contracts/relation.py b/dbt/adapters/contracts/relation.py index 636ea9ccb..7bb0c531e 100644 --- a/dbt/adapters/contracts/relation.py +++ b/dbt/adapters/contracts/relation.py @@ -10,7 +10,6 @@ from dbt_common.dataclass_schema import StrEnum, dbtClassMixin from dbt_common.exceptions import CompilationError, DataclassNotDictError from dbt_common.utils import deep_merge -from dbt_config.catalog_config import ExternalCatalog from typing_extensions import Protocol diff --git a/dbt/adapters/protocol.py b/dbt/adapters/protocol.py index 58ff001bb..7a7932190 100644 --- a/dbt/adapters/protocol.py +++ b/dbt/adapters/protocol.py @@ -41,10 +41,19 @@ class ConnectionManagerProtocol(Protocol): class ColumnProtocol(Protocol): pass -class ExternalCatalogIntegrationProtocol(Protocol): + +class CatalogIntegrationProtocol(Protocol): pass +class CatalogIntegrationConfig(Protocol): + name: str + table_format: str + type: str + external_volume: Optional[str] + namespace: Optional[str] + + Self = TypeVar("Self", bound="RelationProtocol") @@ -54,10 +63,10 @@ def get_default_quote_policy(cls) -> Policy: ... @classmethod def create_from( - cls: Type[Self], - quoting: HasQuoting, - relation_config: RelationConfig, - **kwargs: Any, + cls: Type[Self], + quoting: HasQuoting, + relation_config: RelationConfig, + **kwargs: Any, ) -> Self: ... @@ -65,16 +74,16 @@ def create_from( ConnectionManager_T = TypeVar("ConnectionManager_T", bound=ConnectionManagerProtocol) Relation_T = TypeVar("Relation_T", bound=RelationProtocol) Column_T = TypeVar("Column_T", bound=ColumnProtocol) -ExtCatInteg_T = TypeVar("ExtCatInteg_T", bound=ExternalCatalogIntegrationProtocol) +ExtCatInteg_T = TypeVar("ExtCatInteg_T", bound=CatalogIntegrationProtocol) class MacroContextGeneratorCallable(Protocol): def __call__( - self, - macro_protocol: MacroProtocol, - config: AdapterRequiredConfig, - macro_resolver: MacroResolverProtocol, - package_name: Optional[str], + self, + macro_protocol: MacroProtocol, + config: AdapterRequiredConfig, + macro_resolver: MacroResolverProtocol, + package_name: Optional[str], ) -> Dict[str, Any]: ... @@ -97,7 +106,7 @@ class AdapterProtocol( # type: ignore[misc] Relation: Type[Relation_T] ConnectionManager: Type[ConnectionManager_T] connections: ConnectionManager_T - ExternalCatalogIntegration: Type[ExtCatInteg_T] + CatalogIntegration: Type[ExtCatInteg_T] def __init__(self, config: AdapterRequiredConfig) -> None: ... @@ -108,8 +117,8 @@ def get_macro_resolver(self) -> Optional[MacroResolverProtocol]: ... def clear_macro_resolver(self) -> None: ... def set_macro_context_generator( - self, - macro_context_generator: MacroContextGeneratorCallable, + self, + macro_context_generator: MacroContextGeneratorCallable, ) -> None: ... @classmethod @@ -152,5 +161,5 @@ def close(cls, connection: Connection) -> Connection: ... def commit_if_has_connection(self) -> None: ... def execute( - self, sql: str, auto_begin: bool = False, fetch: bool = False + self, sql: str, auto_begin: bool = False, fetch: bool = False ) -> Tuple[AdapterResponse, "agate.Table"]: ... diff --git a/dbt/adapters/relation_configs/formats.py b/dbt/adapters/relation_configs/formats.py new file mode 100644 index 000000000..f440e530b --- /dev/null +++ b/dbt/adapters/relation_configs/formats.py @@ -0,0 +1,19 @@ +from dbt_common.dataclass_schema import StrEnum # doesn't exist in standard library until py3.11 +from typing_extensions import Self + + +class TableFormat(StrEnum): + """ + Some platforms may refer to this 'Object' or 'File Format'. + Data practitioners and interfaces refer to this as 'Table Format's, hence the term's use here. + """ + + DEFAULT = "default" + ICEBERG = "iceberg" + + @classmethod + def default(cls) -> Self: + return cls("default") + + def __str__(self): + return self.value From 4822c2eb431b1206457559e2591d6d60d0ecc393 Mon Sep 17 00:00:00 2001 From: Colin Date: Fri, 13 Dec 2024 11:30:13 -0800 Subject: [PATCH 07/19] update with adapter_configs --- dbt/adapters/contracts/catalog.py | 6 +++++- dbt/adapters/protocol.py | 5 +++-- 2 files changed, 8 insertions(+), 3 deletions(-) diff --git a/dbt/adapters/contracts/catalog.py b/dbt/adapters/contracts/catalog.py index db27df0fc..31da0420d 100644 --- a/dbt/adapters/contracts/catalog.py +++ b/dbt/adapters/contracts/catalog.py @@ -1,6 +1,6 @@ import abc from enum import Enum -from typing import Optional +from typing import Optional, Tuple, List, Dict from dbt.adapters.protocol import CatalogIntegrationConfig from dbt.adapters.relation_configs.formats import TableFormat @@ -33,4 +33,8 @@ def __init__( self.type = CatalogIntegrationType(integration_config.type) self.external_volume = integration_config.external_volume self.namespace = integration_config.namespace + self._handle_adapter_configs(integration_config.adapter_configs) + + def _handle_adapter_configs(self, adapter_configs: Dict) -> None: + ... diff --git a/dbt/adapters/protocol.py b/dbt/adapters/protocol.py index 7a7932190..ce9917360 100644 --- a/dbt/adapters/protocol.py +++ b/dbt/adapters/protocol.py @@ -47,11 +47,12 @@ class CatalogIntegrationProtocol(Protocol): class CatalogIntegrationConfig(Protocol): - name: str + integration_name: str table_format: str - type: str + catalog_type: str external_volume: Optional[str] namespace: Optional[str] + adapter_configs: Optional[Dict] Self = TypeVar("Self", bound="RelationProtocol") From 98095f4eb34676f8d04604832edb430a71d25e2a Mon Sep 17 00:00:00 2001 From: Colin Date: Fri, 13 Dec 2024 11:55:24 -0800 Subject: [PATCH 08/19] add catalog_name as key --- dbt/adapters/base/impl.py | 11 +++++++++-- dbt/adapters/clients/catalogs.py | 8 ++++---- dbt/adapters/contracts/catalog.py | 4 ++-- dbt/adapters/protocol.py | 1 + 4 files changed, 16 insertions(+), 8 deletions(-) diff --git a/dbt/adapters/base/impl.py b/dbt/adapters/base/impl.py index 4972a3bc1..cb5fd08c5 100644 --- a/dbt/adapters/base/impl.py +++ b/dbt/adapters/base/impl.py @@ -299,8 +299,15 @@ def __init__(self, config, mp_context: SpawnContext) -> None: def add_catalog_integrations(self, catalog_integrations: Optional[List[CatalogIntegrationConfig]]) -> None: if catalog_integrations: for integration_config in catalog_integrations: - integration = self.CatalogIntegrations[integration_config.type](integration_config) - catalogs_client.add_catalog(integration) + catalog_type = integration_config.catalog_type + if catalog_type not in self.CatalogIntegrations: + raise DbtValidationError(f"{catalog_type} is not supported!!! - <3 Colin") + integration = self.CatalogIntegrations[catalog_type](integration_config) + catalogs_client.add_catalog(integration, integration_config.catalog_name) + + @available + def get_catalog_integration(self, integration_name) -> CatalogIntegration: + return catalogs_client.get_catalog(integration_name) ### # Methods to set / access a macro resolver ### diff --git a/dbt/adapters/clients/catalogs.py b/dbt/adapters/clients/catalogs.py index d5cbe65a5..4b466a94e 100644 --- a/dbt/adapters/clients/catalogs.py +++ b/dbt/adapters/clients/catalogs.py @@ -9,8 +9,8 @@ def get(self, name: str) -> CatalogIntegration: def integrations(self) -> dict[str, CatalogIntegration]: return self.integrations - def add_integration(self, integration: CatalogIntegration): - self.integrations[integration.name] = integration + def add_integration(self, integration: CatalogIntegration, catalog_name: str): + self.integrations[catalog_name] = integration _CATALOG_CLIENT = CatalogIntegrations() @@ -20,5 +20,5 @@ def get_catalog(integration_name: str) -> CatalogIntegration: return _CATALOG_CLIENT.get(integration_name) -def add_catalog(integration: CatalogIntegration): - _CATALOG_CLIENT.add_integration(integration) +def add_catalog(integration: CatalogIntegration, catalog_name: str): + _CATALOG_CLIENT.add_integration(integration, catalog_name) diff --git a/dbt/adapters/contracts/catalog.py b/dbt/adapters/contracts/catalog.py index 31da0420d..08a52f658 100644 --- a/dbt/adapters/contracts/catalog.py +++ b/dbt/adapters/contracts/catalog.py @@ -19,9 +19,9 @@ class CatalogIntegration(abc.ABC): specific integrations in the adapters. """ - name: str + integration_name: str table_format: TableFormat - type: CatalogIntegrationType + integration_type: CatalogIntegrationType external_volume: Optional[str] = None namespace: Optional[str] = None diff --git a/dbt/adapters/protocol.py b/dbt/adapters/protocol.py index ce9917360..b70120bb5 100644 --- a/dbt/adapters/protocol.py +++ b/dbt/adapters/protocol.py @@ -47,6 +47,7 @@ class CatalogIntegrationProtocol(Protocol): class CatalogIntegrationConfig(Protocol): + catalog_name: str integration_name: str table_format: str catalog_type: str From 58550d543d7da679111ea8f9207af912b0db40a1 Mon Sep 17 00:00:00 2001 From: Colin Date: Tue, 17 Dec 2024 09:02:09 -0800 Subject: [PATCH 09/19] add managed --- dbt/adapters/contracts/catalog.py | 12 +++++++++--- 1 file changed, 9 insertions(+), 3 deletions(-) diff --git a/dbt/adapters/contracts/catalog.py b/dbt/adapters/contracts/catalog.py index 08a52f658..a3bbabdd8 100644 --- a/dbt/adapters/contracts/catalog.py +++ b/dbt/adapters/contracts/catalog.py @@ -2,11 +2,13 @@ from enum import Enum from typing import Optional, Tuple, List, Dict +from dbt.adapters.contracts.relation import RelationConfig from dbt.adapters.protocol import CatalogIntegrationConfig from dbt.adapters.relation_configs.formats import TableFormat class CatalogIntegrationType(Enum): + managed = 'managed' iceberg_rest = 'iceberg_rest' glue = 'glue' unity = 'unity' @@ -19,6 +21,7 @@ class CatalogIntegration(abc.ABC): specific integrations in the adapters. """ + catalog_name: str integration_name: str table_format: TableFormat integration_type: CatalogIntegrationType @@ -26,11 +29,12 @@ class CatalogIntegration(abc.ABC): namespace: Optional[str] = None def __init__( - self, integration_config: CatalogIntegrationConfig + self, integration_config: CatalogIntegrationConfig ): - self.name = integration_config.name + self.catalog_name = integration_config.catalog_name + self.integration_name = integration_config.integration_name self.table_format = TableFormat(integration_config.table_format) - self.type = CatalogIntegrationType(integration_config.type) + self.type = CatalogIntegrationType(integration_config.catalog_type) self.external_volume = integration_config.external_volume self.namespace = integration_config.namespace self._handle_adapter_configs(integration_config.adapter_configs) @@ -38,3 +42,5 @@ def __init__( def _handle_adapter_configs(self, adapter_configs: Dict) -> None: ... + def render_ddl_predicates(self, relation_config: RelationConfig) -> str: + ... From 3fc57cf6047664cfdc2cd7aefd6aa7eb1784adb8 Mon Sep 17 00:00:00 2001 From: Colin Date: Tue, 17 Dec 2024 16:03:28 -0800 Subject: [PATCH 10/19] fix broken import --- dbt/adapters/base/__init__.py | 1 - 1 file changed, 1 deletion(-) diff --git a/dbt/adapters/base/__init__.py b/dbt/adapters/base/__init__.py index 947cda22c..c30dd01f5 100644 --- a/dbt/adapters/base/__init__.py +++ b/dbt/adapters/base/__init__.py @@ -1,6 +1,5 @@ from dbt.adapters.base.meta import available from dbt.adapters.base.column import Column -from dbt.adapters.base.catalog import CatalogIntegration from dbt.adapters.base.connections import BaseConnectionManager from dbt.adapters.base.impl import ( AdapterConfig, From c5dd13103255dc92492e7f2e4da19f9eba509d96 Mon Sep 17 00:00:00 2001 From: Colin Date: Tue, 17 Dec 2024 16:41:23 -0800 Subject: [PATCH 11/19] remove CatalogIntegration from relation --- dbt/adapters/base/relation.py | 2 -- dbt/adapters/contracts/connection.py | 2 -- 2 files changed, 4 deletions(-) diff --git a/dbt/adapters/base/relation.py b/dbt/adapters/base/relation.py index 8fe724ce4..7d4888e42 100644 --- a/dbt/adapters/base/relation.py +++ b/dbt/adapters/base/relation.py @@ -18,7 +18,6 @@ from dbt_common.exceptions import CompilationError, DbtRuntimeError from dbt_common.utils import deep_merge, filter_null_values -from dbt.adapters.base import CatalogIntegration from dbt.adapters.contracts.relation import ( ComponentName, HasQuoting, @@ -73,7 +72,6 @@ class BaseRelation(FakeAPIObject, Hashable): # e.g. adding RelationType.View in dbt-postgres requires that you define: # include/postgres/macros/relations/view/replace.sql::postgres__get_replace_view_sql() replaceable_relations: SerializableIterable = field(default_factory=frozenset) - catalog: Optional[CatalogIntegration] = None def _is_exactish_match(self, field: ComponentName, value: str) -> bool: if self.dbt_created and self.quote_policy.get_part(field) is False: diff --git a/dbt/adapters/contracts/connection.py b/dbt/adapters/contracts/connection.py index a40d06720..da9fce85d 100644 --- a/dbt/adapters/contracts/connection.py +++ b/dbt/adapters/contracts/connection.py @@ -29,7 +29,6 @@ from mashumaro.jsonschema.annotations import Pattern from typing_extensions import Protocol, Annotated -from dbt.adapters.contracts.catalog import CatalogIntegrations from dbt.adapters.events.types import NewConnectionOpening from dbt.adapters.utils import translate_aliases @@ -229,4 +228,3 @@ class AdapterRequiredConfig(HasCredentials, Protocol): cli_vars: Dict[str, Any] target_path: str log_cache_events: bool - catalog_integrations: Optional[CatalogIntegrations] From 349cc0e8ba92348a3d5ed68e91db8e748252291e Mon Sep 17 00:00:00 2001 From: Colin Date: Tue, 17 Dec 2024 17:00:32 -0800 Subject: [PATCH 12/19] remove CatalogIntegration from impl init --- dbt/adapters/base/impl.py | 1 - 1 file changed, 1 deletion(-) diff --git a/dbt/adapters/base/impl.py b/dbt/adapters/base/impl.py index cb5fd08c5..8ff23cd49 100644 --- a/dbt/adapters/base/impl.py +++ b/dbt/adapters/base/impl.py @@ -294,7 +294,6 @@ def __init__(self, config, mp_context: SpawnContext) -> None: self._macro_resolver: Optional[MacroResolverProtocol] = None self._macro_context_generator: Optional[MacroContextGeneratorCallable] = None self.behavior = DEFAULT_BASE_BEHAVIOR_FLAGS # type: ignore - self.add_catalog_integrations(config.catalog_integrations) def add_catalog_integrations(self, catalog_integrations: Optional[List[CatalogIntegrationConfig]]) -> None: if catalog_integrations: From 8d06ebda62531d1d52ff0b40934ed473e09ba460 Mon Sep 17 00:00:00 2001 From: Colin Date: Mon, 6 Jan 2025 11:11:32 -0800 Subject: [PATCH 13/19] add concrete catalog integration config --- dbt/adapters/contracts/catalog.py | 15 +++++++++++++-- dbt/adapters/protocol.py | 7 +------ 2 files changed, 14 insertions(+), 8 deletions(-) diff --git a/dbt/adapters/contracts/catalog.py b/dbt/adapters/contracts/catalog.py index a3bbabdd8..cf8130462 100644 --- a/dbt/adapters/contracts/catalog.py +++ b/dbt/adapters/contracts/catalog.py @@ -1,9 +1,9 @@ import abc +from dataclasses import dataclass from enum import Enum -from typing import Optional, Tuple, List, Dict +from typing import Optional, Dict from dbt.adapters.contracts.relation import RelationConfig -from dbt.adapters.protocol import CatalogIntegrationConfig from dbt.adapters.relation_configs.formats import TableFormat @@ -14,6 +14,17 @@ class CatalogIntegrationType(Enum): unity = 'unity' +@dataclass +class CatalogIntegrationConfig: + catalog_name: str + integration_name: str + table_format: str + catalog_type: str + external_volume: Optional[str] + namespace: Optional[str] + adapter_configs: Optional[Dict] + + class CatalogIntegration(abc.ABC): """ An external catalog integration is a connection to an external catalog that can be used to diff --git a/dbt/adapters/protocol.py b/dbt/adapters/protocol.py index b70120bb5..391bb3f1c 100644 --- a/dbt/adapters/protocol.py +++ b/dbt/adapters/protocol.py @@ -42,11 +42,7 @@ class ColumnProtocol(Protocol): pass -class CatalogIntegrationProtocol(Protocol): - pass - - -class CatalogIntegrationConfig(Protocol): +class CatalogIntegrationConfigProtocol(Protocol): catalog_name: str integration_name: str table_format: str @@ -108,7 +104,6 @@ class AdapterProtocol( # type: ignore[misc] Relation: Type[Relation_T] ConnectionManager: Type[ConnectionManager_T] connections: ConnectionManager_T - CatalogIntegration: Type[ExtCatInteg_T] def __init__(self, config: AdapterRequiredConfig) -> None: ... From 83e5c2abb83e5321d8a8cb4298166d0671609694 Mon Sep 17 00:00:00 2001 From: Colin Date: Mon, 6 Jan 2025 11:12:07 -0800 Subject: [PATCH 14/19] add concrete catalog integration config --- .hatch/test.env | 17 +++++++++++++++++ .hatch/test_against_adapter.sh | 18 ++++++++++++++++++ 2 files changed, 35 insertions(+) create mode 100644 .hatch/test.env create mode 100755 .hatch/test_against_adapter.sh diff --git a/.hatch/test.env b/.hatch/test.env new file mode 100644 index 000000000..d6a58579a --- /dev/null +++ b/.hatch/test.env @@ -0,0 +1,17 @@ +SNOWFLAKE_TEST_ACCOUNT=ska67070 +SNOWFLAKE_TEST_ALT_DATABASE=DBT_TEST_ALT +SNOWFLAKE_TEST_ALT_WAREHOUSE=DBT_TESTING_ALT +SNOWFLAKE_TEST_DATABASE=DBT_TEST +SNOWFLAKE_TEST_PASSWORD=8npWWFNssVJt{gP/Dew9 +SNOWFLAKE_TEST_QUOTED_DATABASE=1234-QUOTED-DATABASE +SNOWFLAKE_TEST_USER=CIRCLECI +SNOWFLAKE_TEST_WAREHOUSE=DBT_TESTING +SNOWFLAKE_TEST_ROLE=TESTER + +SNOWFLAKE_TEST_OAUTH_REFRESH_TOKEN="ver:2-hint:3720210485144318-did:1009-ETMsDgAAAYK3VIGdABRBRVMvQ0JDL1BLQ1M1UGFkZGluZwEAABAAEFgz38mlVkrqqam+7purbNAAAADwKZ925XLofVLF5JYGWSFmDjDPM3f511xegtwSfNrXlhzDGmwdU7rTC0fttZc3gJIo8h3XI10s/h+uuRqQbdrNgnGRvhmKIE1cWMv86KyHPh9DnpCKWJkWPSsTkQO6gm6IuurQDYouARyYvgjGM9bMgiZf7L31GSPYEJh1FzwEkjEy5/nDqN4POvA3h4rGbmSOKyFQSrgy9uoPu73OrkZAHWiNp46QfZqI/Trs/Uob5WQLTl9QS+ItTqmDrWp1Nq9/B4tqfg7GIfok6WZyBHLQSGQBjnqHmGEy6MJA5w6RKAeXc09fWOpr64fqO+rkRR0pABSwKU/f+CGIZn0EiQUfxiprFKYmng==" +SNOWFLAKE_TEST_OAUTH_CLIENT_ID="P9FYlIvcuxE0kppp4iGls3j20A8=" +SNOWFLAKE_TEST_OAUTH_CLIENT_SECRET="kazYHsEs18MVu0VjlgrhZN9gciJ2TxUV27Id+MecBHc=" + +DBT_TEST_USER_1=dbt_test_role_1 +DBT_TEST_USER_2=dbt_test_role_2 +DBT_TEST_USER_3=dbt_test_role_3 \ No newline at end of file diff --git a/.hatch/test_against_adapter.sh b/.hatch/test_against_adapter.sh new file mode 100755 index 000000000..ed110404b --- /dev/null +++ b/.hatch/test_against_adapter.sh @@ -0,0 +1,18 @@ +#!/bin/bash + +set -eo pipefail +adapter=$1 +branch=$2 +source .hatch/test.env +adapter_dir="dbt-$adapter" + +cd .hatch/ +if [ ! -d "$adapter_dir" ]; then + git clone "https://github.com/dbt-labs/dbt-$adapter.git" +fi + +cd "$adapter_dir" +git pull +git switch "$branch" +pip install -e . +python -m pytest tests/functional/ From 7e2432615f857bbf6cc074cf48c9acd1c85a552c Mon Sep 17 00:00:00 2001 From: Colin Date: Tue, 7 Jan 2025 13:03:02 -0800 Subject: [PATCH 15/19] fix args and protocols --- dbt/adapters/base/impl.py | 9 +++++---- dbt/adapters/clients/catalogs.py | 7 +++++-- dbt/adapters/contracts/catalog.py | 8 +++++--- dbt/adapters/protocol.py | 17 +++++++++++++++-- 4 files changed, 30 insertions(+), 11 deletions(-) diff --git a/dbt/adapters/base/impl.py b/dbt/adapters/base/impl.py index 80bffc56f..ee9573e62 100644 --- a/dbt/adapters/base/impl.py +++ b/dbt/adapters/base/impl.py @@ -90,7 +90,7 @@ SnapshotTargetNotSnapshotTableError, UnexpectedNonTimestampError, ) -from dbt.adapters.protocol import AdapterConfig, MacroContextGeneratorCallable, CatalogIntegrationConfig +from dbt.adapters.protocol import AdapterConfig, MacroContextGeneratorCallable, CatalogIntegrationConfigProtocol if TYPE_CHECKING: import agate @@ -268,7 +268,7 @@ class BaseAdapter(metaclass=AdapterMeta): Relation: Type[BaseRelation] = BaseRelation Column: Type[BaseColumn] = BaseColumn ConnectionManager: Type[BaseConnectionManager] - CatalogIntegrations: Dict[str, Type[CatalogIntegration]] + CatalogIntegrations: Dict[str, Type[CatalogIntegrationConfigProtocol]] # A set of clobber config fields accepted by this adapter # for use in materializations @@ -296,7 +296,7 @@ def __init__(self, config, mp_context: SpawnContext) -> None: self._macro_context_generator: Optional[MacroContextGeneratorCallable] = None self.behavior = DEFAULT_BASE_BEHAVIOR_FLAGS # type: ignore - def add_catalog_integrations(self, catalog_integrations: Optional[List[CatalogIntegrationConfig]]) -> None: + def add_catalog_integrations(self, catalog_integrations: Optional[List[CatalogIntegrationConfigProtocol]]) -> None: if catalog_integrations: for integration_config in catalog_integrations: catalog_type = integration_config.catalog_type @@ -306,8 +306,9 @@ def add_catalog_integrations(self, catalog_integrations: Optional[List[CatalogIn catalogs_client.add_catalog(integration, integration_config.catalog_name) @available - def get_catalog_integration(self, integration_name) -> CatalogIntegration: + def get_catalog_integration(self, integration_name: str) -> CatalogIntegration: return catalogs_client.get_catalog(integration_name) + ### # Methods to set / access a macro resolver ### diff --git a/dbt/adapters/clients/catalogs.py b/dbt/adapters/clients/catalogs.py index 4b466a94e..624b0c0a8 100644 --- a/dbt/adapters/clients/catalogs.py +++ b/dbt/adapters/clients/catalogs.py @@ -2,15 +2,18 @@ class CatalogIntegrations: + def __init__(self): + self._integrations = None + def get(self, name: str) -> CatalogIntegration: return self.integrations[name] @property def integrations(self) -> dict[str, CatalogIntegration]: - return self.integrations + return self._integrations def add_integration(self, integration: CatalogIntegration, catalog_name: str): - self.integrations[catalog_name] = integration + self._integrations[catalog_name] = integration _CATALOG_CLIENT = CatalogIntegrations() diff --git a/dbt/adapters/contracts/catalog.py b/dbt/adapters/contracts/catalog.py index cf8130462..5cfc6be96 100644 --- a/dbt/adapters/contracts/catalog.py +++ b/dbt/adapters/contracts/catalog.py @@ -20,9 +20,9 @@ class CatalogIntegrationConfig: integration_name: str table_format: str catalog_type: str - external_volume: Optional[str] - namespace: Optional[str] - adapter_configs: Optional[Dict] + external_volume: Optional[str] = None + namespace: Optional[str] = None + adapter_configs: Optional[Dict] = None class CatalogIntegration(abc.ABC): @@ -31,6 +31,8 @@ class CatalogIntegration(abc.ABC): interact with the catalog. This class is an abstract base class that should be subclassed by specific integrations in the adapters. + Implements the CatalogIntegrationProtocol. + """ catalog_name: str integration_name: str diff --git a/dbt/adapters/protocol.py b/dbt/adapters/protocol.py index 391bb3f1c..3305618bd 100644 --- a/dbt/adapters/protocol.py +++ b/dbt/adapters/protocol.py @@ -52,6 +52,19 @@ class CatalogIntegrationConfigProtocol(Protocol): adapter_configs: Optional[Dict] +class CatalogIntegrationProtocol(Protocol): + catalog_name: str + integration_name: str + table_format: str + integration_type: str + external_volume: Optional[str] + namespace: Optional[str] + + def __init__( + self, integration_config: CatalogIntegrationConfigProtocol + ) -> None: ... + + Self = TypeVar("Self", bound="RelationProtocol") @@ -72,7 +85,7 @@ def create_from( ConnectionManager_T = TypeVar("ConnectionManager_T", bound=ConnectionManagerProtocol) Relation_T = TypeVar("Relation_T", bound=RelationProtocol) Column_T = TypeVar("Column_T", bound=ColumnProtocol) -ExtCatInteg_T = TypeVar("ExtCatInteg_T", bound=CatalogIntegrationProtocol) +CatalogIntegration_T = TypeVar("CatalogIntegration_T", bound=CatalogIntegrationProtocol) class MacroContextGeneratorCallable(Protocol): @@ -93,7 +106,6 @@ class AdapterProtocol( # type: ignore[misc] ConnectionManager_T, Relation_T, Column_T, - ExtCatInteg_T, ], ): # N.B. Technically these are ClassVars, but mypy doesn't support putting type vars in a @@ -103,6 +115,7 @@ class AdapterProtocol( # type: ignore[misc] Column: Type[Column_T] Relation: Type[Relation_T] ConnectionManager: Type[ConnectionManager_T] + CatalogIntegrations: Dict[str, Type[CatalogIntegration_T]] connections: ConnectionManager_T def __init__(self, config: AdapterRequiredConfig) -> None: ... From 4e333f6b9735651a4f85fdbb06f937fec816a98a Mon Sep 17 00:00:00 2001 From: Colin Date: Tue, 7 Jan 2025 14:33:19 -0800 Subject: [PATCH 16/19] delete file --- .hatch/test.env | 17 ----------------- 1 file changed, 17 deletions(-) delete mode 100644 .hatch/test.env diff --git a/.hatch/test.env b/.hatch/test.env deleted file mode 100644 index d6a58579a..000000000 --- a/.hatch/test.env +++ /dev/null @@ -1,17 +0,0 @@ -SNOWFLAKE_TEST_ACCOUNT=ska67070 -SNOWFLAKE_TEST_ALT_DATABASE=DBT_TEST_ALT -SNOWFLAKE_TEST_ALT_WAREHOUSE=DBT_TESTING_ALT -SNOWFLAKE_TEST_DATABASE=DBT_TEST -SNOWFLAKE_TEST_PASSWORD=8npWWFNssVJt{gP/Dew9 -SNOWFLAKE_TEST_QUOTED_DATABASE=1234-QUOTED-DATABASE -SNOWFLAKE_TEST_USER=CIRCLECI -SNOWFLAKE_TEST_WAREHOUSE=DBT_TESTING -SNOWFLAKE_TEST_ROLE=TESTER - -SNOWFLAKE_TEST_OAUTH_REFRESH_TOKEN="ver:2-hint:3720210485144318-did:1009-ETMsDgAAAYK3VIGdABRBRVMvQ0JDL1BLQ1M1UGFkZGluZwEAABAAEFgz38mlVkrqqam+7purbNAAAADwKZ925XLofVLF5JYGWSFmDjDPM3f511xegtwSfNrXlhzDGmwdU7rTC0fttZc3gJIo8h3XI10s/h+uuRqQbdrNgnGRvhmKIE1cWMv86KyHPh9DnpCKWJkWPSsTkQO6gm6IuurQDYouARyYvgjGM9bMgiZf7L31GSPYEJh1FzwEkjEy5/nDqN4POvA3h4rGbmSOKyFQSrgy9uoPu73OrkZAHWiNp46QfZqI/Trs/Uob5WQLTl9QS+ItTqmDrWp1Nq9/B4tqfg7GIfok6WZyBHLQSGQBjnqHmGEy6MJA5w6RKAeXc09fWOpr64fqO+rkRR0pABSwKU/f+CGIZn0EiQUfxiprFKYmng==" -SNOWFLAKE_TEST_OAUTH_CLIENT_ID="P9FYlIvcuxE0kppp4iGls3j20A8=" -SNOWFLAKE_TEST_OAUTH_CLIENT_SECRET="kazYHsEs18MVu0VjlgrhZN9gciJ2TxUV27Id+MecBHc=" - -DBT_TEST_USER_1=dbt_test_role_1 -DBT_TEST_USER_2=dbt_test_role_2 -DBT_TEST_USER_3=dbt_test_role_3 \ No newline at end of file From e2fb7a38dcdb3a96b84a2a259b44be058b2ca8e8 Mon Sep 17 00:00:00 2001 From: Colin Date: Thu, 9 Jan 2025 16:30:36 -0800 Subject: [PATCH 17/19] add catalog_name to base relation --- dbt/adapters/base/relation.py | 2 ++ dbt/adapters/clients/catalogs.py | 2 +- dbt/adapters/contracts/catalog.py | 2 +- 3 files changed, 4 insertions(+), 2 deletions(-) diff --git a/dbt/adapters/base/relation.py b/dbt/adapters/base/relation.py index 7d4888e42..0e65a24b2 100644 --- a/dbt/adapters/base/relation.py +++ b/dbt/adapters/base/relation.py @@ -60,6 +60,7 @@ class BaseRelation(FakeAPIObject, Hashable): require_alias: bool = ( True # used to govern whether to add an alias when render_limited is called ) + catalog_name: Optional[str] = None # register relation types that can be renamed for the purpose of replacing relations using stages and backups # adding a relation type here also requires defining the associated rename macro @@ -318,6 +319,7 @@ def create_from( schema=relation_config.schema, identifier=relation_config.identifier, quote_policy=quote_policy, + catalog_name=relation_config.catalog_name, **kwargs, ) diff --git a/dbt/adapters/clients/catalogs.py b/dbt/adapters/clients/catalogs.py index 624b0c0a8..111fd233f 100644 --- a/dbt/adapters/clients/catalogs.py +++ b/dbt/adapters/clients/catalogs.py @@ -3,7 +3,7 @@ class CatalogIntegrations: def __init__(self): - self._integrations = None + self._integrations = {} def get(self, name: str) -> CatalogIntegration: return self.integrations[name] diff --git a/dbt/adapters/contracts/catalog.py b/dbt/adapters/contracts/catalog.py index 5cfc6be96..948d12152 100644 --- a/dbt/adapters/contracts/catalog.py +++ b/dbt/adapters/contracts/catalog.py @@ -55,5 +55,5 @@ def __init__( def _handle_adapter_configs(self, adapter_configs: Dict) -> None: ... - def render_ddl_predicates(self, relation_config: RelationConfig) -> str: + def render_ddl_predicates(self, relation, config: RelationConfig) -> str: ... From b981f04fa94a0095ce82a8f8ac9053d43d7b40a6 Mon Sep 17 00:00:00 2001 From: Colin Date: Wed, 15 Jan 2025 12:07:11 -0800 Subject: [PATCH 18/19] add integration validation --- dbt/adapters/base/relation.py | 7 ++++++- dbt/adapters/clients/catalogs.py | 9 ++++++++- 2 files changed, 14 insertions(+), 2 deletions(-) diff --git a/dbt/adapters/base/relation.py b/dbt/adapters/base/relation.py index 0e65a24b2..866b97974 100644 --- a/dbt/adapters/base/relation.py +++ b/dbt/adapters/base/relation.py @@ -306,6 +306,11 @@ def create_from( config_quoting = relation_config.quoting_dict config_quoting.pop("column", None) + + catalog_name = relation_config.catalog_name \ + if hasattr(relation_config, "catalog_name") \ + else relation_config.config.get("catalog", None) + # precedence: kwargs quoting > relation config quoting > base quoting > default quoting quote_policy = deep_merge( cls.get_default_quote_policy().to_dict(omit_none=True), @@ -319,7 +324,7 @@ def create_from( schema=relation_config.schema, identifier=relation_config.identifier, quote_policy=quote_policy, - catalog_name=relation_config.catalog_name, + catalog_name=catalog_name, **kwargs, ) diff --git a/dbt/adapters/clients/catalogs.py b/dbt/adapters/clients/catalogs.py index 111fd233f..1c9baf241 100644 --- a/dbt/adapters/clients/catalogs.py +++ b/dbt/adapters/clients/catalogs.py @@ -1,3 +1,5 @@ +from dbt_common.exceptions import DbtValidationError + from dbt.adapters.contracts.catalog import CatalogIntegration @@ -20,7 +22,12 @@ def add_integration(self, integration: CatalogIntegration, catalog_name: str): def get_catalog(integration_name: str) -> CatalogIntegration: - return _CATALOG_CLIENT.get(integration_name) + try: + return _CATALOG_CLIENT.get(integration_name) + except KeyError: + raise DbtValidationError( + f"Catalog integration '{integration_name}' not found in the catalog client" + ) def add_catalog(integration: CatalogIntegration, catalog_name: str): From 54a62a25a7edbf03f51aa71f66eef3cc3e82140c Mon Sep 17 00:00:00 2001 From: Colin Date: Wed, 15 Jan 2025 12:08:15 -0800 Subject: [PATCH 19/19] add catalog unit test --- tests/unit/clients/test_catalogs.py | 22 ++++++++++++++++++++++ 1 file changed, 22 insertions(+) create mode 100644 tests/unit/clients/test_catalogs.py diff --git a/tests/unit/clients/test_catalogs.py b/tests/unit/clients/test_catalogs.py new file mode 100644 index 000000000..d429d593a --- /dev/null +++ b/tests/unit/clients/test_catalogs.py @@ -0,0 +1,22 @@ +from dbt.adapters.clients.catalogs import add_catalog, get_catalog +from dbt.adapters.contracts.catalog import CatalogIntegration, CatalogIntegrationConfig, CatalogIntegrationType +from dbt.adapters.relation_configs.formats import TableFormat + + +class FakeCatalogIntegration(CatalogIntegration): + def render_ddl_predicates(self, relation): + return "mocked" + + +def test_adding_catalog_integration(): + catalog = FakeCatalogIntegration( + integration_config=CatalogIntegrationConfig( + catalog_type=CatalogIntegrationType.glue.value, + catalog_name="snowflake_managed", + integration_name="test_integration", + table_format=TableFormat.ICEBERG, + external_volume="test_volume", + ) + ) + add_catalog(catalog, catalog_name="fake_catalog") + get_catalog("fake_catalog")