Skip to content

Commit

Permalink
feat(compaction): dynamically determine the type of sstable_object_id…
Browse files Browse the repository at this point in the history
…_manager for shared compaction (#11867)
  • Loading branch information
wcy-fdu authored Aug 31, 2023
1 parent e0480ea commit fdbba6e
Show file tree
Hide file tree
Showing 13 changed files with 298 additions and 107 deletions.
7 changes: 5 additions & 2 deletions src/compute/src/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -225,13 +225,16 @@ pub async fn compute_node_serve(
compaction_executor: Arc::new(CompactionExecutor::new(Some(1))),
filter_key_extractor_manager: storage.filter_key_extractor_manager().clone(),
memory_limiter,
sstable_object_id_manager: storage.sstable_object_id_manager().clone(),

task_progress_manager: Default::default(),
await_tree_reg: None,
running_task_count: Arc::new(AtomicU32::new(0)),
});

let (handle, shutdown_sender) = start_compactor(compactor_context);
let (handle, shutdown_sender) = start_compactor(
compactor_context,
storage.sstable_object_id_manager().clone(),
);
sub_tasks.push((handle, shutdown_sender));
}
let flush_limiter = storage.get_memory_limiter();
Expand Down
9 changes: 6 additions & 3 deletions src/storage/compactor/src/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -203,7 +203,7 @@ pub async fn compactor_serve(
)),
filter_key_extractor_manager: filter_key_extractor_manager.clone(),
memory_limiter,
sstable_object_id_manager: sstable_object_id_manager.clone(),

task_progress_manager: Default::default(),
await_tree_reg: await_tree_reg.clone(),
running_task_count: Arc::new(AtomicU32::new(0)),
Expand All @@ -212,9 +212,12 @@ pub async fn compactor_serve(
MetaClient::start_heartbeat_loop(
meta_client.clone(),
Duration::from_millis(config.server.heartbeat_interval_ms as u64),
vec![sstable_object_id_manager],
vec![sstable_object_id_manager.clone()],
),
risingwave_storage::hummock::compactor::start_compactor(
compactor_context.clone(),
sstable_object_id_manager.clone(),
),
risingwave_storage::hummock::compactor::start_compactor(compactor_context.clone()),
];

