Skip to content

Commit

Permalink
add mock coordination
Browse files Browse the repository at this point in the history
  • Loading branch information
xxhZs committed Jan 4, 2024
1 parent 36de58a commit ea658e9
Show file tree
Hide file tree
Showing 6 changed files with 226 additions and 196 deletions.
2 changes: 1 addition & 1 deletion src/bench/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ parking_lot = "0.12"
prometheus = { version = "0.13", features = ["process"] }
rand = "0.8"
risingwave_common = { workspace = true }
risingwave_connector = { workspace = true, default-features = false, features = ["sink_bench"] }
risingwave_connector = { workspace = true }
risingwave_pb = { workspace = true }
risingwave_rt = { workspace = true, optional = true }
risingwave_storage = { workspace = true }
Expand Down
65 changes: 51 additions & 14 deletions src/bench/sink_bench/main.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
// Copyright 2023 RisingWave Labs
// Copyright 2024 RisingWave Labs
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -34,12 +34,16 @@ use risingwave_connector::dispatch_sink;
use risingwave_connector::parser::{
EncodingProperties, ParserConfig, ProtocolProperties, SpecificParserConfig,
};
use risingwave_connector::sink::catalog::{SinkFormatDesc, SinkId, SinkType};
use risingwave_connector::sink::catalog::{
SinkEncode, SinkFormat, SinkFormatDesc, SinkId, SinkType,
};
use risingwave_connector::sink::log_store::{
LogReader, LogStoreReadItem, LogStoreResult, TruncateOffset,
};
use risingwave_connector::sink::mock_coordination_client::MockMetaClient;
use risingwave_connector::sink::{
build_sink, LogSinker, MetaClientForSink, MockMetaClient, Sink, SinkParam, SinkWriterParam,
build_sink, LogSinker, MetaClientForSink, Sink, SinkError, SinkParam, SinkWriterParam,
SINK_TYPE_APPEND_ONLY, SINK_TYPE_UPSERT,
};
use risingwave_connector::source::datagen::{
DatagenProperties, DatagenSplitEnumerator, DatagenSplitReader,
Expand Down Expand Up @@ -154,13 +158,15 @@ impl ThroughputMetric {
}
}

pub fn get_throughput(&self) -> Vec<u64> {
pub fn get_throughput(&self) -> Vec<String> {
#[allow(clippy::disallowed_methods)]
self.chunk_size_list
.iter()
.zip(self.chunk_size_list.iter().skip(1))
.map(|(current, next)| {
(next.0 - current.0) * 1000 / (next.1.duration_since(current.1).as_millis() as u64)
let throughput = (next.0 - current.0) * 1000
/ (next.1.duration_since(current.1).as_millis() as u64);
format!("{} rows/s", throughput)
})
.collect()
}
Expand Down Expand Up @@ -272,10 +278,12 @@ where
<S as risingwave_connector::sink::Sink>::Coordinator: std::marker::Send,
<S as risingwave_connector::sink::Sink>::Coordinator: 'static,
{
sink_writer_param.meta_client = Some(MetaClientForSink::MockMetaClient(MockMetaClient::new(
Box::new(sink.new_coordinator().await.unwrap()),
)));
sink_writer_param.vnode_bitmap = Some(Bitmap::ones(1));
if let Ok(coordinator) = sink.new_coordinator().await {
sink_writer_param.meta_client = Some(MetaClientForSink::MockMetaClient(
MockMetaClient::new(Box::new(coordinator)),
));
sink_writer_param.vnode_bitmap = Some(Bitmap::ones(1));
}
let log_sinker = sink.new_log_sinker(sink_writer_param).await.unwrap();
if let Err(e) = log_sinker.consume_log_and_sink(&mut log_reader).await {
return Err(e.to_string());
Expand Down Expand Up @@ -358,6 +366,33 @@ pub struct Config {
split_num: String,
}

fn mock_from_legacy_type(
connector: &str,
r#type: &str,
) -> Result<Option<SinkFormatDesc>, SinkError> {
use risingwave_connector::sink::redis::RedisSink;
use risingwave_connector::sink::Sink as _;
if connector.eq(RedisSink::SINK_NAME) {
let format = match r#type {
SINK_TYPE_APPEND_ONLY => SinkFormat::AppendOnly,
SINK_TYPE_UPSERT => SinkFormat::Upsert,
_ => {
return Err(SinkError::Config(risingwave_common::array::error::anyhow!(
"sink type unsupported: {}",
r#type
)))
}
};
Ok(Some(SinkFormatDesc {
format,
encode: SinkEncode::Json,
options: Default::default(),
}))
} else {
SinkFormatDesc::from_legacy_type(connector, r#type)
}
}

#[tokio::main]
async fn main() {
let cfg = Config::parse();
Expand Down Expand Up @@ -390,7 +425,7 @@ async fn main() {
.clone();

let connector = properties.get("connector").unwrap().clone();
let format_desc = SinkFormatDesc::mock_from_legacy_type(
let format_desc = mock_from_legacy_type(
&connector.clone(),
properties.get("type").unwrap_or(&"append-only".to_string()),
)
Expand All @@ -416,9 +451,11 @@ async fn main() {
.unwrap();
});
sleep(tokio::time::Duration::from_secs(BENCH_TIME)).await;
println!(
"Throughput Sink: {:?}",
throughput_metric.read().await.get_throughput()
);
let throughput_result = throughput_metric.read().await.get_throughput();
if throughput_result.is_empty() {
println!("Throughput Sink: Don't get Throughput, please check");
} else {
println!("Throughput Sink: {:?}", throughput_result);
}
}
}
3 changes: 0 additions & 3 deletions src/connector/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -162,9 +162,6 @@ walkdir = "2"
prost-build = "0.12"
protobuf-src = "1"

[features]
sink_bench = []

[[bench]]
name = "parser"
harness = false
Expand Down
25 changes: 0 additions & 25 deletions src/connector/src/sink/catalog/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -166,31 +166,6 @@ impl SinkFormatDesc {
}))
}

