build(deps): bump datafusion 20240528 #4061

Merged · 13 commits · Jun 1, 2024
1,131 changes: 581 additions & 550 deletions Cargo.lock

Large diffs are not rendered by default.

20 changes: 10 additions & 10 deletions Cargo.toml
@@ -104,15 +104,15 @@ clap = { version = "4.4", features = ["derive"] }
config = "0.13.0"
crossbeam-utils = "0.8"
dashmap = "5.4"
datafusion = { git = "https://github.com/apache/arrow-datafusion.git", rev = "34eda15b73a9e278af8844b30ed2f1c21c10359c" }
datafusion-common = { git = "https://github.com/apache/arrow-datafusion.git", rev = "34eda15b73a9e278af8844b30ed2f1c21c10359c" }
datafusion-expr = { git = "https://github.com/apache/arrow-datafusion.git", rev = "34eda15b73a9e278af8844b30ed2f1c21c10359c" }
datafusion-functions = { git = "https://github.com/apache/arrow-datafusion.git", rev = "34eda15b73a9e278af8844b30ed2f1c21c10359c" }
datafusion-optimizer = { git = "https://github.com/apache/arrow-datafusion.git", rev = "34eda15b73a9e278af8844b30ed2f1c21c10359c" }
datafusion-physical-expr = { git = "https://github.com/apache/arrow-datafusion.git", rev = "34eda15b73a9e278af8844b30ed2f1c21c10359c" }
datafusion-physical-plan = { git = "https://github.com/apache/arrow-datafusion.git", rev = "34eda15b73a9e278af8844b30ed2f1c21c10359c" }
datafusion-sql = { git = "https://github.com/apache/arrow-datafusion.git", rev = "34eda15b73a9e278af8844b30ed2f1c21c10359c" }
datafusion-substrait = { git = "https://github.com/apache/arrow-datafusion.git", rev = "34eda15b73a9e278af8844b30ed2f1c21c10359c" }
datafusion = { git = "https://github.com/apache/datafusion.git", rev = "08e19f4956d32164be6fc66eb5a4c080eb0023d1" }
datafusion-common = { git = "https://github.com/apache/datafusion.git", rev = "08e19f4956d32164be6fc66eb5a4c080eb0023d1" }
datafusion-expr = { git = "https://github.com/apache/datafusion.git", rev = "08e19f4956d32164be6fc66eb5a4c080eb0023d1" }
datafusion-functions = { git = "https://github.com/apache/datafusion.git", rev = "08e19f4956d32164be6fc66eb5a4c080eb0023d1" }
datafusion-optimizer = { git = "https://github.com/apache/datafusion.git", rev = "08e19f4956d32164be6fc66eb5a4c080eb0023d1" }
datafusion-physical-expr = { git = "https://github.com/apache/datafusion.git", rev = "08e19f4956d32164be6fc66eb5a4c080eb0023d1" }
datafusion-physical-plan = { git = "https://github.com/apache/datafusion.git", rev = "08e19f4956d32164be6fc66eb5a4c080eb0023d1" }
datafusion-sql = { git = "https://github.com/apache/datafusion.git", rev = "08e19f4956d32164be6fc66eb5a4c080eb0023d1" }
datafusion-substrait = { git = "https://github.com/apache/datafusion.git", rev = "08e19f4956d32164be6fc66eb5a4c080eb0023d1" }
derive_builder = "0.12"
dotenv = "0.15"
# TODO(LFC): Wait for https://github.com/etcdv3/etcd-client/pull/76
@@ -162,7 +162,7 @@ smallvec = { version = "1", features = ["serde"] }
snafu = "0.8"
sysinfo = "0.30"
# on branch v0.44.x
sqlparser = { git = "https://github.com/GreptimeTeam/sqlparser-rs.git", rev = "e4e496b8d62416ad50ce70a1b460c7313610cf5d", features = [
sqlparser = { git = "https://github.com/GreptimeTeam/sqlparser-rs.git", rev = "54a267ac89c09b11c0c88934690530807185d3e7", features = [
"visitor",
] }
strum = { version = "0.25", features = ["derive"] }
1 change: 0 additions & 1 deletion benchmarks/Cargo.toml
@@ -12,7 +12,6 @@ api.workspace = true
arrow.workspace = true
chrono.workspace = true
clap.workspace = true
client = { workspace = true, features = ["testing"] }
common-base.workspace = true
common-telemetry.workspace = true
common-wal.workspace = true
6 changes: 3 additions & 3 deletions src/common/query/src/logical_plan/udaf.rs
@@ -22,7 +22,7 @@ use std::sync::Arc;

use datafusion::arrow::datatypes::Field;
use datafusion_common::Result;
use datafusion_expr::function::AccumulatorArgs;
use datafusion_expr::function::{AccumulatorArgs, StateFieldsArgs};
use datafusion_expr::{
Accumulator, AccumulatorFactoryFunction, AggregateUDF as DfAggregateUdf, AggregateUDFImpl,
};
@@ -129,13 +129,13 @@ impl AggregateUDFImpl for DfUdafAdapter {
(self.accumulator)(acc_args)
}

fn state_fields(&self, name: &str, _: ArrowDataType, _: Vec<Field>) -> Result<Vec<Field>> {
fn state_fields(&self, args: StateFieldsArgs) -> Result<Vec<Field>> {
let state_types = self.creator.state_types()?;
let fields = state_types
.into_iter()
.enumerate()
.map(|(i, t)| {
let name = format!("{name}_{i}");
let name = format!("{}_{i}", args.name);
Field::new(name, t.as_arrow_type(), true)
})
.collect::<Vec<_>>();
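A note for downstream `AggregateUDFImpl` implementors hitting the same break: the loose `name`/value-type/ordering-fields parameters are now bundled into `StateFieldsArgs`. A minimal sketch of the new shape, with hypothetical state types standing in for whatever the accumulator actually stores:

```rust
use datafusion::arrow::datatypes::{DataType, Field};
use datafusion_common::Result;
use datafusion_expr::function::StateFieldsArgs;

// Sketch of a `state_fields` body after the bump: the UDAF name now comes from
// `args.name` instead of a standalone `name: &str` parameter. The two state types
// below are placeholders, not anything this PR defines.
fn state_fields(args: StateFieldsArgs) -> Result<Vec<Field>> {
    let state_types = [DataType::Int64, DataType::Float64];
    Ok(state_types
        .iter()
        .enumerate()
        .map(|(i, t)| Field::new(format!("{}_{i}", args.name), t.clone(), true))
        .collect())
}
```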
4 changes: 4 additions & 0 deletions src/common/query/src/logical_plan/udf.rs
@@ -108,6 +108,10 @@ impl ScalarUDFImpl for DfUdfAdapter {
fn invoke(&self, args: &[DfColumnarValue]) -> datafusion_common::Result<DfColumnarValue> {
(self.fun)(args)
}

fn invoke_no_args(&self, number_rows: usize) -> datafusion_common::Result<DfColumnarValue> {
Ok((self.fun)(&[])?.into_array(number_rows)?.into())
}
}

impl From<ScalarUdf> for DfScalarUDF {
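The added `invoke_no_args` covers scalar functions called with zero arguments: DataFusion passes the row count and expects a column of that length back. A minimal sketch of the broadcast trick used above, with a hypothetical constant function standing in for `self.fun`:

```rust
use datafusion_common::{Result, ScalarValue};
use datafusion_expr::ColumnarValue;

// Hypothetical zero-argument scalar function: always produces the scalar 42.
fn answer(_args: &[ColumnarValue]) -> Result<ColumnarValue> {
    Ok(ColumnarValue::Scalar(ScalarValue::Int64(Some(42))))
}

// Mirrors the adapter's `invoke_no_args`: call the function with an empty argument
// list, then broadcast its scalar result into an array of `number_rows` rows.
fn invoke_no_args(number_rows: usize) -> Result<ColumnarValue> {
    Ok(answer(&[])?.into_array(number_rows)?.into())
}
```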
13 changes: 1 addition & 12 deletions src/common/query/src/signature.rs
@@ -27,10 +27,6 @@ pub enum TypeSignature {
/// arbitrary number of arguments of an common type out of a list of valid types
// A function such as `concat` is `Variadic(vec![ConcreteDataType::String, ConcreteDataType::String])`
Variadic(Vec<ConcreteDataType>),
/// arbitrary number of arguments of an arbitrary but equal type
// A function such as `array` is `VariadicEqual`
// The first argument decides the type used for coercion
VariadicEqual,
/// One or more arguments with arbitrary types
VariadicAny,
/// fixed number of arguments of an arbitrary but equal type out of a list of valid types
@@ -67,20 +63,14 @@ impl Signature {
volatility,
}
}

/// variadic - Creates a variadic signature that represents an arbitrary number of arguments all from a type in common_types.
pub fn variadic(common_types: Vec<ConcreteDataType>, volatility: Volatility) -> Self {
Self {
type_signature: TypeSignature::Variadic(common_types),
volatility,
}
}
/// variadic_equal - Creates a variadic signature that represents an arbitrary number of arguments of the same type.
pub fn variadic_equal(volatility: Volatility) -> Self {
Self {
type_signature: TypeSignature::VariadicEqual,
volatility,
}
}

/// variadic_any - Creates a variadic signature that represents an arbitrary number of arguments of any type.
pub fn variadic_any(volatility: Volatility) -> Self {
@@ -131,7 +121,6 @@ impl From<TypeSignature> for DfTypeSignature {
TypeSignature::Variadic(types) => {
DfTypeSignature::Variadic(concrete_types_to_arrow_types(types))
}
TypeSignature::VariadicEqual => DfTypeSignature::VariadicEqual,
TypeSignature::Uniform(n, types) => {
DfTypeSignature::Uniform(n, concrete_types_to_arrow_types(types))
}
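With `VariadicEqual` removed here and upstream, call sites need a replacement signature. Two plausible options, sketched against `datafusion_expr` at the pinned revision (which one fits depends on the function, and this is not presented as the only migration path):

```rust
use datafusion::arrow::datatypes::DataType;
use datafusion_expr::{Signature, TypeSignature, Volatility};

// Accept any number of arguments of any type and let the function itself
// validate or coerce them at runtime.
fn any_args() -> Signature {
    Signature::variadic_any(Volatility::Immutable)
}

// Or keep the accepted element types explicit, e.g. a string-only variadic function.
fn string_args() -> Signature {
    Signature::new(
        TypeSignature::Variadic(vec![DataType::Utf8]),
        Volatility::Immutable,
    )
}
```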
2 changes: 1 addition & 1 deletion src/common/recordbatch/src/adapter.rs
Expand Up @@ -292,7 +292,7 @@ impl ExecutionPlanVisitor for MetricCollector {
// skip if no metric available
let Some(metric) = plan.metrics() else {
self.record_batch_metrics.plan_metrics.push(PlanMetrics {
plan: plan.name().to_string(),
plan: std::any::type_name::<Self>().to_string(),
level: self.current_level,
metrics: vec![],
});
2 changes: 1 addition & 1 deletion src/frontend/src/instance/prom_store.rs
Expand Up @@ -218,7 +218,7 @@ impl PromStoreProtocolHandler for Instance {
let plan = output.meta.plan.clone();
query_results.push(to_query_result(&table_name, output).await?);
if let Some(ref plan) = plan {
collect_plan_metrics(plan.clone(), &mut [&mut map]);
collect_plan_metrics(plan, &mut [&mut map]);
}
}

8 changes: 2 additions & 6 deletions src/operator/src/expr_factory.rs
Expand Up @@ -322,11 +322,8 @@ fn find_primary_keys(
let constraints_pk = constraints
.iter()
.filter_map(|constraint| match constraint {
TableConstraint::Unique {
name: _,
columns,
is_primary: true,
..
TableConstraint::PrimaryKey {
name: _, columns, ..
} => Some(columns.iter().map(|ident| ident.value.clone())),
_ => None,
})
@@ -353,7 +350,6 @@ pub fn find_time_index(constraints: &[TableConstraint]) -> Result<String> {
TableConstraint::Unique {
name: Some(name),
columns,
is_primary: false,
..
} => {
if name.value == TIME_INDEX {
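This follows the sqlparser fork pinned in Cargo.toml above, where `Unique { is_primary: true, .. }` became a dedicated `PrimaryKey` variant. A minimal sketch of the new match shape, mirroring `find_primary_keys`:

```rust
use sqlparser::ast::TableConstraint;

// Collect primary-key column names from a table's constraints. The `..` keeps the
// match insensitive to the variant's remaining fields (index options and the like).
fn primary_key_columns(constraints: &[TableConstraint]) -> Vec<String> {
    constraints
        .iter()
        .filter_map(|constraint| match constraint {
            TableConstraint::PrimaryKey { columns, .. } => {
                Some(columns.iter().map(|ident| ident.value.clone()))
            }
            _ => None,
        })
        .flatten()
        .collect()
}
```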
18 changes: 9 additions & 9 deletions src/promql/src/extension_plan/empty_metric.rs
@@ -156,19 +156,19 @@ impl UserDefinedLogicalNodeCore for EmptyMetric {
)
}

fn from_template(&self, expr: &[Expr], _inputs: &[LogicalPlan]) -> Self {
Self {
fn with_exprs_and_inputs(
&self,
exprs: Vec<Expr>,
_inputs: Vec<LogicalPlan>,
) -> DataFusionResult<Self> {
Ok(Self {
start: self.start,
end: self.end,
interval: self.interval,
expr: if !expr.is_empty() {
Some(expr[0].clone())
} else {
None
},
expr: exprs.into_iter().next(),
time_index_schema: self.time_index_schema.clone(),
result_schema: self.result_schema.clone(),
}
})
}
}

@@ -204,7 +204,7 @@ impl ExecutionPlan for EmptyMetricExec {
vec![]
}

fn children(&self) -> Vec<Arc<dyn ExecutionPlan>> {
fn children(&self) -> Vec<&Arc<dyn ExecutionPlan>> {
vec![]
}

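The same `from_template` to `with_exprs_and_inputs` migration repeats across the extension plans in this PR: the hook now takes owned `Vec`s and returns a `Result`, so elements can be moved instead of cloned and arity problems become errors rather than panics. A DataFusion-free sketch of that shape, using simplified stand-in types rather than the real `Expr` and `LogicalPlan`:

```rust
// Simplified stand-ins for datafusion_expr::Expr and LogicalPlan; not the real types.
#[derive(Clone)]
struct Expr;
#[derive(Clone)]
struct LogicalPlan;

struct NodeLike {
    expr: Option<Expr>,
    input: LogicalPlan,
}

impl NodeLike {
    // The old hook, `fn from_template(&self, exprs: &[Expr], inputs: &[LogicalPlan]) -> Self`,
    // had to clone out of the slices and could only assert on unexpected arity.
    // The new hook moves its arguments and reports arity problems as errors.
    fn with_exprs_and_inputs(
        &self,
        exprs: Vec<Expr>,
        inputs: Vec<LogicalPlan>,
    ) -> Result<Self, String> {
        let mut inputs = inputs.into_iter();
        let input = inputs
            .next()
            .ok_or_else(|| "node should have at least one input".to_string())?;
        Ok(Self {
            expr: exprs.into_iter().next(),
            input,
        })
    }
}
```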
16 changes: 10 additions & 6 deletions src/promql/src/extension_plan/histogram_fold.rs
@@ -95,17 +95,21 @@ impl UserDefinedLogicalNodeCore for HistogramFold {
)
}

fn from_template(&self, _exprs: &[Expr], inputs: &[LogicalPlan]) -> Self {
Self {
fn with_exprs_and_inputs(
&self,
_exprs: Vec<Expr>,
inputs: Vec<LogicalPlan>,
) -> DataFusionResult<Self> {
Ok(Self {
le_column: self.le_column.clone(),
ts_column: self.ts_column.clone(),
input: inputs[0].clone(),
input: inputs.into_iter().next().unwrap(),
field_column: self.field_column.clone(),
quantile: self.quantile,
// This method cannot return error. Otherwise we should re-calculate
// the output schema
output_schema: self.output_schema.clone(),
}
})
}
}

@@ -279,8 +283,8 @@ impl ExecutionPlan for HistogramFoldExec {
vec![true; self.children().len()]
}

fn children(&self) -> Vec<Arc<dyn ExecutionPlan>> {
vec![self.input.clone()]
fn children(&self) -> Vec<&Arc<dyn ExecutionPlan>> {
vec![&self.input]
}

// cannot change schema with this method
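The other recurring change in these plans is `children` returning borrowed `&Arc`s, so the reference count is only bumped when a caller actually wants ownership. A small self-contained sketch of that shape (the trait here is a stand-in, not DataFusion's `ExecutionPlan`):

```rust
use std::sync::Arc;

// Stand-in for DataFusion's ExecutionPlan trait.
trait Plan {}

struct SingleChildExec {
    input: Arc<dyn Plan>,
}

impl SingleChildExec {
    // Before this bump the method returned Vec<Arc<dyn Plan>>, cloning every child
    // Arc on each call; now it hands out references and callers clone on demand.
    fn children(&self) -> Vec<&Arc<dyn Plan>> {
        vec![&self.input]
    }
}
```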
22 changes: 15 additions & 7 deletions src/promql/src/extension_plan/instant_manipulate.rs
@@ -83,18 +83,26 @@ impl UserDefinedLogicalNodeCore for InstantManipulate {
)
}

fn from_template(&self, _exprs: &[Expr], inputs: &[LogicalPlan]) -> Self {
assert!(!inputs.is_empty());
fn with_exprs_and_inputs(
&self,
_exprs: Vec<Expr>,
inputs: Vec<LogicalPlan>,
) -> DataFusionResult<Self> {
if inputs.is_empty() {
return Err(DataFusionError::Internal(
"InstantManipulate should have at least one input".to_string(),
));
}

Self {
Ok(Self {
start: self.start,
end: self.end,
lookback_delta: self.lookback_delta,
interval: self.interval,
time_index_column: self.time_index_column.clone(),
field_column: self.field_column.clone(),
input: inputs[0].clone(),
}
input: inputs.into_iter().next().unwrap(),
})
}
}

@@ -207,8 +215,8 @@ impl ExecutionPlan for InstantManipulateExec {
vec![false; self.children().len()]
}

fn children(&self) -> Vec<Arc<dyn ExecutionPlan>> {
vec![self.input.clone()]
fn children(&self) -> Vec<&Arc<dyn ExecutionPlan>> {
vec![&self.input]
}

fn with_new_children(
22 changes: 15 additions & 7 deletions src/promql/src/extension_plan/normalize.rs
@@ -81,15 +81,23 @@ impl UserDefinedLogicalNodeCore for SeriesNormalize {
)
}

fn from_template(&self, _exprs: &[Expr], inputs: &[LogicalPlan]) -> Self {
assert!(!inputs.is_empty());
fn with_exprs_and_inputs(
&self,
_exprs: Vec<Expr>,
inputs: Vec<LogicalPlan>,
) -> DataFusionResult<Self> {
if inputs.is_empty() {
return Err(DataFusionError::Internal(
"SeriesNormalize should have at least one input".to_string(),
));
}

Self {
Ok(Self {
offset: self.offset,
time_index_column_name: self.time_index_column_name.clone(),
need_filter_out_nan: self.need_filter_out_nan,
input: inputs[0].clone(),
}
input: inputs.into_iter().next().unwrap()
})
}
}

@@ -173,8 +181,8 @@ impl ExecutionPlan for SeriesNormalizeExec {
self.input.properties()
}

fn children(&self) -> Vec<Arc<dyn ExecutionPlan>> {
vec![self.input.clone()]
fn children(&self) -> Vec<&Arc<dyn ExecutionPlan>> {
vec![&self.input]
}

fn with_new_children(
22 changes: 15 additions & 7 deletions src/promql/src/extension_plan/range_manipulate.rs
@@ -231,19 +231,27 @@ impl UserDefinedLogicalNodeCore for RangeManipulate {
)
}

fn from_template(&self, _exprs: &[Expr], inputs: &[LogicalPlan]) -> Self {
assert!(!inputs.is_empty());
fn with_exprs_and_inputs(
&self,
_exprs: Vec<Expr>,
inputs: Vec<LogicalPlan>,
) -> DataFusionResult<Self> {
if inputs.is_empty() {
return Err(DataFusionError::Internal(
"RangeManipulate should have at least one input".to_string(),
));
}

Self {
Ok(Self {
start: self.start,
end: self.end,
interval: self.interval,
range: self.range,
time_index: self.time_index.clone(),
field_columns: self.field_columns.clone(),
input: inputs[0].clone(),
input: inputs.into_iter().next().unwrap(),
output_schema: self.output_schema.clone(),
}
})
}
}

@@ -280,8 +288,8 @@ impl ExecutionPlan for RangeManipulateExec {
vec![true; self.children().len()]
}

fn children(&self) -> Vec<Arc<dyn ExecutionPlan>> {
vec![self.input.clone()]
fn children(&self) -> Vec<&Arc<dyn ExecutionPlan>> {
vec![&self.input]
}

fn required_input_distribution(&self) -> Vec<Distribution> {