Skip to content

Commit

Permalink
Merge commit '060e67e8ff3b57ab695daeb28cf7175c4e2c568c' into chunchun…
Browse files Browse the repository at this point in the history
…/update-df-apr-week-3
  • Loading branch information
appletreeisyellow committed Apr 26, 2024
2 parents 561b989 + 060e67e commit ad96ab2
Show file tree
Hide file tree
Showing 36 changed files with 1,468 additions and 1,414 deletions.
1 change: 1 addition & 0 deletions datafusion-cli/Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

67 changes: 52 additions & 15 deletions datafusion-cli/src/exec.rs
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ use datafusion::sql::parser::{DFParser, Statement};
use datafusion::sql::sqlparser::dialect::dialect_from_str;

use datafusion::common::FileType;
use datafusion::sql::sqlparser;
use rustyline::error::ReadlineError;
use rustyline::Editor;
use tokio::signal;
Expand Down Expand Up @@ -221,37 +222,73 @@ async fn exec_and_print(

let statements = DFParser::parse_sql_with_dialect(&sql, dialect.as_ref())?;
for statement in statements {
let adjusted =
AdjustedPrintOptions::new(print_options.clone()).with_statement(&statement);

let plan = create_plan(ctx, statement).await?;
let adjusted = adjusted.with_plan(&plan);

// For plans like `Explain` ignore `MaxRows` option and always display all rows
let should_ignore_maxrows = matches!(
plan,
LogicalPlan::Explain(_)
| LogicalPlan::DescribeTable(_)
| LogicalPlan::Analyze(_)
);
let df = ctx.execute_logical_plan(plan).await?;
let physical_plan = df.create_physical_plan().await?;

if physical_plan.execution_mode().is_unbounded() {
let stream = execute_stream(physical_plan, task_ctx.clone())?;
print_options.print_stream(stream, now).await?;
} else {
let mut print_options = print_options.clone();
if should_ignore_maxrows {
print_options.maxrows = MaxRows::Unlimited;
}
if print_options.format == PrintFormat::Automatic {
print_options.format = PrintFormat::Table;
}
let results = collect(physical_plan, task_ctx.clone()).await?;
print_options.print_batches(&results, now)?;
adjusted.into_inner().print_batches(&results, now)?;
}
}

Ok(())
}

/// Track adjustments to the print options based on the plan / statement being executed
#[derive(Debug)]
struct AdjustedPrintOptions {
inner: PrintOptions,
}

impl AdjustedPrintOptions {
fn new(inner: PrintOptions) -> Self {
Self { inner }
}
/// Adjust print options based on any statement specific requirements
fn with_statement(mut self, statement: &Statement) -> Self {
if let Statement::Statement(sql_stmt) = statement {
// SHOW / SHOW ALL
if let sqlparser::ast::Statement::ShowVariable { .. } = sql_stmt.as_ref() {
self.inner.maxrows = MaxRows::Unlimited
}
}
self
}

/// Adjust print options based on any plan specific requirements
fn with_plan(mut self, plan: &LogicalPlan) -> Self {
// For plans like `Explain` ignore `MaxRows` option and always display
// all rows
if matches!(
plan,
LogicalPlan::Explain(_)
| LogicalPlan::DescribeTable(_)
| LogicalPlan::Analyze(_)
) {
self.inner.maxrows = MaxRows::Unlimited;
}
self
}

/// Finalize and return the inner `PrintOptions`
fn into_inner(mut self) -> PrintOptions {
if self.inner.format == PrintFormat::Automatic {
self.inner.format = PrintFormat::Table;
}

self.inner
}
}

