Skip to content

Commit

Permalink
feat(expr): add pg_sleep function (#12294)
Browse files Browse the repository at this point in the history
Signed-off-by: Runji Wang <[email protected]>
  • Loading branch information
wangrunji0408 authored and Li0k committed Sep 15, 2023
1 parent 6c90f0b commit 639acc2
Show file tree
Hide file tree
Showing 10 changed files with 110 additions and 14 deletions.
3 changes: 3 additions & 0 deletions proto/expr.proto
Original file line number Diff line number Diff line change
Expand Up @@ -220,6 +220,9 @@ message ExprNode {
VNODE = 1101;
// Non-deterministic functions
PROCTIME = 2023;
PG_SLEEP = 2024;
PG_SLEEP_FOR = 2025;
PG_SLEEP_UNTIL = 2026;
}
Type function_type = 1;
data.DataType return_type = 3;
Expand Down
6 changes: 4 additions & 2 deletions src/expr/macro/src/gen.rs
Original file line number Diff line number Diff line change
Expand Up @@ -235,11 +235,13 @@ impl FunctionAttr {
true => quote! { &mut writer, },
false => quote! {},
};
let await_ = user_fn.async_.then(|| quote! { .await });
// call the user defined function
// inputs: [ Option<impl ScalarRef> ]
let mut output =
quote! { #fn_name #generic(#(#non_prebuilt_inputs,)* #prebuilt_arg #context #writer) };
let mut output = quote! { #fn_name #generic(#(#non_prebuilt_inputs,)* #prebuilt_arg #context #writer) #await_ };
output = match user_fn.return_type_kind {
// XXX: we don't support void type yet. return null::int for now.
_ if self.ret == "void" => quote! { { #output; Option::<i32>::None } },
ReturnTypeKind::T => quote! { Some(#output) },
ReturnTypeKind::Option => output,
ReturnTypeKind::Result => quote! { Some(#output?) },
Expand Down
34 changes: 28 additions & 6 deletions src/expr/macro/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,18 +30,19 @@ mod utils;
///
/// # Table of Contents
///
/// - [Function Signature](#function-signature)
/// - [SQL Function Signature](#sql-function-signature)
/// - [Multiple Function Definitions](#multiple-function-definitions)
/// - [Type Expansion](#type-expansion)
/// - [Automatic Type Inference](#automatic-type-inference)
/// - [Custom Type Inference Function](#custom-type-inference-function)
/// - [Rust Function Requirements](#rust-function-requirements)
/// - [Rust Function Signature](#rust-function-signature)
/// - [Nullable Arguments](#nullable-arguments)
/// - [Return Value](#return-value)
/// - [Optimization](#optimization)
/// - [Functions Returning Strings](#functions-returning-strings)
/// - [Preprocessing Constant Arguments](#preprocessing-constant-arguments)
/// - [Context](#context)
/// - [Async Function](#async-function)
/// - [Table Function](#table-function)
/// - [Registration and Invocation](#registration-and-invocation)
/// - [Appendix: Type Matrix](#appendix-type-matrix)
Expand All @@ -55,13 +56,13 @@ mod utils;
/// }
/// ```
///
/// # Function Signature
/// # SQL Function Signature
///
/// Each function must have a signature, specified in the `function("...")` part of the macro
/// invocation. The signature follows this pattern:
///
/// ```text
/// name([arg_types],*) -> [setof] return_type
/// name ( [arg_types],* ) [ -> [setof] return_type ]
/// ```
///
/// Where `name` is the function name, which must match the function name defined in `prost`.
Expand All @@ -73,6 +74,9 @@ mod utils;
/// function (table function), meaning it can return multiple values instead of just one. For more
/// details, see the section on table functions.
///
/// If no return type is specified, the function returns `void`. However, the void type is not
/// supported in our type system, so it now returns a null value of type int.
///
/// ## Multiple Function Definitions
///
/// Multiple `#[function]` macros can be applied to a single generic Rust function to define
Expand Down Expand Up @@ -154,7 +158,7 @@ mod utils;
///
/// This type inference function will be invoked at the frontend.
///
/// # Rust Function Requirements
/// # Rust Function Signature
///
/// The `#[function]` macro can handle various types of Rust functions.
///
Expand Down Expand Up @@ -277,6 +281,19 @@ mod utils;
/// }
/// ```
///
/// ## Async Function
///
/// Functions can be asynchronous.
///
/// ```ignore
/// #[function("pg_sleep(float64)")]
/// async fn pg_sleep(second: F64) {
/// tokio::time::sleep(Duration::from_secs_f64(second.0)).await;
/// }
/// ```
///
/// Asynchronous functions will be evaluated on rows sequentially.
///
/// # Table Function
///
/// A table function is a special kind of function that can return multiple values instead of just
Expand Down Expand Up @@ -460,6 +477,8 @@ struct FunctionAttr {
prebuild: Option<String>,
/// Type inference function.
type_infer: Option<String>,
/// Whether the function is volatile.
volatile: bool,
/// Whether the function is deprecated.
deprecated: bool,
}
Expand All @@ -469,6 +488,8 @@ struct FunctionAttr {
struct UserFunctionAttr {
/// Function name
name: String,
/// Whether the function is async.
async_: bool,
/// Whether contains argument `&Context`.
context: bool,
/// The last argument type is `&mut dyn Write`.
Expand Down Expand Up @@ -556,7 +577,8 @@ impl FunctionAttr {
impl UserFunctionAttr {
/// Returns true if the function is like `fn(T1, T2, .., Tn) -> T`.
fn is_pure(&self) -> bool {
!self.write
!self.async_
&& !self.write
&& !self.context
&& !self.arg_option
&& self.return_type_kind == ReturnTypeKind::T
Expand Down
10 changes: 7 additions & 3 deletions src/expr/macro/src/parse.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,9 +28,10 @@ impl Parse for FunctionAttr {

let sig = input.parse::<LitStr>()?;
let sig_str = sig.value();
let (name_args, ret) = sig_str
.split_once("->")
.ok_or_else(|| Error::new_spanned(&sig, "expected '->'"))?;
let (name_args, ret) = match sig_str.split_once("->") {
Some((name_args, ret)) => (name_args, ret),
None => (sig_str.as_str(), "void"),
};
let (name, args) = name_args
.split_once('(')
.ok_or_else(|| Error::new_spanned(&sig, "expected '('"))?;
Expand Down Expand Up @@ -74,6 +75,8 @@ impl Parse for FunctionAttr {
parsed.prebuild = Some(get_value()?);
} else if meta.path().is_ident("type_infer") {
parsed.type_infer = Some(get_value()?);
} else if meta.path().is_ident("volatile") {
parsed.volatile = true;
} else if meta.path().is_ident("deprecated") {
parsed.deprecated = true;
} else if meta.path().is_ident("append_only") {
Expand Down Expand Up @@ -113,6 +116,7 @@ impl From<&syn::Signature> for UserFunctionAttr {
};
UserFunctionAttr {
name: sig.ident.to_string(),
async_: sig.asyncness.is_some(),
write: sig.inputs.iter().any(arg_is_write),
context: sig.inputs.iter().any(arg_is_context),
retract: last_arg_is_retract(sig),
Expand Down
4 changes: 4 additions & 0 deletions src/expr/macro/src/types.rs
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,10 @@ fn lookup_matrix(mut ty: &str, idx: usize) -> &str {
ty = "list";
} else if ty.starts_with("struct") {
ty = "struct";
} else if ty == "void" {
// XXX: we don't support void type yet.
// replace it with int32 for now.
ty = "int32";
}
let s = TYPE_MATRIX.trim().lines().find_map(|line| {
let mut parts = line.split_whitespace();
Expand Down
52 changes: 52 additions & 0 deletions src/expr/src/vector_op/delay.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
// Copyright 2023 RisingWave Labs
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use std::time::Duration;

use risingwave_common::types::{Interval, F64};
use risingwave_expr_macro::function;

/// Makes the current session's process sleep until the given number of seconds have elapsed.
///
/// ```slt
/// query I
/// SELECT pg_sleep(1.5);
/// ----
/// NULL
/// ```
#[function("pg_sleep(float64)", volatile)]
async fn pg_sleep(second: F64) {
tokio::time::sleep(Duration::from_secs_f64(second.0)).await;
}

/// Makes the current session's process sleep until the given interval has elapsed.
///
/// ```slt
/// query I
/// SELECT pg_sleep_for('1 second');
/// ----
/// NULL
/// ```
#[function("pg_sleep_for(interval)", volatile)]
async fn pg_sleep_for(interval: Interval) {
// we only use the microsecond part of the interval
let usecs = if interval.is_positive() {
interval.usecs() as u64
} else {
// return if the interval is not positive
return;
};
let duration = Duration::from_micros(usecs);
tokio::time::sleep(duration).await;
}
1 change: 1 addition & 0 deletions src/expr/src/vector_op/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ pub mod cmp;
pub mod concat_op;
pub mod conjunction;
pub mod date_trunc;
pub mod delay;
pub mod encdec;
pub mod exp;
pub mod extract;
Expand Down
2 changes: 1 addition & 1 deletion src/expr/src/vector_op/proctime.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ use risingwave_expr_macro::function;
use crate::{ExprError, Result};

/// Get the processing time in Timestamptz scalar from the task-local epoch.
#[function("proctime() -> timestamptz")]
#[function("proctime() -> timestamptz", volatile)]
fn proctime() -> Result<Timestamptz> {
let epoch = epoch::task_local::curr_epoch().ok_or(ExprError::Context)?;
Ok(epoch.as_timestamptz())
Expand Down
6 changes: 5 additions & 1 deletion src/frontend/src/binder/expr/function.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1142,7 +1142,11 @@ impl Binder {
// non-deterministic
("now", now()),
("current_timestamp", now()),
("proctime", proctime())
("proctime", proctime()),
("pg_sleep", raw_call(ExprType::PgSleep)),
("pg_sleep_for", raw_call(ExprType::PgSleepFor)),
// TODO: implement pg_sleep_until
// ("pg_sleep_until", raw_call(ExprType::PgSleepUntil)),
]
.into_iter()
.collect()
Expand Down
6 changes: 5 additions & 1 deletion src/frontend/src/expr/pure.rs
Original file line number Diff line number Diff line change
Expand Up @@ -207,7 +207,11 @@ impl ExprVisitor<bool> for ImpureAnalyzer {
x
}
// expression output is not deterministic
expr_node::Type::Vnode | expr_node::Type::Proctime => true,
expr_node::Type::Vnode
| expr_node::Type::Proctime
| expr_node::Type::PgSleep
| expr_node::Type::PgSleepFor
| expr_node::Type::PgSleepUntil => true,
}
}
}
Expand Down

0 comments on commit 639acc2

Please sign in to comment.