diff --git a/dbt/adapters/snowflake/relation_configs/policies.py b/dbt/adapters/snowflake/relation_configs/policies.py
index 75195f9a3..2ad42dc0b 100644
--- a/dbt/adapters/snowflake/relation_configs/policies.py
+++ b/dbt/adapters/snowflake/relation_configs/policies.py
@@ -10,6 +10,7 @@ class SnowflakeRelationType(StrEnum):
     CTE = "cte"
     External = "external"
     DynamicTable = "dynamic_table"
+    ExternalTable = "external_table"
 
 
 class SnowflakeIncludePolicy(Policy):
diff --git a/dbt/include/snowflake/macros/relations/external_table/external_table.sql b/dbt/include/snowflake/macros/relations/external_table/external_table.sql
new file mode 100644
index 000000000..02fce568a
--- /dev/null
+++ b/dbt/include/snowflake/macros/relations/external_table/external_table.sql
@@ -0,0 +1,86 @@
+{#
+    Render the `create or replace external table` statement for a source node.
+
+    Settings are read from the source's `external` config. Column definitions
+    come from the source's declared columns, or from the result of an
+    `infer_schema(...)` query when `external.infer_schema` is truthy (that
+    query is only run when `execute` is true).
+#}
+{% macro snowflake__create_external_table(source_node) %}
+
+    {%- set columns = source_node.columns.values() -%}
+    {%- set external = source_node.external -%}
+    {%- set partitions = external.partitions -%}
+    {%- set infer_schema = external.infer_schema -%}
+
+    {% if infer_schema %}
+        {% set query_infer_schema %}
+            select * from table( infer_schema( location=>'{{external.location}}', file_format=>'{{external.file_format}}') )
+        {% endset %}
+        {% if execute %}
+            {% set columns_infer = run_query(query_infer_schema) %}
+        {% endif %}
+    {% endif %}
+
+    {# CSV columns are referenced by position (value:c1, value:c2, ...) instead of by name #}
+    {%- set is_csv = is_csv(external.file_format) -%}
+
+{# https://docs.snowflake.net/manuals/sql-reference/sql/create-external-table.html #}
+{# This assumes you have already created an external stage #}
+    create or replace external table {{source(source_node.source_name, source_node.name)}}
+    {%- if columns or partitions or infer_schema -%}
+    (
+        {%- if partitions -%}{%- for partition in partitions %}
+            {{partition.name}} {{partition.data_type}} as {{partition.expression}}{{- ',' if not loop.last or columns|length > 0 or infer_schema -}}
+        {%- endfor -%}{%- endif -%}
+        {%- if not infer_schema -%}
+            {%- for column in columns %}
+                {%- set column_quoted = adapter.quote(column.name) if column.quote else column.name %}
+                {%- set column_alias -%}
+                    {%- if 'alias' in column and column.quote -%}
+                        {{adapter.quote(column.alias)}}
+                    {%- elif 'alias' in column -%}
+                        {{column.alias}}
+                    {%- else -%}
+                        {{column_quoted}}
+                    {%- endif -%}
+                {%- endset %}
+                {%- set col_expression -%}
+                    {%- if column.expression -%}
+                        {{column.expression}}
+                    {%- else -%}
+                        {%- set col_id = 'value:c' ~ loop.index if is_csv else 'value:' ~ column_alias -%}
+                        (case when is_null_value({{col_id}}) or lower({{col_id}}) = 'null' then null else {{col_id}} end)
+                    {%- endif -%}
+                {%- endset %}
+                {{column_alias}} {{column.data_type}} as ({{col_expression}}::{{column.data_type}})
+                {{- ',' if not loop.last -}}
+            {% endfor %}
+        {% else %}
+        {%- for column in columns_infer %}
+                {%- set col_expression -%}
+                    {%- set col_id = 'value:' ~ column[0] -%}
+                    (case when is_null_value({{col_id}}) or lower({{col_id}}) = 'null' then null else {{col_id}} end)
+                {%- endset %}
+                {{column[0]}} {{column[1]}} as ({{col_expression}}::{{column[1]}})
+                {{- ',' if not loop.last -}}
+            {% endfor %}
+        {%- endif -%}
+    )
+    {%- endif -%}
+    {% if partitions %} partition by ({{partitions|map(attribute='name')|join(', ')}}) {% endif %}
+    location = {{external.location}} {# stage #}
+    {% if external.auto_refresh in (true, false) -%}
+      auto_refresh = {{external.auto_refresh}}
+    {%- endif %}
+    {% if external.aws_sns_topic -%}
+      aws_sns_topic = '{{external.aws_sns_topic}}'
+    {%- endif %}
+    {% if external.table_format | lower == "delta" %}
+      refresh_on_create = false
+    {% endif %}
+    {% if external.pattern -%} pattern = '{{external.pattern}}' {%- endif %}
+    {% if external.integration -%} integration = '{{external.integration}}' {%- endif %}
+    file_format = {{external.file_format}}
+    {% if external.table_format -%} table_format = '{{external.table_format}}' {%- endif %}
+{% endmacro %}
diff --git a/dbt/include/snowflake/macros/relations/external_table/helpers.sql b/dbt/include/snowflake/macros/relations/external_table/helpers.sql
new file mode 100644
index 000000000..257f35b54
--- /dev/null
+++ b/dbt/include/snowflake/macros/relations/external_table/helpers.sql
@@ -0,0 +1,143 @@
+{#
+    Return a statement that makes sure the source's schema exists.
+
+    When `execute` is true the schema is looked up with `show terse schemas`;
+    if it is found a no-op `select` is returned, otherwise (or when not
+    executing) a `create schema if not exists` statement. The database
+    qualifier is left out of both the lookup and the DDL when the source
+    node has no database.
+#}
+{% macro snowflake__create_external_schema(source_node) %}
+
+    {% set schema_exists_query %}
+        show terse schemas like '{{ source_node.schema }}'{% if source_node.database %} in database {{ source_node.database }}{% endif %} limit 1;
+    {% endset %}
+    {% if execute %}
+        {% set schema_exists = run_query(schema_exists_query)|length > 0 %}
+    {% else %}
+        {% set schema_exists = false %}
+    {% endif %}
+
+    {% if schema_exists %}
+        {% set ddl %}
+            select 'Schema {{ source_node.schema }} exists' from dual;
+        {% endset %}
+    {% else %}
+        {% set fqn %}
+            {% if source_node.database %}
+                {{ source_node.database }}.{{ source_node.schema }}
+            {% else %}
+                {{ source_node.schema }}
+            {% endif %}
+        {% endset %}
+
+        {% set ddl %}
+            create schema if not exists {{ fqn }};
+        {% endset %}
+    {% endif %}
+
+    {% do return(ddl) %}
+
+{% endmacro %}
+
+{#
+    Return the list of statements needed to refresh the external table.
+
+    A manual `alter external table ... refresh` is returned unless
+    `external.auto_refresh` is truthy, in which case the list is empty.
+#}
+{% macro snowflake__refresh_external_table(source_node) %}
+
+    {% set external = source_node.external %}
+    {% set auto_refresh = external.get('auto_refresh', false) %}
+
+    {% set manual_refresh = not auto_refresh %}
+
+    {% if manual_refresh %}
+
+        {% set ddl %}
+        begin;
+        alter external table {{source(source_node.source_name, source_node.name)}} refresh;
+        commit;
+        {% endset %}
+
+        {% do return([ddl]) %}
+
+    {% else %}
+
+        {% do return([]) %}
+
+    {% endif %}
+
+{% endmacro %}
+
+{#
+    Return true when `file_format` refers to a CSV file format.
+
+    A value containing `type=` is checked for `type=csv` directly; anything
+    else is treated as a (possibly database/schema-qualified) named file
+    format and matched against the output of `show file formats`.
+#}
+{% macro is_csv(file_format) %}
+
+{# From https://docs.snowflake.net/manuals/sql-reference/sql/create-external-table.html:
+
+Important: The external table does not inherit the file format, if any, in the
+stage definition. You must explicitly specify any file format options for the
+external table using the FILE_FORMAT parameter.
+
+Note: FORMAT_NAME and TYPE are mutually exclusive; to avoid unintended behavior,
+you should only specify one or the other when creating an external table.
+
+#}
+
+    {% set ff_ltrimmed = file_format|lower|replace(' ','') %}
+
+    {% if 'type=' in ff_ltrimmed %}
+
+        {% if 'type=csv' in ff_ltrimmed %}
+
+            {{return(true)}}
+
+        {% else %}
+
+            {{return(false)}}
+
+        {% endif %}
+
+    {% else %}
+
+        {% set ff_standardized = ff_ltrimmed
+            | replace('(','') | replace(')','')
+            | replace('format_name=','') %}
+        {% set fqn = ff_standardized.split('.') %}
+
+        {% if fqn | length == 3 %}
+            {% set ff_database, ff_schema, ff_identifier = fqn[0], fqn[1], fqn[2] %}
+        {% elif fqn | length == 2 %}
+            {% set ff_database, ff_schema, ff_identifier = target.database, fqn[0], fqn[1] %}
+        {% else %}
+            {% set ff_database, ff_schema, ff_identifier = target.database, target.schema, fqn[0] %}
+        {% endif %}
+
+        {% call statement('get_file_format', fetch_result = True) %}
+            show file formats in {{ff_database}}.{{ff_schema}}
+        {% endcall %}
+
+        {% set ffs = load_result('get_file_format').table %}
+
+        {% for ff in ffs %}
+
+            {% if ff['name']|lower == ff_identifier and ff['type']|lower == 'csv' %}
+
+                {{return(true)}}
+
+            {% endif %}
+
+        {% endfor %}
+
+        {{return(false)}}
+
+    {% endif %}
+
+{% endmacro %}