diff --git a/src/storage/src/hummock/event_handler/hummock_event_handler.rs b/src/storage/src/hummock/event_handler/hummock_event_handler.rs index 277984d3545dd..53d7167e0167b 100644 --- a/src/storage/src/hummock/event_handler/hummock_event_handler.rs +++ b/src/storage/src/hummock/event_handler/hummock_event_handler.rs @@ -12,7 +12,7 @@ // See the License for the specific language governing permissions and // limitations under the License. -use std::collections::{HashMap, HashSet, VecDeque}; +use std::collections::{HashMap, VecDeque}; use std::pin::pin; use std::sync::atomic::AtomicUsize; use std::sync::atomic::Ordering::Relaxed; @@ -43,7 +43,7 @@ use crate::hummock::compactor::{await_tree_key, compact, CompactorContext}; use crate::hummock::conflict_detector::ConflictDetector; use crate::hummock::event_handler::refiller::{CacheRefillerEvent, SpawnRefillTask}; use crate::hummock::event_handler::uploader::{ - HummockUploader, SpawnUploadTask, SyncedData, UploadTaskInfo, UploadTaskOutput, UploaderEvent, + HummockUploader, SpawnUploadTask, SyncedData, UploadTaskInfo, UploadTaskOutput, }; use crate::hummock::event_handler::{ HummockEvent, HummockReadVersionRef, HummockVersionUpdate, ReadOnlyReadVersionMapping, @@ -184,8 +184,7 @@ impl HummockEventReceiver { } struct HummockEventHandlerMetrics { - event_handler_on_sync_finish_latency: Histogram, - event_handler_on_spilled_latency: Histogram, + event_handler_on_upload_finish_latency: Histogram, event_handler_on_apply_version_update: Histogram, event_handler_on_recv_version_update: Histogram, } @@ -329,12 +328,9 @@ impl HummockEventHandler { let write_conflict_detector = ConflictDetector::new_from_config(storage_opts); let metrics = HummockEventHandlerMetrics { - event_handler_on_sync_finish_latency: state_store_metrics + event_handler_on_upload_finish_latency: state_store_metrics .event_handler_latency - .with_label_values(&["on_sync_finish"]), - event_handler_on_spilled_latency: state_store_metrics - .event_handler_latency - .with_label_values(&["on_spilled"]), + .with_label_values(&["on_upload_finish"]), event_handler_on_apply_version_update: state_store_metrics .event_handler_latency .with_label_values(&["apply_version"]), @@ -396,40 +392,6 @@ impl HummockEventHandler { // Handler for different events impl HummockEventHandler { - fn handle_epoch_synced(&mut self, epoch: HummockEpoch, data: SyncedData) { - debug!("epoch has been synced: {}.", epoch); - let SyncedData { - newly_upload_ssts: newly_uploaded_sstables, - .. - } = data; - { - let newly_uploaded_sstables = newly_uploaded_sstables - .into_iter() - .map(Arc::new) - .collect_vec(); - if !newly_uploaded_sstables.is_empty() { - let related_instance_ids: HashSet<_> = newly_uploaded_sstables - .iter() - .flat_map(|sst| sst.imm_ids().keys().cloned()) - .collect(); - self.for_each_read_version(related_instance_ids, |instance_id, read_version| { - newly_uploaded_sstables - .iter() - // Take rev because newer data come first in `newly_uploaded_sstables` but we apply - // older data first - .rev() - .for_each(|staging_sstable_info| { - if staging_sstable_info.imm_ids().contains_key(&instance_id) { - read_version.update(VersionUpdate::Staging(StagingData::Sst( - staging_sstable_info.clone(), - ))); - } - }); - }); - } - }; - } - /// This function will be performed under the protection of the `read_version_mapping` read /// lock, and add write lock on each `read_version` operation fn for_each_read_version( @@ -441,7 +403,7 @@ impl HummockEventHandler { #[cfg(debug_assertions)] { // check duplication on debug_mode - let mut id_set = HashSet::new(); + let mut id_set = std::collections::HashSet::new(); for instance in instances { assert!(id_set.insert(instance)); } @@ -488,9 +450,8 @@ impl HummockEventHandler { } } - fn handle_data_spilled(&mut self, staging_sstable_info: StagingSstableInfo) { - let staging_sstable_info = Arc::new(staging_sstable_info); - trace!("data_spilled. SST size {}", staging_sstable_info.imm_size()); + fn handle_uploaded_sst_inner(&mut self, staging_sstable_info: Arc) { + trace!("data_flushed. SST size {}", staging_sstable_info.imm_size()); self.for_each_read_version( staging_sstable_info.imm_ids().keys().cloned(), |_, read_version| { @@ -504,7 +465,7 @@ impl HummockEventHandler { fn handle_sync_epoch( &mut self, new_sync_epoch: HummockEpoch, - sync_result_sender: oneshot::Sender>, + sync_result_sender: oneshot::Sender>, ) { debug!( "awaiting for epoch to be synced: {}, max_synced_epoch: {}", @@ -715,8 +676,8 @@ impl HummockEventHandler { pub async fn start_hummock_event_handler_worker(mut self) { loop { tokio::select! { - event = self.uploader.next_event() => { - self.handle_uploader_event(event); + sst = self.uploader.next_uploaded_sst() => { + self.handle_uploaded_sst(sst); } event = self.refiller.next_event() => { let CacheRefillerEvent {pinned_version, new_pinned_version } = event; @@ -748,21 +709,12 @@ impl HummockEventHandler { } } - fn handle_uploader_event(&mut self, event: UploaderEvent) { - match event { - UploaderEvent::SyncFinish(epoch, data) => { - let _timer = self - .metrics - .event_handler_on_sync_finish_latency - .start_timer(); - self.handle_epoch_synced(epoch, data); - } - - UploaderEvent::DataSpilled(staging_sstable_info) => { - let _timer = self.metrics.event_handler_on_spilled_latency.start_timer(); - self.handle_data_spilled(staging_sstable_info); - } - } + fn handle_uploaded_sst(&mut self, sst: Arc) { + let _timer = self + .metrics + .event_handler_on_upload_finish_latency + .start_timer(); + self.handle_uploaded_sst_inner(sst); } /// Gracefully shutdown if returns `true`. @@ -925,21 +877,27 @@ impl HummockEventHandler { } pub(super) fn send_sync_result( - sender: oneshot::Sender>, - result: HummockResult<&SyncedData>, + sender: oneshot::Sender>, + result: HummockResult, ) { - let result = result.map( - |SyncedData { - newly_upload_ssts, - uploaded_ssts, - table_watermarks, - }| { + let _ = sender.send(result).inspect_err(|e| { + error!("unable to send sync result. Err: {:?}", e); + }); +} + +impl SyncedData { + pub fn into_sync_result(self) -> SyncResult { + { + let SyncedData { + uploaded_ssts, + table_watermarks, + } = self; let mut sync_size = 0; let mut uncommitted_ssts = Vec::new(); let mut old_value_ssts = Vec::new(); // The newly uploaded `sstable_infos` contains newer data. Therefore, // `newly_upload_ssts` at the front - for sst in newly_upload_ssts.iter().chain(uploaded_ssts.iter()) { + for sst in uploaded_ssts { sync_size += sst.imm_size(); uncommitted_ssts.extend(sst.sstable_infos().iter().cloned()); old_value_ssts.extend(sst.old_value_sstable_infos().iter().cloned()); @@ -950,12 +908,8 @@ pub(super) fn send_sync_result( table_watermarks: table_watermarks.clone(), old_value_ssts, } - }, - ); - - let _ = sender.send(result).inspect_err(|e| { - error!("unable to send sync result. Err: {:?}", e); - }); + } + } } #[cfg(test)] diff --git a/src/storage/src/hummock/event_handler/mod.rs b/src/storage/src/hummock/event_handler/mod.rs index bbf69ae194f72..c88611cd0888a 100644 --- a/src/storage/src/hummock/event_handler/mod.rs +++ b/src/storage/src/hummock/event_handler/mod.rs @@ -18,7 +18,7 @@ use std::sync::Arc; use parking_lot::{RwLock, RwLockReadGuard}; use risingwave_common::buffer::Bitmap; use risingwave_common::catalog::TableId; -use risingwave_hummock_sdk::{HummockEpoch, SyncResult}; +use risingwave_hummock_sdk::HummockEpoch; use thiserror_ext::AsReport; use tokio::sync::oneshot; @@ -36,6 +36,7 @@ use risingwave_hummock_sdk::version::{HummockVersion, HummockVersionDelta}; use super::store::version::HummockReadVersion; use crate::hummock::event_handler::hummock_event_handler::HummockEventSender; +use crate::hummock::event_handler::uploader::SyncedData; #[derive(Debug)] pub struct BufferWriteRequest { @@ -59,7 +60,7 @@ pub enum HummockEvent { /// handle sender. SyncEpoch { new_sync_epoch: HummockEpoch, - sync_result_sender: oneshot::Sender>, + sync_result_sender: oneshot::Sender>, }, /// Clear shared buffer and reset all states diff --git a/src/storage/src/hummock/event_handler/uploader.rs b/src/storage/src/hummock/event_handler/uploader.rs index f768aa23dcd89..ee0e975e5a3ea 100644 --- a/src/storage/src/hummock/event_handler/uploader.rs +++ b/src/storage/src/hummock/event_handler/uploader.rs @@ -22,7 +22,6 @@ use std::pin::Pin; use std::sync::Arc; use std::task::{ready, Context, Poll}; -use futures::future::{try_join_all, TryJoinAll}; use futures::FutureExt; use itertools::Itertools; use more_asserts::{assert_ge, assert_gt}; @@ -35,7 +34,7 @@ use risingwave_common::must_match; use risingwave_hummock_sdk::table_watermark::{ TableWatermarks, VnodeWatermark, WatermarkDirection, }; -use risingwave_hummock_sdk::{CompactionGroupId, HummockEpoch, LocalSstableInfo, SyncResult}; +use risingwave_hummock_sdk::{CompactionGroupId, HummockEpoch, LocalSstableInfo}; use thiserror_ext::AsReport; use tokio::sync::oneshot; use tokio::task::JoinHandle; @@ -265,7 +264,10 @@ impl UploadingTask { } /// Poll the result of the uploading task - fn poll_result(&mut self, cx: &mut Context<'_>) -> Poll> { + fn poll_result( + &mut self, + cx: &mut Context<'_>, + ) -> Poll>> { Poll::Ready(match ready!(self.join_handle.poll_unpin(cx)) { Ok(task_result) => task_result .inspect(|_| { @@ -277,13 +279,13 @@ impl UploadingTask { }) .inspect_err(|e| error!(task_info = ?self.task_info, err = ?e.as_report(), "upload task failed")) .map(|output| { - StagingSstableInfo::new( + Arc::new(StagingSstableInfo::new( output.new_value_ssts, output.old_value_ssts, self.task_info.epochs.clone(), self.task_info.imm_ids.clone(), self.task_info.task_size, - ) + )) }), Err(err) => Err(HummockError::other(format!( @@ -294,7 +296,7 @@ impl UploadingTask { } /// Poll the uploading task until it succeeds. If it fails, we will retry it. - fn poll_ok_with_retry(&mut self, cx: &mut Context<'_>) -> Poll { + fn poll_ok_with_retry(&mut self, cx: &mut Context<'_>) -> Poll> { loop { let result = ready!(self.poll_result(cx)); match result { @@ -323,7 +325,7 @@ impl UploadingTask { } impl Future for UploadingTask { - type Output = HummockResult; + type Output = HummockResult>; fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { self.poll_result(cx) @@ -337,7 +339,7 @@ struct SpilledData { // ordered spilling tasks. Task at the back is spilling older data. uploading_tasks: VecDeque, // ordered spilled data. Data at the back is older. - uploaded_data: VecDeque, + uploaded_data: VecDeque>, } impl SpilledData { @@ -347,7 +349,10 @@ impl SpilledData { /// Poll the successful spill of the oldest uploading task. Return `Poll::Ready(None)` is there /// is no uploading task - fn poll_success_spill(&mut self, cx: &mut Context<'_>) -> Poll> { + fn poll_success_spill( + &mut self, + cx: &mut Context<'_>, + ) -> Poll>> { // only poll the oldest uploading task if there is any if let Some(task) = self.uploading_tasks.back_mut() { let staging_sstable_info = ready!(task.poll_ok_with_retry(cx)); @@ -905,19 +910,17 @@ impl UnsyncData { struct SyncingData { sync_epoch: HummockEpoch, - // TODO: may replace `TryJoinAll` with a future that will abort other join handles once - // one join handle failed. - // None means there is no pending uploading tasks - uploading_tasks: Option>, + // task of newer data at the front + uploading_tasks: VecDeque, // newer data at the front - uploaded: VecDeque, + uploaded: VecDeque>, table_watermarks: HashMap, - sync_result_sender: oneshot::Sender>, + sync_result_sender: oneshot::Sender>, } +#[derive(Debug)] pub struct SyncedData { - pub newly_upload_ssts: Vec, - pub uploaded_ssts: VecDeque, + pub uploaded_ssts: VecDeque>, pub table_watermarks: HashMap, } @@ -962,19 +965,23 @@ impl UploaderData { for (_, epoch_data) in self.unsync_data.epoch_data { epoch_data.spilled_data.abort(); } - // TODO: call `abort` on the uploading task join handle of syncing_data for syncing_data in self.syncing_data { + for task in syncing_data.uploading_tasks { + task.join_handle.abort(); + } send_sync_result(syncing_data.sync_result_sender, Err(err())); } } } +struct ErrState { + failed_epoch: HummockEpoch, + reason: String, +} + enum UploaderState { Working(UploaderData), - Err { - failed_epoch: HummockEpoch, - reason: String, - }, + Err(ErrState), } /// An uploader for hummock data. @@ -1081,14 +1088,14 @@ impl HummockUploader { pub(super) fn start_sync_epoch( &mut self, epoch: HummockEpoch, - sync_result_sender: oneshot::Sender>, + sync_result_sender: oneshot::Sender>, ) { let data = match &mut self.state { UploaderState::Working(data) => data, - UploaderState::Err { + UploaderState::Err(ErrState { failed_epoch, reason, - } => { + }) => { let result = Err(HummockError::other(format!( "previous epoch {} failed due to [{}]", failed_epoch, reason @@ -1119,15 +1126,9 @@ impl HummockUploader { .. } = sync_data; - let try_join_all_upload_task = if uploading_tasks.is_empty() { - None - } else { - Some(try_join_all(uploading_tasks)) - }; - data.syncing_data.push_front(SyncingData { sync_epoch: epoch, - uploading_tasks: try_join_all_upload_task, + uploading_tasks, uploaded: uploaded_data, table_watermarks, sync_result_sender, @@ -1139,20 +1140,24 @@ impl HummockUploader { .set(data.syncing_data.len() as _); } - fn add_synced_data(&mut self, epoch: HummockEpoch) { + fn set_max_synced_epoch( + max_synced_epoch: &mut HummockEpoch, + max_syncing_epoch: HummockEpoch, + epoch: HummockEpoch, + ) { assert!( - epoch <= self.max_syncing_epoch, + epoch <= max_syncing_epoch, "epoch {} that has been synced has not started syncing yet. previous max syncing epoch {}", epoch, - self.max_syncing_epoch + max_syncing_epoch ); assert!( - epoch > self.max_synced_epoch, + epoch > *max_synced_epoch, "epoch {} has been synced. previous max synced epoch: {}", epoch, - self.max_synced_epoch + max_synced_epoch ); - self.max_synced_epoch = epoch; + *max_synced_epoch = epoch; } pub(crate) fn update_pinned_version(&mut self, pinned_version: PinnedVersion) { @@ -1246,55 +1251,83 @@ impl HummockUploader { impl UploaderData { /// Poll the syncing task of the syncing data of the oldest epoch. Return `Poll::Ready(None)` if /// there is no syncing data. - #[expect(clippy::type_complexity)] fn poll_syncing_task( &mut self, cx: &mut Context<'_>, context: &UploaderContext, - ) -> Poll< - Option<( - HummockEpoch, - HummockResult, - oneshot::Sender>, - )>, - > { - // Only poll the oldest epoch if there is any so that the syncing epoch are finished in - // order - if let Some(syncing_data) = self.syncing_data.back_mut() { - // The syncing task has finished - let result = if let Some(all_tasks) = &mut syncing_data.uploading_tasks { - ready!(all_tasks.poll_unpin(cx)) + mut set_max_synced_epoch: impl FnMut(u64), + ) -> Poll, ErrState>>> { + while let Some(syncing_data) = self.syncing_data.back_mut() { + let sstable_info = if let Some(task) = syncing_data.uploading_tasks.back_mut() { + let result = ready!(task.poll_result(cx)); + let _task = syncing_data.uploading_tasks.pop_back().expect("non-empty"); + let sstable_info = match result { + Ok(sstable_info) => sstable_info, + Err(e) => { + let SyncingData { + sync_epoch, + uploading_tasks, + sync_result_sender, + .. + } = self.syncing_data.pop_back().expect("non-empty"); + for task in uploading_tasks { + task.join_handle.abort(); + } + send_sync_result( + sync_result_sender, + Err(HummockError::other(format!( + "failed sync task: {:?}", + e.as_report() + ))), + ); + + return Poll::Ready(Some(Err(ErrState { + failed_epoch: sync_epoch, + reason: format!("{:?}", e.as_report()), + }))); + } + }; + syncing_data.uploaded.push_front(sstable_info.clone()); + self.unsync_data.ack_flushed(&sstable_info); + Some(sstable_info) } else { - Ok(Vec::new()) + None }; - let syncing_data = self.syncing_data.pop_back().expect("must exist"); - context - .stats - .uploader_syncing_epoch_count - .set(self.syncing_data.len() as _); - let epoch = syncing_data.sync_epoch; - - let result = result.map(|newly_uploaded_sstable_infos| { - // take `rev` so that old data is acked first - for sstable_info in newly_uploaded_sstable_infos.iter().rev() { - self.unsync_data.ack_flushed(sstable_info); - } - SyncedData { - newly_upload_ssts: newly_uploaded_sstable_infos, - uploaded_ssts: syncing_data.uploaded, - table_watermarks: syncing_data.table_watermarks, - } - }); - Poll::Ready(Some((epoch, result, syncing_data.sync_result_sender))) - } else { - Poll::Ready(None) + if syncing_data.uploading_tasks.is_empty() { + let syncing_data = self.syncing_data.pop_back().expect("non-empty"); + let SyncingData { + sync_epoch, + uploading_tasks, + uploaded, + table_watermarks, + sync_result_sender, + } = syncing_data; + assert!(uploading_tasks.is_empty()); + context + .stats + .uploader_syncing_epoch_count + .set(self.syncing_data.len() as _); + set_max_synced_epoch(sync_epoch); + send_sync_result( + sync_result_sender, + Ok(SyncedData { + uploaded_ssts: uploaded, + table_watermarks, + }), + ) + } + + if let Some(sstable_info) = sstable_info { + return Poll::Ready(Some(Ok(sstable_info))); + } } + Poll::Ready(None) } /// Poll the success of the oldest spilled task of unsync spill data. Return `Poll::Ready(None)` if /// there is no spilling task. - fn poll_spill_task(&mut self, cx: &mut Context<'_>) -> Poll> { + fn poll_spill_task(&mut self, cx: &mut Context<'_>) -> Poll>> { // iterator from older epoch to new epoch so that the spill task are finished in epoch order for epoch_data in self.unsync_data.epoch_data.values_mut() { // if None, there is no spilling task. Search for the unsync data of the next epoch in @@ -1308,53 +1341,50 @@ impl UploaderData { } } -pub(super) enum UploaderEvent { - // staging sstable info of newer data comes first - SyncFinish(HummockEpoch, SyncedData), - DataSpilled(StagingSstableInfo), -} - impl HummockUploader { - pub(super) fn next_event(&mut self) -> impl Future + '_ { + pub(super) fn next_uploaded_sst( + &mut self, + ) -> impl Future> + '_ { poll_fn(|cx| { let UploaderState::Working(data) = &mut self.state else { return Poll::Pending; }; - if let Some((epoch, result, result_sender)) = - ready!(data.poll_syncing_task(cx, &self.context)) + + if let Some(result) = + ready!( + data.poll_syncing_task(cx, &self.context, |new_synced_epoch| { + Self::set_max_synced_epoch( + &mut self.max_synced_epoch, + self.max_syncing_epoch, + new_synced_epoch, + ) + }) + ) { match result { Ok(data) => { - self.add_synced_data(epoch); - send_sync_result(result_sender, Ok(&data)); - return Poll::Ready(UploaderEvent::SyncFinish(epoch, data)); + return Poll::Ready(data); } Err(e) => { - send_sync_result( - result_sender, - Err(HummockError::other(format!( - "failed sync task: {:?}", - e.as_report() - ))), - ); + let failed_epoch = e.failed_epoch; let data = must_match!(replace( &mut self.state, - UploaderState::Err { - failed_epoch: epoch, - reason: format!("{:?}", e.as_report()), - }, + UploaderState::Err(e), ), UploaderState::Working(data) => data); data.abort(|| { - HummockError::other(format!("previous epoch {} failed to sync", epoch)) + HummockError::other(format!( + "previous epoch {} failed to sync", + failed_epoch + )) }); return Poll::Pending; } - }; + } } if let Some(sstable_info) = ready!(data.poll_spill_task(cx)) { - return Poll::Ready(UploaderEvent::DataSpilled(sstable_info)); + return Poll::Ready(sstable_info); } Poll::Pending @@ -1394,8 +1424,7 @@ pub(crate) mod tests { use crate::hummock::event_handler::uploader::uploader_imm::UploaderImm; use crate::hummock::event_handler::uploader::{ get_payload_imm_ids, HummockUploader, SyncedData, UploadTaskInfo, UploadTaskOutput, - UploadTaskPayload, UploaderContext, UploaderData, UploaderEvent, UploaderState, - UploadingTask, + UploadTaskPayload, UploaderContext, UploaderData, UploaderState, UploadingTask, }; use crate::hummock::event_handler::{LocalInstanceId, TEST_LOCAL_INSTANCE_ID}; use crate::hummock::local_version::pinned_version::PinnedVersion; @@ -1420,10 +1449,6 @@ pub(crate) mod tests { fn data(&self) -> &UploaderData { must_match!(&self.state, UploaderState::Working(data) => data) } - - fn start_sync_epoch_for_test(&mut self, epoch: HummockEpoch) { - self.start_sync_epoch(epoch, oneshot::channel().0) - } } fn test_hummock_version(epoch: HummockEpoch) -> HummockVersion { @@ -1654,24 +1679,34 @@ pub(crate) mod tests { uploader.add_imm(TEST_LOCAL_INSTANCE_ID, imm.clone()); uploader.local_seal_epoch_for_test(TEST_LOCAL_INSTANCE_ID, epoch1); - uploader.start_sync_epoch_for_test(epoch1); + let (sync_tx, sync_rx) = oneshot::channel(); + uploader.start_sync_epoch(epoch1, sync_tx); assert_eq!(epoch1 as HummockEpoch, uploader.max_syncing_epoch); assert_eq!(1, uploader.data().syncing_data.len()); let syncing_data = uploader.data().syncing_data.front().unwrap(); assert_eq!(epoch1 as HummockEpoch, syncing_data.sync_epoch); assert!(syncing_data.uploaded.is_empty()); - assert!(syncing_data.uploading_tasks.is_some()); + assert!(!syncing_data.uploading_tasks.is_empty()); - match uploader.next_event().await { - UploaderEvent::SyncFinish(finished_epoch, data) => { - assert_eq!(epoch1, finished_epoch); + let staging_sst = uploader.next_uploaded_sst().await; + assert_eq!(&vec![epoch1], staging_sst.epochs()); + assert_eq!( + &HashMap::from_iter([(TEST_LOCAL_INSTANCE_ID, vec![imm.batch_id()])]), + staging_sst.imm_ids() + ); + assert_eq!( + &dummy_success_upload_output().new_value_ssts, + staging_sst.sstable_infos() + ); + + match sync_rx.await { + Ok(Ok(data)) => { let SyncedData { - newly_upload_ssts, uploaded_ssts, table_watermarks, } = data; - assert_eq!(1, newly_upload_ssts.len()); - let staging_sst = newly_upload_ssts.first().unwrap(); + assert_eq!(1, uploaded_ssts.len()); + let staging_sst = &uploaded_ssts[0]; assert_eq!(&vec![epoch1], staging_sst.epochs()); assert_eq!( &HashMap::from_iter([(TEST_LOCAL_INSTANCE_ID, vec![imm.batch_id()])]), @@ -1681,7 +1716,6 @@ pub(crate) mod tests { &dummy_success_upload_output().new_value_ssts, staging_sst.sstable_infos() ); - assert!(uploaded_ssts.is_empty()); assert!(table_watermarks.is_empty()); } _ => unreachable!(), @@ -1701,14 +1735,15 @@ pub(crate) mod tests { let mut uploader = test_uploader(dummy_success_upload_future); let epoch1 = INITIAL_EPOCH.next_epoch(); - uploader.start_sync_epoch_for_test(epoch1); + let (sync_tx, sync_rx) = oneshot::channel(); + uploader.start_sync_epoch(epoch1, sync_tx); assert_eq!(epoch1, uploader.max_syncing_epoch); - match uploader.next_event().await { - UploaderEvent::SyncFinish(finished_epoch, data) => { - assert_eq!(epoch1, finished_epoch); + assert_uploader_pending(&mut uploader).await; + + match sync_rx.await { + Ok(Ok(data)) => { assert!(data.uploaded_ssts.is_empty()); - assert!(data.newly_upload_ssts.is_empty()); } _ => unreachable!(), }; @@ -1733,14 +1768,15 @@ pub(crate) mod tests { uploader.local_seal_epoch_for_test(TEST_LOCAL_INSTANCE_ID, epoch1); uploader.add_imm(TEST_LOCAL_INSTANCE_ID, imm); - uploader.start_sync_epoch_for_test(epoch1); + let (sync_tx, sync_rx) = oneshot::channel(); + uploader.start_sync_epoch(epoch1, sync_tx); assert_eq!(epoch1, uploader.max_syncing_epoch); - match uploader.next_event().await { - UploaderEvent::SyncFinish(finished_epoch, data) => { - assert_eq!(epoch1, finished_epoch); + assert_uploader_pending(&mut uploader).await; + + match sync_rx.await { + Ok(Ok(data)) => { assert!(data.uploaded_ssts.is_empty()); - assert!(data.newly_upload_ssts.is_empty()); } _ => unreachable!(), }; @@ -1758,9 +1794,11 @@ pub(crate) mod tests { async fn test_uploader_poll_empty() { let mut uploader = test_uploader(dummy_success_upload_future); let data = must_match!(&mut uploader.state, UploaderState::Working(data) => data); - assert!(poll_fn(|cx| data.poll_syncing_task(cx, &uploader.context)) - .await - .is_none()); + assert!( + poll_fn(|cx| data.poll_syncing_task(cx, &uploader.context, |_| unreachable!())) + .await + .is_none() + ); assert!(poll_fn(|cx| data.poll_spill_task(cx)).await.is_none()); } @@ -1784,7 +1822,8 @@ pub(crate) mod tests { assert_eq!(epoch1, uploader.max_syncing_epoch); uploader.init_instance(TEST_LOCAL_INSTANCE_ID, TEST_TABLE_ID, epoch6); - uploader.add_imm(TEST_LOCAL_INSTANCE_ID, gen_imm(epoch6).await); + let imm = gen_imm(epoch6).await; + uploader.add_imm(TEST_LOCAL_INSTANCE_ID, imm.clone()); uploader.update_pinned_version(version2); assert_eq!(epoch2, uploader.max_synced_epoch); assert_eq!(epoch2, uploader.max_syncing_epoch); @@ -1794,18 +1833,25 @@ pub(crate) mod tests { assert_eq!(epoch3, uploader.max_synced_epoch); assert_eq!(epoch3, uploader.max_syncing_epoch); - uploader.start_sync_epoch_for_test(epoch6); + let (sync_tx, sync_rx) = oneshot::channel(); + uploader.start_sync_epoch(epoch6, sync_tx); assert_eq!(epoch6, uploader.max_syncing_epoch); uploader.update_pinned_version(version4); assert_eq!(epoch4, uploader.max_synced_epoch); assert_eq!(epoch6, uploader.max_syncing_epoch); - match uploader.next_event().await { - UploaderEvent::SyncFinish(epoch, _) => { - assert_eq!(epoch6, epoch); + let sst = uploader.next_uploaded_sst().await; + assert_eq!(&get_imm_ids([&imm]), sst.imm_ids()); + + match sync_rx.await { + Ok(Ok(data)) => { + assert!(data.table_watermarks.is_empty()); + assert_eq!(1, data.uploaded_ssts.len()); + assert_eq!(&get_imm_ids([&imm]), data.uploaded_ssts[0].imm_ids()); } - UploaderEvent::DataSpilled(_) => unreachable!(), + _ => unreachable!(), } + uploader.update_pinned_version(version5); assert_eq!(epoch6, uploader.max_synced_epoch); assert_eq!(epoch6, uploader.max_syncing_epoch); @@ -1881,11 +1927,12 @@ pub(crate) mod tests { yield_now().await; } assert!( - poll_fn(|cx| Poll::Ready(uploader.next_event().poll_unpin(cx))) + poll_fn(|cx| Poll::Ready(uploader.next_uploaded_sst().poll_unpin(cx))) .await .is_pending() ) } + #[tokio::test] async fn test_uploader_finish_in_order() { let config = StorageOpts { @@ -1933,19 +1980,13 @@ pub(crate) mod tests { assert_uploader_pending(&mut uploader).await; finish_tx1.send(()).unwrap(); - if let UploaderEvent::DataSpilled(sst) = uploader.next_event().await { - assert_eq!(&get_payload_imm_ids(&epoch1_spill_payload12), sst.imm_ids()); - assert_eq!(&vec![epoch1], sst.epochs()); - } else { - unreachable!("") - } + let sst = uploader.next_uploaded_sst().await; + assert_eq!(&get_payload_imm_ids(&epoch1_spill_payload12), sst.imm_ids()); + assert_eq!(&vec![epoch1], sst.epochs()); - if let UploaderEvent::DataSpilled(sst) = uploader.next_event().await { - assert_eq!(&get_payload_imm_ids(&epoch2_spill_payload), sst.imm_ids()); - assert_eq!(&vec![epoch2], sst.epochs()); - } else { - unreachable!("") - } + let sst = uploader.next_uploaded_sst().await; + assert_eq!(&get_payload_imm_ids(&epoch2_spill_payload), sst.imm_ids()); + assert_eq!(&vec![epoch2], sst.epochs()); let imm1_3 = gen_imm_with_limiter(epoch1, memory_limiter).await; uploader.add_imm(instance_id1, imm1_3.clone()); @@ -1960,7 +2001,8 @@ pub(crate) mod tests { let (await_start1_4, finish_tx1_4) = new_task_notifier(get_payload_imm_ids(&epoch1_sync_payload)); uploader.local_seal_epoch_for_test(instance_id1, epoch1); - uploader.start_sync_epoch_for_test(epoch1); + let (sync_tx1, mut sync_rx1) = oneshot::channel(); + uploader.start_sync_epoch(epoch1, sync_tx1); await_start1_4.await; let epoch3 = epoch2.next_epoch(); @@ -2012,24 +2054,30 @@ pub(crate) mod tests { assert_uploader_pending(&mut uploader).await; finish_tx1_3.send(()).unwrap(); - if let UploaderEvent::SyncFinish(epoch, data) = uploader.next_event().await { - assert_eq!(epoch1, epoch); - assert_eq!(2, data.newly_upload_ssts.len()); - assert_eq!(1, data.uploaded_ssts.len()); + let sst = uploader.next_uploaded_sst().await; + assert_eq!(&get_payload_imm_ids(&epoch1_spill_payload3), sst.imm_ids()); + + assert!(poll_fn(|cx| Poll::Ready(sync_rx1.poll_unpin(cx).is_pending())).await); + + let sst = uploader.next_uploaded_sst().await; + assert_eq!(&get_payload_imm_ids(&epoch1_sync_payload), sst.imm_ids()); + + if let Ok(Ok(data)) = sync_rx1.await { + assert_eq!(3, data.uploaded_ssts.len()); assert_eq!( &get_payload_imm_ids(&epoch1_sync_payload), - data.newly_upload_ssts[0].imm_ids() + data.uploaded_ssts[0].imm_ids() ); assert_eq!( &get_payload_imm_ids(&epoch1_spill_payload3), - data.newly_upload_ssts[1].imm_ids() + data.uploaded_ssts[1].imm_ids() ); assert_eq!( &get_payload_imm_ids(&epoch1_spill_payload12), - data.uploaded_ssts[0].imm_ids() + data.uploaded_ssts[2].imm_ids() ); } else { - unreachable!("should be sync finish"); + unreachable!() } // current uploader state: @@ -2039,10 +2087,13 @@ pub(crate) mod tests { // syncing: empty // synced: epoch1: sst([imm1_4]), sst([imm1_3]), sst([imm1_2, imm1_1]) - uploader.start_sync_epoch_for_test(epoch2); - if let UploaderEvent::SyncFinish(epoch, data) = uploader.next_event().await { - assert_eq!(epoch2, epoch); - assert!(data.newly_upload_ssts.is_empty()); + let (sync_tx2, sync_rx2) = oneshot::channel(); + uploader.start_sync_epoch(epoch2, sync_tx2); + uploader.local_seal_epoch_for_test(instance_id2, epoch3); + let sst = uploader.next_uploaded_sst().await; + assert_eq!(&get_payload_imm_ids(&epoch3_spill_payload1), sst.imm_ids()); + + if let Ok(Ok(data)) = sync_rx2.await { assert_eq!(data.uploaded_ssts.len(), 1); assert_eq!( &get_payload_imm_ids(&epoch2_spill_payload), @@ -2053,21 +2104,6 @@ pub(crate) mod tests { } assert_eq!(epoch2, uploader.max_synced_epoch); - // current uploader state: - // unsealed: epoch3: imm: imm3_3, uploading: [imm3_2], [imm3_1] - // epoch4: imm: imm4 - // sealed: empty - // syncing: empty - // synced: epoch1: sst([imm1_4]), sst([imm1_3]), sst([imm1_2, imm1_1]) - // epoch2: sst([imm2]) - - uploader.local_seal_epoch_for_test(instance_id2, epoch3); - if let UploaderEvent::DataSpilled(sst) = uploader.next_event().await { - assert_eq!(&get_payload_imm_ids(&epoch3_spill_payload1), sst.imm_ids()); - } else { - unreachable!("should be data spilled"); - } - // current uploader state: // unsealed: epoch4: imm: imm4 // sealed: imm: imm3_3, uploading: [imm3_2], uploaded: sst([imm3_1]) @@ -2080,7 +2116,8 @@ pub(crate) mod tests { let epoch4_sync_payload = HashMap::from_iter([(instance_id1, vec![imm4, imm3_3])]); let (await_start4_with_3_3, finish_tx4_with_3_3) = new_task_notifier(get_payload_imm_ids(&epoch4_sync_payload)); - uploader.start_sync_epoch_for_test(epoch4); + let (sync_tx4, mut sync_rx4) = oneshot::channel(); + uploader.start_sync_epoch(epoch4, sync_tx4); await_start4_with_3_3.await; // current uploader state: @@ -2091,25 +2128,30 @@ pub(crate) mod tests { // epoch2: sst([imm2]) assert_uploader_pending(&mut uploader).await; + finish_tx3_2.send(()).unwrap(); - assert_uploader_pending(&mut uploader).await; + let sst = uploader.next_uploaded_sst().await; + assert_eq!(&get_payload_imm_ids(&epoch3_spill_payload2), sst.imm_ids()); + finish_tx4_with_3_3.send(()).unwrap(); + assert!(poll_fn(|cx| Poll::Ready(sync_rx4.poll_unpin(cx).is_pending())).await); + + let sst = uploader.next_uploaded_sst().await; + assert_eq!(&get_payload_imm_ids(&epoch4_sync_payload), sst.imm_ids()); - if let UploaderEvent::SyncFinish(epoch, data) = uploader.next_event().await { - assert_eq!(epoch4, epoch); - assert_eq!(2, data.newly_upload_ssts.len()); + if let Ok(Ok(data)) = sync_rx4.await { + assert_eq!(3, data.uploaded_ssts.len()); assert_eq!( &get_payload_imm_ids(&epoch4_sync_payload), - data.newly_upload_ssts[0].imm_ids() + data.uploaded_ssts[0].imm_ids() ); assert_eq!( &get_payload_imm_ids(&epoch3_spill_payload2), - data.newly_upload_ssts[1].imm_ids() + data.uploaded_ssts[1].imm_ids() ); - assert_eq!(1, data.uploaded_ssts.len()); assert_eq!( &get_payload_imm_ids(&epoch3_spill_payload1), - data.uploaded_ssts[0].imm_ids(), + data.uploaded_ssts[2].imm_ids(), ) } else { unreachable!("should be sync finish"); diff --git a/src/storage/src/hummock/store/hummock_storage.rs b/src/storage/src/hummock/store/hummock_storage.rs index 7f9f956b62ddf..b4a3c55bbf829 100644 --- a/src/storage/src/hummock/store/hummock_storage.rs +++ b/src/storage/src/hummock/store/hummock_storage.rs @@ -570,7 +570,11 @@ impl StateStore for HummockStorage { sync_result_sender: tx, }) .expect("should send success"); - rx.map(|recv_result| Ok(recv_result.expect("should wait success")?)) + rx.map(|recv_result| { + Ok(recv_result + .expect("should wait success")? + .into_sync_result()) + }) } fn seal_epoch(&self, epoch: u64, is_checkpoint: bool) {