Skip to content

Commit

Permalink
Merge branch 'main' into support/improve-backup-refactor
Browse files Browse the repository at this point in the history
  • Loading branch information
ldming committed Oct 7, 2023
2 parents 84e5280 + 1f87a87 commit 105f2d5
Show file tree
Hide file tree
Showing 20 changed files with 315 additions and 128 deletions.
2 changes: 1 addition & 1 deletion .github/workflows/cicd-pull-request.yml
Original file line number Diff line number Diff line change
Expand Up @@ -156,7 +156,7 @@ jobs:
if: contains(needs.trigger-mode.outputs.trigger-mode, '[docker]')
uses: apecloud/apecloud-cd/.github/workflows/[email protected]
with:
MAKE_OPS_PRE: "generate test-go-generate"
MAKE_OPS_PRE: "module generate test-go-generate"
IMG: "apecloud/kubeblocks-tools"
GO_VERSION: "1.21"
BUILDX_PLATFORMS: "linux/amd64"
Expand Down
2 changes: 1 addition & 1 deletion .github/workflows/cicd-push.yml
Original file line number Diff line number Diff line change
Expand Up @@ -205,7 +205,7 @@ jobs:
if: ${{ contains(needs.trigger-mode.outputs.trigger-mode, '[docker]') }}
uses: apecloud/apecloud-cd/.github/workflows/[email protected]
with:
MAKE_OPS_PRE: "generate test-go-generate"
MAKE_OPS_PRE: "module generate test-go-generate"
IMG: "apecloud/kubeblocks-tools"
GO_VERSION: "1.21"
BUILDX_PLATFORMS: "linux/amd64"
Expand Down
2 changes: 1 addition & 1 deletion .github/workflows/release-image.yml
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ jobs:
needs: image-tag
uses: apecloud/apecloud-cd/.github/workflows/[email protected]
with:
MAKE_OPS_PRE: "generate test-go-generate"
MAKE_OPS_PRE: "module generate test-go-generate"
IMG: "apecloud/kubeblocks-tools"
VERSION: "${{ needs.image-tag.outputs.tag-name }}"
GO_VERSION: "1.21"
Expand Down
3 changes: 2 additions & 1 deletion apis/apps/v1alpha1/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -72,10 +72,11 @@ package v1alpha1

// ConfigurationPhase defines the Configuration FSM phase
// +enum
// +kubebuilder:validation:Enum={Init,Running,Pending,Merged,MergeFailed,FailedAndPause,Upgrading,Deleting,FailedAndRetry,Finished}
// +kubebuilder:validation:Enum={Creating,Init,Running,Pending,Merged,MergeFailed,FailedAndPause,Upgrading,Deleting,FailedAndRetry,Finished}
type ConfigurationPhase string

