From 1ff58bc53f445d3f01a1ddaab5f4a92f5b337ea9 Mon Sep 17 00:00:00 2001 From: zwang28 <70626450+zwang28@users.noreply.github.com> Date: Tue, 26 Sep 2023 16:52:36 +0800 Subject: [PATCH] refactor(backup): refine error message (#12388) --- proto/backup_service.proto | 6 +-- src/ctl/src/cmd_impl/meta/backup_meta.rs | 22 +++++++-- src/meta/src/backup_restore/backup_manager.rs | 48 +++++++++---------- src/meta/src/rpc/service/backup_service.rs | 5 +- src/rpc_client/src/meta_client.rs | 4 +- 5 files changed, 47 insertions(+), 38 deletions(-) diff --git a/proto/backup_service.proto b/proto/backup_service.proto index 425d3abb24e2f..feca5f17b7dc3 100644 --- a/proto/backup_service.proto +++ b/proto/backup_service.proto @@ -13,11 +13,8 @@ enum BackupJobStatus { UNSPECIFIED = 0; RUNNING = 1; SUCCEEDED = 2; - // NOT_FOUND indicates one of these cases: - // - Invalid job id. - // - Job has failed. - // - Job has succeeded, but its resulted backup has been deleted later. NOT_FOUND = 3; + FAILED = 4; } message BackupMetaRequest {} message BackupMetaResponse { @@ -29,6 +26,7 @@ message GetBackupJobStatusRequest { message GetBackupJobStatusResponse { uint64 job_id = 1; BackupJobStatus job_status = 2; + string message = 3; } message DeleteMetaSnapshotRequest { repeated uint64 snapshot_ids = 1; diff --git a/src/ctl/src/cmd_impl/meta/backup_meta.rs b/src/ctl/src/cmd_impl/meta/backup_meta.rs index 77c7f0edb7ca2..3238e22b35050 100644 --- a/src/ctl/src/cmd_impl/meta/backup_meta.rs +++ b/src/ctl/src/cmd_impl/meta/backup_meta.rs @@ -22,21 +22,33 @@ pub async fn backup_meta(context: &CtlContext) -> anyhow::Result<()> { let meta_client = context.meta_client().await?; let job_id = meta_client.backup_meta().await?; loop { - let job_status = meta_client.get_backup_job_status(job_id).await?; + let (job_status, message) = meta_client.get_backup_job_status(job_id).await?; match job_status { BackupJobStatus::Running => { - tracing::info!("backup job is still running: job {}", job_id); + tracing::info!("backup job is still running: job {}, {}", job_id, message); tokio::time::sleep(Duration::from_secs(1)).await; } BackupJobStatus::Succeeded => { + tracing::info!("backup job succeeded: job {}, {}", job_id, message); break; } - _ => { - return Err(anyhow::anyhow!("backup job failed: job {}", job_id)); + BackupJobStatus::NotFound => { + return Err(anyhow::anyhow!( + "backup job status not found: job {}, {}", + job_id, + message + )); } + BackupJobStatus::Failed => { + return Err(anyhow::anyhow!( + "backup job failed: job {}, {}", + job_id, + message + )); + } + _ => unreachable!("unknown backup job status"), } } - tracing::info!("backup job succeeded: job {}", job_id); Ok(()) } diff --git a/src/meta/src/backup_restore/backup_manager.rs b/src/meta/src/backup_restore/backup_manager.rs index c280572c796d4..819ea02e36346 100644 --- a/src/meta/src/backup_restore/backup_manager.rs +++ b/src/meta/src/backup_restore/backup_manager.rs @@ -68,9 +68,11 @@ pub struct BackupManager { hummock_manager: HummockManagerRef, backup_store: ArcSwap<(BoxedMetaSnapshotStorage, StoreConfig)>, /// Tracks the running backup job. Concurrent jobs is not supported. - running_backup_job: tokio::sync::Mutex>, + running_job_handle: tokio::sync::Mutex>, metrics: BackupManagerMetrics, meta_metrics: Arc, + /// (job id, status, message) + latest_job_info: ArcSwap<(MetaBackupJobId, BackupJobStatus, String)>, } impl BackupManager { @@ -147,9 +149,10 @@ impl BackupManager { env, hummock_manager, backup_store: ArcSwap::from_pointee(backup_store), - running_backup_job: tokio::sync::Mutex::new(None), + running_job_handle: tokio::sync::Mutex::new(None), metrics: BackupManagerMetrics::default(), meta_metrics, + latest_job_info: ArcSwap::from_pointee((0, BackupJobStatus::NotFound, "".into())), } } @@ -181,7 +184,7 @@ impl BackupManager { /// Starts a backup job in background. It's non-blocking. /// Returns job id. pub async fn start_backup_job(self: &Arc) -> MetaResult { - let mut guard = self.running_backup_job.lock().await; + let mut guard = self.running_job_handle.lock().await; if let Some(job) = (*guard).as_ref() { bail!(format!( "concurrent backup job is not supported: existent job {}", @@ -213,6 +216,8 @@ impl BackupManager { .id_gen_manager() .generate::<{ IdCategory::Backup }>() .await?; + self.latest_job_info + .store(Arc::new((job_id, BackupJobStatus::Running, "".into()))); let hummock_version_safe_point = self.hummock_manager.register_safe_point().await; // Ideally `BackupWorker` and its r/w IO can be made external to meta node. // The justification of keeping `BackupWorker` in meta node are: @@ -227,27 +232,12 @@ impl BackupManager { Ok(job_id) } - pub async fn get_backup_job_status( - &self, - job_id: MetaBackupJobId, - ) -> MetaResult { - if let Some(running_job) = self.running_backup_job.lock().await.as_ref() { - if running_job.job_id == job_id { - return Ok(BackupJobStatus::Running); - } - } - if self - .backup_store - .load() - .0 - .manifest() - .snapshot_metadata - .iter() - .any(|m| m.id == job_id) - { - return Ok(BackupJobStatus::Succeeded); + pub fn get_backup_job_status(&self, job_id: MetaBackupJobId) -> (BackupJobStatus, String) { + let last = self.latest_job_info.load(); + if last.0 == job_id { + return (last.1, last.2.clone()); } - Ok(BackupJobStatus::NotFound) + (BackupJobStatus::NotFound, "".into()) } async fn finish_backup_job(&self, job_id: MetaBackupJobId, job_result: BackupJobResult) { @@ -269,16 +259,24 @@ impl BackupManager { id: self.backup_store.load().0.manifest().manifest_id, }), ); + self.latest_job_info.store(Arc::new(( + job_id, + BackupJobStatus::Succeeded, + "".into(), + ))); } BackupJobResult::Failed(e) => { self.metrics.job_latency_failure.observe(job_latency); - tracing::warn!("failed backup job {}: {}", job_id, e); + let message = format!("failed backup job {}: {}", job_id, e); + tracing::warn!(message); + self.latest_job_info + .store(Arc::new((job_id, BackupJobStatus::Failed, message))); } } } async fn take_job_handle_by_job_id(&self, job_id: u64) -> Option { - let mut guard = self.running_backup_job.lock().await; + let mut guard = self.running_job_handle.lock().await; match (*guard).as_ref() { None => { return None; diff --git a/src/meta/src/rpc/service/backup_service.rs b/src/meta/src/rpc/service/backup_service.rs index 22897d8bb770e..d83b4d0d1e8e4 100644 --- a/src/meta/src/rpc/service/backup_service.rs +++ b/src/meta/src/rpc/service/backup_service.rs @@ -49,10 +49,11 @@ impl BackupService for BackupServiceImpl { request: Request, ) -> Result, Status> { let job_id = request.into_inner().job_id; - let job_status = self.backup_manager.get_backup_job_status(job_id).await? as _; + let (job_status, message) = self.backup_manager.get_backup_job_status(job_id); Ok(Response::new(GetBackupJobStatusResponse { job_id, - job_status, + job_status: job_status as _, + message, })) } diff --git a/src/rpc_client/src/meta_client.rs b/src/rpc_client/src/meta_client.rs index 357cd4cf37f1d..35f69aaa74fbd 100644 --- a/src/rpc_client/src/meta_client.rs +++ b/src/rpc_client/src/meta_client.rs @@ -936,10 +936,10 @@ impl MetaClient { Ok(resp.job_id) } - pub async fn get_backup_job_status(&self, job_id: u64) -> Result { + pub async fn get_backup_job_status(&self, job_id: u64) -> Result<(BackupJobStatus, String)> { let req = GetBackupJobStatusRequest { job_id }; let resp = self.inner.get_backup_job_status(req).await?; - Ok(resp.job_status()) + Ok((resp.job_status(), resp.message)) } pub async fn delete_meta_snapshot(&self, snapshot_ids: &[u64]) -> Result<()> {