From 019a392aac36bc8776a81bbd65c347906a9f7579 Mon Sep 17 00:00:00 2001 From: Utkarsh Bhatt Date: Wed, 4 Oct 2023 13:33:29 +0530 Subject: [PATCH] Adds microceph client configuration support for rbd_cache Signed-off-by: Utkarsh Bhatt --- microceph/api/client_configs.go | 183 ++++++++ microceph/api/endpoints.go | 3 + microceph/api/types/client_configs.go | 12 + microceph/ceph/bootstrap.go | 2 +- microceph/ceph/client_config.go | 61 +++ microceph/ceph/config.go | 30 +- microceph/ceph/configwriter.go | 7 + microceph/ceph/join.go | 2 +- microceph/ceph/services.go | 14 +- microceph/ceph/start.go | 2 +- microceph/client/client_configs.go | 99 ++++ microceph/client/configs.go | 49 ++ microceph/client/{client.go => disks.go} | 52 +-- microceph/cmd/microceph/client.go | 26 ++ microceph/cmd/microceph/client_config.go | 39 ++ microceph/cmd/microceph/client_config_get.go | 79 ++++ microceph/cmd/microceph/client_config_list.go | 70 +++ .../cmd/microceph/client_config_reset.go | 68 +++ microceph/cmd/microceph/client_config_set.go | 69 +++ microceph/cmd/microceph/cluster_migrate.go | 1 + microceph/cmd/microceph/main.go | 3 + microceph/common/cluster.go | 26 ++ microceph/common/constants.go | 2 + microceph/database/client_config.go | 37 ++ microceph/database/client_config.mapper.go | 424 ++++++++++++++++++ microceph/database/client_config_extras.go | 220 +++++++++ microceph/database/disk_extras.go | 1 + microceph/database/schema.go | 19 + 28 files changed, 1541 insertions(+), 59 deletions(-) create mode 100644 microceph/api/client_configs.go create mode 100644 microceph/api/types/client_configs.go create mode 100644 microceph/ceph/client_config.go create mode 100644 microceph/client/client_configs.go create mode 100644 microceph/client/configs.go rename microceph/client/{client.go => disks.go} (58%) create mode 100644 microceph/cmd/microceph/client.go create mode 100644 microceph/cmd/microceph/client_config.go create mode 100644 microceph/cmd/microceph/client_config_get.go create mode 100644 microceph/cmd/microceph/client_config_list.go create mode 100644 microceph/cmd/microceph/client_config_reset.go create mode 100644 microceph/cmd/microceph/client_config_set.go create mode 100644 microceph/common/cluster.go create mode 100644 microceph/database/client_config.go create mode 100644 microceph/database/client_config.mapper.go create mode 100644 microceph/database/client_config_extras.go diff --git a/microceph/api/client_configs.go b/microceph/api/client_configs.go new file mode 100644 index 00000000..f67271d4 --- /dev/null +++ b/microceph/api/client_configs.go @@ -0,0 +1,183 @@ +package api + +import ( + "context" + "database/sql" + "encoding/json" + "fmt" + "net/http" + + "github.com/canonical/lxd/lxd/response" + "github.com/canonical/lxd/shared/logger" + "github.com/canonical/microceph/microceph/api/types" + "github.com/canonical/microceph/microceph/ceph" + "github.com/canonical/microceph/microceph/client" + "github.com/canonical/microceph/microceph/common" + "github.com/canonical/microceph/microceph/database" + "github.com/canonical/microcluster/rest" + "github.com/canonical/microcluster/state" +) + +// Top level client API +var clientCmd = rest.Endpoint{ + Path: "client", + Get: rest.EndpointAction{Handler: cmdClientGet, ProxyTarget: true}, +} + +func cmdClientGet(s *state.State, r *http.Request) response.Response { + return response.EmptySyncResponse +} + +// client configs API +var clientConfigsCmd = rest.Endpoint{ + Path: "client/configs", + Put: rest.EndpointAction{Handler: cmdClientConfigsPut, ProxyTarget: true}, + Get: rest.EndpointAction{Handler: cmdClientConfigsGet, ProxyTarget: true}, +} + +func cmdClientConfigsGet(s *state.State, r *http.Request) response.Response { + var req types.ClientConfig + var configs database.ClientConfigItems + + err := json.NewDecoder(r.Body).Decode(&req) + if err != nil { + return response.InternalError(err) + } + + if len(req.Host) > 0 { + configs, err = database.ClientConfigQuery.GetAllForHost(s, req.Host) + } else { + configs, err = database.ClientConfigQuery.GetAll(s) + } + if err != nil { + logger.Errorf("failed fetching client configs: %v for %v", err, req) + return response.SyncResponse(false, nil) + } + + logger.Infof("Database Response: %v", configs) + + return response.SyncResponse(true, configs.GetClientConfigSlice()) +} + +// Implements the render .conf file at that particular host. +func cmdClientConfigsPut(s *state.State, r *http.Request) response.Response { + // Check if microceph is bootstrapped. + err := s.Database.Transaction(s.Context, func(ctx context.Context, tx *sql.Tx) error { + isFsid, err := database.ConfigItemExists(ctx, tx, "fsid") + if err != nil || !isFsid { + return fmt.Errorf("cluster is not bootstrapped yet: %v", err) + } + return nil + }) + if err != nil { + logger.Error(err.Error()) + return response.SyncResponse(false, nil) + } + + err = ceph.UpdateConfig(common.CephState{State: s}) + if err != nil { + logger.Error(err.Error()) + return response.SyncResponse(false, nil) + } + + return response.EmptySyncResponse +} + +// client configs key API +var clientConfigsKeyCmd = rest.Endpoint{ + Path: "client/configs/{key}", + Put: rest.EndpointAction{Handler: clientConfigsKeyPut, ProxyTarget: true}, + Get: rest.EndpointAction{Handler: clientConfigsKeyGet, ProxyTarget: true}, + Delete: rest.EndpointAction{Handler: clientConfigsKeyDelete, ProxyTarget: true}, +} + +func clientConfigsKeyGet(s *state.State, r *http.Request) response.Response { + var req types.ClientConfig + var configs database.ClientConfigItems + + err := json.NewDecoder(r.Body).Decode(&req) + if err != nil { + return response.InternalError(err) + } + + if len(req.Host) > 0 { + configs, err = database.ClientConfigQuery.GetAllForKeyAndHost(s, req.Key, req.Host) + } else { + configs, err = database.ClientConfigQuery.GetAllForKey(s, req.Key) + } + if err != nil { + logger.Errorf("failed fetching client configs: %v for %v", err, req) + return response.SyncResponse(false, nil) + } + + logger.Infof("Database Response: %v", configs) + + return response.SyncResponse(true, configs.GetClientConfigSlice()) +} + +func clientConfigsKeyPut(s *state.State, r *http.Request) response.Response { + var req types.ClientConfig + + err := json.NewDecoder(r.Body).Decode(&req) + if err != nil { + return response.InternalError(err) + } + + // If new config request is for global configuration. + err = database.ClientConfigQuery.AddNew(s, req.Key, req.Value, req.Host) + if err != nil { + return response.InternalError(err) + } + + // Trigger /conf file update across cluster. + clientConfigUpdate(s, req.Wait) + + return response.EmptySyncResponse +} + +func clientConfigsKeyDelete(s *state.State, r *http.Request) response.Response { + var req types.ClientConfig + + err := json.NewDecoder(r.Body).Decode(&req) + if err != nil { + return response.InternalError(err) + } + + if len(req.Host) > 0 { + err = database.ClientConfigQuery.RemoveOneForKeyAndHost(s, req.Key, req.Host) + if err != nil { + return response.InternalError(err) + } + } else { + err = database.ClientConfigQuery.RemoveAllForKey(s, req.Key) + if err != nil { + return response.InternalError(err) + } + } + + return response.EmptySyncResponse +} + +// Perform ordered (one after other) updation of ceph.conf across the ceph cluster. +func clientConfigUpdate(s *state.State, wait bool) error { + if wait { + // Execute update conf synchronously + err := client.SendUpdateClientConfRequestToClusterMembers(common.CephState{State: s}) + if err != nil { + return err + } + + // Update on current host. + err = ceph.UpdateConfig(common.CephState{State: s}) + if err != nil { + return err + } + } else { // Execute update asynchronously + go func() { + client.SendUpdateClientConfRequestToClusterMembers(common.CephState{State: s}) + ceph.UpdateConfig(common.CephState{State: s}) // Restart on current host. + }() + } + + return nil +} diff --git a/microceph/api/endpoints.go b/microceph/api/endpoints.go index 7af08549..845fc417 100644 --- a/microceph/api/endpoints.go +++ b/microceph/api/endpoints.go @@ -18,4 +18,7 @@ var Endpoints = []rest.Endpoint{ mgrServiceCmd, monServiceCmd, rgwServiceCmd, + clientCmd, + clientConfigsCmd, + clientConfigsKeyCmd, } diff --git a/microceph/api/types/client_configs.go b/microceph/api/types/client_configs.go new file mode 100644 index 00000000..3983e274 --- /dev/null +++ b/microceph/api/types/client_configs.go @@ -0,0 +1,12 @@ +package types + +// Configs holds the key value pair +type ClientConfig struct { + Key string `json:"key" yaml:"key"` + Value string `json:"value" yaml:"value"` + Host string `json:"host" yaml:"host"` + Wait bool `json:"wait" yaml:"wait"` +} + +// Configs is a slice of configs +type ClientConfigs []ClientConfig diff --git a/microceph/ceph/bootstrap.go b/microceph/ceph/bootstrap.go index b85ac68f..55f46eee 100644 --- a/microceph/ceph/bootstrap.go +++ b/microceph/ceph/bootstrap.go @@ -102,7 +102,7 @@ func Bootstrap(s common.StateInterface) error { } // Re-generate the configuration from the database. - err = updateConfig(s) + err = UpdateConfig(s) if err != nil { return fmt.Errorf("Failed to re-generate the configuration: %w", err) } diff --git a/microceph/ceph/client_config.go b/microceph/ceph/client_config.go new file mode 100644 index 00000000..d3fc4b6e --- /dev/null +++ b/microceph/ceph/client_config.go @@ -0,0 +1,61 @@ +package ceph + +import ( + "fmt" + "reflect" + + "github.com/canonical/lxd/shared/logger" + "github.com/canonical/microceph/microceph/common" + "github.com/canonical/microceph/microceph/database" +) + +type ClientConfigT struct { + IsCache string + CacheSize string + IsCacheWritethrough string + CacheMaxDirty string + CacheTargetDirty string +} + +func GetClientConfigForHost(s common.StateInterface, hostname string) (ClientConfigT, error) { + retval := ClientConfigT{} + + // Get all client configs for the current host. + configs, err := database.ClientConfigQuery.GetAllForHost(s.ClusterState(), hostname) + if err != nil { + return ClientConfigT{}, fmt.Errorf("could not query database for client configs: %v", err) + } + + logger.Infof("Client Configs for host %s, %v", hostname, configs) + + for _, config := range configs { + // Populate client config table using the database values. + setterTable := GetClientConfigSet() + err = setFieldValue(&retval, fmt.Sprint(setterTable[config.Key]), config.Value) + if err != nil { + return ClientConfigT{}, fmt.Errorf("cailed object population: %v", err) + } + } + + return retval, nil +} + +func setFieldValue(ogp *ClientConfigT, field string, value string) error { + r := reflect.ValueOf(ogp) + f := reflect.Indirect(r).FieldByName(field) + if f.Kind() != reflect.Invalid { + f.SetString(value) + return nil + } + return fmt.Errorf("cannot set field %s", field) +} + +func GetClientConfigSet() Set { + return Set{ + "rbd_cache": "IsCache", + "rbd_cache_size": "CacheSize", + "rbd_cache_writethrough_until_flush": "IsCacheWritethrough", + "rbd_cache_max_dirty": "CacheMaxDirty", + "rbd_cache_target_dirty": "CacheTargetDirty", + } +} diff --git a/microceph/ceph/config.go b/microceph/ceph/config.go index adf38a4d..e416b627 100644 --- a/microceph/ceph/config.go +++ b/microceph/ceph/config.go @@ -9,6 +9,7 @@ import ( "path/filepath" "strings" + "github.com/canonical/lxd/shared/logger" "github.com/canonical/microceph/microceph/api/types" "github.com/canonical/microceph/microceph/common" "github.com/canonical/microceph/microceph/database" @@ -162,7 +163,7 @@ func ListConfigs() (types.Configs, error) { } // updates the ceph config file. -func updateConfig(s common.StateInterface) error { +func UpdateConfig(s common.StateInterface) error { confPath := filepath.Join(os.Getenv("SNAP_DATA"), "conf") runPath := filepath.Join(os.Getenv("SNAP_DATA"), "run") @@ -207,18 +208,29 @@ func updateConfig(s common.StateInterface) error { conf := newCephConfig(confPath) address := s.ClusterState().Address().Hostname() + clientConfig, err := GetClientConfigForHost(s, s.ClusterState().Name()) + if err != nil { + logger.Errorf("Failed to pull Client Configurations: %v", err) + return err + } + err = conf.WriteConfig( map[string]any{ - "fsid": config["fsid"], - "runDir": runPath, - "monitors": strings.Join(monitorAddresses, ","), - "addr": address, - "ipv4": strings.Contains(address, "."), - "ipv6": strings.Contains(address, ":"), + "fsid": config["fsid"], + "runDir": runPath, + "monitors": strings.Join(monitorAddresses, ","), + "addr": address, + "ipv4": strings.Contains(address, "."), + "ipv6": strings.Contains(address, ":"), + "isCache": clientConfig.IsCache, + "cacheSize": clientConfig.CacheSize, + "isCacheWritethrough": clientConfig.IsCacheWritethrough, + "cacheMaxDirty": clientConfig.CacheMaxDirty, + "cacheTargetDirty": clientConfig.CacheTargetDirty, }, ) if err != nil { - return fmt.Errorf("Couldn't render ceph.conf: %w", err) + return fmt.Errorf("couldn't render ceph.conf: %w", err) } // Generate ceph.client.admin.keyring @@ -230,7 +242,7 @@ func updateConfig(s common.StateInterface) error { }, ) if err != nil { - return fmt.Errorf("Couldn't render ceph.client.admin.keyring: %w", err) + return fmt.Errorf("couldn't render ceph.client.admin.keyring: %w", err) } return nil diff --git a/microceph/ceph/configwriter.go b/microceph/ceph/configwriter.go index 08d34d0b..1cf075d6 100644 --- a/microceph/ceph/configwriter.go +++ b/microceph/ceph/configwriter.go @@ -52,6 +52,13 @@ auth allow insecure global id reclaim = false public addr = {{.addr}} ms bind ipv4 = {{.ipv4}} ms bind ipv6 = {{.ipv6}} + +[client] +{{if .isCache}}rbd_cache = {{.isCache}}{{end}} +{{if .cacheSize}}rbd_cache_size = {{.cacheSize}}{{end}} +{{if .isCacheWritethrough}}rbd_cache_writethrough_until_flush = {{.isCacheWritethrough}}{{end}} +{{if .cacheMaxDirty}}rbd_cache_max_dirty = {{.cacheMaxDirty}}{{end}} +{{if .cacheTargetDirty}}rbd_cache_target_dirty = {{.cacheTargetDirty}}{{end}} `)), configFile: "ceph.conf", configDir: configDir, diff --git a/microceph/ceph/join.go b/microceph/ceph/join.go index 97c5d2c3..d068317e 100644 --- a/microceph/ceph/join.go +++ b/microceph/ceph/join.go @@ -25,7 +25,7 @@ func Join(s common.StateInterface) error { } // Generate the configuration from the database. - err := updateConfig(s) + err := UpdateConfig(s) if err != nil { return fmt.Errorf("Failed to generate the configuration: %w", err) } diff --git a/microceph/ceph/services.go b/microceph/ceph/services.go index 2e88fb61..ca429da7 100644 --- a/microceph/ceph/services.go +++ b/microceph/ceph/services.go @@ -4,7 +4,6 @@ import ( "context" "database/sql" "fmt" - "github.com/canonical/microceph/microceph/common" "os" "path/filepath" "time" @@ -16,11 +15,22 @@ import ( "github.com/canonical/microcluster/state" "github.com/canonical/microceph/microceph/api/types" + "github.com/canonical/microceph/microceph/common" "github.com/canonical/microceph/microceph/database" "github.com/tidwall/gjson" ) -type Set map[string]struct{} +type Set map[string]interface{} + +func (sub Set) Keys() []string { + keys := make([]string, len(sub)) + + for key := range sub { + keys = append(keys, key) + } + + return keys +} func (sub Set) isIn(super Set) bool { flag := true diff --git a/microceph/ceph/start.go b/microceph/ceph/start.go index ae2d2f87..ced3ce84 100644 --- a/microceph/ceph/start.go +++ b/microceph/ceph/start.go @@ -49,7 +49,7 @@ func Start(s common.StateInterface) error { continue } - err = updateConfig(s) + err = UpdateConfig(s) if err != nil { time.Sleep(10 * time.Second) continue diff --git a/microceph/client/client_configs.go b/microceph/client/client_configs.go new file mode 100644 index 00000000..38a6467d --- /dev/null +++ b/microceph/client/client_configs.go @@ -0,0 +1,99 @@ +package client + +import ( + "context" + "fmt" + "time" + + "github.com/canonical/lxd/shared/api" + "github.com/canonical/lxd/shared/logger" + "github.com/canonical/microceph/microceph/api/types" + "github.com/canonical/microceph/microceph/common" + "github.com/canonical/microcluster/client" +) + +func SetClientConfig(ctx context.Context, c *client.Client, data *types.ClientConfig) error { + queryCtx, cancel := context.WithTimeout(ctx, time.Second*200) + defer cancel() + + err := c.Query(queryCtx, "PUT", api.NewURL().Path("client", "configs", data.Key), data, nil) + if err != nil { + return fmt.Errorf("failed setting client config: %w, Key: %s, Value: %s", err, data.Key, data.Value) + } + + return nil +} + +func ResetClientConfig(ctx context.Context, c *client.Client, data *types.ClientConfig) error { + queryCtx, cancel := context.WithTimeout(ctx, time.Second*200) + defer cancel() + + err := c.Query(queryCtx, "DELETE", api.NewURL().Path("client", "configs", data.Key), data, nil) + if err != nil { + return fmt.Errorf("failed clearing client config: %w, Key: %s", err, data.Key) + } + + return nil +} + +func GetClientConfig(ctx context.Context, c *client.Client, data *types.ClientConfig) (types.ClientConfigs, error) { + queryCtx, cancel := context.WithTimeout(ctx, time.Second*10) + defer cancel() + + configs := types.ClientConfigs{} + + err := c.Query(queryCtx, "GET", api.NewURL().Path("client", "configs", data.Key), data, &configs) + if err != nil { + return nil, fmt.Errorf("failed to fetch client config: %w, Key: %s", err, data.Key) + } + + return configs, nil +} + +func ListClientConfig(ctx context.Context, c *client.Client, data *types.ClientConfig) (types.ClientConfigs, error) { + queryCtx, cancel := context.WithTimeout(ctx, time.Second*10) + defer cancel() + + configs := types.ClientConfigs{} + + err := c.Query(queryCtx, "GET", api.NewURL().Path("client", "configs"), data, &configs) + if err != nil { + return nil, fmt.Errorf("failed to fetch client config: %w, Key: %s", err, data.Key) + } + + return configs, nil +} + +// /client/configs/ +func UpdateClientConf(ctx context.Context, c *client.Client) error { + queryCtx, cancel := context.WithTimeout(ctx, time.Second*20) + defer cancel() + + err := c.Query(queryCtx, "PUT", api.NewURL().Path("client", "configs"), nil, nil) + if err != nil { + return fmt.Errorf("failed to update the configuration file: %w", err) + } + + return nil +} + +// Sends the update conf request to every other member of the cluster. +func SendUpdateClientConfRequestToClusterMembers(s common.StateInterface) error { + // Get a collection of clients to every other cluster member, with the notification user-agent set. + cluster, err := s.ClusterState().Cluster(nil) + if err != nil { + logger.Errorf("failed to get a client for every cluster member: %v", err) + return err + } + + for _, remoteClient := range cluster { + // In order send restart to each cluster member and wait. + err = UpdateClientConf(s.ClusterState().Context, &remoteClient) + if err != nil { + logger.Errorf("update conf error: %v", err) + return err + } + } + + return nil +} diff --git a/microceph/client/configs.go b/microceph/client/configs.go new file mode 100644 index 00000000..8c3e27f0 --- /dev/null +++ b/microceph/client/configs.go @@ -0,0 +1,49 @@ +package client + +import ( + "context" + "fmt" + "time" + + "github.com/canonical/lxd/shared/api" + "github.com/canonical/microceph/microceph/api/types" + "github.com/canonical/microcluster/client" +) + +func SetConfig(ctx context.Context, c *client.Client, data *types.Config) error { + queryCtx, cancel := context.WithTimeout(ctx, time.Second*200) + defer cancel() + + err := c.Query(queryCtx, "PUT", api.NewURL().Path("configs"), data, nil) + if err != nil { + return fmt.Errorf("failed setting cluster config: %w, Key: %s, Value: %s", err, data.Key, data.Value) + } + + return nil +} + +func ClearConfig(ctx context.Context, c *client.Client, data *types.Config) error { + queryCtx, cancel := context.WithTimeout(ctx, time.Second*200) + defer cancel() + + err := c.Query(queryCtx, "DELETE", api.NewURL().Path("configs"), data, nil) + if err != nil { + return fmt.Errorf("failed clearing cluster config: %w, Key: %s", err, data.Key) + } + + return nil +} + +func GetConfig(ctx context.Context, c *client.Client, data *types.Config) (types.Configs, error) { + queryCtx, cancel := context.WithTimeout(ctx, time.Second*5) + defer cancel() + + configs := types.Configs{} + + err := c.Query(queryCtx, "GET", api.NewURL().Path("configs"), data, &configs) + if err != nil { + return nil, fmt.Errorf("failed to fetch cluster config: %w, Key: %s", err, data.Key) + } + + return configs, nil +} diff --git a/microceph/client/client.go b/microceph/client/disks.go similarity index 58% rename from microceph/client/client.go rename to microceph/client/disks.go index e2d1f14b..c4cd02ee 100644 --- a/microceph/client/client.go +++ b/microceph/client/disks.go @@ -14,44 +14,6 @@ import ( "github.com/canonical/microceph/microceph/api/types" ) -func SetConfig(ctx context.Context, c *client.Client, data *types.Config) error { - queryCtx, cancel := context.WithTimeout(ctx, time.Second*200) - defer cancel() - - err := c.Query(queryCtx, "PUT", api.NewURL().Path("configs"), data, nil) - if err != nil { - return fmt.Errorf("Failed setting cluster config: %w, Key: %s, Value: %s", err, data.Key, data.Value) - } - - return nil -} - -func ClearConfig(ctx context.Context, c *client.Client, data *types.Config) error { - queryCtx, cancel := context.WithTimeout(ctx, time.Second*200) - defer cancel() - - err := c.Query(queryCtx, "DELETE", api.NewURL().Path("configs"), data, nil) - if err != nil { - return fmt.Errorf("Failed clearing cluster config: %w, Key: %s", err, data.Key) - } - - return nil -} - -func GetConfig(ctx context.Context, c *client.Client, data *types.Config) (types.Configs, error) { - queryCtx, cancel := context.WithTimeout(ctx, time.Second*5) - defer cancel() - - configs := types.Configs{} - - err := c.Query(queryCtx, "GET", api.NewURL().Path("configs"), data, &configs) - if err != nil { - return nil, fmt.Errorf("Failed to fetch cluster config: %w, Key: %s", err, data.Key) - } - - return configs, nil -} - // AddDisk requests Ceph sets up a new OSD. func AddDisk(ctx context.Context, c *client.Client, data *types.DisksPost) error { queryCtx, cancel := context.WithTimeout(ctx, time.Second*120) @@ -59,7 +21,7 @@ func AddDisk(ctx context.Context, c *client.Client, data *types.DisksPost) error err := c.Query(queryCtx, "POST", api.NewURL().Path("disks"), data, nil) if err != nil { - return fmt.Errorf("Failed adding new disk: %w", err) + return fmt.Errorf("failed adding new disk: %w", err) } return nil @@ -74,7 +36,7 @@ func GetDisks(ctx context.Context, c *client.Client) (types.Disks, error) { err := c.Query(queryCtx, "GET", api.NewURL().Path("disks"), nil, &disks) if err != nil { - return nil, fmt.Errorf("Failed listing disks: %w", err) + return nil, fmt.Errorf("failed listing disks: %w", err) } return disks, nil @@ -89,7 +51,7 @@ func GetResources(ctx context.Context, c *client.Client) (*api.ResourcesStorage, err := c.Query(queryCtx, "GET", api.NewURL().Path("resources"), nil, &storage) if err != nil { - return nil, fmt.Errorf("Failed listing storage devices: %w", err) + return nil, fmt.Errorf("failed listing storage devices: %w", err) } return &storage, nil @@ -104,7 +66,7 @@ func RemoveDisk(ctx context.Context, c *client.Client, data *types.DisksDelete) // get disks and determine osd location disks, err := GetDisks(ctx, c) if err != nil { - return fmt.Errorf("Failed to get disks: %w", err) + return fmt.Errorf("failed to get disks: %w", err) } var location string for _, disk := range disks { @@ -114,7 +76,7 @@ func RemoveDisk(ctx context.Context, c *client.Client, data *types.DisksDelete) } } if location == "" { - return fmt.Errorf("Failed to find location for osd.%d", data.OSD) + return fmt.Errorf("failed to find location for osd.%d", data.OSD) } c = c.UseTarget(location) @@ -122,9 +84,9 @@ func RemoveDisk(ctx context.Context, c *client.Client, data *types.DisksDelete) if err != nil { // Checking if the error is a context deadline exceeded error if errors.Is(err, context.DeadlineExceeded) { - return fmt.Errorf("Failed to remove disk, timeout (%ds) reached - abort", data.Timeout) + return fmt.Errorf("failed to remove disk, timeout (%ds) reached - abort", data.Timeout) } - return fmt.Errorf("Failed to remove disk: %w", err) + return fmt.Errorf("failed to remove disk: %w", err) } return nil } diff --git a/microceph/cmd/microceph/client.go b/microceph/cmd/microceph/client.go new file mode 100644 index 00000000..2a62a5a5 --- /dev/null +++ b/microceph/cmd/microceph/client.go @@ -0,0 +1,26 @@ +package main + +import ( + "github.com/spf13/cobra" +) + +type cmdClient struct { + common *CmdControl +} + +func (c *cmdClient) Command() *cobra.Command { + cmd := &cobra.Command{ + Use: "client", + Short: "Manage the MicroCeph client", + } + + // Config Subcommand + clientConfigCmd := cmdClientConfig{common: c.common, client: c} + cmd.AddCommand(clientConfigCmd.Command()) + + // Workaround for subcommand usage errors. See: https://github.com/spf13/cobra/issues/706 + cmd.Args = cobra.NoArgs + cmd.Run = func(cmd *cobra.Command, args []string) { _ = cmd.Usage() } + + return cmd +} diff --git a/microceph/cmd/microceph/client_config.go b/microceph/cmd/microceph/client_config.go new file mode 100644 index 00000000..b0d6a387 --- /dev/null +++ b/microceph/cmd/microceph/client_config.go @@ -0,0 +1,39 @@ +package main + +import ( + "github.com/spf13/cobra" +) + +type cmdClientConfig struct { + common *CmdControl + client *cmdClient +} + +func (c *cmdClientConfig) Command() *cobra.Command { + cmd := &cobra.Command{ + Use: "config", + Short: "Manage Ceph Client configs", + } + + // Get + clientConfigGetCmd := cmdClientConfigGet{common: c.common, client: c.client, clientConfig: c} + cmd.AddCommand(clientConfigGetCmd.Command()) + + // Set + clientConfigSetCmd := cmdClientConfigSet{common: c.common, client: c.client, clientConfig: c} + cmd.AddCommand(clientConfigSetCmd.Command()) + + // Reset + clientConfigResetCmd := cmdClientConfigReset{common: c.common, client: c.client, clientConfig: c} + cmd.AddCommand(clientConfigResetCmd.Command()) + + // List + clientConfigListCmd := cmdClientConfigList{common: c.common, client: c.client, clientConfig: c} + cmd.AddCommand(clientConfigListCmd.Command()) + + // Workaround for subcommand usage errors. See: https://github.com/spf13/cobra/issues/706 + cmd.Args = cobra.NoArgs + cmd.Run = func(cmd *cobra.Command, args []string) { _ = cmd.Usage() } + + return cmd +} diff --git a/microceph/cmd/microceph/client_config_get.go b/microceph/cmd/microceph/client_config_get.go new file mode 100644 index 00000000..868da533 --- /dev/null +++ b/microceph/cmd/microceph/client_config_get.go @@ -0,0 +1,79 @@ +package main + +import ( + "context" + "fmt" + + lxdCmd "github.com/canonical/lxd/shared/cmd" + "github.com/canonical/microceph/microceph/api/types" + "github.com/canonical/microceph/microceph/ceph" + "github.com/canonical/microceph/microceph/client" + "github.com/canonical/microcluster/microcluster" + "github.com/spf13/cobra" +) + +type cmdClientConfigGet struct { + common *CmdControl + client *cmdClient + clientConfig *cmdClientConfig + + flagHost string +} + +func (c *cmdClientConfigGet) Command() *cobra.Command { + cmd := &cobra.Command{ + Use: "get ", + Short: "Get specified Ceph Client config", + RunE: c.Run, + } + + // * stands for global configs, hence all configs are global by default unless specifies. + cmd.Flags().StringVar(&c.flagHost, "host", "*", "Specify a microceph node that provided config applies to.") + return cmd +} + +func (c *cmdClientConfigGet) Run(cmd *cobra.Command, args []string) error { + allowList := ceph.GetClientConfigSet() + + // Get can be called with a single key. + if len(args) != 1 { + return cmd.Help() + } + + if _, ok := allowList[args[0]]; !ok { + return fmt.Errorf("Key %s is invalid. \nPermitted Keys: %v", args[0], allowList.Keys()) + } + + m, err := microcluster.App(context.Background(), microcluster.Args{StateDir: c.common.FlagStateDir, Verbose: c.common.FlagLogVerbose, Debug: c.common.FlagLogDebug}) + if err != nil { + return fmt.Errorf("Unable to configure MicroCeph: %w", err) + } + + cli, err := m.LocalClient() + if err != nil { + return err + } + + req := &types.ClientConfig{ + Key: args[0], + Host: c.flagHost, + } + + configs, err := client.GetClientConfig(context.Background(), cli, req) + if err != nil { + return err + } + + data := make([][]string, len(configs)) + for i, config := range configs { + data[i] = []string{fmt.Sprintf("%d", i), config.Key, config.Value, config.Host} + } + + header := []string{"#", "Key", "Value", "Host"} + err = lxdCmd.RenderTable(lxdCmd.TableFormatTable, header, data, configs) + if err != nil { + return err + } + + return nil +} diff --git a/microceph/cmd/microceph/client_config_list.go b/microceph/cmd/microceph/client_config_list.go new file mode 100644 index 00000000..0be35a1e --- /dev/null +++ b/microceph/cmd/microceph/client_config_list.go @@ -0,0 +1,70 @@ +package main + +import ( + "context" + "fmt" + + lxdCmd "github.com/canonical/lxd/shared/cmd" + "github.com/canonical/microceph/microceph/api/types" + "github.com/canonical/microceph/microceph/client" + "github.com/canonical/microcluster/microcluster" + "github.com/spf13/cobra" +) + +type cmdClientConfigList struct { + common *CmdControl + client *cmdClient + clientConfig *cmdClientConfig + + flagHost string +} + +func (c *cmdClientConfigList) Command() *cobra.Command { + cmd := &cobra.Command{ + Use: "list", + Short: "List all set Ceph level configs", + RunE: c.Run, + } + + // * stands for global configs, hence all configs are global by default unless specifies. + cmd.Flags().StringVar(&c.flagHost, "host", "*", "Specify a microceph node that provided config applies to.") + return cmd +} + +func (c *cmdClientConfigList) Run(cmd *cobra.Command, args []string) error { + if len(args) != 0 { + return cmd.Help() + } + + m, err := microcluster.App(context.Background(), microcluster.Args{StateDir: c.common.FlagStateDir, Verbose: c.common.FlagLogVerbose, Debug: c.common.FlagLogDebug}) + if err != nil { + return fmt.Errorf("Unable to configure MicroCeph: %w", err) + } + + cli, err := m.LocalClient() + if err != nil { + return err + } + + req := &types.ClientConfig{ + Host: c.flagHost, + } + + configs, err := client.ListClientConfig(context.Background(), cli, req) + if err != nil { + return err + } + + data := make([][]string, len(configs)) + for i, config := range configs { + data[i] = []string{fmt.Sprintf("%d", i), config.Key, config.Value, config.Host} + } + + header := []string{"#", "Key", "Value", "Host"} + err = lxdCmd.RenderTable(lxdCmd.TableFormatTable, header, data, configs) + if err != nil { + return err + } + + return nil +} diff --git a/microceph/cmd/microceph/client_config_reset.go b/microceph/cmd/microceph/client_config_reset.go new file mode 100644 index 00000000..6a230b62 --- /dev/null +++ b/microceph/cmd/microceph/client_config_reset.go @@ -0,0 +1,68 @@ +package main + +import ( + "context" + "fmt" + + "github.com/canonical/microceph/microceph/api/types" + "github.com/canonical/microceph/microceph/ceph" + "github.com/canonical/microceph/microceph/client" + "github.com/canonical/microcluster/microcluster" + "github.com/spf13/cobra" +) + +type cmdClientConfigReset struct { + common *CmdControl + client *cmdClient + clientConfig *cmdClientConfig + + flagWait bool + flagHost string +} + +func (c *cmdClientConfigReset) Command() *cobra.Command { + cmd := &cobra.Command{ + Use: "reset ", + Short: "Clear specified Ceph Client config", + RunE: c.Run, + } + + cmd.Flags().BoolVar(&c.flagWait, "wait", false, "Wait for required ceph services to restart post config reset.") + // * stands for global configs, hence all configs are global by default unless specifies. + cmd.Flags().StringVar(&c.flagHost, "host", "*", "Specify a microceph node that provided config applies to.") + return cmd +} + +func (c *cmdClientConfigReset) Run(cmd *cobra.Command, args []string) error { + allowList := ceph.GetClientConfigSet() + if len(args) != 1 { + return cmd.Help() + } + + if _, ok := allowList[args[0]]; !ok { + return fmt.Errorf("Resetting key %s is not allowed", args[0]) + } + + m, err := microcluster.App(context.Background(), microcluster.Args{StateDir: c.common.FlagStateDir, Verbose: c.common.FlagLogVerbose, Debug: c.common.FlagLogDebug}) + if err != nil { + return fmt.Errorf("Unable to configure MicroCeph: %w", err) + } + + cli, err := m.LocalClient() + if err != nil { + return err + } + + req := &types.ClientConfig{ + Key: args[0], + Wait: c.flagWait, + Host: c.flagHost, + } + + err = client.ResetClientConfig(context.Background(), cli, req) + if err != nil { + return err + } + + return nil +} diff --git a/microceph/cmd/microceph/client_config_set.go b/microceph/cmd/microceph/client_config_set.go new file mode 100644 index 00000000..0f98004e --- /dev/null +++ b/microceph/cmd/microceph/client_config_set.go @@ -0,0 +1,69 @@ +package main + +import ( + "context" + "fmt" + + "github.com/canonical/microceph/microceph/api/types" + "github.com/canonical/microceph/microceph/ceph" + "github.com/canonical/microceph/microceph/client" + "github.com/canonical/microcluster/microcluster" + "github.com/spf13/cobra" +) + +type cmdClientConfigSet struct { + common *CmdControl + client *cmdClient + clientConfig *cmdClientConfig + + flagWait bool + flagHost string +} + +func (c *cmdClientConfigSet) Command() *cobra.Command { + cmd := &cobra.Command{ + Use: "set ", + Short: "Set specified Ceph Client config", + RunE: c.Run, + } + + cmd.Flags().BoolVar(&c.flagWait, "wait", false, "Wait for configs to propagate across the cluster.") + // * stands for global configs, hence all configs are global by default unless specifies. + cmd.Flags().StringVar(&c.flagHost, "host", "*", "Specify a microceph node that provided config applies to.") + return cmd +} + +func (c *cmdClientConfigSet) Run(cmd *cobra.Command, args []string) error { + allowList := ceph.GetClientConfigSet() + if len(args) != 2 { + return cmd.Help() + } + + if _, ok := allowList[args[0]]; !ok { + return fmt.Errorf("Configuring key %s is not allowed. \nPermitted Keys: %v", args[0], allowList.Keys()) + } + + m, err := microcluster.App(context.Background(), microcluster.Args{StateDir: c.common.FlagStateDir, Verbose: c.common.FlagLogVerbose, Debug: c.common.FlagLogDebug}) + if err != nil { + return fmt.Errorf("Unable to configure MicroCeph: %w", err) + } + + cli, err := m.LocalClient() + if err != nil { + return err + } + + req := &types.ClientConfig{ + Key: args[0], + Value: args[1], + Wait: c.flagWait, + Host: c.flagHost, + } + + err = client.SetClientConfig(context.Background(), cli, req) + if err != nil { + return err + } + + return nil +} diff --git a/microceph/cmd/microceph/cluster_migrate.go b/microceph/cmd/microceph/cluster_migrate.go index 0f350727..bf7e430b 100644 --- a/microceph/cmd/microceph/cluster_migrate.go +++ b/microceph/cmd/microceph/cluster_migrate.go @@ -2,6 +2,7 @@ package main import ( "context" + "github.com/canonical/lxd/shared/logger" "github.com/canonical/microceph/microceph/api/types" "github.com/canonical/microceph/microceph/client" diff --git a/microceph/cmd/microceph/main.go b/microceph/cmd/microceph/main.go index 36d99810..ba20f1af 100644 --- a/microceph/cmd/microceph/main.go +++ b/microceph/cmd/microceph/main.go @@ -61,6 +61,9 @@ func main() { var cmdDisk = cmdDisk{common: &commonCmd} app.AddCommand(cmdDisk.Command()) + var cmdClient = cmdClient{common: &commonCmd} + app.AddCommand(cmdClient.Command()) + app.InitDefaultHelpCmd() err := app.Execute() diff --git a/microceph/common/cluster.go b/microceph/common/cluster.go new file mode 100644 index 00000000..7ad5e21e --- /dev/null +++ b/microceph/common/cluster.go @@ -0,0 +1,26 @@ +package common + +import "github.com/canonical/lxd/shared/logger" + +func GetClusterMemberNames(s StateInterface) ([]string, error) { + leader, err := s.ClusterState().Leader() + if err != nil { + return nil, err + } + + members, err := leader.GetClusterMembers(s.ClusterState().Context) + if err != nil { + return nil, err + } + + logger.Infof("Cluster Members are: %v", members) + + memberNames := make([]string, len(members)) + for i, member := range members { + memberNames[i] = member.Name + } + + logger.Infof("Cluster Members Names are: %v", memberNames) + + return memberNames, nil +} diff --git a/microceph/common/constants.go b/microceph/common/constants.go index 54ac15aa..3d05214c 100644 --- a/microceph/common/constants.go +++ b/microceph/common/constants.go @@ -6,6 +6,8 @@ import ( "path/filepath" ) +const ClientConfigGlobalHostConst = "*" + type PathConst struct { ConfPath string RunPath string diff --git a/microceph/database/client_config.go b/microceph/database/client_config.go new file mode 100644 index 00000000..97725171 --- /dev/null +++ b/microceph/database/client_config.go @@ -0,0 +1,37 @@ +package database + +//go:generate -command mapper lxd-generate db mapper -t client_config.mapper.go +//go:generate mapper reset +// +//go:generate mapper stmt -d github.com/canonical/microcluster/cluster -e ClientConfigItem objects table=client_config +//go:generate mapper stmt -d github.com/canonical/microcluster/cluster -e ClientConfigItem objects-by-Key table=client_config +//go:generate mapper stmt -d github.com/canonical/microcluster/cluster -e ClientConfigItem objects-by-Host table=client_config +//go:generate mapper stmt -d github.com/canonical/microcluster/cluster -e ClientConfigItem objects-by-Key-and-Host table=client_config +//go:generate mapper stmt -d github.com/canonical/microcluster/cluster -e ClientConfigItem id table=client_config +//go:generate mapper stmt -d github.com/canonical/microcluster/cluster -e ClientConfigItem create table=client_config +//go:generate mapper stmt -d github.com/canonical/microcluster/cluster -e ClientConfigItem delete-by-Key table=client_config +//go:generate mapper stmt -d github.com/canonical/microcluster/cluster -e ClientConfigItem delete-by-Host table=client_config +//go:generate mapper stmt -d github.com/canonical/microcluster/cluster -e ClientConfigItem delete-by-Key-and-Host table=client_config +//go:generate mapper stmt -d github.com/canonical/microcluster/cluster -e ClientConfigItem update table=client_config + +// +//go:generate mapper method -i -d github.com/canonical/microcluster/cluster -e ClientConfigItem GetMany table=client_config +//go:generate mapper method -i -d github.com/canonical/microcluster/cluster -e ClientConfigItem GetOne table=client_config +//go:generate mapper method -i -d github.com/canonical/microcluster/cluster -e ClientConfigItem ID table=client_config +//go:generate mapper method -i -d github.com/canonical/microcluster/cluster -e ClientConfigItem Exists table=client_config +//go:generate mapper method -i -d github.com/canonical/microcluster/cluster -e ClientConfigItem Create table=client_config +//go:generate mapper method -i -d github.com/canonical/microcluster/cluster -e ClientConfigItem DeleteOne-by-Key-and-Host table=client_config +//go:generate mapper method -i -d github.com/canonical/microcluster/cluster -e ClientConfigItem DeleteMany-by-Key table=client_config +//go:generate mapper method -i -d github.com/canonical/microcluster/cluster -e ClientConfigItem Update table=client_config + +type ClientConfigItem struct { + ID int + Host string `db:"primary=yes&join=internal_cluster_members.name&joinon=client_config.member_id"` + Key string `db:"primary=yes"` + Value string +} + +type ClientConfigItemFilter struct { + Host *string + Key *string +} diff --git a/microceph/database/client_config.mapper.go b/microceph/database/client_config.mapper.go new file mode 100644 index 00000000..bc7de4c5 --- /dev/null +++ b/microceph/database/client_config.mapper.go @@ -0,0 +1,424 @@ +package database + +// The code below was generated by lxd-generate - DO NOT EDIT! + +import ( + "context" + "database/sql" + "errors" + "fmt" + "net/http" + "strings" + + "github.com/canonical/lxd/lxd/db/query" + "github.com/canonical/lxd/shared/api" + "github.com/canonical/microcluster/cluster" +) + +var _ = api.ServerEnvironment{} + +var clientConfigItemObjects = cluster.RegisterStmt(` +SELECT client_config.id, internal_cluster_members.name AS host, client_config.key, client_config.value + FROM client_config + JOIN internal_cluster_members ON client_config.member_id = internal_cluster_members.id + ORDER BY internal_cluster_members.id, client_config.key +`) + +var clientConfigItemObjectsByKey = cluster.RegisterStmt(` +SELECT client_config.id, internal_cluster_members.name AS host, client_config.key, client_config.value + FROM client_config + JOIN internal_cluster_members ON client_config.member_id = internal_cluster_members.id + WHERE ( client_config.key = ? ) + ORDER BY internal_cluster_members.id, client_config.key +`) + +var clientConfigItemObjectsByHost = cluster.RegisterStmt(` +SELECT client_config.id, internal_cluster_members.name AS host, client_config.key, client_config.value + FROM client_config + JOIN internal_cluster_members ON client_config.member_id = internal_cluster_members.id + WHERE ( host = ? ) + ORDER BY internal_cluster_members.id, client_config.key +`) + +var clientConfigItemObjectsByKeyAndHost = cluster.RegisterStmt(` +SELECT client_config.id, internal_cluster_members.name AS host, client_config.key, client_config.value + FROM client_config + JOIN internal_cluster_members ON client_config.member_id = internal_cluster_members.id + WHERE ( client_config.key = ? AND host = ? ) + ORDER BY internal_cluster_members.id, client_config.key +`) + +var clientConfigItemID = cluster.RegisterStmt(` +SELECT client_config.id FROM client_config + JOIN internal_cluster_members ON client_config.member_id = internal_cluster_members.id + WHERE internal_cluster_members.name = ? AND client_config.key = ? +`) + +var clientConfigItemCreate = cluster.RegisterStmt(` +INSERT INTO client_config (member_id, key, value) + VALUES ((SELECT internal_cluster_members.id FROM internal_cluster_members WHERE internal_cluster_members.name = ?), ?, ?) +`) + +var clientConfigItemDeleteByKey = cluster.RegisterStmt(` +DELETE FROM client_config WHERE key = ? +`) + +var clientConfigItemDeleteByHost = cluster.RegisterStmt(` +DELETE FROM client_config WHERE member_id = (SELECT internal_cluster_members.id FROM internal_cluster_members WHERE internal_cluster_members.name = ?) +`) + +var clientConfigItemDeleteByKeyAndHost = cluster.RegisterStmt(` +DELETE FROM client_config WHERE key = ? AND member_id = (SELECT internal_cluster_members.id FROM internal_cluster_members WHERE internal_cluster_members.name = ?) +`) + +var clientConfigItemUpdate = cluster.RegisterStmt(` +UPDATE client_config + SET member_id = (SELECT internal_cluster_members.id FROM internal_cluster_members WHERE internal_cluster_members.name = ?), key = ?, value = ? + WHERE id = ? +`) + +// clientConfigItemColumns returns a string of column names to be used with a SELECT statement for the entity. +// Use this function when building statements to retrieve database entries matching the ClientConfigItem entity. +func clientConfigItemColumns() string { + return "client_config.id, internal_cluster_members.name AS host, client_config.key, client_config.value" +} + +// getClientConfigItems can be used to run handwritten sql.Stmts to return a slice of objects. +func getClientConfigItems(ctx context.Context, stmt *sql.Stmt, args ...any) ([]ClientConfigItem, error) { + objects := make([]ClientConfigItem, 0) + + dest := func(scan func(dest ...any) error) error { + c := ClientConfigItem{} + err := scan(&c.ID, &c.Host, &c.Key, &c.Value) + if err != nil { + return err + } + + objects = append(objects, c) + + return nil + } + + err := query.SelectObjects(ctx, stmt, dest, args...) + if err != nil { + return nil, fmt.Errorf("Failed to fetch from \"client_config\" table: %w", err) + } + + return objects, nil +} + +// getClientConfigItemsRaw can be used to run handwritten query strings to return a slice of objects. +func getClientConfigItemsRaw(ctx context.Context, tx *sql.Tx, sql string, args ...any) ([]ClientConfigItem, error) { + objects := make([]ClientConfigItem, 0) + + dest := func(scan func(dest ...any) error) error { + c := ClientConfigItem{} + err := scan(&c.ID, &c.Host, &c.Key, &c.Value) + if err != nil { + return err + } + + objects = append(objects, c) + + return nil + } + + err := query.Scan(ctx, tx, sql, dest, args...) + if err != nil { + return nil, fmt.Errorf("Failed to fetch from \"client_config\" table: %w", err) + } + + return objects, nil +} + +// GetClientConfigItems returns all available ClientConfigItems. +// generator: ClientConfigItem GetMany +func GetClientConfigItems(ctx context.Context, tx *sql.Tx, filters ...ClientConfigItemFilter) ([]ClientConfigItem, error) { + var err error + + // Result slice. + objects := make([]ClientConfigItem, 0) + + // Pick the prepared statement and arguments to use based on active criteria. + var sqlStmt *sql.Stmt + args := []any{} + queryParts := [2]string{} + + if len(filters) == 0 { + sqlStmt, err = cluster.Stmt(tx, clientConfigItemObjects) + if err != nil { + return nil, fmt.Errorf("Failed to get \"clientConfigItemObjects\" prepared statement: %w", err) + } + } + + for i, filter := range filters { + if filter.Key != nil && filter.Host != nil { + args = append(args, []any{filter.Key, filter.Host}...) + if len(filters) == 1 { + sqlStmt, err = cluster.Stmt(tx, clientConfigItemObjectsByKeyAndHost) + if err != nil { + return nil, fmt.Errorf("Failed to get \"clientConfigItemObjectsByKeyAndHost\" prepared statement: %w", err) + } + + break + } + + query, err := cluster.StmtString(clientConfigItemObjectsByKeyAndHost) + if err != nil { + return nil, fmt.Errorf("Failed to get \"clientConfigItemObjects\" prepared statement: %w", err) + } + + parts := strings.SplitN(query, "ORDER BY", 2) + if i == 0 { + copy(queryParts[:], parts) + continue + } + + _, where, _ := strings.Cut(parts[0], "WHERE") + queryParts[0] += "OR" + where + } else if filter.Key != nil && filter.Host == nil { + args = append(args, []any{filter.Key}...) + if len(filters) == 1 { + sqlStmt, err = cluster.Stmt(tx, clientConfigItemObjectsByKey) + if err != nil { + return nil, fmt.Errorf("Failed to get \"clientConfigItemObjectsByKey\" prepared statement: %w", err) + } + + break + } + + query, err := cluster.StmtString(clientConfigItemObjectsByKey) + if err != nil { + return nil, fmt.Errorf("Failed to get \"clientConfigItemObjects\" prepared statement: %w", err) + } + + parts := strings.SplitN(query, "ORDER BY", 2) + if i == 0 { + copy(queryParts[:], parts) + continue + } + + _, where, _ := strings.Cut(parts[0], "WHERE") + queryParts[0] += "OR" + where + } else if filter.Host != nil && filter.Key == nil { + args = append(args, []any{filter.Host}...) + if len(filters) == 1 { + sqlStmt, err = cluster.Stmt(tx, clientConfigItemObjectsByHost) + if err != nil { + return nil, fmt.Errorf("Failed to get \"clientConfigItemObjectsByHost\" prepared statement: %w", err) + } + + break + } + + query, err := cluster.StmtString(clientConfigItemObjectsByHost) + if err != nil { + return nil, fmt.Errorf("Failed to get \"clientConfigItemObjects\" prepared statement: %w", err) + } + + parts := strings.SplitN(query, "ORDER BY", 2) + if i == 0 { + copy(queryParts[:], parts) + continue + } + + _, where, _ := strings.Cut(parts[0], "WHERE") + queryParts[0] += "OR" + where + } else if filter.Host == nil && filter.Key == nil { + return nil, fmt.Errorf("Cannot filter on empty ClientConfigItemFilter") + } else { + return nil, fmt.Errorf("No statement exists for the given Filter") + } + } + + // Select. + if sqlStmt != nil { + objects, err = getClientConfigItems(ctx, sqlStmt, args...) + } else { + queryStr := strings.Join(queryParts[:], "ORDER BY") + objects, err = getClientConfigItemsRaw(ctx, tx, queryStr, args...) + } + + if err != nil { + return nil, fmt.Errorf("Failed to fetch from \"client_config\" table: %w", err) + } + + return objects, nil +} + +// GetClientConfigItem returns the ClientConfigItem with the given key. +// generator: ClientConfigItem GetOne +func GetClientConfigItem(ctx context.Context, tx *sql.Tx, host string, key string) (*ClientConfigItem, error) { + filter := ClientConfigItemFilter{} + filter.Host = &host + filter.Key = &key + + objects, err := GetClientConfigItems(ctx, tx, filter) + if err != nil { + return nil, fmt.Errorf("Failed to fetch from \"client_config\" table: %w", err) + } + + switch len(objects) { + case 0: + return nil, api.StatusErrorf(http.StatusNotFound, "ClientConfigItem not found") + case 1: + return &objects[0], nil + default: + return nil, fmt.Errorf("More than one \"client_config\" entry matches") + } +} + +// GetClientConfigItemID return the ID of the ClientConfigItem with the given key. +// generator: ClientConfigItem ID +func GetClientConfigItemID(ctx context.Context, tx *sql.Tx, host string, key string) (int64, error) { + stmt, err := cluster.Stmt(tx, clientConfigItemID) + if err != nil { + return -1, fmt.Errorf("Failed to get \"clientConfigItemID\" prepared statement: %w", err) + } + + row := stmt.QueryRowContext(ctx, host, key) + var id int64 + err = row.Scan(&id) + if errors.Is(err, sql.ErrNoRows) { + return -1, api.StatusErrorf(http.StatusNotFound, "ClientConfigItem not found") + } + + if err != nil { + return -1, fmt.Errorf("Failed to get \"client_config\" ID: %w", err) + } + + return id, nil +} + +// ClientConfigItemExists checks if a ClientConfigItem with the given key exists. +// generator: ClientConfigItem Exists +func ClientConfigItemExists(ctx context.Context, tx *sql.Tx, host string, key string) (bool, error) { + _, err := GetClientConfigItemID(ctx, tx, host, key) + if err != nil { + if api.StatusErrorCheck(err, http.StatusNotFound) { + return false, nil + } + + return false, err + } + + return true, nil +} + +// CreateClientConfigItem adds a new ClientConfigItem to the database. +// generator: ClientConfigItem Create +func CreateClientConfigItem(ctx context.Context, tx *sql.Tx, object ClientConfigItem) (int64, error) { + // Check if a ClientConfigItem with the same key exists. + exists, err := ClientConfigItemExists(ctx, tx, object.Host, object.Key) + if err != nil { + return -1, fmt.Errorf("Failed to check for duplicates: %w", err) + } + + if exists { + return -1, api.StatusErrorf(http.StatusConflict, "This \"client_config\" entry already exists") + } + + args := make([]any, 3) + + // Populate the statement arguments. + args[0] = object.Host + args[1] = object.Key + args[2] = object.Value + + // Prepared statement to use. + stmt, err := cluster.Stmt(tx, clientConfigItemCreate) + if err != nil { + return -1, fmt.Errorf("Failed to get \"clientConfigItemCreate\" prepared statement: %w", err) + } + + // Execute the statement. + result, err := stmt.Exec(args...) + if err != nil { + return -1, fmt.Errorf("Failed to create \"client_config\" entry: %w", err) + } + + id, err := result.LastInsertId() + if err != nil { + return -1, fmt.Errorf("Failed to fetch \"client_config\" entry ID: %w", err) + } + + return id, nil +} + +// DeleteClientConfigItem deletes the ClientConfigItem matching the given key parameters. +// generator: ClientConfigItem DeleteOne-by-Key-and-Host +func DeleteClientConfigItem(ctx context.Context, tx *sql.Tx, key string, host string) error { + stmt, err := cluster.Stmt(tx, clientConfigItemDeleteByKeyAndHost) + if err != nil { + return fmt.Errorf("Failed to get \"clientConfigItemDeleteByKeyAndHost\" prepared statement: %w", err) + } + + result, err := stmt.Exec(key, host) + if err != nil { + return fmt.Errorf("Delete \"client_config\": %w", err) + } + + n, err := result.RowsAffected() + if err != nil { + return fmt.Errorf("Fetch affected rows: %w", err) + } + + if n == 0 { + return api.StatusErrorf(http.StatusNotFound, "ClientConfigItem not found") + } else if n > 1 { + return fmt.Errorf("Query deleted %d ClientConfigItem rows instead of 1", n) + } + + return nil +} + +// DeleteClientConfigItems deletes the ClientConfigItem matching the given key parameters. +// generator: ClientConfigItem DeleteMany-by-Key +func DeleteClientConfigItems(ctx context.Context, tx *sql.Tx, key string) error { + stmt, err := cluster.Stmt(tx, clientConfigItemDeleteByKey) + if err != nil { + return fmt.Errorf("Failed to get \"clientConfigItemDeleteByKey\" prepared statement: %w", err) + } + + result, err := stmt.Exec(key) + if err != nil { + return fmt.Errorf("Delete \"client_config\": %w", err) + } + + _, err = result.RowsAffected() + if err != nil { + return fmt.Errorf("Fetch affected rows: %w", err) + } + + return nil +} + +// UpdateClientConfigItem updates the ClientConfigItem matching the given key parameters. +// generator: ClientConfigItem Update +func UpdateClientConfigItem(ctx context.Context, tx *sql.Tx, host string, key string, object ClientConfigItem) error { + id, err := GetClientConfigItemID(ctx, tx, host, key) + if err != nil { + return err + } + + stmt, err := cluster.Stmt(tx, clientConfigItemUpdate) + if err != nil { + return fmt.Errorf("Failed to get \"clientConfigItemUpdate\" prepared statement: %w", err) + } + + result, err := stmt.Exec(object.Host, object.Key, object.Value, id) + if err != nil { + return fmt.Errorf("Update \"client_config\" entry failed: %w", err) + } + + n, err := result.RowsAffected() + if err != nil { + return fmt.Errorf("Fetch affected rows: %w", err) + } + + if n != 1 { + return fmt.Errorf("Query updated %d rows instead of 1", n) + } + + return nil +} diff --git a/microceph/database/client_config_extras.go b/microceph/database/client_config_extras.go new file mode 100644 index 00000000..8ac75aa2 --- /dev/null +++ b/microceph/database/client_config_extras.go @@ -0,0 +1,220 @@ +package database + +import ( + "context" + "database/sql" + "fmt" + "strings" + + "github.com/canonical/lxd/shared/logger" + "github.com/canonical/microceph/microceph/api/types" + "github.com/canonical/microceph/microceph/common" + "github.com/canonical/microcluster/cluster" + "github.com/canonical/microcluster/state" +) + +// Slice of ClientConfigItem(s) +type ClientConfigItems []ClientConfigItem + +func (cci ClientConfigItems) GetClientConfigSlice() types.ClientConfigs { + ccs := types.ClientConfigs{} + for i, configItem := range cci { + logger.Infof("Client Config item %d: %v", i, configItem) + ccs = append(ccs, types.ClientConfig{ + Key: configItem.Key, + Value: configItem.Value, + Host: configItem.Host, + }) + } + + return ccs +} + +type ClientConfigQueryIntf interface { + + // Add Method + AddNew(s *state.State, key string, value string, host string) error + + // Fetch Methods + GetAll(s *state.State) (ClientConfigItems, error) + GetAllForKey(s *state.State, key string) (ClientConfigItems, error) + GetAllForHost(s *state.State, host string) (ClientConfigItems, error) + GetAllForKeyAndHost(s *state.State, key string, host string) (ClientConfigItems, error) + + // Delete Methods + RemoveAllForKey(s *state.State, key string) error + RemoveOneForKeyAndHost(s *state.State, key string, host string) error +} + +type ClientConfigQueryImpl struct{} + +// Add Method +func (ccq ClientConfigQueryImpl) AddNew(s *state.State, key string, value string, host string) error { + var err error + // If requested for a global config addition. + if host == common.ClientConfigGlobalHostConst { + err = addNewGlobalClientConfig(s, key, value) + } else { + err = addNewHostClientConfig(s, key, value, host) + } + if err != nil { + logger.Error(err.Error()) + return err + } + return nil +} + +// Fetch Methods +func (ccq ClientConfigQueryImpl) GetAll(s *state.State) (ClientConfigItems, error) { + return ccq.GetAllForFilter(s, ClientConfigItemFilter{Host: nil, Key: nil}) +} + +func (ccq ClientConfigQueryImpl) GetAllForKey(s *state.State, key string) (ClientConfigItems, error) { + return ccq.GetAllForFilter(s, ClientConfigItemFilter{Host: nil, Key: &key}) +} + +func (ccq ClientConfigQueryImpl) GetAllForHost(s *state.State, host string) (ClientConfigItems, error) { + return ccq.GetAllForFilter(s, ClientConfigItemFilter{Host: &host, Key: nil}) +} + +func (ccq ClientConfigQueryImpl) GetAllForKeyAndHost(s *state.State, key string, host string) (ClientConfigItems, error) { + return ccq.GetAllForFilter(s, ClientConfigItemFilter{Host: &host, Key: &key}) +} + +func (ccq ClientConfigQueryImpl) GetAllForFilter(s *state.State, filter ClientConfigItemFilter) (ClientConfigItems, error) { + var err error + var retval []ClientConfigItem + + err = s.Database.Transaction(s.Context, func(ctx context.Context, tx *sql.Tx) error { + retval, err = GetClientConfigItems(ctx, tx, filter) + if err != nil { + return err + } + return nil + }) + if err != nil { + return retval, err + } + return retval, nil +} + +// Delete Methods +func (ccq ClientConfigQueryImpl) RemoveAllForKey(s *state.State, key string) error { + err := s.Database.Transaction(s.Context, func(ctx context.Context, tx *sql.Tx) error { + err := DeleteClientConfigItems(ctx, tx, key) + if err != nil { + return fmt.Errorf("failed to clean existing keys %s: %v", key, err) + } + + return nil + }) + if err != nil { + return err + } + return nil +} + +func (ccq ClientConfigQueryImpl) RemoveOneForKeyAndHost(s *state.State, key string, host string) error { + err := s.Database.Transaction(s.Context, func(ctx context.Context, tx *sql.Tx) error { + err := DeleteClientConfigItem(ctx, tx, key, host) + if err != nil { + return fmt.Errorf("failed to clean existing keys %s: %v", key, err) + } + + return nil + }) + if err != nil { + return err + } + return nil +} + +// Internal Methods +func addNewGlobalClientConfig(s *state.State, key string, value string) error { + members, err := common.GetClusterMemberNames(common.CephState{State: s}) + if err != nil { + return err + } + + err = s.Database.Transaction(s.Context, func(ctx context.Context, tx *sql.Tx) error { + // Delete all instances of key before adding the global config. + err := DeleteClientConfigItems(ctx, tx, key) + if err != nil { + return fmt.Errorf("failed to clean existing keys %s: %v", key, err) + } + + for _, member := range members { + // Populate record for the member + logger.Infof("Host is: %v", member) + + // Add record to database. + _, err = CreateClientConfigItem(ctx, tx, ClientConfigItem{Key: key, Value: value, Host: member}) + if err != nil { + return fmt.Errorf("failed to add client config: %v", err) + } + } + + return nil + }) + if err != nil { + return err + } + return nil +} + +func addNewHostClientConfig(s *state.State, key string, value string, host string) error { + err := s.Database.Transaction(s.Context, func(ctx context.Context, tx *sql.Tx) error { + // Remove existing client config entry. + err := DeleteClientConfigItem(ctx, tx, key, host) + if err != nil && !strings.Contains(err.Error(), "ClientConfigItem not found") { + // Failure not caused by missing instance on DB. + return fmt.Errorf("failed to delete existing config for key %s and host %s", key, host) + } + + // Add new instance + data := ClientConfigItem{ + Key: key, + Value: value, + Host: host, + } + _, err = CreateClientConfigItem(ctx, tx, data) + if err != nil { + return fmt.Errorf("failed to add client config %v: %v", data, err) + } + + return nil + }) + if err != nil { + return err + } + return nil +} + +// Fetch client configs using registered sql stmt and args +func Get(s *state.State, stmt int, args ...any) ([]ClientConfigItem, error) { + var err error + var retval []ClientConfigItem + err = s.Database.Transaction(s.Context, func(ctx context.Context, tx *sql.Tx) error { + queryStr, err := cluster.StmtString(stmt) + if err != nil { + return err + } + + retval, err = getClientConfigItemsRaw(ctx, tx, queryStr, args...) + if err != nil { + return err + } + + logger.Infof("DATA form GET call: %v", retval) + + return nil + }) + if err != nil { + return retval, err + } + + return retval, nil +} + +// Singleton for mocker +var ClientConfigQuery ClientConfigQueryIntf = ClientConfigQueryImpl{} diff --git a/microceph/database/disk_extras.go b/microceph/database/disk_extras.go index 91fda831..dca9251e 100644 --- a/microceph/database/disk_extras.go +++ b/microceph/database/disk_extras.go @@ -4,6 +4,7 @@ import ( "context" "database/sql" "fmt" + "github.com/canonical/microceph/microceph/api/types" "github.com/canonical/lxd/lxd/db/query" diff --git a/microceph/database/schema.go b/microceph/database/schema.go index 525dad8c..f4e558b5 100644 --- a/microceph/database/schema.go +++ b/microceph/database/schema.go @@ -12,6 +12,7 @@ import ( // Each entry will increase the database schema version by one, and will be applied after internal schema updates. var SchemaExtensions = map[int]schema.Update{ 1: schemaUpdate1, + 2: schemaUpdate2, } func schemaUpdate1(ctx context.Context, tx *sql.Tx) error { @@ -46,3 +47,21 @@ CREATE TABLE services ( return err } + +// Adds client config table in database schema. +func schemaUpdate2(ctx context.Context, tx *sql.Tx) error { + stmt := ` +CREATE TABLE client_config ( + id INTEGER PRIMARY KEY AUTOINCREMENT NOT NULL, + member_id INTEGER NOT NULL, + key TEXT NOT NULL, + value TEXT NOT NULL, + FOREIGN KEY (member_id) REFERENCES "internal_cluster_members" (id) ON DELETE CASCADE, + UNIQUE(member_id, key) +); + ` + + _, err := tx.Exec(stmt) + + return err +}