Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: Add a config to disbale storage iter prefetch #12642

Closed
wants to merge 2 commits into from
Closed
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions src/common/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -582,6 +582,8 @@ pub struct StorageConfig {
pub enable_fast_compaction: bool,
#[serde(default, flatten)]
pub unrecognized: Unrecognized<Self>,
#[serde(default = "default::storage::disable_iter_prefetch")]
pub disable_iter_prefetch: bool,
}

#[derive(Clone, Debug, Serialize, Deserialize, DefaultFromSerde)]
Expand Down Expand Up @@ -1062,6 +1064,10 @@ pub mod default {
pub fn enable_fast_compaction() -> bool {
true
}

pub fn disable_iter_prefetch() -> bool {
false
}
}

pub mod streaming {
Expand Down
8 changes: 6 additions & 2 deletions src/jni_core/src/hummock_iterator.rs
Original file line number Diff line number Diff line change
Expand Up @@ -86,8 +86,12 @@ impl HummockJavaBindingIterator {
FileCache::none(),
FileCache::none(),
));
let reader =
HummockVersionReader::new(sstable_store, Arc::new(HummockStateStoreMetrics::unused()));
let disable_iter_prefetch = false;
let reader = HummockVersionReader::new(
sstable_store,
Arc::new(HummockStateStoreMetrics::unused()),
disable_iter_prefetch,
);

let mut streams = Vec::with_capacity(read_plan.vnode_ids.len());
let key_range = read_plan.key_range.unwrap();
Expand Down
1 change: 1 addition & 0 deletions src/storage/src/hummock/store/hummock_storage.rs
Original file line number Diff line number Diff line change
Expand Up @@ -198,6 +198,7 @@ impl HummockStorage {
hummock_version_reader: HummockVersionReader::new(
sstable_store,
state_store_metrics.clone(),
options.disable_iter_prefetch,
),
_shutdown_guard: Arc::new(HummockStorageShutdownGuard {
shutdown_sender: event_tx,
Expand Down
10 changes: 9 additions & 1 deletion src/storage/src/hummock/store/version.rs
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@ use crate::mem_table::{ImmId, ImmutableMemtable};
use crate::monitor::{
GetLocalMetricsGuard, HummockStateStoreMetrics, MayExistLocalMetricsGuard, StoreLocalStatistic,
};
use crate::opts::StorageOpts;
use crate::store::{gen_min_epoch, ReadOptions, StateStoreIterExt, StreamTypeOfIter};

// TODO: use a custom data structure to allow in-place update instead of proto
Expand Down Expand Up @@ -514,6 +515,8 @@ pub struct HummockVersionReader {

/// Statistics
state_store_metrics: Arc<HummockStateStoreMetrics>,

disable_iter_prefetch: bool,
}

/// use `HummockVersionReader` to reuse `get` and `iter` implement for both `batch_query` and
Expand All @@ -522,10 +525,12 @@ impl HummockVersionReader {
pub fn new(
sstable_store: SstableStoreRef,
state_store_metrics: Arc<HummockStateStoreMetrics>,
disable_iter_prefetch: bool,
) -> Self {
Self {
sstable_store,
state_store_metrics,
disable_iter_prefetch,
}
}

Expand Down Expand Up @@ -684,9 +689,12 @@ impl HummockVersionReader {
&self,
table_key_range: TableKeyRange,
epoch: u64,
read_options: ReadOptions,
mut read_options: ReadOptions,
read_version_tuple: (Vec<ImmutableMemtable>, Vec<SstableInfo>, CommittedVersion),
) -> StorageResult<StreamTypeOfIter<HummockStorageIterator>> {
if self.disable_iter_prefetch {
read_options.disable_prefetch();
}
let table_id_string = read_options.table_id.to_string();
let table_id_label = table_id_string.as_str();
let (imms, uncommitted_ssts, committed) = read_version_tuple;
Expand Down
4 changes: 4 additions & 0 deletions src/storage/src/opts.rs
Original file line number Diff line number Diff line change
Expand Up @@ -119,6 +119,9 @@ pub struct StorageOpts {
pub compactor_max_sst_size: u64,
/// enable FastCompactorRunner.
pub enable_fast_compaction: bool,
/// false: enable block-level prefetch when `PrefetchOptions` is set
/// true: disable block-level prefetch
pub disable_iter_prefetch: bool,
}

impl Default for StorageOpts {
Expand Down Expand Up @@ -216,6 +219,7 @@ impl From<(&RwConfig, &SystemParamsReader, &StorageMemoryConfig)> for StorageOpt
compactor_max_task_multiplier: c.storage.compactor_max_task_multiplier,
compactor_max_sst_size: c.storage.compactor_max_sst_size,
enable_fast_compaction: c.storage.enable_fast_compaction,
disable_iter_prefetch: c.storage.disable_iter_prefetch,
}
}
}
6 changes: 6 additions & 0 deletions src/storage/src/store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -347,6 +347,12 @@ impl From<ReadOptions> for TracedReadOptions {
}
}

impl ReadOptions {
pub fn disable_prefetch(&mut self) {
self.prefetch_options.exhaust_iter = false;
}
}

pub fn gen_min_epoch(base_epoch: u64, retention_seconds: Option<&u32>) -> u64 {
let base_epoch = Epoch(base_epoch);
match retention_seconds {
Expand Down