diff --git a/src/storage/src/hummock/event_handler/uploader.rs b/src/storage/src/hummock/event_handler/uploader.rs index 1da93dae10f1f..bbfb932162552 100644 --- a/src/storage/src/hummock/event_handler/uploader.rs +++ b/src/storage/src/hummock/event_handler/uploader.rs @@ -428,10 +428,7 @@ impl EpochData { } #[derive(Default)] -/// Data at the sealed stage. We will ensure that data in `imms` are newer than the data in the -/// `spilled_data`, and that data in the `uploading_tasks` in `spilled_data` are newer than data in -/// the `uploaded_data` in `spilled_data`. -struct SealedData { +struct SyncDataBuilder { // newer epochs come first epochs: VecDeque, @@ -440,8 +437,8 @@ struct SealedData { table_watermarks: HashMap, } -impl SealedData { - /// Add the data of a newly sealed epoch. +impl SyncDataBuilder { + /// Add the data of a new epoch. /// /// Note: it may happen that, for example, currently we hold `imms` and `spilled_data` of epoch /// 3, and after we add the spilled data of epoch 4, both `imms` and `spilled_data` hold data @@ -451,9 +448,9 @@ impl SealedData { /// data of `imms` must not overlap with the epoch 4 data of `spilled_data`. The explanation is /// as followed: /// - /// First, unsealed data has 3 stages, from earlier to later, imms, uploading task, and - /// uploaded. When we try to spill unsealed data, we first pick the imms of older epoch until - /// the imms of older epoch are all picked. When we try to poll the uploading tasks of unsealed + /// First, unsync data has 3 stages, from earlier to later, imms, uploading task, and + /// uploaded. When we try to spill unsync data, we first pick the imms of older epoch until + /// the imms of older epoch are all picked. When we try to poll the uploading tasks of unsync /// data, we first poll the task of older epoch, until there is no uploading task in older /// epoch. 
Therefore, we can reach that, if two data are in the same stage, but /// different epochs, data in the older epoch will always enter the next stage earlier than data @@ -467,17 +464,19 @@ impl SealedData { /// Based on the two points above, we can reach that, if two data of a same key appear in /// different epochs, the data of older epoch will not appear at a later stage than the data /// of newer epoch. Therefore, we can safely merge the data of each stage when we seal an epoch. - fn seal_new_epoch(&mut self, epoch: HummockEpoch, mut unseal_epoch_data: EpochData) { - if let Some(prev_max_sealed_epoch) = self.epochs.front() { + fn add_new_epoch(&mut self, epoch: HummockEpoch, mut unseal_epoch_data: EpochData) { + if let Some(prev_max_epoch) = self.epochs.front() { assert!( - epoch > *prev_max_sealed_epoch, - "epoch {} to seal not greater than prev max sealed epoch {}", + epoch > *prev_max_epoch, + "epoch {} to seal not greater than prev max epoch {}", epoch, - prev_max_sealed_epoch + prev_max_epoch ); } self.epochs.push_front(epoch); + // for each local instance, earlier data must be spilled at earlier epoch. Therefore, since we add spill data from old epoch + // to new epoch, the spill data added here is newer than the spill data this builder already holds. unseal_epoch_data .spilled_data .uploading_tasks @@ -692,6 +691,10 @@ impl LocalInstanceUnsyncData { } #[derive(Default)] +/// Unsync data, can be either imm or spilled sst, and some aggregated epoch information. +/// +/// `instance_data` holds the imm of each individual local instance, and data are first added here. +/// The aggregated epoch information (table watermarks, etc.) and the spilled sst will be added to `epoch_data`. 
struct UnsyncData { instance_data: HashMap, // An index of `instance_data` that maintains the set of existing instance id of a table id @@ -766,18 +769,15 @@ impl UnsyncData { } } - fn sync( - &mut self, - epoch: HummockEpoch, - ) -> (HashMap>, SealedData) { + fn sync(&mut self, epoch: HummockEpoch, context: &UploaderContext) -> SyncDataBuilder { let mut sync_epoch_data = self.epoch_data.split_off(&(epoch + 1)); swap(&mut sync_epoch_data, &mut self.epoch_data); // by now, epoch data inclusively before epoch is in sync_epoch_data - let mut sealed_data = SealedData::default(); + let mut sync_data = SyncDataBuilder::default(); for (epoch, epoch_data) in sync_epoch_data { - sealed_data.seal_new_epoch(epoch, epoch_data); + sync_data.add_new_epoch(epoch, epoch_data); } let mut flush_payload = HashMap::new(); @@ -787,7 +787,8 @@ impl UnsyncData { flush_payload.insert(*instance_id, payload); } } - (flush_payload, sealed_data) + sync_data.flush(context, flush_payload); + sync_data } fn ack_flushed(&mut self, sstable_info: &StagingSstableInfo) { @@ -876,14 +877,13 @@ enum UploaderState { /// An uploader for hummock data. /// -/// Data have 4 sequential stages: unsealed, sealed, syncing, synced. +/// Data have 3 sequential stages: unsync (inside each local instance, data can be unsealed, sealed), syncing, synced. /// -/// The 4 stages are divided by 3 marginal epochs: `max_sealed_epoch`, `max_syncing_epoch`, +/// The 3 stages are divided by 2 marginal epochs: `max_syncing_epoch`, /// `max_synced_epoch`. Epochs satisfy the following inequality. /// /// (epochs of `synced_data`) <= `max_synced_epoch` < (epochs of `syncing_data`) <= -/// `max_syncing_epoch` < (epochs of `sealed_data`) <= `max_sealed_epoch` < (epochs of -/// `unsealed_data`) +/// `max_syncing_epoch` < (epochs of `unsync_data`) /// /// Data are mostly stored in `VecDeque`, and the order stored in the `VecDeque` indicates the data /// order. Data at the front represents ***newer*** data. 
@@ -1005,11 +1005,9 @@ impl HummockUploader { self.max_syncing_epoch = epoch; - let (payload, mut sealed_data) = data.unsync_data.sync(epoch); + let sync_data = data.unsync_data.sync(epoch, &self.context); - sealed_data.flush(&self.context, payload); - - let SealedData { + let SyncDataBuilder { spilled_data: SpilledData { uploading_tasks, @@ -1017,7 +1015,7 @@ impl HummockUploader { }, table_watermarks, .. - } = sealed_data; + } = sync_data; let try_join_all_upload_task = if uploading_tasks.is_empty() { None @@ -1067,11 +1065,15 @@ impl HummockUploader { } if self.max_syncing_epoch < max_committed_epoch { self.max_syncing_epoch = max_committed_epoch; - // // there must not be any sealed data below MCE - // if let Some(&epoch) = self.sealed_data.epochs.back() { - // assert_gt!(epoch, max_committed_epoch); - // } - // TODO: may check each instance + if let UploaderState::Working(data) = &self.state { + for instance_data in data.unsync_data.instance_data.values() { + if let Some(oldest_epoch) = instance_data.sealed_data.back() { + assert_gt!(oldest_epoch.epoch, max_committed_epoch); + } else if let Some(current_epoch) = &instance_data.current_epoch_data { + assert_gt!(current_epoch.epoch, max_committed_epoch); + } + } + } } } @@ -1178,17 +1180,14 @@ impl UploaderData { } } - /// Poll the success of the oldest spilled task of unsealed data. Return `Poll::Ready(None)` if + /// Poll the success of the oldest spilled task of unsync spill data. Return `Poll::Ready(None)` if /// there is no spilling task. - fn poll_unsealed_spill_task( - &mut self, - cx: &mut Context<'_>, - ) -> Poll> { + fn poll_spill_task(&mut self, cx: &mut Context<'_>) -> Poll> { // iterator from older epoch to new epoch so that the spill task are finished in epoch order - for unsealed_data in self.unsync_data.epoch_data.values_mut() { - // if None, there is no spilling task. 
Search for the unsealed data of the next epoch in + for epoch_data in self.unsync_data.epoch_data.values_mut() { + // if None, there is no spilling task. Search for the unsync data of the next epoch in // the next iteration. - if let Some(sstable_info) = ready!(unsealed_data.spilled_data.poll_success_spill(cx)) { + if let Some(sstable_info) = ready!(epoch_data.spilled_data.poll_success_spill(cx)) { self.unsync_data.ack_flushed(&sstable_info); return Poll::Ready(Some(sstable_info)); } @@ -1242,7 +1241,7 @@ impl HummockUploader { }; } - if let Some(sstable_info) = ready!(data.poll_unsealed_spill_task(cx)) { + if let Some(sstable_info) = ready!(data.poll_spill_task(cx)) { return Poll::Ready(UploaderEvent::DataSpilled(sstable_info)); } @@ -1651,9 +1650,7 @@ pub(crate) mod tests { assert!(poll_fn(|cx| data.poll_syncing_task(cx, &uploader.context)) .await .is_none()); - assert!(poll_fn(|cx| data.poll_unsealed_spill_task(cx)) - .await - .is_none()); + assert!(poll_fn(|cx| data.poll_spill_task(cx)).await.is_none()); } #[tokio::test]