Skip to content

Commit

Permalink
can merge
Browse files Browse the repository at this point in the history
  • Loading branch information
wcy-fdu committed Sep 22, 2023
1 parent 441fd9a commit 1e7f089
Show file tree
Hide file tree
Showing 5 changed files with 30 additions and 25 deletions.
10 changes: 10 additions & 0 deletions src/common/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -658,6 +658,16 @@ impl AsyncStackTraceOption {
}
}

#[derive(Debug, Default, Clone, Copy, ValueEnum)]
pub enum CompactorMode {
#[default]
#[clap(alias = "dedicated")]
Dedicated,

#[clap(alias = "shared")]
Shared,
}

#[derive(Clone, Debug, Serialize, Deserialize, DefaultFromSerde)]
pub struct HeapProfilingConfig {
/// Enable to auto dump heap profile when memory usage is high
Expand Down
10 changes: 5 additions & 5 deletions src/rpc_client/src/compactor_client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -95,39 +95,39 @@ impl GrpcCompactorProxyClient {
}

pub async fn get_new_sst_ids(
&mut self,
&self,
request: GetNewSstIdsRequest,
) -> std::result::Result<tonic::Response<GetNewSstIdsResponse>, tonic::Status> {
let mut hummock_client = self.core.read().await.hummock_client.clone();
hummock_client.get_new_sst_ids(request).await
}

pub async fn report_compaction_task(
&mut self,
&self,
request: ReportCompactionTaskRequest,
) -> std::result::Result<tonic::Response<ReportCompactionTaskResponse>, tonic::Status> {
let mut hummock_client = self.core.read().await.hummock_client.clone();
hummock_client.report_compaction_task(request).await
}

pub async fn report_full_scan_task(
&mut self,
&self,
request: ReportFullScanTaskRequest,
) -> std::result::Result<tonic::Response<ReportFullScanTaskResponse>, tonic::Status> {
let mut hummock_client = self.core.read().await.hummock_client.clone();
hummock_client.report_full_scan_task(request).await
}

pub async fn report_vacuum_task(
&mut self,
&self,
request: ReportVacuumTaskRequest,
) -> std::result::Result<tonic::Response<ReportVacuumTaskResponse>, tonic::Status> {
let mut hummock_client = self.core.read().await.hummock_client.clone();
hummock_client.report_vacuum_task(request).await
}

pub async fn get_system_params(
&mut self,
&self,
) -> std::result::Result<tonic::Response<GetSystemParamsResponse>, tonic::Status> {
let mut system_params_client = self.core.read().await.system_params_client.clone();
system_params_client
Expand Down
20 changes: 8 additions & 12 deletions src/storage/compactor/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,9 @@ pub mod server;
mod telemetry;

use clap::Parser;
use risingwave_common::config::{AsyncStackTraceOption, MetricLevel, OverrideConfig};
use risingwave_common::config::{
AsyncStackTraceOption, CompactorMode, MetricLevel, OverrideConfig,
};

use crate::server::{compactor_serve, shared_compactor_serve};

Expand Down Expand Up @@ -88,8 +90,8 @@ pub struct CompactorOpts {
#[override_opts(path = storage.object_store_read_timeout_ms)]
pub object_store_read_timeout_ms: Option<u64>,

#[clap(long, env = "RW_COMPACTOR_MODE", default_value = "dedicated")]
pub compactor_mode: String,
#[clap(long, env = "RW_COMPACTOR_MODE", value_enum)]
pub compactor_mode: Option<CompactorMode>,

#[clap(long, env = "RW_PROXY_RPC_ENDPOINT", default_value = "")]
pub proxy_rpc_endpoint: String,
Expand All @@ -101,8 +103,8 @@ use std::pin::Pin;
pub fn start(opts: CompactorOpts) -> Pin<Box<dyn Future<Output = ()> + Send>> {
// WARNING: don't change the function signature. Making it `async fn` will cause
// slow compile in release mode.
match opts.compactor_mode.as_str() {
"shared" => Box::pin(async move {
match opts.compactor_mode {
Some(CompactorMode::Shared) => Box::pin(async move {
tracing::info!("Shared compactor pod options: {:?}", opts);
tracing::info!("Proxy rpc endpoint: {}", opts.proxy_rpc_endpoint.clone());

Expand All @@ -113,7 +115,7 @@ pub fn start(opts: CompactorOpts) -> Pin<Box<dyn Future<Output = ()> + Send>> {

join_handle.await.unwrap();
}),
"dedicated" => Box::pin(async move {
None | Some(CompactorMode::Dedicated) => Box::pin(async move {
tracing::info!("Compactor node options: {:?}", opts);
tracing::info!("meta address: {}", opts.meta_address.clone());

Expand All @@ -136,11 +138,5 @@ pub fn start(opts: CompactorOpts) -> Pin<Box<dyn Future<Output = ()> + Send>> {
join_handle.await.unwrap();
observer_join_handle.abort();
}),
_ => {
panic!(
"Compactor mode {} is configured incorrectly",
opts.compactor_mode
);
}
}
}
2 changes: 1 addition & 1 deletion src/storage/compactor/src/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -328,7 +328,7 @@ pub async fn shared_compactor_serve(
.connect()
.await
.expect("Failed to create channel via proxy rpc endpoint.");
let mut grpc_proxy_client = GrpcCompactorProxyClient::new(channel);
let grpc_proxy_client = GrpcCompactorProxyClient::new(channel);
let system_params_response = grpc_proxy_client
.get_system_params()
.await
Expand Down
13 changes: 6 additions & 7 deletions src/storage/src/hummock/compactor/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -633,7 +633,7 @@ pub fn start_shared_compactor(

let mut periodic_event_interval = tokio::time::interval(periodic_event_update_interval);
let executor = context.compaction_executor.clone();
let mut report_heartbeat_client = grpc_proxy_client.clone();
let report_heartbeat_client = grpc_proxy_client.clone();
'consume_stream: loop {
let request: Option<Request<DispatchCompactionTaskRequest>> = tokio::select! {
_ = periodic_event_interval.tick() => {
Expand Down Expand Up @@ -667,8 +667,7 @@ pub fn start_shared_compactor(
let context = context.clone();
let shutdown = shutdown_map.clone();

let mut report_task_client = grpc_proxy_client.clone();
let get_sst_id_client = grpc_proxy_client.clone();
let cloned_grpc_proxy_client = grpc_proxy_client.clone();
executor.spawn(async move {
let DispatchCompactionTaskRequest {
tables,
Expand All @@ -689,7 +688,7 @@ pub fn start_shared_compactor(
let mut output_object_ids_deque: VecDeque<_> = VecDeque::new();
output_object_ids_deque.extend(output_object_ids);
let shared_compactor_object_id_manager =
SharedComapctorObjectIdManager::new(output_object_ids_deque, get_sst_id_client, context.storage_opts.sstable_id_remote_fetch_number);
SharedComapctorObjectIdManager::new(output_object_ids_deque, cloned_grpc_proxy_client.clone(), context.storage_opts.sstable_id_remote_fetch_number);
match dispatch_task.unwrap() {
dispatch_compaction_task_request::Task::CompactTask(compact_task) => {
context.running_task_count.fetch_add(1, Ordering::SeqCst);
Expand All @@ -714,7 +713,7 @@ pub fn start_shared_compactor(
})),
};

match report_task_client
match cloned_grpc_proxy_client
.report_compaction_task(report_compaction_task_request)
.await
{
Expand All @@ -733,7 +732,7 @@ pub fn start_shared_compactor(
let report_vacuum_task_request = ReportVacuumTaskRequest {
vacuum_task: Some(vacuum_task),
};
match report_task_client.report_vacuum_task(report_vacuum_task_request).await {
match cloned_grpc_proxy_client.report_vacuum_task(report_vacuum_task_request).await {
Ok(_) => tracing::info!("Finished vacuuming SSTs"),
Err(e) => tracing::warn!("Failed to report vacuum task: {:#?}", e),
}
Expand All @@ -753,7 +752,7 @@ pub fn start_shared_compactor(
total_object_count,
total_object_size,
};
match report_task_client
match cloned_grpc_proxy_client
.report_full_scan_task(report_full_scan_task_request)
.await
{
Expand Down

0 comments on commit 1e7f089

Please sign in to comment.