From ac34e966e4bb371eaffbd437f93f8d87b76358de Mon Sep 17 00:00:00 2001 From: Eguo Wang Date: Sat, 24 Feb 2024 20:11:18 +0800 Subject: [PATCH] refactor: actor init task and builder completed logic --- builder/src/kaniko.rs | 5 +++ builder/src/kpack.rs | 5 +++ builder/src/lib.rs | 6 +++ builder/src/lifecycle.rs | 5 +++ workflow/src/actor/build.rs | 87 ++++++++----------------------------- workflow/src/actor/init.rs | 61 ++++++++++++++++++++------ 6 files changed, 87 insertions(+), 82 deletions(-) diff --git a/builder/src/kaniko.rs b/builder/src/kaniko.rs index 9781476..a4cf2eb 100644 --- a/builder/src/kaniko.rs +++ b/builder/src/kaniko.rs @@ -55,4 +55,9 @@ impl Builder for KanikoBuilder { Ok(()) } + + #[inline] + async fn completed(&self) -> Result { + job::completed(&self.k8s, &self.actor).await.map_err(Error::ResourceError) + } } diff --git a/builder/src/kpack.rs b/builder/src/kpack.rs index 3e3d8aa..31b051c 100644 --- a/builder/src/kpack.rs +++ b/builder/src/kpack.rs @@ -68,6 +68,11 @@ impl Builder for KpackBuilder { Ok(()) } + + #[inline] + async fn completed(&self) -> Result { + image::completed(&self.k8s, &self.actor).await.map_err(Error::ResourceError) + } } impl KpackBuilder { diff --git a/builder/src/lib.rs b/builder/src/lib.rs index 9d68e83..977f2c8 100644 --- a/builder/src/lib.rs +++ b/builder/src/lib.rs @@ -30,6 +30,7 @@ use async_trait::async_trait; #[async_trait] pub trait Builder: Send + Sync { async fn build(&self) -> Result<()>; + async fn completed(&self) -> Result; } /// Build director, it's a strategy pattern implementation @@ -52,6 +53,11 @@ impl BuildDirector { pub async fn build(&self) -> Result<()> { self.builder.build().await } + + /// Check if the build is completed + pub async fn completed(&self) -> Result { + self.builder.completed().await + } } #[cfg(test)] diff --git a/builder/src/lifecycle.rs b/builder/src/lifecycle.rs index dac6a46..d38bda3 100644 --- a/builder/src/lifecycle.rs +++ b/builder/src/lifecycle.rs @@ -55,4 +55,9 @@ impl Builder for LifecycleBuilder { Ok(()) } + + #[inline] + async fn completed(&self) -> Result { + job::completed(&self.k8s, &self.actor).await.map_err(Error::ResourceError) + } } diff --git a/workflow/src/actor/build.rs b/workflow/src/actor/build.rs index eb0711d..5faef24 100644 --- a/workflow/src/actor/build.rs +++ b/workflow/src/actor/build.rs @@ -19,12 +19,10 @@ use crate::errors::{Error, Result}; use crate::{Context, Intent, State, Task}; use amp_builder::{BuildDirector, KanikoBuilder, KpackBuilder}; -use amp_common::docker::{self, registry, DockerConfig}; use amp_common::resource::{Actor, ActorState}; use amp_common::schema::BuildMethod; -use amp_resources::kpack::image; -use amp_resources::{actor, job}; +use amp_resources::actor; use async_trait::async_trait; use kube::runtime::controller::Action; use kube::ResourceExt; @@ -65,74 +63,12 @@ impl Task for BuildTask { ctx.object.status.as_ref().is_some_and(|status| status.building()) } - /// Execute the task logic for BuildTask using shared data + /// Execute the task logic for BuildTask using shared data. async fn execute(&self, ctx: &Context) -> Result>> { let actor = &ctx.object; - - // build if actor is live or the image is not built, else skip to next state - if actor.spec.live || !self.built(ctx).await? { - self.build(ctx).await?; - - let build = actor.spec.character.build.clone().unwrap_or_default(); - match build.method() { - BuildMethod::Dockerfile => { - // [lifecycle] Check if the build job is completed and wait for it to finish. - if !job::completed(&ctx.k8s, &ctx.object).await.map_err(Error::ResourceError)? { - info!("Build job is not completed yet, wait for it to finish"); - return Ok(Some(Intent::Action(Action::requeue(Duration::from_secs(5))))); - } - } - BuildMethod::Buildpacks => { - // [kpack] Check if the image is completed and wait for it to ready. - if !image::completed(&ctx.k8s, &ctx.object).await.map_err(Error::ResourceError)? { - info!("kpack Image is not completed yet, wait for it to finish"); - return Ok(Some(Intent::Action(Action::requeue(Duration::from_secs(5))))); - } - } - }; - } - - // patch the status to running - let condition = ActorState::running(true, "AutoRun", None); - actor::patch_status(&ctx.k8s, &ctx.object, condition).await.map_err(Error::ResourceError)?; - - Ok(None) - } -} - -impl BuildTask { - /// Check if the image is already built - async fn built(&self, ctx: &Context) -> Result { - let image = &ctx.object.spec.image; - - let credentials = ctx.credentials.read().await; - let config = DockerConfig::from(&credentials.registries); - - let credential = match docker::get_credential(&config, image) { - Ok(credential) => Some(credential), - Err(err) => { - error!("Error handling docker configuration: {}", err); - None - } - }; - - if registry::exists(image, credential).await.map_err(Error::DockerRegistryError)? { - info!("The images already exists"); - return Ok(true); - } - - Ok(false) - } - - /// Generate `Builder` based on the build strategy - /// - /// The source code can be local or remote, - /// and the build method can be dockerfile or buildpacks, - /// and build frequency can be once or live. - /// - async fn build(&self, ctx: &Context) -> Result<()> { - let actor = &ctx.object; let build = actor.spec.character.build.clone().unwrap_or_default(); + + // Generate `Builder` based on the build method let builder = match build.method() { BuildMethod::Dockerfile => { info!("Found dockerfile, build it with Kaniko"); @@ -144,6 +80,19 @@ impl BuildTask { } }; - builder.build().await.map_err(Error::BuildError) + // Build the image + builder.build().await.map_err(Error::BuildError)?; + + // Check if the build is completed and wait for it to finish. + if !builder.completed().await.map_err(Error::BuildError)? { + info!("Build job is not completed yet, wait for it to finish"); + return Ok(Some(Intent::Action(Action::requeue(Duration::from_secs(5))))); + } + + // Patch the status to running + let condition = ActorState::running(true, "AutoRun", None); + actor::patch_status(&ctx.k8s, &ctx.object, condition).await.map_err(Error::ResourceError)?; + + Ok(None) } } diff --git a/workflow/src/actor/init.rs b/workflow/src/actor/init.rs index d93eefc..672ecbd 100644 --- a/workflow/src/actor/init.rs +++ b/workflow/src/actor/init.rs @@ -15,14 +15,15 @@ use crate::errors::{Error, Result}; use crate::{Context, Intent, State, Task}; +use amp_common::docker::{self, registry, DockerConfig}; use amp_common::resource::{Actor, ActorState}; use amp_resources::actor; use async_trait::async_trait; use kube::ResourceExt; -use tracing::{error, trace}; +use tracing::{error, info, trace}; -use super::BuildingState; +use super::{BuildingState, DeployingState}; pub struct InitialState; @@ -34,16 +35,16 @@ impl State for InitialState { // Check if InitTask should be executed let task = InitTask::new(); - if task.matches(ctx) { - match task.execute(ctx).await { - Ok(Some(intent)) => return Some(intent), - Err(err) => error!("Error during InitTask execution: {}", err), - Ok(None) => {} - } + if !task.matches(ctx) { + return None; + } + + let result = task.execute(ctx).await; + if let Err(err) = &result { + error!("Error during InitTask execution: {}", err); } - // Transition to the next state if needed - Some(Intent::State(Box::new(BuildingState))) + result.ok().and_then(|intent| intent) } } @@ -61,9 +62,43 @@ impl Task for InitTask { /// Execute the task logic for InitTask using shared data async fn execute(&self, ctx: &Context) -> Result>> { - let condition = ActorState::building(); - actor::patch_status(&ctx.k8s, &ctx.object, condition).await.map_err(Error::ResourceError)?; + let actor = &ctx.object; + + // build if actor is live or the image is not built, else skip to next state + if actor.spec.live || !self.built(ctx).await? { + let condition = ActorState::building(); + actor::patch_status(&ctx.k8s, &ctx.object, condition).await.map_err(Error::ResourceError)?; + Ok(Some(Intent::State(Box::new(BuildingState)))) + } else { + // patch the status to running + let condition = ActorState::running(true, "AutoRun", None); + actor::patch_status(&ctx.k8s, &ctx.object, condition).await.map_err(Error::ResourceError)?; + Ok(Some(Intent::State(Box::new(DeployingState)))) + } + } +} + +impl InitTask { + /// Check if the image is already built + async fn built(&self, ctx: &Context) -> Result { + let image = &ctx.object.spec.image; + + let credentials = ctx.credentials.read().await; + let config = DockerConfig::from(&credentials.registries); + + let credential = match docker::get_credential(&config, image) { + Ok(credential) => Some(credential), + Err(err) => { + error!("Error handling docker configuration: {}", err); + None + } + }; + + if registry::exists(image, credential).await.map_err(Error::DockerRegistryError)? { + info!("The images already exists"); + return Ok(true); + } - Ok(None) + Ok(false) } }