Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: Experimentally introducing sink into table #13185

Merged
merged 42 commits into from
Dec 9, 2023

Conversation

shanicky
Copy link
Contributor

@shanicky shanicky commented Oct 31, 2023

I hereby agree to the terms of the RisingWave Labs, Inc. Contributor License Agreement.

What's changed and what's your intention?

This PR depends on #12240 and #12495

This PR attempts to introduce the sink into table feature.

Following PR #12240, all newly created tables come with a union operator used to aggregate source/dml and the current sink into table. According to the rfc, we create various structures of sink under these three types.

The specific method is very rudimentary. Currently, when we create a streaming job, we will have an affected_table_change, which describes the target table that needs to replace the table. The generated command carries both add and update semantics at this time. We tried to use the combined mutation in PR #12495 to perform this process atomically, but it seems that it may not be necessary, so we may delete this mutation after testing.

Mainly, we still have many restrictions that need to be solved in the future:

  1. We can only create 1 sink to the table at present.
  2. Tables with sinks cannot be altered at the moment.

The first case, table with row id as the primary key.

image
dev=> create table m (v1 int, v2 int, v3 int as v1 + v2)  with (
    connector = 'datagen',
    fields.v1.kind = 'sequence',
    fields.v1.start = '1',
    fields.v1.end = '1000',
    datagen.split.num = '5',
) FORMAT PLAIN ENCODE JSON ;
CREATE_TABLE
dev=> create table t (v1 int);
CREATE_TABLE
dev=> explain create sink s into m as select v1 from t with ( type = 'append-only' , force_append_only = 'true');
                                       QUERY PLAN
-----------------------------------------------------------------------------------------
 StreamProject { exprs: [t.v1, null:Int32, (t.v1 + null:Int32) as $expr1, null:Serial] }
 └─StreamProject { exprs: [t.v1, null:Int32, null:Serial] }
   └─StreamSink { type: append-only, columns: [v1, t._row_id(hidden)] }
     └─StreamTableScan { table: t, columns: [v1, _row_id] }
(4 rows)
dev=> create sink s into m as select v1 from t with ( type = 'append-only' , force_append_only = 'true');
CREATE_SINK

table

image

sink

image

The second case, append only table

image
dev=> create table m (v1 int, v2 int, v3 int as v1 + v2) append only with (
    connector = 'datagen',
    fields.v1.kind = 'sequence',
    fields.v1.start = '1',
    fields.v1.end = '1000',
    datagen.split.num = '5',
) FORMAT PLAIN ENCODE JSON ;
NOTICE:  APPEND ONLY TABLE is currently an experimental feature.
CREATE_TABLE
dev=> create table t (v1 int);
CREATE_TABLE
dev=> explain create sink s into m as select v1 from t with ( type = 'append-only' , force_append_only = 'true');
                                       QUERY PLAN
-----------------------------------------------------------------------------------------
 StreamProject { exprs: [t.v1, null:Int32, (t.v1 + null:Int32) as $expr1, null:Serial] }
 └─StreamProject { exprs: [t.v1, null:Int32, null:Serial] }
   └─StreamSink { type: append-only, columns: [v1, t._row_id(hidden)] }
     └─StreamTableScan { table: t, columns: [v1, _row_id] }
(4 rows)
dev=> create sink s into m as select v1 from t with ( type = 'append-only' , force_append_only = 'true');
CREATE_SINK

table

image

sink

image

The third case,table with primary key

image
dev=> create table m (v1 int primary key, v2 int) with {
    connector = 'datagen',
    fields.v1.kind = 'sequence',
    fields.v1.start = '1',
    fields.v1.end = '1000',
    datagen.split.num = '5',
) FORMAT PLAIN ENCODE JSON ;
CREATE_TABLE
dev=> create table t (v1 int);
CREATE_TABLE
dev=> explain create sink s into m as select v1 from t;
                                    QUERY PLAN
----------------------------------------------------------------------------------
 StreamProject { exprs: [t.v1, null:Int32] }
 └─StreamSink { type: upsert, columns: [v1, t._row_id(hidden)], pk: [t._row_id] }
   └─StreamTableScan { table: t, columns: [v1, _row_id] }
(3 rows)
dev=> create sink s into m as select v1 from t;
CREATE_SINK

table

image

sink

image

Checklist

  • I have written necessary rustdoc comments
  • I have added necessary unit tests and integration tests
  • All checks passed in ./risedev check (or alias, ./risedev c)

