Skip to content

Commit

Permalink
refactor: remove unused code for pruning row groups
Browse files Browse the repository at this point in the history
Signed-off-by: Zhenchi <[email protected]>
  • Loading branch information
zhongzc committed Dec 21, 2023
1 parent bad8918 commit dd0377b
Showing 1 changed file with 11 additions and 88 deletions.
99 changes: 11 additions & 88 deletions src/table/src/predicate.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,27 +15,23 @@
use std::sync::Arc;

use common_query::logical_plan::{DfExpr, Expr};
use common_telemetry::{debug, error, warn};
use common_telemetry::{error, warn};
use common_time::range::TimestampRange;
use common_time::timestamp::TimeUnit;
use common_time::Timestamp;
use datafusion::arrow::record_batch::RecordBatch;
use datafusion::parquet::file::metadata::RowGroupMetaData;
use datafusion::physical_optimizer::pruning::{PruningPredicate, PruningStatistics};
use datafusion_common::{ScalarValue, ToDFSchema};
use datafusion_common::ToDFSchema;
use datafusion_expr::expr::InList;
use datafusion_expr::{Between, BinaryExpr, ColumnarValue, Operator};
use datafusion_expr::{Between, BinaryExpr, Operator};
use datafusion_physical_expr::execution_props::ExecutionProps;
use datafusion_physical_expr::{create_physical_expr, PhysicalExpr};
use datatypes::arrow;
use datatypes::arrow::array::BooleanArray;
use datatypes::schema::SchemaRef;
use datatypes::value::scalar_value_to_timestamp;
use snafu::ResultExt;

use crate::error;
use crate::predicate::stats::RowGroupPruningStatistics;

#[cfg(test)]
mod stats;

#[derive(Debug, Clone)]
Expand Down Expand Up @@ -77,83 +73,6 @@ impl Predicate {
.collect::<Vec<_>>())
}

/// Builds an empty predicate from given schema.
pub fn empty() -> Self {
Self { exprs: vec![] }
}

/// Evaluates the predicate against row group metadata.
/// Returns a vector of boolean values, among which `false` means the row group can be skipped.
pub fn prune_row_groups(
&self,
row_groups: &[RowGroupMetaData],
schema: SchemaRef,
) -> Vec<bool> {
let mut res = vec![true; row_groups.len()];

let Ok(physical_exprs) = self.to_physical_exprs(schema.arrow_schema()) else {
return res;
};

let arrow_schema = schema.arrow_schema();
for expr in &physical_exprs {
match PruningPredicate::try_new(expr.clone(), arrow_schema.clone()) {
Ok(p) => {
let stat = RowGroupPruningStatistics::new(row_groups, &schema);
match p.prune(&stat) {
Ok(r) => {
for (curr_val, res) in r.into_iter().zip(res.iter_mut()) {
*res &= curr_val
}
}
Err(e) => {
warn!("Failed to prune row groups, error: {:?}", e);
}
}
}
Err(e) => {
error!("Failed to create predicate for expr, error: {:?}", e);
}
}
}
res
}

/// Prunes primary keys
pub fn prune_primary_key(&self, primary_key: &RecordBatch) -> error::Result<bool> {
let pk_schema = primary_key.schema();
let physical_exprs = self.to_physical_exprs(&pk_schema)?;
for expr in &physical_exprs {
// evaluate every filter against primary key
let Ok(eva) = expr.evaluate(primary_key) else {
continue;
};
let result = match eva {
ColumnarValue::Array(array) => {
let predicate_array = array.as_any().downcast_ref::<BooleanArray>().unwrap();
predicate_array
.into_iter()
.map(|x| x.unwrap_or(true))
.next()
.unwrap_or(true)
}
// result was a column
ColumnarValue::Scalar(ScalarValue::Boolean(v)) => v.unwrap_or(true),
_ => {
unreachable!("Unexpected primary key record batch evaluation result: {:?}, primary key: {:?}", eva, primary_key);
}
};
debug!(
"Evaluate primary key {:?} against filter: {:?}, result: {:?}",
primary_key, expr, result
);
if !result {
return Ok(false);
}
}
Ok(true)
}

/// Evaluates the predicate against the `stats`.
/// Returns a vector of boolean values, among which `false` means the row group can be skipped.
pub fn prune_with_stats<S: PruningStatistics>(
Expand Down Expand Up @@ -443,6 +362,7 @@ mod tests {
use parquet::file::properties::WriterProperties;

use super::*;
use crate::predicate::stats::RowGroupPruningStatistics;

fn check_build_predicate(expr: DfExpr, expect: TimestampRange) {
assert_eq!(
Expand Down Expand Up @@ -568,6 +488,7 @@ mod tests {
TimestampRange::until_end(Timestamp::new_millisecond(1000), false),
);
}

#[test]
fn test_lt_eq() {
// ts <= 1ms
Expand Down Expand Up @@ -651,8 +572,8 @@ mod tests {
expect: Vec<bool>,
) {
let dir = create_temp_dir("prune_parquet");
let (path, schema) = gen_test_parquet_file(&dir, array_cnt).await;
let schema = Arc::new(datatypes::schema::Schema::try_from(schema).unwrap());
let (path, arrow_schema) = gen_test_parquet_file(&dir, array_cnt).await;
let schema = Arc::new(datatypes::schema::Schema::try_from(arrow_schema.clone()).unwrap());
let arrow_predicate = Predicate::new(filters);
let builder = ParquetRecordBatchStreamBuilder::new(
tokio::fs::OpenOptions::new()
Expand All @@ -665,7 +586,9 @@ mod tests {
.unwrap();
let metadata = builder.metadata().clone();
let row_groups = metadata.row_groups();
let res = arrow_predicate.prune_row_groups(row_groups, schema);

let stats = RowGroupPruningStatistics::new(row_groups, &schema);
let res = arrow_predicate.prune_with_stats(&stats, &arrow_schema);
assert_eq!(expect, res);
}

Expand Down

0 comments on commit dd0377b

Please sign in to comment.