Skip to content

Commit

Permalink
Minor: use ready! macro to simplify FilterExec poll loop (apache#11649
Browse files Browse the repository at this point in the history
)
  • Loading branch information
alamb authored Jul 25, 2024
1 parent 7db4213 commit 71903e1
Showing 1 changed file with 15 additions and 20 deletions.
35 changes: 15 additions & 20 deletions datafusion/physical-plan/src/filter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@
use std::any::Any;
use std::pin::Pin;
use std::sync::Arc;
use std::task::{Context, Poll};
use std::task::{ready, Context, Poll};

use super::{
ColumnStatistics, DisplayAs, ExecutionPlanProperties, PlanProperties,
Expand Down Expand Up @@ -59,6 +59,7 @@ pub struct FilterExec {
metrics: ExecutionPlanMetricsSet,
/// Selectivity for statistics. 0 = no rows, 100 = all rows
default_selectivity: u8,
/// Properties equivalence properties, partitioning, etc.
cache: PlanProperties,
}

Expand Down Expand Up @@ -375,26 +376,20 @@ impl Stream for FilterExecStream {
) -> Poll<Option<Self::Item>> {
let poll;
loop {
match self.input.poll_next_unpin(cx) {
Poll::Ready(value) => match value {
Some(Ok(batch)) => {
let timer = self.baseline_metrics.elapsed_compute().timer();
let filtered_batch = batch_filter(&batch, &self.predicate)?;
// skip entirely filtered batches
if filtered_batch.num_rows() == 0 {
continue;
}
timer.done();
poll = Poll::Ready(Some(Ok(filtered_batch)));
break;
match ready!(self.input.poll_next_unpin(cx)) {
Some(Ok(batch)) => {
let timer = self.baseline_metrics.elapsed_compute().timer();
let filtered_batch = batch_filter(&batch, &self.predicate)?;
// skip entirely filtered batches
if filtered_batch.num_rows() == 0 {
continue;
}
_ => {
poll = Poll::Ready(value);
break;
}
},
Poll::Pending => {
poll = Poll::Pending;
timer.done();
poll = Poll::Ready(Some(Ok(filtered_batch)));
break;
}
value => {
poll = Poll::Ready(value);
break;
}
}
Expand Down

0 comments on commit 71903e1

Please sign in to comment.