diff --git a/Cargo.lock b/Cargo.lock
index 8d794a7115d10..a785a3728e873 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -7596,6 +7596,7 @@ dependencies = [
  "lz4",
  "mach2",
  "madsim-tokio",
+ "madsim-tonic",
  "memcomparable",
  "moka",
  "more-asserts",
diff --git a/proto/compactor.proto b/proto/compactor.proto
index 29dee929d981b..06f071d75040a 100644
--- a/proto/compactor.proto
+++ b/proto/compactor.proto
@@ -2,6 +2,10 @@ syntax = "proto3";
 
 package compactor;
 
+import "catalog.proto";
+import "common.proto";
+import "hummock.proto";
+
 option java_package = "com.risingwave.proto";
 option optimize_for = SPEED;
 
@@ -9,6 +13,27 @@ message EchoRequest {}
 
 message EchoResponse {}
 
+message DispatchCompactionTaskRequest {
+  // DispatchCompactionTaskRequest is used to pass compaction-task-level parameters.
+  oneof task {
+    hummock.CompactTask compact_task = 1;
+    hummock.VacuumTask vacuum_task = 2;
+    hummock.FullScanTask full_scan_task = 3;
+    hummock.ValidationTask validation_task = 4;
+    hummock.CancelCompactTask cancel_compact_task = 5;
+  }
+  // Used to build the filter_key_extractor.
+  repeated catalog.Table tables = 6;
+  // We can generate output object ids for each compactor, equal to the number of input SSTs.
+  // If all the output object ids are used up, this compaction task fails, and the next allocation will be twice the previous amount of output object ids.
+  repeated uint64 output_object_ids = 7;
+}
+
+message DispatchCompactionTaskResponse {
+  common.Status status = 1;
+}
+
 service CompactorService {
   rpc Echo(EchoRequest) returns (EchoResponse);
+  rpc DispatchCompactionTask(DispatchCompactionTaskRequest) returns (DispatchCompactionTaskResponse);
 }
diff --git a/proto/hummock.proto b/proto/hummock.proto
index e32fc66fe8d22..1437641fcd472 100644
--- a/proto/hummock.proto
+++ b/proto/hummock.proto
@@ -439,6 +439,26 @@ message SubscribeCompactionEventResponse {
   uint64 create_at = 7;
 }
 
+message ReportCompactionTaskRequest {
+  // ReportTask provides the compact task to report to the meta node.
+  message ReportTask {
+    CompactTask compact_task = 2;
+    map<uint32, TableStats> table_stats_change = 3;
+  }
+  // HeartBeat provides the progress status of all tasks on the compactor.
+  message HeartBeat {
+    repeated CompactTaskProgress progress = 2;
+  }
+  oneof event {
+    ReportTask report_task = 1;
+    HeartBeat heart_beat = 2;
+  }
+}
+
+message ReportCompactionTaskResponse {
+  common.Status status = 1;
+}
+
 message ValidationTask {
   repeated SstableInfo sst_infos = 1;
   map<uint64, uint32> sst_id_to_worker_id = 2;
@@ -685,6 +705,7 @@ service HummockManagerService {
   rpc SplitCompactionGroup(SplitCompactionGroupRequest) returns (SplitCompactionGroupResponse);
   rpc RiseCtlListCompactionStatus(RiseCtlListCompactionStatusRequest) returns (RiseCtlListCompactionStatusResponse);
   rpc SubscribeCompactionEvent(stream SubscribeCompactionEventRequest) returns (stream SubscribeCompactionEventResponse);
+  rpc ReportCompactionTask(ReportCompactionTaskRequest) returns (ReportCompactionTaskResponse);
   rpc ListBranchedObject(ListBranchedObjectRequest) returns (ListBranchedObjectResponse);
   rpc ListActiveWriteLimit(ListActiveWriteLimitRequest) returns (ListActiveWriteLimitResponse);
   rpc ListHummockMetaConfig(ListHummockMetaConfigRequest) returns (ListHummockMetaConfigResponse);
diff --git a/src/common/src/config.rs b/src/common/src/config.rs
index 3b7bc027be870..c1a81e9d5fe71 100644
--- a/src/common/src/config.rs
+++ b/src/common/src/config.rs
@@ -658,6 +658,16 @@ impl AsyncStackTraceOption {
     }
 }
 
+#[derive(Debug, Default, Clone, Copy, ValueEnum)]
+pub enum CompactorMode {
+    #[default]
+    #[clap(alias = "dedicated")]
+    Dedicated,
+
+    #[clap(alias = "shared")]
+    Shared,
+}
+
 #[derive(Clone, Debug, Serialize, Deserialize, DefaultFromSerde)]
 pub struct HeapProfilingConfig {
     /// Enable to auto dump heap profile when memory usage is high
diff --git a/src/compute/src/server.rs b/src/compute/src/server.rs
index 830e5a7d6d5da..28a63ddf2b7e7 100644
--- a/src/compute/src/server.rs
+++ b/src/compute/src/server.rs
@@ -222,7 +222,6 @@ pub async fn compute_node_serve(
             compactor_metrics: compactor_metrics.clone(),
             is_share_buffer_compact: false,
             compaction_executor: Arc::new(CompactionExecutor::new(Some(1))),
-            filter_key_extractor_manager: storage.filter_key_extractor_manager().clone(),
             memory_limiter,
 
             task_progress_manager: Default::default(),
@@ -234,6 +233,7 @@ pub async fn compute_node_serve(
             compactor_context,
             hummock_meta_client.clone(),
             storage.sstable_object_id_manager().clone(),
+            storage.filter_key_extractor_manager().clone(),
         );
         sub_tasks.push((handle, shutdown_sender));
     }
diff --git a/src/meta/src/rpc/service/hummock_service.rs b/src/meta/src/rpc/service/hummock_service.rs
index 161e92ddad113..f670c9cb220a5 100644
--- a/src/meta/src/rpc/service/hummock_service.rs
+++ b/src/meta/src/rpc/service/hummock_service.rs
@@ -526,6 +526,13 @@ impl HummockManagerService for HummockServiceImpl {
         Ok(Response::new(RwReceiverStream::new(rx)))
     }
 
+    async fn report_compaction_task(
+        &self,
+        _request: Request<ReportCompactionTaskRequest>,
+    ) -> Result<Response<ReportCompactionTaskResponse>, Status> {
+        unreachable!()
+    }
+
     async fn list_branched_object(
         &self,
         _request: Request<ListBranchedObjectRequest>,
diff --git a/src/rpc_client/src/compactor_client.rs b/src/rpc_client/src/compactor_client.rs
index cdd1b08049087..8574e76695740 100644
--- a/src/rpc_client/src/compactor_client.rs
+++ b/src/rpc_client/src/compactor_client.rs
@@ -12,11 +12,21 @@
 // See the License for the specific language governing permissions and
 // limitations under the License.
 
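As context for the new `ReportCompactionTask` RPC above: a minimal sketch (not part of the diff) of how a caller can build the heartbeat variant of the request. The module paths mirror the imports used later in this change; the function name is illustrative.

```rust
// Sketch only: constructing the heartbeat event of ReportCompactionTaskRequest,
// using the prost-generated types from the hummock.proto additions above.
use risingwave_pb::hummock::report_compaction_task_request::{Event, HeartBeat};
use risingwave_pb::hummock::{CompactTaskProgress, ReportCompactionTaskRequest};

fn heartbeat_request(progress: Vec<CompactTaskProgress>) -> ReportCompactionTaskRequest {
    ReportCompactionTaskRequest {
        // The oneof becomes an Option<Event> on the Rust side.
        event: Some(Event::HeartBeat(HeartBeat { progress })),
    }
}
```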
+use std::sync::Arc;
 use std::time::Duration;
 
 use risingwave_common::util::addr::HostAddr;
+use risingwave_pb::hummock::hummock_manager_service_client::HummockManagerServiceClient;
+use risingwave_pb::hummock::{
+    GetNewSstIdsRequest, GetNewSstIdsResponse, ReportCompactionTaskRequest,
+    ReportCompactionTaskResponse, ReportFullScanTaskRequest, ReportFullScanTaskResponse,
+    ReportVacuumTaskRequest, ReportVacuumTaskResponse,
+};
+use risingwave_pb::meta::system_params_service_client::SystemParamsServiceClient;
+use risingwave_pb::meta::{GetSystemParamsRequest, GetSystemParamsResponse};
 use risingwave_pb::monitor_service::monitor_service_client::MonitorServiceClient;
 use risingwave_pb::monitor_service::{StackTraceRequest, StackTraceResponse};
+use tokio::sync::RwLock;
 use tonic::transport::{Channel, Endpoint};
 
 use crate::error::Result;
@@ -46,3 +56,82 @@ impl CompactorClient {
             .into_inner())
     }
 }
+
+#[derive(Debug, Clone)]
+pub struct GrpcCompactorProxyClientCore {
+    hummock_client: HummockManagerServiceClient<Channel>,
+    system_params_client: SystemParamsServiceClient<Channel>,
+}
+
+impl GrpcCompactorProxyClientCore {
+    pub(crate) fn new(channel: Channel) -> Self {
+        let hummock_client =
+            HummockManagerServiceClient::new(channel.clone()).max_decoding_message_size(usize::MAX);
+        let system_params_client = SystemParamsServiceClient::new(channel);
+
+        Self {
+            hummock_client,
+            system_params_client,
+        }
+    }
+}
+
+/// Client to the proxy server. Cloning the instance is lightweight.
+///
+/// Todo(wcy-fdu): add refresh client interface.
+#[derive(Debug, Clone)]
+pub struct GrpcCompactorProxyClient {
+    pub core: Arc<RwLock<GrpcCompactorProxyClientCore>>,
+}
+
+impl GrpcCompactorProxyClient {
+    pub fn new(channel: Channel) -> Self {
+        let core = Arc::new(RwLock::new(GrpcCompactorProxyClientCore::new(channel)));
+        Self { core }
+    }
+
+    fn _recreate_core(&self, _channel: Channel) {
+        todo!()
+    }
+
+    pub async fn get_new_sst_ids(
+        &self,
+        request: GetNewSstIdsRequest,
+    ) -> std::result::Result<tonic::Response<GetNewSstIdsResponse>, tonic::Status> {
+        let mut hummock_client = self.core.read().await.hummock_client.clone();
+        hummock_client.get_new_sst_ids(request).await
+    }
+
+    pub async fn report_compaction_task(
+        &self,
+        request: ReportCompactionTaskRequest,
+    ) -> std::result::Result<tonic::Response<ReportCompactionTaskResponse>, tonic::Status> {
+        let mut hummock_client = self.core.read().await.hummock_client.clone();
+        hummock_client.report_compaction_task(request).await
+    }
+
+    pub async fn report_full_scan_task(
+        &self,
+        request: ReportFullScanTaskRequest,
+    ) -> std::result::Result<tonic::Response<ReportFullScanTaskResponse>, tonic::Status> {
+        let mut hummock_client = self.core.read().await.hummock_client.clone();
+        hummock_client.report_full_scan_task(request).await
+    }
+
+    pub async fn report_vacuum_task(
+        &self,
+        request: ReportVacuumTaskRequest,
+    ) -> std::result::Result<tonic::Response<ReportVacuumTaskResponse>, tonic::Status> {
+        let mut hummock_client = self.core.read().await.hummock_client.clone();
+        hummock_client.report_vacuum_task(request).await
+    }
+
+    pub async fn get_system_params(
+        &self,
+    ) -> std::result::Result<tonic::Response<GetSystemParamsResponse>, tonic::Status> {
+        let mut system_params_client = self.core.read().await.system_params_client.clone();
+        system_params_client
+            .get_system_params(GetSystemParamsRequest {})
+            .await
+    }
+}
diff --git a/src/rpc_client/src/lib.rs b/src/rpc_client/src/lib.rs
index 7d94c1a0d789d..2d8017e87aa07 100644
--- a/src/rpc_client/src/lib.rs
+++ b/src/rpc_client/src/lib.rs
@@ -58,7 +58,7 @@ mod sink_coordinate_client;
 mod stream_client;
 mod tracing;
 
-pub use compactor_client::CompactorClient;
+pub use compactor_client::{CompactorClient, GrpcCompactorProxyClient};
 pub use compute_client::{ComputeClient, ComputeClientPool, ComputeClientPoolRef};
 pub use connector_client::{ConnectorClient, SinkCoordinatorStreamHandle, SinkWriterStreamHandle};
 pub use hummock_meta_client::{CompactionEventItem, HummockMetaClient};
diff --git a/src/storage/Cargo.toml b/src/storage/Cargo.toml
index 8c03464e34ae8..acbc05d8f536a 100644
--- a/src/storage/Cargo.toml
+++ b/src/storage/Cargo.toml
@@ -66,6 +66,7 @@ tokio = { version = "0.2", package = "madsim-tokio", features = [
     "signal",
 ] }
 tokio-retry = "0.3"
+tonic = { workspace = true }
 tracing = "0.1"
 tracing-futures = { version = "0.2", features = ["futures-03"] }
 xorf = "0.8.1"
diff --git a/src/storage/compactor/src/lib.rs b/src/storage/compactor/src/lib.rs
index 8043cb5d2214d..6cca88f6bfc89 100644
--- a/src/storage/compactor/src/lib.rs
+++ b/src/storage/compactor/src/lib.rs
@@ -14,13 +14,15 @@
 
 mod compactor_observer;
 mod rpc;
-mod server;
+pub mod server;
 mod telemetry;
 
 use clap::Parser;
-use risingwave_common::config::{AsyncStackTraceOption, MetricLevel, OverrideConfig};
+use risingwave_common::config::{
+    AsyncStackTraceOption, CompactorMode, MetricLevel, OverrideConfig,
+};
 
-use crate::server::compactor_serve;
+use crate::server::{compactor_serve, shared_compactor_serve};
 
 /// Command-line arguments for compactor-node.
 #[derive(Parser, Clone, Debug, OverrideConfig)]
@@ -87,6 +89,12 @@ pub struct CompactorOpts {
     #[clap(long, env = "RW_OBJECT_STORE_READ_TIMEOUT_MS", value_enum)]
     #[override_opts(path = storage.object_store_read_timeout_ms)]
     pub object_store_read_timeout_ms: Option<u64>,
+
+    #[clap(long, env = "RW_COMPACTOR_MODE", value_enum)]
+    pub compactor_mode: Option<CompactorMode>,
+
+    #[clap(long, env = "RW_PROXY_RPC_ENDPOINT", default_value = "")]
+    pub proxy_rpc_endpoint: String,
 }
 
 use std::future::Future;
@@ -95,28 +103,40 @@ use std::pin::Pin;
 pub fn start(opts: CompactorOpts) -> Pin<Box<dyn Future<Output = ()> + Send>> {
     // WARNING: don't change the function signature. Making it `async fn` will cause
     // slow compile in release mode.
-    Box::pin(async move {
-        tracing::info!("Compactor node options: {:?}", opts);
-        tracing::info!("meta address: {}", opts.meta_address.clone());
-
-        let listen_addr = opts.listen_addr.parse().unwrap();
-        tracing::info!("Server Listening at {}", listen_addr);
-
-        let advertise_addr = opts
-            .advertise_addr
-            .as_ref()
-            .unwrap_or_else(|| {
-                tracing::warn!("advertise addr is not specified, defaulting to listen address");
-                &opts.listen_addr
-            })
-            .parse()
-            .unwrap();
-        tracing::info!("advertise address is {}", advertise_addr);
-
-        let (join_handle, observer_join_handle, _shutdown_sender) =
-            compactor_serve(listen_addr, advertise_addr, opts).await;
-
-        join_handle.await.unwrap();
-        observer_join_handle.abort();
-    })
+    match opts.compactor_mode {
+        Some(CompactorMode::Shared) => Box::pin(async move {
+            tracing::info!("Shared compactor pod options: {:?}", opts);
+            tracing::info!("Proxy rpc endpoint: {}", opts.proxy_rpc_endpoint.clone());
+
+            let listen_addr = opts.listen_addr.parse().unwrap();
+            tracing::info!("Server Listening at {}", listen_addr);
+
+            let (join_handle, _shutdown_sender) = shared_compactor_serve(listen_addr, opts).await;
+
+            join_handle.await.unwrap();
+        }),
+        None | Some(CompactorMode::Dedicated) => Box::pin(async move {
+            tracing::info!("Compactor node options: {:?}", opts);
+            tracing::info!("meta address: {}", opts.meta_address.clone());
+
+            let listen_addr = opts.listen_addr.parse().unwrap();
+            tracing::info!("Server Listening at {}", listen_addr);
+
+            let advertise_addr = opts
+                .advertise_addr
+                .as_ref()
+                .unwrap_or_else(|| {
+                    tracing::warn!("advertise addr is not specified, defaulting to listen address");
+                    &opts.listen_addr
+                })
+                .parse()
+                .unwrap();
+            tracing::info!("advertise address is {}", advertise_addr);
+
+            let (join_handle, observer_join_handle, _shutdown_sender) =
+                compactor_serve(listen_addr, advertise_addr, opts).await;
+
+            join_handle.await.unwrap();
+            observer_join_handle.abort();
+        }),
+    }
 }
diff --git a/src/storage/compactor/src/rpc.rs b/src/storage/compactor/src/rpc.rs
index d7f01115610f1..18f5ba6edd443 100644
--- a/src/storage/compactor/src/rpc.rs
+++ b/src/storage/compactor/src/rpc.rs
@@ -17,23 +17,55 @@ use std::sync::Arc;
 
 use parking_lot::RwLock;
 use risingwave_pb::compactor::compactor_service_server::CompactorService;
-use risingwave_pb::compactor::{EchoRequest, EchoResponse};
+use risingwave_pb::compactor::{
+    DispatchCompactionTaskRequest, DispatchCompactionTaskResponse, EchoRequest, EchoResponse,
+};
 use risingwave_pb::monitor_service::monitor_service_server::MonitorService;
 use risingwave_pb::monitor_service::{
     AnalyzeHeapRequest, AnalyzeHeapResponse, HeapProfilingRequest, HeapProfilingResponse,
     ListHeapProfilingRequest, ListHeapProfilingResponse, ProfilingRequest, ProfilingResponse,
     StackTraceRequest, StackTraceResponse,
 };
+use tokio::sync::mpsc;
 use tonic::{Request, Response, Status};
 
 #[derive(Default)]
-pub struct CompactorServiceImpl {}
-
+pub struct CompactorServiceImpl {
+    sender: Option<mpsc::UnboundedSender<Request<DispatchCompactionTaskRequest>>>,
+}
+impl CompactorServiceImpl {
+    pub fn new(sender: mpsc::UnboundedSender<Request<DispatchCompactionTaskRequest>>) -> Self {
+        Self {
+            sender: Some(sender),
+        }
+    }
+}
 #[async_trait::async_trait]
 impl CompactorService for CompactorServiceImpl {
     async fn echo(&self, _request: Request<EchoRequest>) -> Result<Response<EchoResponse>, Status> {
         Ok(Response::new(EchoResponse {}))
     }
+
+    async fn dispatch_compaction_task(
+        &self,
+        request: Request<DispatchCompactionTaskRequest>,
+    ) -> Result<Response<DispatchCompactionTaskResponse>, Status> {
+        match &self.sender.as_ref() {
+            Some(sender) => {
+                sender
+                    .send(request)
+                    .expect("DispatchCompactionTaskRequest should be able to send");
+            }
+            None => {
+                tracing::error!(
+                    "failed to send DispatchCompactionTaskRequest; sender has not been initialized."
+                );
+            }
+        }
+        Ok(Response::new(DispatchCompactionTaskResponse {
+            status: None,
+        }))
+    }
 }
 
 pub struct MonitorServiceImpl {
diff --git a/src/storage/compactor/src/server.rs b/src/storage/compactor/src/server.rs
index dd953b87c7af9..e099ed16ffcbd 100644
--- a/src/storage/compactor/src/server.rs
+++ b/src/storage/compactor/src/server.rs
@@ -19,10 +19,11 @@ use std::time::Duration;
 
 use parking_lot::RwLock;
 use risingwave_common::config::{
-    extract_storage_memory_config, load_config, AsyncStackTraceOption, MetricLevel,
+    extract_storage_memory_config, load_config, AsyncStackTraceOption, MetricLevel, RwConfig,
 };
 use risingwave_common::monitor::connection::{RouterExt, TcpConfig};
 use risingwave_common::system_param::local_manager::LocalSystemParamsManager;
+use risingwave_common::system_param::reader::SystemParamsReader;
 use risingwave_common::telemetry::manager::TelemetryManager;
 use risingwave_common::telemetry::telemetry_env_enabled;
 use risingwave_common::util::addr::HostAddr;
@@ -35,7 +36,7 @@ use risingwave_object_store::object::parse_remote_object_store;
 use risingwave_pb::common::WorkerType;
 use risingwave_pb::compactor::compactor_service_server::CompactorServiceServer;
 use risingwave_pb::monitor_service::monitor_service_server::MonitorServiceServer;
-use risingwave_rpc_client::MetaClient;
+use risingwave_rpc_client::{GrpcCompactorProxyClient, MetaClient};
 use risingwave_storage::filter_key_extractor::{
     FilterKeyExtractorManager, RemoteTableAccessor, RpcFilterKeyExtractorManager,
 };
@@ -45,11 +46,13 @@ use risingwave_storage::hummock::{
     HummockMemoryCollector, MemoryLimiter, SstableObjectIdManager, SstableStore,
 };
 use risingwave_storage::monitor::{
-    monitor_cache, GLOBAL_COMPACTOR_METRICS, GLOBAL_HUMMOCK_METRICS,
+    monitor_cache, CompactorMetrics, GLOBAL_COMPACTOR_METRICS, GLOBAL_HUMMOCK_METRICS,
 };
 use risingwave_storage::opts::StorageOpts;
+use tokio::sync::mpsc;
 use tokio::sync::oneshot::Sender;
 use tokio::task::JoinHandle;
+use tonic::transport::Endpoint;
 use tracing::info;
 
 use super::compactor_observer::observer_manager::CompactorObserverNode;
@@ -57,47 +60,23 @@ use crate::rpc::{CompactorServiceImpl, MonitorServiceImpl};
 use crate::telemetry::CompactorTelemetryCreator;
 use crate::CompactorOpts;
 
-/// Fetches and runs compaction tasks.
-pub async fn compactor_serve(
-    listen_addr: SocketAddr,
-    advertise_addr: HostAddr,
-    opts: CompactorOpts,
-) -> (JoinHandle<()>, JoinHandle<()>, Sender<()>) {
-    type CompactorMemoryCollector = HummockMemoryCollector;
-
-    let config = load_config(&opts.config_path, &opts);
-    info!("Starting compactor node",);
-    info!("> config: {:?}", config);
-    info!(
-        "> debug assertions: {}",
-        if cfg!(debug_assertions) { "on" } else { "off" }
-    );
-    info!("> version: {} ({})", RW_VERSION, GIT_SHA);
-
-    // Register to the cluster.
-    let (meta_client, system_params_reader) = MetaClient::register_new(
-        &opts.meta_address,
-        WorkerType::Compactor,
-        &advertise_addr,
-        Default::default(),
-        &config.meta,
-    )
-    .await
-    .unwrap();
-
-    info!("Assigned compactor id {}", meta_client.worker_id());
-    meta_client.activate(&advertise_addr).await.unwrap();
-
+const ENDPOINT_KEEP_ALIVE_INTERVAL_SEC: u64 = 60;
+// See `Endpoint::keep_alive_timeout`
+const ENDPOINT_KEEP_ALIVE_TIMEOUT_SEC: u64 = 60;
+pub async fn prepare_start_parameters(
+    config: RwConfig,
+    system_params_reader: SystemParamsReader,
+) -> (
+    Arc<SstableStore>,
+    Arc<MemoryLimiter>,
+    Option<Arc<RwLock<await_tree::Registry<String>>>>,
+    Arc<StorageOpts>,
+    Arc<CompactorMetrics>,
+) {
     // Boot compactor
     let object_metrics = Arc::new(GLOBAL_OBJECT_STORE_METRICS.clone());
     let compactor_metrics = Arc::new(GLOBAL_COMPACTOR_METRICS.clone());
-
     let state_store_url = system_params_reader.state_store();
 
     let storage_memory_config = extract_storage_memory_config(&config);
@@ -106,7 +85,6 @@ pub async fn compactor_serve(
         &system_params_reader,
         &storage_memory_config,
     )));
-
     let total_memory_available_bytes =
         (resource_util::memory::total_memory_available_bytes() as f64
             * config.storage.compactor_memory_available_proportion) as usize;
@@ -156,6 +134,74 @@ pub async fn compactor_serve(
         meta_cache_capacity_bytes,
     ));
 
+    let memory_limiter = Arc::new(MemoryLimiter::new(compactor_memory_limit_bytes));
+    let storage_memory_config = extract_storage_memory_config(&config);
+    let memory_collector: Arc<HummockMemoryCollector> = Arc::new(HummockMemoryCollector::new(
+        sstable_store.clone(),
+        memory_limiter.clone(),
+        storage_memory_config,
+    ));
+
+    monitor_cache(memory_collector);
+
+    let await_tree_config = match &config.streaming.async_stack_trace {
+        AsyncStackTraceOption::Off => None,
+        c => await_tree::ConfigBuilder::default()
+            .verbose(c.is_verbose().unwrap())
+            .build()
+            .ok(),
+    };
+    let await_tree_reg =
+        await_tree_config.map(|c| Arc::new(RwLock::new(await_tree::Registry::new(c))));
+
+    (
+        sstable_store,
+        memory_limiter,
+        await_tree_reg,
+        storage_opts,
+        compactor_metrics,
+    )
+}
+
+/// Fetches and runs compaction tasks.
+pub async fn compactor_serve(
+    listen_addr: SocketAddr,
+    advertise_addr: HostAddr,
+    opts: CompactorOpts,
+) -> (JoinHandle<()>, JoinHandle<()>, Sender<()>) {
+    let config = load_config(&opts.config_path, &opts);
+    info!("Starting compactor node",);
+    info!("> config: {:?}", config);
+    info!(
+        "> debug assertions: {}",
+        if cfg!(debug_assertions) { "on" } else { "off" }
+    );
+    info!("> version: {} ({})", RW_VERSION, GIT_SHA);
+
+    // Register to the cluster.
+    let (meta_client, system_params_reader) = MetaClient::register_new(
+        &opts.meta_address,
+        WorkerType::Compactor,
+        &advertise_addr,
+        Default::default(),
+        &config.meta,
+    )
+    .await
+    .unwrap();
+
+    info!("Assigned compactor id {}", meta_client.worker_id());
+    meta_client.activate(&advertise_addr).await.unwrap();
+
+    let hummock_metrics = Arc::new(GLOBAL_HUMMOCK_METRICS.clone());
+
+    let hummock_meta_client = Arc::new(MonitoredHummockMetaClient::new(
+        meta_client.clone(),
+        hummock_metrics.clone(),
+    ));
+
+    let (sstable_store, memory_limiter, await_tree_reg, storage_opts, compactor_metrics) =
+        prepare_start_parameters(config.clone(), system_params_reader.clone()).await;
+
     let filter_key_extractor_manager = Arc::new(RpcFilterKeyExtractorManager::new(Box::new(
         RemoteTableAccessor::new(meta_client.clone()),
     )));
@@ -171,27 +217,13 @@ pub async fn compactor_serve(
     // limited at first.
     let observer_join_handle = observer_manager.start().await;
 
-    let memory_limiter = Arc::new(MemoryLimiter::new(compactor_memory_limit_bytes));
-    let memory_collector = Arc::new(CompactorMemoryCollector::new(
-        sstable_store.clone(),
-        memory_limiter.clone(),
-        storage_memory_config,
-    ));
-
-    monitor_cache(memory_collector);
     let sstable_object_id_manager = Arc::new(SstableObjectIdManager::new(
         hummock_meta_client.clone(),
         storage_opts.sstable_id_remote_fetch_number,
     ));
-    let await_tree_config = match &config.streaming.async_stack_trace {
-        AsyncStackTraceOption::Off => None,
-        c => await_tree::ConfigBuilder::default()
-            .verbose(c.is_verbose().unwrap())
-            .build()
-            .ok(),
-    };
-    let await_tree_reg =
-        await_tree_config.map(|c| Arc::new(RwLock::new(await_tree::Registry::new(c))));
+    let filter_key_extractor_manager = FilterKeyExtractorManager::RpcFilterKeyExtractorManager(
+        filter_key_extractor_manager.clone(),
+    );
 
     let compactor_context = CompactorContext {
         storage_opts,
         sstable_store: sstable_store.clone(),
@@ -200,9 +232,6 @@ pub async fn compactor_serve(
         compaction_executor: Arc::new(CompactionExecutor::new(
             opts.compaction_worker_threads_number,
         )),
-        filter_key_extractor_manager: FilterKeyExtractorManager::RpcFilterKeyExtractorManager(
-            filter_key_extractor_manager.clone(),
-        ),
         memory_limiter,
 
         task_progress_manager: Default::default(),
@@ -219,6 +248,7 @@ pub async fn compactor_serve(
             compactor_context.clone(),
             hummock_meta_client.clone(),
             sstable_object_id_manager.clone(),
+            filter_key_extractor_manager.clone(),
         ),
     ];
 
@@ -275,3 +305,94 @@ pub async fn compactor_serve(
 
     (join_handle, observer_join_handle, shutdown_send)
 }
+
+pub async fn shared_compactor_serve(
+    listen_addr: SocketAddr,
+    opts: CompactorOpts,
+) -> (JoinHandle<()>, Sender<()>) {
+    let config = load_config(&opts.config_path, &opts);
+    info!("Starting shared compactor node",);
+    info!("> config: {:?}", config);
+    info!(
+        "> debug assertions: {}",
+        if cfg!(debug_assertions) { "on" } else { "off" }
+    );
+    info!("> version: {} ({})", RW_VERSION, GIT_SHA);
+
+    let endpoint: &'static str = Box::leak(opts.proxy_rpc_endpoint.clone().into_boxed_str());
+    let endpoint = Endpoint::from_static(endpoint);
+    let channel = endpoint
+        .http2_keep_alive_interval(Duration::from_secs(ENDPOINT_KEEP_ALIVE_INTERVAL_SEC))
+        .keep_alive_timeout(Duration::from_secs(ENDPOINT_KEEP_ALIVE_TIMEOUT_SEC))
+        .connect_timeout(Duration::from_secs(5))
+        .connect()
+        .await
+        .expect("Failed to create channel via proxy rpc endpoint.");
+    let grpc_proxy_client = GrpcCompactorProxyClient::new(channel);
+    let system_params_response = grpc_proxy_client
+        .get_system_params()
+        .await
+        .expect("Failed to get system params; the compactor pod cannot be started.");
+    let system_params = system_params_response.into_inner().params.unwrap();
+
+    let (sstable_store, memory_limiter, await_tree_reg, storage_opts, compactor_metrics) =
+        prepare_start_parameters(config.clone(), system_params.into()).await;
+    let (sender, receiver) = mpsc::unbounded_channel();
+    let compactor_srv: CompactorServiceImpl = CompactorServiceImpl::new(sender);
+
+    let monitor_srv = MonitorServiceImpl::new(await_tree_reg.clone());
+    let (shutdown_send, mut shutdown_recv) = tokio::sync::oneshot::channel();
+    let compactor_context = CompactorContext {
+        storage_opts,
+        sstable_store,
+        compactor_metrics,
+        is_share_buffer_compact: false,
+        compaction_executor: Arc::new(CompactionExecutor::new(
+            opts.compaction_worker_threads_number,
+        )),
+        memory_limiter,
+        task_progress_manager: Default::default(),
+        await_tree_reg,
+        running_task_count: Arc::new(AtomicU32::new(0)),
+    };
+    let join_handle = tokio::spawn(async move {
+        tonic::transport::Server::builder()
+            .add_service(CompactorServiceServer::new(compactor_srv))
+            .add_service(MonitorServiceServer::new(monitor_srv))
+            .monitored_serve_with_shutdown(
+                listen_addr,
+                "grpc-compactor-node-service",
+                TcpConfig {
+                    tcp_nodelay: true,
+                    keepalive_duration: None,
+                },
+                async move {
+                    let (join_handle, shutdown_sender) =
+                        risingwave_storage::hummock::compactor::start_shared_compactor(
+                            grpc_proxy_client,
+                            receiver,
+                            compactor_context,
+                        );
+                    tokio::select! {
+                        _ = tokio::signal::ctrl_c() => {},
+                        _ = &mut shutdown_recv => {
+                            if let Err(err) = shutdown_sender.send(()) {
+                                tracing::warn!("Failed to send shutdown: {:?}", err);
+                            }
+                            if let Err(err) = join_handle.await {
+                                tracing::warn!("Failed to join shutdown: {:?}", err);
+                            }
+                        },
+                    }
+                },
+            )
+            .await
+    });
+
+    // Boot metrics service.
+    if config.server.metrics_level > MetricLevel::Disabled {
+        MetricsManager::boot_metrics_service(opts.prometheus_listener_addr.clone());
+    }
+
+    (join_handle, shutdown_send)
+}
diff --git a/src/storage/hummock_test/src/compactor_tests.rs b/src/storage/hummock_test/src/compactor_tests.rs
index 3487edd3a2a06..1ac4fa1b7cfc2 100644
--- a/src/storage/hummock_test/src/compactor_tests.rs
+++ b/src/storage/hummock_test/src/compactor_tests.rs
@@ -46,8 +46,8 @@ pub(crate) mod tests {
     use risingwave_pb::meta::add_worker_node_request::Property;
     use risingwave_rpc_client::HummockMetaClient;
     use risingwave_storage::filter_key_extractor::{
-        FilterKeyExtractorImpl, FilterKeyExtractorManager, FilterKeyExtractorManagerRef,
-        FixedLengthFilterKeyExtractor, FullKeyFilterKeyExtractor,
+        FilterKeyExtractorImpl, FilterKeyExtractorManager, FixedLengthFilterKeyExtractor,
+        FullKeyFilterKeyExtractor,
     };
     use risingwave_storage::hummock::compactor::compactor_runner::{compact, CompactorRunner};
     use risingwave_storage::hummock::compactor::fast_compactor_runner::CompactorRunner as FastCompactorRunner;
@@ -177,21 +177,13 @@ pub(crate) mod tests {
         }
     }
 
-    fn get_compactor_context_with_filter_key_extractor_manager(
-        storage: &HummockStorage,
-        filter_key_extractor_manager: FilterKeyExtractorManagerRef,
-    ) -> CompactorContext {
-        get_compactor_context_with_filter_key_extractor_manager_impl(
-            storage.storage_opts().clone(),
-            storage.sstable_store(),
-            filter_key_extractor_manager,
-        )
+    fn get_compactor_context(storage: &HummockStorage) -> CompactorContext {
+        get_compactor_context_impl(storage.storage_opts().clone(), storage.sstable_store())
     }
 
-    fn get_compactor_context_with_filter_key_extractor_manager_impl(
+    fn get_compactor_context_impl(
         options: Arc<StorageOpts>,
         sstable_store: SstableStoreRef,
-        filter_key_extractor_manager: FilterKeyExtractorManagerRef,
     ) -> CompactorContext {
         CompactorContext {
             storage_opts: options,
@@ -200,9 +192,6 @@ pub(crate) mod tests {
             is_share_buffer_compact: false,
             compaction_executor: Arc::new(CompactionExecutor::new(Some(1))),
             memory_limiter: MemoryLimiter::unlimit(),
-            filter_key_extractor_manager: FilterKeyExtractorManager::RpcFilterKeyExtractorManager(
-                filter_key_extractor_manager,
-            ),
             task_progress_manager: Default::default(),
             await_tree_reg: None,
             running_task_count: Arc::new(AtomicU32::new(0)),
@@ -242,10 +231,10 @@ pub(crate) mod tests {
             ) => rpc_filter_key_extractor_manager,
             FilterKeyExtractorManager::StaticFilterKeyExtractorManager(_) => unreachable!(),
         };
-        let compact_ctx = get_compactor_context_with_filter_key_extractor_manager(
-            &storage,
+        let filter_key_extractor_manager = FilterKeyExtractorManager::RpcFilterKeyExtractorManager(
             rpc_filter_key_extractor_manager,
         );
+        let compact_ctx = get_compactor_context(&storage);
         let sstable_object_id_manager = Arc::new(SstableObjectIdManager::new(
            hummock_meta_client.clone(),
            storage
@@ -303,6 +292,7 @@ pub(crate) mod tests {
             compact_task.clone(),
             rx,
             Box::new(sstable_object_id_manager.clone()),
+            filter_key_extractor_manager.clone(),
         )
         .await;
 
@@ -405,10 +395,10 @@ pub(crate) mod tests {
             ) => rpc_filter_key_extractor_manager,
             FilterKeyExtractorManager::StaticFilterKeyExtractorManager(_) => unreachable!(),
         };
-        let compact_ctx = get_compactor_context_with_filter_key_extractor_manager(
-            &storage,
+        let filter_key_extractor_manager = FilterKeyExtractorManager::RpcFilterKeyExtractorManager(
             rpc_filter_key_extractor_manager,
         );
+        let compact_ctx = get_compactor_context(&storage);
         let sstable_object_id_manager = Arc::new(SstableObjectIdManager::new(
             hummock_meta_client.clone(),
             storage
@@ -452,6 +442,7 @@ pub(crate) mod tests {
             compact_task.clone(),
             rx,
             Box::new(sstable_object_id_manager.clone()),
+            filter_key_extractor_manager.clone(),
         )
         .await;
 
@@ -483,7 +474,6 @@ pub(crate) mod tests {
                 target_table_size
             );
         }
-
         // 5. storage get back the correct kv after compaction
         storage.wait_version(version).await;
         let get_val = storage
@@ -552,10 +542,10 @@ pub(crate) mod tests {
         }
     }
 
-    pub(crate) fn prepare_compactor_and_filter(
+    pub fn prepare_compactor_and_filter(
         storage: &HummockStorage,
         existing_table_id: u32,
-    ) -> CompactorContext {
+    ) -> (CompactorContext, FilterKeyExtractorManager) {
         let rpc_filter_key_extractor_manager =
             match storage.filter_key_extractor_manager().clone() {
                 FilterKeyExtractorManager::RpcFilterKeyExtractorManager(
@@ -568,10 +558,11 @@ pub(crate) mod tests {
             Arc::new(FilterKeyExtractorImpl::FullKey(FullKeyFilterKeyExtractor)),
         );
 
-        get_compactor_context_with_filter_key_extractor_manager(
-            storage,
+        let filter_key_extractor_manager = FilterKeyExtractorManager::RpcFilterKeyExtractorManager(
             rpc_filter_key_extractor_manager,
-        )
+        );
+
+        (get_compactor_context(storage), filter_key_extractor_manager)
     }
 
     #[tokio::test]
@@ -679,11 +670,12 @@ pub(crate) mod tests {
             2,
             Arc::new(FilterKeyExtractorImpl::FullKey(FullKeyFilterKeyExtractor)),
         );
-
-        let compact_ctx = get_compactor_context_with_filter_key_extractor_manager_impl(
+        let filter_key_extractor_manager = FilterKeyExtractorManager::RpcFilterKeyExtractorManager(
+            rpc_filter_key_extractor_manager,
+        );
+        let compact_ctx = get_compactor_context_impl(
             global_storage.storage_opts().clone(),
             global_storage.sstable_store(),
-            rpc_filter_key_extractor_manager,
         );
         let sstable_object_id_manager = Arc::new(SstableObjectIdManager::new(
             hummock_meta_client.clone(),
@@ -776,6 +768,7 @@ pub(crate) mod tests {
             compact_task.clone(),
             rx,
             Box::new(sstable_object_id_manager.clone()),
+            filter_key_extractor_manager,
         )
         .await;
 
@@ -869,10 +862,7 @@ pub(crate) mod tests {
             FilterKeyExtractorManager::StaticFilterKeyExtractorManager(_) => unreachable!(),
         };
 
-        let compact_ctx = get_compactor_context_with_filter_key_extractor_manager(
-            &storage,
-            rpc_filter_key_extractor_manager.clone(),
-        );
+        let compact_ctx = get_compactor_context(&storage);
         let sstable_object_id_manager = Arc::new(SstableObjectIdManager::new(
             hummock_meta_client.clone(),
             storage
@@ -884,7 +874,9 @@ pub(crate) mod tests {
             2,
             Arc::new(FilterKeyExtractorImpl::FullKey(FullKeyFilterKeyExtractor)),
         );
-
+        let filter_key_extractor_manager = FilterKeyExtractorManager::RpcFilterKeyExtractorManager(
+            rpc_filter_key_extractor_manager,
+        );
         // 1. add sstables
        let val = Bytes::from(b"0"[..].to_vec()); // 1 Byte value
@@ -966,6 +958,7 @@ pub(crate) mod tests {
             compact_task.clone(),
             rx,
             Box::new(sstable_object_id_manager.clone()),
+            filter_key_extractor_manager,
         )
         .await;
 
@@ -1070,10 +1063,10 @@ pub(crate) mod tests {
                 FixedLengthFilterKeyExtractor::new(TABLE_PREFIX_LEN + key_prefix.len()),
             )),
         );
-        let compact_ctx = get_compactor_context_with_filter_key_extractor_manager(
-            &storage,
+        let filter_key_extractor_manager = FilterKeyExtractorManager::RpcFilterKeyExtractorManager(
             rpc_filter_key_extractor_manager,
         );
+        let compact_ctx = get_compactor_context(&storage);
         let sstable_object_id_manager = Arc::new(SstableObjectIdManager::new(
             hummock_meta_client.clone(),
             storage
@@ -1151,6 +1144,7 @@ pub(crate) mod tests {
             compact_task.clone(),
             rx,
             Box::new(sstable_object_id_manager.clone()),
+            filter_key_extractor_manager,
         )
         .await;
 
@@ -1248,7 +1242,8 @@ pub(crate) mod tests {
             TableId::from(existing_table_id),
         )
         .await;
-        let compact_ctx = prepare_compactor_and_filter(&storage, existing_table_id);
+        let (compact_ctx, filter_key_extractor_manager) =
+            prepare_compactor_and_filter(&storage, existing_table_id);
         let sstable_object_id_manager = Arc::new(SstableObjectIdManager::new(
             hummock_meta_client.clone(),
             storage
@@ -1309,6 +1304,7 @@ pub(crate) mod tests {
             compact_task.clone(),
             rx,
             Box::new(sstable_object_id_manager.clone()),
+            filter_key_extractor_manager,
         )
         .await;
 
@@ -1351,7 +1347,7 @@ pub(crate) mod tests {
         )
         .await;
         hummock_manager_ref.get_new_sst_ids(10).await.unwrap();
-        let compact_ctx = prepare_compactor_and_filter(&storage, existing_table_id);
+        let (compact_ctx, _) = prepare_compactor_and_filter(&storage, existing_table_id);
 
         let sstable_store = compact_ctx.sstable_store.clone();
         let capacity = 256 * 1024;
@@ -1401,17 +1397,17 @@ pub(crate) mod tests {
             0,
             compact_ctx.clone(),
             task.clone(),
-            Box::new(SharedComapctorObjectIdManager::new(VecDeque::from_iter([
-                5, 6, 7, 8, 9,
-            ]))),
+            Box::new(SharedComapctorObjectIdManager::for_test(
+                VecDeque::from_iter([5, 6, 7, 8, 9]),
+            )),
         );
         let fast_compact_runner = FastCompactorRunner::new(
             compact_ctx.clone(),
             task.clone(),
             multi_filter_key_extractor.clone(),
-            Box::new(SharedComapctorObjectIdManager::new(VecDeque::from_iter([
-                10, 11, 12, 13, 14,
-            ]))),
+            Box::new(SharedComapctorObjectIdManager::for_test(
+                VecDeque::from_iter([10, 11, 12, 13, 14]),
+            )),
             Arc::new(TaskProgress::default()),
         );
         let (_, ret1, _) = slow_compact_runner
diff --git a/src/storage/hummock_test/src/sync_point_tests.rs b/src/storage/hummock_test/src/sync_point_tests.rs
index d28db261785c0..83764e4275cb3 100644
--- a/src/storage/hummock_test/src/sync_point_tests.rs
+++ b/src/storage/hummock_test/src/sync_point_tests.rs
@@ -37,6 +37,7 @@ use risingwave_meta::hummock::{HummockManagerRef, MockHummockMetaClient};
 use risingwave_meta::manager::LocalNotification;
 use risingwave_pb::hummock::compact_task::TaskStatus;
 use risingwave_rpc_client::HummockMetaClient;
+use risingwave_storage::filter_key_extractor::FilterKeyExtractorManager;
 use risingwave_storage::hummock::compactor::compactor_runner::compact;
 use risingwave_storage::hummock::compactor::CompactorContext;
 use risingwave_storage::hummock::{CachePolicy, GetObjectId, SstableObjectIdManager};
@@ -44,9 +45,8 @@ use risingwave_storage::store::{LocalStateStore, NewLocalOptions, ReadOptions};
 use risingwave_storage::StateStore;
 use serial_test::serial;
 
-use super::compactor_tests::tests::{
-    flush_and_commit, get_hummock_storage, prepare_compactor_and_filter,
-};
+use super::compactor_tests::tests::{get_hummock_storage, prepare_compactor_and_filter};
+use crate::compactor_tests::tests::flush_and_commit;
 use crate::get_notification_client_for_test;
 use crate::local_state_store_test_utils::LocalStateStoreTestExt;
 use crate::test_utils::gen_key_from_bytes;
@@ -232,6 +232,7 @@ async fn test_syncpoints_test_local_notification_receiver() {
 pub async fn compact_once(
     hummock_manager_ref: HummockManagerRef,
     compact_ctx: CompactorContext,
+    filter_key_extractor_manager: FilterKeyExtractorManager,
     sstable_object_id_manager: Arc<SstableObjectIdManager>,
 ) {
     // 2. get compact task
@@ -259,6 +260,7 @@ pub async fn compact_once(
         compact_task.clone(),
         rx,
         Box::new(sstable_object_id_manager),
+        filter_key_extractor_manager.clone(),
     )
     .await;
 
@@ -291,7 +293,8 @@ async fn test_syncpoints_get_in_delete_range_boundary() {
         TableId::from(existing_table_id),
     )
    .await;
-    let compact_ctx = prepare_compactor_and_filter(&storage, existing_table_id);
+    let (compact_ctx, filter_key_extractor_manager) =
+        prepare_compactor_and_filter(&storage, existing_table_id);
 
     let sstable_object_id_manager = Arc::new(SstableObjectIdManager::new(
         hummock_meta_client.clone(),
@@ -348,6 +351,7 @@ async fn test_syncpoints_get_in_delete_range_boundary() {
     compact_once(
         hummock_manager_ref.clone(),
         compact_ctx.clone(),
+        filter_key_extractor_manager.clone(),
         sstable_object_id_manager.clone(),
     )
     .await;
@@ -378,6 +382,7 @@ async fn test_syncpoints_get_in_delete_range_boundary() {
     compact_once(
         hummock_manager_ref.clone(),
         compact_ctx.clone(),
+        filter_key_extractor_manager.clone(),
         sstable_object_id_manager.clone(),
     )
     .await;
@@ -409,6 +414,7 @@ async fn test_syncpoints_get_in_delete_range_boundary() {
     compact_once(
         hummock_manager_ref.clone(),
         compact_ctx.clone(),
+        filter_key_extractor_manager.clone(),
         sstable_object_id_manager.clone(),
     )
     .await;
@@ -434,6 +440,7 @@ async fn test_syncpoints_get_in_delete_range_boundary() {
     compact_once(
         hummock_manager_ref.clone(),
         compact_ctx.clone(),
+        filter_key_extractor_manager.clone(),
         sstable_object_id_manager.clone(),
     )
     .await;
diff --git a/src/storage/src/hummock/compactor/compaction_utils.rs b/src/storage/src/hummock/compactor/compaction_utils.rs
index 245673f7656a8..5f28a076888ef 100644
--- a/src/storage/src/hummock/compactor/compaction_utils.rs
+++ b/src/storage/src/hummock/compactor/compaction_utils.rs
@@ -193,6 +193,7 @@ pub async fn generate_splits(
     indexes.sort_by(|a, b| KeyComparator::compare_encoded_full_key(a.1.as_ref(), b.1.as_ref()));
     let mut splits = vec![];
     splits.push(KeyRange_vec::new(vec![], vec![]));
+
     let worker_num = context.compaction_executor.worker_num();
 
     let parallelism = std::cmp::min(
diff --git a/src/storage/src/hummock/compactor/compactor_runner.rs b/src/storage/src/hummock/compactor/compactor_runner.rs
index 85bfb7235b1c9..567edc4cdd7e9 100644
--- a/src/storage/src/hummock/compactor/compactor_runner.rs
+++ b/src/storage/src/hummock/compactor/compactor_runner.rs
@@ -32,7 +32,7 @@ use tokio::sync::oneshot::Receiver;
 
 use super::task_progress::TaskProgress;
 use super::{CompactionStatistics, TaskConfig};
-use crate::filter_key_extractor::FilterKeyExtractorImpl;
+use crate::filter_key_extractor::{FilterKeyExtractorImpl, FilterKeyExtractorManager};
 use crate::hummock::compactor::compaction_utils::{
     build_multi_compaction_filter, estimate_task_output_capacity, generate_splits,
 };
@@ -244,6 +244,7 @@ pub async fn compact(
     mut compact_task: CompactTask,
     mut shutdown_rx: Receiver<()>,
     object_id_getter: Box<dyn GetObjectId>,
+    filter_key_extractor_manager: FilterKeyExtractorManager,
 ) -> (CompactTask, HashMap<u32, TableStats>) {
     let context = compactor_context.clone();
     let group_label = compact_task.compaction_group_id.to_string();
@@ -316,8 +317,7 @@ pub async fn compact(
             .into_iter()
             .filter(|table_id| existing_table_ids.contains(table_id)),
     );
-    let multi_filter_key_extractor = match compactor_context
-        .filter_key_extractor_manager
+    let multi_filter_key_extractor = match filter_key_extractor_manager
         .acquire(compact_table_ids.clone())
         .await
     {
@@ -868,6 +868,7 @@ where
     Ok(compaction_statistics)
 }
 
+#[cfg(test)]
 mod tests {
     use std::collections::HashSet;
diff --git a/src/storage/src/hummock/compactor/context.rs b/src/storage/src/hummock/compactor/context.rs
index ad3d5ffcc2dd6..ef015f26cded7 100644
--- a/src/storage/src/hummock/compactor/context.rs
+++ b/src/storage/src/hummock/compactor/context.rs
@@ -18,7 +18,6 @@ use std::sync::Arc;
 use parking_lot::RwLock;
 
 use super::task_progress::TaskProgressManagerRef;
-use crate::filter_key_extractor::FilterKeyExtractorManager;
 use crate::hummock::compactor::CompactionExecutor;
 use crate::hummock::sstable_store::SstableStoreRef;
 use crate::hummock::MemoryLimiter;
@@ -42,8 +41,6 @@ pub struct CompactorContext {
 
     pub compaction_executor: Arc<CompactionExecutor>,
 
-    pub filter_key_extractor_manager: FilterKeyExtractorManager,
-
     pub memory_limiter: Arc<MemoryLimiter>,
 
     pub task_progress_manager: TaskProgressManagerRef,
@@ -58,7 +55,6 @@ impl CompactorContext {
         storage_opts: Arc<StorageOpts>,
         sstable_store: SstableStoreRef,
         compactor_metrics: Arc<CompactorMetrics>,
-        filter_key_extractor_manager: FilterKeyExtractorManager,
     ) -> Self {
         let compaction_executor =
             if storage_opts.share_buffer_compaction_worker_threads_number == 0 {
@@ -76,7 +72,6 @@ impl CompactorContext {
             compactor_metrics,
             is_share_buffer_compact: true,
             compaction_executor,
-            filter_key_extractor_manager,
             memory_limiter: MemoryLimiter::unlimit(),
             task_progress_manager: Default::default(),
             await_tree_reg: None,
diff --git a/src/storage/src/hummock/compactor/mod.rs b/src/storage/src/hummock/compactor/mod.rs
index 8442bf39c124e..7012cf952317c 100644
--- a/src/storage/src/hummock/compactor/mod.rs
+++ b/src/storage/src/hummock/compactor/mod.rs
@@ -15,6 +15,16 @@
 mod compaction_executor;
 mod compaction_filter;
 pub mod compaction_utils;
+use risingwave_pb::compactor::{dispatch_compaction_task_request, DispatchCompactionTaskRequest};
+use risingwave_pb::hummock::report_compaction_task_request::{
+    Event as ReportCompactionTaskEvent, HeartBeat as SharedHeartBeat,
+    ReportTask as ReportSharedTask,
+};
+use risingwave_pb::hummock::{ReportFullScanTaskRequest, ReportVacuumTaskRequest};
+use risingwave_rpc_client::GrpcCompactorProxyClient;
+use tokio::sync::mpsc;
+use tonic::Request;
+
 pub mod compactor_runner;
 mod context;
 pub mod fast_compactor_runner;
@@ -22,7 +32,7 @@ mod iterator;
 mod shared_buffer_compact;
 pub(super) mod task_progress;
 
-use std::collections::HashMap;
+use std::collections::{HashMap, VecDeque};
 use std::marker::PhantomData;
 use std::sync::atomic::{AtomicBool, AtomicU64, Ordering};
 use std::sync::{Arc, Mutex};
@@ -46,7 +56,8 @@ use risingwave_pb::hummock::subscribe_compaction_event_request::{
 };
 use risingwave_pb::hummock::subscribe_compaction_event_response::Event as ResponseEvent;
 use risingwave_pb::hummock::{
-    CompactTaskProgress, SubscribeCompactionEventRequest, SubscribeCompactionEventResponse,
+    CompactTaskProgress, ReportCompactionTaskRequest, SubscribeCompactionEventRequest,
+    SubscribeCompactionEventResponse,
 };
 use risingwave_rpc_client::HummockMetaClient;
 pub use shared_buffer_compact::{compact, merge_imms_in_memory};
@@ -61,14 +72,17 @@ use super::{
     CompactionDeleteRanges, GetObjectId, HummockResult, SstableBuilderOptions,
     SstableObjectIdManager, Xor16FilterBuilder,
 };
-use crate::filter_key_extractor::FilterKeyExtractorImpl;
+use crate::filter_key_extractor::{
+    FilterKeyExtractorImpl, FilterKeyExtractorManager, StaticFilterKeyExtractorManager,
+};
 use crate::hummock::compactor::compactor_runner::compact_and_build_sst;
 use crate::hummock::iterator::{Forward, HummockIterator};
 use crate::hummock::multi_builder::SplitTableOutput;
 use crate::hummock::vacuum::Vacuum;
 use crate::hummock::{
     validate_ssts, BatchSstableWriterFactory, BlockedXor16FilterBuilder, FilterBuilder,
-    HummockError, SstableWriterFactory, StreamingSstableWriterFactory,
+    HummockError, SharedComapctorObjectIdManager, SstableWriterFactory,
+    StreamingSstableWriterFactory,
 };
 use crate::monitor::CompactorMetrics;
 
@@ -316,6 +330,7 @@ pub fn start_compactor(
     compactor_context: CompactorContext,
     hummock_meta_client: Arc<dyn HummockMetaClient>,
     sstable_object_id_manager: Arc<SstableObjectIdManager>,
+    filter_key_extractor_manager: FilterKeyExtractorManager,
 ) -> (JoinHandle<()>, Sender<()>) {
     type CompactionShutdownMap = Arc<Mutex<HashMap<u64, Sender<()>>>>;
     let (shutdown_tx, mut shutdown_rx) = tokio::sync::oneshot::channel();
@@ -391,17 +406,8 @@ pub fn start_compactor(
             let request_sender = request_sender.clone();
             let event: Option<Result<SubscribeCompactionEventResponse, _>> = tokio::select! {
                 _ = periodic_event_interval.tick() => {
-                    let mut progress_list = Vec::new();
-                    for (&task_id, progress) in &*task_progress.lock() {
-                        progress_list.push(CompactTaskProgress {
-                            task_id,
-                            num_ssts_sealed: progress.num_ssts_sealed.load(Ordering::Relaxed),
-                            num_ssts_uploaded: progress.num_ssts_uploaded.load(Ordering::Relaxed),
-                            num_progress_key: progress.num_progress_key.load(Ordering::Relaxed),
-                            num_pending_read_io: progress.num_pending_read_io.load(Ordering::Relaxed) as u64,
-                            num_pending_write_io: progress.num_pending_write_io.load(Ordering::Relaxed) as u64,
-                        });
-                    }
+                    let progress_list = get_task_progress(task_progress.clone());
+
                     if let Err(e) = request_sender.send(SubscribeCompactionEventRequest {
                         event: Some(RequestEvent::HeartBeat(
@@ -458,7 +464,6 @@ pub fn start_compactor(
                         continue;
                     }
 
-
                 event = response_event_stream.next() => {
                     event
                 }
@@ -489,6 +494,7 @@ pub fn start_compactor(
 
                 let meta_client = hummock_meta_client.clone();
                 let sstable_object_id_manager = sstable_object_id_manager.clone();
+                let filter_key_extractor_manager = filter_key_extractor_manager.clone();
                 executor.spawn(async move {
                     let running_task_count = running_task_count.clone();
                     match event {
@@ -507,7 +513,7 @@ pub fn start_compactor(
                                     sstable_object_id_manager.remove_watermark_object_id(tracker_id);
                                 },
                             );
-                            compactor_runner::compact(context, compact_task, rx, Box::new(sstable_object_id_manager.clone())).await
+                            compactor_runner::compact(context, compact_task, rx, Box::new(sstable_object_id_manager.clone()), filter_key_extractor_manager.clone()).await
                         },
                         Err(err) => {
                             tracing::warn!("Failed to track pending SST object id. {:#?}", err);
@@ -608,3 +614,204 @@ pub fn start_compactor(
 
     (join_handle, shutdown_tx)
 }
+
+/// The background compaction thread that receives compaction tasks from the hummock compaction
+/// manager and runs compaction tasks.
+#[cfg_attr(coverage, no_coverage)]
+pub fn start_shared_compactor(
+    grpc_proxy_client: GrpcCompactorProxyClient,
+    mut receiver: mpsc::UnboundedReceiver<Request<DispatchCompactionTaskRequest>>,
+    context: CompactorContext,
+) -> (JoinHandle<()>, Sender<()>) {
+    type CompactionShutdownMap = Arc<Mutex<HashMap<u64, Sender<()>>>>;
+    let task_progress = context.task_progress_manager.clone();
+    let (shutdown_tx, mut shutdown_rx) = tokio::sync::oneshot::channel();
+    let periodic_event_update_interval = Duration::from_millis(1000);
+
+    let join_handle = tokio::spawn(async move {
+        let shutdown_map = CompactionShutdownMap::default();
+
+        let mut periodic_event_interval = tokio::time::interval(periodic_event_update_interval);
+        let executor = context.compaction_executor.clone();
+        let report_heartbeat_client = grpc_proxy_client.clone();
+        'consume_stream: loop {
+            let request: Option<Request<DispatchCompactionTaskRequest>> = tokio::select! {
+                _ = periodic_event_interval.tick() => {
+                    let progress_list = get_task_progress(task_progress.clone());
+                    let report_compaction_task_request = ReportCompactionTaskRequest {
+                        event: Some(ReportCompactionTaskEvent::HeartBeat(
+                            SharedHeartBeat {
+                                progress: progress_list
+                            }
+                        )),
+                    };
+                    if let Err(e) = report_heartbeat_client.report_compaction_task(report_compaction_task_request).await {
+                        tracing::warn!("Failed to report heartbeat {:#?}", e);
+                    }
+                    continue
+                }
+
+
+                _ = &mut shutdown_rx => {
+                    tracing::info!("Compactor is shutting down");
+                    return
+                }
+
+                request = receiver.recv() => {
+                    request
+                }
+
+            };
+            match request {
+                Some(request) => {
+                    let context = context.clone();
+                    let shutdown = shutdown_map.clone();
+
+                    let cloned_grpc_proxy_client = grpc_proxy_client.clone();
+                    executor.spawn(async move {
+                        let DispatchCompactionTaskRequest {
+                            tables,
+                            output_object_ids,
+                            task: dispatch_task,
+                        } = request.into_inner();
+                        let id_to_tables = tables.into_iter().fold(HashMap::new(), |mut acc, table| {
+                            acc.insert(table.id, table);
+                            acc
+                        });
+                        let static_filter_key_extractor_manager: Arc<StaticFilterKeyExtractorManager> =
+                            Arc::new(StaticFilterKeyExtractorManager::new(id_to_tables));
+                        let filter_key_extractor_manager =
+                            FilterKeyExtractorManager::StaticFilterKeyExtractorManager(
+                                static_filter_key_extractor_manager,
+                            );
+
+                        let mut output_object_ids_deque: VecDeque<_> = VecDeque::new();
+                        output_object_ids_deque.extend(output_object_ids);
+                        let shared_compactor_object_id_manager =
+                            SharedComapctorObjectIdManager::new(output_object_ids_deque, cloned_grpc_proxy_client.clone(), context.storage_opts.sstable_id_remote_fetch_number);
+                        match dispatch_task.unwrap() {
+                            dispatch_compaction_task_request::Task::CompactTask(compact_task) => {
+                                context.running_task_count.fetch_add(1, Ordering::SeqCst);
+                                let (tx, rx) = tokio::sync::oneshot::channel();
+                                let task_id = compact_task.task_id;
+                                shutdown.lock().unwrap().insert(task_id, tx);
+
+                                let (compact_task, table_stats) = compactor_runner::compact(
+                                    context.clone(),
+                                    compact_task,
+                                    rx,
+                                    Box::new(shared_compactor_object_id_manager),
+                                    filter_key_extractor_manager.clone(),
+                                )
+                                .await;
+                                shutdown.lock().unwrap().remove(&task_id);
+                                context.running_task_count.fetch_sub(1, Ordering::SeqCst);
+                                let report_compaction_task_request = ReportCompactionTaskRequest {
+                                    event: Some(ReportCompactionTaskEvent::ReportTask(ReportSharedTask {
+                                        compact_task: Some(compact_task),
+                                        table_stats_change: to_prost_table_stats_map(table_stats),
+                                    })),
+                                };
+
+                                match cloned_grpc_proxy_client
+                                    .report_compaction_task(report_compaction_task_request)
+                                    .await
+                                {
+                                    Ok(_) => {}
+                                    Err(e) => tracing::warn!("Failed to report task {task_id:?}: {e:?}"),
+                                }
+                            }
+                            dispatch_compaction_task_request::Task::VacuumTask(vacuum_task) => {
+                                match Vacuum::handle_vacuum_task(
+                                    context.sstable_store.clone(),
+                                    &vacuum_task.sstable_object_ids,
+                                )
+                                .await
+                                {
+                                    Ok(_) => {
+                                        let report_vacuum_task_request = ReportVacuumTaskRequest {
+                                            vacuum_task: Some(vacuum_task),
+                                        };
+                                        match cloned_grpc_proxy_client.report_vacuum_task(report_vacuum_task_request).await {
+                                            Ok(_) => tracing::info!("Finished vacuuming SSTs"),
+                                            Err(e) => tracing::warn!("Failed to report vacuum task: {:#?}", e),
+                                        }
+                                    }
+                                    Err(e) => {
+                                        tracing::warn!("Failed to vacuum task: {:#?}", e)
+                                    }
+                                }
+                            }
+                            dispatch_compaction_task_request::Task::FullScanTask(full_scan_task) => {
+                                match Vacuum::handle_full_scan_task(full_scan_task, context.sstable_store.clone())
+                                    .await
+                                {
+                                    Ok((object_ids, total_object_count, total_object_size)) => {
+                                        let report_full_scan_task_request = ReportFullScanTaskRequest {
+                                            object_ids,
+                                            total_object_count,
+                                            total_object_size,
+                                        };
+                                        match cloned_grpc_proxy_client
+                                            .report_full_scan_task(report_full_scan_task_request)
+                                            .await
+                                        {
+                                            Ok(_) => tracing::info!("Finished full scan SSTs"),
+                                            Err(e) => tracing::warn!("Failed to report full scan task: {:#?}", e),
+                                        }
+                                    }
+                                    Err(e) => {
+                                        tracing::warn!("Failed to iter object: {:#?}", e);
+                                    }
+                                }
+                            }
+                            dispatch_compaction_task_request::Task::ValidationTask(validation_task) => {
+                                validate_ssts(validation_task, context.sstable_store.clone()).await;
+                            }
+                            dispatch_compaction_task_request::Task::CancelCompactTask(cancel_compact_task) => {
+                                if let Some(tx) = shutdown
+                                    .lock()
+                                    .unwrap()
+                                    .remove(&cancel_compact_task.task_id)
+                                {
+                                    if tx.send(()).is_err() {
+                                        tracing::warn!(
+                                            "Cancellation of compaction task failed. task_id: {}",
+                                            cancel_compact_task.task_id
+                                        );
+                                    }
+                                } else {
+                                    tracing::warn!(
+                                        "Attempting to cancel a non-existent compaction task. task_id: {}",
+                                        cancel_compact_task.task_id
+                                    );
+                                }
+                            }
+                        }
+                    });
+                }
+                None => continue 'consume_stream,
+            }
+        }
+    });
+    (join_handle, shutdown_tx)
+}
+
+fn get_task_progress(
+    task_progress: Arc<
+        parking_lot::lock_api::Mutex<parking_lot::RawMutex, HashMap<u64, Arc<TaskProgress>>>,
+    >,
+) -> Vec<CompactTaskProgress> {
+    let mut progress_list = Vec::new();
+    for (&task_id, progress) in &*task_progress.lock() {
+        progress_list.push(CompactTaskProgress {
+            task_id,
+            num_ssts_sealed: progress.num_ssts_sealed.load(Ordering::Relaxed),
+            num_ssts_uploaded: progress.num_ssts_uploaded.load(Ordering::Relaxed),
+            num_progress_key: progress.num_progress_key.load(Ordering::Relaxed),
+            num_pending_read_io: progress.num_pending_read_io.load(Ordering::Relaxed) as u64,
+            num_pending_write_io: progress.num_pending_write_io.load(Ordering::Relaxed) as u64,
+        });
+    }
+    progress_list
+}
diff --git a/src/storage/src/hummock/compactor/shared_buffer_compact.rs b/src/storage/src/hummock/compactor/shared_buffer_compact.rs
index 0eca74f1dcaba..428361237c0ac 100644
--- a/src/storage/src/hummock/compactor/shared_buffer_compact.rs
+++ b/src/storage/src/hummock/compactor/shared_buffer_compact.rs
@@ -30,7 +30,7 @@ use risingwave_hummock_sdk::{CompactionGroupId, HummockEpoch, LocalSstableInfo};
 use risingwave_pb::hummock::compact_task;
 use tracing::error;
 
-use crate::filter_key_extractor::FilterKeyExtractorImpl;
+use crate::filter_key_extractor::{FilterKeyExtractorImpl, FilterKeyExtractorManager};
 use crate::hummock::compactor::compaction_filter::DummyCompactionFilter;
 use crate::hummock::compactor::context::CompactorContext;
 use crate::hummock::compactor::{CompactOutput, Compactor};
@@ -59,6 +59,7 @@ pub async fn compact(
     sstable_object_id_manager: SstableObjectIdManagerRef,
     payload: UploadTaskPayload,
     compaction_group_index: Arc<HashMap<TableId, CompactionGroupId>>,
+    filter_key_extractor_manager: FilterKeyExtractorManager,
 ) -> HummockResult<Vec<LocalSstableInfo>> {
     let mut grouped_payload: HashMap<CompactionGroupId, UploadTaskPayload> = HashMap::new();
     for imm in payload {
@@ -86,6 +87,7 @@ pub async fn compact(
             compact_shared_buffer(
                 context.clone(),
                 sstable_object_id_manager.clone(),
+                filter_key_extractor_manager.clone(),
                 group_payload,
             )
             .map_ok(move |results| {
@@ -112,6 +114,7 @@ pub async fn compact(
 async fn compact_shared_buffer(
     context: CompactorContext,
     sstable_object_id_manager: SstableObjectIdManagerRef,
+    filter_key_extractor_manager: FilterKeyExtractorManager,
     mut payload: UploadTaskPayload,
 ) -> HummockResult<Vec<LocalSstableInfo>> {
     // Local memory compaction looks at all key ranges.
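Between the hunks above and below, the `FilterKeyExtractorManager` stops being a `CompactorContext` field and becomes an explicit argument that is threaded down to `acquire`. A rough sketch of the resulting call shape; the paths and the `acquire` signature here are assumed from the call sites in this diff, not authoritative:

```rust
use std::collections::HashSet;

use risingwave_storage::filter_key_extractor::{FilterKeyExtractorImpl, FilterKeyExtractorManager};
use risingwave_storage::hummock::HummockResult;

// Sketch: resolve the extractor for a set of table ids from the explicitly
// passed manager, mirroring the rewritten call sites.
async fn extractor_for_tables(
    manager: &FilterKeyExtractorManager,
    table_ids: HashSet<u32>,
) -> HummockResult<FilterKeyExtractorImpl> {
    let extractor = manager.acquire(table_ids).await?;
    if let FilterKeyExtractorImpl::Multi(_) = &extractor {
        // Multi-table task: one extractor per table id is bundled here.
    }
    Ok(extractor)
}
```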
@@ -124,8 +127,7 @@ async fn compact_shared_buffer(
 
     assert!(!existing_table_ids.is_empty());
 
-    let multi_filter_key_extractor = context
-        .filter_key_extractor_manager
+    let multi_filter_key_extractor = filter_key_extractor_manager
         .acquire(existing_table_ids.clone())
         .await?;
     if let FilterKeyExtractorImpl::Multi(multi) = &multi_filter_key_extractor {
diff --git a/src/storage/src/hummock/event_handler/hummock_event_handler.rs b/src/storage/src/hummock/event_handler/hummock_event_handler.rs
index d9e25ebe46555..c55b73e6af6b0 100644
--- a/src/storage/src/hummock/event_handler/hummock_event_handler.rs
+++ b/src/storage/src/hummock/event_handler/hummock_event_handler.rs
@@ -30,6 +30,7 @@ use tracing::{error, info, trace, warn};
 
 use super::refiller::{CacheRefillConfig, CacheRefiller};
 use super::{LocalInstanceGuard, LocalInstanceId, ReadVersionMappingType};
+use crate::filter_key_extractor::FilterKeyExtractorManager;
 use crate::hummock::compactor::{compact, CompactorContext};
 use crate::hummock::conflict_detector::ConflictDetector;
 use crate::hummock::event_handler::refiller::CacheRefillerEvent;
@@ -133,6 +134,7 @@ async fn flush_imms(
     payload: UploadTaskPayload,
     task_info: UploadTaskInfo,
     compactor_context: CompactorContext,
+    filter_key_extractor_manager: FilterKeyExtractorManager,
     sstable_object_id_manager: Arc<SstableObjectIdManager>,
 ) -> HummockResult<Vec<LocalSstableInfo>> {
     for epoch in &task_info.epochs {
@@ -148,6 +150,7 @@ async fn flush_imms(
         sstable_object_id_manager,
         payload,
         task_info.compaction_group_index,
+        filter_key_extractor_manager,
     )
     .verbose_instrument_await("shared_buffer_compact")
     .await
@@ -159,6 +162,7 @@ impl HummockEventHandler {
         hummock_event_rx: mpsc::UnboundedReceiver<HummockEvent>,
         pinned_version: PinnedVersion,
         compactor_context: CompactorContext,
+        filter_key_extractor_manager: FilterKeyExtractorManager,
         sstable_object_id_manager: Arc<SstableObjectIdManager>,
         state_store_metrics: Arc<HummockStateStoreMetrics>,
         cache_refill_config: CacheRefillConfig,
@@ -184,6 +188,7 @@ impl HummockEventHandler {
                     payload,
                     task_info,
                     upload_compactor_context.clone(),
+                    filter_key_extractor_manager.clone(),
                     cloned_sstable_object_id_manager.clone(),
                 ))
             }),
diff --git a/src/storage/src/hummock/sstable/sstable_object_id_manager.rs b/src/storage/src/hummock/sstable/sstable_object_id_manager.rs
index 6ae7ddad4a7ea..69ca3712eb379 100644
--- a/src/storage/src/hummock/sstable/sstable_object_id_manager.rs
+++ b/src/storage/src/hummock/sstable/sstable_object_id_manager.rs
@@ -22,8 +22,9 @@ use std::sync::Arc;
 use itertools::Itertools;
 use parking_lot::Mutex;
 use risingwave_hummock_sdk::{HummockEpoch, HummockSstableObjectId, SstObjectIdRange};
+use risingwave_pb::hummock::GetNewSstIdsRequest;
 use risingwave_pb::meta::heartbeat_request::extra_info::Info;
-use risingwave_rpc_client::{ExtraInfoSource, HummockMetaClient};
+use risingwave_rpc_client::{ExtraInfoSource, GrpcCompactorProxyClient, HummockMetaClient};
 use sync_point::sync_point;
 use tokio::sync::oneshot;
 
@@ -198,25 +199,95 @@ impl GetObjectId for Arc<SstableObjectIdManager> {
     }
 }
 
+struct SharedComapctorObjectIdManagerCore {
+    output_object_ids: VecDeque<u64>,
+    client: Option<GrpcCompactorProxyClient>,
+    sstable_id_remote_fetch_number: u32,
+}
+impl SharedComapctorObjectIdManagerCore {
+    pub fn new(
+        output_object_ids: VecDeque<u64>,
+        client: GrpcCompactorProxyClient,
+        sstable_id_remote_fetch_number: u32,
+    ) -> Self {
+        Self {
+            output_object_ids,
+            client: Some(client),
+            sstable_id_remote_fetch_number,
+        }
+    }
+
+    pub fn for_test(output_object_ids: VecDeque<u64>) -> Self {
+        Self {
+            output_object_ids,
+            client: None,
+            sstable_id_remote_fetch_number: 0,
+        }
+    }
+}
 /// `SharedComapctorObjectIdManager` is used to get output sst ids for serverless compaction.
 #[derive(Clone)]
 pub struct SharedComapctorObjectIdManager {
-    output_object_ids: VecDeque<u64>,
+    core: Arc<tokio::sync::Mutex<SharedComapctorObjectIdManagerCore>>,
 }
 
 impl SharedComapctorObjectIdManager {
-    pub fn new(output_object_ids: VecDeque<u64>) -> Self {
-        Self { output_object_ids }
+    pub fn new(
+        output_object_ids: VecDeque<u64>,
+        client: GrpcCompactorProxyClient,
+        sstable_id_remote_fetch_number: u32,
+    ) -> Self {
+        Self {
+            core: Arc::new(tokio::sync::Mutex::new(
+                SharedComapctorObjectIdManagerCore::new(
+                    output_object_ids,
+                    client,
+                    sstable_id_remote_fetch_number,
+                ),
+            )),
+        }
+    }
+
+    pub fn for_test(output_object_ids: VecDeque<u64>) -> Self {
+        Self {
+            core: Arc::new(tokio::sync::Mutex::new(
+                SharedComapctorObjectIdManagerCore::for_test(output_object_ids),
+            )),
+        }
     }
 }
 
 #[async_trait::async_trait]
 impl GetObjectId for SharedComapctorObjectIdManager {
     async fn get_new_sst_object_id(&mut self) -> HummockResult<HummockSstableObjectId> {
-        if let Some(first_element) = self.output_object_ids.pop_front() {
+        let mut guard = self.core.lock().await;
+        let core = guard.deref_mut();
+
+        if let Some(first_element) = core.output_object_ids.pop_front() {
             Ok(first_element)
         } else {
-            return Err(HummockError::other("Output object id runs out"));
+            tracing::warn!("The pre-allocated object ids are used up; new object ids are obtained through RPC.");
+            let request = GetNewSstIdsRequest {
+                number: core.sstable_id_remote_fetch_number,
+            };
+            match core
+                .client
+                .as_mut()
+                .expect("GrpcCompactorProxyClient is None")
+                .get_new_sst_ids(request)
+                .await
+            {
+                Ok(response) => {
+                    let resp = response.into_inner();
+                    let start_id = resp.start_id;
+                    core.output_object_ids.extend((start_id + 1)..resp.end_id);
+                    Ok(start_id)
+                }
+                Err(e) => Err(HummockError::other(format!(
+                    "Failed to get new sst id: {}",
+                    e
+                ))),
+            }
         }
     }
 }
@@ -313,14 +384,10 @@ impl SstObjectIdTrackerInner {
 
 #[cfg(test)]
 mod test {
-    use std::collections::VecDeque;
-
     use risingwave_common::try_match_expand;
 
     use crate::hummock::sstable::sstable_object_id_manager::AutoTrackerId;
-    use crate::hummock::{
-        GetObjectId, SharedComapctorObjectIdManager, SstObjectIdTracker, TrackerId,
-    };
+    use crate::hummock::{SstObjectIdTracker, TrackerId};
 
     #[tokio::test]
     async fn test_object_id_tracker_basic() {
@@ -390,18 +457,4 @@ mod test {
         object_id_tacker.remove_tracker(auto_id_3);
         assert!(object_id_tacker.tracking_object_ids().is_empty());
     }
-
-    #[tokio::test]
-    async fn test_shared_comapctor_object_id_manager() {
-        let mut pre_allocated_object_ids: VecDeque<_> = VecDeque::new();
-        pre_allocated_object_ids.extend(vec![1, 3, 5]);
-        let mut object_id_manager = SharedComapctorObjectIdManager::new(pre_allocated_object_ids);
-        assert_eq!(object_id_manager.get_new_sst_object_id().await.unwrap(), 1);
-
-        assert_eq!(object_id_manager.get_new_sst_object_id().await.unwrap(), 3);
-
-        assert_eq!(object_id_manager.get_new_sst_object_id().await.unwrap(), 5);
-
-        assert!(object_id_manager.get_new_sst_object_id().await.is_err());
-    }
 }
diff --git a/src/storage/src/hummock/store/hummock_storage.rs b/src/storage/src/hummock/store/hummock_storage.rs
index a4bafcdb99c07..8ccf1af632066 100644
--- a/src/storage/src/hummock/store/hummock_storage.rs
+++ b/src/storage/src/hummock/store/hummock_storage.rs
@@ -82,6 +82,8 @@ pub struct HummockStorage {
 
     context: CompactorContext,
 
+    filter_key_extractor_manager: FilterKeyExtractorManager,
+
     sstable_object_id_manager: SstableObjectIdManagerRef,
 
     buffer_tracker: BufferTracker,
@@ -154,14 +156,13 @@ impl HummockStorage {
             pin_version_rx,
             hummock_meta_client.clone(),
         ));
-
+        let filter_key_extractor_manager = FilterKeyExtractorManager::RpcFilterKeyExtractorManager(
+            filter_key_extractor_manager.clone(),
+        );
         let compactor_context = CompactorContext::new_local_compact_context(
             options.clone(),
             sstable_store.clone(),
             compactor_metrics.clone(),
-            FilterKeyExtractorManager::RpcFilterKeyExtractorManager(
-                filter_key_extractor_manager.clone(),
-            ),
         );
 
         let seal_epoch = Arc::new(AtomicU64::new(pinned_version.max_committed_epoch()));
@@ -171,6 +172,7 @@ impl HummockStorage {
             event_rx,
             pinned_version,
             compactor_context.clone(),
+            filter_key_extractor_manager.clone(),
             sstable_object_id_manager.clone(),
             state_store_metrics.clone(),
             CacheRefillConfig {
@@ -186,6 +188,7 @@ impl HummockStorage {
 
         let instance = Self {
             context: compactor_context,
+            filter_key_extractor_manager: filter_key_extractor_manager.clone(),
             sstable_object_id_manager,
             buffer_tracker: hummock_event_handler.buffer_tracker().clone(),
             version_update_notifier_tx: hummock_event_handler.version_update_notifier_tx(),
@@ -348,7 +351,7 @@ impl HummockStorage {
     }
 
     pub fn filter_key_extractor_manager(&self) -> &FilterKeyExtractorManager {
-        &self.context.filter_key_extractor_manager
+        &self.filter_key_extractor_manager
     }
 
     pub fn get_memory_limiter(&self) -> Arc<MemoryLimiter> {
diff --git a/src/tests/compaction_test/src/delete_range_runner.rs b/src/tests/compaction_test/src/delete_range_runner.rs
index 5b53ba70b86bd..486b819d3ee7e 100644
--- a/src/tests/compaction_test/src/delete_range_runner.rs
+++ b/src/tests/compaction_test/src/delete_range_runner.rs
@@ -581,21 +581,26 @@ fn run_compactor_thread(
     tokio::task::JoinHandle<()>,
     tokio::sync::oneshot::Sender<()>,
 ) {
+    let filter_key_extractor_manager =
+        FilterKeyExtractorManager::RpcFilterKeyExtractorManager(filter_key_extractor_manager);
     let compactor_context = CompactorContext {
         storage_opts,
         sstable_store,
         compactor_metrics,
         is_share_buffer_compact: false,
        compaction_executor: Arc::new(CompactionExecutor::new(None)),
-        filter_key_extractor_manager: FilterKeyExtractorManager::RpcFilterKeyExtractorManager(
-            filter_key_extractor_manager,
-        ),
+
         memory_limiter: MemoryLimiter::unlimit(),
         task_progress_manager: Default::default(),
         await_tree_reg: None,
         running_task_count: Arc::new(AtomicU32::new(0)),
     };
-    start_compactor(compactor_context, meta_client, sstable_object_id_manager)
+    start_compactor(
+        compactor_context,
+        meta_client,
+        sstable_object_id_manager,
+        filter_key_extractor_manager,
+    )
 }
 
 #[cfg(test)]
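The diff above deletes the old `test_shared_comapctor_object_id_manager` unit test because `SharedComapctorObjectIdManager::new` now requires a live `GrpcCompactorProxyClient`. A minimal illustrative replacement using the `for_test` constructor added in this change (external crate paths assumed; inside the storage crate the imports would use `crate::hummock::...` as in the removed test). Note the old final `is_err()` assertion no longer applies: with `for_test`, exhausting the pre-allocated ids panics on the missing proxy client instead of returning an error, so the sketch only consumes the pre-allocated ids:

```rust
use std::collections::VecDeque;

use risingwave_storage::hummock::{GetObjectId, SharedComapctorObjectIdManager};

#[tokio::test]
async fn shared_compactor_object_id_manager_pops_preallocated_ids() {
    // `for_test` builds a manager with no proxy client, so only the
    // pre-allocated ids are served.
    let mut manager = SharedComapctorObjectIdManager::for_test(VecDeque::from_iter([1, 3, 5]));
    assert_eq!(manager.get_new_sst_object_id().await.unwrap(), 1);
    assert_eq!(manager.get_new_sst_object_id().await.unwrap(), 3);
    assert_eq!(manager.get_new_sst_object_id().await.unwrap(), 5);
}
```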