Skip to content

Commit

Permalink
Merge commit '2b0a7db0ce64950864e07edaddfa80756fe0ffd5' into chunchun…
Browse files Browse the repository at this point in the history
…/update-df-apr-week-1-2
  • Loading branch information
appletreeisyellow committed Apr 22, 2024
2 parents 59cb935 + 2b0a7db commit d51293b
Show file tree
Hide file tree
Showing 40 changed files with 237 additions and 179 deletions.
1 change: 0 additions & 1 deletion datafusion-cli/Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 0 additions & 1 deletion datafusion-cli/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,6 @@ datafusion = { path = "../datafusion/core", version = "37.0.0", features = [
"unicode_expressions",
"compression",
] }
datafusion-common = { path = "../datafusion/common" }
dirs = "4.0.0"
env_logger = "0.9"
futures = "0.3"
Expand Down
2 changes: 1 addition & 1 deletion datafusion-cli/src/command.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,9 +26,9 @@ use datafusion::arrow::array::{ArrayRef, StringArray};
use datafusion::arrow::datatypes::{DataType, Field, Schema};
use datafusion::arrow::record_batch::RecordBatch;
use datafusion::common::exec_err;
use datafusion::common::instant::Instant;
use datafusion::error::{DataFusionError, Result};
use datafusion::prelude::SessionContext;
use datafusion_common::instant::Instant;
use std::fs::File;
use std::io::BufReader;
use std::str::FromStr;
Expand Down
6 changes: 3 additions & 3 deletions datafusion-cli/src/exec.rs
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ use datafusion::prelude::SessionContext;
use datafusion::sql::parser::{DFParser, Statement};
use datafusion::sql::sqlparser::dialect::dialect_from_str;

use datafusion_common::FileType;
use datafusion::common::FileType;
use rustyline::error::ReadlineError;
use rustyline::Editor;
use tokio::signal;
Expand Down Expand Up @@ -350,8 +350,8 @@ pub(crate) async fn register_object_store_and_config_extensions(
mod tests {
use super::*;

use datafusion_common::config::FormatOptions;
use datafusion_common::plan_err;
use datafusion::common::config::FormatOptions;
use datafusion::common::plan_err;

use url::Url;

Expand Down
9 changes: 4 additions & 5 deletions datafusion-cli/src/helper.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@
use std::borrow::Cow;

use datafusion::common::sql_err;
use datafusion::common::sql_datafusion_err;
use datafusion::error::DataFusionError;
use datafusion::sql::parser::{DFParser, Statement};
use datafusion::sql::sqlparser::dialect::dialect_from_str;
Expand Down Expand Up @@ -189,10 +189,9 @@ pub fn unescape_input(input: &str) -> datafusion::error::Result<String> {
't' => '\t',
'\\' => '\\',
_ => {
return sql_err!(ParserError::TokenizerError(format!(
"unsupported escape char: '\\{}'",
next_char
),))
return Err(sql_datafusion_err!(ParserError::TokenizerError(
format!("unsupported escape char: '\\{}'", next_char)
)))
}
});
}
Expand Down
6 changes: 3 additions & 3 deletions datafusion-cli/src/object_storage.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,13 +19,13 @@ use std::any::Any;
use std::fmt::{Debug, Display};
use std::sync::Arc;

use datafusion::common::config::{
ConfigEntry, ConfigExtension, ConfigField, ExtensionOptions, TableOptions, Visit,
};
use datafusion::common::{exec_datafusion_err, exec_err, internal_err};
use datafusion::error::{DataFusionError, Result};
use datafusion::execution::context::SessionState;
use datafusion::prelude::SessionContext;
use datafusion_common::config::{
ConfigEntry, ConfigExtension, ConfigField, ExtensionOptions, TableOptions, Visit,
};

use async_trait::async_trait;
use aws_credential_types::provider::ProvideCredentials;
Expand Down
2 changes: 1 addition & 1 deletion datafusion-cli/src/print_options.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
// specific language governing permissions and limitations
// under the License.

use datafusion_common::instant::Instant;
use datafusion::common::instant::Instant;
use std::fmt::{Display, Formatter};
use std::io::Write;
use std::pin::Pin;
Expand Down
40 changes: 23 additions & 17 deletions datafusion/common/src/table_reference.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@
use crate::utils::{parse_identifiers_normalized, quote_identifier};
use std::sync::Arc;

/// A resolved path to a table of the form "catalog.schema.table"
/// A fully resolved path to a table of the form "catalog.schema.table"
#[derive(Debug, Clone)]
pub struct ResolvedTableReference {
/// The catalog (aka database) containing the table
Expand All @@ -35,17 +35,20 @@ impl std::fmt::Display for ResolvedTableReference {
}
}

