iceberg v2 table support #432
Conversation
Found a way to identify Iceberg tables, given that Spark returns an error when trying to execute "SHOW TABLE EXTENDED ..." (see https://issues.apache.org/jira/browse/SPARK-33393). Instead of SHOW TABLE EXTENDED, a DESCRIBE EXTENDED is performed to retrieve the provider information. This allows Iceberg tables to be identified through an is_iceberg member variable. Multiple join conditions are now supported, so that multiple columns can make a row distinct. is_iceberg is used everywhere that handling Iceberg tables differs from other sources of data.
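For illustration only (this is not the exact macro code from the PR, and the table name is a placeholder), the provider can be read from the output of DESCRIBE EXTENDED:

```sql
-- SHOW TABLE EXTENDED fails for Iceberg v2 tables (SPARK-33393), so fall back to:
DESCRIBE EXTENDED my_schema.my_table;
-- The "# Detailed Table Information" section of the output contains a row like
--   Provider    iceberg
-- which is what drives the is_iceberg flag on the relation.
```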
[CT-276] Apache Iceberg Support dbt-labs#294: The _schema variable was used for non-Iceberg tables but was being overridden by the work for Iceberg v2 tables. I've made it so the Iceberg condition sets _schema rather than blanket-changing the schema for all providers.
On second look I wasn't happy with my choices for the macro and method names; hopefully what I have now makes more sense. [CT-276] Apache Iceberg Support dbt-labs#294
Hi! I see tests are running now and it looks like things are up to date. I'm going to review this in the next few days :)
dbt/adapters/spark/relation.py
```diff
 def __post_init__(self):
-    if self.database != self.schema and self.database:
+    if self.is_iceberg is not True and self.database != self.schema and self.database:
```
This seems like maybe it should be `or`? Any of these conditions are failures or otherwise need to be checked everywhere.
If this is only in the post_init, that seems fine to me, but this feels like two independent pieces of information (whether the relation is qualified/namespaced, and whether it is Iceberg). Is that not correct?
I'll give that change a try in my testcases. I think you're right here.
Alright, so using `or` in this case quickly made all the tox tests fail :). Likely because the first condition of the if will always evaluate to true for the base tests, since none of them are for Iceberg.
Upon further investigation this check is not needed, since self.database will not be set.
@dparent1 Hey, thanks a lot for this PR. Do you think it makes sense to add support for setting the Spark catalog (#294 (comment)) too? I'm not sure how other people are using Iceberg, but I have the impression that having a separate catalog for Iceberg tables is pretty common. The patch below basically removes all the checks around setting the database. I ran some tests setting the database both on the profile and model levels, and things seem to work as expected. One thing to note is that the "database" name may be confusing in Spark, so maybe adding a "catalog" alias may make sense. CC: @kbendick

```diff
diff --git a/dbt/adapters/spark/connections.py b/dbt/adapters/spark/connections.py
index 951e8ed..fa011c4 100644
--- a/dbt/adapters/spark/connections.py
+++ b/dbt/adapters/spark/connections.py
@@ -59,7 +59,7 @@ class SparkConnectionMethod(StrEnum):
class SparkCredentials(Credentials):
host: str
method: SparkConnectionMethod
- database: Optional[str] # type: ignore
+ database: Optional[str]
driver: Optional[str] = None
cluster: Optional[str] = None
endpoint: Optional[str] = None
@@ -87,16 +87,6 @@ class SparkCredentials(Credentials):
return self.cluster
def __post_init__(self):
- # spark classifies database and schema as the same thing
- if self.database is not None and self.database != self.schema:
- raise dbt.exceptions.RuntimeException(
- f" schema: {self.schema} \n"
- f" database: {self.database} \n"
- f"On Spark, database must be omitted or have the same value as"
- f" schema."
- )
- self.database = None
-
if self.method == SparkConnectionMethod.ODBC:
try:
import pyodbc # noqa: F401
@@ -147,7 +137,7 @@ class SparkCredentials(Credentials):
return self.host
def _connection_keys(self):
- return ("host", "port", "cluster", "endpoint", "schema", "organization")
+ return ("host", "port", "cluster", "endpoint", "schema", "organization", "database")
class PyhiveConnectionWrapper(object):
diff --git a/dbt/adapters/spark/relation.py b/dbt/adapters/spark/relation.py
index f1f1c7d..c58262b 100644
--- a/dbt/adapters/spark/relation.py
+++ b/dbt/adapters/spark/relation.py
@@ -21,7 +21,7 @@ class SparkQuotePolicy(Policy):
@dataclass
class SparkIncludePolicy(Policy):
- database: bool = False
+ database: bool = True
schema: bool = True
identifier: bool = True
@@ -39,10 +39,6 @@ class SparkRelation(BaseRelation):
source_meta: Optional[Dict[str, Any]] = None
meta: Optional[Dict[str, Any]] = None
- def __post_init__(self):
- if self.database != self.schema and self.database:
- raise RuntimeException("Cannot set database in spark!")
-
@classmethod
def create_from_source(cls: Type[Self], source: ParsedSourceDefinition, **kwargs: Any) -> Self:
source_quoting = source.quoting.to_dict(omit_none=True)
@@ -65,9 +61,4 @@ class SparkRelation(BaseRelation):
)
def render(self):
- if self.include_policy.database and self.include_policy.schema:
- raise RuntimeException(
- "Got a spark relation with schema and database set to "
- "include, but only one can be set"
- )
return super().render()
diff --git a/dbt/include/spark/macros/adapters.sql b/dbt/include/spark/macros/adapters.sql
index 6abfc36..aea7f51 100644
--- a/dbt/include/spark/macros/adapters.sql
+++ b/dbt/include/spark/macros/adapters.sql
@@ -266,9 +266,14 @@
{%- endcall %}
{% endmacro %}
-
{% macro spark__generate_database_name(custom_database_name=none, node=none) -%}
- {% do return(None) %}
+ {%- if custom_database_name is none -%}
+ {% do return(none) if not target.database -%}
+
+ {{ target.database }}
+ {%- else -%}
+ {{ custom_database_name | trim }}
+ {%- endif -%}
{%- endmacro %}
{% macro spark__persist_docs(relation, model, for_relation, for_columns) -%}
```
@igorcalabria I've just merged in the latest changes from dbt-spark/main; if everything passes in the tests that it will kick off, I'll give your changes a try.
Thanks @dparent1 for working on this! I did some testing today and think this looks great. I fixed the issue reported by @brandys11 in a PR against your branch: dparent1#9
@jtcohen6 Long time no see :) Hope everything is well! I was wondering if this is something that we can get in
Fix incremental runs
@ChenyuLInx Would you have any time to look at this PR? Having Iceberg support in dbt-cloud would help us a lot!
Hey all, we have this PR planned for review in our next sprint, starting mid next week. If I recap what I'm seeing, here is what we add:
Let me know if I got that wrong ;)
@Fleid, it is going to take a while to get
Cleanup based on comments
@mikealfare: @Fokko was nice enough to make the changes and send me a PR. I've tried it with the latest merge of main that I've done, and all the tox tests are passing in my dev environment. I ran jaffle_shop as well against a spark-iceberg Docker container I have set up, and everything seems to still work for me.
I presume (hope) the basic image from their site? That'd be really good news. Context here: I spent time before our holiday break digging into the logic, systems, and implementation here; a lot of moving parts I had to get up to speed with. I must test a few things to confirm with the engineering team in-house that this is indeed looking good to ship. 🙏 Could you do me a huge favor and share what the proper profiles.yml setup is?
It's actually a very simple profiles.yml that I use and hopefully it helps you out here:
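For reference, a minimal sketch of such a profiles.yml, assuming the thrift method against the local quickstart container (the profile name, host, and port are assumptions, not the exact file used here):

```yaml
# Minimal sketch; adjust host/port/schema to your environment.
dbt_spark_iceberg:
  target: dev
  outputs:
    dev:
      type: spark
      method: thrift
      host: localhost
      port: 10000
      schema: default
```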
I am using the Spark + Iceberg Quickstart Image, but I modified the docker-compose.yml so that I could mount my dbt-spark folder into the container. I also mount a folder that includes the profiles.yml file that I use, just a shortcut so I can test out my changes quickly. I mount the folders under /root since you are the root user when you exec into the container.
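A sketch of what those docker-compose.yml volume mounts could look like (the service name and host paths are assumptions, not the actual file):

```yaml
# Hypothetical volumes added to the quickstart's Spark service.
services:
  spark-iceberg:
    volumes:
      - ./dbt-spark:/root/dbt-spark   # local dbt-spark checkout
      - ./dbt-profiles:/root/.dbt     # folder containing profiles.yml
```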
Thanks so much for recommending the session strategy. That was absolutely the ticket; of course the simple solution was the solution all along. I was in the wrong setup method (Spark has so many ways of doing this, my goodness). I'm on the case and prioritizing reviewing this with Florian and the team this week.
That's awesome @VersusFacit! I was just doing a write-up on yet another way to connect, in case it might be helpful. First, set up the local docker-spark-iceberg environment:
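The original commands for this step were lost in the thread; roughly, and assuming the standard quickstart repository, it would look like:

```sh
# Clone the Spark + Iceberg quickstart (repository URL is an assumption) and start the stack.
git clone https://github.com/tabular-io/docker-spark-iceberg.git
cd docker-spark-iceberg
docker-compose up
```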
It is helpful to keep this open in a tab; if something happens during development, you can see the stack traces. Next, I'll fire up a browser and make sure that the default database exists:

```sql
%%sql
CREATE DATABASE default;
```

Make sure that you have your jaffle_shop profile configured:

```yaml
jaffle_shop:
  target: dev
  outputs:
    dev:
      type: spark
      method: thrift
      schema: default
      host: localhost
      port: 10000
```

Next we can set up dbt-spark. Check out the branch that you'd like to test:
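The exact commands were lost here; a sketch, with the branch name as a placeholder:

```sh
# Clone dbt-spark, check out the PR branch (placeholder name), and install with the PyHive extra for thrift.
git clone https://github.com/dbt-labs/dbt-spark.git
cd dbt-spark
git checkout <pr-branch>
pip install -e ".[PyHive]"
```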
Run the Jaffle shop:
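The commands themselves were lost here; presumably this is the usual seed step:

```sh
# Load jaffle_shop's seed CSVs into the default schema.
dbt seed
```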
And the actual run:
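And, presumably:

```sh
# Build the jaffle_shop models against the local Spark + Iceberg stack.
dbt run
```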
I had to change everything to a table, because Iceberg does not yet support views:
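For example, via dbt_project.yml (a sketch; the actual change isn't shown in the thread):

```yaml
# Materialize every jaffle_shop model as a table instead of a view.
models:
  jaffle_shop:
    +materialized: table
```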
Everything in the catalog is being forced to the Iceberg format by the test setup (Spark configuration). In normal circumstances, you can just mix Iceberg tables with non-Iceberg views. And you can see the result ✨ We can also see that the
Thanks to you too Fokko. I was attempting to use Thrift with little success, so I'll take a look at these instructions and see if I can get both to work (more coverage is better!). Moreover, these notes you've left should go into our docs to help future users along. Great additions. I'll move on to reviewing this immediately! 🖖
@VersusFacit How's the review going, anything I can help with?
@Fokko I've got some questions that would benefit from a faster chat cycle, and I've messaged you on dbt Slack. Mind following up there for now?
@VersusFacit Of course, I've pinged you on Slack. There are many Fokko Driesprongs on Slack that are no longer active (old email addresses). Sorry for the confusion there.
- Fixing up merge conflicts around Exception/Error types
- Putting back accidental newline removal in tox.ini
- In impl.py, changed List[agate.Row] to AttrDict. This fixed the pre-commit error after merging in changes from main.
I noticed that two Spark tests were failing, so I tried to revert some of the changes that seemed related. They are now passing, and my Iceberg dbt project is still running fine.
…revert-some-changes
Revert some stuff
Hello @dparent1! Thanks for your patience. Testing this thing has proven to be one of the trickier tasks of my career thus far. @Fokko provided critical support in revising the Docker image into a stable enough test env, despite me finding new and interesting ways to break the sandbox. Without further pause, I have this PR to offer.
We need to do some testing before we merge with confidence:
We're officially in the home stretch 🏁 Let me know how you plan to proceed with local verification. <3
Hi @VersusFacit - I'm ok with your PR going in instead of mine; either way the change makes it in, and that's my only goal :). I can close this PR if you're ok with that. I'll pull your PR and try my local testing; if @Fokko hasn't had issues, that usually means I'm good too. I'll verify and let you know. Quick update to make sure there is no confusion: I've tested the new PR and it works fine for my test case. There's a lot going on in this particular PR, so I'd like to leave it with some closure.
Cool, it works well for incremental config (append/merge) with the thrift service (version 1.5.0-b2).
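For reference, an incremental model along the lines being tested might be configured like this (a sketch: the column and model names are placeholders, and file_format='iceberg' is an assumption based on what this PR enables):

```sql
-- Hypothetical incremental model using the merge strategy with an Iceberg file format.
{{
  config(
    materialized='incremental',
    incremental_strategy='merge',
    file_format='iceberg',
    unique_key='id'
  )
}}

select * from {{ ref('stg_events') }}
{% if is_incremental() %}
  -- only process rows newer than what is already in the target table
  where event_time > (select max(event_time) from {{ this }})
{% endif %}
```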
resolves #294
Description
Adding in additional support for Iceberg v2 tables. Spark throws an exception when running SHOW TABLE EXTENDED with v2 Iceberg tables (see https://issues.apache.org/jira/browse/SPARK-33393). The workaround I've come up with performs a SHOW TABLES when this exception occurs; if this succeeds, information is pulled from a DESCRIBE EXTENDED {{ table_name }} to get the extended properties of the table. This returns the Provider information needed to identify that the provider is actually Iceberg. Using this information I can set is_iceberg=True on the relation. In places where interacting with Iceberg tables differs, I am able to check is_iceberg and follow slightly different logic.
Support for multiple join conditions has also been added, so that multiple columns can make a row distinct.
Checklist
- Run changie new to create a changelog entry