diff --git a/clippy.toml b/clippy.toml index 21b972376b0ed..23134a6fe11f6 100644 --- a/clippy.toml +++ b/clippy.toml @@ -13,6 +13,8 @@ disallowed-methods = [ { path = "speedate::DateTime::parse_bytes_with_config", reason = "Please use `parse_bytes_rfc3339_with_config` instead." }, { path = "speedate::Date::parse_str", reason = "Please use `parse_str_rfc3339` instead." }, { path = "speedate::Date::parse_bytes", reason = "Please use `parse_bytes_rfc3339` instead." }, + { path = "tonic::transport::Endpoint::connect", reason = "Please use `EndpointExt::monitored_connect` instead." }, + { path = "tonic::transport::Endpoint::connect_lazy", reason = "Please use `EndpointExt::monitored_connect_lazy` instead." }, ] disallowed-types = [ { path = "num_traits::AsPrimitive", reason = "Please use `From` or `TryFrom` with `OrderedFloat` instead." }, diff --git a/src/common/metrics/src/monitor/connection.rs b/src/common/metrics/src/monitor/connection.rs index aa7c8c8d4baa3..492fdf4c049a8 100644 --- a/src/common/metrics/src/monitor/connection.rs +++ b/src/common/metrics/src/monitor/connection.rs @@ -482,6 +482,16 @@ pub struct TcpConfig { pub keepalive_duration: Option, } +#[allow(clippy::derivable_impls)] +impl Default for TcpConfig { + fn default() -> Self { + Self { + tcp_nodelay: false, + keepalive_duration: None, + } + } +} + pub fn monitor_connector( connector: C, connection_type: impl Into, diff --git a/src/ctl/src/cmd_impl/hummock/tiered_cache_tracing.rs b/src/ctl/src/cmd_impl/hummock/tiered_cache_tracing.rs index a2eee44fa4295..393cbb6b7e59a 100644 --- a/src/ctl/src/cmd_impl/hummock/tiered_cache_tracing.rs +++ b/src/ctl/src/cmd_impl/hummock/tiered_cache_tracing.rs @@ -16,6 +16,7 @@ use std::time::Duration; use futures::future::try_join_all; use itertools::Itertools; +use risingwave_common::monitor::EndpointExt; use risingwave_pb::monitor_service::monitor_service_client::MonitorServiceClient; use risingwave_pb::monitor_service::TieredCacheTracingRequest; use tonic::transport::Endpoint; @@ -40,7 +41,7 @@ pub async fn tiered_cache_tracing( let addr = worker_node.get_host().unwrap(); let channel = Endpoint::from_shared(format!("http://{}:{}", addr.host, addr.port))? .connect_timeout(Duration::from_secs(5)) - .connect() + .monitored_connect("grpc-tiered-cache-tracing-client", Default::default()) .await?; let mut client = MonitorServiceClient::new(channel); client diff --git a/src/rpc_client/src/compactor_client.rs b/src/rpc_client/src/compactor_client.rs index e571dbd504d4b..7daae12d27552 100644 --- a/src/rpc_client/src/compactor_client.rs +++ b/src/rpc_client/src/compactor_client.rs @@ -15,6 +15,7 @@ use std::sync::Arc; use std::time::Duration; +use risingwave_common::monitor::EndpointExt; use risingwave_common::util::addr::HostAddr; use risingwave_pb::hummock::hummock_manager_service_client::HummockManagerServiceClient; use risingwave_pb::hummock::{ @@ -47,7 +48,7 @@ impl CompactorClient { pub async fn new(host_addr: HostAddr) -> Result { let channel = Endpoint::from_shared(format!("http://{}", &host_addr))? .connect_timeout(Duration::from_secs(5)) - .connect() + .monitored_connect("grpc-compactor-client", Default::default()) .await?; Ok(Self { monitor_client: MonitorServiceClient::new(channel), @@ -94,26 +95,26 @@ pub struct GrpcCompactorProxyClient { } impl GrpcCompactorProxyClient { - pub fn new(channel: Channel, endpoint: String) -> Self { + pub async fn new(endpoint: String) -> Self { + let channel = Self::connect_to_endpoint(endpoint.clone()).await; let core = Arc::new(RwLock::new(GrpcCompactorProxyClientCore::new(channel))); Self { core, endpoint } } async fn recreate_core(&self) { tracing::info!("GrpcCompactorProxyClient rpc transfer failed, try to reconnect"); - let channel = self.connect_to_endpoint().await; + let channel = Self::connect_to_endpoint(self.endpoint.clone()).await; let mut core = self.core.write().await; *core = GrpcCompactorProxyClientCore::new(channel); } - async fn connect_to_endpoint(&self) -> Channel { - let endpoint = - Endpoint::from_shared(self.endpoint.clone()).expect("Fail to construct tonic Endpoint"); + async fn connect_to_endpoint(endpoint: String) -> Channel { + let endpoint = Endpoint::from_shared(endpoint).expect("Fail to construct tonic Endpoint"); endpoint .http2_keep_alive_interval(Duration::from_secs(ENDPOINT_KEEP_ALIVE_INTERVAL_SEC)) .keep_alive_timeout(Duration::from_secs(ENDPOINT_KEEP_ALIVE_TIMEOUT_SEC)) .connect_timeout(Duration::from_secs(5)) - .connect() + .monitored_connect("grpc-compactor-proxy-client", Default::default()) .await .expect("Failed to create channel via proxy rpc endpoint.") } diff --git a/src/rpc_client/src/compute_client.rs b/src/rpc_client/src/compute_client.rs index c065bb6935954..5ff5671cb3320 100644 --- a/src/rpc_client/src/compute_client.rs +++ b/src/rpc_client/src/compute_client.rs @@ -73,7 +73,7 @@ impl ComputeClient { "grpc-compute-client", TcpConfig { tcp_nodelay: true, - keepalive_duration: None, + ..Default::default() }, ) .await?; diff --git a/src/rpc_client/src/connector_client.rs b/src/rpc_client/src/connector_client.rs index c81a74d2fa709..b94f7516e4b85 100644 --- a/src/rpc_client/src/connector_client.rs +++ b/src/rpc_client/src/connector_client.rs @@ -12,40 +12,18 @@ // See the License for the specific language governing permissions and // limitations under the License. -use std::collections::BTreeMap; -use std::fmt::Debug; -use std::time::Duration; - -use anyhow::{anyhow, Context}; -use futures::TryStreamExt; -use risingwave_common::config::{MAX_CONNECTION_WINDOW_SIZE, STREAM_WINDOW_SIZE}; -use risingwave_common::monitor::{EndpointExt, TcpConfig}; -use risingwave_pb::connector_service::connector_service_client::ConnectorServiceClient; -use risingwave_pb::connector_service::sink_coordinator_stream_request::{ - CommitMetadata, StartCoordinator, -}; +use anyhow::anyhow; +use risingwave_pb::connector_service::sink_coordinator_stream_request::CommitMetadata; use risingwave_pb::connector_service::sink_writer_stream_request::write_batch::Payload; use risingwave_pb::connector_service::sink_writer_stream_request::{ - Barrier, Request as SinkRequest, StartSink, WriteBatch, + Barrier, Request as SinkRequest, WriteBatch, }; use risingwave_pb::connector_service::sink_writer_stream_response::CommitResponse; use risingwave_pb::connector_service::*; -use risingwave_pb::plan_common::column_desc::GeneratedOrDefaultColumn; -use thiserror_ext::AsReport; -use tokio_stream::wrappers::ReceiverStream; -use tonic::transport::{Channel, Endpoint}; -use tonic::Streaming; -use tracing::error; use crate::error::{Result, RpcError}; use crate::{BidiStreamHandle, BidiStreamReceiver, BidiStreamSender}; -#[derive(Clone, Debug)] -pub struct ConnectorClient { - rpc_client: ConnectorServiceClient, - endpoint: String, -} - pub type SinkWriterRequestSender = BidiStreamSender; pub type SinkWriterResponseReceiver = BidiStreamReceiver; @@ -143,232 +121,3 @@ impl SinkCoordinatorStreamHandle { } } } - -impl ConnectorClient { - pub async fn try_new(connector_endpoint: Option<&String>) -> Option { - match connector_endpoint { - None => None, - Some(connector_endpoint) => match ConnectorClient::new(connector_endpoint).await { - Ok(client) => Some(client), - Err(e) => { - error!( - endpoint = connector_endpoint, - error = %e.as_report(), - "invalid connector endpoint", - ); - None - } - }, - } - } - - #[allow(clippy::unused_async)] - pub async fn new(connector_endpoint: &String) -> Result { - let endpoint = Endpoint::from_shared(format!("http://{}", connector_endpoint)) - .with_context(|| format!("invalid connector endpoint `{}`", connector_endpoint))? - .initial_connection_window_size(MAX_CONNECTION_WINDOW_SIZE) - .initial_stream_window_size(STREAM_WINDOW_SIZE) - .connect_timeout(Duration::from_secs(5)); - - let channel = { - #[cfg(madsim)] - { - endpoint.connect().await? - } - #[cfg(not(madsim))] - { - endpoint.monitored_connect_lazy( - "grpc-connector-client", - TcpConfig { - tcp_nodelay: true, - keepalive_duration: None, - }, - ) - } - }; - Ok(Self { - rpc_client: ConnectorServiceClient::new(channel).max_decoding_message_size(usize::MAX), - endpoint: connector_endpoint.to_string(), - }) - } - - pub fn endpoint(&self) -> &String { - &self.endpoint - } - - /// Get source event stream - pub async fn start_source_stream( - &self, - source_id: u64, - source_type: SourceType, - start_offset: Option, - properties: BTreeMap, - snapshot_done: bool, - is_source_job: bool, - ) -> Result> { - Ok(self - .rpc_client - .clone() - .get_event_stream(GetEventStreamRequest { - source_id, - source_type: source_type as _, - start_offset: start_offset.unwrap_or_default(), - properties, - snapshot_done, - is_source_job, - }) - .await - .inspect_err(|err| { - tracing::error!( - "failed to start stream for CDC source {}: {}", - source_id, - err.message() - ) - }) - .map_err(RpcError::from_connector_status)? - .into_inner()) - } - - /// Validate source properties - pub async fn validate_source_properties( - &self, - source_id: u64, - source_type: SourceType, - properties: BTreeMap, - table_schema: Option, - is_source_job: bool, - is_backfill_table: bool, - ) -> Result<()> { - let table_schema = table_schema.map(|mut table_schema| { - table_schema.columns.retain(|c| { - !matches!( - c.generated_or_default_column, - Some(GeneratedOrDefaultColumn::GeneratedColumn(_)) - ) - }); - table_schema - }); - let response = self - .rpc_client - .clone() - .validate_source(ValidateSourceRequest { - source_id, - source_type: source_type as _, - properties, - table_schema, - is_source_job, - is_backfill_table, - }) - .await - .inspect_err(|err| { - tracing::error!("failed to validate source#{}: {}", source_id, err.message()) - }) - .map_err(RpcError::from_connector_status)? - .into_inner(); - - response.error.map_or(Ok(()), |err| { - Err(RpcError::Internal(anyhow!(format!( - "source cannot pass validation: {}", - err.error_message - )))) - }) - } - - pub async fn start_sink_writer_stream( - &self, - payload_schema: Option, - sink_proto: PbSinkParam, - ) -> Result { - let mut rpc_client = self.rpc_client.clone(); - let (handle, first_rsp) = SinkWriterStreamHandle::initialize( - SinkWriterStreamRequest { - request: Some(SinkRequest::Start(StartSink { - payload_schema, - sink_param: Some(sink_proto), - })), - }, - |rx| async move { - rpc_client - .sink_writer_stream(ReceiverStream::new(rx)) - .await - .map(|response| { - response - .into_inner() - .map_err(RpcError::from_connector_status) - }) - .map_err(RpcError::from_connector_status) - }, - ) - .await?; - - match first_rsp { - SinkWriterStreamResponse { - response: Some(sink_writer_stream_response::Response::Start(_)), - } => Ok(handle), - msg => Err(RpcError::Internal(anyhow!( - "should get start response but get {:?}", - msg - ))), - } - } - - pub async fn start_sink_coordinator_stream( - &self, - param: SinkParam, - ) -> Result { - let mut rpc_client = self.rpc_client.clone(); - let (handle, first_rsp) = SinkCoordinatorStreamHandle::initialize( - SinkCoordinatorStreamRequest { - request: Some(sink_coordinator_stream_request::Request::Start( - StartCoordinator { param: Some(param) }, - )), - }, - |rx| async move { - rpc_client - .sink_coordinator_stream(ReceiverStream::new(rx)) - .await - .map(|response| { - response - .into_inner() - .map_err(RpcError::from_connector_status) - }) - .map_err(RpcError::from_connector_status) - }, - ) - .await?; - - match first_rsp { - SinkCoordinatorStreamResponse { - response: Some(sink_coordinator_stream_response::Response::Start(_)), - } => Ok(handle), - msg => Err(RpcError::Internal(anyhow!( - "should get start response but get {:?}", - msg - ))), - } - } - - pub async fn validate_sink_properties(&self, sink_param: SinkParam) -> Result<()> { - let response = self - .rpc_client - .clone() - .validate_sink(ValidateSinkRequest { - sink_param: Some(sink_param), - }) - .await - .inspect_err(|err| { - tracing::error!("failed to validate sink properties: {}", err.message()) - }) - .map_err(RpcError::from_connector_status)? - .into_inner(); - response.error.map_or_else( - || Ok(()), // If there is no error message, return Ok here. - |err| { - Err(RpcError::Internal(anyhow!(format!( - "sink cannot pass validation: {}", - err.error_message - )))) - }, - ) - } -} diff --git a/src/rpc_client/src/frontend_client.rs b/src/rpc_client/src/frontend_client.rs index 3fd02d99107e2..4d72bbb76d4c4 100644 --- a/src/rpc_client/src/frontend_client.rs +++ b/src/rpc_client/src/frontend_client.rs @@ -45,7 +45,7 @@ impl FrontendClient { "grpc-frontend-client", TcpConfig { tcp_nodelay: true, - keepalive_duration: None, + ..Default::default() }, ) .await? diff --git a/src/rpc_client/src/lib.rs b/src/rpc_client/src/lib.rs index 60a4ca537d21c..9f0d458252659 100644 --- a/src/rpc_client/src/lib.rs +++ b/src/rpc_client/src/lib.rs @@ -65,7 +65,7 @@ mod tracing; pub use compactor_client::{CompactorClient, GrpcCompactorProxyClient}; pub use compute_client::{ComputeClient, ComputeClientPool, ComputeClientPoolRef}; -pub use connector_client::{ConnectorClient, SinkCoordinatorStreamHandle, SinkWriterStreamHandle}; +pub use connector_client::{SinkCoordinatorStreamHandle, SinkWriterStreamHandle}; pub use frontend_client::{FrontendClientPool, FrontendClientPoolRef}; pub use hummock_meta_client::{CompactionEventItem, HummockMetaClient}; pub use meta_client::{MetaClient, SinkCoordinationRpcClient}; diff --git a/src/rpc_client/src/meta_client.rs b/src/rpc_client/src/meta_client.rs index 2648843abe195..bdbba36d17a82 100644 --- a/src/rpc_client/src/meta_client.rs +++ b/src/rpc_client/src/meta_client.rs @@ -29,6 +29,7 @@ use lru::LruCache; use risingwave_common::catalog::{FunctionId, IndexId, SecretId, TableId}; use risingwave_common::config::{MetaConfig, MAX_CONNECTION_WINDOW_SIZE}; use risingwave_common::hash::WorkerSlotMapping; +use risingwave_common::monitor::EndpointExt; use risingwave_common::system_param::reader::SystemParamsReader; use risingwave_common::telemetry::report::TelemetryInfoFetcher; use risingwave_common::util::addr::HostAddr; @@ -2012,7 +2013,7 @@ impl GrpcMetaClient { .http2_keep_alive_interval(Duration::from_secs(Self::ENDPOINT_KEEP_ALIVE_INTERVAL_SEC)) .keep_alive_timeout(Duration::from_secs(Self::ENDPOINT_KEEP_ALIVE_TIMEOUT_SEC)) .connect_timeout(Duration::from_secs(5)) - .connect() + .monitored_connect("grpc-meta-client", Default::default()) .await? .tracing_injected(); diff --git a/src/rpc_client/src/stream_client.rs b/src/rpc_client/src/stream_client.rs index 92f0b25f762a0..dcd4a8edbf729 100644 --- a/src/rpc_client/src/stream_client.rs +++ b/src/rpc_client/src/stream_client.rs @@ -55,7 +55,7 @@ impl StreamClient { "grpc-stream-client", TcpConfig { tcp_nodelay: true, - keepalive_duration: None, + ..Default::default() }, ) .await? diff --git a/src/storage/compactor/src/server.rs b/src/storage/compactor/src/server.rs index 6b1257f352483..b7cbe6b020826 100644 --- a/src/storage/compactor/src/server.rs +++ b/src/storage/compactor/src/server.rs @@ -51,7 +51,6 @@ use risingwave_storage::monitor::{ }; use risingwave_storage::opts::StorageOpts; use tokio::sync::mpsc; -use tonic::transport::Endpoint; use tracing::info; use super::compactor_observer::observer_manager::CompactorObserverNode; @@ -59,9 +58,6 @@ use crate::rpc::{CompactorServiceImpl, MonitorServiceImpl}; use crate::telemetry::CompactorTelemetryCreator; use crate::CompactorOpts; -const ENDPOINT_KEEP_ALIVE_INTERVAL_SEC: u64 = 60; -// See `Endpoint::keep_alive_timeout` -const ENDPOINT_KEEP_ALIVE_TIMEOUT_SEC: u64 = 60; pub async fn prepare_start_parameters( config: RwConfig, system_params_reader: SystemParamsReader, @@ -332,17 +328,7 @@ pub async fn shared_compactor_serve( ); info!("> version: {} ({})", RW_VERSION, GIT_SHA); - let endpoint_str = opts.proxy_rpc_endpoint.clone().to_string(); - let endpoint = - Endpoint::from_shared(opts.proxy_rpc_endpoint).expect("Fail to construct tonic Endpoint"); - let channel = endpoint - .http2_keep_alive_interval(Duration::from_secs(ENDPOINT_KEEP_ALIVE_INTERVAL_SEC)) - .keep_alive_timeout(Duration::from_secs(ENDPOINT_KEEP_ALIVE_TIMEOUT_SEC)) - .connect_timeout(Duration::from_secs(5)) - .connect() - .await - .expect("Failed to create channel via proxy rpc endpoint."); - let grpc_proxy_client = GrpcCompactorProxyClient::new(channel, endpoint_str); + let grpc_proxy_client = GrpcCompactorProxyClient::new(opts.proxy_rpc_endpoint.clone()).await; let system_params_response = grpc_proxy_client .get_system_params() .await