feat(storage): optimize data alignment for default compaction group #13075

Merged: 19 commits, Dec 1, 2023

Changes from 9 commits

Commits
62a9cdf
feat(storage): optimize data alignment for default compaction group
Li0k Oct 26, 2023
e3e51d0
Merge branch 'main' of https://github.com/risingwavelabs/risingwave i…
Li0k Oct 26, 2023
9893724
chore(storage): remove split by table
Li0k Oct 31, 2023
0e21215
Merge branch 'main' of https://github.com/risingwavelabs/risingwave i…
Li0k Oct 31, 2023
98779dd
fix(storage): fix test
Li0k Oct 31, 2023
3ef2635
fix(storage): fix comments
Li0k Nov 1, 2023
e2d79f8
Merge branch 'main' of https://github.com/risingwavelabs/risingwave i…
Li0k Nov 13, 2023
1003115
fix(storage): update branch
Li0k Nov 13, 2023
9e2c49e
Merge branch 'main' of https://github.com/risingwavelabs/risingwave i…
Li0k Nov 13, 2023
3e0959c
chore(storage): address comments
Li0k Nov 20, 2023
c0e1121
Merge branch 'main' of https://github.com/risingwavelabs/risingwave i…
Li0k Nov 20, 2023
e58d6f1
fix(storage): fix unit-test
Li0k Nov 20, 2023
b8a664e
Merge branch 'main' of https://github.com/risingwavelabs/risingwave i…
Li0k Nov 21, 2023
6b56485
refactor(storage): refactor code and fix comments
Li0k Nov 27, 2023
ec2210b
Merge branch 'main' of https://github.com/risingwavelabs/risingwave i…
Li0k Nov 27, 2023
6f2b69e
Merge branch 'main' of https://github.com/risingwavelabs/risingwave i…
Li0k Nov 28, 2023
3833fe7
chore(storage): add docs
Li0k Nov 30, 2023
10f960d
Merge branch 'main' of https://github.com/risingwavelabs/risingwave i…
Li0k Nov 30, 2023
840dca3
Merge branch 'main' of https://github.com/risingwavelabs/risingwave i…
Li0k Dec 1, 2023
1 change: 1 addition & 0 deletions proto/hummock.proto
@@ -310,6 +310,7 @@ message CompactTask {
bool split_by_state_table = 21;
// Compaction needs to cut the state table every time 1/weight of vnodes in the table have been processed.
uint32 split_weight_by_vnode = 22;
Collaborator: Are they deprecated now?

Contributor (author): Not in this PR; the new table_vnode_partition is not persisted.

Collaborator: Why is it not persisted? Are split_by_state_table and split_weight_by_vnode persistent? Why does persistence affect the deprecation of these fields?

Collaborator: Btw, we should document which fields in CompactTask are persistent and which are not.

map<uint32, uint32> table_vnode_partition = 23;
}
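For readers of the proto change: `map<uint32, uint32>` maps a state-table id to a vnode-partition count. A minimal Rust sketch of the lookup semantics — the helper name is illustrative, not from the PR, and treating a missing entry as "no vnode-based cutting" is an assumption (the review thread below asks for exactly this to be documented):

```rust
use std::collections::BTreeMap;

// Hypothetical helper (not in the PR): read the vnode partition count for a
// table, treating a missing entry as "no vnode-based cutting" (0).
fn vnode_partition_count(map: &BTreeMap<u32, u32>, table_id: u32) -> u32 {
    map.get(&table_id).copied().unwrap_or(0)
}

fn main() {
    let mut table_vnode_partition = BTreeMap::new();
    table_vnode_partition.insert(42u32, 4u32); // cut table 42 into 4 vnode partitions
    assert_eq!(vnode_partition_count(&table_vnode_partition, 42), 4);
    assert_eq!(vnode_partition_count(&table_vnode_partition, 7), 0);
}
```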

message LevelHandler {
9 changes: 8 additions & 1 deletion src/common/src/config.rs
@@ -262,6 +262,9 @@ pub struct MetaConfig {
#[serde(default = "default::meta::split_group_size_limit")]
pub split_group_size_limit: u64,

#[serde(default = "default::meta::cut_table_size_limit")]
pub cut_table_size_limit: u64,

#[serde(default, flatten)]
pub unrecognized: Unrecognized<Self>,

@@ -937,7 +940,7 @@ pub mod default {
}

pub fn periodic_split_compact_group_interval_sec() -> u64 {
180 // 3min
10 // 10s
}

pub fn periodic_tombstone_reclaim_compaction_interval_sec() -> u64 {
@@ -967,6 +970,10 @@
pub fn compaction_task_max_heartbeat_interval_secs() -> u64 {
60 // 1min
}

pub fn cut_table_size_limit() -> u64 {
1024 * 1024 * 1024 // 1GB
}
}

pub mod server {
3 changes: 2 additions & 1 deletion src/config/example.toml
@@ -33,9 +33,10 @@ backend = "Mem"
periodic_space_reclaim_compaction_interval_sec = 3600
periodic_ttl_reclaim_compaction_interval_sec = 1800
periodic_tombstone_reclaim_compaction_interval_sec = 600
periodic_split_compact_group_interval_sec = 180
periodic_split_compact_group_interval_sec = 10
Collaborator: Is this change expected?

Contributor (author, Nov 20, 2023): Yes, I think this check should be more sensitive (for backfill). What do you think?

move_table_size_limit = 10737418240
split_group_size_limit = 68719476736
cut_table_size_limit = 1073741824
do_not_config_object_storage_lifecycle = false
partition_vnode_count = 64
table_write_throughput_threshold = 16777216
1 change: 1 addition & 0 deletions src/meta/node/src/lib.rs
@@ -300,6 +300,7 @@ pub fn start(opts: MetaNodeOpts) -> Pin<Box<dyn Future<Output = ()> + Send>> {
.meta
.compaction_task_max_heartbeat_interval_secs,
compaction_config: Some(config.meta.compaction_config),
cut_table_size_limit: config.meta.cut_table_size_limit,
},
config.system.into_init_system_params(),
)
3 changes: 1 addition & 2 deletions src/meta/src/hummock/compaction/mod.rs
@@ -77,7 +77,6 @@ pub struct CompactionTask {
pub compression_algorithm: String,
pub target_file_size: u64,
pub compaction_task_type: compact_task::TaskType,
pub enable_split_by_table: bool,
}

pub fn create_overlap_strategy(compaction_mode: CompactionMode) -> Arc<dyn OverlapStrategy> {
@@ -150,6 +149,7 @@ impl CompactStatus {
task_type: ret.compaction_task_type as i32,
split_by_state_table: group.compaction_config.split_by_state_table,
split_weight_by_vnode: group.compaction_config.split_weight_by_vnode,
table_vnode_partition: BTreeMap::default(),
};
Some(compact_task)
}
@@ -237,7 +237,6 @@ pub fn create_compaction_task(
input,
target_file_size,
compaction_task_type,
enable_split_by_table: false,
}
}

@@ -28,7 +28,6 @@ pub struct MinOverlappingPicker {
level: usize,
target_level: usize,
max_select_bytes: u64,
split_by_table: bool,
overlap_strategy: Arc<dyn OverlapStrategy>,
}

@@ -37,14 +36,12 @@ impl MinOverlappingPicker {
level: usize,
target_level: usize,
max_select_bytes: u64,
split_by_table: bool,
overlap_strategy: Arc<dyn OverlapStrategy>,
) -> MinOverlappingPicker {
MinOverlappingPicker {
level,
target_level,
max_select_bytes,
split_by_table,
overlap_strategy,
}
}
@@ -66,9 +63,6 @@ impl MinOverlappingPicker {
if level_handlers[self.level].is_pending_compact(&table.sst_id) {
break;
}
if self.split_by_table && table.table_ids != select_tables[left].table_ids {
break;
}
if select_file_size > self.max_select_bytes {
break;
}
@@ -384,13 +378,8 @@ pub mod tests {

#[test]
fn test_compact_l1() {
let mut picker = MinOverlappingPicker::new(
1,
2,
10000,
false,
Arc::new(RangeOverlapStrategy::default()),
);
let mut picker =
MinOverlappingPicker::new(1, 2, 10000, Arc::new(RangeOverlapStrategy::default()));
let levels = vec![
Level {
level_idx: 1,
@@ -466,13 +455,8 @@ pub mod tests {

#[test]
fn test_expand_l1_files() {
let mut picker = MinOverlappingPicker::new(
1,
2,
10000,
false,
Arc::new(RangeOverlapStrategy::default()),
);
let mut picker =
MinOverlappingPicker::new(1, 2, 10000, Arc::new(RangeOverlapStrategy::default()));
let levels = vec![
Level {
level_idx: 1,
@@ -812,7 +796,7 @@ pub mod tests {
];
// no limit
let picker =
MinOverlappingPicker::new(2, 3, 1000, false, Arc::new(RangeOverlapStrategy::default()));
MinOverlappingPicker::new(2, 3, 1000, Arc::new(RangeOverlapStrategy::default()));
let (select_files, target_files) = picker.pick_tables(
&levels[1].table_infos,
&levels[2].table_infos,
1 change: 0 additions & 1 deletion src/meta/src/hummock/compaction/selector/level_selector.rs
@@ -121,7 +121,6 @@ impl DynamicLevelSelectorCore {
picker_info.select_level,
picker_info.target_level,
self.config.max_bytes_for_level_base,
self.config.split_by_state_table,
overlap_strategy,
))
}
90 changes: 76 additions & 14 deletions src/meta/src/hummock/manager/mod.rs
@@ -80,7 +80,8 @@ use crate::hummock::metrics_utils::{
};
use crate::hummock::{CompactorManagerRef, TASK_NORMAL};
use crate::manager::{
CatalogManagerRef, ClusterManagerRef, FragmentManagerRef, IdCategory, MetaSrvEnv, META_NODE_ID,
CatalogManagerRef, ClusterManagerRef, FragmentManagerRef, IdCategory, MetaSrvEnv, TableId,
META_NODE_ID,
};
use crate::model::{
BTreeMapEntryTransaction, BTreeMapTransaction, ClusterId, MetadataModel, ValTransaction,
@@ -144,6 +145,9 @@ pub struct HummockManager {
// `compaction_state` will record the types of compact tasks that can be triggered in `hummock`
// and suggest types with a certain priority.
pub compaction_state: CompactionState,

group_to_table_vnode_partition:
Collaborator: Can you also document what it means if:

  • a table id is not found in this mapping
  • a table id is found in this mapping with 0 as the vnode partition number
  • a table id is found in this mapping with 1 as the vnode partition number
parking_lot::RwLock<HashMap<CompactionGroupId, BTreeMap<TableId, u32>>>,
Comment on lines +160 to +161
Collaborator: Hmm... This is always mutated when we hold the compaction lock. Why not put it under Compaction?

}

pub type HummockManagerRef = Arc<HummockManager>;
@@ -390,6 +394,7 @@ impl HummockManager {
history_table_throughput: parking_lot::RwLock::new(HashMap::default()),
compactor_streams_change_tx,
compaction_state: CompactionState::new(),
group_to_table_vnode_partition: parking_lot::RwLock::new(HashMap::default()),
};
let instance = Arc::new(instance);
instance.start_worker(rx).await;
@@ -813,6 +818,14 @@ impl HummockManager {
// lock in compaction_guard, take out all table_options in advance there may be a
// waste of resources here, need to add a more efficient filter in catalog_manager
let all_table_id_to_option = self.catalog_manager.get_all_table_options().await;
let mut table_to_vnode_partition = match self
.group_to_table_vnode_partition
.read()
.get(&compaction_group_id)
{
Some(table_to_vnode_partition) => table_to_vnode_partition.clone(),
None => BTreeMap::default(),
};

let mut compaction_guard = write_lock!(self, compaction).await;
let compaction = compaction_guard.deref_mut();
@@ -903,6 +916,7 @@
.unwrap()
.member_table_ids
.clone();

let is_trivial_reclaim = CompactStatus::is_trivial_reclaim(&compact_task);
let is_trivial_move = CompactStatus::is_trivial_move_task(&compact_task);

@@ -963,6 +977,18 @@
compact_task.current_epoch_time = Epoch::now().0;
compact_task.compaction_filter_mask =
group_config.compaction_config.compaction_filter_mask;
table_to_vnode_partition
.retain(|table_id, _| compact_task.existing_table_ids.contains(table_id));

if group_config.compaction_config.split_weight_by_vnode > 0 {
// TODO: unify to table_to_vnode_partition directly
table_to_vnode_partition.insert(
member_table_ids[0],
Collaborator: There is an implicit assumption here: there is only a single item in member_table_ids when the if condition is met. We should add an assertion to check, or throw an error if the assumption doesn't hold.

Collaborator: Instead of updating table_to_vnode_partition ad hoc in get_compact_task, I think it is more appropriate to update self.group_to_table_vnode_partition correspondingly in the following cases:

  1. A new group is created.
  2. split_weight_by_vnode is modified for an existing group.
  3. New table ids are added to a group.

Maybe that is what you mean by "// TODO: unify to table_to_vnode_partition directly"?

group_config.compaction_config.split_weight_by_vnode,
);
}

compact_task.table_vnode_partition = table_to_vnode_partition;
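The two steps above — prune entries for tables no longer in the task, then force the weight for a group compacted with `split_weight_by_vnode > 0` — can be sketched as a standalone function. The function name, flat slices, and plain `u32` ids are simplifications, not the manager's actual signature:

```rust
use std::collections::BTreeMap;

// Simplified sketch of the adjustment above (not the actual manager code):
// drop entries for tables that are no longer part of the task, then, for a
// dedicated group, force the configured weight for its member table.
fn adjust_table_vnode_partition(
    mut table_to_vnode_partition: BTreeMap<u32, u32>,
    existing_table_ids: &[u32],
    member_table_ids: &[u32],
    split_weight_by_vnode: u32,
) -> BTreeMap<u32, u32> {
    table_to_vnode_partition.retain(|table_id, _| existing_table_ids.contains(table_id));
    if split_weight_by_vnode > 0 {
        // Mirrors the diff's implicit assumption that such a group has a
        // single member table (the review comments on this hunk ask for an
        // assertion on exactly this point).
        table_to_vnode_partition.insert(member_table_ids[0], split_weight_by_vnode);
    }
    table_to_vnode_partition
}

fn main() {
    let mut m = BTreeMap::new();
    m.insert(1u32, 4u32);
    m.insert(2u32, 4u32);
    let out = adjust_table_vnode_partition(m, &[1], &[1], 8);
    assert_eq!(out.get(&1), Some(&8)); // weight forced for the member table
    assert_eq!(out.get(&2), None); // stale entry pruned
}
```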

let mut compact_task_assignment =
BTreeMapTransaction::new(&mut compaction.compact_task_assignment);
@@ -2164,7 +2190,6 @@ impl HummockManager {
));
split_group_trigger_interval
.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Delay);
split_group_trigger_interval.reset();

let split_group_trigger = IntervalStream::new(split_group_trigger_interval)
.map(|_| HummockTimerEvent::GroupSplit);
@@ -2484,31 +2509,60 @@
let mv_group_id: CompactionGroupId = StaticCompactionGroupId::MaterializedView.into();
let partition_vnode_count = self.env.opts.partition_vnode_count;
let window_size = HISTORY_TABLE_INFO_STATISTIC_TIME / (checkpoint_secs as usize);

let mut group_to_table_vnode_partition = HashMap::default();
Collaborator: We will clear group_to_table_vnode_partition and re-calculate it. Just to double check: do we guarantee that group_infos will contain all compaction groups and all table ids?


for group in &group_infos {
let table_vnode_partition_mapping = group_to_table_vnode_partition
.entry(group.group_id)
.or_insert(BTreeMap::default());

if group.table_statistic.len() == 1 {
// no need to handle the dedicated compaction group
continue;
}

for (table_id, table_size) in &group.table_statistic {
if !created_tables.contains(table_id) {
continue;
}
let mut is_high_write_throughput = false;
let mut is_low_write_throughput = true;
let is_creating_table = !created_tables.contains(table_id);
if let Some(history) = table_write_throughput.get(table_id) {
if history.len() >= window_size {
is_high_write_throughput = history.iter().all(|throughput| {
*throughput / checkpoint_secs
> self.env.opts.table_write_throughput_threshold
});
is_low_write_throughput = history.iter().any(|throughput| {
*throughput / checkpoint_secs
< self.env.opts.min_table_split_write_throughput
});
if !is_creating_table {
if history.len() >= window_size {
is_high_write_throughput = history.iter().all(|throughput| {
*throughput / checkpoint_secs
> self.env.opts.table_write_throughput_threshold
});
is_low_write_throughput = history.iter().any(|throughput| {
*throughput / checkpoint_secs
< self.env.opts.min_table_split_write_throughput
});
}
} else {
let sum = history.iter().sum::<u64>();
is_low_write_throughput = sum
< self.env.opts.min_table_split_write_throughput
* history.len() as u64
* checkpoint_secs;
}
}
let state_table_size = *table_size;

{
if !is_low_write_throughput {
table_vnode_partition_mapping.insert(*table_id, 4_u32);
} else if state_table_size > self.env.opts.cut_table_size_limit {
table_vnode_partition_mapping.insert(*table_id, 1_u32);
} else {
table_vnode_partition_mapping.remove(table_id);
}
}
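Reduced to its cases, the per-table decision above is a pure function of the throughput classification and the table size. This sketch is illustrative only; the constant 4 and the Option-as-mapping-entry encoding mirror the diff, while the function name and parameterized threshold are not from the PR:

```rust
// Illustrative reduction of the branch above: returns the vnode partition
// count to record for a table, or None to drop it from the mapping.
fn partition_for_table(
    is_low_write_throughput: bool,
    state_table_size: u64,
    cut_table_size_limit: u64,
) -> Option<u32> {
    if !is_low_write_throughput {
        Some(4) // actively written table: cut into 4 vnode partitions
    } else if state_table_size > cut_table_size_limit {
        Some(1) // cold but large table: still align it to its own partition
    } else {
        None // cold and small: no entry in the mapping
    }
}

fn main() {
    let limit = 1u64 << 30; // 1 GB, the default cut_table_size_limit
    assert_eq!(partition_for_table(false, 0, limit), Some(4));
    assert_eq!(partition_for_table(true, 2u64 << 30, limit), Some(1));
    assert_eq!(partition_for_table(true, 100, limit), None);
}
```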

if !created_tables.contains(table_id) {
Collaborator: nits: this can be !is_creating_table

Contributor (author): Should be is_creating_table

// do not split the creating table
continue;
}

if is_low_write_throughput {
continue;
}
@@ -2532,6 +2586,7 @@
}
}

table_vnode_partition_mapping.insert(*table_id, partition_vnode_count);
Collaborator: The vnode partition mapping set in L2553 can be overwritten here. Is this expected?

Contributor (author): Okay, I'm going to remove it; currently we set it up when we build compact_task.

let ret = self
.move_state_table_to_compaction_group(
parent_group_id,
Expand All @@ -2557,6 +2612,13 @@ impl HummockManager {
}
}
}

tracing::info!(
Contributor: Would this log be too frequent? I think we do not need to print it every time.

Contributor (author): Changed it to debug; we can provide a separate interface to get table_vnode_partition if needed.

"group_to_table_vnode_partition {:?}",
group_to_table_vnode_partition
);

*self.group_to_table_vnode_partition.write() = group_to_table_vnode_partition;
}

#[named]
1 change: 1 addition & 0 deletions src/meta/src/manager/catalog/mod.rs
@@ -2675,6 +2675,7 @@ impl CatalogManager {
.database
.tables
.values()
.filter(|table| table.get_stream_job_status() != Ok(StreamJobStatus::Creating))
.map(|table| table.id)
.collect()
}
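The effect of the added filter can be shown in isolation: tables whose stream job is still in the `Creating` state are excluded from the "created tables" set, so the split checker treats them as still backfilling. Types here are simplified stand-ins for the catalog's prost-generated ones:

```rust
#[derive(PartialEq)]
enum StreamJobStatus {
    Creating,
    Created,
}

struct Table {
    id: u32,
    status: StreamJobStatus,
}

// Standalone sketch of the added filter (simplified types): only tables
// past the Creating stage are reported as created.
fn created_table_ids(tables: &[Table]) -> Vec<u32> {
    tables
        .iter()
        .filter(|table| table.status != StreamJobStatus::Creating)
        .map(|table| table.id)
        .collect()
}

fn main() {
    let tables = vec![
        Table { id: 1, status: StreamJobStatus::Created },
        Table { id: 2, status: StreamJobStatus::Creating },
    ];
    assert_eq!(created_table_ids(&tables), vec![1]); // table 2 is filtered out
}
```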
4 changes: 4 additions & 0 deletions src/meta/src/manager/env.rs
@@ -169,6 +169,9 @@ pub struct MetaOpts {

pub compaction_task_max_heartbeat_interval_secs: u64,
pub compaction_config: Option<CompactionConfig>,

/// The size limit above which a state table is cut into an independent sstable.
pub cut_table_size_limit: u64,
}

impl MetaOpts {
@@ -209,6 +212,7 @@ impl MetaOpts {
partition_vnode_count: 32,
compaction_task_max_heartbeat_interval_secs: 0,
compaction_config: None,
cut_table_size_limit: 1024 * 1024 * 1024,
}
}
}
6 changes: 3 additions & 3 deletions src/storage/src/hummock/compactor/compaction_utils.rs
@@ -12,7 +12,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.

use std::collections::HashSet;
use std::collections::{BTreeMap, HashSet};
use std::marker::PhantomData;
use std::sync::atomic::{AtomicU64, Ordering};
use std::sync::Arc;
@@ -119,9 +119,9 @@ pub struct TaskConfig {
pub stats_target_table_ids: Option<HashSet<u32>>,
pub task_type: compact_task::TaskType,
pub is_target_l0_or_lbase: bool,
pub split_by_table: bool,
pub split_weight_by_vnode: u32,
pub use_block_based_filter: bool,

pub table_vnode_partition: BTreeMap<u32, u32>,
}

pub fn build_multi_compaction_filter(compact_task: &CompactTask) -> MultiCompactionFilter {
3 changes: 1 addition & 2 deletions src/storage/src/hummock/compactor/compactor_runner.rs
@@ -104,9 +104,8 @@ impl CompactorRunner {
task_type: task.task_type(),
is_target_l0_or_lbase: task.target_level == 0
|| task.target_level == task.base_level,
split_by_table: task.split_by_state_table,
split_weight_by_vnode: task.split_weight_by_vnode,
use_block_based_filter,
table_vnode_partition: task.table_vnode_partition.clone(),
},
object_id_getter,
);