diff --git a/README.md b/README.md index 96a34a9..ed21de3 100644 --- a/README.md +++ b/README.md @@ -10,10 +10,10 @@ The `dbt-impala` adapter allows you to use [dbt](https://www.getdbt.com/) along ### Requirements -Current version of dbt-impala use dbt-core 1.7.*. We are actively working on supporting the next version of dbt-core 1.8 +Current version of dbt-impala uses dbt-core 1.8.*. We are actively working on supporting the next available version of dbt-core. Python >= 3.7 -dbt-core == 1.7.* +dbt-core == 1.8.* For development/testing or contribution to the dbt-impala, please follow [Contributing](CONTRIBUTING.md) guidelines. diff --git a/dbt/adapters/impala/__version__.py b/dbt/adapters/impala/__version__.py index 9068c7e..d5241c1 100644 --- a/dbt/adapters/impala/__version__.py +++ b/dbt/adapters/impala/__version__.py @@ -12,4 +12,4 @@ # See the License for the specific language governing permissions and # limitations under the License. -version = "1.7.0" +version = "1.8.0" diff --git a/dbt/adapters/impala/cloudera_tracking.py b/dbt/adapters/impala/cloudera_tracking.py index a9bd69f..56d0b50 100644 --- a/dbt/adapters/impala/cloudera_tracking.py +++ b/dbt/adapters/impala/cloudera_tracking.py @@ -23,8 +23,8 @@ import threading from dbt.tracking import active_user -from dbt.adapters.base import Credentials -from dbt.events import AdapterLogger +from dbt.adapters.contracts.connection import Credentials +from dbt.adapters.events.logging import AdapterLogger from decouple import config diff --git a/dbt/adapters/impala/column.py b/dbt/adapters/impala/column.py index d399a98..babe78c 100644 --- a/dbt/adapters/impala/column.py +++ b/dbt/adapters/impala/column.py @@ -16,7 +16,7 @@ from typing import TypeVar, Optional, Dict, Any from dbt.adapters.base.column import Column -from dbt.dataclass_schema import dbtClassMixin +from dbt_common.dataclass_schema import dbtClassMixin Self = TypeVar("Self", bound="ImpalaColumn") diff --git a/dbt/adapters/impala/connections.py b/dbt/adapters/impala/connections.py index 4fc38e5..2d0f958 100644 --- a/dbt/adapters/impala/connections.py +++ b/dbt/adapters/impala/connections.py @@ -18,18 +18,19 @@ import time import dbt.exceptions -from dbt.adapters.base import Credentials +from dbt.adapters.contracts.connection import Credentials from dbt.adapters.sql import SQLConnectionManager -from dbt.contracts.connection import AdapterRequiredConfig +from dbt.adapters.contracts.connection import AdapterRequiredConfig from typing import Optional, Tuple, Any +from multiprocessing.context import SpawnContext -from dbt.contracts.connection import Connection, AdapterResponse, ConnectionState +from dbt.adapters.contracts.connection import Connection, AdapterResponse, ConnectionState -from dbt.events.functions import fire_event -from dbt.events.types import ConnectionUsed, SQLQuery, SQLQueryStatus +from dbt_common.events.functions import fire_event +from dbt.adapters.events.types import ConnectionUsed, SQLQuery, SQLQueryStatus -from dbt.events import AdapterLogger +from dbt.adapters.events.logging import AdapterLogger import impala.dbapi from impala.error import DatabaseError @@ -158,8 +159,8 @@ class ImpalaConnectionManager(SQLConnectionManager): impala_version = None - def __init__(self, profile: AdapterRequiredConfig): - super().__init__(profile) + def __init__(self, profile: AdapterRequiredConfig, mp_context: SpawnContext): + super().__init__(profile, mp_context) # generate profile related object for instrumentation. tracker.generate_profile_info(self) diff --git a/dbt/adapters/impala/impl.py b/dbt/adapters/impala/impl.py index 36c4b2c..c9fd2c2 100644 --- a/dbt/adapters/impala/impl.py +++ b/dbt/adapters/impala/impl.py @@ -15,16 +15,17 @@ import re from collections import OrderedDict from concurrent.futures import Future -from typing import Any, Dict, Iterable, List +from typing import Any, Dict, Iterable, List, FrozenSet, Tuple import agate import dbt.exceptions from dbt.adapters.base.impl import catch_as_completed from dbt.adapters.sql import SQLAdapter -from dbt.clients import agate_helper -from dbt.clients.agate_helper import ColumnTypeBuilder, NullableAgateType, _NullMarker -from dbt.events import AdapterLogger -from dbt.utils import executor +from dbt_common.clients import agate_helper +from dbt_common.clients.agate_helper import ColumnTypeBuilder, NullableAgateType, _NullMarker +from dbt.adapters.events.logging import AdapterLogger +from dbt_common.utils import executor +from dbt.adapters.contracts.relation import RelationConfig import dbt.adapters.impala.cloudera_tracking as tracker from dbt.adapters.impala import ImpalaConnectionManager @@ -275,23 +276,23 @@ def parse_columns_from_information(self, relation: ImpalaRelation) -> List[Impal return columns - def get_catalog(self, manifest): - schema_map = self._get_catalog_schemas(manifest) + def get_catalog( + self, relation_configs: Iterable[RelationConfig], used_schemas: FrozenSet[Tuple[str, str]] + ): + schema_map = self._get_catalog_schemas(relation_configs) with executor(self.config) as tpe: futures: List[Future[agate.Table]] = [] for info, schemas in schema_map.items(): for schema in schemas: futures.append( - tpe.submit_connected( - self, schema, self._get_one_catalog, info, [schema], manifest - ) + tpe.submit_connected(self, schema, self._get_one_catalog, info, [schema]) ) catalogs, exceptions = catch_as_completed(futures) # call the default implementation return catalogs, exceptions - def _get_one_catalog(self, information_schema, schemas, manifest) -> agate.Table: + def _get_one_catalog(self, information_schema, schemas) -> agate.Table: if len(schemas) != 1: dbt.exceptions.raise_compiler_error( f"Expected only one schema in ImpalaAdapter._get_one_catalog, found " f"{schemas}" diff --git a/dev-requirements.txt b/dev-requirements.txt index 450f11f..f9138ec 100644 --- a/dev-requirements.txt +++ b/dev-requirements.txt @@ -1,4 +1,4 @@ -dbt-tests-adapter==1.7.* +dbt-tests-adapter==1.8.* pre-commit~=2.21;python_version=="3.7" pre-commit~=3.2;python_version>="3.8" pytest diff --git a/setup.py b/setup.py index d73c961..5afa078 100644 --- a/setup.py +++ b/setup.py @@ -46,7 +46,7 @@ def _get_dbt_core_version(): package_name = "dbt-impala" # make sure this always matches dbt/adapters/dbt_impala/__version__.py -package_version = "1.7.0" +package_version = "1.8.0" description = """The Impala adapter plugin for dbt""" dbt_core_version = _get_dbt_core_version() diff --git a/tests/functional/adapter/test_concurrency.py b/tests/functional/adapter/test_concurrency.py index 9d74cbe..afb09d9 100644 --- a/tests/functional/adapter/test_concurrency.py +++ b/tests/functional/adapter/test_concurrency.py @@ -1,5 +1,12 @@ import pytest -from dbt.tests.util import run_dbt, check_relations_equal, rm_file, write_file +from dbt.tests.util import ( + run_dbt, + check_relations_equal, + rm_file, + write_file, + check_table_does_not_exist, + run_dbt_and_capture, +) from dbt.tests.adapter.concurrency.test_concurrency import BaseConcurrency, seeds__update_csv @@ -21,3 +28,28 @@ def test_concurrency_impala(self, project): check_relations_equal(project.adapter, ["SEED", "DEP"]) check_relations_equal(project.adapter, ["SEED", "TABLE_A"]) check_relations_equal(project.adapter, ["SEED", "TABLE_B"]) + + def test_concurrency(self, project): + run_dbt(["seed", "--select", "seed"]) + results = run_dbt(["run"], expect_pass=False) + assert len(results) == 7 + check_relations_equal(project.adapter, ["seed", "view_model"]) + check_relations_equal(project.adapter, ["seed", "dep"]) + check_relations_equal(project.adapter, ["seed", "table_a"]) + check_relations_equal(project.adapter, ["seed", "table_b"]) + check_table_does_not_exist(project.adapter, "invalid") + check_table_does_not_exist(project.adapter, "`skip`") + + rm_file(project.project_root, "seeds", "seed.csv") + write_file(seeds__update_csv, project.project_root, "seeds", "seed.csv") + + results, output = run_dbt_and_capture(["run"], expect_pass=False) + assert len(results) == 7 + check_relations_equal(project.adapter, ["seed", "view_model"]) + check_relations_equal(project.adapter, ["seed", "dep"]) + check_relations_equal(project.adapter, ["seed", "table_a"]) + check_relations_equal(project.adapter, ["seed", "table_b"]) + check_table_does_not_exist(project.adapter, "invalid") + check_table_does_not_exist(project.adapter, "`skip`") + + assert "PASS=5 WARN=0 ERROR=1 SKIP=1 TOTAL=7" in output