-
Notifications
You must be signed in to change notification settings - Fork 181
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
- Loading branch information
Showing
3 changed files
with
232 additions
and
0 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
106 changes: 106 additions & 0 deletions
106
dbt/include/snowflake/macros/relations/external_table/external_table.sql
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,106 @@ | ||
{% macro snowflake__create_external_table(source_node) %} | ||
|
||
{%- set columns = source_node.columns.values() -%} | ||
{%- set external = source_node.external -%} | ||
{%- set partitions = external.partitions -%} | ||
{%- set infer_schema = external.infer_schema -%} | ||
|
||
{% if infer_schema %} | ||
{% set query_infer_schema %} | ||
select * from table( infer_schema( location=>'{{external.location}}', file_format=>'{{external.file_format}}') ) | ||
{% endset %} | ||
{% if execute %} | ||
{% set columns_infer = run_query(query_infer_schema) %} | ||
{% endif %} | ||
{% endif %} | ||
|
||
{%- set is_csv = dbt_external_tables.is_csv(external.file_format) -%} | ||
|
||
{# https://docs.snowflake.net/manuals/sql-reference/sql/create-external-table.html #} | ||
{# This assumes you have already created an external stage #} | ||
create or replace external table {{source(source_node.source_name, source_node.name)}} | ||
{%- if columns or partitions or infer_schema -%} | ||
( | ||
{%- if partitions -%}{%- for partition in partitions %} | ||
{{partition.name}} {{partition.data_type}} as {{partition.expression}}{{- ',' if not loop.last or columns|length > 0 or infer_schema -}} | ||
{%- endfor -%}{%- endif -%} | ||
{%- if not infer_schema -%} | ||
{%- for column in columns %} | ||
{%- set column_quoted = adapter.quote(column.name) if column.quote else column.name %} | ||
{%- set column_alias -%} | ||
{%- if 'alias' in column and column.quote -%} | ||
{{adapter.quote(column.alias)}} | ||
{%- elif 'alias' in column -%} | ||
{{column.alias}} | ||
{%- else -%} | ||
{{column_quoted}} | ||
{%- endif -%} | ||
{%- endset %} | ||
{%- set col_expression -%} | ||
{%- if column.expression -%} | ||
{{column.expression}} | ||
{%- else -%} | ||
{%- set col_id = 'value:c' ~ loop.index if is_csv else 'value:' ~ column_alias -%} | ||
(case when is_null_value({{col_id}}) or lower({{col_id}}) = 'null' then null else {{col_id}} end) | ||
{%- endif -%} | ||
{%- endset %} | ||
{{column_alias}} {{column.data_type}} as ({{col_expression}}::{{column.data_type}}) | ||
{{- ',' if not loop.last -}} | ||
{% endfor %} | ||
{% else %} | ||
{%- for column in columns_infer %} | ||
{%- set col_expression -%} | ||
{%- set col_id = 'value:' ~ column[0] -%} | ||
(case when is_null_value({{col_id}}) or lower({{col_id}}) = 'null' then null else {{col_id}} end) | ||
{%- endset %} | ||
{{column[0]}} {{column[1]}} as ({{col_expression}}::{{column[1]}}) | ||
{{- ',' if not loop.last -}} | ||
{% endfor %} | ||
{%- endif -%} | ||
) | ||
{%- endif -%} | ||
{% if partitions %} partition by ({{partitions|map(attribute='name')|join(', ')}}) {% endif %} | ||
location = {{external.location}} {# stage #} | ||
{% if external.auto_refresh in (true, false) -%} | ||
auto_refresh = {{external.auto_refresh}} | ||
{%- endif %} | ||
{% if external.aws_sns_topic -%} | ||
aws_sns_topic = '{{external.aws_sns_topic}}' | ||
{%- endif %} | ||
{% if external.table_format | lower == "delta" %} | ||
refresh_on_create = false | ||
{% endif %} | ||
{% if external.pattern -%} pattern = '{{external.pattern}}' {%- endif %} | ||
{% if external.integration -%} integration = '{{external.integration}}' {%- endif %} | ||
file_format = {{external.file_format}} | ||
{% if external.table_format -%} table_format = '{{external.table_format}}' {%- endif %} | ||
{% endmacro %} | ||
|
||
{% macro snowflake__refresh_external_table(source_node) %} | ||
|
||
{% set external = source_node.external %} | ||
{% set snowpipe = source_node.external.get('snowpipe', none) %} | ||
|
||
{% set auto_refresh = external.get('auto_refresh', false) %} | ||
{% set partitions = external.get('partitions', none) %} | ||
{% set delta_format = (external.table_format | lower == "delta") %} | ||
|
||
{% set manual_refresh = not auto_refresh %} | ||
|
||
{% if manual_refresh %} | ||
|
||
{% set ddl %} | ||
begin; | ||
alter external table {{source(source_node.source_name, source_node.name)}} refresh; | ||
commit; | ||
{% endset %} | ||
|
||
{% do return([ddl]) %} | ||
|
||
{% else %} | ||
|
||
{% do return([]) %} | ||
|
||
{% endif %} | ||
|
||
{% endmacro %} |
125 changes: 125 additions & 0 deletions
125
dbt/include/snowflake/macros/relations/external_table/helpers.sql
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,125 @@ | ||
{% macro snowflake__create_external_schema(source_node) %} | ||
|
||
{% set schema_exists_query %} | ||
show terse schemas like '{{ source_node.schema }}' in database {{ source_node.database }} limit 1; | ||
{% endset %} | ||
{% if execute %} | ||
{% set schema_exists = run_query(schema_exists_query)|length > 0 %} | ||
{% else %} | ||
{% set schema_exists = false %} | ||
{% endif %} | ||
|
||
{% if schema_exists %} | ||
{% set ddl %} | ||
select 'Schema {{ source_node.schema }} exists' from dual; | ||
{% endset %} | ||
{% else %} | ||
{% set fqn %} | ||
{% if source_node.database %} | ||
{{ source_node.database }}.{{ source_node.schema }} | ||
{% else %} | ||
{{ source_node.schema }} | ||
{% endif %} | ||
{% endset %} | ||
|
||
{% set ddl %} | ||
create schema if not exists {{ fqn }}; | ||
{% endset %} | ||
{% endif %} | ||
|
||
{% do return(ddl) %} | ||
|
||
{% endmacro %} | ||
|
||
{% macro snowflake__refresh_external_table(source_node) %} | ||
|
||
{% set external = source_node.external %} | ||
{% set snowpipe = source_node.external.get('snowpipe', none) %} | ||
|
||
{% set auto_refresh = external.get('auto_refresh', false) %} | ||
{% set partitions = external.get('partitions', none) %} | ||
{% set delta_format = (external.table_format | lower == "delta") %} | ||
|
||
{% set manual_refresh = not auto_refresh %} | ||
|
||
{% if manual_refresh %} | ||
|
||
{% set ddl %} | ||
begin; | ||
alter external table {{source(source_node.source_name, source_node.name)}} refresh; | ||
commit; | ||
{% endset %} | ||
|
||
{% do return([ddl]) %} | ||
|
||
{% else %} | ||
|
||
{% do return([]) %} | ||
|
||
{% endif %} | ||
|
||
{% endmacro %} | ||
|
||
{% macro is_csv(file_format) %} | ||
|
||
{# From https://docs.snowflake.net/manuals/sql-reference/sql/create-external-table.html: | ||
|
||
Important: The external table does not inherit the file format, if any, in the | ||
stage definition. You must explicitly specify any file format options for the | ||
external table using the FILE_FORMAT parameter. | ||
|
||
Note: FORMAT_NAME and TYPE are mutually exclusive; to avoid unintended behavior, | ||
you should only specify one or the other when creating an external table. | ||
|
||
#} | ||
|
||
{% set ff_ltrimmed = file_format|lower|replace(' ','') %} | ||
|
||
{% if 'type=' in ff_ltrimmed %} | ||
|
||
{% if 'type=csv' in ff_ltrimmed %} | ||
|
||
{{return(true)}} | ||
|
||
{% else %} | ||
|
||
{{return(false)}} | ||
|
||
{% endif %} | ||
|
||
{% else %} | ||
|
||
{% set ff_standardized = ff_ltrimmed | ||
| replace('(','') | replace(')','') | ||
| replace('format_name=','') %} | ||
{% set fqn = ff_standardized.split('.') %} | ||
|
||
{% if fqn | length == 3 %} | ||
{% set ff_database, ff_schema, ff_identifier = fqn[0], fqn[1], fqn[2] %} | ||
{% elif fqn | length == 2 %} | ||
{% set ff_database, ff_schema, ff_identifier = target.database, fqn[0], fqn[1] %} | ||
{% else %} | ||
{% set ff_database, ff_schema, ff_identifier = target.database, target.schema, fqn[0] %} | ||
{% endif %} | ||
|
||
{% call statement('get_file_format', fetch_result = True) %} | ||
show file formats in {{ff_database}}.{{ff_schema}} | ||
{% endcall %} | ||
|
||
{% set ffs = load_result('get_file_format').table %} | ||
|
||
{% for ff in ffs %} | ||
|
||
{% if ff['name']|lower == ff_identifier and ff['type']|lower == 'csv' %} | ||
|
||
{{return(true)}} | ||
|
||
{% endif %} | ||
|
||
{% endfor %} | ||
|
||
{{return(false)}} | ||
|
||
{% endif %} | ||
|
||
{% endmacro %} |