From eaac3f901de813938f04415b01ee84dec47a804b Mon Sep 17 00:00:00 2001 From: William Wen Date: Wed, 26 Jun 2024 14:36:21 +0800 Subject: [PATCH] move to new file --- .../{uploader.rs => uploader/mod.rs} | 154 +---------------- .../event_handler/uploader/task_manager.rs | 160 ++++++++++++++++++ 2 files changed, 163 insertions(+), 151 deletions(-) rename src/storage/src/hummock/event_handler/{uploader.rs => uploader/mod.rs} (93%) create mode 100644 src/storage/src/hummock/event_handler/uploader/task_manager.rs diff --git a/src/storage/src/hummock/event_handler/uploader.rs b/src/storage/src/hummock/event_handler/uploader/mod.rs similarity index 93% rename from src/storage/src/hummock/event_handler/uploader.rs rename to src/storage/src/hummock/event_handler/uploader/mod.rs index 738454ad02efc..e4672db162f7e 100644 --- a/src/storage/src/hummock/event_handler/uploader.rs +++ b/src/storage/src/hummock/event_handler/uploader/mod.rs @@ -12,6 +12,8 @@ // See the License for the specific language governing permissions and // limitations under the License. +mod task_manager; + use std::cmp::Ordering; use std::collections::btree_map::Entry; use std::collections::{BTreeMap, BTreeSet, HashMap, HashSet, VecDeque}; @@ -36,6 +38,7 @@ use risingwave_hummock_sdk::table_watermark::{ TableWatermarks, VnodeWatermark, WatermarkDirection, }; use risingwave_hummock_sdk::{CompactionGroupId, HummockEpoch, LocalSstableInfo}; +use task_manager::{TaskManager, UploadingTaskStatus}; use thiserror_ext::AsReport; use tokio::sync::oneshot; use tokio::task::JoinHandle; @@ -331,157 +334,6 @@ impl UploadingTask { } } -mod task_manager { - use super::*; - - #[derive(Debug)] - pub(super) enum UploadingTaskStatus { - Spilling, - Sync(HummockEpoch), - } - - #[derive(Debug)] - enum TaskStatus { - Uploading(UploadingTaskStatus), - Spilled(Arc), - } - - #[derive(Default, Debug)] - pub(super) struct TaskManager { - // newer task at the front - uploading_tasks: VecDeque, - task_status: HashMap, - } - - impl TaskManager { - fn add_task(&mut self, task: UploadingTask, status: UploadingTaskStatus) { - self.task_status - .insert(task.task_id, TaskStatus::Uploading(status)); - self.uploading_tasks.push_front(task); - } - - #[expect(clippy::type_complexity)] - pub(super) fn poll_task_result( - &mut self, - cx: &mut Context<'_>, - _context: &UploaderContext, - ) -> Poll< - Option< - Result< - ( - UploadingTaskId, - UploadingTaskStatus, - Arc, - ), - ErrState, - >, - >, - > { - if let Some(task) = self.uploading_tasks.back_mut() { - let result = match self.task_status.get(&task.task_id).expect("should exist") { - TaskStatus::Uploading(UploadingTaskStatus::Spilling) => { - let sst = ready!(task.poll_ok_with_retry(cx)); - self.task_status - .insert(task.task_id, TaskStatus::Spilled(sst.clone())); - Ok((task.task_id, UploadingTaskStatus::Spilling, sst)) - } - TaskStatus::Uploading(UploadingTaskStatus::Sync(epoch)) => { - let epoch = *epoch; - let result = ready!(task.poll_result(cx)); - let _status = self.task_status.remove(&task.task_id); - result - .map(|sst| (task.task_id, UploadingTaskStatus::Sync(epoch), sst)) - .map_err(|e| ErrState { - failed_epoch: epoch, - reason: e.as_report().to_string(), - }) - } - TaskStatus::Spilled(_) => { - unreachable!("should be uploading task") - } - }; - - let _task = self.uploading_tasks.pop_back().expect("non-empty"); - Poll::Ready(Some(result)) - } else { - Poll::Ready(None) - } - } - - pub(super) fn abort(self) { - for task in self.uploading_tasks { - task.join_handle.abort(); - } - } - - pub(super) fn spill( - &mut self, - context: &UploaderContext, - imms: HashMap>, - ) -> (UploadingTaskId, usize) { - assert!(!imms.is_empty()); - let task = UploadingTask::new(imms, context); - context.stats.spill_task_counts_from_unsealed.inc(); - context - .stats - .spill_task_size_from_unsealed - .inc_by(task.task_info.task_size as u64); - info!("Spill data. Task: {}", task.get_task_info()); - let size = task.task_info.task_size; - let id = task.task_id; - self.add_task(task, UploadingTaskStatus::Spilling); - (id, size) - } - - pub(super) fn sync( - &mut self, - context: &UploaderContext, - epoch: HummockEpoch, - spilled_task: BTreeSet, - unflushed_payload: UploadTaskInput, - ) -> (HashSet, VecDeque>) { - let mut remaining_tasks = HashSet::new(); - let total_task_count = if unflushed_payload.is_empty() { - spilled_task.len() - } else { - let task = UploadingTask::new(unflushed_payload, context); - remaining_tasks.insert(task.task_id); - self.task_status.insert( - task.task_id, - TaskStatus::Uploading(UploadingTaskStatus::Sync(epoch)), - ); - self.add_task(task, UploadingTaskStatus::Sync(epoch)); - spilled_task.len() + 1 - }; - let mut uploaded = VecDeque::with_capacity(total_task_count); - - // iterate from small task id to large, which means from old data to new data - for task_id in spilled_task { - let status = self.task_status.remove(&task_id).expect("should exist"); - match status { - TaskStatus::Uploading(UploadingTaskStatus::Spilling) => { - self.task_status.insert( - task_id, - TaskStatus::Uploading(UploadingTaskStatus::Sync(epoch)), - ); - remaining_tasks.insert(task_id); - } - TaskStatus::Uploading(UploadingTaskStatus::Sync(_)) => { - unreachable!("cannot be synced again") - } - TaskStatus::Spilled(sst) => { - self.task_status.remove(&task_id); - uploaded.push_front(sst); - } - } - } - (remaining_tasks, uploaded) - } - } -} - -use task_manager::{TaskManager, UploadingTaskStatus}; - impl TableUnsyncData { fn add_table_watermarks( &mut self, diff --git a/src/storage/src/hummock/event_handler/uploader/task_manager.rs b/src/storage/src/hummock/event_handler/uploader/task_manager.rs new file mode 100644 index 0000000000000..2f19ffed53175 --- /dev/null +++ b/src/storage/src/hummock/event_handler/uploader/task_manager.rs @@ -0,0 +1,160 @@ +// Copyright 2024 RisingWave Labs +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use super::*; + +#[derive(Debug)] +pub(super) enum UploadingTaskStatus { + Spilling, + Sync(HummockEpoch), +} + +#[derive(Debug)] +enum TaskStatus { + Uploading(UploadingTaskStatus), + Spilled(Arc), +} + +#[derive(Default, Debug)] +pub(super) struct TaskManager { + // newer task at the front + uploading_tasks: VecDeque, + task_status: HashMap, +} + +impl TaskManager { + fn add_task(&mut self, task: UploadingTask, status: UploadingTaskStatus) { + self.task_status + .insert(task.task_id, TaskStatus::Uploading(status)); + self.uploading_tasks.push_front(task); + } + + #[expect(clippy::type_complexity)] + pub(super) fn poll_task_result( + &mut self, + cx: &mut Context<'_>, + _context: &UploaderContext, + ) -> Poll< + Option< + Result< + ( + UploadingTaskId, + UploadingTaskStatus, + Arc, + ), + ErrState, + >, + >, + > { + if let Some(task) = self.uploading_tasks.back_mut() { + let result = match self.task_status.get(&task.task_id).expect("should exist") { + TaskStatus::Uploading(UploadingTaskStatus::Spilling) => { + let sst = ready!(task.poll_ok_with_retry(cx)); + self.task_status + .insert(task.task_id, TaskStatus::Spilled(sst.clone())); + Ok((task.task_id, UploadingTaskStatus::Spilling, sst)) + } + TaskStatus::Uploading(UploadingTaskStatus::Sync(epoch)) => { + let epoch = *epoch; + let result = ready!(task.poll_result(cx)); + let _status = self.task_status.remove(&task.task_id); + result + .map(|sst| (task.task_id, UploadingTaskStatus::Sync(epoch), sst)) + .map_err(|e| ErrState { + failed_epoch: epoch, + reason: e.as_report().to_string(), + }) + } + TaskStatus::Spilled(_) => { + unreachable!("should be uploading task") + } + }; + + let _task = self.uploading_tasks.pop_back().expect("non-empty"); + Poll::Ready(Some(result)) + } else { + Poll::Ready(None) + } + } + + pub(super) fn abort(self) { + for task in self.uploading_tasks { + task.join_handle.abort(); + } + } + + pub(super) fn spill( + &mut self, + context: &UploaderContext, + imms: HashMap>, + ) -> (UploadingTaskId, usize) { + assert!(!imms.is_empty()); + let task = UploadingTask::new(imms, context); + context.stats.spill_task_counts_from_unsealed.inc(); + context + .stats + .spill_task_size_from_unsealed + .inc_by(task.task_info.task_size as u64); + info!("Spill data. Task: {}", task.get_task_info()); + let size = task.task_info.task_size; + let id = task.task_id; + self.add_task(task, UploadingTaskStatus::Spilling); + (id, size) + } + + pub(super) fn sync( + &mut self, + context: &UploaderContext, + epoch: HummockEpoch, + spilled_task: BTreeSet, + unflushed_payload: UploadTaskInput, + ) -> (HashSet, VecDeque>) { + let mut remaining_tasks = HashSet::new(); + let total_task_count = if unflushed_payload.is_empty() { + spilled_task.len() + } else { + let task = UploadingTask::new(unflushed_payload, context); + remaining_tasks.insert(task.task_id); + self.task_status.insert( + task.task_id, + TaskStatus::Uploading(UploadingTaskStatus::Sync(epoch)), + ); + self.add_task(task, UploadingTaskStatus::Sync(epoch)); + spilled_task.len() + 1 + }; + let mut uploaded = VecDeque::with_capacity(total_task_count); + + // iterate from small task id to large, which means from old data to new data + for task_id in spilled_task { + let status = self.task_status.remove(&task_id).expect("should exist"); + match status { + TaskStatus::Uploading(UploadingTaskStatus::Spilling) => { + self.task_status.insert( + task_id, + TaskStatus::Uploading(UploadingTaskStatus::Sync(epoch)), + ); + remaining_tasks.insert(task_id); + } + TaskStatus::Uploading(UploadingTaskStatus::Sync(_)) => { + unreachable!("cannot be synced again") + } + TaskStatus::Spilled(sst) => { + self.task_status.remove(&task_id); + uploaded.push_front(sst); + } + } + } + (remaining_tasks, uploaded) + } +}