
Merge branch 'main' into 11367/parquet-statistics-defaults
wiedld committed Jul 25, 2024
2 parents 5f3ecb2 + 71903e1 commit 20e793c
Showing 115 changed files with 2,448 additions and 1,182 deletions.
4 changes: 2 additions & 2 deletions .github/workflows/rust.yml
@@ -90,8 +90,8 @@ jobs:
# Ensure that the datafusion crate can be built with only a subset of the function
# packages enabled.
-      - name: Check datafusion (array_expressions)
-        run: cargo check --no-default-features --features=array_expressions -p datafusion
+      - name: Check datafusion (nested_expressions)
+        run: cargo check --no-default-features --features=nested_expressions -p datafusion

- name: Check datafusion (crypto)
run: cargo check --no-default-features --features=crypto_expressions -p datafusion
4 changes: 2 additions & 2 deletions Cargo.toml
@@ -25,7 +25,7 @@ members = [
"datafusion/execution",
"datafusion/functions-aggregate",
"datafusion/functions",
"datafusion/functions-array",
"datafusion/functions-nested",
"datafusion/optimizer",
"datafusion/physical-expr-common",
"datafusion/physical-expr",
@@ -94,7 +94,7 @@ datafusion-execution = { path = "datafusion/execution", version = "40.0.0" }
datafusion-expr = { path = "datafusion/expr", version = "40.0.0" }
datafusion-functions = { path = "datafusion/functions", version = "40.0.0" }
datafusion-functions-aggregate = { path = "datafusion/functions-aggregate", version = "40.0.0" }
-datafusion-functions-array = { path = "datafusion/functions-array", version = "40.0.0" }
+datafusion-functions-nested = { path = "datafusion/functions-nested", version = "40.0.0" }
datafusion-optimizer = { path = "datafusion/optimizer", version = "40.0.0", default-features = false }
datafusion-physical-expr = { path = "datafusion/physical-expr", version = "40.0.0", default-features = false }
datafusion-physical-expr-common = { path = "datafusion/physical-expr-common", version = "40.0.0", default-features = false }
2 changes: 1 addition & 1 deletion README.md
@@ -75,7 +75,7 @@ This crate has several [features] which can be specified in your `Cargo.toml`.

Default features:

-- `array_expressions`: functions for working with arrays such as `array_to_string`
+- `nested_expressions`: functions for working with nested types such as `array_to_string`
- `compression`: reading files compressed with `xz2`, `bzip2`, `flate2`, and `zstd`
- `crypto_expressions`: cryptographic functions such as `md5` and `sha256`
- `datetime_expressions`: date and time functions such as `to_timestamp`
4 changes: 2 additions & 2 deletions datafusion-cli/Cargo.lock

Some generated files are not rendered by default.

12 changes: 6 additions & 6 deletions datafusion-examples/examples/advanced_udwf.rs
@@ -216,12 +216,12 @@ async fn main() -> Result<()> {
df.show().await?;

// Now, run the function using the DataFrame API:
-    let window_expr = smooth_it.call(
-        vec![col("speed")],                 // smooth_it(speed)
-        vec![col("car")],                   // PARTITION BY car
-        vec![col("time").sort(true, true)], // ORDER BY time ASC
-        WindowFrame::new(None),
-    );
+    let window_expr = smooth_it
+        .call(vec![col("speed")])                     // smooth_it(speed)
+        .partition_by(vec![col("car")])               // PARTITION BY car
+        .order_by(vec![col("time").sort(true, true)]) // ORDER BY time ASC
+        .window_frame(WindowFrame::new(None))
+        .build()?;
let df = ctx.table("cars").await?.window(vec![window_expr])?;

// print the results
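For comparison, the SQL form of the same window call looks roughly like the sketch below. It is not part of this diff; the `cars` table, the registered `smooth_it` UDWF, and the `ctx` SessionContext are assumed from the surrounding example.

```rust
use datafusion::error::Result;
use datafusion::prelude::*;

// Sketch only: the SQL-level equivalent of the builder-style window expression above.
// Assumes `ctx` already has the `cars` table and the `smooth_it` UDWF registered,
// as done earlier in advanced_udwf.rs.
async fn run_sql_version(ctx: &SessionContext) -> Result<()> {
    let df = ctx
        .sql(
            "SELECT car, speed, \
             smooth_it(speed) OVER (PARTITION BY car ORDER BY time) AS smooth_speed \
             FROM cars",
        )
        .await?;
    df.show().await?;
    Ok(())
}
```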
4 changes: 2 additions & 2 deletions datafusion-examples/examples/expr_api.rs
@@ -33,7 +33,7 @@ use datafusion_expr::execution_props::ExecutionProps;
use datafusion_expr::expr::BinaryExpr;
use datafusion_expr::interval_arithmetic::Interval;
use datafusion_expr::simplify::SimplifyContext;
-use datafusion_expr::{AggregateExt, ColumnarValue, ExprSchemable, Operator};
+use datafusion_expr::{ColumnarValue, ExprFunctionExt, ExprSchemable, Operator};

/// This example demonstrates the DataFusion [`Expr`] API.
///
@@ -95,7 +95,7 @@ fn expr_fn_demo() -> Result<()> {
let agg = first_value.call(vec![col("price")]);
assert_eq!(agg.to_string(), "first_value(price)");

-    // You can use the AggregateExt trait to create more complex aggregates
+    // You can use the ExprFunctionExt trait to create more complex aggregates
// such as `FIRST_VALUE(price FILTER quantity > 100 ORDER BY ts )
let agg = first_value
.call(vec![col("price")])
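The hunk is truncated here. As a rough sketch (not taken from this diff), the builder chain described by the comment above would continue along these lines; `first_value` stands in for the AggregateUDF obtained earlier in the example, and the column names follow the comment.

```rust
use std::sync::Arc;

use datafusion::error::Result;
use datafusion::prelude::*;
use datafusion_expr::{AggregateUDF, ExprFunctionExt};

// Sketch only: building FIRST_VALUE(price) FILTER (WHERE quantity > 100) ORDER BY ts
// with the ExprFunctionExt builder, mirroring the comment in the hunk above.
fn first_value_with_filter_and_order(first_value: Arc<AggregateUDF>) -> Result<Expr> {
    first_value
        .call(vec![col("price")])
        .filter(col("quantity").gt(lit(100)))
        .order_by(vec![col("ts").sort(false, false)])
        .build()
}
```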
12 changes: 6 additions & 6 deletions datafusion-examples/examples/simple_udwf.rs
@@ -118,12 +118,12 @@ async fn main() -> Result<()> {
df.show().await?;

// Now, run the function using the DataFrame API:
-    let window_expr = smooth_it.call(
-        vec![col("speed")],                 // smooth_it(speed)
-        vec![col("car")],                   // PARTITION BY car
-        vec![col("time").sort(true, true)], // ORDER BY time ASC
-        WindowFrame::new(None),
-    );
+    let window_expr = smooth_it
+        .call(vec![col("speed")])                     // smooth_it(speed)
+        .partition_by(vec![col("car")])               // PARTITION BY car
+        .order_by(vec![col("time").sort(true, true)]) // ORDER BY time ASC
+        .window_frame(WindowFrame::new(None))
+        .build()?;
let df = ctx.table("cars").await?.window(vec![window_expr])?;

// print the results
44 changes: 44 additions & 0 deletions datafusion/common/src/scalar/consts.rs
@@ -0,0 +1,44 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

// Constants defined for scalar construction.

// PI ~ 3.1415927 in f32
#[allow(clippy::approx_constant)]
pub(super) const PI_UPPER_F32: f32 = 3.141593_f32;

// PI ~ 3.141592653589793 in f64
pub(super) const PI_UPPER_F64: f64 = 3.141592653589794_f64;

// -PI ~ -3.1415927 in f32
#[allow(clippy::approx_constant)]
pub(super) const NEGATIVE_PI_LOWER_F32: f32 = -3.141593_f32;

// -PI ~ -3.141592653589793 in f64
pub(super) const NEGATIVE_PI_LOWER_F64: f64 = -3.141592653589794_f64;

// PI / 2 ~ 1.5707964 in f32
pub(super) const FRAC_PI_2_UPPER_F32: f32 = 1.5707965_f32;

// PI / 2 ~ 1.5707963267948966 in f64
pub(super) const FRAC_PI_2_UPPER_F64: f64 = 1.5707963267948967_f64;

// -PI / 2 ~ -1.5707964 in f32
pub(super) const NEGATIVE_FRAC_PI_2_LOWER_F32: f32 = -1.5707965_f32;

// -PI / 2 ~ -1.5707963267948966 in f64
pub(super) const NEGATIVE_FRAC_PI_2_LOWER_F64: f64 = -1.5707963267948967_f64;
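These bounds sit slightly outside the exact `std` constants, presumably so that ranges built from them still enclose PI, -PI, and +/-PI/2 after rounding (the TODO in scalar/mod.rs notes that `next_up`/`next_down` should eventually replace them). A small sanity-check sketch, not part of this diff:

```rust
// Sketch only: the literals are copied from consts.rs above;
// each bound is strictly outside the corresponding std constant.
fn main() {
    assert!(3.141593_f32 > std::f32::consts::PI); // PI_UPPER_F32
    assert!(3.141592653589794_f64 > std::f64::consts::PI); // PI_UPPER_F64
    assert!(-3.141592653589794_f64 < -std::f64::consts::PI); // NEGATIVE_PI_LOWER_F64
    assert!(1.5707963267948967_f64 > std::f64::consts::FRAC_PI_2); // FRAC_PI_2_UPPER_F64
    println!("all bounds strictly enclose the exact constants");
}
```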
119 changes: 119 additions & 0 deletions datafusion/common/src/scalar/mod.rs
@@ -17,7 +17,9 @@

//! [`ScalarValue`]: stores single values
mod consts;
mod struct_builder;

use std::borrow::Borrow;
use std::cmp::Ordering;
use std::collections::{HashSet, VecDeque};
@@ -1007,6 +1009,123 @@ impl ScalarValue {
}
}

/// Returns a [`ScalarValue`] representing PI
pub fn new_pi(datatype: &DataType) -> Result<ScalarValue> {
match datatype {
DataType::Float32 => Ok(ScalarValue::from(std::f32::consts::PI)),
DataType::Float64 => Ok(ScalarValue::from(std::f64::consts::PI)),
_ => _internal_err!("PI is not supported for data type: {:?}", datatype),
}
}

/// Returns a [`ScalarValue`] representing PI's upper bound
pub fn new_pi_upper(datatype: &DataType) -> Result<ScalarValue> {
// TODO: replace the constants with next_up/next_down when
// they are stabilized: https://doc.rust-lang.org/std/primitive.f64.html#method.next_up
match datatype {
DataType::Float32 => Ok(ScalarValue::from(consts::PI_UPPER_F32)),
DataType::Float64 => Ok(ScalarValue::from(consts::PI_UPPER_F64)),
_ => {
_internal_err!("PI_UPPER is not supported for data type: {:?}", datatype)
}
}
}

/// Returns a [`ScalarValue`] representing -PI's lower bound
pub fn new_negative_pi_lower(datatype: &DataType) -> Result<ScalarValue> {
match datatype {
DataType::Float32 => Ok(ScalarValue::from(consts::NEGATIVE_PI_LOWER_F32)),
DataType::Float64 => Ok(ScalarValue::from(consts::NEGATIVE_PI_LOWER_F64)),
_ => {
_internal_err!("-PI_LOWER is not supported for data type: {:?}", datatype)
}
}
}

/// Returns a [`ScalarValue`] representing FRAC_PI_2's upper bound
pub fn new_frac_pi_2_upper(datatype: &DataType) -> Result<ScalarValue> {
match datatype {
DataType::Float32 => Ok(ScalarValue::from(consts::FRAC_PI_2_UPPER_F32)),
DataType::Float64 => Ok(ScalarValue::from(consts::FRAC_PI_2_UPPER_F64)),
_ => {
_internal_err!(
"PI_UPPER/2 is not supported for data type: {:?}",
datatype
)
}
}
}

    /// Returns a [`ScalarValue`] representing -FRAC_PI_2's lower bound
pub fn new_neg_frac_pi_2_lower(datatype: &DataType) -> Result<ScalarValue> {
match datatype {
DataType::Float32 => {
Ok(ScalarValue::from(consts::NEGATIVE_FRAC_PI_2_LOWER_F32))
}
DataType::Float64 => {
Ok(ScalarValue::from(consts::NEGATIVE_FRAC_PI_2_LOWER_F64))
}
_ => {
_internal_err!(
"-PI/2_LOWER is not supported for data type: {:?}",
datatype
)
}
}
}

/// Returns a [`ScalarValue`] representing -PI
pub fn new_negative_pi(datatype: &DataType) -> Result<ScalarValue> {
match datatype {
DataType::Float32 => Ok(ScalarValue::from(-std::f32::consts::PI)),
DataType::Float64 => Ok(ScalarValue::from(-std::f64::consts::PI)),
_ => _internal_err!("-PI is not supported for data type: {:?}", datatype),
}
}

/// Returns a [`ScalarValue`] representing PI/2
pub fn new_frac_pi_2(datatype: &DataType) -> Result<ScalarValue> {
match datatype {
DataType::Float32 => Ok(ScalarValue::from(std::f32::consts::FRAC_PI_2)),
DataType::Float64 => Ok(ScalarValue::from(std::f64::consts::FRAC_PI_2)),
_ => _internal_err!("PI/2 is not supported for data type: {:?}", datatype),
}
}

/// Returns a [`ScalarValue`] representing -PI/2
pub fn new_neg_frac_pi_2(datatype: &DataType) -> Result<ScalarValue> {
match datatype {
DataType::Float32 => Ok(ScalarValue::from(-std::f32::consts::FRAC_PI_2)),
DataType::Float64 => Ok(ScalarValue::from(-std::f64::consts::FRAC_PI_2)),
_ => _internal_err!("-PI/2 is not supported for data type: {:?}", datatype),
}
}

/// Returns a [`ScalarValue`] representing infinity
pub fn new_infinity(datatype: &DataType) -> Result<ScalarValue> {
match datatype {
DataType::Float32 => Ok(ScalarValue::from(f32::INFINITY)),
DataType::Float64 => Ok(ScalarValue::from(f64::INFINITY)),
_ => {
_internal_err!("Infinity is not supported for data type: {:?}", datatype)
}
}
}

/// Returns a [`ScalarValue`] representing negative infinity
pub fn new_neg_infinity(datatype: &DataType) -> Result<ScalarValue> {
match datatype {
DataType::Float32 => Ok(ScalarValue::from(f32::NEG_INFINITY)),
DataType::Float64 => Ok(ScalarValue::from(f64::NEG_INFINITY)),
_ => {
_internal_err!(
"Negative Infinity is not supported for data type: {:?}",
datatype
)
}
}
}

/// Create a zero value in the given type.
pub fn new_zero(datatype: &DataType) -> Result<ScalarValue> {
Ok(match datatype {
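A hedged usage sketch for the constructors added above; how callers consume these values (for example as bounds during range analysis) is an assumption, not shown in this hunk:

```rust
use datafusion::arrow::datatypes::DataType;
use datafusion::common::ScalarValue;
use datafusion::error::Result;

// Sketch only: exercising the new ScalarValue constructors shown above.
fn demo_new_constructors() -> Result<()> {
    let pi = ScalarValue::new_pi(&DataType::Float64)?;
    let pi_upper = ScalarValue::new_pi_upper(&DataType::Float64)?;
    let neg_inf = ScalarValue::new_neg_infinity(&DataType::Float32)?;

    assert_eq!(pi, ScalarValue::Float64(Some(std::f64::consts::PI)));
    assert!(pi_upper > pi); // the upper bound sits just above PI
    assert_eq!(neg_inf, ScalarValue::Float32(Some(f32::NEG_INFINITY)));

    // Unsupported types return an error rather than panicking.
    assert!(ScalarValue::new_pi(&DataType::Utf8).is_err());
    Ok(())
}
```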
10 changes: 6 additions & 4 deletions datafusion/core/Cargo.toml
@@ -40,15 +40,17 @@ name = "datafusion"
path = "src/lib.rs"

[features]
+nested_expressions = ["datafusion-functions-nested"]
+# This feature is deprecated. Use the `nested_expressions` feature instead.
+array_expressions = ["nested_expressions"]
# Used to enable the avro format
-array_expressions = ["datafusion-functions-array"]
avro = ["apache-avro", "num-traits", "datafusion-common/avro"]
backtrace = ["datafusion-common/backtrace"]
compression = ["xz2", "bzip2", "flate2", "zstd", "async-compression", "tokio-util"]
crypto_expressions = ["datafusion-functions/crypto_expressions"]
datetime_expressions = ["datafusion-functions/datetime_expressions"]
default = [
"array_expressions",
"nested_expressions",
"crypto_expressions",
"datetime_expressions",
"encoding_expressions",
@@ -102,7 +104,7 @@ datafusion-execution = { workspace = true }
datafusion-expr = { workspace = true }
datafusion-functions = { workspace = true }
datafusion-functions-aggregate = { workspace = true }
-datafusion-functions-array = { workspace = true, optional = true }
+datafusion-functions-nested = { workspace = true, optional = true }
datafusion-optimizer = { workspace = true }
datafusion-physical-expr = { workspace = true }
datafusion-physical-expr-common = { workspace = true }
@@ -221,4 +223,4 @@ name = "parquet_statistic"
[[bench]]
harness = false
name = "map_query_sql"
required-features = ["array_expressions"]
required-features = ["nested_expressions"]
2 changes: 1 addition & 1 deletion datafusion/core/benches/map_query_sql.rs
@@ -27,7 +27,7 @@ use tokio::runtime::Runtime;
use datafusion::prelude::SessionContext;
use datafusion_common::ScalarValue;
use datafusion_expr::Expr;
-use datafusion_functions_array::map::map;
+use datafusion_functions_nested::map::map;

mod data_utils;

13 changes: 6 additions & 7 deletions datafusion/core/src/dataframe/mod.rs
@@ -1696,8 +1696,8 @@ mod tests {
use datafusion_common::{Constraint, Constraints, ScalarValue};
use datafusion_common_runtime::SpawnedTask;
use datafusion_expr::{
-        cast, create_udf, expr, lit, BuiltInWindowFunction, ScalarFunctionImplementation,
-        Volatility, WindowFrame, WindowFunctionDefinition,
+        cast, create_udf, expr, lit, BuiltInWindowFunction, ExprFunctionExt,
+        ScalarFunctionImplementation, Volatility, WindowFunctionDefinition,
};
use datafusion_functions_aggregate::expr_fn::{array_agg, count_distinct};
use datafusion_physical_expr::expressions::Column;
@@ -1867,11 +1867,10 @@
BuiltInWindowFunction::FirstValue,
),
vec![col("aggregate_test_100.c1")],
vec![col("aggregate_test_100.c2")],
vec![],
WindowFrame::new(None),
None,
));
))
.partition_by(vec![col("aggregate_test_100.c2")])
.build()
.unwrap();
let t2 = t.select(vec![col("c1"), first_row])?;
let plan = t2.plan.clone();

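The change follows the same pattern as the examples above: `WindowFunction::new` now takes only the function and its argument list, while PARTITION BY, ORDER BY, and the window frame move to the `ExprFunctionExt` builder. A minimal migration sketch with placeholder column names, not taken from this diff:

```rust
use datafusion::error::Result;
use datafusion::prelude::*;
use datafusion_expr::expr::WindowFunction;
use datafusion_expr::{BuiltInWindowFunction, ExprFunctionExt, WindowFunctionDefinition};

// Sketch only: FIRST_VALUE(c1) OVER (PARTITION BY c2) built with the new API.
fn first_value_over_partition() -> Result<Expr> {
    Expr::WindowFunction(WindowFunction::new(
        WindowFunctionDefinition::BuiltInWindowFunction(BuiltInWindowFunction::FirstValue),
        vec![col("c1")],
    ))
    .partition_by(vec![col("c2")])
    .build()
}
```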
