From 83784e269664cb5c107247c19e2b05ffcef512c7 Mon Sep 17 00:00:00 2001 From: MrCroxx Date: Fri, 1 Mar 2024 17:50:54 +0800 Subject: [PATCH] feat: replace risingwave lru cache with foyer lru cache for sst cache Signed-off-by: MrCroxx --- Cargo.lock | 71 +++---- Cargo.toml | 7 +- src/batch/Cargo.toml | 1 + src/batch/src/executor/insert.rs | 4 +- src/storage/Cargo.toml | 3 +- src/storage/benches/bench_compactor.rs | 8 +- src/storage/hummock_test/Cargo.toml | 1 + .../benches/bench_hummock_iter.rs | 4 +- .../hummock_test/src/compactor_tests.rs | 14 +- .../hummock_test/src/failpoint_tests.rs | 14 +- .../hummock_test/src/hummock_storage_tests.rs | 86 ++++----- .../hummock_test/src/snapshot_tests.rs | 4 +- .../hummock_test/src/state_store_tests.rs | 72 +++---- .../hummock_test/src/sync_point_tests.rs | 4 +- src/storage/hummock_trace/Cargo.toml | 1 + src/storage/hummock_trace/src/opts.rs | 19 ++ src/storage/src/hummock/block_cache.rs | 141 +++++++------- .../compactor/shared_buffer_compact.rs | 4 +- src/storage/src/hummock/mod.rs | 2 +- .../sstable/backward_sstable_iterator.rs | 4 +- .../sstable/forward_sstable_iterator.rs | 4 +- src/storage/src/hummock/sstable/xor_filter.rs | 6 +- src/storage/src/hummock/sstable_store.rs | 176 ++++++++++-------- src/storage/src/hummock/test_utils.rs | 4 +- src/storage/src/hummock/utils.rs | 8 +- .../src/table/batch_table/storage_table.rs | 8 +- src/stream/Cargo.toml | 1 + .../log_store_impl/kv_log_store/reader.rs | 6 +- src/stream/src/common/table/state_table.rs | 8 +- src/tests/compaction_test/Cargo.toml | 1 + .../src/compaction_test_runner.rs | 4 +- .../src/delete_range_runner.rs | 8 +- src/workspace-hack/Cargo.toml | 3 +- 33 files changed, 374 insertions(+), 327 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index bea91ed3c65b..9ad68a1966ae 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -319,7 +319,7 @@ dependencies = [ "chrono", "chrono-tz", "half 2.3.1", - "hashbrown 0.14.0", + "hashbrown 0.14.3", "num", ] @@ -335,7 +335,7 @@ dependencies = [ "arrow-schema 50.0.0", "chrono", "half 2.3.1", - "hashbrown 0.14.0", + "hashbrown 0.14.3", "num", ] @@ -550,7 +550,7 @@ dependencies = [ "arrow-data 48.0.1", "arrow-schema 48.0.1", "half 2.3.1", - "hashbrown 0.14.0", + "hashbrown 0.14.3", ] [[package]] @@ -565,7 +565,7 @@ dependencies = [ "arrow-data 50.0.0", "arrow-schema 50.0.0", "half 2.3.1", - "hashbrown 0.14.0", + "hashbrown 0.14.3", ] [[package]] @@ -2411,7 +2411,7 @@ dependencies = [ "cranelift-entity", "cranelift-isle", "gimli", - "hashbrown 0.14.0", + "hashbrown 0.14.3", "log", "regalloc2", "smallvec", @@ -2952,7 +2952,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "978747c1d849a7d2ee5e8adc0159961c48fb7e5db2f06af6723b80123bb53856" dependencies = [ "cfg-if", - "hashbrown 0.14.0", + "hashbrown 0.14.3", "lock_api", "once_cell", "parking_lot_core 0.9.8", @@ -2997,7 +2997,7 @@ dependencies = [ "futures", "glob", "half 2.3.1", - "hashbrown 0.14.0", + "hashbrown 0.14.3", "indexmap 2.0.0", "itertools 0.11.0", "log", @@ -3048,7 +3048,7 @@ dependencies = [ "datafusion-common", "datafusion-expr", "futures", - "hashbrown 0.14.0", + "hashbrown 0.14.3", "log", "object_store", "parking_lot 0.12.1", @@ -3084,7 +3084,7 @@ dependencies = [ "datafusion-common", "datafusion-expr", "datafusion-physical-expr", - "hashbrown 0.14.0", + "hashbrown 0.14.3", "itertools 0.11.0", "log", "regex-syntax 0.8.2", @@ -3109,7 +3109,7 @@ dependencies = [ "datafusion-common", "datafusion-expr", "half 2.3.1", - "hashbrown 0.14.0", + "hashbrown 0.14.3", "hex", "indexmap 2.0.0", "itertools 0.11.0", @@ -3144,7 +3144,7 @@ dependencies = [ "datafusion-physical-expr", "futures", "half 2.3.1", - "hashbrown 0.14.0", + "hashbrown 0.14.3", "indexmap 2.0.0", "itertools 0.11.0", "log", @@ -4100,7 +4100,7 @@ dependencies = [ [[package]] name = "foyer" version = "0.5.0" -source = "git+https://github.com/mrcroxx/foyer.git?rev=2968b21#2968b214af7f990e4f7d46d12928e824312a3347" +source = "git+https://github.com/mrcroxx/foyer?rev=cd4319d#cd4319d180894d9914bcc86bb73252cc1c05d039" dependencies = [ "foyer-common", "foyer-intrusive", @@ -4112,7 +4112,7 @@ dependencies = [ [[package]] name = "foyer-common" version = "0.3.0" -source = "git+https://github.com/mrcroxx/foyer.git?rev=2968b21#2968b214af7f990e4f7d46d12928e824312a3347" +source = "git+https://github.com/mrcroxx/foyer?rev=cd4319d#cd4319d180894d9914bcc86bb73252cc1c05d039" dependencies = [ "anyhow", "bytes", @@ -4127,7 +4127,7 @@ dependencies = [ [[package]] name = "foyer-intrusive" version = "0.2.0" -source = "git+https://github.com/mrcroxx/foyer.git?rev=2968b21#2968b214af7f990e4f7d46d12928e824312a3347" +source = "git+https://github.com/mrcroxx/foyer?rev=cd4319d#cd4319d180894d9914bcc86bb73252cc1c05d039" dependencies = [ "bytes", "cmsketch", @@ -4144,14 +4144,15 @@ dependencies = [ [[package]] name = "foyer-memory" version = "0.1.0" -source = "git+https://github.com/mrcroxx/foyer.git?rev=2968b21#2968b214af7f990e4f7d46d12928e824312a3347" +source = "git+https://github.com/mrcroxx/foyer?rev=cd4319d#cd4319d180894d9914bcc86bb73252cc1c05d039" dependencies = [ "ahash 0.8.6", "bitflags 2.4.0", "crossbeam", "foyer-intrusive", "foyer-workspace-hack", - "hashbrown 0.14.0", + "futures", + "hashbrown 0.14.3", "itertools 0.12.0", "libc", "madsim-tokio", @@ -4161,7 +4162,7 @@ dependencies = [ [[package]] name = "foyer-storage" version = "0.4.0" -source = "git+https://github.com/mrcroxx/foyer.git?rev=2968b21#2968b214af7f990e4f7d46d12928e824312a3347" +source = "git+https://github.com/mrcroxx/foyer?rev=cd4319d#cd4319d180894d9914bcc86bb73252cc1c05d039" dependencies = [ "anyhow", "bitflags 2.4.0", @@ -4191,7 +4192,7 @@ dependencies = [ [[package]] name = "foyer-workspace-hack" version = "0.2.0" -source = "git+https://github.com/mrcroxx/foyer.git?rev=2968b21#2968b214af7f990e4f7d46d12928e824312a3347" +source = "git+https://github.com/mrcroxx/foyer?rev=cd4319d#cd4319d180894d9914bcc86bb73252cc1c05d039" dependencies = [ "ahash 0.8.6", "cc", @@ -4203,9 +4204,8 @@ dependencies = [ "futures-executor", "futures-sink", "futures-util", - "hashbrown 0.14.0", + "hashbrown 0.14.3", "libc", - "memchr", "parking_lot 0.12.1", "parking_lot_core 0.9.8", "proc-macro2", @@ -4804,9 +4804,9 @@ dependencies = [ [[package]] name = "hashbrown" -version = "0.14.0" +version = "0.14.3" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "2c6201b9ff9fd90a5a3bac2e56a830d0caa509576f0e503818ee82c181b3437a" +checksum = "290f1a1d9242c78d09ce40a5e87e7554ee637af1351968159f4952f028f75604" dependencies = [ "ahash 0.8.6", "allocator-api2", @@ -4818,7 +4818,7 @@ version = "0.8.4" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "e8094feaf31ff591f651a2664fb9cfd92bba7a60ce3197265e9482ebe753c8f7" dependencies = [ - "hashbrown 0.14.0", + "hashbrown 0.14.3", ] [[package]] @@ -5149,7 +5149,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "d5477fe2230a79769d8dc68e0eabf5437907c0457a5614a9e8dddb67f65eb65d" dependencies = [ "equivalent", - "hashbrown 0.14.0", + "hashbrown 0.14.3", "serde", ] @@ -5758,7 +5758,7 @@ name = "lru" version = "0.7.6" source = "git+https://github.com/risingwavelabs/lru-rs.git?rev=95f347b#95f347b5ca3c947335a51adf40fa86178ef4d60d" dependencies = [ - "hashbrown 0.14.0", + "hashbrown 0.14.3", ] [[package]] @@ -5776,7 +5776,7 @@ version = "0.12.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "1efa59af2ddfad1854ae27d75009d538d0998b4b2fd47083e743ac1a10e46c60" dependencies = [ - "hashbrown 0.14.0", + "hashbrown 0.14.3", ] [[package]] @@ -6610,7 +6610,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "9cf5f9dd3933bd50a9e1f149ec995f39ae2c496d31fd772c1fd45ebc27e902b0" dependencies = [ "crc32fast", - "hashbrown 0.14.0", + "hashbrown 0.14.3", "indexmap 2.0.0", "memchr", ] @@ -6890,7 +6890,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "a4d6a8c22fc714f0c2373e6091bf6f5e9b37b1bc0b1184874b7e0a4e303d318f" dependencies = [ "dlv-list", - "hashbrown 0.14.0", + "hashbrown 0.14.3", ] [[package]] @@ -7083,7 +7083,7 @@ dependencies = [ "chrono", "flate2", "futures", - "hashbrown 0.14.0", + "hashbrown 0.14.3", "lz4_flex", "num", "num-bigint", @@ -7118,7 +7118,7 @@ dependencies = [ "flate2", "futures", "half 2.3.1", - "hashbrown 0.14.0", + "hashbrown 0.14.3", "lz4_flex", "num", "num-bigint", @@ -8576,10 +8576,11 @@ dependencies = [ "async-trait", "criterion", "either", + "foyer", "futures", "futures-async-stream", "futures-util", - "hashbrown 0.14.0", + "hashbrown 0.14.3", "hytra", "icelake", "itertools 0.12.0", @@ -8872,6 +8873,7 @@ dependencies = [ "async-trait", "bytes", "clap", + "foyer", "futures", "madsim-tokio", "prometheus", @@ -9353,6 +9355,7 @@ dependencies = [ "criterion", "expect-test", "fail", + "foyer", "futures", "futures-async-stream", "itertools 0.12.0", @@ -9383,6 +9386,7 @@ dependencies = [ "bincode 2.0.0-rc.3", "byteorder", "bytes", + "foyer", "futures", "futures-async-stream", "itertools 0.12.0", @@ -9892,6 +9896,7 @@ dependencies = [ name = "risingwave_storage" version = "1.7.0-alpha" dependencies = [ + "ahash 0.8.6", "anyhow", "arc-swap", "async-trait", @@ -9974,6 +9979,7 @@ dependencies = [ "enum-as-inner", "expect-test", "fail", + "foyer", "futures", "futures-async-stream", "governor", @@ -13729,6 +13735,7 @@ dependencies = [ "clap_builder", "combine", "crossbeam-epoch", + "crossbeam-queue", "crossbeam-utils", "crypto-bigint 0.5.5", "deranged", @@ -13749,7 +13756,7 @@ dependencies = [ "generic-array", "governor", "hashbrown 0.13.2", - "hashbrown 0.14.0", + "hashbrown 0.14.3", "hmac", "hyper", "indexmap 1.9.3", diff --git a/Cargo.toml b/Cargo.toml index 98844e0f36f4..2fd4d5ed7bf1 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -72,6 +72,7 @@ license = "Apache-2.0" repository = "https://github.com/risingwavelabs/risingwave" [workspace.dependencies] +foyer = { git = "https://github.com/mrcroxx/foyer", rev = "cd4319d" } await-tree = "0.1.1" aws-config = { version = "1", default-features = false, features = [ "behavior-version-latest", @@ -111,11 +112,7 @@ hytra = "0.1" rdkafka = { package = "madsim-rdkafka", version = "0.3.1", features = [ "cmake-build", ] } -hashbrown = { version = "0.14.0", features = [ - "ahash", - "inline-more", - "nightly", -] } +hashbrown = { version = "0.14", features = ["ahash", "inline-more", "nightly"] } criterion = { version = "0.5", features = ["async_futures"] } tonic = { package = "madsim-tonic", version = "0.4.1" } tonic-build = { package = "madsim-tonic-build", version = "0.4.2" } diff --git a/src/batch/Cargo.toml b/src/batch/Cargo.toml index c1fcc23ca348..57fcc5034ed5 100644 --- a/src/batch/Cargo.toml +++ b/src/batch/Cargo.toml @@ -21,6 +21,7 @@ assert_matches = "1" async-recursion = "1" async-trait = "0.1" either = "1" +foyer = { workspace = true } futures = { version = "0.3", default-features = false, features = ["alloc"] } futures-async-stream = { workspace = true } futures-util = "0.3" diff --git a/src/batch/src/executor/insert.rs b/src/batch/src/executor/insert.rs index 951aec3573e9..048f4e59e27e 100644 --- a/src/batch/src/executor/insert.rs +++ b/src/batch/src/executor/insert.rs @@ -267,10 +267,10 @@ mod tests { use std::sync::Arc; use assert_matches::assert_matches; + use foyer::memory::eviction::lru::LruContext; use futures::StreamExt; use itertools::Itertools; use risingwave_common::array::{Array, ArrayImpl, I32Array, StructArray}; - use risingwave_common::cache::CachePriority; use risingwave_common::catalog::{ schema_test_utils, ColumnDesc, ColumnId, INITIAL_TABLE_VERSION_ID, }; @@ -400,7 +400,7 @@ mod tests { epoch, None, ReadOptions { - cache_policy: CachePolicy::Fill(CachePriority::High), + cache_policy: CachePolicy::Fill(LruContext::HighPriority), ..Default::default() }, ) diff --git a/src/storage/Cargo.toml b/src/storage/Cargo.toml index c6bb45e8aa33..4f116f9f4372 100644 --- a/src/storage/Cargo.toml +++ b/src/storage/Cargo.toml @@ -14,6 +14,7 @@ ignored = ["workspace-hack"] normal = ["workspace-hack"] [dependencies] +ahash = "0.8" anyhow = "1" arc-swap = "1" async-trait = "0.1" @@ -26,7 +27,7 @@ dyn-clone = "1.0.14" either = "1" enum-as-inner = "0.6" fail = "0.5" -foyer = { git = "https://github.com/mrcroxx/foyer.git", rev = "2968b21" } +foyer = { workspace = true } futures = { version = "0.3", default-features = false, features = ["alloc"] } futures-async-stream = { workspace = true } hex = "0.4" diff --git a/src/storage/benches/bench_compactor.rs b/src/storage/benches/bench_compactor.rs index decaffdb63e4..04deee6b1959 100644 --- a/src/storage/benches/bench_compactor.rs +++ b/src/storage/benches/bench_compactor.rs @@ -17,7 +17,7 @@ use std::sync::Arc; use criterion::async_executor::FuturesExecutor; use criterion::{criterion_group, criterion_main, Criterion}; -use risingwave_common::cache::CachePriority; +use foyer::memory::eviction::lru::LruContext; use risingwave_common::catalog::TableId; use risingwave_common::config::MetricLevel; use risingwave_common::hash::VirtualNode; @@ -71,7 +71,7 @@ pub fn default_writer_opts() -> SstableWriterOptions { SstableWriterOptions { capacity_hint: None, tracker: None, - policy: CachePolicy::Fill(CachePriority::High), + policy: CachePolicy::Fill(LruContext::HighPriority), } } @@ -108,7 +108,7 @@ async fn build_table( SstableWriterOptions { capacity_hint: None, tracker: None, - policy: CachePolicy::Fill(CachePriority::High), + policy: CachePolicy::Fill(LruContext::HighPriority), }, ); let mut builder = @@ -231,7 +231,7 @@ fn bench_merge_iterator_compactor(c: &mut Criterion) { .block_on(async { build_table(sstable_store.clone(), 4, 0..test_key_size, 2).await }); let level2 = vec![info1, info2]; let read_options = Arc::new(SstableIteratorReadOptions { - cache_policy: CachePolicy::Fill(CachePriority::High), + cache_policy: CachePolicy::Fill(LruContext::HighPriority), prefetch_for_large_query: false, must_iterated_end_user_key: None, max_preload_retry_times: 0, diff --git a/src/storage/hummock_test/Cargo.toml b/src/storage/hummock_test/Cargo.toml index 04bda1fdb625..76506aa462f6 100644 --- a/src/storage/hummock_test/Cargo.toml +++ b/src/storage/hummock_test/Cargo.toml @@ -19,6 +19,7 @@ async-trait = "0.1" bytes = { version = "1" } clap = { version = "4", features = ["derive"] } fail = "0.5" +foyer = { workspace = true } futures = { version = "0.3", default-features = false, features = ["alloc"] } futures-async-stream = "0.2.9" itertools = "0.12" diff --git a/src/storage/hummock_test/benches/bench_hummock_iter.rs b/src/storage/hummock_test/benches/bench_hummock_iter.rs index 82804ab09633..5dd2ecfc4e76 100644 --- a/src/storage/hummock_test/benches/bench_hummock_iter.rs +++ b/src/storage/hummock_test/benches/bench_hummock_iter.rs @@ -17,8 +17,8 @@ use std::sync::Arc; use bytes::Bytes; use criterion::{criterion_group, criterion_main, Criterion}; +use foyer::memory::eviction::lru::LruContext; use futures::{pin_mut, TryStreamExt}; -use risingwave_common::cache::CachePriority; use risingwave_hummock_sdk::key::TableKey; use risingwave_hummock_sdk::HummockEpoch; use risingwave_hummock_test::get_notification_client_for_test; @@ -109,7 +109,7 @@ fn criterion_benchmark(c: &mut Criterion) { ReadOptions { ignore_range_tombstone: true, prefetch_options: PrefetchOptions::default(), - cache_policy: CachePolicy::Fill(CachePriority::High), + cache_policy: CachePolicy::Fill(LruContext::HighPriority), ..Default::default() }, )) diff --git a/src/storage/hummock_test/src/compactor_tests.rs b/src/storage/hummock_test/src/compactor_tests.rs index fc6debe8183c..e453edf9d508 100644 --- a/src/storage/hummock_test/src/compactor_tests.rs +++ b/src/storage/hummock_test/src/compactor_tests.rs @@ -21,9 +21,9 @@ pub(crate) mod tests { use std::sync::Arc; use bytes::{BufMut, Bytes, BytesMut}; + use foyer::memory::eviction::lru::LruContext; use itertools::Itertools; use rand::{Rng, RngCore, SeedableRng}; - use risingwave_common::cache::CachePriority; use risingwave_common::catalog::TableId; use risingwave_common::constants::hummock::CompactionFilterFlag; use risingwave_common::util::epoch::Epoch; @@ -364,7 +364,7 @@ pub(crate) mod tests { TableKey(key.clone()), read_epoch, ReadOptions { - cache_policy: CachePolicy::Fill(CachePriority::High), + cache_policy: CachePolicy::Fill(LruContext::HighPriority), ..Default::default() }, ) @@ -378,7 +378,7 @@ pub(crate) mod tests { ((TEST_WATERMARK - 1) * 1000) << 16, ReadOptions { prefix_hint: Some(key.clone()), - cache_policy: CachePolicy::Fill(CachePriority::High), + cache_policy: CachePolicy::Fill(LruContext::HighPriority), ..Default::default() }, ) @@ -501,7 +501,7 @@ pub(crate) mod tests { TableKey(key.clone()), SST_COUNT + 1, ReadOptions { - cache_policy: CachePolicy::Fill(CachePriority::High), + cache_policy: CachePolicy::Fill(LruContext::HighPriority), ..Default::default() }, ) @@ -846,7 +846,7 @@ pub(crate) mod tests { ReadOptions { table_id: TableId::from(existing_table_ids), prefetch_options: PrefetchOptions::default(), - cache_policy: CachePolicy::Fill(CachePriority::High), + cache_policy: CachePolicy::Fill(LruContext::HighPriority), ..Default::default() }, ) @@ -1041,7 +1041,7 @@ pub(crate) mod tests { ReadOptions { table_id: TableId::from(existing_table_id), prefetch_options: PrefetchOptions::default(), - cache_policy: CachePolicy::Fill(CachePriority::High), + cache_policy: CachePolicy::Fill(LruContext::HighPriority), ..Default::default() }, ) @@ -1241,7 +1241,7 @@ pub(crate) mod tests { prefix_hint: Some(Bytes::from(bloom_filter_key)), table_id: TableId::from(existing_table_id), prefetch_options: PrefetchOptions::default(), - cache_policy: CachePolicy::Fill(CachePriority::High), + cache_policy: CachePolicy::Fill(LruContext::HighPriority), ..Default::default() }, ) diff --git a/src/storage/hummock_test/src/failpoint_tests.rs b/src/storage/hummock_test/src/failpoint_tests.rs index 2b2f3da3f15a..c81957d263d4 100644 --- a/src/storage/hummock_test/src/failpoint_tests.rs +++ b/src/storage/hummock_test/src/failpoint_tests.rs @@ -16,7 +16,7 @@ use std::ops::Bound; use std::sync::Arc; use bytes::{BufMut, Bytes}; -use risingwave_common::cache::CachePriority; +use foyer::memory::eviction::lru::LruContext; use risingwave_common::catalog::TableId; use risingwave_common::hash::VirtualNode; use risingwave_hummock_sdk::key::TABLE_PREFIX_LEN; @@ -112,7 +112,7 @@ async fn test_failpoints_state_store_read_upload() { 1, ReadOptions { prefix_hint: Some(Bytes::from(anchor_prefix_hint)), - cache_policy: CachePolicy::Fill(CachePriority::High), + cache_policy: CachePolicy::Fill(LruContext::HighPriority), ..Default::default() }, ) @@ -165,7 +165,7 @@ async fn test_failpoints_state_store_read_upload() { 2, ReadOptions { prefix_hint: Some(Bytes::from(anchor_prefix_hint)), - cache_policy: CachePolicy::Fill(CachePriority::High), + cache_policy: CachePolicy::Fill(LruContext::HighPriority), ..Default::default() }, ) @@ -180,7 +180,7 @@ async fn test_failpoints_state_store_read_upload() { 2, ReadOptions { table_id: Default::default(), - cache_policy: CachePolicy::Fill(CachePriority::High), + cache_policy: CachePolicy::Fill(LruContext::HighPriority), ..Default::default() }, ) @@ -199,7 +199,7 @@ async fn test_failpoints_state_store_read_upload() { 2, ReadOptions { prefix_hint: Some(Bytes::from(bee_prefix_hint)), - cache_policy: CachePolicy::Fill(CachePriority::High), + cache_policy: CachePolicy::Fill(LruContext::HighPriority), ..Default::default() }, ) @@ -237,7 +237,7 @@ async fn test_failpoints_state_store_read_upload() { 5, ReadOptions { prefix_hint: Some(Bytes::from(anchor_prefix_hint)), - cache_policy: CachePolicy::Fill(CachePriority::High), + cache_policy: CachePolicy::Fill(LruContext::HighPriority), ..Default::default() }, ) @@ -254,7 +254,7 @@ async fn test_failpoints_state_store_read_upload() { 5, ReadOptions { prefetch_options: PrefetchOptions::default(), - cache_policy: CachePolicy::Fill(CachePriority::High), + cache_policy: CachePolicy::Fill(LruContext::HighPriority), ..Default::default() }, ) diff --git a/src/storage/hummock_test/src/hummock_storage_tests.rs b/src/storage/hummock_test/src/hummock_storage_tests.rs index b60481f69921..08a7fd70ed87 100644 --- a/src/storage/hummock_test/src/hummock_storage_tests.rs +++ b/src/storage/hummock_test/src/hummock_storage_tests.rs @@ -16,11 +16,11 @@ use std::ops::Range; use std::sync::Arc; use bytes::{BufMut, Bytes}; +use foyer::memory::eviction::lru::LruContext; use futures::TryStreamExt; use itertools::Itertools; use parking_lot::RwLock; use risingwave_common::buffer::BitmapBuilder; -use risingwave_common::cache::CachePriority; use risingwave_common::catalog::TableId; use risingwave_common::hash::VirtualNode; use risingwave_common::range::RangeBoundsExt; @@ -122,7 +122,7 @@ async fn test_storage_basic() { ReadOptions { table_id: TEST_TABLE_ID, - cache_policy: CachePolicy::Fill(CachePriority::High), + cache_policy: CachePolicy::Fill(LruContext::HighPriority), ..Default::default() }, ) @@ -137,7 +137,7 @@ async fn test_storage_basic() { epoch1, ReadOptions { table_id: TEST_TABLE_ID, - cache_policy: CachePolicy::Fill(CachePriority::High), + cache_policy: CachePolicy::Fill(LruContext::HighPriority), ..Default::default() }, ) @@ -154,7 +154,7 @@ async fn test_storage_basic() { epoch1, ReadOptions { table_id: TEST_TABLE_ID, - cache_policy: CachePolicy::Fill(CachePriority::High), + cache_policy: CachePolicy::Fill(LruContext::HighPriority), ..Default::default() }, ) @@ -184,7 +184,7 @@ async fn test_storage_basic() { ReadOptions { table_id: TEST_TABLE_ID, - cache_policy: CachePolicy::Fill(CachePriority::High), + cache_policy: CachePolicy::Fill(LruContext::HighPriority), ..Default::default() }, ) @@ -215,7 +215,7 @@ async fn test_storage_basic() { epoch3, ReadOptions { table_id: TEST_TABLE_ID, - cache_policy: CachePolicy::Fill(CachePriority::High), + cache_policy: CachePolicy::Fill(LruContext::HighPriority), ..Default::default() }, ) @@ -232,7 +232,7 @@ async fn test_storage_basic() { ReadOptions { table_id: TEST_TABLE_ID, - cache_policy: CachePolicy::Fill(CachePriority::High), + cache_policy: CachePolicy::Fill(LruContext::HighPriority), ..Default::default() }, ) @@ -252,7 +252,7 @@ async fn test_storage_basic() { ReadOptions { table_id: TEST_TABLE_ID, - cache_policy: CachePolicy::Fill(CachePriority::High), + cache_policy: CachePolicy::Fill(LruContext::HighPriority), ..Default::default() }, ) @@ -292,7 +292,7 @@ async fn test_storage_basic() { ReadOptions { table_id: TEST_TABLE_ID, - cache_policy: CachePolicy::Fill(CachePriority::High), + cache_policy: CachePolicy::Fill(LruContext::HighPriority), ..Default::default() }, ) @@ -310,7 +310,7 @@ async fn test_storage_basic() { ReadOptions { table_id: TEST_TABLE_ID, - cache_policy: CachePolicy::Fill(CachePriority::High), + cache_policy: CachePolicy::Fill(LruContext::HighPriority), ..Default::default() }, ) @@ -329,7 +329,7 @@ async fn test_storage_basic() { epoch2, ReadOptions { table_id: TEST_TABLE_ID, - cache_policy: CachePolicy::Fill(CachePriority::High), + cache_policy: CachePolicy::Fill(LruContext::HighPriority), ..Default::default() }, ) @@ -382,7 +382,7 @@ async fn test_storage_basic() { epoch3, ReadOptions { table_id: TEST_TABLE_ID, - cache_policy: CachePolicy::Fill(CachePriority::High), + cache_policy: CachePolicy::Fill(LruContext::HighPriority), ..Default::default() }, ) @@ -555,7 +555,7 @@ async fn test_state_store_sync() { epoch1, ReadOptions { table_id: TEST_TABLE_ID, - cache_policy: CachePolicy::Fill(CachePriority::High), + cache_policy: CachePolicy::Fill(LruContext::HighPriority), ..Default::default() }, ) @@ -597,7 +597,7 @@ async fn test_state_store_sync() { epoch2, ReadOptions { table_id: TEST_TABLE_ID, - cache_policy: CachePolicy::Fill(CachePriority::High), + cache_policy: CachePolicy::Fill(LruContext::HighPriority), ..Default::default() }, ) @@ -620,7 +620,7 @@ async fn test_state_store_sync() { epoch1, ReadOptions { table_id: TEST_TABLE_ID, - cache_policy: CachePolicy::Fill(CachePriority::High), + cache_policy: CachePolicy::Fill(LruContext::HighPriority), ..Default::default() }, ) @@ -658,7 +658,7 @@ async fn test_state_store_sync() { epoch2, ReadOptions { table_id: TEST_TABLE_ID, - cache_policy: CachePolicy::Fill(CachePriority::High), + cache_policy: CachePolicy::Fill(LruContext::HighPriority), ..Default::default() }, ) @@ -764,7 +764,7 @@ async fn test_delete_get() { epoch2, ReadOptions { prefix_hint: None, - cache_policy: CachePolicy::Fill(CachePriority::High), + cache_policy: CachePolicy::Fill(LruContext::HighPriority), ..Default::default() } ) @@ -861,7 +861,7 @@ async fn test_multiple_epoch_sync() { epoch1, ReadOptions { table_id: TEST_TABLE_ID, - cache_policy: CachePolicy::Fill(CachePriority::High), + cache_policy: CachePolicy::Fill(LruContext::HighPriority), ..Default::default() }, ) @@ -877,7 +877,7 @@ async fn test_multiple_epoch_sync() { ReadOptions { table_id: TEST_TABLE_ID, - cache_policy: CachePolicy::Fill(CachePriority::High), + cache_policy: CachePolicy::Fill(LruContext::HighPriority), ..Default::default() }, ) @@ -891,7 +891,7 @@ async fn test_multiple_epoch_sync() { epoch3, ReadOptions { table_id: TEST_TABLE_ID, - cache_policy: CachePolicy::Fill(CachePriority::High), + cache_policy: CachePolicy::Fill(LruContext::HighPriority), ..Default::default() }, ) @@ -989,7 +989,7 @@ async fn test_iter_with_min_epoch() { ReadOptions { table_id: TEST_TABLE_ID, prefetch_options: PrefetchOptions::default(), - cache_policy: CachePolicy::Fill(CachePriority::High), + cache_policy: CachePolicy::Fill(LruContext::HighPriority), ..Default::default() }, ) @@ -1011,7 +1011,7 @@ async fn test_iter_with_min_epoch() { ReadOptions { table_id: TEST_TABLE_ID, prefetch_options: PrefetchOptions::default(), - cache_policy: CachePolicy::Fill(CachePriority::High), + cache_policy: CachePolicy::Fill(LruContext::HighPriority), ..Default::default() }, ) @@ -1032,7 +1032,7 @@ async fn test_iter_with_min_epoch() { table_id: TEST_TABLE_ID, retention_seconds: Some(0), prefetch_options: PrefetchOptions::default(), - cache_policy: CachePolicy::Fill(CachePriority::High), + cache_policy: CachePolicy::Fill(LruContext::HighPriority), ..Default::default() }, ) @@ -1072,7 +1072,7 @@ async fn test_iter_with_min_epoch() { ReadOptions { table_id: TEST_TABLE_ID, prefetch_options: PrefetchOptions::default(), - cache_policy: CachePolicy::Fill(CachePriority::High), + cache_policy: CachePolicy::Fill(LruContext::HighPriority), ..Default::default() }, ) @@ -1094,7 +1094,7 @@ async fn test_iter_with_min_epoch() { ReadOptions { table_id: TEST_TABLE_ID, prefetch_options: PrefetchOptions::default(), - cache_policy: CachePolicy::Fill(CachePriority::High), + cache_policy: CachePolicy::Fill(LruContext::HighPriority), ..Default::default() }, ) @@ -1117,7 +1117,7 @@ async fn test_iter_with_min_epoch() { table_id: TEST_TABLE_ID, retention_seconds: Some(0), prefetch_options: PrefetchOptions::default(), - cache_policy: CachePolicy::Fill(CachePriority::High), + cache_policy: CachePolicy::Fill(LruContext::HighPriority), ..Default::default() }, ) @@ -1222,7 +1222,7 @@ async fn test_hummock_version_reader() { ReadOptions { table_id: TEST_TABLE_ID, prefetch_options: PrefetchOptions::default(), - cache_policy: CachePolicy::Fill(CachePriority::High), + cache_policy: CachePolicy::Fill(LruContext::HighPriority), ..Default::default() }, read_snapshot, @@ -1250,7 +1250,7 @@ async fn test_hummock_version_reader() { ReadOptions { table_id: TEST_TABLE_ID, prefetch_options: PrefetchOptions::default(), - cache_policy: CachePolicy::Fill(CachePriority::High), + cache_policy: CachePolicy::Fill(LruContext::HighPriority), ..Default::default() }, read_snapshot, @@ -1279,7 +1279,7 @@ async fn test_hummock_version_reader() { table_id: TEST_TABLE_ID, retention_seconds: Some(0), prefetch_options: PrefetchOptions::default(), - cache_policy: CachePolicy::Fill(CachePriority::High), + cache_policy: CachePolicy::Fill(LruContext::HighPriority), ..Default::default() }, read_snapshot, @@ -1344,7 +1344,7 @@ async fn test_hummock_version_reader() { ReadOptions { table_id: TEST_TABLE_ID, prefetch_options: PrefetchOptions::default(), - cache_policy: CachePolicy::Fill(CachePriority::High), + cache_policy: CachePolicy::Fill(LruContext::HighPriority), ..Default::default() }, read_snapshot, @@ -1381,7 +1381,7 @@ async fn test_hummock_version_reader() { ReadOptions { table_id: TEST_TABLE_ID, prefetch_options: PrefetchOptions::default(), - cache_policy: CachePolicy::Fill(CachePriority::High), + cache_policy: CachePolicy::Fill(LruContext::HighPriority), ..Default::default() }, read_snapshot, @@ -1414,7 +1414,7 @@ async fn test_hummock_version_reader() { table_id: TEST_TABLE_ID, retention_seconds: Some(0), prefetch_options: PrefetchOptions::default(), - cache_policy: CachePolicy::Fill(CachePriority::High), + cache_policy: CachePolicy::Fill(LruContext::HighPriority), ..Default::default() }, read_snapshot, @@ -1446,7 +1446,7 @@ async fn test_hummock_version_reader() { ReadOptions { table_id: TEST_TABLE_ID, prefetch_options: PrefetchOptions::default(), - cache_policy: CachePolicy::Fill(CachePriority::High), + cache_policy: CachePolicy::Fill(LruContext::HighPriority), ..Default::default() }, read_snapshot, @@ -1484,7 +1484,7 @@ async fn test_hummock_version_reader() { ReadOptions { table_id: TEST_TABLE_ID, prefetch_options: PrefetchOptions::default(), - cache_policy: CachePolicy::Fill(CachePriority::High), + cache_policy: CachePolicy::Fill(LruContext::HighPriority), ..Default::default() }, read_snapshot, @@ -1516,7 +1516,7 @@ async fn test_hummock_version_reader() { ReadOptions { table_id: TEST_TABLE_ID, prefetch_options: PrefetchOptions::default(), - cache_policy: CachePolicy::Fill(CachePriority::High), + cache_policy: CachePolicy::Fill(LruContext::HighPriority), ..Default::default() }, read_snapshot, @@ -1603,7 +1603,7 @@ async fn test_get_with_min_epoch() { ReadOptions { table_id: TEST_TABLE_ID, prefetch_options: Default::default(), - cache_policy: CachePolicy::Fill(CachePriority::High), + cache_policy: CachePolicy::Fill(LruContext::HighPriority), ..Default::default() }, ) @@ -1621,7 +1621,7 @@ async fn test_get_with_min_epoch() { ReadOptions { table_id: TEST_TABLE_ID, prefix_hint: Some(Bytes::from(prefix_hint.clone())), - cache_policy: CachePolicy::Fill(CachePriority::High), + cache_policy: CachePolicy::Fill(LruContext::HighPriority), ..Default::default() }, ) @@ -1639,7 +1639,7 @@ async fn test_get_with_min_epoch() { ReadOptions { table_id: TEST_TABLE_ID, prefix_hint: Some(Bytes::from(prefix_hint.clone())), - cache_policy: CachePolicy::Fill(CachePriority::High), + cache_policy: CachePolicy::Fill(LruContext::HighPriority), ..Default::default() }, ) @@ -1659,7 +1659,7 @@ async fn test_get_with_min_epoch() { retention_seconds: Some(0), prefix_hint: Some(Bytes::from(prefix_hint.clone())), prefetch_options: Default::default(), - cache_policy: CachePolicy::Fill(CachePriority::High), + cache_policy: CachePolicy::Fill(LruContext::HighPriority), ..Default::default() }, ) @@ -1701,7 +1701,7 @@ async fn test_get_with_min_epoch() { epoch1, ReadOptions { table_id: TEST_TABLE_ID, - cache_policy: CachePolicy::Fill(CachePriority::High), + cache_policy: CachePolicy::Fill(LruContext::HighPriority), ..Default::default() }, ) @@ -1720,7 +1720,7 @@ async fn test_get_with_min_epoch() { table_id: TEST_TABLE_ID, prefix_hint: Some(Bytes::from(prefix_hint.clone())), - cache_policy: CachePolicy::Fill(CachePriority::High), + cache_policy: CachePolicy::Fill(LruContext::HighPriority), ..Default::default() }, ) @@ -1740,7 +1740,7 @@ async fn test_get_with_min_epoch() { ReadOptions { table_id: TEST_TABLE_ID, prefix_hint: Some(Bytes::from(prefix_hint.clone())), - cache_policy: CachePolicy::Fill(CachePriority::High), + cache_policy: CachePolicy::Fill(LruContext::HighPriority), ..Default::default() }, ) @@ -1761,7 +1761,7 @@ async fn test_get_with_min_epoch() { retention_seconds: Some(0), prefix_hint: Some(Bytes::from(prefix_hint.clone())), - cache_policy: CachePolicy::Fill(CachePriority::High), + cache_policy: CachePolicy::Fill(LruContext::HighPriority), ..Default::default() }, ) diff --git a/src/storage/hummock_test/src/snapshot_tests.rs b/src/storage/hummock_test/src/snapshot_tests.rs index 2a6dd31dbfee..7bd551339f79 100644 --- a/src/storage/hummock_test/src/snapshot_tests.rs +++ b/src/storage/hummock_test/src/snapshot_tests.rs @@ -15,8 +15,8 @@ use std::ops::Bound; use std::sync::Arc; use bytes::Bytes; +use foyer::memory::eviction::lru::LruContext; use futures::TryStreamExt; -use risingwave_common::cache::CachePriority; use risingwave_common::hash::VirtualNode; use risingwave_hummock_sdk::key::TableKey; use risingwave_hummock_sdk::HummockReadEpoch; @@ -48,7 +48,7 @@ macro_rules! assert_count_range_scan { $epoch, ReadOptions { prefetch_options: PrefetchOptions::prefetch_for_large_range_scan(), - cache_policy: CachePolicy::Fill(CachePriority::High), + cache_policy: CachePolicy::Fill(LruContext::HighPriority), ..Default::default() }, ) diff --git a/src/storage/hummock_test/src/state_store_tests.rs b/src/storage/hummock_test/src/state_store_tests.rs index e52983dc787a..8068abf7e9af 100644 --- a/src/storage/hummock_test/src/state_store_tests.rs +++ b/src/storage/hummock_test/src/state_store_tests.rs @@ -18,8 +18,8 @@ use std::sync::Arc; use bytes::Bytes; use expect_test::expect; +use foyer::memory::eviction::lru::LruContext; use futures::{pin_mut, StreamExt, TryStreamExt}; -use risingwave_common::cache::CachePriority; use risingwave_common::catalog::{TableId, TableOption}; use risingwave_common::hash::VirtualNode; use risingwave_hummock_sdk::{ @@ -49,7 +49,7 @@ async fn test_empty_read_v2() { u64::MAX, ReadOptions { table_id: TableId { table_id: 2333 }, - cache_policy: CachePolicy::Fill(CachePriority::High), + cache_policy: CachePolicy::Fill(LruContext::HighPriority), ..Default::default() }, ) @@ -62,7 +62,7 @@ async fn test_empty_read_v2() { u64::MAX, ReadOptions { table_id: TableId { table_id: 2333 }, - cache_policy: CachePolicy::Fill(CachePriority::High), + cache_policy: CachePolicy::Fill(LruContext::HighPriority), ..Default::default() }, ) @@ -165,7 +165,7 @@ async fn test_basic_inner( anchor.clone(), epoch1, ReadOptions { - cache_policy: CachePolicy::Fill(CachePriority::High), + cache_policy: CachePolicy::Fill(LruContext::HighPriority), ..Default::default() }, ) @@ -178,7 +178,7 @@ async fn test_basic_inner( gen_key_from_str(VirtualNode::ZERO, "bb"), epoch1, ReadOptions { - cache_policy: CachePolicy::Fill(CachePriority::High), + cache_policy: CachePolicy::Fill(LruContext::HighPriority), ..Default::default() }, ) @@ -193,7 +193,7 @@ async fn test_basic_inner( gen_key_from_str(VirtualNode::ZERO, "ab"), epoch1, ReadOptions { - cache_policy: CachePolicy::Fill(CachePriority::High), + cache_policy: CachePolicy::Fill(LruContext::HighPriority), ..Default::default() }, ) @@ -222,7 +222,7 @@ async fn test_basic_inner( anchor.clone(), epoch2, ReadOptions { - cache_policy: CachePolicy::Fill(CachePriority::High), + cache_policy: CachePolicy::Fill(LruContext::HighPriority), ..Default::default() }, ) @@ -252,7 +252,7 @@ async fn test_basic_inner( anchor.clone(), epoch3, ReadOptions { - cache_policy: CachePolicy::Fill(CachePriority::High), + cache_policy: CachePolicy::Fill(LruContext::HighPriority), ..Default::default() }, ) @@ -266,7 +266,7 @@ async fn test_basic_inner( gen_key_from_str(VirtualNode::ZERO, "ff"), epoch3, ReadOptions { - cache_policy: CachePolicy::Fill(CachePriority::High), + cache_policy: CachePolicy::Fill(LruContext::HighPriority), ..Default::default() }, ) @@ -283,7 +283,7 @@ async fn test_basic_inner( ), epoch1, ReadOptions { - cache_policy: CachePolicy::Fill(CachePriority::High), + cache_policy: CachePolicy::Fill(LruContext::HighPriority), ..Default::default() }, ) @@ -298,7 +298,7 @@ async fn test_basic_inner( anchor.clone(), epoch1, ReadOptions { - cache_policy: CachePolicy::Fill(CachePriority::High), + cache_policy: CachePolicy::Fill(LruContext::HighPriority), ..Default::default() }, ) @@ -313,7 +313,7 @@ async fn test_basic_inner( anchor.clone(), epoch2, ReadOptions { - cache_policy: CachePolicy::Fill(CachePriority::High), + cache_policy: CachePolicy::Fill(LruContext::HighPriority), ..Default::default() }, ) @@ -331,7 +331,7 @@ async fn test_basic_inner( epoch2, ReadOptions { prefetch_options: PrefetchOptions::default(), - cache_policy: CachePolicy::Fill(CachePriority::High), + cache_policy: CachePolicy::Fill(LruContext::HighPriority), ..Default::default() }, ) @@ -350,7 +350,7 @@ async fn test_basic_inner( epoch3, ReadOptions { prefetch_options: PrefetchOptions::default(), - cache_policy: CachePolicy::Fill(CachePriority::High), + cache_policy: CachePolicy::Fill(LruContext::HighPriority), ..Default::default() }, ) @@ -373,7 +373,7 @@ async fn test_basic_inner( gen_key_from_str(VirtualNode::ZERO, "bb"), epoch2, ReadOptions { - cache_policy: CachePolicy::Fill(CachePriority::High), + cache_policy: CachePolicy::Fill(LruContext::HighPriority), ..Default::default() }, ) @@ -386,7 +386,7 @@ async fn test_basic_inner( gen_key_from_str(VirtualNode::ZERO, "dd"), epoch2, ReadOptions { - cache_policy: CachePolicy::Fill(CachePriority::High), + cache_policy: CachePolicy::Fill(LruContext::HighPriority), ..Default::default() }, ) @@ -583,7 +583,7 @@ async fn test_reload_storage() { anchor.clone(), epoch1, ReadOptions { - cache_policy: CachePolicy::Fill(CachePriority::High), + cache_policy: CachePolicy::Fill(LruContext::HighPriority), ..Default::default() }, ) @@ -598,7 +598,7 @@ async fn test_reload_storage() { gen_key_from_str(VirtualNode::ZERO, "ab"), epoch1, ReadOptions { - cache_policy: CachePolicy::Fill(CachePriority::High), + cache_policy: CachePolicy::Fill(LruContext::HighPriority), ..Default::default() }, ) @@ -626,7 +626,7 @@ async fn test_reload_storage() { anchor.clone(), epoch2, ReadOptions { - cache_policy: CachePolicy::Fill(CachePriority::High), + cache_policy: CachePolicy::Fill(LruContext::HighPriority), ..Default::default() }, ) @@ -645,7 +645,7 @@ async fn test_reload_storage() { epoch1, ReadOptions { prefetch_options: PrefetchOptions::default(), - cache_policy: CachePolicy::Fill(CachePriority::High), + cache_policy: CachePolicy::Fill(LruContext::HighPriority), ..Default::default() }, ) @@ -660,7 +660,7 @@ async fn test_reload_storage() { anchor.clone(), epoch1, ReadOptions { - cache_policy: CachePolicy::Fill(CachePriority::High), + cache_policy: CachePolicy::Fill(LruContext::HighPriority), ..Default::default() }, ) @@ -675,7 +675,7 @@ async fn test_reload_storage() { anchor.clone(), epoch2, ReadOptions { - cache_policy: CachePolicy::Fill(CachePriority::High), + cache_policy: CachePolicy::Fill(LruContext::HighPriority), ..Default::default() }, ) @@ -693,7 +693,7 @@ async fn test_reload_storage() { epoch2, ReadOptions { prefetch_options: PrefetchOptions::default(), - cache_policy: CachePolicy::Fill(CachePriority::High), + cache_policy: CachePolicy::Fill(LruContext::HighPriority), ..Default::default() }, ) @@ -729,7 +729,7 @@ async fn test_reload_storage() { // gen_key_from_str(VirtualNode::ZERO, "aa"), // epoch, // ReadOptions { -// cache_policy: CachePolicy::Fill(CachePriority::High), +// cache_policy: CachePolicy::Fill(LruContext::HighPriority), // ..Default::default() // } // ) @@ -744,7 +744,7 @@ async fn test_reload_storage() { // gen_key_from_str(VirtualNode::ZERO, "bb"), // epoch, // ReadOptions { -// cache_policy: CachePolicy::Fill(CachePriority::High), +// cache_policy: CachePolicy::Fill(LruContext::HighPriority), // ..Default::default() // } // ) @@ -759,7 +759,7 @@ async fn test_reload_storage() { // gen_key_from_str(VirtualNode::ZERO, "cc"), // epoch, // ReadOptions { -// cache_policy: CachePolicy::Fill(CachePriority::High), +// cache_policy: CachePolicy::Fill(LruContext::HighPriority), // ..Default::default() // } // ) @@ -776,7 +776,7 @@ async fn test_reload_storage() { // ), // epoch, // ReadOptions { -// cache_policy: CachePolicy::Fill(CachePriority::High), +// cache_policy: CachePolicy::Fill(LruContext::HighPriority), // ..Default::default() // }, // ) @@ -861,7 +861,7 @@ async fn test_reload_storage() { // gen_key_from_str(VirtualNode::ZERO, "aa"), // epoch, // ReadOptions { -// cache_policy: CachePolicy::Fill(CachePriority::High), +// cache_policy: CachePolicy::Fill(LruContext::HighPriority), // ..Default::default() // } // ) @@ -875,7 +875,7 @@ async fn test_reload_storage() { // gen_key_from_str(VirtualNode::ZERO, "bb"), // epoch, // ReadOptions { -// cache_policy: CachePolicy::Fill(CachePriority::High), +// cache_policy: CachePolicy::Fill(LruContext::HighPriority), // ..Default::default() // } // ) @@ -889,7 +889,7 @@ async fn test_reload_storage() { // gen_key_from_str(VirtualNode::ZERO, "cc"), // epoch, // ReadOptions { -// cache_policy: CachePolicy::Fill(CachePriority::High), +// cache_policy: CachePolicy::Fill(LruContext::HighPriority), // ..Default::default() // } // ) @@ -905,7 +905,7 @@ async fn test_reload_storage() { // ), // epoch, // ReadOptions { -// cache_policy: CachePolicy::Fill(CachePriority::High), +// cache_policy: CachePolicy::Fill(LruContext::HighPriority), // ..Default::default() // }, // ) @@ -1076,7 +1076,7 @@ async fn test_delete_get_inner( gen_key_from_str(VirtualNode::ZERO, "bb"), epoch2, ReadOptions { - cache_policy: CachePolicy::Fill(CachePriority::High), + cache_policy: CachePolicy::Fill(LruContext::HighPriority), ..Default::default() } ) @@ -1170,7 +1170,7 @@ async fn test_multiple_epoch_sync_inner( gen_key_from_str(VirtualNode::ZERO, "bb"), epoch1, ReadOptions { - cache_policy: CachePolicy::Fill(CachePriority::High), + cache_policy: CachePolicy::Fill(LruContext::HighPriority), ..Default::default() } ) @@ -1184,7 +1184,7 @@ async fn test_multiple_epoch_sync_inner( gen_key_from_str(VirtualNode::ZERO, "bb"), epoch2, ReadOptions { - cache_policy: CachePolicy::Fill(CachePriority::High), + cache_policy: CachePolicy::Fill(LruContext::HighPriority), ..Default::default() } ) @@ -1197,7 +1197,7 @@ async fn test_multiple_epoch_sync_inner( gen_key_from_str(VirtualNode::ZERO, "bb"), epoch3, ReadOptions { - cache_policy: CachePolicy::Fill(CachePriority::High), + cache_policy: CachePolicy::Fill(LruContext::HighPriority), ..Default::default() } ) @@ -1353,7 +1353,7 @@ async fn test_replicated_local_hummock_storage() { table_id: TableId { table_id: TEST_TABLE_ID.table_id, }, - cache_policy: CachePolicy::Fill(CachePriority::High), + cache_policy: CachePolicy::Fill(LruContext::HighPriority), ..Default::default() }; diff --git a/src/storage/hummock_test/src/sync_point_tests.rs b/src/storage/hummock_test/src/sync_point_tests.rs index b70df9184e05..ee96ddff8c02 100644 --- a/src/storage/hummock_test/src/sync_point_tests.rs +++ b/src/storage/hummock_test/src/sync_point_tests.rs @@ -17,7 +17,7 @@ use std::sync::Arc; use std::time::Duration; use bytes::Bytes; -use risingwave_common::cache::CachePriority; +use foyer::memory::eviction::lru::LruContext; use risingwave_common::catalog::hummock::CompactionFilterFlag; use risingwave_common::catalog::TableId; use risingwave_common::hash::VirtualNode; @@ -430,7 +430,7 @@ async fn test_syncpoints_get_in_delete_range_boundary() { storage.wait_version(version).await; let read_options = ReadOptions { table_id: TableId::from(existing_table_id), - cache_policy: CachePolicy::Fill(CachePriority::High), + cache_policy: CachePolicy::Fill(LruContext::HighPriority), ..Default::default() }; let get_result = storage diff --git a/src/storage/hummock_trace/Cargo.toml b/src/storage/hummock_trace/Cargo.toml index 7bc7de61fefc..6246be4243e9 100644 --- a/src/storage/hummock_trace/Cargo.toml +++ b/src/storage/hummock_trace/Cargo.toml @@ -23,6 +23,7 @@ risingwave_pb = { workspace = true } thiserror = "1.0" tokio = { version = "0.2", package = "madsim-tokio" } tracing = "0.1" +foyer ={ workspace = true } [dev-dependencies] itertools = "0.12" diff --git a/src/storage/hummock_trace/src/opts.rs b/src/storage/hummock_trace/src/opts.rs index 241a8e26769d..aceab98cf9e9 100644 --- a/src/storage/hummock_trace/src/opts.rs +++ b/src/storage/hummock_trace/src/opts.rs @@ -13,6 +13,7 @@ // limitations under the License. use bincode::{Decode, Encode}; +use foyer::memory::eviction::lru::LruContext; use risingwave_common::cache::CachePriority; use risingwave_common::catalog::{TableId, TableOption}; use risingwave_common::util::epoch::EpochPair; @@ -58,6 +59,24 @@ impl From for CachePriority { } } +impl From for TracedCachePriority { + fn from(value: LruContext) -> Self { + match value { + LruContext::HighPriority => Self::High, + LruContext::LowPriority => Self::Low, + } + } +} + +impl From for LruContext { + fn from(value: TracedCachePriority) -> Self { + match value { + TracedCachePriority::High => Self::HighPriority, + TracedCachePriority::Low => Self::LowPriority, + } + } +} + #[derive(Copy, Encode, Decode, PartialEq, Eq, Debug, Clone, Hash)] pub struct TracedTableId { pub table_id: u32, diff --git a/src/storage/src/hummock/block_cache.rs b/src/storage/src/hummock/block_cache.rs index e71714e8f8bb..8931cad47940 100644 --- a/src/storage/src/hummock/block_cache.rs +++ b/src/storage/src/hummock/block_cache.rs @@ -12,16 +12,14 @@ // See the License for the specific language governing permissions and // limitations under the License. -use std::collections::hash_map::DefaultHasher; -use std::hash::{Hash, Hasher}; use std::ops::Deref; use std::sync::Arc; +use ahash::RandomState; use await_tree::InstrumentAwait; +use foyer::memory::cache::{LruCache, LruCacheConfig, LruCacheEntry}; +use foyer::memory::eviction::lru::{LruConfig, LruContext}; use futures::Future; -use risingwave_common::cache::{ - CachePriority, CacheableEntry, LookupResponse, LruCache, LruCacheEventListener, -}; use risingwave_hummock_sdk::HummockSstableObjectId; use tokio::sync::oneshot::Receiver; use tokio::task::JoinHandle; @@ -29,9 +27,7 @@ use tokio::task::JoinHandle; use super::{Block, HummockResult}; use crate::hummock::HummockError; -const MIN_BUFFER_SIZE_PER_SHARD: usize = 256 * 1024 * 1024; - -type CachedBlockEntry = CacheableEntry<(HummockSstableObjectId, u64), Box>; +type CachedBlockEntry = LruCacheEntry<(HummockSstableObjectId, u64), Box>; enum BlockEntry { Cache(CachedBlockEntry), @@ -81,8 +77,8 @@ impl Deref for BlockHolder { unsafe impl Send for BlockHolder {} unsafe impl Sync for BlockHolder {} -type BlockCacheEventListener = - Arc>>; +// type BlockCacheEventListener = +// Arc>>; #[derive(Clone)] pub struct BlockCache { @@ -115,45 +111,56 @@ impl BlockResponse { impl BlockCache { pub fn new(capacity: usize, max_shard_bits: usize, high_priority_ratio: usize) -> Self { - Self::new_inner(capacity, max_shard_bits, high_priority_ratio, None) + // Self::new_inner(capacity, max_shard_bits, high_priority_ratio, None) + Self::new_inner(capacity, max_shard_bits, high_priority_ratio) } - pub fn with_event_listener( - capacity: usize, - max_shard_bits: usize, - high_priority_ratio: usize, - listener: BlockCacheEventListener, - ) -> Self { - Self::new_inner( - capacity, - max_shard_bits, - high_priority_ratio, - Some(listener), - ) - } + // pub fn with_event_listener( + // capacity: usize, + // max_shard_bits: usize, + // high_priority_ratio: usize, + // listener: BlockCacheEventListener, + // ) -> Self { + // Self::new_inner( + // capacity, + // max_shard_bits, + // high_priority_ratio, + // Some(listener), + // ) + // } fn new_inner( capacity: usize, - mut max_shard_bits: usize, + max_shard_bits: usize, high_priority_ratio: usize, - listener: Option, + // listener: Option, ) -> Self { if capacity == 0 { panic!("block cache capacity == 0"); } - while (capacity >> max_shard_bits) < MIN_BUFFER_SIZE_PER_SHARD && max_shard_bits > 0 { - max_shard_bits -= 1; - } - - let cache = match listener { - Some(listener) => LruCache::with_event_listener( - max_shard_bits, - capacity, - high_priority_ratio, - listener, - ), - None => LruCache::new(max_shard_bits, capacity, high_priority_ratio), - }; + // while (capacity >> max_shard_bits) < MIN_BUFFER_SIZE_PER_SHARD && max_shard_bits > 0 { + // max_shard_bits -= 1; + // } + + // let cache = match listener { + // Some(listener) => LruCache::with_event_listener( + // max_shard_bits, + // capacity, + // high_priority_ratio, + // listener, + // ), + // None => LruCache::new(max_shard_bits, capacity, high_priority_ratio), + // }; + + let cache = LruCache::new(LruCacheConfig { + capacity, + shards: 1 << max_shard_bits, + eviction_config: LruConfig { + high_priority_pool_ratio: high_priority_ratio as f64 / 100.0, + }, + object_pool_capacity: (1 << max_shard_bits) * 1024, + hash_builder: RandomState::default(), + }); Self { inner: Arc::new(cache), @@ -162,13 +169,13 @@ impl BlockCache { pub fn get(&self, object_id: HummockSstableObjectId, block_idx: u64) -> Option { self.inner - .lookup(Self::hash(object_id, block_idx), &(object_id, block_idx)) + .get(&(object_id, block_idx)) .map(BlockHolder::from_cached_block) } pub fn exists_block(&self, sst_id: HummockSstableObjectId, block_idx: u64) -> bool { - self.inner - .contains(Self::hash(sst_id, block_idx), &(sst_id, block_idx)) + // TODO(MrCroxx): optimize me + self.get(sst_id, block_idx).is_some() } pub fn insert( @@ -176,13 +183,13 @@ impl BlockCache { object_id: HummockSstableObjectId, block_idx: u64, block: Box, - priority: CachePriority, + priority: LruContext, ) -> BlockHolder { - BlockHolder::from_cached_block(self.inner.insert( + let charge = block.capacity(); + BlockHolder::from_cached_block(self.inner.insert_with_context( (object_id, block_idx), - Self::hash(object_id, block_idx), - block.capacity(), block, + charge, priority, )) } @@ -191,46 +198,38 @@ impl BlockCache { &self, object_id: HummockSstableObjectId, block_idx: u64, - priority: CachePriority, + priority: LruContext, mut fetch_block: F, ) -> BlockResponse where F: FnMut() -> Fut, Fut: Future>> + Send + 'static, { - let h = Self::hash(object_id, block_idx); let key = (object_id, block_idx); - let lookup_response = - self.inner - .lookup_with_request_dedup::<_, HummockError, _>(h, key, priority, || { - let f = fetch_block(); - async move { - let block = f.await?; - let len = block.capacity(); - Ok((block, len)) - } - }); - match lookup_response { - LookupResponse::Invalid => unreachable!(), - LookupResponse::Cached(entry) => { + + let entry = self.inner.entry(key, || { + let f = fetch_block(); + async move { + let block = f.await?; + let len = block.capacity(); + Ok::<_, HummockError>((block, len, Some(priority))) + } + }); + + match entry { + foyer::memory::cache::Entry::Hit(entry) => { BlockResponse::Block(BlockHolder::from_cached_block(entry)) } - LookupResponse::WaitPendingRequest(receiver) => { + foyer::memory::cache::Entry::Wait(receiver) => { BlockResponse::WaitPendingRequest(receiver) } - LookupResponse::Miss(join_handle) => BlockResponse::Miss(join_handle), + foyer::memory::cache::Entry::Miss(join_handle) => BlockResponse::Miss(join_handle), + foyer::memory::cache::Entry::Invalid => unreachable!(), } } - fn hash(object_id: HummockSstableObjectId, block_idx: u64) -> u64 { - let mut hasher = DefaultHasher::default(); - object_id.hash(&mut hasher); - block_idx.hash(&mut hasher); - hasher.finish() - } - pub fn size(&self) -> usize { - self.inner.get_memory_usage() + self.inner.usage() } #[cfg(any(test, feature = "test"))] diff --git a/src/storage/src/hummock/compactor/shared_buffer_compact.rs b/src/storage/src/hummock/compactor/shared_buffer_compact.rs index a06a1eaca676..2c3db3eb1488 100644 --- a/src/storage/src/hummock/compactor/shared_buffer_compact.rs +++ b/src/storage/src/hummock/compactor/shared_buffer_compact.rs @@ -17,10 +17,10 @@ use std::ops::Bound; use std::sync::Arc; use bytes::Bytes; +use foyer::memory::eviction::lru::LruContext; use futures::future::try_join_all; use futures::{stream, StreamExt, TryFutureExt}; use itertools::Itertools; -use risingwave_common::cache::CachePriority; use risingwave_common::catalog::TableId; use risingwave_common::hash::VirtualNode; use risingwave_hummock_sdk::compaction_group::StaticCompactionGroupId; @@ -500,7 +500,7 @@ impl SharedBufferCompactRunner { options, super::TaskConfig { key_range, - cache_policy: CachePolicy::Fill(CachePriority::High), + cache_policy: CachePolicy::Fill(LruContext::HighPriority), gc_delete_keys: GC_DELETE_KEYS_FOR_FLUSH, watermark: GC_WATERMARK_FOR_FLUSH, stats_target_table_ids: None, diff --git a/src/storage/src/hummock/mod.rs b/src/storage/src/hummock/mod.rs index 9efb22675f07..5cf2d831d325 100644 --- a/src/storage/src/hummock/mod.rs +++ b/src/storage/src/hummock/mod.rs @@ -22,7 +22,7 @@ use risingwave_hummock_sdk::key::{FullKey, TableKey, UserKeyRangeRef}; use risingwave_hummock_sdk::{HummockEpoch, *}; use risingwave_pb::hummock::SstableInfo; -mod block_cache; +pub mod block_cache; pub use block_cache::*; pub mod file_cache; diff --git a/src/storage/src/hummock/sstable/backward_sstable_iterator.rs b/src/storage/src/hummock/sstable/backward_sstable_iterator.rs index 35445b1138bf..23481d53a0f3 100644 --- a/src/storage/src/hummock/sstable/backward_sstable_iterator.rs +++ b/src/storage/src/hummock/sstable/backward_sstable_iterator.rs @@ -15,7 +15,7 @@ use std::cmp::Ordering::{Equal, Less}; use std::sync::Arc; -use risingwave_common::cache::CachePriority; +use foyer::memory::eviction::lru::LruContext; use risingwave_hummock_sdk::key::FullKey; use crate::hummock::iterator::{Backward, HummockIterator}; @@ -67,7 +67,7 @@ impl BackwardSstableIterator { .get( &self.sst, idx as usize, - crate::hummock::CachePolicy::Fill(CachePriority::High), + crate::hummock::CachePolicy::Fill(LruContext::HighPriority), &mut self.stats, ) .await?; diff --git a/src/storage/src/hummock/sstable/forward_sstable_iterator.rs b/src/storage/src/hummock/sstable/forward_sstable_iterator.rs index fbb3c8785dc5..bc59c7e9737b 100644 --- a/src/storage/src/hummock/sstable/forward_sstable_iterator.rs +++ b/src/storage/src/hummock/sstable/forward_sstable_iterator.rs @@ -309,9 +309,9 @@ mod tests { use std::collections::Bound; use bytes::Bytes; + use foyer::memory::eviction::lru::LruContext; use itertools::Itertools; use rand::prelude::*; - use risingwave_common::cache::CachePriority; use risingwave_common::catalog::TableId; use risingwave_common::hash::VirtualNode; use risingwave_hummock_sdk::key::{TableKey, UserKey}; @@ -473,7 +473,7 @@ mod tests { TableKey(Bytes::from(end_key.user_key.table_key.0)), ); let options = Arc::new(SstableIteratorReadOptions { - cache_policy: CachePolicy::Fill(CachePriority::High), + cache_policy: CachePolicy::Fill(LruContext::HighPriority), must_iterated_end_user_key: Some(Bound::Included(uk.clone())), max_preload_retry_times: 0, prefetch_for_large_query: false, diff --git a/src/storage/src/hummock/sstable/xor_filter.rs b/src/storage/src/hummock/sstable/xor_filter.rs index 3ac7a19058ca..d7b32d563b79 100644 --- a/src/storage/src/hummock/sstable/xor_filter.rs +++ b/src/storage/src/hummock/sstable/xor_filter.rs @@ -442,8 +442,8 @@ impl Clone for XorFilterReader { #[cfg(test)] mod tests { + use foyer::memory::eviction::lru::LruContext; use rand::RngCore; - use risingwave_common::cache::CachePriority; use risingwave_hummock_sdk::EpochWithGap; use super::*; @@ -461,7 +461,7 @@ mod tests { let writer_opts = SstableWriterOptions { capacity_hint: None, tracker: None, - policy: CachePolicy::Fill(CachePriority::High), + policy: CachePolicy::Fill(LruContext::HighPriority), }; let opts = SstableBuilderOptions { capacity: 0, @@ -509,7 +509,7 @@ mod tests { .get_block_response( &sstable, idx, - CachePolicy::Fill(CachePriority::High), + CachePolicy::Fill(LruContext::HighPriority), &mut stat, ) .await diff --git a/src/storage/src/hummock/sstable_store.rs b/src/storage/src/hummock/sstable_store.rs index c603b7d8f503..1305a92feafe 100644 --- a/src/storage/src/hummock/sstable_store.rs +++ b/src/storage/src/hummock/sstable_store.rs @@ -18,14 +18,16 @@ use std::ops::Deref; use std::sync::atomic::{AtomicUsize, Ordering}; use std::sync::Arc; +use ahash::RandomState; use await_tree::InstrumentAwait; use bytes::Bytes; use fail::fail_point; +use foyer::memory::cache::{LruCache, LruCacheConfig, LruCacheEntry}; +use foyer::memory::eviction::lru::{LruConfig, LruContext}; +use foyer::memory::{Key, Value}; use futures::{future, StreamExt}; use itertools::Itertools; -use risingwave_common::cache::{ - CachePriority, LookupResponse, LruCacheEventListener, LruKey, LruValue, -}; +use risingwave_common::cache::LruCacheEventListener; use risingwave_common::config::StorageMemoryConfig; use risingwave_hummock_sdk::{HummockSstableObjectId, OBJECT_SUFFIX}; use risingwave_hummock_trace::TracedCachePolicy; @@ -48,16 +50,14 @@ use crate::hummock::block_stream::{ }; use crate::hummock::file_cache::preclude::*; use crate::hummock::multi_builder::UploadJoinHandle; -use crate::hummock::{ - BlockHolder, CacheableEntry, HummockError, HummockResult, LruCache, MemoryLimiter, -}; +use crate::hummock::{BlockHolder, HummockError, HummockResult, MemoryLimiter}; use crate::monitor::{HummockStateStoreMetrics, MemoryCollector, StoreLocalStatistic}; const MAX_META_CACHE_SHARD_BITS: usize = 2; const MAX_CACHE_SHARD_BITS: usize = 6; // It means that there will be 64 shards lru-cache to avoid lock conflict. const MIN_BUFFER_SIZE_PER_SHARD: usize = 256 * 1024 * 1024; // 256MB -pub type TableHolder = CacheableEntry>; +pub type TableHolder = LruCacheEntry>; // TODO: Define policy based on use cases (read / compaction / ...). #[derive(Clone, Copy, Eq, PartialEq)] @@ -65,7 +65,7 @@ pub enum CachePolicy { /// Disable read cache and not fill the cache afterwards. Disable, /// Try reading the cache and fill the cache afterwards. - Fill(CachePriority), + Fill(LruContext), /// Fill file cache only. FillFileCache, /// Read the cache but not fill the cache afterwards. @@ -74,7 +74,7 @@ pub enum CachePolicy { impl Default for CachePolicy { fn default() -> Self { - CachePolicy::Fill(CachePriority::High) + CachePolicy::Fill(LruContext::HighPriority) } } @@ -142,17 +142,17 @@ impl LruCacheEventListener for MetaCacheEventListener { pub enum CachedOrShared where - K: LruKey, - V: LruValue, + K: Key, + V: Value, { - Cached(CacheableEntry>), + Cached(LruCacheEntry>), Shared(Arc), } impl Deref for CachedOrShared where - K: LruKey, - V: LruValue, + K: Key, + V: Value, { type Target = V; @@ -182,6 +182,7 @@ pub struct SstableStore { path: String, store: ObjectStoreRef, block_cache: BlockCache, + // TODO(MrCroxx): use no hash random state meta_cache: Arc>>, data_file_cache: FileCache, @@ -205,26 +206,41 @@ impl SstableStore { { shard_bits -= 1; } - let block_cache_listener = Arc::new(BlockCacheEventListener { - data_file_cache: config.data_file_cache.clone(), - metrics: config.state_store_metrics, - }); - let meta_cache_listener = Arc::new(MetaCacheEventListener(config.meta_file_cache.clone())); + // let block_cache_listener = Arc::new(BlockCacheEventListener { + // data_file_cache: config.data_file_cache.clone(), + // metrics: config.state_store_metrics, + // }); + // let meta_cache_listener = Arc::new(MetaCacheEventListener(config.meta_file_cache.clone())); Self { path: config.path, store: config.store, - block_cache: BlockCache::with_event_listener( + // block_cache: BlockCache::with_event_listener( + // config.block_cache_capacity, + // MAX_CACHE_SHARD_BITS, + // config.high_priority_ratio, + // block_cache_listener, + // ), + // meta_cache: Arc::new(LruCache::with_event_listener( + // shard_bits, + // config.meta_cache_capacity, + // 0, + // meta_cache_listener, + // )), + block_cache: BlockCache::new( config.block_cache_capacity, MAX_CACHE_SHARD_BITS, config.high_priority_ratio, - block_cache_listener, ), - meta_cache: Arc::new(LruCache::with_event_listener( - shard_bits, - config.meta_cache_capacity, - 0, - meta_cache_listener, - )), + // meta_cache: Arc::new(LruCache::new(shard_bits, config.meta_cache_capacity, 0)), + meta_cache: Arc::new(LruCache::new(LruCacheConfig { + capacity: config.meta_cache_capacity, + shards: 1 << shard_bits, + eviction_config: LruConfig { + high_priority_pool_ratio: 0.0, + }, + object_pool_capacity: (1 << shard_bits) * 1024, + hash_builder: RandomState::default(), + })), data_file_cache: config.data_file_cache, meta_file_cache: config.meta_file_cache, @@ -244,7 +260,15 @@ impl SstableStore { block_cache_capacity: usize, meta_cache_capacity: usize, ) -> Self { - let meta_cache = Arc::new(LruCache::new(0, meta_cache_capacity, 0)); + let meta_cache = Arc::new(LruCache::new(LruCacheConfig { + capacity: meta_cache_capacity, + shards: 1, + eviction_config: LruConfig { + high_priority_pool_ratio: 0.0, + }, + object_pool_capacity: 1024, + hash_builder: RandomState::default(), + })); Self { path, store, @@ -264,7 +288,7 @@ impl SstableStore { self.store .delete(self.get_sst_data_path(object_id).as_str()) .await?; - self.meta_cache.erase(object_id, &object_id); + self.meta_cache.remove(&object_id); self.meta_file_cache .remove(&object_id) .map_err(HummockError::file_cache)?; @@ -286,7 +310,7 @@ impl SstableStore { // Delete from cache. for &object_id in object_id_list { - self.meta_cache.erase(object_id, &object_id); + self.meta_cache.remove(&object_id); self.meta_file_cache .remove(&object_id) .map_err(HummockError::file_cache)?; @@ -296,7 +320,7 @@ impl SstableStore { } pub fn delete_cache(&self, object_id: HummockSstableObjectId) { - self.meta_cache.erase(object_id, &object_id); + self.meta_cache.remove(&object_id); if let Err(e) = self.meta_file_cache.remove(&object_id) { tracing::warn!(error = %e.as_report(), "meta file cache remove error"); } @@ -408,7 +432,7 @@ impl SstableStore { let cache_priority = if idx == block_index { priority } else { - CachePriority::Low + LruContext::LowPriority }; self.block_cache .insert(object_id, idx as u64, Box::new(block), cache_priority) @@ -589,7 +613,7 @@ impl SstableStore { &self, sst_obj_id: HummockSstableObjectId, ) -> HummockResult>> { - if let Some(sst) = self.meta_cache.lookup(sst_obj_id, &sst_obj_id) { + if let Some(sst) = self.meta_cache.get(&sst_obj_id) { return Ok(Some(CachedOrShared::Cached(sst))); } @@ -612,50 +636,45 @@ impl SstableStore { stats: &mut StoreLocalStatistic, ) -> impl Future> + Send + 'static { let object_id = sst.get_object_id(); - let lookup_response = self - .meta_cache - .lookup_with_request_dedup::<_, HummockError, _>( - object_id, - object_id, - CachePriority::High, - || { - let meta_file_cache = self.meta_file_cache.clone(); - let store = self.store.clone(); - let meta_path = self.get_sst_data_path(object_id); - let stats_ptr = stats.remote_io_time.clone(); - let range = sst.meta_offset as usize..sst.file_size as usize; - async move { - if let Some(sst) = meta_file_cache - .lookup(&object_id) - .await - .map_err(HummockError::file_cache)? - { - // TODO(MrCroxx): Make meta cache receives Arc to reduce copy? - let sst: Box = sst.into(); - let charge = sst.estimate_size(); - return Ok((sst, charge)); - } - - let now = Instant::now(); - let buf = store.read(&meta_path, range).await?; - let meta = SstableMeta::decode(&buf[..])?; - - let sst = Sstable::new(object_id, meta); - let charge = sst.estimate_size(); - let add = (now.elapsed().as_secs_f64() * 1000.0).ceil(); - stats_ptr.fetch_add(add as u64, Ordering::Relaxed); - Ok((Box::new(sst), charge)) - } - }, - ); - match &lookup_response { - LookupResponse::Miss(_) | LookupResponse::WaitPendingRequest(_) => { + let entry = self.meta_cache.entry(object_id, || { + let meta_file_cache = self.meta_file_cache.clone(); + let store = self.store.clone(); + let meta_path = self.get_sst_data_path(object_id); + let stats_ptr = stats.remote_io_time.clone(); + let range = sst.meta_offset as usize..sst.file_size as usize; + async move { + if let Some(sst) = meta_file_cache + .lookup(&object_id) + .await + .map_err(HummockError::file_cache)? + { + // TODO(MrCroxx): Make meta cache receives Arc to reduce copy? + let sst: Box = sst.into(); + let charge = sst.estimate_size(); + return Ok((sst, charge, Some(LruContext::HighPriority))); + } + + let now = Instant::now(); + let buf = store.read(&meta_path, range).await?; + let meta = SstableMeta::decode(&buf[..])?; + + let sst = Sstable::new(object_id, meta); + let charge = sst.estimate_size(); + let add = (now.elapsed().as_secs_f64() * 1000.0).ceil(); + stats_ptr.fetch_add(add as u64, Ordering::Relaxed); + Ok((Box::new(sst), charge, Some(LruContext::HighPriority))) + } + }); + + match &entry { + foyer::memory::cache::Entry::Miss(_) | foyer::memory::cache::Entry::Wait(_) => { stats.cache_meta_block_miss += 1; } - _ => (), + _ => {} } + stats.cache_meta_block_total += 1; - lookup_response.verbose_instrument_await("sstable") + entry.verbose_instrument_await("sstable") } pub async fn list_object_metadata_from_object_store( @@ -680,12 +699,11 @@ impl SstableStore { pub fn insert_meta_cache(&self, object_id: HummockSstableObjectId, meta: SstableMeta) { let sst = Sstable::new(object_id, meta); let charge = sst.estimate_size(); - self.meta_cache.insert( + self.meta_cache.insert_with_context( object_id, - object_id, - charge, Box::new(sst), - CachePriority::High, + charge, + LruContext::HighPriority, ); } @@ -699,11 +717,11 @@ impl SstableStore { filter.extend([(object_id, usize::MAX), (object_id, block_index as usize)]); } self.block_cache - .insert(object_id, block_index, block, CachePriority::High); + .insert(object_id, block_index, block, LruContext::HighPriority); } pub fn get_meta_memory_usage(&self) -> u64 { - self.meta_cache.get_memory_usage() as u64 + self.meta_cache.usage() as u64 } pub async fn get_stream_for_blocks( diff --git a/src/storage/src/hummock/test_utils.rs b/src/storage/src/hummock/test_utils.rs index d6c2ce901178..2cd009da8d10 100644 --- a/src/storage/src/hummock/test_utils.rs +++ b/src/storage/src/hummock/test_utils.rs @@ -17,9 +17,9 @@ use std::ops::Bound; use std::sync::Arc; use bytes::Bytes; +use foyer::memory::eviction::lru::LruContext; use futures::{Stream, TryStreamExt}; use itertools::Itertools; -use risingwave_common::cache::CachePriority; use risingwave_common::catalog::TableId; use risingwave_common::hash::VirtualNode; use risingwave_hummock_sdk::key::{FullKey, PointRange, TableKey, UserKey}; @@ -322,7 +322,7 @@ pub async fn gen_test_sstable_with_range_tombstone( kv_iter, range_tombstones, sstable_store.clone(), - CachePolicy::Fill(CachePriority::High), + CachePolicy::Fill(LruContext::HighPriority), ) .await } diff --git a/src/storage/src/hummock/utils.rs b/src/storage/src/hummock/utils.rs index 09c4a7eef0e4..4668826d286b 100644 --- a/src/storage/src/hummock/utils.rs +++ b/src/storage/src/hummock/utils.rs @@ -21,7 +21,7 @@ use std::sync::Arc; use std::time::Duration; use bytes::Bytes; -use risingwave_common::cache::CachePriority; +use foyer::memory::eviction::lru::LruContext; use risingwave_common::catalog::{TableId, TableOption}; use risingwave_hummock_sdk::key::{ bound_table_key_range, EmptySliceRef, FullKey, TableKey, UserKey, @@ -387,7 +387,7 @@ pub(crate) async fn do_insert_sanity_check( let read_options = ReadOptions { retention_seconds: table_option.retention_seconds, table_id, - cache_policy: CachePolicy::Fill(CachePriority::High), + cache_policy: CachePolicy::Fill(LruContext::HighPriority), ..Default::default() }; let stored_value = inner.get(key.clone(), epoch, read_options).await?; @@ -419,7 +419,7 @@ pub(crate) async fn do_delete_sanity_check( let read_options = ReadOptions { retention_seconds: table_option.retention_seconds, table_id, - cache_policy: CachePolicy::Fill(CachePriority::High), + cache_policy: CachePolicy::Fill(LruContext::HighPriority), ..Default::default() }; match inner.get(key.clone(), epoch, read_options).await? { @@ -461,7 +461,7 @@ pub(crate) async fn do_update_sanity_check( let read_options = ReadOptions { retention_seconds: table_option.retention_seconds, table_id, - cache_policy: CachePolicy::Fill(CachePriority::High), + cache_policy: CachePolicy::Fill(LruContext::HighPriority), ..Default::default() }; diff --git a/src/storage/src/table/batch_table/storage_table.rs b/src/storage/src/table/batch_table/storage_table.rs index 4caa43725172..1ec83cfedf98 100644 --- a/src/storage/src/table/batch_table/storage_table.rs +++ b/src/storage/src/table/batch_table/storage_table.rs @@ -20,12 +20,12 @@ use std::sync::Arc; use auto_enums::auto_enum; use await_tree::InstrumentAwait; use bytes::Bytes; +use foyer::memory::eviction::lru::LruContext; use futures::future::try_join_all; use futures::{Stream, StreamExt}; use futures_async_stream::try_stream; use itertools::{Either, Itertools}; use risingwave_common::buffer::Bitmap; -use risingwave_common::cache::CachePriority; use risingwave_common::catalog::{ColumnDesc, ColumnId, Schema, TableId, TableOption}; use risingwave_common::hash::{VirtualNode, VnodeBitmapExt}; use risingwave_common::row::{self, OwnedRow, Row, RowExt}; @@ -366,7 +366,7 @@ impl StorageTableInner { retention_seconds: self.table_option.retention_seconds, table_id: self.table_id, read_version_from_backup: read_backup, - cache_policy: CachePolicy::Fill(CachePriority::High), + cache_policy: CachePolicy::Fill(LruContext::HighPriority), ..Default::default() }; if let Some(value) = self.store.get(serialized_pk, epoch, read_options).await? { @@ -453,8 +453,8 @@ impl StorageTableInner { ) { // To prevent unbounded range scan queries from polluting the block cache, use the // low priority fill policy. - (Unbounded, _) | (_, Unbounded) => CachePolicy::Fill(CachePriority::Low), - _ => CachePolicy::Fill(CachePriority::High), + (Unbounded, _) | (_, Unbounded) => CachePolicy::Fill(LruContext::LowPriority), + _ => CachePolicy::Fill(LruContext::HighPriority), }; let table_key_ranges = { diff --git a/src/stream/Cargo.toml b/src/stream/Cargo.toml index 389b88aed043..4533f89ae89e 100644 --- a/src/stream/Cargo.toml +++ b/src/stream/Cargo.toml @@ -28,6 +28,7 @@ educe = "0.5" either = "1" enum-as-inner = "0.6" fail = "0.5" +foyer = { workspace = true } futures = { version = "0.3", default-features = false, features = ["alloc"] } futures-async-stream = { workspace = true } governor = { version = "0.6", default-features = false, features = [ diff --git a/src/stream/src/common/log_store_impl/kv_log_store/reader.rs b/src/stream/src/common/log_store_impl/kv_log_store/reader.rs index ed8b14ead516..897501da82b5 100644 --- a/src/stream/src/common/log_store_impl/kv_log_store/reader.rs +++ b/src/stream/src/common/log_store_impl/kv_log_store/reader.rs @@ -18,12 +18,12 @@ use std::pin::Pin; use std::time::Duration; use anyhow::anyhow; +use foyer::memory::eviction::lru::LruContext; use futures::future::{try_join_all, BoxFuture}; use futures::stream::select_all; use futures::{FutureExt, TryFutureExt}; use risingwave_common::array::StreamChunk; use risingwave_common::buffer::Bitmap; -use risingwave_common::cache::CachePriority; use risingwave_common::catalog::TableId; use risingwave_common::hash::VnodeBitmapExt; use risingwave_common::metrics::{LabelGuardedHistogram, LabelGuardedIntCounter}; @@ -216,7 +216,7 @@ impl KvLogStoreReader { ReadOptions { // This stream lives too long, the connection of prefetch object may break. So use a short connection prefetch. prefetch_options: PrefetchOptions::prefetch_for_small_range_scan(), - cache_policy: CachePolicy::Fill(CachePriority::Low), + cache_policy: CachePolicy::Fill(LruContext::LowPriority), table_id, ..Default::default() }, @@ -358,7 +358,7 @@ impl LogReader for KvLogStoreReader { ReadOptions { prefetch_options: PrefetchOptions::prefetch_for_large_range_scan(), - cache_policy: CachePolicy::Fill(CachePriority::Low), + cache_policy: CachePolicy::Fill(LruContext::LowPriority), table_id, ..Default::default() }, diff --git a/src/stream/src/common/table/state_table.rs b/src/stream/src/common/table/state_table.rs index 81ab622d3e61..917728462772 100644 --- a/src/stream/src/common/table/state_table.rs +++ b/src/stream/src/common/table/state_table.rs @@ -20,13 +20,13 @@ use std::sync::Arc; use bytes::{BufMut, Bytes, BytesMut}; use either::Either; +use foyer::memory::eviction::lru::LruContext; use futures::{pin_mut, FutureExt, Stream, StreamExt}; use futures_async_stream::for_await; use itertools::{izip, Itertools}; use risingwave_common::array::stream_record::Record; use risingwave_common::array::{ArrayImplBuilder, ArrayRef, DataChunk, Op, StreamChunk}; use risingwave_common::buffer::Bitmap; -use risingwave_common::cache::CachePriority; use risingwave_common::catalog::{ get_dist_key_in_pk_indices, ColumnDesc, ColumnId, TableId, TableOption, }; @@ -744,7 +744,7 @@ where prefix_hint, retention_seconds: self.table_option.retention_seconds, table_id: self.table_id, - cache_policy: CachePolicy::Fill(CachePriority::High), + cache_policy: CachePolicy::Fill(LruContext::HighPriority), ..Default::default() }; @@ -1299,7 +1299,7 @@ where retention_seconds: self.table_option.retention_seconds, table_id: self.table_id, prefetch_options, - cache_policy: CachePolicy::Fill(CachePriority::High), + cache_policy: CachePolicy::Fill(LruContext::HighPriority), ..Default::default() }; @@ -1423,7 +1423,7 @@ where let read_options = ReadOptions { prefix_hint, table_id: self.table_id, - cache_policy: CachePolicy::Fill(CachePriority::High), + cache_policy: CachePolicy::Fill(LruContext::HighPriority), ..Default::default() }; diff --git a/src/tests/compaction_test/Cargo.toml b/src/tests/compaction_test/Cargo.toml index 786ddb269fbd..232e7ce5b039 100644 --- a/src/tests/compaction_test/Cargo.toml +++ b/src/tests/compaction_test/Cargo.toml @@ -19,6 +19,7 @@ anyhow = "1" async-trait = "0.1" bytes = "1" clap = { version = "4", features = ["derive"] } +foyer = { workspace = true } futures = { version = "0.3", default-features = false, features = ["alloc"] } prometheus = { version = "0.13" } rand = "0.8" diff --git a/src/tests/compaction_test/src/compaction_test_runner.rs b/src/tests/compaction_test/src/compaction_test_runner.rs index d1606d4e8590..4894d344a040 100644 --- a/src/tests/compaction_test/src/compaction_test_runner.rs +++ b/src/tests/compaction_test/src/compaction_test_runner.rs @@ -23,8 +23,8 @@ use std::time::Duration; use anyhow::anyhow; use bytes::{BufMut, Bytes, BytesMut}; use clap::Parser; +use foyer::memory::eviction::lru::LruContext; use futures::TryStreamExt; -use risingwave_common::cache::CachePriority; use risingwave_common::catalog::TableId; use risingwave_common::config::{ extract_storage_memory_config, load_config, MetaConfig, NoOverride, @@ -633,7 +633,7 @@ async fn open_hummock_iters( epoch, ReadOptions { table_id: TableId { table_id }, - cache_policy: CachePolicy::Fill(CachePriority::High), + cache_policy: CachePolicy::Fill(LruContext::HighPriority), ..Default::default() }, ) diff --git a/src/tests/compaction_test/src/delete_range_runner.rs b/src/tests/compaction_test/src/delete_range_runner.rs index 2f501ff98591..ae4784d4f2ab 100644 --- a/src/tests/compaction_test/src/delete_range_runner.rs +++ b/src/tests/compaction_test/src/delete_range_runner.rs @@ -20,10 +20,10 @@ use std::sync::Arc; use std::time::{Duration, SystemTime}; use bytes::Bytes; +use foyer::memory::eviction::lru::LruContext; use futures::StreamExt; use rand::rngs::StdRng; use rand::{RngCore, SeedableRng}; -use risingwave_common::cache::CachePriority; use risingwave_common::catalog::TableId; use risingwave_common::config::{ extract_storage_memory_config, load_config, NoOverride, ObjectStoreConfig, RwConfig, @@ -438,7 +438,7 @@ impl NormalState { ReadOptions { ignore_range_tombstone, table_id: self.table_id, - cache_policy: CachePolicy::Fill(CachePriority::High), + cache_policy: CachePolicy::Fill(LruContext::HighPriority), ..Default::default() }, ) @@ -464,7 +464,7 @@ impl NormalState { table_id: self.table_id, read_version_from_backup: false, prefetch_options: PrefetchOptions::default(), - cache_policy: CachePolicy::Fill(CachePriority::High), + cache_policy: CachePolicy::Fill(LruContext::HighPriority), ..Default::default() }, ) @@ -495,7 +495,7 @@ impl CheckState for NormalState { table_id: self.table_id, read_version_from_backup: false, prefetch_options: PrefetchOptions::default(), - cache_policy: CachePolicy::Fill(CachePriority::High), + cache_policy: CachePolicy::Fill(LruContext::HighPriority), ..Default::default() }, ) diff --git a/src/workspace-hack/Cargo.toml b/src/workspace-hack/Cargo.toml index 0c7df07fa5b7..82018a6f93f1 100644 --- a/src/workspace-hack/Cargo.toml +++ b/src/workspace-hack/Cargo.toml @@ -41,6 +41,7 @@ clap = { version = "4", features = ["cargo", "derive", "env"] } clap_builder = { version = "4", default-features = false, features = ["cargo", "color", "env", "help", "std", "suggestions", "usage"] } combine = { version = "4", features = ["tokio"] } crossbeam-epoch = { version = "0.9" } +crossbeam-queue = { version = "0.3" } crossbeam-utils = { version = "0.8" } crypto-bigint = { version = "0.5", features = ["generic-array", "zeroize"] } deranged = { version = "0.3", default-features = false, features = ["powerfmt", "serde", "std"] } @@ -125,7 +126,7 @@ serde_json = { version = "1", features = ["alloc", "raw_value"] } serde_with = { version = "3", features = ["json"] } sha1 = { version = "0.10" } sha2 = { version = "0.10", features = ["oid"] } -smallvec = { version = "1", default-features = false, features = ["serde", "union", "write"] } +smallvec = { version = "1", default-features = false, features = ["const_new", "serde", "union", "write"] } sqlx = { version = "0.7", default-features = false, features = ["bigdecimal", "chrono", "json", "mysql", "postgres", "runtime-tokio-native-tls", "rust_decimal", "sqlite", "time", "uuid"] } sqlx-core = { version = "0.7", features = ["_rt-tokio", "_tls-native-tls", "bigdecimal", "chrono", "json", "migrate", "offline", "rust_decimal", "time", "uuid"] } sqlx-mysql = { version = "0.7", default-features = false, features = ["bigdecimal", "chrono", "json", "rust_decimal", "time", "uuid"] }