diff --git a/src/cmd_all/src/standalone.rs b/src/cmd_all/src/standalone.rs index 3353b1a433163..6eb62492999a0 100644 --- a/src/cmd_all/src/standalone.rs +++ b/src/cmd_all/src/standalone.rs @@ -408,7 +408,7 @@ mod test { let raw_opts = " --compute-opts=--listen-addr 127.0.0.1:8000 --total-memory-bytes 34359738368 --parallelism 10 --temp-secret-file-dir ./compute/secrets/ --meta-opts=--advertise-addr 127.0.0.1:9999 --data-directory \"some path with spaces\" --listen-addr 127.0.0.1:8001 --temp-secret-file-dir ./meta/secrets/ ---frontend-opts=--config-path=src/config/original.toml --temp-secret-file-dir ./frontend/secrets/ +--frontend-opts=--config-path=src/config/original.toml --temp-secret-file-dir ./frontend/secrets/ --frontend-total-memory-bytes=34359738368 --prometheus-listener-addr=127.0.0.1:1234 --config-path=src/config/test.toml "; @@ -416,7 +416,7 @@ mod test { let opts = StandaloneOpts { compute_opts: Some("--listen-addr 127.0.0.1:8000 --total-memory-bytes 34359738368 --parallelism 10 --temp-secret-file-dir ./compute/secrets/".into()), meta_opts: Some("--advertise-addr 127.0.0.1:9999 --data-directory \"some path with spaces\" --listen-addr 127.0.0.1:8001 --temp-secret-file-dir ./meta/secrets/".into()), - frontend_opts: Some("--config-path=src/config/original.toml --temp-secret-file-dir ./frontend/secrets/".into()), + frontend_opts: Some("--config-path=src/config/original.toml --temp-secret-file-dir ./frontend/secrets/ --frontend-total-memory-bytes=34359738368".into() ), compactor_opts: None, prometheus_listener_addr: Some("127.0.0.1:1234".into()), config_path: Some("src/config/test.toml".into()), @@ -508,6 +508,7 @@ mod test { metrics_level: None, enable_barrier_read: None, temp_secret_file_dir: "./frontend/secrets/", + frontend_total_memory_bytes: 34359738368, }, ), compactor_opts: None, diff --git a/src/frontend/src/lib.rs b/src/frontend/src/lib.rs index 5c006e191157e..9f29fd82e066e 100644 --- a/src/frontend/src/lib.rs +++ b/src/frontend/src/lib.rs @@ -61,6 +61,7 @@ pub mod session; mod stream_fragmenter; use risingwave_common::config::{MetricLevel, OverrideConfig}; use risingwave_common::util::meta_addr::MetaAddressStrategy; +use risingwave_common::util::resource_util::memory::system_memory_available_bytes; use risingwave_common::util::tokio_util::sync::CancellationToken; pub use stream_fragmenter::build_graph; mod utils; @@ -159,6 +160,10 @@ pub struct FrontendOpts { default_value = "./secrets" )] pub temp_secret_file_dir: String, + + /// Total available memory for the frontend node in bytes. Used by both computing and storage. + #[clap(long, env = "RW_FRONTEND_TOTAL_MEMORY_BYTES", default_value_t = default_frontend_total_memory_bytes())] + pub frontend_total_memory_bytes: usize, } impl risingwave_common::opts::Opts for FrontendOpts { @@ -220,3 +225,7 @@ pub fn start( .unwrap() }) } + +pub fn default_frontend_total_memory_bytes() -> usize { + system_memory_available_bytes() +} diff --git a/src/frontend/src/session.rs b/src/frontend/src/session.rs index 9157466d9af91..fa4836e73e958 100644 --- a/src/frontend/src/session.rs +++ b/src/frontend/src/session.rs @@ -59,10 +59,10 @@ use risingwave_common::telemetry::manager::TelemetryManager; use risingwave_common::telemetry::telemetry_env_enabled; use risingwave_common::types::DataType; use risingwave_common::util::addr::HostAddr; +use risingwave_common::util::cluster_limit; use risingwave_common::util::cluster_limit::ActorCountPerParallelism; use risingwave_common::util::iter_util::ZipEqFast; use risingwave_common::util::runtime::BackgroundShutdownRuntime; -use risingwave_common::util::{cluster_limit, resource_util}; use risingwave_common::{GIT_SHA, RW_VERSION}; use risingwave_common_heap_profiling::HeapProfiler; use risingwave_common_service::{MetricsManager, ObserverManager}; @@ -444,7 +444,7 @@ impl FrontendEnv { .map_err(|err| anyhow!(err))?; } - let total_memory_bytes = resource_util::memory::system_memory_available_bytes(); + let total_memory_bytes = opts.frontend_total_memory_bytes; let heap_profiler = HeapProfiler::new(total_memory_bytes, config.server.heap_profiling.clone()); // Run a background heap profiler diff --git a/src/storage/compactor/src/lib.rs b/src/storage/compactor/src/lib.rs index 4c503f3d7a8d5..ebacbfb93f59a 100644 --- a/src/storage/compactor/src/lib.rs +++ b/src/storage/compactor/src/lib.rs @@ -22,6 +22,7 @@ use risingwave_common::config::{ AsyncStackTraceOption, CompactorMode, MetricLevel, OverrideConfig, }; use risingwave_common::util::meta_addr::MetaAddressStrategy; +use risingwave_common::util::resource_util::memory::system_memory_available_bytes; use risingwave_common::util::tokio_util::sync::CancellationToken; use crate::server::{compactor_serve, shared_compactor_serve}; @@ -92,6 +93,10 @@ pub struct CompactorOpts { #[clap(long, hide = true, env = "RW_PROXY_RPC_ENDPOINT", default_value = "")] pub proxy_rpc_endpoint: String, + + /// Total available memory for the frontend node in bytes. Used by compactor. + #[clap(long, env = "RW_COMPACTOR_TOTAL_MEMORY_BYTES", default_value_t = default_compactor_total_memory_bytes())] + pub compactor_total_memory_bytes: usize, } impl risingwave_common::opts::Opts for CompactorOpts { @@ -143,3 +148,7 @@ pub fn start( }), } } + +pub fn default_compactor_total_memory_bytes() -> usize { + system_memory_available_bytes() +} diff --git a/src/storage/compactor/src/server.rs b/src/storage/compactor/src/server.rs index 72ae1542f116a..bc787e5b18ca3 100644 --- a/src/storage/compactor/src/server.rs +++ b/src/storage/compactor/src/server.rs @@ -59,6 +59,7 @@ use crate::telemetry::CompactorTelemetryCreator; use crate::CompactorOpts; pub async fn prepare_start_parameters( + compactor_opts: &CompactorOpts, config: RwConfig, system_params_reader: SystemParamsReader, ) -> ( @@ -81,7 +82,7 @@ pub async fn prepare_start_parameters( &system_params_reader, &storage_memory_config, ))); - let non_reserved_memory_bytes = (system_memory_available_bytes() as f64 + let non_reserved_memory_bytes = (compactor_opts.compactor_total_memory_bytes as f64 * config.storage.compactor_memory_available_proportion) as usize; let meta_cache_capacity_bytes = storage_opts.meta_cache_capacity_mb * (1 << 20); @@ -186,7 +187,7 @@ pub async fn compactor_serve( // Register to the cluster. let (meta_client, system_params_reader) = MetaClient::register_new( - opts.meta_address, + opts.meta_address.clone(), WorkerType::Compactor, &advertise_addr, Default::default(), @@ -210,7 +211,7 @@ pub async fn compactor_serve( await_tree_reg, storage_opts, compactor_metrics, - ) = prepare_start_parameters(config.clone(), system_params_reader.clone()).await; + ) = prepare_start_parameters(&opts, config.clone(), system_params_reader.clone()).await; let compaction_catalog_manager_ref = Arc::new(CompactionCatalogManager::new(Box::new( RemoteTableAccessor::new(meta_client.clone()), @@ -338,7 +339,7 @@ pub async fn shared_compactor_serve( await_tree_reg, storage_opts, compactor_metrics, - ) = prepare_start_parameters(config.clone(), system_params.into()).await; + ) = prepare_start_parameters(&opts, config.clone(), system_params.into()).await; let (sender, receiver) = mpsc::unbounded_channel(); let compactor_srv: CompactorServiceImpl = CompactorServiceImpl::new(sender); diff --git a/src/storage/src/hummock/store/hummock_storage.rs b/src/storage/src/hummock/store/hummock_storage.rs index 43bb08c44a0ba..74ecff472a5d5 100644 --- a/src/storage/src/hummock/store/hummock_storage.rs +++ b/src/storage/src/hummock/store/hummock_storage.rs @@ -38,9 +38,9 @@ use tokio::sync::oneshot; use super::local_hummock_storage::LocalHummockStorage; use super::version::{read_filter_for_version, CommittedVersion, HummockVersionReader}; -use crate::compaction_catalog_manager::{ - CompactionCatalogManager, CompactionCatalogManagerRef, FakeRemoteTableAccessor, -}; +use crate::compaction_catalog_manager::CompactionCatalogManagerRef; +#[cfg(any(test, feature = "test"))] +use crate::compaction_catalog_manager::{CompactionCatalogManager, FakeRemoteTableAccessor}; use crate::error::StorageResult; use crate::hummock::backup_reader::{BackupReader, BackupReaderRef}; use crate::hummock::compactor::{ @@ -734,7 +734,6 @@ impl HummockStorage { } /// Used in the compaction test tool - #[cfg(any(test, feature = "test"))] pub async fn update_version_and_wait(&self, version: HummockVersion) { use tokio::task::yield_now; let version_id = version.id;