diff --git a/proto/connector_service.proto b/proto/connector_service.proto index 7bc46cab9c187..ddda62c6aace5 100644 --- a/proto/connector_service.proto +++ b/proto/connector_service.proto @@ -27,7 +27,6 @@ message SinkParam { string db_name = 5; string sink_from_name = 6; catalog.SinkFormatDesc format_desc = 7; - optional uint32 target_table = 8; } enum SinkPayloadFormat { diff --git a/src/connector/src/sink/log_store.rs b/src/connector/src/sink/log_store.rs index 8436c0d8988e5..3e2bf740d32d9 100644 --- a/src/connector/src/sink/log_store.rs +++ b/src/connector/src/sink/log_store.rs @@ -174,9 +174,9 @@ pub trait LogReader: Send + Sized + 'static { ) -> impl Future)>> + Send + '_; } -pub trait LogStoreFactory: 'static { - type Reader: LogReader + Send + 'static; - type Writer: LogWriter + Send + 'static; +pub trait LogStoreFactory: Send + 'static { + type Reader: LogReader; + type Writer: LogWriter; fn build(self) -> impl Future + Send; } diff --git a/src/connector/src/sink/mod.rs b/src/connector/src/sink/mod.rs index d31c1cb475d19..8a5a9d49d25b3 100644 --- a/src/connector/src/sink/mod.rs +++ b/src/connector/src/sink/mod.rs @@ -32,7 +32,6 @@ pub mod pulsar; pub mod redis; pub mod remote; pub mod starrocks; -pub mod table; pub mod test_sink; pub mod utils; pub mod writer; @@ -46,7 +45,7 @@ use ::redis::RedisError; use anyhow::anyhow; use async_trait::async_trait; use risingwave_common::buffer::Bitmap; -use risingwave_common::catalog::{ColumnDesc, Field, Schema, TableId}; +use risingwave_common::catalog::{ColumnDesc, Field, Schema}; use risingwave_common::error::{anyhow_error, ErrorCode, RwError}; use risingwave_common::metrics::{ LabelGuardedHistogram, LabelGuardedIntCounter, LabelGuardedIntGauge, @@ -62,7 +61,6 @@ use self::catalog::{SinkFormatDesc, SinkType}; use crate::sink::catalog::desc::SinkDesc; use crate::sink::catalog::{SinkCatalog, SinkId}; use crate::sink::log_store::{LogReader, LogStoreReadItem, LogStoreResult, TruncateOffset}; -use crate::sink::table::TABLE_SINK; use crate::sink::writer::SinkWriter; use crate::ConnectorParams; @@ -89,8 +87,7 @@ macro_rules! for_all_sinks { { Starrocks, $crate::sink::starrocks::StarrocksSink }, { DeltaLakeRust, $crate::sink::deltalake::DeltaLakeSink }, { BigQuery, $crate::sink::big_query::BigQuerySink }, - { Test, $crate::sink::test_sink::TestSink }, - { Table, $crate::sink::table::TableSink } + { Test, $crate::sink::test_sink::TestSink } } $(,$arg)* } @@ -151,7 +148,6 @@ pub struct SinkParam { pub format_desc: Option, pub db_name: String, pub sink_from_name: String, - pub target_table: Option, } impl SinkParam { @@ -183,7 +179,6 @@ impl SinkParam { format_desc, db_name: pb_param.db_name, sink_from_name: pb_param.sink_from_name, - target_table: pb_param.target_table.map(TableId::new), } } @@ -199,7 +194,6 @@ impl SinkParam { format_desc: self.format_desc.as_ref().map(|f| f.to_proto()), db_name: self.db_name.clone(), sink_from_name: self.sink_from_name.clone(), - target_table: self.target_table.map(|table_id| table_id.table_id()), } } @@ -225,7 +219,6 @@ impl From for SinkParam { format_desc: sink_catalog.format_desc, db_name: sink_catalog.db_name, sink_from_name: sink_catalog.sink_from_name, - target_table: sink_catalog.target_table, } } } @@ -373,13 +366,10 @@ impl SinkImpl { param.properties.remove(PRIVATE_LINK_TARGET_KEY); param.properties.remove(CONNECTION_NAME_KEY); - let sink_type = if param.target_table.is_some() { - TABLE_SINK - } else { - param.properties.get(CONNECTOR_TYPE_KEY).ok_or_else(|| { - SinkError::Config(anyhow!("missing config: {}", CONNECTOR_TYPE_KEY)) - })? - }; + let sink_type = param + .properties + .get(CONNECTOR_TYPE_KEY) + .ok_or_else(|| SinkError::Config(anyhow!("missing config: {}", CONNECTOR_TYPE_KEY)))?; match_sink_name_str!( sink_type.to_lowercase().as_str(), diff --git a/src/connector/src/sink/table.rs b/src/connector/src/sink/table.rs deleted file mode 100644 index 20293c4781a2e..0000000000000 --- a/src/connector/src/sink/table.rs +++ /dev/null @@ -1,76 +0,0 @@ -// Copyright 2023 RisingWave Labs -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -use async_trait::async_trait; - -use crate::sink::log_store::{LogStoreReadItem, TruncateOffset}; -use crate::sink::{ - DummySinkCommitCoordinator, LogSinker, Result, Sink, SinkError, SinkLogReader, SinkParam, - SinkWriterParam, -}; - -pub const TABLE_SINK: &str = "table"; - -/// A table sink outputs stream into another RisingWave's table. -/// -/// Different from a materialized view, table sinks do not enforce strong consistency between upstream and downstream in principle. As a result, the `create sink` statement returns immediately, which is similar to any other `create sink`. It also allows users to execute DMLs on these target tables. -/// -/// See also [RFC: Create Sink into Table](https://github.com/risingwavelabs/rfcs/pull/52). -#[derive(Debug)] -pub struct TableSink; - -impl TryFrom for TableSink { - type Error = SinkError; - - fn try_from(_value: SinkParam) -> std::result::Result { - Ok(Self) - } -} - -impl Sink for TableSink { - type Coordinator = DummySinkCommitCoordinator; - type LogSinker = Self; - - const SINK_NAME: &'static str = TABLE_SINK; - - async fn new_log_sinker(&self, _writer_env: SinkWriterParam) -> Result { - Ok(Self) - } - - async fn validate(&self) -> Result<()> { - Ok(()) - } -} - -#[async_trait] -impl LogSinker for TableSink { - async fn consume_log_and_sink(self, log_reader: &mut impl SinkLogReader) -> Result<()> { - loop { - let (epoch, item) = log_reader.next_item().await?; - match item { - LogStoreReadItem::StreamChunk { chunk_id, .. } => { - log_reader - .truncate(TruncateOffset::Chunk { epoch, chunk_id }) - .await?; - } - LogStoreReadItem::Barrier { .. } => { - log_reader - .truncate(TruncateOffset::Barrier { epoch }) - .await?; - } - LogStoreReadItem::UpdateVnodeBitmap(_) => {} - } - } - } -} diff --git a/src/connector/src/source/external.rs b/src/connector/src/source/external.rs index 76c8bb5b3081e..7943b1774fef5 100644 --- a/src/connector/src/source/external.rs +++ b/src/connector/src/source/external.rs @@ -605,7 +605,6 @@ mod tests { format_desc: None, db_name: "db".into(), sink_from_name: "table".into(), - target_table: None, }; let rw_schema = param.schema(); diff --git a/src/meta/src/manager/sink_coordination/manager.rs b/src/meta/src/manager/sink_coordination/manager.rs index f63675ae3a141..4c2b2a37295b1 100644 --- a/src/meta/src/manager/sink_coordination/manager.rs +++ b/src/meta/src/manager/sink_coordination/manager.rs @@ -407,7 +407,6 @@ mod tests { format_desc: None, db_name: "test".into(), sink_from_name: "test".into(), - target_table: None, }; let epoch1 = 233; @@ -578,7 +577,6 @@ mod tests { format_desc: None, db_name: "test".into(), sink_from_name: "test".into(), - target_table: None, }; let epoch1 = 233; @@ -698,7 +696,6 @@ mod tests { format_desc: None, db_name: "test".into(), sink_from_name: "test".into(), - target_table: None, }; let (manager, (_join_handle, _stop_tx)) = SinkCoordinatorManager::start_worker(); @@ -738,7 +735,6 @@ mod tests { format_desc: None, db_name: "test".into(), sink_from_name: "test".into(), - target_table: None, }; let epoch = 233; @@ -818,7 +814,6 @@ mod tests { format_desc: None, db_name: "test".into(), sink_from_name: "test".into(), - target_table: None, }; let epoch = 233; diff --git a/src/stream/src/executor/monitor/streaming_stats.rs b/src/stream/src/executor/monitor/streaming_stats.rs index 1b69741df899a..3631792ee91f7 100644 --- a/src/stream/src/executor/monitor/streaming_stats.rs +++ b/src/stream/src/executor/monitor/streaming_stats.rs @@ -19,7 +19,7 @@ use prometheus::{ exponential_buckets, histogram_opts, register_gauge_vec_with_registry, register_histogram_with_registry, register_int_counter_vec_with_registry, register_int_counter_with_registry, register_int_gauge_vec_with_registry, - register_int_gauge_with_registry, Histogram, IntCounter, IntGauge, Registry, + register_int_gauge_with_registry, Histogram, IntCounter, IntCounterVec, IntGauge, Registry, }; use risingwave_common::config::MetricLevel; use risingwave_common::metrics::{ @@ -68,8 +68,8 @@ pub struct StreamingMetrics { pub source_split_change_count: GenericCounterVec, // Sink & materialized view - pub sink_input_row_count: GenericCounterVec, - pub mview_input_row_count: GenericCounterVec, + pub sink_input_row_count: LabelGuardedIntCounterVec<3>, + pub mview_input_row_count: IntCounterVec, // Exchange (see also `compute::ExchangeServiceMetrics`) pub exchange_frag_recv_size: GenericCounterVec, @@ -232,7 +232,7 @@ impl StreamingMetrics { ) .unwrap(); - let sink_input_row_count = register_int_counter_vec_with_registry!( + let sink_input_row_count = register_guarded_int_counter_vec_with_registry!( "stream_sink_input_row_count", "Total number of rows streamed into sink executors", &["sink_id", "actor_id", "fragment_id"], diff --git a/src/stream/src/executor/sink.rs b/src/stream/src/executor/sink.rs index f3e8b9b4bb8d6..1ebf45b02d560 100644 --- a/src/stream/src/executor/sink.rs +++ b/src/stream/src/executor/sink.rs @@ -16,7 +16,7 @@ use std::mem; use anyhow::anyhow; use futures::stream::select; -use futures::{FutureExt, StreamExt, TryFutureExt}; +use futures::{pin_mut, FutureExt, StreamExt, TryFutureExt, TryStreamExt}; use futures_async_stream::try_stream; use itertools::Itertools; use risingwave_common::array::stream_chunk::StreamChunkMut; @@ -24,29 +24,29 @@ use risingwave_common::array::{merge_chunk_row, Op, StreamChunk, StreamChunkComp use risingwave_common::catalog::{ColumnCatalog, Field, Schema}; use risingwave_common::metrics::GLOBAL_ERROR_METRICS; use risingwave_connector::dispatch_sink; -use risingwave_connector::sink::catalog::{SinkId, SinkType}; +use risingwave_connector::sink::catalog::SinkType; use risingwave_connector::sink::log_store::{ LogReader, LogReaderExt, LogStoreFactory, LogWriter, LogWriterExt, }; -use risingwave_connector::sink::{ - build_sink, LogSinker, Sink, SinkImpl, SinkParam, SinkWriterParam, -}; +use risingwave_connector::sink::{build_sink, LogSinker, Sink, SinkParam, SinkWriterParam}; use thiserror_ext::AsReport; use super::error::{StreamExecutorError, StreamExecutorResult}; use super::{BoxedExecutor, Executor, ExecutorInfo, Message, PkIndices}; -use crate::executor::{expect_first_barrier, ActorContextRef, BoxedMessageStream, Mutation}; +use crate::executor::{ + expect_first_barrier, ActorContextRef, BoxedMessageStream, MessageStream, Mutation, +}; +use crate::task::ActorId; pub struct SinkExecutor { actor_context: ActorContextRef, info: ExecutorInfo, input: BoxedExecutor, - sink: SinkImpl, input_columns: Vec, sink_param: SinkParam, - log_reader: F::Reader, - log_writer: F::Writer, + log_store_factory: F, sink_writer_param: SinkWriterParam, + is_external_sink: bool, } // Drop all the DELETE messages in this chunk and convert UPDATE INSERT into INSERT. @@ -77,6 +77,7 @@ fn force_delete_only(c: StreamChunk) -> StreamChunk { impl SinkExecutor { #[allow(clippy::too_many_arguments)] + #[expect(clippy::unused_async)] pub async fn new( actor_context: ActorContextRef, info: ExecutorInfo, @@ -85,10 +86,8 @@ impl SinkExecutor { sink_param: SinkParam, columns: Vec, log_store_factory: F, + is_external_sink: bool, ) -> StreamExecutorResult { - let (log_reader, log_writer) = log_store_factory.build().await; - - let sink = build_sink(sink_param.clone())?; let input_schema: Schema = columns .iter() .map(|column| Field::from(&column.column_desc)) @@ -99,16 +98,19 @@ impl SinkExecutor { actor_context, info, input, - sink, input_columns: columns, sink_param, - log_reader, - log_writer, + log_store_factory, sink_writer_param, + is_external_sink, }) } - fn execute_inner(self) -> BoxedMessageStream { + #[try_stream(boxed, ok = Message, error = StreamExecutorError)] + async fn execute_inner(self) { + let sink_id = self.sink_param.sink_id; + let actor_id = self.actor_context.id; + let stream_key = self.info.pk_indices.clone(); let stream_key_sink_pk_mismatch = { @@ -117,42 +119,72 @@ impl SinkExecutor { .any(|i| !self.sink_param.downstream_pk.contains(i)) }; - let write_log_stream = Self::execute_write_log( - self.input, - stream_key, - self.log_writer - .monitored(self.sink_writer_param.sink_metrics.clone()), - self.sink_param.sink_id, + let input = self.input.execute(); + + let input_row_count = self + .actor_context + .streaming_metrics + .sink_input_row_count + .with_guarded_label_values(&[ + &sink_id.to_string(), + &actor_id.to_string(), + &self.actor_context.fragment_id.to_string(), + ]); + + let input = input.inspect_ok(move |msg| { + if let Message::Chunk(c) = msg { + input_row_count.inc_by(c.capacity() as u64); + } + }); + + let processed_input = Self::process_input( + input, self.sink_param.sink_type, - self.actor_context.clone(), + stream_key, stream_key_sink_pk_mismatch, ); - dispatch_sink!(self.sink, sink, { - let consume_log_stream = Self::execute_consume_log( - sink, - self.log_reader, - self.input_columns, - self.sink_writer_param, - self.actor_context, - self.info, + if !self.is_external_sink { + #[for_await] + for msg in processed_input { + yield msg?; + } + } else { + let sink = build_sink(self.sink_param)?; + let (log_reader, log_writer) = self.log_store_factory.build().await; + + let write_log_stream = Self::execute_write_log( + processed_input, + log_writer.monitored(self.sink_writer_param.sink_metrics.clone()), + actor_id, ); - select(consume_log_stream.into_stream(), write_log_stream).boxed() - }) + + let output = dispatch_sink!(sink, sink, { + let consume_log_stream = Self::execute_consume_log( + sink, + log_reader, + self.input_columns, + self.sink_writer_param, + self.actor_context, + self.info, + ); + // TODO: may try to remove the boxed + select(consume_log_stream.into_stream(), write_log_stream).boxed() + }); + #[for_await] + for msg in output { + yield msg?; + } + } } #[try_stream(ok = Message, error = StreamExecutorError)] async fn execute_write_log( - input: BoxedExecutor, - stream_key: PkIndices, + input: impl MessageStream, mut log_writer: impl LogWriter, - sink_id: SinkId, - sink_type: SinkType, - actor_context: ActorContextRef, - stream_key_sink_pk_mismatch: bool, + actor_id: ActorId, ) { - let mut input = input.execute(); - + pin_mut!(input); let barrier = expect_first_barrier(&mut input).await?; let epoch_pair = barrier.epoch; @@ -164,11 +196,42 @@ impl SinkExecutor { // Propagate the first barrier yield Message::Barrier(barrier); - // for metrics - let sink_id_str = sink_id.to_string(); - let actor_id_str = actor_context.id.to_string(); - let fragment_id_str = actor_context.fragment_id.to_string(); + #[for_await] + for msg in input { + match msg? { + Message::Watermark(w) => yield Message::Watermark(w), + Message::Chunk(chunk) => { + log_writer.write_chunk(chunk.clone()).await?; + yield Message::Chunk(chunk); + } + Message::Barrier(barrier) => { + if let Some(mutation) = barrier.mutation.as_deref() { + match mutation { + Mutation::Pause => log_writer.pause()?, + Mutation::Resume => log_writer.resume()?, + _ => (), + } + } + + log_writer + .flush_current_epoch(barrier.epoch.curr, barrier.kind.is_checkpoint()) + .await?; + if let Some(vnode_bitmap) = barrier.as_update_vnode_bitmap(actor_id) { + log_writer.update_vnode_bitmap(vnode_bitmap).await?; + } + yield Message::Barrier(barrier); + } + } + } + } + #[try_stream(ok = Message, error = StreamExecutorError)] + async fn process_input( + input: impl MessageStream, + sink_type: SinkType, + stream_key: PkIndices, + stream_key_sink_pk_mismatch: bool, + ) { // When stream key is different from the user defined primary key columns for sinks. The operations could be out of order // stream key: a,b // sink pk: a @@ -207,12 +270,6 @@ impl SinkExecutor { match msg? { Message::Watermark(w) => watermark = Some(w), Message::Chunk(c) => { - actor_context - .streaming_metrics - .sink_input_row_count - .with_label_values(&[&sink_id_str, &actor_id_str, &fragment_id_str]) - .inc_by(c.capacity() as u64); - chunk_buffer.push_chunk(c); } Message::Barrier(barrier) => { @@ -234,28 +291,11 @@ impl SinkExecutor { } for c in delete_chunks.into_iter().chain(insert_chunks.into_iter()) { - log_writer.write_chunk(c.clone()).await?; yield Message::Chunk(c); } if let Some(w) = mem::take(&mut watermark) { yield Message::Watermark(w) } - - if let Some(mutation) = barrier.mutation.as_deref() { - match mutation { - Mutation::Pause => log_writer.pause()?, - Mutation::Resume => log_writer.resume()?, - _ => (), - } - } - - log_writer - .flush_current_epoch(barrier.epoch.curr, barrier.kind.is_checkpoint()) - .await?; - if let Some(vnode_bitmap) = barrier.as_update_vnode_bitmap(actor_context.id) - { - log_writer.update_vnode_bitmap(vnode_bitmap).await?; - } yield Message::Barrier(barrier); } } @@ -266,12 +306,6 @@ impl SinkExecutor { match msg? { Message::Watermark(w) => yield Message::Watermark(w), Message::Chunk(chunk) => { - actor_context - .streaming_metrics - .sink_input_row_count - .with_label_values(&[&sink_id_str, &actor_id_str, &fragment_id_str]) - .inc_by(chunk.capacity() as u64); - // Compact the chunk to eliminate any useless intermediate result (e.g. UPDATE // V->V). let chunk = merge_chunk_row(chunk, &stream_key); @@ -284,27 +318,9 @@ impl SinkExecutor { chunk }; - log_writer.write_chunk(chunk.clone()).await?; - - // Use original chunk instead of the reordered one as the executor output. yield Message::Chunk(chunk); } Message::Barrier(barrier) => { - if let Some(mutation) = barrier.mutation.as_deref() { - match mutation { - Mutation::Pause => log_writer.pause()?, - Mutation::Resume => log_writer.resume()?, - _ => (), - } - } - - log_writer - .flush_current_epoch(barrier.epoch.curr, barrier.kind.is_checkpoint()) - .await?; - if let Some(vnode_bitmap) = barrier.as_update_vnode_bitmap(actor_context.id) - { - log_writer.update_vnode_bitmap(vnode_bitmap).await?; - } yield Message::Barrier(barrier); } } @@ -487,7 +503,6 @@ mod test { format_desc: None, db_name: "test".into(), sink_from_name: "test".into(), - target_table: None, }; let info = ExecutorInfo { @@ -504,6 +519,7 @@ mod test { sink_param, columns.clone(), BoundedInMemLogStoreFactory::new(1), + true, ) .await .unwrap(); @@ -615,7 +631,6 @@ mod test { format_desc: None, db_name: "test".into(), sink_from_name: "test".into(), - target_table: None, }; let info = ExecutorInfo { @@ -632,6 +647,7 @@ mod test { sink_param, columns.clone(), BoundedInMemLogStoreFactory::new(1), + true, ) .await .unwrap(); @@ -740,7 +756,6 @@ mod test { format_desc: None, db_name: "test".into(), sink_from_name: "test".into(), - target_table: None, }; let info = ExecutorInfo { @@ -757,6 +772,7 @@ mod test { sink_param, columns, BoundedInMemLogStoreFactory::new(1), + true, ) .await .unwrap(); diff --git a/src/stream/src/from_proto/sink.rs b/src/stream/src/from_proto/sink.rs index 615da9d81219f..a2e135e718977 100644 --- a/src/stream/src/from_proto/sink.rs +++ b/src/stream/src/from_proto/sink.rs @@ -15,7 +15,7 @@ use std::sync::Arc; use anyhow::anyhow; -use risingwave_common::catalog::{ColumnCatalog, TableId}; +use risingwave_common::catalog::ColumnCatalog; use risingwave_connector::match_sink_name_str; use risingwave_connector::sink::catalog::{SinkFormatDesc, SinkType}; use risingwave_connector::sink::{ @@ -47,7 +47,8 @@ impl ExecutorBuilder for SinkExecutorBuilder { let sink_id = sink_desc.get_id().into(); let db_name = sink_desc.get_db_name().into(); let sink_from_name = sink_desc.get_sink_from_name().into(); - let target_table = sink_desc.get_target_table().cloned().ok().map(TableId::new); + // when target table is not present, it should be an external sink + let is_external_sink = sink_desc.get_target_table().is_err(); let properties = sink_desc.get_properties().clone(); let downstream_pk = sink_desc .downstream_pk @@ -102,7 +103,6 @@ impl ExecutorBuilder for SinkExecutorBuilder { format_desc, db_name, sink_from_name, - target_table, }; let sink_id_str = format!("{}", sink_id.sink_id); @@ -140,6 +140,7 @@ impl ExecutorBuilder for SinkExecutorBuilder { sink_param, columns, factory, + is_external_sink, ) .await?, )) @@ -171,6 +172,7 @@ impl ExecutorBuilder for SinkExecutorBuilder { sink_param, columns, factory, + is_external_sink, ) .await?, ))