let telemetry_manager = TelemetryManager::new(
Expand Down
106 changes: 85 additions & 21 deletions src/storage/hummock_test/src/compactor_tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -184,18 +184,14 @@ pub(crate) mod tests {
filter_key_extractor_manager: FilterKeyExtractorManagerRef,
) -> CompactorContext {
CompactorContext {
storage_opts: options.clone(),
storage_opts: options,
sstable_store,
hummock_meta_client: hummock_meta_client.clone(),
compactor_metrics: Arc::new(CompactorMetrics::unused()),
is_share_buffer_compact: false,
compaction_executor: Arc::new(CompactionExecutor::new(Some(1))),
memory_limiter: MemoryLimiter::unlimit(),
filter_key_extractor_manager,
sstable_object_id_manager: Arc::new(SstableObjectIdManager::new(
hummock_meta_client.clone(),
options.sstable_id_remote_fetch_number,
)),
task_progress_manager: Default::default(),
await_tree_reg: None,
running_task_count: Arc::new(AtomicU32::new(0)),
Expand Down Expand Up @@ -232,6 +228,13 @@ pub(crate) mod tests {
&hummock_meta_client,
storage.filter_key_extractor_manager().clone(),
);
let sstable_object_id_manager = Arc::new(SstableObjectIdManager::new(
hummock_meta_client.clone(),
storage
.storage_opts()
.clone()
.sstable_id_remote_fetch_number,
));
let worker_node2 = hummock_manager_ref
.cluster_manager
.add_worker_node(
Expand Down Expand Up @@ -277,8 +280,13 @@ pub(crate) mod tests {
compact_task.current_epoch_time = 0;

let (_tx, rx) = tokio::sync::oneshot::channel();
let (mut result_task, task_stats) =
compact(Arc::new(compact_ctx.clone()), compact_task.clone(), rx).await;
let (mut result_task, task_stats) = compact(
Arc::new(compact_ctx.clone()),
compact_task.clone(),
rx,
Box::new(sstable_object_id_manager.clone()),
)
.await;

hummock_manager_ref
.report_compact_task(&mut result_task, Some(to_prost_table_stats_map(task_stats)))
Expand Down Expand Up @@ -376,7 +384,13 @@ pub(crate) mod tests {
&hummock_meta_client,
storage.filter_key_extractor_manager().clone(),
);

let sstable_object_id_manager = Arc::new(SstableObjectIdManager::new(
hummock_meta_client.clone(),
storage
.storage_opts()
.clone()
.sstable_id_remote_fetch_number,
));
// 1. add sstables with 1MB value
let mut key = BytesMut::default();
key.put_u16(0);
Expand Down Expand Up @@ -421,8 +435,13 @@ pub(crate) mod tests {

// 3. compact
let (_tx, rx) = tokio::sync::oneshot::channel();
let (mut result_task, task_stats) =
compact(Arc::new(compact_ctx), compact_task.clone(), rx).await;
let (mut result_task, task_stats) = compact(
Arc::new(compact_ctx),
compact_task.clone(),
rx,
Box::new(sstable_object_id_manager.clone()),
)
.await;

hummock_manager_ref
.report_compact_task(&mut result_task, Some(to_prost_table_stats_map(task_stats)))
Expand Down Expand Up @@ -653,7 +672,13 @@ pub(crate) mod tests {
&hummock_meta_client,
filter_key_extractor_manager.clone(),
);

let sstable_object_id_manager = Arc::new(SstableObjectIdManager::new(
hummock_meta_client.clone(),
global_storage
.storage_opts()
.clone()
.sstable_id_remote_fetch_number,
));
// 1. add sstables
let val = Bytes::from(b"0"[..].repeat(1 << 10)); // 1024 Byte value

Expand Down Expand Up @@ -731,8 +756,13 @@ pub(crate) mod tests {

// 4. compact
let (_tx, rx) = tokio::sync::oneshot::channel();
let (mut result_task, task_stats) =
compact(Arc::new(compact_ctx), compact_task.clone(), rx).await;
let (mut result_task, task_stats) = compact(
Arc::new(compact_ctx),
compact_task.clone(),
rx,
Box::new(sstable_object_id_manager.clone()),
)
.await;

hummock_manager_ref
.report_compact_task(&mut result_task, Some(to_prost_table_stats_map(task_stats)))
Expand Down Expand Up @@ -821,6 +851,13 @@ pub(crate) mod tests {
&hummock_meta_client,
filter_key_extractor_manager.clone(),
);
let sstable_object_id_manager = Arc::new(SstableObjectIdManager::new(
hummock_meta_client.clone(),
storage
.storage_opts()
.clone()
.sstable_id_remote_fetch_number,
));
filter_key_extractor_manager.update(
2,
Arc::new(FilterKeyExtractorImpl::FullKey(FullKeyFilterKeyExtractor)),
Expand Down Expand Up @@ -900,8 +937,13 @@ pub(crate) mod tests {

// 3. compact
let (_tx, rx) = tokio::sync::oneshot::channel();
let (mut result_task, task_stats) =
compact(Arc::new(compact_ctx), compact_task.clone(), rx).await;
let (mut result_task, task_stats) = compact(
Arc::new(compact_ctx),
compact_task.clone(),
rx,
Box::new(sstable_object_id_manager.clone()),
)
.await;

hummock_manager_ref
.report_compact_task(&mut result_task, Some(to_prost_table_stats_map(task_stats)))
Expand Down Expand Up @@ -1003,7 +1045,13 @@ pub(crate) mod tests {
&hummock_meta_client,
filter_key_extractor_manager.clone(),
);

let sstable_object_id_manager = Arc::new(SstableObjectIdManager::new(
hummock_meta_client.clone(),
storage
.storage_opts()
.clone()
.sstable_id_remote_fetch_number,
));
// 1. add sstables
let val = Bytes::from(b"0"[..].to_vec()); // 1 Byte value
let kv_count = 11;
Expand Down Expand Up @@ -1069,8 +1117,13 @@ pub(crate) mod tests {

// 3. compact
let (_tx, rx) = tokio::sync::oneshot::channel();
let (mut result_task, task_stats) =
compact(Arc::new(compact_ctx), compact_task.clone(), rx).await;
let (mut result_task, task_stats) = compact(
Arc::new(compact_ctx),
compact_task.clone(),
rx,
Box::new(sstable_object_id_manager.clone()),
)
.await;

hummock_manager_ref
.report_compact_task(&mut result_task, Some(to_prost_table_stats_map(task_stats)))
Expand Down Expand Up @@ -1168,7 +1221,13 @@ pub(crate) mod tests {
.await;
let compact_ctx =
prepare_compactor_and_filter(&storage, &hummock_meta_client, existing_table_id);

let sstable_object_id_manager = Arc::new(SstableObjectIdManager::new(
hummock_meta_client.clone(),
storage
.storage_opts()
.clone()
.sstable_id_remote_fetch_number,
));
prepare_data(hummock_meta_client.clone(), &storage, existing_table_id, 2).await;
let mut local = storage
.new_local(NewLocalOptions::for_test(existing_table_id.into()))
Expand Down Expand Up @@ -1217,8 +1276,13 @@ pub(crate) mod tests {

// 3. compact
let (_tx, rx) = tokio::sync::oneshot::channel();
let (mut result_task, task_stats) =
compact(Arc::new(compact_ctx), compact_task.clone(), rx).await;
let (mut result_task, task_stats) = compact(
Arc::new(compact_ctx),
compact_task.clone(),
rx,
Box::new(sstable_object_id_manager.clone()),
)
.await;

hummock_manager_ref
.report_compact_task(&mut result_task, Some(to_prost_table_stats_map(task_stats)))
Expand Down
55 changes: 45 additions & 10 deletions src/storage/hummock_test/src/sync_point_tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ use risingwave_pb::hummock::compact_task::TaskStatus;
use risingwave_rpc_client::HummockMetaClient;
use risingwave_storage::hummock::compactor::compactor_runner::compact;
use risingwave_storage::hummock::compactor::CompactorContext;
use risingwave_storage::hummock::{CachePolicy, SstableObjectIdManager};
use risingwave_storage::hummock::{CachePolicy, GetObjectId, SstableObjectIdManager};
use risingwave_storage::store::{LocalStateStore, NewLocalOptions, ReadOptions};
use risingwave_storage::StateStore;
use serial_test::serial;
Expand Down Expand Up @@ -73,7 +73,7 @@ async fn test_syncpoints_sstable_object_id_manager() {
});

// Start the task that fetches new ids.
let sstable_object_id_manager_clone = sstable_object_id_manager.clone();
let mut sstable_object_id_manager_clone = sstable_object_id_manager.clone();
let leader_task = tokio::spawn(async move {
sstable_object_id_manager_clone
.get_new_sst_object_id()
Expand All @@ -90,7 +90,7 @@ async fn test_syncpoints_sstable_object_id_manager() {
// Start tasks that waits to be notified.
let mut follower_tasks = vec![];
for _ in 0..3 {
let sstable_object_id_manager_clone = sstable_object_id_manager.clone();
let mut sstable_object_id_manager_clone = sstable_object_id_manager.clone();
let follower_task = tokio::spawn(async move {
sstable_object_id_manager_clone
.get_new_sst_object_id()
Expand Down Expand Up @@ -137,7 +137,7 @@ async fn test_syncpoints_test_failpoints_fetch_ids() {
});

// Start the task that fetches new ids.
let sstable_object_id_manager_clone = sstable_object_id_manager.clone();
let mut sstable_object_id_manager_clone = sstable_object_id_manager.clone();
let leader_task = tokio::spawn(async move {
fail::cfg("get_new_sst_ids_err", "return").unwrap();
sstable_object_id_manager_clone
Expand All @@ -153,7 +153,7 @@ async fn test_syncpoints_test_failpoints_fetch_ids() {
// Start tasks that waits to be notified.
let mut follower_tasks = vec![];
for _ in 0..3 {
let sstable_object_id_manager_clone = sstable_object_id_manager.clone();
let mut sstable_object_id_manager_clone = sstable_object_id_manager.clone();
let follower_task = tokio::spawn(async move {
sstable_object_id_manager_clone
.get_new_sst_object_id()
Expand Down Expand Up @@ -230,6 +230,7 @@ async fn test_syncpoints_test_local_notification_receiver() {
pub async fn compact_once(
hummock_manager_ref: HummockManagerRef<MemStore>,
compact_ctx: Arc<CompactorContext>,
sstable_object_id_manager: Arc<SstableObjectIdManager>,
) {
// 2. get compact task
let manual_compcation_option = ManualCompactionOption {
Expand All @@ -251,7 +252,13 @@ pub async fn compact_once(
compact_task.compaction_filter_mask = compaction_filter_flag.bits();
// 3. compact
let (_tx, rx) = tokio::sync::oneshot::channel();
let (mut result_task, task_stats) = compact(compact_ctx, compact_task.clone(), rx).await;
let (mut result_task, task_stats) = compact(
compact_ctx,
compact_task.clone(),
rx,
Box::new(sstable_object_id_manager),
)
.await;

hummock_manager_ref
.report_compact_task(&mut result_task, Some(to_prost_table_stats_map(task_stats)))
Expand Down Expand Up @@ -288,6 +295,14 @@ async fn test_syncpoints_get_in_delete_range_boundary() {
existing_table_id,
));

let sstable_object_id_manager = Arc::new(SstableObjectIdManager::new(
hummock_meta_client.clone(),
storage
.storage_opts()
.clone()
.sstable_id_remote_fetch_number,
));

let mut local = storage
.new_local(NewLocalOptions::for_test(existing_table_id.into()))
.await;
Expand Down Expand Up @@ -320,7 +335,12 @@ async fn test_syncpoints_get_in_delete_range_boundary() {
local.flush(Vec::new()).await.unwrap();
local.seal_current_epoch(101);
flush_and_commit(&hummock_meta_client, &storage, 100).await;
compact_once(hummock_manager_ref.clone(), compact_ctx.clone()).await;
compact_once(
hummock_manager_ref.clone(),
compact_ctx.clone(),
sstable_object_id_manager.clone(),
)
.await;

local
.insert(Bytes::from(b"\0\0aaa".as_slice()), val1.clone(), None)
Expand All @@ -337,7 +357,12 @@ async fn test_syncpoints_get_in_delete_range_boundary() {
.unwrap();
local.seal_current_epoch(102);
flush_and_commit(&hummock_meta_client, &storage, 101).await;
compact_once(hummock_manager_ref.clone(), compact_ctx.clone()).await;
compact_once(
hummock_manager_ref.clone(),
compact_ctx.clone(),
sstable_object_id_manager.clone(),
)
.await;

local
.insert(Bytes::from(b"\0\0hhh".as_slice()), val1.clone(), None)
Expand All @@ -355,7 +380,12 @@ async fn test_syncpoints_get_in_delete_range_boundary() {
local.seal_current_epoch(103);
flush_and_commit(&hummock_meta_client, &storage, 102).await;
// move this two file to the same level.
compact_once(hummock_manager_ref.clone(), compact_ctx.clone()).await;
compact_once(
hummock_manager_ref.clone(),
compact_ctx.clone(),
sstable_object_id_manager.clone(),
)
.await;

local
.insert(Bytes::from(b"\0\0lll".as_slice()), val1.clone(), None)
Expand All @@ -367,7 +397,12 @@ async fn test_syncpoints_get_in_delete_range_boundary() {
local.seal_current_epoch(u64::MAX);
flush_and_commit(&hummock_meta_client, &storage, 103).await;
// move this two file to the same level.
compact_once(hummock_manager_ref.clone(), compact_ctx.clone()).await;
compact_once(
hummock_manager_ref.clone(),
compact_ctx.clone(),
sstable_object_id_manager.clone(),
)
.await;

// 4. get the latest version and check
let version = hummock_manager_ref.get_current_version().await;
Expand Down
Loading

0 comments on commit fdbba6e

Please sign in to comment.