From 4ecb8dd5cfb3a0910371117bdba48613f2a0f356 Mon Sep 17 00:00:00 2001 From: Dumitru Ceara Date: Thu, 8 Sep 2022 02:00:11 +0200 Subject: [PATCH] Add service support. (#8) It can be enabled by setting 'service: True' in the profile definition (see CRRService in netperf.yml). For now the server control and data ports are fixed: const serverCtlPort = 12865 const serverDataPort = 42424 Signed-off-by: Dumitru Ceara Signed-off-by: Dumitru Ceara --- kubernetes.go | 42 ++++++++++++++++++++++++++++++++++++++++ netperf.go | 53 +++++++++++++++++++++++++++++++++++++++++---------- netperf.yml | 7 +++++++ result.go | 20 +++++++++---------- 4 files changed, 102 insertions(+), 20 deletions(-) diff --git a/kubernetes.go b/kubernetes.go index 3ae6fc60..b87955a9 100644 --- a/kubernetes.go +++ b/kubernetes.go @@ -7,6 +7,7 @@ import ( appsv1 "k8s.io/api/apps/v1" apiv1 "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/util/intstr" "k8s.io/client-go/kubernetes" ) @@ -123,3 +124,44 @@ func GetPods(c *kubernetes.Clientset, dp DeploymentParams) (*apiv1.PodList, erro } return pods, nil } + +func CreateService(sp ServiceParams, client *kubernetes.Clientset) (*apiv1.Service, error) { + s, err := client.CoreV1().Services(sp.namespace).Get(context.TODO(), sp.name, metav1.GetOptions{}) + if err == nil { + fmt.Println("♻️ Using existing Deployment") + return s, nil + } + fmt.Printf("🚀 Creating service for %s in %s\n", sp.name, sp.namespace) + sc := client.CoreV1().Services(sp.namespace) + service := &apiv1.Service{ + ObjectMeta: metav1.ObjectMeta{ + Name: sp.name, + Namespace: sp.namespace, + }, + Spec: apiv1.ServiceSpec{ + Ports: []apiv1.ServicePort{ + { + Name: fmt.Sprintf("%s-ctl", sp.name), + Protocol: apiv1.ProtocolTCP, + TargetPort: intstr.Parse(fmt.Sprintf("%d", sp.ctlPort)), + Port: sp.ctlPort, + }, + { + Name: fmt.Sprintf("%s-data-tcp", sp.name), + Protocol: apiv1.ProtocolTCP, + TargetPort: intstr.Parse(fmt.Sprintf("%d", sp.dataPort)), + Port: sp.dataPort, + }, + { + Name: fmt.Sprintf("%s-data-udp", sp.name), + Protocol: apiv1.ProtocolUDP, + TargetPort: intstr.Parse(fmt.Sprintf("%d", sp.dataPort)), + Port: sp.dataPort, + }, + }, + Type: apiv1.ServiceType("ClusterIP"), + Selector: sp.labels, + }, + } + return sc.Create(context.TODO(), service, metav1.CreateOptions{}) +} diff --git a/netperf.go b/netperf.go index 50f3a006..aa4565fb 100644 --- a/netperf.go +++ b/netperf.go @@ -37,6 +37,7 @@ type NetPerfConfig struct { Profile string `yaml:"profile,omitempty"` Samples int `yaml:"samples,omitempty"` MessageSize int `yaml:"messagesize,omitempty"` + Service bool `default:"false" yaml:"service,omitempty"` } type PerfScenarios struct { @@ -44,6 +45,7 @@ type PerfScenarios struct { client *apiv1.PodList server *apiv1.PodList clientAcross *apiv1.PodList + service *apiv1.Service restconfig rest.Config } @@ -60,6 +62,17 @@ type DeploymentParams struct { port int } +type ServiceParams struct { + name string + namespace string + labels map[string]string + ctlPort int32 + dataPort int32 +} + +const serverCtlPort = 12865 +const serverDataPort = 42424 + func parseConf(fn string) ([]NetPerfConfig, error) { fmt.Printf("📒 Reading %s file.\r\n", fn) buf, err := ioutil.ReadFile(fn) @@ -131,7 +144,7 @@ func main() { image: "quay.io/jtaleric/k8snetperf:latest", labels: map[string]string{"role": "client"}, command: []string{"/bin/bash", "-c", "sleep 10000000"}, - port: 12865, + port: serverCtlPort, } if z != "" { cdp.nodeaffinity = apiv1.NodeAffinity{ @@ -165,6 +178,21 @@ func main() { os.Exit(1) } + // Create netperf TCP service + spTcp := ServiceParams{ + name: "netperf-service", + namespace: "netperf", + labels: map[string]string{"role": "server"}, + ctlPort: serverCtlPort, + dataPort: serverDataPort, + } + s.service, err = CreateService(spTcp, client) + if err != nil { + fmt.Println("😥 Unable to create TCP netperf service") + log.Error(err) + os.Exit(1) + } + // Start netperf server sdp := DeploymentParams{ name: "server", @@ -185,7 +213,7 @@ func main() { }, }, }, - port: 12865, + port: serverCtlPort, } if z != "" { sdp.nodeaffinity = apiv1.NodeAffinity{ @@ -241,7 +269,7 @@ func main() { image: "quay.io/jtaleric/k8snetperf:latest", labels: map[string]string{"role": "client-across"}, command: []string{"/bin/bash", "-c", "sleep 10000000"}, - port: 12865, + port: serverCtlPort, } if z != "" { cdp_across.nodeaffinity = apiv1.NodeAffinity{ @@ -287,13 +315,19 @@ func main() { if strings.Contains(nc.Profile, "STREAM") { metric = "Mb/s" } + var serverIP string + if nc.Service { + serverIP = s.service.Spec.ClusterIP + } else { + serverIP = s.server.Items[0].Status.PodIP + } if s.clientAcross != nil { npr := NetPerfResult{} npr.NetPerfConfig = nc npr.Metric = metric npr.SameNode = false for i := 0; i < nc.Samples; i++ { - r, err := RunNetPerf(client, s.restconfig, nc, s.clientAcross, s.server) + r, err := RunNetPerf(client, s.restconfig, nc, s.clientAcross, serverIP) if err != nil { log.Error(err) os.Exit(1) @@ -314,7 +348,7 @@ func main() { npr.Metric = metric npr.SameNode = true for i := 0; i < nc.Samples; i++ { - r, err := RunNetPerf(client, s.restconfig, nc, s.client, s.server) + r, err := RunNetPerf(client, s.restconfig, nc, s.client, serverIP) if err != nil { log.Error(err) os.Exit(1) @@ -334,18 +368,17 @@ func main() { // Display the netperf config func ShowConfig(c NetPerfConfig) { - fmt.Printf("🗒️ Running netperf %s for %ds\r\n", c.Profile, c.Duration) + fmt.Printf("🗒️ Running netperf %s (service %t) for %ds\r\n", c.Profile, c.Service, c.Duration) } // RunNetPerf will use the k8s client to run the netperf binary in the container image // it will return a bytes.Buffer of the stdout. -func RunNetPerf(c *kubernetes.Clientset, rc rest.Config, nc NetPerfConfig, client *apiv1.PodList, server *apiv1.PodList) (bytes.Buffer, error) { +func RunNetPerf(c *kubernetes.Clientset, rc rest.Config, nc NetPerfConfig, client *apiv1.PodList, serverIP string) (bytes.Buffer, error) { var stdout, stderr bytes.Buffer - sip := server.Items[0].Status.PodIP pod := client.Items[0] - fmt.Printf("🔥 Client (%s,%s) starting netperf against server : %s\n", pod.Name, pod.Status.PodIP, sip) + fmt.Printf("🔥 Client (%s,%s) starting netperf against server : %s\n", pod.Name, pod.Status.PodIP, serverIP) ShowConfig(nc) - cmd := []string{"/usr/local/bin/netperf", "-H", sip, "-l", fmt.Sprintf("%d", nc.Duration), "-t", nc.Profile, "--", "-R", "1", "-m", fmt.Sprintf("%d", nc.MessageSize)} + cmd := []string{"/usr/local/bin/netperf", "-H", serverIP, "-l", fmt.Sprintf("%d", nc.Duration), "-t", nc.Profile, "--", "-R", "1", "-m", fmt.Sprintf("%d", nc.MessageSize), "-P", fmt.Sprintf("0,%d", serverDataPort)} req := c.CoreV1().RESTClient(). Post(). Namespace(pod.Namespace). diff --git a/netperf.yml b/netperf.yml index a1f6b276..97530bb7 100644 --- a/netperf.yml +++ b/netperf.yml @@ -17,6 +17,13 @@ CRR: samples: 1 messagesize: 16384 +CRRService: + profile: "TCP_CRR" + duration: 10 + samples: 1 + messagesize: 16384 + service: True + RR: profile: "TCP_RR" duration: 10 diff --git a/result.go b/result.go index dbb39f86..7042be12 100644 --- a/result.go +++ b/result.go @@ -32,31 +32,31 @@ func checkResults(s ScenarioResults, check string) bool { // ShowStreamResults accepts NetPerfResults to display to the user via stdout func ShowStreamResult(s ScenarioResults) { if checkResults(s, "STREAM") { - fmt.Printf("%s Stream Results %s\r\n", strings.Repeat("-", (51)), strings.Repeat("-", (51))) - fmt.Printf("%-18s | %-15s | %-15s | %-15s | %-15s | %-15s\r\n", "Scenario", "Message Size", "Same node", "Duration", "Samples", "Avg value") - fmt.Printf("%s\r\n", strings.Repeat("-", (18+25+75))) + fmt.Printf("%s Stream Results %s\r\n", strings.Repeat("-", 59), strings.Repeat("-", 59)) + fmt.Printf("%-18s | %-15s | %-15s | %-15s | %-15s | %-15s | %-15s\r\n", "Scenario", "Service", "Message Size", "Same node", "Duration", "Samples", "Avg value") + fmt.Printf("%s\r\n", strings.Repeat("-", 136)) for _, r := range s.results { if strings.Contains(r.Profile, "STREAM") { avg, _ := average(r.Summary) - fmt.Printf("📊 %-15s | %-15d | %-15t | %-15d | %-15d | %-15f (%s) \r\n", r.Profile, r.MessageSize, r.SameNode, r.Duration, r.Samples, avg, r.Metric) + fmt.Printf("📊 %-15s | %-15t | %-15d | %-15t | %-15d | %-15d | %-15f (%s) \r\n", r.Profile, r.Service, r.MessageSize, r.SameNode, r.Duration, r.Samples, avg, r.Metric) } } - fmt.Printf("%s\r\n", strings.Repeat("-", (18+25+75))) + fmt.Printf("%s\r\n", strings.Repeat("-", 136)) } } func ShowRRResult(s ScenarioResults) { if checkResults(s, "RR") { - fmt.Printf("%s RR Results %s\r\n", strings.Repeat("-", (53)), strings.Repeat("-", (53))) - fmt.Printf("%-18s | %-15s | %-15s | %-15s | %-15s | %-15s\r\n", "Scenario", "Message Size", "Same node", "Duration", "Samples", "99%tile value") - fmt.Printf("%s\r\n", strings.Repeat("-", (18+25+75))) + fmt.Printf("%s RR Results %s\r\n", strings.Repeat("-", 62), strings.Repeat("-", 62)) + fmt.Printf("%-18s | %-15s | %-15s | %-15s | %-15s | %-15s | %-15s\r\n", "Scenario", "Service", "Message Size", "Same node", "Duration", "Samples", "99%tile value") + fmt.Printf("%s\r\n", strings.Repeat("-", 136)) for _, r := range s.results { if strings.Contains(r.Profile, "RR") { pct, _ := percentile(r.Summary, 99) - fmt.Printf("📊 %-15s | %-15d | %-15t | %-15d | %-15d | %-15f (%s) \r\n", r.Profile, r.MessageSize, r.SameNode, r.Duration, r.Samples, pct, r.Metric) + fmt.Printf("📊 %-15s | %-15t | %-15d | %-15t | %-15d | %-15d | %-15f (%s) \r\n", r.Profile, r.Service, r.MessageSize, r.SameNode, r.Duration, r.Samples, pct, r.Metric) } } - fmt.Printf("%s\r\n", strings.Repeat("-", (18+25+75))) + fmt.Printf("%s\r\n", strings.Repeat("-", 136)) } }