Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

refactor(meta): list and delete SST objects in meta node #19329

Merged
merged 7 commits
Nov 11, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
33 changes: 3 additions & 30 deletions proto/hummock.proto
Original file line number Diff line number Diff line change
Expand Up @@ -498,9 +498,9 @@ message SubscribeCompactionEventResponse {

oneof event {
CompactTask compact_task = 1;
VacuumTask vacuum_task = 2;
FullScanTask full_scan_task = 3;
ValidationTask validation_task = 4;
VacuumTask vacuum_task = 2 [deprecated = true];
FullScanTask full_scan_task = 3 [deprecated = true];
ValidationTask validation_task = 4 [deprecated = true];
CancelCompactTask cancel_compact_task = 5;

PullTaskAck pull_task_ack = 6;
Expand Down Expand Up @@ -554,14 +554,6 @@ message CancelCompactTask {
uint64 task_id = 2;
}

message ReportVacuumTaskRequest {
VacuumTask vacuum_task = 1;
}

message ReportVacuumTaskResponse {
common.Status status = 1;
}

message TriggerManualCompactionRequest {
uint64 compaction_group_id = 1;
KeyRange key_range = 2;
Expand All @@ -574,23 +566,6 @@ message TriggerManualCompactionResponse {
common.Status status = 1;
}

message ReportFullScanTaskRequest {
// Object ids that satisfying conditions specified by FullScanTask.
repeated uint64 object_ids = 1;
// Total count of objects before filtered by conditions specified by FullScanTask.
uint64 total_object_count = 2;
// Total size of objects before filtered by conditions specified by FullScanTask.
uint64 total_object_size = 3;
// Start key of the next page.
optional string next_start_after = 4;
// Start key of this page.
optional string start_after = 5;
}

message ReportFullScanTaskResponse {
common.Status status = 1;
}

message TriggerFullGCRequest {
uint64 sst_retention_time_sec = 1;
optional string prefix = 2;
Expand Down Expand Up @@ -819,9 +794,7 @@ service HummockManagerService {
rpc TriggerCompactionDeterministic(TriggerCompactionDeterministicRequest) returns (TriggerCompactionDeterministicResponse);
rpc DisableCommitEpoch(DisableCommitEpochRequest) returns (DisableCommitEpochResponse);
rpc GetNewSstIds(GetNewSstIdsRequest) returns (GetNewSstIdsResponse);
rpc ReportVacuumTask(ReportVacuumTaskRequest) returns (ReportVacuumTaskResponse);
rpc TriggerManualCompaction(TriggerManualCompactionRequest) returns (TriggerManualCompactionResponse);
rpc ReportFullScanTask(ReportFullScanTaskRequest) returns (ReportFullScanTaskResponse);
rpc TriggerFullGC(TriggerFullGCRequest) returns (TriggerFullGCResponse);
rpc RiseCtlGetPinnedVersionsSummary(RiseCtlGetPinnedVersionsSummaryRequest) returns (RiseCtlGetPinnedVersionsSummaryResponse);
rpc RiseCtlListCompactionGroup(RiseCtlListCompactionGroupRequest) returns (RiseCtlListCompactionGroupResponse);
Expand Down
17 changes: 6 additions & 11 deletions src/meta/node/src/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -522,13 +522,6 @@ pub async fn start_service_as_election_leader(
.await
.unwrap();

let vacuum_manager = Arc::new(hummock::VacuumManager::new(
env.clone(),
hummock_manager.clone(),
backup_manager.clone(),
compactor_manager.clone(),
));

let ddl_srv = DdlServiceImpl::new(
env.clone(),
metadata_manager.clone(),
Expand Down Expand Up @@ -560,12 +553,12 @@ pub async fn start_service_as_election_leader(
let sink_coordination_srv = SinkCoordinationServiceImpl::new(sink_manager);
let hummock_srv = HummockServiceImpl::new(
hummock_manager.clone(),
vacuum_manager.clone(),
metadata_manager.clone(),
backup_manager.clone(),
);

let health_srv = HealthServiceImpl::new();
let backup_srv = BackupServiceImpl::new(backup_manager);
let backup_srv = BackupServiceImpl::new(backup_manager.clone());
let telemetry_srv = TelemetryInfoServiceImpl::new(env.meta_store());
let system_params_srv = SystemParamsServiceImpl::new(
env.system_params_manager_impl_ref(),
Expand All @@ -585,7 +578,6 @@ pub async fn start_service_as_election_leader(
// sub_tasks executed concurrently. Can be shutdown via shutdown_all
sub_tasks.extend(hummock::start_hummock_workers(
hummock_manager.clone(),
vacuum_manager,
// compaction_scheduler,
&env.opts,
));
Expand All @@ -603,7 +595,10 @@ pub async fn start_service_as_election_leader(
sub_tasks.push(SystemParamsController::start_params_notifier(
env.system_params_manager_impl_ref(),
));
sub_tasks.push(HummockManager::hummock_timer_task(hummock_manager.clone()));
sub_tasks.push(HummockManager::hummock_timer_task(
hummock_manager.clone(),
Some(backup_manager),
));
sub_tasks.extend(HummockManager::compaction_event_loop(
hummock_manager,
compactor_streams_change_rx,
Expand Down
72 changes: 18 additions & 54 deletions src/meta/service/src/hummock_service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,34 +22,34 @@ use risingwave_common::catalog::{TableId, SYS_CATALOG_START_ID};
use risingwave_hummock_sdk::key_range::KeyRange;
use risingwave_hummock_sdk::version::HummockVersionDelta;
use risingwave_hummock_sdk::HummockVersionId;
use risingwave_meta::backup_restore::BackupManagerRef;
use risingwave_meta::manager::MetadataManager;
use risingwave_pb::hummock::get_compaction_score_response::PickerInfo;
use risingwave_pb::hummock::hummock_manager_service_server::HummockManagerService;
use risingwave_pb::hummock::subscribe_compaction_event_request::Event as RequestEvent;
use risingwave_pb::hummock::*;
use thiserror_ext::AsReport;
use tonic::{Request, Response, Status, Streaming};

use crate::hummock::compaction::selector::ManualCompactionOption;
use crate::hummock::{HummockManagerRef, VacuumManagerRef};
use crate::hummock::HummockManagerRef;
use crate::RwReceiverStream;

pub struct HummockServiceImpl {
hummock_manager: HummockManagerRef,
vacuum_manager: VacuumManagerRef,
metadata_manager: MetadataManager,
backup_manager: BackupManagerRef,
}

impl HummockServiceImpl {
pub fn new(
hummock_manager: HummockManagerRef,
vacuum_trigger: VacuumManagerRef,
metadata_manager: MetadataManager,
backup_manager: BackupManagerRef,
) -> Self {
HummockServiceImpl {
hummock_manager,
vacuum_manager: vacuum_trigger,
metadata_manager,
backup_manager,
}
}
}
Expand Down Expand Up @@ -174,17 +174,6 @@ impl HummockManagerService for HummockServiceImpl {
}))
}

async fn report_vacuum_task(
&self,
request: Request<ReportVacuumTaskRequest>,
) -> Result<Response<ReportVacuumTaskResponse>, Status> {
if let Some(vacuum_task) = request.into_inner().vacuum_task {
self.vacuum_manager.report_vacuum_task(vacuum_task).await?;
}
sync_point::sync_point!("AFTER_REPORT_VACUUM");
Ok(Response::new(ReportVacuumTaskResponse { status: None }))
}

async fn trigger_manual_compaction(
&self,
request: Request<TriggerManualCompactionRequest>,
Expand Down Expand Up @@ -245,49 +234,24 @@ impl HummockManagerService for HummockServiceImpl {
}))
}

async fn report_full_scan_task(
&self,
request: Request<ReportFullScanTaskRequest>,
) -> Result<Response<ReportFullScanTaskResponse>, Status> {
let req = request.into_inner();
let hummock_manager = self.hummock_manager.clone();
hummock_manager.update_paged_metrics(
req.start_after,
req.next_start_after.clone(),
req.total_object_count,
req.total_object_size,
);
let pinned_by_metadata_backup = self.vacuum_manager.backup_manager.list_pinned_ssts();
// The following operation takes some time, so we do it in dedicated task and responds the
// RPC immediately.
tokio::spawn(async move {
match hummock_manager
.complete_full_gc(
req.object_ids,
req.next_start_after,
pinned_by_metadata_backup,
)
.await
{
Ok(number) => {
tracing::info!("Full GC results {} SSTs to delete", number);
}
Err(e) => {
tracing::warn!(error = %e.as_report(), "Full GC SST failed");
}
}
});
Ok(Response::new(ReportFullScanTaskResponse { status: None }))
}

async fn trigger_full_gc(
&self,
request: Request<TriggerFullGcRequest>,
) -> Result<Response<TriggerFullGcResponse>, Status> {
let req = request.into_inner();
self.hummock_manager
.start_full_gc(Duration::from_secs(req.sst_retention_time_sec), req.prefix)
.await?;
let backup_manager_2 = self.backup_manager.clone();
let hummock_manager_2 = self.hummock_manager.clone();
tokio::task::spawn(async move {
use thiserror_ext::AsReport;
let _ = hummock_manager_2
.start_full_gc(
Duration::from_secs(req.sst_retention_time_sec),
req.prefix,
Some(backup_manager_2),
)
.await
.inspect_err(|e| tracing::warn!(error = %e.as_report(), "Failed to start GC."));
});
Ok(Response::new(TriggerFullGcResponse { status: None }))
}

Expand Down
1 change: 0 additions & 1 deletion src/meta/src/hummock/manager/context.rs
Original file line number Diff line number Diff line change
Expand Up @@ -235,7 +235,6 @@ impl HummockManager {
// HummockManager::now requires a write to the meta store. Thus, it should be avoided whenever feasible.
if !sstables.is_empty() {
// Sanity check to ensure SSTs to commit have not been full GCed yet.
// TODO: since HummockManager::complete_full_gc have already filtered out SSTs by min uncommitted SST id, this sanity check can be removed.
let now = self.now().await?;
check_sst_retention(
now,
Expand Down
Loading
Loading