Skip to content

Commit

Permalink
Add schema_err! error macros with optional backtrace (apache#8620)
Browse files Browse the repository at this point in the history
* Add `schema_err!` error macros with optional backtrace
  • Loading branch information
comphead authored Jan 6, 2024
1 parent 4289737 commit dd4263f
Show file tree
Hide file tree
Showing 9 changed files with 119 additions and 91 deletions.
6 changes: 3 additions & 3 deletions .github/workflows/rust.yml
Original file line number Diff line number Diff line change
Expand Up @@ -106,7 +106,7 @@ jobs:
RUSTFLAGS: "-C debuginfo=0 -C opt-level=0 -C incremental=false -C codegen-units=256"
RUST_BACKTRACE: "1"
# avoid rust stack overflows on tpc-ds tests
RUST_MINSTACK: "3000000"
RUST_MIN_STACK: "3000000"
- name: Verify Working Directory Clean
run: git diff --exit-code

Expand Down Expand Up @@ -316,7 +316,7 @@ jobs:
RUSTFLAGS: "-C debuginfo=line-tables-only"
RUST_BACKTRACE: "1"
# avoid rust stack overflows on tpc-ds tests
RUST_MINSTACK: "3000000"
RUST_MIN_STACK: "3000000"
macos:
name: cargo test (mac)
runs-on: macos-latest
Expand Down Expand Up @@ -356,7 +356,7 @@ jobs:
RUSTFLAGS: "-C debuginfo=0 -C opt-level=0 -C incremental=false -C codegen-units=256"
RUST_BACKTRACE: "1"
# avoid rust stack overflows on tpc-ds tests
RUST_MINSTACK: "3000000"
RUST_MIN_STACK: "3000000"

test-datafusion-pyarrow:
name: cargo test pyarrow (amd64)
Expand Down
17 changes: 8 additions & 9 deletions datafusion/common/src/column.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@

//! Column
use crate::error::_schema_err;
use crate::utils::{parse_identifiers_normalized, quote_identifier};
use crate::{DFSchema, DataFusionError, OwnedTableReference, Result, SchemaError};
use std::collections::HashSet;
Expand Down Expand Up @@ -211,13 +212,13 @@ impl Column {
}
}

Err(DataFusionError::SchemaError(SchemaError::FieldNotFound {
_schema_err!(SchemaError::FieldNotFound {
field: Box::new(Column::new(self.relation.clone(), self.name)),
valid_fields: schemas
.iter()
.flat_map(|s| s.fields().iter().map(|f| f.qualified_column()))
.collect(),
}))
})
}

/// Qualify column if not done yet.
Expand Down Expand Up @@ -299,23 +300,21 @@ impl Column {
}

// If not due to USING columns then due to ambiguous column name
return Err(DataFusionError::SchemaError(
SchemaError::AmbiguousReference {
field: Column::new_unqualified(self.name),
},
));
return _schema_err!(SchemaError::AmbiguousReference {
field: Column::new_unqualified(self.name),
});
}
}
}

Err(DataFusionError::SchemaError(SchemaError::FieldNotFound {
_schema_err!(SchemaError::FieldNotFound {
field: Box::new(self),
valid_fields: schemas
.iter()
.flat_map(|s| s.iter())
.flat_map(|s| s.fields().iter().map(|f| f.qualified_column()))
.collect(),
}))
})
}
}

Expand Down
35 changes: 15 additions & 20 deletions datafusion/common/src/dfschema.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ use std::sync::Arc;

