Skip to content

Commit

Permalink
Use ResourceExt methods for resource property access and update rust …
Browse files Browse the repository at this point in the history
…version to 1.75.0

Signed-off-by: Kate Goldenring <[email protected]>
  • Loading branch information
kate-goldenring committed Oct 8, 2024
1 parent 2c3722a commit b01b4ed
Show file tree
Hide file tree
Showing 5 changed files with 110 additions and 169 deletions.
10 changes: 10 additions & 0 deletions controller/Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,12 +1,22 @@
[package]
name = "controller"
<<<<<<< HEAD
authors.workspace = true
version.workspace = true
edition.workspace = true
license.workspace = true
homepage.workspace = true
repository.workspace = true
rust-version.workspace = true
=======
version = "0.13.1"
license = "Apache-2.0"
authors = ["<[email protected]>", "<[email protected]>"]
edition = "2021"
rust-version = "1.75.0"

# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
>>>>>>> 1aada19 (Use ResourceExt methods for resource property access and update rust version to 1.75.0)

[dependencies]
akri-shared = { path = "../shared" }
Expand Down
55 changes: 23 additions & 32 deletions controller/src/util/instance_action.rs
Original file line number Diff line number Diff line change
Expand Up @@ -118,7 +118,7 @@ pub(crate) struct PodContext {
}

