Skip to content

Commit

Permalink
Merge pull request #222 from canonical/wal-db
Browse files Browse the repository at this point in the history
Add the WAL and DB devices for the disk-add command
  • Loading branch information
UtkarshBhatthere authored Oct 6, 2023
2 parents e5c33c3 + 61cab69 commit d6334c1
Show file tree
Hide file tree
Showing 4 changed files with 154 additions and 71 deletions.
14 changes: 13 additions & 1 deletion microceph/api/disks.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,9 @@ func cmdDisksGet(s *state.State, r *http.Request) response.Response {

func cmdDisksPost(s *state.State, r *http.Request) response.Response {
var req types.DisksPost
var wal *types.DiskParameter
var db *types.DiskParameter
var data types.DiskParameter

logger.Debugf("cmdDisksPost: %v", req)
err := json.NewDecoder(r.Body).Decode(&req)
Expand All @@ -56,7 +59,16 @@ func cmdDisksPost(s *state.State, r *http.Request) response.Response {

mu.Lock()
defer mu.Unlock()
err = ceph.AddOSD(s, req.Path, req.Wipe, req.Encrypt)

data = types.DiskParameter{req.Path, req.Encrypt, req.Wipe}
if req.WALDev != nil {
wal = &types.DiskParameter{*req.WALDev, req.WALEncrypt, req.WALWipe}
}
if req.DBDev != nil {
db = &types.DiskParameter{*req.DBDev, req.DBEncrypt, req.DBWipe}
}

err = ceph.AddOSD(s, data, wal, db)
if err != nil {
return response.SmartError(err)
}
Expand Down
18 changes: 15 additions & 3 deletions microceph/api/types/disks.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,9 +3,15 @@ package types

// DisksPost hold a path and a flag for enabling device wiping
type DisksPost struct {
Path string `json:"path" yaml:"path"`
Wipe bool `json:"wipe" yaml:"wipe"`
Encrypt bool `json:"encrypt" yaml:"encrypt"`
Path string `json:"path" yaml:"path"`
Wipe bool `json:"wipe" yaml:"wipe"`
Encrypt bool `json:"encrypt" yaml:"encrypt"`
WALDev *string `json:"waldev" yaml:"waldev"`
WALWipe bool `json:"walwipe" yaml:"walwipe"`
WALEncrypt bool `json:"walencrypt" yaml:"walencrypt"`
DBDev *string `json:"dbdev" yaml:"dbdev"`
DBWipe bool `json:"dbwipe" yaml:"dbwipe"`
DBEncrypt bool `json:"dbencrypt" yaml:"dbencrypt"`
}

// DisksDelete holds an OSD number and a flag for forcing the removal
Expand All @@ -25,3 +31,9 @@ type Disk struct {
Path string `json:"path" yaml:"path"`
Location string `json:"location" yaml:"location"`
}

type DiskParameter struct {
Path string
Encrypt bool
Wipe bool
}
169 changes: 102 additions & 67 deletions microceph/ceph/osd.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ import (
"syscall"
"time"

"github.com/canonical/lxd/shared/api"
"github.com/canonical/lxd/shared/logger"
"github.com/canonical/microceph/microceph/common"

Expand Down Expand Up @@ -80,12 +81,39 @@ func nextOSD(s *state.State) (int64, error) {
}
}

func prepareDisk(disk *types.DiskParameter, suffix string, osdPath string, osdID int64) error {
if disk.Wipe {
err := timeoutWipe(disk.Path)
if err != nil {
return fmt.Errorf("Failed to wipe device %s: %w", disk.Path, err)
}
}
if disk.Encrypt {
err := checkEncryptSupport()
if err != nil {
return fmt.Errorf("Encryption unsupported on this machine: %w", err)
}
path, err := setupEncryptedOSD(disk.Path, osdPath, osdID, suffix)
if err != nil {
return fmt.Errorf("Failed to encrypt device %s: %w", disk.Path, err)
}
disk.Path = path
}
// Only the data device needs to be symlinked (suffix != "").
// Other devices (WAL and DB) are automatically handled by Ceph itself.
if suffix != "" {
return nil
}
return os.Symlink(disk.Path, filepath.Join(osdPath, "block"))
}

// setupEncryptedOSD sets up an encrypted OSD on the given disk.
//
// Takes a path to the disk device as well as the osd data path and the osd id.
// Takes a path to the disk device as well as the OSD data path, the OSD id and
// a suffix (to differentiate invocations between data, WAL and DB devices).
// Returns the path to the encrypted device and an error if any.
func setupEncryptedOSD(devicePath string, osdDataPath string, osdID int64) (string, error) {
if err := os.Symlink(devicePath, filepath.Join(osdDataPath, "unencrypted")); err != nil {
func setupEncryptedOSD(devicePath string, osdDataPath string, osdID int64, suffix string) (string, error) {
if err := os.Symlink(devicePath, filepath.Join(osdDataPath, "unencrypted"+suffix)); err != nil {
return "", fmt.Errorf("Failed to add unencrypted block symlink: %w", err)
}

Expand All @@ -96,7 +124,7 @@ func setupEncryptedOSD(devicePath string, osdDataPath string, osdID int64) (stri
}

// Store key in ceph key value store
if err = storeKey(key, osdID); err != nil {
if err = storeKey(key, osdID, suffix); err != nil {
return "", fmt.Errorf("Key store error: %w", err)
}

Expand All @@ -106,7 +134,7 @@ func setupEncryptedOSD(devicePath string, osdDataPath string, osdID int64) (stri
}

// Open the encrypted device
encryptedDevicePath, err := openEncryptedDevice(devicePath, osdID, key)
encryptedDevicePath, err := openEncryptedDevice(devicePath, osdID, key, suffix)
if err != nil {
return "", fmt.Errorf("Failed to open: %w", err)
}
Expand Down Expand Up @@ -151,25 +179,25 @@ func encryptDevice(path string, key []byte) error {
}

// Store the key in the ceph key value store, under a name that derives from the osd id.
func storeKey(key []byte, osdID int64) error {
func storeKey(key []byte, osdID int64, suffix string) error {
// Run the ceph config-key set command
_, err := processExec.RunCommand("ceph", "config-key", "set", fmt.Sprintf("microceph:osd.%d/key", osdID), string(key))
_, err := processExec.RunCommand("ceph", "config-key", "set", fmt.Sprintf("microceph:osd%s.%d/key", suffix, osdID), string(key))
if err != nil {
return fmt.Errorf("Failed to store key: %w", err)
}
return nil
}

// Open the encrypted device and return its path.
func openEncryptedDevice(path string, osdID int64, key []byte) (string, error) {
func openEncryptedDevice(path string, osdID int64, key []byte, suffix string) (string, error) {
// Run the cryptsetup open command, expect key on stdin
cmd := exec.Command(
"cryptsetup",
"--keyfile-size", "128",
"--key-file", "-",
"luksOpen",
path,
fmt.Sprintf("luksosd-%d", osdID),
fmt.Sprintf("luksosd%s-%d", suffix, osdID),
)
stdin, err := cmd.StdinPipe()
if err != nil {
Expand All @@ -187,7 +215,7 @@ NOTE: OSD Encryption requires a snapd >= 2.59.1
Verify your version of snapd by running "snap version"
`, path, err, out)
}
return fmt.Sprintf("/dev/mapper/luksosd-%d", osdID), nil
return fmt.Sprintf("/dev/mapper/luksosd%s-%d", suffix, osdID), nil
}

// checkEncryptSupport checks if the kernel supports encryption.
Expand Down Expand Up @@ -264,97 +292,86 @@ func updateFailureDomain(s *state.State) error {
return nil
}

// AddOSD adds an OSD to the cluster, given a device path and a flag for wiping
func AddOSD(s *state.State, path string, wipe bool, encrypt bool) error {
logger.Debugf("Adding OSD %s", path)

revert := revert.New()
defer revert.Fail()

func setStablePath(storage *api.ResourcesStorage, param *types.DiskParameter) error {
// Validate the path.
if !shared.IsBlockdevPath(path) {
return fmt.Errorf("Invalid disk path: %s", path)
}
// Check if we need to support encryption
if encrypt {
if err := checkEncryptSupport(); err != nil {
return fmt.Errorf("Encryption unsupported on this machine: %w", err)
}
if !shared.IsBlockdevPath(param.Path) {
return fmt.Errorf("Invalid disk path: %s", param.Path)
}

_, _, major, minor, _, _, err := shared.GetFileStat(path)
_, _, major, minor, _, _, err := shared.GetFileStat(param.Path)
if err != nil {
return fmt.Errorf("Invalid disk path: %w", err)
}

dev := fmt.Sprintf("%d:%d", major, minor)

// Lookup a stable path for it.
storage, err := resources.GetStorage()
if err != nil {
return fmt.Errorf("Unable to list system disks: %w", err)
}

for _, disk := range storage.Disks {
// Check if full disk.
if disk.Device == dev {
candidate := fmt.Sprintf("/dev/disk/by-id/%s", disk.DeviceID)

// check if candidate exists
if shared.PathExists(candidate) && !shared.IsDir(candidate) {
path = candidate
param.Path = candidate
} else {
candidate = fmt.Sprintf("/dev/disk/by-path/%s", disk.DevicePath)
if shared.PathExists(candidate) && !shared.IsDir(candidate) {
path = candidate
param.Path = candidate
}
}

break
}

// Check if partition.
found := false
for _, part := range disk.Partitions {
if part.Device == dev {
candidate := fmt.Sprintf("/dev/disk/by-id/%s-part%d", disk.DeviceID, part.Partition)
if shared.PathExists(candidate) {
path = candidate
param.Path = candidate
} else {
candidate = fmt.Sprintf("/dev/disk/by-path/%s-part%d", disk.DevicePath, part.Partition)
if shared.PathExists(candidate) {
path = candidate
param.Path = candidate
}
}

break
}
}
}

if found {
break
}
// Fallthrough. We didn't find a /dev/disk path for this device, use the original path.
return nil
}

// AddOSD adds an OSD to the cluster, given the data, WAL and DB devices and their respective
// flags for wiping and encrypting.
func AddOSD(s *state.State, data types.DiskParameter, wal *types.DiskParameter, db *types.DiskParameter) error {
logger.Debugf("Adding OSD %s", data.Path)

revert := revert.New()
defer revert.Fail()

// Lookup a stable path for it.
storage, err := resources.GetStorage()
if err != nil {
return fmt.Errorf("Unable to list system disks: %w", err)
}

// Wipe the block device if requested.
if wipe {
err = timeoutWipe(path)
if err != nil {
return fmt.Errorf("Failed to wipe the device: %w", err)
}
if err := setStablePath(storage, &data); err != nil {
return fmt.Errorf("Failed to set stable disk path: %w", err)
}

// Get a OSD number.
nr, err := nextOSD(s)
if err != nil {
return fmt.Errorf("Failed to find next OSD number: %w", err)
}
logger.Debugf("nextOSD number is %d for disk %s", nr, path)
logger.Debugf("nextOSD number is %d for disk %s", nr, data.Path)

// Record the disk.
err = s.Database.Transaction(s.Context, func(ctx context.Context, tx *sql.Tx) error {
_, err := database.CreateDisk(ctx, tx, database.Disk{Member: s.Name(), Path: path, OSD: int(nr)})
_, err := database.CreateDisk(ctx, tx, database.Disk{Member: s.Name(), Path: data.Path, OSD: int(nr)})
if err != nil {
return fmt.Errorf("Failed to record disk: %w", err)
}
Expand All @@ -367,22 +384,31 @@ func AddOSD(s *state.State, path string, wipe bool, encrypt bool) error {

logger.Debugf("Created disk record for osd.%d", nr)

// Keep the old path in case it changes after encrypting.
oldPath := data.Path

dataPath := filepath.Join(os.Getenv("SNAP_COMMON"), "data")
osdDataPath := filepath.Join(dataPath, "osd", fmt.Sprintf("ceph-%d", nr))

// if we fail later, make sure we free up the record
revert.Add(func() {
os.RemoveAll(osdDataPath)
s.Database.Transaction(s.Context, func(ctx context.Context, tx *sql.Tx) error {
database.DeleteDisk(ctx, tx, s.Name(), path)
database.DeleteDisk(ctx, tx, s.Name(), oldPath)
return nil
})
})

// Create directory.
err = os.MkdirAll(osdDataPath, 0700)
if err != nil {
return fmt.Errorf("Failed to bootstrap monitor: %w", err)
return fmt.Errorf("Failed to create OSD directory: %w", err)
}

// Wipe and/or encrypt the disk if needed.
err = prepareDisk(&data, "", osdDataPath, nr)
if err != nil {
return fmt.Errorf("Failed to prepare data device: %w", err)
}

// Generate keyring.
Expand All @@ -391,21 +417,6 @@ func AddOSD(s *state.State, path string, wipe bool, encrypt bool) error {
return fmt.Errorf("Failed to generate OSD keyring: %w", err)
}

var blockPath string
if encrypt {
blockPath, err = setupEncryptedOSD(path, osdDataPath, nr)
if err != nil {
return err
}
} else {
blockPath = path
}

// Setup device symlink.
if err = os.Symlink(blockPath, filepath.Join(osdDataPath, "block")); err != nil {
return fmt.Errorf("Failed to add block symlink: %w", err)
}

// Generate OSD uuid.
fsid := uuid.NewRandom().String()

Expand All @@ -416,7 +427,31 @@ func AddOSD(s *state.State, path string, wipe bool, encrypt bool) error {
}

// Bootstrap OSD.
_, err = processExec.RunCommand("ceph-osd", "--mkfs", "--no-mon-config", "-i", fmt.Sprintf("%d", nr))
args := []string{"--mkfs", "--no-mon-config", "-i", fmt.Sprintf("%d", nr)}
if wal != nil {
if err = setStablePath(storage, wal); err != nil {
return fmt.Errorf("Failed to set stable path for WAL: %w", err)
}

err = prepareDisk(wal, ".wal", osdDataPath, nr)
if err != nil {
return fmt.Errorf("Failed to set up WAL device: %w", err)
}
args = append(args, []string{"--bluestore-block-wal-path", wal.Path}...)
}
if db != nil {
if err = setStablePath(storage, db); err != nil {
return fmt.Errorf("Failed to set stable path for DB: %w", err)
}

err = prepareDisk(db, ".db", osdDataPath, nr)
if err != nil {
return fmt.Errorf("Failed to set up DB device: %w", err)
}
args = append(args, []string{"--bluestore-block-db-path", db.Path}...)
}

_, err = processExec.RunCommand("ceph-osd", args...)
if err != nil {
return fmt.Errorf("Failed to bootstrap OSD: %w", err)
}
Expand Down
Loading

0 comments on commit d6334c1

Please sign in to comment.