Skip to content

Commit

Permalink
feat: eliminate adjacent noop update in dispatcher when some columns …
Browse files Browse the repository at this point in the history
…are pruned (#14652)
  • Loading branch information
st1page authored Jan 18, 2024
1 parent 186703d commit e1df65f
Show file tree
Hide file tree
Showing 2 changed files with 98 additions and 4 deletions.
69 changes: 69 additions & 0 deletions src/common/src/array/stream_chunk.rs
Original file line number Diff line number Diff line change
Expand Up @@ -303,6 +303,33 @@ impl StreamChunk {
}
}

/// Remove the adjacent delete-insert if their row value are the same.
pub fn eliminate_adjacent_noop_update(self) -> Self {
let len = self.data_chunk().capacity();
let mut c: StreamChunkMut = self.into();
let mut prev_r = None;
for curr in 0..len {
if !c.vis(curr) {
continue;
}
if let Some(prev) = prev_r {
if matches!(c.op(prev), Op::UpdateDelete | Op::Delete)
&& matches!(c.op(curr), Op::UpdateInsert | Op::Insert)
&& c.row_ref(prev) == c.row_ref(curr)
{
c.set_vis(prev, false);
c.set_vis(curr, false);
prev_r = None;
} else {
prev_r = Some(curr)
}
} else {
prev_r = Some(curr);
}
}
c.into()
}

/// Reorder columns and set visibility.
pub fn project_with_vis(&self, indices: &[usize], vis: Bitmap) -> Self {
Self {
Expand Down Expand Up @@ -492,6 +519,18 @@ impl OpRowMutRef<'_> {
}

impl StreamChunkMut {
pub fn vis(&self, i: usize) -> bool {
self.vis.is_set(i)
}

pub fn op(&self, i: usize) -> Op {
self.ops.get(i)
}

pub fn row_ref(&self, i: usize) -> RowRef<'_> {
RowRef::with_columns(self.columns(), i)
}

pub fn set_vis(&mut self, n: usize, val: bool) {
self.vis.set(n, val);
}
Expand Down Expand Up @@ -788,6 +827,36 @@ mod tests {
"\
+---+---+---+
| - | 2 | |
+---+---+---+"
);
}

#[test]
fn test_eliminate_adjacent_noop_update() {
let c = StreamChunk::from_pretty(
" I I
- 1 6 D
- 2 2
+ 2 3
- 2 3
+ 1 6
- 1 7
+ 1 10 D
+ 1 7
U- 3 7
U+ 3 7
+ 2 3",
);
let c = c.eliminate_adjacent_noop_update();
assert_eq!(
c.to_pretty().to_string(),
"\
+---+---+---+
| - | 2 | 2 |
| + | 2 | 3 |
| - | 2 | 3 |
| + | 1 | 6 |
| + | 2 | 3 |
+---+---+---+"
);
}
Expand Down
33 changes: 29 additions & 4 deletions src/stream/src/executor/dispatch.rs
Original file line number Diff line number Diff line change
Expand Up @@ -619,7 +619,14 @@ impl RoundRobinDataDispatcher {

impl Dispatcher for RoundRobinDataDispatcher {
async fn dispatch_data(&mut self, chunk: StreamChunk) -> StreamResult<()> {
let chunk = chunk.project(&self.output_indices);
let chunk = if self.output_indices.len() < chunk.columns().len() {
chunk
.project(&self.output_indices)
.eliminate_adjacent_noop_update()
} else {
chunk.project(&self.output_indices)
};

self.outputs[self.cur].send(Message::Chunk(chunk)).await?;
self.cur += 1;
self.cur %= self.outputs.len();
Expand Down Expand Up @@ -739,7 +746,13 @@ impl Dispatcher for HashDataDispatcher {
let mut new_ops: Vec<Op> = Vec::with_capacity(chunk.capacity());

// Apply output indices after calculating the vnode.
let chunk = chunk.project(&self.output_indices);
let chunk = if self.output_indices.len() < chunk.columns().len() {
chunk
.project(&self.output_indices)
.eliminate_adjacent_noop_update()
} else {
chunk.project(&self.output_indices)
};

for ((vnode, &op), visible) in vnodes
.iter()
Expand Down Expand Up @@ -858,7 +871,13 @@ impl BroadcastDispatcher {

impl Dispatcher for BroadcastDispatcher {
async fn dispatch_data(&mut self, chunk: StreamChunk) -> StreamResult<()> {
let chunk = chunk.project(&self.output_indices);
let chunk = if self.output_indices.len() < chunk.columns().len() {
chunk
.project(&self.output_indices)
.eliminate_adjacent_noop_update()
} else {
chunk.project(&self.output_indices)
};
broadcast_concurrent(self.outputs.values_mut(), Message::Chunk(chunk)).await
}

Expand Down Expand Up @@ -956,7 +975,13 @@ impl Dispatcher for SimpleDispatcher {
.exactly_one()
.expect("expect exactly one output");

let chunk = chunk.project(&self.output_indices);
let chunk = if self.output_indices.len() < chunk.columns().len() {
chunk
.project(&self.output_indices)
.eliminate_adjacent_noop_update()
} else {
chunk.project(&self.output_indices)
};
output.send(Message::Chunk(chunk)).await
}

Expand Down

0 comments on commit e1df65f

Please sign in to comment.