From 71903e1b2c62cda9a92808a71f8b63bcdd43762d Mon Sep 17 00:00:00 2001 From: Andrew Lamb Date: Thu, 25 Jul 2024 14:04:33 -0400 Subject: [PATCH] Minor: use `ready!` macro to simplify FilterExec poll loop (#11649) --- datafusion/physical-plan/src/filter.rs | 35 +++++++++++--------------- 1 file changed, 15 insertions(+), 20 deletions(-) diff --git a/datafusion/physical-plan/src/filter.rs b/datafusion/physical-plan/src/filter.rs index a9d78d059f5c..67de0989649e 100644 --- a/datafusion/physical-plan/src/filter.rs +++ b/datafusion/physical-plan/src/filter.rs @@ -18,7 +18,7 @@ use std::any::Any; use std::pin::Pin; use std::sync::Arc; -use std::task::{Context, Poll}; +use std::task::{ready, Context, Poll}; use super::{ ColumnStatistics, DisplayAs, ExecutionPlanProperties, PlanProperties, @@ -59,6 +59,7 @@ pub struct FilterExec { metrics: ExecutionPlanMetricsSet, /// Selectivity for statistics. 0 = no rows, 100 = all rows default_selectivity: u8, + /// Properties equivalence properties, partitioning, etc. cache: PlanProperties, } @@ -375,26 +376,20 @@ impl Stream for FilterExecStream { ) -> Poll> { let poll; loop { - match self.input.poll_next_unpin(cx) { - Poll::Ready(value) => match value { - Some(Ok(batch)) => { - let timer = self.baseline_metrics.elapsed_compute().timer(); - let filtered_batch = batch_filter(&batch, &self.predicate)?; - // skip entirely filtered batches - if filtered_batch.num_rows() == 0 { - continue; - } - timer.done(); - poll = Poll::Ready(Some(Ok(filtered_batch))); - break; + match ready!(self.input.poll_next_unpin(cx)) { + Some(Ok(batch)) => { + let timer = self.baseline_metrics.elapsed_compute().timer(); + let filtered_batch = batch_filter(&batch, &self.predicate)?; + // skip entirely filtered batches + if filtered_batch.num_rows() == 0 { + continue; } - _ => { - poll = Poll::Ready(value); - break; - } - }, - Poll::Pending => { - poll = Poll::Pending; + timer.done(); + poll = Poll::Ready(Some(Ok(filtered_batch))); + break; + } + value => { + poll = Poll::Ready(value); break; } }