diff --git a/README.md b/README.md index a5651be..76acf71 100644 --- a/README.md +++ b/README.md @@ -45,10 +45,10 @@ demo_project: |------|-----------|---------|------| |Materialization: View|Yes| N/A | N/A | |Materialization: Table|Yes| Yes | Yes | -|Materialization: Table with Partitions |Yes| Yes | No | +|Materialization: Table with Partitions |Yes| Yes | Yes | |Materialization: Incremental - Append|Yes| Yes | Yes | |Materialization: Incremental - Append with Partitions |Yes| Yes | No | -|Materialization: Incremental - Insert+Overwrite |Yes| Yes | Yes | +|Materialization: Incremental - Insert+Overwrite |Yes| Yes | No | |Materialization: Incremental - Insert+Overwrite with Partition |Yes| Yes | No | |Materialization: Incremental - Merge|No| No | No | |Materialization: Ephemeral|Yes| Yes | No | @@ -66,7 +66,7 @@ demo_project: |------|------|---------|------| |Materialization: View|Yes| N/A | N/A | |Materialization: Table|Yes| Yes | Yes | -|Materialization: Table with Partitions |Yes| Yes | No | +|Materialization: Table with Partitions |Yes| Yes | Yes | |Materialization: Incremental - Append|Yes| Yes | Yes | |Materialization: Incremental - Append with Partitions |Yes| Yes | No | |Materialization: Incremental - Insert+Overwrite |Yes| No | No | diff --git a/dbt/include/impala/macros/adapters.sql b/dbt/include/impala/macros/adapters.sql index 6e0b36b..4074087 100644 --- a/dbt/include/impala/macros/adapters.sql +++ b/dbt/include/impala/macros/adapters.sql @@ -29,6 +29,15 @@ {%- endif %} {%- endmacro -%} +{% macro ct_option_kudu_partition_cols(label, required=false) %} + {%- set cols = config.get('partition_by', validator=validation[basestring]) -%} + {%- if cols is not none %} + {%- if cols is string -%} + {{ label }} {{ cols }} + {%- endif -%} + {%- endif %} +{%- endmacro -%} + {% macro ct_option_sort_cols(label, required=false) %} {%- set cols = config.get('sort_by', validator=validation.any[list, basestring]) -%} {%- if cols is not none %} @@ -173,22 +182,27 @@ {%- set sql_header = config.get('sql_header', none) -%} {%- set is_external = config.get('external') -%} {%- set table_type = config.get('table_type') -%} + {%- set stored_as = config.get('stored_as', none) -%} {{ sql_header if sql_header is not none }} create {% if is_external == true -%}external{%- endif %} table {{ relation.include(schema=true) }} + {{ ct_option_primary_key(label="PRIMARY KEY") }} {% if table_type == 'iceberg' -%} {{ ct_option_partition_cols(label="partitioned by spec") }} {% else %} - {{ ct_option_partition_cols(label="partitioned by") }} + {% if stored_as == 'kudu' -%} + {{ ct_option_kudu_partition_cols(label="partition by") }} + {% else %} + {{ ct_option_partition_cols(label="partitioned by") }} + {%- endif %} {%- endif %} {{ ct_option_sort_cols(label="sort by") }} {{ ct_option_comment_relation(label="comment") }} {{ ct_option_row_format(label="row format") }} {{ ct_option_with_serdeproperties(label="with serdeproperties") }} {%- if table_type == 'iceberg' -%} STORED BY ICEBERG {%- endif -%} - {{ ct_option_primary_key(label="PRIMARY KEY") }} {{ ct_option_stored_as(label="stored as") }} {{ ct_option_location_clause(label="location") }} {{ ct_option_cached_in(label="cached in") }} diff --git a/tests/functional/adapter/test_kudu.py b/tests/functional/adapter/test_kudu.py index a91da52..73d7cdd 100644 --- a/tests/functional/adapter/test_kudu.py +++ b/tests/functional/adapter/test_kudu.py @@ -26,6 +26,8 @@ model_incremental, ) +from dbt.tests.adapter.basic.test_table_materialization import BaseTableMaterialization + pytestmark = pytest.mark.skipif( os.getenv(key="DISABLE_KUDU_TEST", default="true") == "true", reason="Kudu tests will be run when DISABLE_KUDU_TEST is set to false in test.env", @@ -106,77 +108,39 @@ def test_incremental(self, project): assert len(catalog.sources) == 1 -insertoverwrite_sql = """ +materialization_hash_partitionby_sql = """ {{ config( - materialized="incremental", - incremental_strategy="insert_overwrite", - partition_by="id_partition", + materialized="table", + partition_by="HASH (id) PARTITIONS 2", stored_as="kudu", primary_key="(id)" ) }} - select *, id as id_partition from {{ source('raw', 'seed') }} - {% if is_incremental() %} - where id > (select max(id) from {{ this }}) - {% endif %} +select * from {{ this.schema }}.seed """.strip() -@pytest.mark.skip(reason="Need to fix partition by syntax for Kudu") -class TestInsertoverwriteKudu(TestIncrementalKudu): +class TestMaterializationWithHashPartitionKudu(BaseTableMaterialization): @pytest.fixture(scope="class") def models(self): - return {"incremental_test_model.sql": insertoverwrite_sql, "schema.yml": schema_base_yml} + return {"materialized.sql": materialization_hash_partitionby_sql} -incremental_single_partitionby_sql = """ +materialization_range_partitionby_sql = """ {{ config( - materialized="incremental", - partition_by="id_partition", + materialized="table", + partition_by="Range (id) (PARTITION VALUES < 5, PARTITION 5 <= VALUES)", stored_as="kudu", primary_key="(id)" ) }} - select *, id as id_partition from {{ source('raw', 'seed') }} - {% if is_incremental() %} - where id > (select max(id) from {{ this }}) - {% endif %} -""".strip() - - -@pytest.mark.skip(reason="Need to fix partition by syntax for Kudu") -class TestIncrementalWithSinglePartitionKeyKudu(TestIncrementalKudu): - @pytest.fixture(scope="class") - def models(self): - return { - "incremental_test_model.sql": incremental_single_partitionby_sql, - "schema.yml": schema_base_yml, - } - - -incremental_multiple_partitionby_sql = """ - {{ - config( - materialized="incremental", - partition_by=["id_partition1", "id_partition2"], - stored_as="kudu", - primary_key="(id)" - ) - }} - select *, id as id_partition1, id as id_partition2 from {{ source('raw', 'seed') }} - {% if is_incremental() %} - where id > (select max(id) from {{ this }}) - {% endif %} +select * from {{ this.schema }}.seed """.strip() -@pytest.mark.skip(reason="Need to fix partition by syntax for Kudu") -class TestIncrementalWithMultiplePartitionKeyKudu(TestIncrementalKudu): +class TestMaterializationWithRangePartitionKudu(BaseTableMaterialization): @pytest.fixture(scope="class") def models(self): - return { - "incremental_test_model.sql": incremental_multiple_partitionby_sql, - "schema.yml": schema_base_yml, - } + return {"materialized.sql": materialization_range_partitionby_sql}