diff --git a/src/common/src/array/arrow/arrow_impl.rs b/src/common/src/array/arrow/arrow_impl.rs index 773efdd088e3d..10ea6b6e6ec4d 100644 --- a/src/common/src/array/arrow/arrow_impl.rs +++ b/src/common/src/array/arrow/arrow_impl.rs @@ -43,7 +43,7 @@ use std::fmt::Write; use arrow_buffer::OffsetBuffer; -use chrono::{NaiveDateTime, NaiveTime}; +use chrono::{DateTime, NaiveDateTime, NaiveTime}; use itertools::Itertools; // This is important because we want to use the arrow version specified by the outer mod. @@ -848,11 +848,9 @@ impl FromIntoArrow for Timestamp { fn from_arrow(value: Self::ArrowType) -> Self { Timestamp( - NaiveDateTime::from_timestamp_opt( - (value / 1_000_000) as _, - (value % 1_000_000 * 1000) as _, - ) - .unwrap(), + DateTime::from_timestamp((value / 1_000_000) as _, (value % 1_000_000 * 1000) as _) + .unwrap() + .naive_utc(), ) } diff --git a/src/common/src/array/arrow/arrow_impl_52.rs b/src/common/src/array/arrow/arrow_impl_52.rs index 6d57e475b14f2..70de4104c483c 100644 --- a/src/common/src/array/arrow/arrow_impl_52.rs +++ b/src/common/src/array/arrow/arrow_impl_52.rs @@ -44,7 +44,7 @@ use std::fmt::Write; use arrow_buffer::OffsetBuffer; use arrow_buffer_iceberg::IntervalMonthDayNano; -use chrono::{NaiveDateTime, NaiveTime}; +use chrono::{DateTime, NaiveDateTime, NaiveTime}; use itertools::Itertools; // This is important because we want to use the arrow version specified by the outer mod. @@ -849,11 +849,9 @@ impl FromIntoArrow for Timestamp { fn from_arrow(value: Self::ArrowType) -> Self { Timestamp( - NaiveDateTime::from_timestamp_opt( - (value / 1_000_000) as _, - (value % 1_000_000 * 1000) as _, - ) - .unwrap(), + DateTime::from_timestamp((value / 1_000_000) as _, (value % 1_000_000 * 1000) as _) + .unwrap() + .naive_utc(), ) } diff --git a/src/common/src/hash/key.rs b/src/common/src/hash/key.rs index c7e57173a3e74..e9f7e83ac9146 100644 --- a/src/common/src/hash/key.rs +++ b/src/common/src/hash/key.rs @@ -567,8 +567,8 @@ impl HashKeyDe for Date { impl HashKeySer<'_> for Timestamp { fn serialize_into(self, mut buf: impl BufMut) { - buf.put_i64_ne(self.0.timestamp()); - buf.put_u32_ne(self.0.timestamp_subsec_nanos()); + buf.put_i64_ne(self.0.and_utc().timestamp()); + buf.put_u32_ne(self.0.and_utc().timestamp_subsec_nanos()); } fn exact_size() -> Option { diff --git a/src/common/src/types/datetime.rs b/src/common/src/types/datetime.rs index 7058d36ec6fd5..b064ca1107972 100644 --- a/src/common/src/types/datetime.rs +++ b/src/common/src/types/datetime.rs @@ -21,7 +21,9 @@ use std::io::Write; use std::str::FromStr; use bytes::{Bytes, BytesMut}; -use chrono::{Datelike, Days, Duration, NaiveDate, NaiveDateTime, NaiveTime, Timelike, Weekday}; +use chrono::{ + DateTime, Datelike, Days, Duration, NaiveDate, NaiveDateTime, NaiveTime, Timelike, Weekday, +}; use postgres_types::{accepts, to_sql_checked, FromSql, IsNull, ToSql, Type}; use risingwave_common_estimate_size::ZeroHeapSize; use thiserror::Error; @@ -570,20 +572,21 @@ impl Time { impl Timestamp { pub fn with_secs_nsecs(secs: i64, nsecs: u32) -> Result { Ok(Timestamp::new({ - NaiveDateTime::from_timestamp_opt(secs, nsecs) + DateTime::from_timestamp(secs, nsecs) .ok_or_else(|| InvalidParamsError::datetime(secs, nsecs))? + .naive_utc() })) } /// Although `Timestamp` takes 12 bytes, we drop 4 bytes in protobuf encoding. pub fn to_protobuf(self, output: &mut T) -> ArrayResult { output - .write(&(self.0.timestamp_micros()).to_be_bytes()) + .write(&(self.0.and_utc().timestamp_micros()).to_be_bytes()) .map_err(Into::into) } pub fn get_timestamp_nanos(&self) -> i64 { - self.0.timestamp_nanos_opt().unwrap() + self.0.and_utc().timestamp_nanos_opt().unwrap() } pub fn with_millis(timestamp_millis: i64) -> Result { @@ -599,7 +602,7 @@ impl Timestamp { } pub fn from_timestamp_uncheck(secs: i64, nsecs: u32) -> Self { - Self::new(NaiveDateTime::from_timestamp_opt(secs, nsecs).unwrap()) + Self::new(DateTime::from_timestamp(secs, nsecs).unwrap().naive_utc()) } /// Truncate the timestamp to the precision of microseconds. @@ -879,10 +882,10 @@ mod tests { Timestamp::from_str("2022-08-03 10:34:02").unwrap() ); let ts = Timestamp::from_str("0001-11-15 07:35:40.999999").unwrap(); - assert_eq!(ts.0.timestamp_micros(), -62108094259000001); + assert_eq!(ts.0.and_utc().timestamp_micros(), -62108094259000001); let ts = Timestamp::from_str("1969-12-31 23:59:59.999999").unwrap(); - assert_eq!(ts.0.timestamp_micros(), -1); + assert_eq!(ts.0.and_utc().timestamp_micros(), -1); // invalid datetime Date::from_str("1999-01-08AA").unwrap_err(); diff --git a/src/common/src/types/mod.rs b/src/common/src/types/mod.rs index d793561f5957c..69e727cbde655 100644 --- a/src/common/src/types/mod.rs +++ b/src/common/src/types/mod.rs @@ -1001,8 +1001,8 @@ impl ScalarRefImpl<'_> { Self::Interval(v) => v.serialize(ser)?, Self::Date(v) => v.0.num_days_from_ce().serialize(ser)?, Self::Timestamp(v) => { - v.0.timestamp().serialize(&mut *ser)?; - v.0.timestamp_subsec_nanos().serialize(ser)?; + v.0.and_utc().timestamp().serialize(&mut *ser)?; + v.0.and_utc().timestamp_subsec_nanos().serialize(ser)?; } Self::Timestamptz(v) => v.serialize(ser)?, Self::Time(v) => { diff --git a/src/common/src/util/value_encoding/mod.rs b/src/common/src/util/value_encoding/mod.rs index aa80c15617048..a3da88911ad9a 100644 --- a/src/common/src/util/value_encoding/mod.rs +++ b/src/common/src/util/value_encoding/mod.rs @@ -214,9 +214,11 @@ fn serialize_scalar(value: ScalarRefImpl<'_>, buf: &mut impl BufMut) { ScalarRefImpl::Decimal(v) => serialize_decimal(&v, buf), ScalarRefImpl::Interval(v) => serialize_interval(&v, buf), ScalarRefImpl::Date(v) => serialize_date(v.0.num_days_from_ce(), buf), - ScalarRefImpl::Timestamp(v) => { - serialize_timestamp(v.0.timestamp(), v.0.timestamp_subsec_nanos(), buf) - } + ScalarRefImpl::Timestamp(v) => serialize_timestamp( + v.0.and_utc().timestamp(), + v.0.and_utc().timestamp_subsec_nanos(), + buf, + ), ScalarRefImpl::Timestamptz(v) => buf.put_i64_le(v.timestamp_micros()), ScalarRefImpl::Time(v) => { serialize_time(v.0.num_seconds_from_midnight(), v.0.nanosecond(), buf) diff --git a/src/connector/src/parser/avro/parser.rs b/src/connector/src/parser/avro/parser.rs index 4b73c6ac7c80f..6d69460ee1198 100644 --- a/src/connector/src/parser/avro/parser.rs +++ b/src/connector/src/parser/avro/parser.rs @@ -373,12 +373,14 @@ mod test { } Schema::TimestampMillis => { let datetime = Date::from_ymd_uncheck(1970, 1, 1).and_hms_uncheck(0, 0, 0); - let timestamp_mills = Value::TimestampMillis(datetime.0.timestamp() * 1_000); + let timestamp_mills = + Value::TimestampMillis(datetime.0.and_utc().timestamp() * 1_000); Some(timestamp_mills) } Schema::TimestampMicros => { let datetime = Date::from_ymd_uncheck(1970, 1, 1).and_hms_uncheck(0, 0, 0); - let timestamp_micros = Value::TimestampMicros(datetime.0.timestamp() * 1_000_000); + let timestamp_micros = + Value::TimestampMicros(datetime.0.and_utc().timestamp() * 1_000_000); Some(timestamp_micros) } Schema::Duration => { diff --git a/src/connector/src/parser/mysql.rs b/src/connector/src/parser/mysql.rs index cfc7f1fe76e61..d1df27263e808 100644 --- a/src/connector/src/parser/mysql.rs +++ b/src/connector/src/parser/mysql.rs @@ -98,7 +98,9 @@ pub fn mysql_row_to_owned_row(mysql_row: &mut MysqlRow, schema: &Schema) -> Owne .unwrap_or(Ok(None)); match res { Ok(val) => val.map(|v| { - ScalarImpl::from(Timestamptz::from_micros(v.timestamp_micros())) + ScalarImpl::from(Timestamptz::from_micros( + v.and_utc().timestamp_micros(), + )) }), Err(err) => { log_error!(name, err, "parse column failed"); diff --git a/src/connector/src/parser/unified/json.rs b/src/connector/src/parser/unified/json.rs index 09704d9192a41..e4a229bb61b98 100644 --- a/src/connector/src/parser/unified/json.rs +++ b/src/connector/src/parser/unified/json.rs @@ -496,7 +496,9 @@ impl JsonParseOptions { .as_str() .unwrap() .parse::() - .map(|naive_utc| Timestamptz::from_micros(naive_utc.0.timestamp_micros())) + .map(|naive_utc| { + Timestamptz::from_micros(naive_utc.0.and_utc().timestamp_micros()) + }) .map_err(|_| create_error())? .into(), // Unless explicitly requested `utc_without_utc`, we parse string with `YYYY-MM-DDTHH:MM:SSZ`. diff --git a/src/connector/src/sink/encoder/json.rs b/src/connector/src/sink/encoder/json.rs index e1ce9e61b6a1e..4748116609700 100644 --- a/src/connector/src/sink/encoder/json.rs +++ b/src/connector/src/sink/encoder/json.rs @@ -269,7 +269,7 @@ fn datum_to_json_object( } }, (DataType::Timestamp, ScalarRefImpl::Timestamp(v)) => match timestamp_handling_mode { - TimestampHandlingMode::Milli => json!(v.0.timestamp_millis()), + TimestampHandlingMode::Milli => json!(v.0.and_utc().timestamp_millis()), TimestampHandlingMode::String => json!(v.0.format("%Y-%m-%d %H:%M:%S%.6f").to_string()), }, (DataType::Bytea, ScalarRefImpl::Bytea(v)) => { diff --git a/src/connector/src/source/iceberg/mod.rs b/src/connector/src/source/iceberg/mod.rs index e7e4971dd42c5..1090159fa9ca3 100644 --- a/src/connector/src/source/iceberg/mod.rs +++ b/src/connector/src/source/iceberg/mod.rs @@ -190,7 +190,7 @@ impl IcebergSplitEnumerator { Some(snapshot) => snapshot.snapshot_id, None => { // convert unix time to human readable time - let time = chrono::NaiveDateTime::from_timestamp_millis(timestamp); + let time = chrono::DateTime::from_timestamp_millis(timestamp); if time.is_some() { bail!("Cannot find a snapshot older than {}", time.unwrap()); } else { diff --git a/src/ctl/src/cmd_impl/meta/migration.rs b/src/ctl/src/cmd_impl/meta/migration.rs index b9aa81b677519..87d76466f2231 100644 --- a/src/ctl/src/cmd_impl/meta/migration.rs +++ b/src/ctl/src/cmd_impl/meta/migration.rs @@ -68,7 +68,7 @@ use risingwave_pb::meta::table_fragments::State; use risingwave_pb::meta::PbSystemParams; use risingwave_pb::user::grant_privilege::PbObject as GrantObject; use risingwave_pb::user::PbUserInfo; -use sea_orm::prelude::DateTime; +use sea_orm::prelude::DateTimeUtc; use sea_orm::ActiveValue::Set; use sea_orm::{ ColumnTrait, ConnectionTrait, DatabaseBackend, DbBackend, EntityTrait, IntoActiveModel, NotSet, @@ -359,12 +359,18 @@ pub async fn migrate(from: EtcdBackend, target: String, force_clean: bool) -> an ..Default::default() }; if let Some(epoch) = object.initialized_at_epoch.map(Epoch::from) { - obj.initialized_at = - Set(DateTime::from_timestamp_millis(epoch.as_unix_millis() as _).unwrap()); + obj.initialized_at = Set(DateTimeUtc::from_timestamp_millis( + epoch.as_unix_millis() as _, + ) + .unwrap() + .naive_utc()); } if let Some(epoch) = object.created_at_epoch.map(Epoch::from) { - obj.created_at = - Set(DateTime::from_timestamp_millis(epoch.as_unix_millis() as _).unwrap()); + obj.created_at = Set(DateTimeUtc::from_timestamp_millis( + epoch.as_unix_millis() as _, + ) + .unwrap() + .naive_utc()); } Object::insert(obj).exec(&meta_store_sql.conn).await?; } @@ -390,12 +396,18 @@ pub async fn migrate(from: EtcdBackend, target: String, force_clean: bool) -> an ..Default::default() }; if let Some(epoch) = table.initialized_at_epoch.map(Epoch::from) { - obj.initialized_at = - Set(DateTime::from_timestamp_millis(epoch.as_unix_millis() as _).unwrap()); + obj.initialized_at = Set(DateTimeUtc::from_timestamp_millis( + epoch.as_unix_millis() as _ + ) + .unwrap() + .naive_utc()); } if let Some(epoch) = table.created_at_epoch.map(Epoch::from) { - obj.created_at = - Set(DateTime::from_timestamp_millis(epoch.as_unix_millis() as _).unwrap()); + obj.created_at = Set( + DateTimeUtc::from_timestamp_millis(epoch.as_unix_millis() as _) + .unwrap() + .naive_utc(), + ); } Object::insert(obj).exec(&meta_store_sql.conn).await?; } diff --git a/src/expr/impl/src/scalar/extract.rs b/src/expr/impl/src/scalar/extract.rs index 080c41037736f..74a50cf6869ec 100644 --- a/src/expr/impl/src/scalar/extract.rs +++ b/src/expr/impl/src/scalar/extract.rs @@ -65,7 +65,7 @@ fn extract_from_timelike(time: impl Timelike, unit: Unit) -> Decimal { fn extract_from_date(date: Date, unit: &Unit) -> Decimal { match unit { Epoch => { - let epoch = date.0.and_time(NaiveTime::default()).timestamp(); + let epoch = date.0.and_time(NaiveTime::default()).and_utc().timestamp(); epoch.into() } Julian => { @@ -92,11 +92,12 @@ fn extract_from_time(time: Time, unit: &Unit) -> Decimal { fn extract_from_timestamp(timestamp: Timestamp, unit: &Unit) -> Decimal { match unit { Epoch => { - let epoch = timestamp.0.timestamp_micros(); + let epoch = timestamp.0.and_utc().timestamp_micros(); Decimal::from_i128_with_scale(epoch as i128, 6) } Julian => { - let epoch = Decimal::from_i128_with_scale(timestamp.0.timestamp_micros() as i128, 6); + let epoch = + Decimal::from_i128_with_scale(timestamp.0.and_utc().timestamp_micros() as i128, 6); epoch / (24 * 60 * 60).into() + 2_440_588.into() } _ if unit.is_date_unit() => extract_from_datelike(timestamp.0.date(), *unit), diff --git a/src/expr/impl/src/scalar/tumble.rs b/src/expr/impl/src/scalar/tumble.rs index 3e383e59236ee..bf6f32253207a 100644 --- a/src/expr/impl/src/scalar/tumble.rs +++ b/src/expr/impl/src/scalar/tumble.rs @@ -38,7 +38,7 @@ pub fn tumble_start_date(timestamp: Date, window_size: Interval) -> Result timestamp")] pub fn tumble_start_date_time(timestamp: Timestamp, window_size: Interval) -> Result { - let timestamp_micro_second = timestamp.0.timestamp_micros(); + let timestamp_micro_second = timestamp.0.and_utc().timestamp_micros(); let window_start_micro_second = get_window_start(timestamp_micro_second, window_size)?; Ok(Timestamp::from_timestamp_uncheck( window_start_micro_second / 1_000_000, @@ -72,7 +72,7 @@ pub fn tumble_start_offset_date_time( window_size: Interval, offset: Interval, ) -> Result { - let timestamp_micro_second = time.0.timestamp_micros(); + let timestamp_micro_second = time.0.and_utc().timestamp_micros(); let window_start_micro_second = get_window_start_with_offset(timestamp_micro_second, window_size, offset)?; diff --git a/src/meta/src/controller/mod.rs b/src/meta/src/controller/mod.rs index ac61ceb67b77f..160400b42d2af 100644 --- a/src/meta/src/controller/mod.rs +++ b/src/meta/src/controller/mod.rs @@ -143,10 +143,10 @@ impl From> for PbTable { .cardinality .map(|cardinality| cardinality.to_protobuf()), initialized_at_epoch: Some( - Epoch::from_unix_millis(value.1.initialized_at.timestamp_millis() as _).0, + Epoch::from_unix_millis(value.1.initialized_at.and_utc().timestamp_millis() as _).0, ), created_at_epoch: Some( - Epoch::from_unix_millis(value.1.created_at.timestamp_millis() as _).0, + Epoch::from_unix_millis(value.1.created_at.and_utc().timestamp_millis() as _).0, ), cleaned_by_watermark: value.0.cleaned_by_watermark, stream_job_status: PbStreamJobStatus::Created as _, // todo: deprecate it. @@ -183,10 +183,10 @@ impl From> for PbSource { connection_id: value.0.connection_id.map(|id| id as _), // todo: using the timestamp from the database directly. initialized_at_epoch: Some( - Epoch::from_unix_millis(value.1.initialized_at.timestamp_millis() as _).0, + Epoch::from_unix_millis(value.1.initialized_at.and_utc().timestamp_millis() as _).0, ), created_at_epoch: Some( - Epoch::from_unix_millis(value.1.created_at.timestamp_millis() as _).0, + Epoch::from_unix_millis(value.1.created_at.and_utc().timestamp_millis() as _).0, ), version: value.0.version as _, optional_associated_table_id: value @@ -221,10 +221,10 @@ impl From> for PbSink { definition: value.0.definition, connection_id: value.0.connection_id.map(|id| id as _), initialized_at_epoch: Some( - Epoch::from_unix_millis(value.1.initialized_at.timestamp_millis() as _).0, + Epoch::from_unix_millis(value.1.initialized_at.and_utc().timestamp_millis() as _).0, ), created_at_epoch: Some( - Epoch::from_unix_millis(value.1.created_at.timestamp_millis() as _).0, + Epoch::from_unix_millis(value.1.created_at.and_utc().timestamp_millis() as _).0, ), db_name: value.0.db_name, sink_from_name: value.0.sink_from_name, @@ -250,10 +250,10 @@ impl From> for PbSubscription { retention_seconds: value.0.retention_seconds as _, definition: value.0.definition, initialized_at_epoch: Some( - Epoch::from_unix_millis(value.1.initialized_at.timestamp_millis() as _).0, + Epoch::from_unix_millis(value.1.initialized_at.and_utc().timestamp_millis() as _).0, ), created_at_epoch: Some( - Epoch::from_unix_millis(value.1.created_at.timestamp_millis() as _).0, + Epoch::from_unix_millis(value.1.created_at.and_utc().timestamp_millis() as _).0, ), initialized_at_cluster_version: value.1.initialized_at_cluster_version, created_at_cluster_version: value.1.created_at_cluster_version, @@ -276,10 +276,10 @@ impl From> for PbIndex { index_item: value.0.index_items.to_protobuf(), index_columns_len: value.0.index_columns_len as _, initialized_at_epoch: Some( - Epoch::from_unix_millis(value.1.initialized_at.timestamp_millis() as _).0, + Epoch::from_unix_millis(value.1.initialized_at.and_utc().timestamp_millis() as _).0, ), created_at_epoch: Some( - Epoch::from_unix_millis(value.1.created_at.timestamp_millis() as _).0, + Epoch::from_unix_millis(value.1.created_at.and_utc().timestamp_millis() as _).0, ), stream_job_status: PbStreamJobStatus::Created as _, // todo: deprecate it. initialized_at_cluster_version: value.1.initialized_at_cluster_version,