Skip to content

Commit

Permalink
refactor: actor init task and builder completed logic
Browse files Browse the repository at this point in the history
  • Loading branch information
wangeguo committed Feb 24, 2024
1 parent 571ff14 commit ac34e96
Show file tree
Hide file tree
Showing 6 changed files with 87 additions and 82 deletions.
5 changes: 5 additions & 0 deletions builder/src/kaniko.rs
Original file line number Diff line number Diff line change
Expand Up @@ -55,4 +55,9 @@ impl Builder for KanikoBuilder {

Ok(())
}

#[inline]
async fn completed(&self) -> Result<bool> {
job::completed(&self.k8s, &self.actor).await.map_err(Error::ResourceError)
}
}
5 changes: 5 additions & 0 deletions builder/src/kpack.rs
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,11 @@ impl Builder for KpackBuilder {

Ok(())
}

#[inline]
async fn completed(&self) -> Result<bool> {
image::completed(&self.k8s, &self.actor).await.map_err(Error::ResourceError)
}
}

impl KpackBuilder {
Expand Down
6 changes: 6 additions & 0 deletions builder/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ use async_trait::async_trait;
#[async_trait]
pub trait Builder: Send + Sync {
async fn build(&self) -> Result<()>;
async fn completed(&self) -> Result<bool>;
}

/// Build director, it's a strategy pattern implementation
Expand All @@ -52,6 +53,11 @@ impl BuildDirector {
pub async fn build(&self) -> Result<()> {
self.builder.build().await
}

/// Check if the build is completed
pub async fn completed(&self) -> Result<bool> {
self.builder.completed().await
}
}

#[cfg(test)]
Expand Down
5 changes: 5 additions & 0 deletions builder/src/lifecycle.rs
Original file line number Diff line number Diff line change
Expand Up @@ -55,4 +55,9 @@ impl Builder for LifecycleBuilder {

Ok(())
}

#[inline]
async fn completed(&self) -> Result<bool> {
job::completed(&self.k8s, &self.actor).await.map_err(Error::ResourceError)
}
}
87 changes: 18 additions & 69 deletions workflow/src/actor/build.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,12 +19,10 @@ use crate::errors::{Error, Result};
use crate::{Context, Intent, State, Task};

use amp_builder::{BuildDirector, KanikoBuilder, KpackBuilder};
use amp_common::docker::{self, registry, DockerConfig};
use amp_common::resource::{Actor, ActorState};
use amp_common::schema::BuildMethod;

use amp_resources::kpack::image;
use amp_resources::{actor, job};
use amp_resources::actor;
use async_trait::async_trait;
use kube::runtime::controller::Action;
use kube::ResourceExt;
Expand Down Expand Up @@ -65,74 +63,12 @@ impl Task<Actor> for BuildTask {
ctx.object.status.as_ref().is_some_and(|status| status.building())
}

/// Execute the task logic for BuildTask using shared data
/// Execute the task logic for BuildTask using shared data.
async fn execute(&self, ctx: &Context<Actor>) -> Result<Option<Intent<Actor>>> {
let actor = &ctx.object;

// build if actor is live or the image is not built, else skip to next state
if actor.spec.live || !self.built(ctx).await? {
self.build(ctx).await?;

let build = actor.spec.character.build.clone().unwrap_or_default();
match build.method() {
BuildMethod::Dockerfile => {
// [lifecycle] Check if the build job is completed and wait for it to finish.
if !job::completed(&ctx.k8s, &ctx.object).await.map_err(Error::ResourceError)? {
info!("Build job is not completed yet, wait for it to finish");
return Ok(Some(Intent::Action(Action::requeue(Duration::from_secs(5)))));
}
}
BuildMethod::Buildpacks => {
// [kpack] Check if the image is completed and wait for it to ready.
if !image::completed(&ctx.k8s, &ctx.object).await.map_err(Error::ResourceError)? {
info!("kpack Image is not completed yet, wait for it to finish");
return Ok(Some(Intent::Action(Action::requeue(Duration::from_secs(5)))));
}
}
};
}

// patch the status to running
let condition = ActorState::running(true, "AutoRun", None);
actor::patch_status(&ctx.k8s, &ctx.object, condition).await.map_err(Error::ResourceError)?;

Ok(None)
}
}