#[cfg(feature = "sink_bench")]
pub fn mock_from_legacy_type(connector: &str, r#type: &str) -> Result<Option<Self>, SinkError> {
use crate::sink::redis::RedisSink;
use crate::sink::Sink as _;
if connector.eq(RedisSink::SINK_NAME) {
let format = match r#type {
SINK_TYPE_APPEND_ONLY => SinkFormat::AppendOnly,
SINK_TYPE_UPSERT => SinkFormat::Upsert,
_ => {
return Err(SinkError::Config(anyhow!(
"sink type unsupported: {}",
r#type
)))
}
};
Ok(Some(Self {
format,
encode: SinkEncode::Json,
options: Default::default(),
}))
} else {
Self::from_legacy_type(connector, r#type)
}
}

pub fn to_proto(&self) -> PbSinkFormatDesc {
use risingwave_pb::plan_common::{EncodeType as E, FormatType as F};

Expand Down
170 changes: 170 additions & 0 deletions src/connector/src/sink/mock_coordination_client.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,170 @@
// Copyright 2024 RisingWave Labs
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use risingwave_common::buffer::Bitmap;
use risingwave_pb::connector_service::coordinate_response::{
self, CommitResponse, StartCoordinationResponse,
};
use risingwave_pb::connector_service::{
coordinate_request, CoordinateRequest, CoordinateResponse, PbSinkParam,
};
use risingwave_rpc_client::error::RpcError;
use risingwave_rpc_client::{CoordinatorStreamHandle, SinkCoordinationRpcClient};
use tokio::sync::mpsc::{self, Receiver};
use tokio_stream::wrappers::ReceiverStream;
use tonic::Status;

use super::boxed::BoxCoordinator;
use super::{SinkParam, BOUNDED_CHANNEL_SIZE};

#[derive(Clone)]
pub enum SinkCoordinationRpcClientEnum {
SinkCoordinationRpcClient(SinkCoordinationRpcClient),
MockSinkCoordinationRpcClient(MockSinkCoordinationRpcClient),
}

impl SinkCoordinationRpcClientEnum {
pub async fn new_stream_handle(
self,
param: SinkParam,
vnode_bitmap: Bitmap,
) -> super::Result<CoordinatorStreamHandle> {
match self {
SinkCoordinationRpcClientEnum::SinkCoordinationRpcClient(
sink_coordination_rpc_client,
) => Ok(CoordinatorStreamHandle::new(
sink_coordination_rpc_client,
param.to_proto(),
vnode_bitmap,
)
.await?),
SinkCoordinationRpcClientEnum::MockSinkCoordinationRpcClient(
mock_sink_coordination_rpc_client,
) => Ok(mock_sink_coordination_rpc_client
.new_stream_handle(param.to_proto(), vnode_bitmap)
.await?),
}
}
}

#[derive(Clone)]
pub struct MockMetaClient {
mock_coordinator_committer: std::sync::Arc<tokio::sync::Mutex<BoxCoordinator>>,
}
impl MockMetaClient {
pub fn new(mock_coordinator_committer: BoxCoordinator) -> Self {
Self {
mock_coordinator_committer: std::sync::Arc::new(tokio::sync::Mutex::new(
mock_coordinator_committer,
)),
}
}

pub fn sink_coordinate_client(&self) -> MockSinkCoordinationRpcClient {
MockSinkCoordinationRpcClient::new(self.mock_coordinator_committer.clone())
}
}

#[derive(Clone)]
pub struct MockSinkCoordinationRpcClient {
mock_coordinator_committer: std::sync::Arc<tokio::sync::Mutex<BoxCoordinator>>,
}

impl MockSinkCoordinationRpcClient {
pub fn new(
mock_coordinator_committer: std::sync::Arc<tokio::sync::Mutex<BoxCoordinator>>,
) -> Self {
Self {
mock_coordinator_committer,
}
}

pub async fn new_stream_handle(
&self,
param: PbSinkParam,
vnode_bitmap: Bitmap,
) -> std::result::Result<CoordinatorStreamHandle, RpcError> {
CoordinatorStreamHandle::new_with_init_stream(param, vnode_bitmap, |rx| async move {
self.coordinate(rx).await
})
.await
}

pub async fn coordinate(
&self,
mut receiver_stream: Receiver<CoordinateRequest>,
) -> std::result::Result<
tonic::Response<ReceiverStream<std::result::Result<CoordinateResponse, tonic::Status>>>,
Status,
> {
match receiver_stream.try_recv() {
Ok(CoordinateRequest {
msg:
Some(risingwave_pb::connector_service::coordinate_request::Msg::StartRequest(
coordinate_request::StartCoordinationRequest {
param: Some(_param),
vnode_bitmap: Some(_vnode_bitmap),
},
)),
}) => (),
msg => {
return Err(Status::invalid_argument(format!(
"expected CoordinateRequest::StartRequest in the first request, get {:?}",
msg
)));
}
};

let (response_tx, response_rx) =
mpsc::channel::<std::result::Result<CoordinateResponse, Status>>(BOUNDED_CHANNEL_SIZE);
let response_tx = std::sync::Arc::new(response_tx);
response_tx
.send(Ok(CoordinateResponse {
msg: Some(coordinate_response::Msg::StartResponse(
StartCoordinationResponse {},
)),
}))
.await
.map_err(|e| Status::from_error(Box::new(e)))?;

let mock_coordinator_committer = self.mock_coordinator_committer.clone();
let response_tx_clone = response_tx.clone();
tokio::spawn(async move {
loop {
match receiver_stream.recv().await {
Some(CoordinateRequest {
msg:
Some(risingwave_pb::connector_service::coordinate_request::Msg::CommitRequest(coordinate_request::CommitRequest {
epoch,
metadata,
})),
}) => {
mock_coordinator_committer.clone().lock().await.commit(epoch, vec![metadata.unwrap()]).await.map_err(|e| Status::from_error(Box::new(e)))?;
response_tx_clone.clone().send(Ok(CoordinateResponse {
msg: Some(coordinate_response::Msg::CommitResponse(CommitResponse{epoch})),
})).await.map_err(|e| Status::from_error(Box::new(e)))?;
},
msg => {
return Err::<ReceiverStream<CoordinateResponse>, tonic::Status>(Status::invalid_argument(format!(
"expected CoordinateRequest::CommitRequest , get {:?}",
msg
)));
}
}
}
});

Ok(tonic::Response::new(ReceiverStream::new(response_rx)))
}
}
Loading

0 comments on commit ea658e9

Please sign in to comment.