Skip to content

Commit

Permalink
refactor: extract try_init_* kpack methods into prepare method
Browse files Browse the repository at this point in the history
  • Loading branch information
wangeguo committed Feb 24, 2024
1 parent ac34e96 commit 17bb51e
Show file tree
Hide file tree
Showing 8 changed files with 108 additions and 29 deletions.
16 changes: 8 additions & 8 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
[workspace.package]
version = "0.8.19"
version = "0.8.20"
edition = "2021"
license = "Apache-2.0"
repository = "https://github.com/amphitheatre-app/amphitheatre"
Expand Down
7 changes: 6 additions & 1 deletion builder/src/kaniko.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.

use std::sync::Arc;
use std::{sync::Arc, time::Duration};

use crate::{errors::Error, Builder, Result};

Expand All @@ -36,6 +36,11 @@ impl KanikoBuilder {

#[async_trait]
impl Builder for KanikoBuilder {
// initialize the some resources before building
async fn prepare(&self) -> Result<Option<Duration>> {
Ok(None) // No need to wait
}

async fn build(&self) -> Result<()> {
let name = format!("{}-builder", &self.actor.spec.name);
let pod = kaniko::pod(&self.actor).map_err(Error::ResourceError)?;
Expand Down
60 changes: 46 additions & 14 deletions builder/src/kpack.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.

use std::sync::Arc;
use std::{sync::Arc, time::Duration};

use crate::{errors::Error, Builder, Result};

Expand Down Expand Up @@ -45,13 +45,29 @@ impl KpackBuilder {

#[async_trait]
impl Builder for KpackBuilder {
async fn build(&self) -> Result<()> {
// initialize the some resources before building
// initialize the some resources before building
async fn prepare(&self) -> Result<Option<Duration>> {
self.try_init_pvc().await.map_err(Error::ResourceError)?;
self.try_init_syncer().await.map_err(Error::ResourceError)?;
self.try_init_buildpack().await.map_err(Error::ResourceError)?;
self.try_init_builder().await.map_err(Error::ResourceError)?;

// Check if the syncer is ready
if let Some(duration) = self.try_init_syncer().await.map_err(Error::ResourceError)? {
return Ok(Some(duration));
}

// Check if the buildpacks are ready
if let Some(duration) = self.try_init_buildpack().await.map_err(Error::ResourceError)? {
return Ok(Some(duration));
}

// Check if the builder is ready
if let Some(duration) = self.try_init_builder().await.map_err(Error::ResourceError)? {
return Ok(Some(duration));
}

Ok(None)
}

async fn build(&self) -> Result<()> {
// Build or update the Image
let name = format!("{}-builder", &self.actor.spec.name);
match image::exists(&self.k8s, &self.actor).await.map_err(Error::ResourceError)? {
Expand Down Expand Up @@ -88,29 +104,33 @@ impl KpackBuilder {
Ok(())
}

async fn try_init_buildpack(&self) -> Result<(), amp_resources::error::Error> {
async fn try_init_buildpack(&self) -> Result<Option<Duration>, amp_resources::error::Error> {
let buildpacks = self.actor.spec.character.buildpacks();
if buildpacks.is_none() {
return Ok(());
return Ok(None);
}

for buildpack in buildpacks.unwrap() {
if !cluster_buildpack::exists(&self.k8s, buildpack).await? {
cluster_buildpack::create(&self.k8s, buildpack).await?;
}

if !cluster_buildpack::ready(&self.k8s, buildpack).await? {
return Ok(Some(Duration::from_secs(5))); // wait for the ClusterBuildpack to be ready
}
}

Ok(())
Ok(None)
}

async fn try_init_builder(&self) -> Result<(), amp_resources::error::Error> {
async fn try_init_builder(&self) -> Result<Option<Duration>, amp_resources::error::Error> {
if !cluster_builder::exists(&self.k8s, &self.actor).await? {
if !cluster_store::exists(&self.k8s, &self.actor).await? {
cluster_store::create(&self.k8s, &self.actor).await?;
}

if !cluster_store::ready(&self.k8s, &self.actor).await? {
return Ok(()); // wait for the ClusterStore to be ready
return Ok(Some(Duration::from_secs(5))); // wait for the ClusterStore to be ready
}

let mut order = Vec::new();
Expand Down Expand Up @@ -143,14 +163,26 @@ impl KpackBuilder {
cluster_builder::create(&self.k8s, &self.actor, &tag, order).await?;
}

Ok(())
if !cluster_builder::ready(&self.k8s, &self.actor).await? {
return Ok(Some(Duration::from_secs(5))); // wait for the ClusterBuilder to be ready
}

Ok(None)
}

async fn try_init_syncer(&self) -> Result<(), amp_resources::error::Error> {
async fn try_init_syncer(&self) -> Result<Option<Duration>, amp_resources::error::Error> {
if !self.actor.spec.live {
return Ok(None);
}

if !syncer::exists(&self.k8s, &self.actor).await? {
syncer::create(&self.k8s, &self.actor).await?;
}

Ok(())
if !syncer::ready(&self.k8s, &self.actor).await? {
return Ok(Some(Duration::from_secs(5))); // wait for the Syncer to be ready
}

Ok(None)
}
}
8 changes: 8 additions & 0 deletions builder/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,8 @@
// limitations under the License.

mod lifecycle;
use std::time::Duration;

pub use lifecycle::LifecycleBuilder;

mod kaniko;
Expand All @@ -29,6 +31,7 @@ use async_trait::async_trait;
/// Builder trait
#[async_trait]
pub trait Builder: Send + Sync {
async fn prepare(&self) -> Result<Option<Duration>>;
async fn build(&self) -> Result<()>;
async fn completed(&self) -> Result<bool>;
}
Expand All @@ -49,6 +52,11 @@ impl BuildDirector {
self.builder = builder;
}

/// Prepare the build
pub async fn prepare(&self) -> Result<Option<Duration>> {
self.builder.prepare().await
}

/// Execute the build logic
pub async fn build(&self) -> Result<()> {
self.builder.build().await
Expand Down
7 changes: 6 additions & 1 deletion builder/src/lifecycle.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.

use std::sync::Arc;
use std::{sync::Arc, time::Duration};

use crate::{errors::Error, Builder, Result};

Expand All @@ -36,6 +36,11 @@ impl LifecycleBuilder {

#[async_trait]
impl Builder for LifecycleBuilder {
// initialize the some resources before building
async fn prepare(&self) -> Result<Option<Duration>> {
Ok(None) // No need to wait
}

async fn build(&self) -> Result<()> {
let name = format!("{}-builder", &self.actor.spec.name);
let pod = lifecycle::pod(&self.actor).map_err(Error::ResourceError)?;
Expand Down
32 changes: 28 additions & 4 deletions resources/src/kpack/syncer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ use k8s_openapi::api::core::v1::{PersistentVolumeClaimVolumeSource, Pod, PodSpec
use kube::api::{Patch, PatchParams, PostParams};
use kube::core::ObjectMeta;
use kube::{Api, Client, Resource, ResourceExt};
use tracing::{debug, info};

use crate::containers::syncer;
use crate::error::{Error, Result};
Expand All @@ -40,7 +41,7 @@ pub async fn create(client: &Client, actor: &Actor) -> Result<Pod> {

let resource = new(actor)?;
let pod = api.create(&PostParams::default(), &resource).await.map_err(Error::KubeError)?;
tracing::info!("Created Pod: {}", pod.name_any());
info!("Created Pod: {}", pod.name_any());

Ok(pod)
}
Expand All @@ -51,21 +52,21 @@ pub async fn update(client: &Client, actor: &Actor) -> Result<Pod> {
let name = format!("{}-syncer", actor.spec.name);

let mut pod = api.get(&name).await.map_err(Error::KubeError)?;
tracing::debug!("The Pod {} already exists", &name);
debug!("The Pod {} already exists", &name);

let expected_hash = hash(&actor.spec)?;
let found_hash: String = pod.annotations().get(LAST_APPLIED_HASH_KEY).map_or("".into(), |v| v.into());

if found_hash != expected_hash {
let resource = new(actor)?;
tracing::debug!("The updating syncer pod resource:\n {:?}\n", resource);
debug!("The updating syncer pod resource:\n {:?}\n", resource);

pod = api
.patch(&name, &PatchParams::apply("amp-controllers").force(), &Patch::Apply(&resource))
.await
.map_err(Error::KubeError)?;

tracing::info!("Updated Pod: {}", pod.name_any());
info!("Updated Pod: {}", pod.name_any());
}

Ok(pod)
Expand Down Expand Up @@ -105,3 +106,26 @@ fn new(actor: &Actor) -> Result<Pod> {
..Default::default()
})
}

pub async fn ready(client: &Client, actor: &Actor) -> Result<bool> {
debug!("Check If the syncer Pod is ready");

let namespace = actor.namespace().ok_or_else(|| Error::MissingObjectKey(".metadata.namespace"))?;
let api: Api<Pod> = Api::namespaced(client.clone(), namespace.as_str());
let name = format!("{}-syncer", actor.spec.name);

let result = api.get_opt(&name).await.map_err(Error::KubeError)?;
if result.is_none() {
debug!("Not found Syncer {}", &name);
return Ok(false);
}

let pod = result.unwrap();
let status = pod.status.as_ref().ok_or_else(|| Error::MissingObjectKey(".status"))?;

if let Some(conditions) = &status.conditions {
return Ok(conditions.iter().any(|c| c.type_ == "Ready" && c.status == "True"));
}

Ok(false)
}
5 changes: 5 additions & 0 deletions workflow/src/actor/build.rs
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,11 @@ impl Task<Actor> for BuildTask {
}
};

// Prepare the build, initialize the some resources before building
if let Some(duration) = builder.prepare().await.map_err(Error::BuildError)? {
return Ok(Some(Intent::Action(Action::requeue(duration))));
}

// Build the image
builder.build().await.map_err(Error::BuildError)?;

Expand Down

0 comments on commit 17bb51e

Please sign in to comment.