DBT-556 Added support for materializing Kudu table through impala adapter #207

Merged 4 commits on Oct 22, 2024
1 change: 1 addition & 0 deletions CONTRIBUTING.md
Original file line number Diff line number Diff line change
@@ -61,6 +61,7 @@ When `dbt-impala` is installed this way, any changes you make to the `dbt-impala
`dbt-impala` contains [functional](https://github.com/cloudera/dbt-impala/tree/master/tests/functional/) tests. Functional tests require an actual Impala warehouse to test against.

- You can run functional tests "locally" by configuring a `test.env` file with appropriate `ENV` variables.
- To run the Kudu functional tests as part of the test suite when the underlying storage is Kudu, set the `ENV` variable `DISABLE_KUDU_TEST` to `false`. Kudu tests are disabled by default because this variable defaults to `true`.
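
The skip condition in `tests/functional/adapter/test_kudu.py` compares the variable against the exact string `"true"`. The following is a minimal sketch of that check, not code from the adapter: the helper name `kudu_tests_disabled` and its `environ` parameter are assumptions introduced here for illustration.

```python
import os


def kudu_tests_disabled(environ=None):
    """Return True when the Kudu tests would be skipped (hypothetical helper)."""
    env = os.environ if environ is None else environ
    # Same default and same exact-string comparison as the pytestmark in test_kudu.py.
    return env.get("DISABLE_KUDU_TEST", "true") == "true"


print(kudu_tests_disabled({}))                              # variable unset
print(kudu_tests_disabled({"DISABLE_KUDU_TEST": "false"}))  # explicitly enabled
print(kudu_tests_disabled({"DISABLE_KUDU_TEST": "True"}))   # capitalised value
```

Because the comparison is case-sensitive, only the lowercase value `true` (or leaving the variable unset) keeps the Kudu tests disabled under this logic.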

```
cp test.env.example test.env
18 changes: 18 additions & 0 deletions KUDU_INTEGRATION.md
@@ -0,0 +1,18 @@
# Kudu Integration using dbt-impala

The `dbt-impala` adapter allows you to use [dbt](https://www.getdbt.com/) along with [Apache Kudu](https://kudu.apache.org) and [Cloudera Data Platform](https://cloudera.com).


## Getting started

- [Install dbt](https://docs.getdbt.com/docs/installation)
- Read the [introduction](https://docs.getdbt.com/docs/introduction/) and [viewpoint](https://docs.getdbt.com/docs/about/viewpoint/)

### Requirements

- In a CDP public cloud deployment, Kudu is available as one of the many Cloudera Runtime services within the Real-time Data Mart template.
- To use Kudu, create a Data Hub cluster by selecting the Real-time Data Mart template in the Management Console.
- Follow this [article](https://blog.cloudera.com/integrating-cloudera-data-warehouse-with-kudu-clusters) on integrating the created Kudu service with Impala CDW.


For general instructions, follow the [README](README.md) guidelines.
67 changes: 34 additions & 33 deletions README.md
@@ -7,6 +7,7 @@ The `dbt-impala` adapter allows you to use [dbt](https://www.getdbt.com/) along

- [Install dbt](https://docs.getdbt.com/docs/installation)
- Read the [introduction](https://docs.getdbt.com/docs/introduction/) and [viewpoint](https://docs.getdbt.com/docs/about/viewpoint/)
- To use the `dbt-impala` adapter against [Apache Kudu](https://kudu.apache.org), follow the [Kudu Integration](KUDU_INTEGRATION.md) guidelines.

### Requirements

@@ -40,40 +41,40 @@ demo_project:
```

## Supported features
| Name | Supported | Iceberg |
|------|-----------|---------|
|Materialization: View|Yes| N/A |
|Materialization: Table|Yes| Yes |
|Materialization: Table with Partitions |Yes| Yes |
|Materialization: Incremental - Append|Yes| Yes |
|Materialization: Incremental - Append with Partitions |Yes| Yes |
|Materialization: Incremental - Insert+Overwrite |Yes| Yes |
|Materialization: Incremental - Insert+Overwrite with Partition |Yes| Yes |
|Materialization: Incremental - Merge|No| No |
|Materialization: Ephemeral|Yes| Yes |
|Seeds|Yes| Yes |
|Tests|Yes| Yes |
|Snapshots|No| No |
|Documentation|Yes| Yes |
|Authentication: LDAP|Yes| Yes |
|Authentication: Kerberos|Yes| No |
| Name | Supported | Iceberg | Kudu |
|------|-----------|---------|------|
|Materialization: View|Yes| N/A | N/A |
|Materialization: Table|Yes| Yes | Yes |
|Materialization: Table with Partitions |Yes| Yes | No |
|Materialization: Incremental - Append|Yes| Yes | Yes |
|Materialization: Incremental - Append with Partitions |Yes| Yes | No |
|Materialization: Incremental - Insert+Overwrite |Yes| Yes | Yes |
|Materialization: Incremental - Insert+Overwrite with Partition |Yes| Yes | No |
|Materialization: Incremental - Merge|No| No | No |
|Materialization: Ephemeral|Yes| Yes | No |
|Seeds|Yes| Yes | Yes |
|Tests|Yes| Yes | Yes |
|Snapshots|No| No | No |
|Documentation|Yes| Yes | Yes |
|Authentication: LDAP|Yes| Yes | Yes |
|Authentication: Kerberos|Yes| No | No |

### Tests Coverage

#### Functional Tests
| Name | Base | Iceberg |
|------|------|---------|
|Materialization: View|Yes| N/A |
|Materialization: Table|Yes| Yes |
|Materialization: Table with Partitions |Yes| Yes |
|Materialization: Incremental - Append|Yes| Yes |
|Materialization: Incremental - Append with Partitions |Yes| Yes |
|Materialization: Incremental - Insert+Overwrite |Yes| Yes |
|Materialization: Incremental - Insert+Overwrite with Partition |Yes| Yes |
|Materialization: Ephemeral|Yes| Yes |
|Seeds|Yes| Yes |
|Tests|Yes| Yes |
|Snapshots|No| No |
|Documentation| Yes | Yes |
|Authentication: LDAP|Yes| Yes |
|Authentication: Kerberos|No| No |
| Name | Base | Iceberg | Kudu |
|------|------|---------|------|
|Materialization: View|Yes| N/A | N/A |
|Materialization: Table|Yes| Yes | Yes |
|Materialization: Table with Partitions |Yes| Yes | No |
|Materialization: Incremental - Append|Yes| Yes | Yes |
|Materialization: Incremental - Append with Partitions |Yes| Yes | No |
|Materialization: Incremental - Insert+Overwrite |Yes| No | No |
|Materialization: Incremental - Insert+Overwrite with Partition |Yes| Yes | No |
|Materialization: Ephemeral|Yes| Yes | No |
|Seeds|Yes| Yes | Yes |
|Tests|Yes| Yes | Yes |
|Snapshots|No| No | No |
|Documentation| Yes | Yes | Yes |
|Authentication: LDAP|Yes| Yes | Yes |
|Authentication: Kerberos|No| No | No |
2 changes: 1 addition & 1 deletion dbt/adapters/impala/relation.py
@@ -37,7 +37,7 @@ class ImpalaIncludePolicy(Policy):
class ImpalaRelation(BaseRelation):
    quote_policy: ImpalaQuotePolicy = field(default_factory=lambda: ImpalaQuotePolicy())
    include_policy: ImpalaIncludePolicy = field(default_factory=lambda: ImpalaIncludePolicy())
    quote_character: str = None
    quote_character: str = "`"
    information: str = None

    def __post_init__(self):
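
The hunk above replaces the `None` default of `quote_character` with a backtick. The sketch below illustrates why that matters, assuming each quoted path component is wrapped in `quote_character` on both sides. `SketchRelation` is a hypothetical stand-in written for this example, not the adapter's `ImpalaRelation`, and it ignores the quote policy that the real relation class also consults.

```python
from dataclasses import dataclass
from typing import Optional


@dataclass(frozen=True)
class SketchRelation:
    """Hypothetical stand-in for a dbt relation; only the quoting step is modelled."""

    schema: str
    identifier: str
    quote_character: Optional[str] = "`"

    def quoted(self, name: str) -> str:
        # Wrap one path component in the quote character on both sides.
        return f"{self.quote_character}{name}{self.quote_character}"

    def render(self) -> str:
        # Join the quoted components into a dotted relation name.
        return ".".join(self.quoted(part) for part in (self.schema, self.identifier))


print(SketchRelation("my_schema", "base").render())
print(SketchRelation("my_schema", "base", quote_character=None).render())
```

With the backtick default the first call renders a backtick-quoted `schema.identifier` pair, while the `None` variant interpolates the literal text `None` around each name, which is not valid SQL.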
9 changes: 9 additions & 0 deletions dbt/include/impala/macros/adapters.sql
@@ -68,6 +68,14 @@
{%- endif %}
{%- endmacro -%}

{% macro ct_option_primary_key(label, required=false) %}
  {%- set primaryKey = config.get('primary_key', validator=validation[basestring]) -%}

  {%- if primaryKey is not none %}
    {{label}} {{primaryKey}}
  {%- endif %}
{%- endmacro -%}
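
The macro emits the label followed by the configured value verbatim, so the parentheses have to come from the model config (the tests in this PR use `primary_key="(id)"`). The following pure-Python stand-in is a simplified sketch of that behaviour and of the clause order, with the primary key placed before `stored as`. The function `create_table_as` and its reduced clause list are assumptions for illustration; the real create-table macro has more options.

```python
def ct_option_primary_key(config, label="PRIMARY KEY"):
    """Python stand-in for the Jinja macro: emit the clause only when primary_key is set."""
    primary_key = config.get("primary_key")
    return f"{label} {primary_key}" if primary_key is not None else ""


def create_table_as(name, config, select_sql):
    """Assemble a simplified CTAS statement (hypothetical helper, reduced clause list)."""
    clauses = [
        f"create table {name}",
        ct_option_primary_key(config),
        f"stored as {config['stored_as']}" if config.get("stored_as") else "",
        f"as {select_sql}",
    ]
    # Drop empty clauses so unset options leave no stray whitespace.
    return " ".join(clause for clause in clauses if clause)


kudu_config = {"stored_as": "kudu", "primary_key": "(id)"}
print(create_table_as("incremental_test_model", kudu_config, "select * from base"))
print(create_table_as("plain_model", {}, "select * from base"))
```

When `primary_key` is not configured the clause disappears entirely, which is why adding the macro call does not affect non-Kudu tables.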

{% macro ct_option_stored_as(label, required=false) %}
{%- set storedAs = config.get('stored_as', validator=validation[basestring]) -%}

@@ -180,6 +188,7 @@
{{ ct_option_row_format(label="row format") }}
{{ ct_option_with_serdeproperties(label="with serdeproperties") }}
{%- if table_type == 'iceberg' -%} STORED BY ICEBERG {%- endif -%}
{{ ct_option_primary_key(label="PRIMARY KEY") }}
{{ ct_option_stored_as(label="stored as") }}
{{ ct_option_location_clause(label="location") }}
{{ ct_option_cached_in(label="cached in") }}
1 change: 1 addition & 0 deletions test.env.example
@@ -17,3 +17,4 @@ IMPALA_SCHEMA=my_schema
IMPALA_USER=my_user
IMPALA_PASSWORD=my_password
IMPALA_HTTP_PATH=my_http_path
DISABLE_KUDU_TEST=true
182 changes: 182 additions & 0 deletions tests/functional/adapter/test_kudu.py
@@ -0,0 +1,182 @@
# Copyright 2024 Cloudera Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import pytest
import os
from dbt.tests.util import run_dbt, relation_from_name, check_relations_equal

from dbt.tests.adapter.basic.test_incremental import (
    BaseIncremental,
    BaseIncrementalNotSchemaChange,
)

from dbt.tests.adapter.basic.files import (
    schema_base_yml,
    model_incremental,
)

pytestmark = pytest.mark.skipif(
    os.getenv(key="DISABLE_KUDU_TEST", default="true") == "true",
    reason="Kudu tests will be run when DISABLE_KUDU_TEST is set to false in test.env",
)

incremental_kudu_sql = (
    """
{{
    config(
        materialized="incremental",
        stored_as="kudu",
        primary_key="(id)"
    )
}}
""".strip()
    + model_incremental
)


class TestIncrementalKudu(BaseIncremental):
    @pytest.fixture(scope="class")
    def project_config_update(self):
        return {"name": "incremental_test_model"}

    @pytest.fixture(scope="class")
    def models(self):
        return {"incremental_test_model.sql": incremental_kudu_sql, "schema.yml": schema_base_yml}

    def test_incremental(self, project):
        # seed command
        results = run_dbt(["seed"])
        assert len(results) == 2

        # base table rowcount
        relation = relation_from_name(project.adapter, "base")
        result = project.run_sql(f"select count(*) as num_rows from {relation}", fetch="one")
        assert result[0] == 10

        # added table rowcount
        relation = relation_from_name(project.adapter, "added")
        result = project.run_sql(f"select count(*) as num_rows from {relation}", fetch="one")
        assert result[0] == 20

        # run command
        # the "seed_name" var changes the seed identifier in the schema file
        results = run_dbt(["run", "--vars", "seed_name: base"])
        assert len(results) == 1

        # check relations equal
        check_relations_equal(project.adapter, ["base", "incremental_test_model"])

        # change seed_name var
        # the "seed_name" var changes the seed identifier in the schema file
        results = run_dbt(["run", "--vars", "seed_name: added"])
        assert len(results) == 1

        # check relations equal
        check_relations_equal(project.adapter, ["added", "incremental_test_model"])

        # run full-refresh and compare with base table again
        results = run_dbt(
            [
                "run",
                "--select",
                "incremental_test_model",
                "--full-refresh",
                "--vars",
                "seed_name: base",
            ]
        )
        assert len(results) == 1

        check_relations_equal(project.adapter, ["base", "incremental_test_model"])

        # get catalog from docs generate
        catalog = run_dbt(["docs", "generate"])
        assert len(catalog.nodes) == 3
        assert len(catalog.sources) == 1


insertoverwrite_sql = """
{{
    config(
        materialized="incremental",
        incremental_strategy="insert_overwrite",
        partition_by="id_partition",
        stored_as="kudu",
        primary_key="(id)"
    )
}}
select *, id as id_partition from {{ source('raw', 'seed') }}
{% if is_incremental() %}
    where id > (select max(id) from {{ this }})
{% endif %}
""".strip()


@pytest.mark.skip(reason="Need to fix partition by syntax for Kudu")
class TestInsertoverwriteKudu(TestIncrementalKudu):
    @pytest.fixture(scope="class")
    def models(self):
        return {"incremental_test_model.sql": insertoverwrite_sql, "schema.yml": schema_base_yml}


incremental_single_partitionby_sql = """
{{
    config(
        materialized="incremental",
        partition_by="id_partition",
        stored_as="kudu",
        primary_key="(id)"
    )
}}
select *, id as id_partition from {{ source('raw', 'seed') }}
{% if is_incremental() %}
    where id > (select max(id) from {{ this }})
{% endif %}
""".strip()


@pytest.mark.skip(reason="Need to fix partition by syntax for Kudu")
class TestIncrementalWithSinglePartitionKeyKudu(TestIncrementalKudu):
    @pytest.fixture(scope="class")
    def models(self):
        return {
            "incremental_test_model.sql": incremental_single_partitionby_sql,
            "schema.yml": schema_base_yml,
        }


incremental_multiple_partitionby_sql = """
{{
    config(
        materialized="incremental",
        partition_by=["id_partition1", "id_partition2"],
        stored_as="kudu",
        primary_key="(id)"
    )
}}
select *, id as id_partition1, id as id_partition2 from {{ source('raw', 'seed') }}
{% if is_incremental() %}
    where id > (select max(id) from {{ this }})
{% endif %}
""".strip()


@pytest.mark.skip(reason="Need to fix partition by syntax for Kudu")
class TestIncrementalWithMultiplePartitionKeyKudu(TestIncrementalKudu):
    @pytest.fixture(scope="class")
    def models(self):
        return {
            "incremental_test_model.sql": incremental_multiple_partitionby_sql,
            "schema.yml": schema_base_yml,
        }
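
The Kudu model definitions in this test file repeat the same `config(...)` block with one or two options changed. As a sketch only, a small helper could generate those headers. `kudu_config_header` is a hypothetical name that is not part of this PR, and the sketch assumes option values are plain strings or lists of strings, which `json.dumps` renders in a form Jinja accepts as literals.

```python
import json


def kudu_config_header(primary_key="(id)", **options):
    """Build a dbt config(...) header for a Kudu-backed incremental model (hypothetical helper)."""
    settings = {"materialized": "incremental", **options}
    # Every Kudu model in the tests sets these two options.
    settings["stored_as"] = "kudu"
    settings["primary_key"] = primary_key
    # json.dumps double-quotes strings and renders lists as [...].
    args = ",\n        ".join(f"{key}={json.dumps(value)}" for key, value in settings.items())
    return "{{\n    config(\n        " + args + "\n    )\n}}"


header = kudu_config_header(partition_by=["id_partition1", "id_partition2"])
print(header)
```

Calling the helper without extra options yields the header used by the basic incremental test; passing `partition_by` or `incremental_strategy` yields the headers of the skipped variants.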