Skip to content

Commit

Permalink
fix: stop data protection deployment when upgrade kubeblocks (#5561)
Browse files Browse the repository at this point in the history
  • Loading branch information
ldming authored Oct 20, 2023
1 parent 0e307bd commit 184cbf1
Show file tree
Hide file tree
Showing 5 changed files with 88 additions and 65 deletions.
64 changes: 40 additions & 24 deletions pkg/cli/cmd/kubeblocks/upgrade.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,10 +27,13 @@ import (
"github.com/hashicorp/go-version"
"github.com/pkg/errors"
"github.com/spf13/cobra"
appsv1 "k8s.io/api/apps/v1"
apierrors "k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
apitypes "k8s.io/apimachinery/pkg/types"
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/cli-runtime/pkg/genericiooptions"
"k8s.io/client-go/kubernetes"
"k8s.io/klog/v2"
cmdutil "k8s.io/kubectl/pkg/cmd/util"
"k8s.io/kubectl/pkg/util/templates"
Expand All @@ -52,6 +55,8 @@ var (
kbcli kubeblocks upgrade --set replicaCount=3`)
)

type getDeploymentFunc func(client kubernetes.Interface) (*appsv1.Deployment, error)

func newUpgradeCmd(f cmdutil.Factory, streams genericiooptions.IOStreams) *cobra.Command {
o := &InstallOptions{
Options: Options{
Expand Down Expand Up @@ -81,6 +86,7 @@ func newUpgradeCmd(f cmdutil.Factory, streams genericiooptions.IOStreams) *cobra
}

func (o *InstallOptions) Upgrade() error {
klog.V(1).Info("##### Start to upgrade KubeBlocks #####")
if o.HelmCfg.Namespace() == "" {
ns, err := util.GetKubeBlocksNamespace(o.Client)
if err != nil || ns == "" {
Expand Down Expand Up @@ -119,8 +125,7 @@ func (o *InstallOptions) Upgrade() error {
return err
}

// double check for KubeBlocks upgrade
// and only check when KubeBlocks version change
// double check when KubeBlocks version change
if !o.autoApprove && o.Version != "" {
oldVersion, err := version.NewVersion(kbVersion)
if err != nil {
Expand Down Expand Up @@ -156,7 +161,15 @@ func (o *InstallOptions) Upgrade() error {
// KubeBlocks after upgrade.
s = spinner.New(o.Out, spinnerMsg("Stop KubeBlocks "+kbVersion))
defer s.Fail()
if err = o.stopKubeBlocks(); err != nil {
if err = o.stopDeployment(util.GetKubeBlocksDeploy); err != nil {
return err
}
s.Success()

// stop the data protection deployment
s = spinner.New(o.Out, spinnerMsg("Stop DataProtection"))
defer s.Fail()
if err = o.stopDeployment(util.GetDataProtectionDeploy); err != nil {
return err
}
s.Success()
Expand Down Expand Up @@ -186,37 +199,40 @@ func (o *InstallOptions) upgradeChart() error {
return o.buildChart().Upgrade(o.HelmCfg)
}

// stopKubeBlocks stops the old version KubeBlocks by setting the replicas of
// KubeBlocks deployment to 0
func (o *InstallOptions) stopKubeBlocks() error {
kbDeploy, err := util.GetKubeBlocksDeploy(o.Client)
// stopDeployment stops the deployment by setting the replicas to 0
func (o *InstallOptions) stopDeployment(getDeployFn getDeploymentFunc) error {
deploy, err := getDeployFn(o.Client)
if err != nil {
if apierrors.IsNotFound(err) {
return nil
}
return err
}

// if KubeBlocks is not deployed, just return
if kbDeploy == nil {
klog.V(1).Info("KubeBlocks is not deployed, no need to stop")
if deploy == nil {
klog.V(1).Info("deployment is not found, no need to stop")
return nil
}

if _, err = o.Client.AppsV1().Deployments(kbDeploy.Namespace).Patch(
context.TODO(), kbDeploy.Name, apitypes.JSONPatchType,
if _, err = o.Client.AppsV1().Deployments(deploy.Namespace).Patch(
context.TODO(), deploy.Name, apitypes.JSONPatchType,
[]byte(`[{"op": "replace", "path": "/spec/replicas", "value": 0}]`),
metav1.PatchOptions{}); err != nil {
return err
}

// wait for KubeBlocks to be stopped
return wait.PollImmediate(5*time.Second, o.Timeout, func() (bool, error) {
kbDeploy, err = util.GetKubeBlocksDeploy(o.Client)
if err != nil {
return false, err
}
if *kbDeploy.Spec.Replicas == 0 && kbDeploy.Status.Replicas == 0 &&
kbDeploy.Status.AvailableReplicas == 0 {
return true, nil
}
return false, nil
})
// wait for deployment to be stopped
return wait.PollUntilContextTimeout(context.Background(), 5*time.Second, o.Timeout, true,
func(_ context.Context) (bool, error) {
deploy, err = util.GetKubeBlocksDeploy(o.Client)
if err != nil {
return false, err
}
if *deploy.Spec.Replicas == 0 &&
deploy.Status.Replicas == 0 &&
deploy.Status.AvailableReplicas == 0 {
return true, nil
}
return false, nil
})
}
43 changes: 13 additions & 30 deletions pkg/cli/cmd/kubeblocks/upgrade_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,16 @@ var _ = Describe("kubeblocks upgrade", func() {
tf.Cleanup()
})

mockKubeBlocksDeploy := func() *appsv1.Deployment {
deploy := &appsv1.Deployment{}
deploy.SetLabels(map[string]string{
"app.kubernetes.io/component": "apps",
"app.kubernetes.io/name": types.KubeBlocksChartName,
"app.kubernetes.io/version": "0.3.0",
})
return deploy
}

It("check upgrade", func() {
var cfg string
cmd = newUpgradeCmd(tf, streams)
Expand All @@ -73,21 +83,12 @@ var _ = Describe("kubeblocks upgrade", func() {
})

It("double-check when version change", func() {
mockDeploy := func() *appsv1.Deployment {
deploy := &appsv1.Deployment{}
deploy.SetLabels(map[string]string{
"app.kubernetes.io/name": types.KubeBlocksChartName,
"app.kubernetes.io/version": "0.3.0",
})
return deploy
}

o := &InstallOptions{
Options: Options{
IOStreams: streams,
HelmCfg: helm.NewFakeConfig(namespace),
Namespace: "default",
Client: testing.FakeClientSet(mockDeploy()),
Client: testing.FakeClientSet(mockKubeBlocksDeploy()),
Dynamic: testing.FakeDynamicClient(),
},
Version: "0.5.0-fake",
Expand All @@ -102,21 +103,12 @@ var _ = Describe("kubeblocks upgrade", func() {
})

It("helm ValueOpts upgrade", func() {
mockDeploy := func() *appsv1.Deployment {
deploy := &appsv1.Deployment{}
deploy.SetLabels(map[string]string{
"app.kubernetes.io/name": types.KubeBlocksChartName,
"app.kubernetes.io/version": "0.3.0",
})
return deploy
}

o := &InstallOptions{
Options: Options{
IOStreams: streams,
HelmCfg: helm.NewFakeConfig(namespace),
Namespace: "default",
Client: testing.FakeClientSet(mockDeploy()),
Client: testing.FakeClientSet(mockKubeBlocksDeploy()),
Dynamic: testing.FakeDynamicClient(),
},
Version: "",
Expand All @@ -126,21 +118,12 @@ var _ = Describe("kubeblocks upgrade", func() {
})

It("run upgrade", func() {
mockDeploy := func() *appsv1.Deployment {
deploy := &appsv1.Deployment{}
deploy.SetLabels(map[string]string{
"app.kubernetes.io/name": types.KubeBlocksChartName,
"app.kubernetes.io/version": "0.3.0",
})
return deploy
}

o := &InstallOptions{
Options: Options{
IOStreams: streams,
HelmCfg: helm.NewFakeConfig(namespace),
Namespace: "default",
Client: testing.FakeClientSet(mockDeploy()),
Client: testing.FakeClientSet(mockKubeBlocksDeploy()),
Dynamic: testing.FakeDynamicClient(),
},
Version: version.DefaultKubeBlocksVersion,
Expand Down
3 changes: 2 additions & 1 deletion pkg/cli/testing/fake.go
Original file line number Diff line number Diff line change
Expand Up @@ -625,7 +625,8 @@ func FakeKBDeploy(version string) *appsv1.Deployment {
},
}
deploy.SetLabels(map[string]string{
"app.kubernetes.io/name": types.KubeBlocksChartName,
"app.kubernetes.io/name": types.KubeBlocksChartName,
"app.kubernetes.io/component": "apps",
})
if len(version) > 0 {
deploy.Labels["app.kubernetes.io/version"] = version
Expand Down
41 changes: 32 additions & 9 deletions pkg/cli/util/version.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,13 @@ import (
"github.com/apecloud/kubeblocks/version"
)

const (
// kubeblocksAppComponent the value of app.kubernetes.io/component label for KubeBlocks deployment
kubeblocksAppComponent = "apps"
// dataprotectionAppComponent the value of app.kubernetes.io/component label for DataProtection deployment
dataprotectionAppComponent = "dataprotection"
)

type Version struct {
KubeBlocks string
Kubernetes string
Expand Down Expand Up @@ -104,27 +111,43 @@ func GetK8sVersion(discoveryClient discovery.DiscoveryInterface) (string, error)
// GetKubeBlocksDeploy gets KubeBlocks deployments, now one kubernetes cluster
// only support one KubeBlocks
func GetKubeBlocksDeploy(client kubernetes.Interface) (*appsv1.Deployment, error) {
deploys, err := client.AppsV1().Deployments(metav1.NamespaceAll).List(context.Background(), metav1.ListOptions{
LabelSelector: "app.kubernetes.io/name=" + types.KubeBlocksChartName,
})
deploys, err := client.AppsV1().Deployments(metav1.NamespaceAll).List(context.Background(),
metav1.ListOptions{
LabelSelector: fmt.Sprintf("app.kubernetes.io/name=%s,app.kubernetes.io/component=%s",
types.KubeBlocksChartName, kubeblocksAppComponent),
})
if err != nil {
return nil, err
}
if deploys == nil || len(deploys.Items) == 0 {
return nil, nil
}
if len(deploys.Items) > 1 {
// for compatibility with older versions, filter here instead of LabelSelector
for _, i := range deploys.Items {
if _, ok := i.Labels["app.kubernetes.io/component"]; ok {
return &i, nil
}
}
return nil, fmt.Errorf("found multiple KubeBlocks deployments, please check your cluster")
}
return &deploys.Items[0], nil
}

// GetDataProtectionDeploy gets DataProtection deployments, now one kubernetes cluster
// only support one DataProtection
func GetDataProtectionDeploy(client kubernetes.Interface) (*appsv1.Deployment, error) {
deploys, err := client.AppsV1().Deployments(metav1.NamespaceAll).List(context.Background(),
metav1.ListOptions{
LabelSelector: fmt.Sprintf("app.kubernetes.io/name=%s,app.kubernetes.io/component=%s",
types.KubeBlocksChartName, dataprotectionAppComponent),
})
if err != nil {
return nil, err
}
if deploys == nil || len(deploys.Items) == 0 {
return nil, nil
}
if len(deploys.Items) > 1 {
return nil, fmt.Errorf("found multiple DataProtection deployments, please check your cluster")
}
return &deploys.Items[0], nil
}

// GetDockerVersion get Docker Version
func GetDockerVersion() (*gv.Version, error) {
// exec cmd to get output from docker info --format '{{.ServerVersion}}'
Expand Down
2 changes: 1 addition & 1 deletion pkg/cli/util/version_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ var _ = Describe("version util", func() {
Expect(v.Cli).ShouldNot(BeEmpty())
})

It("get vsion info when KubeBlocks is deployed", func() {
It("get version info when KubeBlocks is deployed", func() {
client := testing.FakeClientSet(testing.FakeKBDeploy(kbVersion))
v, err := GetVersionInfo(client)
Expect(err).Should(Succeed())
Expand Down

0 comments on commit 184cbf1

Please sign in to comment.