diff --git a/apis/apps/v1alpha1/config.go b/apis/apps/v1alpha1/config.go index bc91687dd17..c0cf6afbe1c 100644 --- a/apis/apps/v1alpha1/config.go +++ b/apis/apps/v1alpha1/config.go @@ -72,10 +72,11 @@ package v1alpha1 // ConfigurationPhase defines the Configuration FSM phase // +enum -// +kubebuilder:validation:Enum={Init,Running,Pending,Merged,MergeFailed,FailedAndPause,Upgrading,Deleting,FailedAndRetry,Finished} +// +kubebuilder:validation:Enum={Creating,Init,Running,Pending,Merged,MergeFailed,FailedAndPause,Upgrading,Deleting,FailedAndRetry,Finished} type ConfigurationPhase string const ( + CCreatingPhase ConfigurationPhase = "Creating" CInitPhase ConfigurationPhase = "Init" CRunningPhase ConfigurationPhase = "Running" CPendingPhase ConfigurationPhase = "Pending" diff --git a/config/crd/bases/apps.kubeblocks.io_configurations.yaml b/config/crd/bases/apps.kubeblocks.io_configurations.yaml index 627422ccfe9..7dbc26db996 100644 --- a/config/crd/bases/apps.kubeblocks.io_configurations.yaml +++ b/config/crd/bases/apps.kubeblocks.io_configurations.yaml @@ -310,6 +310,7 @@ spec: phase: description: phase is status of configurationItem. enum: + - Creating - Init - Running - Pending diff --git a/controllers/apps/configuration/configuration_controller.go b/controllers/apps/configuration/configuration_controller.go index 1d0881d8993..9208d02a498 100644 --- a/controllers/apps/configuration/configuration_controller.go +++ b/controllers/apps/configuration/configuration_controller.go @@ -107,7 +107,7 @@ func (r *ConfigurationReconciler) Reconcile(ctx context.Context, req ctrl.Reques return r.failWithInvalidComponent(configuration, reqCtx) } - if err := r.runTasks(reqCtx, configuration, fetcherTask, tasks); err != nil { + if err := r.runTasks(TaskContext{configuration, reqCtx, fetcherTask}, tasks); err != nil { return intctrlutil.CheckedRequeueWithError(err, reqCtx.Log, "failed to run configuration reconcile task.") } if !isAllReady(configuration) { @@ -130,55 +130,52 @@ func (r *ConfigurationReconciler) failWithInvalidComponent(configuration *appsv1 func isAllReady(configuration *appsv1alpha1.Configuration) bool { for _, item := range configuration.Spec.ConfigItemDetails { itemStatus := configuration.Status.GetItemStatus(item.Name) - if itemStatus == nil || itemStatus.Phase != appsv1alpha1.CFinishedPhase { + if itemStatus != nil && !isFinishStatus(itemStatus.Phase) { return false } } return true } -func (r *ConfigurationReconciler) runTasks( - reqCtx intctrlutil.RequestCtx, - configuration *appsv1alpha1.Configuration, - fetcher *Task, - tasks []Task) (err error) { - var errs []error - var synthesizedComp *component.SynthesizedComponent - - synthesizedComp, err = component.BuildComponent(reqCtx, nil, - fetcher.ClusterObj, - fetcher.ClusterDefObj, - fetcher.ClusterDefComObj, - fetcher.ClusterComObj, +func (r *ConfigurationReconciler) runTasks(taskCtx TaskContext, tasks []Task) (err error) { + var ( + errs []error + synthesizedComp *component.SynthesizedComponent + + ctx = taskCtx.reqCtx.Ctx + configuration = taskCtx.configuration + ) + + synthesizedComp, err = component.BuildComponent(taskCtx.reqCtx, + nil, + taskCtx.fetcher.ClusterObj, + taskCtx.fetcher.ClusterDefObj, + taskCtx.fetcher.ClusterDefComObj, + taskCtx.fetcher.ClusterComObj, nil, - fetcher.ClusterVerComObj) + taskCtx.fetcher.ClusterVerComObj) if err != nil { return err } + // TODO manager multiple version patch := client.MergeFrom(configuration.DeepCopy()) revision := strconv.FormatInt(configuration.GetGeneration(), 10) for _, task := range tasks { - if err := task.Do(fetcher, synthesizedComp, revision); err != nil { + task.Status.UpdateRevision = revision + if err := task.Do(taskCtx.fetcher, synthesizedComp, revision); err != nil { task.Status.Phase = appsv1alpha1.CMergeFailedPhase task.Status.Message = cfgutil.ToPointer(err.Error()) errs = append(errs, err) continue } - task.Status.UpdateRevision = revision - task.Status.Phase = appsv1alpha1.CMergedPhase - if err := task.SyncStatus(fetcher, task.Status); err != nil { - task.Status.Phase = appsv1alpha1.CFailedPhase - task.Status.Message = cfgutil.ToPointer(err.Error()) - errs = append(errs, err) - } } configuration.Status.Message = "" if len(errs) > 0 { configuration.Status.Message = utilerrors.NewAggregate(errs).Error() } - if err := r.Client.Status().Patch(reqCtx.Ctx, configuration, patch); err != nil { + if err := r.Client.Status().Patch(ctx, configuration, patch); err != nil { errs = append(errs, err) } if len(errs) == 0 { @@ -216,12 +213,11 @@ func fromItemStatus(ctx intctrlutil.RequestCtx, status *appsv1alpha1.Configurati } func isReconcileStatus(phase appsv1alpha1.ConfigurationPhase) bool { - return phase == appsv1alpha1.CRunningPhase || - phase == appsv1alpha1.CInitPhase || - phase == appsv1alpha1.CPendingPhase || - phase == appsv1alpha1.CFailedPhase || - phase == appsv1alpha1.CMergedPhase || - phase == appsv1alpha1.CMergeFailedPhase || - phase == appsv1alpha1.CUpgradingPhase || - phase == appsv1alpha1.CFinishedPhase + return phase != "" && + phase != appsv1alpha1.CCreatingPhase && + phase != appsv1alpha1.CDeletingPhase +} + +func isFinishStatus(phase appsv1alpha1.ConfigurationPhase) bool { + return phase == appsv1alpha1.CFinishedPhase || phase == appsv1alpha1.CFailedAndPausePhase } diff --git a/controllers/apps/configuration/reconcile_task.go b/controllers/apps/configuration/reconcile_task.go index 3f4454281b4..03f1510682a 100644 --- a/controllers/apps/configuration/reconcile_task.go +++ b/controllers/apps/configuration/reconcile_task.go @@ -22,6 +22,8 @@ package configuration import ( "strconv" + corev1 "k8s.io/api/core/v1" + appsv1alpha1 "github.com/apecloud/kubeblocks/apis/apps/v1alpha1" "github.com/apecloud/kubeblocks/internal/configuration/core" "github.com/apecloud/kubeblocks/internal/controller/component" @@ -35,46 +37,73 @@ type Task struct { Status *appsv1alpha1.ConfigurationItemDetailStatus Name string - Do func(fetcher *Task, component *component.SynthesizedComponent, revision string) error - SyncStatus func(fetcher *Task, status *appsv1alpha1.ConfigurationItemDetailStatus) error + Do func(fetcher *Task, component *component.SynthesizedComponent, revision string) error +} + +type TaskContext struct { + configuration *appsv1alpha1.Configuration + reqCtx intctrlutil.RequestCtx + fetcher *Task } func NewTask(item appsv1alpha1.ConfigurationItemDetail, status *appsv1alpha1.ConfigurationItemDetailStatus) Task { return Task{ - Name: item.Name, - Status: status, + Name: item.Name, Do: func(fetcher *Task, synComponent *component.SynthesizedComponent, revision string) error { configSpec := item.ConfigSpec if configSpec == nil { return core.MakeError("not found config spec: %s", item.Name) } - reconcileTask := configuration.NewReconcilePipeline(configuration.ReconcileCtx{ - ResourceCtx: fetcher.ResourceCtx, - Cluster: fetcher.ClusterObj, - ClusterVer: fetcher.ClusterVerObj, - Component: synComponent, - PodSpec: synComponent.PodSpec, - }, item, status, configSpec) - return reconcileTask.ConfigMap(item.Name). - ConfigConstraints(configSpec.ConfigConstraintRef). - PrepareForTemplate(). - RerenderTemplate(). - ApplyParameters(). - UpdateConfigVersion(revision). - Sync(). - Complete() + if err := fetcher.ConfigMap(item.Name).Complete(); err != nil { + return err + } + // Do reconcile for config template + configMap := fetcher.ConfigMapObj + switch intctrlutil.GetConfigSpecReconcilePhase(configMap, item, status) { + default: + return syncStatus(configMap, status) + case appsv1alpha1.CPendingPhase, + appsv1alpha1.CMergeFailedPhase: + return syncImpl(fetcher, item, status, synComponent, revision, configSpec) + case appsv1alpha1.CCreatingPhase: + return nil + } }, - SyncStatus: syncStatus, + Status: status, } } -func syncStatus(fetcher *Task, status *appsv1alpha1.ConfigurationItemDetailStatus) (err error) { - err = fetcher.ConfigMap(status.Name).Complete() +func syncImpl(fetcher *Task, + item appsv1alpha1.ConfigurationItemDetail, + status *appsv1alpha1.ConfigurationItemDetailStatus, + component *component.SynthesizedComponent, + revision string, + configSpec *appsv1alpha1.ComponentConfigSpec) (err error) { + err = configuration.NewReconcilePipeline(configuration.ReconcileCtx{ + ResourceCtx: fetcher.ResourceCtx, + Cluster: fetcher.ClusterObj, + ClusterVer: fetcher.ClusterVerObj, + Component: component, + PodSpec: component.PodSpec, + }, item, status, configSpec). + ConfigMap(item.Name). + ConfigConstraints(configSpec.ConfigConstraintRef). + PrepareForTemplate(). + RerenderTemplate(). + ApplyParameters(). + UpdateConfigVersion(revision). + Sync(). + Complete() if err != nil { - return + status.Phase = appsv1alpha1.CMergeFailedPhase + } else { + status.Phase = appsv1alpha1.CMergedPhase } + return +} - annotations := fetcher.ConfigMapObj.GetAnnotations() +func syncStatus(configMap *corev1.ConfigMap, status *appsv1alpha1.ConfigurationItemDetailStatus) (err error) { + annotations := configMap.GetAnnotations() // status.CurrentRevision = GetCurrentRevision(annotations) revisions := RetrieveRevision(annotations) if len(revisions) == 0 { diff --git a/controllers/apps/operations/reconfigure.go b/controllers/apps/operations/reconfigure.go index c0a978c6070..a88ea5c3872 100644 --- a/controllers/apps/operations/reconfigure.go +++ b/controllers/apps/operations/reconfigure.go @@ -28,7 +28,6 @@ import ( appsv1alpha1 "github.com/apecloud/kubeblocks/apis/apps/v1alpha1" "github.com/apecloud/kubeblocks/internal/configuration/core" - "github.com/apecloud/kubeblocks/internal/controller/configuration" intctrlutil "github.com/apecloud/kubeblocks/internal/controllerutil" ) @@ -216,15 +215,11 @@ func (r *reconfigureAction) syncReconfigureOperatorStatus(ctx intctrlutil.Reques } item := fetcher.ConfigurationObj.Spec.GetConfigurationItem(configSpec.Name) - status := fetcher.ConfigurationObj.Status.GetItemStatus(configSpec.Name) - if status == nil || item == nil { + if item == nil { return appsv1alpha1.OpsRunningPhase, nil } - if !configuration.IsApplyConfigChanged(fetcher.ConfigMapObj, *item) { - return appsv1alpha1.OpsRunningPhase, nil - } - switch status.Phase { + switch intctrlutil.GetConfigSpecReconcilePhase(fetcher.ConfigMapObj, *item, fetcher.ConfigurationObj.Status.GetItemStatus(configSpec.Name)) { default: return appsv1alpha1.OpsRunningPhase, nil case appsv1alpha1.CFailedAndPausePhase: diff --git a/deploy/helm/crds/apps.kubeblocks.io_configurations.yaml b/deploy/helm/crds/apps.kubeblocks.io_configurations.yaml index 627422ccfe9..7dbc26db996 100644 --- a/deploy/helm/crds/apps.kubeblocks.io_configurations.yaml +++ b/deploy/helm/crds/apps.kubeblocks.io_configurations.yaml @@ -310,6 +310,7 @@ spec: phase: description: phase is status of configurationItem. enum: + - Creating - Init - Running - Pending diff --git a/internal/controller/configuration/pipeline.go b/internal/controller/configuration/pipeline.go index 2a0e47e8035..cdc159c093b 100644 --- a/internal/controller/configuration/pipeline.go +++ b/internal/controller/configuration/pipeline.go @@ -20,8 +20,6 @@ along with this program. If not, see . package configuration import ( - "encoding/json" - "reflect" "strconv" corev1 "k8s.io/api/core/v1" @@ -257,7 +255,7 @@ func (p *updatePipeline) isDone() bool { func (p *updatePipeline) PrepareForTemplate() *updatePipeline { buildTemplate := func() (err error) { - p.reconcile = !IsApplyConfigChanged(p.ConfigMapObj, p.item) + p.reconcile = !intctrlutil.IsApplyConfigChanged(p.ConfigMapObj, p.item) if p.isDone() { return } @@ -272,23 +270,6 @@ func (p *updatePipeline) PrepareForTemplate() *updatePipeline { return p.Wrap(buildTemplate) } -func IsApplyConfigChanged(cm *corev1.ConfigMap, item appsv1alpha1.ConfigurationItemDetail) bool { - if cm == nil { - return false - } - - lastAppliedVersion, ok := cm.Annotations[constant.ConfigAppliedVersionAnnotationKey] - if !ok { - return false - } - var target appsv1alpha1.ConfigurationItemDetail - if err := json.Unmarshal([]byte(lastAppliedVersion), &target); err != nil { - return false - } - - return reflect.DeepEqual(target, item) -} - func (p *updatePipeline) ConfigSpec() *appsv1alpha1.ComponentConfigSpec { return p.configSpec } @@ -310,7 +291,7 @@ func (p *updatePipeline) RerenderTemplate() *updatePipeline { if p.isDone() { return } - if needRerender(p.ConfigMapObj, p.item) { + if intctrlutil.IsRerender(p.ConfigMapObj, p.item) { p.newCM, err = p.renderWrapper.rerenderConfigTemplate(p.ctx.Cluster, p.ctx.Component, *p.configSpec, &p.item) } else { p.newCM = p.ConfigMapObj.DeepCopy() @@ -409,18 +390,3 @@ func (p *updatePipeline) SyncStatus() *updatePipeline { return }) } - -func needRerender(obj *corev1.ConfigMap, item appsv1alpha1.ConfigurationItemDetail) bool { - if obj == nil { - return true - } - if item.Version == "" { - return false - } - - version, ok := obj.Annotations[constant.CMConfigurationTemplateVersion] - if !ok || version != item.Version { - return true - } - return false -} diff --git a/internal/controllerutil/config_util.go b/internal/controllerutil/config_util.go index c524365a3ac..0052dd066d5 100644 --- a/internal/controllerutil/config_util.go +++ b/internal/controllerutil/config_util.go @@ -21,6 +21,8 @@ package controllerutil import ( "context" + "encoding/json" + "reflect" "github.com/StudioSol/set" appsv1 "k8s.io/api/apps/v1" @@ -32,6 +34,7 @@ import ( "github.com/apecloud/kubeblocks/internal/configuration/core" "github.com/apecloud/kubeblocks/internal/configuration/util" "github.com/apecloud/kubeblocks/internal/configuration/validate" + "github.com/apecloud/kubeblocks/internal/constant" ) type ConfigEventContext struct { @@ -116,3 +119,50 @@ func fromUpdatedConfig(m map[string]string, sets *set.LinkedHashSetString) map[s } return r } + +// IsApplyConfigChanged checks if the configuration is changed +func IsApplyConfigChanged(configMap *corev1.ConfigMap, item v1alpha1.ConfigurationItemDetail) bool { + if configMap == nil { + return false + } + + lastAppliedVersion, ok := configMap.Annotations[constant.ConfigAppliedVersionAnnotationKey] + if !ok { + return false + } + var target v1alpha1.ConfigurationItemDetail + if err := json.Unmarshal([]byte(lastAppliedVersion), &target); err != nil { + return false + } + + return reflect.DeepEqual(target, item) +} + +// IsRerender checks if the configuration template is changed +func IsRerender(configMap *corev1.ConfigMap, item v1alpha1.ConfigurationItemDetail) bool { + if configMap == nil { + return true + } + if item.Version == "" { + return false + } + + version, ok := configMap.Annotations[constant.CMConfigurationTemplateVersion] + if !ok || version != item.Version { + return true + } + return false +} + +// GetConfigSpecReconcilePhase gets the configuration phase +func GetConfigSpecReconcilePhase(configMap *corev1.ConfigMap, + item v1alpha1.ConfigurationItemDetail, + status *v1alpha1.ConfigurationItemDetailStatus) v1alpha1.ConfigurationPhase { + if status == nil || status.Phase == "" { + return v1alpha1.CCreatingPhase + } + if !IsApplyConfigChanged(configMap, item) { + return v1alpha1.CPendingPhase + } + return status.Phase +} diff --git a/internal/controllerutil/config_util_test.go b/internal/controllerutil/config_util_test.go index 23edc80ba64..7d91f33540d 100644 --- a/internal/controllerutil/config_util_test.go +++ b/internal/controllerutil/config_util_test.go @@ -28,10 +28,13 @@ import ( . "github.com/onsi/gomega" "github.com/StudioSol/set" + corev1 "k8s.io/api/core/v1" "github.com/apecloud/kubeblocks/apis/apps/v1alpha1" "github.com/apecloud/kubeblocks/internal/configuration/core" cfgutil "github.com/apecloud/kubeblocks/internal/configuration/util" + "github.com/apecloud/kubeblocks/internal/constant" + "github.com/apecloud/kubeblocks/internal/controller/builder" testapps "github.com/apecloud/kubeblocks/internal/testutil/apps" testutil "github.com/apecloud/kubeblocks/internal/testutil/k8s" "github.com/apecloud/kubeblocks/test/testdata" @@ -81,6 +84,134 @@ func TestFromUpdatedConfig(t *testing.T) { } } +func TestIsRerender(t *testing.T) { + type args struct { + cm *corev1.ConfigMap + item v1alpha1.ConfigurationItemDetail + } + tests := []struct { + name string + args args + want bool + }{{ + + name: "test", + args: args{ + cm: nil, + item: v1alpha1.ConfigurationItemDetail{ + Name: "test", + }, + }, + want: true, + }, { + name: "test", + args: args{ + cm: builder.NewConfigMapBuilder("default", "test").GetObject(), + item: v1alpha1.ConfigurationItemDetail{ + Name: "test", + }, + }, + want: false, + }, { + name: "test", + args: args{ + cm: builder.NewConfigMapBuilder("default", "test"). + GetObject(), + item: v1alpha1.ConfigurationItemDetail{ + Name: "test", + Version: "v1", + }, + }, + want: true, + }, { + name: "test", + args: args{ + cm: builder.NewConfigMapBuilder("default", "test"). + AddAnnotations(constant.CMConfigurationTemplateVersion, "v1"). + GetObject(), + item: v1alpha1.ConfigurationItemDetail{ + Name: "test", + Version: "v2", + }, + }, + want: true, + }, { + name: "test", + args: args{ + cm: builder.NewConfigMapBuilder("default", "test"). + AddAnnotations(constant.CMConfigurationTemplateVersion, "v1"). + GetObject(), + item: v1alpha1.ConfigurationItemDetail{ + Name: "test", + Version: "v1", + }, + }, + want: false, + }} + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + if got := IsRerender(tt.args.cm, tt.args.item); got != tt.want { + t.Errorf("IsRerender() = %v, want %v", got, tt.want) + } + }) + } +} + +func TestGetConfigSpecReconcilePhase(t *testing.T) { + type args struct { + cm *corev1.ConfigMap + item v1alpha1.ConfigurationItemDetail + status *v1alpha1.ConfigurationItemDetailStatus + } + tests := []struct { + name string + args args + want v1alpha1.ConfigurationPhase + }{{ + name: "test", + args: args{ + cm: nil, + item: v1alpha1.ConfigurationItemDetail{ + Name: "test", + }, + }, + want: v1alpha1.CCreatingPhase, + }, { + name: "test", + args: args{ + cm: builder.NewConfigMapBuilder("default", "test").GetObject(), + item: v1alpha1.ConfigurationItemDetail{ + Name: "test", + }, + status: &v1alpha1.ConfigurationItemDetailStatus{ + Phase: v1alpha1.CInitPhase, + }, + }, + want: v1alpha1.CPendingPhase, + }, { + name: "test", + args: args{ + cm: builder.NewConfigMapBuilder("default", "test"). + AddAnnotations(constant.ConfigAppliedVersionAnnotationKey, `{"name":"test"}`). + GetObject(), + item: v1alpha1.ConfigurationItemDetail{ + Name: "test", + }, + status: &v1alpha1.ConfigurationItemDetailStatus{ + Phase: v1alpha1.CUpgradingPhase, + }, + }, + want: v1alpha1.CUpgradingPhase, + }} + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + if got := GetConfigSpecReconcilePhase(tt.args.cm, tt.args.item, tt.args.status); got != tt.want { + t.Errorf("GetConfigSpecReconcilePhase() = %v, want %v", got, tt.want) + } + }) + } +} + var _ = Describe("config_util", func() { var k8sMockClient *testutil.K8sClientMockHelper