Skip to content

Commit

Permalink
chore: refine configuration reconcile and phase (#5311)
Browse files Browse the repository at this point in the history
  • Loading branch information
sophon-zt authored Oct 7, 2023
1 parent bc7b193 commit 1f87a87
Show file tree
Hide file tree
Showing 9 changed files with 271 additions and 101 deletions.
3 changes: 2 additions & 1 deletion apis/apps/v1alpha1/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -72,10 +72,11 @@ package v1alpha1

// ConfigurationPhase defines the Configuration FSM phase
// +enum
// +kubebuilder:validation:Enum={Init,Running,Pending,Merged,MergeFailed,FailedAndPause,Upgrading,Deleting,FailedAndRetry,Finished}
// +kubebuilder:validation:Enum={Creating,Init,Running,Pending,Merged,MergeFailed,FailedAndPause,Upgrading,Deleting,FailedAndRetry,Finished}
type ConfigurationPhase string

const (
CCreatingPhase ConfigurationPhase = "Creating"
CInitPhase ConfigurationPhase = "Init"
CRunningPhase ConfigurationPhase = "Running"
CPendingPhase ConfigurationPhase = "Pending"
Expand Down
1 change: 1 addition & 0 deletions config/crd/bases/apps.kubeblocks.io_configurations.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -310,6 +310,7 @@ spec:
phase:
description: phase is status of configurationItem.
enum:
- Creating
- Init
- Running
- Pending
Expand Down
62 changes: 29 additions & 33 deletions controllers/apps/configuration/configuration_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -107,7 +107,7 @@ func (r *ConfigurationReconciler) Reconcile(ctx context.Context, req ctrl.Reques
return r.failWithInvalidComponent(configuration, reqCtx)
}

if err := r.runTasks(reqCtx, configuration, fetcherTask, tasks); err != nil {
if err := r.runTasks(TaskContext{configuration, reqCtx, fetcherTask}, tasks); err != nil {
return intctrlutil.CheckedRequeueWithError(err, reqCtx.Log, "failed to run configuration reconcile task.")
}
if !isAllReady(configuration) {
Expand All @@ -130,55 +130,52 @@ func (r *ConfigurationReconciler) failWithInvalidComponent(configuration *appsv1
func isAllReady(configuration *appsv1alpha1.Configuration) bool {
for _, item := range configuration.Spec.ConfigItemDetails {
itemStatus := configuration.Status.GetItemStatus(item.Name)
if itemStatus == nil || itemStatus.Phase != appsv1alpha1.CFinishedPhase {
if itemStatus != nil && !isFinishStatus(itemStatus.Phase) {
return false
}
}
return true
}

func (r *ConfigurationReconciler) runTasks(
reqCtx intctrlutil.RequestCtx,
configuration *appsv1alpha1.Configuration,
fetcher *Task,
tasks []Task) (err error) {
var errs []error
var synthesizedComp *component.SynthesizedComponent

synthesizedComp, err = component.BuildComponent(reqCtx, nil,
fetcher.ClusterObj,
fetcher.ClusterDefObj,
fetcher.ClusterDefComObj,
fetcher.ClusterComObj,
func (r *ConfigurationReconciler) runTasks(taskCtx TaskContext, tasks []Task) (err error) {
var (
errs []error
synthesizedComp *component.SynthesizedComponent

ctx = taskCtx.reqCtx.Ctx
configuration = taskCtx.configuration
)

synthesizedComp, err = component.BuildComponent(taskCtx.reqCtx,
nil,
taskCtx.fetcher.ClusterObj,
taskCtx.fetcher.ClusterDefObj,
taskCtx.fetcher.ClusterDefComObj,
taskCtx.fetcher.ClusterComObj,
nil,
fetcher.ClusterVerComObj)
taskCtx.fetcher.ClusterVerComObj)
if err != nil {
return err
}

// TODO manager multiple version
patch := client.MergeFrom(configuration.DeepCopy())
revision := strconv.FormatInt(configuration.GetGeneration(), 10)
for _, task := range tasks {
if err := task.Do(fetcher, synthesizedComp, revision); err != nil {
task.Status.UpdateRevision = revision
if err := task.Do(taskCtx.fetcher, synthesizedComp, revision); err != nil {
task.Status.Phase = appsv1alpha1.CMergeFailedPhase
task.Status.Message = cfgutil.ToPointer(err.Error())
errs = append(errs, err)
continue
}
task.Status.UpdateRevision = revision
task.Status.Phase = appsv1alpha1.CMergedPhase
if err := task.SyncStatus(fetcher, task.Status); err != nil {
task.Status.Phase = appsv1alpha1.CFailedPhase
task.Status.Message = cfgutil.ToPointer(err.Error())
errs = append(errs, err)
}
}

configuration.Status.Message = ""
if len(errs) > 0 {
configuration.Status.Message = utilerrors.NewAggregate(errs).Error()
}
if err := r.Client.Status().Patch(reqCtx.Ctx, configuration, patch); err != nil {
if err := r.Client.Status().Patch(ctx, configuration, patch); err != nil {
errs = append(errs, err)
}
if len(errs) == 0 {
Expand Down Expand Up @@ -216,12 +213,11 @@ func fromItemStatus(ctx intctrlutil.RequestCtx, status *appsv1alpha1.Configurati
}

func isReconcileStatus(phase appsv1alpha1.ConfigurationPhase) bool {
return phase == appsv1alpha1.CRunningPhase ||
phase == appsv1alpha1.CInitPhase ||
phase == appsv1alpha1.CPendingPhase ||
phase == appsv1alpha1.CFailedPhase ||
phase == appsv1alpha1.CMergedPhase ||
phase == appsv1alpha1.CMergeFailedPhase ||
phase == appsv1alpha1.CUpgradingPhase ||
phase == appsv1alpha1.CFinishedPhase
return phase != "" &&
phase != appsv1alpha1.CCreatingPhase &&
phase != appsv1alpha1.CDeletingPhase
}

func isFinishStatus(phase appsv1alpha1.ConfigurationPhase) bool {
return phase == appsv1alpha1.CFinishedPhase || phase == appsv1alpha1.CFailedAndPausePhase
}
77 changes: 53 additions & 24 deletions controllers/apps/configuration/reconcile_task.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,8 @@ package configuration
import (
"strconv"

corev1 "k8s.io/api/core/v1"

appsv1alpha1 "github.com/apecloud/kubeblocks/apis/apps/v1alpha1"
"github.com/apecloud/kubeblocks/internal/configuration/core"
"github.com/apecloud/kubeblocks/internal/controller/component"
Expand All @@ -35,46 +37,73 @@ type Task struct {
Status *appsv1alpha1.ConfigurationItemDetailStatus
Name string

Do func(fetcher *Task, component *component.SynthesizedComponent, revision string) error
SyncStatus func(fetcher *Task, status *appsv1alpha1.ConfigurationItemDetailStatus) error
Do func(fetcher *Task, component *component.SynthesizedComponent, revision string) error
}

type TaskContext struct {
configuration *appsv1alpha1.Configuration
reqCtx intctrlutil.RequestCtx
fetcher *Task
}

func NewTask(item appsv1alpha1.ConfigurationItemDetail, status *appsv1alpha1.ConfigurationItemDetailStatus) Task {
return Task{
Name: item.Name,
Status: status,
Name: item.Name,
Do: func(fetcher *Task, synComponent *component.SynthesizedComponent, revision string) error {
configSpec := item.ConfigSpec
if configSpec == nil {
return core.MakeError("not found config spec: %s", item.Name)
}
reconcileTask := configuration.NewReconcilePipeline(configuration.ReconcileCtx{
ResourceCtx: fetcher.ResourceCtx,
Cluster: fetcher.ClusterObj,
ClusterVer: fetcher.ClusterVerObj,
Component: synComponent,
PodSpec: synComponent.PodSpec,
}, item, status, configSpec)
return reconcileTask.ConfigMap(item.Name).
ConfigConstraints(configSpec.ConfigConstraintRef).
PrepareForTemplate().
RerenderTemplate().
ApplyParameters().
UpdateConfigVersion(revision).
Sync().
Complete()
if err := fetcher.ConfigMap(item.Name).Complete(); err != nil {
return err
}
// Do reconcile for config template
configMap := fetcher.ConfigMapObj
switch intctrlutil.GetConfigSpecReconcilePhase(configMap, item, status) {
default:
return syncStatus(configMap, status)
case appsv1alpha1.CPendingPhase,
appsv1alpha1.CMergeFailedPhase:
return syncImpl(fetcher, item, status, synComponent, revision, configSpec)
case appsv1alpha1.CCreatingPhase:
return nil
}
},
SyncStatus: syncStatus,
Status: status,
}
}

func syncStatus(fetcher *Task, status *appsv1alpha1.ConfigurationItemDetailStatus) (err error) {
err = fetcher.ConfigMap(status.Name).Complete()
func syncImpl(fetcher *Task,
item appsv1alpha1.ConfigurationItemDetail,
status *appsv1alpha1.ConfigurationItemDetailStatus,
component *component.SynthesizedComponent,
revision string,
configSpec *appsv1alpha1.ComponentConfigSpec) (err error) {
err = configuration.NewReconcilePipeline(configuration.ReconcileCtx{
ResourceCtx: fetcher.ResourceCtx,
Cluster: fetcher.ClusterObj,
ClusterVer: fetcher.ClusterVerObj,
Component: component,
PodSpec: component.PodSpec,
}, item, status, configSpec).
ConfigMap(item.Name).
ConfigConstraints(configSpec.ConfigConstraintRef).
PrepareForTemplate().
RerenderTemplate().
ApplyParameters().
UpdateConfigVersion(revision).
Sync().
Complete()
if err != nil {
return
status.Phase = appsv1alpha1.CMergeFailedPhase
} else {
status.Phase = appsv1alpha1.CMergedPhase
}
return
}

annotations := fetcher.ConfigMapObj.GetAnnotations()
func syncStatus(configMap *corev1.ConfigMap, status *appsv1alpha1.ConfigurationItemDetailStatus) (err error) {
annotations := configMap.GetAnnotations()
// status.CurrentRevision = GetCurrentRevision(annotations)
revisions := RetrieveRevision(annotations)
if len(revisions) == 0 {
Expand Down
9 changes: 2 additions & 7 deletions controllers/apps/operations/reconfigure.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,6 @@ import (

appsv1alpha1 "github.com/apecloud/kubeblocks/apis/apps/v1alpha1"
"github.com/apecloud/kubeblocks/internal/configuration/core"
"github.com/apecloud/kubeblocks/internal/controller/configuration"
intctrlutil "github.com/apecloud/kubeblocks/internal/controllerutil"
)

Expand Down Expand Up @@ -216,15 +215,11 @@ func (r *reconfigureAction) syncReconfigureOperatorStatus(ctx intctrlutil.Reques
}

item := fetcher.ConfigurationObj.Spec.GetConfigurationItem(configSpec.Name)
status := fetcher.ConfigurationObj.Status.GetItemStatus(configSpec.Name)
if status == nil || item == nil {
if item == nil {
return appsv1alpha1.OpsRunningPhase, nil
}

if !configuration.IsApplyConfigChanged(fetcher.ConfigMapObj, *item) {
return appsv1alpha1.OpsRunningPhase, nil
}
switch status.Phase {
switch intctrlutil.GetConfigSpecReconcilePhase(fetcher.ConfigMapObj, *item, fetcher.ConfigurationObj.Status.GetItemStatus(configSpec.Name)) {
default:
return appsv1alpha1.OpsRunningPhase, nil
case appsv1alpha1.CFailedAndPausePhase:
Expand Down
1 change: 1 addition & 0 deletions deploy/helm/crds/apps.kubeblocks.io_configurations.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -310,6 +310,7 @@ spec:
phase:
description: phase is status of configurationItem.
enum:
- Creating
- Init
- Running
- Pending
Expand Down
38 changes: 2 additions & 36 deletions internal/controller/configuration/pipeline.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,6 @@ along with this program. If not, see <http://www.gnu.org/licenses/>.
package configuration

import (
"encoding/json"
"reflect"
"strconv"

corev1 "k8s.io/api/core/v1"
Expand Down Expand Up @@ -257,7 +255,7 @@ func (p *updatePipeline) isDone() bool {

func (p *updatePipeline) PrepareForTemplate() *updatePipeline {
buildTemplate := func() (err error) {
p.reconcile = !IsApplyConfigChanged(p.ConfigMapObj, p.item)
p.reconcile = !intctrlutil.IsApplyConfigChanged(p.ConfigMapObj, p.item)
if p.isDone() {
return
}
Expand All @@ -272,23 +270,6 @@ func (p *updatePipeline) PrepareForTemplate() *updatePipeline {
return p.Wrap(buildTemplate)
}

func IsApplyConfigChanged(cm *corev1.ConfigMap, item appsv1alpha1.ConfigurationItemDetail) bool {
if cm == nil {
return false
}

lastAppliedVersion, ok := cm.Annotations[constant.ConfigAppliedVersionAnnotationKey]
if !ok {
return false
}
var target appsv1alpha1.ConfigurationItemDetail
if err := json.Unmarshal([]byte(lastAppliedVersion), &target); err != nil {
return false
}

return reflect.DeepEqual(target, item)
}

func (p *updatePipeline) ConfigSpec() *appsv1alpha1.ComponentConfigSpec {
return p.configSpec
}
Expand All @@ -310,7 +291,7 @@ func (p *updatePipeline) RerenderTemplate() *updatePipeline {
if p.isDone() {
return
}
if needRerender(p.ConfigMapObj, p.item) {
if intctrlutil.IsRerender(p.ConfigMapObj, p.item) {
p.newCM, err = p.renderWrapper.rerenderConfigTemplate(p.ctx.Cluster, p.ctx.Component, *p.configSpec, &p.item)
} else {
p.newCM = p.ConfigMapObj.DeepCopy()
Expand Down Expand Up @@ -409,18 +390,3 @@ func (p *updatePipeline) SyncStatus() *updatePipeline {
return
})
}

func needRerender(obj *corev1.ConfigMap, item appsv1alpha1.ConfigurationItemDetail) bool {
if obj == nil {
return true
}
if item.Version == "" {
return false
}

version, ok := obj.Annotations[constant.CMConfigurationTemplateVersion]
if !ok || version != item.Version {
return true
}
return false
}
Loading

0 comments on commit 1f87a87

Please sign in to comment.