Skip to content

Commit

Permalink
feat(datagen): support increasing timestamp value generation (risingw…
Browse files Browse the repository at this point in the history
…avelabs#6591)

Co-authored-by: mergify[bot] <37929162+mergify[bot]@users.noreply.github.com>
  • Loading branch information
hzxa21 and mergify[bot] authored Nov 28, 2022
1 parent 66df2f7 commit e6c9116
Show file tree
Hide file tree
Showing 6 changed files with 74 additions and 37 deletions.
1 change: 1 addition & 0 deletions e2e_test/source/basic/datagen.slt
Original file line number Diff line number Diff line change
Expand Up @@ -110,6 +110,7 @@ create materialized source s1 (v1 struct<v2 int>, t1 timestamp, c1 varchar) with
fields.v1.v2.seed = '1',
fields.t1.kind = 'random',
fields.t1.max_past = '2h 37min',
fields.t1.max_past_mode = 'relative',
fields.t1.seed = '3',
fields.c1.kind = 'random',
fields.c1.length = '100',
Expand Down
6 changes: 3 additions & 3 deletions src/batch/src/executor/test_utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ pub fn gen_data(batch_size: usize, batch_num: usize, data_types: &[DataType]) ->
let mut columns = Vec::new();
for data_type in data_types {
let mut data_gen =
FieldGeneratorImpl::with_random(data_type.clone(), None, None, None, None, SEED)
FieldGeneratorImpl::with_number_random(data_type.clone(), None, None, SEED)
.unwrap();
let mut array_builder = data_type.create_array_builder(batch_size);
for j in 0..batch_size {
Expand All @@ -66,7 +66,7 @@ pub fn gen_sorted_data(
start: String,
step: u64,
) -> Vec<DataChunk> {
let mut data_gen = FieldGeneratorImpl::with_sequence(
let mut data_gen = FieldGeneratorImpl::with_number_sequence(
DataType::Int64,
Some(start),
Some(i64::MAX.to_string()),
Expand Down Expand Up @@ -100,7 +100,7 @@ pub fn gen_projected_data(
expr: BoxedExpression,
) -> Vec<DataChunk> {
let mut data_gen =
FieldGeneratorImpl::with_random(DataType::Int64, None, None, None, None, SEED).unwrap();
FieldGeneratorImpl::with_number_random(DataType::Int64, None, None, SEED).unwrap();
let mut ret = Vec::<DataChunk>::with_capacity(batch_num);

for i in 0..batch_num {
Expand Down
42 changes: 29 additions & 13 deletions src/common/src/field_generator/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -89,7 +89,7 @@ pub enum FieldGeneratorImpl {
}

impl FieldGeneratorImpl {
pub fn with_sequence(
pub fn with_number_sequence(
data_type: DataType,
start: Option<String>,
end: Option<String>,
Expand Down Expand Up @@ -131,12 +131,10 @@ impl FieldGeneratorImpl {
}
}

pub fn with_random(
pub fn with_number_random(
data_type: DataType,
min: Option<String>,
max: Option<String>,
mast_past: Option<String>,
length: Option<String>,
seed: u64,
) -> Result<Self> {
match data_type {
Expand All @@ -155,16 +153,28 @@ impl FieldGeneratorImpl {
DataType::Float64 => Ok(FieldGeneratorImpl::F64Random(F64RandomField::new(
min, max, seed,
)?)),
DataType::Varchar => Ok(FieldGeneratorImpl::Varchar(VarcharField::new(
length, seed,
)?)),
DataType::Timestamp => Ok(FieldGeneratorImpl::Timestamp(TimestampField::new(
mast_past, seed,
)?)),
_ => unimplemented!(),
}
}

pub fn with_timestamp(
max_past: Option<String>,
max_past_mode: Option<String>,
seed: u64,
) -> Result<Self> {
Ok(FieldGeneratorImpl::Timestamp(TimestampField::new(
max_past,
max_past_mode,
seed,
)?))
}

pub fn with_varchar(length: Option<String>, seed: u64) -> Result<Self> {
Ok(FieldGeneratorImpl::Varchar(VarcharField::new(
length, seed,
)?))
}

pub fn with_struct_fields(fields: Vec<(String, FieldGeneratorImpl)>) -> Result<Self> {
Ok(FieldGeneratorImpl::Struct(fields))
}
Expand Down Expand Up @@ -228,7 +238,7 @@ mod tests {
let mut i32_fields = vec![];
for split_index in 0..split_num {
i32_fields.push(
FieldGeneratorImpl::with_sequence(
FieldGeneratorImpl::with_number_sequence(
DataType::Int32,
Some("1".to_string()),
Some("20".to_string()),
Expand All @@ -252,6 +262,7 @@ mod tests {

#[test]
fn test_random_generate() {
let seed = 1234;
for data_type in [
DataType::Int16,
DataType::Int32,
Expand All @@ -261,8 +272,13 @@ mod tests {
DataType::Varchar,
DataType::Timestamp,
] {
let mut generator =
FieldGeneratorImpl::with_random(data_type, None, None, None, None, 1234).unwrap();
let mut generator = match data_type {
DataType::Varchar => FieldGeneratorImpl::with_varchar(None, seed).unwrap(),
DataType::Timestamp => {
FieldGeneratorImpl::with_timestamp(None, None, seed).unwrap()
}
_ => FieldGeneratorImpl::with_number_random(data_type, None, None, seed).unwrap(),
};

let val1 = generator.generate(1);
let val2 = generator.generate(2);
Expand Down
36 changes: 30 additions & 6 deletions src/common/src/field_generator/timestamp.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,17 +23,34 @@ use serde_json::{json, Value};
use super::DEFAULT_MAX_PAST;
use crate::types::{Datum, NaiveDateTimeWrapper, Scalar};

enum LocalNow {
Relative,
Absolute(NaiveDateTime),
}

pub struct TimestampField {
max_past: Duration,
local_now: NaiveDateTime,
local_now: LocalNow,
seed: u64,
}

impl TimestampField {
pub fn new(max_past_option: Option<String>, seed: u64) -> Result<Self> {
let mut local_now = Local::now().naive_local();
local_now = local_now.duration_round(Duration::microseconds(1))?; // round to 1 us
// std duration
pub fn new(
max_past_option: Option<String>,
max_past_mode: Option<String>,
seed: u64,
) -> Result<Self> {
let local_now = match max_past_mode.as_deref() {
Some("relative") => LocalNow::Relative,
_ => {
LocalNow::Absolute(
Local::now()
.naive_local()
.duration_round(Duration::microseconds(1))?,
) // round to 1 us std duration
}
};

let max_past = if let Some(max_past_option) = max_past_option {
parse_duration(&max_past_option)?
} else {
Expand All @@ -52,7 +69,14 @@ impl TimestampField {
let milliseconds = self.max_past.num_milliseconds();
let mut rng = StdRng::seed_from_u64(offset ^ self.seed);
let max_milliseconds = rng.gen_range(0..=milliseconds);
self.local_now - Duration::milliseconds(max_milliseconds)
let now = match self.local_now {
LocalNow::Relative => Local::now()
.naive_local()
.duration_round(Duration::microseconds(1))
.unwrap(),
LocalNow::Absolute(now) => now,
};
now - Duration::milliseconds(max_milliseconds)
}

pub fn generate(&mut self, offset: u64) -> Value {
Expand Down
4 changes: 2 additions & 2 deletions src/connector/src/source/datagen/source/generator.rs
Original file line number Diff line number Diff line change
Expand Up @@ -95,7 +95,7 @@ mod tests {
let mut fields_map = HashMap::new();
fields_map.insert(
"v1".to_string(),
FieldGeneratorImpl::with_sequence(
FieldGeneratorImpl::with_number_sequence(
risingwave_common::types::DataType::Int32,
Some("1".to_string()),
Some("10".to_string()),
Expand All @@ -107,7 +107,7 @@ mod tests {

fields_map.insert(
"v2".to_string(),
FieldGeneratorImpl::with_sequence(
FieldGeneratorImpl::with_number_sequence(
risingwave_common::types::DataType::Float32,
Some("1".to_string()),
Some("10".to_string()),
Expand Down
22 changes: 9 additions & 13 deletions src/connector/src/source/datagen/source/reader.rs
Original file line number Diff line number Diff line change
Expand Up @@ -159,19 +159,17 @@ fn generator_from_data_type(
DataType::Timestamp => {
let max_past_key = format!("fields.{}.max_past", name);
let max_past_value = fields_option_map.get(&max_past_key).map(|s| s.to_string());
FieldGeneratorImpl::with_random(
data_type,
None,
None,
max_past_value,
None,
random_seed,
)
let max_past_mode_key = format!("fields.{}.max_past_mode", name);
let max_past_mode_value = fields_option_map
.get(&max_past_mode_key)
.map(|s| s.to_lowercase());

FieldGeneratorImpl::with_timestamp(max_past_value, max_past_mode_value, random_seed)
}
DataType::Varchar => {
let length_key = format!("fields.{}.length", name);
let length_value = fields_option_map.get(&length_key).map(|s| s.to_string());
FieldGeneratorImpl::with_random(data_type, None, None, None, length_value, random_seed)
FieldGeneratorImpl::with_varchar(length_value, random_seed)
}
DataType::Struct(struct_type) => {
let struct_fields = zip_eq(struct_type.field_names.clone(), struct_type.fields.clone())
Expand All @@ -196,7 +194,7 @@ fn generator_from_data_type(
let start_value =
fields_option_map.get(&start_key).map(|s| s.to_string());
let end_value = fields_option_map.get(&end_key).map(|s| s.to_string());
FieldGeneratorImpl::with_sequence(
FieldGeneratorImpl::with_number_sequence(
data_type,
start_value,
end_value,
Expand All @@ -208,12 +206,10 @@ fn generator_from_data_type(
let max_key = format!("fields.{}.max", name);
let min_value = fields_option_map.get(&min_key).map(|s| s.to_string());
let max_value = fields_option_map.get(&max_key).map(|s| s.to_string());
FieldGeneratorImpl::with_random(
FieldGeneratorImpl::with_number_random(
data_type,
min_value,
max_value,
None,
None,
random_seed
)
}
Expand Down

0 comments on commit e6c9116

Please sign in to comment.