Skip to content

Commit

Permalink
Merge branch 'main' of https://github.com/risingwavelabs/risingwave into li0k/storage_disable_merge_imm
Browse files Browse the repository at this point in the history
  • Loading branch information
Li0k committed Jan 11, 2024
2 parents 87aaef9 + 44019bc commit dfb7dec
Show file tree
Hide file tree
Showing 15 changed files with 68 additions and 40 deletions.
8 changes: 8 additions & 0 deletions src/common/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -556,6 +556,10 @@ pub struct StorageConfig {
#[serde(default)]
pub prefetch_buffer_capacity_mb: Option<usize>,

/// max prefetch block number
#[serde(default = "default::storage::max_prefetch_block_number")]
pub max_prefetch_block_number: usize,

#[serde(default = "default::storage::disable_remote_compactor")]
pub disable_remote_compactor: bool,

Expand Down Expand Up @@ -1206,6 +1210,10 @@ pub mod default {
pub fn compactor_fast_max_compact_task_size() -> u64 {
2 * 1024 * 1024 * 1024 // 2g
}

pub fn max_prefetch_block_number() -> usize {
16
}
}

pub mod streaming {
Expand Down
1 change: 1 addition & 0 deletions src/config/example.toml
Original file line number Diff line number Diff line change
Expand Up @@ -101,6 +101,7 @@ share_buffer_compaction_worker_threads_number = 4
shared_buffer_flush_ratio = 0.800000011920929
imm_merge_threshold = 0
write_conflict_detection_enabled = true
max_prefetch_block_number = 16
disable_remote_compactor = false
share_buffer_upload_concurrency = 8
compactor_max_task_multiplier = 2.5
Expand Down
1 change: 1 addition & 0 deletions src/ctl/src/common/hummock_service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -169,6 +169,7 @@ impl HummockServiceOpts {
opts.meta_cache_capacity_mb * (1 << 20),
0,
opts.block_cache_capacity_mb * (1 << 20),
opts.max_prefetch_block_number,
FileCache::none(),
FileCache::none(),
None,
Expand Down
1 change: 1 addition & 0 deletions src/jni_core/src/hummock_iterator.rs
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,7 @@ impl HummockJavaBindingIterator {
1 << 10,
0,
1 << 10,
16,
FileCache::none(),
FileCache::none(),
None,
Expand Down
5 changes: 2 additions & 3 deletions src/meta/service/src/ddl_service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -88,10 +88,9 @@ impl DdlServiceImpl {
if let Some(OptionalAssociatedSourceId::AssociatedSourceId(source_id)) =
table.optional_associated_source_id
{
let source = source.as_mut().unwrap();
source.id = source_id;
source.as_mut().unwrap().id = source_id;
fill_table_stream_graph_info(
source,
&mut source,
&mut table,
TableJobType::General,
&mut fragment_graph,
Expand Down
73 changes: 39 additions & 34 deletions src/meta/src/rpc/ddl_controller.rs
Original file line number Diff line number Diff line change
Expand Up @@ -644,9 +644,11 @@ impl DdlController {
stream_job.set_id(id);

match &mut stream_job {
StreamingJob::Table(Some(src), table, job_type) => {
StreamingJob::Table(src, table, job_type) => {
// If we're creating a table with connector, we should additionally fill its ID first.
src.id = self.gen_unique_id::<{ IdCategory::Table }>().await?;
if let Some(src) = src {
src.id = self.gen_unique_id::<{ IdCategory::Table }>().await?;
}
fill_table_stream_graph_info(src, table, *job_type, &mut fragment_graph);
}
StreamingJob::Source(_) => {
Expand Down Expand Up @@ -1979,19 +1981,12 @@ impl DdlController {

/// Fill in necessary information for table stream graph.
pub fn fill_table_stream_graph_info(
source: &mut PbSource,
source: &mut Option<PbSource>,
table: &mut PbTable,
table_job_type: TableJobType,
fragment_graph: &mut PbStreamFragmentGraph,
) {
let mut source_count = 0;
// Fill in the correct table id for source.
source.optional_associated_table_id =
Some(OptionalAssociatedTableId::AssociatedTableId(table.id));
// Fill in the correct source id for mview.
table.optional_associated_source_id =
Some(OptionalAssociatedSourceId::AssociatedSourceId(source.id));

for fragment in fragment_graph.fragments.values_mut() {
visit_fragment(fragment, |node_body| {
if let NodeBody::Source(source_node) = node_body {
Expand All @@ -2001,26 +1996,40 @@ pub fn fill_table_stream_graph_info(
}

// If we're creating a table with connector, we should additionally fill its ID first.
source_node.source_inner.as_mut().unwrap().source_id = source.id;
source_count += 1;

// Generate a random server id for mysql cdc source if needed
// `server.id` (in the range from 1 to 2^32 - 1). This value MUST be unique across whole replication
// group (that is, different from any other server id being used by any master or slave)
if let Some(connector) = source.with_properties.get(UPSTREAM_SOURCE_KEY)
&& matches!(
CdcSourceType::from(connector.as_str()),
CdcSourceType::Mysql
)
{
let props = &mut source_node.source_inner.as_mut().unwrap().with_properties;
let rand_server_id = rand::thread_rng().gen_range(1..u32::MAX);
props
.entry("server.id".to_string())
.or_insert(rand_server_id.to_string());

// make these two `Source` consistent
props.clone_into(&mut source.with_properties);
if let Some(source) = source {
source_node.source_inner.as_mut().unwrap().source_id = source.id;
source_count += 1;

// Generate a random server id for mysql cdc source if needed
// `server.id` (in the range from 1 to 2^32 - 1). This value MUST be unique across whole replication
// group (that is, different from any other server id being used by any master or slave)
if let Some(connector) = source.with_properties.get(UPSTREAM_SOURCE_KEY)
&& matches!(
CdcSourceType::from(connector.as_str()),
CdcSourceType::Mysql
)
{
let props = &mut source_node.source_inner.as_mut().unwrap().with_properties;
let rand_server_id = rand::thread_rng().gen_range(1..u32::MAX);
props
.entry("server.id".to_string())
.or_insert(rand_server_id.to_string());

// make these two `Source` consistent
props.clone_into(&mut source.with_properties);
}

assert_eq!(
source_count, 1,
"require exactly 1 external stream source when creating table with a connector"
);

// Fill in the correct table id for source.
source.optional_associated_table_id =
Some(OptionalAssociatedTableId::AssociatedTableId(table.id));
// Fill in the correct source id for mview.
table.optional_associated_source_id =
Some(OptionalAssociatedSourceId::AssociatedSourceId(source.id));
}
}

Expand All @@ -2034,8 +2043,4 @@ pub fn fill_table_stream_graph_info(
}
});
}
assert_eq!(
source_count, 1,
"require exactly 1 external stream source when creating table with a connector"
);
}
2 changes: 1 addition & 1 deletion src/meta/src/rpc/ddl_controller_v2.rs
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ impl DdlController {
let job_id = streaming_job.id();

match &mut streaming_job {
StreamingJob::Table(Some(src), table, job_type) => {
StreamingJob::Table(src, table, job_type) => {
// If we're creating a table with connector, we should additionally fill its ID first.
fill_table_stream_graph_info(src, table, *job_type, &mut fragment_graph);
}
Expand Down
1 change: 1 addition & 0 deletions src/storage/benches/bench_compactor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,7 @@ pub fn mock_sstable_store() -> SstableStoreRef {
128 << 20,
0,
64 << 20,
16,
FileCache::none(),
FileCache::none(),
None,
Expand Down
1 change: 1 addition & 0 deletions src/storage/benches/bench_multi_builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -148,6 +148,7 @@ fn bench_builder(
128 << 20,
0,
64 << 20,
16,
FileCache::none(),
FileCache::none(),
None,
Expand Down
1 change: 1 addition & 0 deletions src/storage/hummock_test/src/bin/replay/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -115,6 +115,7 @@ async fn create_replay_hummock(r: Record, args: &Args) -> Result<impl GlobalRepl
storage_opts.meta_cache_capacity_mb * (1 << 20),
storage_opts.high_priority_ratio,
storage_opts.prefetch_buffer_capacity_mb * (1 << 20),
storage_opts.max_prefetch_block_number,
FileCache::none(),
FileCache::none(),
None,
Expand Down
1 change: 1 addition & 0 deletions src/storage/src/hummock/iterator/test_utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,7 @@ pub fn mock_sstable_store_with_object_store(store: ObjectStoreRef) -> SstableSto
64 << 20,
0,
64 << 20,
16,
FileCache::none(),
FileCache::none(),
None,
Expand Down
7 changes: 5 additions & 2 deletions src/storage/src/hummock/sstable_store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -172,6 +172,7 @@ pub struct SstableStore {
recent_filter: Option<Arc<RecentFilter<(HummockSstableObjectId, usize)>>>,
prefetch_buffer_usage: Arc<AtomicUsize>,
prefetch_buffer_capacity: usize,
max_prefetch_block_number: usize,
}

impl SstableStore {
Expand All @@ -182,6 +183,7 @@ impl SstableStore {
meta_cache_capacity: usize,
high_priority_ratio: usize,
prefetch_buffer_capacity: usize,
max_prefetch_block_number: usize,
data_file_cache: FileCache<SstableBlockIndex, CachedBlock>,
meta_file_cache: FileCache<HummockSstableObjectId, CachedSstable>,
recent_filter: Option<Arc<RecentFilter<(HummockSstableObjectId, usize)>>>,
Expand Down Expand Up @@ -218,6 +220,7 @@ impl SstableStore {
recent_filter,
prefetch_buffer_usage: Arc::new(AtomicUsize::new(0)),
prefetch_buffer_capacity,
max_prefetch_block_number,
}
}

Expand All @@ -239,6 +242,7 @@ impl SstableStore {
meta_file_cache: FileCache::none(),
prefetch_buffer_usage: Arc::new(AtomicUsize::new(0)),
prefetch_buffer_capacity: block_cache_capacity,
max_prefetch_block_number: 16, /* compactor won't use this parameter, so just assign a default value. */
recent_filter: None,
}
}
Expand Down Expand Up @@ -306,7 +310,6 @@ impl SstableStore {
policy: CachePolicy,
stats: &mut StoreLocalStatistic,
) -> HummockResult<Box<dyn BlockStream>> {
const MAX_PREFETCH_BLOCK: usize = 16;
let object_id = sst.id;
if self.prefetch_buffer_usage.load(Ordering::Acquire) > self.prefetch_buffer_capacity {
let block = self.get(sst, block_index, policy, stats).await?;
Expand All @@ -324,7 +327,7 @@ impl SstableStore {
None,
)));
}
let end_index = std::cmp::min(end_index, block_index + MAX_PREFETCH_BLOCK);
let end_index = std::cmp::min(end_index, block_index + self.max_prefetch_block_number);
let mut end_index = std::cmp::min(end_index, sst.meta.block_metas.len());
let start_offset = sst.meta.block_metas[block_index].offset as usize;
let mut min_hit_index = end_index;
Expand Down
4 changes: 4 additions & 0 deletions src/storage/src/opts.rs
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,9 @@ pub struct StorageOpts {
pub high_priority_ratio: usize,
/// max memory usage for large query.
pub prefetch_buffer_capacity_mb: usize,

pub max_prefetch_block_number: usize,

pub disable_remote_compactor: bool,
/// Number of tasks shared buffer can upload in parallel.
pub share_buffer_upload_concurrency: usize,
Expand Down Expand Up @@ -165,6 +168,7 @@ impl From<(&RwConfig, &SystemParamsReader, &StorageMemoryConfig)> for StorageOpt
high_priority_ratio: s.high_priority_ratio_in_percent,
block_cache_capacity_mb: s.block_cache_capacity_mb,
prefetch_buffer_capacity_mb: s.prefetch_buffer_capacity_mb,
max_prefetch_block_number: c.storage.max_prefetch_block_number,
meta_cache_capacity_mb: s.meta_cache_capacity_mb,
disable_remote_compactor: c.storage.disable_remote_compactor,
share_buffer_upload_concurrency: c.storage.share_buffer_upload_concurrency,
Expand Down
1 change: 1 addition & 0 deletions src/storage/src/store_impl.rs
Original file line number Diff line number Diff line change
Expand Up @@ -617,6 +617,7 @@ impl StateStoreImpl {
opts.meta_cache_capacity_mb * (1 << 20),
opts.high_priority_ratio,
opts.prefetch_buffer_capacity_mb * (1 << 20),
opts.max_prefetch_block_number,
data_file_cache,
meta_file_cache,
recent_filter,
Expand Down
1 change: 1 addition & 0 deletions src/tests/compaction_test/src/delete_range_runner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -218,6 +218,7 @@ async fn compaction_test(
storage_memory_config.meta_cache_capacity_mb * (1 << 20),
0,
storage_memory_config.prefetch_buffer_capacity_mb * (1 << 20),
storage_opts.max_prefetch_block_number,
FileCache::none(),
FileCache::none(),
None,
Expand Down

0 comments on commit dfb7dec

Please sign in to comment.