Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

chore: upgrade chrono to 0.4.38 #17279

Merged
merged 4 commits into from
Jun 17, 2024
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 3 additions & 3 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

10 changes: 4 additions & 6 deletions src/common/src/array/arrow/arrow_impl.rs
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@
use std::fmt::Write;

use arrow_buffer::OffsetBuffer;
use chrono::{NaiveDateTime, NaiveTime};
use chrono::{DateTime, NaiveDateTime, NaiveTime};
use itertools::Itertools;

// This is important because we want to use the arrow version specified by the outer mod.
Expand Down Expand Up @@ -848,11 +848,9 @@ impl FromIntoArrow for Timestamp {

fn from_arrow(value: Self::ArrowType) -> Self {
Timestamp(
NaiveDateTime::from_timestamp_opt(
(value / 1_000_000) as _,
(value % 1_000_000 * 1000) as _,
)
.unwrap(),
DateTime::from_timestamp((value / 1_000_000) as _, (value % 1_000_000 * 1000) as _)
.unwrap()
.naive_utc(),
)
}

Expand Down
4 changes: 2 additions & 2 deletions src/common/src/hash/key.rs
Original file line number Diff line number Diff line change
Expand Up @@ -567,8 +567,8 @@ impl HashKeyDe for Date {

impl HashKeySer<'_> for Timestamp {
fn serialize_into(self, mut buf: impl BufMut) {
buf.put_i64_ne(self.0.timestamp());
buf.put_u32_ne(self.0.timestamp_subsec_nanos());
buf.put_i64_ne(self.0.and_utc().timestamp());
buf.put_u32_ne(self.0.and_utc().timestamp_subsec_nanos());
}

fn exact_size() -> Option<usize> {
Expand Down
13 changes: 8 additions & 5 deletions src/common/src/types/datetime.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,9 @@ use std::io::Write;
use std::str::FromStr;

use bytes::{Bytes, BytesMut};
use chrono::{Datelike, Days, Duration, NaiveDate, NaiveDateTime, NaiveTime, Timelike, Weekday};
use chrono::{
DateTime, Datelike, Days, Duration, NaiveDate, NaiveDateTime, NaiveTime, Timelike, Weekday,
};
use postgres_types::{accepts, to_sql_checked, FromSql, IsNull, ToSql, Type};
use risingwave_common_estimate_size::ZeroHeapSize;
use thiserror::Error;
Expand Down Expand Up @@ -570,20 +572,21 @@ impl Time {
impl Timestamp {
pub fn with_secs_nsecs(secs: i64, nsecs: u32) -> Result<Self> {
Ok(Timestamp::new({
NaiveDateTime::from_timestamp_opt(secs, nsecs)
DateTime::from_timestamp(secs, nsecs)
.map(|t| t.naive_utc())
.ok_or_else(|| InvalidParamsError::datetime(secs, nsecs))?
}))
}

/// Although `Timestamp` takes 12 bytes, we drop 4 bytes in protobuf encoding.
pub fn to_protobuf<T: Write>(self, output: &mut T) -> ArrayResult<usize> {
output
.write(&(self.0.timestamp_micros()).to_be_bytes())
.write(&(self.0.and_utc().timestamp_micros()).to_be_bytes())
.map_err(Into::into)
}

pub fn get_timestamp_nanos(&self) -> i64 {
self.0.timestamp_nanos_opt().unwrap()
self.0.and_utc().timestamp_nanos_opt().unwrap()
}

pub fn with_millis(timestamp_millis: i64) -> Result<Self> {
Expand All @@ -599,7 +602,7 @@ impl Timestamp {
}

pub fn from_timestamp_uncheck(secs: i64, nsecs: u32) -> Self {
Self::new(NaiveDateTime::from_timestamp_opt(secs, nsecs).unwrap())
Self::new(DateTime::from_timestamp(secs, nsecs).unwrap().naive_utc())
}

/// Truncate the timestamp to the precision of microseconds.
Expand Down
4 changes: 2 additions & 2 deletions src/common/src/types/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1001,8 +1001,8 @@ impl ScalarRefImpl<'_> {
Self::Interval(v) => v.serialize(ser)?,
Self::Date(v) => v.0.num_days_from_ce().serialize(ser)?,
Self::Timestamp(v) => {
v.0.timestamp().serialize(&mut *ser)?;
v.0.timestamp_subsec_nanos().serialize(ser)?;
v.0.and_utc().timestamp().serialize(&mut *ser)?;
v.0.and_utc().timestamp_subsec_nanos().serialize(ser)?;
}
Self::Timestamptz(v) => v.serialize(ser)?,
Self::Time(v) => {
Expand Down
8 changes: 5 additions & 3 deletions src/common/src/util/value_encoding/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -214,9 +214,11 @@ fn serialize_scalar(value: ScalarRefImpl<'_>, buf: &mut impl BufMut) {
ScalarRefImpl::Decimal(v) => serialize_decimal(&v, buf),
ScalarRefImpl::Interval(v) => serialize_interval(&v, buf),
ScalarRefImpl::Date(v) => serialize_date(v.0.num_days_from_ce(), buf),
ScalarRefImpl::Timestamp(v) => {
serialize_timestamp(v.0.timestamp(), v.0.timestamp_subsec_nanos(), buf)
}
ScalarRefImpl::Timestamp(v) => serialize_timestamp(
v.0.and_utc().timestamp(),
v.0.and_utc().timestamp_subsec_nanos(),
buf,
),
ScalarRefImpl::Timestamptz(v) => buf.put_i64_le(v.timestamp_micros()),
ScalarRefImpl::Time(v) => {
serialize_time(v.0.num_seconds_from_midnight(), v.0.nanosecond(), buf)
Expand Down
4 changes: 3 additions & 1 deletion src/connector/src/parser/mysql.rs
Original file line number Diff line number Diff line change
Expand Up @@ -98,7 +98,9 @@ pub fn mysql_row_to_owned_row(mysql_row: &mut MysqlRow, schema: &Schema) -> Owne
.unwrap_or(Ok(None));
match res {
Ok(val) => val.map(|v| {
ScalarImpl::from(Timestamptz::from_micros(v.timestamp_micros()))
ScalarImpl::from(Timestamptz::from_micros(
v.and_utc().timestamp_micros(),
))
}),
Err(err) => {
log_error!(name, err, "parse column failed");
Expand Down
4 changes: 3 additions & 1 deletion src/connector/src/parser/unified/json.rs
Original file line number Diff line number Diff line change
Expand Up @@ -496,7 +496,9 @@ impl JsonParseOptions {
.as_str()
.unwrap()
.parse::<Timestamp>()
.map(|naive_utc| Timestamptz::from_micros(naive_utc.0.timestamp_micros()))
.map(|naive_utc| {
Timestamptz::from_micros(naive_utc.0.and_utc().timestamp_micros())
})
.map_err(|_| create_error())?
.into(),
// Unless explicitly requested `utc_without_utc`, we parse string with `YYYY-MM-DDTHH:MM:SSZ`.
Expand Down
2 changes: 1 addition & 1 deletion src/connector/src/sink/encoder/json.rs
Original file line number Diff line number Diff line change
Expand Up @@ -269,7 +269,7 @@ fn datum_to_json_object(
}
},
(DataType::Timestamp, ScalarRefImpl::Timestamp(v)) => match timestamp_handling_mode {
TimestampHandlingMode::Milli => json!(v.0.timestamp_millis()),
TimestampHandlingMode::Milli => json!(v.0.and_utc().timestamp_millis()),
TimestampHandlingMode::String => json!(v.0.format("%Y-%m-%d %H:%M:%S%.6f").to_string()),
},
(DataType::Bytea, ScalarRefImpl::Bytea(v)) => {
Expand Down
2 changes: 1 addition & 1 deletion src/connector/src/source/iceberg/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -190,7 +190,7 @@ impl IcebergSplitEnumerator {
Some(snapshot) => snapshot.snapshot_id,
None => {
// convert unix time to human readable time
let time = chrono::NaiveDateTime::from_timestamp_millis(timestamp);
let time = chrono::DateTime::from_timestamp_millis(timestamp);
if time.is_some() {
bail!("Cannot find a snapshot older than {}", time.unwrap());
} else {
Expand Down
20 changes: 13 additions & 7 deletions src/ctl/src/cmd_impl/meta/migration.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ use std::collections::{BTreeSet, HashMap};
use std::time::Duration;

use anyhow::Context;
use chrono::DateTime;
use etcd_client::ConnectOptions;
use itertools::Itertools;
use risingwave_common::util::epoch::Epoch;
Expand Down Expand Up @@ -68,7 +69,6 @@ use risingwave_pb::meta::table_fragments::State;
use risingwave_pb::meta::PbSystemParams;
use risingwave_pb::user::grant_privilege::PbObject as GrantObject;
use risingwave_pb::user::PbUserInfo;
use sea_orm::prelude::DateTime;
use sea_orm::ActiveValue::Set;
use sea_orm::{
ColumnTrait, ConnectionTrait, DatabaseBackend, DbBackend, EntityTrait, IntoActiveModel, NotSet,
Expand Down Expand Up @@ -360,11 +360,15 @@ pub async fn migrate(from: EtcdBackend, target: String, force_clean: bool) -> an
};
if let Some(epoch) = object.initialized_at_epoch.map(Epoch::from) {
obj.initialized_at =
Set(DateTime::from_timestamp_millis(epoch.as_unix_millis() as _).unwrap());
Set(DateTime::from_timestamp_millis(epoch.as_unix_millis() as _)
.unwrap()
.naive_utc());
}
if let Some(epoch) = object.created_at_epoch.map(Epoch::from) {
obj.created_at =
Set(DateTime::from_timestamp_millis(epoch.as_unix_millis() as _).unwrap());
Set(DateTime::from_timestamp_millis(epoch.as_unix_millis() as _)
.unwrap()
.naive_utc());
}
Object::insert(obj).exec(&meta_store_sql.conn).await?;
}
Expand All @@ -390,12 +394,14 @@ pub async fn migrate(from: EtcdBackend, target: String, force_clean: bool) -> an
..Default::default()
};
if let Some(epoch) = table.initialized_at_epoch.map(Epoch::from) {
obj.initialized_at =
Set(DateTime::from_timestamp_millis(epoch.as_unix_millis() as _).unwrap());
obj.initialized_at = Set(DateTime::from_timestamp_millis(epoch.as_unix_millis() as _)
.unwrap()
.naive_utc());
}
if let Some(epoch) = table.created_at_epoch.map(Epoch::from) {
obj.created_at =
Set(DateTime::from_timestamp_millis(epoch.as_unix_millis() as _).unwrap());
obj.created_at = Set(DateTime::from_timestamp_millis(epoch.as_unix_millis() as _)
.unwrap()
.naive_utc());
}
Object::insert(obj).exec(&meta_store_sql.conn).await?;
}
Expand Down
7 changes: 4 additions & 3 deletions src/expr/impl/src/scalar/extract.rs
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,7 @@ fn extract_from_timelike(time: impl Timelike, unit: Unit) -> Decimal {
fn extract_from_date(date: Date, unit: &Unit) -> Decimal {
match unit {
Epoch => {
let epoch = date.0.and_time(NaiveTime::default()).timestamp();
let epoch = date.0.and_time(NaiveTime::default()).and_utc().timestamp();
epoch.into()
}
Julian => {
Expand All @@ -92,11 +92,12 @@ fn extract_from_time(time: Time, unit: &Unit) -> Decimal {
fn extract_from_timestamp(timestamp: Timestamp, unit: &Unit) -> Decimal {
match unit {
Epoch => {
let epoch = timestamp.0.timestamp_micros();
let epoch = timestamp.0.and_utc().timestamp_micros();
Decimal::from_i128_with_scale(epoch as i128, 6)
}
Julian => {
let epoch = Decimal::from_i128_with_scale(timestamp.0.timestamp_micros() as i128, 6);
let epoch =
Decimal::from_i128_with_scale(timestamp.0.and_utc().timestamp_micros() as i128, 6);
epoch / (24 * 60 * 60).into() + 2_440_588.into()
}
_ if unit.is_date_unit() => extract_from_datelike(timestamp.0.date(), *unit),
Expand Down
2 changes: 2 additions & 0 deletions src/expr/impl/src/scalar/to_char.rs
Original file line number Diff line number Diff line change
Expand Up @@ -351,6 +351,7 @@ fn format_inner(w: &mut impl Write, interval: Interval, item: &Item<'_>) -> Resu
| WeekFromMon | IsoYearDiv100 | Timestamp | YearDiv100 | Internal(_) => {
unreachable!()
}
_ => unreachable!(),
}
Ok(())
}
Expand Down Expand Up @@ -395,6 +396,7 @@ fn format_inner(w: &mut impl Write, interval: Interval, item: &Item<'_>) -> Resu
| Nanosecond9
| RFC2822
| RFC3339 => unreachable!(),
_ => unreachable!(),
}
}
Item::Error => Err(invalid_pattern_err()),
Expand Down
4 changes: 2 additions & 2 deletions src/expr/impl/src/scalar/tumble.rs
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ pub fn tumble_start_date(timestamp: Date, window_size: Interval) -> Result<Times

#[function("tumble_start(timestamp, interval) -> timestamp")]
pub fn tumble_start_date_time(timestamp: Timestamp, window_size: Interval) -> Result<Timestamp> {
let timestamp_micro_second = timestamp.0.timestamp_micros();
let timestamp_micro_second = timestamp.0.and_utc().timestamp_micros();
let window_start_micro_second = get_window_start(timestamp_micro_second, window_size)?;
Ok(Timestamp::from_timestamp_uncheck(
window_start_micro_second / 1_000_000,
Expand Down Expand Up @@ -72,7 +72,7 @@ pub fn tumble_start_offset_date_time(
window_size: Interval,
offset: Interval,
) -> Result<Timestamp> {
let timestamp_micro_second = time.0.timestamp_micros();
let timestamp_micro_second = time.0.and_utc().timestamp_micros();
let window_start_micro_second =
get_window_start_with_offset(timestamp_micro_second, window_size, offset)?;

Expand Down
20 changes: 10 additions & 10 deletions src/meta/src/controller/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -143,10 +143,10 @@ impl From<ObjectModel<table::Model>> for PbTable {
.cardinality
.map(|cardinality| cardinality.to_protobuf()),
initialized_at_epoch: Some(
Epoch::from_unix_millis(value.1.initialized_at.timestamp_millis() as _).0,
Epoch::from_unix_millis(value.1.initialized_at.and_utc().timestamp_millis() as _).0,
),
created_at_epoch: Some(
Epoch::from_unix_millis(value.1.created_at.timestamp_millis() as _).0,
Epoch::from_unix_millis(value.1.created_at.and_utc().timestamp_millis() as _).0,
),
cleaned_by_watermark: value.0.cleaned_by_watermark,
stream_job_status: PbStreamJobStatus::Created as _, // todo: deprecate it.
Expand Down Expand Up @@ -183,10 +183,10 @@ impl From<ObjectModel<source::Model>> for PbSource {
connection_id: value.0.connection_id.map(|id| id as _),
// todo: using the timestamp from the database directly.
initialized_at_epoch: Some(
Epoch::from_unix_millis(value.1.initialized_at.timestamp_millis() as _).0,
Epoch::from_unix_millis(value.1.initialized_at.and_utc().timestamp_millis() as _).0,
),
created_at_epoch: Some(
Epoch::from_unix_millis(value.1.created_at.timestamp_millis() as _).0,
Epoch::from_unix_millis(value.1.created_at.and_utc().timestamp_millis() as _).0,
),
version: value.0.version as _,
optional_associated_table_id: value
Expand Down Expand Up @@ -221,10 +221,10 @@ impl From<ObjectModel<sink::Model>> for PbSink {
definition: value.0.definition,
connection_id: value.0.connection_id.map(|id| id as _),
initialized_at_epoch: Some(
Epoch::from_unix_millis(value.1.initialized_at.timestamp_millis() as _).0,
Epoch::from_unix_millis(value.1.initialized_at.and_utc().timestamp_millis() as _).0,
),
created_at_epoch: Some(
Epoch::from_unix_millis(value.1.created_at.timestamp_millis() as _).0,
Epoch::from_unix_millis(value.1.created_at.and_utc().timestamp_millis() as _).0,
),
db_name: value.0.db_name,
sink_from_name: value.0.sink_from_name,
Expand All @@ -250,10 +250,10 @@ impl From<ObjectModel<subscription::Model>> for PbSubscription {
retention_seconds: value.0.retention_seconds as _,
definition: value.0.definition,
initialized_at_epoch: Some(
Epoch::from_unix_millis(value.1.initialized_at.timestamp_millis() as _).0,
Epoch::from_unix_millis(value.1.initialized_at.and_utc().timestamp_millis() as _).0,
),
created_at_epoch: Some(
Epoch::from_unix_millis(value.1.created_at.timestamp_millis() as _).0,
Epoch::from_unix_millis(value.1.created_at.and_utc().timestamp_millis() as _).0,
),
initialized_at_cluster_version: value.1.initialized_at_cluster_version,
created_at_cluster_version: value.1.created_at_cluster_version,
Expand All @@ -276,10 +276,10 @@ impl From<ObjectModel<index::Model>> for PbIndex {
index_item: value.0.index_items.to_protobuf(),
index_columns_len: value.0.index_columns_len as _,
initialized_at_epoch: Some(
Epoch::from_unix_millis(value.1.initialized_at.timestamp_millis() as _).0,
Epoch::from_unix_millis(value.1.initialized_at.and_utc().timestamp_millis() as _).0,
),
created_at_epoch: Some(
Epoch::from_unix_millis(value.1.created_at.timestamp_millis() as _).0,
Epoch::from_unix_millis(value.1.created_at.and_utc().timestamp_millis() as _).0,
),
stream_job_status: PbStreamJobStatus::Created as _, // todo: deprecate it.
initialized_at_cluster_version: value.1.initialized_at_cluster_version,
Expand Down
Loading