feat(sink): handle stream key sink pk mismatch (#12458)
st1page authored Sep 27, 2023
1 parent 590faea commit 4ca3a9a
Showing 3 changed files with 278 additions and 31 deletions.
4 changes: 3 additions & 1 deletion src/stream/src/common/log_store_impl/kv_log_store/writer.rs
@@ -68,7 +68,9 @@ impl<LS: LocalStateStore> LogWriter for KvLogStoreWriter<LS> {
}

async fn write_chunk(&mut self, chunk: StreamChunk) -> LogStoreResult<()> {
assert!(chunk.cardinality() > 0);
if chunk.cardinality() == 0 {
return Ok(());
}
let epoch = self.state_store.epoch();
let start_seq_id = self.seq_id;
self.seq_id += chunk.cardinality() as SeqIdType;
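With the per-barrier compaction introduced below in the sink executor, fully-compacted chunks (e.g. an INSERT cancelled by a DELETE on the same stream key) can legitimately reach the log writer with zero visible rows, so the hard assertion becomes an early return. A minimal standalone sketch of the guard, using a toy Chunk type rather than RisingWave's real StreamChunk and LogWriter APIs:

struct Chunk {
    visible_rows: usize,
}

impl Chunk {
    // Number of visible (non-hidden) rows, mirroring StreamChunk::cardinality.
    fn cardinality(&self) -> usize {
        self.visible_rows
    }
}

fn write_chunk(chunk: &Chunk) -> Result<(), String> {
    // Empty chunks are now legal input: skip them instead of panicking.
    if chunk.cardinality() == 0 {
        return Ok(());
    }
    // ... persist the visible rows to the log store ...
    Ok(())
}

fn main() {
    // A fully-compacted chunk must be a no-op, not a panic.
    assert!(write_chunk(&Chunk { visible_rows: 0 }).is_ok());
}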
304 changes: 274 additions & 30 deletions src/stream/src/executor/sink.rs
@@ -12,6 +12,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.

use std::mem;
use std::sync::Arc;

use anyhow::anyhow;
@@ -20,7 +21,7 @@ use futures::{FutureExt, StreamExt};
use futures_async_stream::try_stream;
use itertools::Itertools;
use risingwave_common::array::stream_chunk::StreamChunkMut;
use risingwave_common::array::{merge_chunk_row, Op, StreamChunk};
use risingwave_common::array::{merge_chunk_row, Op, StreamChunk, StreamChunkCompactor};
use risingwave_common::catalog::{ColumnCatalog, Field, Schema};
use risingwave_common::util::epoch::EpochPair;
use risingwave_connector::dispatch_sink;
@@ -65,6 +66,19 @@ fn force_append_only(c: StreamChunk) -> StreamChunk {
c.into()
}

// Drop all the INSERT messages in this chunk and convert UPDATE DELETE into DELETE.
fn force_delete_only(c: StreamChunk) -> StreamChunk {
let mut c: StreamChunkMut = c.into();
for (_, mut r) in c.to_rows_mut() {
match r.op() {
Op::Delete => {}
Op::Insert | Op::UpdateInsert => r.set_vis(false),
Op::UpdateDelete => r.set_op(Op::Delete),
}
}
c.into()
}
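// E.g. (illustration): a chunk with ops [Insert, UpdateDelete, UpdateInsert, Delete]
// comes out with visible ops [Delete, Delete]; the Insert and UpdateInsert rows are
// merely hidden (vis = false), so the chunk layout and columns stay untouched.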

impl<F: LogStoreFactory> SinkExecutor<F> {
#[allow(clippy::too_many_arguments)]
pub async fn new(
@@ -101,12 +115,21 @@ impl<F: LogStoreFactory> SinkExecutor<F> {
}

fn execute_inner(self) -> BoxedMessageStream {
let stream_key = self.pk_indices;

let stream_key_sink_pk_mismatch = {
stream_key
.iter()
.any(|i| !self.sink_param.downstream_pk.contains(i))
};
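// E.g. with stream key [a, b] and sink pk [a], column b is not covered by the
// sink pk, so the flag is set and execute_write_log takes the per-barrier
// compact-and-reorder path.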

let write_log_stream = Self::execute_write_log(
self.input,
self.pk_indices,
stream_key,
self.log_writer,
self.sink_param.sink_type,
self.actor_context,
stream_key_sink_pk_mismatch,
);

dispatch_sink!(self.sink, sink, {
@@ -127,6 +150,7 @@ impl<F: LogStoreFactory> SinkExecutor<F> {
mut log_writer: impl LogWriter,
sink_type: SinkType,
actor_context: ActorContextRef,
stream_key_sink_pk_mismatch: bool,
) {
let mut input = input.execute();

@@ -141,36 +165,115 @@ impl<F: LogStoreFactory> SinkExecutor<F> {
// Propagate the first barrier
yield Message::Barrier(barrier);

#[for_await]
for msg in input {
match msg? {
Message::Watermark(w) => yield Message::Watermark(w),
Message::Chunk(chunk) => {
// Compact the chunk to eliminate any useless intermediate result (e.g. UPDATE
// V->V).
let chunk = merge_chunk_row(chunk, &stream_key);
let chunk = if sink_type == SinkType::ForceAppendOnly {
// Force append-only by dropping UPDATE/DELETE messages. We do this when the
// user forces the sink to be append-only while it is actually not based on
// the frontend derivation result.
force_append_only(chunk)
} else {
chunk
};

log_writer.write_chunk(chunk.clone()).await?;

// Use original chunk instead of the reordered one as the executor output.
yield Message::Chunk(chunk);
// When the stream key differs from the sink's user-defined primary key columns, the operations can arrive out of order, e.g.:
// stream key: a,b
// sink pk: a

// original updates:
// (1,1) -> (1,2)
// (1,2) -> (1,3)

// mv fragment 1:
// delete (1,1)

// mv fragment 2:
// insert (1,2)
// delete (1,2)

// mv fragment 3:
// insert (1,3)

// merged into the sink fragment:
// insert (1,3)
// insert (1,2)
// delete (1,2)
// delete (1,1)
// So we do an additional per-barrier compaction in the sink executor:

// 1. Compact all the changes with the stream key.
// 2. Sink all the DELETE events first, then all the INSERT events.

// After compacting with the stream key, two events with the same user-defined sink pk must
// have different stream keys, so a DELETE event can never delete a freshly inserted record
// under our internal streaming SQL semantics.
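// E.g., applying the two steps to the example above: compaction by the
// stream key (a,b) cancels the insert/delete pair on (1,2), leaving only
// delete (1,1) and insert (1,3); reordering then sinks
// delete (1,1)
// insert (1,3)
// so the stale DELETE can no longer clobber the newer row with sink pk a = 1.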
if stream_key_sink_pk_mismatch && sink_type != SinkType::AppendOnly {
let mut chunk_buffer = StreamChunkCompactor::new(stream_key.clone());
let mut watermark = None;
#[for_await]
for msg in input {
match msg? {
Message::Watermark(w) => watermark = Some(w),
Message::Chunk(c) => {
chunk_buffer.push_chunk(c);
}
Message::Barrier(barrier) => {
let mut delete_chunks = vec![];
let mut insert_chunks = vec![];
for c in mem::replace(
&mut chunk_buffer,
StreamChunkCompactor::new(stream_key.clone()),
)
.into_compacted_chunks()
{
if sink_type != SinkType::ForceAppendOnly {
// When the user forces the sink to be append-only (while it is actually
// not, judging by the frontend derivation result), the DELETE events are
// dropped entirely; otherwise they are emitted before all the INSERTs.
delete_chunks.push(force_delete_only(c.clone()));
}
insert_chunks.push(force_append_only(c));
}

for c in delete_chunks.into_iter().chain(insert_chunks.into_iter()) {
log_writer.write_chunk(c.clone()).await?;
yield Message::Chunk(c);
}
if let Some(w) = mem::take(&mut watermark) {
yield Message::Watermark(w)
}
log_writer
.flush_current_epoch(barrier.epoch.curr, barrier.kind.is_checkpoint())
.await?;
if let Some(vnode_bitmap) = barrier.as_update_vnode_bitmap(actor_context.id)
{
log_writer.update_vnode_bitmap(vnode_bitmap);
}
yield Message::Barrier(barrier);
}
}
Message::Barrier(barrier) => {
log_writer
.flush_current_epoch(barrier.epoch.curr, barrier.kind.is_checkpoint())
.await?;
if let Some(vnode_bitmap) = barrier.as_update_vnode_bitmap(actor_context.id) {
log_writer.update_vnode_bitmap(vnode_bitmap);
}
} else {
#[for_await]
for msg in input {
match msg? {
Message::Watermark(w) => yield Message::Watermark(w),
Message::Chunk(chunk) => {
// Compact the chunk to eliminate any useless intermediate result (e.g. UPDATE
// V->V).
let chunk = merge_chunk_row(chunk, &stream_key);
let chunk = if sink_type == SinkType::ForceAppendOnly {
// Force append-only by dropping UPDATE/DELETE messages. We do this when the
// user forces the sink to be append-only while it is actually not based on
// the frontend derivation result.
force_append_only(chunk)
} else {
chunk
};

log_writer.write_chunk(chunk.clone()).await?;

// Use original chunk instead of the reordered one as the executor output.
yield Message::Chunk(chunk);
}
Message::Barrier(barrier) => {
log_writer
.flush_current_epoch(barrier.epoch.curr, barrier.kind.is_checkpoint())
.await?;
if let Some(vnode_bitmap) = barrier.as_update_vnode_bitmap(actor_context.id)
{
log_writer.update_vnode_bitmap(vnode_bitmap);
}
yield Message::Barrier(barrier);
}
yield Message::Barrier(barrier);
}
}
}
@@ -352,6 +455,147 @@ mod test {
executor.next().await.unwrap().unwrap();
}

#[tokio::test]
async fn stream_key_sink_pk_mismatch() {
use risingwave_common::array::stream_chunk::StreamChunk;
use risingwave_common::array::StreamChunkTestExt;
use risingwave_common::types::DataType;

use crate::executor::Barrier;

let properties = maplit::hashmap! {
"connector".into() => "blackhole".into(),
};

// We have two visible columns and one hidden column. The hidden column will be pruned out
// within the sink executor.
let columns = vec![
ColumnCatalog {
column_desc: ColumnDesc::unnamed(ColumnId::new(0), DataType::Int64),
is_hidden: false,
},
ColumnCatalog {
column_desc: ColumnDesc::unnamed(ColumnId::new(1), DataType::Int64),
is_hidden: false,
},
ColumnCatalog {
column_desc: ColumnDesc::unnamed(ColumnId::new(2), DataType::Int64),
is_hidden: true,
},
];
let schema: Schema = columns
.iter()
.map(|column| Field::from(column.column_desc.clone()))
.collect();

let mock = MockSource::with_messages(
schema,
vec![0, 1],
vec![
Message::Barrier(Barrier::new_test_barrier(1)),
Message::Chunk(std::mem::take(&mut StreamChunk::from_pretty(
" I I I
+ 1 1 10",
))),
Message::Barrier(Barrier::new_test_barrier(2)),
Message::Chunk(std::mem::take(&mut StreamChunk::from_pretty(
" I I I
+ 1 3 30",
))),
Message::Chunk(std::mem::take(&mut StreamChunk::from_pretty(
" I I I
+ 1 2 20
- 1 2 20",
))),
Message::Chunk(std::mem::take(&mut StreamChunk::from_pretty(
" I I I
- 1 1 10",
))),
Message::Barrier(Barrier::new_test_barrier(3)),
],
);
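// The first epoch carries insert (1,1,10). The second epoch carries
// insert (1,3,30), a cancelling insert/delete pair on (1,2,20), and
// delete (1,1,10) -- the out-of-order pattern from the comment in
// execute_write_log.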

let sink_param = SinkParam {
sink_id: 0.into(),
properties,
columns: columns
.iter()
.filter(|col| !col.is_hidden)
.map(|col| col.column_desc.clone())
.collect(),
downstream_pk: vec![0],
sink_type: SinkType::Upsert,
db_name: "test".into(),
sink_from_name: "test".into(),
};

let sink_executor = SinkExecutor::new(
Box::new(mock),
Arc::new(StreamingMetrics::unused()),
SinkWriterParam::default(),
sink_param,
columns.clone(),
ActorContext::create(0),
BoundedInMemLogStoreFactory::new(1),
vec![0, 1],
)
.await
.unwrap();

let mut executor = SinkExecutor::execute(Box::new(sink_executor));

// Barrier message.
executor.next().await.unwrap().unwrap();

let chunk_msg = executor.next().await.unwrap().unwrap();
assert_eq!(chunk_msg.into_chunk().unwrap().cardinality(), 0);

let chunk_msg = executor.next().await.unwrap().unwrap();
assert_eq!(
chunk_msg.into_chunk().unwrap().compact(),
StreamChunk::from_pretty(
" I I I
+ 1 1 10",
)
);

// Barrier message.
executor.next().await.unwrap().unwrap();

let chunk_msg = executor.next().await.unwrap().unwrap();
assert_eq!(chunk_msg.into_chunk().unwrap().cardinality(), 0);
let chunk_msg = executor.next().await.unwrap().unwrap();
assert_eq!(chunk_msg.into_chunk().unwrap().cardinality(), 0);

let chunk_msg = executor.next().await.unwrap().unwrap();
assert_eq!(
chunk_msg.into_chunk().unwrap().compact(),
StreamChunk::from_pretty(
" I I I
- 1 1 10",
)
);

let chunk_msg = executor.next().await.unwrap().unwrap();
assert_eq!(
chunk_msg.into_chunk().unwrap().compact(),
StreamChunk::from_pretty(
" I I I
+ 1 3 30",
)
);
let chunk_msg = executor.next().await.unwrap().unwrap();
assert_eq!(chunk_msg.into_chunk().unwrap().cardinality(), 0);
let chunk_msg = executor.next().await.unwrap().unwrap();
assert_eq!(chunk_msg.into_chunk().unwrap().cardinality(), 0);

// All events of this epoch have been received: the compacted DELETE chunks
// are sunk before the INSERT chunks, and the cancelled (1,2) pair only
// surfaces as empty chunks.

// The last barrier message.
executor.next().await.unwrap().unwrap();
}

#[tokio::test]
async fn test_empty_barrier_sink() {
use risingwave_common::types::DataType;
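To make the compact-and-reorder step concrete, here is a self-contained sketch with deliberately simplified types. Op, Key, and compact_and_reorder are illustrative stand-ins rather than RisingWave's StreamChunkCompactor API, and the net-effect rule (first vs. last op per key within a barrier) is an assumption chosen to match the example in execute_write_log:

use std::collections::HashMap;

#[derive(Clone, Copy, Debug, PartialEq)]
enum Op {
    Insert,
    Delete,
}

// The stream key (a, b) from the example in execute_write_log.
type Key = (i64, i64);

fn compact_and_reorder(events: &[(Op, Key)]) -> Vec<(Op, Key)> {
    // Record the first and last op seen per stream key within the barrier.
    let mut net: HashMap<Key, (Op, Op)> = HashMap::new();
    for &(op, key) in events {
        net.entry(key).and_modify(|e| e.1 = op).or_insert((op, op));
    }
    let mut deletes = Vec::new();
    let mut inserts = Vec::new();
    for (key, (first, last)) in net {
        match (first, last) {
            // INSERT later cancelled by DELETE: net no-op, drop entirely.
            (Op::Insert, Op::Delete) => {}
            (Op::Insert, Op::Insert) => inserts.push((Op::Insert, key)),
            (Op::Delete, Op::Delete) => deletes.push((Op::Delete, key)),
            // DELETE then re-INSERT: keep both; the delete phase runs first.
            (Op::Delete, Op::Insert) => {
                deletes.push((Op::Delete, key));
                inserts.push((Op::Insert, key));
            }
        }
    }
    // All DELETE events are sunk before any INSERT event.
    deletes.extend(inserts);
    deletes
}

fn main() {
    // The merged, out-of-order stream from the comment in execute_write_log.
    let merged = vec![
        (Op::Insert, (1, 3)),
        (Op::Insert, (1, 2)),
        (Op::Delete, (1, 2)),
        (Op::Delete, (1, 1)),
    ];
    assert_eq!(
        compact_and_reorder(&merged),
        vec![(Op::Delete, (1, 1)), (Op::Insert, (1, 3))]
    );
}

Running it reproduces the example: the (1,2) pair cancels out, and the surviving events are emitted with the DELETE phase strictly before the INSERT phase, which is what keeps a downstream table keyed only on column a consistent.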
1 change: 1 addition & 0 deletions src/tests/simulation/tests/integration_tests/sink/basic.rs
@@ -177,6 +177,7 @@ async fn test_sink_basic() -> Result<()> {
sleep(Duration::from_millis(10)).await;
}
}
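// Give the sink time to drain any rows still buffered in the log store
// before checking the counters below.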
sleep(Duration::from_millis(10000)).await;

assert_eq!(6, parallelism_counter.load(Relaxed));
assert_eq!(count, row_counter.load(Relaxed));
