Merge branch 'main' into jianwei/update-cherry
huangjw806 authored Feb 1, 2024
2 parents bfc10ed + 82d1277 commit e153ba6
Showing 25 changed files with 385 additions and 146 deletions.
@@ -9,20 +9,31 @@ SELECT make_timestamptz(1973, 07, 15, 08, 15, 55.33);
query T
SELECT make_timestamptz(-1973, 07, 15, 08, 15, 55.33);
----
-1973-07-15 08:15:55.330+00:00
-1972-07-15 08:15:55.330+00:00

query error Invalid parameter sec: invalid sec
query error Invalid parameter year, month, day: invalid date: -3-2-29
SELECT make_timestamptz(-4, 02, 29, 08, 15, 55.33);

query T
SELECT make_timestamptz(-5, 02, 29, 08, 15, 55.33);
----
-0004-02-29 08:15:55.330+00:00

query error Invalid parameter sec: invalid sec: -55.33
SELECT make_timestamptz(1973, 07, 15, 08, 15, -55.33);

query error Invalid parameter hour, min, sec: invalid time
query error Invalid parameter hour, min, sec: invalid time: 8:-15:55.33
SELECT make_timestamptz(1973, 07, 15, 08, -15, 55.33);

query error Invalid parameter year, month, day: invalid date
query error Invalid parameter year, month, day: invalid date: 1973--7-15
SELECT make_timestamptz(1973, -07, 15, 08, 15, 55.33);

query error Invalid parameter year, month, day: invalid date
query error Invalid parameter year, month, day: invalid date: 1973-6-31
SELECT make_timestamptz(1973, 06, 31, 08, 15, 55.33);

query error Invalid parameter year, month, day: invalid date: 0-6-31
SELECT make_timestamptz(0, 06, 31, 08, 15, 55.33);

statement ok
set TimeZone to 'America/New_York';

@@ -88,3 +99,62 @@ SELECT make_timestamptz(2013, 7, 15, 8, 15, 23.5, 'America/New_York');

statement ok
set timezone to 'UTC';

query T
SELECT make_date(2024, 1, 26);
----
2024-01-26

query T
SELECT make_date(-2024, 1, 26);
----
2024-01-26 BC

query error Invalid parameter year, month, day: invalid date: -3-2-29
SELECT make_date(-4, 2, 29);

query T
SELECT make_date(-5, 2, 29);
----
0005-02-29 BC

query error Invalid parameter year, month, day: invalid date: 0-7-15
select make_date(0, 7, 15);

query error Invalid parameter year, month, day: invalid date: 2013-2-30
select make_date(2013, 2, 30);

query error Invalid parameter year, month, day: invalid date: 2013-13-1
select make_date(2013, 13, 1);

query error Invalid parameter year, month, day: invalid date: 2013-11--1
select make_date(2013, 11, -1);

query error Invalid parameter hour, min, sec: invalid time: 10:55:100.1
select make_time(10, 55, 100.1);

query T
SELECT make_time(14, 20, 26);
----
14:20:26

query error Invalid parameter hour, min, sec: invalid time: 24:0:2.1
select make_time(24, 0, 2.1);

query T
SELECT make_timestamp(2024, 1, 26, 14, 20, 26);
----
2024-01-26 14:20:26

query T
SELECT make_timestamp(-1973, 07, 15, 08, 15, 55.33);
----
-1972-07-15 08:15:55.330

query error Invalid parameter year, month, day: invalid date: -3-2-29
SELECT make_timestamp(-4, 02, 29, 08, 15, 55.33);

query T
SELECT make_timestamp(-5, 02, 29, 08, 15, 55.33);
----
-0004-02-29 08:15:55.330
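
The BC-year cases in this file follow the SQL convention that there is no year zero: an input year of -N means N BC, which corresponds to astronomical year -(N-1). That is why make_timestamptz(-4, 2, 29, ...) reports `invalid date: -3-2-29` (4 BC, astronomical year -3, is not a leap year), while year -5 (5 BC, astronomical -4) accepts February 29. Below is a minimal sketch of that mapping using the chrono crate; the helper name and the explicit year-zero rejection are assumptions chosen to match the tests above, not RisingWave's actual implementation.

```rust
use chrono::NaiveDate;

/// Hypothetical helper mirroring the year convention exercised above.
fn make_date_sketch(year: i32, month: u32, day: u32) -> Option<NaiveDate> {
    // Year zero is rejected, matching `make_date(0, 7, 15)` above.
    if year == 0 {
        return None;
    }
    // -N (meaning N BC) maps to astronomical year -(N - 1).
    let astronomical_year = if year < 0 { year + 1 } else { year };
    NaiveDate::from_ymd_opt(astronomical_year, month, day)
}

fn main() {
    assert!(make_date_sketch(-4, 2, 29).is_none()); // 4 BC is not a leap year
    assert!(make_date_sketch(-5, 2, 29).is_some()); // 5 BC is a leap year
    assert!(make_date_sketch(0, 7, 15).is_none());
    println!("{:?}", make_date_sketch(-5, 2, 29)); // Some(-0004-02-29)
}
```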
4 changes: 4 additions & 0 deletions e2e_test/source/cdc/cdc.check_new_rows.slt
@@ -74,3 +74,7 @@ SELECT * FROM rw.products_test order by id limit 3
102 RW 12V car battery
103 RW 12-pack of drill bits with sizes ranging from #40 to #3

query TTTT
select order_date,customer_name,product_id,order_status from orders_no_backfill order by order_id;
----
2022-12-01 15:08:22 Sam 110 0
23 changes: 23 additions & 0 deletions e2e_test/source/cdc/cdc.share_stream.slt
@@ -105,8 +105,31 @@ create materialized view orders_test_cnt as select count(*) as cnt from orders_t
system ok
mysql --protocol=tcp -u root mytest -e "INSERT INTO products VALUES(default, 'Juice', '100ml Juice');"

system ok
mysql --protocol=tcp -u root mytest -e "FLUSH LOGS"

# Should not contain records inserted before the table is created (e.g. 'Bob' inserted above)
statement ok
create table orders_no_backfill (
order_id int,
order_date timestamp,
customer_name string,
price decimal,
product_id int,
order_status smallint,
PRIMARY KEY (order_id)
) with (
snapshot = 'false'
) from mysql_mytest table 'mytest.orders';

sleep 5s

# table without backfill should not contain historical records
query I
select count(*) from orders_no_backfill
----
0

# check ingestion results
query I
SELECT * from products_test_cnt
1 change: 0 additions & 1 deletion e2e_test/source/cdc/mysql_init_data.sql
@@ -1,4 +1,3 @@
-- USE `my@db`;

INSERT INTO products
VALUES (default,"scooter","Small 2-wheel scooter"),
@@ -22,4 +22,5 @@ heartbeat.interval.ms=${debezium.heartbeat.interval.ms:-60000}
# In sharing cdc mode, we will subscribe to multiple tables in the given database,
# so here we set ${table.name} to a default value `RW_CDC_Sharing` just for display.
name=${hostname}:${port}:${database.name}.${table.name:-RW_CDC_Sharing}
provide.transaction.metadata=${transactional:-false}
# Enable transaction metadata by default
provide.transaction.metadata=${transactional:-true}
@@ -20,4 +20,5 @@ heartbeat.interval.ms=${debezium.heartbeat.interval.ms:-300000}
# In sharing cdc source mode, we will subscribe to multiple tables in the given database,
# so here we set ${table.name} to a default value `RW_CDC_Sharing` just for display.
name=${hostname}:${port}:${database.name}.${schema.name}.${table.name:-RW_CDC_Sharing}
provide.transaction.metadata=${transactional:-false}
# Enable transaction metadata by default
provide.transaction.metadata=${transactional:-true}
5 changes: 4 additions & 1 deletion proto/expr.proto
@@ -49,10 +49,13 @@ message ExprNode {
BITWISE_NOT = 34;
BITWISE_SHIFT_LEFT = 35;
BITWISE_SHIFT_RIGHT = 36;
// date functions
// date/time functions
EXTRACT = 101;
DATE_PART = 102;
TUMBLE_START = 103;
MAKE_DATE = 113;
MAKE_TIME = 114;
MAKE_TIMESTAMP = 115;
// From f64 to timestamp.
// e.g. `select to_timestamp(1672044740.0)`
TO_TIMESTAMP = 104;
5 changes: 4 additions & 1 deletion proto/stream_plan.proto
@@ -560,14 +560,17 @@ message StreamCdcScanNode {
// Strips the primary key columns if they're unnecessary.
repeated uint32 output_indices = 3;

/// The state table used by Backfill operator for persisting internal state
// The state table used by CdcBackfill operator for persisting internal state
catalog.Table state_table = 4;

// The external table that will be backfilled for CDC.
plan_common.ExternalTableDesc cdc_table_desc = 5;

// The rate limit for the stream cdc scan node.
optional uint32 rate_limit = 6;

// Whether to skip the backfill and only consume from upstream.
bool disable_backfill = 7;
}

// BatchPlanNode is used for mv on mv snapshot read.
1 change: 1 addition & 0 deletions src/compute/tests/cdc_tests.rs
@@ -239,6 +239,7 @@ async fn test_cdc_backfill() -> StreamResult<()> {
Arc::new(StreamingMetrics::unused()),
state_table,
4, // 4 rows in a snapshot chunk
false,
);

// Create a `MaterializeExecutor` to write the changes to storage.
14 changes: 6 additions & 8 deletions src/connector/src/source/cdc/external/mock_external_table.rs
@@ -21,7 +21,8 @@ use risingwave_common::types::ScalarImpl;

use crate::error::ConnectorError;
use crate::source::cdc::external::{
CdcOffset, ConnectorResult, ExternalTableReader, MySqlOffset, SchemaTableName,
CdcOffset, CdcOffsetParseFunc, ConnectorResult, ExternalTableReader, MySqlOffset,
SchemaTableName,
};

#[derive(Debug)]
@@ -38,6 +39,10 @@ impl MockExternalTableReader {
}
}

pub fn get_cdc_offset_parser() -> CdcOffsetParseFunc {
Box::new(move |_| Ok(CdcOffset::MySql(MySqlOffset::default())))
}

#[try_stream(boxed, ok = OwnedRow, error = ConnectorError)]
async fn snapshot_read_inner(&self) {
let snap_idx = self
@@ -102,13 +107,6 @@ impl ExternalTableReader for MockExternalTableReader {
}
}

fn parse_cdc_offset(&self, offset: &str) -> ConnectorResult<CdcOffset> {
// same as mysql offset
Ok(CdcOffset::MySql(MySqlOffset::parse_debezium_offset(
offset,
)?))
}

fn snapshot_read(
&self,
_table_name: SchemaTableName,
40 changes: 24 additions & 16 deletions src/connector/src/source/cdc/external/mod.rs
@@ -208,13 +208,13 @@ impl MySqlOffset {
}
}

pub type CdcOffsetParseFunc = Box<dyn Fn(&str) -> ConnectorResult<CdcOffset> + Send>;

pub trait ExternalTableReader {
fn get_normalized_table_name(&self, table_name: &SchemaTableName) -> String;

async fn current_cdc_offset(&self) -> ConnectorResult<CdcOffset>;

fn parse_cdc_offset(&self, dbz_offset: &str) -> ConnectorResult<CdcOffset>;

fn snapshot_read(
&self,
table_name: SchemaTableName,
@@ -276,12 +276,6 @@ impl ExternalTableReader for MySqlExternalTableReader {
}))
}

fn parse_cdc_offset(&self, offset: &str) -> ConnectorResult<CdcOffset> {
Ok(CdcOffset::MySql(MySqlOffset::parse_debezium_offset(
offset,
)?))
}

fn snapshot_read(
&self,
table_name: SchemaTableName,
@@ -328,6 +322,14 @@ impl MySqlExternalTableReader {
})
}

pub fn get_cdc_offset_parser() -> CdcOffsetParseFunc {
Box::new(move |offset| {
Ok(CdcOffset::MySql(MySqlOffset::parse_debezium_offset(
offset,
)?))
})
}

#[try_stream(boxed, ok = OwnedRow, error = ConnectorError)]
async fn snapshot_read_inner(
&self,
@@ -502,14 +504,6 @@ impl ExternalTableReader for ExternalTableReaderImpl {
}
}

fn parse_cdc_offset(&self, offset: &str) -> ConnectorResult<CdcOffset> {
match self {
ExternalTableReaderImpl::MySql(mysql) => mysql.parse_cdc_offset(offset),
ExternalTableReaderImpl::Postgres(postgres) => postgres.parse_cdc_offset(offset),
ExternalTableReaderImpl::Mock(mock) => mock.parse_cdc_offset(offset),
}
}

fn snapshot_read(
&self,
table_name: SchemaTableName,
@@ -521,6 +515,16 @@
}

impl ExternalTableReaderImpl {
pub fn get_cdc_offset_parser(&self) -> CdcOffsetParseFunc {
match self {
ExternalTableReaderImpl::MySql(_) => MySqlExternalTableReader::get_cdc_offset_parser(),
ExternalTableReaderImpl::Postgres(_) => {
PostgresExternalTableReader::get_cdc_offset_parser()
}
ExternalTableReaderImpl::Mock(_) => MockExternalTableReader::get_cdc_offset_parser(),
}
}

#[try_stream(boxed, ok = OwnedRow, error = ConnectorError)]
async fn snapshot_read_inner(
&self,
@@ -623,6 +627,10 @@ mod tests {
let offset = reader.current_cdc_offset().await.unwrap();
println!("BinlogOffset: {:?}", offset);

let off0_str = r#"{ "sourcePartition": { "server": "test" }, "sourceOffset": { "ts_sec": 1670876905, "file": "binlog.000001", "pos": 105622, "snapshot": true }, "isHeartbeat": false }"#;
let parser = MySqlExternalTableReader::get_cdc_offset_parser();
println!("parsed offset: {:?}", parser(off0_str).unwrap());

let table_name = SchemaTableName {
schema_name: "mytest".to_string(),
table_name: "t1".to_string(),
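
The change above replaces the per-instance `parse_cdc_offset` trait method with a static `get_cdc_offset_parser()` on each reader that returns a boxed closure (`CdcOffsetParseFunc`), so an offset string can be decoded without constructing a reader or opening a connection. Below is a rough sketch of that pattern with simplified stand-in types; in the real code the closure decodes a Debezium JSON offset, as in the test above.

```rust
// Simplified stand-ins, not the actual RisingWave definitions.
type ConnectorResult<T> = Result<T, String>;

#[derive(Debug)]
enum CdcOffset {
    MySql(u64),
}

type CdcOffsetParseFunc = Box<dyn Fn(&str) -> ConnectorResult<CdcOffset> + Send>;

struct MySqlExternalTableReader;

impl MySqlExternalTableReader {
    // Static constructor: no reader instance (and thus no connection) is
    // needed just to obtain an offset parser.
    fn get_cdc_offset_parser() -> CdcOffsetParseFunc {
        Box::new(|offset| {
            offset
                .trim()
                .parse::<u64>()
                .map(CdcOffset::MySql)
                .map_err(|e| e.to_string())
        })
    }
}

fn main() {
    let parse = MySqlExternalTableReader::get_cdc_offset_parser();
    println!("{:?}", parse("105622").unwrap()); // MySql(105622)
}
```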
18 changes: 10 additions & 8 deletions src/connector/src/source/cdc/external/postgres.rs
@@ -30,8 +30,8 @@ use tokio_postgres::NoTls;
use crate::error::ConnectorError;
use crate::parser::postgres_row_to_owned_row;
use crate::source::cdc::external::{
CdcOffset, ConnectorResult, DebeziumOffset, ExternalTableConfig, ExternalTableReader,
SchemaTableName,
CdcOffset, CdcOffsetParseFunc, ConnectorResult, DebeziumOffset, ExternalTableConfig,
ExternalTableReader, SchemaTableName,
};

#[derive(Debug, Clone, Default, PartialEq, Serialize, Deserialize)]
@@ -105,12 +105,6 @@ impl ExternalTableReader for PostgresExternalTableReader {
Ok(CdcOffset::Postgres(pg_offset))
}

fn parse_cdc_offset(&self, offset: &str) -> ConnectorResult<CdcOffset> {
Ok(CdcOffset::Postgres(PostgresOffset::parse_debezium_offset(
offset,
)?))
}

fn snapshot_read(
&self,
table_name: SchemaTableName,
@@ -165,6 +159,14 @@ impl PostgresExternalTableReader {
})
}

pub fn get_cdc_offset_parser() -> CdcOffsetParseFunc {
Box::new(move |offset| {
Ok(CdcOffset::Postgres(PostgresOffset::parse_debezium_offset(
offset,
)?))
})
}

#[try_stream(boxed, ok = OwnedRow, error = ConnectorError)]
async fn snapshot_read_inner(
&self,
2 changes: 2 additions & 0 deletions src/connector/src/source/cdc/mod.rs
@@ -35,6 +35,8 @@ pub const CDC_CONNECTOR_NAME_SUFFIX: &str = "-cdc";
pub const CDC_SNAPSHOT_MODE_KEY: &str = "debezium.snapshot.mode";
pub const CDC_SNAPSHOT_BACKFILL: &str = "rw_cdc_backfill";
pub const CDC_SHARING_MODE_KEY: &str = "rw.sharing.mode.enable";
// User can set snapshot='false' to disable cdc backfill
pub const CDC_BACKFILL_ENABLE_KEY: &str = "snapshot";

pub const MYSQL_CDC_CONNECTOR: &str = Mysql::CDC_CONNECTOR_NAME;
pub const POSTGRES_CDC_CONNECTOR: &str = Postgres::CDC_CONNECTOR_NAME;
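
The new `CDC_BACKFILL_ENABLE_KEY` ties the user-facing `snapshot` WITH option (used in cdc.share_stream.slt above) to the `disable_backfill` flag added to `StreamCdcScanNode`. Below is a hypothetical sketch of how such an option could be interpreted; the helper name and parsing details are assumptions for illustration, not RisingWave's actual frontend code.

```rust
use std::collections::BTreeMap;

// Same key as added in cdc/mod.rs above.
const CDC_BACKFILL_ENABLE_KEY: &str = "snapshot";

/// Hypothetical helper deriving the `disable_backfill` flag from WITH options.
fn disable_backfill(with_options: &BTreeMap<String, String>) -> bool {
    // Backfill stays enabled unless the user explicitly sets snapshot='false'.
    with_options
        .get(CDC_BACKFILL_ENABLE_KEY)
        .map(|v| v.eq_ignore_ascii_case("false"))
        .unwrap_or(false)
}

fn main() {
    let opts: BTreeMap<String, String> =
        [("snapshot".to_string(), "false".to_string())]
            .into_iter()
            .collect();
    assert!(disable_backfill(&opts)); // snapshot='false' => skip backfill
    assert!(!disable_backfill(&BTreeMap::new())); // default: backfill runs
}
```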