Skip to content

Commit

Permalink
refactor(datagen): generalize TimestampField to ChronoField (#13439)
Browse files Browse the repository at this point in the history
  • Loading branch information
xiangjinwu authored Nov 15, 2023
1 parent 1b1950e commit a624936
Show file tree
Hide file tree
Showing 3 changed files with 48 additions and 38 deletions.
6 changes: 3 additions & 3 deletions src/common/src/field_generator/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ pub use timestamp::*;
pub use varchar::*;

use crate::array::{ListValue, StructValue};
use crate::types::{DataType, Datum, ScalarImpl};
use crate::types::{DataType, Datum, ScalarImpl, Timestamp};

pub const DEFAULT_MIN: i16 = i16::MIN;
pub const DEFAULT_MAX: i16 = i16::MAX;
Expand Down Expand Up @@ -95,7 +95,7 @@ pub enum FieldGeneratorImpl {
VarcharRandomVariableLength(VarcharRandomVariableLengthField),
VarcharRandomFixedLength(VarcharRandomFixedLengthField),
VarcharConstant,
Timestamp(TimestampField),
Timestamp(ChronoField<Timestamp>),
Struct(Vec<(String, FieldGeneratorImpl)>),
List(Box<FieldGeneratorImpl>, usize),
}
Expand Down Expand Up @@ -181,7 +181,7 @@ impl FieldGeneratorImpl {
max_past_mode: Option<String>,
seed: u64,
) -> Result<Self> {
Ok(FieldGeneratorImpl::Timestamp(TimestampField::new(
Ok(FieldGeneratorImpl::Timestamp(ChronoField::new(
base,
max_past,
max_past_mode,
Expand Down
78 changes: 44 additions & 34 deletions src/common/src/field_generator/timestamp.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,41 +18,28 @@ use chrono::{Duration, DurationRound};
use humantime::parse_duration;
use rand::rngs::StdRng;
use rand::{Rng, SeedableRng};
use serde_json::{json, Value};
use serde_json::Value;
use tracing::debug;

use super::DEFAULT_MAX_PAST;
use crate::types::{Datum, Scalar, Timestamp};

/// Selects how the "now" reference point is obtained when no explicit base
/// timestamp has been supplied.
#[derive(Debug)]
enum LocalNow {
/// The local clock is read again each time a value is generated.
Relative,
/// The local time captured once at construction, rounded to 1 microsecond.
Absolute(NaiveDateTime),
}

pub struct TimestampField {
base: Option<DateTime<FixedOffset>>,
pub struct ChronoField<T: ChronoFieldInner> {
max_past: Duration,
local_now: LocalNow,
absolute_base: Option<T>,
seed: u64,
}

impl TimestampField {
impl<T: ChronoFieldInner> ChronoField<T> {
pub fn new(
base: Option<DateTime<FixedOffset>>,
max_past_option: Option<String>,
max_past_mode: Option<String>,
seed: u64,
) -> Result<Self> {
let local_now = match max_past_mode.as_deref() {
Some("relative") => LocalNow::Relative,
_ => {
LocalNow::Absolute(
Local::now()
.naive_local()
.duration_round(Duration::microseconds(1))?,
) // round to 1 us std duration
}
Some("relative") => None,
_ => Some(T::from_now()),
};

let max_past = if let Some(max_past_option) = max_past_option {
Expand All @@ -63,36 +50,59 @@ impl TimestampField {
};
debug!(?local_now, ?max_past, "parse timestamp field option");
Ok(Self {
base,
// convert to chrono::Duration
max_past: chrono::Duration::from_std(max_past)?,
local_now,
absolute_base: base.map(T::from_base).or(local_now),
seed,
})
}

fn generate_data(&mut self, offset: u64) -> NaiveDateTime {
fn generate_data(&mut self, offset: u64) -> T {
let milliseconds = self.max_past.num_milliseconds();
let mut rng = StdRng::seed_from_u64(offset ^ self.seed);
let max_milliseconds = rng.gen_range(0..=milliseconds);
let now = match self.base {
Some(base) => base.naive_local(),
None => match self.local_now {
LocalNow::Relative => Local::now()
.naive_local()
.duration_round(Duration::microseconds(1))
.unwrap(),
LocalNow::Absolute(now) => now,
},
let base = match self.absolute_base {
Some(base) => base,
None => T::from_now(),
};
now - Duration::milliseconds(max_milliseconds)
base.minus(Duration::milliseconds(max_milliseconds))
}

pub fn generate(&mut self, offset: u64) -> Value {
json!(self.generate_data(offset).to_string())
self.generate_data(offset).to_json()
}

pub fn generate_datum(&mut self, offset: u64) -> Datum {
Some(Timestamp::new(self.generate_data(offset)).to_scalar_value())
Some(self.generate_data(offset).to_scalar_value())
}
}

/// Operations a date/time scalar type must provide so that `ChronoField<T>`
/// can generate values of it.
///
/// Implementors must also be `Debug`, `Copy` and `Scalar`.
pub trait ChronoFieldInner: std::fmt::Debug + Copy + Scalar {
/// Builds a value from the current moment. `ChronoField` calls this either
/// once at construction or on every generation when no absolute base is kept.
fn from_now() -> Self;
/// Builds a value from a user-supplied base date-time with a fixed offset.
fn from_base(base: DateTime<FixedOffset>) -> Self;
/// Returns a copy of `self` moved back in time by `duration`.
fn minus(&self, duration: Duration) -> Self;
/// Renders the value as a `serde_json::Value`.
fn to_json(&self) -> Value;
}

/// `ChronoFieldInner` implementation backed by the naive date-time stored in
/// `Timestamp`.
impl ChronoFieldInner for Timestamp {
    /// Reads the local wall clock, drops the offset and rounds the result to
    /// a whole microsecond.
    ///
    /// # Panics
    ///
    /// Panics if rounding the current time to 1 microsecond fails.
    fn from_now() -> Self {
        let local_naive = Local::now().naive_local();
        let one_microsecond = Duration::microseconds(1);
        let rounded = local_naive.duration_round(one_microsecond).unwrap();
        Self::new(rounded)
    }

    /// Uses the local (offset-applied) naive date-time of `base`.
    fn from_base(base: DateTime<FixedOffset>) -> Self {
        let naive = base.naive_local();
        Self::new(naive)
    }

    /// Subtracts `duration` from the wrapped date-time.
    fn minus(&self, duration: Duration) -> Self {
        let shifted = self.0 - duration;
        Self::new(shifted)
    }

    /// Emits the wrapped date-time's `to_string()` output as a JSON string.
    fn to_json(&self) -> Value {
        let rendered = self.0.to_string();
        Value::String(rendered)
    }
}
2 changes: 1 addition & 1 deletion src/connector/src/source/datagen/source/reader.rs
Original file line number Diff line number Diff line change
Expand Up @@ -215,7 +215,7 @@ fn generator_from_data_type(
match data_type {
DataType::Timestamp => {
let max_past_key = format!("fields.{}.max_past", name);
let max_past_value = fields_option_map.get(&max_past_key).map(|s| s.to_string());
let max_past_value = fields_option_map.get(&max_past_key).cloned();
let max_past_mode_key = format!("fields.{}.max_past_mode", name);
let max_past_mode_value = fields_option_map
.get(&max_past_mode_key)
Expand Down

0 comments on commit a624936

Please sign in to comment.