
feat(sink): Support without backfill sink with 'create sink from mv'. (
xxhZs authored Jan 9, 2024
1 parent 37a666a commit d21d42f
Showing 10 changed files with 113 additions and 8 deletions.
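
For orientation before the per-file hunks: the commit wires a new 'snapshot' WITH-option from CREATE SINK down to the stream table scan. snapshot = 'false' selects StreamScanType::UpstreamOnly instead of the default StreamScanType::Backfill, so the sink skips the historical snapshot and only forwards changes that arrive after it is created. A condensed, self-contained sketch of that mapping (simplified names; the real code is in the hunks below):

#[derive(Clone, Copy, Debug, PartialEq)]
enum StreamScanType {
    Backfill,     // default: replay the historical snapshot, then tail new changes
    UpstreamOnly, // snapshot = 'false': skip the snapshot, forward new changes only
}

fn scan_type_for_sink(without_backfill: bool) -> StreamScanType {
    if without_backfill {
        StreamScanType::UpstreamOnly
    } else {
        StreamScanType::Backfill
    }
}

fn main() {
    assert_eq!(scan_type_for_sink(true), StreamScanType::UpstreamOnly);
    assert_eq!(scan_type_for_sink(false), StreamScanType::Backfill);
}
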
10 changes: 10 additions & 0 deletions ci/scripts/e2e-kafka-sink-test.sh
@@ -7,6 +7,7 @@ set -euo pipefail
./.risingwave/bin/kafka/bin/kafka-topics.sh --bootstrap-server message_queue:29092 --topic test-rw-sink-upsert --create > /dev/null 2>&1
./.risingwave/bin/kafka/bin/kafka-topics.sh --bootstrap-server message_queue:29092 --topic test-rw-sink-upsert-schema --create > /dev/null 2>&1
./.risingwave/bin/kafka/bin/kafka-topics.sh --bootstrap-server message_queue:29092 --topic test-rw-sink-debezium --create > /dev/null 2>&1
./.risingwave/bin/kafka/bin/kafka-topics.sh --bootstrap-server message_queue:29092 --topic test-rw-sink-without-snapshot --create > /dev/null 2>&1

sqllogictest -p 4566 -d dev 'e2e_test/sink/kafka/create_sink.slt'
sleep 2
@@ -93,6 +94,15 @@ else
rm e2e_test/sink/kafka/debezium2.tmp.result
fi

# test without-snapshot kafka sink
echo "testing without-snapshot kafka sink"
diff ./e2e_test/sink/kafka/without_snapshot.result \
<((./.risingwave/bin/kafka/bin/kafka-console-consumer.sh --bootstrap-server message_queue:29092 --topic test-rw-sink-without-snapshot --from-beginning --max-messages 3 | sort) 2> /dev/null)
if [ $? -ne 0 ]; then
echo "The output for append-only sink is not as expected."
exit 1
fi

# delete sink data
echo "deleting sink data"
psql -h localhost -p 4566 -d dev -U root -c "delete from t_kafka where id = 1;" > /dev/null
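
Why the consumer above expects exactly three messages: rows 1-7 already exist when si_kafka_without_snapshot is created, so with snapshot = 'false' they are never backfilled; only rows 8-10, inserted after the sink, reach the topic. A toy restatement of that visibility rule (my simplification, not RisingWave code):

// Without a snapshot, only rows written after the sink's creation epoch are emitted.
fn emitted_by_sink(row_epoch: u64, sink_creation_epoch: u64, with_snapshot: bool) -> bool {
    with_snapshot || row_epoch > sink_creation_epoch
}

fn main() {
    // Treat row ids as insertion order: rows 1-7 predate the sink, 8-10 follow it.
    let sink_creation_epoch = 7;
    let emitted = (1..=10u64)
        .filter(|&row| emitted_by_sink(row, sink_creation_epoch, false))
        .count();
    assert_eq!(emitted, 3); // matches --max-messages 3 in the script above
}
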
30 changes: 28 additions & 2 deletions e2e_test/sink/kafka/create_sink.slt
@@ -188,7 +188,33 @@ insert into t_kafka values
(4, 'ORjwy3oMNb', 6306, 7406, 24962, 21217.777, 3119.719721891862, '2023-04-14 01:12:07.993742', '\xDEADBEEF', '04:00:00.1234', '1999-12-01', '00:00:01.123456', '0001-01-01 00:00:00.123456'::timestamptz, '{}'),
(5, 'sSkKswxrYd', 22765, 9253, 9253, 22749.5, 17464.91553421121, '2023-04-14 03:57:22.324742', '\xFEEDBEEF', '0 second', '0001-01-01', '00:01:01.123456', '1970-01-01 01:01:01.123456'::timestamptz, '{}'),
(6, 'V4y71v4Gip', 4014, 10844, 28842, 5885.368, 11210.458724794062, '2023-04-13 10:42:02.137742', '\xCAFEBABE', '4 hour', '0001-01-01', '00:00:01.123456', '0001-01-01 00:00:00.123456'::timestamptz, '{}'),
(7, 'YIVLnWxHyf', 10324, 12652, 15914, 3946.7434, 10967.182297153104, '2023-04-14 04:41:03.083742', '\xBABEC0DE', '3 day', '0001-01-01', '01:00:01.123456', '0001-01-01 00:00:00.123456'::timestamptz, '{}');


statement error
create sink si_kafka_without_snapshot as select * from t_kafka with (
connector = 'kafka',
properties.bootstrap.server = 'message_queue:29092',
topic = 'test-rw-sink-without-snapshot',
type = 'append-only',
force_append_only = 'true',
primary_key = 'id',
snapshot = 'false',
);

statement ok
create sink si_kafka_without_snapshot from t_kafka with (
connector = 'kafka',
properties.bootstrap.server = 'message_queue:29092',
topic = 'test-rw-sink-without-snapshot',
type = 'append-only',
force_append_only = 'true',
primary_key = 'id',
snapshot = 'false',
);

statement ok
insert into t_kafka values
(8, 'lv7Eq3g8hx', 194, 19036, 28641, 13652.073, 993.408963466774, '2023-04-13 13:52:09.356742', '\xDEADBABE', '04:00:00.1234', '1970-01-01', '00:00:01.123456', '0001-01-01 00:00:00.123456'::timestamptz, '{}'),
(9, 'nwRq4zejSQ', 10028, 20090, 24837, 20699.559, 11615.276406159757, '2023-04-13 12:40:42.487742', '\xDEADBABE', '05:01:00.123456', '1970-01-01', '00:00:01.123456', '0001-01-01 00:00:00.123456'::timestamptz, '{}'),
(10, '0oVqRIHqkb', 26951, 20674, 20674, 19387.238, 9042.404483827515, '2023-04-13 16:40:58.888742', '\x00', '05:01:00.1234567', '1970-01-01', '00:00:01.123456', '1970-01-01 00:00:00.123456'::timestamptz, '{}');
3 changes: 3 additions & 0 deletions e2e_test/sink/kafka/drop_sink.slt
@@ -10,5 +10,8 @@ drop sink si_kafka_debezium;
statement ok
drop sink si_kafka_upsert_schema;

statement ok
drop sink si_kafka_without_snapshot;

statement ok
drop table t_kafka;
3 changes: 3 additions & 0 deletions e2e_test/sink/kafka/without_snapshot.result
@@ -0,0 +1,3 @@
{"id":10,"v_bigint":20674,"v_bytea":"AA==","v_date":719163,"v_double":9042.404483827513,"v_float":19387.23828125,"v_integer":20674,"v_interval":"P0Y0M0DT5H1M0.123457S","v_jsonb":"{}","v_smallint":26951,"v_time":1000,"v_timestamp":1681404058888,"v_timestamptz":"1970-01-01 00:00:00.123456","v_varchar":"0oVqRIHqkb"}
{"id":8,"v_bigint":28641,"v_bytea":"3q26vg==","v_date":719163,"v_double":993.408963466774,"v_float":13652.0732421875,"v_integer":19036,"v_interval":"P0Y0M0DT4H0M0.1234S","v_jsonb":"{}","v_smallint":194,"v_time":1000,"v_timestamp":1681393929356,"v_timestamptz":"0001-01-01 00:00:00.123456","v_varchar":"lv7Eq3g8hx"}
{"id":9,"v_bigint":24837,"v_bytea":"3q26vg==","v_date":719163,"v_double":11615.276406159757,"v_float":20699.55859375,"v_integer":20090,"v_interval":"P0Y0M0DT5H1M0.123456S","v_jsonb":"{}","v_smallint":10028,"v_time":1000,"v_timestamp":1681389642487,"v_timestamptz":"0001-01-01 00:00:00.123456","v_varchar":"nwRq4zejSQ"}
1 change: 1 addition & 0 deletions src/connector/src/sink/mod.rs
@@ -133,6 +133,7 @@ macro_rules! match_sink_name_str {

pub const CONNECTOR_TYPE_KEY: &str = "connector";
pub const SINK_TYPE_OPTION: &str = "type";
pub const SINK_WITHOUT_BACKFILL: &str = "snapshot";
pub const SINK_TYPE_APPEND_ONLY: &str = "append-only";
pub const SINK_TYPE_DEBEZIUM: &str = "debezium";
pub const SINK_TYPE_UPSERT: &str = "upsert";
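
Note the naming here: the internal constant is SINK_WITHOUT_BACKFILL, but the user-facing option key it holds is 'snapshot'. A minimal, runnable sketch of reading such a key from a WITH-options map (the BTreeMap is an assumption for illustration, not the frontend's actual WithOptions type):

use std::collections::BTreeMap;

const SINK_WITHOUT_BACKFILL: &str = "snapshot";

// True only when the user explicitly asked to skip backfill via snapshot = 'false'.
fn skip_backfill(with_options: &BTreeMap<String, String>) -> bool {
    with_options
        .get(SINK_WITHOUT_BACKFILL)
        .map_or(false, |v| v.eq_ignore_ascii_case("false"))
}

fn main() {
    let mut opts = BTreeMap::new();
    opts.insert("snapshot".to_owned(), "false".to_owned());
    assert!(skip_backfill(&opts));
    assert!(!skip_backfill(&BTreeMap::new()));
}
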
1 change: 1 addition & 0 deletions src/frontend/planner_test/src/lib.rs
@@ -793,6 +793,7 @@ impl TestCase {
"test_db".into(),
"test_table".into(),
format_desc,
false,
None,
) {
Ok(sink_plan) => {
22 changes: 21 additions & 1 deletion src/frontend/src/handler/create_sink.rs
@@ -28,7 +28,7 @@ use risingwave_common::util::value_encoding::DatumFromProtoExt;
use risingwave_common::{bail, catalog};
use risingwave_connector::sink::catalog::{SinkCatalog, SinkFormatDesc, SinkType};
use risingwave_connector::sink::{
CONNECTOR_TYPE_KEY, SINK_TYPE_OPTION, SINK_USER_FORCE_APPEND_ONLY_OPTION,
CONNECTOR_TYPE_KEY, SINK_TYPE_OPTION, SINK_USER_FORCE_APPEND_ONLY_OPTION, SINK_WITHOUT_BACKFILL,
};
use risingwave_pb::catalog::{PbSource, Table};
use risingwave_pb::ddl_service::ReplaceTablePlan;
@@ -105,13 +105,18 @@ pub fn gen_sink_plan(

// Used for debezium's table name
let sink_from_table_name;
// `true` means that sink statement has the form: `CREATE SINK s1 FROM ...`
// `false` means that sink statement has the form: `CREATE SINK s1 AS <query>`
let direct_sink;
let query = match stmt.sink_from {
CreateSink::From(from_name) => {
sink_from_table_name = from_name.0.last().unwrap().real_value();
direct_sink = true;
Box::new(gen_sink_query_from_name(from_name)?)
}
CreateSink::AsQuery(query) => {
sink_from_table_name = sink_table_name.clone();
direct_sink = false;
query
}
};
@@ -190,6 +195,20 @@ plan_root.set_out_names(col_names)?;
plan_root.set_out_names(col_names)?;
};

let without_backfill = match with_options.remove(SINK_WITHOUT_BACKFILL) {
Some(flag) if flag.eq_ignore_ascii_case("false") => {
if direct_sink {
true
} else {
return Err(ErrorCode::BindError(
"`snapshot = false` only support `CREATE SINK FROM MV or TABLE`".to_string(),
)
.into());
}
}
_ => false,
};

let target_table_catalog = stmt
.into_table_name
.as_ref()
@@ -206,6 +225,7 @@
db_name.to_owned(),
sink_from_table_name,
format_desc,
without_backfill,
target_table,
)?;
let sink_desc = sink_plan.sink_desc().clone();
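
The validation in gen_sink_plan distills to: snapshot = 'false' is honored only when direct_sink is true, i.e. the CREATE SINK ... FROM ... form; any other value of the option, or its absence, falls through to the default backfill behavior. A self-contained restatement with the error plumbing simplified to a String (hedged sketch, not the frontend's actual types):

fn resolve_without_backfill(flag: Option<&str>, direct_sink: bool) -> Result<bool, String> {
    match flag {
        Some(v) if v.eq_ignore_ascii_case("false") => {
            if direct_sink {
                Ok(true)
            } else {
                Err("`snapshot = false` is only supported for `CREATE SINK FROM MV or TABLE`".to_owned())
            }
        }
        // No flag, or any value other than "false": keep the default backfill.
        _ => Ok(false),
    }
}

fn main() {
    assert_eq!(resolve_without_backfill(Some("false"), true), Ok(true));
    assert!(resolve_without_backfill(Some("false"), false).is_err());
    assert_eq!(resolve_without_backfill(None, true), Ok(false));
}
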
31 changes: 27 additions & 4 deletions src/frontend/src/optimizer/mod.rs
@@ -53,6 +53,7 @@ use risingwave_common::util::column_index_mapping::ColIndexMapping;
use risingwave_common::util::iter_util::ZipEqDebug;
use risingwave_connector::sink::catalog::SinkFormatDesc;
use risingwave_pb::catalog::WatermarkDesc;
use risingwave_pb::stream_plan::StreamScanType;

use self::heuristic_optimizer::ApplyOrder;
use self::plan_node::generic::{self, PhysicalPlanRef};
@@ -319,10 +320,18 @@

/// Generate optimized stream plan
fn gen_optimized_stream_plan(&mut self, emit_on_window_close: bool) -> Result<PlanRef> {
self.gen_optimized_stream_plan_inner(emit_on_window_close, StreamScanType::Backfill)
}

fn gen_optimized_stream_plan_inner(
&mut self,
emit_on_window_close: bool,
stream_scan_type: StreamScanType,
) -> Result<PlanRef> {
let ctx = self.plan.ctx();
let _explain_trace = ctx.is_explain_trace();

let mut plan = self.gen_stream_plan(emit_on_window_close)?;
let mut plan = self.gen_stream_plan(emit_on_window_close, stream_scan_type)?;

plan = plan.optimize_by_rules(&OptimizationStage::new(
"Merge StreamProject",
@@ -370,7 +379,11 @@ }
}

/// Generate create index or create materialize view plan.
fn gen_stream_plan(&mut self, emit_on_window_close: bool) -> Result<PlanRef> {
fn gen_stream_plan(
&mut self,
emit_on_window_close: bool,
stream_scan_type: StreamScanType,
) -> Result<PlanRef> {
let ctx = self.plan.ctx();
let explain_trace = ctx.is_explain_trace();

@@ -421,7 +434,10 @@
self.out_fields = out_col_change.rewrite_bitset(&self.out_fields);
let plan = plan.to_stream_with_dist_required(
&self.required_dist,
&mut ToStreamContext::new(emit_on_window_close),
&mut ToStreamContext::new_with_stream_scan_type(
emit_on_window_close,
stream_scan_type,
),
)?;
stream_enforce_eowc_requirement(ctx.clone(), plan, emit_on_window_close)
}
@@ -717,9 +733,16 @@
db_name: String,
sink_from_table_name: String,
format_desc: Option<SinkFormatDesc>,
without_backfill: bool,
target_table: Option<TableId>,
) -> Result<StreamSink> {
let stream_plan = self.gen_optimized_stream_plan(emit_on_window_close)?;
let stream_scan_type = if without_backfill {
StreamScanType::UpstreamOnly
} else {
StreamScanType::Backfill
};
let stream_plan =
self.gen_optimized_stream_plan_inner(emit_on_window_close, stream_scan_type)?;

StreamSink::create(
stream_plan,
14 changes: 14 additions & 0 deletions src/frontend/src/optimizer/plan_node/convert.rs
@@ -16,6 +16,7 @@ use std::collections::HashMap;

use paste::paste;
use risingwave_common::catalog::FieldDisplay;
use risingwave_pb::stream_plan::StreamScanType;

use super::*;
use crate::optimizer::property::{Order, RequiredDist};
@@ -124,16 +125,29 @@ impl RewriteStreamContext {
pub struct ToStreamContext {
share_to_stream_map: HashMap<PlanNodeId, PlanRef>,
emit_on_window_close: bool,
stream_scan_type: StreamScanType,
}

impl ToStreamContext {
pub fn new(emit_on_window_close: bool) -> Self {
Self::new_with_stream_scan_type(emit_on_window_close, StreamScanType::Backfill)
}

pub fn new_with_stream_scan_type(
emit_on_window_close: bool,
stream_scan_type: StreamScanType,
) -> Self {
Self {
share_to_stream_map: HashMap::new(),
emit_on_window_close,
stream_scan_type,
}
}

pub fn stream_scan_type(&self) -> StreamScanType {
self.stream_scan_type
}

pub fn add_to_stream_result(&mut self, plan_node_id: PlanNodeId, plan_ref: PlanRef) {
self.share_to_stream_map
.try_insert(plan_node_id, plan_ref)
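
A small design note on the hunk above: instead of changing the signature of ToStreamContext::new and touching every call site, the commit layers a second constructor that takes the scan type and has new forward a default. Reduced to a standalone pattern (my simplification of the diff; the real struct also carries share_to_stream_map and emit_on_window_close):

#[derive(Clone, Copy, Debug, PartialEq)]
enum StreamScanType { Backfill, UpstreamOnly }

struct ToStreamContext {
    stream_scan_type: StreamScanType,
}

impl ToStreamContext {
    // Existing call sites keep compiling: the old constructor forwards a default.
    fn new() -> Self {
        Self::new_with_stream_scan_type(StreamScanType::Backfill)
    }
    fn new_with_stream_scan_type(stream_scan_type: StreamScanType) -> Self {
        Self { stream_scan_type }
    }
}

fn main() {
    assert_eq!(ToStreamContext::new().stream_scan_type, StreamScanType::Backfill);
    let ctx = ToStreamContext::new_with_stream_scan_type(StreamScanType::UpstreamOnly);
    assert_eq!(ctx.stream_scan_type, StreamScanType::UpstreamOnly);
}
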
6 changes: 5 additions & 1 deletion src/frontend/src/optimizer/plan_node/logical_scan.rs
@@ -533,7 +533,11 @@ impl ToStream for LogicalScan {
)
.into())
} else {
Ok(StreamTableScan::new(self.core.clone()).into())
Ok(StreamTableScan::new_with_stream_scan_type(
self.core.clone(),
ctx.stream_scan_type(),
)
.into())
}
} else {
let (scan, predicate, project_expr) = self.predicate_pull_up();
