From 17bb51e7986db35e576e6422d7d5127e0d5f65ca Mon Sep 17 00:00:00 2001 From: Eguo Wang Date: Sat, 24 Feb 2024 21:58:07 +0800 Subject: [PATCH] refactor: extract try_init_* kpack methods into prepare method --- Cargo.lock | 16 +++++----- Cargo.toml | 2 +- builder/src/kaniko.rs | 7 +++- builder/src/kpack.rs | 60 +++++++++++++++++++++++++++-------- builder/src/lib.rs | 8 +++++ builder/src/lifecycle.rs | 7 +++- resources/src/kpack/syncer.rs | 32 ++++++++++++++++--- workflow/src/actor/build.rs | 5 +++ 8 files changed, 108 insertions(+), 29 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 3771f03..7ad1a03 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -62,7 +62,7 @@ checksum = "0942ffc6dcaadf03badf6e6a2d0228460359d5e34b57ccdc720b7382dfbd5ec5" [[package]] name = "amp-apiserver" -version = "0.8.19" +version = "0.8.20" dependencies = [ "amp-common", "amp-resources", @@ -89,7 +89,7 @@ dependencies = [ [[package]] name = "amp-builder" -version = "0.8.19" +version = "0.8.20" dependencies = [ "amp-common", "amp-resources", @@ -131,7 +131,7 @@ dependencies = [ [[package]] name = "amp-controllers" -version = "0.8.19" +version = "0.8.20" dependencies = [ "amp-common", "amp-resources", @@ -152,7 +152,7 @@ dependencies = [ [[package]] name = "amp-crdgen" -version = "0.8.19" +version = "0.8.20" dependencies = [ "amp-common", "clap", @@ -164,7 +164,7 @@ dependencies = [ [[package]] name = "amp-resolver" -version = "0.8.19" +version = "0.8.20" dependencies = [ "amp-common", "amp-resources", @@ -177,7 +177,7 @@ dependencies = [ [[package]] name = "amp-resources" -version = "0.8.19" +version = "0.8.20" dependencies = [ "amp-common", "anyhow", @@ -197,7 +197,7 @@ dependencies = [ [[package]] name = "amp-syncer" -version = "0.8.19" +version = "0.8.20" dependencies = [ "amp-common", "async-nats", @@ -213,7 +213,7 @@ dependencies = [ [[package]] name = "amp-workflow" -version = "0.8.19" +version = "0.8.20" dependencies = [ "amp-builder", "amp-common", diff --git a/Cargo.toml b/Cargo.toml index 09f43b0..6babc99 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -1,5 +1,5 @@ [workspace.package] -version = "0.8.19" +version = "0.8.20" edition = "2021" license = "Apache-2.0" repository = "https://github.com/amphitheatre-app/amphitheatre" diff --git a/builder/src/kaniko.rs b/builder/src/kaniko.rs index a4cf2eb..725d957 100644 --- a/builder/src/kaniko.rs +++ b/builder/src/kaniko.rs @@ -12,7 +12,7 @@ // See the License for the specific language governing permissions and // limitations under the License. -use std::sync::Arc; +use std::{sync::Arc, time::Duration}; use crate::{errors::Error, Builder, Result}; @@ -36,6 +36,11 @@ impl KanikoBuilder { #[async_trait] impl Builder for KanikoBuilder { + // initialize the some resources before building + async fn prepare(&self) -> Result> { + Ok(None) // No need to wait + } + async fn build(&self) -> Result<()> { let name = format!("{}-builder", &self.actor.spec.name); let pod = kaniko::pod(&self.actor).map_err(Error::ResourceError)?; diff --git a/builder/src/kpack.rs b/builder/src/kpack.rs index 31b051c..18da266 100644 --- a/builder/src/kpack.rs +++ b/builder/src/kpack.rs @@ -12,7 +12,7 @@ // See the License for the specific language governing permissions and // limitations under the License. -use std::sync::Arc; +use std::{sync::Arc, time::Duration}; use crate::{errors::Error, Builder, Result}; @@ -45,13 +45,29 @@ impl KpackBuilder { #[async_trait] impl Builder for KpackBuilder { - async fn build(&self) -> Result<()> { - // initialize the some resources before building + // initialize the some resources before building + async fn prepare(&self) -> Result> { self.try_init_pvc().await.map_err(Error::ResourceError)?; - self.try_init_syncer().await.map_err(Error::ResourceError)?; - self.try_init_buildpack().await.map_err(Error::ResourceError)?; - self.try_init_builder().await.map_err(Error::ResourceError)?; + // Check if the syncer is ready + if let Some(duration) = self.try_init_syncer().await.map_err(Error::ResourceError)? { + return Ok(Some(duration)); + } + + // Check if the buildpacks are ready + if let Some(duration) = self.try_init_buildpack().await.map_err(Error::ResourceError)? { + return Ok(Some(duration)); + } + + // Check if the builder is ready + if let Some(duration) = self.try_init_builder().await.map_err(Error::ResourceError)? { + return Ok(Some(duration)); + } + + Ok(None) + } + + async fn build(&self) -> Result<()> { // Build or update the Image let name = format!("{}-builder", &self.actor.spec.name); match image::exists(&self.k8s, &self.actor).await.map_err(Error::ResourceError)? { @@ -88,29 +104,33 @@ impl KpackBuilder { Ok(()) } - async fn try_init_buildpack(&self) -> Result<(), amp_resources::error::Error> { + async fn try_init_buildpack(&self) -> Result, amp_resources::error::Error> { let buildpacks = self.actor.spec.character.buildpacks(); if buildpacks.is_none() { - return Ok(()); + return Ok(None); } for buildpack in buildpacks.unwrap() { if !cluster_buildpack::exists(&self.k8s, buildpack).await? { cluster_buildpack::create(&self.k8s, buildpack).await?; } + + if !cluster_buildpack::ready(&self.k8s, buildpack).await? { + return Ok(Some(Duration::from_secs(5))); // wait for the ClusterBuildpack to be ready + } } - Ok(()) + Ok(None) } - async fn try_init_builder(&self) -> Result<(), amp_resources::error::Error> { + async fn try_init_builder(&self) -> Result, amp_resources::error::Error> { if !cluster_builder::exists(&self.k8s, &self.actor).await? { if !cluster_store::exists(&self.k8s, &self.actor).await? { cluster_store::create(&self.k8s, &self.actor).await?; } if !cluster_store::ready(&self.k8s, &self.actor).await? { - return Ok(()); // wait for the ClusterStore to be ready + return Ok(Some(Duration::from_secs(5))); // wait for the ClusterStore to be ready } let mut order = Vec::new(); @@ -143,14 +163,26 @@ impl KpackBuilder { cluster_builder::create(&self.k8s, &self.actor, &tag, order).await?; } - Ok(()) + if !cluster_builder::ready(&self.k8s, &self.actor).await? { + return Ok(Some(Duration::from_secs(5))); // wait for the ClusterBuilder to be ready + } + + Ok(None) } - async fn try_init_syncer(&self) -> Result<(), amp_resources::error::Error> { + async fn try_init_syncer(&self) -> Result, amp_resources::error::Error> { + if !self.actor.spec.live { + return Ok(None); + } + if !syncer::exists(&self.k8s, &self.actor).await? { syncer::create(&self.k8s, &self.actor).await?; } - Ok(()) + if !syncer::ready(&self.k8s, &self.actor).await? { + return Ok(Some(Duration::from_secs(5))); // wait for the Syncer to be ready + } + + Ok(None) } } diff --git a/builder/src/lib.rs b/builder/src/lib.rs index 977f2c8..10b8041 100644 --- a/builder/src/lib.rs +++ b/builder/src/lib.rs @@ -13,6 +13,8 @@ // limitations under the License. mod lifecycle; +use std::time::Duration; + pub use lifecycle::LifecycleBuilder; mod kaniko; @@ -29,6 +31,7 @@ use async_trait::async_trait; /// Builder trait #[async_trait] pub trait Builder: Send + Sync { + async fn prepare(&self) -> Result>; async fn build(&self) -> Result<()>; async fn completed(&self) -> Result; } @@ -49,6 +52,11 @@ impl BuildDirector { self.builder = builder; } + /// Prepare the build + pub async fn prepare(&self) -> Result> { + self.builder.prepare().await + } + /// Execute the build logic pub async fn build(&self) -> Result<()> { self.builder.build().await diff --git a/builder/src/lifecycle.rs b/builder/src/lifecycle.rs index d38bda3..7b4b8d6 100644 --- a/builder/src/lifecycle.rs +++ b/builder/src/lifecycle.rs @@ -12,7 +12,7 @@ // See the License for the specific language governing permissions and // limitations under the License. -use std::sync::Arc; +use std::{sync::Arc, time::Duration}; use crate::{errors::Error, Builder, Result}; @@ -36,6 +36,11 @@ impl LifecycleBuilder { #[async_trait] impl Builder for LifecycleBuilder { + // initialize the some resources before building + async fn prepare(&self) -> Result> { + Ok(None) // No need to wait + } + async fn build(&self) -> Result<()> { let name = format!("{}-builder", &self.actor.spec.name); let pod = lifecycle::pod(&self.actor).map_err(Error::ResourceError)?; diff --git a/resources/src/kpack/syncer.rs b/resources/src/kpack/syncer.rs index c8f9778..b4ff39b 100644 --- a/resources/src/kpack/syncer.rs +++ b/resources/src/kpack/syncer.rs @@ -19,6 +19,7 @@ use k8s_openapi::api::core::v1::{PersistentVolumeClaimVolumeSource, Pod, PodSpec use kube::api::{Patch, PatchParams, PostParams}; use kube::core::ObjectMeta; use kube::{Api, Client, Resource, ResourceExt}; +use tracing::{debug, info}; use crate::containers::syncer; use crate::error::{Error, Result}; @@ -40,7 +41,7 @@ pub async fn create(client: &Client, actor: &Actor) -> Result { let resource = new(actor)?; let pod = api.create(&PostParams::default(), &resource).await.map_err(Error::KubeError)?; - tracing::info!("Created Pod: {}", pod.name_any()); + info!("Created Pod: {}", pod.name_any()); Ok(pod) } @@ -51,21 +52,21 @@ pub async fn update(client: &Client, actor: &Actor) -> Result { let name = format!("{}-syncer", actor.spec.name); let mut pod = api.get(&name).await.map_err(Error::KubeError)?; - tracing::debug!("The Pod {} already exists", &name); + debug!("The Pod {} already exists", &name); let expected_hash = hash(&actor.spec)?; let found_hash: String = pod.annotations().get(LAST_APPLIED_HASH_KEY).map_or("".into(), |v| v.into()); if found_hash != expected_hash { let resource = new(actor)?; - tracing::debug!("The updating syncer pod resource:\n {:?}\n", resource); + debug!("The updating syncer pod resource:\n {:?}\n", resource); pod = api .patch(&name, &PatchParams::apply("amp-controllers").force(), &Patch::Apply(&resource)) .await .map_err(Error::KubeError)?; - tracing::info!("Updated Pod: {}", pod.name_any()); + info!("Updated Pod: {}", pod.name_any()); } Ok(pod) @@ -105,3 +106,26 @@ fn new(actor: &Actor) -> Result { ..Default::default() }) } + +pub async fn ready(client: &Client, actor: &Actor) -> Result { + debug!("Check If the syncer Pod is ready"); + + let namespace = actor.namespace().ok_or_else(|| Error::MissingObjectKey(".metadata.namespace"))?; + let api: Api = Api::namespaced(client.clone(), namespace.as_str()); + let name = format!("{}-syncer", actor.spec.name); + + let result = api.get_opt(&name).await.map_err(Error::KubeError)?; + if result.is_none() { + debug!("Not found Syncer {}", &name); + return Ok(false); + } + + let pod = result.unwrap(); + let status = pod.status.as_ref().ok_or_else(|| Error::MissingObjectKey(".status"))?; + + if let Some(conditions) = &status.conditions { + return Ok(conditions.iter().any(|c| c.type_ == "Ready" && c.status == "True")); + } + + Ok(false) +} diff --git a/workflow/src/actor/build.rs b/workflow/src/actor/build.rs index 5faef24..e3e3cf0 100644 --- a/workflow/src/actor/build.rs +++ b/workflow/src/actor/build.rs @@ -80,6 +80,11 @@ impl Task for BuildTask { } }; + // Prepare the build, initialize the some resources before building + if let Some(duration) = builder.prepare().await.map_err(Error::BuildError)? { + return Ok(Some(Intent::Action(Action::requeue(duration)))); + } + // Build the image builder.build().await.map_err(Error::BuildError)?;