Skip to content

Commit

Permalink
feat(storage): replace risingwave lru cache with foyer lru cache for …
Browse files Browse the repository at this point in the history
…lsm cache (#15382)

Signed-off-by: MrCroxx <[email protected]>
  • Loading branch information
MrCroxx authored Mar 14, 2024
1 parent 0e460ca commit 0e16ac4
Show file tree
Hide file tree
Showing 35 changed files with 448 additions and 407 deletions.
141 changes: 76 additions & 65 deletions Cargo.lock

Large diffs are not rendered by default.

7 changes: 2 additions & 5 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,7 @@ license = "Apache-2.0"
repository = "https://github.com/risingwavelabs/risingwave"

[workspace.dependencies]
foyer = "0.6"
await-tree = "0.1.1"
aws-config = { version = "1", default-features = false, features = [
"behavior-version-latest",
Expand Down Expand Up @@ -113,11 +114,7 @@ hytra = "0.1"
rdkafka = { package = "madsim-rdkafka", version = "0.3.1", features = [
"cmake-build",
] }
hashbrown = { version = "0.14.0", features = [
"ahash",
"inline-more",
"nightly",
] }
hashbrown = { version = "0.14", features = ["ahash", "inline-more", "nightly"] }
criterion = { version = "0.5", features = ["async_futures"] }
tonic = { package = "madsim-tonic", version = "0.4.1" }
tonic-build = { package = "madsim-tonic-build", version = "0.4.2" }
Expand Down
1 change: 1 addition & 0 deletions src/batch/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ assert_matches = "1"
async-recursion = "1"
async-trait = "0.1"
either = "1"
foyer = { workspace = true }
futures = { version = "0.3", default-features = false, features = ["alloc"] }
futures-async-stream = { workspace = true }
futures-util = "0.3"
Expand Down
1 change: 1 addition & 0 deletions src/batch/src/executor/hash_agg.rs
Original file line number Diff line number Diff line change
Expand Up @@ -466,6 +466,7 @@ mod tests {

/// A test to verify that `HashMap` may leak memory counter when using `into_iter`.
#[test]
#[should_panic] // TODO(MrCroxx): This bug is fixed and the test should panic. Remove the test and fix the related code later.
fn test_hashmap_into_iter_bug() {
let dropped: Arc<AtomicBool> = Arc::new(AtomicBool::new(false));

Expand Down
4 changes: 2 additions & 2 deletions src/batch/src/executor/insert.rs
Original file line number Diff line number Diff line change
Expand Up @@ -267,10 +267,10 @@ mod tests {
use std::sync::Arc;

use assert_matches::assert_matches;
use foyer::memory::CacheContext;
use futures::StreamExt;
use itertools::Itertools;
use risingwave_common::array::{Array, ArrayImpl, I32Array, StructArray};
use risingwave_common::cache::CachePriority;
use risingwave_common::catalog::{
schema_test_utils, ColumnDesc, ColumnId, INITIAL_TABLE_VERSION_ID,
};
Expand Down Expand Up @@ -400,7 +400,7 @@ mod tests {
epoch,
None,
ReadOptions {
cache_policy: CachePolicy::Fill(CachePriority::High),
cache_policy: CachePolicy::Fill(CacheContext::Default),
..Default::default()
},
)
Expand Down
3 changes: 2 additions & 1 deletion src/storage/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ ignored = ["workspace-hack"]
normal = ["workspace-hack"]

[dependencies]
ahash = "0.8"
anyhow = "1"
arc-swap = "1"
async-trait = "0.1"
Expand All @@ -26,7 +27,7 @@ dyn-clone = "1.0.14"
either = "1"
enum-as-inner = "0.6"
fail = "0.5"
foyer = "0.3"
foyer = { workspace = true }
futures = { version = "0.3", default-features = false, features = ["alloc"] }
futures-async-stream = { workspace = true }
hex = "0.4"
Expand Down
10 changes: 5 additions & 5 deletions src/storage/benches/bench_compactor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ use std::sync::Arc;

use criterion::async_executor::FuturesExecutor;
use criterion::{criterion_group, criterion_main, Criterion};
use risingwave_common::cache::CachePriority;
use foyer::memory::CacheContext;
use risingwave_common::catalog::{ColumnDesc, ColumnId, TableId};
use risingwave_common::config::{MetricLevel, ObjectStoreConfig};
use risingwave_common::hash::VirtualNode;
Expand Down Expand Up @@ -78,7 +78,7 @@ pub fn default_writer_opts() -> SstableWriterOptions {
SstableWriterOptions {
capacity_hint: None,
tracker: None,
policy: CachePolicy::Fill(CachePriority::High),
policy: CachePolicy::Fill(CacheContext::Default),
}
}

Expand Down Expand Up @@ -115,7 +115,7 @@ async fn build_table(
SstableWriterOptions {
capacity_hint: None,
tracker: None,
policy: CachePolicy::Fill(CachePriority::High),
policy: CachePolicy::Fill(CacheContext::Default),
},
);
let mut builder =
Expand Down Expand Up @@ -159,7 +159,7 @@ async fn build_table_2(
SstableWriterOptions {
capacity_hint: None,
tracker: None,
policy: CachePolicy::Fill(CachePriority::High),
policy: CachePolicy::Fill(CacheContext::Default),
},
);
let mut builder =
Expand Down Expand Up @@ -313,7 +313,7 @@ fn bench_merge_iterator_compactor(c: &mut Criterion) {
});
let level2 = vec![info1, info2];
let read_options = Arc::new(SstableIteratorReadOptions {
cache_policy: CachePolicy::Fill(CachePriority::High),
cache_policy: CachePolicy::Fill(CacheContext::Default),
prefetch_for_large_query: false,
must_iterated_end_user_key: None,
max_preload_retry_times: 0,
Expand Down
1 change: 1 addition & 0 deletions src/storage/hummock_test/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ async-trait = "0.1"
bytes = { version = "1" }
clap = { version = "4", features = ["derive"] }
fail = "0.5"
foyer = { workspace = true }
futures = { version = "0.3", default-features = false, features = ["alloc"] }
futures-async-stream = "0.2.9"
itertools = "0.12"
Expand Down
4 changes: 2 additions & 2 deletions src/storage/hummock_test/benches/bench_hummock_iter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,8 @@ use std::sync::Arc;

use bytes::Bytes;
use criterion::{criterion_group, criterion_main, Criterion};
use foyer::memory::CacheContext;
use futures::pin_mut;
use risingwave_common::cache::CachePriority;
use risingwave_common::util::epoch::test_epoch;
use risingwave_hummock_sdk::key::TableKey;
use risingwave_hummock_sdk::HummockEpoch;
Expand Down Expand Up @@ -110,7 +110,7 @@ fn criterion_benchmark(c: &mut Criterion) {
ReadOptions {
ignore_range_tombstone: true,
prefetch_options: PrefetchOptions::default(),
cache_policy: CachePolicy::Fill(CachePriority::High),
cache_policy: CachePolicy::Fill(CacheContext::Default),
..Default::default()
},
))
Expand Down
14 changes: 7 additions & 7 deletions src/storage/hummock_test/src/compactor_tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,10 +21,10 @@ pub(crate) mod tests {
use std::sync::Arc;

use bytes::{BufMut, Bytes, BytesMut};
use foyer::memory::CacheContext;
use itertools::Itertools;
use rand::{Rng, RngCore, SeedableRng};
use risingwave_common::buffer::BitmapBuilder;
use risingwave_common::cache::CachePriority;
use risingwave_common::catalog::TableId;
use risingwave_common::constants::hummock::CompactionFilterFlag;
use risingwave_common::hash::VirtualNode;
Expand Down Expand Up @@ -383,7 +383,7 @@ pub(crate) mod tests {
TableKey(key.clone()),
read_epoch,
ReadOptions {
cache_policy: CachePolicy::Fill(CachePriority::High),
cache_policy: CachePolicy::Fill(CacheContext::Default),
..Default::default()
},
)
Expand All @@ -397,7 +397,7 @@ pub(crate) mod tests {
((TEST_WATERMARK - 1) * 1000) << 16,
ReadOptions {
prefix_hint: Some(key.clone()),
cache_policy: CachePolicy::Fill(CachePriority::High),
cache_policy: CachePolicy::Fill(CacheContext::Default),
..Default::default()
},
)
Expand Down Expand Up @@ -521,7 +521,7 @@ pub(crate) mod tests {
TableKey(key.clone()),
get_epoch,
ReadOptions {
cache_policy: CachePolicy::Fill(CachePriority::High),
cache_policy: CachePolicy::Fill(CacheContext::Default),
..Default::default()
},
)
Expand Down Expand Up @@ -871,7 +871,7 @@ pub(crate) mod tests {
ReadOptions {
table_id: TableId::from(existing_table_ids),
prefetch_options: PrefetchOptions::default(),
cache_policy: CachePolicy::Fill(CachePriority::High),
cache_policy: CachePolicy::Fill(CacheContext::Default),
..Default::default()
},
)
Expand Down Expand Up @@ -1070,7 +1070,7 @@ pub(crate) mod tests {
ReadOptions {
table_id: TableId::from(existing_table_id),
prefetch_options: PrefetchOptions::default(),
cache_policy: CachePolicy::Fill(CachePriority::High),
cache_policy: CachePolicy::Fill(CacheContext::Default),
..Default::default()
},
)
Expand Down Expand Up @@ -1270,7 +1270,7 @@ pub(crate) mod tests {
prefix_hint: Some(Bytes::from(bloom_filter_key)),
table_id: TableId::from(existing_table_id),
prefetch_options: PrefetchOptions::default(),
cache_policy: CachePolicy::Fill(CachePriority::High),
cache_policy: CachePolicy::Fill(CacheContext::Default),
..Default::default()
},
)
Expand Down
14 changes: 7 additions & 7 deletions src/storage/hummock_test/src/failpoint_tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ use std::ops::Bound;
use std::sync::Arc;

use bytes::{BufMut, Bytes};
use risingwave_common::cache::CachePriority;
use foyer::memory::CacheContext;
use risingwave_common::catalog::TableId;
use risingwave_common::hash::VirtualNode;
use risingwave_hummock_sdk::key::TABLE_PREFIX_LEN;
Expand Down Expand Up @@ -114,7 +114,7 @@ async fn test_failpoints_state_store_read_upload() {
1,
ReadOptions {
prefix_hint: Some(Bytes::from(anchor_prefix_hint)),
cache_policy: CachePolicy::Fill(CachePriority::High),
cache_policy: CachePolicy::Fill(CacheContext::Default),
..Default::default()
},
)
Expand Down Expand Up @@ -167,7 +167,7 @@ async fn test_failpoints_state_store_read_upload() {
2,
ReadOptions {
prefix_hint: Some(Bytes::from(anchor_prefix_hint)),
cache_policy: CachePolicy::Fill(CachePriority::High),
cache_policy: CachePolicy::Fill(CacheContext::Default),
..Default::default()
},
)
Expand All @@ -182,7 +182,7 @@ async fn test_failpoints_state_store_read_upload() {
2,
ReadOptions {
table_id: Default::default(),
cache_policy: CachePolicy::Fill(CachePriority::High),
cache_policy: CachePolicy::Fill(CacheContext::Default),
..Default::default()
},
)
Expand All @@ -201,7 +201,7 @@ async fn test_failpoints_state_store_read_upload() {
2,
ReadOptions {
prefix_hint: Some(Bytes::from(bee_prefix_hint)),
cache_policy: CachePolicy::Fill(CachePriority::High),
cache_policy: CachePolicy::Fill(CacheContext::Default),
..Default::default()
},
)
Expand Down Expand Up @@ -239,7 +239,7 @@ async fn test_failpoints_state_store_read_upload() {
5,
ReadOptions {
prefix_hint: Some(Bytes::from(anchor_prefix_hint)),
cache_policy: CachePolicy::Fill(CachePriority::High),
cache_policy: CachePolicy::Fill(CacheContext::Default),
..Default::default()
},
)
Expand All @@ -256,7 +256,7 @@ async fn test_failpoints_state_store_read_upload() {
5,
ReadOptions {
prefetch_options: PrefetchOptions::default(),
cache_policy: CachePolicy::Fill(CachePriority::High),
cache_policy: CachePolicy::Fill(CacheContext::Default),
..Default::default()
},
)
Expand Down
Loading

0 comments on commit 0e16ac4

Please sign in to comment.