Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Added support for endpointslices #5798

Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 4 additions & 2 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -107,6 +107,8 @@ it will be removed; but as it won't be user-visible this isn't considered a brea
instead of the Mapping name, which could reduce the cache's effectiveness. This has been fixed so
that the correct key is used. ([Incorrect Cache Key for Mapping])

- Change: Updated Emissary-Ingress to use EndpointSlices instead of Endpoints to support more than 1000 Backends

[Incorrect Cache Key for Mapping]: https://github.com/emissary-ingress/emissary/issues/5714

## [3.9.0] November 13, 2023
Expand Down Expand Up @@ -401,7 +403,7 @@ it will be removed; but as it won't be user-visible this isn't considered a brea
releases, or a `Host` with or without a `TLSContext` as in prior 2.y releases.

- Bugfix: Prior releases of Emissary-ingress had the arbitrary limitation that a `TCPMapping` cannot
be used on the same port that HTTP is served on, even if TLS+SNI would make this possible.
be used on the same port that HTTP is served on, even if TLS+SNI would make this possible.
Emissary-ingress now allows `TCPMappings` to be used on the same `Listener` port as HTTP `Hosts`,
as long as that `Listener` terminates TLS.

Expand Down Expand Up @@ -567,7 +569,7 @@ it will be removed; but as it won't be user-visible this isn't considered a brea
releases, or a `Host` with or without a `TLSContext` as in prior 2.y releases.

- Bugfix: Prior releases of Emissary-ingress had the arbitrary limitation that a `TCPMapping` cannot
be used on the same port that HTTP is served on, even if TLS+SNI would make this possible.
be used on the same port that HTTP is served on, even if TLS+SNI would make this possible.
Emissary-ingress now allows `TCPMappings` to be used on the same `Listener` port as HTTP `Hosts`,
as long as that `Listener` terminates TLS.

