diff --git a/src/batch/src/executor/aggregation/distinct.rs b/src/batch/src/executor/aggregation/distinct.rs index 6723cd9b926ac..9ce4b2dc66112 100644 --- a/src/batch/src/executor/aggregation/distinct.rs +++ b/src/batch/src/executor/aggregation/distinct.rs @@ -83,7 +83,7 @@ impl AggregateFunction for Distinct { let state = state.downcast_mut::(); let mut bitmap_builder = BitmapBuilder::with_capacity(input.capacity()); - bitmap_builder.append_bitmap(&input.data_chunk().vis().to_bitmap()); + bitmap_builder.append_bitmap(input.data_chunk().visibility()); for row_id in range.clone() { let (row_ref, vis) = input.data_chunk().row_at(row_id); let row = row_ref.to_owned_row(); @@ -94,7 +94,7 @@ impl AggregateFunction for Distinct { } bitmap_builder.set(row_id, b); } - let input = input.with_visibility(bitmap_builder.finish().into()); + let input = input.clone_with_vis(bitmap_builder.finish()); self.inner .update_range(&mut state.inner, &input, range) .await diff --git a/src/batch/src/executor/aggregation/filter.rs b/src/batch/src/executor/aggregation/filter.rs index 9b85c2fbdddee..06e8d894d5278 100644 --- a/src/batch/src/executor/aggregation/filter.rs +++ b/src/batch/src/executor/aggregation/filter.rs @@ -62,7 +62,7 @@ impl AggregateFunction for Filter { .as_bool() .to_bitmap(); let mut input1 = input.clone(); - input1.set_vis(input.vis() & &bitmap); + input1.set_visibility(input.visibility() & &bitmap); self.inner.update_range(state, &input1, range).await } diff --git a/src/batch/src/executor/hop_window.rs b/src/batch/src/executor/hop_window.rs index 06e257456dd5d..85e485db00376 100644 --- a/src/batch/src/executor/hop_window.rs +++ b/src/batch/src/executor/hop_window.rs @@ -16,7 +16,7 @@ use std::num::NonZeroUsize; use futures_async_stream::try_stream; use itertools::Itertools; -use risingwave_common::array::{DataChunk, Vis}; +use risingwave_common::array::DataChunk; use risingwave_common::catalog::{Field, Schema}; use risingwave_common::error::{Result, RwError}; use risingwave_common::types::{DataType, Interval}; @@ -173,7 +173,7 @@ impl HopWindowExecutor { #[for_await] for data_chunk in child.execute() { let data_chunk = data_chunk?; - assert!(matches!(data_chunk.vis(), Vis::Compact(_))); + assert!(data_chunk.is_compacted()); let len = data_chunk.cardinality(); for i in 0..units { let window_start_col = if output_indices.contains(&window_start_col_index) { diff --git a/src/batch/src/executor/insert.rs b/src/batch/src/executor/insert.rs index e0f4816e9a4bd..28e8dda8a5979 100644 --- a/src/batch/src/executor/insert.rs +++ b/src/batch/src/executor/insert.rs @@ -157,8 +157,7 @@ impl InsertExecutor { columns.insert(row_id_index, Arc::new(row_id_col.into())) } - let stream_chunk = - StreamChunk::new(vec![Op::Insert; cap], columns, vis.into_visibility()); + let stream_chunk = StreamChunk::with_visibility(vec![Op::Insert; cap], columns, vis); #[cfg(debug_assertions)] table_dml_handle.check_chunk_schema(&stream_chunk); diff --git a/src/batch/src/executor/join/hash_join.rs b/src/batch/src/executor/join/hash_join.rs index a1badeadf3873..633f9243f0eeb 100644 --- a/src/batch/src/executor/join/hash_join.rs +++ b/src/batch/src/executor/join/hash_join.rs @@ -1905,8 +1905,8 @@ mod tests { } fn is_data_chunk_eq(left: &DataChunk, right: &DataChunk) -> bool { - assert!(left.visibility().is_none()); - assert!(right.visibility().is_none()); + assert!(left.is_compacted()); + assert!(right.is_compacted()); if left.cardinality() != right.cardinality() { return false; diff --git a/src/batch/src/executor/join/mod.rs b/src/batch/src/executor/join/mod.rs index 996e45e26f031..320c4a27a037d 100644 --- a/src/batch/src/executor/join/mod.rs +++ b/src/batch/src/executor/join/mod.rs @@ -26,7 +26,7 @@ use itertools::Itertools; pub use local_lookup_join::*; pub use lookup_join_base::*; pub use nested_loop_join::*; -use risingwave_common::array::{DataChunk, RowRef, Vis}; +use risingwave_common::array::{DataChunk, RowRef}; use risingwave_common::error::Result; use risingwave_common::row::Row; use risingwave_common::types::{DataType, DatumRef}; @@ -124,10 +124,10 @@ fn concatenate(left: &DataChunk, right: &DataChunk) -> Result { concated_columns.extend_from_slice(left.columns()); concated_columns.extend_from_slice(right.columns()); // Only handle one side is constant row chunk: One of visibility must be None. - let vis = match (left.vis(), right.vis()) { - (Vis::Compact(_), _) => right.vis().clone(), - (_, Vis::Compact(_)) => left.vis().clone(), - (Vis::Bitmap(_), Vis::Bitmap(_)) => { + let vis = match (left.is_compacted(), right.is_compacted()) { + (true, _) => right.visibility().clone(), + (_, true) => left.visibility().clone(), + (false, false) => { return Err(BatchError::UnsupportedFunction( "The concatenate behaviour of two chunk with visibility is undefined".to_string(), ) @@ -176,7 +176,8 @@ fn convert_row_to_chunk( #[cfg(test)] mod tests { - use risingwave_common::array::{Array, ArrayBuilder, DataChunk, PrimitiveArrayBuilder, Vis}; + use risingwave_common::array::{Array, ArrayBuilder, DataChunk, PrimitiveArrayBuilder}; + use risingwave_common::buffer::Bitmap; use risingwave_common::catalog::{Field, Schema}; use risingwave_common::row::Row; use risingwave_common::types::{DataType, ScalarRefImpl}; @@ -196,20 +197,14 @@ mod tests { let arr = builder.finish(); columns.push(arr.into_ref()) } - let chunk1: DataChunk = DataChunk::new(columns.clone(), length); - let bool_vec = vec![true, false, true, false, false]; - let chunk2: DataChunk = DataChunk::new( - columns.clone(), - Vis::Bitmap((bool_vec.clone()).into_iter().collect()), - ); + let chunk1 = DataChunk::new(columns.clone(), length); + let visibility = Bitmap::from_bool_slice(&[true, false, true, false, false]); + let chunk2 = DataChunk::new(columns.clone(), visibility.clone()); let chunk = concatenate(&chunk1, &chunk2).unwrap(); assert_eq!(chunk.capacity(), chunk1.capacity()); assert_eq!(chunk.capacity(), chunk2.capacity()); assert_eq!(chunk.columns().len(), chunk1.columns().len() * 2); - assert_eq!( - chunk.visibility().cloned().unwrap(), - (bool_vec).into_iter().collect() - ); + assert_eq!(chunk.visibility(), &visibility); } /// Test the function of convert row into constant row chunk (one row repeat multiple times). diff --git a/src/batch/src/executor/join/nested_loop_join.rs b/src/batch/src/executor/join/nested_loop_join.rs index 21409e568ce6d..a52faf35dc724 100644 --- a/src/batch/src/executor/join/nested_loop_join.rs +++ b/src/batch/src/executor/join/nested_loop_join.rs @@ -389,7 +389,7 @@ impl NestedLoopJoinExecutor { .await?; if chunk.cardinality() > 0 { // chunk.visibility() must be Some(_) - matched = &matched | chunk.visibility().unwrap(); + matched = &matched | chunk.visibility(); for spilled in chunk_builder.append_chunk(chunk) { yield spilled } @@ -433,7 +433,7 @@ impl NestedLoopJoinExecutor { .await?; if chunk.cardinality() > 0 { // chunk.visibility() must be Some(_) - matched = &matched | chunk.visibility().unwrap(); + matched = &matched | chunk.visibility(); } } if ANTI_JOIN { @@ -475,7 +475,7 @@ impl NestedLoopJoinExecutor { .await?; if chunk.cardinality() > 0 { left_matched.set(left_row_idx, true); - right_matched = &right_matched | chunk.visibility().unwrap(); + right_matched = &right_matched | chunk.visibility(); for spilled in chunk_builder.append_chunk(chunk) { yield spilled } diff --git a/src/batch/src/executor/limit.rs b/src/batch/src/executor/limit.rs index 970b9a2815828..7828974d64c03 100644 --- a/src/batch/src/executor/limit.rs +++ b/src/batch/src/executor/limit.rs @@ -91,8 +91,8 @@ impl LimitExecutor { } // process chunk let mut new_vis; - if let Some(old_vis) = data_chunk.visibility() { - new_vis = old_vis.iter().collect_vec(); + if !data_chunk.is_compacted() { + new_vis = data_chunk.visibility().iter().collect_vec(); for vis in new_vis.iter_mut().filter(|x| **x) { if skipped < self.offset { skipped += 1; diff --git a/src/batch/src/executor/source.rs b/src/batch/src/executor/source.rs index 78733420c9158..71cac6e917dc3 100644 --- a/src/batch/src/executor/source.rs +++ b/src/batch/src/executor/source.rs @@ -174,7 +174,7 @@ impl SourceExecutor { fn covert_stream_chunk_to_batch_chunk(chunk: StreamChunk) -> Result { // chunk read from source must be compact - assert!(chunk.data_chunk().visibility().is_none()); + assert!(chunk.data_chunk().is_compacted()); if chunk.ops().iter().any(|op| *op != Op::Insert) { return Err(RwError::from(BatchError::Internal(anyhow!( diff --git a/src/batch/src/executor/test_utils.rs b/src/batch/src/executor/test_utils.rs index 0b7e684348338..153184b9bdffc 100644 --- a/src/batch/src/executor/test_utils.rs +++ b/src/batch/src/executor/test_utils.rs @@ -219,8 +219,8 @@ pub async fn diff_executor_output(actual: BoxedExecutor, expect: BoxedExecutor) } fn is_data_chunk_eq(left: &DataChunk, right: &DataChunk) { - assert!(left.visibility().is_none()); - assert!(right.visibility().is_none()); + assert!(left.is_compacted()); + assert!(right.is_compacted()); assert_eq!( left.cardinality(), diff --git a/src/batch/src/executor/update.rs b/src/batch/src/executor/update.rs index cf71a0f97ab46..e3d2d9f03bf3a 100644 --- a/src/batch/src/executor/update.rs +++ b/src/batch/src/executor/update.rs @@ -169,7 +169,7 @@ impl UpdateExecutor { columns.push(column); } - DataChunk::new(columns, data_chunk.vis().clone()) + DataChunk::new(columns, data_chunk.visibility().clone()) }; if self.returning { diff --git a/src/batch/src/task/consistent_hash_shuffle_channel.rs b/src/batch/src/task/consistent_hash_shuffle_channel.rs index 336b8c98c2879..a9f3bc1b078ac 100644 --- a/src/batch/src/task/consistent_hash_shuffle_channel.rs +++ b/src/batch/src/task/consistent_hash_shuffle_channel.rs @@ -13,7 +13,6 @@ // limitations under the License. use std::fmt::{Debug, Formatter}; -use std::ops::BitAnd; use std::option::Option; use std::sync::Arc; @@ -89,12 +88,7 @@ fn generate_new_data_chunks( }); let mut res = Vec::with_capacity(output_count); for (sink_id, vis_map_vec) in vis_maps.into_iter().enumerate() { - let vis_map: Bitmap = vis_map_vec.into_iter().collect(); - let vis_map = if let Some(visibility) = chunk.visibility() { - vis_map.bitand(visibility) - } else { - vis_map - }; + let vis_map = Bitmap::from_bool_slice(&vis_map_vec) & chunk.visibility(); let new_data_chunk = chunk.with_visibility(vis_map); trace!( "send to sink:{}, cardinality:{}", diff --git a/src/batch/src/task/hash_shuffle_channel.rs b/src/batch/src/task/hash_shuffle_channel.rs index b07d5091249cd..8ea78a838f18a 100644 --- a/src/batch/src/task/hash_shuffle_channel.rs +++ b/src/batch/src/task/hash_shuffle_channel.rs @@ -13,7 +13,6 @@ // limitations under the License. use std::fmt::{Debug, Formatter}; -use std::ops::BitAnd; use std::option::Option; use std::sync::Arc; @@ -86,12 +85,7 @@ fn generate_new_data_chunks( }); let mut res = Vec::with_capacity(output_count); for (sink_id, vis_map_vec) in vis_maps.into_iter().enumerate() { - let vis_map: Bitmap = vis_map_vec.into_iter().collect(); - let vis_map = if let Some(visibility) = chunk.visibility() { - vis_map.bitand(visibility) - } else { - vis_map - }; + let vis_map = Bitmap::from_bool_slice(&vis_map_vec) & chunk.visibility(); let new_data_chunk = chunk.with_visibility(vis_map); trace!( "send to sink:{}, cardinality:{}", diff --git a/src/common/src/array/compact_chunk.rs b/src/common/src/array/compact_chunk.rs index d64aa47c6769c..c009621af2135 100644 --- a/src/common/src/array/compact_chunk.rs +++ b/src/common/src/array/compact_chunk.rs @@ -25,7 +25,7 @@ use crate::array::{Op, RowRef, StreamChunk}; use crate::row::{Project, RowExt}; use crate::util::hash_util::Crc32FastBuilder; -/// Compact the stream chunks with just modify the `Ops` and `Vis` of the chunk. Currently, two +/// Compact the stream chunks with just modify the `Ops` and visibility of the chunk. Currently, two /// transformation will be applied /// - remove intermediate operation of the same key. The operations of the same stream key will only /// have three kind of patterns Insert, Delete or Update. diff --git a/src/common/src/array/data_chunk.rs b/src/common/src/array/data_chunk.rs index 657dfd3c366f9..58844ca2e4371 100644 --- a/src/common/src/array/data_chunk.rs +++ b/src/common/src/array/data_chunk.rs @@ -25,7 +25,7 @@ use rand::rngs::SmallRng; use rand::{Rng, SeedableRng}; use risingwave_pb::data::PbDataChunk; -use super::{Array, ArrayImpl, ArrayRef, ArrayResult, StructArray, Vis}; +use super::{Array, ArrayImpl, ArrayRef, ArrayResult, StructArray}; use crate::array::data_chunk_iter::RowRef; use crate::array::ArrayBuilderImpl; use crate::buffer::{Bitmap, BitmapBuilder}; @@ -64,24 +64,25 @@ use crate::util::value_encoding::{ #[must_use] pub struct DataChunk { columns: Arc<[ArrayRef]>, - vis2: Vis, + visibility: Bitmap, } impl DataChunk { pub(crate) const PRETTY_TABLE_PRESET: &'static str = "||--+-++| ++++++"; - /// Create a `DataChunk` with `columns` and visibility. The visibility can either be a `Bitmap` - /// or a simple cardinality number. - pub fn new>(columns: Vec, vis: V) -> Self { - let vis: Vis = vis.into(); - let capacity = vis.len(); + /// Create a `DataChunk` with `columns` and visibility. + /// + /// The visibility can either be a `Bitmap` or a simple cardinality number. + pub fn new(columns: Vec, visibility: impl Into) -> Self { + let visibility = visibility.into(); + let capacity = visibility.len(); for column in &columns { assert_eq!(capacity, column.len()); } DataChunk { columns: columns.into(), - vis2: vis, + visibility, } } @@ -89,7 +90,7 @@ impl DataChunk { pub fn new_dummy(cardinality: usize) -> Self { DataChunk { columns: Arc::new([]), - vis2: Vis::Compact(cardinality), + visibility: Bitmap::ones(cardinality), } } @@ -115,28 +116,22 @@ impl DataChunk { /// Return the next visible row index on or after `row_idx`. pub fn next_visible_row_idx(&self, row_idx: usize) -> Option { - match &self.vis2 { - Vis::Bitmap(vis) => vis.next_set_bit(row_idx), - Vis::Compact(cardinality) => { - if row_idx < *cardinality { - Some(row_idx) - } else { - None - } - } - } + self.visibility.next_set_bit(row_idx) } - pub fn into_parts(self) -> (Vec, Vis) { - (self.columns.to_vec(), self.vis2) + pub fn into_parts(self) -> (Vec, Bitmap) { + (self.columns.to_vec(), self.visibility) } - pub fn into_parts_v2(self) -> (Arc<[ArrayRef]>, Vis) { - (self.columns, self.vis2) + pub fn into_parts_v2(self) -> (Arc<[ArrayRef]>, Bitmap) { + (self.columns, self.visibility) } - pub fn from_parts(columns: Arc<[ArrayRef]>, vis2: Vis) -> Self { - Self { columns, vis2 } + pub fn from_parts(columns: Arc<[ArrayRef]>, visibilities: Bitmap) -> Self { + Self { + columns, + visibility: visibilities, + } } pub fn dimension(&self) -> usize { @@ -145,53 +140,42 @@ impl DataChunk { /// `cardinality` returns the number of visible tuples pub fn cardinality(&self) -> usize { - match &self.vis2 { - Vis::Bitmap(b) => b.count_ones(), - Vis::Compact(len) => *len, - } + self.visibility.count_ones() } /// `capacity` returns physical length of any chunk column pub fn capacity(&self) -> usize { - self.vis2.len() - } - - pub fn vis(&self) -> &Vis { - &self.vis2 + self.visibility.len() } pub fn selectivity(&self) -> f64 { - match &self.vis2 { - Vis::Bitmap(b) => { - if b.is_empty() { - 0.0 - } else { - b.count_ones() as f64 / b.len() as f64 - } - } - Vis::Compact(_) => 1.0, + if self.visibility.is_empty() { + 0.0 + } else if self.visibility.all() { + 1.0 + } else { + self.visibility.count_ones() as f64 / self.visibility.len() as f64 } } - pub fn with_visibility(&self, visibility: impl Into) -> Self { + pub fn with_visibility(&self, visibility: impl Into) -> Self { DataChunk { columns: self.columns.clone(), - vis2: visibility.into(), + visibility: visibility.into(), } } - pub fn visibility(&self) -> Option<&Bitmap> { - self.vis2.as_visibility() - } - - pub fn set_vis(&mut self, vis: Vis) { - assert_eq!(vis.len(), self.capacity()); - self.vis2 = vis; + pub fn visibility(&self) -> &Bitmap { + &self.visibility } pub fn set_visibility(&mut self, visibility: Bitmap) { assert_eq!(visibility.len(), self.capacity()); - self.vis2 = Vis::Bitmap(visibility); + self.visibility = visibility; + } + + pub fn is_compacted(&self) -> bool { + self.visibility.all() } pub fn column_at(&self, idx: usize) -> &ArrayRef { @@ -214,16 +198,13 @@ impl DataChunk { /// Panics if `idx > columns.len()`. pub fn split_column_at(&self, idx: usize) -> (Self, Self) { let (left, right) = self.columns.split_at(idx); - let left = DataChunk::new(left.to_vec(), self.vis2.clone()); - let right = DataChunk::new(right.to_vec(), self.vis2.clone()); + let left = DataChunk::new(left.to_vec(), self.visibility.clone()); + let right = DataChunk::new(right.to_vec(), self.visibility.clone()); (left, right) } pub fn to_protobuf(&self) -> PbDataChunk { - assert!( - matches!(self.vis2, Vis::Compact(_)), - "must be compacted before transfer" - ); + assert!(self.visibility.all(), "must be compacted before transfer"); let mut proto = PbDataChunk { cardinality: self.cardinality() as u32, columns: Default::default(), @@ -249,42 +230,38 @@ impl DataChunk { /// The main benefit is that the data chunk is smaller, taking up less memory. /// We can also save the cost of iterating over many hidden rows. pub fn compact(self) -> Self { - match &self.vis2 { - Vis::Compact(_) => self, - Vis::Bitmap(visibility) => { - let cardinality = visibility.count_ones(); - let columns = self - .columns - .iter() - .map(|col| { - let array = col; - array.compact(visibility, cardinality).into() - }) - .collect::>(); - Self::new(columns, cardinality) - } + if self.visibility.all() { + return self; } + let cardinality = self.visibility.count_ones(); + let columns = self + .columns + .iter() + .map(|col| { + let array = col; + array.compact(&self.visibility, cardinality).into() + }) + .collect::>(); + Self::new(columns, Bitmap::ones(cardinality)) } /// Convert the chunk to compact format. /// /// If the chunk is not compacted, return a new compacted chunk, otherwise return a reference to self. pub fn compact_cow(&self) -> Cow<'_, Self> { - match &self.vis2 { - Vis::Compact(_) => Cow::Borrowed(self), - Vis::Bitmap(visibility) => { - let cardinality = visibility.count_ones(); - let columns = self - .columns - .iter() - .map(|col| { - let array = col; - array.compact(visibility, cardinality).into() - }) - .collect::>(); - Cow::Owned(Self::new(columns, cardinality)) - } + if self.visibility.all() { + return Cow::Borrowed(self); } + let cardinality = self.visibility.count_ones(); + let columns = self + .columns + .iter() + .map(|col| { + let array = col; + array.compact(&self.visibility, cardinality).into() + }) + .collect::>(); + Cow::Owned(Self::new(columns, Bitmap::ones(cardinality))) } pub fn from_protobuf(proto: &PbDataChunk) -> ArrayResult { @@ -406,7 +383,7 @@ impl DataChunk { /// * bool - whether this tuple is visible pub fn row_at(&self, pos: usize) -> (RowRef<'_>, bool) { let row = self.row_at_unchecked_vis(pos); - let vis = self.vis2.is_set(pos); + let vis = self.visibility.is_set(pos); (row, vis) } @@ -469,7 +446,7 @@ impl DataChunk { .collect(); DataChunk { columns, - vis2: self.vis2.clone(), + visibility: self.visibility.clone(), } } @@ -481,16 +458,16 @@ impl DataChunk { pub fn project(&self, indices: &[usize]) -> Self { Self { columns: indices.iter().map(|i| self.columns[*i].clone()).collect(), - vis2: self.vis2.clone(), + visibility: self.visibility.clone(), } } /// Reorder columns and set visibility. - pub fn project_with_vis(&self, indices: &[usize], vis: Vis) -> Self { - assert_eq!(vis.len(), self.capacity()); + pub fn project_with_vis(&self, indices: &[usize], visibility: Bitmap) -> Self { + assert_eq!(visibility.len(), self.capacity()); Self { columns: indices.iter().map(|i| self.columns[*i].clone()).collect(), - vis2: vis, + visibility, } } @@ -561,57 +538,54 @@ impl DataChunk { // Note(bugen): should we exclude the invisible rows in the output so that the caller won't need // to handle visibility again? pub fn serialize(&self) -> Vec { - let buffers = match &self.vis2 { - Vis::Bitmap(vis) => { - let rows_num = vis.len(); - let mut buffers: Vec> = vec![]; - let (row_len_fixed, col_variable) = self.partition_sizes(); - - // First initialize buffer with the right size to avoid re-allocations - for i in 0..rows_num { - // SAFETY(value_at_unchecked): the idx is always in bound. - unsafe { - if vis.is_set_unchecked(i) { - buffers.push(Self::init_buffer(row_len_fixed, &col_variable, i)); - } else { - buffers.push(vec![]); - } + let buffers = if !self.visibility.all() { + let rows_num = self.visibility.len(); + let mut buffers: Vec> = vec![]; + let (row_len_fixed, col_variable) = self.partition_sizes(); + + // First initialize buffer with the right size to avoid re-allocations + for i in 0..rows_num { + // SAFETY(value_at_unchecked): the idx is always in bound. + unsafe { + if self.visibility.is_set_unchecked(i) { + buffers.push(Self::init_buffer(row_len_fixed, &col_variable, i)); + } else { + buffers.push(vec![]); } } + } - // Then do the actual serialization - for c in &*self.columns { - assert_eq!(c.len(), rows_num); - for (i, buffer) in buffers.iter_mut().enumerate() { - // SAFETY(value_at_unchecked): the idx is always in bound. - unsafe { - if vis.is_set_unchecked(i) { - serialize_datum_into(c.value_at_unchecked(i), buffer); - } + // Then do the actual serialization + for c in &*self.columns { + assert_eq!(c.len(), rows_num); + for (i, buffer) in buffers.iter_mut().enumerate() { + // SAFETY(value_at_unchecked): the idx is always in bound. + unsafe { + if self.visibility.is_set_unchecked(i) { + serialize_datum_into(c.value_at_unchecked(i), buffer); } } } - buffers } - Vis::Compact(rows_num) => { - let mut buffers: Vec> = vec![]; - let (row_len_fixed, col_variable) = self.partition_sizes(); - for i in 0..*rows_num { - unsafe { - buffers.push(Self::init_buffer(row_len_fixed, &col_variable, i)); - } + buffers + } else { + let mut buffers: Vec> = vec![]; + let (row_len_fixed, col_variable) = self.partition_sizes(); + for i in 0..self.visibility.len() { + unsafe { + buffers.push(Self::init_buffer(row_len_fixed, &col_variable, i)); } - for c in &*self.columns { - assert_eq!(c.len(), *rows_num); - for (i, buffer) in buffers.iter_mut().enumerate() { - // SAFETY(value_at_unchecked): the idx is always in bound. - unsafe { - serialize_datum_into(c.value_at_unchecked(i), buffer); - } + } + for c in &*self.columns { + assert_eq!(c.len(), self.visibility.len()); + for (i, buffer) in buffers.iter_mut().enumerate() { + // SAFETY(value_at_unchecked): the idx is always in bound. + unsafe { + serialize_datum_into(c.value_at_unchecked(i), buffer); } } - buffers } + buffers }; buffers.into_iter().map(|item| item.into()).collect_vec() @@ -666,7 +640,7 @@ impl<'a> From<&'a StructArray> for DataChunk { fn from(array: &'a StructArray) -> Self { Self { columns: array.fields().cloned().collect(), - vis2: Vis::Compact(array.len()), + visibility: Bitmap::ones(array.len()), } } } @@ -677,7 +651,7 @@ impl EstimateSize for DataChunk { .iter() .map(|a| a.estimated_heap_size()) .sum::() - + self.vis2.estimated_heap_size() + + self.visibility.estimated_heap_size() } } @@ -817,11 +791,7 @@ impl DataChunkTestExt for DataChunk { .into_iter() .map(|builder| builder.finish().into()) .collect(); - let vis = if visibility.iter().all(|b| *b) { - Vis::Compact(visibility.len()) - } else { - Vis::Bitmap(Bitmap::from_iter(visibility)) - }; + let vis = Bitmap::from_iter(visibility); let chunk = DataChunk::new(columns, vis); chunk.assert_valid(); chunk @@ -851,14 +821,14 @@ impl DataChunkTestExt for DataChunk { builder.finish().into() }) .collect(); - let chunk = DataChunk::new(new_cols, Vis::Bitmap(new_vis.finish())); + let chunk = DataChunk::new(new_cols, new_vis.finish()); chunk.assert_valid(); chunk } fn assert_valid(&self) { let cols = self.columns(); - let vis = &self.vis2; + let vis = &self.visibility; let n = vis.len(); for col in cols { assert_eq!(col.len(), n); diff --git a/src/common/src/array/data_chunk_iter.rs b/src/common/src/array/data_chunk_iter.rs index c01ecf0d0f3e8..8e63e442384d8 100644 --- a/src/common/src/array/data_chunk_iter.rs +++ b/src/common/src/array/data_chunk_iter.rs @@ -100,7 +100,7 @@ impl<'a> Iterator for DataChunkRefIterWithHoles<'a> { fn next(&mut self) -> Option { let len = self.chunk.capacity(); - let vis = self.chunk.vis(); + let vis = self.chunk.visibility(); if self.idx == len { None } else { diff --git a/src/common/src/array/mod.rs b/src/common/src/array/mod.rs index 9bccd3f4166a2..2c6e72b500311 100644 --- a/src/common/src/array/mod.rs +++ b/src/common/src/array/mod.rs @@ -36,7 +36,6 @@ pub mod stream_record; pub mod struct_array; mod utf8_array; mod value_reader; -mod vis; use std::convert::From; use std::hash::{Hash, Hasher}; @@ -63,7 +62,6 @@ use risingwave_pb::data::PbArray; pub use stream_chunk::{Op, StreamChunk, StreamChunkTestExt}; pub use struct_array::{StructArray, StructArrayBuilder, StructRef, StructValue}; pub use utf8_array::*; -pub use vis::{Vis, VisRef}; pub use self::error::ArrayError; pub use crate::array::num256_array::{Int256Array, Int256ArrayBuilder}; diff --git a/src/common/src/array/stream_chunk.rs b/src/common/src/array/stream_chunk.rs index 1fe99ad935d76..3175806257e7a 100644 --- a/src/common/src/array/stream_chunk.rs +++ b/src/common/src/array/stream_chunk.rs @@ -24,10 +24,9 @@ use rand::prelude::SmallRng; use rand::{Rng, SeedableRng}; use risingwave_pb::data::{PbOp, PbStreamChunk}; -use super::vis::VisMut; use super::{ArrayImpl, ArrayRef, ArrayResult, DataChunkTestExt, RowRef}; -use crate::array::{DataChunk, Vis}; -use crate::buffer::Bitmap; +use crate::array::DataChunk; +use crate::buffer::{Bitmap, BitmapBuilder}; use crate::catalog::Schema; use crate::estimate_size::EstimateSize; use crate::field_generator::VarcharProperty; @@ -104,21 +103,24 @@ impl Default for StreamChunk { } impl StreamChunk { - pub fn new( + /// Create a new `StreamChunk` with given ops and columns. + pub fn new(ops: impl Into>, columns: Vec) -> Self { + let ops = ops.into(); + let visibility = Bitmap::ones(ops.len()); + Self::with_visibility(ops, columns, visibility) + } + + /// Create a new `StreamChunk` with given ops, columns and visibility. + pub fn with_visibility( ops: impl Into>, columns: Vec, - visibility: Option, + visibility: Bitmap, ) -> Self { let ops = ops.into(); for col in &columns { assert_eq!(col.len(), ops.len()); } - - let vis = match visibility { - Some(b) => Vis::Bitmap(b), - None => Vis::Compact(ops.len()), - }; - let data = DataChunk::new(columns, vis); + let data = DataChunk::new(columns, visibility); StreamChunk { ops, data } } @@ -142,7 +144,7 @@ impl StreamChunk { .into_iter() .map(|builder| builder.finish().into()) .collect::>(); - StreamChunk::new(ops, new_columns, None) + StreamChunk::new(ops, new_columns) } /// Get the reference of the underlying data chunk. @@ -152,25 +154,24 @@ impl StreamChunk { /// compact the `StreamChunk` with its visibility map pub fn compact(self) -> Self { - if self.visibility().is_none() { + if self.is_compacted() { return self; } let (ops, columns, visibility) = self.into_inner(); - let visibility = visibility.as_visibility().unwrap(); let cardinality = visibility .iter() .fold(0, |vis_cnt, vis| vis_cnt + vis as usize); let columns: Vec<_> = columns .into_iter() - .map(|col| col.compact(visibility, cardinality).into()) + .map(|col| col.compact(&visibility, cardinality).into()) .collect(); let mut new_ops = Vec::with_capacity(cardinality); for idx in visibility.iter_ones() { new_ops.push(ops[idx]); } - StreamChunk::new(new_ops, columns, None) + StreamChunk::new(new_ops, columns) } pub fn into_parts(self) -> (DataChunk, Arc<[Op]>) { @@ -179,10 +180,10 @@ impl StreamChunk { pub fn from_parts(ops: impl Into>, data_chunk: DataChunk) -> Self { let (columns, vis) = data_chunk.into_parts(); - Self::new(ops, columns, vis.into_visibility()) + Self::with_visibility(ops, columns, vis) } - pub fn into_inner(self) -> (Arc<[Op]>, Vec, Vis) { + pub fn into_inner(self) -> (Arc<[Op]>, Vec, Bitmap) { let (columns, vis) = self.data.into_parts(); (self.ops, columns, vis) } @@ -205,7 +206,7 @@ impl StreamChunk { for column in prost.get_columns() { columns.push(ArrayImpl::from_protobuf(column, cardinality)?.into()); } - Ok(StreamChunk::new(ops, columns, None)) + Ok(StreamChunk::new(ops, columns)) } pub fn ops(&self) -> &[Op] { @@ -277,7 +278,7 @@ impl StreamChunk { } /// Reorder columns and set visibility. - pub fn project_with_vis(&self, indices: &[usize], vis: Vis) -> Self { + pub fn project_with_vis(&self, indices: &[usize], vis: Bitmap) -> Self { Self { ops: self.ops.clone(), data: self.data.project_with_vis(indices, vis), @@ -285,7 +286,7 @@ impl StreamChunk { } /// Clone the `StreamChunk` with a new visibility. - pub fn with_visibility(&self, vis: Vis) -> Self { + pub fn clone_with_vis(&self, vis: Bitmap) -> Self { Self { ops: self.ops.clone(), data: self.data.with_visibility(vis), @@ -407,7 +408,7 @@ impl From for Arc<[Op]> { pub struct StreamChunkMut { columns: Arc<[ArrayRef]>, ops: OpsMut, - vis: VisMut, + vis: BitmapBuilder, } impl From for StreamChunkMut { @@ -424,7 +425,7 @@ impl From for StreamChunkMut { impl From for StreamChunk { fn from(c: StreamChunkMut) -> Self { - StreamChunk::from_parts(c.ops, DataChunk::from_parts(c.columns, c.vis.into())) + StreamChunk::from_parts(c.ops, DataChunk::from_parts(c.columns, c.vis.finish())) } } @@ -607,7 +608,7 @@ impl StreamChunkTestExt for StreamChunk { fn valid(&self) -> bool { let len = self.ops.len(); let data = &self.data; - data.vis().len() == len && data.columns().iter().all(|col| col.len() == len) + data.visibility().len() == len && data.columns().iter().all(|col| col.len() == len) } fn concat(chunks: Vec) -> StreamChunk { diff --git a/src/common/src/array/struct_array.rs b/src/common/src/array/struct_array.rs index 9dfb23fe4e921..27c5d8c0dc237 100644 --- a/src/common/src/array/struct_array.rs +++ b/src/common/src/array/struct_array.rs @@ -267,7 +267,7 @@ impl From for StructArray { Self::new( StructType::unnamed(chunk.columns().iter().map(|c| c.data_type()).collect()), chunk.columns().to_vec(), - chunk.vis().to_bitmap(), + chunk.visibility().clone(), ) } } diff --git a/src/common/src/array/vis.rs b/src/common/src/array/vis.rs deleted file mode 100644 index 5cdb3bcd9ab1a..0000000000000 --- a/src/common/src/array/vis.rs +++ /dev/null @@ -1,513 +0,0 @@ -// Copyright 2023 RisingWave Labs -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -use std::mem; - -use either::Either; -use itertools::repeat_n; - -use crate::buffer::{Bitmap, BitmapBuilder, BitmapIter, BitmapOnesIter}; -use crate::estimate_size::EstimateSize; - -/// `Vis` is a visibility bitmap of rows. -#[derive(Clone, PartialEq, Debug)] -pub enum Vis { - /// Non-compact variant. - /// Certain rows are hidden using this bitmap. - Bitmap(Bitmap), - - /// Compact variant which just stores cardinality of rows. - /// This can be used when all rows are visible. - Compact(usize), // equivalent to all ones of this size -} - -impl From for Vis { - fn from(b: Bitmap) -> Self { - Vis::Bitmap(b) - } -} - -impl From for Vis { - fn from(c: usize) -> Self { - Vis::Compact(c) - } -} - -impl From for Vis { - fn from(v: VisMut) -> Self { - match v.state { - VisMutState::Bitmap(x) => Vis::Bitmap(x), - VisMutState::Compact(x) => Vis::Compact(x), - VisMutState::Builder(x) => Vis::Bitmap(x.finish()), - } - } -} - -impl Vis { - pub fn into_mut(self) -> VisMut { - VisMut::from(self) - } - - pub fn is_empty(&self) -> bool { - self.as_ref().is_empty() - } - - pub fn len(&self) -> usize { - self.as_ref().len() - } - - /// # Panics - /// Panics if `idx > len`. - pub fn is_set(&self, idx: usize) -> bool { - self.as_ref().is_set(idx) - } - - pub fn iter(&self) -> Iter<'_> { - self.as_ref().iter() - } - - pub fn iter_ones(&self) -> OnesIter<'_> { - self.as_ref().iter_ones() - } - - #[inline(always)] - pub fn as_ref(&self) -> VisRef<'_> { - match self { - Vis::Bitmap(b) => VisRef::Bitmap(b), - Vis::Compact(c) => VisRef::Compact(*c), - } - } - - /// Returns a bitmap of this `Vis`. - pub fn to_bitmap(&self) -> Bitmap { - match self { - Vis::Bitmap(b) => b.clone(), - Vis::Compact(c) => Bitmap::ones(*c), - } - } - - /// Consumes this `Vis` and returns the inner `Bitmap` if not compact. - pub fn into_visibility(self) -> Option { - match self { - Vis::Bitmap(b) => Some(b), - Vis::Compact(_) => None, - } - } - - /// Returns a reference to the inner `Bitmap` if not compact. - pub fn as_visibility(&self) -> Option<&Bitmap> { - match self { - Vis::Bitmap(b) => Some(b), - Vis::Compact(_) => None, - } - } -} - -impl EstimateSize for Vis { - fn estimated_heap_size(&self) -> usize { - match self { - Vis::Bitmap(bitmap) => bitmap.estimated_heap_size(), - Vis::Compact(_) => 0, - } - } -} - -impl std::ops::BitAndAssign<&Bitmap> for Vis { - fn bitand_assign(&mut self, rhs: &Bitmap) { - match self { - Vis::Bitmap(lhs) => lhs.bitand_assign(rhs), - Vis::Compact(_) => *self = Vis::Bitmap(rhs.clone()), - } - } -} - -impl std::ops::BitAndAssign for Vis { - fn bitand_assign(&mut self, rhs: Bitmap) { - match self { - Vis::Bitmap(lhs) => lhs.bitand_assign(&rhs), - Vis::Compact(_) => *self = Vis::Bitmap(rhs), - } - } -} - -impl std::ops::BitAnd<&Bitmap> for &Vis { - type Output = Vis; - - fn bitand(self, rhs: &Bitmap) -> Self::Output { - match self { - Vis::Bitmap(lhs) => Vis::Bitmap(lhs.bitand(rhs)), - Vis::Compact(_) => Vis::Bitmap(rhs.clone()), - } - } -} - -impl<'a, 'b> std::ops::BitAnd<&'b Vis> for &'a Vis { - type Output = Vis; - - fn bitand(self, rhs: &'b Vis) -> Self::Output { - self.as_ref().bitand(rhs.as_ref()) - } -} - -impl<'a> std::ops::BitAnd for &'a Vis { - type Output = Vis; - - fn bitand(self, rhs: Vis) -> Self::Output { - self.as_ref().bitand(rhs) - } -} - -impl<'a, 'b> std::ops::BitOr<&'b Vis> for &'a Vis { - type Output = Vis; - - fn bitor(self, rhs: &'b Vis) -> Self::Output { - self.as_ref().bitor(rhs.as_ref()) - } -} - -impl<'a> std::ops::Not for &'a Vis { - type Output = Vis; - - fn not(self) -> Self::Output { - self.as_ref().not() - } -} - -#[derive(Copy, Clone, PartialEq, Debug)] -pub enum VisRef<'a> { - Bitmap(&'a Bitmap), - Compact(usize), // equivalent to all ones of this size -} - -pub type Iter<'a> = Either, itertools::RepeatN>; -pub type OnesIter<'a> = Either, std::ops::Range>; - -impl<'a> VisRef<'a> { - pub fn is_empty(self) -> bool { - match self { - VisRef::Bitmap(b) => b.is_empty(), - VisRef::Compact(c) => c == 0, - } - } - - pub fn len(self) -> usize { - match self { - VisRef::Bitmap(b) => b.len(), - VisRef::Compact(c) => c, - } - } - - /// # Panics - /// - /// Panics if `idx > len`. - pub fn is_set(self, idx: usize) -> bool { - match self { - VisRef::Bitmap(b) => b.is_set(idx), - VisRef::Compact(c) => { - assert!(idx <= c); - true - } - } - } - - pub fn iter(self) -> Iter<'a> { - match self { - VisRef::Bitmap(b) => Either::Left(b.iter()), - VisRef::Compact(c) => Either::Right(repeat_n(true, c)), - } - } - - pub fn iter_ones(self) -> OnesIter<'a> { - match self { - VisRef::Bitmap(b) => Either::Left(b.iter_ones()), - VisRef::Compact(c) => Either::Right(0..c), - } - } -} - -impl<'a> From<&'a Bitmap> for VisRef<'a> { - fn from(b: &'a Bitmap) -> Self { - VisRef::Bitmap(b) - } -} - -impl<'a> From for VisRef<'a> { - fn from(c: usize) -> Self { - VisRef::Compact(c) - } -} - -impl<'a> From<&'a Vis> for VisRef<'a> { - fn from(vis: &'a Vis) -> Self { - vis.as_ref() - } -} - -impl<'a, 'b> std::ops::BitAnd> for VisRef<'a> { - type Output = Vis; - - fn bitand(self, rhs: VisRef<'b>) -> Self::Output { - match (self, rhs) { - (VisRef::Bitmap(b1), VisRef::Bitmap(b2)) => Vis::Bitmap(b1 & b2), - (VisRef::Bitmap(b1), VisRef::Compact(c2)) => { - assert_eq!(b1.len(), c2); - Vis::Bitmap(b1.clone()) - } - (VisRef::Compact(c1), VisRef::Bitmap(b2)) => { - assert_eq!(c1, b2.len()); - Vis::Bitmap(b2.clone()) - } - (VisRef::Compact(c1), VisRef::Compact(c2)) => { - assert_eq!(c1, c2); - Vis::Compact(c1) - } - } - } -} - -impl<'a> std::ops::BitAnd for VisRef<'a> { - type Output = Vis; - - fn bitand(self, rhs: Vis) -> Self::Output { - match (self, rhs) { - (VisRef::Bitmap(b1), Vis::Bitmap(b2)) => Vis::Bitmap(b1 & b2), - (VisRef::Bitmap(b1), Vis::Compact(c2)) => { - assert_eq!(b1.len(), c2); - Vis::Bitmap(b1.clone()) - } - (VisRef::Compact(c1), Vis::Bitmap(b2)) => { - assert_eq!(c1, b2.len()); - Vis::Bitmap(b2) - } - (VisRef::Compact(c1), Vis::Compact(c2)) => { - assert_eq!(c1, c2); - Vis::Compact(c1) - } - } - } -} - -impl<'a, 'b> std::ops::BitOr> for VisRef<'a> { - type Output = Vis; - - fn bitor(self, rhs: VisRef<'b>) -> Self::Output { - match (self, rhs) { - (VisRef::Bitmap(b1), VisRef::Bitmap(b2)) => Vis::Bitmap(b1 | b2), - (VisRef::Bitmap(b1), VisRef::Compact(c2)) => { - assert_eq!(b1.len(), c2); - Vis::Compact(c2) - } - (VisRef::Compact(c1), VisRef::Bitmap(b2)) => { - assert_eq!(c1, b2.len()); - Vis::Compact(c1) - } - (VisRef::Compact(c1), VisRef::Compact(c2)) => { - assert_eq!(c1, c2); - Vis::Compact(c1) - } - } - } -} - -impl<'a> std::ops::BitOr for VisRef<'a> { - type Output = Vis; - - fn bitor(self, rhs: Vis) -> Self::Output { - // Unlike the `bitand` implementation, we can forward by ref directly here, because this - // will not introduce unnecessary clones. - self.bitor(rhs.as_ref()) - } -} - -impl<'a> std::ops::Not for VisRef<'a> { - type Output = Vis; - - fn not(self) -> Self::Output { - match self { - VisRef::Bitmap(b) => Vis::Bitmap(!b), - VisRef::Compact(c) => Vis::Bitmap(BitmapBuilder::zeroed(c).finish()), - } - } -} - -/// A mutable wrapper for `Vis`. can only set the visibilities and can not change the size. -#[derive(Debug)] -pub struct VisMut { - state: VisMutState, -} - -#[derive(Debug)] -enum VisMutState { - /// Non-compact variant. - /// Certain rows are hidden using this bitmap. - Bitmap(Bitmap), - - /// Compact variant which just stores cardinality of rows. - /// This can be used when all rows are visible. - Compact(usize), // equivalent to all ones of this size - - Builder(BitmapBuilder), -} - -impl From for VisMut { - fn from(vis: Vis) -> Self { - let state = match vis { - Vis::Bitmap(x) => VisMutState::Bitmap(x), - Vis::Compact(x) => VisMutState::Compact(x), - }; - Self { state } - } -} - -impl VisMut { - pub fn len(&self) -> usize { - match &self.state { - VisMutState::Bitmap(b) => b.len(), - VisMutState::Compact(c) => *c, - VisMutState::Builder(b) => b.len(), - } - } - - /// # Panics - /// - /// Panics if `idx >= len`. - pub fn is_set(&self, idx: usize) -> bool { - match &self.state { - VisMutState::Bitmap(b) => b.is_set(idx), - VisMutState::Compact(c) => { - assert!(idx < *c); - true - } - VisMutState::Builder(b) => b.is_set(idx), - } - } - - pub fn set(&mut self, n: usize, val: bool) { - if let VisMutState::Builder(b) = &mut self.state { - b.set(n, val); - } else { - let state = mem::replace(&mut self.state, VisMutState::Compact(0)); // intermediate state - let mut builder = match state { - VisMutState::Bitmap(b) => b.into(), - VisMutState::Compact(c) => BitmapBuilder::filled(c), - VisMutState::Builder(_) => unreachable!(), - }; - builder.set(n, val); - self.state = VisMutState::Builder(builder); - } - } -} - -#[cfg(test)] -mod tests { - use super::*; - - #[test] - fn test_vis_mut_from_compact() { - let n: usize = 128; - let vis = Vis::Compact(n); - let mut vis = vis.into_mut(); - assert_eq!(vis.len(), n); - - for i in 0..n { - assert!(vis.is_set(i), "{}", i); - } - vis.set(0, true); - for i in 0..n { - assert!(vis.is_set(i), "{}", i); - } - assert_eq!(vis.len(), n); - - let to_false = vec![1usize, 2, 14, 25, 17, 77, 62, 96]; - for i in &to_false { - vis.set(*i, false); - } - assert_eq!(vis.len(), n); - for i in 0..n { - assert_eq!(vis.is_set(i), !to_false.contains(&i), "{}", i); - } - - let vis: Vis = vis.into(); - assert_eq!(vis.len(), n); - for i in 0..n { - assert_eq!(vis.is_set(i), !to_false.contains(&i), "{}", i); - } - let count_ones = match &vis { - Vis::Bitmap(b) => b.count_ones(), - Vis::Compact(len) => *len, - }; - assert_eq!(count_ones, n - to_false.len()); - } - #[test] - fn test_vis_mut_from_bitmap() { - let zeros = 61usize; - let ones = 62usize; - let n: usize = ones + zeros; - - let mut builder = BitmapBuilder::default(); - builder.append_bitmap(&Bitmap::zeros(zeros)); - builder.append_bitmap(&Bitmap::ones(ones)); - - let vis = Vis::Bitmap(builder.finish()); - assert_eq!(vis.len(), n); - - let mut vis = vis.into_mut(); - assert_eq!(vis.len(), n); - for i in 0..n { - assert_eq!(vis.is_set(i), i >= zeros, "{}", i); - } - - vis.set(0, false); - assert_eq!(vis.len(), n); - for i in 0..n { - assert_eq!(vis.is_set(i), i >= zeros, "{}", i); - } - - let toggles = vec![1usize, 2, 14, 25, 17, 77, 62, 96]; - for i in &toggles { - let i = *i; - vis.set(i, i < zeros); - } - assert_eq!(vis.len(), n); - for i in 0..zeros { - assert_eq!(vis.is_set(i), toggles.contains(&i), "{}", i); - } - for i in zeros..n { - assert_eq!(vis.is_set(i), !toggles.contains(&i), "{}", i); - } - - let vis: Vis = vis.into(); - assert_eq!(vis.len(), n); - for i in 0..zeros { - assert_eq!(vis.is_set(i), toggles.contains(&i), "{}", i); - } - for i in zeros..n { - assert_eq!(vis.is_set(i), !toggles.contains(&i), "{}", i); - } - let count_ones = match &vis { - Vis::Bitmap(b) => b.count_ones(), - Vis::Compact(len) => *len, - }; - let mut expected_ones = ones; - for i in &toggles { - let i = *i; - if i < zeros { - expected_ones += 1; - } else { - expected_ones -= 1; - } - } - assert_eq!(count_ones, expected_ones); - } -} diff --git a/src/common/src/buffer/bitmap.rs b/src/common/src/buffer/bitmap.rs index d5da545ddfe3c..e6f908556ca90 100644 --- a/src/common/src/buffer/bitmap.rs +++ b/src/common/src/buffer/bitmap.rs @@ -419,6 +419,12 @@ impl Bitmap { } } +impl From for Bitmap { + fn from(val: usize) -> Self { + Self::ones(val) + } +} + impl<'a, 'b> BitAnd<&'b Bitmap> for &'a Bitmap { type Output = Bitmap; @@ -469,6 +475,12 @@ impl BitAndAssign<&Bitmap> for Bitmap { } } +impl BitAndAssign for Bitmap { + fn bitand_assign(&mut self, rhs: Bitmap) { + *self = &*self & rhs; + } +} + impl<'a, 'b> BitOr<&'b Bitmap> for &'a Bitmap { type Output = Bitmap; diff --git a/src/common/src/hash/key_v2.rs b/src/common/src/hash/key_v2.rs index 46b402394738c..49936e4cbc6c8 100644 --- a/src/common/src/hash/key_v2.rs +++ b/src/common/src/hash/key_v2.rs @@ -321,7 +321,7 @@ impl HashKey for HashKeyImpl { dispatch_array_variants!(array, array, { for ((scalar, visible), serializer) in array .iter() - .zip_eq_fast(data_chunk.vis().iter()) + .zip_eq_fast(data_chunk.visibility().iter()) .zip_eq_fast(&mut serializers) { if visible { @@ -386,7 +386,7 @@ impl DataChunk { } let mut sizes = self - .vis() + .visibility() .iter() .map(|visible| if visible { exact_size } else { 0 }) .collect_vec(); @@ -395,7 +395,7 @@ impl DataChunk { dispatch_array_variants!(&*self.columns()[i], col, { for ((datum, visible), size) in col .iter() - .zip_eq_fast(self.vis().iter()) + .zip_eq_fast(self.visibility().iter()) .zip_eq_fast(&mut sizes) { if visible && let Some(scalar) = datum { diff --git a/src/common/src/test_utils/rand_bitmap.rs b/src/common/src/test_utils/rand_bitmap.rs index 75d804de8bdc0..604771933a378 100644 --- a/src/common/src/test_utils/rand_bitmap.rs +++ b/src/common/src/test_utils/rand_bitmap.rs @@ -19,6 +19,9 @@ use rand::SeedableRng; use crate::buffer::{Bitmap, BitmapBuilder}; pub fn gen_rand_bitmap(num_bits: usize, count_ones: usize, seed: u64) -> Bitmap { + if count_ones == num_bits { + return Bitmap::ones(num_bits); + } let mut builder = BitmapBuilder::zeroed(num_bits); let mut range = (0..num_bits).collect_vec(); range.shuffle(&mut rand::rngs::StdRng::seed_from_u64(seed)); diff --git a/src/common/src/test_utils/rand_stream_chunk.rs b/src/common/src/test_utils/rand_stream_chunk.rs index e256b8a9bff17..3d961f7f9505a 100644 --- a/src/common/src/test_utils/rand_stream_chunk.rs +++ b/src/common/src/test_utils/rand_stream_chunk.rs @@ -19,7 +19,7 @@ use crate::array::{ArrayBuilder, ArrayImpl, I64ArrayBuilder}; use crate::buffer::Bitmap; pub fn gen_legal_stream_chunk( - bitmap: Option<&Bitmap>, + bitmap: &Bitmap, chunk_size: usize, append_only: bool, seed: u64, @@ -28,34 +28,9 @@ pub fn gen_legal_stream_chunk( let mut ops: Vec = vec![]; let mut cur_data: Vec = vec![]; let mut rng = rand::rngs::StdRng::seed_from_u64(seed); - if let Some(bitmap) = bitmap { - for i in 0..chunk_size { - // SAFETY(value_at_unchecked): the idx is always in bound. - unsafe { - if bitmap.is_set_unchecked(i) { - let op = if append_only || cur_data.is_empty() || rng.gen() { - Op::Insert - } else { - Op::Delete - }; - ops.push(op); - if op == Op::Insert { - let value = rng.gen::() as i64; - data_builder.append(Some(value)); - cur_data.push(value); - } else { - let idx = rng.gen_range(0..cur_data.len()); - data_builder.append(Some(cur_data[idx])); - cur_data.remove(idx); - } - } else { - ops.push(Op::Insert); - data_builder.append(Some(1234567890)); - } - } - } - } else { - for _ in 0..chunk_size { + for i in 0..chunk_size { + // SAFETY(value_at_unchecked): the idx is always in bound. + if unsafe { bitmap.is_set_unchecked(i) } { let op = if append_only || cur_data.is_empty() || rng.gen() { Op::Insert } else { @@ -71,6 +46,9 @@ pub fn gen_legal_stream_chunk( data_builder.append(Some(cur_data[idx])); cur_data.remove(idx); } + } else { + ops.push(Op::Insert); + data_builder.append(Some(1234567890)); } } (ops, data_builder.finish().into()) diff --git a/src/common/src/test_utils/test_stream_chunk.rs b/src/common/src/test_utils/test_stream_chunk.rs index c93b32b2ccef1..e291aef793684 100644 --- a/src/common/src/test_utils/test_stream_chunk.rs +++ b/src/common/src/test_utils/test_stream_chunk.rs @@ -86,7 +86,7 @@ impl BigStreamChunk { let col = Arc::new(I32Array::from_iter(std::iter::repeat(114_514).take(capacity)).into()); - let chunk = StreamChunk::new(ops, vec![col], Some(visibility)); + let chunk = StreamChunk::with_visibility(ops, vec![col], visibility); Self(chunk) } diff --git a/src/common/src/util/chunk_coalesce.rs b/src/common/src/util/chunk_coalesce.rs index 3c5e4b109da90..9a41fc83e8f0e 100644 --- a/src/common/src/util/chunk_coalesce.rs +++ b/src/common/src/util/chunk_coalesce.rs @@ -76,32 +76,30 @@ impl DataChunkBuilder { self.ensure_builders(); let mut new_return_offset = input_chunk.offset; - match input_chunk.data_chunk.visibility() { - Some(vis) => { - for vis in vis.iter().skip(input_chunk.offset) { - new_return_offset += 1; - if !vis { - continue; - } - - self.append_one_row_internal(&input_chunk.data_chunk, new_return_offset - 1); - if self.buffered_count >= self.batch_size { - break; - } + let vis = input_chunk.data_chunk.visibility(); + if !vis.all() { + for vis in vis.iter().skip(input_chunk.offset) { + new_return_offset += 1; + if !vis { + continue; } - } - None => { - let num_rows_to_append = std::cmp::min( - self.batch_size - self.buffered_count, - input_chunk.data_chunk.capacity() - input_chunk.offset, - ); - let end_offset = input_chunk.offset + num_rows_to_append; - for input_row_idx in input_chunk.offset..end_offset { - new_return_offset += 1; - self.append_one_row_internal(&input_chunk.data_chunk, input_row_idx) + + self.append_one_row_internal(&input_chunk.data_chunk, new_return_offset - 1); + if self.buffered_count >= self.batch_size { + break; } } - } + } else { + let num_rows_to_append = std::cmp::min( + self.batch_size - self.buffered_count, + input_chunk.data_chunk.capacity() - input_chunk.offset, + ); + let end_offset = input_chunk.offset + num_rows_to_append; + for input_row_idx in input_chunk.offset..end_offset { + new_return_offset += 1; + self.append_one_row_internal(&input_chunk.data_chunk, input_row_idx) + } + }; assert!(self.buffered_count <= self.batch_size); @@ -332,14 +330,14 @@ mod tests { assert_eq!(Some(1), returned_input.as_ref().map(|c| c.offset)); assert_eq!(Some(3), output.as_ref().map(DataChunk::cardinality)); assert_eq!(Some(3), output.as_ref().map(DataChunk::capacity)); - assert!(output.unwrap().visibility().is_none()); + assert!(output.unwrap().is_compacted()); // Append last input let (returned_input, output) = builder.append_chunk_inner(returned_input.unwrap()); assert!(returned_input.is_none()); assert_eq!(Some(3), output.as_ref().map(DataChunk::cardinality)); assert_eq!(Some(3), output.as_ref().map(DataChunk::capacity)); - assert!(output.unwrap().visibility().is_none()); + assert!(output.unwrap().is_compacted()); } #[test] @@ -370,7 +368,7 @@ mod tests { assert_eq!(Some(3), returned_input.as_ref().map(|c| c.offset)); assert_eq!(Some(3), output.as_ref().map(DataChunk::cardinality)); assert_eq!(Some(3), output.as_ref().map(DataChunk::capacity)); - assert!(output.unwrap().visibility().is_none()); + assert!(output.unwrap().is_compacted()); assert_eq!(0, builder.buffered_count()); // Append last input @@ -412,7 +410,7 @@ mod tests { for output in &[output_1, output_2] { assert_eq!(3, output.cardinality()); assert_eq!(3, output.capacity()); - assert!(output.visibility().is_none()); + assert!(output.is_compacted()); } } @@ -437,7 +435,7 @@ mod tests { let output = builder.consume_all().expect("Failed to consume all!"); assert_eq!(2, output.cardinality()); assert_eq!(2, output.capacity()); - assert!(output.visibility().is_none()); + assert!(output.is_compacted()); } #[test] diff --git a/src/compute/tests/cdc_tests.rs b/src/compute/tests/cdc_tests.rs index 90d8f0c3a5541..fbc7b419f13d4 100644 --- a/src/compute/tests/cdc_tests.rs +++ b/src/compute/tests/cdc_tests.rs @@ -106,8 +106,8 @@ impl MockOffsetGenExecutor { match msg { Message::Chunk(chunk) => { let mut offset_builder = Utf8ArrayBuilder::new(chunk.cardinality()); + assert!(chunk.is_compacted()); let (ops, mut columns, vis) = chunk.into_inner(); - assert!(vis.as_visibility().is_none()); for _ in 0..ops.len() { let offset_str = self.next_offset()?; @@ -116,7 +116,7 @@ impl MockOffsetGenExecutor { let offsets = offset_builder.finish(); columns.push(offsets.into_ref()); - yield Message::Chunk(StreamChunk::new(ops, columns, vis.into_visibility())); + yield Message::Chunk(StreamChunk::with_visibility(ops, columns, vis)); } Message::Barrier(barrier) => { yield Message::Barrier(barrier); diff --git a/src/connector/src/parser/mod.rs b/src/connector/src/parser/mod.rs index ba9f1710a2e55..987d049a11f0c 100644 --- a/src/connector/src/parser/mod.rs +++ b/src/connector/src/parser/mod.rs @@ -104,7 +104,6 @@ impl SourceStreamChunkBuilder { .into_iter() .map(|builder| builder.finish().into()) .collect(), - None, ) } diff --git a/src/connector/src/sink/iceberg.rs b/src/connector/src/sink/iceberg.rs index 451c8b2686ec7..8d19cc0ad705c 100644 --- a/src/connector/src/sink/iceberg.rs +++ b/src/connector/src/sink/iceberg.rs @@ -24,7 +24,6 @@ use icelake::config::{TableConfig, TableConfigRef}; use icelake::transaction::Transaction; use icelake::types::{data_file_from_json, data_file_to_json, DataFile}; use icelake::Table; -use itertools::Itertools; use opendal::services::S3; use risingwave_common::array::{Op, StreamChunk}; use risingwave_common::buffer::Bitmap; @@ -289,12 +288,7 @@ impl IcebergWriter { let (mut chunk, ops) = chunk.into_parts(); let filters = - Bitmap::from_bool_slice(&ops.iter().map(|op| *op == Op::Insert).collect_vec()); - let filters = if let Some(ori_vis) = chunk.visibility() { - ori_vis & &filters - } else { - filters - }; + chunk.visibility() & ops.iter().map(|op| *op == Op::Insert).collect::(); chunk.set_visibility(filters); let chunk = RecordBatch::try_from(&chunk.compact()) diff --git a/src/expr/macro/src/gen.rs b/src/expr/macro/src/gen.rs index 77026581492c2..f940a705754a5 100644 --- a/src/expr/macro/src/gen.rs +++ b/src/expr/macro/src/gen.rs @@ -228,7 +228,7 @@ impl FunctionAttr { for child in &self.children[#num_args..] { columns.push(child.eval_checked(input).await?); } - let variadic_input = DataChunk::new(columns, input.vis().clone()); + let variadic_input = DataChunk::new(columns, input.visibility().clone()); } }); // evaluate variadic arguments in `eval_row` @@ -382,24 +382,21 @@ impl FunctionAttr { quote! { let mut builder = #builder_type::with_type(input.capacity(), self.context.return_type.clone()); - match input.vis() { - Vis::Bitmap(vis) => { - // allow using `zip` for performance - #[allow(clippy::disallowed_methods)] - for (i, ((#(#inputs,)*), visible)) in #array_zip.zip(vis.iter()).enumerate() { - if !visible { - builder.append_null(); - continue; - } - #let_variadic - #append_output - } + if input.is_compacted() { + for (i, (#(#inputs,)*)) in #array_zip.enumerate() { + #let_variadic + #append_output } - Vis::Compact(_) => { - for (i, (#(#inputs,)*)) in #array_zip.enumerate() { - #let_variadic - #append_output + } else { + // allow using `zip` for performance + #[allow(clippy::disallowed_methods)] + for (i, ((#(#inputs,)*), visible)) in #array_zip.zip(input.visibility().iter()).enumerate() { + if !visible { + builder.append_null(); + continue; } + #let_variadic + #append_output } } Ok(Arc::new(builder.finish().into())) @@ -699,21 +696,10 @@ impl FunctionAttr { #(#let_arrays)* let state0 = state0.as_datum_mut(); let mut state: Option<#state_type> = #let_state; - match input.vis() { - Vis::Bitmap(bitmap) => { - for row_id in bitmap.iter_ones() { - let op = unsafe { *input.ops().get_unchecked(row_id) }; - #(#let_values)* - state = #next_state; - } - } - Vis::Compact(_) => { - for row_id in 0..input.capacity() { - let op = unsafe { *input.ops().get_unchecked(row_id) }; - #(#let_values)* - state = #next_state; - } - } + for row_id in input.visibility().iter_ones() { + let op = unsafe { *input.ops().get_unchecked(row_id) }; + #(#let_values)* + state = #next_state; } *state0 = #assign_state; Ok(()) @@ -724,25 +710,22 @@ impl FunctionAttr { #(#let_arrays)* let state0 = state0.as_datum_mut(); let mut state: Option<#state_type> = #let_state; - match input.vis() { - Vis::Bitmap(bitmap) => { - for row_id in bitmap.iter_ones() { - if row_id < range.start { - continue; - } else if row_id >= range.end { - break; - } - let op = unsafe { *input.ops().get_unchecked(row_id) }; - #(#let_values)* - state = #next_state; - } + if input.is_compacted() { + for row_id in range { + let op = unsafe { *input.ops().get_unchecked(row_id) }; + #(#let_values)* + state = #next_state; } - Vis::Compact(_) => { - for row_id in range { - let op = unsafe { *input.ops().get_unchecked(row_id) }; - #(#let_values)* - state = #next_state; + } else { + for row_id in input.visibility().iter_ones() { + if row_id < range.start { + continue; + } else if row_id >= range.end { + break; } + let op = unsafe { *input.ops().get_unchecked(row_id) }; + #(#let_values)* + state = #next_state; } } *state0 = #assign_state; @@ -949,7 +932,7 @@ impl FunctionAttr { let mut index_builder = I32ArrayBuilder::new(self.chunk_size); #(let mut #builders = #builder_types::with_type(self.chunk_size, #return_types);)* - for (i, (row, visible)) in multizip((#(#arrays.iter(),)*)).zip_eq_fast(input.vis().iter()).enumerate() { + for (i, (row, visible)) in multizip((#(#arrays.iter(),)*)).zip_eq_fast(input.visibility().iter()).enumerate() { if let (#(Some(#inputs),)*) = row && visible { let iter = #fn_name(#(#inputs,)* #prebuilt_arg); for output in #iter { diff --git a/src/expr/src/agg/general.rs b/src/expr/src/agg/general.rs index 8971c48c278c7..d746641fadfc8 100644 --- a/src/expr/src/agg/general.rs +++ b/src/expr/src/agg/general.rs @@ -413,25 +413,10 @@ mod tests { "benching {} agg, chunk_size={}, vis_rate={}", agg_desc, chunk_size, vis_rate ); - let bitmap = if vis_rate < 1.0 { - Some(rand_bitmap::gen_rand_bitmap( - chunk_size, - (chunk_size as f64 * vis_rate) as usize, - 666, - )) - } else { - None - }; - let (ops, data) = rand_stream_chunk::gen_legal_stream_chunk( - bitmap.as_ref(), - chunk_size, - append_only, - 666, - ); - let vis = match bitmap { - Some(bitmap) => Vis::Bitmap(bitmap), - None => Vis::Compact(chunk_size), - }; + let vis = + rand_bitmap::gen_rand_bitmap(chunk_size, (chunk_size as f64 * vis_rate) as usize, 666); + let (ops, data) = + rand_stream_chunk::gen_legal_stream_chunk(&vis, chunk_size, append_only, 666); let chunk = StreamChunk::from_parts(ops, DataChunk::new(vec![Arc::new(data)], vis)); let pretty = format!("({agg_desc}:int8 $0:int8)"); let agg = crate::agg::build_append_only(&AggCall::from_pretty(pretty)).unwrap(); diff --git a/src/expr/src/expr/expr_array_transform.rs b/src/expr/src/expr/expr_array_transform.rs index 1f5d67397b332..4d1afe88bc659 100644 --- a/src/expr/src/expr/expr_array_transform.rs +++ b/src/expr/src/expr/expr_array_transform.rs @@ -15,7 +15,7 @@ use std::sync::Arc; use async_trait::async_trait; -use risingwave_common::array::{ArrayRef, DataChunk, Vis}; +use risingwave_common::array::{ArrayRef, DataChunk}; use risingwave_common::row::OwnedRow; use risingwave_common::types::{DataType, Datum, ListValue, ScalarImpl}; @@ -40,8 +40,7 @@ impl Expression for ArrayTransformExpression { let new_list = lambda_input .map_inner(|flatten_input| async move { let flatten_len = flatten_input.len(); - let chunk = - DataChunk::new(vec![Arc::new(flatten_input)], Vis::Compact(flatten_len)); + let chunk = DataChunk::new(vec![Arc::new(flatten_input)], flatten_len); self.lambda.eval(&chunk).await.map(Arc::unwrap_or_clone) }) .await?; diff --git a/src/expr/src/expr/expr_binary_nullable.rs b/src/expr/src/expr/expr_binary_nullable.rs index 6864ef12d9508..bc3ccf8f98231 100644 --- a/src/expr/src/expr/expr_binary_nullable.rs +++ b/src/expr/src/expr/expr_binary_nullable.rs @@ -17,7 +17,6 @@ use std::sync::Arc; use risingwave_common::array::*; -use risingwave_common::buffer::Bitmap; use risingwave_common::row::OwnedRow; use risingwave_common::types::{DataType, Datum, Scalar}; use risingwave_expr_macro::build_function; @@ -46,29 +45,24 @@ impl Expression for BinaryShortCircuitExpression { let left = self.expr_ia1.eval_checked(input).await?; let left = left.as_bool(); - let res_vis: Vis = match self.expr_type { + let res_vis = match self.expr_type { // For `Or` operator, if res of left part is not null and is true, we do not want to // calculate right part because the result must be true. - Type::Or => (!left.to_bitmap()).into(), + Type::Or => !left.to_bitmap(), // For `And` operator, If res of left part is not null and is false, we do not want // to calculate right part because the result must be false. - Type::And => (left.data() | !left.null_bitmap()).into(), + Type::And => left.data() | !left.null_bitmap(), _ => unimplemented!(), }; - let new_vis = input.vis() & res_vis; + let new_vis = input.visibility() & res_vis; let mut input1 = input.clone(); - input1.set_vis(new_vis); + input1.set_visibility(new_vis); let right = self.expr_ia2.eval_checked(&input1).await?; let right = right.as_bool(); assert_eq!(left.len(), right.len()); - let mut bitmap = match input.visibility() { - Some(vis) => vis.clone(), - None => Bitmap::ones(input.capacity()), - }; - bitmap &= left.null_bitmap(); - bitmap &= right.null_bitmap(); + let mut bitmap = input.visibility() & left.null_bitmap() & right.null_bitmap(); let c = match self.expr_type { Type::Or => { diff --git a/src/expr/src/expr/expr_case.rs b/src/expr/src/expr/expr_case.rs index 894426247043a..be4cdb864d520 100644 --- a/src/expr/src/expr/expr_case.rs +++ b/src/expr/src/expr/expr_case.rs @@ -14,7 +14,7 @@ use std::sync::Arc; -use risingwave_common::array::{ArrayRef, DataChunk, Vis}; +use risingwave_common::array::{ArrayRef, DataChunk}; use risingwave_common::row::OwnedRow; use risingwave_common::types::{DataType, Datum}; use risingwave_common::{bail, ensure}; @@ -64,25 +64,20 @@ impl Expression for CaseExpression { let when_len = self.when_clauses.len(); let mut result_array = Vec::with_capacity(when_len + 1); for (when_idx, WhenClause { when, then }) in self.when_clauses.iter().enumerate() { - let calc_then_vis: Vis = when - .eval_checked(&input) - .await? - .as_bool() - .to_bitmap() - .into(); - let input_vis = input.vis().clone(); - input.set_vis(calc_then_vis.clone()); + let calc_then_vis = when.eval_checked(&input).await?.as_bool().to_bitmap(); + let input_vis = input.visibility().clone(); + input.set_visibility(calc_then_vis.clone()); let then_res = then.eval_checked(&input).await?; calc_then_vis .iter_ones() .for_each(|pos| selection[pos] = Some(when_idx)); - input.set_vis(&input_vis & (!&calc_then_vis)); + input.set_visibility(&input_vis & (!calc_then_vis)); result_array.push(then_res); } if let Some(ref else_expr) = self.else_clause { let else_res = else_expr.eval_checked(&input).await?; input - .vis() + .visibility() .iter_ones() .for_each(|pos| selection[pos] = Some(when_len)); result_array.push(else_res); diff --git a/src/expr/src/expr/expr_coalesce.rs b/src/expr/src/expr/expr_coalesce.rs index df06ff2a2110b..ee47db4db20bc 100644 --- a/src/expr/src/expr/expr_coalesce.rs +++ b/src/expr/src/expr/expr_coalesce.rs @@ -16,7 +16,7 @@ use std::convert::TryFrom; use std::ops::BitAnd; use std::sync::Arc; -use risingwave_common::array::{ArrayRef, DataChunk, Vis, VisRef}; +use risingwave_common::array::{ArrayRef, DataChunk}; use risingwave_common::row::OwnedRow; use risingwave_common::types::{DataType, Datum}; use risingwave_pb::expr::expr_node::{RexNode, Type}; @@ -38,7 +38,7 @@ impl Expression for CoalesceExpression { } async fn eval(&self, input: &DataChunk) -> Result { - let init_vis = input.vis(); + let init_vis = input.visibility(); let mut input = input.clone(); let len = input.capacity(); let mut selection: Vec> = vec![None; len]; @@ -46,18 +46,12 @@ impl Expression for CoalesceExpression { for (child_idx, child) in self.children.iter().enumerate() { let res = child.eval_checked(&input).await?; let res_bitmap = res.null_bitmap(); - let orig_vis = input.vis(); - let res_bitmap_ref: VisRef<'_> = res_bitmap.into(); - orig_vis - .as_ref() - .bitand(res_bitmap_ref) - .iter_ones() - .for_each(|pos| { - selection[pos] = Some(child_idx); - }); - let res_vis: Vis = (!res_bitmap).into(); - let new_vis = orig_vis & res_vis; - input.set_vis(new_vis); + let orig_vis = input.visibility(); + for pos in orig_vis.bitand(res_bitmap).iter_ones() { + selection[pos] = Some(child_idx); + } + let new_vis = orig_vis & !res_bitmap; + input.set_visibility(new_vis); children_array.push(res); } let mut builder = self.return_type.create_array_builder(len); diff --git a/src/expr/src/expr/expr_in.rs b/src/expr/src/expr/expr_in.rs index cbe356bc1bbd6..ed7c9cc9e50cb 100644 --- a/src/expr/src/expr/expr_in.rs +++ b/src/expr/src/expr/expr_in.rs @@ -76,7 +76,7 @@ impl Expression for InExpression { async fn eval(&self, input: &DataChunk) -> Result { let input_array = self.left.eval_checked(input).await?; let mut output_array = BoolArrayBuilder::new(input_array.len()); - for (data, vis) in input_array.iter().zip_eq_fast(input.vis().iter()) { + for (data, vis) in input_array.iter().zip_eq_fast(input.visibility().iter()) { if vis { let ret = self.exists(&data.to_owned_datum()); output_array.append(ret); @@ -233,7 +233,7 @@ mod tests { .eval(&data_chunks[i]) .await .unwrap() - .compact(vis.unwrap(), expected[i].len()); + .compact(vis, expected[i].len()); for (i, expect) in expected[i].iter().enumerate() { assert_eq!(res.datum_at(i), expect.map(ScalarImpl::Bool)); diff --git a/src/expr/src/expr/expr_some_all.rs b/src/expr/src/expr/expr_some_all.rs index 8b47df11343f3..9ae9146406ada 100644 --- a/src/expr/src/expr/expr_some_all.rs +++ b/src/expr/src/expr/expr_some_all.rs @@ -14,7 +14,7 @@ use std::sync::Arc; -use itertools::{multizip, Itertools}; +use itertools::Itertools; use risingwave_common::array::{Array, ArrayRef, BoolArray, DataChunk}; use risingwave_common::row::OwnedRow; use risingwave_common::types::{DataType, Datum, ListRef, Scalar, ScalarImpl, ScalarRefImpl}; @@ -84,7 +84,6 @@ impl Expression for SomeAllExpression { async fn eval(&self, data_chunk: &DataChunk) -> Result { let arr_left = self.left_expr.eval_checked(data_chunk).await?; let arr_right = self.right_expr.eval_checked(data_chunk).await?; - let bitmap = data_chunk.visibility(); let mut num_array = Vec::with_capacity(data_chunk.capacity()); let arr_right_inner = arr_right.as_list(); @@ -124,24 +123,21 @@ impl Expression for SomeAllExpression { } }; - match bitmap { - Some(bitmap) => { - for ((left, right), visible) in arr_left - .iter() - .zip_eq_fast(arr_right.iter()) - .zip_eq_fast(bitmap.iter()) - { - if !visible { - num_array.push(None); - continue; - } - unfolded_left_right(left, right, &mut num_array); - } + if data_chunk.is_compacted() { + for (left, right) in arr_left.iter().zip_eq_fast(arr_right.iter()) { + unfolded_left_right(left, right, &mut num_array); } - None => { - for (left, right) in multizip((arr_left.iter(), arr_right.iter())) { - unfolded_left_right(left, right, &mut num_array); + } else { + for ((left, right), visible) in arr_left + .iter() + .zip_eq_fast(arr_right.iter()) + .zip_eq_fast(data_chunk.visibility().iter()) + { + if !visible { + num_array.push(None); + continue; } + unfolded_left_right(left, right, &mut num_array); } } diff --git a/src/expr/src/expr/expr_udf.rs b/src/expr/src/expr/expr_udf.rs index b7a03a6611f53..8ba2e801edef4 100644 --- a/src/expr/src/expr/expr_udf.rs +++ b/src/expr/src/expr/expr_udf.rs @@ -47,7 +47,7 @@ impl Expression for UdfExpression { } async fn eval(&self, input: &DataChunk) -> Result { - let vis = input.vis().to_bitmap(); + let vis = input.visibility(); let mut columns = Vec::with_capacity(self.children.len()); for child in &self.children { let array = child.eval_checked(input).await?; @@ -69,9 +69,7 @@ impl Expression for UdfExpression { .iter() .map::, _>(|c| Ok(c.as_ref().try_into()?)) .try_collect()?; - let output_array = self - .eval_inner(arg_columns, chunk.vis().to_bitmap()) - .await?; + let output_array = self.eval_inner(arg_columns, chunk.visibility()).await?; Ok(output_array.to_datum()) } } @@ -80,7 +78,7 @@ impl UdfExpression { async fn eval_inner( &self, columns: Vec, - vis: risingwave_common::buffer::Bitmap, + vis: &risingwave_common::buffer::Bitmap, ) -> Result { let opts = arrow_array::RecordBatchOptions::default().with_row_count(Some(vis.len())); let input = diff --git a/src/expr/src/table_function/user_defined.rs b/src/expr/src/table_function/user_defined.rs index 813cf23504482..89c6e3325f26a 100644 --- a/src/expr/src/table_function/user_defined.rs +++ b/src/expr/src/table_function/user_defined.rs @@ -57,10 +57,10 @@ impl UserDefinedTableFunction { let val = c.eval_checked(input).await?; columns.push(val); } - let direct_input = DataChunk::new(columns, input.vis().clone()); + let direct_input = DataChunk::new(columns, input.visibility().clone()); // compact the input chunk and record the row mapping - let visible_rows = direct_input.vis().iter_ones().collect_vec(); + let visible_rows = direct_input.visibility().iter_ones().collect_vec(); let compacted_input = direct_input.compact_cow(); let arrow_input = RecordBatch::try_from(compacted_input.as_ref())?; @@ -85,7 +85,7 @@ impl UserDefinedTableFunction { let output = DataChunk::new( vec![origin_indices.into_ref(), output.column_at(1).clone()], - output.vis().clone(), + output.visibility().clone(), ); yield output; } diff --git a/src/source/src/table.rs b/src/source/src/table.rs index 3781bee79b1b4..08aa3a38aeaaa 100644 --- a/src/source/src/table.rs +++ b/src/source/src/table.rs @@ -317,11 +317,8 @@ mod tests { macro_rules! write_chunk { ($i:expr) => {{ - let chunk = StreamChunk::new( - vec![Op::Insert], - vec![I64Array::from_iter([$i]).into_ref()], - None, - ); + let chunk = + StreamChunk::new(vec![Op::Insert], vec![I64Array::from_iter([$i]).into_ref()]); write_handle.write_chunk(chunk).await.unwrap(); }}; } @@ -362,11 +359,7 @@ mod tests { assert_matches!(reader.next().await.unwrap()?, TxnMsg::Begin(_)); - let chunk = StreamChunk::new( - vec![Op::Insert], - vec![I64Array::from_iter([1]).into_ref()], - None, - ); + let chunk = StreamChunk::new(vec![Op::Insert], vec![I64Array::from_iter([1]).into_ref()]); write_handle.write_chunk(chunk).await.unwrap(); assert_matches!(reader.next().await.unwrap()?, txn_msg => { diff --git a/src/storage/src/table/mod.rs b/src/storage/src/table/mod.rs index aa876e2c6b88c..d50e4ec0277ab 100644 --- a/src/storage/src/table/mod.rs +++ b/src/storage/src/table/mod.rs @@ -184,7 +184,7 @@ pub fn compute_chunk_vnode( VirtualNode::compute_chunk(chunk, &dist_key_indices) .into_iter() - .zip_eq_fast(chunk.vis().iter()) + .zip_eq_fast(chunk.visibility().iter()) .map(|(vnode, vis)| { // Ignore the invisible rows. if vis { diff --git a/src/stream/src/common/builder.rs b/src/stream/src/common/builder.rs index 0945066592d83..947a79f3747c9 100644 --- a/src/stream/src/common/builder.rs +++ b/src/stream/src/common/builder.rs @@ -139,7 +139,6 @@ impl StreamChunkBuilder { Some(StreamChunk::new( std::mem::replace(&mut self.ops, Vec::with_capacity(self.capacity)), new_columns, - None, )) } } diff --git a/src/stream/src/common/table/state_table.rs b/src/stream/src/common/table/state_table.rs index 2d06a3721a8c3..aadbccc1b97f6 100644 --- a/src/stream/src/common/table/state_table.rs +++ b/src/stream/src/common/table/state_table.rs @@ -22,7 +22,7 @@ use futures::{pin_mut, FutureExt, Stream, StreamExt}; use futures_async_stream::for_await; use itertools::{izip, Itertools}; use risingwave_common::array::stream_record::Record; -use risingwave_common::array::{Op, StreamChunk, Vis}; +use risingwave_common::array::{Op, StreamChunk}; use risingwave_common::buffer::Bitmap; use risingwave_common::cache::CachePriority; use risingwave_common::catalog::{get_dist_key_in_pk_indices, ColumnDesc, TableId, TableOption}; @@ -844,45 +844,41 @@ where }) .collect_vec(); - let vis = key_chunk.vis(); - match vis { - Vis::Bitmap(vis) => { - for ((op, (key, key_bytes), value), vis) in - izip!(op.iter(), vnode_and_pks, values).zip_eq_debug(vis.iter()) - { - if vis { - match op { - Op::Insert | Op::UpdateInsert => { - if USE_WATERMARK_CACHE && let Some(ref pk) = key { + if !key_chunk.is_compacted() { + for ((op, (key, key_bytes), value), vis) in + izip!(op.iter(), vnode_and_pks, values).zip_eq_debug(key_chunk.visibility().iter()) + { + if vis { + match op { + Op::Insert | Op::UpdateInsert => { + if USE_WATERMARK_CACHE && let Some(ref pk) = key { self.watermark_cache.insert(pk); } - self.insert_inner(TableKey(key_bytes), value); - } - Op::Delete | Op::UpdateDelete => { - if USE_WATERMARK_CACHE && let Some(ref pk) = key { + self.insert_inner(TableKey(key_bytes), value); + } + Op::Delete | Op::UpdateDelete => { + if USE_WATERMARK_CACHE && let Some(ref pk) = key { self.watermark_cache.delete(pk); } - self.delete_inner(TableKey(key_bytes), value); - } + self.delete_inner(TableKey(key_bytes), value); } } } } - Vis::Compact(_) => { - for (op, (key, key_bytes), value) in izip!(op.iter(), vnode_and_pks, values) { - match op { - Op::Insert | Op::UpdateInsert => { - if USE_WATERMARK_CACHE && let Some(ref pk) = key { + } else { + for (op, (key, key_bytes), value) in izip!(op.iter(), vnode_and_pks, values) { + match op { + Op::Insert | Op::UpdateInsert => { + if USE_WATERMARK_CACHE && let Some(ref pk) = key { self.watermark_cache.insert(pk); } - self.insert_inner(TableKey(key_bytes), value); - } - Op::Delete | Op::UpdateDelete => { - if USE_WATERMARK_CACHE && let Some(ref pk) = key { + self.insert_inner(TableKey(key_bytes), value); + } + Op::Delete | Op::UpdateDelete => { + if USE_WATERMARK_CACHE && let Some(ref pk) = key { self.watermark_cache.delete(pk); } - self.delete_inner(TableKey(key_bytes), value); - } + self.delete_inner(TableKey(key_bytes), value); } } } diff --git a/src/stream/src/common/table/test_state_table.rs b/src/stream/src/common/table/test_state_table.rs index 2f5dc3202adb9..c0a07ebcb2f02 100644 --- a/src/stream/src/common/table/test_state_table.rs +++ b/src/stream/src/common/table/test_state_table.rs @@ -1110,11 +1110,8 @@ async fn test_state_table_write_chunk_visibility() { &data_types, ); let (ops, columns, _) = chunk.into_inner(); - let chunk = StreamChunk::new( - ops, - columns, - Some(Bitmap::from_iter([true, true, true, false])), - ); + let chunk = + StreamChunk::with_visibility(ops, columns, Bitmap::from_iter([true, true, true, false])); state_table.write_chunk(chunk); diff --git a/src/stream/src/executor/aggregation/agg_group.rs b/src/stream/src/executor/aggregation/agg_group.rs index 8c7853e2fc7af..afc395a7ab128 100644 --- a/src/stream/src/executor/aggregation/agg_group.rs +++ b/src/stream/src/executor/aggregation/agg_group.rs @@ -17,7 +17,8 @@ use std::marker::PhantomData; use std::sync::Arc; use risingwave_common::array::stream_record::{Record, RecordType}; -use risingwave_common::array::{StreamChunk, Vis}; +use risingwave_common::array::StreamChunk; +use risingwave_common::buffer::Bitmap; use risingwave_common::catalog::Schema; use risingwave_common::estimate_size::EstimateSize; use risingwave_common::must_match; @@ -329,7 +330,7 @@ impl AggGroup { chunk: &StreamChunk, calls: &[AggCall], funcs: &[BoxedAggregateFunction], - visibilities: Vec, + visibilities: Vec, ) -> StreamExecutorResult<()> { if self.curr_row_count() == 0 { tracing::trace!(group = ?self.group_key_row(), "first time see this group"); diff --git a/src/stream/src/executor/aggregation/agg_state.rs b/src/stream/src/executor/aggregation/agg_state.rs index bb59faf40f150..36690b872da58 100644 --- a/src/stream/src/executor/aggregation/agg_state.rs +++ b/src/stream/src/executor/aggregation/agg_state.rs @@ -12,7 +12,8 @@ // See the License for the specific language governing permissions and // limitations under the License. -use risingwave_common::array::{StreamChunk, Vis}; +use risingwave_common::array::StreamChunk; +use risingwave_common::buffer::Bitmap; use risingwave_common::catalog::Schema; use risingwave_common::estimate_size::EstimateSize; use risingwave_common::must_match; @@ -98,7 +99,7 @@ impl AggState { chunk: &StreamChunk, call: &AggCall, func: &BoxedAggregateFunction, - visibility: Vis, + visibility: Bitmap, ) -> StreamExecutorResult<()> { match self { Self::Value(state) => { @@ -108,7 +109,7 @@ impl AggState { } Self::MaterializedInput(state) => { // the input chunk for minput is unprojected - let chunk = chunk.with_visibility(visibility); + let chunk = chunk.clone_with_vis(visibility); state.apply_chunk(&chunk) } } diff --git a/src/stream/src/executor/aggregation/distinct.rs b/src/stream/src/executor/aggregation/distinct.rs index dd5905c342710..bcc12d065169e 100644 --- a/src/stream/src/executor/aggregation/distinct.rs +++ b/src/stream/src/executor/aggregation/distinct.rs @@ -18,8 +18,8 @@ use std::sync::atomic::AtomicU64; use std::sync::Arc; use itertools::Itertools; -use risingwave_common::array::{ArrayRef, Op, Vis, VisRef}; -use risingwave_common::buffer::BitmapBuilder; +use risingwave_common::array::{ArrayRef, Op}; +use risingwave_common::buffer::{Bitmap, BitmapBuilder}; use risingwave_common::row::{self, CompactedRow, OwnedRow, Row, RowExt}; use risingwave_common::types::{ScalarImpl, ScalarRefImpl}; use risingwave_common::util::iter_util::ZipEqFast; @@ -55,7 +55,7 @@ impl ColumnDeduplicater { &mut self, ops: &[Op], column: &ArrayRef, - mut visibilities: Vec<&mut Vis>, + mut visibilities: Vec<&mut Bitmap>, dedup_table: &mut StateTable, group_key: Option<&GroupKey>, ctx: ActorContextRef, @@ -173,11 +173,8 @@ impl ColumnDeduplicater { }); for (vis, vis_mask_inv) in visibilities.iter_mut().zip_eq(vis_masks_inv.into_iter()) { - let mask = !vis_mask_inv.finish(); - if !mask.all() { - // update visibility if needed - **vis = vis.as_ref() & VisRef::from(&mask); - } + // update visibility + **vis &= !vis_mask_inv.finish(); } // if we determine to flush to the table when processing every chunk instead of barrier @@ -259,11 +256,11 @@ impl DistinctDeduplicater { &mut self, ops: &[Op], columns: &[ArrayRef], - mut visibilities: Vec, + mut visibilities: Vec, dedup_tables: &mut HashMap>, group_key: Option<&GroupKey>, ctx: ActorContextRef, - ) -> StreamExecutorResult> { + ) -> StreamExecutorResult> { for (distinct_col, (ref call_indices, deduplicater)) in &mut self.deduplicaters { let column = &columns[*distinct_col]; let dedup_table = dedup_tables.get_mut(distinct_col).unwrap(); diff --git a/src/stream/src/executor/aggregation/mod.rs b/src/stream/src/executor/aggregation/mod.rs index 4505ce5520981..fb1884ed8a972 100644 --- a/src/stream/src/executor/aggregation/mod.rs +++ b/src/stream/src/executor/aggregation/mod.rs @@ -16,8 +16,9 @@ pub use agg_group::*; pub use agg_state::*; pub use distinct::*; use risingwave_common::array::ArrayImpl::Bool; -use risingwave_common::array::{DataChunk, Vis}; +use risingwave_common::array::DataChunk; use risingwave_common::bail; +use risingwave_common::buffer::Bitmap; use risingwave_common::catalog::{Field, Schema}; use risingwave_expr::agg::{AggCall, AggKind}; use risingwave_storage::StateStore; @@ -63,8 +64,8 @@ pub async fn agg_call_filter_res( identity: &str, agg_call: &AggCall, chunk: &DataChunk, -) -> StreamExecutorResult { - let mut vis = chunk.vis().clone(); +) -> StreamExecutorResult { + let mut vis = chunk.visibility().clone(); if matches!( agg_call.kind, AggKind::Min | AggKind::Max | AggKind::StringAgg diff --git a/src/stream/src/executor/backfill/utils.rs b/src/stream/src/executor/backfill/utils.rs index 6c49be9e607a1..67f25fc506f16 100644 --- a/src/stream/src/executor/backfill/utils.rs +++ b/src/stream/src/executor/backfill/utils.rs @@ -170,10 +170,10 @@ pub(crate) fn mark_chunk_ref_by_vnode( new_visibility.append(v); } let (columns, _) = data.into_parts(); - Ok(StreamChunk::new( + Ok(StreamChunk::with_visibility( ops, columns, - Some(new_visibility.finish()), + new_visibility.finish(), )) } @@ -201,7 +201,7 @@ fn mark_chunk_inner( new_visibility.append(v); } let (columns, _) = data.into_parts(); - StreamChunk::new(ops, columns, Some(new_visibility.finish())) + StreamChunk::with_visibility(ops, columns, new_visibility.finish()) } fn mark_cdc_chunk_inner( @@ -246,10 +246,10 @@ fn mark_cdc_chunk_inner( } let (columns, _) = data.into_parts(); - Ok(StreamChunk::new( + Ok(StreamChunk::with_visibility( ops, columns, - Some(new_visibility.finish()), + new_visibility.finish(), )) } @@ -257,7 +257,7 @@ fn mark_cdc_chunk_inner( pub(crate) fn mapping_chunk(chunk: StreamChunk, output_indices: &[usize]) -> StreamChunk { let (ops, columns, visibility) = chunk.into_inner(); let mapped_columns = output_indices.iter().map(|&i| columns[i].clone()).collect(); - StreamChunk::new(ops, mapped_columns, visibility.into_visibility()) + StreamChunk::with_visibility(ops, mapped_columns, visibility) } fn mapping_watermark(watermark: Watermark, upstream_indices: &[usize]) -> Option { diff --git a/src/stream/src/executor/dedup/append_only_dedup.rs b/src/stream/src/executor/dedup/append_only_dedup.rs index 890b1e4ac997d..898f78290bbb0 100644 --- a/src/stream/src/executor/dedup/append_only_dedup.rs +++ b/src/stream/src/executor/dedup/append_only_dedup.rs @@ -130,7 +130,7 @@ impl AppendOnlyDedupExecutor { if vis.count_ones() > 0 { // Construct the new chunk and write the data to state table. let (ops, columns, _) = chunk.into_inner(); - let chunk = StreamChunk::new(ops, columns, Some(vis)); + let chunk = StreamChunk::with_visibility(ops, columns, vis); self.state_table.write_chunk(chunk.clone()); commit_data = true; diff --git a/src/stream/src/executor/dispatch.rs b/src/stream/src/executor/dispatch.rs index fdf67eafcd5c5..9c0c931aaa022 100644 --- a/src/stream/src/executor/dispatch.rs +++ b/src/stream/src/executor/dispatch.rs @@ -666,7 +666,7 @@ impl Dispatcher for HashDataDispatcher { .iter() .copied() .zip_eq_fast(chunk.ops()) - .zip_eq_fast(chunk.vis().iter()) + .zip_eq_fast(chunk.visibility().iter()) { // Build visibility map for every output chunk. for (output, vis_map) in self.outputs.iter().zip_eq_fast(vis_maps.iter_mut()) { @@ -703,7 +703,7 @@ impl Dispatcher for HashDataDispatcher { let vis_map = vis_map.finish(); // columns is not changed in this function let new_stream_chunk = - StreamChunk::new(ops.clone(), chunk.columns().into(), Some(vis_map)); + StreamChunk::with_visibility(ops.clone(), chunk.columns().into(), vis_map); if new_stream_chunk.cardinality() > 0 { event!( tracing::Level::TRACE, @@ -1273,7 +1273,7 @@ mod tests { }) .collect(); - let chunk = StreamChunk::new(ops, columns, None); + let chunk = StreamChunk::new(ops, columns); hash_dispatcher.dispatch_data(chunk).await.unwrap(); for (output_idx, output) in output_data_vecs.into_iter().enumerate() { @@ -1295,12 +1295,8 @@ mod tests { .for_each(|(real_col, expect_col)| { let real_vals = real_chunk .visibility() - .as_ref() - .unwrap() - .iter() - .enumerate() - .filter(|(_, vis)| *vis) - .map(|(row_idx, _)| real_col.as_int32().value_at(row_idx).unwrap()) + .iter_ones() + .map(|row_idx| real_col.as_int32().value_at(row_idx).unwrap()) .collect::>(); assert_eq!(real_vals.len(), expect_col.len()); assert_eq!(real_vals, *expect_col); diff --git a/src/stream/src/executor/dynamic_filter.rs b/src/stream/src/executor/dynamic_filter.rs index 234ef6db29ab2..6d95b52d9e548 100644 --- a/src/stream/src/executor/dynamic_filter.rs +++ b/src/stream/src/executor/dynamic_filter.rs @@ -320,7 +320,8 @@ impl DynamicFilterExecutor 0 { - let new_chunk = StreamChunk::new(new_ops, columns, Some(new_visibility)); + let new_chunk = + StreamChunk::with_visibility(new_ops, columns, new_visibility); yield Message::Chunk(new_chunk) } } diff --git a/src/stream/src/executor/expand.rs b/src/stream/src/executor/expand.rs index a1263d34a46e4..1aed4e15c0463 100644 --- a/src/stream/src/executor/expand.rs +++ b/src/stream/src/executor/expand.rs @@ -67,7 +67,7 @@ impl ExpandExecutor { let (mut columns, vis) = input.data_chunk().keep_columns(subsets).into_parts(); columns.extend(input.columns().iter().cloned()); columns.push(flags); - let chunk = StreamChunk::new(input.ops(), columns, vis.into_visibility()); + let chunk = StreamChunk::with_visibility(input.ops(), columns, vis); yield Message::Chunk(chunk); } } diff --git a/src/stream/src/executor/filter.rs b/src/stream/src/executor/filter.rs index 2a2fca4fe827d..ccdf9729c48d3 100644 --- a/src/stream/src/executor/filter.rs +++ b/src/stream/src/executor/filter.rs @@ -15,7 +15,7 @@ use std::fmt::{Debug, Formatter}; use std::sync::Arc; -use risingwave_common::array::{Array, ArrayImpl, Op, StreamChunk, Vis}; +use risingwave_common::array::{Array, ArrayImpl, Op, StreamChunk}; use risingwave_common::buffer::BitmapBuilder; use risingwave_common::catalog::Schema; use risingwave_common::util::iter_util::ZipEqFast; @@ -72,10 +72,7 @@ impl FilterExecutor { let mut new_visibility = BitmapBuilder::with_capacity(n); let mut last_res = false; - assert!(match vis { - Vis::Compact(c) => c == n, - Vis::Bitmap(ref m) => m.len() == n, - }); + assert_eq!(vis.len(), n); let ArrayImpl::Bool(bool_array) = &*filter else { panic!("unmatched type: filter expr returns a non-null array"); @@ -127,7 +124,7 @@ impl FilterExecutor { let new_visibility = new_visibility.finish(); Ok(if new_visibility.count_ones() > 0 { - let new_chunk = StreamChunk::new(new_ops, columns, Some(new_visibility)); + let new_chunk = StreamChunk::with_visibility(new_ops, columns, new_visibility); Some(new_chunk) } else { None diff --git a/src/stream/src/executor/hash_agg.rs b/src/stream/src/executor/hash_agg.rs index 488c73f8768b0..a2814195ac9c6 100644 --- a/src/stream/src/executor/hash_agg.rs +++ b/src/stream/src/executor/hash_agg.rs @@ -254,15 +254,15 @@ impl HashAggExecutor { /// in one chunk. /// /// * `keys`: Hash Keys of rows. - /// * `base_visibility`: Visibility of rows, `None` means all are visible. - fn get_group_visibilities(keys: Vec, base_visibility: Option<&Bitmap>) -> Vec<(K, Bitmap)> { + /// * `base_visibility`: Visibility of rows. + fn get_group_visibilities(keys: Vec, base_visibility: &Bitmap) -> Vec<(K, Bitmap)> { let n_rows = keys.len(); let mut vis_builders = HashMap::new(); - for (row_idx, key) in keys.into_iter().enumerate().filter(|(row_idx, _)| { - base_visibility - .map(|vis| vis.is_set(*row_idx)) - .unwrap_or(true) - }) { + for (row_idx, key) in keys + .into_iter() + .enumerate() + .filter(|(row_idx, _)| base_visibility.is_set(*row_idx)) + { vis_builders .entry(key) .or_insert_with(|| BitmapBuilder::zeroed(n_rows)) diff --git a/src/stream/src/executor/hop_window.rs b/src/stream/src/executor/hop_window.rs index aa1840aa832ce..8298ad36cbaa2 100644 --- a/src/stream/src/executor/hop_window.rs +++ b/src/stream/src/executor/hop_window.rs @@ -17,7 +17,7 @@ use std::num::NonZeroUsize; use futures::StreamExt; use futures_async_stream::try_stream; use itertools::Itertools; -use risingwave_common::array::{DataChunk, Op, Vis}; +use risingwave_common::array::{DataChunk, Op}; use risingwave_common::types::Interval; use risingwave_expr::expr::BoxedExpression; use risingwave_expr::ExprError; @@ -142,7 +142,7 @@ impl HopWindowExecutor { let chunk = chunk.compact(); let (data_chunk, ops) = chunk.into_parts(); // SAFETY: Already compacted. - assert!(matches!(data_chunk.vis(), Vis::Compact(_))); + assert!(data_chunk.is_compacted()); let len = data_chunk.cardinality(); // Collect each window's data into a chunk. diff --git a/src/stream/src/executor/integration_tests.rs b/src/stream/src/executor/integration_tests.rs index 7a6c6f53301ca..cee9b74b67b6b 100644 --- a/src/stream/src/executor/integration_tests.rs +++ b/src/stream/src/executor/integration_tests.rs @@ -187,7 +187,6 @@ async fn test_merger_sum_aggr() { let chunk = StreamChunk::new( vec![op; i], vec![I64Array::from_iter(vec![1; i]).into_ref()], - None, ); input.send(Message::Chunk(chunk)).await.unwrap(); } diff --git a/src/stream/src/executor/merge.rs b/src/stream/src/executor/merge.rs index e317509555839..4df4ae4d3e302 100644 --- a/src/stream/src/executor/merge.rs +++ b/src/stream/src/executor/merge.rs @@ -467,7 +467,7 @@ mod tests { fn build_test_chunk(epoch: u64) -> StreamChunk { // The number of items in `ops` is the epoch count. let ops = vec![Op::Insert; epoch as usize]; - StreamChunk::new(ops, vec![], None) + StreamChunk::new(ops, vec![]) } #[tokio::test] diff --git a/src/stream/src/executor/mview/materialize.rs b/src/stream/src/executor/mview/materialize.rs index 0c9a981d18108..caabe213e146c 100644 --- a/src/stream/src/executor/mview/materialize.rs +++ b/src/stream/src/executor/mview/materialize.rs @@ -22,7 +22,7 @@ use enum_as_inner::EnumAsInner; use futures::{stream, StreamExt}; use futures_async_stream::try_stream; use itertools::{izip, Itertools}; -use risingwave_common::array::{Op, StreamChunk, Vis}; +use risingwave_common::array::{Op, StreamChunk}; use risingwave_common::buffer::Bitmap; use risingwave_common::catalog::{ColumnDesc, ColumnId, ConflictBehavior, Schema, TableId}; use risingwave_common::estimate_size::EstimateSize; @@ -281,7 +281,7 @@ fn generate_output( } if let Some(new_data_chunk) = data_chunk_builder.consume_all() { - let new_stream_chunk = StreamChunk::new(new_ops, new_data_chunk.columns().to_vec(), None); + let new_stream_chunk = StreamChunk::new(new_ops, new_data_chunk.columns().to_vec()); Ok(Some(new_stream_chunk)) } else { Ok(None) @@ -329,26 +329,12 @@ impl MaterializeBuffer { let (_, vis) = key_chunk.into_parts(); let mut buffer = MaterializeBuffer::new(); - match vis { - Vis::Bitmap(vis) => { - for ((op, key, value), vis) in - izip!(ops.iter(), pks, values).zip_eq_debug(vis.iter()) - { - if vis { - match op { - Op::Insert | Op::UpdateInsert => buffer.insert(key, value), - Op::Delete | Op::UpdateDelete => buffer.delete(key, value), - }; - } - } - } - Vis::Compact(_) => { - for (op, key, value) in izip!(ops.iter(), pks, values) { - match op { - Op::Insert | Op::UpdateInsert => buffer.insert(key, value), - Op::Delete | Op::UpdateDelete => buffer.delete(key, value), - }; - } + for ((op, key, value), vis) in izip!(ops.iter(), pks, values).zip_eq_debug(vis.iter()) { + if vis { + match op { + Op::Insert | Op::UpdateInsert => buffer.insert(key, value), + Op::Delete | Op::UpdateDelete => buffer.delete(key, value), + }; } } buffer diff --git a/src/stream/src/executor/over_window/eowc.rs b/src/stream/src/executor/over_window/eowc.rs index 22553641369c1..d483b0c345c0f 100644 --- a/src/stream/src/executor/over_window/eowc.rs +++ b/src/stream/src/executor/over_window/eowc.rs @@ -352,11 +352,7 @@ impl EowcOverWindowExecutor { let columns: Vec = builders.into_iter().map(|b| b.finish().into()).collect(); let chunk_size = columns[0].len(); Ok(if chunk_size > 0 { - Some(StreamChunk::new( - vec![Op::Insert; chunk_size], - columns, - None, - )) + Some(StreamChunk::new(vec![Op::Insert; chunk_size], columns)) } else { None }) diff --git a/src/stream/src/executor/project.rs b/src/stream/src/executor/project.rs index 04d6eb0909c11..4da00e7c9d94c 100644 --- a/src/stream/src/executor/project.rs +++ b/src/stream/src/executor/project.rs @@ -146,8 +146,7 @@ impl Inner { projected_columns.push(evaluated_expr); } let (_, vis) = data_chunk.into_parts(); - let vis = vis.into_visibility(); - let new_chunk = StreamChunk::new(ops, projected_columns, vis); + let new_chunk = StreamChunk::with_visibility(ops, projected_columns, vis); Ok(Some(new_chunk)) } diff --git a/src/stream/src/executor/row_id_gen.rs b/src/stream/src/executor/row_id_gen.rs index c8502c770cea7..88a11f03c663b 100644 --- a/src/stream/src/executor/row_id_gen.rs +++ b/src/stream/src/executor/row_id_gen.rs @@ -107,7 +107,7 @@ impl RowIdGenExecutor { let (ops, mut columns, bitmap) = chunk.into_inner(); columns[self.row_id_index] = self.gen_row_id_column_by_op(&columns[self.row_id_index], &ops); - yield Message::Chunk(StreamChunk::new(ops, columns, bitmap.into_visibility())); + yield Message::Chunk(StreamChunk::with_visibility(ops, columns, bitmap)); } Message::Barrier(barrier) => { // Update row id generator if vnode mapping changes. diff --git a/src/stream/src/executor/stateless_simple_agg.rs b/src/stream/src/executor/stateless_simple_agg.rs index 150530e4ebb77..0f0411b5e277b 100644 --- a/src/stream/src/executor/stateless_simple_agg.rs +++ b/src/stream/src/executor/stateless_simple_agg.rs @@ -112,7 +112,7 @@ impl StatelessSimpleAggExecutor { .try_collect()?; let ops = vec![Op::Insert; 1]; - yield Message::Chunk(StreamChunk::new(ops, columns, None)); + yield Message::Chunk(StreamChunk::new(ops, columns)); } yield m; diff --git a/src/stream/src/executor/top_n/utils.rs b/src/stream/src/executor/top_n/utils.rs index 198338d523097..cd235e9a26e00 100644 --- a/src/stream/src/executor/top_n/utils.rs +++ b/src/stream/src/executor/top_n/utils.rs @@ -164,7 +164,7 @@ pub fn generate_output( } // since `new_rows` is not empty, we unwrap directly let new_data_chunk = data_chunk_builder.consume_all().unwrap(); - let new_stream_chunk = StreamChunk::new(new_ops, new_data_chunk.columns().to_vec(), None); + let new_stream_chunk = StreamChunk::new(new_ops, new_data_chunk.columns().to_vec()); Ok(new_stream_chunk) } else { let columns = schema @@ -172,7 +172,7 @@ pub fn generate_output( .into_iter() .map(|x| x.finish().into()) .collect_vec(); - Ok(StreamChunk::new(vec![], columns, None)) + Ok(StreamChunk::new(vec![], columns)) } }