diff --git a/flyteadmin/pkg/errors/errors.go b/flyteadmin/pkg/errors/errors.go index 42ef63841b..e269715a91 100644 --- a/flyteadmin/pkg/errors/errors.go +++ b/flyteadmin/pkg/errors/errors.go @@ -140,3 +140,8 @@ func NewWorkflowExistsIdenticalStructureError(ctx context.Context, request *admi } return statusErr } + +func IsDoesNotExistError(err error) bool { + adminError, ok := err.(FlyteAdminError) + return ok && adminError.Code() == codes.NotFound +} diff --git a/flyteadmin/pkg/errors/errors_test.go b/flyteadmin/pkg/errors/errors_test.go index 4b6b250166..cb6a2a0ae0 100644 --- a/flyteadmin/pkg/errors/errors_test.go +++ b/flyteadmin/pkg/errors/errors_test.go @@ -2,6 +2,7 @@ package errors import ( "context" + "errors" "testing" "github.com/stretchr/testify/assert" @@ -90,3 +91,15 @@ func TestNewWorkflowExistsIdenticalStructureError(t *testing.T) { _, ok = details.GetReason().(*admin.CreateWorkflowFailureReason_ExistsIdenticalStructure) assert.True(t, ok) } + +func TestIsDoesNotExistError(t *testing.T) { + assert.True(t, IsDoesNotExistError(NewFlyteAdminError(codes.NotFound, "foo"))) +} + +func TestIsNotDoesNotExistError(t *testing.T) { + assert.False(t, IsDoesNotExistError(NewFlyteAdminError(codes.Canceled, "foo"))) +} + +func TestIsNotDoesNotExistErrorBecauseOfNoneAdminError(t *testing.T) { + assert.False(t, IsDoesNotExistError(errors.New("foo"))) +} diff --git a/flyteadmin/pkg/executioncluster/impl/random_cluster_selector.go b/flyteadmin/pkg/executioncluster/impl/random_cluster_selector.go index 712470a1ee..74dad42ce0 100644 --- a/flyteadmin/pkg/executioncluster/impl/random_cluster_selector.go +++ b/flyteadmin/pkg/executioncluster/impl/random_cluster_selector.go @@ -6,8 +6,6 @@ import ( "hash/fnv" "math/rand" - "google.golang.org/grpc/codes" - "github.com/flyteorg/flyte/flyteadmin/pkg/errors" "github.com/flyteorg/flyte/flyteadmin/pkg/executioncluster" "github.com/flyteorg/flyte/flyteadmin/pkg/executioncluster/interfaces" @@ -102,10 +100,8 @@ func (s RandomClusterSelector) GetTarget(ctx context.Context, spec *executionclu LaunchPlan: spec.LaunchPlan, ResourceType: admin.MatchableResource_EXECUTION_CLUSTER_LABEL, }) - if err != nil { - if flyteAdminError, ok := err.(errors.FlyteAdminError); !ok || flyteAdminError.Code() != codes.NotFound { - return nil, err - } + if err != nil && !errors.IsDoesNotExistError(err) { + return nil, err } var weightedRandomList random.WeightedRandomList diff --git a/flyteadmin/pkg/manager/impl/execution_manager.go b/flyteadmin/pkg/manager/impl/execution_manager.go index e31f501686..06516657d2 100644 --- a/flyteadmin/pkg/manager/impl/execution_manager.go +++ b/flyteadmin/pkg/manager/impl/execution_manager.go @@ -164,11 +164,8 @@ func (m *ExecutionManager) addPluginOverrides(ctx context.Context, executionID * LaunchPlan: launchPlanName, ResourceType: admin.MatchableResource_PLUGIN_OVERRIDE, }) - if err != nil { - ec, ok := err.(errors.FlyteAdminError) - if !ok || ec.Code() != codes.NotFound { - return nil, err - } + if err != nil && !errors.IsDoesNotExistError(err) { + return nil, err } if override != nil && override.Attributes != nil && override.Attributes.GetPluginOverrides() != nil { return override.Attributes.GetPluginOverrides().Overrides, nil @@ -427,11 +424,9 @@ func (m *ExecutionManager) getClusterAssignment(ctx context.Context, request *ad Domain: request.Domain, ResourceType: admin.MatchableResource_CLUSTER_ASSIGNMENT, }) - if err != nil { - if flyteAdminError, ok := err.(errors.FlyteAdminError); !ok || flyteAdminError.Code() != codes.NotFound { - logger.Errorf(ctx, "Failed to get cluster assignment overrides with error: %v", err) - return nil, err - } + if err != nil && !errors.IsDoesNotExistError(err) { + logger.Errorf(ctx, "Failed to get cluster assignment overrides with error: %v", err) + return nil, err } if resource != nil && resource.Attributes.GetClusterAssignment() != nil { return resource.Attributes.GetClusterAssignment(), nil diff --git a/flyteadmin/pkg/manager/impl/executions/quality_of_service.go b/flyteadmin/pkg/manager/impl/executions/quality_of_service.go index 345b0e926d..a96d99d3d6 100644 --- a/flyteadmin/pkg/manager/impl/executions/quality_of_service.go +++ b/flyteadmin/pkg/manager/impl/executions/quality_of_service.go @@ -43,7 +43,7 @@ func (q qualityOfServiceAllocator) getQualityOfServiceFromDb(ctx context.Context ResourceType: admin.MatchableResource_QUALITY_OF_SERVICE_SPECIFICATION, }) if err != nil { - if _, ok := err.(errors.FlyteAdminError); !ok || err.(errors.FlyteAdminError).Code() != codes.NotFound { + if !errors.IsDoesNotExistError(err) { logger.Warningf(ctx, "Failed to fetch override values when assigning quality of service values for [%+v] with err: %v", workflowIdentifier, err) diff --git a/flyteadmin/pkg/manager/impl/executions/queues.go b/flyteadmin/pkg/manager/impl/executions/queues.go index 9ec4ff865a..5e4706700c 100644 --- a/flyteadmin/pkg/manager/impl/executions/queues.go +++ b/flyteadmin/pkg/manager/impl/executions/queues.go @@ -4,6 +4,7 @@ import ( "context" "math/rand" + "github.com/flyteorg/flyte/flyteadmin/pkg/errors" "github.com/flyteorg/flyte/flyteadmin/pkg/manager/impl/resources" "github.com/flyteorg/flyte/flyteadmin/pkg/manager/interfaces" repoInterfaces "github.com/flyteorg/flyte/flyteadmin/pkg/repositories/interfaces" @@ -64,7 +65,7 @@ func (q *queueAllocatorImpl) GetQueue(ctx context.Context, identifier core.Ident ResourceType: admin.MatchableResource_EXECUTION_QUEUE, }) - if err != nil { + if err != nil && !errors.IsDoesNotExistError(err) { logger.Warningf(ctx, "Failed to fetch override values when assigning execution queue for [%+v] with err: %v", identifier, err) } diff --git a/flyteadmin/pkg/manager/impl/util/resources.go b/flyteadmin/pkg/manager/impl/util/resources.go index 0180484005..8001b907dd 100644 --- a/flyteadmin/pkg/manager/impl/util/resources.go +++ b/flyteadmin/pkg/manager/impl/util/resources.go @@ -6,6 +6,7 @@ import ( "k8s.io/apimachinery/pkg/api/resource" + "github.com/flyteorg/flyte/flyteadmin/pkg/errors" "github.com/flyteorg/flyte/flyteadmin/pkg/manager/interfaces" runtimeInterfaces "github.com/flyteorg/flyte/flyteadmin/pkg/runtime/interfaces" workflowengineInterfaces "github.com/flyteorg/flyte/flyteadmin/pkg/workflowengine/interfaces" @@ -100,7 +101,7 @@ func GetTaskResources(ctx context.Context, id *core.Identifier, resourceManager } resource, err := resourceManager.GetResource(ctx, request) - if err != nil { + if err != nil && !errors.IsDoesNotExistError(err) { logger.Infof(ctx, "Failed to fetch override values when assigning task resource default values for [%+v]: %v", id, err) } diff --git a/flyteadmin/pkg/manager/impl/util/shared.go b/flyteadmin/pkg/manager/impl/util/shared.go index cf7febd619..24c97f416a 100644 --- a/flyteadmin/pkg/manager/impl/util/shared.go +++ b/flyteadmin/pkg/manager/impl/util/shared.go @@ -275,12 +275,10 @@ func GetMatchableResource(ctx context.Context, resourceManager interfaces.Resour Workflow: workflowName, ResourceType: resourceType, }) - if err != nil { - if flyteAdminError, ok := err.(errors.FlyteAdminError); !ok || flyteAdminError.Code() != codes.NotFound { - logger.Errorf(ctx, "Failed to get %v overrides in %s project %s domain %s workflow with error: %v", resourceType, - project, domain, workflowName, err) - return nil, err - } + if err != nil && !errors.IsDoesNotExistError(err) { + logger.Errorf(ctx, "Failed to get %v overrides in %s project %s domain %s workflow with error: %v", resourceType, + project, domain, workflowName, err) + return nil, err } return matchableResource, nil }