refactor(rpc): prefer monitored_connect over connect (#18635)
Signed-off-by: Bugen Zhao <[email protected]>
BugenZhao authored Sep 23, 2024
1 parent a898dcc commit a5ec3ea
Showing 11 changed files with 32 additions and 282 deletions.
2 changes: 2 additions & 0 deletions clippy.toml
@@ -13,6 +13,8 @@ disallowed-methods = [
{ path = "speedate::DateTime::parse_bytes_with_config", reason = "Please use `parse_bytes_rfc3339_with_config` instead." },
{ path = "speedate::Date::parse_str", reason = "Please use `parse_str_rfc3339` instead." },
{ path = "speedate::Date::parse_bytes", reason = "Please use `parse_bytes_rfc3339` instead." },
{ path = "tonic::transport::Endpoint::connect", reason = "Please use `EndpointExt::monitored_connect` instead." },
{ path = "tonic::transport::Endpoint::connect_lazy", reason = "Please use `EndpointExt::monitored_connect_lazy` instead." },
]
disallowed-types = [
{ path = "num_traits::AsPrimitive", reason = "Please use `From` or `TryFrom` with `OrderedFloat` instead." },
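For context: clippy's `disallowed-methods` table turns any direct call to a listed path into a lint error, steering callers toward the monitored wrappers. A minimal sketch of the effect, not part of the diff; the helper name, address, and connection-type label are made up, and `monitored_connect` is assumed to take a label plus a `TcpConfig`, as the call sites below show:

use std::time::Duration;

use risingwave_common::monitor::EndpointExt; // brings `monitored_connect` into scope
use tonic::transport::{Channel, Endpoint};

async fn build_channel(addr: &str) -> anyhow::Result<Channel> {
    let endpoint = Endpoint::from_shared(format!("http://{addr}"))?
        .connect_timeout(Duration::from_secs(5));
    // `endpoint.connect().await` would now trip `clippy::disallowed_methods`;
    // the monitored variant presumably records connection metrics under the given label.
    let channel = endpoint
        .monitored_connect("grpc-example-client", Default::default())
        .await?;
    Ok(channel)
}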
10 changes: 10 additions & 0 deletions src/common/metrics/src/monitor/connection.rs
@@ -482,6 +482,16 @@ pub struct TcpConfig {
pub keepalive_duration: Option<Duration>,
}

#[allow(clippy::derivable_impls)]
impl Default for TcpConfig {
fn default() -> Self {
Self {
tcp_nodelay: false,
keepalive_duration: None,
}
}
}

pub fn monitor_connector<C>(
connector: C,
connection_type: impl Into<String>,
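The explicit `Default` impl (with `clippy::derivable_impls` allowed, since a derive would produce the same values) lets call sites override only the fields they care about via struct-update syntax, which is exactly how the compute and frontend clients are changed below. A small illustrative sketch, not part of the diff:

use risingwave_common::monitor::TcpConfig;

fn nodelay_config() -> TcpConfig {
    // Only `tcp_nodelay` differs from the default; `keepalive_duration` stays `None`.
    TcpConfig {
        tcp_nodelay: true,
        ..Default::default()
    }
}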
3 changes: 2 additions & 1 deletion src/ctl/src/cmd_impl/hummock/tiered_cache_tracing.rs
@@ -16,6 +16,7 @@ use std::time::Duration;

use futures::future::try_join_all;
use itertools::Itertools;
use risingwave_common::monitor::EndpointExt;
use risingwave_pb::monitor_service::monitor_service_client::MonitorServiceClient;
use risingwave_pb::monitor_service::TieredCacheTracingRequest;
use tonic::transport::Endpoint;
@@ -40,7 +41,7 @@ pub async fn tiered_cache_tracing(
let addr = worker_node.get_host().unwrap();
let channel = Endpoint::from_shared(format!("http://{}:{}", addr.host, addr.port))?
.connect_timeout(Duration::from_secs(5))
.connect()
.monitored_connect("grpc-tiered-cache-tracing-client", Default::default())
.await?;
let mut client = MonitorServiceClient::new(channel);
client
15 changes: 8 additions & 7 deletions src/rpc_client/src/compactor_client.rs
@@ -15,6 +15,7 @@
use std::sync::Arc;
use std::time::Duration;

use risingwave_common::monitor::EndpointExt;
use risingwave_common::util::addr::HostAddr;
use risingwave_pb::hummock::hummock_manager_service_client::HummockManagerServiceClient;
use risingwave_pb::hummock::{
@@ -47,7 +48,7 @@ impl CompactorClient {
pub async fn new(host_addr: HostAddr) -> Result<Self> {
let channel = Endpoint::from_shared(format!("http://{}", &host_addr))?
.connect_timeout(Duration::from_secs(5))
.connect()
.monitored_connect("grpc-compactor-client", Default::default())
.await?;
Ok(Self {
monitor_client: MonitorServiceClient::new(channel),
@@ -94,26 +95,26 @@ pub struct GrpcCompactorProxyClient {
}

impl GrpcCompactorProxyClient {
pub fn new(channel: Channel, endpoint: String) -> Self {
pub async fn new(endpoint: String) -> Self {
let channel = Self::connect_to_endpoint(endpoint.clone()).await;
let core = Arc::new(RwLock::new(GrpcCompactorProxyClientCore::new(channel)));
Self { core, endpoint }
}

async fn recreate_core(&self) {
tracing::info!("GrpcCompactorProxyClient rpc transfer failed, try to reconnect");
let channel = self.connect_to_endpoint().await;
let channel = Self::connect_to_endpoint(self.endpoint.clone()).await;
let mut core = self.core.write().await;
*core = GrpcCompactorProxyClientCore::new(channel);
}

async fn connect_to_endpoint(&self) -> Channel {
let endpoint =
Endpoint::from_shared(self.endpoint.clone()).expect("Fail to construct tonic Endpoint");
async fn connect_to_endpoint(endpoint: String) -> Channel {
let endpoint = Endpoint::from_shared(endpoint).expect("Fail to construct tonic Endpoint");
endpoint
.http2_keep_alive_interval(Duration::from_secs(ENDPOINT_KEEP_ALIVE_INTERVAL_SEC))
.keep_alive_timeout(Duration::from_secs(ENDPOINT_KEEP_ALIVE_TIMEOUT_SEC))
.connect_timeout(Duration::from_secs(5))
.connect()
.monitored_connect("grpc-compactor-proxy-client", Default::default())
.await
.expect("Failed to create channel via proxy rpc endpoint.")
}
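With this change `GrpcCompactorProxyClient::new` becomes `async` and builds the (monitored) channel itself rather than receiving one from the caller, keeping the endpoint string so it can reconnect after a failed RPC. A hedged usage sketch, not part of the diff; the address is a placeholder:

use risingwave_rpc_client::GrpcCompactorProxyClient;

async fn connect_proxy() -> GrpcCompactorProxyClient {
    // Panics inside `new` if the endpoint is malformed or unreachable,
    // mirroring the `expect` calls in the hunk above.
    GrpcCompactorProxyClient::new("http://127.0.0.1:6666".to_string()).await
}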
2 changes: 1 addition & 1 deletion src/rpc_client/src/compute_client.rs
@@ -73,7 +73,7 @@ impl ComputeClient {
"grpc-compute-client",
TcpConfig {
tcp_nodelay: true,
keepalive_duration: None,
..Default::default()
},
)
.await?;
257 changes: 3 additions & 254 deletions src/rpc_client/src/connector_client.rs
@@ -12,40 +12,18 @@
// See the License for the specific language governing permissions and
// limitations under the License.

use std::collections::BTreeMap;
use std::fmt::Debug;
use std::time::Duration;

use anyhow::{anyhow, Context};
use futures::TryStreamExt;
use risingwave_common::config::{MAX_CONNECTION_WINDOW_SIZE, STREAM_WINDOW_SIZE};
use risingwave_common::monitor::{EndpointExt, TcpConfig};
use risingwave_pb::connector_service::connector_service_client::ConnectorServiceClient;
use risingwave_pb::connector_service::sink_coordinator_stream_request::{
CommitMetadata, StartCoordinator,
};
use anyhow::anyhow;
use risingwave_pb::connector_service::sink_coordinator_stream_request::CommitMetadata;
use risingwave_pb::connector_service::sink_writer_stream_request::write_batch::Payload;
use risingwave_pb::connector_service::sink_writer_stream_request::{
Barrier, Request as SinkRequest, StartSink, WriteBatch,
Barrier, Request as SinkRequest, WriteBatch,
};
use risingwave_pb::connector_service::sink_writer_stream_response::CommitResponse;
use risingwave_pb::connector_service::*;
use risingwave_pb::plan_common::column_desc::GeneratedOrDefaultColumn;
use thiserror_ext::AsReport;
use tokio_stream::wrappers::ReceiverStream;
use tonic::transport::{Channel, Endpoint};
use tonic::Streaming;
use tracing::error;

use crate::error::{Result, RpcError};
use crate::{BidiStreamHandle, BidiStreamReceiver, BidiStreamSender};

#[derive(Clone, Debug)]
pub struct ConnectorClient {
rpc_client: ConnectorServiceClient<Channel>,
endpoint: String,
}

pub type SinkWriterRequestSender<REQ = SinkWriterStreamRequest> = BidiStreamSender<REQ>;
pub type SinkWriterResponseReceiver = BidiStreamReceiver<SinkWriterStreamResponse>;

@@ -143,232 +121,3 @@ impl SinkCoordinatorStreamHandle {
}
}
}

impl ConnectorClient {
pub async fn try_new(connector_endpoint: Option<&String>) -> Option<Self> {
match connector_endpoint {
None => None,
Some(connector_endpoint) => match ConnectorClient::new(connector_endpoint).await {
Ok(client) => Some(client),
Err(e) => {
error!(
endpoint = connector_endpoint,
error = %e.as_report(),
"invalid connector endpoint",
);
None
}
},
}
}

#[allow(clippy::unused_async)]
pub async fn new(connector_endpoint: &String) -> Result<Self> {
let endpoint = Endpoint::from_shared(format!("http://{}", connector_endpoint))
.with_context(|| format!("invalid connector endpoint `{}`", connector_endpoint))?
.initial_connection_window_size(MAX_CONNECTION_WINDOW_SIZE)
.initial_stream_window_size(STREAM_WINDOW_SIZE)
.connect_timeout(Duration::from_secs(5));

let channel = {
#[cfg(madsim)]
{
endpoint.connect().await?
}
#[cfg(not(madsim))]
{
endpoint.monitored_connect_lazy(
"grpc-connector-client",
TcpConfig {
tcp_nodelay: true,
keepalive_duration: None,
},
)
}
};
Ok(Self {
rpc_client: ConnectorServiceClient::new(channel).max_decoding_message_size(usize::MAX),
endpoint: connector_endpoint.to_string(),
})
}

pub fn endpoint(&self) -> &String {
&self.endpoint
}

/// Get source event stream
pub async fn start_source_stream(
&self,
source_id: u64,
source_type: SourceType,
start_offset: Option<String>,
properties: BTreeMap<String, String>,
snapshot_done: bool,
is_source_job: bool,
) -> Result<Streaming<GetEventStreamResponse>> {
Ok(self
.rpc_client
.clone()
.get_event_stream(GetEventStreamRequest {
source_id,
source_type: source_type as _,
start_offset: start_offset.unwrap_or_default(),
properties,
snapshot_done,
is_source_job,
})
.await
.inspect_err(|err| {
tracing::error!(
"failed to start stream for CDC source {}: {}",
source_id,
err.message()
)
})
.map_err(RpcError::from_connector_status)?
.into_inner())
}

/// Validate source properties
pub async fn validate_source_properties(
&self,
source_id: u64,
source_type: SourceType,
properties: BTreeMap<String, String>,
table_schema: Option<TableSchema>,
is_source_job: bool,
is_backfill_table: bool,
) -> Result<()> {
let table_schema = table_schema.map(|mut table_schema| {
table_schema.columns.retain(|c| {
!matches!(
c.generated_or_default_column,
Some(GeneratedOrDefaultColumn::GeneratedColumn(_))
)
});
table_schema
});
let response = self
.rpc_client
.clone()
.validate_source(ValidateSourceRequest {
source_id,
source_type: source_type as _,
properties,
table_schema,
is_source_job,
is_backfill_table,
})
.await
.inspect_err(|err| {
tracing::error!("failed to validate source#{}: {}", source_id, err.message())
})
.map_err(RpcError::from_connector_status)?
.into_inner();

response.error.map_or(Ok(()), |err| {
Err(RpcError::Internal(anyhow!(format!(
"source cannot pass validation: {}",
err.error_message
))))
})
}

pub async fn start_sink_writer_stream(
&self,
payload_schema: Option<TableSchema>,
sink_proto: PbSinkParam,
) -> Result<SinkWriterStreamHandle> {
let mut rpc_client = self.rpc_client.clone();
let (handle, first_rsp) = SinkWriterStreamHandle::initialize(
SinkWriterStreamRequest {
request: Some(SinkRequest::Start(StartSink {
payload_schema,
sink_param: Some(sink_proto),
})),
},
|rx| async move {
rpc_client
.sink_writer_stream(ReceiverStream::new(rx))
.await
.map(|response| {
response
.into_inner()
.map_err(RpcError::from_connector_status)
})
.map_err(RpcError::from_connector_status)
},
)
.await?;

match first_rsp {
SinkWriterStreamResponse {
response: Some(sink_writer_stream_response::Response::Start(_)),
} => Ok(handle),
msg => Err(RpcError::Internal(anyhow!(
"should get start response but get {:?}",
msg
))),
}
}

pub async fn start_sink_coordinator_stream(
&self,
param: SinkParam,
) -> Result<SinkCoordinatorStreamHandle> {
let mut rpc_client = self.rpc_client.clone();
let (handle, first_rsp) = SinkCoordinatorStreamHandle::initialize(
SinkCoordinatorStreamRequest {
request: Some(sink_coordinator_stream_request::Request::Start(
StartCoordinator { param: Some(param) },
)),
},
|rx| async move {
rpc_client
.sink_coordinator_stream(ReceiverStream::new(rx))
.await
.map(|response| {
response
.into_inner()
.map_err(RpcError::from_connector_status)
})
.map_err(RpcError::from_connector_status)
},
)
.await?;

match first_rsp {
SinkCoordinatorStreamResponse {
response: Some(sink_coordinator_stream_response::Response::Start(_)),
} => Ok(handle),
msg => Err(RpcError::Internal(anyhow!(
"should get start response but get {:?}",
msg
))),
}
}

pub async fn validate_sink_properties(&self, sink_param: SinkParam) -> Result<()> {
let response = self
.rpc_client
.clone()
.validate_sink(ValidateSinkRequest {
sink_param: Some(sink_param),
})
.await
.inspect_err(|err| {
tracing::error!("failed to validate sink properties: {}", err.message())
})
.map_err(RpcError::from_connector_status)?
.into_inner();
response.error.map_or_else(
|| Ok(()), // If there is no error message, return Ok here.
|err| {
Err(RpcError::Internal(anyhow!(format!(
"sink cannot pass validation: {}",
err.error_message
))))
},
)
}
}
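The deleted `ConnectorClient` above contained a madsim-only branch that called `Endpoint::connect` directly. Should a simulation-only call site ever need the raw method again, the new clippy rule can be suppressed locally rather than globally; a hypothetical sketch, not part of this commit:

use std::time::Duration;

use tonic::transport::{Channel, Endpoint};

// Simulation-only helper; the name and signature are made up for illustration.
#[cfg(madsim)]
#[allow(clippy::disallowed_methods)] // the unmonitored `connect` is intentional under madsim
async fn connect_unmonitored(addr: String) -> anyhow::Result<Channel> {
    Ok(Endpoint::from_shared(addr)?
        .connect_timeout(Duration::from_secs(5))
        .connect()
        .await?)
}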
2 changes: 1 addition & 1 deletion src/rpc_client/src/frontend_client.rs
@@ -45,7 +45,7 @@ impl FrontendClient {
"grpc-frontend-client",
TcpConfig {
tcp_nodelay: true,
keepalive_duration: None,
..Default::default()
},
)
.await?
2 changes: 1 addition & 1 deletion src/rpc_client/src/lib.rs
@@ -65,7 +65,7 @@ mod tracing;

pub use compactor_client::{CompactorClient, GrpcCompactorProxyClient};
pub use compute_client::{ComputeClient, ComputeClientPool, ComputeClientPoolRef};
pub use connector_client::{ConnectorClient, SinkCoordinatorStreamHandle, SinkWriterStreamHandle};
pub use connector_client::{SinkCoordinatorStreamHandle, SinkWriterStreamHandle};
pub use frontend_client::{FrontendClientPool, FrontendClientPoolRef};
pub use hummock_meta_client::{CompactionEventItem, HummockMetaClient};
pub use meta_client::{MetaClient, SinkCoordinationRpcClient};