From 0e16ac487fa9e7265f89a263d0cb6a066442f33f Mon Sep 17 00:00:00 2001 From: Croxx Date: Thu, 14 Mar 2024 10:43:06 +0800 Subject: [PATCH] feat(storage): replace risingwave lru cache with foyer lru cache for lsm cache (#15382) Signed-off-by: MrCroxx --- Cargo.lock | 141 +++++----- Cargo.toml | 7 +- src/batch/Cargo.toml | 1 + src/batch/src/executor/hash_agg.rs | 1 + src/batch/src/executor/insert.rs | 4 +- src/storage/Cargo.toml | 3 +- src/storage/benches/bench_compactor.rs | 10 +- src/storage/hummock_test/Cargo.toml | 1 + .../benches/bench_hummock_iter.rs | 4 +- .../hummock_test/src/compactor_tests.rs | 14 +- .../hummock_test/src/failpoint_tests.rs | 14 +- .../hummock_test/src/hummock_storage_tests.rs | 86 +++--- .../hummock_test/src/snapshot_tests.rs | 4 +- .../hummock_test/src/state_store_tests.rs | 72 ++--- .../hummock_test/src/sync_point_tests.rs | 4 +- src/storage/hummock_trace/Cargo.toml | 1 + src/storage/hummock_trace/src/opts.rs | 19 ++ src/storage/src/hummock/block_cache.rs | 146 ++++------ .../compactor/shared_buffer_compact.rs | 4 +- src/storage/src/hummock/file_cache/store.rs | 1 - src/storage/src/hummock/mod.rs | 2 +- .../sstable/backward_sstable_iterator.rs | 4 +- .../sstable/forward_sstable_iterator.rs | 4 +- src/storage/src/hummock/sstable/xor_filter.rs | 6 +- src/storage/src/hummock/sstable_store.rs | 251 +++++++++++------- src/storage/src/hummock/test_utils.rs | 4 +- src/storage/src/hummock/utils.rs | 8 +- .../src/table/batch_table/storage_table.rs | 8 +- src/stream/Cargo.toml | 1 + .../log_store_impl/kv_log_store/reader.rs | 6 +- src/stream/src/common/table/state_table.rs | 8 +- src/tests/compaction_test/Cargo.toml | 1 + .../src/compaction_test_runner.rs | 4 +- .../src/delete_range_runner.rs | 8 +- src/workspace-hack/Cargo.toml | 3 +- 35 files changed, 448 insertions(+), 407 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 9fc8775d64327..c0e821669fda0 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -319,7 +319,7 @@ dependencies = [ "chrono", "chrono-tz", "half 2.3.1", - "hashbrown 0.14.0", + "hashbrown 0.14.3", "num", ] @@ -335,7 +335,7 @@ dependencies = [ "arrow-schema 50.0.0", "chrono", "half 2.3.1", - "hashbrown 0.14.0", + "hashbrown 0.14.3", "num", ] @@ -550,7 +550,7 @@ dependencies = [ "arrow-data 48.0.1", "arrow-schema 48.0.1", "half 2.3.1", - "hashbrown 0.14.0", + "hashbrown 0.14.3", ] [[package]] @@ -565,7 +565,7 @@ dependencies = [ "arrow-data 50.0.0", "arrow-schema 50.0.0", "half 2.3.1", - "hashbrown 0.14.0", + "hashbrown 0.14.3", ] [[package]] @@ -2157,6 +2157,15 @@ dependencies = [ "paste", ] +[[package]] +name = "cmsketch" +version = "0.2.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "17a94b4d5dca45a6aa85960fbb1d60096203102716732e8997515cc8ff2b6fff" +dependencies = [ + "paste", +] + [[package]] name = "coarsetime" version = "0.1.23" @@ -2394,7 +2403,7 @@ dependencies = [ "cranelift-entity", "cranelift-isle", "gimli", - "hashbrown 0.14.0", + "hashbrown 0.14.3", "log", "regalloc2", "smallvec", @@ -2935,7 +2944,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "978747c1d849a7d2ee5e8adc0159961c48fb7e5db2f06af6723b80123bb53856" dependencies = [ "cfg-if", - "hashbrown 0.14.0", + "hashbrown 0.14.3", "lock_api", "once_cell", "parking_lot_core 0.9.8", @@ -2980,7 +2989,7 @@ dependencies = [ "futures", "glob", "half 2.3.1", - "hashbrown 0.14.0", + "hashbrown 0.14.3", "indexmap 2.0.0", "itertools 0.11.0", "log", @@ -3031,7 +3040,7 @@ dependencies = [ "datafusion-common", "datafusion-expr", "futures", - "hashbrown 0.14.0", + "hashbrown 0.14.3", "log", "object_store", "parking_lot 0.12.1", @@ -3067,7 +3076,7 @@ dependencies = [ "datafusion-common", "datafusion-expr", "datafusion-physical-expr", - "hashbrown 0.14.0", + "hashbrown 0.14.3", "itertools 0.11.0", "log", "regex-syntax 0.8.2", @@ -3092,7 +3101,7 @@ dependencies = [ "datafusion-common", "datafusion-expr", "half 2.3.1", - "hashbrown 0.14.0", + "hashbrown 0.14.3", "hex", "indexmap 2.0.0", "itertools 0.11.0", @@ -3127,7 +3136,7 @@ dependencies = [ "datafusion-physical-expr", "futures", "half 2.3.1", - "hashbrown 0.14.0", + "hashbrown 0.14.3", "indexmap 2.0.0", "itertools 0.11.0", "log", @@ -4089,21 +4098,22 @@ dependencies = [ [[package]] name = "foyer" -version = "0.3.0" +version = "0.6.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "caed9c227536e3b6fb92aea800c44de7516ecc339a11eb01544549dd6f43cb69" +checksum = "15240094bab62dfeb59bb974e2fb7bff18727c6dc7bd1f5f6fc82a9e6fda5a38" dependencies = [ - "foyer-common 0.2.0", + "foyer-common", "foyer-intrusive", + "foyer-memory", "foyer-storage", "foyer-workspace-hack", ] [[package]] name = "foyer-common" -version = "0.1.0" +version = "0.4.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "e441ea6dcf68cc0626535868343f838c803a75fd7686c3895e2c99c9c8f11d60" +checksum = "ad95f985cfbd5a8bf9f115d963918e742dfbb75350febff509b09cf194842b0c" dependencies = [ "anyhow", "bytes", @@ -4116,51 +4126,54 @@ dependencies = [ ] [[package]] -name = "foyer-common" -version = "0.2.0" +name = "foyer-intrusive" +version = "0.3.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "f10010b8cb0f380cc30ff8c82ec7f6addc2ced72b3758d336f351831a56a4a59" +checksum = "acf22ed0dfa6315714099046504711d5563d191a442afcd87bd4d0bf5010bb9a" dependencies = [ - "anyhow", "bytes", + "cmsketch 0.1.5", + "foyer-common", "foyer-workspace-hack", "itertools 0.12.0", - "madsim-tokio", + "memoffset", "parking_lot 0.12.1", "paste", "tracing", + "twox-hash", ] [[package]] -name = "foyer-intrusive" +name = "foyer-memory" version = "0.1.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "2f967c3c40435a621fb1b8670d8f6501d67e79cea08b1ce07c150147aabdc588" +checksum = "a694cde4dd2c9fdd8cc2d294fcb265fdd6be1ee62fa35c37e9fc328871f33b6d" dependencies = [ - "bytes", - "cmsketch", - "foyer-common 0.1.0", + "ahash 0.8.6", + "bitflags 2.4.0", + "cmsketch 0.2.0", + "crossbeam", + "foyer-intrusive", "foyer-workspace-hack", + "futures", + "hashbrown 0.14.3", "itertools 0.12.0", - "memoffset", + "libc", + "madsim-tokio", "parking_lot 0.12.1", - "paste", - "tracing", - "twox-hash", ] [[package]] name = "foyer-storage" -version = "0.2.1" +version = "0.5.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "8df47c9672ac893c172832c79c41bc4be7b24f71376fb00be461418c7a4d1cfe" +checksum = "8fe0d0d6090461ad435dc2316e9800f336faa6b858d6c85dfb54091cb4cf127e" dependencies = [ "anyhow", "bitflags 2.4.0", "bitmaps", "bytes", - "cmsketch", - "foyer-common 0.2.0", + "foyer-common", "foyer-intrusive", "foyer-workspace-hack", "futures", @@ -4169,7 +4182,7 @@ dependencies = [ "lz4", "madsim-tokio", "memoffset", - "nix 0.27.1", + "nix 0.28.0", "parking_lot 0.12.1", "paste", "prometheus", @@ -4182,11 +4195,13 @@ dependencies = [ [[package]] name = "foyer-workspace-hack" -version = "0.1.0" +version = "0.3.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "72699c6a83a9e76c13d1196518be3c7d85183e62507a3c0e248d556a7e875072" +checksum = "77a5ee5ab890d1893232fd44af33197c7499813816b8880649a15ee9438d5e07" dependencies = [ + "ahash 0.8.6", "cc", + "crossbeam-channel", "crossbeam-utils", "either", "futures-channel", @@ -4194,16 +4209,16 @@ dependencies = [ "futures-executor", "futures-sink", "futures-util", + "hashbrown 0.14.3", "libc", - "log", - "memchr", "parking_lot 0.12.1", "parking_lot_core 0.9.8", + "proc-macro2", + "quote", "rand", + "smallvec", + "syn 2.0.48", "tokio", - "tokio-util", - "tower", - "tracing", "tracing-core", ] @@ -4820,9 +4835,9 @@ dependencies = [ [[package]] name = "hashbrown" -version = "0.14.0" +version = "0.14.3" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "2c6201b9ff9fd90a5a3bac2e56a830d0caa509576f0e503818ee82c181b3437a" +checksum = "290f1a1d9242c78d09ce40a5e87e7554ee637af1351968159f4952f028f75604" dependencies = [ "ahash 0.8.6", "allocator-api2", @@ -4834,7 +4849,7 @@ version = "0.8.4" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "e8094feaf31ff591f651a2664fb9cfd92bba7a60ce3197265e9482ebe753c8f7" dependencies = [ - "hashbrown 0.14.0", + "hashbrown 0.14.3", ] [[package]] @@ -5175,7 +5190,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "d5477fe2230a79769d8dc68e0eabf5437907c0457a5614a9e8dddb67f65eb65d" dependencies = [ "equivalent", - "hashbrown 0.14.0", + "hashbrown 0.14.3", "serde", ] @@ -5797,7 +5812,7 @@ name = "lru" version = "0.7.6" source = "git+https://github.com/risingwavelabs/lru-rs.git?rev=95f347b#95f347b5ca3c947335a51adf40fa86178ef4d60d" dependencies = [ - "hashbrown 0.14.0", + "hashbrown 0.14.3", ] [[package]] @@ -5815,7 +5830,7 @@ version = "0.12.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "1efa59af2ddfad1854ae27d75009d538d0998b4b2fd47083e743ac1a10e46c60" dependencies = [ - "hashbrown 0.14.0", + "hashbrown 0.14.3", ] [[package]] @@ -6407,17 +6422,6 @@ dependencies = [ "libc", ] -[[package]] -name = "nix" -version = "0.27.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "2eb04e9c688eff1c89d72b407f168cf79bb9e867a9d3323ed6c01519eb9cc053" -dependencies = [ - "bitflags 2.4.0", - "cfg-if", - "libc", -] - [[package]] name = "nix" version = "0.28.0" @@ -6679,7 +6683,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "9cf5f9dd3933bd50a9e1f149ec995f39ae2c496d31fd772c1fd45ebc27e902b0" dependencies = [ "crc32fast", - "hashbrown 0.14.0", + "hashbrown 0.14.3", "indexmap 2.0.0", "memchr", ] @@ -6954,7 +6958,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "a4d6a8c22fc714f0c2373e6091bf6f5e9b37b1bc0b1184874b7e0a4e303d318f" dependencies = [ "dlv-list", - "hashbrown 0.14.0", + "hashbrown 0.14.3", ] [[package]] @@ -7147,7 +7151,7 @@ dependencies = [ "chrono", "flate2", "futures", - "hashbrown 0.14.0", + "hashbrown 0.14.3", "lz4_flex", "num", "num-bigint", @@ -7182,7 +7186,7 @@ dependencies = [ "flate2", "futures", "half 2.3.1", - "hashbrown 0.14.0", + "hashbrown 0.14.3", "lz4_flex", "num", "num-bigint", @@ -8718,10 +8722,11 @@ dependencies = [ "async-trait", "criterion", "either", + "foyer", "futures", "futures-async-stream", "futures-util", - "hashbrown 0.14.0", + "hashbrown 0.14.3", "hytra", "icelake", "itertools 0.12.0", @@ -9017,6 +9022,7 @@ dependencies = [ "async-trait", "bytes", "clap", + "foyer", "futures", "madsim-tokio", "prometheus", @@ -9522,6 +9528,7 @@ dependencies = [ "criterion", "expect-test", "fail", + "foyer", "futures", "futures-async-stream", "itertools 0.12.0", @@ -9552,6 +9559,7 @@ dependencies = [ "bincode 2.0.0-rc.3", "byteorder", "bytes", + "foyer", "futures", "futures-async-stream", "itertools 0.12.0", @@ -10063,6 +10071,7 @@ dependencies = [ name = "risingwave_storage" version = "1.7.0-alpha" dependencies = [ + "ahash 0.8.6", "anyhow", "arc-swap", "async-trait", @@ -10145,6 +10154,7 @@ dependencies = [ "enum-as-inner", "expect-test", "fail", + "foyer", "futures", "futures-async-stream", "governor", @@ -14059,6 +14069,7 @@ dependencies = [ "clap_builder", "combine", "crossbeam-epoch", + "crossbeam-queue", "crossbeam-utils", "crypto-bigint 0.5.5", "deranged", @@ -14080,7 +14091,7 @@ dependencies = [ "getrandom", "governor", "hashbrown 0.13.2", - "hashbrown 0.14.0", + "hashbrown 0.14.3", "hmac", "hyper", "indexmap 1.9.3", diff --git a/Cargo.toml b/Cargo.toml index 38ee801b39a9a..33164cc2de104 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -73,6 +73,7 @@ license = "Apache-2.0" repository = "https://github.com/risingwavelabs/risingwave" [workspace.dependencies] +foyer = "0.6" await-tree = "0.1.1" aws-config = { version = "1", default-features = false, features = [ "behavior-version-latest", @@ -113,11 +114,7 @@ hytra = "0.1" rdkafka = { package = "madsim-rdkafka", version = "0.3.1", features = [ "cmake-build", ] } -hashbrown = { version = "0.14.0", features = [ - "ahash", - "inline-more", - "nightly", -] } +hashbrown = { version = "0.14", features = ["ahash", "inline-more", "nightly"] } criterion = { version = "0.5", features = ["async_futures"] } tonic = { package = "madsim-tonic", version = "0.4.1" } tonic-build = { package = "madsim-tonic-build", version = "0.4.2" } diff --git a/src/batch/Cargo.toml b/src/batch/Cargo.toml index fa18fa0c83717..280c98020fd97 100644 --- a/src/batch/Cargo.toml +++ b/src/batch/Cargo.toml @@ -21,6 +21,7 @@ assert_matches = "1" async-recursion = "1" async-trait = "0.1" either = "1" +foyer = { workspace = true } futures = { version = "0.3", default-features = false, features = ["alloc"] } futures-async-stream = { workspace = true } futures-util = "0.3" diff --git a/src/batch/src/executor/hash_agg.rs b/src/batch/src/executor/hash_agg.rs index e4e6a32e618d8..6acd3de2f54f2 100644 --- a/src/batch/src/executor/hash_agg.rs +++ b/src/batch/src/executor/hash_agg.rs @@ -466,6 +466,7 @@ mod tests { /// A test to verify that `HashMap` may leak memory counter when using `into_iter`. #[test] + #[should_panic] // TODO(MrCroxx): This bug is fixed and the test should panic. Remove the test and fix the related code later. fn test_hashmap_into_iter_bug() { let dropped: Arc = Arc::new(AtomicBool::new(false)); diff --git a/src/batch/src/executor/insert.rs b/src/batch/src/executor/insert.rs index 951aec3573e9e..aa4063be84bb5 100644 --- a/src/batch/src/executor/insert.rs +++ b/src/batch/src/executor/insert.rs @@ -267,10 +267,10 @@ mod tests { use std::sync::Arc; use assert_matches::assert_matches; + use foyer::memory::CacheContext; use futures::StreamExt; use itertools::Itertools; use risingwave_common::array::{Array, ArrayImpl, I32Array, StructArray}; - use risingwave_common::cache::CachePriority; use risingwave_common::catalog::{ schema_test_utils, ColumnDesc, ColumnId, INITIAL_TABLE_VERSION_ID, }; @@ -400,7 +400,7 @@ mod tests { epoch, None, ReadOptions { - cache_policy: CachePolicy::Fill(CachePriority::High), + cache_policy: CachePolicy::Fill(CacheContext::Default), ..Default::default() }, ) diff --git a/src/storage/Cargo.toml b/src/storage/Cargo.toml index 544afb84c163a..fb65be5796d54 100644 --- a/src/storage/Cargo.toml +++ b/src/storage/Cargo.toml @@ -14,6 +14,7 @@ ignored = ["workspace-hack"] normal = ["workspace-hack"] [dependencies] +ahash = "0.8" anyhow = "1" arc-swap = "1" async-trait = "0.1" @@ -26,7 +27,7 @@ dyn-clone = "1.0.14" either = "1" enum-as-inner = "0.6" fail = "0.5" -foyer = "0.3" +foyer = { workspace = true } futures = { version = "0.3", default-features = false, features = ["alloc"] } futures-async-stream = { workspace = true } hex = "0.4" diff --git a/src/storage/benches/bench_compactor.rs b/src/storage/benches/bench_compactor.rs index cadbad5cad85b..edab0d067778d 100644 --- a/src/storage/benches/bench_compactor.rs +++ b/src/storage/benches/bench_compactor.rs @@ -17,7 +17,7 @@ use std::sync::Arc; use criterion::async_executor::FuturesExecutor; use criterion::{criterion_group, criterion_main, Criterion}; -use risingwave_common::cache::CachePriority; +use foyer::memory::CacheContext; use risingwave_common::catalog::{ColumnDesc, ColumnId, TableId}; use risingwave_common::config::{MetricLevel, ObjectStoreConfig}; use risingwave_common::hash::VirtualNode; @@ -78,7 +78,7 @@ pub fn default_writer_opts() -> SstableWriterOptions { SstableWriterOptions { capacity_hint: None, tracker: None, - policy: CachePolicy::Fill(CachePriority::High), + policy: CachePolicy::Fill(CacheContext::Default), } } @@ -115,7 +115,7 @@ async fn build_table( SstableWriterOptions { capacity_hint: None, tracker: None, - policy: CachePolicy::Fill(CachePriority::High), + policy: CachePolicy::Fill(CacheContext::Default), }, ); let mut builder = @@ -159,7 +159,7 @@ async fn build_table_2( SstableWriterOptions { capacity_hint: None, tracker: None, - policy: CachePolicy::Fill(CachePriority::High), + policy: CachePolicy::Fill(CacheContext::Default), }, ); let mut builder = @@ -313,7 +313,7 @@ fn bench_merge_iterator_compactor(c: &mut Criterion) { }); let level2 = vec![info1, info2]; let read_options = Arc::new(SstableIteratorReadOptions { - cache_policy: CachePolicy::Fill(CachePriority::High), + cache_policy: CachePolicy::Fill(CacheContext::Default), prefetch_for_large_query: false, must_iterated_end_user_key: None, max_preload_retry_times: 0, diff --git a/src/storage/hummock_test/Cargo.toml b/src/storage/hummock_test/Cargo.toml index 3d78adf20fca7..1596ea6356ee9 100644 --- a/src/storage/hummock_test/Cargo.toml +++ b/src/storage/hummock_test/Cargo.toml @@ -19,6 +19,7 @@ async-trait = "0.1" bytes = { version = "1" } clap = { version = "4", features = ["derive"] } fail = "0.5" +foyer = { workspace = true } futures = { version = "0.3", default-features = false, features = ["alloc"] } futures-async-stream = "0.2.9" itertools = "0.12" diff --git a/src/storage/hummock_test/benches/bench_hummock_iter.rs b/src/storage/hummock_test/benches/bench_hummock_iter.rs index c0d597ac800d9..059853433a2fc 100644 --- a/src/storage/hummock_test/benches/bench_hummock_iter.rs +++ b/src/storage/hummock_test/benches/bench_hummock_iter.rs @@ -17,8 +17,8 @@ use std::sync::Arc; use bytes::Bytes; use criterion::{criterion_group, criterion_main, Criterion}; +use foyer::memory::CacheContext; use futures::pin_mut; -use risingwave_common::cache::CachePriority; use risingwave_common::util::epoch::test_epoch; use risingwave_hummock_sdk::key::TableKey; use risingwave_hummock_sdk::HummockEpoch; @@ -110,7 +110,7 @@ fn criterion_benchmark(c: &mut Criterion) { ReadOptions { ignore_range_tombstone: true, prefetch_options: PrefetchOptions::default(), - cache_policy: CachePolicy::Fill(CachePriority::High), + cache_policy: CachePolicy::Fill(CacheContext::Default), ..Default::default() }, )) diff --git a/src/storage/hummock_test/src/compactor_tests.rs b/src/storage/hummock_test/src/compactor_tests.rs index 9afe31fc59d34..3718b06f00fe5 100644 --- a/src/storage/hummock_test/src/compactor_tests.rs +++ b/src/storage/hummock_test/src/compactor_tests.rs @@ -21,10 +21,10 @@ pub(crate) mod tests { use std::sync::Arc; use bytes::{BufMut, Bytes, BytesMut}; + use foyer::memory::CacheContext; use itertools::Itertools; use rand::{Rng, RngCore, SeedableRng}; use risingwave_common::buffer::BitmapBuilder; - use risingwave_common::cache::CachePriority; use risingwave_common::catalog::TableId; use risingwave_common::constants::hummock::CompactionFilterFlag; use risingwave_common::hash::VirtualNode; @@ -383,7 +383,7 @@ pub(crate) mod tests { TableKey(key.clone()), read_epoch, ReadOptions { - cache_policy: CachePolicy::Fill(CachePriority::High), + cache_policy: CachePolicy::Fill(CacheContext::Default), ..Default::default() }, ) @@ -397,7 +397,7 @@ pub(crate) mod tests { ((TEST_WATERMARK - 1) * 1000) << 16, ReadOptions { prefix_hint: Some(key.clone()), - cache_policy: CachePolicy::Fill(CachePriority::High), + cache_policy: CachePolicy::Fill(CacheContext::Default), ..Default::default() }, ) @@ -521,7 +521,7 @@ pub(crate) mod tests { TableKey(key.clone()), get_epoch, ReadOptions { - cache_policy: CachePolicy::Fill(CachePriority::High), + cache_policy: CachePolicy::Fill(CacheContext::Default), ..Default::default() }, ) @@ -871,7 +871,7 @@ pub(crate) mod tests { ReadOptions { table_id: TableId::from(existing_table_ids), prefetch_options: PrefetchOptions::default(), - cache_policy: CachePolicy::Fill(CachePriority::High), + cache_policy: CachePolicy::Fill(CacheContext::Default), ..Default::default() }, ) @@ -1070,7 +1070,7 @@ pub(crate) mod tests { ReadOptions { table_id: TableId::from(existing_table_id), prefetch_options: PrefetchOptions::default(), - cache_policy: CachePolicy::Fill(CachePriority::High), + cache_policy: CachePolicy::Fill(CacheContext::Default), ..Default::default() }, ) @@ -1270,7 +1270,7 @@ pub(crate) mod tests { prefix_hint: Some(Bytes::from(bloom_filter_key)), table_id: TableId::from(existing_table_id), prefetch_options: PrefetchOptions::default(), - cache_policy: CachePolicy::Fill(CachePriority::High), + cache_policy: CachePolicy::Fill(CacheContext::Default), ..Default::default() }, ) diff --git a/src/storage/hummock_test/src/failpoint_tests.rs b/src/storage/hummock_test/src/failpoint_tests.rs index 8112e99358d8c..98fe2759abfcb 100644 --- a/src/storage/hummock_test/src/failpoint_tests.rs +++ b/src/storage/hummock_test/src/failpoint_tests.rs @@ -16,7 +16,7 @@ use std::ops::Bound; use std::sync::Arc; use bytes::{BufMut, Bytes}; -use risingwave_common::cache::CachePriority; +use foyer::memory::CacheContext; use risingwave_common::catalog::TableId; use risingwave_common::hash::VirtualNode; use risingwave_hummock_sdk::key::TABLE_PREFIX_LEN; @@ -114,7 +114,7 @@ async fn test_failpoints_state_store_read_upload() { 1, ReadOptions { prefix_hint: Some(Bytes::from(anchor_prefix_hint)), - cache_policy: CachePolicy::Fill(CachePriority::High), + cache_policy: CachePolicy::Fill(CacheContext::Default), ..Default::default() }, ) @@ -167,7 +167,7 @@ async fn test_failpoints_state_store_read_upload() { 2, ReadOptions { prefix_hint: Some(Bytes::from(anchor_prefix_hint)), - cache_policy: CachePolicy::Fill(CachePriority::High), + cache_policy: CachePolicy::Fill(CacheContext::Default), ..Default::default() }, ) @@ -182,7 +182,7 @@ async fn test_failpoints_state_store_read_upload() { 2, ReadOptions { table_id: Default::default(), - cache_policy: CachePolicy::Fill(CachePriority::High), + cache_policy: CachePolicy::Fill(CacheContext::Default), ..Default::default() }, ) @@ -201,7 +201,7 @@ async fn test_failpoints_state_store_read_upload() { 2, ReadOptions { prefix_hint: Some(Bytes::from(bee_prefix_hint)), - cache_policy: CachePolicy::Fill(CachePriority::High), + cache_policy: CachePolicy::Fill(CacheContext::Default), ..Default::default() }, ) @@ -239,7 +239,7 @@ async fn test_failpoints_state_store_read_upload() { 5, ReadOptions { prefix_hint: Some(Bytes::from(anchor_prefix_hint)), - cache_policy: CachePolicy::Fill(CachePriority::High), + cache_policy: CachePolicy::Fill(CacheContext::Default), ..Default::default() }, ) @@ -256,7 +256,7 @@ async fn test_failpoints_state_store_read_upload() { 5, ReadOptions { prefetch_options: PrefetchOptions::default(), - cache_policy: CachePolicy::Fill(CachePriority::High), + cache_policy: CachePolicy::Fill(CacheContext::Default), ..Default::default() }, ) diff --git a/src/storage/hummock_test/src/hummock_storage_tests.rs b/src/storage/hummock_test/src/hummock_storage_tests.rs index 6d11d9c6bf83a..e7b03fa61c504 100644 --- a/src/storage/hummock_test/src/hummock_storage_tests.rs +++ b/src/storage/hummock_test/src/hummock_storage_tests.rs @@ -16,10 +16,10 @@ use std::ops::Range; use std::sync::Arc; use bytes::{BufMut, Bytes}; +use foyer::memory::CacheContext; use futures::TryStreamExt; use itertools::Itertools; use risingwave_common::buffer::BitmapBuilder; -use risingwave_common::cache::CachePriority; use risingwave_common::catalog::TableId; use risingwave_common::hash::VirtualNode; use risingwave_common::range::RangeBoundsExt; @@ -123,7 +123,7 @@ async fn test_storage_basic() { ReadOptions { table_id: TEST_TABLE_ID, - cache_policy: CachePolicy::Fill(CachePriority::High), + cache_policy: CachePolicy::Fill(CacheContext::Default), ..Default::default() }, ) @@ -138,7 +138,7 @@ async fn test_storage_basic() { epoch1, ReadOptions { table_id: TEST_TABLE_ID, - cache_policy: CachePolicy::Fill(CachePriority::High), + cache_policy: CachePolicy::Fill(CacheContext::Default), ..Default::default() }, ) @@ -155,7 +155,7 @@ async fn test_storage_basic() { epoch1, ReadOptions { table_id: TEST_TABLE_ID, - cache_policy: CachePolicy::Fill(CachePriority::High), + cache_policy: CachePolicy::Fill(CacheContext::Default), ..Default::default() }, ) @@ -185,7 +185,7 @@ async fn test_storage_basic() { ReadOptions { table_id: TEST_TABLE_ID, - cache_policy: CachePolicy::Fill(CachePriority::High), + cache_policy: CachePolicy::Fill(CacheContext::Default), ..Default::default() }, ) @@ -216,7 +216,7 @@ async fn test_storage_basic() { epoch3, ReadOptions { table_id: TEST_TABLE_ID, - cache_policy: CachePolicy::Fill(CachePriority::High), + cache_policy: CachePolicy::Fill(CacheContext::Default), ..Default::default() }, ) @@ -233,7 +233,7 @@ async fn test_storage_basic() { ReadOptions { table_id: TEST_TABLE_ID, - cache_policy: CachePolicy::Fill(CachePriority::High), + cache_policy: CachePolicy::Fill(CacheContext::Default), ..Default::default() }, ) @@ -253,7 +253,7 @@ async fn test_storage_basic() { ReadOptions { table_id: TEST_TABLE_ID, - cache_policy: CachePolicy::Fill(CachePriority::High), + cache_policy: CachePolicy::Fill(CacheContext::Default), ..Default::default() }, ) @@ -294,7 +294,7 @@ async fn test_storage_basic() { ReadOptions { table_id: TEST_TABLE_ID, - cache_policy: CachePolicy::Fill(CachePriority::High), + cache_policy: CachePolicy::Fill(CacheContext::Default), ..Default::default() }, ) @@ -312,7 +312,7 @@ async fn test_storage_basic() { ReadOptions { table_id: TEST_TABLE_ID, - cache_policy: CachePolicy::Fill(CachePriority::High), + cache_policy: CachePolicy::Fill(CacheContext::Default), ..Default::default() }, ) @@ -331,7 +331,7 @@ async fn test_storage_basic() { epoch2, ReadOptions { table_id: TEST_TABLE_ID, - cache_policy: CachePolicy::Fill(CachePriority::High), + cache_policy: CachePolicy::Fill(CacheContext::Default), ..Default::default() }, ) @@ -385,7 +385,7 @@ async fn test_storage_basic() { epoch3, ReadOptions { table_id: TEST_TABLE_ID, - cache_policy: CachePolicy::Fill(CachePriority::High), + cache_policy: CachePolicy::Fill(CacheContext::Default), ..Default::default() }, ) @@ -560,7 +560,7 @@ async fn test_state_store_sync() { epoch1, ReadOptions { table_id: TEST_TABLE_ID, - cache_policy: CachePolicy::Fill(CachePriority::High), + cache_policy: CachePolicy::Fill(CacheContext::Default), ..Default::default() }, ) @@ -602,7 +602,7 @@ async fn test_state_store_sync() { epoch2, ReadOptions { table_id: TEST_TABLE_ID, - cache_policy: CachePolicy::Fill(CachePriority::High), + cache_policy: CachePolicy::Fill(CacheContext::Default), ..Default::default() }, ) @@ -625,7 +625,7 @@ async fn test_state_store_sync() { epoch1, ReadOptions { table_id: TEST_TABLE_ID, - cache_policy: CachePolicy::Fill(CachePriority::High), + cache_policy: CachePolicy::Fill(CacheContext::Default), ..Default::default() }, ) @@ -679,7 +679,7 @@ async fn test_state_store_sync() { epoch2, ReadOptions { table_id: TEST_TABLE_ID, - cache_policy: CachePolicy::Fill(CachePriority::High), + cache_policy: CachePolicy::Fill(CacheContext::Default), ..Default::default() }, ) @@ -811,7 +811,7 @@ async fn test_delete_get() { epoch2, ReadOptions { prefix_hint: None, - cache_policy: CachePolicy::Fill(CachePriority::High), + cache_policy: CachePolicy::Fill(CacheContext::Default), ..Default::default() } ) @@ -908,7 +908,7 @@ async fn test_multiple_epoch_sync() { epoch1, ReadOptions { table_id: TEST_TABLE_ID, - cache_policy: CachePolicy::Fill(CachePriority::High), + cache_policy: CachePolicy::Fill(CacheContext::Default), ..Default::default() }, ) @@ -924,7 +924,7 @@ async fn test_multiple_epoch_sync() { ReadOptions { table_id: TEST_TABLE_ID, - cache_policy: CachePolicy::Fill(CachePriority::High), + cache_policy: CachePolicy::Fill(CacheContext::Default), ..Default::default() }, ) @@ -938,7 +938,7 @@ async fn test_multiple_epoch_sync() { epoch3, ReadOptions { table_id: TEST_TABLE_ID, - cache_policy: CachePolicy::Fill(CachePriority::High), + cache_policy: CachePolicy::Fill(CacheContext::Default), ..Default::default() }, ) @@ -1039,7 +1039,7 @@ async fn test_iter_with_min_epoch() { ReadOptions { table_id: TEST_TABLE_ID, prefetch_options: PrefetchOptions::default(), - cache_policy: CachePolicy::Fill(CachePriority::High), + cache_policy: CachePolicy::Fill(CacheContext::Default), ..Default::default() }, ) @@ -1065,7 +1065,7 @@ async fn test_iter_with_min_epoch() { ReadOptions { table_id: TEST_TABLE_ID, prefetch_options: PrefetchOptions::default(), - cache_policy: CachePolicy::Fill(CachePriority::High), + cache_policy: CachePolicy::Fill(CacheContext::Default), ..Default::default() }, ) @@ -1090,7 +1090,7 @@ async fn test_iter_with_min_epoch() { table_id: TEST_TABLE_ID, retention_seconds: Some(0), prefetch_options: PrefetchOptions::default(), - cache_policy: CachePolicy::Fill(CachePriority::High), + cache_policy: CachePolicy::Fill(CacheContext::Default), ..Default::default() }, ) @@ -1134,7 +1134,7 @@ async fn test_iter_with_min_epoch() { ReadOptions { table_id: TEST_TABLE_ID, prefetch_options: PrefetchOptions::default(), - cache_policy: CachePolicy::Fill(CachePriority::High), + cache_policy: CachePolicy::Fill(CacheContext::Default), ..Default::default() }, ) @@ -1160,7 +1160,7 @@ async fn test_iter_with_min_epoch() { ReadOptions { table_id: TEST_TABLE_ID, prefetch_options: PrefetchOptions::default(), - cache_policy: CachePolicy::Fill(CachePriority::High), + cache_policy: CachePolicy::Fill(CacheContext::Default), ..Default::default() }, ) @@ -1187,7 +1187,7 @@ async fn test_iter_with_min_epoch() { table_id: TEST_TABLE_ID, retention_seconds: Some(0), prefetch_options: PrefetchOptions::default(), - cache_policy: CachePolicy::Fill(CachePriority::High), + cache_policy: CachePolicy::Fill(CacheContext::Default), ..Default::default() }, ) @@ -1299,7 +1299,7 @@ async fn test_hummock_version_reader() { ReadOptions { table_id: TEST_TABLE_ID, prefetch_options: PrefetchOptions::default(), - cache_policy: CachePolicy::Fill(CachePriority::High), + cache_policy: CachePolicy::Fill(CacheContext::Default), ..Default::default() }, read_snapshot, @@ -1334,7 +1334,7 @@ async fn test_hummock_version_reader() { ReadOptions { table_id: TEST_TABLE_ID, prefetch_options: PrefetchOptions::default(), - cache_policy: CachePolicy::Fill(CachePriority::High), + cache_policy: CachePolicy::Fill(CacheContext::Default), ..Default::default() }, read_snapshot, @@ -1370,7 +1370,7 @@ async fn test_hummock_version_reader() { table_id: TEST_TABLE_ID, retention_seconds: Some(0), prefetch_options: PrefetchOptions::default(), - cache_policy: CachePolicy::Fill(CachePriority::High), + cache_policy: CachePolicy::Fill(CacheContext::Default), ..Default::default() }, read_snapshot, @@ -1430,7 +1430,7 @@ async fn test_hummock_version_reader() { ReadOptions { table_id: TEST_TABLE_ID, prefetch_options: PrefetchOptions::default(), - cache_policy: CachePolicy::Fill(CachePriority::High), + cache_policy: CachePolicy::Fill(CacheContext::Default), ..Default::default() }, read_snapshot, @@ -1474,7 +1474,7 @@ async fn test_hummock_version_reader() { ReadOptions { table_id: TEST_TABLE_ID, prefetch_options: PrefetchOptions::default(), - cache_policy: CachePolicy::Fill(CachePriority::High), + cache_policy: CachePolicy::Fill(CacheContext::Default), ..Default::default() }, read_snapshot, @@ -1510,7 +1510,7 @@ async fn test_hummock_version_reader() { table_id: TEST_TABLE_ID, retention_seconds: Some(0), prefetch_options: PrefetchOptions::default(), - cache_policy: CachePolicy::Fill(CachePriority::High), + cache_policy: CachePolicy::Fill(CacheContext::Default), ..Default::default() }, read_snapshot, @@ -1545,7 +1545,7 @@ async fn test_hummock_version_reader() { ReadOptions { table_id: TEST_TABLE_ID, prefetch_options: PrefetchOptions::default(), - cache_policy: CachePolicy::Fill(CachePriority::High), + cache_policy: CachePolicy::Fill(CacheContext::Default), ..Default::default() }, read_snapshot, @@ -1583,7 +1583,7 @@ async fn test_hummock_version_reader() { ReadOptions { table_id: TEST_TABLE_ID, prefetch_options: PrefetchOptions::default(), - cache_policy: CachePolicy::Fill(CachePriority::High), + cache_policy: CachePolicy::Fill(CacheContext::Default), ..Default::default() }, read_snapshot, @@ -1615,7 +1615,7 @@ async fn test_hummock_version_reader() { ReadOptions { table_id: TEST_TABLE_ID, prefetch_options: PrefetchOptions::default(), - cache_policy: CachePolicy::Fill(CachePriority::High), + cache_policy: CachePolicy::Fill(CacheContext::Default), ..Default::default() }, read_snapshot, @@ -1703,7 +1703,7 @@ async fn test_get_with_min_epoch() { ReadOptions { table_id: TEST_TABLE_ID, prefetch_options: Default::default(), - cache_policy: CachePolicy::Fill(CachePriority::High), + cache_policy: CachePolicy::Fill(CacheContext::Default), ..Default::default() }, ) @@ -1721,7 +1721,7 @@ async fn test_get_with_min_epoch() { ReadOptions { table_id: TEST_TABLE_ID, prefix_hint: Some(Bytes::from(prefix_hint.clone())), - cache_policy: CachePolicy::Fill(CachePriority::High), + cache_policy: CachePolicy::Fill(CacheContext::Default), ..Default::default() }, ) @@ -1739,7 +1739,7 @@ async fn test_get_with_min_epoch() { ReadOptions { table_id: TEST_TABLE_ID, prefix_hint: Some(Bytes::from(prefix_hint.clone())), - cache_policy: CachePolicy::Fill(CachePriority::High), + cache_policy: CachePolicy::Fill(CacheContext::Default), ..Default::default() }, ) @@ -1759,7 +1759,7 @@ async fn test_get_with_min_epoch() { retention_seconds: Some(0), prefix_hint: Some(Bytes::from(prefix_hint.clone())), prefetch_options: Default::default(), - cache_policy: CachePolicy::Fill(CachePriority::High), + cache_policy: CachePolicy::Fill(CacheContext::Default), ..Default::default() }, ) @@ -1801,7 +1801,7 @@ async fn test_get_with_min_epoch() { epoch1, ReadOptions { table_id: TEST_TABLE_ID, - cache_policy: CachePolicy::Fill(CachePriority::High), + cache_policy: CachePolicy::Fill(CacheContext::Default), ..Default::default() }, ) @@ -1820,7 +1820,7 @@ async fn test_get_with_min_epoch() { table_id: TEST_TABLE_ID, prefix_hint: Some(Bytes::from(prefix_hint.clone())), - cache_policy: CachePolicy::Fill(CachePriority::High), + cache_policy: CachePolicy::Fill(CacheContext::Default), ..Default::default() }, ) @@ -1840,7 +1840,7 @@ async fn test_get_with_min_epoch() { ReadOptions { table_id: TEST_TABLE_ID, prefix_hint: Some(Bytes::from(prefix_hint.clone())), - cache_policy: CachePolicy::Fill(CachePriority::High), + cache_policy: CachePolicy::Fill(CacheContext::Default), ..Default::default() }, ) @@ -1861,7 +1861,7 @@ async fn test_get_with_min_epoch() { retention_seconds: Some(0), prefix_hint: Some(Bytes::from(prefix_hint.clone())), - cache_policy: CachePolicy::Fill(CachePriority::High), + cache_policy: CachePolicy::Fill(CacheContext::Default), ..Default::default() }, ) diff --git a/src/storage/hummock_test/src/snapshot_tests.rs b/src/storage/hummock_test/src/snapshot_tests.rs index 7dda089d897a8..c019f12a0268b 100644 --- a/src/storage/hummock_test/src/snapshot_tests.rs +++ b/src/storage/hummock_test/src/snapshot_tests.rs @@ -15,7 +15,7 @@ use std::ops::Bound; use std::sync::Arc; use bytes::Bytes; -use risingwave_common::cache::CachePriority; +use foyer::memory::CacheContext; use risingwave_common::hash::VirtualNode; use risingwave_common::util::epoch::{test_epoch, EpochExt}; use risingwave_hummock_sdk::key::prefixed_range_with_vnode; @@ -52,7 +52,7 @@ macro_rules! assert_count_range_scan { $epoch, ReadOptions { prefetch_options: PrefetchOptions::prefetch_for_large_range_scan(), - cache_policy: CachePolicy::Fill(CachePriority::High), + cache_policy: CachePolicy::Fill(CacheContext::Default), ..Default::default() }, ) diff --git a/src/storage/hummock_test/src/state_store_tests.rs b/src/storage/hummock_test/src/state_store_tests.rs index 5b97dde202029..ff6385e35ab1e 100644 --- a/src/storage/hummock_test/src/state_store_tests.rs +++ b/src/storage/hummock_test/src/state_store_tests.rs @@ -17,9 +17,9 @@ use std::sync::Arc; use bytes::Bytes; use expect_test::expect; +use foyer::memory::CacheContext; use futures::{pin_mut, StreamExt}; use risingwave_common::buffer::Bitmap; -use risingwave_common::cache::CachePriority; use risingwave_common::catalog::{TableId, TableOption}; use risingwave_common::hash::VirtualNode; use risingwave_common::util::epoch::{test_epoch, EpochExt}; @@ -49,7 +49,7 @@ async fn test_empty_read_v2() { u64::MAX, ReadOptions { table_id: TableId { table_id: 2333 }, - cache_policy: CachePolicy::Fill(CachePriority::High), + cache_policy: CachePolicy::Fill(CacheContext::Default), ..Default::default() }, ) @@ -65,7 +65,7 @@ async fn test_empty_read_v2() { u64::MAX, ReadOptions { table_id: TableId { table_id: 2333 }, - cache_policy: CachePolicy::Fill(CachePriority::High), + cache_policy: CachePolicy::Fill(CacheContext::Default), ..Default::default() }, ) @@ -170,7 +170,7 @@ async fn test_basic_inner( anchor.clone(), epoch1, ReadOptions { - cache_policy: CachePolicy::Fill(CachePriority::High), + cache_policy: CachePolicy::Fill(CacheContext::Default), ..Default::default() }, ) @@ -183,7 +183,7 @@ async fn test_basic_inner( gen_key_from_str(VirtualNode::ZERO, "bb"), epoch1, ReadOptions { - cache_policy: CachePolicy::Fill(CachePriority::High), + cache_policy: CachePolicy::Fill(CacheContext::Default), ..Default::default() }, ) @@ -198,7 +198,7 @@ async fn test_basic_inner( gen_key_from_str(VirtualNode::ZERO, "ab"), epoch1, ReadOptions { - cache_policy: CachePolicy::Fill(CachePriority::High), + cache_policy: CachePolicy::Fill(CacheContext::Default), ..Default::default() }, ) @@ -227,7 +227,7 @@ async fn test_basic_inner( anchor.clone(), epoch2, ReadOptions { - cache_policy: CachePolicy::Fill(CachePriority::High), + cache_policy: CachePolicy::Fill(CacheContext::Default), ..Default::default() }, ) @@ -257,7 +257,7 @@ async fn test_basic_inner( anchor.clone(), epoch3, ReadOptions { - cache_policy: CachePolicy::Fill(CachePriority::High), + cache_policy: CachePolicy::Fill(CacheContext::Default), ..Default::default() }, ) @@ -271,7 +271,7 @@ async fn test_basic_inner( gen_key_from_str(VirtualNode::ZERO, "ff"), epoch3, ReadOptions { - cache_policy: CachePolicy::Fill(CachePriority::High), + cache_policy: CachePolicy::Fill(CacheContext::Default), ..Default::default() }, ) @@ -291,7 +291,7 @@ async fn test_basic_inner( ), epoch1, ReadOptions { - cache_policy: CachePolicy::Fill(CachePriority::High), + cache_policy: CachePolicy::Fill(CacheContext::Default), ..Default::default() }, ) @@ -306,7 +306,7 @@ async fn test_basic_inner( anchor.clone(), epoch1, ReadOptions { - cache_policy: CachePolicy::Fill(CachePriority::High), + cache_policy: CachePolicy::Fill(CacheContext::Default), ..Default::default() }, ) @@ -321,7 +321,7 @@ async fn test_basic_inner( anchor.clone(), epoch2, ReadOptions { - cache_policy: CachePolicy::Fill(CachePriority::High), + cache_policy: CachePolicy::Fill(CacheContext::Default), ..Default::default() }, ) @@ -342,7 +342,7 @@ async fn test_basic_inner( epoch2, ReadOptions { prefetch_options: PrefetchOptions::default(), - cache_policy: CachePolicy::Fill(CachePriority::High), + cache_policy: CachePolicy::Fill(CacheContext::Default), ..Default::default() }, ) @@ -364,7 +364,7 @@ async fn test_basic_inner( epoch3, ReadOptions { prefetch_options: PrefetchOptions::default(), - cache_policy: CachePolicy::Fill(CachePriority::High), + cache_policy: CachePolicy::Fill(CacheContext::Default), ..Default::default() }, ) @@ -387,7 +387,7 @@ async fn test_basic_inner( gen_key_from_str(VirtualNode::ZERO, "bb"), epoch2, ReadOptions { - cache_policy: CachePolicy::Fill(CachePriority::High), + cache_policy: CachePolicy::Fill(CacheContext::Default), ..Default::default() }, ) @@ -400,7 +400,7 @@ async fn test_basic_inner( gen_key_from_str(VirtualNode::ZERO, "dd"), epoch2, ReadOptions { - cache_policy: CachePolicy::Fill(CachePriority::High), + cache_policy: CachePolicy::Fill(CacheContext::Default), ..Default::default() }, ) @@ -600,7 +600,7 @@ async fn test_reload_storage() { anchor.clone(), epoch1, ReadOptions { - cache_policy: CachePolicy::Fill(CachePriority::High), + cache_policy: CachePolicy::Fill(CacheContext::Default), ..Default::default() }, ) @@ -615,7 +615,7 @@ async fn test_reload_storage() { gen_key_from_str(VirtualNode::ZERO, "ab"), epoch1, ReadOptions { - cache_policy: CachePolicy::Fill(CachePriority::High), + cache_policy: CachePolicy::Fill(CacheContext::Default), ..Default::default() }, ) @@ -643,7 +643,7 @@ async fn test_reload_storage() { anchor.clone(), epoch2, ReadOptions { - cache_policy: CachePolicy::Fill(CachePriority::High), + cache_policy: CachePolicy::Fill(CacheContext::Default), ..Default::default() }, ) @@ -665,7 +665,7 @@ async fn test_reload_storage() { epoch1, ReadOptions { prefetch_options: PrefetchOptions::default(), - cache_policy: CachePolicy::Fill(CachePriority::High), + cache_policy: CachePolicy::Fill(CacheContext::Default), ..Default::default() }, ) @@ -680,7 +680,7 @@ async fn test_reload_storage() { anchor.clone(), epoch1, ReadOptions { - cache_policy: CachePolicy::Fill(CachePriority::High), + cache_policy: CachePolicy::Fill(CacheContext::Default), ..Default::default() }, ) @@ -695,7 +695,7 @@ async fn test_reload_storage() { anchor.clone(), epoch2, ReadOptions { - cache_policy: CachePolicy::Fill(CachePriority::High), + cache_policy: CachePolicy::Fill(CacheContext::Default), ..Default::default() }, ) @@ -716,7 +716,7 @@ async fn test_reload_storage() { epoch2, ReadOptions { prefetch_options: PrefetchOptions::default(), - cache_policy: CachePolicy::Fill(CachePriority::High), + cache_policy: CachePolicy::Fill(CacheContext::Default), ..Default::default() }, ) @@ -752,7 +752,7 @@ async fn test_reload_storage() { // gen_key_from_str(VirtualNode::ZERO, "aa"), // epoch, // ReadOptions { -// cache_policy: CachePolicy::Fill(CachePriority::High), +// cache_policy: CachePolicy::Fill(CacheContext::Default), // ..Default::default() // } // ) @@ -767,7 +767,7 @@ async fn test_reload_storage() { // gen_key_from_str(VirtualNode::ZERO, "bb"), // epoch, // ReadOptions { -// cache_policy: CachePolicy::Fill(CachePriority::High), +// cache_policy: CachePolicy::Fill(CacheContext::Default), // ..Default::default() // } // ) @@ -782,7 +782,7 @@ async fn test_reload_storage() { // gen_key_from_str(VirtualNode::ZERO, "cc"), // epoch, // ReadOptions { -// cache_policy: CachePolicy::Fill(CachePriority::High), +// cache_policy: CachePolicy::Fill(CacheContext::Default), // ..Default::default() // } // ) @@ -799,7 +799,7 @@ async fn test_reload_storage() { // ), // epoch, // ReadOptions { -// cache_policy: CachePolicy::Fill(CachePriority::High), +// cache_policy: CachePolicy::Fill(CacheContext::Default), // ..Default::default() // }, // ) @@ -884,7 +884,7 @@ async fn test_reload_storage() { // gen_key_from_str(VirtualNode::ZERO, "aa"), // epoch, // ReadOptions { -// cache_policy: CachePolicy::Fill(CachePriority::High), +// cache_policy: CachePolicy::Fill(CacheContext::Default), // ..Default::default() // } // ) @@ -898,7 +898,7 @@ async fn test_reload_storage() { // gen_key_from_str(VirtualNode::ZERO, "bb"), // epoch, // ReadOptions { -// cache_policy: CachePolicy::Fill(CachePriority::High), +// cache_policy: CachePolicy::Fill(CacheContext::Default), // ..Default::default() // } // ) @@ -912,7 +912,7 @@ async fn test_reload_storage() { // gen_key_from_str(VirtualNode::ZERO, "cc"), // epoch, // ReadOptions { -// cache_policy: CachePolicy::Fill(CachePriority::High), +// cache_policy: CachePolicy::Fill(CacheContext::Default), // ..Default::default() // } // ) @@ -928,7 +928,7 @@ async fn test_reload_storage() { // ), // epoch, // ReadOptions { -// cache_policy: CachePolicy::Fill(CachePriority::High), +// cache_policy: CachePolicy::Fill(CacheContext::Default), // ..Default::default() // }, // ) @@ -1101,7 +1101,7 @@ async fn test_delete_get_inner( gen_key_from_str(VirtualNode::ZERO, "bb"), epoch2, ReadOptions { - cache_policy: CachePolicy::Fill(CachePriority::High), + cache_policy: CachePolicy::Fill(CacheContext::Default), ..Default::default() } ) @@ -1197,7 +1197,7 @@ async fn test_multiple_epoch_sync_inner( gen_key_from_str(VirtualNode::ZERO, "bb"), epoch1, ReadOptions { - cache_policy: CachePolicy::Fill(CachePriority::High), + cache_policy: CachePolicy::Fill(CacheContext::Default), ..Default::default() } ) @@ -1211,7 +1211,7 @@ async fn test_multiple_epoch_sync_inner( gen_key_from_str(VirtualNode::ZERO, "bb"), epoch2, ReadOptions { - cache_policy: CachePolicy::Fill(CachePriority::High), + cache_policy: CachePolicy::Fill(CacheContext::Default), ..Default::default() } ) @@ -1224,7 +1224,7 @@ async fn test_multiple_epoch_sync_inner( gen_key_from_str(VirtualNode::ZERO, "bb"), epoch3, ReadOptions { - cache_policy: CachePolicy::Fill(CachePriority::High), + cache_policy: CachePolicy::Fill(CacheContext::Default), ..Default::default() } ) @@ -1380,7 +1380,7 @@ async fn test_replicated_local_hummock_storage() { table_id: TableId { table_id: TEST_TABLE_ID.table_id, }, - cache_policy: CachePolicy::Fill(CachePriority::High), + cache_policy: CachePolicy::Fill(CacheContext::Default), ..Default::default() }; diff --git a/src/storage/hummock_test/src/sync_point_tests.rs b/src/storage/hummock_test/src/sync_point_tests.rs index 313bb48c60afd..655852451d93b 100644 --- a/src/storage/hummock_test/src/sync_point_tests.rs +++ b/src/storage/hummock_test/src/sync_point_tests.rs @@ -17,7 +17,7 @@ use std::sync::Arc; use std::time::Duration; use bytes::Bytes; -use risingwave_common::cache::CachePriority; +use foyer::memory::CacheContext; use risingwave_common::catalog::hummock::CompactionFilterFlag; use risingwave_common::catalog::TableId; use risingwave_common::hash::VirtualNode; @@ -431,7 +431,7 @@ async fn test_syncpoints_get_in_delete_range_boundary() { storage.wait_version(version).await; let read_options = ReadOptions { table_id: TableId::from(existing_table_id), - cache_policy: CachePolicy::Fill(CachePriority::High), + cache_policy: CachePolicy::Fill(CacheContext::Default), ..Default::default() }; let get_result = storage diff --git a/src/storage/hummock_trace/Cargo.toml b/src/storage/hummock_trace/Cargo.toml index 61c4b2ea30f1c..ec6c9aa5ed8b9 100644 --- a/src/storage/hummock_trace/Cargo.toml +++ b/src/storage/hummock_trace/Cargo.toml @@ -23,6 +23,7 @@ risingwave_pb = { workspace = true } thiserror = "1.0" tokio = { version = "0.2", package = "madsim-tokio" } tracing = "0.1" +foyer ={ workspace = true } [dev-dependencies] itertools = "0.12" diff --git a/src/storage/hummock_trace/src/opts.rs b/src/storage/hummock_trace/src/opts.rs index 966fd62d34ddc..a0073d620a5bc 100644 --- a/src/storage/hummock_trace/src/opts.rs +++ b/src/storage/hummock_trace/src/opts.rs @@ -13,6 +13,7 @@ // limitations under the License. use bincode::{Decode, Encode}; +use foyer::memory::CacheContext; use risingwave_common::buffer::Bitmap; use risingwave_common::cache::CachePriority; use risingwave_common::catalog::{TableId, TableOption}; @@ -60,6 +61,24 @@ impl From for CachePriority { } } +impl From for TracedCachePriority { + fn from(value: CacheContext) -> Self { + match value { + CacheContext::Default => Self::High, + CacheContext::LruPriorityLow => Self::Low, + } + } +} + +impl From for CacheContext { + fn from(value: TracedCachePriority) -> Self { + match value { + TracedCachePriority::High => Self::Default, + TracedCachePriority::Low => Self::LruPriorityLow, + } + } +} + #[derive(Encode, Decode, PartialEq, Eq, Debug, Clone)] pub struct TracedTableId { pub table_id: u32, diff --git a/src/storage/src/hummock/block_cache.rs b/src/storage/src/hummock/block_cache.rs index 9023f045c0557..81fa7c2dc8ee0 100644 --- a/src/storage/src/hummock/block_cache.rs +++ b/src/storage/src/hummock/block_cache.rs @@ -12,26 +12,24 @@ // See the License for the specific language governing permissions and // limitations under the License. -use std::collections::hash_map::DefaultHasher; -use std::hash::{Hash, Hasher}; use std::ops::Deref; use std::sync::Arc; +use ahash::RandomState; use await_tree::InstrumentAwait; -use futures::Future; -use risingwave_common::cache::{ - CachePriority, CacheableEntry, LookupResponse, LruCache, LruCacheEventListener, +use foyer::memory::{ + Cache, CacheContext, CacheEntry, Entry, EntryState, LruCacheConfig, LruConfig, }; +use futures::Future; use risingwave_hummock_sdk::HummockSstableObjectId; -use tokio::sync::oneshot::Receiver; -use tokio::task::JoinHandle; -use super::{Block, HummockResult}; +use super::{Block, BlockCacheEventListener, HummockResult}; use crate::hummock::HummockError; -const MIN_BUFFER_SIZE_PER_SHARD: usize = 256 * 1024 * 1024; +type CachedBlockEntry = + CacheEntry<(HummockSstableObjectId, u64), Box, BlockCacheEventListener>; -type CachedBlockEntry = CacheableEntry<(HummockSstableObjectId, u64), Box>; +const MIN_BUFFER_SIZE_PER_SHARD: usize = 256 * 1024 * 1024; enum BlockEntry { Cache(#[allow(dead_code)] CachedBlockEntry), @@ -62,7 +60,7 @@ impl BlockHolder { } pub fn from_cached_block(entry: CachedBlockEntry) -> Self { - let ptr = entry.as_ref() as *const _; + let ptr = entry.deref().as_ref() as *const _; Self { _handle: BlockEntry::Cache(entry), block: ptr, @@ -81,62 +79,43 @@ impl Deref for BlockHolder { unsafe impl Send for BlockHolder {} unsafe impl Sync for BlockHolder {} -type BlockCacheEventListener = - Arc>>; - #[derive(Clone)] pub struct BlockCache { - inner: Arc>>, + inner: Cache<(HummockSstableObjectId, u64), Box, BlockCacheEventListener>, } pub enum BlockResponse { Block(BlockHolder), - WaitPendingRequest(Receiver), - Miss(JoinHandle>), + Entry(Entry<(HummockSstableObjectId, u64), Box, HummockError, BlockCacheEventListener>), } impl BlockResponse { pub async fn wait(self) -> HummockResult { - match self { - BlockResponse::Block(block_holder) => Ok(block_holder), - BlockResponse::WaitPendingRequest(receiver) => receiver + let entry = match self { + BlockResponse::Block(block) => return Ok(block), + BlockResponse::Entry(entry) => entry, + }; + match entry.state() { + EntryState::Hit => entry.await.map(BlockHolder::from_cached_block), + EntryState::Wait => entry .verbose_instrument_await("wait_pending_fetch_block") .await - .map_err(|recv_error| recv_error.into()) .map(BlockHolder::from_cached_block), - BlockResponse::Miss(join_handle) => join_handle + EntryState::Miss => entry .verbose_instrument_await("fetch_block") .await - .unwrap() .map(BlockHolder::from_cached_block), } } } impl BlockCache { - pub fn new(capacity: usize, max_shard_bits: usize, high_priority_ratio: usize) -> Self { - Self::new_inner(capacity, max_shard_bits, high_priority_ratio, None) - } - - pub fn with_event_listener( - capacity: usize, - max_shard_bits: usize, - high_priority_ratio: usize, - listener: BlockCacheEventListener, - ) -> Self { - Self::new_inner( - capacity, - max_shard_bits, - high_priority_ratio, - Some(listener), - ) - } - - fn new_inner( + // TODO(MrCroxx): support other cache algorithm + pub fn new( capacity: usize, mut max_shard_bits: usize, high_priority_ratio: usize, - listener: Option, + event_listener: BlockCacheEventListener, ) -> Self { if capacity == 0 { panic!("block cache capacity == 0"); @@ -144,31 +123,31 @@ impl BlockCache { while (capacity >> max_shard_bits) < MIN_BUFFER_SIZE_PER_SHARD && max_shard_bits > 0 { max_shard_bits -= 1; } + let shards = 1 << max_shard_bits; - let cache = match listener { - Some(listener) => LruCache::with_event_listener( - max_shard_bits, - capacity, - high_priority_ratio, - listener, - ), - None => LruCache::new(max_shard_bits, capacity, high_priority_ratio), - }; - - Self { - inner: Arc::new(cache), - } + let cache = Cache::lru(LruCacheConfig { + capacity, + shards, + eviction_config: LruConfig { + high_priority_pool_ratio: high_priority_ratio as f64 / 100.0, + }, + object_pool_capacity: shards * 1024, + hash_builder: RandomState::default(), + event_listener, + }); + + Self { inner: cache } } pub fn get(&self, object_id: HummockSstableObjectId, block_idx: u64) -> Option { self.inner - .lookup(Self::hash(object_id, block_idx), &(object_id, block_idx)) + .get(&(object_id, block_idx)) .map(BlockHolder::from_cached_block) } pub fn exists_block(&self, sst_id: HummockSstableObjectId, block_idx: u64) -> bool { - self.inner - .contains(Self::hash(sst_id, block_idx), &(sst_id, block_idx)) + // TODO(MrCroxx): optimize me + self.get(sst_id, block_idx).is_some() } pub fn insert( @@ -176,14 +155,14 @@ impl BlockCache { object_id: HummockSstableObjectId, block_idx: u64, block: Box, - priority: CachePriority, + context: CacheContext, ) -> BlockHolder { - BlockHolder::from_cached_block(self.inner.insert( + let charge = block.capacity(); + BlockHolder::from_cached_block(self.inner.insert_with_context( (object_id, block_idx), - Self::hash(object_id, block_idx), - block.capacity(), block, - priority, + charge, + context, )) } @@ -191,46 +170,29 @@ impl BlockCache { &self, object_id: HummockSstableObjectId, block_idx: u64, - priority: CachePriority, + context: CacheContext, mut fetch_block: F, ) -> BlockResponse where F: FnMut() -> Fut, Fut: Future>> + Send + 'static, { - let h = Self::hash(object_id, block_idx); let key = (object_id, block_idx); - let lookup_response = - self.inner - .lookup_with_request_dedup::<_, HummockError, _>(h, key, priority, || { - let f = fetch_block(); - async move { - let block = f.await?; - let len = block.capacity(); - Ok((block, len)) - } - }); - match lookup_response { - LookupResponse::Invalid => unreachable!(), - LookupResponse::Cached(entry) => { - BlockResponse::Block(BlockHolder::from_cached_block(entry)) - } - LookupResponse::WaitPendingRequest(receiver) => { - BlockResponse::WaitPendingRequest(receiver) + + let entry = self.inner.entry(key, || { + let f = fetch_block(); + async move { + let block = f.await?; + let len = block.capacity(); + Ok::<_, HummockError>((block, len, context)) } - LookupResponse::Miss(join_handle) => BlockResponse::Miss(join_handle), - } - } + }); - fn hash(object_id: HummockSstableObjectId, block_idx: u64) -> u64 { - let mut hasher = DefaultHasher::default(); - object_id.hash(&mut hasher); - block_idx.hash(&mut hasher); - hasher.finish() + BlockResponse::Entry(entry) } pub fn size(&self) -> usize { - self.inner.get_memory_usage() + self.inner.usage() } #[cfg(any(test, feature = "test"))] diff --git a/src/storage/src/hummock/compactor/shared_buffer_compact.rs b/src/storage/src/hummock/compactor/shared_buffer_compact.rs index cdc88ae68c439..dea4f017f4d48 100644 --- a/src/storage/src/hummock/compactor/shared_buffer_compact.rs +++ b/src/storage/src/hummock/compactor/shared_buffer_compact.rs @@ -17,10 +17,10 @@ use std::ops::Bound; use std::sync::Arc; use bytes::Bytes; +use foyer::memory::CacheContext; use futures::future::try_join_all; use futures::{stream, StreamExt, TryFutureExt}; use itertools::Itertools; -use risingwave_common::cache::CachePriority; use risingwave_common::catalog::TableId; use risingwave_common::hash::VirtualNode; use risingwave_hummock_sdk::compaction_group::StaticCompactionGroupId; @@ -494,7 +494,7 @@ impl SharedBufferCompactRunner { options, super::TaskConfig { key_range, - cache_policy: CachePolicy::Fill(CachePriority::High), + cache_policy: CachePolicy::Fill(CacheContext::Default), gc_delete_keys: GC_DELETE_KEYS_FOR_FLUSH, watermark: GC_WATERMARK_FOR_FLUSH, stats_target_table_ids: None, diff --git a/src/storage/src/hummock/file_cache/store.rs b/src/storage/src/hummock/file_cache/store.rs index e47ed830d2199..8b3b931607719 100644 --- a/src/storage/src/hummock/file_cache/store.rs +++ b/src/storage/src/hummock/file_cache/store.rs @@ -279,7 +279,6 @@ where catalog_bits: config.catalog_bits, admissions, reinsertions: config.reinsertions, - flusher_buffer_size: 131072, // TODO: make it configurable flushers: config.flushers, reclaimers: config.reclaimers, clean_region_threshold: config.reclaimers + config.reclaimers / 2, diff --git a/src/storage/src/hummock/mod.rs b/src/storage/src/hummock/mod.rs index 9efb22675f072..5cf2d831d325e 100644 --- a/src/storage/src/hummock/mod.rs +++ b/src/storage/src/hummock/mod.rs @@ -22,7 +22,7 @@ use risingwave_hummock_sdk::key::{FullKey, TableKey, UserKeyRangeRef}; use risingwave_hummock_sdk::{HummockEpoch, *}; use risingwave_pb::hummock::SstableInfo; -mod block_cache; +pub mod block_cache; pub use block_cache::*; pub mod file_cache; diff --git a/src/storage/src/hummock/sstable/backward_sstable_iterator.rs b/src/storage/src/hummock/sstable/backward_sstable_iterator.rs index e49e04af4f0cc..58c2439771cff 100644 --- a/src/storage/src/hummock/sstable/backward_sstable_iterator.rs +++ b/src/storage/src/hummock/sstable/backward_sstable_iterator.rs @@ -15,7 +15,7 @@ use std::cmp::Ordering::{Equal, Less}; use std::sync::Arc; -use risingwave_common::cache::CachePriority; +use foyer::memory::CacheContext; use risingwave_hummock_sdk::key::FullKey; use crate::hummock::iterator::{Backward, HummockIterator, ValueMeta}; @@ -67,7 +67,7 @@ impl BackwardSstableIterator { .get( &self.sst, idx as usize, - crate::hummock::CachePolicy::Fill(CachePriority::High), + crate::hummock::CachePolicy::Fill(CacheContext::Default), &mut self.stats, ) .await?; diff --git a/src/storage/src/hummock/sstable/forward_sstable_iterator.rs b/src/storage/src/hummock/sstable/forward_sstable_iterator.rs index 5c80aa14e26ac..bcfd7eb86f081 100644 --- a/src/storage/src/hummock/sstable/forward_sstable_iterator.rs +++ b/src/storage/src/hummock/sstable/forward_sstable_iterator.rs @@ -316,9 +316,9 @@ mod tests { use std::collections::Bound; use bytes::Bytes; + use foyer::memory::CacheContext; use itertools::Itertools; use rand::prelude::*; - use risingwave_common::cache::CachePriority; use risingwave_common::catalog::TableId; use risingwave_common::hash::VirtualNode; use risingwave_common::util::epoch::test_epoch; @@ -481,7 +481,7 @@ mod tests { TableKey(Bytes::from(end_key.user_key.table_key.0)), ); let options = Arc::new(SstableIteratorReadOptions { - cache_policy: CachePolicy::Fill(CachePriority::High), + cache_policy: CachePolicy::Fill(CacheContext::Default), must_iterated_end_user_key: Some(Bound::Included(uk.clone())), max_preload_retry_times: 0, prefetch_for_large_query: false, diff --git a/src/storage/src/hummock/sstable/xor_filter.rs b/src/storage/src/hummock/sstable/xor_filter.rs index 8924a1a36002a..e096676008f89 100644 --- a/src/storage/src/hummock/sstable/xor_filter.rs +++ b/src/storage/src/hummock/sstable/xor_filter.rs @@ -442,8 +442,8 @@ impl Clone for XorFilterReader { #[cfg(test)] mod tests { + use foyer::memory::CacheContext; use rand::RngCore; - use risingwave_common::cache::CachePriority; use risingwave_common::util::epoch::test_epoch; use risingwave_hummock_sdk::EpochWithGap; @@ -462,7 +462,7 @@ mod tests { let writer_opts = SstableWriterOptions { capacity_hint: None, tracker: None, - policy: CachePolicy::Fill(CachePriority::High), + policy: CachePolicy::Fill(CacheContext::Default), }; let opts = SstableBuilderOptions { capacity: 0, @@ -510,7 +510,7 @@ mod tests { .get_block_response( &sstable, idx, - CachePolicy::Fill(CachePriority::High), + CachePolicy::Fill(CacheContext::Default), &mut stat, ) .await diff --git a/src/storage/src/hummock/sstable_store.rs b/src/storage/src/hummock/sstable_store.rs index 1094558c93305..2640321f552b3 100644 --- a/src/storage/src/hummock/sstable_store.rs +++ b/src/storage/src/hummock/sstable_store.rs @@ -18,14 +18,16 @@ use std::ops::Deref; use std::sync::atomic::{AtomicUsize, Ordering}; use std::sync::Arc; +use ahash::RandomState; use await_tree::InstrumentAwait; use bytes::Bytes; use fail::fail_point; +use foyer::memory::{ + Cache, CacheContext, CacheEntry, CacheEventListener, EntryState, Key, LruCacheConfig, + LruConfig, Value, +}; use futures::{future, StreamExt}; use itertools::Itertools; -use risingwave_common::cache::{ - CachePriority, LookupResponse, LruCacheEventListener, LruKey, LruValue, -}; use risingwave_common::config::StorageMemoryConfig; use risingwave_hummock_sdk::{HummockSstableObjectId, OBJECT_SUFFIX}; use risingwave_hummock_trace::TracedCachePolicy; @@ -48,16 +50,14 @@ use crate::hummock::block_stream::{ }; use crate::hummock::file_cache::preclude::*; use crate::hummock::multi_builder::UploadJoinHandle; -use crate::hummock::{ - BlockHolder, CacheableEntry, HummockError, HummockResult, LruCache, MemoryLimiter, -}; +use crate::hummock::{BlockHolder, HummockError, HummockResult, MemoryLimiter}; use crate::monitor::{HummockStateStoreMetrics, MemoryCollector, StoreLocalStatistic}; const MAX_META_CACHE_SHARD_BITS: usize = 2; const MAX_CACHE_SHARD_BITS: usize = 6; // It means that there will be 64 shards lru-cache to avoid lock conflict. const MIN_BUFFER_SIZE_PER_SHARD: usize = 256 * 1024 * 1024; // 256MB -pub type TableHolder = CacheableEntry>; +pub type TableHolder = CacheEntry, MetaCacheEventListener>; // TODO: Define policy based on use cases (read / compaction / ...). #[derive(Clone, Copy, Eq, PartialEq)] @@ -65,7 +65,7 @@ pub enum CachePolicy { /// Disable read cache and not fill the cache afterwards. Disable, /// Try reading the cache and fill the cache afterwards. - Fill(CachePriority), + Fill(CacheContext), /// Fill file cache only. FillFileCache, /// Read the cache but not fill the cache afterwards. @@ -74,7 +74,7 @@ pub enum CachePolicy { impl Default for CachePolicy { fn default() -> Self { - CachePolicy::Fill(CachePriority::High) + CachePolicy::Fill(CacheContext::Default) } } @@ -100,16 +100,31 @@ impl From for TracedCachePolicy { } } -struct BlockCacheEventListener { +pub struct BlockCacheEventListener { data_file_cache: FileCache, metrics: Arc, } -impl LruCacheEventListener for BlockCacheEventListener { - type K = (u64, u64); - type T = Box; +impl BlockCacheEventListener { + pub fn new( + data_file_cache: FileCache, + metrics: Arc, + ) -> Self { + Self { + data_file_cache, + metrics, + } + } +} - fn on_release(&self, key: Self::K, value: Self::T) { +impl CacheEventListener<(HummockSstableObjectId, u64), Box> for BlockCacheEventListener { + fn on_release( + &self, + key: (HummockSstableObjectId, u64), + value: Box, + _context: CacheContext, + _charges: usize, + ) { let key = SstableBlockIndex { sst_id: key.0, block_idx: key.1, @@ -126,13 +141,22 @@ impl LruCacheEventListener for BlockCacheEventListener { } } -struct MetaCacheEventListener(FileCache); +pub struct MetaCacheEventListener(FileCache); -impl LruCacheEventListener for MetaCacheEventListener { - type K = HummockSstableObjectId; - type T = Box; +impl From> for MetaCacheEventListener { + fn from(value: FileCache) -> Self { + Self(value) + } +} - fn on_release(&self, key: Self::K, value: Self::T) { +impl CacheEventListener> for MetaCacheEventListener { + fn on_release( + &self, + key: HummockSstableObjectId, + value: Box, + _context: CacheContext, + _charges: usize, + ) { // temporarily avoid spawn task while task drop with madsim // FYI: https://github.com/madsim-rs/madsim/issues/182 #[cfg(not(madsim))] @@ -140,19 +164,21 @@ impl LruCacheEventListener for MetaCacheEventListener { } } -pub enum CachedOrShared +pub enum CachedOrShared where - K: LruKey, - V: LruValue, + K: Key, + V: Value, + L: CacheEventListener>, { - Cached(CacheableEntry>), + Cached(CacheEntry, L>), Shared(Arc), } -impl Deref for CachedOrShared +impl Deref for CachedOrShared where - K: LruKey, - V: LruValue, + K: Key, + V: Value, + L: CacheEventListener>, { type Target = V; @@ -182,7 +208,8 @@ pub struct SstableStore { path: String, store: ObjectStoreRef, block_cache: BlockCache, - meta_cache: Arc>>, + // TODO(MrCroxx): use no hash random state + meta_cache: Arc, MetaCacheEventListener>>, data_file_cache: FileCache, meta_file_cache: FileCache, @@ -199,32 +226,38 @@ impl SstableStore { pub fn new(config: SstableStoreConfig) -> Self { // TODO: We should validate path early. Otherwise object store won't report invalid path // error until first write attempt. - let mut shard_bits = MAX_META_CACHE_SHARD_BITS; - while (config.meta_cache_capacity >> shard_bits) < MIN_BUFFER_SIZE_PER_SHARD - && shard_bits > 0 + let mut meta_cache_shard_bits = MAX_META_CACHE_SHARD_BITS; + while (config.meta_cache_capacity >> meta_cache_shard_bits) < MIN_BUFFER_SIZE_PER_SHARD + && meta_cache_shard_bits > 0 { - shard_bits -= 1; + meta_cache_shard_bits -= 1; } - let block_cache_listener = Arc::new(BlockCacheEventListener { - data_file_cache: config.data_file_cache.clone(), - metrics: config.state_store_metrics, - }); - let meta_cache_listener = Arc::new(MetaCacheEventListener(config.meta_file_cache.clone())); + + let block_cache = BlockCache::new( + config.block_cache_capacity, + MAX_CACHE_SHARD_BITS, + config.high_priority_ratio, + BlockCacheEventListener::new( + config.data_file_cache.clone(), + config.state_store_metrics.clone(), + ), + ); + // TODO(MrCroxx): support other cache algorithm + let meta_cache = Arc::new(Cache::lru(LruCacheConfig { + capacity: config.meta_cache_capacity, + shards: 1 << meta_cache_shard_bits, + eviction_config: LruConfig { + high_priority_pool_ratio: 0.0, + }, + object_pool_capacity: (1 << meta_cache_shard_bits) * 1024, + hash_builder: RandomState::default(), + event_listener: MetaCacheEventListener::from(config.meta_file_cache.clone()), + })); Self { path: config.path, store: config.store, - block_cache: BlockCache::with_event_listener( - config.block_cache_capacity, - MAX_CACHE_SHARD_BITS, - config.high_priority_ratio, - block_cache_listener, - ), - meta_cache: Arc::new(LruCache::with_event_listener( - shard_bits, - config.meta_cache_capacity, - 0, - meta_cache_listener, - )), + block_cache, + meta_cache, data_file_cache: config.data_file_cache, meta_file_cache: config.meta_file_cache, @@ -244,11 +277,29 @@ impl SstableStore { block_cache_capacity: usize, meta_cache_capacity: usize, ) -> Self { - let meta_cache = Arc::new(LruCache::new(0, meta_cache_capacity, 0)); + // TODO(MrCroxx): support other cache algorithm + let meta_cache = Arc::new(Cache::lru(LruCacheConfig { + capacity: meta_cache_capacity, + shards: 1, + eviction_config: LruConfig { + high_priority_pool_ratio: 0.0, + }, + object_pool_capacity: 1024, + hash_builder: RandomState::default(), + event_listener: FileCache::none().into(), + })); Self { path, store, - block_cache: BlockCache::new(block_cache_capacity, 0, 0), + block_cache: BlockCache::new( + block_cache_capacity, + 0, + 0, + BlockCacheEventListener::new( + FileCache::none(), + Arc::new(HummockStateStoreMetrics::unused()), + ), + ), meta_cache, data_file_cache: FileCache::none(), meta_file_cache: FileCache::none(), @@ -264,7 +315,7 @@ impl SstableStore { self.store .delete(self.get_sst_data_path(object_id).as_str()) .await?; - self.meta_cache.erase(object_id, &object_id); + self.meta_cache.remove(&object_id); self.meta_file_cache .remove(&object_id) .map_err(HummockError::file_cache)?; @@ -286,7 +337,7 @@ impl SstableStore { // Delete from cache. for &object_id in object_id_list { - self.meta_cache.erase(object_id, &object_id); + self.meta_cache.remove(&object_id); self.meta_file_cache .remove(&object_id) .map_err(HummockError::file_cache)?; @@ -296,7 +347,7 @@ impl SstableStore { } pub fn delete_cache(&self, object_id: HummockSstableObjectId) { - self.meta_cache.erase(object_id, &object_id); + self.meta_cache.remove(&object_id); if let Err(e) = self.meta_file_cache.remove(&object_id) { tracing::warn!(error = %e.as_report(), "meta file cache remove error"); } @@ -408,7 +459,7 @@ impl SstableStore { let cache_priority = if idx == block_index { priority } else { - CachePriority::Low + CacheContext::LruPriorityLow }; self.block_cache .insert(object_id, idx as u64, Box::new(block), cache_priority) @@ -588,8 +639,10 @@ impl SstableStore { pub async fn sstable_cached( &self, sst_obj_id: HummockSstableObjectId, - ) -> HummockResult>> { - if let Some(sst) = self.meta_cache.lookup(sst_obj_id, &sst_obj_id) { + ) -> HummockResult< + Option>, + > { + if let Some(sst) = self.meta_cache.get(&sst_obj_id) { return Ok(Some(CachedOrShared::Cached(sst))); } @@ -612,50 +665,43 @@ impl SstableStore { stats: &mut StoreLocalStatistic, ) -> impl Future> + Send + 'static { let object_id = sst.get_object_id(); - let lookup_response = self - .meta_cache - .lookup_with_request_dedup::<_, HummockError, _>( - object_id, - object_id, - CachePriority::High, - || { - let meta_file_cache = self.meta_file_cache.clone(); - let store = self.store.clone(); - let meta_path = self.get_sst_data_path(object_id); - let stats_ptr = stats.remote_io_time.clone(); - let range = sst.meta_offset as usize..sst.file_size as usize; - async move { - if let Some(sst) = meta_file_cache - .lookup(&object_id) - .await - .map_err(HummockError::file_cache)? - { - // TODO(MrCroxx): Make meta cache receives Arc to reduce copy? - let sst: Box = sst.into(); - let charge = sst.estimate_size(); - return Ok((sst, charge)); - } - - let now = Instant::now(); - let buf = store.read(&meta_path, range).await?; - let meta = SstableMeta::decode(&buf[..])?; - - let sst = Sstable::new(object_id, meta); - let charge = sst.estimate_size(); - let add = (now.elapsed().as_secs_f64() * 1000.0).ceil(); - stats_ptr.fetch_add(add as u64, Ordering::Relaxed); - Ok((Box::new(sst), charge)) - } - }, - ); - match &lookup_response { - LookupResponse::Miss(_) | LookupResponse::WaitPendingRequest(_) => { - stats.cache_meta_block_miss += 1; + let entry = self.meta_cache.entry(object_id, || { + let meta_file_cache = self.meta_file_cache.clone(); + let store = self.store.clone(); + let meta_path = self.get_sst_data_path(object_id); + let stats_ptr = stats.remote_io_time.clone(); + let range = sst.meta_offset as usize..sst.file_size as usize; + async move { + if let Some(sst) = meta_file_cache + .lookup(&object_id) + .await + .map_err(HummockError::file_cache)? + { + // TODO(MrCroxx): Make meta cache receives Arc to reduce copy? + let sst: Box = sst.into(); + let charge = sst.estimate_size(); + return Ok((sst, charge, CacheContext::Default)); + } + + let now = Instant::now(); + let buf = store.read(&meta_path, range).await?; + let meta = SstableMeta::decode(&buf[..])?; + + let sst = Sstable::new(object_id, meta); + let charge = sst.estimate_size(); + let add = (now.elapsed().as_secs_f64() * 1000.0).ceil(); + stats_ptr.fetch_add(add as u64, Ordering::Relaxed); + Ok((Box::new(sst), charge, CacheContext::Default)) } - _ => (), + }); + + if matches! { entry.state(), EntryState::Wait | EntryState::Miss } { + stats.cache_meta_block_miss += 1; } + stats.cache_meta_block_total += 1; - lookup_response + + entry } pub async fn list_object_metadata_from_object_store( @@ -680,12 +726,11 @@ impl SstableStore { pub fn insert_meta_cache(&self, object_id: HummockSstableObjectId, meta: SstableMeta) { let sst = Sstable::new(object_id, meta); let charge = sst.estimate_size(); - self.meta_cache.insert( - object_id, + self.meta_cache.insert_with_context( object_id, - charge, Box::new(sst), - CachePriority::High, + charge, + CacheContext::Default, ); } @@ -699,11 +744,11 @@ impl SstableStore { filter.extend([(object_id, usize::MAX), (object_id, block_index as usize)]); } self.block_cache - .insert(object_id, block_index, block, CachePriority::High); + .insert(object_id, block_index, block, CacheContext::Default); } pub fn get_meta_memory_usage(&self) -> u64 { - self.meta_cache.get_memory_usage() as u64 + self.meta_cache.usage() as u64 } pub async fn get_stream_for_blocks( diff --git a/src/storage/src/hummock/test_utils.rs b/src/storage/src/hummock/test_utils.rs index 2cd7104cf74ef..b6c658e0fa63f 100644 --- a/src/storage/src/hummock/test_utils.rs +++ b/src/storage/src/hummock/test_utils.rs @@ -17,8 +17,8 @@ use std::ops::Bound; use std::sync::Arc; use bytes::Bytes; +use foyer::memory::CacheContext; use itertools::Itertools; -use risingwave_common::cache::CachePriority; use risingwave_common::catalog::TableId; use risingwave_common::hash::VirtualNode; use risingwave_common::util::epoch::test_epoch; @@ -322,7 +322,7 @@ pub async fn gen_test_sstable_with_range_tombstone( kv_iter, range_tombstones, sstable_store.clone(), - CachePolicy::Fill(CachePriority::High), + CachePolicy::Fill(CacheContext::Default), ) .await } diff --git a/src/storage/src/hummock/utils.rs b/src/storage/src/hummock/utils.rs index 09c4a7eef0e48..82af4beadf761 100644 --- a/src/storage/src/hummock/utils.rs +++ b/src/storage/src/hummock/utils.rs @@ -21,7 +21,7 @@ use std::sync::Arc; use std::time::Duration; use bytes::Bytes; -use risingwave_common::cache::CachePriority; +use foyer::memory::CacheContext; use risingwave_common::catalog::{TableId, TableOption}; use risingwave_hummock_sdk::key::{ bound_table_key_range, EmptySliceRef, FullKey, TableKey, UserKey, @@ -387,7 +387,7 @@ pub(crate) async fn do_insert_sanity_check( let read_options = ReadOptions { retention_seconds: table_option.retention_seconds, table_id, - cache_policy: CachePolicy::Fill(CachePriority::High), + cache_policy: CachePolicy::Fill(CacheContext::Default), ..Default::default() }; let stored_value = inner.get(key.clone(), epoch, read_options).await?; @@ -419,7 +419,7 @@ pub(crate) async fn do_delete_sanity_check( let read_options = ReadOptions { retention_seconds: table_option.retention_seconds, table_id, - cache_policy: CachePolicy::Fill(CachePriority::High), + cache_policy: CachePolicy::Fill(CacheContext::Default), ..Default::default() }; match inner.get(key.clone(), epoch, read_options).await? { @@ -461,7 +461,7 @@ pub(crate) async fn do_update_sanity_check( let read_options = ReadOptions { retention_seconds: table_option.retention_seconds, table_id, - cache_policy: CachePolicy::Fill(CachePriority::High), + cache_policy: CachePolicy::Fill(CacheContext::Default), ..Default::default() }; diff --git a/src/storage/src/table/batch_table/storage_table.rs b/src/storage/src/table/batch_table/storage_table.rs index 310d0a842463e..cfed20ddb3ba2 100644 --- a/src/storage/src/table/batch_table/storage_table.rs +++ b/src/storage/src/table/batch_table/storage_table.rs @@ -20,12 +20,12 @@ use std::sync::Arc; use auto_enums::auto_enum; use await_tree::InstrumentAwait; use bytes::Bytes; +use foyer::memory::CacheContext; use futures::future::try_join_all; use futures::{Stream, StreamExt}; use futures_async_stream::try_stream; use itertools::{Either, Itertools}; use risingwave_common::buffer::Bitmap; -use risingwave_common::cache::CachePriority; use risingwave_common::catalog::{ColumnDesc, ColumnId, Schema, TableId, TableOption}; use risingwave_common::hash::{VirtualNode, VnodeBitmapExt}; use risingwave_common::row::{self, OwnedRow, Row, RowExt}; @@ -359,7 +359,7 @@ impl StorageTableInner { retention_seconds: self.table_option.retention_seconds, table_id: self.table_id, read_version_from_backup: read_backup, - cache_policy: CachePolicy::Fill(CachePriority::High), + cache_policy: CachePolicy::Fill(CacheContext::Default), ..Default::default() }; if let Some(value) = self.store.get(serialized_pk, epoch, read_options).await? { @@ -446,8 +446,8 @@ impl StorageTableInner { ) { // To prevent unbounded range scan queries from polluting the block cache, use the // low priority fill policy. - (Unbounded, _) | (_, Unbounded) => CachePolicy::Fill(CachePriority::Low), - _ => CachePolicy::Fill(CachePriority::High), + (Unbounded, _) | (_, Unbounded) => CachePolicy::Fill(CacheContext::LruPriorityLow), + _ => CachePolicy::Fill(CacheContext::Default), }; let table_key_ranges = { diff --git a/src/stream/Cargo.toml b/src/stream/Cargo.toml index 77a96dcbe0c35..9d66de9191d7e 100644 --- a/src/stream/Cargo.toml +++ b/src/stream/Cargo.toml @@ -28,6 +28,7 @@ educe = "0.5" either = "1" enum-as-inner = "0.6" fail = "0.5" +foyer = { workspace = true } futures = { version = "0.3", default-features = false, features = ["alloc"] } futures-async-stream = { workspace = true } governor = { version = "0.6", default-features = false, features = [ diff --git a/src/stream/src/common/log_store_impl/kv_log_store/reader.rs b/src/stream/src/common/log_store_impl/kv_log_store/reader.rs index 756df5fff48ce..3e9e9ad4833e8 100644 --- a/src/stream/src/common/log_store_impl/kv_log_store/reader.rs +++ b/src/stream/src/common/log_store_impl/kv_log_store/reader.rs @@ -18,11 +18,11 @@ use std::pin::Pin; use std::time::Duration; use anyhow::anyhow; +use foyer::memory::CacheContext; use futures::future::{try_join_all, BoxFuture}; use futures::{FutureExt, TryFutureExt}; use risingwave_common::array::StreamChunk; use risingwave_common::buffer::Bitmap; -use risingwave_common::cache::CachePriority; use risingwave_common::catalog::TableId; use risingwave_common::hash::VnodeBitmapExt; use risingwave_common::metrics::{LabelGuardedHistogram, LabelGuardedIntCounter}; @@ -216,7 +216,7 @@ impl KvLogStoreReader { ReadOptions { // This stream lives too long, the connection of prefetch object may break. So use a short connection prefetch. prefetch_options: PrefetchOptions::prefetch_for_small_range_scan(), - cache_policy: CachePolicy::Fill(CachePriority::Low), + cache_policy: CachePolicy::Fill(CacheContext::LruPriorityLow), table_id, ..Default::default() }, @@ -358,7 +358,7 @@ impl LogReader for KvLogStoreReader { ReadOptions { prefetch_options: PrefetchOptions::prefetch_for_large_range_scan(), - cache_policy: CachePolicy::Fill(CachePriority::Low), + cache_policy: CachePolicy::Fill(CacheContext::LruPriorityLow), table_id, ..Default::default() }, diff --git a/src/stream/src/common/table/state_table.rs b/src/stream/src/common/table/state_table.rs index 4ed6a1ebd6593..730e94c5345da 100644 --- a/src/stream/src/common/table/state_table.rs +++ b/src/stream/src/common/table/state_table.rs @@ -20,13 +20,13 @@ use std::sync::Arc; use bytes::{BufMut, Bytes, BytesMut}; use either::Either; +use foyer::memory::CacheContext; use futures::{pin_mut, FutureExt, Stream, StreamExt, TryStreamExt}; use futures_async_stream::for_await; use itertools::{izip, Itertools}; use risingwave_common::array::stream_record::Record; use risingwave_common::array::{ArrayImplBuilder, ArrayRef, DataChunk, Op, StreamChunk}; use risingwave_common::buffer::Bitmap; -use risingwave_common::cache::CachePriority; use risingwave_common::catalog::{ get_dist_key_in_pk_indices, ColumnDesc, ColumnId, TableId, TableOption, }; @@ -753,7 +753,7 @@ where prefix_hint, retention_seconds: self.table_option.retention_seconds, table_id: self.table_id, - cache_policy: CachePolicy::Fill(CachePriority::High), + cache_policy: CachePolicy::Fill(CacheContext::Default), ..Default::default() }; @@ -1315,7 +1315,7 @@ where retention_seconds: self.table_option.retention_seconds, table_id: self.table_id, prefetch_options, - cache_policy: CachePolicy::Fill(CachePriority::High), + cache_policy: CachePolicy::Fill(CacheContext::Default), ..Default::default() }; @@ -1439,7 +1439,7 @@ where let read_options = ReadOptions { prefix_hint, table_id: self.table_id, - cache_policy: CachePolicy::Fill(CachePriority::High), + cache_policy: CachePolicy::Fill(CacheContext::Default), ..Default::default() }; diff --git a/src/tests/compaction_test/Cargo.toml b/src/tests/compaction_test/Cargo.toml index ce63e7a19e9c6..e73798830d237 100644 --- a/src/tests/compaction_test/Cargo.toml +++ b/src/tests/compaction_test/Cargo.toml @@ -19,6 +19,7 @@ anyhow = "1" async-trait = "0.1" bytes = "1" clap = { version = "4", features = ["derive"] } +foyer = { workspace = true } futures = { version = "0.3", default-features = false, features = ["alloc"] } prometheus = { version = "0.13" } rand = "0.8" diff --git a/src/tests/compaction_test/src/compaction_test_runner.rs b/src/tests/compaction_test/src/compaction_test_runner.rs index 948678572e3b4..9132c0e0735ab 100644 --- a/src/tests/compaction_test/src/compaction_test_runner.rs +++ b/src/tests/compaction_test/src/compaction_test_runner.rs @@ -23,7 +23,7 @@ use std::time::Duration; use anyhow::anyhow; use bytes::{BufMut, Bytes, BytesMut}; use clap::Parser; -use risingwave_common::cache::CachePriority; +use foyer::memory::CacheContext; use risingwave_common::catalog::TableId; use risingwave_common::config::{ extract_storage_memory_config, load_config, MetaConfig, NoOverride, @@ -631,7 +631,7 @@ async fn open_hummock_iters( epoch, ReadOptions { table_id: TableId { table_id }, - cache_policy: CachePolicy::Fill(CachePriority::High), + cache_policy: CachePolicy::Fill(CacheContext::Default), ..Default::default() }, ) diff --git a/src/tests/compaction_test/src/delete_range_runner.rs b/src/tests/compaction_test/src/delete_range_runner.rs index d0a75c570eff4..46052015ac688 100644 --- a/src/tests/compaction_test/src/delete_range_runner.rs +++ b/src/tests/compaction_test/src/delete_range_runner.rs @@ -20,9 +20,9 @@ use std::sync::Arc; use std::time::{Duration, SystemTime}; use bytes::Bytes; +use foyer::memory::CacheContext; use rand::rngs::StdRng; use rand::{RngCore, SeedableRng}; -use risingwave_common::cache::CachePriority; use risingwave_common::catalog::TableId; use risingwave_common::config::{ extract_storage_memory_config, load_config, NoOverride, ObjectStoreConfig, RwConfig, @@ -439,7 +439,7 @@ impl NormalState { ReadOptions { ignore_range_tombstone, table_id: self.table_id, - cache_policy: CachePolicy::Fill(CachePriority::High), + cache_policy: CachePolicy::Fill(CacheContext::Default), ..Default::default() }, ) @@ -465,7 +465,7 @@ impl NormalState { table_id: self.table_id, read_version_from_backup: false, prefetch_options: PrefetchOptions::default(), - cache_policy: CachePolicy::Fill(CachePriority::High), + cache_policy: CachePolicy::Fill(CacheContext::Default), ..Default::default() }, ) @@ -496,7 +496,7 @@ impl CheckState for NormalState { table_id: self.table_id, read_version_from_backup: false, prefetch_options: PrefetchOptions::default(), - cache_policy: CachePolicy::Fill(CachePriority::High), + cache_policy: CachePolicy::Fill(CacheContext::Default), ..Default::default() }, ) diff --git a/src/workspace-hack/Cargo.toml b/src/workspace-hack/Cargo.toml index b58b8c04cfc7f..bd33f5268aedb 100644 --- a/src/workspace-hack/Cargo.toml +++ b/src/workspace-hack/Cargo.toml @@ -41,6 +41,7 @@ clap = { version = "4", features = ["cargo", "derive", "env"] } clap_builder = { version = "4", default-features = false, features = ["cargo", "color", "env", "help", "std", "suggestions", "usage"] } combine = { version = "4", features = ["tokio"] } crossbeam-epoch = { version = "0.9" } +crossbeam-queue = { version = "0.3" } crossbeam-utils = { version = "0.8" } crypto-bigint = { version = "0.5", features = ["generic-array", "zeroize"] } deranged = { version = "0.3", default-features = false, features = ["powerfmt", "serde", "std"] } @@ -125,7 +126,7 @@ serde_json = { version = "1", features = ["alloc", "raw_value"] } serde_with = { version = "3", features = ["json"] } sha1 = { version = "0.10" } sha2 = { version = "0.10", features = ["oid"] } -smallvec = { version = "1", default-features = false, features = ["serde", "union", "write"] } +smallvec = { version = "1", default-features = false, features = ["const_new", "serde", "union", "write"] } sqlx = { version = "0.7", default-features = false, features = ["bigdecimal", "chrono", "json", "mysql", "postgres", "runtime-tokio-native-tls", "rust_decimal", "sqlite", "time", "uuid"] } sqlx-core = { version = "0.7", features = ["_rt-tokio", "_tls-native-tls", "bigdecimal", "chrono", "json", "migrate", "offline", "rust_decimal", "time", "uuid"] } sqlx-mysql = { version = "0.7", default-features = false, features = ["bigdecimal", "chrono", "json", "rust_decimal", "time", "uuid"] }