/// [`TableReference`]s represent a multi part identifier (path) to a
/// table that may require further resolution.
/// A multi part identifier (path) to a table that may require further
/// resolution (e.g. `foo.bar`).
///
/// # Creating [`TableReference`]
/// [`TableReference`]s are cheap to `clone()` as they are implemented with
/// `Arc`.
///
/// See [`ResolvedTableReference`] for a fully resolved table reference.
///
/// When converting strings to [`TableReference`]s, the string is
/// parsed as though it were a SQL identifier, normalizing (convert to
/// lowercase) any unquoted identifiers.
/// # Creating [`TableReference`]
///
/// See [`TableReference::bare`] to create references without applying
/// normalization semantics
/// When converting strings to [`TableReference`]s, the string is parsed as
/// though it were a SQL identifier, normalizing (converting to lowercase) any
/// unquoted identifiers. [`TableReference::bare`] creates references without
/// applying normalization semantics.
///
/// # Examples
/// ```
Expand Down Expand Up @@ -116,7 +119,7 @@ impl TableReference {

/// Convenience method for creating a [`TableReference::Bare`]
///
/// As described on [`TableReference`] this does *NO* parsing at
/// As described on [`TableReference`] this does *NO* normalization at
/// all, so "Foo.Bar" stays as a reference to the table named
/// "Foo.Bar" (rather than "foo"."bar")
pub fn bare(table: impl Into<Arc<str>>) -> TableReference {
Expand All @@ -127,7 +130,7 @@ impl TableReference {

/// Convenience method for creating a [`TableReference::Partial`].
///
/// As described on [`TableReference`] this does *NO* parsing at all.
/// Note: *NO* normalization is applied to the schema or table name.
pub fn partial(
schema: impl Into<Arc<str>>,
table: impl Into<Arc<str>>,
Expand All @@ -140,7 +143,8 @@ impl TableReference {

/// Convenience method for creating a [`TableReference::Full`]
///
/// As described on [`TableReference`] this does *NO* parsing at all.
/// Note: *NO* normalization is applied to the catalog, schema or table
/// name.
pub fn full(
catalog: impl Into<Arc<str>>,
schema: impl Into<Arc<str>>,
Expand All @@ -153,7 +157,7 @@ impl TableReference {
}
}

/// Retrieve the actual table name, regardless of qualification
/// Retrieve the table name, regardless of qualification.
pub fn table(&self) -> &str {
match self {
Self::Full { table, .. }
Expand All @@ -162,15 +166,16 @@ impl TableReference {
}
}

/// Retrieve the schema name if in the `Partial` or `Full` qualification
/// Retrieve the schema name if [`Self::Partial`] or [`Self::Full`],
/// `None` otherwise.
pub fn schema(&self) -> Option<&str> {
match self {
Self::Full { schema, .. } | Self::Partial { schema, .. } => Some(schema),
_ => None,
}
}

/// Retrieve the catalog name if in the `Full` qualification
/// Retrieve the catalog name if [`Self::Full`], `None` otherwise.
pub fn catalog(&self) -> Option<&str> {
match self {
Self::Full { catalog, .. } => Some(catalog),
Expand All @@ -179,7 +184,7 @@ impl TableReference {
}

/// Compare with another [`TableReference`] as if both are resolved.
/// This allows comparing across variants, where if a field is not present
/// This allows comparing across variants. If a field is not present
/// in both variants being compared then it is ignored in the comparison.
///
/// e.g. this allows a [`TableReference::Bare`] to be considered equal to a
Expand All @@ -203,7 +208,8 @@ impl TableReference {
}
}

/// Given a default catalog and schema, ensure this table reference is fully resolved
/// Given a default catalog and schema, ensure this table reference is fully
/// resolved
pub fn resolve(
self,
default_catalog: &str,
Expand Down
8 changes: 4 additions & 4 deletions datafusion/core/src/dataframe/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1430,12 +1430,12 @@ impl TableProvider for DataFrameTableProvider {
Some(&self.plan)
}

fn supports_filter_pushdown(
fn supports_filters_pushdown(
&self,
_filter: &Expr,
) -> Result<TableProviderFilterPushDown> {
filters: &[&Expr],
) -> Result<Vec<TableProviderFilterPushDown>> {
// A filter is added on the DataFrame when given
Ok(TableProviderFilterPushDown::Exact)
Ok(vec![TableProviderFilterPushDown::Exact; filters.len()])
}

fn schema(&self) -> SchemaRef {
Expand Down
11 changes: 7 additions & 4 deletions datafusion/core/src/datasource/cte_worktable.rs
Original file line number Diff line number Diff line change
Expand Up @@ -89,11 +89,14 @@ impl TableProvider for CteWorkTable {
)))
}

fn supports_filter_pushdown(
fn supports_filters_pushdown(
&self,
_filter: &Expr,
) -> Result<TableProviderFilterPushDown> {
filters: &[&Expr],
) -> Result<Vec<TableProviderFilterPushDown>> {
// TODO: should we support filter pushdown?
Ok(TableProviderFilterPushDown::Unsupported)
Ok(vec![
TableProviderFilterPushDown::Unsupported;
filters.len()
])
}
}
44 changes: 25 additions & 19 deletions datafusion/core/src/datasource/listing/table.rs
Original file line number Diff line number Diff line change
Expand Up @@ -685,26 +685,32 @@ impl TableProvider for ListingTable {
.await
}

fn supports_filter_pushdown(
fn supports_filters_pushdown(
&self,
filter: &Expr,
) -> Result<TableProviderFilterPushDown> {
if expr_applicable_for_cols(
&self
.options
.table_partition_cols
.iter()
.map(|x| x.0.clone())
.collect::<Vec<_>>(),
filter,
) {
// if filter can be handled by partiton pruning, it is exact
Ok(TableProviderFilterPushDown::Exact)
} else {
// otherwise, we still might be able to handle the filter with file
// level mechanisms such as Parquet row group pruning.
Ok(TableProviderFilterPushDown::Inexact)
}
filters: &[&Expr],
) -> Result<Vec<TableProviderFilterPushDown>> {
let support: Vec<_> = filters
.iter()
.map(|filter| {
if expr_applicable_for_cols(
&self
.options
.table_partition_cols
.iter()
.map(|x| x.0.clone())
.collect::<Vec<_>>(),
filter,
) {
// if filter can be handled by partition pruning, it is exact
TableProviderFilterPushDown::Exact
} else {
// otherwise, we still might be able to handle the filter with file
// level mechanisms such as Parquet row group pruning.
TableProviderFilterPushDown::Inexact
}
})
.collect();
Ok(support)
}

fn get_table_definition(&self) -> Option<&str> {
Expand Down
43 changes: 25 additions & 18 deletions datafusion/core/src/datasource/provider.rs
Original file line number Diff line number Diff line change
Expand Up @@ -96,6 +96,10 @@ pub trait TableProvider: Sync + Send {
/// which *all* of the `Expr`s evaluate to `true` must be returned (aka the
/// expressions are `AND`ed together).
///
/// To enable filter pushdown you must override
/// [`Self::supports_filters_pushdown`]; the default implementation reports
/// every filter as unsupported, so `filters` will be empty.
///
/// DataFusion pushes filtering into the scans whenever possible
/// ("Filter Pushdown"), and depending on the format and the
/// implementation of the format, evaluating the predicate during the scan
Expand Down Expand Up @@ -154,28 +158,31 @@ pub trait TableProvider: Sync + Send {
limit: Option<usize>,
) -> Result<Arc<dyn ExecutionPlan>>;

/// Tests whether the table provider can make use of a filter expression
/// to optimise data retrieval.
#[deprecated(since = "20.0.0", note = "use supports_filters_pushdown instead")]
fn supports_filter_pushdown(
&self,
_filter: &Expr,
) -> Result<TableProviderFilterPushDown> {
Ok(TableProviderFilterPushDown::Unsupported)
}

/// Tests whether the table provider can make use of any or all filter expressions
/// to optimise data retrieval.
/// Note: the returned vector much have the same size as the filters argument.
#[allow(deprecated)]
/// Specify if DataFusion should provide filter expressions to the
/// TableProvider to apply *during* the scan.
///
/// The return value must have one element for each filter expression passed
/// in. The value of each element indicates if the TableProvider can apply
/// that particular filter during the scan.
///
/// Some TableProviders can evaluate filters more efficiently than the
/// `Filter` operator in DataFusion, for example by using an index.
///
/// By default, returns [`Unsupported`] for all filters, meaning no filters
/// will be provided to [`Self::scan`]. If the TableProvider can implement
/// filter pushdown, it should return either [`Exact`] or [`Inexact`].
///
/// [`Unsupported`]: TableProviderFilterPushDown::Unsupported
/// [`Exact`]: TableProviderFilterPushDown::Exact
/// [`Inexact`]: TableProviderFilterPushDown::Inexact
fn supports_filters_pushdown(
&self,
filters: &[&Expr],
) -> Result<Vec<TableProviderFilterPushDown>> {
filters
.iter()
.map(|f| self.supports_filter_pushdown(f))
.collect()
Ok(vec![
TableProviderFilterPushDown::Unsupported;
filters.len()
])
}

/// Get statistics for this table, if available
Expand Down
9 changes: 4 additions & 5 deletions datafusion/core/src/datasource/view.rs
Original file line number Diff line number Diff line change
Expand Up @@ -93,13 +93,12 @@ impl TableProvider for ViewTable {
fn get_table_definition(&self) -> Option<&str> {
self.definition.as_deref()
}

fn supports_filter_pushdown(
fn supports_filters_pushdown(
&self,
_filter: &Expr,
) -> Result<TableProviderFilterPushDown> {
filters: &[&Expr],
) -> Result<Vec<TableProviderFilterPushDown>> {
// A filter is added on the View when given
Ok(TableProviderFilterPushDown::Exact)
Ok(vec![TableProviderFilterPushDown::Exact; filters.len()])
}

async fn scan(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -214,8 +214,11 @@ impl TableProvider for CustomProvider {
}
}

fn supports_filter_pushdown(&self, _: &Expr) -> Result<TableProviderFilterPushDown> {
Ok(TableProviderFilterPushDown::Exact)
fn supports_filters_pushdown(
&self,
filters: &[&Expr],
) -> Result<Vec<TableProviderFilterPushDown>> {
Ok(vec![TableProviderFilterPushDown::Exact; filters.len()])
}
}

Expand Down
Loading

0 comments on commit d51293b

Please sign in to comment.