use crate::error::{
unqualified_field_not_found, DataFusionError, Result, SchemaError, _plan_err,
_schema_err,
};
use crate::{
field_not_found, Column, FunctionalDependencies, OwnedTableReference, TableReference,
Expand Down Expand Up @@ -141,11 +142,9 @@ impl DFSchema {
if let Some(qualifier) = field.qualifier() {
qualified_names.insert((qualifier, field.name()));
} else if !unqualified_names.insert(field.name()) {
return Err(DataFusionError::SchemaError(
SchemaError::DuplicateUnqualifiedField {
name: field.name().to_string(),
},
));
return _schema_err!(SchemaError::DuplicateUnqualifiedField {
name: field.name().to_string(),
});
}
}

Expand All @@ -159,14 +158,12 @@ impl DFSchema {
qualified_names.sort();
for (qualifier, name) in &qualified_names {
if unqualified_names.contains(name) {
return Err(DataFusionError::SchemaError(
SchemaError::AmbiguousReference {
field: Column {
relation: Some((*qualifier).clone()),
name: name.to_string(),
},
},
));
return _schema_err!(SchemaError::AmbiguousReference {
field: Column {
relation: Some((*qualifier).clone()),
name: name.to_string(),
}
});
}
}
Ok(Self {
Expand Down Expand Up @@ -392,14 +389,12 @@ impl DFSchema {
if fields_without_qualifier.len() == 1 {
Ok(fields_without_qualifier[0])
} else {
Err(DataFusionError::SchemaError(
SchemaError::AmbiguousReference {
field: Column {
relation: None,
name: name.to_string(),
},
_schema_err!(SchemaError::AmbiguousReference {
field: Column {
relation: None,
name: name.to_string(),
},
))
})
}
}
}
Expand Down
93 changes: 60 additions & 33 deletions datafusion/common/src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,9 @@ pub enum DataFusionError {
Configuration(String),
/// This error happens with schema-related errors, such as schema inference not possible
/// and non-unique column names.
SchemaError(SchemaError),
/// 2nd argument is for optional backtrace
/// Boxing the optional backtrace to prevent <https://rust-lang.github.io/rust-clippy/master/index.html#/result_large_err>
SchemaError(SchemaError, Box<Option<String>>),
/// Error returned during execution of the query.
/// Examples include files not found, errors in parsing certain types.
Execution(String),
Expand Down Expand Up @@ -125,34 +127,6 @@ pub enum SchemaError {
},
}

/// Create a "field not found" DataFusion::SchemaError
pub fn field_not_found<R: Into<OwnedTableReference>>(
qualifier: Option<R>,
name: &str,
schema: &DFSchema,
) -> DataFusionError {
DataFusionError::SchemaError(SchemaError::FieldNotFound {
field: Box::new(Column::new(qualifier, name)),
valid_fields: schema
.fields()
.iter()
.map(|f| f.qualified_column())
.collect(),
})
}

/// Convenience wrapper over [`field_not_found`] for when there is no qualifier
pub fn unqualified_field_not_found(name: &str, schema: &DFSchema) -> DataFusionError {
DataFusionError::SchemaError(SchemaError::FieldNotFound {
field: Box::new(Column::new_unqualified(name)),
valid_fields: schema
.fields()
.iter()
.map(|f| f.qualified_column())
.collect(),
})
}

impl Display for SchemaError {
fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
match self {
Expand Down Expand Up @@ -298,7 +272,7 @@ impl Display for DataFusionError {
write!(f, "IO error: {desc}")
}
DataFusionError::SQL(ref desc, ref backtrace) => {
let backtrace = backtrace.clone().unwrap_or("".to_owned());
let backtrace: String = backtrace.clone().unwrap_or("".to_owned());
write!(f, "SQL error: {desc:?}{backtrace}")
}
DataFusionError::Configuration(ref desc) => {
Expand All @@ -314,8 +288,10 @@ impl Display for DataFusionError {
DataFusionError::Plan(ref desc) => {
write!(f, "Error during planning: {desc}")
}
DataFusionError::SchemaError(ref desc) => {
write!(f, "Schema error: {desc}")
DataFusionError::SchemaError(ref desc, ref backtrace) => {
let backtrace: &str =
&backtrace.as_ref().clone().unwrap_or("".to_owned());
write!(f, "Schema error: {desc}{backtrace}")
}
DataFusionError::Execution(ref desc) => {
write!(f, "Execution error: {desc}")
Expand Down Expand Up @@ -356,7 +332,7 @@ impl Error for DataFusionError {
DataFusionError::Internal(_) => None,
DataFusionError::Configuration(_) => None,
DataFusionError::Plan(_) => None,
DataFusionError::SchemaError(e) => Some(e),
DataFusionError::SchemaError(e, _) => Some(e),
DataFusionError::Execution(_) => None,
DataFusionError::ResourcesExhausted(_) => None,
DataFusionError::External(e) => Some(e.as_ref()),
Expand Down Expand Up @@ -556,12 +532,63 @@ macro_rules! arrow_err {
};
}

// Exposes a macro to create `DataFusionError::SchemaError` with optional backtrace
#[macro_export]
macro_rules! schema_datafusion_err {
($ERR:expr) => {
DataFusionError::SchemaError(
$ERR,
Box::new(Some(DataFusionError::get_back_trace())),
)
};
}

// Exposes a macro to create `Err(DataFusionError::SchemaError)` with optional backtrace
#[macro_export]
macro_rules! schema_err {
($ERR:expr) => {
Err(DataFusionError::SchemaError(
$ERR,
Box::new(Some(DataFusionError::get_back_trace())),
))
};
}

// To avoid compiler error when using macro in the same crate:
// macros from the current crate cannot be referred to by absolute paths
pub use internal_datafusion_err as _internal_datafusion_err;
pub use internal_err as _internal_err;
pub use not_impl_err as _not_impl_err;
pub use plan_err as _plan_err;
pub use schema_err as _schema_err;

/// Create a "field not found" DataFusion::SchemaError
pub fn field_not_found<R: Into<OwnedTableReference>>(
qualifier: Option<R>,
name: &str,
schema: &DFSchema,
) -> DataFusionError {
schema_datafusion_err!(SchemaError::FieldNotFound {
field: Box::new(Column::new(qualifier, name)),
valid_fields: schema
.fields()
.iter()
.map(|f| f.qualified_column())
.collect(),
})
}

/// Convenience wrapper over [`field_not_found`] for when there is no qualifier
pub fn unqualified_field_not_found(name: &str, schema: &DFSchema) -> DataFusionError {
schema_datafusion_err!(SchemaError::FieldNotFound {
field: Box::new(Column::new_unqualified(name)),
valid_fields: schema
.fields()
.iter()
.map(|f| f.qualified_column())
.collect(),
})
}

#[cfg(test)]
mod test {
Expand Down
2 changes: 1 addition & 1 deletion datafusion/core/src/dataframe/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1186,7 +1186,7 @@ impl DataFrame {
let field_to_rename = match self.plan.schema().field_from_column(&old_column) {
Ok(field) => field,
// no-op if field not found
Err(DataFusionError::SchemaError(SchemaError::FieldNotFound { .. })) => {
Err(DataFusionError::SchemaError(SchemaError::FieldNotFound { .. }, _)) => {
return Ok(self)
}
Err(err) => return Err(err),
Expand Down
34 changes: 20 additions & 14 deletions datafusion/expr/src/logical_plan/builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1845,13 +1845,16 @@ mod tests {
.project(vec![col("id"), col("first_name").alias("id")]);

match plan {
Err(DataFusionError::SchemaError(SchemaError::AmbiguousReference {
field:
Column {
relation: Some(OwnedTableReference::Bare { table }),
name,
},
})) => {
Err(DataFusionError::SchemaError(
SchemaError::AmbiguousReference {
field:
Column {
relation: Some(OwnedTableReference::Bare { table }),
name,
},
},
_,
)) => {
assert_eq!("employee_csv", table);
assert_eq!("id", &name);
Ok(())
Expand All @@ -1872,13 +1875,16 @@ mod tests {
.aggregate(vec![col("state")], vec![sum(col("salary")).alias("state")]);

match plan {
Err(DataFusionError::SchemaError(SchemaError::AmbiguousReference {
field:
Column {
relation: Some(OwnedTableReference::Bare { table }),
name,
},
})) => {
Err(DataFusionError::SchemaError(
SchemaError::AmbiguousReference {
field:
Column {
relation: Some(OwnedTableReference::Bare { table }),
name,
},
},
_,
)) => {
assert_eq!("employee_csv", table);
assert_eq!("state", &name);
Ok(())
Expand Down
2 changes: 1 addition & 1 deletion datafusion/sql/src/planner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -250,7 +250,7 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> {
// Default expressions are restricted, column references are not allowed
let empty_schema = DFSchema::empty();
let error_desc = |e: DataFusionError| match e {
DataFusionError::SchemaError(SchemaError::FieldNotFound { .. }) => {
DataFusionError::SchemaError(SchemaError::FieldNotFound { .. }, _) => {
plan_datafusion_err!(
"Column reference is not allowed in the DEFAULT expression : {}",
e
Expand Down
15 changes: 7 additions & 8 deletions datafusion/sql/src/statement.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,9 +31,10 @@ use arrow_schema::DataType;
use datafusion_common::file_options::StatementOptions;
use datafusion_common::parsers::CompressionTypeVariant;
use datafusion_common::{
not_impl_err, plan_datafusion_err, plan_err, unqualified_field_not_found, Column,
Constraints, DFField, DFSchema, DFSchemaRef, DataFusionError, OwnedTableReference,
Result, ScalarValue, SchemaReference, TableReference, ToDFSchema,
not_impl_err, plan_datafusion_err, plan_err, schema_err, unqualified_field_not_found,
Column, Constraints, DFField, DFSchema, DFSchemaRef, DataFusionError,
OwnedTableReference, Result, ScalarValue, SchemaError, SchemaReference,
TableReference, ToDFSchema,
};
use datafusion_expr::dml::{CopyOptions, CopyTo};
use datafusion_expr::expr_rewriter::normalize_col_with_schemas_and_ambiguity_check;
Expand Down Expand Up @@ -1138,11 +1139,9 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> {
.index_of_column_by_name(None, &c)?
.ok_or_else(|| unqualified_field_not_found(&c, &table_schema))?;
if value_indices[column_index].is_some() {
return Err(DataFusionError::SchemaError(
datafusion_common::SchemaError::DuplicateUnqualifiedField {
name: c,
},
));
return schema_err!(SchemaError::DuplicateUnqualifiedField {
name: c,
});
} else {
value_indices[column_index] = Some(i);
}
Expand Down
6 changes: 4 additions & 2 deletions datafusion/sql/tests/sql_integration.rs
Original file line number Diff line number Diff line change
Expand Up @@ -756,9 +756,11 @@ fn join_with_ambiguous_column() {
#[test]
fn where_selection_with_ambiguous_column() {
let sql = "SELECT * FROM person a, person b WHERE id = id + 1";
let err = logical_plan(sql).expect_err("query should have failed");
let err = logical_plan(sql)
.expect_err("query should have failed")
.strip_backtrace();
assert_eq!(
"SchemaError(AmbiguousReference { field: Column { relation: None, name: \"id\" } })",
"\"Schema error: Ambiguous reference to unqualified field id\"",
format!("{err:?}")
);
}
Expand Down

0 comments on commit dd4263f

Please sign in to comment.