diff --git a/Cargo.lock b/Cargo.lock index 919c5a234dc00..d46a420f8b71c 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -6889,6 +6889,7 @@ dependencies = [ "criterion", "csv", "duration-str", + "easy-ext", "enum-as-inner", "futures", "futures-async-stream", diff --git a/src/connector/Cargo.toml b/src/connector/Cargo.toml index 8ae92464caaaa..c919bd5ed2397 100644 --- a/src/connector/Cargo.toml +++ b/src/connector/Cargo.toml @@ -46,6 +46,7 @@ clickhouse = { git = "https://github.com/risingwavelabs/clickhouse.rs", rev = "6 ] } csv = "1.2" duration-str = "0.5.1" +easy-ext = "1" enum-as-inner = "0.6" futures = { version = "0.3", default-features = false, features = ["alloc"] } futures-async-stream = { workspace = true } diff --git a/src/connector/src/sink/blackhole.rs b/src/connector/src/sink/blackhole.rs new file mode 100644 index 0000000000000..1f1ace3b0d104 --- /dev/null +++ b/src/connector/src/sink/blackhole.rs @@ -0,0 +1,68 @@ +// Copyright 2023 RisingWave Labs +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use crate::sink::log_store::{LogReader, LogStoreReadItem, TruncateOffset}; +use crate::sink::{ + DummySinkCommitCoordinator, LogSinker, Result, Sink, SinkError, SinkParam, SinkWriterParam, +}; + +pub const BLACKHOLE_SINK: &str = "blackhole"; + +#[derive(Debug)] +pub struct BlackHoleSink; + +impl TryFrom for BlackHoleSink { + type Error = SinkError; + + fn try_from(_value: SinkParam) -> std::result::Result { + Ok(Self) + } +} + +impl Sink for BlackHoleSink { + type Coordinator = DummySinkCommitCoordinator; + type LogSinker = Self; + + const SINK_NAME: &'static str = BLACKHOLE_SINK; + + async fn new_log_sinker(&self, _writer_env: SinkWriterParam) -> Result { + Ok(Self) + } + + async fn validate(&self) -> Result<()> { + Ok(()) + } +} + +impl LogSinker for BlackHoleSink { + async fn consume_log_and_sink(self, mut log_reader: impl LogReader) -> Result<()> { + log_reader.init().await?; + loop { + let (epoch, item) = log_reader.next_item().await?; + match item { + LogStoreReadItem::StreamChunk { chunk_id, .. } => { + log_reader + .truncate(TruncateOffset::Chunk { epoch, chunk_id }) + .await?; + } + LogStoreReadItem::Barrier { .. 
} => { + log_reader + .truncate(TruncateOffset::Barrier { epoch }) + .await?; + } + LogStoreReadItem::UpdateVnodeBitmap(_) => {} + } + } + } +} diff --git a/src/connector/src/sink/clickhouse.rs b/src/connector/src/sink/clickhouse.rs index 08287b789eaf9..b9733863feccb 100644 --- a/src/connector/src/sink/clickhouse.rs +++ b/src/connector/src/sink/clickhouse.rs @@ -29,9 +29,9 @@ use serde_with::serde_as; use super::{DummySinkCommitCoordinator, SinkWriterParam}; use crate::common::ClickHouseCommon; +use crate::sink::writer::{LogSinkerOf, SinkWriter, SinkWriterExt}; use crate::sink::{ - Result, Sink, SinkError, SinkParam, SinkWriter, SINK_TYPE_APPEND_ONLY, SINK_TYPE_OPTION, - SINK_TYPE_UPSERT, + Result, Sink, SinkError, SinkParam, SINK_TYPE_APPEND_ONLY, SINK_TYPE_OPTION, SINK_TYPE_UPSERT, }; pub const CLICKHOUSE_SINK: &str = "clickhouse"; @@ -209,7 +209,7 @@ impl ClickHouseSink { } impl Sink for ClickHouseSink { type Coordinator = DummySinkCommitCoordinator; - type Writer = ClickHouseSinkWriter; + type LogSinker = LogSinkerOf; const SINK_NAME: &'static str = CLICKHOUSE_SINK; @@ -243,14 +243,15 @@ impl Sink for ClickHouseSink { Ok(()) } - async fn new_writer(&self, _writer_env: SinkWriterParam) -> Result { - ClickHouseSinkWriter::new( + async fn new_log_sinker(&self, writer_param: SinkWriterParam) -> Result { + Ok(ClickHouseSinkWriter::new( self.config.clone(), self.schema.clone(), self.pk_indices.clone(), self.is_append_only, ) - .await + .await? + .into_log_sinker(writer_param.sink_metrics)) } } pub struct ClickHouseSinkWriter { diff --git a/src/connector/src/sink/coordinate.rs b/src/connector/src/sink/coordinate.rs index bbcc7b636b17c..0b233d3c4155a 100644 --- a/src/connector/src/sink/coordinate.rs +++ b/src/connector/src/sink/coordinate.rs @@ -21,7 +21,8 @@ use risingwave_pb::connector_service::SinkMetadata; use risingwave_rpc_client::{CoordinatorStreamHandle, SinkCoordinationRpcClient}; use tracing::warn; -use crate::sink::{Result, SinkError, SinkParam, SinkWriter}; +use crate::sink::writer::SinkWriter; +use crate::sink::{Result, SinkError, SinkParam}; pub struct CoordinatedSinkWriter>> { epoch: u64, diff --git a/src/connector/src/sink/doris.rs b/src/connector/src/sink/doris.rs index 3e226bcb37d46..0a32e4406108b 100644 --- a/src/connector/src/sink/doris.rs +++ b/src/connector/src/sink/doris.rs @@ -29,6 +29,7 @@ use super::doris_connector::{DorisField, DorisInsert, DorisInsertClient, DORIS_D use super::{SinkError, SINK_TYPE_APPEND_ONLY, SINK_TYPE_OPTION, SINK_TYPE_UPSERT}; use crate::common::DorisCommon; use crate::sink::encoder::{JsonEncoder, RowEncoder, TimestampHandlingMode}; +use crate::sink::writer::{LogSinkerOf, SinkWriterExt}; use crate::sink::{ DummySinkCommitCoordinator, Result, Sink, SinkParam, SinkWriter, SinkWriterParam, }; @@ -155,18 +156,19 @@ impl DorisSink { impl Sink for DorisSink { type Coordinator = DummySinkCommitCoordinator; - type Writer = DorisSinkWriter; + type LogSinker = LogSinkerOf; const SINK_NAME: &'static str = DORIS_SINK; - async fn new_writer(&self, _writer_env: SinkWriterParam) -> Result { - DorisSinkWriter::new( + async fn new_log_sinker(&self, writer_param: SinkWriterParam) -> Result { + Ok(DorisSinkWriter::new( self.config.clone(), self.schema.clone(), self.pk_indices.clone(), self.is_append_only, ) - .await + .await? 
+ .into_log_sinker(writer_param.sink_metrics)) } async fn validate(&self) -> Result<()> { diff --git a/src/connector/src/sink/iceberg.rs b/src/connector/src/sink/iceberg.rs index 25c5aa7765114..24249531ffbf5 100644 --- a/src/connector/src/sink/iceberg.rs +++ b/src/connector/src/sink/iceberg.rs @@ -36,12 +36,12 @@ use serde_json::Value; use url::Url; use super::{ - Sink, SinkError, SinkWriter, SinkWriterParam, SINK_TYPE_APPEND_ONLY, SINK_TYPE_OPTION, - SINK_TYPE_UPSERT, + Sink, SinkError, SinkWriterParam, SINK_TYPE_APPEND_ONLY, SINK_TYPE_OPTION, SINK_TYPE_UPSERT, }; use crate::deserialize_bool_from_string; use crate::sink::coordinate::CoordinatedSinkWriter; use crate::sink::remote::{CoordinatedRemoteSink, RemoteSinkTrait}; +use crate::sink::writer::{LogSinkerOf, SinkWriter, SinkWriterExt}; use crate::sink::{Result, SinkCommitCoordinator, SinkParam}; /// This iceberg sink is WIP. When it ready, we will change this name to "iceberg". @@ -257,7 +257,7 @@ impl IcebergSink { impl Sink for IcebergSink { type Coordinator = IcebergSinkCommitter; - type Writer = CoordinatedSinkWriter; + type LogSinker = LogSinkerOf>; const SINK_NAME: &'static str = ICEBERG_SINK; @@ -266,7 +266,7 @@ impl Sink for IcebergSink { Ok(()) } - async fn new_writer(&self, writer_param: SinkWriterParam) -> Result { + async fn new_log_sinker(&self, writer_param: SinkWriterParam) -> Result { let table = self.create_table().await?; let inner = IcebergWriter { @@ -277,7 +277,7 @@ impl Sink for IcebergSink { .map_err(|err| SinkError::Iceberg(anyhow!(err)))?, table, }; - CoordinatedSinkWriter::new( + Ok(CoordinatedSinkWriter::new( writer_param .meta_client .expect("should have meta client") @@ -291,7 +291,8 @@ impl Sink for IcebergSink { })?, inner, ) - .await + .await? + .into_log_sinker(writer_param.sink_metrics)) } async fn new_coordinator( diff --git a/src/connector/src/sink/kafka.rs b/src/connector/src/sink/kafka.rs index 0cad48bbb57f6..a5a524048bfd1 100644 --- a/src/connector/src/sink/kafka.rs +++ b/src/connector/src/sink/kafka.rs @@ -32,14 +32,15 @@ use serde_with::{serde_as, DisplayFromStr}; use strum_macros::{Display, EnumString}; use super::{ - FormattedSink, Sink, SinkError, SinkParam, SINK_TYPE_APPEND_ONLY, SINK_TYPE_DEBEZIUM, - SINK_TYPE_OPTION, SINK_TYPE_UPSERT, + Sink, SinkError, SinkParam, SINK_TYPE_APPEND_ONLY, SINK_TYPE_DEBEZIUM, SINK_TYPE_OPTION, + SINK_TYPE_UPSERT, }; use crate::common::KafkaCommon; use crate::sink::formatter::SinkFormatterImpl; -use crate::sink::{ - DummySinkCommitCoordinator, Result, SinkWriterParam, SinkWriterV1, SinkWriterV1Adapter, +use crate::sink::writer::{ + FormattedSink, LogSinkerOf, SinkWriterExt, SinkWriterV1, SinkWriterV1Adapter, }; +use crate::sink::{DummySinkCommitCoordinator, Result, SinkWriterParam}; use crate::source::kafka::{KafkaProperties, KafkaSplitEnumerator, PrivateLinkProducerContext}; use crate::source::{SourceEnumeratorContext, SplitEnumerator}; use crate::{ @@ -300,11 +301,11 @@ impl TryFrom for KafkaSink { impl Sink for KafkaSink { type Coordinator = DummySinkCommitCoordinator; - type Writer = SinkWriterV1Adapter; + type LogSinker = LogSinkerOf>; const SINK_NAME: &'static str = KAFKA_SINK; - async fn new_writer(&self, _writer_param: SinkWriterParam) -> Result { + async fn new_log_sinker(&self, writer_param: SinkWriterParam) -> Result { Ok(SinkWriterV1Adapter::new( KafkaSinkWriter::new( self.config.clone(), @@ -318,7 +319,8 @@ impl Sink for KafkaSink { )?, ) .await?, - )) + ) + .into_log_sinker(writer_param.sink_metrics)) } async fn validate(&self) -> Result<()> { 
diff --git a/src/connector/src/sink/kinesis.rs b/src/connector/src/sink/kinesis.rs index f16ca3905b884..bf024ba53b252 100644 --- a/src/connector/src/sink/kinesis.rs +++ b/src/connector/src/sink/kinesis.rs @@ -26,13 +26,14 @@ use serde_with::serde_as; use tokio_retry::strategy::{jitter, ExponentialBackoff}; use tokio_retry::Retry; -use super::{FormattedSink, SinkParam}; +use super::SinkParam; use crate::common::KinesisCommon; use crate::dispatch_sink_formatter_impl; use crate::sink::formatter::SinkFormatterImpl; +use crate::sink::writer::{FormattedSink, LogSinkerOf, SinkWriter, SinkWriterExt}; use crate::sink::{ - DummySinkCommitCoordinator, Result, Sink, SinkError, SinkWriter, SinkWriterParam, - SINK_TYPE_APPEND_ONLY, SINK_TYPE_DEBEZIUM, SINK_TYPE_OPTION, SINK_TYPE_UPSERT, + DummySinkCommitCoordinator, Result, Sink, SinkError, SinkWriterParam, SINK_TYPE_APPEND_ONLY, + SINK_TYPE_DEBEZIUM, SINK_TYPE_OPTION, SINK_TYPE_UPSERT, }; pub const KINESIS_SINK: &str = "kinesis"; @@ -66,7 +67,7 @@ impl TryFrom for KinesisSink { impl Sink for KinesisSink { type Coordinator = DummySinkCommitCoordinator; - type Writer = KinesisSinkWriter; + type LogSinker = LogSinkerOf; const SINK_NAME: &'static str = KINESIS_SINK; @@ -93,8 +94,8 @@ impl Sink for KinesisSink { Ok(()) } - async fn new_writer(&self, _writer_env: SinkWriterParam) -> Result { - KinesisSinkWriter::new( + async fn new_log_sinker(&self, writer_param: SinkWriterParam) -> Result { + Ok(KinesisSinkWriter::new( self.config.clone(), self.schema.clone(), self.pk_indices.clone(), @@ -102,7 +103,8 @@ impl Sink for KinesisSink { self.db_name.clone(), self.sink_from_name.clone(), ) - .await + .await? + .into_log_sinker(writer_param.sink_metrics)) } } diff --git a/src/stream/src/common/log_store/mod.rs b/src/connector/src/sink/log_store.rs similarity index 80% rename from src/stream/src/common/log_store/mod.rs rename to src/connector/src/sink/log_store.rs index 35f1a4145ec55..d8dae1db0ce19 100644 --- a/src/stream/src/common/log_store/mod.rs +++ b/src/connector/src/sink/log_store.rs @@ -12,9 +12,6 @@ // See the License for the specific language governing permissions and // limitations under the License. -pub mod in_mem; -pub mod kv_log_store; - use std::cmp::Ordering; use std::fmt::Debug; use std::future::Future; @@ -24,25 +21,8 @@ use anyhow::anyhow; use risingwave_common::array::StreamChunk; use risingwave_common::buffer::Bitmap; use risingwave_common::util::epoch::EpochPair; -use risingwave_common::util::value_encoding::error::ValueEncodingError; -use risingwave_storage::error::StorageError; - -#[derive(thiserror::Error, Debug)] -pub enum LogStoreError { - #[error("EndOfLogStream")] - EndOfLogStream, - - #[error("Storage error: {0}")] - StorageError(#[from] StorageError), - - #[error(transparent)] - Internal(#[from] anyhow::Error), - - #[error("Value encoding error: {0}")] - ValueEncoding(#[from] ValueEncodingError), -} -pub type LogStoreResult = Result; +pub type LogStoreResult = Result; pub type ChunkId = usize; #[derive(Debug, PartialEq, Copy, Clone)] @@ -88,8 +68,7 @@ impl TruncateOffset { "new item epoch {} not match current chunk offset epoch {}", epoch, offset_epoch - ) - .into()); + )); } } TruncateOffset::Barrier { @@ -100,8 +79,7 @@ impl TruncateOffset { "new item epoch {} not exceed barrier offset epoch {}", epoch, offset_epoch - ) - .into()); + )); } } } @@ -145,7 +123,7 @@ pub trait LogWriter { ) -> impl Future> + Send + '_; } -pub trait LogReader { +pub trait LogReader: Send + Sized + 'static { /// Initialize the log reader. 
Usually function as waiting for log writer to be initialized.
     fn init(&mut self) -> impl Future<Output = LogStoreResult<()>> + Send + '_;
@@ -169,9 +147,54 @@ pub trait LogStoreFactory: 'static {
     fn build(self) -> impl Future<Output = (Self::Reader, Self::Writer)> + Send;
 }
+pub struct TransformChunkLogReader<F: Fn(StreamChunk) -> StreamChunk, R: LogReader> {
+    f: F,
+    inner: R,
+}
+
+impl<F: Fn(StreamChunk) -> StreamChunk + Send + 'static, R: LogReader> LogReader
+    for TransformChunkLogReader<F, R>
+{
+    fn init(&mut self) -> impl Future<Output = LogStoreResult<()>> + Send + '_ {
+        self.inner.init()
+    }
+
+    async fn next_item(&mut self) -> LogStoreResult<(u64, LogStoreReadItem)> {
+        let (epoch, item) = self.inner.next_item().await?;
+        let item = match item {
+            LogStoreReadItem::StreamChunk { chunk, chunk_id } => LogStoreReadItem::StreamChunk {
+                chunk: (self.f)(chunk),
+                chunk_id,
+            },
+            other => other,
+        };
+        Ok((epoch, item))
+    }
+
+    fn truncate(
+        &mut self,
+        offset: TruncateOffset,
+    ) -> impl Future<Output = LogStoreResult<()>> + Send + '_ {
+        self.inner.truncate(offset)
+    }
+}
+
+#[easy_ext::ext(LogStoreTransformChunkLogReader)]
+impl<T> T
+where
+    T: LogReader,
+{
+    pub fn transform_chunk<F: Fn(StreamChunk) -> StreamChunk + Sized>(
+        self,
+        f: F,
+    ) -> TransformChunkLogReader<F, Self> {
+        TransformChunkLogReader { f, inner: self }
+    }
+}
+
 #[cfg(test)]
 mod tests {
-    use crate::common::log_store::TruncateOffset;
+    use crate::sink::log_store::TruncateOffset;
 
     #[test]
     fn test_truncate_offset_cmp() {
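The `transform_chunk` extension added above lets a consumer rewrite each chunk before handing the reader to a sink. A minimal usage sketch, not part of this patch (`my_reader`, `my_sinker`, and `visible_columns` are hypothetical names), mirroring how the sink executor later projects away hidden columns:

```rust
use risingwave_connector::sink::log_store::{LogReader, LogStoreTransformChunkLogReader};
use risingwave_connector::sink::{LogSinker, Result};

async fn consume(
    my_reader: impl LogReader,
    my_sinker: impl LogSinker,
    visible_columns: Vec<usize>,
) -> Result<()> {
    // Project every chunk read from the log store down to the visible columns,
    // then let the sinker drive the usual read/write/truncate loop.
    let reader = my_reader.transform_chunk(move |chunk| chunk.project(&visible_columns));
    my_sinker.consume_log_and_sink(reader).await
}
```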
diff --git a/src/connector/src/sink/mod.rs b/src/connector/src/sink/mod.rs
index f312453481db6..c67fdb8c1983e 100644
--- a/src/connector/src/sink/mod.rs
+++ b/src/connector/src/sink/mod.rs
@@ -12,6 +12,7 @@
 // See the License for the specific language governing permissions and
 // limitations under the License.
 
+pub mod blackhole;
 pub mod boxed;
 pub mod catalog;
 pub mod clickhouse;
@@ -23,20 +24,22 @@ pub mod formatter;
 pub mod iceberg;
 pub mod kafka;
 pub mod kinesis;
+pub mod log_store;
 pub mod nats;
 pub mod pulsar;
 pub mod redis;
 pub mod remote;
 pub mod test_sink;
 pub mod utils;
+pub mod writer;
 
 use std::collections::HashMap;
-use std::sync::Arc;
+use std::future::Future;
 
 use ::clickhouse::error::Error as ClickHouseError;
 use anyhow::anyhow;
 use async_trait::async_trait;
-use risingwave_common::array::StreamChunk;
+use prometheus::{Histogram, HistogramOpts};
 use risingwave_common::buffer::Bitmap;
 use risingwave_common::catalog::{ColumnDesc, Field, Schema};
 use risingwave_common::error::{anyhow_error, ErrorCode, RwError};
@@ -48,18 +51,9 @@ use thiserror::Error;
 pub use tracing;
 
 use self::catalog::SinkType;
-use self::encoder::SerTo;
-use self::formatter::SinkFormatter;
 use crate::sink::catalog::{SinkCatalog, SinkId};
-pub use crate::sink::clickhouse::ClickHouseSink;
-pub use crate::sink::iceberg::{IcebergSink, RemoteIcebergSink};
-pub use crate::sink::kafka::KafkaSink;
-pub use crate::sink::kinesis::KinesisSink;
-pub use crate::sink::nats::NatsSink;
-pub use crate::sink::pulsar::PulsarSink;
-pub use crate::sink::redis::RedisSink;
-pub use crate::sink::remote::{CassandraSink, DeltaLakeSink, ElasticSearchSink, JdbcSink};
-pub use crate::sink::test_sink::TestSink;
+use crate::sink::log_store::LogReader;
+use crate::sink::writer::SinkWriter;
 use crate::ConnectorParams;
 
 #[macro_export]
@@ -67,21 +61,21 @@ macro_rules! for_all_sinks {
     ($macro:path $(, $arg:tt)*) => {
         $macro! {
             {
-                { Redis, $crate::sink::RedisSink },
-                { Kafka, $crate::sink::KafkaSink },
-                { Pulsar, $crate::sink::PulsarSink },
-                { BlackHole, $crate::sink::BlackHoleSink },
-                { Kinesis, $crate::sink::KinesisSink },
-                { ClickHouse, $crate::sink::ClickHouseSink },
-                { Iceberg, $crate::sink::IcebergSink },
-                { Nats, $crate::sink::NatsSink },
-                { RemoteIceberg, $crate::sink::RemoteIcebergSink },
-                { Jdbc, $crate::sink::JdbcSink },
-                { DeltaLake, $crate::sink::DeltaLakeSink },
-                { ElasticSearch, $crate::sink::ElasticSearchSink },
-                { Cassandra, $crate::sink::CassandraSink },
+                { Redis, $crate::sink::redis::RedisSink },
+                { Kafka, $crate::sink::kafka::KafkaSink },
+                { Pulsar, $crate::sink::pulsar::PulsarSink },
+                { BlackHole, $crate::sink::blackhole::BlackHoleSink },
+                { Kinesis, $crate::sink::kinesis::KinesisSink },
+                { ClickHouse, $crate::sink::clickhouse::ClickHouseSink },
+                { Iceberg, $crate::sink::iceberg::IcebergSink },
+                { Nats, $crate::sink::nats::NatsSink },
+                { RemoteIceberg, $crate::sink::iceberg::RemoteIcebergSink },
+                { Jdbc, $crate::sink::remote::JdbcSink },
+                { DeltaLake, $crate::sink::remote::DeltaLakeSink },
+                { ElasticSearch, $crate::sink::remote::ElasticSearchSink },
+                { Cassandra, $crate::sink::remote::CassandraSink },
                 { Doris, $crate::sink::doris::DorisSink },
-                { Test, $crate::sink::TestSink }
+                { Test, $crate::sink::test_sink::TestSink }
             }
             $(,$arg)*
         }
@@ -124,7 +118,7 @@ macro_rules! match_sink_name_str {
     }};
 }
 
-pub const DOWNSTREAM_SINK_KEY: &str = "connector";
+pub const CONNECTOR_TYPE_KEY: &str = "connector";
 pub const SINK_TYPE_OPTION: &str = "type";
 pub const SINK_TYPE_APPEND_ONLY: &str = "append-only";
 pub const SINK_TYPE_DEBEZIUM: &str = "debezium";
@@ -201,21 +195,38 @@ impl From for SinkParam {
     }
 }
 
+#[derive(Clone)]
+pub struct SinkMetrics {
+    pub sink_commit_duration_metrics: Histogram,
+}
+
+impl Default for SinkMetrics {
+    fn default() -> Self {
+        SinkMetrics {
+            sink_commit_duration_metrics: Histogram::with_opts(HistogramOpts::new(
+                "unused", "unused",
+            ))
+            .unwrap(),
+        }
+    }
+}
+
 #[derive(Clone, Default)]
 pub struct SinkWriterParam {
     pub connector_params: ConnectorParams,
     pub executor_id: u64,
     pub vnode_bitmap: Option<Bitmap>,
     pub meta_client: Option<MetaClient>,
+    pub sink_metrics: SinkMetrics,
 }
 
 pub trait Sink: TryFrom<SinkParam, Error = SinkError> {
     const SINK_NAME: &'static str;
-    type Writer: SinkWriter;
+    type LogSinker: LogSinker;
     type Coordinator: SinkCommitCoordinator;
 
     async fn validate(&self) -> Result<()>;
-    async fn new_writer(&self, writer_param: SinkWriterParam) -> Result<Self::Writer>;
+    async fn new_log_sinker(&self, writer_param: SinkWriterParam) -> Result<Self::LogSinker>;
     #[expect(clippy::unused_async)]
     async fn new_coordinator(
         &self,
@@ -225,129 +236,11 @@ pub trait Sink: TryFrom<SinkParam, Error = SinkError> {
     }
 }
 
-#[async_trait]
-pub trait SinkWriter: Send + 'static {
-    type CommitMetadata: Send = ();
-    /// Begin a new epoch
-    async fn begin_epoch(&mut self, epoch: u64) -> Result<()>;
-
-    /// Write a stream chunk to sink
-    async fn write_batch(&mut self, chunk: StreamChunk) -> Result<()>;
-
-    /// Receive a barrier and mark the end of current epoch. When `is_checkpoint` is true, the sink
-    /// writer should commit the current epoch.
- async fn barrier(&mut self, is_checkpoint: bool) -> Result; - - /// Clean up - async fn abort(&mut self) -> Result<()> { - Ok(()) - } - - /// Update the vnode bitmap of current sink writer - async fn update_vnode_bitmap(&mut self, _vnode_bitmap: Arc) -> Result<()> { - Ok(()) - } -} - -#[async_trait] -// An old version of SinkWriter for backward compatibility -pub trait SinkWriterV1: Send + 'static { - async fn write_batch(&mut self, chunk: StreamChunk) -> Result<()>; - - // the following interface is for transactions, if not supported, return Ok(()) - // start a transaction with epoch number. Note that epoch number should be increasing. - async fn begin_epoch(&mut self, epoch: u64) -> Result<()>; - - // commits the current transaction and marks all messages in the transaction success. - async fn commit(&mut self) -> Result<()>; - - // aborts the current transaction because some error happens. we should rollback to the last - // commit point. - async fn abort(&mut self) -> Result<()>; -} - -/// A free-form sink that may output in multiple formats and encodings. Examples include kafka, -/// kinesis, nats and redis. -/// -/// The implementor specifies required key & value type (likely string or bytes), as well as how to -/// write a single pair. The provided `write_chunk` method would handle the interaction with a -/// `SinkFormatter`. -/// -/// Currently kafka takes `&mut self` while kinesis takes `&self`. So we use `&mut self` in trait -/// but implement it for `&Kinesis`. This allows us to hold `&mut &Kinesis` and `&Kinesis` -/// simultaneously, preventing the schema clone issue propagating from kafka to kinesis. -pub trait FormattedSink { - type K; - type V; - async fn write_one(&mut self, k: Option, v: Option) -> Result<()>; - - async fn write_chunk( - &mut self, - chunk: StreamChunk, - formatter: &F, - ) -> Result<()> - where - F::K: SerTo, - F::V: SerTo, - { - for r in formatter.format_chunk(&chunk) { - let (event_key_object, event_object) = r?; - - self.write_one( - event_key_object.map(SerTo::ser_to).transpose()?, - event_object.map(SerTo::ser_to).transpose()?, - ) - .await?; - } - - Ok(()) - } -} - -pub struct SinkWriterV1Adapter { - is_empty: bool, - epoch: u64, - inner: W, -} - -impl SinkWriterV1Adapter { - pub(crate) fn new(inner: W) -> Self { - Self { - inner, - is_empty: true, - epoch: u64::MIN, - } - } -} - -#[async_trait] -impl SinkWriter for SinkWriterV1Adapter { - async fn begin_epoch(&mut self, epoch: u64) -> Result<()> { - self.epoch = epoch; - Ok(()) - } - - async fn write_batch(&mut self, chunk: StreamChunk) -> Result<()> { - if self.is_empty { - self.is_empty = false; - self.inner.begin_epoch(self.epoch).await?; - } - self.inner.write_batch(chunk).await - } - - async fn barrier(&mut self, is_checkpoint: bool) -> Result<()> { - if is_checkpoint { - if !self.is_empty { - self.inner.commit().await? 
- } - self.is_empty = true; - } - Ok(()) - } - - async fn abort(&mut self) -> Result<()> { - self.inner.abort().await - } +pub trait LogSinker: Send + 'static { + fn consume_log_and_sink( + self, + log_reader: impl LogReader, + ) -> impl Future> + Send + 'static; } #[async_trait] @@ -374,52 +267,8 @@ impl SinkCommitCoordinator for DummySinkCommitCoordinator { } } -pub const BLACKHOLE_SINK: &str = "blackhole"; - -#[derive(Debug)] -pub struct BlackHoleSink; - -impl TryFrom for BlackHoleSink { - type Error = SinkError; - - fn try_from(_value: SinkParam) -> std::result::Result { - Ok(Self) - } -} - -impl Sink for BlackHoleSink { - type Coordinator = DummySinkCommitCoordinator; - type Writer = Self; - - const SINK_NAME: &'static str = BLACKHOLE_SINK; - - async fn new_writer(&self, _writer_env: SinkWriterParam) -> Result { - Ok(Self) - } - - async fn validate(&self) -> Result<()> { - Ok(()) - } -} - -#[async_trait] -impl SinkWriter for BlackHoleSink { - async fn write_batch(&mut self, _chunk: StreamChunk) -> Result<()> { - Ok(()) - } - - async fn begin_epoch(&mut self, _epoch: u64) -> Result<()> { - Ok(()) - } - - async fn barrier(&mut self, _is_checkpoint: bool) -> Result<()> { - Ok(()) - } -} - impl SinkImpl { pub fn new(mut param: SinkParam) -> Result { - const CONNECTOR_TYPE_KEY: &str = "connector"; const CONNECTION_NAME_KEY: &str = "connection.name"; const PRIVATE_LINK_TARGET_KEY: &str = "privatelink.targets"; @@ -473,15 +322,6 @@ macro_rules! def_sink_impl { def_sink_impl!(); -impl SinkImpl { - pub fn get_connector(&self) -> &'static str { - fn get_name(_: &S) -> &'static str { - S::SINK_NAME - } - dispatch_sink!(self, sink, get_name(sink)) - } -} - pub type Result = std::result::Result; #[derive(Error, Debug)] diff --git a/src/connector/src/sink/nats.rs b/src/connector/src/sink/nats.rs index 1a126eb2c74d2..8e3f3e2c18022 100644 --- a/src/connector/src/sink/nats.rs +++ b/src/connector/src/sink/nats.rs @@ -28,6 +28,7 @@ use super::utils::chunk_to_json; use super::{DummySinkCommitCoordinator, SinkWriter, SinkWriterParam}; use crate::common::NatsCommon; use crate::sink::encoder::{JsonEncoder, TimestampHandlingMode}; +use crate::sink::writer::{LogSinkerOf, SinkWriterExt}; use crate::sink::{Result, Sink, SinkError, SinkParam, SINK_TYPE_APPEND_ONLY}; pub const NATS_SINK: &str = "nats"; @@ -87,7 +88,7 @@ impl TryFrom for NatsSink { impl Sink for NatsSink { type Coordinator = DummySinkCommitCoordinator; - type Writer = NatsSinkWriter; + type LogSinker = LogSinkerOf; const SINK_NAME: &'static str = NATS_SINK; @@ -109,8 +110,12 @@ impl Sink for NatsSink { Ok(()) } - async fn new_writer(&self, _writer_env: SinkWriterParam) -> Result { - NatsSinkWriter::new(self.config.clone(), self.schema.clone()).await + async fn new_log_sinker(&self, writer_param: SinkWriterParam) -> Result { + Ok( + NatsSinkWriter::new(self.config.clone(), self.schema.clone()) + .await? 
+ .into_log_sinker(writer_param.sink_metrics), + ) } } diff --git a/src/connector/src/sink/pulsar.rs b/src/connector/src/sink/pulsar.rs index df13c2b4c7d01..148c4e4c9e41e 100644 --- a/src/connector/src/sink/pulsar.rs +++ b/src/connector/src/sink/pulsar.rs @@ -28,11 +28,12 @@ use serde::Deserialize; use serde_with::{serde_as, DisplayFromStr}; use super::{ - FormattedSink, Sink, SinkError, SinkParam, SinkWriter, SinkWriterParam, SINK_TYPE_APPEND_ONLY, + Sink, SinkError, SinkParam, SinkWriter, SinkWriterParam, SINK_TYPE_APPEND_ONLY, SINK_TYPE_OPTION, SINK_TYPE_UPSERT, }; use crate::common::PulsarCommon; use crate::sink::formatter::SinkFormatterImpl; +use crate::sink::writer::{FormattedSink, LogSinkerOf, SinkWriterExt}; use crate::sink::{DummySinkCommitCoordinator, Result}; use crate::{deserialize_duration_from_string, dispatch_sink_formatter_impl}; @@ -164,12 +165,12 @@ impl TryFrom for PulsarSink { impl Sink for PulsarSink { type Coordinator = DummySinkCommitCoordinator; - type Writer = PulsarSinkWriter; + type LogSinker = LogSinkerOf; const SINK_NAME: &'static str = PULSAR_SINK; - async fn new_writer(&self, _writer_param: SinkWriterParam) -> Result { - PulsarSinkWriter::new( + async fn new_log_sinker(&self, writer_param: SinkWriterParam) -> Result { + Ok(PulsarSinkWriter::new( self.config.clone(), self.schema.clone(), self.downstream_pk.clone(), @@ -177,7 +178,8 @@ impl Sink for PulsarSink { self.db_name.clone(), self.sink_from_name.clone(), ) - .await + .await? + .into_log_sinker(writer_param.sink_metrics)) } async fn validate(&self) -> Result<()> { diff --git a/src/connector/src/sink/redis.rs b/src/connector/src/sink/redis.rs index 8cd0875300d29..e85e984821b15 100644 --- a/src/connector/src/sink/redis.rs +++ b/src/connector/src/sink/redis.rs @@ -15,6 +15,7 @@ use async_trait::async_trait; use risingwave_common::array::StreamChunk; +use crate::sink::writer::LogSinkerOf; use crate::sink::{ DummySinkCommitCoordinator, Result, Sink, SinkError, SinkParam, SinkWriter, SinkWriterParam, }; @@ -35,11 +36,11 @@ impl TryFrom for RedisSink { impl Sink for RedisSink { type Coordinator = DummySinkCommitCoordinator; - type Writer = RedisSinkWriter; + type LogSinker = LogSinkerOf; const SINK_NAME: &'static str = "redis"; - async fn new_writer(&self, _writer_env: SinkWriterParam) -> Result { + async fn new_log_sinker(&self, _writer_env: SinkWriterParam) -> Result { todo!() } diff --git a/src/connector/src/sink/remote.rs b/src/connector/src/sink/remote.rs index 51195ac366a27..e7d20d69fbbd4 100644 --- a/src/connector/src/sink/remote.rs +++ b/src/connector/src/sink/remote.rs @@ -42,12 +42,14 @@ use tokio::sync::mpsc::{Sender, UnboundedReceiver}; use tonic::Status; use tracing::{error, warn}; -use super::encoder::{JsonEncoder, RowEncoder, TimestampHandlingMode}; +use super::encoder::{JsonEncoder, RowEncoder}; use crate::sink::coordinate::CoordinatedSinkWriter; +use crate::sink::encoder::TimestampHandlingMode; +use crate::sink::writer::{LogSinkerOf, SinkWriter, SinkWriterExt}; use crate::sink::SinkError::Remote; use crate::sink::{ DummySinkCommitCoordinator, Result, Sink, SinkCommitCoordinator, SinkError, SinkParam, - SinkWriter, SinkWriterParam, + SinkWriterParam, }; use crate::ConnectorParams; @@ -97,12 +99,16 @@ impl TryFrom for RemoteSink { impl Sink for RemoteSink { type Coordinator = DummySinkCommitCoordinator; - type Writer = RemoteSinkWriter; + type LogSinker = LogSinkerOf>; const SINK_NAME: &'static str = R::SINK_NAME; - async fn new_writer(&self, writer_param: SinkWriterParam) -> Result { - 
RemoteSinkWriter::new(self.param.clone(), writer_param.connector_params).await + async fn new_log_sinker(&self, writer_param: SinkWriterParam) -> Result { + Ok( + RemoteSinkWriter::new(self.param.clone(), writer_param.connector_params) + .await? + .into_log_sinker(writer_param.sink_metrics), + ) } async fn validate(&self) -> Result<()> { @@ -195,7 +201,7 @@ impl TryFrom for CoordinatedRemoteSink { impl Sink for CoordinatedRemoteSink { type Coordinator = RemoteCoordinator; - type Writer = CoordinatedSinkWriter>; + type LogSinker = LogSinkerOf>>; const SINK_NAME: &'static str = R::SINK_NAME; @@ -203,8 +209,8 @@ impl Sink for CoordinatedRemoteSink { self.0.validate().await } - async fn new_writer(&self, writer_param: SinkWriterParam) -> Result { - CoordinatedSinkWriter::new( + async fn new_log_sinker(&self, writer_param: SinkWriterParam) -> Result { + Ok(CoordinatedSinkWriter::new( writer_param .meta_client .expect("should have meta client") @@ -219,7 +225,8 @@ impl Sink for CoordinatedRemoteSink { CoordinatedRemoteSinkWriter::new(self.0.param.clone(), writer_param.connector_params) .await?, ) - .await + .await? + .into_log_sinker(writer_param.sink_metrics)) } async fn new_coordinator( diff --git a/src/connector/src/sink/test_sink.rs b/src/connector/src/sink/test_sink.rs index 96ed91d71bb54..6f327ceaf9cbc 100644 --- a/src/connector/src/sink/test_sink.rs +++ b/src/connector/src/sink/test_sink.rs @@ -18,6 +18,7 @@ use anyhow::anyhow; use parking_lot::Mutex; use crate::sink::boxed::{BoxCoordinator, BoxWriter}; +use crate::sink::writer::{LogSinkerOf, SinkWriterExt}; use crate::sink::{Sink, SinkError, SinkParam, SinkWriterParam}; pub trait BuildBoxWriterTrait = FnMut(SinkParam, SinkWriterParam) -> BoxWriter<()> + Send + 'static; @@ -44,7 +45,7 @@ impl TryFrom for TestSink { impl Sink for TestSink { type Coordinator = BoxCoordinator; - type Writer = BoxWriter<()>; + type LogSinker = LogSinkerOf>; const SINK_NAME: &'static str = "test"; @@ -52,8 +53,12 @@ impl Sink for TestSink { Ok(()) } - async fn new_writer(&self, writer_param: SinkWriterParam) -> crate::sink::Result { - Ok(build_box_writer(self.param.clone(), writer_param)) + async fn new_log_sinker( + &self, + writer_param: SinkWriterParam, + ) -> crate::sink::Result { + let metrics = writer_param.sink_metrics.clone(); + Ok(build_box_writer(self.param.clone(), writer_param).into_log_sinker(metrics)) } } diff --git a/src/connector/src/sink/writer.rs b/src/connector/src/sink/writer.rs new file mode 100644 index 0000000000000..94f1312853593 --- /dev/null +++ b/src/connector/src/sink/writer.rs @@ -0,0 +1,270 @@ +// Copyright 2023 RisingWave Labs +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. 
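The new `writer.rs` module that follows keeps the existing `SinkWriter` abstraction and adapts it into the new `LogSinker` interface. A hedged sketch of the pattern the per-connector changes in this diff follow, not part of the patch itself (`MySink`, `MySinkWriter`, and `config` are hypothetical names):

```rust
use risingwave_connector::sink::writer::{LogSinkerOf, SinkWriterExt};
use risingwave_connector::sink::{DummySinkCommitCoordinator, Result, Sink, SinkWriterParam};

impl Sink for MySink {
    type Coordinator = DummySinkCommitCoordinator;
    // The sink now exposes a `LogSinker` instead of a bare writer.
    type LogSinker = LogSinkerOf<MySinkWriter>;

    const SINK_NAME: &'static str = "my_sink";

    async fn validate(&self) -> Result<()> {
        Ok(())
    }

    async fn new_log_sinker(&self, writer_param: SinkWriterParam) -> Result<Self::LogSinker> {
        // Build the existing `SinkWriter`, then wrap it so that `LogSinkerOf`
        // drives the begin_epoch / write_batch / barrier loop against the log reader.
        Ok(MySinkWriter::new(self.config.clone())
            .await?
            .into_log_sinker(writer_param.sink_metrics))
    }
}
```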
+ +use std::sync::Arc; +use std::time::Instant; + +use async_trait::async_trait; +use risingwave_common::array::StreamChunk; +use risingwave_common::buffer::Bitmap; + +use crate::sink::encoder::SerTo; +use crate::sink::formatter::SinkFormatter; +use crate::sink::log_store::{LogReader, LogStoreReadItem, TruncateOffset}; +use crate::sink::{LogSinker, Result, SinkMetrics}; + +#[async_trait] +pub trait SinkWriter: Send + 'static { + type CommitMetadata: Send = (); + /// Begin a new epoch + async fn begin_epoch(&mut self, epoch: u64) -> Result<()>; + + /// Write a stream chunk to sink + async fn write_batch(&mut self, chunk: StreamChunk) -> Result<()>; + + /// Receive a barrier and mark the end of current epoch. When `is_checkpoint` is true, the sink + /// writer should commit the current epoch. + async fn barrier(&mut self, is_checkpoint: bool) -> Result; + + /// Clean up + async fn abort(&mut self) -> Result<()> { + Ok(()) + } + + /// Update the vnode bitmap of current sink writer + async fn update_vnode_bitmap(&mut self, _vnode_bitmap: Arc) -> Result<()> { + Ok(()) + } +} + +// TODO: remove this trait after KafkaSinkWriter implements SinkWriter +#[async_trait] +// An old version of SinkWriter for backward compatibility +pub trait SinkWriterV1: Send + 'static { + async fn write_batch(&mut self, chunk: StreamChunk) -> Result<()>; + + // the following interface is for transactions, if not supported, return Ok(()) + // start a transaction with epoch number. Note that epoch number should be increasing. + async fn begin_epoch(&mut self, epoch: u64) -> Result<()>; + + // commits the current transaction and marks all messages in the transaction success. + async fn commit(&mut self) -> Result<()>; + + // aborts the current transaction because some error happens. we should rollback to the last + // commit point. + async fn abort(&mut self) -> Result<()>; +} + +/// A free-form sink that may output in multiple formats and encodings. Examples include kafka, +/// kinesis, nats and redis. +/// +/// The implementor specifies required key & value type (likely string or bytes), as well as how to +/// write a single pair. The provided `write_chunk` method would handle the interaction with a +/// `SinkFormatter`. +/// +/// Currently kafka takes `&mut self` while kinesis takes `&self`. So we use `&mut self` in trait +/// but implement it for `&Kinesis`. This allows us to hold `&mut &Kinesis` and `&Kinesis` +/// simultaneously, preventing the schema clone issue propagating from kafka to kinesis. 
+pub trait FormattedSink {
+    type K;
+    type V;
+    async fn write_one(&mut self, k: Option<Self::K>, v: Option<Self::V>) -> Result<()>;
+
+    async fn write_chunk<F: SinkFormatter>(
+        &mut self,
+        chunk: StreamChunk,
+        formatter: &F,
+    ) -> Result<()>
+    where
+        F::K: SerTo<Self::K>,
+        F::V: SerTo<Self::V>,
+    {
+        for r in formatter.format_chunk(&chunk) {
+            let (event_key_object, event_object) = r?;
+
+            self.write_one(
+                event_key_object.map(SerTo::ser_to).transpose()?,
+                event_object.map(SerTo::ser_to).transpose()?,
+            )
+            .await?;
+        }
+
+        Ok(())
+    }
+}
+
+pub struct SinkWriterV1Adapter<W: SinkWriterV1> {
+    is_empty: bool,
+    epoch: u64,
+    inner: W,
+}
+
+impl<W: SinkWriterV1> SinkWriterV1Adapter<W> {
+    pub(crate) fn new(inner: W) -> Self {
+        Self {
+            inner,
+            is_empty: true,
+            epoch: u64::MIN,
+        }
+    }
+}
+
+#[async_trait]
+impl<W: SinkWriterV1> SinkWriter for SinkWriterV1Adapter<W> {
+    async fn begin_epoch(&mut self, epoch: u64) -> Result<()> {
+        self.epoch = epoch;
+        Ok(())
+    }
+
+    async fn write_batch(&mut self, chunk: StreamChunk) -> Result<()> {
+        if self.is_empty {
+            self.is_empty = false;
+            self.inner.begin_epoch(self.epoch).await?;
+        }
+        self.inner.write_batch(chunk).await
+    }
+
+    async fn barrier(&mut self, is_checkpoint: bool) -> Result<()> {
+        if is_checkpoint {
+            if !self.is_empty {
+                self.inner.commit().await?
+            }
+            self.is_empty = true;
+        }
+        Ok(())
+    }
+
+    async fn abort(&mut self) -> Result<()> {
+        self.inner.abort().await
+    }
+}
+
+pub struct LogSinkerOf<W: SinkWriter<CommitMetadata = ()>> {
+    writer: W,
+    sink_metrics: SinkMetrics,
+}
+
+impl<W: SinkWriter<CommitMetadata = ()>> LogSinkerOf<W> {
+    pub fn new(writer: W, sink_metrics: SinkMetrics) -> Self {
+        LogSinkerOf {
+            writer,
+            sink_metrics,
+        }
+    }
+}
+
+impl<W: SinkWriter<CommitMetadata = ()>> LogSinker for LogSinkerOf<W> {
+    async fn consume_log_and_sink(self, mut log_reader: impl LogReader) -> Result<()> {
+        let mut sink_writer = self.writer;
+        let sink_metrics = self.sink_metrics;
+        #[derive(Debug)]
+        enum LogConsumerState {
+            /// Mark that the log consumer is not initialized yet
+            Uninitialized,
+
+            /// Mark that a new epoch has begun.
+            EpochBegun { curr_epoch: u64 },
+
+            /// Mark that the consumer has just received a barrier
+            BarrierReceived { prev_epoch: u64 },
+        }
+
+        let mut state = LogConsumerState::Uninitialized;
+
+        log_reader.init().await?;
+
+        loop {
+            let (epoch, item): (u64, LogStoreReadItem) = log_reader.next_item().await?;
+            if let LogStoreReadItem::UpdateVnodeBitmap(_) = &item {
+                match &state {
+                    LogConsumerState::BarrierReceived { .. } => {}
+                    _ => unreachable!(
+                        "update vnode bitmap can be accepted only right after \
+                        barrier, but current state is {:?}",
+                        state
+                    ),
+                }
+            }
+            // begin_epoch when not previously began
+            state = match state {
+                LogConsumerState::Uninitialized => {
+                    sink_writer.begin_epoch(epoch).await?;
+                    LogConsumerState::EpochBegun { curr_epoch: epoch }
+                }
+                LogConsumerState::EpochBegun { curr_epoch } => {
+                    assert!(
+                        epoch >= curr_epoch,
+                        "new epoch {} should not be below the current epoch {}",
+                        epoch,
+                        curr_epoch
+                    );
+                    LogConsumerState::EpochBegun { curr_epoch: epoch }
+                }
+                LogConsumerState::BarrierReceived { prev_epoch } => {
+                    assert!(
+                        epoch > prev_epoch,
+                        "new epoch {} should be greater than prev epoch {}",
+                        epoch,
+                        prev_epoch
+                    );
+                    sink_writer.begin_epoch(epoch).await?;
+                    LogConsumerState::EpochBegun { curr_epoch: epoch }
+                }
+            };
+            match item {
+                LogStoreReadItem::StreamChunk { chunk, ..
} => { + if let Err(e) = sink_writer.write_batch(chunk).await { + sink_writer.abort().await?; + return Err(e); + } + } + LogStoreReadItem::Barrier { is_checkpoint } => { + let prev_epoch = match state { + LogConsumerState::EpochBegun { curr_epoch } => curr_epoch, + _ => unreachable!("epoch must have begun before handling barrier"), + }; + if is_checkpoint { + let start_time = Instant::now(); + sink_writer.barrier(true).await?; + sink_metrics + .sink_commit_duration_metrics + .observe(start_time.elapsed().as_millis() as f64); + log_reader + .truncate(TruncateOffset::Barrier { epoch }) + .await?; + } else { + sink_writer.barrier(false).await?; + } + state = LogConsumerState::BarrierReceived { prev_epoch } + } + LogStoreReadItem::UpdateVnodeBitmap(vnode_bitmap) => { + sink_writer.update_vnode_bitmap(vnode_bitmap).await?; + } + } + } + } +} + +#[easy_ext::ext(SinkWriterExt)] +impl T +where + T: SinkWriter + Sized, +{ + pub fn into_log_sinker(self, sink_metrics: SinkMetrics) -> LogSinkerOf { + LogSinkerOf { + writer: self, + sink_metrics, + } + } +} diff --git a/src/stream/src/common/log_store/in_mem.rs b/src/stream/src/common/log_store_impl/in_mem.rs similarity index 96% rename from src/stream/src/common/log_store/in_mem.rs rename to src/stream/src/common/log_store_impl/in_mem.rs index 01192f951b843..35040be82c93b 100644 --- a/src/stream/src/common/log_store/in_mem.rs +++ b/src/stream/src/common/log_store_impl/in_mem.rs @@ -18,16 +18,15 @@ use anyhow::anyhow; use risingwave_common::array::StreamChunk; use risingwave_common::buffer::Bitmap; use risingwave_common::util::epoch::{EpochPair, INVALID_EPOCH}; +use risingwave_connector::sink::log_store::{ + LogReader, LogStoreFactory, LogStoreReadItem, LogStoreResult, LogWriter, TruncateOffset, +}; use tokio::sync::mpsc::{ channel, unbounded_channel, Receiver, Sender, UnboundedReceiver, UnboundedSender, }; use tokio::sync::oneshot; -use crate::common::log_store::in_mem::LogReaderEpochProgress::{AwaitingTruncate, Consuming}; -use crate::common::log_store::{ - LogReader, LogStoreError, LogStoreFactory, LogStoreReadItem, LogStoreResult, LogWriter, - TruncateOffset, -}; +use crate::common::log_store_impl::in_mem::LogReaderEpochProgress::{AwaitingTruncate, Consuming}; enum InMemLogStoreItem { StreamChunk(StreamChunk), @@ -193,10 +192,9 @@ impl LogReader for BoundedInMemLogStoreReader { }, AwaitingTruncate { .. } => Err(anyhow!( "should not call next_item on checkpoint barrier for in-mem log store" - ) - .into()), + )), }, - None => Err(LogStoreError::EndOfLogStream), + None => Err(anyhow!("end of log stream")), } } @@ -207,8 +205,7 @@ impl LogReader for BoundedInMemLogStoreReader { "truncate offset {:?} but prev truncate offset is {:?}", offset, self.truncate_offset - ) - .into()); + )); } // check the truncate offset does not exceed the latest possible offset @@ -217,8 +214,7 @@ impl LogReader for BoundedInMemLogStoreReader { "truncate at {:?} but latest offset is {:?}", offset, self.latest_offset - ) - .into()); + )); } if let AwaitingTruncate { @@ -288,11 +284,10 @@ impl LogWriter for BoundedInMemLogStoreWriter { } async fn update_vnode_bitmap(&mut self, new_vnodes: Arc) -> LogStoreResult<()> { - Ok(self - .item_tx + self.item_tx .send(InMemLogStoreItem::UpdateVnodeBitmap(new_vnodes)) .await - .map_err(|_| anyhow!("unable to send vnode bitmap"))?) 
+ .map_err(|_| anyhow!("unable to send vnode bitmap")) } } @@ -305,11 +300,11 @@ mod tests { use risingwave_common::array::Op; use risingwave_common::types::{DataType, ScalarImpl}; use risingwave_common::util::epoch::EpochPair; - - use crate::common::log_store::in_mem::BoundedInMemLogStoreFactory; - use crate::common::log_store::{ + use risingwave_connector::sink::log_store::{ LogReader, LogStoreFactory, LogStoreReadItem, LogWriter, TruncateOffset, }; + + use crate::common::log_store_impl::in_mem::BoundedInMemLogStoreFactory; use crate::common::StreamChunkBuilder; #[tokio::test] diff --git a/src/stream/src/common/log_store/kv_log_store/buffer.rs b/src/stream/src/common/log_store_impl/kv_log_store/buffer.rs similarity index 98% rename from src/stream/src/common/log_store/kv_log_store/buffer.rs rename to src/stream/src/common/log_store_impl/kv_log_store/buffer.rs index b478123e6d9cb..fc3a4b569f7da 100644 --- a/src/stream/src/common/log_store/kv_log_store/buffer.rs +++ b/src/stream/src/common/log_store_impl/kv_log_store/buffer.rs @@ -19,10 +19,10 @@ use std::sync::Arc; use parking_lot::{Mutex, MutexGuard}; use risingwave_common::array::StreamChunk; use risingwave_common::buffer::Bitmap; +use risingwave_connector::sink::log_store::{ChunkId, LogStoreResult, TruncateOffset}; use tokio::sync::{oneshot, Notify}; -use crate::common::log_store::kv_log_store::{ReaderTruncationOffsetType, SeqIdType}; -use crate::common::log_store::{ChunkId, LogStoreResult, TruncateOffset}; +use crate::common::log_store_impl::kv_log_store::{ReaderTruncationOffsetType, SeqIdType}; #[derive(Clone)] pub(crate) enum LogStoreBufferItem { diff --git a/src/stream/src/common/log_store/kv_log_store/mod.rs b/src/stream/src/common/log_store_impl/kv_log_store/mod.rs similarity index 96% rename from src/stream/src/common/log_store/kv_log_store/mod.rs rename to src/stream/src/common/log_store_impl/kv_log_store/mod.rs index dc27d1f63b3e8..a84850b04f069 100644 --- a/src/stream/src/common/log_store/kv_log_store/mod.rs +++ b/src/stream/src/common/log_store_impl/kv_log_store/mod.rs @@ -16,15 +16,15 @@ use std::sync::Arc; use risingwave_common::buffer::Bitmap; use risingwave_common::catalog::{TableId, TableOption}; +use risingwave_connector::sink::log_store::LogStoreFactory; use risingwave_pb::catalog::Table; use risingwave_storage::store::NewLocalOptions; use risingwave_storage::StateStore; -use crate::common::log_store::kv_log_store::buffer::new_log_store_buffer; -use crate::common::log_store::kv_log_store::reader::KvLogStoreReader; -use crate::common::log_store::kv_log_store::serde::LogStoreRowSerde; -use crate::common::log_store::kv_log_store::writer::KvLogStoreWriter; -use crate::common::log_store::LogStoreFactory; +use crate::common::log_store_impl::kv_log_store::buffer::new_log_store_buffer; +use crate::common::log_store_impl::kv_log_store::reader::KvLogStoreReader; +use crate::common::log_store_impl::kv_log_store::serde::LogStoreRowSerde; +use crate::common::log_store_impl::kv_log_store::writer::KvLogStoreWriter; mod buffer; mod reader; @@ -102,18 +102,18 @@ impl LogStoreFactory for KvLogStoreFactory { #[cfg(test)] mod tests { use risingwave_common::util::epoch::EpochPair; + use risingwave_connector::sink::log_store::{ + LogReader, LogStoreFactory, LogStoreReadItem, LogWriter, TruncateOffset, + }; use risingwave_hummock_sdk::HummockReadEpoch; use risingwave_hummock_test::test_utils::prepare_hummock_test_env; use risingwave_storage::store::SyncResult; use risingwave_storage::StateStore; - use 
crate::common::log_store::kv_log_store::test_utils::{ + use crate::common::log_store_impl::kv_log_store::test_utils::{ gen_stream_chunk, gen_test_log_store_table, }; - use crate::common::log_store::kv_log_store::KvLogStoreFactory; - use crate::common::log_store::{ - LogReader, LogStoreFactory, LogStoreReadItem, LogWriter, TruncateOffset, - }; + use crate::common::log_store_impl::kv_log_store::KvLogStoreFactory; #[tokio::test] async fn test_basic() { diff --git a/src/stream/src/common/log_store/kv_log_store/reader.rs b/src/stream/src/common/log_store_impl/kv_log_store/reader.rs similarity index 96% rename from src/stream/src/common/log_store/kv_log_store/reader.rs rename to src/stream/src/common/log_store_impl/kv_log_store/reader.rs index 2d01ab3bc0b9d..4336c3d961626 100644 --- a/src/stream/src/common/log_store/kv_log_store/reader.rs +++ b/src/stream/src/common/log_store_impl/kv_log_store/reader.rs @@ -22,18 +22,20 @@ use futures::stream::select_all; use risingwave_common::cache::CachePriority; use risingwave_common::catalog::TableId; use risingwave_common::hash::VnodeBitmapExt; +use risingwave_connector::sink::log_store::{ + LogReader, LogStoreReadItem, LogStoreResult, TruncateOffset, +}; use risingwave_hummock_sdk::key::TableKey; use risingwave_storage::hummock::CachePolicy; use risingwave_storage::store::{PrefetchOptions, ReadOptions}; use risingwave_storage::StateStore; use tokio_stream::StreamExt; -use crate::common::log_store::kv_log_store::buffer::{LogStoreBufferItem, LogStoreBufferReceiver}; -use crate::common::log_store::kv_log_store::serde::{ - new_log_store_item_stream, KvLogStoreItem, LogStoreItemStream, LogStoreRowSerde, +use crate::common::log_store_impl::kv_log_store::buffer::{ + LogStoreBufferItem, LogStoreBufferReceiver, }; -use crate::common::log_store::{ - LogReader, LogStoreError, LogStoreReadItem, LogStoreResult, TruncateOffset, +use crate::common::log_store_impl::kv_log_store::serde::{ + new_log_store_item_stream, KvLogStoreItem, LogStoreItemStream, LogStoreRowSerde, }; pub struct KvLogStoreReader { @@ -174,7 +176,7 @@ impl LogReader for KvLogStoreReader { // Use u64::MAX here because the epoch to consume may be below the safe // epoch async move { - Ok::<_, LogStoreError>(Box::pin( + Ok::<_, anyhow::Error>(Box::pin( state_store .iter( (Included(range_start), Included(range_end)), @@ -233,16 +235,14 @@ impl LogReader for KvLogStoreReader { "truncate at a later offset {:?} than the current latest offset {:?}", offset, self.latest_offset - ) - .into()); + )); } if offset <= self.truncate_offset { return Err(anyhow!( "truncate offset {:?} earlier than prev truncate offset {:?}", offset, self.truncate_offset - ) - .into()); + )); } if offset.epoch() >= self.first_write_epoch.expect("should have init") { self.rx.truncate(offset); diff --git a/src/stream/src/common/log_store/kv_log_store/serde.rs b/src/stream/src/common/log_store_impl/kv_log_store/serde.rs similarity index 96% rename from src/stream/src/common/log_store/kv_log_store/serde.rs rename to src/stream/src/common/log_store_impl/kv_log_store/serde.rs index 627aee6c22f2f..2aa01c42af196 100644 --- a/src/stream/src/common/log_store/kv_log_store/serde.rs +++ b/src/stream/src/common/log_store_impl/kv_log_store/serde.rs @@ -38,6 +38,7 @@ use risingwave_common::util::sort_util::OrderType; use risingwave_common::util::value_encoding::{ BasicSerde, ValueRowDeserializer, ValueRowSerializer, }; +use risingwave_connector::sink::log_store::LogStoreResult; use risingwave_hummock_sdk::key::{next_key, TableKey}; use 
risingwave_pb::catalog::Table; use risingwave_storage::row_serde::row_serde_util::serialize_pk_with_vnode; @@ -45,10 +46,9 @@ use risingwave_storage::row_serde::value_serde::ValueRowSerdeNew; use risingwave_storage::store::StateStoreReadIterStream; use risingwave_storage::table::{compute_vnode, Distribution}; -use crate::common::log_store::kv_log_store::{ +use crate::common::log_store_impl::kv_log_store::{ ReaderTruncationOffsetType, RowOpCodeType, SeqIdType, }; -use crate::common::log_store::{LogStoreError, LogStoreResult}; const INSERT_OP_CODE: RowOpCodeType = 1; const DELETE_OP_CODE: RowOpCodeType = 2; @@ -313,34 +313,32 @@ impl LogStoreRowSerde { match self.deserialize(value)? { (epoch, LogStoreRowOp::Row { op, row }) => { if epoch != expected_epoch { - return Err(LogStoreError::Internal(anyhow!( + return Err(anyhow!( "decoded epoch {} not match expected epoch {}", epoch, expected_epoch - ))); + )); } ops.push(op); if ops.len() > size_bound { - return Err(LogStoreError::Internal(anyhow!( + return Err(anyhow!( "row count {} exceed size bound {}", ops.len(), size_bound - ))); + )); } assert!(data_chunk_builder.append_one_row(row).is_none()); } (_, LogStoreRowOp::Barrier { .. }) => { - return Err(LogStoreError::Internal(anyhow!( - "should not get barrier when decoding stream chunk" - ))); + return Err(anyhow!("should not get barrier when decoding stream chunk")); } } } if ops.is_empty() { - return Err(LogStoreError::Internal(anyhow!( + return Err(anyhow!( "should not get empty row when decoding stream chunk. start seq id: {}, end seq id {}", start_seq_id, - end_seq_id)) + end_seq_id) ); } Ok(StreamChunk::from_parts( @@ -405,11 +403,11 @@ impl LogStoreRowOpStream { StreamState::AllConsumingRow { curr_epoch } | StreamState::BarrierAligning { curr_epoch, .. } => { if *curr_epoch != epoch { - Err(LogStoreError::Internal(anyhow!( + Err(anyhow!( "epoch {} does not match with current epoch {}", epoch, curr_epoch - ))) + )) } else { Ok(()) } @@ -417,11 +415,11 @@ impl LogStoreRowOpStream { StreamState::BarrierEmitted { prev_epoch } => { if *prev_epoch >= epoch { - Err(LogStoreError::Internal(anyhow!( + Err(anyhow!( "epoch {} should be greater than prev epoch {}", epoch, prev_epoch - ))) + )) } else { Ok(()) } @@ -438,18 +436,18 @@ impl LogStoreRowOpStream { if is_checkpoint == *curr_is_checkpoint { Ok(()) } else { - Err(LogStoreError::Internal(anyhow!( + Err(anyhow!( "current aligning barrier is_checkpoint: {}, current barrier is_checkpoint {}", curr_is_checkpoint, is_checkpoint - ))) + )) } } else { Ok(()) } } - #[try_stream(ok = (u64, KvLogStoreItem), error = LogStoreError)] + #[try_stream(ok = (u64, KvLogStoreItem), error = anyhow::Error)] async fn into_log_store_item_stream(self, chunk_size: usize) { let mut ops = Vec::with_capacity(chunk_size); let mut data_chunk_builder = @@ -547,13 +545,14 @@ impl LogStoreRowOpStream { } // End of stream match &self.stream_state { - StreamState::BarrierEmitted { .. } | StreamState::Uninitialized => {}, - s => return Err(LogStoreError::Internal( - anyhow!( - "when any of the stream reaches the end, it should be right after emitting an barrier. Current state: {:?}", - s) - ) - ), + StreamState::BarrierEmitted { .. } | StreamState::Uninitialized => {} + s => { + return Err(anyhow!( + "when any of the stream reaches the end, it should be right \ + after emitting an barrier. 
Current state: {:?}", + s + )); + } } assert!( self.barrier_streams.is_empty(), @@ -562,8 +561,8 @@ impl LogStoreRowOpStream { if cfg!(debug_assertion) { while let Some((opt, _stream)) = self.row_streams.next().await { if let Some(result) = opt { - return Err(LogStoreError::Internal( - anyhow!("when any of the stream reaches the end, other stream should also reaches the end, but poll result: {:?}", result)) + return Err( + anyhow!("when any of the stream reaches the end, other stream should also reaches the end, but poll result: {:?}", result) ); } } @@ -592,14 +591,14 @@ mod tests { use tokio::sync::oneshot; use tokio::sync::oneshot::Sender; - use crate::common::log_store::kv_log_store::serde::{ + use crate::common::log_store_impl::kv_log_store::serde::{ new_log_store_item_stream, KvLogStoreItem, LogStoreRowOp, LogStoreRowOpStream, LogStoreRowSerde, }; - use crate::common::log_store::kv_log_store::test_utils::{ + use crate::common::log_store_impl::kv_log_store::test_utils::{ gen_test_data, gen_test_log_store_table, TEST_TABLE_ID, }; - use crate::common::log_store::kv_log_store::SeqIdType; + use crate::common::log_store_impl::kv_log_store::SeqIdType; const EPOCH1: u64 = 233; const EPOCH2: u64 = EPOCH1 + 1; diff --git a/src/stream/src/common/log_store/kv_log_store/test_utils.rs b/src/stream/src/common/log_store_impl/kv_log_store/test_utils.rs similarity index 100% rename from src/stream/src/common/log_store/kv_log_store/test_utils.rs rename to src/stream/src/common/log_store_impl/kv_log_store/test_utils.rs diff --git a/src/stream/src/common/log_store/kv_log_store/writer.rs b/src/stream/src/common/log_store_impl/kv_log_store/writer.rs similarity index 94% rename from src/stream/src/common/log_store/kv_log_store/writer.rs rename to src/stream/src/common/log_store_impl/kv_log_store/writer.rs index 54d7db38b8570..a07757a5510a9 100644 --- a/src/stream/src/common/log_store/kv_log_store/writer.rs +++ b/src/stream/src/common/log_store_impl/kv_log_store/writer.rs @@ -21,12 +21,12 @@ use risingwave_common::buffer::{Bitmap, BitmapBuilder}; use risingwave_common::catalog::TableId; use risingwave_common::hash::{VirtualNode, VnodeBitmapExt}; use risingwave_common::util::epoch::EpochPair; +use risingwave_connector::sink::log_store::{LogStoreResult, LogWriter}; use risingwave_storage::store::{InitOptions, LocalStateStore}; -use crate::common::log_store::kv_log_store::buffer::LogStoreBufferSender; -use crate::common::log_store::kv_log_store::serde::LogStoreRowSerde; -use crate::common::log_store::kv_log_store::{SeqIdType, FIRST_SEQ_ID}; -use crate::common::log_store::{LogStoreResult, LogWriter}; +use crate::common::log_store_impl::kv_log_store::buffer::LogStoreBufferSender; +use crate::common::log_store_impl::kv_log_store::serde::LogStoreRowSerde; +use crate::common::log_store_impl::kv_log_store::{SeqIdType, FIRST_SEQ_ID}; pub struct KvLogStoreWriter { _table_id: TableId, diff --git a/src/stream/src/common/log_store_impl/mod.rs b/src/stream/src/common/log_store_impl/mod.rs new file mode 100644 index 0000000000000..633fa07f2617d --- /dev/null +++ b/src/stream/src/common/log_store_impl/mod.rs @@ -0,0 +1,16 @@ +// Copyright 2023 RisingWave Labs +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. 
+// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +pub mod in_mem; +pub mod kv_log_store; diff --git a/src/stream/src/common/mod.rs b/src/stream/src/common/mod.rs index e865214cb0990..7f5111c29e03e 100644 --- a/src/stream/src/common/mod.rs +++ b/src/stream/src/common/mod.rs @@ -18,6 +18,6 @@ pub use column_mapping::*; mod builder; pub mod cache; mod column_mapping; -pub mod log_store; +pub mod log_store_impl; pub mod metrics; pub mod table; diff --git a/src/stream/src/error.rs b/src/stream/src/error.rs index b737de4d2560b..2930cda31747e 100644 --- a/src/stream/src/error.rs +++ b/src/stream/src/error.rs @@ -16,6 +16,7 @@ use std::backtrace::Backtrace; use risingwave_common::array::ArrayError; use risingwave_connector::error::ConnectorError; +use risingwave_connector::sink::SinkError; use risingwave_expr::ExprError; use risingwave_pb::PbFieldNotFound; use risingwave_storage::error::StorageError; @@ -58,6 +59,9 @@ enum ErrorKind { #[error("Executor error: {0:?}")] Executor(#[source] StreamExecutorError), + #[error("Sink error: {0:?}")] + Sink(#[source] SinkError), + #[error(transparent)] Internal(anyhow::Error), } @@ -115,6 +119,12 @@ impl From for StreamError { } } +impl From for StreamError { + fn from(value: SinkError) -> Self { + ErrorKind::Sink(value).into() + } +} + impl From for StreamError { fn from(err: PbFieldNotFound) -> Self { Self::from(anyhow::anyhow!( diff --git a/src/stream/src/executor/error.rs b/src/stream/src/executor/error.rs index 32d7ee8479110..b1e436c841577 100644 --- a/src/stream/src/executor/error.rs +++ b/src/stream/src/executor/error.rs @@ -25,7 +25,6 @@ use risingwave_rpc_client::error::RpcError; use risingwave_storage::error::StorageError; use super::Barrier; -use crate::common::log_store::LogStoreError; /// A specialized Result type for streaming executors. pub type StreamExecutorResult = std::result::Result; @@ -54,9 +53,6 @@ enum ErrorKind { StorageError, ), - #[error("Log store error: {0}")] - LogStoreError(#[source] LogStoreError), - #[error("Chunk operation error: {0}")] ArrayError(#[source] ArrayError), @@ -154,13 +150,6 @@ impl From for StreamExecutorError { } } -/// Log store error -impl From for StreamExecutorError { - fn from(e: LogStoreError) -> Self { - ErrorKind::LogStoreError(e).into() - } -} - /// Chunk operation error. impl From for StreamExecutorError { fn from(e: ArrayError) -> Self { diff --git a/src/stream/src/executor/sink.rs b/src/stream/src/executor/sink.rs index b36a5613ba7bb..14394073e8df7 100644 --- a/src/stream/src/executor/sink.rs +++ b/src/stream/src/executor/sink.rs @@ -13,34 +13,33 @@ // limitations under the License. 
use std::sync::Arc; -use std::time::Instant; +use anyhow::anyhow; use futures::stream::select; use futures::{FutureExt, StreamExt}; use futures_async_stream::try_stream; use itertools::Itertools; -use prometheus::Histogram; use risingwave_common::array::stream_chunk::StreamChunkMut; use risingwave_common::array::{merge_chunk_row, Op, StreamChunk}; use risingwave_common::catalog::{ColumnCatalog, Field, Schema}; use risingwave_common::util::epoch::EpochPair; use risingwave_connector::dispatch_sink; use risingwave_connector::sink::catalog::SinkType; +use risingwave_connector::sink::log_store::{ + LogReader, LogStoreFactory, LogStoreTransformChunkLogReader, LogWriter, +}; use risingwave_connector::sink::{ - build_sink, Sink, SinkImpl, SinkParam, SinkWriter, SinkWriterParam, + build_sink, LogSinker, Sink, SinkImpl, SinkParam, SinkWriterParam, }; use super::error::{StreamExecutorError, StreamExecutorResult}; use super::{BoxedExecutor, Executor, Message, PkIndices}; -use crate::common::log_store::{ - LogReader, LogStoreFactory, LogStoreReadItem, LogWriter, TruncateOffset, -}; use crate::executor::monitor::StreamingMetrics; use crate::executor::{expect_first_barrier, ActorContextRef, BoxedMessageStream}; pub struct SinkExecutor { input: BoxedExecutor, - metrics: Arc, + _metrics: Arc, sink: SinkImpl, identity: String, pk_indices: PkIndices, @@ -53,10 +52,6 @@ pub struct SinkExecutor { sink_writer_param: SinkWriterParam, } -struct SinkMetrics { - sink_commit_duration_metrics: Histogram, -} - // Drop all the DELETE messages in this chunk and convert UPDATE INSERT into INSERT. fn force_append_only(c: StreamChunk) -> StreamChunk { let mut c: StreamChunkMut = c.into(); @@ -91,7 +86,7 @@ impl SinkExecutor { .collect(); Ok(Self { input, - metrics, + _metrics: metrics, sink, identity: format!("SinkExecutor {:X?}", sink_writer_param.executor_id), pk_indices, @@ -106,15 +101,6 @@ impl SinkExecutor { } fn execute_inner(self) -> BoxedMessageStream { - let sink_commit_duration_metrics = self - .metrics - .sink_commit_duration - .with_label_values(&[self.identity.as_str(), self.sink.get_connector()]); - - let sink_metrics = SinkMetrics { - sink_commit_duration_metrics, - }; - let write_log_stream = Self::execute_write_log( self.input, self.pk_indices, @@ -128,7 +114,6 @@ impl SinkExecutor { sink, self.log_reader, self.input_columns, - sink_metrics, self.sink_writer_param, ); select(consume_log_stream.into_stream(), write_log_stream).boxed() @@ -193,13 +178,11 @@ impl SinkExecutor { async fn execute_consume_log( sink: S, - mut log_reader: R, + log_reader: R, columns: Vec, - sink_metrics: SinkMetrics, sink_writer_param: SinkWriterParam, ) -> StreamExecutorResult { - log_reader.init().await?; - let mut sink_writer = sink.new_writer(sink_writer_param).await?; + let log_sinker = sink.new_log_sinker(sink_writer_param).await?; let visible_columns = columns .iter() @@ -207,96 +190,18 @@ impl SinkExecutor { .filter_map(|(idx, column)| (!column.is_hidden).then_some(idx)) .collect_vec(); - #[derive(Debug)] - enum LogConsumerState { - /// Mark that the log consumer is not initialized yet - Uninitialized, - - /// Mark that a new epoch has begun. 
- EpochBegun { curr_epoch: u64 }, - - /// Mark that the consumer has just received a barrier - BarrierReceived { prev_epoch: u64 }, - } - - let mut state = LogConsumerState::Uninitialized; - - loop { - let (epoch, item): (u64, LogStoreReadItem) = log_reader.next_item().await?; - if let LogStoreReadItem::UpdateVnodeBitmap(_) = &item { - match &state { - LogConsumerState::BarrierReceived { .. } => {} - _ => unreachable!( - "update vnode bitmap can be accepted only right after \ - barrier, but current state is {:?}", - state - ), - } - } - // begin_epoch when not previously began - state = match state { - LogConsumerState::Uninitialized => { - sink_writer.begin_epoch(epoch).await?; - LogConsumerState::EpochBegun { curr_epoch: epoch } - } - LogConsumerState::EpochBegun { curr_epoch } => { - assert!( - epoch >= curr_epoch, - "new epoch {} should not be below the current epoch {}", - epoch, - curr_epoch - ); - LogConsumerState::EpochBegun { curr_epoch: epoch } - } - LogConsumerState::BarrierReceived { prev_epoch } => { - assert!( - epoch > prev_epoch, - "new epoch {} should be greater than prev epoch {}", - epoch, - prev_epoch - ); - sink_writer.begin_epoch(epoch).await?; - LogConsumerState::EpochBegun { curr_epoch: epoch } - } - }; - match item { - LogStoreReadItem::StreamChunk { chunk, .. } => { - let chunk = if visible_columns.len() != columns.len() { - // Do projection here because we may have columns that aren't visible to - // the downstream. - chunk.project(&visible_columns) - } else { - chunk - }; - if let Err(e) = sink_writer.write_batch(chunk).await { - sink_writer.abort().await?; - return Err(e.into()); - } - } - LogStoreReadItem::Barrier { is_checkpoint } => { - let prev_epoch = match state { - LogConsumerState::EpochBegun { curr_epoch } => curr_epoch, - _ => unreachable!("epoch must have begun before handling barrier"), - }; - if is_checkpoint { - let start_time = Instant::now(); - sink_writer.barrier(true).await?; - sink_metrics - .sink_commit_duration_metrics - .observe(start_time.elapsed().as_millis() as f64); - log_reader - .truncate(TruncateOffset::Barrier { epoch: prev_epoch }) - .await?; - } else { - sink_writer.barrier(false).await?; - } - state = LogConsumerState::BarrierReceived { prev_epoch } - } - LogStoreReadItem::UpdateVnodeBitmap(vnode_bitmap) => { - sink_writer.update_vnode_bitmap(vnode_bitmap).await?; - } + let log_reader = log_reader.transform_chunk(move |chunk| { + if visible_columns.len() != columns.len() { + // Do projection here because we may have columns that aren't visible to + // the downstream. 
+ chunk.project(&visible_columns) + } else { + chunk } - } + }); + + log_sinker.consume_log_and_sink(log_reader).await?; + Err(anyhow!("end of stream").into()) } } @@ -323,7 +228,7 @@ mod test { use risingwave_common::catalog::{ColumnDesc, ColumnId}; use super::*; - use crate::common::log_store::in_mem::BoundedInMemLogStoreFactory; + use crate::common::log_store_impl::in_mem::BoundedInMemLogStoreFactory; use crate::executor::test_utils::*; use crate::executor::ActorContext; diff --git a/src/stream/src/from_proto/sink.rs b/src/stream/src/from_proto/sink.rs index a95b7fce22738..295a36376b376 100644 --- a/src/stream/src/from_proto/sink.rs +++ b/src/stream/src/from_proto/sink.rs @@ -14,15 +14,19 @@ use std::sync::Arc; +use anyhow::anyhow; use risingwave_common::catalog::ColumnCatalog; +use risingwave_connector::match_sink_name_str; use risingwave_connector::sink::catalog::SinkType; -use risingwave_connector::sink::{SinkParam, SinkWriterParam}; +use risingwave_connector::sink::{ + SinkError, SinkMetrics, SinkParam, SinkWriterParam, CONNECTOR_TYPE_KEY, +}; use risingwave_pb::stream_plan::{SinkLogStoreType, SinkNode}; use risingwave_storage::dispatch_state_store; use super::*; -use crate::common::log_store::in_mem::BoundedInMemLogStoreFactory; -use crate::common::log_store::kv_log_store::KvLogStoreFactory; +use crate::common::log_store_impl::in_mem::BoundedInMemLogStoreFactory; +use crate::common::log_store_impl::kv_log_store::KvLogStoreFactory; use crate::executor::SinkExecutor; pub struct SinkExecutorBuilder; @@ -56,6 +60,27 @@ impl ExecutorBuilder for SinkExecutorBuilder { .into_iter() .map(ColumnCatalog::from) .collect_vec(); + + let connector = { + let sink_type = properties.get(CONNECTOR_TYPE_KEY).ok_or_else(|| { + SinkError::Config(anyhow!("missing config: {}", CONNECTOR_TYPE_KEY)) + })?; + + use risingwave_connector::sink::Sink; + + match_sink_name_str!( + sink_type.to_lowercase().as_str(), + SinkType, + Ok(SinkType::SINK_NAME), + |other| { + Err(SinkError::Config(anyhow!( + "unsupported sink connector {}", + other + ))) + } + ) + }?; + let sink_param = SinkParam { sink_id, properties, @@ -70,6 +95,17 @@ impl ExecutorBuilder for SinkExecutorBuilder { sink_from_name, }; + let identity = format!("SinkExecutor {:X?}", params.executor_id); + + let sink_commit_duration_metrics = stream + .streaming_metrics + .sink_commit_duration + .with_label_values(&[identity.as_str(), connector]); + + let sink_metrics = SinkMetrics { + sink_commit_duration_metrics, + }; + match node.log_store_type() { // Default value is the normal in memory log store to be backward compatible with the // previously unset value @@ -84,6 +120,7 @@ impl ExecutorBuilder for SinkExecutorBuilder { executor_id: params.executor_id, vnode_bitmap: params.vnode_bitmap, meta_client: params.env.meta_client(), + sink_metrics, }, sink_param, columns, @@ -112,6 +149,7 @@ impl ExecutorBuilder for SinkExecutorBuilder { executor_id: params.executor_id, vnode_bitmap: params.vnode_bitmap, meta_client: params.env.meta_client(), + sink_metrics, }, sink_param, columns, diff --git a/src/tests/simulation/tests/integration_tests/sink/basic.rs b/src/tests/simulation/tests/integration_tests/sink/basic.rs index f2c6d9f3c8b98..ad8eb8a4d9cf2 100644 --- a/src/tests/simulation/tests/integration_tests/sink/basic.rs +++ b/src/tests/simulation/tests/integration_tests/sink/basic.rs @@ -31,7 +31,8 @@ use risingwave_common::types::{DataType, ScalarImpl}; use risingwave_common::util::chunk_coalesce::DataChunkBuilder;
use risingwave_connector::sink::boxed::{BoxCoordinator, BoxWriter}; use risingwave_connector::sink::test_sink::registry_build_sink; -use risingwave_connector::sink::{Sink, SinkWriter, SinkWriterParam}; +use risingwave_connector::sink::writer::SinkWriter; +use risingwave_connector::sink::{Sink, SinkWriterParam}; use risingwave_connector::source::test_source::{registry_test_source, BoxSource, TestSourceSplit}; use risingwave_connector::source::StreamChunkWithState; use risingwave_simulation::cluster::{Cluster, ConfigPath, Configuration};
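[Editor's note, not part of the patch] A minimal sketch of what the new `impl From<SinkError> for StreamError` in src/stream/src/error.rs enables: stream-side code can now propagate sink failures with `?` instead of wrapping them by hand. The helper name `try_build_sink` is hypothetical; the sketch assumes the stream crate's `StreamResult<T>` alias and the `build_sink` constructor imported elsewhere in this diff.

    use risingwave_connector::sink::{build_sink, SinkImpl, SinkParam};
    use crate::error::StreamResult;

    fn try_build_sink(param: SinkParam) -> StreamResult<SinkImpl> {
        // `build_sink` returns the sink crate's `Result`; the `?` converts its
        // `SinkError` into `StreamError` through the `ErrorKind::Sink` variant
        // added by this change, so no manual anyhow wrapping is needed.
        Ok(build_sink(param)?)
    }

The same conversion is what lets `SinkExecutorBuilder` above bubble up the `SinkError::Config` produced when the connector name is missing or unsupported.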