diff --git a/cmd/k8e/main.go b/cmd/k8e/main.go index 4cb2ee88c..c5f8cf93c 100644 --- a/cmd/k8e/main.go +++ b/cmd/k8e/main.go @@ -8,6 +8,7 @@ import ( "os" "os/exec" "path/filepath" + "slices" "strconv" "strings" @@ -27,6 +28,7 @@ import ( ) var criDefaultConfigPath = "/etc/crictl.yaml" +var externalCLIActions = []string{"crictl", "ctr", "kubectl"} // main entrypoint for the k8e multicall binary func main() { @@ -106,7 +108,7 @@ func findDebug(args []string) bool { if debug { return debug } - debug, _ = strconv.ParseBool(configfilearg.MustFindString(args, "debug")) + debug, _ = strconv.ParseBool(configfilearg.MustFindString(args, "debug", externalCLIActions...)) return debug } @@ -126,7 +128,7 @@ func findDataDir(args []string) string { if dataDir != "" { return dataDir } - dataDir = configfilearg.MustFindString(args, "data-dir") + dataDir = configfilearg.MustFindString(args, "data-dir", externalCLIActions...) if d, err := datadir.Resolve(dataDir); err == nil { dataDir = d } else { @@ -144,7 +146,7 @@ func findPreferBundledBin(args []string) bool { fs.SetOutput(io.Discard) fs.BoolVar(&preferBundledBin, "prefer-bundled-bin", false, "Prefer bundled binaries") - preferRes := configfilearg.MustFindString(args, "prefer-bundled-bin") + preferRes := configfilearg.MustFindString(args, "prefer-bundled-bin", externalCLIActions...) if preferRes != "" { preferBundledBin, _ = strconv.ParseBool(preferRes) } @@ -159,8 +161,7 @@ func findPreferBundledBin(args []string) bool { // it returns false so that standard CLI wrapping can occur. func runCLIs(dataDir string) bool { progName := filepath.Base(os.Args[0]) - switch progName { - case "crictl", "ctr", "kubectl": + if slices.Contains(externalCLIActions, progName) { if err := externalCLI(progName, dataDir, os.Args[1:]); err != nil && !errors.Is(err, context.Canceled) { logrus.Fatal(err) } diff --git a/go.mod b/go.mod index 22038d379..73d903dd2 100644 --- a/go.mod +++ b/go.mod @@ -129,9 +129,11 @@ require ( github.com/urfave/cli v1.22.15 github.com/yl2chen/cidranger v1.0.2 go.etcd.io/etcd/api/v3 v3.5.16 + go.etcd.io/etcd/client/pkg/v3 v3.5.16 go.etcd.io/etcd/client/v3 v3.5.16 go.etcd.io/etcd/etcdutl/v3 v3.5.13 go.etcd.io/etcd/server/v3 v3.5.16 + go.uber.org/zap v1.27.0 golang.org/x/crypto v0.27.0 golang.org/x/net v0.29.0 golang.org/x/sys v0.25.0 @@ -413,7 +415,6 @@ require ( github.com/xlab/treeprint v1.2.0 // indirect github.com/xrash/smetrics v0.0.0-20240521201337-686a1a2994c1 // indirect go.etcd.io/bbolt v1.3.11 // indirect - go.etcd.io/etcd/client/pkg/v3 v3.5.16 // indirect go.etcd.io/etcd/client/v2 v2.305.16 // indirect go.etcd.io/etcd/pkg/v3 v3.5.16 // indirect go.etcd.io/etcd/raft/v3 v3.5.16 // indirect @@ -435,7 +436,6 @@ require ( go.uber.org/fx v1.20.1 // indirect go.uber.org/mock v0.4.0 // indirect go.uber.org/multierr v1.11.0 // indirect - go.uber.org/zap v1.27.0 // indirect golang.org/x/exp v0.0.0-20240719175910-8a7402abbf56 // indirect golang.org/x/mod v0.20.0 // indirect golang.org/x/oauth2 v0.22.0 // indirect diff --git a/pkg/agent/config/config.go b/pkg/agent/config/config.go index 01e415fe2..1aedac644 100644 --- a/pkg/agent/config/config.go +++ b/pkg/agent/config/config.go @@ -552,6 +552,7 @@ func get(ctx context.Context, envInfo *cmds.Agent, proxy proxy.Proxy) (*config.N nodeConfig.Containerd.Log = filepath.Join(envInfo.DataDir, "agent", "containerd", "containerd.log") nodeConfig.Containerd.Registry = filepath.Join(envInfo.DataDir, "agent", "etc", "containerd", "certs.d") nodeConfig.Containerd.NoDefault = envInfo.ContainerdNoDefault + nodeConfig.Containerd.NonrootDevices = envInfo.ContainerdNonrootDevices nodeConfig.Containerd.Debug = envInfo.Debug applyContainerdStateAndAddress(nodeConfig) applyCRIDockerdAddress(nodeConfig) diff --git a/pkg/agent/containerd/config_linux.go b/pkg/agent/containerd/config_linux.go index 063634bc0..a80efa05c 100644 --- a/pkg/agent/containerd/config_linux.go +++ b/pkg/agent/containerd/config_linux.go @@ -73,6 +73,7 @@ func SetupContainerdConfig(cfg *config.Node) error { SystemdCgroup: cfg.AgentConfig.Systemd, IsRunningInUserNS: isRunningInUserNS, EnableUnprivileged: kernel.CheckKernelVersion(4, 11, 0), + NonrootDevices: cfg.Containerd.NonrootDevices, PrivateRegistryConfig: cfg.AgentConfig.Registry, ExtraRuntimes: extraRuntimes, Program: version.Program, diff --git a/pkg/cli/cmds/agent.go b/pkg/cli/cmds/agent.go index 5edce355b..dc0a08eac 100644 --- a/pkg/cli/cmds/agent.go +++ b/pkg/cli/cmds/agent.go @@ -29,6 +29,7 @@ type Agent struct { Snapshotter string Docker bool ContainerdNoDefault bool + ContainerdNonrootDevices bool ContainerRuntimeEndpoint string DefaultRuntime string ImageServiceEndpoint string @@ -215,6 +216,11 @@ var ( Usage: "(agent/containerd) Disables containerd's fallback default registry endpoint when a mirror is configured for that registry", Destination: &AgentConfig.ContainerdNoDefault, } + NonrootDevicesFlag = &cli.BoolFlag{ + Name: "nonroot-devices", + Usage: "(agent/containerd) Allows non-root pods to access devices by setting device_ownership_from_security_context=true in the containerd CRI config", + Destination: &AgentConfig.ContainerdNonrootDevices, + } EnablePProfFlag = &cli.BoolFlag{ Name: "enable-pprof", Usage: "(experimental) Enable pprof endpoint on supervisor port", @@ -278,6 +284,7 @@ func NewAgentCommand(action func(ctx *cli.Context) error) cli.Command { SnapshotterFlag, PrivateRegistryFlag, DisableDefaultRegistryEndpointFlag, + NonrootDevicesFlag, AirgapExtraRegistryFlag, NodeIPFlag, BindAddressFlag, diff --git a/pkg/cli/cmds/server.go b/pkg/cli/cmds/server.go index 1e0a4c4b1..475d412bd 100644 --- a/pkg/cli/cmds/server.go +++ b/pkg/cli/cmds/server.go @@ -490,6 +490,7 @@ var ServerFlags = []cli.Flag{ DefaultRuntimeFlag, ImageServiceEndpointFlag, DisableDefaultRegistryEndpointFlag, + NonrootDevicesFlag, PauseImageFlag, SnapshotterFlag, PrivateRegistryFlag, diff --git a/pkg/cli/server/server.go b/pkg/cli/server/server.go index 81bb0b442..2826153e8 100644 --- a/pkg/cli/server/server.go +++ b/pkg/cli/server/server.go @@ -32,6 +32,7 @@ import ( "github.com/xiaods/k8e/pkg/util" "github.com/xiaods/k8e/pkg/version" "github.com/xiaods/k8e/pkg/vpn" + etcdversion "go.etcd.io/etcd/api/v3/version" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" utilnet "k8s.io/apimachinery/pkg/util/net" kubeapiserverflag "k8s.io/component-base/cli/flag" @@ -146,6 +147,7 @@ func run(app *cli.Context, cfg *cmds.Server, leaderControllers server.CustomCont serverConfig.ControlConfig.ExtraSchedulerAPIArgs = cfg.ExtraSchedulerArgs serverConfig.ControlConfig.ClusterDomain = cfg.ClusterDomain serverConfig.ControlConfig.Datastore.NotifyInterval = 5 * time.Second + serverConfig.ControlConfig.Datastore.EmulatedETCDVersion = etcdversion.Version serverConfig.ControlConfig.Datastore.Endpoint = cfg.DatastoreEndpoint serverConfig.ControlConfig.Datastore.BackendTLSConfig.CAFile = cfg.DatastoreCAFile serverConfig.ControlConfig.Datastore.BackendTLSConfig.CertFile = cfg.DatastoreCertFile diff --git a/pkg/daemons/config/types.go b/pkg/daemons/config/types.go index 6691a069b..d8033bbf4 100644 --- a/pkg/daemons/config/types.go +++ b/pkg/daemons/config/types.go @@ -9,11 +9,11 @@ import ( "strings" "sync" - "github.com/xiaods/k8e/pkg/generated/controllers/k8e.cattle.io" "github.com/k3s-io/kine/pkg/endpoint" "github.com/rancher/wharfie/pkg/registries" "github.com/rancher/wrangler/v3/pkg/generated/controllers/core" "github.com/rancher/wrangler/v3/pkg/leader" + "github.com/xiaods/k8e/pkg/generated/controllers/k8e.cattle.io" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" utilnet "k8s.io/apimachinery/pkg/util/net" "k8s.io/apiserver/pkg/authentication/authenticator" @@ -22,12 +22,12 @@ import ( ) const ( - EgressSelectorModeAgent = "agent" - EgressSelectorModeCluster = "cluster" - EgressSelectorModeDisabled = "disabled" - EgressSelectorModePod = "pod" - CertificateRenewDays = 90 - StreamServerPort = "10010" + EgressSelectorModeAgent = "agent" + EgressSelectorModeCluster = "cluster" + EgressSelectorModeDisabled = "disabled" + EgressSelectorModePod = "pod" + CertificateRenewDays = 90 + StreamServerPort = "10010" ) type Node struct { @@ -66,19 +66,20 @@ type EtcdS3 struct { } type Containerd struct { - Address string - Log string - Root string - State string - Config string - Opt string - Template string - BlockIOConfig string - RDTConfig string - Registry string - NoDefault bool - SELinux bool - Debug bool + Address string + Log string + Root string + State string + Config string + Opt string + Template string + BlockIOConfig string + RDTConfig string + Registry string + NoDefault bool + NonrootDevices bool + SELinux bool + Debug bool } type CRIDockerd struct { @@ -450,4 +451,4 @@ func GetArgs(initialArgs map[string]string, extraArgs []string) []string { } return args -} \ No newline at end of file +} diff --git a/pkg/etcd/etcd.go b/pkg/etcd/etcd.go index ec0fec087..95cb99d5c 100644 --- a/pkg/etcd/etcd.go +++ b/pkg/etcd/etcd.go @@ -41,8 +41,15 @@ import ( "github.com/xiaods/k8e/pkg/version" "go.etcd.io/etcd/api/v3/etcdserverpb" "go.etcd.io/etcd/api/v3/v3rpc/rpctypes" + "go.etcd.io/etcd/client/pkg/v3/logutil" clientv3 "go.etcd.io/etcd/client/v3" + "go.etcd.io/etcd/client/v3/credentials" snapshotv3 "go.etcd.io/etcd/etcdutl/v3/snapshot" + "go.etcd.io/etcd/server/v3/etcdserver" + "go.uber.org/zap/zapcore" + "google.golang.org/grpc" + "google.golang.org/grpc/credentials/insecure" + "google.golang.org/grpc/keepalive" v1 "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/labels" @@ -55,7 +62,7 @@ import ( ) const ( - testTimeout = time.Second * 30 + statusTimeout = time.Second * 30 manageTickerTime = time.Second * 15 learnerMaxStallTime = time.Minute * 5 memberRemovalTimeout = time.Minute * 1 @@ -206,35 +213,40 @@ func (e *ETCD) Test(ctx context.Context) error { return errors.New("etcd datastore is not started") } - ctx, cancel := context.WithTimeout(ctx, testTimeout) - defer cancel() - - endpoints := getEndpoints(e.config) - status, err := e.client.Status(ctx, endpoints[0]) + status, err := e.status(ctx) if err != nil { - return err + return errors.Wrap(err, "failed to get etcd status") + } else if status.IsLearner { + return errors.New("this server has not yet been promoted from learner to voting member") + } else if status.Leader == 0 { + return etcdserver.ErrNoLeader } - if status.IsLearner { - return errors.New("this server has not yet been promoted from learner to voting member") + logrus.Infof("Connected to etcd v%s - datastore using %d of %d bytes", status.Version, status.DbSizeInUse, status.DbSize) + if len(status.Errors) > 0 { + logrus.Warnf("Errors present on etcd cluster: %s", strings.Join(status.Errors, ",")) } + // defrag this node to reclaim freed space from compacted revisions if err := e.defragment(ctx); err != nil { return errors.Wrap(err, "failed to defragment etcd database") } - if err := e.clearAlarms(ctx); err != nil { - return errors.Wrap(err, "failed to report and disarm etcd alarms") + // clear alarms on this node + if err := e.clearAlarms(ctx, status.Header.MemberId); err != nil { + return errors.Wrap(err, "failed to disarm etcd alarms") } - // refresh status to see if any errors remain after clearing alarms - status, err = e.client.Status(ctx, endpoints[0]) + // refresh status - note that errors may remain on other nodes, but this + // should not prevent us from continuing with startup. + status, err = e.status(ctx) if err != nil { - return err + return errors.Wrap(err, "failed to get etcd status") } + logrus.Infof("Datastore using %d of %d bytes after defragment", status.DbSizeInUse, status.DbSize) if len(status.Errors) > 0 { - return fmt.Errorf("etcd cluster errors: %s", strings.Join(status.Errors, ", ")) + logrus.Warnf("Errors present on etcd cluster after defragment: %s", strings.Join(status.Errors, ",")) } members, err := e.client.MemberList(ctx) @@ -242,6 +254,7 @@ func (e *ETCD) Test(ctx context.Context) error { return err } + // Ensure that there is a cluster member with our peerURL and name var memberNameUrls []string for _, member := range members.Members { for _, peerURL := range member.PeerURLs { @@ -253,6 +266,8 @@ func (e *ETCD) Test(ctx context.Context) error { memberNameUrls = append(memberNameUrls, member.Name+"="+member.PeerURLs[0]) } } + + // no matching PeerURL on any Member, return an error that indicates what was expected vs what we found. return &membershipError{members: memberNameUrls, self: e.name + "=" + e.peerURL()} } @@ -523,7 +538,7 @@ func (e *ETCD) startClient(ctx context.Context) error { e.config.Datastore.BackendTLSConfig.CertFile = e.config.Runtime.ClientETCDCert e.config.Datastore.BackendTLSConfig.KeyFile = e.config.Runtime.ClientETCDKey - client, err := getClient(ctx, e.config, endpoints...) + client, conn, err := getClient(ctx, e.config, endpoints...) if err != nil { return err } @@ -531,9 +546,8 @@ func (e *ETCD) startClient(ctx context.Context) error { go func() { <-ctx.Done() - client := e.client e.client = nil - client.Close() + conn.Close() }() return nil @@ -554,11 +568,11 @@ func (e *ETCD) join(ctx context.Context, clientAccessInfo *clientaccess.Info) er return err } - client, err := getClient(clientCtx, e.config, clientURLs...) + client, conn, err := getClient(clientCtx, e.config, clientURLs...) if err != nil { return err } - defer client.Close() + defer conn.Close() for _, member := range memberList.Members { for _, peer := range member.PeerURLs { @@ -725,13 +739,49 @@ func (e *ETCD) infoHandler() http.Handler { // If the runtime config does not list any endpoints, the default endpoint is used. // The returned client should be closed when no longer needed, in order to avoid leaking GRPC // client goroutines. -func getClient(ctx context.Context, control *config.Control, endpoints ...string) (*clientv3.Client, error) { +func getClient(ctx context.Context, control *config.Control, endpoints ...string) (*clientv3.Client, *grpc.ClientConn, error) { + logger, err := logutil.CreateDefaultZapLogger(zapcore.DebugLevel) + if err != nil { + return nil, nil, err + } + cfg, err := getClientConfig(ctx, control, endpoints...) if err != nil { - return nil, err + return nil, nil, err + } + + // Set up dialer and resolver options. + // This is normally handled by clientv3.New() but that wraps all the GRPC + // service with retry handlers and uses deprecated grpc.DialContext() which + // tries to establish a connection even when one isn't wanted. + if cfg.DialKeepAliveTime > 0 { + params := keepalive.ClientParameters{ + Time: cfg.DialKeepAliveTime, + Timeout: cfg.DialKeepAliveTimeout, + PermitWithoutStream: cfg.PermitWithoutStream, + } + cfg.DialOptions = append(cfg.DialOptions, grpc.WithKeepaliveParams(params)) + } + if cfg.TLS != nil { + creds := credentials.NewBundle(credentials.Config{TLSConfig: cfg.TLS}).TransportCredentials() + cfg.DialOptions = append(cfg.DialOptions, grpc.WithTransportCredentials(creds)) + } else { + cfg.DialOptions = append(cfg.DialOptions, grpc.WithTransportCredentials(insecure.NewCredentials())) } - return clientv3.New(*cfg) + cfg.DialOptions = append(cfg.DialOptions, grpc.WithResolvers(NewSimpleResolver(cfg.Endpoints[0]))) + target := fmt.Sprintf("%s://%p/%s", scheme, cfg, authority(cfg.Endpoints[0])) + conn, err := grpc.NewClient(target, cfg.DialOptions...) + if err != nil { + return nil, nil, err + } + // Create a new client and wire up the GRPC service interfaces. + // Ref: https://github.com/etcd-io/etcd/blob/v3.5.16/client/v3/client.go#L87 + client := clientv3.NewCtxClient(ctx, clientv3.WithZapLogger(logger.Named(version.Program+"-etcd-client"))) + client.Cluster = clientv3.NewClusterFromClusterClient(etcdserverpb.NewClusterClient(conn), client) + client.KV = clientv3.NewKVFromKVClient(etcdserverpb.NewKVClient(conn), client) + client.Maintenance = clientv3.NewMaintenanceFromMaintenanceClient(etcdserverpb.NewMaintenanceClient(conn), client) + return client, conn, nil } // getClientConfig generates an etcd client config connected to the specified endpoints. @@ -851,11 +901,11 @@ func (e *ETCD) migrateFromSQLite(ctx context.Context) error { } defer sqliteClient.Close() - etcdClient, err := getClient(ctx, e.config) + etcdClient, conn, err := getClient(ctx, e.config) if err != nil { return err } - defer etcdClient.Close() + defer conn.Close() values, err := sqliteClient.List(ctx, "/registry/", 0) if err != nil { @@ -984,7 +1034,7 @@ func (e *ETCD) StartEmbeddedTemporary(ctx context.Context) error { return errors.New("etcd datastore already started") } - client, err := getClient(ctx, e.config) + client, conn, err := getClient(ctx, e.config) if err != nil { return err } @@ -992,9 +1042,8 @@ func (e *ETCD) StartEmbeddedTemporary(ctx context.Context) error { go func() { <-ctx.Done() - client := e.client e.client = nil - client.Close() + conn.Close() }() if err := cp.Copy(etcdDataDir, tmpDataDir, cp.Options{PreserveOwner: true}); err != nil { @@ -1251,8 +1300,6 @@ func (e *ETCD) trackLearnerProgress(ctx context.Context, progress *learnerProgre } func (e *ETCD) getETCDStatus(ctx context.Context, url string) (*clientv3.StatusResponse, error) { - ctx, cancel := context.WithTimeout(ctx, defaultDialTimeout) - defer cancel() resp, err := e.client.Status(ctx, url) if err != nil { return resp, errors.Wrap(err, "failed to check etcd member status") @@ -1363,11 +1410,10 @@ func (e *ETCD) setLearnerProgress(ctx context.Context, status *learnerProgress) return err } -// clearAlarms checks for any alarms on the local etcd member. If found, they are -// reported and the alarm state is cleared. -func (e *ETCD) clearAlarms(ctx context.Context) error { - ctx, cancel := context.WithTimeout(ctx, testTimeout) - defer cancel() +// clearAlarms checks for any NOSPACE alarms on the local etcd member. +// If found, they are reported and the alarm state is cleared. +// Other alarm types are not handled. +func (e *ETCD) clearAlarms(ctx context.Context, memberID uint64) error { if e.client == nil { return errors.New("etcd client was nil") @@ -1379,22 +1425,36 @@ func (e *ETCD) clearAlarms(ctx context.Context) error { } for _, alarm := range alarmList.Alarms { - logrus.Warnf("Alarm on etcd member %d: %s", alarm.MemberID, alarm.Alarm) - } - - if len(alarmList.Alarms) > 0 { - if _, err := e.client.AlarmDisarm(ctx, &clientv3.AlarmMember{}); err != nil { - return fmt.Errorf("etcd alarm disarm failed: %v", err) + if alarm.MemberID != memberID { + // ignore alarms on other cluster members, they should manage their own problems + continue + } + if alarm.Alarm == etcdserverpb.AlarmType_NOSPACE { + if _, err := e.client.AlarmDisarm(ctx, &clientv3.AlarmMember{MemberID: alarm.MemberID, Alarm: alarm.Alarm}); err != nil { + return fmt.Errorf("%s disarm failed: %v", alarm.Alarm, err) + } + logrus.Infof("%s disarmed successfully", alarm.Alarm) + } else { + return fmt.Errorf("%s alarm must be disarmed manually", alarm.Alarm) } - logrus.Infof("Alarms disarmed on etcd server") } return nil } -func (e *ETCD) defragment(ctx context.Context) error { - ctx, cancel := context.WithTimeout(ctx, testTimeout) +// status returns status using the first etcd endpoint. +func (e *ETCD) status(ctx context.Context) (*clientv3.StatusResponse, error) { + if e.client == nil { + return nil, errors.New("etcd client was nil") + } + ctx, cancel := context.WithTimeout(ctx, statusTimeout) defer cancel() + endpoints := getEndpoints(e.config) + return e.client.Status(ctx, endpoints[0]) +} + +// defragment defragments the etcd datastore using the first etcd endpoint +func (e *ETCD) defragment(ctx context.Context) error { if e.client == nil { return errors.New("etcd client was nil") } @@ -1550,11 +1610,11 @@ func backupDirWithRetention(dir string, maxBackupRetention int) (string, error) // GetAPIServerURLsFromETCD will try to fetch the version.Program/apiaddresses key from etcd // and unmarshal it to a list of apiserver endpoints. func GetAPIServerURLsFromETCD(ctx context.Context, cfg *config.Control) ([]string, error) { - cl, err := getClient(ctx, cfg) + cl, conn, err := getClient(ctx, cfg) if err != nil { return nil, err } - defer cl.Close() + defer conn.Close() etcdResp, err := cl.KV.Get(ctx, AddressKey) if err != nil { @@ -1576,9 +1636,6 @@ func GetAPIServerURLsFromETCD(ctx context.Context, cfg *config.Control) ([]strin // GetMembersClientURLs will list through the member lists in etcd and return // back a combined list of client urls for each member in the cluster func (e *ETCD) GetMembersClientURLs(ctx context.Context) ([]string, error) { - ctx, cancel := context.WithTimeout(ctx, testTimeout) - defer cancel() - members, err := e.client.MemberList(ctx) if err != nil { return nil, err @@ -1593,24 +1650,6 @@ func (e *ETCD) GetMembersClientURLs(ctx context.Context) ([]string, error) { return clientURLs, nil } -// GetMembersNames will list through the member lists in etcd and return -// back a combined list of member names -func (e *ETCD) GetMembersNames(ctx context.Context) ([]string, error) { - ctx, cancel := context.WithTimeout(ctx, testTimeout) - defer cancel() - - members, err := e.client.MemberList(ctx) - if err != nil { - return nil, err - } - - var memberNames []string - for _, member := range members.Members { - memberNames = append(memberNames, member.Name) - } - return memberNames, nil -} - // RemoveSelf will remove the member if it exists in the cluster. This should // only be called on a node that may have previously run etcd, but will not // currently run etcd, to ensure that it is not a member of the cluster. diff --git a/pkg/etcd/etcd_test.go b/pkg/etcd/etcd_test.go index a888e1174..eabaeaa85 100644 --- a/pkg/etcd/etcd_test.go +++ b/pkg/etcd/etcd_test.go @@ -6,6 +6,7 @@ import ( "net/http" "os" "path/filepath" + "sync" "testing" "time" @@ -15,11 +16,23 @@ import ( "github.com/xiaods/k8e/pkg/daemons/config" "github.com/xiaods/k8e/pkg/etcd/s3" testutil "github.com/xiaods/k8e/tests" + "go.etcd.io/etcd/api/v3/etcdserverpb" clientv3 "go.etcd.io/etcd/client/v3" "go.etcd.io/etcd/server/v3/etcdserver" + "google.golang.org/grpc" + "google.golang.org/grpc/codes" + "google.golang.org/grpc/credentials" + "google.golang.org/grpc/health" + healthpb "google.golang.org/grpc/health/grpc_health_v1" + "google.golang.org/grpc/reflection" + "google.golang.org/grpc/status" utilnet "k8s.io/apimachinery/pkg/util/net" ) +func init() { + logrus.SetLevel(logrus.DebugLevel) +} + func mustGetAddress() string { ipAddr, err := utilnet.ChooseHostInterface() if err != nil { @@ -75,7 +88,7 @@ func Test_UnitETCD_IsInitialized(t *testing.T) { wantErr bool }{ { - name: "Directory exists", + name: "directory exists", args: args{ ctx: context.TODO(), config: generateTestConfig(), @@ -94,7 +107,7 @@ func Test_UnitETCD_IsInitialized(t *testing.T) { want: true, }, { - name: "Directory does not exist", + name: "directory does not exist", args: args{ ctx: context.TODO(), config: generateTestConfig(), @@ -116,9 +129,6 @@ func Test_UnitETCD_IsInitialized(t *testing.T) { }, } - // enable logging - logrus.SetLevel(logrus.DebugLevel) - for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { e := NewETCD() @@ -158,7 +168,7 @@ func Test_UnitETCD_Register(t *testing.T) { wantErr bool }{ { - name: "Call Register with standard config", + name: "standard config", args: args{ ctx: context.TODO(), config: generateTestConfig(), @@ -173,7 +183,7 @@ func Test_UnitETCD_Register(t *testing.T) { }, }, { - name: "Call Register with a tombstone file created", + name: "with a tombstone file created", args: args{ ctx: context.TODO(), config: generateTestConfig(), @@ -248,7 +258,7 @@ func Test_UnitETCD_Start(t *testing.T) { wantErr bool }{ { - name: "Start etcd without clientAccessInfo and without snapshots", + name: "nil clientAccessInfo and nil cron", fields: fields{ config: generateTestConfig(), address: mustGetAddress(), @@ -265,17 +275,18 @@ func Test_UnitETCD_Start(t *testing.T) { }, teardown: func(e *ETCD, ctxInfo *contextInfo) error { // RemoveSelf will fail with a specific error, but it still does cleanup for testing purposes - if err := e.RemoveSelf(ctxInfo.ctx); err != nil && err.Error() != etcdserver.ErrNotEnoughStartedMembers.Error() { - return err - } + err := e.RemoveSelf(ctxInfo.ctx) ctxInfo.cancel() - time.Sleep(10 * time.Second) + time.Sleep(5 * time.Second) testutil.CleanupDataDir(e.config) + if err != nil && err.Error() != etcdserver.ErrNotEnoughStartedMembers.Error() { + return err + } return nil }, }, { - name: "Start etcd without clientAccessInfo on", + name: "nil clientAccessInfo", fields: fields{ config: generateTestConfig(), address: mustGetAddress(), @@ -292,17 +303,18 @@ func Test_UnitETCD_Start(t *testing.T) { }, teardown: func(e *ETCD, ctxInfo *contextInfo) error { // RemoveSelf will fail with a specific error, but it still does cleanup for testing purposes - if err := e.RemoveSelf(ctxInfo.ctx); err != nil && err.Error() != etcdserver.ErrNotEnoughStartedMembers.Error() { - return err - } + err := e.RemoveSelf(ctxInfo.ctx) ctxInfo.cancel() time.Sleep(5 * time.Second) testutil.CleanupDataDir(e.config) + if err != nil && err.Error() != etcdserver.ErrNotEnoughStartedMembers.Error() { + return err + } return nil }, }, { - name: "Start etcd with an existing cluster", + name: "existing cluster", fields: fields{ config: generateTestConfig(), address: mustGetAddress(), @@ -321,13 +333,14 @@ func Test_UnitETCD_Start(t *testing.T) { }, teardown: func(e *ETCD, ctxInfo *contextInfo) error { // RemoveSelf will fail with a specific error, but it still does cleanup for testing purposes - if err := e.RemoveSelf(ctxInfo.ctx); err != nil && err.Error() != etcdserver.ErrNotEnoughStartedMembers.Error() { - return err - } + err := e.RemoveSelf(ctxInfo.ctx) ctxInfo.cancel() time.Sleep(5 * time.Second) testutil.CleanupDataDir(e.config) os.Remove(walDir(e.config)) + if err != nil && err.Error() != etcdserver.ErrNotEnoughStartedMembers.Error() { + return err + } return nil }, }, @@ -352,8 +365,478 @@ func Test_UnitETCD_Start(t *testing.T) { } if err := tt.teardown(e, &tt.fields.context); err != nil { t.Errorf("Teardown for ETCD.Start() failed = %v", err) + } + }) + } +} + +func Test_UnitETCD_Test(t *testing.T) { + type contextInfo struct { + ctx context.Context + cancel context.CancelFunc + } + type fields struct { + context contextInfo + client *clientv3.Client + config *config.Control + name string + address string + } + type args struct { + clientAccessInfo *clientaccess.Info + } + tests := []struct { + name string + fields fields + setup func(e *ETCD, ctxInfo *contextInfo) error + teardown func(e *ETCD, ctxInfo *contextInfo) error + wantErr bool + }{ + { + name: "no server running", + fields: fields{ + config: generateTestConfig(), + address: mustGetAddress(), + name: "default", + }, + setup: func(e *ETCD, ctxInfo *contextInfo) error { + ctxInfo.ctx, ctxInfo.cancel = context.WithCancel(context.Background()) + testutil.GenerateRuntime(e.config) + return e.startClient(ctxInfo.ctx) + }, + teardown: func(e *ETCD, ctxInfo *contextInfo) error { + ctxInfo.cancel() + time.Sleep(1 * time.Second) + testutil.CleanupDataDir(e.config) + return nil + }, + wantErr: true, + }, + { + name: "unreachable server", + fields: fields{ + config: generateTestConfig(), + address: mustGetAddress(), + name: "default", + }, + setup: func(e *ETCD, ctxInfo *contextInfo) error { + ctxInfo.ctx, ctxInfo.cancel = context.WithCancel(context.Background()) + testutil.GenerateRuntime(e.config) + e.config.Runtime.EtcdConfig.Endpoints = []string{"https://192.0.2.0:2379"} // RFC5737 + return e.startClient(ctxInfo.ctx) + }, + teardown: func(e *ETCD, ctxInfo *contextInfo) error { + ctxInfo.cancel() + time.Sleep(1 * time.Second) + testutil.CleanupDataDir(e.config) + return nil + }, + wantErr: true, + }, + { + name: "learner server", + fields: fields{ + config: generateTestConfig(), + address: mustGetAddress(), + name: "default", + }, + setup: func(e *ETCD, ctxInfo *contextInfo) error { + ctxInfo.ctx, ctxInfo.cancel = context.WithCancel(context.Background()) + testutil.GenerateRuntime(e.config) + if err := startMock(ctxInfo.ctx, e, true, false, false, time.Second); err != nil { + return err + } + return e.startClient(ctxInfo.ctx) + }, + teardown: func(e *ETCD, ctxInfo *contextInfo) error { + ctxInfo.cancel() + time.Sleep(1 * time.Second) + testutil.CleanupDataDir(e.config) + return nil + }, + wantErr: true, + }, + { + name: "corrupt server", + fields: fields{ + config: generateTestConfig(), + address: mustGetAddress(), + name: "default", + }, + setup: func(e *ETCD, ctxInfo *contextInfo) error { + ctxInfo.ctx, ctxInfo.cancel = context.WithCancel(context.Background()) + testutil.GenerateRuntime(e.config) + if err := startMock(ctxInfo.ctx, e, false, true, false, time.Second); err != nil { + return err + } + return e.startClient(ctxInfo.ctx) + }, + teardown: func(e *ETCD, ctxInfo *contextInfo) error { + ctxInfo.cancel() + time.Sleep(1 * time.Second) + testutil.CleanupDataDir(e.config) + return nil + }, + wantErr: true, + }, + { + name: "leaderless server", + fields: fields{ + config: generateTestConfig(), + address: mustGetAddress(), + name: "default", + }, + setup: func(e *ETCD, ctxInfo *contextInfo) error { + ctxInfo.ctx, ctxInfo.cancel = context.WithCancel(context.Background()) + testutil.GenerateRuntime(e.config) + if err := startMock(ctxInfo.ctx, e, false, false, true, time.Second); err != nil { + return err + } + return e.startClient(ctxInfo.ctx) + }, + teardown: func(e *ETCD, ctxInfo *contextInfo) error { + ctxInfo.cancel() + time.Sleep(1 * time.Second) + testutil.CleanupDataDir(e.config) + return nil + }, + wantErr: true, + }, + { + name: "normal server", + fields: fields{ + config: generateTestConfig(), + address: mustGetAddress(), + name: "default", + }, + setup: func(e *ETCD, ctxInfo *contextInfo) error { + ctxInfo.ctx, ctxInfo.cancel = context.WithCancel(context.Background()) + testutil.GenerateRuntime(e.config) + if err := startMock(ctxInfo.ctx, e, false, false, false, time.Second); err != nil { + return err + } + return e.startClient(ctxInfo.ctx) + }, + teardown: func(e *ETCD, ctxInfo *contextInfo) error { + ctxInfo.cancel() + time.Sleep(1 * time.Second) + testutil.CleanupDataDir(e.config) + return nil + }, + wantErr: false, + }, + { + name: "alarm on other server", + fields: fields{ + config: generateTestConfig(), + address: mustGetAddress(), + name: "default", + }, + setup: func(e *ETCD, ctxInfo *contextInfo) error { + ctxInfo.ctx, ctxInfo.cancel = context.WithCancel(context.Background()) + testutil.GenerateRuntime(e.config) + extraAlarm := &etcdserverpb.AlarmMember{MemberID: 2, Alarm: etcdserverpb.AlarmType_NOSPACE} + if err := startMock(ctxInfo.ctx, e, false, false, false, time.Second, extraAlarm); err != nil { + return err + } + return e.startClient(ctxInfo.ctx) + }, + teardown: func(e *ETCD, ctxInfo *contextInfo) error { + ctxInfo.cancel() + time.Sleep(1 * time.Second) + testutil.CleanupDataDir(e.config) + return nil + }, + wantErr: false, + }, + { + name: "slow defrag", + fields: fields{ + config: generateTestConfig(), + address: mustGetAddress(), + name: "default", + }, + setup: func(e *ETCD, ctxInfo *contextInfo) error { + ctxInfo.ctx, ctxInfo.cancel = context.WithCancel(context.Background()) + testutil.GenerateRuntime(e.config) + if err := startMock(ctxInfo.ctx, e, false, false, false, 40*time.Second); err != nil { + return err + } + return e.startClient(ctxInfo.ctx) + }, + teardown: func(e *ETCD, ctxInfo *contextInfo) error { + ctxInfo.cancel() + time.Sleep(1 * time.Second) + testutil.CleanupDataDir(e.config) + return nil + }, + wantErr: false, + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + e := &ETCD{ + client: tt.fields.client, + config: tt.fields.config, + name: tt.fields.name, + address: tt.fields.address, + } + + if err := tt.setup(e, &tt.fields.context); err != nil { + t.Errorf("Setup for ETCD.Test() failed = %v", err) return } + start := time.Now() + err := e.Test(tt.fields.context.ctx) + duration := time.Now().Sub(start) + t.Logf("ETCD.Test() completed in %v with err=%v", duration, err) + if (err != nil) != tt.wantErr { + t.Errorf("ETCD.Test() error = %v, wantErr %v", err, tt.wantErr) + } + if err := tt.teardown(e, &tt.fields.context); err != nil { + t.Errorf("Teardown for ETCD.Test() failed = %v", err) + } }) } } + +// startMock starts up a mock etcd grpc service with canned responses +// that can be used to test specific scenarios. +func startMock(ctx context.Context, e *ETCD, isLearner, isCorrupt, noLeader bool, defragDelay time.Duration, extraAlarms ...*etcdserverpb.AlarmMember) error { + address := authority(getEndpoints(e.config)[0]) + // listen on endpoint and close listener on context cancel + listener, err := net.Listen("tcp", address) + if err != nil { + return err + } + + // set up tls if enabled + gopts := []grpc.ServerOption{} + if e.config.Datastore.ServerTLSConfig.CertFile != "" && e.config.Datastore.ServerTLSConfig.KeyFile != "" { + creds, err := credentials.NewServerTLSFromFile(e.config.Datastore.ServerTLSConfig.CertFile, e.config.Datastore.ServerTLSConfig.KeyFile) + if err != nil { + return err + } + gopts = append(gopts, grpc.Creds(creds)) + } + server := grpc.NewServer(gopts...) + + mock := &mockEtcd{ + e: e, + mu: &sync.RWMutex{}, + isLearner: isLearner, + isCorrupt: isCorrupt, + noLeader: noLeader, + defragDelay: defragDelay, + extraAlarms: extraAlarms, + } + + // register grpc services + etcdserverpb.RegisterKVServer(server, mock) + etcdserverpb.RegisterClusterServer(server, mock) + etcdserverpb.RegisterMaintenanceServer(server, mock) + + hsrv := health.NewServer() + hsrv.SetServingStatus("", healthpb.HealthCheckResponse_SERVING) + healthpb.RegisterHealthServer(server, hsrv) + + reflection.Register(server) + + // shutdown on context cancel + go func() { + <-ctx.Done() + server.GracefulStop() + listener.Close() + }() + + // start serving + go func() { + logrus.Infof("Mock etcd server starting on %s", listener.Addr()) + logrus.Infof("Mock etcd server exited: %v", server.Serve(listener)) + }() + + return nil +} + +type mockEtcd struct { + e *ETCD + mu *sync.RWMutex + calls map[string]int + isLearner bool + isCorrupt bool + noLeader bool + defragDelay time.Duration + extraAlarms []*etcdserverpb.AlarmMember +} + +// increment call counter for this function +func (m *mockEtcd) inc(call string) { + m.mu.Lock() + defer m.mu.Unlock() + if m.calls == nil { + m.calls = map[string]int{} + } + m.calls[call] = m.calls[call] + 1 +} + +// get call counter for this function +func (m *mockEtcd) get(call string) int { + m.mu.RLock() + defer m.mu.RUnlock() + return m.calls[call] +} + +// get alarm list +func (m *mockEtcd) alarms() []*etcdserverpb.AlarmMember { + alarms := m.extraAlarms + if m.get("alarm") < 2 { + // on the first check, return NOSPACE so that we can clear it after defragging + alarms = append(alarms, &etcdserverpb.AlarmMember{ + Alarm: etcdserverpb.AlarmType_NOSPACE, + MemberID: 1, + }) + } + if m.isCorrupt { + // return CORRUPT if so requested + alarms = append(alarms, &etcdserverpb.AlarmMember{ + Alarm: etcdserverpb.AlarmType_CORRUPT, + MemberID: 1, + }) + } + return alarms +} + +// KV mocks +func (m *mockEtcd) Range(context.Context, *etcdserverpb.RangeRequest) (*etcdserverpb.RangeResponse, error) { + m.inc("range") + return nil, unsupported("range") +} +func (m *mockEtcd) Put(context.Context, *etcdserverpb.PutRequest) (*etcdserverpb.PutResponse, error) { + m.inc("put") + return nil, unsupported("put") +} +func (m *mockEtcd) DeleteRange(context.Context, *etcdserverpb.DeleteRangeRequest) (*etcdserverpb.DeleteRangeResponse, error) { + m.inc("deleterange") + return nil, unsupported("deleterange") +} +func (m *mockEtcd) Txn(context.Context, *etcdserverpb.TxnRequest) (*etcdserverpb.TxnResponse, error) { + m.inc("txn") + return nil, unsupported("txn") +} +func (m *mockEtcd) Compact(context.Context, *etcdserverpb.CompactionRequest) (*etcdserverpb.CompactionResponse, error) { + m.inc("compact") + return nil, unsupported("compact") +} + +// Maintenance mocks +func (m *mockEtcd) Alarm(ctx context.Context, r *etcdserverpb.AlarmRequest) (*etcdserverpb.AlarmResponse, error) { + m.inc("alarm") + res := &etcdserverpb.AlarmResponse{ + Header: &etcdserverpb.ResponseHeader{ + MemberId: 1, + }, + } + if r.Action == etcdserverpb.AlarmRequest_GET { + res.Alarms = m.alarms() + } + return res, nil +} +func (m *mockEtcd) Status(context.Context, *etcdserverpb.StatusRequest) (*etcdserverpb.StatusResponse, error) { + m.inc("status") + res := &etcdserverpb.StatusResponse{ + Header: &etcdserverpb.ResponseHeader{ + MemberId: 1, + }, + Leader: 1, + Version: "v3.5.0-mock0", + DbSize: 1024, + DbSizeInUse: 512, + IsLearner: m.isLearner, + } + if m.noLeader { + res.Leader = 0 + res.Errors = append(res.Errors, etcdserver.ErrNoLeader.Error()) + } + for _, a := range m.alarms() { + res.Errors = append(res.Errors, a.String()) + } + return res, nil +} +func (m *mockEtcd) Defragment(ctx context.Context, r *etcdserverpb.DefragmentRequest) (*etcdserverpb.DefragmentResponse, error) { + m.inc("defragment") + // delay defrag response by configured time, or until the request is cancelled + select { + case <-ctx.Done(): + case <-time.After(m.defragDelay): + } + return &etcdserverpb.DefragmentResponse{ + Header: &etcdserverpb.ResponseHeader{ + MemberId: 1, + }, + }, nil +} +func (m *mockEtcd) Hash(context.Context, *etcdserverpb.HashRequest) (*etcdserverpb.HashResponse, error) { + m.inc("hash") + return nil, unsupported("hash") +} +func (m *mockEtcd) HashKV(context.Context, *etcdserverpb.HashKVRequest) (*etcdserverpb.HashKVResponse, error) { + m.inc("hashkv") + return nil, unsupported("hashkv") +} +func (m *mockEtcd) Snapshot(*etcdserverpb.SnapshotRequest, etcdserverpb.Maintenance_SnapshotServer) error { + m.inc("snapshot") + return unsupported("snapshot") +} +func (m *mockEtcd) MoveLeader(context.Context, *etcdserverpb.MoveLeaderRequest) (*etcdserverpb.MoveLeaderResponse, error) { + m.inc("moveleader") + return nil, unsupported("moveleader") +} +func (m *mockEtcd) Downgrade(context.Context, *etcdserverpb.DowngradeRequest) (*etcdserverpb.DowngradeResponse, error) { + m.inc("downgrade") + return nil, unsupported("downgrade") +} + +// Cluster mocks +func (m *mockEtcd) MemberAdd(context.Context, *etcdserverpb.MemberAddRequest) (*etcdserverpb.MemberAddResponse, error) { + m.inc("memberadd") + return nil, unsupported("memberadd") +} +func (m *mockEtcd) MemberRemove(context.Context, *etcdserverpb.MemberRemoveRequest) (*etcdserverpb.MemberRemoveResponse, error) { + m.inc("memberremove") + return nil, etcdserver.ErrNotEnoughStartedMembers +} +func (m *mockEtcd) MemberUpdate(context.Context, *etcdserverpb.MemberUpdateRequest) (*etcdserverpb.MemberUpdateResponse, error) { + m.inc("memberupdate") + return nil, unsupported("memberupdate") +} +func (m *mockEtcd) MemberList(context.Context, *etcdserverpb.MemberListRequest) (*etcdserverpb.MemberListResponse, error) { + m.inc("memberlist") + scheme := "http" + if m.e.config.Datastore.ServerTLSConfig.CertFile != "" { + scheme = "https" + } + + return &etcdserverpb.MemberListResponse{ + Header: &etcdserverpb.ResponseHeader{ + MemberId: 1, + }, + Members: []*etcdserverpb.Member{ + { + ID: 1, + Name: m.e.name, + IsLearner: m.isLearner, + ClientURLs: []string{scheme + "://127.0.0.1:2379"}, + PeerURLs: []string{scheme + "://" + m.e.address + ":2380"}, + }, + }, + }, nil +} + +func (m *mockEtcd) MemberPromote(context.Context, *etcdserverpb.MemberPromoteRequest) (*etcdserverpb.MemberPromoteResponse, error) { + m.inc("memberpromote") + return nil, unsupported("memberpromote") +} + +func unsupported(field string) error { + return status.New(codes.Unimplemented, field+" is not implemented").Err() +} diff --git a/pkg/etcd/resolver.go b/pkg/etcd/resolver.go new file mode 100644 index 000000000..b95242cbf --- /dev/null +++ b/pkg/etcd/resolver.go @@ -0,0 +1,80 @@ +package etcd + +import ( + "net/url" + "path" + "strings" + + "google.golang.org/grpc/resolver" + "google.golang.org/grpc/resolver/manual" +) + +const scheme = "etcd-endpoint" + +type EtcdSimpleResolver struct { + *manual.Resolver + endpoint string +} + +// Cribbed from https://github.com/etcd-io/etcd/blob/v3.5.16/client/v3/internal/resolver/resolver.go +// but only supports a single fixed endpoint. We use this instead of the internal etcd client resolver +// because the agent loadbalancer handles failover and we don't want etcd or grpc's special behavior. +func NewSimpleResolver(endpoint string) *EtcdSimpleResolver { + r := manual.NewBuilderWithScheme(scheme) + return &EtcdSimpleResolver{Resolver: r, endpoint: endpoint} +} + +func (r *EtcdSimpleResolver) Build(target resolver.Target, cc resolver.ClientConn, opts resolver.BuildOptions) (resolver.Resolver, error) { + res, err := r.Resolver.Build(target, cc, opts) + if err != nil { + return nil, err + } + + if r.CC != nil { + addr, serverName := interpret(r.endpoint) + r.UpdateState(resolver.State{ + Addresses: []resolver.Address{{Addr: addr, ServerName: serverName}}, + }) + } + + return res, nil +} + +func interpret(ep string) (string, string) { + if strings.HasPrefix(ep, "unix:") || strings.HasPrefix(ep, "unixs:") { + if strings.HasPrefix(ep, "unix:///") || strings.HasPrefix(ep, "unixs:///") { + _, absolutePath, _ := strings.Cut(ep, "://") + return "unix://" + absolutePath, path.Base(absolutePath) + } + if strings.HasPrefix(ep, "unix://") || strings.HasPrefix(ep, "unixs://") { + _, localPath, _ := strings.Cut(ep, "://") + return "unix:" + localPath, path.Base(localPath) + } + _, localPath, _ := strings.Cut(ep, ":") + return "unix:" + localPath, path.Base(localPath) + } + if strings.Contains(ep, "://") { + url, err := url.Parse(ep) + if err != nil { + return ep, ep + } + if url.Scheme == "http" || url.Scheme == "https" { + return url.Host, url.Host + } + return ep, url.Host + } + return ep, ep +} + +func authority(ep string) string { + if _, authority, ok := strings.Cut(ep, "://"); ok { + return authority + } + if suff, ok := strings.CutPrefix(ep, "unix:"); ok { + return suff + } + if suff, ok := strings.CutPrefix(ep, "unixs:"); ok { + return suff + } + return ep +} diff --git a/pkg/etcd/snapshot.go b/pkg/etcd/snapshot.go index fb83bdaf3..5d5b7da59 100644 --- a/pkg/etcd/snapshot.go +++ b/pkg/etcd/snapshot.go @@ -27,7 +27,7 @@ import ( "github.com/xiaods/k8e/pkg/etcd/snapshot" "github.com/xiaods/k8e/pkg/util" "github.com/xiaods/k8e/pkg/version" - snapshotv3 "go.etcd.io/etcd/etcdutl/v3/snapshot" + snapshotv3 "go.etcd.io/etcd/client/v3/snapshot" v1 "k8s.io/api/core/v1" "k8s.io/apimachinery/pkg/api/equality" apierrors "k8s.io/apimachinery/pkg/api/errors" @@ -243,7 +243,7 @@ func (e *ETCD) Snapshot(ctx context.Context) (*managed.SnapshotResult, error) { var sf *snapshot.File - if err := snapshotv3.NewV3(e.client.GetLogger()).Save(ctx, *cfg, snapshotPath); err != nil { + if err := snapshotv3.Save(ctx, e.client.GetLogger(), *cfg, snapshotPath); err != nil { sf = &snapshot.File{ Name: snapshotName, Location: "", diff --git a/tests/unit.go b/tests/unit.go index c6aa3f501..458a3e6e4 100644 --- a/tests/unit.go +++ b/tests/unit.go @@ -54,6 +54,10 @@ func GenerateRuntime(cnf *config.Control) error { deps.CreateRuntimeCertFiles(cnf) + cnf.Datastore.ServerTLSConfig.CAFile = cnf.Runtime.ETCDServerCA + cnf.Datastore.ServerTLSConfig.CertFile = cnf.Runtime.ServerETCDCert + cnf.Datastore.ServerTLSConfig.KeyFile = cnf.Runtime.ServerETCDKey + return deps.GenServerDeps(cnf) }