fix: fix staging sst update panic (cherry-pick #17104 #16962 #17113) (#…
wenym1 authored and hzxa21 committed Jun 21, 2024
1 parent 66bccbf commit 4e0eb33
Showing 25 changed files with 873 additions and 561 deletions.
2 changes: 1 addition & 1 deletion docker/dashboards/risingwave-dev-dashboard.json

Large diffs are not rendered by default.

2 changes: 1 addition & 1 deletion docker/dashboards/risingwave-user-dashboard.json

Large diffs are not rendered by default.

17 changes: 17 additions & 0 deletions grafana/risingwave-dev-dashboard.dashboard.py
@@ -2242,6 +2242,23 @@ def section_hummock_write(outer_panels):
"uploading task size - {{%s}} @ {{%s}}"
% (COMPONENT_LABEL, NODE_LABEL),
),
panels.target(
f"sum({metric('state_store_uploader_imm_size')}) by ({COMPONENT_LABEL}, {NODE_LABEL})",
"uploader imm size - {{%s}} @ {{%s}}"
% (COMPONENT_LABEL, NODE_LABEL),
),
panels.target(
f"sum({metric('state_store_uploader_imm_size')}) by ({COMPONENT_LABEL}, {NODE_LABEL}) - "
f"sum({metric('state_store_uploader_uploading_task_size')}) by ({COMPONENT_LABEL}, {NODE_LABEL})",
"unflushed imm size - {{%s}} @ {{%s}}"
% (COMPONENT_LABEL, NODE_LABEL),
),
panels.target(
f"sum({metric('uploading_memory_size')}) by ({COMPONENT_LABEL}, {NODE_LABEL}) - "
f"sum({metric('state_store_uploader_imm_size')}) by ({COMPONENT_LABEL}, {NODE_LABEL})",
"orphan imm size - {{%s}} @ {{%s}}"
% (COMPONENT_LABEL, NODE_LABEL),
),
panels.target(
f"sum({metric('state_store_old_value_size')}) by ({COMPONENT_LABEL}, {NODE_LABEL})",
"old value size - {{%s}} @ {{%s}}"
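The three new targets chart differences between uploader gauges rather than standalone values. As a rough guide to how they relate, here is a minimal sketch for exposition only: the struct, the field grouping, and the reading of "orphan" are assumptions, not RisingWave code, while the arithmetic mirrors the PromQL expressions above.

// Exposition-only sketch of how the derived panels relate the raw gauges.
// saturating_sub is used because the gauges are sampled independently and a
// momentary negative difference is meaningless on a dashboard.
struct UploaderGauges {
    /// total memory currently tracked for uploading (uploading_memory_size)
    uploading_memory_size: u64,
    /// size of imms currently known to the uploader (state_store_uploader_imm_size)
    uploader_imm_size: u64,
    /// size of imms already handed to upload tasks (state_store_uploader_uploading_task_size)
    uploading_task_size: u64,
}

impl UploaderGauges {
    /// "unflushed imm size": buffered in the uploader but not yet part of an upload task.
    fn unflushed_imm_size(&self) -> u64 {
        self.uploader_imm_size.saturating_sub(self.uploading_task_size)
    }

    /// "orphan imm size": memory still tracked for uploading but no longer
    /// attributed to imms held by the uploader (one plausible reading of the panel).
    fn orphan_imm_size(&self) -> u64 {
        self.uploading_memory_size.saturating_sub(self.uploader_imm_size)
    }
}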
2 changes: 1 addition & 1 deletion grafana/risingwave-dev-dashboard.json

Large diffs are not rendered by default.

2 changes: 1 addition & 1 deletion grafana/risingwave-user-dashboard.json

Large diffs are not rendered by default.

9 changes: 9 additions & 0 deletions src/common/src/config.rs
@@ -621,6 +621,11 @@ pub struct StorageConfig {
#[serde(default = "default::storage::shared_buffer_flush_ratio")]
pub shared_buffer_flush_ratio: f32,

/// The minimum total flush size of a shared buffer spill. When a shared buffer spill is triggered,
/// the total flush size across multiple epochs should be at least this size.
#[serde(default = "default::storage::shared_buffer_min_batch_flush_size_mb")]
pub shared_buffer_min_batch_flush_size_mb: usize,

/// The threshold for the number of immutable memtables to merge to a new imm.
#[serde(default = "default::storage::imm_merge_threshold")]
#[deprecated]
@@ -1337,6 +1342,10 @@ pub mod default {
0.8
}

pub fn shared_buffer_min_batch_flush_size_mb() -> usize {
800
}

pub fn imm_merge_threshold() -> usize {
0 // disable
}
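To make the new knob concrete: once a spill is triggered, imms from consecutive epochs are batched into a single flush until their accumulated size reaches the configured minimum (800 MiB by default). The sketch below is illustrative only, with assumed names and structure; it is not the uploader's actual code.

// Illustrative only: pick how many pending epochs go into one spill so that the
// batch reaches shared_buffer_min_batch_flush_size_mb; if the threshold is never
// reached, everything pending is flushed anyway.
fn epochs_in_one_flush(epoch_imm_sizes: &[usize], min_batch_flush_size_mb: usize) -> usize {
    let min_bytes = min_batch_flush_size_mb << 20;
    let mut accumulated = 0usize;
    for (i, &size) in epoch_imm_sizes.iter().enumerate() {
        accumulated += size;
        if accumulated >= min_bytes {
            return i + 1;
        }
    }
    epoch_imm_sizes.len()
}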
1 change: 1 addition & 0 deletions src/config/docs.md
@@ -130,6 +130,7 @@ This page is automatically generated by `./risedev generate-example-config`
| share_buffers_sync_parallelism | parallelism while syncing share buffers into L0 SST. Should NOT be 0. | 1 |
| shared_buffer_capacity_mb | Maximum shared buffer size, writes attempting to exceed the capacity will stall until there is enough space. | |
| shared_buffer_flush_ratio | The shared buffer will start flushing data to object storage when the ratio of memory usage to the shared buffer capacity exceeds this ratio. | 0.800000011920929 |
| shared_buffer_min_batch_flush_size_mb | The minimum total flush size of a shared buffer spill. When a shared buffer spill is triggered, the total flush size across multiple epochs should be at least this size. | 800 |
| sstable_id_remote_fetch_number | Number of SST ids fetched from meta per RPC | 10 |
| write_conflict_detection_enabled | Whether to enable write conflict detection | true |

1 change: 1 addition & 0 deletions src/config/example.toml
@@ -122,6 +122,7 @@ stream_high_join_amplification_threshold = 2048
share_buffers_sync_parallelism = 1
share_buffer_compaction_worker_threads_number = 4
shared_buffer_flush_ratio = 0.800000011920929
shared_buffer_min_batch_flush_size_mb = 800
imm_merge_threshold = 0
write_conflict_detection_enabled = true
max_prefetch_block_number = 16
4 changes: 2 additions & 2 deletions src/storage/benches/bench_imm_compact.rs
@@ -52,7 +52,7 @@ fn criterion_benchmark(c: &mut Criterion) {
&batches,
|b, batches| {
b.to_async(FuturesExecutor).iter(|| async {
let imm = merge_imms_in_memory(TableId::default(), 0, batches.clone(), None).await;
let imm = merge_imms_in_memory(TableId::default(), batches.clone(), None).await;
assert_eq!(imm.key_count(), 10000 * 100);
assert_eq!(imm.value_count(), 10000 * 100);
})
@@ -72,7 +72,7 @@ fn criterion_benchmark(c: &mut Criterion) {
&later_batches,
|b, batches| {
b.to_async(FuturesExecutor).iter(|| async {
let imm = merge_imms_in_memory(TableId::default(), 0, batches.clone(), None).await;
let imm = merge_imms_in_memory(TableId::default(), batches.clone(), None).await;
assert_eq!(imm.key_count(), 2000 * 100);
assert_eq!(imm.value_count(), 2000 * 100 * 5);
})
13 changes: 5 additions & 8 deletions src/storage/src/hummock/compactor/shared_buffer_compact.rs
@@ -38,8 +38,7 @@ use crate::filter_key_extractor::{FilterKeyExtractorImpl, FilterKeyExtractorMan
use crate::hummock::compactor::compaction_filter::DummyCompactionFilter;
use crate::hummock::compactor::context::{await_tree_key, CompactorContext};
use crate::hummock::compactor::{check_flush_result, CompactOutput, Compactor};
use crate::hummock::event_handler::uploader::{UploadTaskOutput, UploadTaskPayload};
use crate::hummock::event_handler::LocalInstanceId;
use crate::hummock::event_handler::uploader::UploadTaskOutput;
use crate::hummock::iterator::{Forward, HummockIterator, MergeIterator, UserIterator};
use crate::hummock::shared_buffer::shared_buffer_batch::{
SharedBufferBatch, SharedBufferBatchInner, SharedBufferBatchOldValues, SharedBufferKeyEntry,
@@ -60,11 +59,11 @@ const GC_WATERMARK_FOR_FLUSH: u64 = 0;
pub async fn compact(
context: CompactorContext,
sstable_object_id_manager: SstableObjectIdManagerRef,
payload: UploadTaskPayload,
payload: Vec<ImmutableMemtable>,
compaction_group_index: Arc<HashMap<TableId, CompactionGroupId>>,
filter_key_extractor_manager: FilterKeyExtractorManager,
) -> HummockResult<UploadTaskOutput> {
let mut grouped_payload: HashMap<CompactionGroupId, UploadTaskPayload> = HashMap::new();
let mut grouped_payload: HashMap<CompactionGroupId, Vec<ImmutableMemtable>> = HashMap::new();
for imm in &payload {
let compaction_group_id = match compaction_group_index.get(&imm.table_id) {
// compaction group id is used only as a hint for grouping different data.
@@ -145,7 +144,7 @@ async fn compact_shared_buffer<const IS_NEW_VALUE: bool>(
context: CompactorContext,
sstable_object_id_manager: SstableObjectIdManagerRef,
filter_key_extractor_manager: FilterKeyExtractorManager,
mut payload: UploadTaskPayload,
mut payload: Vec<ImmutableMemtable>,
) -> HummockResult<Vec<LocalSstableInfo>> {
if !IS_NEW_VALUE {
assert!(payload.iter().all(|imm| imm.has_old_value()));
@@ -330,7 +329,6 @@ async fn compact_shared_buffer<const IS_NEW_VALUE: bool>(
/// Merge multiple batches into a larger one
pub async fn merge_imms_in_memory(
table_id: TableId,
instance_id: LocalInstanceId,
imms: Vec<ImmutableMemtable>,
memory_tracker: Option<MemoryTracker>,
) -> ImmutableMemtable {
@@ -465,13 +463,12 @@ pub async fn merge_imms_in_memory(
memory_tracker,
)),
table_id,
instance_id,
}
}

/// Based on the incoming payload and opts, calculate the sharding method and sstable size of shared buffer compaction.
fn generate_splits(
payload: &UploadTaskPayload,
payload: &Vec<ImmutableMemtable>,
existing_table_ids: &HashSet<u32>,
storage_opts: &StorageOpts,
) -> (Vec<KeyRange>, u64, u32) {
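Net effect on this file's public entry points: compact now takes a plain Vec<ImmutableMemtable> instead of the uploader's UploadTaskPayload type, and merge_imms_in_memory drops its instance_id parameter, so the call shape is the one exercised by the updated benchmark above:

// As in src/storage/benches/bench_imm_compact.rs after this change:
// no instance_id, and None means no memory tracker.
let imm = merge_imms_in_memory(TableId::default(), batches.clone(), None).await;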