Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

DBT-803 Fix-1.8.0-impala Add partitioning support for Kudu tables using dbt-impala adapter #209

Open
wants to merge 6 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 3 additions & 3 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -45,10 +45,10 @@ demo_project:
|------|-----------|---------|------|
|Materialization: View|Yes| N/A | N/A |
|Materialization: Table|Yes| Yes | Yes |
|Materialization: Table with Partitions |Yes| Yes | No |
|Materialization: Table with Partitions |Yes| Yes | Yes |
|Materialization: Incremental - Append|Yes| Yes | Yes |
|Materialization: Incremental - Append with Partitions |Yes| Yes | No |
|Materialization: Incremental - Insert+Overwrite |Yes| Yes | Yes |
|Materialization: Incremental - Insert+Overwrite |Yes| Yes | No |
|Materialization: Incremental - Insert+Overwrite with Partition |Yes| Yes | No |
|Materialization: Incremental - Merge|No| No | No |
|Materialization: Ephemeral|Yes| Yes | No |
Expand All @@ -66,7 +66,7 @@ demo_project:
|------|------|---------|------|
|Materialization: View|Yes| N/A | N/A |
|Materialization: Table|Yes| Yes | Yes |
|Materialization: Table with Partitions |Yes| Yes | No |
|Materialization: Table with Partitions |Yes| Yes | Yes |
|Materialization: Incremental - Append|Yes| Yes | Yes |
|Materialization: Incremental - Append with Partitions |Yes| Yes | No |
|Materialization: Incremental - Insert+Overwrite |Yes| No | No |
Expand Down
18 changes: 16 additions & 2 deletions dbt/include/impala/macros/adapters.sql
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,15 @@
{%- endif %}
{%- endmacro -%}

{% macro ct_option_kudu_partition_cols(label, required=false) %}
{%- set cols = config.get('partition_by', validator=validation[basestring]) -%}
{%- if cols is not none %}
{%- if cols is string -%}
{{ label }} {{ cols }}
{%- endif -%}
{%- endif %}
{%- endmacro -%}

{% macro ct_option_sort_cols(label, required=false) %}
{%- set cols = config.get('sort_by', validator=validation.any[list, basestring]) -%}
{%- if cols is not none %}
Expand Down Expand Up @@ -173,22 +182,27 @@
{%- set sql_header = config.get('sql_header', none) -%}
{%- set is_external = config.get('external') -%}
{%- set table_type = config.get('table_type') -%}
{%- set stored_as = config.get('stored_as', none) -%}

{{ sql_header if sql_header is not none }}

create {% if is_external == true -%}external{%- endif %} table
{{ relation.include(schema=true) }}
{{ ct_option_primary_key(label="PRIMARY KEY") }}
{% if table_type == 'iceberg' -%}
{{ ct_option_partition_cols(label="partitioned by spec") }}
{% else %}
{{ ct_option_partition_cols(label="partitioned by") }}
{% if stored_as == 'kudu' -%}
{{ ct_option_kudu_partition_cols(label="partition by") }}
{% else %}
{{ ct_option_partition_cols(label="partitioned by") }}
{%- endif %}
{%- endif %}
{{ ct_option_sort_cols(label="sort by") }}
{{ ct_option_comment_relation(label="comment") }}
{{ ct_option_row_format(label="row format") }}
{{ ct_option_with_serdeproperties(label="with serdeproperties") }}
{%- if table_type == 'iceberg' -%} STORED BY ICEBERG {%- endif -%}
{{ ct_option_primary_key(label="PRIMARY KEY") }}
{{ ct_option_stored_as(label="stored as") }}
{{ ct_option_location_clause(label="location") }}
{{ ct_option_cached_in(label="cached in") }}
Expand Down
64 changes: 14 additions & 50 deletions tests/functional/adapter/test_kudu.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,8 @@
model_incremental,
)

from dbt.tests.adapter.basic.test_table_materialization import BaseTableMaterialization

pytestmark = pytest.mark.skipif(
os.getenv(key="DISABLE_KUDU_TEST", default="true") == "true",
reason="Kudu tests will be run when DISABLE_KUDU_TEST is set to false in test.env",
Expand Down Expand Up @@ -106,77 +108,39 @@ def test_incremental(self, project):
assert len(catalog.sources) == 1


insertoverwrite_sql = """
materialization_hash_partitionby_sql = """
{{
config(
materialized="incremental",
incremental_strategy="insert_overwrite",
partition_by="id_partition",
materialized="table",
partition_by="HASH (id) PARTITIONS 2",
stored_as="kudu",
primary_key="(id)"
)
}}
select *, id as id_partition from {{ source('raw', 'seed') }}
{% if is_incremental() %}
where id > (select max(id) from {{ this }})
{% endif %}
select * from {{ this.schema }}.seed
""".strip()


@pytest.mark.skip(reason="Need to fix partition by syntax for Kudu")
class TestInsertoverwriteKudu(TestIncrementalKudu):
class TestMaterializationWithHashPartitionKudu(BaseTableMaterialization):
@pytest.fixture(scope="class")
def models(self):
return {"incremental_test_model.sql": insertoverwrite_sql, "schema.yml": schema_base_yml}
return {"materialized.sql": materialization_hash_partitionby_sql}


incremental_single_partitionby_sql = """
materialization_range_partitionby_sql = """
{{
config(
materialized="incremental",
partition_by="id_partition",
materialized="table",
partition_by="Range (id) (PARTITION VALUES < 5, PARTITION 5 <= VALUES)",
stored_as="kudu",
primary_key="(id)"
)
}}
select *, id as id_partition from {{ source('raw', 'seed') }}
{% if is_incremental() %}
where id > (select max(id) from {{ this }})
{% endif %}
""".strip()


@pytest.mark.skip(reason="Need to fix partition by syntax for Kudu")
class TestIncrementalWithSinglePartitionKeyKudu(TestIncrementalKudu):
@pytest.fixture(scope="class")
def models(self):
return {
"incremental_test_model.sql": incremental_single_partitionby_sql,
"schema.yml": schema_base_yml,
}


incremental_multiple_partitionby_sql = """
{{
config(
materialized="incremental",
partition_by=["id_partition1", "id_partition2"],
stored_as="kudu",
primary_key="(id)"
)
}}
select *, id as id_partition1, id as id_partition2 from {{ source('raw', 'seed') }}
{% if is_incremental() %}
where id > (select max(id) from {{ this }})
{% endif %}
select * from {{ this.schema }}.seed
""".strip()


@pytest.mark.skip(reason="Need to fix partition by syntax for Kudu")
class TestIncrementalWithMultiplePartitionKeyKudu(TestIncrementalKudu):
class TestMaterializationWithRangePartitionKudu(BaseTableMaterialization):
@pytest.fixture(scope="class")
def models(self):
return {
"incremental_test_model.sql": incremental_multiple_partitionby_sql,
"schema.yml": schema_base_yml,
}
return {"materialized.sql": materialization_range_partitionby_sql}