Skip to content

Commit

Permalink
use channal instead of lock
Browse files Browse the repository at this point in the history
  • Loading branch information
xxhZs committed Jan 11, 2024
1 parent 2c5a9a5 commit 0da3c67
Show file tree
Hide file tree
Showing 3 changed files with 46 additions and 48 deletions.
89 changes: 43 additions & 46 deletions src/bench/sink_bench/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,11 +18,12 @@

use core::str::FromStr;
use core::sync::atomic::{AtomicU64, Ordering};
use core::time::Duration;
use std::collections::HashMap;

use clap::Parser;
use futures::prelude::future::Either;
use futures::prelude::stream::{PollNext, BoxStream};
use futures::prelude::stream::{BoxStream, PollNext};
use futures::stream::select_with_strategy;
use futures::{FutureExt, StreamExt, TryStreamExt};
use futures_async_stream::try_stream;
Expand All @@ -41,7 +42,7 @@ use risingwave_connector::sink::log_store::{
};
use risingwave_connector::sink::mock_coordination_client::MockMetaClient;
use risingwave_connector::sink::{
build_sink, LogSinker, SinkMetaClient, Sink, SinkError, SinkParam, SinkWriterParam,
build_sink, LogSinker, Sink, SinkError, SinkMetaClient, SinkParam, SinkWriterParam,
SINK_TYPE_APPEND_ONLY, SINK_TYPE_UPSERT,
};
use risingwave_connector::source::datagen::{
Expand All @@ -50,11 +51,10 @@ use risingwave_connector::source::datagen::{
use risingwave_connector::source::{Column, DataType, SplitEnumerator, SplitReader};
use risingwave_pb::connector_service::SinkPayloadFormat;
use risingwave_stream::executor::test_utils::prelude::ColumnDesc;
use risingwave_stream::executor::{Barrier, Message, StreamExecutorError, MessageStreamItem};
use risingwave_stream::executor::{Barrier, Message, MessageStreamItem, StreamExecutorError};
use serde::{Deserialize, Deserializer};
use tokio::sync::RwLock;
use tokio::sync::oneshot::{Receiver, Sender};
use tokio::time::{sleep, Instant};
use tokio::time::{interval, sleep, Instant};

const CHECKPOINT_INTERVAL: u64 = 1000;
const THROUGHPUT_METRIC_RECORD_INTERVAL: u128 = 500;
Expand All @@ -65,21 +65,16 @@ pub struct MockRangeLogReader {
upstreams: BoxStream<'static, MessageStreamItem>,
current_epoch: u64,
chunk_id: usize,
throughput_metric: ThroughputMetric,
stop_rx: Receiver<()>,
result_tx: Option<Sender<Vec<String>>>,
result_tx: tokio::sync::mpsc::Sender<usize>,
}

impl LogReader for MockRangeLogReader {
async fn init(&mut self) -> LogStoreResult<()> {
self.throughput_metric.add_metric(0);
self.result_tx.send(0).await.unwrap();
Ok(())
}

async fn next_item(&mut self) -> LogStoreResult<(u64, LogStoreReadItem)> {
if let Ok(()) = self.stop_rx.try_recv(){
self.result_tx.take().unwrap().send(self.throughput_metric.get_throughput()).unwrap();
}
match self.upstreams.next().await.unwrap().unwrap() {
Message::Barrier(barrier) => {
let prev_epoch = self.current_epoch;
Expand All @@ -92,8 +87,7 @@ impl LogReader for MockRangeLogReader {
))
}
Message::Chunk(chunk) => {
self.throughput_metric
.add_metric(chunk.capacity());
self.result_tx.send(chunk.capacity()).await.unwrap();
self.chunk_id += 1;
Ok((
self.current_epoch,
Expand All @@ -119,17 +113,13 @@ impl LogReader for MockRangeLogReader {
impl MockRangeLogReader {
fn new(
mock_source: MockDatagenSource,
throughput_metric: ThroughputMetric,
stop_rx: Receiver<()>,
result_tx: Sender<Vec<String>>,
result_tx: tokio::sync::mpsc::Sender<usize>,
) -> MockRangeLogReader {
MockRangeLogReader {
upstreams: mock_source.into_stream().boxed(),
current_epoch: 0,
chunk_id: 0,
throughput_metric,
stop_rx,
result_tx: Some(result_tx),
result_tx,
}
}
}
Expand Down Expand Up @@ -286,9 +276,9 @@ where
<S as risingwave_connector::sink::Sink>::Coordinator: 'static,
{
if let Ok(coordinator) = sink.new_coordinator().await {
sink_writer_param.meta_client = Some(SinkMetaClient::MockMetaClient(
MockMetaClient::new(Box::new(coordinator)),
));
sink_writer_param.meta_client = Some(SinkMetaClient::MockMetaClient(MockMetaClient::new(
Box::new(coordinator),
)));
sink_writer_param.vnode_bitmap = Some(Bitmap::ones(1));
}
let log_sinker = sink.new_log_sinker(sink_writer_param).await.unwrap();
Expand Down Expand Up @@ -400,14 +390,6 @@ fn mock_from_legacy_type(
}
}

fn print_throughput_result(throughput_result: Vec<String>){
if throughput_result.is_empty() {
println!("Throughput Sink: Don't get Throughput, please check");
} else {
println!("Throughput Sink: {:?}", throughput_result);
}
}

#[tokio::main]
async fn main() {
let cfg = Config::parse();
Expand All @@ -418,24 +400,43 @@ async fn main() {
cfg.split_num,
)
.await;
let (data_size_tx,data_size_rx) = tokio::sync::oneshot::channel::<Vec<String>>();
let (stop_tx,stop_rx) = tokio::sync::oneshot::channel::<()>();
let throughput_metric = ThroughputMetric::new();
let (data_size_tx, mut data_size_rx) = tokio::sync::mpsc::channel::<usize>(100);
let mut throughput_metric = ThroughputMetric::new();

let mut mock_range_log_reader =
MockRangeLogReader::new(mock_datagen_source, throughput_metric,stop_rx,data_size_tx);
if cfg.sink.eq(&BENCH_TEST.to_string()) {
let mut mock_range_log_reader = MockRangeLogReader::new(mock_datagen_source, data_size_tx);
let join_handle = async move {
println!("Start Sink Bench!, Wait {:?}s", BENCH_TIME);
let mut interval = interval(Duration::from_secs(BENCH_TIME));
interval.tick().await;
loop {
tokio::select! {
data_size = data_size_rx.recv() => {
throughput_metric.add_metric(data_size.unwrap());
}
_ = interval.tick() => {
data_size_rx.close();
while let Some(data_size) = data_size_rx.recv().await {
throughput_metric.add_metric(data_size);
}
println!("Bench Over!");
break;
}
}
}
let throughput_result = throughput_metric.get_throughput();
if throughput_result.is_empty() {
println!("Throughput Sink: Don't get Throughput, please check");
} else {
println!("Throughput Sink: {:?}", throughput_result);
}
};
if cfg.sink.eq(&BENCH_TEST.to_string()) {
tokio::spawn(async move {
mock_range_log_reader.init().await.unwrap();
loop {
mock_range_log_reader.next_item().await.unwrap();
}
});
sleep(tokio::time::Duration::from_secs(BENCH_TIME)).await;
println!("Bench Over!");
stop_tx.send(()).unwrap();
print_throughput_result(data_size_rx.await.unwrap())
} else {
let properties = read_sink_option_from_yml(&cfg.option_path)
.get(&cfg.sink)
Expand All @@ -460,7 +461,6 @@ async fn main() {
};
let sink = build_sink(sink_param).unwrap();
let mut sink_writer_param = SinkWriterParam::for_test();
println!("Start Sink Bench!, Wait {:?}s", BENCH_TIME);
sink_writer_param.connector_params.sink_payload_format = SinkPayloadFormat::StreamChunk;
tokio::spawn(async move {
dispatch_sink!(sink, sink, {
Expand All @@ -469,9 +469,6 @@ async fn main() {
.await
.unwrap();
});
sleep(tokio::time::Duration::from_secs(BENCH_TIME)).await;
println!("Bench Over!");
stop_tx.send(()).unwrap();
print_throughput_result(data_size_rx.await.unwrap());
}
join_handle.await;
}
3 changes: 2 additions & 1 deletion src/connector/src/sink/coordinate.rs
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,8 @@ impl<W: SinkWriter<CommitMetadata = Option<SinkMetadata>>> SinkWriter for Coordi
SinkError::Coordinator(anyhow!("should get metadata on checkpoint barrier"))
})?;
// TODO: add metrics to measure time to commit
self.coordinator_stream_handle.commit(self.epoch, metadata)
self.coordinator_stream_handle
.commit(self.epoch, metadata)
.await?;
Ok(())
} else {
Expand Down
2 changes: 1 addition & 1 deletion src/stream/src/from_proto/sink.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ use risingwave_common::catalog::ColumnCatalog;
use risingwave_connector::match_sink_name_str;
use risingwave_connector::sink::catalog::{SinkFormatDesc, SinkType};
use risingwave_connector::sink::{
SinkMetaClient, SinkError, SinkParam, SinkWriterParam, CONNECTOR_TYPE_KEY, SINK_TYPE_OPTION,
SinkError, SinkMetaClient, SinkParam, SinkWriterParam, CONNECTOR_TYPE_KEY, SINK_TYPE_OPTION,
};
use risingwave_pb::stream_plan::{SinkLogStoreType, SinkNode};

Expand Down

0 comments on commit 0da3c67

Please sign in to comment.