Skip to content

Commit

Permalink
refactor(webserver): extract JobController (#1993)
Browse files Browse the repository at this point in the history
* refactor(webserver): extract JobController

* update

* using randomized cronls

* randomize

* update

* simplify register/register_public implementation

* support running with TABBY_WEBSERVER_CONTROLLER_ONESHOT

* update
  • Loading branch information
wsxiaoys authored Apr 28, 2024
1 parent 8038e82 commit 602cf5e
Show file tree
Hide file tree
Showing 10 changed files with 405 additions and 213 deletions.
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion crates/tabby-scheduler/src/repository.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ impl RepositoryExt for RepositoryConfig {
if code != 0 {
warn!(
"Failed to clone `{}`. Please check your repository configuration.",
&self.git_url
self.canonical_git_url()
);
fs::remove_dir_all(&dir).expect("Failed to remove directory");
}
Expand Down
1 change: 1 addition & 0 deletions ee/tabby-webserver/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@ tabby-search = { path = "../tabby-search" }
octocrab = "0.38.0"
fs_extra = "1.3.0"
gitlab = "0.1610.0"
rand = "0.8.5"

[dev-dependencies]
assert_matches = "1.5.0"
Expand Down
229 changes: 229 additions & 0 deletions ee/tabby-webserver/src/cron/controller.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,229 @@
use std::{pin::Pin, sync::Arc, time::Duration};

use futures::Future;
use juniper::ID;
use tokio_cron_scheduler::{Job, JobScheduler};
use tracing::{debug, warn};

use crate::schema::job::JobService;

pub struct JobController {
scheduler: JobScheduler,
service: Arc<dyn JobService>,
is_oneshot: bool,
}

impl JobController {
pub async fn new(service: Arc<dyn JobService>) -> Self {
service.cleanup().await.expect("failed to cleanup jobs");
let scheduler = JobScheduler::new()
.await
.expect("failed to create job scheduler");
let is_oneshot = std::env::var("TABBY_WEBSERVER_CONTROLLER_ONESHOT").is_ok();
if is_oneshot {
warn!(
"Running controller job as oneshot, this should only be used for debugging purpose..."
);
}
Self {
scheduler,
service,
is_oneshot,
}
}

pub async fn run(&self) {
self.scheduler
.start()
.await
.expect("failed to start job scheduler")
}

/// Register a new job with the scheduler, the job will be displayed in Jobs dashboard.
pub async fn register_public<T>(&mut self, name: &str, schedule: &str, func: T)
where
T: FnMut(&JobContext) -> Pin<Box<dyn Future<Output = anyhow::Result<i32>> + Send>>
+ Send
+ Sync
+ Clone
+ 'static,
{
self.register_impl(true, name, schedule, func).await;
}

/// Register a new job with the scheduler, the job will NOT be displayed in Jobs dashboard.
pub async fn register<T>(&mut self, name: &str, schedule: &str, func: T)
where
T: FnMut() -> Pin<Box<dyn Future<Output = anyhow::Result<()>> + Send>>
+ Send
+ Sync
+ Clone
+ 'static,
{
self.register_impl(false, name, schedule, move |_| {
let mut func = func.clone();
Box::pin(async move {
func().await?;
Ok(0)
})
})
.await;
}

async fn register_impl<T>(&mut self, is_public: bool, name: &str, schedule: &str, func: T)
where
T: FnMut(&JobContext) -> Pin<Box<dyn Future<Output = anyhow::Result<i32>> + Send>>
+ Send
+ Sync
+ Clone
+ 'static,
{
if self.is_oneshot {
self.run_oneshot(is_public, name, func).await;
} else {
self.run_schedule(is_public, name, schedule, func).await;
};
}

async fn run_oneshot<T>(&self, is_public: bool, name: &str, mut func: T)
where
T: FnMut(&JobContext) -> Pin<Box<dyn Future<Output = anyhow::Result<i32>> + Send>>
+ Send
+ Sync
+ Clone
+ 'static,
{
let name = name.to_owned();
let context = JobContext::new(is_public, &name, self.service.clone()).await;
tokio::spawn(async move {
tokio::time::sleep(Duration::from_secs(5)).await;

match func(&context).await {
Ok(exit_code) => {
debug!("Job `{}` completed with exit code {}", &name, exit_code);
context.complete(exit_code).await;
}
Err(e) => {
warn!("Job `{}` failed: {}", &name, e);
context.complete(-1).await;
}
}
});
}

async fn run_schedule<T>(&mut self, is_public: bool, name: &str, schedule: &str, func: T)
where
T: FnMut(&JobContext) -> Pin<Box<dyn Future<Output = anyhow::Result<i32>> + Send>>
+ Send
+ Sync
+ Clone
+ 'static,
{
let job_mutex = Arc::new(tokio::sync::Mutex::new(()));
let service = self.service.clone();
let name = name.to_owned();
let func = func.clone();
let job = Job::new_async(schedule, move |uuid, mut scheduler| {
let job_mutex = job_mutex.clone();
let service = service.clone();
let name = name.clone();
let mut func = func.clone();
Box::pin(async move {
let Ok(_guard) = job_mutex.try_lock() else {
warn!("Job `{}` overlapped, skipping...", name);
return;
};

debug!("Running job `{}`", name);

let context = JobContext::new(is_public, &name, service.clone()).await;
match func(&context).await {
Ok(exit_code) => {
debug!("Job `{}` completed with exit code {}", &name, exit_code);
context.complete(exit_code).await;
}
Err(e) => {
warn!("Job `{}` failed: {}", &name, e);
context.complete(-1).await;
}
}

if let Ok(Some(next_tick)) = scheduler.next_tick_for_job(uuid).await {
debug!(
"Next time for job `{}` is {:?}",
&name,
next_tick.with_timezone(&chrono::Local)
);
}
})
})
.expect("failed to create job");

self.scheduler.add(job).await.expect("failed to add job");
}
}

#[derive(Clone)]
pub struct JobContext {
id: ID,
service: Arc<dyn JobService>,
}

impl JobContext {
async fn new(public: bool, name: &str, service: Arc<dyn JobService>) -> Self {
let id = if public {
service
.start(name.to_owned())
.await
.expect("failed to create job")
} else {
ID::from("".to_owned())
};
Self { id, service }
}

fn is_private(&self) -> bool {
self.id.is_empty()
}

pub async fn stdout_writeline(&self, stdout: String) {
if self.is_private() {
return;
}

let stdout = stdout + "\n";
match self.service.update_stdout(&self.id, stdout).await {
Ok(_) => (),
Err(_) => {
warn!("Failed to write stdout to job `{}`", self.id);
}
}
}

pub async fn stderr_writeline(&self, stderr: String) {
if self.is_private() {
return;
}

let stderr = stderr + "\n";
match self.service.update_stderr(&self.id, stderr).await {
Ok(_) => (),
Err(_) => {
warn!("Failed to write stderr to job `{}`", self.id);
}
}
}

async fn complete(&self, exit_code: i32) {
if self.is_private() {
return;
}

match self.service.complete(&self.id, exit_code).await {
Ok(_) => (),
Err(_) => {
warn!("Failed to complete job `{}`", self.id);
}
}
}
}
44 changes: 34 additions & 10 deletions ee/tabby-webserver/src/cron/db/github.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,25 +4,38 @@ use anyhow::Result;
use chrono::Utc;
use juniper::ID;
use octocrab::{models::Repository, GitHubError, Octocrab};
use tracing::warn;

use crate::schema::repository::{GithubRepositoryProvider, GithubRepositoryService};
use crate::{
cron::controller::JobContext,
schema::repository::{GithubRepositoryProvider, GithubRepositoryService},
};

pub async fn refresh_all_repositories(service: Arc<dyn GithubRepositoryService>) -> Result<()> {
pub async fn refresh_all_repositories(
context: JobContext,
service: Arc<dyn GithubRepositoryService>,
) -> Result<i32> {
for provider in service
.list_providers(vec![], None, None, None, None)
.await?
{
let start = Utc::now();
refresh_repositories_for_provider(service.clone(), provider.id.clone()).await?;
context
.stdout_writeline(format!(
"Refreshing repositories for provider: {}\n",
provider.display_name
))
.await;
refresh_repositories_for_provider(context.clone(), service.clone(), provider.id.clone())
.await?;
service
.delete_outdated_repositories(provider.id, start)
.await?;
}
Ok(())
Ok(0)
}

async fn refresh_repositories_for_provider(
context: JobContext,
service: Arc<dyn GithubRepositoryService>,
provider_id: ID,
) -> Result<()> {
Expand All @@ -36,18 +49,29 @@ async fn refresh_repositories_for_provider(
service
.update_provider_status(provider.id.clone(), false)
.await?;
warn!(
"GitHub credentials for provider {} are expired or invalid",
provider.display_name
);
context
.stderr_writeline(format!(
"GitHub credentials for provider {} are expired or invalid",
provider.display_name
))
.await;
return Err(source.into());
}
Err(e) => {
warn!("Failed to fetch repositories from github: {e}");
context
.stderr_writeline(format!("Failed to fetch repositories from github: {}", e))
.await;
return Err(e.into());
}
};
for repo in repos {
context
.stdout_writeline(format!(
"Importing: {}",
repo.full_name.as_deref().unwrap_or(&repo.name)
))
.await;

let id = repo.id.to_string();
let Some(url) = repo.git_url else {
continue;
Expand Down
Loading

0 comments on commit 602cf5e

Please sign in to comment.