From 6e5d0e269428a496d91cb4eae30c14edb05c3ffe Mon Sep 17 00:00:00 2001 From: xuriwuyun Date: Tue, 24 Oct 2023 17:56:50 +0800 Subject: [PATCH] remove etcd binding --- lorry/binding/etcd/etcd.go | 166 ------------------ lorry/engines/etcd/get_replica_role.go | 43 +++++ lorry/engines/etcd/manager.go | 108 ++++++++++++ lorry/engines/etcd/manager_test.go | 102 +++++++++++ .../etcd/suite_test.go} | 103 ++++++----- lorry/engines/register/managers.go | 2 + 6 files changed, 310 insertions(+), 214 deletions(-) delete mode 100644 lorry/binding/etcd/etcd.go create mode 100644 lorry/engines/etcd/get_replica_role.go create mode 100644 lorry/engines/etcd/manager.go create mode 100644 lorry/engines/etcd/manager_test.go rename lorry/{binding/etcd/etcd_test.go => engines/etcd/suite_test.go} (56%) diff --git a/lorry/binding/etcd/etcd.go b/lorry/binding/etcd/etcd.go deleted file mode 100644 index a6ab80d7aa6..00000000000 --- a/lorry/binding/etcd/etcd.go +++ /dev/null @@ -1,166 +0,0 @@ -/* -Copyright (C) 2022-2023 ApeCloud Co., Ltd - -This file is part of KubeBlocks project - -This program is free software: you can redistribute it and/or modify -it under the terms of the GNU Affero General Public License as published by -the Free Software Foundation, either version 3 of the License, or -(at your option) any later version. - -This program is distributed in the hope that it will be useful -but WITHOUT ANY WARRANTY; without even the implied warranty of -MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -GNU Affero General Public License for more details. - -You should have received a copy of the GNU Affero General Public License -along with this program. If not, see . -*/ - -package etcd - -import ( - "context" - "strconv" - "strings" - "sync" - "time" - - "github.com/go-logr/logr" - v3 "go.etcd.io/etcd/client/v3" - ctrl "sigs.k8s.io/controller-runtime" - - . "github.com/apecloud/kubeblocks/lorry/binding" - "github.com/apecloud/kubeblocks/lorry/component" - "github.com/apecloud/kubeblocks/lorry/util" -) - -type Etcd struct { - lock sync.Mutex - etcd *v3.Client - endpoint string - BaseOperations -} - -const ( - endpoint = "endpoint" - - defaultPort = 2379 - defaultDialTimeout = 400 * time.Millisecond -) - -// NewEtcd returns a new etcd binding instance. -func NewEtcd() *Etcd { - logger := ctrl.Log.WithName("Etcd") - return &Etcd{BaseOperations: BaseOperations{Logger: logger}} -} - -func (e *Etcd) Init(metadata component.Properties) error { - e.BaseOperations.Init(metadata) - e.endpoint = e.Metadata[endpoint] - e.DBType = "etcd" - e.InitIfNeed = e.initIfNeed - e.DBPort = e.GetRunningPort() - e.BaseOperations.GetRole = e.GetRole - e.OperationsMap[util.GetRoleOperation] = e.GetRoleOps - return nil -} - -func (e *Etcd) initIfNeed() bool { - if e.etcd == nil { - go func() { - err := e.InitDelay() - e.Logger.Error(err, "Etcd connection init failed") - }() - return true - } - return false -} - -func (e *Etcd) InitDelay() error { - e.lock.Lock() - defer e.lock.Unlock() - - if e.etcd != nil { - return nil - } - - cli, err := v3.New(v3.Config{ - Endpoints: []string{e.endpoint}, - DialTimeout: defaultDialTimeout, - }) - if err != nil { - return err - } - - ctx, cancel := context.WithTimeout(context.Background(), defaultDialTimeout) - _, err = cli.Status(ctx, e.endpoint) - cancel() - if err != nil { - cli.Close() - return err - } - - e.etcd = cli - - return nil -} - -func (e *Etcd) GetRole(ctx context.Context, req *ProbeRequest, resp *ProbeResponse) (string, error) { - etcdResp, err := e.etcd.Status(ctx, e.endpoint) - if err != nil { - return "", err - } - - role := "follower" - switch { - case etcdResp.Leader == etcdResp.Header.MemberId: - role = "leader" - case etcdResp.IsLearner: - role = "learner" - } - - return role, nil -} - -func (e *Etcd) GetRoleOps(ctx context.Context, req *ProbeRequest, resp *ProbeResponse) (OpsResult, error) { - role, err := e.GetRole(ctx, req, resp) - if err != nil { - return nil, err - } - opsRes := OpsResult{} - opsRes["role"] = role - return opsRes, nil -} - -func (e *Etcd) GetRunningPort() int { - index := strings.Index(e.endpoint, ":") - if index < 0 { - return defaultPort - } - port, err := strconv.Atoi(e.endpoint[index+1:]) - if err != nil { - return defaultPort - } - - return port -} - -func (e *Etcd) StatusCheck(ctx context.Context, cmd string, response *ProbeResponse) ([]byte, error) { - // TODO implement me when proposal is passed - return nil, nil -} - -func (e *Etcd) GetLogger() logr.Logger { - return e.Logger -} - -func (e *Etcd) InternalQuery(ctx context.Context, sql string) ([]byte, error) { - // TODO: impl - return nil, nil -} - -func (e *Etcd) InternalExec(ctx context.Context, sql string) (int64, error) { - // TODO: impl - return 0, nil -} diff --git a/lorry/engines/etcd/get_replica_role.go b/lorry/engines/etcd/get_replica_role.go new file mode 100644 index 00000000000..c90ddbb5175 --- /dev/null +++ b/lorry/engines/etcd/get_replica_role.go @@ -0,0 +1,43 @@ +/* +Copyright (C) 2022-2023 ApeCloud Co., Ltd + +This file is part of KubeBlocks project + +This program is free software: you can redistribute it and/or modify +it under the terms of the GNU Affero General Public License as published by +the Free Software Foundation, either version 3 of the License, or +(at your option) any later version. + +This program is distributed in the hope that it will be useful +but WITHOUT ANY WARRANTY; without even the implied warranty of +MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +GNU Affero General Public License for more details. + +You should have received a copy of the GNU Affero General Public License +along with this program. If not, see . +*/ + +package etcd + +import ( + "context" + + "github.com/apecloud/kubeblocks/lorry/dcs" +) + +func (mgr *Manager) GetReplicaRole(ctx context.Context, cluster *dcs.Cluster) (string, error) { + etcdResp, err := mgr.etcd.Status(ctx, mgr.endpoint) + if err != nil { + return "", err + } + + role := "follower" + switch { + case etcdResp.Leader == etcdResp.Header.MemberId: + role = "leader" + case etcdResp.IsLearner: + role = "learner" + } + + return role, nil +} diff --git a/lorry/engines/etcd/manager.go b/lorry/engines/etcd/manager.go new file mode 100644 index 00000000000..19e1c4d787c --- /dev/null +++ b/lorry/engines/etcd/manager.go @@ -0,0 +1,108 @@ +/* +Copyright (C) 2022-2023 ApeCloud Co., Ltd + +This file is part of KubeBlocks project + +This program is free software: you can redistribute it and/or modify +it under the terms of the GNU Affero General Public License as published by +the Free Software Foundation, either version 3 of the License, or +(at your option) any later version. + +This program is distributed in the hope that it will be useful +but WITHOUT ANY WARRANTY; without even the implied warranty of +MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +GNU Affero General Public License for more details. + +You should have received a copy of the GNU Affero General Public License +along with this program. If not, see . +*/ + +package etcd + +import ( + "context" + "strconv" + "strings" + "time" + + v3 "go.etcd.io/etcd/client/v3" + ctrl "sigs.k8s.io/controller-runtime" + + "github.com/apecloud/kubeblocks/lorry/engines" +) + +const ( + endpoint = "endpoint" + + defaultPort = 2379 + defaultDialTimeout = 600 * time.Millisecond +) + +type Manager struct { + engines.DBManagerBase + etcd *v3.Client + endpoint string +} + +var _ engines.DBManager = &Manager{} + +func NewManager(properties engines.Properties) (engines.DBManager, error) { + logger := ctrl.Log.WithName("ETCD") + + managerBase, err := engines.NewDBManagerBase(logger) + if err != nil { + return nil, err + } + + mgr := &Manager{ + DBManagerBase: *managerBase, + } + + var endpoints []string + endpoint, ok := properties[endpoint] + if ok { + mgr.endpoint = endpoint + endpoints = []string{endpoint} + } + + cli, err := v3.New(v3.Config{ + Endpoints: endpoints, + DialTimeout: defaultDialTimeout, + }) + if err != nil { + return nil, err + } + + mgr.etcd = cli + return mgr, nil +} + +func (mgr *Manager) IsDBStartupReady() bool { + if mgr.DBStartupReady { + return true + } + + ctx, cancel := context.WithTimeout(context.Background(), defaultDialTimeout) + status, err := mgr.etcd.Status(ctx, mgr.endpoint) + cancel() + if err != nil { + mgr.Logger.Info("get etcd status failed", "error", err, "status", status) + } + + mgr.DBStartupReady = true + mgr.Logger.Info("DB startup ready") + return true +} + +func (mgr *Manager) GetRunningPort() int { + index := strings.Index(mgr.endpoint, ":") + if index < 0 { + return defaultPort + } + port, err := strconv.Atoi(mgr.endpoint[index+1:]) + if err != nil { + return defaultPort + } + + return port +} diff --git a/lorry/engines/etcd/manager_test.go b/lorry/engines/etcd/manager_test.go new file mode 100644 index 00000000000..86d934fa211 --- /dev/null +++ b/lorry/engines/etcd/manager_test.go @@ -0,0 +1,102 @@ +/* +Copyright (C) 2022-2023 ApeCloud Co., Ltd + +This file is part of KubeBlocks project + +This program is free software: you can redistribute it and/or modify +it under the terms of the GNU Affero General Public License as published by +the Free Software Foundation, either version 3 of the License, or +(at your option) any later version. + +This program is distributed in the hope that it will be useful +but WITHOUT ANY WARRANTY; without even the implied warranty of +MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +GNU Affero General Public License for more details. + +You should have received a copy of the GNU Affero General Public License +along with this program. If not, see . +*/ + +package etcd + +import ( + "context" + "fmt" + "net" + + . "github.com/onsi/ginkgo/v2" + . "github.com/onsi/gomega" + "github.com/spf13/viper" + + "github.com/apecloud/kubeblocks/lorry/engines" +) + +const ( + urlWithPort = "127.0.0.1:2379" +) + +// Test case for Init() function +var _ = Describe("ETCD DBManager", func() { + // Set up relevant viper config variables + viper.Set("KB_SERVICE_USER", "testuser") + viper.Set("KB_SERVICE_PASSWORD", "testpassword") + Context("new db manager", func() { + It("with rigth configurations", func() { + properties := engines.Properties{ + "endpoint": urlWithPort, + } + dbManger, err := NewManager(properties) + Expect(err).Should(Succeed()) + Expect(dbManger).ShouldNot(BeNil()) + }) + + It("with wrong configurations", func() { + properties := engines.Properties{} + dbManger, err := NewManager(properties) + Expect(err).Should(HaveOccurred()) + Expect(dbManger).Should(BeNil()) + }) + }) + + Context("is db startup ready", func() { + It("it is ready", func() { + etcdServer, err := StartEtcdServer() + Expect(err).Should(BeNil()) + defer etcdServer.Stop() + testEndpoint := fmt.Sprintf("http://%s", etcdServer.ETCD.Clients[0].Addr().(*net.TCPAddr).String()) + manager := &Manager{ + etcd: etcdServer.client, + endpoint: testEndpoint, + } + Expect(manager.IsDBStartupReady()).Should(BeTrue()) + }) + + // It("it is not ready", func() { + // etcdServer, err := StartEtcdServer() + // Expect(err).Should(BeNil()) + // etcdServer.Stop() + // testEndpoint := fmt.Sprintf("http://%s", etcdServer.ETCD.Clients[0].Addr().(*net.TCPAddr).String()) + // manager := &Manager{ + // etcd: etcdServer.client, + // endpoint: testEndpoint, + // } + // Expect(manager.IsDBStartupReady()).Should(BeFalse()) + // }) + }) + + Context("get replica role", func() { + It("get leader", func() { + etcdServer, err := StartEtcdServer() + Expect(err).Should(BeNil()) + defer etcdServer.Stop() + testEndpoint := fmt.Sprintf("http://%s", etcdServer.ETCD.Clients[0].Addr().(*net.TCPAddr).String()) + manager := &Manager{ + etcd: etcdServer.client, + endpoint: testEndpoint, + } + role, err := manager.GetReplicaRole(context.Background(), nil) + Expect(err).Should(BeNil()) + Expect(role).Should(Equal("leader")) + }) + }) +}) diff --git a/lorry/binding/etcd/etcd_test.go b/lorry/engines/etcd/suite_test.go similarity index 56% rename from lorry/binding/etcd/etcd_test.go rename to lorry/engines/etcd/suite_test.go index 1c6cbf35da0..5a4fe226ed5 100644 --- a/lorry/binding/etcd/etcd_test.go +++ b/lorry/engines/etcd/suite_test.go @@ -20,75 +20,81 @@ along with this program. If not, see . package etcd import ( - "context" - "fmt" - "io/ioutil" - "net" + "errors" "net/url" "os" "testing" "time" "github.com/go-logr/logr" - "github.com/go-logr/zapr" - "github.com/pkg/errors" + "github.com/golang/mock/gomock" + . "github.com/onsi/ginkgo/v2" + . "github.com/onsi/gomega" + "github.com/spf13/viper" clientv3 "go.etcd.io/etcd/client/v3" "go.etcd.io/etcd/server/v3/embed" "go.etcd.io/etcd/server/v3/etcdserver/api/v3client" - "go.uber.org/zap" + ctrl "sigs.k8s.io/controller-runtime" + "sigs.k8s.io/controller-runtime/pkg/log/zap" - "github.com/apecloud/kubeblocks/lorry/binding" + "github.com/apecloud/kubeblocks/lorry/dcs" ) const ( etcdStartTimeout = 30 ) -func TestETCD(t *testing.T) { - etcdServer, err := startEtcdServer("http://localhost:0") - if err != nil { - t.Errorf("start embedded etcd server error: %s", err) - } - defer stopEtcdServer(etcdServer) - testEndpoint := fmt.Sprintf("http://%s", etcdServer.ETCD.Clients[0].Addr().(*net.TCPAddr).String()) - - t.Run("Invoke GetRole", func(t *testing.T) { - e := mockEtcd(etcdServer) - role, err := e.GetRole(context.Background(), &binding.ProbeRequest{}, &binding.ProbeResponse{}) - if err != nil { - t.Errorf("get role error: %s", err) - } - if role != "leader" { - t.Errorf("unexpected role: %s", role) - } - }) - t.Run("InitDelay", func(t *testing.T) { - e := &Etcd{endpoint: testEndpoint} - err = e.InitDelay() - if err != nil { - t.Errorf("etcd client init error: %s", err) - } - }) +var ( + dcsStore dcs.DCS + mockDCSStore *dcs.MockDCS + etcdServer *EmbeddedETCD +) + +func init() { + viper.AutomaticEnv() + viper.SetDefault("KB_POD_NAME", "pod-test-0") + viper.SetDefault("KB_CLUSTER_COMP_NAME", "cluster-component-test") + viper.SetDefault("KB_NAMESPACE", "namespace-test") + ctrl.SetLogger(zap.New()) } -func mockEtcd(etcdServer *EmbeddedETCD) *Etcd { - e := &Etcd{} - e.etcd = etcdServer.client - return e +func TestETCDDBManager(t *testing.T) { + RegisterFailHandler(Fail) + RunSpecs(t, "ETCD DBManager. Suite") } -func startEtcdServer(peerAddress string) (*EmbeddedETCD, error) { - etcd := &EmbeddedETCD{} - development, err := zap.NewDevelopment() - if err != nil { - return nil, err - } - logger := zapr.NewLogger(development) - etcd.logger = logger - return etcd, etcd.Start(peerAddress) +var _ = BeforeSuite(func() { + // Init mock dcs store + InitMockDCSStore() + + // Start ETCD Server + // server, err := StartEtcdServer() + // Expect(err).Should(BeNil()) + // etcdServer = server +}) + +var _ = AfterSuite(func() { + StopEtcdServer(etcdServer) +}) + +func InitMockDCSStore() { + ctrl := gomock.NewController(GinkgoT()) + mockDCSStore = dcs.NewMockDCS(ctrl) + mockDCSStore.EXPECT().GetClusterFromCache().Return(&dcs.Cluster{}).AnyTimes() + dcs.SetStore(mockDCSStore) + dcsStore = mockDCSStore +} + +func StartEtcdServer() (*EmbeddedETCD, error) { + peerAddress := "http://localhost:0" + + etcdServer := &EmbeddedETCD{} + logger := ctrl.Log.WithName("ETCD server") + etcdServer.logger = logger + return etcdServer, etcdServer.Start(peerAddress) } -func stopEtcdServer(etcdServer *EmbeddedETCD) { +func StopEtcdServer(etcdServer *EmbeddedETCD) { if etcdServer != nil { etcdServer.Stop() } @@ -103,7 +109,7 @@ type EmbeddedETCD struct { // Start starts embedded ETCD. func (e *EmbeddedETCD) Start(peerAddress string) error { - dir, err := ioutil.TempDir("", "ETCD") + dir, err := os.MkdirTemp("", "ETCD") if err != nil { return err } @@ -134,5 +140,6 @@ func (e *EmbeddedETCD) Start(peerAddress string) error { // Stop stops the embedded ETCD & cleanups the tmp dir. func (e *EmbeddedETCD) Stop() { e.ETCD.Close() + e.ETCD.Server.Stop() os.RemoveAll(e.tmpDir) } diff --git a/lorry/engines/register/managers.go b/lorry/engines/register/managers.go index 662e47975d0..2b89f55d529 100644 --- a/lorry/engines/register/managers.go +++ b/lorry/engines/register/managers.go @@ -30,6 +30,7 @@ import ( ctrl "sigs.k8s.io/controller-runtime" "github.com/apecloud/kubeblocks/lorry/engines" + "github.com/apecloud/kubeblocks/lorry/engines/etcd" "github.com/apecloud/kubeblocks/lorry/engines/mongodb" "github.com/apecloud/kubeblocks/lorry/engines/mysql" "github.com/apecloud/kubeblocks/lorry/engines/polardbx" @@ -56,6 +57,7 @@ func init() { RegisterManagerNewFunc("mysql", "consensus", wesql.NewManager) RegisterManagerNewFunc("mysql", "replication", mysql.NewManager) RegisterManagerNewFunc("redis", "replication", redis.NewManager) + RegisterManagerNewFunc("etcd", "consensus", etcd.NewManager) RegisterManagerNewFunc("mongodb", "consensus", mongodb.NewManager) RegisterManagerNewFunc("polardbx", "consensus", polardbx.NewManager) RegisterManagerNewFunc("postgresql", "replication", officalpostgres.NewManager)