diff --git a/src/storage/src/hummock/event_handler/hummock_event_handler.rs b/src/storage/src/hummock/event_handler/hummock_event_handler.rs index 37ad84529857c..470cf651e31d6 100644 --- a/src/storage/src/hummock/event_handler/hummock_event_handler.rs +++ b/src/storage/src/hummock/event_handler/hummock_event_handler.rs @@ -12,7 +12,7 @@ // See the License for the specific language governing permissions and // limitations under the License. -use std::collections::{BTreeMap, HashMap, HashSet, VecDeque}; +use std::collections::{HashMap, HashSet, VecDeque}; use std::pin::pin; use std::sync::atomic::AtomicUsize; use std::sync::atomic::Ordering::Relaxed; @@ -55,7 +55,7 @@ use crate::hummock::store::version::{ }; use crate::hummock::utils::validate_table_key_range; use crate::hummock::{ - HummockError, HummockResult, MemoryLimiter, SstableObjectIdManager, SstableStoreRef, TrackerId, + HummockResult, MemoryLimiter, SstableObjectIdManager, SstableStoreRef, TrackerId, }; use crate::mem_table::ImmutableMemtable; use crate::monitor::HummockStateStoreMetrics; @@ -194,7 +194,6 @@ pub struct HummockEventHandler { hummock_event_tx: HummockEventSender, hummock_event_rx: HummockEventReceiver, version_update_rx: UnboundedReceiver, - pending_sync_requests: BTreeMap>>, read_version_mapping: Arc>, /// A copy of `read_version_mapping` but owned by event handler local_read_version_mapping: HashMap, @@ -361,7 +360,6 @@ impl HummockEventHandler { hummock_event_tx, hummock_event_rx, version_update_rx, - pending_sync_requests: Default::default(), version_update_notifier_tx, pinned_version: Arc::new(ArcSwap::from_pointee(pinned_version)), write_conflict_detector, @@ -398,63 +396,38 @@ impl HummockEventHandler { // Handler for different events impl HummockEventHandler { - fn handle_epoch_synced( - &mut self, - epoch: HummockEpoch, - newly_uploaded_sstables: Vec, - ) { + fn handle_epoch_synced(&mut self, epoch: HummockEpoch, data: SyncedData) { debug!("epoch has been synced: {}.", epoch); - let newly_uploaded_sstables = newly_uploaded_sstables - .into_iter() - .map(Arc::new) - .collect_vec(); - if !newly_uploaded_sstables.is_empty() { - let related_instance_ids: HashSet<_> = newly_uploaded_sstables - .iter() - .flat_map(|sst| sst.imm_ids().keys().cloned()) - .collect(); - self.for_each_read_version(related_instance_ids, |instance_id, read_version| { - newly_uploaded_sstables - .iter() - // Take rev because newer data come first in `newly_uploaded_sstables` but we apply - // older data first - .rev() - .for_each(|staging_sstable_info| { - if staging_sstable_info.imm_ids().contains_key(&instance_id) { - read_version.update(VersionUpdate::Staging(StagingData::Sst( - staging_sstable_info.clone(), - ))); - } - }); - }); - } - let result = self - .uploader - .get_synced_data(epoch) - .expect("data just synced. must exist"); - // clear the pending sync epoch that is older than newly synced epoch - while let Some((smallest_pending_sync_epoch, _)) = - self.pending_sync_requests.first_key_value() + let SyncedData { + newly_upload_ssts: newly_uploaded_sstables, + .. + } = data; { - if *smallest_pending_sync_epoch > epoch { - // The smallest pending sync epoch has not synced yet. 
Wait later - break; - } - let (pending_sync_epoch, result_sender) = - self.pending_sync_requests.pop_first().expect("must exist"); - if pending_sync_epoch == epoch { - send_sync_result(result_sender, to_sync_result(result)); - break; - } else { - send_sync_result( - result_sender, - Err(HummockError::other(format!( - "epoch {} is not a checkpoint epoch", - pending_sync_epoch - ))), - ); + let newly_uploaded_sstables = newly_uploaded_sstables + .into_iter() + .map(Arc::new) + .collect_vec(); + if !newly_uploaded_sstables.is_empty() { + let related_instance_ids: HashSet<_> = newly_uploaded_sstables + .iter() + .flat_map(|sst| sst.imm_ids().keys().cloned()) + .collect(); + self.for_each_read_version(related_instance_ids, |instance_id, read_version| { + newly_uploaded_sstables + .iter() + // Take rev because newer data come first in `newly_uploaded_sstables` but we apply + // older data first + .rev() + .for_each(|staging_sstable_info| { + if staging_sstable_info.imm_ids().contains_key(&instance_id) { + read_version.update(VersionUpdate::Staging(StagingData::Sst( + staging_sstable_info.clone(), + ))); + } + }); + }); } - } + }; } /// This function will be performed under the protection of the `read_version_mapping` read @@ -529,68 +502,18 @@ impl HummockEventHandler { ) } - fn handle_await_sync_epoch( + fn handle_sync_epoch( &mut self, new_sync_epoch: HummockEpoch, sync_result_sender: oneshot::Sender>, ) { - debug!("receive await sync epoch: {}", new_sync_epoch); - // The epoch to sync has been committed already. - if new_sync_epoch <= self.uploader.max_committed_epoch() { - send_sync_result( - sync_result_sender, - Err(HummockError::other(format!( - "epoch {} has been committed. {}", - new_sync_epoch, - self.uploader.max_committed_epoch() - ))), - ); - return; - } - // The epoch has been synced - if new_sync_epoch <= self.uploader.max_synced_epoch() { - debug!( - "epoch {} has been synced. Current max_sync_epoch {}", - new_sync_epoch, - self.uploader.max_synced_epoch() - ); - if let Some(result) = self.uploader.get_synced_data(new_sync_epoch) { - let result = to_sync_result(result); - send_sync_result(sync_result_sender, result); - } else { - send_sync_result( - sync_result_sender, - Err(HummockError::other( - "the requested sync epoch is not a checkpoint epoch", - )), - ); - } - return; - } - debug!( "awaiting for epoch to be synced: {}, max_synced_epoch: {}", new_sync_epoch, self.uploader.max_synced_epoch() ); - - // If the epoch is not synced, we add to the `pending_sync_requests` anyway. If the epoch is - // not a checkpoint epoch, it will be clear with the max synced epoch bumps up. - if let Some(old_sync_result_sender) = self - .pending_sync_requests - .insert(new_sync_epoch, sync_result_sender) - { - let _ = old_sync_result_sender - .send(Err(HummockError::other( - "the sync rx is overwritten by an new rx", - ))) - .inspect_err(|e| { - error!( - "unable to send sync result: {}. Err: {:?}", - new_sync_epoch, e - ); - }); - } + self.uploader + .start_sync_epoch(new_sync_epoch, sync_result_sender); } async fn handle_clear(&mut self, notifier: oneshot::Sender<()>, prev_epoch: u64) { @@ -661,16 +584,6 @@ impl HummockEventHandler { ); } - for (epoch, result_sender) in self.pending_sync_requests.extract_if(|_, _| true) { - send_sync_result( - result_sender, - Err(HummockError::other(format!( - "the sync epoch {} has been cleared", - epoch - ))), - ); - } - assert!( self.local_read_version_mapping.is_empty(), "read version mapping not empty when clear. 
remaining tables: {:?}", @@ -839,12 +752,12 @@ impl HummockEventHandler { fn handle_uploader_event(&mut self, event: UploaderEvent) { match event { - UploaderEvent::SyncFinish(epoch, newly_uploaded_sstables) => { + UploaderEvent::SyncFinish(epoch, data) => { let _timer = self .metrics .event_handler_on_sync_finish_latency .start_timer(); - self.handle_epoch_synced(epoch, newly_uploaded_sstables); + self.handle_epoch_synced(epoch, data); } UploaderEvent::DataSpilled(staging_sstable_info) => { @@ -860,11 +773,11 @@ impl HummockEventHandler { HummockEvent::BufferMayFlush => { self.uploader.may_flush(); } - HummockEvent::AwaitSyncEpoch { + HummockEvent::SyncEpoch { new_sync_epoch, sync_result_sender, } => { - self.handle_await_sync_epoch(new_sync_epoch, sync_result_sender); + self.handle_sync_epoch(new_sync_epoch, sync_result_sender); } HummockEvent::Clear(_, _) => { unreachable!("clear is handled in separated async context") @@ -885,13 +798,9 @@ impl HummockEventHandler { HummockEvent::SealEpoch { epoch, - is_checkpoint, + is_checkpoint: _, } => { self.uploader.seal_epoch(epoch); - - if is_checkpoint { - self.uploader.start_sync_epoch(epoch); - } } HummockEvent::LocalSealEpoch { @@ -1025,47 +934,40 @@ impl HummockEventHandler { } } -fn send_sync_result( +pub(super) fn send_sync_result( sender: oneshot::Sender>, - result: HummockResult, + result: HummockResult<&SyncedData>, ) { + let result = result.map( + |SyncedData { + newly_upload_ssts, + uploaded_ssts, + table_watermarks, + }| { + let mut sync_size = 0; + let mut uncommitted_ssts = Vec::new(); + let mut old_value_ssts = Vec::new(); + // The newly uploaded `sstable_infos` contains newer data. Therefore, + // `newly_upload_ssts` at the front + for sst in newly_upload_ssts.iter().chain(uploaded_ssts.iter()) { + sync_size += sst.imm_size(); + uncommitted_ssts.extend(sst.sstable_infos().iter().cloned()); + old_value_ssts.extend(sst.old_value_sstable_infos().iter().cloned()); + } + SyncResult { + sync_size, + uncommitted_ssts, + table_watermarks: table_watermarks.clone(), + old_value_ssts, + } + }, + ); + let _ = sender.send(result).inspect_err(|e| { error!("unable to send sync result. 
Err: {:?}", e); }); } -fn to_sync_result(result: &HummockResult) -> HummockResult { - match result { - Ok(sync_data) => { - let sync_size = sync_data - .staging_ssts - .iter() - .map(StagingSstableInfo::imm_size) - .sum(); - Ok(SyncResult { - sync_size, - uncommitted_ssts: sync_data - .staging_ssts - .iter() - .flat_map(|staging_sstable_info| staging_sstable_info.sstable_infos().clone()) - .collect(), - table_watermarks: sync_data.table_watermarks.clone(), - old_value_ssts: sync_data - .staging_ssts - .iter() - .flat_map(|staging_sstable_info| { - staging_sstable_info.old_value_sstable_infos().clone() - }) - .collect(), - }) - } - Err(e) => Err(HummockError::other(format!( - "sync task failed: {}", - e.as_report() - ))), - } -} - #[cfg(test)] mod tests { use std::future::{poll_fn, Future}; @@ -1073,16 +975,25 @@ mod tests { use std::task::Poll; use futures::FutureExt; + use parking_lot::Mutex; + use risingwave_common::buffer::BitmapBuilder; + use risingwave_common::hash::VirtualNode; + use risingwave_common::util::epoch::{test_epoch, EpochExt}; use risingwave_hummock_sdk::version::HummockVersion; use risingwave_pb::hummock::PbHummockVersion; use tokio::spawn; use tokio::sync::mpsc::unbounded_channel; use tokio::sync::oneshot; + use crate::hummock::event_handler::refiller::CacheRefiller; + use crate::hummock::event_handler::uploader::tests::{gen_imm, TEST_TABLE_ID}; + use crate::hummock::event_handler::uploader::UploadTaskOutput; use crate::hummock::event_handler::{HummockEvent, HummockEventHandler, HummockVersionUpdate}; use crate::hummock::iterator::test_utils::mock_sstable_store; use crate::hummock::local_version::pinned_version::PinnedVersion; + use crate::hummock::store::version::{StagingData, VersionUpdate}; use crate::hummock::test_utils::default_opts_for_test; + use crate::hummock::HummockError; use crate::monitor::HummockStateStoreMetrics; #[tokio::test] @@ -1218,4 +1129,118 @@ mod tests { assert_eq!(latest_version.load().version(), &version5); } } + + #[tokio::test] + async fn test_old_epoch_sync_fail() { + let epoch0 = test_epoch(233); + + let initial_version = PinnedVersion::new( + HummockVersion::from_rpc_protobuf(&PbHummockVersion { + id: 1, + max_committed_epoch: epoch0, + ..Default::default() + }), + unbounded_channel().0, + ); + + let (_version_update_tx, version_update_rx) = unbounded_channel(); + + let epoch1 = epoch0.next_epoch(); + let epoch2 = epoch1.next_epoch(); + let (tx, rx) = oneshot::channel(); + let rx = Arc::new(Mutex::new(Some(rx))); + + let event_handler = HummockEventHandler::new_inner( + version_update_rx, + initial_version.clone(), + None, + mock_sstable_store().await, + Arc::new(HummockStateStoreMetrics::unused()), + &default_opts_for_test(), + Arc::new(move |_, info| { + assert_eq!(info.epochs.len(), 1); + let epoch = info.epochs[0]; + match epoch { + epoch if epoch == epoch1 => { + let rx = rx.lock().take().unwrap(); + spawn(async move { + rx.await.unwrap(); + Err(HummockError::other("fail")) + }) + } + epoch if epoch == epoch2 => spawn(async move { + Ok(UploadTaskOutput { + new_value_ssts: vec![], + old_value_ssts: vec![], + wait_poll_timer: None, + }) + }), + _ => unreachable!(), + } + }), + CacheRefiller::default_spawn_refill_task(), + ); + + let event_tx = event_handler.event_sender(); + + let send_event = |event| event_tx.send(event).unwrap(); + + let _join_handle = spawn(event_handler.start_hummock_event_handler_worker()); + + let (read_version, guard) = { + let (tx, rx) = oneshot::channel(); + send_event(HummockEvent::RegisterReadVersion { + 
table_id: TEST_TABLE_ID, + new_read_version_sender: tx, + is_replicated: false, + vnodes: Arc::new(BitmapBuilder::filled(VirtualNode::COUNT).finish()), + }); + rx.await.unwrap() + }; + + let imm1 = gen_imm(epoch1).await; + read_version + .write() + .update(VersionUpdate::Staging(StagingData::ImmMem(imm1.clone()))); + + send_event(HummockEvent::ImmToUploader { + instance_id: guard.instance_id, + imm: imm1, + }); + + let imm2 = gen_imm(epoch2).await; + read_version + .write() + .update(VersionUpdate::Staging(StagingData::ImmMem(imm2.clone()))); + + send_event(HummockEvent::ImmToUploader { + instance_id: guard.instance_id, + imm: imm2, + }); + + send_event(HummockEvent::SealEpoch { + epoch: epoch1, + is_checkpoint: true, + }); + let (tx1, mut rx1) = oneshot::channel(); + send_event(HummockEvent::SyncEpoch { + new_sync_epoch: epoch1, + sync_result_sender: tx1, + }); + assert!(poll_fn(|cx| Poll::Ready(rx1.poll_unpin(cx).is_pending())).await); + send_event(HummockEvent::SealEpoch { + epoch: epoch2, + is_checkpoint: true, + }); + let (tx2, mut rx2) = oneshot::channel(); + send_event(HummockEvent::SyncEpoch { + new_sync_epoch: epoch2, + sync_result_sender: tx2, + }); + assert!(poll_fn(|cx| Poll::Ready(rx2.poll_unpin(cx).is_pending())).await); + + tx.send(()).unwrap(); + rx1.await.unwrap().unwrap_err(); + rx2.await.unwrap().unwrap_err(); + } } diff --git a/src/storage/src/hummock/event_handler/mod.rs b/src/storage/src/hummock/event_handler/mod.rs index f15cc39f82ef6..c28dd6d25c3a4 100644 --- a/src/storage/src/hummock/event_handler/mod.rs +++ b/src/storage/src/hummock/event_handler/mod.rs @@ -57,7 +57,7 @@ pub enum HummockEvent { /// An epoch is going to be synced. Once the event is processed, there will be no more flush /// task on this epoch. Previous concurrent flush task join handle will be returned by the join /// handle sender. 
- AwaitSyncEpoch { + SyncEpoch { new_sync_epoch: HummockEpoch, sync_result_sender: oneshot::Sender>, }, @@ -107,7 +107,7 @@ impl HummockEvent { match self { HummockEvent::BufferMayFlush => "BufferMayFlush".to_string(), - HummockEvent::AwaitSyncEpoch { + HummockEvent::SyncEpoch { new_sync_epoch, sync_result_sender: _, } => format!("AwaitSyncEpoch epoch {} ", new_sync_epoch), diff --git a/src/storage/src/hummock/event_handler/uploader.rs b/src/storage/src/hummock/event_handler/uploader.rs index 23c0ea4ab49d1..9f40e9ac28962 100644 --- a/src/storage/src/hummock/event_handler/uploader.rs +++ b/src/storage/src/hummock/event_handler/uploader.rs @@ -16,7 +16,7 @@ use std::collections::hash_map::Entry; use std::collections::{BTreeMap, HashMap, VecDeque}; use std::fmt::{Debug, Display, Formatter}; use std::future::{poll_fn, Future}; -use std::mem::take; +use std::mem::{replace, take}; use std::pin::Pin; use std::sync::Arc; use std::task::{ready, Context, Poll}; @@ -30,15 +30,17 @@ use prometheus::{HistogramTimer, IntGauge}; use risingwave_common::buffer::BitmapBuilder; use risingwave_common::catalog::TableId; use risingwave_common::hash::VirtualNode; +use risingwave_common::must_match; use risingwave_hummock_sdk::table_watermark::{ TableWatermarks, VnodeWatermark, WatermarkDirection, }; -use risingwave_hummock_sdk::{CompactionGroupId, HummockEpoch, LocalSstableInfo}; +use risingwave_hummock_sdk::{CompactionGroupId, HummockEpoch, LocalSstableInfo, SyncResult}; use thiserror_ext::AsReport; +use tokio::sync::oneshot; use tokio::task::JoinHandle; use tracing::{debug, error, info}; -use crate::hummock::event_handler::hummock_event_handler::BufferTracker; +use crate::hummock::event_handler::hummock_event_handler::{send_sync_result, BufferTracker}; use crate::hummock::event_handler::uploader::uploader_imm::UploaderImm; use crate::hummock::event_handler::LocalInstanceId; use crate::hummock::local_version::pinned_version::PinnedVersion; @@ -350,11 +352,10 @@ impl SpilledData { } } - fn clear(&mut self) { - for task in self.uploading_tasks.drain(..) { + fn abort(self) { + for task in self.uploading_tasks { task.join_handle.abort(); } - self.uploaded_data.clear(); } } @@ -448,11 +449,6 @@ struct SealedData { } impl SealedData { - fn clear(&mut self) { - self.spilled_data.clear(); - *self = Self::default(); - } - /// Add the data of a newly sealed epoch. /// /// Note: it may happen that, for example, currently we hold `imms` and `spilled_data` of epoch @@ -576,6 +572,7 @@ struct SyncingData { // newer data at the front uploaded: VecDeque, table_watermarks: HashMap, + sync_result_sender: oneshot::Sender>, } impl SyncingData { @@ -584,14 +581,12 @@ impl SyncingData { } } -pub struct SyncedData { - pub staging_ssts: Vec, +pub(super) struct SyncedData { + pub newly_upload_ssts: Vec, + pub uploaded_ssts: VecDeque, pub table_watermarks: HashMap, } -// newer staging sstable info at the front -type SyncedDataState = HummockResult; - struct UploaderContext { pinned_version: PinnedVersion, /// When called, it will spawn a task to flush the imm into sst and return the join handle. @@ -618,6 +613,42 @@ impl UploaderContext { } } +#[derive(Default)] +struct UploaderData { + /// Data that are not sealed yet. `epoch` satisfies `epoch > max_sealed_epoch`. + unsealed_data: BTreeMap, + + /// Data that are sealed but not synced yet. `epoch` satisfies + /// `max_syncing_epoch < epoch <= max_sealed_epoch`. + sealed_data: SealedData, + + /// Data that has started syncing but not synced yet. 
`epoch` satisfies + /// `max_synced_epoch < epoch <= max_syncing_epoch`. + /// Newer epoch at the front + syncing_data: VecDeque, +} + +impl UploaderData { + fn abort(self, err: impl Fn() -> HummockError) { + self.sealed_data.spilled_data.abort(); + for (_, unsealed_data) in self.unsealed_data { + unsealed_data.spilled_data.abort(); + } + // TODO: call `abort` on the uploading task join handle of syncing_data + for syncing_data in self.syncing_data { + send_sync_result(syncing_data.sync_result_sender, Err(err())); + } + } +} + +enum UploaderState { + Working(UploaderData), + Err { + failed_epoch: HummockEpoch, + reason: String, + }, +} + /// An uploader for hummock data. /// /// Data have 4 sequential stages: unsealed, sealed, syncing, synced. @@ -639,27 +670,13 @@ pub struct HummockUploader { /// The maximum epoch that has been synced max_synced_epoch: HummockEpoch, - /// Data that are not sealed yet. `epoch` satisfies `epoch > max_sealed_epoch`. - unsealed_data: BTreeMap, - - /// Data that are sealed but not synced yet. `epoch` satisfies - /// `max_syncing_epoch < epoch <= max_sealed_epoch`. - sealed_data: SealedData, - - /// Data that has started syncing but not synced yet. `epoch` satisfies - /// `max_synced_epoch < epoch <= max_syncing_epoch`. - /// Newer epoch at the front - syncing_data: VecDeque, - - /// Data that has been synced already. `epoch` satisfies - /// `epoch <= max_synced_epoch`. - synced_data: BTreeMap, + state: UploaderState, context: UploaderContext, } impl HummockUploader { - pub(crate) fn new( + pub(super) fn new( state_store_metrics: Arc, pinned_version: PinnedVersion, spawn_upload_task: SpawnUploadTask, @@ -671,10 +688,11 @@ impl HummockUploader { max_sealed_epoch: initial_epoch, max_syncing_epoch: initial_epoch, max_synced_epoch: initial_epoch, - unsealed_data: Default::default(), - sealed_data: Default::default(), - syncing_data: Default::default(), - synced_data: Default::default(), + state: UploaderState::Working(UploaderData { + unsealed_data: Default::default(), + sealed_data: Default::default(), + syncing_data: Default::default(), + }), context: UploaderContext::new( pinned_version, spawn_upload_task, @@ -685,32 +703,30 @@ impl HummockUploader { } } - pub(crate) fn buffer_tracker(&self) -> &BufferTracker { + pub(super) fn buffer_tracker(&self) -> &BufferTracker { &self.context.buffer_tracker } - pub(crate) fn max_sealed_epoch(&self) -> HummockEpoch { + pub(super) fn max_sealed_epoch(&self) -> HummockEpoch { self.max_sealed_epoch } - pub(crate) fn max_synced_epoch(&self) -> HummockEpoch { + pub(super) fn max_synced_epoch(&self) -> HummockEpoch { self.max_synced_epoch } - pub(crate) fn max_committed_epoch(&self) -> HummockEpoch { + pub(super) fn max_committed_epoch(&self) -> HummockEpoch { self.context.pinned_version.max_committed_epoch() } - pub(crate) fn hummock_version(&self) -> &PinnedVersion { + pub(super) fn hummock_version(&self) -> &PinnedVersion { &self.context.pinned_version } - pub(crate) fn get_synced_data(&self, epoch: HummockEpoch) -> Option<&SyncedDataState> { - assert!(self.max_committed_epoch() < epoch && epoch <= self.max_synced_epoch); - self.synced_data.get(&epoch) - } - - pub(crate) fn add_imm(&mut self, instance_id: LocalInstanceId, imm: ImmutableMemtable) { + pub(super) fn add_imm(&mut self, instance_id: LocalInstanceId, imm: ImmutableMemtable) { + let UploaderState::Working(data) = &mut self.state else { + return; + }; let epoch = imm.min_epoch(); assert!( epoch > self.max_sealed_epoch, @@ -718,7 +734,7 @@ impl HummockUploader { epoch, 
self.max_sealed_epoch ); - let unsealed_data = self.unsealed_data.entry(epoch).or_default(); + let unsealed_data = data.unsealed_data.entry(epoch).or_default(); unsealed_data .imms .entry(instance_id) @@ -726,26 +742,32 @@ impl HummockUploader { .push_front(UploaderImm::new(imm, &self.context)); } - pub(crate) fn add_table_watermarks( + pub(super) fn add_table_watermarks( &mut self, epoch: u64, table_id: TableId, table_watermarks: Vec, direction: WatermarkDirection, ) { + let UploaderState::Working(data) = &mut self.state else { + return; + }; assert!( epoch > self.max_sealed_epoch, "imm epoch {} older than max sealed epoch {}", epoch, self.max_sealed_epoch ); - self.unsealed_data + data.unsealed_data .entry(epoch) .or_default() .add_table_watermarks(table_id, table_watermarks, direction); } - pub(crate) fn seal_epoch(&mut self, epoch: HummockEpoch) { + pub(super) fn seal_epoch(&mut self, epoch: HummockEpoch) { + let UploaderState::Working(data) = &mut self.state else { + return; + }; debug!("epoch {} is sealed", epoch); assert!( epoch > self.max_sealed_epoch, @@ -755,7 +777,7 @@ impl HummockUploader { ); self.max_sealed_epoch = epoch; let unsealed_data = - if let Some((&smallest_unsealed_epoch, _)) = self.unsealed_data.first_key_value() { + if let Some((&smallest_unsealed_epoch, _)) = data.unsealed_data.first_key_value() { assert!( smallest_unsealed_epoch >= epoch, "some epoch {} older than epoch to seal {}", @@ -763,7 +785,7 @@ impl HummockUploader { epoch ); if smallest_unsealed_epoch == epoch { - let (_, unsealed_data) = self + let (_, unsealed_data) = data .unsealed_data .pop_first() .expect("we have checked non-empty"); @@ -776,10 +798,28 @@ impl HummockUploader { debug!("epoch {} to seal has no data", epoch); UnsealedEpochData::default() }; - self.sealed_data.seal_new_epoch(epoch, unsealed_data); + data.sealed_data.seal_new_epoch(epoch, unsealed_data); } - pub(crate) fn start_sync_epoch(&mut self, epoch: HummockEpoch) { + pub(super) fn start_sync_epoch( + &mut self, + epoch: HummockEpoch, + sync_result_sender: oneshot::Sender>, + ) { + let data = match &mut self.state { + UploaderState::Working(data) => data, + UploaderState::Err { + failed_epoch, + reason, + } => { + let result = Err(HummockError::other(format!( + "previous epoch {} failed due to [{}]", + failed_epoch, reason + ))); + send_sync_result(sync_result_sender, result); + return; + } + }; debug!("start sync epoch: {}", epoch); assert!( epoch > self.max_syncing_epoch, @@ -796,7 +836,7 @@ impl HummockUploader { // flush imms to SST file, the output SSTs will be uploaded to object store // return unfinished merging task - self.sealed_data.flush(&self.context, false); + data.sealed_data.flush(&self.context, false); let SealedData { epochs, @@ -808,7 +848,7 @@ impl HummockUploader { }, table_watermarks, .. 
- } = self.sealed_data.drain(); + } = data.sealed_data.drain(); assert!( imms_by_table_shard.is_empty(), @@ -823,20 +863,21 @@ impl HummockUploader { Some(try_join_all(uploading_tasks)) }; - self.syncing_data.push_front(SyncingData { + data.syncing_data.push_front(SyncingData { epochs: epochs.into_iter().collect(), uploading_tasks: try_join_all_upload_task, uploaded: uploaded_data, table_watermarks, + sync_result_sender, }); self.context .stats .uploader_syncing_epoch_count - .set(self.syncing_data.len() as _); + .set(data.syncing_data.len() as _); } - fn add_synced_data(&mut self, epoch: HummockEpoch, synced_state: SyncedDataState) { + fn add_synced_data(&mut self, epoch: HummockEpoch) { assert!( epoch <= self.max_syncing_epoch, "epoch {} that has been synced has not started syncing yet. previous max syncing epoch {}", @@ -850,7 +891,6 @@ impl HummockUploader { self.max_synced_epoch ); self.max_synced_epoch = epoch; - assert!(self.synced_data.insert(epoch, synced_state).is_none()); } pub(crate) fn update_pinned_version(&mut self, pinned_version: PinnedVersion) { @@ -860,42 +900,49 @@ impl HummockUploader { ); let max_committed_epoch = pinned_version.max_committed_epoch(); self.context.pinned_version = pinned_version; - self.synced_data - .retain(|epoch, _| *epoch > max_committed_epoch); if self.max_synced_epoch < max_committed_epoch { self.max_synced_epoch = max_committed_epoch; - if let Some(syncing_data) = self.syncing_data.back() { - // there must not be any syncing data below MCE - assert_gt!( - *syncing_data - .epochs - .last() - .expect("epoch should not be empty"), - max_committed_epoch - ); - } + if let UploaderState::Working(data) = &mut self.state { + if let Some(syncing_data) = data.syncing_data.back() { + // there must not be any syncing data below MCE + assert_gt!( + *syncing_data + .epochs + .last() + .expect("epoch should not be empty"), + max_committed_epoch + ); + } + }; } if self.max_syncing_epoch < max_committed_epoch { self.max_syncing_epoch = max_committed_epoch; - // there must not be any sealed data below MCE - if let Some(&epoch) = self.sealed_data.epochs.back() { - assert_gt!(epoch, max_committed_epoch); + if let UploaderState::Working(data) = &mut self.state { + // there must not be any sealed data below MCE + if let Some(&epoch) = data.sealed_data.epochs.back() { + assert_gt!(epoch, max_committed_epoch); + } } } if self.max_sealed_epoch < max_committed_epoch { self.max_sealed_epoch = max_committed_epoch; - // there must not be any unsealed data below MCE - if let Some((&epoch, _)) = self.unsealed_data.first_key_value() { - assert_gt!(epoch, max_committed_epoch); + if let UploaderState::Working(data) = &mut self.state { + // there must not be any unsealed data below MCE + if let Some((&epoch, _)) = data.unsealed_data.first_key_value() { + assert_gt!(epoch, max_committed_epoch); + } } } } pub(crate) fn may_flush(&mut self) -> bool { + let UploaderState::Working(data) = &mut self.state else { + return false; + }; if self.context.buffer_tracker.need_flush() { let mut curr_batch_flush_size = 0; if self.context.buffer_tracker.need_flush() { - curr_batch_flush_size += self.sealed_data.flush(&self.context, true); + curr_batch_flush_size += data.sealed_data.flush(&self.context, true); } if self @@ -904,7 +951,7 @@ impl HummockUploader { .need_more_flush(curr_batch_flush_size) { // iterate from older epoch to newer epoch - for unsealed_data in self.unsealed_data.values_mut() { + for unsealed_data in data.unsealed_data.values_mut() { curr_batch_flush_size += 
unsealed_data.flush(&self.context); if !self .context @@ -926,24 +973,34 @@ impl HummockUploader { self.max_synced_epoch = max_committed_epoch; self.max_syncing_epoch = max_committed_epoch; self.max_sealed_epoch = max_committed_epoch; - self.synced_data.clear(); - self.syncing_data.clear(); - self.sealed_data.clear(); - self.unsealed_data.clear(); + if let UploaderState::Working(data) = replace( + &mut self.state, + UploaderState::Working(UploaderData::default()), + ) { + data.abort(|| { + HummockError::other(format!("uploader is reset to {}", max_committed_epoch)) + }); + } self.context.stats.uploader_syncing_epoch_count.set(0); - - // TODO: call `abort` on the uploading task join handle } } -impl HummockUploader { +impl UploaderData { /// Poll the syncing task of the syncing data of the oldest epoch. Return `Poll::Ready(None)` if /// there is no syncing data. + #[expect(clippy::type_complexity)] fn poll_syncing_task( &mut self, cx: &mut Context<'_>, - ) -> Poll)>> { + context: &UploaderContext, + ) -> Poll< + Option<( + HummockEpoch, + HummockResult, + oneshot::Sender>, + )>, + > { // Only poll the oldest epoch if there is any so that the syncing epoch are finished in // order if let Some(syncing_data) = self.syncing_data.back_mut() { @@ -954,28 +1011,19 @@ impl HummockUploader { Ok(Vec::new()) }; let syncing_data = self.syncing_data.pop_back().expect("must exist"); - self.context + context .stats .uploader_syncing_epoch_count .set(self.syncing_data.len() as _); let epoch = syncing_data.sync_epoch(); - let newly_uploaded_sstable_infos = match &result { - Ok(sstable_infos) => sstable_infos.clone(), - Err(_) => vec![], - }; - - let result = result.map(|mut sstable_infos| { - // The newly uploaded `sstable_infos` contains newer data. Therefore, - // `sstable_infos` at the front - sstable_infos.extend(syncing_data.uploaded); - SyncedData { - staging_ssts: sstable_infos, - table_watermarks: syncing_data.table_watermarks, - } + let result = result.map(|newly_uploaded_sstable_infos| SyncedData { + newly_upload_ssts: newly_uploaded_sstable_infos, + uploaded_ssts: syncing_data.uploaded, + table_watermarks: syncing_data.table_watermarks, }); - self.add_synced_data(epoch, result); - Poll::Ready(Some((epoch, newly_uploaded_sstable_infos))) + + Poll::Ready(Some((epoch, result, syncing_data.sync_result_sender))) } else { Poll::Ready(None) } @@ -1005,24 +1053,57 @@ impl HummockUploader { } } -pub(crate) enum UploaderEvent { +pub(super) enum UploaderEvent { // staging sstable info of newer data comes first - SyncFinish(HummockEpoch, Vec), + SyncFinish(HummockEpoch, SyncedData), DataSpilled(StagingSstableInfo), } impl HummockUploader { - pub(crate) fn next_event(&mut self) -> impl Future + '_ { + pub(super) fn next_event(&mut self) -> impl Future + '_ { poll_fn(|cx| { - if let Some((epoch, newly_uploaded_sstables)) = ready!(self.poll_syncing_task(cx)) { - return Poll::Ready(UploaderEvent::SyncFinish(epoch, newly_uploaded_sstables)); + let UploaderState::Working(data) = &mut self.state else { + return Poll::Pending; + }; + if let Some((epoch, result, result_sender)) = + ready!(data.poll_syncing_task(cx, &self.context)) + { + match result { + Ok(data) => { + self.add_synced_data(epoch); + send_sync_result(result_sender, Ok(&data)); + return Poll::Ready(UploaderEvent::SyncFinish(epoch, data)); + } + Err(e) => { + send_sync_result( + result_sender, + Err(HummockError::other(format!( + "failed sync task: {:?}", + e.as_report() + ))), + ); + let data = must_match!(replace( + &mut self.state, + 
UploaderState::Err { + failed_epoch: epoch, + reason: format!("{:?}", e.as_report()), + }, + ), UploaderState::Working(data) => data); + + data.abort(|| { + HummockError::other(format!("previous epoch {} failed to sync", epoch)) + }); + + return Poll::Pending; + } + } } - if let Some(sstable_info) = ready!(self.poll_sealed_spill_task(cx)) { + if let Some(sstable_info) = ready!(data.poll_sealed_spill_task(cx)) { return Poll::Ready(UploaderEvent::DataSpilled(sstable_info)); } - if let Some(sstable_info) = ready!(self.poll_unsealed_spill_task(cx)) { + if let Some(sstable_info) = ready!(data.poll_unsealed_spill_task(cx)) { return Poll::Ready(UploaderEvent::DataSpilled(sstable_info)); } @@ -1032,7 +1113,7 @@ impl HummockUploader { } #[cfg(test)] -mod tests { +pub(crate) mod tests { use std::collections::{HashMap, VecDeque}; use std::future::{poll_fn, Future}; use std::ops::Deref; @@ -1047,6 +1128,7 @@ mod tests { use itertools::Itertools; use prometheus::core::GenericGauge; use risingwave_common::catalog::TableId; + use risingwave_common::must_match; use risingwave_common::util::epoch::{test_epoch, EpochExt}; use risingwave_hummock_sdk::key::{FullKey, TableKey}; use risingwave_hummock_sdk::version::HummockVersion; @@ -1061,8 +1143,9 @@ mod tests { use crate::hummock::event_handler::hummock_event_handler::BufferTracker; use crate::hummock::event_handler::uploader::uploader_imm::UploaderImm; use crate::hummock::event_handler::uploader::{ - get_payload_imm_ids, HummockUploader, UploadTaskInfo, UploadTaskOutput, UploadTaskPayload, - UploaderContext, UploaderEvent, UploadingTask, + get_payload_imm_ids, HummockUploader, SyncedData, UploadTaskInfo, UploadTaskOutput, + UploadTaskPayload, UploaderContext, UploaderData, UploaderEvent, UploaderState, + UploadingTask, }; use crate::hummock::event_handler::{LocalInstanceId, TEST_LOCAL_INSTANCE_ID}; use crate::hummock::local_version::pinned_version::PinnedVersion; @@ -1075,13 +1158,23 @@ mod tests { use crate::opts::StorageOpts; const INITIAL_EPOCH: HummockEpoch = test_epoch(5); - const TEST_TABLE_ID: TableId = TableId { table_id: 233 }; + pub(crate) const TEST_TABLE_ID: TableId = TableId { table_id: 233 }; pub trait UploadOutputFuture = Future> + Send + 'static; pub trait UploadFn = Fn(UploadTaskPayload, UploadTaskInfo) -> Fut + Send + Sync + 'static; + impl HummockUploader { + fn data(&self) -> &UploaderData { + must_match!(&self.state, UploaderState::Working(data) => data) + } + + fn start_sync_epoch_for_test(&mut self, epoch: HummockEpoch) { + self.start_sync_epoch(epoch, oneshot::channel().0) + } + } + fn test_hummock_version(epoch: HummockEpoch) -> HummockVersion { HummockVersion { id: epoch, @@ -1122,7 +1215,7 @@ mod tests { ) } - async fn gen_imm(epoch: HummockEpoch) -> ImmutableMemtable { + pub(crate) async fn gen_imm(epoch: HummockEpoch) -> ImmutableMemtable { gen_imm_with_limiter(epoch, None).await } @@ -1299,14 +1392,15 @@ mod tests { let imm = gen_imm(epoch1).await; uploader.add_imm(TEST_LOCAL_INSTANCE_ID, imm.clone()); - assert_eq!(1, uploader.unsealed_data.len()); + assert_eq!(1, uploader.data().unsealed_data.len()); assert_eq!( epoch1 as HummockEpoch, - *uploader.unsealed_data.first_key_value().unwrap().0 + *uploader.data().unsealed_data.first_key_value().unwrap().0 ); assert_eq!( 1, uploader + .data() .unsealed_data .first_key_value() .unwrap() @@ -1316,51 +1410,50 @@ mod tests { ); uploader.seal_epoch(epoch1); assert_eq!(epoch1, uploader.max_sealed_epoch); - assert!(uploader.unsealed_data.is_empty()); - assert_eq!(1, 
uploader.sealed_data.imm_count()); + assert!(uploader.data().unsealed_data.is_empty()); + assert_eq!(1, uploader.data().sealed_data.imm_count()); - uploader.start_sync_epoch(epoch1); + uploader.start_sync_epoch_for_test(epoch1); assert_eq!(epoch1 as HummockEpoch, uploader.max_syncing_epoch); - assert_eq!(0, uploader.sealed_data.imm_count()); - assert!(uploader.sealed_data.spilled_data.is_empty()); - assert_eq!(1, uploader.syncing_data.len()); - let syncing_data = uploader.syncing_data.front().unwrap(); + assert_eq!(0, uploader.data().sealed_data.imm_count()); + assert!(uploader.data().sealed_data.spilled_data.is_empty()); + assert_eq!(1, uploader.data().syncing_data.len()); + let syncing_data = uploader.data().syncing_data.front().unwrap(); assert_eq!(epoch1 as HummockEpoch, syncing_data.sync_epoch()); assert!(syncing_data.uploaded.is_empty()); assert!(syncing_data.uploading_tasks.is_some()); match uploader.next_event().await { - UploaderEvent::SyncFinish(finished_epoch, ssts) => { + UploaderEvent::SyncFinish(finished_epoch, data) => { assert_eq!(epoch1, finished_epoch); - assert_eq!(1, ssts.len()); - let staging_sst = ssts.first().unwrap(); + let SyncedData { + newly_upload_ssts, + uploaded_ssts, + table_watermarks, + } = data; + assert_eq!(1, newly_upload_ssts.len()); + let staging_sst = newly_upload_ssts.first().unwrap(); assert_eq!(&vec![epoch1], staging_sst.epochs()); - assert_eq!(&get_imm_ids([&imm]), staging_sst.imm_ids()); + assert_eq!( + &HashMap::from_iter([(TEST_LOCAL_INSTANCE_ID, vec![imm.batch_id()])]), + staging_sst.imm_ids() + ); assert_eq!( &dummy_success_upload_output().new_value_ssts, staging_sst.sstable_infos() ); + assert!(uploaded_ssts.is_empty()); + assert!(table_watermarks.is_empty()); } _ => unreachable!(), }; assert_eq!(epoch1, uploader.max_synced_epoch()); - let synced_data = uploader.get_synced_data(epoch1).unwrap(); - let ssts = &synced_data.as_ref().unwrap().staging_ssts; - assert_eq!(1, ssts.len()); - let staging_sst = ssts.first().unwrap(); - assert_eq!(&vec![epoch1], staging_sst.epochs()); - assert_eq!(&get_imm_ids([&imm]), staging_sst.imm_ids()); - assert_eq!( - &dummy_success_upload_output().new_value_ssts, - staging_sst.sstable_infos() - ); let new_pinned_version = uploader .context .pinned_version .new_pin_version(test_hummock_version(epoch1)); uploader.update_pinned_version(new_pinned_version); - assert!(uploader.synced_data.is_empty()); assert_eq!(epoch1, uploader.max_committed_epoch()); } @@ -1375,13 +1468,14 @@ mod tests { uploader.seal_epoch(epoch1); assert_eq!(epoch1, uploader.max_sealed_epoch); - uploader.start_sync_epoch(epoch1); + uploader.start_sync_epoch_for_test(epoch1); assert_eq!(epoch1, uploader.max_syncing_epoch); match uploader.next_event().await { - UploaderEvent::SyncFinish(finished_epoch, ssts) => { + UploaderEvent::SyncFinish(finished_epoch, data) => { assert_eq!(epoch1, finished_epoch); - assert!(ssts.is_empty()); + assert!(data.uploaded_ssts.is_empty()); + assert!(data.newly_upload_ssts.is_empty()); } _ => unreachable!(), }; @@ -1391,18 +1485,21 @@ mod tests { .pinned_version .new_pin_version(test_hummock_version(epoch1)); uploader.update_pinned_version(new_pinned_version); - assert!(uploader.synced_data.is_empty()); + assert!(uploader.data().syncing_data.is_empty()); assert_eq!(epoch1, uploader.max_committed_epoch()); } #[tokio::test] async fn test_uploader_poll_empty() { let mut uploader = test_uploader(dummy_success_upload_future); - assert!(poll_fn(|cx| uploader.poll_syncing_task(cx)).await.is_none()); - assert!(poll_fn(|cx| 
uploader.poll_sealed_spill_task(cx)) + let data = must_match!(&mut uploader.state, UploaderState::Working(data) => data); + assert!(poll_fn(|cx| data.poll_syncing_task(cx, &uploader.context)) + .await + .is_none()); + assert!(poll_fn(|cx| data.poll_sealed_spill_task(cx)) .await .is_none()); - assert!(poll_fn(|cx| uploader.poll_unsealed_spill_task(cx)) + assert!(poll_fn(|cx| data.poll_unsealed_spill_task(cx)) .await .is_none()); } @@ -1440,7 +1537,7 @@ mod tests { assert_eq!(epoch3, uploader.max_syncing_epoch); assert_eq!(epoch6, uploader.max_sealed_epoch); - uploader.start_sync_epoch(epoch6); + uploader.start_sync_epoch_for_test(epoch6); assert_eq!(epoch6, uploader.max_syncing_epoch); uploader.update_pinned_version(version4); assert_eq!(epoch4, uploader.max_synced_epoch); @@ -1534,7 +1631,6 @@ mod tests { .is_pending() ) } - #[tokio::test] async fn test_uploader_finish_in_order() { let config = StorageOpts { @@ -1606,7 +1702,7 @@ mod tests { let (await_start1_4, finish_tx1_4) = new_task_notifier(get_payload_imm_ids(&epoch1_sync_payload)); uploader.seal_epoch(epoch1); - uploader.start_sync_epoch(epoch1); + uploader.start_sync_epoch_for_test(epoch1); await_start1_4.await; uploader.seal_epoch(epoch2); @@ -1656,40 +1752,25 @@ mod tests { assert_uploader_pending(&mut uploader).await; finish_tx1_3.send(()).unwrap(); - if let UploaderEvent::SyncFinish(epoch, newly_upload_sst) = uploader.next_event().await { + if let UploaderEvent::SyncFinish(epoch, data) = uploader.next_event().await { assert_eq!(epoch1, epoch); - assert_eq!(2, newly_upload_sst.len()); + assert_eq!(2, data.newly_upload_ssts.len()); + assert_eq!(1, data.uploaded_ssts.len()); assert_eq!( &get_payload_imm_ids(&epoch1_sync_payload), - newly_upload_sst[0].imm_ids() + data.newly_upload_ssts[0].imm_ids() ); assert_eq!( &get_payload_imm_ids(&epoch1_spill_payload3), - newly_upload_sst[1].imm_ids() + data.newly_upload_ssts[1].imm_ids() + ); + assert_eq!( + &get_payload_imm_ids(&epoch1_spill_payload12), + data.uploaded_ssts[0].imm_ids() ); } else { unreachable!("should be sync finish"); } - assert_eq!(epoch1, uploader.max_synced_epoch); - let synced_data1 = &uploader - .get_synced_data(epoch1) - .unwrap() - .as_ref() - .unwrap() - .staging_ssts; - assert_eq!(3, synced_data1.len()); - assert_eq!( - &get_payload_imm_ids(&epoch1_sync_payload), - synced_data1[0].imm_ids() - ); - assert_eq!( - &get_payload_imm_ids(&epoch1_spill_payload3), - synced_data1[1].imm_ids() - ); - assert_eq!( - &get_payload_imm_ids(&epoch1_spill_payload12), - synced_data1[2].imm_ids() - ); // current uploader state: // unsealed: epoch3: imm: imm3_3, uploading: [imm3_2], [imm3_1] @@ -1698,25 +1779,19 @@ mod tests { // syncing: empty // synced: epoch1: sst([imm1_4]), sst([imm1_3]), sst([imm1_2, imm1_1]) - uploader.start_sync_epoch(epoch2); - if let UploaderEvent::SyncFinish(epoch, newly_upload_sst) = uploader.next_event().await { + uploader.start_sync_epoch_for_test(epoch2); + if let UploaderEvent::SyncFinish(epoch, data) = uploader.next_event().await { assert_eq!(epoch2, epoch); - assert!(newly_upload_sst.is_empty()); + assert!(data.newly_upload_ssts.is_empty()); + assert_eq!(data.uploaded_ssts.len(), 1); + assert_eq!( + &get_payload_imm_ids(&epoch2_spill_payload), + data.uploaded_ssts[0].imm_ids() + ); } else { unreachable!("should be sync finish"); } assert_eq!(epoch2, uploader.max_synced_epoch); - let synced_data2 = &uploader - .get_synced_data(epoch2) - .unwrap() - .as_ref() - .unwrap() - .staging_ssts; - assert_eq!(1, synced_data2.len()); - assert_eq!( - 
&get_payload_imm_ids(&epoch2_spill_payload), - synced_data2[0].imm_ids() - ); // current uploader state: // unsealed: epoch3: imm: imm3_3, uploading: [imm3_2], [imm3_1] @@ -1744,7 +1819,7 @@ mod tests { let epoch4_sync_payload = HashMap::from_iter([(instance_id1, vec![imm4, imm3_3])]); let (await_start4_with_3_3, finish_tx4_with_3_3) = new_task_notifier(get_payload_imm_ids(&epoch4_sync_payload)); - uploader.start_sync_epoch(epoch4); + uploader.start_sync_epoch_for_test(epoch4); await_start4_with_3_3.await; // current uploader state: @@ -1759,41 +1834,26 @@ mod tests { assert_uploader_pending(&mut uploader).await; finish_tx4_with_3_3.send(()).unwrap(); - if let UploaderEvent::SyncFinish(epoch, newly_upload_sst) = uploader.next_event().await { + if let UploaderEvent::SyncFinish(epoch, data) = uploader.next_event().await { assert_eq!(epoch4, epoch); - assert_eq!(2, newly_upload_sst.len()); + assert_eq!(2, data.newly_upload_ssts.len()); assert_eq!( &get_payload_imm_ids(&epoch4_sync_payload), - newly_upload_sst[0].imm_ids() + data.newly_upload_ssts[0].imm_ids() ); assert_eq!( &get_payload_imm_ids(&epoch3_spill_payload2), - newly_upload_sst[1].imm_ids() + data.newly_upload_ssts[1].imm_ids() ); + assert_eq!(1, data.uploaded_ssts.len()); + assert_eq!( + &get_payload_imm_ids(&epoch3_spill_payload1), + data.uploaded_ssts[0].imm_ids(), + ) } else { unreachable!("should be sync finish"); } assert_eq!(epoch4, uploader.max_synced_epoch); - let synced_data4 = &uploader - .get_synced_data(epoch4) - .unwrap() - .as_ref() - .unwrap() - .staging_ssts; - assert_eq!(3, synced_data4.len()); - assert_eq!(&vec![epoch4, epoch3], synced_data4[0].epochs()); - assert_eq!( - &get_payload_imm_ids(&epoch4_sync_payload), - synced_data4[0].imm_ids() - ); - assert_eq!( - &get_payload_imm_ids(&epoch3_spill_payload2), - synced_data4[1].imm_ids() - ); - assert_eq!( - &get_payload_imm_ids(&epoch3_spill_payload1), - synced_data4[2].imm_ids() - ); // current uploader state: // unsealed: empty diff --git a/src/storage/src/hummock/store/hummock_storage.rs b/src/storage/src/hummock/store/hummock_storage.rs index 3108c45b7fb93..bfe24e5559376 100644 --- a/src/storage/src/hummock/store/hummock_storage.rs +++ b/src/storage/src/hummock/store/hummock_storage.rs @@ -19,6 +19,7 @@ use std::sync::Arc; use arc_swap::ArcSwap; use bytes::Bytes; +use futures::FutureExt; use itertools::Itertools; use more_asserts::assert_gt; use risingwave_common::catalog::TableId; @@ -556,15 +557,15 @@ impl StateStore for HummockStorage { wait_for_epoch(&self.version_update_notifier_tx, wait_epoch).await } - async fn sync(&self, epoch: u64) -> StorageResult { + fn sync(&self, epoch: u64) -> impl SyncFuture { let (tx, rx) = oneshot::channel(); self.hummock_event_sender - .send(HummockEvent::AwaitSyncEpoch { + .send(HummockEvent::SyncEpoch { new_sync_epoch: epoch, sync_result_sender: tx, }) .expect("should send success"); - Ok(rx.await.expect("should wait success")?) 
+ rx.map(|recv_result| Ok(recv_result.expect("should wait success")?)) } fn seal_epoch(&self, epoch: u64, is_checkpoint: bool) { diff --git a/src/storage/src/memory.rs b/src/storage/src/memory.rs index 2bbed36b4efce..a266fbc129cfd 100644 --- a/src/storage/src/memory.rs +++ b/src/storage/src/memory.rs @@ -745,10 +745,13 @@ impl StateStore for RangeKvStateStore { } #[allow(clippy::unused_async)] - async fn sync(&self, _epoch: u64) -> StorageResult { - self.inner.flush()?; + fn sync(&self, _epoch: u64) -> impl SyncFuture { + let result = self.inner.flush(); // memory backend doesn't need to push to S3, so this is a no-op - Ok(SyncResult::default()) + async move { + result?; + Ok(SyncResult::default()) + } } fn seal_epoch(&self, _epoch: u64, _is_checkpoint: bool) {} diff --git a/src/storage/src/monitor/monitored_store.rs b/src/storage/src/monitor/monitored_store.rs index 5c010106aa958..a571d47b799a2 100644 --- a/src/storage/src/monitor/monitored_store.rs +++ b/src/storage/src/monitor/monitored_store.rs @@ -21,7 +21,7 @@ use futures::{Future, TryFutureExt}; use risingwave_common::buffer::Bitmap; use risingwave_common::catalog::TableId; use risingwave_hummock_sdk::key::{TableKey, TableKeyRange}; -use risingwave_hummock_sdk::{HummockReadEpoch, SyncResult}; +use risingwave_hummock_sdk::HummockReadEpoch; use thiserror_ext::AsReport; use tokio::time::Instant; use tracing::{error, Instrument}; @@ -319,23 +319,20 @@ impl StateStore for MonitoredStateStore { .inspect_err(|e| error!(error = %e.as_report(), "Failed in wait_epoch")) } - async fn sync(&self, epoch: u64) -> StorageResult { - // TODO: this metrics may not be accurate if we start syncing after `seal_epoch`. We may - // move this metrics to inside uploader + fn sync(&self, epoch: u64) -> impl SyncFuture { + let future = self.inner.sync(epoch).instrument_await("store_sync"); let timer = self.storage_metrics.sync_duration.start_timer(); - let sync_result = self - .inner - .sync(epoch) - .instrument_await("store_sync") - .await - .inspect_err(|e| error!(error = %e.as_report(), "Failed in sync"))?; - timer.observe_duration(); - if sync_result.sync_size != 0 { - self.storage_metrics - .sync_size - .observe(sync_result.sync_size as _); + let sync_size = self.storage_metrics.sync_size.clone(); + async move { + let sync_result = future + .await + .inspect_err(|e| error!(error = %e.as_report(), "Failed in sync"))?; + timer.observe_duration(); + if sync_result.sync_size != 0 { + sync_size.observe(sync_result.sync_size as _); + } + Ok(sync_result) } - Ok(sync_result) } fn seal_epoch(&self, epoch: u64, is_checkpoint: bool) { diff --git a/src/storage/src/monitor/traced_store.rs b/src/storage/src/monitor/traced_store.rs index 34d7149ff62b3..f22c854808114 100644 --- a/src/storage/src/monitor/traced_store.rs +++ b/src/storage/src/monitor/traced_store.rs @@ -11,13 +11,14 @@ // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. // See the License for the specific language governing permissions and // limitations under the License. 
+ use std::sync::Arc; use bytes::Bytes; -use futures::Future; +use futures::{Future, FutureExt}; use risingwave_common::buffer::Bitmap; use risingwave_hummock_sdk::key::{TableKey, TableKeyRange}; -use risingwave_hummock_sdk::{HummockReadEpoch, SyncResult}; +use risingwave_hummock_sdk::HummockReadEpoch; use risingwave_hummock_trace::{ init_collector, should_use_trace, ConcurrentId, MayTraceSpan, OperationResult, StorageType, TraceResult, TraceSpan, TracedBytes, TracedSealCurrentEpochOptions, LOCAL_ID, @@ -252,15 +253,17 @@ impl StateStore for TracedStateStore { res } - async fn sync(&self, epoch: u64) -> StorageResult { + fn sync(&self, epoch: u64) -> impl SyncFuture { let span: MayTraceSpan = TraceSpan::new_sync_span(epoch, self.storage_type); - let sync_result = self.inner.sync(epoch).await; + let future = self.inner.sync(epoch); - span.may_send_result(OperationResult::Sync( - sync_result.as_ref().map(|res| res.sync_size).into(), - )); - sync_result + future.map(move |sync_result| { + span.may_send_result(OperationResult::Sync( + sync_result.as_ref().map(|res| res.sync_size).into(), + )); + sync_result + }) } fn seal_epoch(&self, epoch: u64, is_checkpoint: bool) { diff --git a/src/storage/src/panic_store.rs b/src/storage/src/panic_store.rs index d2e032fb03a47..9f3a428442040 100644 --- a/src/storage/src/panic_store.rs +++ b/src/storage/src/panic_store.rs @@ -19,7 +19,7 @@ use std::sync::Arc; use bytes::Bytes; use risingwave_common::buffer::Bitmap; use risingwave_hummock_sdk::key::{TableKey, TableKeyRange}; -use risingwave_hummock_sdk::{HummockReadEpoch, SyncResult}; +use risingwave_hummock_sdk::HummockReadEpoch; use crate::error::StorageResult; use crate::storage_value::StorageValue; @@ -180,8 +180,8 @@ impl StateStore for PanicStateStore { } #[allow(clippy::unused_async)] - async fn sync(&self, _epoch: u64) -> StorageResult { - panic!("should not await sync epoch from the panic state store!"); + fn sync(&self, _epoch: u64) -> impl SyncFuture { + async { panic!("should not await sync epoch from the panic state store!") } } fn seal_epoch(&self, _epoch: u64, _is_checkpoint: bool) { diff --git a/src/storage/src/store.rs b/src/storage/src/store.rs index 18be3a91c0398..92015136a1f3f 100644 --- a/src/storage/src/store.rs +++ b/src/storage/src/store.rs @@ -318,6 +318,8 @@ pub trait StateStoreWrite: StaticSendSync { ) -> StorageResult; } +pub trait SyncFuture = Future> + Send + 'static; + pub trait StateStore: StateStoreRead + StaticSendSync + Clone { type Local: LocalStateStore; @@ -328,7 +330,7 @@ pub trait StateStore: StateStoreRead + StaticSendSync + Clone { epoch: HummockReadEpoch, ) -> impl Future> + Send + '_; - fn sync(&self, epoch: u64) -> impl Future> + Send + '_; + fn sync(&self, epoch: u64) -> impl SyncFuture; /// update max current epoch in storage. 
fn seal_epoch(&self, epoch: u64, is_checkpoint: bool); diff --git a/src/storage/src/store_impl.rs b/src/storage/src/store_impl.rs index c8b91438f0006..f0aace0e12474 100644 --- a/src/storage/src/store_impl.rs +++ b/src/storage/src/store_impl.rs @@ -215,7 +215,7 @@ pub mod verify { use bytes::Bytes; use risingwave_common::buffer::Bitmap; use risingwave_hummock_sdk::key::{TableKey, TableKeyRange}; - use risingwave_hummock_sdk::{HummockReadEpoch, SyncResult}; + use risingwave_hummock_sdk::HummockReadEpoch; use tracing::log::warn; use crate::error::StorageResult; @@ -558,11 +558,15 @@ pub mod verify { self.actual.try_wait_epoch(epoch) } - async fn sync(&self, epoch: u64) -> StorageResult { - if let Some(expected) = &self.expected { - let _ = expected.sync(epoch).await; + fn sync(&self, epoch: u64) -> impl SyncFuture { + let expected_future = self.expected.as_ref().map(|expected| expected.sync(epoch)); + let actual_future = self.actual.sync(epoch); + async move { + if let Some(expected_future) = expected_future { + expected_future.await?; + } + actual_future.await } - self.actual.sync(epoch).await } fn seal_epoch(&self, epoch: u64, is_checkpoint: bool) { @@ -820,6 +824,8 @@ pub mod boxed_state_store { use bytes::Bytes; use dyn_clone::{clone_trait_object, DynClone}; + use futures::future::BoxFuture; + use futures::FutureExt; use risingwave_common::buffer::Bitmap; use risingwave_hummock_sdk::key::{TableKey, TableKeyRange}; use risingwave_hummock_sdk::{HummockReadEpoch, SyncResult}; @@ -1146,7 +1152,7 @@ pub mod boxed_state_store { pub trait DynamicDispatchedStateStoreExt: StaticSendSync { async fn try_wait_epoch(&self, epoch: HummockReadEpoch) -> StorageResult<()>; - async fn sync(&self, epoch: u64) -> StorageResult; + fn sync(&self, epoch: u64) -> BoxFuture<'static, StorageResult>; fn seal_epoch(&self, epoch: u64, is_checkpoint: bool); @@ -1163,8 +1169,8 @@ pub mod boxed_state_store { self.try_wait_epoch(epoch).await } - async fn sync(&self, epoch: u64) -> StorageResult { - self.sync(epoch).await + fn sync(&self, epoch: u64) -> BoxFuture<'static, StorageResult> { + self.sync(epoch).boxed() } fn seal_epoch(&self, epoch: u64, is_checkpoint: bool) { @@ -1257,7 +1263,10 @@ pub mod boxed_state_store { self.deref().try_wait_epoch(epoch) } - fn sync(&self, epoch: u64) -> impl Future> + Send + '_ { + fn sync( + &self, + epoch: u64, + ) -> impl Future> + Send + 'static { self.deref().sync(epoch) } diff --git a/src/stream/src/task/barrier_manager/managed_state.rs b/src/stream/src/task/barrier_manager/managed_state.rs index 62bfe279f34af..00ff5606bde2e 100644 --- a/src/stream/src/task/barrier_manager/managed_state.rs +++ b/src/stream/src/task/barrier_manager/managed_state.rs @@ -23,7 +23,7 @@ use std::sync::Arc; use anyhow::anyhow; use await_tree::InstrumentAwait; use futures::stream::FuturesOrdered; -use futures::{FutureExt, StreamExt}; +use futures::{FutureExt, StreamExt, TryFutureExt}; use prometheus::HistogramTimer; use risingwave_common::must_match; use risingwave_hummock_sdk::SyncResult; @@ -96,43 +96,48 @@ fn sync_epoch( kind: BarrierKind, ) -> impl Future>> + 'static { let barrier_sync_latency = streaming_metrics.barrier_sync_latency.clone(); - let state_store = state_store.clone(); - async move { - let sync_result = match kind { - BarrierKind::Unspecified => unreachable!(), - BarrierKind::Initial => { - if let Some(hummock) = state_store.as_hummock() { - let mce = hummock.get_pinned_version().max_committed_epoch(); - assert_eq!( - mce, prev_epoch, - "first epoch should match with the current 
version", - ); - } - tracing::info!(?prev_epoch, "ignored syncing data for the first barrier"); - None - } - BarrierKind::Barrier => None, - BarrierKind::Checkpoint => { - let timer = barrier_sync_latency.start_timer(); - let sync_result = dispatch_state_store!(state_store, store, { - store - .sync(prev_epoch) - .instrument_await(format!("sync_epoch (epoch {})", prev_epoch)) - .await - .inspect_err(|e| { - tracing::error!( - prev_epoch, - error = %e.as_report(), - "Failed to sync state store", - ); - }) - })?; - timer.observe_duration(); - Some(sync_result) + let sync_result_future = match kind { + BarrierKind::Unspecified => unreachable!(), + BarrierKind::Initial => { + if let Some(hummock) = state_store.as_hummock() { + let mce = hummock.get_pinned_version().max_committed_epoch(); + assert_eq!( + mce, prev_epoch, + "first epoch should match with the current version", + ); } - }; - Ok(sync_result) + tracing::info!(?prev_epoch, "ignored syncing data for the first barrier"); + None + } + BarrierKind::Barrier => None, + BarrierKind::Checkpoint => { + let timer = barrier_sync_latency.start_timer(); + let sync_result_future = dispatch_state_store!(state_store, store, { + store + .sync(prev_epoch) + .instrument_await(format!("sync_epoch (epoch {})", prev_epoch)) + .inspect_ok(move |_| { + timer.observe_duration(); + }) + .inspect_err(move |e| { + tracing::error!( + prev_epoch, + error = %e.as_report(), + "Failed to sync state store", + ); + }) + .boxed() + }); + Some(sync_result_future) + } + }; + async move { + if let Some(sync_result_future) = sync_result_future { + Ok(Some(sync_result_future.await?)) + } else { + Ok(None) + } } }