Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(sink): handle stream key sink pk mismatch #12458

Merged
merged 9 commits into from
Sep 27, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 3 additions & 1 deletion src/stream/src/common/log_store_impl/kv_log_store/writer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,9 @@ impl<LS: LocalStateStore> LogWriter for KvLogStoreWriter<LS> {
}

async fn write_chunk(&mut self, chunk: StreamChunk) -> LogStoreResult<()> {
assert!(chunk.cardinality() > 0);
if chunk.cardinality() == 0 {
return Ok(());
}
let epoch = self.state_store.epoch();
let start_seq_id = self.seq_id;
self.seq_id += chunk.cardinality() as SeqIdType;
Expand Down
304 changes: 274 additions & 30 deletions src/stream/src/executor/sink.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.

use std::mem;
use std::sync::Arc;

use anyhow::anyhow;
Expand All @@ -20,7 +21,7 @@ use futures::{FutureExt, StreamExt};
use futures_async_stream::try_stream;
use itertools::Itertools;
use risingwave_common::array::stream_chunk::StreamChunkMut;
use risingwave_common::array::{merge_chunk_row, Op, StreamChunk};
use risingwave_common::array::{merge_chunk_row, Op, StreamChunk, StreamChunkCompactor};
use risingwave_common::catalog::{ColumnCatalog, Field, Schema};
use risingwave_common::util::epoch::EpochPair;
use risingwave_connector::dispatch_sink;
Expand Down Expand Up @@ -65,6 +66,19 @@ fn force_append_only(c: StreamChunk) -> StreamChunk {
c.into()
}

// Keep only the deleting side of a chunk: every INSERT / UPDATE INSERT row is hidden, and every
// UPDATE DELETE row is rewritten as a plain DELETE. Rows that are already DELETE are untouched.
fn force_delete_only(c: StreamChunk) -> StreamChunk {
    let mut chunk_mut: StreamChunkMut = c.into();
    for (_, mut row) in chunk_mut.to_rows_mut() {
        match row.op() {
            // The second half of an update is standalone here, so downgrade it to a DELETE.
            Op::UpdateDelete => row.set_op(Op::Delete),
            // Inserting rows do not belong in a delete-only chunk; mark them invisible.
            Op::Insert | Op::UpdateInsert => row.set_vis(false),
            Op::Delete => {}
        }
    }
    chunk_mut.into()
}

impl<F: LogStoreFactory> SinkExecutor<F> {
#[allow(clippy::too_many_arguments)]
pub async fn new(
Expand Down Expand Up @@ -101,12 +115,21 @@ impl<F: LogStoreFactory> SinkExecutor<F> {
}

fn execute_inner(self) -> BoxedMessageStream {
let stream_key = self.pk_indices;

let stream_key_sink_pk_mismatch = {
stream_key
.iter()
.any(|i| !self.sink_param.downstream_pk.contains(i))
};

let write_log_stream = Self::execute_write_log(
self.input,
self.pk_indices,
stream_key,
self.log_writer,
self.sink_param.sink_type,
self.actor_context,
stream_key_sink_pk_mismatch,
);

dispatch_sink!(self.sink, sink, {
Expand All @@ -127,6 +150,7 @@ impl<F: LogStoreFactory> SinkExecutor<F> {
mut log_writer: impl LogWriter,
sink_type: SinkType,
actor_context: ActorContextRef,
stream_key_sink_pk_mismatch: bool,
) {
let mut input = input.execute();

Expand All @@ -141,36 +165,115 @@ impl<F: LogStoreFactory> SinkExecutor<F> {
// Propagate the first barrier
yield Message::Barrier(barrier);

#[for_await]
for msg in input {
match msg? {
Message::Watermark(w) => yield Message::Watermark(w),
Message::Chunk(chunk) => {
// Compact the chunk to eliminate any useless intermediate result (e.g. UPDATE
// V->V).
let chunk = merge_chunk_row(chunk, &stream_key);
let chunk = if sink_type == SinkType::ForceAppendOnly {
// Force append-only by dropping UPDATE/DELETE messages. We do this when the
// user forces the sink to be append-only while it is actually not based on
// the frontend derivation result.
force_append_only(chunk)
} else {
chunk
};

log_writer.write_chunk(chunk.clone()).await?;

// Use original chunk instead of the reordered one as the executor output.
yield Message::Chunk(chunk);
// When the stream key differs from the user-defined primary key columns of the sink, the operations could arrive out of order.
// stream key: a,b
// sink pk: a

// original:
// (1,1) -> (1,2)
// (1,2) -> (1,3)

// mv fragment 1:
// delete (1,1)

// mv fragment 2:
// insert (1,2)
// delete (1,2)

// mv fragment 3:
// insert (1,3)

// merge to sink fragment:
// insert (1,3)
// insert (1,2)
// delete (1,2)
// delete (1,1)
// So we do additional compaction in the sink executor per barrier.

// 1. compact all the changes with the stream key.
// 2. sink all the delete events and then sink all the insert events.

// After compacting with the stream key, two events with the same user-defined sink pk must have different stream keys.
// So the delete event does not delete the inserted record in our internal streaming SQL semantics.
if stream_key_sink_pk_mismatch && sink_type != SinkType::AppendOnly {
let mut chunk_buffer = StreamChunkCompactor::new(stream_key.clone());
let mut watermark = None;
#[for_await]
for msg in input {
match msg? {
Message::Watermark(w) => watermark = Some(w),
Message::Chunk(c) => {
chunk_buffer.push_chunk(c);
}
Message::Barrier(barrier) => {
let mut delete_chunks = vec![];
let mut insert_chunks = vec![];
for c in mem::replace(
&mut chunk_buffer,
StreamChunkCompactor::new(stream_key.clone()),
)
.into_compacted_chunks()
{
if sink_type != SinkType::ForceAppendOnly {
// Unless the user forces the sink to be append-only, collect the delete side of
// each compacted chunk separately so that all deletes are written before any
// insert. A force-append-only sink skips the delete side entirely.
delete_chunks.push(force_delete_only(c.clone()));
}
insert_chunks.push(force_append_only(c));
}

for c in delete_chunks.into_iter().chain(insert_chunks.into_iter()) {
log_writer.write_chunk(c.clone()).await?;
yield Message::Chunk(c);
}
if let Some(w) = mem::take(&mut watermark) {
yield Message::Watermark(w)
}
log_writer
.flush_current_epoch(barrier.epoch.curr, barrier.kind.is_checkpoint())
.await?;
if let Some(vnode_bitmap) = barrier.as_update_vnode_bitmap(actor_context.id)
{
log_writer.update_vnode_bitmap(vnode_bitmap);
}
yield Message::Barrier(barrier);
}
}
Message::Barrier(barrier) => {
log_writer
.flush_current_epoch(barrier.epoch.curr, barrier.kind.is_checkpoint())
.await?;
if let Some(vnode_bitmap) = barrier.as_update_vnode_bitmap(actor_context.id) {
log_writer.update_vnode_bitmap(vnode_bitmap);
}
} else {
#[for_await]
for msg in input {
match msg? {
Message::Watermark(w) => yield Message::Watermark(w),
Message::Chunk(chunk) => {
// Compact the chunk to eliminate any useless intermediate result (e.g. UPDATE
// V->V).
let chunk = merge_chunk_row(chunk, &stream_key);
let chunk = if sink_type == SinkType::ForceAppendOnly {
// Force append-only by dropping UPDATE/DELETE messages. We do this when the
// user forces the sink to be append-only while it is actually not based on
// the frontend derivation result.
force_append_only(chunk)
} else {
chunk
};

log_writer.write_chunk(chunk.clone()).await?;

// Use original chunk instead of the reordered one as the executor output.
yield Message::Chunk(chunk);
}
Message::Barrier(barrier) => {
log_writer
.flush_current_epoch(barrier.epoch.curr, barrier.kind.is_checkpoint())
.await?;
if let Some(vnode_bitmap) = barrier.as_update_vnode_bitmap(actor_context.id)
{
log_writer.update_vnode_bitmap(vnode_bitmap);
}
yield Message::Barrier(barrier);
}
yield Message::Barrier(barrier);
}
}
}
Expand Down Expand Up @@ -352,6 +455,147 @@ mod test {
executor.next().await.unwrap().unwrap();
}

// Exercises the sink executor when the stream key (columns 0 and 1) contains a column that is
// not part of the sink's downstream pk (column 0 only), using an upsert sink.
// The assertions below pin the exact order of the chunks emitted between barriers.
#[tokio::test]
async fn stream_key_sink_pk_mismatch() {
use risingwave_common::array::stream_chunk::StreamChunk;
use risingwave_common::array::StreamChunkTestExt;
use risingwave_common::types::DataType;

use crate::executor::Barrier;

let properties = maplit::hashmap! {
"connector".into() => "blackhole".into(),
};

// We have two visible columns and one hidden column. Only the visible ones are passed to
// `SinkParam::columns` below; the executor itself receives the full column list.
let columns = vec![
ColumnCatalog {
column_desc: ColumnDesc::unnamed(ColumnId::new(0), DataType::Int64),
is_hidden: false,
},
ColumnCatalog {
column_desc: ColumnDesc::unnamed(ColumnId::new(1), DataType::Int64),
is_hidden: false,
},
ColumnCatalog {
column_desc: ColumnDesc::unnamed(ColumnId::new(2), DataType::Int64),
is_hidden: true,
},
];
let schema: Schema = columns
.iter()
.map(|column| Field::from(column.column_desc.clone()))
.collect();

// Input: one insert in the first epoch, then three chunks in the second epoch in the order
// insert (1,3), insert+delete (1,2), delete (1,1).
let mock = MockSource::with_messages(
schema,
vec![0, 1],
vec![
Message::Barrier(Barrier::new_test_barrier(1)),
Message::Chunk(std::mem::take(&mut StreamChunk::from_pretty(
" I I I
+ 1 1 10",
))),
Message::Barrier(Barrier::new_test_barrier(2)),
Message::Chunk(std::mem::take(&mut StreamChunk::from_pretty(
" I I I
+ 1 3 30",
))),
Message::Chunk(std::mem::take(&mut StreamChunk::from_pretty(
" I I I
+ 1 2 20
- 1 2 20",
))),
Message::Chunk(std::mem::take(&mut StreamChunk::from_pretty(
" I I I
- 1 1 10",
))),
Message::Barrier(Barrier::new_test_barrier(3)),
],
);

// The downstream pk is column 0 only, while the stream key given to the executor is [0, 1].
let sink_param = SinkParam {
sink_id: 0.into(),
properties,
columns: columns
.iter()
.filter(|col| !col.is_hidden)
.map(|col| col.column_desc.clone())
.collect(),
downstream_pk: vec![0],
sink_type: SinkType::Upsert,
db_name: "test".into(),
sink_from_name: "test".into(),
};

let sink_executor = SinkExecutor::new(
Box::new(mock),
Arc::new(StreamingMetrics::unused()),
SinkWriterParam::default(),
sink_param,
columns.clone(),
ActorContext::create(0),
BoundedInMemLogStoreFactory::new(1),
vec![0, 1],
)
.await
.unwrap();

let mut executor = SinkExecutor::execute(Box::new(sink_executor));

// Barrier message.
executor.next().await.unwrap().unwrap();

// First epoch: an empty chunk is emitted first (presumably the delete side of the single
// input chunk, which has no visible rows), followed by the insert itself.
let chunk_msg = executor.next().await.unwrap().unwrap();
assert_eq!(chunk_msg.into_chunk().unwrap().cardinality(), 0);

let chunk_msg = executor.next().await.unwrap().unwrap();
assert_eq!(
chunk_msg.into_chunk().unwrap().compact(),
StreamChunk::from_pretty(
" I I I
+ 1 1 10",
)
);

// Barrier message.
executor.next().await.unwrap().unwrap();

// Second epoch: two empty chunks, then the delete of (1, 1, 10) is emitted before the
// insert of (1, 3, 30), even though the insert arrived first in the input.
let chunk_msg = executor.next().await.unwrap().unwrap();
assert_eq!(chunk_msg.into_chunk().unwrap().cardinality(), 0);
let chunk_msg = executor.next().await.unwrap().unwrap();
assert_eq!(chunk_msg.into_chunk().unwrap().cardinality(), 0);

let chunk_msg = executor.next().await.unwrap().unwrap();
assert_eq!(
chunk_msg.into_chunk().unwrap().compact(),
StreamChunk::from_pretty(
" I I I
- 1 1 10",
)
);

let chunk_msg = executor.next().await.unwrap().unwrap();
assert_eq!(
chunk_msg.into_chunk().unwrap().compact(),
StreamChunk::from_pretty(
" I I I
+ 1 3 30",
)
);
let chunk_msg = executor.next().await.unwrap().unwrap();
assert_eq!(chunk_msg.into_chunk().unwrap().cardinality(), 0);
let chunk_msg = executor.next().await.unwrap().unwrap();
assert_eq!(chunk_msg.into_chunk().unwrap().cardinality(), 0);

// The insert and delete of (1, 2, 20) within the second epoch never appear in the output:
// every chunk other than the two asserted above is empty.

// The last barrier message.
executor.next().await.unwrap().unwrap();
}

#[tokio::test]
async fn test_empty_barrier_sink() {
use risingwave_common::types::DataType;
Expand Down
1 change: 1 addition & 0 deletions src/tests/simulation/tests/integration_tests/sink/basic.rs
Original file line number Diff line number Diff line change
Expand Up @@ -177,6 +177,7 @@ async fn test_sink_basic() -> Result<()> {
sleep(Duration::from_millis(10)).await;
}
}
sleep(Duration::from_millis(10000)).await;
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

c.c. @wenym1


assert_eq!(6, parallelism_counter.load(Relaxed));
assert_eq!(count, row_counter.load(Relaxed));
Expand Down
Loading