chore: use run_job wrap to simplify running job
Signed-off-by: Wei Zhang <[email protected]>
zwpaper committed Jan 14, 2025
1 parent c2a655d commit a6a7c34
Showing 6 changed files with 81 additions and 100 deletions.
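
The change introduces a single run_job helper in mod.rs that owns the per-run bookkeeping (creating the job-run record, constructing the JobLogger, finalizing it, and reporting failures), so each cron call site only passes a job name, a command string, and a closure returning the job future. A minimal, self-contained sketch of that pattern follows; MockDb, the String error type, and the tokio runtime are illustrative assumptions, not the Tabby APIs, and the real helper also wires in JobLogger and the notification service.

```rust
use std::future::Future;

// Illustrative stand-in for the database handle, not Tabby's DbConn.
struct MockDb;

impl MockDb {
    // Pretends to record a new job run and returns its id.
    async fn create_job_run(&self, name: String, command: String) -> Result<i64, String> {
        println!("created job run '{name}' ({command})");
        Ok(1)
    }
}

// Generic wrapper: the bookkeeping lives here once, and each call site
// only supplies a closure producing the job's future.
async fn run_job<F, Fut>(db: &MockDb, job_name: &str, command: &str, job_fn: F)
where
    F: FnOnce() -> Fut,
    Fut: Future<Output = Result<(), String>>,
{
    let job_id = match db
        .create_job_run(job_name.to_string(), command.to_string())
        .await
    {
        Ok(id) => id,
        Err(err) => {
            eprintln!("Failed to create job run for {job_name}: {err}");
            return;
        }
    };

    // The real helper constructs a JobLogger here and finalizes it after the job runs.
    if let Err(err) = job_fn().await {
        eprintln!("job {job_name} (id {job_id}) failed: {err}");
    }
}

#[tokio::main]
async fn main() {
    let db = MockDb;
    // Call sites pass a closure; in the actual code it captures Arc-cloned services.
    run_job(&db, "ExampleJob", "cron", || async { Ok::<(), String>(()) }).await;
}
```

With this shape, the hourly and daily arms in mod.rs shrink from a create-job-run/match/notify block per job to one run_job(...) call each, as the diff below shows.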
5 changes: 1 addition & 4 deletions ee/tabby-webserver/src/service/background_job/db.rs
@@ -5,7 +5,7 @@ use serde::{Deserialize, Serialize};
use tabby_db::DbConn;
use tabby_schema::{context::ContextService, CoreError};

use super::helper::{Job, JobLogger};
use super::helper::Job;

#[derive(Debug, Serialize, Deserialize, Clone)]
pub struct DbMaintainanceJob;
@@ -19,9 +19,7 @@ impl DbMaintainanceJob {
now: DateTime<Utc>,
context: Arc<dyn ContextService>,
db: DbConn,
job_id: i64,
) -> tabby_schema::Result<()> {
let logger = JobLogger::new(db.clone(), job_id);
let mut exit_code = 0;

if let Err(e) = db.delete_expired_token().await {
@@ -64,7 +62,6 @@ impl DbMaintainanceJob {
logkit::warn!(exit_code = exit_code; "Failed to run data retention job: {}", e);
}

logger.finalize().await;
if exit_code == 0 {
Ok(())
} else {
7 changes: 1 addition & 6 deletions ee/tabby-webserver/src/service/background_job/git.rs
@@ -8,7 +8,7 @@ use tabby_index::public::CodeIndexer;
use tabby_inference::Embedding;
use tabby_schema::{job::JobService, repository::GitRepositoryService};

use super::{helper::Job, BackgroundJobEvent, JobLogger};
use super::{helper::Job, BackgroundJobEvent};

#[derive(Debug, Serialize, Deserialize, Clone)]
pub struct SchedulerGitJob {
@@ -41,15 +41,11 @@ impl SchedulerGitJob {
_now: DateTime<Utc>,
git_repository: Arc<dyn GitRepositoryService>,
job: Arc<dyn JobService>,
db: tabby_db::DbConn,
job_id: i64,
) -> tabby_schema::Result<()> {
let logger = JobLogger::new(db.clone(), job_id);
let repositories = match git_repository.repository_list().await {
Ok(repos) => repos,
Err(err) => {
logkit::warn!(exit_code = -1; "Failed to list repositories: {}", err);
logger.finalize().await;
return Err(err);
}
};
@@ -65,7 +61,6 @@
.await;
}

logger.finalize().await;
Ok(())
}
}
@@ -4,7 +4,7 @@ use serde::Serialize;
use tabby_index::public::{run_index_garbage_collection, CodeIndexer};
use tabby_schema::{context::ContextService, repository::RepositoryService};

use super::helper::{Job, JobLogger};
use super::helper::Job;

#[derive(Serialize)]
pub struct IndexGarbageCollection;
@@ -18,17 +18,12 @@ impl IndexGarbageCollection {
self,
repository: Arc<dyn RepositoryService>,
context: Arc<dyn ContextService>,
db: tabby_db::DbConn,
job_id: i64,
) -> tabby_schema::Result<()> {
let logger = JobLogger::new(db.clone(), job_id);

// Run garbage collection on the index
let sources = match context.read(None).await {
Ok(sources) => sources,
Err(err) => {
logkit::warn!(exit_code = -1; "Failed to list sources: {}", err);
logger.finalize().await;
return Err(err);
}
};
@@ -40,7 +35,6 @@

if let Err(e) = run_index_garbage_collection(sources) {
logkit::warn!(exit_code = -1; "Failed to run index garbage collection: {}", e);
logger.finalize().await;
return Err(e.into());
}

@@ -49,14 +43,12 @@
Ok(repos) => repos,
Err(err) => {
logkit::warn!(exit_code = -1; "Failed to list repositories: {}", err);
logger.finalize().await;
return Err(err);
}
};
let mut code = CodeIndexer::default();
code.garbage_collection(&repositories).await;

logger.finalize().await;
Ok(())
}
}
@@ -7,7 +7,7 @@ use tabby_schema::{
notification::{NotificationRecipient, NotificationService},
};

use super::helper::{Job, JobLogger};
use super::helper::Job;

#[derive(Debug, Serialize, Deserialize, Clone)]
pub struct LicenseCheckJob;
@@ -21,16 +21,11 @@ impl LicenseCheckJob {
_now: DateTime<Utc>,
license_service: Arc<dyn LicenseService>,
notification_service: Arc<dyn NotificationService>,
db: tabby_db::DbConn,
job_id: i64,
) -> tabby_schema::Result<()> {
let logger = JobLogger::new(db.clone(), job_id);

let license = match license_service.read().await {
Ok(license) => license,
Err(err) => {
logkit::warn!(exit_code = -1; "Failed to read license: {}", err);
logger.finalize().await;
return Err(err);
}
};
@@ -47,13 +42,11 @@
.await
{
logkit::warn!(exit_code = -1; "Failed to create notification: {}", e);
logger.finalize().await;
return Err(e);
}
}
}

logger.finalize().await;
Ok(())
}
}
138 changes: 76 additions & 62 deletions ee/tabby-webserver/src/service/background_job/mod.rs
@@ -98,7 +98,10 @@ Please check the log at [Jobs Detail](/jobs/detail?id={}) to identify the underl
),
)
.await
.unwrap();
.map_err(|err| {
warn!("Failed to send notification: {:?}", err);
})
.ok();
}};
}
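
The hunk above also changes the notification macro (notify_job_error!) so that a failure to deliver the notification is logged and discarded instead of unwrapped, which with .unwrap() would panic on failure. A tiny standalone illustration of that Result idiom, with eprintln! standing in for the warn! macro (not Tabby code):

```rust
fn send_notification(ok: bool) -> Result<(), String> {
    if ok {
        Ok(())
    } else {
        Err("notification backend unavailable".into())
    }
}

fn main() {
    // Before: .unwrap() on the send result would panic on failure.
    // After: the error is logged and execution simply continues.
    send_notification(false)
        .map_err(|err| eprintln!("Failed to send notification: {err:?}"))
        .ok();
}
```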

@@ -162,7 +165,7 @@ pub async fn start(
}
BackgroundJobEvent::IndexGarbageCollection => {
let job = IndexGarbageCollection;
job.run(repository_service.clone(), context_service.clone(), db.clone(), job_id).await
job.run(repository_service.clone(), context_service.clone()).await
}
};
debug!("Background job {} completed", job.id);
@@ -180,72 +183,55 @@
}
},
Some(now) = hourly.next() => {
let job_id = match db.create_job_run(DbMaintainanceJob.name().to_string(), DbMaintainanceJob.to_command()).await {
Ok(job_id) => job_id,
Err(_) => {
warn!("Failed to create job run");
continue;
}
};
if let Err(err) = DbMaintainanceJob::cron(now, context_service.clone(), db.clone(), job_id).await {
notify_job_error!(notification_service, err, DbMaintainanceJob.name(), job_id);
}
run_job(
&db,
DbMaintainanceJob.name(),
&DbMaintainanceJob.to_command(),
|| DbMaintainanceJob::cron(now, context_service.clone(), db.clone()),
notification_service.clone(),
).await;

let job_id = match db.create_job_run(SchedulerGitJob::NAME.to_string(), "cron".to_string()).await {
Ok(job_id) => job_id,
Err(_) => {
warn!("Failed to create job run");
continue;
}
};
if let Err(err) = SchedulerGitJob::cron(now, git_repository_service.clone(), job_service.clone(), db.clone(), job_id).await {
notify_job_error!(notification_service, err, SchedulerGitJob::NAME, job_id);
}

let job_id = match db.create_job_run(SyncIntegrationJob::NAME.to_string(), "cron".to_string()).await {
Ok(job_id) => job_id,
Err(_) => {
warn!("Failed to create job run");
continue;
}
};
if let Err(err) = SyncIntegrationJob::cron(now, integration_service.clone(), job_service.clone(), db.clone(), job_id).await {
notify_job_error!(notification_service, err, SyncIntegrationJob::NAME, job_id);
}
run_job(
&db,
SchedulerGitJob::NAME,
"cron",
|| SchedulerGitJob::cron(now, git_repository_service.clone(), job_service.clone()),
notification_service.clone(),
).await;

let job_id = match db.create_job_run(SchedulerGithubGitlabJob::NAME.to_string(), "cron".to_string()).await {
Ok(job_id) => job_id,
Err(_) => {
warn!("Failed to create job run");
continue;
}
};
if let Err(err) = SchedulerGithubGitlabJob::cron(now, third_party_repository_service.clone(), job_service.clone(), db.clone(), job_id).await {
notify_job_error!(notification_service, err, SchedulerGithubGitlabJob::NAME, job_id);
}
run_job(
&db,
SyncIntegrationJob::NAME,
"cron",
|| SyncIntegrationJob::cron(now, integration_service.clone(), job_service.clone()),
notification_service.clone(),
).await;

let job_id = match db.create_job_run(IndexGarbageCollection.name().to_string(), IndexGarbageCollection.to_command()).await {
Ok(job_id) => job_id,
Err(_) => {
warn!("Failed to create job run");
continue;
}
};
if let Err(err) = IndexGarbageCollection.run(repository_service.clone(), context_service.clone(), db.clone(), job_id).await {
notify_job_error!(notification_service, err, IndexGarbageCollection.name(), job_id);
}
run_job(
&db,
SchedulerGithubGitlabJob::NAME,
"cron",
|| SchedulerGithubGitlabJob::cron(now, third_party_repository_service.clone(), job_service.clone()),
notification_service.clone(),
).await;

run_job(
&db,
IndexGarbageCollection.name(),
&IndexGarbageCollection.to_command(),
|| IndexGarbageCollection.run(repository_service.clone(), context_service.clone()),
notification_service.clone(),
).await;
},
Some(now) = daily.next() => {
let job_id = match db.create_job_run(LicenseCheckJob::NAME.to_string(), "cron".to_string()).await {
Ok(job_id) => job_id,
Err(_) => {
warn!("Failed to create job run");
continue;
}
};
if let Err(err) = LicenseCheckJob::cron(now, license_service.clone(), notification_service.clone(), db.clone(), job_id).await {
notify_job_error!(notification_service, err, LicenseCheckJob::NAME, job_id);
}
run_job(
&db,
LicenseCheckJob::NAME,
"cron",
|| LicenseCheckJob::cron(now, license_service.clone(), notification_service.clone()),
notification_service.clone(),
).await;
}
else => {
warn!("Background job channel closed");
@@ -255,3 +241,31 @@
}
});
}

async fn run_job<F, Fut>(
db: &DbConn,
job_name: &str,
command: &str,
job_fn: F,
notification_service: Arc<dyn NotificationService>,
) where
F: FnOnce() -> Fut,
Fut: std::future::Future<Output = tabby_schema::Result<()>>,
{
let job_id = match db
.create_job_run(job_name.to_string(), command.to_string())
.await
{
Ok(job_id) => job_id,
Err(err) => {
warn!("Failed to create job run for {}: {}", job_name, err);
return;
}
};

let logger = JobLogger::new(db.clone(), job_id);
if let Err(err) = job_fn().await {
notify_job_error!(notification_service, err, job_name, job_id);
}
logger.finalize().await;
}
@@ -19,7 +19,7 @@ use tabby_schema::{
};
use tracing::debug;

use super::{helper::Job, BackgroundJobEvent, JobLogger};
use super::{helper::Job, BackgroundJobEvent};

mod error;
mod issues;
@@ -53,10 +53,7 @@ impl SyncIntegrationJob {
_now: DateTime<Utc>,
integration: Arc<dyn IntegrationService>,
job: Arc<dyn JobService>,
db: tabby_db::DbConn,
job_id: i64,
) -> tabby_schema::Result<()> {
let logger = JobLogger::new(db.clone(), job_id);
debug!("Syncing all github and gitlab repositories");
let integrations = match integration
.list_integrations(None, None, None, None, None, None)
@@ -65,7 +62,6 @@
Ok(integrations) => integrations,
Err(err) => {
logkit::warn!(exit_code = -1; "Failed to list integrations: {}", err);
logger.finalize().await;
return Err(err);
}
};
@@ -78,7 +74,6 @@
.await;
}

logger.finalize().await;
Ok(())
}
}
@@ -241,18 +236,14 @@ impl SchedulerGithubGitlabJob {
_now: DateTime<Utc>,
repository: Arc<dyn ThirdPartyRepositoryService>,
job: Arc<dyn JobService>,
db: tabby_db::DbConn,
job_id: i64,
) -> tabby_schema::Result<()> {
let logger = JobLogger::new(db.clone(), job_id);
let repositories = match repository
.list_repositories_with_filter(None, None, Some(true), None, None, None, None)
.await
{
Ok(repos) => repos,
Err(err) => {
logkit::warn!(exit_code = -1; "Failed to list repositories: {}", err);
logger.finalize().await;
return Err(err);
}
};
Expand All @@ -265,7 +256,6 @@ impl SchedulerGithubGitlabJob {
.await;
}

logger.finalize().await;
Ok(())
}
}
