Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

DBT-784 Fix-1.8.0-impala Support dbt-core 1.8.0 for dbt-impala #201

Merged
merged 6 commits into from
Sep 25, 2024
Merged
4 changes: 2 additions & 2 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -10,10 +10,10 @@ The `dbt-impala` adapter allows you to use [dbt](https://www.getdbt.com/) along

### Requirements

Current version of dbt-impala use dbt-core 1.7.*. We are actively working on supporting the next version of dbt-core 1.8
Current version of dbt-impala uses dbt-core 1.8.*. We are actively working on supporting the next available version of dbt-core.

Python >= 3.7
dbt-core == 1.7.*
dbt-core == 1.8.*

For development/testing or contribution to the dbt-impala, please follow [Contributing](CONTRIBUTING.md) guidelines.

Expand Down
2 changes: 1 addition & 1 deletion dbt/adapters/impala/__version__.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,4 +12,4 @@
# See the License for the specific language governing permissions and
# limitations under the License.

version = "1.7.0"
version = "1.8.0"
4 changes: 2 additions & 2 deletions dbt/adapters/impala/cloudera_tracking.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,8 +23,8 @@
import threading
from dbt.tracking import active_user

from dbt.adapters.base import Credentials
from dbt.events import AdapterLogger
from dbt.adapters.contracts.connection import Credentials
from dbt.adapters.events.logging import AdapterLogger
from decouple import config


Expand Down
2 changes: 1 addition & 1 deletion dbt/adapters/impala/column.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@
from typing import TypeVar, Optional, Dict, Any

from dbt.adapters.base.column import Column
from dbt.dataclass_schema import dbtClassMixin
from dbt_common.dataclass_schema import dbtClassMixin

Self = TypeVar("Self", bound="ImpalaColumn")

Expand Down
17 changes: 9 additions & 8 deletions dbt/adapters/impala/connections.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,18 +18,19 @@
import time
import dbt.exceptions

from dbt.adapters.base import Credentials
from dbt.adapters.contracts.connection import Credentials
from dbt.adapters.sql import SQLConnectionManager
from dbt.contracts.connection import AdapterRequiredConfig
from dbt.adapters.contracts.connection import AdapterRequiredConfig

from typing import Optional, Tuple, Any
from multiprocessing.context import SpawnContext

from dbt.contracts.connection import Connection, AdapterResponse, ConnectionState
from dbt.adapters.contracts.connection import Connection, AdapterResponse, ConnectionState

from dbt.events.functions import fire_event
from dbt.events.types import ConnectionUsed, SQLQuery, SQLQueryStatus
from dbt_common.events.functions import fire_event
from dbt.adapters.events.types import ConnectionUsed, SQLQuery, SQLQueryStatus

from dbt.events import AdapterLogger
from dbt.adapters.events.logging import AdapterLogger

import impala.dbapi
from impala.error import DatabaseError
Expand Down Expand Up @@ -158,8 +159,8 @@ class ImpalaConnectionManager(SQLConnectionManager):

impala_version = None

def __init__(self, profile: AdapterRequiredConfig):
super().__init__(profile)
def __init__(self, profile: AdapterRequiredConfig, mp_context: SpawnContext):
super().__init__(profile, mp_context)
# generate profile related object for instrumentation.
tracker.generate_profile_info(self)

Expand Down
23 changes: 12 additions & 11 deletions dbt/adapters/impala/impl.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,16 +15,17 @@
import re
from collections import OrderedDict
from concurrent.futures import Future
from typing import Any, Dict, Iterable, List
from typing import Any, Dict, Iterable, List, FrozenSet, Tuple

import agate
import dbt.exceptions
from dbt.adapters.base.impl import catch_as_completed
from dbt.adapters.sql import SQLAdapter
from dbt.clients import agate_helper
from dbt.clients.agate_helper import ColumnTypeBuilder, NullableAgateType, _NullMarker
from dbt.events import AdapterLogger
from dbt.utils import executor
from dbt_common.clients import agate_helper
from dbt_common.clients.agate_helper import ColumnTypeBuilder, NullableAgateType, _NullMarker
from dbt.adapters.events.logging import AdapterLogger
from dbt_common.utils import executor
from dbt.adapters.contracts.relation import RelationConfig

import dbt.adapters.impala.cloudera_tracking as tracker
from dbt.adapters.impala import ImpalaConnectionManager
Expand Down Expand Up @@ -275,23 +276,23 @@ def parse_columns_from_information(self, relation: ImpalaRelation) -> List[Impal

return columns

def get_catalog(self, manifest):
schema_map = self._get_catalog_schemas(manifest)
def get_catalog(
self, relation_configs: Iterable[RelationConfig], used_schemas: FrozenSet[Tuple[str, str]]
):
schema_map = self._get_catalog_schemas(relation_configs)

with executor(self.config) as tpe:
futures: List[Future[agate.Table]] = []
for info, schemas in schema_map.items():
for schema in schemas:
futures.append(
tpe.submit_connected(
self, schema, self._get_one_catalog, info, [schema], manifest
)
tpe.submit_connected(self, schema, self._get_one_catalog, info, [schema])
)
catalogs, exceptions = catch_as_completed(futures) # call the default implementation

return catalogs, exceptions

def _get_one_catalog(self, information_schema, schemas, manifest) -> agate.Table:
def _get_one_catalog(self, information_schema, schemas) -> agate.Table:
if len(schemas) != 1:
dbt.exceptions.raise_compiler_error(
f"Expected only one schema in ImpalaAdapter._get_one_catalog, found " f"{schemas}"
Expand Down
2 changes: 1 addition & 1 deletion dev-requirements.txt
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
dbt-tests-adapter==1.7.*
dbt-tests-adapter==1.8.*
pre-commit~=2.21;python_version=="3.7"
pre-commit~=3.2;python_version>="3.8"
pytest
Expand Down
2 changes: 1 addition & 1 deletion setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ def _get_dbt_core_version():

package_name = "dbt-impala"
# make sure this always matches dbt/adapters/dbt_impala/__version__.py
package_version = "1.7.0"
package_version = "1.8.0"
description = """The Impala adapter plugin for dbt"""

dbt_core_version = _get_dbt_core_version()
Expand Down
34 changes: 33 additions & 1 deletion tests/functional/adapter/test_concurrency.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,12 @@
import pytest
from dbt.tests.util import run_dbt, check_relations_equal, rm_file, write_file
from dbt.tests.util import (
run_dbt,
check_relations_equal,
rm_file,
write_file,
check_table_does_not_exist,
run_dbt_and_capture,
)
from dbt.tests.adapter.concurrency.test_concurrency import BaseConcurrency, seeds__update_csv


Expand All @@ -21,3 +28,28 @@ def test_concurrency_impala(self, project):
check_relations_equal(project.adapter, ["SEED", "DEP"])
check_relations_equal(project.adapter, ["SEED", "TABLE_A"])
check_relations_equal(project.adapter, ["SEED", "TABLE_B"])

def test_concurrency(self, project):
run_dbt(["seed", "--select", "seed"])
results = run_dbt(["run"], expect_pass=False)
assert len(results) == 7
check_relations_equal(project.adapter, ["seed", "view_model"])
check_relations_equal(project.adapter, ["seed", "dep"])
check_relations_equal(project.adapter, ["seed", "table_a"])
check_relations_equal(project.adapter, ["seed", "table_b"])
check_table_does_not_exist(project.adapter, "invalid")
check_table_does_not_exist(project.adapter, "`skip`")

rm_file(project.project_root, "seeds", "seed.csv")
write_file(seeds__update_csv, project.project_root, "seeds", "seed.csv")

results, output = run_dbt_and_capture(["run"], expect_pass=False)
assert len(results) == 7
check_relations_equal(project.adapter, ["seed", "view_model"])
check_relations_equal(project.adapter, ["seed", "dep"])
check_relations_equal(project.adapter, ["seed", "table_a"])
check_relations_equal(project.adapter, ["seed", "table_b"])
check_table_does_not_exist(project.adapter, "invalid")
check_table_does_not_exist(project.adapter, "`skip`")

assert "PASS=5 WARN=0 ERROR=1 SKIP=1 TOTAL=7" in output