-
Notifications
You must be signed in to change notification settings - Fork 598
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
fix(frontend): fix changelog out_col_change error and add snowflake upsert sink demo #17515
Conversation
37c05a6
to
0522b2b
Compare
let out_col_change = if self.base.stream_key().is_none() { | ||
ColIndexMapping::new(output_vec, len + 1) | ||
} else { | ||
(changelog, input_col_change) | ||
} | ||
output_vec.push(Some(len)); | ||
ColIndexMapping::new(output_vec, len + 1) | ||
}; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I don't get this logic, probably because I am not familiar with the optimzier codes. But it seems that we are relying on some implicit assumption here. Can you elaborate more?
- Why do we need to extend the
ColIndexMapping
without extendingoutput_vec
when the stream key is none? - When will the stream key be none?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
stream_key.is_none means that _changelog_row_id is not included in the select result. This has been changed to need_changelog_row_id to make the logic clearer.
When the results does not contain _changelog_row_id , _changelog_row_id is the hidden column added by this operator, and the ColIndexMapping should be null->index (here is len)
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
LGTM
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
LGTM
I hereby agree to the terms of the RisingWave Labs, Inc. Contributor License Agreement.
What's changed and what's your intention?
Forgot to add _changelog_row_id records in out_col_change , this is an error.
Added demo that implements snowflake upsert sink using changelog
Checklist
./risedev check
(or alias,./risedev c
)Documentation
Release note
Need to add the demo to the documentation