Skip to content

Commit

Permalink
feat: make queries aware of the timezone setting (GreptimeTeam#3175)
Browse files Browse the repository at this point in the history
* feat: make TypeConversionRule aware of the query context timezone setting

* chore: don't optimize explain command

* feat: parse string into timestamp with timezone

* fix: compile error

* chore: check the scalar value type in predicate

* chore: remove mut for engine context

* chore: return none if the scalar value is utf8 in time range predicate

* fix: resolve some FIXMEs

* feat: make Date and DateTime parsing from string values timezone-aware

* chore: tweak

* test: add datetime from_str test with timezone

* feat: construct function context from query context

* test: add timezone test for to_unixtime and date_format function

* fix: typo

* chore: apply suggestion

* test: add string with timezone

* chore: apply CR suggestion

Co-authored-by: Lei, HUANG <[email protected]>

* chore: apply suggestion

---------

Co-authored-by: Lei, HUANG <[email protected]>
  • Loading branch information
killme2008 and v0y4g3r authored Jan 22, 2024
1 parent 2bf4b08 commit 6a12c27
Show file tree
Hide file tree
Showing 45 changed files with 994 additions and 237 deletions.
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

7 changes: 4 additions & 3 deletions src/cmd/src/cli/repl.rs
Original file line number Diff line number Diff line change
Expand Up @@ -164,12 +164,13 @@ impl Repl {

let plan = query_engine
.planner()
.plan(stmt, query_ctx)
.plan(stmt, query_ctx.clone())
.await
.context(PlanStatementSnafu)?;

let LogicalPlan::DfPlan(plan) =
query_engine.optimize(&plan).context(PlanStatementSnafu)?;
let LogicalPlan::DfPlan(plan) = query_engine
.optimize(&query_engine.engine_context(query_ctx), &plan)
.context(PlanStatementSnafu)?;

let plan = DFLogicalSubstraitConvertor {}
.encode(&plan)
Expand Down
1 change: 1 addition & 0 deletions src/common/function/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ num = "0.4"
num-traits = "0.2"
once_cell.workspace = true
paste = "1.0"
session.workspace = true
snafu.workspace = true
statrs = "0.16"

Expand Down
8 changes: 4 additions & 4 deletions src/common/function/src/function.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,20 +17,20 @@ use std::sync::Arc;

use common_query::error::Result;
use common_query::prelude::Signature;
use common_time::timezone::get_timezone;
use common_time::Timezone;
use datatypes::data_type::ConcreteDataType;
use datatypes::vectors::VectorRef;
use session::context::{QueryContextBuilder, QueryContextRef};

/// The function execution context
#[derive(Clone)]
pub struct FunctionContext {
pub timezone: Timezone,
pub query_ctx: QueryContextRef,
}

impl Default for FunctionContext {
fn default() -> Self {
Self {
timezone: get_timezone(None).clone(),
query_ctx: QueryContextBuilder::default().build(),
}
}
}
Expand Down
6 changes: 3 additions & 3 deletions src/common/function/src/scalars/date/date_format.rs
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,7 @@ impl Function for DateFormatFunction {

let result = match (ts, format) {
(Some(ts), Some(fmt)) => Some(
ts.as_formatted_string(&fmt, Some(&func_ctx.timezone))
ts.as_formatted_string(&fmt, Some(&func_ctx.query_ctx.timezone()))
.map_err(BoxedError::new)
.context(error::ExecuteSnafu)?,
),
Expand All @@ -96,7 +96,7 @@ impl Function for DateFormatFunction {

let result = match (date, format) {
(Some(date), Some(fmt)) => date
.as_formatted_string(&fmt, Some(&func_ctx.timezone))
.as_formatted_string(&fmt, Some(&func_ctx.query_ctx.timezone()))
.map_err(BoxedError::new)
.context(error::ExecuteSnafu)?,
_ => None,
Expand All @@ -112,7 +112,7 @@ impl Function for DateFormatFunction {

let result = match (datetime, format) {
(Some(datetime), Some(fmt)) => datetime
.as_formatted_string(&fmt, Some(&func_ctx.timezone))
.as_formatted_string(&fmt, Some(&func_ctx.query_ctx.timezone()))
.map_err(BoxedError::new)
.context(error::ExecuteSnafu)?,
_ => None,
Expand Down
9 changes: 4 additions & 5 deletions src/common/function/src/scalars/timestamp/greatest.rs
Original file line number Diff line number Diff line change
Expand Up @@ -104,7 +104,6 @@ impl fmt::Display for GreatestFunction {

#[cfg(test)]
mod tests {
use std::str::FromStr;
use std::sync::Arc;

use common_time::Date;
Expand Down Expand Up @@ -137,11 +136,11 @@ mod tests {
assert_eq!(result.len(), 2);
assert_eq!(
result.get(0),
Value::Date(Date::from_str("2001-02-01").unwrap())
Value::Date(Date::from_str_utc("2001-02-01").unwrap())
);
assert_eq!(
result.get(1),
Value::Date(Date::from_str("2012-12-23").unwrap())
Value::Date(Date::from_str_utc("2012-12-23").unwrap())
);
}

Expand All @@ -162,11 +161,11 @@ mod tests {
assert_eq!(result.len(), 2);
assert_eq!(
result.get(0),
Value::Date(Date::from_str("1970-01-01").unwrap())
Value::Date(Date::from_str_utc("1970-01-01").unwrap())
);
assert_eq!(
result.get(1),
Value::Date(Date::from_str("1970-01-03").unwrap())
Value::Date(Date::from_str_utc("1970-01-03").unwrap())
);
}
}
14 changes: 7 additions & 7 deletions src/common/function/src/scalars/timestamp/to_unixtime.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,6 @@
// limitations under the License.

use std::fmt;
use std::str::FromStr;
use std::sync::Arc;

use common_query::error::{InvalidFuncArgsSnafu, Result, UnsupportedInputDataTypeSnafu};
Expand All @@ -31,16 +30,17 @@ pub struct ToUnixtimeFunction;

const NAME: &str = "to_unixtime";

fn convert_to_seconds(arg: &str) -> Option<i64> {
if let Ok(dt) = DateTime::from_str(arg) {
fn convert_to_seconds(arg: &str, func_ctx: &FunctionContext) -> Option<i64> {
let timezone = &func_ctx.query_ctx.timezone();
if let Ok(dt) = DateTime::from_str(arg, Some(timezone)) {
return Some(dt.val() / 1000);
}

if let Ok(ts) = Timestamp::from_str(arg) {
if let Ok(ts) = Timestamp::from_str(arg, Some(timezone)) {
return Some(ts.split().0);
}

if let Ok(date) = Date::from_str(arg) {
if let Ok(date) = Date::from_str(arg, Some(timezone)) {
return Some(date.to_secs());
}

Expand Down Expand Up @@ -92,7 +92,7 @@ impl Function for ToUnixtimeFunction {
)
}

fn eval(&self, _func_ctx: FunctionContext, columns: &[VectorRef]) -> Result<VectorRef> {
fn eval(&self, func_ctx: FunctionContext, columns: &[VectorRef]) -> Result<VectorRef> {
ensure!(
columns.len() == 1,
InvalidFuncArgsSnafu {
Expand All @@ -108,7 +108,7 @@ impl Function for ToUnixtimeFunction {
match columns[0].data_type() {
ConcreteDataType::String(_) => Ok(Arc::new(Int64Vector::from(
(0..vector.len())
.map(|i| convert_to_seconds(&vector.get(i).to_string()))
.map(|i| convert_to_seconds(&vector.get(i).to_string(), &func_ctx))
.collect::<Vec<_>>(),
))),
ConcreteDataType::Int64(_) | ConcreteDataType::Int32(_) => {
Expand Down
16 changes: 10 additions & 6 deletions src/common/function/src/scalars/udf.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,22 +21,24 @@ use common_query::prelude::{
use datatypes::error::Error as DataTypeError;
use datatypes::prelude::*;
use datatypes::vectors::Helper;
use session::context::QueryContextRef;
use snafu::ResultExt;

use crate::function::{FunctionContext, FunctionRef};

/// Create a ScalarUdf from function.
pub fn create_udf(func: FunctionRef) -> ScalarUdf {
/// Create a ScalarUdf from function and query context.
pub fn create_udf(func: FunctionRef, query_ctx: QueryContextRef) -> ScalarUdf {
let func_cloned = func.clone();
let return_type: ReturnTypeFunction = Arc::new(move |input_types: &[ConcreteDataType]| {
Ok(Arc::new(func_cloned.return_type(input_types)?))
});

let func_cloned = func.clone();

let fun: ScalarFunctionImplementation = Arc::new(move |args: &[ColumnarValue]| {
// FIXME(dennis): set the timezone into function context
// Question: how to get the timezone from the query context?
let func_ctx = FunctionContext::default();
let func_ctx = FunctionContext {
query_ctx: query_ctx.clone(),
};

let len = args
.iter()
Expand Down Expand Up @@ -72,6 +74,7 @@ mod tests {
use datatypes::prelude::{ScalarVector, Vector, VectorRef};
use datatypes::value::Value;
use datatypes::vectors::{BooleanVector, ConstantVector};
use session::context::QueryContextBuilder;

use super::*;
use crate::function::Function;
Expand All @@ -80,6 +83,7 @@ mod tests {
#[test]
fn test_create_udf() {
let f = Arc::new(TestAndFunction);
let query_ctx = QueryContextBuilder::default().build();

let args: Vec<VectorRef> = vec![
Arc::new(ConstantVector::new(
Expand All @@ -97,7 +101,7 @@ mod tests {
}

// create a udf and test it again
let udf = create_udf(f.clone());
let udf = create_udf(f.clone(), query_ctx);

assert_eq!("test_and", udf.name);
assert_eq!(f.signature(), udf.signature);
Expand Down
2 changes: 1 addition & 1 deletion src/common/query/src/logical_plan.rs
Original file line number Diff line number Diff line change
Expand Up @@ -134,7 +134,7 @@ mod tests {
assert_eq!(return_type, (udf.return_type)(&[]).unwrap());

// test conversion into a DataFusion UDF
let df_udf: DfScalarUDF = udf.into_df_udf();
let df_udf: DfScalarUDF = udf.into();
assert_eq!("and", df_udf.name);

let types = vec![DataType::Boolean, DataType::Boolean];
Expand Down
13 changes: 7 additions & 6 deletions src/common/query/src/logical_plan/udf.rs
Original file line number Diff line number Diff line change
Expand Up @@ -66,14 +66,15 @@ impl ScalarUdf {
fun: fun.clone(),
}
}
}

/// Cast self into datafusion UDF.
pub fn into_df_udf(self) -> DfScalarUDF {
impl From<ScalarUdf> for DfScalarUDF {
fn from(udf: ScalarUdf) -> Self {
DfScalarUDF::new(
&self.name,
&self.signature.into(),
&to_df_return_type(self.return_type),
&to_df_scalar_func(self.fun),
&udf.name,
&udf.signature.into(),
&to_df_return_type(udf.return_type),
&to_df_scalar_func(udf.fun),
)
}
}
Expand Down
Loading

0 comments on commit 6a12c27

Please sign in to comment.