Skip to content

Commit

Permalink
Use LTS microcluster (#395)
Browse files Browse the repository at this point in the history
Updates MicroCeph to use the stable release of microcluster at `v2.0.0`.

For the most part, the changes to the code in MicroCeph are just package
imports and changing the definition of `state.State` to an interface.

The actual behavioural changes are these:
* `state.Context` is no longer exposed, so `request.Context` from the
API handler is passed around explicitly to each helper. This should
actually result in more precise context handling for MicroCeph.
* One caveat is a few async API operations which spawn a goroutine. For
these `context.Background()` is used because the request context may
cancel before the goroutine is complete.

* Adds a schema update to MicroCeph that changes their foreign key
references to `core_cluster_members`. This table has been renamed from
`internal_cluster_members` to make it clear that accessing it is
expected behaviour. I also took the liberty of renaming the previous
schema update function to use an ordered naming scheme so that it
doesn't become ambiguous where an update fits among the others.

* Join tokens now have expiration dates. By default, MicroCeph will set
a 3 hour expiration for join tokens, but this can be configured with the
`--timeout` flag added to `cluster add`.

The project version supplied to microcluster has been set to `UNKOWN`
for now because microcluster requires a value here.

---------

Signed-off-by: Max Asnaashari <[email protected]>
  • Loading branch information
masnax authored Sep 12, 2024
1 parent 36f71d7 commit 54cf522
Show file tree
Hide file tree
Showing 89 changed files with 782 additions and 630 deletions.
9 changes: 7 additions & 2 deletions microceph/Makefile
Original file line number Diff line number Diff line change
@@ -1,11 +1,16 @@
pkg_version=$(shell apt-cache policy ceph-common | awk '/Candidate:/{ print $$2 }' )
git_version=$(shell git describe --always --dirty --abbrev=10)
MC_VERSION=ceph-version: $(pkg_version); microceph-git: $(git_version)
LDFLAGS="-X 'github.com/canonical/microceph/microceph/version.version=$(MC_VERSION)'"

.PHONY: default
default: build

# Build targets.
.PHONY: build
build:
go install -v ./cmd/microceph
go install -v ./cmd/microcephd
go install -v -ldflags $(LDFLAGS) ./cmd/microceph
go install -v -ldflags $(LDFLAGS) ./cmd/microcephd

# Testing targets.
.PHONY: check
Expand Down
46 changes: 23 additions & 23 deletions microceph/api/client_configs.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,8 @@ import (
"github.com/canonical/microceph/microceph/ceph"
"github.com/canonical/microceph/microceph/client"
"github.com/canonical/microceph/microceph/database"
"github.com/canonical/microcluster/rest"
"github.com/canonical/microcluster/state"
"github.com/canonical/microcluster/v2/rest"
"github.com/canonical/microcluster/v2/state"
)

// Top level client API
Expand All @@ -33,7 +33,7 @@ var clientConfigsCmd = rest.Endpoint{
}

// cmdClientConfigsGet fetches multiple client config key entries from internal database.
func cmdClientConfigsGet(s *state.State, r *http.Request) response.Response {
func cmdClientConfigsGet(s state.State, r *http.Request) response.Response {
var req types.ClientConfig
var configs database.ClientConfigItems

Expand All @@ -43,9 +43,9 @@ func cmdClientConfigsGet(s *state.State, r *http.Request) response.Response {
}

if req.Host == constants.ClientConfigGlobalHostConst {
configs, err = database.ClientConfigQuery.GetAll(s)
configs, err = database.ClientConfigQuery.GetAll(r.Context(), s)
} else {
configs, err = database.ClientConfigQuery.GetAllForHost(s, req.Host)
configs, err = database.ClientConfigQuery.GetAllForHost(r.Context(), s, req.Host)
}
if err != nil {
logger.Errorf("failed fetching client configs: %v for %v", err, req)
Expand All @@ -58,9 +58,9 @@ func cmdClientConfigsGet(s *state.State, r *http.Request) response.Response {
}

// cmdClientConfigsPut renders .conf file at that particular host.
func cmdClientConfigsPut(s *state.State, r *http.Request) response.Response {
func cmdClientConfigsPut(s state.State, r *http.Request) response.Response {
// Check if microceph is bootstrapped.
err := s.Database.Transaction(s.Context, func(ctx context.Context, tx *sql.Tx) error {
err := s.Database().Transaction(r.Context(), func(ctx context.Context, tx *sql.Tx) error {
isFsid, err := database.ConfigItemExists(ctx, tx, "fsid")
if err != nil || !isFsid {
return fmt.Errorf("client configuration cannot be performed before bootstrapping the cluster")
Expand All @@ -72,7 +72,7 @@ func cmdClientConfigsPut(s *state.State, r *http.Request) response.Response {
return response.BadRequest(err)
}

err = ceph.UpdateConfig(interfaces.CephState{State: s})
err = ceph.UpdateConfig(r.Context(), interfaces.CephState{State: s})
if err != nil {
logger.Error(err.Error())
response.InternalError(err)
Expand All @@ -90,7 +90,7 @@ var clientConfigsKeyCmd = rest.Endpoint{
}

// clientConfigsKeyGet fetches particular client config key entries from internal db.
func clientConfigsKeyGet(s *state.State, r *http.Request) response.Response {
func clientConfigsKeyGet(s state.State, r *http.Request) response.Response {
var req types.ClientConfig
var configs database.ClientConfigItems

Expand All @@ -100,9 +100,9 @@ func clientConfigsKeyGet(s *state.State, r *http.Request) response.Response {
}

if req.Host == constants.ClientConfigGlobalHostConst {
configs, err = database.ClientConfigQuery.GetAllForKey(s, req.Key)
configs, err = database.ClientConfigQuery.GetAllForKey(r.Context(), s, req.Key)
} else {
configs, err = database.ClientConfigQuery.GetAllForKeyAndHost(s, req.Key, req.Host)
configs, err = database.ClientConfigQuery.GetAllForKeyAndHost(r.Context(), s, req.Key, req.Host)
}
if err != nil {
logger.Errorf("failed fetching client configs: %v for %v", err, req)
Expand All @@ -115,7 +115,7 @@ func clientConfigsKeyGet(s *state.State, r *http.Request) response.Response {
}

// clientConfigsKeyPut sets particular client config key.
func clientConfigsKeyPut(s *state.State, r *http.Request) response.Response {
func clientConfigsKeyPut(s state.State, r *http.Request) response.Response {
var req types.ClientConfig

err := json.NewDecoder(r.Body).Decode(&req)
Expand All @@ -124,19 +124,19 @@ func clientConfigsKeyPut(s *state.State, r *http.Request) response.Response {
}

// If new config request is for global configuration.
err = database.ClientConfigQuery.AddNew(s, req.Key, req.Value, req.Host)
err = database.ClientConfigQuery.AddNew(r.Context(), s, req.Key, req.Value, req.Host)
if err != nil {
return response.InternalError(err)
}

// Trigger /conf file update across cluster.
clientConfigUpdate(s, req.Wait)
clientConfigUpdate(r.Context(), s, req.Wait)

return response.EmptySyncResponse
}

// clientConfigsKeyDelete removes particular client config key entries from internal db.
func clientConfigsKeyDelete(s *state.State, r *http.Request) response.Response {
func clientConfigsKeyDelete(s state.State, r *http.Request) response.Response {
var req types.ClientConfig

err := json.NewDecoder(r.Body).Decode(&req)
Expand All @@ -145,38 +145,38 @@ func clientConfigsKeyDelete(s *state.State, r *http.Request) response.Response {
}

if req.Host == constants.ClientConfigGlobalHostConst {
err = database.ClientConfigQuery.RemoveAllForKey(s, req.Key)
err = database.ClientConfigQuery.RemoveAllForKey(r.Context(), s, req.Key)
} else {
err = database.ClientConfigQuery.RemoveOneForKeyAndHost(s, req.Key, req.Host)
err = database.ClientConfigQuery.RemoveOneForKeyAndHost(r.Context(), s, req.Key, req.Host)
}
if err != nil {
return response.InternalError(err)
}

// Trigger /conf file update across cluster.
clientConfigUpdate(s, req.Wait)
clientConfigUpdate(r.Context(), s, req.Wait)

return response.EmptySyncResponse
}

// clientConfigUpdate performs ordered (one after other) updation of ceph.conf across the ceph cluster.
func clientConfigUpdate(s *state.State, wait bool) error {
func clientConfigUpdate(ctx context.Context, s state.State, wait bool) error {
if wait {
// Execute update conf synchronously
err := client.SendUpdateClientConfRequestToClusterMembers(interfaces.CephState{State: s})
err := client.SendUpdateClientConfRequestToClusterMembers(ctx, interfaces.CephState{State: s})
if err != nil {
return err
}

// Update on current host.
err = ceph.UpdateConfig(interfaces.CephState{State: s})
err = ceph.UpdateConfig(ctx, interfaces.CephState{State: s})
if err != nil {
return err
}
} else { // Execute update asynchronously
go func() {
client.SendUpdateClientConfRequestToClusterMembers(interfaces.CephState{State: s})
ceph.UpdateConfig(interfaces.CephState{State: s}) // Restart on current host.
client.SendUpdateClientConfRequestToClusterMembers(context.Background(), interfaces.CephState{State: s})
ceph.UpdateConfig(context.Background(), interfaces.CephState{State: s}) // Restart on current host.
}()
}

Expand Down
25 changes: 13 additions & 12 deletions microceph/api/configs.go
Original file line number Diff line number Diff line change
@@ -1,12 +1,13 @@
package api

import (
"context"
"encoding/json"
"net/http"

"github.com/canonical/lxd/lxd/response"
"github.com/canonical/microcluster/rest"
"github.com/canonical/microcluster/state"
"github.com/canonical/microcluster/v2/rest"
"github.com/canonical/microcluster/v2/state"

"github.com/canonical/microceph/microceph/api/types"
"github.com/canonical/microceph/microceph/ceph"
Expand All @@ -23,7 +24,7 @@ var configsCmd = rest.Endpoint{
Delete: rest.EndpointAction{Handler: cmdConfigsDelete, ProxyTarget: true},
}

func cmdConfigsGet(s *state.State, r *http.Request) response.Response {
func cmdConfigsGet(s state.State, r *http.Request) response.Response {
var err error
var req types.Config
var configs types.Configs
Expand All @@ -47,7 +48,7 @@ func cmdConfigsGet(s *state.State, r *http.Request) response.Response {
return response.SyncResponse(true, configs)
}

func cmdConfigsPut(s *state.State, r *http.Request) response.Response {
func cmdConfigsPut(s state.State, r *http.Request) response.Response {
var req types.Config
configTable := ceph.GetConstConfigTable()

Expand All @@ -64,13 +65,13 @@ func cmdConfigsPut(s *state.State, r *http.Request) response.Response {

if !req.SkipRestart {
services := configTable[req.Key].Daemons
configChangeRefresh(s, services, req.Wait)
configChangeRefresh(r.Context(), s, services, req.Wait)
}

return response.EmptySyncResponse
}

func cmdConfigsDelete(s *state.State, r *http.Request) response.Response {
func cmdConfigsDelete(s state.State, r *http.Request) response.Response {
var req types.Config
configTable := ceph.GetConstConfigTable()

Expand All @@ -87,30 +88,30 @@ func cmdConfigsDelete(s *state.State, r *http.Request) response.Response {

if !req.SkipRestart {
services := configTable[req.Key].Daemons
configChangeRefresh(s, services, req.Wait)
configChangeRefresh(r.Context(), s, services, req.Wait)
}

return response.EmptySyncResponse
}

// Perform ordered (one after other) restart of provided Ceph services across the ceph cluster.
func configChangeRefresh(s *state.State, services []string, wait bool) error {
func configChangeRefresh(ctx context.Context, s state.State, services []string, wait bool) error {
if wait {
// Execute restart synchronously
err := client.SendRestartRequestToClusterMembers(s, services)
err := client.SendRestartRequestToClusterMembers(ctx, s, services)
if err != nil {
return err
}

// Restart on current host.
err = ceph.RestartCephServices(interfaces.CephState{State: s}, services)
err = ceph.RestartCephServices(ctx, interfaces.CephState{State: s}, services)
if err != nil {
return err
}
} else { // Execute restart asynchronously
go func() {
client.SendRestartRequestToClusterMembers(s, services)
ceph.RestartCephServices(interfaces.CephState{State: s}, services) // Restart on current host.
client.SendRestartRequestToClusterMembers(context.Background(), s, services)
ceph.RestartCephServices(context.Background(), interfaces.CephState{State: s}, services) // Restart on current host.
}()
}

Expand Down
18 changes: 9 additions & 9 deletions microceph/api/disks.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,8 @@ import (
"github.com/tidwall/sjson"

"github.com/canonical/lxd/lxd/response"
"github.com/canonical/microcluster/rest"
"github.com/canonical/microcluster/state"
"github.com/canonical/microcluster/v2/rest"
"github.com/canonical/microcluster/v2/state"

"github.com/canonical/microceph/microceph/api/types"
"github.com/canonical/microceph/microceph/ceph"
Expand All @@ -41,16 +41,16 @@ var disksDelCmd = rest.Endpoint{

var mu sync.Mutex

func cmdDisksGet(s *state.State, r *http.Request) response.Response {
disks, err := ceph.ListOSD(s)
func cmdDisksGet(s state.State, r *http.Request) response.Response {
disks, err := ceph.ListOSD(r.Context(), s)
if err != nil {
return response.InternalError(err)
}

return response.SyncResponse(true, disks)
}

func cmdDisksPost(s *state.State, r *http.Request) response.Response {
func cmdDisksPost(s state.State, r *http.Request) response.Response {
var req types.DisksPost
var wal *types.DiskParameter
var db *types.DiskParameter
Expand Down Expand Up @@ -88,7 +88,7 @@ func cmdDisksPost(s *state.State, r *http.Request) response.Response {
db = &types.DiskParameter{Path: *req.DBDev, Encrypt: req.DBEncrypt, Wipe: req.DBWipe, LoopSize: 0}
}

resp := ceph.AddBulkDisks(s, disks, wal, db)
resp := ceph.AddBulkDisks(r.Context(), s, disks, wal, db)
if len(resp.ValidationError) == 0 {
response.SyncResponse(false, resp)
}
Expand All @@ -97,7 +97,7 @@ func cmdDisksPost(s *state.State, r *http.Request) response.Response {
}

// cmdDisksDelete is the handler for DELETE /1.0/disks/{osdid}.
func cmdDisksDelete(s *state.State, r *http.Request) response.Response {
func cmdDisksDelete(s state.State, r *http.Request) response.Response {
var osd string
osd, err := url.PathUnescape(mux.Vars(r)["osdid"])
if err != nil {
Expand All @@ -122,7 +122,7 @@ func cmdDisksDelete(s *state.State, r *http.Request) response.Response {

// if check for crush rule scaledown only if crush change is not prohibited.
if !req.ProhibitCrushScaledown {
needDowngrade, err := ceph.IsDowngradeNeeded(cs, osdid)
needDowngrade, err := ceph.IsDowngradeNeeded(r.Context(), cs, osdid)
if err != nil {
return response.InternalError(err)
}
Expand All @@ -137,7 +137,7 @@ func cmdDisksDelete(s *state.State, r *http.Request) response.Response {
}
}

err = ceph.RemoveOSD(cs, osdid, req.BypassSafety, req.Timeout)
err = ceph.RemoveOSD(r.Context(), cs, osdid, req.BypassSafety, req.Timeout)
if err != nil {
return response.SmartError(err)
}
Expand Down
8 changes: 4 additions & 4 deletions microceph/api/microceph_configs.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,8 @@ import (
"github.com/canonical/lxd/shared/logger"
"github.com/canonical/microceph/microceph/api/types"
"github.com/canonical/microceph/microceph/ceph"
"github.com/canonical/microcluster/rest"
"github.com/canonical/microcluster/state"
"github.com/canonical/microcluster/v2/rest"
"github.com/canonical/microcluster/v2/state"
)

// top level microceph API
Expand All @@ -28,7 +28,7 @@ var logLevelCmd = rest.Endpoint{
Get: rest.EndpointAction{Handler: logLevelGet, ProxyTarget: true},
}

func logLevelPut(s *state.State, r *http.Request) response.Response {
func logLevelPut(s state.State, r *http.Request) response.Response {
var req types.LogLevelPut

err := json.NewDecoder(r.Body).Decode(&req)
Expand All @@ -46,6 +46,6 @@ func logLevelPut(s *state.State, r *http.Request) response.Response {
return response.EmptySyncResponse
}

func logLevelGet(s *state.State, r *http.Request) response.Response {
func logLevelGet(s state.State, r *http.Request) response.Response {
return response.SyncResponse(true, ceph.GetLogLevel())
}
6 changes: 3 additions & 3 deletions microceph/api/pool.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,8 @@ import (
"github.com/canonical/lxd/shared/logger"

"github.com/canonical/lxd/lxd/response"
"github.com/canonical/microcluster/rest"
"github.com/canonical/microcluster/state"
"github.com/canonical/microcluster/v2/rest"
"github.com/canonical/microcluster/v2/state"

"github.com/canonical/microceph/microceph/api/types"
"github.com/canonical/microceph/microceph/ceph"
Expand All @@ -20,7 +20,7 @@ var poolsCmd = rest.Endpoint{
Put: rest.EndpointAction{Handler: cmdPoolsPut, ProxyTarget: true},
}

func cmdPoolsPut(s *state.State, r *http.Request) response.Response {
func cmdPoolsPut(s state.State, r *http.Request) response.Response {
var req types.PoolPut

err := json.NewDecoder(r.Body).Decode(&req)
Expand Down
6 changes: 3 additions & 3 deletions microceph/api/resources.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,8 @@ import (

"github.com/canonical/lxd/lxd/resources"
"github.com/canonical/lxd/lxd/response"
"github.com/canonical/microcluster/rest"
"github.com/canonical/microcluster/state"
"github.com/canonical/microcluster/v2/rest"
"github.com/canonical/microcluster/v2/state"
)

// /1.0/resources endpoint.
Expand All @@ -16,7 +16,7 @@ var resourcesCmd = rest.Endpoint{
Get: rest.EndpointAction{Handler: cmdResourcesGet, ProxyTarget: true},
}

func cmdResourcesGet(s *state.State, r *http.Request) response.Response {
func cmdResourcesGet(s state.State, r *http.Request) response.Response {
storage, err := resources.GetStorage()
if err != nil {
return response.InternalError(err)
Expand Down
Loading

0 comments on commit 54cf522

Please sign in to comment.