impl BuildTask {
/// Check if the image is already built
async fn built(&self, ctx: &Context<Actor>) -> Result<bool> {
let image = &ctx.object.spec.image;

let credentials = ctx.credentials.read().await;
let config = DockerConfig::from(&credentials.registries);

let credential = match docker::get_credential(&config, image) {
Ok(credential) => Some(credential),
Err(err) => {
error!("Error handling docker configuration: {}", err);
None
}
};

if registry::exists(image, credential).await.map_err(Error::DockerRegistryError)? {
info!("The images already exists");
return Ok(true);
}

Ok(false)
}

/// Generate `Builder` based on the build strategy
///
/// The source code can be local or remote,
/// and the build method can be dockerfile or buildpacks,
/// and build frequency can be once or live.
///
async fn build(&self, ctx: &Context<Actor>) -> Result<()> {
let actor = &ctx.object;
let build = actor.spec.character.build.clone().unwrap_or_default();

// Generate `Builder` based on the build method
let builder = match build.method() {
BuildMethod::Dockerfile => {
info!("Found dockerfile, build it with Kaniko");
Expand All @@ -144,6 +80,19 @@ impl BuildTask {
}
};

builder.build().await.map_err(Error::BuildError)
// Build the image
builder.build().await.map_err(Error::BuildError)?;

// Check if the build is completed and wait for it to finish.
if !builder.completed().await.map_err(Error::BuildError)? {
info!("Build job is not completed yet, wait for it to finish");
return Ok(Some(Intent::Action(Action::requeue(Duration::from_secs(5)))));
}

// Patch the status to running
let condition = ActorState::running(true, "AutoRun", None);
actor::patch_status(&ctx.k8s, &ctx.object, condition).await.map_err(Error::ResourceError)?;

Ok(None)
}
}
61 changes: 48 additions & 13 deletions workflow/src/actor/init.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,14 +15,15 @@
use crate::errors::{Error, Result};
use crate::{Context, Intent, State, Task};

use amp_common::docker::{self, registry, DockerConfig};
use amp_common::resource::{Actor, ActorState};

use amp_resources::actor;
use async_trait::async_trait;
use kube::ResourceExt;
use tracing::{error, trace};
use tracing::{error, info, trace};

use super::BuildingState;
use super::{BuildingState, DeployingState};

pub struct InitialState;

Expand All @@ -34,16 +35,16 @@ impl State<Actor> for InitialState {

// Check if InitTask should be executed
let task = InitTask::new();
if task.matches(ctx) {
match task.execute(ctx).await {
Ok(Some(intent)) => return Some(intent),
Err(err) => error!("Error during InitTask execution: {}", err),
Ok(None) => {}
}
if !task.matches(ctx) {
return None;
}

let result = task.execute(ctx).await;
if let Err(err) = &result {
error!("Error during InitTask execution: {}", err);
}

// Transition to the next state if needed
Some(Intent::State(Box::new(BuildingState)))
result.ok().and_then(|intent| intent)
}
}

Expand All @@ -61,9 +62,43 @@ impl Task<Actor> for InitTask {

/// Execute the task logic for InitTask using shared data
async fn execute(&self, ctx: &Context<Actor>) -> Result<Option<Intent<Actor>>> {
let condition = ActorState::building();
actor::patch_status(&ctx.k8s, &ctx.object, condition).await.map_err(Error::ResourceError)?;
let actor = &ctx.object;

// build if actor is live or the image is not built, else skip to next state
if actor.spec.live || !self.built(ctx).await? {
let condition = ActorState::building();
actor::patch_status(&ctx.k8s, &ctx.object, condition).await.map_err(Error::ResourceError)?;
Ok(Some(Intent::State(Box::new(BuildingState))))
} else {
// patch the status to running
let condition = ActorState::running(true, "AutoRun", None);
actor::patch_status(&ctx.k8s, &ctx.object, condition).await.map_err(Error::ResourceError)?;
Ok(Some(Intent::State(Box::new(DeployingState))))
}
}
}

impl InitTask {
/// Check if the image is already built
async fn built(&self, ctx: &Context<Actor>) -> Result<bool> {
let image = &ctx.object.spec.image;

let credentials = ctx.credentials.read().await;
let config = DockerConfig::from(&credentials.registries);

let credential = match docker::get_credential(&config, image) {
Ok(credential) => Some(credential),
Err(err) => {
error!("Error handling docker configuration: {}", err);
None
}
};

if registry::exists(image, credential).await.map_err(Error::DockerRegistryError)? {
info!("The images already exists");
return Ok(true);
}

Ok(None)
Ok(false)
}
}

0 comments on commit ac34e96

Please sign in to comment.