pub(crate) fn create_pod_context(k8s_pod: &Pod, action: PodAction) -> anyhow::Result<PodContext> {
let pod_name = k8s_pod.metadata.name.as_ref().unwrap();
let pod_name = k8s_pod.name_unchecked();
let labels = &k8s_pod.labels();
// Early exits above ensure unwrap will not panic
let node_to_run_pod_on = labels.get(AKRI_TARGET_NODE_LABEL_NAME).ok_or_else(|| {
Expand All @@ -131,7 +131,7 @@ pub(crate) fn create_pod_context(k8s_pod: &Pod, action: PodAction) -> anyhow::Re

Ok(PodContext {
node_name: Some(node_to_run_pod_on.to_string()),
namespace: k8s_pod.metadata.namespace.clone(),
namespace: k8s_pod.namespace(),
action,
})
}
Expand All @@ -143,7 +143,7 @@ fn determine_action_for_pod(
k8s_pod: &Pod,
nodes_to_act_on: &mut HashMap<String, PodContext>,
) -> anyhow::Result<()> {
let pod_name = k8s_pod.metadata.name.as_ref().unwrap();
let pod_name = k8s_pod.name_unchecked();
let pod_phase = k8s_pod
.status
.as_ref()
Expand All @@ -164,7 +164,7 @@ fn determine_action_for_pod(
phase: pod_phase.to_string(),
status_start_time: pod_start_time,
unknown_node: !nodes_to_act_on.contains_key(node_to_run_pod_on),
trace_node_name: k8s_pod.metadata.name.clone().unwrap(),
trace_node_name: k8s_pod.name_unchecked(),
};
update_pod_context.action = pod_action_info.select_pod_action()?;
nodes_to_act_on.insert(node_to_run_pod_on.to_string(), update_pod_context);
Expand Down Expand Up @@ -305,19 +305,15 @@ pub async fn handle_instance_change(
ctx: Arc<ControllerContext>,
) -> Result<Action> {
trace!("handle_instance_change - enter");
let instance_namespace = instance
.metadata
.namespace
.as_ref()
.context("no namespace")?;
let api: Box<dyn Api<Configuration>> = ctx.client.namespaced(instance_namespace);
let instance_namespace = instance.namespace().unwrap();
let api: Box<dyn Api<Configuration>> = ctx.client.namespaced(&instance_namespace);
let Ok(Some(configuration)) = api.get(&instance.spec.configuration_name).await else {
// In this scenario, a configuration has been deleted without the Akri Agent deleting the associated Instances.
// Furthermore, Akri Agent is still modifying the Instances. This should not happen beacuse Agent
// is designed to shutdown when it's Configuration watcher fails.
error!(
"handle_instance_change - no configuration found for {:?} yet instance {:?} exists - check that device plugin is running properly",
&instance.spec.configuration_name, &instance.metadata.name
&instance.spec.configuration_name, &instance.name_unchecked()
);

return Ok(default_requeue_action());
Expand Down Expand Up @@ -352,41 +348,35 @@ pub async fn handle_instance_change_job(
client: Arc<dyn ControllerKubeClient>,
) -> anyhow::Result<()> {
trace!("handle_instance_change_job - enter");
let api: Box<dyn Api<Job>> = client.namespaced(instance.metadata.namespace.as_ref().unwrap());
if api
.get(instance.metadata.name.as_ref().unwrap())
.await?
.is_some()
{
let api: Box<dyn Api<Job>> = client.namespaced(&instance.namespace().unwrap());
if api.get(&instance.name_unchecked()).await?.is_some() {
// Job already exists, do nothing
return Ok(());
}
let instance_name = instance.name_unchecked();
// Create name for Job. Includes Configuration generation in the suffix
// to track what version of the Configuration the Job is associated with.
let job_name = pod::create_broker_app_name(
instance.metadata.name.as_ref().unwrap(),
&instance_name,
None,
instance.spec.shared,
&format!("{}-job", config_generation),
);

let instance_name = instance.metadata.name.as_ref().unwrap();
let instance_namespace = instance.metadata.namespace.as_ref().unwrap();
let instance_uid = instance.metadata.uid.as_ref().unwrap();
trace!("handle_instance_change_job - instance added");
let capability_id = format!("{}/{}", AKRI_PREFIX, instance_name);
let new_job = job::create_new_job_from_spec(
instance,
OwnershipInfo::new(
OwnershipType::Instance,
instance_name.to_string(),
instance_uid.to_string(),
instance_name,
instance.uid().unwrap(),
),
&capability_id,
job_spec,
&job_name,
)?;
let api: Box<dyn Api<Job>> = client.namespaced(instance_namespace);
let api: Box<dyn Api<Job>> = client.namespaced(&instance.namespace().unwrap());
// TODO: Consider using server side apply instead of create
api.create(&new_job).await?;
Ok(())
Expand All @@ -405,8 +395,6 @@ pub async fn handle_instance_change_pod(
) -> anyhow::Result<()> {
trace!("handle_instance_change_pod - enter");

let instance_name = instance.metadata.name.clone().unwrap();

// If InstanceAction::Remove, assume all nodes require PodAction::NoAction (reflect that there is no running Pod unless we find one)
// Otherwise, assume all nodes require PodAction::Add (reflect that there is no running Pod, unless we find one)
let default_action = PodAction::Add;
Expand All @@ -430,8 +418,11 @@ pub async fn handle_instance_change_pod(
nodes_to_act_on
);

let lp =
ListParams::default().labels(&format!("{}={}", AKRI_INSTANCE_LABEL_NAME, instance_name));
let lp = ListParams::default().labels(&format!(
"{}={}",
AKRI_INSTANCE_LABEL_NAME,
instance.name_unchecked()
));
let api = ctx
.client
.namespaced(&instance.namespace().context("no namespace")?);
Expand Down Expand Up @@ -473,7 +464,7 @@ pub(crate) async fn do_pod_action_for_nodes(
((v.action) == PodAction::Remove) | ((v.action) == PodAction::RemoveAndAdd)
}) {
handle_deletion_work(
instance.metadata.name.as_ref().unwrap(),
&instance.name_unchecked(),
&instance.spec.configuration_name,
instance.spec.shared,
node_to_delete_pod,
Expand All @@ -496,17 +487,17 @@ pub(crate) async fn do_pod_action_for_nodes(
.collect::<Vec<String>>();

// Iterate over nodes_to_act_on where value == (PodAction::Add | PodAction::RemoveAndAdd)
let instance_name = instance.metadata.name.clone().unwrap();
let instance_name = instance.name_unchecked();
let capability_id = format!("{}/{}", AKRI_PREFIX, instance_name);
for new_node in nodes_to_add {
let new_pod = pod::create_new_pod_from_spec(
instance.metadata.namespace.as_ref().unwrap(),
&instance.namespace().unwrap(),
&instance_name,
&instance.spec.configuration_name,
OwnershipInfo::new(
OwnershipType::Instance,
instance_name.clone(),
instance.metadata.uid.clone().unwrap(),
instance.uid().unwrap(),
),
&capability_id,
&new_node,
Expand Down
29 changes: 13 additions & 16 deletions controller/src/util/node_watcher.rs
Original file line number Diff line number Diff line change
Expand Up @@ -89,7 +89,7 @@ pub async fn reconcile(node: Arc<Node>, ctx: Arc<ControllerContext>) -> Result<A
async fn reconcile_inner(event: Event<Node>, ctx: Arc<ControllerContext>) -> Result<Action> {
match event {
Event::Apply(node) => {
let node_name = node.metadata.name.clone().unwrap();
let node_name = node.name_unchecked();
info!("handle_node - Added or modified: {}", node_name);
if is_node_ready(&node) {
ctx.known_nodes
Expand All @@ -109,12 +109,9 @@ async fn reconcile_inner(event: Event<Node>, ctx: Arc<ControllerContext>) -> Res
Ok(Action::await_change())
}
Event::Cleanup(node) => {
info!("handle_node - Deleted: {:?}", &node.metadata.name);
info!("handle_node - Deleted: {:?}", &node.name_unchecked());
call_handle_node_disappearance_if_needed(&node, ctx.clone()).await?;
ctx.known_nodes
.write()
.await
.remove(&node.metadata.name.as_deref().unwrap().to_string());
ctx.known_nodes.write().await.remove(&node.name_unchecked());
Ok(Action::await_change())
}
}
Expand All @@ -126,7 +123,7 @@ async fn call_handle_node_disappearance_if_needed(
node: &Node,
ctx: Arc<ControllerContext>,
) -> anyhow::Result<()> {
let node_name = node.metadata.name.as_deref().unwrap();
let node_name = node.name_unchecked();
trace!(
"call_handle_node_disappearance_if_needed - enter: {:?}",
&node.metadata.name
Expand All @@ -135,7 +132,7 @@ async fn call_handle_node_disappearance_if_needed(
.known_nodes
.read()
.await
.get(node_name)
.get(&node_name)
.unwrap_or(&NodeState::Running)
.clone();
trace!(
Expand All @@ -152,7 +149,7 @@ async fn call_handle_node_disappearance_if_needed(
"call_handle_node_disappearance_if_needed - call handle_node_disappearance: {:?}",
&node.metadata.name
);
handle_node_disappearance(node_name, ctx.clone()).await?;
handle_node_disappearance(&node_name, ctx.clone()).await?;
ctx.known_nodes
.write()
.await
Expand Down Expand Up @@ -194,7 +191,7 @@ async fn handle_node_disappearance(
instances.items.len()
);
for instance in instances.items {
let instance_name = instance.metadata.name.clone().unwrap();
let instance_name = instance.name_unchecked();

trace!(
"handle_node_disappearance - make sure node is not referenced here: {:?}",
Expand Down Expand Up @@ -316,7 +313,7 @@ mod tests {
let _ = env_logger::builder().is_test(true).try_init();
let node_json = file::read_file_to_string("../test/json/node-a.json");
let node: Node = serde_json::from_str(&node_json).unwrap();
let node_name = node.metadata.name.clone().unwrap();
let node_name = node.name_unchecked();
let mut mock = MockControllerKubeClient::default();
mock.node
.expect_all()
Expand All @@ -337,7 +334,7 @@ mod tests {
let _ = env_logger::builder().is_test(true).try_init();
let node_json = file::read_file_to_string("../test/json/node-a-not-ready.json");
let node: Node = serde_json::from_str(&node_json).unwrap();
let node_name = node.metadata.name.clone().unwrap();
let node_name = node.name_unchecked();
let mut mock = MockControllerKubeClient::default();
mock.node
.expect_all()
Expand All @@ -358,7 +355,7 @@ mod tests {
let _ = env_logger::builder().is_test(true).try_init();
let node_json = file::read_file_to_string("../test/json/node-a-not-ready.json");
let node: Node = serde_json::from_str(&node_json).unwrap();
let node_name = node.metadata.name.clone().unwrap();
let node_name = node.name_unchecked();
let mut mock = MockControllerKubeClient::default();
mock.node
.expect_all()
Expand All @@ -384,7 +381,7 @@ mod tests {
let _ = env_logger::builder().is_test(true).try_init();
let node_json = file::read_file_to_string("../test/json/node-a-not-ready.json");
let node: Node = serde_json::from_str(&node_json).unwrap();
let node_name = node.metadata.name.clone().unwrap();
let node_name = node.name_unchecked();
let mut mock = MockControllerKubeClient::default();
mock.node
.expect_all()
Expand Down Expand Up @@ -428,7 +425,7 @@ mod tests {
let _ = env_logger::builder().is_test(true).try_init();
let node_json = file::read_file_to_string("../test/json/node-a-not-ready.json");
let node: Node = serde_json::from_str(&node_json).unwrap();
let node_name = node.metadata.name.clone().unwrap();
let node_name = node.name_unchecked();
let mut mock = MockControllerKubeClient::default();
mock.node
.expect_all()
Expand Down Expand Up @@ -468,7 +465,7 @@ mod tests {
let _ = env_logger::builder().is_test(true).try_init();
let node_json = file::read_file_to_string("../test/json/node-a-not-ready.json");
let node: Node = serde_json::from_str(&node_json).unwrap();
let node_name = node.metadata.name.clone().unwrap();
let node_name = node.name_unchecked();
let mut mock = MockControllerKubeClient::default();
mock.node
.expect_all()
Expand Down
Loading

0 comments on commit b01b4ed

Please sign in to comment.