From 72a531e8b23afee97ab0c327875f114562023a45 Mon Sep 17 00:00:00 2001 From: Gyu-Ho Lee Date: Wed, 22 Feb 2017 14:10:09 -0800 Subject: [PATCH] grpcproxy: add 'register' address For https://github.com/coreos/etcd/issues/6902. --- proxy/grpcproxy/logger.go | 19 ++++++++ proxy/grpcproxy/register.go | 74 +++++++++++++++++++++++++++++++ proxy/grpcproxy/register_test.go | 76 ++++++++++++++++++++++++++++++++ 3 files changed, 169 insertions(+) create mode 100644 proxy/grpcproxy/logger.go create mode 100644 proxy/grpcproxy/register.go create mode 100644 proxy/grpcproxy/register_test.go diff --git a/proxy/grpcproxy/logger.go b/proxy/grpcproxy/logger.go new file mode 100644 index 00000000000..c2d81804395 --- /dev/null +++ b/proxy/grpcproxy/logger.go @@ -0,0 +1,19 @@ +// Copyright 2017 The etcd Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package grpcproxy + +import "github.com/coreos/pkg/capnslog" + +var plog = capnslog.NewPackageLogger("github.com/coreos/etcd", "grpcproxy") diff --git a/proxy/grpcproxy/register.go b/proxy/grpcproxy/register.go new file mode 100644 index 00000000000..31379b3bcae --- /dev/null +++ b/proxy/grpcproxy/register.go @@ -0,0 +1,74 @@ +// Copyright 2017 The etcd Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package grpcproxy + +import ( + "github.com/coreos/etcd/clientv3" + "github.com/coreos/etcd/clientv3/concurrency" + "github.com/coreos/etcd/clientv3/naming" + + "golang.org/x/time/rate" + gnaming "google.golang.org/grpc/naming" +) + +// allow maximum 1 retry per second +const registerRetryRate = 1 + +// register registers itself as a grpc-proxy server by writing prefixed-key +// with session of specified TTL (in seconds). The returned channel is closed +// when the client's context is canceled. +func register(c *clientv3.Client, prefix string, addr string, ttl int) <-chan struct{} { + rm := rate.NewLimiter(rate.Limit(registerRetryRate), registerRetryRate) + + donec := make(chan struct{}) + go func() { + defer close(donec) + + for rm.Wait(c.Ctx()) == nil { + ss, err := registerSession(c, prefix, addr, ttl) + if err != nil { + plog.Warningf("failed to create a session %v", err) + continue + } + select { + case <-c.Ctx().Done(): + ss.Close() + return + + case <-ss.Done(): + plog.Warning("session expired; possible network partition or server restart") + plog.Warning("creating a new session to rejoin") + continue + } + } + }() + + return donec +} + +func registerSession(c *clientv3.Client, prefix string, addr string, ttl int) (*concurrency.Session, error) { + ss, err := concurrency.NewSession(c, concurrency.WithTTL(ttl)) + if err != nil { + return nil, err + } + + gr := &naming.GRPCResolver{Client: c} + if err = gr.Update(c.Ctx(), prefix, gnaming.Update{Op: gnaming.Add, Addr: addr}, clientv3.WithLease(ss.Lease())); err != nil { + return nil, err + } + + plog.Infof("registered %q with %d-second lease", addr, ttl) + return ss, nil +} diff --git a/proxy/grpcproxy/register_test.go b/proxy/grpcproxy/register_test.go new file mode 100644 index 00000000000..6ce87a269ad --- /dev/null +++ b/proxy/grpcproxy/register_test.go @@ -0,0 +1,76 @@ +// Copyright 2017 The etcd Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package grpcproxy + +import ( + "testing" + "time" + + "github.com/coreos/etcd/clientv3" + "github.com/coreos/etcd/clientv3/naming" + "github.com/coreos/etcd/integration" + "github.com/coreos/etcd/pkg/testutil" + + gnaming "google.golang.org/grpc/naming" +) + +func Test_register(t *testing.T) { + defer testutil.AfterTest(t) + + clus := integration.NewClusterV3(t, &integration.ClusterConfig{Size: 1}) + defer clus.Terminate(t) + cli := clus.Client(0) + paddr := clus.Members[0].GRPCAddr() + + testPrefix := "test-name" + wa := createWatcher(t, cli, testPrefix) + ups, err := wa.Next() + if err != nil { + t.Fatal(err) + } + if len(ups) != 0 { + t.Fatalf("len(ups) expected 0, got %d (%v)", len(ups), ups) + } + + donec := register(cli, testPrefix, paddr, 5) + + ups, err = wa.Next() + if err != nil { + t.Fatal(err) + } + if len(ups) != 1 { + t.Fatalf("len(ups) expected 1, got %d (%v)", len(ups), ups) + } + if ups[0].Addr != paddr { + t.Fatalf("ups[0].Addr expected %q, got %q", paddr, ups[0].Addr) + } + + cli.Close() + clus.TakeClient(0) + select { + case <-donec: + case <-time.After(5 * time.Second): + t.Fatal("donec 'register' did not return in time") + } +} + +func createWatcher(t *testing.T, c *clientv3.Client, prefix string) gnaming.Watcher { + gr := &naming.GRPCResolver{Client: c} + watcher, err := gr.Resolve(prefix) + if err != nil { + t.Fatalf("failed to resolve %q (%v)", prefix, err) + } + return watcher +}