feat(storage): enable preload for log store and dynamic filter #13558

Merged
merged 31 commits into main from wallace/more-preload on Dec 11, 2023
Changes from 10 commits
Commits (31)
7d0a80e
enable preload for log store
Little-Wallace Nov 21, 2023
c5f66e7
speed up tombstone reclaim
Little-Wallace Nov 21, 2023
4abeaa3
only prefetch small data for streaming executor
Little-Wallace Nov 23, 2023
6e42e38
Revert "speed up tombstone reclaim"
Little-Wallace Nov 23, 2023
c6a1c82
fix check
Little-Wallace Nov 23, 2023
cd8a949
only prefetch small data for streaming read
Little-Wallace Nov 23, 2023
3b532f5
Merge branch 'main' into wallace/more-preload
Little-Wallace Nov 23, 2023
5ef5e7b
fill more block
Little-Wallace Nov 23, 2023
8b1d10f
fix block index error
Little-Wallace Nov 23, 2023
e4dc5b9
support close prefetch
Little-Wallace Nov 23, 2023
cd5f2cb
limit select memory
Little-Wallace Nov 24, 2023
cb6bd7d
rename variable
Little-Wallace Nov 24, 2023
372c9e9
rename new method
Little-Wallace Nov 24, 2023
d45a926
Merge branch 'main' into wallace/more-preload
Little-Wallace Nov 24, 2023
9c6ab96
refactor streaming
Little-Wallace Nov 24, 2023
ba49e30
fix format
Little-Wallace Nov 25, 2023
bebc963
Squashed commit of the following:
Little-Wallace Nov 27, 2023
09499ff
Merge branch 'main' into wallace/more-preload
Little-Wallace Nov 27, 2023
01685da
fix format
Little-Wallace Nov 27, 2023
7970480
use one trait for both prefetch strategy
Little-Wallace Dec 1, 2023
05970f6
Merge branch 'main' into wallace/more-preload
Little-Wallace Dec 1, 2023
035a879
spawn for data stream
Little-Wallace Dec 7, 2023
29e516d
Merge branch 'main' into wallace/more-preload
Little-Wallace Dec 7, 2023
bdd467d
fix conflict
Little-Wallace Dec 7, 2023
e12ac31
use prefetch small scan
Little-Wallace Dec 7, 2023
29bad52
revert streaming read
Little-Wallace Dec 8, 2023
1597cdd
Merge branch 'main' into wallace/more-preload
Little-Wallace Dec 8, 2023
ed431d2
address comment
Little-Wallace Dec 11, 2023
6b782a3
address comment
Little-Wallace Dec 11, 2023
cd29165
fix comment
Little-Wallace Dec 11, 2023
400e7db
Merge branch 'main' into wallace/more-preload
Little-Wallace Dec 11, 2023
2 changes: 1 addition & 1 deletion src/batch/src/executor/row_seq_scan.rs
@@ -443,7 +443,7 @@ impl<S: StateStore> RowSeqScanExecutor<S> {
},
),
ordered,
PrefetchOptions::new_for_large_range_scan(),
PrefetchOptions::prefetch_for_large_range_scan(),
)
.await?;

6 changes: 5 additions & 1 deletion src/ctl/src/cmd_impl/bench.rs
@@ -108,7 +108,11 @@ pub async fn do_bench(context: &CtlContext, cmd: BenchCommands) -> Result<()> {
let sub_range: &(Bound<OwnedRow>, Bound<OwnedRow>) =
&(Unbounded, Unbounded);
let stream = state_table
.iter_with_prefix(row::empty(), sub_range, PrefetchOptions::default())
.iter_with_prefix(
row::empty(),
sub_range,
PrefetchOptions::prefetch_for_large_range_scan(),
)
.await?;
pin_mut!(stream);
iter_cnt.fetch_add(1, std::sync::atomic::Ordering::Relaxed);
19 changes: 10 additions & 9 deletions src/ctl/src/cmd_impl/hummock/list_kv.rs
@@ -14,10 +14,11 @@

use core::ops::Bound::Unbounded;

use futures::StreamExt;
use risingwave_common::catalog::TableId;
use risingwave_common::util::epoch::MAX_EPOCH;
use risingwave_storage::hummock::CachePolicy;
use risingwave_storage::store::{PrefetchOptions, ReadOptions, StateStoreReadExt};
use risingwave_storage::store::{PrefetchOptions, ReadOptions, StateStoreRead};

use crate::common::HummockServiceOpts;
use crate::CtlContext;
@@ -34,23 +35,23 @@ pub async fn list_kv(
if epoch == MAX_EPOCH {
tracing::info!("using MAX_EPOCH as epoch");
}
let scan_result = {
let range = (Unbounded, Unbounded);
let range = (Unbounded, Unbounded);
let mut scan_result = Box::pin(
hummock
.scan(
.iter(
range,
epoch,
None,
ReadOptions {
table_id: TableId { table_id },
prefetch_options: PrefetchOptions::default(),
prefetch_options: PrefetchOptions::prefetch_for_large_range_scan(),
cache_policy: CachePolicy::NotFill,
..Default::default()
},
)
.await?
};
for (k, v) in scan_result {
.await?,
);
while let Some(item) = scan_result.next().await {
let (k, v) = item?;
let print_string = format!("[t{}]", k.user_key.table_id.table_id());
println!("{} {:?} => {:?}", print_string, k, v)
}
2 changes: 1 addition & 1 deletion src/ctl/src/cmd_impl/table/scan.rs
@@ -124,7 +124,7 @@ async fn do_scan(table: TableCatalog, hummock: MonitoredStateStore<HummockStorag
.batch_iter(
HummockReadEpoch::Committed(read_epoch),
true,
PrefetchOptions::default(),
PrefetchOptions::prefetch_for_large_range_scan(),
)
.await?;
pin_mut!(stream);
1 change: 1 addition & 0 deletions src/storage/benches/bench_compactor.rs
@@ -229,6 +229,7 @@ fn bench_merge_iterator_compactor(c: &mut Criterion) {
let level2 = vec![info1, info2];
let read_options = Arc::new(SstableIteratorReadOptions {
cache_policy: CachePolicy::Fill(CachePriority::High),
prefetch_for_large_query: false,
must_iterated_end_user_key: None,
max_preload_retry_times: 0,
});
2 changes: 1 addition & 1 deletion src/storage/hummock_test/src/snapshot_tests.rs
@@ -44,7 +44,7 @@ macro_rules! assert_count_range_scan {
map_table_key_range(bounds),
$epoch,
ReadOptions {
prefetch_options: PrefetchOptions::new_for_large_range_scan(),
prefetch_options: PrefetchOptions::prefetch_for_large_range_scan(),
cache_policy: CachePolicy::Fill(CachePriority::High),
..Default::default()
},
6 changes: 5 additions & 1 deletion src/storage/hummock_trace/src/opts.rs
@@ -23,6 +23,7 @@ use crate::TracedBytes;
#[derive(Encode, Decode, PartialEq, Eq, Debug, Clone)]
pub struct TracedPrefetchOptions {
pub exhaust_iter: bool,
Little-Wallace marked this conversation as resolved.
pub for_large_query: bool,
}

#[derive(Encode, Decode, PartialEq, Eq, Debug, Clone)]
@@ -95,7 +96,10 @@ impl TracedReadOptions {
Self {
prefix_hint: Some(TracedBytes::from(vec![0])),
ignore_range_tombstone: true,
prefetch_options: TracedPrefetchOptions { exhaust_iter: true },
prefetch_options: TracedPrefetchOptions {
exhaust_iter: true,
for_large_query: true,
},
cache_policy: TracedCachePolicy::Disable,
retention_seconds: None,
table_id: TracedTableId { table_id },
42 changes: 27 additions & 15 deletions src/storage/src/hummock/sstable/forward_sstable_iterator.rs
@@ -125,11 +125,10 @@ impl SstableIterator {
return Ok(());
}
// Maybe the previous preload stream breaks on some cached block, so here we can try to preload some data again
if self.preload_stream.is_none()
&& idx + 1 < self.preload_end_block_idx
&& let Ok(preload_stream) = self
.sstable_store
.preload_blocks(self.sst.value(), idx, self.preload_end_block_idx)
if self.preload_stream.is_none() && idx + 1 < self.preload_end_block_idx
&& self.options.prefetch_for_large_query
&& let Ok(preload_stream) = self.sstable_store
.get_stream(self.sst.value(), idx, self.preload_end_block_idx)
.await
{
self.preload_stream = preload_stream;
@@ -176,7 +175,7 @@ impl SstableIterator {
tracing::warn!("recreate stream because the connection to remote storage has closed, reason: {:?}", e);
match self
.sstable_store
.preload_blocks(self.sst.value(), idx, self.preload_end_block_idx)
.get_stream(self.sst.value(), idx, self.preload_end_block_idx)
.await
{
Ok(stream) => {
Expand All @@ -191,15 +190,27 @@ impl SstableIterator {
}
}
if !hit_cache {
let block = self
.sstable_store
.get(
self.sst.value(),
idx,
self.options.cache_policy,
&mut self.stats,
)
.await?;
let block =
if idx + 1 < self.preload_end_block_idx && self.sstable_store.support_prefetch() {
self.sstable_store
.get_with_prefetch(
self.sst.value(),
idx,
self.preload_end_block_idx - idx,
self.options.cache_policy,
&mut self.stats,
)
.await?
} else {
self.sstable_store
.get(
self.sst.value(),
idx,
self.options.cache_policy,
&mut self.stats,
)
.await?
};
self.block_iter = Some(BlockIterator::new(block));
};
let block_iter = self.block_iter.as_mut().unwrap();
@@ -461,6 +472,7 @@ mod tests {
cache_policy: CachePolicy::Fill(CachePriority::High),
must_iterated_end_user_key: Some(Bound::Included(uk)),
max_preload_retry_times: 0,
prefetch_for_large_query: true,
}),
);
let mut cnt = 0;
2 changes: 2 additions & 0 deletions src/storage/src/hummock/sstable/mod.rs
@@ -499,6 +499,7 @@ pub struct SstableIteratorReadOptions {
pub cache_policy: CachePolicy,
pub must_iterated_end_user_key: Option<Bound<UserKey<KeyPayloadType>>>,
pub max_preload_retry_times: usize,
pub prefetch_for_large_query: bool,
}

impl SstableIteratorReadOptions {
Expand All @@ -507,6 +508,7 @@ impl SstableIteratorReadOptions {
cache_policy: read_options.cache_policy,
must_iterated_end_user_key: None,
max_preload_retry_times: 0,
prefetch_for_large_query: read_options.prefetch_options.for_large_query,
}
}
}
67 changes: 66 additions & 1 deletion src/storage/src/hummock/sstable_store.rs
@@ -267,7 +267,7 @@ impl SstableStore {
.map_err(HummockError::object_io_error)
}

pub async fn preload_blocks(
pub async fn get_stream(
Little-Wallace marked this conversation as resolved.
&self,
sst: &Sstable,
start_index: usize,
@@ -310,6 +310,71 @@ impl SstableStore {
)))
}

pub fn support_prefetch(&self) -> bool {
self.large_query_memory_usage > 0
}

pub async fn get_with_prefetch(
&self,
sst: &Sstable,
block_index: usize,
max_prefetch_block_count: usize,
policy: CachePolicy,
A collaborator commented:

How the provided cache policy is used is confusing. The behavior is:

  1. If no prefetch is triggered, the requested block is inserted into block cache with the provided policy
  2. If prefetch is triggered, the requested block as well as the prefetched blocks are inserted into block cache with CachePolicy::Fill(CachePriority::Low)

How about making it more consistent?
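Editorial aside: a tiny, self-contained sketch of the behavior described above (the `Priority` enum is a hypothetical stand-in, not the PR's `CachePriority`), showing that the caller-provided policy only takes effect on the no-prefetch path:

```rust
// Hypothetical stand-in type; not the PR's CachePolicy/CachePriority.
#[derive(Debug, Clone, Copy, PartialEq)]
enum Priority {
    High,
    Low,
}

// Mirrors the behavior described in the comment above: the caller's priority is
// only used when no prefetch is triggered; prefetched reads always insert at Low.
fn insert_priority(prefetch_triggered: bool, caller: Priority) -> Priority {
    if prefetch_triggered {
        Priority::Low
    } else {
        caller
    }
}

fn main() {
    assert_eq!(insert_priority(false, Priority::High), Priority::High);
    // The surprising case: a High-priority request ends up cached as Low.
    assert_eq!(insert_priority(true, Priority::High), Priority::Low);
}
```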

@hzxa21 (Collaborator) commented on Nov 24, 2023:

More thoughts:
we have two mechanism to constrain prefetch memory usage now:

  • For prefetch with streaming read, pending_streaming_loading is used to track the usage and it is limited by large_query_memory_usage
  • For prefetch with normal read, block cache is used to track the usage and it is limited by block_cache_capacity (and the low priority ratio).

The motivation of introducing low priority block cache is to control the behavior of cache refill, not block prefetch, and we are mixing it up here. IMO, logically it is better to have a clear sepration:

  • Low priority block cache: enable block reuse across queries without polluting the whole block cache. Blocks in here can be evicted.
  • Prefetch buffer: buffer prefetched blocks within a query. Blocks in here cannot be eviected.

Ideas:

  1. rename pending_streaming_loading to prefetch_buffer_usage and large_query_memory_usage to prefetch_buffer_capacity
  2. Prefetched blocks will be put into low priority block cache and the block holder will be dropped immediately. This is completely independent of the prefetch policy.
  3. Prefetched blocks will be put into a buf (holding Bytes, not the BlockHolder) and account for prefetch_buffer_usage. For normal read prefetch, the buf is a Vec<Block>. For streaming read prefetch, the buf is the implicit recv buf.
  4. When there is no quota in prefetch_buffer_capacity, prefetch will be disable and fall back to per block normal read. This is completely independent to the block cache policy.

What do you think?
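
Editorial aside: a rough, self-contained sketch of ideas 3 and 4 above (all names such as `PrefetchBuffer`, `prefetch_buffer_capacity`, and `try_reserve` are hypothetical, not code from this PR) showing what the quota-and-fallback accounting might look like:

```rust
use std::sync::atomic::{AtomicUsize, Ordering};

// Hypothetical prefetch-buffer accounting: prefetched bytes are tracked against a
// dedicated capacity, independent of the block cache and its priorities.
struct PrefetchBuffer {
    usage: AtomicUsize, // would play the role of `prefetch_buffer_usage`
    capacity: usize,    // would play the role of `prefetch_buffer_capacity`
}

impl PrefetchBuffer {
    // Try to reserve quota for a prefetch; on failure the caller falls back to a
    // per-block normal read (idea 4), regardless of the cache policy.
    fn try_reserve(&self, bytes: usize) -> bool {
        let prev = self.usage.fetch_add(bytes, Ordering::Relaxed);
        if prev + bytes > self.capacity {
            self.usage.fetch_sub(bytes, Ordering::Relaxed);
            return false;
        }
        true
    }

    fn release(&self, bytes: usize) {
        self.usage.fetch_sub(bytes, Ordering::Relaxed);
    }
}

fn main() {
    let buf = PrefetchBuffer {
        usage: AtomicUsize::new(0),
        capacity: 1024,
    };
    assert!(buf.try_reserve(512)); // prefetch allowed
    assert!(!buf.try_reserve(1024)); // over quota: fall back to a single-block read
    buf.release(512);
}
```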

stats: &mut StoreLocalStatistic,
) -> HummockResult<BlockHolder> {
const MAX_PREFETCH_BLOCK: usize = 16;
let object_id = sst.id;
if let Some(block) = self.block_cache.get(object_id, block_index as u64) {
return Ok(block);
}
let max_prefetch_block_count = std::cmp::min(max_prefetch_block_count, MAX_PREFETCH_BLOCK);
let mut end_index = std::cmp::min(
block_index + max_prefetch_block_count,
sst.meta.block_metas.len(),
);
let start_offset = sst.meta.block_metas[block_index].offset as usize;
let mut end_offset = start_offset;
for idx in block_index..end_index {
if self.block_cache.exists_block(object_id, idx as u64) {
end_index = idx;
break;
}
end_offset += sst.meta.block_metas[idx].len as usize;
}
if end_index == block_index {
let resp = self
.get_block_response(sst, block_index, policy, stats)
.await?;
return resp.wait().await;
}
let data_path = self.get_sst_data_path(object_id);
let buf = self
.store
.read(&data_path, start_offset..end_offset)
.await?;
let mut offset = 0;
let mut first_holder = None;
for idx in block_index..end_index {
let end = offset + sst.meta.block_metas[idx].len as usize;
if end > buf.len() {
return Err(ObjectError::internal("read unexpected EOF").into());
}
// copy again to avoid holding large data in memory.
let block = Block::decode(
Bytes::copy_from_slice(&buf[offset..end]),
sst.meta.block_metas[idx].uncompressed_size as usize,
)?;
let holder =
self.block_cache
.insert(object_id, idx as u64, Box::new(block), CachePriority::Low);
if block_index == idx {
first_holder = Some(holder);
}
offset = end;
}
Ok(first_holder.unwrap())
}

pub async fn get_block_response(
&self,
sst: &Sstable,
2 changes: 1 addition & 1 deletion src/storage/src/hummock/store/version.rs
@@ -757,7 +757,7 @@ impl HummockVersionReader {
.as_ref()
.map(|hint| Sstable::hash_for_bloom_filter(hint, read_options.table_id.table_id()));
let mut sst_read_options = SstableIteratorReadOptions::from_read_options(&read_options);
if read_options.prefetch_options.preload {
if read_options.prefetch_options.prefetch {
sst_read_options.must_iterated_end_user_key =
Some(user_key_range.1.map(|key| key.cloned()));
sst_read_options.max_preload_retry_times = self.preload_retry_times;
1 change: 1 addition & 0 deletions src/storage/src/hummock/validator.rs
@@ -62,6 +62,7 @@ pub async fn validate_ssts(task: ValidationTask, sstable_store: SstableStoreRef)
cache_policy: CachePolicy::NotFill,
must_iterated_end_user_key: None,
max_preload_retry_times: 0,
prefetch_for_large_query: false,
}),
);
let mut previous_key: Option<FullKey<Vec<u8>>> = None;
28 changes: 21 additions & 7 deletions src/storage/src/store.rs
@@ -119,7 +119,7 @@ impl<S: StateStoreRead> StateStoreReadExt for S {
mut read_options: ReadOptions,
) -> StorageResult<Vec<StateStoreIterItem>> {
if limit.is_some() {
read_options.prefetch_options.preload = false;
read_options.prefetch_options.prefetch = false;
}
let limit = limit.unwrap_or(usize::MAX);
self.iter(key_range, epoch, read_options)
@@ -273,33 +273,47 @@ pub trait LocalStateStore: StaticSendSync {
pub struct PrefetchOptions {
/// `exhaust_iter` is set `true` only if the return value of `iter()` will definitely be
/// exhausted, i.e., will iterate until end.
pub preload: bool,
pub prefetch: bool,
pub for_large_query: bool,
Comment on lines +280 to +281

Collaborator: Need some documentation on what the behavior is after setting these flags.

Contributor Author: done

Collaborator: streaming read is removed from this PR. Can we either remove this option until streaming read is added back, or at least modify the doc to mention that this option is not used.

}

impl PrefetchOptions {
pub fn new_for_large_range_scan() -> Self {
Self::new_with_exhaust_iter(true)
pub fn prefetch_for_large_range_scan() -> Self {
Self {
prefetch: true,
for_large_query: true,
}
}

pub fn prefetch_for_small_range_scan() -> Self {
Self {
prefetch: true,
for_large_query: false,
}
}

pub fn new_with_exhaust_iter(exhaust_iter: bool) -> Self {
Self {
preload: exhaust_iter,
prefetch: exhaust_iter,
for_large_query: false,
}
}
}

impl From<TracedPrefetchOptions> for PrefetchOptions {
fn from(value: TracedPrefetchOptions) -> Self {
Self {
preload: value.exhaust_iter,
prefetch: value.exhaust_iter,
for_large_query: value.for_large_query,
}
}
}

impl From<PrefetchOptions> for TracedPrefetchOptions {
fn from(value: PrefetchOptions) -> Self {
Self {
exhaust_iter: value.preload,
exhaust_iter: value.prefetch,
for_large_query: value.for_large_query,
}
}
}
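
Editorial aside, not part of the diff: a minimal, self-contained sketch of the `PrefetchOptions` API introduced above and how call sites choose between the two constructors (the struct below is a local stand-in mirroring the fields in `src/storage/src/store.rs`, not the real type):

```rust
// Local stand-in mirroring the fields added in src/storage/src/store.rs above.
#[derive(Debug, Clone, Copy)]
struct PrefetchOptions {
    prefetch: bool,
    for_large_query: bool,
}

impl PrefetchOptions {
    // Whole-range scans (batch row_seq_scan, risectl scan/bench/list-kv, kv log store reader).
    fn prefetch_for_large_range_scan() -> Self {
        Self { prefetch: true, for_large_query: true }
    }

    // Short-lived iterators, e.g. backfill, which recreates its iterator on every barrier.
    fn prefetch_for_small_range_scan() -> Self {
        Self { prefetch: true, for_large_query: false }
    }
}

fn main() {
    let batch = PrefetchOptions::prefetch_for_large_range_scan();
    let backfill = PrefetchOptions::prefetch_for_small_range_scan();
    println!("batch: {batch:?}, backfill: {backfill:?}");
}
```

In the PR itself, whole-range readers switch to `prefetch_for_large_range_scan()`, while backfill uses `prefetch_for_small_range_scan()` because its iterator is recreated on every barrier.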
6 changes: 4 additions & 2 deletions src/stream/src/common/log_store_impl/kv_log_store/reader.rs
@@ -123,7 +123,8 @@ impl<S: StateStore> LogReader for KvLogStoreReader<S> {
(Included(range_start), Excluded(range_end)),
MAX_EPOCH,
ReadOptions {
prefetch_options: PrefetchOptions::default(),
// TODO: if this stream lives too long, the connection of the prefetch object may break. We may need to optimize this, because if a connection breaks too many times, we will disable prefetch for the current object.
prefetch_options: PrefetchOptions::prefetch_for_large_range_scan(),
cache_policy: CachePolicy::Fill(CachePriority::Low),
table_id,
..Default::default()
@@ -234,7 +235,8 @@ impl<S: StateStore> LogReader for KvLogStoreReader<S> {
(Included(range_start), Included(range_end)),
MAX_EPOCH,
ReadOptions {
prefetch_options: PrefetchOptions::default(),
prefetch_options:
PrefetchOptions::prefetch_for_large_range_scan(),
cache_policy: CachePolicy::Fill(CachePriority::Low),
table_id,
..Default::default()
3 changes: 2 additions & 1 deletion src/stream/src/executor/aggregation/minput.rs
@@ -186,7 +186,8 @@ impl MaterializedInputState {
group_key.map(GroupKey::table_pk),
sub_range,
PrefetchOptions {
preload: cache_filler.capacity().is_none(),
prefetch: cache_filler.capacity().is_none(),
for_large_query: false,
},
)
.await?;
3 changes: 2 additions & 1 deletion src/stream/src/executor/backfill/no_shuffle_backfill.rs
@@ -609,7 +609,8 @@ where
row::empty(),
range_bounds,
ordered,
PrefetchOptions::new_for_large_range_scan(),
// Here we only use small range prefetch because the executor recreates its iterator on every barrier, so we do not need to prefetch too much data.
PrefetchOptions::prefetch_for_small_range_scan(),
)
.await?;
