Skip to content

Commit

Permalink
refactor(common): remove Vis (#12379)
Browse files Browse the repository at this point in the history
Signed-off-by: Runji Wang <[email protected]>
  • Loading branch information
wangrunji0408 authored Sep 19, 2023
1 parent ff50ccb commit 44b8ae8
Show file tree
Hide file tree
Showing 64 changed files with 396 additions and 1,071 deletions.
4 changes: 2 additions & 2 deletions src/batch/src/executor/aggregation/distinct.rs
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,7 @@ impl AggregateFunction for Distinct {
let state = state.downcast_mut::<State>();

let mut bitmap_builder = BitmapBuilder::with_capacity(input.capacity());
bitmap_builder.append_bitmap(&input.data_chunk().vis().to_bitmap());
bitmap_builder.append_bitmap(input.data_chunk().visibility());
for row_id in range.clone() {
let (row_ref, vis) = input.data_chunk().row_at(row_id);
let row = row_ref.to_owned_row();
Expand All @@ -94,7 +94,7 @@ impl AggregateFunction for Distinct {
}
bitmap_builder.set(row_id, b);
}
let input = input.with_visibility(bitmap_builder.finish().into());
let input = input.clone_with_vis(bitmap_builder.finish());
self.inner
.update_range(&mut state.inner, &input, range)
.await
Expand Down
2 changes: 1 addition & 1 deletion src/batch/src/executor/aggregation/filter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,7 @@ impl AggregateFunction for Filter {
.as_bool()
.to_bitmap();
let mut input1 = input.clone();
input1.set_vis(input.vis() & &bitmap);
input1.set_visibility(input.visibility() & &bitmap);
self.inner.update_range(state, &input1, range).await
}

Expand Down
4 changes: 2 additions & 2 deletions src/batch/src/executor/hop_window.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ use std::num::NonZeroUsize;

use futures_async_stream::try_stream;
use itertools::Itertools;
use risingwave_common::array::{DataChunk, Vis};
use risingwave_common::array::DataChunk;
use risingwave_common::catalog::{Field, Schema};
use risingwave_common::error::{Result, RwError};
use risingwave_common::types::{DataType, Interval};
Expand Down Expand Up @@ -173,7 +173,7 @@ impl HopWindowExecutor {
#[for_await]
for data_chunk in child.execute() {
let data_chunk = data_chunk?;
assert!(matches!(data_chunk.vis(), Vis::Compact(_)));
assert!(data_chunk.is_compacted());
let len = data_chunk.cardinality();
for i in 0..units {
let window_start_col = if output_indices.contains(&window_start_col_index) {
Expand Down
3 changes: 1 addition & 2 deletions src/batch/src/executor/insert.rs
Original file line number Diff line number Diff line change
Expand Up @@ -157,8 +157,7 @@ impl InsertExecutor {
columns.insert(row_id_index, Arc::new(row_id_col.into()))
}

let stream_chunk =
StreamChunk::new(vec![Op::Insert; cap], columns, vis.into_visibility());
let stream_chunk = StreamChunk::with_visibility(vec![Op::Insert; cap], columns, vis);

#[cfg(debug_assertions)]
table_dml_handle.check_chunk_schema(&stream_chunk);
Expand Down
4 changes: 2 additions & 2 deletions src/batch/src/executor/join/hash_join.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1905,8 +1905,8 @@ mod tests {
}

fn is_data_chunk_eq(left: &DataChunk, right: &DataChunk) -> bool {
assert!(left.visibility().is_none());
assert!(right.visibility().is_none());
assert!(left.is_compacted());
assert!(right.is_compacted());

if left.cardinality() != right.cardinality() {
return false;
Expand Down
27 changes: 11 additions & 16 deletions src/batch/src/executor/join/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ use itertools::Itertools;
pub use local_lookup_join::*;
pub use lookup_join_base::*;
pub use nested_loop_join::*;
use risingwave_common::array::{DataChunk, RowRef, Vis};
use risingwave_common::array::{DataChunk, RowRef};
use risingwave_common::error::Result;
use risingwave_common::row::Row;
use risingwave_common::types::{DataType, DatumRef};
Expand Down Expand Up @@ -124,10 +124,10 @@ fn concatenate(left: &DataChunk, right: &DataChunk) -> Result<DataChunk> {
concated_columns.extend_from_slice(left.columns());
concated_columns.extend_from_slice(right.columns());
// Only handle one side is constant row chunk: One of visibility must be None.
let vis = match (left.vis(), right.vis()) {
(Vis::Compact(_), _) => right.vis().clone(),
(_, Vis::Compact(_)) => left.vis().clone(),
(Vis::Bitmap(_), Vis::Bitmap(_)) => {
let vis = match (left.is_compacted(), right.is_compacted()) {
(true, _) => right.visibility().clone(),
(_, true) => left.visibility().clone(),
(false, false) => {
return Err(BatchError::UnsupportedFunction(
"The concatenate behaviour of two chunk with visibility is undefined".to_string(),
)
Expand Down Expand Up @@ -176,7 +176,8 @@ fn convert_row_to_chunk(
#[cfg(test)]
mod tests {

use risingwave_common::array::{Array, ArrayBuilder, DataChunk, PrimitiveArrayBuilder, Vis};
use risingwave_common::array::{Array, ArrayBuilder, DataChunk, PrimitiveArrayBuilder};
use risingwave_common::buffer::Bitmap;
use risingwave_common::catalog::{Field, Schema};
use risingwave_common::row::Row;
use risingwave_common::types::{DataType, ScalarRefImpl};
Expand All @@ -196,20 +197,14 @@ mod tests {
let arr = builder.finish();
columns.push(arr.into_ref())
}
let chunk1: DataChunk = DataChunk::new(columns.clone(), length);
let bool_vec = vec![true, false, true, false, false];
let chunk2: DataChunk = DataChunk::new(
columns.clone(),
Vis::Bitmap((bool_vec.clone()).into_iter().collect()),
);
let chunk1 = DataChunk::new(columns.clone(), length);
let visibility = Bitmap::from_bool_slice(&[true, false, true, false, false]);
let chunk2 = DataChunk::new(columns.clone(), visibility.clone());
let chunk = concatenate(&chunk1, &chunk2).unwrap();
assert_eq!(chunk.capacity(), chunk1.capacity());
assert_eq!(chunk.capacity(), chunk2.capacity());
assert_eq!(chunk.columns().len(), chunk1.columns().len() * 2);
assert_eq!(
chunk.visibility().cloned().unwrap(),
(bool_vec).into_iter().collect()
);
assert_eq!(chunk.visibility(), &visibility);
}

/// Test the function of convert row into constant row chunk (one row repeat multiple times).
Expand Down
6 changes: 3 additions & 3 deletions src/batch/src/executor/join/nested_loop_join.rs
Original file line number Diff line number Diff line change
Expand Up @@ -389,7 +389,7 @@ impl NestedLoopJoinExecutor {
.await?;
if chunk.cardinality() > 0 {
// chunk.visibility() must be Some(_)
matched = &matched | chunk.visibility().unwrap();
matched = &matched | chunk.visibility();
for spilled in chunk_builder.append_chunk(chunk) {
yield spilled
}
Expand Down Expand Up @@ -433,7 +433,7 @@ impl NestedLoopJoinExecutor {
.await?;
if chunk.cardinality() > 0 {
// chunk.visibility() must be Some(_)
matched = &matched | chunk.visibility().unwrap();
matched = &matched | chunk.visibility();
}
}
if ANTI_JOIN {
Expand Down Expand Up @@ -475,7 +475,7 @@ impl NestedLoopJoinExecutor {
.await?;
if chunk.cardinality() > 0 {
left_matched.set(left_row_idx, true);
right_matched = &right_matched | chunk.visibility().unwrap();
right_matched = &right_matched | chunk.visibility();
for spilled in chunk_builder.append_chunk(chunk) {
yield spilled
}
Expand Down
4 changes: 2 additions & 2 deletions src/batch/src/executor/limit.rs
Original file line number Diff line number Diff line change
Expand Up @@ -91,8 +91,8 @@ impl LimitExecutor {
}
// process chunk
let mut new_vis;
if let Some(old_vis) = data_chunk.visibility() {
new_vis = old_vis.iter().collect_vec();
if !data_chunk.is_compacted() {
new_vis = data_chunk.visibility().iter().collect_vec();
for vis in new_vis.iter_mut().filter(|x| **x) {
if skipped < self.offset {
skipped += 1;
Expand Down
2 changes: 1 addition & 1 deletion src/batch/src/executor/source.rs
Original file line number Diff line number Diff line change
Expand Up @@ -174,7 +174,7 @@ impl SourceExecutor {

fn covert_stream_chunk_to_batch_chunk(chunk: StreamChunk) -> Result<DataChunk> {
// chunk read from source must be compact
assert!(chunk.data_chunk().visibility().is_none());
assert!(chunk.data_chunk().is_compacted());

if chunk.ops().iter().any(|op| *op != Op::Insert) {
return Err(RwError::from(BatchError::Internal(anyhow!(
Expand Down
4 changes: 2 additions & 2 deletions src/batch/src/executor/test_utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -219,8 +219,8 @@ pub async fn diff_executor_output(actual: BoxedExecutor, expect: BoxedExecutor)
}

fn is_data_chunk_eq(left: &DataChunk, right: &DataChunk) {
assert!(left.visibility().is_none());
assert!(right.visibility().is_none());
assert!(left.is_compacted());
assert!(right.is_compacted());

assert_eq!(
left.cardinality(),
Expand Down
2 changes: 1 addition & 1 deletion src/batch/src/executor/update.rs
Original file line number Diff line number Diff line change
Expand Up @@ -169,7 +169,7 @@ impl UpdateExecutor {
columns.push(column);
}

DataChunk::new(columns, data_chunk.vis().clone())
DataChunk::new(columns, data_chunk.visibility().clone())
};

if self.returning {
Expand Down
8 changes: 1 addition & 7 deletions src/batch/src/task/consistent_hash_shuffle_channel.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,6 @@
// limitations under the License.

use std::fmt::{Debug, Formatter};
use std::ops::BitAnd;
use std::option::Option;
use std::sync::Arc;

Expand Down Expand Up @@ -89,12 +88,7 @@ fn generate_new_data_chunks(
});
let mut res = Vec::with_capacity(output_count);
for (sink_id, vis_map_vec) in vis_maps.into_iter().enumerate() {
let vis_map: Bitmap = vis_map_vec.into_iter().collect();
let vis_map = if let Some(visibility) = chunk.visibility() {
vis_map.bitand(visibility)
} else {
vis_map
};
let vis_map = Bitmap::from_bool_slice(&vis_map_vec) & chunk.visibility();
let new_data_chunk = chunk.with_visibility(vis_map);
trace!(
"send to sink:{}, cardinality:{}",
Expand Down
8 changes: 1 addition & 7 deletions src/batch/src/task/hash_shuffle_channel.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,6 @@
// limitations under the License.

use std::fmt::{Debug, Formatter};
use std::ops::BitAnd;
use std::option::Option;
use std::sync::Arc;

Expand Down Expand Up @@ -86,12 +85,7 @@ fn generate_new_data_chunks(
});
let mut res = Vec::with_capacity(output_count);
for (sink_id, vis_map_vec) in vis_maps.into_iter().enumerate() {
let vis_map: Bitmap = vis_map_vec.into_iter().collect();
let vis_map = if let Some(visibility) = chunk.visibility() {
vis_map.bitand(visibility)
} else {
vis_map
};
let vis_map = Bitmap::from_bool_slice(&vis_map_vec) & chunk.visibility();
let new_data_chunk = chunk.with_visibility(vis_map);
trace!(
"send to sink:{}, cardinality:{}",
Expand Down
2 changes: 1 addition & 1 deletion src/common/src/array/compact_chunk.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ use crate::array::{Op, RowRef, StreamChunk};
use crate::row::{Project, RowExt};
use crate::util::hash_util::Crc32FastBuilder;

/// Compact the stream chunks with just modify the `Ops` and `Vis` of the chunk. Currently, two
/// Compact the stream chunks with just modify the `Ops` and visibility of the chunk. Currently, two
/// transformation will be applied
/// - remove intermediate operation of the same key. The operations of the same stream key will only
/// have three kind of patterns Insert, Delete or Update.
Expand Down
Loading

0 comments on commit 44b8ae8

Please sign in to comment.