Skip to content

Commit

Permalink
switch to grpc balancer (#11)
Browse files Browse the repository at this point in the history
  • Loading branch information
majst01 authored Sep 14, 2021
1 parent 218c686 commit 68eccab
Show file tree
Hide file tree
Showing 5 changed files with 30 additions and 209 deletions.
92 changes: 21 additions & 71 deletions client.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,21 +8,20 @@ import (
"io/ioutil"
"math/rand"
"strconv"
"sync"
"time"

"github.com/gogo/status"
grpc_middleware "github.com/grpc-ecosystem/go-grpc-middleware"
"go.uber.org/zap"
"go.uber.org/zap/zapcore"

grpc_zap "github.com/grpc-ecosystem/go-grpc-middleware/logging/zap"
"google.golang.org/grpc"
"google.golang.org/grpc/backoff"
"google.golang.org/grpc/balancer/roundrobin"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/credentials"
"google.golang.org/grpc/keepalive"
"google.golang.org/grpc/peer"
"google.golang.org/grpc/status"

durosv2 "github.com/metal-stack/duros-go/api/duros/v2"
)
Expand All @@ -40,22 +39,15 @@ const (
GRPC GRPCScheme = "grpc"
// GRPCS defines https protocol for the communication
GRPCS GRPCScheme = "grpcs"

defaultUserAgent = "duros-go"
)

// client for the duros grpc endpoint
type client struct {
eps EPs
conn *grpc.ClientConn
DurosClient durosv2.DurosAPIClient

id string
tgts string // cached string repr of `eps`
eps EPs
conn *grpc.ClientConn
log *zap.SugaredLogger

// peerMu protects all peer-related fields:
peerMu sync.Mutex
lastPeer peer.Peer
switched bool // a matter of aesthetics: 1st conn shouldn't warn
}

// DialConfig is the configuration to create a duros-api connection
Expand All @@ -66,6 +58,8 @@ type DialConfig struct {
Credentials *Credentials
ByteCredentials *ByteCredentials
Log *zap.SugaredLogger
// UserAgent to use, if empty duros-go is used
UserAgent string
}

// Credentials specify the TLS Certificate based authentication for the grpc connection
Expand Down Expand Up @@ -115,24 +109,26 @@ func Dial(ctx context.Context, config DialConfig) (durosv2.DurosAPIClient, error
}
log := config.Log

ua := defaultUserAgent
if config.UserAgent != "" {
ua = config.UserAgent
}

log.Infow("connecting...",
"client", "duros-go",
"client", ua,
"targets", config.Endpoints,
"client-id", id,
)

res := &client{
eps: config.Endpoints.clone(),
id: id,
tgts: config.Endpoints.String(),
log: log,
eps: config.Endpoints.clone(),
log: log,
}

zapOpts := []grpc_zap.Option{
grpc_zap.WithLevels(grpcToZapLevel),
}
interceptors := []grpc.UnaryClientInterceptor{
mkUnaryClientInterceptor(res),
grpc_zap.UnaryClientInterceptor(log.Desugar(), zapOpts...),
grpc_zap.PayloadUnaryClientInterceptor(log.Desugar(),
func(context.Context, string) bool { return true },
Expand Down Expand Up @@ -164,18 +160,15 @@ func Dial(ctx context.Context, config DialConfig) (durosv2.DurosAPIClient, error
MinConnectTimeout: 6 * time.Second,
}

scheme := "lightos-" + id
lbr := newLbResolver(log, scheme, res.eps)

opts := []grpc.DialOption{
grpc.WithBlock(),
grpc.WithDisableRetry(),
grpc.WithUserAgent("duros-go"), // TODO enable setting this by client
grpc.WithUserAgent(ua),
grpc.WithDefaultCallOptions(grpc.WaitForReady(true)),
grpc.WithUnaryInterceptor(grpc_middleware.ChainUnaryClient(interceptors...)),
grpc.WithKeepaliveParams(kal),
grpc.WithConnectParams(cp),
grpc.WithResolvers(lbr),
grpc.WithDefaultServiceConfig(fmt.Sprintf(`{"loadBalancingPolicy":"%s"}`, roundrobin.Name)),
grpc.WithPerRPCCredentials(tokenAuth{
token: config.Token,
}),
Expand Down Expand Up @@ -204,17 +197,17 @@ func Dial(ctx context.Context, config DialConfig) (durosv2.DurosAPIClient, error
opts = append(opts, grpc.WithTransportCredentials(credentials.NewTLS(&tls.Config{InsecureSkipVerify: true})))
}
default:
return nil, fmt.Errorf("unsupported scheme:%v", scheme)
return nil, fmt.Errorf("unsupported scheme:%v", config.Scheme)
}

var err error
res.conn, err = grpc.DialContext(
ctx,
scheme+":///lb-resolver", // use our resolver instead of explicit target
config.Endpoints.String(),
opts...,
)
if err != nil {
log.Errorw("failed to connect", "error", err.Error())
log.Errorw("failed to connect", "endpoints", config.Endpoints.String(), "error", err.Error())
return nil, err
}

Expand All @@ -236,49 +229,6 @@ func (tokenAuth) RequireTransportSecurity() bool {
return true
}

// TODO: add stream interceptor *IF* LB API adds streaming entrypoints...
func mkUnaryClientInterceptor(c *client) grpc.UnaryClientInterceptor {
return func(ctx context.Context, method string, req, rep interface{}, cc *grpc.ClientConn,
invoker grpc.UnaryInvoker, opts ...grpc.CallOption,
) error {
return c.peerReviewUnaryInterceptor(ctx, method, req, rep, cc, invoker, opts...)
}
}

func (c *client) peerReviewUnaryInterceptor( // sic!
ctx context.Context, method string, req, rep interface{}, cc *grpc.ClientConn,
invoker grpc.UnaryInvoker, opts ...grpc.CallOption,
) error {
var currPeer peer.Peer
opts = append(opts, grpc.Peer(&currPeer))
err := invoker(ctx, method, req, rep, cc, opts...)
c.peerMu.Lock()
if currPeer.Addr != c.lastPeer.Addr {
// TODO: introduce rate-limiter to spare logs and perf!
lastPeer := c.lastPeer
c.lastPeer = currPeer
c.peerMu.Unlock()
curr := "<NONE>"
if currPeer.Addr != nil {
curr = currPeer.Addr.String()
}
last := "<NONE>"
if lastPeer.Addr != nil {
last = lastPeer.Addr.String()
}
// don't want to warn on healthy flow...
if c.switched {
c.log.Warnf("switched target: %s -> %s", last, curr)
} else {
c.switched = true
c.log.Infof("switched target: %s -> %s", last, curr)
}
} else {
c.peerMu.Unlock()
}
return err
}

func grpcToZapLevel(code codes.Code) zapcore.Level {
switch code {
case codes.OK,
Expand Down
1 change: 1 addition & 0 deletions cmd/cli/cli.go
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,7 @@ func main() {
Scheme: grpcScheme,
Token: token,
Log: zlog.Sugar(),
UserAgent: "duros-go-cli",
}
if caFile != "" && certFile != "" && keyFile != "" && serverName != "" {
creds := &duros.Credentials{
Expand Down
5 changes: 2 additions & 3 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -3,14 +3,13 @@ module github.com/metal-stack/duros-go
go 1.16

require (
github.com/gogo/status v1.1.0
github.com/golang-jwt/jwt/v4 v4.0.0
github.com/google/uuid v1.3.0
github.com/grpc-ecosystem/go-grpc-middleware v1.3.0
github.com/grpc-ecosystem/grpc-gateway v1.16.0
github.com/stretchr/testify v1.7.0
go.uber.org/zap v1.19.0
google.golang.org/genproto v0.0.0-20210824181836-a4879c3d0e89
go.uber.org/zap v1.19.1
google.golang.org/genproto v0.0.0-20210909211513-a8c4777a87af
google.golang.org/grpc v1.40.0
google.golang.org/protobuf v1.27.1
gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c // indirect
Expand Down
23 changes: 6 additions & 17 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -24,13 +24,8 @@ github.com/ghodss/yaml v1.0.0/go.mod h1:4dBDuWmgqj2HViK6kFavaiC9ZROes6MMH2rRYeME
github.com/go-kit/kit v0.9.0/go.mod h1:xBxKIO96dXMWWy0MnWVtmwkA9/13aqxPnvrjFYMA2as=
github.com/go-logfmt/logfmt v0.4.0/go.mod h1:3RMwSq7FuexP4Kalkev3ejPJsZTpXXBr9+V4qmtdjCk=
github.com/go-stack/stack v1.8.0/go.mod h1:v0f6uXyyMGvRgIKkXu+yp6POWl0qKG85gN/melR3HDY=
github.com/gogo/googleapis v0.0.0-20180223154316-0cd9801be74a h1:dR8+Q0uO5S2ZBcs2IH6VBKYwSxPo2vYCYq0ot0mu7xA=
github.com/gogo/googleapis v0.0.0-20180223154316-0cd9801be74a/go.mod h1:gf4bu3Q80BeJ6H1S1vYPm8/ELATdvryBaNFGgqEef3s=
github.com/gogo/protobuf v1.2.0/go.mod h1:r8qH/GZQm5c6nD/R0oafs1akxWv10x8SbQlK7atdtwQ=
github.com/gogo/protobuf v1.3.2 h1:Ov1cvc58UF3b5XjBnZv7+opcTcQFZebYjWzi34vdm4Q=
github.com/gogo/protobuf v1.3.2/go.mod h1:P1XiOD3dCwIKUDQYPy72D8LYyHL2YPYrpS2s69NZV8Q=
github.com/gogo/status v1.1.0 h1:+eIkrewn5q6b30y+g/BJINVVdi2xH7je5MPJ3ZPK3JA=
github.com/gogo/status v1.1.0/go.mod h1:BFv9nrluPLmrS0EmGVvLaPNmRosr9KapBYd5/hpY1WM=
github.com/golang-jwt/jwt/v4 v4.0.0 h1:RAqyYixv1p7uEnocuy8P1nru5wprCh/MH2BIlW5z5/o=
github.com/golang-jwt/jwt/v4 v4.0.0/go.mod h1:/xlHOz8bRuivTWchD4jCa+NbatV+wEUSzwAxVc6locg=
github.com/golang/glog v0.0.0-20160126235308-23def4e6c14b/go.mod h1:SBH7ygxi8pfUlaOkMMuAQtPIUF8ecWP5IEl/CR7VP2Q=
Expand Down Expand Up @@ -98,14 +93,14 @@ go.opentelemetry.io/proto/otlp v0.7.0/go.mod h1:PqfVotwruBrMGOCsRd/89rSnXhoiJIqe
go.uber.org/atomic v1.4.0/go.mod h1:gD2HeocX3+yG+ygLZcrzQJaqmWj9AIm7n08wl/qW/PE=
go.uber.org/atomic v1.7.0 h1:ADUqmZGgLDDfbSL9ZmPxKTybcoEYHgpYfELNoN+7hsw=
go.uber.org/atomic v1.7.0/go.mod h1:fEN4uk6kAWBTFdckzkM89CLk9XfWZrxpCo0nPH17wJc=
go.uber.org/goleak v1.1.10 h1:z+mqJhf6ss6BSfSM671tgKyZBFPTTJM+HLxnhPC3wu0=
go.uber.org/goleak v1.1.10/go.mod h1:8a7PlsEVH3e/a/GLqe5IIrQx6GzcnRmZEufDUTk4A7A=
go.uber.org/goleak v1.1.11-0.20210813005559-691160354723 h1:sHOAIxRGBp443oHZIPB+HsUGaksVCXVQENPxwTfQdH4=
go.uber.org/goleak v1.1.11-0.20210813005559-691160354723/go.mod h1:cwTWslyiVhfpKIDGSZEM2HlOvcqm+tG4zioyIeLoqMQ=
go.uber.org/multierr v1.1.0/go.mod h1:wR5kodmAFQ0UK8QlbwjlSNy0Z68gJhDJUG5sjR94q/0=
go.uber.org/multierr v1.6.0 h1:y6IPFStTAIT5Ytl7/XYmHvzXQ7S3g/IeZW9hyZ5thw4=
go.uber.org/multierr v1.6.0/go.mod h1:cdWPpRnG4AhwMwsgIHip0KRBQjJy5kYEpYjJxpXp9iU=
go.uber.org/zap v1.10.0/go.mod h1:vwi/ZaCAaUcBkycHslxD9B2zi4UTXhF60s6SWpuDF0Q=
go.uber.org/zap v1.19.0 h1:mZQZefskPPCMIBCSEH0v2/iUqqLrYtaeqwD6FUGUnFE=
go.uber.org/zap v1.19.0/go.mod h1:xg/QME4nWcxGxrpdeYfq7UvYrLh66cuVKdrbD1XF/NI=
go.uber.org/zap v1.19.1 h1:ue41HOKd1vGURxrmeKIgELGb3jPW9DMUDGtsinblHwI=
go.uber.org/zap v1.19.1/go.mod h1:j3DNczoxDZroyBnOT1L/Q79cfUMGZxlv/9dzN7SM1rI=
golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w=
golang.org/x/crypto v0.0.0-20191011191535-87dc89f01550/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI=
golang.org/x/crypto v0.0.0-20200622213623-75b288015ac9/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto=
Expand All @@ -114,7 +109,6 @@ golang.org/x/lint v0.0.0-20181026193005-c67002cb31c3/go.mod h1:UVdnD1Gm6xHRNCYTk
golang.org/x/lint v0.0.0-20190227174305-5b3e6a55c961/go.mod h1:wehouNa3lNwaWXcvxsM5YxQ5yQlVC4a0KAMCusXpPoU=
golang.org/x/lint v0.0.0-20190313153728-d0100b6bd8b3/go.mod h1:6SW0HCj/g11FgYtHlgUYUwCkIfeOF89ocIRzGO/8vkc=
golang.org/x/lint v0.0.0-20190930215403-16217165b5de/go.mod h1:6SW0HCj/g11FgYtHlgUYUwCkIfeOF89ocIRzGO/8vkc=
golang.org/x/lint v0.0.0-20210508222113-6edffad5e616 h1:VLliZ0d+/avPrXXH+OakdXhpJuEoBZuwh1m2j7U6Iug=
golang.org/x/lint v0.0.0-20210508222113-6edffad5e616/go.mod h1:3xt1FjdF8hUf6vQPIChWIBhFzV8gjjsPE/fR3IyQdNY=
golang.org/x/mod v0.1.1-0.20191105210325-c90efee705ee/go.mod h1:QqPTAvyqsEbceGzBzNggFXnrqF1CaUcvgkdR5Ot7KZg=
golang.org/x/mod v0.2.0/go.mod h1:s0Qsj1ACt9ePp/hMypM3fl4fZqREWJwdYDEqhRiZZUA=
Expand All @@ -137,7 +131,6 @@ golang.org/x/oauth2 v0.0.0-20200107190931-bf48bf16ab8d/go.mod h1:gOpvHmFTYa4Iltr
golang.org/x/sync v0.0.0-20180314180146-1d60e4601c6f/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sync v0.0.0-20181108010431-42b317875d0f/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sync v0.0.0-20181221193216-37e7f081c4d4/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sync v0.0.0-20190227155943-e225da77a7e6/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sync v0.0.0-20190423024810-112230192c58/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sync v0.0.0-20190911185100-cd5d95a43a6e/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sync v0.0.0-20201020160332-67f06af15bc9/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
Expand All @@ -162,12 +155,10 @@ golang.org/x/tools v0.0.0-20190114222345-bf090417da8b/go.mod h1:n7NCudcB/nEzxVGm
golang.org/x/tools v0.0.0-20190226205152-f727befe758c/go.mod h1:9Yl7xja0Znq3iFh3HoIrodX9oNMXvdceNzlUR8zjMvY=
golang.org/x/tools v0.0.0-20190311212946-11955173bddd/go.mod h1:LCzVGOaR6xXOjkQ3onu1FJEFr0SW1gC7cKk1uF8kGRs=
golang.org/x/tools v0.0.0-20190524140312-2c0ae7006135/go.mod h1:RgjU9mgBXZiqYHBnxXauZ1Gv1EHHAz9KjViQ78xBX0Q=
golang.org/x/tools v0.0.0-20191108193012-7d206e10da11/go.mod h1:b+2E5dAYhXwXZwtnZ6UAqBI28+e2cm9otk0dWdXHAEo=
golang.org/x/tools v0.0.0-20191119224855-298f0cb1881e/go.mod h1:b+2E5dAYhXwXZwtnZ6UAqBI28+e2cm9otk0dWdXHAEo=
golang.org/x/tools v0.0.0-20200130002326-2f3ba24bd6e7/go.mod h1:TB2adYChydJhpapKDTa4BR/hXlZSLoq2Wpct/0txZ28=
golang.org/x/tools v0.0.0-20200619180055-7c47624df98f/go.mod h1:EkVYQZoAsY45+roYkvgYkIh4xh/qjgUK9TdY2XT94GE=
golang.org/x/tools v0.0.0-20210106214847-113979e3529a/go.mod h1:emZCQorbCU4vsT4fOWvOPXz4eW1wZW4PmDk9uLelYpA=
golang.org/x/tools v0.1.5 h1:ouewzE6p+/VEB31YYnTbEJdi8pFqKp4P4n85vwo3DHA=
golang.org/x/tools v0.1.5/go.mod h1:o0xws9oXOQQZyjljx8fwUC0k7L1pTE6eaCbjGeHmOkk=
golang.org/x/xerrors v0.0.0-20190717185122-a985d3407aa7/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
golang.org/x/xerrors v0.0.0-20191011141410-1b5146add898/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
Expand All @@ -176,15 +167,13 @@ golang.org/x/xerrors v0.0.0-20200804184101-5ec99f83aff1 h1:go1bK/D/BFZV2I8cIQd1N
golang.org/x/xerrors v0.0.0-20200804184101-5ec99f83aff1/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
google.golang.org/appengine v1.1.0/go.mod h1:EbEs0AVv82hx2wNQdGPgUI5lhzA/G0D9YwlJXL52JkM=
google.golang.org/appengine v1.4.0/go.mod h1:xpcJRLb0r/rnEns0DIKYYv+WjYCduHsrkT7/EB5XEv4=
google.golang.org/genproto v0.0.0-20180518175338-11a468237815/go.mod h1:JiN7NxoALGmiZfu7CAH4rXhgtRTLTxftemlI0sWmxmc=
google.golang.org/genproto v0.0.0-20180817151627-c66870c02cf8/go.mod h1:JiN7NxoALGmiZfu7CAH4rXhgtRTLTxftemlI0sWmxmc=
google.golang.org/genproto v0.0.0-20190819201941-24fa4b261c55/go.mod h1:DMBHOl98Agz4BDEuKkezgsaosCRResVns1a3J2ZsMNc=
google.golang.org/genproto v0.0.0-20200423170343-7949de9c1215/go.mod h1:55QSHmfGQM9UVYDPBsyGGes0y52j32PQ3BqQfXhyH3c=
google.golang.org/genproto v0.0.0-20200513103714-09dca8ec2884/go.mod h1:55QSHmfGQM9UVYDPBsyGGes0y52j32PQ3BqQfXhyH3c=
google.golang.org/genproto v0.0.0-20200526211855-cb27e3aa2013/go.mod h1:NbSheEEYHJ7i3ixzK3sjbqSGDJWnxyFXZblF3eUsNvo=
google.golang.org/genproto v0.0.0-20210824181836-a4879c3d0e89 h1:x1dY+qZWu7fKPOOo4mM9kMcUfVVlDvHreE17KGDho00=
google.golang.org/genproto v0.0.0-20210824181836-a4879c3d0e89/go.mod h1:eFjDcFEctNawg4eG61bRv87N7iHBWyVhJu7u1kqDUXY=
google.golang.org/grpc v1.12.0/go.mod h1:yo6s7OP7yaDglbqo1J04qKzAhqBH6lvTonzMVmEdcZw=
google.golang.org/genproto v0.0.0-20210909211513-a8c4777a87af h1:aLMMXFYqw01RA6XJim5uaN+afqNNjc9P8HPAbnpnc5s=
google.golang.org/genproto v0.0.0-20210909211513-a8c4777a87af/go.mod h1:eFjDcFEctNawg4eG61bRv87N7iHBWyVhJu7u1kqDUXY=
google.golang.org/grpc v1.19.0/go.mod h1:mqu4LbDTu4XGKhr4mRzUsmM4RtVoemTSY81AxZiDr8c=
google.golang.org/grpc v1.23.0/go.mod h1:Y5yQAOtifL1yxbo5wqy6BxZv8vAUGQwXBOALyacEbxg=
google.golang.org/grpc v1.25.1/go.mod h1:c3i+UQWmh7LiEpx4sFZnkU36qjEYZ0imhYfXVyQciAY=
Expand Down
Loading

0 comments on commit 68eccab

Please sign in to comment.