Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Improve require-osd-release #460

Merged
merged 1 commit into from
Nov 11, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
128 changes: 85 additions & 43 deletions microceph/ceph/start.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,6 @@ import (
"time"

"github.com/canonical/lxd/shared/logger"

"github.com/canonical/microceph/microceph/database"
"github.com/canonical/microceph/microceph/interfaces"
)
Expand All @@ -25,87 +24,125 @@ type cephVersion struct {
Overall cephVersionElem `json:"overall"`
}

func checkVersions() (bool, error) {
out, err := processExec.RunCommand("ceph", "versions")
// getCurrentVersion extracts the version codename from the 'ceph -v' output
func getCurrentVersion() (string, error) {
output, err := processExec.RunCommand("ceph", "-v")
if err != nil {
return false, fmt.Errorf("Failed to get Ceph versions: %w", err)
return "", fmt.Errorf("failed to get ceph version: %w", err)
}

var cephVer cephVersion
err = json.Unmarshal([]byte(out), &cephVer)
if err != nil {
return false, fmt.Errorf("Failed to unmarshal Ceph versions: %w", err)
parts := strings.Fields(output)
if len(parts) < 6 { // need sth like "ceph version 19.2.0 (e7ad534...) squid (stable)"
return "", fmt.Errorf("invalid version string format: %s", output)
}

if len(cephVer.Overall) > 1 {
logger.Debug("Not all upgrades have completed")
return false, nil
}
return parts[len(parts)-2], nil // second to last is version code name
}

if len(cephVer.Osd) < 1 {
logger.Debug("No OSD versions found")
return false, nil
}
// checkVersions checks if all Ceph services are running the same version
// retry up to 3 times if multiple versions are detected to allow for upgrades to complete as they are performed
// concurrently
func checkVersions() (bool, error) {
const (
maxRetries = 3
retryDelay = 5 * time.Second
)

for attempt := 0; attempt < maxRetries; attempt++ {
out, err := processExec.RunCommand("ceph", "versions")
if err != nil {
return false, fmt.Errorf("failed to get Ceph versions: %w", err)
}

return true, nil
var cephVer cephVersion
err = json.Unmarshal([]byte(out), &cephVer)
if err != nil {
return false, fmt.Errorf("failed to unmarshal Ceph versions: %w", err)
}

if len(cephVer.Overall) > 1 {
if attempt < maxRetries-1 {
logger.Debugf("multiple versions detected (attempt %d/%d), waiting %v before retry",
attempt+1, maxRetries, retryDelay)
time.Sleep(retryDelay)
continue
}
logger.Debug("not all upgrades have completed after retries")
return false, nil
}

if len(cephVer.Osd) < 1 {
logger.Debug("no OSD versions found")
return false, nil
}

return true, nil
}
// this should never be reached
return false, nil
}

func osdReleaseRequired(version string) (bool, error) {
out, err := processExec.RunCommand("ceph", "osd", "dump", "-f", "json")
if err != nil {
return false, fmt.Errorf("Failed to get OSD dump: %w", err)
return false, fmt.Errorf("failed to get OSD dump: %w", err)
}

var result map[string]any
err = json.Unmarshal([]byte(out), &result)
if err != nil {
return false, fmt.Errorf("Failed to unmarshal OSD dump: %w", err)
return false, fmt.Errorf("failed to unmarshal OSD dump: %w", err)
}

return result["require_osd_release"].(string) != version, nil
releaseVersion, ok := result["require_osd_release"].(string)
if !ok {
return false, fmt.Errorf("invalid or missing require_osd_release in OSD dump")
}

return releaseVersion != version, nil
}

func PostRefresh() error {
currentVersion, err := processExec.RunCommand("ceph", "-v")
func updateOSDRelease(version string) error {
_, err := processExec.RunCommand("ceph", "osd", "require-osd-release",
version, "--yes-i-really-mean-it")
if err != nil {
return err
}

lastPos := strings.LastIndex(currentVersion, " ")
if lastPos < 0 {
return fmt.Errorf("invalid version string: %s", currentVersion)
return fmt.Errorf("failed to update OSD release version: %w", err)
}
return nil
}

currentVersion = currentVersion[0:lastPos]
lastPos = strings.LastIndex(currentVersion, " ")
if lastPos < 0 {
return fmt.Errorf("invalid version string: %s", currentVersion)
// PostRefresh handles version checking and OSD release updates
func PostRefresh() error {
currentVersion, err := getCurrentVersion()
if err != nil {
return fmt.Errorf("version check failed: %w", err)
}

currentVersion = currentVersion[lastPos+1 : len(currentVersion)]
allVersionsEqual, err := checkVersions()

if err != nil {
return err
return fmt.Errorf("version equality check failed: %w", err)
}

if !allVersionsEqual {
logger.Info("versions not equal, skipping OSD release update")
return nil
}

mustUpdate, err := osdReleaseRequired(currentVersion)
if err != nil {
return err
return fmt.Errorf("OSD release check failed: %w", err)
}

if mustUpdate {
_, err = processExec.RunCommand("ceph", "osd", "require-osd-release",
currentVersion, "--yes-i-really-mean-it")
if err != nil {
return err
}
if !mustUpdate {
logger.Debug("OSD release update not required")
return nil
}
err = updateOSDRelease(currentVersion)
if err != nil {
return fmt.Errorf("OSD release update failed: %w", err)
}

logger.Infof("successfully updated OSD release version: %s", currentVersion)
return nil
}

Expand Down Expand Up @@ -164,7 +201,12 @@ func Start(ctx context.Context, s interfaces.StateInterface) error {
}
}()

go PostRefresh()
go func() {
err := PostRefresh()
if err != nil {
logger.Errorf("PostRefresh failed: %v", err)
}
}()

return nil
}
126 changes: 125 additions & 1 deletion microceph/ceph/start_test.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package ceph

import (
"errors"
"testing"

"github.com/canonical/microceph/microceph/mocks"
Expand Down Expand Up @@ -46,12 +47,135 @@ func addExpected(r *mocks.Runner) {
"squid", "--yes-i-really-mean-it").Return("ok", nil).Once()
}

func (s *startSuite) TestStart() {
func (s *startSuite) TestStartOSDReleaseUpdate() {
r := mocks.NewRunner(s.T())

addExpected(r)
processExec = r

err := PostRefresh()
assert.NoError(s.T(), err)
r.AssertExpectations(s.T())
}

func (s *startSuite) TestInvalidVersionString() {
r := mocks.NewRunner(s.T())
// only expect the version command, others shouldnt be reached
r.On("RunCommand", "ceph", "-v").Return("invalid version", nil).Once()
processExec = r

err := PostRefresh()
assert.Error(s.T(), err)
assert.Contains(s.T(), err.Error(), "invalid version string")
r.AssertExpectations(s.T())
}

func (s *startSuite) TestMultipleVersionsPresent() {
r := mocks.NewRunner(s.T())
version := `ceph version 19.2.0 (e7ad5345525c7aa95470c26863873b581076945d) squid (stable)`
versionsJson := `{
"mon": {
"ceph version 18.2.4 reef (stable)": 1,
"ceph version 19.2.0 squid (stable)": 1
},
"overall": {
"ceph version 18.2.4 reef (stable)": 1,
"ceph version 19.2.0 squid (stable)": 1
}
}`

r.On("RunCommand", "ceph", "-v").Return(version, nil).Once()
r.On("RunCommand", "ceph", "versions").Return(versionsJson, nil).Times(3)
processExec = r

err := PostRefresh()
assert.NoError(s.T(), err)
r.AssertExpectations(s.T())
}

func (s *startSuite) TestNoOSDVersions() {
r := mocks.NewRunner(s.T())
version := `ceph version 19.2.0 (e7ad5345525c7aa95470c26863873b581076945d) squid (stable)`
versionsJson := `{
"mon": {
"ceph version 19.2.0 squid (stable)": 1
},
"overall": {
"ceph version 19.2.0 squid (stable)": 1
}
}`

r.On("RunCommand", "ceph", "-v").Return(version, nil).Once()
r.On("RunCommand", "ceph", "versions").Return(versionsJson, nil).Once()
processExec = r

err := PostRefresh()
assert.NoError(s.T(), err) // no OSD versions, so no update required
r.AssertExpectations(s.T())
}

func (s *startSuite) TestOSDReleaseUpToDate() {
r := mocks.NewRunner(s.T())
version := `ceph version 19.2.0 (e7ad5345525c7aa95470c26863873b581076945d) squid (stable)`
versionsJson := `{
"mon": {
"ceph version 19.2.0 squid (stable)": 1
},
"osd": {
"ceph version 19.2.0 squid (stable)": 1
},
"overall": {
"ceph version 19.2.0 squid (stable)": 2
}
}`
osdDump := `{"require_osd_release": "squid"}`

r.On("RunCommand", "ceph", "-v").Return(version, nil).Once()
r.On("RunCommand", "ceph", "versions").Return(versionsJson, nil).Once()
r.On("RunCommand", "ceph", "osd", "dump", "-f", "json").Return(osdDump, nil).Once()
processExec = r

err := PostRefresh()
assert.NoError(s.T(), err)
r.AssertExpectations(s.T())
}

func (s *startSuite) TestOSDReleaseUpdateFails() {
r := mocks.NewRunner(s.T())
version := `ceph version 19.2.0 (e7ad5345525c7aa95470c26863873b581076945d) squid (stable)`
versionsJson := `{
"mon": {
"ceph version 19.2.0 squid (stable)": 1
},
"osd": {
"ceph version 19.2.0 squid (stable)": 1
},
"overall": {
"ceph version 19.2.0 squid (stable)": 2
}
}`
osdDump := `{"require_osd_release": "reef"}`

r.On("RunCommand", "ceph", "-v").Return(version, nil).Once()
r.On("RunCommand", "ceph", "versions").Return(versionsJson, nil).Once()
r.On("RunCommand", "ceph", "osd", "dump", "-f", "json").Return(osdDump, nil).Once()
r.On("RunCommand", "ceph", "osd", "require-osd-release", "squid", "--yes-i-really-mean-it").
Return("", errors.New("update failed")).Once()
processExec = r

err := PostRefresh()
assert.Error(s.T(), err)
assert.Contains(s.T(), err.Error(), "OSD release update failed")
r.AssertExpectations(s.T())
}

func (s *startSuite) TestCephVersionCommandFails() {
r := mocks.NewRunner(s.T())
r.On("RunCommand", "ceph", "-v").Return("", errors.New("command failed")).Once()
processExec = r

err := PostRefresh()
assert.Error(s.T(), err)
assert.Contains(s.T(), err.Error(), "version check failed")
r.AssertExpectations(s.T())
}
Loading