Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

chore: refine configuration reconcile and phase #5311

Merged
merged 2 commits into from
Oct 7, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion apis/apps/v1alpha1/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -72,10 +72,11 @@ package v1alpha1

// ConfigurationPhase defines the Configuration FSM phase
// +enum
// +kubebuilder:validation:Enum={Init,Running,Pending,Merged,MergeFailed,FailedAndPause,Upgrading,Deleting,FailedAndRetry,Finished}
// +kubebuilder:validation:Enum={Creating,Init,Running,Pending,Merged,MergeFailed,FailedAndPause,Upgrading,Deleting,FailedAndRetry,Finished}
type ConfigurationPhase string

const (
CCreatingPhase ConfigurationPhase = "Creating"
CInitPhase ConfigurationPhase = "Init"
CRunningPhase ConfigurationPhase = "Running"
CPendingPhase ConfigurationPhase = "Pending"
Expand Down
1 change: 1 addition & 0 deletions config/crd/bases/apps.kubeblocks.io_configurations.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -310,6 +310,7 @@ spec:
phase:
description: phase is status of configurationItem.
enum:
- Creating
- Init
- Running
- Pending
Expand Down
62 changes: 29 additions & 33 deletions controllers/apps/configuration/configuration_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -107,7 +107,7 @@ func (r *ConfigurationReconciler) Reconcile(ctx context.Context, req ctrl.Reques
return r.failWithInvalidComponent(configuration, reqCtx)
}

if err := r.runTasks(reqCtx, configuration, fetcherTask, tasks); err != nil {
if err := r.runTasks(TaskContext{configuration, reqCtx, fetcherTask}, tasks); err != nil {
return intctrlutil.CheckedRequeueWithError(err, reqCtx.Log, "failed to run configuration reconcile task.")
}
if !isAllReady(configuration) {
Expand All @@ -130,55 +130,52 @@ func (r *ConfigurationReconciler) failWithInvalidComponent(configuration *appsv1
func isAllReady(configuration *appsv1alpha1.Configuration) bool {
for _, item := range configuration.Spec.ConfigItemDetails {
itemStatus := configuration.Status.GetItemStatus(item.Name)
if itemStatus == nil || itemStatus.Phase != appsv1alpha1.CFinishedPhase {
if itemStatus != nil && !isFinishStatus(itemStatus.Phase) {
return false
}
}
return true
}

func (r *ConfigurationReconciler) runTasks(
reqCtx intctrlutil.RequestCtx,
configuration *appsv1alpha1.Configuration,
fetcher *Task,
tasks []Task) (err error) {
var errs []error
var synthesizedComp *component.SynthesizedComponent

synthesizedComp, err = component.BuildComponent(reqCtx, nil,
fetcher.ClusterObj,
fetcher.ClusterDefObj,
fetcher.ClusterDefComObj,
fetcher.ClusterComObj,
func (r *ConfigurationReconciler) runTasks(taskCtx TaskContext, tasks []Task) (err error) {
var (
errs []error
synthesizedComp *component.SynthesizedComponent

ctx = taskCtx.reqCtx.Ctx
configuration = taskCtx.configuration
)

synthesizedComp, err = component.BuildComponent(taskCtx.reqCtx,
nil,
taskCtx.fetcher.ClusterObj,
taskCtx.fetcher.ClusterDefObj,
taskCtx.fetcher.ClusterDefComObj,
taskCtx.fetcher.ClusterComObj,
nil,
fetcher.ClusterVerComObj)
taskCtx.fetcher.ClusterVerComObj)
if err != nil {
return err
}

// TODO manager multiple version
patch := client.MergeFrom(configuration.DeepCopy())
revision := strconv.FormatInt(configuration.GetGeneration(), 10)
for _, task := range tasks {
if err := task.Do(fetcher, synthesizedComp, revision); err != nil {
task.Status.UpdateRevision = revision
if err := task.Do(taskCtx.fetcher, synthesizedComp, revision); err != nil {
task.Status.Phase = appsv1alpha1.CMergeFailedPhase
task.Status.Message = cfgutil.ToPointer(err.Error())
errs = append(errs, err)
continue
}
task.Status.UpdateRevision = revision
task.Status.Phase = appsv1alpha1.CMergedPhase
if err := task.SyncStatus(fetcher, task.Status); err != nil {
task.Status.Phase = appsv1alpha1.CFailedPhase
task.Status.Message = cfgutil.ToPointer(err.Error())
errs = append(errs, err)
}
}

configuration.Status.Message = ""
if len(errs) > 0 {
configuration.Status.Message = utilerrors.NewAggregate(errs).Error()
}
if err := r.Client.Status().Patch(reqCtx.Ctx, configuration, patch); err != nil {
if err := r.Client.Status().Patch(ctx, configuration, patch); err != nil {
errs = append(errs, err)
}
if len(errs) == 0 {
Expand Down Expand Up @@ -216,12 +213,11 @@ func fromItemStatus(ctx intctrlutil.RequestCtx, status *appsv1alpha1.Configurati
}

func isReconcileStatus(phase appsv1alpha1.ConfigurationPhase) bool {
return phase == appsv1alpha1.CRunningPhase ||
phase == appsv1alpha1.CInitPhase ||
phase == appsv1alpha1.CPendingPhase ||
phase == appsv1alpha1.CFailedPhase ||
phase == appsv1alpha1.CMergedPhase ||
phase == appsv1alpha1.CMergeFailedPhase ||
phase == appsv1alpha1.CUpgradingPhase ||
phase == appsv1alpha1.CFinishedPhase
return phase != "" &&
phase != appsv1alpha1.CCreatingPhase &&
phase != appsv1alpha1.CDeletingPhase
}

func isFinishStatus(phase appsv1alpha1.ConfigurationPhase) bool {
return phase == appsv1alpha1.CFinishedPhase || phase == appsv1alpha1.CFailedAndPausePhase
}
77 changes: 53 additions & 24 deletions controllers/apps/configuration/reconcile_task.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,8 @@ package configuration
import (
"strconv"

corev1 "k8s.io/api/core/v1"

appsv1alpha1 "github.com/apecloud/kubeblocks/apis/apps/v1alpha1"
"github.com/apecloud/kubeblocks/internal/configuration/core"
"github.com/apecloud/kubeblocks/internal/controller/component"
Expand All @@ -35,46 +37,73 @@ type Task struct {
Status *appsv1alpha1.ConfigurationItemDetailStatus
Name string

Do func(fetcher *Task, component *component.SynthesizedComponent, revision string) error
SyncStatus func(fetcher *Task, status *appsv1alpha1.ConfigurationItemDetailStatus) error
Do func(fetcher *Task, component *component.SynthesizedComponent, revision string) error
}

type TaskContext struct {
configuration *appsv1alpha1.Configuration
reqCtx intctrlutil.RequestCtx
fetcher *Task
}

func NewTask(item appsv1alpha1.ConfigurationItemDetail, status *appsv1alpha1.ConfigurationItemDetailStatus) Task {
return Task{
Name: item.Name,
Status: status,
Name: item.Name,
Do: func(fetcher *Task, synComponent *component.SynthesizedComponent, revision string) error {
configSpec := item.ConfigSpec
if configSpec == nil {
return core.MakeError("not found config spec: %s", item.Name)
}
reconcileTask := configuration.NewReconcilePipeline(configuration.ReconcileCtx{
ResourceCtx: fetcher.ResourceCtx,
Cluster: fetcher.ClusterObj,
ClusterVer: fetcher.ClusterVerObj,
Component: synComponent,
PodSpec: synComponent.PodSpec,
}, item, status, configSpec)
return reconcileTask.ConfigMap(item.Name).
ConfigConstraints(configSpec.ConfigConstraintRef).
PrepareForTemplate().
RerenderTemplate().
ApplyParameters().
UpdateConfigVersion(revision).
Sync().
Complete()
if err := fetcher.ConfigMap(item.Name).Complete(); err != nil {
return err
}
// Do reconcile for config template
configMap := fetcher.ConfigMapObj
switch intctrlutil.GetConfigSpecReconcilePhase(configMap, item, status) {
default:
return syncStatus(configMap, status)
case appsv1alpha1.CPendingPhase,
appsv1alpha1.CMergeFailedPhase:
return syncImpl(fetcher, item, status, synComponent, revision, configSpec)
case appsv1alpha1.CCreatingPhase:
return nil
}
},
SyncStatus: syncStatus,
Status: status,
}
}

func syncStatus(fetcher *Task, status *appsv1alpha1.ConfigurationItemDetailStatus) (err error) {
err = fetcher.ConfigMap(status.Name).Complete()
func syncImpl(fetcher *Task,
item appsv1alpha1.ConfigurationItemDetail,
status *appsv1alpha1.ConfigurationItemDetailStatus,
component *component.SynthesizedComponent,
revision string,
configSpec *appsv1alpha1.ComponentConfigSpec) (err error) {
err = configuration.NewReconcilePipeline(configuration.ReconcileCtx{
ResourceCtx: fetcher.ResourceCtx,
Cluster: fetcher.ClusterObj,
ClusterVer: fetcher.ClusterVerObj,
Component: component,
PodSpec: component.PodSpec,
}, item, status, configSpec).
ConfigMap(item.Name).
ConfigConstraints(configSpec.ConfigConstraintRef).
PrepareForTemplate().
RerenderTemplate().
ApplyParameters().
UpdateConfigVersion(revision).
Sync().
Complete()
if err != nil {
return
status.Phase = appsv1alpha1.CMergeFailedPhase
} else {
status.Phase = appsv1alpha1.CMergedPhase
}
return
}

annotations := fetcher.ConfigMapObj.GetAnnotations()
func syncStatus(configMap *corev1.ConfigMap, status *appsv1alpha1.ConfigurationItemDetailStatus) (err error) {
annotations := configMap.GetAnnotations()
// status.CurrentRevision = GetCurrentRevision(annotations)
revisions := RetrieveRevision(annotations)
if len(revisions) == 0 {
Expand Down
9 changes: 2 additions & 7 deletions controllers/apps/operations/reconfigure.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,6 @@ import (

appsv1alpha1 "github.com/apecloud/kubeblocks/apis/apps/v1alpha1"
"github.com/apecloud/kubeblocks/internal/configuration/core"
"github.com/apecloud/kubeblocks/internal/controller/configuration"
intctrlutil "github.com/apecloud/kubeblocks/internal/controllerutil"
)

Expand Down Expand Up @@ -216,15 +215,11 @@ func (r *reconfigureAction) syncReconfigureOperatorStatus(ctx intctrlutil.Reques
}

item := fetcher.ConfigurationObj.Spec.GetConfigurationItem(configSpec.Name)
status := fetcher.ConfigurationObj.Status.GetItemStatus(configSpec.Name)
if status == nil || item == nil {
if item == nil {
return appsv1alpha1.OpsRunningPhase, nil
}

if !configuration.IsApplyConfigChanged(fetcher.ConfigMapObj, *item) {
return appsv1alpha1.OpsRunningPhase, nil
}
switch status.Phase {
switch intctrlutil.GetConfigSpecReconcilePhase(fetcher.ConfigMapObj, *item, fetcher.ConfigurationObj.Status.GetItemStatus(configSpec.Name)) {
default:
return appsv1alpha1.OpsRunningPhase, nil
case appsv1alpha1.CFailedAndPausePhase:
Expand Down
1 change: 1 addition & 0 deletions deploy/helm/crds/apps.kubeblocks.io_configurations.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -310,6 +310,7 @@ spec:
phase:
description: phase is status of configurationItem.
enum:
- Creating
- Init
- Running
- Pending
Expand Down
38 changes: 2 additions & 36 deletions internal/controller/configuration/pipeline.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,6 @@ along with this program. If not, see <http://www.gnu.org/licenses/>.
package configuration

import (
"encoding/json"
"reflect"
"strconv"

corev1 "k8s.io/api/core/v1"
Expand Down Expand Up @@ -257,7 +255,7 @@ func (p *updatePipeline) isDone() bool {

func (p *updatePipeline) PrepareForTemplate() *updatePipeline {
buildTemplate := func() (err error) {
p.reconcile = !IsApplyConfigChanged(p.ConfigMapObj, p.item)
p.reconcile = !intctrlutil.IsApplyConfigChanged(p.ConfigMapObj, p.item)
if p.isDone() {
return
}
Expand All @@ -272,23 +270,6 @@ func (p *updatePipeline) PrepareForTemplate() *updatePipeline {
return p.Wrap(buildTemplate)
}

func IsApplyConfigChanged(cm *corev1.ConfigMap, item appsv1alpha1.ConfigurationItemDetail) bool {
if cm == nil {
return false
}

lastAppliedVersion, ok := cm.Annotations[constant.ConfigAppliedVersionAnnotationKey]
if !ok {
return false
}
var target appsv1alpha1.ConfigurationItemDetail
if err := json.Unmarshal([]byte(lastAppliedVersion), &target); err != nil {
return false
}

return reflect.DeepEqual(target, item)
}

func (p *updatePipeline) ConfigSpec() *appsv1alpha1.ComponentConfigSpec {
return p.configSpec
}
Expand All @@ -310,7 +291,7 @@ func (p *updatePipeline) RerenderTemplate() *updatePipeline {
if p.isDone() {
return
}
if needRerender(p.ConfigMapObj, p.item) {
if intctrlutil.IsRerender(p.ConfigMapObj, p.item) {
p.newCM, err = p.renderWrapper.rerenderConfigTemplate(p.ctx.Cluster, p.ctx.Component, *p.configSpec, &p.item)
} else {
p.newCM = p.ConfigMapObj.DeepCopy()
Expand Down Expand Up @@ -409,18 +390,3 @@ func (p *updatePipeline) SyncStatus() *updatePipeline {
return
})
}

func needRerender(obj *corev1.ConfigMap, item appsv1alpha1.ConfigurationItemDetail) bool {
if obj == nil {
return true
}
if item.Version == "" {
return false
}

version, ok := obj.Annotations[constant.CMConfigurationTemplateVersion]
if !ok || version != item.Version {
return true
}
return false
}
Loading