From 7167410ff91cc3f662a1b878589ec6fa1a140e29 Mon Sep 17 00:00:00 2001 From: utkarshbhatthere Date: Tue, 18 Apr 2023 15:41:06 +0530 Subject: [PATCH] Added API for service restart request and housekeeping Signed-off-by: utkarshbhatthere --- microceph/api/configs.go | 62 +++++-------- microceph/api/endpoints.go | 1 + microceph/api/services.go | 32 ++++++- microceph/ceph/config.go | 86 ++++++++++--------- microceph/ceph/osd.go | 2 +- microceph/ceph/services.go | 44 +++++++--- microceph/ceph/snap.go | 12 ++- microceph/client/client.go | 52 ++++++----- microceph/cmd/microceph/cluster_config.go | 2 - microceph/cmd/microceph/cluster_config_get.go | 5 +- .../cmd/microceph/cluster_config_reset.go | 2 + microceph/cmd/microceph/cluster_config_set.go | 2 + 12 files changed, 180 insertions(+), 122 deletions(-) diff --git a/microceph/api/configs.go b/microceph/api/configs.go index 2b8c66b1..8d5894ab 100644 --- a/microceph/api/configs.go +++ b/microceph/api/configs.go @@ -11,12 +11,8 @@ import ( "github.com/canonical/microceph/microceph/api/types" "github.com/canonical/microceph/microceph/ceph" "github.com/canonical/microceph/microceph/client" - "github.com/canonical/microceph/microceph/common" ) -// Config table for ConfigExtras -var configTable = ceph.GetConfigTable() - // /1.0/configs endpoint. var configsCmd = rest.Endpoint{ Path: "configs", @@ -35,7 +31,7 @@ func cmdConfigsGet(s *state.State, r *http.Request) response.Response { } // Fetch configs. - configs, err := ceph.ListConfigs(common.CephState{State: s}, req.Key) + configs, err := ceph.ListConfigs() if err != nil { return response.InternalError(err) } @@ -45,62 +41,50 @@ func cmdConfigsGet(s *state.State, r *http.Request) response.Response { func cmdConfigsPut(s *state.State, r *http.Request) response.Response { var req types.Config + configTable := ceph.GetConfigTable() err := json.NewDecoder(r.Body).Decode(&req) if err != nil { return response.InternalError(err) } - if !client.IsForwardedRequest(r) { - // Call set on the original Request but not on Forwarded Requests. - err = ceph.SetConfigItem(common.CephState{State: s}, req) - if err != nil { - return response.SmartError(err) - } - - // Forward request to cluster members - err = client.ForwardConfigRequestToClusterMembers(s, r, &req, client.SetConfig) + // Configure the key/value + err = ceph.SetConfigItem(req) + if err != nil { + return response.SmartError(err) } - // Restart Daemons on host. - daemons := configTable[req.Key].Daemons - for i := range daemons { - err = ceph.RestartCephService(daemons[i]) - if err != nil { - return response.SmartError(err) - } - } + services := configTable[req.Key].Daemons + go func() { + client.SendRestartRequestToClusterMembers(s, r, services) + // Restart Daemons on host. + ceph.RestartCephServices(services) + }() return response.EmptySyncResponse } func cmdConfigsDelete(s *state.State, r *http.Request) response.Response { var req types.Config + configTable := ceph.GetConfigTable() err := json.NewDecoder(r.Body).Decode(&req) if err != nil { return response.InternalError(err) } - if !client.IsForwardedRequest(r) { - // Call set on the original Request but not on Forwarded Requests. - err = ceph.RemoveConfigItem(common.CephState{State: s}, req) - if err != nil { - return response.SmartError(err) - } - - // Forward request to cluster members - err = client.ForwardConfigRequestToClusterMembers(s, r, &req, client.ClearConfig) + // Clean the key/value + err = ceph.RemoveConfigItem(req) + if err != nil { + return response.SmartError(err) } - // Restart Daemons on host. - daemons := configTable[req.Key].Daemons - for i := range daemons { - err = ceph.RestartCephService(daemons[i]) - if err != nil { - return response.SmartError(err) - } - } + services := configTable[req.Key].Daemons + go func() { + client.SendRestartRequestToClusterMembers(s, r, services) + // Restart Daemons on host. + ceph.RestartCephServices(services) + }() return response.EmptySyncResponse } diff --git a/microceph/api/endpoints.go b/microceph/api/endpoints.go index 1edeef31..2fcbba8d 100644 --- a/microceph/api/endpoints.go +++ b/microceph/api/endpoints.go @@ -12,4 +12,5 @@ var Endpoints = []rest.Endpoint{ servicesCmd, rgwServiceCmd, configsCmd, + restartServiceCmd, } diff --git a/microceph/api/services.go b/microceph/api/services.go index da347355..59aef0e5 100644 --- a/microceph/api/services.go +++ b/microceph/api/services.go @@ -2,13 +2,16 @@ package api import ( "encoding/json" + "fmt" + "net/http" + "github.com/canonical/microceph/microceph/api/types" "github.com/canonical/microceph/microceph/common" - "net/http" "github.com/canonical/microcluster/rest" "github.com/canonical/microcluster/state" "github.com/lxc/lxd/lxd/response" + "github.com/lxc/lxd/shared/logger" "github.com/canonical/microceph/microceph/ceph" ) @@ -29,6 +32,33 @@ func cmdServicesGet(s *state.State, r *http.Request) response.Response { return response.SyncResponse(true, services) } +// Service Reload Endpoint. +var restartServiceCmd = rest.Endpoint{ + Path: "services/restart", + Post: rest.EndpointAction{Handler: cmdRestartServicePost, ProxyTarget: true}, +} + +func cmdRestartServicePost(s *state.State, r *http.Request) response.Response { + var services types.Services + + err := json.NewDecoder(r.Body).Decode(&services) + if err != nil { + logger.Error(fmt.Sprintf("Failed decoding restart services: %v", err)) + return response.InternalError(err) + } + + for _, service := range services { + err = ceph.RestartCephService(service.Service) + if err != nil { + url := s.Address().String() + logger.Error(fmt.Sprintf("Failed restarting %s on host %s", service.Service, url)) + return response.SyncResponse(false, err) + } + } + + return response.EmptySyncResponse +} + var rgwServiceCmd = rest.Endpoint{ Path: "services/rgw", diff --git a/microceph/ceph/config.go b/microceph/ceph/config.go index 6374649a..7986382c 100644 --- a/microceph/ceph/config.go +++ b/microceph/ceph/config.go @@ -14,83 +14,87 @@ import ( "github.com/canonical/microceph/microceph/database" ) -type ConfigExtras struct{ +type ConfigTable map[string]struct{ Who string // Ceph Config internal against each key Daemons []string // Daemons that need to be restarted post config change. - Regexp string // Regular Expression to check value against key - Cb *func(s common.StateInterface, c types.Config) } -type ConfigTable map[string]ConfigExtras - // Check if certain key is present in the map. func (c ConfigTable) isKeyPresent(key string) bool { if _, ok := c[key]; !ok { - return false - } - + return false + } + return true } // Return keys of the given set func (c ConfigTable) Keys() (keys []string) { - for k := range c { - keys = append(keys, k) - } - return keys + for k := range c { + keys = append(keys, k) + } + return keys } func GetConfigTable() ConfigTable { return ConfigTable{ - "public_network": {"global", []string{"mon", "osd"}, "", nil}, - "cluster_network": {"global", []string{"osd"}, "", nil}, + "public_network": {"global", []string{"mon", "osd"}}, + "cluster_network": {"global", []string{"osd"}}, } } -// Instantiation of config table. -var configTable = GetConfigTable() // Struct to get Config Items from config dump json output. -type ConfigDump []struct{ +type ConfigDumpItem struct{ Section string Name string Value string } +type ConfigDump []ConfigDumpItem -func SetConfigItem(s common.StateInterface, c types.Config) error { +func SetConfigItem(c types.Config) error { + configTable := GetConfigTable() + args := []string{ "config", "set", configTable[c.Key].Who, c.Key, c.Value, + "-f", + "json-pretty", } - + _, err := processExec.RunCommand("ceph", args...) if err != nil { return err } - + return nil } -func GetConfigItem(s common.StateInterface, c types.Config) (types.Config, error) { +func GetConfigItem(c types.Config) (types.Config, error) { var err error + configTable := GetConfigTable() + args := []string{ "config", "get", configTable[c.Key].Who, c.Key, + "-f", + "json-pretty", } - + c.Value, err = processExec.RunCommand("ceph", args...) if err != nil { return c, err } - + return c, nil } -func RemoveConfigItem(s common.StateInterface, c types.Config) error { +func RemoveConfigItem(c types.Config) error { + configTable := GetConfigTable() args := []string{ "config", "rm", @@ -99,30 +103,31 @@ func RemoveConfigItem(s common.StateInterface, c types.Config) error { "-f", "json-pretty", } - + _, err := processExec.RunCommand("ceph", args...) if err != nil { return err } - + return nil } -func ListConfigs(s common.StateInterface, key string) (types.Configs, error) { +func ListConfigs() (types.Configs, error) { var dump ConfigDump var configs types.Configs + configTable := GetConfigTable() args := []string{ "config", "dump", "-f", "json-pretty", } - + output, err := processExec.RunCommand("ceph", args...) if err != nil { return configs, err } - + json.Unmarshal([]byte(output), &dump) // Only take configs permitted in config table. for _, configItem := range dump { @@ -133,42 +138,43 @@ func ListConfigs(s common.StateInterface, key string) (types.Configs, error) { }) } } - + return configs, nil } +// updates the ceph config file. func updateConfig(s common.StateInterface) error { confPath := filepath.Join(os.Getenv("SNAP_DATA"), "conf") runPath := filepath.Join(os.Getenv("SNAP_DATA"), "run") - + // Get the configuration and servers. var err error var configItems []database.ConfigItem var monitors []database.Service - + err = s.ClusterState().Database.Transaction(s.ClusterState().Context, func(ctx context.Context, tx *sql.Tx) error { configItems, err = database.GetConfigItems(ctx, tx) if err != nil { return err } - + serviceName := "mon" monitors, err = database.GetServices(ctx, tx, database.ServiceFilter{Service: &serviceName}) if err != nil { return err } - + return nil }) if err != nil { return err } - + config := map[string]string{} for _, item := range configItems { config[item.Key] = item.Value } - + monitorAddresses := make([]string, len(monitors)) remotes := s.ClusterState().Remotes().RemotesByName() for i, monitor := range monitors { @@ -176,10 +182,10 @@ func updateConfig(s common.StateInterface) error { if !ok { continue } - + monitorAddresses[i] = remote.Address.Addr().String() } - + conf := newCephConfig(confPath) address := s.ClusterState().Address().Hostname() err = conf.WriteConfig( @@ -195,7 +201,7 @@ func updateConfig(s common.StateInterface) error { if err != nil { return fmt.Errorf("Couldn't render ceph.conf: %w", err) } - + // Generate ceph.client.admin.keyring keyring := newCephKeyring(confPath, "ceph.keyring") err = keyring.WriteConfig( @@ -207,6 +213,6 @@ func updateConfig(s common.StateInterface) error { if err != nil { return fmt.Errorf("Couldn't render ceph.client.admin.keyring: %w", err) } - + return nil } diff --git a/microceph/ceph/osd.go b/microceph/ceph/osd.go index 507ada62..327c3462 100644 --- a/microceph/ceph/osd.go +++ b/microceph/ceph/osd.go @@ -366,7 +366,7 @@ func AddOSD(s *state.State, path string, wipe bool, encrypt bool) error { } // Spawn the OSD. - err = snapReload("osd") + err = snapRestart("osd", true) if err != nil { return err } diff --git a/microceph/ceph/services.go b/microceph/ceph/services.go index f715c98a..ecd56b0e 100644 --- a/microceph/ceph/services.go +++ b/microceph/ceph/services.go @@ -4,18 +4,20 @@ import ( "context" "database/sql" "fmt" + "time" "github.com/Rican7/retry" "github.com/Rican7/retry/backoff" "github.com/Rican7/retry/strategy" "github.com/canonical/microcluster/state" + "github.com/lxc/lxd/shared/logger" "github.com/canonical/microceph/microceph/api/types" "github.com/canonical/microceph/microceph/database" "github.com/tidwall/gjson" ) -type Set map[string]int +type Set map[string]struct{} func (sub Set) isIn(super Set) bool { flag := true @@ -38,20 +40,38 @@ var serviceWorkerTable = map[string](func () (Set, error)) { "mon": getMons, } -func RestartCephService(service string) error { +// Restarts (in order) all Ceph Services provided in the input slice on the host. +func RestartCephServices(services []string) error { + for i := range services { + err := RestartCephService(services[i]) + if err != nil { + logger.Error(fmt.Sprintf("Service %s restart failed: %v ", services[i], err)) + return err + } + } + return nil +} + +// Restart provided ceph service ("mon"/"osd"...) on the host. +func RestartCephService(service string) error { if _, ok := serviceWorkerTable[service]; !ok { - return fmt.Errorf("No handler defined for service %s", service) + errStr := fmt.Sprintf("No handler defined for service %s", service) + logger.Error(errStr) + return fmt.Errorf(errStr) } + // Fetch a Set{} of available daemons for the service. workers, err := serviceWorkerTable[service]() if err != nil { + logger.Errorf("Failed fetching service %s workers", service) return err } - // Reload the service. - snapReload(service) + // Restart the service. + snapRestart(service, false) + // Check all the daemons available before Restart are up. err = retry.Retry(func(i uint) error { iWorkers, err := serviceWorkerTable[service]() if err != nil { @@ -60,12 +80,14 @@ func RestartCephService(service string) error { // All still not up if !workers.isIn(iWorkers) { - return fmt.Errorf( + errStr := fmt.Sprintf( "Attempt %d: Workers: %v not all present in %v", i, workers, iWorkers, ) + logger.Error(errStr) + return fmt.Errorf(errStr) } return nil - }, strategy.Delay(5), strategy.Limit(10), strategy.Backoff(backoff.Linear(5))) + }, strategy.Delay(5), strategy.Limit(10), strategy.Backoff(backoff.Linear(10*time.Second))) if err != nil { return err } @@ -74,7 +96,7 @@ func RestartCephService(service string) error { } func getMons() (Set, error) { - var retval Set + retval := Set{} output, err := processExec.RunCommand("ceph", "mon", "dump", "-f", "json-pretty") if err != nil { return nil, err @@ -83,14 +105,14 @@ func getMons() (Set, error) { // Get a list of mons. mons := gjson.Get(output, "mons.#.name") for _, key := range mons.Array() { - retval[key.String()] = 1 + retval[key.String()] = struct{}{} } return retval, nil } func getUpOsds() (Set, error) { - var retval Set + retval := Set{} output, err := processExec.RunCommand("ceph", "osd", "dump", "-f", "json-pretty") if err != nil { return nil, err @@ -99,7 +121,7 @@ func getUpOsds() (Set, error) { // Get a list of uuid of osds in up state. upOsds := gjson.Get(output, "osds.#(up==1)#.uuid") for _, element := range upOsds.Array() { - retval[element.String()] = 1 + retval[element.String()] = struct{}{} } return retval, nil } diff --git a/microceph/ceph/snap.go b/microceph/ceph/snap.go index 4ae6d10b..522c5116 100644 --- a/microceph/ceph/snap.go +++ b/microceph/ceph/snap.go @@ -42,14 +42,18 @@ func snapStop(service string, disable bool) error { return nil } -// snapReload restarts a service via snapctl. -func snapReload(service string) error { +// Restarts (optionally reloads) a service via snapctl. +func snapRestart(service string, isReload bool) error { args := []string{ "restart", - "--reload", - fmt.Sprintf("microceph.%s", service), } + if isReload { + args = append(args, "--reload") + } + + args = append(args, fmt.Sprintf("microceph.%s", service)) + _, err := processExec.RunCommand("snapctl", args...) if err != nil { return err diff --git a/microceph/client/client.go b/microceph/client/client.go index e2a2903d..56fd8d26 100644 --- a/microceph/client/client.go +++ b/microceph/client/client.go @@ -8,10 +8,9 @@ import ( "time" "github.com/canonical/microcluster/client" - restTypes "github.com/canonical/microcluster/rest/types" "github.com/canonical/microcluster/state" - "github.com/lxc/lxd/lxd/cluster/request" "github.com/lxc/lxd/shared/api" + "github.com/lxc/lxd/shared/logger" "github.com/canonical/microceph/microceph/api/types" ) @@ -54,37 +53,44 @@ func GetConfig(ctx context.Context, c *client.Client, data *types.Config) (types return configs, nil } -// IsForwardedRequest determines if this request has been forwarded from another cluster member. -func IsForwardedRequest(r *http.Request) bool { - return r.Header.Get("User-Agent") == request.UserAgentNotifier -} +func RestartService(ctx context.Context, c *client.Client, data *types.Services) (error) { + // 120 second timeout for waiting. + queryCtx, cancel := context.WithTimeout(ctx, time.Second*120) + defer cancel() -func ForwardConfigRequestToClusterMembers(s *state.State, r *http.Request, data *types.Config, handle func(ctx context.Context, c *client.Client, data *types.Config) error) (error) { - // Get a collection of clients every other cluster member, with the notification user-agent set. - cluster, err := s.Cluster(r) + err := c.Query(queryCtx, "POST", api.NewURL().Path("services", "restart"), data, nil) if err != nil { - return fmt.Errorf("Failed to get a client for every cluster member: %w", err) + url := c.URL() + return fmt.Errorf("Failed Forwarding To: %s: %w", url.String(), err) } - err = cluster.Query(s.Context, true, func(ctx context.Context, c *client.Client) error { - addrPort, err := restTypes.ParseAddrPort(s.Address().URL.Host) - if err != nil { - return fmt.Errorf("Failed to parse addr:port of listen address %q: %w", s.Address().URL.Host, err) - } + return nil +} - // Asynchronously send a POST to each other cluster member. - err = handle(ctx, c, data) - if err != nil { - clientURL := c.URL() - return fmt.Errorf("Failed Forwarding To: %q, From: %s: %w", clientURL.String(), addrPort, err) - } +// Sends the desired list of services to be restarted on every other member of the cluster. +func SendRestartRequestToClusterMembers(s *state.State, r *http.Request, services []string) (error) { + // Populate the restart request data. + var data types.Services + for _, service := range services { + data = append(data, types.Service{Service: service}) + } - return nil - }) + // Get a collection of clients to every other cluster member, with the notification user-agent set. + cluster, err := s.Cluster(r) if err != nil { + logger.Error(fmt.Sprintf("Failed to get a client for every cluster member: %v", err)) return err } + for _, remoteClient := range cluster { + // In order send restart to each cluster member and wait. + err = RestartService(s.Context, &remoteClient, &data) + if err != nil { + logger.Error(fmt.Sprintf("Restart error: %v", err)) + return err + } + } + return nil } diff --git a/microceph/cmd/microceph/cluster_config.go b/microceph/cmd/microceph/cluster_config.go index 7956435e..3e825882 100644 --- a/microceph/cmd/microceph/cluster_config.go +++ b/microceph/cmd/microceph/cluster_config.go @@ -1,7 +1,6 @@ package main import ( - "github.com/canonical/microceph/microceph/ceph" "github.com/spf13/cobra" ) @@ -10,7 +9,6 @@ type cmdClusterConfig struct { cluster *cmdCluster } -var allowList = ceph.GetConfigTable() func (c *cmdClusterConfig) Command() *cobra.Command { cmd := &cobra.Command{ Use: "config", diff --git a/microceph/cmd/microceph/cluster_config_get.go b/microceph/cmd/microceph/cluster_config_get.go index d05361a1..53912293 100644 --- a/microceph/cmd/microceph/cluster_config_get.go +++ b/microceph/cmd/microceph/cluster_config_get.go @@ -5,6 +5,7 @@ import ( "fmt" "github.com/canonical/microceph/microceph/api/types" + "github.com/canonical/microceph/microceph/ceph" "github.com/canonical/microceph/microceph/client" "github.com/canonical/microcluster/microcluster" "github.com/lxc/lxd/lxc/utils" @@ -28,7 +29,9 @@ func (c *cmdClusterConfigGet) Command() *cobra.Command { } func (c *cmdClusterConfigGet) Run(cmd *cobra.Command, args []string) error { - // Get can be called with a single key or without any key for all configs. + allowList := ceph.GetConfigTable() + + // Get can be called with a single key. if len(args) != 1 { return cmd.Help() } diff --git a/microceph/cmd/microceph/cluster_config_reset.go b/microceph/cmd/microceph/cluster_config_reset.go index 12deab57..d0576bf7 100644 --- a/microceph/cmd/microceph/cluster_config_reset.go +++ b/microceph/cmd/microceph/cluster_config_reset.go @@ -5,6 +5,7 @@ import ( "fmt" "github.com/canonical/microceph/microceph/api/types" + "github.com/canonical/microceph/microceph/ceph" "github.com/canonical/microceph/microceph/client" "github.com/canonical/microcluster/microcluster" "github.com/spf13/cobra" @@ -27,6 +28,7 @@ func (c *cmdClusterConfigReset) Command() *cobra.Command { } func (c *cmdClusterConfigReset) Run(cmd *cobra.Command, args []string) error { + allowList := ceph.GetConfigTable() if len(args) != 1 { return cmd.Help() } diff --git a/microceph/cmd/microceph/cluster_config_set.go b/microceph/cmd/microceph/cluster_config_set.go index a53c6400..c510c198 100644 --- a/microceph/cmd/microceph/cluster_config_set.go +++ b/microceph/cmd/microceph/cluster_config_set.go @@ -5,6 +5,7 @@ import ( "fmt" "github.com/canonical/microceph/microceph/api/types" + "github.com/canonical/microceph/microceph/ceph" "github.com/canonical/microceph/microceph/client" "github.com/canonical/microcluster/microcluster" "github.com/spf13/cobra" @@ -27,6 +28,7 @@ func (c *cmdClusterConfigSet) Command() *cobra.Command { } func (c *cmdClusterConfigSet) Run(cmd *cobra.Command, args []string) error { + allowList := ceph.GetConfigTable() if len(args) != 2 { return cmd.Help() }