@shanicky shanicky force-pushed the peng/sink-into-table-new branch 3 times, most recently from da882e9 to d6575ba Compare November 3, 2023 10:27
github-actions[bot]

This comment was marked as resolved.

github-actions[bot]

This comment was marked as resolved.

@shanicky shanicky force-pushed the peng/sink-into-table-new branch 3 times, most recently from 7f69aad to 0206548 Compare November 7, 2023 11:33
@shanicky shanicky changed the title [wip] feat: sink into table feat: sink into table Nov 7, 2023
@shanicky shanicky marked this pull request as ready for review November 7, 2023 15:38
@shanicky shanicky changed the title feat: sink into table feat: Experimentally introducing sink into table Nov 7, 2023
@BugenZhao BugenZhao self-requested a review November 8, 2023 03:01
@shanicky shanicky force-pushed the peng/sink-into-table-new branch 5 times, most recently from 463fedb to f2d9b4c Compare November 13, 2023 07:38
@shanicky shanicky changed the base branch from main to peng/new-table-dag November 14, 2023 05:56
@yezizp2012 yezizp2012 requested a review from st1page November 14, 2023 06:59
Copy link
Member

@yezizp2012 yezizp2012 left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Rest of meta part generally LGTM.

src/meta/src/rpc/ddl_controller.rs Show resolved Hide resolved
src/meta/src/rpc/ddl_controller.rs Outdated Show resolved Hide resolved
src/meta/src/rpc/ddl_controller.rs Outdated Show resolved Hide resolved
.prepare_replace_table(table_stream_job, table_fragment_graph)
.await?;

let sink_fragment = sink_table_fragments.sink_fragment().unwrap();
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please add some assertion for some frontend work.

src/meta/src/stream/stream_graph/fragment.rs Outdated Show resolved Hide resolved
src/meta/src/stream/stream_manager.rs Show resolved Hide resolved
src/meta/service/src/ddl_service.rs Outdated Show resolved Hide resolved
src/meta/src/barrier/command.rs Outdated Show resolved Hide resolved
src/meta/src/manager/catalog/mod.rs Outdated Show resolved Hide resolved
src/frontend/src/handler/create_table.rs Show resolved Hide resolved
@shanicky shanicky force-pushed the peng/sink-into-table-new branch 3 times, most recently from a44e7c5 to eaabdc2 Compare November 14, 2023 09:57
@shanicky shanicky force-pushed the peng/new-table-dag branch 2 times, most recently from 5059920 to 9a89780 Compare November 15, 2023 07:27
Base automatically changed from peng/new-table-dag to main November 15, 2023 08:37
…ved.

add extract_replace_table_info
add handle for combined
add vistor
Update documentation and imports for clarity and functionality.
Renamed struct `ReplaceTableCommand` to `ReplaceTablePlan`, along with updates in multiple files.
Added assertions for empty incoming sinks and length check for incoming sinks. Removed code for restoring previous incoming sinks.
Refactor module, remove function and struct, utilize new trait
Add version variable, update its value, return in certain scenarios
add check for sink into table dependency

Signed-off-by: Shanicky Chen <[email protected]>
…nd `incoming_sinks` field, removed `visit_stream_node_cont` function.
…n barrier/command.rs, reformat code in rpc/ddl_controller.rs.
… and catalog/mod.rs

tmp

fix conlict

Signed-off-by: Shanicky Chen <[email protected]>
Signed-off-by: Shanicky Chen <[email protected]>
Signed-off-by: Shanicky Chen <[email protected]>
…date function signatures and variable usage.
Signed-off-by: Shanicky Chen <[email protected]>
Signed-off-by: Shanicky Chen <[email protected]>
Signed-off-by: Shanicky Chen <[email protected]>
@shanicky shanicky force-pushed the peng/sink-into-table-new branch from 1478dd3 to 0a05844 Compare December 9, 2023 16:01
@shanicky shanicky enabled auto-merge December 9, 2023 16:06
@shanicky shanicky added this pull request to the merge queue Dec 9, 2023
Merged via the queue into main with commit 19f4254 Dec 9, 2023
28 of 29 checks passed
@shanicky shanicky deleted the peng/sink-into-table-new branch December 9, 2023 16:34
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Projects
None yet
Development

Successfully merging this pull request may close these issues.

5 participants