Skip to content

Commit

Permalink
Merge branch 'yiming/remove-sync-finish-event' into yiming/dag-uploader
Browse files Browse the repository at this point in the history
  • Loading branch information
wenym1 committed Jun 24, 2024
2 parents 8fe655f + 2874313 commit 7bd9c79
Show file tree
Hide file tree
Showing 4 changed files with 269 additions and 268 deletions.
114 changes: 34 additions & 80 deletions src/storage/src/hummock/event_handler/hummock_event_handler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.

use std::collections::{HashMap, HashSet, VecDeque};
use std::collections::{HashMap, VecDeque};
use std::pin::pin;
use std::sync::atomic::AtomicUsize;
use std::sync::atomic::Ordering::Relaxed;
Expand Down Expand Up @@ -43,7 +43,7 @@ use crate::hummock::compactor::{await_tree_key, compact, CompactorContext};
use crate::hummock::conflict_detector::ConflictDetector;
use crate::hummock::event_handler::refiller::{CacheRefillerEvent, SpawnRefillTask};
use crate::hummock::event_handler::uploader::{
HummockUploader, SpawnUploadTask, SyncedData, UploadTaskInfo, UploadTaskOutput, UploaderEvent,
HummockUploader, SpawnUploadTask, SyncedData, UploadTaskInfo, UploadTaskOutput,
};
use crate::hummock::event_handler::{
HummockEvent, HummockReadVersionRef, HummockVersionUpdate, ReadOnlyReadVersionMapping,
Expand Down Expand Up @@ -184,8 +184,7 @@ impl HummockEventReceiver {
}

struct HummockEventHandlerMetrics {
event_handler_on_sync_finish_latency: Histogram,
event_handler_on_spilled_latency: Histogram,
event_handler_on_upload_finish_latency: Histogram,
event_handler_on_apply_version_update: Histogram,
event_handler_on_recv_version_update: Histogram,
}
Expand Down Expand Up @@ -329,12 +328,9 @@ impl HummockEventHandler {
let write_conflict_detector = ConflictDetector::new_from_config(storage_opts);

let metrics = HummockEventHandlerMetrics {
event_handler_on_sync_finish_latency: state_store_metrics
event_handler_on_upload_finish_latency: state_store_metrics
.event_handler_latency
.with_label_values(&["on_sync_finish"]),
event_handler_on_spilled_latency: state_store_metrics
.event_handler_latency
.with_label_values(&["on_spilled"]),
.with_label_values(&["on_upload_finish"]),
event_handler_on_apply_version_update: state_store_metrics
.event_handler_latency
.with_label_values(&["apply_version"]),
Expand Down Expand Up @@ -396,40 +392,6 @@ impl HummockEventHandler {

// Handler for different events
impl HummockEventHandler {
fn handle_epoch_synced(&mut self, epoch: HummockEpoch, data: SyncedData) {
debug!("epoch has been synced: {}.", epoch);
let SyncedData {
newly_upload_ssts: newly_uploaded_sstables,
..
} = data;
{
let newly_uploaded_sstables = newly_uploaded_sstables
.into_iter()
.map(Arc::new)
.collect_vec();
if !newly_uploaded_sstables.is_empty() {
let related_instance_ids: HashSet<_> = newly_uploaded_sstables
.iter()
.flat_map(|sst| sst.imm_ids().keys().cloned())
.collect();
self.for_each_read_version(related_instance_ids, |instance_id, read_version| {
newly_uploaded_sstables
.iter()
// Take rev because newer data come first in `newly_uploaded_sstables` but we apply
// older data first
.rev()
.for_each(|staging_sstable_info| {
if staging_sstable_info.imm_ids().contains_key(&instance_id) {
read_version.update(VersionUpdate::Staging(StagingData::Sst(
staging_sstable_info.clone(),
)));
}
});
});
}
};
}

/// This function will be performed under the protection of the `read_version_mapping` read
/// lock, and add write lock on each `read_version` operation
fn for_each_read_version(
Expand All @@ -441,7 +403,7 @@ impl HummockEventHandler {
#[cfg(debug_assertions)]
{
// check duplication on debug_mode
let mut id_set = HashSet::new();
let mut id_set = std::collections::HashSet::new();
for instance in instances {
assert!(id_set.insert(instance));
}
Expand Down Expand Up @@ -488,9 +450,8 @@ impl HummockEventHandler {
}
}

fn handle_data_spilled(&mut self, staging_sstable_info: StagingSstableInfo) {
let staging_sstable_info = Arc::new(staging_sstable_info);
trace!("data_spilled. SST size {}", staging_sstable_info.imm_size());
fn handle_uploaded_sst_inner(&mut self, staging_sstable_info: Arc<StagingSstableInfo>) {
trace!("data_flushed. SST size {}", staging_sstable_info.imm_size());
self.for_each_read_version(
staging_sstable_info.imm_ids().keys().cloned(),
|_, read_version| {
Expand All @@ -504,7 +465,7 @@ impl HummockEventHandler {
fn handle_sync_epoch(
&mut self,
new_sync_epoch: HummockEpoch,
sync_result_sender: oneshot::Sender<HummockResult<SyncResult>>,
sync_result_sender: oneshot::Sender<HummockResult<SyncedData>>,
) {
debug!(
"awaiting for epoch to be synced: {}, max_synced_epoch: {}",
Expand Down Expand Up @@ -715,8 +676,8 @@ impl HummockEventHandler {
pub async fn start_hummock_event_handler_worker(mut self) {
loop {
tokio::select! {
event = self.uploader.next_event() => {
self.handle_uploader_event(event);
sst = self.uploader.next_uploaded_sst() => {
self.handle_uploaded_sst(sst);
}
event = self.refiller.next_event() => {
let CacheRefillerEvent {pinned_version, new_pinned_version } = event;
Expand Down Expand Up @@ -748,21 +709,12 @@ impl HummockEventHandler {
}
}

fn handle_uploader_event(&mut self, event: UploaderEvent) {
match event {
UploaderEvent::SyncFinish(epoch, data) => {
let _timer = self
.metrics
.event_handler_on_sync_finish_latency
.start_timer();
self.handle_epoch_synced(epoch, data);
}

UploaderEvent::DataSpilled(staging_sstable_info) => {
let _timer = self.metrics.event_handler_on_spilled_latency.start_timer();
self.handle_data_spilled(staging_sstable_info);
}
}
fn handle_uploaded_sst(&mut self, sst: Arc<StagingSstableInfo>) {
let _timer = self
.metrics
.event_handler_on_upload_finish_latency
.start_timer();
self.handle_uploaded_sst_inner(sst);
}

/// Gracefully shutdown if returns `true`.
Expand Down Expand Up @@ -925,21 +877,27 @@ impl HummockEventHandler {
}

pub(super) fn send_sync_result(
sender: oneshot::Sender<HummockResult<SyncResult>>,
result: HummockResult<&SyncedData>,
sender: oneshot::Sender<HummockResult<SyncedData>>,
result: HummockResult<SyncedData>,
) {
let result = result.map(
|SyncedData {
newly_upload_ssts,
uploaded_ssts,
table_watermarks,
}| {
let _ = sender.send(result).inspect_err(|e| {
error!("unable to send sync result. Err: {:?}", e);
});
}

impl SyncedData {
pub fn into_sync_result(self) -> SyncResult {
{
let SyncedData {
uploaded_ssts,
table_watermarks,
} = self;
let mut sync_size = 0;
let mut uncommitted_ssts = Vec::new();
let mut old_value_ssts = Vec::new();
// The newly uploaded `sstable_infos` contains newer data. Therefore,
// `newly_upload_ssts` at the front
for sst in newly_upload_ssts.iter().chain(uploaded_ssts.iter()) {
for sst in uploaded_ssts {
sync_size += sst.imm_size();
uncommitted_ssts.extend(sst.sstable_infos().iter().cloned());
old_value_ssts.extend(sst.old_value_sstable_infos().iter().cloned());
Expand All @@ -950,12 +908,8 @@ pub(super) fn send_sync_result(
table_watermarks: table_watermarks.clone(),
old_value_ssts,
}
},
);

let _ = sender.send(result).inspect_err(|e| {
error!("unable to send sync result. Err: {:?}", e);
});
}
}
}

#[cfg(test)]
Expand Down
5 changes: 3 additions & 2 deletions src/storage/src/hummock/event_handler/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ use std::sync::Arc;
use parking_lot::{RwLock, RwLockReadGuard};
use risingwave_common::buffer::Bitmap;
use risingwave_common::catalog::TableId;
use risingwave_hummock_sdk::{HummockEpoch, SyncResult};
use risingwave_hummock_sdk::HummockEpoch;
use thiserror_ext::AsReport;
use tokio::sync::oneshot;

Expand All @@ -36,6 +36,7 @@ use risingwave_hummock_sdk::version::{HummockVersion, HummockVersionDelta};

use super::store::version::HummockReadVersion;
use crate::hummock::event_handler::hummock_event_handler::HummockEventSender;
use crate::hummock::event_handler::uploader::SyncedData;

#[derive(Debug)]
pub struct BufferWriteRequest {
Expand All @@ -59,7 +60,7 @@ pub enum HummockEvent {
/// handle sender.
SyncEpoch {
new_sync_epoch: HummockEpoch,
sync_result_sender: oneshot::Sender<HummockResult<SyncResult>>,
sync_result_sender: oneshot::Sender<HummockResult<SyncedData>>,
},

/// Clear shared buffer and reset all states
Expand Down
Loading

0 comments on commit 7bd9c79

Please sign in to comment.