-
Notifications
You must be signed in to change notification settings - Fork 591
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
feat(sink): Support Deletes for ClickHouse ReplacingMergeTree #17283
feat(sink): Support Deletes for ClickHouse ReplacingMergeTree #17283
Conversation
- ClickHouse now supports [1] a `is_deleted` column (optional) when creating a `ReplacingMergeTree`. - Adds new `clickhouse.delete.column` config option, which when set will enable `upsert` to a `ReplacingMergeTree` - When the value is deleted the value of this column is set to `1` instead of the default `0`.
I thought about trying to use the approach that the Collarpsing merge trees and infer the
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks for the contribution! Generally LGTM.
One minor comment on the RW sink example in the Release Note: I think clickhouse.delete.column
option is missing in the example. Would you mind updating that?
@xxhZs PTAL.
Good catch, updated! |
Thanks for bringing this up. This is hacky indeed. I think we should deprecate hacking around DDL and add an option for the Btw, FYI we are developing a feature call stream changelog (#17132) to convert a retractable stream into an append-only changelog stream with the additional # kv_store_changelog's schema is (k, v, ver, op)
# op = 1: INSERT
# op = 2: DELETE
# op = 3: UPDATE_INSERT
# op = 4: UPDATE_DELETE
CREATE SINK kv_store_sink AS
WITH kv_store_changelog as CHANGELOG FROM kv_store
SELECT
k,
v,
ver,
(CASE WHEN op == 2 or op == 4 THEN 1 ELSE 0) as del
FROM kv_store_changelog
WITH (
connector = 'clickhouse',
type = 'append-only', -- kv_store_changelog is append only
clickhouse.url = 'http://localhost:8123',
clickhouse.user = 'default',
clickhouse.password = '',
clickhouse.database = 'default',
clickhouse.table='kv_store',
primary_key='k'
);
I think we can still merge this PR given that the change is small and it is easier to use from user's perspective. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
lgtm, Thank you for your contribution,
This will be great! |
- Both the `get_sign_name().is_some()` and `.get_delete_column.is_some()` where incurring unnecessary memory overhead. - These methods were being called on a per-row basis from the `StreamChunk` so the overhead was potentially large. - Calling the `.is_collapsing_engine()` and `.is_delete_replacing_engine()` instead.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
lgtm, If you have no other changes, we will merge it.
c710309
ClickHouse now supports [1] a
is_deleted
column (optional) when creating aReplacingMergeTree
.Adds new
clickhouse.delete.column
config option, which when set will enableupsert
to aReplacingMergeTree
When the value is deleted the value of this column is set to
1
instead of the default0
.[1] https://clickhouse.com/docs/en/engines/table-engines/mergetree-family/replacingmergetree#is_deleted
I hereby agree to the terms of the RisingWave Labs, Inc. Contributor License Agreement.
What's changed and what's your intention?
Checklist
./risedev check
(or alias,./risedev c
)Documentation
Release note
This patch adds the ability for propagating
DELETE
operations to a ClickHouse table backed by aReplacingMergeTree
. When using thetype='upsert'
you will need to specify aclickhouse.delete.column
corresponding to the respective column in ClickHouse. If you are using theis_deleted
feature then you will need to use aUInt8
type for ClickHouse.When a
DELETE
occurs, the value of this column will be flipped to a value of1
which will allow ClickHouse to ignore this row (and all previous versions) automatically when performing aFROM <table> FINAL
query.Example
1.) Create ClickHouse Table
The following table is a simple K/V store. We use the
ver
column to denote the latest version, and thedel
column to indicate whether or not the key was marked for deletion.2.) Create the RisingWave table
The table below will act as our source table for testing.
3.) Create the Sink Connection
The following sink will write all data from the RisingWave
kv_store
into ourReplacingMergeTree
while using thedel
column to indicate when a key should marked deleted.4.) Insert / Delete Data
5.) Verify Delete Propagates to ClickHouse
6.) Validate Scan With
FINAL
Respects Delete