From b9ceb8d208c2d906d45af2a5926456d41629e67f Mon Sep 17 00:00:00 2001 From: William Wen <44139337+wenym1@users.noreply.github.com> Date: Tue, 9 Jul 2024 17:20:06 +0800 Subject: [PATCH] feat(storage): decouple upload task finish order from epoch (#17460) --- .../{uploader.rs => uploader/mod.rs} | 633 +++++++++--------- .../event_handler/uploader/task_manager.rs | 187 ++++++ .../src/hummock/store/hummock_storage.rs | 14 +- 3 files changed, 494 insertions(+), 340 deletions(-) rename src/storage/src/hummock/event_handler/{uploader.rs => uploader/mod.rs} (83%) create mode 100644 src/storage/src/hummock/event_handler/uploader/task_manager.rs diff --git a/src/storage/src/hummock/event_handler/uploader.rs b/src/storage/src/hummock/event_handler/uploader/mod.rs similarity index 83% rename from src/storage/src/hummock/event_handler/uploader.rs rename to src/storage/src/hummock/event_handler/uploader/mod.rs index 55761f1216183..06f0c3aff77a4 100644 --- a/src/storage/src/hummock/event_handler/uploader.rs +++ b/src/storage/src/hummock/event_handler/uploader/mod.rs @@ -12,13 +12,14 @@ // See the License for the specific language governing permissions and // limitations under the License. +mod task_manager; + use std::cmp::Ordering; use std::collections::btree_map::Entry; -use std::collections::{BTreeMap, HashMap, HashSet, VecDeque}; +use std::collections::{BTreeMap, BTreeSet, HashMap, HashSet, VecDeque}; use std::fmt::{Debug, Display, Formatter}; use std::future::{poll_fn, Future}; use std::mem::{replace, swap, take}; -use std::pin::Pin; use std::sync::Arc; use std::task::{ready, Context, Poll}; @@ -35,6 +36,7 @@ use risingwave_hummock_sdk::table_watermark::{ TableWatermarks, VnodeWatermark, WatermarkDirection, }; use risingwave_hummock_sdk::{CompactionGroupId, HummockEpoch, LocalSstableInfo}; +use task_manager::{TaskManager, UploadingTaskStatus}; use thiserror_ext::AsReport; use tokio::sync::oneshot; use tokio::task::JoinHandle; @@ -161,9 +163,13 @@ mod uploader_imm { } } +#[derive(PartialEq, Eq, Hash, PartialOrd, Ord, Copy, Clone, Debug)] +struct UploadingTaskId(usize); + /// A wrapper for a uploading task that compacts and uploads the imm payload. Task context are /// stored so that when the task fails, it can be re-tried. struct UploadingTask { + task_id: UploadingTaskId, // newer data at the front input: UploadTaskInput, join_handle: JoinHandle>, @@ -219,7 +225,7 @@ impl UploadingTask { .collect() } - fn new(input: UploadTaskInput, context: &UploaderContext) -> Self { + fn new(task_id: UploadingTaskId, input: UploadTaskInput, context: &UploaderContext) -> Self { assert!(!input.is_empty()); let mut epochs = input .iter() @@ -254,6 +260,7 @@ impl UploadingTask { let join_handle = (context.spawn_upload_task)(payload, task_info.clone()); context.stats.uploader_uploading_task_count.inc(); Self { + task_id, input, join_handle, task_info, @@ -324,81 +331,6 @@ impl UploadingTask { } } -impl Future for UploadingTask { - type Output = HummockResult>; - - fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { - self.poll_result(cx) - } -} - -#[derive(Default, Debug)] -/// Manage the spilled data. Task and uploaded data at the front is newer data. Task data are -/// always newer than uploaded data. Task holding oldest data is always collected first. -struct SpilledData { - // ordered spilling tasks. Task at the back is spilling older data. - uploading_tasks: VecDeque, - // ordered spilled data. Data at the back is older. - uploaded_data: VecDeque>, -} - -impl SpilledData { - fn add_task(&mut self, task: UploadingTask) { - self.uploading_tasks.push_front(task); - } - - /// Poll the successful spill of the oldest uploading task. Return `Poll::Ready(None)` is there - /// is no uploading task - fn poll_success_spill( - &mut self, - cx: &mut Context<'_>, - ) -> Poll>> { - // only poll the oldest uploading task if there is any - if let Some(task) = self.uploading_tasks.back_mut() { - let staging_sstable_info = ready!(task.poll_ok_with_retry(cx)); - self.uploaded_data.push_front(staging_sstable_info.clone()); - self.uploading_tasks.pop_back(); - Poll::Ready(Some(staging_sstable_info)) - } else { - Poll::Ready(None) - } - } - - fn abort(self) { - for task in self.uploading_tasks { - task.join_handle.abort(); - } - } -} - -#[derive(Default, Debug)] -struct EpochData { - spilled_data: SpilledData, -} - -impl EpochData { - fn flush( - &mut self, - context: &UploaderContext, - imms: HashMap>, - ) -> usize { - if !imms.is_empty() { - let task = UploadingTask::new(imms, context); - context.stats.spill_task_counts_from_unsealed.inc(); - context - .stats - .spill_task_size_from_unsealed - .inc_by(task.task_info.task_size as u64); - info!("Spill unsealed data. Task: {}", task.get_task_info()); - let size = task.task_info.task_size; - self.spilled_data.add_task(task); - size - } else { - 0 - } - } -} - impl TableUnsyncData { fn add_table_watermarks( &mut self, @@ -453,72 +385,9 @@ impl TableUnsyncData { } } -#[derive(Default)] -struct SyncDataBuilder { - // newer epochs come first - epochs: VecDeque, - - spilled_data: SpilledData, - - table_watermarks: HashMap, - - table_ids_to_ack: HashSet, -} - -impl SyncDataBuilder { - /// Add the data of a new epoch. - /// - /// Note: it may happen that, for example, currently we hold `imms` and `spilled_data` of epoch - /// 3, and after we add the spilled data of epoch 4, both `imms` and `spilled_data` hold data - /// of both epoch 3 and 4, which seems breaking the rules that data in `imms` are - /// always newer than data in `spilled_data`, because epoch 3 data of `imms` - /// seems older than epoch 4 data of `spilled_data`. However, if this happens, the epoch 3 - /// data of `imms` must not overlap with the epoch 4 data of `spilled_data`. The explanation is - /// as followed: - /// - /// First, unsync data has 3 stages, from earlier to later, imms, uploading task, and - /// uploaded. When we try to spill unsync data, we first pick the imms of older epoch until - /// the imms of older epoch are all picked. When we try to poll the uploading tasks of unsync - /// data, we first poll the task of older epoch, until there is no uploading task in older - /// epoch. Therefore, we can reach that, if two data are in the same stage, but - /// different epochs, data in the older epoch will always enter the next stage earlier than data - /// in the newer epoch. - /// - /// Second, we have an assumption that, if a key has been written in a newer epoch, e.g. epoch4, - /// it will no longer be written in an older epoch, e.g. epoch3, and then, if two data of the - /// same key are at the imm stage, the data of older epoch must appear earlier than the data - /// of newer epoch. - /// - /// Based on the two points above, we can reach that, if two data of a same key appear in - /// different epochs, the data of older epoch will not appear at a later stage than the data - /// of newer epoch. Therefore, we can safely merge the data of each stage when we seal an epoch. - fn add_new_epoch(&mut self, epoch: HummockEpoch, mut unseal_epoch_data: EpochData) { - if let Some(prev_max_epoch) = self.epochs.front() { - assert!( - epoch > *prev_max_epoch, - "epoch {} to seal not greater than prev max epoch {}", - epoch, - prev_max_epoch - ); - } - - self.epochs.push_front(epoch); - // for each local instance, earlier data must be spilled at earlier epoch. Therefore, since we add spill data from old epoch - // to new epoch, - unseal_epoch_data - .spilled_data - .uploading_tasks - .append(&mut self.spilled_data.uploading_tasks); - unseal_epoch_data - .spilled_data - .uploaded_data - .append(&mut self.spilled_data.uploaded_data); - self.spilled_data.uploading_tasks = unseal_epoch_data.spilled_data.uploading_tasks; - self.spilled_data.uploaded_data = unseal_epoch_data.spilled_data.uploaded_data; - } - +impl UploaderData { fn add_table_watermarks( - &mut self, + all_table_watermarks: &mut HashMap, table_id: TableId, direction: WatermarkDirection, watermarks: impl Iterator)>, @@ -540,19 +409,11 @@ impl SyncDataBuilder { } } if let Some(table_watermarks) = table_watermarks { - assert!(self - .table_watermarks + assert!(all_table_watermarks .insert(table_id, table_watermarks) .is_none()); } } - - fn flush(&mut self, context: &UploaderContext, payload: UploadTaskInput) { - if !payload.is_empty() { - let task = UploadingTask::new(payload, context); - self.spilled_data.add_task(task); - } - } } struct LocalInstanceEpochData { @@ -753,6 +614,7 @@ struct TableUnsyncData { WatermarkDirection, BTreeMap, BitmapBuilder)>, )>, + spill_tasks: BTreeMap>, // newer epoch at the front syncing_epochs: VecDeque, max_synced_epoch: Option, @@ -764,6 +626,7 @@ impl TableUnsyncData { table_id, instance_data: Default::default(), table_watermarks: None, + spill_tasks: Default::default(), syncing_epochs: Default::default(), max_synced_epoch: committed_epoch, } @@ -778,6 +641,7 @@ impl TableUnsyncData { WatermarkDirection, impl Iterator)>, )>, + impl Iterator, ) { if let Some(prev_epoch) = self.max_sync_epoch() { assert_gt!(epoch, prev_epoch) @@ -798,6 +662,9 @@ impl TableUnsyncData { .map(|(epoch, (watermarks, _))| (epoch, watermarks)), ) }), + take_before_epoch(&mut self.spill_tasks, epoch) + .into_values() + .flat_map(|tasks| tasks.into_iter()), ) } @@ -858,7 +725,9 @@ struct UnsyncData { table_data: HashMap, // An index as a mapping from instance id to its table id instance_table_id: HashMap, - epoch_data: BTreeMap, + // TODO: this is only used in spill to get existing epochs and can be removed + // when we support spill not based on epoch + epochs: BTreeMap, } impl UnsyncData { @@ -899,7 +768,7 @@ impl UnsyncData { .instance_table_id .insert(instance_id, table_id) .is_none()); - self.epoch_data.entry(init_epoch).or_default(); + self.epochs.insert(init_epoch, ()); } fn instance_data( @@ -938,59 +807,123 @@ impl UnsyncData { .get_mut(&instance_id) .expect("should exist"); let epoch = instance_data.local_seal_epoch(next_epoch); - self.epoch_data.entry(next_epoch).or_default(); + self.epochs.insert(next_epoch, ()); if let Some((direction, table_watermarks)) = opts.table_watermarks { table_data.add_table_watermarks(epoch, table_watermarks, direction); } } - fn may_destroy_instance(&mut self, instance_id: LocalInstanceId) { + fn may_destroy_instance(&mut self, instance_id: LocalInstanceId) -> Option { if let Some(table_id) = self.instance_table_id.remove(&instance_id) { debug!(instance_id, "destroy instance"); let table_data = self.table_data.get_mut(&table_id).expect("should exist"); assert!(table_data.instance_data.remove(&instance_id).is_some()); if table_data.is_empty() { - self.table_data.remove(&table_id); + Some(self.table_data.remove(&table_id).expect("should exist")) + } else { + None } + } else { + None } } +} +impl UploaderData { fn sync( &mut self, epoch: HummockEpoch, context: &UploaderContext, - table_ids: &HashSet, - ) -> SyncDataBuilder { - let sync_epoch_data = take_before_epoch(&mut self.epoch_data, epoch); + table_ids: HashSet, + sync_result_sender: oneshot::Sender>, + ) { + // clean old epochs + let _epochs = take_before_epoch(&mut self.unsync_data.epochs, epoch); - let mut sync_data = SyncDataBuilder::default(); - for (epoch, epoch_data) in sync_epoch_data { - sync_data.add_new_epoch(epoch, epoch_data); - } + let mut all_table_watermarks = HashMap::new(); + let mut uploading_tasks = HashSet::new(); + let mut spilled_tasks = BTreeSet::new(); let mut flush_payload = HashMap::new(); let mut table_ids_to_ack = HashSet::new(); - for (table_id, table_data) in &mut self.table_data { + for (table_id, table_data) in &mut self.unsync_data.table_data { if !table_ids.contains(table_id) { table_data.assert_after_epoch(epoch); continue; } table_ids_to_ack.insert(*table_id); - let (unflushed_payload, table_watermarks) = table_data.sync(epoch); + let (unflushed_payload, table_watermarks, task_ids) = table_data.sync(epoch); for (instance_id, payload) in unflushed_payload { if !payload.is_empty() { flush_payload.insert(instance_id, payload); } } if let Some((direction, watermarks)) = table_watermarks { - sync_data.add_table_watermarks(*table_id, direction, watermarks); + Self::add_table_watermarks( + &mut all_table_watermarks, + *table_id, + direction, + watermarks, + ); } + for task_id in task_ids { + if self.spilled_data.contains_key(&task_id) { + spilled_tasks.insert(task_id); + } else { + uploading_tasks.insert(task_id); + } + } + } + + let sync_id = { + let sync_id = self.next_sync_id; + self.next_sync_id += 1; + SyncId(sync_id) + }; + + if let Some(extra_flush_task_id) = self.task_manager.sync( + context, + sync_id, + flush_payload, + uploading_tasks.iter().cloned(), + &table_ids, + ) { + uploading_tasks.insert(extra_flush_task_id); } - sync_data.flush(context, flush_payload); - sync_data.table_ids_to_ack = table_ids_to_ack; - sync_data + + // iter from large task_id to small one so that newer data at the front + let uploaded = spilled_tasks + .iter() + .rev() + .map(|task_id| { + let (sst, spill_table_ids) = + self.spilled_data.remove(task_id).expect("should exist"); + assert!( + spill_table_ids.is_subset(&table_ids), + "spill_table_ids: {spill_table_ids:?}, table_ids: {table_ids:?}" + ); + sst + }) + .collect(); + + self.syncing_data.insert( + sync_id, + SyncingData { + sync_epoch: epoch, + table_ids, + table_ids_to_ack, + remaining_uploading_tasks: uploading_tasks, + uploaded, + table_watermarks: all_table_watermarks, + sync_result_sender, + }, + ); + + self.check_upload_task_consistency(); } +} +impl UnsyncData { fn ack_flushed(&mut self, sstable_info: &StagingSstableInfo) { for (instance_id, imm_ids) in sstable_info.imm_ids() { if let Some(instance_data) = self.instance_data(*instance_id) { @@ -1006,8 +939,7 @@ struct SyncingData { table_ids: HashSet, /// Subset of `table_ids` that has existing instance table_ids_to_ack: HashSet, - // task of newer data at the front - uploading_tasks: VecDeque, + remaining_uploading_tasks: HashSet, // newer data at the front uploaded: VecDeque>, table_watermarks: HashMap, @@ -1046,25 +978,24 @@ impl UploaderContext { } } +#[derive(PartialEq, Eq, Hash, PartialOrd, Ord, Copy, Clone, Debug)] +struct SyncId(usize); + #[derive(Default)] struct UploaderData { unsync_data: UnsyncData, - /// Data that has started syncing but not synced yet. `epoch` satisfies - /// `max_synced_epoch < epoch <= max_syncing_epoch`. - /// Newer epoch at the front - syncing_data: VecDeque, + syncing_data: BTreeMap, + + task_manager: TaskManager, + spilled_data: HashMap, HashSet)>, + next_sync_id: usize, } impl UploaderData { fn abort(self, err: impl Fn() -> HummockError) { - for (_, epoch_data) in self.unsync_data.epoch_data { - epoch_data.spilled_data.abort(); - } - for syncing_data in self.syncing_data { - for task in syncing_data.uploading_tasks { - task.join_handle.abort(); - } + self.task_manager.abort(); + for syncing_data in self.syncing_data.into_values() { send_sync_result(syncing_data.sync_result_sender, Err(err())); } } @@ -1186,28 +1117,9 @@ impl HummockUploader { }; debug!(epoch, ?table_ids, "start sync epoch"); - let sync_data = data.unsync_data.sync(epoch, &self.context, &table_ids); + data.sync(epoch, &self.context, table_ids, sync_result_sender); - let SyncDataBuilder { - spilled_data: - SpilledData { - uploading_tasks, - uploaded_data, - }, - table_watermarks, - table_ids_to_ack, - epochs: _, - } = sync_data; - - data.syncing_data.push_front(SyncingData { - sync_epoch: epoch, - table_ids, - table_ids_to_ack, - uploading_tasks, - uploaded: uploaded_data, - table_watermarks, - sync_result_sender, - }); + data.may_notify_sync_task(&self.context); self.context .stats @@ -1238,7 +1150,7 @@ impl HummockUploader { if self.context.buffer_tracker.need_flush() { let mut curr_batch_flush_size = 0; // iterate from older epoch to newer epoch - for (epoch, epoch_data) in &mut data.unsync_data.epoch_data { + for epoch in &mut data.unsync_data.epochs.keys() { if !self .context .buffer_tracker @@ -1246,20 +1158,35 @@ impl HummockUploader { { break; } + let mut spilled_table_ids = HashSet::new(); let mut payload = HashMap::new(); - for (instance_id, instance_data) in data - .unsync_data - .table_data - .values_mut() - .flat_map(|data| data.instance_data.iter_mut()) - { - let instance_payload = instance_data.spill(*epoch); - if !instance_payload.is_empty() { - payload.insert(*instance_id, instance_payload); + for (table_id, table_data) in &mut data.unsync_data.table_data { + for (instance_id, instance_data) in &mut table_data.instance_data { + let instance_payload = instance_data.spill(*epoch); + if !instance_payload.is_empty() { + payload.insert(*instance_id, instance_payload); + spilled_table_ids.insert(*table_id); + } } } - curr_batch_flush_size += epoch_data.flush(&self.context, payload); + if !payload.is_empty() { + let (task_id, task_size, spilled_table_ids) = + data.task_manager + .spill(&self.context, spilled_table_ids, payload); + for table_id in spilled_table_ids { + data.unsync_data + .table_data + .get_mut(table_id) + .expect("should exist") + .spill_tasks + .entry(*epoch) + .or_default() + .push_front(task_id); + } + curr_batch_flush_size += task_size; + } } + data.check_upload_task_consistency(); curr_batch_flush_size > 0 } else { false @@ -1281,108 +1208,123 @@ impl HummockUploader { let UploaderState::Working(data) = &mut self.state else { return; }; - data.unsync_data.may_destroy_instance(instance_id); + if let Some(removed_table_data) = data.unsync_data.may_destroy_instance(instance_id) { + data.task_manager.remove_table_spill_tasks( + removed_table_data.table_id, + removed_table_data + .spill_tasks + .into_values() + .flat_map(|task_ids| task_ids.into_iter()) + .filter(|task_id| { + if let Some((_, table_ids)) = data.spilled_data.get_mut(task_id) { + assert!(table_ids.remove(&removed_table_data.table_id)); + if table_ids.is_empty() { + data.spilled_data.remove(task_id); + } + false + } else { + true + } + }), + ) + } + data.check_upload_task_consistency(); } } impl UploaderData { - /// Poll the syncing task of the syncing data of the oldest epoch. Return `Poll::Ready(None)` if - /// there is no syncing data. - fn poll_syncing_task( - &mut self, - cx: &mut Context<'_>, - context: &UploaderContext, - ) -> Poll, ErrState>>> { - while let Some(syncing_data) = self.syncing_data.back_mut() { - let sstable_info = if let Some(task) = syncing_data.uploading_tasks.back_mut() { - let result = ready!(task.poll_result(cx)); - let _task = syncing_data.uploading_tasks.pop_back().expect("non-empty"); - let sstable_info = match result { - Ok(sstable_info) => sstable_info, - Err(e) => { - let SyncingData { - sync_epoch, - uploading_tasks, - sync_result_sender, - .. - } = self.syncing_data.pop_back().expect("non-empty"); - for task in uploading_tasks { - task.join_handle.abort(); - } - send_sync_result( - sync_result_sender, - Err(HummockError::other(format!( - "failed sync task: {:?}", - e.as_report() - ))), - ); - - return Poll::Ready(Some(Err(ErrState { - failed_epoch: sync_epoch, - reason: format!("{:?}", e.as_report()), - }))); + fn may_notify_sync_task(&mut self, context: &UploaderContext) { + while let Some((_, syncing_data)) = self.syncing_data.first_key_value() + && syncing_data.remaining_uploading_tasks.is_empty() + { + let (_, syncing_data) = self.syncing_data.pop_first().expect("non-empty"); + let SyncingData { + sync_epoch, + table_ids: _table_ids, + table_ids_to_ack, + remaining_uploading_tasks: _, + uploaded, + table_watermarks, + sync_result_sender, + } = syncing_data; + context + .stats + .uploader_syncing_epoch_count + .set(self.syncing_data.len() as _); + + for table_id in table_ids_to_ack { + if let Some(table_data) = self.unsync_data.table_data.get_mut(&table_id) { + table_data.ack_synced(sync_epoch); + if table_data.is_empty() { + self.unsync_data.table_data.remove(&table_id); } - }; - syncing_data.uploaded.push_front(sstable_info.clone()); - self.unsync_data.ack_flushed(&sstable_info); - Some(sstable_info) - } else { - None - }; + } + } - if syncing_data.uploading_tasks.is_empty() { - let syncing_data = self.syncing_data.pop_back().expect("non-empty"); - let SyncingData { - sync_epoch, - table_ids: _table_ids, - table_ids_to_ack, - uploading_tasks, - uploaded, + send_sync_result( + sync_result_sender, + Ok(SyncedData { + uploaded_ssts: uploaded, table_watermarks, - sync_result_sender, - } = syncing_data; - assert!(uploading_tasks.is_empty()); - context - .stats - .uploader_syncing_epoch_count - .set(self.syncing_data.len() as _); - for table_id in table_ids_to_ack { - if let Some(table_data) = self.unsync_data.table_data.get_mut(&table_id) { - table_data.ack_synced(sync_epoch); - if table_data.is_empty() { - self.unsync_data.table_data.remove(&table_id); - } - } + }), + ) + } + } + + fn check_upload_task_consistency(&self) { + #[cfg(debug_assertions)] + { + let mut spill_task_table_id_from_data: HashMap<_, HashSet<_>> = HashMap::new(); + for table_data in self.unsync_data.table_data.values() { + for task_id in table_data + .spill_tasks + .iter() + .flat_map(|(_, tasks)| tasks.iter()) + { + assert!(spill_task_table_id_from_data + .entry(*task_id) + .or_default() + .insert(table_data.table_id)); } - send_sync_result( - sync_result_sender, - Ok(SyncedData { - uploaded_ssts: uploaded, - table_watermarks, - }), - ) } + let syncing_task_id_from_data: HashMap<_, HashSet<_>> = self + .syncing_data + .iter() + .filter_map(|(sync_id, data)| { + if data.remaining_uploading_tasks.is_empty() { + None + } else { + Some((*sync_id, data.remaining_uploading_tasks.clone())) + } + }) + .collect(); - if let Some(sstable_info) = sstable_info { - return Poll::Ready(Some(Ok(sstable_info))); + let mut spill_task_table_id_from_manager: HashMap<_, HashSet<_>> = HashMap::new(); + for (task_id, (_, table_ids)) in &self.spilled_data { + spill_task_table_id_from_manager.insert(*task_id, table_ids.clone()); } - } - Poll::Ready(None) - } - - /// Poll the success of the oldest spilled task of unsync spill data. Return `Poll::Ready(None)` if - /// there is no spilling task. - fn poll_spill_task(&mut self, cx: &mut Context<'_>) -> Poll>> { - // iterator from older epoch to new epoch so that the spill task are finished in epoch order - for epoch_data in self.unsync_data.epoch_data.values_mut() { - // if None, there is no spilling task. Search for the unsync data of the next epoch in - // the next iteration. - if let Some(sstable_info) = ready!(epoch_data.spilled_data.poll_success_spill(cx)) { - self.unsync_data.ack_flushed(&sstable_info); - return Poll::Ready(Some(sstable_info)); + let mut syncing_task_from_manager: HashMap<_, HashSet<_>> = HashMap::new(); + for (task_id, status) in self.task_manager.tasks() { + match status { + UploadingTaskStatus::Spilling(table_ids) => { + assert!(spill_task_table_id_from_manager + .insert(task_id, table_ids.clone()) + .is_none()); + } + UploadingTaskStatus::Sync(sync_id) => { + assert!(syncing_task_from_manager + .entry(*sync_id) + .or_default() + .insert(task_id)); + } + } } + assert_eq!( + spill_task_table_id_from_data, + spill_task_table_id_from_manager + ); + assert_eq!(syncing_task_id_from_data, syncing_task_from_manager); } - Poll::Ready(None) } } @@ -1395,34 +1337,57 @@ impl HummockUploader { return Poll::Pending; }; - if let Some(result) = ready!(data.poll_syncing_task(cx, &self.context)) { + if let Some((task_id, status, result)) = ready!(data.task_manager.poll_task_result(cx)) + { match result { - Ok(data) => { - return Poll::Ready(data); + Ok(sst) => { + data.unsync_data.ack_flushed(&sst); + match status { + UploadingTaskStatus::Sync(sync_id) => { + let syncing_data = + data.syncing_data.get_mut(&sync_id).expect("should exist"); + syncing_data.uploaded.push_front(sst.clone()); + assert!(syncing_data.remaining_uploading_tasks.remove(&task_id)); + data.may_notify_sync_task(&self.context); + } + UploadingTaskStatus::Spilling(table_ids) => { + data.spilled_data.insert(task_id, (sst.clone(), table_ids)); + } + } + data.check_upload_task_consistency(); + Poll::Ready(sst) } - Err(e) => { - let failed_epoch = e.failed_epoch; + Err((sync_id, e)) => { + let syncing_data = + data.syncing_data.remove(&sync_id).expect("should exist"); + let failed_epoch = syncing_data.sync_epoch; let data = must_match!(replace( &mut self.state, - UploaderState::Err(e), + UploaderState::Err(ErrState { + failed_epoch, + reason: e.as_report().to_string(), + }), ), UploaderState::Working(data) => data); + let _ = syncing_data + .sync_result_sender + .send(Err(HummockError::other(format!( + "failed to sync: {:?}", + e.as_report() + )))); + data.abort(|| { HummockError::other(format!( "previous epoch {} failed to sync", failed_epoch )) }); - return Poll::Pending; + Poll::Pending } } + } else { + Poll::Pending } - - if let Some(sstable_info) = ready!(data.poll_spill_task(cx)) { - return Poll::Ready(sstable_info); - } - - Poll::Pending }) } } @@ -1432,9 +1397,10 @@ pub(crate) mod tests { use std::collections::{HashMap, HashSet, VecDeque}; use std::future::{poll_fn, Future}; use std::ops::Deref; + use std::pin::pin; use std::sync::atomic::AtomicUsize; - use std::sync::atomic::Ordering::SeqCst; - use std::sync::Arc; + use std::sync::atomic::Ordering::{Relaxed, SeqCst}; + use std::sync::{Arc, LazyLock}; use std::task::Poll; use bytes::Bytes; @@ -1461,7 +1427,7 @@ pub(crate) mod tests { use crate::hummock::event_handler::uploader::{ get_payload_imm_ids, HummockUploader, SyncedData, TableUnsyncData, UploadTaskInfo, UploadTaskOutput, UploadTaskPayload, UploaderContext, UploaderData, UploaderState, - UploadingTask, + UploadingTask, UploadingTaskId, }; use crate::hummock::event_handler::{LocalInstanceId, TEST_LOCAL_INSTANCE_ID}; use crate::hummock::local_version::pinned_version::PinnedVersion; @@ -1640,7 +1606,12 @@ pub(crate) mod tests { TEST_LOCAL_INSTANCE_ID, imms.into_iter().map(UploaderImm::for_test).collect_vec(), )]); - Self::new(input, context) + static NEXT_TASK_ID: LazyLock = LazyLock::new(|| AtomicUsize::new(0)); + Self::new( + UploadingTaskId(NEXT_TASK_ID.fetch_add(1, Relaxed)), + input, + context, + ) } } @@ -1670,11 +1641,11 @@ pub(crate) mod tests { let imm = gen_imm(INITIAL_EPOCH).await; let imm_size = imm.size(); let imm_ids = get_imm_ids(vec![&imm]); - let task = UploadingTask::from_vec(vec![imm], &uploader_context); + let mut task = UploadingTask::from_vec(vec![imm], &uploader_context); assert_eq!(imm_size, task.task_info.task_size); assert_eq!(imm_ids, task.task_info.imm_ids); assert_eq!(vec![INITIAL_EPOCH], task.task_info.epochs); - let output = task.await.unwrap(); + let output = poll_fn(|cx| task.poll_result(cx)).await.unwrap(); assert_eq!( output.sstable_infos(), &dummy_success_upload_output().new_value_ssts @@ -1685,8 +1656,8 @@ pub(crate) mod tests { let uploader_context = test_uploader_context(dummy_fail_upload_future); let imm = gen_imm(INITIAL_EPOCH).await; - let task = UploadingTask::from_vec(vec![imm], &uploader_context); - let _ = task.await.unwrap_err(); + let mut task = UploadingTask::from_vec(vec![imm], &uploader_context); + let _ = poll_fn(|cx| task.poll_result(cx)).await.unwrap_err(); } #[tokio::test] @@ -1747,10 +1718,10 @@ pub(crate) mod tests { uploader.start_sync_epoch(epoch1, sync_tx, HashSet::from_iter([TEST_TABLE_ID])); assert_eq!(epoch1 as HummockEpoch, uploader.test_max_syncing_epoch()); assert_eq!(1, uploader.data().syncing_data.len()); - let syncing_data = uploader.data().syncing_data.front().unwrap(); + let (_, syncing_data) = uploader.data().syncing_data.first_key_value().unwrap(); assert_eq!(epoch1 as HummockEpoch, syncing_data.sync_epoch); assert!(syncing_data.uploaded.is_empty()); - assert!(!syncing_data.uploading_tasks.is_empty()); + assert!(!syncing_data.remaining_uploading_tasks.is_empty()); let staging_sst = uploader.next_uploaded_sst().await; assert_eq!(&vec![epoch1], staging_sst.epochs()); @@ -1859,11 +1830,9 @@ pub(crate) mod tests { #[tokio::test] async fn test_uploader_poll_empty() { let mut uploader = test_uploader(dummy_success_upload_future); - let data = must_match!(&mut uploader.state, UploaderState::Working(data) => data); - assert!(poll_fn(|cx| data.poll_syncing_task(cx, &uploader.context)) - .await - .is_none()); - assert!(poll_fn(|cx| data.poll_spill_task(cx)).await.is_none()); + let fut = uploader.next_uploaded_sst(); + let mut fut = pin!(fut); + assert!(poll_fn(|cx| Poll::Ready(fut.as_mut().poll(cx).is_pending())).await); } #[tokio::test] diff --git a/src/storage/src/hummock/event_handler/uploader/task_manager.rs b/src/storage/src/hummock/event_handler/uploader/task_manager.rs new file mode 100644 index 0000000000000..2347be1ed57eb --- /dev/null +++ b/src/storage/src/hummock/event_handler/uploader/task_manager.rs @@ -0,0 +1,187 @@ +// Copyright 2024 RisingWave Labs +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use super::*; + +#[derive(Debug)] +pub(super) enum UploadingTaskStatus { + Spilling(HashSet), + Sync(SyncId), +} + +#[derive(Debug)] +struct TaskEntry { + task: UploadingTask, + status: UploadingTaskStatus, +} + +#[derive(Default, Debug)] +pub(super) struct TaskManager { + tasks: HashMap, + // newer task at the front + task_order: VecDeque, + next_task_id: usize, +} + +impl TaskManager { + fn add_task( + &mut self, + task: UploadingTask, + status: UploadingTaskStatus, + ) -> &UploadingTaskStatus { + let task_id = task.task_id; + self.task_order.push_front(task.task_id); + assert!(self + .tasks + .insert(task.task_id, TaskEntry { task, status }) + .is_none()); + &self.tasks.get(&task_id).expect("should exist").status + } + + fn poll_task( + &mut self, + cx: &mut Context<'_>, + task_id: UploadingTaskId, + ) -> Poll, (SyncId, HummockError)>> { + let entry = self.tasks.get_mut(&task_id).expect("should exist"); + let result = match &entry.status { + UploadingTaskStatus::Spilling(_) => { + let sst = ready!(entry.task.poll_ok_with_retry(cx)); + Ok(sst) + } + UploadingTaskStatus::Sync(sync_id) => { + let result = ready!(entry.task.poll_result(cx)); + result.map_err(|e| (*sync_id, e)) + } + }; + Poll::Ready(result) + } + + fn get_next_task_id(&mut self) -> UploadingTaskId { + let task_id = self.next_task_id; + self.next_task_id += 1; + UploadingTaskId(task_id) + } + + #[expect(clippy::type_complexity)] + pub(super) fn poll_task_result( + &mut self, + cx: &mut Context<'_>, + ) -> Poll< + Option<( + UploadingTaskId, + UploadingTaskStatus, + Result, (SyncId, HummockError)>, + )>, + > { + if let Some(task_id) = self.task_order.back() { + let task_id = *task_id; + let result = ready!(self.poll_task(cx, task_id)); + self.task_order.pop_back(); + let entry = self.tasks.remove(&task_id).expect("should exist"); + + Poll::Ready(Some((task_id, entry.status, result))) + } else { + Poll::Ready(None) + } + } + + pub(super) fn abort(self) { + for task in self.tasks.into_values() { + task.task.join_handle.abort(); + } + } + + pub(super) fn spill( + &mut self, + context: &UploaderContext, + table_ids: HashSet, + imms: HashMap>, + ) -> (UploadingTaskId, usize, &HashSet) { + assert!(!imms.is_empty()); + let task = UploadingTask::new(self.get_next_task_id(), imms, context); + context.stats.spill_task_counts_from_unsealed.inc(); + context + .stats + .spill_task_size_from_unsealed + .inc_by(task.task_info.task_size as u64); + info!("Spill data. Task: {}", task.get_task_info()); + let size = task.task_info.task_size; + let id = task.task_id; + let status = self.add_task(task, UploadingTaskStatus::Spilling(table_ids)); + ( + id, + size, + must_match!(status, UploadingTaskStatus::Spilling(table_ids) => table_ids), + ) + } + + pub(super) fn remove_table_spill_tasks( + &mut self, + table_id: TableId, + task_ids: impl Iterator, + ) { + for task_id in task_ids { + let entry = self.tasks.get_mut(&task_id).expect("should exist"); + let empty = must_match!(&mut entry.status, UploadingTaskStatus::Spilling(table_ids) => { + assert!(table_ids.remove(&table_id)); + table_ids.is_empty() + }); + if empty { + let task = self.tasks.remove(&task_id).expect("should exist").task; + task.join_handle.abort(); + } + } + } + + pub(super) fn sync( + &mut self, + context: &UploaderContext, + sync_id: SyncId, + unflushed_payload: UploadTaskInput, + spill_task_ids: impl Iterator, + sync_table_ids: &HashSet, + ) -> Option { + let task = if unflushed_payload.is_empty() { + None + } else { + Some(UploadingTask::new( + self.get_next_task_id(), + unflushed_payload, + context, + )) + }; + + for task_id in spill_task_ids { + let entry = self.tasks.get_mut(&task_id).expect("should exist"); + must_match!(&entry.status, UploadingTaskStatus::Spilling(table_ids) => { + assert!(table_ids.is_subset(sync_table_ids), "spill table_ids: {table_ids:?}, sync_table_ids: {sync_table_ids:?}"); + }); + entry.status = UploadingTaskStatus::Sync(sync_id); + } + + task.map(|task| { + let id = task.task_id; + self.add_task(task, UploadingTaskStatus::Sync(sync_id)); + id + }) + } + + #[cfg(debug_assertions)] + pub(super) fn tasks(&self) -> impl Iterator { + self.tasks + .iter() + .map(|(task_id, entry)| (*task_id, &entry.status)) + } +} diff --git a/src/storage/src/hummock/store/hummock_storage.rs b/src/storage/src/hummock/store/hummock_storage.rs index 11aa643c3659a..7b28588c5054a 100644 --- a/src/storage/src/hummock/store/hummock_storage.rs +++ b/src/storage/src/hummock/store/hummock_storage.rs @@ -565,16 +565,14 @@ impl StateStore for HummockStorage { fn sync(&self, epoch: u64, table_ids: HashSet) -> impl SyncFuture { let (tx, rx) = oneshot::channel(); - self.hummock_event_sender - .send(HummockEvent::SyncEpoch { - new_sync_epoch: epoch, - sync_result_sender: tx, - table_ids, - }) - .expect("should send success"); + let _ = self.hummock_event_sender.send(HummockEvent::SyncEpoch { + new_sync_epoch: epoch, + sync_result_sender: tx, + table_ids, + }); rx.map(|recv_result| { Ok(recv_result - .expect("should wait success")? + .map_err(|_| HummockError::other("failed to receive sync result"))?? .into_sync_result()) }) }