diff --git a/website/docs/reference/resource-configs/databricks-configs.md b/website/docs/reference/resource-configs/databricks-configs.md index 1a7bd42b848..138d9a746bd 100644 --- a/website/docs/reference/resource-configs/databricks-configs.md +++ b/website/docs/reference/resource-configs/databricks-configs.md @@ -26,7 +26,7 @@ We do not yet have a PySpark API to set tblproperties at table creation, so this - + 1.8 introduces support for [Tags](https://docs.databricks.com/en/data-governance/unity-catalog/tags.html) at the table level, in addition to all table configuration supported in 1.7. @@ -35,7 +35,7 @@ We do not yet have a PySpark API to set tblproperties at table creation, so this | file_format | The file format to use when creating tables (`parquet`, `delta`, `hudi`, `csv`, `json`, `text`, `jdbc`, `orc`, `hive` or `libsvm`). | Optional | SQL, Python | `delta` | | location_root | The created table uses the specified directory to store its data. The table alias is appended to it. | Optional | SQL, Python | `/mnt/root` | | partition_by | Partition the created table by the specified columns. A directory is created for each partition. | Optional | SQL, Python | `date_day` | -| liquid_clustered_by | Cluster the created table by the specified columns. Clustering method is based on [Delta's Liquid Clustering feature](https://docs.databricks.com/en/delta/clustering.html). Available since dbt-databricks 1.6.2. | Optional | SQL | `date_day` | +| liquid_clustered_by | Cluster the created table by the specified columns. Clustering method is based on [Delta's Liquid Clustering feature](https://docs.databricks.com/en/delta/clustering.html). Available since dbt-databricks 1.6.2. | Optional | SQL, Python | `date_day` | | clustered_by | Each partition in the created table will be split into a fixed number of buckets by the specified columns. | Optional | SQL, Python | `country_code` | | buckets | The number of buckets to create while clustering | Required if `clustered_by` is specified | SQL, Python | `8` | | tblproperties | [Tblproperties](https://docs.databricks.com/en/sql/language-manual/sql-ref-syntax-ddl-tblproperties.html) to be set on the created table | Optional | SQL, Python* | `{'this.is.my.key': 12}` | @@ -51,6 +51,29 @@ We do not yet have a PySpark API to set tblproperties at table creation, so this +dbt Core v.9 and Versionless dbt Clouyd support for `table_format: iceberg`, in addition to all previous table configurations supported in 1.8. + +| Option | Description | Required? | Model Support | Example | +|---------------------|-----------------------------|-------------------------------------------|-----------------|--------------------------| +| table_format | Whether or not to provision [Iceberg](https://docs.databricks.com/en/delta/uniform.html) compatibility for the materialization | Optional | SQL, Python | `iceberg` | +| file_format+ | The file format to use when creating tables (`parquet`, `delta`, `hudi`, `csv`, `json`, `text`, `jdbc`, `orc`, `hive` or `libsvm`). | Optional | SQL, Python | `delta` | +| location_root | The created table uses the specified directory to store its data. The table alias is appended to it. | Optional | SQL, Python | `/mnt/root` | +| partition_by | Partition the created table by the specified columns. A directory is created for each partition. | Optional | SQL, Python | `date_day` | +| liquid_clustered_by | Cluster the created table by the specified columns. Clustering method is based on [Delta's Liquid Clustering feature](https://docs.databricks.com/en/delta/clustering.html). Available since dbt-databricks 1.6.2. | Optional | SQL, Python | `date_day` | +| clustered_by | Each partition in the created table will be split into a fixed number of buckets by the specified columns. | Optional | SQL, Python | `country_code` | +| buckets | The number of buckets to create while clustering | Required if `clustered_by` is specified | SQL, Python | `8` | +| tblproperties | [Tblproperties](https://docs.databricks.com/en/sql/language-manual/sql-ref-syntax-ddl-tblproperties.html) to be set on the created table | Optional | SQL, Python* | `{'this.is.my.key': 12}` | +| databricks_tags | [Tags](https://docs.databricks.com/en/data-governance/unity-catalog/tags.html) to be set on the created table | Optional | SQL++, Python++ | `{'my_tag': 'my_value'}` | +| compression | Set the compression algorithm. | Optional | SQL, Python | `zstd` | + +\* We do not yet have a PySpark API to set tblproperties at table creation, so this feature is primarily to allow users to anotate their python-derived tables with tblproperties. +\+ When `table_format` is `iceberg`, `file_format` must be `delta`. +\++ `databricks_tags` are currently only supported at the table level, and applied via `ALTER` statements. + + + + + ### Python submission methods In dbt v1.9 and higher, or in [Versionless](/docs/dbt-versions/versionless-cloud) dbt Cloud, you can use these four options for `submission_method`: @@ -150,6 +173,7 @@ models: + ## Incremental models dbt-databricks plugin leans heavily on the [`incremental_strategy` config](/docs/build/incremental-strategy). This config tells the incremental materialization how to build models in runs beyond their first. It can be set to one of four values: @@ -160,6 +184,23 @@ dbt-databricks plugin leans heavily on the [`incremental_strategy` config](/docs Each of these strategies has its pros and cons, which we'll discuss below. As with any model config, `incremental_strategy` may be specified in `dbt_project.yml` or within a model file's `config()` block. + + + + +## Incremental models + +dbt-databricks plugin leans heavily on the [`incremental_strategy` config](/docs/build/incremental-strategy). This config tells the incremental materialization how to build models in runs beyond their first. It can be set to one of five values: + - **`append`**: Insert new records without updating or overwriting any existing data. + - **`insert_overwrite`**: If `partition_by` is specified, overwrite partitions in the with new data. If no `partition_by` is specified, overwrite the entire table with new data. + - **`merge`** (default; Delta and Hudi file format only): Match records based on a `unique_key`, updating old records, and inserting new ones. (If no `unique_key` is specified, all new data is inserted, similar to `append`.) + - **`replace_where`** (Delta file format only): Match records based on `incremental_predicates`, replacing all records that match the predicates from the existing table with records matching the predicates from the new data. (If no `incremental_predicates` are specified, all new data is inserted, similar to `append`.) + - **`microbatch`** (Delta file format only): Implements the [microbatch strategy](/docs/build/incremental-microbatch) using `replace_where` with predicates generated based `event_time`. + +Each of these strategies has its pros and cons, which we'll discuss below. As with any model config, `incremental_strategy` may be specified in `dbt_project.yml` or within a model file's `config()` block. + + + ### The `append` strategy Following the `append` strategy, dbt will perform an `insert into` statement with all new data. The appeal of this strategy is that it is straightforward and functional across all platforms, file types, connection methods, and Apache Spark versions. However, this strategy _cannot_ update, overwrite, or delete existing data, so it is likely to insert duplicate records for many data sources. @@ -306,7 +347,7 @@ The `merge` incremental strategy requires: - Databricks Runtime 5.1 and above for delta file format - Apache Spark for hudi file format -dbt will run an [atomic `merge` statement](https://docs.databricks.com/spark/latest/spark-sql/language-manual/merge-into.html) which looks nearly identical to the default merge behavior on Snowflake and BigQuery. If a `unique_key` is specified (recommended), dbt will update old records with values from new records that match on the key column. If a `unique_key` is not specified, dbt will forgo match criteria and simply insert all new records (similar to `append` strategy). +The Databricks adapter will run an [atomic `merge` statement](https://docs.databricks.com/spark/latest/spark-sql/language-manual/merge-into.html) similar to the default merge behavior on Snowflake and BigQuery. If a `unique_key` is specified (recommended), dbt will update old records with values from new records that match on the key column. If a `unique_key` is not specified, dbt will forgo match criteria and simply insert all new records (similar to `append` strategy). Specifying `merge` as the incremental strategy is optional since it's the default strategy used when none is specified. @@ -387,6 +428,123 @@ merge into analytics.merge_incremental as DBT_INTERNAL_DEST + + +Beginning with 1.9, `merge` behavior can be modified with the following additional configuration options: + +- `target_alias`, `source_alias`: Aliases for the target and source to allow you to describe your merge conditions more naturally. These default to `tgt` and `src`, respectively. +- `skip_matched_step`: If set to `true`, the 'matched' clause of the merge statement will not be included. +- `skip_not_matched_step`: If set to `true`, the 'not matched' clause will not be included. +- `matched_condition`: Condition to apply to the `WHEN MATCHED` clause. You should use the `target_alias` and `source_alias` to write a conditional expression, such as `tgt.col1 = hash(src.col2, src.col3)`. This condition further restricts the matched set of rows. +- `not_matched_condition`: Condition to apply to the `WHEN NOT MATCHED [BY TARGET]` clause. This condition further restricts the set of rows in the target that do not match the source that will be inserted into the merged table. +- `not_matched_by_source_condition`: Condition to apply to the further filter `WHEN NOT MATCHED BY SOURCE` clause. Only used in conjunction with `not_matched_by_source_action: delete`. +- `not_matched_by_source_action`: If set to `delete`, a `DELETE` clause is added to the merge statement for `WHEN NOT MATCHED BY SOURCE`. +- `merge_with_schema_evolution`: If set to `true`, the merge statement includes the `WITH SCHEMA EVOLUTION` clause. + +For more details on the meaning of each merge clause, please see [the Databricks documentation](https://docs.databricks.com/en/sql/language-manual/delta-merge-into.html). + +The following is an example demonstrating the use of these new options: + + + + + + +```sql +{{ config( + materialized = 'incremental', + unique_key = 'id', + incremental_strategy='merge', + target_alias='t', + source_alias='s', + matched_condition='t.tech_change_ts < s.tech_change_ts', + not_matched_condition='s.attr1 IS NOT NULL', + not_matched_by_source_condition='t.tech_change_ts < current_timestamp()', + not_matched_by_source_action='delete', + merge_with_schema_evolution=true +) }} + +select + id, + attr1, + attr2, + tech_change_ts +from + {{ ref('source_table') }} as s +``` + + + + + + + +```sql +create temporary view merge_incremental__dbt_tmp as + + select + id, + attr1, + attr2, + tech_change_ts + from upstream.source_table +; + +merge + with schema evolution +into + target_table as t +using ( + select + id, + attr1, + attr2, + tech_change_ts + from + source_table as s +) +on + t.id <=> s.id +when matched + and t.tech_change_ts < s.tech_change_ts + then update set + id = s.id, + attr1 = s.attr1, + attr2 = s.attr2, + tech_change_ts = s.tech_change_ts + +when not matched + and s.attr1 IS NOT NULL + then insert ( + id, + attr1, + attr2, + tech_change_ts + ) values ( + s.id, + s.attr1, + s.attr2, + s.tech_change_ts + ) + +when not matched by source + and t.tech_change_ts < current_timestamp() + then delete +``` + + + + + + + + ### The `replace_where` strategy The `replace_where` incremental strategy requires: @@ -476,6 +634,84 @@ insert into analytics.replace_where_incremental + + +### The `microbatch` strategy + +The Databricks adapter implements the `microbatch` strategy using `replace_where`. Note the requirements and caution statements for `replace_where` above. For more information about this strategy, see the [microbatch reference page](/docs/build/incremental-microbatch). + +In the following example, the upstream table `events` have been annotated with an `event_time` column called `ts` in its schema file. + + + + + + +```sql +{{ config( + materialized='incremental', + file_format='delta', + incremental_strategy = 'microbatch' + event_time='date' # Use 'date' as the grain for this microbatch table +) }} + +with new_events as ( + + select * from {{ ref('events') }} + +) + +select + user_id, + date, + count(*) as visits + +from events +group by 1, 2 +``` + + + + + + + +```sql +create temporary view replace_where__dbt_tmp as + + with new_events as ( + + select * from (select * from analytics.events where ts >= '2024-10-01' and ts < '2024-10-02') + + ) + + select + user_id, + date, + count(*) as visits + from events + group by 1, 2 +; + +insert into analytics.replace_where_incremental + replace where CAST(date as TIMESTAMP) >= '2024-10-01' and CAST(date as TIMESTAMP) < '2024-10-02' + table `replace_where__dbt_tmp` +``` + + + + + + + + + ## Selecting compute per model Beginning in version 1.7.2, you can assign which compute resource to use on a per-model basis.