From 89cd4b28ff8905bb5fc8e1d7453cc43d6514f551 Mon Sep 17 00:00:00 2001 From: William Wen Date: Wed, 27 Dec 2023 19:55:18 +0800 Subject: [PATCH 1/7] refactor(sink): unify log write path and distinguish table sink from external sink --- proto/connector_service.proto | 1 - src/connector/src/sink/log_store.rs | 6 +- src/connector/src/sink/mod.rs | 22 +- src/connector/src/sink/table.rs | 76 ------- src/connector/src/source/external.rs | 1 - .../src/manager/sink_coordination/manager.rs | 5 - .../src/executor/monitor/streaming_stats.rs | 8 +- src/stream/src/executor/sink.rs | 206 ++++++++++-------- src/stream/src/from_proto/sink.rs | 8 +- 9 files changed, 129 insertions(+), 204 deletions(-) delete mode 100644 src/connector/src/sink/table.rs diff --git a/proto/connector_service.proto b/proto/connector_service.proto index 7bc46cab9c187..ddda62c6aace5 100644 --- a/proto/connector_service.proto +++ b/proto/connector_service.proto @@ -27,7 +27,6 @@ message SinkParam { string db_name = 5; string sink_from_name = 6; catalog.SinkFormatDesc format_desc = 7; - optional uint32 target_table = 8; } enum SinkPayloadFormat { diff --git a/src/connector/src/sink/log_store.rs b/src/connector/src/sink/log_store.rs index 8436c0d8988e5..3e2bf740d32d9 100644 --- a/src/connector/src/sink/log_store.rs +++ b/src/connector/src/sink/log_store.rs @@ -174,9 +174,9 @@ pub trait LogReader: Send + Sized + 'static { ) -> impl Future)>> + Send + '_; } -pub trait LogStoreFactory: 'static { - type Reader: LogReader + Send + 'static; - type Writer: LogWriter + Send + 'static; +pub trait LogStoreFactory: Send + 'static { + type Reader: LogReader; + type Writer: LogWriter; fn build(self) -> impl Future + Send; } diff --git a/src/connector/src/sink/mod.rs b/src/connector/src/sink/mod.rs index d31c1cb475d19..8a5a9d49d25b3 100644 --- a/src/connector/src/sink/mod.rs +++ b/src/connector/src/sink/mod.rs @@ -32,7 +32,6 @@ pub mod pulsar; pub mod redis; pub mod remote; pub mod starrocks; -pub mod table; pub mod test_sink; pub mod utils; pub mod writer; @@ -46,7 +45,7 @@ use ::redis::RedisError; use anyhow::anyhow; use async_trait::async_trait; use risingwave_common::buffer::Bitmap; -use risingwave_common::catalog::{ColumnDesc, Field, Schema, TableId}; +use risingwave_common::catalog::{ColumnDesc, Field, Schema}; use risingwave_common::error::{anyhow_error, ErrorCode, RwError}; use risingwave_common::metrics::{ LabelGuardedHistogram, LabelGuardedIntCounter, LabelGuardedIntGauge, @@ -62,7 +61,6 @@ use self::catalog::{SinkFormatDesc, SinkType}; use crate::sink::catalog::desc::SinkDesc; use crate::sink::catalog::{SinkCatalog, SinkId}; use crate::sink::log_store::{LogReader, LogStoreReadItem, LogStoreResult, TruncateOffset}; -use crate::sink::table::TABLE_SINK; use crate::sink::writer::SinkWriter; use crate::ConnectorParams; @@ -89,8 +87,7 @@ macro_rules! for_all_sinks { { Starrocks, $crate::sink::starrocks::StarrocksSink }, { DeltaLakeRust, $crate::sink::deltalake::DeltaLakeSink }, { BigQuery, $crate::sink::big_query::BigQuerySink }, - { Test, $crate::sink::test_sink::TestSink }, - { Table, $crate::sink::table::TableSink } + { Test, $crate::sink::test_sink::TestSink } } $(,$arg)* } @@ -151,7 +148,6 @@ pub struct SinkParam { pub format_desc: Option, pub db_name: String, pub sink_from_name: String, - pub target_table: Option, } impl SinkParam { @@ -183,7 +179,6 @@ impl SinkParam { format_desc, db_name: pb_param.db_name, sink_from_name: pb_param.sink_from_name, - target_table: pb_param.target_table.map(TableId::new), } } @@ -199,7 +194,6 @@ impl SinkParam { format_desc: self.format_desc.as_ref().map(|f| f.to_proto()), db_name: self.db_name.clone(), sink_from_name: self.sink_from_name.clone(), - target_table: self.target_table.map(|table_id| table_id.table_id()), } } @@ -225,7 +219,6 @@ impl From for SinkParam { format_desc: sink_catalog.format_desc, db_name: sink_catalog.db_name, sink_from_name: sink_catalog.sink_from_name, - target_table: sink_catalog.target_table, } } } @@ -373,13 +366,10 @@ impl SinkImpl { param.properties.remove(PRIVATE_LINK_TARGET_KEY); param.properties.remove(CONNECTION_NAME_KEY); - let sink_type = if param.target_table.is_some() { - TABLE_SINK - } else { - param.properties.get(CONNECTOR_TYPE_KEY).ok_or_else(|| { - SinkError::Config(anyhow!("missing config: {}", CONNECTOR_TYPE_KEY)) - })? - }; + let sink_type = param + .properties + .get(CONNECTOR_TYPE_KEY) + .ok_or_else(|| SinkError::Config(anyhow!("missing config: {}", CONNECTOR_TYPE_KEY)))?; match_sink_name_str!( sink_type.to_lowercase().as_str(), diff --git a/src/connector/src/sink/table.rs b/src/connector/src/sink/table.rs deleted file mode 100644 index 20293c4781a2e..0000000000000 --- a/src/connector/src/sink/table.rs +++ /dev/null @@ -1,76 +0,0 @@ -// Copyright 2023 RisingWave Labs -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -use async_trait::async_trait; - -use crate::sink::log_store::{LogStoreReadItem, TruncateOffset}; -use crate::sink::{ - DummySinkCommitCoordinator, LogSinker, Result, Sink, SinkError, SinkLogReader, SinkParam, - SinkWriterParam, -}; - -pub const TABLE_SINK: &str = "table"; - -/// A table sink outputs stream into another RisingWave's table. -/// -/// Different from a materialized view, table sinks do not enforce strong consistency between upstream and downstream in principle. As a result, the `create sink` statement returns immediately, which is similar to any other `create sink`. It also allows users to execute DMLs on these target tables. -/// -/// See also [RFC: Create Sink into Table](https://github.com/risingwavelabs/rfcs/pull/52). -#[derive(Debug)] -pub struct TableSink; - -impl TryFrom for TableSink { - type Error = SinkError; - - fn try_from(_value: SinkParam) -> std::result::Result { - Ok(Self) - } -} - -impl Sink for TableSink { - type Coordinator = DummySinkCommitCoordinator; - type LogSinker = Self; - - const SINK_NAME: &'static str = TABLE_SINK; - - async fn new_log_sinker(&self, _writer_env: SinkWriterParam) -> Result { - Ok(Self) - } - - async fn validate(&self) -> Result<()> { - Ok(()) - } -} - -#[async_trait] -impl LogSinker for TableSink { - async fn consume_log_and_sink(self, log_reader: &mut impl SinkLogReader) -> Result<()> { - loop { - let (epoch, item) = log_reader.next_item().await?; - match item { - LogStoreReadItem::StreamChunk { chunk_id, .. } => { - log_reader - .truncate(TruncateOffset::Chunk { epoch, chunk_id }) - .await?; - } - LogStoreReadItem::Barrier { .. } => { - log_reader - .truncate(TruncateOffset::Barrier { epoch }) - .await?; - } - LogStoreReadItem::UpdateVnodeBitmap(_) => {} - } - } - } -} diff --git a/src/connector/src/source/external.rs b/src/connector/src/source/external.rs index 76c8bb5b3081e..7943b1774fef5 100644 --- a/src/connector/src/source/external.rs +++ b/src/connector/src/source/external.rs @@ -605,7 +605,6 @@ mod tests { format_desc: None, db_name: "db".into(), sink_from_name: "table".into(), - target_table: None, }; let rw_schema = param.schema(); diff --git a/src/meta/src/manager/sink_coordination/manager.rs b/src/meta/src/manager/sink_coordination/manager.rs index f63675ae3a141..4c2b2a37295b1 100644 --- a/src/meta/src/manager/sink_coordination/manager.rs +++ b/src/meta/src/manager/sink_coordination/manager.rs @@ -407,7 +407,6 @@ mod tests { format_desc: None, db_name: "test".into(), sink_from_name: "test".into(), - target_table: None, }; let epoch1 = 233; @@ -578,7 +577,6 @@ mod tests { format_desc: None, db_name: "test".into(), sink_from_name: "test".into(), - target_table: None, }; let epoch1 = 233; @@ -698,7 +696,6 @@ mod tests { format_desc: None, db_name: "test".into(), sink_from_name: "test".into(), - target_table: None, }; let (manager, (_join_handle, _stop_tx)) = SinkCoordinatorManager::start_worker(); @@ -738,7 +735,6 @@ mod tests { format_desc: None, db_name: "test".into(), sink_from_name: "test".into(), - target_table: None, }; let epoch = 233; @@ -818,7 +814,6 @@ mod tests { format_desc: None, db_name: "test".into(), sink_from_name: "test".into(), - target_table: None, }; let epoch = 233; diff --git a/src/stream/src/executor/monitor/streaming_stats.rs b/src/stream/src/executor/monitor/streaming_stats.rs index 1b69741df899a..3631792ee91f7 100644 --- a/src/stream/src/executor/monitor/streaming_stats.rs +++ b/src/stream/src/executor/monitor/streaming_stats.rs @@ -19,7 +19,7 @@ use prometheus::{ exponential_buckets, histogram_opts, register_gauge_vec_with_registry, register_histogram_with_registry, register_int_counter_vec_with_registry, register_int_counter_with_registry, register_int_gauge_vec_with_registry, - register_int_gauge_with_registry, Histogram, IntCounter, IntGauge, Registry, + register_int_gauge_with_registry, Histogram, IntCounter, IntCounterVec, IntGauge, Registry, }; use risingwave_common::config::MetricLevel; use risingwave_common::metrics::{ @@ -68,8 +68,8 @@ pub struct StreamingMetrics { pub source_split_change_count: GenericCounterVec, // Sink & materialized view - pub sink_input_row_count: GenericCounterVec, - pub mview_input_row_count: GenericCounterVec, + pub sink_input_row_count: LabelGuardedIntCounterVec<3>, + pub mview_input_row_count: IntCounterVec, // Exchange (see also `compute::ExchangeServiceMetrics`) pub exchange_frag_recv_size: GenericCounterVec, @@ -232,7 +232,7 @@ impl StreamingMetrics { ) .unwrap(); - let sink_input_row_count = register_int_counter_vec_with_registry!( + let sink_input_row_count = register_guarded_int_counter_vec_with_registry!( "stream_sink_input_row_count", "Total number of rows streamed into sink executors", &["sink_id", "actor_id", "fragment_id"], diff --git a/src/stream/src/executor/sink.rs b/src/stream/src/executor/sink.rs index f3e8b9b4bb8d6..1ebf45b02d560 100644 --- a/src/stream/src/executor/sink.rs +++ b/src/stream/src/executor/sink.rs @@ -16,7 +16,7 @@ use std::mem; use anyhow::anyhow; use futures::stream::select; -use futures::{FutureExt, StreamExt, TryFutureExt}; +use futures::{pin_mut, FutureExt, StreamExt, TryFutureExt, TryStreamExt}; use futures_async_stream::try_stream; use itertools::Itertools; use risingwave_common::array::stream_chunk::StreamChunkMut; @@ -24,29 +24,29 @@ use risingwave_common::array::{merge_chunk_row, Op, StreamChunk, StreamChunkComp use risingwave_common::catalog::{ColumnCatalog, Field, Schema}; use risingwave_common::metrics::GLOBAL_ERROR_METRICS; use risingwave_connector::dispatch_sink; -use risingwave_connector::sink::catalog::{SinkId, SinkType}; +use risingwave_connector::sink::catalog::SinkType; use risingwave_connector::sink::log_store::{ LogReader, LogReaderExt, LogStoreFactory, LogWriter, LogWriterExt, }; -use risingwave_connector::sink::{ - build_sink, LogSinker, Sink, SinkImpl, SinkParam, SinkWriterParam, -}; +use risingwave_connector::sink::{build_sink, LogSinker, Sink, SinkParam, SinkWriterParam}; use thiserror_ext::AsReport; use super::error::{StreamExecutorError, StreamExecutorResult}; use super::{BoxedExecutor, Executor, ExecutorInfo, Message, PkIndices}; -use crate::executor::{expect_first_barrier, ActorContextRef, BoxedMessageStream, Mutation}; +use crate::executor::{ + expect_first_barrier, ActorContextRef, BoxedMessageStream, MessageStream, Mutation, +}; +use crate::task::ActorId; pub struct SinkExecutor { actor_context: ActorContextRef, info: ExecutorInfo, input: BoxedExecutor, - sink: SinkImpl, input_columns: Vec, sink_param: SinkParam, - log_reader: F::Reader, - log_writer: F::Writer, + log_store_factory: F, sink_writer_param: SinkWriterParam, + is_external_sink: bool, } // Drop all the DELETE messages in this chunk and convert UPDATE INSERT into INSERT. @@ -77,6 +77,7 @@ fn force_delete_only(c: StreamChunk) -> StreamChunk { impl SinkExecutor { #[allow(clippy::too_many_arguments)] + #[expect(clippy::unused_async)] pub async fn new( actor_context: ActorContextRef, info: ExecutorInfo, @@ -85,10 +86,8 @@ impl SinkExecutor { sink_param: SinkParam, columns: Vec, log_store_factory: F, + is_external_sink: bool, ) -> StreamExecutorResult { - let (log_reader, log_writer) = log_store_factory.build().await; - - let sink = build_sink(sink_param.clone())?; let input_schema: Schema = columns .iter() .map(|column| Field::from(&column.column_desc)) @@ -99,16 +98,19 @@ impl SinkExecutor { actor_context, info, input, - sink, input_columns: columns, sink_param, - log_reader, - log_writer, + log_store_factory, sink_writer_param, + is_external_sink, }) } - fn execute_inner(self) -> BoxedMessageStream { + #[try_stream(boxed, ok = Message, error = StreamExecutorError)] + async fn execute_inner(self) { + let sink_id = self.sink_param.sink_id; + let actor_id = self.actor_context.id; + let stream_key = self.info.pk_indices.clone(); let stream_key_sink_pk_mismatch = { @@ -117,42 +119,72 @@ impl SinkExecutor { .any(|i| !self.sink_param.downstream_pk.contains(i)) }; - let write_log_stream = Self::execute_write_log( - self.input, - stream_key, - self.log_writer - .monitored(self.sink_writer_param.sink_metrics.clone()), - self.sink_param.sink_id, + let input = self.input.execute(); + + let input_row_count = self + .actor_context + .streaming_metrics + .sink_input_row_count + .with_guarded_label_values(&[ + &sink_id.to_string(), + &actor_id.to_string(), + &self.actor_context.fragment_id.to_string(), + ]); + + let input = input.inspect_ok(move |msg| { + if let Message::Chunk(c) = msg { + input_row_count.inc_by(c.capacity() as u64); + } + }); + + let processed_input = Self::process_input( + input, self.sink_param.sink_type, - self.actor_context.clone(), + stream_key, stream_key_sink_pk_mismatch, ); - dispatch_sink!(self.sink, sink, { - let consume_log_stream = Self::execute_consume_log( - sink, - self.log_reader, - self.input_columns, - self.sink_writer_param, - self.actor_context, - self.info, + if !self.is_external_sink { + #[for_await] + for msg in processed_input { + yield msg?; + } + } else { + let sink = build_sink(self.sink_param)?; + let (log_reader, log_writer) = self.log_store_factory.build().await; + + let write_log_stream = Self::execute_write_log( + processed_input, + log_writer.monitored(self.sink_writer_param.sink_metrics.clone()), + actor_id, ); - select(consume_log_stream.into_stream(), write_log_stream).boxed() - }) + + let output = dispatch_sink!(sink, sink, { + let consume_log_stream = Self::execute_consume_log( + sink, + log_reader, + self.input_columns, + self.sink_writer_param, + self.actor_context, + self.info, + ); + // TODO: may try to remove the boxed + select(consume_log_stream.into_stream(), write_log_stream).boxed() + }); + #[for_await] + for msg in output { + yield msg?; + } + } } #[try_stream(ok = Message, error = StreamExecutorError)] async fn execute_write_log( - input: BoxedExecutor, - stream_key: PkIndices, + input: impl MessageStream, mut log_writer: impl LogWriter, - sink_id: SinkId, - sink_type: SinkType, - actor_context: ActorContextRef, - stream_key_sink_pk_mismatch: bool, + actor_id: ActorId, ) { - let mut input = input.execute(); - + pin_mut!(input); let barrier = expect_first_barrier(&mut input).await?; let epoch_pair = barrier.epoch; @@ -164,11 +196,42 @@ impl SinkExecutor { // Propagate the first barrier yield Message::Barrier(barrier); - // for metrics - let sink_id_str = sink_id.to_string(); - let actor_id_str = actor_context.id.to_string(); - let fragment_id_str = actor_context.fragment_id.to_string(); + #[for_await] + for msg in input { + match msg? { + Message::Watermark(w) => yield Message::Watermark(w), + Message::Chunk(chunk) => { + log_writer.write_chunk(chunk.clone()).await?; + yield Message::Chunk(chunk); + } + Message::Barrier(barrier) => { + if let Some(mutation) = barrier.mutation.as_deref() { + match mutation { + Mutation::Pause => log_writer.pause()?, + Mutation::Resume => log_writer.resume()?, + _ => (), + } + } + + log_writer + .flush_current_epoch(barrier.epoch.curr, barrier.kind.is_checkpoint()) + .await?; + if let Some(vnode_bitmap) = barrier.as_update_vnode_bitmap(actor_id) { + log_writer.update_vnode_bitmap(vnode_bitmap).await?; + } + yield Message::Barrier(barrier); + } + } + } + } + #[try_stream(ok = Message, error = StreamExecutorError)] + async fn process_input( + input: impl MessageStream, + sink_type: SinkType, + stream_key: PkIndices, + stream_key_sink_pk_mismatch: bool, + ) { // When stream key is different from the user defined primary key columns for sinks. The operations could be out of order // stream key: a,b // sink pk: a @@ -207,12 +270,6 @@ impl SinkExecutor { match msg? { Message::Watermark(w) => watermark = Some(w), Message::Chunk(c) => { - actor_context - .streaming_metrics - .sink_input_row_count - .with_label_values(&[&sink_id_str, &actor_id_str, &fragment_id_str]) - .inc_by(c.capacity() as u64); - chunk_buffer.push_chunk(c); } Message::Barrier(barrier) => { @@ -234,28 +291,11 @@ impl SinkExecutor { } for c in delete_chunks.into_iter().chain(insert_chunks.into_iter()) { - log_writer.write_chunk(c.clone()).await?; yield Message::Chunk(c); } if let Some(w) = mem::take(&mut watermark) { yield Message::Watermark(w) } - - if let Some(mutation) = barrier.mutation.as_deref() { - match mutation { - Mutation::Pause => log_writer.pause()?, - Mutation::Resume => log_writer.resume()?, - _ => (), - } - } - - log_writer - .flush_current_epoch(barrier.epoch.curr, barrier.kind.is_checkpoint()) - .await?; - if let Some(vnode_bitmap) = barrier.as_update_vnode_bitmap(actor_context.id) - { - log_writer.update_vnode_bitmap(vnode_bitmap).await?; - } yield Message::Barrier(barrier); } } @@ -266,12 +306,6 @@ impl SinkExecutor { match msg? { Message::Watermark(w) => yield Message::Watermark(w), Message::Chunk(chunk) => { - actor_context - .streaming_metrics - .sink_input_row_count - .with_label_values(&[&sink_id_str, &actor_id_str, &fragment_id_str]) - .inc_by(chunk.capacity() as u64); - // Compact the chunk to eliminate any useless intermediate result (e.g. UPDATE // V->V). let chunk = merge_chunk_row(chunk, &stream_key); @@ -284,27 +318,9 @@ impl SinkExecutor { chunk }; - log_writer.write_chunk(chunk.clone()).await?; - - // Use original chunk instead of the reordered one as the executor output. yield Message::Chunk(chunk); } Message::Barrier(barrier) => { - if let Some(mutation) = barrier.mutation.as_deref() { - match mutation { - Mutation::Pause => log_writer.pause()?, - Mutation::Resume => log_writer.resume()?, - _ => (), - } - } - - log_writer - .flush_current_epoch(barrier.epoch.curr, barrier.kind.is_checkpoint()) - .await?; - if let Some(vnode_bitmap) = barrier.as_update_vnode_bitmap(actor_context.id) - { - log_writer.update_vnode_bitmap(vnode_bitmap).await?; - } yield Message::Barrier(barrier); } } @@ -487,7 +503,6 @@ mod test { format_desc: None, db_name: "test".into(), sink_from_name: "test".into(), - target_table: None, }; let info = ExecutorInfo { @@ -504,6 +519,7 @@ mod test { sink_param, columns.clone(), BoundedInMemLogStoreFactory::new(1), + true, ) .await .unwrap(); @@ -615,7 +631,6 @@ mod test { format_desc: None, db_name: "test".into(), sink_from_name: "test".into(), - target_table: None, }; let info = ExecutorInfo { @@ -632,6 +647,7 @@ mod test { sink_param, columns.clone(), BoundedInMemLogStoreFactory::new(1), + true, ) .await .unwrap(); @@ -740,7 +756,6 @@ mod test { format_desc: None, db_name: "test".into(), sink_from_name: "test".into(), - target_table: None, }; let info = ExecutorInfo { @@ -757,6 +772,7 @@ mod test { sink_param, columns, BoundedInMemLogStoreFactory::new(1), + true, ) .await .unwrap(); diff --git a/src/stream/src/from_proto/sink.rs b/src/stream/src/from_proto/sink.rs index 615da9d81219f..a2e135e718977 100644 --- a/src/stream/src/from_proto/sink.rs +++ b/src/stream/src/from_proto/sink.rs @@ -15,7 +15,7 @@ use std::sync::Arc; use anyhow::anyhow; -use risingwave_common::catalog::{ColumnCatalog, TableId}; +use risingwave_common::catalog::ColumnCatalog; use risingwave_connector::match_sink_name_str; use risingwave_connector::sink::catalog::{SinkFormatDesc, SinkType}; use risingwave_connector::sink::{ @@ -47,7 +47,8 @@ impl ExecutorBuilder for SinkExecutorBuilder { let sink_id = sink_desc.get_id().into(); let db_name = sink_desc.get_db_name().into(); let sink_from_name = sink_desc.get_sink_from_name().into(); - let target_table = sink_desc.get_target_table().cloned().ok().map(TableId::new); + // when target table is not present, it should be an external sink + let is_external_sink = sink_desc.get_target_table().is_err(); let properties = sink_desc.get_properties().clone(); let downstream_pk = sink_desc .downstream_pk @@ -102,7 +103,6 @@ impl ExecutorBuilder for SinkExecutorBuilder { format_desc, db_name, sink_from_name, - target_table, }; let sink_id_str = format!("{}", sink_id.sink_id); @@ -140,6 +140,7 @@ impl ExecutorBuilder for SinkExecutorBuilder { sink_param, columns, factory, + is_external_sink, ) .await?, )) @@ -171,6 +172,7 @@ impl ExecutorBuilder for SinkExecutorBuilder { sink_param, columns, factory, + is_external_sink, ) .await?, )) From 314a022f24b545d76e8b3840457805d6699c80e0 Mon Sep 17 00:00:00 2001 From: William Wen Date: Thu, 28 Dec 2023 18:04:39 +0800 Subject: [PATCH 2/7] fix validate --- src/connector/src/sink/mod.rs | 11 ++++++----- src/meta/src/stream/sink.rs | 11 ++++++----- 2 files changed, 12 insertions(+), 10 deletions(-) diff --git a/src/connector/src/sink/mod.rs b/src/connector/src/sink/mod.rs index 8a5a9d49d25b3..ecd929b99c87a 100644 --- a/src/connector/src/sink/mod.rs +++ b/src/connector/src/sink/mod.rs @@ -202,15 +202,16 @@ impl SinkParam { fields: self.columns.iter().map(Field::from).collect(), } } -} -impl From for SinkParam { - fn from(sink_catalog: SinkCatalog) -> Self { + pub fn from_catalog(sink_catalog: SinkCatalog) -> Option { + if sink_catalog.target_table.is_some() { + return None; + } let columns = sink_catalog .visible_columns() .map(|col| col.column_desc.clone()) .collect(); - Self { + Some(Self { sink_id: sink_catalog.id, properties: sink_catalog.properties, columns, @@ -219,7 +220,7 @@ impl From for SinkParam { format_desc: sink_catalog.format_desc, db_name: sink_catalog.db_name, sink_from_name: sink_catalog.sink_from_name, - } + }) } } diff --git a/src/meta/src/stream/sink.rs b/src/meta/src/stream/sink.rs index 8544011071ec2..a33bb2d51ca3b 100644 --- a/src/meta/src/stream/sink.rs +++ b/src/meta/src/stream/sink.rs @@ -21,9 +21,10 @@ use crate::MetaResult; pub async fn validate_sink(prost_sink_catalog: &PbSink) -> MetaResult<()> { let sink_catalog = SinkCatalog::from(prost_sink_catalog); - let param = SinkParam::from(sink_catalog); - - let sink = build_sink(param)?; - - dispatch_sink!(sink, sink, Ok(sink.validate().await?)) + if let Some(param) = SinkParam::from_catalog(sink_catalog) { + let sink = build_sink(param)?; + dispatch_sink!(sink, sink, Ok(sink.validate().await?)) + } else { + Ok(()) + } } From 44a7491c46645386cf7544646baa33e2b2056bd6 Mon Sep 17 00:00:00 2001 From: William Wen Date: Thu, 28 Dec 2023 18:49:44 +0800 Subject: [PATCH 3/7] enable e2e test and fix --- ci/scripts/e2e-sink-test.sh | 1 + .../src/optimizer/plan_node/stream_sink.rs | 56 ++++++++++--------- src/stream/src/from_proto/sink.rs | 8 ++- 3 files changed, 37 insertions(+), 28 deletions(-) diff --git a/ci/scripts/e2e-sink-test.sh b/ci/scripts/e2e-sink-test.sh index 0dbdce47abf55..92f3e30f6cf4c 100755 --- a/ci/scripts/e2e-sink-test.sh +++ b/ci/scripts/e2e-sink-test.sh @@ -61,6 +61,7 @@ sqllogictest -p 4566 -d dev './e2e_test/sink/append_only_sink.slt' sqllogictest -p 4566 -d dev './e2e_test/sink/create_sink_as.slt' sqllogictest -p 4566 -d dev './e2e_test/sink/blackhole_sink.slt' sqllogictest -p 4566 -d dev './e2e_test/sink/remote/types.slt' +sqllogictest -p 4566 -d dev './e2e_test/sink/sink_into_table.slt' sleep 1 echo "--- testing remote sinks" diff --git a/src/frontend/src/optimizer/plan_node/stream_sink.rs b/src/frontend/src/optimizer/plan_node/stream_sink.rs index 68814531d9293..e6a25d5ede446 100644 --- a/src/frontend/src/optimizer/plan_node/stream_sink.rs +++ b/src/frontend/src/optimizer/plan_node/stream_sink.rs @@ -106,21 +106,24 @@ impl StreamSink { format_desc, )?; - // check and ensure that the sink connector is specified and supported - match sink.properties.get(CONNECTOR_TYPE_KEY) { - Some(connector) => match_sink_name_str!( - connector.to_lowercase().as_str(), - SinkType, - Ok(()), - |other| Err(SinkError::Config(anyhow!( - "unsupported sink type {}", - other - ))) - )?, - None => { - return Err( - SinkError::Config(anyhow!("connector not specified when create sink")).into(), - ); + if sink.target_table.is_none() { + // check and ensure that the sink connector is specified and supported + match sink.properties.get(CONNECTOR_TYPE_KEY) { + Some(connector) => match_sink_name_str!( + connector.to_lowercase().as_str(), + SinkType, + Ok(()), + |other| Err(SinkError::Config(anyhow!( + "unsupported sink type {}", + other + ))) + )?, + None => { + return Err(SinkError::Config(anyhow!( + "connector not specified when create sink" + )) + .into()); + } } } @@ -435,10 +438,10 @@ impl StreamNode for StreamSink { PbNodeBody::Sink(SinkNode { sink_desc: Some(self.sink_desc.to_proto()), table: Some(table.to_internal_table_prost()), - log_store_type: match self.base.ctx().session_ctx().config().sink_decouple() { - SinkDecouple::Default => { - let enable_sink_decouple = - match_sink_name_str!( + log_store_type: if self.sink_desc.target_table.is_none() { + match self.base.ctx().session_ctx().config().sink_decouple() { + SinkDecouple::Default => { + let enable_sink_decouple = match_sink_name_str!( self.sink_desc.properties.get(CONNECTOR_TYPE_KEY).expect( "have checked connector is contained when create the `StreamSink`" ).to_lowercase().as_str(), @@ -448,14 +451,17 @@ impl StreamNode for StreamSink { "have checked connector is supported when create the `StreamSink`" ) ); - if enable_sink_decouple { - SinkLogStoreType::KvLogStore as i32 - } else { - SinkLogStoreType::InMemoryLogStore as i32 + if enable_sink_decouple { + SinkLogStoreType::KvLogStore as i32 + } else { + SinkLogStoreType::InMemoryLogStore as i32 + } } + SinkDecouple::Enable => SinkLogStoreType::KvLogStore as i32, + SinkDecouple::Disable => SinkLogStoreType::InMemoryLogStore as i32, } - SinkDecouple::Enable => SinkLogStoreType::KvLogStore as i32, - SinkDecouple::Disable => SinkLogStoreType::InMemoryLogStore as i32, + } else { + SinkLogStoreType::InMemoryLogStore as i32 }, }) } diff --git a/src/stream/src/from_proto/sink.rs b/src/stream/src/from_proto/sink.rs index a2e135e718977..58fa01bd649c4 100644 --- a/src/stream/src/from_proto/sink.rs +++ b/src/stream/src/from_proto/sink.rs @@ -62,7 +62,7 @@ impl ExecutorBuilder for SinkExecutorBuilder { .map(ColumnCatalog::from) .collect_vec(); - let connector = { + let connector = if is_external_sink { let sink_type = properties.get(CONNECTOR_TYPE_KEY).ok_or_else(|| { SinkError::Config(anyhow!("missing config: {}", CONNECTOR_TYPE_KEY)) })?; @@ -77,8 +77,10 @@ impl ExecutorBuilder for SinkExecutorBuilder { other ))) } - ) - }?; + )? + } else { + "table" + }; let format_desc = match &sink_desc.format_desc { // Case A: new syntax `format ... encode ...` Some(f) => Some(f.clone().try_into()?), From 55fedb7aba8063a84f059386a0c34bd799e615b8 Mon Sep 17 00:00:00 2001 From: William Wen Date: Thu, 28 Dec 2023 22:55:48 +0800 Subject: [PATCH 4/7] refactor --- e2e_test/sink/sink_into_table.slt | 3 + src/connector/src/sink/mod.rs | 22 ++++--- .../src/sink/{blackhole.rs => trivial.rs} | 39 +++++++++--- .../src/optimizer/plan_node/stream_sink.rs | 62 ++++++++++--------- src/meta/src/stream/sink.rs | 11 ++-- src/stream/src/executor/sink.rs | 18 +++--- src/stream/src/from_proto/sink.rs | 12 +--- 7 files changed, 97 insertions(+), 70 deletions(-) rename src/connector/src/sink/{blackhole.rs => trivial.rs} (71%) diff --git a/e2e_test/sink/sink_into_table.slt b/e2e_test/sink/sink_into_table.slt index d5ce724b8f5a7..4537810651be7 100644 --- a/e2e_test/sink/sink_into_table.slt +++ b/e2e_test/sink/sink_into_table.slt @@ -6,6 +6,9 @@ SET RW_IMPLICIT_FLUSH TO true; statement ok create table t_simple (v1 int, v2 int); +statement error unsupported sink type table +create sink table_sink from t_simple with (connector = 'table'); + statement ok create table m_simple (v1 int primary key, v2 int); diff --git a/src/connector/src/sink/mod.rs b/src/connector/src/sink/mod.rs index ecd929b99c87a..366d2aba7736b 100644 --- a/src/connector/src/sink/mod.rs +++ b/src/connector/src/sink/mod.rs @@ -13,7 +13,6 @@ // limitations under the License. pub mod big_query; -pub mod blackhole; pub mod boxed; pub mod catalog; pub mod clickhouse; @@ -33,6 +32,7 @@ pub mod redis; pub mod remote; pub mod starrocks; pub mod test_sink; +pub mod trivial; pub mod utils; pub mod writer; @@ -72,7 +72,7 @@ macro_rules! for_all_sinks { { Redis, $crate::sink::redis::RedisSink }, { Kafka, $crate::sink::kafka::KafkaSink }, { Pulsar, $crate::sink::pulsar::PulsarSink }, - { BlackHole, $crate::sink::blackhole::BlackHoleSink }, + { BlackHole, $crate::sink::trivial::BlackHoleSink }, { Kinesis, $crate::sink::kinesis::KinesisSink }, { ClickHouse, $crate::sink::clickhouse::ClickHouseSink }, { Iceberg, $crate::sink::iceberg::IcebergSink }, @@ -87,7 +87,8 @@ macro_rules! for_all_sinks { { Starrocks, $crate::sink::starrocks::StarrocksSink }, { DeltaLakeRust, $crate::sink::deltalake::DeltaLakeSink }, { BigQuery, $crate::sink::big_query::BigQuerySink }, - { Test, $crate::sink::test_sink::TestSink } + { Test, $crate::sink::test_sink::TestSink }, + { Table, $crate::sink::trivial::TableSink } } $(,$arg)* } @@ -202,16 +203,15 @@ impl SinkParam { fields: self.columns.iter().map(Field::from).collect(), } } +} - pub fn from_catalog(sink_catalog: SinkCatalog) -> Option { - if sink_catalog.target_table.is_some() { - return None; - } +impl From for SinkParam { + fn from(sink_catalog: SinkCatalog) -> Self { let columns = sink_catalog .visible_columns() .map(|col| col.column_desc.clone()) .collect(); - Some(Self { + Self { sink_id: sink_catalog.id, properties: sink_catalog.properties, columns, @@ -220,7 +220,7 @@ impl SinkParam { format_desc: sink_catalog.format_desc, db_name: sink_catalog.db_name, sink_from_name: sink_catalog.sink_from_name, - }) + } } } @@ -384,6 +384,10 @@ impl SinkImpl { } ) } + + pub fn is_sink_into_table(&self) -> bool { + matches!(self, SinkImpl::Table(_)) + } } pub fn build_sink(param: SinkParam) -> Result { diff --git a/src/connector/src/sink/blackhole.rs b/src/connector/src/sink/trivial.rs similarity index 71% rename from src/connector/src/sink/blackhole.rs rename to src/connector/src/sink/trivial.rs index 8883e85440754..d7339a782a495 100644 --- a/src/connector/src/sink/blackhole.rs +++ b/src/connector/src/sink/trivial.rs @@ -12,6 +12,8 @@ // See the License for the specific language governing permissions and // limitations under the License. +use std::marker::PhantomData; + use async_trait::async_trait; use crate::sink::log_store::{LogStoreReadItem, TruncateOffset}; @@ -21,26 +23,49 @@ use crate::sink::{ }; pub const BLACKHOLE_SINK: &str = "blackhole"; +pub const TABLE_SINK: &str = "table"; + +pub trait TrivialSinkName: Send + 'static { + const SINK_NAME: &'static str; +} #[derive(Debug)] -pub struct BlackHoleSink; +pub struct BlackHoleSinkName; + +impl TrivialSinkName for BlackHoleSinkName { + const SINK_NAME: &'static str = BLACKHOLE_SINK; +} -impl TryFrom for BlackHoleSink { +pub type BlackHoleSink = TrivialSink; + +#[derive(Debug)] +pub struct TableSinkName; + +impl TrivialSinkName for TableSinkName { + const SINK_NAME: &'static str = TABLE_SINK; +} + +pub type TableSink = TrivialSink; + +#[derive(Debug)] +pub struct TrivialSink(PhantomData); + +impl TryFrom for TrivialSink { type Error = SinkError; fn try_from(_value: SinkParam) -> std::result::Result { - Ok(Self) + Ok(Self(PhantomData)) } } -impl Sink for BlackHoleSink { +impl Sink for TrivialSink { type Coordinator = DummySinkCommitCoordinator; type LogSinker = Self; - const SINK_NAME: &'static str = BLACKHOLE_SINK; + const SINK_NAME: &'static str = T::SINK_NAME; async fn new_log_sinker(&self, _writer_env: SinkWriterParam) -> Result { - Ok(Self) + Ok(Self(PhantomData)) } async fn validate(&self) -> Result<()> { @@ -49,7 +74,7 @@ impl Sink for BlackHoleSink { } #[async_trait] -impl LogSinker for BlackHoleSink { +impl LogSinker for TrivialSink { async fn consume_log_and_sink(self, log_reader: &mut impl SinkLogReader) -> Result<()> { loop { let (epoch, item) = log_reader.next_item().await?; diff --git a/src/frontend/src/optimizer/plan_node/stream_sink.rs b/src/frontend/src/optimizer/plan_node/stream_sink.rs index e6a25d5ede446..832892b671077 100644 --- a/src/frontend/src/optimizer/plan_node/stream_sink.rs +++ b/src/frontend/src/optimizer/plan_node/stream_sink.rs @@ -29,6 +29,7 @@ use risingwave_common::util::sort_util::OrderType; use risingwave_connector::match_sink_name_str; use risingwave_connector::sink::catalog::desc::SinkDesc; use risingwave_connector::sink::catalog::{SinkFormat, SinkFormatDesc, SinkId, SinkType}; +use risingwave_connector::sink::trivial::TABLE_SINK; use risingwave_connector::sink::{ SinkError, CONNECTOR_TYPE_KEY, SINK_TYPE_APPEND_ONLY, SINK_TYPE_DEBEZIUM, SINK_TYPE_OPTION, SINK_TYPE_UPSERT, SINK_USER_FORCE_APPEND_ONLY_OPTION, @@ -106,24 +107,30 @@ impl StreamSink { format_desc, )?; - if sink.target_table.is_none() { - // check and ensure that the sink connector is specified and supported - match sink.properties.get(CONNECTOR_TYPE_KEY) { - Some(connector) => match_sink_name_str!( + let unsupported_sink = + |sink: &str| Err(SinkError::Config(anyhow!("unsupported sink type {}", sink))); + + // check and ensure that the sink connector is specified and supported + match sink.properties.get(CONNECTOR_TYPE_KEY) { + Some(connector) => { + match_sink_name_str!( connector.to_lowercase().as_str(), SinkType, - Ok(()), - |other| Err(SinkError::Config(anyhow!( - "unsupported sink type {}", - other - ))) - )?, - None => { - return Err(SinkError::Config(anyhow!( - "connector not specified when create sink" - )) - .into()); - } + { + // the table sink is created by with properties + if connector == TABLE_SINK && sink.target_table.is_none() { + unsupported_sink(TABLE_SINK) + } else { + Ok(()) + } + }, + |other: &str| unsupported_sink(other) + )?; + } + None => { + return Err( + SinkError::Config(anyhow!("connector not specified when create sink")).into(), + ); } } @@ -438,10 +445,10 @@ impl StreamNode for StreamSink { PbNodeBody::Sink(SinkNode { sink_desc: Some(self.sink_desc.to_proto()), table: Some(table.to_internal_table_prost()), - log_store_type: if self.sink_desc.target_table.is_none() { - match self.base.ctx().session_ctx().config().sink_decouple() { - SinkDecouple::Default => { - let enable_sink_decouple = match_sink_name_str!( + log_store_type: match self.base.ctx().session_ctx().config().sink_decouple() { + SinkDecouple::Default => { + let enable_sink_decouple = + match_sink_name_str!( self.sink_desc.properties.get(CONNECTOR_TYPE_KEY).expect( "have checked connector is contained when create the `StreamSink`" ).to_lowercase().as_str(), @@ -451,17 +458,14 @@ impl StreamNode for StreamSink { "have checked connector is supported when create the `StreamSink`" ) ); - if enable_sink_decouple { - SinkLogStoreType::KvLogStore as i32 - } else { - SinkLogStoreType::InMemoryLogStore as i32 - } + if enable_sink_decouple { + SinkLogStoreType::KvLogStore as i32 + } else { + SinkLogStoreType::InMemoryLogStore as i32 } - SinkDecouple::Enable => SinkLogStoreType::KvLogStore as i32, - SinkDecouple::Disable => SinkLogStoreType::InMemoryLogStore as i32, } - } else { - SinkLogStoreType::InMemoryLogStore as i32 + SinkDecouple::Enable => SinkLogStoreType::KvLogStore as i32, + SinkDecouple::Disable => SinkLogStoreType::InMemoryLogStore as i32, }, }) } diff --git a/src/meta/src/stream/sink.rs b/src/meta/src/stream/sink.rs index a33bb2d51ca3b..8544011071ec2 100644 --- a/src/meta/src/stream/sink.rs +++ b/src/meta/src/stream/sink.rs @@ -21,10 +21,9 @@ use crate::MetaResult; pub async fn validate_sink(prost_sink_catalog: &PbSink) -> MetaResult<()> { let sink_catalog = SinkCatalog::from(prost_sink_catalog); - if let Some(param) = SinkParam::from_catalog(sink_catalog) { - let sink = build_sink(param)?; - dispatch_sink!(sink, sink, Ok(sink.validate().await?)) - } else { - Ok(()) - } + let param = SinkParam::from(sink_catalog); + + let sink = build_sink(param)?; + + dispatch_sink!(sink, sink, Ok(sink.validate().await?)) } diff --git a/src/stream/src/executor/sink.rs b/src/stream/src/executor/sink.rs index 1ebf45b02d560..b8e786f6f9c40 100644 --- a/src/stream/src/executor/sink.rs +++ b/src/stream/src/executor/sink.rs @@ -28,7 +28,9 @@ use risingwave_connector::sink::catalog::SinkType; use risingwave_connector::sink::log_store::{ LogReader, LogReaderExt, LogStoreFactory, LogWriter, LogWriterExt, }; -use risingwave_connector::sink::{build_sink, LogSinker, Sink, SinkParam, SinkWriterParam}; +use risingwave_connector::sink::{ + build_sink, LogSinker, Sink, SinkImpl, SinkParam, SinkWriterParam, +}; use thiserror_ext::AsReport; use super::error::{StreamExecutorError, StreamExecutorResult}; @@ -42,11 +44,11 @@ pub struct SinkExecutor { actor_context: ActorContextRef, info: ExecutorInfo, input: BoxedExecutor, + sink: SinkImpl, input_columns: Vec, sink_param: SinkParam, log_store_factory: F, sink_writer_param: SinkWriterParam, - is_external_sink: bool, } // Drop all the DELETE messages in this chunk and convert UPDATE INSERT into INSERT. @@ -86,8 +88,8 @@ impl SinkExecutor { sink_param: SinkParam, columns: Vec, log_store_factory: F, - is_external_sink: bool, ) -> StreamExecutorResult { + let sink = build_sink(sink_param.clone())?; let input_schema: Schema = columns .iter() .map(|column| Field::from(&column.column_desc)) @@ -98,11 +100,11 @@ impl SinkExecutor { actor_context, info, input, + sink, input_columns: columns, sink_param, log_store_factory, sink_writer_param, - is_external_sink, }) } @@ -144,13 +146,12 @@ impl SinkExecutor { stream_key_sink_pk_mismatch, ); - if !self.is_external_sink { + if !self.sink.is_sink_into_table() { #[for_await] for msg in processed_input { yield msg?; } } else { - let sink = build_sink(self.sink_param)?; let (log_reader, log_writer) = self.log_store_factory.build().await; let write_log_stream = Self::execute_write_log( @@ -159,7 +160,7 @@ impl SinkExecutor { actor_id, ); - let output = dispatch_sink!(sink, sink, { + let output = dispatch_sink!(self.sink, sink, { let consume_log_stream = Self::execute_consume_log( sink, log_reader, @@ -519,7 +520,6 @@ mod test { sink_param, columns.clone(), BoundedInMemLogStoreFactory::new(1), - true, ) .await .unwrap(); @@ -647,7 +647,6 @@ mod test { sink_param, columns.clone(), BoundedInMemLogStoreFactory::new(1), - true, ) .await .unwrap(); @@ -772,7 +771,6 @@ mod test { sink_param, columns, BoundedInMemLogStoreFactory::new(1), - true, ) .await .unwrap(); diff --git a/src/stream/src/from_proto/sink.rs b/src/stream/src/from_proto/sink.rs index 58fa01bd649c4..c60467fa4ad17 100644 --- a/src/stream/src/from_proto/sink.rs +++ b/src/stream/src/from_proto/sink.rs @@ -47,8 +47,6 @@ impl ExecutorBuilder for SinkExecutorBuilder { let sink_id = sink_desc.get_id().into(); let db_name = sink_desc.get_db_name().into(); let sink_from_name = sink_desc.get_sink_from_name().into(); - // when target table is not present, it should be an external sink - let is_external_sink = sink_desc.get_target_table().is_err(); let properties = sink_desc.get_properties().clone(); let downstream_pk = sink_desc .downstream_pk @@ -62,7 +60,7 @@ impl ExecutorBuilder for SinkExecutorBuilder { .map(ColumnCatalog::from) .collect_vec(); - let connector = if is_external_sink { + let connector = { let sink_type = properties.get(CONNECTOR_TYPE_KEY).ok_or_else(|| { SinkError::Config(anyhow!("missing config: {}", CONNECTOR_TYPE_KEY)) })?; @@ -77,10 +75,8 @@ impl ExecutorBuilder for SinkExecutorBuilder { other ))) } - )? - } else { - "table" - }; + ) + }?; let format_desc = match &sink_desc.format_desc { // Case A: new syntax `format ... encode ...` Some(f) => Some(f.clone().try_into()?), @@ -142,7 +138,6 @@ impl ExecutorBuilder for SinkExecutorBuilder { sink_param, columns, factory, - is_external_sink, ) .await?, )) @@ -174,7 +169,6 @@ impl ExecutorBuilder for SinkExecutorBuilder { sink_param, columns, factory, - is_external_sink, ) .await?, )) From 7d23d032df3a3f179a49240c16057c0c92fedcc9 Mon Sep 17 00:00:00 2001 From: William Wen Date: Thu, 28 Dec 2023 22:57:56 +0800 Subject: [PATCH 5/7] fix --- src/stream/src/executor/sink.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/stream/src/executor/sink.rs b/src/stream/src/executor/sink.rs index b8e786f6f9c40..5dcaa65679ea0 100644 --- a/src/stream/src/executor/sink.rs +++ b/src/stream/src/executor/sink.rs @@ -146,7 +146,7 @@ impl SinkExecutor { stream_key_sink_pk_mismatch, ); - if !self.sink.is_sink_into_table() { + if self.sink.is_sink_into_table() { #[for_await] for msg in processed_input { yield msg?; From 2c00dcebf469d9bbb11ba44ae05c4a16be4b4614 Mon Sep 17 00:00:00 2001 From: William Wen Date: Thu, 28 Dec 2023 23:13:01 +0800 Subject: [PATCH 6/7] refactor --- src/stream/src/executor/sink.rs | 61 ++++++++++++++++----------------- 1 file changed, 29 insertions(+), 32 deletions(-) diff --git a/src/stream/src/executor/sink.rs b/src/stream/src/executor/sink.rs index 5dcaa65679ea0..505e0e47e62d5 100644 --- a/src/stream/src/executor/sink.rs +++ b/src/stream/src/executor/sink.rs @@ -108,8 +108,7 @@ impl SinkExecutor { }) } - #[try_stream(boxed, ok = Message, error = StreamExecutorError)] - async fn execute_inner(self) { + fn execute_inner(self) -> BoxedMessageStream { let sink_id = self.sink_param.sink_id; let actor_id = self.actor_context.id; @@ -139,7 +138,7 @@ impl SinkExecutor { } }); - let processed_input = Self::process_input( + let processed_input = Self::process_msg( input, self.sink_param.sink_type, stream_key, @@ -147,35 +146,33 @@ impl SinkExecutor { ); if self.sink.is_sink_into_table() { - #[for_await] - for msg in processed_input { - yield msg?; - } + processed_input.boxed() } else { - let (log_reader, log_writer) = self.log_store_factory.build().await; - - let write_log_stream = Self::execute_write_log( - processed_input, - log_writer.monitored(self.sink_writer_param.sink_metrics.clone()), - actor_id, - ); - - let output = dispatch_sink!(self.sink, sink, { - let consume_log_stream = Self::execute_consume_log( - sink, - log_reader, - self.input_columns, - self.sink_writer_param, - self.actor_context, - self.info, - ); - // TODO: may try to remove the boxed - select(consume_log_stream.into_stream(), write_log_stream).boxed() - }); - #[for_await] - for msg in output { - yield msg?; - } + self.log_store_factory + .build() + .map(move |(log_reader, log_writer)| { + let write_log_stream = Self::execute_write_log( + processed_input, + log_writer.monitored(self.sink_writer_param.sink_metrics.clone()), + actor_id, + ); + + dispatch_sink!(self.sink, sink, { + let consume_log_stream = Self::execute_consume_log( + sink, + log_reader, + self.input_columns, + self.sink_writer_param, + self.actor_context, + self.info, + ); + // TODO: may try to remove the boxed + select(consume_log_stream.into_stream(), write_log_stream).boxed() + }) + }) + .into_stream() + .flatten() + .boxed() } } @@ -227,7 +224,7 @@ impl SinkExecutor { } #[try_stream(ok = Message, error = StreamExecutorError)] - async fn process_input( + async fn process_msg( input: impl MessageStream, sink_type: SinkType, stream_key: PkIndices, From db7c3d0cd67725a481ce347ed95094b58f87637c Mon Sep 17 00:00:00 2001 From: William Wen Date: Tue, 2 Jan 2024 17:36:50 +0800 Subject: [PATCH 7/7] refactor --- src/meta/src/hummock/manager/mod.rs | 7 ++--- src/stream/src/from_proto/sink.rs | 47 ++++++++++++++--------------- 2 files changed, 25 insertions(+), 29 deletions(-) diff --git a/src/meta/src/hummock/manager/mod.rs b/src/meta/src/hummock/manager/mod.rs index 0ea5c51904c21..e37848a77ffd5 100644 --- a/src/meta/src/hummock/manager/mod.rs +++ b/src/meta/src/hummock/manager/mod.rs @@ -81,10 +81,9 @@ use crate::hummock::metrics_utils::{ trigger_split_stat, trigger_sst_stat, trigger_version_stat, trigger_write_stop_stats, }; use crate::hummock::{CompactorManagerRef, TASK_NORMAL}; -use crate::manager::{ - ClusterManagerRef, FragmentManagerRef, IdCategory, MetaSrvEnv, MetadataManager, TableId, - META_NODE_ID, -}; +#[cfg(any(test, feature = "test"))] +use crate::manager::{ClusterManagerRef, FragmentManagerRef}; +use crate::manager::{IdCategory, MetaSrvEnv, MetadataManager, TableId, META_NODE_ID}; use crate::model::{ BTreeMapEntryTransaction, BTreeMapTransaction, ClusterId, MetadataModel, ValTransaction, VarTransaction, diff --git a/src/stream/src/from_proto/sink.rs b/src/stream/src/from_proto/sink.rs index a1071096e218a..659450e83c077 100644 --- a/src/stream/src/from_proto/sink.rs +++ b/src/stream/src/from_proto/sink.rs @@ -22,7 +22,6 @@ use risingwave_connector::sink::{ SinkError, SinkParam, SinkWriterParam, CONNECTOR_TYPE_KEY, SINK_TYPE_OPTION, }; use risingwave_pb::stream_plan::{SinkLogStoreType, SinkNode}; -use risingwave_storage::dispatch_state_store; use super::*; use crate::common::log_store_impl::in_mem::BoundedInMemLogStoreFactory; @@ -37,7 +36,7 @@ impl ExecutorBuilder for SinkExecutorBuilder { async fn new_boxed_executor( params: ExecutorParams, node: &Self::Node, - _store: impl StateStore, + state_store: impl StateStore, stream: &mut LocalStreamManagerCore, ) -> StreamResult { let [input_executor]: [_; 1] = params.input.try_into().unwrap(); @@ -150,29 +149,27 @@ impl ExecutorBuilder for SinkExecutorBuilder { connector, ); // TODO: support setting max row count in config - dispatch_state_store!(params.env.state_store(), state_store, { - let factory = KvLogStoreFactory::new( - state_store, - node.table.as_ref().unwrap().clone(), - params.vnode_bitmap.clone().map(Arc::new), - 65536, - metrics, - log_store_identity, - ); - - Ok(Box::new( - SinkExecutor::new( - params.actor_context, - params.info, - input_executor, - sink_write_param, - sink_param, - columns, - factory, - ) - .await?, - )) - }) + let factory = KvLogStoreFactory::new( + state_store, + node.table.as_ref().unwrap().clone(), + params.vnode_bitmap.clone().map(Arc::new), + 65536, + metrics, + log_store_identity, + ); + + Ok(Box::new( + SinkExecutor::new( + params.actor_context, + params.info, + input_executor, + sink_write_param, + sink_param, + columns, + factory, + ) + .await?, + )) } } }