Skip to content

Commit

Permalink
fix: when target is table, the sink downstream pk is not set (#19515)
Browse files Browse the repository at this point in the history
  • Loading branch information
st1page committed Nov 22, 2024
1 parent b2b8945 commit f50ce55
Show file tree
Hide file tree
Showing 9 changed files with 194 additions and 129 deletions.
6 changes: 3 additions & 3 deletions e2e_test/sink/sink_into_table/parallelism.slt
Original file line number Diff line number Diff line change
Expand Up @@ -8,16 +8,16 @@ statement ok
SET STREAMING_PARALLELISM TO 2;

statement ok
create table t_simple (v1 int, v2 int);
create table t_simple (v1 int, v2 int) append only;

statement ok
create table m_simple (v1 int primary key, v2 int);
create table m_simple (v1 int, v2 int) append only;

statement ok
SET STREAMING_PARALLELISM TO 3;

statement ok
create sink s_simple_1 into m_simple as select v1, v2 from t_simple;
create sink s_simple_1 into m_simple as select v1, v2 from t_simple with (type = 'append-only');

query I
select distinct parallelism from rw_fragment_parallelism where name in ('t_simple', 'm_simple', 's_simple_1');
Expand Down
1 change: 1 addition & 0 deletions src/frontend/planner_test/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -850,6 +850,7 @@ impl TestCase {
false,
None,
None,
false,
) {
Ok(sink_plan) => {
ret.sink_plan = Some(explain_plan(&sink_plan.into()));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,3 +9,15 @@
explain create sink ss into t from s with (type = 'append-only');
expected_outputs:
- explain_output
- sql: |
create table t1 (a int primary key, b int);
create table t2 (a int, b int primary key);
explain create sink s into t1 from t2;
expected_outputs:
- explain_output
- sql: |
create table t1 (a int primary key, b int);
create table t2 (a int, b int primary key);
explain create sink s into t1 as select b from t2;
expected_outputs:
- explain_output
Original file line number Diff line number Diff line change
Expand Up @@ -206,7 +206,7 @@
emit on window close
WITH (connector = 'blackhole');
explain_output: |
StreamSink { type: upsert, columns: [tm, foo, bar, t._row_id(hidden), lag, max, sum], pk: [t._row_id, t.bar] }
StreamSink { type: upsert, columns: [tm, foo, bar, t._row_id(hidden), lag, max, sum], downstream_pk: [] }
└─StreamEowcOverWindow { window_functions: [first_value(t.foo) OVER(PARTITION BY t.bar ORDER BY t.tm ASC ROWS BETWEEN 2 PRECEDING AND 2 PRECEDING), max(t.foo) OVER(PARTITION BY t.bar ORDER BY t.tm ASC ROWS BETWEEN 1 PRECEDING AND 1 FOLLOWING), sum(t.foo) OVER(PARTITION BY t.bar ORDER BY t.tm ASC ROWS BETWEEN 2 PRECEDING AND CURRENT ROW EXCLUDE CURRENT ROW)] }
└─StreamEowcSort { sort_column: t.tm }
└─StreamExchange { dist: HashShard(t.bar) }
Expand Down
18 changes: 9 additions & 9 deletions src/frontend/planner_test/tests/testdata/output/sink.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@
table.name='t1sink',
type='upsert');
explain_output: |
StreamSink { type: upsert, columns: [v1, v2, v3, v5, t1.v4(hidden)], pk: [t1.v3, t1.v4] }
StreamSink { type: upsert, columns: [v1, v2, v3, v5, t1.v4(hidden)], downstream_pk: [t1.v1, t1.v2] }
└─StreamExchange { dist: HashShard(t1.v1, t1.v2) }
└─StreamTableScan { table: t1, columns: [v1, v2, v3, v5, v4] }
- id: create_upsert_jdbc_sink_with_downstream_pk2
Expand All @@ -22,7 +22,7 @@
table.name='t1sink',
type='upsert');
explain_output: |
StreamSink { type: upsert, columns: [v1, v2, v3, v5], pk: [t1.v1, t1.v2] }
StreamSink { type: upsert, columns: [v1, v2, v3, v5], downstream_pk: [t1.v3, t1.v5] }
└─StreamExchange { dist: HashShard(t1.v3, t1.v5) }
└─StreamTableScan { table: t1, columns: [v1, v2, v3, v5] }
- id: create_upsert_jdbc_sink_with_downstream_pk1
Expand All @@ -36,7 +36,7 @@
type='upsert');
explain_output: |+
Fragment 0
StreamSink { type: upsert, columns: [v1, v2, v3, v5, t1.v4(hidden)], pk: [t1.v3, t1.v4] }
StreamSink { type: upsert, columns: [v1, v2, v3, v5, t1.v4(hidden)], downstream_pk: [t1.v1, t1.v2] }
├── tables: [ Sink: 0 ]
├── output: [ t1.v1, t1.v2, t1.v3, t1.v5, t1.v4 ]
├── stream key: [ t1.v3, t1.v4 ]
Expand Down Expand Up @@ -88,7 +88,7 @@
type='upsert');
explain_output: |+
Fragment 0
StreamSink { type: upsert, columns: [v1, v2, v3, v5], pk: [t1.v1, t1.v2] }
StreamSink { type: upsert, columns: [v1, v2, v3, v5], downstream_pk: [t1.v3, t1.v5] }
├── tables: [ Sink: 0 ]
├── output: [ t1.v1, t1.v2, t1.v3, t1.v5 ]
├── stream key: [ t1.v1, t1.v2 ]
Expand Down Expand Up @@ -150,7 +150,7 @@
primary_key='v1,v2'
);
explain_output: |
StreamSink { type: upsert, columns: [v1, v2, v3, v5, t1.v4(hidden)], pk: [t1.v3, t1.v4] }
StreamSink { type: upsert, columns: [v1, v2, v3, v5, t1.v4(hidden)], downstream_pk: [t1.v1, t1.v2] }
└─StreamExchange { dist: HashShard(t1.v1, t1.v2) }
└─StreamTableScan { table: t1, columns: [v1, v2, v3, v5, v4] }
- id: downstream_pk_same_with_upstream
Expand All @@ -163,7 +163,7 @@
primary_key='v2,v1'
);
explain_output: |
StreamSink { type: upsert, columns: [v2, v1, count], pk: [t1.v1, t1.v2] }
StreamSink { type: upsert, columns: [v2, v1, count], downstream_pk: [t1.v2, t1.v1] }
└─StreamProject { exprs: [t1.v2, t1.v1, count] }
└─StreamHashAgg { group_key: [t1.v1, t1.v2], aggs: [count] }
└─StreamExchange { dist: HashShard(t1.v1, t1.v2) }
Expand All @@ -173,7 +173,7 @@
create table t2 (a int, b int, watermark for b as b - 4) append only;
explain create sink sk1 from t2 emit on window close with (connector='blackhole');
explain_output: |
StreamSink { type: upsert, columns: [a, b, t2._row_id(hidden)], pk: [t2._row_id] }
StreamSink { type: upsert, columns: [a, b, t2._row_id(hidden)], downstream_pk: [] }
└─StreamEowcSort { sort_column: t2.b }
└─StreamTableScan { table: t2, columns: [a, b, _row_id] }
- id: create_mock_iceberg_sink_append_only_with_sparse_partition
Expand Down Expand Up @@ -236,7 +236,7 @@
primary_key = 'v1'
);
explain_output: |
StreamSink { type: upsert, columns: [v1, v2, v3, v4, t1._row_id(hidden)], pk: [t1._row_id] }
StreamSink { type: upsert, columns: [v1, v2, v3, v4, t1._row_id(hidden)], downstream_pk: [t1.v1] }
└─StreamExchange { dist: HashShard($expr1) }
└─StreamProject { exprs: [t1.v1, t1.v2, t1.v3, t1.v4, t1._row_id, Row(t1.v1, IcebergTransform('bucket[1]':Varchar, t1.v2), IcebergTransform('truncate[1]':Varchar, t1.v3), null:Int32) as $expr1] }
└─StreamTableScan { table: t1, columns: [v1, v2, v3, v4, _row_id] }
Expand All @@ -258,5 +258,5 @@
primary_key = 'v1'
);
explain_output: |
StreamSink { type: upsert, columns: [v1, v2, v3, v4, t1._row_id(hidden)], pk: [t1._row_id] }
StreamSink { type: upsert, columns: [v1, v2, v3, v4, t1._row_id(hidden)], downstream_pk: [t1.v1] }
└─StreamTableScan { table: t1, columns: [v1, v2, v3, v4, _row_id] }
Original file line number Diff line number Diff line change
Expand Up @@ -12,3 +12,20 @@
StreamProject { exprs: [s.x, Proctime as $expr1, (Proctime - '00:01:00':Interval) as $expr2, null:Serial], output_watermarks: [$expr1, $expr2] }
└─StreamSink { type: append-only, columns: [x, s._row_id(hidden)] }
└─StreamTableScan { table: s, columns: [x, _row_id] }
- sql: |
create table t1 (a int primary key, b int);
create table t2 (a int, b int primary key);
explain create sink s into t1 from t2;
explain_output: |
StreamProject { exprs: [t2.a, t2.b] }
└─StreamSink { type: upsert, columns: [a, b], downstream_pk: [t2.a] }
└─StreamExchange { dist: HashShard(t2.a) }
└─StreamTableScan { table: t2, columns: [a, b] }
- sql: |
create table t1 (a int primary key, b int);
create table t2 (a int, b int primary key);
explain create sink s into t1 as select b from t2;
explain_output: |
StreamProject { exprs: [t2.b, null:Int32] }
└─StreamSink { type: upsert, columns: [b], downstream_pk: [t2.b] }
└─StreamTableScan { table: t2, columns: [b] }
21 changes: 4 additions & 17 deletions src/frontend/src/handler/create_sink.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,8 +27,7 @@ use risingwave_common::catalog::{
};
use risingwave_common::secret::LocalSecretManager;
use risingwave_common::types::DataType;
use risingwave_common::{bail, catalog};
use risingwave_connector::sink::catalog::{SinkCatalog, SinkFormatDesc, SinkType};
use risingwave_connector::sink::catalog::{SinkCatalog, SinkFormatDesc};
use risingwave_connector::sink::iceberg::{IcebergConfig, ICEBERG_SINK};
use risingwave_connector::sink::{
CONNECTOR_TYPE_KEY, SINK_TYPE_OPTION, SINK_USER_FORCE_APPEND_ONLY_OPTION, SINK_WITHOUT_BACKFILL,
Expand Down Expand Up @@ -234,8 +233,6 @@ pub async fn gen_sink_plan(
}
}