Expand Down
88 changes: 82 additions & 6 deletions cmd/entrypoint/endpoint_routing.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,13 +20,33 @@ func makeEndpoints(ctx context.Context, ksnap *snapshot.KubernetesSnapshot, cons

result := map[string][]*ambex.Endpoint{}

for _, k8sEp := range ksnap.Endpoints {
svc, ok := k8sServices[key(k8sEp)]
if !ok {
continue
svcEndpointSlices := map[string][]*kates.EndpointSlice{}

// Collect all the EndpointSlices for each service if the "kubernetes.io/service-name" label is present
for _, k8sEndpointSlice := range ksnap.EndpointSlices {
if serviceName, labelExists := k8sEndpointSlice.Labels["kubernetes.io/service-name"]; labelExists {
svcKey := fmt.Sprintf("%s:%s", k8sEndpointSlice.Namespace, serviceName)
svcEndpointSlices[svcKey] = append(svcEndpointSlices[svcKey], k8sEndpointSlice)
}
for _, ep := range k8sEndpointsToAmbex(k8sEp, svc) {
result[ep.ClusterName] = append(result[ep.ClusterName], ep)
}

// Map each service to its corresponding endpoints from all its EndpointSlices, or fall back to Endpoints if needed
for svcKey, svc := range k8sServices {
if slices, ok := svcEndpointSlices[svcKey]; ok && len(slices) > 0 {
for _, slice := range slices {
for _, ep := range k8sEndpointSlicesToAmbex(slice, svc) {
result[ep.ClusterName] = append(result[ep.ClusterName], ep)
}
}
} else {
// Fallback to using Endpoints if no valid EndpointSlices are available
for _, k8sEp := range ksnap.Endpoints {
if key(k8sEp) == svcKey {
for _, ep := range k8sEndpointsToAmbex(k8sEp, svc) {
result[ep.ClusterName] = append(result[ep.ClusterName], ep)
}
}
}
}
}

Expand Down Expand Up @@ -97,6 +117,62 @@ func k8sEndpointsToAmbex(ep *kates.Endpoints, svc *kates.Service) (result []*amb
return
}

func k8sEndpointSlicesToAmbex(endpointSlice *kates.EndpointSlice, svc *kates.Service) (result []*ambex.Endpoint) {
portmap := map[string][]string{}
for _, p := range svc.Spec.Ports {
port := fmt.Sprintf("%d", p.Port)
targetPort := p.TargetPort.String()
if targetPort == "" {
targetPort = fmt.Sprintf("%d", p.Port)
}

portmap[targetPort] = append(portmap[targetPort], port)
if p.Name != "" {
portmap[targetPort] = append(portmap[targetPort], p.Name)
portmap[p.Name] = append(portmap[p.Name], port)
}
if len(svc.Spec.Ports) == 1 {
portmap[targetPort] = append(portmap[targetPort], "")
portmap[""] = append(portmap[""], port)
portmap[""] = append(portmap[""], "")
}
}

for _, endpoint := range endpointSlice.Endpoints {
for _, port := range endpointSlice.Ports {
if *port.Protocol == kates.ProtocolTCP || *port.Protocol == kates.ProtocolUDP {
portNames := map[string]bool{}
candidates := []string{fmt.Sprintf("%d", *port.Port), *port.Name, ""}
for _, c := range candidates {
if pns, ok := portmap[c]; ok {
for _, pn := range pns {
portNames[pn] = true
}
}
}
if *endpoint.Conditions.Ready {
for _, address := range endpoint.Addresses {
for pn := range portNames {
sep := "/"
if pn == "" {
sep = ""
}
result = append(result, &ambex.Endpoint{
ClusterName: fmt.Sprintf("k8s/%s/%s%s%s", svc.Namespace, svc.Name, sep, pn),
Ip: address,
Port: uint32(*port.Port),
Protocol: string(*port.Protocol),
})
}
}
}
}
}
}

return
}

func consulEndpointsToAmbex(ctx context.Context, endpoints consulwatch.Endpoints) (result []*ambex.Endpoint) {
for _, ep := range endpoints.Endpoints {
addrs, err := net.LookupHost(ep.Address)
Expand Down
90 changes: 84 additions & 6 deletions cmd/entrypoint/endpoint_routing_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,29 @@ import (
)

func TestEndpointRouting(t *testing.T) {
f := entrypoint.RunFake(t, entrypoint.FakeConfig{EnvoyConfig: true}, nil)
// Create Mapping, Service, and Endpoints resources to start.
assert.NoError(t, f.Upsert(makeMapping("default", "foo", "/foo", "foo", "endpoint")))
assert.NoError(t, f.Upsert(makeService("default", "foo")))
endpoint, port, err := makeSliceEndpoint(8080, "1.2.3.4")
require.NoError(t, err)
assert.NoError(t, f.Upsert(makeEndpointSlice("default", "foo", "foo", endpoint, port)))
f.Flush()
snap, err := f.GetSnapshot(HasMapping("default", "foo"))
require.NoError(t, err)
assert.NotNil(t, snap)

// Check that the endpoints resource we created at the start was properly propagated.
endpoints, err := f.GetEndpoints(HasEndpoints("k8s/default/foo"))
require.NoError(t, err)
assert.Equal(t, "1.2.3.4", endpoints.Entries["k8s/default/foo"][0].Ip)
assert.Equal(t, uint32(8080), endpoints.Entries["k8s/default/foo"][0].Port)
assert.Contains(t, endpoints.Entries, "k8s/default/foo/80")
assert.Equal(t, "1.2.3.4", endpoints.Entries["k8s/default/foo/80"][0].Ip)
assert.Equal(t, uint32(8080), endpoints.Entries["k8s/default/foo/80"][0].Port)
}

func TestEndpointRoutingWithNoEndpointSlices(t *testing.T) {
f := entrypoint.RunFake(t, entrypoint.FakeConfig{EnvoyConfig: true}, nil)
// Create Mapping, Service, and Endpoints resources to start.
assert.NoError(t, f.Upsert(makeMapping("default", "foo", "/foo", "foo", "endpoint")))
Expand Down Expand Up @@ -57,9 +80,9 @@ service: foo
resolver: endpoint`,
}
assert.NoError(t, f.Upsert(svc))
subset, err := makeSubset(8080, "1.2.3.4")
endpoint, port, err := makeSliceEndpoint(8080, "1.2.3.4")
require.NoError(t, err)
assert.NoError(t, f.Upsert(makeEndpoints("default", "foo", subset)))
assert.NoError(t, f.Upsert(makeEndpointSlice("default", "foo", "foo", endpoint, port)))
f.Flush()
snap, err := f.GetSnapshot(HasService("default", "foo"))
require.NoError(t, err)
Expand Down Expand Up @@ -97,9 +120,9 @@ func TestEndpointRoutingMultiplePorts(t *testing.T) {
},
},
}))
subset, err := makeSubset("cleartext", 8080, "encrypted", 8443, "1.2.3.4")
endpoint, port, err := makeSliceEndpoint("cleartext", 8080, "encrypted", 8443, "1.2.3.4")
require.NoError(t, err)
assert.NoError(t, f.Upsert(makeEndpoints("default", "foo", subset)))
assert.NoError(t, f.Upsert(makeEndpointSlice("default", "foo", "foo", endpoint, port)))
f.Flush()
snap, err := f.GetSnapshot(HasMapping("default", "foo"))
require.NoError(t, err)
Expand Down Expand Up @@ -155,9 +178,9 @@ func TestEndpointRoutingIP(t *testing.T) {
func TestEndpointRoutingMappingCreation(t *testing.T) {
f := entrypoint.RunFake(t, entrypoint.FakeConfig{}, nil)
assert.NoError(t, f.Upsert(makeService("default", "foo")))
subset, err := makeSubset(8080, "1.2.3.4")
endpoint, port, err := makeSliceEndpoint(8080, "1.2.3.4")
require.NoError(t, err)
assert.NoError(t, f.Upsert(makeEndpoints("default", "foo", subset)))
assert.NoError(t, f.Upsert(makeEndpointSlice("default", "foo", "foo", endpoint, port)))
f.Flush()
f.AssertEndpointsEmpty(timeout)
assert.NoError(t, f.UpsertYAML(`
Expand Down Expand Up @@ -275,3 +298,58 @@ func makeSubset(args ...interface{}) (kates.EndpointSubset, error) {

return kates.EndpointSubset{Addresses: addrs, Ports: ports}, nil
}

func makeEndpointSlice(namespace, name, serviceName string, endpoint []kates.Endpoint, port []kates.EndpointSlicePort) *kates.EndpointSlice {
return &kates.EndpointSlice{
TypeMeta: kates.TypeMeta{Kind: "EndpointSlices", APIVersion: "v1.discovery.k8s.io"},
ObjectMeta: kates.ObjectMeta{
Namespace: namespace,
Name: name,
Labels: map[string]string{
"kubernetes.io/service-name": serviceName,
},
},
Endpoints: endpoint,
Ports: port,
}
}

func makeSliceEndpoint(args ...interface{}) ([]kates.Endpoint, []kates.EndpointSlicePort, error) {
var endpoints []kates.Endpoint
var ports []kates.EndpointSlicePort
var currentPortName string

for _, arg := range args {
switch v := arg.(type) {
case int:
portName := currentPortName
ports = append(ports, kates.EndpointSlicePort{Name: &portName, Port: int32Ptr(int32(v)), Protocol: protocolPtr(kates.ProtocolTCP)})
case string:
IP := net.ParseIP(v)
if IP != nil {
endpoints = append(endpoints, kates.Endpoint{
Addresses: []string{v},
Conditions: kates.EndpointConditions{
Ready: &[]bool{true}[0],
Serving: &[]bool{true}[0],
Terminating: &[]bool{false}[0],
},
})
} else {
currentPortName = v // Assume it's a port name if not an IP address
}
default:
return nil, nil, fmt.Errorf("unrecognized type: %T", v)
}
}

return endpoints, ports, nil
}

func int32Ptr(i int32) *int32 {
return &i
}

func protocolPtr(p kates.Protocol) *kates.Protocol {
return &p
}
27 changes: 24 additions & 3 deletions cmd/entrypoint/endpoints.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ type endpointRoutingInfo struct {
module moduleResolver
endpointWatches map[string]bool // A set to track the subset of kubernetes endpoints we care about.
previousWatches map[string]bool
endpointSlices []*kates.EndpointSlice
}

type ResolverType int
Expand All @@ -47,7 +48,7 @@ func (rt ResolverType) String() string {

// newEndpointRoutingInfo creates a shiny new struct to hold information about
// resolvers in use and such.
func newEndpointRoutingInfo() endpointRoutingInfo {
func newEndpointRoutingInfo(endpointSlices []*kates.EndpointSlice) endpointRoutingInfo {
return endpointRoutingInfo{
// resolverTypes keeps track of the type of every resolver in the system.
// It starts out empty.
Expand All @@ -59,6 +60,7 @@ func newEndpointRoutingInfo() endpointRoutingInfo {
resolverTypes: make(map[string]ResolverType),
// Track which endpoints we actually want to watch.
endpointWatches: make(map[string]bool),
endpointSlices: endpointSlices,
}
}

Expand All @@ -71,6 +73,7 @@ func (eri *endpointRoutingInfo) reconcileEndpointWatches(ctx context.Context, s
eri.module = moduleResolver{}
eri.previousWatches = eri.endpointWatches
eri.endpointWatches = map[string]bool{}
eri.endpointSlices = s.EndpointSlices

// Phase one processes all the configuration stuff that Mappings depend on. Right now this
// includes Modules and Resolvers. When we are done with Phase one we have processed enough
Expand Down Expand Up @@ -228,7 +231,7 @@ func (eri *endpointRoutingInfo) checkMapping(ctx context.Context, mapping *amb.M

if eri.resolverTypes[resolver] == KubernetesEndpointResolver {
svc, ns, _ := eri.module.parseService(ctx, mapping, service, mapping.GetNamespace())
eri.endpointWatches[fmt.Sprintf("%s:%s", ns, svc)] = true
eri.mapEndpointWatches(ns, svc)
}
}

Expand All @@ -247,7 +250,25 @@ func (eri *endpointRoutingInfo) checkTCPMapping(ctx context.Context, tcpmapping

if eri.resolverTypes[resolver] == KubernetesEndpointResolver {
svc, ns, _ := eri.module.parseService(ctx, tcpmapping, service, tcpmapping.GetNamespace())
eri.endpointWatches[fmt.Sprintf("%s:%s", ns, svc)] = true
eri.mapEndpointWatches(ns, svc)
}
}

// mapEndpointWatches figures out what service discovery object available for a given service.
func (eri *endpointRoutingInfo) mapEndpointWatches(namespace string, serviceName string) {
foundEndpointSlice := false
for _, endpointSlice := range eri.endpointSlices {
// Check if this EndpointSlice matches the target service and namespace, and has the required label
if endpointSlice.Namespace == namespace {
if service, ok := endpointSlice.Labels["kubernetes.io/service-name"]; ok && service == serviceName {
eri.endpointWatches[fmt.Sprintf("%s:%s", namespace, endpointSlice.Name)] = true
foundEndpointSlice = true
}
}
}
if !foundEndpointSlice {
//Use Endpoint if EndpointSlice doesn't exist
eri.endpointWatches[fmt.Sprintf("%s:%s", namespace, serviceName)] = true
}
}

Expand Down
9 changes: 5 additions & 4 deletions cmd/entrypoint/interesting_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -80,10 +80,11 @@ func GetInterestingTypes(ctx context.Context, serverTypeList []kates.APIResource
//
// Note that we pull `secrets.v1.` in to "K8sSecrets". ReconcileSecrets will pull
// over the ones we need into "Secrets" and "Endpoints" respectively.
"Services": {{typename: "services.v1."}}, // New in Kubernetes 0.16.0 (2015-04-28) (v1beta{1..3} before that)
"Endpoints": {{typename: "endpoints.v1.", fieldselector: endpointFs}}, // New in Kubernetes 0.16.0 (2015-04-28) (v1beta{1..3} before that)
"K8sSecrets": {{typename: "secrets.v1."}}, // New in Kubernetes 0.16.0 (2015-04-28) (v1beta{1..3} before that)
"ConfigMaps": {{typename: "configmaps.v1.", fieldselector: configMapFs}},
"Services": {{typename: "services.v1."}}, // New in Kubernetes 0.16.0 (2015-04-28) (v1beta{1..3} before that)
"Endpoints": {{typename: "endpoints.v1.", fieldselector: endpointFs}}, // New in Kubernetes 0.16.0 (2015-04-28) (v1beta{1..3} before that)
"EndpointSlices": {{typename: "endpointslices.v1.discovery.k8s.io", fieldselector: endpointFs}},
"K8sSecrets": {{typename: "secrets.v1."}}, // New in Kubernetes 0.16.0 (2015-04-28) (v1beta{1..3} before that)
"ConfigMaps": {{typename: "configmaps.v1.", fieldselector: configMapFs}},
"Ingresses": {
{typename: "ingresses.v1beta1.extensions"}, // New in Kubernetes 1.2.0 (2016-03-16), gone in Kubernetes 1.22.0 (2021-08-04)
{typename: "ingresses.v1beta1.networking.k8s.io"}, // New in Kubernetes 1.14.0 (2019-03-25), gone in Kubernetes 1.22.0 (2021-08-04)
Expand Down
Loading