Skip to content

Commit

Permalink
refactor(common): simplify chunk iteration with filter_by_bitmap (#…
Browse files Browse the repository at this point in the history
…19254)

Signed-off-by: Richard Chien <[email protected]>
  • Loading branch information
stdrc authored Nov 4, 2024
1 parent bb0d786 commit b796d0d
Show file tree
Hide file tree
Showing 5 changed files with 48 additions and 89 deletions.
15 changes: 6 additions & 9 deletions src/batch/src/executor/group_top_n.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ use futures_async_stream::try_stream;
use hashbrown::HashMap;
use itertools::Itertools;
use risingwave_common::array::DataChunk;
use risingwave_common::bitmap::FilterByBitmap;
use risingwave_common::catalog::Schema;
use risingwave_common::hash::{HashKey, HashKeyDispatcher, PrecomputedBuildHasher};
use risingwave_common::memory::{MemoryContext, MonitoredGlobalAlloc};
Expand Down Expand Up @@ -198,16 +199,12 @@ impl<K: HashKey> GroupTopNExecutor<K> {
let chunk = Arc::new(chunk?);
let keys = K::build_many(self.group_key.as_slice(), &chunk);

for (row_id, ((encoded_row, key), visible)) in
encode_chunk(&chunk, &self.column_orders)?
.into_iter()
.zip_eq_fast(keys.into_iter())
.zip_eq_fast(chunk.visibility().iter())
.enumerate()
for (row_id, (encoded_row, key)) in encode_chunk(&chunk, &self.column_orders)?
.into_iter()
.zip_eq_fast(keys.into_iter())
.enumerate()
.filter_by_bitmap(chunk.visibility())
{
if !visible {
continue;
}
let heap = groups.entry(key).or_insert_with(|| {
TopNHeap::new(
self.limit,
Expand Down
9 changes: 3 additions & 6 deletions src/batch/src/executor/hash_agg.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ use futures_async_stream::try_stream;
use hashbrown::hash_map::Entry;
use itertools::Itertools;
use risingwave_common::array::{DataChunk, StreamChunk};
use risingwave_common::bitmap::Bitmap;
use risingwave_common::bitmap::{Bitmap, FilterByBitmap};
use risingwave_common::catalog::{Field, Schema};
use risingwave_common::hash::{HashKey, HashKeyDispatcher, PrecomputedBuildHasher};
use risingwave_common::memory::MemoryContext;
Expand Down Expand Up @@ -545,14 +545,11 @@ impl<K: HashKey + Send + Sync> HashAggExecutor<K> {
let chunk = StreamChunk::from(chunk?);
let keys = K::build_many(self.group_key_columns.as_slice(), &chunk);
let mut memory_usage_diff = 0;
for (row_id, (key, visible)) in keys
for (row_id, key) in keys
.into_iter()
.zip_eq_fast(chunk.visibility().iter())
.enumerate()
.filter_by_bitmap(chunk.visibility())
{
if !visible {
continue;
}
let mut new_group = false;
let states = match groups.entry(key) {
Entry::Occupied(entry) => entry.into_mut(),
Expand Down
94 changes: 26 additions & 68 deletions src/batch/src/executor/join/hash_join.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ use bytes::Bytes;
use futures_async_stream::try_stream;
use itertools::Itertools;
use risingwave_common::array::{Array, DataChunk, RowRef};
use risingwave_common::bitmap::{Bitmap, BitmapBuilder};
use risingwave_common::bitmap::{Bitmap, BitmapBuilder, FilterByBitmap};
use risingwave_common::catalog::Schema;
use risingwave_common::hash::{HashKey, HashKeyDispatcher, PrecomputedBuildHasher};
use risingwave_common::memory::{MemoryContext, MonitoredGlobalAlloc};
Expand Down Expand Up @@ -514,15 +514,12 @@ impl<K: HashKey> HashJoinExecutor<K> {
for (build_chunk_id, build_chunk) in build_side.iter().enumerate() {
let build_keys = K::build_many(&self.build_key_idxs, build_chunk);

for (build_row_id, (build_key, visible)) in build_keys
for (build_row_id, build_key) in build_keys
.into_iter()
.zip_eq_fast(build_chunk.visibility().iter())
.enumerate()
.filter_by_bitmap(build_chunk.visibility())
{
self.shutdown_rx.check()?;
if !visible {
continue;
}
// Only insert key to hash map if it is consistent with the null safe restriction.
if build_key.null_bitmap().is_subset(&null_matched) {
let row_id = RowId::new(build_chunk_id, build_row_id);
Expand Down Expand Up @@ -765,14 +762,11 @@ impl<K: HashKey> HashJoinExecutor<K> {
for probe_chunk in probe_side.execute() {
let probe_chunk = probe_chunk?;
let probe_keys = K::build_many(&probe_key_idxs, &probe_chunk);
for (probe_row_id, (probe_key, visible)) in probe_keys
for (probe_row_id, probe_key) in probe_keys
.iter()
.zip_eq_fast(probe_chunk.visibility().iter())
.enumerate()
.filter_by_bitmap(probe_chunk.visibility())
{
if !visible {
continue;
}
for build_row_id in
next_build_row_with_same_key.row_id_iter(hash_map.get(probe_key).copied())
{
Expand Down Expand Up @@ -828,14 +822,11 @@ impl<K: HashKey> HashJoinExecutor<K> {
for probe_chunk in probe_side.execute() {
let probe_chunk = probe_chunk?;
let probe_keys = K::build_many(&probe_key_idxs, &probe_chunk);
for (probe_row_id, (probe_key, visible)) in probe_keys
for (probe_row_id, probe_key) in probe_keys
.iter()
.zip_eq_fast(probe_chunk.visibility().iter())
.enumerate()
.filter_by_bitmap(probe_chunk.visibility())
{
if !visible {
continue;
}
if let Some(first_matched_build_row_id) = hash_map.get(probe_key) {
for build_row_id in
next_build_row_with_same_key.row_id_iter(Some(*first_matched_build_row_id))
Expand Down Expand Up @@ -898,14 +889,11 @@ impl<K: HashKey> HashJoinExecutor<K> {
for probe_chunk in probe_side.execute() {
let probe_chunk = probe_chunk?;
let probe_keys = K::build_many(&probe_key_idxs, &probe_chunk);
for (probe_row_id, (probe_key, visible)) in probe_keys
for (probe_row_id, probe_key) in probe_keys
.iter()
.zip_eq_fast(probe_chunk.visibility().iter())
.enumerate()
.filter_by_bitmap(probe_chunk.visibility())
{
if !visible {
continue;
}
if let Some(first_matched_build_row_id) = hash_map.get(probe_key) {
non_equi_state
.first_output_row_id
Expand Down Expand Up @@ -979,14 +967,11 @@ impl<K: HashKey> HashJoinExecutor<K> {
for probe_chunk in probe_side.execute() {
let probe_chunk = probe_chunk?;
let probe_keys = K::build_many(&probe_key_idxs, &probe_chunk);
for (probe_row_id, (probe_key, visible)) in probe_keys
for (probe_row_id, probe_key) in probe_keys
.iter()
.zip_eq_fast(probe_chunk.visibility().iter())
.enumerate()
.filter_by_bitmap(probe_chunk.visibility())
{
if !visible {
continue;
}
shutdown_rx.check()?;
if !ANTI_JOIN {
if hash_map.contains_key(probe_key) {
Expand Down Expand Up @@ -1043,14 +1028,11 @@ impl<K: HashKey> HashJoinExecutor<K> {
for probe_chunk in probe_side.execute() {
let probe_chunk = probe_chunk?;
let probe_keys = K::build_many(&probe_key_idxs, &probe_chunk);
for (probe_row_id, (probe_key, visible)) in probe_keys
for (probe_row_id, probe_key) in probe_keys
.iter()
.zip_eq_fast(probe_chunk.visibility().iter())
.enumerate()
.filter_by_bitmap(probe_chunk.visibility())
{
if !visible {
continue;
}
non_equi_state.found_matched = false;
if let Some(first_matched_build_row_id) = hash_map.get(probe_key) {
non_equi_state
Expand Down Expand Up @@ -1119,14 +1101,11 @@ impl<K: HashKey> HashJoinExecutor<K> {
for probe_chunk in probe_side.execute() {
let probe_chunk = probe_chunk?;
let probe_keys = K::build_many(&probe_key_idxs, &probe_chunk);
for (probe_row_id, (probe_key, visible)) in probe_keys
for (probe_row_id, probe_key) in probe_keys
.iter()
.zip_eq_fast(probe_chunk.visibility().iter())
.enumerate()
.filter_by_bitmap(probe_chunk.visibility())
{
if !visible {
continue;
}
non_equi_state.found_matched = false;
if let Some(first_matched_build_row_id) = hash_map.get(probe_key) {
non_equi_state
Expand Down Expand Up @@ -1201,14 +1180,11 @@ impl<K: HashKey> HashJoinExecutor<K> {
for probe_chunk in probe_side.execute() {
let probe_chunk = probe_chunk?;
let probe_keys = K::build_many(&probe_key_idxs, &probe_chunk);
for (probe_row_id, (probe_key, visible)) in probe_keys
for (probe_row_id, probe_key) in probe_keys
.iter()
.zip_eq_fast(probe_chunk.visibility().iter())
.enumerate()
.filter_by_bitmap(probe_chunk.visibility())
{
if !visible {
continue;
}
for build_row_id in
next_build_row_with_same_key.row_id_iter(hash_map.get(probe_key).copied())
{
Expand Down Expand Up @@ -1266,14 +1242,11 @@ impl<K: HashKey> HashJoinExecutor<K> {
for probe_chunk in probe_side.execute() {
let probe_chunk = probe_chunk?;
let probe_keys = K::build_many(&probe_key_idxs, &probe_chunk);
for (probe_row_id, (probe_key, visible)) in probe_keys
for (probe_row_id, probe_key) in probe_keys
.iter()
.zip_eq_fast(probe_chunk.visibility().iter())
.enumerate()
.filter_by_bitmap(probe_chunk.visibility())
{
if !visible {
continue;
}
for build_row_id in
next_build_row_with_same_key.row_id_iter(hash_map.get(probe_key).copied())
{
Expand Down Expand Up @@ -1338,13 +1311,7 @@ impl<K: HashKey> HashJoinExecutor<K> {
for probe_chunk in probe_side.execute() {
let probe_chunk = probe_chunk?;
let probe_keys = K::build_many(&probe_key_idxs, &probe_chunk);
for (probe_key, visible) in probe_keys
.iter()
.zip_eq_fast(probe_chunk.visibility().iter())
{
if !visible {
continue;
}
for probe_key in probe_keys.iter().filter_by_bitmap(probe_chunk.visibility()) {
for build_row_id in
next_build_row_with_same_key.row_id_iter(hash_map.get(probe_key).copied())
{
Expand Down Expand Up @@ -1392,14 +1359,11 @@ impl<K: HashKey> HashJoinExecutor<K> {
for probe_chunk in probe_side.execute() {
let probe_chunk = probe_chunk?;
let probe_keys = K::build_many(&probe_key_idxs, &probe_chunk);
for (probe_row_id, (probe_key, visible)) in probe_keys
for (probe_row_id, probe_key) in probe_keys
.iter()
.zip_eq_fast(probe_chunk.visibility().iter())
.enumerate()
.filter_by_bitmap(probe_chunk.visibility())
{
if !visible {
continue;
}
for build_row_id in
next_build_row_with_same_key.row_id_iter(hash_map.get(probe_key).copied())
{
Expand Down Expand Up @@ -1465,14 +1429,11 @@ impl<K: HashKey> HashJoinExecutor<K> {
for probe_chunk in probe_side.execute() {
let probe_chunk = probe_chunk?;
let probe_keys = K::build_many(&probe_key_idxs, &probe_chunk);
for (probe_row_id, (probe_key, visible)) in probe_keys
for (probe_row_id, probe_key) in probe_keys
.iter()
.zip_eq_fast(probe_chunk.visibility().iter())
.enumerate()
.filter_by_bitmap(probe_chunk.visibility())
{
if !visible {
continue;
}
if let Some(first_matched_build_row_id) = hash_map.get(probe_key) {
for build_row_id in
next_build_row_with_same_key.row_id_iter(Some(*first_matched_build_row_id))
Expand Down Expand Up @@ -1547,14 +1508,11 @@ impl<K: HashKey> HashJoinExecutor<K> {
for probe_chunk in probe_side.execute() {
let probe_chunk = probe_chunk?;
let probe_keys = K::build_many(&probe_key_idxs, &probe_chunk);
for (probe_row_id, (probe_key, visible)) in probe_keys
for (probe_row_id, probe_key) in probe_keys
.iter()
.zip_eq_fast(probe_chunk.visibility().iter())
.enumerate()
.filter_by_bitmap(probe_chunk.visibility())
{
if !visible {
continue;
}
left_non_equi_state.found_matched = false;
if let Some(first_matched_build_row_id) = hash_map.get(probe_key) {
left_non_equi_state
Expand Down
9 changes: 3 additions & 6 deletions src/batch/src/executor/join/lookup_join_base.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,13 +18,13 @@ use futures::StreamExt;
use futures_async_stream::try_stream;
use itertools::Itertools;
use risingwave_common::array::DataChunk;
use risingwave_common::bitmap::FilterByBitmap;
use risingwave_common::catalog::Schema;
use risingwave_common::hash::{HashKey, NullBitmap, PrecomputedBuildHasher};
use risingwave_common::memory::MemoryContext;
use risingwave_common::row::Row;
use risingwave_common::types::{DataType, ToOwnedDatum};
use risingwave_common::util::chunk_coalesce::DataChunkBuilder;
use risingwave_common::util::iter_util::ZipEqFast;
use risingwave_common::util::sort_util::{cmp_datum_iter, OrderType};
use risingwave_common_estimate_size::EstimateSize;
use risingwave_expr::expr::BoxedExpression;
Expand Down Expand Up @@ -150,14 +150,11 @@ impl<K: HashKey> LookupJoinBase<K> {
for (build_chunk_id, build_chunk) in build_side.iter().enumerate() {
let build_keys = K::build_many(&hash_join_build_side_key_idxs, build_chunk);

for (build_row_id, (build_key, visible)) in build_keys
for (build_row_id, build_key) in build_keys
.into_iter()
.zip_eq_fast(build_chunk.visibility().iter())
.enumerate()
.filter_by_bitmap(build_chunk.visibility())
{
if !visible {
continue;
}
// Only insert key to hash map if it is consistent with the null safe
// restriction.
if build_key.null_bitmap().is_subset(&null_matched) {
Expand Down
10 changes: 10 additions & 0 deletions src/common/src/bitmap.rs
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ use std::ops::{BitAnd, BitAndAssign, BitOr, BitOrAssign, BitXor, Not, Range, Ran
use risingwave_common_estimate_size::EstimateSize;
use risingwave_pb::common::buffer::CompressionType;
use risingwave_pb::common::PbBuffer;
use rw_iter_util::ZipEqFast;

#[derive(Default, Debug, Clone, EstimateSize)]
pub struct BitmapBuilder {
Expand Down Expand Up @@ -826,6 +827,15 @@ impl iter::Iterator for BitmapOnesIter<'_> {
}
}

/// Extension trait that filters an exact-size iterator by a [`Bitmap`].
///
/// Each item is paired positionally with one bit of the bitmap; only items
/// whose bit is set are yielded.
pub trait FilterByBitmap: ExactSizeIterator + Sized {
    /// Returns an iterator over the items of `self` whose corresponding bit in
    /// `bitmap` is `true`, in their original order.
    ///
    /// The pairing is done with `zip_eq_fast`, so `self` and `bitmap` are
    /// expected to have the same length.
    // NOTE(review): presumably a length mismatch panics inside `zip_eq_fast`;
    // confirm against `rw_iter_util`.
    fn filter_by_bitmap(self, bitmap: &Bitmap) -> impl Iterator<Item = Self::Item> {
        let paired = self.zip_eq_fast(bitmap.iter());
        paired
            .filter(|(_, selected)| *selected)
            .map(|(item, _)| item)
    }
}

/// Blanket implementation: every [`ExactSizeIterator`] gets
/// `filter_by_bitmap` from the trait's default method.
impl<I: ExactSizeIterator> FilterByBitmap for I {}

#[cfg(test)]
mod tests {
use super::*;
Expand Down

0 comments on commit b796d0d

Please sign in to comment.