fix(sink): force sink shuffle with the sink pk (#13516)
st1page authored Nov 23, 2023
1 parent e2f7fdb commit 6ec975a
Showing 3 changed files with 63 additions and 1 deletion.
22 changes: 22 additions & 0 deletions src/frontend/planner_test/tests/testdata/input/sink.yaml
@@ -31,3 +31,25 @@
force_append_only='true');
expected_outputs:
- explain_output
- id: create_upsert_kafka_sink_with_downstream_pk1
sql: |
create table t1 (v1 int, v2 double precision, v3 varchar, v4 bigint, v5 decimal, primary key (v3,v4));
explain create sink s1_mysql as select v1, v2, v3, v5 from t1 WITH (
connector='kafka',
topic='abc',
type='upsert',
primary_key='v1,v2'
);
expected_outputs:
- explain_output
- id: downstream_pk_same_with_upstream
sql: |
create table t1 (v1 int, v2 double precision, v3 varchar, v4 bigint, v5 decimal, primary key (v3,v4));
explain create sink s1_mysql as select v2, v1, count(*) from t1 group by v1, v2 WITH (
connector='kafka',
topic='abc',
type='upsert',
primary_key='v2,v1'
);
expected_outputs:
- explain_output
28 changes: 28 additions & 0 deletions src/frontend/planner_test/tests/testdata/output/sink.yaml
@@ -37,3 +37,31 @@
explain_output: |
StreamSink { type: append-only, columns: [v1, v2, v3, v5] }
└─StreamTableScan { table: t1, columns: [v1, v2, v3, v5] }
- id: create_upsert_kafka_sink_with_downstream_pk1
sql: |
create table t1 (v1 int, v2 double precision, v3 varchar, v4 bigint, v5 decimal, primary key (v3,v4));
explain create sink s1_mysql as select v1, v2, v3, v5 from t1 WITH (
connector='kafka',
topic='abc',
type='upsert',
primary_key='v1,v2'
);
explain_output: |
StreamSink { type: upsert, columns: [v1, v2, v3, v5, t1.v4(hidden)], pk: [t1.v3, t1.v4] }
└─StreamExchange { dist: HashShard(t1.v1, t1.v2) }
└─StreamTableScan { table: t1, columns: [v1, v2, v3, v5, v4] }
- id: downstream_pk_same_with_upstream
sql: |
create table t1 (v1 int, v2 double precision, v3 varchar, v4 bigint, v5 decimal, primary key (v3,v4));
explain create sink s1_mysql as select v2, v1, count(*) from t1 group by v1, v2 WITH (
connector='kafka',
topic='abc',
type='upsert',
primary_key='v2,v1'
);
explain_output: |
StreamSink { type: upsert, columns: [v2, v1, count], pk: [t1.v1, t1.v2] }
└─StreamProject { exprs: [t1.v2, t1.v1, count] }
└─StreamHashAgg { group_key: [t1.v1, t1.v2], aggs: [count] }
└─StreamExchange { dist: HashShard(t1.v1, t1.v2) }
└─StreamTableScan { table: t1, columns: [v1, v2, v3, v4] }
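
Worth noting in this second expected plan: the StreamHashAgg already forces a StreamExchange with HashShard(t1.v1, t1.v2), and a hash distribution on the set {v1, v2} can satisfy a shard-by-key requirement on (v2, v1) irrespective of column order, so the sink adds no further exchange. Below is a toy, set-based model of that satisfaction check — an assumption about the rule for illustration, not the planner's actual code:

```rust
use std::collections::HashSet;

/// Toy model (assumed set-based rule, not RisingWave's implementation):
/// an existing HashShard distribution satisfies a shard-by-key
/// requirement when every hash key is contained in the required key set.
fn hash_shard_satisfies(hash_keys: &[usize], required_keys: &[usize]) -> bool {
    let required: HashSet<usize> = required_keys.iter().copied().collect();
    hash_keys.iter().all(|k| required.contains(k))
}

fn main() {
    // HashAgg distributes by (v1, v2) = indices [0, 1]; the sink's
    // downstream pk is (v2, v1) = indices [1, 0]: no extra exchange.
    assert!(hash_shard_satisfies(&[0, 1], &[1, 0]));
    // Hashing by an unrelated column (say v3 at index 2) would not
    // co-locate rows sharing the sink pk, so it would not satisfy.
    assert!(!hash_shard_satisfies(&[2], &[1, 0]));
}
```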
14 changes: 13 additions & 1 deletion src/frontend/src/optimizer/plan_node/stream_sink.rs
@@ -170,7 +170,19 @@ impl StreamSink {
}
_ => {
assert_matches!(user_distributed_by, RequiredDist::Any);
RequiredDist::shard_by_key(input.schema().len(), input.expect_stream_key())
if downstream_pk.is_empty() {
RequiredDist::shard_by_key(
input.schema().len(),
input.expect_stream_key(),
)
} else {
// Force rows with the same downstream primary key into the same sink
// shard, so that the sink's pk-mismatch compaction remains effective:
// https://github.com/risingwavelabs/risingwave/blob/6d88344c286f250ea8a7e7ef6b9d74dea838269e/src/stream/src/executor/sink.rs#L169-L198
RequiredDist::shard_by_key(
input.schema().len(),
downstream_pk.as_slice(),
)
}
}
}
}
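
For readers skimming the hunk above: the new branch only changes which columns feed `RequiredDist::shard_by_key`. A minimal standalone sketch of that selection — a hypothetical index-based helper, not the actual RisingWave API:

```rust
/// Hypothetical helper mirroring the branch above: pick the columns to
/// hash-shuffle on before the sink, as positions in the input schema.
fn sink_shuffle_key(stream_key: &[usize], downstream_pk: &[usize]) -> Vec<usize> {
    if downstream_pk.is_empty() {
        // No user-declared sink pk: fall back to the stream key, as before.
        stream_key.to_vec()
    } else {
        // Shuffle by the sink pk so all updates for one key reach the same
        // shard, keeping the sink's pk-mismatch compaction effective.
        downstream_pk.to_vec()
    }
}

fn main() {
    // First planner test: upstream pk (v3, v4) at indices [2, 3],
    // user-declared sink pk (v1, v2) at indices [0, 1].
    assert_eq!(sink_shuffle_key(&[2, 3], &[0, 1]), vec![0, 1]);
    // No sink pk declared: keep sharding by the stream key.
    assert_eq!(sink_shuffle_key(&[2, 3], &[]), vec![2, 3]);
}
```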