diff --git a/src/batch/src/executor/group_top_n.rs b/src/batch/src/executor/group_top_n.rs index b1f4791358131..7dda468066e93 100644 --- a/src/batch/src/executor/group_top_n.rs +++ b/src/batch/src/executor/group_top_n.rs @@ -20,6 +20,7 @@ use futures_async_stream::try_stream; use hashbrown::HashMap; use itertools::Itertools; use risingwave_common::array::DataChunk; +use risingwave_common::bitmap::FilterByBitmap; use risingwave_common::catalog::Schema; use risingwave_common::hash::{HashKey, HashKeyDispatcher, PrecomputedBuildHasher}; use risingwave_common::memory::{MemoryContext, MonitoredGlobalAlloc}; @@ -198,16 +199,12 @@ impl GroupTopNExecutor { let chunk = Arc::new(chunk?); let keys = K::build_many(self.group_key.as_slice(), &chunk); - for (row_id, ((encoded_row, key), visible)) in - encode_chunk(&chunk, &self.column_orders)? - .into_iter() - .zip_eq_fast(keys.into_iter()) - .zip_eq_fast(chunk.visibility().iter()) - .enumerate() + for (row_id, (encoded_row, key)) in encode_chunk(&chunk, &self.column_orders)? + .into_iter() + .zip_eq_fast(keys.into_iter()) + .enumerate() + .filter_by_bitmap(chunk.visibility()) { - if !visible { - continue; - } let heap = groups.entry(key).or_insert_with(|| { TopNHeap::new( self.limit, diff --git a/src/batch/src/executor/hash_agg.rs b/src/batch/src/executor/hash_agg.rs index bde5d36bd8c64..ea780d7bf83b3 100644 --- a/src/batch/src/executor/hash_agg.rs +++ b/src/batch/src/executor/hash_agg.rs @@ -21,7 +21,7 @@ use futures_async_stream::try_stream; use hashbrown::hash_map::Entry; use itertools::Itertools; use risingwave_common::array::{DataChunk, StreamChunk}; -use risingwave_common::bitmap::Bitmap; +use risingwave_common::bitmap::{Bitmap, FilterByBitmap}; use risingwave_common::catalog::{Field, Schema}; use risingwave_common::hash::{HashKey, HashKeyDispatcher, PrecomputedBuildHasher}; use risingwave_common::memory::MemoryContext; @@ -545,14 +545,11 @@ impl HashAggExecutor { let chunk = StreamChunk::from(chunk?); let keys = K::build_many(self.group_key_columns.as_slice(), &chunk); let mut memory_usage_diff = 0; - for (row_id, (key, visible)) in keys + for (row_id, key) in keys .into_iter() - .zip_eq_fast(chunk.visibility().iter()) .enumerate() + .filter_by_bitmap(chunk.visibility()) { - if !visible { - continue; - } let mut new_group = false; let states = match groups.entry(key) { Entry::Occupied(entry) => entry.into_mut(), diff --git a/src/batch/src/executor/join/hash_join.rs b/src/batch/src/executor/join/hash_join.rs index 834320b60d2b0..5ec115e0990a2 100644 --- a/src/batch/src/executor/join/hash_join.rs +++ b/src/batch/src/executor/join/hash_join.rs @@ -21,7 +21,7 @@ use bytes::Bytes; use futures_async_stream::try_stream; use itertools::Itertools; use risingwave_common::array::{Array, DataChunk, RowRef}; -use risingwave_common::bitmap::{Bitmap, BitmapBuilder}; +use risingwave_common::bitmap::{Bitmap, BitmapBuilder, FilterByBitmap}; use risingwave_common::catalog::Schema; use risingwave_common::hash::{HashKey, HashKeyDispatcher, PrecomputedBuildHasher}; use risingwave_common::memory::{MemoryContext, MonitoredGlobalAlloc}; @@ -514,15 +514,12 @@ impl HashJoinExecutor { for (build_chunk_id, build_chunk) in build_side.iter().enumerate() { let build_keys = K::build_many(&self.build_key_idxs, build_chunk); - for (build_row_id, (build_key, visible)) in build_keys + for (build_row_id, build_key) in build_keys .into_iter() - .zip_eq_fast(build_chunk.visibility().iter()) .enumerate() + .filter_by_bitmap(build_chunk.visibility()) { self.shutdown_rx.check()?; - if !visible { - continue; - } // Only insert key to hash map if it is consistent with the null safe restriction. if build_key.null_bitmap().is_subset(&null_matched) { let row_id = RowId::new(build_chunk_id, build_row_id); @@ -765,14 +762,11 @@ impl HashJoinExecutor { for probe_chunk in probe_side.execute() { let probe_chunk = probe_chunk?; let probe_keys = K::build_many(&probe_key_idxs, &probe_chunk); - for (probe_row_id, (probe_key, visible)) in probe_keys + for (probe_row_id, probe_key) in probe_keys .iter() - .zip_eq_fast(probe_chunk.visibility().iter()) .enumerate() + .filter_by_bitmap(probe_chunk.visibility()) { - if !visible { - continue; - } for build_row_id in next_build_row_with_same_key.row_id_iter(hash_map.get(probe_key).copied()) { @@ -828,14 +822,11 @@ impl HashJoinExecutor { for probe_chunk in probe_side.execute() { let probe_chunk = probe_chunk?; let probe_keys = K::build_many(&probe_key_idxs, &probe_chunk); - for (probe_row_id, (probe_key, visible)) in probe_keys + for (probe_row_id, probe_key) in probe_keys .iter() - .zip_eq_fast(probe_chunk.visibility().iter()) .enumerate() + .filter_by_bitmap(probe_chunk.visibility()) { - if !visible { - continue; - } if let Some(first_matched_build_row_id) = hash_map.get(probe_key) { for build_row_id in next_build_row_with_same_key.row_id_iter(Some(*first_matched_build_row_id)) @@ -898,14 +889,11 @@ impl HashJoinExecutor { for probe_chunk in probe_side.execute() { let probe_chunk = probe_chunk?; let probe_keys = K::build_many(&probe_key_idxs, &probe_chunk); - for (probe_row_id, (probe_key, visible)) in probe_keys + for (probe_row_id, probe_key) in probe_keys .iter() - .zip_eq_fast(probe_chunk.visibility().iter()) .enumerate() + .filter_by_bitmap(probe_chunk.visibility()) { - if !visible { - continue; - } if let Some(first_matched_build_row_id) = hash_map.get(probe_key) { non_equi_state .first_output_row_id @@ -979,14 +967,11 @@ impl HashJoinExecutor { for probe_chunk in probe_side.execute() { let probe_chunk = probe_chunk?; let probe_keys = K::build_many(&probe_key_idxs, &probe_chunk); - for (probe_row_id, (probe_key, visible)) in probe_keys + for (probe_row_id, probe_key) in probe_keys .iter() - .zip_eq_fast(probe_chunk.visibility().iter()) .enumerate() + .filter_by_bitmap(probe_chunk.visibility()) { - if !visible { - continue; - } shutdown_rx.check()?; if !ANTI_JOIN { if hash_map.contains_key(probe_key) { @@ -1043,14 +1028,11 @@ impl HashJoinExecutor { for probe_chunk in probe_side.execute() { let probe_chunk = probe_chunk?; let probe_keys = K::build_many(&probe_key_idxs, &probe_chunk); - for (probe_row_id, (probe_key, visible)) in probe_keys + for (probe_row_id, probe_key) in probe_keys .iter() - .zip_eq_fast(probe_chunk.visibility().iter()) .enumerate() + .filter_by_bitmap(probe_chunk.visibility()) { - if !visible { - continue; - } non_equi_state.found_matched = false; if let Some(first_matched_build_row_id) = hash_map.get(probe_key) { non_equi_state @@ -1119,14 +1101,11 @@ impl HashJoinExecutor { for probe_chunk in probe_side.execute() { let probe_chunk = probe_chunk?; let probe_keys = K::build_many(&probe_key_idxs, &probe_chunk); - for (probe_row_id, (probe_key, visible)) in probe_keys + for (probe_row_id, probe_key) in probe_keys .iter() - .zip_eq_fast(probe_chunk.visibility().iter()) .enumerate() + .filter_by_bitmap(probe_chunk.visibility()) { - if !visible { - continue; - } non_equi_state.found_matched = false; if let Some(first_matched_build_row_id) = hash_map.get(probe_key) { non_equi_state @@ -1201,14 +1180,11 @@ impl HashJoinExecutor { for probe_chunk in probe_side.execute() { let probe_chunk = probe_chunk?; let probe_keys = K::build_many(&probe_key_idxs, &probe_chunk); - for (probe_row_id, (probe_key, visible)) in probe_keys + for (probe_row_id, probe_key) in probe_keys .iter() - .zip_eq_fast(probe_chunk.visibility().iter()) .enumerate() + .filter_by_bitmap(probe_chunk.visibility()) { - if !visible { - continue; - } for build_row_id in next_build_row_with_same_key.row_id_iter(hash_map.get(probe_key).copied()) { @@ -1266,14 +1242,11 @@ impl HashJoinExecutor { for probe_chunk in probe_side.execute() { let probe_chunk = probe_chunk?; let probe_keys = K::build_many(&probe_key_idxs, &probe_chunk); - for (probe_row_id, (probe_key, visible)) in probe_keys + for (probe_row_id, probe_key) in probe_keys .iter() - .zip_eq_fast(probe_chunk.visibility().iter()) .enumerate() + .filter_by_bitmap(probe_chunk.visibility()) { - if !visible { - continue; - } for build_row_id in next_build_row_with_same_key.row_id_iter(hash_map.get(probe_key).copied()) { @@ -1338,13 +1311,7 @@ impl HashJoinExecutor { for probe_chunk in probe_side.execute() { let probe_chunk = probe_chunk?; let probe_keys = K::build_many(&probe_key_idxs, &probe_chunk); - for (probe_key, visible) in probe_keys - .iter() - .zip_eq_fast(probe_chunk.visibility().iter()) - { - if !visible { - continue; - } + for probe_key in probe_keys.iter().filter_by_bitmap(probe_chunk.visibility()) { for build_row_id in next_build_row_with_same_key.row_id_iter(hash_map.get(probe_key).copied()) { @@ -1392,14 +1359,11 @@ impl HashJoinExecutor { for probe_chunk in probe_side.execute() { let probe_chunk = probe_chunk?; let probe_keys = K::build_many(&probe_key_idxs, &probe_chunk); - for (probe_row_id, (probe_key, visible)) in probe_keys + for (probe_row_id, probe_key) in probe_keys .iter() - .zip_eq_fast(probe_chunk.visibility().iter()) .enumerate() + .filter_by_bitmap(probe_chunk.visibility()) { - if !visible { - continue; - } for build_row_id in next_build_row_with_same_key.row_id_iter(hash_map.get(probe_key).copied()) { @@ -1465,14 +1429,11 @@ impl HashJoinExecutor { for probe_chunk in probe_side.execute() { let probe_chunk = probe_chunk?; let probe_keys = K::build_many(&probe_key_idxs, &probe_chunk); - for (probe_row_id, (probe_key, visible)) in probe_keys + for (probe_row_id, probe_key) in probe_keys .iter() - .zip_eq_fast(probe_chunk.visibility().iter()) .enumerate() + .filter_by_bitmap(probe_chunk.visibility()) { - if !visible { - continue; - } if let Some(first_matched_build_row_id) = hash_map.get(probe_key) { for build_row_id in next_build_row_with_same_key.row_id_iter(Some(*first_matched_build_row_id)) @@ -1547,14 +1508,11 @@ impl HashJoinExecutor { for probe_chunk in probe_side.execute() { let probe_chunk = probe_chunk?; let probe_keys = K::build_many(&probe_key_idxs, &probe_chunk); - for (probe_row_id, (probe_key, visible)) in probe_keys + for (probe_row_id, probe_key) in probe_keys .iter() - .zip_eq_fast(probe_chunk.visibility().iter()) .enumerate() + .filter_by_bitmap(probe_chunk.visibility()) { - if !visible { - continue; - } left_non_equi_state.found_matched = false; if let Some(first_matched_build_row_id) = hash_map.get(probe_key) { left_non_equi_state diff --git a/src/batch/src/executor/join/lookup_join_base.rs b/src/batch/src/executor/join/lookup_join_base.rs index 39a39b9a1424d..743ae25cf4d6a 100644 --- a/src/batch/src/executor/join/lookup_join_base.rs +++ b/src/batch/src/executor/join/lookup_join_base.rs @@ -18,13 +18,13 @@ use futures::StreamExt; use futures_async_stream::try_stream; use itertools::Itertools; use risingwave_common::array::DataChunk; +use risingwave_common::bitmap::FilterByBitmap; use risingwave_common::catalog::Schema; use risingwave_common::hash::{HashKey, NullBitmap, PrecomputedBuildHasher}; use risingwave_common::memory::MemoryContext; use risingwave_common::row::Row; use risingwave_common::types::{DataType, ToOwnedDatum}; use risingwave_common::util::chunk_coalesce::DataChunkBuilder; -use risingwave_common::util::iter_util::ZipEqFast; use risingwave_common::util::sort_util::{cmp_datum_iter, OrderType}; use risingwave_common_estimate_size::EstimateSize; use risingwave_expr::expr::BoxedExpression; @@ -150,14 +150,11 @@ impl LookupJoinBase { for (build_chunk_id, build_chunk) in build_side.iter().enumerate() { let build_keys = K::build_many(&hash_join_build_side_key_idxs, build_chunk); - for (build_row_id, (build_key, visible)) in build_keys + for (build_row_id, build_key) in build_keys .into_iter() - .zip_eq_fast(build_chunk.visibility().iter()) .enumerate() + .filter_by_bitmap(build_chunk.visibility()) { - if !visible { - continue; - } // Only insert key to hash map if it is consistent with the null safe // restriction. if build_key.null_bitmap().is_subset(&null_matched) { diff --git a/src/common/src/bitmap.rs b/src/common/src/bitmap.rs index 22869b23bc1d8..995771970ece3 100644 --- a/src/common/src/bitmap.rs +++ b/src/common/src/bitmap.rs @@ -42,6 +42,7 @@ use std::ops::{BitAnd, BitAndAssign, BitOr, BitOrAssign, BitXor, Not, Range, Ran use risingwave_common_estimate_size::EstimateSize; use risingwave_pb::common::buffer::CompressionType; use risingwave_pb::common::PbBuffer; +use rw_iter_util::ZipEqFast; #[derive(Default, Debug, Clone, EstimateSize)] pub struct BitmapBuilder { @@ -826,6 +827,15 @@ impl iter::Iterator for BitmapOnesIter<'_> { } } +pub trait FilterByBitmap: ExactSizeIterator + Sized { + fn filter_by_bitmap(self, bitmap: &Bitmap) -> impl Iterator { + self.zip_eq_fast(bitmap.iter()) + .filter_map(|(item, bit)| bit.then_some(item)) + } +} + +impl FilterByBitmap for T where T: ExactSizeIterator {} + #[cfg(test)] mod tests { use super::*;