Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix(compactor): introduce dedicated config for compactor meta cache #19484

Open
wants to merge 4 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
39 changes: 33 additions & 6 deletions src/common/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -683,6 +683,10 @@ pub struct CacheConfig {
#[serde(default)]
#[config_doc(omitted)]
pub meta_cache_eviction: CacheEvictionConfig,

/// Explicitly configure the capacity of the meta cache, in MB, for the compactor.
#[serde(default)]
pub compactor_meta_cache_capacity_mb: Option<usize>,
}

/// the section `[storage.cache.eviction]` in `risingwave.toml`.
Expand Down Expand Up @@ -2442,7 +2446,7 @@ pub const MAX_META_CACHE_SHARD_BITS: usize = 4;
pub const MIN_BUFFER_SIZE_PER_SHARD: usize = 256;
pub const MAX_BLOCK_CACHE_SHARD_BITS: usize = 6; // It means that there will be 64 shards lru-cache to avoid lock conflict.

pub fn extract_storage_memory_config(s: &RwConfig) -> StorageMemoryConfig {
pub fn extract_storage_memory_config_default(s: &RwConfig) -> StorageMemoryConfig {
let block_cache_capacity_mb = s.storage.cache.block_cache_capacity_mb.unwrap_or(
// adapt to old version
s.storage
Expand Down Expand Up @@ -2474,10 +2478,6 @@ pub fn extract_storage_memory_config(s: &RwConfig) -> StorageMemoryConfig {
}
shard_bits
});
let compactor_memory_limit_mb = s
.storage
.compactor_memory_limit_mb
.unwrap_or(default::storage::compactor_memory_limit_mb());

let get_eviction_config = |c: &CacheEvictionConfig| {
match c {
Expand Down Expand Up @@ -2564,15 +2564,42 @@ pub fn extract_storage_memory_config(s: &RwConfig) -> StorageMemoryConfig {
meta_cache_capacity_mb,
meta_cache_shard_num,
shared_buffer_capacity_mb,
compactor_memory_limit_mb,
prefetch_buffer_capacity_mb,
block_cache_eviction_config,
meta_cache_eviction_config,
block_file_cache_flush_buffer_threshold_mb,
meta_file_cache_flush_buffer_threshold_mb,

compactor_memory_limit_mb: 0,
}
}

/// Builds the [`StorageMemoryConfig`] used by a compactor process.
///
/// Starts from [`extract_storage_memory_config_default`] and overrides two fields:
///
/// * `compactor_memory_limit_mb` — `total_memory_bytes` scaled by
///   `storage.compactor_memory_available_proportion`, converted to MB.
/// * `meta_cache_capacity_mb` — `storage.cache.compactor_meta_cache_capacity_mb` when it is
///   set; otherwise the smaller of 128 MB and 2% of `total_memory_bytes` (in MB).
///
/// `total_memory_bytes` is the total memory of the compactor process, in bytes.
pub fn extract_compactor_memory_config(
    s: &RwConfig,
    total_memory_bytes: u64,
) -> StorageMemoryConfig {
    // NOTE(review): despite its name, this constant acts as an upper bound below
    // (it is combined with the proportional value through `min`) — confirm the intent.
    const MIN_COMPACTOR_META_CACHE_CAPACITY_MB: usize = 128;
    const MAX_COMPACTOR_META_CACHE_CAPACITY_PROPORTION: f64 = 0.02;

    let mut memory_config = extract_storage_memory_config_default(s);

    // The field is expressed in MB (consumers scale it back up by `1 << 20`), so the byte
    // count computed from the proportion has to be converted before it is stored.
    // NOTE(review): an explicit `storage.compactor_memory_limit_mb` is not consulted here —
    // confirm that dropping the override is intended.
    let compactor_memory_limit_bytes =
        (total_memory_bytes as f64 * s.storage.compactor_memory_available_proportion) as usize;
    memory_config.compactor_memory_limit_mb = compactor_memory_limit_bytes >> 20;

    memory_config.meta_cache_capacity_mb = s
        .storage
        .cache
        .compactor_meta_cache_capacity_mb
        .unwrap_or_else(|| {
            let proportional_capacity_mb = ((total_memory_bytes as f64
                * MAX_COMPACTOR_META_CACHE_CAPACITY_PROPORTION)
                as usize)
                >> 20;
            std::cmp::min(
                MIN_COMPACTOR_META_CACHE_CAPACITY_MB,
                proportional_capacity_mb,
            )
        });

    memory_config
}

#[derive(Clone, Debug, Serialize, Deserialize, DefaultFromSerde, ConfigDoc)]
pub struct CompactionConfig {
#[serde(default = "default::compaction_config::max_bytes_for_level_base")]
Expand Down
17 changes: 13 additions & 4 deletions src/compute/src/memory/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -202,9 +202,14 @@ pub fn storage_memory_config(
.unwrap_or(default_block_cache_capacity_mb),
);

let compactor_memory_limit_mb = storage_config.compactor_memory_limit_mb.unwrap_or(
((non_reserved_memory_bytes as f64 * compactor_memory_proportion).ceil() as usize) >> 20,
);
let compactor_memory_limit_mb = if embedded_compactor_enabled {
storage_config.compactor_memory_limit_mb.unwrap_or(
((non_reserved_memory_bytes as f64 * compactor_memory_proportion).ceil() as usize)
>> 20,
)
} else {
0
};

// The file cache flush buffer threshold is used as a emergency limitation.
// On most cases the flush buffer is not supposed to be as large as the threshold.
Expand Down Expand Up @@ -328,7 +333,11 @@ pub fn storage_memory_config(
meta_cache_capacity_mb,
meta_cache_shard_num,
shared_buffer_capacity_mb,
compactor_memory_limit_mb,
compactor_memory_limit_mb: if embedded_compactor_enabled {
compactor_memory_limit_mb
} else {
0 // unused
},
prefetch_buffer_capacity_mb,
block_cache_eviction_config,
meta_cache_eviction_config,
Expand Down
2 changes: 1 addition & 1 deletion src/compute/src/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -234,7 +234,7 @@ pub async fn compute_node_serve(
if embedded_compactor_enabled {
tracing::info!("start embedded compactor");
let memory_limiter = Arc::new(MemoryLimiter::new(
storage_opts.compactor_memory_limit_mb as u64 * 1024 * 1024 / 2,
storage_memory_config.compactor_memory_limit_mb as u64 * 1024 * 1024 / 2,
));

let compaction_executor = Arc::new(CompactionExecutor::new(Some(1)));
Expand Down
29 changes: 14 additions & 15 deletions src/storage/compactor/src/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,15 +17,14 @@ use std::sync::Arc;
use std::time::Duration;

use risingwave_common::config::{
extract_storage_memory_config, load_config, AsyncStackTraceOption, MetricLevel, RwConfig,
extract_compactor_memory_config, load_config, AsyncStackTraceOption, MetricLevel, RwConfig,
};
use risingwave_common::monitor::{RouterExt, TcpConfig};
use risingwave_common::system_param::local_manager::LocalSystemParamsManager;
use risingwave_common::system_param::reader::{SystemParamsRead, SystemParamsReader};
use risingwave_common::telemetry::manager::TelemetryManager;
use risingwave_common::telemetry::telemetry_env_enabled;
use risingwave_common::util::addr::HostAddr;
use risingwave_common::util::resource_util::memory::system_memory_available_bytes;
use risingwave_common::util::tokio_util::sync::CancellationToken;
use risingwave_common::{GIT_SHA, RW_VERSION};
use risingwave_common_heap_profiling::HeapProfiler;
Expand Down Expand Up @@ -58,6 +57,7 @@ use crate::rpc::{CompactorServiceImpl, MonitorServiceImpl};
use crate::telemetry::CompactorTelemetryCreator;
use crate::CompactorOpts;

// Only used for non-embedded compactor
pub async fn prepare_start_parameters(
compactor_opts: &CompactorOpts,
config: RwConfig,
Expand All @@ -76,24 +76,24 @@ pub async fn prepare_start_parameters(

let state_store_url = system_params_reader.state_store();

let storage_memory_config = extract_storage_memory_config(&config);
let storage_memory_config = extract_compactor_memory_config(
&config,
compactor_opts.compactor_total_memory_bytes as u64,
);
let storage_opts: Arc<StorageOpts> = Arc::new(StorageOpts::from((
&config,
&system_params_reader,
&storage_memory_config,
)));
let non_reserved_memory_bytes = (compactor_opts.compactor_total_memory_bytes as f64
* config.storage.compactor_memory_available_proportion)
as usize;
let meta_cache_capacity_bytes = storage_opts.meta_cache_capacity_mb * (1 << 20);
let compactor_memory_limit_bytes = match config.storage.compactor_memory_limit_mb {
Some(compactor_memory_limit_mb) => compactor_memory_limit_mb as u64 * (1 << 20),
None => (non_reserved_memory_bytes - meta_cache_capacity_bytes) as u64,
};
let compactor_memory_limit_bytes =
storage_memory_config.compactor_memory_limit_mb as u64 * (1 << 20);
let meta_cache_capacity_bytes = storage_memory_config.meta_cache_capacity_mb * (1 << 20);
let compactor_memory_limit_bytes =
compactor_memory_limit_bytes.saturating_sub(meta_cache_capacity_bytes as u64);

tracing::info!(
"Compactor non_reserved_memory_bytes {} meta_cache_capacity_bytes {} compactor_memory_limit_bytes {} sstable_size_bytes {} block_size_bytes {}",
non_reserved_memory_bytes, meta_cache_capacity_bytes, compactor_memory_limit_bytes,
"Compactor compactor_memory_limit_bytes {} meta_cache_capacity_bytes {} compactor_memory_limit_bytes {} sstable_size_bytes {} block_size_bytes {}",
compactor_memory_limit_bytes, meta_cache_capacity_bytes, compactor_memory_limit_bytes,
storage_opts.sstable_size_mb * (1 << 20),
storage_opts.block_size_kb * (1 << 10),
);
Expand Down Expand Up @@ -134,15 +134,14 @@ pub async fn prepare_start_parameters(
);

let memory_limiter = Arc::new(MemoryLimiter::new(compactor_memory_limit_bytes));
let storage_memory_config = extract_storage_memory_config(&config);
let memory_collector = Arc::new(HummockMemoryCollector::new(
sstable_store.clone(),
memory_limiter.clone(),
storage_memory_config,
));

let heap_profiler = HeapProfiler::new(
system_memory_available_bytes(),
compactor_opts.compactor_total_memory_bytes,
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is this correct? The documentation of HeapProfiler says:

total_memory must be the total available memory for the process.
It will be compared with the process resident memory.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Correct me if I'm wrong.

I followed the compute node's (CN's) approach, where compactor_opts.compactor_total_memory_bytes represents the memory limit for the current process, whether it is a dedicated or a standalone deployment.

My understanding is that after #19477, standalone also sets total_memory_bytes for the CN, so this PR's behavior is similar to the CN's.

config.server.heap_profiling.clone(),
);

Expand Down
4 changes: 2 additions & 2 deletions src/storage/hummock_test/src/bin/replay/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ use clap::Parser;
use foyer::{Engine, HybridCacheBuilder};
use replay_impl::{get_replay_notification_client, GlobalReplayImpl};
use risingwave_common::config::{
extract_storage_memory_config, load_config, NoOverride, ObjectStoreConfig,
extract_storage_memory_config_default, load_config, NoOverride, ObjectStoreConfig,
};
use risingwave_common::system_param::reader::SystemParamsReader;
use risingwave_hummock_trace::{
Expand Down Expand Up @@ -89,7 +89,7 @@ async fn run_replay(args: Args) -> Result<()> {

async fn create_replay_hummock(r: Record, args: &Args) -> Result<impl GlobalReplay> {
let config = load_config(&args.config, NoOverride);
let storage_memory_config = extract_storage_memory_config(&config);
let storage_memory_config = extract_storage_memory_config_default(&config);
let system_params_reader =
SystemParamsReader::from(config.system.clone().into_init_system_params());

Expand Down
1 change: 0 additions & 1 deletion src/storage/src/hummock/test_utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,6 @@ pub fn default_opts_for_test() -> StorageOpts {
block_cache_eviction_config: EvictionConfig::for_test(),
disable_remote_compactor: false,
share_buffer_upload_concurrency: 1,
compactor_memory_limit_mb: 64,
sstable_id_remote_fetch_number: 1,
..Default::default()
}
Expand Down
8 changes: 3 additions & 5 deletions src/storage/src/opts.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,8 @@
// limitations under the License.

use risingwave_common::config::{
extract_storage_memory_config, EvictionConfig, ObjectStoreConfig, RwConfig, StorageMemoryConfig,
extract_storage_memory_config_default, EvictionConfig, ObjectStoreConfig, RwConfig,
StorageMemoryConfig,
};
use risingwave_common::system_param::reader::{SystemParamsRead, SystemParamsReader};
use risingwave_common::system_param::system_params_for_test;
Expand Down Expand Up @@ -70,8 +71,6 @@ pub struct StorageOpts {
pub disable_remote_compactor: bool,
/// Number of tasks shared buffer can upload in parallel.
pub share_buffer_upload_concurrency: usize,
/// Capacity of sstable meta cache.
pub compactor_memory_limit_mb: usize,
/// compactor streaming iterator recreate timeout.
/// deprecated
pub compact_iter_recreate_timeout_ms: u64,
Expand Down Expand Up @@ -151,7 +150,7 @@ impl Default for StorageOpts {
fn default() -> Self {
let c = RwConfig::default();
let p = system_params_for_test();
let s = extract_storage_memory_config(&c);
let s = extract_storage_memory_config_default(&c);
Self::from((&c, &p.into(), &s))
}
}
Expand Down Expand Up @@ -184,7 +183,6 @@ impl From<(&RwConfig, &SystemParamsReader, &StorageMemoryConfig)> for StorageOpt
max_prefetch_block_number: c.storage.max_prefetch_block_number,
disable_remote_compactor: c.storage.disable_remote_compactor,
share_buffer_upload_concurrency: c.storage.share_buffer_upload_concurrency,
compactor_memory_limit_mb: s.compactor_memory_limit_mb,
sstable_id_remote_fetch_number: c.storage.sstable_id_remote_fetch_number,
min_sst_size_for_streaming_upload: c.storage.min_sst_size_for_streaming_upload,
max_concurrent_compaction_task_number: c.storage.max_concurrent_compaction_task_number,
Expand Down
4 changes: 2 additions & 2 deletions src/tests/compaction_test/src/compaction_test_runner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ use clap::Parser;
use foyer::CacheContext;
use risingwave_common::catalog::TableId;
use risingwave_common::config::{
extract_storage_memory_config, load_config, MetaConfig, NoOverride,
extract_storage_memory_config_default, load_config, MetaConfig, NoOverride,
};
use risingwave_common::util::addr::HostAddr;
use risingwave_common::util::iter_util::ZipEqFast;
Expand Down Expand Up @@ -357,7 +357,7 @@ async fn start_replay(
}

// Creates a hummock state store *after* we reset the hummock version
let storage_memory_config = extract_storage_memory_config(&config);
let storage_memory_config = extract_storage_memory_config_default(&config);
let storage_opts = Arc::new(StorageOpts::from((
&config,
&system_params,
Expand Down
Loading