Skip to content

Commit

Permalink
fix deprecated interfaces for chrono
Browse files Browse the repository at this point in the history
  • Loading branch information
ZENOTME committed Jun 17, 2024
1 parent 4582b7c commit 202a7ea
Show file tree
Hide file tree
Showing 15 changed files with 76 additions and 56 deletions.
10 changes: 4 additions & 6 deletions src/common/src/array/arrow/arrow_impl.rs
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@
use std::fmt::Write;

use arrow_buffer::OffsetBuffer;
use chrono::{NaiveDateTime, NaiveTime};
use chrono::{DateTime, NaiveDateTime, NaiveTime};
use itertools::Itertools;

// This is important because we want to use the arrow version specified by the outer mod.
Expand Down Expand Up @@ -848,11 +848,9 @@ impl FromIntoArrow for Timestamp {

fn from_arrow(value: Self::ArrowType) -> Self {
Timestamp(
NaiveDateTime::from_timestamp_opt(
(value / 1_000_000) as _,
(value % 1_000_000 * 1000) as _,
)
.unwrap(),
DateTime::from_timestamp((value / 1_000_000) as _, (value % 1_000_000 * 1000) as _)
.unwrap()
.naive_utc(),
)
}

Expand Down
10 changes: 4 additions & 6 deletions src/common/src/array/arrow/arrow_impl_52.rs
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ use std::fmt::Write;

use arrow_buffer::OffsetBuffer;
use arrow_buffer_iceberg::IntervalMonthDayNano;
use chrono::{NaiveDateTime, NaiveTime};
use chrono::{DateTime, NaiveDateTime, NaiveTime};
use itertools::Itertools;

// This is important because we want to use the arrow version specified by the outer mod.
Expand Down Expand Up @@ -849,11 +849,9 @@ impl FromIntoArrow for Timestamp {

fn from_arrow(value: Self::ArrowType) -> Self {
Timestamp(
NaiveDateTime::from_timestamp_opt(
(value / 1_000_000) as _,
(value % 1_000_000 * 1000) as _,
)
.unwrap(),
DateTime::from_timestamp((value / 1_000_000) as _, (value % 1_000_000 * 1000) as _)
.unwrap()
.naive_utc(),
)
}

Expand Down
4 changes: 2 additions & 2 deletions src/common/src/hash/key.rs
Original file line number Diff line number Diff line change
Expand Up @@ -567,8 +567,8 @@ impl HashKeyDe for Date {

impl HashKeySer<'_> for Timestamp {
fn serialize_into(self, mut buf: impl BufMut) {
buf.put_i64_ne(self.0.timestamp());
buf.put_u32_ne(self.0.timestamp_subsec_nanos());
buf.put_i64_ne(self.0.and_utc().timestamp());
buf.put_u32_ne(self.0.and_utc().timestamp_subsec_nanos());
}

fn exact_size() -> Option<usize> {
Expand Down
17 changes: 10 additions & 7 deletions src/common/src/types/datetime.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,9 @@ use std::io::Write;
use std::str::FromStr;

use bytes::{Bytes, BytesMut};
use chrono::{Datelike, Days, Duration, NaiveDate, NaiveDateTime, NaiveTime, Timelike, Weekday};
use chrono::{
DateTime, Datelike, Days, Duration, NaiveDate, NaiveDateTime, NaiveTime, Timelike, Weekday,
};
use postgres_types::{accepts, to_sql_checked, FromSql, IsNull, ToSql, Type};
use risingwave_common_estimate_size::ZeroHeapSize;
use thiserror::Error;
Expand Down Expand Up @@ -570,20 +572,21 @@ impl Time {
impl Timestamp {
pub fn with_secs_nsecs(secs: i64, nsecs: u32) -> Result<Self> {
Ok(Timestamp::new({
NaiveDateTime::from_timestamp_opt(secs, nsecs)
DateTime::from_timestamp(secs, nsecs)
.ok_or_else(|| InvalidParamsError::datetime(secs, nsecs))?
.naive_utc()
}))
}

/// Although `Timestamp` takes 12 bytes, we drop 4 bytes in protobuf encoding.
pub fn to_protobuf<T: Write>(self, output: &mut T) -> ArrayResult<usize> {
output
.write(&(self.0.timestamp_micros()).to_be_bytes())
.write(&(self.0.and_utc().timestamp_micros()).to_be_bytes())
.map_err(Into::into)
}

pub fn get_timestamp_nanos(&self) -> i64 {
self.0.timestamp_nanos_opt().unwrap()
self.0.and_utc().timestamp_nanos_opt().unwrap()
}

pub fn with_millis(timestamp_millis: i64) -> Result<Self> {
Expand All @@ -599,7 +602,7 @@ impl Timestamp {
}

pub fn from_timestamp_uncheck(secs: i64, nsecs: u32) -> Self {
Self::new(NaiveDateTime::from_timestamp_opt(secs, nsecs).unwrap())
Self::new(DateTime::from_timestamp(secs, nsecs).unwrap().naive_utc())
}

/// Truncate the timestamp to the precision of microseconds.
Expand Down Expand Up @@ -879,10 +882,10 @@ mod tests {
Timestamp::from_str("2022-08-03 10:34:02").unwrap()
);
let ts = Timestamp::from_str("0001-11-15 07:35:40.999999").unwrap();
assert_eq!(ts.0.timestamp_micros(), -62108094259000001);
assert_eq!(ts.0.and_utc().timestamp_micros(), -62108094259000001);

let ts = Timestamp::from_str("1969-12-31 23:59:59.999999").unwrap();
assert_eq!(ts.0.timestamp_micros(), -1);
assert_eq!(ts.0.and_utc().timestamp_micros(), -1);

// invalid datetime
Date::from_str("1999-01-08AA").unwrap_err();
Expand Down
4 changes: 2 additions & 2 deletions src/common/src/types/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1001,8 +1001,8 @@ impl ScalarRefImpl<'_> {
Self::Interval(v) => v.serialize(ser)?,
Self::Date(v) => v.0.num_days_from_ce().serialize(ser)?,
Self::Timestamp(v) => {
v.0.timestamp().serialize(&mut *ser)?;
v.0.timestamp_subsec_nanos().serialize(ser)?;
v.0.and_utc().timestamp().serialize(&mut *ser)?;
v.0.and_utc().timestamp_subsec_nanos().serialize(ser)?;
}
Self::Timestamptz(v) => v.serialize(ser)?,
Self::Time(v) => {
Expand Down
8 changes: 5 additions & 3 deletions src/common/src/util/value_encoding/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -214,9 +214,11 @@ fn serialize_scalar(value: ScalarRefImpl<'_>, buf: &mut impl BufMut) {
ScalarRefImpl::Decimal(v) => serialize_decimal(&v, buf),
ScalarRefImpl::Interval(v) => serialize_interval(&v, buf),
ScalarRefImpl::Date(v) => serialize_date(v.0.num_days_from_ce(), buf),
ScalarRefImpl::Timestamp(v) => {
serialize_timestamp(v.0.timestamp(), v.0.timestamp_subsec_nanos(), buf)
}
ScalarRefImpl::Timestamp(v) => serialize_timestamp(
v.0.and_utc().timestamp(),
v.0.and_utc().timestamp_subsec_nanos(),
buf,
),
ScalarRefImpl::Timestamptz(v) => buf.put_i64_le(v.timestamp_micros()),
ScalarRefImpl::Time(v) => {
serialize_time(v.0.num_seconds_from_midnight(), v.0.nanosecond(), buf)
Expand Down
6 changes: 4 additions & 2 deletions src/connector/src/parser/avro/parser.rs
Original file line number Diff line number Diff line change
Expand Up @@ -373,12 +373,14 @@ mod test {
}
Schema::TimestampMillis => {
let datetime = Date::from_ymd_uncheck(1970, 1, 1).and_hms_uncheck(0, 0, 0);
let timestamp_mills = Value::TimestampMillis(datetime.0.timestamp() * 1_000);
let timestamp_mills =
Value::TimestampMillis(datetime.0.and_utc().timestamp() * 1_000);
Some(timestamp_mills)
}
Schema::TimestampMicros => {
let datetime = Date::from_ymd_uncheck(1970, 1, 1).and_hms_uncheck(0, 0, 0);
let timestamp_micros = Value::TimestampMicros(datetime.0.timestamp() * 1_000_000);
let timestamp_micros =
Value::TimestampMicros(datetime.0.and_utc().timestamp() * 1_000_000);
Some(timestamp_micros)
}
Schema::Duration => {
Expand Down
4 changes: 3 additions & 1 deletion src/connector/src/parser/mysql.rs
Original file line number Diff line number Diff line change
Expand Up @@ -98,7 +98,9 @@ pub fn mysql_row_to_owned_row(mysql_row: &mut MysqlRow, schema: &Schema) -> Owne
.unwrap_or(Ok(None));
match res {
Ok(val) => val.map(|v| {
ScalarImpl::from(Timestamptz::from_micros(v.timestamp_micros()))
ScalarImpl::from(Timestamptz::from_micros(
v.and_utc().timestamp_micros(),
))
}),
Err(err) => {
log_error!(name, err, "parse column failed");
Expand Down
4 changes: 3 additions & 1 deletion src/connector/src/parser/unified/json.rs
Original file line number Diff line number Diff line change
Expand Up @@ -496,7 +496,9 @@ impl JsonParseOptions {
.as_str()
.unwrap()
.parse::<Timestamp>()
.map(|naive_utc| Timestamptz::from_micros(naive_utc.0.timestamp_micros()))
.map(|naive_utc| {
Timestamptz::from_micros(naive_utc.0.and_utc().timestamp_micros())
})
.map_err(|_| create_error())?
.into(),
// Unless explicitly requested `utc_without_utc`, we parse string with `YYYY-MM-DDTHH:MM:SSZ`.
Expand Down
2 changes: 1 addition & 1 deletion src/connector/src/sink/encoder/json.rs
Original file line number Diff line number Diff line change
Expand Up @@ -269,7 +269,7 @@ fn datum_to_json_object(
}
},
(DataType::Timestamp, ScalarRefImpl::Timestamp(v)) => match timestamp_handling_mode {
TimestampHandlingMode::Milli => json!(v.0.timestamp_millis()),
TimestampHandlingMode::Milli => json!(v.0.and_utc().timestamp_millis()),
TimestampHandlingMode::String => json!(v.0.format("%Y-%m-%d %H:%M:%S%.6f").to_string()),
},
(DataType::Bytea, ScalarRefImpl::Bytea(v)) => {
Expand Down
2 changes: 1 addition & 1 deletion src/connector/src/source/iceberg/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -190,7 +190,7 @@ impl IcebergSplitEnumerator {
Some(snapshot) => snapshot.snapshot_id,
None => {
// convert unix time to human readable time
let time = chrono::NaiveDateTime::from_timestamp_millis(timestamp);
let time = chrono::DateTime::from_timestamp_millis(timestamp);
if time.is_some() {
bail!("Cannot find a snapshot older than {}", time.unwrap());
} else {
Expand Down
30 changes: 21 additions & 9 deletions src/ctl/src/cmd_impl/meta/migration.rs
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,7 @@ use risingwave_pb::meta::table_fragments::State;
use risingwave_pb::meta::PbSystemParams;
use risingwave_pb::user::grant_privilege::PbObject as GrantObject;
use risingwave_pb::user::PbUserInfo;
use sea_orm::prelude::DateTime;
use sea_orm::prelude::DateTimeUtc;
use sea_orm::ActiveValue::Set;
use sea_orm::{
ColumnTrait, ConnectionTrait, DatabaseBackend, DbBackend, EntityTrait, IntoActiveModel, NotSet,
Expand Down Expand Up @@ -359,12 +359,18 @@ pub async fn migrate(from: EtcdBackend, target: String, force_clean: bool) -> an
..Default::default()
};
if let Some(epoch) = object.initialized_at_epoch.map(Epoch::from) {
obj.initialized_at =
Set(DateTime::from_timestamp_millis(epoch.as_unix_millis() as _).unwrap());
obj.initialized_at = Set(DateTimeUtc::from_timestamp_millis(
epoch.as_unix_millis() as _,
)
.unwrap()
.naive_utc());
}
if let Some(epoch) = object.created_at_epoch.map(Epoch::from) {
obj.created_at =
Set(DateTime::from_timestamp_millis(epoch.as_unix_millis() as _).unwrap());
obj.created_at = Set(DateTimeUtc::from_timestamp_millis(
epoch.as_unix_millis() as _,
)
.unwrap()
.naive_utc());
}
Object::insert(obj).exec(&meta_store_sql.conn).await?;
}
Expand All @@ -390,12 +396,18 @@ pub async fn migrate(from: EtcdBackend, target: String, force_clean: bool) -> an
..Default::default()
};
if let Some(epoch) = table.initialized_at_epoch.map(Epoch::from) {
obj.initialized_at =
Set(DateTime::from_timestamp_millis(epoch.as_unix_millis() as _).unwrap());
obj.initialized_at = Set(DateTimeUtc::from_timestamp_millis(
epoch.as_unix_millis() as _
)
.unwrap()
.naive_utc());
}
if let Some(epoch) = table.created_at_epoch.map(Epoch::from) {
obj.created_at =
Set(DateTime::from_timestamp_millis(epoch.as_unix_millis() as _).unwrap());
obj.created_at = Set(
DateTimeUtc::from_timestamp_millis(epoch.as_unix_millis() as _)
.unwrap()
.naive_utc(),
);
}
Object::insert(obj).exec(&meta_store_sql.conn).await?;
}
Expand Down
7 changes: 4 additions & 3 deletions src/expr/impl/src/scalar/extract.rs
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,7 @@ fn extract_from_timelike(time: impl Timelike, unit: Unit) -> Decimal {
fn extract_from_date(date: Date, unit: &Unit) -> Decimal {
match unit {
Epoch => {
let epoch = date.0.and_time(NaiveTime::default()).timestamp();
let epoch = date.0.and_time(NaiveTime::default()).and_utc().timestamp();
epoch.into()
}
Julian => {
Expand All @@ -92,11 +92,12 @@ fn extract_from_time(time: Time, unit: &Unit) -> Decimal {
fn extract_from_timestamp(timestamp: Timestamp, unit: &Unit) -> Decimal {
match unit {
Epoch => {
let epoch = timestamp.0.timestamp_micros();
let epoch = timestamp.0.and_utc().timestamp_micros();
Decimal::from_i128_with_scale(epoch as i128, 6)
}
Julian => {
let epoch = Decimal::from_i128_with_scale(timestamp.0.timestamp_micros() as i128, 6);
let epoch =
Decimal::from_i128_with_scale(timestamp.0.and_utc().timestamp_micros() as i128, 6);
epoch / (24 * 60 * 60).into() + 2_440_588.into()
}
_ if unit.is_date_unit() => extract_from_datelike(timestamp.0.date(), *unit),
Expand Down
4 changes: 2 additions & 2 deletions src/expr/impl/src/scalar/tumble.rs
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ pub fn tumble_start_date(timestamp: Date, window_size: Interval) -> Result<Times

#[function("tumble_start(timestamp, interval) -> timestamp")]
pub fn tumble_start_date_time(timestamp: Timestamp, window_size: Interval) -> Result<Timestamp> {
let timestamp_micro_second = timestamp.0.timestamp_micros();
let timestamp_micro_second = timestamp.0.and_utc().timestamp_micros();
let window_start_micro_second = get_window_start(timestamp_micro_second, window_size)?;
Ok(Timestamp::from_timestamp_uncheck(
window_start_micro_second / 1_000_000,
Expand Down Expand Up @@ -72,7 +72,7 @@ pub fn tumble_start_offset_date_time(
window_size: Interval,
offset: Interval,
) -> Result<Timestamp> {
let timestamp_micro_second = time.0.timestamp_micros();
let timestamp_micro_second = time.0.and_utc().timestamp_micros();
let window_start_micro_second =
get_window_start_with_offset(timestamp_micro_second, window_size, offset)?;

Expand Down
20 changes: 10 additions & 10 deletions src/meta/src/controller/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -143,10 +143,10 @@ impl From<ObjectModel<table::Model>> for PbTable {
.cardinality
.map(|cardinality| cardinality.to_protobuf()),
initialized_at_epoch: Some(
Epoch::from_unix_millis(value.1.initialized_at.timestamp_millis() as _).0,
Epoch::from_unix_millis(value.1.initialized_at.and_utc().timestamp_millis() as _).0,
),
created_at_epoch: Some(
Epoch::from_unix_millis(value.1.created_at.timestamp_millis() as _).0,
Epoch::from_unix_millis(value.1.created_at.and_utc().timestamp_millis() as _).0,
),
cleaned_by_watermark: value.0.cleaned_by_watermark,
stream_job_status: PbStreamJobStatus::Created as _, // todo: deprecate it.
Expand Down Expand Up @@ -183,10 +183,10 @@ impl From<ObjectModel<source::Model>> for PbSource {
connection_id: value.0.connection_id.map(|id| id as _),
// todo: using the timestamp from the database directly.
initialized_at_epoch: Some(
Epoch::from_unix_millis(value.1.initialized_at.timestamp_millis() as _).0,
Epoch::from_unix_millis(value.1.initialized_at.and_utc().timestamp_millis() as _).0,
),
created_at_epoch: Some(
Epoch::from_unix_millis(value.1.created_at.timestamp_millis() as _).0,
Epoch::from_unix_millis(value.1.created_at.and_utc().timestamp_millis() as _).0,
),
version: value.0.version as _,
optional_associated_table_id: value
Expand Down Expand Up @@ -221,10 +221,10 @@ impl From<ObjectModel<sink::Model>> for PbSink {
definition: value.0.definition,
connection_id: value.0.connection_id.map(|id| id as _),
initialized_at_epoch: Some(
Epoch::from_unix_millis(value.1.initialized_at.timestamp_millis() as _).0,
Epoch::from_unix_millis(value.1.initialized_at.and_utc().timestamp_millis() as _).0,
),
created_at_epoch: Some(
Epoch::from_unix_millis(value.1.created_at.timestamp_millis() as _).0,
Epoch::from_unix_millis(value.1.created_at.and_utc().timestamp_millis() as _).0,
),
db_name: value.0.db_name,
sink_from_name: value.0.sink_from_name,
Expand All @@ -250,10 +250,10 @@ impl From<ObjectModel<subscription::Model>> for PbSubscription {
retention_seconds: value.0.retention_seconds as _,
definition: value.0.definition,
initialized_at_epoch: Some(
Epoch::from_unix_millis(value.1.initialized_at.timestamp_millis() as _).0,
Epoch::from_unix_millis(value.1.initialized_at.and_utc().timestamp_millis() as _).0,
),
created_at_epoch: Some(
Epoch::from_unix_millis(value.1.created_at.timestamp_millis() as _).0,
Epoch::from_unix_millis(value.1.created_at.and_utc().timestamp_millis() as _).0,
),
initialized_at_cluster_version: value.1.initialized_at_cluster_version,
created_at_cluster_version: value.1.created_at_cluster_version,
Expand All @@ -276,10 +276,10 @@ impl From<ObjectModel<index::Model>> for PbIndex {
index_item: value.0.index_items.to_protobuf(),
index_columns_len: value.0.index_columns_len as _,
initialized_at_epoch: Some(
Epoch::from_unix_millis(value.1.initialized_at.timestamp_millis() as _).0,
Epoch::from_unix_millis(value.1.initialized_at.and_utc().timestamp_millis() as _).0,
),
created_at_epoch: Some(
Epoch::from_unix_millis(value.1.created_at.timestamp_millis() as _).0,
Epoch::from_unix_millis(value.1.created_at.and_utc().timestamp_millis() as _).0,
),
stream_job_status: PbStreamJobStatus::Created as _, // todo: deprecate it.
initialized_at_cluster_version: value.1.initialized_at_cluster_version,
Expand Down

0 comments on commit 202a7ea

Please sign in to comment.