diff --git a/src/bench/sink_bench/main.rs b/src/bench/sink_bench/main.rs index f19f6428c3cbf..cb1774feed752 100644 --- a/src/bench/sink_bench/main.rs +++ b/src/bench/sink_bench/main.rs @@ -16,16 +16,15 @@ #![feature(stmt_expr_attributes)] #![feature(let_chains)] -use core::pin::Pin; use core::str::FromStr; use core::sync::atomic::{AtomicU64, Ordering}; use std::collections::HashMap; use clap::Parser; use futures::prelude::future::Either; -use futures::prelude::stream::PollNext; +use futures::prelude::stream::{PollNext, BoxStream}; use futures::stream::select_with_strategy; -use futures::{FutureExt, Stream, StreamExt, TryStreamExt}; +use futures::{FutureExt, StreamExt, TryStreamExt}; use futures_async_stream::try_stream; use risingwave_common::buffer::Bitmap; use risingwave_common::catalog::ColumnId; @@ -42,7 +41,7 @@ use risingwave_connector::sink::log_store::{ }; use risingwave_connector::sink::mock_coordination_client::MockMetaClient; use risingwave_connector::sink::{ - build_sink, LogSinker, MetaClientForSink, Sink, SinkError, SinkParam, SinkWriterParam, + build_sink, LogSinker, SinkMetaClient, Sink, SinkError, SinkParam, SinkWriterParam, SINK_TYPE_APPEND_ONLY, SINK_TYPE_UPSERT, }; use risingwave_connector::source::datagen::{ @@ -51,9 +50,10 @@ use risingwave_connector::source::datagen::{ use risingwave_connector::source::{Column, DataType, SplitEnumerator, SplitReader}; use risingwave_pb::connector_service::SinkPayloadFormat; use risingwave_stream::executor::test_utils::prelude::ColumnDesc; -use risingwave_stream::executor::{Barrier, Message, StreamExecutorError}; +use risingwave_stream::executor::{Barrier, Message, StreamExecutorError, MessageStreamItem}; use serde::{Deserialize, Deserializer}; use tokio::sync::RwLock; +use tokio::sync::oneshot::{Receiver, Sender}; use tokio::time::{sleep, Instant}; const CHECKPOINT_INTERVAL: u64 = 1000; @@ -62,20 +62,25 @@ const BENCH_TIME: u64 = 20; const BENCH_TEST: &str = "bench_test"; pub struct MockRangeLogReader { - up_stream: Pin> + Send>>, + upstreams: BoxStream<'static, MessageStreamItem>, current_epoch: u64, chunk_id: usize, - throughput_metric: std::sync::Arc>, + throughput_metric: ThroughputMetric, + stop_rx: Receiver<()>, + result_tx: Option>>, } impl LogReader for MockRangeLogReader { async fn init(&mut self) -> LogStoreResult<()> { - self.throughput_metric.write().await.add_metric(0); + self.throughput_metric.add_metric(0); Ok(()) } async fn next_item(&mut self) -> LogStoreResult<(u64, LogStoreReadItem)> { - match self.up_stream.next().await.unwrap().unwrap() { + if let Ok(()) = self.stop_rx.try_recv(){ + self.result_tx.take().unwrap().send(self.throughput_metric.get_throughput()).unwrap(); + } + match self.upstreams.next().await.unwrap().unwrap() { Message::Barrier(barrier) => { let prev_epoch = self.current_epoch; self.current_epoch = barrier.epoch.curr; @@ -88,8 +93,6 @@ impl LogReader for MockRangeLogReader { } Message::Chunk(chunk) => { self.throughput_metric - .write() - .await .add_metric(chunk.capacity()); self.chunk_id += 1; Ok(( @@ -116,13 +119,17 @@ impl LogReader for MockRangeLogReader { impl MockRangeLogReader { fn new( mock_source: MockDatagenSource, - throughput_metric: std::sync::Arc>, + throughput_metric: ThroughputMetric, + stop_rx: Receiver<()>, + result_tx: Sender>, ) -> MockRangeLogReader { MockRangeLogReader { - up_stream: mock_source.into_stream().boxed(), + upstreams: mock_source.into_stream().boxed(), current_epoch: 0, chunk_id: 0, throughput_metric, + stop_rx, + result_tx: Some(result_tx), } } } @@ -279,7 +286,7 @@ where ::Coordinator: 'static, { if let Ok(coordinator) = sink.new_coordinator().await { - sink_writer_param.meta_client = Some(MetaClientForSink::MockMetaClient( + sink_writer_param.meta_client = Some(SinkMetaClient::MockMetaClient( MockMetaClient::new(Box::new(coordinator)), )); sink_writer_param.vnode_bitmap = Some(Bitmap::ones(1)); @@ -393,6 +400,14 @@ fn mock_from_legacy_type( } } +fn print_throughput_result(throughput_result: Vec){ + if throughput_result.is_empty() { + println!("Throughput Sink: Don't get Throughput, please check"); + } else { + println!("Throughput Sink: {:?}", throughput_result); + } +} + #[tokio::main] async fn main() { let cfg = Config::parse(); @@ -403,10 +418,14 @@ async fn main() { cfg.split_num, ) .await; - let throughput_metric = std::sync::Arc::new(RwLock::new(ThroughputMetric::new())); + let (data_size_tx,data_size_rx) = tokio::sync::oneshot::channel::>(); + let (stop_tx,stop_rx) = tokio::sync::oneshot::channel::<()>(); + let throughput_metric = ThroughputMetric::new(); + let mut mock_range_log_reader = - MockRangeLogReader::new(mock_datagen_source, throughput_metric.clone()); + MockRangeLogReader::new(mock_datagen_source, throughput_metric,stop_rx,data_size_tx); if cfg.sink.eq(&BENCH_TEST.to_string()) { + println!("Start Sink Bench!, Wait {:?}s", BENCH_TIME); tokio::spawn(async move { mock_range_log_reader.init().await.unwrap(); loop { @@ -414,10 +433,9 @@ async fn main() { } }); sleep(tokio::time::Duration::from_secs(BENCH_TIME)).await; - println!( - "Throughput Test: {:?}", - throughput_metric.read().await.get_throughput() - ); + println!("Bench Over!"); + stop_tx.send(()).unwrap(); + print_throughput_result(data_size_rx.await.unwrap()) } else { let properties = read_sink_option_from_yml(&cfg.option_path) .get(&cfg.sink) @@ -453,11 +471,7 @@ async fn main() { }); sleep(tokio::time::Duration::from_secs(BENCH_TIME)).await; println!("Bench Over!"); - let throughput_result = throughput_metric.read().await.get_throughput(); - if throughput_result.is_empty() { - println!("Throughput Sink: Don't get Throughput, please check"); - } else { - println!("Throughput Sink: {:?}", throughput_result); - } + stop_tx.send(()).unwrap(); + print_throughput_result(data_size_rx.await.unwrap()); } } diff --git a/src/connector/src/sink/coordinate.rs b/src/connector/src/sink/coordinate.rs index 9a1f52154d06d..0519cb247e8d9 100644 --- a/src/connector/src/sink/coordinate.rs +++ b/src/connector/src/sink/coordinate.rs @@ -27,7 +27,7 @@ use crate::sink::{Result, SinkError, SinkParam}; pub struct CoordinatedSinkWriter>> { epoch: u64, - coordinator_stream_handle: Option, + coordinator_stream_handle: CoordinatorStreamHandle, inner: W, } @@ -40,24 +40,12 @@ impl>> CoordinatedSinkWriter ) -> Result { Ok(Self { epoch: 0, - coordinator_stream_handle: Some(client.new_stream_handle(param, vnode_bitmap).await?), + coordinator_stream_handle: client.new_stream_handle(param, vnode_bitmap).await?, inner, }) } } -impl>> CoordinatedSinkWriter { - async fn commit_metadate(&mut self, metadata: SinkMetadata) -> Result<()> { - if let Some(coordinator_stream_handle) = self.coordinator_stream_handle.as_mut() { - coordinator_stream_handle - .commit(self.epoch, metadata) - .await?; - return Ok(()); - } - Err(SinkError::Coordinator(anyhow!( - "coordinator_stream_handle is None" - ))) - } -} + #[async_trait::async_trait] impl>> SinkWriter for CoordinatedSinkWriter { async fn begin_epoch(&mut self, epoch: u64) -> Result<()> { @@ -76,7 +64,8 @@ impl>> SinkWriter for Coordi SinkError::Coordinator(anyhow!("should get metadata on checkpoint barrier")) })?; // TODO: add metrics to measure time to commit - self.commit_metadate(metadata).await?; + self.coordinator_stream_handle.commit(self.epoch, metadata) + .await?; Ok(()) } else { if metadata.is_some() { diff --git a/src/connector/src/sink/mod.rs b/src/connector/src/sink/mod.rs index c58ea97718103..81fb920a80ab8 100644 --- a/src/connector/src/sink/mod.rs +++ b/src/connector/src/sink/mod.rs @@ -266,25 +266,25 @@ pub struct SinkWriterParam { pub connector_params: ConnectorParams, pub executor_id: u64, pub vnode_bitmap: Option, - pub meta_client: Option, + pub meta_client: Option, pub sink_metrics: SinkMetrics, } #[derive(Clone)] -pub enum MetaClientForSink { +pub enum SinkMetaClient { MetaClient(MetaClient), MockMetaClient(MockMetaClient), } -impl MetaClientForSink { +impl SinkMetaClient { pub async fn sink_coordinate_client(&self) -> SinkCoordinationRpcClientEnum { match self { - MetaClientForSink::MetaClient(meta_client) => { + SinkMetaClient::MetaClient(meta_client) => { SinkCoordinationRpcClientEnum::SinkCoordinationRpcClient( meta_client.sink_coordinate_client().await, ) } - MetaClientForSink::MockMetaClient(mock_meta_client) => { + SinkMetaClient::MockMetaClient(mock_meta_client) => { SinkCoordinationRpcClientEnum::MockSinkCoordinationRpcClient( mock_meta_client.sink_coordinate_client(), ) diff --git a/src/stream/src/from_proto/sink.rs b/src/stream/src/from_proto/sink.rs index 0c35cbd255261..711c7eef43b22 100644 --- a/src/stream/src/from_proto/sink.rs +++ b/src/stream/src/from_proto/sink.rs @@ -19,7 +19,7 @@ use risingwave_common::catalog::ColumnCatalog; use risingwave_connector::match_sink_name_str; use risingwave_connector::sink::catalog::{SinkFormatDesc, SinkType}; use risingwave_connector::sink::{ - MetaClientForSink, SinkError, SinkParam, SinkWriterParam, CONNECTOR_TYPE_KEY, SINK_TYPE_OPTION, + SinkMetaClient, SinkError, SinkParam, SinkWriterParam, CONNECTOR_TYPE_KEY, SINK_TYPE_OPTION, }; use risingwave_pb::stream_plan::{SinkLogStoreType, SinkNode}; @@ -114,7 +114,7 @@ impl ExecutorBuilder for SinkExecutorBuilder { connector_params: params.env.connector_params(), executor_id: params.executor_id, vnode_bitmap: params.vnode_bitmap.clone(), - meta_client: params.env.meta_client().map(MetaClientForSink::MetaClient), + meta_client: params.env.meta_client().map(SinkMetaClient::MetaClient), sink_metrics, };