Skip to content

Commit

Permalink
feat(memory): Separate total memory configurations for FE and Compact…
Browse files Browse the repository at this point in the history
…or (#19372)
  • Loading branch information
Li0k authored Nov 15, 2024
1 parent ac6cb38 commit 4085f56
Show file tree
Hide file tree
Showing 6 changed files with 31 additions and 12 deletions.
5 changes: 3 additions & 2 deletions src/cmd_all/src/standalone.rs
Original file line number Diff line number Diff line change
Expand Up @@ -408,15 +408,15 @@ mod test {
let raw_opts = "
--compute-opts=--listen-addr 127.0.0.1:8000 --total-memory-bytes 34359738368 --parallelism 10 --temp-secret-file-dir ./compute/secrets/
--meta-opts=--advertise-addr 127.0.0.1:9999 --data-directory \"some path with spaces\" --listen-addr 127.0.0.1:8001 --temp-secret-file-dir ./meta/secrets/
--frontend-opts=--config-path=src/config/original.toml --temp-secret-file-dir ./frontend/secrets/
--frontend-opts=--config-path=src/config/original.toml --temp-secret-file-dir ./frontend/secrets/ --frontend-total-memory-bytes=34359738368
--prometheus-listener-addr=127.0.0.1:1234
--config-path=src/config/test.toml
";
let actual = StandaloneOpts::parse_from(raw_opts.lines());
let opts = StandaloneOpts {
compute_opts: Some("--listen-addr 127.0.0.1:8000 --total-memory-bytes 34359738368 --parallelism 10 --temp-secret-file-dir ./compute/secrets/".into()),
meta_opts: Some("--advertise-addr 127.0.0.1:9999 --data-directory \"some path with spaces\" --listen-addr 127.0.0.1:8001 --temp-secret-file-dir ./meta/secrets/".into()),
frontend_opts: Some("--config-path=src/config/original.toml --temp-secret-file-dir ./frontend/secrets/".into()),
frontend_opts: Some("--config-path=src/config/original.toml --temp-secret-file-dir ./frontend/secrets/ --frontend-total-memory-bytes=34359738368".into() ),
compactor_opts: None,
prometheus_listener_addr: Some("127.0.0.1:1234".into()),
config_path: Some("src/config/test.toml".into()),
Expand Down Expand Up @@ -508,6 +508,7 @@ mod test {
metrics_level: None,
enable_barrier_read: None,
temp_secret_file_dir: "./frontend/secrets/",
frontend_total_memory_bytes: 34359738368,
},
),
compactor_opts: None,
Expand Down
9 changes: 9 additions & 0 deletions src/frontend/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,7 @@ pub mod session;
mod stream_fragmenter;
use risingwave_common::config::{MetricLevel, OverrideConfig};
use risingwave_common::util::meta_addr::MetaAddressStrategy;
use risingwave_common::util::resource_util::memory::system_memory_available_bytes;
use risingwave_common::util::tokio_util::sync::CancellationToken;
pub use stream_fragmenter::build_graph;
mod utils;
Expand Down Expand Up @@ -159,6 +160,10 @@ pub struct FrontendOpts {
default_value = "./secrets"
)]
pub temp_secret_file_dir: String,

/// Total available memory for the frontend node in bytes. Used by both computing and storage.
#[clap(long, env = "RW_FRONTEND_TOTAL_MEMORY_BYTES", default_value_t = default_frontend_total_memory_bytes())]
pub frontend_total_memory_bytes: usize,
}

impl risingwave_common::opts::Opts for FrontendOpts {
Expand Down Expand Up @@ -220,3 +225,7 @@ pub fn start(
.unwrap()
})
}

pub fn default_frontend_total_memory_bytes() -> usize {
system_memory_available_bytes()
}
4 changes: 2 additions & 2 deletions src/frontend/src/session.rs
Original file line number Diff line number Diff line change
Expand Up @@ -59,10 +59,10 @@ use risingwave_common::telemetry::manager::TelemetryManager;
use risingwave_common::telemetry::telemetry_env_enabled;
use risingwave_common::types::DataType;
use risingwave_common::util::addr::HostAddr;
use risingwave_common::util::cluster_limit;
use risingwave_common::util::cluster_limit::ActorCountPerParallelism;
use risingwave_common::util::iter_util::ZipEqFast;
use risingwave_common::util::runtime::BackgroundShutdownRuntime;
use risingwave_common::util::{cluster_limit, resource_util};
use risingwave_common::{GIT_SHA, RW_VERSION};
use risingwave_common_heap_profiling::HeapProfiler;
use risingwave_common_service::{MetricsManager, ObserverManager};
Expand Down Expand Up @@ -444,7 +444,7 @@ impl FrontendEnv {
.map_err(|err| anyhow!(err))?;
}

