From bc88e143f10cb633c3d4556e9070fcd793dfe25d Mon Sep 17 00:00:00 2001 From: Andrew Pryde Date: Fri, 17 Aug 2018 16:32:51 +0100 Subject: [PATCH] Implement MySQL version upgrade support (#207) * Implement MySQL version upgrade support * Test MySQL upgrades --- pkg/controllers/cluster/controller.go | 73 ++++++++++++++++- pkg/controllers/cluster/controller_test.go | 73 +++++++++++++++++ pkg/resources/statefulsets/statefulset.go | 3 + test/e2e/framework/cluster.go | 94 +++++++++++++++++++++- test/e2e/upgrade.go | 62 ++++++++++++++ 5 files changed, 300 insertions(+), 5 deletions(-) create mode 100644 test/e2e/upgrade.go diff --git a/pkg/controllers/cluster/controller.go b/pkg/controllers/cluster/controller.go index 17ca7265b..9db425ffb 100644 --- a/pkg/controllers/cluster/controller.go +++ b/pkg/controllers/cluster/controller.go @@ -17,6 +17,7 @@ package cluster import ( "context" "fmt" + "strings" "time" apps "k8s.io/api/apps/v1beta1" @@ -38,6 +39,7 @@ import ( record "k8s.io/client-go/tools/record" workqueue "k8s.io/client-go/util/workqueue" + "github.com/coreos/go-semver/semver" "github.com/golang/glog" "github.com/pkg/errors" @@ -390,9 +392,13 @@ func (m *MySQLController) syncHandler(key string) error { } // Upgrade the required component resources the current MySQLOperator version. - err = m.ensureMySQLOperatorVersion(cluster, ss, buildversion.GetBuildVersion()) - if err != nil { - return err + if err := m.ensureMySQLOperatorVersion(cluster, ss, buildversion.GetBuildVersion()); err != nil { + return errors.Wrap(err, "ensuring MySQL Operator version") + } + + // Upgrade the MySQL server version if required. + if err := m.ensureMySQLVersion(cluster, ss); err != nil { + return errors.Wrap(err, "ensuring MySQL version") } // If this number of the members on the Cluster does not equal the @@ -421,6 +427,67 @@ func (m *MySQLController) syncHandler(key string) error { return nil } +func getMySQLContainerIndex(containers []corev1.Container) (int, error) { + for i, c := range containers { + if c.Name == statefulsets.MySQLServerName { + return i, nil + } + } + + return 0, errors.Errorf("no %q container found", statefulsets.MySQLServerName) +} + +// splitImage splits an image into its name and version. +func splitImage(image string) (string, string, error) { + parts := strings.Split(image, ":") + if len(parts) < 2 { + return "", "", errors.Errorf("invalid image %q", image) + } + return strings.Join(parts[:len(parts)-1], ""), parts[len(parts)-1], nil +} + +func (m *MySQLController) ensureMySQLVersion(c *v1alpha1.Cluster, ss *apps.StatefulSet) error { + index, err := getMySQLContainerIndex(ss.Spec.Template.Spec.Containers) + if err != nil { + return errors.Wrapf(err, "getting MySQL container for StatefulSet %q", ss.Name) + } + imageName, actualVersion, err := splitImage(ss.Spec.Template.Spec.Containers[index].Image) + if err != nil { + return errors.Wrapf(err, "getting MySQL version for StatefulSet %q", ss.Name) + } + + actual, err := semver.NewVersion(actualVersion) + if err != nil { + return errors.Wrap(err, "parsing StatuefulSet MySQL version") + } + expected, err := semver.NewVersion(c.Spec.Version) + if err != nil { + return errors.Wrap(err, "parsing Cluster MySQL version") + } + + switch expected.Compare(*actual) { + case -1: + return errors.Errorf("attempted unsupported downgrade from %q to %q", actual, expected) + case 0: + return nil + } + + updated := ss.DeepCopy() + updated.Spec.Template.Spec.Containers[index].Image = fmt.Sprintf("%s:%s", imageName, c.Spec.Version) + // NOTE: We do this as previously we defaulted to the OnDelete strategy + // so clusters created with previous versions would not support upgrades. + updated.Spec.UpdateStrategy = apps.StatefulSetUpdateStrategy{ + Type: apps.RollingUpdateStatefulSetStrategyType, + } + + err = m.statefulSetControl.Patch(ss, updated) + if err != nil { + return errors.Wrap(err, "patching StatefulSet") + } + + return nil +} + // ensureMySQLOperatorVersion updates the MySQLOperator resource types that //require it to make it consistent with the specified operator version. func (m *MySQLController) ensureMySQLOperatorVersion(c *v1alpha1.Cluster, ss *apps.StatefulSet, operatorVersion string) error { diff --git a/pkg/controllers/cluster/controller_test.go b/pkg/controllers/cluster/controller_test.go index f8bc41191..9ddb96d67 100644 --- a/pkg/controllers/cluster/controller_test.go +++ b/pkg/controllers/cluster/controller_test.go @@ -30,6 +30,9 @@ import ( "k8s.io/client-go/kubernetes/fake" cache "k8s.io/client-go/tools/cache" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + "github.com/oracle/mysql-operator/pkg/apis/mysql/v1alpha1" "github.com/oracle/mysql-operator/pkg/constants" "github.com/oracle/mysql-operator/pkg/controllers/util" @@ -42,6 +45,76 @@ import ( buildversion "github.com/oracle/mysql-operator/pkg/version" ) +func TestGetMySQLContainerIndex(t *testing.T) { + testCases := map[string]struct { + containers []v1.Container + index int + errors bool + }{ + "empty_errors": { + containers: []v1.Container{}, + errors: true, + }, + "mysql_server_only": { + containers: []v1.Container{{Name: "mysql"}}, + index: 0, + }, + "mysql_server_and_agent": { + containers: []v1.Container{{Name: "mysql-agent"}, {Name: "mysql"}}, + index: 1, + }, + "mysql_agent_only": { + containers: []v1.Container{{Name: "mysql-agent"}}, + errors: true, + }, + } + + for name, tc := range testCases { + t.Run(name, func(t *testing.T) { + index, err := getMySQLContainerIndex(tc.containers) + if tc.errors { + require.Error(t, err) + } else { + require.NoError(t, err) + assert.Equal(t, index, tc.index) + } + }) + } +} + +func TestSplitImage(t *testing.T) { + testCases := map[string]struct { + image string + name string + version string + errors bool + }{ + "8.0.11": { + image: "mysql/mysql-server:8.0.11", + name: "mysql/mysql-server", + version: "8.0.11", + errors: false, + }, + "invalid": { + image: "mysql/mysql-server", + errors: true, + }, + } + + for name, tc := range testCases { + t.Run(name, func(t *testing.T) { + name, version, err := splitImage(tc.image) + if tc.errors { + require.Error(t, err) + } else { + require.NoError(t, err) + assert.Equal(t, name, tc.name) + assert.Equal(t, version, tc.version) + } + }) + } +} + func mockOperatorConfig() operatoropts.MySQLOperatorOpts { opts := operatoropts.MySQLOperatorOpts{} opts.EnsureDefaults() diff --git a/pkg/resources/statefulsets/statefulset.go b/pkg/resources/statefulsets/statefulset.go index 4609ef26c..8ed3abd52 100644 --- a/pkg/resources/statefulsets/statefulset.go +++ b/pkg/resources/statefulsets/statefulset.go @@ -378,6 +378,9 @@ func NewForCluster(cluster *v1alpha1.Cluster, images operatoropts.Images, servic Volumes: podVolumes, }, }, + UpdateStrategy: apps.StatefulSetUpdateStrategy{ + Type: apps.RollingUpdateStatefulSetStrategyType, + }, ServiceName: serviceName, }, } diff --git a/test/e2e/framework/cluster.go b/test/e2e/framework/cluster.go index 6a49f7ad0..7363b72e5 100644 --- a/test/e2e/framework/cluster.go +++ b/test/e2e/framework/cluster.go @@ -19,6 +19,8 @@ import ( "strings" "time" + apps "k8s.io/api/apps/v1beta1" + "k8s.io/api/core/v1" apierrors "k8s.io/apimachinery/pkg/api/errors" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/labels" @@ -35,6 +37,7 @@ import ( "github.com/oracle/mysql-operator/pkg/controllers/cluster/labeler" mysqlclientset "github.com/oracle/mysql-operator/pkg/generated/clientset/versioned" "github.com/oracle/mysql-operator/pkg/resources/secrets" + "github.com/oracle/mysql-operator/pkg/resources/statefulsets" ) // TestDBName is the name of database to use when executing test SQL queries. @@ -112,7 +115,7 @@ func (j *ClusterTestJig) CreateAndAwaitClusterOrFail(namespace string, members i return cluster } -func (j *ClusterTestJig) waitForConditionOrFail(namespace, name string, timeout time.Duration, message string, conditionFn func(*v1alpha1.Cluster) bool) *v1alpha1.Cluster { +func (j *ClusterTestJig) WaitForConditionOrFail(namespace, name string, timeout time.Duration, message string, conditionFn func(*v1alpha1.Cluster) bool) *v1alpha1.Cluster { var cluster *v1alpha1.Cluster pollFunc := func() (bool, error) { c, err := j.MySQLClient.MySQLV1alpha1().Clusters(namespace).Get(name, metav1.GetOptions{}) @@ -135,12 +138,99 @@ func (j *ClusterTestJig) waitForConditionOrFail(namespace, name string, timeout // the running phase. func (j *ClusterTestJig) WaitForClusterReadyOrFail(namespace, name string, timeout time.Duration) *v1alpha1.Cluster { Logf("Waiting up to %v for Cluster \"%s/%s\" to be ready", timeout, namespace, name) - cluster := j.waitForConditionOrFail(namespace, name, timeout, "have all nodes ready", func(cluster *v1alpha1.Cluster) bool { + cluster := j.WaitForConditionOrFail(namespace, name, timeout, "have all nodes ready", func(cluster *v1alpha1.Cluster) bool { return clusterutil.IsClusterReady(cluster) }) return cluster } +// WaitForClusterUpgradedOrFail waits for a MySQL cluster to be upgraded to the +// given version or fails. +func (j *ClusterTestJig) WaitForClusterUpgradedOrFail(namespace, name, version string, timeout time.Duration) *v1alpha1.Cluster { + Logf("Waiting up to %v for Cluster \"%s/%s\" to be upgraded", timeout, namespace, name) + + cluster := j.WaitForConditionOrFail(namespace, name, timeout, "be upgraded ", func(cluster *v1alpha1.Cluster) bool { + set, err := j.KubeClient.AppsV1beta1().StatefulSets(cluster.Namespace).Get(cluster.Name, metav1.GetOptions{}) + if err != nil { + Failf("Failed to get StatefulSet %[1]q for Cluster %[1]q: %[2]v", name, err) + } + + set = j.waitForSSRollingUpdate(set) + + var actualVersion string + { + var found bool + var index int + for i, c := range set.Spec.Template.Spec.Containers { + if c.Name == statefulsets.MySQLServerName { + index = i + found = true + break + } + } + + if !found { + Failf("no %q container found for StatefulSet %q", statefulsets.MySQLServerName, set.Name) + } + image := set.Spec.Template.Spec.Containers[index].Image + parts := strings.Split(image, ":") + if len(parts) < 2 { + Failf("invalid image %q for StatefulSet %q", image, set.Name) + } + actualVersion = parts[len(parts)-1] + } + + return actualVersion == version + }) + return cluster +} + +// waitForSSState periodically polls for the ss and its pods until the until function returns either true or an error +func (j *ClusterTestJig) waitForSSState(ss *apps.StatefulSet, until func(*apps.StatefulSet, *v1.PodList) (bool, error)) { + pollErr := wait.PollImmediate(Poll, DefaultTimeout, + func() (bool, error) { + ssGet, err := j.KubeClient.AppsV1beta1().StatefulSets(ss.Namespace).Get(ss.Name, metav1.GetOptions{}) + if err != nil { + return false, err + } + + selector, err := metav1.LabelSelectorAsSelector(ss.Spec.Selector) + ExpectNoError(err) + podList, err := j.KubeClient.CoreV1().Pods(ss.Namespace).List(metav1.ListOptions{LabelSelector: selector.String()}) + ExpectNoError(err) + + return until(ssGet, podList) + }) + if pollErr != nil { + Failf("Failed waiting for state update: %v", pollErr) + } +} + +// waitForRollingUpdate waits for all Pods in set to exist and have the correct revision and for the RollingUpdate to +// complete. set must have a RollingUpdateStatefulSetStrategyType. +func (j *ClusterTestJig) waitForSSRollingUpdate(set *apps.StatefulSet) *apps.StatefulSet { + var pods *v1.PodList + if set.Spec.UpdateStrategy.Type != apps.RollingUpdateStatefulSetStrategyType { + Failf("StatefulSet %s/%s attempt to wait for rolling update with updateStrategy %s", + set.Namespace, + set.Name, + set.Spec.UpdateStrategy.Type) + } + Logf("Waiting for StatefulSet %s/%s to complete update", set.Namespace, set.Name) + j.waitForSSState(set, func(set2 *apps.StatefulSet, pods2 *v1.PodList) (bool, error) { + set = set2 + pods = pods2 + if len(pods.Items) < int(*set.Spec.Replicas) { + return false, nil + } + if set.Status.UpdateRevision != set.Status.CurrentRevision { + return false, nil + } + return true, nil + }) + return set +} + // SanityCheckCluster checks basic properties of a given Cluster match // our expectations. func (j *ClusterTestJig) SanityCheckCluster(cluster *v1alpha1.Cluster) { diff --git a/test/e2e/upgrade.go b/test/e2e/upgrade.go new file mode 100644 index 000000000..a59000196 --- /dev/null +++ b/test/e2e/upgrade.go @@ -0,0 +1,62 @@ +// Copyright 2018 Oracle and/or its affiliates. All rights reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package e2e + +import ( + . "github.com/onsi/ginkgo" + . "github.com/onsi/gomega" + + "github.com/oracle/mysql-operator/pkg/apis/mysql/v1alpha1" + mysqlclientset "github.com/oracle/mysql-operator/pkg/generated/clientset/versioned" + "github.com/oracle/mysql-operator/test/e2e/framework" +) + +var _ = Describe("MySQL Upgrade", func() { + f := framework.NewDefaultFramework("upgrade") + + var mcs mysqlclientset.Interface + BeforeEach(func() { + mcs = f.MySQLClientSet + }) + + It("should be possible to upgrade a cluster from 8.0.11 to 8.0.12", func() { + jig := framework.NewClusterTestJig(mcs, f.ClientSet, "upgrade-test") + + By("creating an 8.0.11 cluster") + + cluster := jig.CreateAndAwaitClusterOrFail(f.Namespace.Name, 3, func(c *v1alpha1.Cluster) { + c.Spec.Version = "8.0.11" + }, framework.DefaultTimeout) + + expected, err := framework.WriteSQLTest(cluster, cluster.Name+"-0") + Expect(err).NotTo(HaveOccurred()) + + By("triggering an upgrade to 8.0.12") + + cluster.Spec.Version = "8.0.12" + cluster, err = mcs.MySQLV1alpha1().Clusters(cluster.Namespace).Update(cluster) + Expect(err).NotTo(HaveOccurred()) + + By("waiting for the upgrade to complete") + + cluster = jig.WaitForClusterUpgradedOrFail(cluster.Namespace, cluster.Name, "8.0.12", framework.DefaultTimeout) + + By("testing we can read from the upgraded database") + + actual, err := framework.ReadSQLTest(cluster, cluster.Name+"-0") + Expect(err).NotTo(HaveOccurred()) + Expect(actual).To(Equal(expected)) + }) +})