From f16809d114f2ed96fc64b08c4bed94840c0b43ad Mon Sep 17 00:00:00 2001
From: xiangjinwu <17769960+xiangjinwu@users.noreply.github.com>
Date: Thu, 7 Sep 2023 13:58:38 +0800
Subject: [PATCH] fix(expr): parse `timestamptz` without seconds but with offset (#12084)

Co-authored-by: StrikeW
---
 clippy.toml                                    |  5 +++
 .../batch/functions/array_concat.slt.part      |  6 ++--
 e2e_test/batch/types/timestamptz_utc.slt.part  | 13 ++++++++
 e2e_test/source/cdc/cdc.check.slt              |  5 +++
 e2e_test/source/cdc/cdc.load.slt               | 32 +++++++++++++++++++
 e2e_test/source/cdc/mysql_cdc.sql              |  4 +++
 .../source/common/MySqlValidator.java          |  2 ++
 src/common/src/cast/mod.rs                     | 12 ++++---
 src/common/src/types/timestamptz.rs            | 30 +++++++++++++++--
 .../src/parser/debezium/simd_json_parser.rs    | 26 +++++++--------
 src/expr/src/vector_op/timestamptz.rs          | 10 ++----
 11 files changed, 115 insertions(+), 30 deletions(-)

diff --git a/clippy.toml b/clippy.toml
index 465ccb68ced30..bcc3c789ae35a 100644
--- a/clippy.toml
+++ b/clippy.toml
@@ -8,6 +8,11 @@ disallowed-methods = [
     { path = "num_traits::sign::Signed::is_positive", reason = "This returns true for 0.0 but false for 0." },
     { path = "num_traits::sign::Signed::is_negative", reason = "This returns true for -0.0 but false for 0." },
     { path = "num_traits::sign::Signed::signum", reason = "This returns 1.0 for 0.0 but 0 for 0." },
+    { path = "speedate::DateTime::parse_str", reason = "Please use `parse_str_rfc3339` instead." },
+    { path = "speedate::DateTime::parse_bytes", reason = "Please use `parse_bytes_rfc3339` instead." },
+    { path = "speedate::DateTime::parse_bytes_with_config", reason = "Please use `parse_bytes_rfc3339_with_config` instead." },
+    { path = "speedate::Date::parse_str", reason = "Please use `parse_str_rfc3339` instead." },
+    { path = "speedate::Date::parse_bytes", reason = "Please use `parse_bytes_rfc3339` instead." },
 ]
 disallowed-types = [
     { path = "num_traits::AsPrimitive", reason = "Please use `From` or `TryFrom` with `OrderedFloat` instead." },
diff --git a/e2e_test/batch/functions/array_concat.slt.part b/e2e_test/batch/functions/array_concat.slt.part
index b2cee32de28f5..7c853262c587d 100644
--- a/e2e_test/batch/functions/array_concat.slt.part
+++ b/e2e_test/batch/functions/array_concat.slt.part
@@ -664,17 +664,17 @@ select array_prepend(1::real, array[1]::real[]);
 {1,1}
 
 query T
-select array['2020-01-02 12:34:56 -11:00'::timestamp with time zone::varchar]::timestamp[] || '2020-01-01 12:34:56'::timestamp::date;
+select array['2020-01-02 12:34:56 -11:00'::timestamp with time zone]::timestamp[] || '2020-01-01 12:34:56'::timestamp::date;
 ----
 {"2020-01-02 23:34:56","2020-01-01 00:00:00"}
 
 query T
-select array_append(array['2020-01-02 12:34:56 -11:00'::timestamp with time zone::varchar]::timestamp[], '2020-01-01 12:34:56'::timestamp::date);
+select array_append(array['2020-01-02 12:34:56 -11:00'::timestamp with time zone]::timestamp[], '2020-01-01 12:34:56'::timestamp::date);
 ----
 {"2020-01-02 23:34:56","2020-01-01 00:00:00"}
 
 query T
-select array_prepend('2020-01-01 12:34:56'::timestamp::date, array['2020-01-02 12:34:56 -11:00'::timestamp with time zone::varchar]::timestamp[]);
+select array_prepend('2020-01-01 12:34:56'::timestamp::date, array['2020-01-02 12:34:56 -11:00'::timestamp with time zone]::timestamp[]);
 ----
 {"2020-01-01 00:00:00","2020-01-02 23:34:56"}
 
diff --git a/e2e_test/batch/types/timestamptz_utc.slt.part b/e2e_test/batch/types/timestamptz_utc.slt.part
index a75529b30b1cb..5a0fe6ebd13cc 100644
--- a/e2e_test/batch/types/timestamptz_utc.slt.part
+++ b/e2e_test/batch/types/timestamptz_utc.slt.part
@@ -42,6 +42,19 @@ select '2022-10-01T12:00:00Z'::timestamp with time zone;
 ----
 2022-10-01 12:00:00+00:00
 
+query T
+select '2023-11-05 01:40-07:00'::timestamptz;
+----
+2023-11-05 08:40:00+00:00
+
+query T
+select '2023-11-05 01:40-08:00'::timestamptz;
+----
+2023-11-05 09:40:00+00:00
+
+statement error
+select '0'::timestamptz;
+
 query T
 select '2022-10-01 12:00:00+01:00'::timestamp with time zone BETWEEN '2022-10-01T10:59:59Z' AND '2022-10-01T11:00:01Z';
 ----
diff --git a/e2e_test/source/cdc/cdc.check.slt b/e2e_test/source/cdc/cdc.check.slt
index 876d41d9adcbd..f9feed9429d2c 100644
--- a/e2e_test/source/cdc/cdc.check.slt
+++ b/e2e_test/source/cdc/cdc.check.slt
@@ -46,3 +46,8 @@ query I
 select count(*) from person_rw;
 ----
 3
+
+query I
+select count(*) from tt3_rw;
+----
+2
diff --git a/e2e_test/source/cdc/cdc.load.slt b/e2e_test/source/cdc/cdc.load.slt
index 39e99b39df4fb..4cfacf120648e 100644
--- a/e2e_test/source/cdc/cdc.load.slt
+++ b/e2e_test/source/cdc/cdc.load.slt
@@ -110,6 +110,38 @@ create table orders_2 (
  server.id = '5088'
 );
 
+statement error
+create table tt3_rw (
+ v1 int,
+ v2 timestamp,
+ PRIMARY KEY (v1)
+) with (
+ connector = 'mysql-cdc',
+ hostname = 'mysql',
+ port = '3306',
+ username = 'root',
+ password = '123456',
+ database.name = 'my@db',
+ table.name = 'tt3',
+ server.id = '5089'
+);
+
+statement ok
+create table tt3_rw (
+ v1 int,
+ v2 timestamptz,
+ PRIMARY KEY (v1)
+) with (
+ connector = 'mysql-cdc',
+ hostname = 'mysql',
+ port = '3306',
+ username = 'root',
+ password = '123456',
+ database.name = 'my@db',
+ table.name = 'tt3',
+ server.id = '5089'
+);
+
 # Some columns missing and reordered (postgres-cdc)
 statement ok
 create table shipments_2 (
diff --git a/e2e_test/source/cdc/mysql_cdc.sql b/e2e_test/source/cdc/mysql_cdc.sql
index 1e8ca93ddf864..89e5274ac3ee1 100644
--- a/e2e_test/source/cdc/mysql_cdc.sql
+++ b/e2e_test/source/cdc/mysql_cdc.sql
@@ -51,3 +51,7 @@ VALUES (1,1,'no'),
 CREATE USER 'dbz'@'%' IDENTIFIED BY '123456';
 GRANT SELECT, RELOAD, SHOW DATABASES, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'dbz'@'%';
+
+CREATE TABLE tt3 (v1 int primary key, v2 timestamp);
+INSERT INTO tt3 VALUES (1, '2020-07-30 10:08:22');
+INSERT INTO tt3 VALUES (2, '2020-07-31 10:09:22');
diff --git a/java/connector-node/risingwave-connector-service/src/main/java/com/risingwave/connector/source/common/MySqlValidator.java b/java/connector-node/risingwave-connector-service/src/main/java/com/risingwave/connector/source/common/MySqlValidator.java
index 3155e1848446a..54094bc21862d 100644
--- a/java/connector-node/risingwave-connector-service/src/main/java/com/risingwave/connector/source/common/MySqlValidator.java
+++ b/java/connector-node/risingwave-connector-service/src/main/java/com/risingwave/connector/source/common/MySqlValidator.java
@@ -230,6 +230,8 @@ private boolean isDataTypeCompatible(String mysqlDataType, Data.DataType.TypeNam
                 return val == Data.DataType.TypeName.DECIMAL_VALUE;
             case "varchar":
                 return val == Data.DataType.TypeName.VARCHAR_VALUE;
+            case "timestamp":
+                return val == Data.DataType.TypeName.TIMESTAMPTZ_VALUE;
             default:
                 return true; // true for other uncovered types
         }
diff --git a/src/common/src/cast/mod.rs b/src/common/src/cast/mod.rs
index fdf3c7e598da2..82c69984ec0ea 100644
--- a/src/common/src/cast/mod.rs
+++ b/src/common/src/cast/mod.rs
@@ -43,7 +43,7 @@ pub fn str_to_timestamp(elem: &str) -> Result<Timestamp> {
 
 #[inline]
 pub fn parse_naive_date(s: &str) -> Result<NaiveDate> {
-    let res = SpeedDate::parse_str(s).map_err(|_| PARSE_ERROR_STR_TO_DATE.to_string())?;
+    let res = SpeedDate::parse_str_rfc3339(s).map_err(|_| PARSE_ERROR_STR_TO_DATE.to_string())?;
     Ok(Date::from_ymd_uncheck(res.year as i32, res.month as u32, res.day as u32).0)
 }
 
@@ -63,7 +63,10 @@ pub fn parse_naive_time(s: &str) -> Result<NaiveTime> {
 
 #[inline]
 pub fn parse_naive_datetime(s: &str) -> Result<NaiveDateTime> {
-    if let Ok(res) = SpeedDateTime::parse_str(s) {
+    if let Ok(res) = SpeedDateTime::parse_str_rfc3339(s) {
+        if res.time.tz_offset.is_some() {
+            return Err(PARSE_ERROR_STR_TO_TIMESTAMP.into());
+        }
         Ok(Date::from_ymd_uncheck(
             res.date.year as i32,
             res.date.month as u32,
         )
         .0)
     } else {
-        let res = SpeedDate::parse_str(s).map_err(|_| PARSE_ERROR_STR_TO_TIMESTAMP.to_string())?;
+        let res = SpeedDate::parse_str_rfc3339(s)
+            .map_err(|_| PARSE_ERROR_STR_TO_TIMESTAMP.to_string())?;
         Ok(
             Date::from_ymd_uncheck(res.year as i32, res.month as u32, res.day as u32)
                 .and_hms_micro_uncheck(0, 0, 0, 0)
                 .0,
         )
     }
@@ -238,7 +242,7 @@ mod tests {
         str_to_timestamp("1999-01-08 04:02").unwrap();
         str_to_timestamp("1999-01-08 04:05:06").unwrap();
         assert_eq!(
-            str_to_timestamp("2022-08-03T10:34:02Z").unwrap(),
+            str_to_timestamp("2022-08-03T10:34:02").unwrap(),
             str_to_timestamp("2022-08-03 10:34:02").unwrap()
         );
         str_to_date("1999-01-08").unwrap();
diff --git a/src/common/src/types/timestamptz.rs b/src/common/src/types/timestamptz.rs
index 0d9af9a5e3d3f..1f9b962c9d376 100644
--- a/src/common/src/types/timestamptz.rs
+++ b/src/common/src/types/timestamptz.rs
@@ -16,7 +16,7 @@ use std::io::Write;
 use std::str::FromStr;
 
 use bytes::{Bytes, BytesMut};
-use chrono::{DateTime, TimeZone, Utc};
+use chrono::{TimeZone, Utc};
 use chrono_tz::Tz;
 use postgres_types::ToSql;
 use serde::{Deserialize, Serialize};
@@ -148,8 +148,32 @@ impl FromStr for Timestamptz {
             "Can't cast string to timestamp with time zone (expected format is YYYY-MM-DD HH:MM:SS[.D+{up to 6 digits}] followed by +hh:mm or literal Z)"
             , "\nFor example: '2021-04-01 00:00:00+00:00'"
         );
-        let ret = s.parse::<DateTime<Utc>>().map_err(|_| ERROR_MSG)?;
-        Ok(Timestamptz(ret.timestamp_micros()))
+        // Try `speedate` first
+        // * It is also used by `str_to_{date,time,timestamp}`
+        // * It can parse without seconds `2006-01-02 15:04-07:00`
+        let ret = match speedate::DateTime::parse_str_rfc3339(s) {
+            Ok(r) => r,
+            Err(_) => {
+                // Supplement with `chrono` for existing cases:
+                // * Extra space before offset `2006-01-02 15:04:05 -07:00`
+                return s
+                    .parse::<chrono::DateTime<Utc>>()
+                    .map(|t| Timestamptz(t.timestamp_micros()))
+                    .map_err(|_| ERROR_MSG);
+            }
+        };
+        if ret.time.tz_offset.is_none() {
+            return Err(ERROR_MSG);
+        }
+        if ret.date.year < 1600 {
+            return Err("parsing timestamptz with year < 1600 unsupported");
+        }
+        Ok(Timestamptz(
+            ret.timestamp_tz()
+                .checked_mul(1000000)
+                .and_then(|us| us.checked_add(ret.time.microsecond.into()))
+                .ok_or(ERROR_MSG)?,
+        ))
     }
 }
diff --git a/src/connector/src/parser/debezium/simd_json_parser.rs b/src/connector/src/parser/debezium/simd_json_parser.rs
index 42c3e82c65e35..0bfd69a7bb6fe 100644
--- a/src/connector/src/parser/debezium/simd_json_parser.rs
+++ b/src/connector/src/parser/debezium/simd_json_parser.rs
@@ -298,7 +298,7 @@ mod tests {
             SourceColumnDesc::simple("O_DATE", DataType::Date, ColumnId::from(8)),
             SourceColumnDesc::simple("O_TIME", DataType::Time, ColumnId::from(9)),
             SourceColumnDesc::simple("O_DATETIME", DataType::Timestamp, ColumnId::from(10)),
-            SourceColumnDesc::simple("O_TIMESTAMP", DataType::Timestamp, ColumnId::from(11)),
+            SourceColumnDesc::simple("O_TIMESTAMP", DataType::Timestamptz, ColumnId::from(11)),
             SourceColumnDesc::simple("O_JSON", DataType::Jsonb, ColumnId::from(12)),
         ]
     }
@@ -333,9 +333,9 @@ mod tests {
         assert!(row[10].eq(&Some(ScalarImpl::Timestamp(Timestamp::new(
             "1970-01-01T00:00:00".parse().unwrap()
         )))));
-        assert!(row[11].eq(&Some(ScalarImpl::Timestamp(Timestamp::new(
-            "1970-01-01T00:00:01".parse().unwrap()
-        )))));
+        assert!(row[11].eq(&Some(ScalarImpl::Timestamptz(
+            "1970-01-01T00:00:01Z".parse().unwrap()
+        ))));
         assert_json_eq(&row[12], "{\"k1\": \"v1\", \"k2\": 11}");
     }
 
@@ -368,9 +368,9 @@ mod tests {
         assert!(row[10].eq(&Some(ScalarImpl::Timestamp(Timestamp::new(
             "1970-01-01T00:00:00".parse().unwrap()
         )))));
-        assert!(row[11].eq(&Some(ScalarImpl::Timestamp(Timestamp::new(
-            "1970-01-01T00:00:01".parse().unwrap()
-        )))));
+        assert!(row[11].eq(&Some(ScalarImpl::Timestamptz(
+            "1970-01-01T00:00:01Z".parse().unwrap()
+        ))));
         assert_json_eq(&row[12], "{\"k1\": \"v1\", \"k2\": 11}");
     }
 
@@ -404,9 +404,9 @@ mod tests {
         assert!(row[10].eq(&Some(ScalarImpl::Timestamp(Timestamp::new(
             "5138-11-16T09:46:39".parse().unwrap()
         )))));
-        assert!(row[11].eq(&Some(ScalarImpl::Timestamp(Timestamp::new(
-            "2038-01-09T03:14:07".parse().unwrap()
-        )))));
+        assert!(row[11].eq(&Some(ScalarImpl::Timestamptz(
+            "2038-01-09T03:14:07Z".parse().unwrap()
+        ))));
         assert_json_eq(&row[12], "{\"k1\":\"v1_updated\",\"k2\":33}");
     }
 
@@ -441,9 +441,9 @@ mod tests {
         assert!(row[10].eq(&Some(ScalarImpl::Timestamp(Timestamp::new(
             "5138-11-16T09:46:39".parse().unwrap()
         )))));
-        assert!(row[11].eq(&Some(ScalarImpl::Timestamp(Timestamp::new(
-            "2038-01-09T03:14:07".parse().unwrap()
-        )))));
+        assert!(row[11].eq(&Some(ScalarImpl::Timestamptz(
+            "2038-01-09T03:14:07Z".parse().unwrap()
+        ))));
         assert_json_eq(&row[12], "{\"k1\": \"v1_updated\", \"k2\": 33}");
     }
 
diff --git a/src/expr/src/vector_op/timestamptz.rs b/src/expr/src/vector_op/timestamptz.rs
index ca24200300244..716a521f742e4 100644
--- a/src/expr/src/vector_op/timestamptz.rs
+++ b/src/expr/src/vector_op/timestamptz.rs
@@ -276,17 +276,13 @@ mod tests {
 
     #[test]
     fn test_timestamptz_to_and_from_string() {
-        let str1 = "0001-11-15 15:35:40.999999+08:00";
+        let str1 = "1600-11-15 15:35:40.999999+08:00";
         let timestamptz1 = str_to_timestamptz(str1, "UTC").unwrap();
-        assert_eq!(timestamptz1.timestamp_micros(), -62108094259000001);
+        assert_eq!(timestamptz1.timestamp_micros(), -11648507059000001);
 
         let mut writer = String::new();
         timestamptz_to_string(timestamptz1, "UTC", &mut writer).unwrap();
-        assert_eq!(writer, "0001-11-15 07:35:40.999999+00:00");
-
-        let mut writer = String::new();
-        timestamptz_to_string(timestamptz1, "UTC", &mut writer).unwrap();
-        assert_eq!(writer, "0001-11-15 07:35:40.999999+00:00");
+        assert_eq!(writer, "1600-11-15 07:35:40.999999+00:00");
 
         let str2 = "1969-12-31 23:59:59.999999+00:00";
         let timestamptz2 = str_to_timestamptz(str2, "UTC").unwrap();
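
Note on the parsing strategy in this patch: `Timestamptz::from_str` now tries `speedate::DateTime::parse_str_rfc3339` first, which accepts a time without seconds as long as an explicit offset is present, and only falls back to `chrono` for the previously accepted spelling with a space before the offset. The following is a standalone sketch of that order for illustration only; the helper name `parse_timestamptz_micros` and its error message are hypothetical, and it assumes the same `speedate` and `chrono` APIs the diff itself relies on (`parse_str_rfc3339`, `time.tz_offset`, `timestamp_tz`, `DateTime::<Utc>::from_str`, `timestamp_micros`).

// Sketch only, not RisingWave code: parse a timestamptz string into epoch microseconds.
fn parse_timestamptz_micros(s: &str) -> Result<i64, &'static str> {
    const ERROR_MSG: &str = "invalid timestamptz input";
    match speedate::DateTime::parse_str_rfc3339(s) {
        Ok(dt) => {
            // A timestamptz must carry an explicit UTC offset.
            if dt.time.tz_offset.is_none() {
                return Err(ERROR_MSG);
            }
            // `timestamp_tz()` is whole seconds since the epoch adjusted to UTC;
            // scale to microseconds and add the sub-second part, guarding overflow.
            dt.timestamp_tz()
                .checked_mul(1_000_000)
                .and_then(|us| us.checked_add(dt.time.microsecond.into()))
                .ok_or(ERROR_MSG)
        }
        // Fallback keeps previously accepted spellings (space before the offset) working.
        Err(_) => s
            .parse::<chrono::DateTime<chrono::Utc>>()
            .map(|t| t.timestamp_micros())
            .map_err(|_| ERROR_MSG),
    }
}

fn main() {
    // Without seconds but with an offset: rejected before this patch, accepted now.
    assert_eq!(
        parse_timestamptz_micros("2023-11-05 01:40-07:00").unwrap(),
        1_699_173_600_000_000 // 2023-11-05 08:40:00 UTC
    );
    // Extra space before the offset still goes through the chrono fallback.
    assert!(parse_timestamptz_micros("2006-01-02 15:04:05 -07:00").is_ok());
    // A bare number is no longer accepted as a timestamptz.
    assert!(parse_timestamptz_micros("0").is_err());
}

Building the result from whole seconds plus the separate microsecond field mirrors the patch: it avoids relying on a fractional-seconds accessor, and `checked_mul`/`checked_add` keep far-past or far-future inputs from silently overflowing the i64 microsecond representation.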