Skip to content

Commit

Permalink
refactor(meta): list and delete SST objects in meta node (#19329)
Browse files Browse the repository at this point in the history
(cherry picked from commit b689615)
  • Loading branch information
zwang28 committed Nov 11, 2024
1 parent dcd12f5 commit 28ea19e
Show file tree
Hide file tree
Showing 23 changed files with 442 additions and 1,335 deletions.
33 changes: 3 additions & 30 deletions proto/hummock.proto
Original file line number Diff line number Diff line change
Expand Up @@ -498,9 +498,9 @@ message SubscribeCompactionEventResponse {

oneof event {
CompactTask compact_task = 1;
VacuumTask vacuum_task = 2;
FullScanTask full_scan_task = 3;
ValidationTask validation_task = 4;
VacuumTask vacuum_task = 2 [deprecated = true];
FullScanTask full_scan_task = 3 [deprecated = true];
ValidationTask validation_task = 4 [deprecated = true];
CancelCompactTask cancel_compact_task = 5;

PullTaskAck pull_task_ack = 6;
Expand Down Expand Up @@ -554,14 +554,6 @@ message CancelCompactTask {
uint64 task_id = 2;
}

message ReportVacuumTaskRequest {
VacuumTask vacuum_task = 1;
}

message ReportVacuumTaskResponse {
common.Status status = 1;
}

message TriggerManualCompactionRequest {
uint64 compaction_group_id = 1;
KeyRange key_range = 2;
Expand All @@ -574,23 +566,6 @@ message TriggerManualCompactionResponse {
common.Status status = 1;
}

message ReportFullScanTaskRequest {
// Object ids that satisfying conditions specified by FullScanTask.
repeated uint64 object_ids = 1;
// Total count of objects before filtered by conditions specified by FullScanTask.
uint64 total_object_count = 2;
// Total size of objects before filtered by conditions specified by FullScanTask.
uint64 total_object_size = 3;
// Start key of the next page.
optional string next_start_after = 4;
// Start key of this page.
optional string start_after = 5;
}

message ReportFullScanTaskResponse {
common.Status status = 1;
}

message TriggerFullGCRequest {
uint64 sst_retention_time_sec = 1;
optional string prefix = 2;
Expand Down Expand Up @@ -819,9 +794,7 @@ service HummockManagerService {
rpc TriggerCompactionDeterministic(TriggerCompactionDeterministicRequest) returns (TriggerCompactionDeterministicResponse);
rpc DisableCommitEpoch(DisableCommitEpochRequest) returns (DisableCommitEpochResponse);
rpc GetNewSstIds(GetNewSstIdsRequest) returns (GetNewSstIdsResponse);
rpc ReportVacuumTask(ReportVacuumTaskRequest) returns (ReportVacuumTaskResponse);
rpc TriggerManualCompaction(TriggerManualCompactionRequest) returns (TriggerManualCompactionResponse);
rpc ReportFullScanTask(ReportFullScanTaskRequest) returns (ReportFullScanTaskResponse);
rpc TriggerFullGC(TriggerFullGCRequest) returns (TriggerFullGCResponse);
rpc RiseCtlGetPinnedVersionsSummary(RiseCtlGetPinnedVersionsSummaryRequest) returns (RiseCtlGetPinnedVersionsSummaryResponse);
rpc RiseCtlListCompactionGroup(RiseCtlListCompactionGroupRequest) returns (RiseCtlListCompactionGroupResponse);
Expand Down
19 changes: 6 additions & 13 deletions src/meta/node/src/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -524,22 +524,13 @@ pub async fn start_service_as_election_leader(
.may_fill_backward_state_table_info()
.await
.unwrap();

let vacuum_manager = Arc::new(hummock::VacuumManager::new(
env.clone(),
hummock_manager.clone(),
backup_manager.clone(),
compactor_manager.clone(),
));

let mut aws_cli = None;
if let Some(my_vpc_id) = &env.opts.vpc_id
&& let Some(security_group_id) = &env.opts.security_group_id
{
let cli = AwsEc2Client::new(my_vpc_id, security_group_id).await;
aws_cli = Some(cli);
}

let ddl_srv = DdlServiceImpl::new(
env.clone(),
aws_cli.clone(),
Expand Down Expand Up @@ -573,12 +564,12 @@ pub async fn start_service_as_election_leader(
let sink_coordination_srv = SinkCoordinationServiceImpl::new(sink_manager);
let hummock_srv = HummockServiceImpl::new(
hummock_manager.clone(),
vacuum_manager.clone(),
metadata_manager.clone(),
backup_manager.clone(),
);

let health_srv = HealthServiceImpl::new();
let backup_srv = BackupServiceImpl::new(backup_manager);
let backup_srv = BackupServiceImpl::new(backup_manager.clone());
let telemetry_srv = TelemetryInfoServiceImpl::new(env.meta_store());
let system_params_srv = SystemParamsServiceImpl::new(
env.system_params_manager_impl_ref(),
Expand All @@ -598,7 +589,6 @@ pub async fn start_service_as_election_leader(
// sub_tasks executed concurrently. Can be shutdown via shutdown_all
sub_tasks.extend(hummock::start_hummock_workers(
hummock_manager.clone(),
vacuum_manager,
// compaction_scheduler,
&env.opts,
));
Expand All @@ -616,7 +606,10 @@ pub async fn start_service_as_election_leader(
sub_tasks.push(SystemParamsController::start_params_notifier(
env.system_params_manager_impl_ref(),
));
sub_tasks.push(HummockManager::hummock_timer_task(hummock_manager.clone()));
sub_tasks.push(HummockManager::hummock_timer_task(
hummock_manager.clone(),
Some(backup_manager),
));
sub_tasks.extend(HummockManager::compaction_event_loop(
hummock_manager,
compactor_streams_change_rx,
Expand Down
72 changes: 18 additions & 54 deletions src/meta/service/src/hummock_service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,34 +22,34 @@ use risingwave_common::catalog::{TableId, SYS_CATALOG_START_ID};
use risingwave_hummock_sdk::key_range::KeyRange;
use risingwave_hummock_sdk::version::HummockVersionDelta;
use risingwave_hummock_sdk::HummockVersionId;
use risingwave_meta::backup_restore::BackupManagerRef;
use risingwave_meta::manager::MetadataManager;
use risingwave_pb::hummock::get_compaction_score_response::PickerInfo;
use risingwave_pb::hummock::hummock_manager_service_server::HummockManagerService;
use risingwave_pb::hummock::subscribe_compaction_event_request::Event as RequestEvent;
use risingwave_pb::hummock::*;
use thiserror_ext::AsReport;
use tonic::{Request, Response, Status, Streaming};

use crate::hummock::compaction::selector::ManualCompactionOption;
use crate::hummock::{HummockManagerRef, VacuumManagerRef};
use crate::hummock::HummockManagerRef;
use crate::RwReceiverStream;

pub struct HummockServiceImpl {
hummock_manager: HummockManagerRef,
vacuum_manager: VacuumManagerRef,
metadata_manager: MetadataManager,
backup_manager: BackupManagerRef,
}

impl HummockServiceImpl {
pub fn new(
hummock_manager: HummockManagerRef,
vacuum_trigger: VacuumManagerRef,
metadata_manager: MetadataManager,
backup_manager: BackupManagerRef,
) -> Self {
HummockServiceImpl {
hummock_manager,
vacuum_manager: vacuum_trigger,
metadata_manager,
backup_manager,
}
}
}
Expand Down Expand Up @@ -174,17 +174,6 @@ impl HummockManagerService for HummockServiceImpl {
}))
}

async fn report_vacuum_task(
&self,
request: Request<ReportVacuumTaskRequest>,
) -> Result<Response<ReportVacuumTaskResponse>, Status> {
if let Some(vacuum_task) = request.into_inner().vacuum_task {
self.vacuum_manager.report_vacuum_task(vacuum_task).await?;
}
sync_point::sync_point!("AFTER_REPORT_VACUUM");
Ok(Response::new(ReportVacuumTaskResponse { status: None }))
}

async fn trigger_manual_compaction(
&self,
request: Request<TriggerManualCompactionRequest>,
Expand Down Expand Up @@ -245,49 +234,24 @@ impl HummockManagerService for HummockServiceImpl {
}))
}

async fn report_full_scan_task(
&self,
request: Request<ReportFullScanTaskRequest>,
) -> Result<Response<ReportFullScanTaskResponse>, Status> {
let req = request.into_inner();
let hummock_manager = self.hummock_manager.clone();
hummock_manager.update_paged_metrics(
req.start_after,
req.next_start_after.clone(),
req.total_object_count,
req.total_object_size,
);
let pinned_by_metadata_backup = self.vacuum_manager.backup_manager.list_pinned_ssts();
// The following operation takes some time, so we do it in dedicated task and responds the
// RPC immediately.
tokio::spawn(async move {
match hummock_manager
.complete_full_gc(
req.object_ids,
req.next_start_after,
pinned_by_metadata_backup,
)
.await
{
Ok(number) => {
tracing::info!("Full GC results {} SSTs to delete", number);
}
Err(e) => {
tracing::warn!(error = %e.as_report(), "Full GC SST failed");
}
}
});
Ok(Response::new(ReportFullScanTaskResponse { status: None }))
}

async fn trigger_full_gc(
&self,
request: Request<TriggerFullGcRequest>,
) -> Result<Response<TriggerFullGcResponse>, Status> {
let req = request.into_inner();
self.hummock_manager
.start_full_gc(Duration::from_secs(req.sst_retention_time_sec), req.prefix)
.await?;
let backup_manager_2 = self.backup_manager.clone();
let hummock_manager_2 = self.hummock_manager.clone();
tokio::task::spawn(async move {
use thiserror_ext::AsReport;
let _ = hummock_manager_2
.start_full_gc(
Duration::from_secs(req.sst_retention_time_sec),
req.prefix,
Some(backup_manager_2),
)
.await
.inspect_err(|e| tracing::warn!(error = %e.as_report(), "Failed to start GC."));
});
Ok(Response::new(TriggerFullGcResponse { status: None }))
}

Expand Down
1 change: 0 additions & 1 deletion src/meta/src/hummock/manager/context.rs
Original file line number Diff line number Diff line change
Expand Up @@ -235,7 +235,6 @@ impl HummockManager {
// HummockManager::now requires a write to the meta store. Thus, it should be avoided whenever feasible.
if !sstables.is_empty() {
// Sanity check to ensure SSTs to commit have not been full GCed yet.
// TODO: since HummockManager::complete_full_gc have already filtered out SSTs by min uncommitted SST id, this sanity check can be removed.
let now = self.now().await?;
check_sst_retention(
now,
Expand Down
Loading

0 comments on commit 28ea19e

Please sign in to comment.