Skip to content
This repository has been archived by the owner on May 28, 2021. It is now read-only.

Commit

Permalink
Implement MySQL version upgrade support (#207)
Browse files Browse the repository at this point in the history
* Implement MySQL version upgrade support
* Test MySQL upgrades
  • Loading branch information
prydie authored and owainlewis committed Aug 17, 2018
1 parent 0306087 commit bc88e14
Show file tree
Hide file tree
Showing 5 changed files with 300 additions and 5 deletions.
73 changes: 70 additions & 3 deletions pkg/controllers/cluster/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ package cluster
import (
"context"
"fmt"
"strings"
"time"

apps "k8s.io/api/apps/v1beta1"
Expand All @@ -38,6 +39,7 @@ import (
record "k8s.io/client-go/tools/record"
workqueue "k8s.io/client-go/util/workqueue"

"github.com/coreos/go-semver/semver"
"github.com/golang/glog"
"github.com/pkg/errors"

Expand Down Expand Up @@ -390,9 +392,13 @@ func (m *MySQLController) syncHandler(key string) error {
}

// Upgrade the required component resources the current MySQLOperator version.
err = m.ensureMySQLOperatorVersion(cluster, ss, buildversion.GetBuildVersion())
if err != nil {
return err
if err := m.ensureMySQLOperatorVersion(cluster, ss, buildversion.GetBuildVersion()); err != nil {
return errors.Wrap(err, "ensuring MySQL Operator version")
}

// Upgrade the MySQL server version if required.
if err := m.ensureMySQLVersion(cluster, ss); err != nil {
return errors.Wrap(err, "ensuring MySQL version")
}

// If this number of the members on the Cluster does not equal the
Expand Down Expand Up @@ -421,6 +427,67 @@ func (m *MySQLController) syncHandler(key string) error {
return nil
}

func getMySQLContainerIndex(containers []corev1.Container) (int, error) {
for i, c := range containers {
if c.Name == statefulsets.MySQLServerName {
return i, nil
}
}

return 0, errors.Errorf("no %q container found", statefulsets.MySQLServerName)
}

// splitImage splits an image into its name and version.
func splitImage(image string) (string, string, error) {
parts := strings.Split(image, ":")
if len(parts) < 2 {
return "", "", errors.Errorf("invalid image %q", image)
}
return strings.Join(parts[:len(parts)-1], ""), parts[len(parts)-1], nil
}

func (m *MySQLController) ensureMySQLVersion(c *v1alpha1.Cluster, ss *apps.StatefulSet) error {
index, err := getMySQLContainerIndex(ss.Spec.Template.Spec.Containers)
if err != nil {
return errors.Wrapf(err, "getting MySQL container for StatefulSet %q", ss.Name)
}
imageName, actualVersion, err := splitImage(ss.Spec.Template.Spec.Containers[index].Image)
if err != nil {
return errors.Wrapf(err, "getting MySQL version for StatefulSet %q", ss.Name)
}

actual, err := semver.NewVersion(actualVersion)
if err != nil {
return errors.Wrap(err, "parsing StatuefulSet MySQL version")
}
expected, err := semver.NewVersion(c.Spec.Version)
if err != nil {
return errors.Wrap(err, "parsing Cluster MySQL version")
}

switch expected.Compare(*actual) {
case -1:
return errors.Errorf("attempted unsupported downgrade from %q to %q", actual, expected)
case 0:
return nil
}

updated := ss.DeepCopy()
updated.Spec.Template.Spec.Containers[index].Image = fmt.Sprintf("%s:%s", imageName, c.Spec.Version)
// NOTE: We do this as previously we defaulted to the OnDelete strategy
// so clusters created with previous versions would not support upgrades.
updated.Spec.UpdateStrategy = apps.StatefulSetUpdateStrategy{
Type: apps.RollingUpdateStatefulSetStrategyType,
}

err = m.statefulSetControl.Patch(ss, updated)
if err != nil {
return errors.Wrap(err, "patching StatefulSet")
}

return nil
}

// ensureMySQLOperatorVersion updates the MySQLOperator resource types that
//require it to make it consistent with the specified operator version.
func (m *MySQLController) ensureMySQLOperatorVersion(c *v1alpha1.Cluster, ss *apps.StatefulSet, operatorVersion string) error {
Expand Down
73 changes: 73 additions & 0 deletions pkg/controllers/cluster/controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,9 @@ import (
"k8s.io/client-go/kubernetes/fake"
cache "k8s.io/client-go/tools/cache"

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"

"github.com/oracle/mysql-operator/pkg/apis/mysql/v1alpha1"
"github.com/oracle/mysql-operator/pkg/constants"
"github.com/oracle/mysql-operator/pkg/controllers/util"
Expand All @@ -42,6 +45,76 @@ import (
buildversion "github.com/oracle/mysql-operator/pkg/version"
)

func TestGetMySQLContainerIndex(t *testing.T) {
testCases := map[string]struct {
containers []v1.Container
index int
errors bool
}{
"empty_errors": {
containers: []v1.Container{},
errors: true,
},
"mysql_server_only": {
containers: []v1.Container{{Name: "mysql"}},
index: 0,
},
"mysql_server_and_agent": {
containers: []v1.Container{{Name: "mysql-agent"}, {Name: "mysql"}},
index: 1,
},
"mysql_agent_only": {
containers: []v1.Container{{Name: "mysql-agent"}},
errors: true,
},
}

for name, tc := range testCases {
t.Run(name, func(t *testing.T) {
index, err := getMySQLContainerIndex(tc.containers)
if tc.errors {
require.Error(t, err)
} else {
require.NoError(t, err)
assert.Equal(t, index, tc.index)
}
})
}
}

func TestSplitImage(t *testing.T) {
testCases := map[string]struct {
image string
name string
version string
errors bool
}{
"8.0.11": {
image: "mysql/mysql-server:8.0.11",
name: "mysql/mysql-server",
version: "8.0.11",
errors: false,
},
"invalid": {
image: "mysql/mysql-server",
errors: true,
},
}

for name, tc := range testCases {
t.Run(name, func(t *testing.T) {
name, version, err := splitImage(tc.image)
if tc.errors {
require.Error(t, err)
} else {
require.NoError(t, err)
assert.Equal(t, name, tc.name)
assert.Equal(t, version, tc.version)
}
})
}
}

func mockOperatorConfig() operatoropts.MySQLOperatorOpts {
opts := operatoropts.MySQLOperatorOpts{}
opts.EnsureDefaults()
Expand Down
3 changes: 3 additions & 0 deletions pkg/resources/statefulsets/statefulset.go
Original file line number Diff line number Diff line change
Expand Up @@ -378,6 +378,9 @@ func NewForCluster(cluster *v1alpha1.Cluster, images operatoropts.Images, servic
Volumes: podVolumes,
},
},
UpdateStrategy: apps.StatefulSetUpdateStrategy{
Type: apps.RollingUpdateStatefulSetStrategyType,
},
ServiceName: serviceName,
},
}
Expand Down
94 changes: 92 additions & 2 deletions test/e2e/framework/cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@ import (
"strings"
"time"

apps "k8s.io/api/apps/v1beta1"
"k8s.io/api/core/v1"
apierrors "k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/labels"
Expand All @@ -35,6 +37,7 @@ import (
"github.com/oracle/mysql-operator/pkg/controllers/cluster/labeler"
mysqlclientset "github.com/oracle/mysql-operator/pkg/generated/clientset/versioned"
"github.com/oracle/mysql-operator/pkg/resources/secrets"
"github.com/oracle/mysql-operator/pkg/resources/statefulsets"
)

// TestDBName is the name of database to use when executing test SQL queries.
Expand Down Expand Up @@ -112,7 +115,7 @@ func (j *ClusterTestJig) CreateAndAwaitClusterOrFail(namespace string, members i
return cluster
}

func (j *ClusterTestJig) waitForConditionOrFail(namespace, name string, timeout time.Duration, message string, conditionFn func(*v1alpha1.Cluster) bool) *v1alpha1.Cluster {
func (j *ClusterTestJig) WaitForConditionOrFail(namespace, name string, timeout time.Duration, message string, conditionFn func(*v1alpha1.Cluster) bool) *v1alpha1.Cluster {
var cluster *v1alpha1.Cluster
pollFunc := func() (bool, error) {
c, err := j.MySQLClient.MySQLV1alpha1().Clusters(namespace).Get(name, metav1.GetOptions{})
Expand All @@ -135,12 +138,99 @@ func (j *ClusterTestJig) waitForConditionOrFail(namespace, name string, timeout
// the running phase.
func (j *ClusterTestJig) WaitForClusterReadyOrFail(namespace, name string, timeout time.Duration) *v1alpha1.Cluster {
Logf("Waiting up to %v for Cluster \"%s/%s\" to be ready", timeout, namespace, name)
cluster := j.waitForConditionOrFail(namespace, name, timeout, "have all nodes ready", func(cluster *v1alpha1.Cluster) bool {
cluster := j.WaitForConditionOrFail(namespace, name, timeout, "have all nodes ready", func(cluster *v1alpha1.Cluster) bool {
return clusterutil.IsClusterReady(cluster)
})
return cluster
}

// WaitForClusterUpgradedOrFail waits for a MySQL cluster to be upgraded to the
// given version or fails.
func (j *ClusterTestJig) WaitForClusterUpgradedOrFail(namespace, name, version string, timeout time.Duration) *v1alpha1.Cluster {
Logf("Waiting up to %v for Cluster \"%s/%s\" to be upgraded", timeout, namespace, name)

cluster := j.WaitForConditionOrFail(namespace, name, timeout, "be upgraded ", func(cluster *v1alpha1.Cluster) bool {
set, err := j.KubeClient.AppsV1beta1().StatefulSets(cluster.Namespace).Get(cluster.Name, metav1.GetOptions{})
if err != nil {
Failf("Failed to get StatefulSet %[1]q for Cluster %[1]q: %[2]v", name, err)
}

set = j.waitForSSRollingUpdate(set)

var actualVersion string
{
var found bool
var index int
for i, c := range set.Spec.Template.Spec.Containers {
if c.Name == statefulsets.MySQLServerName {
index = i
found = true
break
}
}

if !found {
Failf("no %q container found for StatefulSet %q", statefulsets.MySQLServerName, set.Name)
}
image := set.Spec.Template.Spec.Containers[index].Image
parts := strings.Split(image, ":")
if len(parts) < 2 {
Failf("invalid image %q for StatefulSet %q", image, set.Name)
}
actualVersion = parts[len(parts)-1]
}

return actualVersion == version
})
return cluster
}

// waitForSSState periodically polls for the ss and its pods until the until function returns either true or an error
func (j *ClusterTestJig) waitForSSState(ss *apps.StatefulSet, until func(*apps.StatefulSet, *v1.PodList) (bool, error)) {
pollErr := wait.PollImmediate(Poll, DefaultTimeout,
func() (bool, error) {
ssGet, err := j.KubeClient.AppsV1beta1().StatefulSets(ss.Namespace).Get(ss.Name, metav1.GetOptions{})
if err != nil {
return false, err
}

selector, err := metav1.LabelSelectorAsSelector(ss.Spec.Selector)
ExpectNoError(err)
podList, err := j.KubeClient.CoreV1().Pods(ss.Namespace).List(metav1.ListOptions{LabelSelector: selector.String()})
ExpectNoError(err)

return until(ssGet, podList)
})
if pollErr != nil {
Failf("Failed waiting for state update: %v", pollErr)
}
}

// waitForRollingUpdate waits for all Pods in set to exist and have the correct revision and for the RollingUpdate to
// complete. set must have a RollingUpdateStatefulSetStrategyType.
func (j *ClusterTestJig) waitForSSRollingUpdate(set *apps.StatefulSet) *apps.StatefulSet {
var pods *v1.PodList
if set.Spec.UpdateStrategy.Type != apps.RollingUpdateStatefulSetStrategyType {
Failf("StatefulSet %s/%s attempt to wait for rolling update with updateStrategy %s",
set.Namespace,
set.Name,
set.Spec.UpdateStrategy.Type)
}
Logf("Waiting for StatefulSet %s/%s to complete update", set.Namespace, set.Name)
j.waitForSSState(set, func(set2 *apps.StatefulSet, pods2 *v1.PodList) (bool, error) {
set = set2
pods = pods2
if len(pods.Items) < int(*set.Spec.Replicas) {
return false, nil
}
if set.Status.UpdateRevision != set.Status.CurrentRevision {
return false, nil
}
return true, nil
})
return set
}

// SanityCheckCluster checks basic properties of a given Cluster match
// our expectations.
func (j *ClusterTestJig) SanityCheckCluster(cluster *v1alpha1.Cluster) {
Expand Down
62 changes: 62 additions & 0 deletions test/e2e/upgrade.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,62 @@
// Copyright 2018 Oracle and/or its affiliates. All rights reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package e2e

import (
. "github.com/onsi/ginkgo"
. "github.com/onsi/gomega"

"github.com/oracle/mysql-operator/pkg/apis/mysql/v1alpha1"
mysqlclientset "github.com/oracle/mysql-operator/pkg/generated/clientset/versioned"
"github.com/oracle/mysql-operator/test/e2e/framework"
)

var _ = Describe("MySQL Upgrade", func() {
f := framework.NewDefaultFramework("upgrade")

var mcs mysqlclientset.Interface
BeforeEach(func() {
mcs = f.MySQLClientSet
})

It("should be possible to upgrade a cluster from 8.0.11 to 8.0.12", func() {
jig := framework.NewClusterTestJig(mcs, f.ClientSet, "upgrade-test")

By("creating an 8.0.11 cluster")

cluster := jig.CreateAndAwaitClusterOrFail(f.Namespace.Name, 3, func(c *v1alpha1.Cluster) {
c.Spec.Version = "8.0.11"
}, framework.DefaultTimeout)

expected, err := framework.WriteSQLTest(cluster, cluster.Name+"-0")
Expect(err).NotTo(HaveOccurred())

By("triggering an upgrade to 8.0.12")

cluster.Spec.Version = "8.0.12"
cluster, err = mcs.MySQLV1alpha1().Clusters(cluster.Namespace).Update(cluster)
Expect(err).NotTo(HaveOccurred())

By("waiting for the upgrade to complete")

cluster = jig.WaitForClusterUpgradedOrFail(cluster.Namespace, cluster.Name, "8.0.12", framework.DefaultTimeout)

By("testing we can read from the upgraded database")

actual, err := framework.ReadSQLTest(cluster, cluster.Name+"-0")
Expect(err).NotTo(HaveOccurred())
Expect(actual).To(Equal(expected))
})
})

0 comments on commit bc88e14

Please sign in to comment.