Skip to content

Commit

Permalink
chore: add new cluster store manager for kpack, and some refactors
Browse files Browse the repository at this point in the history
  • Loading branch information
wangeguo committed Feb 7, 2024
1 parent a4caa73 commit 2dd2624
Show file tree
Hide file tree
Showing 11 changed files with 302 additions and 81 deletions.
16 changes: 8 additions & 8 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
[workspace.package]
version = "0.8.13"
version = "0.8.14"
edition = "2021"
license = "Apache-2.0"
repository = "https://github.com/amphitheatre-app/amphitheatre"
Expand Down
22 changes: 17 additions & 5 deletions builder/src/kpack.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ use crate::{errors::Error, Builder, Result};

use amp_common::{config::Credentials, resource::Actor};
use amp_resources::{
kpack::{cluster_builder, cluster_buildpack, image, syncer, BuildExt},
kpack::{cluster_builder, cluster_buildpack, cluster_store, image, syncer, BuildExt},
volume,
};

Expand Down Expand Up @@ -46,6 +46,7 @@ impl Builder for KpackBuilder {
self.try_init_pvc().await.map_err(Error::ResourceError)?;
self.try_init_syncer().await.map_err(Error::ResourceError)?;
self.try_init_buildpack().await.map_err(Error::ResourceError)?;
self.try_init_store().await.map_err(Error::ResourceError)?;
self.try_init_builder().await.map_err(Error::ResourceError)?;

// Build or update the build job
Expand Down Expand Up @@ -80,16 +81,27 @@ impl KpackBuilder {
}

async fn try_init_buildpack(&self) -> Result<(), amp_resources::error::Error> {
let buildpacks = self.actor.spec.character.builder_items().unwrap_or_default();
for buildpack in buildpacks {
if !cluster_buildpack::exists(&self.k8s, &buildpack).await? {
cluster_buildpack::create(&self.k8s, &buildpack).await?;
let buildpacks = self.actor.spec.character.buildpacks();
if buildpacks.is_none() {
return Ok(());
}

for buildpack in buildpacks.unwrap() {
if !cluster_buildpack::exists(&self.k8s, buildpack).await? {
cluster_buildpack::create(&self.k8s, buildpack).await?;
}
}

Ok(())
}

async fn try_init_store(&self) -> Result<(), amp_resources::error::Error> {
if !cluster_store::exists(&self.k8s, &self.actor).await? {
cluster_store::create(&self.k8s, &self.actor).await?;
}

Ok(())
}
async fn try_init_builder(&self) -> Result<(), amp_resources::error::Error> {
if !cluster_builder::exists(&self.k8s, &self.actor).await? {
cluster_builder::create(&self.k8s, &self.actor, self.credentials.clone()).await?;
Expand Down
4 changes: 1 addition & 3 deletions resources/src/actor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -110,9 +110,7 @@ pub async fn metrics(client: &Client, namespace: &str, name: &str) -> Result<Pod

pub async fn get(client: &Client, namespace: &str, name: &str) -> Result<Actor> {
let api: Api<Actor> = Api::namespaced(client.clone(), namespace);
let actor = api.get(name).await.map_err(Error::KubeError)?;

Ok(actor)
api.get(name).await.map_err(Error::KubeError)
}

pub async fn list(client: &Client, namespace: &str) -> Result<Vec<Actor>> {
Expand Down
4 changes: 1 addition & 3 deletions resources/src/character.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,5 @@ use super::error::{Error, Result};
/// Get a character by name
pub async fn get(client: &Client, name: &str) -> Result<Character> {
let api: Api<Character> = Api::all(client.clone());
let resources = api.get(name).await.map_err(Error::KubeError)?;

Ok(resources)
api.get(name).await.map_err(Error::KubeError)
}
4 changes: 2 additions & 2 deletions resources/src/kpack/cluster_builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -94,11 +94,11 @@ async fn new(actor: &Actor, credentials: Arc<RwLock<Credentials>>) -> Result<Dyn
"kind": "ClusterStack",
},
"store": {
"name": "default-cluster-store",
"name": actor.spec.character.store_name(),
"kind": "ClusterStore",
},
"serviceAccountRef": {
"name": "amp-controllers", // @TODO: Use the specific service account from configuration
"name": "amp-controllers", // @TODO: Use the specific service account from configuration
"namespace": "amp-system", // @TODO: Use the namespace from configuration
},
"order": actor.spec.character.builder_orders(),
Expand Down
13 changes: 6 additions & 7 deletions resources/src/kpack/cluster_buildpack.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,12 +22,11 @@ use serde_json::{from_value, json};
use tracing::{debug, info};

use crate::error::{Error, Result};
use crate::kpack::buildpack_name;
use crate::kpack::encode_name;

pub async fn exists(client: &Client, image: &str) -> Result<bool> {
let name = buildpack_name(image);
let api: Api<DynamicObject> = Api::all_with(client.clone(), &api_resource());
Ok(api.get_opt(&name).await.map_err(Error::KubeError)?.is_some())
Ok(api.get_opt(&encode_name(image)).await.map_err(Error::KubeError)?.is_some())
}

pub async fn create(client: &Client, image: &str) -> Result<DynamicObject> {
Expand All @@ -41,7 +40,7 @@ pub async fn create(client: &Client, image: &str) -> Result<DynamicObject> {
}

pub async fn update(client: &Client, image: &str) -> Result<DynamicObject> {
let name = buildpack_name(image);
let name = encode_name(image);
let api: Api<DynamicObject> = Api::all_with(client.clone(), &api_resource());

let mut buildpack = api.get(&name).await.map_err(Error::KubeError)?;
Expand All @@ -66,14 +65,14 @@ fn new(image: &str) -> Result<DynamicObject> {
"apiVersion": "kpack.io/v1alpha2",
"kind": "ClusterBuildpack",
"metadata": {
"name": buildpack_name(image),
"name": encode_name(image),
"labels": {
"app.kubernetes.io/managed-by": "Amphitheatre",
},
},
"spec": {
"serviceAccountRef": {
"name": "amp-controllers", // @TODO: Use the specific service account from configuration
"name": "amp-controllers", // @TODO: Use the specific service account from configuration
"namespace": "amp-system", // @TODO: Use the namespace from configuration
},
"image": image.to_string(),
Expand All @@ -85,7 +84,7 @@ fn new(image: &str) -> Result<DynamicObject> {
}

pub async fn ready(client: &Client, image: &str) -> Result<bool> {
let name = buildpack_name(image);
let name = encode_name(image);
debug!("Check if the ClusterBuildpack {} is ready", name);

let api: Api<DynamicObject> = Api::all_with(client.clone(), &api_resource());
Expand Down
124 changes: 124 additions & 0 deletions resources/src/kpack/cluster_store.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,124 @@
// Copyright (c) The Amphitheatre Authors. All rights reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// https://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use super::BuildExt;

use amp_common::resource::Actor;

use k8s_openapi::apimachinery::pkg::apis::meta::v1::Condition;
use kube::api::{Patch, PatchParams, PostParams};
use kube::core::{DynamicObject, GroupVersionKind};
use kube::discovery::ApiResource;
use kube::{Api, Client, ResourceExt};
use serde_json::{from_value, json};
use tracing::{debug, info};

use crate::error::{Error, Result};

pub async fn exists(client: &Client, actor: &Actor) -> Result<bool> {
let api: Api<DynamicObject> = Api::all_with(client.clone(), &api_resource());
let name = actor.spec.character.store_name();

Ok(api.get_opt(&name).await.map_err(Error::KubeError)?.is_some())
}

pub async fn get(client: &Client, actor: &Actor) -> Result<DynamicObject> {
let api: Api<DynamicObject> = Api::all_with(client.clone(), &api_resource());
let name = actor.spec.character.store_name();

api.get(&name).await.map_err(Error::KubeError)
}

pub async fn create(client: &Client, actor: &Actor) -> Result<DynamicObject> {
let api: Api<DynamicObject> = Api::all_with(client.clone(), &api_resource());

let resource = new(actor).await?;
let builder = api.create(&PostParams::default(), &resource).await.map_err(Error::KubeError)?;
info!("Created ClusterStore: {}", builder.name_any());

Ok(builder)
}

pub async fn update(client: &Client, actor: &Actor) -> Result<DynamicObject> {
let api: Api<DynamicObject> = Api::all_with(client.clone(), &api_resource());

let name = actor.spec.character.store_name();
let mut builder = api.get(&name).await.map_err(Error::KubeError)?;
debug!("The ClusterStore \"{}\" already exists", name);

let resource = new(actor).await?;
if builder.data.pointer("/spec") != resource.data.pointer("/spec") {
debug!("The updating ClusterStore resource:\n {:?}\n", resource);
builder = api
.patch(&name, &PatchParams::apply("amp-controllers").force(), &Patch::Apply(&resource))
.await
.map_err(Error::KubeError)?;

info!("Updated ClusterStore: {}", builder.name_any());
}

Ok(builder)
}

#[inline]
fn api_resource() -> ApiResource {
ApiResource::from_gvk(&GroupVersionKind::gvk("kpack.io", "v1alpha2", "ClusterStore"))
}

async fn new(actor: &Actor) -> Result<DynamicObject> {
let name = actor.spec.character.store_name();
let resource = from_value(json!({
"apiVersion": "kpack.io/v1alpha2",
"kind": "ClusterStore",
"metadata": {
"name": name.clone(),
"labels": {
"app.kubernetes.io/managed-by": "Amphitheatre",
},
},
"spec": {
"serviceAccountRef": {
"name": "amp-controllers", // @TODO: Use the specific service account from configuration
"namespace": "amp-system", // @TODO: Use the namespace from configuration
},
"sources": [{
"image": actor.spec.character.store_image(),
}],
}
}))
.map_err(Error::SerializationError)?;

Ok(resource)
}

pub async fn ready(client: &Client, actor: &Actor) -> Result<bool> {
let name = actor.spec.character.builder_name();
debug!("Check if the ClusterStore {} is ready", name);

let api: Api<DynamicObject> = Api::all_with(client.clone(), &api_resource());

if let Some(builder) = api.get_opt(&name).await.map_err(Error::KubeError)? {
debug!("Found ClusterStore {}", &name);
debug!("The ClusterStore data is: {:?}", builder.data);

if let Some(conditions) = builder.data.pointer("/status/conditions") {
let conditions: Vec<Condition> =
serde_json::from_value(json!(conditions)).map_err(Error::SerializationError)?;
return Ok(conditions.iter().any(|condition| condition.type_ == "Ready" && condition.status == "True"));
}
}

debug!("Not found ClusterStore {}", &name);
Ok(false)
}
Loading

0 comments on commit 2dd2624

Please sign in to comment.