diff --git a/Cargo.lock b/Cargo.lock
index ebaceffb0dac5..0d92e3c889835 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -3784,11 +3784,11 @@ dependencies = [
 
 [[package]]
 name = "foyer"
-version = "0.2.0"
+version = "0.3.0"
 source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "5b6c6bc7a5cfdba3af30c7fdea5841c6e8564369116d3276cf13d742245f8cef"
+checksum = "caed9c227536e3b6fb92aea800c44de7516ecc339a11eb01544549dd6f43cb69"
 dependencies = [
- "foyer-common",
+ "foyer-common 0.2.0",
  "foyer-intrusive",
  "foyer-storage",
  "foyer-workspace-hack",
@@ -3810,6 +3810,22 @@ dependencies = [
  "tracing",
 ]
 
+[[package]]
+name = "foyer-common"
+version = "0.2.0"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "f10010b8cb0f380cc30ff8c82ec7f6addc2ced72b3758d336f351831a56a4a59"
+dependencies = [
+ "anyhow",
+ "bytes",
+ "foyer-workspace-hack",
+ "itertools 0.12.0",
+ "madsim-tokio",
+ "parking_lot 0.12.1",
+ "paste",
+ "tracing",
+]
+
 [[package]]
 name = "foyer-intrusive"
 version = "0.1.0"
@@ -3818,7 +3834,7 @@ checksum = "2f967c3c40435a621fb1b8670d8f6501d67e79cea08b1ce07c150147aabdc588"
 dependencies = [
  "bytes",
  "cmsketch",
- "foyer-common",
+ "foyer-common 0.1.0",
  "foyer-workspace-hack",
  "itertools 0.12.0",
  "memoffset",
@@ -3830,16 +3846,16 @@ dependencies = [
 
 [[package]]
 name = "foyer-storage"
-version = "0.1.0"
+version = "0.2.1"
 source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "c5e0c1b14504ee66a9c6e9a81982fb62fe8dc1b45d15684bd3e1fcda4bc6c25d"
+checksum = "8df47c9672ac893c172832c79c41bc4be7b24f71376fb00be461418c7a4d1cfe"
 dependencies = [
  "anyhow",
  "bitflags 2.4.0",
  "bitmaps",
  "bytes",
  "cmsketch",
- "foyer-common",
+ "foyer-common 0.2.0",
  "foyer-intrusive",
  "foyer-workspace-hack",
  "futures",
@@ -11782,7 +11798,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
 checksum = "97fee6b57c6a41524a810daee9286c02d7752c4253064d0b05472833a438f675"
 dependencies = [
  "cfg-if",
- "rand 0.8.5",
+ "rand 0.4.6",
  "static_assertions",
 ]
 
diff --git a/src/storage/Cargo.toml b/src/storage/Cargo.toml
index 07bb7fd528890..e1cff95c761ea 100644
--- a/src/storage/Cargo.toml
+++ b/src/storage/Cargo.toml
@@ -26,7 +26,7 @@ dyn-clone = "1.0.14"
 either = "1"
 enum-as-inner = "0.6"
 fail = "0.5"
-foyer = "0.2"
+foyer = "0.3"
 futures = { version = "0.3", default-features = false, features = ["alloc"] }
 futures-async-stream = { workspace = true }
 hex = "0.4"
diff --git a/src/storage/src/hummock/event_handler/refiller.rs b/src/storage/src/hummock/event_handler/refiller.rs
index 496ae53ebba2d..71f38b7186514 100644
--- a/src/storage/src/hummock/event_handler/refiller.rs
+++ b/src/storage/src/hummock/event_handler/refiller.rs
@@ -344,7 +344,7 @@ impl CacheRefillTask {
     fn get_units_to_refill_by_inheritance(
         context: &CacheRefillContext,
         ssts: &[TableHolder],
-        parent_ssts: &[impl Deref>],
+        parent_ssts: &[impl Deref],
     ) -> HashSet {
         let mut res = HashSet::default();
 
@@ -606,6 +606,7 @@ impl CacheRefillTask {
                     bytes,
                     uncompressed_capacity: writer.weight() - writer.key().serialized_len(),
                 };
+                writer.force();
                 // TODO(MrCroxx): compress if raw is not compressed?
                 // skip compression for it may already be compressed.
diff --git a/src/storage/src/hummock/file_cache/store.rs b/src/storage/src/hummock/file_cache/store.rs
index c1a514219c5e0..d7c48a81ed412 100644
--- a/src/storage/src/hummock/file_cache/store.rs
+++ b/src/storage/src/hummock/file_cache/store.rs
@@ -18,7 +18,7 @@ use std::path::PathBuf;
 use std::sync::Arc;
 
 use bytes::{Buf, BufMut, Bytes};
-use foyer::common::code::{CodingResult, Key, Value};
+use foyer::common::code::{CodingResult, Cursor, Key, Value};
 use foyer::intrusive::eviction::lfu::LfuConfig;
 use foyer::storage::admission::rated_ticket::RatedTicketAdmissionPolicy;
 use foyer::storage::admission::AdmissionPolicy;
@@ -33,7 +33,7 @@ use foyer::storage::storage::{Storage, StorageWriter};
 use foyer::storage::store::{LfuFsStoreConfig, NoneStore, NoneStoreWriter};
 use risingwave_hummock_sdk::HummockSstableObjectId;
 
-use crate::hummock::{Block, Sstable, SstableMeta};
+use crate::hummock::{Block, HummockResult, Sstable, SstableMeta};
 
 pub mod preclude {
     pub use foyer::storage::storage::{
@@ -50,6 +50,12 @@ pub type FileCacheResult = foyer::storage::error::Result;
 pub type FileCacheError = foyer::storage::error::Error;
 pub type FileCacheCompression = foyer::storage::compress::Compression;
 
+fn copy(src: impl AsRef<[u8]>, mut dst: impl BufMut + AsRef<[u8]>) -> usize {
+    let n = std::cmp::min(src.as_ref().len(), dst.as_ref().len());
+    dst.put_slice(&src.as_ref()[..n]);
+    n
+}
+
 #[derive(Debug)]
 pub struct FileCacheConfig
 where
@@ -270,7 +276,6 @@ where
                 align: config.device_align,
                 io_size: config.device_io_size,
             },
-            ring_buffer_capacity: config.ring_buffer_capacity,
             catalog_bits: config.catalog_bits,
             admissions,
             reinsertions: config.reinsertions,
@@ -348,27 +353,67 @@ pub struct SstableBlockIndex {
 }
 
 impl Key for SstableBlockIndex {
+    type Cursor = SstableBlockIndexCursor;
+
     fn serialized_len(&self) -> usize {
         8 + 8 // sst_id (8B) + block_idx (8B)
     }
 
-    fn write(&self, mut buf: &mut [u8]) -> CodingResult<()> {
-        buf.put_u64(self.sst_id);
-        buf.put_u64(self.block_idx);
-        Ok(())
-    }
-
     fn read(mut buf: &[u8]) -> CodingResult {
         let sst_id = buf.get_u64();
         let block_idx = buf.get_u64();
         Ok(Self { sst_id, block_idx })
    }
+
+    fn into_cursor(self) -> Self::Cursor {
+        SstableBlockIndexCursor::new(self)
+    }
+}
+
+#[derive(Debug)]
+pub struct SstableBlockIndexCursor {
+    inner: SstableBlockIndex,
+    pos: u8,
+}
+
+impl SstableBlockIndexCursor {
+    pub fn new(inner: SstableBlockIndex) -> Self {
+        Self { inner, pos: 0 }
+    }
+}
+
+impl std::io::Read for SstableBlockIndexCursor {
+    fn read(&mut self, mut buf: &mut [u8]) -> std::io::Result {
+        let pos = self.pos;
+        if self.pos < 8 {
+            self.pos += copy(
+                &self.inner.sst_id.to_be_bytes()[self.pos as usize..],
+                &mut buf,
+            ) as u8;
+        }
+        if self.pos < 16 {
+            self.pos += copy(
+                &self.inner.block_idx.to_be_bytes()[self.pos as usize - 8..],
+                &mut buf,
+            ) as u8;
+        }
+        let n = (self.pos - pos) as usize;
+        Ok(n)
+    }
+}
+
+impl Cursor for SstableBlockIndexCursor {
+    type T = SstableBlockIndex;
+
+    fn into_inner(self) -> Self::T {
+        self.inner
+    }
 }
 
 /// [`CachedBlock`] uses different coding for writing to use/bypass compression.
 ///
 /// But when reading, it will always be `Loaded`.
-#[derive(Debug)]
+#[derive(Debug, Clone)]
 pub enum CachedBlock {
     Loaded {
         block: Box,
@@ -388,15 +433,26 @@ impl CachedBlock {
         }
     }
 
-    pub fn into_inner(self) -> Box {
-        match self {
+    pub fn try_into_block(self) -> HummockResult> {
+        let block = match self {
             CachedBlock::Loaded { block } => block,
-            CachedBlock::Fetched { .. } => unreachable!(),
-        }
+            // for the block was not loaded yet (refill + inflight), we need to decode it.
+            // TODO(MrCroxx): avoid decode twice?
+            CachedBlock::Fetched {
+                bytes,
+                uncompressed_capacity,
+            } => {
+                let block = Block::decode(bytes, uncompressed_capacity)?;
+                Box::new(block)
+            }
+        };
+        Ok(block)
     }
 }
 
 impl Value for CachedBlock {
+    type Cursor = CachedBlockCursor;
+
     fn serialized_len(&self) -> usize {
         1 /* type */ + match self {
             CachedBlock::Loaded { block } => block.raw_data().len(),
@@ -404,24 +460,6 @@ impl Value for CachedBlock {
         }
     }
 
-    fn write(&self, mut buf: &mut [u8]) -> CodingResult<()> {
-        match self {
-            CachedBlock::Loaded { block } => {
-                buf.put_u8(0);
-                buf.put_slice(block.raw_data())
-            }
-            CachedBlock::Fetched {
-                bytes,
-                uncompressed_capacity,
-            } => {
-                buf.put_u8(1);
-                buf.put_u64(*uncompressed_capacity as u64);
-                buf.put_slice(bytes);
-            }
-        }
-        Ok(())
-    }
-
     fn read(mut buf: &[u8]) -> CodingResult {
         let v = buf.get_u8();
         let res = match v {
@@ -442,45 +480,196 @@ impl Value for CachedBlock {
         };
         Ok(res)
     }
+
+    fn into_cursor(self) -> Self::Cursor {
+        CachedBlockCursor::new(self)
+    }
+}
+
+#[derive(Debug)]
+pub struct CachedBlockCursor {
+    inner: CachedBlock,
+    pos: usize,
+}
+
+impl CachedBlockCursor {
+    pub fn new(inner: CachedBlock) -> Self {
+        Self { inner, pos: 0 }
+    }
+}
+
+impl std::io::Read for CachedBlockCursor {
+    fn read(&mut self, mut buf: &mut [u8]) -> std::io::Result {
+        let pos = self.pos;
+        match &self.inner {
+            CachedBlock::Loaded { block } => {
+                if self.pos < 1 {
+                    self.pos += copy([0], &mut buf);
+                }
+                self.pos += copy(&block.raw_data()[self.pos - 1..], &mut buf);
+            }
+            CachedBlock::Fetched {
+                bytes,
+                uncompressed_capacity,
+            } => {
+                if self.pos < 1 {
+                    self.pos += copy([1], &mut buf);
+                }
+                if self.pos < 9 {
+                    self.pos += copy(
+                        &uncompressed_capacity.to_be_bytes()[self.pos - 1..],
+                        &mut buf,
+                    );
+                }
+                self.pos += copy(&bytes[self.pos - 9..], &mut buf);
+            }
+        }
+        let n = self.pos - pos;
+        Ok(n)
+    }
+}
+
+impl Cursor for CachedBlockCursor {
+    type T = CachedBlock;
+
+    fn into_inner(self) -> Self::T {
+        self.inner
+    }
 }
 
 impl Value for Box {
+    type Cursor = BoxBlockCursor;
+
     fn serialized_len(&self) -> usize {
         self.raw_data().len()
     }
 
-    fn write(&self, mut buf: &mut [u8]) -> CodingResult<()> {
-        buf.put_slice(self.raw_data());
-        Ok(())
-    }
-
     fn read(buf: &[u8]) -> CodingResult {
         let data = Bytes::copy_from_slice(buf);
         let block = Block::decode_from_raw(data);
         let block = Box::new(block);
         Ok(block)
     }
+
+    fn into_cursor(self) -> Self::Cursor {
+        BoxBlockCursor::new(self)
+    }
 }
 
-impl Value for Box {
-    fn serialized_len(&self) -> usize {
-        8 + self.meta.encoded_size() // id (8B) + meta size
+#[derive(Debug)]
+pub struct BoxBlockCursor {
+    inner: Box,
+    pos: usize,
+}
+
+impl BoxBlockCursor {
+    pub fn new(inner: Box) -> Self {
+        Self { inner, pos: 0 }
+    }
+}
+
+impl std::io::Read for BoxBlockCursor {
+    fn read(&mut self, buf: &mut [u8]) -> std::io::Result {
+        let pos = self.pos;
+        self.pos += copy(&self.inner.raw_data()[self.pos..], buf);
+        let n = self.pos - pos;
+        Ok(n)
+    }
+}
+
+impl Cursor for BoxBlockCursor {
+    type T = Box;
+
+    fn into_inner(self) -> Self::T {
+        self.inner
+    }
+}
+
+#[derive(Debug)]
+pub struct CachedSstable(Arc);
+
+impl Clone for CachedSstable {
+    fn clone(&self) -> Self {
+        Self(Arc::clone(&self.0))
+    }
+}
+
+impl From> for CachedSstable {
+    fn from(value: Box) -> Self {
+        Self(Arc::new(*value))
+    }
+}
+
+impl From for Box {
+    fn from(value: CachedSstable) -> Self {
+        Box::new(Arc::unwrap_or_clone(value.0))
+    }
+}
+
+impl CachedSstable {
+    pub fn into_inner(self) -> Arc {
+        self.0
+    }
+}
+
+impl Value for CachedSstable {
+    type Cursor = CachedSstableCursor;
+
+    fn weight(&self) -> usize {
+        self.0.estimate_size()
     }
 
-    fn write(&self, mut buf: &mut [u8]) -> CodingResult<()> {
-        buf.put_u64(self.id);
-        // TODO(MrCroxx): avoid buffer copy
-        let mut buffer = vec![];
-        self.meta.encode_to(&mut buffer);
-        buf.put_slice(&buffer[..]);
-        Ok(())
+    fn serialized_len(&self) -> usize {
+        8 + self.0.meta.encoded_size() // id (8B) + meta size
     }
 
     fn read(mut buf: &[u8]) -> CodingResult {
         let id = buf.get_u64();
         let meta = SstableMeta::decode(buf).unwrap();
-        let sstable = Box::new(Sstable::new(id, meta));
-        Ok(sstable)
+        let sstable = Arc::new(Sstable::new(id, meta));
+        Ok(Self(sstable))
+    }
+
+    fn into_cursor(self) -> Self::Cursor {
+        CachedSstableCursor::new(self)
+    }
+}
+
+#[derive(Debug)]
+pub struct CachedSstableCursor {
+    inner: CachedSstable,
+    pos: usize,
+    /// store pre-encoded bytes here, for it's hard to encode JIT
+    bytes: Vec,
+}
+
+impl CachedSstableCursor {
+    pub fn new(inner: CachedSstable) -> Self {
+        let mut bytes = vec![];
+        bytes.put_u64(inner.0.id);
+        inner.0.meta.encode_to(&mut bytes);
+        Self {
+            inner,
+            bytes,
+            pos: 0,
+        }
+    }
+}
+
+impl std::io::Read for CachedSstableCursor {
+    fn read(&mut self, buf: &mut [u8]) -> std::io::Result {
+        let pos = self.pos;
+        self.pos += copy(&self.bytes[self.pos..], buf);
+        let n = self.pos - pos;
+        Ok(n)
+    }
+}
+
+impl Cursor for CachedSstableCursor {
+    type T = CachedSstable;
+
+    fn into_inner(self) -> Self::T {
+        self.inner
     }
 }
 
@@ -490,12 +679,17 @@ mod tests {
     use risingwave_hummock_sdk::key::FullKey;
 
     use super::*;
-    use crate::hummock::{
-        BlockBuilder, BlockBuilderOptions, BlockHolder, BlockIterator, CompressionAlgorithm,
-    };
+    use crate::hummock::{BlockBuilder, BlockBuilderOptions, BlockMeta, CompressionAlgorithm};
 
-    #[test]
-    fn test_enc_dec() {
+    pub fn construct_full_key_struct(
+        table_id: u32,
+        table_key: &[u8],
+        epoch: u64,
+    ) -> FullKey<&[u8]> {
+        FullKey::for_test(TableId::new(table_id), table_key, epoch)
+    }
+
+    fn block_for_test() -> Box {
         let options = BlockBuilderOptions {
             compression_algorithm: CompressionAlgorithm::Lz4,
             ..Default::default()
@@ -507,50 +701,98 @@ mod tests {
         builder.add_for_test(construct_full_key_struct(0, b"k3", 3), b"v03");
         builder.add_for_test(construct_full_key_struct(0, b"k4", 4), b"v04");
 
-        let block = Box::new(
+        Box::new(
             Block::decode(
                 builder.build().to_vec().into(),
                 builder.uncompressed_block_size(),
             )
             .unwrap(),
-        );
-
-        let mut buf = vec![0; block.serialized_len()];
-        block.write(&mut buf[..]).unwrap();
-
-        let block = as Value>::read(&buf[..]).unwrap();
-
-        let mut bi = BlockIterator::new(BlockHolder::from_owned_block(block));
-
-        bi.seek_to_first();
-        assert!(bi.is_valid());
-        assert_eq!(construct_full_key_struct(0, b"k1", 1), bi.key());
-        assert_eq!(b"v01", bi.value());
-
-        bi.next();
-        assert!(bi.is_valid());
-        assert_eq!(construct_full_key_struct(0, b"k2", 2), bi.key());
-        assert_eq!(b"v02", bi.value());
+        )
+    }
+
+    fn sstable_for_test() -> Sstable {
+        Sstable::new(
+            114514,
+            SstableMeta {
+                block_metas: vec![
+                    BlockMeta {
+                        smallest_key: b"0-smallest-key".to_vec(),
+                        len: 100,
+                        ..Default::default()
+                    },
+                    BlockMeta {
+                        smallest_key: b"5-some-key".to_vec(),
+                        offset: 100,
+                        len: 100,
+                        ..Default::default()
+                    },
+                ],
+                bloom_filter: b"0123456789012345".to_vec(),
+                estimated_size: 123,
+                key_count: 123,
+                smallest_key: b"0-smallest-key".to_vec(),
+                largest_key: b"9-largest-key".to_vec(),
+                meta_offset: 123,
+                monotonic_tombstone_events: vec![],
+                version: 2,
+            },
+        )
+    }
 
-        bi.next();
-        assert!(bi.is_valid());
-        assert_eq!(construct_full_key_struct(0, b"k3", 3), bi.key());
-        assert_eq!(b"v03", bi.value());
+    #[test]
+    fn test_cursor() {
+        {
+            let block = block_for_test();
+            let mut cursor = block.into_cursor();
+            let mut buf = vec![];
+            std::io::copy(&mut cursor, &mut buf).unwrap();
+            let target = cursor.into_inner();
+            let block = Box::::read(&buf[..]).unwrap();
+            assert_eq!(target.raw_data(), block.raw_data());
+        }
 
-        bi.next();
-        assert!(bi.is_valid());
-        assert_eq!(construct_full_key_struct(0, b"k4", 4), bi.key());
-        assert_eq!(b"v04", bi.value());
+        {
+            let sstable: CachedSstable = Box::new(sstable_for_test()).into();
+            let mut cursor = sstable.into_cursor();
+            let mut buf = vec![];
+            std::io::copy(&mut cursor, &mut buf).unwrap();
+            let target = cursor.into_inner();
+            let sstable = CachedSstable::read(&buf[..]).unwrap();
+            assert_eq!(target.0.id, sstable.0.id);
+            assert_eq!(target.0.meta, sstable.0.meta);
+        }
 
-        bi.next();
-        assert!(!bi.is_valid());
-    }
+        {
+            let cached = CachedBlock::Loaded {
+                block: block_for_test(),
+            };
+            let mut cursor = cached.into_cursor();
+            let mut buf = vec![];
+            std::io::copy(&mut cursor, &mut buf).unwrap();
+            let target = cursor.into_inner();
+            let cached = CachedBlock::read(&buf[..]).unwrap();
+            let target = match target {
+                CachedBlock::Loaded { block } => block,
+                CachedBlock::Fetched { .. } => panic!(),
+            };
+            let block = match cached {
+                CachedBlock::Loaded { block } => block,
+                CachedBlock::Fetched { .. } => panic!(),
+            };
+            assert_eq!(target.raw_data(), block.raw_data());
+        }
 
-    pub fn construct_full_key_struct(
-        table_id: u32,
-        table_key: &[u8],
-        epoch: u64,
-    ) -> FullKey<&[u8]> {
-        FullKey::for_test(TableId::new(table_id), table_key, epoch)
+        {
+            let index = SstableBlockIndex {
+                sst_id: 114,
+                block_idx: 514,
+            };
+            let mut cursor = index.into_cursor();
+            let mut buf = vec![];
+            std::io::copy(&mut cursor, &mut buf).unwrap();
+            let target = cursor.into_inner();
+            let index = SstableBlockIndex::read(&buf[..]).unwrap();
+            assert_eq!(target, index);
+        }
     }
 }
diff --git a/src/storage/src/hummock/sstable_store.rs b/src/storage/src/hummock/sstable_store.rs
index f682718bb07bc..79ca15035b920 100644
--- a/src/storage/src/hummock/sstable_store.rs
+++ b/src/storage/src/hummock/sstable_store.rs
@@ -39,8 +39,8 @@ use zstd::zstd_safe::WriteBuf;
 
 use super::utils::MemoryTracker;
 use super::{
-    Block, BlockCache, BlockMeta, BlockResponse, CachedBlock, FileCache, RecentFilter, Sstable,
-    SstableBlockIndex, SstableMeta, SstableWriter,
+    Block, BlockCache, BlockMeta, BlockResponse, CachedBlock, CachedSstable, FileCache,
+    RecentFilter, Sstable, SstableBlockIndex, SstableMeta, SstableWriter,
 };
 use crate::hummock::block_stream::{
     BlockDataStream, BlockStream, MemoryUsageTracker, PrefetchBlockStream,
@@ -120,7 +120,7 @@ impl LruCacheEventListener for BlockCacheEventListener {
     }
 }
 
-struct MetaCacheEventListener(FileCache>);
+struct MetaCacheEventListener(FileCache);
 
 impl LruCacheEventListener for MetaCacheEventListener {
     type K = HummockSstableObjectId;
@@ -130,20 +130,20 @@ impl LruCacheEventListener for MetaCacheEventListener {
         // temporarily avoid spawn task while task drop with madsim
         // FYI: https://github.com/madsim-rs/madsim/issues/182
         #[cfg(not(madsim))]
-        self.0.insert_if_not_exists_async(key, value);
+        self.0.insert_if_not_exists_async(key, value.into());
     }
 }
 
-pub enum CachedOrOwned
+pub enum CachedOrShared
 where
     K: LruKey,
     V: LruValue,
 {
-    Cached(CacheableEntry),
-    Owned(V),
+    Cached(CacheableEntry>),
+    Shared(Arc),
 }
 
-impl Deref for CachedOrOwned
+impl Deref for CachedOrShared
 where
     K: LruKey,
     V: LruValue,
@@ -152,8 +152,8 @@ where
     fn deref(&self) -> &Self::Target {
         match self {
-            CachedOrOwned::Cached(entry) => entry,
-            CachedOrOwned::Owned(v) => v,
+            CachedOrShared::Cached(entry) => entry,
+            CachedOrShared::Shared(v) => v,
         }
     }
 }
@@ -165,7 +165,7 @@ pub struct SstableStore {
     meta_cache: Arc>>,
     data_file_cache: FileCache,
-    meta_file_cache: FileCache>,
+    meta_file_cache: FileCache,
     /// Recent filter for `(sst_obj_id, blk_idx)`.
     ///
     /// `blk_idx == USIZE::MAX` stands for `sst_obj_id` only entry.
@@ -183,7 +183,7 @@ impl SstableStore {
         high_priority_ratio: usize,
         prefetch_buffer_capacity: usize,
         data_file_cache: FileCache,
-        meta_file_cache: FileCache>,
+        meta_file_cache: FileCache,
         recent_filter: Option>>,
     ) -> Self {
         // TODO: We should validate path early. Otherwise object store won't report invalid path
@@ -442,7 +442,7 @@ impl SstableStore {
             .await
             .map_err(HummockError::file_cache)?
         {
-            let block = block.into_inner();
+            let block = block.try_into_block()?;
             return Ok(block);
         }
 
@@ -573,9 +573,9 @@ impl SstableStore {
     pub async fn sstable_cached(
        &self,
         sst_obj_id: HummockSstableObjectId,
-    ) -> HummockResult>>> {
+    ) -> HummockResult>> {
         if let Some(sst) = self.meta_cache.lookup(sst_obj_id, &sst_obj_id) {
-            return Ok(Some(CachedOrOwned::Cached(sst)));
+            return Ok(Some(CachedOrShared::Cached(sst)));
         }
 
         if let Some(sst) = self
@@ -584,7 +584,7 @@
             .await
             .map_err(HummockError::file_cache)?
         {
-            return Ok(Some(CachedOrOwned::Owned(sst)));
+            return Ok(Some(CachedOrShared::Shared(sst.into_inner())));
         }
 
         Ok(None)
@@ -615,6 +615,8 @@ impl SstableStore {
             .await
             .map_err(HummockError::file_cache)?
         {
+            // TODO(MrCroxx): Make meta cache receives Arc to reduce copy?
+            let sst: Box = sst.into();
             let charge = sst.estimate_size();
             return Ok((sst, charge));
        }
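
Reviewer note (not part of the patch): the core of this upgrade is foyer 0.3's cursor-based coding API. `Key`/`Value` keep `serialized_len` and `read`, drop the old in-place `write(&self, &mut [u8])`, and instead expose `into_cursor()`, where the cursor implements `std::io::Read` plus `into_inner()` to hand the value back (see `SstableBlockIndexCursor`, `CachedBlockCursor`, `BoxBlockCursor`, and `CachedSstableCursor` above). The standalone sketch below shows that round-trip on a made-up single-`u64` key; the `Key` and `Cursor` traits in it are simplified local stand-ins, not the real `foyer::common::code` traits, and `IdKey`/`IdKeyCursor` are hypothetical names, so the exact foyer 0.3 signatures should be checked against the crate docs.

use std::io::Read;

// Simplified stand-in for foyer's coding result type.
type CodingResult<T> = std::io::Result<T>;

// Simplified stand-in traits mirroring the shape used in the diff above.
trait Cursor: Read {
    type T;
    fn into_inner(self) -> Self::T;
}

trait Key: Sized {
    type Cursor: Cursor<T = Self>;
    fn serialized_len(&self) -> usize;
    fn read(buf: &[u8]) -> CodingResult<Self>;
    fn into_cursor(self) -> Self::Cursor;
}

/// Hypothetical key: a single big-endian u64 id.
#[derive(Debug, PartialEq, Eq)]
struct IdKey(u64);

struct IdKeyCursor {
    inner: IdKey,
    pos: usize,
}

impl Read for IdKeyCursor {
    fn read(&mut self, buf: &mut [u8]) -> std::io::Result<usize> {
        // Emit the not-yet-copied suffix of the 8-byte encoding.
        let bytes = self.inner.0.to_be_bytes();
        let n = std::cmp::min(bytes.len() - self.pos, buf.len());
        buf[..n].copy_from_slice(&bytes[self.pos..self.pos + n]);
        self.pos += n;
        Ok(n)
    }
}

impl Cursor for IdKeyCursor {
    type T = IdKey;
    fn into_inner(self) -> IdKey {
        self.inner
    }
}

impl Key for IdKey {
    type Cursor = IdKeyCursor;

    fn serialized_len(&self) -> usize {
        8
    }

    fn read(buf: &[u8]) -> CodingResult<Self> {
        let mut raw = [0u8; 8];
        raw.copy_from_slice(&buf[..8]);
        Ok(IdKey(u64::from_be_bytes(raw)))
    }

    fn into_cursor(self) -> Self::Cursor {
        IdKeyCursor { inner: self, pos: 0 }
    }
}

fn main() {
    // Round-trip, mirroring the `test_cursor` cases in the diff:
    // drain the cursor with `std::io::copy`, then decode with `read`.
    let key = IdKey(42);
    assert_eq!(key.serialized_len(), 8);

    let mut cursor = key.into_cursor();
    let mut buf = vec![];
    std::io::copy(&mut cursor, &mut buf).unwrap();

    let decoded = IdKey::read(&buf).unwrap();
    assert_eq!(decoded, cursor.into_inner());
    println!("round-trip ok: {:?}", decoded);
}

On the caching side, renaming `CachedOrOwned` to `CachedOrShared` follows from the meta file cache now storing `CachedSstable`, which wraps an `Arc`-backed `Sstable`: a file-cache hit can hand out a shared handle via `into_inner()` instead of an owned deep copy, which is what the `Shared(...)` variant in the diff carries.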