Consolidate pod change handling code
Signed-off-by: Kate Goldenring <[email protected]>
kate-goldenring committed Oct 8, 2024
1 parent b01b4ed commit 2a23d56
Showing 3 changed files with 95 additions and 238 deletions.
148 changes: 39 additions & 109 deletions controller/src/util/instance_action.rs
@@ -106,7 +106,6 @@ async fn reconcile_inner(event: Event<Instance>, ctx: Arc<ControllerContext>) ->
/// specific Node's protocol broker Pod.
///
/// * the node is described by node_name
/// * the protocol (or capability) is described by instance_name and namespace
/// * what to do with the broker Pod is described by action
// TODO: add Pod name so it does not need to be
// generated on deletes, and remove Option wrappers.
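The TODO above sketches a follow-up refactor. A minimal, hypothetical illustration of where that could land (field layout assumed, not part of this commit):

```rust
// Hypothetical future shape of PodContext per the TODO: carrying the Pod
// name means deletes no longer regenerate it, and fields that are always
// present can drop their Option wrappers.
pub(crate) struct PodContext {
    pod_name: String,  // recorded at creation, reused on delete
    node_name: String, // currently Option<String>
    namespace: String, // currently Option<String>
    action: PodAction,
}
```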
@@ -118,16 +117,17 @@ pub(crate) struct PodContext {
}

pub(crate) fn create_pod_context(k8s_pod: &Pod, action: PodAction) -> anyhow::Result<PodContext> {
let pod_name = k8s_pod.name_unchecked();
let labels = &k8s_pod.labels();
// Early exits above ensure unwrap will not panic
let node_to_run_pod_on = labels.get(AKRI_TARGET_NODE_LABEL_NAME).ok_or_else(|| {
anyhow::anyhow!(
"no {} label found for {:?}",
AKRI_TARGET_NODE_LABEL_NAME,
pod_name
)
})?;
let node_to_run_pod_on = &k8s_pod
.labels()
.get(AKRI_TARGET_NODE_LABEL_NAME)
.ok_or_else(|| {
anyhow::anyhow!(
"no {} label found for {:?}",
AKRI_TARGET_NODE_LABEL_NAME,
k8s_pod.name_unchecked()
)
})?;

Ok(PodContext {
node_name: Some(node_to_run_pod_on.to_string()),
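As a quick illustration of the consolidated constructor, a hedged module-internal sketch (`PodContext` fields are private, so this would live alongside the unit tests; the label key is Akri's target-node label):

```rust
use std::collections::BTreeMap;
use k8s_openapi::api::core::v1::Pod;

#[test]
fn create_pod_context_reads_target_node_label() -> anyhow::Result<()> {
    // Build a minimal Pod carrying the target-node label the constructor requires.
    let mut pod = Pod::default();
    pod.metadata.name = Some("akri-broker-pod".to_string());
    pod.metadata.labels = Some(BTreeMap::from([(
        AKRI_TARGET_NODE_LABEL_NAME.to_string(),
        "node-a".to_string(),
    )]));

    let ctx = create_pod_context(&pod, PodAction::NoAction)?;
    assert_eq!(ctx.node_name.as_deref(), Some("node-a"));

    // Without the label, the constructor surfaces a descriptive error instead of panicking.
    assert!(create_pod_context(&Pod::default(), PodAction::NoAction).is_err());
    Ok(())
}
```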
@@ -152,28 +152,27 @@ fn determine_action_for_pod(
.as_ref()
.ok_or_else(|| anyhow::anyhow!("No pod phase found for Pod {:?}", pod_name))?;

let mut update_pod_context = create_pod_context(k8s_pod, PodAction::NoAction)?;
let node_to_run_pod_on = update_pod_context.node_name.as_ref().unwrap();
let mut ctx = create_pod_context(k8s_pod, PodAction::NoAction)?;

// Early exits above ensure unwrap will not panic
let pod_start_time = k8s_pod.status.as_ref().unwrap().start_time.clone();
let node_to_run_pod_on = ctx.node_name.as_ref().unwrap();

let pod_action_info = PodActionInfo {
pending_grace_time_in_minutes: PENDING_POD_GRACE_PERIOD_MINUTES,
ended_grace_time_in_minutes: FAILED_POD_GRACE_PERIOD_MINUTES,
phase: pod_phase.to_string(),
status_start_time: pod_start_time,
unknown_node: !nodes_to_act_on.contains_key(node_to_run_pod_on),
trace_node_name: k8s_pod.name_unchecked(),
trace_pod_name: k8s_pod.name_unchecked(),
};
update_pod_context.action = pod_action_info.select_pod_action()?;
nodes_to_act_on.insert(node_to_run_pod_on.to_string(), update_pod_context);
ctx.action = pod_action_info.select_pod_action()?;
nodes_to_act_on.insert(node_to_run_pod_on.to_string(), ctx);
Ok(())
}

/// This handles Instance deletion event by deleting the
/// broker Pod, the broker Service (if there are no remaining broker Pods),
/// and the capability Service (if there are no remaining capability Pods).
/// This deliberately deletes the broker Pod, the broker Service (if there are no remaining broker Pods), and the configuration service (if there are no remaining capability Pods).
/// This is done before recreating the broker Pod and svcs
async fn handle_deletion_work(
instance_name: &str,
configuration_name: &str,
@@ -189,18 +188,11 @@ async fn handle_deletion_work(
context
)
})?;
let context_namespace = context.namespace.as_ref().ok_or_else(|| {
anyhow::anyhow!(
"handle_deletion_work - Context namespace is missing for {}: {:?}",
node_to_delete_pod,
context
)
})?;

trace!(
"handle_deletion_work - pod::create_broker_app_name({:?}, {:?}, {:?}, {:?})",
&instance_name,
context_node_name,
context.node_name,
instance_shared,
"pod"
);
@@ -213,66 +205,16 @@ async fn handle_deletion_work(
trace!(
"handle_deletion_work - pod::remove_pod name={:?}, namespace={:?}",
&pod_app_name,
&context_namespace
&context.namespace
);
api.delete(&pod_app_name).await?;
trace!("handle_deletion_work - pod::remove_pod succeeded");
BROKER_POD_COUNT_METRIC
.with_label_values(&[configuration_name, context_node_name])
.with_label_values(&[configuration_name, &context_node_name])
.dec();
Ok(())
}
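For context on the `BROKER_POD_COUNT_METRIC` gauge decremented above, a sketch of how such a labeled gauge is typically declared with the prometheus crate (names and help text here are illustrative, not necessarily Akri's exact declaration):

```rust
use lazy_static::lazy_static;
use prometheus::{register_int_gauge_vec, IntGaugeVec};

lazy_static! {
    // One gauge value per (configuration, node) label pair.
    pub static ref BROKER_POD_COUNT_METRIC: IntGaugeVec = register_int_gauge_vec!(
        "akri_broker_pod_count",
        "Number of broker Pods, by Configuration and Node",
        &["configuration", "node"]
    )
    .unwrap();
}

// Usage mirrors the controller code above:
// BROKER_POD_COUNT_METRIC.with_label_values(&["my-config", "node-a"]).dec();
```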

#[cfg(test)]
mod handle_deletion_work_tests {
use akri_shared::k8s::api::MockApi;

use super::*;
#[tokio::test]
async fn test_handle_deletion_work_with_no_node_name() {
let _ = env_logger::builder().is_test(true).try_init();

let context = PodContext {
node_name: None,
namespace: Some("namespace".into()),
action: PodAction::NoAction,
};
let api: Box<dyn Api<Pod>> = Box::new(MockApi::new());
assert!(handle_deletion_work(
"instance_name",
"configuration_name",
true,
"node_to_delete_pod",
&context,
api.as_ref(),
)
.await
.is_err());
}

#[tokio::test]
async fn test_handle_deletion_work_with_no_namespace() {
let _ = env_logger::builder().is_test(true).try_init();

let context = PodContext {
node_name: Some("node-a".into()),
namespace: None,
action: PodAction::NoAction,
};
let api: Box<dyn Api<Pod>> = Box::new(MockApi::new());
assert!(handle_deletion_work(
"instance_name",
"configuration_name",
true,
"node_to_delete_pod",
&context,
api.as_ref(),
)
.await
.is_err());
}
}

/// This handles Instance addition event by creating the
/// broker Pod.
async fn handle_addition_work(
@@ -311,29 +253,29 @@ pub async fn handle_instance_change(
// In this scenario, a configuration has been deleted without the Akri Agent deleting the associated Instances.
// Furthermore, Akri Agent is still modifying the Instances. This should not happen because the Agent
// is designed to shut down when its Configuration watcher fails.
error!(
"handle_instance_change - no configuration found for {:?} yet instance {:?} exists - check that device plugin is running properly",
error!("handle_instance_change - no configuration found for {:?} yet instance {:?} exists - check that device plugin is running properly",
&instance.spec.configuration_name, &instance.name_unchecked()
);

return Ok(default_requeue_action());
};
if let Some(broker_spec) = &configuration.spec.broker_spec {
let instance_change_result = match broker_spec {
BrokerSpec::BrokerPodSpec(p) => handle_instance_change_pod(instance, p, ctx).await,
BrokerSpec::BrokerJobSpec(j) => {
handle_instance_change_job(
instance,
*configuration.metadata.generation.as_ref().unwrap(),
j,
ctx.client.clone(),
)
.await
}
};
if let Err(e) = instance_change_result {
error!("Unable to handle Broker action: {:?}", e);
let Some(broker_spec) = &configuration.spec.broker_spec else {
return Ok(default_requeue_action());
};
let res = match broker_spec {
BrokerSpec::BrokerPodSpec(p) => handle_instance_change_pod(instance, p, ctx).await,
BrokerSpec::BrokerJobSpec(j) => {
handle_instance_change_job(
instance,
*configuration.metadata.generation.as_ref().unwrap(),
j,
ctx.client.clone(),
)
.await
}
};
if let Err(e) = res {
error!("Unable to handle Broker action: {:?}", e);
}
Ok(default_requeue_action())
}
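The refactor above replaces a nested `if let Some(...)` block with Rust's `let ... else` syntax (stable since Rust 1.65), flattening the happy path. A minimal standalone sketch of the pattern:

```rust
fn describe(broker_spec: Option<&str>) -> String {
    // Bind the Some case; the else branch must diverge (return, continue, panic, ...).
    let Some(spec) = broker_spec else {
        return "no broker configured; requeue".to_string();
    };
    format!("handling broker spec: {spec}")
}

fn main() {
    assert_eq!(describe(None), "no broker configured; requeue");
    assert!(describe(Some("pod")).starts_with("handling"));
}
```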
Expand Down Expand Up @@ -383,20 +325,14 @@ pub async fn handle_instance_change_job(
}

/// Called when an Instance has changed that requires a Pod broker.
/// Action determined by InstanceAction and changes to the Instance's `nodes` list.
/// Starts broker Pods that are missing and stops Pods that are no longer needed.
/// InstanceAction::Add => Deploy Pod to each Node on Instance's `nodes` list (up to `capacity` total)
/// InstanceAction::Remove => Delete all Pods labeled with the Instance name
/// InstanceAction::Update => Ensure that each Node on Instance's `nodes` list (up to `capacity` total) have a Pod
/// Ensures that each Node on Instance's `nodes` list (up to `capacity` total) has a running Pod
pub async fn handle_instance_change_pod(
instance: &Instance,
podspec: &PodSpec,
ctx: Arc<ControllerContext>,
) -> anyhow::Result<()> {
trace!("handle_instance_change_pod - enter");

// If InstanceAction::Remove, assume all nodes require PodAction::NoAction (reflect that there is no running Pod unless we find one)
// Otherwise, assume all nodes require PodAction::Add (reflect that there is no running Pod, unless we find one)
// Assume all nodes require PodAction::Add (reflect that there is no running Pod, unless we find one)
let default_action = PodAction::Add;
let mut nodes_to_act_on: HashMap<String, PodContext> = instance
.spec
@@ -413,10 +349,6 @@ pub async fn handle_instance_change_pod(
)
})
.collect();
trace!(
"handle_instance_change - nodes tracked from instance={:?}",
nodes_to_act_on
);

let lp = ListParams::default().labels(&format!(
"{}={}",
@@ -431,8 +363,6 @@
"handle_instance_change - found {} pods",
instance_pods.items.len()
);

trace!("handle_instance_change - update actions based on the existing pods");
// By default, assume any pod tracked by the instance needs to be added.
// Query the existing pods to see if some of these are already added, or
// need to be removed
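The `ListParams` construction above scopes the Pod query to brokers labeled for this Instance. A hedged standalone sketch of the same kube-rs pattern (the label selector string is a placeholder):

```rust
use k8s_openapi::api::core::v1::Pod;
use kube::{api::ListParams, Api, Client};

async fn list_instance_pods(client: Client) -> anyhow::Result<()> {
    let pods: Api<Pod> = Api::namespaced(client, "default");
    // Placeholder selector; the controller formats its Akri instance label here.
    let lp = ListParams::default().labels("akri.sh/instance=my-instance");
    for pod in pods.list(&lp).await? {
        println!("found broker pod: {:?}", pod.metadata.name);
    }
    Ok(())
}
```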
65 changes: 25 additions & 40 deletions controller/src/util/pod_action.rs
@@ -32,7 +32,7 @@ pub struct PodActionInfo {
pub phase: String,
pub status_start_time: Option<Time>,
pub unknown_node: bool,
pub trace_node_name: String,
pub trace_pod_name: String,
}

impl PodActionInfo {
@@ -58,7 +58,22 @@ impl PodActionInfo {
&self.phase,
self.unknown_node
);
self.choice_for_pod_action()
if self.unknown_node {
//
// For pods (with our controller's selector) that are not described by the Instance, we
// will REMOVE the pods.
//
log::trace!(
"choice_for_pod_action - Running Pod (untracked) ... PodAction::Remove ({:?})",
&self.trace_pod_name
);
Ok(PodAction::Remove)
} else {
//
// For pods (with our controller's selector) that are described by the Instance ...
//
self.choice_for_pods_on_known_nodes()
}
}
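With the consolidation, `select_pod_action` becomes the single entry point, handling the untracked-node case inline before delegating. A small illustrative check of that branch (grace periods are arbitrary here):

```rust
#[test]
fn untracked_pod_is_removed() {
    // An untracked pod (its node is absent from the Instance's `nodes` list)
    // is removed regardless of the phase it reports.
    let info = PodActionInfo {
        pending_grace_time_in_minutes: 5,
        ended_grace_time_in_minutes: 5,
        phase: "Running".to_string(),
        status_start_time: None,
        unknown_node: true,
        trace_pod_name: "orphaned-broker".to_string(),
    };
    assert_eq!(info.select_pod_action().unwrap(), PodAction::Remove);
}
```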

/// This will determine what to do with a non-Running Pod based on how long the Pod has existed
Expand Down Expand Up @@ -96,10 +111,6 @@ impl PodActionInfo {
);
} else {
// If the pod has no start_time, give it more time
log::trace!(
"time_choice_for_non_running_pods - no start time found ... give it more time? ({:?})",
&self.trace_node_name
);
give_it_more_time = true;
log::trace!(
"time_choice_for_non_running_pods - give_it_more_time: ({:?})",
Expand All @@ -110,13 +121,13 @@ impl PodActionInfo {
if give_it_more_time {
log::trace!(
"time_choice_for_non_running_pods - Pending Pod (tracked) ... PodAction::NoAction ({:?})",
&self.trace_node_name
&self.trace_pod_name
);
Ok(PodAction::NoAction)
} else {
log::trace!(
"time_choice_for_non_running_pods - Pending Pod (tracked) ... PodAction::RemoveAndAdd ({:?})",
&self.trace_node_name
&self.trace_pod_name
);
Ok(PodAction::RemoveAndAdd)
}
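The "give it more time" decision above hinges on comparing the Pod's start time against a grace window. A hedged sketch of that kind of check, assuming `k8s_openapi`'s `Time` newtype over `chrono::DateTime<Utc>`:

```rust
use chrono::{Duration, Utc};
use k8s_openapi::apimachinery::pkg::apis::meta::v1::Time;

/// True if the pod is still inside its grace window and deserves more time.
/// A missing start time is treated leniently, matching the logic above.
fn within_grace_period(start_time: Option<&Time>, grace_minutes: i64) -> bool {
    match start_time {
        None => true, // no start time recorded yet: give it more time
        Some(Time(started)) => {
            Utc::now().signed_duration_since(*started) < Duration::minutes(grace_minutes)
        }
    }
}
```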
@@ -127,7 +138,7 @@
log::trace!(
"choice_for_pods_on_known_nodes phase={:?} trace_node_name={:?}",
&self.phase,
&self.trace_node_name
&self.trace_pod_name
);
match self.phase.as_str() {
"Running" | "ContainerCreating" | "PodInitializing" => {
@@ -175,32 +186,6 @@
}
}
}

/// This will determine what to do with a Pod
fn choice_for_pod_action(&self) -> anyhow::Result<PodAction> {
log::trace!(
"choice_for_pod_action phase={:?} unknown_node={:?} trace_node_name={:?}",
&self.phase,
self.unknown_node,
&self.trace_node_name
);
if self.unknown_node {
//
// For pods (with our controller's selector) that are not described by the Instance, we
// will REMOVE the pods.
//
log::trace!(
"choice_for_pod_action - Running Pod (untracked) ... PodAction::Remove ({:?})",
&self.trace_node_name
);
Ok(PodAction::Remove)
} else {
//
// For pods (with our controller's selector) that are described by the Instance ...
//
self.choice_for_pods_on_known_nodes()
}
}
}

#[cfg(test)]
Expand Down Expand Up @@ -243,7 +228,7 @@ mod controller_tests {
phase: map_tuple.0.to_string(),
status_start_time: start_time.clone(),
unknown_node: true,
trace_node_name: "foo".to_string(),
trace_pod_name: "foo".to_string(),
};
assert_eq!(map_tuple.1, pod_action_info.select_pod_action().unwrap());
});
@@ -276,7 +261,7 @@
phase: map_tuple.0.to_string(),
status_start_time: start_time.clone(),
unknown_node: false,
trace_node_name: "foo".to_string(),
trace_pod_name: "foo".to_string(),
};
assert_eq!(map_tuple.1, pod_action_info.select_pod_action().unwrap());
});
@@ -314,7 +299,7 @@
phase: map_tuple.0.to_string(),
status_start_time: start_time.clone(),
unknown_node: false,
trace_node_name: "foo".to_string(),
trace_pod_name: "foo".to_string(),
};
assert_eq!(map_tuple.1, pod_action_info1.select_pod_action().unwrap());
});
@@ -347,7 +332,7 @@
phase: map_tuple.0.to_string(),
status_start_time: start_time.clone(),
unknown_node: false,
trace_node_name: "foo".to_string(),
trace_pod_name: "foo".to_string(),
};
assert_eq!(map_tuple.1, pod_action_info.select_pod_action().unwrap());
});
@@ -386,7 +371,7 @@
phase: map_tuple.0.to_string(),
status_start_time: start_time.clone(),
unknown_node: false,
trace_node_name: "foo".to_string(),
trace_pod_name: "foo".to_string(),
};
assert_eq!(map_tuple.1, pod_action_info1.select_pod_action().unwrap());
});
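The updated tests keep the module's table-driven style: map each Pod phase to an expected `PodAction`, then assert via `select_pod_action`. A condensed exploratory sketch of the pattern (grace periods and names are arbitrary; actions are printed rather than asserted, since expected values vary by phase and timing):

```rust
#[test]
fn print_phase_to_action_choices() {
    for phase in ["Running", "Pending", "Succeeded", "Failed"] {
        let info = PodActionInfo {
            pending_grace_time_in_minutes: 5,
            ended_grace_time_in_minutes: 5,
            phase: phase.to_string(),
            status_start_time: None,
            unknown_node: false,
            trace_pod_name: "foo".to_string(),
        };
        match info.select_pod_action() {
            Ok(action) => println!("{phase} => {action:?}"),
            Err(e) => println!("{phase} => error: {e}"),
        }
    }
}
```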
