From b13734697f9e3dafa5e4b2dd9d0b6508ee4e6375 Mon Sep 17 00:00:00 2001 From: jakevin Date: Thu, 25 Apr 2024 18:07:14 +0800 Subject: [PATCH 1/7] doc: fix subscribe mail link (#10225) --- docs/source/contributor-guide/communication.md | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/docs/source/contributor-guide/communication.md b/docs/source/contributor-guide/communication.md index 96f5e61d105e..3e5e816d2f90 100644 --- a/docs/source/contributor-guide/communication.md +++ b/docs/source/contributor-guide/communication.md @@ -45,8 +45,8 @@ request one in the `Arrow Rust` channel of the [Arrow Rust Discord server](https We also use arrow.apache.org's `dev@` mailing list for release coordination and occasional design discussions. Other than the release process, most DataFusion mailing list traffic will link to a GitHub issue or PR for discussion. -([subscribe](mailto:dev-subscribe@arrow.apache.org), -[unsubscribe](mailto:dev-unsubscribe@arrow.apache.org), +([subscribe](mailto:dev-subscribe@datafusion.apache.org), +[unsubscribe](mailto:dev-unsubscribe@datafusion.apache.org), [archives](https://lists.apache.org/list.html?dev@arrow.apache.org)). When emailing the dev list, please make sure to prefix the subject line with a From b9f17b030d9fe69b3ef9ef86a7b9706449ab9399 Mon Sep 17 00:00:00 2001 From: comphead Date: Thu, 25 Apr 2024 03:33:19 -0700 Subject: [PATCH 2/7] Minor: Prevent empty datafusion-cli commands (#10219) * Prevent empty datafusion-cli commands * err msg * clippy --- datafusion-cli/src/main.rs | 11 ++++++++++- 1 file changed, 10 insertions(+), 1 deletion(-) diff --git a/datafusion-cli/src/main.rs b/datafusion-cli/src/main.rs index 38537dbd9238..6f71ccafb729 100644 --- a/datafusion-cli/src/main.rs +++ b/datafusion-cli/src/main.rs @@ -83,7 +83,8 @@ struct Args { short = 'c', long, multiple_values = true, - help = "Execute the given command string(s), then exit" + help = "Execute the given command string(s), then exit. Commands are expected to be non empty.", + validator(is_valid_command) )] command: Vec, @@ -285,6 +286,14 @@ fn is_valid_memory_pool_size(size: &str) -> Result<(), String> { } } +fn is_valid_command(command: &str) -> Result<(), String> { + if !command.is_empty() { + Ok(()) + } else { + Err("-c flag expects only non empty commands".to_string()) + } +} + #[derive(Debug, Clone, Copy)] enum ByteUnit { Byte, From 169701e0128911f16ed07e1ea714a4fdc1e90ee0 Mon Sep 17 00:00:00 2001 From: Simon Vandel Sillesen Date: Thu, 25 Apr 2024 11:57:06 +0000 Subject: [PATCH 3/7] Optimize date_bin (2x faster) (#10215) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit * add date_bin benchmark * optimize date_bin As mentioned in the docs for `PrimaryArray::unary` it is faster to apply an infallible operation across both valid and invalid values, rather than branching at every value. 
1) Make stride function infallible 2) Use `unary` method This gives this speedup on my machine: Before: 22.345 µs After: 10.558 µs So around 2x faster --- datafusion/functions/Cargo.toml | 5 ++ datafusion/functions/benches/date_bin.rs | 57 +++++++++++++++++++ datafusion/functions/src/datetime/date_bin.rs | 19 +++---- 3 files changed, 71 insertions(+), 10 deletions(-) create mode 100644 datafusion/functions/benches/date_bin.rs diff --git a/datafusion/functions/Cargo.toml b/datafusion/functions/Cargo.toml index 577ecdb7461d..0886dee03479 100644 --- a/datafusion/functions/Cargo.toml +++ b/datafusion/functions/Cargo.toml @@ -112,6 +112,11 @@ harness = false name = "make_date" required-features = ["datetime_expressions"] +[[bench]] +harness = false +name = "date_bin" +required-features = ["datetime_expressions"] + [[bench]] harness = false name = "to_char" diff --git a/datafusion/functions/benches/date_bin.rs b/datafusion/functions/benches/date_bin.rs new file mode 100644 index 000000000000..c881947354fd --- /dev/null +++ b/datafusion/functions/benches/date_bin.rs @@ -0,0 +1,57 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+ +extern crate criterion; + +use std::sync::Arc; + +use arrow::array::{ArrayRef, TimestampSecondArray}; +use criterion::{black_box, criterion_group, criterion_main, Criterion}; +use datafusion_common::ScalarValue; +use rand::rngs::ThreadRng; +use rand::Rng; + +use datafusion_expr::ColumnarValue; +use datafusion_functions::datetime::date_bin; + +fn timestamps(rng: &mut ThreadRng) -> TimestampSecondArray { + let mut seconds = vec![]; + for _ in 0..1000 { + seconds.push(rng.gen_range(0..1_000_000)); + } + + TimestampSecondArray::from(seconds) +} + +fn criterion_benchmark(c: &mut Criterion) { + c.bench_function("date_bin_1000", |b| { + let mut rng = rand::thread_rng(); + let interval = ColumnarValue::Scalar(ScalarValue::new_interval_dt(0, 1_000_000)); + let timestamps = ColumnarValue::Array(Arc::new(timestamps(&mut rng)) as ArrayRef); + let udf = date_bin(); + + b.iter(|| { + black_box( + udf.invoke(&[interval.clone(), timestamps.clone()]) + .expect("date_bin should work on valid values"), + ) + }) + }); +} + +criterion_group!(benches, criterion_benchmark); +criterion_main!(benches); diff --git a/datafusion/functions/src/datetime/date_bin.rs b/datafusion/functions/src/datetime/date_bin.rs index 7f5d9bb5d921..da1797cdae81 100644 --- a/datafusion/functions/src/datetime/date_bin.rs +++ b/datafusion/functions/src/datetime/date_bin.rs @@ -320,14 +320,14 @@ fn date_bin_impl( origin: i64, stride: i64, stride_fn: fn(i64, i64, i64) -> i64, - ) -> impl Fn(Option) -> Option { + ) -> impl Fn(i64) -> i64 { let scale = match T::UNIT { Nanosecond => 1, Microsecond => NANOSECONDS / 1_000_000, Millisecond => NANOSECONDS / 1_000, Second => NANOSECONDS, }; - move |x: Option| x.map(|x| stride_fn(stride, x * scale, origin) / scale) + move |x: i64| stride_fn(stride, x * scale, origin) / scale } Ok(match array { @@ -335,7 +335,7 @@ fn date_bin_impl( let apply_stride_fn = stride_map_fn::(origin, stride, stride_fn); ColumnarValue::Scalar(ScalarValue::TimestampNanosecond( - apply_stride_fn(*v), + v.map(apply_stride_fn), tz_opt.clone(), )) } @@ -343,7 +343,7 @@ fn date_bin_impl( let apply_stride_fn = stride_map_fn::(origin, stride, stride_fn); ColumnarValue::Scalar(ScalarValue::TimestampMicrosecond( - apply_stride_fn(*v), + v.map(apply_stride_fn), tz_opt.clone(), )) } @@ -351,7 +351,7 @@ fn date_bin_impl( let apply_stride_fn = stride_map_fn::(origin, stride, stride_fn); ColumnarValue::Scalar(ScalarValue::TimestampMillisecond( - apply_stride_fn(*v), + v.map(apply_stride_fn), tz_opt.clone(), )) } @@ -359,7 +359,7 @@ fn date_bin_impl( let apply_stride_fn = stride_map_fn::(origin, stride, stride_fn); ColumnarValue::Scalar(ScalarValue::TimestampSecond( - apply_stride_fn(*v), + v.map(apply_stride_fn), tz_opt.clone(), )) } @@ -377,14 +377,13 @@ fn date_bin_impl( { let array = as_primitive_array::(array)?; let apply_stride_fn = stride_map_fn::(origin, stride, stride_fn); - let array = array - .iter() - .map(apply_stride_fn) - .collect::>() + let array: PrimitiveArray = array + .unary(apply_stride_fn) .with_timezone_opt(tz_opt.clone()); Ok(ColumnarValue::Array(Arc::new(array))) } + match array.data_type() { Timestamp(Nanosecond, tz_opt) => { transform_array_with_stride::( From 7dae9ab2dd63ba16575bd8137cb0e5897a2f079a Mon Sep 17 00:00:00 2001 From: Vrishabh Date: Thu, 25 Apr 2024 18:01:33 +0530 Subject: [PATCH 4/7] Refactor sessionconfig set fns to avoid an unnecessary enum to string conversion (#10141) * Refactor sessionconfig set functions to avoid an uncessary enum to string conversion * Call set_str directly on usize 
--------- Co-authored-by: Andrew Lamb --- datafusion/execution/src/config.rs | 17 ++++++++--------- 1 file changed, 8 insertions(+), 9 deletions(-) diff --git a/datafusion/execution/src/config.rs b/datafusion/execution/src/config.rs index e29030e61457..28275d484e29 100644 --- a/datafusion/execution/src/config.rs +++ b/datafusion/execution/src/config.rs @@ -157,30 +157,29 @@ impl SessionConfig { } /// Set a configuration option - pub fn set(mut self, key: &str, value: ScalarValue) -> Self { - self.options.set(key, &value.to_string()).unwrap(); - self + pub fn set(self, key: &str, value: ScalarValue) -> Self { + self.set_str(key, &value.to_string()) } /// Set a boolean configuration option pub fn set_bool(self, key: &str, value: bool) -> Self { - self.set(key, ScalarValue::Boolean(Some(value))) + self.set_str(key, &value.to_string()) } /// Set a generic `u64` configuration option pub fn set_u64(self, key: &str, value: u64) -> Self { - self.set(key, ScalarValue::UInt64(Some(value))) + self.set_str(key, &value.to_string()) } /// Set a generic `usize` configuration option pub fn set_usize(self, key: &str, value: usize) -> Self { - let value: u64 = value.try_into().expect("convert usize to u64"); - self.set(key, ScalarValue::UInt64(Some(value))) + self.set_str(key, &value.to_string()) } /// Set a generic `str` configuration option - pub fn set_str(self, key: &str, value: &str) -> Self { - self.set(key, ScalarValue::from(value)) + pub fn set_str(mut self, key: &str, value: &str) -> Self { + self.options.set(key, value).unwrap(); + self } /// Customize batch size From b87f210dbdd90e5f65caefac1eeb053b0f0f612e Mon Sep 17 00:00:00 2001 From: Marco Neumann Date: Thu, 25 Apr 2024 14:31:55 +0200 Subject: [PATCH 5/7] fix: reduce lock contention in distributor channels (#10026) * fix: lock contention in distributor channels Reduce lock contention in distributor channels via: - use atomic counters instead of "counter behind mutex" where appropriate - use less state - only lock when needed - move "wake" operation out of lock scopes (they are eventual operations anyways and many wake operations results in "futex wake" operations -- i.e. 
a syscall -- which you should avoid while holding the lock) * refactor: add more docs and tests for distributor channels --------- Co-authored-by: Andrew Lamb --- .../src/repartition/distributor_channels.rs | 358 ++++++++++++------ 1 file changed, 245 insertions(+), 113 deletions(-) diff --git a/datafusion/physical-plan/src/repartition/distributor_channels.rs b/datafusion/physical-plan/src/repartition/distributor_channels.rs index e71b88467bcc..bad923ce9e82 100644 --- a/datafusion/physical-plan/src/repartition/distributor_channels.rs +++ b/datafusion/physical-plan/src/repartition/distributor_channels.rs @@ -40,8 +40,12 @@ use std::{ collections::VecDeque, future::Future, + ops::DerefMut, pin::Pin, - sync::Arc, + sync::{ + atomic::{AtomicUsize, Ordering}, + Arc, + }, task::{Context, Poll, Waker}, }; @@ -52,20 +56,12 @@ pub fn channels( n: usize, ) -> (Vec>, Vec>) { let channels = (0..n) - .map(|id| { - Arc::new(Mutex::new(Channel { - data: VecDeque::default(), - n_senders: 1, - recv_alive: true, - recv_wakers: Vec::default(), - id, - })) - }) + .map(|id| Arc::new(Channel::new_with_one_sender(id))) .collect::>(); - let gate = Arc::new(Mutex::new(Gate { - empty_channels: n, - send_wakers: Vec::default(), - })); + let gate = Arc::new(Gate { + empty_channels: AtomicUsize::new(n), + send_wakers: Mutex::new(None), + }); let senders = channels .iter() .map(|channel| DistributionSender { @@ -143,8 +139,7 @@ impl DistributionSender { impl Clone for DistributionSender { fn clone(&self) -> Self { - let mut guard = self.channel.lock(); - guard.n_senders += 1; + self.channel.n_senders.fetch_add(1, Ordering::SeqCst); Self { channel: Arc::clone(&self.channel), @@ -155,19 +150,46 @@ impl Clone for DistributionSender { impl Drop for DistributionSender { fn drop(&mut self) { - let mut guard_channel = self.channel.lock(); - guard_channel.n_senders -= 1; + let n_senders_pre = self.channel.n_senders.fetch_sub(1, Ordering::SeqCst); + // is the the last copy of the sender side? + if n_senders_pre > 1 { + return; + } - if guard_channel.n_senders == 0 { - // Note: the recv_alive check is so that we don't double-clear the status - if guard_channel.data.is_empty() && guard_channel.recv_alive { + let receivers = { + let mut state = self.channel.state.lock(); + + // During the shutdown of a empty channel, both the sender and the receiver side will be dropped. However we + // only want to decrement the "empty channels" counter once. + // + // We are within a critical section here, so we we can safely assume that either the last sender or the + // receiver (there's only one) will be dropped first. + // + // If the last sender is dropped first, `state.data` will still exists and the sender side decrements the + // signal. The receiver side then MUST check the `n_senders` counter during the section and if it is zero, + // it inferres that it is dropped afterwards and MUST NOT decrement the counter. + // + // If the receiver end is dropped first, it will inferr -- based on `n_senders` -- that there are still + // senders and it will decrement the `empty_channels` counter. It will also set `data` to `None`. The sender + // side will then see that `data` is `None` and can therefore inferr that the receiver end was dropped, and + // hence it MUST NOT decrement the `empty_channels` counter. 
+ if state + .data + .as_ref() + .map(|data| data.is_empty()) + .unwrap_or_default() + { // channel is gone, so we need to clear our signal - let mut guard_gate = self.gate.lock(); - guard_gate.empty_channels -= 1; + self.gate.decr_empty_channels(); } - // receiver may be waiting for data, but should return `None` now since the channel is closed - guard_channel.wake_receivers(); + // make sure that nobody can add wakers anymore + state.recv_wakers.take().expect("not closed yet") + }; + + // wake outside of lock scope + for recv in receivers { + recv.wake(); } } } @@ -188,33 +210,41 @@ impl<'a, T> Future for SendFuture<'a, T> { let this = &mut *self; assert!(this.element.is_some(), "polled ready future"); - let mut guard_channel = this.channel.lock(); - - // receiver end still alive? - if !guard_channel.recv_alive { - return Poll::Ready(Err(SendError( - this.element.take().expect("just checked"), - ))); - } + // lock scope + let to_wake = { + let mut guard_channel_state = this.channel.state.lock(); + + let Some(data) = guard_channel_state.data.as_mut() else { + // receiver end dead + return Poll::Ready(Err(SendError( + this.element.take().expect("just checked"), + ))); + }; + + // does ANY receiver need data? + // if so, allow sender to create another + if this.gate.empty_channels.load(Ordering::SeqCst) == 0 { + let mut guard = this.gate.send_wakers.lock(); + if let Some(send_wakers) = guard.deref_mut() { + send_wakers.push((cx.waker().clone(), this.channel.id)); + return Poll::Pending; + } + } - let mut guard_gate = this.gate.lock(); + let was_empty = data.is_empty(); + data.push_back(this.element.take().expect("just checked")); - // does ANY receiver need data? - // if so, allow sender to create another - if guard_gate.empty_channels == 0 { - guard_gate - .send_wakers - .push((cx.waker().clone(), guard_channel.id)); - return Poll::Pending; - } + if was_empty { + this.gate.decr_empty_channels(); + guard_channel_state.take_recv_wakers() + } else { + Vec::with_capacity(0) + } + }; - let was_empty = guard_channel.data.is_empty(); - guard_channel - .data - .push_back(this.element.take().expect("just checked")); - if was_empty { - guard_gate.empty_channels -= 1; - guard_channel.wake_receivers(); + // wake outside of lock scope + for receiver in to_wake { + receiver.wake(); } Poll::Ready(Ok(())) @@ -243,21 +273,18 @@ impl DistributionReceiver { impl Drop for DistributionReceiver { fn drop(&mut self) { - let mut guard_channel = self.channel.lock(); - let mut guard_gate = self.gate.lock(); - guard_channel.recv_alive = false; + let mut guard_channel_state = self.channel.state.lock(); + let data = guard_channel_state.data.take().expect("not dropped yet"); - // Note: n_senders check is here so we don't double-clear the signal - if guard_channel.data.is_empty() && (guard_channel.n_senders > 0) { + // See `DistributedSender::drop` for an explanation of the drop order and when the "empty channels" counter is + // decremented. 
+ if data.is_empty() && (self.channel.n_senders.load(Ordering::SeqCst) > 0) { // channel is gone, so we need to clear our signal - guard_gate.empty_channels -= 1; + self.gate.decr_empty_channels(); } // senders may be waiting for gate to open but should error now that the channel is closed - guard_gate.wake_channel_senders(guard_channel.id); - - // clear potential remaining data from channel - guard_channel.data.clear(); + self.gate.wake_channel_senders(self.channel.id); } } @@ -275,37 +302,51 @@ impl<'a, T> Future for RecvFuture<'a, T> { let this = &mut *self; assert!(!this.rdy, "polled ready future"); - let mut guard_channel = this.channel.lock(); + let mut guard_channel_state = this.channel.state.lock(); + let channel_state = guard_channel_state.deref_mut(); + let data = channel_state.data.as_mut().expect("not dropped yet"); - match guard_channel.data.pop_front() { + match data.pop_front() { Some(element) => { // change "empty" signal for this channel? - if guard_channel.data.is_empty() && (guard_channel.n_senders > 0) { - let mut guard_gate = this.gate.lock(); - + if data.is_empty() && channel_state.recv_wakers.is_some() { // update counter - let old_counter = guard_gate.empty_channels; - guard_gate.empty_channels += 1; + let old_counter = + this.gate.empty_channels.fetch_add(1, Ordering::SeqCst); // open gate? - if old_counter == 0 { - guard_gate.wake_all_senders(); + let to_wake = if old_counter == 0 { + let mut guard = this.gate.send_wakers.lock(); + + // check after lock to see if we should still change the state + if this.gate.empty_channels.load(Ordering::SeqCst) > 0 { + guard.take().unwrap_or_default() + } else { + Vec::with_capacity(0) + } + } else { + Vec::with_capacity(0) + }; + + drop(guard_channel_state); + + // wake outside of lock scope + for (waker, _channel_id) in to_wake { + waker.wake(); } - - drop(guard_gate); - drop(guard_channel); } this.rdy = true; Poll::Ready(Some(element)) } - None if guard_channel.n_senders == 0 => { - this.rdy = true; - Poll::Ready(None) - } None => { - guard_channel.recv_wakers.push(cx.waker().clone()); - Poll::Pending + if let Some(recv_wakers) = channel_state.recv_wakers.as_mut() { + recv_wakers.push(cx.waker().clone()); + Poll::Pending + } else { + this.rdy = true; + Poll::Ready(None) + } } } } @@ -314,78 +355,122 @@ impl<'a, T> Future for RecvFuture<'a, T> { /// Links senders and receivers. #[derive(Debug)] struct Channel { - /// Buffered data. - data: VecDeque, - /// Reference counter for the sender side. - n_senders: usize, - - /// Reference "counter"/flag for the single receiver. - recv_alive: bool, - - /// Wakers for the receiver side. - /// - /// The receiver will be pending if the [buffer](Self::data) is empty and - /// there are senders left (according to the [reference counter](Self::n_senders)). - recv_wakers: Vec, + n_senders: AtomicUsize, /// Channel ID. /// /// This is used to address [send wakers](Gate::send_wakers). id: usize, + + /// Mutable state. + state: Mutex>, } impl Channel { - fn wake_receivers(&mut self) { - for waker in self.recv_wakers.drain(..) { - waker.wake(); + /// Create new channel with one sender (so we don't need to [fetch-add](AtomicUsize::fetch_add) directly afterwards). + fn new_with_one_sender(id: usize) -> Self { + Channel { + n_senders: AtomicUsize::new(1), + id, + state: Mutex::new(ChannelState { + data: Some(VecDeque::default()), + recv_wakers: Some(Vec::default()), + }), } } } +#[derive(Debug)] +struct ChannelState { + /// Buffered data. + /// + /// This is [`None`] when the receiver is gone. 
+ data: Option>, + + /// Wakers for the receiver side. + /// + /// The receiver will be pending if the [buffer](Self::data) is empty and + /// there are senders left (otherwise this is set to [`None`]). + recv_wakers: Option>, +} + +impl ChannelState { + /// Get all [`recv_wakers`](Self::recv_wakers) and replace with identically-sized buffer. + /// + /// The wakers should be woken AFTER the lock to [this state](Self) was dropped. + /// + /// # Panics + /// Assumes that channel is NOT closed yet, i.e. that [`recv_wakers`](Self::recv_wakers) is not [`None`]. + fn take_recv_wakers(&mut self) -> Vec { + let to_wake = self.recv_wakers.as_mut().expect("not closed"); + let mut tmp = Vec::with_capacity(to_wake.capacity()); + std::mem::swap(to_wake, &mut tmp); + tmp + } +} + /// Shared channel. /// /// One or multiple senders and a single receiver will share a channel. -type SharedChannel = Arc>>; +type SharedChannel = Arc>; /// The "all channels have data" gate. #[derive(Debug)] struct Gate { /// Number of currently empty (and still open) channels. - empty_channels: usize, + empty_channels: AtomicUsize, /// Wakers for the sender side, including their channel IDs. - send_wakers: Vec<(Waker, usize)>, + /// + /// This is `None` if the there are non-empty channels. + send_wakers: Mutex>>, } impl Gate { - //// Wake all senders. + /// Wake senders for a specific channel. /// - /// This is helpful to signal that there are some channels empty now and hence the gate was opened. - fn wake_all_senders(&mut self) { - for (waker, _id) in self.send_wakers.drain(..) { + /// This is helpful to signal that the receiver side is gone and the senders shall now error. + fn wake_channel_senders(&self, id: usize) { + // lock scope + let to_wake = { + let mut guard = self.send_wakers.lock(); + + if let Some(send_wakers) = guard.deref_mut() { + // `drain_filter` is unstable, so implement our own + let (wake, keep) = + send_wakers.drain(..).partition(|(_waker, id2)| id == *id2); + + *send_wakers = keep; + + wake + } else { + Vec::with_capacity(0) + } + }; + + // wake outside of lock scope + for (waker, _id) in to_wake { waker.wake(); } } - /// Wake senders for a specific channel. - /// - /// This is helpful to signal that the receiver side is gone and the senders shall now error. - fn wake_channel_senders(&mut self, id: usize) { - // `drain_filter` is unstable, so implement our own - let (wake, keep) = self - .send_wakers - .drain(..) - .partition(|(_waker, id2)| id == *id2); - self.send_wakers = keep; - for (waker, _id) in wake { - waker.wake(); + fn decr_empty_channels(&self) { + let old_count = self.empty_channels.fetch_sub(1, Ordering::SeqCst); + + if old_count == 1 { + let mut guard = self.send_wakers.lock(); + + // double-check state during lock + if self.empty_channels.load(Ordering::SeqCst) == 0 && guard.is_none() { + *guard = Some(Vec::new()); + } } } } /// Gate shared by all senders and receivers. -type SharedGate = Arc>; +type SharedGate = Arc; #[cfg(test)] mod tests { @@ -596,6 +681,52 @@ mod tests { assert_eq!(counter.strong_count(), 0); } + /// Ensure that polling "pending" futures work even when you poll them too often (which happens under some circumstances). 
+ #[test] + fn test_poll_empty_channel_twice() { + let (txs, mut rxs) = channels(1); + + let mut recv_fut = rxs[0].recv(); + let waker_1a = poll_pending(&mut recv_fut); + let waker_1b = poll_pending(&mut recv_fut); + + let mut recv_fut = rxs[0].recv(); + let waker_2 = poll_pending(&mut recv_fut); + + poll_ready(&mut txs[0].send("a")).unwrap(); + assert!(waker_1a.woken()); + assert!(waker_1b.woken()); + assert!(waker_2.woken()); + assert_eq!(poll_ready(&mut recv_fut), Some("a"),); + + poll_ready(&mut txs[0].send("b")).unwrap(); + let mut send_fut = txs[0].send("c"); + let waker_3 = poll_pending(&mut send_fut); + assert_eq!(poll_ready(&mut rxs[0].recv()), Some("b"),); + assert!(waker_3.woken()); + poll_ready(&mut send_fut).unwrap(); + assert_eq!(poll_ready(&mut rxs[0].recv()), Some("c")); + + let mut recv_fut = rxs[0].recv(); + let waker_4 = poll_pending(&mut recv_fut); + + let mut recv_fut = rxs[0].recv(); + let waker_5 = poll_pending(&mut recv_fut); + + poll_ready(&mut txs[0].send("d")).unwrap(); + let mut send_fut = txs[0].send("e"); + let waker_6a = poll_pending(&mut send_fut); + let waker_6b = poll_pending(&mut send_fut); + + assert!(waker_4.woken()); + assert!(waker_5.woken()); + assert_eq!(poll_ready(&mut recv_fut), Some("d"),); + + assert!(waker_6a.woken()); + assert!(waker_6b.woken()); + poll_ready(&mut send_fut).unwrap(); + } + #[test] #[should_panic(expected = "polled ready future")] fn test_panic_poll_send_future_after_ready_ok() { @@ -655,6 +786,7 @@ mod tests { poll_pending(&mut fut); } + /// Test [`poll_pending`] (i.e. the testing utils, not the actual library code). #[test] fn test_meta_poll_pending_waker() { let (tx, mut rx) = futures::channel::oneshot::channel(); From 5c86db0b2c5a459d54994b7c6ce6b461cc35d767 Mon Sep 17 00:00:00 2001 From: Andrew Lamb Date: Thu, 25 Apr 2024 15:24:09 -0400 Subject: [PATCH 6/7] Avoid some copies, encapsulate the handling of child indicies in `OptimizeProjection` (#10216) --- datafusion/common/src/dfschema.rs | 17 +- .../mod.rs} | 304 ++++-------------- .../optimize_projections/required_indices.rs | 227 +++++++++++++ 3 files changed, 311 insertions(+), 237 deletions(-) rename datafusion/optimizer/src/{optimize_projections.rs => optimize_projections/mod.rs} (87%) create mode 100644 datafusion/optimizer/src/optimize_projections/required_indices.rs diff --git a/datafusion/common/src/dfschema.rs b/datafusion/common/src/dfschema.rs index f1909f0dc8e1..64e40ea99e67 100644 --- a/datafusion/common/src/dfschema.rs +++ b/datafusion/common/src/dfschema.rs @@ -347,9 +347,22 @@ impl DFSchema { matches.next() } - /// Find the index of the column with the given qualifier and name - pub fn index_of_column(&self, col: &Column) -> Result { + /// Find the index of the column with the given qualifier and name, + /// returning `None` if not found + /// + /// See [Self::index_of_column] for a version that returns an error if the + /// column is not found + pub fn maybe_index_of_column(&self, col: &Column) -> Option { self.index_of_column_by_name(col.relation.as_ref(), &col.name) + } + + /// Find the index of the column with the given qualifier and name, + /// returning `Err` if not found + /// + /// See [Self::maybe_index_of_column] for a version that returns `None` if + /// the column is not found + pub fn index_of_column(&self, col: &Column) -> Result { + self.maybe_index_of_column(col) .ok_or_else(|| field_not_found(col.relation.clone(), &col.name, self)) } diff --git a/datafusion/optimizer/src/optimize_projections.rs 
b/datafusion/optimizer/src/optimize_projections/mod.rs similarity index 87% rename from datafusion/optimizer/src/optimize_projections.rs rename to datafusion/optimizer/src/optimize_projections/mod.rs index 70ffd8f24498..0f2aaa6cbcb3 100644 --- a/datafusion/optimizer/src/optimize_projections.rs +++ b/datafusion/optimizer/src/optimize_projections/mod.rs @@ -17,16 +17,16 @@ //! [`OptimizeProjections`] identifies and eliminates unused columns +mod required_indices; + use std::collections::HashSet; use std::sync::Arc; use crate::optimizer::ApplyOrder; use crate::{OptimizerConfig, OptimizerRule}; -use arrow::datatypes::SchemaRef; use datafusion_common::{ - get_required_group_by_exprs_indices, internal_err, Column, DFSchema, DFSchemaRef, - JoinType, Result, + get_required_group_by_exprs_indices, internal_err, Column, JoinType, Result, }; use datafusion_expr::expr::{Alias, ScalarFunction}; use datafusion_expr::{ @@ -34,9 +34,10 @@ use datafusion_expr::{ Expr, Projection, TableScan, Window, }; +use crate::optimize_projections::required_indices::RequiredIndicies; use datafusion_common::tree_node::{TreeNode, TreeNodeRecursion}; use hashbrown::HashMap; -use itertools::{izip, Itertools}; +use itertools::izip; /// Optimizer rule to prune unnecessary columns from intermediate schemas /// inside the [`LogicalPlan`]. This rule: @@ -70,8 +71,8 @@ impl OptimizerRule for OptimizeProjections { config: &dyn OptimizerConfig, ) -> Result> { // All output fields are necessary: - let indices = (0..plan.schema().fields().len()).collect::>(); - optimize_projections(plan, config, &indices) + let indices = RequiredIndicies::new_for_all_exprs(plan); + optimize_projections(plan, config, indices) } fn name(&self) -> &str { @@ -105,13 +106,9 @@ impl OptimizerRule for OptimizeProjections { fn optimize_projections( plan: &LogicalPlan, config: &dyn OptimizerConfig, - indices: &[usize], + indices: RequiredIndicies, ) -> Result> { - // `child_required_indices` stores - // - indices of the columns required for each child - // - a flag indicating whether putting a projection above children is beneficial for the parent. - // As an example LogicalPlan::Filter benefits from small tables. Hence for filter child this flag would be `true`. - let child_required_indices: Vec<(Vec, bool)> = match plan { + let child_required_indices: Vec = match plan { LogicalPlan::Sort(_) | LogicalPlan::Filter(_) | LogicalPlan::Repartition(_) @@ -123,12 +120,13 @@ fn optimize_projections( // that appear in this plan's expressions to its child. All these // operators benefit from "small" inputs, so the projection_beneficial // flag is `true`. - let exprs = plan.expressions(); plan.inputs() .into_iter() .map(|input| { - get_all_required_indices(indices, input, exprs.iter()) - .map(|idxs| (idxs, true)) + indices + .clone() + .with_projection_beneficial() + .with_plan_exprs(plan, input.schema()) }) .collect::>()? } @@ -137,13 +135,9 @@ fn optimize_projections( // that appear in this plan's expressions to its child. These operators // do not benefit from "small" inputs, so the projection_beneficial // flag is `false`. - let exprs = plan.expressions(); plan.inputs() .into_iter() - .map(|input| { - get_all_required_indices(indices, input, exprs.iter()) - .map(|idxs| (idxs, false)) - }) + .map(|input| indices.clone().with_plan_exprs(plan, input.schema())) .collect::>()? } LogicalPlan::Copy(_) @@ -159,16 +153,14 @@ fn optimize_projections( // TODO: For some subquery variants (e.g. 
a subquery arising from an // EXISTS expression), we may not need to require all indices. plan.inputs() - .iter() - .map(|input| ((0..input.schema().fields().len()).collect_vec(), false)) - .collect::>() + .into_iter() + .map(RequiredIndicies::new_for_all_exprs) + .collect() } LogicalPlan::Extension(extension) => { - let necessary_children_indices = if let Some(necessary_children_indices) = - extension.node.necessary_children_exprs(indices) - { - necessary_children_indices - } else { + let Some(necessary_children_indices) = + extension.node.necessary_children_exprs(indices.indices()) + else { // Requirements from parent cannot be routed down to user defined logical plan safely return Ok(None); }; @@ -178,16 +170,12 @@ fn optimize_projections( Make sure `.necessary_children_exprs` implementation of the `UserDefinedLogicalNode` is \ consistent with actual children length for the node."); } - // Expressions used by node. - let exprs = plan.expressions(); children .into_iter() .zip(necessary_children_indices) .map(|(child, necessary_indices)| { - let child_schema = child.schema(); - let child_req_indices = - indices_referred_by_exprs(child_schema, exprs.iter())?; - Ok((merge_slices(&necessary_indices, &child_req_indices), false)) + RequiredIndicies::new_from_indices(necessary_indices) + .with_plan_exprs(plan, child.schema()) }) .collect::>>()? } @@ -213,13 +201,9 @@ fn optimize_projections( LogicalPlan::Aggregate(aggregate) => { // Split parent requirements to GROUP BY and aggregate sections: let n_group_exprs = aggregate.group_expr_len()?; - let (group_by_reqs, mut aggregate_reqs): (Vec, Vec) = - indices.iter().partition(|&&idx| idx < n_group_exprs); // Offset aggregate indices so that they point to valid indices at // `aggregate.aggr_expr`: - for idx in aggregate_reqs.iter_mut() { - *idx -= n_group_exprs; - } + let (group_by_reqs, aggregate_reqs) = indices.split_off(n_group_exprs); // Get absolutely necessary GROUP BY fields: let group_by_expr_existing = aggregate @@ -235,16 +219,16 @@ fn optimize_projections( // Some of the fields in the GROUP BY may be required by the // parent even if these fields are unnecessary in terms of // functional dependency. - let required_indices = - merge_slices(&simplest_groupby_indices, &group_by_reqs); - get_at_indices(&aggregate.group_expr, &required_indices) + group_by_reqs + .append(&simplest_groupby_indices) + .get_at_indices(&aggregate.group_expr) } else { aggregate.group_expr.clone() }; // Only use the absolutely necessary aggregate expressions required // by the parent: - let mut new_aggr_expr = get_at_indices(&aggregate.aggr_expr, &aggregate_reqs); + let mut new_aggr_expr = aggregate_reqs.get_at_indices(&aggregate.aggr_expr); // Aggregations always need at least one aggregate expression. // With a nested count, we don't require any column as input, but @@ -263,10 +247,12 @@ fn optimize_projections( let all_exprs_iter = new_group_bys.iter().chain(new_aggr_expr.iter()); let schema = aggregate.input.schema(); - let necessary_indices = indices_referred_by_exprs(schema, all_exprs_iter)?; + let necessary_indices = + RequiredIndicies::new().with_exprs(schema, all_exprs_iter)?; + let necessary_exprs = necessary_indices.get_required_exprs(schema); let aggregate_input = if let Some(input) = - optimize_projections(&aggregate.input, config, &necessary_indices)? + optimize_projections(&aggregate.input, config, necessary_indices)? 
{ input } else { @@ -277,7 +263,6 @@ fn optimize_projections( // that its input only contains absolutely necessary columns for // the aggregate expressions. Note that necessary_indices refer to // fields in `aggregate.input.schema()`. - let necessary_exprs = get_required_exprs(schema, &necessary_indices); let (aggregate_input, _) = add_projection_on_top_if_helpful(aggregate_input, necessary_exprs)?; @@ -291,29 +276,24 @@ fn optimize_projections( .map(|aggregate| Some(LogicalPlan::Aggregate(aggregate))); } LogicalPlan::Window(window) => { + let input_schema = window.input.schema(); // Split parent requirements to child and window expression sections: - let n_input_fields = window.input.schema().fields().len(); - let (child_reqs, mut window_reqs): (Vec, Vec) = - indices.iter().partition(|&&idx| idx < n_input_fields); + let n_input_fields = input_schema.fields().len(); // Offset window expression indices so that they point to valid // indices at `window.window_expr`: - for idx in window_reqs.iter_mut() { - *idx -= n_input_fields; - } + let (child_reqs, window_reqs) = indices.split_off(n_input_fields); // Only use window expressions that are absolutely necessary according // to parent requirements: - let new_window_expr = get_at_indices(&window.window_expr, &window_reqs); + let new_window_expr = window_reqs.get_at_indices(&window.window_expr); // Get all the required column indices at the input, either by the // parent or window expression requirements. - let required_indices = get_all_required_indices( - &child_reqs, - &window.input, - new_window_expr.iter(), - )?; + let required_indices = + child_reqs.with_exprs(input_schema, &new_window_expr)?; + let window_child = if let Some(new_window_child) = - optimize_projections(&window.input, config, &required_indices)? + optimize_projections(&window.input, config, required_indices.clone())? { new_window_child } else { @@ -327,8 +307,7 @@ fn optimize_projections( // Calculate required expressions at the input of the window. // Please note that we use `old_child`, because `required_indices` // refers to `old_child`. - let required_exprs = - get_required_exprs(window.input.schema(), &required_indices); + let required_exprs = required_indices.get_required_exprs(input_schema); let (window_child, _) = add_projection_on_top_if_helpful(window_child, required_exprs)?; Window::try_new(new_window_expr, Arc::new(window_child)) @@ -339,31 +318,35 @@ fn optimize_projections( let left_len = join.left.schema().fields().len(); let (left_req_indices, right_req_indices) = split_join_requirements(left_len, indices, &join.join_type); - let exprs = plan.expressions(); let left_indices = - get_all_required_indices(&left_req_indices, &join.left, exprs.iter())?; + left_req_indices.with_plan_exprs(plan, join.left.schema())?; let right_indices = - get_all_required_indices(&right_req_indices, &join.right, exprs.iter())?; + right_req_indices.with_plan_exprs(plan, join.right.schema())?; // Joins benefit from "small" input tables (lower memory usage). // Therefore, each child benefits from projection: - vec![(left_indices, true), (right_indices, true)] + vec![ + left_indices.with_projection_beneficial(), + right_indices.with_projection_beneficial(), + ] } LogicalPlan::CrossJoin(cross_join) => { let left_len = cross_join.left.schema().fields().len(); - let (left_child_indices, right_child_indices) = + let (left_indices, right_indices) = split_join_requirements(left_len, indices, &JoinType::Inner); // Joins benefit from "small" input tables (lower memory usage). 
// Therefore, each child benefits from projection: - vec![(left_child_indices, true), (right_child_indices, true)] + vec![ + left_indices.with_projection_beneficial(), + right_indices.with_projection_beneficial(), + ] } LogicalPlan::TableScan(table_scan) => { - let schema = table_scan.source.schema(); // Get indices referred to in the original (schema with all fields) // given projected indices. - let projection = with_indices(&table_scan.projection, schema, |map| { - indices.iter().map(|&idx| map[idx]).collect() - }); - + let projection = match &table_scan.projection { + Some(projection) => indices.into_mapped_indices(|idx| projection[idx]), + None => indices.into_inner(), + }; return TableScan::try_new( table_scan.table_name.clone(), table_scan.source.clone(), @@ -376,15 +359,16 @@ fn optimize_projections( }; let new_inputs = izip!(child_required_indices, plan.inputs().into_iter()) - .map(|((required_indices, projection_beneficial), child)| { + .map(|(required_indices, child)| { + let projection_beneficial = required_indices.projection_beneficial(); + let project_exprs = required_indices.get_required_exprs(child.schema()); let (input, is_changed) = if let Some(new_input) = - optimize_projections(child, config, &required_indices)? + optimize_projections(child, config, required_indices)? { (new_input, true) } else { (child.clone(), false) }; - let project_exprs = get_required_exprs(child.schema(), &required_indices); let (input, proj_added) = if projection_beneficial { add_projection_on_top_if_helpful(input, project_exprs)? } else { @@ -408,26 +392,6 @@ fn optimize_projections( } } -/// This function applies the given function `f` to the projection indices -/// `proj_indices` if they exist. Otherwise, applies `f` to a default set -/// of indices according to `schema`. -fn with_indices( - proj_indices: &Option>, - schema: SchemaRef, - mut f: F, -) -> Vec -where - F: FnMut(&[usize]) -> Vec, -{ - match proj_indices { - Some(indices) => f(indices.as_slice()), - None => { - let range: Vec = (0..schema.fields.len()).collect(); - f(range.as_slice()) - } - } -} - /// Merges consecutive projections. /// /// Given a projection `proj`, this function attempts to merge it with a previous @@ -653,132 +617,6 @@ fn outer_columns_helper_multi<'a>( exprs.into_iter().for_each(|e| outer_columns(e, columns)); } -/// Generates the required expressions (columns) that reside at `indices` of -/// the given `input_schema`. -/// -/// # Arguments -/// -/// * `input_schema` - A reference to the input schema. -/// * `indices` - A slice of `usize` indices specifying required columns. -/// -/// # Returns -/// -/// A vector of `Expr::Column` expressions residing at `indices` of the `input_schema`. -fn get_required_exprs(input_schema: &Arc, indices: &[usize]) -> Vec { - indices - .iter() - .map(|&idx| Expr::Column(Column::from(input_schema.qualified_field(idx)))) - .collect() -} - -/// Get indices of the fields referred to by any expression in `exprs` within -/// the given schema (`input_schema`). -/// -/// # Arguments -/// -/// * `input_schema`: The input schema to analyze for index requirements. -/// * `exprs`: An iterator of expressions for which we want to find necessary -/// field indices. -/// -/// # Returns -/// -/// A [`Result`] object containing the indices of all required fields in -/// `input_schema` to calculate all `exprs` successfully. 
-fn indices_referred_by_exprs<'a>( - input_schema: &DFSchemaRef, - exprs: impl Iterator, -) -> Result> { - let indices = exprs - .map(|expr| indices_referred_by_expr(input_schema, expr)) - .collect::>>()?; - Ok(indices - .into_iter() - .flatten() - // Make sure no duplicate entries exist and indices are ordered: - .sorted() - .dedup() - .collect()) -} - -/// Get indices of the fields referred to by the given expression `expr` within -/// the given schema (`input_schema`). -/// -/// # Parameters -/// -/// * `input_schema`: The input schema to analyze for index requirements. -/// * `expr`: An expression for which we want to find necessary field indices. -/// -/// # Returns -/// -/// A [`Result`] object containing the indices of all required fields in -/// `input_schema` to calculate `expr` successfully. -fn indices_referred_by_expr( - input_schema: &DFSchemaRef, - expr: &Expr, -) -> Result> { - let mut cols = expr.to_columns()?; - // Get outer-referenced (subquery) columns: - outer_columns(expr, &mut cols); - Ok(cols - .iter() - .flat_map(|col| input_schema.index_of_column(col)) - .collect()) -} - -/// Gets all required indices for the input; i.e. those required by the parent -/// and those referred to by `exprs`. -/// -/// # Parameters -/// -/// * `parent_required_indices` - A slice of indices required by the parent plan. -/// * `input` - The input logical plan to analyze for index requirements. -/// * `exprs` - An iterator of expressions used to determine required indices. -/// -/// # Returns -/// -/// A `Result` containing a vector of `usize` indices containing all the required -/// indices. -fn get_all_required_indices<'a>( - parent_required_indices: &[usize], - input: &LogicalPlan, - exprs: impl Iterator, -) -> Result> { - indices_referred_by_exprs(input.schema(), exprs) - .map(|indices| merge_slices(parent_required_indices, &indices)) -} - -/// Retrieves the expressions at specified indices within the given slice. Ignores -/// any invalid indices. -/// -/// # Parameters -/// -/// * `exprs` - A slice of expressions to index into. -/// * `indices` - A slice of indices specifying the positions of expressions sought. -/// -/// # Returns -/// -/// A vector of expressions corresponding to specified indices. -fn get_at_indices(exprs: &[Expr], indices: &[usize]) -> Vec { - indices - .iter() - // Indices may point to further places than `exprs` len. - .filter_map(|&idx| exprs.get(idx).cloned()) - .collect() -} - -/// Merges two slices into a single vector with sorted (ascending) and -/// deduplicated elements. For example, merging `[3, 2, 4]` and `[3, 6, 1]` -/// will produce `[1, 2, 3, 6]`. -fn merge_slices(left: &[T], right: &[T]) -> Vec { - // Make sure to sort before deduping, which removes the duplicates: - left.iter() - .cloned() - .chain(right.iter().cloned()) - .sorted() - .dedup() - .collect() -} - /// Splits requirement indices for a join into left and right children based on /// the join type. /// @@ -810,26 +648,21 @@ fn merge_slices(left: &[T], right: &[T]) -> Vec { /// adjusted based on the join type. 
fn split_join_requirements( left_len: usize, - indices: &[usize], + indices: RequiredIndicies, join_type: &JoinType, -) -> (Vec, Vec) { +) -> (RequiredIndicies, RequiredIndicies) { match join_type { // In these cases requirements are split between left/right children: JoinType::Inner | JoinType::Left | JoinType::Right | JoinType::Full => { - let (left_reqs, mut right_reqs): (Vec, Vec) = - indices.iter().partition(|&&idx| idx < left_len); // Decrease right side indices by `left_len` so that they point to valid // positions within the right child: - for idx in right_reqs.iter_mut() { - *idx -= left_len; - } - (left_reqs, right_reqs) + indices.split_off(left_len) } // All requirements can be re-routed to left child directly. - JoinType::LeftAnti | JoinType::LeftSemi => (indices.to_vec(), vec![]), + JoinType::LeftAnti | JoinType::LeftSemi => (indices, RequiredIndicies::new()), // All requirements can be re-routed to right side directly. // No need to change index, join schema is right child schema. - JoinType::RightSemi | JoinType::RightAnti => (vec![], indices.to_vec()), + JoinType::RightSemi | JoinType::RightAnti => (RequiredIndicies::new(), indices), } } @@ -885,13 +718,14 @@ fn add_projection_on_top_if_helpful( fn rewrite_projection_given_requirements( proj: &Projection, config: &dyn OptimizerConfig, - indices: &[usize], + indices: RequiredIndicies, ) -> Result> { - let exprs_used = get_at_indices(&proj.expr, indices); + let exprs_used = indices.get_at_indices(&proj.expr); + let required_indices = - indices_referred_by_exprs(proj.input.schema(), exprs_used.iter())?; + RequiredIndicies::new().with_exprs(proj.input.schema(), exprs_used.iter())?; return if let Some(input) = - optimize_projections(&proj.input, config, &required_indices)? + optimize_projections(&proj.input, config, required_indices)? { if is_projection_unnecessary(&input, &exprs_used)? { Ok(Some(input)) diff --git a/datafusion/optimizer/src/optimize_projections/required_indices.rs b/datafusion/optimizer/src/optimize_projections/required_indices.rs new file mode 100644 index 000000000000..113c100bbd9b --- /dev/null +++ b/datafusion/optimizer/src/optimize_projections/required_indices.rs @@ -0,0 +1,227 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +//! [`RequiredIndicies`] helper for OptimizeProjection + +use crate::optimize_projections::outer_columns; +use datafusion_common::tree_node::TreeNodeRecursion; +use datafusion_common::{Column, DFSchemaRef, Result}; +use datafusion_expr::{Expr, LogicalPlan}; + +/// Represents columns in a schema which are required (used) by a plan node +/// +/// Also carries a flag indicating if putting a projection above children is +/// beneficial for the parent. For example `LogicalPlan::Filter` benefits from +/// small tables. 
Hence for filter child this flag would be `true`. Defaults to +/// `false` +/// +/// # Invariant +/// +/// Indices are always in order and without duplicates. For example, if these +/// indices were added `[3, 2, 4, 3, 6, 1]`, the instance would be represented +/// by `[1, 2, 3, 6]`. +#[derive(Debug, Clone, Default)] +pub(super) struct RequiredIndicies { + /// The indices of the required columns in the + indices: Vec, + /// If putting a projection above children is beneficial for the parent. + /// Defaults to false. + projection_beneficial: bool, +} + +impl RequiredIndicies { + /// Create a new, empty instance + pub fn new() -> Self { + Self::default() + } + + /// Create a new instance that requires all columns from the specified plan + pub fn new_for_all_exprs(plan: &LogicalPlan) -> Self { + Self { + indices: (0..plan.schema().fields().len()).collect(), + projection_beneficial: false, + } + } + + /// Create a new instance with the specified indices as required + pub fn new_from_indices(indices: Vec) -> Self { + Self { + indices, + projection_beneficial: false, + } + .compact() + } + + /// Convert the instance to its inner indices + pub fn into_inner(self) -> Vec { + self.indices + } + + /// Set the projection beneficial flag + pub fn with_projection_beneficial(mut self) -> Self { + self.projection_beneficial = true; + self + } + + /// Return the value of projection beneficial flag + pub fn projection_beneficial(&self) -> bool { + self.projection_beneficial + } + + /// Return a reference to the underlying indices + pub fn indices(&self) -> &[usize] { + &self.indices + } + + /// Add required indices for all `exprs` used in plan + pub fn with_plan_exprs( + mut self, + plan: &LogicalPlan, + schema: &DFSchemaRef, + ) -> Result { + // Add indices of the child fields referred to by the expressions in the + // parent + plan.apply_expressions(|e| { + self.add_expr(schema, e)?; + Ok(TreeNodeRecursion::Continue) + })?; + Ok(self.compact()) + } + + /// Adds the indices of the fields referred to by the given expression + /// `expr` within the given schema (`input_schema`). + /// + /// Self is NOT compacted (and thus this method is not pub) + /// + /// # Parameters + /// + /// * `input_schema`: The input schema to analyze for index requirements. + /// * `expr`: An expression for which we want to find necessary field indices. + fn add_expr(&mut self, input_schema: &DFSchemaRef, expr: &Expr) -> Result<()> { + // TODO could remove these clones (and visit the expression directly) + let mut cols = expr.to_columns()?; + // Get outer-referenced (subquery) columns: + outer_columns(expr, &mut cols); + self.indices.reserve(cols.len()); + for col in cols { + if let Some(idx) = input_schema.maybe_index_of_column(&col) { + self.indices.push(idx); + } + } + Ok(()) + } + + /// Adds the indices of the fields referred to by the given expressions + /// `within the given schema. + /// + /// # Parameters + /// + /// * `input_schema`: The input schema to analyze for index requirements. + /// * `exprs`: the expressions for which we want to find field indices. + pub fn with_exprs<'a>( + self, + schema: &DFSchemaRef, + exprs: impl IntoIterator, + ) -> Result { + exprs + .into_iter() + .try_fold(self, |mut acc, expr| { + acc.add_expr(schema, expr)?; + Ok(acc) + }) + .map(|acc| acc.compact()) + } + + /// Adds all `indices` into this instance. 
+ pub fn append(mut self, indices: &[usize]) -> Self { + self.indices.extend_from_slice(indices); + self.compact() + } + + /// Splits this instance into a tuple with two instances: + /// * The first `n` indices + /// * The remaining indices, adjusted down by n + pub fn split_off(self, n: usize) -> (Self, Self) { + let (l, r) = self.partition(|idx| idx < n); + (l, r.map_indices(|idx| idx - n)) + } + + /// Partitions the indicies in this instance into two groups based on the + /// given predicate function `f`. + fn partition(&self, f: F) -> (Self, Self) + where + F: Fn(usize) -> bool, + { + let (l, r): (Vec, Vec) = + self.indices.iter().partition(|&&idx| f(idx)); + let projection_beneficial = self.projection_beneficial; + + ( + Self { + indices: l, + projection_beneficial, + }, + Self { + indices: r, + projection_beneficial, + }, + ) + } + + /// Map the indices in this instance to a new set of indices based on the + /// given function `f`, returning the mapped indices + /// + /// Not `pub` as it might not preserve the invariant of compacted indices + fn map_indices(mut self, f: F) -> Self + where + F: Fn(usize) -> usize, + { + self.indices.iter_mut().for_each(|idx| *idx = f(*idx)); + self + } + + /// Apply the given function `f` to each index in this instance, returning + /// the mapped indices + pub fn into_mapped_indices(self, f: F) -> Vec + where + F: Fn(usize) -> usize, + { + self.map_indices(f).into_inner() + } + + /// Returns the `Expr`s from `exprs` that are at the indices in this instance + pub fn get_at_indices(&self, exprs: &[Expr]) -> Vec { + self.indices.iter().map(|&idx| exprs[idx].clone()).collect() + } + + /// Generates the required expressions (columns) that reside at `indices` of + /// the given `input_schema`. + pub fn get_required_exprs(&self, input_schema: &DFSchemaRef) -> Vec { + self.indices + .iter() + .map(|&idx| Expr::from(Column::from(input_schema.qualified_field(idx)))) + .collect() + } + + /// Compacts the indices of this instance so they are sorted + /// (ascending) and deduplicated. + fn compact(mut self) -> Self { + self.indices.sort_unstable(); + self.indices.dedup(); + self + } +} From bab39f78cd7a4aca92c60950812016b4f72798b1 Mon Sep 17 00:00:00 2001 From: tison Date: Fri, 26 Apr 2024 04:55:30 +0800 Subject: [PATCH 7/7] chore: Create a doap file (#10233) * chore: Create a doap file Signed-off-by: tison * Apply suggestions from code review Co-authored-by: Andrew Lamb --------- Signed-off-by: tison Co-authored-by: Andrew Lamb --- doap.rdf | 57 ++++++++++++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 57 insertions(+) create mode 100644 doap.rdf diff --git a/doap.rdf b/doap.rdf new file mode 100644 index 000000000000..c8b8cb361ad8 --- /dev/null +++ b/doap.rdf @@ -0,0 +1,57 @@ + + + + + + + 2024-04-17 + + Apache DataFusion + + + Apache DataFusion is a fast, extensible query engine for building high-quality data-centric systems in Rust. + + Apache DataFusion is a fast, extensible query engine for building high-quality data-centric systems + in Rust, using the Apache Arrow in-memory format. Python Bindings are also available. DataFusion offers SQL + and Dataframe APIs, excellent performance, built-in support for CSV, Parquet, JSON, and Avro, + extensive customization, and a great community. + + + + + Python + Rust + + + + + + + + + + + +
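The optimization described in the date_bin commit message above (replace a per-element `Option` branch with arrow's `PrimitiveArray::unary`, which applies an infallible closure to every slot and reuses the existing null buffer) can be illustrated in isolation. The following is a minimal sketch, independent of the DataFusion patches above; the function names and the doubling operation are purely illustrative, and only the `arrow` crate's public `PrimitiveArray` API is assumed:

use arrow::array::{Int64Array, PrimitiveArray};
use arrow::datatypes::Int64Type;

// Branching version: checks validity for every element through `Option`.
fn double_with_iter(a: &Int64Array) -> Int64Array {
    a.iter().map(|v| v.map(|x| x.wrapping_mul(2))).collect()
}

// `unary` version: the closure runs on every slot (valid or null) and the
// null buffer is carried over unchanged, so there is no per-element branch.
// The closure must therefore be infallible, which is why the patch first
// makes the stride function infallible.
fn double_with_unary(a: &Int64Array) -> PrimitiveArray<Int64Type> {
    a.unary(|x| x.wrapping_mul(2))
}

fn main() {
    let a = Int64Array::from(vec![Some(1), None, Some(3)]);
    let branching: Vec<Option<i64>> = double_with_iter(&a).iter().collect();
    let branch_free: Vec<Option<i64>> = double_with_unary(&a).iter().collect();
    assert_eq!(branching, branch_free);
}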