fix: when target is table, the sink downstream pk is not set #19515

Merged · 9 commits · Nov 22, 2024
6 changes: 3 additions & 3 deletions e2e_test/sink/sink_into_table/parallelism.slt
@@ -8,16 +8,16 @@ statement ok
SET STREAMING_PARALLELISM TO 2;

statement ok
create table t_simple (v1 int, v2 int);
create table t_simple (v1 int, v2 int) append only;

statement ok
create table m_simple (v1 int primary key, v2 int);
create table m_simple (v1 int, v2 int) append only;

statement ok
SET STREAMING_PARALLELISM TO 3;

statement ok
create sink s_simple_1 into m_simple as select v1, v2 from t_simple;
create sink s_simple_1 into m_simple as select v1, v2 from t_simple with (type = 'append-only');
Contributor Author commented:

Changed the query because, if there is a primary key, the original sink plan would be:

dev=> explain create sink s_simple_1 into m_simple as select v1, v2 from t_simple;
                                                QUERY PLAN                                                
----------------------------------------------------------------------------------------------------------
 StreamProject { exprs: [t_simple.v1, t_simple.v2] }
 └─StreamSink { type: upsert, columns: [v1, v2, t_simple._row_id(hidden)], downstream_pk: [t_simple.v1] }
   └─StreamExchange { dist: HashShard(t_simple.v1) }
     └─StreamTableScan { table: t_simple, columns: [v1, v2, _row_id] }
(4 rows)


query I
select distinct parallelism from rw_fragment_parallelism where name in ('t_simple', 'm_simple', 's_simple_1');
1 change: 1 addition & 0 deletions src/frontend/planner_test/src/lib.rs
@@ -850,6 +850,7 @@ impl TestCase {
false,
None,
None,
false,
) {
Ok(sink_plan) => {
ret.sink_plan = Some(explain_plan(&sink_plan.into()));
@@ -9,3 +9,15 @@
explain create sink ss into t from s with (type = 'append-only');
expected_outputs:
- explain_output
- sql: |
create table t1 (a int primary key, b int);
create table t2 (a int, b int primary key);
explain create sink s into t1 from t2;
expected_outputs:
- explain_output
- sql: |
create table t1 (a int primary key, b int);
create table t2 (a int, b int primary key);
explain create sink s into t1 as select b from t2;
expected_outputs:
- explain_output
@@ -206,7 +206,7 @@
emit on window close
WITH (connector = 'blackhole');
explain_output: |
StreamSink { type: upsert, columns: [tm, foo, bar, t._row_id(hidden), lag, max, sum], pk: [t._row_id, t.bar] }
StreamSink { type: upsert, columns: [tm, foo, bar, t._row_id(hidden), lag, max, sum], downstream_pk: [] }
└─StreamEowcOverWindow { window_functions: [first_value(t.foo) OVER(PARTITION BY t.bar ORDER BY t.tm ASC ROWS BETWEEN 2 PRECEDING AND 2 PRECEDING), max(t.foo) OVER(PARTITION BY t.bar ORDER BY t.tm ASC ROWS BETWEEN 1 PRECEDING AND 1 FOLLOWING), sum(t.foo) OVER(PARTITION BY t.bar ORDER BY t.tm ASC ROWS BETWEEN 2 PRECEDING AND CURRENT ROW EXCLUDE CURRENT ROW)] }
└─StreamEowcSort { sort_column: t.tm }
└─StreamExchange { dist: HashShard(t.bar) }
18 changes: 9 additions & 9 deletions src/frontend/planner_test/tests/testdata/output/sink.yaml
@@ -9,7 +9,7 @@
table.name='t1sink',
type='upsert');
explain_output: |
StreamSink { type: upsert, columns: [v1, v2, v3, v5, t1.v4(hidden)], pk: [t1.v3, t1.v4] }
StreamSink { type: upsert, columns: [v1, v2, v3, v5, t1.v4(hidden)], downstream_pk: [t1.v1, t1.v2] }
└─StreamExchange { dist: HashShard(t1.v1, t1.v2) }
└─StreamTableScan { table: t1, columns: [v1, v2, v3, v5, v4] }
- id: create_upsert_jdbc_sink_with_downstream_pk2
@@ -22,7 +22,7 @@
table.name='t1sink',
type='upsert');
explain_output: |
StreamSink { type: upsert, columns: [v1, v2, v3, v5], pk: [t1.v1, t1.v2] }
StreamSink { type: upsert, columns: [v1, v2, v3, v5], downstream_pk: [t1.v3, t1.v5] }
└─StreamExchange { dist: HashShard(t1.v3, t1.v5) }
└─StreamTableScan { table: t1, columns: [v1, v2, v3, v5] }
- id: create_upsert_jdbc_sink_with_downstream_pk1
@@ -36,7 +36,7 @@
type='upsert');
explain_output: |+
Fragment 0
StreamSink { type: upsert, columns: [v1, v2, v3, v5, t1.v4(hidden)], pk: [t1.v3, t1.v4] }
StreamSink { type: upsert, columns: [v1, v2, v3, v5, t1.v4(hidden)], downstream_pk: [t1.v1, t1.v2] }
├── tables: [ Sink: 0 ]
├── output: [ t1.v1, t1.v2, t1.v3, t1.v5, t1.v4 ]
├── stream key: [ t1.v3, t1.v4 ]
@@ -89,7 +89,7 @@
type='upsert');
explain_output: |+
Fragment 0
StreamSink { type: upsert, columns: [v1, v2, v3, v5], pk: [t1.v1, t1.v2] }
StreamSink { type: upsert, columns: [v1, v2, v3, v5], downstream_pk: [t1.v3, t1.v5] }
├── tables: [ Sink: 0 ]
├── output: [ t1.v1, t1.v2, t1.v3, t1.v5 ]
├── stream key: [ t1.v1, t1.v2 ]
@@ -152,7 +152,7 @@
primary_key='v1,v2'
);
explain_output: |
StreamSink { type: upsert, columns: [v1, v2, v3, v5, t1.v4(hidden)], pk: [t1.v3, t1.v4] }
StreamSink { type: upsert, columns: [v1, v2, v3, v5, t1.v4(hidden)], downstream_pk: [t1.v1, t1.v2] }
└─StreamExchange { dist: HashShard(t1.v1, t1.v2) }
└─StreamTableScan { table: t1, columns: [v1, v2, v3, v5, v4] }
- id: downstream_pk_same_with_upstream
@@ -165,7 +165,7 @@
primary_key='v2,v1'
);
explain_output: |
StreamSink { type: upsert, columns: [v2, v1, count], pk: [t1.v1, t1.v2] }
StreamSink { type: upsert, columns: [v2, v1, count], downstream_pk: [t1.v2, t1.v1] }
└─StreamProject { exprs: [t1.v2, t1.v1, count] }
└─StreamHashAgg { group_key: [t1.v1, t1.v2], aggs: [count] }
└─StreamExchange { dist: HashShard(t1.v1, t1.v2) }
@@ -175,7 +175,7 @@
create table t2 (a int, b int, watermark for b as b - 4) append only;
explain create sink sk1 from t2 emit on window close with (connector='blackhole');
explain_output: |
StreamSink { type: upsert, columns: [a, b, t2._row_id(hidden)], pk: [t2._row_id] }
StreamSink { type: upsert, columns: [a, b, t2._row_id(hidden)], downstream_pk: [] }
└─StreamEowcSort { sort_column: t2.b }
└─StreamTableScan { table: t2, columns: [a, b, _row_id] }
- id: create_mock_iceberg_sink_append_only_with_sparse_partition
@@ -238,7 +238,7 @@
primary_key = 'v1'
);
explain_output: |
StreamSink { type: upsert, columns: [v1, v2, v3, v4, t1._row_id(hidden)], pk: [t1._row_id] }
StreamSink { type: upsert, columns: [v1, v2, v3, v4, t1._row_id(hidden)], downstream_pk: [t1.v1] }
└─StreamExchange { dist: HashShard($expr1) }
└─StreamProject { exprs: [t1.v1, t1.v2, t1.v3, t1.v4, t1._row_id, Row(t1.v1, IcebergTransform('bucket[1]':Varchar, t1.v2), IcebergTransform('truncate[1]':Varchar, t1.v3), null:Int32) as $expr1] }
└─StreamTableScan { table: t1, columns: [v1, v2, v3, v4, _row_id] }
@@ -260,5 +260,5 @@
primary_key = 'v1'
);
explain_output: |
StreamSink { type: upsert, columns: [v1, v2, v3, v4, t1._row_id(hidden)], pk: [t1._row_id] }
StreamSink { type: upsert, columns: [v1, v2, v3, v4, t1._row_id(hidden)], downstream_pk: [t1.v1] }
└─StreamTableScan { table: t1, columns: [v1, v2, v3, v4, _row_id] }
@@ -12,3 +12,20 @@
StreamProject { exprs: [s.x, Proctime as $expr1, (Proctime - '00:01:00':Interval) as $expr2, null:Serial], output_watermarks: [$expr1, $expr2] }
└─StreamSink { type: append-only, columns: [x, s._row_id(hidden)] }
└─StreamTableScan { table: s, columns: [x, _row_id] }
- sql: |
create table t1 (a int primary key, b int);
create table t2 (a int, b int primary key);
explain create sink s into t1 from t2;
explain_output: |
StreamProject { exprs: [t2.a, t2.b] }
└─StreamSink { type: upsert, columns: [a, b], downstream_pk: [t2.a] }
└─StreamExchange { dist: HashShard(t2.a) }
└─StreamTableScan { table: t2, columns: [a, b] }
- sql: |
create table t1 (a int primary key, b int);
create table t2 (a int, b int primary key);
explain create sink s into t1 as select b from t2;
explain_output: |
StreamProject { exprs: [t2.b, null:Int32] }
└─StreamSink { type: upsert, columns: [b], downstream_pk: [t2.b] }
└─StreamTableScan { table: t2, columns: [b] }
23 changes: 3 additions & 20 deletions src/frontend/src/handler/create_sink.rs
@@ -26,7 +26,7 @@ use risingwave_common::bail;
use risingwave_common::catalog::{ColumnCatalog, DatabaseId, Schema, SchemaId, UserId};
use risingwave_common::secret::LocalSecretManager;
use risingwave_common::types::DataType;
use risingwave_connector::sink::catalog::{SinkCatalog, SinkFormatDesc, SinkType};
use risingwave_connector::sink::catalog::{SinkCatalog, SinkFormatDesc};
use risingwave_connector::sink::iceberg::{IcebergConfig, ICEBERG_SINK};
use risingwave_connector::sink::kafka::KAFKA_SINK;
use risingwave_connector::sink::{
@@ -224,8 +224,6 @@ pub async fn gen_sink_plan(
}
}

let target_table = target_table_catalog.as_ref().map(|catalog| catalog.id());

let sink_plan = plan_root.gen_sink_plan(
sink_table_name,
definition,
Expand All @@ -235,8 +233,9 @@ pub async fn gen_sink_plan(
sink_from_table_name,
format_desc,
without_backfill,
target_table,
target_table_catalog.clone(),
partition_info,
user_specified_columns,
)?;

let sink_desc = sink_plan.sink_desc().clone();
@@ -268,22 +267,6 @@
}
}

let user_defined_primary_key_table = table_catalog.row_id_index.is_none();
let sink_is_append_only = sink_catalog.sink_type == SinkType::AppendOnly
|| sink_catalog.sink_type == SinkType::ForceAppendOnly;

if !user_defined_primary_key_table && !sink_is_append_only {
return Err(RwError::from(ErrorCode::BindError(
"Only append-only sinks can sink to a table without primary keys. please try to add type = 'append-only' in the with option. e.g. create sink s into t as select * from t1 with (type = 'append-only')".to_string(),
)));
}

if table_catalog.append_only && !sink_is_append_only {
return Err(RwError::from(ErrorCode::BindError(
"Only append-only sinks can sink to a append only table. please try to add type = 'append-only' in the with option. e.g. create sink s into t as select * from t1 with (type = 'append-only')".to_string(),
)));
}

let table_columns_without_rw_timestamp = table_catalog.columns_without_rw_timestamp();
let exprs = derive_default_column_project_for_sink(
&sink_catalog,
47 changes: 44 additions & 3 deletions src/frontend/src/optimizer/mod.rs
@@ -13,6 +13,7 @@
// limitations under the License.
use std::num::NonZeroU32;
use std::ops::DerefMut;
use std::sync::Arc;

pub mod plan_node;

@@ -41,7 +42,7 @@ mod plan_expr_visitor;
mod rule;

use std::assert_matches::assert_matches;
use std::collections::HashMap;
use std::collections::{BTreeMap, HashMap};

use fixedbitset::FixedBitSet;
use itertools::Itertools as _;
@@ -51,7 +52,7 @@ use plan_expr_rewriter::ConstEvalRewriter;
use property::Order;
use risingwave_common::bail;
use risingwave_common::catalog::{
ColumnCatalog, ColumnDesc, ColumnId, ConflictBehavior, Field, Schema, TableId,
ColumnCatalog, ColumnDesc, ColumnId, ConflictBehavior, Field, Schema,
};
use risingwave_common::types::DataType;
use risingwave_common::util::column_index_mapping::ColIndexMapping;
@@ -83,6 +84,7 @@ use crate::optimizer::plan_node::{
use crate::optimizer::plan_visitor::{RwTimestampValidator, TemporalJoinValidator};
use crate::optimizer::property::Distribution;
use crate::utils::{ColIndexMappingRewriteExt, WithOptionsSecResolved};
use crate::TableCatalog;

/// `PlanRoot` is used to describe a plan. planner will construct a `PlanRoot` with `LogicalNode`.
/// and required distribution and order. And `PlanRoot` can generate corresponding streaming or
@@ -958,8 +960,9 @@ impl PlanRoot {
sink_from_table_name: String,
format_desc: Option<SinkFormatDesc>,
without_backfill: bool,
target_table: Option<TableId>,
target_table: Option<Arc<TableCatalog>>,
partition_info: Option<PartitionComputeInfo>,
user_specified_columns: bool,
) -> Result<StreamSink> {
let stream_scan_type = if without_backfill {
StreamScanType::UpstreamOnly
@@ -977,12 +980,17 @@
self.gen_optimized_stream_plan_inner(emit_on_window_close, stream_scan_type)?;
assert_eq!(self.phase, PlanPhase::Stream);
assert_eq!(stream_plan.convention(), Convention::Stream);
let target_columns_to_plan_mapping = target_table.as_ref().map(|t| {
let columns = t.columns_without_rw_timestamp();
self.target_columns_to_plan_mapping(&columns, user_specified_columns)
});
StreamSink::create(
stream_plan,
sink_name,
db_name,
sink_from_table_name,
target_table,
target_columns_to_plan_mapping,
self.required_dist.clone(),
self.required_order.clone(),
self.out_fields.clone(),
@@ -1012,6 +1020,39 @@
.config()
.streaming_use_snapshot_backfill()
}

/// Used when the plan has a target relation, such as DML or sink-into-table; returns the mapping from the target table's columns to the plan's schema.
pub fn target_columns_to_plan_mapping(
&self,
tar_cols: &[ColumnCatalog],
user_specified_columns: bool,
) -> Vec<Option<usize>> {
#[allow(clippy::disallowed_methods)]
let visible_cols: Vec<(usize, String)> = self
.out_fields
.ones()
.zip_eq(self.out_names.iter().cloned())
.collect_vec();

let visible_col_idxes = visible_cols.iter().map(|(i, _)| *i).collect_vec();
let visible_col_idxes_by_name = visible_cols
.iter()
.map(|(i, name)| (name.as_ref(), *i))
.collect::<BTreeMap<_, _>>();

tar_cols
.iter()
.enumerate()
.filter(|(_, tar_col)| tar_col.can_dml())
.map(|(tar_i, tar_col)| {
if user_specified_columns {
visible_col_idxes_by_name.get(tar_col.name()).cloned()
} else {
(tar_i < visible_col_idxes.len()).then(|| visible_cols[tar_i].0)
}
})
.collect()
}
}

fn find_version_column_index(
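
For reference, the column-mapping rule that the new PlanRoot::target_columns_to_plan_mapping implements can be illustrated in isolation. The sketch below is a simplified, hypothetical re-implementation rather than the code from this PR: plain tuples stand in for ColumnCatalog and the plan's visible output fields. Target-table columns are matched to plan columns by name when the user wrote an explicit sink column list, and positionally otherwise; the positional path is what makes t1's primary-key column resolve to the plan's t2.b in the new planner test, so the sink gets downstream_pk: [t2.b] and the remaining target column is padded with null:Int32 by the projection.

use std::collections::BTreeMap;

// Simplified stand-in for the mapping computed by
// `PlanRoot::target_columns_to_plan_mapping`:
// - `target_cols`: the target table's columns as (name, can_dml) pairs,
// - `visible_plan_cols`: the sink plan's visible output columns as
//   (index in the plan schema, output name) pairs,
// - `user_specified_columns`: whether the user wrote an explicit column list.
fn map_target_columns(
    target_cols: &[(&str, bool)],
    visible_plan_cols: &[(usize, &str)],
    user_specified_columns: bool,
) -> Vec<Option<usize>> {
    let by_name: BTreeMap<&str, usize> = visible_plan_cols
        .iter()
        .map(|(i, name)| (*name, *i))
        .collect();

    target_cols
        .iter()
        .enumerate()
        .filter(|(_, col)| col.1) // only columns that can be written via DML
        .map(|(pos, col)| {
            if user_specified_columns {
                // Explicit column list: match target columns by name.
                by_name.get(col.0).copied()
            } else {
                // No column list: assign the plan's visible columns positionally.
                visible_plan_cols.get(pos).map(|(i, _)| *i)
            }
        })
        .collect()
}

fn main() {
    // Mirrors the new planner test `create sink s into t1 as select b from t2`
    // with t1 (a int primary key, b int): the single plan column `b` feeds `a`
    // positionally, `b` stays unmapped (filled with NULL by the default-column
    // projection), and t1's pk column therefore resolves to the plan's t2.b,
    // which is how the sink ends up with downstream_pk: [t2.b].
    let mapping = map_target_columns(&[("a", true), ("b", true)], &[(0, "b")], false);
    assert_eq!(mapping, vec![Some(0), None]);
}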