Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Update databricks-configs.md for remaining 1.9 changes #6379

Merged
merged 9 commits into from
Oct 30, 2024
241 changes: 238 additions & 3 deletions website/docs/reference/resource-configs/databricks-configs.md
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ We do not yet have a PySpark API to set tblproperties at table creation, so this

</VersionBlock>

<VersionBlock firstVersion="1.8">
<VersionBlock firstVersion="1.8" lastVersion="1.8">

1.8 introduces support for [Tags](https://docs.databricks.com/en/data-governance/unity-catalog/tags.html) at the table level, in addition to all table configuration supported in 1.7.

Expand All @@ -51,7 +51,7 @@ We do not yet have a PySpark API to set tblproperties at table creation, so this
| file_format | The file format to use when creating tables (`parquet`, `delta`, `hudi`, `csv`, `json`, `text`, `jdbc`, `orc`, `hive` or `libsvm`). | Optional | SQL, Python | `delta` |
| location_root | The created table uses the specified directory to store its data. The table alias is appended to it. | Optional | SQL, Python | `/mnt/root` |
| partition_by | Partition the created table by the specified columns. A directory is created for each partition. | Optional | SQL, Python | `date_day` |
| liquid_clustered_by | Cluster the created table by the specified columns. Clustering method is based on [Delta's Liquid Clustering feature](https://docs.databricks.com/en/delta/clustering.html). Available since dbt-databricks 1.6.2. | Optional | SQL | `date_day` |
| liquid_clustered_by | Cluster the created table by the specified columns. Clustering method is based on [Delta's Liquid Clustering feature](https://docs.databricks.com/en/delta/clustering.html). Available since dbt-databricks 1.6.2. | Optional | SQL, Python | `date_day` |
| clustered_by | Each partition in the created table will be split into a fixed number of buckets by the specified columns. | Optional | SQL, Python | `country_code` |
| buckets | The number of buckets to create while clustering | Required if `clustered_by` is specified | SQL, Python | `8` |
| tblproperties | [Tblproperties](https://docs.databricks.com/en/sql/language-manual/sql-ref-syntax-ddl-tblproperties.html) to be set on the created table | Optional | SQL, Python* | `{'this.is.my.key': 12}` |
Expand All @@ -67,6 +67,29 @@ We do not yet have a PySpark API to set tblproperties at table creation, so this

<VersionBlock firstVersion="1.9">

1.9 introduces support for `table_format: iceberg`, in addition to all table configuration supported in 1.8.
matthewshaver marked this conversation as resolved.
Show resolved Hide resolved

| Option | Description | Required? | Model Support | Example |
|---------------------|--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|-------------------------------------------|-----------------|--------------------------|
| table_format | Whether or not to provision [Iceberg](https://docs.databricks.com/en/delta/uniform.html) compatibility for the materialization | Optional | SQL, Python | `iceberg` |
| file_format+ | The file format to use when creating tables (`parquet`, `delta`, `hudi`, `csv`, `json`, `text`, `jdbc`, `orc`, `hive` or `libsvm`). | Optional | SQL, Python | `delta` |
| location_root | The created table uses the specified directory to store its data. The table alias is appended to it. | Optional | SQL, Python | `/mnt/root` |
| partition_by | Partition the created table by the specified columns. A directory is created for each partition. | Optional | SQL, Python | `date_day` |
| liquid_clustered_by | Cluster the created table by the specified columns. Clustering method is based on [Delta's Liquid Clustering feature](https://docs.databricks.com/en/delta/clustering.html). Available since dbt-databricks 1.6.2. | Optional | SQL, Python | `date_day` |
| clustered_by | Each partition in the created table will be split into a fixed number of buckets by the specified columns. | Optional | SQL, Python | `country_code` |
| buckets | The number of buckets to create while clustering | Required if `clustered_by` is specified | SQL, Python | `8` |
| tblproperties | [Tblproperties](https://docs.databricks.com/en/sql/language-manual/sql-ref-syntax-ddl-tblproperties.html) to be set on the created table | Optional | SQL, Python* | `{'this.is.my.key': 12}` |
| databricks_tags | [Tags](https://docs.databricks.com/en/data-governance/unity-catalog/tags.html) to be set on the created table | Optional | SQL++, Python++ | `{'my_tag': 'my_value'}` |
| compression | Set the compression algorithm. | Optional | SQL, Python | `zstd` |

\* We do not yet have a PySpark API to set tblproperties at table creation, so this feature is primarily to allow users to anotate their python-derived tables with tblproperties.
\+ When `table_format` is `iceberg`, `file_format` must be `delta`.
\++ `databricks_tags` are currently only supported at the table level, and applied via `ALTER` statements.

</VersionBlock>

<VersionBlock firstVersion="1.9">

### Python submission methods

In dbt v1.9 and higher, or in [Versionless](/docs/dbt-versions/versionless-cloud) dbt Cloud, you can use these four options for `submission_method`:
Expand Down Expand Up @@ -166,6 +189,7 @@ models:

</VersionBlock>

<VersionBlock lastVersion="1.8">
## Incremental models

dbt-databricks plugin leans heavily on the [`incremental_strategy` config](/docs/build/incremental-strategy). This config tells the incremental materialization how to build models in runs beyond their first. It can be set to one of four values:
Expand All @@ -176,6 +200,22 @@ dbt-databricks plugin leans heavily on the [`incremental_strategy` config](/docs

Each of these strategies has its pros and cons, which we'll discuss below. As with any model config, `incremental_strategy` may be specified in `dbt_project.yml` or within a model file's `config()` block.

</VersionBlock>

<VersionBlock firstVersion="1.9">
matthewshaver marked this conversation as resolved.
Show resolved Hide resolved
## Incremental models

dbt-databricks plugin leans heavily on the [`incremental_strategy` config](/docs/build/incremental-strategy). This config tells the incremental materialization how to build models in runs beyond their first. It can be set to one of five values:
- **`append`**: Insert new records without updating or overwriting any existing data.
- **`insert_overwrite`**: If `partition_by` is specified, overwrite partitions in the <Term id="table" /> with new data. If no `partition_by` is specified, overwrite the entire table with new data.
- **`merge`** (default; Delta and Hudi file format only): Match records based on a `unique_key`, updating old records, and inserting new ones. (If no `unique_key` is specified, all new data is inserted, similar to `append`.)
- **`replace_where`** (Delta file format only): Match records based on `incremental_predicates`, replacing all records that match the predicates from the existing table with records matching the predicates from the new data. (If no `incremental_predicates` are specified, all new data is inserted, similar to `append`.)
- **`microbatch`** (Delta file format only): Implements the [microbatch strategy](/docs/build/incremental-microbatch) using `replace_where` with predicates generated based `event_time`.

Each of these strategies has its pros and cons, which we'll discuss below. As with any model config, `incremental_strategy` may be specified in `dbt_project.yml` or within a model file's `config()` block.

</VersionBlock>

### The `append` strategy

Following the `append` strategy, dbt will perform an `insert into` statement with all new data. The appeal of this strategy is that it is straightforward and functional across all platforms, file types, connection methods, and Apache Spark versions. However, this strategy _cannot_ update, overwrite, or delete existing data, so it is likely to insert duplicate records for many data sources.
Expand Down Expand Up @@ -322,7 +362,7 @@ The `merge` incremental strategy requires:
- Databricks Runtime 5.1 and above for delta file format
- Apache Spark for hudi file format

dbt will run an [atomic `merge` statement](https://docs.databricks.com/spark/latest/spark-sql/language-manual/merge-into.html) which looks nearly identical to the default merge behavior on Snowflake and BigQuery. If a `unique_key` is specified (recommended), dbt will update old records with values from new records that match on the key column. If a `unique_key` is not specified, dbt will forgo match criteria and simply insert all new records (similar to `append` strategy).
The Databricks adapter will run an [atomic `merge` statement](https://docs.databricks.com/spark/latest/spark-sql/language-manual/merge-into.html) similar to the default merge behavior on Snowflake and BigQuery. If a `unique_key` is specified (recommended), dbt will update old records with values from new records that match on the key column. If a `unique_key` is not specified, dbt will forgo match criteria and simply insert all new records (similar to `append` strategy).

Specifying `merge` as the incremental strategy is optional since it's the default strategy used when none is specified.

Expand Down Expand Up @@ -403,6 +443,122 @@ merge into analytics.merge_incremental as DBT_INTERNAL_DEST
</TabItem>
</Tabs>

<VersionBlock firstVersion="1.9">

Beginning with 1.9, `merge` behavior can be modified with the following additional configuration options:

* `target_alias`, `source_alias`: aliases for the target and source to allow you to describe your merge conditions more naturally. These default to `tgt` and `src` respectively.
* `skip_matched_step`: if set to `true`, the 'matched' clause of the merge statement will not be included.
* `skip_not_matched_step`: if set to `true` the 'not matched' clause will not be included.
* `matched_condition`: condition to apply to the `WHEN MATCHED` clause. You should use the `target_alias` and `source_alias` to write a conditional expression, such as `tgt.col1 = hash(src.col2, src.col3)`. This condition further restricts the matched set of rows.
* `not_matched_condition`: condition to apply to the `WHEN NOT MATCHED [BY TARGET]` clause. This condition further restricts the set of rows in the target that do not match the source that will be inserted into the merged table.
* `not_matched_by_source_condition`: condition to apply to the further filter `WHEN NOT MATCHED BY SOURCE` clause. Only used in conjunction with `not_matched_by_source_action: delete`.
* `not_matched_by_source_action`: if set to `delete` a `DELETE` clause is added to the merge statement for `WHEN NOT MATCHED BY SOURCE`.
* `merge_with_schema_evolution`: if set to `true`, the merge statement includes the `WITH SCHEMA EVOLUTION` clause.

For more details on the meaning of each merge clause, please see [the Databricks documentation](https://docs.databricks.com/en/sql/language-manual/delta-merge-into.html).
Here is an example demonstrating the use of these new options:

<Tabs
defaultValue="source"
values={[
{ label: 'Source code', value: 'source', },
{ label: 'Run code', value: 'run', },
]
}>
<TabItem value="source">

<File name='merge_incremental_options.sql'>

```sql
{{ config(
materialized = 'incremental',
unique_key = 'id',
incremental_strategy='merge',
target_alias='t',
source_alias='s',
matched_condition='t.tech_change_ts < s.tech_change_ts',
not_matched_condition='s.attr1 IS NOT NULL',
not_matched_by_source_condition='t.tech_change_ts < current_timestamp()',
not_matched_by_source_action='delete',
merge_with_schema_evolution=true
) }}

select
id,
attr1,
attr2,
tech_change_ts
from
{{ ref('source_table') }} as s
```

</File>
</TabItem>
<TabItem value="run">

<File name='target/run/merge_incremental_options.sql'>

```sql
create temporary view merge_incremental__dbt_tmp as

select
id,
attr1,
attr2,
tech_change_ts
from upstream.source_table
;

merge
with schema evolution
into
target_table as t
using (
select
id,
attr1,
attr2,
tech_change_ts
from
source_table as s
)
on
t.id <=> s.id
when matched
and t.tech_change_ts < s.tech_change_ts
then update set
id = s.id,
attr1 = s.attr1,
attr2 = s.attr2,
tech_change_ts = s.tech_change_ts

when not matched
and s.attr1 IS NOT NULL
then insert (
id,
attr1,
attr2,
tech_change_ts
) values (
s.id,
s.attr1,
s.attr2,
s.tech_change_ts
)

when not matched by source
and t.tech_change_ts < current_timestamp()
then delete
```

</File>

</TabItem>
</Tabs>

</VersionBlock>

### The `replace_where` strategy

The `replace_where` incremental strategy requires:
Expand Down Expand Up @@ -492,6 +648,85 @@ insert into analytics.replace_where_incremental
</TabItem>
</Tabs>

<VersionBlock firstVersion="1.9">

### The `microbatch` strategy

The Databricks adapter implements the `microbatch` strategy using `replace_where`.
As such, please take note of the requirements and caution statements for `replace_where` above.
For general discussion of using this strategy, please see the [microbatch reference page](/docs/build/incremental-microbatch).

In the example below, the upstream table `events` has been annotated with an `event_time` column called `ts` in its schema file.

<Tabs
defaultValue="source"
values={[
{ label: 'Source code', value: 'source', },
{ label: 'Run code', value: 'run', },
]
}>
<TabItem value="source">

<File name='microbatch_incremental.sql'>

```sql
{{ config(
materialized='incremental',
file_format='delta',
incremental_strategy = 'microbatch'
event_time='date' # Use 'date' as the grain for this microbatch table
) }}

with new_events as (

select * from {{ ref('events') }}

)

select
user_id,
date,
count(*) as visits

from events
group by 1, 2
```

</File>
</TabItem>
<TabItem value="run">

<File name='target/run/replace_where_incremental.sql'>

```sql
create temporary view replace_where__dbt_tmp as

with new_events as (

select * from (select * from analytics.events where ts >= '2024-10-01' and ts < '2024-10-02')

)

select
user_id,
date,
count(*) as visits
from events
group by 1, 2
;

insert into analytics.replace_where_incremental
replace where CAST(date as TIMESTAMP) >= '2024-10-01' and CAST(date as TIMESTAMP) < '2024-10-02'
table `replace_where__dbt_tmp`
```

</File>

</TabItem>
</Tabs>

</VersionBlock>

<VersionBlock firstVersion="1.7">
matthewshaver marked this conversation as resolved.
Show resolved Hide resolved

## Selecting compute per model
Expand Down
Loading