iceberg v2 table support #432

Closed
wants to merge 36 commits

Conversation

dparent1
Contributor

@dparent1 dparent1 commented Aug 19, 2022

resolves #294

Adding additional support for Iceberg v2 tables. Spark throws an exception when running SHOW TABLE EXTENDED against v2 Iceberg tables (see https://issues.apache.org/jira/browse/SPARK-33393). The workaround I've come up with performs a SHOW TABLES when this exception occurs; if that succeeds, information is pulled from a DESCRIBE EXTENDED {{ table_name }} to get the extended properties of the table. This returns the Provider information needed to identify that the provider is actually Iceberg. Using this information I can set is_iceberg=True on the relation. In places where interacting with Iceberg tables differs, I can check is_iceberg and follow slightly different logic.
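
A minimal sketch of that fallback, for illustration only (this is not the dbt-spark implementation; run_sql here is a hypothetical helper that executes a statement and returns rows as dictionaries, using Spark's usual col_name/data_type/tableName result columns):

# Hedged sketch of the SHOW TABLE EXTENDED -> SHOW TABLES + DESCRIBE EXTENDED fallback.
def describe_provider(run_sql, schema, table):
    """Return the Provider reported by DESCRIBE EXTENDED (e.g. 'iceberg')."""
    for row in run_sql(f"DESCRIBE EXTENDED {schema}.{table}"):
        if row.get("col_name") == "Provider":
            return row.get("data_type")
    return None

def list_relations(run_sql, schema):
    try:
        # Fails with an AnalysisException for Iceberg v2 tables (SPARK-33393).
        return run_sql(f"SHOW TABLE EXTENDED IN {schema} LIKE '*'")
    except Exception:
        relations = []
        for row in run_sql(f"SHOW TABLES IN {schema}"):
            provider = describe_provider(run_sql, schema, row["tableName"])
            relations.append({
                "schema": schema,
                "identifier": row["tableName"],
                "is_iceberg": provider == "iceberg",
            })
        return relations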

Support for multiple join conditions has also been added, allowing multiple columns to make a row distinct.
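
For illustration, a hedged sketch of the kind of predicate this enables (the unique_key value and the DBT_INTERNAL_SOURCE/DBT_INTERNAL_DEST aliases are assumptions for the example, not the macro code itself):

# Illustrative only: composing a merge join predicate from several unique-key
# columns, so the combination of columns identifies a distinct row.
unique_key = ["order_id", "order_date"]  # hypothetical model configuration

predicate = " AND ".join(
    f"DBT_INTERNAL_SOURCE.{col} = DBT_INTERNAL_DEST.{col}" for col in unique_key
)
print(predicate)
# DBT_INTERNAL_SOURCE.order_id = DBT_INTERNAL_DEST.order_id AND DBT_INTERNAL_SOURCE.order_date = DBT_INTERNAL_DEST.order_date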

Checklist

  • I have signed the CLA
  • I have run this code in development and it appears to resolve the stated issue
  • This PR includes tests, or tests are not required/relevant for this PR
  • I have run changie new to create a changelog entry

Found a way to identify iceberg tables given that spark returns
an error when trying to execute "SHOW TABLE EXTENDED..."  See
https://issues.apache.org/jira/browse/SPARK-33393

Instead of show table extended a "DESCRIBE EXTENDED" is
performed to retrieve the provider information.  This allows
for identification of iceberg through an is_iceberg member
variable.

Allow multiple join conditions so that multiple columns can make a row distinct.

Use is_iceberg everywhere handling iceberg tables differs from other
sources of data.
[CT-276] Apache Iceberg Support dbt-labs#294

The _schema variable was used for non-iceberg tables but was being
overridden by work for iceberg v2 tables.  I've made it so the iceberg
condition will set _schema rather than blanket changing the schema for
all providers.
On second look I wasn't happy with my name choices for the macro and method; hopefully what I have now makes more sense.

[CT-276] Apache Iceberg Support dbt-labs#294
@cla-bot cla-bot bot added the cla:yes label Aug 19, 2022
@dparent1 dparent1 changed the title Dparent1/iceberg iceberg v2 table support Aug 29, 2022
@kbendick

Hi! I see tests are running now and it looks like things are up to date.

I’m going to review this in the next few days :)


    def __post_init__(self):
-        if self.database != self.schema and self.database:
+        if self.is_iceberg is not True and self.database != self.schema and self.database:

This seems like maybe it should be or? Any of these conditions is a failure or otherwise needs to be checked everywhere.

If this is only the post_init, that seems fine to me, but this feels like two independent pieces of information (is this a qualified/namespaced relation, and then is it Iceberg). Is that not correct?

Contributor Author

I'll give that change a try in my testcases. I think you're right here.

Contributor Author

@dparent1 dparent1 Sep 26, 2022

Alright, using or in this case quickly made all the tox tests fail :). Likely because the first condition of the if will always evaluate to true for the base tests, since none of them are for Iceberg.
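
A quick illustration of the difference (representative values only, not taken from the test suite):

# For a non-Iceberg relation with no database set, `and` short-circuits to
# False, while `or` is True as soon as the first clause is.
is_iceberg, database, schema = False, None, "analytics"

with_and = (is_iceberg is not True) and database != schema and bool(database)
with_or = (is_iceberg is not True) or (database != schema and bool(database))

print(with_and)  # False -> no exception raised, as intended
print(with_or)   # True  -> would raise for every non-Iceberg relation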

@igorcalabria

@dparent1 Hey, thanks a lot for this PR. Do you think it makes sense to add support for setting the Spark catalog (#294 (comment)) too? I'm not sure how other people are using Iceberg, but I have the impression that having a separate catalog for Iceberg tables is pretty common.

The patch basically removes all the checks around setting the database. I ran some tests setting the database at both the profile and model levels, and things seem to work as expected. One thing to note is that the "database" name may be confusing in Spark, so adding a "catalog" alias may make sense.

CC: @kbendick

diff --git a/dbt/adapters/spark/connections.py b/dbt/adapters/spark/connections.py
index 951e8ed..fa011c4 100644
--- a/dbt/adapters/spark/connections.py
+++ b/dbt/adapters/spark/connections.py
@@ -59,7 +59,7 @@ class SparkConnectionMethod(StrEnum):
 class SparkCredentials(Credentials):
     host: str
     method: SparkConnectionMethod
-    database: Optional[str]  # type: ignore
+    database: Optional[str]
     driver: Optional[str] = None
     cluster: Optional[str] = None
     endpoint: Optional[str] = None
@@ -87,16 +87,6 @@ class SparkCredentials(Credentials):
         return self.cluster
 
     def __post_init__(self):
-        # spark classifies database and schema as the same thing
-        if self.database is not None and self.database != self.schema:
-            raise dbt.exceptions.RuntimeException(
-                f"    schema: {self.schema} \n"
-                f"    database: {self.database} \n"
-                f"On Spark, database must be omitted or have the same value as"
-                f" schema."
-            )
-        self.database = None
-
         if self.method == SparkConnectionMethod.ODBC:
             try:
                 import pyodbc  # noqa: F401
@@ -147,7 +137,7 @@ class SparkCredentials(Credentials):
         return self.host
 
     def _connection_keys(self):
-        return ("host", "port", "cluster", "endpoint", "schema", "organization")
+        return ("host", "port", "cluster", "endpoint", "schema", "organization", "database")
 
 
 class PyhiveConnectionWrapper(object):
diff --git a/dbt/adapters/spark/relation.py b/dbt/adapters/spark/relation.py
index f1f1c7d..c58262b 100644
--- a/dbt/adapters/spark/relation.py
+++ b/dbt/adapters/spark/relation.py
@@ -21,7 +21,7 @@ class SparkQuotePolicy(Policy):
 
 @dataclass
 class SparkIncludePolicy(Policy):
-    database: bool = False
+    database: bool = True
     schema: bool = True
     identifier: bool = True
 
@@ -39,10 +39,6 @@ class SparkRelation(BaseRelation):
     source_meta: Optional[Dict[str, Any]] = None
     meta: Optional[Dict[str, Any]] = None
 
-    def __post_init__(self):
-        if self.database != self.schema and self.database:
-            raise RuntimeException("Cannot set database in spark!")
-
     @classmethod
     def create_from_source(cls: Type[Self], source: ParsedSourceDefinition, **kwargs: Any) -> Self:
         source_quoting = source.quoting.to_dict(omit_none=True)
@@ -65,9 +61,4 @@ class SparkRelation(BaseRelation):
         )
 
     def render(self):
-        if self.include_policy.database and self.include_policy.schema:
-            raise RuntimeException(
-                "Got a spark relation with schema and database set to "
-                "include, but only one can be set"
-            )
         return super().render()
diff --git a/dbt/include/spark/macros/adapters.sql b/dbt/include/spark/macros/adapters.sql
index 6abfc36..aea7f51 100644
--- a/dbt/include/spark/macros/adapters.sql
+++ b/dbt/include/spark/macros/adapters.sql
@@ -266,9 +266,14 @@
   {%- endcall %}
 {% endmacro %}
 
-
 {% macro spark__generate_database_name(custom_database_name=none, node=none) -%}
-  {% do return(None) %}
+    {%- if custom_database_name is none -%}
+        {% do return(none) if not target.database -%}
+
+        {{ target.database }}
+    {%- else -%}
+        {{ custom_database_name | trim }}
+    {%- endif -%}
 {%- endmacro %}
 
 {% macro spark__persist_docs(relation, model, for_relation, for_columns) -%}

@dparent1
Contributor Author

dparent1 commented Nov 7, 2022

@igorcalabria I've just merged in the latest changes from dbt-spark/main; if everything passes in the tests that this kicks off, I'll give your changes a try.

@cccs-jc

cccs-jc commented Nov 15, 2022

@kbendick and @jtcohen6, we have added support for Iceberg tables. We are waiting for this PR to be merged. Is there anything missing?

Contributor

@Fokko Fokko left a comment

Thanks @dparent1 for working on this! I did some testing today and think this looks great. I fixed the issue reported by @brandys11 in a PR against your branch: dparent1#9

@jtcohen6 Long time no see :) Hope everything is well! I was wondering if this is something that we can get in.

dbt/adapters/spark/impl.py (review thread marked outdated and resolved)
@Fokko
Contributor

Fokko commented Nov 29, 2022

@ChenyuLInx Would you have any time to look at this PR? Having Iceberg support in dbt-cloud would help us a lot!

@Fleid
Contributor

Fleid commented Dec 2, 2022

Hey all,

We have this PR planned for review in our next sprint, starting mid next week.
Hopefully we can have it merged before the end of the year ;)

If I recap what I'm seeing, in here we add the following:

  • update how dbt loads and processes metadata during caching, to take into account the current behavior of Spark around Iceberg v2 (overall refactoring around 3 level namespace, dealing with SHOW TABLE EXTENDED / DESCRIBE EXTENDED)
  • add file_format: iceberg in the incremental (excluding insert_overwrite) and table materializations, and adapt the replacement strategies

Let me know if I get that wrong ;)

@rdblue

rdblue commented Dec 5, 2022

@Fleid, it is going to take a while to get SHOW TABLES EXTENDED into Spark (6 month release cycle) and it is not very portable SQL. I think a faster way to get this done is to use INFORMATION_SCHEMA tables. Is that okay with you? What information do you need from the SHOW TABLES EXTENDED or INFORMATION_SCHEMA query?

Cleanup based on comments
@dparent1
Contributor Author

@mikealfare: @Fokko was nice enough to make the changes and send me a PR. I've tried it with the latest merge of main that I've done and all the tox tests are passing in my dev environment. I ran jaffle_shop as well against a spark-iceberg docker container I have setup and everything seems to still work for me.

@VersusFacit
Contributor

VersusFacit commented Jan 9, 2023

@dparent1

I ran jaffle_shop as well against a spark-iceberg docker container I have setup and everything seems to still work for me.

I presume (hope) the basic image from their site? That'd be really good news. For context, I spent time before our holiday break digging into the logic, systems, and implementation here -- a lot of moving parts I had to get up to speed with.

I need to test a few things to confirm with the in-house engineering team that this is indeed looking good to ship.

🙏 Could you do me a huge favor and share what the proper profiles.yml config (no secrets, of course) would be for the default Iceberg image? I got the thing spun up on my system. However, with all the ways Spark can be configured, I'm unsure how to connect jaffle_shop to my local instance. Share that and you'll do a lot to speed up this "final check" ❤️

@dparent1
Contributor Author

dparent1 commented Jan 9, 2023

It's actually a very simple profiles.yml that I use and hopefully it helps you out here:

jaffle_shop:
  target: dev
  outputs:
    dev:
      type: spark
      method: session
      schema: demo
      host: NA
config:
  send_anonymous_usage_stats: False

I am using the Spark + Iceberg Quickstart Image but I modified the docker-compose.yml so that I could mount my dbt-spark folder into the container. I also mount a folder that includes the profiles.yml file that I use as well, just a shortcut so I can test out my changes quickly. I mount the folders under /root since you are the root user when you docker exec -it spark-iceberg /bin/bash. I also set the following environment variables:

export PYTHONPATH=/opt/spark/python:/opt/spark/python/lib/py4j-0.10.9.3-src.zip
export DBT_PROFILES_DIR=/root/scratch/dbt/profiles

@VersusFacit
Contributor

Thanks so much for recommending the session strategy. That was absolutely the ticket -- of course the simple solution was the solution all along.

I was using the wrong setup method (Spark has so many ways of doing this, my goodness). I'm on the case and prioritizing reviewing this with Florian and the team this week.

@Fokko
Contributor

Fokko commented Jan 9, 2023

That's awesome @VersusFacit! I was just doing a write-up on yet another way to connect, in case it might be helpful:

Set up the local docker-spark-iceberg environment:

➜  Desktop git clone git@github.com:tabular-io/docker-spark-iceberg.git
➜  Desktop cd docker-spark-iceberg
➜  Desktop docker-compose up --no-build

It is helpful to keep this open in a tab so that if something happens during development, you can see the stack traces.

Next, I'll fire up a browser and make sure that the default database is created by running a Jupyter cell at http://localhost:8888/:

%%sql

CREATE DATABASE default;

Make sure that you have your dbt profiles.yml set up:

jaffle_shop:
  target: dev
  outputs:
    dev:
      type: spark
      method: thrift
      schema: default
      host: localhost
      port: 10000

Next we can set up dbt-spark. Check out the branch that you'd like to test:

➜  dbt-spark git:(dparent1/iceberg) git show

commit 83f2d619afe9d2d0d0f399a0de89f4ddcaed15d2 (HEAD -> dparent1/iceberg, dparent1/dparent1/iceberg)
Merge: beac4b4 3a7ca7c
Author: Dan Parent <[email protected]>
Date:   Mon Jan 9 11:12:28 2023 -0500

    Merge branch 'main' into dparent1/iceberg

➜  dbt-spark git:(dparent1/iceberg) pip install -e ".[PyHive]"

Run the Jaffle shop:

➜  jaffle_shop git:(main) ✗ dbt seed 
20:26:52  Running with dbt=1.4.0-b1
20:26:52  Partial parse save file not found. Starting full parse.
20:26:53  Found 5 models, 20 tests, 0 snapshots, 0 analyses, 332 macros, 0 operations, 3 seed files, 0 sources, 0 exposures, 0 metrics
20:26:53  
20:26:53  Concurrency: 1 threads (target='dev')
20:26:53  
20:26:53  1 of 3 START seed file default.raw_customers ................................... [RUN]
20:26:54  1 of 3 OK loaded seed file default.raw_customers ............................... [INSERT 100 in 0.40s]
20:26:54  2 of 3 START seed file default.raw_orders ...................................... [RUN]
20:26:54  2 of 3 OK loaded seed file default.raw_orders .................................. [INSERT 99 in 0.38s]
20:26:54  3 of 3 START seed file default.raw_payments .................................... [RUN]
20:26:54  3 of 3 OK loaded seed file default.raw_payments ................................ [INSERT 113 in 0.47s]
20:26:54  
20:26:54  Finished running 3 seeds in 0 hours 0 minutes and 1.69 seconds (1.69s).
20:26:54  
20:26:54  Completed successfully
20:26:54  
20:26:54  Done. PASS=3 WARN=0 ERROR=0 SKIP=0 TOTAL=3

And the actual run:

➜  jaffle_shop git:(main) ✗ dbt run
20:27:02  Running with dbt=1.4.0-b1
20:27:02  Found 5 models, 20 tests, 0 snapshots, 0 analyses, 332 macros, 0 operations, 3 seed files, 0 sources, 0 exposures, 0 metrics
20:27:02  
20:27:02  Concurrency: 1 threads (target='dev')
20:27:02  
20:27:02  1 of 5 START sql table model default.stg_customers ............................. [RUN]
20:27:03  1 of 5 OK created sql table model default.stg_customers ........................ [OK in 0.36s]
20:27:03  2 of 5 START sql table model default.stg_orders ................................ [RUN]
20:27:03  2 of 5 OK created sql table model default.stg_orders ........................... [OK in 0.35s]
20:27:03  3 of 5 START sql table model default.stg_payments .............................. [RUN]
20:27:03  3 of 5 OK created sql table model default.stg_payments ......................... [OK in 0.30s]
20:27:03  4 of 5 START sql table model default.customers ................................. [RUN]
20:27:04  4 of 5 OK created sql table model default.customers ............................ [OK in 0.78s]
20:27:04  5 of 5 START sql table model default.orders .................................... [RUN]
20:27:05  5 of 5 OK created sql table model default.orders ............................... [OK in 0.45s]
20:27:05  
20:27:05  Finished running 5 table models in 0 hours 0 minutes and 2.69 seconds (2.69s).
20:27:05  
20:27:05  Completed successfully
20:27:05  
20:27:05  Done. PASS=5 WARN=0 ERROR=0 SKIP=0 TOTAL=5

I had to change everything to a table, because Iceberg does not yet support views:

models:
  jaffle_shop:
      materialized: table
      staging:
-        materialized: view
+        materialized: table

Everything in the catalog is forced to the Iceberg format by the test setup (Spark configuration). Under normal circumstances, you can just mix Iceberg tables with non-Iceberg views.
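
For anyone curious how that forcing happens: the quickstart image points the Spark session catalog at Iceberg, roughly along the lines of the PySpark sketch below (typical Iceberg settings assumed for illustration, not copied from the image's configuration):

# Illustrative only: with the session catalog configured as an Iceberg
# SparkSessionCatalog, plain CREATE TABLE statements produce Iceberg tables.
from pyspark.sql import SparkSession

spark = (
    SparkSession.builder
    .config("spark.sql.extensions",
            "org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions")
    .config("spark.sql.catalog.spark_catalog",
            "org.apache.iceberg.spark.SparkSessionCatalog")
    .config("spark.sql.catalog.spark_catalog.type", "hive")
    .getOrCreate()
)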

And you can see the result ✨

[screenshot]

We can also see that the Provider is set to Iceberg:

[screenshot]

@VersusFacit
Contributor

Thanks to you too Fokko. I was attempting to use Thrift with little success, so I'll take a look at these instructions and see if I can get both to work (more coverage is better!).

Moreover, these notes you've left should go into our docs to help future users along. Great additions.

I'll move on to reviewing this immediately! 🖖

@Fokko
Contributor

Fokko commented Jan 17, 2023

@VersusFacit How's the review going, anything I can help with?

@VersusFacit
Contributor

@Fokko I've got some questions that would benefit from a faster chat cycle, and I've messaged you on dbt Slack. Mind following up there for now?

@Fokko
Contributor

Fokko commented Jan 18, 2023

@VersusFacit Of course, I've pinged you on Slack. There are many Fokko Driesprongs on Slack who are no longer active (old email addresses). Sorry for the confusion there.

dparent1 and others added 7 commits January 23, 2023 15:25
- Fixing up merge conflicts around Exception/Error types
- Putting back accidental newline removal in tox.ini
- in impl.py changed:
    List[agate.Row]
    to
    AttrDict

This fixed the pre-commit error after merging in changes from main.
I noticed that two Spark tests were failing, so I tried to revert some of the changes that seemed related. They are now passing, and my Iceberg dbt project is still running fine.
@Fokko Fokko mentioned this pull request Feb 2, 2023
@VersusFacit VersusFacit mentioned this pull request Feb 24, 2023
@VersusFacit
Contributor

VersusFacit commented Feb 24, 2023

Hello @dparent1! Thanks for your patience. Testing this thing has proven to be one of the trickier tasks of my career thus far. @Fokko provided critical support in revising the docker image into a stable enough test env despite me finding new and interesting ways to break the sandbox via dbt invocations.

Without further pause, I have this PR to offer

  • I didn't want to push directly to your branch here and didn't have permissions to open a branch/PR in your repo
  • My PR represents some refactors as part of our wrap-up process. I refactored the Python logic since the multi-format complexities of dbt-spark have forced our hand. I aimed for code that permits future format additions and fewer.
  • I really don't have a major preference between:
    1. you merge my commits into this PR and we merge it after local testing/CI passes green/last minute review
    2. we just do all this process on my PR (maybe this is best because my colleagues may have code comments 🤷‍♀️ )
    3. open suggestions or a feature branch with revisions to merge against my existing PR, which we merge into dbt-spark's main afterwards

We need to do some testing before we merge with confidence:

  • ✅ CI must pass (and it does 🙏 )
  • ❓ I understand you have local test cases. I want those operations -- tables, snapshots, and incrementals, I hope -- verified all 🟢 before we go ahead with the final merge. Our CI passing is decent indirect evidence that Iceberg works; however, some direct coverage would be even better!

We're officially in the home stretch 🏁 Let me know how you plan to proceed with local verification. <3

@dparent1 dparent1 closed this Feb 24, 2023
@dparent1
Contributor Author

dparent1 commented Feb 24, 2023

Hi @VersusFacit - I'm OK with your PR going in instead of mine; either way the change makes it in, and that's my only goal :). I can close this PR if you're OK with that. I'll pull your PR and try my local testing; if @Fokko hasn't had issues, that usually means I'm good too. I'll verify and let you know.

Quick update to make sure there is no confusion: I've tested the new PR and it works fine for my test case. There's a lot going on in this particular PR, so I'd like to leave it with some closure.

@meyergin

meyergin commented Mar 8, 2023

Cool, it works well for the incremental config (append/merge) with the Thrift server (version 1.5.0-b2).

Successfully merging this pull request may close these issues.

[CT-276] Apache Iceberg Support