diff --git a/.github/workflows/tests.yml b/.github/workflows/tests.yml index 7b95a672..7c4c391d 100644 --- a/.github/workflows/tests.yml +++ b/.github/workflows/tests.yml @@ -160,6 +160,8 @@ jobs: - name: Exercise RGW run: | set -eux + sudo microceph.ceph status + sudo systemctl status snap.microceph.rgw sudo microceph.radosgw-admin user create --uid=test --display-name=test sudo microceph.radosgw-admin key create --uid=test --key-type=s3 --access-key fooAccessKey --secret-key fooSecretKey sudo apt-get -qq install s3cmd diff --git a/microceph/api/endpoints.go b/microceph/api/endpoints.go index 2fcbba8d..cf83185f 100644 --- a/microceph/api/endpoints.go +++ b/microceph/api/endpoints.go @@ -13,4 +13,7 @@ var Endpoints = []rest.Endpoint{ rgwServiceCmd, configsCmd, restartServiceCmd, + mdsServiceCmd, + mgrServiceCmd, + monServiceCmd, } diff --git a/microceph/api/services.go b/microceph/api/services.go index df549576..c7d1e5a0 100644 --- a/microceph/api/services.go +++ b/microceph/api/services.go @@ -32,6 +32,45 @@ func cmdServicesGet(s *state.State, r *http.Request) response.Response { return response.SyncResponse(true, services) } +// Service Enable Endpoint. 
+var monServiceCmd = rest.Endpoint{ + Path: "services/mon", + Put: rest.EndpointAction{Handler: cmdEnableServicePut, ProxyTarget: true}, +} + +var mgrServiceCmd = rest.Endpoint{ + Path: "services/mgr", + Put: rest.EndpointAction{Handler: cmdEnableServicePut, ProxyTarget: true}, +} + +var mdsServiceCmd = rest.Endpoint{ + Path: "services/mds", + Put: rest.EndpointAction{Handler: cmdEnableServicePut, ProxyTarget: true}, +} + +var rgwServiceCmd = rest.Endpoint{ + Path: "services/rgw", + Put: rest.EndpointAction{Handler: cmdEnableServicePut, ProxyTarget: true}, + Delete: rest.EndpointAction{Handler: cmdRGWServiceDelete, ProxyTarget: true}, +} + +func cmdEnableServicePut(s *state.State, r *http.Request) response.Response { + var payload types.EnableService + + err := json.NewDecoder(r.Body).Decode(&payload) + if err != nil { + logger.Errorf("Failed decoding enable service request: %v", err) + return response.InternalError(err) + } + + err = ceph.ServicePlacementHandler(common.CephState{State: s}, payload) + if err != nil { + return response.SyncResponse(false, err) + } + + return response.SyncResponse(true, nil) +} + // Service Reload Endpoint. var restartServiceCmd = rest.Endpoint{ Path: "services/restart", @@ -69,14 +108,7 @@ func cmdRestartServicePost(s *state.State, r *http.Request) response.Response { return response.EmptySyncResponse } -var rgwServiceCmd = rest.Endpoint{ - Path: "services/rgw", - - Put: rest.EndpointAction{Handler: cmdRGWServicePut, ProxyTarget: true}, -} - -// cmdRGWServicePutRGW is the handler for PUT /1.0/services/rgw. 
-func cmdRGWServicePut(s *state.State, r *http.Request) response.Response { +func cmdRGWServiceDelete(s *state.State, r *http.Request) response.Response { var req types.RGWService err := json.NewDecoder(r.Body).Decode(&req) @@ -84,11 +116,7 @@ func cmdRGWServicePut(s *state.State, r *http.Request) response.Response { return response.InternalError(err) } - if req.Enabled { - err = ceph.EnableRGW(common.CephState{State: s}, req.Port) - } else { - err = ceph.DisableRGW(common.CephState{State: s}) - } + err = ceph.DisableRGW(common.CephState{State: s}) if err != nil { return response.SmartError(err) } diff --git a/microceph/api/types/services.go b/microceph/api/types/services.go index 3a4b58be..f45ce78b 100644 --- a/microceph/api/types/services.go +++ b/microceph/api/types/services.go @@ -10,6 +10,16 @@ type Service struct { Location string `json:"location" yaml:"location"` } +// EnableService is the request payload for enabling a service on a host. +// Name is the service to enable; Wait selects a synchronous (true) or asynchronous (false) run. +// Payload carries service specific additional data encoded as a json string. +type EnableService struct { + Name string `json:"name" yaml:"name"` + Wait bool `json:"wait" yaml:"wait"` + Payload string `json:"payload" yaml:"payload"` + // Enable Service passes all additional data as a json payload string. +} + // RGWService holds a port number and enable/disable flag type RGWService struct { Service diff --git a/microceph/ceph/bootstrap.go b/microceph/ceph/bootstrap.go index 936a3470..420caf8d 100644 --- a/microceph/ceph/bootstrap.go +++ b/microceph/ceph/bootstrap.go @@ -16,21 +16,11 @@ import ( // Bootstrap will initialize a new Ceph deployment. 
func Bootstrap(s common.StateInterface) error { - - confPath := filepath.Join(os.Getenv("SNAP_DATA"), "conf") - runPath := filepath.Join(os.Getenv("SNAP_DATA"), "run") - dataPath := filepath.Join(os.Getenv("SNAP_COMMON"), "data") - logPath := filepath.Join(os.Getenv("SNAP_COMMON"), "logs") + pathConsts := common.GetPathConst() + pathFileMode := common.GetPathFileMode() // Create our various paths. - paths := map[string]os.FileMode{ - confPath: 0755, - runPath: 0700, - dataPath: 0700, - logPath: 0700, - } - - for path, perm := range paths { + for path, perm := range pathFileMode { err := os.MkdirAll(path, perm) if err != nil { return fmt.Errorf("Unable to create %q: %w", path, err) @@ -39,12 +29,11 @@ func Bootstrap(s common.StateInterface) error { // Generate a new FSID. fsid := uuid.NewRandom().String() - - conf := newCephConfig(confPath) + conf := newCephConfig(pathConsts.ConfPath) err := conf.WriteConfig( map[string]any{ "fsid": fsid, - "runDir": runPath, + "runDir": pathConsts.RunPath, "monitors": s.ClusterState().Address().Hostname(), "addr": s.ClusterState().Address().Hostname(), }, @@ -53,14 +42,14 @@ func Bootstrap(s common.StateInterface) error { return err } - path, err := createKeyrings(confPath) + path, err := createKeyrings(pathConsts.ConfPath) if err != nil { return err } defer os.RemoveAll(path) - adminKey, err := parseKeyring(filepath.Join(confPath, "ceph.client.admin.keyring")) + adminKey, err := parseKeyring(filepath.Join(pathConsts.ConfPath, "ceph.client.admin.keyring")) if err != nil { return fmt.Errorf("Failed parsing admin keyring: %w", err) } @@ -70,17 +59,17 @@ func Bootstrap(s common.StateInterface) error { return err } - err = initMon(s, dataPath, path) + err = initMon(s, pathConsts.DataPath, path) if err != nil { return err } - err = initMgr(s, dataPath) + err = initMgr(s, pathConsts.DataPath) if err != nil { return err } - err = initMds(s, dataPath) + err = initMds(s, pathConsts.DataPath) if err != nil { return err } @@ -90,7 +79,7 @@ 
func Bootstrap(s common.StateInterface) error { return err } - err = startOSDs(s, dataPath) + err = startOSDs(s, pathConsts.DataPath) if err != nil { return err } diff --git a/microceph/ceph/join.go b/microceph/ceph/join.go index f3ee041d..97c5d2c3 100644 --- a/microceph/ceph/join.go +++ b/microceph/ceph/join.go @@ -5,28 +5,19 @@ import ( "database/sql" "fmt" "os" - "path/filepath" + "github.com/canonical/lxd/shared/logger" "github.com/canonical/microceph/microceph/common" "github.com/canonical/microceph/microceph/database" ) // Join will join an existing Ceph deployment. func Join(s common.StateInterface) error { - confPath := filepath.Join(os.Getenv("SNAP_DATA"), "conf") - runPath := filepath.Join(os.Getenv("SNAP_DATA"), "run") - dataPath := filepath.Join(os.Getenv("SNAP_COMMON"), "data") - logPath := filepath.Join(os.Getenv("SNAP_COMMON"), "logs") + pathFileMode := common.GetPathFileMode() + var spt = GetServicePlacementTable() // Create our various paths. - paths := map[string]os.FileMode{ - confPath: 0755, - runPath: 0700, - dataPath: 0700, - logPath: 0700, - } - - for path, perm := range paths { + for path, perm := range pathFileMode { err := os.MkdirAll(path, perm) if err != nil { return fmt.Errorf("Unable to create %q: %w", path, err) @@ -82,63 +73,30 @@ func Join(s common.StateInterface) error { services := []string{} if srvMon < 3 { - monDataPath := filepath.Join(dataPath, "mon", fmt.Sprintf("ceph-%s", s.ClusterState().Name())) - - err = os.MkdirAll(monDataPath, 0700) + err := spt["mon"].ServiceInit(s) if err != nil { - return fmt.Errorf("Failed to join monitor: %w", err) - } - - err = joinMon(s.ClusterState().Name(), monDataPath) - if err != nil { - return fmt.Errorf("Failed to join monitor: %w", err) - } - - err = snapStart("mon", true) - if err != nil { - return fmt.Errorf("Failed to start monitor: %w", err) + logger.Errorf("%v", err) + return err } services = append(services, "mon") } if srvMgr < 3 { - mgrDataPath := filepath.Join(dataPath, "mgr", 
fmt.Sprintf("ceph-%s", s.ClusterState().Name())) - - err = os.MkdirAll(mgrDataPath, 0700) - if err != nil { - return fmt.Errorf("Failed to join manager: %w", err) - } - - err = joinMgr(s.ClusterState().Name(), mgrDataPath) - if err != nil { - return fmt.Errorf("Failed to join manager: %w", err) - } - - err = snapStart("mgr", true) + err := spt["mgr"].ServiceInit(s) if err != nil { - return fmt.Errorf("Failed to start manager: %w", err) + logger.Errorf("%v", err) + return err } services = append(services, "mgr") } if srvMds < 3 { - mdsDataPath := filepath.Join(dataPath, "mds", fmt.Sprintf("ceph-%s", s.ClusterState().Name())) - - err = os.MkdirAll(mdsDataPath, 0700) + err := spt["mds"].ServiceInit(s) if err != nil { - return fmt.Errorf("Failed to join metadata server: %w", err) - } - - err = joinMds(s.ClusterState().Name(), mdsDataPath) - if err != nil { - return fmt.Errorf("Failed to join metadata server: %w", err) - } - - err = snapStart("mds", true) - if err != nil { - return fmt.Errorf("Failed to start metadata server: %w", err) + logger.Errorf("%v", err) + return err } services = append(services, "mds") diff --git a/microceph/ceph/monitor.go b/microceph/ceph/monitor.go index 1370c21d..d922d56e 100644 --- a/microceph/ceph/monitor.go +++ b/microceph/ceph/monitor.go @@ -57,20 +57,20 @@ func bootstrapMon(hostname string, path string, monmap string, keyring string) e func joinMon(hostname string, path string) error { tmpPath, err := os.MkdirTemp("", "") if err != nil { - return fmt.Errorf("Unable to create temporary path: %w", err) + return fmt.Errorf("unable to create temporary path: %w", err) } defer os.RemoveAll(tmpPath) monmap := filepath.Join(tmpPath, "mon.map") _, err = cephRun("mon", "getmap", "-o", monmap) if err != nil { - return fmt.Errorf("Failed to retrieve monmap: %w", err) + return fmt.Errorf("failed to retrieve monmap: %w", err) } keyring := filepath.Join(tmpPath, "mon.keyring") _, err = cephRun("auth", "get", "mon.", "-o", keyring) if err != nil { - 
return fmt.Errorf("Failed to retrieve mon keyring: %w", err) + return fmt.Errorf("failed to retrieve mon keyring: %w", err) } return bootstrapMon(hostname, path, monmap, keyring) diff --git a/microceph/ceph/rgw.go b/microceph/ceph/rgw.go index ea90820e..30f5e7d5 100644 --- a/microceph/ceph/rgw.go +++ b/microceph/ceph/rgw.go @@ -13,15 +13,13 @@ import ( // EnableRGW enables the RGW service on the cluster and adds initial configuration given a service port number. func EnableRGW(s common.StateInterface, port int) error { - confPath := filepath.Join(os.Getenv("SNAP_DATA"), "conf") - runPath := filepath.Join(os.Getenv("SNAP_DATA"), "run") - dataPath := filepath.Join(os.Getenv("SNAP_COMMON"), "data") + pathConsts := common.GetPathConst() // Create RGW configuration. - conf := newRadosGWConfig(confPath) + conf := newRadosGWConfig(pathConsts.ConfPath) err := conf.WriteConfig( map[string]any{ - "runDir": runPath, + "runDir": pathConsts.RunPath, "monitors": s.ClusterState().Address().Hostname(), "rgwPort": port, }, @@ -30,12 +28,12 @@ func EnableRGW(s common.StateInterface, port int) error { return err } // Create RGW keyring. - path := filepath.Join(dataPath, "radosgw", "ceph-radosgw.gateway") + path := filepath.Join(pathConsts.DataPath, "radosgw", "ceph-radosgw.gateway") if err = createRGWKeyring(path); err != nil { return err } // Symlink the keyring to the conf directory for usage with the radosgw-admin command. - if err = symlinkRGWKeyring(path, confPath); err != nil { + if err = symlinkRGWKeyring(path, pathConsts.ConfPath); err != nil { return err } // Record the changes to the database. @@ -52,8 +50,7 @@ func EnableRGW(s common.StateInterface, port int) error { // DisableRGW disables the RGW service on the cluster. 
func DisableRGW(s common.StateInterface) error { - confPath := filepath.Join(os.Getenv("SNAP_DATA"), "conf") - dataPath := filepath.Join(os.Getenv("SNAP_COMMON"), "data") + pathConsts := common.GetPathConst() err := stopRGW() if err != nil { @@ -66,21 +63,21 @@ func DisableRGW(s common.StateInterface) error { } // Remove the keyring symlink. - err = os.Remove(filepath.Join(confPath, "ceph.client.radosgw.gateway.keyring")) + err = os.Remove(filepath.Join(pathConsts.ConfPath, "ceph.client.radosgw.gateway.keyring")) if err != nil { - return fmt.Errorf("Failed to remove RGW keyring symlink: %w", err) + return fmt.Errorf("failed to remove RGW keyring symlink: %w", err) } // Remove the keyring. - err = os.Remove(filepath.Join(dataPath, "radosgw", "ceph-radosgw.gateway", "keyring")) + err = os.Remove(filepath.Join(pathConsts.DataPath, "radosgw", "ceph-radosgw.gateway", "keyring")) if err != nil { - return fmt.Errorf("Failed to remove RGW keyring: %w", err) + return fmt.Errorf("failed to remove RGW keyring: %w", err) } // Remove the configuration. - err = os.Remove(filepath.Join(confPath, "radosgw.conf")) + err = os.Remove(filepath.Join(pathConsts.ConfPath, "radosgw.conf")) if err != nil { - return fmt.Errorf("Failed to remove RGW configuration: %w", err) + return fmt.Errorf("failed to remove RGW configuration: %w", err) } return nil @@ -166,10 +163,10 @@ func createRGWKeyring(path string) error { } // symlinkRGWKeyring creates a symlink to the RGW keyring in the conf directory for use with the radosgw-admin command. 
-func symlinkRGWKeyring(keyPath, confPath string) error { +func symlinkRGWKeyring(keyPath, ConfPath string) error { if err := os.Symlink( filepath.Join(keyPath, "keyring"), - filepath.Join(confPath, "ceph.client.radosgw.gateway.keyring")); err != nil { + filepath.Join(ConfPath, "ceph.client.radosgw.gateway.keyring")); err != nil { return fmt.Errorf("Failed to create symlink to RGW keyring: %w", err) } diff --git a/microceph/ceph/services_placement.go b/microceph/ceph/services_placement.go new file mode 100644 index 00000000..5484438c --- /dev/null +++ b/microceph/ceph/services_placement.go @@ -0,0 +1,109 @@ +package ceph + +import ( + "fmt" + + "github.com/canonical/lxd/shared/logger" + "github.com/canonical/microceph/microceph/api/types" + "github.com/canonical/microceph/microceph/common" +) + +// PlacementIntf is the interface used for running various services in a MicroCeph cluster. +type PlacementIntf interface { + // Populate json payload data to the service object. + PopulateParams(common.StateInterface, string) error + // Check if host is hospitable to the new service to be enabled. + HospitalityCheck(common.StateInterface) error + // Initialise the new service. + ServiceInit(common.StateInterface) error + // Perform Post Placement checks for the service + PostPlacementCheck(common.StateInterface) error + // Perform DB updates to persist the service enablement changes. 
+ DbUpdate(common.StateInterface) error +} + +func GetServicePlacementTable() map[string](PlacementIntf) { + return map[string](PlacementIntf){ + "mon": &GenericServicePlacement{"mon"}, + "mgr": &GenericServicePlacement{"mgr"}, + "mds": &GenericServicePlacement{"mds"}, + "rgw": &RgwServicePlacement{}, + } +} + +func ServicePlacementHandler(s common.StateInterface, payload types.EnableService) error { + var ok bool + var spt = GetServicePlacementTable() + var sp PlacementIntf + + logger.Debugf("Enabling %s service, payload: %v", payload.Name, payload.Payload) + sp, ok = spt[payload.Name] + if !ok { + err := fmt.Errorf("%s enablement is not supported", payload.Name) + logger.Error(err.Error()) + return err + } + + if payload.Wait { + err := EnableService(s, payload, sp) + if err != nil { + logger.Errorf("failed %s service enablement request: %v", payload.Name, err) + return err + } + } else { + go func() { + // Async call to Enable service. + err := EnableService(s, payload, sp) + if err != nil { + logger.Errorf("failed %s service enablement request: %v", payload.Name, err) + } + }() + } + + return nil +} + +func EnableService(s common.StateInterface, payload types.EnableService, item PlacementIntf) error { + + // Populate json payload data to the service object. + err := item.PopulateParams(s, payload.Payload) + if err != nil { + retErr := fmt.Errorf("failed to populate the payload for %s enablement: %v", payload.Name, err) + logger.Error(retErr.Error()) + return retErr + } + + // Check if host is hospitable to the new service to be enabled. + err = item.HospitalityCheck(s) + if err != nil { + retErr := fmt.Errorf("host failed hospitality check for %s enablement: %v", payload.Name, err) + logger.Error(retErr.Error()) + return retErr + } + + // Initialise the new service. 
+ err = item.ServiceInit(s) + if err != nil { + retErr := fmt.Errorf("failed to initialise %s service at host: %v", payload.Name, err) + logger.Error(retErr.Error()) + return retErr + } + + // Perform Post Placement checks for the service + err = item.PostPlacementCheck(s) + if err != nil { + retErr := fmt.Errorf("%s service unable to sustain on host: %v", payload.Name, err) + logger.Error(retErr.Error()) + return retErr + } + + // Perform DB updates to persist the service enablement changes. + err = item.DbUpdate(s) + if err != nil { + retErr := fmt.Errorf("failed to add DB record for %s: %v", payload.Name, err) + logger.Error(retErr.Error()) + return retErr + } + + return nil +} diff --git a/microceph/ceph/services_placement_generic.go b/microceph/ceph/services_placement_generic.go new file mode 100644 index 00000000..6ddadf87 --- /dev/null +++ b/microceph/ceph/services_placement_generic.go @@ -0,0 +1,132 @@ +package ceph + +import ( + "context" + "database/sql" + "fmt" + "os" + "path/filepath" + "time" + + "github.com/canonical/lxd/shared/logger" + "github.com/canonical/microceph/microceph/common" + "github.com/canonical/microceph/microceph/database" +) + +// Maps the addService function to respective services. +func GetAddServiceTable() map[string](func(string, string) error) { + return map[string](func(string, string) error){ + "mon": joinMon, + "mgr": joinMgr, + "mds": joinMds, + // Add more services here, for using the generic Interface implementation. 
+ } +} + +// Used by services: mon, mgr, mds +type GenericServicePlacement struct { + Name string +} + +func (gsp *GenericServicePlacement) PopulateParams(s common.StateInterface, payload string) error { + // No params needed to initialise generic service + return nil +} + +func (gsp *GenericServicePlacement) HospitalityCheck(s common.StateInterface) error { + return genericHospitalityCheck(gsp.Name) +} + +func (gsp *GenericServicePlacement) ServiceInit(s common.StateInterface) error { + var ok bool + var addService func(string, string) error + hostname := s.ClusterState().Name() + pathConsts := common.GetPathConst() + pathFileMode := common.GetPathFileMode() + serviceDataPath := filepath.Join(pathConsts.DataPath, gsp.Name, fmt.Sprintf("ceph-%s", hostname)) + addServiceTable := GetAddServiceTable() + + // Fetch addService handler for gsp.Name service + addService, ok = addServiceTable[gsp.Name] + if !ok { + err := fmt.Errorf("%s is not registered in the generic implementation", gsp.Name) + logger.Error(err.Error()) + return err + } + + // Make required directories + err := os.MkdirAll(serviceDataPath, pathFileMode[pathConsts.DataPath]) + if err != nil { + logger.Error(err.Error()) + return fmt.Errorf("failed to add datapath %s for service %s: %w", serviceDataPath, gsp.Name, err) + } + + err = addService(hostname, serviceDataPath) + if err != nil { + logger.Error(err.Error()) + return fmt.Errorf("failed to add service %s: %w", gsp.Name, err) + } + + err = snapStart(gsp.Name, true) + if err != nil { + logger.Error(err.Error()) + return fmt.Errorf("failed to perform snap start for service %s: %w", gsp.Name, err) + } + + return nil +} + +func (gsp *GenericServicePlacement) PostPlacementCheck(s common.StateInterface) error { + return genericPostPlacementCheck(gsp.Name) +} + +func (gsp *GenericServicePlacement) DbUpdate(s common.StateInterface) error { + return genericDbUpdate(s, gsp.Name) +} + +// Generic Method Implementations +func genericHospitalityCheck(service 
string) error { + // Check if service already exists on host. + err := snapCheckActive(service) + if err == nil { + retErr := fmt.Errorf("%s service already active on host", service) + logger.Error(retErr.Error()) + return retErr + } + + return nil +} + +func genericPostPlacementCheck(service string) error { + // Check in a loop if the service stays up. + attempts := 4 + + for attempts > 0 { + ret := snapCheckActive(service) + if ret != nil { + return ret + } + + // simple delay, since only checking if the service stays up. + time.Sleep(time.Duration(attempts) * time.Second) + attempts-- // Decrease attempt by one. + } + + return nil +} + +func genericDbUpdate(s common.StateInterface, service string) error { + // Update the database. + err := s.ClusterState().Database.Transaction(s.ClusterState().Context, func(ctx context.Context, tx *sql.Tx) error { + // Record the roles. + _, err := database.CreateService(ctx, tx, database.Service{Member: s.ClusterState().Name(), Service: service}) + if err != nil { + return fmt.Errorf("failed to record role: %w", err) + } + return nil + }) + if err != nil { + return err + } + return nil +} diff --git a/microceph/ceph/services_placement_rgw.go b/microceph/ceph/services_placement_rgw.go new file mode 100644 index 00000000..ec35f7d5 --- /dev/null +++ b/microceph/ceph/services_placement_rgw.go @@ -0,0 +1,37 @@ +package ceph + +import ( + "encoding/json" + + "github.com/canonical/microceph/microceph/common" +) + +type RgwServicePlacement struct { + Port int +} + +func (rgw *RgwServicePlacement) PopulateParams(s common.StateInterface, payload string) error { + + err := json.Unmarshal([]byte(payload), &rgw) + if err != nil { + return err + } + + return nil +} + +func (rgw *RgwServicePlacement) HospitalityCheck(s common.StateInterface) error { + return genericHospitalityCheck("rgw") +} + +func (rgw *RgwServicePlacement) ServiceInit(s common.StateInterface) error { + return EnableRGW(s, rgw.Port) +} + +func (rgw *RgwServicePlacement) 
PostPlacementCheck(s common.StateInterface) error { + return genericPostPlacementCheck("rgw") +} + +func (rgw *RgwServicePlacement) DbUpdate(s common.StateInterface) error { + return genericDbUpdate(s, "rgw") +} diff --git a/microceph/ceph/services_placement_test.go b/microceph/ceph/services_placement_test.go new file mode 100644 index 00000000..e14e0100 --- /dev/null +++ b/microceph/ceph/services_placement_test.go @@ -0,0 +1,141 @@ +package ceph + +import ( + "fmt" + "testing" + + "github.com/canonical/microceph/microceph/api/types" + "github.com/canonical/microceph/microceph/mocks" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/suite" +) + +type servicesPlacementSuite struct { + baseSuite + TestStateInterface *mocks.StateInterface +} + +func TestServicesPlacement(t *testing.T) { + suite.Run(t, new(servicesPlacementSuite)) +} + +// Set up test suite +func (s *servicesPlacementSuite) SetupTest() { + s.baseSuite.SetupTest() +} + +func addSnapServiceActiveExpectations(r *mocks.Runner, service string, retStr string, retErr error) { + r.On("RunCommand", []interface{}{ + "snapctl", "services", fmt.Sprintf("microceph.%s", service), + }...).Return(retStr, retErr).Once() +} + +func addPlacementServiceInitFailExpectation(sp *mocks.PlacementIntf, s *mocks.StateInterface, payload types.EnableService) { + sp.On("PopulateParams", s, payload.Payload).Return(nil).Once() + sp.On("HospitalityCheck", s).Return(nil).Once() + sp.On("ServiceInit", s).Return(fmt.Errorf("ERROR")).Once() +} + +func addPostPlacementCheckFailExpectation(sp *mocks.PlacementIntf, s *mocks.StateInterface, payload types.EnableService) { + sp.On("PopulateParams", s, payload.Payload).Return(nil).Once() + sp.On("HospitalityCheck", s).Return(nil).Once() + sp.On("ServiceInit", s).Return(nil).Once() + sp.On("PostPlacementCheck", s).Return(fmt.Errorf("ERROR")).Once() +} + +func addDbUpdateFailExpectation(sp *mocks.PlacementIntf, s *mocks.StateInterface, payload types.EnableService) { + 
+ sp.On("PopulateParams", s, payload.Payload).Return(nil).Once() + sp.On("HospitalityCheck", s).Return(nil).Once() + sp.On("ServiceInit", s).Return(nil).Once() + sp.On("PostPlacementCheck", s).Return(nil).Once() + sp.On("DbUpdate", s).Return(fmt.Errorf("ERROR")).Once() +} + +func (s *servicesPlacementSuite) TestUnknownServiceFailure() { + payload := types.EnableService{ + Name: "unknowService", + Wait: true, + Payload: "", + } + + // Check Enable Service fails for unregistered services. + err := ServicePlacementHandler(s.TestStateInterface, payload) + assert.Error(s.T(), err) +} + +func (s *servicesPlacementSuite) TestIllStructuredPayloadFailure() { + service := "rgw" + + payload := types.EnableService{ + Name: service, + Wait: true, + Payload: "\"Port\":80", // Json String does not have {} + } + + // Check Enable Service fails when the payload is not valid json. + err := ServicePlacementHandler(s.TestStateInterface, payload) + assert.ErrorContains(s.T(), err, "failed to populate the payload") +} + +func (s *servicesPlacementSuite) TestHospitalityCheckFailure() { + service := "rgw" + + r := mocks.NewRunner(s.T()) + processExec = r + addSnapServiceActiveExpectations(r, service, "active", nil) + + payload := types.EnableService{ + Name: service, + Wait: true, + Payload: "{\"Port\":80}", + } + + // Check Enable Service fails when the service is already active on the host. + err := ServicePlacementHandler(s.TestStateInterface, payload) + assert.ErrorContains(s.T(), err, "host failed hospitality check") +} + +func (s *servicesPlacementSuite) TestServiceInitFailure() { + service := "mon" + payload := types.EnableService{ + Name: service, + Wait: true, + } + + sp := mocks.NewPlacementIntf(s.T()) + addPlacementServiceInitFailExpectation(sp, s.TestStateInterface, payload) + + // Check Enable Service fails when service initialisation fails. 
+ err := EnableService(s.TestStateInterface, payload, sp) + assert.ErrorContains(s.T(), err, "failed to initialise") +} + +func (s *servicesPlacementSuite) TestPostPlacementCheckFailure() { + service := "mon" + payload := types.EnableService{ + Name: service, + Wait: true, + } + + sp := mocks.NewPlacementIntf(s.T()) + addPostPlacementCheckFailExpectation(sp, s.TestStateInterface, payload) + + // Check Enable Service fails when the post placement check fails. + err := EnableService(s.TestStateInterface, payload, sp) + assert.ErrorContains(s.T(), err, "service unable to sustain on host") +} + +func (s *servicesPlacementSuite) TestDbUpdateFailure() { + service := "mon" + payload := types.EnableService{ + Name: service, + Wait: true, + } + + sp := mocks.NewPlacementIntf(s.T()) + addDbUpdateFailExpectation(sp, s.TestStateInterface, payload) + + // Check Enable Service fails when the DB update fails. + err := EnableService(s.TestStateInterface, payload, sp) + assert.ErrorContains(s.T(), err, "failed to add DB record for") +} diff --git a/microceph/ceph/snap.go b/microceph/ceph/snap.go index 1a188b46..b5fa5db4 100644 --- a/microceph/ceph/snap.go +++ b/microceph/ceph/snap.go @@ -2,6 +2,7 @@ package ceph import ( "fmt" + "strings" "github.com/canonical/lxd/shared/logger" ) @@ -80,3 +81,23 @@ func snapRestart(service string, isReload bool) error { return nil } + +// snapCheckActive returns an error if the snapctl query fails or reports the service as inactive. +func snapCheckActive(service string) error { + args := []string{ + "services", + fmt.Sprintf("microceph.%s", service), + } + + out, err := processExec.RunCommand("snapctl", args...) + if err != nil { + return err + } + + // Check if the particular service is inactive. 
+ if strings.Contains(out, "inactive") { + return fmt.Errorf("%s service is not active", service) + } + + return nil +} diff --git a/microceph/client/client.go b/microceph/client/client.go index 898d8307..5abf20f9 100644 --- a/microceph/client/client.go +++ b/microceph/client/client.go @@ -7,9 +7,7 @@ import ( "time" "github.com/canonical/lxd/shared/api" - "github.com/canonical/lxd/shared/logger" "github.com/canonical/microcluster/client" - "github.com/canonical/microcluster/state" "github.com/canonical/microceph/microceph/api/types" ) @@ -52,47 +50,6 @@ func GetConfig(ctx context.Context, c *client.Client, data *types.Config) (types return configs, nil } -func RestartService(ctx context.Context, c *client.Client, data *types.Services) error { - // 120 second timeout for waiting. - queryCtx, cancel := context.WithTimeout(ctx, time.Second*120) - defer cancel() - - err := c.Query(queryCtx, "POST", api.NewURL().Path("services", "restart"), data, nil) - if err != nil { - url := c.URL() - return fmt.Errorf("Failed Forwarding To: %s: %w", url.String(), err) - } - - return nil -} - -// Sends the desired list of services to be restarted on every other member of the cluster. -func SendRestartRequestToClusterMembers(s *state.State, services []string) error { - // Populate the restart request data. - var data types.Services - for _, service := range services { - data = append(data, types.Service{Service: service}) - } - - // Get a collection of clients to every other cluster member, with the notification user-agent set. - cluster, err := s.Cluster(nil) - if err != nil { - logger.Errorf("Failed to get a client for every cluster member: %v", err) - return err - } - - for _, remoteClient := range cluster { - // In order send restart to each cluster member and wait. - err = RestartService(s.Context, &remoteClient, &data) - if err != nil { - logger.Errorf("Restart error: %v", err) - return err - } - } - - return nil -} - // AddDisk requests Ceph sets up a new OSD. 
func AddDisk(ctx context.Context, c *client.Client, data *types.DisksPost) error { queryCtx, cancel := context.WithTimeout(ctx, time.Second*120) @@ -135,30 +92,3 @@ func GetResources(ctx context.Context, c *client.Client) (*api.ResourcesStorage, return &storage, nil } - -// GetServices returns the list of configured ceph services. -func GetServices(ctx context.Context, c *client.Client) (types.Services, error) { - queryCtx, cancel := context.WithTimeout(ctx, time.Second*5) - defer cancel() - - services := types.Services{} - - err := c.Query(queryCtx, "GET", api.NewURL().Path("services"), nil, &services) - if err != nil { - return nil, fmt.Errorf("Failed listing services: %w", err) - } - - return services, nil -} - -// EnableRGW requests Ceph configures the RGW service. -func EnableRGW(ctx context.Context, c *client.Client, data *types.RGWService) error { - queryCtx, cancel := context.WithTimeout(ctx, time.Second*120) - defer cancel() - err := c.Query(queryCtx, "PUT", api.NewURL().Path("services", "rgw"), data, nil) - if err != nil { - return fmt.Errorf("Failed enabling RGW: %w", err) - } - - return nil -} diff --git a/microceph/client/services.go b/microceph/client/services.go new file mode 100644 index 00000000..60fcf90e --- /dev/null +++ b/microceph/client/services.go @@ -0,0 +1,99 @@ +// Package client provides a full Go API client. +package client + +import ( + "context" + "fmt" + "time" + + "github.com/canonical/lxd/shared/api" + "github.com/canonical/lxd/shared/logger" + "github.com/canonical/microceph/microceph/api/types" + "github.com/canonical/microcluster/client" + "github.com/canonical/microcluster/state" +) + +// GetServices returns the list of configured ceph services. 
+func GetServices(ctx context.Context, c *client.Client) (types.Services, error) { + queryCtx, cancel := context.WithTimeout(ctx, time.Second*5) + defer cancel() + + services := types.Services{} + + err := c.Query(queryCtx, "GET", api.NewURL().Path("services"), nil, &services) + if err != nil { + return nil, fmt.Errorf("failed listing services: %w", err) + } + + return services, nil +} + +// DisableRGW requests that Ceph disable the RGW service. +func DisableRGW(ctx context.Context, c *client.Client, data *types.RGWService) error { + queryCtx, cancel := context.WithTimeout(ctx, time.Second*120) + defer cancel() + err := c.Query(queryCtx, "DELETE", api.NewURL().Path("services", "rgw"), data, nil) + if err != nil { + return fmt.Errorf("failed disabling RGW: %w", err) + } + + return nil +} + +// SendServicePlacementReq sends a request to start a given service on the target node (hostname for remote target). +func SendServicePlacementReq(ctx context.Context, c *client.Client, data *types.EnableService, target string) error { + queryCtx, cancel := context.WithTimeout(ctx, time.Second*120) + defer cancel() + + // Send this request to target. + c = c.UseTarget(target) + + err := c.Query(queryCtx, "PUT", api.NewURL().Path("services", data.Name), data, nil) + if err != nil { + return fmt.Errorf("failed placing service %s: %w", data.Name, err) + } + + return nil +} + +// RestartService sends a request to the host to restart the provided services. +func RestartService(ctx context.Context, c *client.Client, data *types.Services) error { + // 120 second timeout for waiting. + queryCtx, cancel := context.WithTimeout(ctx, time.Second*120) + defer cancel() + + err := c.Query(queryCtx, "POST", api.NewURL().Path("services", "restart"), data, nil) + if err != nil { + url := c.URL() + return fmt.Errorf("failed Forwarding To: %s: %w", url.String(), err) + } + + return nil +} + +// Sends the desired list of services to be restarted on every other member of the cluster.
+func SendRestartRequestToClusterMembers(s *state.State, services []string) error { + // Populate the restart request data. + var data types.Services + for _, service := range services { + data = append(data, types.Service{Service: service}) + } + + // Get a collection of clients to every other cluster member, with the notification user-agent set. + cluster, err := s.Cluster(nil) + if err != nil { + logger.Errorf("failed to get a client for every cluster member: %v", err) + return err + } + + for _, remoteClient := range cluster { + // In order send restart to each cluster member and wait. + err = RestartService(s.Context, &remoteClient, &data) + if err != nil { + logger.Errorf("restart error: %v", err) + return err + } + } + + return nil +} diff --git a/microceph/cmd/microceph/disable_rgw.go b/microceph/cmd/microceph/disable_rgw.go index 89aedea1..8194aa01 100644 --- a/microceph/cmd/microceph/disable_rgw.go +++ b/microceph/cmd/microceph/disable_rgw.go @@ -43,7 +43,7 @@ func (c *cmdDisableRGW) Run(cmd *cobra.Command, args []string) error { Enabled: false, } - err = client.EnableRGW(context.Background(), cli, req) + err = client.DisableRGW(context.Background(), cli, req) if err != nil { return err } diff --git a/microceph/cmd/microceph/enable.go b/microceph/cmd/microceph/enable.go index a56885f5..119765a6 100644 --- a/microceph/cmd/microceph/enable.go +++ b/microceph/cmd/microceph/enable.go @@ -11,12 +11,19 @@ type cmdEnable struct { func (c *cmdEnable) Command() *cobra.Command { cmd := &cobra.Command{ Use: "enable", - Short: "Enables a feature on the cluster", + Short: "Enables a feature or service on the cluster", } // Enable RGW enableRGWCmd := cmdEnableRGW{common: c.common} + enableMonCmd := cmdEnableMON{common: c.common} + enableMgrCmd := cmdEnableMGR{common: c.common} + enableMdsCmd := cmdEnableMDS{common: c.common} + cmd.AddCommand(enableRGWCmd.Command()) + cmd.AddCommand(enableMonCmd.Command()) + cmd.AddCommand(enableMgrCmd.Command()) + 
cmd.AddCommand(enableMdsCmd.Command()) // Workaround for subcommand usage errors. See: https://github.com/spf13/cobra/issues/706 cmd.Args = cobra.NoArgs diff --git a/microceph/cmd/microceph/enable_mds.go b/microceph/cmd/microceph/enable_mds.go new file mode 100644 index 00000000..69d26e54 --- /dev/null +++ b/microceph/cmd/microceph/enable_mds.go @@ -0,0 +1,53 @@ +package main + +import ( + "context" + + "github.com/canonical/microceph/microceph/api/types" + "github.com/canonical/microceph/microceph/client" + "github.com/canonical/microcluster/microcluster" + "github.com/spf13/cobra" +) + +type cmdEnableMDS struct { + common *CmdControl + wait bool + flagTarget string +} + +func (c *cmdEnableMDS) Command() *cobra.Command { + cmd := &cobra.Command{ + Use: "mds [--target ] [--wait ]", + Short: "Enable the MDS service on the --target server (default: this server)", + RunE: c.Run, + } + cmd.PersistentFlags().StringVar(&c.flagTarget, "target", "", "Server hostname (default: this server)") + cmd.Flags().BoolVar(&c.wait, "wait", true, "Wait for mds service to be up.") + return cmd +} + +// Run handles the enable mds command. 
+func (c *cmdEnableMDS) Run(cmd *cobra.Command, args []string) error { + m, err := microcluster.App(context.Background(), microcluster.Args{StateDir: c.common.FlagStateDir, Verbose: c.common.FlagLogVerbose, Debug: c.common.FlagLogDebug}) + if err != nil { + return err + } + + cli, err := m.LocalClient() + if err != nil { + return err + } + + req := &types.EnableService{ + Name: "mds", + Wait: c.wait, + Payload: "", + } + + err = client.SendServicePlacementReq(context.Background(), cli, req, c.flagTarget) + if err != nil { + return err + } + + return nil +} diff --git a/microceph/cmd/microceph/enable_mgr.go b/microceph/cmd/microceph/enable_mgr.go new file mode 100644 index 00000000..0e75173e --- /dev/null +++ b/microceph/cmd/microceph/enable_mgr.go @@ -0,0 +1,54 @@ +package main + +import ( + "context" + + "github.com/canonical/microcluster/microcluster" + "github.com/spf13/cobra" + + "github.com/canonical/microceph/microceph/api/types" + "github.com/canonical/microceph/microceph/client" +) + +type cmdEnableMGR struct { + common *CmdControl + wait bool + flagTarget string +} + +func (c *cmdEnableMGR) Command() *cobra.Command { + cmd := &cobra.Command{ + Use: "mgr [--target ] [--wait ]", + Short: "Enable the MGR service on the --target server (default: this server)", + RunE: c.Run, + } + cmd.PersistentFlags().StringVar(&c.flagTarget, "target", "", "Server hostname (default: this server)") + cmd.Flags().BoolVar(&c.wait, "wait", true, "Wait for mgr service to be up.") + return cmd +} + +// Run handles the enable mgr command. 
+func (c *cmdEnableMGR) Run(cmd *cobra.Command, args []string) error { + m, err := microcluster.App(context.Background(), microcluster.Args{StateDir: c.common.FlagStateDir, Verbose: c.common.FlagLogVerbose, Debug: c.common.FlagLogDebug}) + if err != nil { + return err + } + + cli, err := m.LocalClient() + if err != nil { + return err + } + + req := &types.EnableService{ + Name: "mgr", + Wait: c.wait, + Payload: "", + } + + err = client.SendServicePlacementReq(context.Background(), cli, req, c.flagTarget) + if err != nil { + return err + } + + return nil +} diff --git a/microceph/cmd/microceph/enable_mon.go b/microceph/cmd/microceph/enable_mon.go new file mode 100644 index 00000000..e3eb4313 --- /dev/null +++ b/microceph/cmd/microceph/enable_mon.go @@ -0,0 +1,54 @@ +package main + +import ( + "context" + + "github.com/canonical/microcluster/microcluster" + "github.com/spf13/cobra" + + "github.com/canonical/microceph/microceph/api/types" + "github.com/canonical/microceph/microceph/client" +) + +type cmdEnableMON struct { + common *CmdControl + wait bool + flagTarget string +} + +func (c *cmdEnableMON) Command() *cobra.Command { + cmd := &cobra.Command{ + Use: "mon [--target ] [--wait ]", + Short: "Enable the MON service on the --target server (default: this server)", + RunE: c.Run, + } + cmd.PersistentFlags().StringVar(&c.flagTarget, "target", "", "Server hostname (default: this server)") + cmd.Flags().BoolVar(&c.wait, "wait", true, "Wait for mon service to be up.") + return cmd +} + +// Run handles the enable mon command. 
+func (c *cmdEnableMON) Run(cmd *cobra.Command, args []string) error { + m, err := microcluster.App(context.Background(), microcluster.Args{StateDir: c.common.FlagStateDir, Verbose: c.common.FlagLogVerbose, Debug: c.common.FlagLogDebug}) + if err != nil { + return err + } + + cli, err := m.LocalClient() + if err != nil { + return err + } + cli = cli.UseTarget(c.flagTarget) + req := &types.EnableService{ + Name: "mon", + Wait: c.wait, + Payload: "", + } + + err = client.SendServicePlacementReq(context.Background(), cli, req, c.flagTarget) + if err != nil { + return err + } + + return nil +} diff --git a/microceph/cmd/microceph/enable_rgw.go b/microceph/cmd/microceph/enable_rgw.go index 77ef3fac..f429911b 100644 --- a/microceph/cmd/microceph/enable_rgw.go +++ b/microceph/cmd/microceph/enable_rgw.go @@ -2,28 +2,32 @@ package main import ( "context" + "encoding/json" "github.com/canonical/microcluster/microcluster" "github.com/spf13/cobra" "github.com/canonical/microceph/microceph/api/types" + "github.com/canonical/microceph/microceph/ceph" "github.com/canonical/microceph/microceph/client" ) type cmdEnableRGW struct { common *CmdControl + wait bool flagPort int flagTarget string } func (c *cmdEnableRGW) Command() *cobra.Command { cmd := &cobra.Command{ - Use: "rgw [--port ] [--target ]", - Short: "Enable the RGW service on this --target server (default: this server)", + Use: "rgw [--port ] [--target ] [--wait ]", + Short: "Enable the RGW service on the --target server (default: this server)", RunE: c.Run, } cmd.PersistentFlags().IntVar(&c.flagPort, "port", 80, "Service port (default: 80)") cmd.PersistentFlags().StringVar(&c.flagTarget, "target", "", "Server hostname (default: this server)") + cmd.Flags().BoolVar(&c.wait, "wait", true, "Wait for rgw service to be up.") return cmd } @@ -38,14 +42,19 @@ func (c *cmdEnableRGW) Run(cmd *cobra.Command, args []string) error { if err != nil { return err } - cli = cli.UseTarget(c.flagTarget) - req := &types.RGWService{ - Port: 
c.flagPort, - Enabled: true, + jsp, err := json.Marshal(ceph.RgwServicePlacement{Port: c.flagPort}) + if err != nil { + return err + } + + req := &types.EnableService{ + Name: "rgw", + Wait: c.wait, + Payload: string(jsp[:]), } - err = client.EnableRGW(context.Background(), cli, req) + err = client.SendServicePlacementReq(context.Background(), cli, req, c.flagTarget) if err != nil { return err } diff --git a/microceph/common/constants.go b/microceph/common/constants.go new file mode 100644 index 00000000..5c71d00d --- /dev/null +++ b/microceph/common/constants.go @@ -0,0 +1,35 @@ +// Package common +package common + +import ( + "os" + "path/filepath" +) + +type PathConst struct { + ConfPath string + RunPath string + DataPath string + LogPath string +} + +type PathFileMode map[string]os.FileMode + +func GetPathConst() PathConst { + return PathConst{ + ConfPath: filepath.Join(os.Getenv("SNAP_DATA"), "conf"), + RunPath: filepath.Join(os.Getenv("SNAP_DATA"), "run"), + DataPath: filepath.Join(os.Getenv("SNAP_COMMON"), "data"), + LogPath: filepath.Join(os.Getenv("SNAP_COMMON"), "logs"), + } +} + +func GetPathFileMode() PathFileMode { + pathConsts := GetPathConst() + return PathFileMode{ + pathConsts.ConfPath: 0755, + pathConsts.RunPath: 0700, + pathConsts.DataPath: 0700, + pathConsts.LogPath: 0700, + } +} diff --git a/microceph/mocks/PlacementIntf.go b/microceph/mocks/PlacementIntf.go new file mode 100644 index 00000000..8172c097 --- /dev/null +++ b/microceph/mocks/PlacementIntf.go @@ -0,0 +1,97 @@ +// Code generated by mockery v2.30.16. DO NOT EDIT. 
+ +package mocks + +import ( + common "github.com/canonical/microceph/microceph/common" + mock "github.com/stretchr/testify/mock" +) + +// PlacementIntf is an autogenerated mock type for the PlacementIntf type +type PlacementIntf struct { + mock.Mock +} + +// DbUpdate provides a mock function with given fields: _a0 +func (_m *PlacementIntf) DbUpdate(_a0 common.StateInterface) error { + ret := _m.Called(_a0) + + var r0 error + if rf, ok := ret.Get(0).(func(common.StateInterface) error); ok { + r0 = rf(_a0) + } else { + r0 = ret.Error(0) + } + + return r0 +} + +// HospitalityCheck provides a mock function with given fields: _a0 +func (_m *PlacementIntf) HospitalityCheck(_a0 common.StateInterface) error { + ret := _m.Called(_a0) + + var r0 error + if rf, ok := ret.Get(0).(func(common.StateInterface) error); ok { + r0 = rf(_a0) + } else { + r0 = ret.Error(0) + } + + return r0 +} + +// PopulateParams provides a mock function with given fields: _a0, _a1 +func (_m *PlacementIntf) PopulateParams(_a0 common.StateInterface, _a1 string) error { + ret := _m.Called(_a0, _a1) + + var r0 error + if rf, ok := ret.Get(0).(func(common.StateInterface, string) error); ok { + r0 = rf(_a0, _a1) + } else { + r0 = ret.Error(0) + } + + return r0 +} + +// PostPlacementCheck provides a mock function with given fields: _a0 +func (_m *PlacementIntf) PostPlacementCheck(_a0 common.StateInterface) error { + ret := _m.Called(_a0) + + var r0 error + if rf, ok := ret.Get(0).(func(common.StateInterface) error); ok { + r0 = rf(_a0) + } else { + r0 = ret.Error(0) + } + + return r0 +} + +// ServiceInit provides a mock function with given fields: _a0 +func (_m *PlacementIntf) ServiceInit(_a0 common.StateInterface) error { + ret := _m.Called(_a0) + + var r0 error + if rf, ok := ret.Get(0).(func(common.StateInterface) error); ok { + r0 = rf(_a0) + } else { + r0 = ret.Error(0) + } + + return r0 +} + +// NewPlacementIntf creates a new instance of PlacementIntf. 
It also registers a testing interface on the mock and a cleanup function to assert the mocks expectations. +// The first argument is typically a *testing.T value. +func NewPlacementIntf(t interface { + mock.TestingT + Cleanup(func()) +}) *PlacementIntf { + mock := &PlacementIntf{} + mock.Mock.Test(t) + + t.Cleanup(func() { mock.AssertExpectations(t) }) + + return mock +}