iceberg v2 table support #432

Status: Closed (wants to merge 36 commits)

Commits (36)
35bc12e
Adding in additional support for iceberg v2 tables
dparent1 Aug 16, 2022
6f96705
Setting _schema rather than replacing it
dparent1 Aug 19, 2022
ee8c7b0
Renaming macro and method name
dparent1 Aug 19, 2022
6764e55
Adding changelog entry.
dparent1 Aug 19, 2022
bbf79e4
Merge branch 'main' into dparent1/iceberg
dparent1 Sep 12, 2022
50f7b94
Removing the is_iceberg check it is not needed
dparent1 Sep 27, 2022
402be01
Merge branch 'main' into dparent1/iceberg
dparent1 Sep 28, 2022
93d6b97
Merge branch 'main' into dparent1/iceberg
dparent1 Sep 29, 2022
8d3984f
Merge branch 'main' into dparent1/iceberg
dparent1 Nov 7, 2022
d482d66
Set up CI with Azure Pipelines
dparent1 Nov 21, 2022
d282b21
Fix incremental runs
Fokko Nov 25, 2022
294b7fb
Merge pull request #9 from Fokko/fd-fix-incremental-runs
dparent1 Nov 28, 2022
b936b7a
Add Iceberg to the list
Fokko Dec 7, 2022
513293b
Merge pull request #10 from Fokko/fd-enable-docs
dparent1 Dec 8, 2022
65eac08
Backing out previous merge which broke unit tests
dparent1 Dec 8, 2022
37307d1
Merge branch 'dbt-labs:main' into main
dparent1 Dec 8, 2022
edb6f01
Merge changes in main to bring branch up to date
dparent1 Dec 8, 2022
3ffcf5b
Removed use of ParsedSourceDefinition, add iceberg
dparent1 Dec 8, 2022
34c5549
Allowing the use of /bin/bash by tox
dparent1 Dec 8, 2022
cad11e2
Merge branch 'dbt-labs:main' into main
dparent1 Dec 20, 2022
29610a5
Merge branch 'main' into dparent1/iceberg
dparent1 Dec 20, 2022
6e6daf5
Cleanup based on comments
Fokko Dec 20, 2022
beac4b4
Merge pull request #12 from Fokko/fd-comments
dparent1 Dec 20, 2022
3a7ca7c
Merge branch 'dbt-labs:main' into main
dparent1 Jan 9, 2023
83f2d61
Merge branch 'main' into dparent1/iceberg
dparent1 Jan 9, 2023
10c9dac
Merge branch 'dbt-labs:main' into main
dparent1 Jan 23, 2023
3ba0a72
Merge branch 'main' into dparent1/iceberg
dparent1 Jan 23, 2023
3c03d9a
Merge branch 'dbt-labs:main' into main
dparent1 Jan 30, 2023
28909f2
Merge branch 'main' into dparent1/iceberg
dparent1 Jan 30, 2023
b286fd0
Revert some stuff
Fokko Feb 1, 2023
908d9fe
Merge branch 'main' of https://github.com/dbt-labs/dbt-spark into fd-…
Fokko Feb 2, 2023
8e1a8f4
Merge pull request #16 from Fokko/fd-revert-some-changes
dparent1 Feb 2, 2023
d231e6b
Merge branch 'dbt-labs:main' into main
dparent1 Feb 13, 2023
d743dd3
Merge branch 'main' into dparent1/iceberg
dparent1 Feb 13, 2023
ac3087c
Merge branch 'dbt-labs:main' into main
dparent1 Feb 21, 2023
fdae2cf
Merge branch 'main' into dparent1/iceberg
dparent1 Feb 21, 2023
8 changes: 8 additions & 0 deletions .changes/unreleased/Fixes-20220819-141350.yaml
@@ -0,0 +1,8 @@
+kind: Fixes
+body: Support for iceberg v2 tables. Added ability to use multiple join conditions
+  to allow for multiple columns to make a row distinct.
+time: 2022-08-19T14:13:50.3145273-04:00
+custom:
+  Author: dparent1
+  Issue: "294"
+  PR: "432"
61 changes: 54 additions & 7 deletions dbt/adapters/spark/impl.py
@@ -32,6 +32,8 @@
GET_COLUMNS_IN_RELATION_RAW_MACRO_NAME = "get_columns_in_relation_raw"
LIST_SCHEMAS_MACRO_NAME = "list_schemas"
LIST_RELATIONS_MACRO_NAME = "list_relations_without_caching"
+LIST_RELATIONS_SHOW_TABLES_MACRO_NAME = "list_relations_show_tables_without_caching"
+DESCRIBE_TABLE_EXTENDED_MACRO_NAME = "describe_table_extended_without_caching"
DROP_RELATION_MACRO_NAME = "drop_relation"
FETCH_TBL_PROPERTIES_MACRO_NAME = "fetch_tbl_properties"

@@ -133,38 +135,83 @@ def add_schema_to_cache(self, schema) -> str:
        # so jinja doesn't render things
        return ""

+    def parse_information(self, table_name: str) -> str:
+        information = ""
+        try:
+            table_results = self.execute_macro(
+                DESCRIBE_TABLE_EXTENDED_MACRO_NAME, kwargs={"table_name": table_name}
+            )
+        except dbt.exceptions.DbtRuntimeError as e:
+            logger.debug(f"Error while retrieving information about {table_name}: {e.msg}")
+            return ""
+        for info_row in table_results:
+            info_type, info_value, _ = info_row
+            if not info_type.startswith("#"):
+                information += f"{info_type}: {info_value}\n"
+        return information

    def list_relations_without_caching(
        self, schema_relation: SparkRelation
    ) -> List[SparkRelation]:
-        kwargs = {"schema_relation": schema_relation}
+        try_show_tables = False
+        expected_result_rows = 4
        try:
-            results = self.execute_macro(LIST_RELATIONS_MACRO_NAME, kwargs=kwargs)
+            results = self.execute_macro(
+                LIST_RELATIONS_MACRO_NAME, kwargs={"schema_relation": schema_relation}
+            )
        except dbt.exceptions.DbtRuntimeError as e:
            errmsg = getattr(e, "msg", "")
            if f"Database '{schema_relation}' not found" in errmsg:
                return []
+            elif "SHOW TABLE EXTENDED is not supported for v2 tables" in errmsg:
+                # this happens with spark-iceberg with v2 iceberg tables
+                # https://issues.apache.org/jira/browse/SPARK-33393
+                try_show_tables = True
            else:
                description = "Error while retrieving information about"
                logger.debug(f"{description} {schema_relation}: {e.msg}")
                return []

+        if try_show_tables:
+            expected_result_rows = 3
+            try:
+                results = self.execute_macro(
+                    LIST_RELATIONS_SHOW_TABLES_MACRO_NAME,
+                    kwargs={"schema_relation": schema_relation},
+                )
+            except dbt.exceptions.DbtRuntimeError as e:
+                description = "Error while retrieving information about"
+                logger.debug(f"{description} {schema_relation}: {e.msg}")
+                return []

        relations = []
        for row in results:
-            if len(row) != 4:
+            if len(row) != expected_result_rows:
+                if try_show_tables:
+                    description = 'Invalid value from "show tables ...", '
+                else:
+                    description = 'Invalid value from "show table extended ...", '
                raise dbt.exceptions.DbtRuntimeError(
-                    f'Invalid value from "show table extended ...", '
-                    f"got {len(row)} values, expected 4"
+                    f"{description} got {len(row)} values, expected {expected_result_rows}"
                )
-            _schema, name, _, information = row
-            rel_type = RelationType.View if "Type: VIEW" in information else RelationType.Table
+            if try_show_tables:
+                _schema, name, _ = row
+                information = self.parse_information(f"{_schema}.{name}")
+            else:
+                _schema, name, _, information = row
            is_delta = "Provider: delta" in information
            is_hudi = "Provider: hudi" in information
+            is_iceberg = "Provider: iceberg" in information
+            rel_type = RelationType.View if "Type: VIEW" in information else RelationType.Table

            relation: BaseRelation = self.Relation.create(
                schema=_schema,
                identifier=name,
                type=rel_type,
                information=information,
                is_delta=is_delta,
+                is_iceberg=is_iceberg,
                is_hudi=is_hudi,
            )
            relations.append(relation)
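In plain Spark SQL, the fallback implemented above trades one metadata query for several. A sketch, with my_schema and my_table as placeholder names:

-- default path: one row per relation, with 4 columns
-- (database, tableName, isTemporary, information)
show table extended in my_schema like '*';

-- fallback when the above raises for Iceberg v2 tables (SPARK-33393):
-- 3 columns per row (database, tableName, isTemporary) ...
show tables in my_schema like '*';

-- ... then rebuild the "information" blob one relation at a time
describe extended my_schema.my_table;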
10 changes: 8 additions & 2 deletions dbt/adapters/spark/relation.py
@@ -1,9 +1,14 @@
-from typing import Optional
+from typing import Optional, TypeVar
from dataclasses import dataclass, field

from dbt.adapters.base.relation import BaseRelation, Policy

from dbt.exceptions import DbtRuntimeError
+from dbt.events import AdapterLogger
+
+logger = AdapterLogger("Spark")
+
+Self = TypeVar("Self", bound="BaseRelation")

@dataclass
@@ -27,6 +32,7 @@ class SparkRelation(BaseRelation):
    quote_character: str = "`"
    is_delta: Optional[bool] = None
    is_hudi: Optional[bool] = None
+    is_iceberg: Optional[bool] = None
    information: Optional[str] = None

    def __post_init__(self):
61 changes: 54 additions & 7 deletions dbt/include/spark/macros/adapters.sql
@@ -1,3 +1,14 @@
+{% macro dbt_spark_tblproperties_clause() -%}
+  {%- set tblproperties = config.get('tblproperties') -%}
+  {%- if tblproperties is not none %}
+    tblproperties (
+      {%- for prop in tblproperties -%}
+      '{{ prop }}' = '{{ tblproperties[prop] }}' {% if not loop.last %}, {% endif %}
+      {%- endfor %}
+    )
+  {%- endif %}
+{%- endmacro -%}
+
{% macro file_format_clause() %}
  {{ return(adapter.dispatch('file_format_clause', 'dbt')()) }}
{%- endmacro -%}
@@ -133,7 +144,7 @@
    {%- if temporary -%}
      {{ create_temporary_view(relation, compiled_code) }}
    {%- else -%}
-      {% if config.get('file_format', validator=validation.any[basestring]) == 'delta' %}
+      {% if config.get('file_format', validator=validation.any[basestring]) in ['delta', 'iceberg'] %}
        create or replace table {{ relation }}
      {% else %}
        create table {{ relation }}
@@ -243,7 +254,10 @@
{% endmacro %}

{% macro spark__get_columns_in_relation(relation) -%}
-  {{ return(adapter.get_columns_in_relation(relation)) }}
+  {% call statement('get_columns_in_relation', fetch_result=True) %}
+      describe extended {{ relation.include(schema=(schema is not none)) }}
+  {% endcall %}
+  {% do return(load_result('get_columns_in_relation').table) %}
{% endmacro %}

{% macro spark__list_relations_without_caching(relation) %}
@@ -254,6 +268,27 @@
  {% do return(load_result('list_relations_without_caching').table) %}
{% endmacro %}

+{% macro list_relations_show_tables_without_caching(schema_relation) %}
+  {#-- Spark with iceberg tables don't work with show table extended for #}
+  {#-- V2 iceberg tables #}
+  {#-- https://issues.apache.org/jira/browse/SPARK-33393 #}
+  {% call statement('list_relations_without_caching_show_tables', fetch_result=True) -%}
+    show tables in {{ schema_relation }} like '*'
+  {% endcall %}
+
+  {% do return(load_result('list_relations_without_caching_show_tables').table) %}
+{% endmacro %}
+
+{% macro describe_table_extended_without_caching(table_name) %}
+  {#-- Spark with iceberg tables don't work with show table extended for #}
+  {#-- V2 iceberg tables #}
+  {#-- https://issues.apache.org/jira/browse/SPARK-33393 #}
+  {% call statement('describe_table_extended_without_caching', fetch_result=True) -%}
+    describe extended {{ table_name }}
+  {% endcall %}
+  {% do return(load_result('describe_table_extended_without_caching').table) %}
+{% endmacro %}

{% macro spark__list_schemas(database) -%}
  {% call statement('list_schemas', fetch_result=True, auto_begin=False) %}
    show databases
@@ -293,14 +328,20 @@
{% endmacro %}

{% macro spark__alter_column_comment(relation, column_dict) %}
-  {% if config.get('file_format', validator=validation.any[basestring]) in ['delta', 'hudi'] %}
+  {% if config.get('file_format', validator=validation.any[basestring]) in ['delta', 'hudi', 'iceberg'] %}
    {% for column_name in column_dict %}
      {% set comment = column_dict[column_name]['description'] %}
      {% set escaped_comment = comment | replace('\'', '\\\'') %}
      {% set comment_query %}
-        alter table {{ relation }} change column
-            {{ adapter.quote(column_name) if column_dict[column_name]['quote'] else column_name }}
-            comment '{{ escaped_comment }}';
+        {% if relation.is_iceberg %}
+          alter table {{ relation }} alter column
+              {{ adapter.quote(column_name) if column_dict[column_name]['quote'] else column_name }}
+              comment '{{ escaped_comment }}';
+        {% else %}
+          alter table {{ relation }} change column
+              {{ adapter.quote(column_name) if column_dict[column_name]['quote'] else column_name }}
+              comment '{{ escaped_comment }}';
+        {% endif %}
      {% endset %}
      {% do run_query(comment_query) %}
    {% endfor %}
@@ -328,7 +369,13 @@
{% macro spark__alter_relation_add_remove_columns(relation, add_columns, remove_columns) %}

  {% if remove_columns %}
-    {% set platform_name = 'Delta Lake' if relation.is_delta else 'Apache Spark' %}
+    {% if relation.is_delta %}
+      {% set platform_name = 'Delta Lake' %}
+    {% elif relation.is_iceberg %}
+      {% set platform_name = 'Iceberg' %}
+    {% else %}
+      {% set platform_name = 'Apache Spark' %}
+    {% endif %}
    {{ exceptions.raise_compiler_error(platform_name + ' does not support dropping columns from tables') }}
  {% endif %}
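To illustrate the persist-docs branch above: for an Iceberg relation the macro now emits alter column, while other file formats keep change column. Approximately, with placeholder relation and column names:

-- relation.is_iceberg
alter table my_schema.my_model alter column order_id comment 'Primary key';

-- any other file format
alter table my_schema.my_model change column order_id comment 'Primary key';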
dbt/include/spark/macros/materializations/incremental/incremental.sql
@@ -55,7 +55,7 @@
  {%- endcall -%}
  {%- do process_schema_changes(on_schema_change, tmp_relation, existing_relation) -%}
  {%- call statement('main') -%}
-    {{ dbt_spark_get_incremental_sql(strategy, tmp_relation, target_relation, unique_key, incremental_predicates) }}
+    {{ dbt_spark_get_incremental_sql(strategy, tmp_relation, target_relation, existing_relation, unique_key, incremental_predicates) }}
  {%- endcall -%}
  {%- if language == 'python' -%}
    {#--
dbt/include/spark/macros/materializations/incremental/strategies.sql
@@ -1,9 +1,15 @@
-{% macro get_insert_overwrite_sql(source_relation, target_relation) %}
+{% macro get_insert_overwrite_sql(source_relation, target_relation, existing_relation) %}

  {%- set dest_columns = adapter.get_columns_in_relation(target_relation) -%}
  {%- set dest_cols_csv = dest_columns | map(attribute='quoted') | join(', ') -%}
-  insert overwrite table {{ target_relation }}
-  {{ partition_cols(label="partition") }}
+  {% if existing_relation.is_iceberg %}
+    {# removed table from statement for iceberg #}
+    insert overwrite {{ target_relation }}
+    {# removed partition_cols for iceberg as well #}
+  {% else %}
+    insert overwrite table {{ target_relation }}
+    {{ partition_cols(label="partition") }}
+  {% endif %}
  select {{dest_cols_csv}} from {{ source_relation }}

{% endmacro %}
@@ -62,15 +68,15 @@
{% endmacro %}


-{% macro dbt_spark_get_incremental_sql(strategy, source, target, unique_key, incremental_predicates) %}
+{% macro dbt_spark_get_incremental_sql(strategy, source, target, existing, unique_key, incremental_predicates) %}
  {%- if strategy == 'append' -%}
    {#-- insert new records into existing table, without updating or overwriting #}
    {{ get_insert_into_sql(source, target) }}
  {%- elif strategy == 'insert_overwrite' -%}
    {#-- insert statements don't like CTEs, so support them via a temp view #}
-    {{ get_insert_overwrite_sql(source, target) }}
+    {{ get_insert_overwrite_sql(source, target, existing) }}
  {%- elif strategy == 'merge' -%}
-    {#-- merge all columns with databricks delta - schema changes are handled for us #}
+    {#-- merge all columns with databricks delta or iceberg - schema changes are handled for us #}
    {{ get_merge_sql(target, source, unique_key, dest_columns=none, incremental_predicates=incremental_predicates) }}
  {%- else -%}
    {% set no_sql_for_strategy_msg -%}
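Rendered, the two insert_overwrite branches differ only in the table keyword and the partition clause (which appears only when the model is partitioned). An approximate sketch with placeholder names:

-- existing_relation.is_iceberg
insert overwrite my_schema.my_model
select order_id, line_number, amount from my_schema.my_model__dbt_tmp;

-- any other file format
insert overwrite table my_schema.my_model
partition (ds)
select order_id, line_number, amount from my_schema.my_model__dbt_tmp;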
dbt/include/spark/macros/materializations/incremental/validate.sql
@@ -1,7 +1,7 @@
{% macro dbt_spark_validate_get_file_format(raw_file_format) %}
  {#-- Validate the file format #}

-  {% set accepted_formats = ['text', 'csv', 'json', 'jdbc', 'parquet', 'orc', 'hive', 'delta', 'libsvm', 'hudi'] %}
+  {% set accepted_formats = ['text', 'csv', 'json', 'jdbc', 'parquet', 'orc', 'hive', 'delta', 'iceberg', 'libsvm', 'hudi'] %}

  {% set invalid_file_format_msg -%}
    Invalid file format provided: {{ raw_file_format }}
@@ -26,12 +26,12 @@

  {% set invalid_merge_msg -%}
    Invalid incremental strategy provided: {{ raw_strategy }}
-    You can only choose this strategy when file_format is set to 'delta' or 'hudi'
+    You can only choose this strategy when file_format is set to 'delta' or 'iceberg' or 'hudi'
  {%- endset %}

  {% set invalid_insert_overwrite_delta_msg -%}
    Invalid incremental strategy provided: {{ raw_strategy }}
-    You cannot use this strategy when file_format is set to 'delta'
+    You cannot use this strategy when file_format is set to 'delta' or 'iceberg'
    Use the 'append' or 'merge' strategy instead
  {%- endset %}

@@ -44,7 +44,7 @@
  {% if raw_strategy not in ['append', 'merge', 'insert_overwrite'] %}
    {% do exceptions.raise_compiler_error(invalid_strategy_msg) %}
  {%-else %}
-    {% if raw_strategy == 'merge' and file_format not in ['delta', 'hudi'] %}
+    {% if raw_strategy == 'merge' and file_format not in ['delta', 'iceberg', 'hudi'] %}
      {% do exceptions.raise_compiler_error(invalid_merge_msg) %}
    {% endif %}
    {% if raw_strategy == 'insert_overwrite' and file_format == 'delta' %}
33 changes: 23 additions & 10 deletions dbt/include/spark/macros/materializations/snapshot.sql
@@ -15,7 +15,12 @@
{% macro spark__snapshot_merge_sql(target, source, insert_cols) -%}

    merge into {{ target }} as DBT_INTERNAL_DEST
-    using {{ source }} as DBT_INTERNAL_SOURCE
+    {% if target.is_iceberg %}
+      {# create view only supports a name (no catalog, or schema) #}
+      using {{ source.identifier }} as DBT_INTERNAL_SOURCE
+    {% else %}
+      using {{ source }} as DBT_INTERNAL_SOURCE
+    {% endif %}
    on DBT_INTERNAL_SOURCE.dbt_scd_id = DBT_INTERNAL_DEST.dbt_scd_id
    when matched
     and DBT_INTERNAL_DEST.dbt_valid_to is null
@@ -33,10 +38,18 @@
{% macro spark_build_snapshot_staging_table(strategy, sql, target_relation) %}
    {% set tmp_identifier = target_relation.identifier ~ '__dbt_tmp' %}

-    {%- set tmp_relation = api.Relation.create(identifier=tmp_identifier,
-                                               schema=target_relation.schema,
-                                               database=none,
-                                               type='view') -%}
+    {% if target_relation.is_iceberg %}
+      {# iceberg catalog does not support create view, but regular spark does. We removed the catalog and schema #}
+      {%- set tmp_relation = api.Relation.create(identifier=tmp_identifier,
+                                                 schema=none,
+                                                 database=none,
+                                                 type='view') -%}
+    {% else %}
+      {%- set tmp_relation = api.Relation.create(identifier=tmp_identifier,
+                                                 schema=target_relation.schema,
+                                                 database=none,
+                                                 type='view') -%}
+    {% endif %}

    {% set select = snapshot_staging_table(strategy, sql, target_relation) %}

@@ -83,25 +96,25 @@
                                      identifier=target_table,
                                      type='table') -%}

-  {%- if file_format not in ['delta', 'hudi'] -%}
+  {%- if file_format not in ['delta', 'iceberg', 'hudi'] -%}
    {% set invalid_format_msg -%}
      Invalid file format: {{ file_format }}
-      Snapshot functionality requires file_format be set to 'delta' or 'hudi'
+      Snapshot functionality requires file_format be set to 'delta' or 'iceberg' or 'hudi'
    {%- endset %}
    {% do exceptions.raise_compiler_error(invalid_format_msg) %}
  {% endif %}

  {%- if target_relation_exists -%}
-    {%- if not target_relation.is_delta and not target_relation.is_hudi -%}
+    {%- if not target_relation.is_delta and not target_relation.is_iceberg and not target_relation.is_hudi -%}
      {% set invalid_format_msg -%}
-        The existing table {{ model.schema }}.{{ target_table }} is in another format than 'delta' or 'hudi'
+        The existing table {{ model.schema }}.{{ target_table }} is in another format than 'delta' or 'iceberg' or 'hudi'
      {%- endset %}
      {% do exceptions.raise_compiler_error(invalid_format_msg) %}
    {% endif %}
  {% endif %}

  {% if not adapter.check_schema_exists(model.database, model.schema) %}
-    {% do create_schema(model.database, model.schema) %}
+    {% do create_schema(model.schema) %}
  {% endif %}

  {%- if not target_relation.is_table -%}
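Taken together, the snapshot changes mean a configuration like the following hypothetical one now passes the file-format validation (snapshot, source, and column names are illustrative):

{% snapshot orders_snapshot %}
{{ config(
    target_schema='snapshots',
    unique_key='order_id',
    strategy='timestamp',
    updated_at='updated_at',
    file_format='iceberg'
) }}
select * from {{ source('sales', 'orders') }}
{% endsnapshot %}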