Skip to content

Commit

Permalink
save
Browse files Browse the repository at this point in the history
  • Loading branch information
xxhZs committed Jan 11, 2024
1 parent 7ae1611 commit 2c5a9a5
Show file tree
Hide file tree
Showing 4 changed files with 52 additions and 49 deletions.
66 changes: 40 additions & 26 deletions src/bench/sink_bench/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,16 +16,15 @@
#![feature(stmt_expr_attributes)]
#![feature(let_chains)]

use core::pin::Pin;
use core::str::FromStr;
use core::sync::atomic::{AtomicU64, Ordering};
use std::collections::HashMap;

use clap::Parser;
use futures::prelude::future::Either;
use futures::prelude::stream::PollNext;
use futures::prelude::stream::{PollNext, BoxStream};
use futures::stream::select_with_strategy;
use futures::{FutureExt, Stream, StreamExt, TryStreamExt};
use futures::{FutureExt, StreamExt, TryStreamExt};
use futures_async_stream::try_stream;
use risingwave_common::buffer::Bitmap;
use risingwave_common::catalog::ColumnId;
Expand All @@ -42,7 +41,7 @@ use risingwave_connector::sink::log_store::{
};
use risingwave_connector::sink::mock_coordination_client::MockMetaClient;
use risingwave_connector::sink::{
build_sink, LogSinker, MetaClientForSink, Sink, SinkError, SinkParam, SinkWriterParam,
build_sink, LogSinker, SinkMetaClient, Sink, SinkError, SinkParam, SinkWriterParam,
SINK_TYPE_APPEND_ONLY, SINK_TYPE_UPSERT,
};
use risingwave_connector::source::datagen::{
Expand All @@ -51,9 +50,10 @@ use risingwave_connector::source::datagen::{
use risingwave_connector::source::{Column, DataType, SplitEnumerator, SplitReader};
use risingwave_pb::connector_service::SinkPayloadFormat;
use risingwave_stream::executor::test_utils::prelude::ColumnDesc;
use risingwave_stream::executor::{Barrier, Message, StreamExecutorError};
use risingwave_stream::executor::{Barrier, Message, StreamExecutorError, MessageStreamItem};
use serde::{Deserialize, Deserializer};
use tokio::sync::RwLock;
use tokio::sync::oneshot::{Receiver, Sender};
use tokio::time::{sleep, Instant};

const CHECKPOINT_INTERVAL: u64 = 1000;
Expand All @@ -62,20 +62,25 @@ const BENCH_TIME: u64 = 20;
const BENCH_TEST: &str = "bench_test";

pub struct MockRangeLogReader {
up_stream: Pin<Box<dyn Stream<Item = Result<Message, StreamExecutorError>> + Send>>,
upstreams: BoxStream<'static, MessageStreamItem>,
current_epoch: u64,
chunk_id: usize,
throughput_metric: std::sync::Arc<RwLock<ThroughputMetric>>,
throughput_metric: ThroughputMetric,
stop_rx: Receiver<()>,
result_tx: Option<Sender<Vec<String>>>,
}

impl LogReader for MockRangeLogReader {
async fn init(&mut self) -> LogStoreResult<()> {
self.throughput_metric.write().await.add_metric(0);
self.throughput_metric.add_metric(0);
Ok(())
}

async fn next_item(&mut self) -> LogStoreResult<(u64, LogStoreReadItem)> {
match self.up_stream.next().await.unwrap().unwrap() {
if let Ok(()) = self.stop_rx.try_recv(){
self.result_tx.take().unwrap().send(self.throughput_metric.get_throughput()).unwrap();
}
match self.upstreams.next().await.unwrap().unwrap() {
Message::Barrier(barrier) => {
let prev_epoch = self.current_epoch;
self.current_epoch = barrier.epoch.curr;
Expand All @@ -88,8 +93,6 @@ impl LogReader for MockRangeLogReader {
}
Message::Chunk(chunk) => {
self.throughput_metric
.write()
.await
.add_metric(chunk.capacity());
self.chunk_id += 1;
Ok((
Expand All @@ -116,13 +119,17 @@ impl LogReader for MockRangeLogReader {
impl MockRangeLogReader {
fn new(
mock_source: MockDatagenSource,
throughput_metric: std::sync::Arc<RwLock<ThroughputMetric>>,
throughput_metric: ThroughputMetric,
stop_rx: Receiver<()>,
result_tx: Sender<Vec<String>>,
) -> MockRangeLogReader {
MockRangeLogReader {
up_stream: mock_source.into_stream().boxed(),
upstreams: mock_source.into_stream().boxed(),
current_epoch: 0,
chunk_id: 0,
throughput_metric,
stop_rx,
result_tx: Some(result_tx),
}
}
}
Expand Down Expand Up @@ -279,7 +286,7 @@ where
<S as risingwave_connector::sink::Sink>::Coordinator: 'static,
{
if let Ok(coordinator) = sink.new_coordinator().await {
sink_writer_param.meta_client = Some(MetaClientForSink::MockMetaClient(
sink_writer_param.meta_client = Some(SinkMetaClient::MockMetaClient(
MockMetaClient::new(Box::new(coordinator)),
));
sink_writer_param.vnode_bitmap = Some(Bitmap::ones(1));
Expand Down Expand Up @@ -393,6 +400,14 @@ fn mock_from_legacy_type(
}
}

fn print_throughput_result(throughput_result: Vec<String>){
if throughput_result.is_empty() {
println!("Throughput Sink: Don't get Throughput, please check");
} else {
println!("Throughput Sink: {:?}", throughput_result);
}
}

#[tokio::main]
async fn main() {
let cfg = Config::parse();
Expand All @@ -403,21 +418,24 @@ async fn main() {
cfg.split_num,
)
.await;
let throughput_metric = std::sync::Arc::new(RwLock::new(ThroughputMetric::new()));
let (data_size_tx,data_size_rx) = tokio::sync::oneshot::channel::<Vec<String>>();
let (stop_tx,stop_rx) = tokio::sync::oneshot::channel::<()>();
let throughput_metric = ThroughputMetric::new();

let mut mock_range_log_reader =
MockRangeLogReader::new(mock_datagen_source, throughput_metric.clone());
MockRangeLogReader::new(mock_datagen_source, throughput_metric,stop_rx,data_size_tx);
if cfg.sink.eq(&BENCH_TEST.to_string()) {
println!("Start Sink Bench!, Wait {:?}s", BENCH_TIME);
tokio::spawn(async move {
mock_range_log_reader.init().await.unwrap();
loop {
mock_range_log_reader.next_item().await.unwrap();
}
});
sleep(tokio::time::Duration::from_secs(BENCH_TIME)).await;
println!(
"Throughput Test: {:?}",
throughput_metric.read().await.get_throughput()
);
println!("Bench Over!");
stop_tx.send(()).unwrap();
print_throughput_result(data_size_rx.await.unwrap())
} else {
let properties = read_sink_option_from_yml(&cfg.option_path)
.get(&cfg.sink)
Expand Down Expand Up @@ -453,11 +471,7 @@ async fn main() {
});
sleep(tokio::time::Duration::from_secs(BENCH_TIME)).await;
println!("Bench Over!");
let throughput_result = throughput_metric.read().await.get_throughput();
if throughput_result.is_empty() {
println!("Throughput Sink: Don't get Throughput, please check");
} else {
println!("Throughput Sink: {:?}", throughput_result);
}
stop_tx.send(()).unwrap();
print_throughput_result(data_size_rx.await.unwrap());
}
}
21 changes: 5 additions & 16 deletions src/connector/src/sink/coordinate.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ use crate::sink::{Result, SinkError, SinkParam};

pub struct CoordinatedSinkWriter<W: SinkWriter<CommitMetadata = Option<SinkMetadata>>> {
epoch: u64,
coordinator_stream_handle: Option<CoordinatorStreamHandle>,
coordinator_stream_handle: CoordinatorStreamHandle,
inner: W,
}

Expand All @@ -40,24 +40,12 @@ impl<W: SinkWriter<CommitMetadata = Option<SinkMetadata>>> CoordinatedSinkWriter
) -> Result<Self> {
Ok(Self {
epoch: 0,
coordinator_stream_handle: Some(client.new_stream_handle(param, vnode_bitmap).await?),
coordinator_stream_handle: client.new_stream_handle(param, vnode_bitmap).await?,
inner,
})
}
}
impl<W: SinkWriter<CommitMetadata = Option<SinkMetadata>>> CoordinatedSinkWriter<W> {
async fn commit_metadate(&mut self, metadata: SinkMetadata) -> Result<()> {
if let Some(coordinator_stream_handle) = self.coordinator_stream_handle.as_mut() {
coordinator_stream_handle
.commit(self.epoch, metadata)
.await?;
return Ok(());
}
Err(SinkError::Coordinator(anyhow!(
"coordinator_stream_handle is None"
)))
}
}

#[async_trait::async_trait]
impl<W: SinkWriter<CommitMetadata = Option<SinkMetadata>>> SinkWriter for CoordinatedSinkWriter<W> {
async fn begin_epoch(&mut self, epoch: u64) -> Result<()> {
Expand All @@ -76,7 +64,8 @@ impl<W: SinkWriter<CommitMetadata = Option<SinkMetadata>>> SinkWriter for Coordi
SinkError::Coordinator(anyhow!("should get metadata on checkpoint barrier"))
})?;
// TODO: add metrics to measure time to commit
self.commit_metadate(metadata).await?;
self.coordinator_stream_handle.commit(self.epoch, metadata)
.await?;
Ok(())
} else {
if metadata.is_some() {
Expand Down
10 changes: 5 additions & 5 deletions src/connector/src/sink/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -266,25 +266,25 @@ pub struct SinkWriterParam {
pub connector_params: ConnectorParams,
pub executor_id: u64,
pub vnode_bitmap: Option<Bitmap>,
pub meta_client: Option<MetaClientForSink>,
pub meta_client: Option<SinkMetaClient>,
pub sink_metrics: SinkMetrics,
}

#[derive(Clone)]
pub enum MetaClientForSink {
pub enum SinkMetaClient {
MetaClient(MetaClient),
MockMetaClient(MockMetaClient),
}

impl MetaClientForSink {
impl SinkMetaClient {
pub async fn sink_coordinate_client(&self) -> SinkCoordinationRpcClientEnum {
match self {
MetaClientForSink::MetaClient(meta_client) => {
SinkMetaClient::MetaClient(meta_client) => {
SinkCoordinationRpcClientEnum::SinkCoordinationRpcClient(
meta_client.sink_coordinate_client().await,
)
}
MetaClientForSink::MockMetaClient(mock_meta_client) => {
SinkMetaClient::MockMetaClient(mock_meta_client) => {
SinkCoordinationRpcClientEnum::MockSinkCoordinationRpcClient(
mock_meta_client.sink_coordinate_client(),
)
Expand Down
4 changes: 2 additions & 2 deletions src/stream/src/from_proto/sink.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ use risingwave_common::catalog::ColumnCatalog;
use risingwave_connector::match_sink_name_str;
use risingwave_connector::sink::catalog::{SinkFormatDesc, SinkType};
use risingwave_connector::sink::{
MetaClientForSink, SinkError, SinkParam, SinkWriterParam, CONNECTOR_TYPE_KEY, SINK_TYPE_OPTION,
SinkMetaClient, SinkError, SinkParam, SinkWriterParam, CONNECTOR_TYPE_KEY, SINK_TYPE_OPTION,
};
use risingwave_pb::stream_plan::{SinkLogStoreType, SinkNode};

Expand Down Expand Up @@ -114,7 +114,7 @@ impl ExecutorBuilder for SinkExecutorBuilder {
connector_params: params.env.connector_params(),
executor_id: params.executor_id,
vnode_bitmap: params.vnode_bitmap.clone(),
meta_client: params.env.meta_client().map(MetaClientForSink::MetaClient),
meta_client: params.env.meta_client().map(SinkMetaClient::MetaClient),
sink_metrics,
};

Expand Down

0 comments on commit 2c5a9a5

Please sign in to comment.