Skip to content

Commit

Permalink
chore: sync ops status for reconfiguring progress
Browse files Browse the repository at this point in the history
  • Loading branch information
sophon-zt committed Oct 20, 2023
1 parent 37945f7 commit 88677d9
Show file tree
Hide file tree
Showing 10 changed files with 194 additions and 198 deletions.
4 changes: 4 additions & 0 deletions apis/apps/v1alpha1/opsrequest_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -576,6 +576,10 @@ type ConfigurationItemStatus struct {
// +optional
Status string `json:"status,omitempty"`

// message describes the details about this operation.
// +optional
Message string `json:"message,omitempty"`

// succeedCount describes the number of successful reconfiguring.
// +kubebuilder:default=0
// +optional
Expand Down
3 changes: 3 additions & 0 deletions config/crd/bases/apps.kubeblocks.io_opsrequests.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -1041,6 +1041,9 @@ spec:
description: lastStatus describes the last status for the
reconfiguring controller.
type: string
message:
description: message describes the details about this operation.
type: string
name:
description: name is a config template name.
maxLength: 63
Expand Down
10 changes: 10 additions & 0 deletions controllers/apps/configuration/revision.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,16 @@ func GcRevision(annotations map[string]string) []ConfigurationRevision {
return revisions[0 : len(revisions)-revisionHistoryLimit]
}

func GetLastRevision(annotations map[string]string, revision int64) (ConfigurationRevision, bool) {
revisions := RetrieveRevision(annotations)
for i := len(revisions) - 1; i >= 0; i++ {
if revisions[i].Revision == revision {
return revisions[i], true
}
}
return ConfigurationRevision{}, false
}

func RetrieveRevision(annotations map[string]string) []ConfigurationRevision {
var revisions []ConfigurationRevision
var revisionPrefix = constant.LastConfigurationRevisionPhase + "-"
Expand Down
5 changes: 5 additions & 0 deletions controllers/apps/configuration/revision_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ import (
"github.com/apecloud/kubeblocks/pkg/configuration/core"
"github.com/apecloud/kubeblocks/pkg/constant"
"github.com/apecloud/kubeblocks/pkg/controller/builder"
intctrlutil "github.com/apecloud/kubeblocks/pkg/controllerutil"
)

func TestGcConfigRevision(t *testing.T) {
Expand Down Expand Up @@ -95,6 +96,10 @@ func TestParseRevision(t *testing.T) {
StrRevision: "120000",
Revision: 120000,
Phase: appsv1alpha1.CPendingPhase,
Result: intctrlutil.Result{
Phase: appsv1alpha1.CPendingPhase,
Revision: "120000",
},
},
wantErr: false,
}, {
Expand Down
224 changes: 84 additions & 140 deletions controllers/apps/operations/reconfigure.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,11 +22,13 @@ package operations
import (
"time"

"github.com/spf13/cast"
corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"sigs.k8s.io/controller-runtime/pkg/client"

appsv1alpha1 "github.com/apecloud/kubeblocks/apis/apps/v1alpha1"
"github.com/apecloud/kubeblocks/controllers/apps/configuration"
"github.com/apecloud/kubeblocks/pkg/configuration/core"
intctrlutil "github.com/apecloud/kubeblocks/pkg/controllerutil"
)
Expand All @@ -45,7 +47,7 @@ func init() {
OpsHandler: &reAction,
ProcessingReasonInClusterCondition: ProcessingReasonReconfiguring,
}
intctrlutil.ConfigEventHandlerMap["ops_status_reconfigure"] = &reAction
// intctrlutil.ConfigEventHandlerMap["ops_status_reconfigure"] = &reAction
opsManager.RegisterOps(appsv1alpha1.ReconfiguringType, reconfigureBehaviour)
}

Expand All @@ -58,85 +60,17 @@ func (r *reconfigureAction) SaveLastConfiguration(reqCtx intctrlutil.RequestCtx,
return nil
}

func (r *reconfigureAction) Handle(eventContext intctrlutil.ConfigEventContext, lastOpsRequest string, phase appsv1alpha1.OpsPhase, cfgError error) error {
var (
opsRequest = &appsv1alpha1.OpsRequest{}
cm = eventContext.ConfigMap
cli = eventContext.Client
ctx = eventContext.ReqCtx.Ctx
)

opsRes := &OpsResource{
OpsRequest: opsRequest,
Recorder: eventContext.ReqCtx.Recorder,
Cluster: eventContext.Cluster,
}

if len(lastOpsRequest) == 0 {
return nil
}
if err := cli.Get(ctx, client.ObjectKey{
Name: lastOpsRequest,
Namespace: cm.Namespace,
}, opsRequest); err != nil {
return err
}

opsDeepCopy := opsRequest.DeepCopy()
if err := patchReconfigureOpsStatus(opsRes, eventContext.ConfigSpecName,
handleReconfigureStatusProgress(eventContext.PolicyStatus, phase, &opsRequest.Status)); err != nil {
return err
}

switch phase {
case appsv1alpha1.OpsSucceedPhase:
// only update the condition of the opsRequest.
eventContext.ReqCtx.Recorder.Eventf(opsRequest,
corev1.EventTypeNormal,
appsv1alpha1.ReasonReconfigureSucceed,
"the reconfigure has been processed successfully")
return PatchOpsStatusWithOpsDeepCopy(ctx, cli, opsRes, opsDeepCopy, appsv1alpha1.OpsRunningPhase,
appsv1alpha1.NewReconfigureRunningCondition(opsRequest,
appsv1alpha1.ReasonReconfigureSucceed,
eventContext.ConfigSpecName,
formatConfigPatchToMessage(eventContext.ConfigPatch, &eventContext.PolicyStatus)),
appsv1alpha1.NewSucceedCondition(opsRequest))
case appsv1alpha1.OpsFailedPhase:
eventContext.ReqCtx.Recorder.Eventf(opsRequest,
corev1.EventTypeWarning,
appsv1alpha1.ReasonReconfigureFailed,
"failed to process the reconfigure, error: %v", cfgError)
return PatchOpsStatusWithOpsDeepCopy(ctx, cli, opsRes, opsDeepCopy, appsv1alpha1.OpsRunningPhase,
appsv1alpha1.NewReconfigureRunningCondition(opsRequest,
appsv1alpha1.ReasonReconfigureFailed,
eventContext.ConfigSpecName,
formatConfigPatchToMessage(eventContext.ConfigPatch, &eventContext.PolicyStatus)),
appsv1alpha1.NewReconfigureFailedCondition(opsRequest, cfgError))
default:
return PatchOpsStatusWithOpsDeepCopy(ctx, cli, opsRes, opsDeepCopy, appsv1alpha1.OpsRunningPhase,
appsv1alpha1.NewReconfigureRunningCondition(opsRequest,
appsv1alpha1.ReasonReconfigureRunning,
eventContext.ConfigSpecName))
}
}

func handleReconfigureStatusProgress(execStatus core.PolicyExecStatus, phase appsv1alpha1.OpsPhase, opsStatus *appsv1alpha1.OpsRequestStatus) handleReconfigureOpsStatus {
func handleReconfigureStatusProgress(result intctrlutil.Result, opsStatus *appsv1alpha1.OpsRequestStatus) handleReconfigureOpsStatus {
return func(cmStatus *appsv1alpha1.ConfigurationItemStatus) (err error) {
cmStatus.LastAppliedStatus = execStatus.ExecStatus
cmStatus.UpdatePolicy = appsv1alpha1.UpgradePolicy(execStatus.PolicyName)
cmStatus.SucceedCount = execStatus.SucceedCount
cmStatus.ExpectedCount = execStatus.ExpectedCount
cmStatus.LastAppliedStatus = result.ExecResult
cmStatus.UpdatePolicy = appsv1alpha1.UpgradePolicy(result.Policy)
cmStatus.SucceedCount = result.SucceedCount
cmStatus.ExpectedCount = result.ExpectedCount
cmStatus.Message = result.Message
cmStatus.Status = string(result.Phase)
if cmStatus.SucceedCount != core.Unconfirmed && cmStatus.ExpectedCount != core.Unconfirmed {
opsStatus.Progress = getSlowestReconfiguringProgress(opsStatus.ReconfiguringStatus.ConfigurationStatus)
}
switch phase {
case appsv1alpha1.OpsSucceedPhase:
cmStatus.Status = appsv1alpha1.ReasonReconfigureSucceed
case appsv1alpha1.OpsFailedPhase:
cmStatus.Status = appsv1alpha1.ReasonReconfigureFailed
default:
cmStatus.Status = appsv1alpha1.ReasonReconfigureRunning
}
return
}
}
Expand All @@ -156,48 +90,11 @@ func handleNewReconfigureRequest(configPatch *core.ConfigPatchInfo, lastAppliedC
}
}

func (r *reconfigureAction) ReconcileAction(reqCtx intctrlutil.RequestCtx, cli client.Client, opsRes *OpsResource) (appsv1alpha1.OpsPhase, time.Duration, error) {
status := opsRes.OpsRequest.Status
if len(status.Conditions) == 0 {
return status.Phase, 30 * time.Second, nil
}
condition := status.Conditions[len(status.Conditions)-1]
isNoChanged := isNoChange(condition)
if isSucceedPhase(condition) || isNoChanged {
// TODO Sync reload progress from config manager.
return appsv1alpha1.OpsSucceedPhase, 0, nil
}
if isFailedPhase(condition) {
// TODO Sync reload progress from config manager.
return appsv1alpha1.OpsFailedPhase, 0, nil
}
if !isRunningPhase(condition) {
return appsv1alpha1.OpsRunningPhase, 30 * time.Second, nil
}

ops := &opsRes.OpsRequest.Spec
if ops.Reconfigure == nil || len(ops.Reconfigure.Configurations) == 0 {
return appsv1alpha1.OpsFailedPhase, 0, nil
}
phase, err := r.syncReconfigureOperatorStatus(reqCtx, cli, opsRes)
switch {
default:
return appsv1alpha1.OpsRunningPhase, 30 * time.Second, nil
case err != nil:
return "", 30 * time.Second, err
case phase == appsv1alpha1.OpsFailedPhase:
return appsv1alpha1.OpsFailedPhase, 0, err
case phase == appsv1alpha1.OpsSucceedPhase:
return appsv1alpha1.OpsSucceedPhase, 0, nil
}
}

func (r *reconfigureAction) syncReconfigureOperatorStatus(ctx intctrlutil.RequestCtx, cli client.Client, opsRes *OpsResource) (appsv1alpha1.OpsPhase, error) {

func (r *reconfigureAction) syncStatus(reqCtx intctrlutil.RequestCtx, cli client.Client, opsRes *OpsResource) (*intctrlutil.Fetcher, error) {
ops := &opsRes.OpsRequest.Spec
configSpec := ops.Reconfigure.Configurations[0]
fetcher := intctrlutil.NewResourceFetcher(&intctrlutil.ResourceCtx{
Context: ctx.Ctx,
Context: reqCtx.Ctx,
Client: cli,
Namespace: opsRes.Cluster.Namespace,
ClusterName: ops.ClusterRef,
Expand All @@ -211,22 +108,63 @@ func (r *reconfigureAction) syncReconfigureOperatorStatus(ctx intctrlutil.Reques
ConfigMap(configSpec.Name).
Complete()
if err != nil {
return appsv1alpha1.OpsRunningPhase, err
return nil, err
}
return fetcher, nil
}

func (r *reconfigureAction) ReconcileAction(reqCtx intctrlutil.RequestCtx, cli client.Client, opsRes *OpsResource) (appsv1alpha1.OpsPhase, time.Duration, error) {
status := opsRes.OpsRequest.Status
if len(status.Conditions) == 0 {
return status.Phase, 30 * time.Second, nil
}
if isNoChange(status.Conditions) {
return appsv1alpha1.OpsSucceedPhase, 0, nil
}

item := fetcher.ConfigurationObj.Spec.GetConfigurationItem(configSpec.Name)
ops := &opsRes.OpsRequest.Spec
if ops.Reconfigure == nil || len(ops.Reconfigure.Configurations) == 0 {
return appsv1alpha1.OpsFailedPhase, 0, nil
}

resource, err := r.syncStatus(reqCtx, cli, opsRes)
if err != nil {
return "", 30 * time.Second, err
}
configSpec := ops.Reconfigure.Configurations[0]
item := resource.ConfigurationObj.Spec.GetConfigurationItem(configSpec.Name)
if item == nil {
return appsv1alpha1.OpsRunningPhase, nil
return appsv1alpha1.OpsRunningPhase, 30 * time.Second, nil
}

phase := intctrlutil.GetConfigSpecReconcilePhase(
resource.ConfigMapObj,
*item,
resource.ConfigurationObj.Status.GetItemStatus(configSpec.Name),
)
if phase == appsv1alpha1.CCreatingPhase {
return appsv1alpha1.OpsFailedPhase, 0, core.MakeError("the configuration is creating")
}
if phase == appsv1alpha1.CFailedAndPausePhase {
return appsv1alpha1.OpsFailedPhase, 0, nil
}
if phase == appsv1alpha1.CFinishedPhase {
return appsv1alpha1.OpsSucceedPhase, 0, nil
}

switch intctrlutil.GetConfigSpecReconcilePhase(fetcher.ConfigMapObj, *item, fetcher.ConfigurationObj.Status.GetItemStatus(configSpec.Name)) {
default:
return appsv1alpha1.OpsRunningPhase, nil
case appsv1alpha1.CFailedAndPausePhase:
return appsv1alpha1.OpsFailedPhase, nil
case appsv1alpha1.CFinishedPhase:
return appsv1alpha1.OpsSucceedPhase, nil
result := fromReconfiguringResult(resource.ConfigMapObj, resource.ConfigurationObj.GetGeneration(), phase)
opsDeepCopy := opsRes.OpsRequest.DeepCopy()
if err := patchReconfigureOpsStatus(opsRes, configSpec.Name,
handleReconfigureStatusProgress(result, &opsRes.OpsRequest.Status)); err != nil {
return "", 30 * time.Second, err
}
if err := PatchOpsStatusWithOpsDeepCopy(reqCtx.Ctx, cli, opsRes, opsDeepCopy, appsv1alpha1.OpsRunningPhase,
appsv1alpha1.NewReconfigureRunningCondition(opsRes.OpsRequest,
appsv1alpha1.ReasonReconfigureRunning,
configSpec.Name)); err != nil {
return "", 30 * time.Second, err
}
return appsv1alpha1.OpsRunningPhase, 30 * time.Second, nil
}

func isExpectedPhase(condition metav1.Condition, expectedTypes []string, expectedStatus metav1.ConditionStatus) bool {
Expand All @@ -238,21 +176,13 @@ func isExpectedPhase(condition metav1.Condition, expectedTypes []string, expecte
return false
}

func isSucceedPhase(condition metav1.Condition) bool {
return isExpectedPhase(condition, []string{appsv1alpha1.ConditionTypeSucceed, appsv1alpha1.ReasonReconfigureSucceed}, metav1.ConditionTrue)
}

func isNoChange(condition metav1.Condition) bool {
return isExpectedPhase(condition, []string{appsv1alpha1.ReasonReconfigureNoChanged}, metav1.ConditionTrue)
}

func isFailedPhase(condition metav1.Condition) bool {
return isExpectedPhase(condition, []string{appsv1alpha1.ConditionTypeFailed, appsv1alpha1.ReasonReconfigureFailed}, metav1.ConditionFalse)
}

func isRunningPhase(condition metav1.Condition) bool {
return isExpectedPhase(condition, []string{appsv1alpha1.ReasonReconfigureRunning, appsv1alpha1.ReasonReconfigureMerged},
metav1.ConditionTrue)
func isNoChange(conditions []metav1.Condition) bool {
for i := len(conditions); i > 0; i-- {
if isExpectedPhase(conditions[i-1], []string{appsv1alpha1.ReasonReconfigureNoChanged}, metav1.ConditionTrue) {
return true
}
}
return false
}

func (r *reconfigureAction) Action(reqCtx intctrlutil.RequestCtx, cli client.Client, resource *OpsResource) error {
Expand Down Expand Up @@ -325,3 +255,17 @@ func needReconfigure(request *appsv1alpha1.OpsRequest) bool {
}
return true
}

func fromReconfiguringResult(obj *corev1.ConfigMap, revision int64, phase appsv1alpha1.ConfigurationPhase) intctrlutil.Result {
result, ok := configuration.GetLastRevision(obj.ObjectMeta.Annotations, revision)
if ok {
return result.Result
}

return intctrlutil.Result{
Phase: phase,
Revision: cast.ToString(revision),
SucceedCount: core.Unconfirmed,
ExpectedCount: core.Unconfirmed,
}
}
Loading

0 comments on commit 88677d9

Please sign in to comment.