Skip to content

Commit

Permalink
fix(storage): fix race between deregister read version and clear shar…
Browse files Browse the repository at this point in the history
…ed buffer (#15738)
  • Loading branch information
wenym1 authored Mar 18, 2024
1 parent 63b273b commit 0e8d800
Show file tree
Hide file tree
Showing 2 changed files with 33 additions and 19 deletions.
17 changes: 14 additions & 3 deletions src/storage/src/hummock/event_handler/hummock_event_handler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ use itertools::Itertools;
use parking_lot::RwLock;
use prometheus::core::{AtomicU64, GenericGauge};
use prometheus::{Histogram, IntGauge};
use risingwave_common::catalog::TableId;
use risingwave_hummock_sdk::compaction_group::hummock_version_ext::SstDeltaInfo;
use risingwave_hummock_sdk::{HummockEpoch, LocalSstableInfo};
use thiserror_ext::AsReport;
Expand Down Expand Up @@ -894,15 +895,17 @@ impl HummockEventHandler {
LocalInstanceGuard {
table_id,
instance_id,
event_sender: self.hummock_event_tx.clone(),
event_sender: Some(self.hummock_event_tx.clone()),
},
)) {
Ok(_) => {}
Err(_) => {
Err((_, mut guard)) => {
warn!(
"RegisterReadVersion send fail table_id {:?} instance_is {:?}",
table_id, instance_id
)
);
guard.event_sender.take().expect("sender is just set");
self.destroy_read_version(table_id, instance_id);
}
}
}
Expand All @@ -911,6 +914,14 @@ impl HummockEventHandler {
table_id,
instance_id,
} => {
self.destroy_read_version(table_id, instance_id);
}
}
}

fn destroy_read_version(&mut self, table_id: TableId, instance_id: LocalInstanceId) {
{
{
debug!(
"read version deregister: table_id: {}, instance_id: {}",
table_id, instance_id
Expand Down
35 changes: 19 additions & 16 deletions src/storage/src/hummock/event_handler/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -195,25 +195,28 @@ impl<T> ReadOnlyRwLockRef<T> {
pub struct LocalInstanceGuard {
pub table_id: TableId,
pub instance_id: LocalInstanceId,
event_sender: HummockEventSender,
// Only send destroy event when event_sender when is_some
event_sender: Option<HummockEventSender>,
}

impl Drop for LocalInstanceGuard {
fn drop(&mut self) {
// If sending fails, it means that event_handler and event_channel have been destroyed, no
// need to handle failure
self.event_sender
.send(HummockEvent::DestroyReadVersion {
table_id: self.table_id,
instance_id: self.instance_id,
})
.unwrap_or_else(|err| {
tracing::error!(
error = %err.as_report(),
table_id = %self.table_id,
instance_id = self.instance_id,
"LocalInstanceGuard Drop SendError",
)
})
if let Some(sender) = self.event_sender.take() {
// If sending fails, it means that event_handler and event_channel have been destroyed, no
// need to handle failure
sender
.send(HummockEvent::DestroyReadVersion {
table_id: self.table_id,
instance_id: self.instance_id,
})
.unwrap_or_else(|err| {
tracing::error!(
error = %err.as_report(),
table_id = %self.table_id,
instance_id = self.instance_id,
"LocalInstanceGuard Drop SendError",
)
})
}
}
}

0 comments on commit 0e8d800

Please sign in to comment.