Skip to content

Commit

Permalink
refactor(cache): upgrade foyer to 0.12 and related deps (#18822)
Browse files Browse the repository at this point in the history
Signed-off-by: MrCroxx <[email protected]>
  • Loading branch information
MrCroxx authored Oct 11, 2024
1 parent 442086b commit 7fa1dff
Show file tree
Hide file tree
Showing 17 changed files with 126 additions and 86 deletions.
2 changes: 2 additions & 0 deletions .typos.toml
Original file line number Diff line number Diff line change
Expand Up @@ -37,4 +37,6 @@ extend-exclude = [
# Ideally, we should just ignore that line: https://github.com/crate-ci/typos/issues/316
"src/common/src/cast/mod.rs",
"src/tests/simulation/tests/integration_tests/scale/shared_source.rs",
# We don't want to fix "Divy" here, but may want in other places.
"integration_tests/deltalake-sink/spark-script/run-sql-file.sh"
]
32 changes: 22 additions & 10 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,7 @@ license = "Apache-2.0"
repository = "https://github.com/risingwavelabs/risingwave"

[workspace.dependencies]
foyer = { version = "0.11.5", features = ["mtrace", "nightly"] }
foyer = { version = "0.12.2", features = ["tracing", "nightly"] }
apache-avro = { git = "https://github.com/risingwavelabs/avro", rev = "25113ba88234a9ae23296e981d8302c290fdaa4b", features = [
"snappy",
"zstandard",
Expand Down
10 changes: 5 additions & 5 deletions src/common/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ use std::num::NonZeroUsize;
use anyhow::Context;
use clap::ValueEnum;
use educe::Educe;
use foyer::{Compression, LfuConfig, LruConfig, RecoverMode, RuntimeConfig, S3FifoConfig};
use foyer::{Compression, LfuConfig, LruConfig, RecoverMode, RuntimeOptions, S3FifoConfig};
use risingwave_common_proc_macro::ConfigDoc;
pub use risingwave_common_proc_macro::OverrideConfig;
use risingwave_pb::meta::SystemParams;
Expand Down Expand Up @@ -897,7 +897,7 @@ pub struct FileCacheConfig {
pub recover_mode: RecoverMode,

#[serde(default = "default::file_cache::runtime_config")]
pub runtime_config: RuntimeConfig,
pub runtime_config: RuntimeOptions,

#[serde(default, flatten)]
#[config_doc(omitted)]
Expand Down Expand Up @@ -1717,7 +1717,7 @@ pub mod default {
}

pub mod file_cache {
use foyer::{Compression, RecoverMode, RuntimeConfig, TokioRuntimeConfig};
use foyer::{Compression, RecoverMode, RuntimeOptions, TokioRuntimeOptions};

pub fn dir() -> String {
"".to_string()
Expand Down Expand Up @@ -1763,8 +1763,8 @@ pub mod default {
RecoverMode::None
}

pub fn runtime_config() -> RuntimeConfig {
RuntimeConfig::Unified(TokioRuntimeConfig::default())
pub fn runtime_config() -> RuntimeOptions {
RuntimeOptions::Unified(TokioRuntimeOptions::default())
}
}

Expand Down
38 changes: 25 additions & 13 deletions src/compute/src/rpc/service/monitor_service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ use std::fs;
use std::path::Path;
use std::time::Duration;

use foyer::HybridCache;
use foyer::{HybridCache, TracingOptions};
use itertools::Itertools;
use prometheus::core::Collector;
use risingwave_common::config::{MetricLevel, ServerConfig};
Expand Down Expand Up @@ -378,22 +378,28 @@ impl MonitorService for MonitorServiceImpl {
} else {
cache.disable_tracing();
}
let config = cache.tracing_config();
let mut options = TracingOptions::new();
if let Some(threshold) = req.record_hybrid_insert_threshold_ms {
config.set_record_hybrid_insert_threshold(Duration::from_millis(threshold as _));
options = options
.with_record_hybrid_insert_threshold(Duration::from_millis(threshold as _));
}
if let Some(threshold) = req.record_hybrid_get_threshold_ms {
config.set_record_hybrid_get_threshold(Duration::from_millis(threshold as _));
options =
options.with_record_hybrid_get_threshold(Duration::from_millis(threshold as _));
}
if let Some(threshold) = req.record_hybrid_obtain_threshold_ms {
config.set_record_hybrid_obtain_threshold(Duration::from_millis(threshold as _));
options = options
.with_record_hybrid_obtain_threshold(Duration::from_millis(threshold as _));
}
if let Some(threshold) = req.record_hybrid_remove_threshold_ms {
config.set_record_hybrid_remove_threshold(Duration::from_millis(threshold as _));
options = options
.with_record_hybrid_remove_threshold(Duration::from_millis(threshold as _));
}
if let Some(threshold) = req.record_hybrid_fetch_threshold_ms {
config.set_record_hybrid_fetch_threshold(Duration::from_millis(threshold as _));
options = options
.with_record_hybrid_fetch_threshold(Duration::from_millis(threshold as _));
}
cache.update_tracing_options(options);
}

if let Some(cache) = &self.block_cache {
Expand All @@ -402,22 +408,28 @@ impl MonitorService for MonitorServiceImpl {
} else {
cache.disable_tracing();
}
let config = cache.tracing_config();
let mut options = TracingOptions::new();
if let Some(threshold) = req.record_hybrid_insert_threshold_ms {
config.set_record_hybrid_insert_threshold(Duration::from_millis(threshold as _));
options = options
.with_record_hybrid_insert_threshold(Duration::from_millis(threshold as _));
}
if let Some(threshold) = req.record_hybrid_get_threshold_ms {
config.set_record_hybrid_get_threshold(Duration::from_millis(threshold as _));
options =
options.with_record_hybrid_get_threshold(Duration::from_millis(threshold as _));
}
if let Some(threshold) = req.record_hybrid_obtain_threshold_ms {
config.set_record_hybrid_obtain_threshold(Duration::from_millis(threshold as _));
options = options
.with_record_hybrid_obtain_threshold(Duration::from_millis(threshold as _));
}
if let Some(threshold) = req.record_hybrid_remove_threshold_ms {
config.set_record_hybrid_remove_threshold(Duration::from_millis(threshold as _));
options = options
.with_record_hybrid_remove_threshold(Duration::from_millis(threshold as _));
}
if let Some(threshold) = req.record_hybrid_fetch_threshold_ms {
config.set_record_hybrid_fetch_threshold(Duration::from_millis(threshold as _));
options = options
.with_record_hybrid_fetch_threshold(Duration::from_millis(threshold as _));
}
cache.update_tracing_options(options);
}

Ok(Response::new(TieredCacheTracingResponse::default()))
Expand Down
2 changes: 1 addition & 1 deletion src/connector/src/sink/catalog/desc.rs
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ pub struct SinkDesc {
/// All columns of the sink. Note that this is NOT sorted by columnId in the vector.
pub columns: Vec<ColumnCatalog>,

/// Primiary keys of the sink. Derived by the frontend.
/// Primary keys of the sink. Derived by the frontend.
pub plan_pk: Vec<ColumnOrder>,

/// User-defined primary key indices for upsert sink.
Expand Down
6 changes: 3 additions & 3 deletions src/ctl/src/common/hummock_service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ use std::sync::Arc;
use std::time::Duration;

use anyhow::{anyhow, bail, Result};
use foyer::HybridCacheBuilder;
use foyer::{Engine, HybridCacheBuilder};
use risingwave_common::config::{MetricLevel, ObjectStoreConfig};
use risingwave_object_store::object::build_remote_object_store;
use risingwave_rpc_client::MetaClient;
Expand Down Expand Up @@ -179,13 +179,13 @@ impl HummockServiceOpts {
let meta_cache = HybridCacheBuilder::new()
.memory(opts.meta_cache_capacity_mb * (1 << 20))
.with_shards(opts.meta_cache_shard_num)
.storage()
.storage(Engine::Large)
.build()
.await?;
let block_cache = HybridCacheBuilder::new()
.memory(opts.block_cache_capacity_mb * (1 << 20))
.with_shards(opts.block_cache_shard_num)
.storage()
.storage(Engine::Large)
.build()
.await?;

Expand Down
6 changes: 3 additions & 3 deletions src/java_binding/src/hummock_iterator.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ use std::sync::Arc;

use anyhow::anyhow;
use bytes::Bytes;
use foyer::HybridCacheBuilder;
use foyer::{Engine, HybridCacheBuilder};
use futures::{TryFutureExt, TryStreamExt};
use risingwave_common::catalog::ColumnDesc;
use risingwave_common::config::{MetricLevel, ObjectStoreConfig};
Expand Down Expand Up @@ -83,15 +83,15 @@ pub(crate) async fn new_hummock_java_binding_iter(
let meta_cache = HybridCacheBuilder::new()
.memory(1 << 10)
.with_shards(2)
.storage()
.storage(Engine::Large)
.build()
.map_err(HummockError::foyer_error)
.map_err(StorageError::from)
.await?;
let block_cache = HybridCacheBuilder::new()
.memory(1 << 10)
.with_shards(2)
.storage()
.storage(Engine::Large)
.build()
.map_err(HummockError::foyer_error)
.map_err(StorageError::from)
Expand Down
5 changes: 3 additions & 2 deletions src/storage/benches/bench_block_cache.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ use std::time::{Duration, Instant};
use async_trait::async_trait;
use bytes::{BufMut, Bytes, BytesMut};
use criterion::{criterion_group, criterion_main, Criterion};
use foyer::Engine;
use moka::future::Cache;
use rand::rngs::SmallRng;
use rand::{RngCore, SeedableRng};
Expand Down Expand Up @@ -187,7 +188,7 @@ impl FoyerHybridCache {
high_priority_pool_ratio: 0.8,
})
.with_object_pool_capacity(8 * 1024)
.storage()
.storage(Engine::Large)
.build()
.await
.unwrap();
Expand All @@ -208,7 +209,7 @@ impl FoyerHybridCache {
cmsketch_confidence: 0.9,
})
.with_object_pool_capacity(8 * 1024)
.storage()
.storage(Engine::Large)
.build()
.await
.unwrap();
Expand Down
6 changes: 3 additions & 3 deletions src/storage/benches/bench_compactor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ use std::sync::Arc;

use criterion::async_executor::FuturesExecutor;
use criterion::{criterion_group, criterion_main, Criterion};
use foyer::{CacheContext, HybridCacheBuilder};
use foyer::{CacheContext, Engine, HybridCacheBuilder};
use risingwave_common::catalog::{ColumnDesc, ColumnId, TableId};
use risingwave_common::config::{MetricLevel, ObjectStoreConfig};
use risingwave_common::hash::VirtualNode;
Expand Down Expand Up @@ -63,14 +63,14 @@ pub async fn mock_sstable_store() -> SstableStoreRef {
let meta_cache = HybridCacheBuilder::new()
.memory(64 << 20)
.with_shards(2)
.storage()
.storage(Engine::Large)
.build()
.await
.unwrap();
let block_cache = HybridCacheBuilder::new()
.memory(128 << 20)
.with_shards(2)
.storage()
.storage(Engine::Large)
.build()
.await
.unwrap();
Expand Down
6 changes: 3 additions & 3 deletions src/storage/benches/bench_multi_builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ use std::sync::Arc;
use std::time::Duration;

use criterion::{criterion_group, criterion_main, Criterion};
use foyer::HybridCacheBuilder;
use foyer::{Engine, HybridCacheBuilder};
use rand::random;
use risingwave_common::catalog::TableId;
use risingwave_common::config::{MetricLevel, ObjectStoreConfig};
Expand Down Expand Up @@ -130,14 +130,14 @@ async fn generate_sstable_store(object_store: Arc<ObjectStoreImpl>) -> Arc<Sstab
let meta_cache = HybridCacheBuilder::new()
.memory(64 << 20)
.with_shards(2)
.storage()
.storage(Engine::Large)
.build()
.await
.unwrap();
let block_cache = HybridCacheBuilder::new()
.memory(128 << 20)
.with_shards(2)
.storage()
.storage(Engine::Large)
.build()
.await
.unwrap();
Expand Down
6 changes: 3 additions & 3 deletions src/storage/hummock_test/src/bin/replay/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ use std::path::Path;
use std::sync::Arc;

use clap::Parser;
use foyer::HybridCacheBuilder;
use foyer::{Engine, HybridCacheBuilder};
use replay_impl::{get_replay_notification_client, GlobalReplayImpl};
use risingwave_common::config::{
extract_storage_memory_config, load_config, NoOverride, ObjectStoreConfig,
Expand Down Expand Up @@ -115,14 +115,14 @@ async fn create_replay_hummock(r: Record, args: &Args) -> Result<impl GlobalRepl
let meta_cache = HybridCacheBuilder::new()
.memory(storage_opts.meta_cache_capacity_mb * (1 << 20))
.with_shards(storage_opts.meta_cache_shard_num)
.storage()
.storage(Engine::Large)
.build()
.await
.unwrap();
let block_cache = HybridCacheBuilder::new()
.memory(storage_opts.block_cache_capacity_mb * (1 << 20))
.with_shards(storage_opts.block_cache_shard_num)
.storage()
.storage(Engine::Large)
.build()
.await
.unwrap();
Expand Down
Loading

0 comments on commit 7fa1dff

Please sign in to comment.