Skip to content

Commit

Permalink
refactor(backup): refine error message (#12388)
Browse files Browse the repository at this point in the history
  • Loading branch information
zwang28 authored Sep 26, 2023
1 parent f38554e commit 1ff58bc
Show file tree
Hide file tree
Showing 5 changed files with 47 additions and 38 deletions.
6 changes: 2 additions & 4 deletions proto/backup_service.proto
Original file line number Diff line number Diff line change
Expand Up @@ -13,11 +13,8 @@ enum BackupJobStatus {
UNSPECIFIED = 0;
RUNNING = 1;
SUCCEEDED = 2;
// NOT_FOUND indicates one of these cases:
// - Invalid job id.
// - Job has failed.
// - Job has succeeded, but its resulted backup has been deleted later.
NOT_FOUND = 3;
FAILED = 4;
}
message BackupMetaRequest {}
message BackupMetaResponse {
Expand All @@ -29,6 +26,7 @@ message GetBackupJobStatusRequest {
message GetBackupJobStatusResponse {
uint64 job_id = 1;
BackupJobStatus job_status = 2;
string message = 3;
}
message DeleteMetaSnapshotRequest {
repeated uint64 snapshot_ids = 1;
Expand Down
22 changes: 17 additions & 5 deletions src/ctl/src/cmd_impl/meta/backup_meta.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,21 +22,33 @@ pub async fn backup_meta(context: &CtlContext) -> anyhow::Result<()> {
let meta_client = context.meta_client().await?;
let job_id = meta_client.backup_meta().await?;
loop {
let job_status = meta_client.get_backup_job_status(job_id).await?;
let (job_status, message) = meta_client.get_backup_job_status(job_id).await?;
match job_status {
BackupJobStatus::Running => {
tracing::info!("backup job is still running: job {}", job_id);
tracing::info!("backup job is still running: job {}, {}", job_id, message);
tokio::time::sleep(Duration::from_secs(1)).await;
}
BackupJobStatus::Succeeded => {
tracing::info!("backup job succeeded: job {}, {}", job_id, message);
break;
}
_ => {
return Err(anyhow::anyhow!("backup job failed: job {}", job_id));
BackupJobStatus::NotFound => {
return Err(anyhow::anyhow!(
"backup job status not found: job {}, {}",
job_id,
message
));
}
BackupJobStatus::Failed => {
return Err(anyhow::anyhow!(
"backup job failed: job {}, {}",
job_id,
message
));
}
_ => unreachable!("unknown backup job status"),
}
}
tracing::info!("backup job succeeded: job {}", job_id);
Ok(())
}

Expand Down
48 changes: 23 additions & 25 deletions src/meta/src/backup_restore/backup_manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -68,9 +68,11 @@ pub struct BackupManager {
hummock_manager: HummockManagerRef,
backup_store: ArcSwap<(BoxedMetaSnapshotStorage, StoreConfig)>,
/// Tracks the running backup job. Concurrent jobs is not supported.
running_backup_job: tokio::sync::Mutex<Option<BackupJobHandle>>,
running_job_handle: tokio::sync::Mutex<Option<BackupJobHandle>>,
metrics: BackupManagerMetrics,
meta_metrics: Arc<MetaMetrics>,
/// (job id, status, message)
latest_job_info: ArcSwap<(MetaBackupJobId, BackupJobStatus, String)>,
}

impl BackupManager {
Expand Down Expand Up @@ -147,9 +149,10 @@ impl BackupManager {
env,
hummock_manager,
backup_store: ArcSwap::from_pointee(backup_store),
running_backup_job: tokio::sync::Mutex::new(None),
running_job_handle: tokio::sync::Mutex::new(None),
metrics: BackupManagerMetrics::default(),
meta_metrics,
latest_job_info: ArcSwap::from_pointee((0, BackupJobStatus::NotFound, "".into())),
}
}

Expand Down Expand Up @@ -181,7 +184,7 @@ impl BackupManager {
/// Starts a backup job in background. It's non-blocking.
/// Returns job id.
pub async fn start_backup_job(self: &Arc<Self>) -> MetaResult<MetaBackupJobId> {
let mut guard = self.running_backup_job.lock().await;
let mut guard = self.running_job_handle.lock().await;
if let Some(job) = (*guard).as_ref() {
bail!(format!(
"concurrent backup job is not supported: existent job {}",
Expand Down Expand Up @@ -213,6 +216,8 @@ impl BackupManager {
.id_gen_manager()
.generate::<{ IdCategory::Backup }>()
.await?;
self.latest_job_info
.store(Arc::new((job_id, BackupJobStatus::Running, "".into())));
let hummock_version_safe_point = self.hummock_manager.register_safe_point().await;
// Ideally `BackupWorker` and its r/w IO can be made external to meta node.
// The justification of keeping `BackupWorker` in meta node are:
Expand All @@ -227,27 +232,12 @@ impl BackupManager {
Ok(job_id)
}

pub async fn get_backup_job_status(
&self,
job_id: MetaBackupJobId,
) -> MetaResult<BackupJobStatus> {
if let Some(running_job) = self.running_backup_job.lock().await.as_ref() {
if running_job.job_id == job_id {
return Ok(BackupJobStatus::Running);
}
}
if self
.backup_store
.load()
.0
.manifest()
.snapshot_metadata
.iter()
.any(|m| m.id == job_id)
{
return Ok(BackupJobStatus::Succeeded);
pub fn get_backup_job_status(&self, job_id: MetaBackupJobId) -> (BackupJobStatus, String) {
let last = self.latest_job_info.load();
if last.0 == job_id {
return (last.1, last.2.clone());
}
Ok(BackupJobStatus::NotFound)
(BackupJobStatus::NotFound, "".into())
}

async fn finish_backup_job(&self, job_id: MetaBackupJobId, job_result: BackupJobResult) {
Expand All @@ -269,16 +259,24 @@ impl BackupManager {
id: self.backup_store.load().0.manifest().manifest_id,
}),
);
self.latest_job_info.store(Arc::new((
job_id,
BackupJobStatus::Succeeded,
"".into(),
)));
}
BackupJobResult::Failed(e) => {
self.metrics.job_latency_failure.observe(job_latency);
tracing::warn!("failed backup job {}: {}", job_id, e);
let message = format!("failed backup job {}: {}", job_id, e);
tracing::warn!(message);
self.latest_job_info
.store(Arc::new((job_id, BackupJobStatus::Failed, message)));
}
}
}

async fn take_job_handle_by_job_id(&self, job_id: u64) -> Option<BackupJobHandle> {
let mut guard = self.running_backup_job.lock().await;
let mut guard = self.running_job_handle.lock().await;
match (*guard).as_ref() {
None => {
return None;
Expand Down
5 changes: 3 additions & 2 deletions src/meta/src/rpc/service/backup_service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -49,10 +49,11 @@ impl BackupService for BackupServiceImpl {
request: Request<GetBackupJobStatusRequest>,
) -> Result<Response<GetBackupJobStatusResponse>, Status> {
let job_id = request.into_inner().job_id;
let job_status = self.backup_manager.get_backup_job_status(job_id).await? as _;
let (job_status, message) = self.backup_manager.get_backup_job_status(job_id);
Ok(Response::new(GetBackupJobStatusResponse {
job_id,
job_status,
job_status: job_status as _,
message,
}))
}

Expand Down
4 changes: 2 additions & 2 deletions src/rpc_client/src/meta_client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -936,10 +936,10 @@ impl MetaClient {
Ok(resp.job_id)
}

pub async fn get_backup_job_status(&self, job_id: u64) -> Result<BackupJobStatus> {
pub async fn get_backup_job_status(&self, job_id: u64) -> Result<(BackupJobStatus, String)> {
let req = GetBackupJobStatusRequest { job_id };
let resp = self.inner.get_backup_job_status(req).await?;
Ok(resp.job_status())
Ok((resp.job_status(), resp.message))
}

pub async fn delete_meta_snapshot(&self, snapshot_ids: &[u64]) -> Result<()> {
Expand Down

0 comments on commit 1ff58bc

Please sign in to comment.