diff --git a/src/storage/hummock_test/src/bin/replay/replay_impl.rs b/src/storage/hummock_test/src/bin/replay/replay_impl.rs
index 6653db94e0543..8b264b6fb186c 100644
--- a/src/storage/hummock_test/src/bin/replay/replay_impl.rs
+++ b/src/storage/hummock_test/src/bin/replay/replay_impl.rs
@@ -138,10 +138,17 @@ impl ReplayRead for GlobalReplayImpl {
 
 #[async_trait::async_trait]
 impl ReplayStateStore for GlobalReplayImpl {
-    async fn sync(&self, id: u64, table_ids: Vec<u32>) -> Result<usize> {
+    async fn sync(&self, sync_table_epochs: Vec<(u64, Vec<u32>)>) -> Result<usize> {
         let result: SyncResult = self
             .store
-            .sync(id, table_ids.into_iter().map(TableId::new).collect())
+            .sync(
+                sync_table_epochs
+                    .into_iter()
+                    .map(|(epoch, table_ids)| {
+                        (epoch, table_ids.into_iter().map(TableId::new).collect())
+                    })
+                    .collect(),
+            )
             .await
             .map_err(|e| TraceError::SyncFailed(format!("{e}")))?;
         Ok(result.sync_size)
diff --git a/src/storage/hummock_trace/src/collector.rs b/src/storage/hummock_trace/src/collector.rs
index 068cbdcee45ed..a3962c1bf8834 100644
--- a/src/storage/hummock_trace/src/collector.rs
+++ b/src/storage/hummock_trace/src/collector.rs
@@ -25,7 +25,7 @@ use bincode::{Decode, Encode};
 use bytes::Bytes;
 use parking_lot::Mutex;
 use risingwave_common::catalog::TableId;
-use risingwave_hummock_sdk::HummockReadEpoch;
+use risingwave_hummock_sdk::{HummockEpoch, HummockReadEpoch};
 use risingwave_pb::meta::SubscribeResponse;
 use tokio::runtime::Runtime;
 use tokio::sync::mpsc::{
@@ -281,14 +281,20 @@ impl TraceSpan {
     }
 
     pub fn new_sync_span(
-        epoch: u64,
-        table_ids: &HashSet<TableId>,
+        sync_table_epochs: &Vec<(HummockEpoch, HashSet<TableId>)>,
         storage_type: StorageType,
     ) -> MayTraceSpan {
         Self::new_global_op(
             Operation::Sync(
-                epoch,
-                table_ids.iter().map(|table_id| table_id.table_id).collect(),
+                sync_table_epochs
+                    .iter()
+                    .map(|(epoch, table_ids)| {
+                        (
+                            *epoch,
+                            table_ids.iter().map(|table_id| table_id.table_id).collect(),
+                        )
+                    })
+                    .collect(),
             ),
             storage_type,
         )
diff --git a/src/storage/hummock_trace/src/record.rs b/src/storage/hummock_trace/src/record.rs
index a9ae562f02b4c..e740ce3158baa 100644
--- a/src/storage/hummock_trace/src/record.rs
+++ b/src/storage/hummock_trace/src/record.rs
@@ -146,7 +146,7 @@ pub enum Operation {
     IterNext(RecordId),
 
     /// Sync operation of Hummock.
-    Sync(u64, Vec<u32>),
+    Sync(Vec<(u64, Vec<u32>)>),
 
     /// `MetaMessage` operation of Hummock.
     MetaMessage(Box<TracedSubResp>),
diff --git a/src/storage/hummock_trace/src/replay/mod.rs b/src/storage/hummock_trace/src/replay/mod.rs
index 347ef30704570..d5262a69f7629 100644
--- a/src/storage/hummock_trace/src/replay/mod.rs
+++ b/src/storage/hummock_trace/src/replay/mod.rs
@@ -115,7 +115,7 @@ pub trait ReplayWrite {
 #[cfg_attr(test, automock)]
 #[async_trait::async_trait]
 pub trait ReplayStateStore {
-    async fn sync(&self, id: u64, table_ids: Vec<u32>) -> Result<usize>;
+    async fn sync(&self, sync_table_epochs: Vec<(u64, Vec<u32>)>) -> Result<usize>;
     async fn notify_hummock(&self, info: Info, op: RespOperation, version: u64) -> Result<u64>;
     async fn new_local(&self, opts: TracedNewLocalOptions) -> Box<dyn LocalReplay>;
     async fn try_wait_epoch(
@@ -147,7 +147,7 @@ mock!
{ } #[async_trait::async_trait] impl ReplayStateStore for GlobalReplayInterface{ - async fn sync(&self, id: u64, table_ids: Vec) -> Result; + async fn sync(&self, sync_table_epochs: Vec<(u64, Vec)>) -> Result; async fn notify_hummock(&self, info: Info, op: RespOperation, version: u64, ) -> Result; async fn new_local(&self, opts: TracedNewLocalOptions) -> Box; diff --git a/src/storage/hummock_trace/src/replay/runner.rs b/src/storage/hummock_trace/src/replay/runner.rs index 3794671ace2a5..911e1f5ddfd84 100644 --- a/src/storage/hummock_trace/src/replay/runner.rs +++ b/src/storage/hummock_trace/src/replay/runner.rs @@ -196,7 +196,7 @@ mod tests { let mut non_local: Vec> = vec![ (12, Operation::Finish), - (13, Operation::Sync(sync_id, vec![1, 2, 3])), + (13, Operation::Sync(vec![(sync_id, vec![1, 2, 3])])), ( 13, Operation::Result(OperationResult::Sync(TraceResult::Ok(0))), @@ -244,9 +244,9 @@ mod tests { mock_replay .expect_sync() - .with(predicate::eq(sync_id), predicate::eq(vec![1, 2, 3])) + .with(predicate::eq(vec![(sync_id, vec![1, 2, 3])])) .times(1) - .returning(|_, _| Ok(0)); + .returning(|_| Ok(0)); let mut replay = HummockReplay::new(mock_reader, mock_replay); diff --git a/src/storage/hummock_trace/src/replay/worker.rs b/src/storage/hummock_trace/src/replay/worker.rs index 08d877cadf3ab..65a9faf4d8122 100644 --- a/src/storage/hummock_trace/src/replay/worker.rs +++ b/src/storage/hummock_trace/src/replay/worker.rs @@ -257,9 +257,9 @@ impl ReplayWorker { panic!("expect iter result, but got {:?}", res); } } - Operation::Sync(epoch_id, table_ids) => { + Operation::Sync(sync_table_epochs) => { assert_eq!(storage_type, StorageType::Global); - let sync_result = replay.sync(epoch_id, table_ids).await.unwrap(); + let sync_result = replay.sync(sync_table_epochs).await.unwrap(); let res = res_rx.recv().await.expect("recv result failed"); if let OperationResult::Sync(expected) = res { assert_eq!(TraceResult::Ok(sync_result), expected, "sync failed"); diff --git a/src/storage/src/hummock/event_handler/hummock_event_handler.rs b/src/storage/src/hummock/event_handler/hummock_event_handler.rs index 0b8cadaf4c97e..a2da11bee7159 100644 --- a/src/storage/src/hummock/event_handler/hummock_event_handler.rs +++ b/src/storage/src/hummock/event_handler/hummock_event_handler.rs @@ -463,17 +463,12 @@ impl HummockEventHandler { fn handle_sync_epoch( &mut self, - new_sync_epoch: HummockEpoch, + sync_table_epochs: Vec<(HummockEpoch, HashSet)>, sync_result_sender: oneshot::Sender>, - table_ids: HashSet, ) { - debug!( - new_sync_epoch, - ?table_ids, - "awaiting for epoch to be synced", - ); + debug!(?sync_table_epochs, "awaiting for epoch to be synced",); self.uploader - .start_sync_epoch(new_sync_epoch, sync_result_sender, table_ids); + .start_sync_epoch(sync_result_sender, sync_table_epochs); } fn handle_clear(&mut self, notifier: oneshot::Sender<()>, table_ids: Option>) { @@ -641,11 +636,10 @@ impl HummockEventHandler { self.uploader.may_flush(); } HummockEvent::SyncEpoch { - new_sync_epoch, sync_result_sender, - table_ids, + sync_table_epochs, } => { - self.handle_sync_epoch(new_sync_epoch, sync_result_sender, table_ids); + self.handle_sync_epoch(sync_table_epochs, sync_result_sender); } HummockEvent::Clear(notifier, table_ids) => { self.handle_clear(notifier, table_ids); @@ -1013,16 +1007,14 @@ mod tests { let (tx1, mut rx1) = oneshot::channel(); send_event(HummockEvent::SyncEpoch { - new_sync_epoch: epoch1, sync_result_sender: tx1, - table_ids: HashSet::from_iter([TEST_TABLE_ID]), + sync_table_epochs: 
vec![(epoch1, HashSet::from_iter([TEST_TABLE_ID]))], }); assert!(poll_fn(|cx| Poll::Ready(rx1.poll_unpin(cx).is_pending())).await); let (tx2, mut rx2) = oneshot::channel(); send_event(HummockEvent::SyncEpoch { - new_sync_epoch: epoch2, sync_result_sender: tx2, - table_ids: HashSet::from_iter([TEST_TABLE_ID]), + sync_table_epochs: vec![(epoch2, HashSet::from_iter([TEST_TABLE_ID]))], }); assert!(poll_fn(|cx| Poll::Ready(rx2.poll_unpin(cx).is_pending())).await); @@ -1144,9 +1136,8 @@ mod tests { let sync_epoch = |table_id, new_sync_epoch| { let (tx, rx) = oneshot::channel(); send_event(HummockEvent::SyncEpoch { - new_sync_epoch, sync_result_sender: tx, - table_ids: HashSet::from_iter([table_id]), + sync_table_epochs: vec![(new_sync_epoch, HashSet::from_iter([table_id]))], }); rx }; @@ -1281,9 +1272,8 @@ mod tests { vec![imm1_2_2.batch_id()], )])); send_event(HummockEvent::SyncEpoch { - new_sync_epoch: epoch2, sync_result_sender: tx2, - table_ids: HashSet::from_iter([table_id1]), + sync_table_epochs: vec![(epoch2, HashSet::from_iter([table_id1]))], }); wait_task_start.await; assert!(poll_fn(|cx| Poll::Ready(sync_rx2.poll_unpin(cx).is_pending())).await); diff --git a/src/storage/src/hummock/event_handler/mod.rs b/src/storage/src/hummock/event_handler/mod.rs index 46b44c051fdff..a5253c7ac6566 100644 --- a/src/storage/src/hummock/event_handler/mod.rs +++ b/src/storage/src/hummock/event_handler/mod.rs @@ -59,9 +59,8 @@ pub enum HummockEvent { /// task on this epoch. Previous concurrent flush task join handle will be returned by the join /// handle sender. SyncEpoch { - new_sync_epoch: HummockEpoch, sync_result_sender: oneshot::Sender>, - table_ids: HashSet, + sync_table_epochs: Vec<(HummockEpoch, HashSet)>, }, /// Clear shared buffer and reset all states @@ -117,10 +116,9 @@ impl HummockEvent { HummockEvent::BufferMayFlush => "BufferMayFlush".to_string(), HummockEvent::SyncEpoch { - new_sync_epoch, sync_result_sender: _, - table_ids, - } => format!("AwaitSyncEpoch epoch {} {:?}", new_sync_epoch, table_ids), + sync_table_epochs, + } => format!("AwaitSyncEpoch epoch {:?}", sync_table_epochs), HummockEvent::Clear(_, table_ids) => { format!("Clear {:?}", table_ids) diff --git a/src/storage/src/hummock/event_handler/uploader/mod.rs b/src/storage/src/hummock/event_handler/uploader/mod.rs index 96b565c00ef49..3160c4526e00f 100644 --- a/src/storage/src/hummock/event_handler/uploader/mod.rs +++ b/src/storage/src/hummock/event_handler/uploader/mod.rs @@ -945,60 +945,72 @@ impl UnsyncData { impl UploaderData { fn sync( &mut self, - epoch: HummockEpoch, context: &UploaderContext, - table_ids: HashSet, sync_result_sender: oneshot::Sender>, + sync_table_epochs: Vec<(HummockEpoch, HashSet)>, ) { let mut all_table_watermarks = HashMap::new(); let mut uploading_tasks = HashSet::new(); let mut spilled_tasks = BTreeSet::new(); + let mut all_table_ids = HashSet::new(); let mut flush_payload = HashMap::new(); - if let Some(UnsyncEpochId(_, min_table_id)) = get_unsync_epoch_id(epoch, &table_ids) { - let min_table_id_data = self - .unsync_data - .table_data - .get_mut(&min_table_id) - .expect("should exist"); - let epochs = take_before_epoch(&mut min_table_id_data.unsync_epochs.clone(), epoch); - for epoch in epochs.keys() { - assert_eq!( - self.unsync_data - .unsync_epochs - .remove(&UnsyncEpochId(*epoch, min_table_id)) - .expect("should exist"), - table_ids + for (epoch, table_ids) in &sync_table_epochs { + let epoch = *epoch; + for table_id in table_ids { + assert!( + all_table_ids.insert(*table_id), + "duplicate 
sync table epoch: {:?} {:?}", + all_table_ids, + sync_table_epochs ); } - for table_id in &table_ids { - let table_data = self + if let Some(UnsyncEpochId(_, min_table_id)) = get_unsync_epoch_id(epoch, table_ids) { + let min_table_id_data = self .unsync_data .table_data - .get_mut(table_id) + .get_mut(&min_table_id) .expect("should exist"); - let (unflushed_payload, table_watermarks, task_ids, table_unsync_epochs) = - table_data.sync(epoch); - assert_eq!(table_unsync_epochs, epochs); - for (instance_id, payload) in unflushed_payload { - if !payload.is_empty() { - flush_payload.insert(instance_id, payload); - } - } - if let Some((direction, watermarks)) = table_watermarks { - Self::add_table_watermarks( - &mut all_table_watermarks, - *table_id, - direction, - watermarks, + let epochs = take_before_epoch(&mut min_table_id_data.unsync_epochs.clone(), epoch); + for epoch in epochs.keys() { + assert_eq!( + &self + .unsync_data + .unsync_epochs + .remove(&UnsyncEpochId(*epoch, min_table_id)) + .expect("should exist"), + table_ids ); } - for task_id in task_ids { - if self.unsync_data.spilled_data.contains_key(&task_id) { - spilled_tasks.insert(task_id); - } else { - uploading_tasks.insert(task_id); + for table_id in table_ids { + let table_data = self + .unsync_data + .table_data + .get_mut(table_id) + .expect("should exist"); + let (unflushed_payload, table_watermarks, task_ids, table_unsync_epochs) = + table_data.sync(epoch); + assert_eq!(table_unsync_epochs, epochs); + for (instance_id, payload) in unflushed_payload { + if !payload.is_empty() { + flush_payload.insert(instance_id, payload); + } + } + if let Some((direction, watermarks)) = table_watermarks { + Self::add_table_watermarks( + &mut all_table_watermarks, + *table_id, + direction, + watermarks, + ); + } + for task_id in task_ids { + if self.unsync_data.spilled_data.contains_key(&task_id) { + spilled_tasks.insert(task_id); + } else { + uploading_tasks.insert(task_id); + } } } } @@ -1015,7 +1027,7 @@ impl UploaderData { sync_id, flush_payload, uploading_tasks.iter().cloned(), - &table_ids, + &all_table_ids, ) { uploading_tasks.insert(extra_flush_task_id); } @@ -1031,10 +1043,10 @@ impl UploaderData { .remove(task_id) .expect("should exist"); assert!( - spill_table_ids.is_subset(&table_ids), + spill_table_ids.is_subset(&all_table_ids), "spilled tabled ids {:?} not a subset of sync table id {:?}", spill_table_ids, - table_ids + all_table_ids ); sst }) @@ -1043,8 +1055,7 @@ impl UploaderData { self.syncing_data.insert( sync_id, SyncingData { - sync_epoch: epoch, - table_ids, + sync_table_epochs, remaining_uploading_tasks: uploading_tasks, uploaded, table_watermarks: all_table_watermarks, @@ -1068,8 +1079,7 @@ impl UnsyncData { } struct SyncingData { - sync_epoch: HummockEpoch, - table_ids: HashSet, + sync_table_epochs: Vec<(HummockEpoch, HashSet)>, remaining_uploading_tasks: HashSet, // newer data at the front uploaded: VecDeque>, @@ -1136,8 +1146,15 @@ impl UploaderData { self.unsync_data .clear_tables(&table_ids, &mut self.task_manager); self.syncing_data.retain(|sync_id, syncing_data| { - if !syncing_data.table_ids.is_disjoint(&table_ids) { - assert!(syncing_data.table_ids.is_subset(&table_ids)); + if syncing_data + .sync_table_epochs + .iter() + .any(|(_, sync_table_ids)| !sync_table_ids.is_disjoint(&table_ids)) + { + assert!(syncing_data + .sync_table_epochs + .iter() + .all(|(_, sync_table_ids)| sync_table_ids.is_subset(&table_ids))); for task_id in &syncing_data.remaining_uploading_tasks { match self .task_manager @@ -1179,7 
+1196,7 @@ impl UploaderData { } struct ErrState { - failed_epoch: HummockEpoch, + failed_sync_table_epochs: Vec<(HummockEpoch, HashSet)>, reason: String, } @@ -1295,27 +1312,26 @@ impl HummockUploader { pub(super) fn start_sync_epoch( &mut self, - epoch: HummockEpoch, sync_result_sender: oneshot::Sender>, - table_ids: HashSet, + sync_table_epochs: Vec<(HummockEpoch, HashSet)>, ) { let data = match &mut self.state { UploaderState::Working(data) => data, UploaderState::Err(ErrState { - failed_epoch, + failed_sync_table_epochs, reason, }) => { let result = Err(HummockError::other(format!( - "previous epoch {} failed due to [{}]", - failed_epoch, reason + "previous sync epoch {:?} failed due to [{}]", + failed_sync_table_epochs, reason ))); send_sync_result(sync_result_sender, result); return; } }; - debug!(epoch, ?table_ids, "start sync epoch"); + debug!(?sync_table_epochs, "start sync epoch"); - data.sync(epoch, &self.context, table_ids, sync_result_sender); + data.sync(&self.context, sync_result_sender, sync_table_epochs); data.may_notify_sync_task(&self.context); @@ -1438,8 +1454,7 @@ impl UploaderData { { let (_, syncing_data) = self.syncing_data.pop_first().expect("non-empty"); let SyncingData { - sync_epoch, - table_ids, + sync_table_epochs, remaining_uploading_tasks: _, uploaded, table_watermarks, @@ -1450,11 +1465,13 @@ impl UploaderData { .uploader_syncing_epoch_count .set(self.syncing_data.len() as _); - for table_id in table_ids { - if let Some(table_data) = self.unsync_data.table_data.get_mut(&table_id) { - table_data.ack_synced(sync_epoch); - if table_data.is_empty() { - self.unsync_data.table_data.remove(&table_id); + for (sync_epoch, table_ids) in sync_table_epochs { + for table_id in table_ids { + if let Some(table_data) = self.unsync_data.table_data.get_mut(&table_id) { + table_data.ack_synced(sync_epoch); + if table_data.is_empty() { + self.unsync_data.table_data.remove(&table_id); + } } } } @@ -1560,11 +1577,11 @@ impl HummockUploader { Err((sync_id, e)) => { let syncing_data = data.syncing_data.remove(&sync_id).expect("should exist"); - let failed_epoch = syncing_data.sync_epoch; + let failed_epochs = syncing_data.sync_table_epochs.clone(); let data = must_match!(replace( &mut self.state, UploaderState::Err(ErrState { - failed_epoch, + failed_sync_table_epochs: syncing_data.sync_table_epochs, reason: e.as_report().to_string(), }), ), UploaderState::Working(data) => data); @@ -1578,8 +1595,8 @@ impl HummockUploader { data.abort(|| { HummockError::other(format!( - "previous epoch {} failed to sync", - failed_epoch + "previous epoch {:?} failed to sync", + failed_epochs )) }); Poll::Pending @@ -1604,16 +1621,30 @@ pub(crate) mod tests { use std::task::Poll; use futures::FutureExt; + use risingwave_common::catalog::TableId; use risingwave_common::util::epoch::EpochExt; use risingwave_hummock_sdk::HummockEpoch; use tokio::sync::oneshot; use super::test_utils::*; - use crate::hummock::event_handler::uploader::{get_payload_imm_ids, SyncedData, UploadingTask}; + use crate::hummock::event_handler::uploader::{ + get_payload_imm_ids, HummockUploader, SyncedData, UploadingTask, + }; use crate::hummock::event_handler::TEST_LOCAL_INSTANCE_ID; - use crate::hummock::HummockError; + use crate::hummock::{HummockError, HummockResult}; use crate::opts::StorageOpts; + impl HummockUploader { + pub(super) fn start_single_epoch_sync( + &mut self, + epoch: HummockEpoch, + sync_result_sender: oneshot::Sender>, + table_ids: HashSet, + ) { + self.start_sync_epoch(sync_result_sender, vec![(epoch, 
table_ids)]); + } + } + #[tokio::test] pub async fn test_uploading_task_future() { let uploader_context = test_uploader_context(dummy_success_upload_future); @@ -1696,11 +1727,11 @@ pub(crate) mod tests { uploader.local_seal_epoch_for_test(TEST_LOCAL_INSTANCE_ID, epoch1); let (sync_tx, sync_rx) = oneshot::channel(); - uploader.start_sync_epoch(epoch1, sync_tx, HashSet::from_iter([TEST_TABLE_ID])); + uploader.start_single_epoch_sync(epoch1, sync_tx, HashSet::from_iter([TEST_TABLE_ID])); assert_eq!(epoch1 as HummockEpoch, uploader.test_max_syncing_epoch()); assert_eq!(1, uploader.data().syncing_data.len()); let (_, syncing_data) = uploader.data().syncing_data.first_key_value().unwrap(); - assert_eq!(epoch1 as HummockEpoch, syncing_data.sync_epoch); + assert_eq!(epoch1 as HummockEpoch, syncing_data.sync_table_epochs[0].0); assert!(syncing_data.uploaded.is_empty()); assert!(!syncing_data.remaining_uploading_tasks.is_empty()); @@ -1763,7 +1794,7 @@ pub(crate) mod tests { uploader.start_epochs_for_test([epoch1]); uploader.init_instance(TEST_LOCAL_INSTANCE_ID, TEST_TABLE_ID, epoch1); uploader.local_seal_epoch_for_test(TEST_LOCAL_INSTANCE_ID, epoch1); - uploader.start_sync_epoch(epoch1, sync_tx, HashSet::from_iter([TEST_TABLE_ID])); + uploader.start_single_epoch_sync(epoch1, sync_tx, HashSet::from_iter([TEST_TABLE_ID])); assert_eq!(epoch1, uploader.test_max_syncing_epoch()); assert_uploader_pending(&mut uploader).await; @@ -1805,7 +1836,7 @@ pub(crate) mod tests { uploader.add_imm(TEST_LOCAL_INSTANCE_ID, imm); let (sync_tx, sync_rx) = oneshot::channel(); - uploader.start_sync_epoch(epoch1, sync_tx, HashSet::from_iter([TEST_TABLE_ID])); + uploader.start_single_epoch_sync(epoch1, sync_tx, HashSet::from_iter([TEST_TABLE_ID])); assert_eq!(epoch1, uploader.test_max_syncing_epoch()); assert_uploader_pending(&mut uploader).await; @@ -1887,7 +1918,7 @@ pub(crate) mod tests { assert_eq!(epoch3, uploader.test_max_syncing_epoch()); let (sync_tx, sync_rx) = oneshot::channel(); - uploader.start_sync_epoch(epoch6, sync_tx, HashSet::from_iter([TEST_TABLE_ID])); + uploader.start_single_epoch_sync(epoch6, sync_tx, HashSet::from_iter([TEST_TABLE_ID])); assert_eq!(epoch6, uploader.test_max_syncing_epoch()); uploader.update_pinned_version(version4); assert_eq!(epoch4, uploader.test_max_synced_epoch()); @@ -1982,7 +2013,7 @@ pub(crate) mod tests { new_task_notifier(get_payload_imm_ids(&epoch1_sync_payload)); uploader.local_seal_epoch_for_test(instance_id1, epoch1); let (sync_tx1, mut sync_rx1) = oneshot::channel(); - uploader.start_sync_epoch(epoch1, sync_tx1, HashSet::from_iter([TEST_TABLE_ID])); + uploader.start_single_epoch_sync(epoch1, sync_tx1, HashSet::from_iter([TEST_TABLE_ID])); await_start1_4.await; uploader.local_seal_epoch_for_test(instance_id1, epoch2); @@ -2066,7 +2097,7 @@ pub(crate) mod tests { // synced: epoch1: sst([imm1_4]), sst([imm1_3]), sst([imm1_2, imm1_1]) let (sync_tx2, sync_rx2) = oneshot::channel(); - uploader.start_sync_epoch(epoch2, sync_tx2, HashSet::from_iter([TEST_TABLE_ID])); + uploader.start_single_epoch_sync(epoch2, sync_tx2, HashSet::from_iter([TEST_TABLE_ID])); uploader.local_seal_epoch_for_test(instance_id2, epoch3); let sst = uploader.next_uploaded_sst().await; assert_eq!(&get_payload_imm_ids(&epoch3_spill_payload1), sst.imm_ids()); @@ -2095,7 +2126,7 @@ pub(crate) mod tests { let (await_start4_with_3_3, finish_tx4_with_3_3) = new_task_notifier(get_payload_imm_ids(&epoch4_sync_payload)); let (sync_tx4, mut sync_rx4) = oneshot::channel(); - uploader.start_sync_epoch(epoch4, 
sync_tx4, HashSet::from_iter([TEST_TABLE_ID])); + uploader.start_single_epoch_sync(epoch4, sync_tx4, HashSet::from_iter([TEST_TABLE_ID])); await_start4_with_3_3.await; // current uploader state: diff --git a/src/storage/src/hummock/event_handler/uploader/spiller.rs b/src/storage/src/hummock/event_handler/uploader/spiller.rs index 4e560c36eacf0..6c39e9d17eeab 100644 --- a/src/storage/src/hummock/event_handler/uploader/spiller.rs +++ b/src/storage/src/hummock/event_handler/uploader/spiller.rs @@ -336,17 +336,17 @@ mod tests { // epoch4 spill(imm1_1_4, imm1_2_4, size 2) spill(imm2_4_1, size 1), imm2_4_2 | let (sync_tx1_1, sync_rx1_1) = oneshot::channel(); - uploader.start_sync_epoch(epoch1, sync_tx1_1, HashSet::from_iter([table_id1])); + uploader.start_single_epoch_sync(epoch1, sync_tx1_1, HashSet::from_iter([table_id1])); let (sync_tx2_1, sync_rx2_1) = oneshot::channel(); - uploader.start_sync_epoch(epoch2, sync_tx2_1, HashSet::from_iter([table_id1])); + uploader.start_single_epoch_sync(epoch2, sync_tx2_1, HashSet::from_iter([table_id1])); let (sync_tx3_1, sync_rx3_1) = oneshot::channel(); - uploader.start_sync_epoch(epoch3, sync_tx3_1, HashSet::from_iter([table_id1])); + uploader.start_single_epoch_sync(epoch3, sync_tx3_1, HashSet::from_iter([table_id1])); let (sync_tx1_2, sync_rx1_2) = oneshot::channel(); - uploader.start_sync_epoch(epoch1, sync_tx1_2, HashSet::from_iter([table_id2])); + uploader.start_single_epoch_sync(epoch1, sync_tx1_2, HashSet::from_iter([table_id2])); let (sync_tx2_2, sync_rx2_2) = oneshot::channel(); - uploader.start_sync_epoch(epoch2, sync_tx2_2, HashSet::from_iter([table_id2])); + uploader.start_single_epoch_sync(epoch2, sync_tx2_2, HashSet::from_iter([table_id2])); let (sync_tx3_2, sync_rx3_2) = oneshot::channel(); - uploader.start_sync_epoch(epoch3, sync_tx3_2, HashSet::from_iter([table_id2])); + uploader.start_single_epoch_sync(epoch3, sync_tx3_2, HashSet::from_iter([table_id2])); let (await_start2_4_2, finish_tx2_4_2) = new_task_notifier(HashMap::from_iter([( instance_id2, @@ -412,7 +412,11 @@ mod tests { // trigger the sync after the spill task is finished and acked to cover the case let (sync_tx4, mut sync_rx4) = oneshot::channel(); - uploader.start_sync_epoch(epoch4, sync_tx4, HashSet::from_iter([table_id1, table_id2])); + uploader.start_single_epoch_sync( + epoch4, + sync_tx4, + HashSet::from_iter([table_id1, table_id2]), + ); await_start2_4_2.await; let sst = uploader.next_uploaded_sst().await; diff --git a/src/storage/src/hummock/store/hummock_storage.rs b/src/storage/src/hummock/store/hummock_storage.rs index efc4989b8e6ec..f2691c73ddd1f 100644 --- a/src/storage/src/hummock/store/hummock_storage.rs +++ b/src/storage/src/hummock/store/hummock_storage.rs @@ -19,7 +19,6 @@ use std::sync::Arc; use arc_swap::ArcSwap; use bytes::Bytes; -use futures::FutureExt; use itertools::Itertools; use risingwave_common::catalog::TableId; use risingwave_common::util::epoch::is_max_epoch; @@ -30,7 +29,7 @@ use risingwave_hummock_sdk::key::{ use risingwave_hummock_sdk::sstable_info::SstableInfo; use risingwave_hummock_sdk::table_watermark::TableWatermarksIndex; use risingwave_hummock_sdk::version::HummockVersion; -use risingwave_hummock_sdk::{HummockReadEpoch, HummockSstableObjectId}; +use risingwave_hummock_sdk::{HummockReadEpoch, HummockSstableObjectId, SyncResult}; use risingwave_rpc_client::HummockMetaClient; use thiserror_ext::AsReport; use tokio::sync::mpsc::{unbounded_channel, UnboundedSender}; @@ -572,6 +571,21 @@ impl HummockStorage { .expect("should send success"); 
rx.await.expect("should await success") } + + pub async fn sync( + &self, + sync_table_epochs: Vec<(HummockEpoch, HashSet)>, + ) -> StorageResult { + let (tx, rx) = oneshot::channel(); + let _ = self.hummock_event_sender.send(HummockEvent::SyncEpoch { + sync_result_sender: tx, + sync_table_epochs, + }); + let synced_data = rx + .await + .map_err(|_| HummockError::other("failed to receive sync result"))??; + Ok(synced_data.into_sync_result()) + } } impl StateStoreRead for HummockStorage { @@ -704,20 +718,6 @@ impl StateStore for HummockStorage { Ok(()) } - fn sync(&self, epoch: u64, table_ids: HashSet) -> impl SyncFuture { - let (tx, rx) = oneshot::channel(); - let _ = self.hummock_event_sender.send(HummockEvent::SyncEpoch { - new_sync_epoch: epoch, - sync_result_sender: tx, - table_ids, - }); - rx.map(|recv_result| { - Ok(recv_result - .map_err(|_| HummockError::other("failed to receive sync result"))?? - .into_sync_result()) - }) - } - fn new_local(&self, option: NewLocalOptions) -> impl Future + Send + '_ { self.new_local_inner(option) } @@ -730,7 +730,7 @@ impl HummockStorage { epoch: u64, table_ids: HashSet, ) -> StorageResult { - self.sync(epoch, table_ids).await + self.sync(vec![(epoch, table_ids)]).await } /// Used in the compaction test tool diff --git a/src/storage/src/memory.rs b/src/storage/src/memory.rs index 6f6de5a47dd0f..d78e956123ce3 100644 --- a/src/storage/src/memory.rs +++ b/src/storage/src/memory.rs @@ -13,7 +13,7 @@ // limitations under the License. use std::cmp::Ordering; -use std::collections::{BTreeMap, BTreeSet, HashSet, VecDeque}; +use std::collections::{BTreeMap, BTreeSet, VecDeque}; use std::ops::Bound::{Excluded, Included, Unbounded}; use std::ops::{Bound, RangeBounds}; use std::sync::{Arc, LazyLock}; @@ -22,7 +22,7 @@ use bytes::Bytes; use parking_lot::RwLock; use risingwave_common::catalog::TableId; use risingwave_hummock_sdk::key::{FullKey, TableKey, TableKeyRange, UserKey}; -use risingwave_hummock_sdk::{HummockEpoch, HummockReadEpoch, SyncResult}; +use risingwave_hummock_sdk::{HummockEpoch, HummockReadEpoch}; use crate::error::StorageResult; use crate::mem_table::MemtableLocalStateStore; @@ -747,16 +747,6 @@ impl StateStore for RangeKvStateStore { Ok(()) } - #[allow(clippy::unused_async)] - fn sync(&self, _epoch: u64, _table_ids: HashSet) -> impl SyncFuture { - let result = self.inner.flush(); - // memory backend doesn't need to push to S3, so this is a no-op - async move { - result?; - Ok(SyncResult::default()) - } - } - async fn new_local(&self, option: NewLocalOptions) -> Self::Local { MemtableLocalStateStore::new(self.clone(), option) } diff --git a/src/storage/src/monitor/monitored_store.rs b/src/storage/src/monitor/monitored_store.rs index e5e3ceaaca016..fc62c010ddc46 100644 --- a/src/storage/src/monitor/monitored_store.rs +++ b/src/storage/src/monitor/monitored_store.rs @@ -23,7 +23,7 @@ use risingwave_common::bitmap::Bitmap; use risingwave_common::catalog::TableId; use risingwave_common::hash::VirtualNode; use risingwave_hummock_sdk::key::{TableKey, TableKeyRange}; -use risingwave_hummock_sdk::HummockReadEpoch; +use risingwave_hummock_sdk::{HummockEpoch, HummockReadEpoch, SyncResult}; use thiserror_ext::AsReport; use tokio::time::Instant; use tracing::{error, Instrument}; @@ -37,6 +37,7 @@ use crate::hummock::{HummockStorage, SstableObjectIdManagerRef}; use crate::monitor::monitored_storage_metrics::StateStoreIterStats; use crate::monitor::{StateStoreIterLogStats, StateStoreIterStatsTrait}; use crate::store::*; + /// A state store wrapper for 
monitoring metrics. #[derive(Clone)] pub struct MonitoredStateStore { @@ -334,25 +335,6 @@ impl StateStore for MonitoredStateStore { .inspect_err(|e| error!(error = %e.as_report(), "Failed in wait_epoch")) } - fn sync(&self, epoch: u64, table_ids: HashSet) -> impl SyncFuture { - let future = self - .inner - .sync(epoch, table_ids) - .instrument_await("store_sync"); - let timer = self.storage_metrics.sync_duration.start_timer(); - let sync_size = self.storage_metrics.sync_size.clone(); - async move { - let sync_result = future - .await - .inspect_err(|e| error!(error = %e.as_report(), "Failed in sync"))?; - timer.observe_duration(); - if sync_result.sync_size != 0 { - sync_size.observe(sync_result.sync_size as _); - } - Ok(sync_result) - } - } - fn monitored( self, _storage_metrics: Arc, @@ -379,6 +361,26 @@ impl MonitoredStateStore { pub fn sstable_object_id_manager(&self) -> SstableObjectIdManagerRef { self.inner.sstable_object_id_manager().clone() } + + pub async fn sync( + &self, + sync_table_epochs: Vec<(HummockEpoch, HashSet)>, + ) -> StorageResult { + let future = self + .inner + .sync(sync_table_epochs) + .instrument_await("store_sync"); + let timer = self.storage_metrics.sync_duration.start_timer(); + let sync_size = self.storage_metrics.sync_size.clone(); + let sync_result = future + .await + .inspect_err(|e| error!(error = %e.as_report(), "Failed in sync"))?; + timer.observe_duration(); + if sync_result.sync_size != 0 { + sync_size.observe(sync_result.sync_size as _); + } + Ok(sync_result) + } } /// A state store iterator wrapper for monitoring metrics. diff --git a/src/storage/src/monitor/traced_store.rs b/src/storage/src/monitor/traced_store.rs index f06c5634a5220..bd308081da11a 100644 --- a/src/storage/src/monitor/traced_store.rs +++ b/src/storage/src/monitor/traced_store.rs @@ -21,7 +21,7 @@ use risingwave_common::bitmap::Bitmap; use risingwave_common::catalog::TableId; use risingwave_common::hash::VirtualNode; use risingwave_hummock_sdk::key::{TableKey, TableKeyRange}; -use risingwave_hummock_sdk::HummockReadEpoch; +use risingwave_hummock_sdk::{HummockEpoch, HummockReadEpoch, SyncResult}; use risingwave_hummock_trace::{ init_collector, should_use_trace, ConcurrentId, MayTraceSpan, OperationResult, StorageType, TraceResult, TraceSpan, TracedBytes, TracedSealCurrentEpochOptions, LOCAL_ID, @@ -279,19 +279,6 @@ impl StateStore for TracedStateStore { res } - fn sync(&self, epoch: u64, table_ids: HashSet) -> impl SyncFuture { - let span: MayTraceSpan = TraceSpan::new_sync_span(epoch, &table_ids, self.storage_type); - - let future = self.inner.sync(epoch, table_ids); - - future.map(move |sync_result| { - span.may_send_result(OperationResult::Sync( - sync_result.as_ref().map(|res| res.sync_size).into(), - )); - sync_result - }) - } - async fn new_local(&self, options: NewLocalOptions) -> Self::Local { TracedStateStore::new_local(self.inner.new_local(options.clone()).await, options) } @@ -368,6 +355,24 @@ impl TracedStateStore { pub fn sstable_object_id_manager(&self) -> &SstableObjectIdManagerRef { self.inner.sstable_object_id_manager() } + + pub async fn sync( + &self, + sync_table_epochs: Vec<(HummockEpoch, HashSet)>, + ) -> StorageResult { + let span: MayTraceSpan = TraceSpan::new_sync_span(&sync_table_epochs, self.storage_type); + + let future = self.inner.sync(sync_table_epochs); + + future + .map(move |sync_result| { + span.may_send_result(OperationResult::Sync( + sync_result.as_ref().map(|res| res.sync_size).into(), + )); + sync_result + }) + .await + } } impl 
TracedStateStore { diff --git a/src/storage/src/panic_store.rs b/src/storage/src/panic_store.rs index ee1f8ebfbbf43..804c7c97334b4 100644 --- a/src/storage/src/panic_store.rs +++ b/src/storage/src/panic_store.rs @@ -12,14 +12,12 @@ // See the License for the specific language governing permissions and // limitations under the License. -use std::collections::HashSet; use std::marker::PhantomData; use std::ops::Bound; use std::sync::Arc; use bytes::Bytes; use risingwave_common::bitmap::Bitmap; -use risingwave_common::catalog::TableId; use risingwave_common::hash::VirtualNode; use risingwave_hummock_sdk::key::{TableKey, TableKeyRange}; use risingwave_hummock_sdk::HummockReadEpoch; @@ -181,11 +179,6 @@ impl StateStore for PanicStateStore { panic!("should not wait epoch from the panic state store!"); } - #[allow(clippy::unused_async)] - fn sync(&self, _epoch: u64, _table_ids: HashSet) -> impl SyncFuture { - async { panic!("should not await sync epoch from the panic state store!") } - } - #[allow(clippy::unused_async)] async fn new_local(&self, _option: NewLocalOptions) -> Self::Local { panic!("should not call new local from the panic state store"); diff --git a/src/storage/src/store.rs b/src/storage/src/store.rs index bf93639b3d70e..2a5405b9f9a17 100644 --- a/src/storage/src/store.rs +++ b/src/storage/src/store.rs @@ -13,7 +13,6 @@ // limitations under the License. use std::cmp::min; -use std::collections::HashSet; use std::default::Default; use std::fmt::{Debug, Formatter}; use std::future::Future; @@ -32,7 +31,7 @@ use risingwave_common::hash::VirtualNode; use risingwave_common::util::epoch::{Epoch, EpochPair}; use risingwave_hummock_sdk::key::{FullKey, TableKey, TableKeyRange}; use risingwave_hummock_sdk::table_watermark::{VnodeWatermark, WatermarkDirection}; -use risingwave_hummock_sdk::{HummockReadEpoch, SyncResult}; +use risingwave_hummock_sdk::HummockReadEpoch; use risingwave_hummock_trace::{ TracedInitOptions, TracedNewLocalOptions, TracedOpConsistencyLevel, TracedPrefetchOptions, TracedReadOptions, TracedSealCurrentEpochOptions, TracedTryWaitEpochOptions, @@ -357,8 +356,6 @@ pub trait StateStoreWrite: StaticSendSync { ) -> StorageResult; } -pub trait SyncFuture = Future> + Send + 'static; - #[derive(Clone)] pub struct TryWaitEpochOptions { pub table_id: TableId, @@ -398,8 +395,6 @@ pub trait StateStore: StateStoreRead + StaticSendSync + Clone { options: TryWaitEpochOptions, ) -> impl Future> + Send + '_; - fn sync(&self, epoch: u64, table_ids: HashSet) -> impl SyncFuture; - /// Creates a [`MonitoredStateStore`] from this state store, with given `stats`. fn monitored(self, storage_metrics: Arc) -> MonitoredStateStore { MonitoredStateStore::new(self, storage_metrics) diff --git a/src/storage/src/store_impl.rs b/src/storage/src/store_impl.rs index f59395d26db7f..8ce2906ea3e75 100644 --- a/src/storage/src/store_impl.rs +++ b/src/storage/src/store_impl.rs @@ -222,7 +222,6 @@ macro_rules! 
dispatch_state_store { #[cfg(any(debug_assertions, test, feature = "test"))] pub mod verify { - use std::collections::HashSet; use std::fmt::Debug; use std::future::Future; use std::marker::PhantomData; @@ -231,7 +230,6 @@ pub mod verify { use bytes::Bytes; use risingwave_common::bitmap::Bitmap; - use risingwave_common::catalog::TableId; use risingwave_common::hash::VirtualNode; use risingwave_hummock_sdk::key::{TableKey, TableKeyRange}; use risingwave_hummock_sdk::HummockReadEpoch; @@ -575,20 +573,6 @@ pub mod verify { self.actual.try_wait_epoch(epoch, options) } - fn sync(&self, epoch: u64, table_ids: HashSet) -> impl SyncFuture { - let expected_future = self - .expected - .as_ref() - .map(|expected| expected.sync(epoch, table_ids.clone())); - let actual_future = self.actual.sync(epoch, table_ids); - async move { - if let Some(expected_future) = expected_future { - expected_future.await?; - } - actual_future.await - } - } - async fn new_local(&self, option: NewLocalOptions) -> Self::Local { let expected = if let Some(expected) = &self.expected { Some(expected.new_local(option.clone()).await) @@ -826,20 +810,16 @@ impl AsHummock for SledStateStore { #[cfg(debug_assertions)] pub mod boxed_state_store { - use std::collections::HashSet; use std::future::Future; use std::ops::{Deref, DerefMut}; use std::sync::Arc; use bytes::Bytes; use dyn_clone::{clone_trait_object, DynClone}; - use futures::future::BoxFuture; - use futures::FutureExt; use risingwave_common::bitmap::Bitmap; - use risingwave_common::catalog::TableId; use risingwave_common::hash::VirtualNode; use risingwave_hummock_sdk::key::{TableKey, TableKeyRange}; - use risingwave_hummock_sdk::{HummockReadEpoch, SyncResult}; + use risingwave_hummock_sdk::HummockReadEpoch; use crate::error::StorageResult; use crate::hummock::HummockStorage; @@ -1161,12 +1141,6 @@ pub mod boxed_state_store { options: TryWaitEpochOptions, ) -> StorageResult<()>; - fn sync( - &self, - epoch: u64, - table_ids: HashSet, - ) -> BoxFuture<'static, StorageResult>; - async fn new_local(&self, option: NewLocalOptions) -> BoxDynamicDispatchedLocalStateStore; } @@ -1180,14 +1154,6 @@ pub mod boxed_state_store { self.try_wait_epoch(epoch, options).await } - fn sync( - &self, - epoch: u64, - table_ids: HashSet, - ) -> BoxFuture<'static, StorageResult> { - self.sync(epoch, table_ids).boxed() - } - async fn new_local(&self, option: NewLocalOptions) -> BoxDynamicDispatchedLocalStateStore { Box::new(self.new_local(option).await) } @@ -1267,14 +1233,6 @@ pub mod boxed_state_store { self.deref().try_wait_epoch(epoch, options) } - fn sync( - &self, - epoch: u64, - table_ids: HashSet, - ) -> impl Future> + Send + 'static { - self.deref().sync(epoch, table_ids) - } - fn new_local( &self, option: NewLocalOptions, diff --git a/src/stream/src/task/barrier_manager/managed_state.rs b/src/stream/src/task/barrier_manager/managed_state.rs index cd6bb924f4784..e555c0ff037ff 100644 --- a/src/stream/src/task/barrier_manager/managed_state.rs +++ b/src/stream/src/task/barrier_manager/managed_state.rs @@ -32,7 +32,7 @@ use risingwave_common::must_match; use risingwave_common::util::epoch::EpochPair; use risingwave_hummock_sdk::SyncResult; use risingwave_pb::stream_plan::barrier::BarrierKind; -use risingwave_storage::{dispatch_state_store, StateStore, StateStoreImpl}; +use risingwave_storage::StateStoreImpl; use thiserror_ext::AsReport; use tokio::sync::mpsc; use tokio::task::JoinHandle; @@ -145,14 +145,21 @@ use risingwave_pb::stream_plan::SubscriptionUpstreamInfo; use 
risingwave_pb::stream_service::streaming_control_stream_request::InitialPartialGraph;
 use risingwave_pb::stream_service::InjectBarrierRequest;
 
-fn sync_epoch<S: StateStore>(
-    state_store: &S,
+fn sync_epoch(
+    state_store: &StateStoreImpl,
     streaming_metrics: &StreamingMetrics,
     prev_epoch: u64,
     table_ids: HashSet<TableId>,
 ) -> BoxFuture<'static, StreamResult<SyncResult>> {
     let timer = streaming_metrics.barrier_sync_latency.start_timer();
-    let future = state_store.sync(prev_epoch, table_ids);
+    let hummock = state_store.as_hummock().cloned();
+    let future = async move {
+        if let Some(hummock) = hummock {
+            hummock.sync(vec![(prev_epoch, table_ids)]).await
+        } else {
+            Ok(SyncResult::default())
+        }
+    };
     future
         .instrument_await(format!("sync_epoch (epoch {})", prev_epoch))
         .inspect_ok(move |_| {
@@ -771,16 +778,12 @@ impl PartialGraphManagedBarrierState {
                     None
                 }
                 BarrierKind::Barrier => None,
-                BarrierKind::Checkpoint => {
-                    dispatch_state_store!(&self.state_store, state_store, {
-                        Some(sync_epoch(
-                            state_store,
-                            &self.streaming_metrics,
-                            prev_epoch,
-                            table_ids.expect("should be Some on BarrierKind::Checkpoint"),
-                        ))
-                    })
-                }
+                BarrierKind::Checkpoint => Some(sync_epoch(
+                    &self.state_store,
+                    &self.streaming_metrics,
+                    prev_epoch,
+                    table_ids.expect("should be Some on BarrierKind::Checkpoint"),
+                )),
             };
 
             let barrier = barrier_state.barrier.clone();
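
Note for reviewers: the sketch below illustrates how a caller drives the new batched sync API introduced in this patch. It is not part of the diff; the `HummockStorage::sync` signature and `SyncResult::sync_size` come from the changes above, while the crate paths, table ids, and epoch values are illustrative assumptions.

```rust
use std::collections::HashSet;

use risingwave_common::catalog::TableId;
use risingwave_hummock_sdk::HummockEpoch;
use risingwave_storage::error::StorageResult;
use risingwave_storage::hummock::HummockStorage;

// Hypothetical caller: sync two checkpoint epochs in one call. Before this
// patch, each epoch needed its own `sync(epoch, table_ids)` call on the
// `StateStore` trait; the trait method is now replaced by this inherent
// method on `HummockStorage`.
async fn sync_two_epochs(
    storage: &HummockStorage,
    epoch1: HummockEpoch,
    epoch2: HummockEpoch,
) -> StorageResult<()> {
    // One (epoch, table set) pair per epoch to be synced. The uploader asserts
    // that a table id appears in at most one pair, so keep the sets disjoint.
    let sync_table_epochs: Vec<(HummockEpoch, HashSet<TableId>)> = vec![
        (epoch1, HashSet::from_iter([TableId::new(1), TableId::new(2)])),
        (epoch2, HashSet::from_iter([TableId::new(3)])),
    ];
    let sync_result = storage.sync(sync_table_epochs).await?;
    // `sync_size` is the total number of bytes uploaded for the synced epochs.
    tracing::info!(sync_size = sync_result.sync_size, "sync finished");
    Ok(())
}
```

The barrier manager goes through the same entry point: `managed_state.rs` downcasts `StateStoreImpl` via `as_hummock()` and passes a single-element `vec![(prev_epoch, table_ids)]`, falling back to `SyncResult::default()` for non-Hummock backends.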