diff --git a/src/storage/src/hummock/event_handler/uploader/mod.rs b/src/storage/src/hummock/event_handler/uploader/mod.rs index 21704d83072ff..eaaeb41a344bc 100644 --- a/src/storage/src/hummock/event_handler/uploader/mod.rs +++ b/src/storage/src/hummock/event_handler/uploader/mod.rs @@ -20,9 +20,7 @@ use std::collections::{BTreeMap, BTreeSet, HashMap, HashSet, VecDeque}; use std::fmt::{Debug, Display, Formatter}; use std::future::{poll_fn, Future}; use std::mem::{replace, swap, take}; -use std::sync::atomic::AtomicUsize; -use std::sync::atomic::Ordering::Relaxed; -use std::sync::{Arc, LazyLock}; +use std::sync::Arc; use std::task::{ready, Context, Poll}; use futures::FutureExt; @@ -227,7 +225,7 @@ impl UploadingTask { .collect() } - fn new(input: UploadTaskInput, context: &UploaderContext) -> Self { + fn new(task_id: UploadingTaskId, input: UploadTaskInput, context: &UploaderContext) -> Self { assert!(!input.is_empty()); let mut epochs = input .iter() @@ -261,9 +259,8 @@ impl UploadingTask { } let join_handle = (context.spawn_upload_task)(payload, task_info.clone()); context.stats.uploader_uploading_task_count.inc(); - static NEXT_TASK_ID: LazyLock = LazyLock::new(|| AtomicUsize::new(0)); Self { - task_id: UploadingTaskId(NEXT_TASK_ID.fetch_add(1, Relaxed)), + task_id, input, join_handle, task_info, @@ -840,7 +837,7 @@ impl UploaderData { let _epochs = take_before_epoch(&mut self.unsync_data.epochs, epoch); let mut all_table_watermarks = HashMap::new(); - let mut spilling_tasks = HashSet::new(); + let mut uploading_tasks = HashSet::new(); let mut spilled_tasks = BTreeSet::new(); let mut flush_payload = HashMap::new(); @@ -867,22 +864,25 @@ impl UploaderData { if self.spilled_data.contains_key(&task_id) { spilled_tasks.insert(task_id); } else { - spilling_tasks.insert(task_id); + uploading_tasks.insert(task_id); } } } - static NEXT_SYNC_ID: LazyLock = LazyLock::new(|| AtomicUsize::new(0)); - let sync_id = SyncId(NEXT_SYNC_ID.fetch_add(1, Relaxed)); + let sync_id = { + let sync_id = self.next_sync_id; + self.next_sync_id += 1; + SyncId(sync_id) + }; if let Some(extra_flush_task_id) = self.task_manager.sync( context, sync_id, flush_payload, - spilling_tasks.iter().cloned(), + uploading_tasks.iter().cloned(), &table_ids, ) { - spilling_tasks.insert(extra_flush_task_id); + uploading_tasks.insert(extra_flush_task_id); } // iter from large task_id to small one so that newer data at the front @@ -905,7 +905,7 @@ impl UploaderData { SyncingData { sync_epoch: epoch, table_ids, - remaining_uploading_tasks: spilling_tasks, + remaining_uploading_tasks: uploading_tasks, uploaded, table_watermarks: all_table_watermarks, sync_result_sender, @@ -976,13 +976,11 @@ struct SyncId(usize); struct UploaderData { unsync_data: UnsyncData, - /// Data that has started syncing but not synced yet. `epoch` satisfies - /// `max_synced_epoch < epoch <= max_syncing_epoch`. - /// Newer epoch at the front syncing_data: BTreeMap, task_manager: TaskManager, spilled_data: HashMap, HashSet)>, + next_sync_id: usize, } impl UploaderData { @@ -1226,8 +1224,6 @@ impl HummockUploader { } impl UploaderData { - /// Poll the syncing task of the syncing data of the oldest epoch. Return `Poll::Ready(None)` if - /// there is no syncing data. fn may_notify_sync_task(&mut self, context: &UploaderContext) { while let Some((_, syncing_data)) = self.syncing_data.first_key_value() && syncing_data.remaining_uploading_tasks.is_empty() @@ -1360,6 +1356,13 @@ impl HummockUploader { }), ), UploaderState::Working(data) => data); + let _ = syncing_data + .sync_result_sender + .send(Err(HummockError::other(format!( + "failed to sync: {:?}", + e.as_report() + )))); + data.abort(|| { HummockError::other(format!( "previous epoch {} failed to sync", @@ -1383,8 +1386,8 @@ pub(crate) mod tests { use std::ops::Deref; use std::pin::pin; use std::sync::atomic::AtomicUsize; - use std::sync::atomic::Ordering::SeqCst; - use std::sync::Arc; + use std::sync::atomic::Ordering::{Relaxed, SeqCst}; + use std::sync::{Arc, LazyLock}; use std::task::Poll; use bytes::Bytes; @@ -1411,7 +1414,7 @@ pub(crate) mod tests { use crate::hummock::event_handler::uploader::{ get_payload_imm_ids, HummockUploader, SyncedData, TableUnsyncData, UploadTaskInfo, UploadTaskOutput, UploadTaskPayload, UploaderContext, UploaderData, UploaderState, - UploadingTask, + UploadingTask, UploadingTaskId, }; use crate::hummock::event_handler::{LocalInstanceId, TEST_LOCAL_INSTANCE_ID}; use crate::hummock::local_version::pinned_version::PinnedVersion; @@ -1590,7 +1593,12 @@ pub(crate) mod tests { TEST_LOCAL_INSTANCE_ID, imms.into_iter().map(UploaderImm::for_test).collect_vec(), )]); - Self::new(input, context) + static NEXT_TASK_ID: LazyLock = LazyLock::new(|| AtomicUsize::new(0)); + Self::new( + UploadingTaskId(NEXT_TASK_ID.fetch_add(1, Relaxed)), + input, + context, + ) } } diff --git a/src/storage/src/hummock/event_handler/uploader/task_manager.rs b/src/storage/src/hummock/event_handler/uploader/task_manager.rs index fd3ec142fbddf..2347be1ed57eb 100644 --- a/src/storage/src/hummock/event_handler/uploader/task_manager.rs +++ b/src/storage/src/hummock/event_handler/uploader/task_manager.rs @@ -31,6 +31,7 @@ pub(super) struct TaskManager { tasks: HashMap, // newer task at the front task_order: VecDeque, + next_task_id: usize, } impl TaskManager { @@ -41,7 +42,10 @@ impl TaskManager { ) -> &UploadingTaskStatus { let task_id = task.task_id; self.task_order.push_front(task.task_id); - self.tasks.insert(task.task_id, TaskEntry { task, status }); + assert!(self + .tasks + .insert(task.task_id, TaskEntry { task, status }) + .is_none()); &self.tasks.get(&task_id).expect("should exist").status } @@ -64,6 +68,12 @@ impl TaskManager { Poll::Ready(result) } + fn get_next_task_id(&mut self) -> UploadingTaskId { + let task_id = self.next_task_id; + self.next_task_id += 1; + UploadingTaskId(task_id) + } + #[expect(clippy::type_complexity)] pub(super) fn poll_task_result( &mut self, @@ -100,7 +110,7 @@ impl TaskManager { imms: HashMap>, ) -> (UploadingTaskId, usize, &HashSet) { assert!(!imms.is_empty()); - let task = UploadingTask::new(imms, context); + let task = UploadingTask::new(self.get_next_task_id(), imms, context); context.stats.spill_task_counts_from_unsealed.inc(); context .stats @@ -146,7 +156,11 @@ impl TaskManager { let task = if unflushed_payload.is_empty() { None } else { - Some(UploadingTask::new(unflushed_payload, context)) + Some(UploadingTask::new( + self.get_next_task_id(), + unflushed_payload, + context, + )) }; for task_id in spill_task_ids { diff --git a/src/storage/src/hummock/store/hummock_storage.rs b/src/storage/src/hummock/store/hummock_storage.rs index 11aa643c3659a..7b28588c5054a 100644 --- a/src/storage/src/hummock/store/hummock_storage.rs +++ b/src/storage/src/hummock/store/hummock_storage.rs @@ -565,16 +565,14 @@ impl StateStore for HummockStorage { fn sync(&self, epoch: u64, table_ids: HashSet) -> impl SyncFuture { let (tx, rx) = oneshot::channel(); - self.hummock_event_sender - .send(HummockEvent::SyncEpoch { - new_sync_epoch: epoch, - sync_result_sender: tx, - table_ids, - }) - .expect("should send success"); + let _ = self.hummock_event_sender.send(HummockEvent::SyncEpoch { + new_sync_epoch: epoch, + sync_result_sender: tx, + table_ids, + }); rx.map(|recv_result| { Ok(recv_result - .expect("should wait success")? + .map_err(|_| HummockError::other("failed to receive sync result"))?? .into_sync_result()) }) }