Merge pull request #365 from xiaods/dev
fix: update etcd module
xiaods authored Oct 23, 2024
2 parents 5254021 + 375a371 commit 5405c41
Showing 19 changed files with 3,782 additions and 1,219 deletions.
2 changes: 1 addition & 1 deletion pkg/cluster/address_controller.go
@@ -4,7 +4,7 @@ import (
 	"context"
 	"sync"
 
-	controllerv1 "github.com/rancher/wrangler/pkg/generated/controllers/core/v1"
+	controllerv1 "github.com/rancher/wrangler/v3/pkg/generated/controllers/core/v1"
 	"github.com/sirupsen/logrus"
 	"github.com/xiaods/k8e/pkg/util"
 	v1 "k8s.io/api/core/v1"
58 changes: 45 additions & 13 deletions pkg/cluster/cluster.go
@@ -5,6 +5,7 @@ import (
 	"net/url"
 	"runtime"
 	"strings"
+	"time"
 
 	"github.com/k3s-io/kine/pkg/endpoint"
 	"github.com/pkg/errors"
@@ -13,6 +14,7 @@ import (
 	"github.com/xiaods/k8e/pkg/cluster/managed"
 	"github.com/xiaods/k8e/pkg/daemons/config"
 	"github.com/xiaods/k8e/pkg/etcd"
+	"k8s.io/apimachinery/pkg/util/wait"
 	utilsnet "k8s.io/utils/net"
 )

@@ -41,23 +43,37 @@ func (c *Cluster) Start(ctx context.Context) (<-chan struct{}, error) {
 	ready := make(chan struct{})
 	defer close(ready)
 
-	// try to get /db/info urls first before attempting to use join url
+	// try to get /db/info urls first, for a current list of etcd cluster member client URLs
 	clientURLs, _, err := etcd.ClientURLs(ctx, c.clientAccessInfo, c.config.PrivateIP)
 	if err != nil {
 		return nil, err
 	}
-	if len(clientURLs) < 1 {
+	// If we somehow got no error but also no client URLs, just use the address of the server we're joining
+	if len(clientURLs) == 0 {
 		clientURL, err := url.Parse(c.config.JoinURL)
 		if err != nil {
 			return nil, err
 		}
 		clientURL.Host = clientURL.Hostname() + ":2379"
 		clientURLs = append(clientURLs, clientURL.String())
+		logrus.Warnf("Got empty etcd ClientURL list; using server URL %s", clientURL)
 	}
-	etcdProxy, err := etcd.NewETCDProxy(ctx, true, c.config.DataDir, clientURLs[0], utilsnet.IsIPv6CIDR(c.config.ServiceIPRanges[0]))
+	etcdProxy, err := etcd.NewETCDProxy(ctx, c.config.SupervisorPort, c.config.DataDir, clientURLs[0], utilsnet.IsIPv6CIDR(c.config.ServiceIPRanges[0]))
 	if err != nil {
 		return nil, err
 	}
+	// immediately update the load balancer with all etcd addresses
+	// client URLs are a full URI, but the proxy only wants host:port
+	for i, c := range clientURLs {
+		u, err := url.Parse(c)
+		if err != nil {
+			return nil, errors.Wrap(err, "failed to parse etcd ClientURL")
+		}
+		clientURLs[i] = u.Host
+	}
+	etcdProxy.Update(clientURLs)
+
+	// start periodic endpoint sync goroutine
 	c.setupEtcdProxy(ctx, etcdProxy)
 
 	// remove etcd member if it exists
@@ -82,7 +98,7 @@ func (c *Cluster) Start(ctx context.Context) (<-chan struct{}, error) {
 		return nil, err
 	}
 
-	if err := c.startStorage(ctx); err != nil {
+	if err := c.startStorage(ctx, false); err != nil {
 		return nil, err
 	}
 
@@ -107,11 +123,14 @@ func (c *Cluster) Start(ctx context.Context) (<-chan struct{}, error) {
 				}
 
 				if !c.config.EtcdDisableSnapshots {
-					if err := c.managedDB.ReconcileSnapshotData(ctx); err != nil {
-						logrus.Errorf("Failed to record snapshots for cluster: %v", err)
-					}
+					_ = wait.PollUntilContextCancel(ctx, time.Second, true, func(ctx context.Context) (bool, error) {
+						err := c.managedDB.ReconcileSnapshotData(ctx)
+						if err != nil {
+							logrus.Errorf("Failed to record snapshots for cluster: %v", err)
+						}
+						return err == nil, nil
+					})
 				}
-
 				return
 			default:
 				runtime.Gosched()
@@ -127,12 +146,19 @@ func (c *Cluster) Start(ctx context.Context) (<-chan struct{}, error) {
 // This calls into the kine endpoint code, which sets up the database client
 // and unix domain socket listener if using an external database. In the case of an etcd
 // backend it just returns the user-provided etcd endpoints and tls config.
-func (c *Cluster) startStorage(ctx context.Context) error {
-	if c.storageStarted {
+func (c *Cluster) startStorage(ctx context.Context, bootstrap bool) error {
+	if c.storageStarted && !c.config.KineTLS {
 		return nil
 	}
 	c.storageStarted = true
 
+	if !bootstrap {
+		// set the tls config for the kine storage
+		c.config.Datastore.ServerTLSConfig.CAFile = c.config.Runtime.ETCDServerCA
+		c.config.Datastore.ServerTLSConfig.CertFile = c.config.Runtime.ServerETCDCert
+		c.config.Datastore.ServerTLSConfig.KeyFile = c.config.Runtime.ServerETCDKey
+	}
+
 	// start listening on the kine socket as an etcd endpoint, or return the external etcd endpoints
 	etcdConfig, err := endpoint.Listen(ctx, c.config.Datastore)
 	if err != nil {
@@ -143,9 +169,15 @@ func (c *Cluster) startStorage(ctx context.Context) error {
 	// based on what the kine wrapper tells us about the datastore. Single-node datastores like sqlite don't require
 	// leader election, while basically all others (etcd, external database, etc) do since they allow multiple servers.
 	c.config.Runtime.EtcdConfig = etcdConfig
-	c.config.Datastore.BackendTLSConfig = etcdConfig.TLSConfig
-	c.config.Datastore.Endpoint = strings.Join(etcdConfig.Endpoints, ",")
-	c.config.NoLeaderElect = !etcdConfig.LeaderElect
+
+	// after the bootstrap we need to set the args for api-server with kine in unixs or just set the
+	// values if the datastoreTLS is not enabled
+	if !bootstrap || !c.config.KineTLS {
+		c.config.Datastore.BackendTLSConfig = etcdConfig.TLSConfig
+		c.config.Datastore.Endpoint = strings.Join(etcdConfig.Endpoints, ",")
+		c.config.NoLeaderElect = !etcdConfig.LeaderElect
+	}
 
 	return nil
 }

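Note on the snapshot hunk above: the one-shot ReconcileSnapshotData call is now wrapped in wait.PollUntilContextCancel, so a transient failure is retried every second instead of being logged once and dropped. A minimal standalone sketch of that retry pattern follows; reconcile and the five-second timeout are illustrative stand-ins, not code from this commit.

package main

import (
	"context"
	"errors"
	"log"
	"time"

	"k8s.io/apimachinery/pkg/util/wait"
)

// reconcile is a hypothetical stand-in for managedDB.ReconcileSnapshotData.
func reconcile(ctx context.Context) error {
	return errors.New("datastore not ready")
}

func main() {
	ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
	defer cancel()

	// immediate=true runs the condition once before the first interval elapses.
	// Returning (false, nil) retries on the next tick; (true, nil) stops with
	// success; a non-nil error aborts the poll immediately.
	err := wait.PollUntilContextCancel(ctx, time.Second, true, func(ctx context.Context) (bool, error) {
		if err := reconcile(ctx); err != nil {
			log.Printf("Failed to record snapshots for cluster: %v", err)
			return false, nil
		}
		return true, nil
	})
	log.Printf("poll finished: %v", err) // context deadline exceeded after 5s of failures
}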
25 changes: 4 additions & 21 deletions pkg/cluster/https.go
@@ -4,27 +4,25 @@ import (
 	"context"
 	"crypto/tls"
 	"errors"
-	"fmt"
 	"io"
 	"log"
 	"net"
 	"net/http"
-	"net/http/pprof"
 	"os"
 	"path/filepath"
+	"strconv"
 
-	"github.com/gorilla/mux"
 	"github.com/rancher/dynamiclistener"
 	"github.com/rancher/dynamiclistener/factory"
 	"github.com/rancher/dynamiclistener/storage/file"
 	"github.com/rancher/dynamiclistener/storage/kubernetes"
 	"github.com/rancher/dynamiclistener/storage/memory"
-	"github.com/rancher/wrangler/pkg/generated/controllers/core"
+	"github.com/rancher/wrangler/v3/pkg/generated/controllers/core"
 	"github.com/sirupsen/logrus"
 	"github.com/xiaods/k8e/pkg/daemons/config"
 	"github.com/xiaods/k8e/pkg/util"
 	"github.com/xiaods/k8e/pkg/version"
 	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
 	utilsnet "k8s.io/utils/net"
 )
// newListener returns a new TCP listener and HTTP request handler using dynamiclistener.
@@ -43,11 +41,7 @@ func (c *Cluster) newListener(ctx context.Context) (net.Listener, http.Handler,
 			os.Remove(filepath.Join(c.config.DataDir, "tls/dynamic-cert.json"))
 		}
 	}
-	ip := c.config.BindAddress
-	if utilsnet.IsIPv6String(ip) {
-		ip = fmt.Sprintf("[%s]", ip)
-	}
-	tcp, err := dynamiclistener.NewTCPListener(ip, c.config.SupervisorPort)
+	tcp, err := util.ListenWithLoopback(ctx, c.config.BindAddress, strconv.Itoa(c.config.SupervisorPort))
 	if err != nil {
 		return nil, nil, err
 	}
@@ -114,17 +108,6 @@ func (c *Cluster) initClusterAndHTTPS(ctx context.Context) error {
 		return err
 	}
 
-	if c.config.EnablePProf {
-		mux := mux.NewRouter().SkipClean(true)
-		mux.HandleFunc("/debug/pprof/cmdline", pprof.Cmdline)
-		mux.HandleFunc("/debug/pprof/profile", pprof.Profile)
-		mux.HandleFunc("/debug/pprof/symbol", pprof.Symbol)
-		mux.HandleFunc("/debug/pprof/trace", pprof.Trace)
-		mux.PathPrefix("/debug/pprof/").HandlerFunc(pprof.Index)
-		mux.NotFoundHandler = handler
-		handler = mux
-	}
-
 	// Create a HTTP server with the registered request handlers, using logrus for logging
 	server := http.Server{
 		Handler: handler,
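Side note on the removed EnablePProf block above: the gorilla/mux routes it wired up by hand mirror what the standard library's net/http/pprof package registers on http.DefaultServeMux automatically at import time. A hedged sketch of exposing the same endpoints on a standalone debug listener; localhost:6060 is just the conventional example address, not something this commit configures.

package main

import (
	"log"
	"net/http"
	_ "net/http/pprof" // side effect: registers /debug/pprof/* handlers on DefaultServeMux
)

func main() {
	// Serves /debug/pprof/, /debug/pprof/profile, /debug/pprof/trace, etc.
	log.Fatal(http.ListenAndServe("localhost:6060", nil))
}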
44 changes: 22 additions & 22 deletions pkg/cluster/managed.go
@@ -17,6 +17,7 @@ import (
 	"github.com/xiaods/k8e/pkg/nodepassword"
 	"github.com/xiaods/k8e/pkg/version"
 	apierrors "k8s.io/apimachinery/pkg/api/errors"
+	"k8s.io/apimachinery/pkg/util/wait"
 )

// testClusterDB returns a channel that will be closed when the datastore connection is available.
@@ -90,7 +91,9 @@ func (c *Cluster) start(ctx context.Context) error {
 	return c.managedDB.Start(ctx, c.clientAccessInfo)
 }
 
-// registerDBHandlers registers routes for database info with the http request handler
+// registerDBHandlers registers managed-datastore-specific callbacks, and installs additional HTTP route handlers.
+// Note that for etcd, controllers only run on nodes with a local apiserver, in order to provide stable external
+// management of etcd cluster membership without being disrupted when a member is removed from the cluster.
 func (c *Cluster) registerDBHandlers(handler http.Handler) (http.Handler, error) {
 	if c.managedDB == nil {
 		return handler, nil
@@ -126,34 +129,31 @@ func (c *Cluster) assignManagedDriver(ctx context.Context) error {
 	return nil
 }
 
-// setupEtcdProxy periodically updates the etcd proxy with the current list of
+// setupEtcdProxy starts a goroutine to periodically update the etcd proxy with the current list of
 // cluster client URLs, as retrieved from etcd.
 func (c *Cluster) setupEtcdProxy(ctx context.Context, etcdProxy etcd.Proxy) {
 	if c.managedDB == nil {
 		return
 	}
-	go func() {
-		t := time.NewTicker(30 * time.Second)
-		defer t.Stop()
-		for range t.C {
-			newAddresses, err := c.managedDB.GetMembersClientURLs(ctx)
-			if err != nil {
-				logrus.Warnf("failed to get etcd client URLs: %v", err)
-				continue
-			}
-			// client URLs are a full URI, but the proxy only wants host:port
-			var hosts []string
-			for _, address := range newAddresses {
-				u, err := url.Parse(address)
-				if err != nil {
-					logrus.Warnf("failed to parse etcd client URL: %v", err)
-					continue
-				}
-				hosts = append(hosts, u.Host)
-			}
-			etcdProxy.Update(hosts)
-		}
-	}()
+	// We use Poll here instead of Until because we want to wait the interval before running the function.
+	go wait.PollUntilContextCancel(ctx, 30*time.Second, false, func(ctx context.Context) (bool, error) {
+		clientURLs, err := c.managedDB.GetMembersClientURLs(ctx)
+		if err != nil {
+			logrus.Warnf("Failed to get etcd ClientURLs: %v", err)
+			return false, nil
+		}
+		// client URLs are a full URI, but the proxy only wants host:port
+		for i, c := range clientURLs {
+			u, err := url.Parse(c)
+			if err != nil {
+				logrus.Warnf("Failed to parse etcd ClientURL: %v", err)
+				return false, nil
+			}
+			clientURLs[i] = u.Host
+		}
+		etcdProxy.Update(clientURLs)
+		return false, nil
+	})
 }

// deleteNodePasswdSecret wipes out the node password secret after restoration
@@ -162,7 +162,7 @@ func (c *Cluster) deleteNodePasswdSecret(ctx context.Context) {
 	secretsClient := c.config.Runtime.Core.Core().V1().Secret()
 	if err := nodepassword.Delete(secretsClient, nodeName); err != nil {
 		if apierrors.IsNotFound(err) {
-			logrus.Debugf("node password secret is not found for node %s", nodeName)
+			logrus.Debugf("Node password secret is not found for node %s", nodeName)
 			return
 		}
 		logrus.Warnf("failed to delete old node password secret: %v", err)
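Both the rewritten setupEtcdProxy above and the new block in cluster.go convert etcd's full client URIs to bare host:port before calling etcdProxy.Update. A self-contained sketch of that conversion, using made-up member addresses:

package main

import (
	"fmt"
	"net/url"
)

func main() {
	// Example member client URLs as etcd reports them (full URIs).
	clientURLs := []string{"https://10.0.0.5:2379", "https://10.0.0.6:2379"}
	for i, s := range clientURLs {
		u, err := url.Parse(s)
		if err != nil {
			fmt.Printf("Failed to parse etcd ClientURL: %v\n", err)
			continue
		}
		clientURLs[i] = u.Host // keep only "host:port" for the proxy
	}
	fmt.Println(clientURLs) // [10.0.0.5:2379 10.0.0.6:2379]
}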
9 changes: 8 additions & 1 deletion pkg/cluster/managed/drivers.go
@@ -23,7 +23,7 @@ type Driver interface {
 	Test(ctx context.Context) error
 	Restore(ctx context.Context) error
 	EndpointName() string
-	Snapshot(ctx context.Context) error
+	Snapshot(ctx context.Context) (*SnapshotResult, error)
 	ReconcileSnapshotData(ctx context.Context) error
 	GetMembersClientURLs(ctx context.Context) ([]string, error)
 	RemoveSelf(ctx context.Context) error
@@ -40,3 +40,10 @@ func Registered() []Driver {
 func Default() Driver {
 	return drivers[0]
 }
+
+// SnapshotResult is returned by the Snapshot function,
+// and lists the names of created and deleted snapshots.
+type SnapshotResult struct {
+	Created []string `json:"created,omitempty"`
+	Deleted []string `json:"deleted,omitempty"`
+}
25 changes: 19 additions & 6 deletions pkg/cluster/storage.go
@@ -20,7 +20,6 @@ import (
 const maxBootstrapWaitAttempts = 5
 
 func RotateBootstrapToken(ctx context.Context, config *config.Control, oldToken string) error {
-
 	token, err := util.ReadTokenFromFile(config.Runtime.ServerToken, config.Runtime.ServerCA, config.DataDir)
 	if err != nil {
 		return err
@@ -40,7 +39,7 @@ func RotateBootstrapToken(ctx context.Context, config *config.Control, oldToken
 	tokenKey := storageKey(normalizedToken)
 
 	var bootstrapList []client.Value
-	if err := wait.PollImmediateUntilWithContext(ctx, 5*time.Second, func(ctx context.Context) (bool, error) {
+	if err := wait.PollUntilContextCancel(ctx, 5*time.Second, true, func(ctx context.Context) (bool, error) {
 		bootstrapList, err = storageClient.List(ctx, "/bootstrap", 0)
 		if err != nil {
 			return false, err
@@ -151,8 +150,21 @@ func bootstrapKeyData(ctx context.Context, storageClient client.Client) (*client
 // bootstrap key as a lock. This function will not return successfully until either the
 // bootstrap key has been locked, or data is read into the struct.
 func (c *Cluster) storageBootstrap(ctx context.Context) error {
-	if err := c.startStorage(ctx); err != nil {
-		return err
+	if c.config.KineTLS {
+		bootstrapCtx, cancel := context.WithCancel(ctx)
+		defer func() {
+			time.Sleep(time.Second)
+			cancel()
+		}()
+
+		logrus.Info("Starting temporary kine to reconcile with datastore")
+		if err := c.startStorage(bootstrapCtx, true); err != nil {
+			return err
+		}
+	} else {
+		if err := c.startStorage(ctx, true); err != nil {
+			return err
+		}
 	}
 
 	storageClient, err := client.New(c.config.Runtime.EtcdConfig)
@@ -186,7 +198,7 @@ func (c *Cluster) storageBootstrap(ctx context.Context) error {
 
 	attempts := 0
 	tokenKey := storageKey(normalizedToken)
-	return wait.PollImmediateUntilWithContext(ctx, time.Second, func(ctx context.Context) (bool, error) {
+	return wait.PollUntilContextCancel(ctx, time.Second, true, func(ctx context.Context) (bool, error) {
 		attempts++
 		value, saveBootstrap, err := getBootstrapKeyFromStorage(ctx, storageClient, normalizedToken, token)
 		c.saveBootstrap = saveBootstrap
@@ -242,10 +254,11 @@ func (c *Cluster) storageBootstrap(ctx context.Context) error {
 func getBootstrapKeyFromStorage(ctx context.Context, storageClient client.Client, normalizedToken, oldToken string) (*client.Value, bool, error) {
 	emptyStringKey := storageKey("")
 	tokenKey := storageKey(normalizedToken)
+
 	var bootstrapList []client.Value
 	var err error
 
-	if err := wait.PollImmediateUntilWithContext(ctx, 5*time.Second, func(ctx context.Context) (bool, error) {
+	if err := wait.PollUntilContextCancel(ctx, 5*time.Second, true, func(ctx context.Context) (bool, error) {
 		bootstrapList, err = storageClient.List(ctx, "/bootstrap", 0)
 		if err != nil {
 			if errors.Is(err, rpctypes.ErrGPRCNotSupportedForLearner) {
Expand Down
(Diffs for the remaining 13 of the 19 changed files are not shown.)
