diff --git a/src/common/macro/src/range_fn.rs b/src/common/macro/src/range_fn.rs index 582fff523dd4..ce68e8aaeeb0 100644 --- a/src/common/macro/src/range_fn.rs +++ b/src/common/macro/src/range_fn.rs @@ -119,16 +119,12 @@ fn build_struct( } pub fn scalar_udf() -> ScalarUDF { - // TODO(LFC): Use the new Datafusion UDF impl. - #[allow(deprecated)] - ScalarUDF::new( + datafusion_expr::create_udf( Self::name(), - &Signature::new( - TypeSignature::Exact(Self::input_type()), - Volatility::Immutable, - ), - &(Arc::new(|_: &_| Ok(Arc::new(Self::return_type()))) as _), - &(Arc::new(Self::calc) as _), + Self::input_type(), + Arc::new(Self::return_type()), + Volatility::Immutable, + Arc::new(Self::calc) as _, ) } diff --git a/src/common/query/src/logical_plan.rs b/src/common/query/src/logical_plan.rs index ab5dedc14f87..5a100fc6131e 100644 --- a/src/common/query/src/logical_plan.rs +++ b/src/common/query/src/logical_plan.rs @@ -29,6 +29,7 @@ pub use self::udf::ScalarUdf; use crate::function::{ReturnTypeFunction, ScalarFunctionImplementation}; use crate::logical_plan::accumulator::*; use crate::signature::{Signature, Volatility}; + /// Creates a new UDF with a specific signature and specific return type. /// This is a helper function to create a new UDF. /// The function `create_udf` returns a subset of all possible `ScalarFunction`: diff --git a/src/common/query/src/logical_plan/udaf.rs b/src/common/query/src/logical_plan/udaf.rs index b9a11cfbfb4b..f2abcb673be8 100644 --- a/src/common/query/src/logical_plan/udaf.rs +++ b/src/common/query/src/logical_plan/udaf.rs @@ -91,74 +91,67 @@ impl AggregateFunction { } } -impl From for DfAggregateUdf { - fn from(udaf: AggregateFunction) -> Self { - struct DfUdafAdapter { - name: String, - signature: datafusion_expr::Signature, - return_type_func: datafusion_expr::ReturnTypeFunction, - accumulator: AccumulatorFactoryFunction, - creator: AggregateFunctionCreatorRef, - } +struct DfUdafAdapter { + name: String, + signature: datafusion_expr::Signature, + return_type_func: datafusion_expr::ReturnTypeFunction, + accumulator: AccumulatorFactoryFunction, + creator: AggregateFunctionCreatorRef, +} - impl Debug for DfUdafAdapter { - fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result { - f.debug_struct("DfUdafAdapter") - .field("name", &self.name) - .field("signature", &self.signature) - .finish() - } - } +impl Debug for DfUdafAdapter { + fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result { + f.debug_struct("DfUdafAdapter") + .field("name", &self.name) + .field("signature", &self.signature) + .finish() + } +} - impl AggregateUDFImpl for DfUdafAdapter { - fn as_any(&self) -> &dyn Any { - self - } - - fn name(&self) -> &str { - &self.name - } - - fn signature(&self) -> &datafusion_expr::Signature { - &self.signature - } - - fn return_type(&self, arg_types: &[ArrowDataType]) -> Result { - (self.return_type_func)(arg_types).map(|x| x.as_ref().clone()) - } - - fn accumulator(&self, acc_args: AccumulatorArgs) -> Result> { - (self.accumulator)(acc_args) - } - - fn state_fields( - &self, - name: &str, - _value_type: ArrowDataType, - _ordering_fields: Vec, - ) -> Result> { - self.creator - .state_types() - .map(|x| { - (0..x.len()) - .zip(x) - .map(|(i, t)| { - Field::new(format!("{}_{}", name, i), t.as_arrow_type(), true) - }) - .collect::>() - }) - .map_err(|e| e.into()) - } - } +impl AggregateUDFImpl for DfUdafAdapter { + fn as_any(&self) -> &dyn Any { + self + } - DfUdafAdapter { + fn name(&self) -> &str { + &self.name + } + + fn signature(&self) -> &datafusion_expr::Signature { + &self.signature + } + + fn return_type(&self, arg_types: &[ArrowDataType]) -> Result { + (self.return_type_func)(arg_types).map(|x| x.as_ref().clone()) + } + + fn accumulator(&self, acc_args: AccumulatorArgs) -> Result> { + (self.accumulator)(acc_args) + } + + fn state_fields(&self, name: &str, _: ArrowDataType, _: Vec) -> Result> { + let state_types = self.creator.state_types()?; + let fields = state_types + .into_iter() + .enumerate() + .map(|(i, t)| { + let name = format!("{name}_{i}"); + Field::new(name, t.as_arrow_type(), true) + }) + .collect::>(); + Ok(fields) + } +} + +impl From for DfAggregateUdf { + fn from(udaf: AggregateFunction) -> Self { + DfAggregateUdf::new_from_impl(DfUdafAdapter { name: udaf.name, signature: udaf.signature.into(), return_type_func: to_df_return_type(udaf.return_type), accumulator: to_df_accumulator_func(udaf.accumulator, udaf.creator.clone()), creator: udaf.creator, - } - .into() + }) } } diff --git a/src/common/query/src/logical_plan/udf.rs b/src/common/query/src/logical_plan/udf.rs index df5cec762c6d..79714479c3be 100644 --- a/src/common/query/src/logical_plan/udf.rs +++ b/src/common/query/src/logical_plan/udf.rs @@ -14,6 +14,7 @@ //! Udf module contains foundational types that are used to represent UDFs. //! It's modified from datafusion. +use std::any::Any; use std::fmt; use std::fmt::{Debug, Formatter}; use std::sync::Arc; @@ -21,7 +22,9 @@ use std::sync::Arc; use datafusion_expr::{ ColumnarValue as DfColumnarValue, ScalarFunctionImplementation as DfScalarFunctionImplementation, ScalarUDF as DfScalarUDF, + ScalarUDFImpl, }; +use datatypes::arrow::datatypes::DataType; use crate::error::Result; use crate::function::{ReturnTypeFunction, ScalarFunctionImplementation}; @@ -68,25 +71,60 @@ impl ScalarUdf { } } +#[derive(Clone)] +struct DfUdfAdapter { + name: String, + signature: datafusion_expr::Signature, + return_type: datafusion_expr::ReturnTypeFunction, + fun: DfScalarFunctionImplementation, +} + +impl Debug for DfUdfAdapter { + fn fmt(&self, f: &mut Formatter) -> fmt::Result { + f.debug_struct("DfUdfAdapter") + .field("name", &self.name) + .field("signature", &self.signature) + .finish() + } +} + +impl ScalarUDFImpl for DfUdfAdapter { + fn as_any(&self) -> &dyn Any { + self + } + + fn name(&self) -> &str { + &self.name + } + + fn signature(&self) -> &datafusion_expr::Signature { + &self.signature + } + + fn return_type(&self, arg_types: &[DataType]) -> datafusion_common::Result { + (self.return_type)(arg_types).map(|ty| ty.as_ref().clone()) + } + + fn invoke(&self, args: &[DfColumnarValue]) -> datafusion_common::Result { + (self.fun)(args) + } +} + impl From for DfScalarUDF { fn from(udf: ScalarUdf) -> Self { - // TODO(LFC): remove deprecated - #[allow(deprecated)] - DfScalarUDF::new( - &udf.name, - &udf.signature.into(), - &to_df_return_type(udf.return_type), - &to_df_scalar_func(udf.fun), - ) + DfScalarUDF::new_from_impl(DfUdfAdapter { + name: udf.name, + signature: udf.signature.into(), + return_type: to_df_return_type(udf.return_type), + fun: to_df_scalar_func(udf.fun), + }) } } fn to_df_scalar_func(fun: ScalarFunctionImplementation) -> DfScalarFunctionImplementation { Arc::new(move |args: &[DfColumnarValue]| { let args: Result> = args.iter().map(TryFrom::try_from).collect(); - - let result = (fun)(&args?); - + let result = fun(&args?); result.map(From::from).map_err(|e| e.into()) }) } diff --git a/src/promql/src/functions/aggr_over_time.rs b/src/promql/src/functions/aggr_over_time.rs index e02e4a7d910b..a653e1942e17 100644 --- a/src/promql/src/functions/aggr_over_time.rs +++ b/src/promql/src/functions/aggr_over_time.rs @@ -17,7 +17,7 @@ use std::sync::Arc; use common_macro::range_fn; use datafusion::arrow::array::{Float64Array, TimestampMillisecondArray}; use datafusion::common::DataFusionError; -use datafusion::logical_expr::{ScalarUDF, Signature, TypeSignature, Volatility}; +use datafusion::logical_expr::{ScalarUDF, Volatility}; use datafusion::physical_plan::ColumnarValue; use datatypes::arrow::array::Array; use datatypes::arrow::compute; diff --git a/src/promql/src/functions/changes.rs b/src/promql/src/functions/changes.rs index bb547e87f1ba..d10b6f2165d5 100644 --- a/src/promql/src/functions/changes.rs +++ b/src/promql/src/functions/changes.rs @@ -20,7 +20,7 @@ use std::sync::Arc; use common_macro::range_fn; use datafusion::arrow::array::{Float64Array, TimestampMillisecondArray}; use datafusion::common::DataFusionError; -use datafusion::logical_expr::{ScalarUDF, Signature, TypeSignature, Volatility}; +use datafusion::logical_expr::{ScalarUDF, Volatility}; use datafusion::physical_plan::ColumnarValue; use datatypes::arrow::array::Array; use datatypes::arrow::datatypes::DataType; diff --git a/src/promql/src/functions/deriv.rs b/src/promql/src/functions/deriv.rs index 462637ceb5aa..9a7b9d1e45fb 100644 --- a/src/promql/src/functions/deriv.rs +++ b/src/promql/src/functions/deriv.rs @@ -20,7 +20,7 @@ use std::sync::Arc; use common_macro::range_fn; use datafusion::arrow::array::{Float64Array, TimestampMillisecondArray}; use datafusion::common::DataFusionError; -use datafusion::logical_expr::{ScalarUDF, Signature, TypeSignature, Volatility}; +use datafusion::logical_expr::{ScalarUDF, Volatility}; use datafusion::physical_plan::ColumnarValue; use datatypes::arrow::array::Array; use datatypes::arrow::datatypes::DataType; diff --git a/src/promql/src/functions/extrapolate_rate.rs b/src/promql/src/functions/extrapolate_rate.rs index 7a57efae04a6..27ac86d8af09 100644 --- a/src/promql/src/functions/extrapolate_rate.rs +++ b/src/promql/src/functions/extrapolate_rate.rs @@ -35,8 +35,9 @@ use std::sync::Arc; use datafusion::arrow::array::{Float64Array, TimestampMillisecondArray}; use datafusion::arrow::datatypes::TimeUnit; use datafusion::common::DataFusionError; -use datafusion::logical_expr::{ScalarUDF, Signature, TypeSignature, Volatility}; +use datafusion::logical_expr::{ScalarUDF, Volatility}; use datafusion::physical_plan::ColumnarValue; +use datafusion_expr::create_udf; use datatypes::arrow::array::Array; use datatypes::arrow::datatypes::DataType; @@ -62,19 +63,23 @@ impl ExtrapolatedRate Vec { - vec![ + fn scalar_udf_with_name(name: &str, range_length: i64) -> ScalarUDF { + let input_types = vec![ // timestamp range vector RangeArray::convert_data_type(DataType::Timestamp(TimeUnit::Millisecond, None)), // value range vector RangeArray::convert_data_type(DataType::Float64), // timestamp vector DataType::Timestamp(TimeUnit::Millisecond, None), - ] - } + ]; - fn return_type() -> DataType { - DataType::Float64 + create_udf( + name, + input_types, + Arc::new(DataType::Float64), + Volatility::Immutable, + Arc::new(move |input: &_| Self::new(range_length).calc(input)) as _, + ) } fn calc(&self, input: &[ColumnarValue]) -> Result { @@ -204,17 +209,7 @@ impl ExtrapolatedRate { } pub fn scalar_udf(range_length: i64) -> ScalarUDF { - // TODO(LFC): Use the new Datafusion UDF impl. - #[allow(deprecated)] - ScalarUDF::new( - Self::name(), - &Signature::new( - TypeSignature::Exact(Self::input_type()), - Volatility::Immutable, - ), - &(Arc::new(|_: &_| Ok(Arc::new(Self::return_type()))) as _), - &(Arc::new(move |input: &_| Self::new(range_length).calc(input)) as _), - ) + Self::scalar_udf_with_name(Self::name(), range_length) } } @@ -225,17 +220,7 @@ impl ExtrapolatedRate { } pub fn scalar_udf(range_length: i64) -> ScalarUDF { - // TODO(LFC): Use the new Datafusion UDF impl. - #[allow(deprecated)] - ScalarUDF::new( - Self::name(), - &Signature::new( - TypeSignature::Exact(Self::input_type()), - Volatility::Immutable, - ), - &(Arc::new(|_: &_| Ok(Arc::new(Self::return_type()))) as _), - &(Arc::new(move |input: &_| Self::new(range_length).calc(input)) as _), - ) + Self::scalar_udf_with_name(Self::name(), range_length) } } @@ -246,17 +231,7 @@ impl ExtrapolatedRate { } pub fn scalar_udf(range_length: i64) -> ScalarUDF { - // TODO(LFC): Use the new Datafusion UDF impl. - #[allow(deprecated)] - ScalarUDF::new( - Self::name(), - &Signature::new( - TypeSignature::Exact(Self::input_type()), - Volatility::Immutable, - ), - &(Arc::new(|_: &_| Ok(Arc::new(Self::return_type()))) as _), - &(Arc::new(move |input: &_| Self::new(range_length).calc(input)) as _), - ) + Self::scalar_udf_with_name(Self::name(), range_length) } } diff --git a/src/promql/src/functions/holt_winters.rs b/src/promql/src/functions/holt_winters.rs index c047c1883d04..c063b02b7358 100644 --- a/src/promql/src/functions/holt_winters.rs +++ b/src/promql/src/functions/holt_winters.rs @@ -20,8 +20,9 @@ use std::sync::Arc; use datafusion::arrow::array::Float64Array; use datafusion::arrow::datatypes::TimeUnit; use datafusion::common::DataFusionError; -use datafusion::logical_expr::{ScalarUDF, Signature, TypeSignature, Volatility}; +use datafusion::logical_expr::{ScalarUDF, Volatility}; use datafusion::physical_plan::ColumnarValue; +use datafusion_expr::create_udf; use datatypes::arrow::array::Array; use datatypes::arrow::datatypes::DataType; @@ -68,16 +69,12 @@ impl HoltWinters { } pub fn scalar_udf(level: f64, trend: f64) -> ScalarUDF { - // TODO(LFC): Use the new Datafusion UDF impl. - #[allow(deprecated)] - ScalarUDF::new( + create_udf( Self::name(), - &Signature::new( - TypeSignature::Exact(Self::input_type()), - Volatility::Immutable, - ), - &(Arc::new(|_: &_| Ok(Arc::new(Self::return_type()))) as _), - &(Arc::new(move |input: &_| Self::new(level, trend).calc(input)) as _), + Self::input_type(), + Arc::new(Self::return_type()), + Volatility::Immutable, + Arc::new(move |input: &_| Self::new(level, trend).calc(input)) as _, ) } diff --git a/src/promql/src/functions/idelta.rs b/src/promql/src/functions/idelta.rs index 9a74b65fec27..5c5504b4ea2f 100644 --- a/src/promql/src/functions/idelta.rs +++ b/src/promql/src/functions/idelta.rs @@ -18,8 +18,9 @@ use std::sync::Arc; use datafusion::arrow::array::{Float64Array, TimestampMillisecondArray}; use datafusion::arrow::datatypes::TimeUnit; use datafusion::common::DataFusionError; -use datafusion::logical_expr::{ScalarUDF, Signature, TypeSignature, Volatility}; +use datafusion::logical_expr::{ScalarUDF, Volatility}; use datafusion::physical_plan::ColumnarValue; +use datafusion_expr::create_udf; use datatypes::arrow::array::Array; use datatypes::arrow::datatypes::DataType; @@ -42,16 +43,12 @@ impl IDelta { } pub fn scalar_udf() -> ScalarUDF { - // TODO(LFC): Use the new Datafusion UDF impl. - #[allow(deprecated)] - ScalarUDF::new( + create_udf( Self::name(), - &Signature::new( - TypeSignature::Exact(Self::input_type()), - Volatility::Immutable, - ), - &(Arc::new(|_: &_| Ok(Arc::new(Self::return_type()))) as _), - &(Arc::new(Self::calc) as _), + Self::input_type(), + Arc::new(Self::return_type()), + Volatility::Immutable, + Arc::new(Self::calc) as _, ) } diff --git a/src/promql/src/functions/predict_linear.rs b/src/promql/src/functions/predict_linear.rs index c9b24a76a896..647205f9797c 100644 --- a/src/promql/src/functions/predict_linear.rs +++ b/src/promql/src/functions/predict_linear.rs @@ -20,8 +20,9 @@ use std::sync::Arc; use datafusion::arrow::array::{Float64Array, TimestampMillisecondArray}; use datafusion::arrow::datatypes::TimeUnit; use datafusion::common::DataFusionError; -use datafusion::logical_expr::{ScalarUDF, Signature, TypeSignature, Volatility}; +use datafusion::logical_expr::{ScalarUDF, Volatility}; use datafusion::physical_plan::ColumnarValue; +use datafusion_expr::create_udf; use datatypes::arrow::array::Array; use datatypes::arrow::datatypes::DataType; @@ -44,32 +45,22 @@ impl PredictLinear { } pub fn scalar_udf(t: i64) -> ScalarUDF { - // TODO(LFC): Use the new Datafusion UDF impl. - #[allow(deprecated)] - ScalarUDF::new( - Self::name(), - &Signature::new( - TypeSignature::Exact(Self::input_type()), - Volatility::Immutable, - ), - &(Arc::new(|_: &_| Ok(Arc::new(Self::return_type()))) as _), - &(Arc::new(move |input: &_| Self::new(t).calc(input)) as _), - ) - } - - // time index column and value column - fn input_type() -> Vec { - vec![ + let input_types = vec![ + // time index column RangeArray::convert_data_type(DataType::Timestamp(TimeUnit::Millisecond, None)), + // value column RangeArray::convert_data_type(DataType::Float64), - ] - } - - fn return_type() -> DataType { - DataType::Float64 + ]; + create_udf( + Self::name(), + input_types, + Arc::new(DataType::Float64), + Volatility::Immutable, + Arc::new(move |input: &_| Self::new(t).predict_linear(input)) as _, + ) } - fn calc(&self, input: &[ColumnarValue]) -> Result { + fn predict_linear(&self, input: &[ColumnarValue]) -> Result { // construct matrix from input. assert_eq!(input.len(), 2); let ts_array = extract_array(&input[0])?; diff --git a/src/promql/src/functions/quantile.rs b/src/promql/src/functions/quantile.rs index d055ad122734..3721e0da8d10 100644 --- a/src/promql/src/functions/quantile.rs +++ b/src/promql/src/functions/quantile.rs @@ -17,8 +17,9 @@ use std::sync::Arc; use datafusion::arrow::array::Float64Array; use datafusion::arrow::datatypes::TimeUnit; use datafusion::common::DataFusionError; -use datafusion::logical_expr::{ScalarUDF, Signature, TypeSignature, Volatility}; +use datafusion::logical_expr::{ScalarUDF, Volatility}; use datafusion::physical_plan::ColumnarValue; +use datafusion_expr::create_udf; use datatypes::arrow::array::Array; use datatypes::arrow::datatypes::DataType; @@ -40,32 +41,25 @@ impl QuantileOverTime { } pub fn scalar_udf(quantile: f64) -> ScalarUDF { - // TODO(LFC): Use the new Datafusion UDF impl. - #[allow(deprecated)] - ScalarUDF::new( - Self::name(), - &Signature::new( - TypeSignature::Exact(Self::input_type()), - Volatility::Immutable, - ), - &(Arc::new(|_: &_| Ok(Arc::new(Self::return_type()))) as _), - &(Arc::new(move |input: &_| Self::new(quantile).calc(input)) as _), - ) - } - - // time index column and value column - fn input_type() -> Vec { - vec![ + let input_types = vec![ + // time index column RangeArray::convert_data_type(DataType::Timestamp(TimeUnit::Millisecond, None)), + // value column RangeArray::convert_data_type(DataType::Float64), - ] - } - - fn return_type() -> DataType { - DataType::Float64 + ]; + create_udf( + Self::name(), + input_types, + Arc::new(DataType::Float64), + Volatility::Immutable, + Arc::new(move |input: &_| Self::new(quantile).quantile_over_time(input)) as _, + ) } - fn calc(&self, input: &[ColumnarValue]) -> Result { + fn quantile_over_time( + &self, + input: &[ColumnarValue], + ) -> Result { // construct matrix from input. assert_eq!(input.len(), 2); let ts_array = extract_array(&input[0])?; diff --git a/src/promql/src/functions/resets.rs b/src/promql/src/functions/resets.rs index 00dec32d019e..a63ef7a99f2f 100644 --- a/src/promql/src/functions/resets.rs +++ b/src/promql/src/functions/resets.rs @@ -20,7 +20,7 @@ use std::sync::Arc; use common_macro::range_fn; use datafusion::arrow::array::{Float64Array, TimestampMillisecondArray}; use datafusion::common::DataFusionError; -use datafusion::logical_expr::{ScalarUDF, Signature, TypeSignature, Volatility}; +use datafusion::logical_expr::{ScalarUDF, Volatility}; use datafusion::physical_plan::ColumnarValue; use datatypes::arrow::array::Array; use datatypes::arrow::datatypes::DataType;