From 11feadeaeece918ed80d24162b75505bef639d06 Mon Sep 17 00:00:00 2001 From: Li0k Date: Wed, 13 Nov 2024 15:32:40 +0800 Subject: [PATCH 1/4] feat(memory): Separate memory configurations --- src/frontend/src/lib.rs | 9 +++++++++ src/frontend/src/session.rs | 4 ++-- src/storage/compactor/src/lib.rs | 9 +++++++++ src/storage/compactor/src/server.rs | 9 +++++---- src/storage/src/hummock/store/hummock_storage.rs | 6 +++--- 5 files changed, 28 insertions(+), 9 deletions(-) diff --git a/src/frontend/src/lib.rs b/src/frontend/src/lib.rs index 5c006e191157e..c7effa0404bef 100644 --- a/src/frontend/src/lib.rs +++ b/src/frontend/src/lib.rs @@ -61,6 +61,7 @@ pub mod session; mod stream_fragmenter; use risingwave_common::config::{MetricLevel, OverrideConfig}; use risingwave_common::util::meta_addr::MetaAddressStrategy; +use risingwave_common::util::resource_util::memory::system_memory_available_bytes; use risingwave_common::util::tokio_util::sync::CancellationToken; pub use stream_fragmenter::build_graph; mod utils; @@ -159,6 +160,10 @@ pub struct FrontendOpts { default_value = "./secrets" )] pub temp_secret_file_dir: String, + + /// Total available memory for the frontend node in bytes. Used by both computing and storage. + #[clap(long, env = "RW_FE_TOTAL_MEMORY_BYTES", default_value_t = default_fe_total_memory_bytes())] + pub fe_total_memory_bytes: usize, } impl risingwave_common::opts::Opts for FrontendOpts { @@ -220,3 +225,7 @@ pub fn start( .unwrap() }) } + +pub fn default_fe_total_memory_bytes() -> usize { + system_memory_available_bytes() +} diff --git a/src/frontend/src/session.rs b/src/frontend/src/session.rs index 9157466d9af91..7d1ee7b7d06e1 100644 --- a/src/frontend/src/session.rs +++ b/src/frontend/src/session.rs @@ -59,10 +59,10 @@ use risingwave_common::telemetry::manager::TelemetryManager; use risingwave_common::telemetry::telemetry_env_enabled; use risingwave_common::types::DataType; use risingwave_common::util::addr::HostAddr; +use risingwave_common::util::cluster_limit; use risingwave_common::util::cluster_limit::ActorCountPerParallelism; use risingwave_common::util::iter_util::ZipEqFast; use risingwave_common::util::runtime::BackgroundShutdownRuntime; -use risingwave_common::util::{cluster_limit, resource_util}; use risingwave_common::{GIT_SHA, RW_VERSION}; use risingwave_common_heap_profiling::HeapProfiler; use risingwave_common_service::{MetricsManager, ObserverManager}; @@ -444,7 +444,7 @@ impl FrontendEnv { .map_err(|err| anyhow!(err))?; } - let total_memory_bytes = resource_util::memory::system_memory_available_bytes(); + let total_memory_bytes = opts.fe_total_memory_bytes; let heap_profiler = HeapProfiler::new(total_memory_bytes, config.server.heap_profiling.clone()); // Run a background heap profiler diff --git a/src/storage/compactor/src/lib.rs b/src/storage/compactor/src/lib.rs index 4c503f3d7a8d5..21c3c0b92d803 100644 --- a/src/storage/compactor/src/lib.rs +++ b/src/storage/compactor/src/lib.rs @@ -22,6 +22,7 @@ use risingwave_common::config::{ AsyncStackTraceOption, CompactorMode, MetricLevel, OverrideConfig, }; use risingwave_common::util::meta_addr::MetaAddressStrategy; +use risingwave_common::util::resource_util::memory::system_memory_available_bytes; use risingwave_common::util::tokio_util::sync::CancellationToken; use crate::server::{compactor_serve, shared_compactor_serve}; @@ -92,6 +93,10 @@ pub struct CompactorOpts { #[clap(long, hide = true, env = "RW_PROXY_RPC_ENDPOINT", default_value = "")] pub proxy_rpc_endpoint: String, + + /// Total available memory for the frontend node in bytes. Used by both computing and storage. + #[clap(long, env = "RW_COMPACTOR_TOTAL_MEMORY_BYTES", default_value_t = default_compactor_total_memory_bytes())] + pub compactor_total_memory_bytes: usize, } impl risingwave_common::opts::Opts for CompactorOpts { @@ -143,3 +148,7 @@ pub fn start( }), } } + +pub fn default_compactor_total_memory_bytes() -> usize { + system_memory_available_bytes() +} diff --git a/src/storage/compactor/src/server.rs b/src/storage/compactor/src/server.rs index 72ae1542f116a..bc787e5b18ca3 100644 --- a/src/storage/compactor/src/server.rs +++ b/src/storage/compactor/src/server.rs @@ -59,6 +59,7 @@ use crate::telemetry::CompactorTelemetryCreator; use crate::CompactorOpts; pub async fn prepare_start_parameters( + compactor_opts: &CompactorOpts, config: RwConfig, system_params_reader: SystemParamsReader, ) -> ( @@ -81,7 +82,7 @@ pub async fn prepare_start_parameters( &system_params_reader, &storage_memory_config, ))); - let non_reserved_memory_bytes = (system_memory_available_bytes() as f64 + let non_reserved_memory_bytes = (compactor_opts.compactor_total_memory_bytes as f64 * config.storage.compactor_memory_available_proportion) as usize; let meta_cache_capacity_bytes = storage_opts.meta_cache_capacity_mb * (1 << 20); @@ -186,7 +187,7 @@ pub async fn compactor_serve( // Register to the cluster. let (meta_client, system_params_reader) = MetaClient::register_new( - opts.meta_address, + opts.meta_address.clone(), WorkerType::Compactor, &advertise_addr, Default::default(), @@ -210,7 +211,7 @@ pub async fn compactor_serve( await_tree_reg, storage_opts, compactor_metrics, - ) = prepare_start_parameters(config.clone(), system_params_reader.clone()).await; + ) = prepare_start_parameters(&opts, config.clone(), system_params_reader.clone()).await; let compaction_catalog_manager_ref = Arc::new(CompactionCatalogManager::new(Box::new( RemoteTableAccessor::new(meta_client.clone()), @@ -338,7 +339,7 @@ pub async fn shared_compactor_serve( await_tree_reg, storage_opts, compactor_metrics, - ) = prepare_start_parameters(config.clone(), system_params.into()).await; + ) = prepare_start_parameters(&opts, config.clone(), system_params.into()).await; let (sender, receiver) = mpsc::unbounded_channel(); let compactor_srv: CompactorServiceImpl = CompactorServiceImpl::new(sender); diff --git a/src/storage/src/hummock/store/hummock_storage.rs b/src/storage/src/hummock/store/hummock_storage.rs index 3b4a143d0ad53..8544d1195ca8a 100644 --- a/src/storage/src/hummock/store/hummock_storage.rs +++ b/src/storage/src/hummock/store/hummock_storage.rs @@ -38,9 +38,9 @@ use tokio::sync::oneshot; use super::local_hummock_storage::LocalHummockStorage; use super::version::{read_filter_for_version, CommittedVersion, HummockVersionReader}; -use crate::compaction_catalog_manager::{ - CompactionCatalogManager, CompactionCatalogManagerRef, FakeRemoteTableAccessor, -}; +use crate::compaction_catalog_manager::CompactionCatalogManagerRef; +#[cfg(test)] +use crate::compaction_catalog_manager::{CompactionCatalogManager, FakeRemoteTableAccessor}; use crate::error::StorageResult; use crate::hummock::backup_reader::{BackupReader, BackupReaderRef}; use crate::hummock::compactor::{ From 67abe540004810ae4ce000a42473e517ac201c9e Mon Sep 17 00:00:00 2001 From: Li0k Date: Wed, 13 Nov 2024 16:07:00 +0800 Subject: [PATCH 2/4] fix check --- src/storage/src/hummock/store/hummock_storage.rs | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/src/storage/src/hummock/store/hummock_storage.rs b/src/storage/src/hummock/store/hummock_storage.rs index 8544d1195ca8a..8ff3110dd735a 100644 --- a/src/storage/src/hummock/store/hummock_storage.rs +++ b/src/storage/src/hummock/store/hummock_storage.rs @@ -39,7 +39,7 @@ use tokio::sync::oneshot; use super::local_hummock_storage::LocalHummockStorage; use super::version::{read_filter_for_version, CommittedVersion, HummockVersionReader}; use crate::compaction_catalog_manager::CompactionCatalogManagerRef; -#[cfg(test)] +#[cfg(any(test, feature = "test"))] use crate::compaction_catalog_manager::{CompactionCatalogManager, FakeRemoteTableAccessor}; use crate::error::StorageResult; use crate::hummock::backup_reader::{BackupReader, BackupReaderRef}; @@ -724,7 +724,6 @@ impl HummockStorage { } /// Used in the compaction test tool - #[cfg(any(test, feature = "test"))] pub async fn update_version_and_wait(&self, version: HummockVersion) { use tokio::task::yield_now; let version_id = version.id; From fd169aa2c9713ff8d754e3eb9e3fd8a55113a46c Mon Sep 17 00:00:00 2001 From: Li0k Date: Thu, 14 Nov 2024 16:09:41 +0800 Subject: [PATCH 3/4] address comments --- src/cmd_all/src/standalone.rs | 1 + src/frontend/src/lib.rs | 6 +++--- src/frontend/src/session.rs | 2 +- src/storage/compactor/src/lib.rs | 2 +- 4 files changed, 6 insertions(+), 5 deletions(-) diff --git a/src/cmd_all/src/standalone.rs b/src/cmd_all/src/standalone.rs index 3353b1a433163..70f0d1d3ea430 100644 --- a/src/cmd_all/src/standalone.rs +++ b/src/cmd_all/src/standalone.rs @@ -508,6 +508,7 @@ mod test { metrics_level: None, enable_barrier_read: None, temp_secret_file_dir: "./frontend/secrets/", + frontend_total_memory_bytes: 34359738368, }, ), compactor_opts: None, diff --git a/src/frontend/src/lib.rs b/src/frontend/src/lib.rs index c7effa0404bef..9f29fd82e066e 100644 --- a/src/frontend/src/lib.rs +++ b/src/frontend/src/lib.rs @@ -162,8 +162,8 @@ pub struct FrontendOpts { pub temp_secret_file_dir: String, /// Total available memory for the frontend node in bytes. Used by both computing and storage. - #[clap(long, env = "RW_FE_TOTAL_MEMORY_BYTES", default_value_t = default_fe_total_memory_bytes())] - pub fe_total_memory_bytes: usize, + #[clap(long, env = "RW_FRONTEND_TOTAL_MEMORY_BYTES", default_value_t = default_frontend_total_memory_bytes())] + pub frontend_total_memory_bytes: usize, } impl risingwave_common::opts::Opts for FrontendOpts { @@ -226,6 +226,6 @@ pub fn start( }) } -pub fn default_fe_total_memory_bytes() -> usize { +pub fn default_frontend_total_memory_bytes() -> usize { system_memory_available_bytes() } diff --git a/src/frontend/src/session.rs b/src/frontend/src/session.rs index 7d1ee7b7d06e1..fa4836e73e958 100644 --- a/src/frontend/src/session.rs +++ b/src/frontend/src/session.rs @@ -444,7 +444,7 @@ impl FrontendEnv { .map_err(|err| anyhow!(err))?; } - let total_memory_bytes = opts.fe_total_memory_bytes; + let total_memory_bytes = opts.frontend_total_memory_bytes; let heap_profiler = HeapProfiler::new(total_memory_bytes, config.server.heap_profiling.clone()); // Run a background heap profiler diff --git a/src/storage/compactor/src/lib.rs b/src/storage/compactor/src/lib.rs index 21c3c0b92d803..ebacbfb93f59a 100644 --- a/src/storage/compactor/src/lib.rs +++ b/src/storage/compactor/src/lib.rs @@ -94,7 +94,7 @@ pub struct CompactorOpts { #[clap(long, hide = true, env = "RW_PROXY_RPC_ENDPOINT", default_value = "")] pub proxy_rpc_endpoint: String, - /// Total available memory for the frontend node in bytes. Used by both computing and storage. + /// Total available memory for the frontend node in bytes. Used by compactor. #[clap(long, env = "RW_COMPACTOR_TOTAL_MEMORY_BYTES", default_value_t = default_compactor_total_memory_bytes())] pub compactor_total_memory_bytes: usize, } From 4d2ee96a5d01adbb1192dcf85c739725520334f7 Mon Sep 17 00:00:00 2001 From: Li0k Date: Fri, 15 Nov 2024 15:55:43 +0800 Subject: [PATCH 4/4] fix ut --- src/cmd_all/src/standalone.rs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/cmd_all/src/standalone.rs b/src/cmd_all/src/standalone.rs index 70f0d1d3ea430..6eb62492999a0 100644 --- a/src/cmd_all/src/standalone.rs +++ b/src/cmd_all/src/standalone.rs @@ -408,7 +408,7 @@ mod test { let raw_opts = " --compute-opts=--listen-addr 127.0.0.1:8000 --total-memory-bytes 34359738368 --parallelism 10 --temp-secret-file-dir ./compute/secrets/ --meta-opts=--advertise-addr 127.0.0.1:9999 --data-directory \"some path with spaces\" --listen-addr 127.0.0.1:8001 --temp-secret-file-dir ./meta/secrets/ ---frontend-opts=--config-path=src/config/original.toml --temp-secret-file-dir ./frontend/secrets/ +--frontend-opts=--config-path=src/config/original.toml --temp-secret-file-dir ./frontend/secrets/ --frontend-total-memory-bytes=34359738368 --prometheus-listener-addr=127.0.0.1:1234 --config-path=src/config/test.toml "; @@ -416,7 +416,7 @@ mod test { let opts = StandaloneOpts { compute_opts: Some("--listen-addr 127.0.0.1:8000 --total-memory-bytes 34359738368 --parallelism 10 --temp-secret-file-dir ./compute/secrets/".into()), meta_opts: Some("--advertise-addr 127.0.0.1:9999 --data-directory \"some path with spaces\" --listen-addr 127.0.0.1:8001 --temp-secret-file-dir ./meta/secrets/".into()), - frontend_opts: Some("--config-path=src/config/original.toml --temp-secret-file-dir ./frontend/secrets/".into()), + frontend_opts: Some("--config-path=src/config/original.toml --temp-secret-file-dir ./frontend/secrets/ --frontend-total-memory-bytes=34359738368".into() ), compactor_opts: None, prometheus_listener_addr: Some("127.0.0.1:1234".into()), config_path: Some("src/config/test.toml".into()),