refine and update doc
wenym1 committed Jun 11, 2024
1 parent abcd019 commit be45d98
Showing 1 changed file with 44 additions and 47 deletions.
91 changes: 44 additions & 47 deletions src/storage/src/hummock/event_handler/uploader.rs
@@ -428,10 +428,7 @@ impl EpochData {
}

#[derive(Default)]
/// Data at the sealed stage. We will ensure that data in `imms` are newer than the data in the
/// `spilled_data`, and that data in the `uploading_tasks` in `spilled_data` are newer than data in
/// the `uploaded_data` in `spilled_data`.
struct SealedData {
struct SyncDataBuilder {
// newer epochs come first
epochs: VecDeque<HummockEpoch>,

@@ -440,8 +437,8 @@ struct SealedData {
table_watermarks: HashMap<TableId, TableWatermarks>,
}

impl SealedData {
/// Add the data of a newly sealed epoch.
impl SyncDataBuilder {
/// Add the data of a new epoch.
///
/// Note: it may happen that, for example, currently we hold `imms` and `spilled_data` of epoch
/// 3, and after we add the spilled data of epoch 4, both `imms` and `spilled_data` hold data
@@ -451,9 +448,9 @@ impl SealedData {
/// data of `imms` must not overlap with the epoch 4 data of `spilled_data`. The explanation is
/// as follows:
///
/// First, unsealed data has 3 stages, from earlier to later, imms, uploading task, and
/// uploaded. When we try to spill unsealed data, we first pick the imms of older epoch until
/// the imms of older epoch are all picked. When we try to poll the uploading tasks of unsealed
/// First, unsync data has 3 stages, from earlier to later: imms, uploading task, and
/// uploaded. When we try to spill unsync data, we first pick the imms of the older epochs until
/// they are all picked. When we try to poll the uploading tasks of unsync
/// data, we first poll the tasks of the older epochs, until there is no uploading task in an older
/// epoch. Therefore, we can conclude that, if two pieces of data are in the same stage but in
/// different epochs, the data in the older epoch will always enter the next stage earlier than the data
Expand All @@ -467,17 +464,19 @@ impl SealedData {
/// Based on the two points above, we can conclude that, if two pieces of data with the same key appear in
/// different epochs, the data of the older epoch will not appear at a later stage than the data
/// of the newer epoch. Therefore, we can safely merge the data of each stage when we seal an epoch.
fn seal_new_epoch(&mut self, epoch: HummockEpoch, mut unseal_epoch_data: EpochData) {
if let Some(prev_max_sealed_epoch) = self.epochs.front() {
fn add_new_epoch(&mut self, epoch: HummockEpoch, mut unseal_epoch_data: EpochData) {
if let Some(prev_max_epoch) = self.epochs.front() {
assert!(
epoch > *prev_max_sealed_epoch,
"epoch {} to seal not greater than prev max sealed epoch {}",
epoch > *prev_max_epoch,
"epoch {} to seal not greater than prev max epoch {}",
epoch,
prev_max_sealed_epoch
prev_max_epoch
);
}

self.epochs.push_front(epoch);
// for each local instance, earlier data must be spilled at an earlier epoch. Therefore, since we add spill data from old epoch
// to new epoch, the newly added data is newer than the existing data and is placed at the front.
unseal_epoch_data
.spilled_data
.uploading_tasks
@@ -692,6 +691,10 @@ impl LocalInstanceUnsyncData {
}

#[derive(Default)]
/// Unsync data, which can be either imms or spilled ssts, plus some aggregated epoch information.
///
/// `instance_data` holds the imms of each individual local instance, and data are first added here.
/// The aggregated epoch information (table watermarks, etc.) and the spilled ssts are added to `epoch_data`.
struct UnsyncData {
instance_data: HashMap<LocalInstanceId, LocalInstanceUnsyncData>,
// An index of `instance_data` that maintains the set of existing instance id of a table id
Expand Down Expand Up @@ -766,18 +769,15 @@ impl UnsyncData {
}
}

fn sync(
&mut self,
epoch: HummockEpoch,
) -> (HashMap<LocalInstanceId, Vec<UploaderImm>>, SealedData) {
fn sync(&mut self, epoch: HummockEpoch, context: &UploaderContext) -> SyncDataBuilder {
let mut sync_epoch_data = self.epoch_data.split_off(&(epoch + 1));
swap(&mut sync_epoch_data, &mut self.epoch_data);

// by now, all epoch data up to and including `epoch` is in `sync_epoch_data`

let mut sealed_data = SealedData::default();
let mut sync_data = SyncDataBuilder::default();
for (epoch, epoch_data) in sync_epoch_data {
sealed_data.seal_new_epoch(epoch, epoch_data);
sync_data.add_new_epoch(epoch, epoch_data);
}

let mut flush_payload = HashMap::new();
@@ -787,7 +787,8 @@ impl UnsyncData {
flush_payload.insert(*instance_id, payload);
}
}
(flush_payload, sealed_data)
sync_data.flush(context, flush_payload);
sync_data
}
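// Illustrative sketch (hypothetical names, not part of the uploader itself): the
// `split_off(&(epoch + 1))` + `swap` idiom used in `sync` above partitions a `BTreeMap`
// so that one map keeps every entry with key <= epoch while the other keeps the newer
// entries. A minimal standalone example with plain u64 keys:
fn _split_off_swap_sketch() {
    use std::collections::BTreeMap;
    use std::mem::swap;
    let mut epoch_data: BTreeMap<u64, &str> = BTreeMap::from([(1, "a"), (2, "b"), (3, "c")]);
    let epoch: u64 = 2;
    // `split_off` returns the entries with key >= epoch + 1 and keeps the rest in `epoch_data`
    let mut to_sync = epoch_data.split_off(&(epoch + 1));
    // after the swap, `to_sync` holds the entries with key <= epoch, which is what `sync` needs
    swap(&mut to_sync, &mut epoch_data);
    assert_eq!(to_sync.keys().copied().collect::<Vec<_>>(), vec![1, 2]);
    assert_eq!(epoch_data.keys().copied().collect::<Vec<_>>(), vec![3]);
}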

fn ack_flushed(&mut self, sstable_info: &StagingSstableInfo) {
@@ -876,14 +877,13 @@ enum UploaderState {

/// An uploader for hummock data.
///
/// Data have 4 sequential stages: unsealed, sealed, syncing, synced.
/// Data have 3 sequential stages: unsync (within each local instance, data can be either unsealed or sealed), syncing, synced.
///
/// The 4 stages are divided by 3 marginal epochs: `max_sealed_epoch`, `max_syncing_epoch`,
/// The 3 stages are divided by 2 marginal epochs: `max_syncing_epoch`,
/// `max_synced_epoch`. Epochs satisfy the following inequality.
///
/// (epochs of `synced_data`) <= `max_synced_epoch` < (epochs of `syncing_data`) <=
/// `max_syncing_epoch` < (epochs of `sealed_data`) <= `max_sealed_epoch` < (epochs of
/// `unsealed_data`)
/// `max_syncing_epoch` < (epochs of `unsync_data`)
///
/// Data are mostly stored in `VecDeque`, and the order stored in the `VecDeque` indicates the data
/// order. Data at the front represents ***newer*** data.
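// Illustrative sketch (hypothetical epochs, not from the uploader itself): plugging concrete
// numbers into the inequality above, e.g. synced epochs {1, 2, 3} <= max_synced_epoch (3)
// < syncing epochs {4, 5} <= max_syncing_epoch (5) < unsync epochs {6, 7}. The check below
// simply restates that invariant with plain u64 epochs:
fn _stage_epoch_invariant_sketch() {
    let (synced, syncing, unsync): (Vec<u64>, Vec<u64>, Vec<u64>) =
        (vec![1, 2, 3], vec![4, 5], vec![6, 7]);
    let (max_synced_epoch, max_syncing_epoch): (u64, u64) = (3, 5);
    assert!(synced.iter().all(|e| *e <= max_synced_epoch));
    assert!(syncing.iter().all(|e| max_synced_epoch < *e && *e <= max_syncing_epoch));
    assert!(unsync.iter().all(|e| max_syncing_epoch < *e));
}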
@@ -1005,19 +1005,17 @@ impl HummockUploader {

self.max_syncing_epoch = epoch;

let (payload, mut sealed_data) = data.unsync_data.sync(epoch);
let sync_data = data.unsync_data.sync(epoch, &self.context);

sealed_data.flush(&self.context, payload);

let SealedData {
let SyncDataBuilder {
spilled_data:
SpilledData {
uploading_tasks,
uploaded_data,
},
table_watermarks,
..
} = sealed_data;
} = sync_data;

let try_join_all_upload_task = if uploading_tasks.is_empty() {
None
@@ -1067,11 +1065,15 @@ impl HummockUploader {
}
if self.max_syncing_epoch < max_committed_epoch {
self.max_syncing_epoch = max_committed_epoch;
// // there must not be any sealed data below MCE
// if let Some(&epoch) = self.sealed_data.epochs.back() {
// assert_gt!(epoch, max_committed_epoch);
// }
// TODO: may check each instance
if let UploaderState::Working(data) = &self.state {
for instance_data in data.unsync_data.instance_data.values() {
if let Some(oldest_epoch) = instance_data.sealed_data.back() {
assert_gt!(oldest_epoch.epoch, max_committed_epoch);
} else if let Some(current_epoch) = &instance_data.current_epoch_data {
assert_gt!(current_epoch.epoch, max_committed_epoch);
}
}
}
}
}
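// Illustrative sketch (hypothetical names, not from the uploader itself): the check above relies
// on the file-wide convention that newer data sits at the front of a `VecDeque`, so `back()`
// yields the oldest epoch held by an instance. A minimal standalone illustration:
fn _newest_at_front_sketch() {
    use std::collections::VecDeque;
    let mut sealed_epochs: VecDeque<u64> = VecDeque::new();
    for epoch in [3u64, 4, 5] {
        // newer epochs are pushed to the front, matching "newer epochs come first"
        sealed_epochs.push_front(epoch);
    }
    assert_eq!(sealed_epochs.front(), Some(&5)); // newest epoch
    assert_eq!(sealed_epochs.back(), Some(&3)); // oldest epoch, the one compared against the MCE
}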

@@ -1178,17 +1180,14 @@ impl UploaderData {
}
}

/// Poll the success of the oldest spilled task of unsealed data. Return `Poll::Ready(None)` if
/// Poll the success of the oldest spill task of the unsync data. Return `Poll::Ready(None)` if
/// there is no spilling task.
fn poll_unsealed_spill_task(
&mut self,
cx: &mut Context<'_>,
) -> Poll<Option<StagingSstableInfo>> {
fn poll_spill_task(&mut self, cx: &mut Context<'_>) -> Poll<Option<StagingSstableInfo>> {
// iterate from older epochs to newer epochs so that the spill tasks are finished in epoch order
for unsealed_data in self.unsync_data.epoch_data.values_mut() {
// if None, there is no spilling task. Search for the unsealed data of the next epoch in
for epoch_data in self.unsync_data.epoch_data.values_mut() {
// if None, there is no spilling task. Search for the unsync data of the next epoch in
// the next iteration.
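// Note that `ready!` makes this method return `Poll::Pending` as soon as an older epoch still
// has an in-flight spill task, so spill results are surfaced strictly in epoch order.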
if let Some(sstable_info) = ready!(unsealed_data.spilled_data.poll_success_spill(cx)) {
if let Some(sstable_info) = ready!(epoch_data.spilled_data.poll_success_spill(cx)) {
self.unsync_data.ack_flushed(&sstable_info);
return Poll::Ready(Some(sstable_info));
}
@@ -1242,7 +1241,7 @@ impl HummockUploader {
};
}

if let Some(sstable_info) = ready!(data.poll_unsealed_spill_task(cx)) {
if let Some(sstable_info) = ready!(data.poll_spill_task(cx)) {
return Poll::Ready(UploaderEvent::DataSpilled(sstable_info));
}

@@ -1651,9 +1650,7 @@ pub(crate) mod tests {
assert!(poll_fn(|cx| data.poll_syncing_task(cx, &uploader.context))
.await
.is_none());
assert!(poll_fn(|cx| data.poll_unsealed_spill_task(cx))
.await
.is_none());
assert!(poll_fn(|cx| data.poll_spill_task(cx)).await.is_none());
}
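// Illustrative sketch (hypothetical, not from the test suite): the test above drives the
// poll-style uploader methods with a `poll_fn` helper (e.g. `std::future::poll_fn`), which
// turns a closure over `&mut Context<'_>` into an awaitable future. A minimal example:
async fn _poll_fn_sketch() {
    use std::future::poll_fn;
    use std::task::Poll;
    let mut polled_once = false;
    let value = poll_fn(|cx| {
        if polled_once {
            Poll::Ready(42)
        } else {
            polled_once = true;
            // ask the executor to poll this future again right away
            cx.waker().wake_by_ref();
            Poll::Pending
        }
    })
    .await;
    assert_eq!(value, 42);
}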

#[tokio::test]