Merge branch 'release-1.6' into auto-release-1.6-20ac00a6dadc7309d87f2787247b72739e06087d
lmatz authored Feb 1, 2024
2 parents 7c390da + 8569104 commit da9408b
Showing 18 changed files with 149 additions and 68 deletions.
4 changes: 4 additions & 0 deletions e2e_test/source/cdc/cdc.check_new_rows.slt
@@ -74,3 +74,7 @@ SELECT * FROM rw.products_test order by id limit 3
102 RW 12V car battery
103 RW 12-pack of drill bits with sizes ranging from #40 to #3

query TTTT
select order_date,customer_name,product_id,order_status from orders_no_backfill order by order_id;
----
2022-12-01 15:08:22 Sam 110 0
23 changes: 23 additions & 0 deletions e2e_test/source/cdc/cdc.share_stream.slt
@@ -105,8 +105,31 @@ create materialized view orders_test_cnt as select count(*) as cnt from orders_t
system ok
mysql --protocol=tcp -u root mytest -e "INSERT INTO products VALUES(default, 'Juice', '100ml Juice');"

system ok
mysql --protocol=tcp -u root mytest -e "FLUSH LOGS"

# Should not contain records inserted before the table is created (e.g. 'Bob' inserted above)
statement ok
create table orders_no_backfill (
order_id int,
order_date timestamp,
customer_name string,
price decimal,
product_id int,
order_status smallint,
PRIMARY KEY (order_id)
) with (
snapshot = 'false'
) from mysql_mytest table 'mytest.orders';

sleep 5s

# table without backfill should not contain historical records
query I
select count(*) from orders_no_backfill
----
0

# check ingestion results
query I
SELECT * from products_test_cnt
1 change: 0 additions & 1 deletion e2e_test/source/cdc/mysql_init_data.sql
@@ -1,4 +1,3 @@
-- USE `my@db`;

INSERT INTO products
VALUES (default,"scooter","Small 2-wheel scooter"),
5 changes: 4 additions & 1 deletion proto/stream_plan.proto
@@ -553,14 +553,17 @@ message StreamCdcScanNode {
// Strips the primary key columns if they're unnecessary.
repeated uint32 output_indices = 3;

/// The state table used by Backfill operator for persisting internal state
// The state table used by CdcBackfill operator for persisting internal state
catalog.Table state_table = 4;

// The external table that will be backfilled for CDC.
plan_common.ExternalTableDesc cdc_table_desc = 5;

// The rate limit for the stream cdc scan node.
optional uint32 rate_limit = 6;

// Whether to skip the backfill and only consume from upstream.
bool disable_backfill = 7;
}

// BatchPlanNode is used for mv on mv snapshot read.
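The executor that consumes this new `disable_backfill` flag is not shown in this commit view. The sketch below is a hypothetical illustration (not part of the commit) of how a backfill operator might branch on such a flag: skip the snapshot phase entirely and forward only upstream changelog events. All type and method names here are invented stand-ins, not RisingWave's actual executor API.

// Hypothetical sketch: skip snapshot read when `disable_backfill` is set.
enum UpstreamEvent {
    Chunk(Vec<String>),
    Barrier,
}

struct CdcScanSketch {
    disable_backfill: bool,
}

impl CdcScanSketch {
    fn run(&self, upstream: impl IntoIterator<Item = UpstreamEvent>) {
        if !self.disable_backfill {
            // Default path: read a consistent snapshot of the external table first,
            // then switch over to the changelog at the recorded offset.
            for row in self.snapshot_read() {
                self.forward(&row);
            }
        }
        // With `disable_backfill == true`, only changes that arrive after the
        // table was created are consumed.
        for event in upstream {
            if let UpstreamEvent::Chunk(rows) = event {
                for row in rows {
                    self.forward(&row);
                }
            }
        }
    }

    fn snapshot_read(&self) -> Vec<String> {
        vec!["historical row".to_string()]
    }

    fn forward(&self, row: &str) {
        println!("emit: {row}");
    }
}

fn main() {
    let scan = CdcScanSketch { disable_backfill: true };
    scan.run([
        UpstreamEvent::Chunk(vec!["new row".to_string()]),
        UpstreamEvent::Barrier,
    ]);
}
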
1 change: 1 addition & 0 deletions src/compute/tests/cdc_tests.rs
@@ -239,6 +239,7 @@ async fn test_cdc_backfill() -> StreamResult<()> {
Arc::new(StreamingMetrics::unused()),
state_table,
4, // 4 rows in a snapshot chunk
false,
);

// Create a `MaterializeExecutor` to write the changes to storage.
14 changes: 6 additions & 8 deletions src/connector/src/source/cdc/external/mock_external_table.rs
@@ -21,7 +21,8 @@ use risingwave_common::types::ScalarImpl;

use crate::error::ConnectorError;
use crate::source::cdc::external::{
CdcOffset, ConnectorResult, ExternalTableReader, MySqlOffset, SchemaTableName,
CdcOffset, CdcOffsetParseFunc, ConnectorResult, ExternalTableReader, MySqlOffset,
SchemaTableName,
};

#[derive(Debug)]
@@ -38,6 +39,10 @@ impl MockExternalTableReader {
}
}

pub fn get_cdc_offset_parser() -> CdcOffsetParseFunc {
Box::new(move |_| Ok(CdcOffset::MySql(MySqlOffset::default())))
}

#[try_stream(boxed, ok = OwnedRow, error = ConnectorError)]
async fn snapshot_read_inner(&self) {
let snap_idx = self
@@ -102,13 +107,6 @@ impl ExternalTableReader for MockExternalTableReader {
}
}

fn parse_cdc_offset(&self, offset: &str) -> ConnectorResult<CdcOffset> {
// same as mysql offset
Ok(CdcOffset::MySql(MySqlOffset::parse_debezium_offset(
offset,
)?))
}

fn snapshot_read(
&self,
_table_name: SchemaTableName,
40 changes: 24 additions & 16 deletions src/connector/src/source/cdc/external/mod.rs
@@ -208,13 +208,13 @@ impl MySqlOffset {
}
}

pub type CdcOffsetParseFunc = Box<dyn Fn(&str) -> ConnectorResult<CdcOffset> + Send>;

pub trait ExternalTableReader {
fn get_normalized_table_name(&self, table_name: &SchemaTableName) -> String;

async fn current_cdc_offset(&self) -> ConnectorResult<CdcOffset>;

fn parse_cdc_offset(&self, dbz_offset: &str) -> ConnectorResult<CdcOffset>;

fn snapshot_read(
&self,
table_name: SchemaTableName,
@@ -276,12 +276,6 @@ impl ExternalTableReader for MySqlExternalTableReader {
}))
}

fn parse_cdc_offset(&self, offset: &str) -> ConnectorResult<CdcOffset> {
Ok(CdcOffset::MySql(MySqlOffset::parse_debezium_offset(
offset,
)?))
}

fn snapshot_read(
&self,
table_name: SchemaTableName,
@@ -328,6 +322,14 @@ impl MySqlExternalTableReader {
})
}

pub fn get_cdc_offset_parser() -> CdcOffsetParseFunc {
Box::new(move |offset| {
Ok(CdcOffset::MySql(MySqlOffset::parse_debezium_offset(
offset,
)?))
})
}

#[try_stream(boxed, ok = OwnedRow, error = ConnectorError)]
async fn snapshot_read_inner(
&self,
@@ -502,14 +504,6 @@ impl ExternalTableReader for ExternalTableReaderImpl {
}
}

fn parse_cdc_offset(&self, offset: &str) -> ConnectorResult<CdcOffset> {
match self {
ExternalTableReaderImpl::MySql(mysql) => mysql.parse_cdc_offset(offset),
ExternalTableReaderImpl::Postgres(postgres) => postgres.parse_cdc_offset(offset),
ExternalTableReaderImpl::Mock(mock) => mock.parse_cdc_offset(offset),
}
}

fn snapshot_read(
&self,
table_name: SchemaTableName,
@@ -521,6 +515,16 @@
}

impl ExternalTableReaderImpl {
pub fn get_cdc_offset_parser(&self) -> CdcOffsetParseFunc {
match self {
ExternalTableReaderImpl::MySql(_) => MySqlExternalTableReader::get_cdc_offset_parser(),
ExternalTableReaderImpl::Postgres(_) => {
PostgresExternalTableReader::get_cdc_offset_parser()
}
ExternalTableReaderImpl::Mock(_) => MockExternalTableReader::get_cdc_offset_parser(),
}
}

#[try_stream(boxed, ok = OwnedRow, error = ConnectorError)]
async fn snapshot_read_inner(
&self,
@@ -623,6 +627,10 @@ mod tests {
let offset = reader.current_cdc_offset().await.unwrap();
println!("BinlogOffset: {:?}", offset);

let off0_str = r#"{ "sourcePartition": { "server": "test" }, "sourceOffset": { "ts_sec": 1670876905, "file": "binlog.000001", "pos": 105622, "snapshot": true }, "isHeartbeat": false }"#;
let parser = MySqlExternalTableReader::get_cdc_offset_parser();
println!("parsed offset: {:?}", parser(off0_str).unwrap());

let table_name = SchemaTableName {
schema_name: "mytest".to_string(),
table_name: "t1".to_string(),
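The refactor above replaces the instance method `parse_cdc_offset(&self, ...)` with static `get_cdc_offset_parser()` constructors that return a boxed closure, so offset parsing can be handed to other components without holding a table reader. Below is a stripped-down sketch of that pattern (not part of the commit); `CdcOffset` and `ConnectorResult` here are simplified stand-ins for the real connector types.

// Simplified stand-ins for the real connector types.
#[derive(Debug)]
enum CdcOffset {
    MySql(u64),
}

type ConnectorResult<T> = Result<T, String>;

// Same shape as the new type alias: a boxed, `Send` closure from &str to an offset.
type CdcOffsetParseFunc = Box<dyn Fn(&str) -> ConnectorResult<CdcOffset> + Send>;

struct MySqlReaderSketch;

impl MySqlReaderSketch {
    // Note: no `&self` -- the parser no longer needs a live reader or connection.
    fn get_cdc_offset_parser() -> CdcOffsetParseFunc {
        Box::new(|offset| {
            let pos: u64 = offset.trim().parse().map_err(|e| format!("{e}"))?;
            Ok(CdcOffset::MySql(pos))
        })
    }
}

fn main() -> ConnectorResult<()> {
    // The closure can be stored, moved to another thread, and called repeatedly.
    let parse: CdcOffsetParseFunc = MySqlReaderSketch::get_cdc_offset_parser();
    println!("{:?}", parse("105622")?);
    Ok(())
}
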
18 changes: 10 additions & 8 deletions src/connector/src/source/cdc/external/postgres.rs
@@ -30,8 +30,8 @@ use tokio_postgres::NoTls;
use crate::error::ConnectorError;
use crate::parser::postgres_row_to_owned_row;
use crate::source::cdc::external::{
CdcOffset, ConnectorResult, DebeziumOffset, ExternalTableConfig, ExternalTableReader,
SchemaTableName,
CdcOffset, CdcOffsetParseFunc, ConnectorResult, DebeziumOffset, ExternalTableConfig,
ExternalTableReader, SchemaTableName,
};

#[derive(Debug, Clone, Default, PartialEq, Serialize, Deserialize)]
@@ -105,12 +105,6 @@ impl ExternalTableReader for PostgresExternalTableReader {
Ok(CdcOffset::Postgres(pg_offset))
}

fn parse_cdc_offset(&self, offset: &str) -> ConnectorResult<CdcOffset> {
Ok(CdcOffset::Postgres(PostgresOffset::parse_debezium_offset(
offset,
)?))
}

fn snapshot_read(
&self,
table_name: SchemaTableName,
@@ -165,6 +159,14 @@ impl PostgresExternalTableReader {
})
}

pub fn get_cdc_offset_parser() -> CdcOffsetParseFunc {
Box::new(move |offset| {
Ok(CdcOffset::Postgres(PostgresOffset::parse_debezium_offset(
offset,
)?))
})
}

#[try_stream(boxed, ok = OwnedRow, error = ConnectorError)]
async fn snapshot_read_inner(
&self,
2 changes: 2 additions & 0 deletions src/connector/src/source/cdc/mod.rs
@@ -35,6 +35,8 @@ pub const CDC_CONNECTOR_NAME_SUFFIX: &str = "-cdc";
pub const CDC_SNAPSHOT_MODE_KEY: &str = "debezium.snapshot.mode";
pub const CDC_SNAPSHOT_BACKFILL: &str = "rw_cdc_backfill";
pub const CDC_SHARING_MODE_KEY: &str = "rw.sharing.mode.enable";
// User can set snapshot='false' to disable cdc backfill
pub const CDC_BACKFILL_ENABLE_KEY: &str = "snapshot";

pub const MYSQL_CDC_CONNECTOR: &str = Mysql::CDC_CONNECTOR_NAME;
pub const POSTGRES_CDC_CONNECTOR: &str = Postgres::CDC_CONNECTOR_NAME;
@@ -107,12 +107,13 @@
StreamMaterialize { columns: [k, x1, x2, a1, b1, stream._row_id(hidden), version2.k(hidden)], stream_key: [stream._row_id, k], pk_columns: [stream._row_id, k], pk_conflict: NoCheck }
└─StreamExchange { dist: HashShard(stream.k, stream._row_id) }
└─StreamTemporalJoin { type: Inner, predicate: stream.k = version2.k, output: [stream.k, version1.x1, version2.x2, stream.a1, stream.b1, stream._row_id, version2.k] }
├─StreamTemporalJoin { type: Inner, predicate: stream.k = version1.k, output: [stream.k, stream.a1, stream.b1, version1.x1, stream._row_id, version1.k] }
│ ├─StreamExchange { dist: HashShard(stream.k) }
│ │ └─StreamFilter { predicate: (stream.a1 < 10:Int32) }
│ │ └─StreamTableScan { table: stream, columns: [stream.k, stream.a1, stream.b1, stream._row_id], pk: [stream._row_id], dist: UpstreamHashShard(stream._row_id) }
│ └─StreamExchange [no_shuffle] { dist: UpstreamHashShard(version1.k) }
│ └─StreamTableScan { table: version1, columns: [version1.k, version1.x1], pk: [version1.k], dist: UpstreamHashShard(version1.k) }
├─StreamExchange { dist: HashShard(stream.k) }
│ └─StreamTemporalJoin { type: Inner, predicate: stream.k = version1.k, output: [stream.k, stream.a1, stream.b1, version1.x1, stream._row_id, version1.k] }
│ ├─StreamExchange { dist: HashShard(stream.k) }
│ │ └─StreamFilter { predicate: (stream.a1 < 10:Int32) }
│ │ └─StreamTableScan { table: stream, columns: [stream.k, stream.a1, stream.b1, stream._row_id], pk: [stream._row_id], dist: UpstreamHashShard(stream._row_id) }
│ └─StreamExchange [no_shuffle] { dist: UpstreamHashShard(version1.k) }
│ └─StreamTableScan { table: version1, columns: [version1.k, version1.x1], pk: [version1.k], dist: UpstreamHashShard(version1.k) }
└─StreamExchange [no_shuffle] { dist: UpstreamHashShard(version2.k) }
└─StreamTableScan { table: version2, columns: [version2.k, version2.x2], pk: [version2.k], dist: UpstreamHashShard(version2.k) }
- name: multi-way temporal join with different keys
12 changes: 12 additions & 0 deletions src/frontend/src/handler/create_table.rs
@@ -14,6 +14,7 @@

use std::collections::{BTreeMap, HashMap};
use std::rc::Rc;
use std::str::FromStr;
use std::sync::Arc;

use anyhow::anyhow;
@@ -33,6 +34,7 @@ use risingwave_connector::source;
use risingwave_connector::source::cdc::external::{
DATABASE_NAME_KEY, SCHEMA_NAME_KEY, TABLE_NAME_KEY,
};
use risingwave_connector::source::cdc::CDC_BACKFILL_ENABLE_KEY;
use risingwave_pb::catalog::source::OptionalAssociatedTableId;
use risingwave_pb::catalog::{PbSource, PbTable, StreamSourceInfo, Table, WatermarkDesc};
use risingwave_pb::ddl_service::TableJobType;
@@ -818,10 +820,20 @@ pub(crate) fn gen_create_table_plan_for_cdc_source(

tracing::debug!(?cdc_table_desc, "create cdc table");

// disable backfill if 'snapshot=false'
let disable_backfill = match context.with_options().get(CDC_BACKFILL_ENABLE_KEY) {
None => false,
Some(v) => {
!(bool::from_str(v)
.map_err(|_| anyhow!("Invalid value for {}", CDC_BACKFILL_ENABLE_KEY))?)
}
};

let logical_scan = LogicalCdcScan::create(
external_table_name,
Rc::new(cdc_table_desc),
context.clone(),
disable_backfill,
);

let scan_node: PlanRef = logical_scan.into();
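For reference, here is a self-contained sketch of the option handling added above: a `snapshot = 'false'` entry in the WITH clause becomes `disable_backfill = true`, anything non-boolean is rejected, and the default keeps backfill enabled. This is an illustrative reimplementation outside the frontend crate (using `anyhow` directly, with an invented function name), not the commit's own code.

use std::collections::BTreeMap;
use std::str::FromStr;

use anyhow::anyhow;

// Key mirrored from `CDC_BACKFILL_ENABLE_KEY` in the connector crate.
const CDC_BACKFILL_ENABLE_KEY: &str = "snapshot";

// Returns `true` when the user asked to skip the backfill, i.e. snapshot='false'.
fn parse_disable_backfill(with_options: &BTreeMap<String, String>) -> anyhow::Result<bool> {
    match with_options.get(CDC_BACKFILL_ENABLE_KEY) {
        // No `snapshot` option: backfill stays enabled.
        None => Ok(false),
        // snapshot='false' disables backfill; non-boolean values are rejected.
        Some(v) => Ok(!bool::from_str(v)
            .map_err(|_| anyhow!("Invalid value for {}", CDC_BACKFILL_ENABLE_KEY))?),
    }
}

fn main() -> anyhow::Result<()> {
    let mut opts = BTreeMap::new();
    opts.insert("snapshot".to_string(), "false".to_string());
    assert!(parse_disable_backfill(&opts)?); // snapshot='false' => backfill disabled
    assert!(!parse_disable_backfill(&BTreeMap::new())?); // default => backfill enabled
    Ok(())
}
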
4 changes: 4 additions & 0 deletions src/frontend/src/optimizer/plan_node/generic/cdc_scan.rs
@@ -39,6 +39,8 @@ pub struct CdcScan {
#[educe(PartialEq(ignore))]
#[educe(Hash(ignore))]
pub ctx: OptimizerContextRef,

pub disable_backfill: bool,
}

impl CdcScan {
@@ -102,12 +104,14 @@ impl CdcScan {
output_col_idx: Vec<usize>, // the column index in the table
cdc_table_desc: Rc<CdcTableDesc>,
ctx: OptimizerContextRef,
disable_backfill: bool,
) -> Self {
Self {
table_name,
output_col_idx,
cdc_table_desc,
ctx,
disable_backfill,
}
}

3 changes: 3 additions & 0 deletions src/frontend/src/optimizer/plan_node/logical_cdc_scan.rs
@@ -60,12 +60,14 @@ impl LogicalCdcScan {
table_name: String, // explain-only
cdc_table_desc: Rc<CdcTableDesc>,
ctx: OptimizerContextRef,
disable_backfill: bool,
) -> Self {
generic::CdcScan::new(
table_name,
(0..cdc_table_desc.columns.len()).collect(),
cdc_table_desc,
ctx,
disable_backfill,
)
.into()
}
@@ -94,6 +96,7 @@ impl LogicalCdcScan {
output_col_idx,
self.core.cdc_table_desc.clone(),
self.base.ctx().clone(),
self.core.disable_backfill,
)
.into()
}
14 changes: 7 additions & 7 deletions src/frontend/src/optimizer/plan_node/logical_join.rs
@@ -1049,22 +1049,22 @@ impl LogicalJoin {
let lookup_prefix_len = reorder_idx.len();
let predicate = predicate.reorder(&reorder_idx);

let left = if dist_key_in_order_key_pos.is_empty() {
self.left()
.to_stream_with_dist_required(&RequiredDist::single(), ctx)?
let required_dist = if dist_key_in_order_key_pos.is_empty() {
RequiredDist::single()
} else {
let left_eq_indexes = predicate.left_eq_indexes();
let left_dist_key = dist_key_in_order_key_pos
.iter()
.map(|pos| left_eq_indexes[*pos])
.collect_vec();

self.left().to_stream_with_dist_required(
&RequiredDist::shard_by_key(self.left().schema().len(), &left_dist_key),
ctx,
)?
RequiredDist::hash_shard(&left_dist_key)
};

let left = self.left().to_stream(ctx)?;
// Enforce a shuffle on the temporal join LHS so that the scheduler can schedule the join fragment together with the RHS via a `no_shuffle` exchange.
let left = required_dist.enforce(left, &Order::any());

if !left.append_only() {
return Err(RwError::from(ErrorCode::NotSupported(
"Temporal join requires an append-only left input".into(),
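The kind of query affected is a multi-way temporal join like the one in the planner test output earlier in this commit. A sketch of such a query, reusing the table and column names from that plan (the DDL for `stream`, `version1`, and `version2` is assumed, with `stream` append-only and the versioned tables keyed by `k`):

SELECT stream.k, version1.x1, version2.x2, stream.a1, stream.b1
FROM stream
JOIN version1 FOR SYSTEM_TIME AS OF PROCTIME() ON stream.k = version1.k
JOIN version2 FOR SYSTEM_TIME AS OF PROCTIME() ON stream.k = version2.k
WHERE stream.a1 < 10;

With this change, the left input is converted to a stream first and the required distribution is then enforced with an explicit exchange, which is what produces the extra StreamExchange above the nested temporal join in the updated plan.
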
@@ -252,6 +252,7 @@ impl StreamCdcTableScan {
state_table: Some(catalog),
cdc_table_desc: Some(self.core.cdc_table_desc.to_protobuf()),
rate_limit: self.base.ctx().overwrite_options().streaming_rate_limit,
disable_backfill: self.core.disable_backfill,
});

// plan: merge -> filter -> exchange(simple) -> stream_scan
Expand Down
