
Add Snowflake-specific implementation of min_or_max #32

Open · wylbee wants to merge 4 commits into main

Conversation

@wylbee commented Apr 7, 2023

Per #26, the generic implementation of the _min_or_max macro does not work correctly on Snowflake. The root cause is that dbt's safe_cast resolves to Snowflake's try_cast, which has limitations that make it unsuitable for this purpose.
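For context, the limitation in question (an assumption based on Snowflake's documented behavior, not text quoted from #26) is that try_cast only accepts a string expression as its source, so a compiled expression along these lines fails on Snowflake:

    -- Illustrative only: try_cast requires a varchar source expression in Snowflake,
    -- so applying it to a non-string column raises a SQL compilation error.
    select try_cast(stream.activity_occurrence as varchar) as activity_occurrence_str
    from analytics.dev_wbrown.account_stream as stream;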

Given that this macro is intended to provide the equivalent of Snowflake's min_by/max_by functions, this PR does two things:

  • Create a version of the macro that uses Snowflake's native min_by/max_by functions to achieve the same result.
  • Add conditional logic that uses the Snowflake-specific macro when running on Snowflake and otherwise falls back to the current production version (a rough sketch of the resulting shape follows).
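A minimal sketch of that shape, assuming the adapter.dispatch pattern adopted later in this review and using a placeholder ts_col for however the macro resolves the ordering timestamp (illustrative only, not the exact diff):

    {# Illustrative sketch; not the exact code in this PR. #}
    {% macro _min_or_max(min_or_max, qualified_col) -%}
        {{ return(adapter.dispatch('_min_or_max', 'dbt_activity_schema')(min_or_max, qualified_col)) }}
    {%- endmacro %}

    {% macro snowflake___min_or_max(min_or_max, qualified_col) -%}
        {# Snowflake ships native min_by/max_by, so no prepend-and-trim workaround is needed.
           ts_col stands in for the ordering timestamp column (assumption). #}
        {{ min_or_max }}_by({{ qualified_col }}, ts_col)
    {%- endmacro %}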

Test 1 - Integration Suite

[screenshot: integration test suite passing on Snowflake]

Test 2 - Prod project

dbt_project.yml vars

  dbt_activity_schema:
      included_columns:
        - activity_id
        - customer
        - ts
        - activity
        - anonymous_customer_id
        - feature_json 
        - link
        - revenue_impact
        - activity_occurrence
        - activity_repeated_at

query

with

    stream_query as (
        {{
            dbt_activity_schema.dataset(
                ref("account_stream"),
                dbt_activity_schema.activity(
                    dbt_activity_schema.all_ever(), "originates_loan"
                ),
                [
                    dbt_activity_schema.activity(
                        dbt_activity_schema.last_before(), "updates_autopay"
                    )
                ],
            )
        }}

    ),

    final as (

        select * from stream_query

    )

select *
from final

compiled query

    create or replace transient table analytics.dev_wbrown.ee_loan_at_origination as (

    with

    stream_query as (

        with

        filter_activity_stream_using_primary_activity as (
            select
                stream.activity_id,
                stream.customer,
                stream.ts,
                stream.activity,
                stream.anonymous_customer_id,
                stream.feature_json,
                stream.link,
                stream.revenue_impact,
                stream.activity_occurrence,
                stream.activity_repeated_at

            from analytics.dev_wbrown.account_stream as stream

            where stream.activity = 'originates_loan'
                and (true)
        ),

        append_and_aggregate__1__last_before as (
            select

                -- Primary Activity Columns
                stream.activity_id,
                stream.customer,
                stream.ts,
                stream.activity,
                stream.anonymous_customer_id,
                stream.feature_json,
                stream.link,
                stream.revenue_impact,
                stream.activity_occurrence,
                stream.activity_repeated_at,

                max_by(appended.activity_id, appended.ts) as last_before_updates_autopay_activity_id,
                max_by(appended.customer, appended.ts) as last_before_updates_autopay_customer,
                max_by(appended.ts, appended.ts) as last_before_updates_autopay_ts,
                max_by(appended.activity, appended.ts) as last_before_updates_autopay_activity,
                max_by(appended.anonymous_customer_id, appended.ts) as last_before_updates_autopay_anonymous_customer_id,
                max_by(appended.feature_json, appended.ts) as last_before_updates_autopay_feature_json,
                max_by(appended.link, appended.ts) as last_before_updates_autopay_link,
                max_by(appended.revenue_impact, appended.ts) as last_before_updates_autopay_revenue_impact,
                max_by(appended.activity_occurrence, appended.ts) as last_before_updates_autopay_activity_occurrence,
                max_by(appended.activity_repeated_at, appended.ts) as last_before_updates_autopay_activity_repeated_at

            from filter_activity_stream_using_primary_activity as stream

            left join analytics.dev_wbrown.account_stream as appended
                on (
                    -- Join on Customer UUID Column
                    appended.customer = stream.customer

                    -- Join the Correct Activity
                    and appended.activity = 'updates_autopay'

                    -- Relationship Specific Join Conditions
                    and (
                        appended.ts <= coalesce(stream.ts, '1900-01-01'::timestamp)
                    )

                    -- Additional Join Condition
                    and ( true )
                )

            group by
                stream.activity_id,
                stream.customer,
                stream.ts,
                stream.activity,
                stream.anonymous_customer_id,
                stream.feature_json,
                stream.link,
                stream.revenue_impact,
                stream.activity_occurrence,
                stream.activity_repeated_at
        ),

        rejoin_aggregated_activities as (
            select
                stream.activity_id,
                stream.customer,
                stream.ts,
                stream.activity,
                stream.anonymous_customer_id,
                stream.feature_json,
                stream.link,
                stream.revenue_impact,
                stream.activity_occurrence,
                stream.activity_repeated_at,

                append_and_aggregate__1__last_before.last_before_updates_autopay_activity_id,
                append_and_aggregate__1__last_before.last_before_updates_autopay_customer,
                append_and_aggregate__1__last_before.last_before_updates_autopay_ts,
                append_and_aggregate__1__last_before.last_before_updates_autopay_activity,
                append_and_aggregate__1__last_before.last_before_updates_autopay_anonymous_customer_id,
                append_and_aggregate__1__last_before.last_before_updates_autopay_feature_json,
                append_and_aggregate__1__last_before.last_before_updates_autopay_link,
                append_and_aggregate__1__last_before.last_before_updates_autopay_revenue_impact,
                append_and_aggregate__1__last_before.last_before_updates_autopay_activity_occurrence,
                append_and_aggregate__1__last_before.last_before_updates_autopay_activity_repeated_at

            from filter_activity_stream_using_primary_activity as stream

            left join append_and_aggregate__1__last_before
                on append_and_aggregate__1__last_before.activity_id = stream.activity_id
        )

        select * from rejoin_aggregated_activities

    ),

    final as (

        select * from stream_query

    )

    select *
    from final
    );

[screenshot: successful run output from the prod project]

@bcodell (Collaborator) commented Apr 7, 2023

Hey @brown5628 - thanks for opening this! For the sake of auditing, would you mind installing your fork in your Snowflake-based project, running a query that uses the dataset macro to validate that the query compiles and runs successfully, and then pasting a screenshot of the dbt CLI logs or the compiled query here for reference? Sorry for the tedious ask, but since the project's CI pipeline only runs on DuckDB and this PR is a Snowflake-specific fix, it'd be nice to have evidence of the code change working as expected.

@wylbee wylbee marked this pull request as ready for review April 14, 2023 19:55
@wylbee (Author) commented Apr 14, 2023

@bcodell Should be ready for review. Thanks for your patience on this one with the slow turnaround time. Quick callouts:

  • I hacked together getting the integration test suite running on Snowflake; the screenshot above shows full passes.
  • I pasted the artifacts you requested from my current implementation, showing the input, the successful output, and the compiled SQL.

Two questions from me:

  1. Does the project have a style guide? I belatedly realized that my IDE reformatted everything using sqlfmt. Happy to turn that off and restate a cleaner diff if that's preferred; just let me know.
  2. Is there utility in maintaining a parallel set of integration tests for non-DuckDB connectors, even if they are not run automatically? If so, I can open a new PR with a cleaned-up version of what I did to get the tests running on Snowflake, kept in a separate folder from the existing integration tests. The changes needed were:
    • In the models, change every instance of
        json_extract({{ dbt_activity_schema.primary() }}.feature_json, 'type') = json_extract({{ dbt_activity_schema.appended() }}.feature_json, 'type')
      to
        parse_json({{ dbt_activity_schema.primary() }}.feature_json):"type" = parse_json({{ dbt_activity_schema.appended() }}.feature_json):"type"
    • Point to the Snowflake profile
    • Remove the model configs in dbt_project.yml

@tnightengale (Owner) left a comment:

Thank you for tackling this! I think we just need to make a small tweak to leverage dbt dispatch functionality. See the comment in the code.

Otherwise, if you wouldn't mind turning off sqlfmt so the diff is cleaner!

100% we should implement a style guide in the near future. Until then, let's refrain from style changes outside the PR scope.

Awesome work though!

@@ -1,56 +1,65 @@
{% macro _min_or_max(min_or_max, qualified_col) %}

{% if target.type == "snowflake" %}
@tnightengale (Owner) commented:

Great work! But I think we ought to change this to use dbt dispatch.

I think we could just abstract this by doing {% set aggregation = get_db_aggregation() %}. And have that get_db_aggregation() macro contain default__ and snowflake__ dispatches.

Sound reasonable?
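A hypothetical sketch of that suggestion (the macro name and signature are assumptions here, not code from the PR):

    {# Hypothetical helper illustrating the suggestion above. #}
    {% macro get_db_aggregation(min_or_max) -%}
        {{ return(adapter.dispatch('get_db_aggregation', 'dbt_activity_schema')(min_or_max)) }}
    {%- endmacro %}

    {% macro default__get_db_aggregation(min_or_max) -%}
        {{ return(min_or_max) }}  {# plain min / max #}
    {%- endmacro %}

    {% macro snowflake__get_db_aggregation(min_or_max) -%}
        {{ return(min_or_max ~ "_by") }}  {# native min_by / max_by #}
    {%- endmacro %}

The calling macro would then do {% set aggregation = get_db_aggregation(min_or_max) %} and emit {{ aggregation }}(...) wherever the aggregate is built.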

@wylbee (Author) replied:

Yep, absolutely! Wasn't aware of that functionality but seems like a perfect fit here. Will make the changes & tee this up!

@wylbee (Author) commented Apr 21, 2023

Hi @tnightengale:

  • Restated the file without sqlfmt so the diff should be cleaner.
  • Struggling a bit with how to implement the dbt dispatch logic. I see how it can be used to replace the if/then logic I was using, and have done so, but I'm not following how to apply this at the {% set aggregation = get_db_aggregation() %} level, since there are other changes to the Snowflake-specific logic. Would you be able to spell out a bit further what you are looking for there? Happy to take another pass, but I recognize that I'm spinning my wheels.

Validation

  1. No row differences between dev and prod pre-change
    [screenshot: dev vs prod comparison]
  2. No row differences between dev and prod post-change
    [screenshot: dev vs prod comparison]

@tnightengale (Owner) left a comment:

So, the _min_or_max macro actually uses a trick to prepend the ts, then aggregate, then trim off the prepended ts column, so that the aggregation abstraction can work the same across sum and count.

So I think that logic needs to be replicated for snowflake as well, as far as I understand.

I don't think this will work as is?
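For reference, the prepend-and-trim pattern described above works roughly like this (an illustrative sketch; the column names and the 23-character prefix width are assumptions, not the project's exact SQL):

    -- Prefix the value with a fixed-width, sortable timestamp string, take max(),
    -- then strip the prefix back off so only the value from the latest row remains.
    select
        substr(
            max(substr(cast(appended.ts as varchar), 1, 23) || appended.link),
            24
        ) as last_before_link
    from account_stream as appended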

{{ return(adapter.dispatch('_min_or_max','dbt_activity_schema')(min_or_max, qualified_col)) }}
{%- endmacro -%}

{% macro default___min_or_max(min_or_max, qualified_col) -%}

{% set aggregation = "min" if min_or_max == "min" else "max" %}
@tnightengale (Owner) commented:

I think this assignment should be via a dispatched macro. And then we can call that aggregation with {{ }} in the relevant place.

@wylbee (Author) commented Apr 26, 2023

@tnightengale Apologies for the back and forth. I think I'm missing something here that would help me address this feedback. Three questions for you:

  1. I follow how the default _min_or_max macro works. Per the discussion in TRY_CAST SQL compilation error #26, this code uses built-in Snowflake functionality to achieve the same end rather than replicating the prepend approach, and in my dbt project and the Snowflake version of the integration tests, the min_by/max_by version produces the expected results. Could you explain further what you mean by "I don't think this will work as is"?
  2. For "So I think that logic needs to be replicated for Snowflake as well, as far as I understand": are you saying that you would prefer a Snowflake version of the prepend logic rather than the native Snowflake approach used here, or something else?
  3. I'm not following the comment on making the assignment via a dispatched macro. The version in this PR is essentially a carbon copy of the example provided in the dbt docs for this functionality. Could you explain more about what you are looking for, or point me to an example on which I can base the refactoring?

Appreciate the feedback to give me what I need to get this all squared away!
