Skip to content

Commit

Permalink
refactor(expr): merge expression templates into #[function] macros (#…
Browse files Browse the repository at this point in the history
…11134)

Signed-off-by: Runji Wang <[email protected]>
  • Loading branch information
wangrunji0408 authored Sep 13, 2023
1 parent 173ad4e commit a61b5aa
Show file tree
Hide file tree
Showing 57 changed files with 1,756 additions and 4,470 deletions.
34 changes: 29 additions & 5 deletions e2e_test/batch/basic/func.slt.part
Original file line number Diff line number Diff line change
Expand Up @@ -392,6 +392,30 @@ select regexp_match('abc01234xyz', '(?:(.*?)(\d+)(.*)){1,1}');
----
{abc,01234,xyz}

query T
select regexp_match(string, 'a')
from (values
('abc'),
('def'),
('ghi')
) t(string);
----
{a}
NULL
NULL

query T
select regexp_match(string, pattern, flags)
from (values
('abc', 'bc', ''),
('abc', 'Bc', ''),
('abc', 'Bc', 'i')
) t(string, pattern, flags);
----
{bc}
NULL
{bc}

query T
select regexp_matches('foobarbequebazilbarfbonk', '(b[^b]+)(b[^b]+)', 'g');
----
Expand Down Expand Up @@ -504,10 +528,10 @@ select regexp_replace('abc123', 'abc', 'prefix\&suffix');
----
prefixabcsuffix123

query error invalid syntax for `regexp_replace`
query error invalid digit found in string
select regexp_replace('foobarbaz', 'b..', 'X', 1, 'g');

query error invalid parameters specified in regexp_replace
query error invalid digit found in string
select regexp_replace('foobarbaz', 'b..', 'X', 'g', 1);

# With Unicode
Expand Down Expand Up @@ -586,16 +610,16 @@ select regexp_count('foobarbaz', 'b..', 3);
----
2

query error Expected start to be a Unsigned Int32
query error invalid digit found in string
select regexp_count('foobarbaz', 'b..', 'g');

query error Expected start to be a Unsigned Int32
query error invalid digit found in string
select regexp_count('foobarbaz', 'b..', 'i');

query error invalid regular expression option: "a"
select regexp_count('foobarbaz', 'b..', 3, 'a');

query error `regexp_count` does not support global flag option
query error does not support the global option
select regexp_count('foobar', 'b..', 1, 'g');

query T
Expand Down
9 changes: 9 additions & 0 deletions src/common/src/array/bool_array.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,15 @@ impl BoolArray {
Self { bitmap, data }
}

/// Build a [`BoolArray`] from iterator and bitmap.
///
/// NOTE: The length of `bitmap` must be equal to the length of `iter`.
pub fn from_iter_bitmap(iter: impl IntoIterator<Item = bool>, bitmap: Bitmap) -> Self {
let data: Bitmap = iter.into_iter().collect();
assert_eq!(data.len(), bitmap.len());
BoolArray { bitmap, data }
}

pub fn data(&self) -> &Bitmap {
&self.data
}
Expand Down
10 changes: 10 additions & 0 deletions src/common/src/array/num256_array.rs
Original file line number Diff line number Diff line change
Expand Up @@ -209,3 +209,13 @@ impl EstimateSize for Int256Array {
self.bitmap.estimated_heap_size() + self.data.capacity() * size_of::<I256>()
}
}

/// Collects owned [`Int256`] values into an [`Int256Array`].
///
/// The bitmap is created with `Bitmap::ones`, sized to the number of collected
/// values (presumably marking every element as present — confirm against
/// `Bitmap`).
impl FromIterator<Int256> for Int256Array {
    fn from_iter<I: IntoIterator<Item = Int256>>(iter: I) -> Self {
        // Unbox each value into the raw integer stored by the array.
        let data = iter.into_iter().map(|value| *value.0).collect::<Vec<I256>>();
        let bitmap = Bitmap::ones(data.len());
        Self { bitmap, data }
    }
}
20 changes: 16 additions & 4 deletions src/common/src/types/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -422,13 +422,13 @@ impl DataType {
///
/// ```
/// use risingwave_common::types::DataType::*;
/// assert_eq!(List(Box::new(Int32)).unnest_list(), Int32);
/// assert_eq!(List(Box::new(List(Box::new(Int32)))).unnest_list(), Int32);
/// assert_eq!(List(Box::new(Int32)).unnest_list(), &Int32);
/// assert_eq!(List(Box::new(List(Box::new(Int32)))).unnest_list(), &Int32);
/// ```
pub fn unnest_list(&self) -> Self {
pub fn unnest_list(&self) -> &Self {
match self {
DataType::List(inner) => inner.unnest_list(),
_ => self.clone(),
_ => self,
}
}

Expand Down Expand Up @@ -738,6 +738,18 @@ impl TryFrom<ScalarImpl> for String {
}
}

impl From<&[u8]> for ScalarImpl {
fn from(s: &[u8]) -> Self {
Self::Bytea(s.into())
}
}

impl From<JsonbRef<'_>> for ScalarImpl {
fn from(jsonb: JsonbRef<'_>) -> Self {
Self::Jsonb(jsonb.to_owned_scalar())
}
}

