iceberg v2 table support #432

Closed
wants to merge 36 commits
Changes from 5 commits

36 commits
35bc12e
Adding in additional support for iceberg v2 tables
dparent1 Aug 16, 2022
6f96705
Setting _schema rather than replacing it
dparent1 Aug 19, 2022
ee8c7b0
Renaming macro and method name
dparent1 Aug 19, 2022
6764e55
Adding changelog entry.
dparent1 Aug 19, 2022
bbf79e4
Merge branch 'main' into dparent1/iceberg
dparent1 Sep 12, 2022
50f7b94
Removing the is_iceberg check it is not needed
dparent1 Sep 27, 2022
402be01
Merge branch 'main' into dparent1/iceberg
dparent1 Sep 28, 2022
93d6b97
Merge branch 'main' into dparent1/iceberg
dparent1 Sep 29, 2022
8d3984f
Merge branch 'main' into dparent1/iceberg
dparent1 Nov 7, 2022
d482d66
Set up CI with Azure Pipelines
dparent1 Nov 21, 2022
d282b21
Fix incremental runs
Fokko Nov 25, 2022
294b7fb
Merge pull request #9 from Fokko/fd-fix-incremental-runs
dparent1 Nov 28, 2022
b936b7a
Add Iceberg to the list
Fokko Dec 7, 2022
513293b
Merge pull request #10 from Fokko/fd-enable-docs
dparent1 Dec 8, 2022
65eac08
Backing out previous merge which broke unit tests
dparent1 Dec 8, 2022
37307d1
Merge branch 'dbt-labs:main' into main
dparent1 Dec 8, 2022
edb6f01
Merge changes in main to bring branch up to date
dparent1 Dec 8, 2022
3ffcf5b
Removed use of ParsedSourceDefinition, add iceberg
dparent1 Dec 8, 2022
34c5549
Allowing the use of /bin/bash by tox
dparent1 Dec 8, 2022
cad11e2
Merge branch 'dbt-labs:main' into main
dparent1 Dec 20, 2022
29610a5
Merge branch 'main' into dparent1/iceberg
dparent1 Dec 20, 2022
6e6daf5
Cleanup based on comments
Fokko Dec 20, 2022
beac4b4
Merge pull request #12 from Fokko/fd-comments
dparent1 Dec 20, 2022
3a7ca7c
Merge branch 'dbt-labs:main' into main
dparent1 Jan 9, 2023
83f2d61
Merge branch 'main' into dparent1/iceberg
dparent1 Jan 9, 2023
10c9dac
Merge branch 'dbt-labs:main' into main
dparent1 Jan 23, 2023
3ba0a72
Merge branch 'main' into dparent1/iceberg
dparent1 Jan 23, 2023
3c03d9a
Merge branch 'dbt-labs:main' into main
dparent1 Jan 30, 2023
28909f2
Merge branch 'main' into dparent1/iceberg
dparent1 Jan 30, 2023
b286fd0
Revert some stuff
Fokko Feb 1, 2023
908d9fe
Merge branch 'main' of https://github.com/dbt-labs/dbt-spark into fd-…
Fokko Feb 2, 2023
8e1a8f4
Merge pull request #16 from Fokko/fd-revert-some-changes
dparent1 Feb 2, 2023
d231e6b
Merge branch 'dbt-labs:main' into main
dparent1 Feb 13, 2023
d743dd3
Merge branch 'main' into dparent1/iceberg
dparent1 Feb 13, 2023
ac3087c
Merge branch 'dbt-labs:main' into main
dparent1 Feb 21, 2023
fdae2cf
Merge branch 'main' into dparent1/iceberg
dparent1 Feb 21, 2023
8 changes: 8 additions & 0 deletions .changes/unreleased/Fixes-20220819-141350.yaml
@@ -0,0 +1,8 @@
kind: Fixes
body: Support for iceberg v2 tables. Added ability to use multiple join conditions
to allow for multiple columns to make a row distinct.
time: 2022-08-19T14:13:50.3145273-04:00
custom:
Author: dparent1
Issue: "294"
PR: "432"
65 changes: 59 additions & 6 deletions dbt/adapters/spark/impl.py
@@ -28,6 +28,8 @@
GET_COLUMNS_IN_RELATION_RAW_MACRO_NAME = "get_columns_in_relation_raw"
LIST_SCHEMAS_MACRO_NAME = "list_schemas"
LIST_RELATIONS_MACRO_NAME = "list_relations_without_caching"
LIST_RELATIONS_SHOW_TABLES_MACRO_NAME = "list_relations_show_tables_without_caching"
DESCRIBE_TABLE_EXTENDED_MACRO_NAME = "describe_table_extended_without_caching"
DROP_RELATION_MACRO_NAME = "drop_relation"
FETCH_TBL_PROPERTIES_MACRO_NAME = "fetch_tbl_properties"

Expand Down Expand Up @@ -123,38 +125,80 @@ def add_schema_to_cache(self, schema) -> str:
# so jinja doesn't render things
return ""

def parse_information(self, table_name):
information = ""
kwargs = {"table_name": table_name}
try:
table_results = self.execute_macro(DESCRIBE_TABLE_EXTENDED_MACRO_NAME, kwargs=kwargs)
except dbt.exceptions.RuntimeException as e:
logger.debug(f"Error while retrieving information about {table_name}: {e.msg}")
return []
for info_row in table_results:
info_type, info_value, _ = info_row
if info_type.startswith("#") is False:
information += f"{info_type}: {info_value}\n"
return information

def list_relations_without_caching(
self, schema_relation: SparkRelation
) -> List[SparkRelation]:
kwargs = {"schema_relation": schema_relation}
try_show_tables = False
expected_result_rows = 4
try:
results = self.execute_macro(LIST_RELATIONS_MACRO_NAME, kwargs=kwargs)
except dbt.exceptions.RuntimeException as e:
errmsg = getattr(e, "msg", "")
if f"Database '{schema_relation}' not found" in errmsg:
return []
elif "SHOW TABLE EXTENDED is not supported for v2 tables" in errmsg:
# this happens with spark-iceberg with v2 iceberg tables
# https://issues.apache.org/jira/browse/SPARK-33393
try_show_tables = True
else:
description = "Error while retrieving information about"
logger.debug(f"{description} {schema_relation}: {e.msg}")
return []

if try_show_tables:
expected_result_rows = 3
try:
results = self.execute_macro(LIST_RELATIONS_SHOW_TABLES_MACRO_NAME, kwargs=kwargs)
except dbt.exceptions.RuntimeException as e:
errmsg = getattr(e, "msg", "")
description = "Error while retrieving information about"
logger.debug(f"{description} {schema_relation}: {e.msg}")
return []

relations = []
for row in results:
if len(row) != 4:
if len(row) != expected_result_rows:
if try_show_tables:
description = 'Invalid value from "show tables ...", '
else:
description = 'Invalid value from "show table extended ...", '
raise dbt.exceptions.RuntimeException(
f'Invalid value from "show table extended ...", '
f"got {len(row)} values, expected 4"
f"{description} got {len(row)} values, expected {expected_result_rows}"
)
_schema, name, _, information = row
rel_type = RelationType.View if "Type: VIEW" in information else RelationType.Table

if try_show_tables:
_, name, _ = row
information = self.parse_information(name)
_schema = schema_relation.schema
else:
_schema, name, _, information = row
is_delta = "Provider: delta" in information
is_hudi = "Provider: hudi" in information
is_iceberg = "Provider: iceberg" in information
rel_type = RelationType.View if "Type: VIEW" in information else RelationType.Table

relation = self.Relation.create(
schema=_schema,
identifier=name,
type=rel_type,
information=information,
is_delta=is_delta,
is_iceberg=is_iceberg,
is_hudi=is_hudi,
)
relations.append(relation)
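As a rough sketch of why expected_result_rows differs between the two paths above (exact output shapes can vary by Spark version; my_schema and my_table are placeholders):

-- primary path: one row per relation with 4 columns
-- (database, tableName, isTemporary, information)
show table extended in my_schema like '*';

-- fallback used when the above fails for v2 Iceberg tables (SPARK-33393):
-- 3 columns (namespace, tableName, isTemporary), so the "information" text
-- is rebuilt per table from a separate describe
show tables in my_schema like '*';
describe extended my_schema.my_table;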
@@ -267,7 +311,16 @@ def parse_columns_from_information(self, relation: SparkRelation) -> List[SparkC
return columns

def _get_columns_for_catalog(self, relation: SparkRelation) -> Iterable[Dict[str, Any]]:
columns = self.parse_columns_from_information(relation)
columns = []
if relation and relation.information:
columns = self.parse_columns_from_information(relation)
else:
# in open source delta 'show table extended' query output doesn't
# return relation's schema. if columns are empty from cache,
# use get_columns_in_relation spark macro
# which would execute 'describe extended tablename' query
rows: List[agate.Row] = super().get_columns_in_relation(relation)
columns = self.parse_describe_extended(relation, rows)

for column in columns:
# convert SparkColumns into catalog dicts
37 changes: 34 additions & 3 deletions dbt/adapters/spark/relation.py
@@ -1,9 +1,15 @@
from typing import Optional

from typing import Optional, TypeVar, Any, Type, Dict
from dbt.contracts.graph.parsed import ParsedSourceDefinition
from dbt.utils import deep_merge
from dataclasses import dataclass

from dbt.adapters.base.relation import BaseRelation, Policy
from dbt.exceptions import RuntimeException
from dbt.events import AdapterLogger

logger = AdapterLogger("Spark")

Self = TypeVar("Self", bound="BaseRelation")


@dataclass
@@ -27,12 +33,37 @@ class SparkRelation(BaseRelation):
quote_character: str = "`"
is_delta: Optional[bool] = None
is_hudi: Optional[bool] = None
is_iceberg: Optional[bool] = None
information: Optional[str] = None
loader: Optional[str] = None
source_meta: Optional[Dict[str, Any]] = None
meta: Optional[Dict[str, Any]] = None

def __post_init__(self):
if self.database != self.schema and self.database:
if self.is_iceberg is not True and self.database != self.schema and self.database:

Reviewer:
This seems like maybe it should be "or"? Any of these conditions are failures or otherwise need to be checked everywhere.
If this is only the post_init, that seems fine to me, but this feels like two independent pieces of information (is this a qualified/namespaced relation, and then the is-iceberg flag). Is that not correct?

dparent1 (Contributor, Author):
I'll give that change a try in my test cases. I think you're right here.

dparent1 (Contributor, Author), Sep 26, 2022:
Alright, so using "or"s in this case quickly made all the tox tests fail :). Likely because the first condition of the if is always going to evaluate to true for the base tests, since none of them are for iceberg.

raise RuntimeException("Cannot set database in spark!")

@classmethod
def create_from_source(cls: Type[Self], source: ParsedSourceDefinition, **kwargs: Any) -> Self:
source_quoting = source.quoting.to_dict(omit_none=True)
source_quoting.pop("column", None)
quote_policy = deep_merge(
cls.get_default_quote_policy().to_dict(omit_none=True),
source_quoting,
kwargs.get("quote_policy", {}),
)

return cls.create(
database=source.database,
schema=source.schema,
identifier=source.identifier,
quote_policy=quote_policy,
loader=source.loader,
source_meta=source.source_meta,
meta=source.meta,
**kwargs,
)

def render(self):
if self.include_policy.database and self.include_policy.schema:
raise RuntimeException(
59 changes: 54 additions & 5 deletions dbt/include/spark/macros/adapters.sql
@@ -1,3 +1,14 @@
{% macro dbt_spark_tblproperties_clause() -%}
{%- set tblproperties = config.get('tblproperties') -%}
{%- if tblproperties is not none %}
tblproperties (
{%- for prop in tblproperties -%}
'{{ prop }}' = '{{ tblproperties[prop] }}' {% if not loop.last %}, {% endif %}
{%- endfor %}
)
{%- endif %}
{%- endmacro -%}
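A hedged sketch of how a model might feed this macro via its config; the properties shown ('format-version', 'write.format.default') are standard Iceberg table properties used here purely as an example, and whether the generated DDL actually picks them up depends on the rest of the materialization:

{{
  config(
    materialized = 'table',
    file_format = 'iceberg',
    tblproperties = {
      'format-version': '2',
      'write.format.default': 'parquet'
    }
  )
}}

select 1 as id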

{% macro file_format_clause() %}
{{ return(adapter.dispatch('file_format_clause', 'dbt')()) }}
{%- endmacro -%}
@@ -135,6 +146,8 @@
{%- else -%}
{% if config.get('file_format', validator=validation.any[basestring]) == 'delta' %}
create or replace table {{ relation }}
{% elif config.get('file_format', validator=validation.any[basestring]) == 'iceberg' %}
create or replace table {{ relation }}
{% else %}
create table {{ relation }}
{% endif %}
@@ -191,7 +204,10 @@
{% endmacro %}

{% macro spark__get_columns_in_relation(relation) -%}
{{ return(adapter.get_columns_in_relation(relation)) }}
{% call statement('get_columns_in_relation', fetch_result=True) %}
describe extended {{ relation.include(schema=(schema is not none)) }}
{% endcall %}
{% do return(load_result('get_columns_in_relation').table) %}
{% endmacro %}

{% macro spark__list_relations_without_caching(relation) %}
@@ -202,6 +218,27 @@
{% do return(load_result('list_relations_without_caching').table) %}
{% endmacro %}

{% macro list_relations_show_tables_without_caching(schema_relation) %}
{#-- Spark with iceberg tables don't work with show table extended for #}
{#-- V2 iceberg tables #}
{#-- https://issues.apache.org/jira/browse/SPARK-33393 #}
{% call statement('list_relations_without_caching_show_tables', fetch_result=True) -%}
show tables in {{ schema_relation }} like '*'
{% endcall %}

{% do return(load_result('list_relations_without_caching_show_tables').table) %}
{% endmacro %}

{% macro describe_table_extended_without_caching(table_name) %}
{#-- Spark with iceberg tables don't work with show table extended for #}
{#-- V2 iceberg tables #}
{#-- https://issues.apache.org/jira/browse/SPARK-33393 #}
{% call statement('describe_table_extended_without_caching', fetch_result=True) -%}
describe extended {{ table_name }}
{% endcall %}
{% do return(load_result('describe_table_extended_without_caching').table) %}
{% endmacro %}

{% macro spark__list_schemas(database) -%}
{% call statement('list_schemas', fetch_result=True, auto_begin=False) %}
show databases
@@ -250,9 +287,15 @@
{% set comment = column_dict[column_name]['description'] %}
{% set escaped_comment = comment | replace('\'', '\\\'') %}
{% set comment_query %}
alter table {{ relation }} change column
{{ adapter.quote(column_name) if column_dict[column_name]['quote'] else column_name }}
comment '{{ escaped_comment }}';
{% if relation.is_iceberg %}
alter table {{ relation }} alter column
{{ adapter.quote(column_name) if column_dict[column_name]['quote'] else column_name }}
comment '{{ escaped_comment }}';
{% else %}
alter table {{ relation }} change column
{{ adapter.quote(column_name) if column_dict[column_name]['quote'] else column_name }}
comment '{{ escaped_comment }}';
{% endif %}
{% endset %}
{% do run_query(comment_query) %}
{% endfor %}
@@ -280,7 +323,13 @@
{% macro spark__alter_relation_add_remove_columns(relation, add_columns, remove_columns) %}

{% if remove_columns %}
{% set platform_name = 'Delta Lake' if relation.is_delta else 'Apache Spark' %}
{% if relation.is_delta %}
{% set platform_name = 'Delta Lake' %}
{% elif relation.is_iceberg %}
{% set platform_name = 'Iceberg' %}
{% else %}
{% set platform_name = 'Apache Spark' %}
{% endif %}
{{ exceptions.raise_compiler_error(platform_name + ' does not support dropping columns from tables') }}
{% endif %}

@@ -2,8 +2,14 @@

{%- set dest_columns = adapter.get_columns_in_relation(target_relation) -%}
{%- set dest_cols_csv = dest_columns | map(attribute='quoted') | join(', ') -%}
insert overwrite table {{ target_relation }}
{{ partition_cols(label="partition") }}
{% if target_relation.is_iceberg %}
{# removed table from statement for iceberg #}
insert overwrite {{ target_relation }}
{# removed partition_cols for iceberg as well #}
{% else %}
insert overwrite table {{ target_relation }}
{{ partition_cols(label="partition") }}
{% endif %}
select {{dest_cols_csv}} from {{ source_relation }}

{% endmacro %}
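Roughly, the two branches above would render along these lines, assuming a model called analytics.orders partitioned by order_date (all names are placeholders):

-- non-Iceberg file formats: unchanged behaviour
insert overwrite table analytics.orders
partition (order_date)
select id, order_date, status from orders__dbt_tmp

-- Iceberg: no "table" keyword and no partition clause
insert overwrite analytics.orders
select id, order_date, status from orders__dbt_tmp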
@@ -24,29 +30,26 @@
{%- set predicates = [] if predicates is none else [] + predicates -%}
{%- set update_columns = config.get("merge_update_columns") -%}

{% if unique_key %}
{% if unique_key is sequence and unique_key is not mapping and unique_key is not string %}
{% for key in unique_key %}
{% set this_key_match %}
DBT_INTERNAL_SOURCE.{{ key }} = DBT_INTERNAL_DEST.{{ key }}
{% endset %}
{% do predicates.append(this_key_match) %}
{% endfor %}
{% else %}
{% set unique_key_match %}
{% set merge_condition %}
{% if unique_key %}
{# added support for multiple join condition, multiple unique_key #}
on {% if unique_key is string %}
DBT_INTERNAL_SOURCE.{{ unique_key }} = DBT_INTERNAL_DEST.{{ unique_key }}
{% endset %}
{% do predicates.append(unique_key_match) %}
{% endif %}
{% else %}
{% do predicates.append('FALSE') %}
{% endif %}

{{ sql_header if sql_header is not none }}
{% else %}
{%- for k in unique_key %}
DBT_INTERNAL_SOURCE.{{ k }} = DBT_INTERNAL_DEST.{{ k }}
{%- if not loop.last %} AND {%- endif %}
{%- endfor %}
{% endif %}
{% else %}
on false
{% endif %}
{% endset %}

merge into {{ target }} as DBT_INTERNAL_DEST
using {{ source }} as DBT_INTERNAL_SOURCE
on {{ predicates | join(' and ') }}
using {{ source.include(schema=false) }} as DBT_INTERNAL_SOURCE

{{ merge_condition }}

when matched then update set
{% if update_columns -%}{%- for column_name in update_columns %}
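For a composite unique_key such as ['order_id', 'order_line_id'], the merge_condition block above should render roughly as follows; the table names are placeholders and the update/insert clauses are sketched from the unchanged remainder of the macro:

merge into analytics.order_lines as DBT_INTERNAL_DEST
using order_lines__dbt_tmp as DBT_INTERNAL_SOURCE
on
    DBT_INTERNAL_SOURCE.order_id = DBT_INTERNAL_DEST.order_id AND
    DBT_INTERNAL_SOURCE.order_line_id = DBT_INTERNAL_DEST.order_line_id
when matched then update set *
when not matched then insert *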
@@ -67,7 +70,7 @@
{#-- insert statements don't like CTEs, so support them via a temp view #}
{{ get_insert_overwrite_sql(source, target) }}
{%- elif strategy == 'merge' -%}
{#-- merge all columns with databricks delta - schema changes are handled for us #}
{#-- merge all columns with databricks delta or iceberg - schema changes are handled for us #}
{{ get_merge_sql(target, source, unique_key, dest_columns=none, predicates=none) }}
{%- else -%}
{% set no_sql_for_strategy_msg -%}
@@ -1,7 +1,7 @@
{% macro dbt_spark_validate_get_file_format(raw_file_format) %}
{#-- Validate the file format #}

{% set accepted_formats = ['text', 'csv', 'json', 'jdbc', 'parquet', 'orc', 'hive', 'delta', 'libsvm', 'hudi'] %}
{% set accepted_formats = ['text', 'csv', 'json', 'jdbc', 'parquet', 'orc', 'hive', 'delta', 'iceberg', 'libsvm', 'hudi'] %}

{% set invalid_file_format_msg -%}
Invalid file format provided: {{ raw_file_format }}
@@ -26,12 +26,12 @@

{% set invalid_merge_msg -%}
Invalid incremental strategy provided: {{ raw_strategy }}
You can only choose this strategy when file_format is set to 'delta' or 'hudi'
You can only choose this strategy when file_format is set to 'delta' or 'iceberg' or 'hudi'
{%- endset %}

{% set invalid_insert_overwrite_delta_msg -%}
Invalid incremental strategy provided: {{ raw_strategy }}
You cannot use this strategy when file_format is set to 'delta'
You cannot use this strategy when file_format is set to 'delta' or 'iceberg'
Use the 'append' or 'merge' strategy instead
{%- endset %}

@@ -44,7 +44,7 @@
{% if raw_strategy not in ['append', 'merge', 'insert_overwrite'] %}
{% do exceptions.raise_compiler_error(invalid_strategy_msg) %}
{%-else %}
{% if raw_strategy == 'merge' and file_format not in ['delta', 'hudi'] %}
{% if raw_strategy == 'merge' and file_format not in ['delta', 'iceberg', 'hudi'] %}
{% do exceptions.raise_compiler_error(invalid_merge_msg) %}
{% endif %}
{% if raw_strategy == 'insert_overwrite' and file_format == 'delta' %}