let target_table = target_table_catalog.as_ref().map(|catalog| catalog.id());

let sink_plan = plan_root.gen_sink_plan(
sink_table_name,
definition,
Expand All @@ -245,8 +242,9 @@ pub async fn gen_sink_plan(
sink_from_table_name,
format_desc,
without_backfill,
target_table,
target_table_catalog.clone(),
partition_info,
user_specified_columns,
)?;

let sink_desc = sink_plan.sink_desc().clone();
Expand Down Expand Up @@ -278,18 +276,7 @@ pub async fn gen_sink_plan(
}
}

let user_defined_primary_key_table =
!(table_catalog.append_only || table_catalog.row_id_index.is_some());

if !(user_defined_primary_key_table
|| sink_catalog.sink_type == SinkType::AppendOnly
|| sink_catalog.sink_type == SinkType::ForceAppendOnly)
{
return Err(RwError::from(ErrorCode::BindError(
"Only append-only sinks can sink to a table without primary keys.".to_string(),
)));
}

let table_columns_without_rw_timestamp = table_catalog.columns_without_rw_timestamp();
let exprs = derive_default_column_project_for_sink(
&sink_catalog,
sink_plan.schema(),
Expand Down
47 changes: 44 additions & 3 deletions src/frontend/src/optimizer/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
// limitations under the License.
use std::num::NonZeroU32;
use std::ops::DerefMut;
use std::sync::Arc;

pub mod plan_node;

Expand Down Expand Up @@ -40,7 +41,7 @@ mod plan_expr_visitor;
mod rule;

use std::assert_matches::assert_matches;
use std::collections::HashMap;
use std::collections::{BTreeMap, HashMap};

use fixedbitset::FixedBitSet;
use itertools::Itertools as _;
Expand All @@ -50,7 +51,7 @@ use plan_expr_rewriter::ConstEvalRewriter;
use property::Order;
use risingwave_common::bail;
use risingwave_common::catalog::{
ColumnCatalog, ColumnDesc, ColumnId, ConflictBehavior, Field, Schema, TableId,
ColumnCatalog, ColumnDesc, ColumnId, ConflictBehavior, Field, Schema,
};
use risingwave_common::types::DataType;
use risingwave_common::util::column_index_mapping::ColIndexMapping;
Expand Down Expand Up @@ -82,6 +83,7 @@ use crate::optimizer::plan_node::{
use crate::optimizer::plan_visitor::TemporalJoinValidator;
use crate::optimizer::property::Distribution;
use crate::utils::{ColIndexMappingRewriteExt, WithOptionsSecResolved};
use crate::TableCatalog;

/// `PlanRoot` is used to describe a plan. planner will construct a `PlanRoot` with `LogicalNode`.
/// and required distribution and order. And `PlanRoot` can generate corresponding streaming or
Expand Down Expand Up @@ -939,8 +941,9 @@ impl PlanRoot {
sink_from_table_name: String,
format_desc: Option<SinkFormatDesc>,
without_backfill: bool,
target_table: Option<TableId>,
target_table: Option<Arc<TableCatalog>>,
partition_info: Option<PartitionComputeInfo>,
user_specified_columns: bool,
) -> Result<StreamSink> {
let stream_scan_type = if without_backfill {
StreamScanType::UpstreamOnly
Expand All @@ -958,12 +961,17 @@ impl PlanRoot {
self.gen_optimized_stream_plan_inner(emit_on_window_close, stream_scan_type)?;
assert_eq!(self.phase, PlanPhase::Stream);
assert_eq!(stream_plan.convention(), Convention::Stream);
let target_columns_to_plan_mapping = target_table.as_ref().map(|t| {
let columns = t.columns_without_rw_timestamp();
self.target_columns_to_plan_mapping(&columns, user_specified_columns)
});
StreamSink::create(
stream_plan,
sink_name,
db_name,
sink_from_table_name,
target_table,
target_columns_to_plan_mapping,
self.required_dist.clone(),
self.required_order.clone(),
self.out_fields.clone(),
Expand Down Expand Up @@ -993,6 +1001,39 @@ impl PlanRoot {
.config()
.streaming_use_snapshot_backfill()
}

/// used when the plan has a target relation such as DML and sink into table, return the mapping from table's columns to the plan's schema
pub fn target_columns_to_plan_mapping(
&self,
tar_cols: &[ColumnCatalog],
user_specified_columns: bool,
) -> Vec<Option<usize>> {
#[allow(clippy::disallowed_methods)]
let visible_cols: Vec<(usize, String)> = self
.out_fields
.ones()
.zip_eq(self.out_names.iter().cloned())
.collect_vec();

let visible_col_idxes = visible_cols.iter().map(|(i, _)| *i).collect_vec();
let visible_col_idxes_by_name = visible_cols
.iter()
.map(|(i, name)| (name.as_ref(), *i))
.collect::<BTreeMap<_, _>>();

tar_cols
.iter()
.enumerate()
.filter(|(_, tar_col)| tar_col.can_dml())
.map(|(tar_i, tar_col)| {
if user_specified_columns {
visible_col_idxes_by_name.get(tar_col.name()).cloned()
} else {
(tar_i < visible_col_idxes.len()).then(|| visible_cols[tar_i].0)
}
})
.collect()
}
}

fn find_version_column_index(
Expand Down
Loading

0 comments on commit f50ce55

Please sign in to comment.