Skip to content

Commit

Permalink
tproxy: support named port
Browse files Browse the repository at this point in the history
Signed-off-by: zhangzujian <[email protected]>
  • Loading branch information
zhangzujian committed Sep 6, 2024
1 parent d76bf6d commit e8c4175
Show file tree
Hide file tree
Showing 3 changed files with 72 additions and 91 deletions.
4 changes: 2 additions & 2 deletions pkg/daemon/gateway.go
Original file line number Diff line number Diff line change
Expand Up @@ -260,7 +260,7 @@ func (c *Controller) getTProxyConditionPod(needSort bool) ([]*v1.Pod, error) {
}

for _, pod := range pods {
if pod.Spec.NodeName != c.config.NodeName {
if pod.Spec.HostNetwork || pod.Spec.NodeName != c.config.NodeName {
continue
}

Expand All @@ -271,7 +271,7 @@ func (c *Controller) getTProxyConditionPod(needSort bool) ([]*v1.Pod, error) {

subnet, err := c.subnetsLister.Get(subnetName)
if err != nil {
return nil, fmt.Errorf("failed to get subnet '%s', err: %w", subnetName, err)
return nil, fmt.Errorf("failed to get subnet %q: %w", subnetName, err)
}

if subnet.Spec.Vpc == c.config.ClusterRouter {
Expand Down
58 changes: 10 additions & 48 deletions pkg/daemon/gateway_linux.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,11 @@ const (
TProxyPreroutingMask = util.TProxyPreroutingMask
)

var (
tProxyOutputMarkMask = fmt.Sprintf("%#x/%#x", TProxyOutputMark, TProxyOutputMask)
tProxyPreRoutingMarkMask = fmt.Sprintf("%#x/%#x", TProxyPreroutingMark, TProxyPreroutingMask)
)

type policyRouteMeta struct {
family int
source string
Expand Down Expand Up @@ -856,7 +861,6 @@ func (c *Controller) reconcileTProxyIPTableRules(protocol string, isDual bool) e
ipt := c.iptables[protocol]
tproxyPreRoutingRules := make([]util.IPTableRule, 0)
tproxyOutputRules := make([]util.IPTableRule, 0)
probePorts := strset.New()

pods, err := c.getTProxyConditionPod(true)
if err != nil {
Expand All @@ -877,54 +881,12 @@ func (c *Controller) reconcileTProxyIPTableRules(protocol string, isDual bool) e
continue
}

for _, container := range pod.Spec.Containers {
if container.ReadinessProbe != nil {
if httpGet := container.ReadinessProbe.HTTPGet; httpGet != nil {
if port := httpGet.Port.String(); port != "" {
probePorts.Add(port)
}
}

if tcpSocket := container.ReadinessProbe.TCPSocket; tcpSocket != nil {
if port := tcpSocket.Port.String(); port != "" {
if isTCPProbePortReachable, ok := customVPCPodTCPProbeIPPort.Load(getIPPortString(podIP, port)); ok {
if isTCPProbePortReachable.(bool) {
probePorts.Add(port)
}
}
}
}
}

if container.LivenessProbe != nil {
if httpGet := container.LivenessProbe.HTTPGet; httpGet != nil {
if port := httpGet.Port.String(); port != "" {
probePorts.Add(port)
}
}

if tcpSocket := container.LivenessProbe.TCPSocket; tcpSocket != nil {
if port := tcpSocket.Port.String(); port != "" {
if isTCPProbePortReachable, ok := customVPCPodTCPProbeIPPort.Load(getIPPortString(podIP, port)); ok {
if isTCPProbePortReachable.(bool) {
probePorts.Add(port)
}
}
}
}
}
}

if probePorts.IsEmpty() {
ports := getProbePorts(pod)
if ports.Len() == 0 {
continue
}

probePortList := probePorts.List()
sort.Strings(probePortList)
for _, probePort := range probePortList {
tProxyOutputMarkMask := fmt.Sprintf("%#x/%#x", TProxyOutputMark, TProxyOutputMask)
tProxyPreRoutingMarkMask := fmt.Sprintf("%#x/%#x", TProxyPreroutingMark, TProxyPreroutingMask)

for _, probePort := range ports.SortedList() {
hostIP := pod.Status.HostIP
prefixLen := 32
if protocol == kubeovnv1.ProtocolIPv6 {
Expand All @@ -938,8 +900,8 @@ func (c *Controller) reconcileTProxyIPTableRules(protocol string, isDual bool) e
hostIP = "::"
}
}
tproxyOutputRules = append(tproxyOutputRules, util.IPTableRule{Table: MANGLE, Chain: OvnOutput, Rule: strings.Fields(fmt.Sprintf(`-d %s/%d -p tcp -m tcp --dport %s -j MARK --set-xmark %s`, podIP, prefixLen, probePort, tProxyOutputMarkMask))})
tproxyPreRoutingRules = append(tproxyPreRoutingRules, util.IPTableRule{Table: MANGLE, Chain: OvnPrerouting, Rule: strings.Fields(fmt.Sprintf(`-d %s/%d -p tcp -m tcp --dport %s -j TPROXY --on-port %d --on-ip %s --tproxy-mark %s`, podIP, prefixLen, probePort, util.TProxyListenPort, hostIP, tProxyPreRoutingMarkMask))})
tproxyOutputRules = append(tproxyOutputRules, util.IPTableRule{Table: MANGLE, Chain: OvnOutput, Rule: strings.Fields(fmt.Sprintf(`-d %s/%d -p tcp -m tcp --dport %d -j MARK --set-xmark %s`, podIP, prefixLen, probePort, tProxyOutputMarkMask))})
tproxyPreRoutingRules = append(tproxyPreRoutingRules, util.IPTableRule{Table: MANGLE, Chain: OvnPrerouting, Rule: strings.Fields(fmt.Sprintf(`-d %s/%d -p tcp -m tcp --dport %d -j TPROXY --on-port %d --on-ip %s --tproxy-mark %s`, podIP, prefixLen, probePort, util.TProxyListenPort, hostIP, tProxyPreRoutingMarkMask))})
}
}

Expand Down
101 changes: 60 additions & 41 deletions pkg/daemon/tproxy_linux.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,11 +10,13 @@ import (
"syscall"

"github.com/containernetworking/plugins/pkg/ns"
"github.com/scylladb/go-set/strset"
"github.com/vishvananda/netlink"
"golang.org/x/sys/unix"
corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/util/intstr"
"k8s.io/klog/v2"
"k8s.io/utils/ptr"
"k8s.io/utils/set"

kubeovnv1 "github.com/kubeovn/kube-ovn/pkg/apis/kubeovn/v1"
"github.com/kubeovn/kube-ovn/pkg/ovs"
Expand Down Expand Up @@ -56,9 +58,42 @@ func (c *Controller) StartTProxyForwarding() {
}
}

func (c *Controller) StartTProxyTCPPortProbe() {
probePorts := strset.New()
func getProbePorts(pod *corev1.Pod) set.Set[int32] {
ports := set.New[int32]()
for _, container := range pod.Spec.Containers {
for _, probe := range [...]*corev1.Probe{container.LivenessProbe, container.ReadinessProbe} {
if probe == nil {
continue
}
var port intstr.IntOrString
switch {
case probe.TCPSocket != nil:
port = probe.TCPSocket.Port
case probe.HTTPGet != nil:
port = probe.HTTPGet.Port
case probe.GRPC != nil:
port = intstr.FromInt32(probe.GRPC.Port)
default:
continue
}
if port.Type == intstr.Int {
ports.Insert(port.IntVal)
continue
}
for _, p := range container.Ports {
if p.Name == port.StrVal {
ports.Insert(p.ContainerPort)
break
}
}
}
}

ports.Delete(0)
return ports
}

func (c *Controller) StartTProxyTCPPortProbe() {
pods, err := c.getTProxyConditionPod(false)
if err != nil {
return
Expand All @@ -67,33 +102,19 @@ func (c *Controller) StartTProxyTCPPortProbe() {
for _, pod := range pods {
iface := ovs.PodNameToPortName(pod.Name, pod.Namespace, util.OvnProvider)
nsName, err := ovs.GetInterfacePodNs(iface)
if err != nil || nsName == "" {
klog.Infof("iface %s's namespace not found", iface)
if err != nil {
klog.Errorf("failed to get netns for pod %s/%s", pod.Namespace, pod.Name)
continue
}
if nsName == "" {
klog.Infof("netns for pod %s/%s not found", pod.Namespace, pod.Name)
continue
}

ports := getProbePorts(pod)
for _, podIP := range pod.Status.PodIPs {
customVPCPodIPToNs.Store(podIP.IP, nsName)
for _, container := range pod.Spec.Containers {
if container.ReadinessProbe != nil {
if tcpSocket := container.ReadinessProbe.TCPSocket; tcpSocket != nil {
if port := tcpSocket.Port.String(); port != "" {
probePorts.Add(port)
}
}
}

if container.LivenessProbe != nil {
if tcpSocket := container.LivenessProbe.TCPSocket; tcpSocket != nil {
if port := tcpSocket.Port.String(); port != "" {
probePorts.Add(port)
}
}
}
}

probePortsList := probePorts.List()
for _, port := range probePortsList {
for _, port := range ports.UnsortedList() {
probePortInNs(podIP.IP, port, true, nil)
}
}
Expand Down Expand Up @@ -264,7 +285,7 @@ func delRouteIfExist(family, table int, dst *net.IPNet) error {
}

func handleRedirectFlow(conn net.Conn) {
klog.V(5).Infof("Accepting TCP connection from %v with destination of %v", conn.RemoteAddr().String(), conn.LocalAddr().String())
klog.V(5).Infof("accepting TCP connection from %s to %s", conn.RemoteAddr(), conn.LocalAddr())
defer func() {
if err := conn.Close(); err != nil {
klog.Errorf("conn Close err: %v", err)
Expand All @@ -278,42 +299,44 @@ func handleRedirectFlow(conn net.Conn) {
return
}

probePortInNs(podIP, probePort, false, conn)
port, err := strconv.ParseInt(probePort, 10, 32)
if err != nil {
klog.Errorf("failed to parse port number %q: %v", probePort, err)
return
}

probePortInNs(podIP, int32(port), false, conn)

Check failure on line 308 in pkg/daemon/tproxy_linux.go

View workflow job for this annotation

GitHub Actions / Build kube-ovn

G115: integer overflow conversion int64 -> int32 (gosec)
}

func probePortInNs(podIP, probePort string, isTProxyProbe bool, conn net.Conn) {
func probePortInNs(podIP string, probePort int32, isTProxyProbe bool, conn net.Conn) {
podNs, ok := customVPCPodIPToNs.Load(podIP)
if !ok {
return
}

iprobePort, err := strconv.Atoi(probePort)
if err != nil {
klog.V(3).Infof("failed to get netns for pod with ip %s", podIP)
return
}

podNS, err := ns.GetNS(podNs.(string))
if err != nil {
customVPCPodIPToNs.Delete(podIP)
klog.Infof("ns %s already deleted", podNs)
klog.V(3).Infof("netns %s not found", podNs)
return
}

_ = ns.WithNetNSPath(podNS.Path(), func(_ ns.NetNS) error {
// Packet's src and dst IP are both PodIP in netns
localpodTCPAddr := net.TCPAddr{IP: net.ParseIP(podIP)}
remotepodTCPAddr := net.TCPAddr{IP: net.ParseIP(podIP), Port: iprobePort}
remotepodTCPAddr := net.TCPAddr{IP: net.ParseIP(podIP), Port: int(probePort)}

remoteConn, err := goTProxy.DialTCP(&localpodTCPAddr, &remotepodTCPAddr, !isTProxyProbe)
if err != nil {
if isTProxyProbe {
customVPCPodTCPProbeIPPort.Store(getIPPortString(podIP, probePort), false)
customVPCPodTCPProbeIPPort.Store(util.JoinHostPort(podIP, probePort), false)
}
return nil
}

if isTProxyProbe {
customVPCPodTCPProbeIPPort.Store(getIPPortString(podIP, probePort), true)
customVPCPodTCPProbeIPPort.Store(util.JoinHostPort(podIP, probePort), true)
return nil
}

Expand Down Expand Up @@ -342,10 +365,6 @@ func probePortInNs(podIP, probePort string, isTProxyProbe bool, conn net.Conn) {
})
}

func getIPPortString(podIP, port string) string {
return fmt.Sprintf("%s|%s", podIP, port)
}

func getProtocols(protocol string) []string {
var protocols []string
if protocol == kubeovnv1.ProtocolDual {
Expand Down

0 comments on commit e8c4175

Please sign in to comment.