Skip to content

Commit

Permalink
fix(expr): parse timestamptz without seconds but with offset (#12084)
Browse files Browse the repository at this point in the history
Co-authored-by: StrikeW <[email protected]>
  • Loading branch information
xiangjinwu and StrikeW authored Sep 7, 2023
1 parent 01ce1bb commit f16809d
Show file tree
Hide file tree
Showing 11 changed files with 115 additions and 30 deletions.
5 changes: 5 additions & 0 deletions clippy.toml
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,11 @@ disallowed-methods = [
{ path = "num_traits::sign::Signed::is_positive", reason = "This returns true for 0.0 but false for 0." },
{ path = "num_traits::sign::Signed::is_negative", reason = "This returns true for -0.0 but false for 0." },
{ path = "num_traits::sign::Signed::signum", reason = "This returns 1.0 for 0.0 but 0 for 0." },
{ path = "speedate::DateTime::parse_str", reason = "Please use `parse_str_rfc3339` instead." },
{ path = "speedate::DateTime::parse_bytes", reason = "Please use `parse_bytes_rfc3339` instead." },
{ path = "speedate::DateTime::parse_bytes_with_config", reason = "Please use `parse_bytes_rfc3339_with_config` instead." },
{ path = "speedate::Date::parse_str", reason = "Please use `parse_str_rfc3339` instead." },
{ path = "speedate::Date::parse_bytes", reason = "Please use `parse_bytes_rfc3339` instead." },
]
disallowed-types = [
{ path = "num_traits::AsPrimitive", reason = "Please use `From` or `TryFrom` with `OrderedFloat` instead." },
Expand Down
6 changes: 3 additions & 3 deletions e2e_test/batch/functions/array_concat.slt.part
Original file line number Diff line number Diff line change
Expand Up @@ -664,17 +664,17 @@ select array_prepend(1::real, array[1]::real[]);
{1,1}

query T
select array['2020-01-02 12:34:56 -11:00'::timestamp with time zone::varchar]::timestamp[] || '2020-01-01 12:34:56'::timestamp::date;
select array['2020-01-02 12:34:56 -11:00'::timestamp with time zone]::timestamp[] || '2020-01-01 12:34:56'::timestamp::date;
----
{"2020-01-02 23:34:56","2020-01-01 00:00:00"}

query T
select array_append(array['2020-01-02 12:34:56 -11:00'::timestamp with time zone::varchar]::timestamp[], '2020-01-01 12:34:56'::timestamp::date);
select array_append(array['2020-01-02 12:34:56 -11:00'::timestamp with time zone]::timestamp[], '2020-01-01 12:34:56'::timestamp::date);
----
{"2020-01-02 23:34:56","2020-01-01 00:00:00"}

query T
select array_prepend('2020-01-01 12:34:56'::timestamp::date, array['2020-01-02 12:34:56 -11:00'::timestamp with time zone::varchar]::timestamp[]);
select array_prepend('2020-01-01 12:34:56'::timestamp::date, array['2020-01-02 12:34:56 -11:00'::timestamp with time zone]::timestamp[]);
----
{"2020-01-01 00:00:00","2020-01-02 23:34:56"}

Expand Down
13 changes: 13 additions & 0 deletions e2e_test/batch/types/timestamptz_utc.slt.part
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,19 @@ select '2022-10-01T12:00:00Z'::timestamp with time zone;
----
2022-10-01 12:00:00+00:00

query T
select '2023-11-05 01:40-07:00'::timestamptz;
----
2023-11-05 08:40:00+00:00

query T
select '2023-11-05 01:40-08:00'::timestamptz;
----
2023-11-05 09:40:00+00:00

statement error
select '0'::timestamptz;

query T
select '2022-10-01 12:00:00+01:00'::timestamp with time zone BETWEEN '2022-10-01T10:59:59Z' AND '2022-10-01T11:00:01Z';
----
Expand Down
5 changes: 5 additions & 0 deletions e2e_test/source/cdc/cdc.check.slt
Original file line number Diff line number Diff line change
Expand Up @@ -46,3 +46,8 @@ query I
select count(*) from person_rw;
----
3

query I
select count(*) from tt3_rw;
----
2
32 changes: 32 additions & 0 deletions e2e_test/source/cdc/cdc.load.slt
Original file line number Diff line number Diff line change
Expand Up @@ -110,6 +110,38 @@ create table orders_2 (
server.id = '5088'
);

statement error
create table tt3_rw (
v1 int,
v2 timestamp,
PRIMARY KEY (v1)
) with (
connector = 'mysql-cdc',
hostname = 'mysql',
port = '3306',
username = 'root',
password = '123456',
database.name = 'my@db',
table.name = 'tt3',
server.id = '5089'
);

statement ok
create table tt3_rw (
v1 int,
v2 timestamptz,
PRIMARY KEY (v1)
) with (
connector = 'mysql-cdc',
hostname = 'mysql',
port = '3306',
username = 'root',
password = '123456',
database.name = 'my@db',
table.name = 'tt3',
server.id = '5089'
);

# Some columns missing and reordered (postgres-cdc)
statement ok
create table shipments_2 (
Expand Down
4 changes: 4 additions & 0 deletions e2e_test/source/cdc/mysql_cdc.sql
Original file line number Diff line number Diff line change
Expand Up @@ -51,3 +51,7 @@ VALUES (1,1,'no'),

CREATE USER 'dbz'@'%' IDENTIFIED BY '123456';
GRANT SELECT, RELOAD, SHOW DATABASES, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'dbz'@'%';

CREATE TABLE tt3 (v1 int primary key, v2 timestamp);
INSERT INTO tt3 VALUES (1, '2020-07-30 10:08:22');
INSERT INTO tt3 VALUES (2, '2020-07-31 10:09:22');
Original file line number Diff line number Diff line change
Expand Up @@ -230,6 +230,8 @@ private boolean isDataTypeCompatible(String mysqlDataType, Data.DataType.TypeNam
return val == Data.DataType.TypeName.DECIMAL_VALUE;
case "varchar":
return val == Data.DataType.TypeName.VARCHAR_VALUE;
case "timestamp":
return val == Data.DataType.TypeName.TIMESTAMPTZ_VALUE;
default:
return true; // true for other uncovered types
}
Expand Down
12 changes: 8 additions & 4 deletions src/common/src/cast/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ pub fn str_to_timestamp(elem: &str) -> Result<Timestamp> {

#[inline]
pub fn parse_naive_date(s: &str) -> Result<NaiveDate> {
let res = SpeedDate::parse_str(s).map_err(|_| PARSE_ERROR_STR_TO_DATE.to_string())?;
let res = SpeedDate::parse_str_rfc3339(s).map_err(|_| PARSE_ERROR_STR_TO_DATE.to_string())?;
Ok(Date::from_ymd_uncheck(res.year as i32, res.month as u32, res.day as u32).0)
}

Expand All @@ -63,7 +63,10 @@ pub fn parse_naive_time(s: &str) -> Result<NaiveTime> {

#[inline]
pub fn parse_naive_datetime(s: &str) -> Result<NaiveDateTime> {
if let Ok(res) = SpeedDateTime::parse_str(s) {
if let Ok(res) = SpeedDateTime::parse_str_rfc3339(s) {
if res.time.tz_offset.is_some() {
return Err(PARSE_ERROR_STR_TO_TIMESTAMP.into());
}
Ok(Date::from_ymd_uncheck(
res.date.year as i32,
res.date.month as u32,
Expand All @@ -77,7 +80,8 @@ pub fn parse_naive_datetime(s: &str) -> Result<NaiveDateTime> {
)
.0)
} else {
let res = SpeedDate::parse_str(s).map_err(|_| PARSE_ERROR_STR_TO_TIMESTAMP.to_string())?;
let res = SpeedDate::parse_str_rfc3339(s)
.map_err(|_| PARSE_ERROR_STR_TO_TIMESTAMP.to_string())?;
Ok(
Date::from_ymd_uncheck(res.year as i32, res.month as u32, res.day as u32)
.and_hms_micro_uncheck(0, 0, 0, 0)
Expand Down Expand Up @@ -238,7 +242,7 @@ mod tests {
str_to_timestamp("1999-01-08 04:02").unwrap();
str_to_timestamp("1999-01-08 04:05:06").unwrap();
assert_eq!(
str_to_timestamp("2022-08-03T10:34:02Z").unwrap(),
str_to_timestamp("2022-08-03T10:34:02").unwrap(),
str_to_timestamp("2022-08-03 10:34:02").unwrap()
);
str_to_date("1999-01-08").unwrap();
Expand Down
30 changes: 27 additions & 3 deletions src/common/src/types/timestamptz.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ use std::io::Write;
use std::str::FromStr;

use bytes::{Bytes, BytesMut};
use chrono::{DateTime, TimeZone, Utc};
use chrono::{TimeZone, Utc};
use chrono_tz::Tz;
use postgres_types::ToSql;
use serde::{Deserialize, Serialize};
Expand Down Expand Up @@ -148,8 +148,32 @@ impl FromStr for Timestamptz {
"Can't cast string to timestamp with time zone (expected format is YYYY-MM-DD HH:MM:SS[.D+{up to 6 digits}] followed by +hh:mm or literal Z)"
, "\nFor example: '2021-04-01 00:00:00+00:00'"
);
let ret = s.parse::<DateTime<Utc>>().map_err(|_| ERROR_MSG)?;
Ok(Timestamptz(ret.timestamp_micros()))
// Try `speedate` first
// * It is also used by `str_to_{date,time,timestamp}`
// * It can parse without seconds `2006-01-02 15:04-07:00`
let ret = match speedate::DateTime::parse_str_rfc3339(s) {
Ok(r) => r,
Err(_) => {
// Supplement with `chrono` for existing cases:
// * Extra space before offset `2006-01-02 15:04:05 -07:00`
return s
.parse::<chrono::DateTime<Utc>>()
.map(|t| Timestamptz(t.timestamp_micros()))
.map_err(|_| ERROR_MSG);
}
};
if ret.time.tz_offset.is_none() {
return Err(ERROR_MSG);
}
if ret.date.year < 1600 {
return Err("parsing timestamptz with year < 1600 unsupported");
}
Ok(Timestamptz(
ret.timestamp_tz()
.checked_mul(1000000)
.and_then(|us| us.checked_add(ret.time.microsecond.into()))
.ok_or(ERROR_MSG)?,
))
}
}

Expand Down
26 changes: 13 additions & 13 deletions src/connector/src/parser/debezium/simd_json_parser.rs
Original file line number Diff line number Diff line change
Expand Up @@ -298,7 +298,7 @@ mod tests {
SourceColumnDesc::simple("O_DATE", DataType::Date, ColumnId::from(8)),
SourceColumnDesc::simple("O_TIME", DataType::Time, ColumnId::from(9)),
SourceColumnDesc::simple("O_DATETIME", DataType::Timestamp, ColumnId::from(10)),
SourceColumnDesc::simple("O_TIMESTAMP", DataType::Timestamp, ColumnId::from(11)),
SourceColumnDesc::simple("O_TIMESTAMP", DataType::Timestamptz, ColumnId::from(11)),
SourceColumnDesc::simple("O_JSON", DataType::Jsonb, ColumnId::from(12)),
]
}
Expand Down Expand Up @@ -333,9 +333,9 @@ mod tests {
assert!(row[10].eq(&Some(ScalarImpl::Timestamp(Timestamp::new(
"1970-01-01T00:00:00".parse().unwrap()
)))));
assert!(row[11].eq(&Some(ScalarImpl::Timestamp(Timestamp::new(
"1970-01-01T00:00:01".parse().unwrap()
)))));
assert!(row[11].eq(&Some(ScalarImpl::Timestamptz(
"1970-01-01T00:00:01Z".parse().unwrap()
))));
assert_json_eq(&row[12], "{\"k1\": \"v1\", \"k2\": 11}");
}

Expand Down Expand Up @@ -368,9 +368,9 @@ mod tests {
assert!(row[10].eq(&Some(ScalarImpl::Timestamp(Timestamp::new(
"1970-01-01T00:00:00".parse().unwrap()
)))));
assert!(row[11].eq(&Some(ScalarImpl::Timestamp(Timestamp::new(
"1970-01-01T00:00:01".parse().unwrap()
)))));
assert!(row[11].eq(&Some(ScalarImpl::Timestamptz(
"1970-01-01T00:00:01Z".parse().unwrap()
))));
assert_json_eq(&row[12], "{\"k1\": \"v1\", \"k2\": 11}");
}

Expand Down Expand Up @@ -404,9 +404,9 @@ mod tests {
assert!(row[10].eq(&Some(ScalarImpl::Timestamp(Timestamp::new(
"5138-11-16T09:46:39".parse().unwrap()
)))));
assert!(row[11].eq(&Some(ScalarImpl::Timestamp(Timestamp::new(
"2038-01-09T03:14:07".parse().unwrap()
)))));
assert!(row[11].eq(&Some(ScalarImpl::Timestamptz(
"2038-01-09T03:14:07Z".parse().unwrap()
))));
assert_json_eq(&row[12], "{\"k1\":\"v1_updated\",\"k2\":33}");
}

Expand Down Expand Up @@ -441,9 +441,9 @@ mod tests {
assert!(row[10].eq(&Some(ScalarImpl::Timestamp(Timestamp::new(
"5138-11-16T09:46:39".parse().unwrap()
)))));
assert!(row[11].eq(&Some(ScalarImpl::Timestamp(Timestamp::new(
"2038-01-09T03:14:07".parse().unwrap()
)))));
assert!(row[11].eq(&Some(ScalarImpl::Timestamptz(
"2038-01-09T03:14:07Z".parse().unwrap()
))));
assert_json_eq(&row[12], "{\"k1\": \"v1_updated\", \"k2\": 33}");
}

Expand Down
10 changes: 3 additions & 7 deletions src/expr/src/vector_op/timestamptz.rs
Original file line number Diff line number Diff line change
Expand Up @@ -276,17 +276,13 @@ mod tests {

#[test]
fn test_timestamptz_to_and_from_string() {
let str1 = "0001-11-15 15:35:40.999999+08:00";
let str1 = "1600-11-15 15:35:40.999999+08:00";
let timestamptz1 = str_to_timestamptz(str1, "UTC").unwrap();
assert_eq!(timestamptz1.timestamp_micros(), -62108094259000001);
assert_eq!(timestamptz1.timestamp_micros(), -11648507059000001);

let mut writer = String::new();
timestamptz_to_string(timestamptz1, "UTC", &mut writer).unwrap();
assert_eq!(writer, "0001-11-15 07:35:40.999999+00:00");

let mut writer = String::new();
timestamptz_to_string(timestamptz1, "UTC", &mut writer).unwrap();
assert_eq!(writer, "0001-11-15 07:35:40.999999+00:00");
assert_eq!(writer, "1600-11-15 07:35:40.999999+00:00");

let str2 = "1969-12-31 23:59:59.999999+00:00";
let timestamptz2 = str_to_timestamptz(str2, "UTC").unwrap();
Expand Down

0 comments on commit f16809d

Please sign in to comment.