Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(streaming): follow server.connection_pool_size config for exchange service #17755

Merged
merged 3 commits into from
Jul 23, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions src/batch/src/task/env.rs
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ pub struct BatchEnvironment {
/// Executor level metrics.
executor_metrics: Arc<BatchExecutorMetrics>,

/// Compute client pool for grpc exchange.
/// Compute client pool for batch gRPC exchange.
client_pool: ComputeClientPoolRef,

/// Manages dml information.
Expand Down Expand Up @@ -119,7 +119,7 @@ impl BatchEnvironment {
MonitoredStorageMetrics::unused(),
)),
task_metrics: Arc::new(BatchTaskMetrics::for_test()),
client_pool: Arc::new(ComputeClientPool::default()),
client_pool: Arc::new(ComputeClientPool::for_test()),
dml_manager: Arc::new(DmlManager::for_test()),
source_metrics: Arc::new(SourceMetrics::default()),
executor_metrics: Arc::new(BatchExecutorMetrics::for_test()),
Expand Down
6 changes: 4 additions & 2 deletions src/compute/src/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -345,7 +345,7 @@ pub async fn compute_node_serve(
));

// Initialize batch environment.
let client_pool = Arc::new(ComputeClientPool::new(config.server.connection_pool_size));
let batch_client_pool = Arc::new(ComputeClientPool::new(config.server.connection_pool_size));
let batch_env = BatchEnvironment::new(
batch_mgr.clone(),
advertise_addr.clone(),
Expand All @@ -354,14 +354,15 @@ pub async fn compute_node_serve(
state_store.clone(),
batch_task_metrics.clone(),
batch_executor_metrics.clone(),
client_pool,
batch_client_pool,
dml_mgr.clone(),
source_metrics.clone(),
batch_spill_metrics.clone(),
config.server.metrics_level,
);

// Initialize the streaming environment.
let stream_client_pool = Arc::new(ComputeClientPool::new(config.server.connection_pool_size));
let stream_env = StreamEnvironment::new(
advertise_addr.clone(),
stream_config,
Expand All @@ -371,6 +372,7 @@ pub async fn compute_node_serve(
system_params_manager.clone(),
source_metrics,
meta_client.clone(),
stream_client_pool,
);

let stream_mgr = LocalStreamManager::new(
Expand Down
2 changes: 1 addition & 1 deletion src/ctl/src/cmd_impl/await_tree.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ pub async fn dump(context: &CtlContext) -> anyhow::Result<()> {
let compute_nodes = meta_client
.list_worker_nodes(Some(WorkerType::ComputeNode))
.await?;
let clients = ComputeClientPool::default();
let clients = ComputeClientPool::adhoc();

// FIXME: the compute node may not be accessible directly from risectl, we may let the meta
// service collect the reports from all compute nodes in the future.
Expand Down
4 changes: 2 additions & 2 deletions src/ctl/src/cmd_impl/profile.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ pub async fn cpu_profile(context: &CtlContext, sleep_s: u64) -> anyhow::Result<(
.into_iter()
.filter(|w| w.r#type() == WorkerType::ComputeNode);

let clients = ComputeClientPool::default();
let clients = ComputeClientPool::adhoc();

let profile_root_path = std::env::var("PREFIX_PROFILING").unwrap_or_else(|_| {
tracing::info!("PREFIX_PROFILING is not set, using current directory");
Expand Down Expand Up @@ -96,7 +96,7 @@ pub async fn heap_profile(context: &CtlContext, dir: Option<String>) -> anyhow::
.into_iter()
.filter(|w| w.r#type() == WorkerType::ComputeNode);

let clients = ComputeClientPool::default();
let clients = ComputeClientPool::adhoc();

let mut profile_futs = vec![];

Expand Down
2 changes: 1 addition & 1 deletion src/frontend/src/scheduler/distributed/query.rs
Original file line number Diff line number Diff line change
Expand Up @@ -507,7 +507,7 @@ pub(crate) mod tests {
async fn test_query_should_not_hang_with_empty_worker() {
let worker_node_manager = Arc::new(WorkerNodeManager::mock(vec![]));
let worker_node_selector = WorkerNodeSelector::new(worker_node_manager.clone(), false);
let compute_client_pool = Arc::new(ComputeClientPool::default());
let compute_client_pool = Arc::new(ComputeClientPool::for_test());
let hummock_snapshot_manager = Arc::new(HummockSnapshotManager::new(Arc::new(
MockFrontendMetaClient {},
)));
Expand Down
4 changes: 2 additions & 2 deletions src/frontend/src/session.rs
Original file line number Diff line number Diff line change
Expand Up @@ -188,7 +188,7 @@ impl FrontendEnv {
let meta_client = Arc::new(MockFrontendMetaClient {});
let hummock_snapshot_manager = Arc::new(HummockSnapshotManager::new(meta_client.clone()));
let system_params_manager = Arc::new(LocalSystemParamsManager::for_test());
let compute_client_pool = Arc::new(ComputeClientPool::default());
let compute_client_pool = Arc::new(ComputeClientPool::for_test());
let query_manager = QueryManager::new(
worker_node_manager.clone(),
compute_client_pool,
Expand All @@ -198,7 +198,7 @@ impl FrontendEnv {
None,
);
let server_addr = HostAddr::try_from("127.0.0.1:4565").unwrap();
let client_pool = Arc::new(ComputeClientPool::default());
let client_pool = Arc::new(ComputeClientPool::for_test());
let creating_streaming_tracker = StreamingJobTracker::new(meta_client.clone());
let compute_runtime = Arc::new(BackgroundShutdownRuntime::from(
Builder::new_multi_thread()
Expand Down
2 changes: 1 addition & 1 deletion src/meta/node/src/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -495,7 +495,7 @@ pub async fn start_service_as_election_leader(
prometheus_client,
prometheus_selector,
metadata_manager: metadata_manager.clone(),
compute_clients: ComputeClientPool::default(),
compute_clients: ComputeClientPool::new(1), // typically no need for plural clients
diagnose_command,
trace_state,
};
Expand Down
2 changes: 1 addition & 1 deletion src/meta/src/manager/diagnose.rs
Original file line number Diff line number Diff line change
Expand Up @@ -667,7 +667,7 @@ impl DiagnoseCommand {

let mut all = StackTraceResponse::default();

let compute_clients = ComputeClientPool::default();
let compute_clients = ComputeClientPool::adhoc();
for worker_node in &worker_nodes {
if let Ok(client) = compute_clients.get(worker_node).await
&& let Ok(result) = client.stack_trace().await
Expand Down
2 changes: 1 addition & 1 deletion src/meta/src/manager/env.rs
Original file line number Diff line number Diff line change
Expand Up @@ -388,7 +388,7 @@ impl MetaSrvEnv {
meta_store_impl: MetaStoreImpl,
) -> MetaResult<Self> {
let idle_manager = Arc::new(IdleManager::new(opts.max_idle_ms));
let stream_client_pool = Arc::new(StreamClientPool::default());
let stream_client_pool = Arc::new(StreamClientPool::new(1)); // typically no need for plural clients
let event_log_manager = Arc::new(start_event_log_manager(
opts.event_log_enabled,
opts.event_log_channel_max_size,
Expand Down
2 changes: 1 addition & 1 deletion src/rpc_client/src/compute_client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -278,4 +278,4 @@ impl RpcClient for ComputeClient {
}

pub type ComputeClientPool = RpcClientPool<ComputeClient>;
pub type ComputeClientPoolRef = Arc<ComputeClientPool>;
pub type ComputeClientPoolRef = Arc<ComputeClientPool>; // TODO: no need for `Arc` since clone is cheap and shared
28 changes: 22 additions & 6 deletions src/rpc_client/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -88,26 +88,42 @@ pub struct RpcClientPool<S> {
clients: Cache<HostAddr, Arc<Vec<S>>>,
}

impl<S> Default for RpcClientPool<S>
where
S: RpcClient,
{
fn default() -> Self {
Self::new(1)
impl<S> std::fmt::Debug for RpcClientPool<S> {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
f.debug_struct("RpcClientPool")
.field("connection_pool_size", &self.connection_pool_size)
.field("type", &type_name::<S>())
.field("len", &self.clients.entry_count())
.finish()
}
}

/// Intentionally not implementing `Default` to let callers be explicit about the pool size.
impl<S> !Default for RpcClientPool<S> {}

impl<S> RpcClientPool<S>
where
S: RpcClient,
{
/// Create a new pool with the given `connection_pool_size`, which is the number of
/// connections to each node that will be reused.
pub fn new(connection_pool_size: u16) -> Self {
Self {
connection_pool_size,
clients: Cache::new(u64::MAX),
}
}

/// Create a pool for testing purposes. Same as [`Self::adhoc`].
pub fn for_test() -> Self {
Self::adhoc()
}

/// Create a pool for ad-hoc usage, where the number of connections to each node is 1.
pub fn adhoc() -> Self {
Self::new(1)
}

/// Gets the RPC client for the given node. If the connection is not established, a
/// new client will be created and returned.
pub async fn get(&self, node: &WorkerNode) -> Result<S> {
Expand Down
2 changes: 1 addition & 1 deletion src/stream/src/executor/exchange/input.rs
Original file line number Diff line number Diff line change
Expand Up @@ -315,7 +315,7 @@ pub(crate) fn new_input(
} else {
RemoteInput::new(
context.local_barrier_manager.clone(),
context.compute_client_pool.clone(),
context.compute_client_pool.as_ref().to_owned(),
upstream_addr,
(upstream_actor_id, actor_id),
(upstream_fragment_id, fragment_id),
Expand Down
2 changes: 1 addition & 1 deletion src/stream/src/executor/merge.rs
Original file line number Diff line number Diff line change
Expand Up @@ -808,7 +808,7 @@ mod tests {
let test_env = LocalBarrierTestEnv::for_test().await;

let remote_input = {
let pool = ComputeClientPool::default();
let pool = ComputeClientPool::for_test();
RemoteInput::new(
test_env.shared_context.local_barrier_manager.clone(),
pool,
Expand Down
3 changes: 1 addition & 2 deletions src/stream/src/task/barrier_manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -381,8 +381,7 @@ impl LocalBarrierWorker {
let (event_tx, event_rx) = unbounded_channel();
let (failure_tx, failure_rx) = unbounded_channel();
let shared_context = Arc::new(SharedContext::new(
actor_manager.env.server_address().clone(),
actor_manager.env.config(),
&actor_manager.env,
LocalBarrierManager {
barrier_event_sender: event_tx,
actor_failure_sender: failure_tx,
Expand Down
13 changes: 12 additions & 1 deletion src/stream/src/task/env.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ use risingwave_common::system_param::local_manager::LocalSystemParamsManagerRef;
use risingwave_common::util::addr::HostAddr;
use risingwave_connector::source::monitor::SourceMetrics;
use risingwave_dml::dml_manager::DmlManagerRef;
use risingwave_rpc_client::MetaClient;
use risingwave_rpc_client::{ComputeClientPoolRef, MetaClient};
use risingwave_storage::StateStoreImpl;

pub(crate) type WorkerNodeId = u32;
Expand Down Expand Up @@ -55,6 +55,9 @@ pub struct StreamEnvironment {

/// Meta client. Use `None` for test only
meta_client: Option<MetaClient>,

/// Compute client pool for streaming gRPC exchange.
client_pool: ComputeClientPoolRef,
}

impl StreamEnvironment {
Expand All @@ -68,6 +71,7 @@ impl StreamEnvironment {
system_params_manager: LocalSystemParamsManagerRef,
source_metrics: Arc<SourceMetrics>,
meta_client: MetaClient,
client_pool: ComputeClientPoolRef,
) -> Self {
StreamEnvironment {
server_addr,
Expand All @@ -79,6 +83,7 @@ impl StreamEnvironment {
source_metrics,
total_mem_val: Arc::new(TrAdder::new()),
meta_client: Some(meta_client),
client_pool,
}
}

Expand All @@ -87,6 +92,7 @@ impl StreamEnvironment {
pub fn for_test() -> Self {
use risingwave_common::system_param::local_manager::LocalSystemParamsManager;
use risingwave_dml::dml_manager::DmlManager;
use risingwave_rpc_client::ComputeClientPool;
use risingwave_storage::monitor::MonitoredStorageMetrics;
StreamEnvironment {
server_addr: "127.0.0.1:2333".parse().unwrap(),
Expand All @@ -100,6 +106,7 @@ impl StreamEnvironment {
source_metrics: Arc::new(SourceMetrics::default()),
total_mem_val: Arc::new(TrAdder::new()),
meta_client: None,
client_pool: Arc::new(ComputeClientPool::for_test()),
}
}

Expand Down Expand Up @@ -138,4 +145,8 @@ impl StreamEnvironment {
pub fn meta_client(&self) -> Option<MetaClient> {
self.meta_client.clone()
}

pub fn client_pool(&self) -> ComputeClientPoolRef {
self.client_pool.clone()
}
}
23 changes: 11 additions & 12 deletions src/stream/src/task/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ use parking_lot::{MappedMutexGuard, Mutex, MutexGuard, RwLock};
use risingwave_common::config::StreamingConfig;
use risingwave_common::util::addr::HostAddr;
use risingwave_pb::common::ActorInfo;
use risingwave_rpc_client::ComputeClientPool;
use risingwave_rpc_client::ComputeClientPoolRef;

use crate::error::StreamResult;
use crate::executor::exchange::permit::{self, Receiver, Sender};
Expand Down Expand Up @@ -76,10 +76,10 @@ pub struct SharedContext {
/// between two actors/actors.
pub(crate) addr: HostAddr,

/// The pool of compute clients.
/// Compute client pool for streaming gRPC exchange.
// TODO: currently the client pool won't be cleared. Should remove compute clients when
// disconnected.
pub(crate) compute_client_pool: ComputeClientPool,
pub(crate) compute_client_pool: ComputeClientPoolRef,

pub(crate) config: StreamingConfig,

Expand All @@ -95,30 +95,28 @@ impl std::fmt::Debug for SharedContext {
}

impl SharedContext {
pub fn new(
addr: HostAddr,
config: &StreamingConfig,
local_barrier_manager: LocalBarrierManager,
) -> Self {
pub fn new(env: &StreamEnvironment, local_barrier_manager: LocalBarrierManager) -> Self {
Self {
channel_map: Default::default(),
actor_infos: Default::default(),
addr,
compute_client_pool: ComputeClientPool::default(),
config: config.clone(),
addr: env.server_address().clone(),
config: env.config().as_ref().to_owned(),
compute_client_pool: env.client_pool(),
local_barrier_manager,
}
}

#[cfg(test)]
pub fn for_test() -> Self {
use std::sync::Arc;

use risingwave_common::config::StreamingDeveloperConfig;
use risingwave_rpc_client::ComputeClientPool;

Self {
channel_map: Default::default(),
actor_infos: Default::default(),
addr: LOCAL_TEST_ADDR.clone(),
compute_client_pool: ComputeClientPool::default(),
config: StreamingConfig {
developer: StreamingDeveloperConfig {
exchange_initial_permits: permit::for_test::INITIAL_PERMITS,
Expand All @@ -128,6 +126,7 @@ impl SharedContext {
},
..Default::default()
},
compute_client_pool: Arc::new(ComputeClientPool::for_test()),
local_barrier_manager: LocalBarrierManager::for_test(),
}
}
Expand Down
Loading