diff --git a/.github/workflows/tests.yml b/.github/workflows/tests.yml
index beb4dfaf..54448a9c 100644
--- a/.github/workflows/tests.yml
+++ b/.github/workflows/tests.yml
@@ -130,6 +130,36 @@ jobs:
           s3cmd --host localhost --host-bucket="localhost/%(bucket)" --access_key=fooAccessKey --secret_key=fooSecretKey --no-ssl put -P ~/test.txt s3://testbucket
           curl -s http://localhost/testbucket/test.txt | grep -F hello-radosgw
 
+      - name: Test Cluster Config
+        run: |
+          set -eux
+          cip=$(ip -4 -j route | jq -r '.[] | select(.dst | contains("default")) | .prefsrc' | tr -d '[:space:]')
+
+          # pre config set timestamp for service age
+          ts=$(sudo systemctl show --property ActiveEnterTimestampMonotonic snap.microceph.osd.service | cut -d= -f2)
+
+          # set config
+          sudo microceph cluster config set cluster_network $cip/8 --wait
+
+          # post config set timestamp for service age
+          ts2=$(sudo systemctl show --property ActiveEnterTimestampMonotonic snap.microceph.osd.service | cut -d= -f2)
+
+          # Check config output
+          output=$(sudo microceph cluster config get cluster_network | grep -cim1 'cluster_network')
+          if [[ $output -lt 1 ]] ; then echo "config check failed: $output"; exit 1; fi
+
+          # Check service restarted
+          if [ $ts2 -lt $ts ]; then echo "config check failed: TS1: $ts TS2: $ts2"; exit 1; fi
+
+          # reset config
+          sudo microceph cluster config reset cluster_network --wait
+
+          # post config reset timestamp for service age
+          ts3=$(sudo systemctl show --property ActiveEnterTimestampMonotonic snap.microceph.osd.service | cut -d= -f2)
+
+          # Check service restarted
+          if [ $ts3 -lt $ts2 ]; then echo "config check failed: TS2: $ts2 TS3: $ts3"; exit 1; fi
+
       - name: Upload artifacts
         if: always()
         uses: actions/upload-artifact@v3
diff --git a/microceph/api/configs.go b/microceph/api/configs.go
index 8d5894ab..ffc32ed5 100644
--- a/microceph/api/configs.go
+++ b/microceph/api/configs.go
@@ -23,17 +23,24 @@ var configsCmd = rest.Endpoint{
 }
 
 func cmdConfigsGet(s *state.State, r *http.Request) response.Response {
+	var err error
 	var req types.Config
+	var configs types.Configs
 
-	err := json.NewDecoder(r.Body).Decode(&req)
+	err = json.NewDecoder(r.Body).Decode(&req)
 	if err != nil {
 		return response.InternalError(err)
 	}
 
-	// Fetch configs.
-	configs, err := ceph.ListConfigs()
-	if err != nil {
-		return response.InternalError(err)
+	// If a valid key string is passed, fetch that key.
+	if len(req.Key) > 0 {
+		configs, err = ceph.GetConfigItem(req)
+	} else {
+		// Fetch all configs.
+		configs, err = ceph.ListConfigs()
 	}
+	if err != nil {
+		return response.InternalError(err)
+	}
 
 	return response.SyncResponse(true, configs)
@@ -55,12 +62,8 @@ func cmdConfigsPut(s *state.State, r *http.Request) response.Response {
 	}
 
 	services := configTable[req.Key].Daemons
-	go func() {
-		client.SendRestartRequestToClusterMembers(s, r, services)
-		// Restart Daemons on host.
-		ceph.RestartCephServices(services)
-	}()
-
+	client.ConfigChangeRefresh(s, services, req.Wait)
+
 	return response.EmptySyncResponse
 }
 
@@ -80,11 +83,7 @@ func cmdConfigsDelete(s *state.State, r *http.Request) response.Response {
 	}
 
 	services := configTable[req.Key].Daemons
-	go func() {
-		client.SendRestartRequestToClusterMembers(s, r, services)
-		// Restart Daemons on host.
-		ceph.RestartCephServices(services)
-	}()
+	client.ConfigChangeRefresh(s, services, req.Wait)
 
 	return response.EmptySyncResponse
 }
diff --git a/microceph/api/services.go b/microceph/api/services.go
index 59aef0e5..04399600 100644
--- a/microceph/api/services.go
+++ b/microceph/api/services.go
@@ -2,7 +2,6 @@ package api
 
 import (
 	"encoding/json"
-	"fmt"
 	"net/http"
 
 	"github.com/canonical/microceph/microceph/api/types"
@@ -43,7 +42,7 @@ func cmdRestartServicePost(s *state.State, r *http.Request) response.Response {
 
 	err := json.NewDecoder(r.Body).Decode(&services)
 	if err != nil {
-		logger.Error(fmt.Sprintf("Failed decoding restart services: %v", err))
+		logger.Errorf("Failed decoding restart services: %v", err)
 		return response.InternalError(err)
 	}
 
@@ -51,7 +50,7 @@ func cmdRestartServicePost(s *state.State, r *http.Request) response.Response {
 		err = ceph.RestartCephService(service.Service)
 		if err != nil {
 			url := s.Address().String()
-			logger.Error(fmt.Sprintf("Failed restarting %s on host %s", service.Service, url))
+			logger.Errorf("Failed restarting %s on host %s", service.Service, url)
 			return response.SyncResponse(false, err)
 		}
 	}
diff --git a/microceph/api/types/configs.go b/microceph/api/types/configs.go
index 070f852d..c3a40b85 100644
--- a/microceph/api/types/configs.go
+++ b/microceph/api/types/configs.go
@@ -5,6 +5,7 @@ package types
 type Config struct {
 	Key   string `json:"key" yaml:"key"`
 	Value string `json:"value" yaml:"value"`
+	Wait  bool   `json:"wait" yaml:"wait"`
 }
 
 // Configs is a slice of configs
diff --git a/microceph/ceph/config.go b/microceph/ceph/config.go
index 7986382c..751808ad 100644
--- a/microceph/ceph/config.go
+++ b/microceph/ceph/config.go
@@ -14,9 +14,11 @@ import (
 	"github.com/canonical/microceph/microceph/database"
 )
 
+// Config Table is the source of additional information for each supported config key
+// Refer to GetConfigTable()
 type ConfigTable map[string]struct {
 	Who     string   // Ceph Config internal against each key
-	Daemons []string // Daemons that need to be restarted post config change.
+	Daemons []string // List of Daemons that need to be restarted across the cluster for the config change to take effect.
 }
 
 // Check if certain key is present in the map.
@@ -24,7 +26,7 @@ func (c ConfigTable) isKeyPresent(key string) bool {
 	if _, ok := c[key]; !ok {
 		return false
 	}
-	
+
 	return true
 }
 
@@ -53,7 +55,7 @@ type ConfigDump []ConfigDumpItem
 
 func SetConfigItem(c types.Config) error {
 	configTable := GetConfigTable()
-	
+
 	args := []string{
 		"config",
 		"set",
@@ -63,34 +65,40 @@
 		"-f",
 		"json-pretty",
 	}
-	
+
 	_, err := processExec.RunCommand("ceph", args...)
 	if err != nil {
 		return err
 	}
-	
+
 	return nil
 }
 
-func GetConfigItem(c types.Config) (types.Config, error) {
+func GetConfigItem(c types.Config) (types.Configs, error) {
 	var err error
 	configTable := GetConfigTable()
-	
+	ret := make(types.Configs, 1)
+	who := "mon"
+
+	// workaround to query global configs from mon entity
+	if configTable[c.Key].Who != "global" {
+		who = configTable[c.Key].Who
+	}
+
 	args := []string{
 		"config",
 		"get",
-		configTable[c.Key].Who,
+		who,
 		c.Key,
-		"-f",
-		"json-pretty",
 	}
-	
-	c.Value, err = processExec.RunCommand("ceph", args...)
+
+	ret[0].Key = c.Key
+	ret[0].Value, err = processExec.RunCommand("ceph", args...)
 	if err != nil {
-		return c, err
+		return nil, err
 	}
-	
-	return c, nil
+
+	return ret, nil
 }
 
 func RemoveConfigItem(c types.Config) error {
@@ -100,15 +108,13 @@
 		"rm",
 		configTable[c.Key].Who,
 		c.Key,
-		"-f",
-		"json-pretty",
 	}
-	
+
 	_, err := processExec.RunCommand("ceph", args...)
 	if err != nil {
 		return err
 	}
-	
+
 	return nil
 }
 
@@ -122,12 +128,12 @@
 		"-f",
 		"json-pretty",
 	}
-	
+
 	output, err := processExec.RunCommand("ceph", args...)
 	if err != nil {
 		return configs, err
 	}
-	
+
 	json.Unmarshal([]byte(output), &dump)
 	// Only take configs permitted in config table.
 	for _, configItem := range dump {
@@ -138,7 +144,7 @@
 			})
 		}
 	}
-	
+
 	return configs, nil
 }
 
@@ -146,35 +152,35 @@
 func updateConfig(s common.StateInterface) error {
 	confPath := filepath.Join(os.Getenv("SNAP_DATA"), "conf")
 	runPath := filepath.Join(os.Getenv("SNAP_DATA"), "run")
-	
+
 	// Get the configuration and servers.
 	var err error
 	var configItems []database.ConfigItem
 	var monitors []database.Service
-	
+
 	err = s.ClusterState().Database.Transaction(s.ClusterState().Context, func(ctx context.Context, tx *sql.Tx) error {
 		configItems, err = database.GetConfigItems(ctx, tx)
 		if err != nil {
 			return err
 		}
-		
+
 		serviceName := "mon"
 		monitors, err = database.GetServices(ctx, tx, database.ServiceFilter{Service: &serviceName})
 		if err != nil {
 			return err
 		}
-		
+
 		return nil
 	})
 	if err != nil {
 		return err
 	}
-	
+
 	config := map[string]string{}
 	for _, item := range configItems {
 		config[item.Key] = item.Value
 	}
-	
+
 	monitorAddresses := make([]string, len(monitors))
 	remotes := s.ClusterState().Remotes().RemotesByName()
 	for i, monitor := range monitors {
@@ -182,10 +188,10 @@
 		if !ok {
 			continue
 		}
-		
+
 		monitorAddresses[i] = remote.Address.Addr().String()
 	}
-	
+
 	conf := newCephConfig(confPath)
 	address := s.ClusterState().Address().Hostname()
 	err = conf.WriteConfig(
@@ -201,7 +207,7 @@
 	if err != nil {
 		return fmt.Errorf("Couldn't render ceph.conf: %w", err)
 	}
-	
+
 	// Generate ceph.client.admin.keyring
 	keyring := newCephKeyring(confPath, "ceph.keyring")
 	err = keyring.WriteConfig(
@@ -213,6 +219,6 @@
 	if err != nil {
 		return fmt.Errorf("Couldn't render ceph.client.admin.keyring: %w", err)
 	}
-	
+
 	return nil
 }
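A note on the `who` workaround in `GetConfigItem` above: per the inline comment, `ceph config get` is queried via the `mon` entity whenever a key is registered under `global`, because `global` cannot be read back directly. A hypothetical mock expectation in the style of the helpers in the new config_test.go below would pin that behaviour down; the helper name is illustrative and not part of this patch:

```go
// Hypothetical companion to the helpers in config_test.go: a key whose
// ConfigTable entry has Who == "global" must be fetched via the "mon" entity.
func addGlobalConfigGetExpectations(r *mocks.Runner, key string, value string) {
	r.On("RunCommand", []interface{}{
		"ceph", "config", "get", "mon", key, // "mon", not "global"
	}...).Return(value, nil).Once()
}
```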
diff --git a/microceph/ceph/config_test.go b/microceph/ceph/config_test.go
new file mode 100644
index 00000000..b46dfa15
--- /dev/null
+++ b/microceph/ceph/config_test.go
@@ -0,0 +1,91 @@
+package ceph
+
+import (
+	"encoding/json"
+	"testing"
+
+	"github.com/canonical/microceph/microceph/api/types"
+	"github.com/canonical/microceph/microceph/mocks"
+	"github.com/stretchr/testify/assert"
+	"github.com/stretchr/testify/suite"
+)
+
+type configSuite struct {
+	baseSuite
+}
+
+func TestConfig(t *testing.T) {
+	suite.Run(t, new(configSuite))
+}
+
+// Set up test suite
+func (s *configSuite) SetupTest() {
+	s.baseSuite.SetupTest()
+}
+
+func addConfigSetExpectations(r *mocks.Runner, key string, value string) {
+	r.On("RunCommand", []interface{}{
+		"ceph", "config", "set", "global", key, value, "-f", "json-pretty",
+	}...).Return(value, nil).Once()
+}
+
+func addConfigOpExpectations(r *mocks.Runner, op string, who string, key string, value string) {
+	r.On("RunCommand", []interface{}{
+		"ceph", "config", op, who, key,
+	}...).Return(value, nil).Once()
+}
+
+func addListConfigExpectations(r *mocks.Runner, key string, value string) {
+	var configs = ConfigDump{}
+	configs = append(configs, ConfigDumpItem{Section: "", Name: key, Value: value})
+	ret, _ := json.Marshal(configs)
+	r.On("RunCommand", []interface{}{
+		"ceph", "config", "dump", "-f", "json-pretty",
+	}...).Return(string(ret[:]), nil).Once()
+}
+
+func (s *configSuite) TestSetConfig() {
+	t := types.Config{Key: "cluster_network", Value: "0.0.0.0/16"}
+
+	r := mocks.NewRunner(s.T())
+	addConfigSetExpectations(r, t.Key, t.Value)
+	processExec = r
+
+	err := SetConfigItem(t)
+	assert.NoError(s.T(), err)
+}
+
+func (s *configSuite) TestGetConfig() {
+	t := types.Config{Key: "cluster_network", Value: "0.0.0.0/16"}
+
+	r := mocks.NewRunner(s.T())
+	addConfigOpExpectations(r, "get", "mon", t.Key, t.Value)
+	processExec = r
+
+	_, err := GetConfigItem(t)
+	assert.NoError(s.T(), err)
+}
+
+func (s *configSuite) TestResetConfig() {
+	t := types.Config{Key: "cluster_network", Value: "0.0.0.0/16"}
+
+	r := mocks.NewRunner(s.T())
+	addConfigOpExpectations(r, "rm", "global", t.Key, t.Value)
+	processExec = r
+
+	err := RemoveConfigItem(t)
+	assert.NoError(s.T(), err)
+}
+
+func (s *configSuite) TestListConfig() {
+	t := types.Config{Key: "cluster_network", Value: "0.0.0.0/16"}
+
+	r := mocks.NewRunner(s.T())
+	addListConfigExpectations(r, t.Key, t.Value)
+	processExec = r
+
+	configs, err := ListConfigs()
+	assert.NoError(s.T(), err)
+	assert.Equal(s.T(), configs[0].Key, t.Key)
+	assert.Equal(s.T(), configs[0].Value, t.Value)
+}
diff --git a/microceph/ceph/services.go b/microceph/ceph/services.go
index ecd56b0e..2fb72701 100644
--- a/microceph/ceph/services.go
+++ b/microceph/ceph/services.go
@@ -45,7 +45,7 @@ func RestartCephServices(services []string) error {
 	for i := range services {
 		err := RestartCephService(services[i])
 		if err != nil {
-			logger.Error(fmt.Sprintf("Service %s restart failed: %v ", services[i], err))
+			logger.Errorf("Service %s restart failed: %v ", services[i], err)
 			return err
 		}
 	}
@@ -56,9 +56,9 @@
 // Restart provided ceph service ("mon"/"osd"...) on the host.
 func RestartCephService(service string) error {
 	if _, ok := serviceWorkerTable[service]; !ok {
-		errStr := fmt.Sprintf("No handler defined for service %s", service)
-		logger.Error(errStr)
-		return fmt.Errorf(errStr)
+		err := fmt.Errorf("No handler defined for service %s", service)
+		logger.Errorf("%v", err)
+		return err
 	}
 
 	// Fetch a Set{} of available daemons for the service.
@@ -80,11 +80,11 @@ func RestartCephService(service string) error {
 
 		// All still not up
 		if !workers.isIn(iWorkers) {
-			errStr := fmt.Sprintf(
+			err := fmt.Errorf(
 				"Attempt %d: Workers: %v not all present in %v", i, workers, iWorkers,
 			)
-			logger.Error(errStr)
-			return fmt.Errorf(errStr)
+			logger.Errorf("%v", err)
+			return err
 		}
 		return nil
 	}, strategy.Delay(5), strategy.Limit(10), strategy.Backoff(backoff.Linear(10*time.Second)))
@@ -99,9 +99,11 @@ func getMons() (Set, error) {
 	retval := Set{}
 	output, err := processExec.RunCommand("ceph", "mon", "dump", "-f", "json-pretty")
 	if err != nil {
+		logger.Errorf("Failed fetching Mon dump: %v", err)
 		return nil, err
 	}
 
+	logger.Debugf("Mon Dump:\n%s", output)
 	// Get a list of mons.
 	mons := gjson.Get(output, "mons.#.name")
 	for _, key := range mons.Array() {
@@ -115,9 +117,11 @@ func getUpOsds() (Set, error) {
 	retval := Set{}
 	output, err := processExec.RunCommand("ceph", "osd", "dump", "-f", "json-pretty")
 	if err != nil {
+		logger.Errorf("Failed fetching OSD dump: %v", err)
 		return nil, err
 	}
 
+	logger.Debugf("OSD Dump:\n%s", output)
 	// Get a list of uuid of osds in up state.
 	upOsds := gjson.Get(output, "osds.#(up==1)#.uuid")
 	for _, element := range upOsds.Array() {
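The restart path above polls until the bounced daemons re-register, backing off linearly between attempts. A minimal, self-contained sketch of that polling pattern, assuming the `strategy`/`backoff` identifiers in the hunk come from `github.com/Rican7/retry` (the import block is not visible in this diff, so the library is an inference):

```go
package main

import (
	"fmt"
	"time"

	"github.com/Rican7/retry"
	"github.com/Rican7/retry/backoff"
	"github.com/Rican7/retry/strategy"
)

func main() {
	// probe stands in for the "are all workers back up?" check in RestartCephService.
	probe := func(attempt uint) error {
		fmt.Printf("attempt %d: checking daemons\n", attempt)
		return nil // returning an error here triggers another attempt
	}

	// At most 10 attempts, waiting in linearly growing 10s increments between tries.
	err := retry.Retry(probe, strategy.Limit(10), strategy.Backoff(backoff.Linear(10*time.Second)))
	if err != nil {
		fmt.Println("daemons did not come back up:", err)
	}
}
```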
diff --git a/microceph/ceph/services_test.go b/microceph/ceph/services_test.go
new file mode 100644
index 00000000..48a276c3
--- /dev/null
+++ b/microceph/ceph/services_test.go
@@ -0,0 +1,73 @@
+package ceph
+
+import (
+	"encoding/json"
+	"fmt"
+	"testing"
+
+	"github.com/canonical/microceph/microceph/mocks"
+	"github.com/stretchr/testify/assert"
+	"github.com/stretchr/testify/suite"
+)
+
+type servicesSuite struct {
+	baseSuite
+	// TestStateInterface *mocks.StateInterface
+}
+
+func TestServices(t *testing.T) {
+	suite.Run(t, new(servicesSuite))
+}
+
+// Set up test suite
+func (s *servicesSuite) SetupTest() {
+	s.baseSuite.SetupTest()
+}
+
+func addOsdDumpExpectations(r *mocks.Runner) {
+	osdDumpObj := map[string]interface{}{"osds": []map[string]interface{}{{"up": 1, "uuid": "bfbbd27a-472f-4771-a6f7-7c5db9803d41"}}}
+	osdDump, _ := json.Marshal(osdDumpObj)
+
+	// Expect osd service worker query
+	r.On("RunCommand", []interface{}{
+		"ceph", "osd", "dump", "-f", "json-pretty",
+	}...).Return(string(osdDump[:]), nil).Twice()
+}
+
+func addMonDumpExpectations(r *mocks.Runner) {
+	monDumpObj := map[string]interface{}{"mons": []map[string]interface{}{{"name": "bfbbd27a"}}}
+	monDump, _ := json.Marshal(monDumpObj)
+
+	// Expect mon service worker query
+	r.On("RunCommand", []interface{}{
+		"ceph", "mon", "dump", "-f", "json-pretty",
+	}...).Return(string(monDump[:]), nil).Twice()
+}
+
+func addServiceRestartExpectations(r *mocks.Runner, services []string) {
+	for _, service := range services {
+		r.On("RunCommand", []interface{}{
+			"snapctl", "restart", fmt.Sprintf("microceph.%s", service),
+		}...).Return("ok", nil).Once()
+	}
+}
+
+func (s *servicesSuite) TestRestartInvalidService() {
+	err := RestartCephService("InvalidService")
+	assert.ErrorContains(s.T(), err, "No handler defined")
+}
+
+func (s *servicesSuite) TestRestartServiceWorkerSuccess() {
+	ts := []string{"mon", "osd"} // test services
+
+	r := mocks.NewRunner(s.T())
+	addMonDumpExpectations(r)
+	addOsdDumpExpectations(r)
+	addServiceRestartExpectations(r, ts)
+	processExec = r
+
+	// Handler is defined for both mon and osd services.
+	err := RestartCephServices(ts)
+	assert.NoError(s.T(), err)
+}
+
diff --git a/microceph/client/client.go b/microceph/client/client.go
index 56fd8d26..9f60fa57 100644
--- a/microceph/client/client.go
+++ b/microceph/client/client.go
@@ -4,7 +4,6 @@ package client
 import (
 	"context"
 	"fmt"
-	"net/http"
 	"time"
 
 	"github.com/canonical/microcluster/client"
@@ -13,10 +12,11 @@ import (
 	"github.com/lxc/lxd/shared/logger"
 
 	"github.com/canonical/microceph/microceph/api/types"
+	"github.com/canonical/microceph/microceph/ceph"
 )
 
 func SetConfig(ctx context.Context, c *client.Client, data *types.Config) error {
-	queryCtx, cancel := context.WithTimeout(ctx, time.Second*5)
+	queryCtx, cancel := context.WithTimeout(ctx, time.Second*200)
 	defer cancel()
 
 	err := c.Query(queryCtx, "PUT", api.NewURL().Path("configs"), data, nil)
@@ -28,7 +28,7 @@ func SetConfig(ctx context.Context, c *client.Client, data *types.Config) error
 }
 
 func ClearConfig(ctx context.Context, c *client.Client, data *types.Config) error {
-	queryCtx, cancel := context.WithTimeout(ctx, time.Second*5)
+	queryCtx, cancel := context.WithTimeout(ctx, time.Second*200)
 	defer cancel()
 
 	err := c.Query(queryCtx, "DELETE", api.NewURL().Path("configs"), data, nil)
@@ -53,6 +53,30 @@ func GetConfig(ctx context.Context, c *client.Client, data *types.Config) (types.Configs, error) {
 	return configs, nil
 }
 
+// Perform ordered (one after other) restart of provided Ceph services across the ceph cluster.
+func ConfigChangeRefresh(s *state.State, services []string, wait bool) error {
+	if wait {
+		// Execute restart synchronously
+		err := SendRestartRequestToClusterMembers(s, services)
+		if err != nil {
+			return err
+		}
+
+		// Restart on current host.
+		err = ceph.RestartCephServices(services)
+		if err != nil {
+			return err
+		}
+	} else { // Execute restart asynchronously
+		go func() {
+			SendRestartRequestToClusterMembers(s, services)
+			ceph.RestartCephServices(services) // Restart on current host.
+		}()
+	}
+
+	return nil
+}
+
 func RestartService(ctx context.Context, c *client.Client, data *types.Services) (error) {
 	// 120 second timeout for waiting.
 	queryCtx, cancel := context.WithTimeout(ctx, time.Second*120)
@@ -68,7 +92,7 @@ func RestartService(ctx context.Context, c *client.Client, data *types.Services
 }
 
 // Sends the desired list of services to be restarted on every other member of the cluster.
-func SendRestartRequestToClusterMembers(s *state.State, r *http.Request, services []string) (error) {
+func SendRestartRequestToClusterMembers(s *state.State, services []string) (error) {
 	// Populate the restart request data.
 	var data types.Services
 	for _, service := range services {
@@ -75,9 +99,9 @@
 		data = append(data, types.Service{Service: service})
 	}
 
 	// Get a collection of clients to every other cluster member, with the notification user-agent set.
-	cluster, err := s.Cluster(r)
+	cluster, err := s.Cluster(nil)
 	if err != nil {
-		logger.Error(fmt.Sprintf("Failed to get a client for every cluster member: %v", err))
+		logger.Errorf("Failed to get a client for every cluster member: %v", err)
 		return err
 	}
@@ -86,7 +110,7 @@ func SendRestartRequestToClusterMembers(s *state.State, services []string) (err
 		// In order send restart to each cluster member and wait.
 		err = RestartService(s.Context, &remoteClient, &data)
 		if err != nil {
-			logger.Error(fmt.Sprintf("Restart error: %v", err))
+			logger.Errorf("Restart error: %v", err)
 			return err
 		}
 	}
diff --git a/microceph/cmd/microceph/cluster_config_reset.go b/microceph/cmd/microceph/cluster_config_reset.go
index d0576bf7..5b7e855b 100644
--- a/microceph/cmd/microceph/cluster_config_reset.go
+++ b/microceph/cmd/microceph/cluster_config_reset.go
@@ -15,6 +15,8 @@ type cmdClusterConfigReset struct {
 	common        *CmdControl
 	cluster       *cmdCluster
 	clusterConfig *cmdClusterConfig
+
+	flagWait bool
 }
 
 func (c *cmdClusterConfigReset) Command() *cobra.Command {
@@ -24,6 +26,7 @@ func (c *cmdClusterConfigReset) Command() *cobra.Command {
 		RunE:  c.Run,
 	}
 
+	cmd.Flags().BoolVar(&c.flagWait, "wait", false, "Wait for required ceph services to restart post config reset.")
 	return cmd
 }
 
@@ -49,6 +52,7 @@ func (c *cmdClusterConfigReset) Run(cmd *cobra.Command, args []string) error {
 
 	req := &types.Config{
 		Key:  args[0],
+		Wait: c.flagWait,
 	}
 
 	err = client.ClearConfig(context.Background(), cli, req)
diff --git a/microceph/cmd/microceph/cluster_config_set.go b/microceph/cmd/microceph/cluster_config_set.go
index c510c198..3022d9ed 100644
--- a/microceph/cmd/microceph/cluster_config_set.go
+++ b/microceph/cmd/microceph/cluster_config_set.go
@@ -15,6 +15,8 @@ type cmdClusterConfigSet struct {
 	common        *CmdControl
 	cluster       *cmdCluster
 	clusterConfig *cmdClusterConfig
+
+	flagWait bool
 }
 
 func (c *cmdClusterConfigSet) Command() *cobra.Command {
@@ -24,6 +26,7 @@ func (c *cmdClusterConfigSet) Command() *cobra.Command {
 		RunE:  c.Run,
 	}
 
+	cmd.Flags().BoolVar(&c.flagWait, "wait", false, "Wait for required ceph services to restart post config set.")
 	return cmd
 }
 
@@ -50,6 +53,7 @@ func (c *cmdClusterConfigSet) Run(cmd *cobra.Command, args []string) error {
 	req := &types.Config{
 		Key:   args[0],
 		Value: args[1],
+		Wait:  c.flagWait,
 	}
 
 	err = client.SetConfig(context.Background(), cli, req)
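End to end, the new `Wait` field is what lets `--wait` block the CLI until every affected daemon has been restarted cluster-wide, and it is also why the client-side timeouts above grow from 5s to 200s. A caller-side sketch using only names introduced or touched by this patch (how `cli`, the `*client.Client` for the local daemon, is obtained is elided, as it is outside this diff):

```go
package example

import (
	"context"

	"github.com/canonical/microcluster/client"

	"github.com/canonical/microceph/microceph/api/types"
	microclient "github.com/canonical/microceph/microceph/client"
)

// setClusterNetwork mirrors `microceph cluster config set cluster_network <cidr> --wait`.
func setClusterNetwork(cli *client.Client, cidr string) error {
	req := &types.Config{
		Key:   "cluster_network",
		Value: cidr,
		Wait:  true, // block until the required daemon restarts complete cluster-wide
	}
	return microclient.SetConfig(context.Background(), cli, req)
}
```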