diff --git a/Cargo.lock b/Cargo.lock index 920f6196f32a0..12a575daed40e 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -911,20 +911,6 @@ dependencies = [ "itertools", ] -[[package]] -name = "crossbeam" -version = "0.8.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "4ae5588f6b3c3cb05239e90bd110f257254aecd01e4635400391aeae07497845" -dependencies = [ - "cfg-if 1.0.0", - "crossbeam-channel", - "crossbeam-deque", - "crossbeam-epoch 0.9.8", - "crossbeam-queue", - "crossbeam-utils 0.8.8", -] - [[package]] name = "crossbeam-channel" version = "0.5.4" @@ -975,16 +961,6 @@ dependencies = [ "scopeguard", ] -[[package]] -name = "crossbeam-queue" -version = "0.3.5" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "1f25d8400f4a7a5778f0e4e52384a48cbd9b5c495d110786187fc750075277a2" -dependencies = [ - "cfg-if 1.0.0", - "crossbeam-utils 0.8.8", -] - [[package]] name = "crossbeam-utils" version = "0.7.2" @@ -3631,7 +3607,6 @@ dependencies = [ "chrono", "crc32fast", "criterion", - "crossbeam", "dashmap", "either", "enum-as-inner", @@ -4957,8 +4932,6 @@ dependencies = [ "axum", "bstr", "bytes", - "crossbeam-deque", - "crossbeam-utils 0.8.8", "either", "fail", "fixedbitset", diff --git a/src/bench/ss_bench/operations/write_batch.rs b/src/bench/ss_bench/operations/write_batch.rs index b776f01000972..400788e50f14c 100644 --- a/src/bench/ss_bench/operations/write_batch.rs +++ b/src/bench/ss_bench/operations/write_batch.rs @@ -23,6 +23,7 @@ use risingwave_meta::hummock::MockHummockMetaClient; use risingwave_rpc_client::HummockMetaClient; use risingwave_storage::hummock::compactor::{Compactor, CompactorContext}; use risingwave_storage::storage_value::StorageValue; +use risingwave_storage::store::GLOBAL_STORAGE_TABLE_ID; use risingwave_storage::StateStore; use super::{Batch, Operations, PerfMetrics}; @@ -187,14 +188,23 @@ impl Operations { .map(|(k, v)| (k, StorageValue::new(Default::default(), v))) .collect_vec(); let epoch = ctx.epoch.load(Ordering::Acquire); - store.ingest_batch(batch, epoch).await.unwrap(); + store + .ingest_batch(batch, epoch, GLOBAL_STORAGE_TABLE_ID) + .await + .unwrap(); let last_batch = i + 1 == l; if ctx.epoch_barrier_finish(last_batch) { - store.sync(Some(epoch)).await.unwrap(); + store + .sync(Some(epoch), Some(vec![GLOBAL_STORAGE_TABLE_ID])) + .await + .unwrap(); ctx.meta_client.commit_epoch(epoch).await.unwrap(); ctx.epoch.fetch_add(1, Ordering::SeqCst); } - store.wait_epoch(epoch).await.unwrap(); + store + .wait_epoch(epoch, GLOBAL_STORAGE_TABLE_ID) + .await + .unwrap(); let time_nano = start.elapsed().as_nanos(); latencies.push(time_nano); } diff --git a/src/connector/src/state.rs b/src/connector/src/state.rs index ba14526b4f6c2..04688d31d6bbd 100644 --- a/src/connector/src/state.rs +++ b/src/connector/src/state.rs @@ -108,7 +108,7 @@ impl SourceStateHandler { // TODO should be a clear Error Code Err(anyhow!("states require not null")) } else { - let mut write_batch = self.keyspace.state_store().start_write_batch(); + let mut write_batch = self.keyspace.start_write_batch(); let mut local_batch = write_batch.prefixify(&self.keyspace); states.iter().for_each(|state| { // state inner key format (state_identifier | epoch) diff --git a/src/storage/Cargo.toml b/src/storage/Cargo.toml index dc7a22b48ac35..da04a3d25c15b 100644 --- a/src/storage/Cargo.toml +++ b/src/storage/Cargo.toml @@ -17,7 +17,6 @@ byteorder = "1" bytes = { version = "1", features = ["serde"] } chrono = "0.4" crc32fast = "1" -crossbeam = "0.8.1" dashmap = { 
version = "5", default-features = false } either = "1" enum-as-inner = "0.4" diff --git a/src/storage/src/hummock/compactor_tests.rs b/src/storage/src/hummock/compactor_tests.rs index e76493caba9eb..c3e8d408c6c2a 100644 --- a/src/storage/src/hummock/compactor_tests.rs +++ b/src/storage/src/hummock/compactor_tests.rs @@ -28,6 +28,7 @@ mod tests { use crate::monitor::StateStoreMetrics; use crate::object::{InMemObjectStore, ObjectStoreImpl}; use crate::storage_value::StorageValue; + use crate::store::GLOBAL_STORAGE_TABLE_ID; use crate::StateStore; async fn get_hummock_storage( @@ -102,10 +103,14 @@ mod tests { .ingest_batch( vec![(key.clone(), StorageValue::new_default_put(val.clone()))], epoch, + GLOBAL_STORAGE_TABLE_ID, ) .await .unwrap(); - storage.sync(Some(epoch)).await.unwrap(); + storage + .sync(Some(epoch), Some(vec![GLOBAL_STORAGE_TABLE_ID])) + .await + .unwrap(); hummock_meta_client.commit_epoch(epoch).await.unwrap(); } diff --git a/src/storage/src/hummock/conflict_detector.rs b/src/storage/src/hummock/conflict_detector.rs index 01b36ae54014a..5c257fd380419 100644 --- a/src/storage/src/hummock/conflict_detector.rs +++ b/src/storage/src/hummock/conflict_detector.rs @@ -14,51 +14,63 @@ //! This mod implements a `ConflictDetector` that detect write key conflict in each epoch -use std::collections::HashSet; +use std::collections::{HashMap, HashSet}; use bytes::Bytes; -use crossbeam::atomic::AtomicCell; use dashmap::DashMap; use crate::hummock::value::HummockValue; use crate::hummock::HummockEpoch; +use crate::store::StorageTableId; pub struct ConflictDetector { - // epoch -> key-sets - epoch_history: DashMap>, - epoch_watermark: AtomicCell, + // epoch -> table_id -> key-sets + epoch_history: DashMap>>>, + table_epoch_watermark: DashMap, } impl ConflictDetector { pub fn new() -> ConflictDetector { ConflictDetector { epoch_history: DashMap::new(), - epoch_watermark: AtomicCell::new(HummockEpoch::MIN), + table_epoch_watermark: DashMap::new(), } } - pub fn get_epoch_watermark(&self) -> HummockEpoch { - self.epoch_watermark.load() + pub fn get_epoch_watermark(&self, table_id: StorageTableId) -> HummockEpoch { + self.table_epoch_watermark + .get(&table_id) + .map(|entry| *entry.value()) + .unwrap_or(HummockEpoch::MIN) } - // Sets the new watermark with CAS to enable detection in concurrent update - pub fn set_watermark(&self, epoch: HummockEpoch) { - loop { - let current_watermark = self.get_epoch_watermark(); - assert!( - epoch > current_watermark, - "not allowed to set epoch watermark to equal to or lower than current watermark: current is {}, epoch to set {}", - current_watermark, - epoch - ); - if self - .epoch_watermark - .compare_exchange(current_watermark, epoch) - .is_ok() - { - return; - } - } + pub fn set_single_table_watermark(&self, epoch: HummockEpoch, table_id: StorageTableId) { + let mut table_watermark = self + .table_epoch_watermark + .entry(table_id) + .or_insert(HummockEpoch::MIN); + + assert!( + epoch > *table_watermark.value(), + "not allowed to set epoch watermark to equal to or lower than current watermark: current is {}, epoch to set {}", + *table_watermark.value(), + epoch + ); + + *table_watermark = epoch; + } + + pub fn set_watermark(&self, epoch: HummockEpoch, table_ids: Option<&Vec>) { + let table_ids = table_ids.cloned().unwrap_or_else(|| { + self.table_epoch_watermark + .iter() + .map(|entry| *entry.key()) + .collect() + }); + + table_ids.iter().for_each(|table_id| { + self.set_single_table_watermark(epoch, *table_id); + }); } /// Checks whether there is key 
conflict for the given `kv_pairs` and adds the key in `kv_pairs` @@ -68,18 +80,35 @@ impl ConflictDetector { &self, kv_pairs: &[(Bytes, HummockValue)], epoch: HummockEpoch, + table_id: StorageTableId, ) { assert!( - epoch > self.get_epoch_watermark(), + epoch > self.get_epoch_watermark(table_id), "write to an archived epoch: {}", epoch ); - let mut written_key = self.epoch_history.entry(epoch).or_insert(HashSet::new()); + let mut epoch_written_key = self.epoch_history.entry(epoch).or_insert(HashMap::new()); + // check whether the key has been written in the epoch in any table + epoch_written_key.values().for_each(|table_written_key| { + for (key, value) in kv_pairs { + assert!( + !table_written_key.contains(&key.to_vec()), + "key {:?} is written again after previously written, value is {:?}", + key, + value, + ); + } + }); + + let table_written_key = epoch_written_key + .entry(table_id) + .or_insert_with(HashSet::new); + // add the keys to history for (key, value) in kv_pairs.iter() { assert!( - written_key.insert(key.clone()), + table_written_key.insert(key.to_vec()), "key {:?} is written again after previously written, value is {:?}", key, value, @@ -87,10 +116,24 @@ impl ConflictDetector { } } - /// Archives an epoch. An archived epoch cannot be written anymore. - pub fn archive_epoch(&self, epoch: HummockEpoch) { - self.epoch_history.remove(&epoch); - self.set_watermark(epoch); + /// Archives an epoch for a storage table. An archived epoch cannot be written anymore in the + /// storage table. + /// + /// `table_ids` is an optional parameter that specifies which storage tables to archive. If + /// `None`, all tables are archived. + pub fn archive_epoch(&self, epoch: HummockEpoch, table_ids: Option<&Vec>) { + if let Some(mut epoch_history) = self.epoch_history.get_mut(&epoch) { + if let Some(table_ids) = table_ids { + for table_id in table_ids { + epoch_history.remove(table_id); + } + } else { + epoch_history.clear(); + } + } + self.epoch_history + .remove_if(&epoch, |_, epoch_history| epoch_history.is_empty()); + self.set_watermark(epoch, table_ids); } } @@ -103,6 +146,7 @@ mod test { use crate::hummock::conflict_detector::ConflictDetector; use crate::hummock::value::HummockValue; + use crate::store::GLOBAL_STORAGE_TABLE_ID; #[test] #[should_panic] @@ -120,6 +164,7 @@ mod test { .collect_vec() .as_slice(), 233, + GLOBAL_STORAGE_TABLE_ID, ); } @@ -135,6 +180,7 @@ mod test { .collect_vec() .as_slice(), 233, + GLOBAL_STORAGE_TABLE_ID, ); detector.check_conflict_and_track_write_batch( once(( @@ -144,6 +190,7 @@ mod test { .collect_vec() .as_slice(), 233, + GLOBAL_STORAGE_TABLE_ID, ); } @@ -158,6 +205,7 @@ mod test { .collect_vec() .as_slice(), 233, + GLOBAL_STORAGE_TABLE_ID, ); detector.check_conflict_and_track_write_batch( once(( @@ -167,8 +215,9 @@ mod test { .collect_vec() .as_slice(), 233, + GLOBAL_STORAGE_TABLE_ID, ); - detector.archive_epoch(233); + detector.archive_epoch(233, Some(&vec![GLOBAL_STORAGE_TABLE_ID])); detector.check_conflict_and_track_write_batch( once(( Bytes::from("key1"), @@ -177,6 +226,7 @@ mod test { .collect_vec() .as_slice(), 234, + GLOBAL_STORAGE_TABLE_ID, ); } @@ -192,8 +242,9 @@ mod test { .collect_vec() .as_slice(), 233, + GLOBAL_STORAGE_TABLE_ID, ); - detector.archive_epoch(233); + detector.archive_epoch(233, Some(&vec![GLOBAL_STORAGE_TABLE_ID])); detector.check_conflict_and_track_write_batch( once(( Bytes::from("key1"), @@ -202,6 +253,7 @@ mod test { .collect_vec() .as_slice(), 233, + GLOBAL_STORAGE_TABLE_ID, ); } @@ -216,9 +268,10 @@ mod test { 
.collect_vec() .as_slice(), 233, + GLOBAL_STORAGE_TABLE_ID, ); assert!(!detector.epoch_history.get(&233).unwrap().is_empty()); - detector.archive_epoch(233); + detector.archive_epoch(233, Some(&vec![GLOBAL_STORAGE_TABLE_ID])); assert!(detector.epoch_history.get(&233).is_none()); } @@ -234,8 +287,9 @@ mod test { .collect_vec() .as_slice(), 233, + GLOBAL_STORAGE_TABLE_ID, ); - detector.archive_epoch(233); + detector.archive_epoch(233, Some(&vec![GLOBAL_STORAGE_TABLE_ID])); detector.check_conflict_and_track_write_batch( once(( Bytes::from("key1"), @@ -244,6 +298,7 @@ mod test { .collect_vec() .as_slice(), 232, + GLOBAL_STORAGE_TABLE_ID, ); } } diff --git a/src/storage/src/hummock/local_version_manager.rs b/src/storage/src/hummock/local_version_manager.rs index bc48da906dd2c..c726fc1de8588 100644 --- a/src/storage/src/hummock/local_version_manager.rs +++ b/src/storage/src/hummock/local_version_manager.rs @@ -32,6 +32,7 @@ use crate::hummock::utils::validate_table_key_range; use crate::hummock::{ HummockEpoch, HummockError, HummockResult, HummockVersionId, Sstable, INVALID_VERSION_ID, }; +use crate::store::StorageTableId; #[derive(Debug)] pub struct ScopedLocalVersion { @@ -154,15 +155,19 @@ impl LocalVersionManager { } /// Waits until the local hummock version contains the given committed epoch - pub async fn wait_epoch(&self, epoch: HummockEpoch) -> HummockResult<()> { - if epoch == HummockEpoch::MAX { - panic!("epoch should not be u64::MAX"); - } + pub async fn wait_epoch( + &self, + epoch: HummockEpoch, + _table_id: StorageTableId, + ) -> HummockResult<()> { + assert_ne!(epoch, HummockEpoch::MAX, "epoch should not be u64::MAX"); let mut receiver = self.update_notifier_tx.subscribe(); + // TODO: use some signal to wake up on version change instead of waiting in a loop loop { { let current_version = self.current_version.read(); if let Some(version) = current_version.as_ref() { + // TODO(partial checkpoint): check the version of the table id if version.version.max_committed_epoch >= epoch { return Ok(()); } @@ -381,6 +386,7 @@ mod tests { use crate::hummock::value::HummockValue; use crate::monitor::StateStoreMetrics; use crate::object::{InMemObjectStore, ObjectStoreImpl}; + use crate::store::GLOBAL_STORAGE_TABLE_ID; fn gen_dummy_batch(epoch: u64) -> Vec<(Bytes, HummockValue)> { vec![( @@ -412,7 +418,11 @@ mod tests { // Fill shared buffer with a dummy empty batch in epochs[0] shared_buffer_manager - .write_batch(gen_dummy_batch(epochs[0]), epochs[0]) + .write_batch( + gen_dummy_batch(epochs[0]), + epochs[0], + GLOBAL_STORAGE_TABLE_ID, + ) .await .unwrap(); assert!(!shared_buffer_manager.get_shared_buffer().is_empty()); @@ -432,7 +442,7 @@ mod tests { // Fill shared buffer with a dummy empty batch in epochs[1..=3] and ref them for epoch in epochs.iter().skip(1) { shared_buffer_manager - .write_batch(gen_dummy_batch(*epoch), *epoch) + .write_batch(gen_dummy_batch(*epoch), *epoch, GLOBAL_STORAGE_TABLE_ID) .await .unwrap(); local_version_manager.ref_committed_epoch(*epoch); diff --git a/src/storage/src/hummock/mod.rs b/src/storage/src/hummock/mod.rs index 355d6037a92d5..d21b0629c1216 100644 --- a/src/storage/src/hummock/mod.rs +++ b/src/storage/src/hummock/mod.rs @@ -131,7 +131,10 @@ impl HummockStorage { shared_buffer_manager.clone(), ); // Ensure at least one available version in cache. - local_version_manager.wait_epoch(HummockEpoch::MIN).await?; + // TODO(partial checkpoint): wait epoch for all tables? 
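Both TODOs above point at the same gap: `LocalVersionManager::wait_epoch` still wakes up by polling `current_version` in a loop, and it cannot yet wait on a single table's committed epoch. A minimal sketch of the signal-based, per-table alternative the TODOs hint at, assuming one `tokio::sync::watch` channel per storage table carrying that table's max committed epoch; `TableEpochWatcher` and its API are hypothetical names, not code from this patch:

```rust
use std::collections::HashMap;
use std::sync::{Arc, Mutex};

use tokio::sync::watch;

type HummockEpoch = u64;
type StorageTableId = u64;
type EpochChannel = (watch::Sender<HummockEpoch>, watch::Receiver<HummockEpoch>);

#[derive(Default)]
struct TableEpochWatcher {
    // table_id -> watch channel carrying that table's max committed epoch.
    // Keeping the receiver in the map means `send` always succeeds and the
    // latest epoch is retained for subscribers that arrive late.
    channels: Mutex<HashMap<StorageTableId, EpochChannel>>,
}

impl TableEpochWatcher {
    /// Called on version updates: publishes `table_id`'s new committed epoch.
    fn notify(&self, table_id: StorageTableId, committed: HummockEpoch) {
        let mut channels = self.channels.lock().unwrap();
        let (tx, _) = channels
            .entry(table_id)
            .or_insert_with(|| watch::channel(HummockEpoch::MIN));
        tx.send(committed).expect("receiver is kept alive in the map");
    }

    /// Suspends, instead of polling, until `table_id` has committed `epoch`.
    async fn wait_epoch(&self, table_id: StorageTableId, epoch: HummockEpoch) {
        assert_ne!(epoch, HummockEpoch::MAX, "epoch should not be u64::MAX");
        let mut rx = {
            let mut channels = self.channels.lock().unwrap();
            channels
                .entry(table_id)
                .or_insert_with(|| watch::channel(HummockEpoch::MIN))
                .1
                .clone()
        }; // The lock is released here, before any await point.
        while *rx.borrow() < epoch {
            rx.changed().await.expect("sender is kept alive in the map");
        }
    }
}

#[tokio::main]
async fn main() {
    let watcher = Arc::new(TableEpochWatcher::default());
    let (w, table) = (watcher.clone(), 1);
    let waiter = tokio::spawn(async move { w.wait_epoch(table, 5).await });
    watcher.notify(table, 5);
    waiter.await.unwrap();
}
```

Because a `watch` channel retains its latest value, a waiter that subscribes after the epoch was already committed observes it on the first `borrow()` and returns without blocking, so there is no lost-wakeup window between the check and `changed()`.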
+ local_version_manager + .wait_epoch(HummockEpoch::MIN, GLOBAL_STORAGE_TABLE_ID) + .await?; let instance = Self { options: options.clone(), @@ -367,6 +370,7 @@ impl StateStore for HummockStorage { &self, kv_pairs: Vec<(Bytes, StorageValue)>, epoch: u64, + table_id: StorageTableId, ) -> Self::IngestBatchFuture<'_> { async move { let batch = kv_pairs @@ -378,11 +382,15 @@ impl StateStore for HummockStorage { ) }) .collect_vec(); - - let batch_size = self.shared_buffer_manager.write_batch(batch, epoch).await?; + let batch_size = self + .shared_buffer_manager + .write_batch(batch, epoch, table_id) + .await?; if !self.options.async_checkpoint_enabled { - self.shared_buffer_manager.sync(Some(epoch)).await?; + self.shared_buffer_manager + .sync(Some(epoch), Some(vec![table_id])) + .await?; } Ok(batch_size) } @@ -591,13 +599,26 @@ impl StateStore for HummockStorage { } } - fn wait_epoch(&self, epoch: u64) -> Self::WaitEpochFuture<'_> { - async move { Ok(self.local_version_manager.wait_epoch(epoch).await?) } + fn wait_epoch( + &self, + epoch: HummockEpoch, + table_id: StorageTableId, + ) -> Self::WaitEpochFuture<'_> { + async move { + Ok(self + .local_version_manager + .wait_epoch(epoch, table_id) + .await?) + } } - fn sync(&self, epoch: Option) -> Self::SyncFuture<'_> { + fn sync( + &self, + epoch: Option, + table_ids: Option>, + ) -> Self::SyncFuture<'_> { async move { - self.shared_buffer_manager.sync(epoch).await?; + self.shared_buffer_manager.sync(epoch, table_ids).await?; Ok(()) } } diff --git a/src/storage/src/hummock/shared_buffer/shared_buffer_manager.rs b/src/storage/src/hummock/shared_buffer/shared_buffer_manager.rs index c7e8089447d86..941885c79ee0c 100644 --- a/src/storage/src/hummock/shared_buffer/shared_buffer_manager.rs +++ b/src/storage/src/hummock/shared_buffer/shared_buffer_manager.rs @@ -20,6 +20,7 @@ use std::sync::{atomic, Arc}; use itertools::Itertools; use parking_lot::{Mutex, RwLock as PLRwLock}; use risingwave_common::config::StorageConfig; +use risingwave_hummock_sdk::HummockEpoch; use risingwave_rpc_client::HummockMetaClient; use tokio::sync::watch::{Receiver as WatchReceiver, Sender as WatchSender}; use tokio::task::JoinHandle; @@ -35,8 +36,9 @@ use crate::hummock::shared_buffer::shared_buffer_uploader::{ }; use crate::hummock::utils::range_overlap; use crate::hummock::value::HummockValue; -use crate::hummock::{HummockEpoch, HummockError, HummockResult, SstableStoreRef}; +use crate::hummock::{HummockError, HummockResult, SstableStoreRef}; use crate::monitor::StateStoreMetrics; +use crate::store::{StorageTableId, GLOBAL_STORAGE_TABLE_ID}; #[derive(Debug)] pub struct SharedBufferMetrics { @@ -174,6 +176,7 @@ impl SharedBufferManager { &self, batch: Vec, epoch: HummockEpoch, + table_id: StorageTableId, ) -> HummockResult { let batch = SharedBufferBatch::new(batch, epoch); let size = batch.size; @@ -186,7 +189,7 @@ impl SharedBufferManager { .or_insert(BTreeMap::new()) .insert(batch.end_user_key().to_vec(), batch.clone()); self.uploader_tx - .send(SharedBufferUploaderItem::Batch(batch)) + .send(SharedBufferUploaderItem::Batch(batch, table_id)) .map_err(HummockError::shared_buffer_error)?; Ok(size) } @@ -195,7 +198,7 @@ impl SharedBufferManager { pub fn replicate_remote_batch( &self, batch: Vec, - epoch: u64, + epoch: HummockEpoch, ) -> HummockResult<()> { let batch = SharedBufferBatch::new(batch, epoch); self.shared_buffer @@ -223,7 +226,7 @@ impl SharedBufferManager { } else { self.ongoing_flush.init(); // Flush all batches in the shared buffer - res = 
self.sync(None).await; + res = self.sync(None, Some(vec![GLOBAL_STORAGE_TABLE_ID])).await; // Notify other waiters if any self.ongoing_flush.notify(res.is_ok()); log::debug!("flush: notify subscribers, result {}", res.is_ok()); @@ -232,7 +235,11 @@ impl SharedBufferManager { } // TODO: support time-based syncing - pub async fn sync(&self, epoch: Option) -> HummockResult<()> { + pub async fn sync( + &self, + epoch: Option, + table_ids: Option>, + ) -> HummockResult<()> { if self.empty() { return Ok(()); } @@ -242,6 +249,7 @@ impl SharedBufferManager { .send(SharedBufferUploaderItem::Sync(SyncItem { epoch, notifier: Some(tx), + table_ids, })) .unwrap(); @@ -270,7 +278,7 @@ impl SharedBufferManager { pub fn get( &self, user_key: &[u8], - epoch_range: impl RangeBounds, + epoch_range: impl RangeBounds, ) -> Option>> { let guard = self.shared_buffer.read(); for (_epoch, buffers) in guard.range(epoch_range).rev() { @@ -292,7 +300,7 @@ impl SharedBufferManager { pub fn iters( &self, key_range: &R, - epoch_range: impl RangeBounds, + epoch_range: impl RangeBounds, ) -> Vec> where R: RangeBounds, @@ -321,7 +329,7 @@ impl SharedBufferManager { pub fn reverse_iters( &self, key_range: &R, - epoch_range: impl RangeBounds, + epoch_range: impl RangeBounds, ) -> Vec> where R: RangeBounds, @@ -346,7 +354,7 @@ impl SharedBufferManager { } /// Deletes shared buffers before a given `epoch` exclusively. - pub fn delete_before(&self, epoch: u64) { + pub fn delete_before(&self, epoch: HummockEpoch) { let mut guard = self.shared_buffer.write(); let new = guard.split_off(&epoch); *guard = new; @@ -357,7 +365,7 @@ impl SharedBufferManager { self.uploader_handle.await.unwrap() } - pub fn reset(&mut self, epoch: u64) { + pub fn reset(&mut self, epoch: HummockEpoch) { // Reset uploader item. 
self.uploader_tx .send(SharedBufferUploaderItem::Reset(epoch)) @@ -371,7 +379,9 @@ impl SharedBufferManager { } #[cfg(test)] - pub fn get_shared_buffer(&self) -> BTreeMap, SharedBufferBatch>> { + pub fn get_shared_buffer( + &self, + ) -> BTreeMap, SharedBufferBatch>> { self.shared_buffer.read().clone() } } @@ -392,6 +402,7 @@ mod tests { use crate::hummock::test_utils::default_config_for_test; use crate::hummock::SstableStore; use crate::object::{InMemObjectStore, ObjectStoreImpl}; + use crate::store::GLOBAL_STORAGE_TABLE_ID; async fn new_shared_buffer_manager() -> SharedBufferManager { let obj_client = Arc::new(ObjectStoreImpl::Mem(InMemObjectStore::new())); @@ -422,8 +433,9 @@ mod tests { async fn generate_and_write_batch( put_keys: &[Vec], delete_keys: &[Vec], - epoch: u64, + epoch: HummockEpoch, idx: &mut usize, + table_id: StorageTableId, shared_buffer_manager: &SharedBufferManager, ) -> Vec<(Vec, HummockValue>)> { let mut shared_buffer_items = Vec::new(); @@ -442,7 +454,7 @@ mod tests { } shared_buffer_items.sort_by(|l, r| user_key(&l.0).cmp(&r.0)); shared_buffer_manager - .write_batch(shared_buffer_items.clone(), epoch) + .write_batch(shared_buffer_items.clone(), epoch, table_id) .await .unwrap(); shared_buffer_items @@ -469,6 +481,7 @@ mod tests { &[], epoch1, &mut idx, + GLOBAL_STORAGE_TABLE_ID, &shared_buffer_manager, ) .await; @@ -481,6 +494,7 @@ mod tests { &keys[2..3], epoch2, &mut idx, + GLOBAL_STORAGE_TABLE_ID, &shared_buffer_manager, ) .await; @@ -558,6 +572,7 @@ mod tests { &[], epoch1, &mut idx, + GLOBAL_STORAGE_TABLE_ID, &shared_buffer_manager, ) .await; @@ -570,6 +585,7 @@ mod tests { &keys[2..3], epoch2, &mut idx, + GLOBAL_STORAGE_TABLE_ID, &shared_buffer_manager, ) .await; @@ -695,6 +711,7 @@ mod tests { &[], epoch1, &mut idx, + GLOBAL_STORAGE_TABLE_ID, &shared_buffer_manager, ) .await; @@ -707,6 +724,7 @@ mod tests { &keys[2..3], epoch2, &mut idx, + GLOBAL_STORAGE_TABLE_ID, &shared_buffer_manager, ) .await; @@ -828,8 +846,15 @@ mod tests { // Write a batch let epoch = 1; - let shared_buffer_items = - generate_and_write_batch(&keys, &[], epoch, &mut idx, &shared_buffer_manager).await; + let shared_buffer_items = generate_and_write_batch( + &keys, + &[], + epoch, + &mut idx, + GLOBAL_STORAGE_TABLE_ID, + &shared_buffer_manager, + ) + .await; // Get and check value with epoch 0..=epoch1 for (idx, key) in keys.iter().enumerate() { @@ -848,8 +873,15 @@ mod tests { // Generate new items overlapping with old items and check keys.push(format!("key_test_{:05}", 100).as_bytes().to_vec()); let epoch = 1; - let new_shared_buffer_items = - generate_and_write_batch(&keys, &[], epoch, &mut idx, &shared_buffer_manager).await; + let new_shared_buffer_items = generate_and_write_batch( + &keys, + &[], + epoch, + &mut idx, + GLOBAL_STORAGE_TABLE_ID, + &shared_buffer_manager, + ) + .await; for (idx, key) in keys.iter().enumerate() { assert_eq!( shared_buffer_manager.get(key.as_slice(), ..=epoch).unwrap(), diff --git a/src/storage/src/hummock/shared_buffer/shared_buffer_uploader.rs b/src/storage/src/hummock/shared_buffer/shared_buffer_uploader.rs index 38360f181cc43..f973555041090 100644 --- a/src/storage/src/hummock/shared_buffer/shared_buffer_uploader.rs +++ b/src/storage/src/hummock/shared_buffer/shared_buffer_uploader.rs @@ -27,25 +27,28 @@ use crate::hummock::local_version_manager::LocalVersionManager; use crate::hummock::shared_buffer::shared_buffer_batch::SharedBufferBatch; use crate::hummock::{HummockError, HummockResult, SstableStoreRef}; use crate::monitor::StateStoreMetrics; 
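The uploader changes below re-key the pending work from `epoch -> Vec<batch>` to `epoch -> table_id -> Vec<batch>`, so that a sync can drain some tables of an epoch while leaving the others buffered. A reduced model of that bookkeeping, with simplified stand-in types (`UploadQueue` and `Batch` are hypothetical; the real code holds `SharedBufferBatch`es and additionally archives the epoch in the conflict detector):

```rust
use std::collections::BTreeMap;

type HummockEpoch = u64;
type StorageTableId = u64;
// Stand-in for `SharedBufferBatch`: a sorted run of key/value pairs.
type Batch = Vec<(Vec<u8>, Vec<u8>)>;

#[derive(Default)]
struct UploadQueue {
    // epoch -> table_id -> batches written for that table in that epoch.
    batches: BTreeMap<HummockEpoch, BTreeMap<StorageTableId, Vec<Batch>>>,
}

impl UploadQueue {
    fn push(&mut self, epoch: HummockEpoch, table_id: StorageTableId, batch: Batch) {
        self.batches
            .entry(epoch)
            .or_default()
            .entry(table_id)
            .or_default()
            .push(batch);
    }

    /// Drains the batches to upload for `epoch`. `None` drains every table,
    /// mirroring the `table_ids: Option<Vec<_>>` parameter of `sync`.
    fn drain(
        &mut self,
        epoch: HummockEpoch,
        table_ids: Option<&[StorageTableId]>,
    ) -> Vec<Batch> {
        let per_table = match self.batches.get_mut(&epoch) {
            Some(m) => m,
            None => return vec![],
        };
        let drained: Vec<Batch> = match table_ids {
            // Partial sync: take only the requested tables' batches.
            Some(ids) => ids
                .iter()
                .filter_map(|id| per_table.remove(id))
                .flatten()
                .collect(),
            // Full sync: take everything buffered for this epoch.
            None => std::mem::take(per_table).into_values().flatten().collect(),
        };
        // Drop the epoch entry once no table has data left in it.
        if per_table.is_empty() {
            self.batches.remove(&epoch);
        }
        drained
    }
}

fn main() {
    let mut queue = UploadQueue::default();
    queue.push(233, 1, vec![(b"k1".to_vec(), b"v1".to_vec())]);
    queue.push(233, 2, vec![(b"k2".to_vec(), b"v2".to_vec())]);
    // Sync only table 1: table 2's batch stays buffered under epoch 233.
    assert_eq!(queue.drain(233, Some(&[1])).len(), 1);
    assert_eq!(queue.drain(233, None).len(), 1);
}
```

Draining per table is also what allows `archive_epoch` to retire an epoch for the synced tables while other tables keep writing to it.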
+use crate::store::StorageTableId; #[derive(Debug)] pub struct SyncItem { - /// Epoch to sync. None means syncing all epochs. + /// Epoch to sync. `None` means syncing all epochs. pub(super) epoch: Option, /// Notifier to notify on sync finishes pub(super) notifier: Option>>, + /// Table ids to sync. `None` means syncing all tables. + pub(super) table_ids: Option>, } #[derive(Debug)] pub enum SharedBufferUploaderItem { - Batch(SharedBufferBatch), + Batch(SharedBufferBatch, StorageTableId), Sync(SyncItem), Reset(u64), } pub struct SharedBufferUploader { - /// Batches to upload grouped by epoch - batches_to_upload: BTreeMap>, + /// Batches to upload grouped by epoch and table_id + batches_to_upload: BTreeMap>>, local_version_manager: Arc, options: Arc, @@ -89,13 +92,42 @@ impl SharedBufferUploader { } /// Uploads buffer batches to S3. - async fn sync(&mut self, epoch: u64) -> HummockResult { + async fn sync( + &mut self, + epoch: u64, + table_ids: Option>, + ) -> HummockResult { if let Some(detector) = &self.write_conflict_detector { - detector.archive_epoch(epoch); + detector.archive_epoch(epoch, table_ids.as_ref()); } - let buffers = match self.batches_to_upload.remove(&epoch) { - Some(m) => m, + // TODO: may want to recover the removed batches in case of compaction failure + let buffers = match self.batches_to_upload.get_mut(&epoch) { + Some(table_batches) => { + if let Some(table_ids) = table_ids { + let mut ret = vec![]; + for table_id in table_ids { + if let Some(batches) = table_batches.remove(&table_id) { + ret.extend(batches); + } else { + tracing::warn!("no table for table_id `{}` exists to sync", table_id) + } + } + if table_batches.is_empty() { + self.batches_to_upload.remove(&epoch); + } + ret + } else { + self.batches_to_upload + .remove(&epoch) + // existence of `epoch` has previously been checked, so it's safe to use + // unwrap + .unwrap() + .into_values() + .flat_map(Vec::into_iter) + .collect_vec() + } + } None => return Ok(0), }; @@ -146,13 +178,15 @@ impl SharedBufferUploader { async fn handle(&mut self, item: SharedBufferUploaderItem) -> StorageResult<()> { match item { - SharedBufferUploaderItem::Batch(m) => { + SharedBufferUploaderItem::Batch(m, table_id) => { if let Some(detector) = &self.write_conflict_detector { - detector.check_conflict_and_track_write_batch(&m.inner, m.epoch); + detector.check_conflict_and_track_write_batch(&m.inner, m.epoch, table_id); } self.batches_to_upload .entry(m.epoch()) + .or_insert(BTreeMap::new()) + .entry(table_id) .or_insert(Vec::new()) .push(m); Ok(()) @@ -161,7 +195,7 @@ impl SharedBufferUploader { let res = match sync_item.epoch { Some(e) => { // Sync a specific epoch - self.sync(e).await + self.sync(e, sync_item.table_ids).await } None => { // Sync all epochs @@ -170,7 +204,7 @@ impl SharedBufferUploader { let mut size_total: u64 = 0; for e in epochs { - res = self.sync(e).await; + res = self.sync(e, sync_item.table_ids.clone()).await; if res.is_err() { break; } diff --git a/src/storage/src/hummock/snapshot_tests.rs b/src/storage/src/hummock/snapshot_tests.rs index f715000cf9ef6..edb5632a50e29 100644 --- a/src/storage/src/hummock/snapshot_tests.rs +++ b/src/storage/src/hummock/snapshot_tests.rs @@ -92,10 +92,14 @@ async fn test_snapshot() { (Bytes::from("2"), StorageValue::new_default_put("test")), ], epoch1, + GLOBAL_STORAGE_TABLE_ID, ) .await .unwrap(); - hummock_storage.sync(Some(epoch1)).await.unwrap(); + hummock_storage + .sync(Some(epoch1), Some(vec![GLOBAL_STORAGE_TABLE_ID])) + .await + .unwrap(); 
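Every call in these tests passes `Some(vec![GLOBAL_STORAGE_TABLE_ID])`, so they exercise the degenerate single-table case. The point of the `table_ids` argument is that a sync may persist one table's buffered writes while another table's stay pending; a toy model of that contract (epochs and error handling elided, all names are stand-ins):

```rust
use std::collections::{BTreeMap, HashMap};

type StorageTableId = u64;
type Kv = (Vec<u8>, Vec<u8>);

#[derive(Default)]
struct ToyStore {
    // Writes sit in the shared buffer until their table is synced.
    buffered: HashMap<StorageTableId, Vec<Kv>>,
    persisted: BTreeMap<Vec<u8>, Vec<u8>>,
}

impl ToyStore {
    fn ingest_batch(&mut self, kv_pairs: Vec<Kv>, table_id: StorageTableId) {
        self.buffered.entry(table_id).or_default().extend(kv_pairs);
    }

    /// `None` syncs every table, `Some(ids)` only the listed ones.
    fn sync(&mut self, table_ids: Option<Vec<StorageTableId>>) {
        let ids: Vec<_> = match table_ids {
            Some(ids) => ids,
            None => self.buffered.keys().copied().collect(),
        };
        for id in ids {
            for (k, v) in self.buffered.remove(&id).unwrap_or_default() {
                self.persisted.insert(k, v);
            }
        }
    }
}

fn main() {
    let mut store = ToyStore::default();
    store.ingest_batch(vec![(b"1".to_vec(), b"test".to_vec())], 10);
    store.ingest_batch(vec![(b"2".to_vec(), b"test".to_vec())], 20);
    // Partial sync: table 10 is persisted, table 20 remains buffered.
    store.sync(Some(vec![10]));
    assert!(store.persisted.contains_key(&b"1"[..]));
    assert!(store.buffered.contains_key(&20));
}
```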
mock_hummock_meta_client.commit_epoch(epoch1).await.unwrap(); vm.refresh_version(mock_hummock_meta_client.as_ref()).await; assert_count_range_scan!(hummock_storage, .., 2, epoch1); @@ -109,10 +113,14 @@ async fn test_snapshot() { (Bytes::from("4"), StorageValue::new_default_put("test")), ], epoch2, + GLOBAL_STORAGE_TABLE_ID, ) .await .unwrap(); - hummock_storage.sync(Some(epoch2)).await.unwrap(); + hummock_storage + .sync(Some(epoch2), Some(vec![GLOBAL_STORAGE_TABLE_ID])) + .await + .unwrap(); mock_hummock_meta_client.commit_epoch(epoch2).await.unwrap(); vm.refresh_version(mock_hummock_meta_client.as_ref()).await; assert_count_range_scan!(hummock_storage, .., 3, epoch2); @@ -127,10 +135,14 @@ async fn test_snapshot() { (Bytes::from("4"), StorageValue::new_default_delete()), ], epoch3, + GLOBAL_STORAGE_TABLE_ID, ) .await .unwrap(); - hummock_storage.sync(Some(epoch3)).await.unwrap(); + hummock_storage + .sync(Some(epoch3), Some(vec![GLOBAL_STORAGE_TABLE_ID])) + .await + .unwrap(); mock_hummock_meta_client.commit_epoch(epoch3).await.unwrap(); vm.refresh_version(mock_hummock_meta_client.as_ref()).await; assert_count_range_scan!(hummock_storage, .., 0, epoch3); @@ -178,10 +190,14 @@ async fn test_snapshot_range_scan() { (Bytes::from("4"), StorageValue::new_default_put("test")), ], epoch, + GLOBAL_STORAGE_TABLE_ID, ) .await .unwrap(); - hummock_storage.sync(Some(epoch)).await.unwrap(); + hummock_storage + .sync(Some(epoch), Some(vec![GLOBAL_STORAGE_TABLE_ID])) + .await + .unwrap(); mock_hummock_meta_client.commit_epoch(epoch).await.unwrap(); vm.refresh_version(mock_hummock_meta_client.as_ref()).await; macro_rules! key { @@ -239,10 +255,14 @@ async fn test_snapshot_reverse_range_scan() { (Bytes::from("6"), StorageValue::new_default_put("test")), ], epoch, + GLOBAL_STORAGE_TABLE_ID, ) .await .unwrap(); - hummock_storage.sync(Some(epoch)).await.unwrap(); + hummock_storage + .sync(Some(epoch), Some(vec![GLOBAL_STORAGE_TABLE_ID])) + .await + .unwrap(); mock_hummock_meta_client.commit_epoch(epoch).await.unwrap(); vm.refresh_version(mock_hummock_meta_client.as_ref()).await; hummock_storage @@ -254,10 +274,14 @@ async fn test_snapshot_reverse_range_scan() { (Bytes::from("8"), StorageValue::new_default_put("test")), ], epoch + 1, + GLOBAL_STORAGE_TABLE_ID, ) .await .unwrap(); - hummock_storage.sync(Some(epoch + 1)).await.unwrap(); + hummock_storage + .sync(Some(epoch + 1), Some(vec![GLOBAL_STORAGE_TABLE_ID])) + .await + .unwrap(); mock_hummock_meta_client .commit_epoch(epoch + 1) .await diff --git a/src/storage/src/hummock/state_store_tests.rs b/src/storage/src/hummock/state_store_tests.rs index 676434323451b..868d4b66e9653 100644 --- a/src/storage/src/hummock/state_store_tests.rs +++ b/src/storage/src/hummock/state_store_tests.rs @@ -28,6 +28,7 @@ use crate::hummock::test_utils::{count_iter, default_config_for_test}; use crate::monitor::StateStoreMetrics; use crate::object::{InMemObjectStore, ObjectStoreImpl}; use crate::storage_value::{StorageValue, VALUE_META_SIZE}; +use crate::store::GLOBAL_STORAGE_TABLE_ID; use crate::StateStore; #[tokio::test] @@ -85,7 +86,10 @@ async fn test_basic() { let epoch1: u64 = 1; // Write the first batch. - hummock_storage.ingest_batch(batch1, epoch1).await.unwrap(); + hummock_storage + .ingest_batch(batch1, epoch1, GLOBAL_STORAGE_TABLE_ID) + .await + .unwrap(); // Get the value after flushing to remote. let value = hummock_storage.get(&anchor, epoch1).await.unwrap().unwrap(); @@ -106,7 +110,10 @@ async fn test_basic() { // Write the second batch. 
let epoch2 = epoch1 + 1; - hummock_storage.ingest_batch(batch2, epoch2).await.unwrap(); + hummock_storage + .ingest_batch(batch2, epoch2, GLOBAL_STORAGE_TABLE_ID) + .await + .unwrap(); // Get the value after flushing to remote. let value = hummock_storage.get(&anchor, epoch2).await.unwrap().unwrap(); @@ -114,7 +121,10 @@ async fn test_basic() { // Write the third batch. let epoch3 = epoch2 + 1; - hummock_storage.ingest_batch(batch3, epoch3).await.unwrap(); + hummock_storage + .ingest_batch(batch3, epoch3, GLOBAL_STORAGE_TABLE_ID) + .await + .unwrap(); // Get the value after flushing to remote. let value = hummock_storage.get(&anchor, epoch3).await.unwrap(); @@ -157,9 +167,15 @@ async fn test_basic() { .unwrap(); let len = count_iter(&mut iter).await; assert_eq!(len, 4); - hummock_storage.sync(Some(epoch1)).await.unwrap(); + hummock_storage + .sync(Some(epoch1), Some(vec![GLOBAL_STORAGE_TABLE_ID])) + .await + .unwrap(); meta_client.commit_epoch(epoch1).await.unwrap(); - hummock_storage.wait_epoch(epoch1).await.unwrap(); + hummock_storage + .wait_epoch(epoch1, GLOBAL_STORAGE_TABLE_ID) + .await + .unwrap(); let value = hummock_storage .get(&Bytes::from("bb"), epoch2) .await @@ -210,7 +226,10 @@ async fn test_state_store_sync() { // Make sure the batch is sorted. batch1.sort_by(|(k1, _), (k2, _)| k1.cmp(k2)); - hummock_storage.ingest_batch(batch1, epoch).await.unwrap(); + hummock_storage + .ingest_batch(batch1, epoch, GLOBAL_STORAGE_TABLE_ID) + .await + .unwrap(); // check sync state store metrics // Note: epoch(8B) and ValueMeta(2B) will be appended to each kv pair @@ -230,7 +249,10 @@ async fn test_state_store_sync() { (Bytes::from("eeee"), StorageValue::new_default_put("5555")), ]; batch2.sort_by(|(k1, _), (k2, _)| k1.cmp(k2)); - hummock_storage.ingest_batch(batch2, epoch).await.unwrap(); + hummock_storage + .ingest_batch(batch2, epoch, GLOBAL_STORAGE_TABLE_ID) + .await + .unwrap(); // shared buffer threshold size should have been reached and will trigger a flush // then ingest the batch @@ -248,7 +270,10 @@ async fn test_state_store_sync() { // ingest more 8B then will trigger a sync behind the scene let mut batch3 = vec![(Bytes::from("eeee"), StorageValue::new_default_put("5555"))]; batch3.sort_by(|(k1, _), (k2, _)| k1.cmp(k2)); - hummock_storage.ingest_batch(batch3, epoch).await.unwrap(); + hummock_storage + .ingest_batch(batch3, epoch, GLOBAL_STORAGE_TABLE_ID) + .await + .unwrap(); // 16B in total with 8B epoch appended to the key assert_eq!( @@ -260,8 +285,11 @@ async fn test_state_store_sync() { .load(Ordering::SeqCst) ); - // triger a sync - hummock_storage.sync(Some(epoch)).await.unwrap(); + // trigger a sync + hummock_storage + .sync(Some(epoch), Some(vec![GLOBAL_STORAGE_TABLE_ID])) + .await + .unwrap(); assert_eq!( 0, @@ -321,7 +349,10 @@ async fn test_reload_storage() { let epoch1: u64 = 1; // Write the first batch. - hummock_storage.ingest_batch(batch1, epoch1).await.unwrap(); + hummock_storage + .ingest_batch(batch1, epoch1, GLOBAL_STORAGE_TABLE_ID) + .await + .unwrap(); // Mock something happened to storage internal, and storage is reloaded. drop(hummock_storage); @@ -348,7 +379,10 @@ async fn test_reload_storage() { // Write the second batch. let epoch2 = epoch1 + 1; - hummock_storage.ingest_batch(batch2, epoch2).await.unwrap(); + hummock_storage + .ingest_batch(batch2, epoch2, GLOBAL_STORAGE_TABLE_ID) + .await + .unwrap(); // Get the value after flushing to remote. 
let value = hummock_storage.get(&anchor, epoch2).await.unwrap().unwrap(); diff --git a/src/storage/src/keyspace.rs b/src/storage/src/keyspace.rs index 2dfc4436a42a1..778f424cc6ceb 100644 --- a/src/storage/src/keyspace.rs +++ b/src/storage/src/keyspace.rs @@ -17,6 +17,8 @@ use risingwave_common::catalog::TableId; use risingwave_hummock_sdk::key::next_key; use crate::error::StorageResult; +use crate::store::{StorageTableId, GLOBAL_STORAGE_TABLE_ID}; +use crate::write_batch::WriteBatch; use crate::StateStore; /// Provides API to read key-value pairs of a prefix in the storage backend. @@ -26,6 +28,8 @@ pub struct Keyspace { /// Encoded representation for all segments. prefix: Vec, + + table_id: StorageTableId, } impl Keyspace { @@ -44,7 +48,12 @@ impl Keyspace { buf.put_u64(operator_id); buf.to_vec() }; - Self { store, prefix } + Self { + store, + prefix, + // TODO(partial checkpoint): use the table id obtained locally. + table_id: GLOBAL_STORAGE_TABLE_ID, + } } /// Creates a root [`Keyspace`] for an executor. @@ -55,7 +64,12 @@ impl Keyspace { buf.put_u64(executor_id); buf.to_vec() }; - Self { store, prefix } + Self { + store, + prefix, + // TODO(partial checkpoint): use the table id obtained locally. + table_id: GLOBAL_STORAGE_TABLE_ID, + } } /// Creates a root [`Keyspace`] for a table. @@ -66,7 +80,12 @@ impl Keyspace { buf.put_u32(id.table_id); buf.to_vec() }; - Self { store, prefix } + Self { + store, + prefix, + // TODO(partial checkpoint): use the table id obtained locally. + table_id: GLOBAL_STORAGE_TABLE_ID, + } } /// Appends more bytes to the prefix and returns a new `Keyspace` @@ -77,6 +96,7 @@ impl Keyspace { Self { store: self.store.clone(), prefix, + table_id: self.table_id, } } @@ -133,8 +153,9 @@ impl Keyspace { limit: Option, epoch: u64, ) -> StorageResult> { - assert!( - start_key[..self.prefix.len()] == self.prefix, + assert_eq!( + start_key[..self.prefix.len()], + self.prefix, "{:?} does not start with prefix {:?}", start_key, self.prefix @@ -143,6 +164,21 @@ impl Keyspace { self.store.scan(range, limit, epoch).await } + /// Scans `limit` keys from the sub keyspace specified by `subspace_key` and get their values. + /// If `limit` is None, all keys of the given prefix will be scanned. + /// + /// The subspace is `[prefix, subspace_prefix]..next_key([prefix, subspace_prefix])`. + pub async fn scan_subspace( + &self, + subspace_prefix: Vec, + limit: Option, + epoch: u64, + ) -> StorageResult> { + let start_key = self.prefixed_key(subspace_prefix); + let end_key = next_key(start_key.as_slice()); + self.store.scan(start_key..end_key, limit, epoch).await + } + /// Scans from the keyspace, and then strips the prefix of this keyspace. /// The returned values are based on a snapshot corresponding to the given `epoch` /// @@ -166,8 +202,11 @@ impl Keyspace { self.store.iter(range, epoch).await } - /// Gets the underlying state store. 
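`scan_subspace` above derives its upper bound by applying `next_key` to the prefixed start key, so the range covers exactly the keys that extend the sub-prefix. A standalone illustration of that range computation; this `next_key` reimplements the semantics assumed of `risingwave_hummock_sdk::key::next_key` (increment the last non-`0xff` byte and truncate, with an all-`0xff` key yielding an empty, i.e. unbounded, result), and the key literals are made up:

```rust
/// Smallest key strictly greater than every key that has `key` as a prefix.
fn next_key(key: &[u8]) -> Vec<u8> {
    let mut out = key.to_vec();
    while let Some(&last) = out.last() {
        if last == u8::MAX {
            out.pop(); // Cannot increment 0xff: carry into the previous byte.
        } else {
            *out.last_mut().unwrap() = last + 1;
            break;
        }
    }
    out // Empty when every byte was 0xff: no upper bound exists.
}

fn main() {
    // Hypothetical keyspace prefix plus a serialized primary key.
    let prefix = b"t-42".to_vec();
    let start = [prefix.as_slice(), &b"pk-17"[..]].concat();
    let end = next_key(&start);
    // Every cell key under this pk sorts inside `start..end`.
    let cell = [start.as_slice(), &b"col-3"[..]].concat();
    assert!(cell >= start && cell < end);
    println!("scan range: {:?}..{:?}", start, end);
}
```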
- pub fn state_store(&self) -> S { - self.store.clone() + pub fn start_write_batch(&self) -> WriteBatch { + self.store.start_write_batch(self.table_id) + } + + pub fn wait_epoch(&self, epoch: u64) -> S::WaitEpochFuture<'_> { + self.store.wait_epoch(epoch, self.table_id) } } diff --git a/src/storage/src/memory.rs b/src/storage/src/memory.rs index af358d23ae186..a7acce7e17664 100644 --- a/src/storage/src/memory.rs +++ b/src/storage/src/memory.rs @@ -21,13 +21,15 @@ use std::sync::Arc; use bytes::Bytes; use lazy_static::lazy_static; +use risingwave_hummock_sdk::HummockEpoch; use tokio::sync::Mutex; use crate::storage_value::StorageValue; use crate::store::*; use crate::{define_state_store_associated_type, StateStore, StateStoreIter}; -type KeyWithEpoch = (Bytes, Reverse); +type TableInMemStorage = BTreeMap>; +type KeyWithEpoch = (Bytes, Reverse); /// An in-memory state store /// @@ -37,8 +39,8 @@ type KeyWithEpoch = (Bytes, Reverse); /// store should never be used in production. #[derive(Clone)] pub struct MemoryStateStore { - /// Stores (key, epoch) -> user value. We currently don't consider value meta here. - inner: Arc>>>, + /// Stores table_id -> (key, epoch) -> user value. We currently don't consider value meta here. + inner: Arc>>, } impl Default for MemoryStateStore { @@ -53,13 +55,19 @@ where B: AsRef<[u8]>, { let start = match range.start_bound() { - Included(k) => Included((Bytes::copy_from_slice(k.as_ref()), Reverse(u64::MAX))), + Included(k) => Included(( + Bytes::copy_from_slice(k.as_ref()), + Reverse(HummockEpoch::MAX), + )), Excluded(k) => Excluded((Bytes::copy_from_slice(k.as_ref()), Reverse(0))), Unbounded => Unbounded, }; let end = match range.end_bound() { Included(k) => Included((Bytes::copy_from_slice(k.as_ref()), Reverse(0))), - Excluded(k) => Excluded((Bytes::copy_from_slice(k.as_ref()), Reverse(u64::MAX))), + Excluded(k) => Excluded(( + Bytes::copy_from_slice(k.as_ref()), + Reverse(HummockEpoch::MAX), + )), Unbounded => Unbounded, }; (start, end) @@ -85,7 +93,7 @@ impl StateStore for MemoryStateStore { define_state_store_associated_type!(); - fn get<'a>(&'a self, key: &'a [u8], epoch: u64) -> Self::GetFuture<'_> { + fn get<'a>(&'a self, key: &'a [u8], epoch: HummockEpoch) -> Self::GetFuture<'_> { async move { let range_bounds = key.to_vec()..=key.to_vec(); let res = self.scan(range_bounds, Some(1), epoch).await?; @@ -102,7 +110,7 @@ impl StateStore for MemoryStateStore { &self, key_range: R, limit: Option, - epoch: u64, + epoch: HummockEpoch, ) -> Self::ScanFuture<'_, R, B> where R: RangeBounds + Send, @@ -114,22 +122,26 @@ impl StateStore for MemoryStateStore { return Ok(vec![]); } let inner = self.inner.lock().await; + let byte_range = to_bytes_range(key_range); - let mut last_key = None; - for ((key, Reverse(key_epoch)), value) in inner.range(to_bytes_range(key_range)) { - if *key_epoch > epoch { - continue; - } - if Some(key) != last_key.as_ref() { - if let Some(value) = value { - data.push((key.clone(), value.clone())); + inner.values().for_each(|tree| { + let mut last_key = None; + for ((key, Reverse(key_epoch)), value) in tree.range(byte_range.clone()) { + if *key_epoch > epoch { + continue; + } + if Some(key) != last_key.as_ref() { + if let Some(value) = value { + data.push((key.clone(), value.clone())); + } + last_key = Some(key.clone()); + } + if let Some(limit) = limit && data.len() >= limit { + break; } - last_key = Some(key.clone()); - } - if let Some(limit) = limit && data.len() >= limit { - break; } - } + }); + Ok(data) } } @@ -138,7 +150,7 @@ impl 
StateStore for MemoryStateStore { &self, _key_range: R, _limit: Option, - _epoch: u64, + _epoch: HummockEpoch, ) -> Self::ReverseScanFuture<'_, R, B> where R: RangeBounds + Send, @@ -150,14 +162,21 @@ impl StateStore for MemoryStateStore { fn ingest_batch( &self, kv_pairs: Vec<(Bytes, StorageValue)>, - epoch: u64, + epoch: HummockEpoch, + table_id: StorageTableId, ) -> Self::IngestBatchFuture<'_> { async move { - let mut inner = self.inner.lock().await; let mut size: u64 = 0; - for (key, value) in kv_pairs { - size += (key.len() + value.size()) as u64; - inner.insert((key, Reverse(epoch)), value.user_value); + if !kv_pairs.is_empty() { + let mut inner = self.inner.lock().await; + #[allow(clippy::mutable_key_type)] + // TODO: may want to use `Vec` as the key instead of `Bytes` to avoid warning on + // `mutable_key_type`. + let table_store = inner.entry(table_id).or_insert_with(BTreeMap::new); + for (key, value) in kv_pairs { + size += (key.len() + value.size()) as u64; + table_store.insert((key, Reverse(epoch)), value.user_value); + } } Ok(size) } @@ -166,12 +185,12 @@ impl StateStore for MemoryStateStore { fn replicate_batch( &self, _kv_pairs: Vec<(Bytes, StorageValue)>, - _epoch: u64, + _epoch: HummockEpoch, ) -> Self::ReplicateBatchFuture<'_> { async move { unimplemented!() } } - fn iter(&self, key_range: R, epoch: u64) -> Self::IterFuture<'_, R, B> + fn iter(&self, key_range: R, epoch: HummockEpoch) -> Self::IterFuture<'_, R, B> where R: RangeBounds + Send, B: AsRef<[u8]> + Send, @@ -183,7 +202,11 @@ impl StateStore for MemoryStateStore { } } - fn reverse_iter(&self, _key_range: R, _epoch: u64) -> Self::ReverseIterFuture<'_, R, B> + fn reverse_iter( + &self, + _key_range: R, + _epoch: HummockEpoch, + ) -> Self::ReverseIterFuture<'_, R, B> where R: RangeBounds + Send, B: AsRef<[u8]> + Send, @@ -191,14 +214,22 @@ impl StateStore for MemoryStateStore { async move { unimplemented!() } } - fn wait_epoch(&self, _epoch: u64) -> Self::WaitEpochFuture<'_> { + fn wait_epoch( + &self, + _epoch: HummockEpoch, + _table_id: StorageTableId, + ) -> Self::WaitEpochFuture<'_> { async move { // memory backend doesn't support wait for epoch, so this is a no-op. 
Ok(()) } } - fn sync(&self, _epoch: Option) -> Self::SyncFuture<'_> { + fn sync( + &self, + _epoch: Option, + _table_id: Option>, + ) -> Self::SyncFuture<'_> { async move { // memory backend doesn't support push to S3, so this is a no-op Ok(()) @@ -246,6 +277,7 @@ mod tests { ), ], 0, + GLOBAL_STORAGE_TABLE_ID, ) .await .unwrap(); @@ -259,6 +291,7 @@ mod tests { (b"b".to_vec().into(), StorageValue::new_default_delete()), ], 1, + GLOBAL_STORAGE_TABLE_ID, ) .await .unwrap(); diff --git a/src/storage/src/monitor/monitored_store.rs b/src/storage/src/monitor/monitored_store.rs index 125aa7f273fc2..dd7c1c3c703be 100644 --- a/src/storage/src/monitor/monitored_store.rs +++ b/src/storage/src/monitor/monitored_store.rs @@ -17,6 +17,7 @@ use std::sync::Arc; use bytes::Bytes; use futures::Future; +use risingwave_hummock_sdk::HummockEpoch; use super::StateStoreMetrics; use crate::error::StorageResult; @@ -137,6 +138,7 @@ where &self, kv_pairs: Vec<(Bytes, StorageValue)>, epoch: u64, + table_id: StorageTableId, ) -> Self::IngestBatchFuture<'_> { async move { if kv_pairs.is_empty() { @@ -147,7 +149,7 @@ where .write_batch_tuple_counts .inc_by(kv_pairs.len() as _); let timer = self.stats.write_batch_duration.start_timer(); - let batch_size = self.inner.ingest_batch(kv_pairs, epoch).await?; + let batch_size = self.inner.ingest_batch(kv_pairs, epoch, table_id).await?; timer.observe_duration(); self.stats.write_batch_size.observe(batch_size as _); @@ -174,14 +176,22 @@ where } } - fn wait_epoch(&self, epoch: u64) -> Self::WaitEpochFuture<'_> { - async move { self.inner.wait_epoch(epoch).await } + fn wait_epoch( + &self, + epoch: HummockEpoch, + table_id: StorageTableId, + ) -> Self::WaitEpochFuture<'_> { + async move { self.inner.wait_epoch(epoch, table_id).await } } - fn sync(&self, epoch: Option) -> Self::SyncFuture<'_> { + fn sync( + &self, + epoch: Option, + table_id: Option>, + ) -> Self::SyncFuture<'_> { async move { let timer = self.stats.shared_buffer_to_l0_duration.start_timer(); - self.inner.sync(epoch).await?; + self.inner.sync(epoch, table_id).await?; timer.observe_duration(); Ok(()) } diff --git a/src/storage/src/panic_store.rs b/src/storage/src/panic_store.rs index 46a0e9b437bd0..4decc789de12b 100644 --- a/src/storage/src/panic_store.rs +++ b/src/storage/src/panic_store.rs @@ -16,6 +16,7 @@ use std::future::Future; use std::ops::RangeBounds; use bytes::Bytes; +use risingwave_hummock_sdk::HummockEpoch; use crate::storage_value::StorageValue; use crate::store::*; @@ -71,6 +72,7 @@ impl StateStore for PanicStateStore { &self, _kv_pairs: Vec<(Bytes, StorageValue)>, _epoch: u64, + _table_id: StorageTableId, ) -> Self::IngestBatchFuture<'_> { async move { panic!("should not write the state store!"); @@ -107,13 +109,21 @@ impl StateStore for PanicStateStore { } } - fn wait_epoch(&self, _epoch: u64) -> Self::WaitEpochFuture<'_> { + fn wait_epoch( + &self, + _epoch: HummockEpoch, + _table_id: StorageTableId, + ) -> Self::WaitEpochFuture<'_> { async move { panic!("should not wait epoch from the panic state store!"); } } - fn sync(&self, _epoch: Option) -> Self::SyncFuture<'_> { + fn sync( + &self, + _epoch: Option, + _table_id: Option>, + ) -> Self::SyncFuture<'_> { async move { panic!("should not sync from the panic state store!"); } diff --git a/src/storage/src/rocksdb_local_mock.rs b/src/storage/src/rocksdb_local_mock.rs index 82b29863c6d63..a64b1622dd5a1 100644 --- a/src/storage/src/rocksdb_local_mock.rs +++ b/src/storage/src/rocksdb_local_mock.rs @@ -19,6 +19,7 @@ use std::ops::RangeBounds; use 
bytes::Bytes; use risingwave_common::error::{ErrorCode, Result}; +use risingwave_hummock_sdk::HummockEpoch; use super::{StateStore, StateStoreIter}; use crate::define_state_store_associated_type; @@ -39,7 +40,7 @@ impl StateStore for RocksDBStateStore { define_state_store_associated_type!(); - fn get<'a>(&'a self, _key: &'a [u8], _epoch: u64) -> Self::GetFuture<'_> { + fn get<'a>(&'a self, _key: &'a [u8], _epoch: HummockEpoch) -> Self::GetFuture<'_> { async move { unimplemented!() } } @@ -47,7 +48,7 @@ impl StateStore for RocksDBStateStore { &self, _key_range: R, _limit: Option, - _epoch: u64, + _epoch: HummockEpoch, ) -> Self::ScanFuture<'_, R, B> where R: RangeBounds + Send, @@ -60,7 +61,7 @@ impl StateStore for RocksDBStateStore { &self, _key_range: R, _limit: Option, - _epoch: u64, + _epoch: HummockEpoch, ) -> Self::ReverseScanFuture<'_, R, B> where R: RangeBounds + Send, @@ -72,7 +73,8 @@ impl StateStore for RocksDBStateStore { fn ingest_batch( &self, _kv_pairs: Vec<(Bytes, StorageValue)>, - _epoch: u64, + _epoch: HummockEpoch, + _table_id: StorageTableId, ) -> Self::IngestBatchFuture<'_> { async move { unimplemented!() } } @@ -80,12 +82,12 @@ impl StateStore for RocksDBStateStore { fn replicate_batch( &self, _kv_pairs: Vec<(Bytes, StorageValue)>, - _epoch: u64, + _epoch: HummockEpoch, ) -> Self::ReplicateBatchFuture<'_> { async move { unimplemented!() } } - fn iter(&self, _key_range: R, _epoch: u64) -> Self::IterFuture<'_, R, B> + fn iter(&self, _key_range: R, _epoch: HummockEpoch) -> Self::IterFuture<'_, R, B> where R: RangeBounds + Send, B: AsRef<[u8]> + Send, @@ -93,7 +95,11 @@ impl StateStore for RocksDBStateStore { async move { unimplemented!() } } - fn reverse_iter(&self, _key_range: R, _epoch: u64) -> Self::ReverseIterFuture<'_, R, B> + fn reverse_iter( + &self, + _key_range: R, + _epoch: HummockEpoch, + ) -> Self::ReverseIterFuture<'_, R, B> where R: RangeBounds + Send, B: AsRef<[u8]> + Send, @@ -101,11 +107,19 @@ impl StateStore for RocksDBStateStore { async move { unimplemented!() } } - fn wait_epoch(&self, _epoch: u64) -> Self::WaitEpochFuture<'_> { + fn wait_epoch( + &self, + _epoch: HummockEpoch, + _table_id: StorageTableId, + ) -> Self::WaitEpochFuture<'_> { async move { unimplemented!() } } - fn sync(&self, _epoch: Option) -> Self::SyncFuture<'_> { + fn sync( + &self, + _epoch: Option, + _table_id: Option>, + ) -> Self::SyncFuture<'_> { async move { unimplemented!() } } } diff --git a/src/storage/src/store.rs b/src/storage/src/store.rs index d0521dca720dc..5f80344469f99 100644 --- a/src/storage/src/store.rs +++ b/src/storage/src/store.rs @@ -16,6 +16,7 @@ use std::ops::RangeBounds; use std::sync::Arc; use bytes::Bytes; +use risingwave_hummock_sdk::HummockEpoch; use crate::error::StorageResult; use crate::monitor::{MonitoredStateStore, StateStoreMetrics}; @@ -27,6 +28,19 @@ pub trait ScanFutureTrait<'a, R, B> = Future = Future> + Send; pub trait IngestBatchFutureTrait<'a> = Future> + Send; +/// Table id of storage table, which is a logical concept representing the kvs written by a specific +/// relational table. A storage table usually stores the kvs written from a specific +/// stateful operator. Each storage table has its own `max committed epoch` +/// +/// A storage table id is usually the operator id, or also the table id of the relational table. +/// +/// Such concept is introduced for finer-grained tracking about where the kvs are written from, so +/// that we are able to support partial checkpoint, vertical grouping and shared arrangement in the +/// future. 
+pub type StorageTableId = u64; // TODO: should only use this in test after partial checkpoint is fully implemented +pub const GLOBAL_STORAGE_TABLE_ID: StorageTableId = 0x2333abcd; #[macro_export] macro_rules! define_state_store_associated_type { () => { @@ -79,7 +93,7 @@ pub trait StateStore: Send + Sync + 'static + Clone { /// Point gets a value from the state store. /// The result is based on a snapshot corresponding to the given `epoch`. - fn get<'a>(&'a self, key: &'a [u8], epoch: u64) -> Self::GetFuture<'_>; + fn get<'a>(&'a self, key: &'a [u8], epoch: HummockEpoch) -> Self::GetFuture<'_>; /// Scans `limit` number of keys from a key range. If `limit` is `None`, scans all elements. /// The result is based on a snapshot corresponding to the given `epoch`. @@ -90,17 +104,18 @@ pub trait StateStore: Send + Sync + 'static + Clone { &self, key_range: R, limit: Option, - epoch: u64, + epoch: HummockEpoch, ) -> Self::ScanFuture<'_, R, B> where R: RangeBounds + Send, B: AsRef<[u8]> + Send; + /// Similar to `scan` but scans in the reverse direction. fn reverse_scan( &self, key_range: R, limit: Option, - epoch: u64, + epoch: HummockEpoch, ) -> Self::ReverseScanFuture<'_, R, B> where R: RangeBounds + Send, @@ -115,23 +130,26 @@ pub trait StateStore: Send + Sync + 'static + Clone { /// - A version of a kv pair. kv pair associated with larger `Epoch` is guaranteed to be newer /// than kv pair with smaller `Epoch`. Currently this version is only used to derive the /// per-key modification history (e.g. in compaction), not across different keys. + /// + /// `table_id` is used to specify the storage table to write to. fn ingest_batch( &self, kv_pairs: Vec<(Bytes, StorageValue)>, - epoch: u64, + epoch: HummockEpoch, + table_id: StorageTableId, ) -> Self::IngestBatchFuture<'_>; /// Functions the same as `ingest_batch`, except that data won't be persisted. fn replicate_batch( &self, kv_pairs: Vec<(Bytes, StorageValue)>, - epoch: u64, + epoch: HummockEpoch, ) -> Self::ReplicateBatchFuture<'_>; /// Opens and returns an iterator for given `key_range`. /// The returned iterator will iterate data based on a snapshot corresponding to the given /// `epoch`. - fn iter(&self, key_range: R, epoch: u64) -> Self::IterFuture<'_, R, B> + fn iter(&self, key_range: R, epoch: HummockEpoch) -> Self::IterFuture<'_, R, B> where R: RangeBounds + Send, B: AsRef<[u8]> + Send; @@ -139,23 +157,43 @@ pub trait StateStore: Send + Sync + 'static + Clone { /// Opens and returns a reversed iterator for given `key_range`. /// The returned iterator will iterate data based on a snapshot corresponding to the given /// `epoch` - fn reverse_iter(&self, key_range: R, epoch: u64) -> Self::ReverseIterFuture<'_, R, B> + fn reverse_iter( + &self, + key_range: R, + epoch: HummockEpoch, + ) -> Self::ReverseIterFuture<'_, R, B> where R: RangeBounds + Send, B: AsRef<[u8]> + Send; /// Creates a `WriteBatch` associated with this state store. - fn start_write_batch(&self) -> WriteBatch { - WriteBatch::new(self.clone()) + /// + /// `table_id` is used to specify the storage table to write to. + fn start_write_batch(&self, table_id: StorageTableId) -> WriteBatch { + WriteBatch::new(self.clone(), table_id) } /// Waits until the epoch is committed and its data is ready to read. - fn wait_epoch(&self, epoch: u64) -> Self::WaitEpochFuture<'_>; + /// + /// `table_id` is used to specify the storage table to wait for.
+ fn wait_epoch( + &self, + epoch: HummockEpoch, + table_id: StorageTableId, + ) -> Self::WaitEpochFuture<'_>; /// Syncs buffered data to S3. /// If the epoch is None, all buffered data will be synced. /// Otherwise, only data of the provided epoch will be synced. - fn sync(&self, epoch: Option) -> Self::SyncFuture<'_>; + /// + /// `table_id` is used to specify the storage table to sync to when specified. If `None`, it + /// will sync all tables. + fn sync( + &self, + epoch: Option, + table_id: Option>, + ) -> Self::SyncFuture<'_>; /// Creates a [`MonitoredStateStore`] from this state store, with given `stats`. fn monitored(self, stats: Arc) -> MonitoredStateStore { diff --git a/src/storage/src/table/cell_based_table.rs b/src/storage/src/table/cell_based_table.rs index 54e828c99bc12..cbb71e2b881ef 100644 --- a/src/storage/src/table/cell_based_table.rs +++ b/src/storage/src/table/cell_based_table.rs @@ -22,7 +22,6 @@ use risingwave_common::catalog::{ColumnDesc, ColumnId, Field, Schema}; use risingwave_common::error::{ErrorCode, RwError}; use risingwave_common::util::ordered::*; use risingwave_common::util::sort_util::OrderType; -use risingwave_hummock_sdk::key::next_key; use super::TableIter; use crate::cell_based_row_deserializer::CellBasedRowDeserializer; @@ -159,16 +158,10 @@ impl CellBasedTable { pub async fn get_row_by_scan(&self, pk: &Row, epoch: u64) -> StorageResult> { // get row by state_store scan let pk_serializer = self.pk_serializer.as_ref().expect("pk_serializer is None"); - let start_key = self - .keyspace - .prefixed_key(&serialize_pk(pk, pk_serializer).map_err(err)?); - let end_key = next_key(&start_key); - - let state_store_range_scan_res = self - .keyspace - .state_store() - .scan(start_key..end_key, None, epoch) - .await?; + let sub_prefix = serialize_pk(pk, pk_serializer).map_err(err)?; + + let state_store_range_scan_res = + self.keyspace.scan_subspace(sub_prefix, None, epoch).await?; let mut cell_based_row_deserializer = CellBasedRowDeserializer::new(self.column_descs.clone()); for (key, value) in state_store_range_scan_res { @@ -188,7 +181,7 @@ impl CellBasedTable { rows: Vec<(Row, Option)>, epoch: u64, ) -> StorageResult<()> { - let mut batch = self.keyspace.state_store().start_write_batch(); + let mut batch = self.keyspace.start_write_batch(); let mut local = batch.prefixify(&self.keyspace); for (pk, cell_values) in rows { let arrange_key_buf = @@ -254,7 +247,7 @@ impl CellBasedTableRowIter { epoch: u64, _stats: Arc, ) -> StorageResult { - keyspace.state_store().wait_epoch(epoch).await?; + keyspace.wait_epoch(epoch).await?; let cell_based_row_deserializer = CellBasedRowDeserializer::new(table_descs); diff --git a/src/storage/src/tikv_mock.rs b/src/storage/src/tikv_mock.rs index 38cb20435a043..85d18076750e9 100644 --- a/src/storage/src/tikv_mock.rs +++ b/src/storage/src/tikv_mock.rs @@ -16,6 +16,7 @@ use std::future::Future; use std::ops::RangeBounds; use bytes::Bytes; +use risingwave_hummock_sdk::HummockEpoch; use super::StateStore; use crate::storage_value::StorageValue; @@ -36,19 +37,20 @@ impl StateStore for TikvStateStore { define_state_store_associated_type!(); - fn get<'a>(&'a self, _key: &'a [u8], _epoch: u64) -> Self::GetFuture<'_> { + fn get<'a>(&'a self, _key: &'a [u8], _epoch: HummockEpoch) -> Self::GetFuture<'_> { async move { unimplemented!() } } fn ingest_batch( &self, _kv_pairs: Vec<(Bytes, StorageValue)>, - _epoch: u64, + _epoch: HummockEpoch, + _table_id: StorageTableId, ) -> Self::IngestBatchFuture<'_> { async move { unimplemented!() } } - fn 
iter(&self, _key_range: R, _epoch: u64) -> Self::IterFuture<'_, R, B> + fn iter(&self, _key_range: R, _epoch: HummockEpoch) -> Self::IterFuture<'_, R, B> where R: RangeBounds + Send, B: AsRef<[u8]> + Send, @@ -59,12 +61,16 @@ impl StateStore for TikvStateStore { fn replicate_batch( &self, _kv_pairs: Vec<(Bytes, StorageValue)>, - _epoch: u64, + _epoch: HummockEpoch, ) -> Self::ReplicateBatchFuture<'_> { async move { unimplemented!() } } - fn reverse_iter(&self, _key_range: R, _epoch: u64) -> Self::ReverseIterFuture<'_, R, B> + fn reverse_iter( + &self, + _key_range: R, + _epoch: HummockEpoch, + ) -> Self::ReverseIterFuture<'_, R, B> where R: RangeBounds + Send, B: AsRef<[u8]> + Send, @@ -72,11 +78,19 @@ impl StateStore for TikvStateStore { async move { unimplemented!() } } - fn wait_epoch(&self, _epoch: u64) -> Self::WaitEpochFuture<'_> { + fn wait_epoch( + &self, + _epoch: HummockEpoch, + _table_id: StorageTableId, + ) -> Self::WaitEpochFuture<'_> { async move { unimplemented!() } } - fn sync(&self, _epoch: Option) -> Self::SyncFuture<'_> { + fn sync( + &self, + _epoch: Option, + _table_id: Option>, + ) -> Self::SyncFuture<'_> { async move { unimplemented!() } } @@ -84,7 +98,7 @@ impl StateStore for TikvStateStore { &self, _key_range: R, _limit: Option, - _epoch: u64, + _epoch: HummockEpoch, ) -> Self::ScanFuture<'_, R, B> where R: RangeBounds + Send, @@ -97,7 +111,7 @@ impl StateStore for TikvStateStore { &self, _key_range: R, _limit: Option, - _epoch: u64, + _epoch: HummockEpoch, ) -> Self::ReverseScanFuture<'_, R, B> where R: RangeBounds + Send, diff --git a/src/storage/src/write_batch.rs b/src/storage/src/write_batch.rs index 44c336cb57eb7..582c3e3d189bd 100644 --- a/src/storage/src/write_batch.rs +++ b/src/storage/src/write_batch.rs @@ -17,6 +17,7 @@ use bytes::Bytes; use crate::error::StorageResult; use crate::hummock::HummockError; use crate::storage_value::StorageValue; +use crate::store::StorageTableId; use crate::{Keyspace, StateStore}; /// [`WriteBatch`] wraps a list of key-value pairs and an associated [`StateStore`]. @@ -24,6 +25,8 @@ pub struct WriteBatch { store: S, batch: Vec<(Bytes, StorageValue)>, + + table_id: StorageTableId, } impl WriteBatch @@ -31,18 +34,20 @@ where S: StateStore, { /// Constructs a new, empty [`WriteBatch`] with the given `store`. - pub fn new(store: S) -> Self { + pub fn new(store: S, table_id: StorageTableId) -> Self { Self { store, batch: Vec::new(), + table_id, } } /// Constructs a new, empty [`WriteBatch`] with the given `store` and specified capacity. - pub fn with_capacity(store: S, capacity: usize) -> Self { + pub fn with_capacity(store: S, capacity: usize, table_id: StorageTableId) -> Self { Self { store, batch: Vec::with_capacity(capacity), + table_id, } } @@ -82,7 +87,9 @@ where /// Ingests this batch into the associated state store. 
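With `table_id` stored in `WriteBatch`, the id chosen at construction time (usually by the `Keyspace` that created the batch) travels with the batch into `ingest_batch`, so call sites no longer thread it through manually. A reduced model of that plumbing, with toy stand-ins for the store and value types:

```rust
type StorageTableId = u64;
type Epoch = u64;
type Kv = (Vec<u8>, Vec<u8>);

// Stand-in for the state store: just records where a batch went.
#[derive(Default)]
struct ToyStore;

impl ToyStore {
    fn ingest_batch(&self, batch: Vec<Kv>, epoch: Epoch, table_id: StorageTableId) {
        println!(
            "ingesting {} pairs at epoch {} for table {}",
            batch.len(),
            epoch,
            table_id
        );
    }
}

struct WriteBatch<'a> {
    store: &'a ToyStore,
    table_id: StorageTableId, // Fixed when the batch is created.
    batch: Vec<Kv>,
}

impl<'a> WriteBatch<'a> {
    fn new(store: &'a ToyStore, table_id: StorageTableId) -> Self {
        Self {
            store,
            table_id,
            batch: Vec::new(),
        }
    }

    fn put(&mut self, key: Vec<u8>, value: Vec<u8>) {
        self.batch.push((key, value));
    }

    /// Consumes the batch; every pair is attributed to `self.table_id`.
    fn ingest(self, epoch: Epoch) {
        self.store.ingest_batch(self.batch, epoch, self.table_id);
    }
}

fn main() {
    let store = ToyStore::default();
    let mut wb = WriteBatch::new(&store, 42);
    wb.put(b"key".to_vec(), b"value".to_vec());
    wb.ingest(1);
}
```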
diff --git a/src/stream/src/executor/hash_join.rs b/src/stream/src/executor/hash_join.rs
index bd1ee5f8d477a..134057bf0ff74 100644
--- a/src/stream/src/executor/hash_join.rs
+++ b/src/stream/src/executor/hash_join.rs
@@ -393,7 +393,7 @@ impl<K: HashKey, S: StateStore> HashJoinExecutor<K, S> {
     async fn flush_data(&mut self) -> Result<()> {
         let epoch = self.executor_state().epoch();
         for side in [&mut self.side_l, &mut self.side_r] {
-            let mut write_batch = side.keyspace.state_store().start_write_batch();
+            let mut write_batch = side.keyspace.start_write_batch();
             for state in side.ht.values_mut() {
                 state.flush(&mut write_batch)?;
             }
diff --git a/src/stream/src/executor/managed_state/aggregation/extreme.rs b/src/stream/src/executor/managed_state/aggregation/extreme.rs
index e203d5c358105..dd3f4b1f94819 100644
--- a/src/stream/src/executor/managed_state/aggregation/extreme.rs
+++ b/src/stream/src/executor/managed_state/aggregation/extreme.rs
@@ -470,6 +470,7 @@ mod tests {
     use risingwave_common::array::{I64Array, Op};
     use risingwave_common::types::ScalarImpl;
     use risingwave_storage::memory::MemoryStateStore;
+    use risingwave_storage::store::GLOBAL_STORAGE_TABLE_ID;
    use smallvec::smallvec;
 
     use super::*;
@@ -505,7 +506,7 @@
         assert!(managed_state.is_dirty());
 
         // flush to write batch and write to state store
-        let mut write_batch = store.start_write_batch();
+        let mut write_batch = store.start_write_batch(GLOBAL_STORAGE_TABLE_ID);
         managed_state.flush(&mut write_batch).unwrap();
         write_batch.ingest(epoch).await.unwrap();
@@ -533,7 +534,7 @@
         // flush to write batch and write to state store
         epoch += 1;
-        let mut write_batch = store.start_write_batch();
+        let mut write_batch = store.start_write_batch(GLOBAL_STORAGE_TABLE_ID);
         managed_state.flush(&mut write_batch).unwrap();
         write_batch.ingest(epoch).await.unwrap();
@@ -560,7 +561,7 @@
         // flush to write batch and write to state store
         epoch += 1;
-        let mut write_batch = store.start_write_batch();
+        let mut write_batch = store.start_write_batch(GLOBAL_STORAGE_TABLE_ID);
         managed_state.flush(&mut write_batch).unwrap();
         write_batch.ingest(epoch).await.unwrap();
@@ -583,7 +584,7 @@
         // flush to write batch and write to state store
         epoch += 1;
-        let mut write_batch = store.start_write_batch();
+        let mut write_batch = store.start_write_batch(GLOBAL_STORAGE_TABLE_ID);
         managed_state.flush(&mut write_batch).unwrap();
         write_batch.ingest(epoch).await.unwrap();
@@ -606,7 +607,7 @@
         // flush to write batch and write to state store
         epoch += 1;
-        let mut write_batch = store.start_write_batch();
+        let mut write_batch = store.start_write_batch(GLOBAL_STORAGE_TABLE_ID);
         managed_state.flush(&mut write_batch).unwrap();
         write_batch.ingest(epoch).await.unwrap();
@@ -719,7 +720,7 @@
             .unwrap();
 
         // flush
-        let mut write_batch = store.start_write_batch();
+        let mut write_batch = store.start_write_batch(GLOBAL_STORAGE_TABLE_ID);
         managed_state.flush(&mut write_batch).unwrap();
         write_batch.ingest(epoch).await.unwrap();
@@ -734,7 +735,7 @@
         // flush
         epoch += 1;
-        let mut write_batch = store.start_write_batch();
+        let mut write_batch = store.start_write_batch(GLOBAL_STORAGE_TABLE_ID);
         managed_state.flush(&mut write_batch).unwrap();
         write_batch.ingest(epoch).await.unwrap();
@@ -820,7 +821,7 @@
             .unwrap();
 
         // flush
-        let mut write_batch = store.start_write_batch();
+        let mut write_batch = store.start_write_batch(GLOBAL_STORAGE_TABLE_ID);
         managed_state.flush(&mut write_batch).unwrap();
         write_batch.ingest(epoch).await.unwrap();
@@ -835,7 +836,7 @@
         // flush
         epoch += 1;
-        let mut write_batch = store.start_write_batch();
+        let mut write_batch = store.start_write_batch(GLOBAL_STORAGE_TABLE_ID);
         managed_state.flush(&mut write_batch).unwrap();
         write_batch.ingest(epoch).await.unwrap();
@@ -884,7 +885,7 @@
             // only ingest after insert in some cases
             // flush to write batch and write to state store
             epoch += 1;
-            let mut write_batch = store.start_write_batch();
+            let mut write_batch = store.start_write_batch(GLOBAL_STORAGE_TABLE_ID);
             managed_state.flush(&mut write_batch).unwrap();
             write_batch.ingest(epoch).await.unwrap();
         }
@@ -896,7 +897,7 @@
         // flush to write batch and write to state store
         epoch += 1;
-        let mut write_batch = store.start_write_batch();
+        let mut write_batch = store.start_write_batch(GLOBAL_STORAGE_TABLE_ID);
         managed_state.flush(&mut write_batch).unwrap();
         write_batch.ingest(epoch).await.unwrap();
@@ -986,7 +987,7 @@
             .unwrap();
 
         // flush to write batch and write to state store
-        let mut write_batch = store.start_write_batch();
+        let mut write_batch = store.start_write_batch(GLOBAL_STORAGE_TABLE_ID);
         managed_state.flush(&mut write_batch).unwrap();
         write_batch.ingest(epoch).await.unwrap();
@@ -1009,7 +1010,7 @@
         keyspace: &Keyspace<S>,
         epoch: u64,
     ) {
-        let mut write_batch = keyspace.state_store().start_write_batch();
+        let mut write_batch = keyspace.start_write_batch();
         managed_state.flush(&mut write_batch).unwrap();
         write_batch.ingest(epoch).await.unwrap();
     }
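Each test hunk above (and the string_agg, value, and join-state hunks below) repeats the same three-step cycle. A hedged sketch of that cycle factored into a helper, assuming `start_write_batch` is a provided `StateStore` method returning `WriteBatch<S>` and collapsing the error type to `StorageResult` for brevity:

```rust
use risingwave_storage::error::StorageResult;
use risingwave_storage::store::GLOBAL_STORAGE_TABLE_ID;
use risingwave_storage::write_batch::WriteBatch;
use risingwave_storage::StateStore;

/// One flush cycle: open a batch bound to a table, let the managed state
/// drain its dirty entries into it, then ingest everything at `epoch`.
async fn flush_cycle<S, F>(store: &S, flush: F, epoch: u64) -> StorageResult<()>
where
    S: StateStore,
    F: FnOnce(&mut WriteBatch<S>) -> StorageResult<()>,
{
    let mut write_batch = store.start_write_batch(GLOBAL_STORAGE_TABLE_ID);
    flush(&mut write_batch)?;
    write_batch.ingest(epoch).await
}
```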
diff --git a/src/stream/src/executor/managed_state/aggregation/string_agg.rs b/src/stream/src/executor/managed_state/aggregation/string_agg.rs
index 4715281f233ab..40f79cb6a7117 100644
--- a/src/stream/src/executor/managed_state/aggregation/string_agg.rs
+++ b/src/stream/src/executor/managed_state/aggregation/string_agg.rs
@@ -310,7 +310,6 @@
     #[tokio::test]
     async fn test_managed_string_agg_state() {
         let keyspace = create_in_memory_keyspace();
-        let store = keyspace.state_store();
         let mut managed_state = create_managed_state(&keyspace, 0).await;
         assert!(!managed_state.is_dirty());
         let mut epoch: u64 = 0;
@@ -340,7 +339,7 @@
             Some(ScalarImpl::Utf8("ghi||def||abc".to_string()))
         );
 
-        let mut write_batch = store.start_write_batch();
+        let mut write_batch = keyspace.start_write_batch();
         managed_state.flush(&mut write_batch).unwrap();
         write_batch.ingest(epoch).await.unwrap();
         assert!(!managed_state.is_dirty());
@@ -371,7 +370,7 @@
         );
 
         epoch += 1;
-        let mut write_batch = store.start_write_batch();
+        let mut write_batch = keyspace.start_write_batch();
         managed_state.flush(&mut write_batch).unwrap();
         write_batch.ingest(epoch).await.unwrap();
         assert!(!managed_state.is_dirty());
@@ -403,7 +402,7 @@
         );
 
         epoch += 1;
-        let mut write_batch = store.start_write_batch();
+        let mut write_batch = keyspace.start_write_batch();
         managed_state.flush(&mut write_batch).unwrap();
         write_batch.ingest(epoch).await.unwrap();
         assert!(!managed_state.is_dirty());
@@ -488,7 +487,7 @@
             .unwrap();
 
         epoch += 1;
-        let mut write_batch = store.start_write_batch();
+        let mut write_batch = keyspace.start_write_batch();
         managed_state.flush(&mut write_batch).unwrap();
         write_batch.ingest(epoch).await.unwrap();
         assert!(!managed_state.is_dirty());
@@ -517,7 +516,7 @@
         );
 
         epoch += 1;
-        let mut write_batch = store.start_write_batch();
+        let mut write_batch = keyspace.start_write_batch();
         managed_state.flush(&mut write_batch).unwrap();
         write_batch.ingest(epoch).await.unwrap();
         assert!(!managed_state.is_dirty());
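The tests above drop the explicit `store` handle entirely: `keyspace.start_write_batch()` takes no table id, so presumably the keyspace supplies one itself. A toy model of that delegation, with illustrative names rather than the crate's real types:

```rust
// Stand-ins for the real Keyspace/WriteBatch in risingwave_storage.
type TableId = u64;

struct WriteBatch {
    table_id: TableId,
    entries: Vec<(Vec<u8>, Vec<u8>)>,
}

struct Keyspace {
    table_id: TableId,
}

impl Keyspace {
    // Call sites no longer fetch the store and thread a table id through:
    // the keyspace knows which table it belongs to and stamps the batch.
    fn start_write_batch(&self) -> WriteBatch {
        WriteBatch {
            table_id: self.table_id,
            entries: Vec::new(),
        }
    }
}
```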
diff --git a/src/stream/src/executor/managed_state/aggregation/value.rs b/src/stream/src/executor/managed_state/aggregation/value.rs
index d9a38a246f82c..dbe5f799f7e12 100644
--- a/src/stream/src/executor/managed_state/aggregation/value.rs
+++ b/src/stream/src/executor/managed_state/aggregation/value.rs
@@ -157,7 +157,7 @@
         // flush to write batch and write to state store
         let epoch: u64 = 0;
-        let mut write_batch = keyspace.state_store().start_write_batch();
+        let mut write_batch = keyspace.start_write_batch();
         managed_state.flush(&mut write_batch).unwrap();
         write_batch.ingest(epoch).await.unwrap();
diff --git a/src/stream/src/executor/managed_state/join/join_entry_state.rs b/src/stream/src/executor/managed_state/join/join_entry_state.rs
index 00edb932b4261..94da9df17a552 100644
--- a/src/stream/src/executor/managed_state/join/join_entry_state.rs
+++ b/src/stream/src/executor/managed_state/join/join_entry_state.rs
@@ -216,6 +216,7 @@ mod tests {
     use risingwave_common::column_nonnull;
     use risingwave_common::types::ScalarImpl;
     use risingwave_storage::memory::MemoryStateStore;
+    use risingwave_storage::store::GLOBAL_STORAGE_TABLE_ID;
 
     use super::*;
@@ -261,7 +262,7 @@
         }
 
         // flush to write batch and write to state store
-        let mut write_batch = store.start_write_batch();
+        let mut write_batch = store.start_write_batch(GLOBAL_STORAGE_TABLE_ID);
         managed_state.flush(&mut write_batch).unwrap();
         write_batch.ingest(epoch).await.unwrap();
diff --git a/src/stream/src/executor/managed_state/top_n/top_n_bottom_n_state.rs b/src/stream/src/executor/managed_state/top_n/top_n_bottom_n_state.rs
index b56e250ae8194..df272e73bb59d 100644
--- a/src/stream/src/executor/managed_state/top_n/top_n_bottom_n_state.rs
+++ b/src/stream/src/executor/managed_state/top_n/top_n_bottom_n_state.rs
@@ -300,7 +300,7 @@ impl<S: StateStore> ManagedTopNBottomNState<S> {
             return Ok(());
         }
 
-        let mut write_batch = self.keyspace.state_store().start_write_batch();
+        let mut write_batch = self.keyspace.start_write_batch();
         let mut local = write_batch.prefixify(&self.keyspace);
 
         for (pk, cells) in std::mem::take(&mut self.flush_buffer) {
diff --git a/src/stream/src/executor/managed_state/top_n/top_n_state.rs b/src/stream/src/executor/managed_state/top_n/top_n_state.rs
index 685d9f5acaddb..ba0bb105c58d5 100644
--- a/src/stream/src/executor/managed_state/top_n/top_n_state.rs
+++ b/src/stream/src/executor/managed_state/top_n/top_n_state.rs
@@ -335,7 +335,7 @@ impl<S: StateStore> ManagedTopNState<S> {
         iterator: impl Iterator<Item = (Bytes, FlushStatus<Row>)>,
         epoch: u64,
     ) -> Result<()> {
-        let mut write_batch = self.keyspace.state_store().start_write_batch();
+        let mut write_batch = self.keyspace.start_write_batch();
         let mut local = write_batch.prefixify(&self.keyspace);
         for (pk, cells) in iterator {
             let row = cells.into_option();
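The TopN flush above empties its buffer with `std::mem::take` before staging writes. A self-contained illustration of the idiom; the map and value types are plain stand-ins for the crate's:

```rust
use std::collections::BTreeMap;

// pk -> row, with `None` marking a deletion, as in the flush buffers above.
struct FlushBuffer {
    buffer: BTreeMap<Vec<u8>, Option<Vec<u8>>>,
}

impl FlushBuffer {
    // `std::mem::take` swaps an empty map in and hands the old one back in
    // O(1), so the state is observably clean the moment the batch is built,
    // even while the actual ingest is still in flight.
    fn drain(&mut self) -> Vec<(Vec<u8>, Option<Vec<u8>>)> {
        std::mem::take(&mut self.buffer).into_iter().collect()
    }
}
```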
diff --git a/src/stream/src/executor_v2/global_simple_agg.rs b/src/stream/src/executor_v2/global_simple_agg.rs
index e6605885e9883..6f8f7d6d77122 100644
--- a/src/stream/src/executor_v2/global_simple_agg.rs
+++ b/src/stream/src/executor_v2/global_simple_agg.rs
@@ -189,7 +189,7 @@ impl<S: StateStore> AggExecutor for AggSimpleAggExecutor<S> {
             _ => return Ok(None), // Nothing to flush.
         };
 
-        let mut write_batch = self.keyspace.state_store().start_write_batch();
+        let mut write_batch = self.keyspace.start_write_batch();
         for state in &mut states.managed_states {
             state
                 .flush(&mut write_batch)
diff --git a/src/stream/src/executor_v2/hash_agg.rs b/src/stream/src/executor_v2/hash_agg.rs
index 72c5cc6135f8f..3f29c38c03eb8 100644
--- a/src/stream/src/executor_v2/hash_agg.rs
+++ b/src/stream/src/executor_v2/hash_agg.rs
@@ -283,7 +283,7 @@ impl<K: HashKey, S: StateStore> AggExecutor for AggHashAggExecutor<K, S> {
         // Some state will have the correct output only after their internal states have been fully
         // flushed.
         let (write_batch, dirty_cnt) = {
-            let mut write_batch = self.keyspace.state_store().start_write_batch();
+            let mut write_batch = self.keyspace.start_write_batch();
             let mut dirty_cnt = 0;
 
             for states in self.state_map.values_mut() {
diff --git a/src/stream/src/executor_v2/mview/state.rs b/src/stream/src/executor_v2/mview/state.rs
index bc1d4a26f6f17..12e3a8e9904cd 100644
--- a/src/stream/src/executor_v2/mview/state.rs
+++ b/src/stream/src/executor_v2/mview/state.rs
@@ -77,7 +77,7 @@ impl<S: StateStore> ManagedMViewState<S> {
     }
 
     pub async fn flush(&mut self, epoch: u64) -> Result<()> {
-        let mut batch = self.keyspace.state_store().start_write_batch();
+        let mut batch = self.keyspace.start_write_batch();
         batch.reserve(self.cache.len() * self.column_ids.len());
         let mut local = batch.prefixify(&self.keyspace);
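The hash-agg hunk above collects the write batch and a dirty-group counter in a single pass so that a zero count can skip ingestion altogether. A self-contained sketch of that gate, with stand-in types rather than the executor's real ones:

```rust
// Stand-in for a per-group managed state that tracks unflushed updates.
struct GroupState {
    key: Vec<u8>,
    value: Vec<u8>,
    dirty: bool,
}

impl GroupState {
    fn flush_into(&mut self, batch: &mut Vec<(Vec<u8>, Vec<u8>)>) {
        batch.push((self.key.clone(), self.value.clone()));
        self.dirty = false;
    }
}

// Build the batch and count dirty groups together; if `dirty_cnt` is zero,
// the caller has nothing to ingest and returns early.
fn build_batch(states: &mut [GroupState]) -> (Vec<(Vec<u8>, Vec<u8>)>, usize) {
    let mut write_batch = Vec::new();
    let mut dirty_cnt = 0;
    for state in states.iter_mut().filter(|s| s.dirty) {
        dirty_cnt += 1;
        state.flush_into(&mut write_batch);
    }
    (write_batch, dirty_cnt)
}
```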
diff --git a/src/stream/src/task/stream_manager.rs b/src/stream/src/task/stream_manager.rs
index 4dd712c601203..fc1d7ad8e5d3d 100644
--- a/src/stream/src/task/stream_manager.rs
+++ b/src/stream/src/task/stream_manager.rs
@@ -30,6 +30,7 @@ use risingwave_expr::expr::AggKind;
 use risingwave_pb::common::ActorInfo;
 use risingwave_pb::stream_plan::stream_node::Node;
 use risingwave_pb::{expr, stream_plan, stream_service};
+use risingwave_storage::store::GLOBAL_STORAGE_TABLE_ID;
 use risingwave_storage::{dispatch_state_store, StateStore, StateStoreImpl};
 use tokio::sync::oneshot;
 use tokio::task::JoinHandle;
@@ -178,7 +179,14 @@
         // Sync states from shared buffer to S3 before telling meta service we've done.
         dispatch_state_store!(self.state_store(), store, {
-            match store.sync(Some(barrier.epoch.prev)).await {
+            // TODO(partial checkpoint): use the table id obtained locally.
+            match store
+                .sync(
+                    Some(barrier.epoch.prev),
+                    Some(vec![GLOBAL_STORAGE_TABLE_ID]),
+                )
+                .await
+            {
                 Ok(_) => {}
                 // TODO: Handle sync failure by propagating it
                 // back to global barrier manager
diff --git a/src/workspace-hack/Cargo.toml b/src/workspace-hack/Cargo.toml
index 4ef5e2fb38d85..eb9891afec98e 100644
--- a/src/workspace-hack/Cargo.toml
+++ b/src/workspace-hack/Cargo.toml
@@ -17,8 +17,6 @@ anyhow = { version = "1", features = ["backtrace", "std"] }
 axum = { version = "0.5", features = ["form", "http1", "json", "matched-path", "original-uri", "query", "serde_json", "serde_urlencoded", "tower-log"] }
 bstr = { version = "0.2", features = ["lazy_static", "regex-automata", "serde", "serde1", "serde1-nostd", "std", "unicode"] }
 bytes = { version = "1", features = ["serde", "std"] }
-crossbeam-deque = { version = "0.8", features = ["crossbeam-epoch", "crossbeam-utils", "std"] }
-crossbeam-utils = { version = "0.8", features = ["lazy_static", "std"] }
 either = { version = "1", features = ["use_std"] }
 fail = { version = "0.5", default-features = false, features = ["failpoints"] }
 fixedbitset = { version = "0.4", features = ["std"] }
@@ -65,8 +63,6 @@ anyhow = { version = "1", features = ["backtrace", "std"] }
 axum = { version = "0.5", features = ["form", "http1", "json", "matched-path", "original-uri", "query", "serde_json", "serde_urlencoded", "tower-log"] }
 bstr = { version = "0.2", features = ["lazy_static", "regex-automata", "serde", "serde1", "serde1-nostd", "std", "unicode"] }
 bytes = { version = "1", features = ["serde", "std"] }
-crossbeam-deque = { version = "0.8", features = ["crossbeam-epoch", "crossbeam-utils", "std"] }
-crossbeam-utils = { version = "0.8", features = ["lazy_static", "std"] }
 either = { version = "1", features = ["use_std"] }
 fail = { version = "0.5", default-features = false, features = ["failpoints"] }
 fixedbitset = { version = "0.4", features = ["std"] }
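Putting the trait change and this call site together, the barrier-commit path now reads roughly as below. This is a sketch only: the hard-coded global table id stands in until the partial-checkpoint TODO above is resolved, and error handling is reduced to a panic.

```rust
use risingwave_storage::store::GLOBAL_STORAGE_TABLE_ID;
use risingwave_storage::StateStore;

async fn sync_on_barrier<S: StateStore>(store: &S, prev_epoch: u64) {
    // Sync the previous epoch for the (sole, for now) global table before
    // reporting barrier completion to the meta service.
    if let Err(e) = store
        .sync(Some(prev_epoch), Some(vec![GLOBAL_STORAGE_TABLE_ID]))
        .await
    {
        panic!("failed to sync state store: {:?}", e);
    }
}
```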