Skip to content

Commit

Permalink
refactor(storage): simplify associated type in state store trait (#10756
Browse files Browse the repository at this point in the history
)
  • Loading branch information
wenym1 authored Jul 6, 2023
1 parent 33a97fb commit 840114a
Show file tree
Hide file tree
Showing 10 changed files with 639 additions and 754 deletions.
137 changes: 65 additions & 72 deletions src/storage/src/hummock/state_store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -39,9 +39,7 @@ use crate::hummock::store::version::read_filter_for_batch;
use crate::hummock::{HummockEpoch, HummockError};
use crate::monitor::StoreLocalStatistic;
use crate::store::*;
use crate::{
define_state_store_associated_type, define_state_store_read_associated_type, StateStore,
};
use crate::StateStore;

impl HummockStorage {
/// Gets the value of a specified `key` in the table specified in `read_options`.
Expand Down Expand Up @@ -156,9 +154,12 @@ impl HummockStorage {
impl StateStoreRead for HummockStorage {
type IterStream = StreamTypeOfIter<HummockStorageIterator>;

define_state_store_read_associated_type!();

fn get(&self, key: Bytes, epoch: u64, read_options: ReadOptions) -> Self::GetFuture<'_> {
fn get(
&self,
key: Bytes,
epoch: u64,
read_options: ReadOptions,
) -> impl Future<Output = StorageResult<Option<Bytes>>> + '_ {
self.get(key, epoch, read_options)
}

Expand All @@ -167,87 +168,79 @@ impl StateStoreRead for HummockStorage {
key_range: IterKeyRange,
epoch: u64,
read_options: ReadOptions,
) -> Self::IterFuture<'_> {
) -> impl Future<Output = StorageResult<Self::IterStream>> + '_ {
self.iter_inner(map_table_key_range(key_range), epoch, read_options)
}
}

impl StateStore for HummockStorage {
type Local = LocalHummockStorage;

type NewLocalFuture<'a> = impl Future<Output = Self::Local> + Send + 'a;

define_state_store_associated_type!();

/// Waits until the local hummock version contains the epoch. If `wait_epoch` is `Current`,
/// we will only check whether it is le `sealed_epoch` and won't wait.
fn try_wait_epoch(&self, wait_epoch: HummockReadEpoch) -> Self::WaitEpochFuture<'_> {
async move {
self.validate_read_epoch(wait_epoch)?;
let wait_epoch = match wait_epoch {
HummockReadEpoch::Committed(epoch) => {
assert_ne!(epoch, HummockEpoch::MAX, "epoch should not be u64::MAX");
epoch
}
_ => return Ok(()),
};
let mut receiver = self.version_update_notifier_tx.subscribe();
// avoid unnecessary check in the loop if the value does not change
let max_committed_epoch = *receiver.borrow_and_update();
if max_committed_epoch >= wait_epoch {
return Ok(());
async fn try_wait_epoch(&self, wait_epoch: HummockReadEpoch) -> StorageResult<()> {
self.validate_read_epoch(wait_epoch)?;
let wait_epoch = match wait_epoch {
HummockReadEpoch::Committed(epoch) => {
assert_ne!(epoch, HummockEpoch::MAX, "epoch should not be u64::MAX");
epoch
}
loop {
match tokio::time::timeout(Duration::from_secs(30), receiver.changed()).await {
Err(elapsed) => {
// The reason that we need to retry here is batch scan in
// chain/rearrange_chain is waiting for an
// uncommitted epoch carried by the CreateMV barrier, which
// can take unbounded time to become committed and propagate
// to the CN. We should consider removing the retry as well as wait_epoch
// for chain/rearrange_chain if we enforce
// chain/rearrange_chain to be scheduled on the same
// CN with the same distribution as the upstream MV.
// See #3845 for more details.
tracing::warn!(
"wait_epoch {:?} timeout when waiting for version update elapsed {:?}s",
wait_epoch,
elapsed
);
continue;
}
Ok(Err(_)) => {
return Err(HummockError::wait_epoch("tx dropped").into());
}
Ok(Ok(_)) => {
let max_committed_epoch = *receiver.borrow();
if max_committed_epoch >= wait_epoch {
return Ok(());
}
_ => return Ok(()),
};
let mut receiver = self.version_update_notifier_tx.subscribe();
// avoid unnecessary check in the loop if the value does not change
let max_committed_epoch = *receiver.borrow_and_update();
if max_committed_epoch >= wait_epoch {
return Ok(());
}
loop {
match tokio::time::timeout(Duration::from_secs(30), receiver.changed()).await {
Err(elapsed) => {
// The reason that we need to retry here is batch scan in
// chain/rearrange_chain is waiting for an
// uncommitted epoch carried by the CreateMV barrier, which
// can take unbounded time to become committed and propagate
// to the CN. We should consider removing the retry as well as wait_epoch
// for chain/rearrange_chain if we enforce
// chain/rearrange_chain to be scheduled on the same
// CN with the same distribution as the upstream MV.
// See #3845 for more details.
tracing::warn!(
"wait_epoch {:?} timeout when waiting for version update elapsed {:?}s",
wait_epoch,
elapsed
);
continue;
}
Ok(Err(_)) => {
return Err(HummockError::wait_epoch("tx dropped").into());
}
Ok(Ok(_)) => {
let max_committed_epoch = *receiver.borrow();
if max_committed_epoch >= wait_epoch {
return Ok(());
}
}
}
}
}

fn sync(&self, epoch: u64) -> Self::SyncFuture<'_> {
async move {
if epoch == INVALID_EPOCH {
warn!("syncing invalid epoch");
return Ok(SyncResult {
sync_size: 0,
uncommitted_ssts: vec![],
});
}
let (tx, rx) = oneshot::channel();
self.hummock_event_sender
.send(HummockEvent::AwaitSyncEpoch {
new_sync_epoch: epoch,
sync_result_sender: tx,
})
.expect("should send success");
Ok(rx.await.expect("should wait success")?)
async fn sync(&self, epoch: u64) -> StorageResult<SyncResult> {
if epoch == INVALID_EPOCH {
warn!("syncing invalid epoch");
return Ok(SyncResult {
sync_size: 0,
uncommitted_ssts: vec![],
});
}
let (tx, rx) = oneshot::channel();
self.hummock_event_sender
.send(HummockEvent::AwaitSyncEpoch {
new_sync_epoch: epoch,
sync_result_sender: tx,
})
.expect("should send success");
Ok(rx.await.expect("should wait success")?)
}

fn seal_epoch(&self, epoch: u64, is_checkpoint: bool) {
Expand Down Expand Up @@ -276,7 +269,7 @@ impl StateStore for HummockStorage {
StoreLocalStatistic::flush_all();
}

fn clear_shared_buffer(&self) -> Self::ClearSharedBufferFuture<'_> {
fn clear_shared_buffer(&self) -> impl Future<Output = StorageResult<()>> + Send + '_ {
self.min_current_epoch
.store(HummockEpoch::MAX, MemOrdering::SeqCst);
async move {
Expand All @@ -289,7 +282,7 @@ impl StateStore for HummockStorage {
}
}

fn new_local(&self, option: NewLocalOptions) -> Self::NewLocalFuture<'_> {
fn new_local(&self, option: NewLocalOptions) -> impl Future<Output = Self::Local> + Send + '_ {
self.new_local_inner(option)
}

Expand Down
Loading

0 comments on commit 840114a

Please sign in to comment.