Skip to content

Commit

Permalink
fix: parse timestamp before epoch
Browse files Browse the repository at this point in the history
  • Loading branch information
xuefengze committed Jan 29, 2024
1 parent 3a49045 commit 69f6c07
Show file tree
Hide file tree
Showing 3 changed files with 27 additions and 8 deletions.
6 changes: 6 additions & 0 deletions src/common/src/types/datetime.rs
Original file line number Diff line number Diff line change
Expand Up @@ -536,6 +536,12 @@ impl Timestamp {
self.0.timestamp_nanos_opt().unwrap()
}

pub fn with_millis(timestamp_millis: i64) -> Result<Self> {
let secs = timestamp_millis.div_euclid(1_000);
let nsecs = timestamp_millis.rem_euclid(1_000) * 1_000_000;
Self::with_secs_nsecs(secs, nsecs as u32)
}

pub fn with_micros(timestamp_micros: i64) -> Result<Self> {
let secs = timestamp_micros.div_euclid(1_000_000);
let nsecs = timestamp_micros.rem_euclid(1_000_000) * 1000;
Expand Down
2 changes: 2 additions & 0 deletions src/connector/src/parser/avro/util.rs
Original file line number Diff line number Diff line change
Expand Up @@ -103,6 +103,8 @@ fn avro_type_mapping(schema: &Schema) -> anyhow::Result<DataType> {
DataType::Decimal
}
Schema::Date => DataType::Date,
Schema::LocalTimestampMillis => DataType::Timestamp,
Schema::LocalTimestampMicros => DataType::Timestamp,
Schema::TimestampMillis => DataType::Timestamptz,
Schema::TimestampMicros => DataType::Timestamptz,
Schema::Duration => DataType::Interval,
Expand Down
27 changes: 19 additions & 8 deletions src/connector/src/parser/unified/avro.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,10 +23,11 @@ use chrono::Datelike;
use itertools::Itertools;
use num_bigint::{BigInt, Sign};
use risingwave_common::array::{ListValue, StructValue};
use risingwave_common::cast::{i64_to_timestamp, i64_to_timestamptz};
use risingwave_common::error::Result as RwResult;
use risingwave_common::log::LogSuppresser;
use risingwave_common::types::{DataType, Date, Datum, Interval, JsonbVal, ScalarImpl, Time};
use risingwave_common::types::{
DataType, Date, Datum, Interval, JsonbVal, ScalarImpl, Time, Timestamp, Timestamptz,
};
use risingwave_common::util::iter_util::ZipEqFast;

use super::{Access, AccessError, AccessResult};
Expand Down Expand Up @@ -181,19 +182,29 @@ impl<'a> AvroParseOptions<'a> {
}
(Some(DataType::Varchar) | None, Value::String(s)) => s.clone().into_boxed_str().into(),
// ---- Timestamp -----
(Some(DataType::Timestamp) | None, Value::TimestampMillis(ms)) => {
i64_to_timestamp(*ms).map_err(|_| create_error())?.into()
(Some(DataType::Timestamp) | None, Value::LocalTimestampMillis(ms)) => {
Timestamp::with_millis(*ms)
.map_err(|_| create_error())?
.into()
}
(Some(DataType::Timestamp) | None, Value::TimestampMicros(us)) => {
i64_to_timestamp(*us).map_err(|_| create_error())?.into()
(Some(DataType::Timestamp) | None, Value::LocalTimestampMicros(us)) => {
Timestamp::with_micros(*us)
.map_err(|_| create_error())?
.into()
}

// ---- TimestampTz -----
(Some(DataType::Timestamptz), Value::TimestampMillis(ms)) => {
i64_to_timestamptz(*ms).map_err(|_| create_error())?.into()
tracing::debug!("timestamp milliseconds {:?}", ms);
Timestamptz::from_millis(*ms)
.ok_or(AccessError::Other(anyhow!(
"timestamp with milliseconds {:?} * 1000 is out of range",
ms
)))?
.into()
}
(Some(DataType::Timestamptz), Value::TimestampMicros(us)) => {
i64_to_timestamptz(*us).map_err(|_| create_error())?.into()
Timestamptz::from_micros(*us).into()
}

// ---- Interval -----
Expand Down

0 comments on commit 69f6c07

Please sign in to comment.