impl ScalarImpl {
pub fn from_binary(bytes: &Bytes, data_type: &DataType) -> RwResult<Self> {
let res = match data_type {
Expand Down
2 changes: 1 addition & 1 deletion src/common/src/types/num256.rs
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ use crate::types::{to_text, Buf, DataType, Scalar, ScalarRef, F64};

/// A 256-bit signed integer.
#[derive(Debug, Clone, PartialEq, Eq, Ord, PartialOrd, Default, Hash)]
pub struct Int256(Box<i256>);
pub struct Int256(pub(crate) Box<i256>);

/// A reference to an `Int256` value.
#[derive(Debug, Copy, Clone, PartialEq, Eq, Ord, PartialOrd)]
Expand Down
9 changes: 6 additions & 3 deletions src/common/src/util/epoch.rs
Original file line number Diff line number Diff line change
Expand Up @@ -84,11 +84,14 @@ impl Epoch {
UNIX_RISINGWAVE_DATE_SEC * 1000 + self.physical_time()
}

/// Returns the epoch as a [`Timestamptz`], built from its Unix-millisecond value.
///
/// # Panics
///
/// Panics with "epoch is out of range" if `Timestamptz::from_millis` rejects
/// the value.
pub fn as_timestamptz(&self) -> Timestamptz {
    let unix_millis = self.as_unix_millis() as i64;
    Timestamptz::from_millis(unix_millis).expect("epoch is out of range")
}

/// Returns the epoch in a Timestamptz scalar.
pub fn as_scalar(&self) -> ScalarImpl {
Timestamptz::from_millis(self.as_unix_millis() as i64)
.expect("epoch is out of range")
.into()
self.as_timestamptz().into()
}

/// Returns the epoch in real system time.
Expand Down
140 changes: 112 additions & 28 deletions src/expr/benches/expr.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ use criterion::{criterion_group, criterion_main, Criterion};
use risingwave_common::array::*;
use risingwave_common::types::test_utils::IntervalTestExt;
use risingwave_common::types::*;
use risingwave_expr::agg::{build as build_agg, AggArgs, AggCall};
use risingwave_expr::agg::{build as build_agg, AggArgs, AggCall, AggKind};
use risingwave_expr::expr::*;
use risingwave_expr::sig::agg::agg_func_sigs;
use risingwave_expr::sig::func::func_sigs;
Expand Down Expand Up @@ -88,20 +88,42 @@ fn bench_expr(c: &mut Criterion) {
.into_ref(),
// 16: extract field for date
Utf8Array::from_iter_display(
["DAY", "MONTH", "YEAR", "DOW", "DOY"]
.into_iter()
.cycle()
.take(CHUNK_SIZE)
.map(Some),
[
"DAY",
"MONTH",
"YEAR",
"DOW",
"DOY",
"MILLENNIUM",
"CENTURY",
"DECADE",
"ISOYEAR",
"QUARTER",
"WEEK",
"ISODOW",
"EPOCH",
"JULIAN",
]
.into_iter()
.cycle()
.take(CHUNK_SIZE)
.map(Some),
)
.into_ref(),
// 17: extract field for time
Utf8Array::from_iter_display(
["HOUR", "MINUTE", "SECOND"]
.into_iter()
.cycle()
.take(CHUNK_SIZE)
.map(Some),
[
"Hour",
"Minute",
"Second",
"Millisecond",
"Microsecond",
"Epoch",
]
.into_iter()
.cycle()
.take(CHUNK_SIZE)
.map(Some),
)
.into_ref(),
// 18: extract field for timestamptz
Expand Down Expand Up @@ -151,6 +173,38 @@ fn bench_expr(c: &mut Criterion) {
(1..=CHUNK_SIZE).map(|i| JsonbVal::from(serde_json::Value::Number(i.into()))),
)
.into_ref(),
// 27: int256 array
Int256Array::from_iter((1..=CHUNK_SIZE).map(|_| Int256::from(1))).into_ref(),
// 28: extract field for interval
Utf8Array::from_iter_display(
[
"Millennium",
"Century",
"Decade",
"Year",
"Month",
"Day",
"Hour",
"Minute",
"Second",
"Millisecond",
"Microsecond",
"Epoch",
]
.into_iter()
.cycle()
.take(CHUNK_SIZE)
.map(Some),
)
.into_ref(),
// 29: timestamp string for to_timestamp
Utf8Array::from_iter_display(
[Some("2021/04/01 00:00:00")]
.into_iter()
.cycle()
.take(CHUNK_SIZE),
)
.into_ref(),
],
CHUNK_SIZE,
));
Expand All @@ -171,6 +225,7 @@ fn bench_expr(c: &mut Criterion) {
InputRefExpression::new(DataType::Varchar, 12),
InputRefExpression::new(DataType::Bytea, 13),
InputRefExpression::new(DataType::Jsonb, 26),
InputRefExpression::new(DataType::Int256, 27),
];
let input_index_for_type = |ty: DataType| {
inputrefs
Expand All @@ -185,13 +240,15 @@ fn bench_expr(c: &mut Criterion) {
const EXTRACT_FIELD_TIME: usize = 17;
const EXTRACT_FIELD_TIMESTAMP: usize = 16;
const EXTRACT_FIELD_TIMESTAMPTZ: usize = 18;
const EXTRACT_FIELD_INTERVAL: usize = 28;
const BOOL_STRING: usize = 19;
const NUMBER_STRING: usize = 12;
const DATE_STRING: usize = 20;
const TIME_STRING: usize = 21;
const TIMESTAMP_STRING: usize = 22;
const TIMESTAMPTZ_STRING: usize = 23;
const INTERVAL_STRING: usize = 24;
const TIMESTAMP_FORMATTED_STRING: usize = 29;

c.bench_function("inputref", |bencher| {
let inputref = inputrefs[0].clone().boxed();
Expand All @@ -218,28 +275,44 @@ fn bench_expr(c: &mut Criterion) {
let sigs = func_sigs();
let sigs = sigs.sorted_by_cached_key(|sig| format!("{sig:?}"));
'sig: for sig in sigs {
if sig
.inputs_type
.iter()
if (sig.inputs_type.iter())
.chain(&[sig.ret_type])
.any(|t| matches!(t, DataTypeName::Struct | DataTypeName::List))
{
// TODO: support struct and list
println!("todo: {sig:?}");
continue;
}
if [
"date_trunc(varchar, timestamptz) -> timestamptz",
"to_timestamp1(varchar, varchar) -> timestamptz",
"to_char(timestamptz, varchar) -> varchar",
]
.contains(&format!("{sig:?}").as_str())
{
println!("ignore: {sig:?}");
continue;
}

/// Builds a boxed `Varchar` literal expression holding `s`.
fn string_literal(s: &str) -> BoxedExpression {
    let literal = LiteralExpression::new(DataType::Varchar, Some(s.into()));
    literal.boxed()
}

let mut children = vec![];
for (i, t) in sig.inputs_type.iter().enumerate() {
use DataTypeName::*;
let idx = match (sig.func, i) {
(PbType::ToChar, 1) => {
children.push(
LiteralExpression::new(
DataType::Varchar,
Some("YYYY/MM/DD HH:MM:SS".into()),
)
.boxed(),
);
(PbType::ToTimestamp1, 0) => TIMESTAMP_FORMATTED_STRING,
(PbType::ToChar | PbType::ToTimestamp1, 1) => {
children.push(string_literal("YYYY/MM/DD HH:MM:SS"));
continue;
}
(PbType::ToChar | PbType::ToTimestamp1, 2) => {
children.push(string_literal("Australia/Sydney"));
continue;
}
(PbType::IsJson, 1) => {
children.push(string_literal("VALUE"));
continue;
}
(PbType::Cast, 0) if *t == DataTypeName::Varchar => match sig.ret_type {
Expand All @@ -264,6 +337,7 @@ fn bench_expr(c: &mut Criterion) {
Time => EXTRACT_FIELD_TIME,
Timestamp => EXTRACT_FIELD_TIMESTAMP,
Timestamptz => EXTRACT_FIELD_TIMESTAMPTZ,
Interval => EXTRACT_FIELD_INTERVAL,
t => panic!("unexpected type: {t:?}"),
},
_ => input_index_for_type((*t).into()),
Expand All @@ -276,17 +350,27 @@ fn bench_expr(c: &mut Criterion) {
});
}

for sig in agg_func_sigs() {
if sig.inputs_type.len() != 1 {
let sigs = agg_func_sigs();
let sigs = sigs.sorted_by_cached_key(|sig| format!("{sig:?}"));
for sig in sigs {
if matches!(sig.func, AggKind::PercentileDisc | AggKind::PercentileCont)
|| (sig.inputs_type.iter())
.chain(&[sig.ret_type])
.any(|t| matches!(t, DataTypeName::Struct | DataTypeName::List))
{
println!("todo: {sig:?}");
continue;
}
let agg = match build_agg(&AggCall {
kind: sig.func,
args: AggArgs::Unary(
sig.inputs_type[0].into(),
input_index_for_type(sig.inputs_type[0].into()),
),
args: match sig.inputs_type {
[] => AggArgs::None,
[t] => AggArgs::Unary((*t).into(), input_index_for_type((*t).into())),
_ => {
println!("todo: {sig:?}");
continue;
}
},
return_type: sig.ret_type.into(),
column_orders: vec![],
filter: None,
Expand Down
Loading

0 comments on commit a61b5aa

Please sign in to comment.