Skip to content

Commit

Permalink
chore: using domain-qualified finalizers
Browse files Browse the repository at this point in the history
Signed-off-by: Roger Torrentsgenerós <[email protected]>
  • Loading branch information
trutx committed Nov 18, 2024
1 parent d1a723e commit 9d180a8
Show file tree
Hide file tree
Showing 13 changed files with 99 additions and 165 deletions.
4 changes: 4 additions & 0 deletions flyteadmin/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -108,6 +108,7 @@ require (
github.com/go-openapi/jsonpointer v0.19.6 // indirect
github.com/go-openapi/jsonreference v0.20.2 // indirect
github.com/go-openapi/swag v0.22.3 // indirect
github.com/go-redis/redis v6.15.7+incompatible // indirect
github.com/go-sql-driver/mysql v1.6.0 // indirect
github.com/go-test/deep v1.0.7 // indirect
github.com/goccy/go-json v0.10.2 // indirect
Expand Down Expand Up @@ -155,6 +156,7 @@ require (
github.com/mohae/deepcopy v0.0.0-20170929034955-c48cc78d4826 // indirect
github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822 // indirect
github.com/ncw/swift v1.0.53 // indirect
github.com/nxadm/tail v1.4.11 // indirect
github.com/ory/go-acc v0.2.6 // indirect
github.com/ory/go-convenience v0.1.0 // indirect
github.com/ory/viper v1.7.5 // indirect
Expand Down Expand Up @@ -199,6 +201,7 @@ require (
golang.org/x/term v0.22.0 // indirect
golang.org/x/text v0.16.0 // indirect
golang.org/x/tools v0.21.1-0.20240508182429-e35e4ccd0d2d // indirect
gomodules.xyz/jsonpatch/v2 v2.4.0 // indirect
google.golang.org/appengine v1.6.8 // indirect
google.golang.org/genproto/googleapis/api v0.0.0-20240123012728-ef4313101c80 // indirect
google.golang.org/genproto/googleapis/rpc v0.0.0-20240123012728-ef4313101c80 // indirect
Expand All @@ -212,6 +215,7 @@ require (
gopkg.in/yaml.v2 v2.4.0 // indirect
gopkg.in/yaml.v3 v3.0.1 // indirect
k8s.io/apiextensions-apiserver v0.28.4 // indirect
k8s.io/component-base v0.28.4 // indirect
sigs.k8s.io/json v0.0.0-20221116044647-bc3834ca7abd // indirect
sigs.k8s.io/yaml v1.3.0 // indirect
)
Expand Down
6 changes: 6 additions & 0 deletions flyteadmin/go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -340,6 +340,8 @@ github.com/go-openapi/validate v0.18.0/go.mod h1:Uh4HdOzKt19xGIGm1qHf/ofbX1YQ4Y+
github.com/go-openapi/validate v0.19.2/go.mod h1:1tRCw7m3jtI8eNWEEliiAqUIcBztB2KDnRCRMUi7GTA=
github.com/go-openapi/validate v0.19.3/go.mod h1:90Vh6jjkTn+OT1Eefm0ZixWNFjhtOH7vS9k0lo6zwJo=
github.com/go-openapi/validate v0.19.10/go.mod h1:RKEZTUWDkxKQxN2jDT7ZnZi2bhZlbNMAuKvKB+IaGx8=
github.com/go-redis/redis v6.15.7+incompatible h1:3skhDh95XQMpnqeqNftPkQD9jL9e5e36z/1SUm6dy1U=
github.com/go-redis/redis v6.15.7+incompatible/go.mod h1:NAIEuMOZ/fxfXJIrKDQDz8wamY7mA7PouImQ2Jvg6kA=
github.com/go-sql-driver/mysql v1.4.0/go.mod h1:zAC/RDZ24gD3HViQzih4MyKcchzm+sOG5ZlKdlhCg5w=
github.com/go-sql-driver/mysql v1.4.1/go.mod h1:zAC/RDZ24gD3HViQzih4MyKcchzm+sOG5ZlKdlhCg5w=
github.com/go-sql-driver/mysql v1.5.0/go.mod h1:DCzpHaOWr8IXmIStZouvnhqoel9Qv2LBy8hT2VhHyBg=
Expand Down Expand Up @@ -1013,6 +1015,8 @@ github.com/ncw/swift v1.0.53/go.mod h1:23YIA4yWVnGwv2dQlN4bB7egfYX6YLn0Yo/S6zZO/
github.com/nicksnyder/go-i18n v1.10.0/go.mod h1:HrK7VCrbOvQoUAQ7Vpy7i87N7JZZZ7R2xBGjv0j365Q=
github.com/niemeyer/pretty v0.0.0-20200227124842-a10e7caefd8e/go.mod h1:zD1mROLANZcx1PVRCS0qkT7pwLkGfwJo4zjcN/Tysno=
github.com/nu7hatch/gouuid v0.0.0-20131221200532-179d4d0c4d8d/go.mod h1:YUTz3bUH2ZwIWBy3CJBeOBEugqcmXREj14T+iG/4k4U=
github.com/nxadm/tail v1.4.11 h1:8feyoE3OzPrcshW5/MJ4sGESc5cqmGkGCWlco4l0bqY=
github.com/nxadm/tail v1.4.11/go.mod h1:OTaG3NK980DZzxbRq6lEuzgU+mug70nY11sMd4JXXHc=
github.com/oklog/ulid v1.3.1/go.mod h1:CirwcVhetQ6Lv90oh/F+FBtV6XMibvdAFo93nm5qn4U=
github.com/oleiade/reflections v1.0.0/go.mod h1:RbATFBbKYkVdqmSFtx13Bb/tVhR0lgOBXunWTZKeL4w=
github.com/oleiade/reflections v1.0.1 h1:D1XO3LVEYroYskEsoSiGItp9RUxG6jWnCVvrqH0HHQM=
Expand All @@ -1024,6 +1028,7 @@ github.com/onsi/ginkgo v1.9.0/go.mod h1:lLunBs/Ym6LB5Z9jYTR76FiuTmxDTDusOGeTQH+W
github.com/onsi/ginkgo v1.10.1/go.mod h1:lLunBs/Ym6LB5Z9jYTR76FiuTmxDTDusOGeTQH+WWjE=
github.com/onsi/ginkgo v1.10.3/go.mod h1:lLunBs/Ym6LB5Z9jYTR76FiuTmxDTDusOGeTQH+WWjE=
github.com/onsi/ginkgo v1.16.5 h1:8xi0RTUf59SOSfEtZMvwTvXYMzG4gV23XVHOZiXNtnE=
github.com/onsi/ginkgo v1.16.5/go.mod h1:+E8gABHa3K6zRBolWtd+ROzc/U5bkGt0FwiG042wbpU=
github.com/onsi/ginkgo/v2 v2.11.0 h1:WgqUCUt/lT6yXoQ8Wef0fsNn5cAuMK7+KT9UFRz2tcU=
github.com/onsi/ginkgo/v2 v2.11.0/go.mod h1:ZhrRA5XmEE3x3rhlzamx/JJvujdZoJ2uvgI7kR0iZvM=
github.com/onsi/gomega v1.4.1/go.mod h1:C1qb7wdrVGGVU+Z6iS04AVkA3Q65CEZX59MT0QO5uiA=
Expand Down Expand Up @@ -1967,6 +1972,7 @@ gopkg.in/square/go-jose.v2 v2.2.2/go.mod h1:M9dMgbHiYLoDGQrXy7OpJDJWiKiU//h+vD76
gopkg.in/square/go-jose.v2 v2.5.2-0.20210529014059-a5c7eec3c614/go.mod h1:M9dMgbHiYLoDGQrXy7OpJDJWiKiU//h+vD76mk0e1AI=
gopkg.in/square/go-jose.v2 v2.6.0 h1:NGk74WTnPKBNUhNzQX7PYcTLUjoq7mzKk2OKbvwk2iI=
gopkg.in/square/go-jose.v2 v2.6.0/go.mod h1:M9dMgbHiYLoDGQrXy7OpJDJWiKiU//h+vD76mk0e1AI=
gopkg.in/tomb.v1 v1.0.0-20141024135613-dd632973f1e7 h1:uRGJdciOHaEIrze2W8Q3AKkepLTh2hOroT7a+7czfdQ=
gopkg.in/tomb.v1 v1.0.0-20141024135613-dd632973f1e7/go.mod h1:dt/ZhP58zS4L8KSrWDmTeBkI65Dw0HsyUHuEVlX15mw=
gopkg.in/validator.v2 v2.0.0-20180514200540-135c24b11c19/go.mod h1:o4V0GXN9/CAmCsvJ0oXYZvrZOe7syiDZSN1GWGZTGzc=
gopkg.in/yaml.v2 v2.0.0-20170812160011-eb3733d160e7/go.mod h1:JAlM8MvJe8wmxCU4Bli9HhUf9+ttbYbLASfIpnQbh74=
Expand Down
4 changes: 3 additions & 1 deletion flyteadmin/pkg/workflowengine/impl/prepare_execution.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,12 +4,14 @@ import (
"github.com/golang/protobuf/proto"
"google.golang.org/grpc/codes"
v1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"sigs.k8s.io/controller-runtime/pkg/controller/controllerutil"

"github.com/flyteorg/flyte/flyteadmin/pkg/errors"
"github.com/flyteorg/flyte/flyteadmin/pkg/workflowengine/interfaces"
"github.com/flyteorg/flyte/flyteidl/gen/pb-go/flyteidl/admin"
"github.com/flyteorg/flyte/flyteidl/gen/pb-go/flyteidl/core"
"github.com/flyteorg/flyte/flytepropeller/pkg/apis/flyteworkflow/v1alpha1"
"github.com/flyteorg/flyte/flytepropeller/pkg/controller"
)

func addMapValues(overrides map[string]string, defaultValues map[string]string) map[string]string {
Expand Down Expand Up @@ -130,7 +132,7 @@ func PrepareFlyteWorkflow(data interfaces.ExecutionData, flyteWorkflow *v1alpha1
flyteWorkflow.AcceptedAt = &acceptAtWrapper

// Add finalizer
flyteWorkflow.Finalizers = append(flyteWorkflow.Finalizers, "flyte-finalizer")
_ = controllerutil.AddFinalizer(flyteWorkflow, controller.Finalizer)

// add permissions from auth and security context. Adding permissions from auth would be removed once all clients
// have migrated over to security context
Expand Down
3 changes: 2 additions & 1 deletion flyteadmin/pkg/workflowengine/impl/prepare_execution_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ import (
"github.com/flyteorg/flyte/flyteidl/gen/pb-go/flyteidl/admin"
"github.com/flyteorg/flyte/flyteidl/gen/pb-go/flyteidl/core"
"github.com/flyteorg/flyte/flytepropeller/pkg/apis/flyteworkflow/v1alpha1"
"github.com/flyteorg/flyte/flytepropeller/pkg/controller"
)

const testRole = "role"
Expand Down Expand Up @@ -254,5 +255,5 @@ func TestPrepareFlyteWorkflow(t *testing.T) {
OutputLocationPrefix: "s3://bucket/key",
},
})
assert.Equal(t, flyteWorkflow.Finalizers, []string{"flyte-finalizer"})
assert.Equal(t, flyteWorkflow.Finalizers, []string{controller.Finalizer})
}
2 changes: 1 addition & 1 deletion flyteidl/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ go 1.22
require (
github.com/flyteorg/flyte/flytestdlib v0.0.0-00010101000000-000000000000
github.com/go-test/deep v1.0.7
github.com/golang-jwt/jwt/v5 v5.2.1
github.com/golang/protobuf v1.5.3
github.com/grpc-ecosystem/go-grpc-middleware v1.1.0
github.com/grpc-ecosystem/go-grpc-prometheus v1.2.0
Expand Down Expand Up @@ -55,7 +56,6 @@ require (
github.com/go-openapi/jsonreference v0.20.2 // indirect
github.com/go-openapi/swag v0.22.3 // indirect
github.com/gogo/protobuf v1.3.2 // indirect
github.com/golang-jwt/jwt/v5 v5.2.1 // indirect
github.com/golang/groupcache v0.0.0-20210331224755-41bb18bfe9da // indirect
github.com/google/gnostic-models v0.6.8 // indirect
github.com/google/go-cmp v0.6.0 // indirect
Expand Down
42 changes: 24 additions & 18 deletions flyteplugins/go/tasks/plugins/array/k8s/subtask.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ import (
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
k8stypes "k8s.io/apimachinery/pkg/types"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/controller/controllerutil"

"github.com/flyteorg/flyte/flyteplugins/go/tasks/errors"
pluginsCore "github.com/flyteorg/flyte/flyteplugins/go/tasks/pluginmachinery/core"
Expand All @@ -30,8 +31,11 @@ const (
ErrBuildPodTemplate stdErrors.ErrorCode = "POD_TEMPLATE_FAILED"
ErrReplaceCmdTemplate stdErrors.ErrorCode = "CMD_TEMPLATE_FAILED"
FlyteK8sArrayIndexVarName string = "FLYTE_K8S_ARRAY_INDEX"
finalizer string = "flyte/array"
JobIndexVarName string = "BATCH_JOB_ARRAY_INDEX_VAR_NAME"
finalizer string = "flyte.lyft.com/finalizer-array"
// Old non-domain-qualified finalizer for backwards compatibility
// This should eventually be removed
oldFinalizer string = "flyte/array"
JobIndexVarName string = "BATCH_JOB_ARRAY_INDEX_VAR_NAME"
)

var (
Expand Down Expand Up @@ -69,8 +73,7 @@ func addMetadata(stCtx SubTaskExecutionContext, cfg *Config, k8sPluginCfg *confi
}

if k8sPluginCfg.InjectFinalizer {
f := append(pod.GetFinalizers(), finalizer)
pod.SetFinalizers(f)
_ = controllerutil.AddFinalizer(pod, finalizer)

Check warning on line 76 in flyteplugins/go/tasks/plugins/array/k8s/subtask.go

View check run for this annotation

Codecov / codecov/patch

flyteplugins/go/tasks/plugins/array/k8s/subtask.go#L76

Added line #L76 was not covered by tests
}

if len(cfg.DefaultScheduler) > 0 {
Expand Down Expand Up @@ -134,25 +137,28 @@ func abortSubtask(ctx context.Context, stCtx SubTaskExecutionContext, cfg *Confi
}

if err != nil && !isK8sObjectNotExists(err) {
logger.Warningf(ctx, "Failed to clear finalizers for Resource with name: %v/%v. Error: %v",
logger.Warningf(ctx, "Failed to clear finalizer for Resource with name: %v/%v. Error: %v",

Check warning on line 140 in flyteplugins/go/tasks/plugins/array/k8s/subtask.go

View check run for this annotation

Codecov / codecov/patch

flyteplugins/go/tasks/plugins/array/k8s/subtask.go#L140

Added line #L140 was not covered by tests
resourceToFinalize.GetNamespace(), resourceToFinalize.GetName(), err)
return err
}

return nil
}

// clearFinalizers removes finalizers (if they exist) from the k8s resource
func clearFinalizers(ctx context.Context, o client.Object, kubeClient pluginsCore.KubeClient) error {
if len(o.GetFinalizers()) > 0 {
o.SetFinalizers([]string{})
// clearFinalizer removes the Flyte finalizer (if it exists) from the k8s resource
func clearFinalizer(ctx context.Context, o client.Object, kubeClient pluginsCore.KubeClient) error {
// Checking for the old finalizer too for backwards compatibility. This should eventually be removed
// Go short-circuits ||, so both RemoveFinalizer calls are made up front to make sure both finalizers are removed
finalizerRemoved := controllerutil.RemoveFinalizer(o, finalizer)
oldFinalizerRemoved := controllerutil.RemoveFinalizer(o, oldFinalizer)
if finalizerRemoved || oldFinalizerRemoved {

Check warning on line 154 in flyteplugins/go/tasks/plugins/array/k8s/subtask.go

View check run for this annotation

Codecov / codecov/patch

flyteplugins/go/tasks/plugins/array/k8s/subtask.go#L149-L154

Added lines #L149 - L154 were not covered by tests
err := kubeClient.GetClient().Update(ctx, o)
if err != nil && !isK8sObjectNotExists(err) {
logger.Warningf(ctx, "Failed to clear finalizers for Resource with name: %v/%v. Error: %v", o.GetNamespace(), o.GetName(), err)
logger.Warningf(ctx, "Failed to clear finalizer for Resource with name: %v/%v. Error: %v", o.GetNamespace(), o.GetName(), err)

Check warning on line 157 in flyteplugins/go/tasks/plugins/array/k8s/subtask.go

View check run for this annotation

Codecov / codecov/patch

flyteplugins/go/tasks/plugins/array/k8s/subtask.go#L157

Added line #L157 was not covered by tests
return err
}
} else {
logger.Debugf(ctx, "Finalizers are already empty for Resource with name: %v/%v", o.GetNamespace(), o.GetName())
logger.Debugf(ctx, "Finalizer is already cleared for Resource with name: %v/%v", o.GetNamespace(), o.GetName())

Check warning on line 161 in flyteplugins/go/tasks/plugins/array/k8s/subtask.go

View check run for this annotation

Codecov / codecov/patch

flyteplugins/go/tasks/plugins/array/k8s/subtask.go#L161

Added line #L161 was not covered by tests
}
return nil
}
Expand Down Expand Up @@ -211,7 +217,7 @@ func launchSubtask(ctx context.Context, stCtx SubTaskExecutionContext, cfg *Conf
}

// finalizeSubtask performs operations to complete the k8s pod defined by the SubTaskExecutionContext
// and Config. These may include removing finalizers and deleting the k8s resource.
// and Config. These may include removing the finalizer and deleting the k8s resource.
func finalizeSubtask(ctx context.Context, stCtx SubTaskExecutionContext, cfg *Config, kubeClient pluginsCore.KubeClient) error {
errs := stdErrors.ErrorCollection{}
var pod *v1.Pod
Expand All @@ -231,10 +237,10 @@ func finalizeSubtask(ctx context.Context, stCtx SubTaskExecutionContext, cfg *Co
nsName = k8stypes.NamespacedName{Namespace: pod.GetNamespace(), Name: pod.GetName()}
}

// In InjectFinalizer is on, it means we may have added the finalizers when we launched this resource. Attempt to
// clear them to allow the object to be deleted/garbage collected. If InjectFinalizer was turned on (through config)
// If InjectFinalizer is on, it means we may have added the finalizer when we launched this resource. Attempt to
// clear it to allow the object to be deleted/garbage collected. If InjectFinalizer was turned on (through config)
// after the resource was created, we will not find any finalizers to clear and the object may have already been
// deleted at this point. Therefore, account for these cases and do not consider them errors.
// deleted at this point. Therefore, account for these cases and do not consider them errors.
if k8sPluginCfg.InjectFinalizer {
// Attempt to get resource from informer cache, if not found, retrieve it from API server.
if err := kubeClient.GetClient().Get(ctx, nsName, pod); err != nil {
Expand All @@ -250,7 +256,7 @@ func finalizeSubtask(ctx context.Context, stCtx SubTaskExecutionContext, cfg *Co
// This must happen after sending admin event. It's safe against partial failures because if the event failed, we will
// simply retry in the next round. If the event succeeded but this failed, we will try again the next round to send
// the same event (idempotent) and then come here again...
err := clearFinalizers(ctx, pod, kubeClient)
err := clearFinalizer(ctx, pod, kubeClient)

Check warning on line 259 in flyteplugins/go/tasks/plugins/array/k8s/subtask.go

View check run for this annotation

Codecov / codecov/patch

flyteplugins/go/tasks/plugins/array/k8s/subtask.go#L259

Added line #L259 was not covered by tests
if err != nil {
errs.Append(err)
}
Expand Down Expand Up @@ -308,10 +314,10 @@ func getSubtaskPhaseInfo(ctx context.Context, stCtx SubTaskExecutionContext, cfg
return pluginsCore.PhaseInfoUndefined, err
}

if !phaseInfo.Phase().IsTerminal() && o.GetDeletionTimestamp() != nil {
if !phaseInfo.Phase().IsTerminal() && !o.GetDeletionTimestamp().IsZero() {
// If the object has been deleted, that is, it has a deletion timestamp, but is not in a terminal state, we should
// mark the task as a retryable failure. We've seen this happen when a kubelet disappears - all pods running on
// the node are marked with a deletionTimestamp, but our finalizers prevent the pod from being deleted.
// the node are marked with a deletionTimestamp, but our finalizer prevents the pod from being deleted.

Check warning on line 320 in flyteplugins/go/tasks/plugins/array/k8s/subtask.go

View check run for this annotation

Codecov / codecov/patch

flyteplugins/go/tasks/plugins/array/k8s/subtask.go#L320

Added line #L320 was not covered by tests
// This can also happen when a user deletes a Pod directly.
failureReason := fmt.Sprintf("object [%s] terminated in the background, manually", nsName.String())
return pluginsCore.PhaseInfoSystemRetryableFailure("UnexpectedObjectDeletion", failureReason, nil), nil
Expand Down
5 changes: 5 additions & 0 deletions flytepropeller/pkg/controller/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,11 @@ import (
)

const (
// Finalizer is the global and domain-qualified Flyte finalizer
Finalizer = "flyte.lyft.com/finalizer"
// OldFinalizer is the old non-domain-qualified finalizer, kept for backwards compatibility
// This should eventually be removed
OldFinalizer = "flyte-finalizer"
resourceLevelMonitorCycleDuration = 5 * time.Second
missing = "missing"
podDefaultNamespace = "flyte"
Expand Down
36 changes: 0 additions & 36 deletions flytepropeller/pkg/controller/finalizer.go

This file was deleted.

70 changes: 0 additions & 70 deletions flytepropeller/pkg/controller/finalizer_test.go

This file was deleted.

Loading

0 comments on commit 9d180a8

Please sign in to comment.