diff --git a/src/bench/sink_bench/main.rs b/src/bench/sink_bench/main.rs
index cb1774feed752..e4996a1d88165 100644
--- a/src/bench/sink_bench/main.rs
+++ b/src/bench/sink_bench/main.rs
@@ -18,11 +18,12 @@
 
 use core::str::FromStr;
 use core::sync::atomic::{AtomicU64, Ordering};
+use core::time::Duration;
 use std::collections::HashMap;
 
 use clap::Parser;
 use futures::prelude::future::Either;
-use futures::prelude::stream::{PollNext, BoxStream};
+use futures::prelude::stream::{BoxStream, PollNext};
 use futures::stream::select_with_strategy;
 use futures::{FutureExt, StreamExt, TryStreamExt};
 use futures_async_stream::try_stream;
@@ -41,7 +42,7 @@ use risingwave_connector::sink::log_store::{
 };
 use risingwave_connector::sink::mock_coordination_client::MockMetaClient;
 use risingwave_connector::sink::{
-    build_sink, LogSinker, SinkMetaClient, Sink, SinkError, SinkParam, SinkWriterParam,
+    build_sink, LogSinker, Sink, SinkError, SinkMetaClient, SinkParam, SinkWriterParam,
     SINK_TYPE_APPEND_ONLY, SINK_TYPE_UPSERT,
 };
 use risingwave_connector::source::datagen::{
@@ -50,11 +51,10 @@ use risingwave_connector::source::datagen::{
 use risingwave_connector::source::{Column, DataType, SplitEnumerator, SplitReader};
 use risingwave_pb::connector_service::SinkPayloadFormat;
 use risingwave_stream::executor::test_utils::prelude::ColumnDesc;
-use risingwave_stream::executor::{Barrier, Message, StreamExecutorError, MessageStreamItem};
+use risingwave_stream::executor::{Barrier, Message, MessageStreamItem, StreamExecutorError};
 use serde::{Deserialize, Deserializer};
 use tokio::sync::RwLock;
-use tokio::sync::oneshot::{Receiver, Sender};
-use tokio::time::{sleep, Instant};
+use tokio::time::{interval, sleep, Instant};
 
 const CHECKPOINT_INTERVAL: u64 = 1000;
 const THROUGHPUT_METRIC_RECORD_INTERVAL: u128 = 500;
@@ -65,21 +65,16 @@ pub struct MockRangeLogReader {
     upstreams: BoxStream<'static, MessageStreamItem>,
     current_epoch: u64,
     chunk_id: usize,
-    throughput_metric: ThroughputMetric,
-    stop_rx: Receiver<()>,
-    result_tx: Option<Sender<Vec<String>>>,
+    result_tx: tokio::sync::mpsc::Sender<usize>,
 }
 
 impl LogReader for MockRangeLogReader {
     async fn init(&mut self) -> LogStoreResult<()> {
-        self.throughput_metric.add_metric(0);
+        self.result_tx.send(0).await.unwrap();
         Ok(())
     }
 
     async fn next_item(&mut self) -> LogStoreResult<(u64, LogStoreReadItem)> {
-        if let Ok(()) = self.stop_rx.try_recv(){
-            self.result_tx.take().unwrap().send(self.throughput_metric.get_throughput()).unwrap();
-        }
         match self.upstreams.next().await.unwrap().unwrap() {
             Message::Barrier(barrier) => {
                 let prev_epoch = self.current_epoch;
@@ -92,8 +87,7 @@ impl LogReader for MockRangeLogReader {
                 ))
             }
             Message::Chunk(chunk) => {
-                self.throughput_metric
-                    .add_metric(chunk.capacity());
+                self.result_tx.send(chunk.capacity()).await.unwrap();
                 self.chunk_id += 1;
                 Ok((
                     self.current_epoch,
@@ -119,17 +113,13 @@ impl MockRangeLogReader {
     fn new(
         mock_source: MockDatagenSource,
-        throughput_metric: ThroughputMetric,
-        stop_rx: Receiver<()>,
-        result_tx: Sender<Vec<String>>,
+        result_tx: tokio::sync::mpsc::Sender<usize>,
     ) -> MockRangeLogReader {
         MockRangeLogReader {
             upstreams: mock_source.into_stream().boxed(),
             current_epoch: 0,
             chunk_id: 0,
-            throughput_metric,
-            stop_rx,
-            result_tx: Some(result_tx),
+            result_tx,
         }
     }
 }
@@ -286,9 +276,9 @@ where
     <S as Sink>::Coordinator: 'static,
 {
     if let Ok(coordinator) = sink.new_coordinator().await {
-        sink_writer_param.meta_client = Some(SinkMetaClient::MockMetaClient(
-            MockMetaClient::new(Box::new(coordinator)),
-        ));
+        sink_writer_param.meta_client = Some(SinkMetaClient::MockMetaClient(MockMetaClient::new(
+            Box::new(coordinator),
+        )));
         sink_writer_param.vnode_bitmap = Some(Bitmap::ones(1));
     }
     let log_sinker = sink.new_log_sinker(sink_writer_param).await.unwrap();
@@ -400,14 +390,6 @@ fn mock_from_legacy_type(
     }
 }
 
-fn print_throughput_result(throughput_result: Vec<String>){
-    if throughput_result.is_empty() {
-        println!("Throughput Sink: Don't get Throughput, please check");
-    } else {
-        println!("Throughput Sink: {:?}", throughput_result);
-    }
-}
-
 #[tokio::main]
 async fn main() {
     let cfg = Config::parse();
@@ -418,24 +400,43 @@ async fn main() {
         cfg.split_num,
     )
     .await;
-    let (data_size_tx,data_size_rx) = tokio::sync::oneshot::channel::<Vec<String>>();
-    let (stop_tx,stop_rx) = tokio::sync::oneshot::channel::<()>();
-    let throughput_metric = ThroughputMetric::new();
+    let (data_size_tx, mut data_size_rx) = tokio::sync::mpsc::channel::<usize>(100);
+    let mut throughput_metric = ThroughputMetric::new();
 
-    let mut mock_range_log_reader =
-        MockRangeLogReader::new(mock_datagen_source, throughput_metric,stop_rx,data_size_tx);
-    if cfg.sink.eq(&BENCH_TEST.to_string()) {
+    let mut mock_range_log_reader = MockRangeLogReader::new(mock_datagen_source, data_size_tx);
+    let join_handle = async move {
         println!("Start Sink Bench!, Wait {:?}s", BENCH_TIME);
+        let mut interval = interval(Duration::from_secs(BENCH_TIME));
+        interval.tick().await;
+        loop {
+            tokio::select! {
+                data_size = data_size_rx.recv() => {
+                    throughput_metric.add_metric(data_size.unwrap());
+                }
+                _ = interval.tick() => {
+                    data_size_rx.close();
+                    while let Some(data_size) = data_size_rx.recv().await {
+                        throughput_metric.add_metric(data_size);
+                    }
+                    println!("Bench Over!");
+                    break;
+                }
+            }
+        }
+        let throughput_result = throughput_metric.get_throughput();
+        if throughput_result.is_empty() {
+            println!("Throughput Sink: Don't get Throughput, please check");
+        } else {
+            println!("Throughput Sink: {:?}", throughput_result);
+        }
+    };
+    if cfg.sink.eq(&BENCH_TEST.to_string()) {
         tokio::spawn(async move {
             mock_range_log_reader.init().await.unwrap();
             loop {
                 mock_range_log_reader.next_item().await.unwrap();
             }
         });
-        sleep(tokio::time::Duration::from_secs(BENCH_TIME)).await;
-        println!("Bench Over!");
-        stop_tx.send(()).unwrap();
-        print_throughput_result(data_size_rx.await.unwrap())
     } else {
         let properties = read_sink_option_from_yml(&cfg.option_path)
             .get(&cfg.sink)
@@ -460,7 +461,6 @@ async fn main() {
         };
         let sink = build_sink(sink_param).unwrap();
         let mut sink_writer_param = SinkWriterParam::for_test();
-        println!("Start Sink Bench!, Wait {:?}s", BENCH_TIME);
         sink_writer_param.connector_params.sink_payload_format = SinkPayloadFormat::StreamChunk;
         tokio::spawn(async move {
             dispatch_sink!(sink, sink, {
@@ -469,9 +469,6 @@ async fn main() {
                 .await
                 .unwrap();
         });
-        sleep(tokio::time::Duration::from_secs(BENCH_TIME)).await;
-        println!("Bench Over!");
-        stop_tx.send(()).unwrap();
-        print_throughput_result(data_size_rx.await.unwrap());
     }
+    join_handle.await;
 }
diff --git a/src/connector/src/sink/coordinate.rs b/src/connector/src/sink/coordinate.rs
index 0519cb247e8d9..f2bc4f4b3b362 100644
--- a/src/connector/src/sink/coordinate.rs
+++ b/src/connector/src/sink/coordinate.rs
@@ -64,7 +64,8 @@ impl<W: SinkWriter<CommitMetadata = Option<SinkMetadata>>> SinkWriter for Coordi
                 SinkError::Coordinator(anyhow!("should get metadata on checkpoint barrier"))
             })?;
             // TODO: add metrics to measure time to commit
-            self.coordinator_stream_handle.commit(self.epoch, metadata)
+            self.coordinator_stream_handle
+                .commit(self.epoch, metadata)
                 .await?;
             Ok(())
         } else {
diff --git a/src/stream/src/from_proto/sink.rs b/src/stream/src/from_proto/sink.rs
index 711c7eef43b22..a44a2cdd3f0c8 100644
--- a/src/stream/src/from_proto/sink.rs
+++ b/src/stream/src/from_proto/sink.rs
@@ -19,7 +19,7 @@ use risingwave_common::catalog::ColumnCatalog;
 use risingwave_connector::match_sink_name_str;
 use risingwave_connector::sink::catalog::{SinkFormatDesc, SinkType};
 use risingwave_connector::sink::{
-    SinkMetaClient, SinkError, SinkParam, SinkWriterParam, CONNECTOR_TYPE_KEY, SINK_TYPE_OPTION,
+    SinkError, SinkMetaClient, SinkParam, SinkWriterParam, CONNECTOR_TYPE_KEY, SINK_TYPE_OPTION,
 };
 use risingwave_pb::stream_plan::{SinkLogStoreType, SinkNode};
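The main.rs change above replaces the oneshot stop/result channels with a bounded tokio::sync::mpsc channel plus a timer-gated tokio::select! loop: the mock log reader sends each chunk's row count as it is consumed, and a single aggregation task collects those counts until a tokio::time::interval of BENCH_TIME seconds fires, then closes and drains the channel before printing the throughput. The standalone sketch below illustrates that collection pattern only; BENCH_SECS, the synthetic producer, and the plain row counter are illustrative stand-ins rather than RisingWave code, and it assumes a tokio runtime with the macros, sync, and time features enabled.

// Minimal sketch of the bounded-channel + interval collection pattern
// (assumed names; not the benchmark code itself).
use std::time::Duration;

use tokio::sync::mpsc;
use tokio::time::interval;

const BENCH_SECS: u64 = 20; // illustrative stand-in for BENCH_TIME

#[tokio::main]
async fn main() {
    // Bounded channel: a fast producer back-pressures instead of queueing unboundedly.
    let (tx, mut rx) = mpsc::channel::<usize>(100);

    // Producer: stands in for MockRangeLogReader sending chunk.capacity() per chunk.
    tokio::spawn(async move {
        loop {
            if tx.send(1024).await.is_err() {
                break; // receiver closed: the measurement window is over
            }
        }
    });

    // Consumer: aggregate until the timer fires, then close and drain the channel.
    let mut total_rows = 0usize;
    let mut ticker = interval(Duration::from_secs(BENCH_SECS));
    ticker.tick().await; // the first tick completes immediately; the next fires after BENCH_SECS
    loop {
        tokio::select! {
            received = rx.recv() => {
                if let Some(rows) = received {
                    total_rows += rows;
                }
            }
            _ = ticker.tick() => {
                rx.close(); // reject further sends, but keep values already buffered
                while let Some(rows) = rx.recv().await {
                    total_rows += rows; // drain what is still in the channel
                }
                break;
            }
        }
    }
    println!("rows counted in {}s: {}", BENCH_SECS, total_rows);
}

Closing the receiver before draining mirrors the data_size_rx.close() plus while-let loop in the patch: counts already buffered when the timer fires are still added to the total, while a producer that is still running gets a send error and stops instead of blocking forever.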