From b1fb2d6bfb5037e56bc794bafdff28ab5ba0c2d1 Mon Sep 17 00:00:00 2001 From: ajaychoudhary-hotstar Date: Wed, 30 Oct 2024 16:27:25 +0530 Subject: [PATCH] Added endpoints fallback in case endpointslice doesn't exists Signed-off-by: ajaychoudhary-hotstar --- cmd/entrypoint/endpoint_routing.go | 80 ++++++++++++++++++++++--- cmd/entrypoint/endpoint_routing_test.go | 57 ++++++++++++++++++ cmd/entrypoint/endpoints.go | 28 +++++---- cmd/entrypoint/watcher.go | 2 +- 4 files changed, 148 insertions(+), 19 deletions(-) diff --git a/cmd/entrypoint/endpoint_routing.go b/cmd/entrypoint/endpoint_routing.go index b70a1f5fac..6839fec3a6 100644 --- a/cmd/entrypoint/endpoint_routing.go +++ b/cmd/entrypoint/endpoint_routing.go @@ -22,19 +22,31 @@ func makeEndpoints(ctx context.Context, ksnap *snapshot.KubernetesSnapshot, cons svcEndpointSlices := map[string][]*kates.EndpointSlice{} - // Collect all the EndpointSlices for each service + // Collect all the EndpointSlices for each service if the "kubernetes.io/service-name" label is present for _, k8sEndpointSlice := range ksnap.EndpointSlices { - svcKey := fmt.Sprintf("%s:%s", k8sEndpointSlice.Namespace, k8sEndpointSlice.Labels["kubernetes.io/service-name"]) - svcEndpointSlices[svcKey] = append(svcEndpointSlices[svcKey], k8sEndpointSlice) + if serviceName, labelExists := k8sEndpointSlice.Labels["kubernetes.io/service-name"]; labelExists { + svcKey := fmt.Sprintf("%s:%s", k8sEndpointSlice.Namespace, serviceName) + svcEndpointSlices[svcKey] = append(svcEndpointSlices[svcKey], k8sEndpointSlice) + } } - //Map each service to its corresponding endpoints from all its EndpointSlices + + // Map each service to its corresponding endpoints from all its EndpointSlices, or fall back to Endpoints if needed for svcKey, svc := range k8sServices { - if slices, ok := svcEndpointSlices[svcKey]; ok { + if slices, ok := svcEndpointSlices[svcKey]; ok && len(slices) > 0 { for _, slice := range slices { - for _, ep := range k8sEndpointsToAmbex(slice, svc) { + for _, ep := range k8sEndpointSlicesToAmbex(slice, svc) { result[ep.ClusterName] = append(result[ep.ClusterName], ep) } } + } else { + // Fallback to using Endpoints if no valid EndpointSlices are available + for _, k8sEp := range ksnap.Endpoints { + if key(k8sEp) == svcKey { + for _, ep := range k8sEndpointsToAmbex(k8sEp, svc) { + result[ep.ClusterName] = append(result[ep.ClusterName], ep) + } + } + } } } @@ -51,7 +63,61 @@ func key(resource kates.Object) string { return fmt.Sprintf("%s:%s", resource.GetNamespace(), resource.GetName()) } -func k8sEndpointsToAmbex(endpointSlice *kates.EndpointSlice, svc *kates.Service) (result []*ambex.Endpoint) { +func k8sEndpointsToAmbex(ep *kates.Endpoints, svc *kates.Service) (result []*ambex.Endpoint) { + portmap := map[string][]string{} + for _, p := range svc.Spec.Ports { + port := fmt.Sprintf("%d", p.Port) + targetPort := p.TargetPort.String() + if targetPort == "" { + targetPort = fmt.Sprintf("%d", p.Port) + } + + portmap[targetPort] = append(portmap[targetPort], port) + if p.Name != "" { + portmap[targetPort] = append(portmap[targetPort], p.Name) + portmap[p.Name] = append(portmap[p.Name], port) + } + if len(svc.Spec.Ports) == 1 { + portmap[targetPort] = append(portmap[targetPort], "") + portmap[""] = append(portmap[""], port) + portmap[""] = append(portmap[""], "") + } + } + + for _, subset := range ep.Subsets { + for _, port := range subset.Ports { + if port.Protocol == kates.ProtocolTCP || port.Protocol == kates.ProtocolUDP { + portNames := map[string]bool{} + candidates := []string{fmt.Sprintf("%d", port.Port), port.Name, ""} + for _, c := range candidates { + if pns, ok := portmap[c]; ok { + for _, pn := range pns { + portNames[pn] = true + } + } + } + for _, addr := range subset.Addresses { + for pn := range portNames { + sep := "/" + if pn == "" { + sep = "" + } + result = append(result, &ambex.Endpoint{ + ClusterName: fmt.Sprintf("k8s/%s/%s%s%s", ep.Namespace, ep.Name, sep, pn), + Ip: addr.IP, + Port: uint32(port.Port), + Protocol: string(port.Protocol), + }) + } + } + } + } + } + + return +} + +func k8sEndpointSlicesToAmbex(endpointSlice *kates.EndpointSlice, svc *kates.Service) (result []*ambex.Endpoint) { portmap := map[string][]string{} for _, p := range svc.Spec.Ports { port := fmt.Sprintf("%d", p.Port) diff --git a/cmd/entrypoint/endpoint_routing_test.go b/cmd/entrypoint/endpoint_routing_test.go index 54d7beedc7..f132b541c2 100644 --- a/cmd/entrypoint/endpoint_routing_test.go +++ b/cmd/entrypoint/endpoint_routing_test.go @@ -42,6 +42,29 @@ func TestEndpointRouting(t *testing.T) { assert.Equal(t, uint32(8080), endpoints.Entries["k8s/default/foo/80"][0].Port) } +func TestEndpointRoutingWithNoEndpointSlices(t *testing.T) { + f := entrypoint.RunFake(t, entrypoint.FakeConfig{EnvoyConfig: true}, nil) + // Create Mapping, Service, and Endpoints resources to start. + assert.NoError(t, f.Upsert(makeMapping("default", "foo", "/foo", "foo", "endpoint"))) + assert.NoError(t, f.Upsert(makeService("default", "foo"))) + subset, err := makeSubset(8080, "1.2.3.4") + require.NoError(t, err) + assert.NoError(t, f.Upsert(makeEndpoints("default", "foo", subset))) + f.Flush() + snap, err := f.GetSnapshot(HasMapping("default", "foo")) + require.NoError(t, err) + assert.NotNil(t, snap) + + // Check that the endpoints resource we created at the start was properly propagated. + endpoints, err := f.GetEndpoints(HasEndpoints("k8s/default/foo")) + require.NoError(t, err) + assert.Equal(t, "1.2.3.4", endpoints.Entries["k8s/default/foo"][0].Ip) + assert.Equal(t, uint32(8080), endpoints.Entries["k8s/default/foo"][0].Port) + assert.Contains(t, endpoints.Entries, "k8s/default/foo/80") + assert.Equal(t, "1.2.3.4", endpoints.Entries["k8s/default/foo/80"][0].Ip) + assert.Equal(t, uint32(8080), endpoints.Entries["k8s/default/foo/80"][0].Port) +} + func TestEndpointRoutingMappingAnnotations(t *testing.T) { f := entrypoint.RunFake(t, entrypoint.FakeConfig{EnvoyConfig: true}, nil) // Create Mapping, Service, and Endpoints resources to start. @@ -242,6 +265,40 @@ func makeService(namespace, name string) *kates.Service { } } +func makeEndpoints(namespace, name string, subsets ...kates.EndpointSubset) *kates.Endpoints { + return &kates.Endpoints{ + TypeMeta: kates.TypeMeta{Kind: "Endpoints"}, + ObjectMeta: kates.ObjectMeta{Namespace: namespace, Name: name}, + Subsets: subsets, + } +} + +// makeSubset provides a convenient way to kubernetes EndpointSubset resources. Any int args are +// ports, any ip address strings are addresses, and no ip address strings are used as the port name +// for any ports that follow them in the arg list. +func makeSubset(args ...interface{}) (kates.EndpointSubset, error) { + portName := "" + var ports []kates.EndpointPort + var addrs []kates.EndpointAddress + for _, arg := range args { + switch v := arg.(type) { + case int: + ports = append(ports, kates.EndpointPort{Name: portName, Port: int32(v), Protocol: kates.ProtocolTCP}) + case string: + IP := net.ParseIP(v) + if IP == nil { + portName = v + } else { + addrs = append(addrs, kates.EndpointAddress{IP: v}) + } + default: + return kates.EndpointSubset{}, fmt.Errorf("unrecognized type: %T", v) + } + } + + return kates.EndpointSubset{Addresses: addrs, Ports: ports}, nil +} + func makeEndpointSlice(namespace, name, serviceName string, endpoint []kates.Endpoint, port []kates.EndpointSlicePort) *kates.EndpointSlice { return &kates.EndpointSlice{ TypeMeta: kates.TypeMeta{Kind: "EndpointSlices", APIVersion: "v1.discovery.k8s.io"}, diff --git a/cmd/entrypoint/endpoints.go b/cmd/entrypoint/endpoints.go index b83330b68c..0a62082de4 100644 --- a/cmd/entrypoint/endpoints.go +++ b/cmd/entrypoint/endpoints.go @@ -231,13 +231,7 @@ func (eri *endpointRoutingInfo) checkMapping(ctx context.Context, mapping *amb.M if eri.resolverTypes[resolver] == KubernetesEndpointResolver { svc, ns, _ := eri.module.parseService(ctx, mapping, service, mapping.GetNamespace()) - for _, endpointSlice := range eri.endpointSlices { - // Check if this EndpointSlice matches the target service and namespace - if endpointSlice.Namespace == ns && endpointSlice.Labels["kubernetes.io/service-name"] == svc { - eri.endpointWatches[fmt.Sprintf("%s:%s", ns, endpointSlice.Name)] = true - - } - } + eri.mapEndpointWatches(ns, svc) } } @@ -256,14 +250,26 @@ func (eri *endpointRoutingInfo) checkTCPMapping(ctx context.Context, tcpmapping if eri.resolverTypes[resolver] == KubernetesEndpointResolver { svc, ns, _ := eri.module.parseService(ctx, tcpmapping, service, tcpmapping.GetNamespace()) - for _, endpointSlice := range eri.endpointSlices { - // Check if this EndpointSlice matches the target service and namespace - if endpointSlice.Namespace == ns && endpointSlice.Labels["kubernetes.io/service-name"] == svc { - eri.endpointWatches[fmt.Sprintf("%s:%s", ns, endpointSlice.Name)] = true + eri.mapEndpointWatches(ns, svc) + } +} +// mapEndpointWatches figures out what service discovery object available for a given service. +func (eri *endpointRoutingInfo) mapEndpointWatches(namespace string, serviceName string) { + foundEndpointSlice := false + for _, endpointSlice := range eri.endpointSlices { + // Check if this EndpointSlice matches the target service and namespace, and has the required label + if endpointSlice.Namespace == namespace { + if service, ok := endpointSlice.Labels["kubernetes.io/service-name"]; ok && service == serviceName { + eri.endpointWatches[fmt.Sprintf("%s:%s", namespace, endpointSlice.Name)] = true + foundEndpointSlice = true } } } + if !foundEndpointSlice { + //Use Endpoint if EndpointSlice doesn't exist + eri.endpointWatches[fmt.Sprintf("%s:%s", namespace, serviceName)] = true + } } func (m *moduleResolver) parseService(ctx context.Context, resource kates.Object, svcName, svcNamespace string) (name string, namespace string, port string) { diff --git a/cmd/entrypoint/watcher.go b/cmd/entrypoint/watcher.go index d4b74fb653..d28796b8df 100644 --- a/cmd/entrypoint/watcher.go +++ b/cmd/entrypoint/watcher.go @@ -492,7 +492,7 @@ func (sh *SnapshotHolder) K8sUpdate( for _, delta := range deltas { sh.unsentDeltas = append(sh.unsentDeltas, delta) - if delta.Kind == "EndpointSlice" { + if delta.Kind == "EndpointSlice" || delta.Kind == "Endpoints" { key := fmt.Sprintf("%s:%s", delta.Namespace, delta.Name) if sh.endpointRoutingInfo.endpointWatches[key] || sh.dispatcher.IsWatched(delta.Namespace, delta.Name) { endpointsChanged = true