From a6a7c3488ae73191c37f87e21ca357d14837ff62 Mon Sep 17 00:00:00 2001 From: Wei Zhang Date: Tue, 14 Jan 2025 13:34:22 +0800 Subject: [PATCH] chore: use run_job wrap to simplify running job Signed-off-by: Wei Zhang --- .../src/service/background_job/db.rs | 5 +- .../src/service/background_job/git.rs | 7 +- .../index_garbage_collection.rs | 10 +- .../service/background_job/license_check.rs | 9 +- .../src/service/background_job/mod.rs | 138 ++++++++++-------- .../background_job/third_party_integration.rs | 12 +- 6 files changed, 81 insertions(+), 100 deletions(-) diff --git a/ee/tabby-webserver/src/service/background_job/db.rs b/ee/tabby-webserver/src/service/background_job/db.rs index dcb5e238dfcb..e420c531ec1d 100644 --- a/ee/tabby-webserver/src/service/background_job/db.rs +++ b/ee/tabby-webserver/src/service/background_job/db.rs @@ -5,7 +5,7 @@ use serde::{Deserialize, Serialize}; use tabby_db::DbConn; use tabby_schema::{context::ContextService, CoreError}; -use super::helper::{Job, JobLogger}; +use super::helper::Job; #[derive(Debug, Serialize, Deserialize, Clone)] pub struct DbMaintainanceJob; @@ -19,9 +19,7 @@ impl DbMaintainanceJob { now: DateTime, context: Arc, db: DbConn, - job_id: i64, ) -> tabby_schema::Result<()> { - let logger = JobLogger::new(db.clone(), job_id); let mut exit_code = 0; if let Err(e) = db.delete_expired_token().await { @@ -64,7 +62,6 @@ impl DbMaintainanceJob { logkit::warn!(exit_code = exit_code; "Failed to run data retention job: {}", e); } - logger.finalize().await; if exit_code == 0 { Ok(()) } else { diff --git a/ee/tabby-webserver/src/service/background_job/git.rs b/ee/tabby-webserver/src/service/background_job/git.rs index 959176a53a92..dca3ebcc4a0a 100644 --- a/ee/tabby-webserver/src/service/background_job/git.rs +++ b/ee/tabby-webserver/src/service/background_job/git.rs @@ -8,7 +8,7 @@ use tabby_index::public::CodeIndexer; use tabby_inference::Embedding; use tabby_schema::{job::JobService, repository::GitRepositoryService}; -use super::{helper::Job, BackgroundJobEvent, JobLogger}; +use super::{helper::Job, BackgroundJobEvent}; #[derive(Debug, Serialize, Deserialize, Clone)] pub struct SchedulerGitJob { @@ -41,15 +41,11 @@ impl SchedulerGitJob { _now: DateTime, git_repository: Arc, job: Arc, - db: tabby_db::DbConn, - job_id: i64, ) -> tabby_schema::Result<()> { - let logger = JobLogger::new(db.clone(), job_id); let repositories = match git_repository.repository_list().await { Ok(repos) => repos, Err(err) => { logkit::warn!(exit_code = -1; "Failed to list repositories: {}", err); - logger.finalize().await; return Err(err); } }; @@ -65,7 +61,6 @@ impl SchedulerGitJob { .await; } - logger.finalize().await; Ok(()) } } diff --git a/ee/tabby-webserver/src/service/background_job/index_garbage_collection.rs b/ee/tabby-webserver/src/service/background_job/index_garbage_collection.rs index 5c60cfdff04b..edacdcb48321 100644 --- a/ee/tabby-webserver/src/service/background_job/index_garbage_collection.rs +++ b/ee/tabby-webserver/src/service/background_job/index_garbage_collection.rs @@ -4,7 +4,7 @@ use serde::Serialize; use tabby_index::public::{run_index_garbage_collection, CodeIndexer}; use tabby_schema::{context::ContextService, repository::RepositoryService}; -use super::helper::{Job, JobLogger}; +use super::helper::Job; #[derive(Serialize)] pub struct IndexGarbageCollection; @@ -18,17 +18,12 @@ impl IndexGarbageCollection { self, repository: Arc, context: Arc, - db: tabby_db::DbConn, - job_id: i64, ) -> tabby_schema::Result<()> { - let logger = JobLogger::new(db.clone(), job_id); - // Run garbage collection on the index let sources = match context.read(None).await { Ok(sources) => sources, Err(err) => { logkit::warn!(exit_code = -1; "Failed to list sources: {}", err); - logger.finalize().await; return Err(err); } }; @@ -40,7 +35,6 @@ impl IndexGarbageCollection { if let Err(e) = run_index_garbage_collection(sources) { logkit::warn!(exit_code = -1; "Failed to run index garbage collection: {}", e); - logger.finalize().await; return Err(e.into()); } @@ -49,14 +43,12 @@ impl IndexGarbageCollection { Ok(repos) => repos, Err(err) => { logkit::warn!(exit_code = -1; "Failed to list repositories: {}", err); - logger.finalize().await; return Err(err); } }; let mut code = CodeIndexer::default(); code.garbage_collection(&repositories).await; - logger.finalize().await; Ok(()) } } diff --git a/ee/tabby-webserver/src/service/background_job/license_check.rs b/ee/tabby-webserver/src/service/background_job/license_check.rs index 1f6e7557fdfe..ef88bbbff8c2 100644 --- a/ee/tabby-webserver/src/service/background_job/license_check.rs +++ b/ee/tabby-webserver/src/service/background_job/license_check.rs @@ -7,7 +7,7 @@ use tabby_schema::{ notification::{NotificationRecipient, NotificationService}, }; -use super::helper::{Job, JobLogger}; +use super::helper::Job; #[derive(Debug, Serialize, Deserialize, Clone)] pub struct LicenseCheckJob; @@ -21,16 +21,11 @@ impl LicenseCheckJob { _now: DateTime, license_service: Arc, notification_service: Arc, - db: tabby_db::DbConn, - job_id: i64, ) -> tabby_schema::Result<()> { - let logger = JobLogger::new(db.clone(), job_id); - let license = match license_service.read().await { Ok(license) => license, Err(err) => { logkit::warn!(exit_code = -1; "Failed to read license: {}", err); - logger.finalize().await; return Err(err); } }; @@ -47,13 +42,11 @@ impl LicenseCheckJob { .await { logkit::warn!(exit_code = -1; "Failed to create notification: {}", e); - logger.finalize().await; return Err(e); } } } - logger.finalize().await; Ok(()) } } diff --git a/ee/tabby-webserver/src/service/background_job/mod.rs b/ee/tabby-webserver/src/service/background_job/mod.rs index cddc7d37b8bd..18598bf7e8d3 100644 --- a/ee/tabby-webserver/src/service/background_job/mod.rs +++ b/ee/tabby-webserver/src/service/background_job/mod.rs @@ -98,7 +98,10 @@ Please check the log at [Jobs Detail](/jobs/detail?id={}) to identify the underl ), ) .await - .unwrap(); + .map_err(|err| { + warn!("Failed to send notification: {:?}", err); + }) + .ok(); }}; } @@ -162,7 +165,7 @@ pub async fn start( } BackgroundJobEvent::IndexGarbageCollection => { let job = IndexGarbageCollection; - job.run(repository_service.clone(), context_service.clone(), db.clone(), job_id).await + job.run(repository_service.clone(), context_service.clone()).await } }; debug!("Background job {} completed", job.id); @@ -180,72 +183,55 @@ pub async fn start( } }, Some(now) = hourly.next() => { - let job_id = match db.create_job_run(DbMaintainanceJob.name().to_string(), DbMaintainanceJob.to_command()).await { - Ok(job_id) => job_id, - Err(_) => { - warn!("Failed to create job run"); - continue; - } - }; - if let Err(err) = DbMaintainanceJob::cron(now, context_service.clone(), db.clone(), job_id).await { - notify_job_error!(notification_service, err, DbMaintainanceJob.name(), job_id); - } + run_job( + &db, + DbMaintainanceJob.name(), + &DbMaintainanceJob.to_command(), + || DbMaintainanceJob::cron(now, context_service.clone(), db.clone()), + notification_service.clone(), + ).await; - let job_id = match db.create_job_run(SchedulerGitJob::NAME.to_string(), "cron".to_string()).await { - Ok(job_id) => job_id, - Err(_) => { - warn!("Failed to create job run"); - continue; - } - }; - if let Err(err) = SchedulerGitJob::cron(now, git_repository_service.clone(), job_service.clone(), db.clone(), job_id).await { - notify_job_error!(notification_service, err, SchedulerGitJob::NAME, job_id); - } - let job_id = match db.create_job_run(SyncIntegrationJob::NAME.to_string(), "cron".to_string()).await { - Ok(job_id) => job_id, - Err(_) => { - warn!("Failed to create job run"); - continue; - } - }; - if let Err(err) = SyncIntegrationJob::cron(now, integration_service.clone(), job_service.clone(), db.clone(), job_id).await { - notify_job_error!(notification_service, err, SyncIntegrationJob::NAME, job_id); - } + run_job( + &db, + SchedulerGitJob::NAME, + "cron", + || SchedulerGitJob::cron(now, git_repository_service.clone(), job_service.clone()), + notification_service.clone(), + ).await; - let job_id = match db.create_job_run(SchedulerGithubGitlabJob::NAME.to_string(), "cron".to_string()).await { - Ok(job_id) => job_id, - Err(_) => { - warn!("Failed to create job run"); - continue; - } - }; - if let Err(err) = SchedulerGithubGitlabJob::cron(now, third_party_repository_service.clone(), job_service.clone(), db.clone(), job_id).await { - notify_job_error!(notification_service, err, SchedulerGithubGitlabJob::NAME, job_id); - } + run_job( + &db, + SyncIntegrationJob::NAME, + "cron", + || SyncIntegrationJob::cron(now, integration_service.clone(), job_service.clone()), + notification_service.clone(), + ).await; - let job_id = match db.create_job_run(IndexGarbageCollection.name().to_string(), IndexGarbageCollection.to_command()).await { - Ok(job_id) => job_id, - Err(_) => { - warn!("Failed to create job run"); - continue; - } - }; - if let Err(err) = IndexGarbageCollection.run(repository_service.clone(), context_service.clone(), db.clone(), job_id).await { - notify_job_error!(notification_service, err, IndexGarbageCollection.name(), job_id); - } + run_job( + &db, + SchedulerGithubGitlabJob::NAME, + "cron", + || SchedulerGithubGitlabJob::cron(now, third_party_repository_service.clone(), job_service.clone()), + notification_service.clone(), + ).await; + + run_job( + &db, + IndexGarbageCollection.name(), + &IndexGarbageCollection.to_command(), + || IndexGarbageCollection.run(repository_service.clone(), context_service.clone()), + notification_service.clone(), + ).await; }, Some(now) = daily.next() => { - let job_id = match db.create_job_run(LicenseCheckJob::NAME.to_string(), "cron".to_string()).await { - Ok(job_id) => job_id, - Err(_) => { - warn!("Failed to create job run"); - continue; - } - }; - if let Err(err) = LicenseCheckJob::cron(now, license_service.clone(), notification_service.clone(), db.clone(), job_id).await { - notify_job_error!(notification_service, err, LicenseCheckJob::NAME, job_id); - } + run_job( + &db, + LicenseCheckJob::NAME, + "cron", + || LicenseCheckJob::cron(now, license_service.clone(), notification_service.clone()), + notification_service.clone(), + ).await; } else => { warn!("Background job channel closed"); @@ -255,3 +241,31 @@ pub async fn start( } }); } + +async fn run_job( + db: &DbConn, + job_name: &str, + command: &str, + job_fn: F, + notification_service: Arc, +) where + F: FnOnce() -> Fut, + Fut: std::future::Future>, +{ + let job_id = match db + .create_job_run(job_name.to_string(), command.to_string()) + .await + { + Ok(job_id) => job_id, + Err(err) => { + warn!("Failed to create job run for {}: {}", job_name, err); + return; + } + }; + + let logger = JobLogger::new(db.clone(), job_id); + if let Err(err) = job_fn().await { + notify_job_error!(notification_service, err, job_name, job_id); + } + logger.finalize().await; +} diff --git a/ee/tabby-webserver/src/service/background_job/third_party_integration.rs b/ee/tabby-webserver/src/service/background_job/third_party_integration.rs index 8d8078de5cac..28cd0fa56c7b 100644 --- a/ee/tabby-webserver/src/service/background_job/third_party_integration.rs +++ b/ee/tabby-webserver/src/service/background_job/third_party_integration.rs @@ -19,7 +19,7 @@ use tabby_schema::{ }; use tracing::debug; -use super::{helper::Job, BackgroundJobEvent, JobLogger}; +use super::{helper::Job, BackgroundJobEvent}; mod error; mod issues; @@ -53,10 +53,7 @@ impl SyncIntegrationJob { _now: DateTime, integration: Arc, job: Arc, - db: tabby_db::DbConn, - job_id: i64, ) -> tabby_schema::Result<()> { - let logger = JobLogger::new(db.clone(), job_id); debug!("Syncing all github and gitlab repositories"); let integrations = match integration .list_integrations(None, None, None, None, None, None) @@ -65,7 +62,6 @@ impl SyncIntegrationJob { Ok(integrations) => integrations, Err(err) => { logkit::warn!(exit_code = -1; "Failed to list integrations: {}", err); - logger.finalize().await; return Err(err); } }; @@ -78,7 +74,6 @@ impl SyncIntegrationJob { .await; } - logger.finalize().await; Ok(()) } } @@ -241,10 +236,7 @@ impl SchedulerGithubGitlabJob { _now: DateTime, repository: Arc, job: Arc, - db: tabby_db::DbConn, - job_id: i64, ) -> tabby_schema::Result<()> { - let logger = JobLogger::new(db.clone(), job_id); let repositories = match repository .list_repositories_with_filter(None, None, Some(true), None, None, None, None) .await @@ -252,7 +244,6 @@ impl SchedulerGithubGitlabJob { Ok(repos) => repos, Err(err) => { logkit::warn!(exit_code = -1; "Failed to list repositories: {}", err); - logger.finalize().await; return Err(err); } }; @@ -265,7 +256,6 @@ impl SchedulerGithubGitlabJob { .await; } - logger.finalize().await; Ok(()) } }