Skip to content

Commit

Permalink
K8s resolver for the gRPC client (#4190)
Browse files Browse the repository at this point in the history
Signed-off-by: Kevin Su <[email protected]>
  • Loading branch information
pingsutw authored Oct 19, 2023
1 parent e1bbd66 commit e7035a7
Show file tree
Hide file tree
Showing 5 changed files with 380 additions and 0 deletions.
4 changes: 4 additions & 0 deletions flytepropeller/pkg/controller/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ import (
"github.com/prometheus/client_golang/prometheus"
"google.golang.org/grpc"
_ "google.golang.org/grpc/balancer/roundrobin" //nolint
"google.golang.org/grpc/resolver"
apiextensionsclientset "k8s.io/apiextensions-apiserver/pkg/client/clientset/clientset"
apierrors "k8s.io/apimachinery/pkg/api/errors"
v1 "k8s.io/apimachinery/pkg/apis/meta/v1"
Expand Down Expand Up @@ -54,6 +55,7 @@ import (
"github.com/flyteorg/flyte/flytestdlib/logger"
"github.com/flyteorg/flyte/flytestdlib/promutils"
"github.com/flyteorg/flyte/flytestdlib/promutils/labeled"
k8sResolver "github.com/flyteorg/flyte/flytestdlib/resolver"
"github.com/flyteorg/flyte/flytestdlib/storage"
)

Expand Down Expand Up @@ -552,6 +554,8 @@ func StartController(ctx context.Context, cfg *config.Config, defaultNamespace s
return errors.Wrapf(err, "error building Kubernetes Clientset")
}

resolver.Register(k8sResolver.NewBuilder(ctx, kubeClient, k8sResolver.Schema))

flyteworkflowClient, err := clientset.NewForConfig(kubecfg)
if err != nil {
return errors.Wrapf(err, "error building FlyteWorkflow clientset")
Expand Down
7 changes: 7 additions & 0 deletions flytestdlib/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ require (
google.golang.org/grpc v1.56.1
google.golang.org/protobuf v1.30.0
gorm.io/gorm v1.22.4
gotest.tools v2.2.0+incompatible
k8s.io/api v0.20.2
k8s.io/apimachinery v0.20.2
k8s.io/client-go v0.0.0-20210217172142-7279fc64d847
Expand All @@ -54,6 +55,7 @@ require (
github.com/cespare/xxhash v1.1.0 // indirect
github.com/cespare/xxhash/v2 v2.2.0 // indirect
github.com/davecgh/go-spew v1.1.1 // indirect
github.com/evanphx/json-patch v4.9.0+incompatible // indirect
github.com/go-logr/logr v0.4.0 // indirect
github.com/gofrs/uuid v4.2.0+incompatible // indirect
github.com/gogo/protobuf v1.3.2 // indirect
Expand All @@ -64,6 +66,7 @@ require (
github.com/google/uuid v1.3.0 // indirect
github.com/googleapis/enterprise-certificate-proxy v0.2.3 // indirect
github.com/googleapis/gax-go/v2 v2.7.1 // indirect
github.com/googleapis/gnostic v0.4.1 // indirect
github.com/hashicorp/hcl v1.0.0 // indirect
github.com/inconshreveable/mousetrap v1.0.0 // indirect
github.com/jmespath/go-jmespath v0.4.0 // indirect
Expand Down Expand Up @@ -92,6 +95,7 @@ require (
golang.org/x/net v0.9.0 // indirect
golang.org/x/oauth2 v0.7.0 // indirect
golang.org/x/sys v0.7.0 // indirect
golang.org/x/term v0.7.0 // indirect
golang.org/x/text v0.9.0 // indirect
golang.org/x/xerrors v0.0.0-20220907171357-04be3eba64a2 // indirect
google.golang.org/api v0.114.0 // indirect
Expand All @@ -102,7 +106,10 @@ require (
gopkg.in/yaml.v2 v2.4.0 // indirect
gopkg.in/yaml.v3 v3.0.1 // indirect
k8s.io/klog/v2 v2.5.0 // indirect
k8s.io/kube-openapi v0.0.0-20201113171705-d219536bb9fd // indirect
k8s.io/utils v0.0.0-20201110183641-67b214c5f920 // indirect
sigs.k8s.io/structured-merge-diff/v4 v4.0.3 // indirect
sigs.k8s.io/yaml v1.2.0 // indirect
)

replace (
Expand Down
13 changes: 13 additions & 0 deletions flytestdlib/go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -133,6 +133,7 @@ github.com/envoyproxy/go-control-plane v0.9.9-0.20201210154907-fd9021fe5dad/go.m
github.com/envoyproxy/protoc-gen-validate v0.1.0/go.mod h1:iSmxcyjqTsJpI2R4NaDN7+kN2VEUnK/pcBlmesArF7c=
github.com/ernesto-jimenez/gogen v0.0.0-20180125220232-d7d4131e6607 h1:cTavhURetDkezJCvxFggiyLeP40Mrk/TtVg2+ycw1Es=
github.com/ernesto-jimenez/gogen v0.0.0-20180125220232-d7d4131e6607/go.mod h1:Cg4fM0vhYWOZdgM7RIOSTRNIc8/VT7CXClC3Ni86lu4=
github.com/evanphx/json-patch v4.9.0+incompatible h1:kLcOMZeuLAJvL2BPWLMIj5oaZQobrkAqrL+WFZwQses=
github.com/evanphx/json-patch v4.9.0+incompatible/go.mod h1:50XU6AFN0ol/bzJsmQLiYLvXMP4fmwYFNcr97nuDLSk=
github.com/fatih/color v1.13.0 h1:8LOYc1KYPPmyKMuN8QV2DNRWNbLo6LZ0iLs8+mlH53w=
github.com/fatih/color v1.13.0/go.mod h1:kLAiJbzzSOZDVNGyDpeOxJ47H46qBXwg5ILebYFFOfk=
Expand Down Expand Up @@ -257,6 +258,7 @@ github.com/googleapis/gax-go/v2 v2.0.4/go.mod h1:0Wqv26UfaUD9n4G6kQubkQ+KchISgw+
github.com/googleapis/gax-go/v2 v2.0.5/go.mod h1:DWXyrwAJ9X0FpwwEdw+IPEYBICEFu5mhpdKc/us6bOk=
github.com/googleapis/gax-go/v2 v2.7.1 h1:gF4c0zjUP2H/s/hEGyLA3I0fA2ZWjzYiONAD6cvPr8A=
github.com/googleapis/gax-go/v2 v2.7.1/go.mod h1:4orTrqY6hXxxaUL4LHIPl6lGo8vAE38/qKbhSAKP6QI=
github.com/googleapis/gnostic v0.4.1 h1:DLJCy1n/vrD4HPjOvYcT8aYQXpPIzoRZONaYwyycI+I=
github.com/googleapis/gnostic v0.4.1/go.mod h1:LRhVm6pbyptWbWbuZ38d1eyptfvIytN3ir6b65WBswg=
github.com/googleapis/google-cloud-go-testing v0.0.0-20200911160855-bcd43fbb19e8/go.mod h1:dvDLG8qkwmyD9a/MJJN3XJcT3xFxOKAvTZGvuZmac9g=
github.com/gorilla/websocket v1.4.2/go.mod h1:YR8l580nyteQvAITg2hZ9XVh4b55+EU/adAjf1fMHhE=
Expand All @@ -267,6 +269,7 @@ github.com/hashicorp/golang-lru v0.5.4 h1:YDjusn29QI/Das2iO9M0BHnIbxPeyuCHsjMW+l
github.com/hashicorp/golang-lru v0.5.4/go.mod h1:iADmTwqILo4mZ8BN3D2Q6+9jd8WM5uGBxy+E8yxSoD4=
github.com/hashicorp/hcl v1.0.0 h1:0Anlzjpi4vEasTeNFn2mLJgTSwt0+6sfsiTG8qcWGx4=
github.com/hashicorp/hcl v1.0.0/go.mod h1:E5yfLk+7swimpb2L/Alb/PJmXilQ/rhwaUYs4T20WEQ=
github.com/hpcloud/tail v1.0.0 h1:nfCOvKYfkgYP8hkirhJocXT2+zOD8yUNjXaWfTlyFKI=
github.com/hpcloud/tail v1.0.0/go.mod h1:ab1qPbhIpdTxEkNHXyeSf5vhxWSCs/tWer42PpOxQnU=
github.com/ianlancetaylor/demangle v0.0.0-20181102032728-5e5cf60278f6/go.mod h1:aSSvb/t6k1mPoxDqO4vJh6VOCGPwU4O0C2/Eqndh1Sc=
github.com/ianlancetaylor/demangle v0.0.0-20200824232613-28f6c0f3b639/go.mod h1:aSSvb/t6k1mPoxDqO4vJh6VOCGPwU4O0C2/Eqndh1Sc=
Expand Down Expand Up @@ -338,8 +341,10 @@ github.com/ncw/swift v1.0.53 h1:luHjjTNtekIEvHg5KdAFIBaH7bWfNkefwFnpDffSIks=
github.com/ncw/swift v1.0.53/go.mod h1:23YIA4yWVnGwv2dQlN4bB7egfYX6YLn0Yo/S6zZO/ZM=
github.com/onsi/ginkgo v0.0.0-20170829012221-11459a886d9c/go.mod h1:lLunBs/Ym6LB5Z9jYTR76FiuTmxDTDusOGeTQH+WWjE=
github.com/onsi/ginkgo v1.6.0/go.mod h1:lLunBs/Ym6LB5Z9jYTR76FiuTmxDTDusOGeTQH+WWjE=
github.com/onsi/ginkgo v1.11.0 h1:JAKSXpt1YjtLA7YpPiqO9ss6sNXEsPfSGdwN0UHqzrw=
github.com/onsi/ginkgo v1.11.0/go.mod h1:lLunBs/Ym6LB5Z9jYTR76FiuTmxDTDusOGeTQH+WWjE=
github.com/onsi/gomega v0.0.0-20170829124025-dcabb60a477c/go.mod h1:C1qb7wdrVGGVU+Z6iS04AVkA3Q65CEZX59MT0QO5uiA=
github.com/onsi/gomega v1.7.0 h1:XPnZz8VVBHjVsy1vzJmRwIcSwiUO+JFfrv/xGiigmME=
github.com/onsi/gomega v1.7.0/go.mod h1:ex+gbHU/CVuBBDIJjb2X0qEXbFg53c61hWP/1CpauHY=
github.com/pelletier/go-toml v1.9.4 h1:tjENF6MfZAg8e4ZmZTeWaWiT2vXtsoO6+iuOjFhECwM=
github.com/pelletier/go-toml v1.9.4/go.mod h1:u1nR/EPcESfeI/szUZKdtJ0xRNbUoANCkoOuaOx1Y+c=
Expand Down Expand Up @@ -601,6 +606,8 @@ golang.org/x/sys v0.7.0 h1:3jlCCIQZPdOYu1h8BkNvLz8Kgwtae2cagcG/VamtZRU=
golang.org/x/sys v0.7.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1/go.mod h1:bj7SfCRtBDWHUb9snDiAeCFNEtKQo2Wmx5Cou7ajbmo=
golang.org/x/term v0.0.0-20210927222741-03fcf44c2211/go.mod h1:jbD1KX2456YbFQfuXm/mYQcufACuNUgVhRMnK/tPxf8=
golang.org/x/term v0.7.0 h1:BEvjmm5fURWqcfbSKTdpkDXYBrUS1c0m8agp14W48vQ=
golang.org/x/term v0.7.0/go.mod h1:P32HKFT3hSsZrRxla30E9HqToFYAQPCMs/zFMBUFqPY=
golang.org/x/text v0.0.0-20170915032832-14c0d48ead0c/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ=
golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ=
golang.org/x/text v0.3.1-0.20180807135948-17ff2d5776d2/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ=
Expand Down Expand Up @@ -781,11 +788,13 @@ gopkg.in/check.v1 v1.0.0-20180628173108-788fd7840127/go.mod h1:Co6ibVJAznAaIkqp8
gopkg.in/check.v1 v1.0.0-20190902080502-41f04d3bba15 h1:YR8cESwS4TdDjEe65xsg0ogRM/Nc3DYOhEAlW+xobZo=
gopkg.in/check.v1 v1.0.0-20190902080502-41f04d3bba15/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
gopkg.in/errgo.v2 v2.1.0/go.mod h1:hNsd1EY+bozCKY1Ytp96fpM3vjJbqLJn88ws8XvfDNI=
gopkg.in/fsnotify.v1 v1.4.7 h1:xOHLXZwVvI9hhs+cLKq5+I5onOuwQLhQwiu63xxlHs4=
gopkg.in/fsnotify.v1 v1.4.7/go.mod h1:Tz8NjZHkW78fSQdbUxIjBTcgA1z1m8ZHf0WmKUhAMys=
gopkg.in/inf.v0 v0.9.1 h1:73M5CoZyi3ZLMOyDlQh031Cx6N9NDJ2Vvfl76EDAgDc=
gopkg.in/inf.v0 v0.9.1/go.mod h1:cWUDdTG/fYaXco+Dcufb5Vnc6Gp2YChqWtbxRZE0mXw=
gopkg.in/ini.v1 v1.66.4 h1:SsAcf+mM7mRZo2nJNGt8mZCjG8ZRaNGMURJw7BsIST4=
gopkg.in/ini.v1 v1.66.4/go.mod h1:pNLf8WUiyNEtQjuu5G5vTm06TEv9tsIgeAvK8hOrP4k=
gopkg.in/tomb.v1 v1.0.0-20141024135613-dd632973f1e7 h1:uRGJdciOHaEIrze2W8Q3AKkepLTh2hOroT7a+7czfdQ=
gopkg.in/tomb.v1 v1.0.0-20141024135613-dd632973f1e7/go.mod h1:dt/ZhP58zS4L8KSrWDmTeBkI65Dw0HsyUHuEVlX15mw=
gopkg.in/yaml.v2 v2.2.1/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI=
gopkg.in/yaml.v2 v2.2.2/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI=
Expand All @@ -800,6 +809,8 @@ gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA=
gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
gorm.io/gorm v1.22.4 h1:8aPcyEJhY0MAt8aY6Dc524Pn+pO29K+ydu+e/cXSpQM=
gorm.io/gorm v1.22.4/go.mod h1:1aeVC+pe9ZmvKZban/gW4QPra7PRoTEssyc922qCAkk=
gotest.tools v2.2.0+incompatible h1:VsBPFP1AI068pPrMxtb/S8Zkgf9xEmTLJjfM+P5UIEo=
gotest.tools v2.2.0+incompatible/go.mod h1:DsYFclhRJ6vuDpmuTbkuFWG+y2sxOXAzmJt81HFBacw=
honnef.co/go/tools v0.0.0-20190102054323-c2f93a96b099/go.mod h1:rf3lG4BRIbNafJWhAfAdb/ePZxsR/4RtNHQocxwk9r4=
honnef.co/go/tools v0.0.0-20190106161140-3f1c8253044a/go.mod h1:rf3lG4BRIbNafJWhAfAdb/ePZxsR/4RtNHQocxwk9r4=
honnef.co/go/tools v0.0.0-20190418001031-e561f6794a2a/go.mod h1:rf3lG4BRIbNafJWhAfAdb/ePZxsR/4RtNHQocxwk9r4=
Expand All @@ -820,7 +831,9 @@ k8s.io/klog/v2 v2.0.0/go.mod h1:PBfzABfn139FHAV07az/IF9Wp1bkk3vpT2XSJ76fSDE=
k8s.io/klog/v2 v2.4.0/go.mod h1:Od+F08eJP+W3HUb4pSrPpgp9DGU4GzlpG/TmITuYh/Y=
k8s.io/klog/v2 v2.5.0 h1:8mOnjf1RmUPW6KRqQCfYSZq/K20Unmp3IhuZUhxl8KI=
k8s.io/klog/v2 v2.5.0/go.mod h1:hy9LJ/NvuK+iVyP4Ehqva4HxZG/oXyIS3n3Jmire4Ec=
k8s.io/kube-openapi v0.0.0-20201113171705-d219536bb9fd h1:sOHNzJIkytDF6qadMNKhhDRpc6ODik8lVC6nOur7B2c=
k8s.io/kube-openapi v0.0.0-20201113171705-d219536bb9fd/go.mod h1:WOJ3KddDSol4tAGcJo0Tvi+dK12EcqSLqcWsryKMpfM=
k8s.io/utils v0.0.0-20201110183641-67b214c5f920 h1:CbnUZsM497iRC5QMVkHwyl8s2tB3g7yaSHkYPkpgelw=
k8s.io/utils v0.0.0-20201110183641-67b214c5f920/go.mod h1:jPW/WVKK9YHAvNhRxK0md/EJ228hCsBRufyofKtW8HA=
rsc.io/binaryregexp v0.2.0/go.mod h1:qTv7/COck+e2FymRvadv62gMdZztPaShugOCi3I+8D8=
rsc.io/quote/v3 v3.1.0/go.mod h1:yEA65RcK8LyAZtP9Kv3t0HmxON59tX3rD+tICJqUlj0=
Expand Down
179 changes: 179 additions & 0 deletions flytestdlib/resolver/k8s_resolver.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,179 @@
package resolver

import (
"context"
"fmt"
"net"
"strings"
"sync"

"google.golang.org/grpc/grpclog"
"google.golang.org/grpc/resolver"
v1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/client-go/kubernetes"

"github.com/flyteorg/flyte/flytestdlib/logger"
)

const (
Schema = "k8s"
)

type targetInfo struct {
serviceName string
serviceNamespace string
port string
}

func (t targetInfo) String() string {
return fmt.Sprintf("%s:///%s.%s:%s", Schema, t.serviceNamespace, t.serviceName, t.port)
}

// NewBuilder creates a kubeBuilder which is used by grpc resolver.
func NewBuilder(ctx context.Context, client kubernetes.Interface, schema string) resolver.Builder {
return &kubeBuilder{
ctx: ctx,
k8sClient: client,
schema: schema,
}
}

type kubeBuilder struct {
ctx context.Context
k8sClient kubernetes.Interface
schema string
}

func splitServicePortNamespace(hpn string) (service, port, namespace string) {
service = hpn

colon := strings.LastIndexByte(service, ':')
if colon != -1 {
service, port = service[:colon], service[colon+1:]
}

parts := strings.SplitN(service, ".", 3)
if len(parts) >= 2 {
service, namespace = parts[0], parts[1]
}

return
}

func parseResolverTarget(target resolver.Target) (targetInfo, error) {
var service, port, namespace string
if target.URL.Host == "" {
// k8s:///service.namespace:port
service, port, namespace = splitServicePortNamespace(target.Endpoint())
} else if target.URL.Port() == "" && target.Endpoint() != "" {
// k8s://namespace/service:port
service, port, _ = splitServicePortNamespace(target.Endpoint())
namespace = target.URL.Hostname()
} else {
// k8s://service.namespace:port
service, port, namespace = splitServicePortNamespace(target.URL.Host)
}

if service == "" {
return targetInfo{}, fmt.Errorf("target %s must specify a service", &target.URL)
}

return targetInfo{
serviceName: service,
serviceNamespace: namespace,
port: port,
}, nil
}

// Build creates a new resolver for the given target, e.g. k8s:///flyteagent:flyte:8000.
func (b *kubeBuilder) Build(target resolver.Target, cc resolver.ClientConn, opts resolver.BuildOptions) (resolver.Resolver, error) {
ti, err := parseResolverTarget(target)
if err != nil {
return nil, err
}

ctx, cancel := context.WithCancel(b.ctx)
r := &kResolver{
target: ti,
ctx: ctx,
cancel: cancel,
cc: cc,
k8sClient: b.k8sClient,
}
go wait.Until(r.run, 0, ctx.Done())
return r, nil
}

// Scheme returns the scheme supported by this resolver.
// Scheme is defined at https://github.com/grpc/grpc/blob/master/doc/naming.md.
func (b *kubeBuilder) Scheme() string {
return b.schema
}

type kResolver struct {
target targetInfo
ctx context.Context
cancel context.CancelFunc
cc resolver.ClientConn
k8sClient kubernetes.Interface
// wg is used to enforce Close() to return after the watcher() goroutine has finished.
wg sync.WaitGroup
}

// ResolveNow is a no-op at this point.
func (k *kResolver) ResolveNow(resolver.ResolveNowOptions) {}

// Close closes the resolver.
func (k *kResolver) Close() {
k.cancel()
k.wg.Wait()
logger.Infof(k.ctx, "k8s resolver: closed")
}

func (k *kResolver) resolve(e *v1.Endpoints) {
var newAddrs []resolver.Address
for _, subset := range e.Subsets {
port := k.target.port

for _, address := range subset.Addresses {
newAddrs = append(newAddrs, resolver.Address{
Addr: net.JoinHostPort(address.IP, port),
ServerName: fmt.Sprintf("%s.%s", k.target.serviceName, k.target.serviceNamespace),
Metadata: nil,
})
}
}
err := k.cc.UpdateState(resolver.State{Addresses: newAddrs})
if err != nil {
grpclog.Errorf("k8s resolver: failed : %v", err)
}
}

func (k *kResolver) run() {
k.wg.Add(1)
defer k.wg.Done()
logger.Infof(k.ctx, "Starting k8s resolver for target: %s", k.target)
watcher, err := k.k8sClient.CoreV1().Endpoints(k.target.serviceNamespace).Watch(k.ctx, metav1.ListOptions{FieldSelector: "metadata.name=" + k.target.serviceName})
if err != nil {
grpclog.Errorf("k8s resolver: failed to create watcher: %v", err)
return
}

for {
select {
case <-k.ctx.Done():
return
case event, ok := <-watcher.ResultChan():
if !ok {
logger.Debugf(k.ctx, "k8s resolver: watcher closed")
return
}
if event.Object == nil {
continue
}
k.resolve(event.Object.(*v1.Endpoints))
}
}
}
Loading

0 comments on commit e7035a7

Please sign in to comment.