From d21d42f9a8ab63a78f67241480c2009ec6f1f61b Mon Sep 17 00:00:00 2001 From: Xinhao Xu <84456268+xxhZs@users.noreply.github.com> Date: Tue, 9 Jan 2024 19:50:06 +0800 Subject: [PATCH] feat(sink): Support without backfill sink with 'create sink from mv'. (#13947) --- ci/scripts/e2e-kafka-sink-test.sh | 10 ++++++ e2e_test/sink/kafka/create_sink.slt | 30 ++++++++++++++++-- e2e_test/sink/kafka/drop_sink.slt | 3 ++ e2e_test/sink/kafka/without_snapshot.result | 3 ++ src/connector/src/sink/mod.rs | 1 + src/frontend/planner_test/src/lib.rs | 1 + src/frontend/src/handler/create_sink.rs | 22 ++++++++++++- src/frontend/src/optimizer/mod.rs | 31 ++++++++++++++++--- .../src/optimizer/plan_node/convert.rs | 14 +++++++++ .../src/optimizer/plan_node/logical_scan.rs | 6 +++- 10 files changed, 113 insertions(+), 8 deletions(-) create mode 100644 e2e_test/sink/kafka/without_snapshot.result diff --git a/ci/scripts/e2e-kafka-sink-test.sh b/ci/scripts/e2e-kafka-sink-test.sh index d51482a912235..1a319975f32ca 100755 --- a/ci/scripts/e2e-kafka-sink-test.sh +++ b/ci/scripts/e2e-kafka-sink-test.sh @@ -7,6 +7,7 @@ set -euo pipefail ./.risingwave/bin/kafka/bin/kafka-topics.sh --bootstrap-server message_queue:29092 --topic test-rw-sink-upsert --create > /dev/null 2>&1 ./.risingwave/bin/kafka/bin/kafka-topics.sh --bootstrap-server message_queue:29092 --topic test-rw-sink-upsert-schema --create > /dev/null 2>&1 ./.risingwave/bin/kafka/bin/kafka-topics.sh --bootstrap-server message_queue:29092 --topic test-rw-sink-debezium --create > /dev/null 2>&1 +./.risingwave/bin/kafka/bin/kafka-topics.sh --bootstrap-server message_queue:29092 --topic test-rw-sink-without-snapshot --create > /dev/null 2>&1 sqllogictest -p 4566 -d dev 'e2e_test/sink/kafka/create_sink.slt' sleep 2 @@ -93,6 +94,15 @@ else rm e2e_test/sink/kafka/debezium2.tmp.result fi +# test without-snapshot kafka sink +echo "testing without-snapshot kafka sink" +diff ./e2e_test/sink/kafka/without_snapshot.result \ 
+<((./.risingwave/bin/kafka/bin/kafka-console-consumer.sh --bootstrap-server message_queue:29092 --topic test-rw-sink-without-snapshot --from-beginning --max-messages 3 | sort) 2> /dev/null) +if [ $? -ne 0 ]; then + echo "The output for append-only sink is not as expected." + exit 1 +fi + # delete sink data echo "deleting sink data" psql -h localhost -p 4566 -d dev -U root -c "delete from t_kafka where id = 1;" > /dev/null diff --git a/e2e_test/sink/kafka/create_sink.slt b/e2e_test/sink/kafka/create_sink.slt index 750807f5cef9b..b693cd33e8d64 100644 --- a/e2e_test/sink/kafka/create_sink.slt +++ b/e2e_test/sink/kafka/create_sink.slt @@ -188,7 +188,33 @@ insert into t_kafka values (4, 'ORjwy3oMNb', 6306, 7406, 24962, 21217.777, 3119.719721891862, '2023-04-14 01:12:07.993742', '\xDEADBEEF', '04:00:00.1234', '1999-12-01', '00:00:01.123456', '0001-01-01 00:00:00.123456'::timestamptz, '{}'), (5, 'sSkKswxrYd', 22765, 9253, 9253, 22749.5, 17464.91553421121, '2023-04-14 03:57:22.324742', '\xFEEDBEEF', '0 second', '0001-01-01', '00:01:01.123456', '1970-01-01 01:01:01.123456'::timestamptz, '{}'), (6, 'V4y71v4Gip', 4014, 10844, 28842, 5885.368, 11210.458724794062, '2023-04-13 10:42:02.137742', '\xCAFEBABE', '4 hour', '0001-01-01', '00:00:01.123456', '0001-01-01 00:00:00.123456'::timestamptz, '{}'), - (7, 'YIVLnWxHyf', 10324, 12652, 15914, 3946.7434, 10967.182297153104, '2023-04-14 04:41:03.083742', '\xBABEC0DE', '3 day', '0001-01-01', '01:00:01.123456', '0001-01-01 00:00:00.123456'::timestamptz, '{}'), + (7, 'YIVLnWxHyf', 10324, 12652, 15914, 3946.7434, 10967.182297153104, '2023-04-14 04:41:03.083742', '\xBABEC0DE', '3 day', '0001-01-01', '01:00:01.123456', '0001-01-01 00:00:00.123456'::timestamptz, '{}'); + + +statement error +create sink si_kafka_without_snapshot as select * from t_kafka with ( + connector = 'kafka', + properties.bootstrap.server = 'message_queue:29092', + topic = 'test-rw-sink-without-snapshot', + type = 'append-only', + force_append_only = 'true', + 
primary_key = 'id', + snapshot = 'false', +); + +statement ok +create sink si_kafka_without_snapshot from t_kafka with ( + connector = 'kafka', + properties.bootstrap.server = 'message_queue:29092', + topic = 'test-rw-sink-without-snapshot', + type = 'append-only', + force_append_only = 'true', + primary_key = 'id', + snapshot = 'false', +); + +statement ok +insert into t_kafka values (8, 'lv7Eq3g8hx', 194, 19036, 28641, 13652.073, 993.408963466774, '2023-04-13 13:52:09.356742', '\xDEADBABE', '04:00:00.1234', '1970-01-01', '00:00:01.123456', '0001-01-01 00:00:00.123456'::timestamptz, '{}'), (9, 'nwRq4zejSQ', 10028, 20090, 24837, 20699.559, 11615.276406159757, '2023-04-13 12:40:42.487742', '\xDEADBABE', '05:01:00.123456', '1970-01-01', '00:00:01.123456', '0001-01-01 00:00:00.123456'::timestamptz, '{}'), - (10, '0oVqRIHqkb', 26951, 20674, 20674, 19387.238, 9042.404483827515, '2023-04-13 16:40:58.888742', '\x00', '05:01:00.1234567', '1970-01-01', '00:00:01.123456', '1970-01-01 00:00:00.123456'::timestamptz, '{}'); + (10, '0oVqRIHqkb', 26951, 20674, 20674, 19387.238, 9042.404483827515, '2023-04-13 16:40:58.888742', '\x00', '05:01:00.1234567', '1970-01-01', '00:00:01.123456', '1970-01-01 00:00:00.123456'::timestamptz, '{}'); \ No newline at end of file diff --git a/e2e_test/sink/kafka/drop_sink.slt b/e2e_test/sink/kafka/drop_sink.slt index 9479671b9f0f0..db599e80ec6d1 100644 --- a/e2e_test/sink/kafka/drop_sink.slt +++ b/e2e_test/sink/kafka/drop_sink.slt @@ -10,5 +10,8 @@ drop sink si_kafka_debezium; statement ok drop sink si_kafka_upsert_schema; +statement ok +drop sink si_kafka_without_snapshot; + statement ok drop table t_kafka; diff --git a/e2e_test/sink/kafka/without_snapshot.result b/e2e_test/sink/kafka/without_snapshot.result new file mode 100644 index 0000000000000..dba2aa1eb6321 --- /dev/null +++ b/e2e_test/sink/kafka/without_snapshot.result @@ -0,0 +1,3 @@ 
+{"id":10,"v_bigint":20674,"v_bytea":"AA==","v_date":719163,"v_double":9042.404483827513,"v_float":19387.23828125,"v_integer":20674,"v_interval":"P0Y0M0DT5H1M0.123457S","v_jsonb":"{}","v_smallint":26951,"v_time":1000,"v_timestamp":1681404058888,"v_timestamptz":"1970-01-01 00:00:00.123456","v_varchar":"0oVqRIHqkb"} +{"id":8,"v_bigint":28641,"v_bytea":"3q26vg==","v_date":719163,"v_double":993.408963466774,"v_float":13652.0732421875,"v_integer":19036,"v_interval":"P0Y0M0DT4H0M0.1234S","v_jsonb":"{}","v_smallint":194,"v_time":1000,"v_timestamp":1681393929356,"v_timestamptz":"0001-01-01 00:00:00.123456","v_varchar":"lv7Eq3g8hx"} +{"id":9,"v_bigint":24837,"v_bytea":"3q26vg==","v_date":719163,"v_double":11615.276406159757,"v_float":20699.55859375,"v_integer":20090,"v_interval":"P0Y0M0DT5H1M0.123456S","v_jsonb":"{}","v_smallint":10028,"v_time":1000,"v_timestamp":1681389642487,"v_timestamptz":"0001-01-01 00:00:00.123456","v_varchar":"nwRq4zejSQ"} diff --git a/src/connector/src/sink/mod.rs b/src/connector/src/sink/mod.rs index e5f9df155c735..a9baaad178ea4 100644 --- a/src/connector/src/sink/mod.rs +++ b/src/connector/src/sink/mod.rs @@ -133,6 +133,7 @@ macro_rules! 
match_sink_name_str { pub const CONNECTOR_TYPE_KEY: &str = "connector"; pub const SINK_TYPE_OPTION: &str = "type"; +pub const SINK_WITHOUT_BACKFILL: &str = "snapshot"; pub const SINK_TYPE_APPEND_ONLY: &str = "append-only"; pub const SINK_TYPE_DEBEZIUM: &str = "debezium"; pub const SINK_TYPE_UPSERT: &str = "upsert"; diff --git a/src/frontend/planner_test/src/lib.rs b/src/frontend/planner_test/src/lib.rs index 9e977a3c23ea7..dfcd32c9f4c78 100644 --- a/src/frontend/planner_test/src/lib.rs +++ b/src/frontend/planner_test/src/lib.rs @@ -793,6 +793,7 @@ impl TestCase { "test_db".into(), "test_table".into(), format_desc, + false, None, ) { Ok(sink_plan) => { diff --git a/src/frontend/src/handler/create_sink.rs b/src/frontend/src/handler/create_sink.rs index 9a42f98b6428d..d150485bb3cf8 100644 --- a/src/frontend/src/handler/create_sink.rs +++ b/src/frontend/src/handler/create_sink.rs @@ -28,7 +28,7 @@ use risingwave_common::util::value_encoding::DatumFromProtoExt; use risingwave_common::{bail, catalog}; use risingwave_connector::sink::catalog::{SinkCatalog, SinkFormatDesc, SinkType}; use risingwave_connector::sink::{ - CONNECTOR_TYPE_KEY, SINK_TYPE_OPTION, SINK_USER_FORCE_APPEND_ONLY_OPTION, + CONNECTOR_TYPE_KEY, SINK_TYPE_OPTION, SINK_USER_FORCE_APPEND_ONLY_OPTION, SINK_WITHOUT_BACKFILL, }; use risingwave_pb::catalog::{PbSource, Table}; use risingwave_pb::ddl_service::ReplaceTablePlan; @@ -105,13 +105,18 @@ pub fn gen_sink_plan( // Used for debezium's table name let sink_from_table_name; + // `true` means that sink statement has the form: `CREATE SINK s1 FROM ...` + // `false` means that sink statement has the form: `CREATE SINK s1 AS <query>` + let direct_sink; let query = match stmt.sink_from { CreateSink::From(from_name) => { sink_from_table_name = from_name.0.last().unwrap().real_value(); + direct_sink = true; Box::new(gen_sink_query_from_name(from_name)?) 
} CreateSink::AsQuery(query) => { sink_from_table_name = sink_table_name.clone(); + direct_sink = false; query } }; @@ -190,6 +195,20 @@ pub fn gen_sink_plan( plan_root.set_out_names(col_names)?; }; + let without_backfill = match with_options.remove(SINK_WITHOUT_BACKFILL) { + Some(flag) if flag.eq_ignore_ascii_case("false") => { + if direct_sink { + true + } else { + return Err(ErrorCode::BindError( + "`snapshot = false` only support `CREATE SINK FROM MV or TABLE`".to_string(), + ) + .into()); + } + } + _ => false, + }; + let target_table_catalog = stmt .into_table_name .as_ref() @@ -206,6 +225,7 @@ pub fn gen_sink_plan( db_name.to_owned(), sink_from_table_name, format_desc, + without_backfill, target_table, )?; let sink_desc = sink_plan.sink_desc().clone(); diff --git a/src/frontend/src/optimizer/mod.rs b/src/frontend/src/optimizer/mod.rs index a059b46e6488a..7988984bedb3d 100644 --- a/src/frontend/src/optimizer/mod.rs +++ b/src/frontend/src/optimizer/mod.rs @@ -53,6 +53,7 @@ use risingwave_common::util::column_index_mapping::ColIndexMapping; use risingwave_common::util::iter_util::ZipEqDebug; use risingwave_connector::sink::catalog::SinkFormatDesc; use risingwave_pb::catalog::WatermarkDesc; +use risingwave_pb::stream_plan::StreamScanType; use self::heuristic_optimizer::ApplyOrder; use self::plan_node::generic::{self, PhysicalPlanRef}; @@ -319,10 +320,18 @@ impl PlanRoot { /// Generate optimized stream plan fn gen_optimized_stream_plan(&mut self, emit_on_window_close: bool) -> Result { + self.gen_optimized_stream_plan_inner(emit_on_window_close, StreamScanType::Backfill) + } + + fn gen_optimized_stream_plan_inner( + &mut self, + emit_on_window_close: bool, + stream_scan_type: StreamScanType, + ) -> Result { let ctx = self.plan.ctx(); let _explain_trace = ctx.is_explain_trace(); - let mut plan = self.gen_stream_plan(emit_on_window_close)?; + let mut plan = self.gen_stream_plan(emit_on_window_close, stream_scan_type)?; plan = 
plan.optimize_by_rules(&OptimizationStage::new( "Merge StreamProject", @@ -370,7 +379,11 @@ impl PlanRoot { } /// Generate create index or create materialize view plan. - fn gen_stream_plan(&mut self, emit_on_window_close: bool) -> Result { + fn gen_stream_plan( + &mut self, + emit_on_window_close: bool, + stream_scan_type: StreamScanType, + ) -> Result { let ctx = self.plan.ctx(); let explain_trace = ctx.is_explain_trace(); @@ -421,7 +434,10 @@ impl PlanRoot { self.out_fields = out_col_change.rewrite_bitset(&self.out_fields); let plan = plan.to_stream_with_dist_required( &self.required_dist, - &mut ToStreamContext::new(emit_on_window_close), + &mut ToStreamContext::new_with_stream_scan_type( + emit_on_window_close, + stream_scan_type, + ), )?; stream_enforce_eowc_requirement(ctx.clone(), plan, emit_on_window_close) } @@ -717,9 +733,16 @@ impl PlanRoot { db_name: String, sink_from_table_name: String, format_desc: Option, + without_backfill: bool, target_table: Option, ) -> Result { - let stream_plan = self.gen_optimized_stream_plan(emit_on_window_close)?; + let stream_scan_type = if without_backfill { + StreamScanType::UpstreamOnly + } else { + StreamScanType::Backfill + }; + let stream_plan = + self.gen_optimized_stream_plan_inner(emit_on_window_close, stream_scan_type)?; StreamSink::create( stream_plan, diff --git a/src/frontend/src/optimizer/plan_node/convert.rs b/src/frontend/src/optimizer/plan_node/convert.rs index cb0005e602a59..a39ed7d3b048d 100644 --- a/src/frontend/src/optimizer/plan_node/convert.rs +++ b/src/frontend/src/optimizer/plan_node/convert.rs @@ -16,6 +16,7 @@ use std::collections::HashMap; use paste::paste; use risingwave_common::catalog::FieldDisplay; +use risingwave_pb::stream_plan::StreamScanType; use super::*; use crate::optimizer::property::{Order, RequiredDist}; @@ -124,16 +125,29 @@ impl RewriteStreamContext { pub struct ToStreamContext { share_to_stream_map: HashMap, emit_on_window_close: bool, + stream_scan_type: StreamScanType, } impl 
ToStreamContext { pub fn new(emit_on_window_close: bool) -> Self { + Self::new_with_stream_scan_type(emit_on_window_close, StreamScanType::Backfill) + } + + pub fn new_with_stream_scan_type( + emit_on_window_close: bool, + stream_scan_type: StreamScanType, + ) -> Self { Self { share_to_stream_map: HashMap::new(), emit_on_window_close, + stream_scan_type, } } + pub fn stream_scan_type(&self) -> StreamScanType { + self.stream_scan_type + } + pub fn add_to_stream_result(&mut self, plan_node_id: PlanNodeId, plan_ref: PlanRef) { self.share_to_stream_map .try_insert(plan_node_id, plan_ref) diff --git a/src/frontend/src/optimizer/plan_node/logical_scan.rs b/src/frontend/src/optimizer/plan_node/logical_scan.rs index 17b96836f47d7..b179670254024 100644 --- a/src/frontend/src/optimizer/plan_node/logical_scan.rs +++ b/src/frontend/src/optimizer/plan_node/logical_scan.rs @@ -533,7 +533,11 @@ impl ToStream for LogicalScan { ) .into()) } else { - Ok(StreamTableScan::new(self.core.clone()).into()) + Ok(StreamTableScan::new_with_stream_scan_type( + self.core.clone(), + ctx.stream_scan_type(), + ) + .into()) } } else { let (scan, predicate, project_expr) = self.predicate_pull_up();