diff --git a/ci/scripts/e2e-sink-test.sh b/ci/scripts/e2e-sink-test.sh
index 0dbdce47abf55..92f3e30f6cf4c 100755
--- a/ci/scripts/e2e-sink-test.sh
+++ b/ci/scripts/e2e-sink-test.sh
@@ -61,6 +61,7 @@ sqllogictest -p 4566 -d dev './e2e_test/sink/append_only_sink.slt'
 sqllogictest -p 4566 -d dev './e2e_test/sink/create_sink_as.slt'
 sqllogictest -p 4566 -d dev './e2e_test/sink/blackhole_sink.slt'
 sqllogictest -p 4566 -d dev './e2e_test/sink/remote/types.slt'
+sqllogictest -p 4566 -d dev './e2e_test/sink/sink_into_table.slt'
 sleep 1
 
 echo "--- testing remote sinks"
diff --git a/e2e_test/sink/sink_into_table.slt b/e2e_test/sink/sink_into_table.slt
index a3df061fc8db4..15a6a37e17c4c 100644
--- a/e2e_test/sink/sink_into_table.slt
+++ b/e2e_test/sink/sink_into_table.slt
@@ -6,6 +6,9 @@ SET RW_IMPLICIT_FLUSH TO true;
 statement ok
 create table t_simple (v1 int, v2 int);
 
+statement error unsupported sink type table
+create sink table_sink from t_simple with (connector = 'table');
+
 statement ok
 create table m_simple (v1 int primary key, v2 int);
diff --git a/proto/connector_service.proto b/proto/connector_service.proto
index 7bc46cab9c187..ddda62c6aace5 100644
--- a/proto/connector_service.proto
+++ b/proto/connector_service.proto
@@ -27,7 +27,6 @@ message SinkParam {
   string db_name = 5;
   string sink_from_name = 6;
   catalog.SinkFormatDesc format_desc = 7;
-  optional uint32 target_table = 8;
 }
 
 enum SinkPayloadFormat {
diff --git a/src/connector/src/sink/log_store.rs b/src/connector/src/sink/log_store.rs
index 548f33389c3c5..f74a22d3b80e5 100644
--- a/src/connector/src/sink/log_store.rs
+++ b/src/connector/src/sink/log_store.rs
@@ -174,9 +174,9 @@ pub trait LogReader: Send + Sized + 'static {
     ) -> impl Future<Output = LogStoreResult<(u64, LogStoreReadItem)>> + Send + '_;
 }
 
-pub trait LogStoreFactory: 'static {
-    type Reader: LogReader + Send + 'static;
-    type Writer: LogWriter + Send + 'static;
+pub trait LogStoreFactory: Send + 'static {
+    type Reader: LogReader;
+    type Writer: LogWriter;
 
     fn build(self) -> impl Future<Output = (Self::Reader, Self::Writer)> + Send;
 }
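
Note on the `LogStoreFactory` change above: `LogReader` and `LogWriter` already carry `Send + 'static` as supertrait bounds, so repeating them on the associated types was redundant. The new `Send` bound on the factory itself is what matters: it lets an *unbuilt* factory be moved into the sink executor's output stream and built lazily there (see the `SinkExecutor` changes further down). A minimal compiling sketch of the shape, using stand-in traits rather than the real RisingWave ones (requires Rust 1.75+ for `impl Trait` in trait return position):

```rust
use std::future::Future;

// Stand-in traits: the real `LogReader` / `LogWriter` have many methods;
// only the auto-trait bounds matter for this illustration.
trait LogReader: Send + 'static {}
trait LogWriter: Send + 'static {}

// The associated types need no extra bounds because the supertraits already
// require `Send + 'static`; the factory itself becomes `Send` so it can be
// moved across threads (or into a boxed stream) before `build()` is called.
trait LogStoreFactory: Send + 'static {
    type Reader: LogReader;
    type Writer: LogWriter;

    fn build(self) -> impl Future<Output = (Self::Reader, Self::Writer)> + Send;
}

struct NoopReader;
struct NoopWriter;
impl LogReader for NoopReader {}
impl LogWriter for NoopWriter {}

struct NoopFactory;
impl LogStoreFactory for NoopFactory {
    type Reader = NoopReader;
    type Writer = NoopWriter;

    fn build(self) -> impl Future<Output = (NoopReader, NoopWriter)> + Send {
        std::future::ready((NoopReader, NoopWriter))
    }
}

fn main() {
    let factory = NoopFactory;
    // Moving the whole factory into another thread is what `Send` buys us.
    std::thread::spawn(move || {
        let _future = factory.build(); // built lazily, away from the constructor
    })
    .join()
    .unwrap();
}
```
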
diff --git a/src/connector/src/sink/mod.rs b/src/connector/src/sink/mod.rs
index 945835449dcad..7bbd42edcd338 100644
--- a/src/connector/src/sink/mod.rs
+++ b/src/connector/src/sink/mod.rs
@@ -13,7 +13,6 @@
 // limitations under the License.
 
 pub mod big_query;
-pub mod blackhole;
 pub mod boxed;
 pub mod catalog;
 pub mod clickhouse;
@@ -32,8 +31,8 @@ pub mod pulsar;
 pub mod redis;
 pub mod remote;
 pub mod starrocks;
-pub mod table;
 pub mod test_sink;
+pub mod trivial;
 pub mod utils;
 pub mod writer;
 
@@ -46,7 +45,7 @@ use ::redis::RedisError;
 use anyhow::anyhow;
 use async_trait::async_trait;
 use risingwave_common::buffer::Bitmap;
-use risingwave_common::catalog::{ColumnDesc, Field, Schema, TableId};
+use risingwave_common::catalog::{ColumnDesc, Field, Schema};
 use risingwave_common::error::{anyhow_error, ErrorCode, RwError};
 use risingwave_common::metrics::{
     LabelGuardedHistogram, LabelGuardedIntCounter, LabelGuardedIntGauge,
@@ -62,7 +61,6 @@ use self::catalog::{SinkFormatDesc, SinkType};
 use crate::sink::catalog::desc::SinkDesc;
 use crate::sink::catalog::{SinkCatalog, SinkId};
 use crate::sink::log_store::{LogReader, LogStoreReadItem, LogStoreResult, TruncateOffset};
-use crate::sink::table::TABLE_SINK;
 use crate::sink::writer::SinkWriter;
 use crate::ConnectorParams;
 
@@ -74,7 +72,7 @@ macro_rules! for_all_sinks {
         { Redis, $crate::sink::redis::RedisSink },
         { Kafka, $crate::sink::kafka::KafkaSink },
         { Pulsar, $crate::sink::pulsar::PulsarSink },
-        { BlackHole, $crate::sink::blackhole::BlackHoleSink },
+        { BlackHole, $crate::sink::trivial::BlackHoleSink },
        { Kinesis, $crate::sink::kinesis::KinesisSink },
        { ClickHouse, $crate::sink::clickhouse::ClickHouseSink },
        { Iceberg, $crate::sink::iceberg::IcebergSink },
@@ -88,7 +86,7 @@ macro_rules! for_all_sinks {
        { DeltaLake, $crate::sink::deltalake::DeltaLakeSink },
        { BigQuery, $crate::sink::big_query::BigQuerySink },
        { Test, $crate::sink::test_sink::TestSink },
-       { Table, $crate::sink::table::TableSink }
+       { Table, $crate::sink::trivial::TableSink }
     }
     $(,$arg)*
 }
@@ -149,7 +147,6 @@ pub struct SinkParam {
     pub format_desc: Option<SinkFormatDesc>,
     pub db_name: String,
     pub sink_from_name: String,
-    pub target_table: Option<TableId>,
 }
 
 impl SinkParam {
@@ -181,7 +178,6 @@ impl SinkParam {
             format_desc,
             db_name: pb_param.db_name,
             sink_from_name: pb_param.sink_from_name,
-            target_table: pb_param.target_table.map(TableId::new),
         }
     }
 
@@ -197,7 +193,6 @@ impl SinkParam {
             format_desc: self.format_desc.as_ref().map(|f| f.to_proto()),
             db_name: self.db_name.clone(),
             sink_from_name: self.sink_from_name.clone(),
-            target_table: self.target_table.map(|table_id| table_id.table_id()),
         }
     }
 
@@ -223,7 +218,6 @@ impl From<SinkCatalog> for SinkParam {
             format_desc: sink_catalog.format_desc,
             db_name: sink_catalog.db_name,
             sink_from_name: sink_catalog.sink_from_name,
-            target_table: sink_catalog.target_table,
         }
     }
 }
@@ -371,13 +365,10 @@ impl SinkImpl {
         param.properties.remove(PRIVATE_LINK_TARGET_KEY);
         param.properties.remove(CONNECTION_NAME_KEY);
 
-        let sink_type = if param.target_table.is_some() {
-            TABLE_SINK
-        } else {
-            param.properties.get(CONNECTOR_TYPE_KEY).ok_or_else(|| {
-                SinkError::Config(anyhow!("missing config: {}", CONNECTOR_TYPE_KEY))
-            })?
-        };
+        let sink_type = param
+            .properties
+            .get(CONNECTOR_TYPE_KEY)
+            .ok_or_else(|| SinkError::Config(anyhow!("missing config: {}", CONNECTOR_TYPE_KEY)))?;
 
         match_sink_name_str!(
             sink_type.to_lowercase().as_str(),
@@ -391,6 +382,10 @@ impl SinkImpl {
             }
         )
     }
+
+    pub fn is_sink_into_table(&self) -> bool {
+        matches!(self, SinkImpl::Table(_))
+    }
 }
 
 pub fn build_sink(param: SinkParam) -> Result<SinkImpl> {
diff --git a/src/connector/src/sink/table.rs b/src/connector/src/sink/table.rs
deleted file mode 100644
index 5fdcb72bc2c91..0000000000000
--- a/src/connector/src/sink/table.rs
+++ /dev/null
@@ -1,76 +0,0 @@
-// Copyright 2024 RisingWave Labs
-//
-// Licensed under the Apache License, Version 2.0 (the "License");
-// you may not use this file except in compliance with the License.
-// You may obtain a copy of the License at
-//
-//     http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing, software
-// distributed under the License is distributed on an "AS IS" BASIS,
-// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-// See the License for the specific language governing permissions and
-// limitations under the License.
-
-use async_trait::async_trait;
-
-use crate::sink::log_store::{LogStoreReadItem, TruncateOffset};
-use crate::sink::{
-    DummySinkCommitCoordinator, LogSinker, Result, Sink, SinkError, SinkLogReader, SinkParam,
-    SinkWriterParam,
-};
-
-pub const TABLE_SINK: &str = "table";
-
-/// A table sink outputs stream into another RisingWave's table.
-///
-/// Different from a materialized view, table sinks do not enforce strong consistency between upstream and downstream in principle. As a result, the `create sink` statement returns immediately, which is similar to any other `create sink`. It also allows users to execute DMLs on these target tables.
-///
-/// See also [RFC: Create Sink into Table](https://github.com/risingwavelabs/rfcs/pull/52).
-#[derive(Debug)]
-pub struct TableSink;
-
-impl TryFrom<SinkParam> for TableSink {
-    type Error = SinkError;
-
-    fn try_from(_value: SinkParam) -> std::result::Result<Self, Self::Error> {
-        Ok(Self)
-    }
-}
-
-impl Sink for TableSink {
-    type Coordinator = DummySinkCommitCoordinator;
-    type LogSinker = Self;
-
-    const SINK_NAME: &'static str = TABLE_SINK;
-
-    async fn new_log_sinker(&self, _writer_env: SinkWriterParam) -> Result<Self::LogSinker> {
-        Ok(Self)
-    }
-
-    async fn validate(&self) -> Result<()> {
-        Ok(())
-    }
-}
-
-#[async_trait]
-impl LogSinker for TableSink {
-    async fn consume_log_and_sink(self, log_reader: &mut impl SinkLogReader) -> Result<()> {
-        loop {
-            let (epoch, item) = log_reader.next_item().await?;
-            match item {
-                LogStoreReadItem::StreamChunk { chunk_id, .. } => {
-                    log_reader
-                        .truncate(TruncateOffset::Chunk { epoch, chunk_id })
-                        .await?;
-                }
-                LogStoreReadItem::Barrier { .. } => {
-                    log_reader
-                        .truncate(TruncateOffset::Barrier { epoch })
-                        .await?;
-                }
-                LogStoreReadItem::UpdateVnodeBitmap(_) => {}
-            }
-        }
-    }
-}
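
With `target_table` removed from `SinkParam`, `SinkImpl::new` above no longer special-cases sink-into-table: the connector is always resolved from the `connector` property, `table` included, and downstream code asks `is_sink_into_table()` instead of checking an optional table id. The following self-contained sketch mirrors that lookup; `resolve_sink` and `SinkChoice` are illustrative names, not RisingWave APIs:

```rust
use std::collections::HashMap;

// Same key the real code reads via `CONNECTOR_TYPE_KEY`.
const CONNECTOR_TYPE_KEY: &str = "connector";

enum SinkChoice {
    BlackHole,
    Table,
    Other(String),
}

// Mirrors the new resolution logic: no `target_table` special case; the
// `table` connector is looked up by name like any other sink.
fn resolve_sink(properties: &HashMap<String, String>) -> Result<SinkChoice, String> {
    let sink_type = properties
        .get(CONNECTOR_TYPE_KEY)
        .ok_or_else(|| format!("missing config: {}", CONNECTOR_TYPE_KEY))?;
    Ok(match sink_type.to_lowercase().as_str() {
        "blackhole" => SinkChoice::BlackHole,
        "table" => SinkChoice::Table,
        other => SinkChoice::Other(other.to_owned()),
    })
}

fn main() {
    let props = HashMap::from([("connector".to_owned(), "table".to_owned())]);
    // `SinkImpl::is_sink_into_table` is the real-code analogue of this check.
    assert!(matches!(resolve_sink(&props), Ok(SinkChoice::Table)));
}
```
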
diff --git a/src/connector/src/sink/blackhole.rs b/src/connector/src/sink/trivial.rs
similarity index 71%
rename from src/connector/src/sink/blackhole.rs
rename to src/connector/src/sink/trivial.rs
index 937af4778988b..a3c5a1bd7e2ce 100644
--- a/src/connector/src/sink/blackhole.rs
+++ b/src/connector/src/sink/trivial.rs
@@ -12,6 +12,8 @@
 // See the License for the specific language governing permissions and
 // limitations under the License.
 
+use std::marker::PhantomData;
+
 use async_trait::async_trait;
 
 use crate::sink::log_store::{LogStoreReadItem, TruncateOffset};
@@ -21,26 +23,49 @@ use crate::sink::{
 };
 
 pub const BLACKHOLE_SINK: &str = "blackhole";
+pub const TABLE_SINK: &str = "table";
+
+pub trait TrivialSinkName: Send + 'static {
+    const SINK_NAME: &'static str;
+}
 
 #[derive(Debug)]
-pub struct BlackHoleSink;
+pub struct BlackHoleSinkName;
+
+impl TrivialSinkName for BlackHoleSinkName {
+    const SINK_NAME: &'static str = BLACKHOLE_SINK;
+}
 
-impl TryFrom<SinkParam> for BlackHoleSink {
+pub type BlackHoleSink = TrivialSink<BlackHoleSinkName>;
+
+#[derive(Debug)]
+pub struct TableSinkName;
+
+impl TrivialSinkName for TableSinkName {
+    const SINK_NAME: &'static str = TABLE_SINK;
+}
+
+pub type TableSink = TrivialSink<TableSinkName>;
+
+#[derive(Debug)]
+pub struct TrivialSink<T: TrivialSinkName>(PhantomData<T>);
+
+impl<T: TrivialSinkName> TryFrom<SinkParam> for TrivialSink<T> {
     type Error = SinkError;
 
     fn try_from(_value: SinkParam) -> std::result::Result<Self, Self::Error> {
-        Ok(Self)
+        Ok(Self(PhantomData))
     }
 }
 
-impl Sink for BlackHoleSink {
+impl<T: TrivialSinkName> Sink for TrivialSink<T> {
     type Coordinator = DummySinkCommitCoordinator;
     type LogSinker = Self;
 
-    const SINK_NAME: &'static str = BLACKHOLE_SINK;
+    const SINK_NAME: &'static str = T::SINK_NAME;
 
     async fn new_log_sinker(&self, _writer_env: SinkWriterParam) -> Result<Self::LogSinker> {
-        Ok(Self)
+        Ok(Self(PhantomData))
    }
 
     async fn validate(&self) -> Result<()> {
@@ -49,7 +74,7 @@ impl Sink for BlackHoleSink {
 }
 
 #[async_trait]
-impl LogSinker for BlackHoleSink {
+impl<T: TrivialSinkName> LogSinker for TrivialSink<T> {
     async fn consume_log_and_sink(self, log_reader: &mut impl SinkLogReader) -> Result<()> {
         loop {
             let (epoch, item) = log_reader.next_item().await?;
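
The rename from `blackhole.rs` to `trivial.rs` deduplicates `BlackHoleSink` and the former `table.rs` sink: the two no-op sinks differ in nothing but a name, so they become one generic `TrivialSink<T>` parameterized by a marker type carrying an associated const. The pattern in isolation (a sketch, not the RisingWave types):

```rust
use std::marker::PhantomData;

// A marker trait whose only job is to supply a compile-time name.
trait TrivialSinkName: Send + 'static {
    const SINK_NAME: &'static str;
}

struct BlackHoleSinkName;
impl TrivialSinkName for BlackHoleSinkName {
    const SINK_NAME: &'static str = "blackhole";
}

struct TableSinkName;
impl TrivialSinkName for TableSinkName {
    const SINK_NAME: &'static str = "table";
}

// `PhantomData<T>` records the type parameter without storing any data,
// so the sink stays zero-sized while the two aliases remain distinct types.
struct TrivialSink<T: TrivialSinkName>(PhantomData<T>);

impl<T: TrivialSinkName> TrivialSink<T> {
    fn new() -> Self {
        Self(PhantomData)
    }

    fn name(&self) -> &'static str {
        T::SINK_NAME
    }
}

fn main() {
    assert_eq!(TrivialSink::<BlackHoleSinkName>::new().name(), "blackhole");
    assert_eq!(TrivialSink::<TableSinkName>::new().name(), "table");
    assert_eq!(std::mem::size_of::<TrivialSink<TableSinkName>>(), 0);
}
```
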
diff --git a/src/frontend/src/optimizer/plan_node/stream_sink.rs b/src/frontend/src/optimizer/plan_node/stream_sink.rs
index 8c690d6047445..50ee550759c72 100644
--- a/src/frontend/src/optimizer/plan_node/stream_sink.rs
+++ b/src/frontend/src/optimizer/plan_node/stream_sink.rs
@@ -29,6 +29,7 @@ use risingwave_common::util::sort_util::OrderType;
 use risingwave_connector::match_sink_name_str;
 use risingwave_connector::sink::catalog::desc::SinkDesc;
 use risingwave_connector::sink::catalog::{SinkFormat, SinkFormatDesc, SinkId, SinkType};
+use risingwave_connector::sink::trivial::TABLE_SINK;
 use risingwave_connector::sink::{
     SinkError, CONNECTOR_TYPE_KEY, SINK_TYPE_APPEND_ONLY, SINK_TYPE_DEBEZIUM, SINK_TYPE_OPTION,
     SINK_TYPE_UPSERT, SINK_USER_FORCE_APPEND_ONLY_OPTION,
@@ -105,17 +106,26 @@ impl StreamSink {
             format_desc,
         )?;
 
+        let unsupported_sink =
+            |sink: &str| Err(SinkError::Config(anyhow!("unsupported sink type {}", sink)));
+
         // check and ensure that the sink connector is specified and supported
         match sink.properties.get(CONNECTOR_TYPE_KEY) {
-            Some(connector) => match_sink_name_str!(
-                connector.to_lowercase().as_str(),
-                SinkType,
-                Ok(()),
-                |other| Err(SinkError::Config(anyhow!(
-                    "unsupported sink type {}",
-                    other
-                )))
-            )?,
+            Some(connector) => {
+                match_sink_name_str!(
+                    connector.to_lowercase().as_str(),
+                    SinkType,
+                    {
+                        // A `table` sink is only created implicitly for sink-into-table; it cannot be requested via WITH properties.
+                        if connector == TABLE_SINK && sink.target_table.is_none() {
+                            unsupported_sink(TABLE_SINK)
+                        } else {
+                            Ok(())
+                        }
+                    },
+                    |other: &str| unsupported_sink(other)
+                )?;
+            }
             None => {
                 return Err(
                     SinkError::Config(anyhow!("connector not specified when create sink")).into(),
diff --git a/src/meta/src/hummock/manager/mod.rs b/src/meta/src/hummock/manager/mod.rs
index 0ea5c51904c21..e37848a77ffd5 100644
--- a/src/meta/src/hummock/manager/mod.rs
+++ b/src/meta/src/hummock/manager/mod.rs
@@ -81,10 +81,9 @@ use crate::hummock::metrics_utils::{
     trigger_split_stat, trigger_sst_stat, trigger_version_stat, trigger_write_stop_stats,
 };
 use crate::hummock::{CompactorManagerRef, TASK_NORMAL};
-use crate::manager::{
-    ClusterManagerRef, FragmentManagerRef, IdCategory, MetaSrvEnv, MetadataManager, TableId,
-    META_NODE_ID,
-};
+#[cfg(any(test, feature = "test"))]
+use crate::manager::{ClusterManagerRef, FragmentManagerRef};
+use crate::manager::{IdCategory, MetaSrvEnv, MetadataManager, TableId, META_NODE_ID};
 use crate::model::{
     BTreeMapEntryTransaction, BTreeMapTransaction, ClusterId, MetadataModel, ValTransaction,
     VarTransaction,
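
The frontend rule encoded above: `table` stays a recognized sink name (so `match_sink_name_str!` accepts it), but it is rejected unless the sink actually has a target table, i.e. it was planned through `CREATE SINK ... INTO <table>` rather than written as `connector = 'table'` in WITH options — exactly the case the new slt test pins down. The decision table in isolation (`validate_connector` is a hypothetical helper, not frontend code):

```rust
const TABLE_SINK: &str = "table";

fn validate_connector(
    connector: &str,
    has_target_table: bool,
    known_sinks: &[&str],
) -> Result<(), String> {
    let lowered = connector.to_lowercase();
    // Unknown names fail the `match_sink_name_str!` fallback arm.
    if !known_sinks.contains(&lowered.as_str()) {
        return Err(format!("unsupported sink type {}", connector));
    }
    // `table` is only produced internally by sink-into-table planning;
    // naming it explicitly without a target table is rejected.
    if lowered == TABLE_SINK && !has_target_table {
        return Err(format!("unsupported sink type {}", TABLE_SINK));
    }
    Ok(())
}

fn main() {
    let known = ["blackhole", "kafka", "table"];
    assert!(validate_connector("kafka", false, &known).is_ok());
    assert!(validate_connector("table", false, &known).is_err()); // the new slt case
    assert!(validate_connector("table", true, &known).is_ok());
}
```
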
diff --git a/src/meta/src/manager/sink_coordination/manager.rs b/src/meta/src/manager/sink_coordination/manager.rs
index af97a4851dbcd..34b4073916e6c 100644
--- a/src/meta/src/manager/sink_coordination/manager.rs
+++ b/src/meta/src/manager/sink_coordination/manager.rs
@@ -407,7 +407,6 @@ mod tests {
             format_desc: None,
             db_name: "test".into(),
             sink_from_name: "test".into(),
-            target_table: None,
         };
 
         let epoch1 = 233;
@@ -578,7 +577,6 @@ mod tests {
             format_desc: None,
             db_name: "test".into(),
             sink_from_name: "test".into(),
-            target_table: None,
         };
 
         let epoch1 = 233;
@@ -698,7 +696,6 @@ mod tests {
             format_desc: None,
             db_name: "test".into(),
             sink_from_name: "test".into(),
-            target_table: None,
         };
 
         let (manager, (_join_handle, _stop_tx)) = SinkCoordinatorManager::start_worker();
@@ -738,7 +735,6 @@ mod tests {
             format_desc: None,
             db_name: "test".into(),
             sink_from_name: "test".into(),
-            target_table: None,
         };
 
         let epoch = 233;
@@ -818,7 +814,6 @@ mod tests {
             format_desc: None,
             db_name: "test".into(),
             sink_from_name: "test".into(),
-            target_table: None,
         };
 
         let epoch = 233;
diff --git a/src/stream/src/executor/monitor/streaming_stats.rs b/src/stream/src/executor/monitor/streaming_stats.rs
index f6a098090f0ef..c21c0294a70dd 100644
--- a/src/stream/src/executor/monitor/streaming_stats.rs
+++ b/src/stream/src/executor/monitor/streaming_stats.rs
@@ -19,7 +19,7 @@ use prometheus::{
     exponential_buckets, histogram_opts, register_gauge_vec_with_registry,
     register_histogram_with_registry, register_int_counter_vec_with_registry,
     register_int_counter_with_registry, register_int_gauge_vec_with_registry,
-    register_int_gauge_with_registry, Histogram, IntCounter, IntGauge, Registry,
+    register_int_gauge_with_registry, Histogram, IntCounter, IntCounterVec, IntGauge, Registry,
 };
 use risingwave_common::config::MetricLevel;
 use risingwave_common::metrics::{
@@ -68,8 +68,8 @@ pub struct StreamingMetrics {
     pub source_split_change_count: GenericCounterVec<AtomicU64>,
 
     // Sink & materialized view
-    pub sink_input_row_count: GenericCounterVec<AtomicU64>,
-    pub mview_input_row_count: GenericCounterVec<AtomicU64>,
+    pub sink_input_row_count: LabelGuardedIntCounterVec<3>,
+    pub mview_input_row_count: IntCounterVec,
 
     // Exchange (see also `compute::ExchangeServiceMetrics`)
     pub exchange_frag_recv_size: GenericCounterVec<AtomicU64>,
@@ -232,7 +232,7 @@ impl StreamingMetrics {
         )
         .unwrap();
 
-        let sink_input_row_count = register_int_counter_vec_with_registry!(
+        let sink_input_row_count = register_guarded_int_counter_vec_with_registry!(
             "stream_sink_input_row_count",
             "Total number of rows streamed into sink executors",
             &["sink_id", "actor_id", "fragment_id"],
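
Why `sink_input_row_count` moves to `LabelGuardedIntCounterVec<3>`: the executor now holds the labeled counter for its whole lifetime (see `with_guarded_label_values` in the `SinkExecutor` diff below), and a guarded vec can drop the `(sink_id, actor_id, fragment_id)` series when the actor goes away instead of leaking it. An illustrative guard built on the `prometheus` crate — RisingWave's `LabelGuardedIntCounterVec` is its own implementation, this only shows the idea:

```rust
use prometheus::{register_int_counter_vec_with_registry, IntCounter, IntCounterVec, Registry};

// Holds one labeled series and removes it from the vec on drop, so scrapes
// do not accumulate series for actors that no longer exist.
struct GuardedCounter {
    vec: IntCounterVec,
    labels: Vec<String>,
    counter: IntCounter,
}

impl GuardedCounter {
    fn new(vec: IntCounterVec, labels: &[&str]) -> Self {
        Self {
            counter: vec.with_label_values(labels),
            labels: labels.iter().map(|s| s.to_string()).collect(),
            vec,
        }
    }

    fn inc_by(&self, v: u64) {
        self.counter.inc_by(v);
    }
}

impl Drop for GuardedCounter {
    fn drop(&mut self) {
        let labels: Vec<&str> = self.labels.iter().map(String::as_str).collect();
        let _ = self.vec.remove_label_values(&labels);
    }
}

fn main() {
    let registry = Registry::new();
    let vec = register_int_counter_vec_with_registry!(
        "stream_sink_input_row_count",
        "Total number of rows streamed into sink executors",
        &["sink_id", "actor_id", "fragment_id"],
        registry
    )
    .unwrap();

    let guard = GuardedCounter::new(vec.clone(), &["1", "42", "7"]);
    guard.inc_by(128);
    drop(guard); // the ("1", "42", "7") series is removed here

    let families = registry.gather();
    assert!(families.is_empty() || families[0].get_metric().is_empty());
}
```
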
diff --git a/src/stream/src/executor/sink.rs b/src/stream/src/executor/sink.rs
index b4cb2e4e532fe..5aca2842f5ad6 100644
--- a/src/stream/src/executor/sink.rs
+++ b/src/stream/src/executor/sink.rs
@@ -16,7 +16,7 @@ use std::mem;
 
 use anyhow::anyhow;
 use futures::stream::select;
-use futures::{FutureExt, StreamExt, TryFutureExt};
+use futures::{pin_mut, FutureExt, StreamExt, TryFutureExt, TryStreamExt};
 use futures_async_stream::try_stream;
 use itertools::Itertools;
 use risingwave_common::array::stream_chunk::StreamChunkMut;
@@ -24,7 +24,7 @@ use risingwave_common::array::{merge_chunk_row, Op, StreamChunk, StreamChunkCompactor};
 use risingwave_common::catalog::{ColumnCatalog, Field, Schema};
 use risingwave_common::metrics::GLOBAL_ERROR_METRICS;
 use risingwave_connector::dispatch_sink;
-use risingwave_connector::sink::catalog::{SinkId, SinkType};
+use risingwave_connector::sink::catalog::SinkType;
 use risingwave_connector::sink::log_store::{
     LogReader, LogReaderExt, LogStoreFactory, LogWriter, LogWriterExt,
 };
@@ -35,7 +35,10 @@ use thiserror_ext::AsReport;
 
 use super::error::{StreamExecutorError, StreamExecutorResult};
 use super::{BoxedExecutor, Executor, ExecutorInfo, Message, PkIndices};
-use crate::executor::{expect_first_barrier, ActorContextRef, BoxedMessageStream, Mutation};
+use crate::executor::{
+    expect_first_barrier, ActorContextRef, BoxedMessageStream, MessageStream, Mutation,
+};
+use crate::task::ActorId;
 
 pub struct SinkExecutor<F: LogStoreFactory> {
     actor_context: ActorContextRef,
@@ -44,8 +47,7 @@ pub struct SinkExecutor<F: LogStoreFactory> {
     sink: SinkImpl,
     input_columns: Vec<ColumnCatalog>,
     sink_param: SinkParam,
-    log_reader: F::Reader,
-    log_writer: F::Writer,
+    log_store_factory: F,
     sink_writer_param: SinkWriterParam,
 }
@@ -77,6 +79,7 @@ fn force_delete_only(c: StreamChunk) -> StreamChunk {
 
 impl<F: LogStoreFactory> SinkExecutor<F> {
     #[allow(clippy::too_many_arguments)]
+    #[expect(clippy::unused_async)]
     pub async fn new(
         actor_context: ActorContextRef,
         info: ExecutorInfo,
@@ -86,8 +89,6 @@ impl<F: LogStoreFactory> SinkExecutor<F> {
         columns: Vec<ColumnCatalog>,
         log_store_factory: F,
     ) -> StreamExecutorResult<Self> {
-        let (log_reader, log_writer) = log_store_factory.build().await;
-
         let sink = build_sink(sink_param.clone())?;
         let input_schema: Schema = columns
             .iter()
@@ -102,13 +103,15 @@ impl<F: LogStoreFactory> SinkExecutor<F> {
             sink,
             input_columns: columns,
             sink_param,
-            log_reader,
-            log_writer,
+            log_store_factory,
             sink_writer_param,
         })
     }
 
     fn execute_inner(self) -> BoxedMessageStream {
+        let sink_id = self.sink_param.sink_id;
+        let actor_id = self.actor_context.id;
+
         let stream_key = self.info.pk_indices.clone();
 
         let stream_key_sink_pk_mismatch = {
@@ -117,42 +120,69 @@ impl<F: LogStoreFactory> SinkExecutor<F> {
                 .any(|i| !self.sink_param.downstream_pk.contains(i))
         };
 
-        let write_log_stream = Self::execute_write_log(
-            self.input,
-            stream_key,
-            self.log_writer
-                .monitored(self.sink_writer_param.sink_metrics.clone()),
-            self.sink_param.sink_id,
+        let input = self.input.execute();
+
+        let input_row_count = self
+            .actor_context
+            .streaming_metrics
+            .sink_input_row_count
+            .with_guarded_label_values(&[
+                &sink_id.to_string(),
+                &actor_id.to_string(),
+                &self.actor_context.fragment_id.to_string(),
+            ]);
+
+        let input = input.inspect_ok(move |msg| {
+            if let Message::Chunk(c) = msg {
+                input_row_count.inc_by(c.capacity() as u64);
+            }
+        });
+
+        let processed_input = Self::process_msg(
+            input,
             self.sink_param.sink_type,
-            self.actor_context.clone(),
+            stream_key,
             stream_key_sink_pk_mismatch,
         );
 
-        dispatch_sink!(self.sink, sink, {
-            let consume_log_stream = Self::execute_consume_log(
-                sink,
-                self.log_reader,
-                self.input_columns,
-                self.sink_writer_param,
-                self.actor_context,
-                self.info,
-            );
-            select(consume_log_stream.into_stream(), write_log_stream).boxed()
-        })
+        if self.sink.is_sink_into_table() {
+            processed_input.boxed()
+        } else {
+            self.log_store_factory
+                .build()
+                .map(move |(log_reader, log_writer)| {
+                    let write_log_stream = Self::execute_write_log(
+                        processed_input,
+                        log_writer.monitored(self.sink_writer_param.sink_metrics.clone()),
+                        actor_id,
+                    );
+
+                    dispatch_sink!(self.sink, sink, {
+                        let consume_log_stream = Self::execute_consume_log(
+                            sink,
+                            log_reader,
+                            self.input_columns,
+                            self.sink_writer_param,
+                            self.actor_context,
+                            self.info,
+                        );
+                        // TODO: may try to remove the boxed
+                        select(consume_log_stream.into_stream(), write_log_stream).boxed()
+                    })
+                })
+                .into_stream()
+                .flatten()
+                .boxed()
+        }
     }
 
     #[try_stream(ok = Message, error = StreamExecutorError)]
     async fn execute_write_log(
-        input: BoxedExecutor,
-        stream_key: PkIndices,
+        input: impl MessageStream,
         mut log_writer: impl LogWriter,
-        sink_id: SinkId,
-        sink_type: SinkType,
-        actor_context: ActorContextRef,
-        stream_key_sink_pk_mismatch: bool,
+        actor_id: ActorId,
     ) {
-        let mut input = input.execute();
-
+        pin_mut!(input);
         let barrier = expect_first_barrier(&mut input).await?;
 
         let epoch_pair = barrier.epoch;
@@ -164,11 +194,42 @@ impl<F: LogStoreFactory> SinkExecutor<F> {
         // Propagate the first barrier
         yield Message::Barrier(barrier);
 
-        // for metrics
-        let sink_id_str = sink_id.to_string();
-        let actor_id_str = actor_context.id.to_string();
-        let fragment_id_str = actor_context.fragment_id.to_string();
+        #[for_await]
+        for msg in input {
+            match msg? {
+                Message::Watermark(w) => yield Message::Watermark(w),
+                Message::Chunk(chunk) => {
+                    log_writer.write_chunk(chunk.clone()).await?;
+                    yield Message::Chunk(chunk);
+                }
+                Message::Barrier(barrier) => {
+                    if let Some(mutation) = barrier.mutation.as_deref() {
+                        match mutation {
+                            Mutation::Pause => log_writer.pause()?,
+                            Mutation::Resume => log_writer.resume()?,
+                            _ => (),
+                        }
+                    }
+
+                    log_writer
+                        .flush_current_epoch(barrier.epoch.curr, barrier.kind.is_checkpoint())
+                        .await?;
+                    if let Some(vnode_bitmap) = barrier.as_update_vnode_bitmap(actor_id) {
+                        log_writer.update_vnode_bitmap(vnode_bitmap).await?;
+                    }
+                    yield Message::Barrier(barrier);
+                }
+            }
+        }
+    }
 
+    #[try_stream(ok = Message, error = StreamExecutorError)]
+    async fn process_msg(
+        input: impl MessageStream,
+        sink_type: SinkType,
+        stream_key: PkIndices,
+        stream_key_sink_pk_mismatch: bool,
+    ) {
         // When stream key is different from the user defined primary key columns for sinks. The operations could be out of order
         // stream key: a,b
         // sink pk: a
@@ -207,12 +268,6 @@ impl<F: LogStoreFactory> SinkExecutor<F> {
                 match msg? {
                     Message::Watermark(w) => watermark = Some(w),
                     Message::Chunk(c) => {
-                        actor_context
-                            .streaming_metrics
-                            .sink_input_row_count
-                            .with_label_values(&[&sink_id_str, &actor_id_str, &fragment_id_str])
-                            .inc_by(c.capacity() as u64);
-
                         chunk_buffer.push_chunk(c);
                     }
                     Message::Barrier(barrier) => {
@@ -234,28 +289,11 @@ impl<F: LogStoreFactory> SinkExecutor<F> {
                         }
 
                         for c in delete_chunks.into_iter().chain(insert_chunks.into_iter()) {
-                            log_writer.write_chunk(c.clone()).await?;
                             yield Message::Chunk(c);
                         }
                         if let Some(w) = mem::take(&mut watermark) {
                             yield Message::Watermark(w)
                         }
-
-                        if let Some(mutation) = barrier.mutation.as_deref() {
-                            match mutation {
-                                Mutation::Pause => log_writer.pause()?,
-                                Mutation::Resume => log_writer.resume()?,
-                                _ => (),
-                            }
-                        }
-
-                        log_writer
-                            .flush_current_epoch(barrier.epoch.curr, barrier.kind.is_checkpoint())
-                            .await?;
-                        if let Some(vnode_bitmap) = barrier.as_update_vnode_bitmap(actor_context.id)
-                        {
-                            log_writer.update_vnode_bitmap(vnode_bitmap).await?;
-                        }
                         yield Message::Barrier(barrier);
                     }
                 }
@@ -266,12 +304,6 @@ impl<F: LogStoreFactory> SinkExecutor<F> {
                 match msg? {
                     Message::Watermark(w) => yield Message::Watermark(w),
                     Message::Chunk(chunk) => {
-                        actor_context
-                            .streaming_metrics
-                            .sink_input_row_count
-                            .with_label_values(&[&sink_id_str, &actor_id_str, &fragment_id_str])
-                            .inc_by(chunk.capacity() as u64);
-
                         // Compact the chunk to eliminate any useless intermediate result (e.g. UPDATE
                         // V->V).
                         let chunk = merge_chunk_row(chunk, &stream_key);
@@ -284,27 +316,9 @@ impl<F: LogStoreFactory> SinkExecutor<F> {
                             chunk
                         };
 
-                        log_writer.write_chunk(chunk.clone()).await?;
-
-                        // Use original chunk instead of the reordered one as the executor output.
                         yield Message::Chunk(chunk);
                     }
                     Message::Barrier(barrier) => {
-                        if let Some(mutation) = barrier.mutation.as_deref() {
-                            match mutation {
-                                Mutation::Pause => log_writer.pause()?,
-                                Mutation::Resume => log_writer.resume()?,
-                                _ => (),
-                            }
-                        }
-
-                        log_writer
-                            .flush_current_epoch(barrier.epoch.curr, barrier.kind.is_checkpoint())
-                            .await?;
-                        if let Some(vnode_bitmap) = barrier.as_update_vnode_bitmap(actor_context.id)
-                        {
-                            log_writer.update_vnode_bitmap(vnode_bitmap).await?;
-                        }
                         yield Message::Barrier(barrier);
                     }
                 }
@@ -487,7 +501,6 @@ mod test {
             format_desc: None,
             db_name: "test".into(),
             sink_from_name: "test".into(),
-            target_table: None,
         };
 
         let info = ExecutorInfo {
@@ -615,7 +628,6 @@ mod test {
             format_desc: None,
             db_name: "test".into(),
             sink_from_name: "test".into(),
-            target_table: None,
         };
 
         let info = ExecutorInfo {
@@ -740,7 +752,6 @@ mod test {
             format_desc: None,
             db_name: "test".into(),
             sink_from_name: "test".into(),
-            target_table: None,
         };
 
         let info = ExecutorInfo {
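
Shape of the refactored executor above: metrics are counted through a guarded counter in an `inspect_ok` tap, ordering/compaction lives in `process_msg`, and only non-table sinks go through the log store — whose factory is built *inside* the returned stream (`.map(...).into_stream().flatten()`), which is why `SinkExecutor::new` no longer needs to await anything. A runnable miniature of that control flow using plain `futures` combinators; `i32` stands in for `Message` and every name here is illustrative:

```rust
use futures::future::FutureExt;
use futures::stream::{self, BoxStream, StreamExt};

async fn build_log_store() {
    // Stands in for `log_store_factory.build().await`.
}

fn pipeline(
    sink_into_table: bool,
    input: impl futures::Stream<Item = i32> + Send + 'static,
) -> BoxStream<'static, i32> {
    let processed = input.map(|v| v * 2); // stands in for `process_msg`

    if sink_into_table {
        // Table sinks bypass the log store entirely: the processed
        // stream itself is the executor output.
        processed.boxed()
    } else {
        // For real sinks the log store is built lazily, inside the stream,
        // so the async build is not awaited in the constructor.
        build_log_store()
            .map(move |_log_store| processed)
            .into_stream()
            .flatten()
            .boxed()
    }
}

fn main() {
    let out: Vec<i32> = futures::executor::block_on(
        pipeline(false, stream::iter([1, 2, 3])).collect::<Vec<_>>(),
    );
    assert_eq!(out, vec![2, 4, 6]);
}
```
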
diff --git a/src/stream/src/from_proto/sink.rs b/src/stream/src/from_proto/sink.rs
index 0af4ff34ffa22..659450e83c077 100644
--- a/src/stream/src/from_proto/sink.rs
+++ b/src/stream/src/from_proto/sink.rs
@@ -15,14 +15,13 @@
 use std::sync::Arc;
 
 use anyhow::anyhow;
-use risingwave_common::catalog::{ColumnCatalog, TableId};
+use risingwave_common::catalog::ColumnCatalog;
 use risingwave_connector::match_sink_name_str;
 use risingwave_connector::sink::catalog::{SinkFormatDesc, SinkType};
 use risingwave_connector::sink::{
     SinkError, SinkParam, SinkWriterParam, CONNECTOR_TYPE_KEY, SINK_TYPE_OPTION,
 };
 use risingwave_pb::stream_plan::{SinkLogStoreType, SinkNode};
-use risingwave_storage::dispatch_state_store;
 
 use super::*;
 use crate::common::log_store_impl::in_mem::BoundedInMemLogStoreFactory;
@@ -37,7 +36,7 @@ impl ExecutorBuilder for SinkExecutorBuilder {
     async fn new_boxed_executor(
         params: ExecutorParams,
         node: &Self::Node,
-        _store: impl StateStore,
+        state_store: impl StateStore,
         stream: &mut LocalStreamManagerCore,
     ) -> StreamResult<BoxedExecutor> {
         let [input_executor]: [_; 1] = params.input.try_into().unwrap();
@@ -47,7 +46,6 @@ impl ExecutorBuilder for SinkExecutorBuilder {
         let sink_id = sink_desc.get_id().into();
         let db_name = sink_desc.get_db_name().into();
         let sink_from_name = sink_desc.get_sink_from_name().into();
-        let target_table = sink_desc.get_target_table().cloned().ok().map(TableId::new);
         let properties = sink_desc.get_properties().clone();
         let downstream_pk = sink_desc
             .downstream_pk
@@ -102,7 +100,6 @@ impl ExecutorBuilder for SinkExecutorBuilder {
             format_desc,
             db_name,
             sink_from_name,
-            target_table,
         };
 
         let sink_id_str = format!("{}", sink_id.sink_id);
@@ -152,29 +149,27 @@ impl ExecutorBuilder for SinkExecutorBuilder {
                     connector,
                 );
                 // TODO: support setting max row count in config
-                dispatch_state_store!(params.env.state_store(), state_store, {
-                    let factory = KvLogStoreFactory::new(
-                        state_store,
-                        node.table.as_ref().unwrap().clone(),
-                        params.vnode_bitmap.clone().map(Arc::new),
-                        65536,
-                        metrics,
-                        log_store_identity,
-                    );
-
-                    Ok(Box::new(
-                        SinkExecutor::new(
-                            params.actor_context,
-                            params.info,
-                            input_executor,
-                            sink_write_param,
-                            sink_param,
-                            columns,
-                            factory,
-                        )
-                        .await?,
-                    ))
-                })
+                let factory = KvLogStoreFactory::new(
+                    state_store,
+                    node.table.as_ref().unwrap().clone(),
+                    params.vnode_bitmap.clone().map(Arc::new),
+                    65536,
+                    metrics,
+                    log_store_identity,
+                );
+
+                Ok(Box::new(
+                    SinkExecutor::new(
+                        params.actor_context,
+                        params.info,
+                        input_executor,
+                        sink_write_param,
+                        sink_param,
+                        columns,
+                        factory,
+                    )
+                    .await?,
+                ))
             }
         }
     }
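
Closing note: `dispatch_state_store!` disappears from the builder because `new_boxed_executor` already receives the store as `impl StateStore`; with `SinkExecutor` generic over the log-store factory and the factory built lazily, the enum-to-concrete-type dispatch at this call site is no longer needed. A contrast sketch (all types hypothetical, not RisingWave's):

```rust
trait StateStore {}
struct MemoryStore;
impl StateStore for MemoryStore {}

// The factory is generic over whichever store the builder was handed.
struct KvLogStoreFactory<S: StateStore> {
    state_store: S,
    max_row_count: usize,
}

// New style: construct the factory directly from the `impl StateStore`
// parameter, with no `dispatch_state_store!`-like match over store variants.
fn build_factory<S: StateStore>(state_store: S) -> KvLogStoreFactory<S> {
    KvLogStoreFactory {
        state_store,
        max_row_count: 65536,
    }
}

fn main() {
    let factory = build_factory(MemoryStore);
    let _ = factory.max_row_count;
    let _ = &factory.state_store;
}
```
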