Skip to content

Commit

Permalink
#7617 support k8s native sidecar
Browse files Browse the repository at this point in the history
  • Loading branch information
pritidesai authored and kgcarr committed Jun 19, 2024
1 parent f154502 commit 7de815f
Show file tree
Hide file tree
Showing 4 changed files with 85 additions and 50 deletions.
33 changes: 33 additions & 0 deletions pkg/apis/pipeline/v1/container_types_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -120,3 +120,36 @@ func TestSidecarGetVarSubstitutionExpressions(t *testing.T) {
t.Fatalf("Unexpected result (-want, +got): %s", d)
}
}

func TestSidecarRestartPolicyToK8sContainer(t *testing.T) {
always := corev1.ContainerRestartPolicyAlways
s := Sidecar{
Name: "sidecarName",
RestartPolicy: &always,
}

expectedContainer := corev1.Container{
Name: "sidecarName",
RestartPolicy: &always,
}

c := s.ToK8sContainer()

if !(c.RestartPolicy == expectedContainer.RestartPolicy) {
t.Fatalf("Unexpected result with RestartPolicy")
}

s = Sidecar{
Name: "sidecarName",
}

expectedContainer = corev1.Container{
Name: "sidecarName",
}

c = s.ToK8sContainer()
if !(c.RestartPolicy == expectedContainer.RestartPolicy) {
t.Fatalf("Unexpected result without RestartPolicy")
}

}
7 changes: 3 additions & 4 deletions pkg/pod/pod.go
Original file line number Diff line number Diff line change
Expand Up @@ -439,13 +439,12 @@ func (b *Builder) Build(ctx context.Context, taskRun *v1.TaskRun, taskSpec v1.Ta
// we need to do the current logic
svMinorInt, _ := strconv.Atoi(sv.Minor)
svMajorInt, _ := strconv.Atoi(sv.Major)
if svMajorInt == 1 && svMinorInt >= SidecarK8sMinorVersionCheck {
if svMajorInt >= 1 && svMinorInt >= SidecarK8sMinorVersionCheck {
// Add RestartPolicy and Merge into initContainer
for i := range sidecarContainers {
sc := &sidecarContainers[i]
always := new(corev1.ContainerRestartPolicy)
*always = corev1.ContainerRestartPolicyAlways
sc.RestartPolicy = always
always := corev1.ContainerRestartPolicyAlways
sc.RestartPolicy = &always
sc.Name = names.SimpleNameGenerator.RestrictLength(fmt.Sprintf("%v%v", sidecarPrefix, sc.Name))
mergedPodInitContainers = append(mergedPodInitContainers, *sc)
}
Expand Down
93 changes: 48 additions & 45 deletions pkg/reconciler/taskrun/taskrun.go
Original file line number Diff line number Diff line change
Expand Up @@ -153,9 +153,24 @@ func (c *Reconciler) ReconcileKind(ctx context.Context, tr *v1.TaskRun) pkgrecon
// and may not have had all of the assumed default specified.
tr.SetDefaults(ctx)

if err := c.stopSidecars(ctx, tr); err != nil {
// Check if current k8s version is less than 1.29
// Since Kubernetes Major version cannot be 0 and if it's 2 then sidecar will be in
// we are only concerned about major version 1 and if the minor is less than 29 then
// we need to do the current logic
dc := c.KubeClientSet.Discovery()
sv, err := dc.ServerVersion()
if err != nil {
return err
}
svMajorInt, _ := strconv.Atoi(sv.Major)
svMinorInt, _ := strconv.Atoi(sv.Minor)
if svMajorInt >= 1 && svMinorInt >= 29 {
logger.Infof("Using Kubernetes Native Sidecars \n")
} else {
if err := c.stopSidecars(ctx, tr); err != nil {
return err
}
}

return c.finishReconcileUpdateEmitEvents(ctx, tr, before, nil)
}
Expand Down Expand Up @@ -302,56 +317,44 @@ func (c *Reconciler) durationAndCountMetrics(ctx context.Context, tr *v1.TaskRun
}

func (c *Reconciler) stopSidecars(ctx context.Context, tr *v1.TaskRun) error {
// Check if current k8s version is less than 1.29
// Since Kubernetes Major version cannot be 0 and if it's 2 then sidecar will be in
// we are only concerned about major version 1 and if the minor is less than 29 then
// we need to do the current logic
dc := c.KubeClientSet.Discovery()
sv, err := dc.ServerVersion()
if err != nil {
return err
ctx, span := c.tracerProvider.Tracer(TracerName).Start(ctx, "stopSidecars")
defer span.End()
logger := logging.FromContext(ctx)
// do not continue without knowing the associated pod
if tr.Status.PodName == "" {
return nil
}
svMinorInt, _ := strconv.Atoi(sv.Minor)
if sv.Major == "1" && svMinorInt < 29 {
ctx, span := c.tracerProvider.Tracer(TracerName).Start(ctx, "stopSidecars")
defer span.End()
logger := logging.FromContext(ctx)
// do not continue without knowing the associated pod
if tr.Status.PodName == "" {
return nil
}

// do not continue if the TaskRun was canceled or timed out as this caused the pod to be deleted in failTaskRun
condition := tr.Status.GetCondition(apis.ConditionSucceeded)
if condition != nil {
reason := v1.TaskRunReason(condition.Reason)
if reason == v1.TaskRunReasonCancelled || reason == v1.TaskRunReasonTimedOut {
return nil
}
// do not continue if the TaskRun was canceled or timed out as this caused the pod to be deleted in failTaskRun
condition := tr.Status.GetCondition(apis.ConditionSucceeded)
if condition != nil {
reason := v1.TaskRunReason(condition.Reason)
if reason == v1.TaskRunReasonCancelled || reason == v1.TaskRunReasonTimedOut {
return nil
}
}

pod, err := podconvert.StopSidecars(ctx, c.Images.NopImage, c.KubeClientSet, tr.Namespace, tr.Status.PodName)
if err == nil {
// Check if any SidecarStatuses are still shown as Running after stopping
// Sidecars. If any Running, update SidecarStatuses based on Pod ContainerStatuses.
if podconvert.IsSidecarStatusRunning(tr) {
err = updateStoppedSidecarStatus(pod, tr)
}
pod, err := podconvert.StopSidecars(ctx, c.Images.NopImage, c.KubeClientSet, tr.Namespace, tr.Status.PodName)
if err == nil {
// Check if any SidecarStatuses are still shown as Running after stopping
// Sidecars. If any Running, update SidecarStatuses based on Pod ContainerStatuses.
if podconvert.IsSidecarStatusRunning(tr) {
err = updateStoppedSidecarStatus(pod, tr)
}
if k8serrors.IsNotFound(err) {
// At this stage the TaskRun has been completed if the pod is not found, it won't come back,
// it has probably evicted. We can return the error, but we consider it a permanent one.
return controller.NewPermanentError(err)
} else if err != nil {
// It is admissible for Pods to fail with concurrentModification errors
// when stopping sideCars. Instead of failing the TaskRun, we shall just
// let the reconciler requeue.
if isConcurrentModificationError(err) {
return controller.NewRequeueAfter(time.Second)
}
logger.Errorf("Error stopping sidecars for TaskRun %q: %v", tr.Name, err)
tr.Status.MarkResourceFailed(v1.TaskRunReasonStopSidecarFailed, err)
}
if k8serrors.IsNotFound(err) {
// At this stage the TaskRun has been completed if the pod is not found, it won't come back,
// it has probably evicted. We can return the error, but we consider it a permanent one.
return controller.NewPermanentError(err)
} else if err != nil {
// It is admissible for Pods to fail with concurrentModification errors
// when stopping sideCars. Instead of failing the TaskRun, we shall just
// let the reconciler requeue.
if isConcurrentModificationError(err) {
return controller.NewRequeueAfter(time.Second)
}
logger.Errorf("Error stopping sidecars for TaskRun %q: %v", tr.Name, err)
tr.Status.MarkResourceFailed(v1.TaskRunReasonStopSidecarFailed, err)
}
return nil
}
Expand Down
2 changes: 1 addition & 1 deletion pkg/reconciler/taskrun/taskrun_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2309,7 +2309,7 @@ status:

// Check actions
actions := clients.Kube.Actions()
if len(actions) != 2 || !actions[0].Matches("list", "configmaps") || !actions[1].Matches("watch", "configmaps") {
if len(actions) != 3 || !actions[0].Matches("list", "configmaps") || !actions[1].Matches("watch", "configmaps") || !actions[2].Matches("get", "version") {
t.Errorf("expected 2 actions (list configmaps, and watch configmaps) created by the reconciler,"+
" got %d. Actions: %#v", len(actions), actions)
}
Expand Down

0 comments on commit 7de815f

Please sign in to comment.