const (
CCreatingPhase ConfigurationPhase = "Creating"
CInitPhase ConfigurationPhase = "Init"
CRunningPhase ConfigurationPhase = "Running"
CPendingPhase ConfigurationPhase = "Pending"
Expand Down
1 change: 1 addition & 0 deletions config/crd/bases/apps.kubeblocks.io_configurations.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -310,6 +310,7 @@ spec:
phase:
description: phase is status of configurationItem.
enum:
- Creating
- Init
- Running
- Pending
Expand Down
62 changes: 29 additions & 33 deletions controllers/apps/configuration/configuration_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -107,7 +107,7 @@ func (r *ConfigurationReconciler) Reconcile(ctx context.Context, req ctrl.Reques
return r.failWithInvalidComponent(configuration, reqCtx)
}

if err := r.runTasks(reqCtx, configuration, fetcherTask, tasks); err != nil {
if err := r.runTasks(TaskContext{configuration, reqCtx, fetcherTask}, tasks); err != nil {
return intctrlutil.CheckedRequeueWithError(err, reqCtx.Log, "failed to run configuration reconcile task.")
}
if !isAllReady(configuration) {
Expand All @@ -130,55 +130,52 @@ func (r *ConfigurationReconciler) failWithInvalidComponent(configuration *appsv1
func isAllReady(configuration *appsv1alpha1.Configuration) bool {
for _, item := range configuration.Spec.ConfigItemDetails {
itemStatus := configuration.Status.GetItemStatus(item.Name)
if itemStatus == nil || itemStatus.Phase != appsv1alpha1.CFinishedPhase {
if itemStatus != nil && !isFinishStatus(itemStatus.Phase) {
return false
}
}
return true
}

func (r *ConfigurationReconciler) runTasks(
reqCtx intctrlutil.RequestCtx,
configuration *appsv1alpha1.Configuration,
fetcher *Task,
tasks []Task) (err error) {
var errs []error
var synthesizedComp *component.SynthesizedComponent

synthesizedComp, err = component.BuildComponent(reqCtx, nil,
fetcher.ClusterObj,
fetcher.ClusterDefObj,
fetcher.ClusterDefComObj,
fetcher.ClusterComObj,
func (r *ConfigurationReconciler) runTasks(taskCtx TaskContext, tasks []Task) (err error) {
var (
errs []error
synthesizedComp *component.SynthesizedComponent

ctx = taskCtx.reqCtx.Ctx
configuration = taskCtx.configuration
)

synthesizedComp, err = component.BuildComponent(taskCtx.reqCtx,
nil,
taskCtx.fetcher.ClusterObj,
taskCtx.fetcher.ClusterDefObj,
taskCtx.fetcher.ClusterDefComObj,
taskCtx.fetcher.ClusterComObj,
nil,
fetcher.ClusterVerComObj)
taskCtx.fetcher.ClusterVerComObj)
if err != nil {
return err
}

// TODO manager multiple version
patch := client.MergeFrom(configuration.DeepCopy())
revision := strconv.FormatInt(configuration.GetGeneration(), 10)
for _, task := range tasks {
if err := task.Do(fetcher, synthesizedComp, revision); err != nil {
task.Status.UpdateRevision = revision
if err := task.Do(taskCtx.fetcher, synthesizedComp, revision); err != nil {
task.Status.Phase = appsv1alpha1.CMergeFailedPhase
task.Status.Message = cfgutil.ToPointer(err.Error())
errs = append(errs, err)
continue
}
task.Status.UpdateRevision = revision
task.Status.Phase = appsv1alpha1.CMergedPhase
if err := task.SyncStatus(fetcher, task.Status); err != nil {
task.Status.Phase = appsv1alpha1.CFailedPhase
task.Status.Message = cfgutil.ToPointer(err.Error())
errs = append(errs, err)
}
}

configuration.Status.Message = ""
if len(errs) > 0 {
configuration.Status.Message = utilerrors.NewAggregate(errs).Error()
}
if err := r.Client.Status().Patch(reqCtx.Ctx, configuration, patch); err != nil {
if err := r.Client.Status().Patch(ctx, configuration, patch); err != nil {
errs = append(errs, err)
}
if len(errs) == 0 {
Expand Down Expand Up @@ -216,12 +213,11 @@ func fromItemStatus(ctx intctrlutil.RequestCtx, status *appsv1alpha1.Configurati
}

func isReconcileStatus(phase appsv1alpha1.ConfigurationPhase) bool {
return phase == appsv1alpha1.CRunningPhase ||
phase == appsv1alpha1.CInitPhase ||
phase == appsv1alpha1.CPendingPhase ||
phase == appsv1alpha1.CFailedPhase ||
phase == appsv1alpha1.CMergedPhase ||
phase == appsv1alpha1.CMergeFailedPhase ||
phase == appsv1alpha1.CUpgradingPhase ||
phase == appsv1alpha1.CFinishedPhase
return phase != "" &&
phase != appsv1alpha1.CCreatingPhase &&
phase != appsv1alpha1.CDeletingPhase
}

func isFinishStatus(phase appsv1alpha1.ConfigurationPhase) bool {
return phase == appsv1alpha1.CFinishedPhase || phase == appsv1alpha1.CFailedAndPausePhase
}
77 changes: 53 additions & 24 deletions controllers/apps/configuration/reconcile_task.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,8 @@ package configuration
import (
"strconv"

corev1 "k8s.io/api/core/v1"

appsv1alpha1 "github.com/apecloud/kubeblocks/apis/apps/v1alpha1"
"github.com/apecloud/kubeblocks/internal/configuration/core"
"github.com/apecloud/kubeblocks/internal/controller/component"
Expand All @@ -35,46 +37,73 @@ type Task struct {
Status *appsv1alpha1.ConfigurationItemDetailStatus
Name string

Do func(fetcher *Task, component *component.SynthesizedComponent, revision string) error
SyncStatus func(fetcher *Task, status *appsv1alpha1.ConfigurationItemDetailStatus) error
Do func(fetcher *Task, component *component.SynthesizedComponent, revision string) error
}

type TaskContext struct {
configuration *appsv1alpha1.Configuration
reqCtx intctrlutil.RequestCtx
fetcher *Task
}

func NewTask(item appsv1alpha1.ConfigurationItemDetail, status *appsv1alpha1.ConfigurationItemDetailStatus) Task {
return Task{
Name: item.Name,
Status: status,
Name: item.Name,
Do: func(fetcher *Task, synComponent *component.SynthesizedComponent, revision string) error {
configSpec := item.ConfigSpec
if configSpec == nil {
return core.MakeError("not found config spec: %s", item.Name)
}
reconcileTask := configuration.NewReconcilePipeline(configuration.ReconcileCtx{
ResourceCtx: fetcher.ResourceCtx,
Cluster: fetcher.ClusterObj,
ClusterVer: fetcher.ClusterVerObj,
Component: synComponent,
PodSpec: synComponent.PodSpec,
}, item, status, configSpec)
return reconcileTask.ConfigMap(item.Name).
ConfigConstraints(configSpec.ConfigConstraintRef).
PrepareForTemplate().
RerenderTemplate().
ApplyParameters().
UpdateConfigVersion(revision).
Sync().
Complete()
if err := fetcher.ConfigMap(item.Name).Complete(); err != nil {
return err
}
// Do reconcile for config template
configMap := fetcher.ConfigMapObj
switch intctrlutil.GetConfigSpecReconcilePhase(configMap, item, status) {
default:
return syncStatus(configMap, status)
case appsv1alpha1.CPendingPhase,
appsv1alpha1.CMergeFailedPhase:
return syncImpl(fetcher, item, status, synComponent, revision, configSpec)
case appsv1alpha1.CCreatingPhase:
return nil
}
},
SyncStatus: syncStatus,
Status: status,
}
}

func syncStatus(fetcher *Task, status *appsv1alpha1.ConfigurationItemDetailStatus) (err error) {
err = fetcher.ConfigMap(status.Name).Complete()
func syncImpl(fetcher *Task,
item appsv1alpha1.ConfigurationItemDetail,
status *appsv1alpha1.ConfigurationItemDetailStatus,
component *component.SynthesizedComponent,
revision string,
configSpec *appsv1alpha1.ComponentConfigSpec) (err error) {
err = configuration.NewReconcilePipeline(configuration.ReconcileCtx{
ResourceCtx: fetcher.ResourceCtx,
Cluster: fetcher.ClusterObj,
ClusterVer: fetcher.ClusterVerObj,
Component: component,
PodSpec: component.PodSpec,
}, item, status, configSpec).
ConfigMap(item.Name).
ConfigConstraints(configSpec.ConfigConstraintRef).
PrepareForTemplate().
RerenderTemplate().
ApplyParameters().
UpdateConfigVersion(revision).
Sync().
Complete()
if err != nil {
return
status.Phase = appsv1alpha1.CMergeFailedPhase
} else {
status.Phase = appsv1alpha1.CMergedPhase
}
return
}

annotations := fetcher.ConfigMapObj.GetAnnotations()
func syncStatus(configMap *corev1.ConfigMap, status *appsv1alpha1.ConfigurationItemDetailStatus) (err error) {
annotations := configMap.GetAnnotations()
// status.CurrentRevision = GetCurrentRevision(annotations)
revisions := RetrieveRevision(annotations)
if len(revisions) == 0 {
Expand Down
9 changes: 2 additions & 7 deletions controllers/apps/operations/reconfigure.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,6 @@ import (

appsv1alpha1 "github.com/apecloud/kubeblocks/apis/apps/v1alpha1"
"github.com/apecloud/kubeblocks/internal/configuration/core"
"github.com/apecloud/kubeblocks/internal/controller/configuration"
intctrlutil "github.com/apecloud/kubeblocks/internal/controllerutil"
)

Expand Down Expand Up @@ -216,15 +215,11 @@ func (r *reconfigureAction) syncReconfigureOperatorStatus(ctx intctrlutil.Reques
}

item := fetcher.ConfigurationObj.Spec.GetConfigurationItem(configSpec.Name)
status := fetcher.ConfigurationObj.Status.GetItemStatus(configSpec.Name)
if status == nil || item == nil {
if item == nil {
return appsv1alpha1.OpsRunningPhase, nil
}

if !configuration.IsApplyConfigChanged(fetcher.ConfigMapObj, *item) {
return appsv1alpha1.OpsRunningPhase, nil
}
switch status.Phase {
switch intctrlutil.GetConfigSpecReconcilePhase(fetcher.ConfigMapObj, *item, fetcher.ConfigurationObj.Status.GetItemStatus(configSpec.Name)) {
default:
return appsv1alpha1.OpsRunningPhase, nil
case appsv1alpha1.CFailedAndPausePhase:
Expand Down
1 change: 1 addition & 0 deletions deploy/helm/crds/apps.kubeblocks.io_configurations.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -310,6 +310,7 @@ spec:
phase:
description: phase is status of configurationItem.
enum:
- Creating
- Init
- Running
- Pending
Expand Down
7 changes: 3 additions & 4 deletions docker/Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -26,12 +26,11 @@ ENV GOPROXY=${GOPROXY}

WORKDIR /src
# Copy the Go Modules manifests
#COPY go.mod go.mod
#COPY go.sum go.sum
COPY go.mod go.mod
COPY go.sum go.sum
# cache deps before building and copying source so that we don't need to re-download as much
# and so that source changes don't invalidate our downloaded layer
RUN --mount=type=bind,target=. \
--mount=type=cache,target=/go/pkg/mod \
RUN --mount=type=cache,target=/go/pkg/mod \
go mod download

# Copy the go source
Expand Down
7 changes: 3 additions & 4 deletions docker/Dockerfile-dataprotection
Original file line number Diff line number Diff line change
Expand Up @@ -26,12 +26,11 @@ ENV GOPROXY=${GOPROXY}

WORKDIR /src
# Copy the Go Modules manifests
#COPY go.mod go.mod
#COPY go.sum go.sum
COPY go.mod go.mod
COPY go.sum go.sum
# cache deps before building and copying source so that we don't need to re-download as much
# and so that source changes don't invalidate our downloaded layer
RUN --mount=type=bind,target=. \
--mount=type=cache,target=/go/pkg/mod \
RUN --mount=type=cache,target=/go/pkg/mod \
go mod download

# Copy the go source
Expand Down
7 changes: 3 additions & 4 deletions docker/Dockerfile-tools
Original file line number Diff line number Diff line change
Expand Up @@ -28,8 +28,8 @@ ENV GOPROXY=${GOPROXY}
WORKDIR /src

# Copy the Go Modules manifests
#COPY go.mod go.mod
#COPY go.sum go.sum
COPY go.mod go.mod
COPY go.sum go.sum
# cache deps before building and copying source so that we don't need to re-download as much
# and so that source changes don't invalidate our downloaded layer
# RUN go mod download
Expand All @@ -44,8 +44,7 @@ WORKDIR /src
#COPY cmd/cli/ cmd/cli/
#COPY apis/ apis/
#COPY test/testdata/testdata.go test/testdata/testdata.go
RUN --mount=type=bind,target=. \
--mount=type=cache,target=/go/pkg/mod \
RUN --mount=type=cache,target=/go/pkg/mod \
go mod download

# Build
Expand Down
3 changes: 3 additions & 0 deletions internal/cli/cmd/plugin/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,9 @@ func (p *Paths) IndexPluginsPath(name string) []string {
if _, err := os.Stat(filepath.Join(p.IndexPath(name), "krew-plugins")); err == nil {
result = append(result, filepath.Join(p.IndexPath(name), "krew-plugins"))
}
if _, err := os.Stat(filepath.Join(p.IndexPath(name), "cli-plugins")); err == nil {
result = append(result, filepath.Join(p.IndexPath(name), "cli-plugins"))
}
return result
}

Expand Down
11 changes: 9 additions & 2 deletions internal/cli/util/version.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ along with this program. If not, see <http://www.gnu.org/licenses/>.
package util

import (
"bytes"
"context"
"fmt"
"os/exec"
Expand Down Expand Up @@ -128,9 +129,15 @@ func GetKubeBlocksDeploy(client kubernetes.Interface) (*appsv1.Deployment, error
func GetDockerVersion() (*gv.Version, error) {
// exec cmd to get output from docker info --format '{{.ServerVersion}}'
cmd := exec.Command("docker", "info", "--format", "{{.ServerVersion}}")
var stderr bytes.Buffer
cmd.Stderr = &stderr
out, err := cmd.Output()
if err != nil {
return nil, err
if err != nil || stderr.String() != "" {
errMsg := stderr.String()
if errMsg == "" {
errMsg = err.Error()
}
return nil, fmt.Errorf("failed to get the docker version by executing \"docker info --format {{.ServerVersion}}\": %s", errMsg)
}
return gv.NewVersion(strings.TrimSpace(string(out)))
}
Loading

0 comments on commit 105f2d5

Please sign in to comment.