feat(storage): support clearing the shared buffer of a subset of tables
wenym1 committed Nov 7, 2024
1 parent 9d83354 commit b3c017f
Showing 7 changed files with 581 additions and 111 deletions.
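In short, the commit threads an optional set of table ids through the shared-buffer clear path: `HummockEvent::Clear` gains an `Option<HashSet<TableId>>` field, `HummockUploader::clear` takes the same option, and `None` preserves the old clear-everything behavior. A minimal sketch of how a caller might issue a partial clear; the `request_clear` helper and the event-channel plumbing are assumptions for illustration, only the shape of the `Clear` variant comes from this commit:

use std::collections::HashSet;

use risingwave_common::catalog::TableId;
use tokio::sync::{mpsc, oneshot};

// Hypothetical helper around the event handler's channel.
fn request_clear(
    event_tx: &mpsc::UnboundedSender<HummockEvent>,
    version_id: HummockVersionId,
    table_ids: Option<HashSet<TableId>>,
) -> oneshot::Receiver<()> {
    let (tx, rx) = oneshot::channel();
    // `None` keeps the old behavior (clear everything);
    // `Some(tables)` clears only the shared buffer of those tables.
    event_tx
        .send(HummockEvent::Clear(tx, version_id, table_ids))
        .expect("event handler should be running");
    rx // resolves once the handler has finished the clear
}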
436 changes: 391 additions & 45 deletions src/storage/src/hummock/event_handler/hummock_event_handler.rs

Large diffs are not rendered by default.

10 changes: 8 additions & 2 deletions src/storage/src/hummock/event_handler/mod.rs
@@ -65,7 +65,11 @@ pub enum HummockEvent {
     },
 
     /// Clear shared buffer and reset all states
-    Clear(oneshot::Sender<()>, HummockVersionId),
+    Clear(
+        oneshot::Sender<()>,
+        HummockVersionId,
+        Option<HashSet<TableId>>,
+    ),
 
     Shutdown,

@@ -122,7 +126,9 @@ impl HummockEvent {
                 table_ids,
             } => format!("AwaitSyncEpoch epoch {} {:?}", new_sync_epoch, table_ids),
 
-            HummockEvent::Clear(_, version_id) => format!("Clear {}", version_id),
+            HummockEvent::Clear(_, version_id, table_ids) => {
+                format!("Clear {} {:?}", version_id, table_ids)
+            }
 
            HummockEvent::Shutdown => "Shutdown".to_string(),

122 changes: 100 additions & 22 deletions src/storage/src/hummock/event_handler/uploader/mod.rs
@@ -53,7 +53,6 @@
 use crate::hummock::store::version::StagingSstableInfo;
 use crate::hummock::{HummockError, HummockResult, ImmutableMemtable};
 use crate::mem_table::ImmId;
 use crate::monitor::HummockStateStoreMetrics;
-use crate::opts::StorageOpts;
 use crate::store::SealCurrentEpochOptions;

/// Take epoch data inclusively before `epoch` out from `data`
@@ -784,6 +783,7 @@ struct UnsyncData {
     // An index as a mapping from instance id to its table id
     instance_table_id: HashMap<LocalInstanceId, TableId>,
     unsync_epochs: HashMap<UnsyncEpochId, HashSet<TableId>>,
+    spilled_data: HashMap<UploadingTaskId, (Arc<StagingSstableInfo>, HashSet<TableId>)>,
 }

impl UnsyncData {
@@ -903,6 +903,43 @@ impl UnsyncData {
             None
         }
     }
+
+    fn clear_tables(&mut self, table_ids: &HashSet<TableId>, task_manager: &mut TaskManager) {
+        for table_id in table_ids {
+            if let Some(table_unsync_data) = self.table_data.remove(table_id) {
+                for task_id in table_unsync_data.spill_tasks.into_values().flatten() {
+                    if let Some(task_status) = task_manager.abort_task(task_id) {
+                        must_match!(task_status, UploadingTaskStatus::Spilling(spill_table_ids) => {
+                            assert!(spill_table_ids.is_subset(table_ids));
+                        });
+                    }
+                    if let Some((_, spill_table_ids)) = self.spilled_data.remove(&task_id) {
+                        assert!(spill_table_ids.is_subset(table_ids));
+                    }
+                }
+                assert!(
+                    table_unsync_data.instance_data.is_empty(),
+                    "should be clear when dropping the read version instance"
+                );
+            }
+        }
+        debug_assert!(self
+            .spilled_data
+            .values()
+            .all(|(_, spill_table_ids)| spill_table_ids.is_disjoint(table_ids)));
+        self.unsync_epochs.retain(|_, unsync_epoch_table_ids| {
+            if !unsync_epoch_table_ids.is_disjoint(table_ids) {
+                assert!(unsync_epoch_table_ids.is_subset(table_ids));
+                false
+            } else {
+                true
+            }
+        });
+        assert!(self
+            .instance_table_id
+            .values()
+            .all(|table_id| !table_ids.contains(table_id)));
+    }
}

impl UploaderData {
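The assertions in `clear_tables` encode the safety condition of a partial clear: any spill task it aborts must cover only the cleared tables, and any spilled data or epoch state left behind must not reference them. A self-contained illustration of the two `HashSet` checks doing that work (values invented for the example):

use std::collections::HashSet;

fn main() {
    let cleared: HashSet<u32> = HashSet::from([1, 2, 3]);
    let aborted_task_tables: HashSet<u32> = HashSet::from([2, 3]);
    let surviving_task_tables: HashSet<u32> = HashSet::from([7, 8]);

    // A spill task aborted by the clear may only cover cleared tables...
    assert!(aborted_task_tables.is_subset(&cleared));
    // ...while spilled data that survives must not mention any of them.
    assert!(surviving_task_tables.is_disjoint(&cleared));
}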
@@ -958,7 +995,7 @@ impl UploaderData {
             );
         }
         for task_id in task_ids {
-            if self.spilled_data.contains_key(&task_id) {
+            if self.unsync_data.spilled_data.contains_key(&task_id) {
                 spilled_tasks.insert(task_id);
             } else {
                 uploading_tasks.insert(task_id);
@@ -988,8 +1025,11 @@
                 .iter()
                 .rev()
                 .map(|task_id| {
-                    let (sst, spill_table_ids) =
-                        self.spilled_data.remove(task_id).expect("should exist");
+                    let (sst, spill_table_ids) = self
+                        .unsync_data
+                        .spilled_data
+                        .remove(task_id)
+                        .expect("should exist");
                     assert!(
                         spill_table_ids.is_subset(&table_ids),
                         "spilled tabled ids {:?} not a subset of sync table id {:?}",
@@ -1057,7 +1097,6 @@ impl UploaderContext {
         pinned_version: PinnedVersion,
         spawn_upload_task: SpawnUploadTask,
         buffer_tracker: BufferTracker,
-        _config: &StorageOpts,
         stats: Arc<HummockStateStoreMetrics>,
     ) -> Self {
         UploaderContext {
@@ -1079,20 +1118,52 @@ struct UploaderData {
     syncing_data: BTreeMap<SyncId, SyncingData>,
 
     task_manager: TaskManager,
-    spilled_data: HashMap<UploadingTaskId, (Arc<StagingSstableInfo>, HashSet<TableId>)>,
     next_sync_id: usize,
 }

 impl UploaderData {
     fn abort(self, err: impl Fn() -> HummockError) {
-        self.task_manager.abort();
+        self.task_manager.abort_all_tasks();
         for syncing_data in self.syncing_data.into_values() {
             send_sync_result(syncing_data.sync_result_sender, Err(err()));
         }
     }

+    fn clear_tables(&mut self, table_ids: HashSet<TableId>) {
+        if table_ids.is_empty() {
+            return;
+        }
+        self.unsync_data
+            .clear_tables(&table_ids, &mut self.task_manager);
+        self.syncing_data.retain(|sync_id, syncing_data| {
+            if !syncing_data.table_ids.is_disjoint(&table_ids) {
+                assert!(syncing_data.table_ids.is_subset(&table_ids));
+                for task_id in &syncing_data.remaining_uploading_tasks {
+                    match self
+                        .task_manager
+                        .abort_task(*task_id)
+                        .expect("should exist")
+                    {
+                        UploadingTaskStatus::Spilling(spill_table_ids) => {
+                            assert!(spill_table_ids.is_subset(&table_ids));
+                        }
+                        UploadingTaskStatus::Sync(task_sync_id) => {
+                            assert_eq!(sync_id, &task_sync_id);
+                        }
+                    }
+                }
+                false
+            } else {
+                true
+            }
+        });
+
+        self.check_upload_task_consistency();
+    }
+
     fn min_uncommitted_sst_id(&self) -> Option<HummockSstableObjectId> {
-        self.spilled_data
+        self.unsync_data
+            .spilled_data
             .values()
             .map(|(s, _)| s)
             .chain(self.syncing_data.values().flat_map(|s| s.uploaded.iter()))
@@ -1141,15 +1212,13 @@ impl HummockUploader {
         pinned_version: PinnedVersion,
         spawn_upload_task: SpawnUploadTask,
         buffer_tracker: BufferTracker,
-        config: &StorageOpts,
     ) -> Self {
         Self {
             state: UploaderState::Working(UploaderData::default()),
             context: UploaderContext::new(
                 pinned_version,
                 spawn_upload_task,
                 buffer_tracker,
-                config,
                 state_store_metrics,
             ),
         }
}
@@ -1308,15 +1377,21 @@ impl HummockUploader {
         }
     }
 
-    pub(crate) fn clear(&mut self) {
-        if let UploaderState::Working(data) = replace(
-            &mut self.state,
-            UploaderState::Working(UploaderData::default()),
-        ) {
-            data.abort(|| HummockError::other("uploader is reset"));
-        }
+    pub(crate) fn clear(&mut self, table_ids: Option<HashSet<TableId>>) {
+        if let Some(table_ids) = table_ids {
+            if let UploaderState::Working(data) = &mut self.state {
+                data.clear_tables(table_ids);
+            }
+        } else {
+            if let UploaderState::Working(data) = replace(
+                &mut self.state,
+                UploaderState::Working(UploaderData::default()),
+            ) {
+                data.abort(|| HummockError::other("uploader is reset"));
+            }
 
-        self.context.stats.uploader_syncing_epoch_count.set(0);
+            self.context.stats.uploader_syncing_epoch_count.set(0);
+        }
     }
 
     pub(crate) fn may_destroy_instance(&mut self, instance_id: LocalInstanceId) {
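The `Option` keeps both behaviors behind one entry point. A hedged usage sketch, not part of the diff (uploader construction elided; `TableId::new` assumed from `risingwave_common::catalog`):

use std::collections::HashSet;

use risingwave_common::catalog::TableId;

// Partial clear: drop unsync and syncing state for table 42 only; the
// uploader stays in `UploaderState::Working` for every other table.
fn clear_one_table(uploader: &mut HummockUploader) {
    let tables: HashSet<TableId> = HashSet::from([TableId::new(42)]);
    uploader.clear(Some(tables));
}

// Full reset: the pre-existing behavior. Every task is aborted, the uploader
// data is replaced wholesale, and the syncing-epoch gauge is zeroed.
fn full_reset(uploader: &mut HummockUploader) {
    uploader.clear(None);
}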
@@ -1331,10 +1406,11 @@ impl HummockUploader {
             .into_values()
             .flat_map(|task_ids| task_ids.into_iter())
             .filter(|task_id| {
-                if let Some((_, table_ids)) = data.spilled_data.get_mut(task_id) {
+                if let Some((_, table_ids)) = data.unsync_data.spilled_data.get_mut(task_id)
+                {
                     assert!(table_ids.remove(&removed_table_data.table_id));
                     if table_ids.is_empty() {
-                        data.spilled_data.remove(task_id);
+                        data.unsync_data.spilled_data.remove(task_id);
                     }
                     false
                 } else {
@@ -1422,7 +1498,7 @@ impl UploaderData {
             .collect();
 
         let mut spill_task_table_id_from_manager: HashMap<_, HashSet<_>> = HashMap::new();
-        for (task_id, (_, table_ids)) in &self.spilled_data {
+        for (task_id, (_, table_ids)) in &self.unsync_data.spilled_data {
             spill_task_table_id_from_manager.insert(*task_id, table_ids.clone());
         }
         let mut syncing_task_from_manager: HashMap<_, HashSet<_>> = HashMap::new();
@@ -1473,7 +1549,9 @@ impl HummockUploader {
                     data.may_notify_sync_task(&self.context);
                 }
                 UploadingTaskStatus::Spilling(table_ids) => {
-                    data.spilled_data.insert(task_id, (sst.clone(), table_ids));
+                    data.unsync_data
+                        .spilled_data
+                        .insert(task_id, (sst.clone(), table_ids));
                 }
             }
             data.check_upload_task_consistency();
16 changes: 15 additions & 1 deletion src/storage/src/hummock/event_handler/uploader/spiller.rs
@@ -12,6 +12,7 @@
 // See the License for the specific language governing permissions and
 // limitations under the License.
 
+use std::cmp::Ordering;
 use std::collections::{HashMap, HashSet};
 
 use risingwave_common::catalog::TableId;
@@ -75,7 +76,20 @@ impl<'a> Spiller<'a> {
         if let Some(unsync_epoch_id) = self
             .epoch_info
             .iter()
-            .max_by_key(|(_, info)| info.payload_size)
+            .max_by(
+                |(UnsyncEpochId(_, table1), info1), (UnsyncEpochId(_, table2), info2)| {
+                    info1.payload_size.cmp(&info2.payload_size).then_with(|| {
+                        if !cfg!(test) {
+                            Ordering::Equal
+                        } else {
+                            assert_ne!(table1, table2);
+                            // enforce deterministic spill order in test
+                            // smaller table id will be spilled first.
+                            table2.cmp(table1)
+                        }
+                    })
+                },
+            )
             .map(|(unsync_epoch_id, _)| *unsync_epoch_id)
         {
             let spill_epoch = unsync_epoch_id.epoch();
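The switch from `max_by_key` to `max_by` only matters when two epochs tie on payload size: in tests, the comparison falls through to a reversed table-id order so the spill choice is deterministic. A standalone sketch of the same pattern (plain tuples stand in for `UnsyncEpochId` and its epoch info):

use std::cmp::Ordering;

fn main() {
    // (table_id, payload_size)
    let epochs = [(10u64, 100usize), (7, 100), (3, 50)];
    let chosen = epochs
        .iter()
        .max_by(|(table1, size1), (table2, size2)| {
            // Primary key: payload size. Tie-break: reversed table-id order,
            // so under max_by the smaller table id wins the tie.
            size1.cmp(size2).then_with(|| table2.cmp(table1))
        })
        .unwrap();
    // Tables 10 and 7 tie at payload 100; the reversed comparison makes
    // table 7 the maximum, so it would be spilled first.
    assert_eq!(*chosen, (7, 100));
}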
11 changes: 10 additions & 1 deletion src/storage/src/hummock/event_handler/uploader/task_manager.rs
@@ -97,12 +97,21 @@ impl TaskManager {
         }
     }
 
-    pub(super) fn abort(self) {
+    pub(super) fn abort_all_tasks(self) {
         for task in self.tasks.into_values() {
             task.task.join_handle.abort();
         }
     }
 
+    pub(super) fn abort_task(&mut self, task_id: UploadingTaskId) -> Option<UploadingTaskStatus> {
+        self.tasks.remove(&task_id).map(|entry| {
+            entry.task.join_handle.abort();
+            self.task_order
+                .retain(|inflight_task_id| *inflight_task_id != task_id);
+            entry.status
+        })
+    }
+
     pub(super) fn spill(
         &mut self,
         context: &UploaderContext,
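`abort_task` is the piece that lets a partial clear tear down a single inflight upload: it aborts the tokio join handle, drops the task from `task_order`, and returns the recorded status so the caller can assert invariants. A hedged sketch of a caller, with `cleared_table_ids` and `dropped_sync_ids` as illustrative names rather than actual fields:

use std::collections::HashSet;

// Sketch: abort one inflight upload and sanity-check what it was doing.
fn abort_one(
    task_manager: &mut TaskManager,
    task_id: UploadingTaskId,
    cleared_table_ids: &HashSet<TableId>,
    dropped_sync_ids: &[SyncId],
) {
    if let Some(status) = task_manager.abort_task(task_id) {
        match status {
            // A still-spilling task must cover only tables being cleared.
            UploadingTaskStatus::Spilling(table_ids) => {
                assert!(table_ids.is_subset(cleared_table_ids));
            }
            // A sync task must belong to a sync that is itself being dropped.
            UploadingTaskStatus::Sync(sync_id) => {
                assert!(dropped_sync_ids.contains(&sync_id));
            }
        }
    }
    // `None` would mean the task id was unknown (already finished or aborted).
}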