diff --git a/src/bench/Cargo.toml b/src/bench/Cargo.toml index 11fa749e524f4..9dcc493117d9b 100644 --- a/src/bench/Cargo.toml +++ b/src/bench/Cargo.toml @@ -27,7 +27,7 @@ parking_lot = "0.12" prometheus = { version = "0.13", features = ["process"] } rand = "0.8" risingwave_common = { workspace = true } -risingwave_connector = { workspace = true, default-features = false, features = ["sink_bench"] } +risingwave_connector = { workspace = true } risingwave_pb = { workspace = true } risingwave_rt = { workspace = true, optional = true } risingwave_storage = { workspace = true } diff --git a/src/bench/sink_bench/main.rs b/src/bench/sink_bench/main.rs index b6f939185262a..f19f6428c3cbf 100644 --- a/src/bench/sink_bench/main.rs +++ b/src/bench/sink_bench/main.rs @@ -1,4 +1,4 @@ -// Copyright 2023 RisingWave Labs +// Copyright 2024 RisingWave Labs // // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. @@ -34,12 +34,16 @@ use risingwave_connector::dispatch_sink; use risingwave_connector::parser::{ EncodingProperties, ParserConfig, ProtocolProperties, SpecificParserConfig, }; -use risingwave_connector::sink::catalog::{SinkFormatDesc, SinkId, SinkType}; +use risingwave_connector::sink::catalog::{ + SinkEncode, SinkFormat, SinkFormatDesc, SinkId, SinkType, +}; use risingwave_connector::sink::log_store::{ LogReader, LogStoreReadItem, LogStoreResult, TruncateOffset, }; +use risingwave_connector::sink::mock_coordination_client::MockMetaClient; use risingwave_connector::sink::{ - build_sink, LogSinker, MetaClientForSink, MockMetaClient, Sink, SinkParam, SinkWriterParam, + build_sink, LogSinker, MetaClientForSink, Sink, SinkError, SinkParam, SinkWriterParam, + SINK_TYPE_APPEND_ONLY, SINK_TYPE_UPSERT, }; use risingwave_connector::source::datagen::{ DatagenProperties, DatagenSplitEnumerator, DatagenSplitReader, @@ -154,13 +158,15 @@ impl ThroughputMetric { } } - pub fn get_throughput(&self) -> Vec { + pub fn get_throughput(&self) -> Vec { #[allow(clippy::disallowed_methods)] self.chunk_size_list .iter() .zip(self.chunk_size_list.iter().skip(1)) .map(|(current, next)| { - (next.0 - current.0) * 1000 / (next.1.duration_since(current.1).as_millis() as u64) + let throughput = (next.0 - current.0) * 1000 + / (next.1.duration_since(current.1).as_millis() as u64); + format!("{} rows/s", throughput) }) .collect() } @@ -272,10 +278,12 @@ where ::Coordinator: std::marker::Send, ::Coordinator: 'static, { - sink_writer_param.meta_client = Some(MetaClientForSink::MockMetaClient(MockMetaClient::new( - Box::new(sink.new_coordinator().await.unwrap()), - ))); - sink_writer_param.vnode_bitmap = Some(Bitmap::ones(1)); + if let Ok(coordinator) = sink.new_coordinator().await { + sink_writer_param.meta_client = Some(MetaClientForSink::MockMetaClient( + MockMetaClient::new(Box::new(coordinator)), + )); + sink_writer_param.vnode_bitmap = Some(Bitmap::ones(1)); + } let log_sinker = sink.new_log_sinker(sink_writer_param).await.unwrap(); if let Err(e) = log_sinker.consume_log_and_sink(&mut log_reader).await { return Err(e.to_string()); @@ -358,6 +366,33 @@ pub struct Config { split_num: String, } +fn mock_from_legacy_type( + connector: &str, + r#type: &str, +) -> Result, SinkError> { + use risingwave_connector::sink::redis::RedisSink; + use risingwave_connector::sink::Sink as _; + if connector.eq(RedisSink::SINK_NAME) { + let format = match r#type { + SINK_TYPE_APPEND_ONLY => SinkFormat::AppendOnly, + SINK_TYPE_UPSERT => SinkFormat::Upsert, + _ => { + return Err(SinkError::Config(risingwave_common::array::error::anyhow!( + "sink type unsupported: {}", + r#type + ))) + } + }; + Ok(Some(SinkFormatDesc { + format, + encode: SinkEncode::Json, + options: Default::default(), + })) + } else { + SinkFormatDesc::from_legacy_type(connector, r#type) + } +} + #[tokio::main] async fn main() { let cfg = Config::parse(); @@ -390,7 +425,7 @@ async fn main() { .clone(); let connector = properties.get("connector").unwrap().clone(); - let format_desc = SinkFormatDesc::mock_from_legacy_type( + let format_desc = mock_from_legacy_type( &connector.clone(), properties.get("type").unwrap_or(&"append-only".to_string()), ) @@ -407,6 +442,7 @@ async fn main() { }; let sink = build_sink(sink_param).unwrap(); let mut sink_writer_param = SinkWriterParam::for_test(); + println!("Start Sink Bench!, Wait {:?}s", BENCH_TIME); sink_writer_param.connector_params.sink_payload_format = SinkPayloadFormat::StreamChunk; tokio::spawn(async move { dispatch_sink!(sink, sink, { @@ -416,9 +452,12 @@ async fn main() { .unwrap(); }); sleep(tokio::time::Duration::from_secs(BENCH_TIME)).await; - println!( - "Throughput Sink: {:?}", - throughput_metric.read().await.get_throughput() - ); + println!("Bench Over!"); + let throughput_result = throughput_metric.read().await.get_throughput(); + if throughput_result.is_empty() { + println!("Throughput Sink: Don't get Throughput, please check"); + } else { + println!("Throughput Sink: {:?}", throughput_result); + } } } diff --git a/src/connector/Cargo.toml b/src/connector/Cargo.toml index a5826514e120e..82303fd620f29 100644 --- a/src/connector/Cargo.toml +++ b/src/connector/Cargo.toml @@ -162,9 +162,6 @@ walkdir = "2" prost-build = "0.12" protobuf-src = "1" -[features] -sink_bench = [] - [[bench]] name = "parser" harness = false diff --git a/src/connector/src/sink/catalog/mod.rs b/src/connector/src/sink/catalog/mod.rs index 86b68a20024f2..be4f94fa0706b 100644 --- a/src/connector/src/sink/catalog/mod.rs +++ b/src/connector/src/sink/catalog/mod.rs @@ -166,31 +166,6 @@ impl SinkFormatDesc { })) } - #[cfg(feature = "sink_bench")] - pub fn mock_from_legacy_type(connector: &str, r#type: &str) -> Result, SinkError> { - use crate::sink::redis::RedisSink; - use crate::sink::Sink as _; - if connector.eq(RedisSink::SINK_NAME) { - let format = match r#type { - SINK_TYPE_APPEND_ONLY => SinkFormat::AppendOnly, - SINK_TYPE_UPSERT => SinkFormat::Upsert, - _ => { - return Err(SinkError::Config(anyhow!( - "sink type unsupported: {}", - r#type - ))) - } - }; - Ok(Some(Self { - format, - encode: SinkEncode::Json, - options: Default::default(), - })) - } else { - Self::from_legacy_type(connector, r#type) - } - } - pub fn to_proto(&self) -> PbSinkFormatDesc { use risingwave_pb::plan_common::{EncodeType as E, FormatType as F}; diff --git a/src/connector/src/sink/mock_coordination_client.rs b/src/connector/src/sink/mock_coordination_client.rs new file mode 100644 index 0000000000000..8089a99e32ec0 --- /dev/null +++ b/src/connector/src/sink/mock_coordination_client.rs @@ -0,0 +1,170 @@ +// Copyright 2024 RisingWave Labs +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use risingwave_common::buffer::Bitmap; +use risingwave_pb::connector_service::coordinate_response::{ + self, CommitResponse, StartCoordinationResponse, +}; +use risingwave_pb::connector_service::{ + coordinate_request, CoordinateRequest, CoordinateResponse, PbSinkParam, +}; +use risingwave_rpc_client::error::RpcError; +use risingwave_rpc_client::{CoordinatorStreamHandle, SinkCoordinationRpcClient}; +use tokio::sync::mpsc::{self, Receiver}; +use tokio_stream::wrappers::ReceiverStream; +use tonic::Status; + +use super::boxed::BoxCoordinator; +use super::{SinkParam, BOUNDED_CHANNEL_SIZE}; + +#[derive(Clone)] +pub enum SinkCoordinationRpcClientEnum { + SinkCoordinationRpcClient(SinkCoordinationRpcClient), + MockSinkCoordinationRpcClient(MockSinkCoordinationRpcClient), +} + +impl SinkCoordinationRpcClientEnum { + pub async fn new_stream_handle( + self, + param: SinkParam, + vnode_bitmap: Bitmap, + ) -> super::Result { + match self { + SinkCoordinationRpcClientEnum::SinkCoordinationRpcClient( + sink_coordination_rpc_client, + ) => Ok(CoordinatorStreamHandle::new( + sink_coordination_rpc_client, + param.to_proto(), + vnode_bitmap, + ) + .await?), + SinkCoordinationRpcClientEnum::MockSinkCoordinationRpcClient( + mock_sink_coordination_rpc_client, + ) => Ok(mock_sink_coordination_rpc_client + .new_stream_handle(param.to_proto(), vnode_bitmap) + .await?), + } + } +} + +#[derive(Clone)] +pub struct MockMetaClient { + mock_coordinator_committer: std::sync::Arc>, +} +impl MockMetaClient { + pub fn new(mock_coordinator_committer: BoxCoordinator) -> Self { + Self { + mock_coordinator_committer: std::sync::Arc::new(tokio::sync::Mutex::new( + mock_coordinator_committer, + )), + } + } + + pub fn sink_coordinate_client(&self) -> MockSinkCoordinationRpcClient { + MockSinkCoordinationRpcClient::new(self.mock_coordinator_committer.clone()) + } +} + +#[derive(Clone)] +pub struct MockSinkCoordinationRpcClient { + mock_coordinator_committer: std::sync::Arc>, +} + +impl MockSinkCoordinationRpcClient { + pub fn new( + mock_coordinator_committer: std::sync::Arc>, + ) -> Self { + Self { + mock_coordinator_committer, + } + } + + pub async fn new_stream_handle( + &self, + param: PbSinkParam, + vnode_bitmap: Bitmap, + ) -> std::result::Result { + CoordinatorStreamHandle::new_with_init_stream(param, vnode_bitmap, |rx| async move { + self.coordinate(rx).await + }) + .await + } + + pub async fn coordinate( + &self, + mut receiver_stream: Receiver, + ) -> std::result::Result< + tonic::Response>>, + Status, + > { + match receiver_stream.try_recv() { + Ok(CoordinateRequest { + msg: + Some(risingwave_pb::connector_service::coordinate_request::Msg::StartRequest( + coordinate_request::StartCoordinationRequest { + param: Some(_param), + vnode_bitmap: Some(_vnode_bitmap), + }, + )), + }) => (), + msg => { + return Err(Status::invalid_argument(format!( + "expected CoordinateRequest::StartRequest in the first request, get {:?}", + msg + ))); + } + }; + + let (response_tx, response_rx) = + mpsc::channel::>(BOUNDED_CHANNEL_SIZE); + let response_tx = std::sync::Arc::new(response_tx); + response_tx + .send(Ok(CoordinateResponse { + msg: Some(coordinate_response::Msg::StartResponse( + StartCoordinationResponse {}, + )), + })) + .await + .map_err(|e| Status::from_error(Box::new(e)))?; + + let mock_coordinator_committer = self.mock_coordinator_committer.clone(); + let response_tx_clone = response_tx.clone(); + tokio::spawn(async move { + loop { + match receiver_stream.recv().await { + Some(CoordinateRequest { + msg: + Some(risingwave_pb::connector_service::coordinate_request::Msg::CommitRequest(coordinate_request::CommitRequest { + epoch, + metadata, + })), + }) => { + mock_coordinator_committer.clone().lock().await.commit(epoch, vec![metadata.unwrap()]).await.map_err(|e| Status::from_error(Box::new(e)))?; + response_tx_clone.clone().send(Ok(CoordinateResponse { + msg: Some(coordinate_response::Msg::CommitResponse(CommitResponse{epoch})), + })).await.map_err(|e| Status::from_error(Box::new(e)))?; + }, + msg => { + return Err::, tonic::Status>(Status::invalid_argument(format!( + "expected CoordinateRequest::CommitRequest , get {:?}", + msg + ))); + } + } + } + }); + + Ok(tonic::Response::new(ReceiverStream::new(response_rx))) + } +} diff --git a/src/connector/src/sink/mod.rs b/src/connector/src/sink/mod.rs index accd4da625280..c58ea97718103 100644 --- a/src/connector/src/sink/mod.rs +++ b/src/connector/src/sink/mod.rs @@ -26,6 +26,7 @@ pub mod iceberg; pub mod kafka; pub mod kinesis; pub mod log_store; +pub mod mock_coordination_client; pub mod nats; pub mod pulsar; pub mod redis; @@ -51,23 +52,14 @@ use risingwave_common::metrics::{ LabelGuardedHistogram, LabelGuardedIntCounter, LabelGuardedIntGauge, }; use risingwave_pb::catalog::PbSinkType; -use risingwave_pb::connector_service::coordinate_response::{ - CommitResponse, StartCoordinationResponse, -}; -use risingwave_pb::connector_service::{ - coordinate_request, coordinate_response, CoordinateRequest, CoordinateResponse, PbSinkParam, - SinkMetadata, TableSchema, -}; +use risingwave_pb::connector_service::{PbSinkParam, SinkMetadata, TableSchema}; use risingwave_rpc_client::error::RpcError; -use risingwave_rpc_client::{CoordinatorStreamHandle, MetaClient, SinkCoordinationRpcClient}; +use risingwave_rpc_client::MetaClient; use thiserror::Error; -use tokio::sync::mpsc::{self, Receiver}; -use tokio_stream::wrappers::ReceiverStream; -use tonic::Status; pub use tracing; -use self::boxed::BoxCoordinator; use self::catalog::{SinkFormatDesc, SinkType}; +use self::mock_coordination_client::{MockMetaClient, SinkCoordinationRpcClientEnum}; use crate::sink::catalog::desc::SinkDesc; use crate::sink::catalog::{SinkCatalog, SinkId}; use crate::sink::log_store::{LogReader, LogStoreReadItem, LogStoreResult, TruncateOffset}; @@ -284,36 +276,6 @@ pub enum MetaClientForSink { MockMetaClient(MockMetaClient), } -#[derive(Clone)] -pub enum SinkCoordinationRpcClientEnum { - SinkCoordinationRpcClient(SinkCoordinationRpcClient), - MockSinkCoordinationRpcClient(MockSinkCoordinationRpcClient), -} - -impl SinkCoordinationRpcClientEnum { - pub async fn new_stream_handle( - self, - param: SinkParam, - vnode_bitmap: Bitmap, - ) -> Result { - match self { - SinkCoordinationRpcClientEnum::SinkCoordinationRpcClient( - sink_coordination_rpc_client, - ) => Ok(CoordinatorStreamHandle::new( - sink_coordination_rpc_client, - param.to_proto(), - vnode_bitmap, - ) - .await?), - SinkCoordinationRpcClientEnum::MockSinkCoordinationRpcClient( - mock_sink_coordination_rpc_client, - ) => Ok(mock_sink_coordination_rpc_client - .new_stream_handle(param.to_proto(), vnode_bitmap) - .await?), - } - } -} - impl MetaClientForSink { pub async fn sink_coordinate_client(&self) -> SinkCoordinationRpcClientEnum { match self { @@ -331,117 +293,6 @@ impl MetaClientForSink { } } -#[derive(Clone)] -pub struct MockMetaClient { - mock_coordinator_committer: std::sync::Arc>, -} -impl MockMetaClient { - pub fn new(mock_coordinator_committer: BoxCoordinator) -> Self { - Self { - mock_coordinator_committer: std::sync::Arc::new(tokio::sync::Mutex::new( - mock_coordinator_committer, - )), - } - } - - pub fn sink_coordinate_client(&self) -> MockSinkCoordinationRpcClient { - MockSinkCoordinationRpcClient::new(self.mock_coordinator_committer.clone()) - } -} - -#[derive(Clone)] -pub struct MockSinkCoordinationRpcClient { - mock_coordinator_committer: std::sync::Arc>, -} - -impl MockSinkCoordinationRpcClient { - pub fn new( - mock_coordinator_committer: std::sync::Arc>, - ) -> Self { - Self { - mock_coordinator_committer, - } - } - - pub async fn new_stream_handle( - &self, - param: PbSinkParam, - vnode_bitmap: Bitmap, - ) -> std::result::Result { - CoordinatorStreamHandle::new_with_init_stream(param, vnode_bitmap, |rx| async move { - self.coordinate(rx).await - }) - .await - } - - pub async fn coordinate( - &self, - mut receiver_stream: Receiver, - ) -> std::result::Result< - tonic::Response>>, - Status, - > { - match receiver_stream.try_recv() { - Ok(CoordinateRequest { - msg: - Some(risingwave_pb::connector_service::coordinate_request::Msg::StartRequest( - coordinate_request::StartCoordinationRequest { - param: Some(_param), - vnode_bitmap: Some(_vnode_bitmap), - }, - )), - }) => (), - msg => { - return Err(Status::invalid_argument(format!( - "expected CoordinateRequest::StartRequest in the first request, get {:?}", - msg - ))); - } - }; - - let (response_tx, response_rx) = - mpsc::channel::>(BOUNDED_CHANNEL_SIZE); - let response_tx = std::sync::Arc::new(response_tx); - response_tx - .send(Ok(CoordinateResponse { - msg: Some(coordinate_response::Msg::StartResponse( - StartCoordinationResponse {}, - )), - })) - .await - .map_err(|e| Status::from_error(Box::new(e)))?; - - let mock_coordinator_committer = self.mock_coordinator_committer.clone(); - let response_tx_clone = response_tx.clone(); - tokio::spawn(async move { - loop { - match receiver_stream.recv().await { - Some(CoordinateRequest { - msg: - Some(risingwave_pb::connector_service::coordinate_request::Msg::CommitRequest(coordinate_request::CommitRequest { - epoch, - metadata, - })), - }) => { - mock_coordinator_committer.clone().lock().await.commit(epoch, vec![metadata.unwrap()]).await.map_err(|e| Status::from_error(Box::new(e)))?; - response_tx_clone.clone().send(Ok(CoordinateResponse { - msg: Some(coordinate_response::Msg::CommitResponse(CommitResponse{epoch})), - })).await.map_err(|e| Status::from_error(Box::new(e)))?; - }, - msg => { - return Err::, tonic::Status>(Status::invalid_argument(format!( - "expected CoordinateRequest::CommitRequest , get {:?}", - msg - ))); - } - } - } - }); - - Ok(tonic::Response::new(ReceiverStream::new(response_rx))) - } -} - impl SinkWriterParam { pub fn for_test() -> Self { SinkWriterParam {