Skip to content

Commit

Permalink
fix test and address comment
Browse files Browse the repository at this point in the history
  • Loading branch information
wenym1 committed Jul 2, 2024
1 parent 37cc33d commit 30a808e
Show file tree
Hide file tree
Showing 3 changed files with 53 additions and 33 deletions.
52 changes: 30 additions & 22 deletions src/storage/src/hummock/event_handler/uploader/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,9 +20,7 @@ use std::collections::{BTreeMap, BTreeSet, HashMap, HashSet, VecDeque};
use std::fmt::{Debug, Display, Formatter};
use std::future::{poll_fn, Future};
use std::mem::{replace, swap, take};
use std::sync::atomic::AtomicUsize;
use std::sync::atomic::Ordering::Relaxed;
use std::sync::{Arc, LazyLock};
use std::sync::Arc;
use std::task::{ready, Context, Poll};

use futures::FutureExt;
Expand Down Expand Up @@ -227,7 +225,7 @@ impl UploadingTask {
.collect()
}

fn new(input: UploadTaskInput, context: &UploaderContext) -> Self {
fn new(task_id: UploadingTaskId, input: UploadTaskInput, context: &UploaderContext) -> Self {
assert!(!input.is_empty());
let mut epochs = input
.iter()
Expand Down Expand Up @@ -261,9 +259,8 @@ impl UploadingTask {
}
let join_handle = (context.spawn_upload_task)(payload, task_info.clone());
context.stats.uploader_uploading_task_count.inc();
static NEXT_TASK_ID: LazyLock<AtomicUsize> = LazyLock::new(|| AtomicUsize::new(0));
Self {
task_id: UploadingTaskId(NEXT_TASK_ID.fetch_add(1, Relaxed)),
task_id,
input,
join_handle,
task_info,
Expand Down Expand Up @@ -840,7 +837,7 @@ impl UploaderData {
let _epochs = take_before_epoch(&mut self.unsync_data.epochs, epoch);

let mut all_table_watermarks = HashMap::new();
let mut spilling_tasks = HashSet::new();
let mut uploading_tasks = HashSet::new();
let mut spilled_tasks = BTreeSet::new();

let mut flush_payload = HashMap::new();
Expand All @@ -867,22 +864,25 @@ impl UploaderData {
if self.spilled_data.contains_key(&task_id) {
spilled_tasks.insert(task_id);
} else {
spilling_tasks.insert(task_id);
uploading_tasks.insert(task_id);
}
}
}

static NEXT_SYNC_ID: LazyLock<AtomicUsize> = LazyLock::new(|| AtomicUsize::new(0));
let sync_id = SyncId(NEXT_SYNC_ID.fetch_add(1, Relaxed));
let sync_id = {
let sync_id = self.next_sync_id;
self.next_sync_id += 1;
SyncId(sync_id)
};

if let Some(extra_flush_task_id) = self.task_manager.sync(
context,
sync_id,
flush_payload,
spilling_tasks.iter().cloned(),
uploading_tasks.iter().cloned(),
&table_ids,
) {
spilling_tasks.insert(extra_flush_task_id);
uploading_tasks.insert(extra_flush_task_id);
}

// iter from large task_id to small one so that newer data at the front
Expand All @@ -905,7 +905,7 @@ impl UploaderData {
SyncingData {
sync_epoch: epoch,
table_ids,
remaining_uploading_tasks: spilling_tasks,
remaining_uploading_tasks: uploading_tasks,
uploaded,
table_watermarks: all_table_watermarks,
sync_result_sender,
Expand Down Expand Up @@ -976,13 +976,11 @@ struct SyncId(usize);
struct UploaderData {
unsync_data: UnsyncData,

/// Data that has started syncing but not synced yet. `epoch` satisfies
/// `max_synced_epoch < epoch <= max_syncing_epoch`.
/// Newer epoch at the front
syncing_data: BTreeMap<SyncId, SyncingData>,

task_manager: TaskManager,
spilled_data: HashMap<UploadingTaskId, (Arc<StagingSstableInfo>, HashSet<TableId>)>,
next_sync_id: usize,
}

impl UploaderData {
Expand Down Expand Up @@ -1226,8 +1224,6 @@ impl HummockUploader {
}

impl UploaderData {
/// Poll the syncing task of the syncing data of the oldest epoch. Return `Poll::Ready(None)` if
/// there is no syncing data.
fn may_notify_sync_task(&mut self, context: &UploaderContext) {
while let Some((_, syncing_data)) = self.syncing_data.first_key_value()
&& syncing_data.remaining_uploading_tasks.is_empty()
Expand Down Expand Up @@ -1360,6 +1356,13 @@ impl HummockUploader {
}),
), UploaderState::Working(data) => data);

let _ = syncing_data
.sync_result_sender
.send(Err(HummockError::other(format!(
"failed to sync: {:?}",
e.as_report()
))));

data.abort(|| {
HummockError::other(format!(
"previous epoch {} failed to sync",
Expand All @@ -1383,8 +1386,8 @@ pub(crate) mod tests {
use std::ops::Deref;
use std::pin::pin;
use std::sync::atomic::AtomicUsize;
use std::sync::atomic::Ordering::SeqCst;
use std::sync::Arc;
use std::sync::atomic::Ordering::{Relaxed, SeqCst};
use std::sync::{Arc, LazyLock};
use std::task::Poll;

use bytes::Bytes;
Expand All @@ -1411,7 +1414,7 @@ pub(crate) mod tests {
use crate::hummock::event_handler::uploader::{
get_payload_imm_ids, HummockUploader, SyncedData, TableUnsyncData, UploadTaskInfo,
UploadTaskOutput, UploadTaskPayload, UploaderContext, UploaderData, UploaderState,
UploadingTask,
UploadingTask, UploadingTaskId,
};
use crate::hummock::event_handler::{LocalInstanceId, TEST_LOCAL_INSTANCE_ID};
use crate::hummock::local_version::pinned_version::PinnedVersion;
Expand Down Expand Up @@ -1590,7 +1593,12 @@ pub(crate) mod tests {
TEST_LOCAL_INSTANCE_ID,
imms.into_iter().map(UploaderImm::for_test).collect_vec(),
)]);
Self::new(input, context)
static NEXT_TASK_ID: LazyLock<AtomicUsize> = LazyLock::new(|| AtomicUsize::new(0));
Self::new(
UploadingTaskId(NEXT_TASK_ID.fetch_add(1, Relaxed)),
input,
context,
)
}
}

Expand Down
20 changes: 17 additions & 3 deletions src/storage/src/hummock/event_handler/uploader/task_manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ pub(super) struct TaskManager {
tasks: HashMap<UploadingTaskId, TaskEntry>,
// newer task at the front
task_order: VecDeque<UploadingTaskId>,
next_task_id: usize,
}

impl TaskManager {
Expand All @@ -41,7 +42,10 @@ impl TaskManager {
) -> &UploadingTaskStatus {
let task_id = task.task_id;
self.task_order.push_front(task.task_id);
self.tasks.insert(task.task_id, TaskEntry { task, status });
assert!(self
.tasks
.insert(task.task_id, TaskEntry { task, status })
.is_none());
&self.tasks.get(&task_id).expect("should exist").status
}

Expand All @@ -64,6 +68,12 @@ impl TaskManager {
Poll::Ready(result)
}

fn get_next_task_id(&mut self) -> UploadingTaskId {
let task_id = self.next_task_id;
self.next_task_id += 1;
UploadingTaskId(task_id)
}

#[expect(clippy::type_complexity)]
pub(super) fn poll_task_result(
&mut self,
Expand Down Expand Up @@ -100,7 +110,7 @@ impl TaskManager {
imms: HashMap<LocalInstanceId, Vec<UploaderImm>>,
) -> (UploadingTaskId, usize, &HashSet<TableId>) {
assert!(!imms.is_empty());
let task = UploadingTask::new(imms, context);
let task = UploadingTask::new(self.get_next_task_id(), imms, context);
context.stats.spill_task_counts_from_unsealed.inc();
context
.stats
Expand Down Expand Up @@ -146,7 +156,11 @@ impl TaskManager {
let task = if unflushed_payload.is_empty() {
None
} else {
Some(UploadingTask::new(unflushed_payload, context))
Some(UploadingTask::new(
self.get_next_task_id(),
unflushed_payload,
context,
))
};

for task_id in spill_task_ids {
Expand Down
14 changes: 6 additions & 8 deletions src/storage/src/hummock/store/hummock_storage.rs
Original file line number Diff line number Diff line change
Expand Up @@ -565,16 +565,14 @@ impl StateStore for HummockStorage {

fn sync(&self, epoch: u64, table_ids: HashSet<TableId>) -> impl SyncFuture {
let (tx, rx) = oneshot::channel();
self.hummock_event_sender
.send(HummockEvent::SyncEpoch {
new_sync_epoch: epoch,
sync_result_sender: tx,
table_ids,
})
.expect("should send success");
let _ = self.hummock_event_sender.send(HummockEvent::SyncEpoch {
new_sync_epoch: epoch,
sync_result_sender: tx,
table_ids,
});
rx.map(|recv_result| {
Ok(recv_result
.expect("should wait success")?
.map_err(|_| HummockError::other("failed to receive sync result"))??
.into_sync_result())
})
}
Expand Down

0 comments on commit 30a808e

Please sign in to comment.