let total_memory_bytes = resource_util::memory::system_memory_available_bytes();
let total_memory_bytes = opts.frontend_total_memory_bytes;
let heap_profiler =
HeapProfiler::new(total_memory_bytes, config.server.heap_profiling.clone());
// Run a background heap profiler
Expand Down
9 changes: 9 additions & 0 deletions src/storage/compactor/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ use risingwave_common::config::{
AsyncStackTraceOption, CompactorMode, MetricLevel, OverrideConfig,
};
use risingwave_common::util::meta_addr::MetaAddressStrategy;
use risingwave_common::util::resource_util::memory::system_memory_available_bytes;
use risingwave_common::util::tokio_util::sync::CancellationToken;

use crate::server::{compactor_serve, shared_compactor_serve};
Expand Down Expand Up @@ -92,6 +93,10 @@ pub struct CompactorOpts {

#[clap(long, hide = true, env = "RW_PROXY_RPC_ENDPOINT", default_value = "")]
pub proxy_rpc_endpoint: String,

/// Total available memory for the frontend node in bytes. Used by compactor.
#[clap(long, env = "RW_COMPACTOR_TOTAL_MEMORY_BYTES", default_value_t = default_compactor_total_memory_bytes())]
pub compactor_total_memory_bytes: usize,
}

impl risingwave_common::opts::Opts for CompactorOpts {
Expand Down Expand Up @@ -143,3 +148,7 @@ pub fn start(
}),
}
}

pub fn default_compactor_total_memory_bytes() -> usize {
system_memory_available_bytes()
}
9 changes: 5 additions & 4 deletions src/storage/compactor/src/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,7 @@ use crate::telemetry::CompactorTelemetryCreator;
use crate::CompactorOpts;

pub async fn prepare_start_parameters(
compactor_opts: &CompactorOpts,
config: RwConfig,
system_params_reader: SystemParamsReader,
) -> (
Expand All @@ -81,7 +82,7 @@ pub async fn prepare_start_parameters(
&system_params_reader,
&storage_memory_config,
)));
let non_reserved_memory_bytes = (system_memory_available_bytes() as f64
let non_reserved_memory_bytes = (compactor_opts.compactor_total_memory_bytes as f64
* config.storage.compactor_memory_available_proportion)
as usize;
let meta_cache_capacity_bytes = storage_opts.meta_cache_capacity_mb * (1 << 20);
Expand Down Expand Up @@ -186,7 +187,7 @@ pub async fn compactor_serve(

// Register to the cluster.
let (meta_client, system_params_reader) = MetaClient::register_new(
opts.meta_address,
opts.meta_address.clone(),
WorkerType::Compactor,
&advertise_addr,
Default::default(),
Expand All @@ -210,7 +211,7 @@ pub async fn compactor_serve(
await_tree_reg,
storage_opts,
compactor_metrics,
) = prepare_start_parameters(config.clone(), system_params_reader.clone()).await;
) = prepare_start_parameters(&opts, config.clone(), system_params_reader.clone()).await;

let compaction_catalog_manager_ref = Arc::new(CompactionCatalogManager::new(Box::new(
RemoteTableAccessor::new(meta_client.clone()),
Expand Down Expand Up @@ -338,7 +339,7 @@ pub async fn shared_compactor_serve(
await_tree_reg,
storage_opts,
compactor_metrics,
) = prepare_start_parameters(config.clone(), system_params.into()).await;
) = prepare_start_parameters(&opts, config.clone(), system_params.into()).await;
let (sender, receiver) = mpsc::unbounded_channel();
let compactor_srv: CompactorServiceImpl = CompactorServiceImpl::new(sender);

Expand Down
7 changes: 3 additions & 4 deletions src/storage/src/hummock/store/hummock_storage.rs
Original file line number Diff line number Diff line change
Expand Up @@ -38,9 +38,9 @@ use tokio::sync::oneshot;

use super::local_hummock_storage::LocalHummockStorage;
use super::version::{read_filter_for_version, CommittedVersion, HummockVersionReader};
use crate::compaction_catalog_manager::{
CompactionCatalogManager, CompactionCatalogManagerRef, FakeRemoteTableAccessor,
};
use crate::compaction_catalog_manager::CompactionCatalogManagerRef;
#[cfg(any(test, feature = "test"))]
use crate::compaction_catalog_manager::{CompactionCatalogManager, FakeRemoteTableAccessor};
use crate::error::StorageResult;
use crate::hummock::backup_reader::{BackupReader, BackupReaderRef};
use crate::hummock::compactor::{
Expand Down Expand Up @@ -734,7 +734,6 @@ impl HummockStorage {
}

/// Used in the compaction test tool
#[cfg(any(test, feature = "test"))]
pub async fn update_version_and_wait(&self, version: HummockVersion) {
use tokio::task::yield_now;
let version_id = version.id;
Expand Down

0 comments on commit 4085f56

Please sign in to comment.