async fn create_plan(
ctx: &mut SessionContext,
statement: Statement,
Expand Down
5 changes: 5 additions & 0 deletions datafusion/core/src/execution/context/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1952,6 +1952,11 @@ impl SessionState {
&self.config
}

/// Return the mutable [`SessionConfig`].
pub fn config_mut(&mut self) -> &mut SessionConfig {
&mut self.config
}

/// Return the physical optimizers
pub fn physical_optimizers(&self) -> &[Arc<dyn PhysicalOptimizerRule + Send + Sync>] {
&self.physical_optimizers.rules
Expand Down
9 changes: 5 additions & 4 deletions datafusion/core/src/physical_planner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1168,12 +1168,13 @@ impl DefaultPhysicalPlanner {

Ok(Arc::new(GlobalLimitExec::new(input, *skip, *fetch)))
}
LogicalPlan::Unnest(Unnest { input, column, schema, options }) => {
LogicalPlan::Unnest(Unnest { input, columns, schema, options }) => {
let input = self.create_initial_plan(input, session_state).await?;
let column_exec = schema.index_of_column(column)
.map(|idx| Column::new(&column.name, idx))?;
let column_execs = columns.iter().map(|column| {
schema.index_of_column(column).map(|idx| Column::new(&column.name, idx))
}).collect::<Result<_>>()?;
let schema = SchemaRef::new(schema.as_ref().to_owned().into());
Ok(Arc::new(UnnestExec::new(input, column_exec, schema, options.clone())))
Ok(Arc::new(UnnestExec::new(input, column_execs, schema, options.clone())))
}
LogicalPlan::Ddl(ddl) => {
// There is no default plan for DDl statements --
Expand Down
43 changes: 42 additions & 1 deletion datafusion/execution/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -501,13 +501,54 @@ impl SessionConfig {
///
/// [^1]: Compare that to [`ConfigOptions`] which only supports [`ScalarValue`] payloads.
pub fn with_extension<T>(mut self, ext: Arc<T>) -> Self
where
T: Send + Sync + 'static,
{
self.set_extension(ext);
self
}

/// Set extension. Pretty much the same as [`with_extension`](Self::with_extension), but take
/// mutable reference instead of owning it. Useful if you want to add another extension after
/// the [`SessionConfig`] is created.
///
/// # Example
/// ```
/// use std::sync::Arc;
/// use datafusion_execution::config::SessionConfig;
///
/// // application-specific extension types
/// struct Ext1(u8);
/// struct Ext2(u8);
/// struct Ext3(u8);
///
/// let ext1a = Arc::new(Ext1(10));
/// let ext1b = Arc::new(Ext1(11));
/// let ext2 = Arc::new(Ext2(2));
///
/// let mut cfg = SessionConfig::default();
///
/// // will only remember the last Ext1
/// cfg.set_extension(Arc::clone(&ext1a));
/// cfg.set_extension(Arc::clone(&ext1b));
/// cfg.set_extension(Arc::clone(&ext2));
///
/// let ext1_received = cfg.get_extension::<Ext1>().unwrap();
/// assert!(!Arc::ptr_eq(&ext1_received, &ext1a));
/// assert!(Arc::ptr_eq(&ext1_received, &ext1b));
///
/// let ext2_received = cfg.get_extension::<Ext2>().unwrap();
/// assert!(Arc::ptr_eq(&ext2_received, &ext2));
///
/// assert!(cfg.get_extension::<Ext3>().is_none());
/// ```
pub fn set_extension<T>(&mut self, ext: Arc<T>)
where
T: Send + Sync + 'static,
{
let ext = ext as Arc<dyn Any + Send + Sync + 'static>;
let id = TypeId::of::<T>();
self.extensions.insert(id, ext);
self
}

/// Get extension, if any for the specified type `T` exists.
Expand Down
44 changes: 1 addition & 43 deletions datafusion/expr/src/built_in_function.rs
Original file line number Diff line number Diff line change
Expand Up @@ -37,14 +37,8 @@ use strum_macros::EnumIter;
#[derive(Debug, Clone, PartialEq, Eq, Hash, EnumIter, Copy)]
pub enum BuiltinScalarFunction {
// math functions
/// ceil
Ceil,
/// coalesce
Coalesce,
/// exp
Exp,
/// factorial
Factorial,
// string functions
/// concat
Concat,
Expand Down Expand Up @@ -106,10 +100,7 @@ impl BuiltinScalarFunction {
pub fn volatility(&self) -> Volatility {
match self {
// Immutable scalar builtins
BuiltinScalarFunction::Ceil => Volatility::Immutable,
BuiltinScalarFunction::Coalesce => Volatility::Immutable,
BuiltinScalarFunction::Exp => Volatility::Immutable,
BuiltinScalarFunction::Factorial => Volatility::Immutable,
BuiltinScalarFunction::Concat => Volatility::Immutable,
BuiltinScalarFunction::ConcatWithSeparator => Volatility::Immutable,
BuiltinScalarFunction::EndsWith => Volatility::Immutable,
Expand Down Expand Up @@ -145,15 +136,6 @@ impl BuiltinScalarFunction {
utf8_to_str_type(&input_expr_types[0], "initcap")
}
BuiltinScalarFunction::EndsWith => Ok(Boolean),

BuiltinScalarFunction::Factorial => Ok(Int64),

BuiltinScalarFunction::Ceil | BuiltinScalarFunction::Exp => {
match input_expr_types[0] {
Float32 => Ok(Float32),
_ => Ok(Float64),
}
}
}
}

Expand Down Expand Up @@ -185,43 +167,19 @@ impl BuiltinScalarFunction {
],
self.volatility(),
),
BuiltinScalarFunction::Factorial => {
Signature::uniform(1, vec![Int64], self.volatility())
}
BuiltinScalarFunction::Ceil | BuiltinScalarFunction::Exp => {
// math expressions expect 1 argument of type f64 or f32
// priority is given to f64 because e.g. `sqrt(1i32)` is in IR (real numbers) and thus we
// return the best approximation for it (in f64).
// We accept f32 because in this case it is clear that the best approximation
// will be as good as the number of digits in the number
Signature::uniform(1, vec![Float64, Float32], self.volatility())
}
}
}

/// This function specifies monotonicity behaviors for built-in scalar functions.
/// The list can be extended, only mathematical and datetime functions are
/// considered for the initial implementation of this feature.
pub fn monotonicity(&self) -> Option<FuncMonotonicity> {
if matches!(
&self,
BuiltinScalarFunction::Ceil
| BuiltinScalarFunction::Exp
| BuiltinScalarFunction::Factorial
) {
Some(vec![Some(true)])
} else {
None
}
None
}

/// Returns all names that can be used to call this function
pub fn aliases(&self) -> &'static [&'static str] {
match self {
BuiltinScalarFunction::Ceil => &["ceil"],
BuiltinScalarFunction::Exp => &["exp"],
BuiltinScalarFunction::Factorial => &["factorial"],

// conditional functions
BuiltinScalarFunction::Coalesce => &["coalesce"],

Expand Down
20 changes: 16 additions & 4 deletions datafusion/expr/src/expr.rs
Original file line number Diff line number Diff line change
Expand Up @@ -186,7 +186,16 @@ pub enum Expr {

#[derive(Clone, PartialEq, Eq, Hash, Debug)]
pub struct Unnest {
pub exprs: Vec<Expr>,
pub expr: Box<Expr>,
}

impl Unnest {
/// Create a new Unnest expression.
pub fn new(expr: Expr) -> Self {
Self {
expr: Box::new(expr),
}
}
}

/// Alias expression
Expand Down Expand Up @@ -1567,8 +1576,8 @@ impl fmt::Display for Expr {
}
},
Expr::Placeholder(Placeholder { id, .. }) => write!(f, "{id}"),
Expr::Unnest(Unnest { exprs }) => {
write!(f, "UNNEST({exprs:?})")
Expr::Unnest(Unnest { expr }) => {
write!(f, "UNNEST({expr:?})")
}
}
}
Expand Down Expand Up @@ -1757,7 +1766,10 @@ fn create_name(e: &Expr) -> Result<String> {
}
}
}
Expr::Unnest(Unnest { exprs }) => create_function_name("unnest", false, exprs),
Expr::Unnest(Unnest { expr }) => {
let expr_name = create_name(expr)?;
Ok(format!("unnest({expr_name})"))
}
Expr::ScalarFunction(fun) => create_function_name(fun.name(), false, &fun.args),
Expr::WindowFunction(WindowFunction {
fun,
Expand Down
30 changes: 0 additions & 30 deletions datafusion/expr/src/expr_fn.rs
Original file line number Diff line number Diff line change
Expand Up @@ -525,16 +525,6 @@ macro_rules! nary_scalar_expr {
// generate methods for creating the supported unary/binary expressions

// math functions
scalar_expr!(Factorial, factorial, num, "factorial");
scalar_expr!(
Ceil,
ceil,
num,
"nearest integer greater than or equal to argument"
);

scalar_expr!(Exp, exp, num, "exponential");

scalar_expr!(InitCap, initcap, string, "converts the first letter of each word in `string` in uppercase and the remaining characters in lowercase");
scalar_expr!(EndsWith, ends_with, string suffix, "whether the `string` ends with the `suffix`");
nary_scalar_expr!(Coalesce, coalesce, "returns `coalesce(args...)`, which evaluates to the value of the first [Expr] which is not NULL");
Expand Down Expand Up @@ -877,22 +867,6 @@ mod test {
);
}

macro_rules! test_unary_scalar_expr {
($ENUM:ident, $FUNC:ident) => {{
if let Expr::ScalarFunction(ScalarFunction {
func_def: ScalarFunctionDefinition::BuiltIn(fun),
args,
}) = $FUNC(col("tableA.a"))
{
let name = built_in_function::BuiltinScalarFunction::$ENUM;
assert_eq!(name, fun);
assert_eq!(1, args.len());
} else {
assert!(false, "unexpected");
}
}};
}

macro_rules! test_scalar_expr {
($ENUM:ident, $FUNC:ident, $($arg:ident),*) => {
let expected = [$(stringify!($arg)),*];
Expand All @@ -913,10 +887,6 @@ mod test {

#[test]
fn scalar_function_definitions() {
test_unary_scalar_expr!(Factorial, factorial);
test_unary_scalar_expr!(Ceil, ceil);
test_unary_scalar_expr!(Exp, exp);

test_scalar_expr!(InitCap, initcap, string);
test_scalar_expr!(EndsWith, ends_with, string, characters);
}
Expand Down
6 changes: 3 additions & 3 deletions datafusion/expr/src/expr_rewriter/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -82,13 +82,13 @@ pub fn normalize_col_with_schemas_and_ambiguity_check(
using_columns: &[HashSet<Column>],
) -> Result<Expr> {
// Normalize column inside Unnest
if let Expr::Unnest(Unnest { exprs }) = expr {
if let Expr::Unnest(Unnest { expr }) = expr {
let e = normalize_col_with_schemas_and_ambiguity_check(
exprs[0].clone(),
expr.as_ref().clone(),
schemas,
using_columns,
)?;
return Ok(Expr::Unnest(Unnest { exprs: vec![e] }));
return Ok(Expr::Unnest(Unnest { expr: Box::new(e) }));
}

expr.transform(&|expr| {
Expand Down
Loading

0 comments on commit ad96ab2

Please sign in to comment.