Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add the WAL and DB devices for the disk-add command #222

Merged
merged 6 commits into from
Oct 6, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 13 additions & 1 deletion microceph/api/disks.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,9 @@ func cmdDisksGet(s *state.State, r *http.Request) response.Response {

func cmdDisksPost(s *state.State, r *http.Request) response.Response {
var req types.DisksPost
var wal *types.DiskParameter
var db *types.DiskParameter
lmlg marked this conversation as resolved.
Show resolved Hide resolved
var data types.DiskParameter

logger.Debugf("cmdDisksPost: %v", req)
err := json.NewDecoder(r.Body).Decode(&req)
Expand All @@ -56,7 +59,16 @@ func cmdDisksPost(s *state.State, r *http.Request) response.Response {

mu.Lock()
defer mu.Unlock()
err = ceph.AddOSD(s, req.Path, req.Wipe, req.Encrypt)

data = types.DiskParameter{req.Path, req.Encrypt, req.Wipe}
if req.WALDev != nil {
wal = &types.DiskParameter{*req.WALDev, req.WALEncrypt, req.WALWipe}
}
if req.DBDev != nil {
db = &types.DiskParameter{*req.DBDev, req.DBEncrypt, req.DBWipe}
}

err = ceph.AddOSD(s, data, wal, db)
if err != nil {
return response.SmartError(err)
}
Expand Down
18 changes: 15 additions & 3 deletions microceph/api/types/disks.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,9 +3,15 @@ package types

// DisksPost hold a path and a flag for enabling device wiping
type DisksPost struct {
Path string `json:"path" yaml:"path"`
Wipe bool `json:"wipe" yaml:"wipe"`
Encrypt bool `json:"encrypt" yaml:"encrypt"`
Path string `json:"path" yaml:"path"`
Wipe bool `json:"wipe" yaml:"wipe"`
Encrypt bool `json:"encrypt" yaml:"encrypt"`
WALDev *string `json:"waldev" yaml:"waldev"`
WALWipe bool `json:"walwipe" yaml:"walwipe"`
WALEncrypt bool `json:"walencrypt" yaml:"walencrypt"`
DBDev *string `json:"dbdev" yaml:"dbdev"`
DBWipe bool `json:"dbwipe" yaml:"dbwipe"`
DBEncrypt bool `json:"dbencrypt" yaml:"dbencrypt"`
}

// DisksDelete holds an OSD number and a flag for forcing the removal
Expand All @@ -25,3 +31,9 @@ type Disk struct {
Path string `json:"path" yaml:"path"`
Location string `json:"location" yaml:"location"`
}

type DiskParameter struct {
Path string
Encrypt bool
Wipe bool
}
169 changes: 102 additions & 67 deletions microceph/ceph/osd.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ import (
"syscall"
"time"

"github.com/canonical/lxd/shared/api"
"github.com/canonical/lxd/shared/logger"
"github.com/canonical/microceph/microceph/common"

Expand Down Expand Up @@ -80,12 +81,39 @@ func nextOSD(s *state.State) (int64, error) {
}
}

func prepareDisk(disk *types.DiskParameter, suffix string, osdPath string, osdID int64) error {
if disk.Wipe {
err := timeoutWipe(disk.Path)
if err != nil {
return fmt.Errorf("Failed to wipe device %s: %w", disk.Path, err)
}
}
if disk.Encrypt {
err := checkEncryptSupport()
if err != nil {
return fmt.Errorf("Encryption unsupported on this machine: %w", err)
}
path, err := setupEncryptedOSD(disk.Path, osdPath, osdID, suffix)
if err != nil {
return fmt.Errorf("Failed to encrypt device %s: %w", disk.Path, err)
}
disk.Path = path
}
// Only the data device needs to be symlinked (suffix != "").
// Other devices (WAL and DB) are automatically handled by Ceph itself.
if suffix != "" {
lmlg marked this conversation as resolved.
Show resolved Hide resolved
return nil
}
return os.Symlink(disk.Path, filepath.Join(osdPath, "block"))
}

// setupEncryptedOSD sets up an encrypted OSD on the given disk.
//
// Takes a path to the disk device as well as the osd data path and the osd id.
// Takes a path to the disk device as well as the OSD data path, the OSD id and
// a suffix (to differentiate invocations between data, WAL and DB devices).
// Returns the path to the encrypted device and an error if any.
func setupEncryptedOSD(devicePath string, osdDataPath string, osdID int64) (string, error) {
if err := os.Symlink(devicePath, filepath.Join(osdDataPath, "unencrypted")); err != nil {
func setupEncryptedOSD(devicePath string, osdDataPath string, osdID int64, suffix string) (string, error) {
if err := os.Symlink(devicePath, filepath.Join(osdDataPath, "unencrypted"+suffix)); err != nil {
return "", fmt.Errorf("Failed to add unencrypted block symlink: %w", err)
}

Expand All @@ -96,7 +124,7 @@ func setupEncryptedOSD(devicePath string, osdDataPath string, osdID int64) (stri
}

// Store key in ceph key value store
if err = storeKey(key, osdID); err != nil {
if err = storeKey(key, osdID, suffix); err != nil {
return "", fmt.Errorf("Key store error: %w", err)
}

Expand All @@ -106,7 +134,7 @@ func setupEncryptedOSD(devicePath string, osdDataPath string, osdID int64) (stri
}

// Open the encrypted device
encryptedDevicePath, err := openEncryptedDevice(devicePath, osdID, key)
encryptedDevicePath, err := openEncryptedDevice(devicePath, osdID, key, suffix)
if err != nil {
return "", fmt.Errorf("Failed to open: %w", err)
}
Expand Down Expand Up @@ -151,25 +179,25 @@ func encryptDevice(path string, key []byte) error {
}

// Store the key in the ceph key value store, under a name that derives from the osd id.
func storeKey(key []byte, osdID int64) error {
func storeKey(key []byte, osdID int64, suffix string) error {
// Run the ceph config-key set command
_, err := processExec.RunCommand("ceph", "config-key", "set", fmt.Sprintf("microceph:osd.%d/key", osdID), string(key))
_, err := processExec.RunCommand("ceph", "config-key", "set", fmt.Sprintf("microceph:osd%s.%d/key", suffix, osdID), string(key))
if err != nil {
return fmt.Errorf("Failed to store key: %w", err)
}
return nil
}

// Open the encrypted device and return its path.
func openEncryptedDevice(path string, osdID int64, key []byte) (string, error) {
func openEncryptedDevice(path string, osdID int64, key []byte, suffix string) (string, error) {
// Run the cryptsetup open command, expect key on stdin
cmd := exec.Command(
"cryptsetup",
"--keyfile-size", "128",
"--key-file", "-",
"luksOpen",
path,
fmt.Sprintf("luksosd-%d", osdID),
fmt.Sprintf("luksosd%s-%d", suffix, osdID),
)
stdin, err := cmd.StdinPipe()
if err != nil {
Expand All @@ -187,7 +215,7 @@ NOTE: OSD Encryption requires a snapd >= 2.59.1
Verify your version of snapd by running "snap version"
`, path, err, out)
}
return fmt.Sprintf("/dev/mapper/luksosd-%d", osdID), nil
return fmt.Sprintf("/dev/mapper/luksosd%s-%d", suffix, osdID), nil
}

// checkEncryptSupport checks if the kernel supports encryption.
Expand Down Expand Up @@ -264,97 +292,86 @@ func updateFailureDomain(s *state.State) error {
return nil
}

// AddOSD adds an OSD to the cluster, given a device path and a flag for wiping
func AddOSD(s *state.State, path string, wipe bool, encrypt bool) error {
logger.Debugf("Adding OSD %s", path)

revert := revert.New()
defer revert.Fail()

func setStablePath(storage *api.ResourcesStorage, param *types.DiskParameter) error {
// Validate the path.
if !shared.IsBlockdevPath(path) {
return fmt.Errorf("Invalid disk path: %s", path)
}
// Check if we need to support encryption
if encrypt {
if err := checkEncryptSupport(); err != nil {
return fmt.Errorf("Encryption unsupported on this machine: %w", err)
}
if !shared.IsBlockdevPath(param.Path) {
return fmt.Errorf("Invalid disk path: %s", param.Path)
}

_, _, major, minor, _, _, err := shared.GetFileStat(path)
_, _, major, minor, _, _, err := shared.GetFileStat(param.Path)
if err != nil {
return fmt.Errorf("Invalid disk path: %w", err)
}

dev := fmt.Sprintf("%d:%d", major, minor)

// Lookup a stable path for it.
storage, err := resources.GetStorage()
if err != nil {
return fmt.Errorf("Unable to list system disks: %w", err)
}

for _, disk := range storage.Disks {
// Check if full disk.
if disk.Device == dev {
candidate := fmt.Sprintf("/dev/disk/by-id/%s", disk.DeviceID)

// check if candidate exists
if shared.PathExists(candidate) && !shared.IsDir(candidate) {
path = candidate
param.Path = candidate
} else {
candidate = fmt.Sprintf("/dev/disk/by-path/%s", disk.DevicePath)
if shared.PathExists(candidate) && !shared.IsDir(candidate) {
path = candidate
param.Path = candidate
}
}

break
}

// Check if partition.
found := false
for _, part := range disk.Partitions {
if part.Device == dev {
candidate := fmt.Sprintf("/dev/disk/by-id/%s-part%d", disk.DeviceID, part.Partition)
if shared.PathExists(candidate) {
path = candidate
param.Path = candidate
} else {
candidate = fmt.Sprintf("/dev/disk/by-path/%s-part%d", disk.DevicePath, part.Partition)
if shared.PathExists(candidate) {
path = candidate
param.Path = candidate
}
}

break
}
}
}

if found {
break
}
// Fallthrough. We didn't find a /dev/disk path for this device, use the original path.
return nil
}

// AddOSD adds an OSD to the cluster, given the data, WAL and DB devices and their respective
// flags for wiping and encrypting.
func AddOSD(s *state.State, data types.DiskParameter, wal *types.DiskParameter, db *types.DiskParameter) error {
logger.Debugf("Adding OSD %s", data.Path)

revert := revert.New()
defer revert.Fail()

// Lookup a stable path for it.
storage, err := resources.GetStorage()
if err != nil {
return fmt.Errorf("Unable to list system disks: %w", err)
}

// Wipe the block device if requested.
if wipe {
err = timeoutWipe(path)
if err != nil {
return fmt.Errorf("Failed to wipe the device: %w", err)
}
if err := setStablePath(storage, &data); err != nil {
return fmt.Errorf("Failed to set stable disk path: %w", err)
}

// Get a OSD number.
nr, err := nextOSD(s)
if err != nil {
return fmt.Errorf("Failed to find next OSD number: %w", err)
}
logger.Debugf("nextOSD number is %d for disk %s", nr, path)
logger.Debugf("nextOSD number is %d for disk %s", nr, data.Path)

// Record the disk.
err = s.Database.Transaction(s.Context, func(ctx context.Context, tx *sql.Tx) error {
_, err := database.CreateDisk(ctx, tx, database.Disk{Member: s.Name(), Path: path, OSD: int(nr)})
_, err := database.CreateDisk(ctx, tx, database.Disk{Member: s.Name(), Path: data.Path, OSD: int(nr)})
if err != nil {
return fmt.Errorf("Failed to record disk: %w", err)
}
Expand All @@ -367,22 +384,31 @@ func AddOSD(s *state.State, path string, wipe bool, encrypt bool) error {

logger.Debugf("Created disk record for osd.%d", nr)

// Keep the old path in case it changes after encrypting.
oldPath := data.Path

dataPath := filepath.Join(os.Getenv("SNAP_COMMON"), "data")
osdDataPath := filepath.Join(dataPath, "osd", fmt.Sprintf("ceph-%d", nr))

// if we fail later, make sure we free up the record
revert.Add(func() {
os.RemoveAll(osdDataPath)
s.Database.Transaction(s.Context, func(ctx context.Context, tx *sql.Tx) error {
database.DeleteDisk(ctx, tx, s.Name(), path)
database.DeleteDisk(ctx, tx, s.Name(), oldPath)
return nil
})
})

// Create directory.
err = os.MkdirAll(osdDataPath, 0700)
if err != nil {
return fmt.Errorf("Failed to bootstrap monitor: %w", err)
return fmt.Errorf("Failed to create OSD directory: %w", err)
}

// Wipe and/or encrypt the disk if needed.
err = prepareDisk(&data, "", osdDataPath, nr)
if err != nil {
return fmt.Errorf("Failed to prepare data device: %w", err)
}

// Generate keyring.
Expand All @@ -391,21 +417,6 @@ func AddOSD(s *state.State, path string, wipe bool, encrypt bool) error {
return fmt.Errorf("Failed to generate OSD keyring: %w", err)
}

var blockPath string
if encrypt {
blockPath, err = setupEncryptedOSD(path, osdDataPath, nr)
if err != nil {
return err
}
} else {
blockPath = path
}

// Setup device symlink.
if err = os.Symlink(blockPath, filepath.Join(osdDataPath, "block")); err != nil {
return fmt.Errorf("Failed to add block symlink: %w", err)
}

// Generate OSD uuid.
fsid := uuid.NewRandom().String()

Expand All @@ -416,7 +427,31 @@ func AddOSD(s *state.State, path string, wipe bool, encrypt bool) error {
}

// Bootstrap OSD.
_, err = processExec.RunCommand("ceph-osd", "--mkfs", "--no-mon-config", "-i", fmt.Sprintf("%d", nr))
args := []string{"--mkfs", "--no-mon-config", "-i", fmt.Sprintf("%d", nr)}
if wal != nil {
if err = setStablePath(storage, wal); err != nil {
return fmt.Errorf("Failed to set stable path for WAL: %w", err)
}

err = prepareDisk(wal, ".wal", osdDataPath, nr)
if err != nil {
return fmt.Errorf("Failed to set up WAL device: %w", err)
}
args = append(args, []string{"--bluestore-block-wal-path", wal.Path}...)
}
if db != nil {
if err = setStablePath(storage, db); err != nil {
return fmt.Errorf("Failed to set stable path for DB: %w", err)
}

err = prepareDisk(db, ".db", osdDataPath, nr)
if err != nil {
return fmt.Errorf("Failed to set up DB device: %w", err)
}
args = append(args, []string{"--bluestore-block-db-path", db.Path}...)
}

_, err = processExec.RunCommand("ceph-osd", args...)
if err != nil {
return fmt.Errorf("Failed to bootstrap OSD: %w", err)
}
Expand Down
Loading
Loading