feat(stream): compact stream chunk by dropping useless rows in sink #11070

Merged · 55 commits · Sep 13, 2023

Changes from 52 commits

Commits (55)
ad24c38
compact stream chunk
xx01cyx Jul 19, 2023
132fb6b
Update src/stream/src/common/compact_chunk.rs
xx01cyx Jul 19, 2023
8b3ec22
Merge remote-tracking branch 'origin/main' into cyx/compact-stream-chunk
xx01cyx Jul 20, 2023
35e12a0
restore update in kafka debezium
xx01cyx Jul 20, 2023
54a5ad5
Merge branch 'cyx/compact-stream-chunk' of https://github.com/risingw…
xx01cyx Jul 20, 2023
2bdfe48
risedev c
xx01cyx Jul 20, 2023
573524b
use stream key as merge key
xx01cyx Jul 26, 2023
51bb6f3
update sink and project executors
xx01cyx Jul 26, 2023
bd68db6
fix project hash
xx01cyx Jul 26, 2023
d34154b
fix
xx01cyx Jul 31, 2023
7e7cfc9
fix project ut
xx01cyx Jul 31, 2023
2346f49
add hint in project
xx01cyx Jul 31, 2023
4b978f0
rename project methods
xx01cyx Jul 31, 2023
935eb68
clippy
xx01cyx Jul 31, 2023
5f7c7e2
fix sink test
xx01cyx Jul 31, 2023
51f2c8d
use prehashed map
xx01cyx Jul 31, 2023
e46c84b
merge main and resolve conflicts
xx01cyx Aug 1, 2023
7c3a330
Merge branch 'main' into cyx/compact-stream-chunk
st1page Aug 14, 2023
890f56f
conflick
st1page Aug 14, 2023
fa27338
add VisMut
st1page Aug 14, 2023
d87c0af
add VisMut
st1page Aug 14, 2023
4a1ff05
fix
st1page Aug 15, 2023
ce12bef
add tests
st1page Aug 15, 2023
00dbcd8
fix
st1page Aug 15, 2023
37242eb
fix
st1page Aug 15, 2023
9df8fb7
add test
st1page Aug 15, 2023
d6c4847
Merge branch 'sts/add_vis_mut' into cyx/compact-stream-chunk
st1page Aug 15, 2023
2311ca0
Merge remote-tracking branch 'origin/main' into cyx/compact-stream-chunk
st1page Aug 16, 2023
3e84a5e
Merge branch 'main' into cyx/compact-stream-chunk
st1page Aug 25, 2023
3409dbd
Merge remote-tracking branch 'origin/main' into cyx/compact-stream-chunk
st1page Aug 25, 2023
942fc9f
Merge branch 'main' into cyx/compact-stream-chunk
st1page Aug 25, 2023
f31eb81
Merge branch 'cyx/compact-stream-chunk' of github.com:risingwavelabs/…
st1page Sep 4, 2023
9877599
try to impl
st1page Sep 4, 2023
873b134
implement
st1page Sep 5, 2023
54744f0
clippy fix
st1page Sep 5, 2023
e77d89b
add compact
st1page Sep 6, 2023
3783d4b
Merge branch 'main' into cyx/compact-stream-chunk
st1page Sep 6, 2023
c15c3ea
handle conflict
st1page Sep 6, 2023
b571b69
conflict
st1page Sep 6, 2023
b432b29
add test
st1page Sep 7, 2023
0ae211a
revert project logic
st1page Sep 7, 2023
06f09a4
fix test
st1page Sep 7, 2023
b457332
fix
st1page Sep 7, 2023
b915d00
fix
st1page Sep 7, 2023
64c0fa1
Update src/common/src/row/project.rs
st1page Sep 11, 2023
4e38d52
add assert
st1page Sep 11, 2023
94f43b5
add back
st1page Sep 11, 2023
4bd6956
maintain ui ud
st1page Sep 12, 2023
57f3317
fix clippy
st1page Sep 12, 2023
05ebe68
Update src/common/src/array/compact_chunk.rs
st1page Sep 12, 2023
3fcf00c
change interface
st1page Sep 12, 2023
7cc30ac
Merge branch 'cyx/compact-stream-chunk' of github.com:risingwavelabs/…
st1page Sep 12, 2023
27d3bc5
fix sink
st1page Sep 13, 2023
77d2cad
try fix
st1page Sep 13, 2023
7ef5945
fix
st1page Sep 13, 2023
7 changes: 7 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default.

8 changes: 4 additions & 4 deletions ci/scripts/e2e-kafka-sink-test.sh
@@ -56,15 +56,15 @@ fi
# test upsert kafka sink after update
echo "testing upsert kafka sink after updating data"
diff ./e2e_test/sink/kafka/upsert2.result \
-<((./.risingwave/bin/kafka/bin/kafka-console-consumer.sh --bootstrap-server 127.0.0.1:29092 --topic test-rw-sink-upsert --from-beginning --property print.key=true --max-messages 11 | sort) 2> /dev/null)
+<((./.risingwave/bin/kafka/bin/kafka-console-consumer.sh --bootstrap-server 127.0.0.1:29092 --topic test-rw-sink-upsert --from-beginning --property print.key=true --max-messages 12 | sort) 2> /dev/null)
if [ $? -ne 0 ]; then
echo "The output for upsert sink after update is not as expected."
exit 1
fi

# test debezium kafka sink after update
echo "testing debezium kafka sink after updating data"
-(./.risingwave/bin/kafka/bin/kafka-console-consumer.sh --bootstrap-server 127.0.0.1:29092 --topic test-rw-sink-debezium --property print.key=true --from-beginning --max-messages 11 | sort) > ./e2e_test/sink/kafka/debezium2.tmp.result 2> /dev/null
+(./.risingwave/bin/kafka/bin/kafka-console-consumer.sh --bootstrap-server 127.0.0.1:29092 --topic test-rw-sink-debezium --property print.key=true --from-beginning --max-messages 13 | sort) > ./e2e_test/sink/kafka/debezium2.tmp.result 2> /dev/null
python3 e2e_test/sink/kafka/debezium.py e2e_test/sink/kafka/debezium2.result e2e_test/sink/kafka/debezium2.tmp.result
if [ $? -ne 0 ]; then
echo "The output for debezium sink after update is not as expected."
@@ -81,15 +81,15 @@ psql -h localhost -p 4566 -d dev -U root -c "delete from t_kafka where id = 1;"
# test upsert kafka sink after delete
echo "testing upsert kafka sink after deleting data"
diff ./e2e_test/sink/kafka/upsert3.result \
-<((./.risingwave/bin/kafka/bin/kafka-console-consumer.sh --bootstrap-server 127.0.0.1:29092 --topic test-rw-sink-upsert --from-beginning --property print.key=true --max-messages 12 | sort) 2> /dev/null)
+<((./.risingwave/bin/kafka/bin/kafka-console-consumer.sh --bootstrap-server 127.0.0.1:29092 --topic test-rw-sink-upsert --from-beginning --property print.key=true --max-messages 13 | sort) 2> /dev/null)
if [ $? -ne 0 ]; then
echo "The output for upsert sink after update is not as expected."
exit 1
fi

# test debezium kafka sink after delete
echo "testing debezium kafka sink after deleting data"
-(./.risingwave/bin/kafka/bin/kafka-console-consumer.sh --bootstrap-server 127.0.0.1:29092 --topic test-rw-sink-debezium --property print.key=true --from-beginning --max-messages 13 | sort) > ./e2e_test/sink/kafka/debezium3.tmp.result 2> /dev/null
+(./.risingwave/bin/kafka/bin/kafka-console-consumer.sh --bootstrap-server 127.0.0.1:29092 --topic test-rw-sink-debezium --property print.key=true --from-beginning --max-messages 15 | sort) > ./e2e_test/sink/kafka/debezium3.tmp.result 2> /dev/null
python3 e2e_test/sink/kafka/debezium.py e2e_test/sink/kafka/debezium3.result e2e_test/sink/kafka/debezium3.tmp.result
if [ $? -ne 0 ]; then
echo "The output for debezium sink after delete is not as expected."
24 changes: 13 additions & 11 deletions e2e_test/sink/kafka/debezium2.result

Large diffs are not rendered by default.

26 changes: 14 additions & 12 deletions e2e_test/sink/kafka/debezium3.result

Large diffs are not rendered by default.

1 change: 1 addition & 0 deletions e2e_test/sink/kafka/upsert2.result
@@ -1,4 +1,5 @@
{"id":10} {"id":10,"v_bigint":20674,"v_double":9042.404483827513,"v_float":19387.23828125,"v_integer":20674,"v_smallint":26951,"v_timestamp":1681404058888,"v_varchar":"0oVqRIHqkb"}
{"id":1} null
{"id":1} {"id":1,"v_bigint":0,"v_double":0.0,"v_float":0.0,"v_integer":0,"v_smallint":0,"v_timestamp":0,"v_varchar":""}
{"id":1} {"id":1,"v_bigint":1872,"v_double":23956.39329760601,"v_float":26261.416015625,"v_integer":1872,"v_smallint":31031,"v_timestamp":1681453634104,"v_varchar":"8DfUFencLe"}
{"id":2} {"id":2,"v_bigint":4598,"v_double":31923.077305746086,"v_float":27031.224609375,"v_integer":4598,"v_smallint":22690,"v_timestamp":1681429444869,"v_varchar":"sIo1XXVeHZ"}
1 change: 1 addition & 0 deletions e2e_test/sink/kafka/upsert3.result
@@ -1,5 +1,6 @@
{"id":10} {"id":10,"v_bigint":20674,"v_double":9042.404483827513,"v_float":19387.23828125,"v_integer":20674,"v_smallint":26951,"v_timestamp":1681404058888,"v_varchar":"0oVqRIHqkb"}
{"id":1} null
{"id":1} null
{"id":1} {"id":1,"v_bigint":0,"v_double":0.0,"v_float":0.0,"v_integer":0,"v_smallint":0,"v_timestamp":0,"v_varchar":""}
{"id":1} {"id":1,"v_bigint":1872,"v_double":23956.39329760601,"v_float":26261.416015625,"v_integer":1872,"v_smallint":31031,"v_timestamp":1681453634104,"v_varchar":"8DfUFencLe"}
{"id":2} {"id":2,"v_bigint":4598,"v_double":31923.077305746086,"v_float":27031.224609375,"v_integer":4598,"v_smallint":22690,"v_timestamp":1681429444869,"v_varchar":"sIo1XXVeHZ"}
1 change: 1 addition & 0 deletions src/common/Cargo.toml
@@ -62,6 +62,7 @@ postgres-types = { version = "0.2.6", features = [
"with-chrono-0_4",
"with-serde_json-1",
] }
prehash = "1"
prometheus = { version = "0.13" }
prost = "0.11"
rand = "0.8"
219 changes: 219 additions & 0 deletions src/common/src/array/compact_chunk.rs
@@ -0,0 +1,219 @@
// Copyright 2023 RisingWave Labs
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use std::collections::hash_map::Entry;
use std::collections::HashMap;
use std::hash::BuildHasherDefault;
use std::mem;

use itertools::Itertools;
use prehash::{new_prehashed_map_with_capacity, Passthru, Prehashed};

use super::stream_chunk::{OpRowMutRef, StreamChunkMut};
use crate::array::{Op, RowRef, StreamChunk};
use crate::row::{Project, RowExt};
use crate::util::hash_util::Crc32FastBuilder;

/// Compacts stream chunks by modifying only the `Ops` and `Vis` of the chunk. Currently, two
/// transformations are applied:
/// - Remove intermediate operations on the same key, so that the operations for each stream key
///   end up in one of three patterns: Insert, Delete, or Update.
/// - For an update `(-old row, +old row)` whose old and new rows are exactly the same, both row
///   ops are removed.
pub struct StreamChunkCompactor {
chunks: Vec<StreamChunk>,
stream_key: Vec<usize>,
}

struct OpRowMutRefTuple<'a> {
previous: Option<OpRowMutRef<'a>>,
latest: OpRowMutRef<'a>,
}

impl<'a> OpRowMutRefTuple<'a> {
/// Returns `true` if no row is left for this key.
fn push(&mut self, mut op_row: OpRowMutRef<'a>) -> bool {
debug_assert!(self.latest.vis());
match (self.latest.op(), op_row.op()) {
(Op::Insert, Op::Insert) => panic!("receive duplicated insert on the stream"),
(Op::Delete, Op::Delete) => panic!("receive duplicated delete on the stream"),
(Op::Insert, Op::Delete) => {
self.latest.set_vis(false);
op_row.set_vis(false);
self.latest = if let Some(prev) = self.previous.take() {
prev
} else {
return true;
}
}
(Op::Delete, Op::Insert) => {
// The operations for the key must be (+, -, +) or (-, +), and any (+, -) pair must
// have been filtered out already.
debug_assert!(self.previous.is_none());
self.previous = Some(mem::replace(&mut self.latest, op_row));
}
// All `UpdateDelete`s and `UpdateInsert`s should already have been normalized to
// `Delete` and `Insert`.
_ => unreachable!(),
};
false
}

fn as_update_op(&mut self) -> Option<(&mut OpRowMutRef<'a>, &mut OpRowMutRef<'a>)> {
self.previous.as_mut().map(|prev| {
debug_assert_eq!(prev.op(), Op::Delete);
debug_assert_eq!(self.latest.op(), Op::Insert);
(prev, &mut self.latest)
})
}
}

type OpRowMap<'a, 'b> =
HashMap<Prehashed<Project<'b, RowRef<'a>>>, OpRowMutRefTuple<'a>, BuildHasherDefault<Passthru>>;

impl StreamChunkCompactor {
pub fn new(stream_key: Vec<usize>) -> Self {
Self {
stream_key,
chunks: vec![],
}
}

pub fn into_inner(self) -> (Vec<StreamChunk>, Vec<usize>) {
(self.chunks, self.stream_key)
}

pub fn push_chunk(&mut self, c: StreamChunk) {
self.chunks.push(c);
}

/// Compact a chunk by modifying the ops and the visibility of a stream chunk. All UPDATE INSERT
/// and UPDATE DELETE will be converted to INSERT and DELETE, and dropped according to
/// certain rules (see `merge_insert` and `merge_delete` for more details).
pub fn into_compacted_chunks(self) -> impl Iterator<Item = StreamChunk> {
let (chunks, key_indices) = self.into_inner();

let estimate_size = chunks.iter().map(|c| c.cardinality()).sum();
let mut chunks: Vec<(Vec<u64>, StreamChunkMut)> = chunks
.into_iter()
.map(|c| {
let hash_values = c
.data_chunk()
.get_hash_values(&key_indices, Crc32FastBuilder)
.into_iter()
.map(|hash| hash.value())
.collect_vec();
(hash_values, StreamChunkMut::from(c))
})
.collect_vec();

let mut op_row_map: OpRowMap<'_, '_> = new_prehashed_map_with_capacity(estimate_size);
for (hash_values, c) in &mut chunks {
for (row, mut op_row) in c.to_rows_mut() {
if !op_row.vis() {
continue;
}
op_row.set_op(op_row.op().normalize_update());
let hash = hash_values[row.index()];
let stream_key = row.project(&key_indices);
match op_row_map.entry(Prehashed::new(stream_key, hash)) {
Entry::Vacant(v) => {
v.insert(OpRowMutRefTuple {
previous: None,
latest: op_row,
});
}
Entry::Occupied(mut o) => {
if o.get_mut().push(op_row) {
o.remove_entry();
}
}
}
}
}
for tuple in op_row_map.values_mut() {
if let Some((prev, latest)) = tuple.as_update_op() {
if prev.row_ref() == latest.row_ref() {
prev.set_vis(false);
latest.set_vis(false);
} else if prev.same_chunk(latest) && prev.index() + 1 == latest.index() {
// TODO(st1page): use next_one check in bitmap
prev.set_op(Op::UpdateDelete);
latest.set_op(Op::UpdateInsert);
}
}
}
fuyufjh (Member) commented on Sep 7, 2023:

I spent about 5 minutes understanding the logic here 🤣 It takes a very clever approach, updating the visibility array in place instead of rebuilding new chunks.

However, I think the motivation behind this design is that the compaction happens on Project, which causes a dramatic performance loss; as my last comment explains, I oppose that. Instead, let's focus on the original user requirement and do it on Sink only.

If so, considering the disordering problem of Sink, we can solve both problems with a single approach (as I proposed in Slack):

  • If sink_key != stream_key
    1. Shuffle the sink_key to stream_key
    2. Before or on SinkExecutor, do compaction for every sink_key
      • Collect all op & rows
      • Make them offset with each other
      • Output one of the 3 forms: -, +, U- U+

For example:

// sink_key = [a], stream_key = [a, b]
+(1,3)
+(1,2)
-(1,2)
-(1,1)
// Collect all op & rows
1 -> (+3, +2, -2, -1)
// Make them offset with each other 
1 -> (+3, , , -1)
// Output one of the 3 forms: `-`, `+`, `U- U+`
U-(1,3)
U+(1,1)

In this way, the problem can be solved by building one hash table. Because this only happens on Sink, rebuilding the Chunk is acceptable to me.

Besides, @st1page is considering that this clever implementation can be reused, but I don't see any requirement for reusing it yet 🥹
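
Below is a minimal sketch of the per-key offsetting idea proposed above; it is not the code merged in this PR, and the `Row` alias, the `compact_one_key` name, and the counting scheme are illustrative assumptions only.

use std::collections::HashMap;

/// Hypothetical owned-row type, just for illustration.
type Row = Vec<i64>;

#[derive(Clone, Copy, PartialEq, Eq)]
enum Op {
    Insert,
    Delete,
    UpdateDelete,
    UpdateInsert,
}

/// Offset the ops collected for one sink key against each other and emit one of the
/// three forms: only deletes, only inserts, or a U-/U+ pair (sketch only).
fn compact_one_key(ops: Vec<(Op, Row)>) -> Vec<(Op, Row)> {
    // Count each distinct row: +1 for an insert, -1 for a delete, so matching
    // insert/delete pairs cancel out.
    let mut counts: HashMap<Row, i64> = HashMap::new();
    for (op, row) in ops {
        let delta = match op {
            Op::Insert | Op::UpdateInsert => 1,
            Op::Delete | Op::UpdateDelete => -1,
        };
        *counts.entry(row).or_insert(0) += delta;
    }

    // Whatever survives the offsetting is re-emitted.
    let mut deletes: Vec<Row> = vec![];
    let mut inserts: Vec<Row> = vec![];
    for (row, count) in counts {
        if count < 0 {
            deletes.push(row);
        } else if count > 0 {
            inserts.push(row);
        }
        // count == 0: fully cancelled, drop the row entirely.
    }

    match (deletes.is_empty(), inserts.is_empty()) {
        (true, true) => vec![],
        (false, true) => deletes.into_iter().map(|r| (Op::Delete, r)).collect(),
        (true, false) => inserts.into_iter().map(|r| (Op::Insert, r)).collect(),
        (false, false) => deletes
            .into_iter()
            .map(|r| (Op::UpdateDelete, r))
            .chain(inserts.into_iter().map(|r| (Op::UpdateInsert, r)))
            .collect(),
    }
}

For the worked example above, +(1,2) and -(1,2) cancel each other, and the surviving insert and delete are emitted as an update pair; which row lands on the U- side versus the U+ side is a detail this sketch does not try to settle.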

Contributor replied:

At least currently, we have two different use cases here:

  1. When the sink's pk is different from the stream key
  2. To compact useless updates whose old and new values are the same

Member replied:

> 2. To compact useless updates whose old and new values are the same

Yeah, this makes sense, but

  • I do think it might be dangerous to reorder events in the process of streaming. After all, no operator does this now. But sorry I can't come up with a good example now.
  • I hope to preserve the U-/U+ rows. There are still lots of operators that can benefit more or less from this. Reordering all deletes before inserts would completely break U-/U+.

Alternatively, we can partially achieve this while preserving event order; the solution is trivial: just hide updates whose before and after values are identical.

For example, set vis = false for these:

U- 1 2 'foo'
U+ 1 2 'foo'

but not these:

- 1 2 'foo'
... optionally, some other rows ...
+ 1 2 'foo'

Contributor replied:

Yes, but this PR does not reorder any events 😇

fuyufjh (Member) replied on Sep 12, 2023:

Yes, because it's now only used in SinkExecutor, not Project. I said so because you mentioned it can be reused (but not yet).


chunks.into_iter().map(|(_, c)| c.into())
}
}

pub fn merge_chunk_row(stream_chunk: StreamChunk, pk_indices: &[usize]) -> StreamChunk {
let mut compactor = StreamChunkCompactor::new(pk_indices.to_vec());
compactor.push_chunk(stream_chunk);
compactor.into_compacted_chunks().next().unwrap()
}

#[cfg(test)]
mod tests {
use super::*;
use crate::array::StreamChunk;
use crate::test_prelude::StreamChunkTestExt;

#[test]
fn test_merge_chunk_row() {
let pk_indices = [0, 1];
let mut compactor = StreamChunkCompactor::new(pk_indices.to_vec());
compactor.push_chunk(StreamChunk::from_pretty(
" I I I
- 1 1 1
+ 1 1 2
+ 2 5 7
+ 4 9 2
- 2 5 7
+ 2 5 5
- 6 6 9
+ 6 6 9
- 9 9 1",
));
compactor.push_chunk(StreamChunk::from_pretty(
" I I I
- 6 6 9
+ 9 9 9
- 9 9 4
+ 2 2 2
+ 9 9 1",
));
let mut iter = compactor.into_compacted_chunks();
assert_eq!(
iter.next().unwrap().compact(),
StreamChunk::from_pretty(
" I I I
U- 1 1 1
U+ 1 1 2
+ 4 9 2
+ 2 5 5
- 6 6 9",
)
);
assert_eq!(
iter.next().unwrap().compact(),
StreamChunk::from_pretty(
" I I I
+ 2 2 2",
)
);

assert_eq!(iter.next(), None);
}
}