diff --git a/controller/src/util/instance_action.rs b/controller/src/util/instance_action.rs
index 5df0a743b..0b0e98d41 100644
--- a/controller/src/util/instance_action.rs
+++ b/controller/src/util/instance_action.rs
@@ -106,7 +106,6 @@ async fn reconcile_inner(event: Event<Instance>, ctx: Arc<ControllerContext>) ->
 /// specific Node's protocol broker Pod.
 ///
 /// * the node is described by node_name
-/// * the protocol (or capability) is described by instance_name and namespace
 /// * what to do with the broker Pod is described by action
 // TODO: add Pod name so does not need to be
 // generated on deletes and remove Option wrappers.
@@ -118,16 +117,17 @@ pub(crate) struct PodContext {
 }
 
 pub(crate) fn create_pod_context(k8s_pod: &Pod, action: PodAction) -> anyhow::Result<PodContext> {
-    let pod_name = k8s_pod.name_unchecked();
-    let labels = &k8s_pod.labels();
-    // Early exits above ensure unwrap will not panic
-    let node_to_run_pod_on = labels.get(AKRI_TARGET_NODE_LABEL_NAME).ok_or_else(|| {
-        anyhow::anyhow!(
-            "no {} label found for {:?}",
-            AKRI_TARGET_NODE_LABEL_NAME,
-            pod_name
-        )
-    })?;
+    let node_to_run_pod_on = &k8s_pod
+        .labels()
+        .get(AKRI_TARGET_NODE_LABEL_NAME)
+        .ok_or_else(|| {
+            anyhow::anyhow!(
+                "no {} label found for {:?}",
+                AKRI_TARGET_NODE_LABEL_NAME,
+                k8s_pod.name_unchecked()
+            )
+        })?;
 
     Ok(PodContext {
         node_name: Some(node_to_run_pod_on.to_string()),
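For reference, the rewritten lookup in `create_pod_context` is the standard `Option` to `Result` conversion: `get` returns an `Option`, `ok_or_else` turns a miss into an error, and `?` propagates it. A minimal standalone sketch, with a plain `BTreeMap` standing in for the Pod's labels (the label key and all names here are illustrative, not Akri's exact constants):

```rust
use std::collections::BTreeMap;

fn node_for(labels: &BTreeMap<String, String>, pod_name: &str) -> anyhow::Result<String> {
    // `get` yields Option<&String>; `ok_or_else` maps a missing key to an error,
    // so `?` can propagate it without any unwrap or early return.
    let node = labels
        .get("akri.sh/target-node")
        .ok_or_else(|| anyhow::anyhow!("no akri.sh/target-node label found for {:?}", pod_name))?;
    Ok(node.to_string())
}
```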
@@ -152,11 +152,11 @@ fn determine_action_for_pod(
         .as_ref()
         .ok_or_else(|| anyhow::anyhow!("No pod phase found for Pod {:?}", pod_name))?;
 
-    let mut update_pod_context = create_pod_context(k8s_pod, PodAction::NoAction)?;
-    let node_to_run_pod_on = update_pod_context.node_name.as_ref().unwrap();
+    let mut ctx = create_pod_context(k8s_pod, PodAction::NoAction)?;
 
     // Early exits above ensure unwrap will not panic
     let pod_start_time = k8s_pod.status.as_ref().unwrap().start_time.clone();
+    let node_to_run_pod_on = ctx.node_name.as_ref().unwrap();
 
     let pod_action_info = PodActionInfo {
         pending_grace_time_in_minutes: PENDING_POD_GRACE_PERIOD_MINUTES,
@@ -164,16 +164,15 @@ fn determine_action_for_pod(
         phase: pod_phase.to_string(),
         status_start_time: pod_start_time,
         unknown_node: !nodes_to_act_on.contains_key(node_to_run_pod_on),
-        trace_node_name: k8s_pod.name_unchecked(),
+        trace_pod_name: k8s_pod.name_unchecked(),
     };
-    update_pod_context.action = pod_action_info.select_pod_action()?;
-    nodes_to_act_on.insert(node_to_run_pod_on.to_string(), update_pod_context);
+    ctx.action = pod_action_info.select_pod_action()?;
+    nodes_to_act_on.insert(node_to_run_pod_on.to_string(), ctx);
     Ok(())
 }
 
-/// This handles Instance deletion event by deleting the
-/// broker Pod, the broker Service (if there are no remaining broker Pods),
-/// and the capability Service (if there are no remaining capability Pods).
+/// This deliberately deletes the broker Pod, the broker Service (if there are no remaining broker Pods), and the configuration Service (if there are no remaining capability Pods).
+/// This is done before recreating the broker Pod and Services.
 async fn handle_deletion_work(
     instance_name: &str,
     configuration_name: &str,
@@ -189,18 +188,11 @@ async fn handle_deletion_work(
             context
         )
     })?;
-    let context_namespace = context.namespace.as_ref().ok_or_else(|| {
-        anyhow::anyhow!(
-            "handle_deletion_work - Context namespace is missing for {}: {:?}",
-            node_to_delete_pod,
-            context
-        )
-    })?;
 
     trace!(
         "handle_deletion_work - pod::create_broker_app_name({:?}, {:?}, {:?}, {:?})",
         &instance_name,
-        context_node_name,
+        context.node_name,
         instance_shared,
         "pod"
     );
@@ -213,66 +205,16 @@ async fn handle_deletion_work(
     trace!(
         "handle_deletion_work - pod::remove_pod name={:?}, namespace={:?}",
         &pod_app_name,
-        &context_namespace
+        &context.namespace
     );
     api.delete(&pod_app_name).await?;
     trace!("handle_deletion_work - pod::remove_pod succeeded");
 
     BROKER_POD_COUNT_METRIC
-        .with_label_values(&[configuration_name, context_node_name])
+        .with_label_values(&[configuration_name, &context_node_name])
         .dec();
     Ok(())
 }
 
-#[cfg(test)]
-mod handle_deletion_work_tests {
-    use akri_shared::k8s::api::MockApi;
-
-    use super::*;
-    #[tokio::test]
-    async fn test_handle_deletion_work_with_no_node_name() {
-        let _ = env_logger::builder().is_test(true).try_init();
-
-        let context = PodContext {
-            node_name: None,
-            namespace: Some("namespace".into()),
-            action: PodAction::NoAction,
-        };
-        let api: Box<dyn Api<Pod>> = Box::new(MockApi::new());
-        assert!(handle_deletion_work(
-            "instance_name",
-            "configuration_name",
-            true,
-            "node_to_delete_pod",
-            &context,
-            api.as_ref(),
-        )
-        .await
-        .is_err());
-    }
-
-    #[tokio::test]
-    async fn test_handle_deletion_work_with_no_namespace() {
-        let _ = env_logger::builder().is_test(true).try_init();
-
-        let context = PodContext {
-            node_name: Some("node-a".into()),
-            namespace: None,
-            action: PodAction::NoAction,
-        };
-        let api: Box<dyn Api<Pod>> = Box::new(MockApi::new());
-        assert!(handle_deletion_work(
-            "instance_name",
-            "configuration_name",
-            true,
-            "node_to_delete_pod",
-            &context,
-            api.as_ref(),
-        )
-        .await
-        .is_err());
-    }
-}
-
 /// This handles Instance addition event by creating the
 /// broker Pod.
 async fn handle_addition_work(
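The `BROKER_POD_COUNT_METRIC` bookkeeping above is the usual labeled-gauge pattern from the `prometheus` crate: one series per (configuration, node) pair, incremented when a broker Pod is created and decremented on deletion. A minimal sketch (metric name, help text, and label values are illustrative, not Akri's exact definitions):

```rust
use prometheus::{register_int_gauge_vec, IntGaugeVec};

fn main() {
    // A gauge vector keyed by (configuration, node), analogous to BROKER_POD_COUNT_METRIC.
    let broker_pod_count: IntGaugeVec = register_int_gauge_vec!(
        "akri_broker_pod_count",
        "Number of broker Pods currently scheduled",
        &["configuration", "node"]
    )
    .expect("metric can be registered");

    // On Pod creation (handle_addition_work): bump the (configuration, node) series.
    broker_pod_count.with_label_values(&["my-config", "node-a"]).inc();
    // On Pod deletion (handle_deletion_work): decrement the same series.
    broker_pod_count.with_label_values(&["my-config", "node-a"]).dec();
}
```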
@@ -311,29 +253,29 @@ pub async fn handle_instance_change(
         // In this scenario, a configuration has been deleted without the Akri Agent deleting the associated Instances.
         // Furthermore, Akri Agent is still modifying the Instances. This should not happen beacuse Agent
         // is designed to shutdown when it's Configuration watcher fails.
-        error!(
-            "handle_instance_change - no configuration found for {:?} yet instance {:?} exists - check that device plugin is running properly",
+        error!("handle_instance_change - no configuration found for {:?} yet instance {:?} exists - check that device plugin is running properly",
             &instance.spec.configuration_name,
             &instance.name_unchecked()
         );
         return Ok(default_requeue_action());
     };
-    if let Some(broker_spec) = &configuration.spec.broker_spec {
-        let instance_change_result = match broker_spec {
-            BrokerSpec::BrokerPodSpec(p) => handle_instance_change_pod(instance, p, ctx).await,
-            BrokerSpec::BrokerJobSpec(j) => {
-                handle_instance_change_job(
-                    instance,
-                    *configuration.metadata.generation.as_ref().unwrap(),
-                    j,
-                    ctx.client.clone(),
-                )
-                .await
-            }
-        };
-        if let Err(e) = instance_change_result {
-            error!("Unable to handle Broker action: {:?}", e);
+    let Some(broker_spec) = &configuration.spec.broker_spec else {
+        return Ok(default_requeue_action());
+    };
+    let res = match broker_spec {
+        BrokerSpec::BrokerPodSpec(p) => handle_instance_change_pod(instance, p, ctx).await,
+        BrokerSpec::BrokerJobSpec(j) => {
+            handle_instance_change_job(
+                instance,
+                *configuration.metadata.generation.as_ref().unwrap(),
+                j,
+                ctx.client.clone(),
+            )
+            .await
         }
+    };
+    if let Err(e) = res {
+        error!("Unable to handle Broker action: {:?}", e);
     }
     Ok(default_requeue_action())
 }
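The control-flow change above swaps a nested `if let` for a `let ... else` guard (stable since Rust 1.65): the missing-`broker_spec` case exits early and the main path stays at the top indentation level. A standalone sketch of the same flattening, with toy names:

```rust
// Before: the happy path is nested inside `if let`, pushing the real work one level deep.
fn describe_nested(input: Option<&str>) -> String {
    if let Some(s) = input {
        format!("got {s}")
    } else {
        String::from("nothing")
    }
}

// After: `let ... else` handles the failure case up front and binds `s` for the
// rest of the function, mirroring `let Some(broker_spec) = ... else { return ... };`.
fn describe_flat(input: Option<&str>) -> String {
    let Some(s) = input else {
        return String::from("nothing");
    };
    format!("got {s}")
}
```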
@@ -383,20 +325,14 @@ pub async fn handle_instance_change_job(
 }
 
 /// Called when an Instance has changed that requires a Pod broker.
-/// Action determined by InstanceAction and changes to the Instance's `nodes` list.
-/// Starts broker Pods that are missing and stops Pods that are no longer needed.
-/// InstanceAction::Add => Deploy Pod to each Node on Instance's `nodes` list (up to `capacity` total)
-/// InstanceAction::Remove => Delete all Pods labeled with the Instance name
-/// InstanceAction::Update => Ensure that each Node on Instance's `nodes` list (up to `capacity` total) have a Pod
+/// Ensures that each Node on the Instance's `nodes` list (up to `capacity` total) has a running Pod.
 pub async fn handle_instance_change_pod(
     instance: &Instance,
     podspec: &PodSpec,
     ctx: Arc<ControllerContext>,
 ) -> anyhow::Result<()> {
     trace!("handle_instance_change_pod - enter");
-
-    // If InstanceAction::Remove, assume all nodes require PodAction::NoAction (reflect that there is no running Pod unless we find one)
-    // Otherwise, assume all nodes require PodAction::Add (reflect that there is no running Pod, unless we find one)
+    // Assume all nodes require PodAction::Add (reflect that there is no running Pod, unless we find one)
     let default_action = PodAction::Add;
     let mut nodes_to_act_on: HashMap<String, PodContext> = instance
         .spec
@@ -413,10 +349,6 @@ pub async fn handle_instance_change_pod(
         )
     })
     .collect();
-    trace!(
-        "handle_instance_change - nodes tracked from instance={:?}",
-        nodes_to_act_on
-    );
 
     let lp = ListParams::default().labels(&format!(
         "{}={}",
@@ -431,8 +363,6 @@
     trace!(
         "handle_instance_change - found {} pods",
         instance_pods.items.len()
     );
-
-    trace!("handle_instance_change - update actions based on the existing pods");
     // By default, assume any pod tracked by the instance need to be added.
     // Query the existing pods to see if some of these are already added, or
     // need to be removed
diff --git a/controller/src/util/pod_action.rs b/controller/src/util/pod_action.rs
index ecfb6b2b3..ad2777622 100644
--- a/controller/src/util/pod_action.rs
+++ b/controller/src/util/pod_action.rs
@@ -32,7 +32,7 @@ pub struct PodActionInfo {
     pub phase: String,
     pub status_start_time: Option<Time>,
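Stepping back: `handle_instance_change_pod` above follows a seed-then-reconcile shape. Every node on the Instance's `nodes` list starts out marked `PodAction::Add`, and the subsequent Pod query downgrades entries that already have a running broker. A simplified sketch of that map-building pattern, with toy types standing in for `PodContext` and `PodAction`:

```rust
use std::collections::HashMap;

#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum Action {
    Add,      // no broker Pod found yet: create one
    NoAction, // a broker Pod already runs on this node
}

fn plan_actions(instance_nodes: &[&str], nodes_with_running_pod: &[&str]) -> HashMap<String, Action> {
    // Seed: assume every node on the Instance's `nodes` list needs a broker Pod.
    let mut plan: HashMap<String, Action> = instance_nodes
        .iter()
        .map(|n| (n.to_string(), Action::Add))
        .collect();

    // Reconcile: nodes that already run a Pod need no action.
    for node in nodes_with_running_pod {
        if let Some(action) = plan.get_mut(*node) {
            *action = Action::NoAction;
        }
    }
    plan
}
```

In the real code the per-node value is a full `PodContext`, and the final decision is delegated to `PodActionInfo::select_pod_action`, which also weighs the Pod's phase, its start time, and the grace periods.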