diff --git a/.changes/unreleased/Fixes-20220819-141350.yaml b/.changes/unreleased/Fixes-20220819-141350.yaml
new file mode 100644
index 000000000..594c5216a
--- /dev/null
+++ b/.changes/unreleased/Fixes-20220819-141350.yaml
@@ -0,0 +1,8 @@
+kind: Fixes
+body: Support for iceberg v2 tables. Added ability to use multiple join conditions
+  to allow for multiple columns to make a row distinct.
+time: 2022-08-19T14:13:50.3145273-04:00
+custom:
+  Author: dparent1
+  Issue: "294"
+  PR: "432"
diff --git a/dbt/adapters/spark/impl.py b/dbt/adapters/spark/impl.py
index fea5bbacf..ee71d60f1 100644
--- a/dbt/adapters/spark/impl.py
+++ b/dbt/adapters/spark/impl.py
@@ -32,6 +32,8 @@
 GET_COLUMNS_IN_RELATION_RAW_MACRO_NAME = "get_columns_in_relation_raw"
 LIST_SCHEMAS_MACRO_NAME = "list_schemas"
 LIST_RELATIONS_MACRO_NAME = "list_relations_without_caching"
+LIST_RELATIONS_SHOW_TABLES_MACRO_NAME = "list_relations_show_tables_without_caching"
+DESCRIBE_TABLE_EXTENDED_MACRO_NAME = "describe_table_extended_without_caching"
 DROP_RELATION_MACRO_NAME = "drop_relation"
 FETCH_TBL_PROPERTIES_MACRO_NAME = "fetch_tbl_properties"
 
@@ -133,38 +135,83 @@ def add_schema_to_cache(self, schema) -> str:
         # so jinja doesn't render things
         return ""
 
+    def parse_information(self, table_name: str) -> str:
+        information = ""
+        try:
+            table_results = self.execute_macro(
+                DESCRIBE_TABLE_EXTENDED_MACRO_NAME, kwargs={"table_name": table_name}
+            )
+        except dbt.exceptions.DbtRuntimeError as e:
+            logger.debug(f"Error while retrieving information about {table_name}: {e.msg}")
+            return ""
+        for info_row in table_results:
+            info_type, info_value, _ = info_row
+            if not info_type.startswith("#"):
+                information += f"{info_type}: {info_value}\n"
+        return information
+
     def list_relations_without_caching(
         self, schema_relation: SparkRelation
     ) -> List[SparkRelation]:
-        kwargs = {"schema_relation": schema_relation}
+        try_show_tables = False
+        expected_result_rows = 4
         try:
-            results = self.execute_macro(LIST_RELATIONS_MACRO_NAME, kwargs=kwargs)
+            results = self.execute_macro(
+                LIST_RELATIONS_MACRO_NAME, kwargs={"schema_relation": schema_relation}
+            )
         except dbt.exceptions.DbtRuntimeError as e:
             errmsg = getattr(e, "msg", "")
             if f"Database '{schema_relation}' not found" in errmsg:
                 return []
+            elif "SHOW TABLE EXTENDED is not supported for v2 tables" in errmsg:
+                # this happens with spark-iceberg with v2 iceberg tables
+                # https://issues.apache.org/jira/browse/SPARK-33393
+                try_show_tables = True
             else:
                 description = "Error while retrieving information about"
                 logger.debug(f"{description} {schema_relation}: {e.msg}")
                 return []
 
+        if try_show_tables:
+            expected_result_rows = 3
+            try:
+                results = self.execute_macro(
+                    LIST_RELATIONS_SHOW_TABLES_MACRO_NAME,
+                    kwargs={"schema_relation": schema_relation},
+                )
+            except dbt.exceptions.DbtRuntimeError as e:
+                description = "Error while retrieving information about"
+                logger.debug(f"{description} {schema_relation}: {e.msg}")
+                return []
+
         relations = []
         for row in results:
-            if len(row) != 4:
+            if len(row) != expected_result_rows:
+                if try_show_tables:
+                    description = 'Invalid value from "show tables ...", '
+                else:
+                    description = 'Invalid value from "show table extended ...", '
                 raise dbt.exceptions.DbtRuntimeError(
-                    f'Invalid value from "show table extended ...", '
-                    f"got {len(row)} values, expected 4"
+                    f"{description} got {len(row)} values, expected {expected_result_rows}"
                 )
-            _schema, name, _, information = row
-            rel_type = RelationType.View if "Type: VIEW" in information else RelationType.Table
+
+            if try_show_tables:
+                _schema, name, _ = row
+                information = self.parse_information(f"{_schema}.{name}")
+            else:
+                _schema, name, _, information = row
+
             is_delta = "Provider: delta" in information
             is_hudi = "Provider: hudi" in information
+            is_iceberg = "Provider: iceberg" in information
+            rel_type = RelationType.View if "Type: VIEW" in information else RelationType.Table
+
             relation: BaseRelation = self.Relation.create(
                 schema=_schema,
                 identifier=name,
                 type=rel_type,
                 information=information,
                 is_delta=is_delta,
+                is_iceberg=is_iceberg,
                 is_hudi=is_hudi,
             )
             relations.append(relation)
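For orientation, the fallback above replaces the single metadata query with two statements when Spark rejects the extended listing for Iceberg v2 tables (SPARK-33393). A rough sketch of the SQL the two new macros issue; `my_schema` and `my_table` are placeholder names, not anything taken from the patch:

    -- listing without extended metadata: 3 columns per row instead of the 4
    -- that "show table extended" returns
    show tables in my_schema like '*'

    -- run per relation to recover the "Provider: ..." / "Type: ..." details
    -- that parse_information() turns back into an information string
    describe extended my_schema.my_table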
diff --git a/dbt/adapters/spark/relation.py b/dbt/adapters/spark/relation.py
index 0b0c58bc1..b06aa388a 100644
--- a/dbt/adapters/spark/relation.py
+++ b/dbt/adapters/spark/relation.py
@@ -1,9 +1,14 @@
-from typing import Optional
-
+from typing import Optional, TypeVar
 from dataclasses import dataclass, field
 
 from dbt.adapters.base.relation import BaseRelation, Policy
+
 from dbt.exceptions import DbtRuntimeError
+from dbt.events import AdapterLogger
+
+logger = AdapterLogger("Spark")
+
+Self = TypeVar("Self", bound="BaseRelation")
 
 
 @dataclass
@@ -27,6 +32,7 @@ class SparkRelation(BaseRelation):
     quote_character: str = "`"
     is_delta: Optional[bool] = None
     is_hudi: Optional[bool] = None
+    is_iceberg: Optional[bool] = None
     information: Optional[str] = None
 
     def __post_init__(self):
diff --git a/dbt/include/spark/macros/adapters.sql b/dbt/include/spark/macros/adapters.sql
index 21350ea3f..13deb09bf 100644
--- a/dbt/include/spark/macros/adapters.sql
+++ b/dbt/include/spark/macros/adapters.sql
@@ -1,3 +1,14 @@
+{% macro dbt_spark_tblproperties_clause() -%}
+  {%- set tblproperties = config.get('tblproperties') -%}
+  {%- if tblproperties is not none %}
+    tblproperties (
+      {%- for prop in tblproperties -%}
+      '{{ prop }}' = '{{ tblproperties[prop] }}' {% if not loop.last %}, {% endif %}
+      {%- endfor %}
+    )
+  {%- endif %}
+{%- endmacro -%}
+
 {% macro file_format_clause() %}
   {{ return(adapter.dispatch('file_format_clause', 'dbt')()) }}
 {%- endmacro -%}
@@ -133,7 +144,7 @@
   {%- if temporary -%}
     {{ create_temporary_view(relation, compiled_code) }}
   {%- else -%}
-    {% if config.get('file_format', validator=validation.any[basestring]) == 'delta' %}
+    {% if config.get('file_format', validator=validation.any[basestring]) in ['delta', 'iceberg'] %}
      create or replace table {{ relation }}
     {% else %}
      create table {{ relation }}
@@ -243,7 +254,10 @@
 {% endmacro %}
 
 {% macro spark__get_columns_in_relation(relation) -%}
-  {{ return(adapter.get_columns_in_relation(relation)) }}
+  {% call statement('get_columns_in_relation', fetch_result=True) %}
+      describe extended {{ relation.include(schema=(schema is not none)) }}
+  {% endcall %}
+  {% do return(load_result('get_columns_in_relation').table) %}
 {% endmacro %}
 
 {% macro spark__list_relations_without_caching(relation) %}
@@ -254,6 +268,27 @@
   {% do return(load_result('list_relations_without_caching').table) %}
 {% endmacro %}
 
+{% macro list_relations_show_tables_without_caching(schema_relation) %}
+  {#-- Spark with iceberg tables don't work with show table extended for #}
+  {#-- V2 iceberg tables #}
+  {#-- https://issues.apache.org/jira/browse/SPARK-33393 #}
+  {% call statement('list_relations_without_caching_show_tables', fetch_result=True) -%}
+    show tables in {{ schema_relation }} like '*'
+  {% endcall %}
+
+  {% do return(load_result('list_relations_without_caching_show_tables').table) %}
+{% endmacro %}
+
+{% macro describe_table_extended_without_caching(table_name) %}
+  {#-- Spark with iceberg tables don't work with show table extended for #}
+  {#-- V2 iceberg tables #}
+  {#-- https://issues.apache.org/jira/browse/SPARK-33393 #}
+  {% call statement('describe_table_extended_without_caching', fetch_result=True) -%}
+    describe extended {{ table_name }}
+  {% endcall %}
+  {% do return(load_result('describe_table_extended_without_caching').table) %}
+{% endmacro %}
+
 {% macro spark__list_schemas(database) -%}
   {% call statement('list_schemas', fetch_result=True, auto_begin=False) %}
     show databases
@@ -293,14 +328,20 @@
 {% endmacro %}
 
 {% macro spark__alter_column_comment(relation, column_dict) %}
-  {% if config.get('file_format', validator=validation.any[basestring]) in ['delta', 'hudi'] %}
+  {% if config.get('file_format', validator=validation.any[basestring]) in ['delta', 'hudi', 'iceberg'] %}
     {% for column_name in column_dict %}
       {% set comment = column_dict[column_name]['description'] %}
       {% set escaped_comment = comment | replace('\'', '\\\'') %}
       {% set comment_query %}
-        alter table {{ relation }} change column
-            {{ adapter.quote(column_name) if column_dict[column_name]['quote'] else column_name }}
-            comment '{{ escaped_comment }}';
+        {% if relation.is_iceberg %}
+          alter table {{ relation }} alter column
+              {{ adapter.quote(column_name) if column_dict[column_name]['quote'] else column_name }}
+              comment '{{ escaped_comment }}';
+        {% else %}
+          alter table {{ relation }} change column
+              {{ adapter.quote(column_name) if column_dict[column_name]['quote'] else column_name }}
+              comment '{{ escaped_comment }}';
+        {% endif %}
       {% endset %}
       {% do run_query(comment_query) %}
     {% endfor %}
@@ -328,7 +369,13 @@
 {% macro spark__alter_relation_add_remove_columns(relation, add_columns, remove_columns) %}
 
   {% if remove_columns %}
-    {% set platform_name = 'Delta Lake' if relation.is_delta else 'Apache Spark' %}
+    {% if relation.is_delta %}
+      {% set platform_name = 'Delta Lake' %}
+    {% elif relation.is_iceberg %}
+      {% set platform_name = 'Iceberg' %}
+    {% else %}
+      {% set platform_name = 'Apache Spark' %}
+    {% endif %}
     {{ exceptions.raise_compiler_error(platform_name + ' does not support dropping columns from tables') }}
   {% endif %}
 
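To see how the adapters.sql changes surface to a project, here is a hedged sketch of a table model that opts into the Iceberg file format; the model name, column, and the 'format-version' property are illustrative assumptions, not part of the patch. With this config the materialization takes the new create-or-replace branch above, and the new dbt_spark_tblproperties_clause macro reads a tblproperties dict from the config (whether that clause is emitted depends on how the create-table macro wires it in, which is not shown in this hunk):

    -- models/iceberg_example.sql (hypothetical model)
    {{ config(
        materialized = 'table',
        file_format = 'iceberg',
        tblproperties = {'format-version': '2'}
    ) }}

    select 1 as id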
diff --git a/dbt/include/spark/macros/materializations/incremental/incremental.sql b/dbt/include/spark/macros/materializations/incremental/incremental.sql
index cc46d4c14..d2c1f5e43 100644
--- a/dbt/include/spark/macros/materializations/incremental/incremental.sql
+++ b/dbt/include/spark/macros/materializations/incremental/incremental.sql
@@ -55,7 +55,7 @@
     {%- endcall -%}
     {%- do process_schema_changes(on_schema_change, tmp_relation, existing_relation) -%}
     {%- call statement('main') -%}
-      {{ dbt_spark_get_incremental_sql(strategy, tmp_relation, target_relation, unique_key, incremental_predicates) }}
+      {{ dbt_spark_get_incremental_sql(strategy, tmp_relation, target_relation, existing_relation, unique_key, incremental_predicates) }}
     {%- endcall -%}
     {%- if language == 'python' -%}
       {#--
diff --git a/dbt/include/spark/macros/materializations/incremental/strategies.sql b/dbt/include/spark/macros/materializations/incremental/strategies.sql
index facfaadff..697055386 100644
--- a/dbt/include/spark/macros/materializations/incremental/strategies.sql
+++ b/dbt/include/spark/macros/materializations/incremental/strategies.sql
@@ -1,9 +1,15 @@
-{% macro get_insert_overwrite_sql(source_relation, target_relation) %}
+{% macro get_insert_overwrite_sql(source_relation, target_relation, existing_relation) %}
 
     {%- set dest_columns = adapter.get_columns_in_relation(target_relation) -%}
    {%- set dest_cols_csv = dest_columns | map(attribute='quoted') | join(', ') -%}
-    insert overwrite table {{ target_relation }}
-    {{ partition_cols(label="partition") }}
+    {% if existing_relation.is_iceberg %}
+      {# removed table from statement for iceberg #}
+      insert overwrite {{ target_relation }}
+      {# removed partition_cols for iceberg as well #}
+    {% else %}
+      insert overwrite table {{ target_relation }}
+      {{ partition_cols(label="partition") }}
+    {% endif %}
     select {{dest_cols_csv}} from {{ source_relation }}
 
 {% endmacro %}
@@ -62,15 +68,15 @@
 {% endmacro %}
 
 
-{% macro dbt_spark_get_incremental_sql(strategy, source, target, unique_key, incremental_predicates) %}
+{% macro dbt_spark_get_incremental_sql(strategy, source, target, existing, unique_key, incremental_predicates) %}
   {%- if strategy == 'append' -%}
     {#-- insert new records into existing table, without updating or overwriting #}
     {{ get_insert_into_sql(source, target) }}
   {%- elif strategy == 'insert_overwrite' -%}
     {#-- insert statements don't like CTEs, so support them via a temp view #}
-    {{ get_insert_overwrite_sql(source, target) }}
+    {{ get_insert_overwrite_sql(source, target, existing) }}
   {%- elif strategy == 'merge' -%}
-  {#-- merge all columns with databricks delta - schema changes are handled for us #}
+  {#-- merge all columns with databricks delta or iceberg - schema changes are handled for us #}
     {{ get_merge_sql(target, source, unique_key, dest_columns=none, incremental_predicates=incremental_predicates) }}
   {%- else -%}
     {% set no_sql_for_strategy_msg -%}
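As a concrete illustration of the strategy changes, a hedged sketch of an incremental model on Iceberg (the model name, columns, source table, and unique_key are placeholders). With merge, schema changes are still handed to the engine as the comment above notes; with insert_overwrite against an existing Iceberg relation, the generated statement now drops the table keyword and the partition clause, i.e. it renders as "insert overwrite <target> select ...".

    -- models/iceberg_incremental_example.sql (hypothetical model)
    {{ config(
        materialized = 'incremental',
        file_format = 'iceberg',
        incremental_strategy = 'merge',
        unique_key = 'id'
    ) }}

    select id, payload, updated_at
    from my_source_table    -- placeholder source
    {% if is_incremental() %}
    where updated_at > (select max(updated_at) from {{ this }})
    {% endif %}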
diff --git a/dbt/include/spark/macros/materializations/incremental/validate.sql b/dbt/include/spark/macros/materializations/incremental/validate.sql
index ffd56f106..71ec01821 100644
--- a/dbt/include/spark/macros/materializations/incremental/validate.sql
+++ b/dbt/include/spark/macros/materializations/incremental/validate.sql
@@ -1,7 +1,7 @@
 {% macro dbt_spark_validate_get_file_format(raw_file_format) %}
   {#-- Validate the file format #}
 
-  {% set accepted_formats = ['text', 'csv', 'json', 'jdbc', 'parquet', 'orc', 'hive', 'delta', 'libsvm', 'hudi'] %}
+  {% set accepted_formats = ['text', 'csv', 'json', 'jdbc', 'parquet', 'orc', 'hive', 'delta', 'iceberg', 'libsvm', 'hudi'] %}
 
   {% set invalid_file_format_msg -%}
     Invalid file format provided: {{ raw_file_format }}
@@ -26,12 +26,12 @@
 
   {% set invalid_merge_msg -%}
     Invalid incremental strategy provided: {{ raw_strategy }}
-    You can only choose this strategy when file_format is set to 'delta' or 'hudi'
+    You can only choose this strategy when file_format is set to 'delta' or 'iceberg' or 'hudi'
   {%- endset %}
 
   {% set invalid_insert_overwrite_delta_msg -%}
     Invalid incremental strategy provided: {{ raw_strategy }}
-    You cannot use this strategy when file_format is set to 'delta'
+    You cannot use this strategy when file_format is set to 'delta' or 'iceberg'
     Use the 'append' or 'merge' strategy instead
   {%- endset %}
 
@@ -44,7 +44,7 @@
   {% if raw_strategy not in ['append', 'merge', 'insert_overwrite'] %}
     {% do exceptions.raise_compiler_error(invalid_strategy_msg) %}
   {%-else %}
-    {% if raw_strategy == 'merge' and file_format not in ['delta', 'hudi'] %}
+    {% if raw_strategy == 'merge' and file_format not in ['delta', 'iceberg', 'hudi'] %}
       {% do exceptions.raise_compiler_error(invalid_merge_msg) %}
     {% endif %}
     {% if raw_strategy == 'insert_overwrite' and file_format == 'delta' %}
diff --git a/dbt/include/spark/macros/materializations/snapshot.sql b/dbt/include/spark/macros/materializations/snapshot.sql
index 6cf2358fe..a397f84e5 100644
--- a/dbt/include/spark/macros/materializations/snapshot.sql
+++ b/dbt/include/spark/macros/materializations/snapshot.sql
@@ -15,7 +15,12 @@
 {% macro spark__snapshot_merge_sql(target, source, insert_cols) -%}
 
     merge into {{ target }} as DBT_INTERNAL_DEST
-    using {{ source }} as DBT_INTERNAL_SOURCE
+    {% if target.is_iceberg %}
+      {# create view only supports a name (no catalog, or schema) #}
+      using {{ source.identifier }} as DBT_INTERNAL_SOURCE
+    {% else %}
+      using {{ source }} as DBT_INTERNAL_SOURCE
+    {% endif %}
     on DBT_INTERNAL_SOURCE.dbt_scd_id = DBT_INTERNAL_DEST.dbt_scd_id
     when matched
      and DBT_INTERNAL_DEST.dbt_valid_to is null
@@ -33,10 +38,18 @@
 {% macro spark_build_snapshot_staging_table(strategy, sql, target_relation) %}
     {% set tmp_identifier = target_relation.identifier ~ '__dbt_tmp' %}
 
-    {%- set tmp_relation = api.Relation.create(identifier=tmp_identifier,
-                                               schema=target_relation.schema,
-                                               database=none,
-                                               type='view') -%}
+    {% if target_relation.is_iceberg %}
+      {# iceberg catalog does not support create view, but regular spark does. We removed the catalog and schema #}
+      {%- set tmp_relation = api.Relation.create(identifier=tmp_identifier,
+                                                 schema=none,
+                                                 database=none,
+                                                 type='view') -%}
+    {% else %}
+      {%- set tmp_relation = api.Relation.create(identifier=tmp_identifier,
+                                                 schema=target_relation.schema,
+                                                 database=none,
+                                                 type='view') -%}
+    {% endif %}
 
     {% set select = snapshot_staging_table(strategy, sql, target_relation) %}
 
@@ -83,25 +96,25 @@
                                               identifier=target_table,
                                               type='table') -%}
 
-  {%- if file_format not in ['delta', 'hudi'] -%}
+  {%- if file_format not in ['delta', 'iceberg', 'hudi'] -%}
     {% set invalid_format_msg -%}
       Invalid file format: {{ file_format }}
-      Snapshot functionality requires file_format be set to 'delta' or 'hudi'
+      Snapshot functionality requires file_format be set to 'delta' or 'iceberg' or 'hudi'
     {%- endset %}
     {% do exceptions.raise_compiler_error(invalid_format_msg) %}
   {% endif %}
 
   {%- if target_relation_exists -%}
-    {%- if not target_relation.is_delta and not target_relation.is_hudi -%}
+    {%- if not target_relation.is_delta and not target_relation.is_iceberg and not target_relation.is_hudi -%}
       {% set invalid_format_msg -%}
-        The existing table {{ model.schema }}.{{ target_table }} is in another format than 'delta' or 'hudi'
+        The existing table {{ model.schema }}.{{ target_table }} is in another format than 'delta' or 'iceberg' or 'hudi'
       {%- endset %}
       {% do exceptions.raise_compiler_error(invalid_format_msg) %}
     {% endif %}
   {% endif %}
 
   {% if not adapter.check_schema_exists(model.database, model.schema) %}
-    {% do create_schema(model.database, model.schema) %}
+    {% do create_schema(model.schema) %}
   {% endif %}
 
   {%- if not target_relation.is_table -%}
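A minimal snapshot configuration that would exercise the new Iceberg branches above; the snapshot name, schema, key, and source table are placeholders, not taken from the patch. As the comments in the hunk note, the Iceberg catalog only accepts a bare name for create view, so the staging relation is built without schema or catalog and the merge references source.identifier instead of the fully qualified name.

    -- snapshots/orders_snapshot.sql (hypothetical snapshot)
    {% snapshot orders_snapshot %}
        {{ config(
            target_schema = 'snapshots',
            unique_key = 'id',
            strategy = 'timestamp',
            updated_at = 'updated_at',
            file_format = 'iceberg'
        ) }}
        select * from my_source_table
    {% endsnapshot %}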
diff --git a/dbt/include/spark/macros/materializations/table.sql b/dbt/include/spark/macros/materializations/table.sql
index d323e4f34..927816de9 100644
--- a/dbt/include/spark/macros/materializations/table.sql
+++ b/dbt/include/spark/macros/materializations/table.sql
@@ -12,12 +12,16 @@
   {{ run_hooks(pre_hooks) }}
 
   -- setup: if the target relation already exists, drop it
-  -- in case if the existing and future table is delta, we want to do a
+  -- in case if the existing and future table is delta or iceberg, we want to do a
   -- create or replace table instead of dropping, so we don't have the table unavailable
   {% if old_relation and not (old_relation.is_delta and config.get('file_format', validator=validation.any[basestring]) == 'delta') -%}
     {{ adapter.drop_relation(old_relation) }}
   {%- endif %}
 
+  {% if old_relation and not (old_relation.is_iceberg and config.get('file_format', validator=validation.any[basestring]) == 'iceberg') -%}
+    {{ adapter.drop_relation(old_relation) }}
+  {%- endif %}
+
   -- build model
   {%- call statement('main', language=language) -%}