diff --git a/src/common/src/field_generator/mod.rs b/src/common/src/field_generator/mod.rs index 9d61c01e41861..02a958aa0ebec 100644 --- a/src/common/src/field_generator/mod.rs +++ b/src/common/src/field_generator/mod.rs @@ -26,7 +26,7 @@ pub use timestamp::*; pub use varchar::*; use crate::array::{ListValue, StructValue}; -use crate::types::{DataType, Datum, ScalarImpl}; +use crate::types::{DataType, Datum, ScalarImpl, Timestamp}; pub const DEFAULT_MIN: i16 = i16::MIN; pub const DEFAULT_MAX: i16 = i16::MAX; @@ -95,7 +95,7 @@ pub enum FieldGeneratorImpl { VarcharRandomVariableLength(VarcharRandomVariableLengthField), VarcharRandomFixedLength(VarcharRandomFixedLengthField), VarcharConstant, - Timestamp(TimestampField), + Timestamp(ChronoField), Struct(Vec<(String, FieldGeneratorImpl)>), List(Box, usize), } @@ -181,7 +181,7 @@ impl FieldGeneratorImpl { max_past_mode: Option, seed: u64, ) -> Result { - Ok(FieldGeneratorImpl::Timestamp(TimestampField::new( + Ok(FieldGeneratorImpl::Timestamp(ChronoField::new( base, max_past, max_past_mode, diff --git a/src/common/src/field_generator/timestamp.rs b/src/common/src/field_generator/timestamp.rs index 000f806c66d50..54c55de273965 100644 --- a/src/common/src/field_generator/timestamp.rs +++ b/src/common/src/field_generator/timestamp.rs @@ -18,26 +18,19 @@ use chrono::{Duration, DurationRound}; use humantime::parse_duration; use rand::rngs::StdRng; use rand::{Rng, SeedableRng}; -use serde_json::{json, Value}; +use serde_json::Value; use tracing::debug; use super::DEFAULT_MAX_PAST; use crate::types::{Datum, Scalar, Timestamp}; -#[derive(Debug)] -enum LocalNow { - Relative, - Absolute(NaiveDateTime), -} - -pub struct TimestampField { - base: Option>, +pub struct ChronoField { max_past: Duration, - local_now: LocalNow, + absolute_base: Option, seed: u64, } -impl TimestampField { +impl ChronoField { pub fn new( base: Option>, max_past_option: Option, @@ -45,14 +38,8 @@ impl TimestampField { seed: u64, ) -> Result { let local_now = match max_past_mode.as_deref() { - Some("relative") => LocalNow::Relative, - _ => { - LocalNow::Absolute( - Local::now() - .naive_local() - .duration_round(Duration::microseconds(1))?, - ) // round to 1 us std duration - } + Some("relative") => None, + _ => Some(T::from_now()), }; let max_past = if let Some(max_past_option) = max_past_option { @@ -63,36 +50,59 @@ impl TimestampField { }; debug!(?local_now, ?max_past, "parse timestamp field option"); Ok(Self { - base, // convert to chrono::Duration max_past: chrono::Duration::from_std(max_past)?, - local_now, + absolute_base: base.map(T::from_base).or(local_now), seed, }) } - fn generate_data(&mut self, offset: u64) -> NaiveDateTime { + fn generate_data(&mut self, offset: u64) -> T { let milliseconds = self.max_past.num_milliseconds(); let mut rng = StdRng::seed_from_u64(offset ^ self.seed); let max_milliseconds = rng.gen_range(0..=milliseconds); - let now = match self.base { - Some(base) => base.naive_local(), - None => match self.local_now { - LocalNow::Relative => Local::now() - .naive_local() - .duration_round(Duration::microseconds(1)) - .unwrap(), - LocalNow::Absolute(now) => now, - }, + let base = match self.absolute_base { + Some(base) => base, + None => T::from_now(), }; - now - Duration::milliseconds(max_milliseconds) + base.minus(Duration::milliseconds(max_milliseconds)) } pub fn generate(&mut self, offset: u64) -> Value { - json!(self.generate_data(offset).to_string()) + self.generate_data(offset).to_json() } pub fn generate_datum(&mut self, offset: u64) -> Datum { - Some(Timestamp::new(self.generate_data(offset)).to_scalar_value()) + Some(self.generate_data(offset).to_scalar_value()) + } +} + +pub trait ChronoFieldInner: std::fmt::Debug + Copy + Scalar { + fn from_now() -> Self; + fn from_base(base: DateTime) -> Self; + fn minus(&self, duration: Duration) -> Self; + fn to_json(&self) -> Value; +} + +impl ChronoFieldInner for Timestamp { + fn from_now() -> Self { + Timestamp::new( + Local::now() + .naive_local() + .duration_round(Duration::microseconds(1)) + .unwrap(), + ) + } + + fn from_base(base: DateTime) -> Self { + Timestamp::new(base.naive_local()) + } + + fn minus(&self, duration: Duration) -> Self { + Timestamp::new(self.0 - duration) + } + + fn to_json(&self) -> Value { + Value::String(self.0.to_string()) } } diff --git a/src/connector/src/source/datagen/source/reader.rs b/src/connector/src/source/datagen/source/reader.rs index bd9f74ee3aa9a..11cb9db08c48a 100644 --- a/src/connector/src/source/datagen/source/reader.rs +++ b/src/connector/src/source/datagen/source/reader.rs @@ -215,7 +215,7 @@ fn generator_from_data_type( match data_type { DataType::Timestamp => { let max_past_key = format!("fields.{}.max_past", name); - let max_past_value = fields_option_map.get(&max_past_key).map(|s| s.to_string()); + let max_past_value = fields_option_map.get(&max_past_key).cloned(); let max_past_mode_key = format!("fields.{}.max_past_mode", name); let max_past_mode_value = fields_option_map .get(&max_past_mode_key)