Skip to content

Commit

Permalink
Update databricks-configs.md for remaining 1.9 changes (#6379)
Browse files Browse the repository at this point in the history
## What are you changing in this pull request and why?
<!--
Describe your changes and why you're making them. If related to an open
issue or a pull request on dbt Core or another repository, then link to
them here!

To learn more about the writing conventions used in the dbt Labs docs,
see the [Content style
guide](https://github.com/dbt-labs/docs.getdbt.com/blob/current/contributing/content-style-guide.md).
-->
Updates documentation for the remaining 1.9 changes, including: iceberg
support, expanded merge configurability, and microbatch

## Checklist
- [x] I have reviewed the [Content style
guide](https://github.com/dbt-labs/docs.getdbt.com/blob/current/contributing/content-style-guide.md)
so my content adheres to these guidelines.
- [x] The topic I'm writing about is for specific dbt version(s) and I
have versioned it according to the [version a whole
page](https://github.com/dbt-labs/docs.getdbt.com/blob/current/contributing/single-sourcing-content.md#adding-a-new-version)
and/or [version a block of
content](https://github.com/dbt-labs/docs.getdbt.com/blob/current/contributing/single-sourcing-content.md#versioning-blocks-of-content)
guidelines.
- [x] I have added checklist item(s) to this list for anything anything
that needs to happen before this PR is merged, such as "needs technical
review" or "change base branch."

---------

Co-authored-by: Matt Shaver <[email protected]>
  • Loading branch information
benc-db and matthewshaver authored Oct 30, 2024
1 parent b1f9c68 commit ccceae4
Showing 1 changed file with 239 additions and 3 deletions.
242 changes: 239 additions & 3 deletions website/docs/reference/resource-configs/databricks-configs.md
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ We do not yet have a PySpark API to set tblproperties at table creation, so this

</VersionBlock>

<VersionBlock firstVersion="1.8">
<VersionBlock firstVersion="1.8" lastVersion="1.8">

1.8 introduces support for [Tags](https://docs.databricks.com/en/data-governance/unity-catalog/tags.html) at the table level, in addition to all table configuration supported in 1.7.

Expand All @@ -35,7 +35,7 @@ We do not yet have a PySpark API to set tblproperties at table creation, so this
| file_format | The file format to use when creating tables (`parquet`, `delta`, `hudi`, `csv`, `json`, `text`, `jdbc`, `orc`, `hive` or `libsvm`). | Optional | SQL, Python | `delta` |
| location_root | The created table uses the specified directory to store its data. The table alias is appended to it. | Optional | SQL, Python | `/mnt/root` |
| partition_by | Partition the created table by the specified columns. A directory is created for each partition. | Optional | SQL, Python | `date_day` |
| liquid_clustered_by | Cluster the created table by the specified columns. Clustering method is based on [Delta's Liquid Clustering feature](https://docs.databricks.com/en/delta/clustering.html). Available since dbt-databricks 1.6.2. | Optional | SQL | `date_day` |
| liquid_clustered_by | Cluster the created table by the specified columns. Clustering method is based on [Delta's Liquid Clustering feature](https://docs.databricks.com/en/delta/clustering.html). Available since dbt-databricks 1.6.2. | Optional | SQL, Python | `date_day` |
| clustered_by | Each partition in the created table will be split into a fixed number of buckets by the specified columns. | Optional | SQL, Python | `country_code` |
| buckets | The number of buckets to create while clustering | Required if `clustered_by` is specified | SQL, Python | `8` |
| tblproperties | [Tblproperties](https://docs.databricks.com/en/sql/language-manual/sql-ref-syntax-ddl-tblproperties.html) to be set on the created table | Optional | SQL, Python* | `{'this.is.my.key': 12}` |
Expand All @@ -51,6 +51,29 @@ We do not yet have a PySpark API to set tblproperties at table creation, so this

<VersionBlock firstVersion="1.9">

dbt Core v.9 and Versionless dbt Clouyd support for `table_format: iceberg`, in addition to all previous table configurations supported in 1.8.

| Option | Description | Required? | Model Support | Example |
|---------------------|-----------------------------|-------------------------------------------|-----------------|--------------------------|
| table_format | Whether or not to provision [Iceberg](https://docs.databricks.com/en/delta/uniform.html) compatibility for the materialization | Optional | SQL, Python | `iceberg` |
| file_format+ | The file format to use when creating tables (`parquet`, `delta`, `hudi`, `csv`, `json`, `text`, `jdbc`, `orc`, `hive` or `libsvm`). | Optional | SQL, Python | `delta` |
| location_root | The created table uses the specified directory to store its data. The table alias is appended to it. | Optional | SQL, Python | `/mnt/root` |
| partition_by | Partition the created table by the specified columns. A directory is created for each partition. | Optional | SQL, Python | `date_day` |
| liquid_clustered_by | Cluster the created table by the specified columns. Clustering method is based on [Delta's Liquid Clustering feature](https://docs.databricks.com/en/delta/clustering.html). Available since dbt-databricks 1.6.2. | Optional | SQL, Python | `date_day` |
| clustered_by | Each partition in the created table will be split into a fixed number of buckets by the specified columns. | Optional | SQL, Python | `country_code` |
| buckets | The number of buckets to create while clustering | Required if `clustered_by` is specified | SQL, Python | `8` |
| tblproperties | [Tblproperties](https://docs.databricks.com/en/sql/language-manual/sql-ref-syntax-ddl-tblproperties.html) to be set on the created table | Optional | SQL, Python* | `{'this.is.my.key': 12}` |
| databricks_tags | [Tags](https://docs.databricks.com/en/data-governance/unity-catalog/tags.html) to be set on the created table | Optional | SQL++, Python++ | `{'my_tag': 'my_value'}` |
| compression | Set the compression algorithm. | Optional | SQL, Python | `zstd` |

\* We do not yet have a PySpark API to set tblproperties at table creation, so this feature is primarily to allow users to anotate their python-derived tables with tblproperties.
\+ When `table_format` is `iceberg`, `file_format` must be `delta`.
\++ `databricks_tags` are currently only supported at the table level, and applied via `ALTER` statements.

</VersionBlock>

<VersionBlock firstVersion="1.9">

### Python submission methods

In dbt v1.9 and higher, or in [Versionless](/docs/dbt-versions/versionless-cloud) dbt Cloud, you can use these four options for `submission_method`:
Expand Down Expand Up @@ -150,6 +173,7 @@ models:
</VersionBlock>
<VersionBlock lastVersion="1.8">
## Incremental models
dbt-databricks plugin leans heavily on the [`incremental_strategy` config](/docs/build/incremental-strategy). This config tells the incremental materialization how to build models in runs beyond their first. It can be set to one of four values:
Expand All @@ -160,6 +184,23 @@ dbt-databricks plugin leans heavily on the [`incremental_strategy` config](/docs

Each of these strategies has its pros and cons, which we'll discuss below. As with any model config, `incremental_strategy` may be specified in `dbt_project.yml` or within a model file's `config()` block.

</VersionBlock>

<VersionBlock firstVersion="1.9">

## Incremental models

dbt-databricks plugin leans heavily on the [`incremental_strategy` config](/docs/build/incremental-strategy). This config tells the incremental materialization how to build models in runs beyond their first. It can be set to one of five values:
- **`append`**: Insert new records without updating or overwriting any existing data.
- **`insert_overwrite`**: If `partition_by` is specified, overwrite partitions in the <Term id="table" /> with new data. If no `partition_by` is specified, overwrite the entire table with new data.
- **`merge`** (default; Delta and Hudi file format only): Match records based on a `unique_key`, updating old records, and inserting new ones. (If no `unique_key` is specified, all new data is inserted, similar to `append`.)
- **`replace_where`** (Delta file format only): Match records based on `incremental_predicates`, replacing all records that match the predicates from the existing table with records matching the predicates from the new data. (If no `incremental_predicates` are specified, all new data is inserted, similar to `append`.)
- **`microbatch`** (Delta file format only): Implements the [microbatch strategy](/docs/build/incremental-microbatch) using `replace_where` with predicates generated based `event_time`.

Each of these strategies has its pros and cons, which we'll discuss below. As with any model config, `incremental_strategy` may be specified in `dbt_project.yml` or within a model file's `config()` block.

</VersionBlock>

### The `append` strategy

Following the `append` strategy, dbt will perform an `insert into` statement with all new data. The appeal of this strategy is that it is straightforward and functional across all platforms, file types, connection methods, and Apache Spark versions. However, this strategy _cannot_ update, overwrite, or delete existing data, so it is likely to insert duplicate records for many data sources.
Expand Down Expand Up @@ -306,7 +347,7 @@ The `merge` incremental strategy requires:
- Databricks Runtime 5.1 and above for delta file format
- Apache Spark for hudi file format

dbt will run an [atomic `merge` statement](https://docs.databricks.com/spark/latest/spark-sql/language-manual/merge-into.html) which looks nearly identical to the default merge behavior on Snowflake and BigQuery. If a `unique_key` is specified (recommended), dbt will update old records with values from new records that match on the key column. If a `unique_key` is not specified, dbt will forgo match criteria and simply insert all new records (similar to `append` strategy).
The Databricks adapter will run an [atomic `merge` statement](https://docs.databricks.com/spark/latest/spark-sql/language-manual/merge-into.html) similar to the default merge behavior on Snowflake and BigQuery. If a `unique_key` is specified (recommended), dbt will update old records with values from new records that match on the key column. If a `unique_key` is not specified, dbt will forgo match criteria and simply insert all new records (similar to `append` strategy).

Specifying `merge` as the incremental strategy is optional since it's the default strategy used when none is specified.

Expand Down Expand Up @@ -387,6 +428,123 @@ merge into analytics.merge_incremental as DBT_INTERNAL_DEST
</TabItem>
</Tabs>

<VersionBlock firstVersion="1.9">

Beginning with 1.9, `merge` behavior can be modified with the following additional configuration options:

- `target_alias`, `source_alias`: Aliases for the target and source to allow you to describe your merge conditions more naturally. These default to `tgt` and `src`, respectively.
- `skip_matched_step`: If set to `true`, the 'matched' clause of the merge statement will not be included.
- `skip_not_matched_step`: If set to `true`, the 'not matched' clause will not be included.
- `matched_condition`: Condition to apply to the `WHEN MATCHED` clause. You should use the `target_alias` and `source_alias` to write a conditional expression, such as `tgt.col1 = hash(src.col2, src.col3)`. This condition further restricts the matched set of rows.
- `not_matched_condition`: Condition to apply to the `WHEN NOT MATCHED [BY TARGET]` clause. This condition further restricts the set of rows in the target that do not match the source that will be inserted into the merged table.
- `not_matched_by_source_condition`: Condition to apply to the further filter `WHEN NOT MATCHED BY SOURCE` clause. Only used in conjunction with `not_matched_by_source_action: delete`.
- `not_matched_by_source_action`: If set to `delete`, a `DELETE` clause is added to the merge statement for `WHEN NOT MATCHED BY SOURCE`.
- `merge_with_schema_evolution`: If set to `true`, the merge statement includes the `WITH SCHEMA EVOLUTION` clause.

For more details on the meaning of each merge clause, please see [the Databricks documentation](https://docs.databricks.com/en/sql/language-manual/delta-merge-into.html).

The following is an example demonstrating the use of these new options:

<Tabs
defaultValue="source"
values={[
{ label: 'Source code', value: 'source', },
{ label: 'Run code', value: 'run', },
]
}>
<TabItem value="source">

<File name='merge_incremental_options.sql'>

```sql
{{ config(
materialized = 'incremental',
unique_key = 'id',
incremental_strategy='merge',
target_alias='t',
source_alias='s',
matched_condition='t.tech_change_ts < s.tech_change_ts',
not_matched_condition='s.attr1 IS NOT NULL',
not_matched_by_source_condition='t.tech_change_ts < current_timestamp()',
not_matched_by_source_action='delete',
merge_with_schema_evolution=true
) }}

select
id,
attr1,
attr2,
tech_change_ts
from
{{ ref('source_table') }} as s
```

</File>
</TabItem>
<TabItem value="run">

<File name='target/run/merge_incremental_options.sql'>

```sql
create temporary view merge_incremental__dbt_tmp as

select
id,
attr1,
attr2,
tech_change_ts
from upstream.source_table
;

merge
with schema evolution
into
target_table as t
using (
select
id,
attr1,
attr2,
tech_change_ts
from
source_table as s
)
on
t.id <=> s.id
when matched
and t.tech_change_ts < s.tech_change_ts
then update set
id = s.id,
attr1 = s.attr1,
attr2 = s.attr2,
tech_change_ts = s.tech_change_ts

when not matched
and s.attr1 IS NOT NULL
then insert (
id,
attr1,
attr2,
tech_change_ts
) values (
s.id,
s.attr1,
s.attr2,
s.tech_change_ts
)

when not matched by source
and t.tech_change_ts < current_timestamp()
then delete
```

</File>

</TabItem>
</Tabs>

</VersionBlock>

### The `replace_where` strategy

The `replace_where` incremental strategy requires:
Expand Down Expand Up @@ -476,6 +634,84 @@ insert into analytics.replace_where_incremental
</TabItem>
</Tabs>

<VersionBlock firstVersion="1.9">

### The `microbatch` strategy

The Databricks adapter implements the `microbatch` strategy using `replace_where`. Note the requirements and caution statements for `replace_where` above. For more information about this strategy, see the [microbatch reference page](/docs/build/incremental-microbatch).

In the following example, the upstream table `events` have been annotated with an `event_time` column called `ts` in its schema file.

<Tabs
defaultValue="source"
values={[
{ label: 'Source code', value: 'source', },
{ label: 'Run code', value: 'run', },
]
}>
<TabItem value="source">

<File name='microbatch_incremental.sql'>

```sql
{{ config(
materialized='incremental',
file_format='delta',
incremental_strategy = 'microbatch'
event_time='date' # Use 'date' as the grain for this microbatch table
) }}

with new_events as (

select * from {{ ref('events') }}

)

select
user_id,
date,
count(*) as visits

from events
group by 1, 2
```

</File>
</TabItem>
<TabItem value="run">

<File name='target/run/replace_where_incremental.sql'>

```sql
create temporary view replace_where__dbt_tmp as

with new_events as (

select * from (select * from analytics.events where ts >= '2024-10-01' and ts < '2024-10-02')

)

select
user_id,
date,
count(*) as visits
from events
group by 1, 2
;

insert into analytics.replace_where_incremental
replace where CAST(date as TIMESTAMP) >= '2024-10-01' and CAST(date as TIMESTAMP) < '2024-10-02'
table `replace_where__dbt_tmp`
```

</File>

</TabItem>
</Tabs>

</VersionBlock>


## Selecting compute per model

Beginning in version 1.7.2, you can assign which compute resource to use on a per-model basis.
Expand Down

0 comments on commit ccceae4

Please sign in to comment.