From e64ee2283fe0835aa9e7d4c16232a91cff22985f Mon Sep 17 00:00:00 2001 From: linzhengen Date: Mon, 29 Jul 2024 22:48:54 +0900 Subject: [PATCH] fix: improve get archived workflow query performance during controller estimation. Fixes #13382 (#13394) Signed-off-by: linzhengen (cherry picked from commit 7cc20bbfa5a4b871b757413a76cc3259894baaea) --- persist/sqldb/mocks/WorkflowArchive.go | 31 ++++++++++++ persist/sqldb/null_workflow_archive.go | 6 +++ persist/sqldb/workflow_archive.go | 49 +++++++++++++++++++ .../estimation/estimator_factory.go | 17 ++----- .../estimation/estimator_factory_test.go | 13 ++--- 5 files changed, 93 insertions(+), 23 deletions(-) diff --git a/persist/sqldb/mocks/WorkflowArchive.go b/persist/sqldb/mocks/WorkflowArchive.go index 634961944077..319e5933df04 100644 --- a/persist/sqldb/mocks/WorkflowArchive.go +++ b/persist/sqldb/mocks/WorkflowArchive.go @@ -4,6 +4,7 @@ package mocks import ( mock "github.com/stretchr/testify/mock" + labels "k8s.io/apimachinery/pkg/labels" time "time" @@ -109,6 +110,36 @@ func (_m *WorkflowArchive) GetWorkflow(uid string, namespace string, name string return r0, r1 } +// GetWorkflowForEstimator provides a mock function with given fields: namespace, requirements +func (_m *WorkflowArchive) GetWorkflowForEstimator(namespace string, requirements []labels.Requirement) (*v1alpha1.Workflow, error) { + ret := _m.Called(namespace, requirements) + + if len(ret) == 0 { + panic("no return value specified for GetWorkflowForEstimator") + } + + var r0 *v1alpha1.Workflow + var r1 error + if rf, ok := ret.Get(0).(func(string, []labels.Requirement) (*v1alpha1.Workflow, error)); ok { + return rf(namespace, requirements) + } + if rf, ok := ret.Get(0).(func(string, []labels.Requirement) *v1alpha1.Workflow); ok { + r0 = rf(namespace, requirements) + } else { + if ret.Get(0) != nil { + r0 = ret.Get(0).(*v1alpha1.Workflow) + } + } + + if rf, ok := ret.Get(1).(func(string, []labels.Requirement) error); ok { + r1 = rf(namespace, requirements) + } else { + r1 = ret.Error(1) + } + + return r0, r1 +} + // IsEnabled provides a mock function with given fields: func (_m *WorkflowArchive) IsEnabled() bool { ret := _m.Called() diff --git a/persist/sqldb/null_workflow_archive.go b/persist/sqldb/null_workflow_archive.go index e3f4863bcc7c..fa97b6650710 100644 --- a/persist/sqldb/null_workflow_archive.go +++ b/persist/sqldb/null_workflow_archive.go @@ -4,6 +4,8 @@ import ( "fmt" "time" + "k8s.io/apimachinery/pkg/labels" + wfv1 "github.com/argoproj/argo-workflows/v3/pkg/apis/workflow/v1alpha1" sutils "github.com/argoproj/argo-workflows/v3/server/utils" ) @@ -32,6 +34,10 @@ func (r *nullWorkflowArchive) GetWorkflow(string, string, string) (*wfv1.Workflo return nil, fmt.Errorf("getting archived workflows not supported") } +func (r *nullWorkflowArchive) GetWorkflowForEstimator(namespace string, requirements []labels.Requirement) (*wfv1.Workflow, error) { + return nil, fmt.Errorf("getting archived workflow for estimator not supported") +} + func (r *nullWorkflowArchive) DeleteWorkflow(string) error { return fmt.Errorf("deleting archived workflows not supported") } diff --git a/persist/sqldb/workflow_archive.go b/persist/sqldb/workflow_archive.go index 10411968bb5f..c18c2017b369 100644 --- a/persist/sqldb/workflow_archive.go +++ b/persist/sqldb/workflow_archive.go @@ -10,6 +10,7 @@ import ( "google.golang.org/grpc/codes" corev1 "k8s.io/api/core/v1" v1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/labels" "k8s.io/apimachinery/pkg/types" wfv1 "github.com/argoproj/argo-workflows/v3/pkg/apis/workflow/v1alpha1" @@ -69,6 +70,7 @@ type WorkflowArchive interface { ListWorkflows(options sutils.ListOptions) (wfv1.Workflows, error) CountWorkflows(options sutils.ListOptions) (int64, error) GetWorkflow(uid string, namespace string, name string) (*wfv1.Workflow, error) + GetWorkflowForEstimator(namespace string, requirements []labels.Requirement) (*wfv1.Workflow, error) DeleteWorkflow(uid string) error DeleteExpiredWorkflows(ttl time.Duration) error IsEnabled() bool @@ -289,6 +291,13 @@ func namePrefixClause(namePrefix string) db.Cond { return db.Cond{} } +func phaseEqual(phase string) db.Cond { + if phase != "" { + return db.Cond{"phase": phase} + } + return db.Cond{} +} + func (r *workflowArchive) GetWorkflow(uid string, namespace string, name string) (*wfv1.Workflow, error) { var err error archivedWf := &archivedWorkflowRecord{} @@ -343,6 +352,46 @@ func (r *workflowArchive) GetWorkflow(uid string, namespace string, name string) return wf, nil } +func (r *workflowArchive) GetWorkflowForEstimator(namespace string, requirements []labels.Requirement) (*wfv1.Workflow, error) { + selector := r.session.SQL(). + Select("name", "namespace", "uid", "startedat", "finishedat"). + From(archiveTableName). + Where(r.clusterManagedNamespaceAndInstanceID()). + And(phaseEqual(string(wfv1.NodeSucceeded))) + + selector, err := BuildArchivedWorkflowSelector(selector, archiveTableName, archiveLabelsTableName, r.dbType, sutils.ListOptions{ + Namespace: namespace, + LabelRequirements: requirements, + Limit: 1, + Offset: 0, + }, false) + if err != nil { + return nil, err + } + + var awf archivedWorkflowMetadata + err = selector.One(&awf) + if err != nil { + return nil, err + } + + return &wfv1.Workflow{ + ObjectMeta: v1.ObjectMeta{ + Name: awf.Name, + Namespace: awf.Namespace, + UID: types.UID(awf.UID), + Labels: map[string]string{ + common.LabelKeyWorkflowArchivingStatus: "Persisted", + }, + }, + Status: wfv1.WorkflowStatus{ + StartedAt: v1.Time{Time: awf.StartedAt}, + FinishedAt: v1.Time{Time: awf.FinishedAt}, + }, + }, nil + +} + func (r *workflowArchive) DeleteWorkflow(uid string) error { rs, err := r.session.SQL(). DeleteFrom(archiveTableName). diff --git a/workflow/controller/estimation/estimator_factory.go b/workflow/controller/estimation/estimator_factory.go index 984e76d98cdc..192f2a7c2513 100644 --- a/workflow/controller/estimation/estimator_factory.go +++ b/workflow/controller/estimation/estimator_factory.go @@ -9,7 +9,6 @@ import ( "github.com/argoproj/argo-workflows/v3/persist/sqldb" wfv1 "github.com/argoproj/argo-workflows/v3/pkg/apis/workflow/v1alpha1" - "github.com/argoproj/argo-workflows/v3/server/utils" "github.com/argoproj/argo-workflows/v3/workflow/common" "github.com/argoproj/argo-workflows/v3/workflow/controller/indexes" "github.com/argoproj/argo-workflows/v3/workflow/hydrator" @@ -72,23 +71,15 @@ func (f *estimatorFactory) NewEstimator(wf *wfv1.Workflow) (Estimator, error) { return &estimator{wf, newestWf}, nil } // we failed to find a base-line in the live set, so we now look in the archive - requirements, err := labels.ParseToRequirements(common.LabelKeyPhase + "=" + string(wfv1.NodeSucceeded) + "," + labelName + "=" + labelValue) + requirements, err := labels.ParseToRequirements(labelName + "=" + labelValue) if err != nil { return defaultEstimator, fmt.Errorf("failed to parse selector to requirements: %v", err) } - workflows, err := f.wfArchive.ListWorkflows( - utils.ListOptions{ - Namespace: wf.Namespace, - LabelRequirements: requirements, - Limit: 1, - Offset: 0, - }) + baselineWF, err := f.wfArchive.GetWorkflowForEstimator(wf.Namespace, requirements) if err != nil { - return defaultEstimator, fmt.Errorf("failed to list archived workflows: %v", err) - } - if len(workflows) > 0 { - return &estimator{wf, &workflows[0]}, nil + return defaultEstimator, fmt.Errorf("failed to get archived workflow for estimator: %v", err) } + return &estimator{wf, baselineWF}, nil } } return defaultEstimator, nil diff --git a/workflow/controller/estimation/estimator_factory_test.go b/workflow/controller/estimation/estimator_factory_test.go index c4bb0ab06981..6951ef546070 100644 --- a/workflow/controller/estimation/estimator_factory_test.go +++ b/workflow/controller/estimation/estimator_factory_test.go @@ -9,7 +9,6 @@ import ( sqldbmocks "github.com/argoproj/argo-workflows/v3/persist/sqldb/mocks" wfv1 "github.com/argoproj/argo-workflows/v3/pkg/apis/workflow/v1alpha1" - "github.com/argoproj/argo-workflows/v3/server/utils" testutil "github.com/argoproj/argo-workflows/v3/test/util" "github.com/argoproj/argo-workflows/v3/workflow/common" "github.com/argoproj/argo-workflows/v3/workflow/controller/indexes" @@ -51,17 +50,11 @@ metadata: workflows.argoproj.io/phase: Succeeded `), wfFailed) wfArchive := &sqldbmocks.WorkflowArchive{} - r, err := labels.ParseToRequirements("workflows.argoproj.io/phase=Succeeded,workflows.argoproj.io/workflow-template=my-archived-wftmpl") + r, err := labels.ParseToRequirements("workflows.argoproj.io/workflow-template=my-archived-wftmpl") assert.NoError(t, err) - wfArchive.On("ListWorkflows", utils.ListOptions{ - Namespace: "my-ns", - LabelRequirements: r, - Limit: 1, - }).Return(wfv1.Workflows{ - *testutil.MustUnmarshalWorkflow(` + wfArchive.On("GetWorkflowForEstimator", "my-ns", r).Return(testutil.MustUnmarshalWorkflow(` metadata: - name: my-archived-wftmpl-baseline`), - }, nil) + name: my-archived-wftmpl-baseline`), nil) f := NewEstimatorFactory(informer, hydratorfake.Always, wfArchive) t.Run("None", func(t *testing.T) { p, err := f.NewEstimator(&wfv1.Workflow{})