Skip to content

Commit

Permalink
fix(storage): lock table may cause deadlock and block iter (#13979)
Browse files Browse the repository at this point in the history
Signed-off-by: Little-Wallace <[email protected]>
  • Loading branch information
Little-Wallace authored Dec 14, 2023
1 parent 8b2b3f7 commit d8dd8f4
Show file tree
Hide file tree
Showing 3 changed files with 5 additions and 68 deletions.
6 changes: 5 additions & 1 deletion src/storage/src/hummock/sstable/forward_sstable_iterator.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ use std::cmp::Ordering::{Equal, Less};
use std::ops::Bound::*;
use std::sync::Arc;

use await_tree::InstrumentAwait;
use risingwave_hummock_sdk::key::FullKey;

use super::super::{HummockResult, HummockValue};
Expand Down Expand Up @@ -133,7 +134,9 @@ impl SstableIterator {
.prefetch_blocks(self.sst.value(), idx, self.preload_end_block_idx,
self.options.cache_policy,
&mut self.stats,
).await {
)
.verbose_instrument_await("prefetch_blocks")
.await {
Ok(preload_stream) => self.preload_stream = Some(preload_stream),
Err(e) => tracing::warn!("failed to create stream for prefetch data because of {:?}, fall back to block get.", e),
}
Expand Down Expand Up @@ -190,6 +193,7 @@ impl SstableIterator {
self.options.cache_policy,
&mut self.stats,
)
.verbose_instrument_await("prefetch_blocks")
.await
{
Ok(stream) => {
Expand Down
12 changes: 0 additions & 12 deletions src/storage/src/hummock/sstable_store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,6 @@ use crate::hummock::block_stream::{
};
use crate::hummock::file_cache::preclude::*;
use crate::hummock::multi_builder::UploadJoinHandle;
use crate::hummock::utils::LockTable;
use crate::hummock::{
BlockHolder, CacheableEntry, HummockError, HummockResult, LruCache, MemoryLimiter,
};
Expand Down Expand Up @@ -171,7 +170,6 @@ pub struct SstableStore {
///
/// `blk_idx == USIZE::MAX` stands for `sst_obj_id` only entry.
recent_filter: Option<Arc<RecentFilter<(HummockSstableObjectId, usize)>>>,
prefetch_lock_table: LockTable,
prefetch_buffer_usage: Arc<AtomicUsize>,
prefetch_buffer_capacity: usize,
}
Expand Down Expand Up @@ -218,7 +216,6 @@ impl SstableStore {
meta_file_cache,

recent_filter,
prefetch_lock_table: LockTable::default(),
prefetch_buffer_usage: Arc::new(AtomicUsize::new(0)),
prefetch_buffer_capacity,
}
Expand All @@ -240,7 +237,6 @@ impl SstableStore {
meta_cache,
data_file_cache: FileCache::none(),
meta_file_cache: FileCache::none(),
prefetch_lock_table: LockTable::default(),
prefetch_buffer_usage: Arc::new(AtomicUsize::new(0)),
prefetch_buffer_capacity: block_cache_capacity,
recent_filter: None,
Expand Down Expand Up @@ -328,14 +324,6 @@ impl SstableStore {
None,
)));
}
let _guard = self.prefetch_lock_table.lock_for(object_id).await;
if let Some(block) = self.block_cache.get(object_id, block_index as u64) {
return Ok(Box::new(PrefetchBlockStream::new(
VecDeque::from([block]),
block_index,
None,
)));
}
let end_index = std::cmp::min(end_index, block_index + MAX_PREFETCH_BLOCK);
let mut end_index = std::cmp::min(end_index, sst.meta.block_metas.len());
let start_offset = sst.meta.block_metas[block_index].offset as usize;
Expand Down
55 changes: 0 additions & 55 deletions src/storage/src/hummock/utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,6 @@
// limitations under the License.

use std::cmp::Ordering;
use std::collections::{HashMap, VecDeque};
use std::fmt::{Debug, Formatter};
use std::ops::Bound::{Excluded, Included, Unbounded};
use std::ops::{Bound, RangeBounds};
Expand All @@ -22,7 +21,6 @@ use std::sync::Arc;
use std::time::Duration;

use bytes::Bytes;
use parking_lot::Mutex;
use risingwave_common::cache::CachePriority;
use risingwave_common::catalog::{TableId, TableOption};
use risingwave_hummock_sdk::key::{
Expand Down Expand Up @@ -597,59 +595,6 @@ pub(crate) async fn wait_for_epoch(
}
}

#[derive(Clone)]
pub struct LockTable {
locks: Arc<Mutex<HashMap<u64, VecDeque<Arc<Notify>>>>>,
}

pub struct LockGuard {
key: u64,
locks: Arc<Mutex<HashMap<u64, VecDeque<Arc<Notify>>>>>,
}

impl Default for LockTable {
fn default() -> Self {
Self {
locks: Arc::new(Mutex::new(HashMap::default())),
}
}
}

impl LockTable {
pub async fn lock_for(&self, key: u64) -> LockGuard {
let notify = {
let mut locks = self.locks.lock();
if let Some(que) = locks.get_mut(&key) {
let notify = Arc::new(Notify::new());
que.push_back(notify.clone());
Some(notify)
} else {
locks.insert(key, VecDeque::default());
None
}
};
if let Some(notify) = notify {
notify.notified().await;
}
LockGuard {
key,
locks: self.locks.clone(),
}
}
}

impl Drop for LockGuard {
fn drop(&mut self) {
let mut locks = self.locks.lock();
let que = locks.get_mut(&self.key).unwrap();
if let Some(notify) = que.pop_front() {
notify.notify_one();
} else {
locks.remove(&self.key);
}
}
}

#[cfg(test)]
mod tests {
use std::future::{poll_fn, Future};
Expand Down

0 comments on commit d8dd8f4

Please sign in to comment.