diff --git a/e2e_test/batch/basic/make_timestamptz.slt.part b/e2e_test/batch/basic/make_time.slt.part similarity index 52% rename from e2e_test/batch/basic/make_timestamptz.slt.part rename to e2e_test/batch/basic/make_time.slt.part index 99f5d1369327c..7a11b837c4fdb 100644 --- a/e2e_test/batch/basic/make_timestamptz.slt.part +++ b/e2e_test/batch/basic/make_time.slt.part @@ -9,20 +9,31 @@ SELECT make_timestamptz(1973, 07, 15, 08, 15, 55.33); query T SELECT make_timestamptz(-1973, 07, 15, 08, 15, 55.33); ---- --1973-07-15 08:15:55.330+00:00 +-1972-07-15 08:15:55.330+00:00 -query error Invalid parameter sec: invalid sec +query error Invalid parameter year, month, day: invalid date: -3-2-29 +SELECT make_timestamptz(-4, 02, 29, 08, 15, 55.33); + +query T +SELECT make_timestamptz(-5, 02, 29, 08, 15, 55.33); +---- +-0004-02-29 08:15:55.330+00:00 + +query error Invalid parameter sec: invalid sec: -55.33 SELECT make_timestamptz(1973, 07, 15, 08, 15, -55.33); -query error Invalid parameter hour, min, sec: invalid time +query error Invalid parameter hour, min, sec: invalid time: 8:-15:55.33 SELECT make_timestamptz(1973, 07, 15, 08, -15, 55.33); -query error Invalid parameter year, month, day: invalid date +query error Invalid parameter year, month, day: invalid date: 1973--7-15 SELECT make_timestamptz(1973, -07, 15, 08, 15, 55.33); -query error Invalid parameter year, month, day: invalid date +query error Invalid parameter year, month, day: invalid date: 1973-6-31 SELECT make_timestamptz(1973, 06, 31, 08, 15, 55.33); +query error Invalid parameter year, month, day: invalid date: 0-6-31 +SELECT make_timestamptz(0, 06, 31, 08, 15, 55.33); + statement ok set TimeZone to 'America/New_York'; @@ -88,3 +99,62 @@ SELECT make_timestamptz(2013, 7, 15, 8, 15, 23.5, 'America/New_York'); statement ok set timezone to 'UTC'; + +query T +SELECT make_date(2024, 1, 26); +---- +2024-01-26 + +query T +SELECT make_date(-2024, 1, 26); +---- +2024-01-26 BC + +query error Invalid parameter year, month, day: invalid date: -3-2-29 +SELECT make_date(-4, 2, 29); + +query T +SELECT make_date(-5, 2, 29); +---- +0005-02-29 BC + +query error Invalid parameter year, month, day: invalid date: 0-7-15 +select make_date(0, 7, 15); + +query error Invalid parameter year, month, day: invalid date: 2013-2-30 +select make_date(2013, 2, 30); + +query error Invalid parameter year, month, day: invalid date: 2013-13-1 +select make_date(2013, 13, 1); + +query error Invalid parameter year, month, day: invalid date: 2013-11--1 +select make_date(2013, 11, -1); + +query error Invalid parameter hour, min, sec: invalid time: 10:55:100.1 +select make_time(10, 55, 100.1); + +query T +SELECT make_time(14, 20, 26); +---- +14:20:26 + +query error Invalid parameter hour, min, sec: invalid time: 24:0:2.1 +select make_time(24, 0, 2.1); + +query T +SELECT make_timestamp(2024, 1, 26, 14, 20, 26); +---- +2024-01-26 14:20:26 + +query T +SELECT make_timestamp(-1973, 07, 15, 08, 15, 55.33); +---- +-1972-07-15 08:15:55.330 + +query error Invalid parameter year, month, day: invalid date: -3-2-29 +SELECT make_timestamp(-4, 02, 29, 08, 15, 55.33); + +query T +SELECT make_timestamp(-5, 02, 29, 08, 15, 55.33); +---- +-0004-02-29 08:15:55.330 diff --git a/e2e_test/source/cdc/cdc.check_new_rows.slt b/e2e_test/source/cdc/cdc.check_new_rows.slt index af62d591d6feb..5a97a21ba973a 100644 --- a/e2e_test/source/cdc/cdc.check_new_rows.slt +++ b/e2e_test/source/cdc/cdc.check_new_rows.slt @@ -74,3 +74,7 @@ SELECT * FROM rw.products_test order by id limit 3 102 RW 12V car battery 103 RW 12-pack of drill bits with sizes ranging from #40 to #3 +query TTTT +select order_date,customer_name,product_id,order_status from orders_no_backfill order by order_id; +---- +2022-12-01 15:08:22 Sam 110 0 \ No newline at end of file diff --git a/e2e_test/source/cdc/cdc.share_stream.slt b/e2e_test/source/cdc/cdc.share_stream.slt index 9e4c1bac6a139..d4b50ed4db6d6 100644 --- a/e2e_test/source/cdc/cdc.share_stream.slt +++ b/e2e_test/source/cdc/cdc.share_stream.slt @@ -105,8 +105,31 @@ create materialized view orders_test_cnt as select count(*) as cnt from orders_t system ok mysql --protocol=tcp -u root mytest -e "INSERT INTO products VALUES(default, 'Juice', '100ml Juice');" +system ok +mysql --protocol=tcp -u root mytest -e "FLUSH LOGS" + +# Should not contain records inserted before the table is created (e.g. 'Bob' inserted above) +statement ok +create table orders_no_backfill ( + order_id int, + order_date timestamp, + customer_name string, + price decimal, + product_id int, + order_status smallint, + PRIMARY KEY (order_id) +) with ( + snapshot = 'false' +) from mysql_mytest table 'mytest.orders'; + sleep 5s +# table without backfill should not contain historical records +query I +select count(*) from orders_no_backfill +---- +0 + # check ingestion results query I SELECT * from products_test_cnt diff --git a/e2e_test/source/cdc/mysql_init_data.sql b/e2e_test/source/cdc/mysql_init_data.sql index e954b74aaf500..d01165088eed1 100644 --- a/e2e_test/source/cdc/mysql_init_data.sql +++ b/e2e_test/source/cdc/mysql_init_data.sql @@ -1,4 +1,3 @@ --- USE `my@db`; INSERT INTO products VALUES (default,"scooter","Small 2-wheel scooter"), diff --git a/java/connector-node/risingwave-connector-service/src/main/resources/mysql.properties b/java/connector-node/risingwave-connector-service/src/main/resources/mysql.properties index 4c74b48bbf174..8b1d571082f6e 100644 --- a/java/connector-node/risingwave-connector-service/src/main/resources/mysql.properties +++ b/java/connector-node/risingwave-connector-service/src/main/resources/mysql.properties @@ -22,4 +22,5 @@ heartbeat.interval.ms=${debezium.heartbeat.interval.ms:-60000} # In sharing cdc mode, we will subscribe to multiple tables in the given database, # so here we set ${table.name} to a default value `RW_CDC_Sharing` just for display. name=${hostname}:${port}:${database.name}.${table.name:-RW_CDC_Sharing} -provide.transaction.metadata=${transactional:-false} +# Enable transaction metadata by default +provide.transaction.metadata=${transactional:-true} diff --git a/java/connector-node/risingwave-connector-service/src/main/resources/postgres.properties b/java/connector-node/risingwave-connector-service/src/main/resources/postgres.properties index 7f9785b7a34b1..8d0284d03892e 100644 --- a/java/connector-node/risingwave-connector-service/src/main/resources/postgres.properties +++ b/java/connector-node/risingwave-connector-service/src/main/resources/postgres.properties @@ -20,4 +20,5 @@ heartbeat.interval.ms=${debezium.heartbeat.interval.ms:-300000} # In sharing cdc source mode, we will subscribe to multiple tables in the given database, # so here we set ${table.name} to a default value `RW_CDC_Sharing` just for display. name=${hostname}:${port}:${database.name}.${schema.name}.${table.name:-RW_CDC_Sharing} -provide.transaction.metadata=${transactional:-false} +# Enable transaction metadata by default +provide.transaction.metadata=${transactional:-true} diff --git a/proto/expr.proto b/proto/expr.proto index 7ab48a405d139..909314322838f 100644 --- a/proto/expr.proto +++ b/proto/expr.proto @@ -49,10 +49,13 @@ message ExprNode { BITWISE_NOT = 34; BITWISE_SHIFT_LEFT = 35; BITWISE_SHIFT_RIGHT = 36; - // date functions + // date/time functions EXTRACT = 101; DATE_PART = 102; TUMBLE_START = 103; + MAKE_DATE = 113; + MAKE_TIME = 114; + MAKE_TIMESTAMP = 115; // From f64 to timestamp. // e.g. `select to_timestamp(1672044740.0)` TO_TIMESTAMP = 104; diff --git a/proto/stream_plan.proto b/proto/stream_plan.proto index 29d9fd3d59c40..d5d0ad959714a 100644 --- a/proto/stream_plan.proto +++ b/proto/stream_plan.proto @@ -560,7 +560,7 @@ message StreamCdcScanNode { // Strips the primary key columns if they're unnecessary. repeated uint32 output_indices = 3; - /// The state table used by Backfill operator for persisting internal state + // The state table used by CdcBackfill operator for persisting internal state catalog.Table state_table = 4; // The external table that will be backfilled for CDC. @@ -568,6 +568,9 @@ message StreamCdcScanNode { // The rate limit for the stream cdc scan node. optional uint32 rate_limit = 6; + + // Whether skip the backfill and only consume from upstream. + bool disable_backfill = 7; } // BatchPlanNode is used for mv on mv snapshot read. diff --git a/src/compute/tests/cdc_tests.rs b/src/compute/tests/cdc_tests.rs index 2f1feec23cd46..7fc3641770a9e 100644 --- a/src/compute/tests/cdc_tests.rs +++ b/src/compute/tests/cdc_tests.rs @@ -239,6 +239,7 @@ async fn test_cdc_backfill() -> StreamResult<()> { Arc::new(StreamingMetrics::unused()), state_table, 4, // 4 rows in a snapshot chunk + false, ); // Create a `MaterializeExecutor` to write the changes to storage. diff --git a/src/connector/src/source/cdc/external/mock_external_table.rs b/src/connector/src/source/cdc/external/mock_external_table.rs index ef482a09cedb4..2c8ee00f67af9 100644 --- a/src/connector/src/source/cdc/external/mock_external_table.rs +++ b/src/connector/src/source/cdc/external/mock_external_table.rs @@ -21,7 +21,8 @@ use risingwave_common::types::ScalarImpl; use crate::error::ConnectorError; use crate::source::cdc::external::{ - CdcOffset, ConnectorResult, ExternalTableReader, MySqlOffset, SchemaTableName, + CdcOffset, CdcOffsetParseFunc, ConnectorResult, ExternalTableReader, MySqlOffset, + SchemaTableName, }; #[derive(Debug)] @@ -38,6 +39,10 @@ impl MockExternalTableReader { } } + pub fn get_cdc_offset_parser() -> CdcOffsetParseFunc { + Box::new(move |_| Ok(CdcOffset::MySql(MySqlOffset::default()))) + } + #[try_stream(boxed, ok = OwnedRow, error = ConnectorError)] async fn snapshot_read_inner(&self) { let snap_idx = self @@ -102,13 +107,6 @@ impl ExternalTableReader for MockExternalTableReader { } } - fn parse_cdc_offset(&self, offset: &str) -> ConnectorResult { - // same as mysql offset - Ok(CdcOffset::MySql(MySqlOffset::parse_debezium_offset( - offset, - )?)) - } - fn snapshot_read( &self, _table_name: SchemaTableName, diff --git a/src/connector/src/source/cdc/external/mod.rs b/src/connector/src/source/cdc/external/mod.rs index d520522440840..1d0c0e3974404 100644 --- a/src/connector/src/source/cdc/external/mod.rs +++ b/src/connector/src/source/cdc/external/mod.rs @@ -208,13 +208,13 @@ impl MySqlOffset { } } +pub type CdcOffsetParseFunc = Box ConnectorResult + Send>; + pub trait ExternalTableReader { fn get_normalized_table_name(&self, table_name: &SchemaTableName) -> String; async fn current_cdc_offset(&self) -> ConnectorResult; - fn parse_cdc_offset(&self, dbz_offset: &str) -> ConnectorResult; - fn snapshot_read( &self, table_name: SchemaTableName, @@ -276,12 +276,6 @@ impl ExternalTableReader for MySqlExternalTableReader { })) } - fn parse_cdc_offset(&self, offset: &str) -> ConnectorResult { - Ok(CdcOffset::MySql(MySqlOffset::parse_debezium_offset( - offset, - )?)) - } - fn snapshot_read( &self, table_name: SchemaTableName, @@ -328,6 +322,14 @@ impl MySqlExternalTableReader { }) } + pub fn get_cdc_offset_parser() -> CdcOffsetParseFunc { + Box::new(move |offset| { + Ok(CdcOffset::MySql(MySqlOffset::parse_debezium_offset( + offset, + )?)) + }) + } + #[try_stream(boxed, ok = OwnedRow, error = ConnectorError)] async fn snapshot_read_inner( &self, @@ -502,14 +504,6 @@ impl ExternalTableReader for ExternalTableReaderImpl { } } - fn parse_cdc_offset(&self, offset: &str) -> ConnectorResult { - match self { - ExternalTableReaderImpl::MySql(mysql) => mysql.parse_cdc_offset(offset), - ExternalTableReaderImpl::Postgres(postgres) => postgres.parse_cdc_offset(offset), - ExternalTableReaderImpl::Mock(mock) => mock.parse_cdc_offset(offset), - } - } - fn snapshot_read( &self, table_name: SchemaTableName, @@ -521,6 +515,16 @@ impl ExternalTableReader for ExternalTableReaderImpl { } impl ExternalTableReaderImpl { + pub fn get_cdc_offset_parser(&self) -> CdcOffsetParseFunc { + match self { + ExternalTableReaderImpl::MySql(_) => MySqlExternalTableReader::get_cdc_offset_parser(), + ExternalTableReaderImpl::Postgres(_) => { + PostgresExternalTableReader::get_cdc_offset_parser() + } + ExternalTableReaderImpl::Mock(_) => MockExternalTableReader::get_cdc_offset_parser(), + } + } + #[try_stream(boxed, ok = OwnedRow, error = ConnectorError)] async fn snapshot_read_inner( &self, @@ -623,6 +627,10 @@ mod tests { let offset = reader.current_cdc_offset().await.unwrap(); println!("BinlogOffset: {:?}", offset); + let off0_str = r#"{ "sourcePartition": { "server": "test" }, "sourceOffset": { "ts_sec": 1670876905, "file": "binlog.000001", "pos": 105622, "snapshot": true }, "isHeartbeat": false }"#; + let parser = MySqlExternalTableReader::get_cdc_offset_parser(); + println!("parsed offset: {:?}", parser(off0_str).unwrap()); + let table_name = SchemaTableName { schema_name: "mytest".to_string(), table_name: "t1".to_string(), diff --git a/src/connector/src/source/cdc/external/postgres.rs b/src/connector/src/source/cdc/external/postgres.rs index f940e34157532..036e62abfe129 100644 --- a/src/connector/src/source/cdc/external/postgres.rs +++ b/src/connector/src/source/cdc/external/postgres.rs @@ -30,8 +30,8 @@ use tokio_postgres::NoTls; use crate::error::ConnectorError; use crate::parser::postgres_row_to_owned_row; use crate::source::cdc::external::{ - CdcOffset, ConnectorResult, DebeziumOffset, ExternalTableConfig, ExternalTableReader, - SchemaTableName, + CdcOffset, CdcOffsetParseFunc, ConnectorResult, DebeziumOffset, ExternalTableConfig, + ExternalTableReader, SchemaTableName, }; #[derive(Debug, Clone, Default, PartialEq, Serialize, Deserialize)] @@ -105,12 +105,6 @@ impl ExternalTableReader for PostgresExternalTableReader { Ok(CdcOffset::Postgres(pg_offset)) } - fn parse_cdc_offset(&self, offset: &str) -> ConnectorResult { - Ok(CdcOffset::Postgres(PostgresOffset::parse_debezium_offset( - offset, - )?)) - } - fn snapshot_read( &self, table_name: SchemaTableName, @@ -165,6 +159,14 @@ impl PostgresExternalTableReader { }) } + pub fn get_cdc_offset_parser() -> CdcOffsetParseFunc { + Box::new(move |offset| { + Ok(CdcOffset::Postgres(PostgresOffset::parse_debezium_offset( + offset, + )?)) + }) + } + #[try_stream(boxed, ok = OwnedRow, error = ConnectorError)] async fn snapshot_read_inner( &self, diff --git a/src/connector/src/source/cdc/mod.rs b/src/connector/src/source/cdc/mod.rs index b3a2bc6554c60..ae9490bca3c56 100644 --- a/src/connector/src/source/cdc/mod.rs +++ b/src/connector/src/source/cdc/mod.rs @@ -35,6 +35,8 @@ pub const CDC_CONNECTOR_NAME_SUFFIX: &str = "-cdc"; pub const CDC_SNAPSHOT_MODE_KEY: &str = "debezium.snapshot.mode"; pub const CDC_SNAPSHOT_BACKFILL: &str = "rw_cdc_backfill"; pub const CDC_SHARING_MODE_KEY: &str = "rw.sharing.mode.enable"; +// User can set snapshot='false' to disable cdc backfill +pub const CDC_BACKFILL_ENABLE_KEY: &str = "snapshot"; pub const MYSQL_CDC_CONNECTOR: &str = Mysql::CDC_CONNECTOR_NAME; pub const POSTGRES_CDC_CONNECTOR: &str = Postgres::CDC_CONNECTOR_NAME; diff --git a/src/expr/impl/src/scalar/make_time.rs b/src/expr/impl/src/scalar/make_time.rs new file mode 100644 index 0000000000000..add8759299197 --- /dev/null +++ b/src/expr/impl/src/scalar/make_time.rs @@ -0,0 +1,160 @@ +// Copyright 2024 RisingWave Labs +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use chrono::{NaiveDate, NaiveDateTime, NaiveTime}; +use risingwave_common::types::{Date, FloatExt, Time, Timestamp, Timestamptz, F64}; +use risingwave_expr::expr_context::TIME_ZONE; +use risingwave_expr::{capture_context, function, ExprError, Result}; + +use crate::scalar::timestamptz::timestamp_at_time_zone; + +pub fn make_naive_date(mut year: i32, month: i32, day: i32) -> Result { + if year == 0 { + return Err(ExprError::InvalidParam { + name: "year, month, day", + reason: format!("invalid date: {}-{}-{}", year, month, day).into(), + }); + } + if year < 0 { + year += 1 + } + NaiveDate::from_ymd_opt(year, month as u32, day as u32).ok_or_else(|| ExprError::InvalidParam { + name: "year, month, day", + reason: format!("invalid date: {}-{}-{}", year, month, day).into(), + }) +} + +fn make_naive_time(hour: i32, min: i32, sec: F64) -> Result { + if !sec.is_finite() || sec.0.is_sign_negative() { + return Err(ExprError::InvalidParam { + name: "sec", + reason: format!("invalid sec: {}", sec).into(), + }); + } + let sec_u32 = sec.0.trunc() as u32; + let microsecond_u32 = ((sec.0 - sec.0.trunc()) * 1_000_000.0).round_ties_even() as u32; + NaiveTime::from_hms_micro_opt(hour as u32, min as u32, sec_u32, microsecond_u32).ok_or_else( + || ExprError::InvalidParam { + name: "hour, min, sec", + reason: format!("invalid time: {}:{}:{}", hour, min, sec).into(), + }, + ) +} + +// year int, month int, day int +#[function("make_date(int4, int4, int4) -> date")] +pub fn make_date(year: i32, month: i32, day: i32) -> Result { + Ok(Date(make_naive_date(year, month, day)?)) +} + +// hour int, min int, sec double precision +#[function("make_time(int4, int4, float8) -> time")] +pub fn make_time(hour: i32, min: i32, sec: F64) -> Result