Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

refactor(datagen): generalize TimestampField to ChronoField #13439

Merged
merged 1 commit into from
Nov 15, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 3 additions & 3 deletions src/common/src/field_generator/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ pub use timestamp::*;
pub use varchar::*;

use crate::array::{ListValue, StructValue};
use crate::types::{DataType, Datum, ScalarImpl};
use crate::types::{DataType, Datum, ScalarImpl, Timestamp};

pub const DEFAULT_MIN: i16 = i16::MIN;
pub const DEFAULT_MAX: i16 = i16::MAX;
Expand Down Expand Up @@ -95,7 +95,7 @@ pub enum FieldGeneratorImpl {
VarcharRandomVariableLength(VarcharRandomVariableLengthField),
VarcharRandomFixedLength(VarcharRandomFixedLengthField),
VarcharConstant,
Timestamp(TimestampField),
Timestamp(ChronoField<Timestamp>),
Struct(Vec<(String, FieldGeneratorImpl)>),
List(Box<FieldGeneratorImpl>, usize),
}
Expand Down Expand Up @@ -181,7 +181,7 @@ impl FieldGeneratorImpl {
max_past_mode: Option<String>,
seed: u64,
) -> Result<Self> {
Ok(FieldGeneratorImpl::Timestamp(TimestampField::new(
Ok(FieldGeneratorImpl::Timestamp(ChronoField::new(
base,
max_past,
max_past_mode,
Expand Down
78 changes: 44 additions & 34 deletions src/common/src/field_generator/timestamp.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,41 +18,28 @@ use chrono::{Duration, DurationRound};
use humantime::parse_duration;
use rand::rngs::StdRng;
use rand::{Rng, SeedableRng};
use serde_json::{json, Value};
use serde_json::Value;
use tracing::debug;

use super::DEFAULT_MAX_PAST;
use crate::types::{Datum, Scalar, Timestamp};

#[derive(Debug)]
enum LocalNow {
Relative,
Absolute(NaiveDateTime),
}

pub struct TimestampField {
base: Option<DateTime<FixedOffset>>,
pub struct ChronoField<T: ChronoFieldInner> {
max_past: Duration,
local_now: LocalNow,
absolute_base: Option<T>,
seed: u64,
}

impl TimestampField {
impl<T: ChronoFieldInner> ChronoField<T> {
pub fn new(
base: Option<DateTime<FixedOffset>>,
max_past_option: Option<String>,
max_past_mode: Option<String>,
seed: u64,
) -> Result<Self> {
let local_now = match max_past_mode.as_deref() {
Some("relative") => LocalNow::Relative,
_ => {
LocalNow::Absolute(
Local::now()
.naive_local()
.duration_round(Duration::microseconds(1))?,
) // round to 1 us std duration
}
Some("relative") => None,
_ => Some(T::from_now()),
};

let max_past = if let Some(max_past_option) = max_past_option {
Expand All @@ -63,36 +50,59 @@ impl TimestampField {
};
debug!(?local_now, ?max_past, "parse timestamp field option");
Ok(Self {
base,
// convert to chrono::Duration
max_past: chrono::Duration::from_std(max_past)?,
local_now,
absolute_base: base.map(T::from_base).or(local_now),
seed,
})
}

fn generate_data(&mut self, offset: u64) -> NaiveDateTime {
fn generate_data(&mut self, offset: u64) -> T {
let milliseconds = self.max_past.num_milliseconds();
let mut rng = StdRng::seed_from_u64(offset ^ self.seed);
let max_milliseconds = rng.gen_range(0..=milliseconds);
let now = match self.base {
Some(base) => base.naive_local(),
None => match self.local_now {
LocalNow::Relative => Local::now()
.naive_local()
.duration_round(Duration::microseconds(1))
.unwrap(),
LocalNow::Absolute(now) => now,
},
let base = match self.absolute_base {
Some(base) => base,
None => T::from_now(),
};
now - Duration::milliseconds(max_milliseconds)
base.minus(Duration::milliseconds(max_milliseconds))
}

pub fn generate(&mut self, offset: u64) -> Value {
json!(self.generate_data(offset).to_string())
self.generate_data(offset).to_json()
}

pub fn generate_datum(&mut self, offset: u64) -> Datum {
Some(Timestamp::new(self.generate_data(offset)).to_scalar_value())
Some(self.generate_data(offset).to_scalar_value())
}
}

/// Value type that a [`ChronoField`] can generate.
///
/// `ChronoField<T>` is generic over this trait: it stores an optional `T` as its
/// absolute base, subtracts a random offset from it, and emits the result either
/// as JSON or as a scalar datum. The supertraits are used by that generic code:
/// `Debug` for the `debug!` log in `ChronoField::new`, `Copy` so the stored base
/// can be read out of `&mut self` by value, and `Scalar` for `to_scalar_value`.
pub trait ChronoFieldInner: std::fmt::Debug + Copy + Scalar {
/// Produces the value for "now".
///
/// Called once in `ChronoField::new` when `max_past_mode` is anything other
/// than `"relative"`, and on every generation when no absolute base is stored.
fn from_now() -> Self;
/// Converts the user-supplied base `DateTime<FixedOffset>` into this type.
fn from_base(base: DateTime<FixedOffset>) -> Self;
/// Returns a new value derived from `self` and `duration`; the generator calls
/// this with the random "max past" offset, so implementations are expected to
/// move the value earlier by `duration`.
fn minus(&self, duration: Duration) -> Self;
/// Renders the value as a `serde_json::Value` for `ChronoField::generate`.
fn to_json(&self) -> Value;
}

/// [`ChronoFieldInner`] for `Timestamp`, backed by the naive datetime it wraps.
impl ChronoFieldInner for Timestamp {
    /// Current local wall-clock time, rounded to a 1 microsecond granularity.
    ///
    /// # Panics
    ///
    /// Panics if `duration_round` returns an error.
    fn from_now() -> Self {
        let local_naive = Local::now().naive_local();
        let precision = Duration::microseconds(1);
        Self::new(local_naive.duration_round(precision).unwrap())
    }

    /// Builds the timestamp from the base's `naive_local()` reading.
    fn from_base(base: DateTime<FixedOffset>) -> Self {
        let naive = base.naive_local();
        Self::new(naive)
    }

    /// Returns a timestamp that is `duration` earlier than `self`.
    fn minus(&self, duration: Duration) -> Self {
        let shifted = self.0 - duration;
        Self::new(shifted)
    }

    /// Emits the wrapped datetime's `to_string()` output as a JSON string.
    fn to_json(&self) -> Value {
        let text = self.0.to_string();
        Value::String(text)
    }
}
2 changes: 1 addition & 1 deletion src/connector/src/source/datagen/source/reader.rs
Original file line number Diff line number Diff line change
Expand Up @@ -215,7 +215,7 @@ fn generator_from_data_type(
match data_type {
DataType::Timestamp => {
let max_past_key = format!("fields.{}.max_past", name);
let max_past_value = fields_option_map.get(&max_past_key).map(|s| s.to_string());
let max_past_value = fields_option_map.get(&max_past_key).cloned();
let max_past_mode_key = format!("fields.{}.max_past_mode", name);
let max_past_mode_value = fields_option_map
.get(&max_past_mode_key)
Expand Down
Loading