Skip to content

Commit

Permalink
Added endpoints fallback in case endpointslice doesn't exists
Browse files Browse the repository at this point in the history
Signed-off-by: ajaychoudhary-hotstar <[email protected]>
  • Loading branch information
ajaychoudhary-hotstar authored and kflynn committed Dec 5, 2024
1 parent 46ab826 commit b1fb2d6
Show file tree
Hide file tree
Showing 4 changed files with 148 additions and 19 deletions.
80 changes: 73 additions & 7 deletions cmd/entrypoint/endpoint_routing.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,19 +22,31 @@ func makeEndpoints(ctx context.Context, ksnap *snapshot.KubernetesSnapshot, cons

svcEndpointSlices := map[string][]*kates.EndpointSlice{}

// Collect all the EndpointSlices for each service
// Collect all the EndpointSlices for each service if the "kubernetes.io/service-name" label is present
for _, k8sEndpointSlice := range ksnap.EndpointSlices {
svcKey := fmt.Sprintf("%s:%s", k8sEndpointSlice.Namespace, k8sEndpointSlice.Labels["kubernetes.io/service-name"])
svcEndpointSlices[svcKey] = append(svcEndpointSlices[svcKey], k8sEndpointSlice)
if serviceName, labelExists := k8sEndpointSlice.Labels["kubernetes.io/service-name"]; labelExists {
svcKey := fmt.Sprintf("%s:%s", k8sEndpointSlice.Namespace, serviceName)
svcEndpointSlices[svcKey] = append(svcEndpointSlices[svcKey], k8sEndpointSlice)
}
}
//Map each service to its corresponding endpoints from all its EndpointSlices

// Map each service to its corresponding endpoints from all its EndpointSlices, or fall back to Endpoints if needed
for svcKey, svc := range k8sServices {
if slices, ok := svcEndpointSlices[svcKey]; ok {
if slices, ok := svcEndpointSlices[svcKey]; ok && len(slices) > 0 {
for _, slice := range slices {
for _, ep := range k8sEndpointsToAmbex(slice, svc) {
for _, ep := range k8sEndpointSlicesToAmbex(slice, svc) {
result[ep.ClusterName] = append(result[ep.ClusterName], ep)
}
}
} else {
// Fallback to using Endpoints if no valid EndpointSlices are available
for _, k8sEp := range ksnap.Endpoints {
if key(k8sEp) == svcKey {
for _, ep := range k8sEndpointsToAmbex(k8sEp, svc) {
result[ep.ClusterName] = append(result[ep.ClusterName], ep)
}
}
}
}
}

Expand All @@ -51,7 +63,61 @@ func key(resource kates.Object) string {
return fmt.Sprintf("%s:%s", resource.GetNamespace(), resource.GetName())
}

func k8sEndpointsToAmbex(endpointSlice *kates.EndpointSlice, svc *kates.Service) (result []*ambex.Endpoint) {
func k8sEndpointsToAmbex(ep *kates.Endpoints, svc *kates.Service) (result []*ambex.Endpoint) {
portmap := map[string][]string{}
for _, p := range svc.Spec.Ports {
port := fmt.Sprintf("%d", p.Port)
targetPort := p.TargetPort.String()
if targetPort == "" {
targetPort = fmt.Sprintf("%d", p.Port)
}

portmap[targetPort] = append(portmap[targetPort], port)
if p.Name != "" {
portmap[targetPort] = append(portmap[targetPort], p.Name)
portmap[p.Name] = append(portmap[p.Name], port)
}
if len(svc.Spec.Ports) == 1 {
portmap[targetPort] = append(portmap[targetPort], "")
portmap[""] = append(portmap[""], port)
portmap[""] = append(portmap[""], "")
}
}

for _, subset := range ep.Subsets {
for _, port := range subset.Ports {
if port.Protocol == kates.ProtocolTCP || port.Protocol == kates.ProtocolUDP {
portNames := map[string]bool{}
candidates := []string{fmt.Sprintf("%d", port.Port), port.Name, ""}
for _, c := range candidates {
if pns, ok := portmap[c]; ok {
for _, pn := range pns {
portNames[pn] = true
}
}
}
for _, addr := range subset.Addresses {
for pn := range portNames {
sep := "/"
if pn == "" {
sep = ""
}
result = append(result, &ambex.Endpoint{
ClusterName: fmt.Sprintf("k8s/%s/%s%s%s", ep.Namespace, ep.Name, sep, pn),
Ip: addr.IP,
Port: uint32(port.Port),
Protocol: string(port.Protocol),
})
}
}
}
}
}

return
}

func k8sEndpointSlicesToAmbex(endpointSlice *kates.EndpointSlice, svc *kates.Service) (result []*ambex.Endpoint) {
portmap := map[string][]string{}
for _, p := range svc.Spec.Ports {
port := fmt.Sprintf("%d", p.Port)
Expand Down
57 changes: 57 additions & 0 deletions cmd/entrypoint/endpoint_routing_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,29 @@ func TestEndpointRouting(t *testing.T) {
assert.Equal(t, uint32(8080), endpoints.Entries["k8s/default/foo/80"][0].Port)
}

func TestEndpointRoutingWithNoEndpointSlices(t *testing.T) {
f := entrypoint.RunFake(t, entrypoint.FakeConfig{EnvoyConfig: true}, nil)
// Create Mapping, Service, and Endpoints resources to start.
assert.NoError(t, f.Upsert(makeMapping("default", "foo", "/foo", "foo", "endpoint")))
assert.NoError(t, f.Upsert(makeService("default", "foo")))
subset, err := makeSubset(8080, "1.2.3.4")
require.NoError(t, err)
assert.NoError(t, f.Upsert(makeEndpoints("default", "foo", subset)))
f.Flush()
snap, err := f.GetSnapshot(HasMapping("default", "foo"))
require.NoError(t, err)
assert.NotNil(t, snap)

// Check that the endpoints resource we created at the start was properly propagated.
endpoints, err := f.GetEndpoints(HasEndpoints("k8s/default/foo"))
require.NoError(t, err)
assert.Equal(t, "1.2.3.4", endpoints.Entries["k8s/default/foo"][0].Ip)
assert.Equal(t, uint32(8080), endpoints.Entries["k8s/default/foo"][0].Port)
assert.Contains(t, endpoints.Entries, "k8s/default/foo/80")
assert.Equal(t, "1.2.3.4", endpoints.Entries["k8s/default/foo/80"][0].Ip)
assert.Equal(t, uint32(8080), endpoints.Entries["k8s/default/foo/80"][0].Port)
}

func TestEndpointRoutingMappingAnnotations(t *testing.T) {
f := entrypoint.RunFake(t, entrypoint.FakeConfig{EnvoyConfig: true}, nil)
// Create Mapping, Service, and Endpoints resources to start.
Expand Down Expand Up @@ -242,6 +265,40 @@ func makeService(namespace, name string) *kates.Service {
}
}

func makeEndpoints(namespace, name string, subsets ...kates.EndpointSubset) *kates.Endpoints {
return &kates.Endpoints{
TypeMeta: kates.TypeMeta{Kind: "Endpoints"},
ObjectMeta: kates.ObjectMeta{Namespace: namespace, Name: name},
Subsets: subsets,
}
}

// makeSubset provides a convenient way to kubernetes EndpointSubset resources. Any int args are
// ports, any ip address strings are addresses, and no ip address strings are used as the port name
// for any ports that follow them in the arg list.
func makeSubset(args ...interface{}) (kates.EndpointSubset, error) {
portName := ""
var ports []kates.EndpointPort
var addrs []kates.EndpointAddress
for _, arg := range args {
switch v := arg.(type) {
case int:
ports = append(ports, kates.EndpointPort{Name: portName, Port: int32(v), Protocol: kates.ProtocolTCP})
case string:
IP := net.ParseIP(v)
if IP == nil {
portName = v
} else {
addrs = append(addrs, kates.EndpointAddress{IP: v})
}
default:
return kates.EndpointSubset{}, fmt.Errorf("unrecognized type: %T", v)
}
}

return kates.EndpointSubset{Addresses: addrs, Ports: ports}, nil
}

func makeEndpointSlice(namespace, name, serviceName string, endpoint []kates.Endpoint, port []kates.EndpointSlicePort) *kates.EndpointSlice {
return &kates.EndpointSlice{
TypeMeta: kates.TypeMeta{Kind: "EndpointSlices", APIVersion: "v1.discovery.k8s.io"},
Expand Down
28 changes: 17 additions & 11 deletions cmd/entrypoint/endpoints.go
Original file line number Diff line number Diff line change
Expand Up @@ -231,13 +231,7 @@ func (eri *endpointRoutingInfo) checkMapping(ctx context.Context, mapping *amb.M

if eri.resolverTypes[resolver] == KubernetesEndpointResolver {
svc, ns, _ := eri.module.parseService(ctx, mapping, service, mapping.GetNamespace())
for _, endpointSlice := range eri.endpointSlices {
// Check if this EndpointSlice matches the target service and namespace
if endpointSlice.Namespace == ns && endpointSlice.Labels["kubernetes.io/service-name"] == svc {
eri.endpointWatches[fmt.Sprintf("%s:%s", ns, endpointSlice.Name)] = true

}
}
eri.mapEndpointWatches(ns, svc)
}
}

Expand All @@ -256,14 +250,26 @@ func (eri *endpointRoutingInfo) checkTCPMapping(ctx context.Context, tcpmapping

if eri.resolverTypes[resolver] == KubernetesEndpointResolver {
svc, ns, _ := eri.module.parseService(ctx, tcpmapping, service, tcpmapping.GetNamespace())
for _, endpointSlice := range eri.endpointSlices {
// Check if this EndpointSlice matches the target service and namespace
if endpointSlice.Namespace == ns && endpointSlice.Labels["kubernetes.io/service-name"] == svc {
eri.endpointWatches[fmt.Sprintf("%s:%s", ns, endpointSlice.Name)] = true
eri.mapEndpointWatches(ns, svc)
}
}

// mapEndpointWatches figures out what service discovery object available for a given service.
func (eri *endpointRoutingInfo) mapEndpointWatches(namespace string, serviceName string) {
foundEndpointSlice := false
for _, endpointSlice := range eri.endpointSlices {
// Check if this EndpointSlice matches the target service and namespace, and has the required label
if endpointSlice.Namespace == namespace {
if service, ok := endpointSlice.Labels["kubernetes.io/service-name"]; ok && service == serviceName {
eri.endpointWatches[fmt.Sprintf("%s:%s", namespace, endpointSlice.Name)] = true
foundEndpointSlice = true
}
}
}
if !foundEndpointSlice {
//Use Endpoint if EndpointSlice doesn't exist
eri.endpointWatches[fmt.Sprintf("%s:%s", namespace, serviceName)] = true
}
}

func (m *moduleResolver) parseService(ctx context.Context, resource kates.Object, svcName, svcNamespace string) (name string, namespace string, port string) {
Expand Down
2 changes: 1 addition & 1 deletion cmd/entrypoint/watcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -492,7 +492,7 @@ func (sh *SnapshotHolder) K8sUpdate(
for _, delta := range deltas {
sh.unsentDeltas = append(sh.unsentDeltas, delta)

if delta.Kind == "EndpointSlice" {
if delta.Kind == "EndpointSlice" || delta.Kind == "Endpoints" {
key := fmt.Sprintf("%s:%s", delta.Namespace, delta.Name)
if sh.endpointRoutingInfo.endpointWatches[key] || sh.dispatcher.IsWatched(delta.Namespace, delta.Name) {
endpointsChanged = true
Expand Down

0 comments on commit b1fb2d6

Please sign in to comment.