Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(sink): handle stream key sink pk mismatch #12458

Merged
merged 9 commits on Sep 27, 2023
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 3 additions & 1 deletion src/stream/src/common/log_store/kv_log_store/writer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,9 @@ impl<LS: LocalStateStore> LogWriter for KvLogStoreWriter<LS> {
}

async fn write_chunk(&mut self, chunk: StreamChunk) -> LogStoreResult<()> {
assert!(chunk.cardinality() > 0);
if chunk.cardinality() == 0 {
return Ok(());
}
let epoch = self.state_store.epoch();
let start_seq_id = self.seq_id;
self.seq_id += chunk.cardinality() as SeqIdType;
Expand Down
268 changes: 238 additions & 30 deletions src/stream/src/executor/sink.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.

use std::mem;
use std::sync::Arc;
use std::time::Instant;

Expand All @@ -21,7 +22,7 @@ use futures_async_stream::try_stream;
use itertools::Itertools;
use prometheus::Histogram;
use risingwave_common::array::stream_chunk::StreamChunkMut;
use risingwave_common::array::{merge_chunk_row, Op, StreamChunk};
use risingwave_common::array::{merge_chunk_row, Op, StreamChunk, StreamChunkCompactor};
use risingwave_common::catalog::{ColumnCatalog, Field, Schema};
use risingwave_common::util::epoch::EpochPair;
use risingwave_connector::dispatch_sink;
Expand Down Expand Up @@ -70,6 +71,19 @@ fn force_append_only(c: StreamChunk) -> StreamChunk {
c.into()
}

/// Reduce `chunk` to its DELETE part only: plain `Delete` rows pass through
/// unchanged, `UpdateDelete` rows are rewritten into plain `Delete`, and every
/// `Insert` / `UpdateInsert` row is marked invisible so it is dropped from the
/// output. The chunk layout itself is untouched; only ops/visibility change.
fn force_delete_only(chunk: StreamChunk) -> StreamChunk {
    let mut editable: StreamChunkMut = chunk.into();
    for (_, mut row) in editable.to_rows_mut() {
        match row.op() {
            // UPDATE DELETE carries the old value of an update; downgrade it
            // to a standalone DELETE.
            Op::UpdateDelete => row.set_op(Op::Delete),
            // No insert of any kind may survive in a delete-only chunk.
            Op::Insert | Op::UpdateInsert => row.set_vis(false),
            // Plain DELETE rows are kept as-is.
            Op::Delete => {}
        }
    }
    editable.into()
}

impl<F: LogStoreFactory> SinkExecutor<F> {
#[allow(clippy::too_many_arguments)]
pub async fn new(
Expand Down Expand Up @@ -114,13 +128,21 @@ impl<F: LogStoreFactory> SinkExecutor<F> {
let sink_metrics = SinkMetrics {
sink_commit_duration_metrics,
};
let stream_key = self.pk_indices;

let stream_key_sink_pk_mismatch = {
stream_key
.iter()
.any(|i| !self.sink_param.downstream_pk.contains(i))
};

let write_log_stream = Self::execute_write_log(
self.input,
self.pk_indices,
stream_key,
self.log_writer,
self.sink_param.sink_type,
self.actor_context,
stream_key_sink_pk_mismatch,
);

dispatch_sink!(self.sink, sink, {
Expand All @@ -142,6 +164,7 @@ impl<F: LogStoreFactory> SinkExecutor<F> {
mut log_writer: impl LogWriter,
sink_type: SinkType,
actor_context: ActorContextRef,
stream_key_sink_pk_mismatch: bool,
) {
let mut input = input.execute();

Expand All @@ -156,36 +179,80 @@ impl<F: LogStoreFactory> SinkExecutor<F> {
// Propagate the first barrier
yield Message::Barrier(barrier);

#[for_await]
for msg in input {
match msg? {
Message::Watermark(w) => yield Message::Watermark(w),
Message::Chunk(chunk) => {
// Compact the chunk to eliminate any useless intermediate result (e.g. UPDATE
// V->V).
let chunk = merge_chunk_row(chunk, &stream_key);
let chunk = if sink_type == SinkType::ForceAppendOnly {
// Force append-only by dropping UPDATE/DELETE messages. We do this when the
// user forces the sink to be append-only while it is actually not based on
// the frontend derivation result.
force_append_only(chunk)
} else {
chunk
};

log_writer.write_chunk(chunk.clone()).await?;

// Use original chunk instead of the reordered one as the executor output.
yield Message::Chunk(chunk);
if stream_key_sink_pk_mismatch {
st1page marked this conversation as resolved.
Show resolved Hide resolved
let mut chunk_buffer = StreamChunkCompactor::new(stream_key.clone());
let mut watermark = None;
#[for_await]
for msg in input {
match msg? {
Message::Watermark(w) => watermark = Some(w),
Message::Chunk(c) => {
chunk_buffer.push_chunk(c);
}
Message::Barrier(barrier) => {
let mut delete_chunks = vec![];
let mut insert_chunks = vec![];
for c in mem::replace(
&mut chunk_buffer,
StreamChunkCompactor::new(stream_key.clone()),
)
.into_compacted_chunks()
{
delete_chunks.push(force_delete_only(c.clone()));
insert_chunks.push(force_append_only(c));
}

for c in delete_chunks.into_iter().chain(insert_chunks.into_iter()) {
log_writer.write_chunk(c.clone()).await?;
yield Message::Chunk(c);
}
if let Some(w) = mem::take(&mut watermark) {
yield Message::Watermark(w)
}
log_writer
.flush_current_epoch(barrier.epoch.curr, barrier.kind.is_checkpoint())
.await?;
if let Some(vnode_bitmap) = barrier.as_update_vnode_bitmap(actor_context.id)
{
log_writer.update_vnode_bitmap(vnode_bitmap);
}
yield Message::Barrier(barrier);
}
}
Message::Barrier(barrier) => {
log_writer
.flush_current_epoch(barrier.epoch.curr, barrier.kind.is_checkpoint())
.await?;
if let Some(vnode_bitmap) = barrier.as_update_vnode_bitmap(actor_context.id) {
log_writer.update_vnode_bitmap(vnode_bitmap);
}
} else {
#[for_await]
for msg in input {
match msg? {
Message::Watermark(w) => yield Message::Watermark(w),
Message::Chunk(chunk) => {
// Compact the chunk to eliminate any useless intermediate result (e.g. UPDATE
// V->V).
let chunk = merge_chunk_row(chunk, &stream_key);
let chunk = if sink_type == SinkType::ForceAppendOnly {
// Force append-only by dropping UPDATE/DELETE messages. We do this when the
// user forces the sink to be append-only while it is actually not based on
// the frontend derivation result.
force_append_only(chunk)
} else {
chunk
};

log_writer.write_chunk(chunk.clone()).await?;

// Use original chunk instead of the reordered one as the executor output.
yield Message::Chunk(chunk);
}
Message::Barrier(barrier) => {
log_writer
.flush_current_epoch(barrier.epoch.curr, barrier.kind.is_checkpoint())
.await?;
if let Some(vnode_bitmap) = barrier.as_update_vnode_bitmap(actor_context.id)
{
log_writer.update_vnode_bitmap(vnode_bitmap);
}
yield Message::Barrier(barrier);
}
yield Message::Barrier(barrier);
}
}
}
Expand Down Expand Up @@ -447,6 +514,147 @@ mod test {
executor.next().await.unwrap().unwrap();
}

// Integration test for the case where the upstream stream key (columns 0 and 1
// here) is NOT a subset-compatible match of the sink's downstream pk (column 0
// only). In that case the sink executor buffers chunks per epoch, compacts them
// by stream key, and emits the DELETE part of each compacted chunk before the
// INSERT part (see `execute_write_log`), so that the downstream sink never sees
// an INSERT for a key before the corresponding DELETE of the old row.
#[tokio::test]
async fn stream_key_sink_pk_mismatch() {
    use risingwave_common::array::stream_chunk::StreamChunk;
    use risingwave_common::array::StreamChunkTestExt;
    use risingwave_common::types::DataType;

    use crate::executor::Barrier;

    // Blackhole connector: the sink discards everything, so the test only
    // observes what the executor itself yields downstream.
    let properties = maplit::hashmap! {
        "connector".into() => "blackhole".into(),
    };

    // We have two visible columns and one hidden column. The hidden column will be pruned out
    // within the sink executor.
    let columns = vec![
        ColumnCatalog {
            column_desc: ColumnDesc::unnamed(ColumnId::new(0), DataType::Int64),
            is_hidden: false,
        },
        ColumnCatalog {
            column_desc: ColumnDesc::unnamed(ColumnId::new(1), DataType::Int64),
            is_hidden: false,
        },
        ColumnCatalog {
            column_desc: ColumnDesc::unnamed(ColumnId::new(2), DataType::Int64),
            is_hidden: true,
        },
    ];
    let schema: Schema = columns
        .iter()
        .map(|column| Field::from(column.column_desc.clone()))
        .collect();

    // Input: barrier, one insert; barrier, then within one epoch an insert of a
    // new row, a self-cancelling insert+delete pair, and a delete of the first
    // row; final barrier. Stream key is [0, 1].
    // NOTE(review): `std::mem::take` on a fresh `from_pretty` temporary appears
    // to be a no-op pattern copied from the sibling tests — confirm.
    let mock = MockSource::with_messages(
        schema,
        vec![0, 1],
        vec![
            Message::Barrier(Barrier::new_test_barrier(1)),
            Message::Chunk(std::mem::take(&mut StreamChunk::from_pretty(
                " I I I
                + 1 1 10",
            ))),
            Message::Barrier(Barrier::new_test_barrier(2)),
            Message::Chunk(std::mem::take(&mut StreamChunk::from_pretty(
                " I I I
                + 1 3 30",
            ))),
            Message::Chunk(std::mem::take(&mut StreamChunk::from_pretty(
                " I I I
                + 1 2 20
                - 1 2 20",
            ))),
            Message::Chunk(std::mem::take(&mut StreamChunk::from_pretty(
                " I I I
                - 1 1 10",
            ))),
            Message::Barrier(Barrier::new_test_barrier(3)),
        ],
    );

    // downstream_pk = [0] while the stream key is [0, 1] — this is the
    // mismatch that triggers the buffering/compaction path under test.
    let sink_param = SinkParam {
        sink_id: 0.into(),
        properties,
        columns: columns
            .iter()
            .filter(|col| !col.is_hidden)
            .map(|col| col.column_desc.clone())
            .collect(),
        downstream_pk: vec![0],
        sink_type: SinkType::ForceAppendOnly,
        db_name: "test".into(),
        sink_from_name: "test".into(),
    };

    let sink_executor = SinkExecutor::new(
        Box::new(mock),
        Arc::new(StreamingMetrics::unused()),
        SinkWriterParam::default(),
        sink_param,
        columns.clone(),
        ActorContext::create(0),
        BoundedInMemLogStoreFactory::new(1),
        vec![0, 1],
    )
    .await
    .unwrap();

    let mut executor = SinkExecutor::execute(Box::new(sink_executor));

    // Barrier message.
    executor.next().await.unwrap().unwrap();

    // Epoch 1: the delete-only chunk for `+ 1 1 10` has every row hidden,
    // so it arrives with cardinality 0.
    let chunk_msg = executor.next().await.unwrap().unwrap();
    assert_eq!(chunk_msg.into_chunk().unwrap().cardinality(), 0);

    // Then the insert-only chunk carrying the actual row.
    let chunk_msg = executor.next().await.unwrap().unwrap();
    assert_eq!(
        chunk_msg.into_chunk().unwrap().compact(),
        StreamChunk::from_pretty(
            " I I I
            + 1 1 10",
        )
    );

    // Barrier message.
    executor.next().await.unwrap().unwrap();

    // Epoch 2: delete-only chunks come first. The `+ 1 2 20 / - 1 2 20` pair
    // compacts away entirely, leaving two empty (cardinality 0) chunks before
    // the visible `- 1 1 10`.
    let chunk_msg = executor.next().await.unwrap().unwrap();
    assert_eq!(chunk_msg.into_chunk().unwrap().cardinality(), 0);
    let chunk_msg = executor.next().await.unwrap().unwrap();
    assert_eq!(chunk_msg.into_chunk().unwrap().cardinality(), 0);

    let chunk_msg = executor.next().await.unwrap().unwrap();
    assert_eq!(
        chunk_msg.into_chunk().unwrap().compact(),
        StreamChunk::from_pretty(
            " I I I
            - 1 1 10",
        )
    );

    // Insert-only chunks follow: the surviving `+ 1 3 30`, then two empty
    // chunks from the compacted-away pair and the delete of row (1, 1).
    let chunk_msg = executor.next().await.unwrap().unwrap();
    assert_eq!(
        chunk_msg.into_chunk().unwrap().compact(),
        StreamChunk::from_pretty(
            " I I I
            + 1 3 30",
        )
    );
    let chunk_msg = executor.next().await.unwrap().unwrap();
    assert_eq!(chunk_msg.into_chunk().unwrap().cardinality(), 0);
    let chunk_msg = executor.next().await.unwrap().unwrap();
    assert_eq!(chunk_msg.into_chunk().unwrap().cardinality(), 0);

    // Should not receive the third stream chunk message because the force-append-only sink
    // executor will drop all DELETE messages.

    // The last barrier message.
    executor.next().await.unwrap().unwrap();
}

#[tokio::test]
async fn test_empty_barrier_sink() {
use risingwave_common::types::DataType;
Expand Down
Loading